Short: Threads for MCCP
From: Gawain
Date: 2002-08-10
Type: Patch
State: New
See also: p-011012
Note: The actual threads implementation differs from what was used here.
diff -Naur old/comm.c new/comm.c
--- old/comm.c Fri Aug 9 12:06:50 2002
+++ new/comm.c Fri Aug 9 08:46:29 2002
@@ -433,6 +433,10 @@
static void mccp_telnet_neg(int);
#endif
+#ifdef USE_PTHREAD
+static void *writer_thread(void *arg);
+#endif
+
static void free_input_to(input_to_t *);
static void telnet_neg(interactive_t *);
static void send_will(int);
@@ -456,6 +460,41 @@
#endif /* ERQ_DEMON */
+#ifdef USE_PTHREAD
+#define PTHREAD_WRITE_MAX_SIZE 100000
+static int thread_socket_write(SOCKET_T socket, char *msg, size_t size, interactive_t *ip)
+{
+ struct write_buffer_s *b;
+ int i=0;
+
+ xallocate(b, sizeof(struct write_buffer_s), "thread_socket_write()");
+ xallocate(b->buffer, size, "thread_socket_write()");
+ b->length = size;
+ b->next = NULL;
+
+ for(i=0; i<size; i++) b->buffer[i]=msg[i];
+
+ pthread_mutex_lock(&ip->write_mutex);
+
+ if(ip->write_first)
+ ip->write_last = ip->write_last->next = b;
+ else
+ ip->write_first = ip->write_last = b;
+ ip->write_size += size;
+
+ while(ip->write_size>=PTHREAD_WRITE_MAX_SIZE) {
+ struct write_buffer_s *tmp = ip->write_first;
+ ip->write_first = tmp->next;
+ ip->write_size -= tmp->length;
+ xfree(tmp->buffer);
+ xfree(tmp);
+ }
+
+ pthread_mutex_unlock(&ip->write_mutex);
+ pthread_cond_signal(&ip->write_cond);
+ return size;
+}
+#endif
#ifdef USE_IPV6
@@ -1067,7 +1093,9 @@
perror("listen");
exit(1);
}
+#ifndef USE_PTHREAD
set_socket_nonblocking(sos[i]);
+#endif
set_close_on_exec(sos[i]);
if (socket_number(sos[i]) >= min_nfds)
@@ -1456,10 +1484,14 @@
for (retries = 6;;) {
#ifdef USE_MCCP
- if (!ip->out_compress) /* here we choose the correct buffer */
+ if (!ip->out_compress) /* here we choose the correct buffer */
{
#endif
+#ifdef USE_PTHREAD
+ if ((n = (int)thread_socket_write(ip->socket, ip->message_buf, (size_t)chunk, ip)) != -1)
+#else
if ((n = (int)socket_write(ip->socket, ip->message_buf, (size_t)chunk)) != -1)
+#endif
{
break;
}
@@ -1467,12 +1499,17 @@
}
else
{
+#ifdef USE_PTHREAD
+ if ((n = (int)thread_socket_write(ip->socket, ip->out_compress_buf, (size_t)length, ip)) != -1)
+#else
if ((n = (int)socket_write(ip->socket, ip->out_compress_buf, (size_t)length)) != -1)
+#endif
{
break;
}
}
#endif
+#ifndef USE_PTHREAD
switch (errno) {
case EINTR:
if (--retries)
@@ -1521,11 +1558,11 @@
, time_stamp(), e);
}
}
+#endif /* USE_PTHREAD */
if (old_message_length)
remove_flush_entry(ip);
ip->do_close = FLAG_DO_CLOSE;
return;
-
} /* for (retries) */
#ifdef COMM_STAT
@@ -2467,8 +2504,13 @@
}
if (length > ip->chars_ready)
{
+#ifdef USE_PTHREAD
+ thread_socket_write(ip->socket, ip->text + ip->chars_ready
+ , (size_t)(length - ip->chars_ready), ip);
+#else
socket_write(ip->socket, ip->text + ip->chars_ready
, (size_t)(length - ip->chars_ready));
+#endif
ip->chars_ready = length;
}
}
@@ -2655,6 +2697,11 @@
if (interactive->out_compress)
end_compress(interactive);
#endif
+
+#ifdef USE_PTHREAD
+ /* buffer list is cleaned up by thread */
+ pthread_cancel(interactive->write_thread);
+#endif
shutdown(interactive->socket, 2);
socket_close(interactive->socket);
} /* if (erq or user) */
@@ -2687,7 +2734,11 @@
if (interactive->out_compress)
free(interactive->out_compress);
#endif
-
+
+#ifdef USE_PTHREAD
+ pthread_mutex_destroy(&interactive->write_mutex);
+ pthread_cond_destroy(&interactive->write_cond);
+#endif
free_svalue(&interactive->prompt);
/* Unlink the interactive structure from the shadow sentence
@@ -2833,7 +2884,9 @@
#endif
/* Set some useful socket options */
+#ifndef USE_PTHREAD
set_socket_nonblocking(new_socket);
+#endif
set_close_on_exec(new_socket);
set_socket_own(new_socket);
@@ -2924,6 +2977,14 @@
new_interactive->out_compress_buf=NULL;
#endif
+#ifdef USE_PTHREAD
+ pthread_mutex_init(&new_interactive->write_mutex, NULL);
+ pthread_cond_init(&new_interactive->write_cond, NULL);
+ new_interactive->write_first = new_interactive->write_last = NULL;
+ new_interactive->write_size = 0;
+ pthread_create(&new_interactive->write_thread, NULL, writer_thread, new_interactive);
+#endif
+
new_interactive->input_to = NULL;
put_volatile_string(&new_interactive->prompt, "> ");
new_interactive->modify_command = NULL;
@@ -4169,6 +4229,19 @@
if (ip->text[0] == '!' && ! (find_no_bang(ip) & IGNORE_BANG) )
{
+#ifdef USE_PTHREAD
+ if (to > &ip->text[ip->chars_ready])
+ {
+ thread_socket_write(ip->socket, &ip->text[ip->chars_ready],
+ (size_t)(to - &ip->text[ip->chars_ready]), ip);
+ ip->chars_ready = to - ip->text;
+ }
+ if (to > first) {
+ thread_socket_write(ip->socket, "\b \b", 3, ip);
+ to--;
+ ip->chars_ready--;
+ }
+#else
if (to > &ip->text[ip->chars_ready])
{
socket_write(ip->socket, &ip->text[ip->chars_ready],
@@ -4180,6 +4253,7 @@
to--;
ip->chars_ready--;
}
+#endif
goto ts_data;
}
/* FALLTHROUGH */
@@ -6015,6 +6089,9 @@
* to the socket now.
*/
+#ifdef USE_PTHREAD
+ wrote = (mp_int)thread_socket_write(ip->socket, message, (size_t)size, ip);
+#else
for (i = 6;;) {
wrote = (mp_int)socket_write(ip->socket, message, (size_t)size);
if (wrote != -1)
@@ -6045,6 +6122,7 @@
}
break;
} /* end for on retry count */
+#endif
} /* if (type of write) */
command_giver = save_command_giver;
@@ -6961,5 +7039,81 @@
/* success */
return 1;
+}
+#endif
+
+#ifdef USE_PTHREAD
+static void writer_thread_cleanup(void *arg) {
+ interactive_t * ip = (interactive_t *) arg;
+ struct write_buffer_s *buf = ip->write_first, *tmp;
+
+ while(buf) {
+ tmp = buf->next;
+ if(buf->buffer) xfree(buf->buffer);
+ if(buf) xfree(buf);
+ buf = tmp;
+ }
+ fprintf(stderr, "Thread %d canceled and cleaned up!\n", pthread_self());
+}
+
+void *writer_thread(void *arg) {
+ interactive_t *ip = (interactive_t *) arg;
+ struct write_buffer_s *buf;
+
+ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); // make us cancelable
+ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+ pthread_cleanup_push(writer_thread_cleanup, ip);
+
+ while(1) {
+ /* cancellation point */
+ pthread_testcancel();
+
+ /* mutex protected getting of first write_buffer */
+ pthread_mutex_lock(&ip->write_mutex);
+ if(!ip->write_first) // if no first write_buffer -> wait on signal from mainthread
+ pthread_cond_wait(&ip->write_cond, &ip->write_mutex);
+ buf = ip->write_first;
+ ip->write_first = buf->next;
+ ip->write_size -= buf->length;
+ pthread_mutex_unlock(&ip->write_mutex);
+
+ /* write the stuff to socket */
+ if ((socket_write(ip->socket, buf->buffer, buf->length)) != -1) ;
+ else
+ switch (errno) {
+ case EINTR:
+ fprintf(stderr, "%s comm: write EINTR. Message discarded.\n", time_stamp());
+
+ case EWOULDBLOCK:
+ fprintf(stderr, "%s comm: write EWOULDBLOCK. Message discarded.\n", time_stamp());
+
+ case EMSGSIZE:
+ fprintf(stderr, "%s comm: write EMSGSIZE.\n", time_stamp());
+
+ case EINVAL:
+ fprintf(stderr, "%s comm: write EINVAL.\n", time_stamp());
+
+ case ENETUNREACH:
+ fprintf(stderr, "%s comm: write ENETUNREACH.\n", time_stamp());
+
+ case EHOSTUNREACH:
+ fprintf(stderr, "%s comm: write EHOSTUNREACH.\n", time_stamp());
+
+ case EPIPE:
+ fprintf(stderr, "%s comm: write EPIPE detected\n", time_stamp());
+
+ default:
+ {
+ int e = errno;
+ fprintf(stderr, "%s comm: write: unknown errno %d\n"
+ , time_stamp(), e);
+ }
+ }
+
+ if(buf) {
+ xfree(buf->buffer);
+ xfree(buf);
+ }
+ }
}
#endif
diff -Naur old/comm.h new/comm.h
--- old/comm.h Fri Aug 9 12:06:50 2002
+++ new/comm.h Thu Aug 8 14:34:30 2002
@@ -4,6 +4,9 @@
#include "driver.h"
#include "typedefs.h"
#include <sys/types.h>
+#ifdef USE_PTHREAD
+#include <pthread.h>
+#endif
#ifdef USE_MCCP
#include <zlib.h>
#endif
@@ -70,6 +73,14 @@
/* --- Types --- */
+#ifdef USE_PTHREAD
+struct write_buffer_s {
+ struct write_buffer_s *next;
+ int length;
+ char *buffer;
+};
+#endif
+
/* --- struct input_to: input_to() datastructure
*
* input-to structures describe a pending input_to() for a given
@@ -167,11 +178,19 @@
/* The send buffer. */
#ifdef USE_MCCP
- unsigned char compressing; /* MCCP support */
- z_stream * out_compress; /* MCCP support */
- unsigned char * out_compress_buf; /* MCCP support */
+ unsigned char compressing; /* MCCP support */
+ z_stream * out_compress; /* MCCP support */
+ unsigned char * out_compress_buf; /* MCCP support */
+#endif
+
+#ifdef USE_PTHREAD
+ pthread_mutex_t write_mutex;
+ pthread_cond_t write_cond;
+ pthread_t write_thread;
+ struct write_buffer_s *write_first;
+ struct write_buffer_s *write_last;
+ unsigned long write_size;
#endif
-
};
/* --- Bitflags and masks for interactive.noecho ---