/* Copyright (c) 1993 Stephen F. White */
#include "cool.h"
#include "proto.h"
#include "netio.h"
#include "servers.h"
#include "execute.h"
#define ISREPLY(message) (!strcasecmp(message->str, "return") \
|| !strcasecmp(message->str, "raise") \
|| !strcasecmp(message->str, "halt"))
typedef struct Message Message;
struct Message {
int msgid;
int age;
int ticks;
Objid player;
Objid from;
Objid to;
String *message;
List *args;
Objid on;
Message *prev;
Message *next;
};
Event *event_head, *event_tail;
Message *msg_head, *msg_tail;
static Message *find_reply (Event * e);
static int find_lock (Object * o, const char *name, int msgid);
static int process_event (Event * e, Timeval cur_time, Timeval * timeout);
static int process_message_event (Event * e, Timeval cur_time,
Timeval * timeout);
static int process_sys_message_event (Event * e, Timeval cur_time,
Timeval * timeout);
static int process_lock_event (Event * e, Timeval cur_time,
Timeval * timeout);
static int process_timer_event (Event * e, Timeval cur_time,
Timeval * timeout);
static int next_msgid = 0;
static void event_free_assoc (Event * e);
static void msg_add (Message * m), msg_rm (Message * m);
static String *compose_var (String * st, Var v);
static String *compose_list (String * st, List * list);
static String *compose_map (String * st, Map * map);
static void do_parse(Playerid player, int msg, List *args);
/*
* process_queues()
*
* Process the message and event queues. Any incoming reply messages are
* checked against waiting events. If a matching event is found,
* the event is resumed and removed. If not, the message is discarded.
* Each incoming non-reply message calls call_method().
* The time value pointed to by 'timeout' is updated to reflect the next time
* that the server must call process_queues() (for timeouts, retrys, etc)
*/
void process_queues (Timeval cur_time, Timeval * timeout)
{
Event *e, *enext;
Message *m, *mnext;
Timeval newtime;
int events_handled, msgs_handled;
do { /* continue both loops while there are any messages */
msgs_handled = 0;
events_handled = 0;
for (e = event_head; e; e = enext) {
enext = e->next;
if (process_event (e, cur_time, timeout)) {
event_rm (e);
events_handled++;
} else {
newtime = timer_sub (e->timeout_at, cur_time);
*timeout = timercmp (&newtime, timeout, <)? newtime : *timeout;
}
}
for (m = msg_head; m; m = mnext) { /* process all messages */
mnext = m->next;
if (!ISREPLY (m->message)) {
if (!valid (m->to)) {
send_message (m->msgid, m->age, m->ticks, m->player, m->to,
m->from, sym_sys (RAISE), make_raise_args (E_OBJNF), 0, m->from);
} else if (!strcasecmp (m->message->str, "call_verb")) {
Error r = call_verb (m->msgid, m->age, m->ticks, m->player,
m->from, m->to, m->args);
if (r == E_VERBNF) {
send_message (m->msgid, m->age, m->ticks, m->player,
m->to, m->from, sym_sys (RETURN), list_dup (one), 0, m->from);
} else if (r != E_NONE) {
send_message (m->msgid, m->age, m->ticks, m->player,
m->to, m->from, sym_sys (RAISE), make_raise_args (r), 0,
m->from);
}
} else if (call_method (m->msgid, m->age, m->ticks, m->player,
m->from, m->to, m->message->str, m->args, m->on) != E_NONE) {
send_message (m->msgid, m->age, m->ticks, m->player, m->to,
m->from, sym_sys (RAISE),
make_raise_args (E_METHODNF), 0, m->from);
}
msg_rm (m);
msgs_handled++;
}
}
} while (msgs_handled || events_handled);
/*
* remove orphaned messages
*/
for (m = msg_head; m; m = mnext) { /* process all messages */
mnext = m->next;
if (ISREPLY (m->message)) {
msg_rm (m);
}
}
cache_reset ();
if (timeout->tv_sec < 0) {
timeout->tv_sec = 0;
}
}
static int process_event (Event * e, Timeval cur_time, Timeval * timeout)
{
List *args;
switch (e->blocked_on) {
case BL_MESSAGE:
return process_message_event (e, cur_time, timeout);
case BL_LOCK:
return process_lock_event (e, cur_time, timeout);
case BL_TIMER:
return process_timer_event (e, cur_time, timeout);
case BL_SYS_MESSAGE:
return process_sys_message_event (e, cur_time, timeout);
case BL_DEAD:
args = make_raise_args (E_TERM);
resume_method_raise (e, args); /****/
list_free (args);
return 1;
}
return 0;
}
static int
process_message_event (Event * e, Timeval cur_time, Timeval * timeout)
{
Message *m = find_reply (e);
Timeval newtime;
if (m) {
e->ticks = m->ticks;
if (!strcasecmp (m->message->str, "return")) {
resume_method_return (e, m->args->el[0]);
} else if (!strcasecmp (m->message->str, "raise")) {
resume_method_raise (e, m->args);
} else if (!strcasecmp (m->message->str, "halt")) {
resume_method_halt (e);
}
msg_rm (m);
return 1;
} else if (e->blocked_objid.server) { /* if blocked on remote msg */
if (timercmp (&cur_time, &e->timeout_at, >)) {
List *args = make_raise_args (E_TIMEOUT);
resume_method_raise (e, args); /****/
list_free (args);
return 1;
} else if (timercmp (&cur_time, &e->retry_at, >)) {
#ifdef EXP_BACKOFF
e->retry_interval *= 2;
#endif
e->retry_at = timer_addmsec (cur_time, e->retry_interval);
if (yo (e->blocked_objid.server, e->msg->str)) {
List *args = make_raise_args (E_TIMEOUT);
resume_method_raise (e, args); /****/
list_free (args);
return 1;
}
}
newtime = timer_sub (e->retry_at, cur_time);
*timeout = timercmp (&newtime, timeout, <)? newtime : *timeout;
return 0;
} else {
return 0;
}
}
static int
process_sys_message_event (Event * e, Timeval cur_time, Timeval * timeout)
{
Message *m = find_reply (e);
if (m) {
switch (e->msgid) {
case PARSE:
if (!strcasecmp(m->message->str, "raise")) {
tell(m->player.id, m->args->el[1].v.str->str, 1 );
}
break;
}
msg_rm (m);
return 1;
} else if (timercmp (&cur_time, &e->timeout_at, >)) {
tell(e->player.id, "Parse timed out; programmer error.", 1);
return 1;
} else {
return 0;
}
}
static int process_lock_event (Event * e, Timeval cur_time, Timeval * timeout)
{
if (!find_lock (retrieve (e->this), e->lock->str, e->msgid)) {
resume_method (e);
return 1;
} else if (timercmp (&cur_time, &e->timeout_at, >)) {
List *args = make_raise_args (E_TIMEOUT);
resume_method_raise (e, args); /****/
list_free (args);
return 1;
} else {
return 0;
}
}
static int
process_timer_event (Event * e, Timeval cur_time, Timeval * timeout)
{
if (timercmp (&cur_time, &e->timeout_at, >)) {
resume_method (e);
return 1;
} else {
return 0;
}
}
static Message *find_reply (Event * e)
{
Message *m;
for (m = msg_head; m; m = m->next) {
if (m->msgid == e->blocked_msgid && ISREPLY (m->message)) {
return m;
}
}
return 0;
}
static int find_lock (Object * o, const char *name, int msgid)
{
Lock *l;
for (l = o->locks; l; l = l->next) {
if (!strcasecmp (l->name->str, name)) {
if (l->added_by == msgid) { /* our lock comes first */
return 0;
} else { /* some other lock comes first */
return 1;
}
}
}
return 0;
}
/*
* send_message() - da big one
*
* Send a message from one object to another. If the object is local,
* the message is queued directly. If the object is remote, the message
* is converted to ASCII and sent out the network port. In either case,
* the "msg" and "args" arguments are eaten (ie., they must be allocated
* prior to the call). If msgid is negative, a new msgid is generated
* for this message (ie., it's an original message). Otherwise, msgid
* is used (it's a response). If the server requested is down, E_SERVERDN
* is returned. If the age exceeds max_age, E_MAXREC is returned.
*/
Error
send_message (int msgid, int age, int ticks, Objid player, Objid from,
Objid to, String * msg, List * args, Event * e, Objid on)
{
String *message;
Message *m;
if (msgid < 0) {
if (age >= max_age && max_age > 0) {
string_free(msg);
list_free(args);
return E_MAXREC;
}
msgid = next_msgid++;
}
/*
* if it's a local message, queue it directly
*/
if (!to.server) {
m = MALLOC (Message, 1);
m->from = from;
m->to = to;
m->player = player;
m->message = msg;
m->args = args;
m->msgid = msgid;
m->age = age;
m->ticks = ticks;
m->on = on;
msg_add (m);
} else {
message = string_new (0);
message = string_catnum (message, msgid);
message = string_catc (message, ' ');
message = string_catnum (message, age);
message = string_catc (message, ' ');
message = string_catnum (message, ticks);
message = string_catc (message, ' ');
message = string_catobj (message, player, 1);
message = string_catc (message, ' ');
message = string_catobj (message, from, 1);
message = string_catc (message, ' ');
message = string_catobj (message, to, 1);
message = string_cat (message, " \"");
message = string_backslash (message, msg->str);
message = string_cat (message, "\" ");
message = compose_list (message, args);
message = string_catc (message, '\n');
string_free (msg);
list_free (args);
#ifdef LOG_YO
writelog ();
fprintf (stderr, "SENT: %s", message->str);
#endif /* LOG_YO */
if (yo (to.server, message->str)) {
string_free (message);
return E_SERVERDN;
} else if (e) {
e->msg = message;
} else {
string_free (message);
}
}
if (e) {
e->blocked_on = BL_MESSAGE;
e->blocked_msgid = msgid;
e->blocked_objid = to;
event_add (e);
}
return E_NONE;
}
/*
* msg_add()
*
* Add a message to the message queue
*/
static void msg_add (Message * m)
{
m->next = m->prev = 0;
if (!msg_head) {
msg_head = msg_tail = m;
} else {
msg_tail->next = m;
m->prev = msg_tail;
msg_tail = m;
}
}
/*
* event_add()
*
* Add an event to the event queue.
*/
void event_add (Event * e)
{
e->next = e->prev = 0;
if (!event_head) {
event_head = event_tail = e;
} else {
event_tail->next = e;
e->prev = event_tail;
event_tail = e;
}
}
/*
* event_rm()
*
* Removes an event from the event queue.
* Frees the space used by the event.
*/
void event_rm (Event * e)
{
if (e->next) {
e->next->prev = e->prev;
} else {
event_tail = e->prev;
}
if (e->prev) {
e->prev->next = e->next;
} else {
event_head = e->next;
}
event_free_assoc (e);
FREE (e);
}
/*
* event_free_assoc()
*
* Frees the space associated with an Event. Does not free the event itself.
*/
static void event_free_assoc (Event * e)
{
if (e->blocked_on == BL_LOCK) {
string_free (e->lock);
} else if (e->blocked_on == BL_MESSAGE && e->msg) {
string_free (e->msg);
}
}
/*
* msg_rm()
*
* Remove a message from the message queue.
* Frees the space used by the message structure, its message, and its
* arguments.
*/
static void msg_rm (Message * m)
{
if (m->next) {
m->next->prev = m->prev;
} else {
msg_tail = m->prev;
}
if (m->prev) {
m->prev->next = m->next;
} else {
msg_head = m->next;
}
list_free (m->args);
string_free (m->message);
FREE (m);
}
Message *parse_message (const char *msg);
/*
* receive_message()
*
* Receive and parse a message from a remote server. If the message is
* parseable, it is added to the queue. Otherwise, an error is written
* to the log, and the message is discarded.
*/
int receive_message (Serverid server, const char *msg)
{
Message *m;
if ((m = parse_message (msg))) {
#ifdef LOG_YO
writelog ();
fprintf (stderr, "RECD: %s", msg);
#endif /* LOG_YO */
if (!ISREPLY (m->message)
&& serv_discardmsg (m->from.server, m->msgid)) {
list_free (m->args);
string_free (m->message);
FREE (m);
} else {
msg_add (m);
}
return 0;
} else {
writelog ();
fprintf (stderr, "UNPARSEABLE msg from server #%d: %s\n", server, msg);
return 1;
}
}
/*
* parse()
*
* Receive a command from a player, and queue up a "parse" message
* from SYS_OBJ to the player object. The "return" to this message
* is ignored. This function is called by the interface when
* input comes from a player's socket.
*
*/
void parse(Playerid player, const char *command, SOCKET fd)
{
List *args = list_new(2);
args->el[0].type = STR;
args->el[0].v.str = string_cpy(command);
args->el[1].type = NUM;
args->el[1].v.num = fd;
do_parse(player, PARSE, args);
}
void new_connect(Playerid player, SOCKET fd)
{
List *args = list_new(1);
args->el[0].type = NUM;
args->el[0].v.num = fd;
do_parse(player, SYM_CONNECT, args);
}
static void do_parse(Playerid player, int msg, List *args)
{
Message *m = MALLOC (Message, 1);
Event *e = MALLOC (Event, 1);
Timeval cur_time;
gettimeofday (&cur_time, 0);
m->msgid = e->blocked_msgid = next_msgid++;
m->age = m->ticks = 0;
m->player.id = player;
m->player.server = 0;
m->from.id = player;
m->from.server = 0;
m->to.id = player;
m->to.server = 0;
m->message = sym_sys(msg);
m->args = args;
m->on.id = player;
m->on.server = 0;
e->msg = 0;
gettimeofday (&e->timeout_at, 0);
e->m = 0;
e->timeout_at.tv_sec += MSG_TIMEOUT * 2;
e->blocked_on = BL_SYS_MESSAGE;
e->msgid = PARSE; /* system message 'PARSE' */
msg_add(m); event_add(e);
}
/*
* These routines are used to parse incoming messages from remote servers.
*/
const char *parse_num (const char *s, int *num);
const char *parse_obj (const char *s, Objid * obj);
const char *parse_string (const char *s, String ** str);
const char *parse_id (const char *s, char **id);
const char *parse_list (const char *s, List ** list);
const char *parse_err (const char *s, Error * err);
const char *parse_var (const char *s, Var * v);
const char *parse_map (const char *s, Map ** map);
Message *parse_message (const char *msg)
{
Message *m = MALLOC (Message, 1);
m->args = 0;
m->message = 0;
if (!(msg = parse_num (msg, &m->msgid))
|| !(msg = parse_num (msg, &m->age))
|| !(msg = parse_num (msg, &m->ticks))
|| !(msg = parse_obj (msg, &m->player))
|| !(msg = parse_obj (msg, &m->from))
|| !(msg = parse_obj (msg, &m->to))
|| !(msg = parse_string (msg, &m->message))
|| !(msg = parse_list (msg, &m->args))) {
if (m->message) {
FREE (m->message);
}
if (m->args) {
list_free (m->args);
}
FREE (m);
return 0;
}
m->on = m->to; /* look for methods on 'to' object */
return m;
}
const char *parse_num (const char *msg, int *num)
{
int neg = 0;
*num = 0;
while (isspace ((int)*msg))
msg++;
if (*msg == '-') {
neg = 1;
msg++;
}
if (!isdigit ((int)*msg)) {
return 0;
}
do {
*num = *num * 10 + *msg++ - '0';
} while (isdigit ((int)*msg));
*num = neg ? -*num : *num;
return msg;
}
const char *parse_obj (const char *msg, Objid * obj)
{
char *serv;
while (isspace ((int)*msg))
msg++;
if (*msg++ != '#') {
return 0;
}
if (!(msg = parse_num (msg, &obj->id))) {
return 0;
}
while (isspace ((int)*msg))
msg++;
if (*msg != '@') {
obj->server = 0;
return msg;
}
msg++;
if (!(msg = parse_id (msg, &serv))) {
return 0;
}
if ((obj->server = serv_name2id (serv)) < 0) {
FREE (serv);
return 0;
}
FREE (serv);
return msg;
}
const char *parse_id (const char *msg, char **id)
{
int len = 0;
const char *s;
while (isspace ((int)*msg))
msg++;
if (*msg && !isalpha ((int)*msg) && *msg != '_') {
return 0;
}
s = msg;
while (*msg && (isalnum ((int)*msg) || *msg == '_')) {
len++;
msg++;
}
*id = str_ndup (s, len);
return msg;
}
const char *parse_string (const char *s, String ** r)
{
int n;
const char *start;
while (isspace ((int)*s))
s++;
if (*s++ != '"') {
return 0;
}
for (start = s, n = 0; *s != '"'; s++, n++) {
if (*s == '\\') { /* escaped char (eg., '\n') */
s++;
} else if (!*s) { /* non-terminated string */
return 0;
}
}
*r = string_new (n);
for (s = start; n; s++, n--) {
if (*s == '\\') {
switch (*++s) {
case 'n':
*r = string_cat (*r, "\r\n");
break;
case 't':
*r = string_catc (*r, '\t');
break;
default:
*r = string_catc (*r, *s);
break;
}
} else {
*r = string_catc (*r, *s);
}
}
return ++s;
}
/*
* compose_list()
*
* Converts a list to YO ASCII format, in the form of a String.
* The contents are surrounded by braces and separated by
* spaces. The first value indicates the length of the list. This
* routine should only be used by functions inteded to send YO
* packets onto the net (ie., not for COOL). The ASCII
* representation is appended to the buffer. (ie., the buffer is
* *not* initialized.)
*/
static String *compose_var (String * st, Var v)
{
switch (v.type) {
case LIST:
st = compose_list (st, v.v.list);
break;
case MAP:
st = compose_map (st, v.v.map);
break;
case STR:
st = string_catc (st, '"');
st = string_backslash (st, v.v.str->str);
st = string_catc (st, '"');
break;
case NUM:
st = string_catnum (st, v.v.num);
break;
case OBJ:
st = string_catobj (st, v.v.obj, 1);
break;
case ERR:
st = string_cat (st, err_id2name (v.v.err));
break;
case PC: /* shouldn't happen */
break;
}
return st;
}
static String *compose_list (String * st, List * list)
{
int i;
st = string_cat (st, "{ ");
st = string_catnum (st, list->len);
st = string_catc (st, ' ');
for (i = 0; i < list->len; i++) {
st = compose_var (st, list->el[i]);
st = string_catc (st, ' ');
}
st = string_catc (st, '}');
return st;
}
static String *compose_map (String * st, Map * map)
{
int i;
MapPair *pair;
st = string_cat (st, "[ ");
st = string_catnum (st, map->num);
st = string_catc (st, ' ');
for (i = 0; i < map->size; i++) {
for (pair = map->table[i]; pair; pair = pair->next) {
st = compose_var (st, pair->from);
st = string_catc (st, ' ');
st = compose_var (st, pair->to);
st = string_catc (st, ' ');
}
}
st = string_cat (st, "]");
return st;
}
const char *parse_map (const char *s, Map ** map)
{
int i, num;
Var from, to;
while (isspace ((int)*s))
s++;
if (*s++ != '[') {
return 0;
} else if (!(s = parse_num (s, &num))) {
return 0;
}
*map = map_new (num);
for (i = 0; i < num; i++) {
if (!(s = parse_var (s, &from))) {
return 0;
} else if (!(s = parse_var (s, &to))) {
return 0;
}
*map = map_add (*map, from, to);
}
while (isspace ((int)*s))
s++;
if (*s++ != ']') {
return 0;
}
return s;
}
const char *parse_list (const char *s, List ** list)
{
int i, len;
while (isspace ((int)*s))
s++;
if (*s++ != '{') {
return 0;
}
if (!(s = parse_num (s, &len))) {
return 0;
}
*list = list_new (len);
for (i = 0; i < len; i++) {
if (!(s = parse_var (s, &((*list)->el[i])))) {
(*list)->len = i; /* abort free_list() before current value */
return 0;
}
}
while (isspace ((int)*s))
s++;
if (*s++ != '}') {
return 0;
}
return s;
}
const char *parse_err (const char *s, Error * err)
{
const char *end;
char *name;
while (isspace ((int)*s))
s++;
if (*s != 'E') {
return 0;
}
if (!(end = index (s, ' '))) {
end = index (s, '\0');
}
name = str_ndup (s, end - s);
*err = err_name2id (name);
FREE (name);
return end;
}
const char *parse_var (const char *s, Var * v)
{
while (isspace ((int)*s))
s++;
if (*s == '"') {
v->type = STR;
return parse_string (s, &v->v.str);
} else if (isdigit ((int)*s) || *s == '-') {
v->type = NUM;
return parse_num (s, &v->v.num);
} else if (*s == '#') {
v->type = OBJ;
return parse_obj (s, &v->v.obj);
} else if (*s == '{') {
v->type = LIST;
return parse_list (s, &v->v.list);
} else if (*s == '[') {
v->type = MAP;
return parse_map (s, &v->v.map);
} else if (*s == 'E') {
v->type = ERR;
return parse_err (s, &v->v.err);
} else {
return 0;
}
}
List *ps (void)
{
List *l;
Var *el;
Event *e;
int len;
len = 0;
for (e = event_head; e; e = e->next) {
if (e->blocked_on == BL_SYS_MESSAGE)
continue;
len++;
} /* for */
l = list_new (len);
len = 0;
for (e = event_head; e; e = e->next) {
if (e->blocked_on == BL_SYS_MESSAGE)
continue;
l->el[len].type = LIST;
l->el[len].v.list = list_new (11);
el = l->el[len].v.list->el;
el[0].type = NUM; el[0].v.num = e->msgid;
el[1].type = NUM; el[1].v.num = e->age;
el[2].type = NUM; el[2].v.num = e->ticks;
el[3].type = OBJ; el[3].v.obj = e->player;
el[4].type = OBJ; el[4].v.obj = e->this;
el[5].type = OBJ; el[5].v.obj = e->on->id;
el[6].type = OBJ; el[6].v.obj = e->caller;
el[7].type = LIST; el[7].v.list = list_dup(e->args);
el[8].type = STR;
el[8].v.str = string_dup (sym_get (e->on, e->m->name));
el[9].type = NUM; el[9].v.num = e->blocked_on;
el[10].type = NUM; el[10].v.num = e->timeout_at.tv_sec;
len++;
} /* for */
return l;
} /* ps() */
int cmkill (int pid, Objid who)
{
Event *e;
for (e = event_head; e; e = e->next) {
if (e->msgid == pid && e->blocked_on != BL_SYS_MESSAGE) {
if ((e->player.id == who.id && e->player.server == who.server)
|| is_wizard (who)) {
event_free_assoc (e);
e->blocked_on = BL_DEAD;
e->timeout_at.tv_sec = 0;
return 0;
} else {
return -2;
} /* if */
} /* if */
} /* for */
return -1;
} /* cmkill() */
void cmkill_all(void)
{
Event *e;
struct timeval dummy = {0, 0};
for (e = event_head; e; e = e->next) {
if (e->blocked_on != BL_SYS_MESSAGE) {
event_free_assoc(e);
e->blocked_on = BL_DEAD;
e->timeout_at.tv_sec = 0;
}
}
process_queues(dummy, &dummy);
}