aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2011-11-25 11:16:54 +0100
committerLukas Larsson <[email protected]>2011-12-01 14:10:04 +0100
commit59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893 (patch)
treec7a0583a81e70b6e70fca4ca2d06359719eab4e9
parenteccba49e87ad32248a678d90cfdf355ffd97015d (diff)
downloadotp-59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893.tar.gz
otp-59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893.tar.bz2
otp-59e7e345ba51b7c2d6c9e479ce4cbb7c745c7893.zip
Implement ignorefd for TCP
Ignore fd is a feature used by sendfile to temporarily remove all driver_select calls on that fd so that another driver can select on it. It also delays all actions which sends or receives data in that fd until in the fd is no longer ignored. Only the controlling_process should use the feature as it is otherwise possible that the ignore will never be cleaned up and hence create a memory leak in the driver. An ignored driver will not detect that an fd has been closed until it is unignored.
-rw-r--r--erts/emulator/drivers/common/inet_drv.c67
-rw-r--r--erts/preloaded/src/prim_inet.erl23
-rw-r--r--lib/kernel/src/gen_tcp.erl7
-rw-r--r--lib/kernel/src/inet_int.hrl2
4 files changed, 74 insertions, 25 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 1fe9e04341..56799555a9 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -445,6 +445,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
driver_select(port, e, mode | (on?ERL_DRV_USE:0), on)
#define sock_select(d, flags, onoff) do { \
+ ASSERT(!onoff || !(d)->is_ignored); \
(d)->event_mask = (onoff) ? \
((d)->event_mask | (flags)) : \
((d)->event_mask & ~(flags)); \
@@ -538,6 +539,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_REQ_GETIFADDRS 25
#define INET_REQ_ACCEPT 26
#define INET_REQ_LISTEN 27
+#define INET_REQ_IGNOREFD 28
+
/* TCP requests */
/* #define TCP_REQ_ACCEPT 40 MOVED */
/* #define TCP_REQ_LISTEN 41 MERGED */
@@ -725,6 +728,11 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
/* Max interface name */
#define INET_IFNAMSIZ 16
+/* INET Ignore states */
+#define INET_IGNORE_NONE 0
+#define INET_IGNORE_READ 1
+#define INET_IGNORE_WRITE 1 << 1
+
/* Max length of Erlang Term Buffer (for outputting structured terms): */
#ifdef HAVE_SCTP
#define PACKET_ERL_DRV_TERM_DATA_LEN 512
@@ -864,6 +872,9 @@ typedef struct {
double send_avg; /* average packet size sent */
subs_list empty_out_q_subs; /* Empty out queue subscribers */
+ int is_ignored; /* if a fd is ignored by from the inet_drv,
+ this should be set to true when the fd is used
+ outside of inet_drv. */
} inet_descriptor;
@@ -7302,6 +7313,8 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol)
sys_memzero((char *)&desc->remote,sizeof(desc->remote));
+ desc->is_ignored = 0;
+
return (ErlDrvData)desc;
}
@@ -7584,6 +7597,33 @@ static int inet_ctl(inet_descriptor* desc, int cmd, char* buf, int len,
return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize);
}
+ case INET_REQ_IGNOREFD: {
+ DEBUGF(("inet_ctl(%ld): IGNOREFD, IGNORED = %d\r\n",
+ (long)desc->port,(int)*buf));
+
+ /*
+ * FD can only be ignored for connected TCP connections for now,
+ * possible to add UDP and SCTP support if needed.
+ */
+ if (!IS_CONNECTED(desc))
+ return ctl_error(ENOTCONN, rbuf, rsize);
+
+ if (!desc->stype == SOCK_STREAM)
+ return ctl_error(EINVAL, rbuf, rsize);
+
+ if (*buf == 1 && !desc->is_ignored) {
+ desc->is_ignored = INET_IGNORE_READ;
+ sock_select(desc, (FD_READ|FD_WRITE|FD_CLOSE|ERL_DRV_USE_NO_CALLBACK), 0);
+ } else if (*buf == 0 && desc->is_ignored) {
+ int flags = (FD_READ|FD_CLOSE|((desc->is_ignored & INET_IGNORE_WRITE)?FD_WRITE:0));
+ desc->is_ignored = INET_IGNORE_NONE;
+ sock_select(desc, flags, 1);
+ } else
+ return ctl_error(EINVAL, rbuf, rsize);
+
+ return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
+ }
+
#ifndef VXWORKS
case INET_REQ_GETSERVBYNAME: { /* L1 Name-String L2 Proto-String */
@@ -7959,6 +7999,7 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len,
char** rbuf, int rsize)
{
tcp_descriptor* desc = (tcp_descriptor*)e;
+
switch(cmd) {
case INET_REQ_OPEN: { /* open socket and return internal index */
int domain;
@@ -8224,13 +8265,14 @@ static int tcp_inet_ctl(ErlDrvData e, unsigned int cmd, char* buf, int len,
if (enq_async(INETP(desc), tbuf, TCP_REQ_RECV) < 0)
return ctl_error(EALREADY, rbuf, rsize);
- if (tcp_recv(desc, n) == 0) {
+ if (INETP(desc)->is_ignored || tcp_recv(desc, n) == 0) {
if (timeout == 0)
async_error_am(INETP(desc), am_timeout);
else {
if (timeout != INET_INFINITY)
- driver_set_timer(desc->inet.port, timeout);
- sock_select(INETP(desc),(FD_READ|FD_CLOSE),1);
+ driver_set_timer(desc->inet.port, timeout);
+ if (!INETP(desc)->is_ignored)
+ sock_select(INETP(desc),(FD_READ|FD_CLOSE),1);
}
}
return ctl_reply(INET_REP_OK, tbuf, 2, rbuf, rsize);
@@ -8970,6 +9012,7 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event)
#ifdef DEBUG
long port = (long) desc->inet.port; /* Used after driver_exit() */
#endif
+ ASSERT(!INETP(desc)->is_ignored);
DEBUGF(("tcp_inet_input(%ld) {s=%d\r\n", port, desc->inet.s));
if (desc->inet.state == INET_STATE_ACCEPTING) {
SOCKET s;
@@ -9231,7 +9274,11 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
DEBUGF(("tcp_sendv(%ld): s=%d, about to send %d,%d bytes\r\n",
(long)desc->inet.port, desc->inet.s, h_len, len));
- if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) {
+
+ if (INETP(desc)->is_ignored) {
+ INETP(desc)->is_ignored |= INET_IGNORE_WRITE;
+ n = 0;
+ } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) {
n = 0;
} else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s, ev->iov,
vsize, &n, 0))) {
@@ -9259,7 +9306,8 @@ static int tcp_sendv(tcp_descriptor* desc, ErlIOVec* ev)
DEBUGF(("tcp_sendv(%ld): s=%d, Send failed, queuing\r\n",
(long)desc->inet.port, desc->inet.s));
driver_enqv(ix, ev, n);
- sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
+ if (!INETP(desc)->is_ignored)
+ sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
}
return 0;
}
@@ -9324,7 +9372,10 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len)
DEBUGF(("tcp_send(%ld): s=%d, about to send %d,%d bytes\r\n",
(long)desc->inet.port, desc->inet.s, h_len, len));
- if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) {
+ if (INETP(desc)->is_ignored) {
+ INETP(desc)->is_ignored |= INET_IGNORE_WRITE;
+ n = 0;
+ } else if (desc->tcp_add_flags & TCP_ADDF_DELAY_SEND) {
sock_send(desc->inet.s, buf, 0, 0);
n = 0;
} else if (IS_SOCKET_ERROR(sock_sendv(desc->inet.s,iov,2,&n,0))) {
@@ -9355,7 +9406,8 @@ static int tcp_send(tcp_descriptor* desc, char* ptr, int len)
n -= h_len;
driver_enq(ix, ptr+n, len-n);
}
- sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
+ if (!INETP(desc)->is_ignored)
+ sock_select(INETP(desc),(FD_WRITE|FD_CLOSE), 1);
}
return 0;
}
@@ -9379,6 +9431,7 @@ static int tcp_inet_output(tcp_descriptor* desc, HANDLE event)
int ret = 0;
ErlDrvPort ix = desc->inet.port;
+ ASSERT(!INETP(desc)->is_ignored);
DEBUGF(("tcp_inet_output(%ld) {s=%d\r\n",
(long)desc->inet.port, desc->inet.s));
if (desc->inet.state == INET_STATE_CONNECTING) {
diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl
index 015930c0c0..0cedd284db 100644
--- a/erts/preloaded/src/prim_inet.erl
+++ b/erts/preloaded/src/prim_inet.erl
@@ -36,7 +36,7 @@
-export([recvfrom/2, recvfrom/3]).
-export([setopt/3, setopts/2, getopt/2, getopts/2, is_sockopt_val/2]).
-export([chgopt/3, chgopts/2]).
--export([getstat/2, getfd/1, stealfd/1, returnfd/1,
+-export([getstat/2, getfd/1, ignorefd/2,
getindex/1, getstatus/1, gettype/1,
getifaddrs/1, getiflist/1, ifget/3, ifset/3,
gethostname/1]).
@@ -843,25 +843,18 @@ getfd(S) when is_port(S) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
-%% STEALFD(insock()) -> {ok,integer()} | {error, Reason}
+%% IGNOREFD(insock(),boolean()) -> {ok,integer()} | {error, Reason}
%%
%% steal internal file descriptor
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-stealfd(S) when is_port(S) ->
- getfd(S).
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%%
-%% RETURNFD(insock()) -> {ok,integer()} | {error, Reason}
-%%
-%% return internal file descriptor
-%%
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
-returnfd(S) when is_port(S) ->
- ok.
+ignorefd(S,Bool) when is_port(S) ->
+ Val = if Bool -> 1; true -> 0 end,
+ case ctl_cmd(S, ?INET_REQ_IGNOREFD, [Val]) of
+ {ok, _} -> ok;
+ Error -> Error
+ end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl
index 26afed4ff9..3f1e432f7a 100644
--- a/lib/kernel/src/gen_tcp.erl
+++ b/lib/kernel/src/gen_tcp.erl
@@ -420,15 +420,16 @@ mod([], Address) ->
sendfile(#file_descriptor{ module = Mod } = Fd, Sock, Offset, Bytes,
ChunkSize, Headers, Trailers, Nodiskio, MNowait, Sync)
when is_port(Sock) ->
- {ok,SockFd} = prim_inet:stealfd(Sock),
+ ok = prim_inet:ignorefd(Sock,true),
+ {ok, SockFd} = prim_inet:getfd(Sock),
case Mod:sendfile(Fd, SockFd, Offset, Bytes, ChunkSize, Headers, Trailers,
Nodiskio, MNowait, Sync) of
{error, enotsup} ->
- prim_inet:returnfd(Sock),
+ ok = prim_inet:ignorefd(Sock,false),
sendfile_fallback(Fd, Sock, Offset, Bytes, ChunkSize,
Headers, Trailers);
Else ->
- prim_inet:returnfd(Sock),
+ ok = prim_inet:ignorefd(Sock,false),
Else
end;
sendfile(_,_,_,_,_,_,_,_,_,_) ->
diff --git a/lib/kernel/src/inet_int.hrl b/lib/kernel/src/inet_int.hrl
index f8984b13fe..cf893c73eb 100644
--- a/lib/kernel/src/inet_int.hrl
+++ b/lib/kernel/src/inet_int.hrl
@@ -85,6 +85,8 @@
-define(INET_REQ_GETIFADDRS, 25).
-define(INET_REQ_ACCEPT, 26).
-define(INET_REQ_LISTEN, 27).
+-define(INET_REQ_IGNOREFD, 28).
+
%% TCP requests
%%-define(TCP_REQ_ACCEPT, 40). MOVED
%%-define(TCP_REQ_LISTEN, 41). MERGED