diff options
Diffstat (limited to 'erts/emulator/drivers/common/inet_drv.c')
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 710 |
1 files changed, 566 insertions, 144 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 3029958f26..5e9afdc5ca 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1997-2016. All Rights Reserved. + * Copyright Ericsson AB 1997-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -63,6 +63,20 @@ #include <sys/un.h> #endif +#ifdef HAVE_SENDFILE +#if defined(__linux__) || (defined(__sun) && defined(__SVR4)) + #include <sys/sendfile.h> +#elif defined(__FreeBSD__) || defined(__DragonFly__) + /* Need to define __BSD_VISIBLE in order to expose prototype of sendfile */ + #define __BSD_VISIBLE 1 + #include <sys/socket.h> +#endif +#endif + +#if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__) + #define __DARWIN__ 1 +#endif + /* All platforms fail on malloc errors. */ #define FATAL_MALLOC @@ -591,7 +605,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) (((unsigned char*) (s))[1] << 8) | \ (((unsigned char*) (s))[0])) -#ifdef HAVE_SYS_UN_H +#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE) /* strnlen doesn't exist everywhere */ static size_t my_strnlen(const char *s, size_t maxlen) @@ -602,14 +616,6 @@ static size_t my_strnlen(const char *s, size_t maxlen) return i; } -/* Check that some character in the buffer != '\0' */ -static int is_nonzero(const char *s, size_t n) -{ - size_t i; - for (i = 0; i < n; i++) if (s[i] != '\0') return !0; - return 0; -} - #endif #ifdef VALGRIND @@ -709,6 +715,7 @@ static int is_nonzero(const char *s, size_t n) #define TCP_REQ_RECV 42 #define TCP_REQ_UNRECV 43 #define TCP_REQ_SHUTDOWN 44 +#define TCP_REQ_SENDFILE 45 /* UDP and SCTP requests */ #define PACKET_REQ_RECV 60 /* Common for UDP and SCTP */ /* #define SCTP_REQ_LISTEN 61 MERGED Different from TCP; not for UDP */ @@ -728,9 +735,10 @@ static int is_nonzero(const char *s, size_t n) #define TCP_ADDF_PENDING_SHUTDOWN \ (TCP_ADDF_PENDING_SHUT_WR | TCP_ADDF_PENDING_SHUT_RDWR) #define TCP_ADDF_SHOW_ECONNRESET 64 /* Tell user about incoming RST */ -#define TCP_ADDF_DELAYED_ECONNRESET 128 /* An ECONNRESET error occured on send or shutdown */ +#define TCP_ADDF_DELAYED_ECONNRESET 128 /* An ECONNRESET error occurred on send or shutdown */ #define TCP_ADDF_SHUTDOWN_WR_DONE 256 /* A shutdown(sock, SHUT_WR) or SHUT_RDWR was made */ #define TCP_ADDF_LINGER_ZERO 512 /* Discard driver queue on port close */ +#define TCP_ADDF_SENDFILE 1024 /* Send from an fd instead of the driver queue */ /* *_REQ_* replies */ #define INET_REP_ERROR 0 @@ -778,6 +786,7 @@ static int is_nonzero(const char *s, size_t n) #define INET_LOPT_TCP_SHOW_ECONNRESET 39 /* tell user about incoming RST */ #define INET_LOPT_LINE_DELIM 40 /* Line delimiting char */ #define INET_OPT_TCLASS 41 /* IPv6 transport class */ +#define INET_OPT_BIND_TO_DEVICE 42 /* get/set network device the socket is bound to */ /* SCTP options: a separate range, from 100: */ #define SCTP_OPT_RTOINFO 100 #define SCTP_OPT_ASSOCINFO 101 @@ -1144,7 +1153,6 @@ static int packet_inet_init(void); static void packet_inet_stop(ErlDrvData); static void packet_inet_command(ErlDrvData, char*, ErlDrvSizeT); static void packet_inet_drv_input(ErlDrvData data, ErlDrvEvent event); -static void packet_inet_drv_output(ErlDrvData data, ErlDrvEvent event); static ErlDrvData udp_inet_start(ErlDrvPort, char* command); #ifdef HAVE_SCTP static ErlDrvData sctp_inet_start(ErlDrvPort, char* command); @@ -1169,7 +1177,7 @@ static struct erl_drv_entry udp_inet_driver_entry = NULL, #else packet_inet_drv_input, - packet_inet_drv_output, + NULL, #endif "udp_inet", NULL, @@ -1204,7 +1212,7 @@ static struct erl_drv_entry sctp_inet_driver_entry = NULL, #else packet_inet_drv_input, - packet_inet_drv_output, + NULL, #endif "sctp_inet", NULL, @@ -1244,6 +1252,21 @@ typedef struct { inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */ inet_async_multi_op *multi_last; MultiTimerData *mtd; /* Timer structures for multiple accept */ +#ifdef HAVE_SENDFILE + struct { + ErlDrvSizeT ioq_skip; /* The number of bytes in the queue at the time + * sendfile was issued, which must be sent + * before issuing the sendfile call itself. */ + int dup_file_fd; /* The file handle to send from; this is + * duplicated when sendfile is issued to + * reduce (but not eliminate) the impact of a + * nasty race, so we have to remember to close + * it. */ + Uint64 bytes_sent; + Uint64 offset; + Uint64 length; + } sendfile; +#endif } tcp_descriptor; /* send function */ @@ -1254,9 +1277,13 @@ static int tcp_deliver(tcp_descriptor* desc, int len); static int tcp_shutdown_error(tcp_descriptor* desc, int err); +static int tcp_inet_sendfile(tcp_descriptor* desc); + static int tcp_inet_output(tcp_descriptor* desc, HANDLE event); static int tcp_inet_input(tcp_descriptor* desc, HANDLE event); +static void tcp_desc_close(tcp_descriptor*); + #ifdef HAVE_UDP typedef struct { inet_descriptor inet; /* common data structure (DON'T MOVE) */ @@ -1268,7 +1295,6 @@ typedef struct { static int packet_inet_input(udp_descriptor* udesc, HANDLE event); -static int packet_inet_output(udp_descriptor* udesc, HANDLE event); #endif /* convert descriptor pointer to inet_descriptor pointer */ @@ -1334,6 +1360,10 @@ static ErlDrvTermData am_tos; static ErlDrvTermData am_tclass; static ErlDrvTermData am_ipv6_v6only; static ErlDrvTermData am_netns; +static ErlDrvTermData am_bind_to_device; +#endif +#ifdef HAVE_SENDFILE +static ErlDrvTermData am_sendfile; #endif static char str_eafnosupport[] = "eafnosupport"; @@ -2203,13 +2233,16 @@ static int inet_reply_ok(inet_descriptor* desc) ErlDrvTermData caller = desc->caller; int i = 0; + desc->caller = 0; + if (is_not_internal_pid(caller)) + return 0; + i = LOAD_ATOM(spec, i, am_inet_reply); i = LOAD_PORT(spec, i, desc->dport); i = LOAD_ATOM(spec, i, am_ok); i = LOAD_TUPLE(spec, i, 3); ASSERT(i == sizeof(spec)/sizeof(*spec)); - desc->caller = 0; return erl_drv_send_term(desc->dport, caller, spec, i); } @@ -3725,6 +3758,7 @@ static void inet_init_sctp(void) { INIT_ATOM(tclass); INIT_ATOM(ipv6_v6only); INIT_ATOM(netns); + INIT_ATOM(bind_to_device); /* Option names */ INIT_ATOM(sctp_rtoinfo); @@ -3877,6 +3911,10 @@ static int inet_init() INIT_ATOM(https); INIT_ATOM(scheme); +#ifdef HAVE_SENDFILE + INIT_ATOM(sendfile); +#endif + /* add TCP, UDP and SCTP drivers */ add_driver_entry(&tcp_inet_driver_entry); #ifdef HAVE_UDP @@ -4018,13 +4056,30 @@ static char* inet_set_address(int family, inet_address* dst, int n; if (*len == 0) return str_einval; n = *((unsigned char*)(*src)); /* Length field */ - if ((*len < 1+n) || (sizeof(dst->sal.sun_path) < n+1)) { + if (*len < 1+n) return str_einval; + if (n + +#ifdef __linux__ + /* Make sure the address gets zero terminated + * except when the first byte is \0 because then it is + * sort of zero terminated although the zero termination + * comes before the address... + * This fix handles Linux's nonportable + * abstract socket address extension. + */ + ((*len) > 1 && (*src)[1] == '\0' ? 0 : 1) +#else + 1 +#endif + > sizeof(dst->sal.sun_path)) { return str_einval; } sys_memzero((char*)dst, sizeof(struct sockaddr_un)); dst->sal.sun_family = family; sys_memcpy(dst->sal.sun_path, (*src)+1, n); *len = offsetof(struct sockaddr_un, sun_path) + n; +#ifndef NO_SA_LEN + dst->sal.sun_len = *len; +#endif *src += 1 + n; return NULL; } @@ -4132,8 +4187,8 @@ static char *inet_set_faddress(int family, inet_address* dst, /* Get a inaddr structure ** src = inaddr structure -** *len is the lenght of structure ** dst is filled with [F,P1,P0,X1,....] +** *len is the length of structure ** where F is the family code (coded) ** and *len is the length of dst on return ** (suitable to deliver to erlang) @@ -4169,15 +4224,16 @@ static int inet_get_address(char* dst, inet_address* src, unsigned int* len) if (*len < offsetof(struct sockaddr_un, sun_path)) return -1; n = *len - offsetof(struct sockaddr_un, sun_path); if (255 < n) return -1; - /* Portability fix: Assume that the address is a zero terminated - * string, except when the first byte is \0 i.e the - * string length is 0. Then use the reported length instead. - * This fix handles Linux's abstract socket address - * nonportable extension. - */ m = my_strnlen(src->sal.sun_path, n); - if ((m == 0) && is_nonzero(src->sal.sun_path, n)) - m = n; +#ifdef __linux__ + /* Assume that the address is a zero terminated string, + * except when the first byte is \0 i.e the string length is 0, + * then use the reported length instead. + * This fix handles Linux's nonportable + * abstract socket address extension. + */ + if (m == 0) m = n; +#endif dst[0] = INET_AF_LOCAL; dst[1] = (char) ((unsigned char) m); sys_memcpy(dst+2, src->sal.sun_path, m); @@ -4234,15 +4290,16 @@ inet_address_to_erlang(char *dst, inet_address **src, SOCKLEN_T sz) { if (sz < offsetof(struct sockaddr_un, sun_path)) return -1; n = sz - offsetof(struct sockaddr_un, sun_path); if (255 < n) return -1; - /* Portability fix: Assume that the address is a zero terminated - * string, except when the first byte is \0 i.e the - * string length is 0. Then use the reported length instead. - * This fix handles Linux's abstract socket address - * nonportable extension. - */ m = my_strnlen((*src)->sal.sun_path, n); - if ((m == 0) && is_nonzero((*src)->sal.sun_path, n)) - m = n; +#ifdef __linux__ + /* Assume that the address is a zero terminated string, + * except when the first byte is \0 i.e the string length is 0, + * Then use the reported length instead. + * This fix handles Linux's nonportable + * abstract socket address extension. + */ + if (m == 0) m = n; +#endif if (dst) { dst[0] = INET_AF_LOCAL; dst[1] = (char) ((unsigned char) m); @@ -4320,6 +4377,12 @@ static void desc_close(inet_descriptor* desc) desc->event = INVALID_EVENT; /* closed by stop_select callback */ desc->s = INVALID_SOCKET; desc->event_mask = 0; + + /* mark as disconnected in case when socket is left lingering due to + * {exit_on_close, false} option in gen_tcp socket creation. Next + * write to socket should produce {error, enotconn} and send a + * message {tcp_error,#Port<>,econnreset} */ + desc->state &= ~INET_STATE_CONNECTED; } } @@ -5946,6 +6009,9 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) int ival; char* arg_ptr; int arg_sz; +#ifdef SO_BINDTODEVICE + char ifname[IFNAMSIZ]; +#endif enum PacketParseType old_htype = desc->htype; int old_active = desc->active; int propagate; /* Set to 1 if failure to set this option @@ -6331,6 +6397,29 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) len -= arg_sz; break; +#ifdef SO_BINDTODEVICE + case INET_OPT_BIND_TO_DEVICE: + if (ival < 0) return -1; + if (len < ival) return -1; + if (ival > sizeof(ifname)) { + return -1; + } + memcpy(ifname, ptr, ival); + ifname[ival] = '\0'; + ptr += ival; + len -= ival; + + proto = SOL_SOCKET; + type = SO_BINDTODEVICE; + arg_ptr = (char*)&ifname; + arg_sz = sizeof(ifname); + propagate = 1; /* We do want to know if this fails */ + + DEBUGF(("inet_set_opts(%ld): s=%d, SO_BINDTODEVICE=%s\r\n", + (long)desc->port, desc->s, ifname)); + break; +#endif + default: return -1; } @@ -6463,6 +6552,9 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len) # ifdef SCTP_DELAYED_ACK_TIME struct sctp_assoc_value av; /* Not in SOLARIS10 */ # endif +# ifdef SO_BINDTODEVICE + char ifname[IFNAMSIZ]; +# endif } arg; @@ -6702,6 +6794,23 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len) continue; /* Option not supported -- ignore it */ # endif +#ifdef SO_BINDTODEVICE + case INET_OPT_BIND_TO_DEVICE: + arg_sz = get_int32(curr); curr += 4; + CHKLEN(curr, arg_sz); + if (arg_sz >= sizeof(arg.ifname)) + return -1; + memcpy(arg.ifname, curr, arg_sz); + arg.ifname[arg_sz] = '\0'; + curr += arg_sz; + + proto = SOL_SOCKET; + type = SO_BINDTODEVICE; + arg_ptr = (char*) (&arg.ifname); + arg_sz = sizeof ( arg.ifname); + break; +#endif + case SCTP_OPT_AUTOCLOSE: { arg.ival= get_int32 (curr); curr += 4; @@ -6967,6 +7076,9 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, ErlDrvSizeT dest_used = 0; ErlDrvSizeT dest_allocated = destlen; char *orig_dest = *dest; +#ifdef SO_BINDTODEVICE + char ifname[IFNAMSIZ]; +#endif /* Ptr is a name parameter */ #define RETURN_ERROR() \ @@ -7302,6 +7414,26 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, put_int32(arg_sz,ptr); continue; } + +#ifdef SO_BINDTODEVICE + case INET_OPT_BIND_TO_DEVICE: + arg_sz = sizeof(ifname); + TRUNCATE_TO(0,ptr); + PLACE_FOR(5 + arg_sz,ptr); + arg_ptr = ptr + 5; + if (IS_SOCKET_ERROR(sock_getopt(desc->s,SOL_SOCKET,SO_BINDTODEVICE, + arg_ptr,&arg_sz))) { + TRUNCATE_TO(0,ptr); + continue; + } + arg_sz = my_strnlen(arg_ptr, arg_sz); + TRUNCATE_TO(arg_sz + 5,ptr); + *ptr++ = opt; + put_int32(arg_sz,ptr); + ptr += arg_sz; + continue; +#endif + default: RETURN_ERROR(); } @@ -7583,6 +7715,25 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc, i = LOAD_TUPLE (spec, i, 2); break; } + +#ifdef SO_BINDTODEVICE + /* The following option returns a binary: */ + case INET_OPT_BIND_TO_DEVICE: { + char ifname[IFNAMSIZ]; + unsigned int sz = sizeof(ifname); + + if (sock_getopt(desc->s, SOL_SOCKET, SO_BINDTODEVICE, + &ifname, &sz) < 0) continue; + /* Fill in the response: */ + PLACE_FOR(spec, i, + LOAD_ATOM_CNT + LOAD_BUF2BINARY_CNT + LOAD_TUPLE_CNT); + i = LOAD_ATOM (spec, i, am_bind_to_device); + i = LOAD_BUF2BINARY(spec, i, ifname, my_strnlen(ifname, sz)); + i = LOAD_TUPLE (spec, i, 2); + break; + } +#endif + /* The following options just return an integer value: */ case INET_OPT_RCVBUF : case INET_OPT_SNDBUF : @@ -8346,10 +8497,10 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol) return (ErlDrvData)desc; } - -#ifndef MAXHOSTNAMELEN -#define MAXHOSTNAMELEN 256 -#endif +/* MAXHOSTNAMELEN could be 64 or 255 depending +on the platform. Instead, use INET_MAXHOSTNAMELEN +which is always 255 across all platforms */ +#define INET_MAXHOSTNAMELEN 255 /* ** common TCP/UDP/SCTP control command @@ -8526,13 +8677,14 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf, } case INET_REQ_GETHOSTNAME: { /* get host name */ - char tbuf[MAXHOSTNAMELEN]; + char tbuf[INET_MAXHOSTNAMELEN + 1]; DEBUGF(("inet_ctl(%ld): GETHOSTNAME\r\n", (long)desc->port)); if (len != 0) return ctl_error(EINVAL, rbuf, rsize); - if (IS_SOCKET_ERROR(sock_hostname(tbuf, MAXHOSTNAMELEN))) + /* gethostname requires len to be max(hostname) + 1 */ + if (IS_SOCKET_ERROR(sock_hostname(tbuf, INET_MAXHOSTNAMELEN + 1))) return ctl_error(sock_errno(), rbuf, rsize); return ctl_reply(INET_REP_OK, tbuf, strlen(tbuf), rbuf, rsize); } @@ -8585,6 +8737,7 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf, else { ptr = &peer; sz = sizeof(peer); + sys_memzero((char *) &peer, sz); if (IS_SOCKET_ERROR (sock_peer (desc->s, (struct sockaddr*)ptr, &sz))) @@ -9139,16 +9292,38 @@ static void tcp_inet_stop(ErlDrvData e) tcp_descriptor* desc = (tcp_descriptor*)e; DEBUGF(("tcp_inet_stop(%ld) {s=%d\r\n", (long)desc->inet.port, desc->inet.s)); + tcp_close_check(desc); - /* free input buffer & output buffer */ - if (desc->i_buf != NULL) - release_buffer(desc->i_buf); - desc->i_buf = NULL; /* net_mess2 may call this function recursively when - faulty messages arrive on dist ports*/ + tcp_clear_input(desc); + DEBUGF(("tcp_inet_stop(%ld) }\r\n", (long)desc->inet.port)); inet_stop(INETP(desc)); } +/* Closes a tcp descriptor without leaving things hanging; the VM keeps trying + * to flush IO queues as long as it contains anything even after the port has + * been closed from the erlang side, which is desired behavior (Think escripts + * writing to files) but pretty hopeless if the underlying fd has been set to + * INVALID_SOCKET through desc_close. + * + * This function should be used in place of desc_close/erl_inet_close in all + * TCP-related operations. Note that this only closes the desc cleanly; it + * will be freed through tcp_inet_stop later on. */ +static void tcp_desc_close(tcp_descriptor* desc) +{ +#ifdef HAVE_SENDFILE + if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) { + desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE; + close(desc->sendfile.dup_file_fd); + } +#endif + + tcp_clear_input(desc); + tcp_clear_output(desc); + + erl_inet_close(INETP(desc)); +} + /* TCP requests from Erlang */ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, ErlDrvSizeT len, @@ -9393,7 +9568,7 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, case INET_REQ_CLOSE: DEBUGF(("tcp_inet_ctl(%ld): CLOSE\r\n", (long)desc->inet.port)); tcp_close_check(desc); - erl_inet_close(INETP(desc)); + tcp_desc_close(desc); return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); @@ -9481,6 +9656,60 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } } + + case TCP_REQ_SENDFILE: { +#ifdef HAVE_SENDFILE + const ErlDrvSizeT required_len = + sizeof(desc->sendfile.dup_file_fd) + + sizeof(Uint64) * 2; + + int raw_file_fd; + + DEBUGF(("tcp_inet_ctl(%ld): SENDFILE\r\n", (long)desc->inet.port)); + + if (len != required_len) { + return ctl_error(EINVAL, rbuf, rsize); + } else if (!IS_CONNECTED(INETP(desc))) { + return ctl_error(ENOTCONN, rbuf, rsize); + } + + sys_memcpy(&raw_file_fd, buf, sizeof(raw_file_fd)); + buf += sizeof(raw_file_fd); + + desc->sendfile.dup_file_fd = dup(raw_file_fd); + + if(desc->sendfile.dup_file_fd == -1) { + return ctl_error(errno, rbuf, rsize); + } + + desc->sendfile.offset = get_int64(buf); + buf += sizeof(Uint64); + + desc->sendfile.length = get_int64(buf); + buf += sizeof(Uint64); + + ASSERT(desc->sendfile.offset >= 0); + ASSERT(desc->sendfile.length >= 0); + + desc->sendfile.ioq_skip = driver_sizeq(desc->inet.port); + desc->sendfile.bytes_sent = 0; + + desc->inet.caller = driver_caller(desc->inet.port); + desc->tcp_add_flags |= TCP_ADDF_SENDFILE; + + /* See if we can finish sending without selecting & rescheduling. */ + tcp_inet_sendfile(desc); + + if(desc->sendfile.length > 0) { + sock_select(INETP(desc), FD_WRITE, 1); + } + + return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); +#else + return ctl_error(ENOTSUP, rbuf, rsize); +#endif + } + default: DEBUGF(("tcp_inet_ctl(%ld): %u\r\n", (long)desc->inet.port, cmd)); return inet_ctl(INETP(desc), cmd, buf, len, rbuf, rsize); @@ -9517,7 +9746,7 @@ static void tcp_inet_timeout(ErlDrvData e) set_busy_port(desc->inet.port, 0); inet_reply_error_am(INETP(desc), am_timeout); if (desc->send_timeout_close) { - erl_inet_close(INETP(desc)); + tcp_desc_close(desc); } } else { @@ -9531,7 +9760,7 @@ static void tcp_inet_timeout(ErlDrvData e) else if ((state & INET_STATE_CONNECTING) == INET_STATE_CONNECTING) { /* assume connect timeout */ /* close the socket since it's not usable (see man pages) */ - erl_inet_close(INETP(desc)); + tcp_desc_close(desc); async_error_am(INETP(desc), am_timeout); } else if ((state & INET_STATE_ACCEPTING) == INET_STATE_ACCEPTING) { @@ -9620,12 +9849,27 @@ static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev) static void tcp_inet_flush(ErlDrvData e) { tcp_descriptor* desc = (tcp_descriptor*)e; - if (!(desc->inet.event_mask & FD_WRITE)) { - /* Discard send queue to avoid hanging port (OTP-7615) */ - tcp_clear_output(desc); + int discard_output; + + /* Discard send queue to avoid hanging port (OTP-7615) */ + discard_output = !(desc->inet.event_mask & FD_WRITE); + + discard_output |= desc->tcp_add_flags & TCP_ADDF_LINGER_ZERO; + +#ifdef HAVE_SENDFILE + /* The old file driver aborted when it was stopped during sendfile, so + * we'll clear the flag and discard all output. */ + if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) { + desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE; + close(desc->sendfile.dup_file_fd); + + discard_output = 1; + } +#endif + + if (discard_output) { + tcp_clear_output(desc); } - if (desc->tcp_add_flags & TCP_ADDF_LINGER_ZERO) - tcp_clear_output(desc); } static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp) @@ -9687,6 +9931,12 @@ static int tcp_recv_closed(tcp_descriptor* desc) set_busy_port(desc->inet.port, 0); inet_reply_error_am(INETP(desc), am_closed); DEBUGF(("tcp_recv_closed(%ld): busy reply 'closed'\r\n", port)); + } else { + /* No blocking send op to reply to right now. + * If next op is a send, make sure it returns {error,closed} + * rather than {error,enotconn}. + */ + desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_SEND; } if (!desc->inet.active) { /* We must cancel any timer here ! */ @@ -9694,8 +9944,7 @@ static int tcp_recv_closed(tcp_descriptor* desc) /* passive mode do not terminate port ! */ tcp_clear_input(desc); if (desc->inet.exitf) { - tcp_clear_output(desc); - desc_close(INETP(desc)); + tcp_desc_close(desc); } else { desc_close_read(INETP(desc)); } @@ -9738,7 +9987,7 @@ static int tcp_recv_error(tcp_descriptor* desc, int err) driver_cancel_timer(desc->inet.port); tcp_clear_input(desc); if (desc->inet.exitf) { - desc_close(INETP(desc)); + tcp_desc_close(desc); } else { desc_close_read(INETP(desc)); } @@ -10387,9 +10636,6 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) set_busy_port(desc->inet.port, 0); } - tcp_clear_output(desc); - tcp_clear_input(desc); - /* * We used to handle "expected errors" differently from unexpected ones. * Now we handle all errors in the same way (unless the show_econnreset @@ -10410,10 +10656,10 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) if (desc->inet.exitf) driver_exit(desc->inet.port, 0); else - desc_close(INETP(desc)); + tcp_desc_close(desc); } else { tcp_close_check(desc); - erl_inet_close(INETP(desc)); + tcp_desc_close(desc); if (desc->inet.caller) { if (show_econnreset) @@ -10524,7 +10770,9 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) ev->size += h_len; } - if ((sz = driver_sizeq(ix)) > 0) { + sz = driver_sizeq(ix); + + if ((desc->tcp_add_flags & TCP_ADDF_SENDFILE) || sz > 0) { driver_enqv(ix, ev, 0); if (sz+ev->size >= desc->high) { DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n", @@ -10618,8 +10866,9 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len) inet_output_count(INETP(desc), len+h_len); + sz = driver_sizeq(ix); - if ((sz = driver_sizeq(ix)) > 0) { + if ((desc->tcp_add_flags & TCP_ADDF_SENDFILE) || sz > 0) { if (h_len > 0) driver_enq(ix, buf, h_len); driver_enq(ix, ptr, len); @@ -10709,6 +10958,246 @@ static void tcp_inet_drv_input(ErlDrvData data, ErlDrvEvent event) (void)tcp_inet_input((tcp_descriptor*)data, (HANDLE)event); } +#ifdef HAVE_SENDFILE +static int tcp_sendfile_completed(tcp_descriptor* desc) { + ErlDrvTermData spec[LOAD_PORT_CNT + LOAD_TUPLE_CNT * 2 + + LOAD_ATOM_CNT * 2 + LOAD_UINT_CNT * 2]; + Uint32 sent_low, sent_high; + int i; + + desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE; + close(desc->sendfile.dup_file_fd); + + /* While we flushed the output queue prior to sending the file, we've + * deferred clearing busy status until now as there's no point in doing so + * while we still have a file to send. + * + * The watermark is checked since more data may have been added while we + * were sending the file. */ + + if (driver_sizeq(desc->inet.port) <= desc->low) { + if (IS_BUSY(INETP(desc))) { + desc->inet.caller = desc->inet.busy_caller; + desc->inet.state &= ~INET_F_BUSY; + + set_busy_port(desc->inet.port, 0); + + /* if we have a timer then cancel and send ok to client */ + if (desc->busy_on_send) { + driver_cancel_timer(desc->inet.port); + desc->busy_on_send = 0; + } + + inet_reply_ok(INETP(desc)); + } + } + + if (driver_sizeq(desc->inet.port) == 0) { + sock_select(INETP(desc), FD_WRITE, 0); + send_empty_out_q_msgs(INETP(desc)); + + if (desc->tcp_add_flags & TCP_ADDF_PENDING_SHUTDOWN) { + tcp_shutdown_async(desc); + } + } + + sent_low = ((Uint64)desc->sendfile.bytes_sent >> 0) & 0xFFFFFFFF; + sent_high = ((Uint64)desc->sendfile.bytes_sent >> 32) & 0xFFFFFFFF; + + i = LOAD_ATOM(spec, 0, am_sendfile); + i = LOAD_PORT(spec, i, desc->inet.dport); + i = LOAD_ATOM(spec, i, am_ok); + i = LOAD_UINT(spec, i, sent_low); + i = LOAD_UINT(spec, i, sent_high); + i = LOAD_TUPLE(spec, i, 3); + i = LOAD_TUPLE(spec, i, 3); + + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + return erl_drv_output_term(desc->inet.dport, spec, i); +} + +static int tcp_sendfile_aborted(tcp_descriptor* desc, int socket_error) { + ErlDrvTermData spec[LOAD_PORT_CNT + LOAD_TUPLE_CNT * 2 + LOAD_ATOM_CNT * 3]; + int i; + + /* We don't clean up sendfile state here, as that's done in tcp_desc_close + * following normal error handling. All we do here is report the failure. */ + + i = LOAD_ATOM(spec, 0, am_sendfile); + i = LOAD_PORT(spec, i, desc->inet.dport); + i = LOAD_ATOM(spec, i, am_error); + + switch (socket_error) { + case ECONNRESET: + case ENOTCONN: + case EPIPE: + i = LOAD_ATOM(spec, i, am_closed); + break; + default: + i = LOAD_ATOM(spec, i, error_atom(socket_error)); + } + + i = LOAD_TUPLE(spec, i, 2); + i = LOAD_TUPLE(spec, i, 3); + + ASSERT(i == sizeof(spec)/sizeof(*spec)); + + return erl_drv_output_term(desc->inet.dport, spec, i); +} + +static int tcp_inet_sendfile(tcp_descriptor* desc) { + ErlDrvPort ix = desc->inet.port; + int result = 0; + ssize_t n; + + DEBUGF(("tcp_inet_sendfile(%ld) {s=%d\r\n", (long)ix, desc->inet.s)); + + /* If there was any data in the queue by the time sendfile was issued, + * we'll need to skip it first. Note that we don't clear busy status until + * we're finished sending the file. */ + while (desc->sendfile.ioq_skip > 0) { + ssize_t bytes_to_send; + SysIOVec* iov; + int vsize; + + ASSERT(driver_sizeq(ix) >= desc->sendfile.ioq_skip); + + if ((iov = driver_peekq(ix, &vsize)) == NULL) { + ERTS_INTERNAL_ERROR("ioq empty when sendfile.ioq_skip > 0"); + } + + bytes_to_send = MIN(desc->sendfile.ioq_skip, iov[0].iov_len); + n = sock_send(desc->inet.s, iov[0].iov_base, bytes_to_send, 0); + + if (!IS_SOCKET_ERROR(n)) { + desc->sendfile.ioq_skip -= n; + driver_deq(ix, n); + } else if (sock_errno() == ERRNO_BLOCK) { +#ifdef __WIN32__ + desc->inet.send_would_block = 1; +#endif + goto done; + } else if (sock_errno() != EINTR) { + goto socket_error; + } + } + + while (desc->sendfile.length > 0) { + /* For some reason the maximum ssize_t cannot be used as the max size. + * 1GB seems to work on all platforms */ + const Sint64 SENDFILE_CHUNK_SIZE = ((1UL << 30) - 1); + + ssize_t bytes_to_send = MIN(SENDFILE_CHUNK_SIZE, desc->sendfile.length); + off_t offset = desc->sendfile.offset; + +#if defined(__linux__) + n = sendfile(desc->inet.s, desc->sendfile.dup_file_fd, &offset, + bytes_to_send); +#elif defined(__FreeBSD__) || defined(__DragonFly__) || defined(__DARWIN__) + { + off_t bytes_sent; + int error; + + #if defined(__DARWIN__) + bytes_sent = bytes_to_send; + + error = sendfile(desc->sendfile.dup_file_fd, desc->inet.s, offset, + &bytes_sent, NULL, 0); + n = bytes_sent; + #else + error = sendfile(desc->sendfile.dup_file_fd, desc->inet.s, offset, + bytes_to_send, NULL, &bytes_sent, 0); + n = bytes_sent; + #endif + + if(error < 0) { + /* EAGAIN/EINTR report partial success by setting bytes_sent, + * so we have to skip error handling if nonzero, and skip EOF + * handling if zero, as it's possible that we didn't manage to + * send anything at all before being interrupted by a + * signal. */ + if((errno != EAGAIN && errno != EINTR) || bytes_sent == 0) { + n = -1; + } + } + } +#elif defined(__sun) && defined(__SVR4) && defined(HAVE_SENDFILEV) + { + sendfilevec_t sfvec[1]; + size_t bytes_sent; + ssize_t error; + + sfvec[0].sfv_fd = desc->sendfile.dup_file_fd; + sfvec[0].sfv_len = bytes_to_send; + sfvec[0].sfv_off = offset; + sfvec[0].sfv_flag = 0; + + error = sendfilev(desc->inet.s, sfvec, 1, &bytes_sent); + n = bytes_sent; + + if(error < 0) { + if(errno == EINVAL) { + /* On some solaris versions (I've seen it on SunOS 5.10), + * using a sfv_len larger than the filesize will result in + * a (-1 && errno == EINVAL). We translate this to a + * successful send of the data.*/ + } else { + /* EAGAIN/EINTR behavior is identical to *BSD. */ + if((errno != EAGAIN && errno != EINTR) || bytes_sent == 0) { + n = -1; + } + } + } + } +#else + #error "Unsupported sendfile syscall; update configure test." +#endif + + if (n > 0) { + desc->sendfile.bytes_sent += n; + desc->sendfile.offset += n; + desc->sendfile.length -= n; + } else if (n == 0) { + /* EOF. */ + desc->sendfile.length = 0; + break; + } else if (IS_SOCKET_ERROR(n) && sock_errno() != EINTR) { + if (sock_errno() != ERRNO_BLOCK) { + goto socket_error; + } + +#ifdef __WIN32__ + desc->inet.send_would_block = 1; +#endif + break; + } + } + + if (desc->sendfile.length == 0) { + tcp_sendfile_completed(desc); + } + + goto done; + +socket_error: { + int socket_errno = sock_errno(); + + DEBUGF(("tcp_inet_sendfile(%ld): send errno = %d (errno %d)\r\n", + (long)desc->inet.port, socket_errno, errno)); + + result = tcp_send_error(desc, socket_errno); + tcp_sendfile_aborted(desc, socket_errno); + + goto done; + } + +done: + DEBUGF(("tcp_inet_sendfile(%ld) }\r\n", (long)desc->inet.port)); + return result; +} +#endif /* HAVE_SENDFILE */ + /* socket ready for ouput: ** 1. INET_STATE_CONNECTING => non block connect ? ** 2. INET_STATE_CONNECTED => write output @@ -10736,10 +11225,11 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) #ifndef SO_ERROR { - int sz = sizeof(desc->inet.remote); - int code = sock_peer(desc->inet.s, - (struct sockaddr*) &desc->inet.remote, &sz); - + int sz, code; + sz = sizeof(desc->inet.remote); + sys_memzero((char *) &desc->inet.remote, sz); + code = sock_peer(desc->inet.s, + (struct sockaddr*) &desc->inet.remote, &sz); if (IS_SOCKET_ERROR(code)) { desc->inet.state = INET_STATE_OPEN; /* restore state */ ret = async_error(INETP(desc), sock_errno()); @@ -10768,7 +11258,14 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) async_ok(INETP(desc)); } else if (IS_CONNECTED(INETP(desc))) { - for (;;) { + +#ifdef HAVE_SENDFILE + if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) { + return tcp_inet_sendfile(desc); + } +#endif + + for (;;) { int vsize; ssize_t n; SysIOVec* iov; @@ -11174,24 +11671,20 @@ static ErlDrvSSizeT packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, (desc->sfamily, &remote, &buf, &len)) != NULL) return ctl_xerror(xerror, rbuf, rsize); - sock_select(desc, FD_CONNECT, 1); code = sock_connect(desc->s, &remote.sa, len); if (IS_SOCKET_ERROR(code) && (sock_errno() == EINPROGRESS)) { /* XXX: Unix only -- WinSock would have a different cond! */ - desc->state = INET_STATE_CONNECTING; if (timeout != INET_INFINITY) driver_set_timer(desc->port, timeout); enq_async(desc, tbuf, INET_REQ_CONNECT); + async_ok(desc); } else if (code == 0) { /* OK we are connected */ - sock_select(desc, FD_CONNECT, 0); - desc->state = INET_STATE_CONNECTED; enq_async(desc, tbuf, INET_REQ_CONNECT); async_ok(desc); } else { - sock_select(desc, FD_CONNECT, 0); return ctl_error(sock_errno(), rbuf, rsize); } return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); @@ -11706,77 +12199,6 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) return count; } -static void packet_inet_drv_output(ErlDrvData e, ErlDrvEvent event) -{ - (void) packet_inet_output((udp_descriptor*)e, (HANDLE)event); -} - -/* UDP/SCTP socket ready for output: -** This is a Back-End for Non-Block SCTP Connect (INET_STATE_CONNECTING) -*/ -static int packet_inet_output(udp_descriptor* udesc, HANDLE event) -{ - inet_descriptor* desc = INETP(udesc); - int ret = 0; - ErlDrvPort ix = desc->port; - - DEBUGF(("packet_inet_output(%ld) {s=%d\r\n", - (long)desc->port, desc->s)); - - if (desc->state == INET_STATE_CONNECTING) { - sock_select(desc, FD_CONNECT, 0); - - driver_cancel_timer(ix); /* posssibly cancel a timer */ -#ifndef __WIN32__ - /* - * XXX This is strange. This *should* work on Windows NT too, - * but doesn't. An bug in Winsock 2.0 for Windows NT? - * - * See "Unix Netwok Programming", W.R.Stevens, p 412 for a - * discussion about Unix portability and non blocking connect. - */ - -#ifndef SO_ERROR - { - int sz = sizeof(desc->remote); - int code = sock_peer(desc->s, - (struct sockaddr*) &desc->remote, &sz); - - if (IS_SOCKET_ERROR(code)) { - desc->state = INET_STATE_OPEN; /* restore state */ - ret = async_error(desc, sock_errno()); - goto done; - } - } -#else - { - int error = 0; /* Has to be initiated, we check it */ - unsigned int sz = sizeof(error); /* even if we get -1 */ - int code = sock_getopt(desc->s, SOL_SOCKET, SO_ERROR, - (void *)&error, &sz); - - if ((code < 0) || error) { - desc->state = INET_STATE_OPEN; /* restore state */ - ret = async_error(desc, error); - goto done; - } - } -#endif /* SO_ERROR */ -#endif /* !__WIN32__ */ - - desc->state = INET_STATE_CONNECTED; - async_ok(desc); - } - else { - sock_select(desc,FD_CONNECT,0); - - DEBUGF(("packet_inet_output(%ld): bad state: %04x\r\n", - (long)desc->port, desc->state)); - } - done: - DEBUGF(("packet_inet_output(%ld) }\r\n", (long)desc->port)); - return ret; -} #endif /*---------------------------------------------------------------------------*/ |