diff options
-rw-r--r-- | erts/doc/src/socket_usage.xml | 5 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 190 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 708 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 70344 -> 69888 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 8 |
5 files changed, 865 insertions, 46 deletions
diff --git a/erts/doc/src/socket_usage.xml b/erts/doc/src/socket_usage.xml index 401a70992f..756f0dbd44 100644 --- a/erts/doc/src/socket_usage.xml +++ b/erts/doc/src/socket_usage.xml @@ -83,10 +83,11 @@ </row> <row> <cell>rcvbuf</cell> - <cell>default | pos_integer()</cell> + <cell>default | pos_integer() | {pos_integer(), pos_ineteger()}</cell> <cell>yes</cell> <cell>yes</cell> - <cell>default only valid for set</cell> + <cell>'default' only valid for set. + The tuple form is only valid for type 'stream' and protocol 'tcp'.</cell> </row> <row> <cell>rcvctrlbuf</cell> diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 9375e9c005..fc218f5163 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -812,11 +812,20 @@ typedef struct { SocketRequestQueue acceptorsQ; /* +++ Config & Misc stuff +++ */ - size_t rBufSz; // Read buffer size (when data length = 0 is specified) - size_t rCtrlSz; // Read control buffer size - size_t wCtrlSz; // Write control buffer size - BOOLEAN_T iow; // Inform On (counter) Wrap - BOOLEAN_T dbg; + size_t rBufSz; // Read buffer size (when data length = 0) + /* rNum and rNumCnt are used (together with rBufSz) when calling the recv + * function with the Length argument set to 0 (zero). + * If rNum is 0 (zero), then rNumCnt is not used and only *one* read will + * be done. Also, when get'ing the value of the option (rcvbuf) with + * getopt, the value will be reported as an integer. If the rNum has a + * value greater then 0 (zero), then it will instead be reported as {N, BufSz}. + */ + unsigned int rNum; // recv: Number of reads using rBufSz + unsigned int rNumCnt; // recv: Current number of reads (so far) + size_t rCtrlSz; // Read control buffer size + size_t wCtrlSz; // Write control buffer size + BOOLEAN_T iow; // Inform On (counter) Wrap + BOOLEAN_T dbg; /* +++ Close stuff +++ */ ErlNifMutex* closeMtx; @@ -4959,6 +4968,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, accDescP->type = descP->type; accDescP->protocol = descP->protocol; accDescP->rBufSz = descP->rBufSz; // Inherit buffer size + accDescP->rNum = descP->rNum; // Inherit buffer uses + accDescP->rNumCnt = 0; accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size @@ -5737,8 +5748,8 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } - sockRef = argv[0]; // We need this in case we send in case we send abort - recvRef = argv[1]; + sockRef = argv[0]; // We need this in case we case we send abort + recvRef = argv[1]; if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { return enif_make_badarg(env); @@ -5791,9 +5802,9 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, int bufSz = (len ? len : descP->rBufSz); SSDBG( descP, ("SOCKET", "nrecv -> entry with" - "\r\n len: %d (%d)" + "\r\n len: %d (%d:%d)" "\r\n flags: %d" - "\r\n", len, bufSz, flags) ); + "\r\n", len, descP->rNumCnt, bufSz, flags) ); if (!descP->isReadable) return enif_make_badarg(env); @@ -5817,10 +5828,11 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, // If it fails (read = -1), we need errno... SSDBG( descP, ("SOCKET", "nrecv -> try read (%d)\r\n", buf.size) ); read = sock_recv(descP->sock, buf.data, buf.size, flags); - if (IS_SOCKET_ERROR(read)) + if (IS_SOCKET_ERROR(read)) { save_errno = sock_errno(); - else + } else { save_errno = -1; // The value does not actually matter in this case + } SSDBG( descP, ("SOCKET", "nrecv -> read: %d (%d)\r\n", read, save_errno) ); @@ -6719,7 +6731,7 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env, } descP->ctrlPid = newCtrlPid; - descP->ctrlMon = newCtrlMon; + descP->ctrlMon = newCtrlMon; SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") ); @@ -6729,22 +6741,60 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env, /* nsetopt_otp_rcvbuf - Handle the OTP (level) rcvbuf option + * The (otp) rcvbuf option is provided as: + * + * BufSz :: integer() | {N :: pos_integer(), BufSz :: pod_integer()} + * + * Where N is the max number of reads. */ static ERL_NIF_TERM nsetopt_otp_rcvbuf(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM eVal) { - size_t val; - char* xres; + const ERL_NIF_TERM* t; // The array of the elements of the tuple + int tsz; // The size of the tuple - should be 2 + unsigned int n; + size_t bufSz; + char* xres; - if ((xres = esock_decode_bufsz(env, - eVal, - SOCKET_RECV_BUFFER_SIZE_DEFAULT, &val)) != NULL) - return esock_make_error_str(env, xres); + if (IS_NUM(env, eVal)) { + + /* This will have the effect that the buffer size will be + * reported as an integer (getopt). + */ + n = 0; - descP->rBufSz = val; + if ((xres = esock_decode_bufsz(env, + eVal, + SOCKET_RECV_BUFFER_SIZE_DEFAULT, + &bufSz)) != NULL) + return esock_make_error_str(env, xres); + + } else if (IS_TUPLE(env, eVal)) { + + if (!GET_TUPLE(env, eVal, &tsz, &t)) + return enif_make_badarg(env); // We should use a "proper" error value... + + if (tsz != 2) + return enif_make_badarg(env); // We should use a "proper" error value... + if (!GET_UINT(env, t[0], &n)) + return enif_make_badarg(env); // We should use a "proper" error value... + + if ((xres = esock_decode_bufsz(env, + t[1], + SOCKET_RECV_BUFFER_SIZE_DEFAULT, + &bufSz)) != NULL) + return esock_make_error_str(env, xres); + + } else { + return enif_make_badarg(env); // We should use a "proper" error value... + } + + descP->rNum = n; + descP->rBufSz = bufSz; + return esock_atom_ok; } @@ -10097,7 +10147,13 @@ static ERL_NIF_TERM ngetopt_otp_rcvbuf(ErlNifEnv* env, SocketDescriptor* descP) { - ERL_NIF_TERM eVal = MKI(env, descP->rBufSz); + ERL_NIF_TERM eVal; + + if (descP->rNum == 0) { + eVal = MKI(env, descP->rBufSz); + } else { + eVal = MKT2(env, MKI(env, descP->rNum), MKI(env, descP->rBufSz)); + } return esock_make_ok2(env, eVal); } @@ -13681,7 +13737,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, if (toRead == 0) { - /* +++ Give us everything you have got => needs to continue +++ */ + /* +++ Give us everything you have got => * + * (maybe) needs to continue +++ */ /* How do we do this? * Either: @@ -13695,36 +13752,83 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * (continuous binary realloc "here"). * * => We choose alt 1 for now. + * + * Also, we need to check if the rNumCnt has reached its max (rNum), + * in which case we will assume the read to be done! */ cnt_inc(&descP->readByteCnt, read); - if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) - return esock_make_error_str(env, xres); + SSDBG( descP, + ("SOCKET", "recv_check_result -> shall we continue reading" + "\r\n read: %d" + "\r\n rNum: %d" + "\r\n rNumCnt: %d" + "\r\n", read, descP->rNum, descP->rNumCnt) ); - /* This transfers "ownership" of the *allocated* binary to an - * erlang term (no need for an explicit free). - */ - data = MKBIN(env, bufP); + if (descP->rNum > 0) { - SSDBG( descP, - ("SOCKET", - "recv_check_result -> [%d] " - "we are done for now - read more\r\n", toRead) ); + descP->rNumCnt++; + if (descP->rNumCnt >= descP->rNum) { - return esock_make_ok3(env, atom_false, data); + descP->rNumCnt = 0; - } else { + cnt_inc(&descP->readPkgCnt, 1); + + recv_update_current_reader(env, descP); + + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ + data = MKBIN(env, bufP); + + return esock_make_ok3(env, atom_true, data); - /* +++ We got exactly as much as we requested +++ */ + } else { - /* <KOLLA> - * WE NEED TO INFORM ANY WAITING READERS - * - * DEMONP of the current reader! - * - * </KOLLA> - */ + /* Yes, we *do* need to continue reading */ + + if ((xres = recv_init_current_reader(env, + descP, recvRef)) != NULL) { + descP->rNumCnt = 0; + return esock_make_error_str(env, xres); + } + + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ + data = MKBIN(env, bufP); + + return esock_make_ok3(env, atom_false, data); + + } + + } else { + + /* Yes, we *do* need to continue reading */ + + if ((xres = recv_init_current_reader(env, + descP, recvRef)) != NULL) { + descP->rNumCnt = 0; + return esock_make_error_str(env, xres); + } + + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ + data = MKBIN(env, bufP); + + SSDBG( descP, + ("SOCKET", + "recv_check_result -> [%d] " + "we are done for now - read more\r\n", toRead) ); + + return esock_make_ok3(env, atom_false, data); + } + + } else { + + /* +++ We got exactly as much as we requested => We are done +++ */ cnt_inc(&descP->readPkgCnt, 1); cnt_inc(&descP->readByteCnt, read); @@ -13793,6 +13897,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] eagain\r\n", toRead) ); + descP->rNumCnt = 0; if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) return esock_make_error_str(env, xres); @@ -13840,6 +13945,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] split buffer\r\n", toRead) ); + descP->rNumCnt = 0; cnt_inc(&descP->readPkgCnt, 1); cnt_inc(&descP->readByteCnt, read); @@ -15859,6 +15965,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->closeMtx = MCREATE(buf); descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT; + descP->rNum = 0; + descP->rNumCnt = 0; descP->rCtrlSz = SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT; descP->wCtrlSz = SOCKET_SEND_CTRL_BUFFER_SIZE_DEFAULT; descP->iow = FALSE; diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 01d332c34e..803f06809b 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -49,6 +49,7 @@ %% *** API Options *** api_opt_simple_otp_options/1, + api_opt_simple_otp_rcvbuf_option/1, api_opt_simple_otp_controlling_process/1, %% *** API Operation Timeout *** @@ -568,6 +569,7 @@ api_basic_cases() -> api_options_cases() -> [ api_opt_simple_otp_options, + api_opt_simple_otp_rcvbuf_option, api_opt_simple_otp_controlling_process ]. @@ -2146,6 +2148,12 @@ api_opt_simple_otp_options() -> ERROR end end}, + + %% #{desc => "enable debug", + %% cmd => fun(#{sock := Sock}) -> + %% ok = socket:setopt(Sock, otp, debug, true) + %% end}, + #{desc => "set (new) iow", cmd => fun(#{sock := Sock, iow := OldIOW} = State) -> NewIOW = not OldIOW, @@ -2174,6 +2182,9 @@ api_opt_simple_otp_options() -> case Get(Sock, rcvbuf) of {ok, RcvBuf} when is_integer(RcvBuf) -> {ok, State#{rcvbuf => RcvBuf}}; + {ok, {N, RcvBuf} = V} when is_integer(N) andalso + is_integer(RcvBuf) -> + {ok, State#{rcvbuf => V}}; {ok, InvalidRcvBuf} -> {error, {invalid, InvalidRcvBuf}}; {error, _} = ERROR -> @@ -2181,13 +2192,31 @@ api_opt_simple_otp_options() -> end end}, #{desc => "set (new) rcvbuf", - cmd => fun(#{sock := Sock, rcvbuf := OldRcvBuf} = State) -> + cmd => fun(#{sock := Sock, rcvbuf := {OldN, OldRcvBuf}} = State) -> + NewRcvBuf = {OldN+2, OldRcvBuf + 1024}, + case Set(Sock, rcvbuf, NewRcvBuf) of + ok -> + {ok, State#{rcvbuf => NewRcvBuf}}; + {error, _} = ERROR -> + ERROR + end; + (#{sock := Sock, rcvbuf := OldRcvBuf} = State) when is_integer(OldRcvBuf) -> NewRcvBuf = 2 * OldRcvBuf, case Set(Sock, rcvbuf, NewRcvBuf) of ok -> {ok, State#{rcvbuf => NewRcvBuf}}; {error, _} = ERROR -> ERROR + end; + (#{sock := Sock, rcvbuf := OldRcvBuf, + type := stream, + protocol := tcp} = State) when is_integer(OldRcvBuf) -> + NewRcvBuf = {2, OldRcvBuf}, + case Set(Sock, rcvbuf, NewRcvBuf) of + ok -> + {ok, State#{rcvbuf => NewRcvBuf}}; + {error, _} = ERROR -> + ERROR end end}, #{desc => "get (new) rcvbuf", @@ -2355,6 +2384,680 @@ api_opt_simple_otp_options() -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Perform some simple operations with the rcvbuf otp option +%% The operations we test here are only for type = stream and +%% protocol = tcp. +api_opt_simple_otp_rcvbuf_option(suite) -> + []; +api_opt_simple_otp_rcvbuf_option(doc) -> + []; +api_opt_simple_otp_rcvbuf_option(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(api_opt_simple_otp_rcvbuf_option, + fun() -> api_opt_simple_otp_rcvbuf_option() end). + +api_opt_simple_otp_rcvbuf_option() -> + Get = fun(S) -> + socket:getopt(S, otp, rcvbuf) + end, + Set = fun(S, Val) -> + socket:setopt(S, otp, rcvbuf, Val) + end, + + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, addr => LAddr}, + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, local_sa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, + local_sa := LocalSA, + lport := Port}) -> + ServerSA = LocalSA#{port => Port}, + ?SEV_ANNOUNCE_READY(Tester, init, ServerSA), + ok + end}, + + + %% *** The actual test part *** + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "attempt to accept", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept), + ok + end}, + + %% Recv with default size for (otp) rcvbuf + #{desc => "await continue (recv initial)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, recv) of + {ok, MsgSz} -> + ?SEV_IPRINT("MsgSz: ~p", [MsgSz]), + {ok, State#{msg_sz => MsgSz}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "attempt to recv", + cmd => fun(#{sock := Sock, msg_sz := MsgSz} = _State) -> + ?SEV_IPRINT("try recv ~w bytes when rcvbuf is ~s", + [MsgSz, + case Get(Sock) of + {ok, RcvBuf} -> f("~w", [RcvBuf]); + {error, _} -> "-" + end]), + case socket:recv(Sock) of + {ok, Data} when (size(Data) =:= MsgSz) -> + ok; + {ok, Data} -> + {error, {invalid_msg_sz, MsgSz, size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv initial)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv), + ok + end}, + + %% Recv with new size (1) for (otp) rcvbuf + #{desc => "await continue (recv 1)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, recv) of + {ok, NewRcvBuf} -> + ?SEV_IPRINT("set new rcvbuf: ~p", [NewRcvBuf]), + {ok, State#{rcvbuf => NewRcvBuf}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "attempt to setopt rcvbuf", + cmd => fun(#{sock := Sock, rcvbuf := NewRcvBuf} = _State) -> + case Set(Sock, NewRcvBuf) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "attempt to recv", + cmd => fun(#{sock := Sock, msg_sz := MsgSz} = _State) -> + case socket:recv(Sock) of + {ok, Data} when (size(Data) =:= MsgSz) -> + ok; + {ok, Data} -> + {error, {invalid_msg_sz, MsgSz, size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv), + ok + end}, + + %% Recv with new size (2) for (otp) rcvbuf + #{desc => "await continue (recv 2)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, recv) of + {ok, NewRcvBuf} -> + ?SEV_IPRINT("set new rcvbuf: ~p", [NewRcvBuf]), + {ok, State#{rcvbuf => NewRcvBuf}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "attempt to setopt rcvbuf", + cmd => fun(#{sock := Sock, rcvbuf := NewRcvBuf} = _State) -> + case Set(Sock, NewRcvBuf) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "attempt to recv", + cmd => fun(#{sock := Sock, msg_sz := MsgSz} = _State) -> + case socket:recv(Sock) of + {ok, Data} when (size(Data) =:= MsgSz) -> + ok; + {ok, Data} -> + {error, {invalid_msg_sz, MsgSz, size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv), + ok + end}, + + %% Recv with new size (3) for (otp) rcvbuf + #{desc => "await continue (recv 3, truncated)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, recv) of + {ok, {ExpSz, NewRcvBuf}} -> + {ok, State#{msg_sz => ExpSz, + rcvbuf => NewRcvBuf}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "attempt to setopt rcvbuf", + cmd => fun(#{sock := Sock, rcvbuf := NewRcvBuf} = _State) -> + case Set(Sock, NewRcvBuf) of + ok -> + ?SEV_IPRINT("set new rcvbuf: ~p", [NewRcvBuf]), + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "attempt to recv", + cmd => fun(#{sock := Sock, msg_sz := MsgSz} = _State) -> + ?SEV_IPRINT("try recv ~w bytes of data", [MsgSz]), + case socket:recv(Sock) of + {ok, Data} when (size(Data) =:= MsgSz) -> + ok; + {ok, Data} -> + {error, {invalid_msg_sz, MsgSz, size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv), + ok + end}, + + + %% Termination + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket(s)", + cmd => fun(#{lsock := LSock, sock := Sock} = State) -> + sock_close(Sock), + sock_close(LSock), + State1 = maps:remove(sock, State), + State2 = maps:remove(lport, State1), + State3 = maps:remove(lsock, State2), + {ok, State3} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, ServerSA} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, + server_sa => ServerSA}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, addr => LAddr}, + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect) + end}, + #{desc => "connect to server", + cmd => fun(#{sock := Sock, server_sa := SSA}) -> + socket:connect(Sock, SSA) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + + #{desc => "await continue (send initial)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, send) of + {ok, Data} -> + {ok, State#{data => Data}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "send (initial) data to server", + cmd => fun(#{sock := Sock, data := Data} = _State) -> + ?SEV_IPRINT("try send ~w bytes", [size(Data)]), + socket:send(Sock, Data) + end}, + #{desc => "announce ready (send initial)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send), + ok + end}, + + #{desc => "await continue (send 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send) + end}, + #{desc => "send (1) data to server", + cmd => fun(#{sock := Sock, data := Data}) -> + ?SEV_IPRINT("try send ~w bytes", [size(Data)]), + socket:send(Sock, Data) + end}, + #{desc => "announce ready (send 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send), + ok + end}, + + #{desc => "await continue (send 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send) + end}, + #{desc => "send (2) data to server", + cmd => fun(#{sock := Sock, data := Data}) -> + ?SEV_IPRINT("try send ~w bytes", [size(Data)]), + socket:send(Sock, Data) + end}, + #{desc => "announce ready (send 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send), + ok + end}, + + #{desc => "await continue (send 3)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send) + end}, + #{desc => "send (3) data to server", + cmd => fun(#{sock := Sock, data := Data}) -> + ?SEV_IPRINT("try send ~w bytes", [size(Data)]), + socket:send(Sock, Data) + end}, + #{desc => "announce ready (send 3)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send), + ok + end}, + + + %% Termination + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Server} = _State) -> + _MRef = erlang:monitor(process, Server), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Client} = _State) -> + _MRef = erlang:monitor(process, Client), + ok + end}, + #{desc => "order server start", + cmd => fun(#{server := Server}) -> + ?SEV_ANNOUNCE_START(Server) + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Server} = State) -> + {ok, ServerSA} = ?SEV_AWAIT_READY(Server, server, init), + {ok, State#{server_sa => ServerSA}} + end}, + #{desc => "order client start", + cmd => fun(#{client := Client, + server_sa := ServerSA}) -> + ?SEV_ANNOUNCE_START(Client, ServerSA), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, init) + end}, + + + %% The actual test (connecting) + #{desc => "order server accept (accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order client continue (connect)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, connect), + ok + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect) + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, accept) + end}, + + %% The actual test (initial part) + #{desc => "order client continue (send initial)", + cmd => fun(#{client := Client, data := Data} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send, Data), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order server continue (recv initial)", + cmd => fun(#{server := Server, data := Data} = _State) -> + ExpMsgSz = size(Data), + ?SEV_ANNOUNCE_CONTINUE(Server, recv, ExpMsgSz), + ok + end}, + #{desc => "await client ready (send initial)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, client, send, + [{server, Server}]) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "await server ready (recv initial)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Server, client, recv, + [{client, Client}]) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + + %% The actual test (part 1) + #{desc => "order client continue (send 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order server continue (recv 1)", + cmd => fun(#{server := Server, data := Data} = _State) -> + MsgSz = size(Data), + NewRcvBuf = {2 + (MsgSz div 1024), 1024}, + ?SEV_ANNOUNCE_CONTINUE(Server, recv, NewRcvBuf), + ok + end}, + #{desc => "await client ready (send 1)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, client, send, + [{server, Server}]) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "await server ready (recv 1)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Server, client, recv, + [{client, Client}]) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + + %% The actual test (part 2) + #{desc => "order client continue (send 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order server continue (recv 2)", + cmd => fun(#{server := Server, data := Data} = _State) -> + MsgSz = size(Data), + NewRcvBuf = {2 + (MsgSz div 2048), 2048}, + ?SEV_ANNOUNCE_CONTINUE(Server, recv, NewRcvBuf), + ok + end}, + #{desc => "await client ready (send 2)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, client, send, + [{server, Server}]) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "await server ready (recv 2)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Server, client, recv, + [{client, Client}]) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% The actual test (part 3) + #{desc => "order client continue (send 3)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order server continue (recv 3)", + cmd => fun(#{server := Server, data := Data} = _State) -> + MsgSz = size(Data), + BufSz = 2048, + N = MsgSz div BufSz - 1, + NewRcvBuf = {N, BufSz}, + ?SEV_ANNOUNCE_CONTINUE(Server, recv, + {N*BufSz, NewRcvBuf}) + end}, + #{desc => "await client ready (send 3)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, client, send, + [{server, Server}]) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "await server ready (recv 3)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Server, client, recv, + [{client, Client}]) of + ok -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + + + ?SEV_SLEEP(?SECS(1)), + + %% *** Terminate server *** + #{desc => "order client terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client down", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + #{desc => "order server terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server down", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + State2 = maps:remove(server_sa, State1), + {ok, State2} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + %% Create a data binary of 6*1024 bytes + Data = list_to_binary(lists:duplicate(6*4, lists:seq(0, 255))), + InitState = #{domain => inet, + data => Data}, + + i("create server evaluator"), + ServerInitState = #{domain => maps:get(domain, InitState)}, + Server = ?SEV_START("server", ServerSeq, ServerInitState), + + i("create client evaluator"), + ClientInitState = #{host => local_host(), + domain => maps:get(domain, InitState)}, + Client = ?SEV_START("client", ClientSeq, ClientInitState), + + i("create tester evaluator"), + TesterInitState = InitState#{server => Server#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator(s)"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% Perform some simple getopt and setopt with the level = otp options api_opt_simple_otp_controlling_process(suite) -> []; @@ -9507,7 +10210,7 @@ traffic_ping_pong_send_and_receive_tcp(#{msg := Msg} = InitState) -> true -> ok end, - ok = socket:setopt(Sock, otp, rcvbuf, 8*1024) + ok = socket:setopt(Sock, otp, rcvbuf, {12, 1024}) end, traffic_ping_pong_send_and_receive_tcp2(InitState#{buf_init => Fun}). @@ -16621,6 +17324,7 @@ tc_try(Case, Fun) when is_atom(Case) andalso is_function(Fun, 0) -> try begin Fun(), + ?SLEEP(?SECS(1)), tc_end("ok") end catch diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 06e7c2910d..0066ca2433 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 839087ef2a..80ccd7ea10 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -2553,9 +2553,15 @@ enc_setopt_value(otp, iow, V, _, _, _) when is_boolean(V) -> enc_setopt_value(otp, controlling_process, V, _, _, _) when is_pid(V) -> V; enc_setopt_value(otp, rcvbuf, V, _, _, _) when (V =:= default) -> - 0; + 0; % This will cause the nif-code to choose the default value enc_setopt_value(otp, rcvbuf, V, _, _, _) when is_integer(V) andalso (V > 0) -> V; +%% N: Number of reads (when specifying length = 0) +%% V: Size of the "read" buffer +enc_setopt_value(otp, rcvbuf, {N, BufSz} = V, _, stream = _T, tcp = _P) + when (is_integer(N) andalso (N > 0)) andalso + (is_integer(BufSz) andalso (BufSz > 0)) -> + V; enc_setopt_value(otp, rcvctrlbuf, V, _, _, _) when (V =:= default) -> 0; enc_setopt_value(otp, rcvctrlbuf, V, _, _, _) when is_integer(V) andalso (V > 0) -> |