From 0c6a8375990e491405e2282e5e038d384727f1e2 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 1 Nov 2018 18:30:09 +0100 Subject: [sock-nif|test] Add a ping-pong test case We got some kind of send hang... --- erts/emulator/nifs/common/socket_nif.c | 20 +- erts/emulator/test/socket_SUITE.erl | 1408 +++++++++++++++++++++++++++----- 2 files changed, 1224 insertions(+), 204 deletions(-) (limited to 'erts') diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 27395b5cf6..389d43ee42 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -13314,12 +13314,6 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") ); - /* - * SHOULD RESULT IN {error, eagain}!!!! - * - */ - written = 0; - } } @@ -13349,7 +13343,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, descP, NULL, sendRef); SSDBG( descP, - ("SOCKET", "send_check_result -> not entire package written\r\n") ); + ("SOCKET", "send_check_result -> " + "not entire package written (%d of %d)\r\n", written, dataSize) ); return esock_make_ok2(env, MKI(env, written)); @@ -13687,15 +13682,26 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { + int sres; + SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] eagain\r\n", toRead) ); if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) return esock_make_error_str(env, xres); + SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT for more\r\n") ); + + /* SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); + */ + sres = enif_select(env, descP->sock, (ERL_NIF_SELECT_READ), + descP, NULL, recvRef); + + SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) ); + return esock_make_error(env, esock_atom_eagain); } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 7a1a362181..90e9407f6a 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -88,7 +88,19 @@ %% Traffic traffic_send_and_recv_chunks_tcp4/1, - traffic_send_and_recv_chunks_tcp6/1 + traffic_send_and_recv_chunks_tcp6/1, + traffic_ping_pong_small_send_and_recv_tcp4/1, + traffic_ping_pong_small_send_and_recv_tcp6/1, + %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4/1, + %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6/1, + traffic_ping_pong_medium_send_and_recv_tcp4/1, + traffic_ping_pong_medium_send_and_recv_tcp6/1, + %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4/1, + %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6/1, + traffic_ping_pong_large_send_and_recv_tcp4/1, + traffic_ping_pong_large_send_and_recv_tcp6/1%, + %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4/1, + %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6/1 %% Tickets @@ -115,6 +127,8 @@ -define(TT(T), ct:timetrap(T)). +-define(LIB, socket_test_lib). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -243,7 +257,20 @@ sc_rc_cases() -> traffic_cases() -> [ traffic_send_and_recv_chunks_tcp4, - traffic_send_and_recv_chunks_tcp6 + traffic_send_and_recv_chunks_tcp6, + + traffic_ping_pong_small_send_and_recv_tcp4, + traffic_ping_pong_small_send_and_recv_tcp6, + %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, + %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, + traffic_ping_pong_medium_send_and_recv_tcp4, + traffic_ping_pong_medium_send_and_recv_tcp6, + %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, + %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, + traffic_ping_pong_large_send_and_recv_tcp4, + traffic_ping_pong_large_send_and_recv_tcp6%% , + %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4, + %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6 ]. @@ -5713,6 +5740,8 @@ traffic_send_and_recv_chunks_tcp6(_Config) when is_list(_Config) -> end). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + traffic_send_and_recv_chunks_tcp(InitState) -> ServerSeq = [ @@ -6663,223 +6692,1189 @@ traffic_snr_tcp_client_await_terminate(Parent) -> end. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% This gets the local address (not 127.0...) -%% We should really implement this using the (new) net module, -%% but until that gets the necessary functionality... -which_local_addr(Domain) -> - case inet:getifaddrs() of - {ok, IFL} -> - which_addr(Domain, IFL); - {error, Reason} -> - ?FAIL({inet, getifaddrs, Reason}) - end. - -which_addr(_Domain, []) -> - ?FAIL(no_address); -which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> - which_addr2(Domain, IFO); -which_addr(Domain, [_|IFL]) -> - which_addr(Domain, IFL). - -which_addr2(_Domain, []) -> - ?FAIL(no_address); -which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> - Addr; -which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> - Addr; -which_addr2(Domain, [_|IFO]) -> - which_addr2(Domain, IFO). - - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -start_node(Host, NodeName) -> - UniqueNodeName = f("~w_~w", [NodeName, erlang:unique_integer([positive])]), - case do_start_node(Host, UniqueNodeName) of - {ok, _} = OK -> - OK; - {error, Reason, _} -> - {error, Reason} - end. - -do_start_node(Host, NodeName) when is_list(NodeName) -> - do_start_node(Host, list_to_atom(NodeName)); -do_start_node(Host, NodeName) when is_atom(NodeName) -> - Dir = filename:dirname(code:which(?MODULE)), - Flags = "-pa " ++ Dir, - Opts = [{monitor_master, true}, {erl_flags, Flags}], - ct_slave:start(Host, NodeName, Opts). - - -stop_node(Node) -> - case ct_slave:stop(Node) of - {ok, _} -> - ok; - {error, _} = ERROR -> - ERROR - end. - +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv4. + +traffic_ping_pong_small_send_and_recv_tcp4(suite) -> + []; +traffic_ping_pong_small_send_and_recv_tcp4(doc) -> + []; +traffic_ping_pong_small_send_and_recv_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_small_send_and_recv_tcp4, + fun() -> + ?TT(?SECS(15)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + %% Recv = fun(Sock) -> socket:recv(Sock, 0, 5000) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + end). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -sock_open(Domain, Type, Proto) -> - try socket:open(Domain, Type, Proto) of - {ok, Socket} -> - Socket; - {error, Reason} -> - ?FAIL({open, Reason}) - catch - C:E:S -> - ?FAIL({open, C, E, S}) - end. - - -sock_bind(Sock, SockAddr) -> - try socket:bind(Sock, SockAddr) of - {ok, Port} -> - Port; - {error, Reason} -> - i("sock_bind -> error: ~p", [Reason]), - ?FAIL({bind, Reason}) - catch - C:E:S -> - i("sock_bind -> failed: ~p, ~p, ~p", [C, E, S]), - ?FAIL({bind, C, E, S}) - end. - -sock_connect(Sock, SockAddr) -> - try socket:connect(Sock, SockAddr) of - ok -> - ok; - {error, Reason} -> - ?FAIL({connect, Reason}) - catch - C:E:S -> - ?FAIL({connect, C, E, S}) - end. - -sock_sockname(Sock) -> - try socket:sockname(Sock) of - {ok, SockAddr} -> - SockAddr; - {error, Reason} -> - ?FAIL({sockname, Reason}) - catch - C:E:S -> - ?FAIL({sockname, C, E, S}) - end. - - -%% sock_listen(Sock) -> -%% sock_listen2(fun() -> socket:listen(Sock) end). - -%% sock_listen(Sock, BackLog) -> -%% sock_listen2(fun() -> socket:listen(Sock, BackLog) end). - -%% sock_listen2(Listen) -> -%% try Listen() of -%% ok -> -%% ok; -%% {error, Reason} -> -%% ?FAIL({listen, Reason}) -%% catch -%% C:E:S -> -%% ?FAIL({listen, C, E, S}) -%% end. - - -%% sock_accept(LSock) -> -%% try socket:accept(LSock) of -%% {ok, Sock} -> -%% Sock; -%% {error, Reason} -> -%% i("sock_accept -> error: ~p", [Reason]), -%% ?FAIL({accept, Reason}) -%% catch -%% C:E:S -> -%% i("sock_accept -> failed: ~p, ~p, ~p", [C, E, S]), -%% ?FAIL({accept, C, E, S}) -%% end. - - -sock_close(Sock) -> - try socket:close(Sock) of - ok -> - ok; - {error, Reason} -> - i("sock_close -> error: ~p", [Reason]), - ?FAIL({close, Reason}) - catch - C:E:S -> - i("sock_close -> failed: ~p, ~p, ~p", [C, E, S]), - ?FAIL({close, C, E, S}) - end. - +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv6. + +traffic_ping_pong_small_send_and_recv_tcp6(suite) -> + []; +traffic_ping_pong_small_send_and_recv_tcp6(doc) -> + []; +traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_small_send_and_recv_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(30)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + end). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -not_yet_implemented() -> - skip("not yet implemented"). - -skip(Reason) -> - throw({skip, Reason}). +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv4. + +traffic_ping_pong_medium_send_and_recv_tcp4(suite) -> + []; +traffic_ping_pong_medium_send_and_recv_tcp4(doc) -> + []; +traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_medium_send_and_recv_tcp4, + fun() -> + %% not_yet_implemented(), + ?TT(?SECS(30)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + end). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv6. + +traffic_ping_pong_medium_send_and_recv_tcp6(suite) -> + []; +traffic_ping_pong_medium_send_and_recv_tcp6(doc) -> + []; +traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_medium_send_and_recv_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(30)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + end). -t() -> - os:timestamp(). - - -tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> - T1 = A1*1000000000+B1*1000+(C1 div 1000), - T2 = A2*1000000000+B2*1000+(C2 div 1000), - T2 - T1. -formated_timestamp() -> - format_timestamp(os:timestamp()). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'large' message test case, for IPv4. + +traffic_ping_pong_large_send_and_recv_tcp4(suite) -> + []; +traffic_ping_pong_large_send_and_recv_tcp4(doc) -> + []; +traffic_ping_pong_large_send_and_recv_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_large_send_and_recv_tcp4, + fun() -> + %% not_yet_implemented(), + ?TT(?MINS(5)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + end). -format_timestamp({_N1, _N2, _N3} = TS) -> - {_Date, Time} = calendar:now_to_local_time(TS), - %% {YYYY,MM,DD} = Date, - {Hour,Min,Sec} = Time, - %% FormatTS = - %% io_lib:format("~.4w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w.~w", - %% [YYYY, MM, DD, Hour, Min, Sec, N3]), - FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w", [Hour, Min, Sec]), - lists:flatten(FormatTS). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'large' message test case, for IPv6. + +traffic_ping_pong_large_send_and_recv_tcp6(suite) -> + []; +traffic_ping_pong_large_send_and_recv_tcp6(doc) -> + []; +traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_large_send_and_recv_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?MINS(5)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + end). -set_tc_name(N) when is_atom(N) -> - set_tc_name(atom_to_list(N)); -set_tc_name(N) when is_list(N) -> - put(tc_name, N). -%% get_tc_name() -> -%% get(tc_name). -tc_begin(TC) -> - set_tc_name(TC), - tc_print("begin ***", - "~n----------------------------------------------------~n", ""). - -tc_end(Result) when is_list(Result) -> - tc_print("done: ~s", [Result], - "", "----------------------------------------------------~n~n"), - ok. +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-define(SMALL, lists:seq(1, 8)). +-define(MEDIUM, lists:flatten(lists:duplicate(1024, ?SMALL))). +-define(LARGE, lists:flatten(lists:duplicate(1024, ?MEDIUM))). + +traffic_ping_pong_small_send_and_receive_tcp(InitState) -> + Msg = l2b(?SMALL), + Num = 100000, + Fun = fun(_) -> ok end, %% Fun to update the buffers: Not needed here + traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, + num => Num, + buf_init => Fun}). + +traffic_ping_pong_medium_send_and_receive_tcp(InitState) -> + Msg = l2b(?MEDIUM), + Num = 100000, + Fun = fun(_) -> ok end, %% Fun to update the buffers: MAYBE needed here + traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, + num => Num, + buf_init => Fun}). + +traffic_ping_pong_large_send_and_receive_tcp(InitState) -> + Msg = l2b(?LARGE), + Num = 10, + Fun = fun(Sock) -> + ok = socket:setopt(Sock, socket, rcvbuf, 64*1024*1024), + ok = socket:setopt(Sock, socket, sndbuf, 64*1024*1024), + %% ok = socket:setopt(Sock, otp, rcvbuf, 64*1024*1024), + %% ok = socket:setopt(Sock, otp, sndbuf, 64*1024*1024), + ok + end, %% Fun to update the buffers: NEEDED here!!! + traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, + num => Num, + buf_init => Fun}). + +traffic_ping_pong_send_and_receive_tcp(InitState) -> + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, -tc_try(Case, Fun) when is_atom(Case) andalso is_function(Fun, 0) -> - tc_begin(Case), + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, addr => LAddr}, + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, local_sa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, local_sa := LSA, lport := Port}) -> + ServerSA = LSA#{port => Port}, + ?SEV_ANNOUNCE_READY(Tester, init, ServerSA), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "accept", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, Sock} -> + {ok, State#{csock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "create handler", + cmd => fun(State) -> + Handler = tpp_tcp_handler_create(), + ?SEV_IPRINT("handler created: ~p", [Handler]), + {ok, State#{handler => Handler}} + end}, + #{desc => "monitor handler", + cmd => fun(#{handler := Handler} = _State) -> + _MRef = erlang:monitor(process, Handler), + ok + end}, + #{desc => "transfer connection socket ownership to handler", + cmd => fun(#{handler := Handler, csock := Sock} = _State) -> + socket:setopt(Sock, otp, controlling_process, Handler) + end}, + #{desc => "start handler", + cmd => fun(#{handler := Handler, + csock := Sock, + buf_init := BufInit, + send := Send, + recv := Recv} = _State) -> + ?SEV_ANNOUNCE_START(Handler, + {Sock, BufInit, Send, Recv}), + ok + end}, + #{desc => "await handler ready (init)", + cmd => fun(#{tester := Tester, + handler := Handler} = State) -> + case ?SEV_AWAIT_READY(Handler, handler, init, + [{tester, Tester}]) of + ok -> + {ok, maps:remove(csock, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept), + ok + end}, + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester, + handler := Handler} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv, + [{handler, Handler}]) + end}, + #{desc => "order handler to recv", + cmd => fun(#{handler := Handler} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Handler, recv), + ok + end}, + #{desc => "await handler ready (recv)", + cmd => fun(#{tester := Tester, + handler := Handler} = State) -> + case ?SEV_AWAIT_READY(Handler, handler, recv, + [{tester, Tester}]) of + {ok, Result} -> + %% ?SEV_IPRINT("Result: ~p", [Result]), + {ok, State#{result => Result}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv)", + cmd => fun(#{tester := Tester, + result := Result} = State) -> + ?SEV_ANNOUNCE_READY(Tester, recv, Result), + {ok, maps:remove(result, State)} + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "stop handler", + cmd => fun(#{handler := Handler}) -> + ?SEV_ANNOUNCE_TERMINATE(Handler), + ok + end}, + #{desc => "await handler termination", + cmd => fun(#{handler := Handler} = State) -> + ?SEV_AWAIT_TERMINATION(Handler), + State1 = maps:remove(handler, State), + {ok, State1} + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock} = State) -> + (catch socket:close(Sock)), + {ok, maps:remove(lsock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, ServerSA} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, + server_sa => ServerSA}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "create node", + cmd => fun(#{host := Host} = State) -> + case start_node(Host, client) of + {ok, Node} -> + ?SEV_IPRINT("(remote) client node ~p started", + [Node]), + {ok, State#{node => Node}}; + {error, Reason, _} -> + {error, Reason} + end + end}, + #{desc => "monitor client node", + cmd => fun(#{node := Node} = _State) -> + true = erlang:monitor_node(Node, true), + ok + end}, + #{desc => "create remote client", + cmd => fun(#{node := Node} = State) -> + Pid = tpp_tcp_client_create(Node), + ?SEV_IPRINT("remote client created: ~p", [Pid]), + {ok, State#{rclient => Pid}} + end}, + #{desc => "monitor remote client", + cmd => fun(#{rclient := Pid}) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "order remote client to start", + cmd => fun(#{rclient := RClient, + server_sa := ServerSA, + buf_init := BufInit, + send := Send, + recv := Recv}) -> + ?SEV_ANNOUNCE_START(RClient, + {ServerSA, BufInit, Send, Recv}), + ok + end}, + #{desc => "await remote client ready", + cmd => fun(#{tester := Tester, + rclient := RClient} = _State) -> + ?SEV_AWAIT_READY(RClient, rclient, init, + [{tester, Tester}]) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester, + rclient := RClient} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect, + [{rclient, RClient}]), + ok + end}, + #{desc => "order remote client to continue (connect)", + cmd => fun(#{rclient := RClient}) -> + ?SEV_ANNOUNCE_CONTINUE(RClient, connect), + ok + end}, + #{desc => "await remote client ready (connect)", + cmd => fun(#{tester := Tester, + rclient := RClient} = _State) -> + ?SEV_AWAIT_READY(RClient, rclient, connect, + [{tester, Tester}]) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + #{desc => "await continue (send)", + cmd => fun(#{tester := Tester, + rclient := RClient} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, + send, + [{rclient, RClient}]) + end}, + #{desc => "order remote client to continue (send)", + cmd => fun(#{rclient := RClient, + msg := Msg, + num := Num} = State) -> + Data = {Msg, Num}, + ?SEV_ANNOUNCE_CONTINUE(RClient, send, Data), + {ok, maps:remove(data, State)} + end}, + #{desc => "await remote client ready (send)", + cmd => fun(#{tester := Tester, + rclient := RClient} = State) -> + case ?SEV_AWAIT_READY(RClient, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + %% ?SEV_IPRINT("remote client result: " + %% "~n ~p", [Result]), + {ok, State#{result => Result}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (send)", + cmd => fun(#{tester := Tester, result := Result} = State) -> + ?SEV_ANNOUNCE_READY(Tester, send, Result), + {ok, maps:remove(result, State)} + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester, + rclient := RClient} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester, + [{rclient, RClient}]) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "stop remote client", + cmd => fun(#{rclient := RClient}) -> + ?SEV_ANNOUNCE_TERMINATE(RClient), + ok + end}, + #{desc => "await remote client termination", + cmd => fun(#{rclient := RClient} = State) -> + ?SEV_AWAIT_TERMINATION(RClient), + State1 = maps:remove(rclient, State), + {ok, State1} + end}, + #{desc => "stop client node", + cmd => fun(#{node := Node} = _State) -> + stop_node(Node) + end}, + #{desc => "await client node termination", + cmd => fun(#{node := Node} = State) -> + receive + {nodedown, Node} -> + {ok, maps:remove(node, State)} + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, ServerSA} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_sa => ServerSA}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, + server_sa := ServerSA} = _State) -> + ?SEV_ANNOUNCE_START(Pid, ServerSA), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + %% The actual test + #{desc => "order server continue (accept)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order client continue (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, connect), + ok + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + ?SEV_AWAIT_READY(Server, server, accept, + [{client, Client}]), + ok + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect, + [{server, Server}]) + end}, + #{desc => "order server continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order client continue (send)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send), + ok + end}, + #{desc => "await client ready (send)", + cmd => fun(#{server := Server, + client := Client} = State) -> + case ?SEV_AWAIT_READY(Client, client, send, + [{server, Server}]) of + {ok, {_, _, _, _} = Result} -> + ?SEV_IPRINT("client result: " + "~n ~p", [Result]), + {ok, State#{client_result => Result}}; + {ok, BadResult} -> + ?SEV_EPRINT("client result: " + "~n ~p", [BadResult]), + {error, {invalid_client_result, BadResult}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "await server ready (recv)", + cmd => fun(#{server := Server, + client := Client, + num := Num} = State) -> + case ?SEV_AWAIT_READY(Server, server, recv, + [{client, Client}]) of + {ok, {Num, _, _, _, _} = Result} -> + ?SEV_IPRINT("server result: " + "~n ~p", [Result]), + Result2 = erlang:delete_element(1, Result), + {ok, State#{server_result => Result2}}; + {ok, BadResult} -> + ?SEV_EPRINT("bad sever result: " + "~n ~p", [BadResult]), + {error, {invalid_server_result, BadResult}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "present result", + cmd => fun(#{server_result := SRes, + client_result := CRes, + num := Num} = State) -> + {SSent, SReceived, SStart, SStop} = SRes, + {CSent, CReceived, CStart, CStop} = CRes, + STime = tdiff(SStart, SStop), + CTime = tdiff(CStart, CStop), + ?SEV_IPRINT("Results: " + "~n Server: ~w msec" + "~n ~w messages/msec exchanged" + "~n ~w bytes/msec sent" + "~n ~w bytes/msec received" + "~n Client: ~w msec" + "~n ~w messages/msec exchanged" + "~n ~w bytes/msec sent" + "~n ~w bytes/msec received", + [STime, + Num div STime, + SSent div STime, + SReceived div STime, + CTime, + Num div CTime, + CSent div CTime, + CReceived div CTime]), + State1 = maps:remove(server_result, State), + State2 = maps:remove(client_result, State), + {ok, State2} + end}, + + %% Terminations + #{desc => "order client to terminate", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(client, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(server, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + ServerInitState = #{domain => maps:get(domain, InitState), + recv => maps:get(recv, InitState), + send => maps:get(send, InitState), + buf_init => maps:get(buf_init, InitState)}, + Server = ?SEV_START("server", ServerSeq, ServerInitState), + + i("start client evaluator(s)"), + ClientInitState = InitState#{host => local_host()}, + Client = ?SEV_START("client", ClientSeq, ClientInitState), + + i("start 'tester' evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid, + num => maps:get(num, InitState)}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + +tpp_tcp_handler_create() -> + Self = self(), + erlang:spawn(fun() -> tpp_tcp_handler(Self) end). + +tpp_tcp_handler(Parent) -> + tpp_tcp_handler_init(Parent), + {Sock, BufInit, Send, Recv} = tpp_tcp_handler_await_start(Parent), + tpp_tcp_handler_announce_ready(Parent, init), + tpp_tcp_handler_await_continue(Parent, recv), + Result = tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv), + tpp_tcp_handler_announce_ready(Parent, recv, Result), + Reason = tpp_tcp_handler_await_terminate(Parent), + exit(Reason). + +tpp_tcp_handler_init(Parent) -> + put(sname, "handler"), + ?SEV_IPRINT("init"), + _MRef = erlang:monitor(process, Parent), + ok. + +tpp_tcp_handler_await_start(Parent) -> + ?SEV_IPRINT("await start"), + ?SEV_AWAIT_START(Parent). + +tpp_tcp_handler_announce_ready(Parent, Slogan) -> + ?SEV_IPRINT("announce ready (~p)", [Slogan]), + ?SEV_ANNOUNCE_READY(Parent, Slogan). +tpp_tcp_handler_announce_ready(Parent, Slogan, Extra) -> + ?SEV_IPRINT("announce ready (~p)", [Slogan]), + ?SEV_ANNOUNCE_READY(Parent, Slogan, Extra). + +tpp_tcp_handler_await_continue(Parent, Slogan) -> + ?SEV_IPRINT("await continue (~p)", [Slogan]), + case ?SEV_AWAIT_CONTINUE(Parent, parent, Slogan) of + ok -> + %% ?SEV_IPRINT("continue (~p): ok", [Slogan]), + ok; + {error, Reason} -> + ?SEV_EPRINT("continue (~p): error" + "~n ~p", [Slogan, Reason]), + exit({continue, Slogan, Reason}) + end. + +tpp_tcp_handler_await_terminate(Parent) -> + ?SEV_IPRINT("await terminate"), + case ?SEV_AWAIT_TERMINATE(Parent, parent) of + ok -> + ok; + {error, Reason} -> + Reason + end. + +tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv) -> + ok = BufInit(Sock), + socket:setopt(Sock, otp, debug, true), + tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined). + +tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> + %% if (N =:= 1000) -> socket:setopt(Sock, otp, debug, true); true -> ok end, + ?SEV_IPRINT("[~w] try receive", [N]), + case Recv(Sock) of + {ok, Msg} -> + NewStart = if (Start =:= undefined) -> ?LIB:timestamp(); + true -> Start end, + ?SEV_IPRINT("[~w] received - now try send", [N]), + case Send(Sock, Msg) of + ok -> + tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, + N+1, + Sent+size(Msg), + Received+size(Msg), + NewStart); + {error, SReason} -> + ?SEV_EPRINT("send (~w): ~p", [N, SReason]), + exit({send, SReason, N}) + end; + %% {error, timeout} -> + %% ?SEV_IPRINT("timeout(~w) - try again", [N]), + %% case Send(Sock, list_to_binary("ping")) of + %% ok -> + %% exit({'ping-send', ok, N}); + %% {error, Reason} -> + %% exit({'ping-send', Reason, N}) + %% end; + {error, closed} -> + ?SEV_IPRINT("closed - we are done: ~w, ~w, ~w", [N, Sent, Received]), + Stop = ?LIB:timestamp(), + {N, Sent, Received, Start, Stop}; + {error, RReason} -> + ?SEV_EPRINT("recv (~w): ~p", [N, RReason]), + exit({recv, RReason, N}) + end. + +%% The (remote) client process + +tpp_tcp_client_create(Node) -> + Self = self(), + GL = group_leader(), + Fun = fun() -> tpp_tcp_client(Self, GL) end, + erlang:spawn(Node, Fun). + +tpp_tcp_client(Parent, GL) -> + tpp_tcp_client_init(Parent, GL), + {ServerSA, BufInit, Send, Recv} = tpp_tcp_client_await_start(Parent), + Domain = maps:get(family, ServerSA), + Sock = tpp_tcp_client_sock_open(Domain), + tpp_tcp_client_sock_bind(Sock, Domain), + tpp_tcp_client_announce_ready(Parent, init), + tpp_tcp_client_await_continue(Parent, connect), + tpp_tcp_client_sock_connect(Sock, ServerSA), + tpp_tcp_client_announce_ready(Parent, connect), + {InitMsg, Num} = tpp_tcp_client_await_continue(Parent, send), + Result = tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num), + tpp_tcp_client_announce_ready(Parent, send, Result), + Reason = tpp_tcp_client_await_terminate(Parent), + tpp_tcp_client_sock_close(Sock), + exit(Reason). + +tpp_tcp_client_init(Parent, GL) -> + put(sname, "rclient"), + ?SEV_IPRINT("init"), + _MRef = erlang:monitor(process, Parent), + group_leader(self(), GL), + ok. + +tpp_tcp_client_await_start(Parent) -> + ?SEV_IPRINT("await start"), + ?SEV_AWAIT_START(Parent). + +tpp_tcp_client_announce_ready(Parent, Slogan) -> + ?SEV_IPRINT("announce ready (~p)", [Slogan]), + ?SEV_ANNOUNCE_READY(Parent, Slogan). +tpp_tcp_client_announce_ready(Parent, Slogan, Extra) -> + ?SEV_IPRINT("announce ready (~p): ~p", [Slogan, Extra]), + ?SEV_ANNOUNCE_READY(Parent, Slogan, Extra). + +tpp_tcp_client_await_continue(Parent, Slogan) -> + ?SEV_IPRINT("await continue (~p)", [Slogan]), + case ?SEV_AWAIT_CONTINUE(Parent, parent, Slogan) of + ok -> + %% ?SEV_IPRINT("continue (~p): ok", [Slogan]), + ok; + {ok, Data} -> + %% ?SEV_IPRINT("continue (~p): ok with data", [Slogan]), + Data; + {error, Reason} -> + ?SEV_EPRINT("continue (~p): error" + "~n ~p", [Slogan, Reason]), + exit({continue, Slogan, Reason}) + end. + +tpp_tcp_client_await_terminate(Parent) -> + ?SEV_IPRINT("await terminate"), + case ?SEV_AWAIT_TERMINATE(Parent, parent) of + ok -> + ok; + {error, Reason} -> + Reason + end. + +tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num) -> + ok = BufInit(Sock), + Start = ?LIB:timestamp(), + tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, InitMsg, + Num, 0, 0, 0, Start). + +tpp_tcp_client_msg_exchange_loop(Sock, _Send, _Recv, _Msg, + Num, Num, Sent, Received, + Start) -> + Stop = ?LIB:timestamp(), + case socket:close(Sock) of + ok -> + {Sent, Received, Start, Stop}; + {error, Reason} -> + exit({failed_closing, Reason}) + end; +tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Msg, + Num, N, Sent, Received, Start) -> + d("[~w,~w] try send", [Num,N]), + case Send(Sock, Msg) of + ok -> + d("[~w,~w] sent - no try recv", [Num,N]), + case Recv(Sock) of + {ok, NewMsg} -> + tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, + NewMsg, Num, N+1, + Sent+size(Msg), + Received+size(NewMsg), + Start); + {error, RReason} -> + ?SEV_EPRINT("recv (~w of ~w): ~p", [N, Num, RReason]), + exit({recv, RReason, N}) + end; + {error, SReason} -> + ?SEV_EPRINT("send (~w of ~w): ~p", [N, Num, SReason]), + exit({send, SReason, N}) + end. + +tpp_tcp_client_sock_open(Domain) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + Sock; + {error, Reason} -> + exit({open_failed, Reason}) + end. + +tpp_tcp_client_sock_bind(Sock, Domain) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, + addr => LAddr}, + case socket:bind(Sock, LSA) of + {ok, _} -> + ok; + {error, Reason} -> + exit({bind, Reason}) + end. + +tpp_tcp_client_sock_connect(Sock, ServerSA) -> + case socket:connect(Sock, ServerSA) of + ok -> + ok; + {error, Reason} -> + exit({connect, Reason}) + end. + +tpp_tcp_client_sock_close(Sock) -> + case socket:close(Sock) of + ok -> + ok; + {error, Reason} -> + exit({close, Reason}) + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% This gets the local address (not 127.0...) +%% We should really implement this using the (new) net module, +%% but until that gets the necessary functionality... +which_local_addr(Domain) -> + case inet:getifaddrs() of + {ok, IFL} -> + which_addr(Domain, IFL); + {error, Reason} -> + ?FAIL({inet, getifaddrs, Reason}) + end. + +which_addr(_Domain, []) -> + ?FAIL(no_address); +which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> + which_addr2(Domain, IFO); +which_addr(Domain, [_|IFL]) -> + which_addr(Domain, IFL). + +which_addr2(_Domain, []) -> + ?FAIL(no_address); +which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> + Addr; +which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> + Addr; +which_addr2(Domain, [_|IFO]) -> + which_addr2(Domain, IFO). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start_node(Host, NodeName) -> + UniqueNodeName = f("~w_~w", [NodeName, erlang:system_time(millisecond)]), + case do_start_node(Host, UniqueNodeName) of + {ok, _} = OK -> + OK; + {error, Reason, _} -> + {error, Reason} + end. + +do_start_node(Host, NodeName) when is_list(NodeName) -> + do_start_node(Host, list_to_atom(NodeName)); +do_start_node(Host, NodeName) when is_atom(NodeName) -> + Dir = filename:dirname(code:which(?MODULE)), + Flags = "-pa " ++ Dir, + Opts = [{monitor_master, true}, {erl_flags, Flags}], + ct_slave:start(Host, NodeName, Opts). + + +stop_node(Node) -> + case ct_slave:stop(Node) of + {ok, _} -> + ok; + {error, _} = ERROR -> + ERROR + end. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +sock_open(Domain, Type, Proto) -> + try socket:open(Domain, Type, Proto) of + {ok, Socket} -> + Socket; + {error, Reason} -> + ?FAIL({open, Reason}) + catch + C:E:S -> + ?FAIL({open, C, E, S}) + end. + + +sock_bind(Sock, SockAddr) -> + try socket:bind(Sock, SockAddr) of + {ok, Port} -> + Port; + {error, Reason} -> + i("sock_bind -> error: ~p", [Reason]), + ?FAIL({bind, Reason}) + catch + C:E:S -> + i("sock_bind -> failed: ~p, ~p, ~p", [C, E, S]), + ?FAIL({bind, C, E, S}) + end. + +sock_connect(Sock, SockAddr) -> + try socket:connect(Sock, SockAddr) of + ok -> + ok; + {error, Reason} -> + ?FAIL({connect, Reason}) + catch + C:E:S -> + ?FAIL({connect, C, E, S}) + end. + +sock_sockname(Sock) -> + try socket:sockname(Sock) of + {ok, SockAddr} -> + SockAddr; + {error, Reason} -> + ?FAIL({sockname, Reason}) + catch + C:E:S -> + ?FAIL({sockname, C, E, S}) + end. + + +%% sock_listen(Sock) -> +%% sock_listen2(fun() -> socket:listen(Sock) end). + +%% sock_listen(Sock, BackLog) -> +%% sock_listen2(fun() -> socket:listen(Sock, BackLog) end). + +%% sock_listen2(Listen) -> +%% try Listen() of +%% ok -> +%% ok; +%% {error, Reason} -> +%% ?FAIL({listen, Reason}) +%% catch +%% C:E:S -> +%% ?FAIL({listen, C, E, S}) +%% end. + + +%% sock_accept(LSock) -> +%% try socket:accept(LSock) of +%% {ok, Sock} -> +%% Sock; +%% {error, Reason} -> +%% i("sock_accept -> error: ~p", [Reason]), +%% ?FAIL({accept, Reason}) +%% catch +%% C:E:S -> +%% i("sock_accept -> failed: ~p, ~p, ~p", [C, E, S]), +%% ?FAIL({accept, C, E, S}) +%% end. + + +sock_close(Sock) -> + try socket:close(Sock) of + ok -> + ok; + {error, Reason} -> + i("sock_close -> error: ~p", [Reason]), + ?FAIL({close, Reason}) + catch + C:E:S -> + i("sock_close -> failed: ~p, ~p, ~p", [C, E, S]), + ?FAIL({close, C, E, S}) + end. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +not_yet_implemented() -> + skip("not yet implemented"). + +skip(Reason) -> + throw({skip, Reason}). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +t() -> + os:timestamp(). + + +tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> + T1 = A1*1000000000+B1*1000+(C1 div 1000), + T2 = A2*1000000000+B2*1000+(C2 div 1000), + T2 - T1. + + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp({_N1, _N2, _N3} = TS) -> + {_Date, Time} = calendar:now_to_local_time(TS), + %% {YYYY,MM,DD} = Date, + {Hour,Min,Sec} = Time, + %% FormatTS = + %% io_lib:format("~.4w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w.~w", + %% [YYYY, MM, DD, Hour, Min, Sec, N3]), + FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w", [Hour, Min, Sec]), + lists:flatten(FormatTS). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +set_tc_name(N) when is_atom(N) -> + set_tc_name(atom_to_list(N)); +set_tc_name(N) when is_list(N) -> + put(tc_name, N). + +%% get_tc_name() -> +%% get(tc_name). + +tc_begin(TC) -> + set_tc_name(TC), + tc_print("begin ***", + "~n----------------------------------------------------~n", ""). + +tc_end(Result) when is_list(Result) -> + tc_print("done: ~s", [Result], + "", "----------------------------------------------------~n~n"), + ok. + + +tc_try(Case, Fun) when is_atom(Case) andalso is_function(Fun, 0) -> + tc_begin(Case), try begin Fun(), @@ -6923,6 +7918,9 @@ tc_which_name() -> l2a(S) when is_list(S) -> list_to_atom(S). +l2b(L) when is_list(L) -> + list_to_binary(L). + b2l(B) when is_binary(B) -> binary_to_list(B). @@ -6953,6 +7951,22 @@ f(F, A) -> %% i(Before ++ FStr ++ After, []). +d(F, A) -> + d(get(dbg_fd), F, A). + +d(undefined, F, A) -> + [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), + DbgFileName = f("~s-dbg.txt", [NodeNameStr]), + case file:open(DbgFileName, [write]) of + {ok, FD} -> + put(dbg_fd, FD), + d(FD, F, A); + {error, Reason} -> + exit({failed_open_dbg_file, Reason}) + end; +d(FD, F, A) -> + io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). + i(F) -> i(F, []). -- cgit v1.2.3 From d0df643ad994bbc609c52f83b814fc79624af501 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Fri, 2 Nov 2018 14:14:48 +0100 Subject: [socket-nif] Inherit buffer sizes when accepting An "accepted" socket will inherit the parent (listen) socket's buffer sizes (rBufSz, rCtrlSz and wCtrlSz). OTP-14831 --- erts/emulator/nifs/common/socket_nif.c | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) (limited to 'erts') diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 389d43ee42..82bf8305fc 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -4919,6 +4919,9 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, accDescP->domain = descP->domain; accDescP->type = descP->type; accDescP->protocol = descP->protocol; + accDescP->rBufSz = descP->rBufSz; // Inherit buffer size + accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez + accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size accRef = enif_make_resource(env, accDescP); enif_release_resource(accDescP); // We should really store a reference ... @@ -9549,7 +9552,17 @@ ERL_NIF_TERM nsetopt_int_opt(ErlNifEnv* env, int val; if (GET_INT(env, eVal, &val)) { - int res = socket_setopt(descP->sock, level, opt, &val, sizeof(val)); + int res; + + /* + SSDBG( descP, + ("SOCKET", "nsetopt_int_opt -> set option" + "\r\n opt: %d" + "\r\n val: %d" + "\r\n", opt, val) ); + */ + + res = socket_setopt(descP->sock, level, opt, &val, sizeof(val)); if (res != 0) result = esock_make_error_errno(env, sock_errno()); -- cgit v1.2.3 From c6767f0a6dc66971df4425c216024c47993a310b Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Fri, 2 Nov 2018 14:17:09 +0100 Subject: [socket-nif|test] Biffer init and message sizes in ping-pong case The ping-pong test case(s) now initiates the socket buffers before they are connected (server: before listen is called on the listen socket and client: before connect is called). Also, we now include a length indicator in the messages, so that we know how much to read. OTP-14831 --- erts/emulator/test/socket_SUITE.erl | 213 +++++++++++++++++++++++++----------- 1 file changed, 150 insertions(+), 63 deletions(-) (limited to 'erts') diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 90e9407f6a..c2b8a8349f 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -6712,7 +6712,7 @@ traffic_ping_pong_small_send_and_recv_tcp4(_Config) when is_list(_Config) -> ?TT(?SECS(15)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, %% Recv = fun(Sock) -> socket:recv(Sock, 0, 5000) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet, send => Send, % Send function recv => Recv % Receive function @@ -6740,7 +6740,7 @@ traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) -> not_yet_implemented(), ?TT(?SECS(30)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet6, send => Send, % Send function recv => Recv % Receive function @@ -6768,7 +6768,7 @@ traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> %% not_yet_implemented(), ?TT(?SECS(30)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet, send => Send, % Send function recv => Recv % Receive function @@ -6796,7 +6796,7 @@ traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> not_yet_implemented(), ?TT(?SECS(30)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet6, send => Send, % Send function recv => Recv % Receive function @@ -6825,7 +6825,7 @@ traffic_ping_pong_large_send_and_recv_tcp4(_Config) when is_list(_Config) -> %% not_yet_implemented(), ?TT(?MINS(5)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet, send => Send, % Send function recv => Recv % Receive function @@ -6853,7 +6853,7 @@ traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) -> not_yet_implemented(), ?TT(?MINS(5)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet6, send => Send, % Send function recv => Recv % Receive function @@ -6887,12 +6887,31 @@ traffic_ping_pong_medium_send_and_receive_tcp(InitState) -> traffic_ping_pong_large_send_and_receive_tcp(InitState) -> Msg = l2b(?LARGE), - Num = 10, + Num = 1000, Fun = fun(Sock) -> - ok = socket:setopt(Sock, socket, rcvbuf, 64*1024*1024), - ok = socket:setopt(Sock, socket, sndbuf, 64*1024*1024), - %% ok = socket:setopt(Sock, otp, rcvbuf, 64*1024*1024), - %% ok = socket:setopt(Sock, otp, sndbuf, 64*1024*1024), + %% ?SEV_IPRINT("Socket buffers (before): " + %% "~n Rcv: ~p" + %% "~n Snd: ~p", + %% [socket:getopt(Sock, socket, rcvbuf), + %% socket:getopt(Sock, socket, sndbuf)]), + {ok, RcvSz} = socket:getopt(Sock, socket, rcvbuf), + if (RcvSz < size(Msg)) -> + ok = socket:setopt(Sock, socket, rcvbuf, 1024+size(Msg)); + true -> + ok + end, + {ok, SndSz} = socket:getopt(Sock, socket, sndbuf), + if (SndSz < size(Msg)) -> + ok = socket:setopt(Sock, socket, sndbuf, 1024+size(Msg)); + true -> + ok + end, + ok = socket:setopt(Sock, otp, rcvbuf, 8*1024), + %% ?SEV_IPRINT("Socket buffers (after): " + %% "~n Rcv: ~p" + %% "~n Snd: ~p", + %% [socket:getopt(Sock, socket, rcvbuf), + %% socket:getopt(Sock, socket, sndbuf)]), ok end, %% Fun to update the buffers: NEEDED here!!! traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, @@ -6939,6 +6958,10 @@ traffic_ping_pong_send_and_receive_tcp(InitState) -> ERROR end end}, + #{desc => "maybe init buffers", + cmd => fun(#{lsock := LSock, buf_init := BufInit} = State) -> + BufInit(LSock) + end}, #{desc => "make listen socket", cmd => fun(#{lsock := LSock}) -> socket:listen(LSock) @@ -6982,11 +7005,9 @@ traffic_ping_pong_send_and_receive_tcp(InitState) -> #{desc => "start handler", cmd => fun(#{handler := Handler, csock := Sock, - buf_init := BufInit, send := Send, recv := Recv} = _State) -> - ?SEV_ANNOUNCE_START(Handler, - {Sock, BufInit, Send, Recv}), + ?SEV_ANNOUNCE_START(Handler, {Sock, Send, Recv}), ok end}, #{desc => "await handler ready (init)", @@ -7346,25 +7367,35 @@ traffic_ping_pong_send_and_receive_tcp(InitState) -> {CSent, CReceived, CStart, CStop} = CRes, STime = tdiff(SStart, SStop), CTime = tdiff(CStart, CStop), - ?SEV_IPRINT("Results: " + %% Note that the sizes we are counting is only + %% the "data" part of the messages. There is also + %% fixed header for each message, which of cource + %% is small for the large messages, but comparatively + %% big for the small messages! + ?SEV_IPRINT("Results: ~w messages exchanged" "~n Server: ~w msec" - "~n ~w messages/msec exchanged" + "~n ~.2f msec/message (roundtrip)" + "~n ~.2f messages/msec (roundtrip)" "~n ~w bytes/msec sent" "~n ~w bytes/msec received" "~n Client: ~w msec" - "~n ~w messages/msec exchanged" + "~n ~.2f msec/message (roundtrip)" + "~n ~.2f messages/msec (roundtrip)" "~n ~w bytes/msec sent" "~n ~w bytes/msec received", - [STime, - Num div STime, + [Num, + STime, + STime / Num, + Num / STime, SSent div STime, SReceived div STime, CTime, - Num div CTime, + CTime / Num, + Num / CTime, CSent div CTime, CReceived div CTime]), State1 = maps:remove(server_result, State), - State2 = maps:remove(client_result, State), + State2 = maps:remove(client_result, State1), {ok, State2} end}, @@ -7432,10 +7463,10 @@ tpp_tcp_handler_create() -> tpp_tcp_handler(Parent) -> tpp_tcp_handler_init(Parent), - {Sock, BufInit, Send, Recv} = tpp_tcp_handler_await_start(Parent), + {Sock, Send, Recv} = tpp_tcp_handler_await_start(Parent), tpp_tcp_handler_announce_ready(Parent, init), tpp_tcp_handler_await_continue(Parent, recv), - Result = tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv), + Result = tpp_tcp_handler_msg_exchange(Sock, Send, Recv), tpp_tcp_handler_announce_ready(Parent, recv, Result), Reason = tpp_tcp_handler_await_terminate(Parent), exit(Reason). @@ -7478,25 +7509,24 @@ tpp_tcp_handler_await_terminate(Parent) -> Reason end. -tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv) -> - ok = BufInit(Sock), - socket:setopt(Sock, otp, debug, true), +tpp_tcp_handler_msg_exchange(Sock, Send, Recv) -> + %% socket:setopt(Sock, otp, debug, true), tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined). tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> %% if (N =:= 1000) -> socket:setopt(Sock, otp, debug, true); true -> ok end, - ?SEV_IPRINT("[~w] try receive", [N]), - case Recv(Sock) of - {ok, Msg} -> + %% ?SEV_IPRINT("[~w] try receive", [N]), + case tpp_tcp_recv_req(Sock, Recv) of + {ok, Msg, RecvSz} -> NewStart = if (Start =:= undefined) -> ?LIB:timestamp(); true -> Start end, - ?SEV_IPRINT("[~w] received - now try send", [N]), - case Send(Sock, Msg) of - ok -> + %% ?SEV_IPRINT("[~w] received - now try send", [N]), + case tpp_tcp_send_rep(Sock, Send, Msg) of + {ok, SendSz} -> tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N+1, - Sent+size(Msg), - Received+size(Msg), + Sent+SendSz, + Received+RecvSz, NewStart); {error, SReason} -> ?SEV_EPRINT("send (~w): ~p", [N, SReason]), @@ -7531,14 +7561,14 @@ tpp_tcp_client(Parent, GL) -> tpp_tcp_client_init(Parent, GL), {ServerSA, BufInit, Send, Recv} = tpp_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), - Sock = tpp_tcp_client_sock_open(Domain), + Sock = tpp_tcp_client_sock_open(Domain, BufInit), tpp_tcp_client_sock_bind(Sock, Domain), tpp_tcp_client_announce_ready(Parent, init), tpp_tcp_client_await_continue(Parent, connect), tpp_tcp_client_sock_connect(Sock, ServerSA), tpp_tcp_client_announce_ready(Parent, connect), {InitMsg, Num} = tpp_tcp_client_await_continue(Parent, send), - Result = tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num), + Result = tpp_tcp_client_msg_exchange(Sock, Send, Recv, InitMsg, Num), tpp_tcp_client_announce_ready(Parent, send, Result), Reason = tpp_tcp_client_await_terminate(Parent), tpp_tcp_client_sock_close(Sock), @@ -7586,8 +7616,7 @@ tpp_tcp_client_await_terminate(Parent) -> Reason end. -tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num) -> - ok = BufInit(Sock), +tpp_tcp_client_msg_exchange(Sock, Send, Recv, InitMsg, Num) -> Start = ?LIB:timestamp(), tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, InitMsg, Num, 0, 0, 0, Start). @@ -7602,18 +7631,18 @@ tpp_tcp_client_msg_exchange_loop(Sock, _Send, _Recv, _Msg, {error, Reason} -> exit({failed_closing, Reason}) end; -tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Msg, +tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Data, Num, N, Sent, Received, Start) -> - d("[~w,~w] try send", [Num,N]), - case Send(Sock, Msg) of - ok -> - d("[~w,~w] sent - no try recv", [Num,N]), - case Recv(Sock) of - {ok, NewMsg} -> + %% d("[~w,~w] try send", [Num,N]), + case tpp_tcp_send_req(Sock, Send, Data) of + {ok, SendSz} -> + %% d("[~w,~w] sent - no try recv", [Num,N]), + case tpp_tcp_recv_rep(Sock, Recv) of + {ok, NewData, RecvSz} -> tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, - NewMsg, Num, N+1, - Sent+size(Msg), - Received+size(NewMsg), + NewData, Num, N+1, + Sent+SendSz, + Received+RecvSz, Start); {error, RReason} -> ?SEV_EPRINT("recv (~w of ~w): ~p", [N, Num, RReason]), @@ -7624,9 +7653,10 @@ tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Msg, exit({send, SReason, N}) end. -tpp_tcp_client_sock_open(Domain) -> +tpp_tcp_client_sock_open(Domain, BufInit) -> case socket:open(Domain, stream, tcp) of {ok, Sock} -> + ok = BufInit(Sock), Sock; {error, Reason} -> exit({open_failed, Reason}) @@ -7659,6 +7689,63 @@ tpp_tcp_client_sock_close(Sock) -> exit({close, Reason}) end. +-define(TPP_REQUEST, 1). +-define(TPP_REPLY, 2). + +tpp_tcp_recv_req(Sock, Recv) -> + tpp_tcp_recv(Sock, Recv, ?TPP_REQUEST). + +tpp_tcp_recv_rep(Sock, Recv) -> + tpp_tcp_recv(Sock, Recv, ?TPP_REPLY). + +tpp_tcp_recv(Sock, Recv, Tag) -> + case Recv(Sock, 0) of + {ok, <> = Msg} + when (Sz =:= size(Data)) -> + %% We got it all + {ok, Data, size(Msg)}; + {ok, <> = Msg} -> + Remains = Sz - size(Data), + tpp_tcp_recv(Sock, Recv, Tag, Remains, size(Msg), [Data]); + {ok, <>} -> + {error, {invalid_msg_tag, Tag}}; + {error, _} = ERROR -> + ERROR + end. + +tpp_tcp_recv(Sock, Recv, Tag, Remaining, AccSz, Acc) -> + case Recv(Sock, Remaining) of + {ok, Data} when (Remaining =:= size(Data)) -> + %% We got the rest + TotSz = AccSz + size(Data), + {ok, erlang:iolist_to_binary(lists:reverse([Data | Acc])), TotSz}; + {ok, Data} when (Remaining > size(Data)) -> + tpp_tcp_recv(Sock, Recv, Tag, + Remaining - size(Data), AccSz + size(Data), + [Data | Acc]); + {error, _} = ERROR -> + ERROR + end. + + +tpp_tcp_send_req(Sock, Send, Data) -> + tpp_tcp_send(Sock, Send, ?TPP_REQUEST, Data). + +tpp_tcp_send_rep(Sock, Send, Data) -> + tpp_tcp_send(Sock, Send, ?TPP_REPLY, Data). + +tpp_tcp_send(Sock, Send, Tag, Data) -> + DataSz = size(Data), + Msg = <>, + case Send(Sock, Msg) of + ok -> + {ok, size(Msg)}; + {error, _} = ERROR -> + ERROR + end. + + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This gets the local address (not 127.0...) @@ -7951,21 +8038,21 @@ f(F, A) -> %% i(Before ++ FStr ++ After, []). -d(F, A) -> - d(get(dbg_fd), F, A). +%% d(F, A) -> +%% d(get(dbg_fd), F, A). -d(undefined, F, A) -> - [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), - DbgFileName = f("~s-dbg.txt", [NodeNameStr]), - case file:open(DbgFileName, [write]) of - {ok, FD} -> - put(dbg_fd, FD), - d(FD, F, A); - {error, Reason} -> - exit({failed_open_dbg_file, Reason}) - end; -d(FD, F, A) -> - io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). +%% d(undefined, F, A) -> +%% [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), +%% DbgFileName = f("~s-dbg.txt", [NodeNameStr]), +%% case file:open(DbgFileName, [write]) of +%% {ok, FD} -> +%% put(dbg_fd, FD), +%% d(FD, F, A); +%% {error, Reason} -> +%% exit({failed_open_dbg_file, Reason}) +%% end; +%% d(FD, F, A) -> +%% io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). i(F) -> i(F, []). -- cgit v1.2.3 From 6fcbb97c0b7f082c4934d7765c6b63222f317ef2 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 5 Nov 2018 14:37:39 +0100 Subject: [socket-nif] Make it possible to send (nif) debug to file Make it possible to open a file and send debug printouts to (instead of stdout) for debug printouts from the nif-code. OTP-14831 --- erts/emulator/nifs/common/socket_dbg.c | 24 +++++++++++++++++++++++- erts/emulator/nifs/common/socket_dbg.h | 8 ++++++-- 2 files changed, 29 insertions(+), 3 deletions(-) (limited to 'erts') diff --git a/erts/emulator/nifs/common/socket_dbg.c b/erts/emulator/nifs/common/socket_dbg.c index dd11fbca9b..fe9135e5a0 100644 --- a/erts/emulator/nifs/common/socket_dbg.c +++ b/erts/emulator/nifs/common/socket_dbg.c @@ -36,10 +36,32 @@ #define TNAME(__T__) enif_thread_name( __T__ ) #define TSNAME() TNAME(TSELF()) +static FILE* dbgout = NULL; + static int realtime(struct timespec* tsP); static int timespec2str(char *buf, unsigned int len, struct timespec *ts); +extern +void esock_dbg_init(char* filename) +{ + if (filename != NULL) { + if (strcmp(filename, ESOCK_DBGOUT_DEFAULT) == 0) { + dbgout = stdout; + } else if (strcmp(filename, ESOCK_DBGOUT_UNIQUE) == 0) { + char template[] = "/tmp/esock-dbg-XXXXXX"; + dbgout = fdopen(mkstemp(template), "w+"); + } else { + dbgout = fopen(filename, "w+"); + } + } else { + char template[] = "/tmp/esock-dbg-XXXXXX"; + dbgout = fdopen(mkstemp(template), "w+"); + } +} + + + /* * Print a debug format string *with* both a timestamp and the * the name of the *current* thread. @@ -70,7 +92,7 @@ void esock_dbg_printf( const char* prefix, const char* format, ... ) if (res > 0) { va_start (args, format); - enif_vfprintf (stdout, f, args); + enif_vfprintf (dbgout, f, args); va_end (args); fflush(stdout); } diff --git a/erts/emulator/nifs/common/socket_dbg.h b/erts/emulator/nifs/common/socket_dbg.h index ad0fcdada9..47739b46da 100644 --- a/erts/emulator/nifs/common/socket_dbg.h +++ b/erts/emulator/nifs/common/socket_dbg.h @@ -27,6 +27,10 @@ #ifndef SOCKET_DBG_H__ #define SOCKET_DBG_H__ +/* Used when calling the init function */ +#define ESOCK_DBGOUT_DEFAULT "stdout" +#define ESOCK_DBGOUT_UNIQUE "unique" + /* Used in debug printouts */ #ifdef __WIN32__ @@ -45,7 +49,7 @@ typedef unsigned long long llu_t; } -extern -void esock_dbg_printf( const char* prefix, const char* format, ... ); +extern void esock_dbg_init(char* filename); +extern void esock_dbg_printf( const char* prefix, const char* format, ... ); #endif // SOCKET_DBG_H__ -- cgit v1.2.3 From 868950ba50185d68075e0eb14708beb5a7a5a63f Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 5 Nov 2018 14:40:14 +0100 Subject: [socket-nif] Sending when buffer is full failed When the send buffer was full (eagain), the send failed (with the rather useless return of {ok, -1}) instead of returning {error, eagain}. OTP-14831 --- erts/emulator/nifs/common/socket_nif.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) (limited to 'erts') diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 82bf8305fc..70a969e867 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -5583,7 +5583,7 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, if (IS_SOCKET_ERROR(written)) save_errno = sock_errno(); else - save_errno = -1; // The value does not actually matter in this case + save_errno = -1; // OK or not complete: this value should not matter in this case res = send_check_result(env, descP, written, dataSize, save_errno, sendRef); @@ -13327,7 +13327,12 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") ); + SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), descP, NULL, sendRef); + + return esock_make_error(env, esock_atom_eagain); + } + } /* We failed to write the *entire* packet (anything less then size @@ -13352,8 +13357,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writeWaits, 1); - SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), - descP, NULL, sendRef); + SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), descP, NULL, sendRef); SSDBG( descP, ("SOCKET", "send_check_result -> " @@ -16736,12 +16740,12 @@ int esock_monitor(const char* slogan, { int res; - SSDBG( descP, ("SOCKET", "[%d] %s: try monitor", descP->sock, slogan) ); + SSDBG( descP, ("SOCKET", "[%d] %s: try monitor\r\n", descP->sock, slogan) ); /* esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan); */ res = enif_monitor_process(env, descP, pid, &monP->mon); if (res != 0) { - SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d", descP->sock, res) ); + SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d\r\n", descP->sock, res) ); // esock_dbg_printf("MONP", "[%d] failed: %d\r\n", descP->sock, res); } /* else { esock_dbg_printf("MONP", @@ -17595,6 +17599,9 @@ BOOLEAN_T extract_iow(ErlNifEnv* env, static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) { + esock_dbg_init(ESOCK_DBGOUT_DEFAULT); + // esock_dbg_init(ESOCK_DBGOUT_UNIQUE); + data.dbg = extract_debug(env, load_info); data.iow = extract_iow(env, load_info); -- cgit v1.2.3 From f18bd2a88d7bc8519cf5db611c4c530eedecba2a Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 5 Nov 2018 14:43:07 +0100 Subject: [socket-nif] Add "partial success" to sendmsg The sendmsg function attempts to send *one message*. But its possible for the underlying software to fail to send the *entire* message. So, instead of retrying itself, as send does, the sendmsg function will now instead return with {ok, Remaining}, leaving it to the caller to decide what to do. OTP-14831 --- erts/preloaded/ebin/socket.beam | Bin 68564 -> 69380 bytes erts/preloaded/src/socket.erl | 47 +++++++++++++++++++++++++++++++++------- 2 files changed, 39 insertions(+), 8 deletions(-) (limited to 'erts') diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam index 480f86334c..d3bc7c7af0 100644 Binary files a/erts/preloaded/ebin/socket.beam and b/erts/preloaded/ebin/socket.beam differ diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 5ebc2074e0..dd10aac3ff 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1573,12 +1573,13 @@ sendmsg(Socket, MsgHdr, Timeout) sendmsg(Socket, MsgHdr, ?SOCKET_SENDMSG_FLAGS_DEFAULT, Timeout). --spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - MsgHdr :: msghdr(), - Flags :: send_flags(), - Timeout :: timeout(), - Reason :: term(). +-spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> ok | {ok, Remaining} | {error, Reason} when + Socket :: socket(), + MsgHdr :: msghdr(), + Flags :: send_flags(), + Timeout :: timeout(), + Remaining :: erlang:iovec(), + Reason :: term(). sendmsg(#socket{ref = SockRef}, #{iov := IOV} = MsgHdr, Flags, Timeout) when is_list(IOV) andalso @@ -1603,6 +1604,18 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> %% We are done ok; + {ok, Written} when is_integer(Written) andalso (Written > 0) -> + + %% We should not retry here since the protocol may not + %% be able to handle a message being split. Leave it to + %% the caller to figure out (call again with the rest). + %% + %% We should really not need to cancel, since this is + %% accepted for sendmsg! + %% + cancel(SockRef, sendmsg, SendRef), + {ok, do_sendmsg_rest(maps:get(iov, MsgHdr), Written)}; + {error, eagain} -> receive {select, SockRef, SendRef, ready_output} -> @@ -1617,6 +1630,12 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> ERROR end. +do_sendmsg_rest([B|IOVec], Written) when (Written >= size(B)) -> + do_sendmsg_rest(IOVec, Written - size(B)); +do_sendmsg_rest([B|IOVec], Written) -> + <<_:Written/binary, Rest/binary>> = B, + [Rest|IOVec]. + ensure_msghdr(#{ctrl := []} = M) -> ensure_msghdr(maps:remove(ctrl, M)); ensure_msghdr(#{iov := IOV} = M) when is_list(IOV) andalso (IOV =/= []) -> @@ -1625,6 +1644,8 @@ ensure_msghdr(_) -> einval(). + + %% =========================================================================== %% %% writev - write data into multiple buffers @@ -1983,10 +2004,20 @@ recvmsg(Socket, Timeout) -> Flags :: recv_flags(), Timeout :: timeout(), MsgHdr :: msghdr(), + Reason :: term() + ; (Socket, BufSz, CtrlSz) -> {ok, MsgHdr} | {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + CtrlSz :: non_neg_integer(), + MsgHdr :: msghdr(), Reason :: term(). -recvmsg(Socket, Flags, Timeout) -> - recvmsg(Socket, 0, 0, Flags, Timeout). +recvmsg(Socket, Flags, Timeout) when is_list(Flags) -> + recvmsg(Socket, 0, 0, Flags, Timeout); +recvmsg(Socket, BufSz, CtrlSz) when is_integer(BufSz) andalso is_integer(CtrlSz) -> + recvmsg(Socket, BufSz, CtrlSz, + ?SOCKET_RECV_FLAGS_DEFAULT, ?SOCKET_RECV_TIMEOUT_DEFAULT). + -spec recvmsg(Socket, BufSz, CtrlSz, -- cgit v1.2.3 From 741eb8d2d8ba606d81990ef50b2d8b261d47ec81 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 5 Nov 2018 14:47:03 +0100 Subject: [socket-nif|doc] Improved doc for recvmsg and update for sendmsg The API for the sendmsg function has been updated to describe the possible "partial success" of {ok, Remaining}. OTP-14831 --- erts/doc/src/socket.xml | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'erts') diff --git a/erts/doc/src/socket.xml b/erts/doc/src/socket.xml index 49c14869bf..ea2fde7dee 100644 --- a/erts/doc/src/socket.xml +++ b/erts/doc/src/socket.xml @@ -405,7 +405,8 @@ - + + Receive a message from a socket. @@ -416,7 +417,7 @@ which may contain the source address (if socket not connected), a list of cmsghdr_recv() (depends on what socket options have been set and what the protocol and platform supports) and - also a set of flags, providing further info about the read .

+ also a set of flags, providing further info about the read.

The BufSz argument basically defines the size of the receive buffer. By setting the value to zero (0), the configured @@ -458,6 +459,15 @@ which also contains the message to send, The MsgHdr may also contain an list of optional cmsghdr_send() (depends on what the protocol and platform supports).

+ +

Unlike the send function, + this one sends one message. + This means that if, for whatever reason, its not possible to send the + message in one go, the function will instead return with the + remaining data ({ok, Remaining}). Thereby leaving it + up to the caller to decide what to do (retry with the remaining data + of give up).

+
-- cgit v1.2.3 From fbd9c5949373c9b6292e56604885822f210f24a2 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 5 Nov 2018 14:48:56 +0100 Subject: [socket-nif|test] Add sendmsg/recvmsg ping-pong test cases Added New ping-pong test cases using the sendmsg and recvmsg functions. OTP-14831 --- erts/emulator/test/socket_SUITE.erl | 293 +++++++++++++++++++++++++++++++++--- 1 file changed, 272 insertions(+), 21 deletions(-) (limited to 'erts') diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index c2b8a8349f..2cbd45a63f 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -18,6 +18,8 @@ %% %CopyrightEnd% %% +%% ts:run(emulator, socket_SUITE, [batch]). + -module(socket_SUITE). -include_lib("common_test/include/ct.hrl"). @@ -91,16 +93,16 @@ traffic_send_and_recv_chunks_tcp6/1, traffic_ping_pong_small_send_and_recv_tcp4/1, traffic_ping_pong_small_send_and_recv_tcp6/1, - %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4/1, - %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6/1, + traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4/1, + traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6/1, traffic_ping_pong_medium_send_and_recv_tcp4/1, traffic_ping_pong_medium_send_and_recv_tcp6/1, - %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4/1, - %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6/1, + traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4/1, + traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6/1, traffic_ping_pong_large_send_and_recv_tcp4/1, - traffic_ping_pong_large_send_and_recv_tcp6/1%, - %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4/1, - %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6/1 + traffic_ping_pong_large_send_and_recv_tcp6/1, + traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4/1, + traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6/1 %% Tickets @@ -261,16 +263,16 @@ traffic_cases() -> traffic_ping_pong_small_send_and_recv_tcp4, traffic_ping_pong_small_send_and_recv_tcp6, - %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, - %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, + traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, + traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, traffic_ping_pong_medium_send_and_recv_tcp4, traffic_ping_pong_medium_send_and_recv_tcp6, - %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, - %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, + traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, + traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, traffic_ping_pong_large_send_and_recv_tcp4, - traffic_ping_pong_large_send_and_recv_tcp6%% , - %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4, - %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6 + traffic_ping_pong_large_send_and_recv_tcp6, + traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4, + traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6 ]. @@ -6749,6 +6751,83 @@ traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) -> end). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv4. + +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(suite) -> + []; +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(doc) -> + []; +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, + fun() -> + ?TT(?SECS(20)), + Send = fun(Sock, Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv6. + +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(suite) -> + []; +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(doc) -> + []; +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(20)), + Send = fun(Sock, Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + end). + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This test case is intended to test that the send and recv functions %% by repeatedly sending a meassage between two entities. @@ -6765,7 +6844,6 @@ traffic_ping_pong_medium_send_and_recv_tcp4(doc) -> traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> tc_try(traffic_ping_pong_medium_send_and_recv_tcp4, fun() -> - %% not_yet_implemented(), ?TT(?SECS(30)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, @@ -6806,6 +6884,83 @@ traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv4. + +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(suite) -> + []; +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(doc) -> + []; +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, + fun() -> + ?TT(?SECS(20)), + Send = fun(Sock, Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv6. + +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(suite) -> + []; +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(doc) -> + []; +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(20)), + Send = fun(Sock, Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + end). + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This test case is intended to test that the send and recv functions %% by repeatedly sending a meassage between two entities. @@ -6863,6 +7018,87 @@ traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) -> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'large' message test case, for IPv4. + +traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(suite) -> + []; +traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(doc) -> + []; +traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4, + fun() -> + ?TT(?SECS(30)), + Send = fun(Sock, Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'large' message test case, for IPv6. + +traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(suite) -> + []; +traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(doc) -> + []; +traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(30)), + Send = fun(Sock, Data) when is_binary(Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr); + (Sock, Data) when is_list(Data) -> + MsgHdr = #{iov => Data}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + end). + + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -define(SMALL, lists:seq(1, 8)). @@ -6959,7 +7195,7 @@ traffic_ping_pong_send_and_receive_tcp(InitState) -> end end}, #{desc => "maybe init buffers", - cmd => fun(#{lsock := LSock, buf_init := BufInit} = State) -> + cmd => fun(#{lsock := LSock, buf_init := BufInit} = _State) -> BufInit(LSock) end}, #{desc => "make listen socket", @@ -7510,11 +7746,9 @@ tpp_tcp_handler_await_terminate(Parent) -> end. tpp_tcp_handler_msg_exchange(Sock, Send, Recv) -> - %% socket:setopt(Sock, otp, debug, true), tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined). tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> - %% if (N =:= 1000) -> socket:setopt(Sock, otp, debug, true); true -> ok end, %% ?SEV_IPRINT("[~w] try receive", [N]), case tpp_tcp_recv_req(Sock, Recv) of {ok, Msg, RecvSz} -> @@ -7633,10 +7867,11 @@ tpp_tcp_client_msg_exchange_loop(Sock, _Send, _Recv, _Msg, end; tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Data, Num, N, Sent, Received, Start) -> - %% d("[~w,~w] try send", [Num,N]), + %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) try send", [Num,N]), case tpp_tcp_send_req(Sock, Send, Data) of {ok, SendSz} -> - %% d("[~w,~w] sent - no try recv", [Num,N]), + %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) sent - " + %% "now try recv", [Num,N]), case tpp_tcp_recv_rep(Sock, Recv) of {ok, NewData, RecvSz} -> tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, @@ -7737,13 +7972,29 @@ tpp_tcp_send_rep(Sock, Send, Data) -> tpp_tcp_send(Sock, Send, Tag, Data) -> DataSz = size(Data), Msg = <>, + tpp_tcp_send_msg(Sock, Send, Msg, 0). + +tpp_tcp_send_msg(Sock, Send, Msg, AccSz) when is_binary(Msg) -> case Send(Sock, Msg) of ok -> - {ok, size(Msg)}; + {ok, AccSz+size(Msg)}; + {ok, Rest} -> % This is an IOVec + RestBin = list_to_binary(Rest), + tpp_tcp_send_msg(Sock, Send, RestBin, AccSz+(size(Msg)-size(RestBin))); {error, _} = ERROR -> ERROR end. + + +%% size_of_data(Data) when is_binary(Data) -> +%% size(Data); +%% size_of_data(Data) when is_list(Data) -> +%% size_of_iovec(Data, 0). +%% size_of_iovec([], Sz) -> +%% Sz; +%% size_of_iovec([B|IOVec], Sz) -> +%% size_of_iovec(IOVec, Sz+size(B)). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -- cgit v1.2.3 From 98d27d647f78e7e0c7154a5d4a007a0c0c2b43d4 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 5 Nov 2018 15:51:51 +0100 Subject: [socket-nif|test] Some minor restructure of the ping-pong cases Some minor restructure of the ping-pong test cases in order to not have duplicate the send and receive fun's. OTP-14831 --- erts/emulator/test/socket_SUITE.erl | 295 +++++++++++++----------------------- 1 file changed, 107 insertions(+), 188 deletions(-) (limited to 'erts') diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 2cbd45a63f..3c20b6422a 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -131,6 +131,10 @@ -define(LIB, socket_test_lib). +-define(PP_SMALL, lists:seq(1, 8)). +-define(PP_MEDIUM, lists:flatten(lists:duplicate(1024, ?PP_SMALL))). +-define(PP_LARGE, lists:flatten(lists:duplicate(1024, ?PP_MEDIUM))). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -6709,17 +6713,15 @@ traffic_ping_pong_small_send_and_recv_tcp4(suite) -> traffic_ping_pong_small_send_and_recv_tcp4(doc) -> []; traffic_ping_pong_small_send_and_recv_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?PP_SMALL), + Num = 100000, tc_try(traffic_ping_pong_small_send_and_recv_tcp4, fun() -> ?TT(?SECS(15)), - Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - %% Recv = fun(Sock) -> socket:recv(Sock, 0, 5000) end, - Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_send_and_recv_tcp(InitState) end). @@ -6737,17 +6739,16 @@ traffic_ping_pong_small_send_and_recv_tcp6(suite) -> traffic_ping_pong_small_send_and_recv_tcp6(doc) -> []; traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?PP_SMALL), + Num = 100000, tc_try(traffic_ping_pong_small_send_and_recv_tcp6, fun() -> not_yet_implemented(), - ?TT(?SECS(30)), - Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, + ?TT(?SECS(15)), InitState = #{domain => inet6, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_send_and_recv_tcp(InitState) end). @@ -6765,27 +6766,15 @@ traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(suite) -> traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(doc) -> []; traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?PP_SMALL), + Num = 100000, tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, fun() -> ?TT(?SECS(20)), - Send = fun(Sock, Data) -> - MsgHdr = #{iov => [Data]}, - socket:sendmsg(Sock, MsgHdr) - end, - Recv = fun(Sock, Sz) -> - case socket:recvmsg(Sock, Sz, 0) of - {ok, #{addr := undefined, - iov := [Data]}} -> - {ok, Data}; - {error, _} = ERROR -> - ERROR - end - end, InitState = #{domain => inet, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) end). @@ -6803,28 +6792,16 @@ traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(suite) -> traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(doc) -> []; traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?PP_SMALL), + Num = 100000, tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, fun() -> not_yet_implemented(), ?TT(?SECS(20)), - Send = fun(Sock, Data) -> - MsgHdr = #{iov => [Data]}, - socket:sendmsg(Sock, MsgHdr) - end, - Recv = fun(Sock, Sz) -> - case socket:recvmsg(Sock, Sz, 0) of - {ok, #{addr := undefined, - iov := [Data]}} -> - {ok, Data}; - {error, _} = ERROR -> - ERROR - end - end, - InitState = #{domain => inet6, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + InitState = #{domain => inet, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) end). @@ -6842,16 +6819,15 @@ traffic_ping_pong_medium_send_and_recv_tcp4(suite) -> traffic_ping_pong_medium_send_and_recv_tcp4(doc) -> []; traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?PP_MEDIUM), + Num = 100000, tc_try(traffic_ping_pong_medium_send_and_recv_tcp4, fun() -> ?TT(?SECS(30)), - Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_send_and_recv_tcp(InitState) end). @@ -6869,17 +6845,16 @@ traffic_ping_pong_medium_send_and_recv_tcp6(suite) -> traffic_ping_pong_medium_send_and_recv_tcp6(doc) -> []; traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?PP_MEDIUM), + Num = 100000, tc_try(traffic_ping_pong_medium_send_and_recv_tcp6, fun() -> not_yet_implemented(), ?TT(?SECS(30)), - Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet6, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_send_and_recv_tcp(InitState) end). @@ -6898,27 +6873,15 @@ traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(suite) -> traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(doc) -> []; traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?PP_MEDIUM), + Num = 100000, tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, fun() -> - ?TT(?SECS(20)), - Send = fun(Sock, Data) -> - MsgHdr = #{iov => [Data]}, - socket:sendmsg(Sock, MsgHdr) - end, - Recv = fun(Sock, Sz) -> - case socket:recvmsg(Sock, Sz, 0) of - {ok, #{addr := undefined, - iov := [Data]}} -> - {ok, Data}; - {error, _} = ERROR -> - ERROR - end - end, + ?TT(?SECS(30)), InitState = #{domain => inet, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) end). @@ -6936,28 +6899,16 @@ traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(suite) -> traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(doc) -> []; traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?PP_MEDIUM), + Num = 100000, tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, fun() -> not_yet_implemented(), ?TT(?SECS(20)), - Send = fun(Sock, Data) -> - MsgHdr = #{iov => [Data]}, - socket:sendmsg(Sock, MsgHdr) - end, - Recv = fun(Sock, Sz) -> - case socket:recvmsg(Sock, Sz, 0) of - {ok, #{addr := undefined, - iov := [Data]}} -> - {ok, Data}; - {error, _} = ERROR -> - ERROR - end - end, - InitState = #{domain => inet6, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + InitState = #{domain => ine6, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) end). @@ -6975,17 +6926,15 @@ traffic_ping_pong_large_send_and_recv_tcp4(suite) -> traffic_ping_pong_large_send_and_recv_tcp4(doc) -> []; traffic_ping_pong_large_send_and_recv_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?PP_LARGE), + Num = 1000, tc_try(traffic_ping_pong_large_send_and_recv_tcp4, fun() -> - %% not_yet_implemented(), - ?TT(?MINS(5)), - Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, + ?TT(?SECS(45)), InitState = #{domain => inet, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_send_and_recv_tcp(InitState) end). @@ -7003,17 +6952,16 @@ traffic_ping_pong_large_send_and_recv_tcp6(suite) -> traffic_ping_pong_large_send_and_recv_tcp6(doc) -> []; traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?PP_LARGE), + Num = 1000, tc_try(traffic_ping_pong_large_send_and_recv_tcp6, fun() -> not_yet_implemented(), - ?TT(?MINS(5)), - Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, + ?TT(?SECS(45)), InitState = #{domain => inet6, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_send_and_recv_tcp(InitState) end). @@ -7032,27 +6980,15 @@ traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(suite) -> traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(doc) -> []; traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?PP_LARGE), + Num = 1000, tc_try(traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4, fun() -> ?TT(?SECS(30)), - Send = fun(Sock, Data) -> - MsgHdr = #{iov => [Data]}, - socket:sendmsg(Sock, MsgHdr) - end, - Recv = fun(Sock, Sz) -> - case socket:recvmsg(Sock, Sz, 0) of - {ok, #{addr := undefined, - iov := [Data]}} -> - {ok, Data}; - {error, _} = ERROR -> - ERROR - end - end, InitState = #{domain => inet, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) end). @@ -7070,66 +7006,55 @@ traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(suite) -> traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(doc) -> []; traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?PP_LARGE), + Num = 1000, tc_try(traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6, fun() -> not_yet_implemented(), ?TT(?SECS(30)), - Send = fun(Sock, Data) when is_binary(Data) -> - MsgHdr = #{iov => [Data]}, - socket:sendmsg(Sock, MsgHdr); - (Sock, Data) when is_list(Data) -> - MsgHdr = #{iov => Data}, - socket:sendmsg(Sock, MsgHdr) - end, - Recv = fun(Sock, Sz) -> - case socket:recvmsg(Sock, Sz, 0) of - {ok, #{addr := undefined, - iov := [Data]}} -> - {ok, Data}; - {error, _} = ERROR -> - ERROR - end - end, InitState = #{domain => inet6, - send => Send, % Send function - recv => Recv % Receive function - }, - ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) end). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --define(SMALL, lists:seq(1, 8)). --define(MEDIUM, lists:flatten(lists:duplicate(1024, ?SMALL))). --define(LARGE, lists:flatten(lists:duplicate(1024, ?MEDIUM))). - -traffic_ping_pong_small_send_and_receive_tcp(InitState) -> - Msg = l2b(?SMALL), - Num = 100000, - Fun = fun(_) -> ok end, %% Fun to update the buffers: Not needed here - traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, - num => Num, - buf_init => Fun}). +traffic_ping_pong_send_and_recv_tcp(InitState) -> + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, + InitState2 = InitState#{send => Send, % Send function + recv => Recv % Receive function + }, + traffic_ping_pong_send_and_receive_tcp(InitState2). + +traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) -> + Send = fun(Sock, Data) when is_binary(Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr); + (Sock, Data) when is_list(Data) -> %% We assume iovec... + MsgHdr = #{iov => Data}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + InitState2 = InitState#{send => Send, % Send function + recv => Recv % Receive function + }, + traffic_ping_pong_send_and_receive_tcp(InitState2). -traffic_ping_pong_medium_send_and_receive_tcp(InitState) -> - Msg = l2b(?MEDIUM), - Num = 100000, - Fun = fun(_) -> ok end, %% Fun to update the buffers: MAYBE needed here - traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, - num => Num, - buf_init => Fun}). -traffic_ping_pong_large_send_and_receive_tcp(InitState) -> - Msg = l2b(?LARGE), - Num = 1000, +traffic_ping_pong_send_and_receive_tcp(#{msg := Msg} = InitState) -> Fun = fun(Sock) -> - %% ?SEV_IPRINT("Socket buffers (before): " - %% "~n Rcv: ~p" - %% "~n Snd: ~p", - %% [socket:getopt(Sock, socket, rcvbuf), - %% socket:getopt(Sock, socket, sndbuf)]), {ok, RcvSz} = socket:getopt(Sock, socket, rcvbuf), if (RcvSz < size(Msg)) -> ok = socket:setopt(Sock, socket, rcvbuf, 1024+size(Msg)); @@ -7142,19 +7067,11 @@ traffic_ping_pong_large_send_and_receive_tcp(InitState) -> true -> ok end, - ok = socket:setopt(Sock, otp, rcvbuf, 8*1024), - %% ?SEV_IPRINT("Socket buffers (after): " - %% "~n Rcv: ~p" - %% "~n Snd: ~p", - %% [socket:getopt(Sock, socket, rcvbuf), - %% socket:getopt(Sock, socket, sndbuf)]), - ok - end, %% Fun to update the buffers: NEEDED here!!! - traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, - num => Num, - buf_init => Fun}). - -traffic_ping_pong_send_and_receive_tcp(InitState) -> + ok = socket:setopt(Sock, otp, rcvbuf, 8*1024) + end, + traffic_ping_pong_send_and_receive_tcp2(InitState#{buf_init => Fun}). + +traffic_ping_pong_send_and_receive_tcp2(InitState) -> ServerSeq = [ %% *** Wait for start order part *** @@ -7705,6 +7622,7 @@ tpp_tcp_handler(Parent) -> Result = tpp_tcp_handler_msg_exchange(Sock, Send, Recv), tpp_tcp_handler_announce_ready(Parent, recv, Result), Reason = tpp_tcp_handler_await_terminate(Parent), + ?SEV_IPRINT("terminating"), exit(Reason). tpp_tcp_handler_init(Parent) -> @@ -7806,6 +7724,7 @@ tpp_tcp_client(Parent, GL) -> tpp_tcp_client_announce_ready(Parent, send, Result), Reason = tpp_tcp_client_await_terminate(Parent), tpp_tcp_client_sock_close(Sock), + ?SEV_IPRINT("terminating"), exit(Reason). tpp_tcp_client_init(Parent, GL) -> -- cgit v1.2.3 From c6e94046261a608ee536c18cf631e33191e71bb1 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Tue, 6 Nov 2018 15:18:31 +0100 Subject: [socket-nif] Badly handled socket close for recvfrom and recvmsg When type = dgram, the functions recvfrom and recvmsg did not properly handle socket close, cuaing the caller to hang indefinitely. OTP-14831 --- erts/emulator/nifs/common/socket_nif.c | 111 +++++++++++++++++++++------------ 1 file changed, 71 insertions(+), 40 deletions(-) (limited to 'erts') diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 70a969e867..f657da3ace 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -13825,6 +13825,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, /* +++ Error handling +++ */ if (saveErrno == ECONNRESET) { + ERL_NIF_TERM res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -13842,12 +13843,14 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; + recv_error_current_reader(env, descP, res); + SELECT(env, descP->sock, (ERL_NIF_SELECT_STOP), descP, NULL, recvRef); - return esock_make_error(env, atom_closed); + return res; } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { @@ -13862,12 +13865,15 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, return esock_make_error(env, esock_atom_eagain); } else { + ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); SSDBG( descP, ("SOCKET", "recvfrom_check_result -> errno: %d\r\n", saveErrno) ); - return esock_make_error_errno(env, saveErrno); + recv_error_current_reader(env, descP, res); + + return res; } } else { @@ -13894,6 +13900,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, data = MKSBIN(env, data, 0, read); } + recv_update_current_reader(env, descP); + return esock_make_ok2(env, MKT2(env, eSockAddr, data)); } @@ -13957,6 +13965,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, /* +++ Error handling +++ */ if (saveErrno == ECONNRESET) { + ERL_NIF_TERM res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -13974,12 +13983,14 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; + recv_error_current_reader(env, descP, res); + SELECT(env, descP->sock, (ERL_NIF_SELECT_STOP), descP, NULL, recvRef); - return esock_make_error(env, atom_closed); + return res; } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { @@ -13995,12 +14006,15 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, return esock_make_error(env, esock_atom_eagain); } else { + ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); SSDBG( descP, ("SOCKET", "recvmsg_check_result -> errno: %d\r\n", saveErrno) ); - return esock_make_error_errno(env, saveErrno); + recv_error_current_reader(env, descP, res); + + return res; } } else { @@ -14029,6 +14043,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, "recvmsg_check_result -> " "(msghdr) encode failed: %s\r\n", xres) ); + recv_update_current_reader(env, descP); + return esock_make_error_str(env, xres); } else { @@ -14037,6 +14053,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, "recvmsg_check_result -> " "(msghdr) encode ok: %T\r\n", eMsgHdr) ); + recv_update_current_reader(env, descP); + return esock_make_ok2(env, eMsgHdr); } @@ -16913,18 +16931,22 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> handle current writer\r\n") ); if (!compare_pids(env, &descP->closerPid, - &descP->currentWriter.pid) && - send_msg_nif_abort(env, - descP->currentWriter.ref, - atom_closed, - &descP->currentWriter.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current writer %T\r\n", - descP->currentWriter.ref, - descP->currentWriter.pid); + &descP->currentWriter.pid)) { + SSDBG( descP, ("SOCKET", "socket_stop -> " + "send abort message to current writer %T\r\n", + descP->currentWriter.pid) ); + if (send_msg_nif_abort(env, + descP->currentWriter.ref, + atom_closed, + &descP->currentWriter.pid) != NULL) { + /* Shall we really do this? + * This happens if the controlling process has been killed! + */ + esock_warning_msg("Failed sending abort (%T) message to " + "current writer %T\r\n", + descP->currentWriter.ref, + descP->currentWriter.pid); + } } /* And also deal with the waiting writers (in the same way) */ @@ -16948,18 +16970,22 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> handle current reader\r\n") ); if (!compare_pids(env, &descP->closerPid, - &descP->currentReader.pid) && - send_msg_nif_abort(env, - descP->currentReader.ref, - atom_closed, - &descP->currentReader.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current reader %T\r\n", - descP->currentReader.ref, - descP->currentReader.pid); + &descP->currentReader.pid)) { + SSDBG( descP, ("SOCKET", "socket_stop -> " + "send abort message to current reader %T\r\n", + descP->currentReader.pid) ); + if (send_msg_nif_abort(env, + descP->currentReader.ref, + atom_closed, + &descP->currentReader.pid) != NULL) { + /* Shall we really do this? + * This happens if the controlling process has been killed! + */ + esock_warning_msg("Failed sending abort (%T) message to " + "current reader %T\r\n", + descP->currentReader.ref, + descP->currentReader.pid); + } } /* And also deal with the waiting readers (in the same way) */ @@ -16982,18 +17008,22 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> handle current acceptor\r\n") ); if (!compare_pids(env, &descP->closerPid, - &descP->currentAcceptor.pid) && - send_msg_nif_abort(env, - descP->currentAcceptor.ref, - atom_closed, - &descP->currentAcceptor.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current acceptor %T\r\n", - descP->currentAcceptor.ref, - descP->currentAcceptor.pid); + &descP->currentAcceptor.pid)) { + SSDBG( descP, ("SOCKET", "socket_stop -> " + "send abort message to current acceptor %T\r\n", + descP->currentWriter.pid) ); + if (send_msg_nif_abort(env, + descP->currentAcceptor.ref, + atom_closed, + &descP->currentAcceptor.pid) != NULL) { + /* Shall we really do this? + * This happens if the controlling process has been killed! + */ + esock_warning_msg("Failed sending abort (%T) message to " + "current acceptor %T\r\n", + descP->currentAcceptor.ref, + descP->currentAcceptor.pid); + } } /* And also deal with the waiting acceptors (in the same way) */ @@ -17105,6 +17135,7 @@ void inform_waiting_procs(ErlNifEnv* env, currentP->data.ref, reason, ¤tP->data.pid)) ); + DEMONP("inform_waiting_procs -> current 'request'", env, descP, ¤tP->data.mon); nextP = currentP->nextP; -- cgit v1.2.3 From e3e607ac76dc308da3ac24364477d48da0dc23bd Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Tue, 6 Nov 2018 15:22:09 +0100 Subject: [socket-nif|test] Add UDP ping-pong test cases Added ping-pong test cases for UDP, small and medium, using the sendto/recvfrom and sendmsg/recvmsg functions. OTP-14831 --- erts/emulator/test/socket_SUITE.erl | 1451 ++++++++++++++++++++++++++++++----- erts/preloaded/ebin/socket.beam | Bin 69380 -> 69392 bytes erts/preloaded/src/socket.erl | 8 + 3 files changed, 1272 insertions(+), 187 deletions(-) (limited to 'erts') diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 3c20b6422a..4a381843a9 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -91,19 +91,30 @@ %% Traffic traffic_send_and_recv_chunks_tcp4/1, traffic_send_and_recv_chunks_tcp6/1, + traffic_ping_pong_small_send_and_recv_tcp4/1, traffic_ping_pong_small_send_and_recv_tcp6/1, - traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4/1, - traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6/1, traffic_ping_pong_medium_send_and_recv_tcp4/1, traffic_ping_pong_medium_send_and_recv_tcp6/1, - traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4/1, - traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6/1, traffic_ping_pong_large_send_and_recv_tcp4/1, traffic_ping_pong_large_send_and_recv_tcp6/1, + + traffic_ping_pong_small_sendto_and_recvfrom_udp4/1, + traffic_ping_pong_small_sendto_and_recvfrom_udp6/1, + traffic_ping_pong_medium_sendto_and_recvfrom_udp4/1, + traffic_ping_pong_medium_sendto_and_recvfrom_udp6/1, + + traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4/1, + traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6/1, + traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4/1, + traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6/1, traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4/1, - traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6/1 + traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6/1, + traffic_ping_pong_small_sendmsg_and_recvmsg_udp4/1, + traffic_ping_pong_small_sendmsg_and_recvmsg_udp6/1, + traffic_ping_pong_medium_sendmsg_and_recvmsg_udp4/1, + traffic_ping_pong_medium_sendmsg_and_recvmsg_udp6/1 %% Tickets ]). @@ -131,9 +142,13 @@ -define(LIB, socket_test_lib). --define(PP_SMALL, lists:seq(1, 8)). --define(PP_MEDIUM, lists:flatten(lists:duplicate(1024, ?PP_SMALL))). --define(PP_LARGE, lists:flatten(lists:duplicate(1024, ?PP_MEDIUM))). +-define(TPP_SMALL, lists:seq(1, 8)). +-define(TPP_MEDIUM, lists:flatten(lists:duplicate(1024, ?TPP_SMALL))). +-define(TPP_LARGE, lists:flatten(lists:duplicate(1024, ?TPP_MEDIUM))). + +-define(TPP_SMALL_NUM, 100000). +-define(TPP_MEDIUM_NUM, 100000). +-define(TPP_LARGE_NUM, 1000). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -267,16 +282,27 @@ traffic_cases() -> traffic_ping_pong_small_send_and_recv_tcp4, traffic_ping_pong_small_send_and_recv_tcp6, - traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, - traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, traffic_ping_pong_medium_send_and_recv_tcp4, traffic_ping_pong_medium_send_and_recv_tcp6, - traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, - traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, traffic_ping_pong_large_send_and_recv_tcp4, traffic_ping_pong_large_send_and_recv_tcp6, + + traffic_ping_pong_small_sendto_and_recvfrom_udp4, + traffic_ping_pong_small_sendto_and_recvfrom_udp6, + traffic_ping_pong_medium_sendto_and_recvfrom_udp4, + traffic_ping_pong_medium_sendto_and_recvfrom_udp6, + + traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, + traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, + traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, + traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4, - traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6 + traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6, + + traffic_ping_pong_small_sendmsg_and_recvmsg_udp4, + traffic_ping_pong_small_sendmsg_and_recvmsg_udp6, + traffic_ping_pong_medium_sendmsg_and_recvmsg_udp4, + traffic_ping_pong_medium_sendmsg_and_recvmsg_udp6 ]. @@ -6713,8 +6739,8 @@ traffic_ping_pong_small_send_and_recv_tcp4(suite) -> traffic_ping_pong_small_send_and_recv_tcp4(doc) -> []; traffic_ping_pong_small_send_and_recv_tcp4(_Config) when is_list(_Config) -> - Msg = l2b(?PP_SMALL), - Num = 100000, + Msg = l2b(?TPP_SMALL), + Num = ?TPP_SMALL_NUM, tc_try(traffic_ping_pong_small_send_and_recv_tcp4, fun() -> ?TT(?SECS(15)), @@ -6739,8 +6765,8 @@ traffic_ping_pong_small_send_and_recv_tcp6(suite) -> traffic_ping_pong_small_send_and_recv_tcp6(doc) -> []; traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) -> - Msg = l2b(?PP_SMALL), - Num = 100000, + Msg = l2b(?TPP_SMALL), + Num = ?TPP_SMALL_NUM, tc_try(traffic_ping_pong_small_send_and_recv_tcp6, fun() -> not_yet_implemented(), @@ -6753,58 +6779,59 @@ traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% This test case is intended to test that the sendmsg and recvmsg -%% functions by repeatedly sending a meassage between two entities. +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. %% The same basic test case is used for three different message sizes; %% small (8 bytes), medium (8K) and large (8M). %% The message is sent from A to B and then back again. This is %% repeated a set number of times (more times the small the message). -%% This is the 'small' message test case, for IPv4. +%% This is the 'medium' message test case, for IPv4. -traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(suite) -> +traffic_ping_pong_medium_send_and_recv_tcp4(suite) -> []; -traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(doc) -> +traffic_ping_pong_medium_send_and_recv_tcp4(doc) -> []; -traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> - Msg = l2b(?PP_SMALL), - Num = 100000, - tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, +traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_MEDIUM), + Num = ?TPP_MEDIUM_NUM, + tc_try(traffic_ping_pong_medium_send_and_recv_tcp4, fun() -> - ?TT(?SECS(20)), + ?TT(?SECS(30)), InitState = #{domain => inet, msg => Msg, num => Num}, - ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) + ok = traffic_ping_pong_send_and_recv_tcp(InitState) end). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% This test case is intended to test that the sendmsg and recvmsg functions +%% This test case is intended to test that the send and recv functions %% by repeatedly sending a meassage between two entities. %% The same basic test case is used for three different message sizes; %% small (8 bytes), medium (8K) and large (8M). %% The message is sent from A to B and then back again. This is %% repeated a set number of times (more times the small the message). -%% This is the 'small' message test case, for IPv6. +%% This is the 'medium' message test case, for IPv6. -traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(suite) -> +traffic_ping_pong_medium_send_and_recv_tcp6(suite) -> []; -traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(doc) -> +traffic_ping_pong_medium_send_and_recv_tcp6(doc) -> []; -traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> - Msg = l2b(?PP_SMALL), - Num = 100000, - tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, +traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_MEDIUM), + Num = ?TPP_MEDIUM_NUM, + tc_try(traffic_ping_pong_medium_send_and_recv_tcp6, fun() -> not_yet_implemented(), - ?TT(?SECS(20)), - InitState = #{domain => inet, + ?TT(?SECS(30)), + InitState = #{domain => inet6, msg => Msg, num => Num}, - ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) + ok = traffic_ping_pong_send_and_recv_tcp(InitState) end). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This test case is intended to test that the send and recv functions %% by repeatedly sending a meassage between two entities. @@ -6812,18 +6839,18 @@ traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) %% small (8 bytes), medium (8K) and large (8M). %% The message is sent from A to B and then back again. This is %% repeated a set number of times (more times the small the message). -%% This is the 'medium' message test case, for IPv4. +%% This is the 'large' message test case, for IPv4. -traffic_ping_pong_medium_send_and_recv_tcp4(suite) -> +traffic_ping_pong_large_send_and_recv_tcp4(suite) -> []; -traffic_ping_pong_medium_send_and_recv_tcp4(doc) -> +traffic_ping_pong_large_send_and_recv_tcp4(doc) -> []; -traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> - Msg = l2b(?PP_MEDIUM), - Num = 100000, - tc_try(traffic_ping_pong_medium_send_and_recv_tcp4, +traffic_ping_pong_large_send_and_recv_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_LARGE), + Num = ?TPP_LARGE_NUM, + tc_try(traffic_ping_pong_large_send_and_recv_tcp4, fun() -> - ?TT(?SECS(30)), + ?TT(?SECS(45)), InitState = #{domain => inet, msg => Msg, num => Num}, @@ -6838,19 +6865,19 @@ traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> %% small (8 bytes), medium (8K) and large (8M). %% The message is sent from A to B and then back again. This is %% repeated a set number of times (more times the small the message). -%% This is the 'medium' message test case, for IPv6. +%% This is the 'large' message test case, for IPv6. -traffic_ping_pong_medium_send_and_recv_tcp6(suite) -> +traffic_ping_pong_large_send_and_recv_tcp6(suite) -> []; -traffic_ping_pong_medium_send_and_recv_tcp6(doc) -> +traffic_ping_pong_large_send_and_recv_tcp6(doc) -> []; -traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> - Msg = l2b(?PP_MEDIUM), - Num = 100000, - tc_try(traffic_ping_pong_medium_send_and_recv_tcp6, +traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_LARGE), + Num = ?TPP_LARGE_NUM, + tc_try(traffic_ping_pong_large_send_and_recv_tcp6, fun() -> not_yet_implemented(), - ?TT(?SECS(30)), + ?TT(?SECS(45)), InitState = #{domain => inet6, msg => Msg, num => Num}, @@ -6859,6 +6886,112 @@ traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendto and recvfrom +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for two different message sizes; +%% small (8 bytes) and medium (8K). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv4. + +traffic_ping_pong_small_sendto_and_recvfrom_udp4(suite) -> + []; +traffic_ping_pong_small_sendto_and_recvfrom_udp4(doc) -> + []; +traffic_ping_pong_small_sendto_and_recvfrom_udp4(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_SMALL), + Num = ?TPP_SMALL_NUM, + tc_try(traffic_ping_pong_small_sendto_and_recvfrom_udp4, + fun() -> + ?TT(?SECS(45)), + InitState = #{domain => inet, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendto_and_recvfrom_udp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendto and recvfrom +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for two different message sizes; +%% small (8 bytes) and medium (8K). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv6. + +traffic_ping_pong_small_sendto_and_recvfrom_udp6(suite) -> + []; +traffic_ping_pong_small_sendto_and_recvfrom_udp6(doc) -> + []; +traffic_ping_pong_small_sendto_and_recvfrom_udp6(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_SMALL), + Num = ?TPP_SMALL_NUM, + tc_try(traffic_ping_pong_small_sendto_and_recvfrom_udp6, + fun() -> + ?TT(?SECS(45)), + InitState = #{domain => inet, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendto_and_recvfrom_udp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendto and recvfrom +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for two different message sizes; +%% small (8 bytes) and medium (8K). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv4. + +traffic_ping_pong_medium_sendto_and_recvfrom_udp4(suite) -> + []; +traffic_ping_pong_medium_sendto_and_recvfrom_udp4(doc) -> + []; +traffic_ping_pong_medium_sendto_and_recvfrom_udp4(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_MEDIUM), + Num = ?TPP_MEDIUM_NUM, + tc_try(traffic_ping_pong_medium_sendto_and_recvfrom_udp4, + fun() -> + ?TT(?SECS(45)), + InitState = #{domain => inet, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendto_and_recvfrom_udp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendto and recvfrom +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for two different message sizes; +%% small (8 bytes) and medium (8K). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv6. + +traffic_ping_pong_medium_sendto_and_recvfrom_udp6(suite) -> + []; +traffic_ping_pong_medium_sendto_and_recvfrom_udp6(doc) -> + []; +traffic_ping_pong_medium_sendto_and_recvfrom_udp6(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_MEDIUM), + Num = ?TPP_MEDIUM_NUM, + tc_try(traffic_ping_pong_medium_sendto_and_recvfrom_udp6, + fun() -> + ?TT(?SECS(45)), + InitState = #{domain => inet, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendto_and_recvfrom_udp(InitState) + end). + + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This test case is intended to test that the sendmsg and recvmsg %% functions by repeatedly sending a meassage between two entities. @@ -6866,18 +6999,18 @@ traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> %% small (8 bytes), medium (8K) and large (8M). %% The message is sent from A to B and then back again. This is %% repeated a set number of times (more times the small the message). -%% This is the 'medium' message test case, for IPv4. +%% This is the 'small' message test case, for IPv4. -traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(suite) -> +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(suite) -> []; -traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(doc) -> +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(doc) -> []; -traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> - Msg = l2b(?PP_MEDIUM), - Num = 100000, - tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_SMALL), + Num = ?TPP_SMALL_NUM, + tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, fun() -> - ?TT(?SECS(30)), + ?TT(?SECS(20)), InitState = #{domain => inet, msg => Msg, num => Num}, @@ -6892,20 +7025,20 @@ traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) %% small (8 bytes), medium (8K) and large (8M). %% The message is sent from A to B and then back again. This is %% repeated a set number of times (more times the small the message). -%% This is the 'medium' message test case, for IPv6. +%% This is the 'small' message test case, for IPv6. -traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(suite) -> +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(suite) -> []; -traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(doc) -> +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(doc) -> []; -traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> - Msg = l2b(?PP_MEDIUM), - Num = 100000, - tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, +traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_SMALL), + Num = ?TPP_SMALL_NUM, + tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, fun() -> not_yet_implemented(), ?TT(?SECS(20)), - InitState = #{domain => ine6, + InitState = #{domain => inet, msg => Msg, num => Num}, ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) @@ -6913,59 +7046,58 @@ traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% This test case is intended to test that the send and recv functions -%% by repeatedly sending a meassage between two entities. +%% This test case is intended to test that the sendmsg and recvmsg +%% functions by repeatedly sending a meassage between two entities. %% The same basic test case is used for three different message sizes; %% small (8 bytes), medium (8K) and large (8M). %% The message is sent from A to B and then back again. This is %% repeated a set number of times (more times the small the message). -%% This is the 'large' message test case, for IPv4. +%% This is the 'medium' message test case, for IPv4. -traffic_ping_pong_large_send_and_recv_tcp4(suite) -> +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(suite) -> []; -traffic_ping_pong_large_send_and_recv_tcp4(doc) -> +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(doc) -> []; -traffic_ping_pong_large_send_and_recv_tcp4(_Config) when is_list(_Config) -> - Msg = l2b(?PP_LARGE), - Num = 1000, - tc_try(traffic_ping_pong_large_send_and_recv_tcp4, +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_MEDIUM), + Num = ?TPP_MEDIUM_NUM, + tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, fun() -> - ?TT(?SECS(45)), + ?TT(?SECS(30)), InitState = #{domain => inet, msg => Msg, num => Num}, - ok = traffic_ping_pong_send_and_recv_tcp(InitState) + ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) end). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% This test case is intended to test that the send and recv functions +%% This test case is intended to test that the sendmsg and recvmsg functions %% by repeatedly sending a meassage between two entities. %% The same basic test case is used for three different message sizes; %% small (8 bytes), medium (8K) and large (8M). %% The message is sent from A to B and then back again. This is %% repeated a set number of times (more times the small the message). -%% This is the 'large' message test case, for IPv6. +%% This is the 'medium' message test case, for IPv6. -traffic_ping_pong_large_send_and_recv_tcp6(suite) -> +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(suite) -> []; -traffic_ping_pong_large_send_and_recv_tcp6(doc) -> +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(doc) -> []; -traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) -> - Msg = l2b(?PP_LARGE), - Num = 1000, - tc_try(traffic_ping_pong_large_send_and_recv_tcp6, +traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_MEDIUM), + Num = ?TPP_MEDIUM_NUM, + tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, fun() -> not_yet_implemented(), - ?TT(?SECS(45)), - InitState = #{domain => inet6, + ?TT(?SECS(20)), + InitState = #{domain => ine6, msg => Msg, num => Num}, - ok = traffic_ping_pong_send_and_recv_tcp(InitState) + ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) end). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This test case is intended to test that the sendmsg and recvmsg %% functions by repeatedly sending a meassage between two entities. @@ -6980,8 +7112,8 @@ traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(suite) -> traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(doc) -> []; traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> - Msg = l2b(?PP_LARGE), - Num = 1000, + Msg = l2b(?TPP_LARGE), + Num = ?TPP_LARGE_NUM, tc_try(traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4, fun() -> ?TT(?SECS(30)), @@ -7006,8 +7138,8 @@ traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(suite) -> traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(doc) -> []; traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) -> - Msg = l2b(?PP_LARGE), - Num = 1000, + Msg = l2b(?TPP_LARGE), + Num = ?TPP_LARGE_NUM, tc_try(traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6, fun() -> not_yet_implemented(), @@ -7021,40 +7153,149 @@ traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes) and medium (8K). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv4. -traffic_ping_pong_send_and_recv_tcp(InitState) -> - Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, - InitState2 = InitState#{send => Send, % Send function - recv => Recv % Receive function - }, - traffic_ping_pong_send_and_receive_tcp(InitState2). - -traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) -> - Send = fun(Sock, Data) when is_binary(Data) -> - MsgHdr = #{iov => [Data]}, - socket:sendmsg(Sock, MsgHdr); - (Sock, Data) when is_list(Data) -> %% We assume iovec... - MsgHdr = #{iov => Data}, - socket:sendmsg(Sock, MsgHdr) - end, - Recv = fun(Sock, Sz) -> - case socket:recvmsg(Sock, Sz, 0) of - {ok, #{addr := undefined, - iov := [Data]}} -> - {ok, Data}; - {error, _} = ERROR -> - ERROR - end - end, - InitState2 = InitState#{send => Send, % Send function - recv => Recv % Receive function - }, - traffic_ping_pong_send_and_receive_tcp(InitState2). +traffic_ping_pong_small_sendmsg_and_recvmsg_udp4(suite) -> + []; +traffic_ping_pong_small_sendmsg_and_recvmsg_udp4(doc) -> + []; +traffic_ping_pong_small_sendmsg_and_recvmsg_udp4(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_SMALL), + Num = ?TPP_SMALL_NUM, + tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_udp4, + fun() -> + ?TT(?SECS(20)), + InitState = #{domain => inet, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_udp(InitState) + end). -traffic_ping_pong_send_and_receive_tcp(#{msg := Msg} = InitState) -> - Fun = fun(Sock) -> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes) and medium (8K). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv6. + +traffic_ping_pong_small_sendmsg_and_recvmsg_udp6(suite) -> + []; +traffic_ping_pong_small_sendmsg_and_recvmsg_udp6(doc) -> + []; +traffic_ping_pong_small_sendmsg_and_recvmsg_udp6(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_SMALL), + Num = ?TPP_SMALL_NUM, + tc_try(traffic_ping_pong_small_sendmsg_and_recvmsg_udp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(20)), + InitState = #{domain => inet, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_udp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes) and medium (8K). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv4. + +traffic_ping_pong_medium_sendmsg_and_recvmsg_udp4(suite) -> + []; +traffic_ping_pong_medium_sendmsg_and_recvmsg_udp4(doc) -> + []; +traffic_ping_pong_medium_sendmsg_and_recvmsg_udp4(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_MEDIUM), + Num = ?TPP_MEDIUM_NUM, + tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_udp4, + fun() -> + ?TT(?SECS(30)), + InitState = #{domain => inet, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_udp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the sendmsg and recvmsg +%% functions by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes) and medium (8K). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv6. + +traffic_ping_pong_medium_sendmsg_and_recvmsg_udp6(suite) -> + []; +traffic_ping_pong_medium_sendmsg_and_recvmsg_udp6(doc) -> + []; +traffic_ping_pong_medium_sendmsg_and_recvmsg_udp6(_Config) when is_list(_Config) -> + Msg = l2b(?TPP_MEDIUM), + Num = ?TPP_MEDIUM_NUM, + tc_try(traffic_ping_pong_medium_sendmsg_and_recvmsg_udp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(20)), + InitState = #{domain => ine6, + msg => Msg, + num => Num}, + ok = traffic_ping_pong_sendmsg_and_recvmsg_udp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Ping-Pong for TCP + +traffic_ping_pong_send_and_recv_tcp(InitState) -> + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, + InitState2 = InitState#{send => Send, % Send function + recv => Recv % Receive function + }, + traffic_ping_pong_send_and_receive_tcp(InitState2). + +traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) -> + Send = fun(Sock, Data) when is_binary(Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr); + (Sock, Data) when is_list(Data) -> %% We assume iovec... + MsgHdr = #{iov => Data}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + InitState2 = InitState#{send => Send, % Send function + recv => Recv % Receive function + }, + traffic_ping_pong_send_and_receive_tcp(InitState2). + + +traffic_ping_pong_send_and_receive_tcp(#{msg := Msg} = InitState) -> + Fun = fun(Sock) -> {ok, RcvSz} = socket:getopt(Sock, socket, rcvbuf), if (RcvSz < size(Msg)) -> ok = socket:setopt(Sock, socket, rcvbuf, 1024+size(Msg)); @@ -7918,58 +8159,894 @@ tpp_tcp_send_msg(Sock, Send, Msg, AccSz) when is_binary(Msg) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% This gets the local address (not 127.0...) -%% We should really implement this using the (new) net module, -%% but until that gets the necessary functionality... -which_local_addr(Domain) -> - case inet:getifaddrs() of - {ok, IFL} -> - which_addr(Domain, IFL); - {error, Reason} -> - ?FAIL({inet, getifaddrs, Reason}) - end. +%% Ping-Pong for UDP -which_addr(_Domain, []) -> - ?FAIL(no_address); -which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> - which_addr2(Domain, IFO); -which_addr(Domain, [_|IFL]) -> - which_addr(Domain, IFL). +traffic_ping_pong_sendto_and_recvfrom_udp(InitState) -> + Send = fun(Sock, Data, Dest) -> + socket:sendto(Sock, Data, Dest) + end, + Recv = fun(Sock, Sz) -> + socket:recvfrom(Sock, Sz) + end, + InitState2 = InitState#{send => Send, % Send function + recv => Recv % Receive function + }, + traffic_ping_pong_send_and_receive_udp(InitState2). -which_addr2(_Domain, []) -> - ?FAIL(no_address); -which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> - Addr; -which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> - Addr; -which_addr2(Domain, [_|IFO]) -> - which_addr2(Domain, IFO). - +traffic_ping_pong_sendmsg_and_recvmsg_udp(InitState) -> + Send = fun(Sock, Data, Dest) when is_binary(Data) -> + MsgHdr = #{addr => Dest, iov => [Data]}, + socket:sendmsg(Sock, MsgHdr); + (Sock, Data, Dest) when is_list(Data) -> %% We assume iovec... + MsgHdr = #{addr => Dest, iov => Data}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock, Sz) -> + case socket:recvmsg(Sock, Sz, 0) of + {ok, #{addr := Source, + iov := [Data]}} -> + {ok, {Source, Data}}; + {error, _} = ERROR -> + ERROR + end + end, + InitState2 = InitState#{send => Send, % Send function + recv => Recv % Receive function + }, + traffic_ping_pong_send_and_receive_udp(InitState2). -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +traffic_ping_pong_send_and_receive_udp(#{msg := Msg} = InitState) -> + Fun = fun(Sock) -> + {ok, RcvSz} = socket:getopt(Sock, socket, rcvbuf), + if (RcvSz < size(Msg)) -> + ok = socket:setopt(Sock, socket, rcvbuf, 1024+size(Msg)); + true -> + ok + end, + {ok, SndSz} = socket:getopt(Sock, socket, sndbuf), + if (SndSz < size(Msg)) -> + ok = socket:setopt(Sock, socket, sndbuf, 1024+size(Msg)); + true -> + ok + end, + {ok, OtpRcvBuf} = socket:getopt(Sock, otp, rcvbuf), + if + (OtpRcvBuf < size(Msg)) -> + ok = socket:setopt(Sock, otp, rcvbuf, 1024+size(Msg)); + true -> + ok + end + end, + traffic_ping_pong_send_and_receive_udp2(InitState#{buf_init => Fun}). -start_node(Host, NodeName) -> - UniqueNodeName = f("~w_~w", [NodeName, erlang:system_time(millisecond)]), - case do_start_node(Host, UniqueNodeName) of - {ok, _} = OK -> - OK; - {error, Reason, _} -> - {error, Reason} - end. +traffic_ping_pong_send_and_receive_udp2(InitState) -> + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, -do_start_node(Host, NodeName) when is_list(NodeName) -> - do_start_node(Host, list_to_atom(NodeName)); -do_start_node(Host, NodeName) when is_atom(NodeName) -> - Dir = filename:dirname(code:which(?MODULE)), - Flags = "-pa " ++ Dir, - Opts = [{monitor_master, true}, {erl_flags, Flags}], - ct_slave:start(Host, NodeName, Opts). + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, addr => LAddr}, + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, dgram, udp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = State) -> + case socket:bind(Sock, LSA) of + {ok, Port} -> + {ok, State#{port => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "maybe init buffers", + cmd => fun(#{sock := Sock, buf_init := BufInit} = _State) -> + BufInit(Sock) + end}, + #{desc => "create handler", + cmd => fun(State) -> + Handler = tpp_udp_server_handler_create(), + ?SEV_IPRINT("handler created: ~p", [Handler]), + {ok, State#{handler => Handler}} + end}, + #{desc => "monitor handler", + cmd => fun(#{handler := Handler} = _State) -> + _MRef = erlang:monitor(process, Handler), + ok + end}, + #{desc => "start handler", + cmd => fun(#{handler := Handler, + sock := Sock, + send := Send, + recv := Recv} = _State) -> + ?SEV_ANNOUNCE_START(Handler, {Sock, Send, Recv}), + ok + end}, + #{desc => "await handler ready (init)", + cmd => fun(#{tester := Tester, + handler := Handler} = State) -> + case ?SEV_AWAIT_READY(Handler, handler, init, + [{tester, Tester}]) of + ok -> + {ok, maps:remove(csock, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, local_sa := LSA, port := Port}) -> + ServerSA = LSA#{port => Port}, + ?SEV_ANNOUNCE_READY(Tester, init, ServerSA), + ok + end}, + %% The actual test + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester, + handler := Handler} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv, + [{handler, Handler}]) + end}, + #{desc => "order handler to recv", + cmd => fun(#{handler := Handler} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Handler, recv), + ok + end}, + #{desc => "await continue (close)", + cmd => fun(#{tester := Tester, + handler := Handler} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, close, + [{handler, Handler}]) + end}, -stop_node(Node) -> - case ct_slave:stop(Node) of - {ok, _} -> + ?SEV_SLEEP(?SECS(1)), + + #{desc => "close socket", + cmd => fun(#{sock := Sock} = State) -> + %% socket:setopt(Sock, otp, debug, true), + case socket:close(Sock) of + ok -> + {ok, maps:remove(sock, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (close)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_ANNOUNCE_READY(Tester, close), + ok + end}, + #{desc => "await handler ready (recv)", + cmd => fun(#{tester := Tester, + handler := Handler} = State) -> + case ?SEV_AWAIT_READY(Handler, handler, recv, + [{tester, Tester}]) of + {ok, Result} -> + %% ?SEV_IPRINT("Result: ~p", [Result]), + {ok, State#{result => Result}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv)", + cmd => fun(#{tester := Tester, + result := Result} = State) -> + ?SEV_ANNOUNCE_READY(Tester, recv, Result), + {ok, maps:remove(result, State)} + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "stop handler", + cmd => fun(#{handler := Handler}) -> + ?SEV_ANNOUNCE_TERMINATE(Handler), + ok + end}, + #{desc => "await handler termination", + cmd => fun(#{handler := Handler} = State) -> + ?SEV_AWAIT_TERMINATION(Handler), + State1 = maps:remove(handler, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, ServerSA} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, + server_sa => ServerSA}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "create node", + cmd => fun(#{host := Host} = State) -> + case start_node(Host, client) of + {ok, Node} -> + ?SEV_IPRINT("(remote) client node ~p started", + [Node]), + {ok, State#{node => Node}}; + {error, Reason, _} -> + {error, Reason} + end + end}, + #{desc => "monitor client node", + cmd => fun(#{node := Node} = _State) -> + true = erlang:monitor_node(Node, true), + ok + end}, + #{desc => "create (remote) handler", + cmd => fun(#{node := Node} = State) -> + Pid = tpp_udp_client_handler_create(Node), + ?SEV_IPRINT("handler created: ~p", [Pid]), + {ok, State#{handler => Pid}} + end}, + #{desc => "monitor remote handler", + cmd => fun(#{handler := Pid}) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "order remote handler to start", + cmd => fun(#{handler := Handler, + server_sa := ServerSA, + buf_init := BufInit, + send := Send, + recv := Recv}) -> + ?SEV_ANNOUNCE_START(Handler, + {ServerSA, BufInit, Send, Recv}), + ok + end}, + #{desc => "await (remote) handler ready", + cmd => fun(#{tester := Tester, + handler := Handler} = _State) -> + ?SEV_AWAIT_READY(Handler, handler, init, + [{tester, Tester}]) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (send)", + cmd => fun(#{tester := Tester, + handler := Handler} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, + send, + [{handler, Handler}]) + end}, + #{desc => "order handler to continue (send)", + cmd => fun(#{handler := Handler, + msg := Msg, + num := Num} = State) -> + Data = {Msg, Num}, + ?SEV_ANNOUNCE_CONTINUE(Handler, send, Data), + {ok, maps:remove(data, State)} + end}, + #{desc => "await remote handler ready (send)", + cmd => fun(#{tester := Tester, + handler := Handler} = State) -> + case ?SEV_AWAIT_READY(Handler, handler, send, + [{tester, Tester}]) of + {ok, Result} -> + %% ?SEV_IPRINT("remote client result: " + %% "~n ~p", [Result]), + {ok, State#{result => Result}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (send)", + cmd => fun(#{tester := Tester, result := Result} = State) -> + ?SEV_ANNOUNCE_READY(Tester, send, Result), + {ok, maps:remove(result, State)} + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester, + handler := Handler} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester, + [{handler, Handler}]) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "stop (remote) handler", + cmd => fun(#{handler := Handler}) -> + ?SEV_ANNOUNCE_TERMINATE(Handler), + ok + end}, + #{desc => "await (remote) handler termination", + cmd => fun(#{handler := Handler} = State) -> + ?SEV_AWAIT_TERMINATION(Handler), + State1 = maps:remove(handler, State), + {ok, State1} + end}, + #{desc => "stop client node", + cmd => fun(#{node := Node} = _State) -> + stop_node(Node) + end}, + #{desc => "await client node termination", + cmd => fun(#{node := Node} = State) -> + receive + {nodedown, Node} -> + {ok, maps:remove(node, State)} + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, ServerSA} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_sa => ServerSA}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, + server_sa := ServerSA} = _State) -> + ?SEV_ANNOUNCE_START(Pid, ServerSA), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + %% The actual test + #{desc => "order server continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order client continue (send)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send), + ok + end}, + #{desc => "await client ready (send)", + cmd => fun(#{server := Server, + client := Client} = State) -> + case ?SEV_AWAIT_READY(Client, client, send, + [{server, Server}]) of + {ok, {_, _, _, _} = Result} -> + ?SEV_IPRINT("client result: " + "~n ~p", [Result]), + {ok, State#{client_result => Result}}; + {ok, BadResult} -> + ?SEV_EPRINT("client result: " + "~n ~p", [BadResult]), + {error, {invalid_client_result, BadResult}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order server continue (close)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, close), + ok + end}, + #{desc => "await server ready (close)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, close) + end}, + %% Because of the way we control the server, there is no real + %% point in collecting statistics from it (the time will include + %% our communication with it). + #{desc => "await server ready (recv)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + case ?SEV_AWAIT_READY(Server, server, recv, + [{client, Client}]) of + {ok, _Result} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "present result", + cmd => fun(#{client_result := CRes, + num := Num} = State) -> + {CSent, CReceived, CStart, CStop} = CRes, + CTime = tdiff(CStart, CStop), + %% Note that the sizes we are counting is only + %% the "data" part of the messages. There is also + %% fixed header for each message, which of cource + %% is small for the large messages, but comparatively + %% big for the small messages! + ?SEV_IPRINT("Results: ~w messages exchanged" + "~n Client: ~w msec" + "~n ~.2f msec/message (roundtrip)" + "~n ~.2f messages/msec (roundtrip)" + "~n ~w bytes/msec sent" + "~n ~w bytes/msec received", + [Num, + CTime, + CTime / Num, + Num / CTime, + CSent div CTime, + CReceived div CTime]), + State1 = maps:remove(client_result, State), + {ok, State1} + end}, + + %% Terminations + #{desc => "order client to terminate", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(client, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(server, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + ServerInitState = #{domain => maps:get(domain, InitState), + recv => maps:get(recv, InitState), + send => maps:get(send, InitState), + buf_init => maps:get(buf_init, InitState)}, + Server = ?SEV_START("server", ServerSeq, ServerInitState), + + i("start client evaluator(s)"), + ClientInitState = InitState#{host => local_host()}, + Client = ?SEV_START("client", ClientSeq, ClientInitState), + + i("start 'tester' evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid, + num => maps:get(num, InitState)}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + +%% Server side handler process +%% We don't actually need a separate process for this socket, +%% but we do it anyway to simplify the sequence. +tpp_udp_server_handler_create() -> + Self = self(), + erlang:spawn(fun() -> tpp_udp_server_handler(Self) end). + +tpp_udp_server_handler(Parent) -> + tpp_udp_server_handler_init(Parent), + {Sock, Send, Recv} = tpp_udp_handler_await_start(Parent), + tpp_udp_handler_announce_ready(Parent, init), + tpp_udp_handler_await_continue(Parent, recv), + Result = tpp_udp_server_handler_msg_exchange(Sock, Send, Recv), + tpp_udp_handler_announce_ready(Parent, recv, Result), + Reason = tpp_udp_handler_await_terminate(Parent), + ?SEV_IPRINT("terminating"), + exit(Reason). + +tpp_udp_server_handler_init(Parent) -> + put(sname, "shandler"), + ?SEV_IPRINT("init"), + _MRef = erlang:monitor(process, Parent), + ok. + +tpp_udp_server_handler_msg_exchange(Sock, Send, Recv) -> + tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined). + +tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, + N, Sent, Received, Start) -> + %% ?SEV_IPRINT("[~w] try receive", [N]), + %% if + %% (N =:= (?TPP_SMALL_NUM-2)) -> + %% ?SEV_IPRINT("[~w] try receive", [N]), + %% socket:setopt(Sock, otp, debug, true); + %% true -> ok + %% end, + case tpp_udp_recv_req(Sock, Recv) of + {ok, Msg, RecvSz, From} -> + NewStart = if (Start =:= undefined) -> ?LIB:timestamp(); + true -> Start end, + %% ?SEV_IPRINT("[~w] received - now try send", [N]), + case tpp_udp_send_rep(Sock, Send, Msg, From) of + {ok, SendSz} -> + tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, + N+1, + Sent+SendSz, + Received+RecvSz, + NewStart); + {error, SReason} -> + ?SEV_EPRINT("send (~w): ~p", [N, SReason]), + exit({send, SReason, N}) + end; + %% {error, timeout} -> + %% ?SEV_IPRINT("timeout(~w) - try again", [N]), + %% case Send(Sock, list_to_binary("ping")) of + %% ok -> + %% exit({'ping-send', ok, N}); + %% {error, Reason} -> + %% exit({'ping-send', Reason, N}) + %% end; + {error, closed} -> + ?SEV_IPRINT("closed - we are done: ~w, ~w, ~w", [N, Sent, Received]), + Stop = ?LIB:timestamp(), + {N, Sent, Received, Start, Stop}; + {error, RReason} -> + ?SEV_EPRINT("recv (~w): ~p", [N, RReason]), + exit({recv, RReason, N}) + end. + + +%% The (remote) client side handler process + +tpp_udp_client_handler_create(Node) -> + Self = self(), + GL = group_leader(), + Fun = fun() -> tpp_udp_client_handler(Self, GL) end, + erlang:spawn(Node, Fun). + +tpp_udp_client_handler(Parent, GL) -> + tpp_udp_client_handler_init(Parent, GL), + {ServerSA, BufInit, Send, Recv} = tpp_udp_handler_await_start(Parent), + Domain = maps:get(family, ServerSA), + Sock = tpp_udp_sock_open(Domain, BufInit), + tpp_udp_sock_bind(Sock, Domain), + tpp_udp_handler_announce_ready(Parent, init), + {InitMsg, Num} = tpp_udp_handler_await_continue(Parent, send), + Result = tpp_udp_client_handler_msg_exchange(Sock, ServerSA, + Send, Recv, InitMsg, Num), + tpp_udp_handler_announce_ready(Parent, send, Result), + Reason = tpp_udp_handler_await_terminate(Parent), + tpp_udp_sock_close(Sock), + ?SEV_IPRINT("terminating"), + exit(Reason). + +tpp_udp_client_handler_init(Parent, GL) -> + put(sname, "chandler"), + ?SEV_IPRINT("init"), + _MRef = erlang:monitor(process, Parent), + group_leader(self(), GL), + ok. + +tpp_udp_client_handler_msg_exchange(Sock, ServerSA, Send, Recv, InitMsg, Num) -> + Start = ?LIB:timestamp(), + tpp_udp_client_handler_msg_exchange_loop(Sock, ServerSA, Send, Recv, InitMsg, + Num, 0, 0, 0, Start). + +tpp_udp_client_handler_msg_exchange_loop(_Sock, _Dest, _Send, _Recv, _Msg, + Num, Num, Sent, Received, + Start) -> + Stop = ?LIB:timestamp(), + {Sent, Received, Start, Stop}; +tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, Send, Recv, Data, + Num, N, Sent, Received, Start) -> + %% d("tpp_udp_client_handler_msg_exchange_loop(~w,~w) try send", [Num,N]), + case tpp_udp_send_req(Sock, Send, Data, Dest) of + {ok, SendSz} -> + %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) sent - " + %% "now try recv", [Num,N]), + case tpp_udp_recv_rep(Sock, Recv) of + {ok, NewData, RecvSz, Dest} -> + tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, Send, Recv, + NewData, Num, N+1, + Sent+SendSz, + Received+RecvSz, + Start); + {error, RReason} -> + ?SEV_EPRINT("recv (~w of ~w): ~p", [N, Num, RReason]), + exit({recv, RReason, N}) + end; + {error, SReason} -> + ?SEV_EPRINT("send (~w of ~w): ~p", [N, Num, SReason]), + exit({send, SReason, N}) + end. + + +tpp_udp_recv_req(Sock, Recv) -> + tpp_udp_recv(Sock, Recv, ?TPP_REQUEST). + +tpp_udp_recv_rep(Sock, Recv) -> + tpp_udp_recv(Sock, Recv, ?TPP_REPLY). + +tpp_udp_recv(Sock, Recv, Tag) -> + case Recv(Sock, 0) of + {ok, {Source, <> = Msg}} + when (Sz =:= size(Data)) -> + %% We got it all + %% ?SEV_IPRINT("tpp_udp_recv -> got all: " + %% "~n Source: ~p" + %% "~n Tag: ~p" + %% "~n Sz: ~p" + %% "~n size(Data): ~p", [Source, Tag, Sz, size(Data)]), + {ok, Data, size(Msg), Source}; + {ok, {Source, <> = Msg}} -> + %% ?SEV_IPRINT("tpp_udp_recv -> got part: " + %% "~n Source: ~p" + %% "~n Tag: ~p" + %% "~n Sz: ~p" + %% "~n size(Data): ~p", [Source, Tag, Sz, size(Data)]), + Remains = Sz - size(Data), + tpp_tcp_recv(Sock, Source, Recv, Tag, Remains, size(Msg), [Data]); + {ok, {_, <>}} -> + {error, {invalid_msg_tag, Tag}}; + {error, _} = ERROR -> + ERROR + end. + +%% We match against Source since we only communicate with one peer +tpp_tcp_recv(Sock, Source, Recv, Tag, Remaining, AccSz, Acc) -> + %% ?SEV_IPRINT("tpp_tcp_recv -> entry with" + %% "~n Tag: ~p" + %% "~n Remaining: ~p" + %% "~n AccSz: ~p" + %% "~n RcvBuf: ~p" + %% "~n SndBuf: ~p", + %% [Tag, Remaining, AccSz, + %% socket:getopt(Sock, socket, rcvbuf), + %% socket:getopt(Sock, socket, sndbuf)]), + case Recv(Sock, Remaining) of + {ok, {Source, Data}} when (Remaining =:= size(Data)) -> + %% ?SEV_IPRINT("tpp_udp_recv -> got rest: " + %% "~n Source: ~p" + %% "~n size(Data): ~p", [Source, size(Data)]), + %% We got the rest + TotSz = AccSz + size(Data), + {ok, + erlang:iolist_to_binary(lists:reverse([Data | Acc])), + TotSz, Source}; + {ok, {Source, Data}} when (Remaining > size(Data)) -> + %% ?SEV_IPRINT("tpp_udp_recv -> got part of rest: " + %% "~n Source: ~p" + %% "~n size(Data): ~p", [Source, size(Data)]), + tpp_tcp_recv(Sock, Source, Recv, Tag, + Remaining - size(Data), AccSz + size(Data), + [Data | Acc]); + {error, _} = ERROR -> + ERROR + end. + + +tpp_udp_send_req(Sock, Send, Data, Dest) -> + tpp_udp_send(Sock, Send, ?TPP_REQUEST, Data, Dest). + +tpp_udp_send_rep(Sock, Send, Data, Dest) -> + tpp_udp_send(Sock, Send, ?TPP_REPLY, Data, Dest). + +tpp_udp_send(Sock, Send, Tag, Data, Dest) -> + DataSz = size(Data), + Msg = <>, + tpp_udp_send_msg(Sock, Send, Msg, Dest, 0). + +tpp_udp_send_msg(Sock, Send, Msg, Dest, AccSz) when is_binary(Msg) -> + %% d("tpp_udp_send_msg -> entry with" + %% "~n size(Msg): ~p" + %% "~n Dest: ~p" + %% "~n AccSz: ~p" + %% "~n RcvBuf: ~p" + %% "~n SndBuf: ~p", + %% [size(Msg), Dest, AccSz, + %% socket:getopt(Sock, socket, rcvbuf), + %% socket:getopt(Sock, socket, sndbuf)]), + case Send(Sock, Msg, Dest) of + ok -> + {ok, AccSz+size(Msg)}; + {ok, Rest} -> % This is an IOVec + RestBin = list_to_binary(Rest), + tpp_udp_send_msg(Sock, Send, RestBin, Dest, + AccSz+(size(Msg)-size(RestBin))); + {error, _} = ERROR -> + ERROR + end. + + +tpp_udp_handler_await_start(Parent) -> + ?SEV_IPRINT("await start"), + ?SEV_AWAIT_START(Parent). + +tpp_udp_handler_announce_ready(Parent, Slogan) -> + ?SEV_IPRINT("announce ready (~p)", [Slogan]), + ?SEV_ANNOUNCE_READY(Parent, Slogan). +tpp_udp_handler_announce_ready(Parent, Slogan, Extra) -> + ?SEV_IPRINT("announce ready (~p)", [Slogan]), + ?SEV_ANNOUNCE_READY(Parent, Slogan, Extra). + +tpp_udp_handler_await_continue(Parent, Slogan) -> + ?SEV_IPRINT("await continue (~p)", [Slogan]), + case ?SEV_AWAIT_CONTINUE(Parent, parent, Slogan) of + ok -> + ?SEV_IPRINT("continue (~p): ok", [Slogan]), + ok; + {ok, Data} -> + ?SEV_IPRINT("continue (~p): ok with data", [Slogan]), + Data; + {error, Reason} -> + ?SEV_EPRINT("continue (~p): error" + "~n ~p", [Slogan, Reason]), + exit({continue, Slogan, Reason}) + end. + +tpp_udp_handler_await_terminate(Parent) -> + ?SEV_IPRINT("await terminate"), + case ?SEV_AWAIT_TERMINATE(Parent, parent) of + ok -> + ok; + {error, Reason} -> + Reason + end. + + +tpp_udp_sock_open(Domain, BufInit) -> + case socket:open(Domain, dgram, udp) of + {ok, Sock} -> + ok = BufInit(Sock), + Sock; + {error, Reason} -> + exit({open_failed, Reason}) + end. + +tpp_udp_sock_bind(Sock, Domain) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, + addr => LAddr}, + case socket:bind(Sock, LSA) of + {ok, _} -> + ok; + {error, Reason} -> + exit({bind, Reason}) + end. + +tpp_udp_sock_close(Sock) -> + case socket:close(Sock) of + ok -> + ok; + {error, Reason} -> + exit({close, Reason}) + end. + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% This gets the local address (not 127.0...) +%% We should really implement this using the (new) net module, +%% but until that gets the necessary functionality... +which_local_addr(Domain) -> + case inet:getifaddrs() of + {ok, IFL} -> + which_addr(Domain, IFL); + {error, Reason} -> + ?FAIL({inet, getifaddrs, Reason}) + end. + +which_addr(_Domain, []) -> + ?FAIL(no_address); +which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> + which_addr2(Domain, IFO); +which_addr(Domain, [_|IFL]) -> + which_addr(Domain, IFL). + +which_addr2(_Domain, []) -> + ?FAIL(no_address); +which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> + Addr; +which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> + Addr; +which_addr2(Domain, [_|IFO]) -> + which_addr2(Domain, IFO). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start_node(Host, NodeName) -> + UniqueNodeName = f("~w_~w", [NodeName, erlang:system_time(millisecond)]), + case do_start_node(Host, UniqueNodeName) of + {ok, _} = OK -> + OK; + {error, Reason, _} -> + {error, Reason} + end. + +do_start_node(Host, NodeName) when is_list(NodeName) -> + do_start_node(Host, list_to_atom(NodeName)); +do_start_node(Host, NodeName) when is_atom(NodeName) -> + Dir = filename:dirname(code:which(?MODULE)), + Flags = "-pa " ++ Dir, + Opts = [{monitor_master, true}, {erl_flags, Flags}], + ct_slave:start(Host, NodeName, Opts). + + +stop_node(Node) -> + case ct_slave:stop(Node) of + {ok, _} -> ok; {error, _} = ERROR -> ERROR @@ -8208,21 +9285,21 @@ f(F, A) -> %% i(Before ++ FStr ++ After, []). -%% d(F, A) -> -%% d(get(dbg_fd), F, A). +d(F, A) -> + d(get(dbg_fd), F, A). -%% d(undefined, F, A) -> -%% [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), -%% DbgFileName = f("~s-dbg.txt", [NodeNameStr]), -%% case file:open(DbgFileName, [write]) of -%% {ok, FD} -> -%% put(dbg_fd, FD), -%% d(FD, F, A); -%% {error, Reason} -> -%% exit({failed_open_dbg_file, Reason}) -%% end; -%% d(FD, F, A) -> -%% io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). +d(undefined, F, A) -> + [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), + DbgFileName = f("~s-dbg.txt", [NodeNameStr]), + case file:open(DbgFileName, [write]) of + {ok, FD} -> + put(dbg_fd, FD), + d(FD, F, A); + {error, Reason} -> + exit({failed_open_dbg_file, Reason}) + end; +d(FD, F, A) -> + io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). i(F) -> i(F, []). diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam index d3bc7c7af0..25046e6aad 100644 Binary files a/erts/preloaded/ebin/socket.beam and b/erts/preloaded/ebin/socket.beam differ diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index dd10aac3ff..a40692881b 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1958,6 +1958,7 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> next_timeout(TS, Timeout)); {nif_abort, RecvRef, Reason} -> + %% p("received nif-abort: ~p", [Reason]), {error, Reason} after NewTimeout -> @@ -1970,6 +1971,13 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> end. +%% pi(Item) -> +%% pi(self(), Item). + +%% pi(Pid, Item) -> +%% {Item, Info} = process_info(Pid, Item), +%% Info. + %% --------------------------------------------------------------------------- %% -- cgit v1.2.3