From 7fbf2c26ac063988818230a0e18a9df48c2fbf2d Mon Sep 17 00:00:00 2001 From: Steve Vinoski Date: Tue, 27 Aug 2013 11:42:00 -0400 Subject: add {active,N} socket option for TCP, UDP, and SCTP Add the {active,N} socket option, where N is an integer in the range -32768..32767, to allow a caller to specify the number of data messages to be delivered to the controlling process. Once the socket's delivered message count either reaches 0 or is explicitly set to 0 with inet:setopts/2 or by including {active,0} as an option when the socket is created, the socket transitions to passive ({active, false}) mode and the socket's controlling process receives a message to inform it of the transition. TCP sockets receive {tcp_passive,Socket}, UDP sockets receive {udp_passive,Socket} and SCTP sockets receive {sctp_passive,Socket}. The socket's delivered message counter defaults to 0, but it can be set using {active,N} via any gen_tcp, gen_udp, or gen_sctp function that takes socket options as arguments, or via inet:setopts/2. New N values are added to the socket's current counter value, and negative numbers can be used to reduce the counter value. Specifying a number that would cause the socket's counter value to go above 32767 causes an einval error. If a negative number is specified such that the counter value would become negative, the socket's counter value is set to 0 and the socket transitions to passive mode. If the counter value is already 0 and inet:setopts(Socket, [{active,0}]) is specified, the counter value remains at 0 but the appropriate passive mode transition message is generated for the socket. This commit contains a modified preloaded prim_inet.beam due to changes in prim_inet.erl. Add tests for {active,N} mode for TCP, UDP, and SCTP sockets. Add documentation for {active,N} mode for inet, gen_tcp, gen_udp, and gen_sctp. --- erts/emulator/drivers/common/inet_drv.c | 119 +++++++++++++++++++++++++++++--- 1 file changed, 109 insertions(+), 10 deletions(-) (limited to 'erts/emulator/drivers/common/inet_drv.c') diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 60db50e80a..7dcb191a83 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -86,6 +86,13 @@ #endif typedef unsigned long long llu_t; +#ifndef INT16_MIN +#define INT16_MIN (-32768) +#endif +#ifndef INT16_MAX +#define INT16_MAX (32767) +#endif + #ifdef __WIN32__ #define STRNCASECMP strncasecmp @@ -581,6 +588,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_PASSIVE 0 /* false */ #define INET_ACTIVE 1 /* true */ #define INET_ONCE 2 /* true; active once then passive */ +#define INET_MULTI 3 /* true; active N then passive */ /* INET_REQ_GETSTATUS enumeration */ #define INET_F_OPEN 0x0001 @@ -925,6 +933,7 @@ typedef struct { inet_async_op op_queue[INET_MAX_ASYNC]; /* call queue */ int active; /* 0 = passive, 1 = active, 2 = active once */ + Sint16 active_count; /* counter for {active,N} */ int stype; /* socket type: SOCK_STREAM/SOCK_DGRAM/SOCK_SEQPACKET */ int sprotocol; /* socket protocol: @@ -1160,22 +1169,36 @@ static int packet_inet_output(udp_descriptor* udesc, HANDLE event); static int async_ref = 0; /* async reference id generator */ #define NEW_ASYNC_ID() ((async_ref++) & 0xffff) +/* check for transition from active to passive */ +#define INET_CHECK_ACTIVE_TO_PASSIVE(inet) \ + do { \ + if ((inet)->active == INET_ONCE) \ + (inet)->active = INET_PASSIVE; \ + else if ((inet)->active == INET_MULTI && --((inet)->active_count) == 0) { \ + (inet)->active = INET_PASSIVE; \ + packet_passive_message(inet); \ + } \ + } while (0) static ErlDrvTermData am_ok; static ErlDrvTermData am_tcp; static ErlDrvTermData am_udp; static ErlDrvTermData am_error; +static ErlDrvTermData am_einval; static ErlDrvTermData am_inet_async; static ErlDrvTermData am_inet_reply; static ErlDrvTermData am_timeout; static ErlDrvTermData am_closed; +static ErlDrvTermData am_tcp_passive; static ErlDrvTermData am_tcp_closed; static ErlDrvTermData am_tcp_error; +static ErlDrvTermData am_udp_passive; static ErlDrvTermData am_udp_error; static ErlDrvTermData am_empty_out_q; static ErlDrvTermData am_ssl_tls; #ifdef HAVE_SCTP static ErlDrvTermData am_sctp; +static ErlDrvTermData am_sctp_passive; static ErlDrvTermData am_sctp_error; static ErlDrvTermData am_true; static ErlDrvTermData am_false; @@ -1185,6 +1208,7 @@ static ErlDrvTermData am_list; static ErlDrvTermData am_binary; static ErlDrvTermData am_active; static ErlDrvTermData am_once; +static ErlDrvTermData am_multi; static ErlDrvTermData am_buffer; static ErlDrvTermData am_linger; static ErlDrvTermData am_recbuf; @@ -3336,6 +3360,34 @@ static int packet_binary_message return erl_drv_output_term(desc->dport, spec, i); } +/* +** active mode message: send active-to-passive transition message +** {tcp_passive, S} or +** {udp_passive, S} or +** {sctp_passive, S} +*/ + static int packet_passive_message(inet_descriptor* desc) + { + ErlDrvTermData spec[6]; + int i = 0; + + DEBUGF(("packet_passive_message(%ld):\r\n", (long)desc->port)); + + if (desc->sprotocol == IPPROTO_TCP) + i = LOAD_ATOM(spec, i, am_tcp_passive); + else { +#ifdef HAVE_SCTP + i = LOAD_ATOM(spec, i, IS_SCTP(desc) ? am_sctp_passive : am_udp_passive); +#else + i = LOAD_ATOM(spec, i, am_udp_passive); +#endif + } + i = LOAD_PORT(spec, i, desc->dport); + i = LOAD_TUPLE(spec, i, 2); + ASSERT(i <= 6); + return erl_drv_output_term(desc->dport, spec, i); + } + /* ** send active message {udp_error|sctp_error, S, Error} */ @@ -3378,7 +3430,7 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len) int code; const char* body = buf; int bodylen = len; - + packet_get_body(desc->inet.htype, &body, &bodylen); if (desc->inet.deliver == INET_DELIVER_PORT) { @@ -3396,8 +3448,7 @@ static int tcp_reply_data(tcp_descriptor* desc, char* buf, int len) if (code < 0) return code; - if (desc->inet.active == INET_ONCE) - desc->inet.active = INET_PASSIVE; + INET_CHECK_ACTIVE_TO_PASSIVE(INETP(desc)); return code; } @@ -3424,8 +3475,7 @@ tcp_reply_binary_data(tcp_descriptor* desc, ErlDrvBinary* bin, int offs, int len } if (code < 0) return code; - if (desc->inet.active == INET_ONCE) - desc->inet.active = INET_PASSIVE; + INET_CHECK_ACTIVE_TO_PASSIVE(INETP(desc)); return code; } @@ -3448,8 +3498,7 @@ packet_reply_binary_data(inet_descriptor* desc, unsigned int hsz, code = packet_binary_message(desc, bin, offs, len, extra); if (code < 0) return code; - if (desc->active == INET_ONCE) - desc->active = INET_PASSIVE; + INET_CHECK_ACTIVE_TO_PASSIVE(desc); return code; } } @@ -3494,6 +3543,7 @@ sock_init(void) /* May be called multiple times. */ #ifdef HAVE_SCTP static void inet_init_sctp(void) { INIT_ATOM(sctp); + INIT_ATOM(sctp_passive); INIT_ATOM(sctp_error); INIT_ATOM(true); INIT_ATOM(false); @@ -3503,6 +3553,7 @@ static void inet_init_sctp(void) { INIT_ATOM(binary); INIT_ATOM(active); INIT_ATOM(once); + INIT_ATOM(multi); INIT_ATOM(buffer); INIT_ATOM(linger); INIT_ATOM(recbuf); @@ -3628,12 +3679,15 @@ static int inet_init() INIT_ATOM(tcp); INIT_ATOM(udp); INIT_ATOM(error); + INIT_ATOM(einval); INIT_ATOM(inet_async); INIT_ATOM(inet_reply); INIT_ATOM(timeout); INIT_ATOM(closed); + INIT_ATOM(tcp_passive); INIT_ATOM(tcp_closed); INIT_ATOM(tcp_error); + INIT_ATOM(udp_passive); INIT_ATOM(udp_error); INIT_ATOM(empty_out_q); INIT_ATOM(ssl_tls); @@ -5508,8 +5562,25 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) case INET_LOPT_ACTIVE: DEBUGF(("inet_set_opts(%ld): s=%d, ACTIVE=%d\r\n", - (long)desc->port, desc->s,ival)); + (long)desc->port, desc->s, ival)); desc->active = ival; + if (desc->active == INET_MULTI) { + long ac = desc->active_count; + Sint16 nval = get_int16(ptr); + ptr += 2; + len -= 2; + ac += nval; + if (ac > INT16_MAX || ac < INT16_MIN) + return -1; + desc->active_count += nval; + if (desc->active_count < 0) + desc->active_count = 0; + if (desc->active_count == 0) { + desc->active = INET_PASSIVE; + packet_passive_message(desc); + } + } else + desc->active_count = 0; if ((desc->stype == SOCK_STREAM) && (desc->active != INET_PASSIVE) && (desc->state == INET_STATE_CLOSED)) { tcp_closed_message((tcp_descriptor *) desc); @@ -5820,7 +5891,8 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) /* XXX fprintf(stderr,"desc->htype == %d, old_htype == %d, desc->active == %d, old_active == %d\r\n",(int)desc->htype, (int) old_htype, (int) desc->active, (int) old_active );*/ - return 1+(desc->htype == old_htype && desc->active == INET_ONCE); + return 1+(desc->htype == old_htype && + (desc->active == INET_ONCE || desc->active == INET_MULTI)); } return 0; } @@ -5953,6 +6025,21 @@ static int sctp_set_opts(inet_descriptor* desc, char* ptr, int len) case INET_LOPT_ACTIVE: desc->active = get_int32(curr); curr += 4; + if (desc->active == INET_MULTI) { + long ac = desc->active_count; + Sint16 nval = get_int16(curr); curr += 2; + ac += nval; + if (ac > INT16_MAX || ac < INT16_MIN) + return -1; + desc->active_count += nval; + if (desc->active_count < 0) + desc->active_count = 0; + if (desc->active_count == 0) { + desc->active = INET_PASSIVE; + packet_passive_message(desc); + } + } else + desc->active_count = 0; res = 0; continue; @@ -6475,6 +6562,11 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, case INET_LOPT_ACTIVE: *ptr++ = opt; put_int32(desc->active, ptr); + if (desc->active == INET_MULTI) { + PLACE_FOR(2,ptr); + put_int16(desc->active_count, ptr); + ptr += 2; + } continue; case INET_LOPT_PACKET: *ptr++ = opt; @@ -6847,7 +6939,10 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc, } case INET_LOPT_ACTIVE: { - PLACE_FOR(spec, i, 2*LOAD_ATOM_CNT + LOAD_TUPLE_CNT); + if (desc->active == INET_MULTI) + PLACE_FOR(spec, i, LOAD_ATOM_CNT + LOAD_INT_CNT + LOAD_TUPLE_CNT); + else + PLACE_FOR(spec, i, 2*LOAD_ATOM_CNT + LOAD_TUPLE_CNT); i = LOAD_ATOM (spec, i, am_active); switch (desc->active) { @@ -6860,6 +6955,9 @@ static ErlDrvSSizeT sctp_fill_opts(inet_descriptor* desc, case INET_ONCE : { i = LOAD_ATOM (spec, i, am_once); break; } + case INET_MULTI : + { i = LOAD_INT(spec, i, desc->active_count); break; } + default: ASSERT (0); } i = LOAD_TUPLE (spec, i, 2); @@ -7656,6 +7754,7 @@ static ErlDrvData inet_start(ErlDrvPort port, int size, int protocol) socket */ desc->deliver = INET_DELIVER_TERM; /* standard term format */ desc->active = INET_PASSIVE; /* start passive */ + desc->active_count = 0; desc->oph = NULL; desc->opt = NULL; -- cgit v1.2.3