diff options
author | Jonas Karlsson <[email protected]> | 2014-02-21 14:03:00 +0100 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2014-02-24 15:16:09 +0100 |
commit | 06e55b6f2ac30c95717532a259a6148226f63b24 (patch) | |
tree | 4f522b0db240e82e955c2169a9372606c7ab5fcb /erts/emulator | |
parent | 4a6850e522b91eb009ddd0ed9d9f542f1baf1bee (diff) | |
download | otp-06e55b6f2ac30c95717532a259a6148226f63b24.tar.gz otp-06e55b6f2ac30c95717532a259a6148226f63b24.tar.bz2 otp-06e55b6f2ac30c95717532a259a6148226f63b24.zip |
ose: Updating fd_driver and spawn_driver for OSE
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/sys/ose/sys.c | 1339 |
1 files changed, 650 insertions, 689 deletions
diff --git a/erts/emulator/sys/ose/sys.c b/erts/emulator/sys/ose/sys.c index 88dbd7fcf8..c892cc69c7 100644 --- a/erts/emulator/sys/ose/sys.c +++ b/erts/emulator/sys/ose/sys.c @@ -49,6 +49,9 @@ #include "unistd.h" #include "efs.h" #include "erl_printf.h" +#include "aio.h" +#include "pm.h" +#include "fcntl.h" /* Set the define to 1 to get some logging */ #if 0 @@ -60,6 +63,7 @@ extern char **environ; static erts_smp_rwmtx_t environ_rwmtx; +static PROCESS sig_proxy_pid = 0; #define MAX_VSIZE 16 /* Max number of entries allowed in an I/O * vector sock_sendv(). @@ -80,31 +84,44 @@ static erts_smp_rwmtx_t environ_rwmtx; typedef struct ErtsSysReportExit_ ErtsSysReportExit; struct ErtsSysReportExit_ { - ErtsSysReportExit *next; - Eterm port; - int pid; - int ifd; - int ofd; - ErlDrvEvent in_sig_descr; - ErlDrvEvent out_sig_descr; + ErtsSysReportExit *next; + Eterm port; + int pid; + int ifd; + int ofd; + ErlDrvEvent attach_event; + ErlDrvEvent input_event; + ErlDrvEvent output_event; }; /* This data is shared by these drivers - initialized by spawn_init() */ static struct driver_data { - ErlDrvPort port_num; - int ofd, packet_bytes; - ErtsSysReportExit *report_exit; - int pid; - int alive; - int status; - ErlDrvEvent in_sig_descr; - ErlDrvEvent out_sig_descr; - PROCESS in_proc; - PROCESS out_proc; - ErlDrvPDL pdl; + ErlDrvPort port_num; + int ofd; + int ifd; + int packet_bytes; + ErtsSysReportExit *report_exit; + int pid; + int alive; + int status; + ErlDrvEvent input_event; + ErlDrvEvent output_event; + struct aiocb aiocb; + FmHandle handle; + char *install_handle; } *driver_data; /* indexed by fd */ +struct async { + SIGSELECT signo; + ErlDrvTermData port; + ErlDrvTermData proc; + PROCESS spid; + PROCESS target; + Uint32 ref; +}; + static ErtsSysReportExit *report_exit_list; +static ERTS_INLINE void report_exit_status(ErtsSysReportExit *rep, int status); extern int driver_interrupt(int, int); extern void do_break(void); @@ -157,6 +174,66 @@ erts_mtx_t chld_stat_mtx; static volatile int children_died; #endif +#define SET_AIO(REQ,FD,SIZE,BUFF) \ + memset(&(REQ),0,sizeof(REQ)); \ + (REQ).aio_fildes = FD; \ + (REQ).aio_offset = FM_POSITION_CURRENT; \ + (REQ).aio_nbytes = SIZE; \ + (REQ).aio_buf = BUFF; \ + (REQ).aio_sigevent.sigev_notify = SIGEV_NONE + +/* the first sizeof(struct aiocb *) bytes of the write buffer + * will contain the pointer to the aiocb struct, this needs + * to be freed between asynchronous writes. + * A write of 0 bytes is ignored. */ +#define WRITE_AIO(FD,SIZE,BUFF) do { \ + if (SIZE > 0) { \ + struct aiocb *write_req = driver_alloc(sizeof(struct aiocb)); \ + char *write_buff = driver_alloc((sizeof(char)*SIZE)+1+ \ + (sizeof(struct aiocb *))); \ + *(struct aiocb **)write_buff = (struct aiocb *)write_req; \ + write_buff += sizeof(struct aiocb *); \ + memcpy(write_buff,BUFF,SIZE+1); \ + SET_AIO(*write_req,FD,SIZE,write_buff); \ + aio_write(write_req); \ + } \ +} while(0) + +/* free the write_buffer and write_req + * created in the WRITE_AIO() request macro */ +#define FREE_AIO(ptr) do { \ + struct aiocb *aiocb_ptr; \ + char *buffer_ptr; \ + aiocb_ptr = *(struct aiocb **)((ptr)-sizeof(struct aiocb *)); \ + buffer_ptr = (((char*)ptr)-sizeof(struct aiocb *)); \ + driver_free(aiocb_ptr); \ + driver_free(buffer_ptr); \ +} while(0) + +/* When we have several schedulers, we need to make sure + * that scheduler issuing aio_dispatch() is the owner on the signal */ +#define DISPATCH_AIO(sig) do { \ + restore(sig); \ + aio_dispatch(sig); \ + } while(0) + + +/* debug print macros */ +#define DEBUG_RES 0 + +#ifdef DEBUG_RES +#define DEBUG_CHECK_RES(actual, expected) \ + do { \ + if (actual != expected ) { \ + ramlog_printf("Result check failed" \ + " got: 0x%08x expected:0x%08x\nat: %s:%d\n", \ + actual, expected, __FILE__, __LINE__); \ + abort(); /* This might perhaps be too harsh? */ \ + } \ + } while(0) +#else +#define DEBUG_CHECK_RES +#endif static struct fd_data { char pbuf[4]; /* hold partial packet bytes */ @@ -191,6 +268,7 @@ static struct termios initial_tty_mode; static int replace_intr = 0; /* assume yes initially, ttsl_init will clear it */ int using_oldshell = 1; +static PROCESS get_signal_proxy_pid(void); static void init_check_io(void) @@ -540,28 +618,13 @@ void fini_getenv_state(GETENV_STATE *state) /************************** Port I/O *******************************/ - - /* I. Common stuff */ -typedef struct SysDriverAsyncSignal_ { - SIGSELECT sig_no; - int type; - byte *buff; - ssize_t res; - int errno_copy; -} SysDriverAsyncSignal; - -typedef struct SysDriverConfSignal_ { - SIGSELECT sig_no; - int fd; - PROCESS parent; -} SysDriverConfSignal; - union SIGNAL { SIGSELECT sig_no; - SysDriverAsyncSignal sys_async; - SysDriverConfSignal conf_async; + struct FmReadPtr fm_read_reply; + struct FmWritePtr fm_write_reply; + struct async async; }; /* II. The spawn/fd drivers */ @@ -582,14 +645,42 @@ static void erl_stop(ErlDrvData); static void ready_input(ErlDrvData, ErlDrvEvent); static void ready_output(ErlDrvData, ErlDrvEvent); static void output(ErlDrvData, char*, ErlDrvSizeT); -static void outputv(ErlDrvData, ErlIOVec*); static void stop_select(ErlDrvEvent, void*); -static ErlDrvOseEventId resolve_signal(union SIGNAL* sig) { - return sig->sig_no == ERTS_SIGNAL_FD_DRV_ASYNC ? sig->sys_async.type : -1; + +static PROCESS +get_signal_proxy_pid(void) { + union SIGNAL *sig; + SIGSELECT any_sig[] = {0}; + + if (!sig_proxy_pid) { + sig = alloc(sizeof(union SIGNAL), ERTS_SIGNAL_OSE_DRV_ATTACH); + hunt("ose_signal_driver_proxy", 0, NULL, &sig); + sig = receive(any_sig); + sig_proxy_pid = sender(&sig); + free_buf(&sig); + } + ASSERT(sig_proxy_pid); + return sig_proxy_pid; } -OS_PROCESS(fd_writer_process); -OS_PROCESS(fd_reader_process); +static ErlDrvOseEventId +resolve_signal(union SIGNAL* sig) { + switch(sig->sig_no) { + + case FM_READ_PTR_REPLY: + return (ErlDrvOseEventId)sig->fm_read_reply.handle; + + case FM_WRITE_PTR_REPLY: + return (ErlDrvOseEventId)sig->fm_write_reply.handle; + + case ERTS_SIGNAL_OSE_DRV_ATTACH: + return (ErlDrvOseEventId)sig->async.target; + + default: + break; + } + return (ErlDrvOseEventId)-1; +} struct erl_drv_entry spawn_driver_entry = { spawn_init, @@ -627,7 +718,7 @@ struct erl_drv_entry fd_driver_entry = { NULL, fd_control, NULL, - outputv, + NULL, NULL, /* ready_async */ NULL, /* flush */ NULL, /* call */ @@ -641,120 +732,167 @@ struct erl_drv_entry fd_driver_entry = { stop_select }; -static int set_driver_data(ErlDrvPort port_num, +static void +set_spawn_fd(int local_fd, int remote_fd, PROCESS remote_pid) { + PROCESS vm_pid; + FmHandle handle; + char env_val[55]; + char env_name[10]; + EfsStatus efs_res; + + /* get pid of pipevm and handle of chosen fd */ + efs_res = efs_examine_fd(local_fd, FLIB_FD_VMPID, &vm_pid, 0); + DEBUG_CHECK_RES(efs_res, EFS_SUCCESS); + + /* setup the file descriptor to buffer per line */ + efs_res = efs_config_fd(local_fd, FLIB_FD_BUFMODE, FM_BUFF_LINE, + FLIB_FD_BUFSIZE, 80, 0); + DEBUG_CHECK_RES(efs_res, EFS_SUCCESS); + + /* duplicate handle and set spawn pid owner */ + efs_res = efs_dup_to(local_fd, remote_pid, &handle); + DEBUG_CHECK_RES(efs_res, EFS_SUCCESS); + + sprintf(env_name, "FD%d", remote_fd); + + /* Syntax of the environment variable: + * "FD#" "<pid of pipevm>,<handle>,<buffer mode>,<buff size>,<omode>" */ + sprintf(env_val, "0x%lx,0x%lx,%lu,%lu,0x%x", + vm_pid, handle, + FM_BUFF_LINE, 80, + O_APPEND); + + set_env(remote_pid, env_name, env_val); +} + +static ErlDrvData +set_driver_data(ErlDrvPort port_num, int ifd, int ofd, int packet_bytes, int read_write, int exit_status, - int pid) + PROCESS pid) { Port *prt; ErtsSysReportExit *report_exit; - union SIGNAL *sig; - - /*erts_fprintf(stderr, " %s / pid %x / ofd %d / ifd %d\n", - __FUNCTION__, current_process(), ofd, ifd);*/ - - - if (!exit_status) - report_exit = NULL; - else { - report_exit = erts_alloc(ERTS_ALC_T_PRT_REP_EXIT, - sizeof(ErtsSysReportExit)); - report_exit->next = report_exit_list; - report_exit->port = erts_drvport2id(port_num); - report_exit->pid = pid; - report_exit->ifd = read_write & DO_READ ? ifd : -1; - report_exit->ofd = read_write & DO_WRITE ? ofd : -1; - - if (read_write & DO_READ) - report_exit->in_sig_descr = - erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC, ifd, - resolve_signal); - if (read_write & DO_WRITE) - report_exit->out_sig_descr = - erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC, ofd, - resolve_signal); - - report_exit_list = report_exit; - } prt = erts_drvport2port(port_num); - if (prt != ERTS_INVALID_ERL_DRV_PORT) - prt->os_pid = pid; + if (prt != ERTS_INVALID_ERL_DRV_PORT) { + prt->os_pid = pid; + } + /* READ */ if (read_write & DO_READ) { - driver_data[ifd].packet_bytes = packet_bytes; - driver_data[ifd].port_num = port_num; - driver_data[ifd].report_exit = report_exit; - driver_data[ifd].pid = pid; - driver_data[ifd].alive = 1; - driver_data[ifd].status = 0; - driver_data[ifd].in_sig_descr = - erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC,ifd, - resolve_signal); - - driver_data[ifd].in_proc = create_process(OS_PRI_PROC,"beam_fd_reader", - fd_reader_process, 0x800, - FD_PROC_PRI, 0, 0, - NULL, 0, 0); - efs_clone(driver_data[ifd].in_proc); - sig = alloc(sizeof(SysDriverConfSignal), ERTS_SIGNAL_FD_DRV_CONFIG); - sig->conf_async.fd = ifd; - sig->conf_async.parent = current_process(); - send(&sig, driver_data[ifd].in_proc); - start(driver_data[ifd].in_proc); - - if (read_write & DO_WRITE) { - driver_data[ifd].ofd = ofd; - driver_data[ifd].out_sig_descr = - erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC,ofd, - resolve_signal); - driver_data[ifd].pdl = driver_pdl_create(port_num); - driver_data[ifd].out_proc = - create_process(OS_PRI_PROC,"beam_fd_writer", - fd_writer_process, 0x800, - FD_PROC_PRI, 0, 0, NULL, 0, 0); - sig = alloc(sizeof(SysDriverConfSignal), ERTS_SIGNAL_FD_DRV_CONFIG); - sig->conf_async.fd = ofd; - sig->conf_async.parent = current_process(); - send(&sig, driver_data[ifd].out_proc); - // efs_clone(driver_data[ifd].out_proc); - start(driver_data[ifd].out_proc); - if (ifd != ofd) - driver_data[ofd] = driver_data[ifd]; /* structure copy */ - } else { /* DO_READ only */ - driver_data[ifd].ofd = -1; - } - (void) driver_select(port_num, driver_data[ifd].in_sig_descr, + efs_examine_fd(ifd, FLIB_FD_HANDLE, &driver_data[ifd].handle, 0); + driver_data[ifd].ifd = ifd; + driver_data[ifd].packet_bytes = packet_bytes; + driver_data[ifd].port_num = port_num; + driver_data[ifd].pid = pid; + + /* async read struct */ + memset(&driver_data[ifd].aiocb, 0, sizeof(struct aiocb)); + driver_data[ifd].aiocb.aio_buf = driver_alloc(255); + driver_data[ifd].aiocb.aio_fildes = ifd; + driver_data[ifd].aiocb.aio_nbytes = 255; + + driver_data[ifd].alive = 1; + driver_data[ifd].status = 0; + driver_data[ifd].input_event = + erl_drv_ose_event_alloc(FM_READ_PTR_REPLY, + driver_data[ifd].handle, resolve_signal, + &driver_data[ifd].ifd); + + /* READ & WRITE */ + if (read_write & DO_WRITE) { + driver_data[ifd].ofd = ofd; + efs_examine_fd(ofd, FLIB_FD_HANDLE, &driver_data[ofd].handle, 0); + + driver_data[ifd].output_event = + erl_drv_ose_event_alloc(FM_WRITE_PTR_REPLY, + driver_data[ofd].handle, resolve_signal, + &driver_data[ofd].ofd); + driver_data[ofd].pid = pid; + if (ifd != ofd) { + driver_data[ofd] = driver_data[ifd]; + driver_data[ofd].aiocb.aio_buf = NULL; + } + } + else { /* READ ONLY */ + driver_data[ifd].ofd = -1; + } + + /* enable input event */ + (void) driver_select(port_num, driver_data[ifd].input_event, (ERL_DRV_READ | ERL_DRV_USE), 1); - return(ifd); - } else { /* DO_WRITE only */ - driver_data[ofd].packet_bytes = packet_bytes; - driver_data[ofd].port_num = port_num; - driver_data[ofd].report_exit = report_exit; - driver_data[ofd].ofd = ofd; - driver_data[ofd].pid = pid; - driver_data[ofd].alive = 1; - driver_data[ofd].status = 0; - driver_data[ofd].in_sig_descr = - erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC,ofd, - resolve_signal); - driver_data[ofd].out_sig_descr = driver_data[ofd].in_sig_descr; - driver_data[ofd].out_proc = - create_process(OS_PRI_PROC, "beam_fd_writer", - fd_writer_process, 0x800, - FD_PROC_PRI, 0, 0, NULL, 0, 0); - sig = alloc(sizeof(SysDriverConfSignal), ERTS_SIGNAL_FD_DRV_CONFIG); - sig->conf_async.fd = ofd; - sig->conf_async.parent = current_process(); - send(&sig, driver_data[ofd].out_proc); - start(driver_data[ofd].out_proc); - //efs_clone(driver_data[ifd].out_proc); - driver_data[ofd].pdl = driver_pdl_create(port_num); - return(ofd); + + aio_read(&driver_data[ifd].aiocb); + } + else { /* WRITE ONLY */ + efs_examine_fd(ofd, FLIB_FD_HANDLE, &driver_data[ofd].handle, 0); + driver_data[ofd].packet_bytes = packet_bytes; + driver_data[ofd].port_num = port_num; + driver_data[ofd].ofd = ofd; + driver_data[ofd].pid = pid; + driver_data[ofd].alive = 1; + driver_data[ofd].status = 0; + driver_data[ofd].output_event = + erl_drv_ose_event_alloc(FM_WRITE_PTR_REPLY, driver_data[ofd].handle, + resolve_signal, &driver_data[ofd].ofd); + driver_data[ofd].input_event = driver_data[ofd].output_event; } + + /* this is used for spawned load modules, and is needed + * to properly uninstall them */ + if (exit_status) { + struct PmProgramInfo *info; + int install_handle_size; + union SIGNAL *sig; + PmStatus pm_status; + report_exit = erts_alloc(ERTS_ALC_T_PRT_REP_EXIT, + sizeof(ErtsSysReportExit)); + report_exit->next = report_exit_list; + report_exit->port = erts_drvport2id(port_num); + report_exit->pid = pid; + report_exit->ifd = (read_write & DO_READ) ? ifd : -1; + report_exit->ofd = (read_write & DO_WRITE) ? ofd : -1; + report_exit_list = report_exit; + report_exit->attach_event = + erl_drv_ose_event_alloc(ERTS_SIGNAL_OSE_DRV_ATTACH, pid, + resolve_signal, &driver_data[ifd].ifd); + + /* setup ifd and ofd report exit */ + driver_data[ifd].report_exit = report_exit; + driver_data[ofd].report_exit = report_exit; + + pm_status = ose_pm_program_info(pid, &info); + DEBUG_CHECK_RES(pm_status, PM_SUCCESS); + + install_handle_size = strlen(info->install_handle)+1; + driver_data[ifd].install_handle = driver_alloc(install_handle_size); + strcpy(driver_data[ifd].install_handle, + info->install_handle); + + free_buf((union SIGNAL **)&info); + + sig = alloc(sizeof(struct async), ERTS_SIGNAL_OSE_DRV_ATTACH); + sig->async.target = pid; + send(&sig, get_signal_proxy_pid()); + + /* this event will trigger when we receive an attach signal + * from the recently dead load module */ + (void)driver_select(port_num,report_exit->attach_event, DO_READ, 1); + } + else { + report_exit = NULL; + } + + /* the return value is the pointer to the driver_data struct we created + * in this function, it will be used in the drivers input + * and output functions */ + return (ErlDrvData)((!(read_write & DO_READ) && read_write & DO_WRITE) + ? &driver_data[ofd] + : &driver_data[ifd]); } static int spawn_init() @@ -772,9 +910,9 @@ static int spawn_init() return 1; } -static void init_fd_data(int fd, ErlDrvPort port_num) +static void +init_fd_data(int fd, ErlDrvPort port_num) { - fd_data[fd].buf = NULL; fd_data[fd].cpos = NULL; fd_data[fd].remain = 0; @@ -782,173 +920,109 @@ static void init_fd_data(int fd, ErlDrvPort port_num) fd_data[fd].psz = 0; } -static ErlDrvData spawn_start(ErlDrvPort port_num, - char* name, - SysDriverOpts* opts) +/* FIXME write a decent text on pipes on ose */ +static ErlDrvData +spawn_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts) { - - long res = 0; - - /* Have to implement for OSE */ - return (ErlDrvData)res; -} - -OS_PROCESS(fd_reader_process) { - union SIGNAL *sig; - PROCESS parent; - int fd; - byte *read_buf; - - SIGSELECT sigsel[] = {1,ERTS_SIGNAL_FD_DRV_CONFIG}; - -#ifdef ERTS_ENABLE_LOCK_COUNT - erts_lcnt_init(); -#endif - - sig = receive(sigsel); - - fd = sig->conf_async.fd; - - parent = sig->conf_async.parent; - free_buf(&sig); - -#ifdef ERTS_ENABLE_LOCK_CHECK - { - char buf[31]; - erts_snprintf(&buf[0], 31, "fd_reader %beu", fd); - erts_lc_set_thread_name(&buf[0]); + int ifd[2]; + int ofd[2]; + static uint32_t ticker = 0; + PmStatus pm_status; + OSDOMAIN domain = PM_NEW_DOMAIN; + PROCESS progpid, mainbid, mainpid; + char *handle = NULL; + struct PmProgramInfo *info; + char *args = NULL; + char *tmp_handle; + ErlDrvData res = (ErlDrvData)-1; + int handle_size; + char *ptr; + + /* handle arguments */ + ptr = strchr(name, ' '); + if (ptr != NULL) { + *ptr ='\0'; + ptr++; + args = ptr; } -#endif - - if (fd == 0) { - FILE *ffd = stdin; - (void)stdin; + else { + args = NULL; } - sigsel[1] = ERTS_SIGNAL_FD_DRV_ASYNC; - - read_buf = (byte *) erts_alloc(ERTS_ALC_T_SYS_READ_BUF, - ERTS_SYS_READ_BUF_SZ); - while (1) { - int errno_copy = errno; - ssize_t res; - res = read(fd, read_buf, ERTS_SYS_READ_BUF_SZ); - sig = alloc(sizeof(SysDriverAsyncSignal), ERTS_SIGNAL_FD_DRV_ASYNC); - sig->sys_async.buff = read_buf; - sig->sys_async.res = res; - if (res <= 0 && errno == EBADF) { - fprintf(stderr,"Could not read from input fd (fd %d/ errno %d/ res %d)\n", - fd, errno, res); - break; - } - if (errno != errno_copy) - sig->sys_async.errno_copy = errno; - else - sig->sys_async.errno_copy = -1; - sig->sys_async.type = fd; - send(&sig,parent); - /* Wait for acc from async_read */ - sig = receive(sigsel); - free_buf(&sig); + /* create an install handle */ + ptr = strrchr(name, '/'); + if (ptr != NULL) { + ptr++; + tmp_handle = ptr; + } + else { + tmp_handle = name; + } + handle_size = strlen(tmp_handle)+1; + handle_size += (ticker<10)?3:((ticker<100)?4:5); + + handle = driver_alloc(handle_size); + snprintf(handle, handle_size, "%s_%d", tmp_handle, ticker); + + do { + snprintf(handle, handle_size, "%s_%d", tmp_handle, ticker++); + pm_status = ose_pm_install_load_module(0, "ELF", name, handle, + 0, 0, NULL); + + } while (pm_status == PM_EINSTALL_HANDLE_ALREADY_INSTALLED); + DEBUG_CHECK_RES(pm_status, PM_SUCCESS); + + /* Create Program */ + pm_status = ose_pm_create_program(&domain, handle, 0, 0, + NULL, &progpid, &mainbid); + DEBUG_CHECK_RES(pm_status, PM_SUCCESS); + + /* Get the mainpid from the newly created program */ + pm_status = ose_pm_program_info(progpid, &info); + DEBUG_CHECK_RES(pm_status, PM_SUCCESS); + + mainpid = info->main_process; + free_buf ((union SIGNAL **)&info); + + /* pipevm needs to be started + * pipe will return 0 if success, -1 if not, + * errno will be set */ + if (pipe(ifd) != 0 || pipe(ofd) != 0) { + DEBUG_CHECK_RES(0, -1); + ASSERT(0); } - erts_free(ERTS_ALC_T_SYS_READ_BUF, read_buf); -} - -OS_PROCESS(fd_writer_process) { - union SIGNAL *sig; - PROCESS parent; - int fd; - SIGSELECT sigsel[] = { 1, ERTS_SIGNAL_FD_DRV_CONFIG, - ERTS_SIGNAL_FD_DRV_ASYNC }; - /* Only wait for config event with the fd which we are printing to */ - sig = receive(sigsel); + /* setup driver data */ + res = set_driver_data(port_num, ofd[0], ifd[1], opts->packet_bytes, + opts->read_write, 1 /* opts->exit_status */, progpid); - fd = sig->conf_async.fd; - parent = sig->conf_async.parent; - free_buf(&sig); + /* init the fd_data array for read/write */ + init_fd_data(ofd[0], port_num); + init_fd_data(ifd[1], port_num); -#ifdef ERTS_ENABLE_LOCK_COUNT - { - char buf[31]; - erts_snprintf(&buf[0], 31, "fd_writer %beu", fd); - erts_lc_set_thread_name(&buf[0]); + /* setup additional configurations + * for the spawned applications environment */ + if (args != NULL) { + set_env(progpid, "ARGV", args); } -#endif + set_env(mainbid, "EFS_RESOLVE_TMO", 0); + set_spawn_fd(ifd[0], 0, mainpid); + set_spawn_fd(ofd[1], 1, mainpid); + set_spawn_fd(ofd[1], 2, mainpid); - sigsel[0] = 2; - /* Why do I need these?!? */ - if (fd == 1) { - FILE* ffd = stdout; - (void)stdout; - } else if (fd == 2) { - FILE* ffd = stderr; - (void)stderr; - } + /* start the spawned program */ + pm_status = ose_pm_start_program(mainbid); + DEBUG_CHECK_RES(pm_status, PM_SUCCESS); - while (1) { - int errno_copy = errno; - int res; - SysIOVec *iov0; - SysIOVec *iov; - int iovlen; - int iovcnt; - int n = 0, i = 0, offset = 0; - size_t p; - /* fprintf(stderr,"0x%x: fd_writer, receive\n", current_process()); */ - sig = receive(sigsel); - /* size = sig->sys_async.res;*/ - if (sig->sig_no == ERTS_SIGNAL_FD_DRV_CONFIG) - return; - driver_pdl_lock(driver_data[fd].pdl); - - iov0 = driver_peekq(driver_data[fd].port_num, &iovlen); - - /* Calculate iovcnt */ - for (p = 0, iovcnt = 0; iovcnt < iovlen; - p += iov0[iovcnt++].iov_len) - ; - iov = driver_alloc(sizeof(SysIOVec) * iovcnt); - memcpy(iov, iov0, iovcnt * sizeof(SysIOVec)); - /* Let go of lock until we deque from original vector */ - driver_pdl_unlock(driver_data[fd].pdl); - - if (iovlen > 0) { - while(i < iovcnt) { - /* We only write 256 chars at the time in order to - not overflow the stdout process */ - if ((iov[i].iov_len-offset) > 256) { - res = write(fd, iov[i].iov_base+offset, 256); - if (res < 0) - break; - offset += res; - } else { - res = write(fd, iov[i].iov_base+offset, iov[i].iov_len-offset); - if (res < 0) - break; - offset = 0; - i++; - } - n += res; - } - if (res > 0) - res = n; - } else if (iovlen == 0) { - res = 0; - } else { /* Port has terminated */ - res = -1; - } - driver_free(iov); - - sig->sys_async.buff = NULL; - sig->sys_async.res = res; - if (errno != errno_copy) - sig->sys_async.errno_copy = errno; - else - sig->sys_async.errno_copy = -1; - sig->sys_async.type = fd; - send(&sig, parent); + /* close unused fd's */ + close(ifd[0]); + close(ofd[1]); + + if (handle) { + driver_free(handle); } + + return (ErlDrvData)res; } #define FD_DEF_HEIGHT 24 @@ -974,13 +1048,13 @@ static ErlDrvSSizeT fd_control(ErlDrvData drv_data, char *buf, ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen) { - int fd = (int)(long)drv_data; + struct driver_data *data = (struct driver_data *)drv_data; char resbuff[2*sizeof(Uint32)]; switch (command) { case FD_CTRL_OP_GET_WINSIZE: { Uint32 w,h; - if (fd_get_window_size(fd,&w,&h)) + if (fd_get_window_size(data->ifd,&w,&h)) return 0; memcpy(resbuff,&w,sizeof(Uint32)); memcpy(resbuff+sizeof(Uint32),&h,sizeof(Uint32)); @@ -1008,7 +1082,7 @@ static ErlDrvData fd_start(ErlDrvPort port_num, char* name, if (opts->read_write & DO_WRITE) { init_fd_data(opts->ofd, port_num); } - res = (ErlDrvData)(long)set_driver_data(port_num, opts->ifd, opts->ofd, + res = set_driver_data(port_num, opts->ifd, opts->ofd, opts->packet_bytes, opts->read_write, 0, -1); CHLD_STAT_UNLOCK; @@ -1031,404 +1105,317 @@ static void clear_fd_data(int fd) static void nbio_stop_fd(ErlDrvPort prt, ErlDrvEvent ev) { - int fd; + int *fd; driver_select(prt,ev,DO_READ|DO_WRITE,0); - erl_drv_ose_event_fetch(ev, NULL, &fd); - clear_fd_data(fd); - SET_BLOCKING(fd); + erl_drv_ose_event_fetch(ev, NULL, NULL, (void **)&fd); + clear_fd_data(*fd); + SET_BLOCKING(*fd); } -static void fd_stop(ErlDrvData fd) /* Does not close the fds */ +static void fd_stop(ErlDrvData drv_data) /* Does not close the fds */ { - int ofd; + struct driver_data *data = (struct driver_data *)drv_data; - nbio_stop_fd(driver_data[(int)(long)fd].port_num, - driver_data[(int)(long)fd].in_sig_descr); - ofd = driver_data[(int)(long)fd].ofd; - if (ofd != (int)(long)fd && ofd != -1) - nbio_stop_fd(driver_data[(int)(long)fd].port_num, - driver_data[(int)(long)fd].out_sig_descr); + if (data->ofd != -1) { + if (data->ifd != data->ofd) { /* read and write */ + nbio_stop_fd(data->port_num, data->input_event); + nbio_stop_fd(data->port_num, data->output_event); + } + else { /* write only */ + nbio_stop_fd(data->port_num, data->output_event); + } + } + else { /* read only */ + nbio_stop_fd(data->port_num, data->input_event); + } } -/* Note that driver_data[fd].ifd == fd if the port was opened for reading, */ -/* otherwise (i.e. write only) driver_data[fd].ofd = fd. */ - -static void erl_stop(ErlDrvData fd) -{ - ErlDrvPort prt; - int ofd; - - prt = driver_data[(int)(long)fd].port_num; - nbio_stop_fd(prt, driver_data[(int)(long)fd].in_sig_descr); - - ofd = driver_data[(int)(long)fd].ofd; - if (ofd != (int)(long)fd && (int)(long)ofd != -1) - nbio_stop_fd(prt, driver_data[(int)(long)fd].out_sig_descr); - else - ofd = -1; - - CHLD_STAT_LOCK; - - /* Mark as unused. */ - driver_data[(int)(long)fd].pid = -1; - - CHLD_STAT_UNLOCK; - /* SMP note: Close has to be last thing done (open file descriptors work - as locks on driver_data[] entries) */ - driver_select(prt, driver_data[(int)(long)fd].in_sig_descr, - ERL_DRV_USE, 0); /* close(fd); */ - if (ofd >= 0) { - driver_select(prt, driver_data[(int)(long)fd].out_sig_descr, - ERL_DRV_USE, 0); /* close(ofd); */ - } -} - -static void outputv(ErlDrvData e, ErlIOVec* ev) +static void erl_stop(ErlDrvData drv_data) { - int fd = (int)(long)e; - ErlDrvPort ix = driver_data[fd].port_num; - int pb = driver_data[fd].packet_bytes; - ErlDrvSizeT sz; - char lb[4]; - char* lbp; - ErlDrvSizeT len = ev->size; - - /* (len > ((unsigned long)-1 >> (4-pb)*8)) */ - /* if (pb >= 0 && (len & (((ErlDrvSizeT)1 << (pb*8))) - 1) != len) {*/ - if (((pb == 2) && (len > 0xffff)) || (pb == 1 && len > 0xff)) { - driver_failure_posix(ix, EINVAL); - return; /* -1; */ - } - /* Handles 0 <= pb <= 4 only */ - put_int32((Uint32) len, lb); - lbp = lb + (4-pb); - - ev->iov[0].iov_base = lbp; - ev->iov[0].iov_len = pb; - ev->size += pb; - driver_pdl_lock(driver_data[fd].pdl); - if ((sz = driver_sizeq(ix)) > 0) { - /* fprintf(stderr,"0x%x: outputv, enq\n", current_process()); */ - driver_enqv(ix, ev, 0); - if (sz + ev->size >= (1 << 13)) - set_busy_port(ix, 1); - driver_pdl_unlock(driver_data[fd].pdl); - } - else { - union SIGNAL *sig; - /* fprintf(stderr,"0x%x: outputv, enq+sel\n", current_process()); */ - driver_enqv(ix, ev, 0); /* n is the skip value */ - driver_pdl_unlock(driver_data[fd].pdl); - driver_select(ix, driver_data[fd].out_sig_descr, - ERL_DRV_WRITE|ERL_DRV_USE, 1); - sig = alloc(sizeof(SysDriverAsyncSignal),ERTS_SIGNAL_FD_DRV_ASYNC); - sig->sys_async.type = fd; - sig->sys_async.res = pb+len; - send(&sig,driver_data[fd].out_proc); - } - /* return 0;*/ -} - - -static void output(ErlDrvData e, char* buf, ErlDrvSizeT len) + struct driver_data *data = (struct driver_data *)drv_data; + + CHLD_STAT_LOCK; + data->pid = -1; + CHLD_STAT_UNLOCK; + + if (data->ofd != -1) { + if (data->ifd != data->ofd) { /* read and write */ + nbio_stop_fd(data->port_num, data->input_event); + nbio_stop_fd(data->port_num, data->output_event); + driver_select(data->port_num, data->input_event, ERL_DRV_USE, 0); + driver_select(data->port_num, data->output_event, ERL_DRV_USE, 0); + } + else { /* write only */ + nbio_stop_fd(data->port_num, data->output_event); + driver_select(data->port_num, data->output_event, ERL_DRV_USE, 0); + } + } + else { /* read only */ + nbio_stop_fd(data->port_num, data->input_event); + driver_select(data->port_num, data->input_event, ERL_DRV_USE, 0); + } + close(data->ifd); + close(data->ofd); +} + +/* The parameter e is a pointer to the driver_data structure + * related to the fd to be used as output */ +static void output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len) { - int fd = (int)(long)e; - ErlDrvPort ix = driver_data[fd].port_num; - int pb = driver_data[fd].packet_bytes; - int ofd = driver_data[fd].ofd; ErlDrvSizeT sz; char lb[4]; char* lbp; -#if 0 - struct iovec iv[2]; -#endif + struct driver_data *data = (struct driver_data *)drv_data; - /* (len > ((unsigned long)-1 >> (4-pb)*8)) */ - if (((pb == 2) && (len > 0xffff)) || (pb == 1 && len > 0xff)) { - driver_failure_posix(ix, EINVAL); + if (((data->packet_bytes == 2) && + (len > 0xffff)) || (data->packet_bytes == 1 && len > 0xff)) { + driver_failure_posix(data->port_num, EINVAL); return; /* -1; */ } put_int32(len, lb); - lbp = lb + (4-pb); - - driver_pdl_lock(driver_data[fd].pdl); - if ((sz = driver_sizeq(ix)) > 0) { - /* fprintf(stderr,"0x%x: output, enq\n", current_process()); */ - driver_enq(ix, lbp, pb); - driver_enq(ix, buf, len); - driver_pdl_unlock(driver_data[fd].pdl); - if (sz + len + pb >= (1 << 13)) - set_busy_port(ix, 1); + lbp = lb + (4-(data->packet_bytes)); + + if ((sz = driver_sizeq(data->port_num)) > 0) { + driver_enq(data->port_num, lbp, data->packet_bytes); + driver_enq(data->port_num, buf, len); + if (sz + len + data->packet_bytes >= (1 << 13)) + set_busy_port(data->port_num, 1); } else { - union SIGNAL *sig; - /* fprintf(stderr,"0x%x: output, enq+select\n", current_process()); */ -#if 0 - iv[0].iov_base = lbp; - iv[0].iov_len = pb; /* should work for pb=0 */ - iv[1].iov_base = buf; - iv[1].iov_len = len; -#endif - driver_enq(ix, lbp, pb); - driver_enq(ix, buf, len); - driver_pdl_unlock(driver_data[fd].pdl); - driver_select(ix, driver_data[ofd].out_sig_descr, + driver_enq(data->port_num, buf, len); /* n is the skip value */ + + driver_select(data->port_num, data->output_event, ERL_DRV_WRITE|ERL_DRV_USE, 1); - sig = alloc(sizeof(SysDriverAsyncSignal),ERTS_SIGNAL_FD_DRV_ASYNC); - sig->sys_async.type = fd; - sig->sys_async.res = pb+len; - send(&sig,driver_data[fd].out_proc); + + WRITE_AIO(data->ofd, len, buf); } return; /* 0; */ } +/* This function is being run when we in recieve + * either a read of 0 bytes, or the attach signal from a dying + * spawned load module */ static int port_inp_failure(ErlDrvPort port_num, ErlDrvEvent ready_fd, int res) /* Result: 0 (eof) or -1 (error) */ { - int err = errno; - int fd; - + int *fd; + SIGSELECT sig_no; ASSERT(res <= 0); - (void) driver_select(port_num, ready_fd, ERL_DRV_READ|ERL_DRV_WRITE, 0); - erl_drv_ose_event_fetch(ready_fd,NULL,&fd); - clear_fd_data(fd); - if (res == 0) { - if (driver_data[fd].report_exit) { - CHLD_STAT_LOCK; - - if (driver_data[fd].alive) { - /* - * We have eof and want to report exit status, but the process - * hasn't exited yet. When it does report_exit_status() will - * driver_select() this fd which will make sure that we get - * back here with driver_data[ready_fd].alive == 0 and - * driver_data[ready_fd].status set. - */ - CHLD_STAT_UNLOCK; - return 0; - } - else { - int status = driver_data[fd].status; - CHLD_STAT_UNLOCK; - -#if 0 /*ose we should find something for these statuses*/ - /* We need not be prepared for stopped/continued processes. */ - if (WIFSIGNALED(status)) - status = 128 + WTERMSIG(status); - else - status = WEXITSTATUS(status); -#endif - driver_report_exit(driver_data[fd].port_num, status); - } - } - driver_failure_eof(port_num); - } else { - driver_failure_posix(port_num, err); + + erl_drv_ose_event_fetch(ready_fd,&sig_no, NULL, (void **)&fd); + + /* As we need to handle two signals, we do this in two steps */ + if (driver_data[*fd].alive) { + report_exit_status(driver_data[*fd].report_exit, 0); /* status? */ + } + else { + clear_fd_data(*fd); + driver_report_exit(driver_data[*fd].port_num, driver_data[*fd].status); + /* As we do not really know if the spawn has crashed or exited nicely + * we do not check the result status of the following call.. FIXME + * can we handle this in a better way? */ + ose_pm_uninstall_load_module(driver_data[*fd].install_handle); + driver_free(driver_data[*fd].install_handle); + driver_free((void *)driver_data[*fd].aiocb.aio_buf); + + close(*fd); } - return 0; -} -static int async_read(ErlDrvEvent fd, byte *buff, int size) { - union SIGNAL *sigptr = erl_drv_ose_get_signal(fd); - int res = sigptr->sys_async.res; - if (res > 0) - memcpy(buff,sigptr->sys_async.buff,sigptr->sys_async.res); - errno = sigptr->sys_async.errno_copy; - send(&sigptr,sender(&sigptr)); - ASSERT(erl_drv_ose_get_signal(fd) == NULL); - return res; + return 0; } -/* fd is the drv_data that is returned from the */ -/* initial start routine */ -/* ready_fd is the descriptor that is ready to read */ - -static void ready_input(ErlDrvData e, ErlDrvEvent ready_fd) +/* The parameter e is a pointer to the driver_data structure + * related to the fd to be used as output. + * ready_fd is the event that triggered this call to ready_input */ +static void ready_input(ErlDrvData drv_data, ErlDrvEvent ready_fd) { - int fd = (int)(long)e; - ErlDrvPort port_num; - int packet_bytes; int res; Uint h; + char *buf; + union SIGNAL *sig; + struct driver_data *data = (struct driver_data *)drv_data; - port_num = driver_data[fd].port_num; - packet_bytes = driver_data[fd].packet_bytes; + sig = erl_drv_ose_get_signal(ready_fd); + ASSERT(sig); - if (packet_bytes == 0) { - byte *read_buf = (byte *) erts_alloc(ERTS_ALC_T_SYS_READ_BUF, - ERTS_SYS_READ_BUF_SZ); - res = async_read(ready_fd, read_buf, ERTS_SYS_READ_BUF_SZ); - if (res < 0) { - if ((errno != EINTR) && (errno != ERRNO_BLOCK)) - port_inp_failure(port_num, ready_fd, res); - } - else if (res == 0) - port_inp_failure(port_num, ready_fd, res); - else - driver_output(port_num, (char*) read_buf, res); - erts_free(ERTS_ALC_T_SYS_READ_BUF, (void *) read_buf); - } - else if (fd_data[fd].remain > 0) { /* We try to read the remainder */ - /* space is allocated in buf */ - res = async_read(ready_fd, (byte*)fd_data[fd].cpos, - fd_data[fd].remain); - if (res < 0) { - if ((errno != EINTR) && (errno != ERRNO_BLOCK)) - port_inp_failure(port_num, ready_fd, res); - } - else if (res == 0) { - port_inp_failure(port_num, ready_fd, res); - } - else if (res == fd_data[fd].remain) { /* we're done */ - driver_output(port_num, fd_data[fd].buf, - fd_data[fd].sz); - clear_fd_data(fd); - } - else { /* if (res < fd_data[ready_fd].remain) */ - fd_data[fd].cpos += res; - fd_data[fd].remain -= res; - } - } - else if (fd_data[fd].remain == 0) { /* clean fd */ - byte *read_buf = (byte *) erts_alloc(ERTS_ALC_T_SYS_READ_BUF, - ERTS_SYS_READ_BUF_SZ); - /* We make one read attempt and see what happens */ - res = async_read(ready_fd, read_buf, ERTS_SYS_READ_BUF_SZ); - if (res < 0) { - if ((errno != EINTR) && (errno != ERRNO_BLOCK)) - port_inp_failure(port_num, ready_fd, res); - } - else if (res == 0) { /* eof */ - port_inp_failure(port_num, ready_fd, res); - } - else if (res < packet_bytes - fd_data[fd].psz) { - memcpy(fd_data[fd].pbuf+fd_data[fd].psz, - read_buf, res); - fd_data[fd].psz += res; - } - else { /* if (res >= packet_bytes) */ - unsigned char* cpos = read_buf; - int bytes_left = res; - - while (1) { - int psz = fd_data[fd].psz; - char* pbp = fd_data[fd].pbuf + psz; - - while(bytes_left && (psz < packet_bytes)) { - *pbp++ = *cpos++; - bytes_left--; - psz++; - } - - if (psz < packet_bytes) { - fd_data[fd].psz = psz; - break; - } - fd_data[fd].psz = 0; - - switch (packet_bytes) { - case 1: h = get_int8(fd_data[fd].pbuf); break; - case 2: h = get_int16(fd_data[fd].pbuf); break; - case 4: h = get_int32(fd_data[fd].pbuf); break; - default: ASSERT(0); return; /* -1; */ - } - - if (h <= (bytes_left)) { - driver_output(port_num, (char*) cpos, h); - cpos += h; - bytes_left -= h; - continue; - } - else { /* The last message we got was split */ - char *buf = erts_alloc_fnf(ERTS_ALC_T_FD_ENTRY_BUF, h); - if (!buf) { - errno = ENOMEM; - port_inp_failure(port_num, ready_fd, -1); - } - else { - erts_smp_atomic_add_nob(&sys_misc_mem_sz, h); - sys_memcpy(buf, cpos, bytes_left); - fd_data[fd].buf = buf; - fd_data[fd].sz = h; - fd_data[fd].remain = h - bytes_left; - fd_data[fd].cpos = buf + bytes_left; - } - break; - } - } - } - erts_free(ERTS_ALC_T_SYS_READ_BUF, (void *) read_buf); + + while (sig) { + /* If we've recieved an attach signal, we need to handle + * it in port_inp_failure */ + if (sig->sig_no == ERTS_SIGNAL_OSE_DRV_ATTACH) { + port_inp_failure(data->port_num, ready_fd, 0); + } + else { + res = sig->fm_read_reply.actual; + + if (data->packet_bytes == 0) { + if (res < 0) { + if ((errno != EINTR) && (errno != ERRNO_BLOCK)) { + port_inp_failure(data->port_num, ready_fd, res); + } + } + else if (res == 0) { + /* read of 0 bytes, eof, otherside of pipe is assumed dead */ + port_inp_failure(data->port_num, ready_fd, res); + } + else { + buf = driver_alloc(res); + memcpy(buf, (void *)data->aiocb.aio_buf, res); + driver_select(data->port_num, data->output_event, + ERL_DRV_WRITE|ERL_DRV_USE, 1); + driver_output(data->port_num, (char*) buf, res); + driver_free(buf); + } + } + /* We try to read the remainder */ + else if (fd_data[data->ifd].remain > 0) { + if (res < 0) { + if ((errno != EINTR) && (errno != ERRNO_BLOCK)) { + port_inp_failure(data->port_num, ready_fd, res); + } + } + else if (res == 0) { + port_inp_failure(data->port_num, ready_fd, res); + } + else if (res == fd_data[data->ifd].remain) { /* we're done */ + driver_output(data->port_num, + fd_data[data->ifd].buf, + fd_data[data->ifd].sz); + clear_fd_data(data->ifd); + } + else { /* if (res < fd_data[fd].remain) */ + fd_data[data->ifd].cpos += res; + fd_data[data->ifd].remain -= res; + } + } + else if (fd_data[data->ifd].remain == 0) { /* clean fd */ + if (res < 0) { + if ((errno != EINTR) && (errno != ERRNO_BLOCK)) { + port_inp_failure(data->port_num, ready_fd, res); + } + } + else if (res == 0) { /* eof */ + port_inp_failure(data->port_num, ready_fd, res); + } + else if (res < data->packet_bytes - fd_data[data->ifd].psz) { + memcpy(fd_data[data->ifd].pbuf+fd_data[data->ifd].psz, + (void *)data->aiocb.aio_buf, res); + fd_data[data->ifd].psz += res; + } + else { /* if (res >= packet_bytes) */ + unsigned char* cpos = (unsigned char*)data->aiocb.aio_buf; + int bytes_left = res; + + while (1) { + int psz = fd_data[data->ifd].psz; + char* pbp = fd_data[data->ifd].pbuf + psz; + + while (bytes_left && (psz < data->packet_bytes)) { + *pbp++ = *cpos++; + bytes_left--; + psz++; + } + + if (psz < data->packet_bytes) { + fd_data[data->ifd].psz = psz; + break; + } + fd_data[data->ifd].psz = 0; + + switch (data->packet_bytes) { + case 1: h = get_int8(fd_data[data->ifd].pbuf); break; + case 2: h = get_int16(fd_data[data->ifd].pbuf); break; + case 4: h = get_int32(fd_data[data->ifd].pbuf); break; + default: ASSERT(0); return; /* -1; */ + } + + if (h <= (bytes_left)) { + driver_output(data->port_num, (char*) cpos, h); + cpos += h; + bytes_left -= h; + continue; + } + else { /* The last message we got was split */ + char *buf = erts_alloc_fnf(ERTS_ALC_T_FD_ENTRY_BUF, h); + if (!buf) { + errno = ENOMEM; + port_inp_failure(data->port_num, ready_fd, -1); + } + else { + erts_smp_atomic_add_nob(&sys_misc_mem_sz, h); + sys_memcpy(buf, cpos, bytes_left); + fd_data[data->ifd].buf = buf; + fd_data[data->ifd].sz = h; + fd_data[data->ifd].remain = h - bytes_left; + fd_data[data->ifd].cpos = buf + bytes_left; + } + break; + } + } + } + } + + /* reset the read buffer and init next asynch read */ + DISPATCH_AIO(sig); + memset((void *)data->aiocb.aio_buf, 0, 255); + + if (res > 0) { + aio_read(&data->aiocb); + } + } + sig = erl_drv_ose_get_signal(ready_fd); } } -/* fd is the drv_data that is returned from the */ -/* initial start routine */ -/* ready_fd is the descriptor that is ready to read */ - -static void ready_output(ErlDrvData e, ErlDrvEvent ready_fd) +/* The parameter e is a pointer to the driver_data structure + * related to the fd to be used as output. + * ready_fd is the event that triggered this call to ready_input */ +static void ready_output(ErlDrvData drv_data, ErlDrvEvent ready_fd) { - int fd = (int)(long)e; - ErlDrvPort ix = driver_data[fd].port_num; - union SIGNAL *sigptr = erl_drv_ose_get_signal(ready_fd); - ssize_t n; - struct iovec* iv; - int vsize; - - while (sigptr != NULL) { - - driver_pdl_lock(driver_data[fd].pdl); - if ((iv = (struct iovec*) driver_peekq(ix, &vsize)) == NULL) { - /* fprintf(stderr,"0x%x: ready_output, unselect\n", current_process()); */ - driver_pdl_unlock(driver_data[fd].pdl); - driver_select(ix, ready_fd, ERL_DRV_WRITE, 0); - set_busy_port(ix, 0); - free_buf(&sigptr); - if ((sigptr = erl_drv_ose_get_signal(ready_fd)) == NULL) - return; /* 0; */ - continue; + SysIOVec *iov; + int vlen; + int res; + union SIGNAL *sig; + struct driver_data *data = (struct driver_data *)drv_data; + + sig = erl_drv_ose_get_signal(ready_fd); + ASSERT(sig); + + while (sig != NULL) { + if (sig->fm_write_reply.actual <= 0) { + int status; + + status = efs_status_to_errno(sig->fm_write_reply.status); + driver_select(data->port_num, ready_fd, ERL_DRV_WRITE, 0); + DISPATCH_AIO(sig); + FREE_AIO(sig->fm_write_reply.buffer); + + driver_failure_posix(data->port_num, status); } - driver_pdl_unlock(driver_data[fd].pdl); - n = sigptr->sys_async.res; - if (n < 0) { - if (errno == ERRNO_BLOCK || errno == EINTR) { - /* fprintf(stderr,"0x%x: ready_output, send to %x\n", current_process(),driver_data[fd].out_proc);*/ - send(&sigptr,driver_data[fd].out_proc); - if ((sigptr = erl_drv_ose_get_signal(ready_fd)) == NULL) - return; /* 0; */ - continue; - } else { - int res = sigptr->sys_async.errno_copy; - /* fprintf(stderr,"0x%x: ready_output, error\n", current_process()); */ - free_buf(&sigptr); - driver_select(ix, ready_fd, ERL_DRV_WRITE, 0); - driver_failure_posix(ix, res); - if ((sigptr = erl_drv_ose_get_signal(ready_fd)) == NULL) - return; /* -1; */ - continue; - } - } else { - int remain; - driver_pdl_lock(driver_data[fd].pdl); - if ((remain = driver_deq(driver_data[fd].port_num, n)) == -1) - abort(); - /* fprintf(stderr, "0x%x: ready_output, %d to %x, remain %d\n", current_process(), - n, driver_data[fd].out_proc, remain); */ - driver_pdl_unlock(driver_data[fd].pdl); - if (remain != 0) - send(&sigptr, driver_data[fd].out_proc); - else - continue; + else { /* written bytes > 0 */ + iov = driver_peekq(data->port_num, &vlen); + if (vlen > 0) { + DISPATCH_AIO(sig); + FREE_AIO(sig->fm_write_reply.buffer); + res = driver_deq(data->port_num, iov[0].iov_len); + if (res > 0) { + iov = driver_peekq(data->port_num, &vlen); + WRITE_AIO(data->ofd, iov[0].iov_len, iov[0].iov_base); + } + } } - sigptr = erl_drv_ose_get_signal(ready_fd); - } - return; /* 0; */ + sig = erl_drv_ose_get_signal(ready_fd); + } } -static void stop_select(ErlDrvEvent fd, void* _) +static void stop_select(ErlDrvEvent ready_fd, void* _) { - close((int)fd); + int *fd; + erl_drv_ose_event_fetch(ready_fd, NULL, NULL, (void **)&fd); + erl_drv_ose_event_free(ready_fd); + close(*fd); } @@ -1690,42 +1677,16 @@ erl_debug(char* fmt, ...) static ERTS_INLINE void report_exit_status(ErtsSysReportExit *rep, int status) { - Port *pp; -#ifdef ERTS_SMP - CHLD_STAT_UNLOCK; - pp = erts_thr_id2port_sflgs(rep->port, - ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); - CHLD_STAT_LOCK; -#else - pp = erts_id2port_sflgs(rep->port, - NULL, - 0, - ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP); -#endif - if (pp) { - if (rep->ifd >= 0) { - driver_data[rep->ifd].alive = 0; - driver_data[rep->ifd].status = status; - (void) driver_select(ERTS_Port2ErlDrvPort(pp), - rep->in_sig_descr, - (ERL_DRV_READ|ERL_DRV_USE), - 1); - } - if (rep->ofd >= 0) { - driver_data[rep->ofd].alive = 0; - driver_data[rep->ofd].status = status; - (void) driver_select(ERTS_Port2ErlDrvPort(pp), - rep->out_sig_descr, - (ERL_DRV_WRITE|ERL_DRV_USE), - 1); - } -#ifdef ERTS_SMP - erts_thr_port_release(pp); -#else - erts_port_release(pp); -#endif - } - erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep); + if (rep->ifd >= 0) { + driver_data[rep->ifd].alive = 0; + driver_data[rep->ifd].status = status; + } + if (rep->ofd >= 0) { + driver_data[rep->ofd].alive = 0; + driver_data[rep->ofd].status = status; + } + + erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep); } #define ERTS_REPORT_EXIT_STATUS report_exit_status |