#include "std.h"
#include "../lpc_incl.h"
#include "async.h"
#include "../function.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define _GNU_SOURCE
#include <sys/syscall.h>
#include "../config.h"
#include "../interpret.h"
#include "../file.h"
#if defined(F_ASYNC_READ) || defined(F_ASYNC_WRITE)
static struct request *reqs = NULL;
void add_req(struct request *req){
req->next = reqs;
reqs = req;
}
#if defined PACKAGE_COMPRESS && defined linux
#include <pthread.h>
#include <zlib.h>
void *gzreadthread(void *data){
aiob *aio = (aiob *)data;
void *file = gzdopen(dup(aio->aio_fildes), "rb");
aio->__return_value = gzread(file, (void *)(aio->aio_buf), aio->aio_nbytes);
aio->__error_code = 0;
gzclose(file);
return NULL;
}
int aio_gzread(aiob *aio){
pthread_t thread;
aio->__error_code = EINPROGRESS;
pthread_create(&thread, NULL, gzreadthread, aio);
return 0;
}
void *gzwritethread(void *data){
aiob *aio = (aiob *)data;
void *file = gzdopen(dup(aio->aio_fildes), "wb");
aio->__return_value = gzwrite(file, (void *)(aio->aio_buf), aio->aio_nbytes);
aio->__error_code = 0;
gzclose(file);
return NULL;
}
int aio_gzwrite(aiob *aio){
pthread_t thread;
aio->__error_code = EINPROGRESS;
pthread_create(&thread, NULL, gzwritethread, aio);
return 0;
}
#endif
#ifdef F_ASYNC_GETDIR
void *getdirthread(void *data){
aiob *aio = (aiob *)data;
aio->__return_value = syscall(SYS_getdents, aio->aio_fildes, aio->aio_buf, aio->aio_nbytes);
aio->__error_code = 0;
return NULL;
}
int aio_getdir(aiob *aio){
pthread_t *thread = (pthread_t *)MALLOC(sizeof(pthread_t));
aio->__error_code = EINPROGRESS;
pthread_create(thread, NULL, getdirthread, aio);
FREE(thread); //like WE care
return 0;
}
#endif
int add_read(const char *fname, function_to_call_t *fun) {
if (fname) {
aiob *aio= (aiob *)MALLOC(sizeof(aiob));
memset(aio, 0, sizeof(aiob));
//printf("fname: %s\n", fname);
int fd = open(fname, O_RDONLY);
aio->aio_fildes = fd;
aio->aio_buf = (char *)MALLOC(READ_FILE_MAX_SIZE);
aio->aio_nbytes = READ_FILE_MAX_SIZE;
struct request *req = (struct request *)MALLOC(sizeof(struct request));
req->aio = aio;
req->fun = fun;
req->type = aread;
add_req(req);
#if defined PACKAGE_COMPRESS && defined linux
return aio_gzread(aio);
#else
return aio_read(aio);
#endif
}
return 1;
}
#ifdef F_ASYNC_GETDIR
extern int max_array_size;
int add_getdir(const char *fname, function_to_call_t *fun) {
if (fname) {
aiob *aio= (aiob *)MALLOC(sizeof(aiob));
memset(aio, 0, sizeof(aiob));
//printf("fname: %s\n", fname);
int fd = open(fname, O_RDONLY);
aio->aio_fildes = fd;
aio->aio_buf = (char *)MALLOC(sizeof(struct dirent) * max_array_size);
aio->aio_nbytes = sizeof(struct dirent) * max_array_size;
struct request *req = (struct request *)MALLOC(sizeof(struct request));
req->aio = aio;
req->fun = fun;
req->type = agetdir;
add_req(req);
return aio_getdir(aio);
}
return 1;
}
#endif
int add_write(const char *fname, char *buf, int size, char flags, function_to_call_t *fun) {
if (fname) {
aiob *aio = (aiob *)MALLOC(sizeof(aiob));
memset(aio, 0, sizeof(aiob));
int fd = open(fname, flags & 1 ? O_CREAT|O_WRONLY
: O_CREAT|O_WRONLY|O_APPEND, S_IRWXU|S_IRWXG);
aio->aio_fildes = fd;
aio->aio_buf = buf;
aio->aio_nbytes = size;
struct request *req = (struct request *)MALLOC(sizeof(struct request));
req->aio = aio;
req->fun = fun;
req->type = awrite;
add_req(req);
#if defined PACKAGE_COMPRESS && defined linux
if(flags & 2)
return aio_gzwrite(aio);
else
#endif
return aio_write(aio);
}
FREE(buf);
return 1;
}
void handle_read(struct request *req, int val){
aiob *aio = req->aio;
close(aio->aio_fildes);
if(val){
push_number(val);
safe_call_efun_callback(req->fun, 1);
return;
}
val = aio_return(aio);
if(val < 0){
push_number(val);
safe_call_efun_callback(req->fun, 1);
return;
}
char *file = new_string(val, "read_file_async: str");
memcpy(file, (char *)(aio->aio_buf), val);
file[val]=0;
push_malloced_string(file);
safe_call_efun_callback(req->fun, 1);
}
#ifdef F_ASYNC_GETDIR
void handle_getdir(struct request *req, int val){
aiob *aio = req->aio;
close(aio->aio_fildes);
val = aio_return(aio);
array_t *ret = allocate_empty_array(val);
int i;
if(val > -1)
{
struct dirent *de = (struct dirent *)aio->aio_buf;
for(i=0; ((char *)de) - (char *)(aio->aio_buf) < val; i++)
{
svalue_t *vp = &(ret->item[i]);
vp->type = T_STRING;
vp->subtype = STRING_MALLOC;
//printf("%s ", de->d_name);
vp->u.string = string_copy((de->d_name - 1), "encode_stat"); //hmm, wrong struct??
de = (struct dirent *)(((char *)de) + de->d_reclen);
}
}
ret = RESIZE_ARRAY(ret, i);
ret->size = i;
push_refed_array(ret);
safe_call_efun_callback(req->fun, 1);
}
#endif
void handle_write(struct request *req, int val){
aiob *aio = req->aio;
close(aio->aio_fildes);
if(val){
push_number(val);
safe_call_efun_callback(req->fun, 1);
return;
}
val = aio_return(aio);
if(val < 0){
push_number(val);
safe_call_efun_callback(req->fun, 1);
return;
}
push_undefined();
safe_call_efun_callback(req->fun, 1);
}
void check_reqs() {
struct request **check = &reqs;
while (*check) {
struct request *here = *check;
int val = aio_error((*check)->aio);
if (val != EINPROGRESS) {
enum atypes type = ((*check)->type);
(*check)->type = done;
switch (type) {
case aread:
handle_read(here, val);
break;
case awrite:
handle_write(here, val);
break;
#ifdef F_ASYNC_GETDIR
case agetdir:
handle_getdir(here, val);
break;
#endif
case done:
//must have had an error while handling it before.
break;
default:
fatal("unknown async type\n");
}
while(*check != here)
check = &(*check)->next;
*check = (*check)->next;
FREE((char *)(here->aio->aio_buf));
FREE(here->aio);
free_funp(here->fun->f.fp);
FREE(here->fun);
FREE(here);
} else {
struct request *tmp = *check;
if(tmp->next)
check = &((*check)->next);
else
return;
}
}
}
#ifdef F_ASYNC_READ
void f_async_read(){
function_to_call_t *cb = (function_to_call_t *)MALLOC(sizeof(function_to_call_t));
memset(cb, 0, sizeof(function_to_call_t));
process_efun_callback(1, cb, F_ASYNC_READ);
cb->f.fp->hdr.ref++;
add_read(check_valid_path((sp-1)->u.string, current_object, "read_file", 0), cb);
pop_2_elems();
}
#endif
#ifdef F_ASYNC_WRITE
void f_async_write(){
char *buf = (char *)MALLOC(strlen((sp-2)->u.string)+1);
strcpy(buf, (sp-2)->u.string);
function_to_call_t *cb = (function_to_call_t *)MALLOC(sizeof(function_to_call_t));
memset(cb, 0, sizeof(function_to_call_t));
process_efun_callback(3, cb, F_ASYNC_WRITE);
cb->f.fp->hdr.ref++;
add_write(check_valid_path((sp-3)->u.string, current_object, "write_file", 1), buf, strlen(buf), (sp-1)->u.number, cb);
pop_n_elems(4);
}
#endif
#ifdef F_ASYNC_GETDIR
void f_async_getdir(){
function_to_call_t *cb = (function_to_call_t *)MALLOC(sizeof(function_to_call_t));
memset(cb, 0, sizeof(function_to_call_t));
process_efun_callback(1, cb, F_ASYNC_READ);
cb->f.fp->hdr.ref++;
add_getdir(check_valid_path((sp-1)->u.string, current_object, "get_dir", 0), cb);
pop_2_elems();
}
#endif
#endif