From 2d6f0763ddf80614d792cad96df38d0698eeb037 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Tue, 30 Apr 2019 12:43:44 +0200 Subject: [esock|test] Add multi async recv and recvmsg TCP test case(s) Add recv and recvmsg test case testing the abort message received when "another" process closes a socket. Multiple readers for good measure. --- erts/emulator/test/socket_SUITE.erl | 580 +++++++++++++++++++++++++++++++++++- 1 file changed, 579 insertions(+), 1 deletion(-) (limited to 'erts/emulator/test') diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index bd5d408add..52854cb7ad 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -94,6 +94,8 @@ api_a_mrecvfrom_cancel_udp4/1, api_a_mrecvmsg_cancel_udp4/1, api_a_maccept_cancel_tcp4/1, + api_a_mrecv_cancel_tcp4/1, + api_a_mrecvmsg_cancel_tcp4/1, %% *** API Options *** @@ -689,7 +691,9 @@ api_async_cases() -> api_a_recvmsg_cancel_tcp4, api_a_mrecvfrom_cancel_udp4, api_a_mrecvmsg_cancel_udp4, - api_a_maccept_cancel_tcp4 + api_a_maccept_cancel_tcp4, + api_a_mrecv_cancel_tcp4, + api_a_mrecvmsg_cancel_tcp4 ]. api_options_cases() -> @@ -5412,6 +5416,580 @@ api_a_maccept_cancel_tcp(InitState) -> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make multiple async (Timeout = nowait) call(s) to recv +%% (from *several* processes), wait some time and then cancel, +%% This should result in abort messages to the 'other' processes. +%% +api_a_mrecv_cancel_tcp4(suite) -> + []; +api_a_mrecv_cancel_tcp4(doc) -> + []; +api_a_mrecv_cancel_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(20)), + tc_try(api_a_mrecv_cancel_tcp4, + fun() -> + Recv = fun(Sock) -> + socket:recv(Sock, 0, nowait) + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_mrecv_cancel_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make multiple async (Timeout = nowait) call(s) to recvmsg +%% (from *several* processes), wait some time and then cancel, +%% This should result in abort messages to the 'other' processes. +%% +api_a_mrecvmsg_cancel_tcp4(suite) -> + []; +api_a_mrecvmsg_cancel_tcp4(doc) -> + []; +api_a_mrecvmsg_cancel_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(20)), + tc_try(api_a_mrecvmsg_cancel_tcp4, + fun() -> + Recv = fun(Sock) -> + socket:recvmsg(Sock, nowait) + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_mrecv_cancel_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_mrecv_cancel_tcp(InitState) -> + process_flag(trap_exit, true), + ServerSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, addr => LAddr}, + {ok, State#{lsa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, lsa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "await connection (nowait)", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, CSock} -> + {ok, State#{csock => CSock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester, csock := Sock}) -> + ?SEV_ANNOUNCE_READY(Tester, accept, Sock), + ok + end}, + + #{desc => "await continue (nowait recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{csock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {ok, {select, T, R} = SelectInfo} -> + ?SEV_IPRINT("recv select: " + "~n Tag: ~p" + "~n Ref: ~p", [T, R]), + {ok, State#{recv_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{csock := Sock, + recv_select_info := {select, _, Ref}} = State) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}}; + {'$socket', Sock, abort, {Ref, closed}} -> + {ok, maps:remove(sock, State)} + after 5000 -> + ok + end + end}, + #{desc => "announce ready (abort)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, abort), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + AltServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, Sock} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, sock => Sock}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {ok, {select, _, _} = SelectInfo} -> + {ok, State#{recv_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await abort message", + cmd => fun(#{sock := Sock, + recv_select_info := {select, _, Ref}} = State) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}}; + {'$socket', Sock, abort, {Ref, closed}} -> + {ok, maps:remove(sock, State)} + after 5000 -> + ?SEV_IPRINT("message queue: ~p", + [process_info(self(), + messages)]), + {error, timeout} + end + end}, + #{desc => "announce ready (abort)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, abort), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + ?SEV_IPRINT("terminating"), + State1 = maps:remove(recv_select_info, State), + State2 = maps:remove(tester, State1), + State3 = maps:remove(sock, State2), + {ok, State3}; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + {Tester, Port} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_port => Port}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** The init part *** + #{desc => "which server (local) address", + cmd => fun(#{domain := Domain, server_port := Port} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, + addr => LAddr}, + SSA = LSA#{port => Port}, + {ok, State#{local_sa => LSA, server_sa => SSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% *** The actual test *** + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect) + end}, + #{desc => "connect to server", + cmd => fun(#{sock := Sock, server_sa := SSA}) -> + socket:connect(Sock, SSA) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor alt-server 1", + cmd => fun(#{alt_server1 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor alt-server 2", + cmd => fun(#{alt_server2 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_port => Port}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, server_port := Port} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Port), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + #{desc => "order server to continue (with accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + #{desc => "order client to continue (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, connect), + ok + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, client, connect) + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Pid} = State) -> + {ok, Sock} = ?SEV_AWAIT_READY(Pid, server, accept), + {ok, State#{sock => Sock}} + end}, + + %% Start the alt server 1 + #{desc => "order alt-server 1 start", + cmd => fun(#{alt_server1 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await alt-server 1 ready (init)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, init) + end}, + + %% Start the alt server 2 + #{desc => "order alt-server 2 start", + cmd => fun(#{alt_server2 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await alt-server 2 ready (init)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, init) + end}, + + + %% *** The actual test *** + #{desc => "order server continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await server ready (recv select)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, recv_select) + end}, + + #{desc => "order alt-server 1 continue (recv)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await alt-server 1 ready (recv select)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, recv_select) + end}, + + #{desc => "order alt-server 2 continue (recv)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await alt-server 2 ready (recv select)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, recv_select) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "close the socket", + cmd => fun(#{sock := Sock} = _State) -> + socket:close(Sock) + end}, + + #{desc => "await server ready (abort)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, abort) + end}, + #{desc => "await alt-server 1 ready (abort)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, abort) + end}, + #{desc => "await alt-server 2 ready (abort)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, abort) + end}, + + %% Terminations + #{desc => "order client to terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + + #{desc => "order alt-server 2 to terminate", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await alt-server 2 termination", + cmd => fun(#{alt_server2 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(alt_server2, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "order alt-server 1 to terminate", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await alt-server 1 termination", + cmd => fun(#{alt_server1 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(alt_server1, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(server, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start alt-server 1 evaluator"), + AltServer1 = ?SEV_START("alt_server1", AltServerSeq, InitState), + + i("start alt-server 2 evaluator"), + AltServer2 = ?SEV_START("alt_server2", AltServerSeq, InitState), + + i("start client evaluator"), + Client = ?SEV_START("client", ClientSeq, InitState), + + i("start tester evaluator"), + TesterInitState = #{server => Server#ev.pid, + alt_server1 => AltServer1#ev.pid, + alt_server2 => AltServer2#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator(s)"), + ok = ?SEV_AWAIT_FINISH([Server, AltServer1, AltServer2, Client, Tester]). + + + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% %% -- cgit v1.2.3