diff options
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 20 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 1020 |
2 files changed, 1030 insertions, 10 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 27395b5cf6..389d43ee42 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -13314,12 +13314,6 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") ); - /* <KOLLA> - * SHOULD RESULT IN {error, eagain}!!!! - * </KOLLA> - */ - written = 0; - } } @@ -13349,7 +13343,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, descP, NULL, sendRef); SSDBG( descP, - ("SOCKET", "send_check_result -> not entire package written\r\n") ); + ("SOCKET", "send_check_result -> " + "not entire package written (%d of %d)\r\n", written, dataSize) ); return esock_make_ok2(env, MKI(env, written)); @@ -13687,15 +13682,26 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { + int sres; + SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] eagain\r\n", toRead) ); if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) return esock_make_error_str(env, xres); + SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT for more\r\n") ); + + /* SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); + */ + sres = enif_select(env, descP->sock, (ERL_NIF_SELECT_READ), + descP, NULL, recvRef); + + SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) ); + return esock_make_error(env, esock_atom_eagain); } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 7a1a362181..90e9407f6a 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -88,7 +88,19 @@ %% Traffic traffic_send_and_recv_chunks_tcp4/1, - traffic_send_and_recv_chunks_tcp6/1 + traffic_send_and_recv_chunks_tcp6/1, + traffic_ping_pong_small_send_and_recv_tcp4/1, + traffic_ping_pong_small_send_and_recv_tcp6/1, + %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4/1, + %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6/1, + traffic_ping_pong_medium_send_and_recv_tcp4/1, + traffic_ping_pong_medium_send_and_recv_tcp6/1, + %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4/1, + %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6/1, + traffic_ping_pong_large_send_and_recv_tcp4/1, + traffic_ping_pong_large_send_and_recv_tcp6/1%, + %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4/1, + %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6/1 %% Tickets @@ -115,6 +127,8 @@ -define(TT(T), ct:timetrap(T)). +-define(LIB, socket_test_lib). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -243,7 +257,20 @@ sc_rc_cases() -> traffic_cases() -> [ traffic_send_and_recv_chunks_tcp4, - traffic_send_and_recv_chunks_tcp6 + traffic_send_and_recv_chunks_tcp6, + + traffic_ping_pong_small_send_and_recv_tcp4, + traffic_ping_pong_small_send_and_recv_tcp6, + %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp4, + %% traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6, + traffic_ping_pong_medium_send_and_recv_tcp4, + traffic_ping_pong_medium_send_and_recv_tcp6, + %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp4, + %% traffic_ping_pong_medium_sendmsg_and_recvmsg_tcp6, + traffic_ping_pong_large_send_and_recv_tcp4, + traffic_ping_pong_large_send_and_recv_tcp6%% , + %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp4, + %% traffic_ping_pong_large_sendmsg_and_recvmsg_tcp6 ]. @@ -5713,6 +5740,8 @@ traffic_send_and_recv_chunks_tcp6(_Config) when is_list(_Config) -> end). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + traffic_send_and_recv_chunks_tcp(InitState) -> ServerSeq = [ @@ -6663,7 +6692,973 @@ traffic_snr_tcp_client_await_terminate(Parent) -> end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv4. + +traffic_ping_pong_small_send_and_recv_tcp4(suite) -> + []; +traffic_ping_pong_small_send_and_recv_tcp4(doc) -> + []; +traffic_ping_pong_small_send_and_recv_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_small_send_and_recv_tcp4, + fun() -> + ?TT(?SECS(15)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + %% Recv = fun(Sock) -> socket:recv(Sock, 0, 5000) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'small' message test case, for IPv6. + +traffic_ping_pong_small_send_and_recv_tcp6(suite) -> + []; +traffic_ping_pong_small_send_and_recv_tcp6(doc) -> + []; +traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_small_send_and_recv_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(30)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_small_send_and_receive_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv4. + +traffic_ping_pong_medium_send_and_recv_tcp4(suite) -> + []; +traffic_ping_pong_medium_send_and_recv_tcp4(doc) -> + []; +traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_medium_send_and_recv_tcp4, + fun() -> + %% not_yet_implemented(), + ?TT(?SECS(30)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'medium' message test case, for IPv6. + +traffic_ping_pong_medium_send_and_recv_tcp6(suite) -> + []; +traffic_ping_pong_medium_send_and_recv_tcp6(doc) -> + []; +traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_medium_send_and_recv_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?SECS(30)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_medium_send_and_receive_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'large' message test case, for IPv4. + +traffic_ping_pong_large_send_and_recv_tcp4(suite) -> + []; +traffic_ping_pong_large_send_and_recv_tcp4(doc) -> + []; +traffic_ping_pong_large_send_and_recv_tcp4(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_large_send_and_recv_tcp4, + fun() -> + %% not_yet_implemented(), + ?TT(?MINS(5)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + end). + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to test that the send and recv functions +%% by repeatedly sending a meassage between two entities. +%% The same basic test case is used for three different message sizes; +%% small (8 bytes), medium (8K) and large (8M). +%% The message is sent from A to B and then back again. This is +%% repeated a set number of times (more times the small the message). +%% This is the 'large' message test case, for IPv6. + +traffic_ping_pong_large_send_and_recv_tcp6(suite) -> + []; +traffic_ping_pong_large_send_and_recv_tcp6(doc) -> + []; +traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) -> + tc_try(traffic_ping_pong_large_send_and_recv_tcp6, + fun() -> + not_yet_implemented(), + ?TT(?MINS(5)), + Send = fun(Sock, Data) -> socket:send(Sock, Data) end, + Recv = fun(Sock) -> socket:recv(Sock) end, + InitState = #{domain => inet6, + send => Send, % Send function + recv => Recv % Receive function + }, + ok = traffic_ping_pong_large_send_and_receive_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-define(SMALL, lists:seq(1, 8)). +-define(MEDIUM, lists:flatten(lists:duplicate(1024, ?SMALL))). +-define(LARGE, lists:flatten(lists:duplicate(1024, ?MEDIUM))). + +traffic_ping_pong_small_send_and_receive_tcp(InitState) -> + Msg = l2b(?SMALL), + Num = 100000, + Fun = fun(_) -> ok end, %% Fun to update the buffers: Not needed here + traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, + num => Num, + buf_init => Fun}). + +traffic_ping_pong_medium_send_and_receive_tcp(InitState) -> + Msg = l2b(?MEDIUM), + Num = 100000, + Fun = fun(_) -> ok end, %% Fun to update the buffers: MAYBE needed here + traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, + num => Num, + buf_init => Fun}). + +traffic_ping_pong_large_send_and_receive_tcp(InitState) -> + Msg = l2b(?LARGE), + Num = 10, + Fun = fun(Sock) -> + ok = socket:setopt(Sock, socket, rcvbuf, 64*1024*1024), + ok = socket:setopt(Sock, socket, sndbuf, 64*1024*1024), + %% ok = socket:setopt(Sock, otp, rcvbuf, 64*1024*1024), + %% ok = socket:setopt(Sock, otp, sndbuf, 64*1024*1024), + ok + end, %% Fun to update the buffers: NEEDED here!!! + traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, + num => Num, + buf_init => Fun}). + +traffic_ping_pong_send_and_receive_tcp(InitState) -> + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, addr => LAddr}, + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, local_sa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, local_sa := LSA, lport := Port}) -> + ServerSA = LSA#{port => Port}, + ?SEV_ANNOUNCE_READY(Tester, init, ServerSA), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "accept", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, Sock} -> + {ok, State#{csock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "create handler", + cmd => fun(State) -> + Handler = tpp_tcp_handler_create(), + ?SEV_IPRINT("handler created: ~p", [Handler]), + {ok, State#{handler => Handler}} + end}, + #{desc => "monitor handler", + cmd => fun(#{handler := Handler} = _State) -> + _MRef = erlang:monitor(process, Handler), + ok + end}, + #{desc => "transfer connection socket ownership to handler", + cmd => fun(#{handler := Handler, csock := Sock} = _State) -> + socket:setopt(Sock, otp, controlling_process, Handler) + end}, + #{desc => "start handler", + cmd => fun(#{handler := Handler, + csock := Sock, + buf_init := BufInit, + send := Send, + recv := Recv} = _State) -> + ?SEV_ANNOUNCE_START(Handler, + {Sock, BufInit, Send, Recv}), + ok + end}, + #{desc => "await handler ready (init)", + cmd => fun(#{tester := Tester, + handler := Handler} = State) -> + case ?SEV_AWAIT_READY(Handler, handler, init, + [{tester, Tester}]) of + ok -> + {ok, maps:remove(csock, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept), + ok + end}, + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester, + handler := Handler} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv, + [{handler, Handler}]) + end}, + #{desc => "order handler to recv", + cmd => fun(#{handler := Handler} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Handler, recv), + ok + end}, + #{desc => "await handler ready (recv)", + cmd => fun(#{tester := Tester, + handler := Handler} = State) -> + case ?SEV_AWAIT_READY(Handler, handler, recv, + [{tester, Tester}]) of + {ok, Result} -> + %% ?SEV_IPRINT("Result: ~p", [Result]), + {ok, State#{result => Result}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv)", + cmd => fun(#{tester := Tester, + result := Result} = State) -> + ?SEV_ANNOUNCE_READY(Tester, recv, Result), + {ok, maps:remove(result, State)} + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "stop handler", + cmd => fun(#{handler := Handler}) -> + ?SEV_ANNOUNCE_TERMINATE(Handler), + ok + end}, + #{desc => "await handler termination", + cmd => fun(#{handler := Handler} = State) -> + ?SEV_AWAIT_TERMINATION(Handler), + State1 = maps:remove(handler, State), + {ok, State1} + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock} = State) -> + (catch socket:close(Sock)), + {ok, maps:remove(lsock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, ServerSA} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, + server_sa => ServerSA}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "create node", + cmd => fun(#{host := Host} = State) -> + case start_node(Host, client) of + {ok, Node} -> + ?SEV_IPRINT("(remote) client node ~p started", + [Node]), + {ok, State#{node => Node}}; + {error, Reason, _} -> + {error, Reason} + end + end}, + #{desc => "monitor client node", + cmd => fun(#{node := Node} = _State) -> + true = erlang:monitor_node(Node, true), + ok + end}, + #{desc => "create remote client", + cmd => fun(#{node := Node} = State) -> + Pid = tpp_tcp_client_create(Node), + ?SEV_IPRINT("remote client created: ~p", [Pid]), + {ok, State#{rclient => Pid}} + end}, + #{desc => "monitor remote client", + cmd => fun(#{rclient := Pid}) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "order remote client to start", + cmd => fun(#{rclient := RClient, + server_sa := ServerSA, + buf_init := BufInit, + send := Send, + recv := Recv}) -> + ?SEV_ANNOUNCE_START(RClient, + {ServerSA, BufInit, Send, Recv}), + ok + end}, + #{desc => "await remote client ready", + cmd => fun(#{tester := Tester, + rclient := RClient} = _State) -> + ?SEV_AWAIT_READY(RClient, rclient, init, + [{tester, Tester}]) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester, + rclient := RClient} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect, + [{rclient, RClient}]), + ok + end}, + #{desc => "order remote client to continue (connect)", + cmd => fun(#{rclient := RClient}) -> + ?SEV_ANNOUNCE_CONTINUE(RClient, connect), + ok + end}, + #{desc => "await remote client ready (connect)", + cmd => fun(#{tester := Tester, + rclient := RClient} = _State) -> + ?SEV_AWAIT_READY(RClient, rclient, connect, + [{tester, Tester}]) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + #{desc => "await continue (send)", + cmd => fun(#{tester := Tester, + rclient := RClient} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, + send, + [{rclient, RClient}]) + end}, + #{desc => "order remote client to continue (send)", + cmd => fun(#{rclient := RClient, + msg := Msg, + num := Num} = State) -> + Data = {Msg, Num}, + ?SEV_ANNOUNCE_CONTINUE(RClient, send, Data), + {ok, maps:remove(data, State)} + end}, + #{desc => "await remote client ready (send)", + cmd => fun(#{tester := Tester, + rclient := RClient} = State) -> + case ?SEV_AWAIT_READY(RClient, rclient, send, + [{tester, Tester}]) of + {ok, Result} -> + %% ?SEV_IPRINT("remote client result: " + %% "~n ~p", [Result]), + {ok, State#{result => Result}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (send)", + cmd => fun(#{tester := Tester, result := Result} = State) -> + ?SEV_ANNOUNCE_READY(Tester, send, Result), + {ok, maps:remove(result, State)} + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester, + rclient := RClient} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester, + [{rclient, RClient}]) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "stop remote client", + cmd => fun(#{rclient := RClient}) -> + ?SEV_ANNOUNCE_TERMINATE(RClient), + ok + end}, + #{desc => "await remote client termination", + cmd => fun(#{rclient := RClient} = State) -> + ?SEV_AWAIT_TERMINATION(RClient), + State1 = maps:remove(rclient, State), + {ok, State1} + end}, + #{desc => "stop client node", + cmd => fun(#{node := Node} = _State) -> + stop_node(Node) + end}, + #{desc => "await client node termination", + cmd => fun(#{node := Node} = State) -> + receive + {nodedown, Node} -> + {ok, maps:remove(node, State)} + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, ServerSA} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_sa => ServerSA}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, + server_sa := ServerSA} = _State) -> + ?SEV_ANNOUNCE_START(Pid, ServerSA), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + %% The actual test + #{desc => "order server continue (accept)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order client continue (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, connect), + ok + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + ?SEV_AWAIT_READY(Server, server, accept, + [{client, Client}]), + ok + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{server := Server, + client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect, + [{server, Server}]) + end}, + #{desc => "order server continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + ?SEV_SLEEP(?SECS(1)), + #{desc => "order client continue (send)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send), + ok + end}, + #{desc => "await client ready (send)", + cmd => fun(#{server := Server, + client := Client} = State) -> + case ?SEV_AWAIT_READY(Client, client, send, + [{server, Server}]) of + {ok, {_, _, _, _} = Result} -> + ?SEV_IPRINT("client result: " + "~n ~p", [Result]), + {ok, State#{client_result => Result}}; + {ok, BadResult} -> + ?SEV_EPRINT("client result: " + "~n ~p", [BadResult]), + {error, {invalid_client_result, BadResult}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "await server ready (recv)", + cmd => fun(#{server := Server, + client := Client, + num := Num} = State) -> + case ?SEV_AWAIT_READY(Server, server, recv, + [{client, Client}]) of + {ok, {Num, _, _, _, _} = Result} -> + ?SEV_IPRINT("server result: " + "~n ~p", [Result]), + Result2 = erlang:delete_element(1, Result), + {ok, State#{server_result => Result2}}; + {ok, BadResult} -> + ?SEV_EPRINT("bad sever result: " + "~n ~p", [BadResult]), + {error, {invalid_server_result, BadResult}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "present result", + cmd => fun(#{server_result := SRes, + client_result := CRes, + num := Num} = State) -> + {SSent, SReceived, SStart, SStop} = SRes, + {CSent, CReceived, CStart, CStop} = CRes, + STime = tdiff(SStart, SStop), + CTime = tdiff(CStart, CStop), + ?SEV_IPRINT("Results: " + "~n Server: ~w msec" + "~n ~w messages/msec exchanged" + "~n ~w bytes/msec sent" + "~n ~w bytes/msec received" + "~n Client: ~w msec" + "~n ~w messages/msec exchanged" + "~n ~w bytes/msec sent" + "~n ~w bytes/msec received", + [STime, + Num div STime, + SSent div STime, + SReceived div STime, + CTime, + Num div CTime, + CSent div CTime, + CReceived div CTime]), + State1 = maps:remove(server_result, State), + State2 = maps:remove(client_result, State), + {ok, State2} + end}, + + %% Terminations + #{desc => "order client to terminate", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(client, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(server, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + ServerInitState = #{domain => maps:get(domain, InitState), + recv => maps:get(recv, InitState), + send => maps:get(send, InitState), + buf_init => maps:get(buf_init, InitState)}, + Server = ?SEV_START("server", ServerSeq, ServerInitState), + + i("start client evaluator(s)"), + ClientInitState = InitState#{host => local_host()}, + Client = ?SEV_START("client", ClientSeq, ClientInitState), + + i("start 'tester' evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid, + num => maps:get(num, InitState)}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + +tpp_tcp_handler_create() -> + Self = self(), + erlang:spawn(fun() -> tpp_tcp_handler(Self) end). + +tpp_tcp_handler(Parent) -> + tpp_tcp_handler_init(Parent), + {Sock, BufInit, Send, Recv} = tpp_tcp_handler_await_start(Parent), + tpp_tcp_handler_announce_ready(Parent, init), + tpp_tcp_handler_await_continue(Parent, recv), + Result = tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv), + tpp_tcp_handler_announce_ready(Parent, recv, Result), + Reason = tpp_tcp_handler_await_terminate(Parent), + exit(Reason). + +tpp_tcp_handler_init(Parent) -> + put(sname, "handler"), + ?SEV_IPRINT("init"), + _MRef = erlang:monitor(process, Parent), + ok. + +tpp_tcp_handler_await_start(Parent) -> + ?SEV_IPRINT("await start"), + ?SEV_AWAIT_START(Parent). + +tpp_tcp_handler_announce_ready(Parent, Slogan) -> + ?SEV_IPRINT("announce ready (~p)", [Slogan]), + ?SEV_ANNOUNCE_READY(Parent, Slogan). +tpp_tcp_handler_announce_ready(Parent, Slogan, Extra) -> + ?SEV_IPRINT("announce ready (~p)", [Slogan]), + ?SEV_ANNOUNCE_READY(Parent, Slogan, Extra). + +tpp_tcp_handler_await_continue(Parent, Slogan) -> + ?SEV_IPRINT("await continue (~p)", [Slogan]), + case ?SEV_AWAIT_CONTINUE(Parent, parent, Slogan) of + ok -> + %% ?SEV_IPRINT("continue (~p): ok", [Slogan]), + ok; + {error, Reason} -> + ?SEV_EPRINT("continue (~p): error" + "~n ~p", [Slogan, Reason]), + exit({continue, Slogan, Reason}) + end. + +tpp_tcp_handler_await_terminate(Parent) -> + ?SEV_IPRINT("await terminate"), + case ?SEV_AWAIT_TERMINATE(Parent, parent) of + ok -> + ok; + {error, Reason} -> + Reason + end. + +tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv) -> + ok = BufInit(Sock), + socket:setopt(Sock, otp, debug, true), + tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined). + +tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> + %% if (N =:= 1000) -> socket:setopt(Sock, otp, debug, true); true -> ok end, + ?SEV_IPRINT("[~w] try receive", [N]), + case Recv(Sock) of + {ok, Msg} -> + NewStart = if (Start =:= undefined) -> ?LIB:timestamp(); + true -> Start end, + ?SEV_IPRINT("[~w] received - now try send", [N]), + case Send(Sock, Msg) of + ok -> + tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, + N+1, + Sent+size(Msg), + Received+size(Msg), + NewStart); + {error, SReason} -> + ?SEV_EPRINT("send (~w): ~p", [N, SReason]), + exit({send, SReason, N}) + end; + %% {error, timeout} -> + %% ?SEV_IPRINT("timeout(~w) - try again", [N]), + %% case Send(Sock, list_to_binary("ping")) of + %% ok -> + %% exit({'ping-send', ok, N}); + %% {error, Reason} -> + %% exit({'ping-send', Reason, N}) + %% end; + {error, closed} -> + ?SEV_IPRINT("closed - we are done: ~w, ~w, ~w", [N, Sent, Received]), + Stop = ?LIB:timestamp(), + {N, Sent, Received, Start, Stop}; + {error, RReason} -> + ?SEV_EPRINT("recv (~w): ~p", [N, RReason]), + exit({recv, RReason, N}) + end. + +%% The (remote) client process + +tpp_tcp_client_create(Node) -> + Self = self(), + GL = group_leader(), + Fun = fun() -> tpp_tcp_client(Self, GL) end, + erlang:spawn(Node, Fun). + +tpp_tcp_client(Parent, GL) -> + tpp_tcp_client_init(Parent, GL), + {ServerSA, BufInit, Send, Recv} = tpp_tcp_client_await_start(Parent), + Domain = maps:get(family, ServerSA), + Sock = tpp_tcp_client_sock_open(Domain), + tpp_tcp_client_sock_bind(Sock, Domain), + tpp_tcp_client_announce_ready(Parent, init), + tpp_tcp_client_await_continue(Parent, connect), + tpp_tcp_client_sock_connect(Sock, ServerSA), + tpp_tcp_client_announce_ready(Parent, connect), + {InitMsg, Num} = tpp_tcp_client_await_continue(Parent, send), + Result = tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num), + tpp_tcp_client_announce_ready(Parent, send, Result), + Reason = tpp_tcp_client_await_terminate(Parent), + tpp_tcp_client_sock_close(Sock), + exit(Reason). + +tpp_tcp_client_init(Parent, GL) -> + put(sname, "rclient"), + ?SEV_IPRINT("init"), + _MRef = erlang:monitor(process, Parent), + group_leader(self(), GL), + ok. + +tpp_tcp_client_await_start(Parent) -> + ?SEV_IPRINT("await start"), + ?SEV_AWAIT_START(Parent). + +tpp_tcp_client_announce_ready(Parent, Slogan) -> + ?SEV_IPRINT("announce ready (~p)", [Slogan]), + ?SEV_ANNOUNCE_READY(Parent, Slogan). +tpp_tcp_client_announce_ready(Parent, Slogan, Extra) -> + ?SEV_IPRINT("announce ready (~p): ~p", [Slogan, Extra]), + ?SEV_ANNOUNCE_READY(Parent, Slogan, Extra). + +tpp_tcp_client_await_continue(Parent, Slogan) -> + ?SEV_IPRINT("await continue (~p)", [Slogan]), + case ?SEV_AWAIT_CONTINUE(Parent, parent, Slogan) of + ok -> + %% ?SEV_IPRINT("continue (~p): ok", [Slogan]), + ok; + {ok, Data} -> + %% ?SEV_IPRINT("continue (~p): ok with data", [Slogan]), + Data; + {error, Reason} -> + ?SEV_EPRINT("continue (~p): error" + "~n ~p", [Slogan, Reason]), + exit({continue, Slogan, Reason}) + end. + +tpp_tcp_client_await_terminate(Parent) -> + ?SEV_IPRINT("await terminate"), + case ?SEV_AWAIT_TERMINATE(Parent, parent) of + ok -> + ok; + {error, Reason} -> + Reason + end. + +tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num) -> + ok = BufInit(Sock), + Start = ?LIB:timestamp(), + tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, InitMsg, + Num, 0, 0, 0, Start). + +tpp_tcp_client_msg_exchange_loop(Sock, _Send, _Recv, _Msg, + Num, Num, Sent, Received, + Start) -> + Stop = ?LIB:timestamp(), + case socket:close(Sock) of + ok -> + {Sent, Received, Start, Stop}; + {error, Reason} -> + exit({failed_closing, Reason}) + end; +tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Msg, + Num, N, Sent, Received, Start) -> + d("[~w,~w] try send", [Num,N]), + case Send(Sock, Msg) of + ok -> + d("[~w,~w] sent - no try recv", [Num,N]), + case Recv(Sock) of + {ok, NewMsg} -> + tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, + NewMsg, Num, N+1, + Sent+size(Msg), + Received+size(NewMsg), + Start); + {error, RReason} -> + ?SEV_EPRINT("recv (~w of ~w): ~p", [N, Num, RReason]), + exit({recv, RReason, N}) + end; + {error, SReason} -> + ?SEV_EPRINT("send (~w of ~w): ~p", [N, Num, SReason]), + exit({send, SReason, N}) + end. + +tpp_tcp_client_sock_open(Domain) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + Sock; + {error, Reason} -> + exit({open_failed, Reason}) + end. + +tpp_tcp_client_sock_bind(Sock, Domain) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, + addr => LAddr}, + case socket:bind(Sock, LSA) of + {ok, _} -> + ok; + {error, Reason} -> + exit({bind, Reason}) + end. + +tpp_tcp_client_sock_connect(Sock, ServerSA) -> + case socket:connect(Sock, ServerSA) of + ok -> + ok; + {error, Reason} -> + exit({connect, Reason}) + end. + +tpp_tcp_client_sock_close(Sock) -> + case socket:close(Sock) of + ok -> + ok; + {error, Reason} -> + exit({close, Reason}) + end. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This gets the local address (not 127.0...) @@ -6698,7 +7693,7 @@ which_addr2(Domain, [_|IFO]) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% start_node(Host, NodeName) -> - UniqueNodeName = f("~w_~w", [NodeName, erlang:unique_integer([positive])]), + UniqueNodeName = f("~w_~w", [NodeName, erlang:system_time(millisecond)]), case do_start_node(Host, UniqueNodeName) of {ok, _} = OK -> OK; @@ -6923,6 +7918,9 @@ tc_which_name() -> l2a(S) when is_list(S) -> list_to_atom(S). +l2b(L) when is_list(L) -> + list_to_binary(L). + b2l(B) when is_binary(B) -> binary_to_list(B). @@ -6953,6 +7951,22 @@ f(F, A) -> %% i(Before ++ FStr ++ After, []). +d(F, A) -> + d(get(dbg_fd), F, A). + +d(undefined, F, A) -> + [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), + DbgFileName = f("~s-dbg.txt", [NodeNameStr]), + case file:open(DbgFileName, [write]) of + {ok, FD} -> + put(dbg_fd, FD), + d(FD, F, A); + {error, Reason} -> + exit({failed_open_dbg_file, Reason}) + end; +d(FD, F, A) -> + io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). + i(F) -> i(F, []). |