aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
authorJonas Karlsson <[email protected]>2014-03-21 17:09:16 +0100
committerLukas Larsson <[email protected]>2014-03-26 15:18:59 +0100
commitd28049c638f19de0f0f4c805d3efd004de417210 (patch)
tree239dcd38d61e0b6ef53a35c79b01b7495921ab00 /erts/emulator
parent8af4baec6e25febcc4457c85bd9079b672990c9c (diff)
downloadotp-d28049c638f19de0f0f4c805d3efd004de417210.tar.gz
otp-d28049c638f19de0f0f4c805d3efd004de417210.tar.bz2
otp-d28049c638f19de0f0f4c805d3efd004de417210.zip
ose: fix for packet_bytes in fd/spawn driver.
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/sys/ose/sys.c289
1 files changed, 155 insertions, 134 deletions
diff --git a/erts/emulator/sys/ose/sys.c b/erts/emulator/sys/ose/sys.c
index d58e90b352..414dfb2264 100644
--- a/erts/emulator/sys/ose/sys.c
+++ b/erts/emulator/sys/ose/sys.c
@@ -212,14 +212,13 @@ static volatile int children_died;
driver_free(buffer_ptr); \
} while(0)
-/* When we have several schedulers, we need to make sure
- * that scheduler issuing aio_dispatch() is the owner on the signal */
#define DISPATCH_AIO(sig) do { \
if (aio_dispatch(sig)) \
ramlog_printf("%s:%d: dispatch failed with %d\n", \
__FILE__,__LINE__,errno); \
} while(0)
+#define AIO_PIPE_SIZE 1024
/* debug print macros */
#define DEBUG_RES 0
@@ -748,7 +747,7 @@ resolve_signal(union SIGNAL* sig) {
struct erl_drv_entry spawn_driver_entry = {
spawn_init,
spawn_start,
- erl_stop,
+ NULL, /* erl_stop, */
output,
ready_input,
ready_output,
@@ -859,10 +858,9 @@ set_driver_data(ErlDrvPort port_num,
/* async read struct */
memset(&driver_data[ifd].aiocb, 0, sizeof(struct aiocb));
- driver_data[ifd].aiocb.aio_buf = driver_alloc(255);
+ driver_data[ifd].aiocb.aio_buf = driver_alloc(AIO_PIPE_SIZE);
driver_data[ifd].aiocb.aio_fildes = ifd;
- driver_data[ifd].aiocb.aio_nbytes = 255;
-
+ driver_data[ifd].aiocb.aio_nbytes = (packet_bytes?packet_bytes:AIO_PIPE_SIZE);
driver_data[ifd].alive = 1;
driver_data[ifd].status = 0;
driver_data[ifd].input_event =
@@ -995,7 +993,7 @@ spawn_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
{
int ifd[2];
int ofd[2];
- static uint32_t ticker = 0;
+ static uint32_t ticker = 1;
PmStatus pm_status;
OSDOMAIN domain = PM_NEW_DOMAIN;
PROCESS progpid, mainbid, mainpid;
@@ -1007,39 +1005,53 @@ spawn_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
int handle_size;
char *ptr;
- /* handle arguments */
- ptr = strchr(name, ' ');
- if (ptr != NULL) {
- *ptr ='\0';
- ptr++;
- args = ptr;
+
+ args = driver_alloc(strlen(name)+1);
+ strcpy(args, name);
+ /* We need to handle name in three parts
+ * - install handle (must be unique)
+ * - install binary (needed for ose_pm_install_load_module())
+ * - full path (as argument to the spawned applications env.var
+ */
+
+ /* full path including arguments */
+ args = driver_alloc(strlen(name)+1);
+ strcpy(args, name);
+
+ /* handle path */
+ tmp_handle = strrchr(name, '/');
+ if (tmp_handle == NULL) {
+ tmp_handle = name;
}
else {
- args = NULL;
+ tmp_handle++;
}
- /* create an install handle */
- ptr = strrchr(name, '/');
+ /* handle args */
+ ptr = strchr(tmp_handle, ' ');
if (ptr != NULL) {
- ptr++;
- tmp_handle = ptr;
+ *ptr = '\0';
+ handle_size = ptr - tmp_handle;
}
else {
- tmp_handle = name;
+ handle_size = strlen(name)+1;
}
- handle_size = strlen(tmp_handle)+1;
- handle_size += (ticker<10)?3:((ticker<100)?4:5);
+ /* make room for ticker */
+ handle_size += (ticker<10)?3:((ticker<100)?4:5);
handle = driver_alloc(handle_size);
- snprintf(handle, handle_size, "%s_%d", tmp_handle, ticker);
-
+
do {
- snprintf(handle, handle_size, "%s_%d", tmp_handle, ticker++);
+ snprintf(handle, handle_size, "%s_%d", tmp_handle, ticker);
pm_status = ose_pm_install_load_module(0, "ELF", name, handle,
0, 0, NULL);
-
+ ticker++;
} while (pm_status == PM_EINSTALL_HANDLE_ALREADY_INSTALLED);
- DEBUG_CHECK_RES(pm_status, PM_SUCCESS);
+
+ if (pm_status != PM_SUCCESS) {
+ errno = ENOSYS; /* FIXME add comment */
+ return ERL_DRV_ERROR_ERRNO;
+ }
/* Create Program */
pm_status = ose_pm_create_program(&domain, handle, 0, 0,
@@ -1212,17 +1224,13 @@ static void erl_stop(ErlDrvData drv_data)
if (data->ifd != data->ofd) { /* read and write */
nbio_stop_fd(data->port_num, data->input_event);
nbio_stop_fd(data->port_num, data->output_event);
- driver_select(data->port_num, data->input_event, ERL_DRV_USE, 0);
- driver_select(data->port_num, data->output_event, ERL_DRV_USE, 0);
}
else { /* write only */
nbio_stop_fd(data->port_num, data->output_event);
- driver_select(data->port_num, data->output_event, ERL_DRV_USE, 0);
}
}
else { /* read only */
nbio_stop_fd(data->port_num, data->input_event);
- driver_select(data->port_num, data->input_event, ERL_DRV_USE, 0);
}
close(data->ifd);
close(data->ofd);
@@ -1246,18 +1254,31 @@ static void output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
lbp = lb + (4-(data->packet_bytes));
if ((sz = driver_sizeq(data->port_num)) > 0) {
- driver_enq(data->port_num, lbp, data->packet_bytes);
- driver_enq(data->port_num, buf, len);
- if (sz + len + data->packet_bytes >= (1 << 13))
+ if (data->packet_bytes != 0) {
+ driver_enq(data->port_num, lbp, data->packet_bytes);
+ }
+ driver_enq(data->port_num, buf, len);
+
+ if (sz + len + data->packet_bytes >= (1 << 13))
set_busy_port(data->port_num, 1);
}
else {
- driver_enq(data->port_num, buf, len); /* n is the skip value */
-
+ char *pbbuf;
+ if (data->packet_bytes != 0) {
+ pbbuf = malloc(len + data->packet_bytes);
+ int i;
+ for (i = 0; i < data->packet_bytes; i++) {
+ *pbbuf++ = *lbp++;
+ }
+ strncpy(pbbuf, buf, len);
+ pbbuf -= data->packet_bytes;
+ }
driver_select(data->port_num, data->output_event,
ERL_DRV_WRITE|ERL_DRV_USE, 1);
-
- WRITE_AIO(data->ofd, len, buf);
+ WRITE_AIO(data->ofd,
+ (data->packet_bytes ? len+data->packet_bytes : len),
+ (data->packet_bytes ? pbbuf : buf));
+ if (data->packet_bytes != 0) free(pbbuf);
}
return; /* 0; */
}
@@ -1273,12 +1294,12 @@ static int port_inp_failure(ErlDrvPort port_num, ErlDrvEvent ready_fd, int res)
ASSERT(res <= 0);
erl_drv_ose_event_fetch(ready_fd,&sig_no, NULL, (void **)&fd);
-
/* As we need to handle two signals, we do this in two steps */
if (driver_data[*fd].alive) {
report_exit_status(driver_data[*fd].report_exit, 0); /* status? */
}
else {
+ driver_select(port_num,ready_fd,DO_READ|DO_WRITE,0);
clear_fd_data(*fd);
driver_report_exit(driver_data[*fd].port_num, driver_data[*fd].status);
/* As we do not really know if the spawn has crashed or exited nicely
@@ -1317,6 +1338,10 @@ static void ready_input(ErlDrvData drv_data, ErlDrvEvent ready_fd)
}
else {
res = sig->fm_read_reply.actual;
+ if (res == 0) {
+ port_inp_failure(data->port_num, ready_fd, res);
+ break;
+ }
if (data->packet_bytes == 0) {
if (res < 0) {
@@ -1327,6 +1352,7 @@ static void ready_input(ErlDrvData drv_data, ErlDrvEvent ready_fd)
else if (res == 0) {
/* read of 0 bytes, eof, otherside of pipe is assumed dead */
port_inp_failure(data->port_num, ready_fd, res);
+ break;
}
else {
buf = driver_alloc(res);
@@ -1336,103 +1362,92 @@ static void ready_input(ErlDrvData drv_data, ErlDrvEvent ready_fd)
driver_output(data->port_num, (char*) buf, res);
driver_free(buf);
}
+ /* clear the previous read */
+ memset(data->aiocb.aio_buf, 0, res);
+
+ /* issue a new read */
+ DISPATCH_AIO(sig);
+ aio_read(&data->aiocb);
}
- /* We try to read the remainder */
- else if (fd_data[data->ifd].remain > 0) {
- if (res < 0) {
- if ((errno != EINTR) && (errno != ERRNO_BLOCK)) {
- port_inp_failure(data->port_num, ready_fd, res);
+ else if (data->packet_bytes && fd_data[data->ifd].remain > 0) {
+ /* we've read a partial package, or a header */
+
+ if (res == fd_data[data->ifd].remain) { /* we are done! */
+ char *buf = data->aiocb.aio_buf;
+ int i;
+
+ /* do we have anything buffered? */
+ if (fd_data[data->ifd].buf != NULL) {
+ memcpy(fd_data[data->ifd].buf + fd_data[data->ifd].sz,
+ buf, res);
+ buf = fd_data[data->ifd].buf;
}
- }
- else if (res == 0) {
- port_inp_failure(data->port_num, ready_fd, res);
- }
- else if (res == fd_data[data->ifd].remain) { /* we're done */
- driver_output(data->port_num,
- fd_data[data->ifd].buf,
- fd_data[data->ifd].sz);
+
+ fd_data[data->ifd].sz += res;
+ driver_output(data->port_num, buf, (fd_data[data->ifd].sz>0?fd_data[data->ifd].sz:res));
clear_fd_data(data->ifd);
- }
- else { /* if (res < fd_data[fd].remain) */
- fd_data[data->ifd].cpos += res;
- fd_data[data->ifd].remain -= res;
- }
- }
- else if (fd_data[data->ifd].remain == 0) { /* clean fd */
- if (res < 0) {
- if ((errno != EINTR) && (errno != ERRNO_BLOCK)) {
- port_inp_failure(data->port_num, ready_fd, res);
+
+ /* clear the previous read */
+ memset(data->aiocb.aio_buf, 0, res);
+
+ /* issue a new read */
+ DISPATCH_AIO(sig);
+ data->aiocb.aio_nbytes = data->packet_bytes;
+
+ if (data->aiocb.aio_buf == NULL) {
+ port_inp_failure(data->port_num, ready_fd, -1);
}
+ aio_read(&data->aiocb);
}
- else if (res == 0) { /* eof */
- port_inp_failure(data->port_num, ready_fd, res);
- }
- else if (res < data->packet_bytes - fd_data[data->ifd].psz) {
- memcpy(fd_data[data->ifd].pbuf+fd_data[data->ifd].psz,
- (void *)data->aiocb.aio_buf, res);
- fd_data[data->ifd].psz += res;
- }
- else { /* if (res >= packet_bytes) */
- unsigned char* cpos = (unsigned char*)data->aiocb.aio_buf;
- int bytes_left = res;
-
- while (1) {
- int psz = fd_data[data->ifd].psz;
- char* pbp = fd_data[data->ifd].pbuf + psz;
-
- while (bytes_left && (psz < data->packet_bytes)) {
- *pbp++ = *cpos++;
- bytes_left--;
- psz++;
- }
-
- if (psz < data->packet_bytes) {
- fd_data[data->ifd].psz = psz;
- break;
- }
- fd_data[data->ifd].psz = 0;
-
- switch (data->packet_bytes) {
- case 1: h = get_int8(fd_data[data->ifd].pbuf); break;
- case 2: h = get_int16(fd_data[data->ifd].pbuf); break;
- case 4: h = get_int32(fd_data[data->ifd].pbuf); break;
- default: ASSERT(0); return; /* -1; */
- }
-
- if (h <= (bytes_left)) {
- driver_output(data->port_num, (char*) cpos, h);
- cpos += h;
- bytes_left -= h;
- continue;
- }
- else { /* The last message we got was split */
- char *buf = erts_alloc_fnf(ERTS_ALC_T_FD_ENTRY_BUF, h);
- if (!buf) {
- errno = ENOMEM;
- port_inp_failure(data->port_num, ready_fd, -1);
- }
- else {
- erts_smp_atomic_add_nob(&sys_misc_mem_sz, h);
- sys_memcpy(buf, cpos, bytes_left);
- fd_data[data->ifd].buf = buf;
- fd_data[data->ifd].sz = h;
- fd_data[data->ifd].remain = h - bytes_left;
- fd_data[data->ifd].cpos = buf + bytes_left;
- }
- break;
- }
+ else if(res < fd_data[data->ifd].remain) { /* received part of a package */
+ if (fd_data[data->ifd].sz == 0) {
+
+ fd_data[data->ifd].sz += res;
+ memcpy(fd_data[data->ifd].buf, data->aiocb.aio_buf, res);
+ fd_data[data->ifd].remain -= res;
+ }
+ else {
+ memcpy(fd_data[data->ifd].buf + fd_data[data->ifd].sz,
+ data->aiocb.aio_buf, res);
+ fd_data[data->ifd].sz += res;
+ fd_data[data->ifd].remain -= res;
}
+ /* clear the previous read */
+ memset(data->aiocb.aio_buf, 0, res);
+
+ /* issue a new read */
+ DISPATCH_AIO(sig);
+ data->aiocb.aio_nbytes = fd_data[data->ifd].remain;
+
+ if (data->aiocb.aio_buf == NULL) {
+ port_inp_failure(data->port_num, ready_fd, -1);
+ }
+ aio_read(&data->aiocb);
}
}
+ else if (data->packet_bytes && fd_data[data->ifd].remain == 0) { /* we've recieved a header */
+
+ /* analyze the header FIXME */
+ switch (data->packet_bytes) {
+ case 1: h = get_int8(data->aiocb.aio_buf); break;
+ case 2: h = get_int16(data->aiocb.aio_buf); break;
+ case 4: h = get_int32(data->aiocb.aio_buf); break;
+ }
- /* reset the read buffer and init next asynch read */
- DISPATCH_AIO(sig);
- memset((void *)data->aiocb.aio_buf, 0, 255);
+ fd_data[data->ifd].buf = erts_alloc_fnf(ERTS_ALC_T_FD_ENTRY_BUF, h + data->packet_bytes);
+ fd_data[data->ifd].remain = ((h + data->packet_bytes) - res);
- if (res > 0) {
- if (aio_read(&data->aiocb))
- ramlog_printf("%s:%d: aio_read(%d) failed with %d\n",
- __FILE__,__LINE__,data->ifd,errno);
+ /* clear the previous read */
+ memset(data->aiocb.aio_buf, 0, data->packet_bytes);
+
+ /* issue a new read */
+ DISPATCH_AIO(sig);
+ data->aiocb.aio_nbytes = h;
+
+ if (data->aiocb.aio_buf == NULL) {
+ port_inp_failure(data->port_num, ready_fd, -1);
+ }
+ aio_read(&data->aiocb);
}
}
sig = erl_drv_ose_get_signal(ready_fd);
@@ -1466,15 +1481,21 @@ static void ready_output(ErlDrvData drv_data, ErlDrvEvent ready_fd)
driver_failure_posix(data->port_num, status);
}
else { /* written bytes > 0 */
- DISPATCH_AIO(sig);
- res = driver_deq(data->port_num, sig->fm_write_reply.actual);
- FREE_AIO(sig->fm_write_reply.buffer);
- if (res == 0)
- set_busy_port(data->port_num, 0);
- else {
- iov = driver_peekq(data->port_num, &vlen);
- WRITE_AIO(data->ofd, iov[0].iov_len, iov[0].iov_base);
- }
+ iov = driver_peekq(data->port_num, &vlen);
+ if (vlen > 0) {
+ DISPATCH_AIO(sig);
+ FREE_AIO(sig->fm_write_reply.buffer);
+ res = driver_deq(data->port_num, iov[0].iov_len);
+ if (res > 0) {
+ iov = driver_peekq(data->port_num, &vlen);
+ WRITE_AIO(data->ofd, iov[0].iov_len, iov[0].iov_base);
+ }
+ }
+ else if (vlen == 0) {
+ DISPATCH_AIO(sig);
+ FREE_AIO(sig->fm_write_reply.buffer);
+ }
+
}
sig = erl_drv_ose_get_signal(ready_fd);
}