/*
// Full copyright information is available in the file ../doc/CREDITS
*/
#define _io_
#include "defs.h"
#include <ctype.h>
#include <string.h>
#include "cdc_pcode.h"
#include "util.h"
#include "cache.h"
INTERNAL void connection_read(Conn *conn);
INTERNAL void connection_write(Conn *conn);
INTERNAL Conn *connection_add(Int fd, Long objnum);
INTERNAL void connection_discard(Conn *conn);
INTERNAL void pend_discard(pending_t *pend);
INTERNAL void server_discard(server_t *serv);
INTERNAL Conn * connections; /* List of client connections. */
INTERNAL server_t * servers; /* List of server sockets. */
INTERNAL pending_t * pendings; /* List of pending connections. */
/*
// --------------------------------------------------------------------
// Flush defunct connections and files.
//
// Notify the connection object of any dead connections and delete them.
*/
void flush_defunct(void) {
Conn **connp, *conn;
server_t **servp, *serv;
pending_t **pendp, *pend;
connp = &connections;
while (*connp) {
conn = *connp;
if (conn->flags.dead && conn->write_buf->len == 0) {
*connp = conn->next;
connection_discard(conn);
} else {
connp = &conn->next;
}
}
servp = &servers;
while (*servp) {
serv = *servp;
if (serv->dead) {
*servp = serv->next;
server_discard(serv);
} else {
servp = &serv->next;
}
}
pendp = &pendings;
while (*pendp) {
pend = *pendp;
if (pend->finished) {
*pendp = pend->next;
pend_discard(pend);
} else {
pendp = &pend->next;
}
}
}
/*
// --------------------------------------------------------------------
// Call io_event_wait() to wait for something to happen. The return
// value is nonzero if an I/O event occurred. If there is a new
// connection, then *fd will be set to the descriptor of the new
// connection; otherwise, it is set to -1.
*/
void handle_io_event_wait(Int seconds) {
io_event_wait(seconds, connections, servers, pendings);
}
/*
// --------------------------------------------------------------------
*/
void handle_connection_input(void) {
Conn * conn;
for (conn = connections; conn; conn = conn->next) {
if (conn->flags.readable && !conn->flags.dead)
connection_read(conn);
}
}
/*
// --------------------------------------------------------------------
*/
void handle_connection_output(void) {
Conn * conn;
for (conn = connections; conn; conn = conn->next) {
if (conn->flags.writable)
connection_write(conn);
}
}
/*
// --------------------------------------------------------------------
*/
void handle_new_and_pending_connections(void) {
Conn *conn;
server_t *serv;
pending_t *pend;
cStr *str;
cData d1, d2, d3;
/* Look for new connections on the server sockets. */
for (serv = servers; serv; serv = serv->next) {
if (serv->client_socket == -1)
continue;
conn = connection_add(serv->client_socket, serv->objnum);
serv->client_socket = -1;
str = string_from_chars(serv->client_addr, strlen(serv->client_addr));
d1.type = STRING;
d1.u.str = str;
d2.type = STRING;
d2.u.str = serv->addr; /* dont dup, task() will */
d3.type = INTEGER;
d3.u.val = serv->client_port;
task(conn->objnum, connect_id, 3, &d1, &d2, &d3);
string_discard(str);
}
/* Look for pending connections succeeding or failing. */
for (pend = pendings; pend; pend = pend->next) {
if (pend->finished) {
if (pend->error == NOT_AN_IDENT) {
conn = connection_add(pend->fd, pend->objnum);
d1.type = INTEGER;
d1.u.val = pend->task_id;
task(conn->objnum, connect_id, 1, &d1);
} else {
SOCK_CLOSE(pend->fd);
d1.type = INTEGER;
d1.u.val = pend->task_id;
d2.type = T_ERROR;
d2.u.error = pend->error;
task(pend->objnum, failed_id, 2, &d1, &d2);
}
}
}
}
/*
// --------------------------------------------------------------------
// This will attempt to find a connection associated with an object.
// For faster hunting we will check obj->conn, which may be set to NULL
// even though a connection may exist (the pointer is only valid while
// the object is in the cache, and is reset to NULL when it is read from
// disk). If obj->conn is NULL and a connection exists, we set
// obj->conn to the connection, so we will know it next time.
//
// Note: if more than one connection is associated with an object, this
// will only return the most recent connection. Hopefully more than one
// connection will not get associated, we need to hack the server to
// blast old connections when new ones are associated, or to deny new
// ones. Either way the db should be paying close attention to what
// is occuring.
//
// Once new connections bump old connections, this problem will go
// away.
*/
Conn * find_connection(Obj * obj) {
/* obj->conn is only for faster lookups */
if (obj->conn == NULL) {
Conn * conn;
/* lets try and find the connection */
for (conn = connections; conn; conn = conn->next) {
if (conn->objnum == obj->objnum && !conn->flags.dead) {
obj->conn = conn;
break;
}
}
}
/* it could still be NULL */
return obj->conn;
}
/*
// --------------------------------------------------------------------
// returning the connection is what we are using as a status report, if
// there is no connection, it will be NULL, and we will know.
*/
Conn * tell(Obj * obj, cBuf * buf) {
Conn * conn = find_connection(obj);
if (conn != NULL)
conn->write_buf = buffer_append(conn->write_buf, buf);
return conn;
}
/*
// --------------------------------------------------------------------
*/
Int boot(Obj * obj) {
Conn * conn = find_connection(obj);
if (conn != NULL) {
conn->flags.dead = 1;
return 1;
}
return 0;
}
/*
// --------------------------------------------------------------------
*/
Int tcp_server(Int port, char * ipaddr, Long objnum) {
server_t * cnew;
SOCKET server_socket;
/* Check if a server already exists for this port and address */
for (cnew = servers; cnew; cnew = cnew->next) {
if (cnew->port == port) {
if (ipaddr && strcmp(string_chars(cnew->addr), ipaddr))
continue;
cnew->objnum = objnum;
cnew->dead = 0;
return TRUE;
}
}
/* Get a server socket for the port. */
server_socket = get_tcp_socket(port, ipaddr);
if (server_socket == SOCKET_ERROR)
return FALSE;
cnew = EMALLOC(server_t, 1);
cnew->server_socket = server_socket;
cnew->client_socket = -1;
cnew->port = port;
if (ipaddr)
cnew->addr = string_from_chars(ipaddr, strlen(ipaddr));
else
cnew->addr = string_new(0);
cnew->objnum = objnum;
cnew->dead = 0;
cnew->next = servers;
servers = cnew;
return TRUE;
}
Int udp_server(Int port, char * ipaddr, Long objnum) {
SOCKET server_socket;
/* Get a server socket for the port. */
server_socket = get_udp_socket(port, ipaddr);
if (server_socket == SOCKET_ERROR)
return FALSE;
connection_add(server_socket, objnum);
return TRUE;
}
/*
// --------------------------------------------------------------------
*/
Int remove_server(Int port) {
server_t **servp;
for (servp = &servers; *servp; servp = &((*servp)->next)) {
if ((*servp)->port == port) {
(*servp)->dead = 1;
return 1;
}
}
return 0;
}
/*
// --------------------------------------------------------------------
*/
INTERNAL void connection_read(Conn *conn) {
unsigned char temp[BIGBUF];
Int len;
cBuf *buf;
cData d;
len = SOCK_READ(conn->fd, (char *) temp, BIGBUF);
if (len == SOCKET_ERROR && GETERR() == ERR_INTR)
return;
conn->flags.readable = 0;
if (len <= 0) {
/* The connection closed. */
conn->flags.dead = 1;
return;
}
/* We successfully read some data. Handle it. */
buf = buffer_new(len);
MEMCPY(buf->s, temp, len);
d.type = BUFFER;
d.u.buffer = buf;
task(conn->objnum, parse_id, 1, &d);
buffer_discard(buf);
}
/*
// --------------------------------------------------------------------
*/
INTERNAL void connection_write(Conn *conn) {
cBuf *buf = conn->write_buf;
Int r;
r = SOCK_WRITE(conn->fd, buf->s, buf->len);
conn->flags.writable = 0;
/* We lost the connection. */
if (r == SOCKET_ERROR) {
conn->flags.dead = 1;
buf = buffer_resize(buf, 0);
} else {
MEMMOVE(buf->s, buf->s + r, buf->len - r);
buf = buffer_resize(buf, buf->len - r);
}
conn->write_buf = buf;
}
/*
// --------------------------------------------------------------------
*/
INTERNAL Conn * connection_add(Int fd, Long objnum) {
Conn * conn;
/* clear old connections to this objnum */
for (conn = connections; conn; conn = conn->next) {
if (conn->objnum == objnum && !conn->flags.dead)
conn->flags.dead = 1;
}
/* initialize new connection */
conn = EMALLOC(Conn, 1);
conn->fd = fd;
conn->write_buf = buffer_new(0);
conn->objnum = objnum;
conn->flags.readable = 0;
conn->flags.writable = 0;
conn->flags.dead = 0;
conn->next = connections;
connections = conn;
return conn;
}
/*
// --------------------------------------------------------------------
*/
INTERNAL void connection_discard(Conn *conn) {
Obj * obj;
/* Notify connection object that the connection is gone. */
task(conn->objnum, disconnect_id, 0);
/* reset the conn variable on the object */
obj = cache_retrieve(conn->objnum);
if (obj != NULL) {
obj->conn = NULL;
cache_discard(obj);
}
/* Free the data associated with the connection. */
SOCK_CLOSE(conn->fd);
buffer_discard(conn->write_buf);
efree(conn);
}
/*
// --------------------------------------------------------------------
*/
INTERNAL void pend_discard(pending_t *pend) {
efree(pend);
}
/*
// --------------------------------------------------------------------
*/
INTERNAL void server_discard(server_t *serv) {
SOCK_CLOSE(serv->server_socket);
string_discard(serv->addr);
}
/*
// --------------------------------------------------------------------
*/
Long make_connection(char *addr, Int port, cObjnum receiver) {
pending_t *cnew;
SOCKET socket;
Long result;
result = non_blocking_connect(addr, port, &socket);
if (result == address_id || result == socket_id)
return result;
cnew = TMALLOC(pending_t, 1);
cnew->fd = socket;
cnew->task_id = task_id;
cnew->objnum = receiver;
cnew->finished = 0;
cnew->error = result;
cnew->next = pendings;
pendings = cnew;
return NOT_AN_IDENT;
}
Long make_udp_connection(char *addr, Int port, cObjnum receiver) {
pending_t *cnew;
SOCKET socket;
Long result;
result = udp_connect(addr, port, &socket);
if (result == address_id || result == socket_id)
return result;
cnew = TMALLOC(pending_t, 1);
cnew->fd = socket;
cnew->task_id = task_id;
cnew->objnum = receiver;
cnew->finished = 0;
cnew->error = result;
cnew->next = pendings;
pendings = cnew;
return NOT_AN_IDENT;
}
/*
// --------------------------------------------------------------------
// Write out everything in connections' write buffers. Called by main()
// before exiting; does not modify the write buffers to reflect writing.
*/
void flush_output(void) {
Conn * conn;
unsigned char * s;
Int len, r;
/* do connections */
for (conn = connections; conn; conn = conn->next) {
s = conn->write_buf->s;
len = conn->write_buf->len;
while (len) {
r = SOCK_WRITE(conn->fd, s, len);
if (r == SOCKET_ERROR)
break;
len -= r;
s += r;
}
}
}
#undef _io_