aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaimo Niskanen <[email protected]>2011-09-09 17:40:29 +0200
committerRaimo Niskanen <[email protected]>2011-11-17 12:11:03 +0100
commit6dd80f3e9a556dc94ae59280f532e153e463b798 (patch)
treeb57e338c7eaea69e9ddc7a833930d7c482b0a35e
parent5895bb8d7bde52972f74f6b51748230982f762f7 (diff)
downloadotp-6dd80f3e9a556dc94ae59280f532e153e463b798.tar.gz
otp-6dd80f3e9a556dc94ae59280f532e153e463b798.tar.bz2
otp-6dd80f3e9a556dc94ae59280f532e153e463b798.zip
erts,kernel: Bugfix - collect fragmented SCTP messages on recv
-rw-r--r--erts/emulator/drivers/common/inet_drv.c136
-rw-r--r--lib/kernel/test/gen_sctp_SUITE.erl53
2 files changed, 130 insertions, 59 deletions
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 283301f166..79422dc54c 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -683,7 +683,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#endif
-#define BIN_REALLOC_LIMIT(x) (((x)*3)/4) /* 75% */
+#define BIN_REALLOC_MARGIN(x) ((x)/4) /* 25% */
/* The general purpose sockaddr */
typedef union {
@@ -990,6 +990,9 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event);
typedef struct {
inet_descriptor inet; /* common data structure (DON'T MOVE) */
int read_packets; /* Number of packets to read per invocation */
+ int i_bufsz; /* current input buffer size */
+ ErlDrvBinary* i_buf; /* current binary buffer */
+ char* i_ptr; /* current pos in buf */
} udp_descriptor;
@@ -9517,6 +9520,9 @@ static ErlDrvData packet_inet_start(ErlDrvPort port, char* args, int protocol)
return ERL_DRV_ERROR_ERRNO;
desc->read_packets = INET_PACKET_POLL;
+ desc->i_bufsz = 0;
+ desc->i_buf = NULL;
+ desc->i_ptr = NULL;
return drvd;
}
@@ -9541,6 +9547,10 @@ static void packet_inet_stop(ErlDrvData e)
*/
udp_descriptor * udesc = (udp_descriptor*) e;
inet_descriptor* descr = INETP(udesc);
+ if (udesc->i_buf != NULL) {
+ release_buffer(udesc->i_buf);
+ udesc->i_buf = NULL;
+ }
ASSERT(NO_SUBSCRIBERS(&(descr->empty_out_q_subs)));
inet_stop(descr);
@@ -10075,39 +10085,50 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event)
{
inet_descriptor* desc = INETP(udesc);
int n;
- unsigned int len;
inet_address other;
char abuf[sizeof(inet_address)]; /* buffer address; enough??? */
- int sz;
- char* ptr;
- ErlDrvBinary* buf; /* binary */
int packet_count = udesc->read_packets;
int count = 0; /* number of packets delivered to owner */
#ifdef HAVE_SCTP
struct msghdr mhdr; /* Top-level msg structure */
struct iovec iov[1]; /* Data or Notification Event */
char ancd[SCTP_ANC_BUFF_SIZE]; /* Ancillary Data */
- int short_recv = 0;
#endif
while(packet_count--) {
- len = sizeof(other);
- sz = desc->bufsz;
- /* Allocate space for message and address. NB: "bufsz" is in "desc",
- but the "buf" itself is allocated separately:
- */
- if ((buf = alloc_buffer(sz+len)) == NULL)
- return packet_error(udesc, ENOMEM);
- ptr = buf->orig_bytes + len; /* pointer to message part */
+ unsigned int len = sizeof(other);
+
+ /* udesc->i_buf is only kept between SCTP fragments */
+ if (udesc->i_buf == NULL) {
+ udesc->i_bufsz = desc->bufsz + len;
+ if ((udesc->i_buf = alloc_buffer(udesc->i_bufsz)) == NULL)
+ return packet_error(udesc, ENOMEM);
+ /* pointer to message start */
+ udesc->i_ptr = udesc->i_buf->orig_bytes + len;
+ } else {
+ ErlDrvBinary* tmp;
+ int bufsz;
+ bufsz = desc->bufsz + (udesc->i_ptr - udesc->i_buf->orig_bytes);
+ if ((tmp = realloc_buffer(udesc->i_buf, bufsz)) == NULL) {
+ release_buffer(udesc->i_buf);
+ udesc->i_buf = NULL;
+ return packet_error(udesc, ENOMEM);
+ } else {
+ udesc->i_ptr =
+ tmp->orig_bytes + (udesc->i_ptr - udesc->i_buf->orig_bytes);
+ udesc->i_buf = tmp;
+ udesc->i_bufsz = bufsz;
+ }
+ }
/* Note: On Windows NT, recvfrom() fails if the socket is connected. */
#ifdef HAVE_SCTP
/* For SCTP we must use recvmsg() */
if (IS_SCTP(desc)) {
- iov->iov_base = ptr; /* Data will come here */
- iov->iov_len = sz; /* Remaining buffer space */
+ iov->iov_base = udesc->i_ptr; /* Data will come here */
+ iov->iov_len = desc->bufsz; /* Remaining buffer space */
- mhdr.msg_name = &other; /* Peer addr comes into "other" */
+ mhdr.msg_name = &other; /* Peer addr comes into "other" */
mhdr.msg_namelen = len;
mhdr.msg_iov = iov;
mhdr.msg_iovlen = 1;
@@ -10117,42 +10138,31 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event)
/* Do the actual SCTP receive: */
n = sock_recvmsg(desc->s, &mhdr, 0);
+ len = mhdr.msg_namelen;
goto check_result;
}
#endif
/* Use recv() instead on connected sockets. */
if ((desc->state & INET_F_ACTIVE)) {
- n = sock_recv(desc->s, ptr, sz, 0);
+ n = sock_recv(desc->s, udesc->i_ptr, desc->bufsz, 0);
other = desc->remote;
}
- else
- n = sock_recvfrom(desc->s, ptr, sz, 0, &other.sa, &len);
+ else {
+ n = sock_recvfrom(desc->s, udesc->i_ptr, desc->bufsz,
+ 0, &other.sa, &len);
+ }
#ifdef HAVE_SCTP
check_result:
#endif
/* Analyse the result: */
- if (IS_SOCKET_ERROR(n)
-#ifdef HAVE_SCTP
- || (short_recv = (IS_SCTP(desc) && !(mhdr.msg_flags & MSG_EOR)))
- /* NB: here we check for EOR not being set -- this is an error as
- well, we don't support partial msgs:
- */
-#endif
- ) {
+ if (IS_SOCKET_ERROR(n)) {
int err = sock_errno();
- release_buffer(buf);
+ release_buffer(udesc->i_buf);
+ udesc->i_buf = NULL;
if (err != ERRNO_BLOCK) {
if (!desc->active) {
-#ifdef HAVE_SCTP
- if (short_recv) {
- async_error_am(desc, am_short_recv);
- } else {
- async_error(desc, err);
- }
-#else
async_error(desc, err);
-#endif
driver_cancel_timer(desc->port);
sock_select(desc,FD_READ,0);
}
@@ -10165,41 +10175,55 @@ static int packet_inet_input(udp_descriptor* udesc, HANDLE event)
sock_select(desc,FD_READ,1);
return count; /* strange, not ready */
}
+#ifdef HAVE_SCTP
+ else if (IS_SCTP(desc) && !(mhdr.msg_flags & MSG_EOR)) {
+ /* short recv */
+ inet_input_count(desc, n);
+ udesc->i_ptr += n;
+ sock_select(desc, FD_READ, 1);
+ return count; /* more fragments will come later */
+ }
+#endif
else {
- int offs;
- int nsz;
int code;
- unsigned int alen = len;
void * extra = NULL;
+ char * ptr;
inet_input_count(desc, n);
- inet_get_address(desc->sfamily, abuf, &other, &alen);
- /* Copy formatted address to the buffer allocated; "alen" is the
- actual length which must be <= than the original reserved "len".
+ udesc->i_ptr += n;
+ inet_get_address(desc->sfamily, abuf, &other, &len);
+ /* Copy formatted address to the buffer allocated; "len" is the
+ actual length which must be <= than the original reserved.
This means that the addr + data in the buffer are contiguous,
- but they may start not at the "orig_bytes", but with some "offs"
- from them:
+ but they may start not at the "orig_bytes", instead at "ptr":
*/
- ASSERT (alen <= len);
- sys_memcpy(ptr - alen, abuf, alen);
- ptr -= alen;
- nsz = n + alen; /* nsz = data + address */
- offs = ptr - buf->orig_bytes; /* initial pointer offset */
+ ASSERT (len <= sizeof(other));
+ ptr = udesc->i_buf->orig_bytes + sizeof(other) - len;
+ sys_memcpy(ptr, abuf, len);
/* Check if we need to reallocate binary */
if ((desc->mode == INET_MODE_BINARY) &&
- (desc->hsz < n) && (nsz < BIN_REALLOC_LIMIT(sz))) {
+ (desc->hsz < (udesc->i_ptr - ptr)) &&
+ ((udesc->i_ptr - ptr) + BIN_REALLOC_MARGIN(desc->bufsz) >=
+ udesc->i_bufsz)) {
ErlDrvBinary* tmp;
- if ((tmp = realloc_buffer(buf,nsz+offs)) != NULL)
- buf = tmp;
+ int bufsz;
+ bufsz = udesc->i_ptr - udesc->i_buf->orig_bytes;
+ if ((tmp = realloc_buffer(udesc->i_buf, bufsz)) != NULL) {
+ udesc->i_buf = tmp;
+ udesc->i_bufsz = bufsz;
+ }
}
#ifdef HAVE_SCTP
if (IS_SCTP(desc)) extra = &mhdr;
#endif
/* Actual parsing and return of the data received, occur here: */
- code = packet_reply_binary_data(desc, (unsigned int)alen,
- buf, offs, nsz, extra);
- free_buffer(buf);
+ code = packet_reply_binary_data(desc, len, udesc->i_buf,
+ ptr - udesc->i_buf->orig_bytes,
+ udesc->i_ptr - ptr,
+ extra);
+ free_buffer(udesc->i_buf);
+ udesc->i_buf = NULL;
if (code < 0)
return count;
count++;
diff --git a/lib/kernel/test/gen_sctp_SUITE.erl b/lib/kernel/test/gen_sctp_SUITE.erl
index f422ffe442..9d12c629de 100644
--- a/lib/kernel/test/gen_sctp_SUITE.erl
+++ b/lib/kernel/test/gen_sctp_SUITE.erl
@@ -31,14 +31,14 @@
[basic/1,
api_open_close/1,api_listen/1,api_connect_init/1,api_opts/1,
xfer_min/1,xfer_active/1,def_sndrcvinfo/1,implicit_inet6/1,
- basic_stream/1, xfer_stream_min/1, peeloff/1]).
+ basic_stream/1, xfer_stream_min/1, peeloff/1, buffers/1]).
suite() -> [{ct_hooks,[ts_install_cth]}].
all() ->
[basic, api_open_close, api_listen, api_connect_init,
api_opts, xfer_min, xfer_active, def_sndrcvinfo, implicit_inet6,
- basic_stream, xfer_stream_min, peeloff].
+ basic_stream, xfer_stream_min, peeloff, buffers].
groups() ->
[].
@@ -826,6 +826,49 @@ peeloff(Config) when is_list(Config) ->
ok(socket_stop(S3)),
ok.
+
+
+buffers(doc) ->
+ ["Check sndbuf and recbuf behaviour"];
+buffers(suite) ->
+ [];
+buffers(Config) when is_list(Config) ->
+ ?line Limit = 8192,
+ ?line Data = mk_data(Limit),
+ ?line Addr = {127,0,0,1},
+ ?line Stream = 1,
+ ?line Timeout = 333,
+ ?line S1 = socket_start(Addr, Timeout),
+ ?line P1 = socket_call(S1, port),
+ ?line ok = socket_call(S1, {listen,true}),
+ ?line S2 = socket_start(Addr, Timeout),
+ ?line P2 = socket_call(S2, port),
+ %%
+ ?line H_a = socket_req(S1, recv_assoc),
+ ?line {S2Ai,Sa,Sb} = socket_call(S2, {connect,Addr,P1,[]}),
+ ?line {S1Ai,Sb,Sa,Addr,P2} = socket_resp(H_a),
+ %%
+ ?line ok = socket_call(S1, {setopts,[{recbuf,Limit}]}),
+ ?line case socket_call(S1, {getopts,[recbuf]}) of
+ {ok,[{recbuf,RB1}]} when RB1 >= Limit -> ok
+ end,
+ ?line H_b = socket_req(S1, recv),
+ ?line ok = socket_call(S2, {send,S2Ai,Stream,Data}),
+ ?line {Addr,P2,S1Ai,Stream,Data} = socket_resp(H_b),
+ %%
+ ?line ok = socket_stop(S1),
+ ?line {Addr,P1,[],#sctp_shutdown_event{assoc_id=S2Ai}} =
+ ok(socket_stop(S2)),
+ ok.
+
+mk_data(Words) ->
+ mk_data(0, Words, <<>>).
+%%
+mk_data(Words, Words, Bin) ->
+ Bin;
+mk_data(N, Words, Bin) ->
+ mk_data(N+1, Words, <<Bin/binary,N:32>>).
+
%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% socket gen_server ultra light
@@ -937,7 +980,11 @@ socket_handler(Socket, Timeout) ->
{Addr,Port,
[#sctp_sndrcvinfo{stream=Stream,assoc_id=AssocId}],Data} =
ok(gen_sctp:recv(S, infinity)),
- {Addr,Port,AssocId,Stream,Data}
+ {Addr,Port,AssocId,Stream,Data};
+ ({setopts,Opts}) ->
+ inet:setopts(Socket, Opts);
+ ({getopts,Optnames}) ->
+ inet:getopts(Socket, Optnames)
end.
socket_stop(Handler) ->