# include "host/telnet.h"
# include "dgd.h"
# include "str.h"
# include "array.h"
# include "object.h"
# include "interpret.h"
# include "data.h"
# include "comm.net.h"

/*
 * Per-connection (or per-port) state.  The struct is allocated together
 * with its fixed buffers in a single chunk by comm_new(); inbuf, outbuf and
 * optbuf point into that chunk.  extrabuf is a separate allocation made by
 * comm_send() when a message does not fit in outbuf.
 */
typedef struct _user_ {
    object *obj;                /* associated object */
    connection *conn;           /* connection */
    int inbufsz;                /* bytes in input buffer */
    int outbufsz;               /* bytes in output buffer */
    int optbufsz;               /* bytes in option buffer */
    int extrabufsz;             /* bytes in extra buffer */
    int newlines;               /* telnet: # of newlines in input buffer */
    short flags;                /* connection flags */
    char tflags;                /* telnet negotiation flags */
    char state;                 /* telnet state */
    char *inbuf;                /* input buffer */
    char *outbuf;               /* output buffer */
    char *optbuf;               /* buffer for telnet options */
    char *extrabuf;             /* buffer to hold overflow from outbuf */
    char *extrabufp;            /* points to start of data in extrabuf */
} user;

/* flags */
# define CF_TELNET      0x0001  /* telnet connection */
# define CF_PORT        0x0002  /* port connection */
# define CF_DATAGRAM    0x0004  /* datagram connection */
# define CF_CONNECTING  0x0008  /* connection being initiated */
# define CF_WAITING     0x0010  /* waiting for write select */
# define CF_CONFIRM     0x0020  /* remember to call message_done() */
# define CF_ERROR       0x0040  /* connection no longer usable */
# define CF_EAGER       0x0080  /* object was not done with previous input */
# define CF_BLOCKED     0x0100  /* do not handle incoming data/connections */

/* tflags */
# define TF_FAKEECHO    0x01    /* server is pretending to echo input */
# define TF_GA          0x02    /* send GA after prompt */
# define TF_SEENCR      0x04    /* just seen a CR */
# define TF_LINEMODE    0x08    /* confirmed LINEMODE */

/* state */
# define TS_DATA        0
# define TS_IAC         1
# define TS_DO          2
# define TS_DONT        3
# define TS_WILL        4
# define TS_WONT        5
# define TS_SB          6
# define TS_SE          7

static user **users;            /* array of users */
static int maxusers;            /* max # of users */
static int nusers;              /* # of users */
static int nports;              /* # of ports */
static int limusers;            /* highest used user index + 1 */
static int newlines;            /* # of newlines in all input buffers */
static int waiting;             /* # of waiting users */
static int neager;              /* # of eager users */
static int callbacks;           /* true if comm_confirm() has work to do */
static int flush;               /* true if comm_flush() should flush */
static object *this_user;       /* current user */

/*
 * NAME:        comm->init()
 * DESCRIPTION: initialize communications: initialize the connection layer
 *              and allocate an empty user table with cf_users slots
 */
void comm_init(cf_users)
int cf_users;
{
    register int i;
    register user **usr;

    conn_init(cf_users);
    users = ALLOC(user*, maxusers = cf_users);
    for (i = cf_users, usr = users; i > 0; --i, usr++) {
        *usr = (user *) NULL;
    }
}

/*
 * NAME:        comm->finish()
 * DESCRIPTION: terminate connections; pending output is flushed first
 */
void comm_finish()
{
    comm_flush();
    conn_finish();
}

/*
 * NAME:        comm->wait()
 * DESCRIPTION: mark or unmark a user as waiting for write select.  The
 *              global count of waiting users and the connection layer are
 *              only updated when the state actually changes.
 */
static void comm_wait(usr, wait)
user *usr;
int wait;
{
    if (wait) {
        if (!(usr->flags & CF_WAITING)) {
            usr->flags |= CF_WAITING;
            waiting++;
            conn_wait(usr->conn, wait);
        }
    } else {
        if (usr->flags & CF_WAITING) {
            usr->flags &= ~CF_WAITING;
            waiting--;
            conn_wait(usr->conn, wait);
        }
    }
}

/*
 * NAME:        comm->new()
 * DESCRIPTION: allocate and init a new user struct (assumes nusers < maxusers)
 *
 *              The struct and its fixed buffers live in one allocation:
 *                port:   user
 *                telnet: user | inbuf | outbuf | optbuf
 *                binary: user | outbuf
 *              If the object is already a user or editor object, the
 *              connection is deleted and an error is raised.
 */
static void comm_new(obj, conn, flags)
object *obj;
connection *conn;
int flags;
{
    register user *usr;
    register int n, size;

    if (obj->flags & (O_USER | O_EDITOR)) {
        conn_del(conn);
        error("User object is already used for user or editor");
    }

    if (flags & CF_PORT) {
        size = sizeof(user);
    } else if (flags & CF_TELNET) {
        size = sizeof(user) + INBUF_SIZE + OUTBUF_SIZE + OPTBUF_SIZE;
    } else {
        size = sizeof(user) + OUTBUF_SIZE;
    }

    /* find the first free slot in the user table */
    for (n = 0; users[n] != (user *) NULL; n++) ;

    m_static();
    usr = users[n] = (user *) ALLOC(char, size);
    m_dynamic();

    usr->obj = obj;
    obj->flags |= O_USER;
    obj->etabi = n;             /* slot index, used to find the user again */
    usr->inbufsz = 0;
    usr->outbufsz = 0;
    usr->optbufsz = 0;
    usr->extrabufsz = 0;
    usr->newlines = 0;
    usr->conn = conn;
    usr->flags = flags;
    usr->tflags = 0;
    usr->state = TS_DATA;
    usr->extrabuf = (char *) NULL;
    usr->extrabufp = (char *) NULL;

    if (flags & CF_PORT) {
        usr->inbuf = (char *) NULL;
        usr->outbuf = (char *) NULL;
        usr->optbuf = (char *) NULL;
        nports++;
    } else if (flags & CF_TELNET) {
        usr->inbuf = (char *) usr + sizeof(user);
        usr->outbuf = usr->inbuf + INBUF_SIZE;
        /*
         * The option buffer follows the whole output buffer; using any
         * other offset would make the two buffers overlap.
         */
        usr->optbuf = usr->outbuf + OUTBUF_SIZE;
    } else {
        usr->inbuf = (char *) NULL;
        usr->outbuf = (char *) usr + sizeof(user);
        usr->optbuf = (char *) NULL;
    }

    nusers++;
    if (n + 1 > limusers) {
        limusers = n + 1;
    }
}

/*
 * NAME:        comm->del()
 * DESCRIPTION: delete a connection: release the connection and the user
 *              struct, fix up the global counters, and call close(force)
 *              in the associated object with that object as this_user.
 *              Afterwards this_user is restored, or cleared if the deleted
 *              object was itself the current user.  An error raised by
 *              close() is passed on after this_user has been restored.
 */
static void comm_del(f, usr, force)
register frame *f;
register user **usr;
bool force;
{
    object *obj, *olduser;

    comm_wait(*usr, 0);
    conn_del((*usr)->conn);
    if (!((*usr)->flags & CF_BLOCKED)) {
        /* blocked users were already subtracted by comm_block() */
        if ((*usr)->flags & CF_TELNET) {
            newlines -= (*usr)->newlines;
        }
        if ((*usr)->flags & CF_EAGER) {
            neager--;
        }
    }
    nports -= ((*usr)->flags & CF_PORT) != 0;
    obj = (*usr)->obj;
    obj->flags &= ~O_USER;
    if ((*usr)->extrabuf) {
        FREE((*usr)->extrabuf);
    }
    FREE(*usr);
    *usr = (user *) NULL;
    --nusers;
    while (limusers > 0 && users[limusers - 1] == (user *) NULL) {
        limusers--;
    }

    olduser = this_user;
    this_user = obj;
    if (ec_push((ec_ftn) NULL)) {
        /* error in close(): restore this_user and pass the error on */
        if (obj == olduser) {
            this_user = (object *) NULL;
        } else {
            this_user = olduser;
        }
        error((char *) NULL);
    } else {
        (--f->sp)->type = T_INT;
        f->sp->u.number = force;
        if (i_call(f, obj, "close", 5, TRUE, 1)) {
            i_del_value(f->sp++);
        }
        if (obj == olduser) {
            this_user = (object *) NULL;
        } else {
            this_user = olduser;
        }
        ec_pop();
    }
}

/*
 * NAME:        comm->listen()
 * DESCRIPTION: have an object listen to a port.  PRC_TELNET is opened as a
 *              TCP port whose flags include CF_TELNET; PRC_UDP gives a
 *              datagram port.  On success open(port) is called in the
 *              object; an error is raised if the user table is full or
 *              the port cannot be opened.
 */
void comm_listen(f, obj, port, protocol)
frame *f;
object *obj;
Int port;
int protocol;
{
    register connection *conn;
    int flags;
    object *olduser;

    if (nusers >= maxusers) {
        error("Max number of connection/port objects exceeded");
    }

    if (protocol == PRC_TELNET) {
        flags = CF_TELNET | CF_PORT;
        protocol = PRC_TCP;
    } else if (protocol == PRC_UDP) {
        flags = CF_DATAGRAM | CF_PORT;
    } else {
        flags = CF_PORT;
    }

    conn = conn_listen(port, protocol);
    if (conn != (connection *) NULL) {
        comm_new(obj, conn, flags);
    } else {
        error("Error opening port");
    }

    olduser = this_user;
    this_user = obj;
    (--f->sp)->type = T_INT;
    f->sp->u.number = conn_port(conn);
    if (i_call(f, this_user, "open", 4, TRUE, 1)) {
        i_del_value(f->sp++);
    }
    this_user = olduser;
}

/*
 * NAME:        comm->connect()
 * DESCRIPTION: initiate a telnet or tcp connection.  The new user is
 *              flagged CF_CONNECTING; comm_receive() clears the flag and
 *              calls open() once conn_connected() reports completion.
 */
void comm_connect(obj, host, port, protocol)
object *obj;
char *host;
int port, protocol;
{
    register connection *conn;
    int flags;

    if (nusers >= maxusers) {
        error("Max number of connection/port objects exceeded");
    }

    if (protocol == PRC_TELNET) {
        flags = CF_TELNET | CF_CONNECTING;
        protocol = PRC_TCP;
    } else /* if (protocol == PRC_TCP) */ {
        flags = CF_CONNECTING;
    }

    conn = conn_connect(host, port, protocol);
    if (conn != (connection *) NULL) {
        comm_new(obj, conn, flags);
    } else {
        error("Bad connect()");
    }
}

/*
 * NAME:        telnet_fill()
 * DESCRIPTION: process telnet output.  Copy bytes from inbuf to outbuf,
 *              doubling IAC, inserting CR before LF, and dropping control
 *              characters other than HT, BEL, BS and ESC.  Copying stops
 *              when the input is exhausted or the next item does not fit.
 *              The number of bytes produced is added to *outbufsize; the
 *              number of input bytes consumed is returned.
 */
static Uint telnet_fill(inbuf, inbuflen, outbuf, outbuflen, outbufsize)
char *inbuf;
char *outbuf;
Uint inbuflen, outbuflen;
Uint *outbufsize;
{
    register char *p, *q, *outbufend, *inbufend;

    p = inbuf;
    q = outbuf;
    outbufend = outbuf + outbuflen;
    inbufend = inbuf + inbuflen;

    while (p < inbufend) {
        if (UCHAR(*p) == IAC) {
            /* double the telnet IAC character */
            if (q >= outbufend - 1) {
                break;
            }
            *q++ = IAC;
            *q++ = IAC;
            p++;
        } else if (*p == LF) {
            /* insert CR before LF */
            if (q >= outbufend - 1) {
                break;
            }
            *q++ = CR;
            *q++ = LF;
            p++;
        } else if ((*p & 0x7f) < ' ' && *p != HT && *p != BEL && *p != BS &&
                   *p != ESC) {
            /* illegal character */
            p++;
        } else {
            if (q == outbufend) {
                break;
            }
            *q++ = *p++;
        }
    }

    *outbufsize += q - outbuf;
    return p - inbuf;
}

/*
 * NAME:        comm->write()
 * DESCRIPTION: try to write out the output buffers, in the order option
 *              buffer, output buffer, overflow buffer.  On a partial write
 *              the unsent data is moved to the front of its buffer and the
 *              user is marked as waiting for write select.  On a write
 *              error the user is flagged CF_ERROR and left for
 *              comm_confirm() to delete.
 */
static void comm_write(usr)
register user *usr;
{
    register Int i;

    if (usr->optbufsz > 0) {
        i = conn_write(usr->conn, usr->optbuf, usr->optbufsz);
        if (i < 0) {
            usr->flags |= CF_ERROR;
            callbacks = TRUE;
            return;
        } else if ((usr->optbufsz -= i) > 0) {
            char *p, *q;
            int n;

            /* shift the unsent remainder to the start of the buffer */
            p = usr->optbuf;
            q = p + i;
            n = usr->optbufsz;
            while (n--) {
                *p++ = *q++;
            }
            comm_wait(usr, TRUE);
            return;
        }
    }

    while (usr->outbufsz > 0) {
        i = conn_write(usr->conn, usr->outbuf, usr->outbufsz);
        if (i < 0) {
            usr->flags |= CF_ERROR;
            callbacks = TRUE;
            return;
        } else if ((usr->outbufsz -= i) > 0) {
            char *p, *q;
            int n;

            /* shift the unsent remainder to the start of the buffer */
            p = usr->outbuf;
            q = p + i;
            n = usr->outbufsz;
            while (n--) {
                *p++ = *q++;
            }
            comm_wait(usr, TRUE);
            return;
        }

        if ((usr->flags & CF_TELNET) && usr->extrabufsz > 0) {
            /* Refill outbuf */
            i = telnet_fill(usr->extrabufp, usr->extrabufsz,
                            usr->outbuf, OUTBUF_SIZE, &(usr->outbufsz));
            if (i == usr->extrabufsz) {
                FREE(usr->extrabuf);
                usr->extrabuf = (char *) NULL;
                usr->extrabufp = (char *) NULL;
                usr->extrabufsz = 0;
            } else {
                usr->extrabufp += i;
                usr->extrabufsz -= i;
            }
            /* outbufsz can be 0 here if extrabuf consisted solely of
             * characters skipped by telnet_fill().
             */
            if (usr->outbufsz == 0) {
                comm_wait(usr, FALSE);
                return;
            }
        }
    }

    /*
     * Overflow data left at this point is written out unprocessed; telnet
     * overflow is normally drained through telnet_fill() in the loop above.
     */
    if (usr->extrabufsz > 0) {
        i = conn_write(usr->conn, usr->extrabufp, usr->extrabufsz);
        if (i < 0) {
            usr->flags |= CF_ERROR;
            callbacks = TRUE;
            return;
        } else if (i < usr->extrabufsz) {
            usr->extrabufp += i;
            usr->extrabufsz -= i;
            comm_wait(usr, TRUE);
            return;
        } else {
            FREE(usr->extrabuf);
            usr->extrabuf = (char *) NULL;
            usr->extrabufp = (char *) NULL;
            usr->extrabufsz = 0;
        }
    }
    comm_wait(usr, FALSE);
}

/*
 * NAME:        comm->telopt()
 * DESCRIPTION: send a telnet option negotiation string.  If the string does
 *              not fit in the option buffer a write is attempted first; if
 *              it still does not fit it is silently dropped.
 */
static void comm_telopt(usr, opt, len)
user *usr;
char *opt;
int len;
{
    /* It is possible for options to be dropped if the connection is blocked,
     * but I expect that to be rare.
     */
    if (usr->optbufsz + len > OPTBUF_SIZE) {
        comm_write(usr);
    }
    if (usr->optbufsz + len <= OPTBUF_SIZE) {
        memcpy(usr->optbuf + usr->optbufsz, opt, len);
        usr->optbufsz += len;
        if (!(usr->flags & CF_WAITING)) {
            comm_write(usr);
        }
    }
}

/*
 * NAME:        comm->check()
 * DESCRIPTION: check if previous message has been sent.  Returns 1, and
 *              flags the user CF_CONFIRM, if output is still buffered or
 *              the connection is in error; returns 0 otherwise.  Raises an
 *              error for port objects and for unfinished connections.
 */
int comm_check(obj)
object *obj;
{
    register user *usr;

    usr = users[UCHAR(obj->etabi)];
    if (usr->flags & CF_PORT) {
        error("Object is not a stream connection");
    }
    if (usr->flags & CF_CONNECTING) {
        error("Connection is not yet ready");
    }
    if (usr->optbufsz > 0 || usr->outbufsz > 0 || usr->extrabufsz > 0 ||
        (usr->flags & CF_ERROR)) {
        callbacks = TRUE;
        usr->flags |= CF_CONFIRM;
        return 1;
    }
    return 0;
}

/*
 * NAME:        comm->send()
 * DESCRIPTION: send a message to a user.  The message is placed in the
 *              output buffer if it fits (after telnet processing for telnet
 *              connections) and is then written by the next comm_flush().
 *              Whatever does not fit is copied to a freshly allocated
 *              overflow buffer and a write is attempted at once; overflow
 *              left from an earlier message is discarded first.
 *              Returns 1, with CF_CONFIRM set, when overflow data remains
 *              or the connection is in error; returns 0 otherwise.
 */
int comm_send(obj, str)
object *obj;
string *str;
{
    register user *usr;
    register char *p;
    register Uint len, size;

    usr = users[UCHAR(obj->etabi)];
    if (usr->flags & CF_PORT) {
        error("Object is not a stream connection");
    }
    if (usr->flags & CF_CONNECTING) {
        error("Connection is not yet ready");
    }

    if (usr->extrabuf) {
        /*
         * Discard overflow from previous message.
         */
        FREE(usr->extrabuf);
        usr->extrabuf = (char *) NULL;
        usr->extrabufp = (char *) NULL;
        usr->extrabufsz = 0;
        comm_wait(usr, 0);
    }

    p = str->text;
    len = str->len;
    if (len == 0) {
        return 0;
    }

    if (usr->flags & CF_TELNET) {
        size = telnet_fill(p, len, usr->outbuf + usr->outbufsz,
                           OUTBUF_SIZE - usr->outbufsz, &(usr->outbufsz));
        if (len == size) {
            flush = TRUE;
            return 0;
        }
        /* the unprocessed tail goes to the overflow buffer */
        len -= size;
        p += size;
    } else {
        if (usr->outbufsz + len <= OUTBUF_SIZE) {
            memcpy(usr->outbuf + usr->outbufsz, p, len);
            usr->outbufsz += len;
            flush = TRUE;
            return 0;
        }
    }

    if (len < 2048) {
        size = 2048;
    } else if (len < 4096) {
        size = 4096;
    } else {
        /*
         * Next multiple of 8192 above len.  All bits above the low 13 are
         * kept, so the buffer is large enough for any length.
         */
        size = (len & ~(Uint) 0x1FFF) + 8192;
    }
    m_static();
    usr->extrabuf = ALLOC(char, size);
    m_dynamic();
    usr->extrabufp = usr->extrabuf;
    usr->extrabufsz = len;
    memcpy(usr->extrabuf, p, len);

    comm_write(usr);
    if (usr->extrabufsz > 0 || (usr->flags & CF_ERROR)) {
        callbacks = TRUE;
        usr->flags |= CF_CONFIRM;
        return 1;
    }
    return 0;
}

/*
 * NAME:        comm->sendto()
 * DESCRIPTION: send a datagram to host:port through a datagram object;
 *              raises an error if the object is not a datagram object
 */
void comm_sendto(obj, data, host, port)
object *obj;
string *data;
char *host;
int port;
{
    register user *usr;

    usr = users[UCHAR(obj->etabi)];
    if (!(usr->flags & CF_DATAGRAM)) {
        error("Object is not a datagram object");
    }
    conn_sendto(usr->conn, data->text, data->len, host, port);
}

/*
 * NAME:        comm->echo()
 * DESCRIPTION: turn on/off input echoing for a user.  Only telnet
 *              connections are affected: IAC WILL ECHO is sent to turn
 *              echoing off and IAC WONT ECHO to turn it back on, and only
 *              when the requested state differs from the current one.
 */
void comm_echo(obj, echo)
object *obj;
int echo;
{
    register user *usr;
    char buf[3];

    usr = users[UCHAR(obj->etabi)];
    if (usr->flags & CF_CONNECTING) {
        error("Connection is not yet ready");
    }
    if ((usr->flags & CF_TELNET) && !echo != (usr->tflags & TF_FAKEECHO)) {
        buf[0] = IAC;
        buf[1] = (echo) ? WONT : WILL;
        buf[2] = TELOPT_ECHO;
        comm_telopt(usr, buf, 3);
        usr->tflags ^= TF_FAKEECHO;
    }
}

/*
 * NAME:        comm->add_goahead()
 * DESCRIPTION: add an IAC GA sequence to the current user's buffer.
*/ void comm_add_goahead() { register user *usr; register int n, i, size; for (n = 0, i = nusers; i > 0; n++) { usr = users[n]; if (usr != (user *) NULL) { --i; if ((usr->flags & CF_TELNET) && (usr->tflags & TF_GA) && (((size=usr->outbufsz) > 0 && usr->outbuf[size - 1] != LF && !usr->extrabuf) || usr->obj == this_user)) { /* * Append "go ahead" to indicate that the prompt * has been sent. Don't try too hard if it won't * fit. */ if (!(usr->flags & CF_WAITING) && (size >= OUTBUF_SIZE - 2 || usr->extrabuf)) { comm_write(usr); } if (size < OUTBUF_SIZE - 2 && !usr->extrabuf) { usr->outbuf[size++] = IAC; usr->outbuf[size++] = GA; usr->outbufsz += 2; } } } } } /* * NAME: comm->flush() * DESCRIPTION: flush output to all users */ void comm_flush() { register user *usr; register int i, n; if (!flush) return; flush = FALSE; for (n = 0, i = nusers; i > 0; n++) { usr = users[n]; if (usr != (user *) NULL) { --i; if (usr->outbufsz > 0 && !(usr->flags & CF_WAITING)) { comm_write(usr); } } } this_user = (object *) NULL; } /* * NAME: comm->confirm() * DESCRIPTION: call close() and message_done() when appropriate */ void comm_confirm(f) frame *f; { static int n; register user *usr; if (!callbacks) return; callbacks = FALSE; n = 0; while (ec_push((ec_ftn) errhandler)) { n++; } for ( ; n < limusers; n++) { usr = users[n]; if (usr == (user *) NULL) { continue; } if (usr->flags & CF_ERROR) { comm_del(f, &users[n], FALSE); endthread(); } else if ((usr->flags & (CF_CONFIRM|CF_WAITING)) == CF_CONFIRM) { usr->flags &= ~CF_CONFIRM; this_user = usr->obj; if (i_call(f, this_user, "message_done", 12, TRUE, 0)) { i_del_value(f->sp++); endthread(); comm_add_goahead(); comm_flush(); } } } ec_pop(); } /* * NAME: comm->user_call() * DESCRIPTION: call a receive function in a user object */ static void comm_user_call(f, usr, func, args, prompt) register frame *f; register user *usr; char *func; int args; bool prompt; { this_user = usr->obj; if (i_call(f, this_user, func, strlen(func), TRUE, args)) { 
if (this_user != (object *) NULL && f->sp->type == T_INT && f->sp->u.number != 0) { usr->flags |= CF_EAGER; if (!(usr->flags & CF_BLOCKED)) { neager++; } } i_del_value(f->sp++); if (prompt) comm_add_goahead(); comm_flush(); endthread(); } } /* * NAME: comm->receive() * DESCRIPTION: receive a message from a user */ void comm_receive(f, poll) register frame *f; int poll; { static int lastuser; connection *conn; object *o; register int n; register char *p, *q; this_user = (object *) NULL; n = conn_select(!poll && newlines == 0 && neager == 0); if (n > 0 || newlines > 0 || neager > 0) { static char ayt[] = { CR, LF, '[', 'Y', 'e', 's', ']', CR, LF }; static char mode_edit[] = { IAC, SB, TELOPT_LINEMODE, LM_MODE, MODE_EDIT, IAC, SE }; register user *usr; char buf[TMPBUF_SIZE]; Int size; register int cnt; register int eager; for (n = 0, cnt = waiting; cnt > 0; n++) { usr = users[n]; if (usr != (user *) NULL && (usr->flags & CF_WAITING)) { cnt--; if (conn_writable(usr->conn)) { comm_write(usr); callbacks |= (usr->flags & (CF_WAITING|CF_CONFIRM)) == CF_CONFIRM; } } } if (lastuser >= limusers) { lastuser = 0; } n = lastuser; for (;;) { n = (n + 1) % limusers; usr = users[n]; if (usr != (user *) NULL && !(usr->flags & CF_BLOCKED)) { if ((eager = usr->flags & CF_EAGER)) { neager--; usr->flags &= ~CF_EAGER; } if (usr->flags & CF_CONNECTING) { int r; r = conn_connected(usr->conn); if (r == 1) { usr->flags &= ~CF_CONNECTING; this_user = usr->obj; if (i_call(f, this_user, "open", 4, TRUE, 0)) { i_del_value(f->sp++); comm_add_goahead(); comm_flush(); endthread(); } } else if (r == -1) { comm_del(f, &users[n], FALSE); endthread(); } } else if (usr->flags & CF_DATAGRAM) { size = conn_recvfrom(usr->conn, buf, TMPBUF_SIZE); if (size < 0) { comm_del(f, &users[n], FALSE); endthread(); } else if (size > 0) { lastuser = n; (--f->sp)->type = T_STRING; str_ref(f->sp->u.string = str_new(buf, (long) size)); (--f->sp)->type = T_STRING; str_ref(f->sp->u.string = conn_ipnum(usr->conn)); 
(--f->sp)->type = T_INT; f->sp->u.number = conn_port(usr->conn); comm_user_call(f, usr, "receive_datagram", 3, FALSE); return; } else if (eager) { lastuser = n; this_user = usr->obj; comm_user_call(f, usr, "process_datagram", 0, FALSE); return; } } else if (usr->flags & CF_PORT) { if (nusers < maxusers) { conn = conn_accept(usr->conn); if (conn != (connection *) NULL) { if (ec_push((ec_ftn) NULL)) { conn_del(conn); error((char *) NULL); /* pass on error */ } (--f->sp)->type = T_STRING; str_ref(f->sp->u.string = conn_ipnum(conn)); (--f->sp)->type = T_INT; f->sp->u.number = conn_port(conn); /* Problem: during port->connection(), * nusers does not match number of allocated * connection structs. */ this_user = usr->obj; if (!i_call(f, this_user, "connection", 10, TRUE, 2)) { error("missing connection() function"); } if (f->sp->type != T_OBJECT) { endthread(); error("connection() did not return an object"); } o = &otable[f->sp->oindex]; f->sp++; endthread(); ec_pop(); comm_new(o, conn, usr->flags & ~CF_PORT); this_user = o; if (i_call(f, this_user, "open", 4, TRUE, 0)) { i_del_value(f->sp++); endthread(); } comm_add_goahead(); comm_flush(); } } } else if (usr->flags & CF_TELNET) { p = usr->inbuf + usr->inbufsz; size = conn_read(usr->conn, p, INBUF_SIZE - usr->inbufsz); if (size < 0) { comm_del(f, &users[n], FALSE); endthread(); continue; } else if (size > 0) { register int state, tflags, nls, seencr; char optbuf[20]; tflags = usr->tflags & ~TF_SEENCR; seencr = usr->tflags & TF_SEENCR; state = usr->state; nls = usr->newlines; q = p; while (size > 0) { switch (state) { case TS_DATA: switch (UCHAR(*p)) { case 0: seencr = 0; break; case IAC: state = TS_IAC; break; case BS: case 0x7f: if (q != usr->inbuf && q[-1] != LF) { --q; } seencr = 0; break; case CR: nls++; newlines++; *q++ = LF; seencr = TF_SEENCR; break; case LF: if (seencr) { seencr = 0; } else { nls++; newlines++; *q++ = *p; } break; default: *q++ = *p; seencr = 0; break; } break; case TS_IAC: switch (UCHAR(*p)) { 
case IAC: *q++ = *p; state = TS_DATA; break; case DO: state = TS_DO; break; case DONT: state = TS_DONT; break; case WILL: state = TS_WILL; break; case WONT: state = TS_WONT; break; case SB: state = TS_SB; break; case AYT: comm_telopt(usr, ayt, 9); state = TS_DATA; break; case EC: if (q != usr->inbuf && q[-1] != LF) { --q; } state = TS_DATA; break; case EL: while (q != usr->inbuf && q[-1] !=LF) { --q; } state = TS_DATA; break; case IP: case DM: q = usr->inbuf; newlines -= nls; nls = 0; state = TS_DATA; break; case BREAK: default: state = TS_DATA; break; } break; case TS_DO: optbuf[0] = IAC; optbuf[2] = UCHAR(*p); switch(UCHAR(*p)) { case TELOPT_SGA: if (tflags & TF_GA) { tflags &= ~TF_GA; optbuf[1] = WILL; comm_telopt(usr, optbuf, 3); } break; case TELOPT_ECHO: if (!(tflags & TF_FAKEECHO)) { optbuf[1] = WONT; comm_telopt(usr, optbuf, 3); } break; default: optbuf[1] = WONT; comm_telopt(usr, optbuf, 3); break; } state = TS_DATA; break; case TS_DONT: if (UCHAR(*p) == TELOPT_SGA && !(tflags & TF_GA)) { tflags |= TF_GA; optbuf[0] = IAC; optbuf[1] = WONT; optbuf[2] = TELOPT_SGA; comm_telopt(usr, optbuf, 3); } state = TS_DATA; break; case TS_WILL: optbuf[0] = IAC; optbuf[2] = UCHAR(*p); if (UCHAR(*p) == TELOPT_LINEMODE) { if (!(tflags & TF_LINEMODE)) { tflags |= TF_LINEMODE; optbuf[1] = DO; comm_telopt(usr, optbuf, 3); } comm_telopt(usr, mode_edit, 7); } else { optbuf[1] = DONT; comm_telopt(usr, optbuf, 3); } state = TS_DATA; break; case TS_WONT: if (UCHAR(*p) == TELOPT_LINEMODE) { tflags &= ~TF_LINEMODE; optbuf[0] = IAC; optbuf[1] = DONT; optbuf[2] = TELOPT_LINEMODE; comm_telopt(usr, optbuf, 3); } state = TS_DATA; break; case TS_SB: /* skip to the end */ if (UCHAR(*p) == IAC) { state = TS_SE; } break; case TS_SE: if (UCHAR(*p) == SE) { state = TS_DATA; } else if (UCHAR(*p) == IAC) { /* doubled IAC counts as data */ state = TS_SB; } break; } p++; --size; } usr->tflags = tflags | seencr; usr->state = state; usr->newlines = nls; usr->inbufsz = q - usr->inbuf; } if 
(usr->newlines > 0) { p = (char *) memchr(usr->inbuf, LF, usr->inbufsz); --newlines; --(usr->newlines); lastuser = n; size = n = p - usr->inbuf; if (size != 0) { memcpy(buf, usr->inbuf, size); } p++; /* skip \n */ n++; usr->inbufsz -= size + 1; for (q = usr->inbuf, n = usr->inbufsz; n > 0; --n) { *q++ = *p++; } (--f->sp)->type = T_STRING; str_ref(f->sp->u.string = str_new(buf, (long) size)); comm_user_call(f, usr, "receive_message", 1, TRUE); return; } else if (usr->inbufsz == INBUF_SIZE) { /* * input buffer full */ lastuser = n; (--f->sp)->type = T_STRING; str_ref(f->sp->u.string = str_new(usr->inbuf, (long) INBUF_SIZE)); usr->inbufsz = 0; comm_user_call(f, usr, "receive_message", 1, TRUE); return; } else if (eager) { lastuser = n; comm_user_call(f, usr, "process_message", 0, TRUE); return; } } else { /* * tcp (binary) connection */ size = conn_read(usr->conn, buf, TMPBUF_SIZE); if (size < 0) { comm_del(f, &users[n], FALSE); endthread(); } else if (size > 0) { lastuser = n; (--f->sp)->type = T_STRING; str_ref(f->sp->u.string = str_new(buf, (long) size)); comm_user_call(f, usr, "receive_message", 1, FALSE); return; } else if (eager) { lastuser = n; comm_user_call(f, usr, "process_message", 0, FALSE); return; } } } if (n == lastuser || lastuser >= limusers) { return; } } } } /* * NAME: comm->ip_number() * DESCRIPTION: return the ip number of a user (as a string) */ string *comm_ip_number(obj) object *obj; { register user *usr; usr = users[UCHAR(obj->etabi)]; if (usr->flags & CF_PORT) { return (string *)NULL; } else { return conn_ipnum(usr->conn); } } /* * NAME: comm->close() * DESCRIPTION: remove a user */ void comm_close(f, obj) frame *f; object *obj; { register user **usr; usr = &users[UCHAR(obj->etabi)]; if ((*usr)->outbufsz != 0) { /* * flush last bit of output */ conn_write((*usr)->conn, (*usr)->outbuf, (*usr)->outbufsz); } comm_del(f, usr, TRUE); } /* * NAME: comm->user() * DESCRIPTION: return the current user */ object *comm_user() { return this_user; } /* * 
NAME: comm->ports() * DESCRIPTION: return an array with all port objects */ array *comm_ports(data) dataspace *data; { array *a; register int i; register user **usr; register value *v; a = arr_new(data, (long) nports); v = a->elts; for (i = nports, usr = users; i > 0; usr++) { if (*usr != (user *) NULL && ((*usr)->flags & CF_PORT)) { v->type = T_OBJECT; v->oindex = (*usr)->obj->index; v->u.objcnt = (*usr)->obj->count; v++; --i; } } return a; } /* * NAME: comm->connections() * DESCRIPTION: return an array with all connection objects */ array *comm_connections(data) dataspace *data; { array *a; register int i; register user **usr; register value *v; a = arr_new(data, (long) (nusers - nports)); v = a->elts; for (i = nusers - nports, usr = users; i > 0; usr++) { if (*usr != (user *) NULL && !((*usr)->flags & CF_PORT)) { v->type = T_OBJECT; v->oindex = (*usr)->obj->index; v->u.objcnt = (*usr)->obj->count; v++; --i; } } return a; } /* * NAME: comm->block() * DESCRIPTION: suspend or release input from a connection or port */ void comm_block(obj, flag) object *obj; int flag; { register user *usr; usr = users[UCHAR(obj->etabi)]; if (flag) { if (!(usr->flags & CF_BLOCKED)) { conn_block(usr->conn, flag); usr->flags |= CF_BLOCKED; if (usr->flags & CF_EAGER) { neager--; } newlines -= usr->newlines; } } else { if (usr->flags & CF_BLOCKED) { conn_block(usr->conn, flag); usr->flags &= ~CF_BLOCKED; if (usr->flags & CF_EAGER) { neager++; } newlines += usr->newlines; } } } /* * NAME: comm->active() * DESCRIPTION: return TRUE if there is any pending comm activity */ bool comm_active() { return newlines != 0 || neager != 0 || conn_select(0) > 0; }