diff options
author | Micael Karlberg <[email protected]> | 2019-06-14 12:08:44 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2019-06-14 12:08:44 +0200 |
commit | 06a802061c190a0da920e9b281ded17a2407c01b (patch) | |
tree | 6a5bed547421b42b2b09a4daf8aa42bc141ca388 /erts/emulator | |
parent | 8b39c942035fd28e9e0b1d3aa676a12916979b14 (diff) | |
parent | 0f13e8db460ee0e75a3f4b8681a803f5e2128f09 (diff) | |
download | otp-06a802061c190a0da920e9b281ded17a2407c01b.tar.gz otp-06a802061c190a0da920e9b281ded17a2407c01b.tar.bz2 otp-06a802061c190a0da920e9b281ded17a2407c01b.zip |
Merge branch 'maint'
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 146 | ||||
-rwxr-xr-x | erts/emulator/test/esock_ttest/esock-ttest | 22 | ||||
-rwxr-xr-x | erts/emulator/test/esock_ttest/esock-ttest-server-sock | 24 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 4109 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_logger.erl | 2 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_client_socket.erl | 82 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_server.erl | 101 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_server_socket.erl | 18 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_socket.erl | 329 |
9 files changed, 4653 insertions, 180 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 25bc712949..a14d3c860b 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -825,6 +825,10 @@ typedef struct { ErlNifPid ctrlPid; ESockMonitor ctrlMon; + /* +++ Connector process +++ */ + ErlNifPid connPid; + ESockMonitor connMon; + /* +++ Write stuff +++ */ ErlNifMutex* writeMtx; ESockRequestor currentWriter; @@ -2377,6 +2381,8 @@ static int socket_setopt(int sock, const void* optVal, const socklen_t optLen); +static BOOLEAN_T is_connector(ErlNifEnv* env, + ESockDescriptor* descP); static BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err); static ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event); @@ -4895,17 +4901,17 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, return esock_make_error(env, atom_closed); if (!IS_OPEN(descP)) { - SSDBG( descP, ("SOCKET", "nif_connect -> not open\r\n") ); + SSDBG( descP, ("SOCKET", "nconnect -> not open\r\n") ); return esock_make_error(env, atom_exbadstate); } if (IS_CONNECTED(descP)) { - SSDBG( descP, ("SOCKET", "nif_connect -> already connected\r\n") ); + SSDBG( descP, ("SOCKET", "nconnect -> already connected\r\n") ); return esock_make_error(env, atom_eisconn); } - if (IS_CONNECTING(descP)) { - SSDBG( descP, ("SOCKET", "nif_connect -> already connecting\r\n") ); + if (IS_CONNECTING(descP) && !is_connector(env, descP)) { + SSDBG( descP, ("SOCKET", "nconnect -> already connecting\r\n") ); return esock_make_error(env, esock_atom_einval); } @@ -4919,31 +4925,93 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, descP->addrLen); save_errno = sock_errno(); - SSDBG( descP, ("SOCKET", "nif_connect -> connect result: %d, %d\r\n", + SSDBG( descP, ("SOCKET", "nconnect -> connect result: %d, %d\r\n", code, save_errno) ); - if (IS_SOCKET_ERROR(code) && - ((save_errno == ERRNO_BLOCK) || /* Winsock2 */ - (save_errno == EINPROGRESS))) { /* Unix & OSE!! */ - ref = MKREF(env); - descP->state = SOCKET_STATE_CONNECTING; - if ((sres = esock_select_write(env, descP->sock, descP, NULL, - sockRef, ref)) < 0) { - res = esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } else { - res = esock_make_ok2(env, ref); + if (IS_SOCKET_ERROR(code)) { + switch (save_errno) { + case ERRNO_BLOCK: /* Winsock2 */ + case EINPROGRESS: /* Unix & OSE!! */ + SSDBG( descP, ("SOCKET", "nconnect -> would block => select\r\n") ); + + ref = MKREF(env); + + if (IS_CONNECTING(descP)) { + /* Glitch */ + res = esock_make_ok2(env, ref); + } else { + + /* First time here */ + + if (enif_self(env, &descP->connPid) == NULL) + return esock_make_error(env, atom_exself); + + if (MONP("nconnect -> conn", + env, descP, + &descP->connPid, + &descP->connMon) != 0) + return esock_make_error(env, atom_exmon); + + descP->state = SOCKET_STATE_CONNECTING; + + if ((sres = esock_select_write(env, descP->sock, descP, NULL, + sockRef, ref)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_ok2(env, ref); + } + } + break; + + case EISCONN: + SSDBG( descP, ("SOCKET", "nconnect -> *already* connected\r\n") ); + { + /* This is ***strange*** so make sure */ + int err = 0; + if (!verify_is_connected(descP, &err)) { + descP->state = SOCKET_STATE_OPEN; /* restore state */ + res = esock_make_error_errno(env, err); + } else { + descP->state = SOCKET_STATE_CONNECTED; + /* And just to be on the safe side, reset these */ + enif_set_pid_undefined(&descP->connPid); + DEMONP("nconnect -> connected", + env, descP, &descP->connMon); + descP->isReadable = TRUE; + descP->isWritable = TRUE; + res = esock_atom_ok; + } + } + break; + + default: + SSDBG( descP, ("SOCKET", "nconnect -> other error(1): %d\r\n", + save_errno) ); + res = esock_make_error_errno(env, save_errno); + break; } + } else if (code == 0) { /* ok we are connected */ + SSDBG( descP, ("SOCKET", "nconnect -> connected\r\n") ); + descP->state = SOCKET_STATE_CONNECTED; + enif_set_pid_undefined(&descP->connPid); + DEMONP("nconnect -> connected", env, descP, &descP->connMon); descP->isReadable = TRUE; descP->isWritable = TRUE; res = esock_atom_ok; + } else { + /* Do we really need this case? */ + + SSDBG( descP, ("SOCKET", "nconnect -> other error(2): %d\r\n", + save_errno) ); + res = esock_make_error_errno(env, save_errno); } @@ -4998,7 +5066,7 @@ ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, { int error; - if (descP->state != SOCKET_STATE_CONNECTING) + if (!IS_CONNECTING(descP)) return esock_make_error(env, atom_enotconn); if (!verify_is_connected(descP, &error)) { @@ -5007,6 +5075,8 @@ ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, } descP->state = SOCKET_STATE_CONNECTED; + enif_set_pid_undefined(&descP->connPid); + DEMONP("nfinalize_connection -> connected", env, descP, &descP->connMon); descP->isReadable = TRUE; descP->isWritable = TRUE; @@ -5069,6 +5139,29 @@ BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err) +/* *** is_connector *** + * Check if the current process is the connector process. + */ +#if !defined(__WIN32__) +static +BOOLEAN_T is_connector(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return FALSE; + + if (COMPARE_PIDS(&descP->connPid, &caller) == 0) + return TRUE; + else + return FALSE; + +} +#endif + + + /* ---------------------------------------------------------------------- * nif_listen * @@ -16873,7 +16966,9 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) char buf[64]; /* Buffer used for building the mutex name */ // This needs to be released when the socket is closed! - // descP->env = enif_alloc_env(); + + enif_set_pid_undefined(&descP->connPid); + MON_INIT(&descP->connMon); sprintf(buf, "esock[w,%d]", sock); descP->writeMtx = MCREATE(buf); @@ -18652,6 +18747,17 @@ void socket_down(ErlNifEnv* env, MON2T(env, mon)); } + } else if (COMPARE_PIDS(&descP->connPid, pid) == 0) { + + /* The connPid is only set during the connection. + * The same goes for the monitor (connMon). + */ + + descP->state = SOCKET_STATE_OPEN; /* restore state */ + enif_set_pid_undefined(&descP->connPid); + DEMONP("socket_down -> connector", + env, descP, &descP->connMon); + } else { /* check all operation queue(s): acceptor, writer and reader. diff --git a/erts/emulator/test/esock_ttest/esock-ttest b/erts/emulator/test/esock_ttest/esock-ttest index cf1d9cd9ab..9adc51fc8b 100755 --- a/erts/emulator/test/esock_ttest/esock-ttest +++ b/erts/emulator/test/esock_ttest/esock-ttest @@ -60,6 +60,9 @@ usage() -> "~n Which domain to use." "~n Only valid for server." "~n Defaults to: inet" + "~n --async Asynchronous mode (Timeout = nowait)" + "~n This option is only valid for transport = sock." + "~n Also, its only used when active =/= false." "~n --active <active> boolean() | once." "~n Valid for both client and server." "~n Defaults to: false" @@ -111,6 +114,7 @@ process_args(Args) -> process_server_args(Args) -> Defaults = #{role => server, domain => inet, + async => false, active => false, transport => {sock, plain}}, process_server_args(Args, Defaults). @@ -124,6 +128,9 @@ process_server_args(["--domain", Domain|Args], State) (Domain =:= "inet6")) -> process_server_args(Args, State#{domain => list_to_atom(Domain)}); +process_server_args(["--async"|Args], State) -> + process_server_args(Args, State#{async => true}); + process_server_args(["--active", Active|Args], State) when ((Active =:= "false") orelse (Active =:= "once") orelse @@ -145,6 +152,7 @@ process_server_args([Arg|_], _State) -> process_client_args(Args) -> Defaults = #{role => client, + async => false, active => false, transport => {sock, plain}, %% Will cause error if not provided @@ -159,10 +167,13 @@ process_client_args(Args) -> process_client_args([], State) -> process_client_args_ensure_max_outstanding(State); +process_client_args(["--async"|Args], State) -> + process_client_args(Args, State#{async => true}); + process_client_args(["--active", Active|Args], State) - when ((Active =:= "false") orelse - (Active =:= "once") orelse - (Active =:= "true")) -> + when (Active =:= "false") orelse + (Active =:= "once") orelse + (Active =:= "true") -> process_client_args(Args, State#{active => list_to_atom(Active)}); process_client_args(["--transport", "gen" | Args], State) -> @@ -280,9 +291,10 @@ exec(#{role := server, end; exec(#{role := server, domain := Domain, + async := Async, active := Active, transport := {sock, Method}}) -> - case socket_test_ttest_tcp_server_socket:start(Method, Domain, Active) of + case socket_test_ttest_tcp_server_socket:start(Method, Domain, Async, Active) of {ok, {Pid, _}} -> MRef = erlang:monitor(process, Pid), receive @@ -323,12 +335,14 @@ exec(#{role := client, end; exec(#{role := client, server := ServerInfo, + async := Async, active := Active, transport := {sock, Method}, msg_id := MsgID, max_outstanding := MaxOutstanding, runtime := RunTime}) -> case socket_test_ttest_tcp_client_socket:start(true, + Async, Method, ServerInfo, Active, diff --git a/erts/emulator/test/esock_ttest/esock-ttest-server-sock b/erts/emulator/test/esock_ttest/esock-ttest-server-sock index 4ec0d335d9..fc87499f09 100755 --- a/erts/emulator/test/esock_ttest/esock-ttest-server-sock +++ b/erts/emulator/test/esock_ttest/esock-ttest-server-sock @@ -24,9 +24,27 @@ EMU=$ERL_TOP/erts/emulator EMU_TEST=$EMU/test ESOCK_TTEST=$EMU_TEST/esock_ttest -if [ $# = 1 ]; then - ACTIVE="--active $1" +# $1 - async - boolean() +# $2 - active - once | boolean() +if [ $# = 2 ]; then + + async=$1 + active=$2 + + if [ $async = true ]; then + ASYNC="--async" + else + ASYNC= + fi + + ACTIVE="--active $active" + +else + echo "<ERROR> Missing args: async and active" + echo "" + exit 1 fi -$ESOCK_TTEST/esock-ttest --server --transport sock $ACTIVE + +$ESOCK_TTEST/esock-ttest --server $ASYNC --transport sock $ACTIVE diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 49b0fcccc2..0ec097836b 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -81,6 +81,24 @@ api_b_sendmsg_and_recvmsg_tcp4/1, api_b_sendmsg_and_recvmsg_tcpL/1, + %% *** API async *** + api_a_connect_tcp4/1, + api_a_sendto_and_recvfrom_udp4/1, + api_a_sendmsg_and_recvmsg_udp4/1, + api_a_send_and_recv_tcp4/1, + api_a_sendmsg_and_recvmsg_tcp4/1, + api_a_recvfrom_cancel_udp4/1, + api_a_recvmsg_cancel_udp4/1, + api_a_accept_cancel_tcp4/1, + api_a_recv_cancel_tcp4/1, + api_a_recvmsg_cancel_tcp4/1, + api_a_mrecvfrom_cancel_udp4/1, + api_a_mrecvmsg_cancel_udp4/1, + api_a_maccept_cancel_tcp4/1, + api_a_mrecv_cancel_tcp4/1, + api_a_mrecvmsg_cancel_tcp4/1, + + %% *** API Options *** api_opt_simple_otp_options/1, api_opt_simple_otp_rcvbuf_option/1, @@ -565,6 +583,7 @@ use_group(Group, Env, Default) -> groups() -> [{api, [], api_cases()}, {api_basic, [], api_basic_cases()}, + {api_async, [], api_async_cases()}, {api_options, [], api_options_cases()}, {api_op_with_timeout, [], api_op_with_timeout_cases()}, {socket_closure, [], socket_closure_cases()}, @@ -639,6 +658,7 @@ groups() -> api_cases() -> [ {group, api_basic}, + {group, api_async}, {group, api_options}, {group, api_op_with_timeout} ]. @@ -659,6 +679,25 @@ api_basic_cases() -> api_b_sendmsg_and_recvmsg_tcpL ]. +api_async_cases() -> + [ + api_a_connect_tcp4, + api_a_sendto_and_recvfrom_udp4, + api_a_sendmsg_and_recvmsg_udp4, + api_a_send_and_recv_tcp4, + api_a_sendmsg_and_recvmsg_tcp4, + api_a_recvfrom_cancel_udp4, + api_a_recvmsg_cancel_udp4, + api_a_accept_cancel_tcp4, + api_a_recv_cancel_tcp4, + api_a_recvmsg_cancel_tcp4, + api_a_mrecvfrom_cancel_udp4, + api_a_mrecvmsg_cancel_udp4, + api_a_maccept_cancel_tcp4, + api_a_mrecv_cancel_tcp4, + api_a_mrecvmsg_cancel_tcp4 + ]. + api_options_cases() -> [ api_opt_simple_otp_options, @@ -1543,11 +1582,12 @@ init_per_group(ttest = _GroupName, Config) -> io:format("init_per_group(~w) -> entry with" "~n Config: ~p" "~n", [_GroupName, Config]), + ttest_manager_start(), case lists:keysearch(esock_test_ttest_runtime, 1, Config) of {value, _} -> Config; false -> - [{esock_test_ttest_runtime, which_ttest_runtime_env()}|Config] + [{esock_test_ttest_runtime, which_ttest_runtime_env()} | Config] end; init_per_group(_GroupName, Config) -> Config. @@ -1556,6 +1596,7 @@ end_per_group(ttest = _GroupName, Config) -> io:format("init_per_group(~w) -> entry with" "~n Config: ~p" "~n", [_GroupName, Config]), + ttest_manager_stop(), lists:keydelete(esock_test_ttest_runtime, 1, Config); end_per_group(_GroupName, Config) -> Config. @@ -1565,16 +1606,16 @@ init_per_testcase(_TC, Config) -> io:format("init_per_testcase(~w) -> entry with" "~n Config: ~p" "~n", [_TC, Config]), - case quiet_mode(Config) of - default -> - ?LOGGER:start(); - Quiet -> - ?LOGGER:start(Quiet) - end, + %% case quiet_mode(Config) of + %% default -> + %% ?LOGGER:start(); + %% Quiet -> + %% ?LOGGER:start(Quiet) + %% end, Config. end_per_testcase(_TC, Config) -> - ?LOGGER:stop(), + %% ?LOGGER:stop(), Config. @@ -2526,11 +2567,7 @@ api_b_send_and_recv_tcp(InitState) -> ?SEV_ANNOUNCE_CONTINUE(Server, accept), ok end}, - #{desc => "sleep", - cmd => fun(_) -> - ?SLEEP(?SECS(1)), - ok - end}, + ?SEV_SLEEP(?SECS(1)), #{desc => "order client to continue (with connect)", cmd => fun(#{client := Client} = _State) -> ?SEV_ANNOUNCE_CONTINUE(Client, connect), @@ -2617,6 +2654,3852 @@ api_b_send_and_recv_tcp(InitState) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically establish a TCP connection via an async connect. + +api_a_connect_tcp4(suite) -> + []; +api_a_connect_tcp4(doc) -> + []; +api_a_connect_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_connect_tcp4, + fun() -> + Connect = fun(Sock, SockAddr) -> + socket:connect(Sock, SockAddr, nowait) + end, + Send = fun(Sock, Data) -> + socket:send(Sock, Data) + end, + Recv = fun(Sock) -> + socket:recv(Sock) + end, + InitState = #{domain => inet, + connect => Connect, + send => Send, + recv => Recv}, + ok = api_a_connect_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_connect_tcp(InitState) -> + process_flag(trap_exit, true), + ServerSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{lsa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, lsa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "await connection", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, Sock} -> + ?SEV_IPRINT("accepted: ~n ~p", [Sock]), + {ok, State#{csock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept), + ok + end}, + + #{desc => "await continue (recv_req)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_req) + end}, + #{desc => "recv req", + cmd => fun(#{csock := Sock, recv := Recv}) -> + case Recv(Sock) of + {ok, ?BASIC_REQ} -> + ok; + {ok, UnexpData} -> + {error, {unexpected_data, UnexpData}}; + {error, _} = ERROR -> + %% At the moment there is no way to get + %% status or state for the socket... + ERROR + end + end}, + #{desc => "announce ready (recv_req)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_req), + ok + end}, + #{desc => "await continue (send_rep)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_rep) + end}, + #{desc => "send rep", + cmd => fun(#{csock := Sock, send := Send}) -> + Send(Sock, ?BASIC_REP) + end}, + #{desc => "announce ready (send_rep)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_rep), + ok + end}, + + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close connection socket", + cmd => fun(#{csock := Sock} = State) -> + ok = socket:close(Sock), + {ok, maps:remove(csock, State)} + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock} = State) -> + ok = socket:close(Sock), + {ok, maps:remove(lsock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + {Tester, Port} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_port => Port}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** The init part *** + #{desc => "which server (local) address", + cmd => fun(#{domain := Domain, server_port := Port} = State) -> + LSA = which_local_socket_addr(Domain), + SSA = LSA#{port => Port}, + {ok, State#{local_sa => LSA, server_sa => SSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% *** The actual test *** + #{desc => "await continue (async connect)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, async_connect) + end}, + #{desc => "connect (async) to server", + cmd => fun(#{sock := Sock, + server_sa := SSA, + connect := Connect} = State) -> + case Connect(Sock, SSA) of + ok -> + ?SEV_IPRINT("ok -> " + "unexpected success => SKIP", + []), + {skip, unexpected_success}; + {select, {select_info, ST, SR}} -> + ?SEV_IPRINT("select ->" + "~n tag: ~p" + "~n ref: ~p", [ST, SR]), + {ok, State#{connect_stag => ST, + connect_sref => SR}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (connect select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{sock := Sock, connect_sref := Ref}) -> + receive + {'$socket', Sock, select, Ref} -> + ?SEV_IPRINT("select message ->" + "~n ref: ~p", [Ref]), + ok + after 5000 -> + ?SEV_EPRINT("timeout: " + "~n message queue: ~p", + [mq()]), + {error, timeout} + end + end}, + #{desc => "announce ready (select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, select), + ok + end}, + #{desc => "connect (async) to server", + cmd => fun(#{sock := Sock, server_sa := SSA, connect := Connect}) -> + case Connect(Sock, SSA) of + ok -> + ok; + {select, SelectInfo} -> + {error, {unexpected_select, SelectInfo}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + #{desc => "get peername", + cmd => fun(#{sock := Sock} = _State) -> + case socket:peername(Sock) of + {ok, SockAddr} -> + ?SEV_IPRINT("Peer Name: ~p", [SockAddr]), + ok; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "await continue (send_req)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_req) + end}, + #{desc => "send req", + cmd => fun(#{sock := Sock, send := Send}) -> + Send(Sock, ?BASIC_REQ) + end}, + #{desc => "announce ready (send_req)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_req), + ok + end}, + #{desc => "await continue (recv_rep)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_rep) + end}, + #{desc => "recv rep", + cmd => fun(#{sock := Sock, recv := Recv}) -> + case Recv(Sock) of + {ok, ?BASIC_REP} -> + ok; + {ok, UnexpData} -> + {error, {unexpected_data, UnexpData}}; + {error, _} = ERROR -> + %% At the moment there is no way to get + %% status or state for the socket... + ERROR + end + end}, + #{desc => "announce ready (recv_rep)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_rep), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock} = State) -> + ok = socket:close(Sock), + State2 = maps:remove(sock, State), + State3 = maps:remove(connect_stag, State2), + State4 = maps:remove(connect_sref, State3), + {ok, State4} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_port => Port}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, server_port := Port} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Port), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + + %% *** The actual test *** + #{desc => "order client to continue (async connect)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, async_connect), + ok + end}, + #{desc => "await client ready (connect select)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect_select) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order server to continue (with accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + #{desc => "await client ready (select)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, select) + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect) + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, accept) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order server to recv test req (recv req)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, recv_req), + ok + end}, + #{desc => "order client to send test req (send req)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send_req), + ok + end}, + #{desc => "await client ready (send_req)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, send_req) + end}, + #{desc => "await server ready (recv_req)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, recv_req) + end}, + #{desc => "order client to recv test rep (send rep)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, recv_rep), + ok + end}, + #{desc => "order server to send test rep (send rep)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, send_rep), + ok + end}, + #{desc => "await server ready (send_rep)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, send_rep) + end}, + #{desc => "await client ready (recv_rep)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, recv_rep) + end}, + + + %% *** Termination *** + #{desc => "order client to terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start client evaluator"), + Client = ?SEV_START("client", ClientSeq, InitState), + i("await evaluator(s)"), + + i("start tester evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically send and receive on an IPv4 UDP (dgram) socket using +%% sendto and recvfrom. But we try to be async. That is, we use +%% the 'nowait' value for the Timeout argument (and await the eventual +%% select message). Note that we only do this for the recvfrom, +%% since its much more difficult to "arrange" for sendto. +%% +api_a_sendto_and_recvfrom_udp4(suite) -> + []; +api_a_sendto_and_recvfrom_udp4(doc) -> + []; +api_a_sendto_and_recvfrom_udp4(_Config) when is_list(_Config) -> + ?TT(?SECS(5)), + tc_try(api_a_sendto_and_recvfrom_udp4, + fun() -> + Send = fun(Sock, Data, Dest) -> + socket:sendto(Sock, Data, Dest) + end, + Recv = fun(Sock) -> + socket:recvfrom(Sock, 0, nowait) + end, + InitState = #{domain => inet, + send => Send, + recv => Recv}, + ok = api_a_send_and_recv_udp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically send and receive on an IPv4 UDP (dgram) socket using +%% sendto and recvfrom. But we try to be async. That is, we use +%% the 'nowait' value for the Timeout argument (and await the eventual +%% select message). Note that we only do this for the recvmsg, +%% since its much more difficult to "arrange" for sendmsg. +%% +api_a_sendmsg_and_recvmsg_udp4(suite) -> + []; +api_a_sendmsg_and_recvmsg_udp4(doc) -> + []; +api_a_sendmsg_and_recvmsg_udp4(_Config) when is_list(_Config) -> + ?TT(?SECS(5)), + tc_try(api_a_sendmsg_and_recvmsg_udp4, + fun() -> + Send = fun(Sock, Data, Dest) -> + MsgHdr = #{addr => Dest, + %% ctrl => CMsgHdrs, + iov => [Data]}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock) -> + case socket:recvmsg(Sock, nowait) of + {ok, #{addr := Source, + iov := [Data]}} -> + {ok, {Source, Data}}; + {ok, _} = OK -> + OK; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + send => Send, + recv => Recv}, + ok = api_a_send_and_recv_udp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_send_and_recv_udp(InitState) -> + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, dgram, udp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind socket (to local address)", + cmd => fun(#{sock := Sock, local_sa := LSA} = State) -> + case socket:bind(Sock, LSA) of + {ok, Port} -> + {ok, State#{port => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, local_sa := LSA, port := Port}) -> + ServerSA = LSA#{port => Port}, + ?SEV_ANNOUNCE_READY(Tester, init, ServerSA), + ok + end}, + + %% The actual test + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, {select_info, Tag, RecvRef}} -> + ?SEV_IPRINT("expected select: " + "~n Tag: ~p" + "~n Ref: ~p", [Tag, RecvRef]), + {ok, State#{recv_stag => Tag, + recv_sref => RecvRef}}; + {ok, X} -> + {error, {unexpected_succes, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{sock := Sock, recv_sref := RecvRef}) -> + receive + {'$socket', Sock, select, RecvRef} -> + ok + after 5000 -> + ?SEV_EPRINT("message queue: ~p", [mq()]), + {error, timeout} + end + end}, + #{desc => "announce ready (select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, select), + ok + end}, + #{desc => "now read the data (request)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {ok, {Src, ?BASIC_REQ}} -> + {ok, State#{req_src => Src}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv request)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_req), + ok + end}, + + #{desc => "await continue (send reply)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_reply) + end}, + #{desc => "send reply", + cmd => fun(#{sock := Sock, req_src := Src, send := Send}) -> + Send(Sock, ?BASIC_REP, Src) + end}, + #{desc => "announce ready (send)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + State2 = maps:remove(tester, State), + State3 = maps:remove(recv_stag, State2), + State4 = maps:remove(recv_sref, State3), + State5 = maps:remove(req_src, State4), + {ok, State5}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock} = State) -> + ok = socket:close(Sock), + {ok, maps:remove(sock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + ClientSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, ServerSA} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, + server_sa => ServerSA}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{lsa => LSA}} + end}, + #{desc => "open socket", + cmd => fun(#{domain := Domain} = State) -> + Sock = sock_open(Domain, dgram, udp), + SA = sock_sockname(Sock), + {ok, State#{sock => Sock, sa => SA}} + end}, + #{desc => "bind socket (to local address)", + cmd => fun(#{sock := Sock, lsa := LSA}) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (send request)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_req) + end}, + #{desc => "send request", + cmd => fun(#{sock := Sock, server_sa := Server, send := Send}) -> + Send(Sock, ?BASIC_REQ, Server) + end}, + #{desc => "announce ready (send request)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_req), + ok + end}, + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv reply (with nowait)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, {select_info, Tag, RecvRef}} -> + {ok, State#{recv_stag => Tag, + recv_sref => RecvRef}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{sock := Sock, recv_sref := RecvRef}) -> + receive + {'$socket', Sock, select, RecvRef} -> + ok + end + end}, + #{desc => "announce ready (select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, select), + ok + end}, + #{desc => "now read the data (reply)", + cmd => fun(#{sock := Sock, recv := Recv}) -> + case Recv(Sock) of + {ok, {_Src, ?BASIC_REP}} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv reply)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_rep), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + State2 = maps:remove(tester, State), + State3 = maps:remove(recv_stag, State2), + State4 = maps:remove(recv_sref, State3), + {ok, State4}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock} = State) -> + ok = socket:close(Sock), + {ok, maps:remove(sock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, ServerSA} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_sa => ServerSA}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, + server_sa := ServerSA} = _State) -> + ?SEV_ANNOUNCE_START(Pid, ServerSA), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + %% The actual test + #{desc => "order server continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await server ready (recv_select)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, recv_select) + end}, + + #{desc => "order client continue (send request)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send_req), + ok + end}, + #{desc => "await client ready (send request)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, send_req) + end}, + #{desc => "await server ready (select)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, select) + end}, + #{desc => "await server ready (recv request)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, recv_req) + end}, + + #{desc => "order client continue (recv)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await client ready (recv_select)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, recv_select) + end}, + #{desc => "order server continue (send reply)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send_reply), + ok + end}, + #{desc => "await server ready (send)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, send) + end}, + #{desc => "await client ready (select)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, select) + end}, + #{desc => "await client ready (recv reply)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, recv_rep) + end}, + + %% Terminations + #{desc => "order client to terminate", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(client, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(server, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + ServerInitState = InitState, + Server = ?SEV_START("server", ServerSeq, ServerInitState), + + i("start client evaluator(s)"), + ClientInitState = InitState, + Client = ?SEV_START("client", ClientSeq, ClientInitState), + + i("start 'tester' evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically send and receive using the "common" functions (send and recv) +%% on an IPv4 TCP (stream) socket. But we try to be async. That is, we use +%% the 'nowait' value for the Timeout argument (and await the eventual +%% select message). Note that we only do this for the recv, +%% since its much more difficult to "arrange" for send. +%% We *also* test async for accept. +api_a_send_and_recv_tcp4(suite) -> + []; +api_a_send_and_recv_tcp4(doc) -> + []; +api_a_send_and_recv_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_send_and_recv_tcp4, + fun() -> + Send = fun(Sock, Data) -> + socket:send(Sock, Data) + end, + Recv = fun(Sock) -> + socket:recv(Sock, 0, nowait) + end, + InitState = #{domain => inet, + send => Send, + recv => Recv}, + ok = api_a_send_and_recv_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically send and receive using the msg functions (sendmsg and recvmsg) +%% on an IPv4 TCP (stream) socket. But we try to be async. That is, we use +%% the 'nowait' value for the Timeout argument (and await the eventual +%% select message). Note that we only do this for the recvmsg, +%% since its much more difficult to "arrange" for sendmsg. +%% We *also* test async for accept. +api_a_sendmsg_and_recvmsg_tcp4(suite) -> + []; +api_a_sendmsg_and_recvmsg_tcp4(doc) -> + []; +api_a_sendmsg_and_recvmsg_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_sendmsg_and_recvmsg_tcp4, + fun() -> + Send = fun(Sock, Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(Sock, MsgHdr) + end, + Recv = fun(Sock) -> + case socket:recvmsg(Sock, nowait) of + {ok, #{addr := undefined, + iov := [Data]}} -> + {ok, Data}; + {ok, _} = OK -> + OK; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + send => Send, + recv => Recv}, + ok = api_a_send_and_recv_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_send_and_recv_tcp(InitState) -> + process_flag(trap_exit, true), + ServerSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{lsa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, lsa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "await connection (nowait)", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock, nowait) of + {select, {select_info, Tag, Ref}} -> + ?SEV_IPRINT("accept select: " + "~n Tag: ~p" + "~n Ref: ~p", [Tag, Ref]), + {ok, State#{accept_stag => Tag, + accept_sref => Ref}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{lsock := Sock, accept_sref := Ref}) -> + receive + {'$socket', Sock, select, Ref} -> + ok + end + end}, + #{desc => "announce ready (select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, select), + ok + end}, + #{desc => "await connection (again)", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock, nowait) of + {ok, Sock} -> + ?SEV_IPRINT("accepted: " + "~n Sock: ~p", [Sock]), + {ok, State#{csock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept), + ok + end}, + + #{desc => "await continue (recv request)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_req) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{csock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, {select_info, Tag, Ref}} -> + ?SEV_IPRINT("recv select: " + "~n Tag: ~p" + "~n Ref: ~p", [Tag, Ref]), + {ok, State#{recv_stag => Tag, + recv_sref => Ref}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{csock := Sock, recv_sref := RecvRef}) -> + receive + {'$socket', Sock, select, RecvRef} -> + ok + end + end}, + #{desc => "announce ready (select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, select), + ok + end}, + #{desc => "now read the data (request)", + cmd => fun(#{csock := Sock, recv := Recv} = _State) -> + case Recv(Sock) of + {ok, ?BASIC_REQ} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv request)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_req), + ok + end}, + + #{desc => "await continue (send reply)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_rep) + end}, + #{desc => "send reply", + cmd => fun(#{csock := Sock, send := Send}) -> + Send(Sock, ?BASIC_REP) + end}, + #{desc => "announce ready (send reply)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_rep), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close connection socket", + cmd => fun(#{csock := Sock}) -> + socket:close(Sock) + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + ClientSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + {Tester, Port} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_port => Port}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** The init part *** + #{desc => "which server (local) address", + cmd => fun(#{domain := Domain, server_port := Port} = State) -> + LSA = which_local_socket_addr(Domain), + SSA = LSA#{port => Port}, + {ok, State#{local_sa => LSA, server_sa => SSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% *** The actual test *** + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect) + end}, + #{desc => "connect to server", + cmd => fun(#{sock := Sock, server_sa := SSA}) -> + socket:connect(Sock, SSA) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + + #{desc => "await continue (send request)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_req) + end}, + #{desc => "send request (to server)", + cmd => fun(#{sock := Sock, send := Send}) -> + ok = Send(Sock, ?BASIC_REQ) + end}, + #{desc => "announce ready (send request)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_req), + ok + end}, + + #{desc => "try recv reply (with nowait, expect select)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, {select_info, Tag, Ref}} -> + ?SEV_IPRINT("recv select: " + "~n Tag: ~p" + "~n Ref: ~p", [Tag, Ref]), + {ok, State#{recv_stag => Tag, + recv_sref => Ref}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{sock := Sock, recv_sref := RecvRef}) -> + receive + {'$socket', Sock, select, RecvRef} -> + ok + end + end}, + #{desc => "announce ready (select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, select), + ok + end}, + #{desc => "now read the data (reply)", + cmd => fun(#{sock := Sock, recv := Recv}) -> + {ok, ?BASIC_REP} = Recv(Sock), + ok + end}, + #{desc => "announce ready (recv reply)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_rep), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_port => Port}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, server_port := Port} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Port), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + %% *** The actual test *** + #{desc => "order server to continue (with accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + #{desc => "await server ready (accept select)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, accept_select) + end}, + #{desc => "order client to continue (with connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, connect), + ok + end}, + #{desc => "await server ready (select)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, select) + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, accept) + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, client, connect) + end}, + + #{desc => "order server to continue (recv request)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv_req), + ok + end}, + #{desc => "await server ready (recv select)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, recv_select) + end}, + #{desc => "order client to continue (send request)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send_req), + ok + end}, + #{desc => "await client ready (send request)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, send_req) + end}, + #{desc => "await server ready (select)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, select) + end}, + #{desc => "await server ready (recv request)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, recv_req) + end}, + + #{desc => "order client to continue (recv reply)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv_rep), + ok + end}, + #{desc => "await client ready (recv select)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, client, recv_select) + end}, + #{desc => "order server to continue (send reply)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, send_rep), + ok + end}, + #{desc => "await server ready (send reply)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, send_rep) + end}, + #{desc => "await client ready (select)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, client, select) + end}, + #{desc => "await client ready (reply recv)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, recv_rep) + end}, + + + %% *** Termination *** + #{desc => "order client to terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start client evaluator"), + Client = ?SEV_START("client", ClientSeq, InitState), + + i("start tester evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator(s)"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make an async (Timeout = nowait) call to recvfrom, +%% wait some time and then cancel. +%% +api_a_recvfrom_cancel_udp4(suite) -> + []; +api_a_recvfrom_cancel_udp4(doc) -> + []; +api_a_recvfrom_cancel_udp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_recvfrom_cancel_udp4, + fun() -> + Recv = fun(Sock) -> + case socket:recvfrom(Sock, 0, nowait) of + {ok, _} = OK -> + OK; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_recv_cancel_udp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make an async (Timeout = nowait) call to recvmsg, +%% wait some time and then cancel. +%% +api_a_recvmsg_cancel_udp4(suite) -> + []; +api_a_recvmsg_cancel_udp4(doc) -> + []; +api_a_recvmsg_cancel_udp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_recvmsg_cancel_udp4, + fun() -> + Recv = fun(Sock) -> + case socket:recvmsg(Sock, nowait) of + {ok, _} = OK -> + OK; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_recv_cancel_udp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_recv_cancel_udp(InitState) -> + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, dgram, udp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind socket (to local address)", + cmd => fun(#{sock := Sock, local_sa := LSA} = State) -> + case socket:bind(Sock, LSA) of + {ok, Port} -> + {ok, State#{port => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, local_sa := LSA, port := Port}) -> + ServerSA = LSA#{port => Port}, + ?SEV_ANNOUNCE_READY(Tester, init, ServerSA), + ok + end}, + + %% The actual test + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, SelectInfo} -> + {ok, State#{recv_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await select message (without success)", + cmd => fun(#{sock := Sock}) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}} + after 5000 -> + ok + end + end}, + #{desc => "announce ready (no select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, no_select), + ok + end}, + #{desc => "await continue (cancel)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, cancel) + end}, + #{desc => "cancel", + cmd => fun(#{sock := Sock, recv_select_info := SelectInfo}) -> + ok = socket:cancel(Sock, SelectInfo) + end}, + #{desc => "announce ready (cancel)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, cancel), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + State2 = maps:remove(tester, State), + State3 = maps:remove(recv_stag, State2), + State4 = maps:remove(recv_sref, State3), + State5 = maps:remove(req_src, State4), + {ok, State5}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock} = State) -> + ok = socket:close(Sock), + {ok, maps:remove(sock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, ServerSA} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_sa => ServerSA}} + end}, + + %% The actual test + #{desc => "order server continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await server ready (recv select)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, recv_select) + end}, + #{desc => "await server ready (no select)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, no_select) + end}, + #{desc => "order server continue (cancel)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, cancel), + ok + end}, + #{desc => "await server ready (cancel)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, cancel) + end}, + + %% Terminations + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(server, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + ServerInitState = InitState, + Server = ?SEV_START("server", ServerSeq, ServerInitState), + + i("start 'tester' evaluator"), + TesterInitState = #{server => Server#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator"), + ok = ?SEV_AWAIT_FINISH([Server, Tester]). + + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make an async (Timeout = nowait) call to accept, +%% wait some time and then cancel. +%% +api_a_accept_cancel_tcp4(suite) -> + []; +api_a_accept_cancel_tcp4(doc) -> + []; +api_a_accept_cancel_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_accept_cancel_tcp4, + fun() -> + Accept = fun(Sock) -> + case socket:accept(Sock, nowait) of + {ok, _} = OK -> + OK; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + accept => Accept}, + ok = api_a_accept_cancel_tcp(InitState) + end). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_accept_cancel_tcp(InitState) -> + process_flag(trap_exit, true), + ServerSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{lsa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, lsa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "await connection (nowait)", + cmd => fun(#{lsock := LSock, accept := Accept} = State) -> + case Accept(LSock) of + {select, {select_info, T, R} = SelectInfo} -> + ?SEV_IPRINT("accept select: " + "~n T: ~p" + "~n R: ~p", [T, R]), + {ok, State#{accept_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept_select), + ok + end}, + #{desc => "await select message (without success)", + cmd => fun(#{lsock := Sock}) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}} + after 5000 -> + ok + end + end}, + #{desc => "announce ready (no select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, no_select), + ok + end}, + #{desc => "await continue (cancel)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, cancel) + end}, + #{desc => "cancel", + cmd => fun(#{lsock := Sock, accept_select_info := SelectInfo}) -> + ok = socket:cancel(Sock, SelectInfo) + end}, + #{desc => "announce ready (cancel)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, cancel), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_port => Port}} + end}, + + %% *** The actual test *** + #{desc => "order server to continue (with accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + #{desc => "await server ready (accept select)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, accept_select) + end}, + #{desc => "await server ready (no select)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, no_select) + end}, + #{desc => "order server to continue (cancel)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, cancel), + ok + end}, + #{desc => "await server ready (cancel)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, cancel) + end}, + + %% *** Termination *** + #{desc => "order server to terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start tester evaluator"), + TesterInitState = #{server => Server#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator(s)"), + ok = ?SEV_AWAIT_FINISH([Server, Tester]). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make an async (Timeout = nowait) call to recv, +%% wait some time and then cancel. +%% +api_a_recv_cancel_tcp4(suite) -> + []; +api_a_recv_cancel_tcp4(doc) -> + []; +api_a_recv_cancel_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_recv_cancel_tcp4, + fun() -> + Recv = fun(Sock) -> + socket:recv(Sock, 0, nowait) + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_recv_cancel_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make an async (Timeout = nowait) call to recvmsg, +%% wait some time and then cancel. +%% +api_a_recvmsg_cancel_tcp4(suite) -> + []; +api_a_recvmsg_cancel_tcp4(doc) -> + []; +api_a_recvmsg_cancel_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_recvmsg_cancel_tcp4, + fun() -> + Recv = fun(Sock) -> + socket:recvmsg(Sock, nowait) + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_recv_cancel_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_recv_cancel_tcp(InitState) -> + process_flag(trap_exit, true), + ServerSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{lsa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, lsa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "await connection (nowait)", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, CSock} -> + {ok, State#{csock => CSock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept), + ok + end}, + + #{desc => "await continue (nowait recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{csock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, {select_info, T, R} = SelectInfo} -> + ?SEV_IPRINT("recv select: " + "~n Tag: ~p" + "~n Ref: ~p", [T, R]), + {ok, State#{recv_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{csock := Sock}) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}} + after 5000 -> + ok + end + end}, + #{desc => "announce ready (no select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, no_select), + ok + end}, + #{desc => "await continue (cancel)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, cancel) + end}, + #{desc => "cancel", + cmd => fun(#{csock := Sock, recv_select_info := SelectInfo}) -> + ok = socket:cancel(Sock, SelectInfo) + end}, + #{desc => "announce ready (cancel)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, cancel), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close connection socket", + cmd => fun(#{csock := Sock}) -> + socket:close(Sock) + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + ClientSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + {Tester, Port} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_port => Port}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** The init part *** + #{desc => "which server (local) address", + cmd => fun(#{domain := Domain, server_port := Port} = State) -> + LSA = which_local_socket_addr(Domain), + SSA = LSA#{port => Port}, + {ok, State#{local_sa => LSA, server_sa => SSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% *** The actual test *** + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect) + end}, + #{desc => "connect to server", + cmd => fun(#{sock := Sock, server_sa := SSA}) -> + socket:connect(Sock, SSA) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_port => Port}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, server_port := Port} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Port), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + %% *** The actual test *** + #{desc => "order server to continue (with accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + #{desc => "order client to continue (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, connect), + ok + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, client, connect) + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, accept) + end}, + + #{desc => "order server to continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await server ready (recv select)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, recv_select) + end}, + #{desc => "await server ready (no select)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, no_select) + end}, + #{desc => "order server to continue (send request)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, cancel), + ok + end}, + #{desc => "await server ready (cancel)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, server, cancel) + end}, + + %% *** Termination *** + #{desc => "order client to terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start client evaluator"), + Client = ?SEV_START("client", ClientSeq, InitState), + + i("start tester evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator(s)"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make multiple async (Timeout = nowait) call(s) to recvfrom +%% (from *several* processes), wait some time and then cancel. +%% This should result in abort messages to the 'other' processes. +%% +api_a_mrecvfrom_cancel_udp4(suite) -> + []; +api_a_mrecvfrom_cancel_udp4(doc) -> + []; +api_a_mrecvfrom_cancel_udp4(_Config) when is_list(_Config) -> + ?TT(?SECS(20)), + tc_try(api_a_mrecvfrom_cancel_udp4, + fun() -> + Recv = fun(Sock) -> + case socket:recvfrom(Sock, 0, nowait) of + {ok, _} = OK -> + OK; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_mrecv_cancel_udp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make multiple async (Timeout = nowait) call(s) to recvmsg +%% (from *several* processes), wait some time and then cancel. +%% This should result in abort messages to the 'other' processes. +%% +api_a_mrecvmsg_cancel_udp4(suite) -> + []; +api_a_mrecvmsg_cancel_udp4(doc) -> + []; +api_a_mrecvmsg_cancel_udp4(_Config) when is_list(_Config) -> + ?TT(?SECS(20)), + tc_try(api_a_mrecvmsg_cancel_udp4, + fun() -> + Recv = fun(Sock) -> + case socket:recvmsg(Sock, nowait) of + {ok, _} = OK -> + OK; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_mrecv_cancel_udp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_mrecv_cancel_udp(InitState) -> + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, dgram, udp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind socket (to local address)", + cmd => fun(#{sock := Sock, local_sa := LSA} = State) -> + case socket:bind(Sock, LSA) of + {ok, Port} -> + {ok, State#{port => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, sock := Sock}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Sock), + ok + end}, + + %% The actual test + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, SelectInfo} -> + {ok, State#{recv_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await abort message", + cmd => fun(#{sock := Sock, + recv_select_info := {select_info, _, Ref}} = State) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}}; + {'$socket', Sock, abort, {Ref, closed}} -> + {ok, maps:remove(sock, State)} + after 5000 -> + ?SEV_EPRINT("message queue: ~p", [mq()]), + {error, timeout} + end + end}, + #{desc => "announce ready (abort)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, abort), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + State2 = maps:remove(tester, State), + State3 = maps:remove(recv_stag, State2), + State4 = maps:remove(recv_sref, State3), + State5 = maps:remove(req_src, State4), + {ok, State5}; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + AltServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, Sock} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, sock => Sock}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, SelectInfo} -> + {ok, State#{recv_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await abort message", + cmd => fun(#{sock := Sock, + recv_select_info := {select_info, _, Ref}} = State) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}}; + {'$socket', Sock, abort, {Ref, closed}} -> + {ok, maps:remove(sock, State)} + after 5000 -> + ?SEV_EPRINT("message queue: ~p", [mq()]), + {error, timeout} + end + end}, + #{desc => "announce ready (abort)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, abort), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + ?SEV_IPRINT("terminating"), + State1 = maps:remove(recv_select_info, State), + State2 = maps:remove(tester, State1), + State3 = maps:remove(sock, State2), + {ok, State3}; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor alt-server 1", + cmd => fun(#{alt_server1 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor alt-server 2", + cmd => fun(#{alt_server2 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Sock} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{sock => Sock}} + end}, + + %% Start the alt-server 1 + #{desc => "order alt-server 1 start", + cmd => fun(#{alt_server1 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await alt-server 1 ready (init)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, alt_server1, init) + end}, + + %% Start the alt-server 2 + #{desc => "order alt-server 2 start", + cmd => fun(#{alt_server2 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await alt-server 2 ready (init)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, alt_server2, init) + end}, + + + %% The actual test + #{desc => "order server continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await server ready (recv select)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, recv_select) + end}, + + #{desc => "order alt-server 1 continue (recv)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await alt-server 1 ready (recv select)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, recv_select) + end}, + + #{desc => "order alt-server 2 continue (recv)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await alt-server 2 ready (recv select)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, recv_select) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "close the socket", + cmd => fun(#{sock := Sock} = _State) -> + socket:close(Sock) + end}, + + #{desc => "await server ready (abort)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, abort) + end}, + #{desc => "await alt-server 1 ready (abort)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, abort) + end}, + #{desc => "await alt-server 2 ready (abort)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, abort) + end}, + + %% Terminations + #{desc => "order alt-server 2 to terminate", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await alt-server 2 termination", + cmd => fun(#{alt_server2 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(alt_server2, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "order alt-server 1 to terminate", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await alt-server 1 termination", + cmd => fun(#{alt_server1 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(alt_server1, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(server, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start alt-server 1 evaluator"), + AltServer1 = ?SEV_START("alt_server1", AltServerSeq, InitState), + + i("start alt-server 2 evaluator"), + AltServer2 = ?SEV_START("alt_server2", AltServerSeq, InitState), + + i("start 'tester' evaluator"), + TesterInitState = #{server => Server#ev.pid, + alt_server1 => AltServer1#ev.pid, + alt_server2 => AltServer2#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator(s)"), + ok = ?SEV_AWAIT_FINISH([Server, AltServer1, AltServer2, Tester]). + + + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make multiple async (Timeout = nowait) call(s) to accept +%% (from *several* processes), wait some time and then cancel, +%% This should result in abort messages to the 'other' processes. +%% +api_a_maccept_cancel_tcp4(suite) -> + []; +api_a_maccept_cancel_tcp4(doc) -> + []; +api_a_maccept_cancel_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(20)), + tc_try(api_a_maccept_cancel_tcp4, + fun() -> + Accept = fun(Sock) -> + case socket:accept(Sock, nowait) of + {ok, _} = OK -> + OK; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end + end, + InitState = #{domain => inet, + accept => Accept}, + ok = api_a_maccept_cancel_tcp(InitState) + end). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_maccept_cancel_tcp(InitState) -> + process_flag(trap_exit, true), + ServerSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{lsa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, lsa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, lsock := Sock}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Sock), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "await connection (nowait)", + cmd => fun(#{lsock := LSock, accept := Accept} = State) -> + case Accept(LSock) of + {select, {select_info, T, R} = SelectInfo} -> + ?SEV_IPRINT("accept select: " + "~n T: ~p" + "~n R: ~p", [T, R]), + {ok, State#{accept_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept_select), + ok + end}, + #{desc => "await select message (without success)", + cmd => fun(#{lsock := Sock, + accept_select_info := {select_info, _, Ref}} = State) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}}; + {'$socket', Sock, abort, {Ref, closed}} -> + {ok, maps:remove(lsock, State)} + after 5000 -> + ?SEV_EPRINT("message queue: ~p", [mq()]), + {error, timeout} + end + end}, + #{desc => "announce ready (abort)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, abort), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + AltServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, Sock} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, lsock => Sock}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "try accept request (with nowait, expect select)", + cmd => fun(#{lsock := Sock, accept := Accept} = State) -> + case Accept(Sock) of + {select, SelectInfo} -> + {ok, State#{accept_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept_select), + ok + end}, + #{desc => "await abort message", + cmd => fun(#{lsock := Sock, + accept_select_info := {select_info, _, Ref}} = State) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}}; + {'$socket', Sock, abort, {Ref, closed}} -> + {ok, maps:remove(sock, State)} + after 5000 -> + ?SEV_EPRINT("message queue: ~p", [mq()]), + {error, timeout} + end + end}, + #{desc => "announce ready (abort)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, abort), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + ?SEV_IPRINT("terminating"), + State1 = maps:remove(tester, State), + State2 = maps:remove(accept_select_info, State1), + State3 = maps:remove(lsock, State2), + {ok, State3}; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor alt-server 1", + cmd => fun(#{alt_server1 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor alt-server 2", + cmd => fun(#{alt_server2 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Sock} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{sock => Sock}} + end}, + + %% Start the alt-server 1 + #{desc => "order alt-server 1 start", + cmd => fun(#{alt_server1 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await alt-server 1 ready (init)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, alt_server1, init) + end}, + + %% Start the alt-server 2 + #{desc => "order alt-server 2 start", + cmd => fun(#{alt_server2 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await alt-server 2 ready (init)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, alt_server2, init) + end}, + + + %% *** The actual test *** + #{desc => "order server continue (accept)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept), + ok + end}, + #{desc => "await server ready (accept select)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, accept_select) + end}, + + #{desc => "order alt-server 1 continue (accept)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept), + ok + end}, + #{desc => "await alt-server 1 ready (accept select)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, accept_select) + end}, + + #{desc => "order alt-server 2 continue (accept)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, accept), + ok + end}, + #{desc => "await alt-server 2 ready (accept select)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, accept_select) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "close the socket", + cmd => fun(#{sock := Sock} = _State) -> + socket:close(Sock) + end}, + + #{desc => "await server ready (abort)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, abort) + end}, + #{desc => "await alt-server 1 ready (abort)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, abort) + end}, + #{desc => "await alt-server 2 ready (abort)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, abort) + end}, + + + %% *** Termination *** + #{desc => "order alt-server 2 to terminate", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await alt-server 2 termination", + cmd => fun(#{alt_server2 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(alt_server2, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "order alt-server 1 to terminate", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await alt-server 1 termination", + cmd => fun(#{alt_server1 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(alt_server1, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "order server to terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start alt-server 1 evaluator"), + AltServer1 = ?SEV_START("alt_server1", AltServerSeq, InitState), + + i("start alt-server 2 evaluator"), + AltServer2 = ?SEV_START("alt_server2", AltServerSeq, InitState), + + i("start tester evaluator"), + TesterInitState = #{server => Server#ev.pid, + alt_server1 => AltServer1#ev.pid, + alt_server2 => AltServer2#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator(s)"), + ok = ?SEV_AWAIT_FINISH([Server, AltServer1, AltServer2, Tester]). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make multiple async (Timeout = nowait) call(s) to recv +%% (from *several* processes), wait some time and then cancel, +%% This should result in abort messages to the 'other' processes. +%% +api_a_mrecv_cancel_tcp4(suite) -> + []; +api_a_mrecv_cancel_tcp4(doc) -> + []; +api_a_mrecv_cancel_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(20)), + tc_try(api_a_mrecv_cancel_tcp4, + fun() -> + Recv = fun(Sock) -> + socket:recv(Sock, 0, nowait) + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_mrecv_cancel_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% Basically we make multiple async (Timeout = nowait) call(s) to recvmsg +%% (from *several* processes), wait some time and then cancel, +%% This should result in abort messages to the 'other' processes. +%% +api_a_mrecvmsg_cancel_tcp4(suite) -> + []; +api_a_mrecvmsg_cancel_tcp4(doc) -> + []; +api_a_mrecvmsg_cancel_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(20)), + tc_try(api_a_mrecvmsg_cancel_tcp4, + fun() -> + Recv = fun(Sock) -> + socket:recvmsg(Sock, nowait) + end, + InitState = #{domain => inet, + recv => Recv}, + ok = api_a_mrecv_cancel_tcp(InitState) + end). + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_mrecv_cancel_tcp(InitState) -> + process_flag(trap_exit, true), + ServerSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{lsa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, lsa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "await connection (nowait)", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, CSock} -> + {ok, State#{csock => CSock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester, csock := Sock}) -> + ?SEV_ANNOUNCE_READY(Tester, accept, Sock), + ok + end}, + + #{desc => "await continue (nowait recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{csock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, {select_info, T, R} = SelectInfo} -> + ?SEV_IPRINT("recv select: " + "~n Tag: ~p" + "~n Ref: ~p", [T, R]), + {ok, State#{recv_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{csock := Sock, + recv_select_info := {select_info, _, Ref}} = State) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}}; + {'$socket', Sock, abort, {Ref, closed}} -> + {ok, maps:remove(sock, State)} + after 5000 -> + ok + end + end}, + #{desc => "announce ready (abort)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, abort), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + AltServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + {Tester, Sock} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, sock => Sock}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (recv)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv) + end}, + #{desc => "try recv request (with nowait, expect select)", + cmd => fun(#{sock := Sock, recv := Recv} = State) -> + case Recv(Sock) of + {select, SelectInfo} -> + {ok, State#{recv_select_info => SelectInfo}}; + {ok, X} -> + {error, {unexpected_select_info, X}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (recv_select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_select), + ok + end}, + #{desc => "await abort message", + cmd => fun(#{sock := Sock, + recv_select_info := {select_info, _, Ref}} = State) -> + receive + {'$socket', Sock, select, Ref} -> + {error, {unexpected_select, Ref}}; + {'$socket', Sock, abort, {Ref, closed}} -> + {ok, maps:remove(sock, State)} + after 5000 -> + ?SEV_EPRINT("message queue: ~p", [mq()]), + {error, timeout} + end + end}, + #{desc => "announce ready (abort)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, abort), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + ?SEV_IPRINT("terminating"), + State1 = maps:remove(recv_select_info, State), + State2 = maps:remove(tester, State1), + State3 = maps:remove(sock, State2), + {ok, State3}; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + {Tester, Port} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_port => Port}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** The init part *** + #{desc => "which server (local) address", + cmd => fun(#{domain := Domain, server_port := Port} = State) -> + LSA = which_local_socket_addr(Domain), + SSA = LSA#{port => Port}, + {ok, State#{local_sa => LSA, server_sa => SSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% *** The actual test *** + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect) + end}, + #{desc => "connect to server", + cmd => fun(#{sock := Sock, server_sa := SSA}) -> + socket:connect(Sock, SSA) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock}) -> + socket:close(Sock) + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor alt-server 1", + cmd => fun(#{alt_server1 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor alt-server 2", + cmd => fun(#{alt_server2 := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_port => Port}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, server_port := Port} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Port), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + #{desc => "order server to continue (with accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + #{desc => "order client to continue (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, connect), + ok + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{client := Pid} = _State) -> + ?SEV_AWAIT_READY(Pid, client, connect) + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Pid} = State) -> + {ok, Sock} = ?SEV_AWAIT_READY(Pid, server, accept), + {ok, State#{sock => Sock}} + end}, + + %% Start the alt server 1 + #{desc => "order alt-server 1 start", + cmd => fun(#{alt_server1 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await alt-server 1 ready (init)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, init) + end}, + + %% Start the alt server 2 + #{desc => "order alt-server 2 start", + cmd => fun(#{alt_server2 := Pid, sock := Sock} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Sock), + ok + end}, + #{desc => "await alt-server 2 ready (init)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, init) + end}, + + + %% *** The actual test *** + #{desc => "order server continue (recv)", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await server ready (recv select)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, recv_select) + end}, + + #{desc => "order alt-server 1 continue (recv)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await alt-server 1 ready (recv select)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, recv_select) + end}, + + #{desc => "order alt-server 2 continue (recv)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Pid, recv), + ok + end}, + #{desc => "await alt-server 2 ready (recv select)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, recv_select) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "close the socket", + cmd => fun(#{sock := Sock} = _State) -> + socket:close(Sock) + end}, + + #{desc => "await server ready (abort)", + cmd => fun(#{server := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, server, abort) + end}, + #{desc => "await alt-server 1 ready (abort)", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server1, abort) + end}, + #{desc => "await alt-server 2 ready (abort)", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, alt_server2, abort) + end}, + + %% Terminations + #{desc => "order client to terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + + #{desc => "order alt-server 2 to terminate", + cmd => fun(#{alt_server2 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await alt-server 2 termination", + cmd => fun(#{alt_server2 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(alt_server2, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "order alt-server 1 to terminate", + cmd => fun(#{alt_server1 := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await alt-server 1 termination", + cmd => fun(#{alt_server1 := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(alt_server1, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + #{desc => "order server to terminate", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Pid), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Pid} = State) -> + case ?SEV_AWAIT_TERMINATION(Pid) of + ok -> + State1 = maps:remove(server, State), + {ok, State1}; + {error, _} = ERROR -> + ERROR + end + end}, + + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start alt-server 1 evaluator"), + AltServer1 = ?SEV_START("alt_server1", AltServerSeq, InitState), + + i("start alt-server 2 evaluator"), + AltServer2 = ?SEV_START("alt_server2", AltServerSeq, InitState), + + i("start client evaluator"), + Client = ?SEV_START("client", ClientSeq, InitState), + + i("start tester evaluator"), + TesterInitState = #{server => Server#ev.pid, + alt_server1 => AltServer1#ev.pid, + alt_server2 => AltServer2#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator(s)"), + ok = ?SEV_AWAIT_FINISH([Server, AltServer1, AltServer2, Client, Tester]). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% %% %% API OPTIONS %% @@ -19206,7 +23089,7 @@ which_ttest_runtime_env(false) -> %% ms: milliseconds %% s: seconds (default) %% m: minutes -which_ttest_runtime_env2([$m, $s | MS]) when (length(MS) > 0) -> +which_ttest_runtime_env2([$s, $m | MS]) when (length(MS) > 0) -> convert_time(MS, fun(X) -> X end); which_ttest_runtime_env2([$m | M]) when (length(M) > 0) -> convert_time(M, fun(X) -> ?MINS(X) end); @@ -19631,12 +23514,23 @@ ttest_tcp(InitState) -> %% Present the results #{desc => "present the results", - cmd => fun(#{result := Result} = State) -> + cmd => fun(#{result := Result, + domain := Domain, + server_mod := ServerTrans, + server_active := ServerActive, + client_mod := ClientTrans, + client_active := ClientActive, + msg_id := MsgID} = State) -> case Result of #{status := ok, runtime := RunTime, cnt := Cnt, bcnt := BCnt} -> + ttest_report(Domain, + ServerTrans, ServerActive, + ClientTrans, ClientActive, + MsgID, + RunTime, BCnt, Cnt), ?SEV_IPRINT( "TTest results: " "~n Run Time: ~s" @@ -19729,7 +23623,9 @@ ttest_tcp_server_start(Node, _Domain, gen, Active) -> socket_test_ttest_tcp_server:start_monitor(Node, Transport, Active); ttest_tcp_server_start(Node, Domain, sock, Active) -> TransportMod = socket_test_ttest_tcp_socket, - Transport = {TransportMod, #{domain => Domain, method => plain}}, + Transport = {TransportMod, #{domain => Domain, + async => true, + method => plain}}, socket_test_ttest_tcp_server:start_monitor(Node, Transport, Active). ttest_tcp_server_stop(Pid) -> @@ -19751,7 +23647,9 @@ ttest_tcp_client_start(Node, Domain, sock, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) -> TransportMod = socket_test_ttest_tcp_socket, - Transport = {TransportMod, #{domain => Domain, method => plain}}, + Transport = {TransportMod, #{domain => Domain, + async => true, + method => plain}}, socket_test_ttest_tcp_client:start_monitor(Node, Notify, Transport, @@ -19762,6 +23660,183 @@ ttest_tcp_client_start(Node, %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-define(TTEST_MANAGER, esock_ttest_manager). + +-record(ttest_report_id, + {domain :: socket:domain(), + serv_trans :: gen | sock, + serv_active :: once | boolean(), + client_trans :: gen | sock, + client_active :: once | boolean(), + msg_id :: small | medium | large}). + +-record(ttest_report, {id :: #ttest_report_id{}, + time :: non_neg_integer(), + bytes :: non_neg_integer(), + msgs :: non_neg_integer()}). + +-spec ttest_report(Domain :: socket:domain(), + ServTrans :: gen | sock, ServActive :: once | boolean(), + ClientTrans :: gen | sock, ClientActive :: once | boolean(), + MsgID :: 1 | 2 | 3, + RunTime :: non_neg_integer(), + NumBytes :: non_neg_integer(), + NumMsgs :: non_neg_integer()) -> ok. + +ttest_report(Domain, + ServTrans, ServActive, + ClientTrans, ClientActive, + MsgID, + RunTime, + NumBytes, + NumMsgs) -> + ID = #ttest_report_id{domain = Domain, + serv_trans = ServTrans, + serv_active = ServActive, + client_trans = ClientTrans, + client_active = ClientActive, + msg_id = ttest_msg_id_num_to_name(MsgID)}, + Report = #ttest_report{id = ID, + time = RunTime, + bytes = NumBytes, + msgs = NumMsgs}, + %% If we run just one test case, the group init has never been run + %% and therefor the ttest manager is not running (we also don't actually + %% care about collecting reports in that case). + (catch global:send(?TTEST_MANAGER, Report)), + ok. + +ttest_msg_id_num_to_name(1) -> + small; +ttest_msg_id_num_to_name(2) -> + medium; +ttest_msg_id_num_to_name(3) -> + large. + +ttest_manager_start() -> + Self = self(), + {Pid, MRef} = spawn_monitor(fun() -> ttest_manager_init(Self) end), + receive + {ttest_manager_started, Pid} -> + erlang:demonitor(MRef, [flush]), + ok; + {'DOWN', MRef, process, Pid, Reason} -> + exit({failed_starting, ttest_manager, Reason}) + after 5000 -> + exit(Pid, kill), + exit({failed_starting, ttest_manager, timeout}) + end. + +ttest_manager_stop() -> + case global:whereis_name(?TTEST_MANAGER) of + Pid when is_pid(Pid) -> + erlang:monitor(process, Pid), + global:send(?TTEST_MANAGER, stop), + receive + {'DOWN', _MRef, process, Pid, _} -> + ok + after 10000 -> + exit(Pid, kill), + ok + end; + _ -> + ok + end. + +ttest_manager_init(Parent) -> + yes = global:register_name(?TTEST_MANAGER, self()), + ets:new(?TTEST_MANAGER, + [{keypos, #ttest_report.id}, named_table, protected, ordered_set]), + Parent ! {ttest_manager_started, self()}, + ttest_manager_loop(). + +ttest_manager_loop() -> + receive + stop -> + ?LOGGER:format("manager stopping~n", []), + ttest_manager_done(); + + #ttest_report{id = _ID, + time = _RunTime, + bytes = _NumBytes, + msgs = _NumMsgs} = Report -> + true = ets:insert_new(?TTEST_MANAGER, Report), + ttest_manager_loop() + end. + +%% We are supposed to pretty print the result here... +ttest_manager_done() -> + format_reports(inet), + %% format_reports(inet6), + ets:delete(?TTEST_MANAGER), + exit(normal). + +format_reports(Domain) -> + ?LOGGER:format("Domain ~w reports:~n~n", [Domain]), + format_reports(Domain, small), + format_reports(Domain, medium), + format_reports(Domain, large). + +format_reports(Domain, MsgID) when is_atom(MsgID) -> + case which_ttest_reports(Domain, MsgID) of + [] -> + ?LOGGER:format(" No ~w reports~n~n", [MsgID]); + Reports -> + ?LOGGER:format(" ~w reports: ~n", [MsgID]), + lists:foreach(fun(R) -> format_report(R) end, Reports) + end. + +%% This should really be a table like this: +%% +%% client +%% server gen(false) gen(once) gen(true) sock(false) sock(once) sock(true) +%% gen(false) nnn +%% gen(once) nnn +%% gen(true) nnn +%% sock(false) nnn +%% sock(once) nnn +%% sock(true) nnn +%% +format_report(#ttest_report{id = #ttest_report_id{serv_trans = STrans, + serv_active = SActive, + client_trans = CTrans, + client_active = CActive}, + time = RunTime, + bytes = BCnt, + msgs = MCnt}) -> + ?LOGGER:format(" server ~w[~w] - client ~w[~w] => " + "~n Run Time: ~s" + "~n Bytes: ~s" + "~n Messages: ~s" + "~n", [STrans, SActive, CTrans, CActive, + ?TTEST_LIB:format_time(RunTime), + if ((BCnt =:= 0) orelse (RunTime =:= 0)) -> + ?TTEST_LIB:format("~w, ~w", + [BCnt, RunTime]); + true -> + ?TTEST_LIB:format("~p => ~p byte / ms", + [BCnt, BCnt div RunTime]) + end, + if (RunTime =:= 0) -> + "-"; + true -> + ?TTEST_LIB:format("~p => ~p iterations / ms", + [MCnt, MCnt div RunTime]) + end]), + ok. + + +which_ttest_reports(Domain, all) -> + [R || R = #ttest_report{id = #ttest_report_id{domain = D}} <- + ets:tab2list(?TTEST_MANAGER), Domain =:= D]; +which_ttest_reports(Domain, MsgID) -> + [R || R = #ttest_report{id = #ttest_report_id{domain = D, msg_id = MID}} <- + ets:tab2list(?TTEST_MANAGER), (Domain =:= D) andalso (MsgID =:= MID)]. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% This mechanism has only one purpose: So that we are able to kill %% the node-starter process if it takes to long. The node-starter %% runs on the local node. diff --git a/erts/emulator/test/socket_test_logger.erl b/erts/emulator/test/socket_test_logger.erl index 26610e9ef3..f5d4c8c7b2 100644 --- a/erts/emulator/test/socket_test_logger.erl +++ b/erts/emulator/test/socket_test_logger.erl @@ -43,7 +43,7 @@ start(Quiet) -> ok; undefined -> Self = self(), - Pid = spawn_link(fun() -> init(Self, Quiet) end), + Pid = spawn(fun() -> init(Self, Quiet) end), yes = global:register_name(?LOGGER, Pid), ok end. diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl index ccace2a560..ca7eff4437 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl @@ -21,89 +21,95 @@ -module(socket_test_ttest_tcp_client_socket). -export([ - start/3, start/4, start/6, start/7, + start/4, start/5, start/7, start/8, stop/1 ]). -define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). --define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D, method => M}}). +-define(MOD(D, A, M), {?TRANSPORT_MOD, #{domain => D, + async => A, + method => M}}). -start(Method, ServerInfo, Active) +start(Method, Async, Active, ServerInfo) when is_list(ServerInfo) -> Domain = local, - socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method), - ServerInfo, Active); -start(Method, ServerInfo = {Addr, _}, Active) + socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method), + Active, ServerInfo); +start(Method, Async, Active, ServerInfo = {Addr, _}) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> Domain = inet, - socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method), - ServerInfo, Active); -start(Method, ServerInfo = {Addr, _}, Active) + socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method), + Active, ServerInfo); +start(Method, Async, Active, ServerInfo = {Addr, _}) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> Domain = inet6, - socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method), - ServerInfo, Active). + socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method), + Active, ServerInfo). -start(Method, ServerInfo, Active, MsgID) +start(Method, Async, Active, ServerInfo, MsgID) when is_list(ServerInfo) -> %% This is just a simplification Domain = local, - socket_test_ttest_tcp_client:start(?MOD(Domain, Method), - ServerInfo, Active, MsgID); -start(Method, ServerInfo = {Addr, _}, Active, MsgID) + socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID); +start(Method, Async, Active, ServerInfo = {Addr, _}, MsgID) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> %% This is just a simplification Domain = inet, - socket_test_ttest_tcp_client:start(?MOD(Domain, Method), - ServerInfo, Active, MsgID); -start(Method, ServerInfo = {Addr, _}, Active, MsgID) + socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID); +start(Method, Async, Active, ServerInfo = {Addr, _}, MsgID) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> Domain = inet6, - socket_test_ttest_tcp_client:start(?MOD(Domain, Method), - ServerInfo, Active, MsgID). + socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID). -start(Method, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) +start(Method, Async, Active, ServerInfo, MsgID, MaxOutstanding, RunTime) when is_list(ServerInfo) -> Domain = local, socket_test_ttest_tcp_client:start(false, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime); -start(Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime) +start(Method, Async, Active, ServerInfo = {Addr, _}, + MsgID, MaxOutstanding, RunTime) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> Domain = inet, socket_test_ttest_tcp_client:start(false, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime); -start(Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime) +start(Method, Async, Active, ServerInfo = {Addr, _}, + MsgID, MaxOutstanding, RunTime) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> Domain = inet6, socket_test_ttest_tcp_client:start(false, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime). -start(Quiet, Method, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) +start(Quiet, Async, Active, Method, ServerInfo, MsgID, MaxOutstanding, RunTime) when is_list(ServerInfo) -> Domain = local, socket_test_ttest_tcp_client:start(Quiet, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime); -start(Quiet, Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime) +start(Quiet, Async, Active, Method, ServerInfo = {Addr, _}, + MsgID, MaxOutstanding, RunTime) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> Domain = inet, socket_test_ttest_tcp_client:start(Quiet, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime); -start(Quiet, Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime) +start(Quiet, Async, Active, Method, ServerInfo = {Addr, _}, + MsgID, MaxOutstanding, RunTime) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> Domain = inet6, socket_test_ttest_tcp_client:start(Quiet, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime). stop(Pid) -> diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl index e916fcb93e..27b561d4b7 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_server.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl @@ -90,7 +90,9 @@ start_monitor(_, Transport, Active) -> start(Transport, Active) -> do_start(self(), Transport, Active). - +%% Note that the Async option is actually only "used" for the +%% socket transport module (it details how to implement the +%% active feature). do_start(Parent, Transport, Active) when is_pid(Parent) andalso (is_atom(Transport) orelse is_tuple(Transport)) andalso @@ -179,42 +181,59 @@ process_transport({Mod, Opts}, _Active) -> server_loop(State) -> - server_loop( server_handle_message( server_accept(State) ) ). + server_loop( server_handle_message( server_accept(State, ?ACC_TIMEOUT), 0) ). -server_accept(#{mod := Mod, - active := Active, - lsock := LSock, - handlers := Handlers} = State) -> - case Mod:accept(LSock, ?ACC_TIMEOUT) of +server_accept(#{mod := Mod, lsock := LSock} = State, Timeout) -> + case Mod:accept(LSock, Timeout) of {ok, Sock} -> - ?I("accepted connection from ~s", - [case Mod:peername(Sock) of - {ok, Peer} -> - format_peername(Peer); - {error, _} -> - "-" - end]), - {Pid, _} = handler_start(), - ?I("handler ~p started -> try transfer socket control", [Pid]), - case Mod:controlling_process(Sock, Pid) of - ok -> - maybe_start_stats_timer(State, Pid), - ?I("server-accept: handler ~p started", [Pid]), - handler_continue(Pid, Mod, Sock, Active), - Handlers2 = [Pid | Handlers], - State#{handlers => Handlers2}; - {error, CPReason} -> - (catch Mod:close(Sock)), - (catch Mod:close(LSock)), - exit({controlling_process, CPReason}) - end; - {error, timeout} -> + server_handle_accepted(State, Sock); + {error, timeout} when (Timeout =/= nowait) -> State; {error, AReason} -> (catch Mod:close(LSock)), exit({accept, AReason}) end. +%% server_accept(#{mod := Mod, +%% lsock := LSock} = State) -> +%% case Mod:accept(LSock, ?ACC_TIMEOUT) of +%% {ok, Sock} -> +%% server_handle_accepted(State, Sock); +%% {error, timeout} -> +%% State; +%% {error, AReason} -> +%% (catch Mod:close(LSock)), +%% exit({accept, AReason}) +%% end. + +server_handle_accepted(#{mod := Mod, + lsock := LSock, + active := Active, + handlers := Handlers} = State, + Sock) -> + ?I("accepted connection from ~s", + [case Mod:peername(Sock) of + {ok, Peer} -> + format_peername(Peer); + {error, _} -> + "-" + end]), + {Pid, _} = handler_start(), + ?I("handler ~p started -> try transfer socket control", [Pid]), + case Mod:controlling_process(Sock, Pid) of + ok -> + maybe_start_stats_timer(State, Pid), + ?I("server-accept: handler ~p started", [Pid]), + handler_continue(Pid, Mod, Sock, Active), + Handlers2 = [Pid | Handlers], + State#{handlers => Handlers2}; + {error, CPReason} -> + (catch Mod:close(Sock)), + (catch Mod:close(LSock)), + exit({controlling_process, CPReason}) + end. + + format_peername({Addr, Port}) -> case inet:gethostbyaddr(Addr) of {ok, #hostent{h_name = N}} -> @@ -237,7 +256,7 @@ start_stats_timer(Time, ProcStr, Pid) -> server_handle_message(#{mod := Mod, lsock := LSock, parent := Parent, - handlers := H} = State) -> + handlers := H} = State, Timeout) -> receive {timeout, _TRef, {stats, Interval, ProcStr, Pid}} -> case server_handle_stats(ProcStr, Pid) of @@ -247,7 +266,7 @@ server_handle_message(#{mod := Mod, ok end, State; - + {?MODULE, Ref, Parent, stop} -> reply(Parent, Ref, ok), lists:foreach(fun(P) -> handler_stop(P) end, H), @@ -257,7 +276,7 @@ server_handle_message(#{mod := Mod, {'DOWN', _MRef, process, Pid, Reason} -> server_handle_down(Pid, Reason, State) - after 0 -> + after Timeout -> State end. @@ -342,15 +361,15 @@ handler_init(Parent) -> ?I("received continue"), reply(Parent, Ref, ok), handler_initial_activation(Mod, Sock, Active), - handler_loop(#{parent => Parent, - mod => Mod, - sock => Sock, - active => Active, - start => ?T(), - mcnt => 0, - bcnt => 0, - last_reply => none, - acc => <<>>}) + handler_loop(#{parent => Parent, + mod => Mod, + sock => Sock, + active => Active, + start => ?T(), + mcnt => 0, + bcnt => 0, + last_reply => none, + acc => <<>>}) after 5000 -> ?I("timeout when message queue: " diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl index d1de230637..4045bf4e4e 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl @@ -21,18 +21,26 @@ -module(socket_test_ttest_tcp_server_socket). -export([ - start/3, + start/4, stop/1 ]). -define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). -%% -define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D, +%% -define(MOD(M), {?TRANSPORT_MOD, #{async => false, %% method => M, %% stats_interval => 10000}}). --define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D, method => M}}). +-define(MOD(D,M,A), {?TRANSPORT_MOD, #{domain => D, + async => A, + method => M}}). -start(Method, Domain, Active) -> - socket_test_ttest_tcp_server:start(?MOD(Domain, Method), Active). +start(Method, Domain, Async, Active) -> + socket_test_ttest_tcp_server:start(?MOD(Domain, Method, Async), Active). + %% {ok, {Pid, AddrPort}} -> + %% MRef = erlang:monitor(process, Pid), + %% {ok, {Pid, MRef, AddrPort}}; + %% {error, _} = ERROR -> + %% ERROR + %% end. stop(Pid) -> socket_test_ttest_tcp_server:stop(Pid). diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index cf68bfe591..3aa3b2c504 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -57,7 +57,7 @@ %% ========================================================================== -%% This does not really work. Its just a placeholder for the time beeing... +%% This does not really work. Its just a placeholder for the time being... %% getopt(Sock, Opt) when is_atom(Opt) -> %% socket:getopt(Sock, socket, Opt). @@ -68,22 +68,32 @@ %% ========================================================================== -accept(#{sock := LSock, opts := #{method := Method} = Opts}) -> +%% The way we use server async its no point in doing a async accept call +%% (we do never actually run the test with more than one client). +accept(#{sock := LSock, opts := #{async := Async, + method := Method} = Opts}) -> case socket:accept(LSock) of - {ok, Sock} -> + {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. -accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) -> +%% If a timeout has been explictly specified, then we do not use +%% async here. We will pass it on to the reader process. +accept(#{sock := LSock, opts := #{async := Async, + method := Method} = Opts}, Timeout) -> case socket:accept(LSock, Timeout) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> @@ -153,7 +163,8 @@ connect(Addr, Port, #{domain := Domain} = Opts) -> do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => tcp}). do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain, - proto := Proto, + proto := Proto, + async := Async, method := Method} = Opts) -> try begin @@ -181,7 +192,9 @@ do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain, throw({error, {connect, CReason}}) end, Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}} end @@ -219,9 +232,9 @@ listen() -> listen(0). listen(Port) when is_integer(Port) -> - listen(Port, #{domain => inet, method => plain}); + listen(Port, #{domain => inet, async => false, method => plain}); listen(Path) when is_list(Path) -> - listen(Path, #{domain => local, method => plain}). + listen(Path, #{domain => local, async => false, method => plain}). listen(0, #{domain := local} = Opts) -> listen(mk_unique_path(), Opts); @@ -241,8 +254,10 @@ listen(Port, #{domain := Domain} = Opts) do_listen(SA, Cleanup, - #{domain := Domain, proto := Proto, method := Method} = Opts) - when (Method =:= plain) orelse (Method =:= msg) -> + #{domain := Domain, proto := Proto, + async := Async, method := Method} = Opts) + when (Method =:= plain) orelse (Method =:= msg) andalso + is_boolean(Async) -> try begin Sock = case socket:open(Domain, stream, Proto) of @@ -339,13 +354,18 @@ sockname(#{sock := Sock}) -> %% ========================================================================== -reader_init(ControllingProcess, Sock, Active, Method) +reader_init(ControllingProcess, Sock, Async, Active, Method) when is_pid(ControllingProcess) andalso + is_boolean(Async) andalso (is_boolean(Active) orelse (Active =:= once)) andalso ((Method =:= plain) orelse (Method =:= msg)) -> + put(verbose, false), MRef = erlang:monitor(process, ControllingProcess), reader_loop(#{ctrl_proc => ControllingProcess, ctrl_proc_mref => MRef, + async => Async, + select_info => undefined, + select_num => 0, % Count the number of select messages active => Active, sock => Sock, method => Method}). @@ -356,11 +376,11 @@ reader_loop(#{active := false, ctrl_proc := Pid} = State) -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, @@ -371,10 +391,8 @@ reader_loop(#{active := false, {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); MRef -> - exit({controlling_process, Reason}); + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end @@ -382,7 +400,8 @@ reader_loop(#{active := false, %% Read *once* and then change to false reader_loop(#{active := once, - sock := Sock, + async := false, + sock := Sock, method := Method, ctrl_proc := Pid} = State) -> case do_recv(Method, Sock) of @@ -392,25 +411,23 @@ reader_loop(#{active := once, {error, timeout} -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), - MRef = erlang:monitor(process, NewPid), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, - ctrl_proc_mref => MRef}); + ctrl_proc_mref => NewMRef}); {?MODULE, active, NewActive} -> reader_loop(State#{active => NewActive}); {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); - MRef -> - exit({controlling_process, Reason}); + MRef -> + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end @@ -418,17 +435,86 @@ reader_loop(#{active := once, reader_loop(State) end; - {error, closed} -> + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{active := once, + async := true, + select_info := undefined, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock, nowait) of + {select, SelectInfo} -> + reader_loop(State#{select_info => SelectInfo}); + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false}); + + {error, closed} = E1 -> Pid ! ?CLOSED_MSG(Sock, Method), - exit(normal); + reader_exit(State, E1); - {error, Reason} -> + {error, Reason} = E2 -> Pid ! ?ERROR_MSG(Sock, Method, Reason), - exit(Reason) + reader_exit(State, E2) + end; +reader_loop(#{active := once, + async := true, + select_info := {select_info, _, Ref}, + select_num := N, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end; + + {'$socket', Sock, select, Ref} -> + case do_recv(Method, Sock, nowait) of + {ok, Data} when is_binary(Data) -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false, + select_info => undefined, + select_num => N+1}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end end; %% Read and forward data continuously reader_loop(#{active := true, + async := false, sock := Sock, method := Method, ctrl_proc := Pid} = State) -> @@ -439,25 +525,23 @@ reader_loop(#{active := true, {error, timeout} -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), - MRef = erlang:monitor(process, NewPid), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, - ctrl_proc_mref => MRef}); + ctrl_proc_mref => NewMRef}); {?MODULE, active, NewActive} -> reader_loop(State#{active => NewActive}); {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); MRef -> - exit({controlling_process, Reason}); + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end @@ -465,27 +549,170 @@ reader_loop(#{active := true, reader_loop(State) end; - {error, closed} -> + {error, closed} = E1 -> Pid ! ?CLOSED_MSG(Sock, Method), - exit(normal); + reader_exit(State, E1); - {error, Reason} -> + {error, Reason} = E2 -> Pid ! ?ERROR_MSG(Sock, Method, Reason), - exit(Reason) + reader_exit(State, E2) + end; +reader_loop(#{active := true, + async := true, + select_info := undefined, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock) of + {select, SelectInfo} -> + reader_loop(State#{select_info => SelectInfo}); + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{active := true, + async := true, + select_info := {select_info, _, Ref}, + select_num := N, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end; + + {'$socket', Sock, select, Ref} -> + case do_recv(Method, Sock, nowait) of + {ok, Data} when is_binary(Data) -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{select_info => undefined, + select_num => N+1}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end end. -do_recv(plain, Sock) -> - socket:recv(Sock, 0, ?READER_RECV_TIMEOUT); -do_recv(msg, Sock) -> - case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of +do_recv(Method, Sock) -> + do_recv(Method, Sock, ?READER_RECV_TIMEOUT). + +do_recv(plain, Sock, Timeout) -> + socket:recv(Sock, 0, Timeout); +do_recv(msg, Sock, Timeout) -> + case socket:recvmsg(Sock, 0, 0, [], Timeout) of {ok, #{iov := [Bin]}} -> {ok, Bin}; + {select, _} = SELECT -> + SELECT; {error, _} = ERROR -> ERROR end. - - + + +reader_exit(#{async := false, active := Active}, stop) -> + vp("reader stopped when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, stop) -> + vp("reader stopped when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {ctrl_exit, normal}) -> + vp("reader ctrl exit when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {ctrl_exit, normal}) -> + vp("reader ctrl exit when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {ctrl_exit, Reason}) -> + vp("reader exit when ctrl crash when active: ~w", [Active]), + exit({controlling_process, Reason}); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {ctrl_exit, Reason}) -> + vp("reader exit when ctrl crash when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit({controlling_process, Reason}); +reader_exit(#{async := false, active := Active}, {error, closed}) -> + vp("reader exit when socket closed when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {error, closed}) -> + vp("reader exit when socket closed when active: ~w " + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {error, Reason}) -> + vp("reader exit when socket error when active: ~w", [Active]), + exit(Reason); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {error, Reason}) -> + vp("reader exit when socket error when active: ~w: " + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(Reason). + + + + + %% ========================================================================== +vp(F, A) -> + vp(get(verbose), F, A). + +vp(true, F, A) -> + p(F, A); +vp(_, _, _) -> + ok. + +p(F, A) -> + io:format(F ++ "~n", A). + |