#include "std.h" #include "../lpc_incl.h" #include "async.h" #include "../function.h" #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <pthread.h> #ifdef F_ASYNC_GETDIR #define _GNU_SOURCE #include <sys/syscall.h> #endif #include "../config.h" #include "../interpret.h" #include "../file.h" #if defined(F_ASYNC_READ) || defined(F_ASYNC_WRITE) struct cb_mem{ function_to_call_t cb; struct cb_mem *next; } *cbs = 0; struct aiob_mem{ aiob aio; struct aiob_mem *next; } *aiobs = 0; struct req_mem{ struct request req; struct req_mem *next; } *reqms; struct stuff{ void (*func)(void *); void *data; struct stuff *next; } *todo, *lasttodo; struct stuff_mem{ struct stuff stuff; struct stuff_mem *next; } *stuffs; pthread_mutex_t mem_mut; struct stuff *get_stuff(){ struct stuff *ret; if(stuffs){ pthread_mutex_lock(&mem_mut); ret = &stuffs->stuff; stuffs = stuffs->next; ((struct stuff_mem *)ret)->next = 0; pthread_mutex_unlock(&mem_mut); }else{ ret = MALLOC(sizeof(struct stuff_mem)); ((struct stuff_mem *)ret)->next = 0; } return ret; } void free_stuff(struct stuff *stuff){ struct stuff_mem *stufft = (struct stuff_mem *)stuff; pthread_mutex_lock(&mem_mut); stufft->next = stuffs; stuffs = stufft; pthread_mutex_unlock(&mem_mut); } pthread_mutex_t mut; pthread_mutex_t work_mut; int thread_started = 0; void thread_func(void *mydata){ while(1){ pthread_mutex_lock(&mut); while(todo){ pthread_mutex_lock(&work_mut); struct stuff *work = todo; todo = todo->next; if(!todo) lasttodo = NULL; pthread_mutex_unlock(&work_mut); work->func(work->data); free_stuff(work); } } } void do_stuff(void (*func)(void *), void *data){ if(!thread_started){ pthread_mutex_init(&mut, NULL); pthread_mutex_init(&mem_mut, NULL); pthread_mutex_init(&work_mut, NULL); pthread_mutex_lock(&mut); pthread_t t; pthread_create(&t, NULL, thread_func, NULL); thread_started = 1; } struct stuff *work = get_stuff(); work->func = func; work->data = data; work->next = NULL; pthread_mutex_lock(&work_mut); if(lasttodo){ lasttodo->next = work; lasttodo = work; } else { todo = lasttodo = work; } pthread_mutex_unlock(&work_mut); pthread_mutex_unlock(&mut); } function_to_call_t *get_cb(){ function_to_call_t *ret; if(cbs){ ret = &cbs->cb; cbs = cbs->next; ((struct cb_mem *)ret)->next = 0; }else{ ret = MALLOC(sizeof(struct cb_mem)); ((struct cb_mem *)ret)->next = 0; } memset(ret, 0, sizeof(function_to_call_t)); return ret; } void free_cb(function_to_call_t *cb){ struct cb_mem *cbt = (struct cb_mem *)cb; cbt->next = cbs; cbs = cbt; } aiob *get_aiob(){ aiob *ret; if(aiobs){ ret = &aiobs->aio; aiobs = aiobs->next; ((struct aiob_mem *)ret)->next = 0; }else{ ret = MALLOC(sizeof(struct aiob_mem)); ((struct aiob_mem *)ret)->next = 0; } return ret; } void free_aiob(aiob *aio){ struct aiob_mem *aiobt = (struct aiob_mem *)aio; aiobt->next = aiobs; aiobs = aiobt; } struct request *get_req(){ struct request *ret; if(reqms){ ret = &reqms->req; reqms = reqms->next; ((struct req_mem *)ret)->next = 0; }else{ ret = MALLOC(sizeof(struct req_mem)); ((struct req_mem *)ret)->next = 0; } return ret; } void free_req(struct request *req){ struct req_mem *reqt = (struct req_mem *)req; reqt->next = reqms; reqms = reqt; } static struct request *reqs = NULL; static struct request *lastreq = NULL; void add_req(struct request *req){ if(lastreq){ lastreq->next = req; } else { reqs = req; } req->next = NULL; lastreq = req; } #ifdef PACKAGE_COMPRESS #include <zlib.h> void *gzreadthread(void *data){ aiob *aio = (aiob *)data; void *file = gzdopen(dup(aio->aio_fildes), "rb"); aio->__return_value = gzread(file, (void *)(aio->aio_buf), aio->aio_nbytes); aio->__error_code = 0; gzclose(file); return NULL; } int aio_gzread(aiob *aio){ pthread_t thread; aio->__error_code = EINPROGRESS; do_stuff(gzreadthread, aio); return 0; } void *gzwritethread(void *data){ aiob *aio = (aiob *)data; void *file = gzdopen(dup(aio->aio_fildes), "wb"); aio->__return_value = gzwrite(file, (void *)(aio->aio_buf), aio->aio_nbytes); aio->__error_code = 0; gzclose(file); return NULL; } int aio_gzwrite(aiob *aio){ pthread_t thread; aio->__error_code = EINPROGRESS; do_stuff(gzwritethread, aio); return 0; } #endif void *writethread(void *data){ aiob *aio = (aiob *)data; aio->__return_value = write(aio->aio_fildes, (void *)(aio->aio_buf), aio->aio_nbytes); aio->__error_code = 0; return NULL; } int aio_write(aiob *aio){ pthread_t thread; aio->__error_code = EINPROGRESS; do_stuff(writethread, aio); return 0; } void *readthread(void *data){ aiob *aio = (aiob *)data; aio->__return_value = read(aio->aio_fildes, (void *)(aio->aio_buf), aio->aio_nbytes); aio->__error_code = 0; return NULL; } int aio_read(aiob *aio){ pthread_t thread; aio->__error_code = EINPROGRESS; do_stuff(readthread, aio); return 0; } #ifdef F_ASYNC_GETDIR void *getdirthread(void *data){ aiob *aio = (aiob *)data; aio->__return_value = syscall(SYS_getdents, aio->aio_fildes, aio->aio_buf, aio->aio_nbytes); aio->__error_code = 0; return NULL; } int aio_getdir(aiob *aio){ pthread_t thread; aio->__error_code = EINPROGRESS; do_stuff(getdirthread, aio); return 0; } #endif int add_read(const char *fname, function_to_call_t *fun) { if (fname) { aiob *aio = get_aiob(); memset(aio, 0, sizeof(aiob)); //printf("fname: %s\n", fname); int fd = open(fname, O_RDONLY); aio->aio_fildes = fd; aio->aio_buf = (char *)MALLOC(READ_FILE_MAX_SIZE); aio->aio_nbytes = READ_FILE_MAX_SIZE; struct request *req = get_req(); req->aio = aio; req->fun = fun; req->type = aread; add_req(req); #ifdef PACKAGE_COMPRESS return aio_gzread(aio); #else return aio_read(aio); #endif }else error("permission denied\n"); return 1; } #ifdef F_ASYNC_GETDIR extern int max_array_size; int add_getdir(const char *fname, function_to_call_t *fun) { if (fname) { aiob *aio= get_aiob(); memset(aio, 0, sizeof(aiob)); //printf("fname: %s\n", fname); int fd = open(fname, O_RDONLY); aio->aio_fildes = fd; aio->aio_buf = (char *)MALLOC(sizeof(struct dirent) * max_array_size); aio->aio_nbytes = sizeof(struct dirent) * max_array_size; struct request *req = get_req(); req->aio = aio; req->fun = fun; req->type = agetdir; add_req(req); return aio_getdir(aio); }else error("permission denied\n"); return 1; } #endif int add_write(const char *fname, const char *buf, int size, char flags, function_to_call_t *fun) { if (fname) { aiob *aio = get_aiob(); memset(aio, 0, sizeof(aiob)); int fd = open(fname, flags & 1 ? O_CREAT|O_WRONLY : O_CREAT|O_WRONLY|O_APPEND, S_IRWXU|S_IRWXG); aio->aio_fildes = fd; aio->aio_buf = buf; aio->aio_nbytes = size; struct request *req = get_req(); req->aio = aio; req->fun = fun; req->type = awrite; assign_svalue_no_free(&req->tmp, sp-2); add_req(req); #ifdef PACKAGE_COMPRESS if(flags & 2) return aio_gzwrite(aio); else #endif return aio_write(aio); } else error("permission denied\n"); return 1; } void handle_read(struct request *req, int val){ aiob *aio = req->aio; close(aio->aio_fildes); if(val){ FREE(aio->aio_buf); push_number(val); set_eval(max_cost); safe_call_efun_callback(req->fun, 1); return; } val = aio_return(aio); if(val < 0){ FREE(aio->aio_buf); push_number(val); set_eval(max_cost); safe_call_efun_callback(req->fun, 1); return; } char *file = new_string(val, "read_file_async: str"); memcpy(file, (char *)(aio->aio_buf), val); file[val]=0; push_malloced_string(file); FREE(aio->aio_buf); set_eval(max_cost); safe_call_efun_callback(req->fun, 1); } #ifdef F_ASYNC_GETDIR struct linux_dirent { unsigned long d_ino; /* Inode number */ unsigned long d_off; /* Offset to next dirent */ unsigned short d_reclen; /* Length of this dirent */ char d_name []; /* Filename (null-terminated) */ /* length is actually (d_reclen - 2 - offsetof(struct linux_dirent, d_name) */ }; void handle_getdir(struct request *req, int val){ aiob *aio = req->aio; close(aio->aio_fildes); val = aio_return(aio); array_t *ret = allocate_empty_array(val); int i; if(val > -1) { struct linux_dirent *de = (struct linux_dirent *)aio->aio_buf; for(i=0; ((char *)de) - (char *)(aio->aio_buf) < val; i++) { svalue_t *vp = &(ret->item[i]); vp->type = T_STRING; vp->subtype = STRING_MALLOC; //printf("%s ", de->d_name); vp->u.string = string_copy(de->d_name, "encode_stat"); de = (struct linux_dirent *)(((char *)de) + de->d_reclen); } } ret = RESIZE_ARRAY(ret, i); ret->size = i; push_refed_array(ret); set_eval(max_cost); safe_call_efun_callback(req->fun, 1); } #endif void handle_write(struct request *req, int val){ aiob *aio = req->aio; close(aio->aio_fildes); free_svalue(&req->tmp, "handle_write"); if(val){ push_number(val); set_eval(max_cost); safe_call_efun_callback(req->fun, 1); return; } val = aio_return(aio); if(val < 0){ push_number(val); set_eval(max_cost); safe_call_efun_callback(req->fun, 1); return; } push_undefined(); set_eval(max_cost); safe_call_efun_callback(req->fun, 1); } void check_reqs() { while (reqs) { int val = aio_error(reqs->aio); if (val != EINPROGRESS) { enum atypes type = (reqs->type); reqs->type = done; switch (type) { case aread: handle_read(reqs, val); break; case awrite: handle_write(reqs, val); break; #ifdef F_ASYNC_GETDIR case agetdir: handle_getdir(reqs, val); break; #endif case done: //must have had an error while handling it before. break; default: fatal("unknown async type\n"); } struct request *here = reqs; reqs = reqs->next; if(!reqs) lastreq = reqs; free_aiob(here->aio); free_funp(here->fun->f.fp); free_cb(here->fun); free_req(here); } else return; } } #ifdef F_ASYNC_READ void f_async_read(){ function_to_call_t *cb = get_cb(); process_efun_callback(1, cb, F_ASYNC_READ); cb->f.fp->hdr.ref++; add_read(check_valid_path((sp-1)->u.string, current_object, "read_file", 0), cb); pop_2_elems(); } #endif #ifdef F_ASYNC_WRITE void f_async_write(){ function_to_call_t *cb = get_cb(); process_efun_callback(3, cb, F_ASYNC_WRITE); cb->f.fp->hdr.ref++; add_write(check_valid_path((sp-3)->u.string, current_object, "write_file", 1), (sp-2)->u.string, strlen((sp-2)->u.string), (sp-1)->u.number, cb); pop_n_elems(4); } #endif #ifdef F_ASYNC_GETDIR void f_async_getdir(){ function_to_call_t *cb = get_cb(); process_efun_callback(1, cb, F_ASYNC_READ); cb->f.fp->hdr.ref++; add_getdir(check_valid_path((sp-1)->u.string, current_object, "get_dir", 0), cb); pop_2_elems(); } #endif #ifdef F_ASYNC_DB_EXEC void f_async_db_exec(){ array_t *info; db_t *db; info = allocate_empty_array(1); info->item[0].type = T_STRING; info->item[0].subtype = STRING_MALLOC; info->item[0].u.string = string_copy((sp-1)->u.string, "f_db_exec"); valid_database("exec", info); db = find_db_conn((sp-1)->u.number); if (!db) { error("Attempt to exec on an invalid database handle\n"); } if (db->type->cleanup) { db->type->cleanup(&(db->c)); } function_to_call_t *cb = get_cb(); process_efun_callback(2, cb, F_ASYNC_READ); cb->f.fp->hdr.ref++; add_db_exec(db, (sp-1)->u.string, cb); pop_2_elems(); } #endif #endif