Short: Threads for MCCP
From: Gawain
Date: 2002-08-10
Type: Patch
State: New
See also: p-011012
Note: The actual threads implementation differs from what was used here.

diff -Naur old/comm.c new/comm.c
--- old/comm.c Fri Aug 9 12:06:50 2002
+++ new/comm.c Fri Aug 9 08:46:29 2002
@@ -433,6 +433,10 @@
 static void mccp_telnet_neg(int);
 #endif
 
+#ifdef USE_PTHREAD
+static void *writer_thread(void *arg);
+#endif
+
 static void free_input_to(input_to_t *);
 static void telnet_neg(interactive_t *);
 static void send_will(int);
@@ -456,6 +460,46 @@
 
 #endif /* ERQ_DEMON */
 
+#ifdef USE_PTHREAD
+#define PTHREAD_WRITE_MAX_SIZE 100000
+/* Queue a copy of the <size> bytes at <msg> for the writer thread of <ip>
+ * and wake that thread. <socket> is not used here. While the queued total
+ * is at least PTHREAD_WRITE_MAX_SIZE, the oldest buffers are discarded.
+ * Always returns <size>.
+ */
+static int thread_socket_write(SOCKET_T socket, char *msg, size_t size, interactive_t *ip)
+{
+    struct write_buffer_s *b;
+    size_t i;
+
+    xallocate(b, sizeof(struct write_buffer_s), "thread_socket_write()");
+    xallocate(b->buffer, size, "thread_socket_write()");
+    b->length = size;
+    b->next = NULL;
+
+    for(i=0; i<size; i++) b->buffer[i]=msg[i];
+
+    pthread_mutex_lock(&ip->write_mutex);
+
+    if(ip->write_first)
+        ip->write_last = ip->write_last->next = b;
+    else
+        ip->write_first = ip->write_last = b;
+    ip->write_size += size;
+
+    while(ip->write_size>=PTHREAD_WRITE_MAX_SIZE) {
+        struct write_buffer_s *tmp = ip->write_first;
+        ip->write_first = tmp->next;
+        ip->write_size -= tmp->length;
+        xfree(tmp->buffer);
+        xfree(tmp);
+    }
+
+    pthread_mutex_unlock(&ip->write_mutex);
+    pthread_cond_signal(&ip->write_cond);
+    return (int)size;
+}
+#endif
 
 #ifdef USE_IPV6
 
@@ -1067,7 +1098,9 @@
             perror("listen");
             exit(1);
         }
+#ifndef USE_PTHREAD
         set_socket_nonblocking(sos[i]);
+#endif
         set_close_on_exec(sos[i]);
 
         if (socket_number(sos[i]) >= min_nfds)
@@ -1456,10 +1489,14 @@
 
         for (retries = 6;;) {
 #ifdef USE_MCCP
-          if (!ip->out_compress) /* here we choose the correct buffer */
+            if (!ip->out_compress) /* here we choose the correct buffer */
             {
 #endif
+#ifdef USE_PTHREAD
+            if ((n = (int)thread_socket_write(ip->socket, ip->message_buf, (size_t)chunk, ip)) != -1)
+#else
             if ((n = (int)socket_write(ip->socket, ip->message_buf, (size_t)chunk)) != -1)
+#endif
             {
                 break;
             }
@@ -1467,12 +1504,17 @@
             }
             else
             {
+#ifdef USE_PTHREAD
+                if ((n = (int)thread_socket_write(ip->socket, ip->out_compress_buf, (size_t)length, ip)) != -1)
+#else
                 if ((n = (int)socket_write(ip->socket, ip->out_compress_buf, (size_t)length)) != -1)
+#endif
                 {
                     break;
                 }
             }
 #endif
+#ifndef USE_PTHREAD
             switch (errno) {
               case EINTR:
                 if (--retries)
@@ -1521,11 +1563,11 @@
                       , time_stamp(), e);
               }
             }
+#endif /* USE_PTHREAD */
             if (old_message_length)
                 remove_flush_entry(ip);
             ip->do_close = FLAG_DO_CLOSE;
             return;
-
         } /* for (retries) */
 
 #ifdef COMM_STAT
@@ -2467,8 +2509,13 @@
                 }
                 if (length > ip->chars_ready)
                 {
+#ifdef USE_PTHREAD
+                    thread_socket_write(ip->socket, ip->text + ip->chars_ready
+                                , (size_t)(length - ip->chars_ready), ip);
+#else
                     socket_write(ip->socket, ip->text + ip->chars_ready
                                 , (size_t)(length - ip->chars_ready));
+#endif
                     ip->chars_ready = length;
                 }
             }
@@ -2655,6 +2702,13 @@
         if (interactive->out_compress)
             end_compress(interactive);
 #endif
+
+#ifdef USE_PTHREAD
+        /* buffer list is cleaned up by thread; wait for the thread to end
+         * before write_mutex and write_cond are destroyed further down. */
+        pthread_cancel(interactive->write_thread);
+        pthread_join(interactive->write_thread, NULL);
+#endif
         shutdown(interactive->socket, 2);
         socket_close(interactive->socket);
     } /* if (erq or user) */
@@ -2687,7 +2741,11 @@
     if (interactive->out_compress)
         free(interactive->out_compress);
 #endif
-    
+
+#ifdef USE_PTHREAD
+    pthread_mutex_destroy(&interactive->write_mutex);
+    pthread_cond_destroy(&interactive->write_cond);
+#endif
     free_svalue(&interactive->prompt);
 
     /* Unlink the interactive structure from the shadow sentence
@@ -2833,7 +2891,9 @@
 #endif
 
     /* Set some useful socket options */
+#ifndef USE_PTHREAD
     set_socket_nonblocking(new_socket);
+#endif
     set_close_on_exec(new_socket);
     set_socket_own(new_socket);
 
@@ -2924,6 +2984,14 @@
     new_interactive->out_compress_buf=NULL;
 #endif
 
+#ifdef USE_PTHREAD
+    pthread_mutex_init(&new_interactive->write_mutex, NULL);
+    pthread_cond_init(&new_interactive->write_cond, NULL);
+    new_interactive->write_first = new_interactive->write_last = NULL;
+    new_interactive->write_size = 0;
+    pthread_create(&new_interactive->write_thread, NULL, writer_thread, new_interactive);
+#endif
+
     new_interactive->input_to = NULL;
     put_volatile_string(&new_interactive->prompt, "> ");
     new_interactive->modify_command = NULL;
@@ -4169,6 +4236,19 @@
                     if (ip->text[0] == '!'
                      && ! (find_no_bang(ip) & IGNORE_BANG) )
                     {
+#ifdef USE_PTHREAD
+                        if (to > &ip->text[ip->chars_ready])
+                        {
+                            thread_socket_write(ip->socket, &ip->text[ip->chars_ready],
+                              (size_t)(to - &ip->text[ip->chars_ready]), ip);
+                            ip->chars_ready = to - ip->text;
+                        }
+                        if (to > first) {
+                            thread_socket_write(ip->socket, "\b \b", 3, ip);
+                            to--;
+                            ip->chars_ready--;
+                        }
+#else
                         if (to > &ip->text[ip->chars_ready])
                         {
                             socket_write(ip->socket, &ip->text[ip->chars_ready],
@@ -4180,6 +4260,7 @@
                             to--;
                             ip->chars_ready--;
                         }
+#endif
                         goto ts_data;
                     }
                     /* FALLTHROUGH */
@@ -6015,6 +6096,9 @@
          * to the socket now.
          */
 
+#ifdef USE_PTHREAD
+        wrote = (mp_int)thread_socket_write(ip->socket, message, (size_t)size, ip);
+#else
         for (i = 6;;) {
             wrote = (mp_int)socket_write(ip->socket, message, (size_t)size);
             if (wrote != -1)
@@ -6045,6 +6129,7 @@
             }
             break;
         } /* end for on retry count */
+#endif
     } /* if (type of write) */
 
     command_giver = save_command_giver;
@@ -6961,5 +7046,116 @@
 
     /* success */
     return 1;
+}
+#endif
+
+#ifdef USE_PTHREAD
+/* Cancellation cleanup handler of writer_thread(): free every write
+ * buffer still queued on the interactive <arg> and reset the queue.
+ */
+static void writer_thread_cleanup(void *arg) {
+    interactive_t * ip = (interactive_t *) arg;
+    struct write_buffer_s *buf = ip->write_first, *tmp;
+
+    while(buf) {
+        tmp = buf->next;
+        if(buf->buffer) xfree(buf->buffer);
+        xfree(buf);
+        buf = tmp;
+    }
+    ip->write_first = ip->write_last = NULL;
+    ip->write_size = 0;
+    /* pthread_t is an opaque type; the cast is only meant for this log line */
+    fprintf(stderr, "Thread %lu canceled and cleaned up!\n", (unsigned long)pthread_self());
+}
+
+/* Cleanup handler: unlock the mutex <arg>. A cancellation acted upon in
+ * pthread_cond_wait() leaves the mutex locked by the canceled thread.
+ */
+static void writer_thread_unlock(void *arg) {
+    pthread_mutex_unlock((pthread_mutex_t *) arg);
+}
+
+/* Cleanup handler: free the dequeued write buffer <arg>. */
+static void writer_thread_free_buffer(void *arg) {
+    struct write_buffer_s *buf = (struct write_buffer_s *) arg;
+
+    xfree(buf->buffer);
+    xfree(buf);
+}
+
+/* Thread function, started once per interactive <arg>: take the buffers
+ * queued by thread_socket_write() and send them with socket_write().
+ * Write errors are logged to stderr and the buffer is dropped. The loop
+ * only ends through pthread_cancel().
+ */
+void *writer_thread(void *arg) {
+    interactive_t *ip = (interactive_t *) arg;
+    struct write_buffer_s *buf;
+
+    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); // make us cancelable
+    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
+    pthread_cleanup_push(writer_thread_cleanup, ip);
+
+    while(1) {
+        /* cancellation point */
+        pthread_testcancel();
+
+        /* mutex protected getting of first write_buffer */
+        pthread_mutex_lock(&ip->write_mutex);
+        pthread_cleanup_push(writer_thread_unlock, &ip->write_mutex);
+        /* re-check in a loop: pthread_cond_wait() may wake up spuriously */
+        while(!ip->write_first) // if no first write_buffer -> wait on signal from mainthread
+            pthread_cond_wait(&ip->write_cond, &ip->write_mutex);
+        buf = ip->write_first;
+        ip->write_first = buf->next;
+        ip->write_size -= buf->length;
+        pthread_cleanup_pop(1); /* runs writer_thread_unlock() */
+
+        /* write the stuff to socket */
+        pthread_cleanup_push(writer_thread_free_buffer, buf);
+        if ((socket_write(ip->socket, buf->buffer, buf->length)) != -1) ;
+        else
+            switch (errno) {
+              case EINTR:
+                fprintf(stderr, "%s comm: write EINTR. Message discarded.\n", time_stamp());
+                break;
+
+              case EWOULDBLOCK:
+                fprintf(stderr, "%s comm: write EWOULDBLOCK. Message discarded.\n", time_stamp());
+                break;
+
+              case EMSGSIZE:
+                fprintf(stderr, "%s comm: write EMSGSIZE.\n", time_stamp());
+                break;
+
+              case EINVAL:
+                fprintf(stderr, "%s comm: write EINVAL.\n", time_stamp());
+                break;
+
+              case ENETUNREACH:
+                fprintf(stderr, "%s comm: write ENETUNREACH.\n", time_stamp());
+                break;
+
+              case EHOSTUNREACH:
+                fprintf(stderr, "%s comm: write EHOSTUNREACH.\n", time_stamp());
+                break;
+
+              case EPIPE:
+                fprintf(stderr, "%s comm: write EPIPE detected\n", time_stamp());
+                break;
+
+              default:
+              {
+                  int e = errno;
+                  fprintf(stderr, "%s comm: write: unknown errno %d\n"
+                                , time_stamp(), e);
+              }
+            }
+        pthread_cleanup_pop(1); /* runs writer_thread_free_buffer() */
+    }
+
+    pthread_cleanup_pop(0); /* not reached; closes the pthread_cleanup_push() above */
+    return NULL;
 }
 #endif
diff -Naur old/comm.h new/comm.h
--- old/comm.h Fri Aug 9 12:06:50 2002
+++ new/comm.h Thu Aug 8 14:34:30 2002
@@ -4,6 +4,9 @@
 #include "driver.h"
 #include "typedefs.h"
 #include <sys/types.h>
+#ifdef USE_PTHREAD
+#include <pthread.h>
+#endif
 #ifdef USE_MCCP
 #include <zlib.h>
 #endif
@@ -70,6 +73,15 @@
 
 /* --- Types --- */
 
+#ifdef USE_PTHREAD
+/* One pending chunk of output in an interactive's write queue. */
+struct write_buffer_s {
+    struct write_buffer_s *next;   /* next buffer in the queue, or NULL */
+    size_t length;                 /* number of bytes in buffer[] */
+    char *buffer;                  /* the data, allocated separately */
+};
+#endif
+
 /* --- struct input_to: input_to() datastructure
  *
  * input-to structures describe a pending input_to() for a given
@@ -167,11 +179,19 @@
       /* The send buffer. */
 
 #ifdef USE_MCCP
-     unsigned char    compressing;        /* MCCP support */
-     z_stream       * out_compress;       /* MCCP support */
-     unsigned char  * out_compress_buf;   /* MCCP support */
+    unsigned char    compressing;         /* MCCP support */
+    z_stream       * out_compress;        /* MCCP support */
+    unsigned char  * out_compress_buf;    /* MCCP support */
+#endif
+
+#ifdef USE_PTHREAD
+    pthread_mutex_t write_mutex;          /* protects write_first/_last/_size */
+    pthread_cond_t  write_cond;           /* signaled when a buffer is queued */
+    pthread_t       write_thread;         /* thread running writer_thread() */
+    struct write_buffer_s *write_first;   /* head of the write queue */
+    struct write_buffer_s *write_last;    /* tail of the write queue */
+    unsigned long   write_size;           /* sum of the queued buffer lengths */
 #endif
-
 };
 
 /* --- Bitflags and masks for interactive.noecho ---