diff options
Diffstat (limited to 'erts/emulator/drivers')
-rw-r--r-- | erts/emulator/drivers/common/efile_drv.c | 102 | ||||
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 40 | ||||
-rw-r--r-- | erts/emulator/drivers/unix/unix_efile.c | 21 |
3 files changed, 96 insertions, 67 deletions
diff --git a/erts/emulator/drivers/common/efile_drv.c b/erts/emulator/drivers/common/efile_drv.c index 36ed108b76..a251b064da 100644 --- a/erts/emulator/drivers/common/efile_drv.c +++ b/erts/emulator/drivers/common/efile_drv.c @@ -156,11 +156,11 @@ static ErlDrvSysInfo sys_info; * DARWIN. The testcase t_sendfile_crashduring reproduces * this error when using +A 10. */ -#if !defined(DARWIN) -#define USE_THRDS_FOR_SENDFILE (sys_info.async_threads > 0) -#else +#if defined(__APPLE__) && defined(__MACH__) #define USE_THRDS_FOR_SENDFILE 0 -#endif /* !DARWIN */ +#else +#define USE_THRDS_FOR_SENDFILE (sys_info.async_threads > 0) +#endif /* defined(__APPLE__) && defined(__MACH__) */ @@ -259,6 +259,7 @@ static void file_stop_select(ErlDrvEvent event, void* _); enum e_timer {timer_idle, timer_again, timer_write}; #ifdef HAVE_SENDFILE enum e_sendfile {sending, not_sending}; +static void free_sendfile(void *data); #endif /* HAVE_SENDFILE */ struct t_data; @@ -445,6 +446,8 @@ struct t_data } fadvise; #ifdef HAVE_SENDFILE struct { + ErlDrvPort port; + ErlDrvPDL q_mtx; int out_fd; off_t offset; Uint64 nbytes; @@ -752,15 +755,6 @@ file_stop(ErlDrvData e) TRACE_C('p'); -#ifdef HAVE_SENDFILE - if (desc->sendfile_state == sending && !USE_THRDS_FOR_SENDFILE) { - driver_select(desc->port,(ErlDrvEvent)(long)desc->d->c.sendfile.out_fd, - ERL_DRV_WRITE|ERL_DRV_USE,0); - } else if (desc->sendfile_state == sending) { - SET_NONBLOCKING(desc->d->c.sendfile.out_fd); - } -#endif /* HAVE_SENDFILE */ - if (desc->fd != FILE_FD_INVALID) { do_close(desc->flags, desc->fd); desc->fd = FILE_FD_INVALID; @@ -1783,26 +1777,31 @@ static void invoke_sendfile(void *data) d->c.sendfile.written += nbytes; - if (result == 1) { - if (USE_THRDS_FOR_SENDFILE) { - d->result_ok = 0; - } else if (d->c.sendfile.nbytes == 0 && nbytes != 0) { - d->result_ok = 1; - } else if ((d->c.sendfile.nbytes - nbytes) != 0) { - d->result_ok = 1; - d->c.sendfile.nbytes -= nbytes; - } else { - d->result_ok = 0; - } + if (result == 1 || (result == 0 && USE_THRDS_FOR_SENDFILE)) { + d->result_ok = 0; } else if (result == 0 && (d->errInfo.posix_errno == EAGAIN || d->errInfo.posix_errno == EINTR)) { + if ((d->c.sendfile.nbytes - nbytes) != 0) { d->result_ok = 1; + if (d->c.sendfile.nbytes != 0) + d->c.sendfile.nbytes -= nbytes; + } else + d->result_ok = 0; } else { d->result_ok = -1; } } static void free_sendfile(void *data) { + struct t_data *d = (struct t_data *)data; + if (USE_THRDS_FOR_SENDFILE) { + SET_NONBLOCKING(d->c.sendfile.out_fd); + } else { + MUTEX_LOCK(d->c.sendfile.q_mtx); + driver_deq(d->c.sendfile.port,1); + MUTEX_UNLOCK(d->c.sendfile.q_mtx); + driver_select(d->c.sendfile.port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE_NO_CALLBACK|ERL_DRV_WRITE, 0); + } EF_FREE(data); } @@ -1812,7 +1811,7 @@ static void file_ready_output(ErlDrvData data, ErlDrvEvent event) switch (fd->d->command) { case FILE_SENDFILE: - driver_select(fd->port, event, + driver_select(fd->d->c.sendfile.port, event, (int)ERL_DRV_WRITE,(int) 0); invoke_sendfile((void *)fd->d); file_async_ready(data, (ErlDrvThreadData)fd->d); @@ -1826,6 +1825,15 @@ static void file_stop_select(ErlDrvEvent event, void* _) { } + +static int flush_sendfile(file_descriptor *desc,void *_) { + if (desc->sendfile_state == sending) { + desc->d->result_ok = -1; + desc->d->errInfo.posix_errno = ECONNABORTED; + file_async_ready((ErlDrvData)desc,(ErlDrvThreadData)desc->d); + } + return 1; +} #endif /* HAVE_SENDFILE */ @@ -2248,36 +2256,23 @@ file_async_ready(ErlDrvData e, ErlDrvThreadData data) #ifdef HAVE_SENDFILE case FILE_SENDFILE: if (d->result_ok == -1) { - desc->sendfile_state = not_sending; if (d->errInfo.posix_errno == ECONNRESET || d->errInfo.posix_errno == ENOTCONN || d->errInfo.posix_errno == EPIPE) reply_string_error(desc,"closed"); else reply_error(desc, &d->errInfo); - if (USE_THRDS_FOR_SENDFILE) { - SET_NONBLOCKING(d->c.sendfile.out_fd); - free_sendfile(data); - } else { - driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, - ERL_DRV_USE, 0); - free_sendfile(data); - } - } else if (d->result_ok == 0) { desc->sendfile_state = not_sending; + free_sendfile(data); + } else if (d->result_ok == 0) { reply_Sint64(desc, d->c.sendfile.written); - if (USE_THRDS_FOR_SENDFILE) { - SET_NONBLOCKING(d->c.sendfile.out_fd); - free_sendfile(data); - } else { - driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, ERL_DRV_USE, 0); - free_sendfile(data); - } + desc->sendfile_state = not_sending; + free_sendfile(data); } else if (d->result_ok == 1) { // If we are using select to send the rest of the data desc->sendfile_state = sending; desc->d = d; driver_select(desc->port, (ErlDrvEvent)(long)d->c.sendfile.out_fd, - ERL_DRV_USE|ERL_DRV_WRITE, 1); + ERL_DRV_USE_NO_CALLBACK|ERL_DRV_WRITE, 1); } break; #endif @@ -2655,6 +2650,10 @@ file_flush(ErlDrvData e) { TRACE_C('f'); +#ifdef HAVE_SENDFILE + flush_sendfile(desc, NULL); +#endif + #ifdef DEBUG r = #endif @@ -3454,11 +3453,13 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { d->fd = desc->fd; d->command = command; d->invoke = invoke_sendfile; - d->free = NULL; + d->free = free_sendfile; d->level = 2; d->c.sendfile.out_fd = (int) out_fd; d->c.sendfile.written = 0; + d->c.sendfile.port = desc->port; + d->c.sendfile.q_mtx = desc->q_mtx; #if SIZEOF_OFF_T == 4 if (offsetH != 0) { @@ -3474,6 +3475,19 @@ file_outputv(ErlDrvData e, ErlIOVec *ev) { if (USE_THRDS_FOR_SENDFILE) { SET_BLOCKING(d->c.sendfile.out_fd); + } else { + /** + * Write a place holder to queue in order to force file_flush + * to be called before the driver is closed. + */ + char tmp[1] = ""; + MUTEX_LOCK(d->c.sendfile.q_mtx); + if (driver_enq(d->c.sendfile.port, tmp, 1)) { + MUTEX_UNLOCK(d->c.sendfile.q_mtx); + reply_posix_error(desc, ENOMEM); + goto done; + } + MUTEX_UNLOCK(d->c.sendfile.q_mtx); } cq_enq(desc, d); diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 47a99fdbe6..d1c2dbf94c 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -553,6 +553,12 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) # define VALGRIND_MAKE_MEM_DEFINED(ptr,size) #endif +/* + Magic errno value used locally for return of {error, system_limit} + - the emulator definition of SYSTEM_LIMIT is not available here. +*/ +#define INET_ERRNO_SYSTEM_LIMIT (15 << 8) + /*---------------------------------------------------------------------------- ** Interface constants. ** @@ -1645,6 +1651,17 @@ static struct erl_drv_entry dummy_sctp_driver_entry = #endif +/* return lowercase string form of errno value */ +static char *errno_str(int err) +{ + switch (err) { + case INET_ERRNO_SYSTEM_LIMIT: + return "system_limit"; + default: + return erl_errno_id(err); + } +} + /* general control reply function */ static ErlDrvSSizeT ctl_reply(int rep, char* buf, ErlDrvSizeT len, char** rbuf, ErlDrvSizeT rsize) @@ -1665,13 +1682,9 @@ static ErlDrvSSizeT ctl_reply(int rep, char* buf, ErlDrvSizeT len, /* general control error reply function */ static ErlDrvSSizeT ctl_error(int err, char** rbuf, ErlDrvSizeT rsize) { - char response[256]; /* Response buffer. */ - char* s; - char* t; + char* s = errno_str(err); - for (s = erl_errno_id(err), t = response; *s; s++, t++) - *t = tolower(*s); - return ctl_reply(INET_REP_ERROR, response, t-response, rbuf, rsize); + return ctl_reply(INET_REP_ERROR, s, strlen(s), rbuf, rsize); } static ErlDrvSSizeT ctl_xerror(char* xerr, char** rbuf, ErlDrvSizeT rsize) @@ -1683,14 +1696,7 @@ static ErlDrvSSizeT ctl_xerror(char* xerr, char** rbuf, ErlDrvSizeT rsize) static ErlDrvTermData error_atom(int err) { - char errstr[256]; - char* s; - char* t; - - for (s = erl_errno_id(err), t = errstr; *s; s++, t++) - *t = tolower(*s); - *t = '\0'; - return driver_mk_atom(errstr); + return driver_mk_atom(errno_str(err)); } @@ -4089,6 +4095,7 @@ static char* buf_to_sockaddr(char* ptr, char* end, struct sockaddr* addr) addr->sa_family = AF_INET; return ptr + sizeof(struct in_addr); } +#if defined(HAVE_IN6) && defined(AF_INET6) case INET_AF_INET6: { struct in6_addr *p = &((struct sockaddr_in6*)addr)->sin6_addr; buf_check(ptr,end,sizeof(struct in6_addr)); @@ -4096,6 +4103,7 @@ static char* buf_to_sockaddr(char* ptr, char* end, struct sockaddr* addr) addr->sa_family = AF_INET6; return ptr + sizeof(struct in6_addr); } +#endif } error: return NULL; @@ -8051,7 +8059,7 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args) /* Copy a descriptor, by creating a new port with same settings * as the descriptor desc. - * return NULL on error (ENFILE no ports avail) + * return NULL on error (SYSTEM_LIMIT no ports avail) */ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, ErlDrvTermData owner, int* err) @@ -8090,7 +8098,7 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, /* The new port will be linked and connected to the original caller */ port = driver_create_port(port, owner, "tcp_inet", (ErlDrvData) copy_desc); if ((long)port == -1) { - *err = ENFILE; + *err = INET_ERRNO_SYSTEM_LIMIT; FREE(copy_desc); return NULL; } diff --git a/erts/emulator/drivers/unix/unix_efile.c b/erts/emulator/drivers/unix/unix_efile.c index 796843a735..dfb6cece14 100644 --- a/erts/emulator/drivers/unix/unix_efile.c +++ b/erts/emulator/drivers/unix/unix_efile.c @@ -1426,6 +1426,14 @@ efile_fadvise(Efile_error* errInfo, int fd, Sint64 offset, * you would have to emulate it in linux and on BSD/Darwin some complex * calculations have to be made when using a non blocking socket to figure * out how much of the header/file/trailer was sent in each command. + * + * The semantics of the API is this: + * Return value: 1 if all data was sent and the function does not need to + * be called again. 0 if an error occures OR if there is more data which + * has to be sent (EAGAIN or EINTR will be set appropriately) + * + * The amount of data written in a call is returned through nbytes. + * */ int @@ -1446,8 +1454,11 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, *nbytes -= retval; } } while (retval == SENDFILE_CHUNK_SIZE); - *nbytes = written; - return check_error(retval == -1 ? -1 : 0, errInfo); + if (written != 0) { + // -1 is not returned by the linux API so we have to simulate it + retval = -1; + errno = EAGAIN; + } #elif defined(__sun) && defined(__SVR4) && defined(HAVE_SENDFILEV) ssize_t retval; size_t len; @@ -1469,8 +1480,6 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, written += len; } } while (len == SENDFILE_CHUNK_SIZE); - *nbytes = written; - return check_error(retval == -1 ? -1 : 0, errInfo); #elif defined(DARWIN) int retval; off_t len; @@ -1487,8 +1496,6 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, written += len; } } while (len == SENDFILE_CHUNK_SIZE); - *nbytes = written; - return check_error(retval, errInfo); #elif defined(__FreeBSD__) || defined(__DragonFly__) off_t len; int retval; @@ -1504,8 +1511,8 @@ efile_sendfile(Efile_error* errInfo, int in_fd, int out_fd, written += len; } } while(len == SENDFILE_CHUNK_SIZE); +#endif *nbytes = written; return check_error(retval, errInfo); -#endif } #endif /* HAVE_SENDFILE */ |