aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJonas Karlsson <[email protected]>2014-02-21 14:03:00 +0100
committerLukas Larsson <[email protected]>2014-02-24 15:16:09 +0100
commit06e55b6f2ac30c95717532a259a6148226f63b24 (patch)
tree4f522b0db240e82e955c2169a9372606c7ab5fcb
parent4a6850e522b91eb009ddd0ed9d9f542f1baf1bee (diff)
downloadotp-06e55b6f2ac30c95717532a259a6148226f63b24.tar.gz
otp-06e55b6f2ac30c95717532a259a6148226f63b24.tar.bz2
otp-06e55b6f2ac30c95717532a259a6148226f63b24.zip
ose: Updating fd_driver and spawn_driver for OSE
-rw-r--r--erts/emulator/sys/ose/sys.c1339
1 files changed, 650 insertions, 689 deletions
diff --git a/erts/emulator/sys/ose/sys.c b/erts/emulator/sys/ose/sys.c
index 88dbd7fcf8..c892cc69c7 100644
--- a/erts/emulator/sys/ose/sys.c
+++ b/erts/emulator/sys/ose/sys.c
@@ -49,6 +49,9 @@
#include "unistd.h"
#include "efs.h"
#include "erl_printf.h"
+#include "aio.h"
+#include "pm.h"
+#include "fcntl.h"
/* Set the define to 1 to get some logging */
#if 0
@@ -60,6 +63,7 @@
extern char **environ;
static erts_smp_rwmtx_t environ_rwmtx;
+static PROCESS sig_proxy_pid = 0;
#define MAX_VSIZE 16 /* Max number of entries allowed in an I/O
* vector sock_sendv().
@@ -80,31 +84,44 @@ static erts_smp_rwmtx_t environ_rwmtx;
typedef struct ErtsSysReportExit_ ErtsSysReportExit;
struct ErtsSysReportExit_ {
- ErtsSysReportExit *next;
- Eterm port;
- int pid;
- int ifd;
- int ofd;
- ErlDrvEvent in_sig_descr;
- ErlDrvEvent out_sig_descr;
+ ErtsSysReportExit *next;
+ Eterm port;
+ int pid;
+ int ifd;
+ int ofd;
+ ErlDrvEvent attach_event;
+ ErlDrvEvent input_event;
+ ErlDrvEvent output_event;
};
/* This data is shared by these drivers - initialized by spawn_init() */
static struct driver_data {
- ErlDrvPort port_num;
- int ofd, packet_bytes;
- ErtsSysReportExit *report_exit;
- int pid;
- int alive;
- int status;
- ErlDrvEvent in_sig_descr;
- ErlDrvEvent out_sig_descr;
- PROCESS in_proc;
- PROCESS out_proc;
- ErlDrvPDL pdl;
+ ErlDrvPort port_num;
+ int ofd;
+ int ifd;
+ int packet_bytes;
+ ErtsSysReportExit *report_exit;
+ int pid;
+ int alive;
+ int status;
+ ErlDrvEvent input_event;
+ ErlDrvEvent output_event;
+ struct aiocb aiocb;
+ FmHandle handle;
+ char *install_handle;
} *driver_data; /* indexed by fd */
+struct async {
+ SIGSELECT signo;
+ ErlDrvTermData port;
+ ErlDrvTermData proc;
+ PROCESS spid;
+ PROCESS target;
+ Uint32 ref;
+};
+
static ErtsSysReportExit *report_exit_list;
+static ERTS_INLINE void report_exit_status(ErtsSysReportExit *rep, int status);
extern int driver_interrupt(int, int);
extern void do_break(void);
@@ -157,6 +174,66 @@ erts_mtx_t chld_stat_mtx;
static volatile int children_died;
#endif
+#define SET_AIO(REQ,FD,SIZE,BUFF) \
+ memset(&(REQ),0,sizeof(REQ)); \
+ (REQ).aio_fildes = FD; \
+ (REQ).aio_offset = FM_POSITION_CURRENT; \
+ (REQ).aio_nbytes = SIZE; \
+ (REQ).aio_buf = BUFF; \
+ (REQ).aio_sigevent.sigev_notify = SIGEV_NONE
+
+/* the first sizeof(struct aiocb *) bytes of the write buffer
+ * will contain the pointer to the aiocb struct, this needs
+ * to be freed between asynchronous writes.
+ * A write of 0 bytes is ignored. */
+#define WRITE_AIO(FD,SIZE,BUFF) do { \
+ if (SIZE > 0) { \
+ struct aiocb *write_req = driver_alloc(sizeof(struct aiocb)); \
+ char *write_buff = driver_alloc((sizeof(char)*SIZE)+1+ \
+ (sizeof(struct aiocb *))); \
+ *(struct aiocb **)write_buff = (struct aiocb *)write_req; \
+ write_buff += sizeof(struct aiocb *); \
+ memcpy(write_buff,BUFF,SIZE+1); \
+ SET_AIO(*write_req,FD,SIZE,write_buff); \
+ aio_write(write_req); \
+ } \
+} while(0)
+
+/* free the write_buffer and write_req
+ * created in the WRITE_AIO() request macro */
+#define FREE_AIO(ptr) do { \
+ struct aiocb *aiocb_ptr; \
+ char *buffer_ptr; \
+ aiocb_ptr = *(struct aiocb **)((ptr)-sizeof(struct aiocb *)); \
+ buffer_ptr = (((char*)ptr)-sizeof(struct aiocb *)); \
+ driver_free(aiocb_ptr); \
+ driver_free(buffer_ptr); \
+} while(0)
+
+/* When we have several schedulers, we need to make sure
+ * that scheduler issuing aio_dispatch() is the owner on the signal */
+#define DISPATCH_AIO(sig) do { \
+ restore(sig); \
+ aio_dispatch(sig); \
+ } while(0)
+
+
+/* debug print macros */
+#define DEBUG_RES 0
+
+#ifdef DEBUG_RES
+#define DEBUG_CHECK_RES(actual, expected) \
+ do { \
+ if (actual != expected ) { \
+ ramlog_printf("Result check failed" \
+ " got: 0x%08x expected:0x%08x\nat: %s:%d\n", \
+ actual, expected, __FILE__, __LINE__); \
+ abort(); /* This might perhaps be too harsh? */ \
+ } \
+ } while(0)
+#else
+#define DEBUG_CHECK_RES
+#endif
static struct fd_data {
char pbuf[4]; /* hold partial packet bytes */
@@ -191,6 +268,7 @@ static struct termios initial_tty_mode;
static int replace_intr = 0;
/* assume yes initially, ttsl_init will clear it */
int using_oldshell = 1;
+static PROCESS get_signal_proxy_pid(void);
static void
init_check_io(void)
@@ -540,28 +618,13 @@ void fini_getenv_state(GETENV_STATE *state)
/************************** Port I/O *******************************/
-
-
/* I. Common stuff */
-typedef struct SysDriverAsyncSignal_ {
- SIGSELECT sig_no;
- int type;
- byte *buff;
- ssize_t res;
- int errno_copy;
-} SysDriverAsyncSignal;
-
-typedef struct SysDriverConfSignal_ {
- SIGSELECT sig_no;
- int fd;
- PROCESS parent;
-} SysDriverConfSignal;
-
union SIGNAL {
SIGSELECT sig_no;
- SysDriverAsyncSignal sys_async;
- SysDriverConfSignal conf_async;
+ struct FmReadPtr fm_read_reply;
+ struct FmWritePtr fm_write_reply;
+ struct async async;
};
/* II. The spawn/fd drivers */
@@ -582,14 +645,42 @@ static void erl_stop(ErlDrvData);
static void ready_input(ErlDrvData, ErlDrvEvent);
static void ready_output(ErlDrvData, ErlDrvEvent);
static void output(ErlDrvData, char*, ErlDrvSizeT);
-static void outputv(ErlDrvData, ErlIOVec*);
static void stop_select(ErlDrvEvent, void*);
-static ErlDrvOseEventId resolve_signal(union SIGNAL* sig) {
- return sig->sig_no == ERTS_SIGNAL_FD_DRV_ASYNC ? sig->sys_async.type : -1;
+
+static PROCESS
+get_signal_proxy_pid(void) {
+ union SIGNAL *sig;
+ SIGSELECT any_sig[] = {0};
+
+ if (!sig_proxy_pid) {
+ sig = alloc(sizeof(union SIGNAL), ERTS_SIGNAL_OSE_DRV_ATTACH);
+ hunt("ose_signal_driver_proxy", 0, NULL, &sig);
+ sig = receive(any_sig);
+ sig_proxy_pid = sender(&sig);
+ free_buf(&sig);
+ }
+ ASSERT(sig_proxy_pid);
+ return sig_proxy_pid;
}
-OS_PROCESS(fd_writer_process);
-OS_PROCESS(fd_reader_process);
+static ErlDrvOseEventId
+resolve_signal(union SIGNAL* sig) {
+ switch(sig->sig_no) {
+
+ case FM_READ_PTR_REPLY:
+ return (ErlDrvOseEventId)sig->fm_read_reply.handle;
+
+ case FM_WRITE_PTR_REPLY:
+ return (ErlDrvOseEventId)sig->fm_write_reply.handle;
+
+ case ERTS_SIGNAL_OSE_DRV_ATTACH:
+ return (ErlDrvOseEventId)sig->async.target;
+
+ default:
+ break;
+ }
+ return (ErlDrvOseEventId)-1;
+}
struct erl_drv_entry spawn_driver_entry = {
spawn_init,
@@ -627,7 +718,7 @@ struct erl_drv_entry fd_driver_entry = {
NULL,
fd_control,
NULL,
- outputv,
+ NULL,
NULL, /* ready_async */
NULL, /* flush */
NULL, /* call */
@@ -641,120 +732,167 @@ struct erl_drv_entry fd_driver_entry = {
stop_select
};
-static int set_driver_data(ErlDrvPort port_num,
+static void
+set_spawn_fd(int local_fd, int remote_fd, PROCESS remote_pid) {
+ PROCESS vm_pid;
+ FmHandle handle;
+ char env_val[55];
+ char env_name[10];
+ EfsStatus efs_res;
+
+ /* get pid of pipevm and handle of chosen fd */
+ efs_res = efs_examine_fd(local_fd, FLIB_FD_VMPID, &vm_pid, 0);
+ DEBUG_CHECK_RES(efs_res, EFS_SUCCESS);
+
+ /* setup the file descriptor to buffer per line */
+ efs_res = efs_config_fd(local_fd, FLIB_FD_BUFMODE, FM_BUFF_LINE,
+ FLIB_FD_BUFSIZE, 80, 0);
+ DEBUG_CHECK_RES(efs_res, EFS_SUCCESS);
+
+ /* duplicate handle and set spawn pid owner */
+ efs_res = efs_dup_to(local_fd, remote_pid, &handle);
+ DEBUG_CHECK_RES(efs_res, EFS_SUCCESS);
+
+ sprintf(env_name, "FD%d", remote_fd);
+
+ /* Syntax of the environment variable:
+ * "FD#" "<pid of pipevm>,<handle>,<buffer mode>,<buff size>,<omode>" */
+ sprintf(env_val, "0x%lx,0x%lx,%lu,%lu,0x%x",
+ vm_pid, handle,
+ FM_BUFF_LINE, 80,
+ O_APPEND);
+
+ set_env(remote_pid, env_name, env_val);
+}
+
+static ErlDrvData
+set_driver_data(ErlDrvPort port_num,
int ifd,
int ofd,
int packet_bytes,
int read_write,
int exit_status,
- int pid)
+ PROCESS pid)
{
Port *prt;
ErtsSysReportExit *report_exit;
- union SIGNAL *sig;
-
- /*erts_fprintf(stderr, " %s / pid %x / ofd %d / ifd %d\n",
- __FUNCTION__, current_process(), ofd, ifd);*/
-
-
- if (!exit_status)
- report_exit = NULL;
- else {
- report_exit = erts_alloc(ERTS_ALC_T_PRT_REP_EXIT,
- sizeof(ErtsSysReportExit));
- report_exit->next = report_exit_list;
- report_exit->port = erts_drvport2id(port_num);
- report_exit->pid = pid;
- report_exit->ifd = read_write & DO_READ ? ifd : -1;
- report_exit->ofd = read_write & DO_WRITE ? ofd : -1;
-
- if (read_write & DO_READ)
- report_exit->in_sig_descr =
- erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC, ifd,
- resolve_signal);
- if (read_write & DO_WRITE)
- report_exit->out_sig_descr =
- erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC, ofd,
- resolve_signal);
-
- report_exit_list = report_exit;
- }
prt = erts_drvport2port(port_num);
- if (prt != ERTS_INVALID_ERL_DRV_PORT)
- prt->os_pid = pid;
+ if (prt != ERTS_INVALID_ERL_DRV_PORT) {
+ prt->os_pid = pid;
+ }
+ /* READ */
if (read_write & DO_READ) {
- driver_data[ifd].packet_bytes = packet_bytes;
- driver_data[ifd].port_num = port_num;
- driver_data[ifd].report_exit = report_exit;
- driver_data[ifd].pid = pid;
- driver_data[ifd].alive = 1;
- driver_data[ifd].status = 0;
- driver_data[ifd].in_sig_descr =
- erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC,ifd,
- resolve_signal);
-
- driver_data[ifd].in_proc = create_process(OS_PRI_PROC,"beam_fd_reader",
- fd_reader_process, 0x800,
- FD_PROC_PRI, 0, 0,
- NULL, 0, 0);
- efs_clone(driver_data[ifd].in_proc);
- sig = alloc(sizeof(SysDriverConfSignal), ERTS_SIGNAL_FD_DRV_CONFIG);
- sig->conf_async.fd = ifd;
- sig->conf_async.parent = current_process();
- send(&sig, driver_data[ifd].in_proc);
- start(driver_data[ifd].in_proc);
-
- if (read_write & DO_WRITE) {
- driver_data[ifd].ofd = ofd;
- driver_data[ifd].out_sig_descr =
- erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC,ofd,
- resolve_signal);
- driver_data[ifd].pdl = driver_pdl_create(port_num);
- driver_data[ifd].out_proc =
- create_process(OS_PRI_PROC,"beam_fd_writer",
- fd_writer_process, 0x800,
- FD_PROC_PRI, 0, 0, NULL, 0, 0);
- sig = alloc(sizeof(SysDriverConfSignal), ERTS_SIGNAL_FD_DRV_CONFIG);
- sig->conf_async.fd = ofd;
- sig->conf_async.parent = current_process();
- send(&sig, driver_data[ifd].out_proc);
- // efs_clone(driver_data[ifd].out_proc);
- start(driver_data[ifd].out_proc);
- if (ifd != ofd)
- driver_data[ofd] = driver_data[ifd]; /* structure copy */
- } else { /* DO_READ only */
- driver_data[ifd].ofd = -1;
- }
- (void) driver_select(port_num, driver_data[ifd].in_sig_descr,
+ efs_examine_fd(ifd, FLIB_FD_HANDLE, &driver_data[ifd].handle, 0);
+ driver_data[ifd].ifd = ifd;
+ driver_data[ifd].packet_bytes = packet_bytes;
+ driver_data[ifd].port_num = port_num;
+ driver_data[ifd].pid = pid;
+
+ /* async read struct */
+ memset(&driver_data[ifd].aiocb, 0, sizeof(struct aiocb));
+ driver_data[ifd].aiocb.aio_buf = driver_alloc(255);
+ driver_data[ifd].aiocb.aio_fildes = ifd;
+ driver_data[ifd].aiocb.aio_nbytes = 255;
+
+ driver_data[ifd].alive = 1;
+ driver_data[ifd].status = 0;
+ driver_data[ifd].input_event =
+ erl_drv_ose_event_alloc(FM_READ_PTR_REPLY,
+ driver_data[ifd].handle, resolve_signal,
+ &driver_data[ifd].ifd);
+
+ /* READ & WRITE */
+ if (read_write & DO_WRITE) {
+ driver_data[ifd].ofd = ofd;
+ efs_examine_fd(ofd, FLIB_FD_HANDLE, &driver_data[ofd].handle, 0);
+
+ driver_data[ifd].output_event =
+ erl_drv_ose_event_alloc(FM_WRITE_PTR_REPLY,
+ driver_data[ofd].handle, resolve_signal,
+ &driver_data[ofd].ofd);
+ driver_data[ofd].pid = pid;
+ if (ifd != ofd) {
+ driver_data[ofd] = driver_data[ifd];
+ driver_data[ofd].aiocb.aio_buf = NULL;
+ }
+ }
+ else { /* READ ONLY */
+ driver_data[ifd].ofd = -1;
+ }
+
+ /* enable input event */
+ (void) driver_select(port_num, driver_data[ifd].input_event,
(ERL_DRV_READ | ERL_DRV_USE), 1);
- return(ifd);
- } else { /* DO_WRITE only */
- driver_data[ofd].packet_bytes = packet_bytes;
- driver_data[ofd].port_num = port_num;
- driver_data[ofd].report_exit = report_exit;
- driver_data[ofd].ofd = ofd;
- driver_data[ofd].pid = pid;
- driver_data[ofd].alive = 1;
- driver_data[ofd].status = 0;
- driver_data[ofd].in_sig_descr =
- erl_drv_ose_event_alloc(ERTS_SIGNAL_FD_DRV_ASYNC,ofd,
- resolve_signal);
- driver_data[ofd].out_sig_descr = driver_data[ofd].in_sig_descr;
- driver_data[ofd].out_proc =
- create_process(OS_PRI_PROC, "beam_fd_writer",
- fd_writer_process, 0x800,
- FD_PROC_PRI, 0, 0, NULL, 0, 0);
- sig = alloc(sizeof(SysDriverConfSignal), ERTS_SIGNAL_FD_DRV_CONFIG);
- sig->conf_async.fd = ofd;
- sig->conf_async.parent = current_process();
- send(&sig, driver_data[ofd].out_proc);
- start(driver_data[ofd].out_proc);
- //efs_clone(driver_data[ifd].out_proc);
- driver_data[ofd].pdl = driver_pdl_create(port_num);
- return(ofd);
+
+ aio_read(&driver_data[ifd].aiocb);
+ }
+ else { /* WRITE ONLY */
+ efs_examine_fd(ofd, FLIB_FD_HANDLE, &driver_data[ofd].handle, 0);
+ driver_data[ofd].packet_bytes = packet_bytes;
+ driver_data[ofd].port_num = port_num;
+ driver_data[ofd].ofd = ofd;
+ driver_data[ofd].pid = pid;
+ driver_data[ofd].alive = 1;
+ driver_data[ofd].status = 0;
+ driver_data[ofd].output_event =
+ erl_drv_ose_event_alloc(FM_WRITE_PTR_REPLY, driver_data[ofd].handle,
+ resolve_signal, &driver_data[ofd].ofd);
+ driver_data[ofd].input_event = driver_data[ofd].output_event;
}
+
+ /* this is used for spawned load modules, and is needed
+ * to properly uninstall them */
+ if (exit_status) {
+ struct PmProgramInfo *info;
+ int install_handle_size;
+ union SIGNAL *sig;
+ PmStatus pm_status;
+ report_exit = erts_alloc(ERTS_ALC_T_PRT_REP_EXIT,
+ sizeof(ErtsSysReportExit));
+ report_exit->next = report_exit_list;
+ report_exit->port = erts_drvport2id(port_num);
+ report_exit->pid = pid;
+ report_exit->ifd = (read_write & DO_READ) ? ifd : -1;
+ report_exit->ofd = (read_write & DO_WRITE) ? ofd : -1;
+ report_exit_list = report_exit;
+ report_exit->attach_event =
+ erl_drv_ose_event_alloc(ERTS_SIGNAL_OSE_DRV_ATTACH, pid,
+ resolve_signal, &driver_data[ifd].ifd);
+
+ /* setup ifd and ofd report exit */
+ driver_data[ifd].report_exit = report_exit;
+ driver_data[ofd].report_exit = report_exit;
+
+ pm_status = ose_pm_program_info(pid, &info);
+ DEBUG_CHECK_RES(pm_status, PM_SUCCESS);
+
+ install_handle_size = strlen(info->install_handle)+1;
+ driver_data[ifd].install_handle = driver_alloc(install_handle_size);
+ strcpy(driver_data[ifd].install_handle,
+ info->install_handle);
+
+ free_buf((union SIGNAL **)&info);
+
+ sig = alloc(sizeof(struct async), ERTS_SIGNAL_OSE_DRV_ATTACH);
+ sig->async.target = pid;
+ send(&sig, get_signal_proxy_pid());
+
+ /* this event will trigger when we receive an attach signal
+ * from the recently dead load module */
+ (void)driver_select(port_num,report_exit->attach_event, DO_READ, 1);
+ }
+ else {
+ report_exit = NULL;
+ }
+
+ /* the return value is the pointer to the driver_data struct we created
+ * in this function, it will be used in the drivers input
+ * and output functions */
+ return (ErlDrvData)((!(read_write & DO_READ) && read_write & DO_WRITE)
+ ? &driver_data[ofd]
+ : &driver_data[ifd]);
}
static int spawn_init()
@@ -772,9 +910,9 @@ static int spawn_init()
return 1;
}
-static void init_fd_data(int fd, ErlDrvPort port_num)
+static void
+init_fd_data(int fd, ErlDrvPort port_num)
{
-
fd_data[fd].buf = NULL;
fd_data[fd].cpos = NULL;
fd_data[fd].remain = 0;
@@ -782,173 +920,109 @@ static void init_fd_data(int fd, ErlDrvPort port_num)
fd_data[fd].psz = 0;
}
-static ErlDrvData spawn_start(ErlDrvPort port_num,
- char* name,
- SysDriverOpts* opts)
+/* FIXME write a decent text on pipes on ose */
+static ErlDrvData
+spawn_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
{
-
- long res = 0;
-
- /* Have to implement for OSE */
- return (ErlDrvData)res;
-}
-
-OS_PROCESS(fd_reader_process) {
- union SIGNAL *sig;
- PROCESS parent;
- int fd;
- byte *read_buf;
-
- SIGSELECT sigsel[] = {1,ERTS_SIGNAL_FD_DRV_CONFIG};
-
-#ifdef ERTS_ENABLE_LOCK_COUNT
- erts_lcnt_init();
-#endif
-
- sig = receive(sigsel);
-
- fd = sig->conf_async.fd;
-
- parent = sig->conf_async.parent;
- free_buf(&sig);
-
-#ifdef ERTS_ENABLE_LOCK_CHECK
- {
- char buf[31];
- erts_snprintf(&buf[0], 31, "fd_reader %beu", fd);
- erts_lc_set_thread_name(&buf[0]);
+ int ifd[2];
+ int ofd[2];
+ static uint32_t ticker = 0;
+ PmStatus pm_status;
+ OSDOMAIN domain = PM_NEW_DOMAIN;
+ PROCESS progpid, mainbid, mainpid;
+ char *handle = NULL;
+ struct PmProgramInfo *info;
+ char *args = NULL;
+ char *tmp_handle;
+ ErlDrvData res = (ErlDrvData)-1;
+ int handle_size;
+ char *ptr;
+
+ /* handle arguments */
+ ptr = strchr(name, ' ');
+ if (ptr != NULL) {
+ *ptr ='\0';
+ ptr++;
+ args = ptr;
}
-#endif
-
- if (fd == 0) {
- FILE *ffd = stdin;
- (void)stdin;
+ else {
+ args = NULL;
}
- sigsel[1] = ERTS_SIGNAL_FD_DRV_ASYNC;
-
- read_buf = (byte *) erts_alloc(ERTS_ALC_T_SYS_READ_BUF,
- ERTS_SYS_READ_BUF_SZ);
- while (1) {
- int errno_copy = errno;
- ssize_t res;
- res = read(fd, read_buf, ERTS_SYS_READ_BUF_SZ);
- sig = alloc(sizeof(SysDriverAsyncSignal), ERTS_SIGNAL_FD_DRV_ASYNC);
- sig->sys_async.buff = read_buf;
- sig->sys_async.res = res;
- if (res <= 0 && errno == EBADF) {
- fprintf(stderr,"Could not read from input fd (fd %d/ errno %d/ res %d)\n",
- fd, errno, res);
- break;
- }
- if (errno != errno_copy)
- sig->sys_async.errno_copy = errno;
- else
- sig->sys_async.errno_copy = -1;
- sig->sys_async.type = fd;
- send(&sig,parent);
- /* Wait for acc from async_read */
- sig = receive(sigsel);
- free_buf(&sig);
+ /* create an install handle */
+ ptr = strrchr(name, '/');
+ if (ptr != NULL) {
+ ptr++;
+ tmp_handle = ptr;
+ }
+ else {
+ tmp_handle = name;
+ }
+ handle_size = strlen(tmp_handle)+1;
+ handle_size += (ticker<10)?3:((ticker<100)?4:5);
+
+ handle = driver_alloc(handle_size);
+ snprintf(handle, handle_size, "%s_%d", tmp_handle, ticker);
+
+ do {
+ snprintf(handle, handle_size, "%s_%d", tmp_handle, ticker++);
+ pm_status = ose_pm_install_load_module(0, "ELF", name, handle,
+ 0, 0, NULL);
+
+ } while (pm_status == PM_EINSTALL_HANDLE_ALREADY_INSTALLED);
+ DEBUG_CHECK_RES(pm_status, PM_SUCCESS);
+
+ /* Create Program */
+ pm_status = ose_pm_create_program(&domain, handle, 0, 0,
+ NULL, &progpid, &mainbid);
+ DEBUG_CHECK_RES(pm_status, PM_SUCCESS);
+
+ /* Get the mainpid from the newly created program */
+ pm_status = ose_pm_program_info(progpid, &info);
+ DEBUG_CHECK_RES(pm_status, PM_SUCCESS);
+
+ mainpid = info->main_process;
+ free_buf ((union SIGNAL **)&info);
+
+ /* pipevm needs to be started
+ * pipe will return 0 if success, -1 if not,
+ * errno will be set */
+ if (pipe(ifd) != 0 || pipe(ofd) != 0) {
+ DEBUG_CHECK_RES(0, -1);
+ ASSERT(0);
}
- erts_free(ERTS_ALC_T_SYS_READ_BUF, read_buf);
-}
-
-OS_PROCESS(fd_writer_process) {
- union SIGNAL *sig;
- PROCESS parent;
- int fd;
- SIGSELECT sigsel[] = { 1, ERTS_SIGNAL_FD_DRV_CONFIG,
- ERTS_SIGNAL_FD_DRV_ASYNC };
- /* Only wait for config event with the fd which we are printing to */
- sig = receive(sigsel);
+ /* setup driver data */
+ res = set_driver_data(port_num, ofd[0], ifd[1], opts->packet_bytes,
+ opts->read_write, 1 /* opts->exit_status */, progpid);
- fd = sig->conf_async.fd;
- parent = sig->conf_async.parent;
- free_buf(&sig);
+ /* init the fd_data array for read/write */
+ init_fd_data(ofd[0], port_num);
+ init_fd_data(ifd[1], port_num);
-#ifdef ERTS_ENABLE_LOCK_COUNT
- {
- char buf[31];
- erts_snprintf(&buf[0], 31, "fd_writer %beu", fd);
- erts_lc_set_thread_name(&buf[0]);
+ /* setup additional configurations
+ * for the spawned applications environment */
+ if (args != NULL) {
+ set_env(progpid, "ARGV", args);
}
-#endif
+ set_env(mainbid, "EFS_RESOLVE_TMO", 0);
+ set_spawn_fd(ifd[0], 0, mainpid);
+ set_spawn_fd(ofd[1], 1, mainpid);
+ set_spawn_fd(ofd[1], 2, mainpid);
- sigsel[0] = 2;
- /* Why do I need these?!? */
- if (fd == 1) {
- FILE* ffd = stdout;
- (void)stdout;
- } else if (fd == 2) {
- FILE* ffd = stderr;
- (void)stderr;
- }
+ /* start the spawned program */
+ pm_status = ose_pm_start_program(mainbid);
+ DEBUG_CHECK_RES(pm_status, PM_SUCCESS);
- while (1) {
- int errno_copy = errno;
- int res;
- SysIOVec *iov0;
- SysIOVec *iov;
- int iovlen;
- int iovcnt;
- int n = 0, i = 0, offset = 0;
- size_t p;
- /* fprintf(stderr,"0x%x: fd_writer, receive\n", current_process()); */
- sig = receive(sigsel);
- /* size = sig->sys_async.res;*/
- if (sig->sig_no == ERTS_SIGNAL_FD_DRV_CONFIG)
- return;
- driver_pdl_lock(driver_data[fd].pdl);
-
- iov0 = driver_peekq(driver_data[fd].port_num, &iovlen);
-
- /* Calculate iovcnt */
- for (p = 0, iovcnt = 0; iovcnt < iovlen;
- p += iov0[iovcnt++].iov_len)
- ;
- iov = driver_alloc(sizeof(SysIOVec) * iovcnt);
- memcpy(iov, iov0, iovcnt * sizeof(SysIOVec));
- /* Let go of lock until we deque from original vector */
- driver_pdl_unlock(driver_data[fd].pdl);
-
- if (iovlen > 0) {
- while(i < iovcnt) {
- /* We only write 256 chars at the time in order to
- not overflow the stdout process */
- if ((iov[i].iov_len-offset) > 256) {
- res = write(fd, iov[i].iov_base+offset, 256);
- if (res < 0)
- break;
- offset += res;
- } else {
- res = write(fd, iov[i].iov_base+offset, iov[i].iov_len-offset);
- if (res < 0)
- break;
- offset = 0;
- i++;
- }
- n += res;
- }
- if (res > 0)
- res = n;
- } else if (iovlen == 0) {
- res = 0;
- } else { /* Port has terminated */
- res = -1;
- }
- driver_free(iov);
-
- sig->sys_async.buff = NULL;
- sig->sys_async.res = res;
- if (errno != errno_copy)
- sig->sys_async.errno_copy = errno;
- else
- sig->sys_async.errno_copy = -1;
- sig->sys_async.type = fd;
- send(&sig, parent);
+ /* close unused fd's */
+ close(ifd[0]);
+ close(ofd[1]);
+
+ if (handle) {
+ driver_free(handle);
}
+
+ return (ErlDrvData)res;
}
#define FD_DEF_HEIGHT 24
@@ -974,13 +1048,13 @@ static ErlDrvSSizeT fd_control(ErlDrvData drv_data,
char *buf, ErlDrvSizeT len,
char **rbuf, ErlDrvSizeT rlen)
{
- int fd = (int)(long)drv_data;
+ struct driver_data *data = (struct driver_data *)drv_data;
char resbuff[2*sizeof(Uint32)];
switch (command) {
case FD_CTRL_OP_GET_WINSIZE:
{
Uint32 w,h;
- if (fd_get_window_size(fd,&w,&h))
+ if (fd_get_window_size(data->ifd,&w,&h))
return 0;
memcpy(resbuff,&w,sizeof(Uint32));
memcpy(resbuff+sizeof(Uint32),&h,sizeof(Uint32));
@@ -1008,7 +1082,7 @@ static ErlDrvData fd_start(ErlDrvPort port_num, char* name,
if (opts->read_write & DO_WRITE) {
init_fd_data(opts->ofd, port_num);
}
- res = (ErlDrvData)(long)set_driver_data(port_num, opts->ifd, opts->ofd,
+ res = set_driver_data(port_num, opts->ifd, opts->ofd,
opts->packet_bytes,
opts->read_write, 0, -1);
CHLD_STAT_UNLOCK;
@@ -1031,404 +1105,317 @@ static void clear_fd_data(int fd)
static void nbio_stop_fd(ErlDrvPort prt, ErlDrvEvent ev)
{
- int fd;
+ int *fd;
driver_select(prt,ev,DO_READ|DO_WRITE,0);
- erl_drv_ose_event_fetch(ev, NULL, &fd);
- clear_fd_data(fd);
- SET_BLOCKING(fd);
+ erl_drv_ose_event_fetch(ev, NULL, NULL, (void **)&fd);
+ clear_fd_data(*fd);
+ SET_BLOCKING(*fd);
}
-static void fd_stop(ErlDrvData fd) /* Does not close the fds */
+static void fd_stop(ErlDrvData drv_data) /* Does not close the fds */
{
- int ofd;
+ struct driver_data *data = (struct driver_data *)drv_data;
- nbio_stop_fd(driver_data[(int)(long)fd].port_num,
- driver_data[(int)(long)fd].in_sig_descr);
- ofd = driver_data[(int)(long)fd].ofd;
- if (ofd != (int)(long)fd && ofd != -1)
- nbio_stop_fd(driver_data[(int)(long)fd].port_num,
- driver_data[(int)(long)fd].out_sig_descr);
+ if (data->ofd != -1) {
+ if (data->ifd != data->ofd) { /* read and write */
+ nbio_stop_fd(data->port_num, data->input_event);
+ nbio_stop_fd(data->port_num, data->output_event);
+ }
+ else { /* write only */
+ nbio_stop_fd(data->port_num, data->output_event);
+ }
+ }
+ else { /* read only */
+ nbio_stop_fd(data->port_num, data->input_event);
+ }
}
-/* Note that driver_data[fd].ifd == fd if the port was opened for reading, */
-/* otherwise (i.e. write only) driver_data[fd].ofd = fd. */
-
-static void erl_stop(ErlDrvData fd)
-{
- ErlDrvPort prt;
- int ofd;
-
- prt = driver_data[(int)(long)fd].port_num;
- nbio_stop_fd(prt, driver_data[(int)(long)fd].in_sig_descr);
-
- ofd = driver_data[(int)(long)fd].ofd;
- if (ofd != (int)(long)fd && (int)(long)ofd != -1)
- nbio_stop_fd(prt, driver_data[(int)(long)fd].out_sig_descr);
- else
- ofd = -1;
-
- CHLD_STAT_LOCK;
-
- /* Mark as unused. */
- driver_data[(int)(long)fd].pid = -1;
-
- CHLD_STAT_UNLOCK;
- /* SMP note: Close has to be last thing done (open file descriptors work
- as locks on driver_data[] entries) */
- driver_select(prt, driver_data[(int)(long)fd].in_sig_descr,
- ERL_DRV_USE, 0); /* close(fd); */
- if (ofd >= 0) {
- driver_select(prt, driver_data[(int)(long)fd].out_sig_descr,
- ERL_DRV_USE, 0); /* close(ofd); */
- }
-}
-
-static void outputv(ErlDrvData e, ErlIOVec* ev)
+static void erl_stop(ErlDrvData drv_data)
{
- int fd = (int)(long)e;
- ErlDrvPort ix = driver_data[fd].port_num;
- int pb = driver_data[fd].packet_bytes;
- ErlDrvSizeT sz;
- char lb[4];
- char* lbp;
- ErlDrvSizeT len = ev->size;
-
- /* (len > ((unsigned long)-1 >> (4-pb)*8)) */
- /* if (pb >= 0 && (len & (((ErlDrvSizeT)1 << (pb*8))) - 1) != len) {*/
- if (((pb == 2) && (len > 0xffff)) || (pb == 1 && len > 0xff)) {
- driver_failure_posix(ix, EINVAL);
- return; /* -1; */
- }
- /* Handles 0 <= pb <= 4 only */
- put_int32((Uint32) len, lb);
- lbp = lb + (4-pb);
-
- ev->iov[0].iov_base = lbp;
- ev->iov[0].iov_len = pb;
- ev->size += pb;
- driver_pdl_lock(driver_data[fd].pdl);
- if ((sz = driver_sizeq(ix)) > 0) {
- /* fprintf(stderr,"0x%x: outputv, enq\n", current_process()); */
- driver_enqv(ix, ev, 0);
- if (sz + ev->size >= (1 << 13))
- set_busy_port(ix, 1);
- driver_pdl_unlock(driver_data[fd].pdl);
- }
- else {
- union SIGNAL *sig;
- /* fprintf(stderr,"0x%x: outputv, enq+sel\n", current_process()); */
- driver_enqv(ix, ev, 0); /* n is the skip value */
- driver_pdl_unlock(driver_data[fd].pdl);
- driver_select(ix, driver_data[fd].out_sig_descr,
- ERL_DRV_WRITE|ERL_DRV_USE, 1);
- sig = alloc(sizeof(SysDriverAsyncSignal),ERTS_SIGNAL_FD_DRV_ASYNC);
- sig->sys_async.type = fd;
- sig->sys_async.res = pb+len;
- send(&sig,driver_data[fd].out_proc);
- }
- /* return 0;*/
-}
-
-
-static void output(ErlDrvData e, char* buf, ErlDrvSizeT len)
+ struct driver_data *data = (struct driver_data *)drv_data;
+
+ CHLD_STAT_LOCK;
+ data->pid = -1;
+ CHLD_STAT_UNLOCK;
+
+ if (data->ofd != -1) {
+ if (data->ifd != data->ofd) { /* read and write */
+ nbio_stop_fd(data->port_num, data->input_event);
+ nbio_stop_fd(data->port_num, data->output_event);
+ driver_select(data->port_num, data->input_event, ERL_DRV_USE, 0);
+ driver_select(data->port_num, data->output_event, ERL_DRV_USE, 0);
+ }
+ else { /* write only */
+ nbio_stop_fd(data->port_num, data->output_event);
+ driver_select(data->port_num, data->output_event, ERL_DRV_USE, 0);
+ }
+ }
+ else { /* read only */
+ nbio_stop_fd(data->port_num, data->input_event);
+ driver_select(data->port_num, data->input_event, ERL_DRV_USE, 0);
+ }
+ close(data->ifd);
+ close(data->ofd);
+}
+
+/* The parameter e is a pointer to the driver_data structure
+ * related to the fd to be used as output */
+static void output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
{
- int fd = (int)(long)e;
- ErlDrvPort ix = driver_data[fd].port_num;
- int pb = driver_data[fd].packet_bytes;
- int ofd = driver_data[fd].ofd;
ErlDrvSizeT sz;
char lb[4];
char* lbp;
-#if 0
- struct iovec iv[2];
-#endif
+ struct driver_data *data = (struct driver_data *)drv_data;
- /* (len > ((unsigned long)-1 >> (4-pb)*8)) */
- if (((pb == 2) && (len > 0xffff)) || (pb == 1 && len > 0xff)) {
- driver_failure_posix(ix, EINVAL);
+ if (((data->packet_bytes == 2) &&
+ (len > 0xffff)) || (data->packet_bytes == 1 && len > 0xff)) {
+ driver_failure_posix(data->port_num, EINVAL);
return; /* -1; */
}
put_int32(len, lb);
- lbp = lb + (4-pb);
-
- driver_pdl_lock(driver_data[fd].pdl);
- if ((sz = driver_sizeq(ix)) > 0) {
- /* fprintf(stderr,"0x%x: output, enq\n", current_process()); */
- driver_enq(ix, lbp, pb);
- driver_enq(ix, buf, len);
- driver_pdl_unlock(driver_data[fd].pdl);
- if (sz + len + pb >= (1 << 13))
- set_busy_port(ix, 1);
+ lbp = lb + (4-(data->packet_bytes));
+
+ if ((sz = driver_sizeq(data->port_num)) > 0) {
+ driver_enq(data->port_num, lbp, data->packet_bytes);
+ driver_enq(data->port_num, buf, len);
+ if (sz + len + data->packet_bytes >= (1 << 13))
+ set_busy_port(data->port_num, 1);
}
else {
- union SIGNAL *sig;
- /* fprintf(stderr,"0x%x: output, enq+select\n", current_process()); */
-#if 0
- iv[0].iov_base = lbp;
- iv[0].iov_len = pb; /* should work for pb=0 */
- iv[1].iov_base = buf;
- iv[1].iov_len = len;
-#endif
- driver_enq(ix, lbp, pb);
- driver_enq(ix, buf, len);
- driver_pdl_unlock(driver_data[fd].pdl);
- driver_select(ix, driver_data[ofd].out_sig_descr,
+ driver_enq(data->port_num, buf, len); /* n is the skip value */
+
+ driver_select(data->port_num, data->output_event,
ERL_DRV_WRITE|ERL_DRV_USE, 1);
- sig = alloc(sizeof(SysDriverAsyncSignal),ERTS_SIGNAL_FD_DRV_ASYNC);
- sig->sys_async.type = fd;
- sig->sys_async.res = pb+len;
- send(&sig,driver_data[fd].out_proc);
+
+ WRITE_AIO(data->ofd, len, buf);
}
return; /* 0; */
}
+/* This function is being run when we in recieve
+ * either a read of 0 bytes, or the attach signal from a dying
+ * spawned load module */
static int port_inp_failure(ErlDrvPort port_num, ErlDrvEvent ready_fd, int res)
/* Result: 0 (eof) or -1 (error) */
{
- int err = errno;
- int fd;
-
+ int *fd;
+ SIGSELECT sig_no;
ASSERT(res <= 0);
- (void) driver_select(port_num, ready_fd, ERL_DRV_READ|ERL_DRV_WRITE, 0);
- erl_drv_ose_event_fetch(ready_fd,NULL,&fd);
- clear_fd_data(fd);
- if (res == 0) {
- if (driver_data[fd].report_exit) {
- CHLD_STAT_LOCK;
-
- if (driver_data[fd].alive) {
- /*
- * We have eof and want to report exit status, but the process
- * hasn't exited yet. When it does report_exit_status() will
- * driver_select() this fd which will make sure that we get
- * back here with driver_data[ready_fd].alive == 0 and
- * driver_data[ready_fd].status set.
- */
- CHLD_STAT_UNLOCK;
- return 0;
- }
- else {
- int status = driver_data[fd].status;
- CHLD_STAT_UNLOCK;
-
-#if 0 /*ose we should find something for these statuses*/
- /* We need not be prepared for stopped/continued processes. */
- if (WIFSIGNALED(status))
- status = 128 + WTERMSIG(status);
- else
- status = WEXITSTATUS(status);
-#endif
- driver_report_exit(driver_data[fd].port_num, status);
- }
- }
- driver_failure_eof(port_num);
- } else {
- driver_failure_posix(port_num, err);
+
+ erl_drv_ose_event_fetch(ready_fd,&sig_no, NULL, (void **)&fd);
+
+ /* As we need to handle two signals, we do this in two steps */
+ if (driver_data[*fd].alive) {
+ report_exit_status(driver_data[*fd].report_exit, 0); /* status? */
+ }
+ else {
+ clear_fd_data(*fd);
+ driver_report_exit(driver_data[*fd].port_num, driver_data[*fd].status);
+ /* As we do not really know if the spawn has crashed or exited nicely
+ * we do not check the result status of the following call.. FIXME
+ * can we handle this in a better way? */
+ ose_pm_uninstall_load_module(driver_data[*fd].install_handle);
+ driver_free(driver_data[*fd].install_handle);
+ driver_free((void *)driver_data[*fd].aiocb.aio_buf);
+
+ close(*fd);
}
- return 0;
-}
-static int async_read(ErlDrvEvent fd, byte *buff, int size) {
- union SIGNAL *sigptr = erl_drv_ose_get_signal(fd);
- int res = sigptr->sys_async.res;
- if (res > 0)
- memcpy(buff,sigptr->sys_async.buff,sigptr->sys_async.res);
- errno = sigptr->sys_async.errno_copy;
- send(&sigptr,sender(&sigptr));
- ASSERT(erl_drv_ose_get_signal(fd) == NULL);
- return res;
+ return 0;
}
-/* fd is the drv_data that is returned from the */
-/* initial start routine */
-/* ready_fd is the descriptor that is ready to read */
-
-static void ready_input(ErlDrvData e, ErlDrvEvent ready_fd)
+/* The parameter e is a pointer to the driver_data structure
+ * related to the fd to be used as output.
+ * ready_fd is the event that triggered this call to ready_input */
+static void ready_input(ErlDrvData drv_data, ErlDrvEvent ready_fd)
{
- int fd = (int)(long)e;
- ErlDrvPort port_num;
- int packet_bytes;
int res;
Uint h;
+ char *buf;
+ union SIGNAL *sig;
+ struct driver_data *data = (struct driver_data *)drv_data;
- port_num = driver_data[fd].port_num;
- packet_bytes = driver_data[fd].packet_bytes;
+ sig = erl_drv_ose_get_signal(ready_fd);
+ ASSERT(sig);
- if (packet_bytes == 0) {
- byte *read_buf = (byte *) erts_alloc(ERTS_ALC_T_SYS_READ_BUF,
- ERTS_SYS_READ_BUF_SZ);
- res = async_read(ready_fd, read_buf, ERTS_SYS_READ_BUF_SZ);
- if (res < 0) {
- if ((errno != EINTR) && (errno != ERRNO_BLOCK))
- port_inp_failure(port_num, ready_fd, res);
- }
- else if (res == 0)
- port_inp_failure(port_num, ready_fd, res);
- else
- driver_output(port_num, (char*) read_buf, res);
- erts_free(ERTS_ALC_T_SYS_READ_BUF, (void *) read_buf);
- }
- else if (fd_data[fd].remain > 0) { /* We try to read the remainder */
- /* space is allocated in buf */
- res = async_read(ready_fd, (byte*)fd_data[fd].cpos,
- fd_data[fd].remain);
- if (res < 0) {
- if ((errno != EINTR) && (errno != ERRNO_BLOCK))
- port_inp_failure(port_num, ready_fd, res);
- }
- else if (res == 0) {
- port_inp_failure(port_num, ready_fd, res);
- }
- else if (res == fd_data[fd].remain) { /* we're done */
- driver_output(port_num, fd_data[fd].buf,
- fd_data[fd].sz);
- clear_fd_data(fd);
- }
- else { /* if (res < fd_data[ready_fd].remain) */
- fd_data[fd].cpos += res;
- fd_data[fd].remain -= res;
- }
- }
- else if (fd_data[fd].remain == 0) { /* clean fd */
- byte *read_buf = (byte *) erts_alloc(ERTS_ALC_T_SYS_READ_BUF,
- ERTS_SYS_READ_BUF_SZ);
- /* We make one read attempt and see what happens */
- res = async_read(ready_fd, read_buf, ERTS_SYS_READ_BUF_SZ);
- if (res < 0) {
- if ((errno != EINTR) && (errno != ERRNO_BLOCK))
- port_inp_failure(port_num, ready_fd, res);
- }
- else if (res == 0) { /* eof */
- port_inp_failure(port_num, ready_fd, res);
- }
- else if (res < packet_bytes - fd_data[fd].psz) {
- memcpy(fd_data[fd].pbuf+fd_data[fd].psz,
- read_buf, res);
- fd_data[fd].psz += res;
- }
- else { /* if (res >= packet_bytes) */
- unsigned char* cpos = read_buf;
- int bytes_left = res;
-
- while (1) {
- int psz = fd_data[fd].psz;
- char* pbp = fd_data[fd].pbuf + psz;
-
- while(bytes_left && (psz < packet_bytes)) {
- *pbp++ = *cpos++;
- bytes_left--;
- psz++;
- }
-
- if (psz < packet_bytes) {
- fd_data[fd].psz = psz;
- break;
- }
- fd_data[fd].psz = 0;
-
- switch (packet_bytes) {
- case 1: h = get_int8(fd_data[fd].pbuf); break;
- case 2: h = get_int16(fd_data[fd].pbuf); break;
- case 4: h = get_int32(fd_data[fd].pbuf); break;
- default: ASSERT(0); return; /* -1; */
- }
-
- if (h <= (bytes_left)) {
- driver_output(port_num, (char*) cpos, h);
- cpos += h;
- bytes_left -= h;
- continue;
- }
- else { /* The last message we got was split */
- char *buf = erts_alloc_fnf(ERTS_ALC_T_FD_ENTRY_BUF, h);
- if (!buf) {
- errno = ENOMEM;
- port_inp_failure(port_num, ready_fd, -1);
- }
- else {
- erts_smp_atomic_add_nob(&sys_misc_mem_sz, h);
- sys_memcpy(buf, cpos, bytes_left);
- fd_data[fd].buf = buf;
- fd_data[fd].sz = h;
- fd_data[fd].remain = h - bytes_left;
- fd_data[fd].cpos = buf + bytes_left;
- }
- break;
- }
- }
- }
- erts_free(ERTS_ALC_T_SYS_READ_BUF, (void *) read_buf);
+
+ while (sig) {
+ /* If we've recieved an attach signal, we need to handle
+ * it in port_inp_failure */
+ if (sig->sig_no == ERTS_SIGNAL_OSE_DRV_ATTACH) {
+ port_inp_failure(data->port_num, ready_fd, 0);
+ }
+ else {
+ res = sig->fm_read_reply.actual;
+
+ if (data->packet_bytes == 0) {
+ if (res < 0) {
+ if ((errno != EINTR) && (errno != ERRNO_BLOCK)) {
+ port_inp_failure(data->port_num, ready_fd, res);
+ }
+ }
+ else if (res == 0) {
+ /* read of 0 bytes, eof, otherside of pipe is assumed dead */
+ port_inp_failure(data->port_num, ready_fd, res);
+ }
+ else {
+ buf = driver_alloc(res);
+ memcpy(buf, (void *)data->aiocb.aio_buf, res);
+ driver_select(data->port_num, data->output_event,
+ ERL_DRV_WRITE|ERL_DRV_USE, 1);
+ driver_output(data->port_num, (char*) buf, res);
+ driver_free(buf);
+ }
+ }
+ /* We try to read the remainder */
+ else if (fd_data[data->ifd].remain > 0) {
+ if (res < 0) {
+ if ((errno != EINTR) && (errno != ERRNO_BLOCK)) {
+ port_inp_failure(data->port_num, ready_fd, res);
+ }
+ }
+ else if (res == 0) {
+ port_inp_failure(data->port_num, ready_fd, res);
+ }
+ else if (res == fd_data[data->ifd].remain) { /* we're done */
+ driver_output(data->port_num,
+ fd_data[data->ifd].buf,
+ fd_data[data->ifd].sz);
+ clear_fd_data(data->ifd);
+ }
+ else { /* if (res < fd_data[fd].remain) */
+ fd_data[data->ifd].cpos += res;
+ fd_data[data->ifd].remain -= res;
+ }
+ }
+ else if (fd_data[data->ifd].remain == 0) { /* clean fd */
+ if (res < 0) {
+ if ((errno != EINTR) && (errno != ERRNO_BLOCK)) {
+ port_inp_failure(data->port_num, ready_fd, res);
+ }
+ }
+ else if (res == 0) { /* eof */
+ port_inp_failure(data->port_num, ready_fd, res);
+ }
+ else if (res < data->packet_bytes - fd_data[data->ifd].psz) {
+ memcpy(fd_data[data->ifd].pbuf+fd_data[data->ifd].psz,
+ (void *)data->aiocb.aio_buf, res);
+ fd_data[data->ifd].psz += res;
+ }
+ else { /* if (res >= packet_bytes) */
+ unsigned char* cpos = (unsigned char*)data->aiocb.aio_buf;
+ int bytes_left = res;
+
+ while (1) {
+ int psz = fd_data[data->ifd].psz;
+ char* pbp = fd_data[data->ifd].pbuf + psz;
+
+ while (bytes_left && (psz < data->packet_bytes)) {
+ *pbp++ = *cpos++;
+ bytes_left--;
+ psz++;
+ }
+
+ if (psz < data->packet_bytes) {
+ fd_data[data->ifd].psz = psz;
+ break;
+ }
+ fd_data[data->ifd].psz = 0;
+
+ switch (data->packet_bytes) {
+ case 1: h = get_int8(fd_data[data->ifd].pbuf); break;
+ case 2: h = get_int16(fd_data[data->ifd].pbuf); break;
+ case 4: h = get_int32(fd_data[data->ifd].pbuf); break;
+ default: ASSERT(0); return; /* -1; */
+ }
+
+ if (h <= (bytes_left)) {
+ driver_output(data->port_num, (char*) cpos, h);
+ cpos += h;
+ bytes_left -= h;
+ continue;
+ }
+ else { /* The last message we got was split */
+ char *buf = erts_alloc_fnf(ERTS_ALC_T_FD_ENTRY_BUF, h);
+ if (!buf) {
+ errno = ENOMEM;
+ port_inp_failure(data->port_num, ready_fd, -1);
+ }
+ else {
+ erts_smp_atomic_add_nob(&sys_misc_mem_sz, h);
+ sys_memcpy(buf, cpos, bytes_left);
+ fd_data[data->ifd].buf = buf;
+ fd_data[data->ifd].sz = h;
+ fd_data[data->ifd].remain = h - bytes_left;
+ fd_data[data->ifd].cpos = buf + bytes_left;
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ /* reset the read buffer and init next asynch read */
+ DISPATCH_AIO(sig);
+ memset((void *)data->aiocb.aio_buf, 0, 255);
+
+ if (res > 0) {
+ aio_read(&data->aiocb);
+ }
+ }
+ sig = erl_drv_ose_get_signal(ready_fd);
}
}
-/* fd is the drv_data that is returned from the */
-/* initial start routine */
-/* ready_fd is the descriptor that is ready to read */
-
-static void ready_output(ErlDrvData e, ErlDrvEvent ready_fd)
+/* The parameter e is a pointer to the driver_data structure
+ * related to the fd to be used as output.
+ * ready_fd is the event that triggered this call to ready_input */
+static void ready_output(ErlDrvData drv_data, ErlDrvEvent ready_fd)
{
- int fd = (int)(long)e;
- ErlDrvPort ix = driver_data[fd].port_num;
- union SIGNAL *sigptr = erl_drv_ose_get_signal(ready_fd);
- ssize_t n;
- struct iovec* iv;
- int vsize;
-
- while (sigptr != NULL) {
-
- driver_pdl_lock(driver_data[fd].pdl);
- if ((iv = (struct iovec*) driver_peekq(ix, &vsize)) == NULL) {
- /* fprintf(stderr,"0x%x: ready_output, unselect\n", current_process()); */
- driver_pdl_unlock(driver_data[fd].pdl);
- driver_select(ix, ready_fd, ERL_DRV_WRITE, 0);
- set_busy_port(ix, 0);
- free_buf(&sigptr);
- if ((sigptr = erl_drv_ose_get_signal(ready_fd)) == NULL)
- return; /* 0; */
- continue;
+ SysIOVec *iov;
+ int vlen;
+ int res;
+ union SIGNAL *sig;
+ struct driver_data *data = (struct driver_data *)drv_data;
+
+ sig = erl_drv_ose_get_signal(ready_fd);
+ ASSERT(sig);
+
+ while (sig != NULL) {
+ if (sig->fm_write_reply.actual <= 0) {
+ int status;
+
+ status = efs_status_to_errno(sig->fm_write_reply.status);
+ driver_select(data->port_num, ready_fd, ERL_DRV_WRITE, 0);
+ DISPATCH_AIO(sig);
+ FREE_AIO(sig->fm_write_reply.buffer);
+
+ driver_failure_posix(data->port_num, status);
}
- driver_pdl_unlock(driver_data[fd].pdl);
- n = sigptr->sys_async.res;
- if (n < 0) {
- if (errno == ERRNO_BLOCK || errno == EINTR) {
- /* fprintf(stderr,"0x%x: ready_output, send to %x\n", current_process(),driver_data[fd].out_proc);*/
- send(&sigptr,driver_data[fd].out_proc);
- if ((sigptr = erl_drv_ose_get_signal(ready_fd)) == NULL)
- return; /* 0; */
- continue;
- } else {
- int res = sigptr->sys_async.errno_copy;
- /* fprintf(stderr,"0x%x: ready_output, error\n", current_process()); */
- free_buf(&sigptr);
- driver_select(ix, ready_fd, ERL_DRV_WRITE, 0);
- driver_failure_posix(ix, res);
- if ((sigptr = erl_drv_ose_get_signal(ready_fd)) == NULL)
- return; /* -1; */
- continue;
- }
- } else {
- int remain;
- driver_pdl_lock(driver_data[fd].pdl);
- if ((remain = driver_deq(driver_data[fd].port_num, n)) == -1)
- abort();
- /* fprintf(stderr, "0x%x: ready_output, %d to %x, remain %d\n", current_process(),
- n, driver_data[fd].out_proc, remain); */
- driver_pdl_unlock(driver_data[fd].pdl);
- if (remain != 0)
- send(&sigptr, driver_data[fd].out_proc);
- else
- continue;
+ else { /* written bytes > 0 */
+ iov = driver_peekq(data->port_num, &vlen);
+ if (vlen > 0) {
+ DISPATCH_AIO(sig);
+ FREE_AIO(sig->fm_write_reply.buffer);
+ res = driver_deq(data->port_num, iov[0].iov_len);
+ if (res > 0) {
+ iov = driver_peekq(data->port_num, &vlen);
+ WRITE_AIO(data->ofd, iov[0].iov_len, iov[0].iov_base);
+ }
+ }
}
- sigptr = erl_drv_ose_get_signal(ready_fd);
- }
- return; /* 0; */
+ sig = erl_drv_ose_get_signal(ready_fd);
+ }
}
-static void stop_select(ErlDrvEvent fd, void* _)
+static void stop_select(ErlDrvEvent ready_fd, void* _)
{
- close((int)fd);
+ int *fd;
+ erl_drv_ose_event_fetch(ready_fd, NULL, NULL, (void **)&fd);
+ erl_drv_ose_event_free(ready_fd);
+ close(*fd);
}
@@ -1690,42 +1677,16 @@ erl_debug(char* fmt, ...)
static ERTS_INLINE void
report_exit_status(ErtsSysReportExit *rep, int status)
{
- Port *pp;
-#ifdef ERTS_SMP
- CHLD_STAT_UNLOCK;
- pp = erts_thr_id2port_sflgs(rep->port,
- ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
- CHLD_STAT_LOCK;
-#else
- pp = erts_id2port_sflgs(rep->port,
- NULL,
- 0,
- ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
-#endif
- if (pp) {
- if (rep->ifd >= 0) {
- driver_data[rep->ifd].alive = 0;
- driver_data[rep->ifd].status = status;
- (void) driver_select(ERTS_Port2ErlDrvPort(pp),
- rep->in_sig_descr,
- (ERL_DRV_READ|ERL_DRV_USE),
- 1);
- }
- if (rep->ofd >= 0) {
- driver_data[rep->ofd].alive = 0;
- driver_data[rep->ofd].status = status;
- (void) driver_select(ERTS_Port2ErlDrvPort(pp),
- rep->out_sig_descr,
- (ERL_DRV_WRITE|ERL_DRV_USE),
- 1);
- }
-#ifdef ERTS_SMP
- erts_thr_port_release(pp);
-#else
- erts_port_release(pp);
-#endif
- }
- erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep);
+ if (rep->ifd >= 0) {
+ driver_data[rep->ifd].alive = 0;
+ driver_data[rep->ifd].status = status;
+ }
+ if (rep->ofd >= 0) {
+ driver_data[rep->ofd].alive = 0;
+ driver_data[rep->ofd].status = status;
+ }
+
+ erts_free(ERTS_ALC_T_PRT_REP_EXIT, rep);
}
#define ERTS_REPORT_EXIT_STATUS report_exit_status