diff options
Diffstat (limited to 'erts/emulator/drivers/common/inet_drv.c')
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 382 |
1 files changed, 374 insertions, 8 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 95d61fcc5d..4294fb4f46 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -63,6 +63,20 @@ #include <sys/un.h> #endif +#ifdef HAVE_SENDFILE +#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) + #include <sys/sendfile.h> +#elif defined(__FreeBSD__) || defined(__DragonFly__) + /* Need to define __BSD_VISIBLE in order to expose prototype of sendfile */ + #define __BSD_VISIBLE 1 + #include <sys/socket.h> +#endif +#endif + +#if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__) + #define __DARWIN__ 1 +#endif + /* All platforms fail on malloc errors. */ #define FATAL_MALLOC @@ -701,6 +715,7 @@ static size_t my_strnlen(const char *s, size_t maxlen) #define TCP_REQ_RECV 42 #define TCP_REQ_UNRECV 43 #define TCP_REQ_SHUTDOWN 44 +#define TCP_REQ_SENDFILE 45 /* UDP and SCTP requests */ #define PACKET_REQ_RECV 60 /* Common for UDP and SCTP */ /* #define SCTP_REQ_LISTEN 61 MERGED Different from TCP; not for UDP */ @@ -723,6 +738,7 @@ static size_t my_strnlen(const char *s, size_t maxlen) #define TCP_ADDF_DELAYED_ECONNRESET 128 /* An ECONNRESET error occurred on send or shutdown */ #define TCP_ADDF_SHUTDOWN_WR_DONE 256 /* A shutdown(sock, SHUT_WR) or SHUT_RDWR was made */ #define TCP_ADDF_LINGER_ZERO 512 /* Discard driver queue on port close */ +#define TCP_ADDF_SENDFILE 1024 /* Send from an fd instead of the driver queue */ /* *_REQ_* replies */ #define INET_REP_ERROR 0 @@ -1235,6 +1251,21 @@ typedef struct { inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */ inet_async_multi_op *multi_last; MultiTimerData *mtd; /* Timer structures for multiple accept */ +#ifdef HAVE_SENDFILE + struct { + ErlDrvSizeT ioq_skip; /* The number of bytes in the queue at the time + * sendfile was issued, which must be sent + * before issuing the sendfile call itself. */ + int dup_file_fd; /* The file handle to send from; this is + * duplicated when sendfile is issued to + * reduce (but not eliminate) the impact of a + * nasty race, so we have to remember to close + * it. */ + Uint64 bytes_sent; + Uint64 offset; + Uint64 length; + } sendfile; +#endif } tcp_descriptor; /* send function */ @@ -1245,6 +1276,8 @@ static int tcp_deliver(tcp_descriptor* desc, int len); static int tcp_shutdown_error(tcp_descriptor* desc, int err); +static int tcp_inet_sendfile(tcp_descriptor* desc); + static int tcp_inet_output(tcp_descriptor* desc, HANDLE event); static int tcp_inet_input(tcp_descriptor* desc, HANDLE event); @@ -1329,6 +1362,9 @@ static ErlDrvTermData am_ipv6_v6only; static ErlDrvTermData am_netns; static ErlDrvTermData am_bind_to_device; #endif +#ifdef HAVE_SENDFILE +static ErlDrvTermData am_sendfile; +#endif static char str_eafnosupport[] = "eafnosupport"; static char str_einval[] = "einval"; @@ -3875,6 +3911,10 @@ static int inet_init() INIT_ATOM(https); INIT_ATOM(scheme); +#ifdef HAVE_SENDFILE + INIT_ATOM(sendfile); +#endif + /* add TCP, UDP and SCTP drivers */ add_driver_entry(&tcp_inet_driver_entry); #ifdef HAVE_UDP @@ -9270,6 +9310,13 @@ static void tcp_inet_stop(ErlDrvData e) * will be freed through tcp_inet_stop later on. */ static void tcp_desc_close(tcp_descriptor* desc) { +#ifdef HAVE_SENDFILE + if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) { + desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE; + close(desc->sendfile.dup_file_fd); + } +#endif + tcp_clear_input(desc); tcp_clear_output(desc); @@ -9608,6 +9655,60 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } } + + case TCP_REQ_SENDFILE: { +#ifdef HAVE_SENDFILE + const ErlDrvSizeT required_len = + sizeof(desc->sendfile.dup_file_fd) + + sizeof(Uint64) * 2; + + int raw_file_fd; + + DEBUGF(("tcp_inet_ctl(%ld): SENDFILE\r\n", (long)desc->inet.port)); + + if (len != required_len) { + return ctl_error(EINVAL, rbuf, rsize); + } else if (!IS_CONNECTED(INETP(desc))) { + return ctl_error(ENOTCONN, rbuf, rsize); + } + + sys_memcpy(&raw_file_fd, buf, sizeof(raw_file_fd)); + buf += sizeof(raw_file_fd); + + desc->sendfile.dup_file_fd = dup(raw_file_fd); + + if(desc->sendfile.dup_file_fd == -1) { + return ctl_error(errno, rbuf, rsize); + } + + desc->sendfile.offset = get_int64(buf); + buf += sizeof(Uint64); + + desc->sendfile.length = get_int64(buf); + buf += sizeof(Uint64); + + ASSERT(desc->sendfile.offset >= 0); + ASSERT(desc->sendfile.length >= 0); + + desc->sendfile.ioq_skip = driver_sizeq(desc->inet.port); + desc->sendfile.bytes_sent = 0; + + desc->inet.caller = driver_caller(desc->inet.port); + desc->tcp_add_flags |= TCP_ADDF_SENDFILE; + + /* See if we can finish sending without selecting & rescheduling. */ + tcp_inet_sendfile(desc); + + if(desc->sendfile.length > 0) { + sock_select(INETP(desc), FD_WRITE, 1); + } + + return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); +#else + return ctl_error(ENOTSUP, rbuf, rsize); +#endif + } + default: DEBUGF(("tcp_inet_ctl(%ld): %u\r\n", (long)desc->inet.port, cmd)); return inet_ctl(INETP(desc), cmd, buf, len, rbuf, rsize); @@ -9747,12 +9848,27 @@ static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev) static void tcp_inet_flush(ErlDrvData e) { tcp_descriptor* desc = (tcp_descriptor*)e; - if (!(desc->inet.event_mask & FD_WRITE)) { - /* Discard send queue to avoid hanging port (OTP-7615) */ - tcp_clear_output(desc); + int discard_output; + + /* Discard send queue to avoid hanging port (OTP-7615) */ + discard_output = !(desc->inet.event_mask & FD_WRITE); + + discard_output |= desc->tcp_add_flags & TCP_ADDF_LINGER_ZERO; + +#ifdef HAVE_SENDFILE + /* The old file driver aborted when it was stopped during sendfile, so + * we'll clear the flag and discard all output. */ + if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) { + desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE; + close(desc->sendfile.dup_file_fd); + + discard_output = 1; + } +#endif + + if (discard_output) { + tcp_clear_output(desc); } - if (desc->tcp_add_flags & TCP_ADDF_LINGER_ZERO) - tcp_clear_output(desc); } static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp) @@ -10647,7 +10763,9 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) ev->size += h_len; } - if ((sz = driver_sizeq(ix)) > 0) { + sz = driver_sizeq(ix); + + if ((desc->tcp_add_flags & TCP_ADDF_SENDFILE) || sz > 0) { driver_enqv(ix, ev, 0); if (sz+ev->size >= desc->high) { DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n", @@ -10741,8 +10859,9 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len) inet_output_count(INETP(desc), len+h_len); + sz = driver_sizeq(ix); - if ((sz = driver_sizeq(ix)) > 0) { + if ((desc->tcp_add_flags & TCP_ADDF_SENDFILE) || sz > 0) { if (h_len > 0) driver_enq(ix, buf, h_len); driver_enq(ix, ptr, len); @@ -10832,6 +10951,246 @@ static void tcp_inet_drv_input(ErlDrvData data, ErlDrvEvent event) (void)tcp_inet_input((tcp_descriptor*)data, (HANDLE)event); } +#ifdef HAVE_SENDFILE +static int tcp_sendfile_completed(tcp_descriptor* desc) { + ErlDrvTermData spec[LOAD_PORT_CNT + LOAD_TUPLE_CNT * 2 + + LOAD_ATOM_CNT * 2 + LOAD_UINT_CNT * 2]; + Uint32 sent_low, sent_high; + int i; + + desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE; + close(desc->sendfile.dup_file_fd); + + /* While we flushed the output queue prior to sending the file, we've + * deferred clearing busy status until now as there's no point in doing so + * while we still have a file to send. + * + * The watermark is checked since more data may have been added while we + * were sending the file. */ + + if (driver_sizeq(desc->inet.port) <= desc->low) { + if (IS_BUSY(INETP(desc))) { + desc->inet.caller = desc->inet.busy_caller; + desc->inet.state &= ~INET_F_BUSY; + + set_busy_port(desc->inet.port, 0); + + /* if we have a timer then cancel and send ok to client */ + if (desc->busy_on_send) { + driver_cancel_timer(desc->inet.port); + desc->busy_on_send = 0; + } + + inet_reply_ok(INETP(desc)); + } + } + + if (driver_sizeq(desc->inet.port) == 0) { + sock_select(INETP(desc), FD_WRITE, 0); + send_empty_out_q_msgs(INETP(desc)); + + if (desc->tcp_add_flags & TCP_ADDF_PENDING_SHUTDOWN) { + tcp_shutdown_async(desc); + } + } + + sent_low = ((Uint64)desc->sendfile.bytes_sent >> 0) & 0xFFFFFFFF; + sent_high = ((Uint64)desc->sendfile.bytes_sent >> 32) & 0xFFFFFFFF; + + i = LOAD_ATOM(spec, 0, am_sendfile); + i = LOAD_PORT(spec, i, desc->inet.dport); + i = LOAD_ATOM(spec, i, am_ok); + i = LOAD_UINT(spec, i, sent_low); + i = LOAD_UINT(spec, i, sent_high); + i = LOAD_TUPLE(spec, i, 3); + i = LOAD_TUPLE(spec, i, 3); + + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + return erl_drv_output_term(desc->inet.dport, spec, i); +} + +static int tcp_sendfile_aborted(tcp_descriptor* desc, int socket_error) { + ErlDrvTermData spec[LOAD_PORT_CNT + LOAD_TUPLE_CNT * 2 + LOAD_ATOM_CNT * 3]; + int i; + + /* We don't clean up sendfile state here, as that's done in tcp_desc_close + * following normal error handling. All we do here is report the failure. */ + + i = LOAD_ATOM(spec, 0, am_sendfile); + i = LOAD_PORT(spec, i, desc->inet.dport); + i = LOAD_ATOM(spec, i, am_error); + + switch (socket_error) { + case ECONNRESET: + case ENOTCONN: + case EPIPE: + i = LOAD_ATOM(spec, i, am_closed); + break; + default: + i = LOAD_ATOM(spec, i, error_atom(socket_error)); + } + + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 3); + + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + return erl_drv_output_term(desc->inet.dport, spec, i); +} + +static int tcp_inet_sendfile(tcp_descriptor* desc) { + ErlDrvPort ix = desc->inet.port; + int result = 0; + ssize_t n; + + DEBUGF(("tcp_inet_sendfile(%ld) {s=%d\r\n", (long)ix, desc->inet.s)); + + /* If there was any data in the queue by the time sendfile was issued, + * we'll need to skip it first. Note that we don't clear busy status until + * we're finished sending the file. */ + while (desc->sendfile.ioq_skip > 0) { + ssize_t bytes_to_send; + SysIOVec* iov; + int vsize; + + ASSERT(driver_sizeq(ix) >= desc->sendfile.ioq_skip); + + if ((iov = driver_peekq(ix, &vsize)) == NULL) { + ERTS_INTERNAL_ERROR("ioq empty when sendfile.ioq_skip > 0"); + } + + bytes_to_send = MIN(desc->sendfile.ioq_skip, iov[0].iov_len); + n = sock_send(desc->inet.s, iov[0].iov_base, bytes_to_send, 0); + + if (!IS_SOCKET_ERROR(n)) { + desc->sendfile.ioq_skip -= n; + driver_deq(ix, n); + } else if (sock_errno() == ERRNO_BLOCK) { +#ifdef __WIN32__ + desc->inet.send_would_block = 1; +#endif + goto done; + } else if (sock_errno() != EINTR) { + goto socket_error; + } + } + + while (desc->sendfile.length > 0) { + /* For some reason the maximum ssize_t cannot be used as the max size. + * 1GB seems to work on all platforms */ + const Sint64 SENDFILE_CHUNK_SIZE = ((1UL << 30) - 1); + + ssize_t bytes_to_send = MIN(SENDFILE_CHUNK_SIZE, desc->sendfile.length); + off_t offset = desc->sendfile.offset; + +#if defined(__linux__) + n = sendfile(desc->inet.s, desc->sendfile.dup_file_fd, &offset, + bytes_to_send); +#elif defined(__FreeBSD__) || defined(__DragonFly__) || defined(__DARWIN__) + { + off_t bytes_sent; + int error; + + #if defined(__DARWIN__) + bytes_sent = bytes_to_send; + + error = sendfile(desc->sendfile.dup_file_fd, desc->inet.s, offset, + &bytes_sent, NULL, 0); + n = bytes_sent; + #else + error = sendfile(desc->sendfile.dup_file_fd, desc->inet.s, offset, + bytes_to_send, NULL, &bytes_sent, 0); + n = bytes_sent; + #endif + + if(error < 0) { + /* EAGAIN/EINTR report partial success by setting bytes_sent, + * so we have to skip error handling if nonzero, and skip EOF + * handling if zero, as it's possible that we didn't manage to + * send anything at all before being interrupted by a + * signal. */ + if((errno != EAGAIN && errno != EINTR) || bytes_sent == 0) { + n = -1; + } + } + } +#elif defined(__sun) && defined(__SVR4) && defined(HAVE_SENDFILEV) + { + sendfilevec_t sfvec[1]; + size_t bytes_sent; + ssize_t error; + + sfvec[0].sfv_fd = desc->sendfile.dup_file_fd; + sfvec[0].sfv_len = bytes_to_send; + sfvec[0].sfv_off = offset; + sfvec[0].sfv_flag = 0; + + error = sendfilev(desc->inet.s, sfvec, 1, &bytes_sent); + n = bytes_sent; + + if(error < 0) { + if(errno == EINVAL) { + /* On some solaris versions (I've seen it on SunOS 5.10), + * using a sfv_len larger than the filesize will result in + * a (-1 && errno == EINVAL). We translate this to a + * successful send of the data.*/ + } else { + /* EAGAIN/EINTR behavior is identical to *BSD. */ + if((errno != EAGAIN && errno != EINTR) || bytes_sent == 0) { + n = -1; + } + } + } + } +#else + #error "Unsupported sendfile syscall; update configure test." +#endif + + if (n > 0) { + desc->sendfile.bytes_sent += n; + desc->sendfile.offset += n; + desc->sendfile.length -= n; + } else if (n == 0) { + /* EOF. */ + desc->sendfile.length = 0; + break; + } else if (IS_SOCKET_ERROR(n) && sock_errno() != EINTR) { + if (sock_errno() != ERRNO_BLOCK) { + goto socket_error; + } + +#ifdef __WIN32__ + desc->inet.send_would_block = 1; +#endif + break; + } + } + + if (desc->sendfile.length == 0) { + tcp_sendfile_completed(desc); + } + + goto done; + +socket_error: { + int socket_errno = sock_errno(); + + DEBUGF(("tcp_inet_sendfile(%ld): send errno = %d (errno %d)\r\n", + (long)desc->inet.port, socket_errno, errno)); + + result = tcp_send_error(desc, socket_errno); + tcp_sendfile_aborted(desc, socket_errno); + + goto done; + } + +done: + DEBUGF(("tcp_inet_sendfile(%ld) }\r\n", (long)desc->inet.port)); + return result; +} +#endif /* HAVE_SENDFILE */ + /* socket ready for ouput: ** 1. INET_STATE_CONNECTING => non block connect ? ** 2. INET_STATE_CONNECTED => write output @@ -10892,7 +11251,14 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) async_ok(INETP(desc)); } else if (IS_CONNECTED(INETP(desc))) { - for (;;) { + +#ifdef HAVE_SENDFILE + if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) { + return tcp_inet_sendfile(desc); + } +#endif + + for (;;) { int vsize; ssize_t n; SysIOVec* iov; |