diff options
author | Steve Vinoski <[email protected]> | 2013-08-27 11:42:00 -0400 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2013-09-23 17:23:14 +0200 |
commit | 7fbf2c26ac063988818230a0e18a9df48c2fbf2d (patch) | |
tree | 8ee574a718eb61d7d99133e47a2a37bc1d94e115 /erts | |
parent | cc3c34fce28062afcaadc288b69939ced8fd8cde (diff) | |
download | otp-7fbf2c26ac063988818230a0e18a9df48c2fbf2d.tar.gz otp-7fbf2c26ac063988818230a0e18a9df48c2fbf2d.tar.bz2 otp-7fbf2c26ac063988818230a0e18a9df48c2fbf2d.zip |
add {active,N} socket option for TCP, UDP, and SCTP
Add the {active,N} socket option, where N is an integer in the range
-32768..32767, to allow a caller to specify the number of data messages to
be delivered to the controlling process. Once the socket's delivered
message count either reaches 0 or is explicitly set to 0 with
inet:setopts/2 or by including {active,0} as an option when the socket is
created, the socket transitions to passive ({active, false}) mode and the
socket's controlling process receives a message to inform it of the
transition. TCP sockets receive {tcp_passive,Socket}, UDP sockets receive
{udp_passive,Socket} and SCTP sockets receive {sctp_passive,Socket}.
The socket's delivered message counter defaults to 0, but it can be set
using {active,N} via any gen_tcp, gen_udp, or gen_sctp function that takes
socket options as arguments, or via inet:setopts/2. New N values are added
to the socket's current counter value, and negative numbers can be used to
reduce the counter value. Specifying a number that would cause the socket's
counter value to go above 32767 causes an einval error. If a negative
number is specified such that the counter value would become negative, the
socket's counter value is set to 0 and the socket transitions to passive
mode. If the counter value is already 0 and inet:setopts(Socket,
[{active,0}]) is specified, the counter value remains at 0 but the
appropriate passive mode transition message is generated for the socket.
This commit contains a modified preloaded prim_inet.beam due to changes in
prim_inet.erl.
Add tests for {active,N} mode for TCP, UDP, and SCTP sockets.
Add documentation for {active,N} mode for inet, gen_tcp, gen_udp, and
gen_sctp.
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 119 | ||||
-rw-r--r-- | erts/preloaded/src/prim_inet.erl | 20 |
2 files changed, 125 insertions, 14 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 60db50e80a..7dcb191a83 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -86,6 +86,13 @@ #endif typedef unsigned long long llu_t; +#ifndef INT16_MIN +#define INT16_MIN (-32768) +#endif +#ifndef INT16_MAX +#define INT16_MAX (32767) +#endif + #ifdef __WIN32__ #define STRNCASECMP strncasecmp @@ -581,6 +588,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_PASSIVE 0 /* false */ #define INET_ACTIVE 1 /* true */ #define INET_ONCE 2 /* true; active once then passive */ +#define INET_MULTI 3 /* true; active N then passive */ /* INET_REQ_GETSTATUS enumeration */ #define INET_F_OPEN 0x0001 @@ -925,6 +933,7 @@ typedef struct { inet_async_op op_queue[INET_MAX_ASYNC]; /* call queue */ int active; /* 0 = passive, 1 = active, 2 = active once */ + Sint16 active_count; /* counter for {active,N} */ int stype; /* socket type: SOCK_STREAM/SOCK_DGRAM/SOCK_SEQPACKET */ int sprotocol; /* socket protocol: @@ -1160,22 +1169,36 @@ static int packet_inet_output(udp_descriptor* udesc, HANDLE event); static int async_ref = 0; /* async reference id generator */ #define NEW_ASYNC_ID() ((async_ref++) & 0xffff) +/* check for transition from active to passive */ +#define INET_CHECK_ACTIVE_TO_PASSIVE(inet) \ + do { \ + if ((inet)->active == INET_ONCE) \ + (inet)->active = INET_PASSIVE; \ + else if ((inet)->active == INET_MULTI && --((inet)->active_count) == 0) { \ + (inet)->active = INET_PASSIVE; \ + packet_passive_message(inet); \ + } \ + } while (0) static ErlDrvTermData am_ok; static ErlDrvTermData am_tcp; static ErlDrvTermData am_udp; static ErlDrvTermData am_error; +static ErlDrvTermData am_einval; static ErlDrvTermData am_inet_async; static ErlDrvTermData am_inet_reply; static ErlDrvTermData am_timeout; static ErlDrvTermData am_closed; +static ErlDrvTermData am_tcp_passive; static ErlDrvTermData am_tcp_closed; static ErlDrvTermData am_tcp_error; +static ErlDrvTermData am_udp_passive; static ErlDrvTermData am_udp_error; static ErlDrvTermData am_empty_out_q; static ErlDrvTermData am_ssl_tls; #ifdef HAVE_SCTP static ErlDrvTermData am_sctp; +static ErlDrvTermData am_sctp_passive; static ErlDrvTermData am_sctp_error; static ErlDrvTermData am_true; static ErlDrvTermData am_false; @@ -1185,6 +1208,7 @@ static ErlDrvTermData am_list; static ErlDrvTermData am_binary; static ErlDrvTermData am_active; static ErlDrvTermData am_once; +static ErlDrvTermData am_multi; static ErlDrvTermData am_buffer; static ErlDrvTermData am_linger; static ErlDrvTermData am_recbuf; @@ -3337,6 +3361,34 @@ static int packet_binary_message } /* +** active mode message: send active-to-passive transition message +** {tcp_passive, S} or +** {udp_passive, S} or +** {sctp_passive, S} +*/ + static int packet_passive_message(inet_descriptor* desc) + { + ErlDrvTermData spec[6]; + int i = 0; + + DEBUGF(("packet_passive_message(%ld):\r\n", (long)desc->port)); + + if (desc->sprotocol == IPPROTO_TCP) + i = LOAD_ATOM(spec, i, am_tcp_passive); + else { +#ifdef HAVE_SCTP + i = LOAD_ATOM(spec, i, IS_SCTP(desc) ? am_sctp_passive : am_udp_passive); +#else + i = LOAD_ATOM(spec, i, am_udp_passive); +#endif + } + i = LOAD_PORT(spec, i, desc->dport); + i = LOAD_TUPLE(spec, i, 2); + ASSERT(i <= 6); + return erl_drv_output_term(desc->dport, spec, i); + } + +/* ** send active message {udp_error|sctp_error, S, Error} */ static int packet_error_message(udp_descriptor* udesc, int err) @@ -3378,7 +3430,7 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len) int code; const char* body = buf; int bodylen = len; - + packet_get_body(desc->inet.htype, &body, &bodylen); if (desc->inet.deliver == INET_DELIVER_PORT) { @@ -3396,8 +3448,7 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len) if (code < 0) return code; - if (desc->inet.active == INET_ONCE) - desc->inet.active = INET_PASSIVE; + INET_CHECK_ACTIVE_TO_PASSIVE(INETP(desc)); return code; } @@ -3424,8 +3475,7 @@ tcp_reply_binary_data(tcp_descriptor* desc, ErlDrvBinary* bin, int offs, int len } if (code < 0) return code; - if (desc->inet.active == INET_ONCE) - desc->inet.active = INET_PASSIVE; + INET_CHECK_ACTIVE_TO_PASSIVE(INETP(desc)); return code; } @@ -3448,8 +3498,7 @@ packet_reply_binary_data(inet_descriptor* desc, unsigned int hsz, code = packet_binary_message(desc, bin, offs, len, extra); if (code < 0) return code; - if (desc->active == INET_ONCE) - desc->active = INET_PASSIVE; + INET_CHECK_ACTIVE_TO_PASSIVE(desc); return code; } } @@ -3494,6 +3543,7 @@ sock_init(void) /* May be called multiple times. */ #ifdef HAVE_SCTP static void inet_init_sctp(void) { INIT_ATOM(sctp); + INIT_ATOM(sctp_passive); INIT_ATOM(sctp_error); INIT_ATOM(true); INIT_ATOM(false); @@ -3503,6 +3553,7 @@ static void inet_init_sctp(void) { INIT_ATOM(binary); INIT_ATOM(active); INIT_ATOM(once); + INIT_ATOM(multi); INIT_ATOM(buffer); INIT_ATOM(linger); INIT_ATOM(recbuf); @@ -3628,12 +3679,15 @@ static int inet_init() INIT_ATOM(tcp); INIT_ATOM(udp); INIT_ATOM(error); + INIT_ATOM(einval); INIT_ATOM(inet_async); INIT_ATOM(inet_reply); INIT_ATOM(timeout); INIT_ATOM(closed); + INIT_ATOM(tcp_passive); INIT_ATOM(tcp_closed); INIT_ATOM(tcp_error); + INIT_ATOM(udp_passive); INIT_ATOM(udp_error); INIT_ATOM(empty_out_q); INIT_ATOM(ssl_tls); @@ -5508,8 +5562,25 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) case INET_LOPT_ACTIVE: DEBUGF(("inet_set_opts(%ld): s=%d, ACTIVE=%d\r\n", - (long)desc->port, desc->s,ival)); + (long)desc->port, desc->s, ival)); desc->active = ival; + if (desc->active == INET_MULTI) { + long ac = desc->active_count; + Sint16 nval = get_int16(ptr); + ptr += 2; + len -= 2; + ac += nval; + if (ac > INT16_MAX || ac < INT16_MIN) + return -1; + desc->active_count += nval; + if (desc->active_count < 0) + desc->active_count = 0; + if (desc->active_count == 0) { + desc->active = INET_PASSIVE; + packet_passive_message(desc); + } + } else + desc->active_count = 0; if ((desc->stype == SOCK_STREAM) && (desc->active != INET_PASSIVE) && (desc->state == INET_STATE_CLOSED)) { tcp_closed_message((tcp_descriptor *) desc); @@ -5820,7 +5891,8 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) /* XXX fprintf(stderr,"desc->htype == %d, old_htype == %d, desc->active == %d, old_active == %d\r\n",(int)desc->htype, (int) old_htype, (int) desc->active, (int) old_active );*/ - return 1+(desc->htype == old_htype && desc->active == INET_ONCE); + return 1+(desc->htype == old_htype && + (desc->active == INET_ONCE || desc->active == INET_MULTI)); } return 0; } @@ -5953,6 +6025,21 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len) case INET_LOPT_ACTIVE: desc->active = get_int32(curr); curr += 4; + if (desc->active == INET_MULTI) { + long ac = desc->active_count; + Sint16 nval = get_int16(curr); curr += 2; + ac += nval; + if (ac > INT16_MAX || ac < INT16_MIN) + return -1; + desc->active_count += nval; + if (desc->active_count < 0) + desc->active_count = 0; + if (desc->active_count == 0) { + desc->active = INET_PASSIVE; + packet_passive_message(desc); + } + } else + desc->active_count = 0; res = 0; continue; @@ -6475,6 +6562,11 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, case INET_LOPT_ACTIVE: *ptr++ = opt; put_int32(desc->active, ptr); + if (desc->active == INET_MULTI) { + PLACE_FOR(2,ptr); + put_int16(desc->active_count, ptr); + ptr += 2; + } continue; case INET_LOPT_PACKET: *ptr++ = opt; @@ -6847,7 +6939,10 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc, } case INET_LOPT_ACTIVE: { - PLACE_FOR(spec, i, 2*LOAD_ATOM_CNT + LOAD_TUPLE_CNT); + if (desc->active == INET_MULTI) + PLACE_FOR(spec, i, LOAD_ATOM_CNT + LOAD_INT_CNT + LOAD_TUPLE_CNT); + else + PLACE_FOR(spec, i, 2*LOAD_ATOM_CNT + LOAD_TUPLE_CNT); i = LOAD_ATOM (spec, i, am_active); switch (desc->active) { @@ -6860,6 +6955,9 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc, case INET_ONCE : { i = LOAD_ATOM (spec, i, am_once); break; } + case INET_MULTI : + { i = LOAD_INT(spec, i, desc->active_count); break; } + default: ASSERT (0); } i = LOAD_TUPLE (spec, i, 2); @@ -7656,6 +7754,7 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol) socket */ desc->deliver = INET_DELIVER_TERM; /* standard term format */ desc->active = INET_PASSIVE; /* start passive */ + desc->active_count = 0; desc->oph = NULL; desc->opt = NULL; diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl index fa621681f3..69eed716ff 100644 --- a/erts/preloaded/src/prim_inet.erl +++ b/erts/preloaded/src/prim_inet.erl @@ -1237,7 +1237,8 @@ type_opt_1(buffer) -> int; type_opt_1(active) -> {enum,[{false, ?INET_PASSIVE}, {true, ?INET_ACTIVE}, - {once, ?INET_ONCE}]}; + {once, ?INET_ONCE}, + {multi, ?INET_MULTI}]}; type_opt_1(packet) -> {enum,[{0, ?TCP_PB_RAW}, {1, ?TCP_PB_1}, @@ -1716,11 +1717,14 @@ encode_opt_val(Opts) -> Reason -> {error,Reason} end. +%% {active, once} and {active, N} are specially optimized because they will +%% be used for every packet or every N packets, not only once when +%% initializing the socket. Measurements show that this optimization is +%% worthwhile. enc_opt_val([{active,once}|Opts], Acc) -> - %% Specially optimized because {active,once} will be used for - %% every packet, not only once when initializing the socket. - %% Measurements show that this optimization is worthwhile. enc_opt_val(Opts, [<<?INET_LOPT_ACTIVE:8,?INET_ONCE:32>>|Acc]); +enc_opt_val([{active,N}|Opts], Acc) when is_integer(N), N < 32768, N >= -32768 -> + enc_opt_val(Opts, [<<?INET_LOPT_ACTIVE:8,?INET_MULTI:32,N:16>>|Acc]); enc_opt_val([{raw,P,O,B}|Opts], Acc) -> enc_opt_val(Opts, Acc, raw, {P,O,B}); enc_opt_val([{Opt,Val}|Opts], Acc) -> @@ -1810,6 +1814,14 @@ dec_opt_val([]) -> []. dec_opt_val(Buf, raw, Type) -> {{P,O,B},T} = dec_value(Type, Buf), [{raw,P,O,B}|dec_opt_val(T)]; +dec_opt_val(Buf, active, Type) -> + case dec_value(Type, Buf) of + {multi,[M0,M1|T]} -> + <<N:16>> = list_to_binary([M0,M1]), + [{active,N}|dec_opt_val(T)]; + {Val,T} -> + [{active,Val}|dec_opt_val(T)] + end; dec_opt_val(Buf, Opt, Type) -> {Val,T} = dec_value(Type, Buf), [{Opt,Val}|dec_opt_val(T)]. |