diff options
Diffstat (limited to 'erts/emulator/drivers/common/inet_drv.c')
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 570 |
1 files changed, 407 insertions, 163 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 54aefd9261..78411f324c 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -38,6 +38,7 @@ #include <ctype.h> #include <sys/types.h> #include <errno.h> +#include <stdint.h> #define IDENTITY(c) c #define STRINGIFY_1(b) IDENTITY(#b) @@ -633,12 +634,15 @@ static size_t my_strnlen(const char *s, size_t maxlen) * header length. To get the header length we use * the pointer difference from the cmsg start pointer * to the CMSG_DATA(cmsg) pointer. + * + * Some platforms (seen on ppc Linux 2.6.29-3.ydl61.3) + * may return 0 as the cmsg_len if the cmsg is to be ignored. */ -#define LEN_CMSG_DATA(cmsg) ((char*)CMSG_DATA(cmsg) - (char*)(cmsg)) +#define LEN_CMSG_DATA(cmsg) \ + ((cmsg)->cmsg_len < sizeof (struct cmsghdr) ? 0 : \ + (cmsg)->cmsg_len - ((char*)CMSG_DATA(cmsg) - (char*)(cmsg))) #define NXT_CMSG_HDR(cmsg) \ - ((struct cmsghdr*) \ - (((char*)(cmsg)) + \ - CMSG_SPACE((cmsg)->cmsg_len - LEN_CMSG_DATA(cmsg)))) + ((struct cmsghdr*)(((char*)(cmsg)) + CMSG_SPACE(LEN_CMSG_DATA(cmsg)))) #endif #if !defined(IPV6_PKTOPTIONS) && defined(IPV6_2292PKTOPTIONS) @@ -813,6 +817,7 @@ static size_t my_strnlen(const char *s, size_t maxlen) #define INET_OPT_PKTOPTIONS 45 /* IP(V6)_PKTOPTIONS get ancillary data */ #define INET_OPT_TTL 46 /* IP_TTL */ #define INET_OPT_RECVTTL 47 /* IP_RECVTTL ancillary data */ +#define TCP_OPT_NOPUSH 48 /* super-Nagle, aka TCP_CORK */ /* SCTP options: a separate range, from 100: */ #define SCTP_OPT_RTOINFO 100 #define SCTP_OPT_ASSOCINFO 101 @@ -955,6 +960,13 @@ static size_t my_strnlen(const char *s, size_t maxlen) #endif #endif +typedef struct _tcp_descriptor tcp_descriptor; + +#if defined(TCP_CORK) +#define INET_TCP_NOPUSH TCP_CORK +#elif defined(TCP_NOPUSH) && !defined(__DARWIN__) +#define INET_TCP_NOPUSH TCP_NOPUSH +#endif #define BIN_REALLOC_MARGIN(x) ((x)/4) /* 25% */ @@ -1004,16 +1016,19 @@ typedef struct _multi_timer_data { struct _multi_timer_data *prev; } MultiTimerData; -static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, - ErlDrvTermData caller, unsigned timeout, - void (*timeout_fun)(ErlDrvData drv_data, - ErlDrvTermData caller)); -static void fire_multi_timers(MultiTimerData **first, ErlDrvPort port, +static MultiTimerData *add_multi_timer(tcp_descriptor *desc, ErlDrvPort port, + ErlDrvTermData caller, unsigned timeout, + void (*timeout_fun)(ErlDrvData drv_data, + ErlDrvTermData caller)); +static void fire_multi_timers(tcp_descriptor *desc, ErlDrvPort port, ErlDrvData data); -static void remove_multi_timer(MultiTimerData **first, ErlDrvPort port, MultiTimerData *p); +static void remove_multi_timer(tcp_descriptor *desc, ErlDrvPort port, MultiTimerData *p); +static void cancel_multi_timer(tcp_descriptor *desc, ErlDrvPort port, + void (*timeout_fun)(ErlDrvData drv_data, + ErlDrvTermData caller)); static void tcp_inet_multi_timeout(ErlDrvData e, ErlDrvTermData caller); -static void clean_multi_timers(MultiTimerData **first, ErlDrvPort port); +static void clean_multi_timers(tcp_descriptor *desc, ErlDrvPort port); typedef struct { int id; /* id used to identify reply */ @@ -1272,7 +1287,7 @@ static struct erl_drv_entry sctp_inet_driver_entry = }; #endif -typedef struct { +struct _tcp_descriptor { inet_descriptor inet; /* common data structure (DON'T MOVE) */ int high; /* high watermark */ int low; /* low watermark */ @@ -1288,7 +1303,8 @@ typedef struct { int http_state; /* 0 = response|request 1=headers fields */ inet_async_multi_op *multi_first;/* NULL == no multi-accept-queue, op is in ordinary queue */ inet_async_multi_op *multi_last; - MultiTimerData *mtd; /* Timer structures for multiple accept */ + MultiTimerData *mtd; /* Timer structures for multiple accept */ + MultiTimerData *mtd_cache; /* A cache for timer allocations */ #ifdef HAVE_SENDFILE struct { ErlDrvSizeT ioq_skip; /* The number of bytes in the queue at the time @@ -1304,7 +1320,7 @@ typedef struct { Uint64 length; } sendfile; #endif -} tcp_descriptor; +}; /* send function */ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len); @@ -1314,7 +1330,10 @@ static int tcp_deliver(tcp_descriptor* desc, int len); static int tcp_shutdown_error(tcp_descriptor* desc, int err); +#ifdef HAVE_SENDFILE static int tcp_inet_sendfile(tcp_descriptor* desc); +static int tcp_sendfile_aborted(tcp_descriptor* desc, int socket_error); +#endif static int tcp_inet_output(tcp_descriptor* desc, HANDLE event); static int tcp_inet_input(tcp_descriptor* desc, HANDLE event); @@ -2796,39 +2815,61 @@ static int inet_async_data(inet_descriptor* desc, const char* buf, int len) } #ifndef __WIN32__ +static int load_cmsg_int(ErlDrvTermData *spec, int i, + struct cmsghdr *cmsg) { + union u { + byte uint8; + Uint16 uint16; + Uint32 uint32; + Uint64 uint64; + } *p; + p = (union u*) CMSG_DATA(cmsg); + switch (LEN_CMSG_DATA(cmsg) * CHAR_BIT) { + case 8: + return LOAD_INT(spec, i, p->uint8); + case 16: + return LOAD_INT(spec, i, p->uint16); + + case 32: + return LOAD_INT(spec, i, p->uint32); + + case 64: + return LOAD_INT(spec, i, p->uint64); + } + return LOAD_INT(spec, i, 0); +} + static int parse_ancillary_data_item(ErlDrvTermData *spec, int i, struct cmsghdr *cmsg, int *n) { -#define LOAD_CMSG(proto, type, vtype, am, load) \ - if (cmsg->cmsg_level == (proto) && \ - cmsg->cmsg_type == (type)) { \ - vtype *vp; \ - vp = (vtype *)CMSG_DATA(cmsg); \ - i = LOAD_ATOM(spec, i, (am)); \ - i = load(spec, i, *vp); \ - i = LOAD_TUPLE(spec, i, 2); \ - (*n)++; \ - return i; \ +#define LOAD_CMSG_INT(proto, type, am) \ + if (cmsg->cmsg_level == (proto) && \ + cmsg->cmsg_type == (type)) { \ + i = LOAD_ATOM(spec, i, (am)); \ + i = load_cmsg_int(spec, i, cmsg); \ + i = LOAD_TUPLE(spec, i, 2); \ + (*n)++; \ + return i; \ } #if defined(IPPROTO_IP) && defined(IP_TOS) - LOAD_CMSG(IPPROTO_IP, IP_TOS, unsigned char, am_tos, LOAD_INT); + LOAD_CMSG_INT(IPPROTO_IP, IP_TOS, am_tos); #endif #if defined(IPPROTO_IPV6) && defined(IPV6_TCLASS) - LOAD_CMSG(IPPROTO_IPV6, IPV6_TCLASS, unsigned char, am_tclass, LOAD_INT); + LOAD_CMSG_INT(IPPROTO_IPV6, IPV6_TCLASS, am_tclass); #endif #if defined(IPPROTO_IP) && defined(IP_TTL) - LOAD_CMSG(IPPROTO_IP, IP_TTL, unsigned char, am_ttl, LOAD_INT); + LOAD_CMSG_INT(IPPROTO_IP, IP_TTL, am_ttl); #endif /* BSD uses the RECV* names in CMSG fields */ #if defined(IPPROTO_IP) && defined(IP_RECVTOS) - LOAD_CMSG(IPPROTO_IP, IP_RECVTOS, unsigned char, am_tos, LOAD_INT); + LOAD_CMSG_INT(IPPROTO_IP, IP_RECVTOS, am_tos); #endif #if defined(IPPROTO_IPV6) && defined(IPV6_RECVTCLASS) - LOAD_CMSG(IPPROTO_IPV6, IPV6_RECVTCLASS, unsigned char, am_tclass, LOAD_INT); + LOAD_CMSG_INT(IPPROTO_IPV6, IPV6_RECVTCLASS, am_tclass); #endif #if defined(IPPROTO_IP) && defined(IP_RECVTTL) - LOAD_CMSG(IPPROTO_IP, IP_RECVTTL, unsigned char, am_ttl, LOAD_INT); + LOAD_CMSG_INT(IPPROTO_IP, IP_RECVTTL, am_ttl); #endif -#undef LOAD_CMSG +#undef LOAD_CMSG_INT return i; } #endif /* #ifndef __WIN32__ */ @@ -5157,6 +5198,71 @@ static int hwaddr_libdlpi_lookup(const char *ifnm, } #endif +#ifdef HAVE_GETIFADDRS +/* Returns 0 for success and errno() for failure */ +static int call_getifaddrs(inet_descriptor* desc_p, struct ifaddrs **ifa_pp) +{ + int result, save_errno; +#ifdef HAVE_SETNS + int current_ns; + + current_ns = 0; + if (desc_p->netns != NULL) { + int new_ns; + /* Temporarily change network namespace for this thread + * over the getifaddrs() call + */ + current_ns = open("/proc/self/ns/net", O_RDONLY); + if (current_ns == INVALID_SOCKET) + return sock_errno(); + new_ns = open(desc_p->netns, O_RDONLY); + if (new_ns == INVALID_SOCKET) { + save_errno = sock_errno(); + while (close(current_ns) == INVALID_SOCKET && + sock_errno() == EINTR); + return save_errno; + } + if (setns(new_ns, CLONE_NEWNET) != 0) { + save_errno = sock_errno(); + while (close(new_ns) == INVALID_SOCKET && + sock_errno() == EINTR); + while (close(current_ns) == INVALID_SOCKET && + sock_errno() == EINTR); + return save_errno; + } + else { + while (close(new_ns) == INVALID_SOCKET && + sock_errno() == EINTR); + } + } +#endif + save_errno = 0; + result = getifaddrs(ifa_pp); + if (result < 0) + save_errno = sock_errno(); +#ifdef HAVE_SETNS + if (desc_p->netns != NULL) { + /* Restore network namespace */ + if (setns(current_ns, CLONE_NEWNET) != 0) { + /* XXX Failed to restore network namespace. + * What to do? Tidy up and return an error... + * Note that the thread now might still be in the set namespace. + * Can this even happen? Should the emulator be aborted? + */ + if (result >= 0) { + /* We got a result but have to waste it */ + save_errno = sock_errno(); + freeifaddrs(*ifa_pp); + } + } + while (close(current_ns) == INVALID_SOCKET && + sock_errno() == EINTR); + } +#endif + return save_errno; +} +#endif /* #ifdef HAVE_GETIFADDRS */ + /* FIXME: temporary hack */ #ifndef IFHWADDRLEN #define IFHWADDRLEN 6 @@ -5234,8 +5340,8 @@ static ErlDrvSSizeT inet_ctl_ifget(inet_descriptor* desc, struct sockaddr_dl *sdlp; int found = 0; - if (getifaddrs(&ifa) == -1) - goto error; + if (call_getifaddrs(desc, &ifa) != 0) + goto error; for (ifp = ifa; ifp; ifp = ifp->ifa_next) { if ((ifp->ifa_addr->sa_family == AF_LINK) && @@ -5953,6 +6059,7 @@ static ErlDrvSSizeT inet_ctl_getifaddrs(inet_descriptor* desc_p, ErlDrvSizeT buf_size; char *buf_p; char *buf_alloc_p; + int save_errno; buf_size = GETIFADDRS_BUFSZ; buf_alloc_p = ALLOC(GETIFADDRS_BUFSZ); @@ -5987,9 +6094,9 @@ static ErlDrvSSizeT inet_ctl_getifaddrs(inet_descriptor* desc_p, } \ } while (0) - if (getifaddrs(&ifa_p) < 0) { - return ctl_error(sock_errno(), rbuf_pp, rsize); - } + if ((save_errno = call_getifaddrs(desc_p, &ifa_p)) != 0) + return ctl_error(save_errno, rbuf_pp, rsize); + ifa_free_p = ifa_p; *buf_p++ = INET_REP_OK; for (; ifa_p; ifa_p = ifa_p->ifa_next) { @@ -6511,6 +6618,19 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) (long)desc->port, desc->s, ival)); break; + case TCP_OPT_NOPUSH: +#if defined(INET_TCP_NOPUSH) + proto = IPPROTO_TCP; + type = INET_TCP_NOPUSH; + DEBUGF(("inet_set_opts(%ld): s=%d, t=%d TCP_NOPUSH=%d\r\n", + (long)desc->port, desc->s, type, ival)); + break; +#else + /* inet_fill_opts always returns a value for this option, + * so we need to ignore it if not implemented, just in case */ + continue; +#endif + #if defined(HAVE_MULTICAST_SUPPORT) && defined(IPPROTO_IP) case UDP_OPT_MULTICAST_TTL: @@ -7330,6 +7450,35 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len) } #endif /* HAVE_SCTP */ +#ifndef __WIN32__ +static void put_cmsg_int32(struct cmsghdr *cmsg, char *ptr) { + union u { + byte uint8; + Uint16 uint16; + Uint32 uint32; + Uint64 uint64; + } *p; + p = (union u*) CMSG_DATA(cmsg); + switch (LEN_CMSG_DATA(cmsg) * CHAR_BIT) { + case 8: + put_int32((Uint32) p->uint8, ptr); + break; + case 16: + put_int32((Uint32) p->uint16, ptr); + break; + case 32: + put_int32(p->uint32, ptr); + break; + case 64: + put_int32((Uint32) p->uint64, ptr); + break; + default: + put_int32(0, ptr); + } + return; +} +#endif + /* load all option values into the buf and reply ** return total length of reply filled into ptr ** ptr should point to a buffer with 9*len +1 to be safe!! @@ -7643,6 +7792,16 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, proto = IPPROTO_TCP; type = TCP_NODELAY; break; + case TCP_OPT_NOPUSH: +#if defined(INET_TCP_NOPUSH) + proto = IPPROTO_TCP; + type = INET_TCP_NOPUSH; + break; +#else + *ptr++ = opt; + put_int32(0, ptr); + continue; +#endif #if defined(HAVE_MULTICAST_SUPPORT) && defined(IPPROTO_IP) case UDP_OPT_MULTICAST_TTL: @@ -7789,51 +7948,43 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, * cmsg options and values */ PLACE_FOR(1+4, ptr); - *ptr = opt; - arg_ptr = ptr+1; /* Where to put total length */ + *ptr++ = opt; + arg_ptr = ptr; /* Where to put total length */ arg_sz = 0; /* Total length */ for (cmsg_top = (struct cmsghdr*)(cmsgbuf.buf + cmsg_sz), cmsg = (struct cmsghdr*)cmsgbuf.buf; cmsg < cmsg_top; cmsg = NXT_CMSG_HDR(cmsg)) { -#define PUT_CMSG_DATA(CMSG_LEVEL, CMSG_TYPE, OPT, TYPE, SZ, PUT) \ - if ((cmsg->cmsg_level == CMSG_LEVEL) && \ - (cmsg->cmsg_type == CMSG_TYPE)) { \ - TYPE *cmsgp; \ - cmsgp = (TYPE *)CMSG_DATA(cmsg); \ - PLACE_FOR(1+SZ, ptr); \ - *ptr = OPT; \ - PUT(*cmsgp, ptr+1); \ - arg_sz += 1+SZ; \ - continue; \ +#define PUT_CMSG_INT32(CMSG_LEVEL, CMSG_TYPE, OPT) \ + if ((cmsg->cmsg_level == CMSG_LEVEL) && \ + (cmsg->cmsg_type == CMSG_TYPE)) { \ + PLACE_FOR(1+4, ptr); \ + *ptr++ = OPT; \ + put_cmsg_int32(cmsg, ptr); \ + arg_sz += 1+4; \ + continue; \ } #if defined(IPPROTO_IP) && defined(IP_TOS) - PUT_CMSG_DATA(IPPROTO_IP, IP_TOS, - INET_OPT_TOS, unsigned char, 4, put_int32); + PUT_CMSG_INT32(IPPROTO_IP, IP_TOS, INET_OPT_TOS); #endif #if defined(IPPROTO_IPV6) && defined(IPV6_TCLASS) - PUT_CMSG_DATA(IPPROTO_IPV6, IPV6_TCLASS, - INET_OPT_TCLASS, unsigned char, 4, put_int32); + PUT_CMSG_INT32(IPPROTO_IPV6, IPV6_TCLASS, INET_OPT_TCLASS); #endif #if defined(IPPROTO_IP) && defined(IP_TTL) - PUT_CMSG_DATA(IPPROTO_IP, IP_TTL, - INET_OPT_TTL, unsigned char, 4, put_int32); + PUT_CMSG_INT32(IPPROTO_IP, IP_TTL, INET_OPT_TTL); #endif /* BSD uses the RECV* names in CMSG fields */ - } #if defined(IPPROTO_IP) && defined(IP_RECVTOS) - PUT_CMSG_DATA(IPPROTO_IP, IP_RECVTOS, - INET_OPT_TOS, unsigned char, 4, put_int32); + PUT_CMSG_INT32(IPPROTO_IP, IP_RECVTOS, INET_OPT_TOS); #endif #if defined(IPPROTO_IPV6) && defined(IPV6_RECVTCLASS) - PUT_CMSG_DATA(IPPROTO_IPV6, IPV6_RECVTCLASS, - INET_OPT_TCLASS, unsigned char, 4, put_int32); + PUT_CMSG_INT32(IPPROTO_IPV6, IPV6_RECVTCLASS, INET_OPT_TCLASS); #endif #if defined(IPPROTO_IP) && defined(IP_RECVTTL) - PUT_CMSG_DATA(IPPROTO_IP, IP_RECVTTL, - INET_OPT_TTL, unsigned char, 4, put_int32); + PUT_CMSG_INT32(IPPROTO_IP, IP_RECVTTL, INET_OPT_TTL); #endif -#undef PUT_CMSG_DATA +#undef PUT_CMSG_INT32 + } put_int32(arg_sz, arg_ptr); /* Put total length */ continue; } @@ -9634,6 +9785,7 @@ static ErlDrvData prep_tcp_inet_start(ErlDrvPort port, char* args) desc->tcp_add_flags = 0; desc->http_state = 0; desc->mtd = NULL; + desc->mtd_cache = NULL; desc->multi_first = desc->multi_last = NULL; DEBUGF(("tcp_inet_start(%ld) }\r\n", (long)port)); return (ErlDrvData) desc; @@ -9737,15 +9889,14 @@ static void tcp_close_check(tcp_descriptor* desc) driver_demonitor_process(desc->inet.port, &monitor); send_async_error(desc->inet.dport, id, caller, am_closed); } - clean_multi_timers(&(desc->mtd), desc->inet.port); } - else if (desc->inet.state == INET_STATE_CONNECTING) { async_error_am(INETP(desc), am_closed); } else if (desc->inet.state == INET_STATE_CONNECTED) { async_error_am_all(INETP(desc), am_closed); } + clean_multi_timers(desc, desc->inet.port); } /* @@ -9788,6 +9939,15 @@ static void tcp_desc_close(tcp_descriptor* desc) erl_inet_close(INETP(desc)); } +static void tcp_inet_recv_timeout(ErlDrvData e, ErlDrvTermData dummy) +{ + tcp_descriptor* desc = (tcp_descriptor*)e; + ASSERT(!desc->inet.active); + sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); + desc->i_remain = 0; + async_error_am(INETP(desc), am_timeout); +} + /* TCP requests from Erlang */ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, ErlDrvSizeT len, @@ -9795,6 +9955,7 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, { tcp_descriptor* desc = (tcp_descriptor*)e; + cmd -= ERTS_INET_DRV_CONTROL_MAGIC_NUMBER; switch(cmd) { case INET_REQ_OPEN: { /* open socket and return internal index */ int domain; @@ -9958,12 +10119,12 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, if (time_left <= 0) { time_left = 1; } - omtd = add_multi_timer(&(desc->mtd), desc->inet.port, ocaller, + omtd = add_multi_timer(desc, desc->inet.port, ocaller, time_left, &tcp_inet_multi_timeout); } enq_old_multi_op(desc, oid, oreq, ocaller, omtd, &omonitor); if (timeout != INET_INFINITY) { - mtd = add_multi_timer(&(desc->mtd), desc->inet.port, caller, + mtd = add_multi_timer(desc, desc->inet.port, caller, timeout, &tcp_inet_multi_timeout); } enq_multi_op(desc, tbuf, INET_REQ_ACCEPT, caller, mtd, &monitor); @@ -9978,7 +10139,7 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, return ctl_xerror("noproc", rbuf, rsize); } if (timeout != INET_INFINITY) { - mtd = add_multi_timer(&(desc->mtd), desc->inet.port, caller, + mtd = add_multi_timer(desc, desc->inet.port, caller, timeout, &tcp_inet_multi_timeout); } enq_multi_op(desc, tbuf, INET_REQ_ACCEPT, caller, mtd, &monitor); @@ -10075,7 +10236,8 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, async_error_am(INETP(desc), am_timeout); else { if (timeout != INET_INFINITY) - driver_set_timer(desc->inet.port, timeout); + add_multi_timer(desc, INETP(desc)->port, 0, + timeout, &tcp_inet_recv_timeout); if (!INETP(desc)->is_ignored) sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); else @@ -10162,12 +10324,11 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, desc->tcp_add_flags |= TCP_ADDF_SENDFILE; /* See if we can finish sending without selecting & rescheduling. */ - tcp_inet_sendfile(desc); - - if(desc->sendfile.length > 0) { - sock_select(INETP(desc), FD_WRITE, 1); + if (tcp_inet_sendfile(desc) == 0) { + if(desc->sendfile.length > 0) { + sock_select(INETP(desc), FD_WRITE, 1); + } } - return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); #else return ctl_error(ENOTSUP, rbuf, rsize); @@ -10181,12 +10342,27 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, } +static void tcp_inet_send_timeout(ErlDrvData e, ErlDrvTermData dummy) +{ + tcp_descriptor* desc = (tcp_descriptor*)e; + ASSERT(IS_BUSY(INETP(desc))); + ASSERT(desc->busy_on_send); + desc->inet.caller = desc->inet.busy_caller; + desc->inet.state &= ~INET_F_BUSY; + desc->busy_on_send = 0; + set_busy_port(desc->inet.port, 0); + inet_reply_error_am(INETP(desc), am_timeout); + if (desc->send_timeout_close) { + tcp_desc_close(desc); + } +} + /* ** tcp_inet_timeout: ** called when timer expire: ** TCP socket may be: ** -** a) receiving -- deselect +** a) receiving -- send timeout ** b) connecting -- close socket ** c) accepting -- reset listener ** @@ -10200,26 +10376,9 @@ static void tcp_inet_timeout(ErlDrvData e) DEBUGF(("tcp_inet_timeout(%ld) {s=%d\r\n", (long)desc->inet.port, desc->inet.s)); if ((state & INET_F_MULTI_CLIENT)) { /* Multi-client always means multi-timers */ - fire_multi_timers(&(desc->mtd), desc->inet.port, e); + fire_multi_timers(desc, desc->inet.port, e); } else if ((state & INET_STATE_CONNECTED) == INET_STATE_CONNECTED) { - if (desc->busy_on_send) { - ASSERT(IS_BUSY(INETP(desc))); - desc->inet.caller = desc->inet.busy_caller; - desc->inet.state &= ~INET_F_BUSY; - desc->busy_on_send = 0; - set_busy_port(desc->inet.port, 0); - inet_reply_error_am(INETP(desc), am_timeout); - if (desc->send_timeout_close) { - tcp_desc_close(desc); - } - } - else { - /* assume recv timeout */ - ASSERT(!desc->inet.active); - sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); - desc->i_remain = 0; - async_error_am(INETP(desc), am_timeout); - } + fire_multi_timers(desc, desc->inet.port, e); } else if ((state & INET_STATE_CONNECTING) == INET_STATE_CONNECTING) { /* assume connect timeout */ @@ -10349,7 +10508,7 @@ static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp) return; } if (timeout != NULL) { - remove_multi_timer(&(desc->mtd), desc->inet.port, timeout); + remove_multi_timer(desc, desc->inet.port, timeout); } if (desc->multi_first == NULL) { sock_select(INETP(desc),FD_ACCEPT,0); @@ -10380,6 +10539,7 @@ static int tcp_recv_closed(tcp_descriptor* desc) #ifdef DEBUG long port = (long) desc->inet.port; /* Used after driver_exit() */ #endif + int blocking_send = 0; DEBUGF(("tcp_recv_closed(%ld): s=%d, in %s, line %d\r\n", port, desc->inet.s, __FILE__, __LINE__)); if (IS_BUSY(INETP(desc))) { @@ -10387,7 +10547,7 @@ static int tcp_recv_closed(tcp_descriptor* desc) desc->inet.caller = desc->inet.busy_caller; tcp_clear_output(desc); if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; DEBUGF(("tcp_recv_closed(%ld): busy on send\r\n", port)); } @@ -10395,16 +10555,25 @@ static int tcp_recv_closed(tcp_descriptor* desc) set_busy_port(desc->inet.port, 0); inet_reply_error_am(INETP(desc), am_closed); DEBUGF(("tcp_recv_closed(%ld): busy reply 'closed'\r\n", port)); - } else { + blocking_send = 1; + } +#ifdef HAVE_SENDFILE + if (desc->tcp_add_flags & TCP_ADDF_SENDFILE) { + tcp_sendfile_aborted(desc, ENOTCONN); + blocking_send = 1; + } +#endif + if (!blocking_send) { /* No blocking send op to reply to right now. * If next op is a send, make sure it returns {error,closed} * rather than {error,enotconn}. */ desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_SEND; } + if (!desc->inet.active) { - /* We must cancel any timer here ! */ - driver_cancel_timer(desc->inet.port); + /* We must cancel any timer here ! */ + clean_multi_timers(desc, INETP(desc)->port); /* passive mode do not terminate port ! */ tcp_clear_input(desc); if (desc->inet.exitf) { @@ -10439,16 +10608,21 @@ static int tcp_recv_error(tcp_descriptor* desc, int err) desc->inet.caller = desc->inet.busy_caller; tcp_clear_output(desc); if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; } desc->inet.state &= ~INET_F_BUSY; set_busy_port(desc->inet.port, 0); inet_reply_error_am(INETP(desc), am_closed); } +#ifdef HAVE_SENDFILE + if (desc->tcp_add_flags & TCP_ADDF_SENDFILE) { + tcp_sendfile_aborted(desc, err); + } +#endif if (!desc->inet.active) { /* We must cancel any timer here ! */ - driver_cancel_timer(desc->inet.port); + clean_multi_timers(desc, INETP(desc)->port); tcp_clear_input(desc); if (desc->inet.exitf) { tcp_desc_close(desc); @@ -10553,13 +10727,13 @@ static int tcp_deliver(tcp_descriptor* desc, int len) if (len == 0) { /* empty buffer or waiting for more input */ if ((desc->i_buf == NULL) || (desc->i_remain > 0)) - return count; + return 0; if ((n = tcp_remain(desc, &len)) != 0) { if (n < 0) /* packet error */ return n; if (len > 0) /* more data pending */ desc->i_remain = len; - return count; + return 0; } } @@ -10611,9 +10785,7 @@ static int tcp_deliver(tcp_descriptor* desc, int len) len = 0; if (!desc->inet.active) { - if (!desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); - } + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_recv_timeout); sock_select(INETP(desc),(FD_READ|FD_CLOSE),0); if (desc->i_buf != NULL) tcp_restart_input(desc); @@ -10639,7 +10811,7 @@ static int tcp_recv(tcp_descriptor* desc, int request_len) int len; int nread; - if (desc->i_buf == NULL) { /* allocte a read buffer */ + if (desc->i_buf == NULL) { /* allocate a read buffer */ int sz = (request_len > 0) ? request_len : desc->inet.bufsz; if ((desc->i_buf = alloc_buffer(sz)) == NULL) @@ -10712,10 +10884,11 @@ static int tcp_recv(tcp_descriptor* desc, int request_len) return tcp_deliver(desc, desc->i_ptr - desc->i_ptr_start); } else { - if ((nread = tcp_remain(desc, &len)) < 0) + nread = tcp_remain(desc, &len); + if (nread < 0) return tcp_recv_error(desc, EMSGSIZE); else if (nread == 0) - return tcp_deliver(desc, len); + return tcp_deliver(desc, len); else if (len > 0) desc->i_remain = len; /* set remain */ } @@ -11034,7 +11207,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) } if (timeout != NULL) { - remove_multi_timer(&(desc->mtd), desc->inet.port, timeout); + remove_multi_timer(desc, desc->inet.port, timeout); } driver_demonitor_process(desc->inet.port, &monitor); @@ -11093,8 +11266,8 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) if (IS_BUSY(INETP(desc))) { desc->inet.caller = desc->inet.busy_caller; if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); - desc->busy_on_send = 0; + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_send_timeout); + desc->busy_on_send = 0; } desc->inet.state &= ~INET_F_BUSY; set_busy_port(desc->inet.port, 0); @@ -11109,27 +11282,31 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) DEBUGF(("driver_failure_eof(%ld) in %s, line %d\r\n", (long)desc->inet.port, __FILE__, __LINE__)); if (desc->inet.active) { + ErlDrvTermData err_atom; if (show_econnreset) { tcp_error_message(desc, err); - tcp_closed_message(desc); - inet_reply_error(INETP(desc), err); + err_atom = error_atom(err); } else { - tcp_closed_message(desc); - inet_reply_error_am(INETP(desc), am_closed); + err_atom = am_closed; } + tcp_closed_message(desc); + if (!(desc->tcp_add_flags & TCP_ADDF_SENDFILE)) + inet_reply_error_am(INETP(desc), err_atom); + if (desc->inet.exitf) driver_exit(desc->inet.port, 0); else tcp_desc_close(desc); } else { tcp_close_check(desc); - tcp_desc_close(desc); if (desc->inet.caller) { - if (show_econnreset) - inet_reply_error(INETP(desc), err); - else - inet_reply_error_am(INETP(desc), am_closed); + if (!(desc->tcp_add_flags & TCP_ADDF_SENDFILE)) { + if (show_econnreset) + inet_reply_error(INETP(desc), err); + else + inet_reply_error_am(INETP(desc), am_closed); + } } else { /* No blocking send op to reply to right now. @@ -11138,6 +11315,7 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) */ desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_SEND; } + tcp_desc_close(desc); /* * Make sure that the next receive operation gets an {error,closed} @@ -11194,6 +11372,12 @@ static int tcp_shutdown_error(tcp_descriptor* desc, int err) return tcp_send_or_shutdown_error(desc, err); } +static void tcp_inet_delay_send(ErlDrvData data, ErlDrvTermData dummy) +{ + tcp_descriptor *desc = (tcp_descriptor*)data; + (void)tcp_inet_output(desc, INETP(desc)->s); +} + /* ** Send non-blocking vector data */ @@ -11246,7 +11430,9 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) set_busy_port(desc->inet.port, 1); if (desc->send_timeout != INET_INFINITY) { desc->busy_on_send = 1; - driver_set_timer(desc->inet.port, desc->send_timeout); + add_multi_timer(desc, INETP(desc)->port, + 0 /* arg */, desc->send_timeout /* timeout */, + &tcp_inet_send_timeout); } return 1; } @@ -11261,7 +11447,10 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) INETP(desc)->is_ignored |= INET_IGNORE_WRITE; n = 0; } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { - n = 0; + driver_enqv(ix, ev, 0); + add_multi_timer(desc, INETP(desc)->port, 0, + 0, &tcp_inet_delay_send); + return 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov, vsize, &n, 0))) { if ((sock_errno() != ERRNO_BLOCK) && (sock_errno() != EINTR)) { @@ -11344,7 +11533,9 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, ErlDrvSizeT len) set_busy_port(desc->inet.port, 1); if (desc->send_timeout != INET_INFINITY) { desc->busy_on_send = 1; - driver_set_timer(desc->inet.port, desc->send_timeout); + add_multi_timer(desc, INETP(desc)->port, + 0 /* arg */, desc->send_timeout /* timeout */, + &tcp_inet_send_timeout); } return 1; } @@ -11448,7 +11639,8 @@ static int tcp_sendfile_completed(tcp_descriptor* desc) { /* if we have a timer then cancel and send ok to client */ if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(desc, INETP(desc)->port, + &tcp_inet_send_timeout); desc->busy_on_send = 0; } @@ -11650,8 +11842,8 @@ socket_error: { DEBUGF(("tcp_inet_sendfile(%ld): send errno = %d (errno %d)\r\n", (long)desc->inet.port, socket_errno, errno)); - result = tcp_send_error(desc, socket_errno); tcp_sendfile_aborted(desc, socket_errno); + result = tcp_send_error(desc, socket_errno); goto done; } @@ -11755,6 +11947,12 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) #ifdef __WIN32__ desc->inet.send_would_block = 1; #endif + /* If DELAY_SEND is set ready_output may have + been called without doing select so we do + a select in order to get into the correct + state */ + if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) + sock_select(INETP(desc), FD_WRITE, 1); goto done; } else if (n == 0) { /* Workaround for redhat/CentOS 6.3 returning 0 when sending packets with @@ -11780,7 +11978,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) set_busy_port(desc->inet.port, 0); /* if we have a timer then cancel and send ok to client */ if (desc->busy_on_send) { - driver_cancel_timer(desc->inet.port); + cancel_multi_timer(desc, INETP(desc)->port, &tcp_inet_send_timeout); desc->busy_on_send = 0; } inet_reply_ok(INETP(desc)); @@ -11987,6 +12185,7 @@ static ErlDrvSSizeT packet_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int type = SOCK_DGRAM; int af = AF_INET; + cmd -= ERTS_INET_DRV_CONTROL_MAGIC_NUMBER; switch(cmd) { case INET_REQ_OPEN: /* open socket and return internal index */ DEBUGF(("packet_inet_ctl(%ld): OPEN\r\n", (long)desc->port)); @@ -12592,7 +12791,7 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) udesc->i_buf = NULL; if (!desc->active) { async_error(desc, err); - driver_cancel_timer(desc->port); + driver_cancel_timer(desc->port); sock_select(desc,FD_READ,0); } else { @@ -12681,7 +12880,7 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) return count; count++; if (!desc->active) { - driver_cancel_timer(desc->port); /* possibly cancel */ + driver_cancel_timer(desc->port); sock_select(desc,FD_READ,0); return count; /* passive mode (read one packet only) */ } @@ -12760,55 +12959,71 @@ make_noninheritable_handle(SOCKET s) * Multi-timers */ -static void fire_multi_timers(MultiTimerData **first, ErlDrvPort port, +static void fire_multi_timers(tcp_descriptor *desc, ErlDrvPort port, ErlDrvData data) { - ErlDrvTime next_timeout; - if (!*first) { - ASSERT(0); - return; + ErlDrvTime next_timeout = 0; + if (!desc->mtd) { + ASSERT(0); + return; } #ifdef DEBUG { ErlDrvTime chk = erl_drv_monotonic_time(ERL_DRV_MSEC); - ASSERT(chk >= (*first)->when); + ASSERT(chk >= desc->mtd->when); } #endif do { - MultiTimerData *save = *first; - *first = save->next; - (*(save->timeout_function))(data,save->caller); - FREE(save); - if (*first == NULL) { + MultiTimerData save = *desc->mtd; + + /* We first remove the timer so that the timeout_functions has + can call clean_multi_timers without breaking anything */ + if (desc->mtd_cache == NULL) { + desc->mtd_cache = desc->mtd; + } else { + FREE(desc->mtd); + } + + desc->mtd = save.next; + if (desc->mtd != NULL) + desc->mtd->prev = NULL; + + (*(save.timeout_function))(data,save.caller); + + if (desc->mtd == NULL) return; - } - (*first)->prev = NULL; - next_timeout = (*first)->when - erl_drv_monotonic_time(ERL_DRV_MSEC); + + next_timeout = desc->mtd->when - erl_drv_monotonic_time(ERL_DRV_MSEC); } while (next_timeout <= 0); + driver_set_timer(port, (unsigned long) next_timeout); } -static void clean_multi_timers(MultiTimerData **first, ErlDrvPort port) +static void clean_multi_timers(tcp_descriptor *desc, ErlDrvPort port) { - MultiTimerData *p; - if (*first) { + if (desc->mtd) { driver_cancel_timer(port); } - while (*first) { - p = *first; - *first = p->next; - FREE(p); + while (desc->mtd) { + MultiTimerData *p = desc->mtd; + desc->mtd = p->next; + FREE(p); + } + desc->mtd = NULL; + if (desc->mtd_cache) { + FREE(desc->mtd_cache); + desc->mtd_cache = NULL; } } -static void remove_multi_timer(MultiTimerData **first, ErlDrvPort port, MultiTimerData *p) +static void remove_multi_timer(tcp_descriptor *desc, ErlDrvPort port, MultiTimerData *p) { if (p->prev != NULL) { p->prev->next = p->next; } else { driver_cancel_timer(port); - *first = p->next; - if (*first) { - ErlDrvTime ntmo = (*first)->when - erl_drv_monotonic_time(ERL_DRV_MSEC); + desc->mtd = p->next; + if (desc->mtd) { + ErlDrvTime ntmo = desc->mtd->when - erl_drv_monotonic_time(ERL_DRV_MSEC); if (ntmo < 0) ntmo = 0; driver_set_timer(port, (unsigned long) ntmo); @@ -12817,36 +13032,67 @@ static void remove_multi_timer(MultiTimerData **first, ErlDrvPort port, MultiTim if (p->next != NULL) { p->next->prev = p->prev; } - FREE(p); + if (desc->mtd_cache == NULL) + desc->mtd_cache = p; + else + FREE(p); } -static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, +/* Cancel a timer based on the timeout_fun */ +static void cancel_multi_timer(tcp_descriptor *desc, ErlDrvPort port, + void (*timeout_fun)(ErlDrvData drv_data, + ErlDrvTermData caller)) +{ + MultiTimerData *timer = desc->mtd; + while(timer && timer->timeout_function != timeout_fun) { + timer = timer->next; + } + if (timer) { + remove_multi_timer(desc, port, timer); + } +} + +static MultiTimerData *add_multi_timer(tcp_descriptor *desc, ErlDrvPort port, ErlDrvTermData caller, unsigned timeout, void (*timeout_fun)(ErlDrvData drv_data, ErlDrvTermData caller)) { MultiTimerData *mtd, *p, *s; - mtd = ALLOC(sizeof(MultiTimerData)); - mtd->when = erl_drv_monotonic_time(ERL_DRV_MSEC) + ((ErlDrvTime) timeout) + 1; + + /* Use cached timer if available */ + if (desc->mtd_cache != NULL) { + mtd = desc->mtd_cache; + desc->mtd_cache = NULL; + } else + mtd = ALLOC(sizeof(MultiTimerData)); + + if (timeout) + mtd->when = erl_drv_monotonic_time(ERL_DRV_MSEC) + ((ErlDrvTime) timeout); + else + mtd->when = INT64_MIN; /* Don't have to get the time for 0 msec timeouts */ + mtd->timeout_function = timeout_fun; mtd->caller = caller; mtd->next = mtd->prev = NULL; - for(p = *first,s = NULL; p != NULL; s = p, p = p->next) { + + /* Find correct slot in timer linked list */ + for(p = desc->mtd,s = NULL; p != NULL; s = p, p = p->next) { if (p->when >= mtd->when) { break; } } + /* Insert in linked list */ if (!p) { if (!s) { - *first = mtd; + desc->mtd = mtd; } else { s->next = mtd; mtd->prev = s; } } else { if (!s) { - *first = mtd; + desc->mtd = mtd; } else { s->next = mtd; mtd->prev = s; @@ -12854,10 +13100,8 @@ static MultiTimerData *add_multi_timer(MultiTimerData **first, ErlDrvPort port, mtd->next = p; p->prev = mtd; } + /* Possibly set new timer */ if (!s) { - if (mtd->next) { - driver_cancel_timer(port); - } driver_set_timer(port,timeout); } return mtd; |