aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/drivers/common/inet_drv.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/drivers/common/inet_drv.c')
-rw-r--r--erts/emulator/drivers/common/inet_drv.c710
1 files changed, 566 insertions, 144 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 3029958f26..5e9afdc5ca 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1997-2016. All Rights Reserved.
+ * Copyright Ericsson AB 1997-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,6 +63,20 @@
#include <sys/un.h>
#endif
+#ifdef HAVE_SENDFILE
+#if defined(__linux__) || (defined(__sun) && defined(__SVR4))
+ #include <sys/sendfile.h>
+#elif defined(__FreeBSD__) || defined(__DragonFly__)
+ /* Need to define __BSD_VISIBLE in order to expose prototype of sendfile */
+ #define __BSD_VISIBLE 1
+ #include <sys/socket.h>
+#endif
+#endif
+
+#if defined(__APPLE__) && defined(__MACH__) && !defined(__DARWIN__)
+ #define __DARWIN__ 1
+#endif
+
/* All platforms fail on malloc errors. */
#define FATAL_MALLOC
@@ -591,7 +605,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
(((unsigned char*) (s))[1] << 8) | \
(((unsigned char*) (s))[0]))
-#ifdef HAVE_SYS_UN_H
+#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
/* strnlen doesn't exist everywhere */
static size_t my_strnlen(const char *s, size_t maxlen)
@@ -602,14 +616,6 @@ static size_t my_strnlen(const char *s, size_t maxlen)
return i;
}
-/* Check that some character in the buffer != '\0' */
-static int is_nonzero(const char *s, size_t n)
-{
- size_t i;
- for (i = 0; i < n; i++) if (s[i] != '\0') return !0;
- return 0;
-}
-
#endif
#ifdef VALGRIND
@@ -709,6 +715,7 @@ static int is_nonzero(const char *s, size_t n)
#define TCP_REQ_RECV 42
#define TCP_REQ_UNRECV 43
#define TCP_REQ_SHUTDOWN 44
+#define TCP_REQ_SENDFILE 45
/* UDP and SCTP requests */
#define PACKET_REQ_RECV 60 /* Common for UDP and SCTP */
/* #define SCTP_REQ_LISTEN 61 MERGED Different from TCP; not for UDP */
@@ -728,9 +735,10 @@ static int is_nonzero(const char *s, size_t n)
#define TCP_ADDF_PENDING_SHUTDOWN \
(TCP_ADDF_PENDING_SHUT_WR | TCP_ADDF_PENDING_SHUT_RDWR)
#define TCP_ADDF_SHOW_ECONNRESET 64 /* Tell user about incoming RST */
-#define TCP_ADDF_DELAYED_ECONNRESET 128 /* An ECONNRESET error occured on send or shutdown */
+#define TCP_ADDF_DELAYED_ECONNRESET 128 /* An ECONNRESET error occurred on send or shutdown */
#define TCP_ADDF_SHUTDOWN_WR_DONE 256 /* A shutdown(sock, SHUT_WR) or SHUT_RDWR was made */
#define TCP_ADDF_LINGER_ZERO 512 /* Discard driver queue on port close */
+#define TCP_ADDF_SENDFILE 1024 /* Send from an fd instead of the driver queue */
/* *_REQ_* replies */
#define INET_REP_ERROR 0
@@ -778,6 +786,7 @@ static int is_nonzero(const char *s, size_t n)
#define INET_LOPT_TCP_SHOW_ECONNRESET 39 /* tell user about incoming RST */
#define INET_LOPT_LINE_DELIM 40 /* Line delimiting char */
#define INET_OPT_TCLASS 41 /* IPv6 transport class */
+#define INET_OPT_BIND_TO_DEVICE 42 /* get/set network device the socket is bound to */
/* SCTP options: a separate range, from 100: */
#define SCTP_OPT_RTOINFO 100
#define SCTP_OPT_ASSOCINFO 101
@@ -1144,7 +1153,6 @@ static int packet_inet_init(void);
static void packet_inet_stop(ErlDrvData);
static void packet_inet_command(ErlDrvData, char*, ErlDrvSizeT);
static void packet_inet_drv_input(ErlDrvData data, ErlDrvEvent event);
-static void packet_inet_drv_output(ErlDrvData data, ErlDrvEvent event);
static ErlDrvData udp_inet_start(ErlDrvPort, char* command);
#ifdef HAVE_SCTP
static ErlDrvData sctp_inet_start(ErlDrvPort, char* command);
@@ -1169,7 +1177,7 @@ static struct erl_drv_entry udp_inet_driver_entry =
NULL,
#else
packet_inet_drv_input,
- packet_inet_drv_output,
+ NULL,
#endif
"udp_inet",
NULL,
@@ -1204,7 +1212,7 @@ static struct erl_drv_entry sctp_inet_driver_entry =
NULL,
#else
packet_inet_drv_input,
- packet_inet_drv_output,
+ NULL,
#endif
"sctp_inet",
NULL,
@@ -1244,6 +1252,21 @@ typedef struct {
inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */
inet_async_multi_op *multi_last;
MultiTimerData *mtd; /* Timer structures for multiple accept */
+#ifdef HAVE_SENDFILE
+ struct {
+ ErlDrvSizeT ioq_skip; /* The number of bytes in the queue at the time
+ * sendfile was issued, which must be sent
+ * before issuing the sendfile call itself. */
+ int dup_file_fd; /* The file handle to send from; this is
+ * duplicated when sendfile is issued to
+ * reduce (but not eliminate) the impact of a
+ * nasty race, so we have to remember to close
+ * it. */
+ Uint64 bytes_sent;
+ Uint64 offset;
+ Uint64 length;
+ } sendfile;
+#endif
} tcp_descriptor;
/* send function */
@@ -1254,9 +1277,13 @@ static int tcp_deliver(tcp_descriptor* desc, int len);
static int tcp_shutdown_error(tcp_descriptor* desc, int err);
+static int tcp_inet_sendfile(tcp_descriptor* desc);
+
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event);
static int tcp_inet_input(tcp_descriptor* desc, HANDLE event);
+static void tcp_desc_close(tcp_descriptor*);
+
#ifdef HAVE_UDP
typedef struct {
inet_descriptor inet; /* common data structure (DON'T MOVE) */
@@ -1268,7 +1295,6 @@ typedef struct {
static int packet_inet_input(udp_descriptor* udesc, HANDLE event);
-static int packet_inet_output(udp_descriptor* udesc, HANDLE event);
#endif
/* convert descriptor pointer to inet_descriptor pointer */
@@ -1334,6 +1360,10 @@ static ErlDrvTermData am_tos;
static ErlDrvTermData am_tclass;
static ErlDrvTermData am_ipv6_v6only;
static ErlDrvTermData am_netns;
+static ErlDrvTermData am_bind_to_device;
+#endif
+#ifdef HAVE_SENDFILE
+static ErlDrvTermData am_sendfile;
#endif
static char str_eafnosupport[] = "eafnosupport";
@@ -2203,13 +2233,16 @@ static int inet_reply_ok(inet_descriptor* desc)
ErlDrvTermData caller = desc->caller;
int i = 0;
+ desc->caller = 0;
+ if (is_not_internal_pid(caller))
+ return 0;
+
i = LOAD_ATOM(spec, i, am_inet_reply);
i = LOAD_PORT(spec, i, desc->dport);
i = LOAD_ATOM(spec, i, am_ok);
i = LOAD_TUPLE(spec, i, 3);
ASSERT(i == sizeof(spec)/sizeof(*spec));
- desc->caller = 0;
return erl_drv_send_term(desc->dport, caller, spec, i);
}
@@ -3725,6 +3758,7 @@ static void inet_init_sctp(void) {
INIT_ATOM(tclass);
INIT_ATOM(ipv6_v6only);
INIT_ATOM(netns);
+ INIT_ATOM(bind_to_device);
/* Option names */
INIT_ATOM(sctp_rtoinfo);
@@ -3877,6 +3911,10 @@ static int inet_init()
INIT_ATOM(https);
INIT_ATOM(scheme);
+#ifdef HAVE_SENDFILE
+ INIT_ATOM(sendfile);
+#endif
+
/* add TCP, UDP and SCTP drivers */
add_driver_entry(&tcp_inet_driver_entry);
#ifdef HAVE_UDP
@@ -4018,13 +4056,30 @@ static char* inet_set_address(int family, inet_address* dst,
int n;
if (*len == 0) return str_einval;
n = *((unsigned char*)(*src)); /* Length field */
- if ((*len < 1+n) || (sizeof(dst->sal.sun_path) < n+1)) {
+ if (*len < 1+n) return str_einval;
+ if (n +
+#ifdef __linux__
+ /* Make sure the address gets zero terminated
+ * except when the first byte is \0 because then it is
+ * sort of zero terminated although the zero termination
+ * comes before the address...
+ * This fix handles Linux's nonportable
+ * abstract socket address extension.
+ */
+ ((*len) > 1 && (*src)[1] == '\0' ? 0 : 1)
+#else
+ 1
+#endif
+ > sizeof(dst->sal.sun_path)) {
return str_einval;
}
sys_memzero((char*)dst, sizeof(struct sockaddr_un));
dst->sal.sun_family = family;
sys_memcpy(dst->sal.sun_path, (*src)+1, n);
*len = offsetof(struct sockaddr_un, sun_path) + n;
+#ifndef NO_SA_LEN
+ dst->sal.sun_len = *len;
+#endif
*src += 1 + n;
return NULL;
}
@@ -4132,8 +4187,8 @@ static char *inet_set_faddress(int family, inet_address* dst,
/* Get a inaddr structure
** src = inaddr structure
-** *len is the lenght of structure
** dst is filled with [F,P1,P0,X1,....]
+** *len is the length of structure
** where F is the family code (coded)
** and *len is the length of dst on return
** (suitable to deliver to erlang)
@@ -4169,15 +4224,16 @@ static int inet_get_address(char* dst, inet_address* src, unsigned int* len)
if (*len < offsetof(struct sockaddr_un, sun_path)) return -1;
n = *len - offsetof(struct sockaddr_un, sun_path);
if (255 < n) return -1;
- /* Portability fix: Assume that the address is a zero terminated
- * string, except when the first byte is \0 i.e the
- * string length is 0. Then use the reported length instead.
- * This fix handles Linux's abstract socket address
- * nonportable extension.
- */
m = my_strnlen(src->sal.sun_path, n);
- if ((m == 0) && is_nonzero(src->sal.sun_path, n))
- m = n;
+#ifdef __linux__
+ /* Assume that the address is a zero terminated string,
+ * except when the first byte is \0 i.e the string length is 0,
+ * then use the reported length instead.
+ * This fix handles Linux's nonportable
+ * abstract socket address extension.
+ */
+ if (m == 0) m = n;
+#endif
dst[0] = INET_AF_LOCAL;
dst[1] = (char) ((unsigned char) m);
sys_memcpy(dst+2, src->sal.sun_path, m);
@@ -4234,15 +4290,16 @@ inet_address_to_erlang(char *dst, inet_address **src, SOCKLEN_T sz) {
if (sz < offsetof(struct sockaddr_un, sun_path)) return -1;
n = sz - offsetof(struct sockaddr_un, sun_path);
if (255 < n) return -1;
- /* Portability fix: Assume that the address is a zero terminated
- * string, except when the first byte is \0 i.e the
- * string length is 0. Then use the reported length instead.
- * This fix handles Linux's abstract socket address
- * nonportable extension.
- */
m = my_strnlen((*src)->sal.sun_path, n);
- if ((m == 0) && is_nonzero((*src)->sal.sun_path, n))
- m = n;
+#ifdef __linux__
+ /* Assume that the address is a zero terminated string,
+ * except when the first byte is \0 i.e the string length is 0,
+ * Then use the reported length instead.
+ * This fix handles Linux's nonportable
+ * abstract socket address extension.
+ */
+ if (m == 0) m = n;
+#endif
if (dst) {
dst[0] = INET_AF_LOCAL;
dst[1] = (char) ((unsigned char) m);
@@ -4320,6 +4377,12 @@ static void desc_close(inet_descriptor* desc)
desc->event = INVALID_EVENT; /* closed by stop_select callback */
desc->s = INVALID_SOCKET;
desc->event_mask = 0;
+
+ /* mark as disconnected in case when socket is left lingering due to
+ * {exit_on_close, false} option in gen_tcp socket creation. Next
+ * write to socket should produce {error, enotconn} and send a
+ * message {tcp_error,#Port<>,econnreset} */
+ desc->state &= ~INET_STATE_CONNECTED;
}
}
@@ -5946,6 +6009,9 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
int ival;
char* arg_ptr;
int arg_sz;
+#ifdef SO_BINDTODEVICE
+ char ifname[IFNAMSIZ];
+#endif
enum PacketParseType old_htype = desc->htype;
int old_active = desc->active;
int propagate; /* Set to 1 if failure to set this option
@@ -6331,6 +6397,29 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
len -= arg_sz;
break;
+#ifdef SO_BINDTODEVICE
+ case INET_OPT_BIND_TO_DEVICE:
+ if (ival < 0) return -1;
+ if (len < ival) return -1;
+ if (ival > sizeof(ifname)) {
+ return -1;
+ }
+ memcpy(ifname, ptr, ival);
+ ifname[ival] = '\0';
+ ptr += ival;
+ len -= ival;
+
+ proto = SOL_SOCKET;
+ type = SO_BINDTODEVICE;
+ arg_ptr = (char*)&ifname;
+ arg_sz = sizeof(ifname);
+ propagate = 1; /* We do want to know if this fails */
+
+ DEBUGF(("inet_set_opts(%ld): s=%d, SO_BINDTODEVICE=%s\r\n",
+ (long)desc->port, desc->s, ifname));
+ break;
+#endif
+
default:
return -1;
}
@@ -6463,6 +6552,9 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len)
# ifdef SCTP_DELAYED_ACK_TIME
struct sctp_assoc_value av; /* Not in SOLARIS10 */
# endif
+# ifdef SO_BINDTODEVICE
+ char ifname[IFNAMSIZ];
+# endif
}
arg;
@@ -6702,6 +6794,23 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len)
continue; /* Option not supported -- ignore it */
# endif
+#ifdef SO_BINDTODEVICE
+ case INET_OPT_BIND_TO_DEVICE:
+ arg_sz = get_int32(curr); curr += 4;
+ CHKLEN(curr, arg_sz);
+ if (arg_sz >= sizeof(arg.ifname))
+ return -1;
+ memcpy(arg.ifname, curr, arg_sz);
+ arg.ifname[arg_sz] = '\0';
+ curr += arg_sz;
+
+ proto = SOL_SOCKET;
+ type = SO_BINDTODEVICE;
+ arg_ptr = (char*) (&arg.ifname);
+ arg_sz = sizeof ( arg.ifname);
+ break;
+#endif
+
case SCTP_OPT_AUTOCLOSE:
{
arg.ival= get_int32 (curr); curr += 4;
@@ -6967,6 +7076,9 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
ErlDrvSizeT dest_used = 0;
ErlDrvSizeT dest_allocated = destlen;
char *orig_dest = *dest;
+#ifdef SO_BINDTODEVICE
+ char ifname[IFNAMSIZ];
+#endif
/* Ptr is a name parameter */
#define RETURN_ERROR() \
@@ -7302,6 +7414,26 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
put_int32(arg_sz,ptr);
continue;
}
+
+#ifdef SO_BINDTODEVICE
+ case INET_OPT_BIND_TO_DEVICE:
+ arg_sz = sizeof(ifname);
+ TRUNCATE_TO(0,ptr);
+ PLACE_FOR(5 + arg_sz,ptr);
+ arg_ptr = ptr + 5;
+ if (IS_SOCKET_ERROR(sock_getopt(desc->s,SOL_SOCKET,SO_BINDTODEVICE,
+ arg_ptr,&arg_sz))) {
+ TRUNCATE_TO(0,ptr);
+ continue;
+ }
+ arg_sz = my_strnlen(arg_ptr, arg_sz);
+ TRUNCATE_TO(arg_sz + 5,ptr);
+ *ptr++ = opt;
+ put_int32(arg_sz,ptr);
+ ptr += arg_sz;
+ continue;
+#endif
+
default:
RETURN_ERROR();
}
@@ -7583,6 +7715,25 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc,
i = LOAD_TUPLE (spec, i, 2);
break;
}
+
+#ifdef SO_BINDTODEVICE
+ /* The following option returns a binary: */
+ case INET_OPT_BIND_TO_DEVICE: {
+ char ifname[IFNAMSIZ];
+ unsigned int sz = sizeof(ifname);
+
+ if (sock_getopt(desc->s, SOL_SOCKET, SO_BINDTODEVICE,
+ &ifname, &sz) < 0) continue;
+ /* Fill in the response: */
+ PLACE_FOR(spec, i,
+ LOAD_ATOM_CNT + LOAD_BUF2BINARY_CNT + LOAD_TUPLE_CNT);
+ i = LOAD_ATOM (spec, i, am_bind_to_device);
+ i = LOAD_BUF2BINARY(spec, i, ifname, my_strnlen(ifname, sz));
+ i = LOAD_TUPLE (spec, i, 2);
+ break;
+ }
+#endif
+
/* The following options just return an integer value: */
case INET_OPT_RCVBUF :
case INET_OPT_SNDBUF :
@@ -8346,10 +8497,10 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
return (ErlDrvData)desc;
}
-
-#ifndef MAXHOSTNAMELEN
-#define MAXHOSTNAMELEN 256
-#endif
+/* MAXHOSTNAMELEN could be 64 or 255 depending
+on the platform. Instead, use INET_MAXHOSTNAMELEN
+which is always 255 across all platforms */
+#define INET_MAXHOSTNAMELEN 255
/*
** common TCP/UDP/SCTP control command
@@ -8526,13 +8677,14 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf,
}
case INET_REQ_GETHOSTNAME: { /* get host name */
- char tbuf[MAXHOSTNAMELEN];
+ char tbuf[INET_MAXHOSTNAMELEN + 1];
DEBUGF(("inet_ctl(%ld): GETHOSTNAME\r\n", (long)desc->port));
if (len != 0)
return ctl_error(EINVAL, rbuf, rsize);
- if (IS_SOCKET_ERROR(sock_hostname(tbuf, MAXHOSTNAMELEN)))
+ /* gethostname requires len to be max(hostname) + 1 */
+ if (IS_SOCKET_ERROR(sock_hostname(tbuf, INET_MAXHOSTNAMELEN + 1)))
return ctl_error(sock_errno(), rbuf, rsize);
return ctl_reply(INET_REP_OK, tbuf, strlen(tbuf), rbuf, rsize);
}
@@ -8585,6 +8737,7 @@ static ErlDrvSSizeT inet_ctl(inet_descriptor* desc, int cmd, char* buf,
else {
ptr = &peer;
sz = sizeof(peer);
+ sys_memzero((char *) &peer, sz);
if (IS_SOCKET_ERROR
(sock_peer
(desc->s, (struct sockaddr*)ptr, &sz)))
@@ -9139,16 +9292,38 @@ static void tcp_inet_stop(ErlDrvData e)
tcp_descriptor* desc = (tcp_descriptor*)e;
DEBUGF(("tcp_inet_stop(%ld) {s=%d\r\n",
(long)desc->inet.port, desc->inet.s));
+
tcp_close_check(desc);
- /* free input buffer & output buffer */
- if (desc->i_buf != NULL)
- release_buffer(desc->i_buf);
- desc->i_buf = NULL; /* net_mess2 may call this function recursively when
- faulty messages arrive on dist ports*/
+ tcp_clear_input(desc);
+
DEBUGF(("tcp_inet_stop(%ld) }\r\n", (long)desc->inet.port));
inet_stop(INETP(desc));
}
+/* Closes a tcp descriptor without leaving things hanging; the VM keeps trying
+ * to flush IO queues as long as it contains anything even after the port has
+ * been closed from the erlang side, which is desired behavior (Think escripts
+ * writing to files) but pretty hopeless if the underlying fd has been set to
+ * INVALID_SOCKET through desc_close.
+ *
+ * This function should be used in place of desc_close/erl_inet_close in all
+ * TCP-related operations. Note that this only closes the desc cleanly; it
+ * will be freed through tcp_inet_stop later on. */
+static void tcp_desc_close(tcp_descriptor* desc)
+{
+#ifdef HAVE_SENDFILE
+ if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) {
+ desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE;
+ close(desc->sendfile.dup_file_fd);
+ }
+#endif
+
+ tcp_clear_input(desc);
+ tcp_clear_output(desc);
+
+ erl_inet_close(INETP(desc));
+}
+
/* TCP requests from Erlang */
static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd,
char* buf, ErlDrvSizeT len,
@@ -9393,7 +9568,7 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd,
case INET_REQ_CLOSE:
DEBUGF(("tcp_inet_ctl(%ld): CLOSE\r\n", (long)desc->inet.port));
tcp_close_check(desc);
- erl_inet_close(INETP(desc));
+ tcp_desc_close(desc);
return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
@@ -9481,6 +9656,60 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd,
return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
}
}
+
+ case TCP_REQ_SENDFILE: {
+#ifdef HAVE_SENDFILE
+ const ErlDrvSizeT required_len =
+ sizeof(desc->sendfile.dup_file_fd) +
+ sizeof(Uint64) * 2;
+
+ int raw_file_fd;
+
+ DEBUGF(("tcp_inet_ctl(%ld): SENDFILE\r\n", (long)desc->inet.port));
+
+ if (len != required_len) {
+ return ctl_error(EINVAL, rbuf, rsize);
+ } else if (!IS_CONNECTED(INETP(desc))) {
+ return ctl_error(ENOTCONN, rbuf, rsize);
+ }
+
+ sys_memcpy(&raw_file_fd, buf, sizeof(raw_file_fd));
+ buf += sizeof(raw_file_fd);
+
+ desc->sendfile.dup_file_fd = dup(raw_file_fd);
+
+ if(desc->sendfile.dup_file_fd == -1) {
+ return ctl_error(errno, rbuf, rsize);
+ }
+
+ desc->sendfile.offset = get_int64(buf);
+ buf += sizeof(Uint64);
+
+ desc->sendfile.length = get_int64(buf);
+ buf += sizeof(Uint64);
+
+ ASSERT(desc->sendfile.offset >= 0);
+ ASSERT(desc->sendfile.length >= 0);
+
+ desc->sendfile.ioq_skip = driver_sizeq(desc->inet.port);
+ desc->sendfile.bytes_sent = 0;
+
+ desc->inet.caller = driver_caller(desc->inet.port);
+ desc->tcp_add_flags |= TCP_ADDF_SENDFILE;
+
+ /* See if we can finish sending without selecting & rescheduling. */
+ tcp_inet_sendfile(desc);
+
+ if(desc->sendfile.length > 0) {
+ sock_select(INETP(desc), FD_WRITE, 1);
+ }
+
+ return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
+#else
+ return ctl_error(ENOTSUP, rbuf, rsize);
+#endif
+ }
+
default:
DEBUGF(("tcp_inet_ctl(%ld): %u\r\n", (long)desc->inet.port, cmd));
return inet_ctl(INETP(desc), cmd, buf, len, rbuf, rsize);
@@ -9517,7 +9746,7 @@ static void tcp_inet_timeout(ErlDrvData e)
set_busy_port(desc->inet.port, 0);
inet_reply_error_am(INETP(desc), am_timeout);
if (desc->send_timeout_close) {
- erl_inet_close(INETP(desc));
+ tcp_desc_close(desc);
}
}
else {
@@ -9531,7 +9760,7 @@ static void tcp_inet_timeout(ErlDrvData e)
else if ((state & INET_STATE_CONNECTING) == INET_STATE_CONNECTING) {
/* assume connect timeout */
/* close the socket since it's not usable (see man pages) */
- erl_inet_close(INETP(desc));
+ tcp_desc_close(desc);
async_error_am(INETP(desc), am_timeout);
}
else if ((state & INET_STATE_ACCEPTING) == INET_STATE_ACCEPTING) {
@@ -9620,12 +9849,27 @@ static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev)
static void tcp_inet_flush(ErlDrvData e)
{
tcp_descriptor* desc = (tcp_descriptor*)e;
- if (!(desc->inet.event_mask & FD_WRITE)) {
- /* Discard send queue to avoid hanging port (OTP-7615) */
- tcp_clear_output(desc);
+ int discard_output;
+
+ /* Discard send queue to avoid hanging port (OTP-7615) */
+ discard_output = !(desc->inet.event_mask & FD_WRITE);
+
+ discard_output |= desc->tcp_add_flags & TCP_ADDF_LINGER_ZERO;
+
+#ifdef HAVE_SENDFILE
+ /* The old file driver aborted when it was stopped during sendfile, so
+ * we'll clear the flag and discard all output. */
+ if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) {
+ desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE;
+ close(desc->sendfile.dup_file_fd);
+
+ discard_output = 1;
+ }
+#endif
+
+ if (discard_output) {
+ tcp_clear_output(desc);
}
- if (desc->tcp_add_flags & TCP_ADDF_LINGER_ZERO)
- tcp_clear_output(desc);
}
static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp)
@@ -9687,6 +9931,12 @@ static int tcp_recv_closed(tcp_descriptor* desc)
set_busy_port(desc->inet.port, 0);
inet_reply_error_am(INETP(desc), am_closed);
DEBUGF(("tcp_recv_closed(%ld): busy reply 'closed'\r\n", port));
+ } else {
+ /* No blocking send op to reply to right now.
+ * If next op is a send, make sure it returns {error,closed}
+ * rather than {error,enotconn}.
+ */
+ desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_SEND;
}
if (!desc->inet.active) {
/* We must cancel any timer here ! */
@@ -9694,8 +9944,7 @@ static int tcp_recv_closed(tcp_descriptor* desc)
/* passive mode do not terminate port ! */
tcp_clear_input(desc);
if (desc->inet.exitf) {
- tcp_clear_output(desc);
- desc_close(INETP(desc));
+ tcp_desc_close(desc);
} else {
desc_close_read(INETP(desc));
}
@@ -9738,7 +9987,7 @@ static int tcp_recv_error(tcp_descriptor* desc, int err)
driver_cancel_timer(desc->inet.port);
tcp_clear_input(desc);
if (desc->inet.exitf) {
- desc_close(INETP(desc));
+ tcp_desc_close(desc);
} else {
desc_close_read(INETP(desc));
}
@@ -10387,9 +10636,6 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err)
set_busy_port(desc->inet.port, 0);
}
- tcp_clear_output(desc);
- tcp_clear_input(desc);
-
/*
* We used to handle "expected errors" differently from unexpected ones.
* Now we handle all errors in the same way (unless the show_econnreset
@@ -10410,10 +10656,10 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err)
if (desc->inet.exitf)
driver_exit(desc->inet.port, 0);
else
- desc_close(INETP(desc));
+ tcp_desc_close(desc);
} else {
tcp_close_check(desc);
- erl_inet_close(INETP(desc));
+ tcp_desc_close(desc);
if (desc->inet.caller) {
if (show_econnreset)
@@ -10524,7 +10770,9 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
ev->size += h_len;
}
- if ((sz = driver_sizeq(ix)) > 0) {
+ sz = driver_sizeq(ix);
+
+ if ((desc->tcp_add_flags & TCP_ADDF_SENDFILE) || sz > 0) {
driver_enqv(ix, ev, 0);
if (sz+ev->size >= desc->high) {
DEBUGF(("tcp_sendv(%ld): s=%d, sender forced busy\r\n",
@@ -10618,8 +10866,9 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len)
inet_output_count(INETP(desc), len+h_len);
+ sz = driver_sizeq(ix);
- if ((sz = driver_sizeq(ix)) > 0) {
+ if ((desc->tcp_add_flags & TCP_ADDF_SENDFILE) || sz > 0) {
if (h_len > 0)
driver_enq(ix, buf, h_len);
driver_enq(ix, ptr, len);
@@ -10709,6 +10958,246 @@ static void tcp_inet_drv_input(ErlDrvData data, ErlDrvEvent event)
(void)tcp_inet_input((tcp_descriptor*)data, (HANDLE)event);
}
+#ifdef HAVE_SENDFILE
+static int tcp_sendfile_completed(tcp_descriptor* desc) {
+ ErlDrvTermData spec[LOAD_PORT_CNT + LOAD_TUPLE_CNT * 2 +
+ LOAD_ATOM_CNT * 2 + LOAD_UINT_CNT * 2];
+ Uint32 sent_low, sent_high;
+ int i;
+
+ desc->tcp_add_flags &= ~TCP_ADDF_SENDFILE;
+ close(desc->sendfile.dup_file_fd);
+
+ /* While we flushed the output queue prior to sending the file, we've
+ * deferred clearing busy status until now as there's no point in doing so
+ * while we still have a file to send.
+ *
+ * The watermark is checked since more data may have been added while we
+ * were sending the file. */
+
+ if (driver_sizeq(desc->inet.port) <= desc->low) {
+ if (IS_BUSY(INETP(desc))) {
+ desc->inet.caller = desc->inet.busy_caller;
+ desc->inet.state &= ~INET_F_BUSY;
+
+ set_busy_port(desc->inet.port, 0);
+
+ /* if we have a timer then cancel and send ok to client */
+ if (desc->busy_on_send) {
+ driver_cancel_timer(desc->inet.port);
+ desc->busy_on_send = 0;
+ }
+
+ inet_reply_ok(INETP(desc));
+ }
+ }
+
+ if (driver_sizeq(desc->inet.port) == 0) {
+ sock_select(INETP(desc), FD_WRITE, 0);
+ send_empty_out_q_msgs(INETP(desc));
+
+ if (desc->tcp_add_flags & TCP_ADDF_PENDING_SHUTDOWN) {
+ tcp_shutdown_async(desc);
+ }
+ }
+
+ sent_low = ((Uint64)desc->sendfile.bytes_sent >> 0) & 0xFFFFFFFF;
+ sent_high = ((Uint64)desc->sendfile.bytes_sent >> 32) & 0xFFFFFFFF;
+
+ i = LOAD_ATOM(spec, 0, am_sendfile);
+ i = LOAD_PORT(spec, i, desc->inet.dport);
+ i = LOAD_ATOM(spec, i, am_ok);
+ i = LOAD_UINT(spec, i, sent_low);
+ i = LOAD_UINT(spec, i, sent_high);
+ i = LOAD_TUPLE(spec, i, 3);
+ i = LOAD_TUPLE(spec, i, 3);
+
+ ASSERT(i == sizeof(spec)/sizeof(*spec));
+
+ return erl_drv_output_term(desc->inet.dport, spec, i);
+}
+
+static int tcp_sendfile_aborted(tcp_descriptor* desc, int socket_error) {
+ ErlDrvTermData spec[LOAD_PORT_CNT + LOAD_TUPLE_CNT * 2 + LOAD_ATOM_CNT * 3];
+ int i;
+
+ /* We don't clean up sendfile state here, as that's done in tcp_desc_close
+ * following normal error handling. All we do here is report the failure. */
+
+ i = LOAD_ATOM(spec, 0, am_sendfile);
+ i = LOAD_PORT(spec, i, desc->inet.dport);
+ i = LOAD_ATOM(spec, i, am_error);
+
+ switch (socket_error) {
+ case ECONNRESET:
+ case ENOTCONN:
+ case EPIPE:
+ i = LOAD_ATOM(spec, i, am_closed);
+ break;
+ default:
+ i = LOAD_ATOM(spec, i, error_atom(socket_error));
+ }
+
+ i = LOAD_TUPLE(spec, i, 2);
+ i = LOAD_TUPLE(spec, i, 3);
+
+ ASSERT(i == sizeof(spec)/sizeof(*spec));
+
+ return erl_drv_output_term(desc->inet.dport, spec, i);
+}
+
+static int tcp_inet_sendfile(tcp_descriptor* desc) {
+ ErlDrvPort ix = desc->inet.port;
+ int result = 0;
+ ssize_t n;
+
+ DEBUGF(("tcp_inet_sendfile(%ld) {s=%d\r\n", (long)ix, desc->inet.s));
+
+ /* If there was any data in the queue by the time sendfile was issued,
+ * we'll need to skip it first. Note that we don't clear busy status until
+ * we're finished sending the file. */
+ while (desc->sendfile.ioq_skip > 0) {
+ ssize_t bytes_to_send;
+ SysIOVec* iov;
+ int vsize;
+
+ ASSERT(driver_sizeq(ix) >= desc->sendfile.ioq_skip);
+
+ if ((iov = driver_peekq(ix, &vsize)) == NULL) {
+ ERTS_INTERNAL_ERROR("ioq empty when sendfile.ioq_skip > 0");
+ }
+
+ bytes_to_send = MIN(desc->sendfile.ioq_skip, iov[0].iov_len);
+ n = sock_send(desc->inet.s, iov[0].iov_base, bytes_to_send, 0);
+
+ if (!IS_SOCKET_ERROR(n)) {
+ desc->sendfile.ioq_skip -= n;
+ driver_deq(ix, n);
+ } else if (sock_errno() == ERRNO_BLOCK) {
+#ifdef __WIN32__
+ desc->inet.send_would_block = 1;
+#endif
+ goto done;
+ } else if (sock_errno() != EINTR) {
+ goto socket_error;
+ }
+ }
+
+ while (desc->sendfile.length > 0) {
+ /* For some reason the maximum ssize_t cannot be used as the max size.
+ * 1GB seems to work on all platforms */
+ const Sint64 SENDFILE_CHUNK_SIZE = ((1UL << 30) - 1);
+
+ ssize_t bytes_to_send = MIN(SENDFILE_CHUNK_SIZE, desc->sendfile.length);
+ off_t offset = desc->sendfile.offset;
+
+#if defined(__linux__)
+ n = sendfile(desc->inet.s, desc->sendfile.dup_file_fd, &offset,
+ bytes_to_send);
+#elif defined(__FreeBSD__) || defined(__DragonFly__) || defined(__DARWIN__)
+ {
+ off_t bytes_sent;
+ int error;
+
+ #if defined(__DARWIN__)
+ bytes_sent = bytes_to_send;
+
+ error = sendfile(desc->sendfile.dup_file_fd, desc->inet.s, offset,
+ &bytes_sent, NULL, 0);
+ n = bytes_sent;
+ #else
+ error = sendfile(desc->sendfile.dup_file_fd, desc->inet.s, offset,
+ bytes_to_send, NULL, &bytes_sent, 0);
+ n = bytes_sent;
+ #endif
+
+ if(error < 0) {
+ /* EAGAIN/EINTR report partial success by setting bytes_sent,
+ * so we have to skip error handling if nonzero, and skip EOF
+ * handling if zero, as it's possible that we didn't manage to
+ * send anything at all before being interrupted by a
+ * signal. */
+ if((errno != EAGAIN && errno != EINTR) || bytes_sent == 0) {
+ n = -1;
+ }
+ }
+ }
+#elif defined(__sun) && defined(__SVR4) && defined(HAVE_SENDFILEV)
+ {
+ sendfilevec_t sfvec[1];
+ size_t bytes_sent;
+ ssize_t error;
+
+ sfvec[0].sfv_fd = desc->sendfile.dup_file_fd;
+ sfvec[0].sfv_len = bytes_to_send;
+ sfvec[0].sfv_off = offset;
+ sfvec[0].sfv_flag = 0;
+
+ error = sendfilev(desc->inet.s, sfvec, 1, &bytes_sent);
+ n = bytes_sent;
+
+ if(error < 0) {
+ if(errno == EINVAL) {
+ /* On some solaris versions (I've seen it on SunOS 5.10),
+ * using a sfv_len larger than the filesize will result in
+ * a (-1 && errno == EINVAL). We translate this to a
+ * successful send of the data.*/
+ } else {
+ /* EAGAIN/EINTR behavior is identical to *BSD. */
+ if((errno != EAGAIN && errno != EINTR) || bytes_sent == 0) {
+ n = -1;
+ }
+ }
+ }
+ }
+#else
+ #error "Unsupported sendfile syscall; update configure test."
+#endif
+
+ if (n > 0) {
+ desc->sendfile.bytes_sent += n;
+ desc->sendfile.offset += n;
+ desc->sendfile.length -= n;
+ } else if (n == 0) {
+ /* EOF. */
+ desc->sendfile.length = 0;
+ break;
+ } else if (IS_SOCKET_ERROR(n) && sock_errno() != EINTR) {
+ if (sock_errno() != ERRNO_BLOCK) {
+ goto socket_error;
+ }
+
+#ifdef __WIN32__
+ desc->inet.send_would_block = 1;
+#endif
+ break;
+ }
+ }
+
+ if (desc->sendfile.length == 0) {
+ tcp_sendfile_completed(desc);
+ }
+
+ goto done;
+
+socket_error: {
+ int socket_errno = sock_errno();
+
+ DEBUGF(("tcp_inet_sendfile(%ld): send errno = %d (errno %d)\r\n",
+ (long)desc->inet.port, socket_errno, errno));
+
+ result = tcp_send_error(desc, socket_errno);
+ tcp_sendfile_aborted(desc, socket_errno);
+
+ goto done;
+ }
+
+done:
+ DEBUGF(("tcp_inet_sendfile(%ld) }\r\n", (long)desc->inet.port));
+ return result;
+}
+#endif /* HAVE_SENDFILE */
+
/* socket ready for ouput:
** 1. INET_STATE_CONNECTING => non block connect ?
** 2. INET_STATE_CONNECTED => write output
@@ -10736,10 +11225,11 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
#ifndef SO_ERROR
{
- int sz = sizeof(desc->inet.remote);
- int code = sock_peer(desc->inet.s,
- (struct sockaddr*) &desc->inet.remote, &sz);
-
+ int sz, code;
+ sz = sizeof(desc->inet.remote);
+ sys_memzero((char *) &desc->inet.remote, sz);
+ code = sock_peer(desc->inet.s,
+ (struct sockaddr*) &desc->inet.remote, &sz);
if (IS_SOCKET_ERROR(code)) {
desc->inet.state = INET_STATE_OPEN; /* restore state */
ret = async_error(INETP(desc), sock_errno());
@@ -10768,7 +11258,14 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
async_ok(INETP(desc));
}
else if (IS_CONNECTED(INETP(desc))) {
- for (;;) {
+
+#ifdef HAVE_SENDFILE
+ if(desc->tcp_add_flags & TCP_ADDF_SENDFILE) {
+ return tcp_inet_sendfile(desc);
+ }
+#endif
+
+ for (;;) {
int vsize;
ssize_t n;
SysIOVec* iov;
@@ -11174,24 +11671,20 @@ static ErlDrvSSizeT packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf,
(desc->sfamily, &remote, &buf, &len)) != NULL)
return ctl_xerror(xerror, rbuf, rsize);
- sock_select(desc, FD_CONNECT, 1);
code = sock_connect(desc->s, &remote.sa, len);
if (IS_SOCKET_ERROR(code) && (sock_errno() == EINPROGRESS)) {
/* XXX: Unix only -- WinSock would have a different cond! */
- desc->state = INET_STATE_CONNECTING;
if (timeout != INET_INFINITY)
driver_set_timer(desc->port, timeout);
enq_async(desc, tbuf, INET_REQ_CONNECT);
+ async_ok(desc);
}
else if (code == 0) { /* OK we are connected */
- sock_select(desc, FD_CONNECT, 0);
- desc->state = INET_STATE_CONNECTED;
enq_async(desc, tbuf, INET_REQ_CONNECT);
async_ok(desc);
}
else {
- sock_select(desc, FD_CONNECT, 0);
return ctl_error(sock_errno(), rbuf, rsize);
}
return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize);
@@ -11706,77 +12199,6 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event)
return count;
}
-static void packet_inet_drv_output(ErlDrvData e, ErlDrvEvent event)
-{
- (void) packet_inet_output((udp_descriptor*)e, (HANDLE)event);
-}
-
-/* UDP/SCTP socket ready for output:
-** This is a Back-End for Non-Block SCTP Connect (INET_STATE_CONNECTING)
-*/
-static int packet_inet_output(udp_descriptor* udesc, HANDLE event)
-{
- inet_descriptor* desc = INETP(udesc);
- int ret = 0;
- ErlDrvPort ix = desc->port;
-
- DEBUGF(("packet_inet_output(%ld) {s=%d\r\n",
- (long)desc->port, desc->s));
-
- if (desc->state == INET_STATE_CONNECTING) {
- sock_select(desc, FD_CONNECT, 0);
-
- driver_cancel_timer(ix); /* posssibly cancel a timer */
-#ifndef __WIN32__
- /*
- * XXX This is strange. This *should* work on Windows NT too,
- * but doesn't. An bug in Winsock 2.0 for Windows NT?
- *
- * See "Unix Netwok Programming", W.R.Stevens, p 412 for a
- * discussion about Unix portability and non blocking connect.
- */
-
-#ifndef SO_ERROR
- {
- int sz = sizeof(desc->remote);
- int code = sock_peer(desc->s,
- (struct sockaddr*) &desc->remote, &sz);
-
- if (IS_SOCKET_ERROR(code)) {
- desc->state = INET_STATE_OPEN; /* restore state */
- ret = async_error(desc, sock_errno());
- goto done;
- }
- }
-#else
- {
- int error = 0; /* Has to be initiated, we check it */
- unsigned int sz = sizeof(error); /* even if we get -1 */
- int code = sock_getopt(desc->s, SOL_SOCKET, SO_ERROR,
- (void *)&error, &sz);
-
- if ((code < 0) || error) {
- desc->state = INET_STATE_OPEN; /* restore state */
- ret = async_error(desc, error);
- goto done;
- }
- }
-#endif /* SO_ERROR */
-#endif /* !__WIN32__ */
-
- desc->state = INET_STATE_CONNECTED;
- async_ok(desc);
- }
- else {
- sock_select(desc,FD_CONNECT,0);
-
- DEBUGF(("packet_inet_output(%ld): bad state: %04x\r\n",
- (long)desc->port, desc->state));
- }
- done:
- DEBUGF(("packet_inet_output(%ld) }\r\n", (long)desc->port));
- return ret;
-}
#endif
/*---------------------------------------------------------------------------*/