diff options
author | Lukas Larsson <[email protected]> | 2011-11-25 11:16:54 +0100 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2011-12-01 14:10:04 +0100 |
commit | 59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893 (patch) | |
tree | c7a0583a81e70b6e70fca4ca2d06359719eab4e9 | |
parent | eccba49e87ad32248a678d90cfdf355ffd97015d (diff) | |
download | otp-59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893.tar.gz otp-59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893.tar.bz2 otp-59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893.zip |
Implement ignorefd for TCP
Ignore fd is a feature used by sendfile to temporarily remove
all driver_select calls on that fd so that another driver can
select on it. It also delays all actions which sends or receives
data in that fd until in the fd is no longer ignored.
Only the controlling_process should use the feature as it is otherwise
possible that the ignore will never be cleaned up and hence create
a memory leak in the driver.
An ignored driver will not detect that an fd has been closed until
it is unignored.
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 67 | ||||
-rw-r--r-- | erts/preloaded/src/prim_inet.erl | 23 | ||||
-rw-r--r-- | lib/kernel/src/gen_tcp.erl | 7 | ||||
-rw-r--r-- | lib/kernel/src/inet_int.hrl | 2 |
4 files changed, 74 insertions, 25 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 1fe9e04341..56799555a9 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -445,6 +445,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) driver_select(port, e, mode | (on?ERL_DRV_USE:0), on) #define sock_select(d, flags, onoff) do { \ + ASSERT(!onoff || !(d)->is_ignored); \ (d)->event_mask = (onoff) ? \ ((d)->event_mask | (flags)) : \ ((d)->event_mask & ~(flags)); \ @@ -538,6 +539,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_REQ_GETIFADDRS 25 #define INET_REQ_ACCEPT 26 #define INET_REQ_LISTEN 27 +#define INET_REQ_IGNOREFD 28 + /* TCP requests */ /* #define TCP_REQ_ACCEPT 40 MOVED */ /* #define TCP_REQ_LISTEN 41 MERGED */ @@ -725,6 +728,11 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) /* Max interface name */ #define INET_IFNAMSIZ 16 +/* INET Ignore states */ +#define INET_IGNORE_NONE 0 +#define INET_IGNORE_READ 1 +#define INET_IGNORE_WRITE 1 << 1 + /* Max length of Erlang Term Buffer (for outputting structured terms): */ #ifdef HAVE_SCTP #define PACKET_ERL_DRV_TERM_DATA_LEN 512 @@ -864,6 +872,9 @@ typedef struct { double send_avg; /* average packet size sent */ subs_list empty_out_q_subs; /* Empty out queue subscribers */ + int is_ignored; /* if a fd is ignored by from the inet_drv, + this should be set to true when the fd is used + outside of inet_drv. */ } inet_descriptor; @@ -7302,6 +7313,8 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol) sys_memzero((char *)&desc->remote,sizeof(desc->remote)); + desc->is_ignored = 0; + return (ErlDrvData)desc; } @@ -7584,6 +7597,33 @@ static int inet_ctl(inet_descriptor* desc, int cmd, char* buf, int len, return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); } + case INET_REQ_IGNOREFD: { + DEBUGF(("inet_ctl(%ld): IGNOREFD, IGNORED = %d\r\n", + (long)desc->port,(int)*buf)); + + /* + * FD can only be ignored for connected TCP connections for now, + * possible to add UDP and SCTP support if needed. + */ + if (!IS_CONNECTED(desc)) + return ctl_error(ENOTCONN, rbuf, rsize); + + if (!desc->stype == SOCK_STREAM) + return ctl_error(EINVAL, rbuf, rsize); + + if (*buf == 1 && !desc->is_ignored) { + desc->is_ignored = INET_IGNORE_READ; + sock_select(desc, (FD_READ|FD_WRITE|FD_CLOSE|ERL_DRV_USE_NO_CALLBACK), 0); + } else if (*buf == 0 && desc->is_ignored) { + int flags = (FD_READ|FD_CLOSE|((desc->is_ignored & INET_IGNORE_WRITE)?FD_WRITE:0)); + desc->is_ignored = INET_IGNORE_NONE; + sock_select(desc, flags, 1); + } else + return ctl_error(EINVAL, rbuf, rsize); + + return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); + } + #ifndef VXWORKS case INET_REQ_GETSERVBYNAME: { /* L1 Name-String L2 Proto-String */ @@ -7959,6 +7999,7 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, char** rbuf, int rsize) { tcp_descriptor* desc = (tcp_descriptor*)e; + switch(cmd) { case INET_REQ_OPEN: { /* open socket and return internal index */ int domain; @@ -8224,13 +8265,14 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len, if (enq_async(INETP(desc), tbuf, TCP_REQ_RECV) < 0) return ctl_error(EALREADY, rbuf, rsize); - if (tcp_recv(desc, n) == 0) { + if (INETP(desc)->is_ignored || tcp_recv(desc, n) == 0) { if (timeout == 0) async_error_am(INETP(desc), am_timeout); else { if (timeout != INET_INFINITY) - driver_set_timer(desc->inet.port, timeout); - sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); + driver_set_timer(desc->inet.port, timeout); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_READ|FD_CLOSE),1); } } return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize); @@ -8970,6 +9012,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) #ifdef DEBUG long port = (long) desc->inet.port; /* Used after driver_exit() */ #endif + ASSERT(!INETP(desc)->is_ignored); DEBUGF(("tcp_inet_input(%ld) {s=%d\r\n", port, desc->inet.s)); if (desc->inet.state == INET_STATE_ACCEPTING) { SOCKET s; @@ -9231,7 +9274,11 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) DEBUGF(("tcp_sendv(%ld): s=%d, about to send %d,%d bytes\r\n", (long)desc->inet.port, desc->inet.s, h_len, len)); - if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { + + if (INETP(desc)->is_ignored) { + INETP(desc)->is_ignored |= INET_IGNORE_WRITE; + n = 0; + } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { n = 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov, vsize, &n, 0))) { @@ -9259,7 +9306,8 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev) DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n", (long)desc->inet.port, desc->inet.s)); driver_enqv(ix, ev, n); - sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } return 0; } @@ -9324,7 +9372,10 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len) DEBUGF(("tcp_send(%ld): s=%d, about to send %d,%d bytes\r\n", (long)desc->inet.port, desc->inet.s, h_len, len)); - if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { + if (INETP(desc)->is_ignored) { + INETP(desc)->is_ignored |= INET_IGNORE_WRITE; + n = 0; + } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) { sock_send(desc->inet.s, buf, 0, 0); n = 0; } else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s,iov,2,&n,0))) { @@ -9355,7 +9406,8 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len) n -= h_len; driver_enq(ix, ptr+n, len-n); } - sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); + if (!INETP(desc)->is_ignored) + sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1); } return 0; } @@ -9379,6 +9431,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event) int ret = 0; ErlDrvPort ix = desc->inet.port; + ASSERT(!INETP(desc)->is_ignored); DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n", (long)desc->inet.port, desc->inet.s)); if (desc->inet.state == INET_STATE_CONNECTING) { diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl index 015930c0c0..0cedd284db 100644 --- a/erts/preloaded/src/prim_inet.erl +++ b/erts/preloaded/src/prim_inet.erl @@ -36,7 +36,7 @@ -export([recvfrom/2, recvfrom/3]). -export([setopt/3, setopts/2, getopt/2, getopts/2, is_sockopt_val/2]). -export([chgopt/3, chgopts/2]). --export([getstat/2, getfd/1, stealfd/1, returnfd/1, +-export([getstat/2, getfd/1, ignorefd/2, getindex/1, getstatus/1, gettype/1, getifaddrs/1, getiflist/1, ifget/3, ifset/3, gethostname/1]). @@ -843,25 +843,18 @@ getfd(S) when is_port(S) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% -%% STEALFD(insock()) -> {ok,integer()} | {error, Reason} +%% IGNOREFD(insock(),boolean()) -> {ok,integer()} | {error, Reason} %% %% steal internal file descriptor %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -stealfd(S) when is_port(S) -> - getfd(S). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% -%% RETURNFD(insock()) -> {ok,integer()} | {error, Reason} -%% -%% return internal file descriptor -%% -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -returnfd(S) when is_port(S) -> - ok. +ignorefd(S,Bool) when is_port(S) -> + Val = if Bool -> 1; true -> 0 end, + case ctl_cmd(S, ?INET_REQ_IGNOREFD, [Val]) of + {ok, _} -> ok; + Error -> Error + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl index 26afed4ff9..3f1e432f7a 100644 --- a/lib/kernel/src/gen_tcp.erl +++ b/lib/kernel/src/gen_tcp.erl @@ -420,15 +420,16 @@ mod([], Address) -> sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) when is_port(Sock) -> - {ok,SockFd} = prim_inet:stealfd(Sock), + ok = prim_inet:ignorefd(Sock,true), + {ok, SockFd} = prim_inet:getfd(Sock), case Mod:sendfile(Fd, SockFd, Offset, Bytes, ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync) of {error, enotsup} -> - prim_inet:returnfd(Sock), + ok = prim_inet:ignorefd(Sock,false), sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize, Headers, Trailers); Else -> - prim_inet:returnfd(Sock), + ok = prim_inet:ignorefd(Sock,false), Else end; sendfile(_,_,_,_,_,_,_,_,_,_) -> diff --git a/lib/kernel/src/inet_int.hrl b/lib/kernel/src/inet_int.hrl index f8984b13fe..cf893c73eb 100644 --- a/lib/kernel/src/inet_int.hrl +++ b/lib/kernel/src/inet_int.hrl @@ -85,6 +85,8 @@ -define(INET_REQ_GETIFADDRS, 25). -define(INET_REQ_ACCEPT, 26). -define(INET_REQ_LISTEN, 27). +-define(INET_REQ_IGNOREFD, 28). + %% TCP requests %%-define(TCP_REQ_ACCEPT, 40). MOVED %%-define(TCP_REQ_LISTEN, 41). MERGED |