diff options
author | Raimo Niskanen <[email protected]> | 2011-09-09 17:40:29 +0200 |
---|---|---|
committer | Raimo Niskanen <[email protected]> | 2011-11-17 12:11:03 +0100 |
commit | 6dd80f3e9a556dc94ae59280f532e153e463b798 (patch) | |
tree | b57e338c7eaea69e9ddc7a833930d7c482b0a35e | |
parent | 5895bb8d7bde52972f74f6b51748230982f762f7 (diff) | |
download | otp-6dd80f3e9a556dc94ae59280f532e153e463b798.tar.gz otp-6dd80f3e9a556dc94ae59280f532e153e463b798.tar.bz2 otp-6dd80f3e9a556dc94ae59280f532e153e463b798.zip |
erts,kernel: Bugfix - collect fragmented SCTP messages on recv
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 136 | ||||
-rw-r--r-- | lib/kernel/test/gen_sctp_SUITE.erl | 53 |
2 files changed, 130 insertions, 59 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 283301f166..79422dc54c 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -683,7 +683,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #endif -#define BIN_REALLOC_LIMIT(x) (((x)*3)/4) /* 75% */ +#define BIN_REALLOC_MARGIN(x) ((x)/4) /* 25% */ /* The general purpose sockaddr */ typedef union { @@ -990,6 +990,9 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event); typedef struct { inet_descriptor inet; /* common data structure (DON'T MOVE) */ int read_packets; /* Number of packets to read per invocation */ + int i_bufsz; /* current input buffer size */ + ErlDrvBinary* i_buf; /* current binary buffer */ + char* i_ptr; /* current pos in buf */ } udp_descriptor; @@ -9517,6 +9520,9 @@ static ErlDrvData packet_inet_start(ErlDrvPort port, char* args, int protocol) return ERL_DRV_ERROR_ERRNO; desc->read_packets = INET_PACKET_POLL; + desc->i_bufsz = 0; + desc->i_buf = NULL; + desc->i_ptr = NULL; return drvd; } @@ -9541,6 +9547,10 @@ static void packet_inet_stop(ErlDrvData e) */ udp_descriptor * udesc = (udp_descriptor*) e; inet_descriptor* descr = INETP(udesc); + if (udesc->i_buf != NULL) { + release_buffer(udesc->i_buf); + udesc->i_buf = NULL; + } ASSERT(NO_SUBSCRIBERS(&(descr->empty_out_q_subs))); inet_stop(descr); @@ -10075,39 +10085,50 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) { inet_descriptor* desc = INETP(udesc); int n; - unsigned int len; inet_address other; char abuf[sizeof(inet_address)]; /* buffer address; enough??? */ - int sz; - char* ptr; - ErlDrvBinary* buf; /* binary */ int packet_count = udesc->read_packets; int count = 0; /* number of packets delivered to owner */ #ifdef HAVE_SCTP struct msghdr mhdr; /* Top-level msg structure */ struct iovec iov[1]; /* Data or Notification Event */ char ancd[SCTP_ANC_BUFF_SIZE]; /* Ancillary Data */ - int short_recv = 0; #endif while(packet_count--) { - len = sizeof(other); - sz = desc->bufsz; - /* Allocate space for message and address. NB: "bufsz" is in "desc", - but the "buf" itself is allocated separately: - */ - if ((buf = alloc_buffer(sz+len)) == NULL) - return packet_error(udesc, ENOMEM); - ptr = buf->orig_bytes + len; /* pointer to message part */ + unsigned int len = sizeof(other); + + /* udesc->i_buf is only kept between SCTP fragments */ + if (udesc->i_buf == NULL) { + udesc->i_bufsz = desc->bufsz + len; + if ((udesc->i_buf = alloc_buffer(udesc->i_bufsz)) == NULL) + return packet_error(udesc, ENOMEM); + /* pointer to message start */ + udesc->i_ptr = udesc->i_buf->orig_bytes + len; + } else { + ErlDrvBinary* tmp; + int bufsz; + bufsz = desc->bufsz + (udesc->i_ptr - udesc->i_buf->orig_bytes); + if ((tmp = realloc_buffer(udesc->i_buf, bufsz)) == NULL) { + release_buffer(udesc->i_buf); + udesc->i_buf = NULL; + return packet_error(udesc, ENOMEM); + } else { + udesc->i_ptr = + tmp->orig_bytes + (udesc->i_ptr - udesc->i_buf->orig_bytes); + udesc->i_buf = tmp; + udesc->i_bufsz = bufsz; + } + } /* Note: On Windows NT, recvfrom() fails if the socket is connected. */ #ifdef HAVE_SCTP /* For SCTP we must use recvmsg() */ if (IS_SCTP(desc)) { - iov->iov_base = ptr; /* Data will come here */ - iov->iov_len = sz; /* Remaining buffer space */ + iov->iov_base = udesc->i_ptr; /* Data will come here */ + iov->iov_len = desc->bufsz; /* Remaining buffer space */ - mhdr.msg_name = &other; /* Peer addr comes into "other" */ + mhdr.msg_name = &other; /* Peer addr comes into "other" */ mhdr.msg_namelen = len; mhdr.msg_iov = iov; mhdr.msg_iovlen = 1; @@ -10117,42 +10138,31 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) /* Do the actual SCTP receive: */ n = sock_recvmsg(desc->s, &mhdr, 0); + len = mhdr.msg_namelen; goto check_result; } #endif /* Use recv() instead on connected sockets. */ if ((desc->state & INET_F_ACTIVE)) { - n = sock_recv(desc->s, ptr, sz, 0); + n = sock_recv(desc->s, udesc->i_ptr, desc->bufsz, 0); other = desc->remote; } - else - n = sock_recvfrom(desc->s, ptr, sz, 0, &other.sa, &len); + else { + n = sock_recvfrom(desc->s, udesc->i_ptr, desc->bufsz, + 0, &other.sa, &len); + } #ifdef HAVE_SCTP check_result: #endif /* Analyse the result: */ - if (IS_SOCKET_ERROR(n) -#ifdef HAVE_SCTP - || (short_recv = (IS_SCTP(desc) && !(mhdr.msg_flags & MSG_EOR))) - /* NB: here we check for EOR not being set -- this is an error as - well, we don't support partial msgs: - */ -#endif - ) { + if (IS_SOCKET_ERROR(n)) { int err = sock_errno(); - release_buffer(buf); + release_buffer(udesc->i_buf); + udesc->i_buf = NULL; if (err != ERRNO_BLOCK) { if (!desc->active) { -#ifdef HAVE_SCTP - if (short_recv) { - async_error_am(desc, am_short_recv); - } else { - async_error(desc, err); - } -#else async_error(desc, err); -#endif driver_cancel_timer(desc->port); sock_select(desc,FD_READ,0); } @@ -10165,41 +10175,55 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event) sock_select(desc,FD_READ,1); return count; /* strange, not ready */ } +#ifdef HAVE_SCTP + else if (IS_SCTP(desc) && !(mhdr.msg_flags & MSG_EOR)) { + /* short recv */ + inet_input_count(desc, n); + udesc->i_ptr += n; + sock_select(desc, FD_READ, 1); + return count; /* more fragments will come later */ + } +#endif else { - int offs; - int nsz; int code; - unsigned int alen = len; void * extra = NULL; + char * ptr; inet_input_count(desc, n); - inet_get_address(desc->sfamily, abuf, &other, &alen); - /* Copy formatted address to the buffer allocated; "alen" is the - actual length which must be <= than the original reserved "len". + udesc->i_ptr += n; + inet_get_address(desc->sfamily, abuf, &other, &len); + /* Copy formatted address to the buffer allocated; "len" is the + actual length which must be <= than the original reserved. This means that the addr + data in the buffer are contiguous, - but they may start not at the "orig_bytes", but with some "offs" - from them: + but they may start not at the "orig_bytes", instead at "ptr": */ - ASSERT (alen <= len); - sys_memcpy(ptr - alen, abuf, alen); - ptr -= alen; - nsz = n + alen; /* nsz = data + address */ - offs = ptr - buf->orig_bytes; /* initial pointer offset */ + ASSERT (len <= sizeof(other)); + ptr = udesc->i_buf->orig_bytes + sizeof(other) - len; + sys_memcpy(ptr, abuf, len); /* Check if we need to reallocate binary */ if ((desc->mode == INET_MODE_BINARY) && - (desc->hsz < n) && (nsz < BIN_REALLOC_LIMIT(sz))) { + (desc->hsz < (udesc->i_ptr - ptr)) && + ((udesc->i_ptr - ptr) + BIN_REALLOC_MARGIN(desc->bufsz) >= + udesc->i_bufsz)) { ErlDrvBinary* tmp; - if ((tmp = realloc_buffer(buf,nsz+offs)) != NULL) - buf = tmp; + int bufsz; + bufsz = udesc->i_ptr - udesc->i_buf->orig_bytes; + if ((tmp = realloc_buffer(udesc->i_buf, bufsz)) != NULL) { + udesc->i_buf = tmp; + udesc->i_bufsz = bufsz; + } } #ifdef HAVE_SCTP if (IS_SCTP(desc)) extra = &mhdr; #endif /* Actual parsing and return of the data received, occur here: */ - code = packet_reply_binary_data(desc, (unsigned int)alen, - buf, offs, nsz, extra); - free_buffer(buf); + code = packet_reply_binary_data(desc, len, udesc->i_buf, + ptr - udesc->i_buf->orig_bytes, + udesc->i_ptr - ptr, + extra); + free_buffer(udesc->i_buf); + udesc->i_buf = NULL; if (code < 0) return count; count++; diff --git a/lib/kernel/test/gen_sctp_SUITE.erl b/lib/kernel/test/gen_sctp_SUITE.erl index f422ffe442..9d12c629de 100644 --- a/lib/kernel/test/gen_sctp_SUITE.erl +++ b/lib/kernel/test/gen_sctp_SUITE.erl @@ -31,14 +31,14 @@ [basic/1, api_open_close/1,api_listen/1,api_connect_init/1,api_opts/1, xfer_min/1,xfer_active/1,def_sndrcvinfo/1,implicit_inet6/1, - basic_stream/1, xfer_stream_min/1, peeloff/1]). + basic_stream/1, xfer_stream_min/1, peeloff/1, buffers/1]). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> [basic, api_open_close, api_listen, api_connect_init, api_opts, xfer_min, xfer_active, def_sndrcvinfo, implicit_inet6, - basic_stream, xfer_stream_min, peeloff]. + basic_stream, xfer_stream_min, peeloff, buffers]. groups() -> []. @@ -826,6 +826,49 @@ peeloff(Config) when is_list(Config) -> ok(socket_stop(S3)), ok. + + +buffers(doc) -> + ["Check sndbuf and recbuf behaviour"]; +buffers(suite) -> + []; +buffers(Config) when is_list(Config) -> + ?line Limit = 8192, + ?line Data = mk_data(Limit), + ?line Addr = {127,0,0,1}, + ?line Stream = 1, + ?line Timeout = 333, + ?line S1 = socket_start(Addr, Timeout), + ?line P1 = socket_call(S1, port), + ?line ok = socket_call(S1, {listen,true}), + ?line S2 = socket_start(Addr, Timeout), + ?line P2 = socket_call(S2, port), + %% + ?line H_a = socket_req(S1, recv_assoc), + ?line {S2Ai,Sa,Sb} = socket_call(S2, {connect,Addr,P1,[]}), + ?line {S1Ai,Sb,Sa,Addr,P2} = socket_resp(H_a), + %% + ?line ok = socket_call(S1, {setopts,[{recbuf,Limit}]}), + ?line case socket_call(S1, {getopts,[recbuf]}) of + {ok,[{recbuf,RB1}]} when RB1 >= Limit -> ok + end, + ?line H_b = socket_req(S1, recv), + ?line ok = socket_call(S2, {send,S2Ai,Stream,Data}), + ?line {Addr,P2,S1Ai,Stream,Data} = socket_resp(H_b), + %% + ?line ok = socket_stop(S1), + ?line {Addr,P1,[],#sctp_shutdown_event{assoc_id=S2Ai}} = + ok(socket_stop(S2)), + ok. + +mk_data(Words) -> + mk_data(0, Words, <<>>). +%% +mk_data(Words, Words, Bin) -> + Bin; +mk_data(N, Words, Bin) -> + mk_data(N+1, Words, <<Bin/binary,N:32>>). + %%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% socket gen_server ultra light @@ -937,7 +980,11 @@ socket_handler(Socket, Timeout) -> {Addr,Port, [#sctp_sndrcvinfo{stream=Stream,assoc_id=AssocId}],Data} = ok(gen_sctp:recv(S, infinity)), - {Addr,Port,AssocId,Stream,Data} + {Addr,Port,AssocId,Stream,Data}; + ({setopts,Opts}) -> + inet:setopts(Socket, Opts); + ({getopts,Optnames}) -> + inet:getopts(Socket, Optnames) end. socket_stop(Handler) -> |