diff options
author | Micael Karlberg <[email protected]> | 2018-10-30 18:26:10 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-10-30 18:26:10 +0100 |
commit | 3f46e8a184a503ead01674ee180e7222b3928712 (patch) | |
tree | 4a4f245517d476d3f7e82949b9988750c9664bc7 | |
parent | 06f79ae03385ad668b143f4f1f7085131c2973ed (diff) | |
download | otp-3f46e8a184a503ead01674ee180e7222b3928712.tar.gz otp-3f46e8a184a503ead01674ee180e7222b3928712.tar.bz2 otp-3f46e8a184a503ead01674ee180e7222b3928712.zip |
[socket-nif] Add a send and receive chunks test case
The send and recv test case triggered a two bugs. One was
that there was no re-selecting when only a portion of the data
was received (which meant that we stopped reading).
Also, the wrong 'current' (writer) was reset when demonitor
current reader after a successful read (which meant that
future readers would never have been monitored).
OTP-14831
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 20 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 1568 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 68544 -> 68564 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 11 |
4 files changed, 1305 insertions, 294 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index f9eb041ad1..27395b5cf6 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -13479,7 +13479,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, descP->currentReader.ref); } else { - descP->currentWriterP = NULL; + descP->currentReaderP = NULL; } } @@ -13744,14 +13744,23 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, } else { /* +++ We got only a part of what was expected +++ - * +++ => receive more later. +++ */ + * +++ => select for more more later and +++ + * +++ deliver what we got. +++ */ SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] " "only part of message - expect more\r\n", toRead) ); + /* SELECT for more data */ + + SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), + descP, NULL, recvRef); + cnt_inc(&descP->readByteCnt, read); - return esock_make_ok3(env, atom_false, MKBIN(env, bufP)); + data = MKBIN(env, bufP); + data = MKSBIN(env, data, 0, read); + + return esock_make_ok3(env, atom_false, data); } } } @@ -16709,7 +16718,7 @@ int esock_monitor(const char* slogan, int res; SSDBG( descP, ("SOCKET", "[%d] %s: try monitor", descP->sock, slogan) ); - // esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan); + /* esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan); */ res = enif_monitor_process(env, descP, pid, &monP->mon); if (res != 0) { @@ -16722,7 +16731,7 @@ int esock_monitor(const char* slogan, descP->sock, monP->raw[0], monP->raw[1], monP->raw[2], monP->raw[3]); - } */ + } */ return res; } @@ -16737,6 +16746,7 @@ int esock_demonitor(const char* slogan, int res; SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) ); + /* esock_dbg_printf("DEMONP", "[%d] %s: %u,%u,%u,%u\r\n", descP->sock, slogan, diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 3308b9364e..7a1a362181 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -84,10 +84,11 @@ sc_rc_recv_response_tcp4/1, sc_rc_recv_response_tcp6/1, sc_rc_recvmsg_response_tcp4/1, - sc_rc_recvmsg_response_tcp6/1 + sc_rc_recvmsg_response_tcp6/1, %% Traffic - + traffic_send_and_recv_chunks_tcp4/1, + traffic_send_and_recv_chunks_tcp6/1 %% Tickets @@ -99,21 +100,6 @@ %% Internal exports %% -export([]). -%% -record(ev, {name :: string(), -%% pid :: pid(), -%% mref :: reference()}). -%% -type ev() :: #ev{}. --define(MKEV(N,P,R), #ev{name = N, pid = P, mref = R}). -%% -type initial_evaluator_state() :: map(). -%% -type evaluator_state() :: map(). -%% -type command_fun() :: -%% fun((State :: evaluator_state()) -> ok) | -%% fun((State :: evaluator_state()) -> {ok, evaluator_state()}) | -%% fun((State :: evaluator_state()) -> {error, term()}). - -%% -type command() :: #{desc := string(), -%% cmd := command_fun()}. - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -139,7 +125,8 @@ suite() -> all() -> [ {group, api}, - {group, socket_closure} + {group, socket_closure}, + {group, traffic} %% {group, tickets} ]. @@ -151,7 +138,8 @@ groups() -> {socket_closure, [], socket_closure_cases()}, {sc_ctrl_proc_exit, [], sc_cp_exit_cases()}, {sc_local_close, [], sc_lc_cases()}, - {sc_remote_close, [], sc_rc_cases()} + {sc_remote_close, [], sc_rc_cases()}, + {traffic, [], traffic_cases()} %% {tickets, [], ticket_cases()} ]. @@ -252,8 +240,11 @@ sc_rc_cases() -> ]. -%% traffic_cases() -> -%% [snr_send_and_recv_chunks]. +traffic_cases() -> + [ + traffic_send_and_recv_chunks_tcp4, + traffic_send_and_recv_chunks_tcp6 + ]. %% ticket_cases() -> @@ -881,8 +872,7 @@ api_b_send_and_recv_tcp(InitState) -> end}, #{desc => "await client ready (init)", cmd => fun(#{client := Pid} = _State) -> - ?SEV_AWAIT_READY(Pid, client, init), - ok + ok = ?SEV_AWAIT_READY(Pid, client, init) end}, %% *** The actual test *** @@ -4315,6 +4305,439 @@ sc_lc_recvmsg_response_udp6(_Config) when is_list(_Config) -> end). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test what happens when a socket is +%% locally closed while the process is calling the accept function. +%% We test what happens with a non-controlling_process also, since we +%% git the setup anyway. +%% Socket is IPv4. + +sc_lc_acceptor_response_tcp4(suite) -> + []; +sc_lc_acceptor_response_tcp4(doc) -> + []; +sc_lc_acceptor_response_tcp4(_Config) when is_list(_Config) -> + tc_try(sc_lc_acceptor_response_tcp4, + fun() -> + ?TT(?SECS(10)), + InitState = #{domain => inet, + type => stream, + protocol => tcp}, + ok = sc_lc_acceptor_response_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test what happens when a socket is +%% locally closed while the process is calling the accept function. +%% We test what happens with a non-controlling_process also, since we +%% git the setup anyway. +%% Socket is IPv6. + +sc_lc_acceptor_response_tcp6(suite) -> + []; +sc_lc_acceptor_response_tcp6(doc) -> + []; +sc_lc_acceptor_response_tcp6(_Config) when is_list(_Config) -> + tc_try(sc_lc_acceptor_response_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(10)), + InitState = #{domain => inet, + type => stream, + protocol => tcp}, + ok = sc_lc_acceptor_response_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +sc_lc_acceptor_response_tcp(InitState) -> + PrimAcceptorSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, addr => LAddr}, + {ok, State#{lsa => LSA}} + end}, + #{desc => "create (listen) socket", + cmd => fun(#{domain := Domain, + type := Type, + protocol := Proto} = State) -> + case socket:open(Domain, Type, Proto) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, lsa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{sock := Sock}) -> + socket:listen(Sock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_READY(Tester, init, Sock), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, accept) of + {ok, Timeout} -> + {ok, State#{timeout => Timeout}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "await connection", + cmd => fun(#{sock := Sock, timeout := Timeout} = _State) -> + case socket:accept(Sock, Timeout) of + {error, timeout} -> + ok; + {ok, Sock} -> + ?SEV_EPRINT("unexpected success"), + (catch socket:close(Sock)), + {error, unexpected_success}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept timeout)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept_timeout), + ok + end}, + #{desc => "await continue (close)", + cmd => fun(#{tester := Tester} = _State) -> + ok = ?SEV_AWAIT_CONTINUE(Tester, tester, close) + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock} = State) -> + case socket:close(Sock) of + ok -> + {ok, maps:remove(sock, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (close)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, close), + ok + end}, + + % Termination + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + SecAcceptorSeq = + [ + %% *** Init part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, Sock} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, sock => Sock}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init) + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester} = _State) -> + ok = ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "accept", + cmd => fun(#{sock := Sock} = State) -> + case socket:accept(Sock) of + {error, closed} -> + {ok, maps:remove(sock, State)}; + {ok, _} -> + {error, unexpected_success}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept closed)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept_closed) + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor 'primary acceptor'", + cmd => fun(#{prim_acc := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor 'secondary acceptor 1'", + cmd => fun(#{sec_acc1 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor secondary acceptor 2", + cmd => fun(#{sec_acc2 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor secondary acceptor 3", + cmd => fun(#{sec_acc3 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the primary server + #{desc => "order 'primary acceptor' start", + cmd => fun(#{prim_acc := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await 'primary acceptor' ready (init)", + cmd => fun(#{prim_acc := Pid} = State) -> + {ok, Sock} = ?SEV_AWAIT_READY(Pid, prim_acc, init), + {ok, State#{sock => Sock}} + end}, + + %% Start the secondary acceptor 1 + #{desc => "order 'secondary acceptor 1' start", + cmd => fun(#{sec_acc1 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await 'secondary acceptor 1' ready (init)", + cmd => fun(#{sec_acc1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, sec_acc1, init) + end}, + + %% Start the secondary acceptor 2 + #{desc => "order 'secondary acceptor 2' start", + cmd => fun(#{sec_acc2 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await 'secondary acceptor 2' ready (init)", + cmd => fun(#{sec_acc2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, sec_acc2, init) + end}, + + %% Start the secondary acceptor 3 + #{desc => "order 'secondary acceptor 3' start", + cmd => fun(#{sec_acc3 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await 'secondary acceptor 3' ready (init)", + cmd => fun(#{sec_acc3 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, sec_acc3, init) + end}, + + + %% The actual test + %% Make all the seondary servers continue, with an infinit recvfrom + %% and then the prim-server with a timed recvfrom. + %% After the prim server notifies us (about the timeout) we order it + %% to close the socket, which should cause the all the secondary + %% server to return with error-closed. + + #{desc => "order 'secondary acceptor 1' to continue (accept)", + cmd => fun(#{sec_acc1 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order 'secondary acceptor 2' to continue (accept)", + cmd => fun(#{sec_acc2 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order 'secondary acceptor 3' to continue (accept)", + cmd => fun(#{sec_acc3 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order 'primary acceptor' to continue", + cmd => fun(#{prim_acc := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept, ?SECS(5)), + ok + end}, + #{desc => "await 'primary acceptor' ready (accept timeout)", + cmd => fun(#{prim_acc := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, prim_acc, accept_timeout) + end}, + #{desc => "order 'primary acceptor' to continue (close)", + cmd => fun(#{prim_acc := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, close), + ok + end}, + #{desc => "await 'primary acceptor' ready (close)", + cmd => fun(#{prim_acc := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, prim_acc, close) + end}, + #{desc => "await 'secondary acceptor 1' ready (accept closed)", + cmd => fun(#{sec_acc1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, sec_acc1, accept_closed) + end}, + #{desc => "await 'secondary acceptor 2' ready (accept closed)", + cmd => fun(#{sec_acc2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, sec_acc2, accept_closed) + end}, + #{desc => "await 'secondary acceptor 3' ready (accept closed)", + cmd => fun(#{sec_acc3 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, sec_acc3, accept_closed) + end}, + + + %% Terminations + #{desc => "order 'secondary acceptor 3' to terminate", + cmd => fun(#{sec_acc3 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await 'secondary acceptor 3' termination", + cmd => fun(#{sec_acc3 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + {ok, maps:remove(sec_acc3, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order 'secondary acceptor 2' to terminate", + cmd => fun(#{sec_acc2 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await 'secondary acceptor 2' termination", + cmd => fun(#{sec_acc2 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + {ok, maps:remove(sec_acc2, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order 'secondary acceptor 1' to terminate", + cmd => fun(#{sec_acc1 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await 'secondary acceptor 1' termination", + cmd => fun(#{sec_acc1 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + {ok, maps:remove(sec_acc1, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order 'primary acceptor' to terminate", + cmd => fun(#{prim_acc := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await 'primary acceptor' termination", + cmd => fun(#{prim_acc := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + {ok, maps:remove(prim_acc, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + i("start 'primary acceptor' evaluator"), + PrimAccInitState = InitState, + PrimAcc = ?SEV_START("prim-acceptor", PrimAcceptorSeq, PrimAccInitState), + + i("start 'secondary acceptor 1' evaluator"), + SecAccInitState = #{}, + SecAcc1 = ?SEV_START("sec-acceptor-1", SecAcceptorSeq, SecAccInitState), + + i("start 'secondary acceptor 2' evaluator"), + SecAcc2 = ?SEV_START("sec-acceptor-2", SecAcceptorSeq, SecAccInitState), + + i("start 'secondary acceptor 3' evaluator"), + SecAcc3 = ?SEV_START("sec-acceptor-3", SecAcceptorSeq, SecAccInitState), + + i("start 'tester' evaluator"), + TesterInitState = #{prim_acc => PrimAcc#ev.pid, + sec_acc1 => SecAcc1#ev.pid, + sec_acc2 => SecAcc2#ev.pid, + sec_acc3 => SecAcc3#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator"), + ok = ?SEV_AWAIT_FINISH([PrimAcc, SecAcc1, SecAcc2, SecAcc3, Tester]). + + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This test case is intended to test what happens when a socket is %% remotely closed while the process is calling the recv function. @@ -4421,7 +4844,7 @@ sc_rc_receive_response_tcp(InitState) -> ok end}, - %% The actual test (we expect three connections) + %% The actual test #{desc => "await continue (accept all three connections)", cmd => fun(#{tester := Tester} = _State) -> ?SEV_ANNOUNCE_CONTINUE(Tester, tester, accept) @@ -5048,32 +5471,6 @@ sc_rc_receive_response_tcp(InitState) -> Tester]). -start_node(Host, NodeName) -> - UniqueNodeName = f("~w_~w", [NodeName, erlang:unique_integer([positive])]), - case do_start_node(Host, UniqueNodeName) of - {ok, _} = OK -> - OK; - {error, Reason, _} -> - {error, Reason} - end. - -do_start_node(Host, NodeName) when is_list(NodeName) -> - do_start_node(Host, list_to_atom(NodeName)); -do_start_node(Host, NodeName) when is_atom(NodeName) -> - Dir = filename:dirname(code:which(?MODULE)), - Flags = "-pa " ++ Dir, - Opts = [{monitor_master, true}, {erl_flags, Flags}], - ct_slave:start(Host, NodeName, Opts). - - -stop_node(Node) -> - case ct_slave:stop(Node) of - {ok, _} -> - ok; - {error, _} = ERROR -> - ERROR - end. - local_host() -> try net_adm:localhost() of Host when is_list(Host) -> @@ -5275,57 +5672,52 @@ sc_rc_recvmsg_response_tcp6(_Config) when is_list(_Config) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% This test case is intended to test what happens when a socket is -%% locally closed while the process is calling the accept function. -%% We test what happens with a non-controlling_process also, since we -%% git the setup anyway. +%% This test case is intended to test that the send and recv functions +%% behave as expected when sending and/or reading chunks. +%% First send data in one "big" chunk, and read it in "small" chunks. +%% Second, send in a bunch of "small" chunks, and read in one "big" chunk. %% Socket is IPv4. -sc_lc_acceptor_response_tcp4(suite) -> +traffic_send_and_recv_chunks_tcp4(suite) -> []; -sc_lc_acceptor_response_tcp4(doc) -> +traffic_send_and_recv_chunks_tcp4(doc) -> []; -sc_lc_acceptor_response_tcp4(_Config) when is_list(_Config) -> - tc_try(sc_lc_acceptor_response_tcp4, +traffic_send_and_recv_chunks_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_send_and_recv_chunks_tcp4, fun() -> - ?TT(?SECS(10)), - InitState = #{domain => inet, - type => stream, - protocol => tcp}, - ok = sc_lc_acceptor_response_tcp(InitState) + ?TT(?SECS(30)), + InitState = #{domain => inet}, + ok = traffic_send_and_recv_chunks_tcp(InitState) end). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% This test case is intended to test what happens when a socket is -%% locally closed while the process is calling the accept function. -%% We test what happens with a non-controlling_process also, since we -%% git the setup anyway. +%% This test case is intended to test that the send and recv functions +%% behave as expected when sending and/or reading chunks. +%% First send data in one "big" chunk, and read it in "small" chunks. +%% Second, send in a bunch of "small" chunks, and read in one "big" chunk. %% Socket is IPv6. -sc_lc_acceptor_response_tcp6(suite) -> +traffic_send_and_recv_chunks_tcp6(suite) -> []; -sc_lc_acceptor_response_tcp6(doc) -> +traffic_send_and_recv_chunks_tcp6(doc) -> []; -sc_lc_acceptor_response_tcp6(_Config) when is_list(_Config) -> - tc_try(sc_lc_acceptor_response_tcp6, +traffic_send_and_recv_chunks_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_send_and_recv_chunks_tcp6, fun() -> not_yet_implemented(), - ?TT(?SECS(10)), - InitState = #{domain => inet, - type => stream, - protocol => tcp}, - ok = sc_lc_acceptor_response_tcp(InitState) + ?TT(?SECS(30)), + InitState = #{domain => inet6}, + ok = traffic_send_and_recv_chunks_tcp(InitState) end). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -sc_lc_acceptor_response_tcp(InitState) -> - PrimAcceptorSeq = +traffic_send_and_recv_chunks_tcp(InitState) -> + ServerSeq = [ %% *** Wait for start order part *** - #{desc => "await start (from tester)", + #{desc => "await start", cmd => fun(State) -> Tester = ?SEV_AWAIT_START(), {ok, State#{tester => Tester}} @@ -5341,87 +5733,213 @@ sc_lc_acceptor_response_tcp(InitState) -> cmd => fun(#{domain := Domain} = State) -> LAddr = which_local_addr(Domain), LSA = #{family => Domain, addr => LAddr}, - {ok, State#{lsa => LSA}} + {ok, State#{local_sa => LSA}} end}, - #{desc => "create (listen) socket", - cmd => fun(#{domain := Domain, - type := Type, - protocol := Proto} = State) -> - case socket:open(Domain, Type, Proto) of + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of {ok, Sock} -> - {ok, State#{sock => Sock}}; + {ok, State#{lsock => Sock}}; {error, _} = ERROR -> ERROR end end}, #{desc => "bind to local address", - cmd => fun(#{sock := Sock, lsa := LSA} = _State) -> - case socket:bind(Sock, LSA) of - {ok, _Port} -> - ok; + cmd => fun(#{lsock := LSock, local_sa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; {error, _} = ERROR -> ERROR end end}, #{desc => "make listen socket", - cmd => fun(#{sock := Sock}) -> - socket:listen(Sock) + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) end}, #{desc => "announce ready (init)", - cmd => fun(#{tester := Tester, sock := Sock} = _State) -> - ?SEV_ANNOUNCE_READY(Tester, init, Sock), + cmd => fun(#{tester := Tester, local_sa := LSA, lport := Port}) -> + ServerSA = LSA#{port => Port}, + ?SEV_ANNOUNCE_READY(Tester, init, ServerSA), ok end}, - + %% The actual test #{desc => "await continue (accept)", - cmd => fun(#{tester := Tester} = State) -> - case ?SEV_AWAIT_CONTINUE(Tester, tester, accept) of - {ok, Timeout} -> - {ok, State#{timeout => Timeout}}; - {error, _} = ERROR -> - ERROR - end + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) end}, - #{desc => "await connection", - cmd => fun(#{sock := Sock, timeout := Timeout} = _State) -> - case socket:accept(Sock, Timeout) of - {error, timeout} -> - ok; + #{desc => "accept", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of {ok, Sock} -> - ?SEV_EPRINT("unexpected success"), - (catch socket:close(Sock)), - {error, unexpected_success}; + {ok, State#{csock => Sock}}; {error, _} = ERROR -> ERROR end end}, - #{desc => "announce ready (accept timeout)", + #{desc => "announce ready (accept)", cmd => fun(#{tester := Tester}) -> - ?SEV_ANNOUNCE_READY(Tester, accept_timeout), + ?SEV_ANNOUNCE_READY(Tester, accept), ok end}, - #{desc => "await continue (close)", + + #{desc => "await continue (recv-many-small)", cmd => fun(#{tester := Tester} = _State) -> - ok = ?SEV_AWAIT_CONTINUE(Tester, tester, close) + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_many_small) end}, - #{desc => "close socket", - cmd => fun(#{sock := Sock} = State) -> - case socket:close(Sock) of - ok -> - {ok, maps:remove(sock, State)}; + #{desc => "recv chunk 1", + cmd => fun(#{csock := Sock} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 1 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)]}}; {error, _} = ERROR -> ERROR end end}, - #{desc => "announce ready (close)", - cmd => fun(#{tester := Tester}) -> - ?SEV_ANNOUNCE_READY(Tester, close), - ok + #{desc => "recv chunk 2", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 2 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv chunk 3", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 3 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv chunk 4", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 4 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv chunk 5", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 5 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv chunk 6", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 6 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv chunk 7", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 7 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv chunk 8", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 8 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv chunk 9", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 9 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv chunk 10", + cmd => fun(#{csock := Sock, + chunks := Chunks} = State) -> + case socket:recv(Sock, 100) of + {ok, Chunk} -> + ?SEV_IPRINT("recv of chunk 10 of ~p bytes", + [size(Chunk)]), + {ok, State#{chunks => [b2l(Chunk)|Chunks]}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv-many-small)", + cmd => fun(#{tester := Tester, + chunks := Chunks} = State) -> + Data = lists:flatten(lists:reverse(Chunks)), + ?SEV_ANNOUNCE_READY(Tester, recv_many_small, Data), + {ok, maps:remove(chunks, State)} end}, - % Termination - #{desc => "await terminate", + #{desc => "await continue (recv-one-big)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, recv_one_big) of + {ok, Size} -> + {ok, State#{size => Size}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "recv (one big)", + cmd => fun(#{tester := Tester, csock := Sock, size := Size} = _State) -> + %% ok = socket:setopt(Sock, otp, debug, true), + case socket:recv(Sock, Size) of + {ok, Data} -> + %% ok = socket:setopt(Sock, otp, debug, false), + ?SEV_ANNOUNCE_READY(Tester, + recv_one_big, + b2l(Data)), + ok; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "await terminate (from tester)", cmd => fun(#{tester := Tester} = State) -> case ?SEV_AWAIT_TERMINATE(Tester, tester) of ok -> @@ -5429,61 +5947,419 @@ sc_lc_acceptor_response_tcp(InitState) -> {error, _} = ERROR -> ERROR end - end}, + end}, + #{desc => "close connection socket (just in case)", + cmd => fun(#{csock := Sock} = State) -> + (catch socket:close(Sock)), + {ok, maps:remove(csock, State)} + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock} = State) -> + (catch socket:close(Sock)), + {ok, maps:remove(lsock, State)} + end}, %% *** We are done *** ?SEV_FINISH_NORMAL ], - SecAcceptorSeq = + ClientSeq = [ - %% *** Init part *** + %% *** Wait for start order part *** #{desc => "await start", cmd => fun(State) -> - {Tester, Sock} = ?SEV_AWAIT_START(), - {ok, State#{tester => Tester, sock => Sock}} + {Tester, ServerSA} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, + server_sa => ServerSA}} end}, #{desc => "monitor tester", cmd => fun(#{tester := Tester} = _State) -> _MRef = erlang:monitor(process, Tester), ok end}, + + %% *** Init part *** + #{desc => "create node", + cmd => fun(#{host := Host} = State) -> + case start_node(Host, client) of + {ok, Node} -> + ?SEV_IPRINT("(remote) client node ~p started", + [Node]), + {ok, State#{node => Node}}; + {error, Reason, _} -> + {error, Reason} + end + end}, + #{desc => "monitor client node", + cmd => fun(#{node := Node} = _State) -> + true = erlang:monitor_node(Node, true), + ok + end}, + #{desc => "start remote client", + cmd => fun(#{node := Node} = State) -> + Pid = traffic_snr_tcp_client_start(Node), + ?SEV_IPRINT("client ~p started", [Pid]), + {ok, State#{rclient => Pid}} + end}, + #{desc => "monitor remote client", + cmd => fun(#{rclient := Pid}) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "order remote client to start", + cmd => fun(#{rclient := Client, server_sa := ServerSA}) -> + ?SEV_ANNOUNCE_START(Client, ServerSA), + ok + end}, + #{desc => "await remote client ready", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + ?SEV_AWAIT_READY(Client, rclient, init, + [{tester, Tester}]) + end}, #{desc => "announce ready (init)", cmd => fun(#{tester := Tester}) -> - ?SEV_ANNOUNCE_READY(Tester, init) + ?SEV_ANNOUNCE_READY(Tester, init), + ok end}, %% The actual test - #{desc => "await continue (accept)", - cmd => fun(#{tester := Tester} = _State) -> - ok = ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect, + [{rclient, Client}]), + ok end}, - #{desc => "accept", - cmd => fun(#{sock := Sock} = State) -> - case socket:accept(Sock) of - {error, closed} -> - {ok, maps:remove(sock, State)}; - {ok, _} -> - {error, unexpected_success}; + #{desc => "order remote client to continue (connect)", + cmd => fun(#{rclient := Client}) -> + ?SEV_ANNOUNCE_CONTINUE(Client, connect), + ok + end}, + #{desc => "await client process ready (connect)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + ?SEV_AWAIT_READY(Client, rclient, connect, + [{tester, Tester}]) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + + #{desc => "await continue (send-one-big)", + cmd => fun(#{tester := Tester, + rclient := Client} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, + send_one_big, + [{rclient, Client}]) of + {ok, Data} -> + {ok, State#{data => Data}}; {error, _} = ERROR -> ERROR end end}, - #{desc => "announce ready (accept closed)", + #{desc => "order remote client to continue (send)", + cmd => fun(#{rclient := Client, data := Data}) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send, Data), + ok + end}, + #{desc => "await client process ready (send)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (send-one-big)", cmd => fun(#{tester := Tester}) -> - ?SEV_ANNOUNCE_READY(Tester, accept_closed) + ?SEV_ANNOUNCE_READY(Tester, send_one_big), + ok + end}, + + #{desc => "await continue (send-many-small)", + cmd => fun(#{tester := Tester, + rclient := Client} = State) -> + case ?SEV_AWAIT_CONTINUE(Tester, tester, + send_many_small, + [{rclient, Client}]) of + {ok, Data} -> + {ok, State#{data => Data}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 1)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 1: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 1)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 2)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 2: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 2)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 3)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 3: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 3)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 4)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 4: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 4)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 5)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 5: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 5)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 6)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 6: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 6)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 7)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 7: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 7)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 8)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 8: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 8)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 9)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, RestData} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 9: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, State#{data => RestData}} + end}, + #{desc => "await client process ready (send chunk 9)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send chunk 10)", + cmd => fun(#{rclient := Client, + data := Data} = State) -> + {Chunk, []} = lists:split(100, Data), + %% ?SEV_IPRINT("order send of chunk 10: " + %% "~n Size: ~p" + %% "~n ~p", [length(Chunk), Chunk]), + ?SEV_ANNOUNCE_CONTINUE(Client, send, Chunk), + {ok, maps:remove(data, State)} + end}, + #{desc => "await client process ready (send chunk 10)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order remote client to continue (send stop)", + cmd => fun(#{rclient := Client} = State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send, stop), + {ok, maps:remove(data, State)} + end}, + #{desc => "await client process ready (send stop)", + cmd => fun(#{tester := Tester, + rclient := Client} = _State) -> + case ?SEV_AWAIT_READY(Client, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + Result; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (send-many-small)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_many_small), + ok end}, %% Termination #{desc => "await terminate (from tester)", - cmd => fun(#{tester := Tester} = State) -> - case ?SEV_AWAIT_TERMINATE(Tester, tester) of + cmd => fun(#{tester := Tester, + rclient := Client} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester, + [{rclient, Client}]) of ok -> {ok, maps:remove(tester, State)}; {error, _} = ERROR -> ERROR end end}, + #{desc => "kill remote client", + cmd => fun(#{rclient := Client}) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await remote client termination", + cmd => fun(#{rclient := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(rclient, State), + {ok, State1} + end}, + #{desc => "stop client node", + cmd => fun(#{node := Node} = _State) -> + stop_node(Node) + end}, + #{desc => "await client node termination", + cmd => fun(#{node := Node} = State) -> + receive + {nodedown, Node} -> + {ok, maps:remove(node, State)} + end + end}, %% *** We are done *** ?SEV_FINISH_NORMAL @@ -5492,183 +6368,167 @@ sc_lc_acceptor_response_tcp(InitState) -> TesterSeq = [ %% *** Init part *** - #{desc => "monitor 'primary acceptor'", - cmd => fun(#{prim_acc := Pid} = _State) -> - _MRef = erlang:monitor(process, Pid), - ok - end}, - #{desc => "monitor 'secondary acceptor 1'", - cmd => fun(#{sec_acc1 := Pid} = _State) -> - _MRef = erlang:monitor(process, Pid), - ok - end}, - #{desc => "monitor secondary acceptor 2", - cmd => fun(#{sec_acc2 := Pid} = _State) -> + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> _MRef = erlang:monitor(process, Pid), ok end}, - #{desc => "monitor secondary acceptor 3", - cmd => fun(#{sec_acc3 := Pid} = _State) -> + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> _MRef = erlang:monitor(process, Pid), ok end}, - %% Start the primary server - #{desc => "order 'primary acceptor' start", - cmd => fun(#{prim_acc := Pid} = _State) -> + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> ?SEV_ANNOUNCE_START(Pid), ok end}, - #{desc => "await 'primary acceptor' ready (init)", - cmd => fun(#{prim_acc := Pid} = State) -> - {ok, Sock} = ?SEV_AWAIT_READY(Pid, prim_acc, init), - {ok, State#{sock => Sock}} - end}, - - %% Start the secondary acceptor 1 - #{desc => "order 'secondary acceptor 1' start", - cmd => fun(#{sec_acc1 := Pid, sock := Sock} = _State) -> - ?SEV_ANNOUNCE_START(Pid, Sock), - ok - end}, - #{desc => "await 'secondary acceptor 1' ready (init)", - cmd => fun(#{sec_acc1 := Pid} = _State) -> - ok = ?SEV_AWAIT_READY(Pid, sec_acc1, init) - end}, - - %% Start the secondary acceptor 2 - #{desc => "order 'secondary acceptor 2' start", - cmd => fun(#{sec_acc2 := Pid, sock := Sock} = _State) -> - ?SEV_ANNOUNCE_START(Pid, Sock), - ok - end}, - #{desc => "await 'secondary acceptor 2' ready (init)", - cmd => fun(#{sec_acc2 := Pid} = _State) -> - ok = ?SEV_AWAIT_READY(Pid, sec_acc2, init) + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, ServerSA} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_sa => ServerSA}} end}, - %% Start the secondary acceptor 3 - #{desc => "order 'secondary acceptor 3' start", - cmd => fun(#{sec_acc3 := Pid, sock := Sock} = _State) -> - ?SEV_ANNOUNCE_START(Pid, Sock), + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, + server_sa := ServerSA} = _State) -> + ?SEV_ANNOUNCE_START(Pid, ServerSA), ok end}, - #{desc => "await 'secondary acceptor 3' ready (init)", - cmd => fun(#{sec_acc3 := Pid} = _State) -> - ok = ?SEV_AWAIT_READY(Pid, sec_acc3, init) + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) end}, - - + %% The actual test - %% Make all the seondary servers continue, with an infinit recvfrom - %% and then the prim-server with a timed recvfrom. - %% After the prim server notifies us (about the timeout) we order it - %% to close the socket, which should cause the all the secondary - %% server to return with error-closed. - - #{desc => "order 'secondary acceptor 1' to continue (accept)", - cmd => fun(#{sec_acc1 := Pid} = _State) -> - ?SEV_ANNOUNCE_CONTINUE(Pid, accept), - ok - end}, - ?SEV_SLEEP(?SECS(1)), - #{desc => "order 'secondary acceptor 2' to continue (accept)", - cmd => fun(#{sec_acc2 := Pid} = _State) -> - ?SEV_ANNOUNCE_CONTINUE(Pid, accept), - ok - end}, - ?SEV_SLEEP(?SECS(1)), - #{desc => "order 'secondary acceptor 3' to continue (accept)", - cmd => fun(#{sec_acc3 := Pid} = _State) -> + #{desc => "order server continue (accept)", + cmd => fun(#{server := Pid} = _State) -> ?SEV_ANNOUNCE_CONTINUE(Pid, accept), ok end}, ?SEV_SLEEP(?SECS(1)), - #{desc => "order 'primary acceptor' to continue", - cmd => fun(#{prim_acc := Pid} = _State) -> - ?SEV_ANNOUNCE_CONTINUE(Pid, accept, ?SECS(5)), + #{desc => "order client continue (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, connect), ok end}, - #{desc => "await 'primary acceptor' ready (accept timeout)", - cmd => fun(#{prim_acc := Pid} = _State) -> - ok = ?SEV_AWAIT_READY(Pid, prim_acc, accept_timeout) - end}, - #{desc => "order 'primary acceptor' to continue (close)", - cmd => fun(#{prim_acc := Pid} = _State) -> - ?SEV_ANNOUNCE_CONTINUE(Pid, close), + #{desc => "await server ready (accept)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + ?SEV_AWAIT_READY(Server, server, accept, + [{client, Client}]), ok end}, - #{desc => "await 'primary acceptor' ready (close)", - cmd => fun(#{prim_acc := Pid} = _State) -> - ok = ?SEV_AWAIT_READY(Pid, prim_acc, close) - end}, - #{desc => "await 'secondary acceptor 1' ready (accept closed)", - cmd => fun(#{sec_acc1 := Pid} = _State) -> - ok = ?SEV_AWAIT_READY(Pid, sec_acc1, accept_closed) - end}, - #{desc => "await 'secondary acceptor 2' ready (accept closed)", - cmd => fun(#{sec_acc2 := Pid} = _State) -> - ok = ?SEV_AWAIT_READY(Pid, sec_acc2, accept_closed) + #{desc => "await client ready (connect)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect, + [{server, Server}]) end}, - #{desc => "await 'secondary acceptor 3' ready (accept closed)", - cmd => fun(#{sec_acc3 := Pid} = _State) -> - ok = ?SEV_AWAIT_READY(Pid, sec_acc3, accept_closed) + + #{desc => "generate data", + cmd => fun(State) -> + D1 = lists:seq(1,250), + D2 = lists:duplicate(4, D1), + D3 = lists:flatten(D2), + {ok, State#{data => D3}} end}, - - %% Terminations - #{desc => "order 'secondary acceptor 3' to terminate", - cmd => fun(#{sec_acc3 := Pid} = _State) -> - ?SEV_ANNOUNCE_TERMINATE(Pid), + %% (client) Send one big and (server) recv may small + #{desc => "order server continue (recv-many-small)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv_many_small), ok end}, - #{desc => "await 'secondary acceptor 3' termination", - cmd => fun(#{sec_acc3 := Pid} = State) -> - case ?SEV_AWAIT_TERMINATION(Pid) of - ok -> - {ok, maps:remove(sec_acc3, State)}; + ?SEV_SLEEP(?SECS(1)), + #{desc => "order client continue (send-one-big)", + cmd => fun(#{client := Pid, data := Data} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send_one_big, Data), + ok + end}, + #{desc => "await client ready (send-one-big)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + ok = ?SEV_AWAIT_READY(Client, client, send_one_big, + [{server, Server}]) + end}, + #{desc => "await server ready (recv-many-small)", + cmd => fun(#{server := Server, + client := Client, + data := Data} = _State) -> + case ?SEV_AWAIT_READY(Server, server, recv_many_small, + [{client, Client}]) of + {ok, Data} -> + ok; + {ok, OtherData} -> + {error, {mismatched_data, Data, OtherData}}; {error, _} = ERROR -> ERROR end end}, - #{desc => "order 'secondary acceptor 2' to terminate", - cmd => fun(#{sec_acc2 := Pid} = _State) -> - ?SEV_ANNOUNCE_TERMINATE(Pid), + + #{desc => "order server continue (recv-one-big)", + cmd => fun(#{server := Pid, data := Data} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv_one_big, length(Data)), ok end}, - #{desc => "await 'secondary acceptor 2' termination", - cmd => fun(#{sec_acc2 := Pid} = State) -> - case ?SEV_AWAIT_TERMINATION(Pid) of - ok -> - {ok, maps:remove(sec_acc2, State)}; + ?SEV_SLEEP(?SECS(1)), + #{desc => "order client continue (send-many-small)", + cmd => fun(#{client := Pid, data := Data} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send_many_small, Data), + ok + end}, + #{desc => "await client ready (send-many-small)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + ok = ?SEV_AWAIT_READY(Client, client, send_many_small, + [{server, Server}]) + end}, + #{desc => "await server ready (recv-one-big)", + cmd => fun(#{server := Server, + client := Client, + data := Data} = State) -> + case ?SEV_AWAIT_READY(Server, server, recv_one_big, + [{client, Client}]) of + {ok, Data} -> + {ok, maps:remove(data, State)}; + {ok, OtherData} -> + {error, {mismatched_data, Data, OtherData}}; {error, _} = ERROR -> ERROR end end}, - #{desc => "order 'secondary acceptor 1' to terminate", - cmd => fun(#{sec_acc1 := Pid} = _State) -> + + %% Terminations + #{desc => "order client to terminate", + cmd => fun(#{client := Pid} = _State) -> ?SEV_ANNOUNCE_TERMINATE(Pid), ok end}, - #{desc => "await 'secondary acceptor 1' termination", - cmd => fun(#{sec_acc1 := Pid} = State) -> + #{desc => "await client termination", + cmd => fun(#{client := Pid} = State) -> case ?SEV_AWAIT_TERMINATION(Pid) of ok -> - {ok, maps:remove(sec_acc1, State)}; + State1 = maps:remove(client, State), + {ok, State1}; {error, _} = ERROR -> ERROR end end}, - #{desc => "order 'primary acceptor' to terminate", - cmd => fun(#{prim_acc := Pid} = _State) -> + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> ?SEV_ANNOUNCE_TERMINATE(Pid), ok end}, - #{desc => "await 'primary acceptor' termination", - cmd => fun(#{prim_acc := Pid} = State) -> + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> case ?SEV_AWAIT_TERMINATION(Pid) of ok -> - {ok, maps:remove(prim_acc, State)}; + State1 = maps:remove(server, State), + {ok, State1}; {error, _} = ERROR -> ERROR end @@ -5679,31 +6539,128 @@ sc_lc_acceptor_response_tcp(InitState) -> ?SEV_FINISH_NORMAL ], + i("start server evaluator"), + ServerInitState = InitState, + Server = ?SEV_START("server", ServerSeq, ServerInitState), - i("start 'primary acceptor' evaluator"), - PrimAccInitState = InitState, - PrimAcc = ?SEV_START("prim-acceptor", PrimAcceptorSeq, PrimAccInitState), - - i("start 'secondary acceptor 1' evaluator"), - SecAccInitState = #{}, - SecAcc1 = ?SEV_START("sec-acceptor-1", SecAcceptorSeq, SecAccInitState), - - i("start 'secondary acceptor 2' evaluator"), - SecAcc2 = ?SEV_START("sec-acceptor-2", SecAcceptorSeq, SecAccInitState), - - i("start 'secondary acceptor 3' evaluator"), - SecAcc3 = ?SEV_START("sec-acceptor-3", SecAcceptorSeq, SecAccInitState), + i("start client evaluator(s)"), + ClientInitState = InitState#{host => local_host()}, + Client = ?SEV_START("client", ClientSeq, ClientInitState), i("start 'tester' evaluator"), - TesterInitState = #{prim_acc => PrimAcc#ev.pid, - sec_acc1 => SecAcc1#ev.pid, - sec_acc2 => SecAcc2#ev.pid, - sec_acc3 => SecAcc3#ev.pid}, + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid}, Tester = ?SEV_START("tester", TesterSeq, TesterInitState), i("await evaluator"), - ok = ?SEV_AWAIT_FINISH([PrimAcc, SecAcc1, SecAcc2, SecAcc3, Tester]). + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + +traffic_snr_tcp_client_start(Node) -> + Self = self(), + GL = group_leader(), + Fun = fun() -> traffic_snr_tcp_client(Self, GL) end, + erlang:spawn(Node, Fun). + +traffic_snr_tcp_client(Parent, GL) -> + {Sock, ServerSA} = traffic_snr_tcp_client_init(Parent, GL), + traffic_snr_tcp_client_announce_ready(Parent, init), + traffic_snr_tcp_client_await_continue(Parent, connect), + traffic_snr_tcp_client_connect(Sock, ServerSA), + traffic_snr_tcp_client_announce_ready(Parent, connect), + traffic_snr_tcp_client_send_loop(Parent, Sock), + Reason = traffic_snr_tcp_client_await_terminate(Parent), + traffic_snr_tcp_client_close(Sock), + exit(Reason). + + +traffic_snr_tcp_client_send_loop(Parent, Sock) -> + case ?SEV_AWAIT_CONTINUE(Parent, parent, send) of + {ok, stop} -> % Breakes the loop + ?SEV_ANNOUNCE_READY(Parent, send, ok), + ok; + {ok, Data} -> + case socket:send(Sock, Data) of + ok -> + ?SEV_ANNOUNCE_READY(Parent, send, ok), + traffic_snr_tcp_client_send_loop(Parent, Sock); + {error, Reason} = ERROR -> + ?SEV_ANNOUNCE_READY(Parent, send, ERROR), + exit({send, Reason}) + end; + {error, Reason} -> + exit({await_continue, Reason}) + end. + +traffic_snr_tcp_client_init(Parent, GL) -> + i("traffic_snr_tcp_client_init -> entry"), + _MRef = erlang:monitor(process, Parent), + group_leader(self(), GL), + ServerSA = traffic_snr_tcp_client_await_start(Parent), + Domain = maps:get(family, ServerSA), + Sock = traffic_snr_tcp_client_create(Domain), + traffic_snr_tcp_client_bind(Sock, Domain), + {Sock, ServerSA}. + +traffic_snr_tcp_client_await_start(Parent) -> + i("traffic_snr_tcp_client_await_start -> entry"), + ?SEV_AWAIT_START(Parent). +traffic_snr_tcp_client_create(Domain) -> + i("traffic_snr_tcp_client_create -> entry"), + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + Sock; + {error, Reason} -> + exit({open_failed, Reason}) + end. + +traffic_snr_tcp_client_bind(Sock, Domain) -> + i("traffic_snr_tcp_client_bind -> entry"), + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, + addr => LAddr}, + case socket:bind(Sock, LSA) of + {ok, _} -> + ok; + {error, Reason} -> + exit({bind, Reason}) + end. + +traffic_snr_tcp_client_announce_ready(Parent, Slogan) -> + ?SEV_ANNOUNCE_READY(Parent, Slogan). + +traffic_snr_tcp_client_await_continue(Parent, Slogan) -> + i("traffic_snr_tcp_client_await_continue -> entry"), + ?SEV_AWAIT_CONTINUE(Parent, parent, Slogan). + +traffic_snr_tcp_client_connect(Sock, ServerSA) -> + i("traffic_snr_tcp_client_connect -> entry"), + case socket:connect(Sock, ServerSA) of + ok -> + ok; + {error, Reason} -> + exit({connect, Reason}) + end. + +traffic_snr_tcp_client_close(Sock) -> + i("traffic_snr_tcp_client_close -> entry"), + case socket:close(Sock) of + ok -> + ok; + {error, Reason} -> + exit({close, Reason}) + end. + +traffic_snr_tcp_client_await_terminate(Parent) -> + i("traffic_snr_tcp_client_await_terminate -> entry"), + case ?SEV_AWAIT_TERMINATE(Parent, parent) of + ok -> + ok; + {error, Reason} -> + Reason + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -5740,6 +6697,36 @@ which_addr2(Domain, [_|IFO]) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +start_node(Host, NodeName) -> + UniqueNodeName = f("~w_~w", [NodeName, erlang:unique_integer([positive])]), + case do_start_node(Host, UniqueNodeName) of + {ok, _} = OK -> + OK; + {error, Reason, _} -> + {error, Reason} + end. + +do_start_node(Host, NodeName) when is_list(NodeName) -> + do_start_node(Host, list_to_atom(NodeName)); +do_start_node(Host, NodeName) when is_atom(NodeName) -> + Dir = filename:dirname(code:which(?MODULE)), + Flags = "-pa " ++ Dir, + Opts = [{monitor_master, true}, {erl_flags, Flags}], + ct_slave:start(Host, NodeName, Opts). + + +stop_node(Node) -> + case ct_slave:stop(Node) of + {ok, _} -> + ok; + {error, _} = ERROR -> + ERROR + end. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + sock_open(Domain, Type, Proto) -> try socket:open(Domain, Type, Proto) of {ok, Socket} -> @@ -5936,6 +6923,9 @@ tc_which_name() -> l2a(S) when is_list(S) -> list_to_atom(S). +b2l(B) when is_binary(B) -> + binary_to_list(B). + f(F, A) -> lists:flatten(io_lib:format(F, A)). diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex a0bf156263..480f86334c 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index acf5e18cec..5ebc2074e0 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1716,10 +1716,14 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), RecvRef = make_ref(), + %% p("do_recv -> try read with" + %% "~n Length: ~p", [Length]), case nif_recv(SockRef, RecvRef, Length, EFlags) of {ok, true = _Complete, Bin} when (size(Acc) =:= 0) -> + %% p("do_recv -> complete success: ~w", [size(Bin)]), {ok, Bin}; {ok, true = _Complete, Bin} -> + %% p("do_recv -> completed success: ~w (~w)", [size(Bin), size(Acc)]), {ok, <<Acc/binary, Bin/binary>>}; %% It depends on the amount of bytes we tried to read: @@ -1728,6 +1732,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) {ok, false = _Complete, Bin} when (Length =:= 0) -> + %% p("do_recv -> partial success: ~w", [size(Bin)]), do_recv(SockRef, RecvRef, Length, EFlags, <<Acc/binary, Bin/binary>>, @@ -1737,6 +1742,8 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% We got the first chunk of it. %% We will be notified (select message) when there %% is more to read. + %% p("do_recv -> partial success(~w): ~w" + %% "~n ~p", [Length, size(Bin), Bin]), NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> @@ -1756,6 +1763,8 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {ok, false = _Completed, Bin} -> %% We got a chunk of it! + %% p("do_recv -> partial success(~w): ~w (~w)", + %% [Length, size(Bin), size(Acc)]), NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> @@ -1780,6 +1789,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). + %% p("do_recv -> eagain(~w): ~w", [Length, size(Acc)]), NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> @@ -3473,6 +3483,7 @@ tdiff(T1, T2) -> %% p(undefined, F, A) -> %% p("***", F, A); %% p(SName, F, A) -> +%% io:format(user,"[~s,~p] " ++ F ++ "~n", [SName, self()|A]), %% io:format("[~s,~p] " ++ F ++ "~n", [SName, self()|A]). |