From 59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893 Mon Sep 17 00:00:00 2001 From: Lukas Larsson Date: Fri, 25 Nov 2011 11:16:54 +0100 Subject: Implement ignorefd for TCP Ignore fd is a feature used by sendfile to temporarily remove all driver_select calls on that fd so that another driver can select on it. It also delays all actions which sends or receives data in that fd until in the fd is no longer ignored. Only the controlling_process should use the feature as it is otherwise possible that the ignore will never be cleaned up and hence create a memory leak in the driver. An ignored driver will not detect that an fd has been closed until it is unignored. --- erts/emulator/drivers/common/inet_drv.c | 67 +++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 7 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 1fe9e04341..56799555a9 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -445,6 +445,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) driver_select(port, e, mode | (on?ERL_DRV_USE:0), on) #define sock_select(d, flags, onoff) do { \ + ASSERT(!onoff || !(d)->is_ignored); \ (d)->event_mask = (onoff) ? \ ((d)->event_mask | (flags)) : \ ((d)->event_mask & ~(flags)); \ @@ -538,6 +539,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_REQ_GETIFADDRS 25 #define INET_REQ_ACCEPT 26 #define INET_REQ_LISTEN 27 +#define INET_REQ_IGNOREFD 28 + /* TCP requests */ /* #define TCP_REQ_ACCEPT 40 MOVED */ /* #define TCP_REQ_LISTEN 41 MERGED */ @@ -725,6 +728,11 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) /* Max interface name */ #define INET_IFNAMSIZ 16 +/* INET Ignore states */ +#define INET_IGNORE_NONE 0 +#define INET_IGNORE_READ 1 +#define INET_IGNORE_WRITE 1 << 1 + /* Max length of Erlang Term Buffer (for outputting structured terms): */ #ifdef HAVE_SCTP #define PACKET_ERL_DRV_TERM_DATA_LEN 512 @@ -864,6 +872,9 @@ typedef struct { double send_avg; /* average packet size sent */ subs_list empty_out_q_subs; /* Empty out queue subscribers */ + int is_ignored; /* if a fd is ignored by from the inet_drv, + this should be set to true when the fd is used + outside of inet_drv. */ } inet_descriptor; @@ -7302,6 +7313,8 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol) sys_memzero((char *)&desc->remote,sizeof(desc->remote)); + desc->is_ignored = 0; + return (ErlDrvData)desc; } @@ -7584,6 +7597,33 @@ static int inet_ctl(inet_descriptor* desc, int cmd, char* buf, int len, return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); } + case INET_REQ_IGNOREFD: { + DEBUGF(("inet_ctl(%ld): IGNOREFD, IGNORED = %d\r\n", + (long)desc->port,(int)*buf)); + + /* + * FD can only be ignored for connected TCP connections for now, + * possible to add UDP and SCTP support if needed. + */ + if (!IS_CONNECTED(desc)) + return ctl_error(ENOTCONN, rbuf, rsize); + + if (!desc->stype == SOCK_STREAM) + return ctl_error(EINVAL, rbuf, rsize); + + if (*buf == 1 && !desc->is_ignored) { + desc->is_ignored = INET_IGNORE_READ; + sock_select(desc, (FD_READ|FD_WRITE|FD_CLOSE|ERL_DRV_USE_NO_CALLBACK), 0); + } else if (*buf == 0 && desc->is_ignored) { + int flags = (FD_READ|FD_CLOSE|((desc->is_ignored & INET_IGNORE_WRITE)?FD_WRITE:0)); + desc->is_ignored = INET_IGNORE_NONE; + sock_select(desc, flags, 1); + } else + return ctl_error(EINVAL, rbuf, rsize); + + return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); + } + #ifndef VXWORKS case INET_REQ_GETSERVBYNAME: { /* L1 Name-String L2 Proto-String */ @@ -7959,6 +7999,7 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, char** rbuf, int rsize) { tcp_descriptor* desc = (tcp_descriptor*)e; + switch(cmd) { case INET_REQ_OPEN: { /* open socket and return internal index */ int domain; @@ -8224,13 +8265,14 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, if (enq_async(INETP(desc), tbuf, TCP_REQ_RECV) < 0) return ctl_error(EALREADY, rbuf, rsize); - if (tcp_recv(desc, n) == 0) { + if (INETP(desc)->is_ignored || tcp_recv(desc, n) == 0) { if (timeout == 0) async_error_am(INETP(desc), am_timeout); else { if (timeout != INET_INFINITY) - driver_set_timer(desc->inet.port, timeout); - sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); + driver_set_timer(desc->inet.port, timeout); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); } } return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); @@ -8970,6 +9012,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) #ifdef DEBUG long port = (long) desc->inet.port; /* Used after driver_exit() */ #endif + ASSERT(!INETP(desc)->is_ignored); DEBUGF(("tcp_inet_input(%ld) {s=%d\r\n", port, desc->inet.s)); if (desc->inet.state == INET_STATE_ACCEPTING) { SOCKET s; @@ -9231,7 +9274,11 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) DEBUGF(("tcp_sendv(%ld): s=%d, about to send %d,%d bytes\r\n", (long)desc->inet.port, desc->inet.s, h_len, len)); - if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { + + if (INETP(desc)->is_ignored) { + INETP(desc)->is_ignored |= INET_IGNORE_WRITE; + n = 0; + } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { n = 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov, vsize, &n, 0))) { @@ -9259,7 +9306,8 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n", (long)desc->inet.port, desc->inet.s)); driver_enqv(ix, ev, n); - sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } return 0; } @@ -9324,7 +9372,10 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len) DEBUGF(("tcp_send(%ld): s=%d, about to send %d,%d bytes\r\n", (long)desc->inet.port, desc->inet.s, h_len, len)); - if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { + if (INETP(desc)->is_ignored) { + INETP(desc)->is_ignored |= INET_IGNORE_WRITE; + n = 0; + } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { sock_send(desc->inet.s, buf, 0, 0); n = 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s,iov,2,&n,0))) { @@ -9355,7 +9406,8 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len) n -= h_len; driver_enq(ix, ptr+n, len-n); } - sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } return 0; } @@ -9379,6 +9431,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) int ret = 0; ErlDrvPort ix = desc->inet.port; + ASSERT(!INETP(desc)->is_ignored); DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n", (long)desc->inet.port, desc->inet.s)); if (desc->inet.state == INET_STATE_CONNECTING) { -- cgit v1.2.3