From 08f4df99bbaafbba86a216734fa603bd2996f2a3 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Wed, 17 Apr 2019 16:53:08 +0200 Subject: [socket|test] Add async to ttest Make it possible for the tttest server to run with async. --- erts/emulator/test/esock_ttest/esock-ttest | 22 +- .../test/esock_ttest/esock-ttest-server-sock | 24 +- erts/emulator/test/socket_SUITE.erl | 46 ++- .../test/socket_test_ttest_tcp_client_socket.erl | 82 ++--- .../emulator/test/socket_test_ttest_tcp_server.erl | 103 ++++--- .../test/socket_test_ttest_tcp_server_socket.erl | 18 +- .../emulator/test/socket_test_ttest_tcp_socket.erl | 331 +++++++++++++++++---- 7 files changed, 458 insertions(+), 168 deletions(-) (limited to 'erts/emulator/test') diff --git a/erts/emulator/test/esock_ttest/esock-ttest b/erts/emulator/test/esock_ttest/esock-ttest index cf1d9cd9ab..9adc51fc8b 100755 --- a/erts/emulator/test/esock_ttest/esock-ttest +++ b/erts/emulator/test/esock_ttest/esock-ttest @@ -60,6 +60,9 @@ usage() -> "~n Which domain to use." "~n Only valid for server." "~n Defaults to: inet" + "~n --async Asynchronous mode (Timeout = nowait)" + "~n This option is only valid for transport = sock." + "~n Also, its only used when active =/= false." "~n --active boolean() | once." "~n Valid for both client and server." "~n Defaults to: false" @@ -111,6 +114,7 @@ process_args(Args) -> process_server_args(Args) -> Defaults = #{role => server, domain => inet, + async => false, active => false, transport => {sock, plain}}, process_server_args(Args, Defaults). @@ -124,6 +128,9 @@ process_server_args(["--domain", Domain|Args], State) (Domain =:= "inet6")) -> process_server_args(Args, State#{domain => list_to_atom(Domain)}); +process_server_args(["--async"|Args], State) -> + process_server_args(Args, State#{async => true}); + process_server_args(["--active", Active|Args], State) when ((Active =:= "false") orelse (Active =:= "once") orelse @@ -145,6 +152,7 @@ process_server_args([Arg|_], _State) -> process_client_args(Args) -> Defaults = #{role => client, + async => false, active => false, transport => {sock, plain}, %% Will cause error if not provided @@ -159,10 +167,13 @@ process_client_args(Args) -> process_client_args([], State) -> process_client_args_ensure_max_outstanding(State); +process_client_args(["--async"|Args], State) -> + process_client_args(Args, State#{async => true}); + process_client_args(["--active", Active|Args], State) - when ((Active =:= "false") orelse - (Active =:= "once") orelse - (Active =:= "true")) -> + when (Active =:= "false") orelse + (Active =:= "once") orelse + (Active =:= "true") -> process_client_args(Args, State#{active => list_to_atom(Active)}); process_client_args(["--transport", "gen" | Args], State) -> @@ -280,9 +291,10 @@ exec(#{role := server, end; exec(#{role := server, domain := Domain, + async := Async, active := Active, transport := {sock, Method}}) -> - case socket_test_ttest_tcp_server_socket:start(Method, Domain, Active) of + case socket_test_ttest_tcp_server_socket:start(Method, Domain, Async, Active) of {ok, {Pid, _}} -> MRef = erlang:monitor(process, Pid), receive @@ -323,12 +335,14 @@ exec(#{role := client, end; exec(#{role := client, server := ServerInfo, + async := Async, active := Active, transport := {sock, Method}, msg_id := MsgID, max_outstanding := MaxOutstanding, runtime := RunTime}) -> case socket_test_ttest_tcp_client_socket:start(true, + Async, Method, ServerInfo, Active, diff --git a/erts/emulator/test/esock_ttest/esock-ttest-server-sock b/erts/emulator/test/esock_ttest/esock-ttest-server-sock index 4ec0d335d9..4ea8ce0185 100755 --- a/erts/emulator/test/esock_ttest/esock-ttest-server-sock +++ b/erts/emulator/test/esock_ttest/esock-ttest-server-sock @@ -24,9 +24,27 @@ EMU=$ERL_TOP/erts/emulator EMU_TEST=$EMU/test ESOCK_TTEST=$EMU_TEST/esock_ttest -if [ $# = 1 ]; then - ACTIVE="--active $1" +# $1 - async - boolean() +# $2 - active - once | boolean() +if [ $# = 2 ]; then + + async=$1 + active=$2 + + if [ $async = true ]; then + ASYNC="--async" + else + ASYNC= + fi + + ACTIVE="--active $async" + +else + echo " Missing args: async and active" + echo "" + exit 1 fi -$ESOCK_TTEST/esock-ttest --server --transport sock $ACTIVE + +$ESOCK_TTEST/esock-ttest --server $ASYNC --transport sock $ACTIVE diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index f5757d5ce6..d4478bd9e3 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -2729,8 +2729,7 @@ api_a_send_and_recv_udp(InitState) -> %% *** Init part *** #{desc => "which local address", cmd => fun(#{domain := Domain} = State) -> - LAddr = which_local_addr(Domain), - LSA = #{family => Domain, addr => LAddr}, + LSA = which_local_socket_addr(Domain), {ok, State#{local_sa => LSA}} end}, #{desc => "create socket", @@ -2864,8 +2863,7 @@ api_a_send_and_recv_udp(InitState) -> %% *** Init part *** #{desc => "local address", cmd => fun(#{domain := Domain} = State) -> - LAddr = which_local_addr(Domain), - LSA = #{family => Domain, addr => LAddr}, + LSA = which_local_socket_addr(Domain), {ok, State#{lsa => LSA}} end}, #{desc => "open socket", @@ -2876,8 +2874,12 @@ api_a_send_and_recv_udp(InitState) -> end}, #{desc => "bind socket (to local address)", cmd => fun(#{sock := Sock, lsa := LSA}) -> - sock_bind(Sock, LSA), - ok + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end end}, #{desc => "announce ready (init)", cmd => fun(#{tester := Tester}) -> @@ -3209,8 +3211,7 @@ api_a_send_and_recv_tcp(InitState) -> %% *** Init part *** #{desc => "which local address", cmd => fun(#{domain := Domain} = State) -> - LAddr = which_local_addr(Domain), - LSA = #{family => Domain, addr => LAddr}, + LSA = which_local_socket_addr(Domain), {ok, State#{lsa => LSA}} end}, #{desc => "create listen socket", @@ -3401,10 +3402,8 @@ api_a_send_and_recv_tcp(InitState) -> %% *** The init part *** #{desc => "which server (local) address", cmd => fun(#{domain := Domain, server_port := Port} = State) -> - LAddr = which_local_addr(Domain), - LSA = #{family => Domain, - addr => LAddr}, - SSA = LSA#{port => Port}, + LSA = which_local_socket_addr(Domain), + SSA = LSA#{port => Port}, {ok, State#{local_sa => LSA, server_sa => SSA}} end}, #{desc => "create socket", @@ -3764,8 +3763,7 @@ api_a_recv_cancel_udp(InitState) -> %% *** Init part *** #{desc => "which local address", cmd => fun(#{domain := Domain} = State) -> - LAddr = which_local_addr(Domain), - LSA = #{family => Domain, addr => LAddr}, + LSA = which_local_socket_addr(Domain), {ok, State#{local_sa => LSA}} end}, #{desc => "create socket", @@ -3999,8 +3997,7 @@ api_a_accept_cancel_tcp(InitState) -> %% *** Init part *** #{desc => "which local address", cmd => fun(#{domain := Domain} = State) -> - LAddr = which_local_addr(Domain), - LSA = #{family => Domain, addr => LAddr}, + LSA = which_local_socket_addr(Domain), {ok, State#{lsa => LSA}} end}, #{desc => "create listen socket", @@ -4245,8 +4242,7 @@ api_a_recv_cancel_tcp(InitState) -> %% *** Init part *** #{desc => "which local address", cmd => fun(#{domain := Domain} = State) -> - LAddr = which_local_addr(Domain), - LSA = #{family => Domain, addr => LAddr}, + LSA = which_local_socket_addr(Domain), {ok, State#{lsa => LSA}} end}, #{desc => "create listen socket", @@ -4389,10 +4385,8 @@ api_a_recv_cancel_tcp(InitState) -> %% *** The init part *** #{desc => "which server (local) address", cmd => fun(#{domain := Domain, server_port := Port} = State) -> - LAddr = which_local_addr(Domain), - LSA = #{family => Domain, - addr => LAddr}, - SSA = LSA#{port => Port}, + LSA = which_local_socket_addr(Domain), + SSA = LSA#{port => Port}, {ok, State#{local_sa => LSA, server_sa => SSA}} end}, #{desc => "create socket", @@ -21692,7 +21686,9 @@ ttest_tcp_server_start(Node, _Domain, gen, Active) -> socket_test_ttest_tcp_server:start_monitor(Node, Transport, Active); ttest_tcp_server_start(Node, Domain, sock, Active) -> TransportMod = socket_test_ttest_tcp_socket, - Transport = {TransportMod, #{domain => Domain, method => plain}}, + Transport = {TransportMod, #{domain => Domain, + async => true, + method => plain}}, socket_test_ttest_tcp_server:start_monitor(Node, Transport, Active). ttest_tcp_server_stop(Pid) -> @@ -21714,7 +21710,9 @@ ttest_tcp_client_start(Node, Domain, sock, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) -> TransportMod = socket_test_ttest_tcp_socket, - Transport = {TransportMod, #{domain => Domain, method => plain}}, + Transport = {TransportMod, #{domain => Domain, + async => true, + method => plain}}, socket_test_ttest_tcp_client:start_monitor(Node, Notify, Transport, diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl index ccace2a560..ca7eff4437 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl @@ -21,89 +21,95 @@ -module(socket_test_ttest_tcp_client_socket). -export([ - start/3, start/4, start/6, start/7, + start/4, start/5, start/7, start/8, stop/1 ]). -define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). --define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D, method => M}}). +-define(MOD(D, A, M), {?TRANSPORT_MOD, #{domain => D, + async => A, + method => M}}). -start(Method, ServerInfo, Active) +start(Method, Async, Active, ServerInfo) when is_list(ServerInfo) -> Domain = local, - socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method), - ServerInfo, Active); -start(Method, ServerInfo = {Addr, _}, Active) + socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method), + Active, ServerInfo); +start(Method, Async, Active, ServerInfo = {Addr, _}) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> Domain = inet, - socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method), - ServerInfo, Active); -start(Method, ServerInfo = {Addr, _}, Active) + socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method), + Active, ServerInfo); +start(Method, Async, Active, ServerInfo = {Addr, _}) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> Domain = inet6, - socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method), - ServerInfo, Active). + socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method), + Active, ServerInfo). -start(Method, ServerInfo, Active, MsgID) +start(Method, Async, Active, ServerInfo, MsgID) when is_list(ServerInfo) -> %% This is just a simplification Domain = local, - socket_test_ttest_tcp_client:start(?MOD(Domain, Method), - ServerInfo, Active, MsgID); -start(Method, ServerInfo = {Addr, _}, Active, MsgID) + socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID); +start(Method, Async, Active, ServerInfo = {Addr, _}, MsgID) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> %% This is just a simplification Domain = inet, - socket_test_ttest_tcp_client:start(?MOD(Domain, Method), - ServerInfo, Active, MsgID); -start(Method, ServerInfo = {Addr, _}, Active, MsgID) + socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID); +start(Method, Async, Active, ServerInfo = {Addr, _}, MsgID) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> Domain = inet6, - socket_test_ttest_tcp_client:start(?MOD(Domain, Method), - ServerInfo, Active, MsgID). + socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID). -start(Method, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) +start(Method, Async, Active, ServerInfo, MsgID, MaxOutstanding, RunTime) when is_list(ServerInfo) -> Domain = local, socket_test_ttest_tcp_client:start(false, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime); -start(Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime) +start(Method, Async, Active, ServerInfo = {Addr, _}, + MsgID, MaxOutstanding, RunTime) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> Domain = inet, socket_test_ttest_tcp_client:start(false, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime); -start(Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime) +start(Method, Async, Active, ServerInfo = {Addr, _}, + MsgID, MaxOutstanding, RunTime) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> Domain = inet6, socket_test_ttest_tcp_client:start(false, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime). -start(Quiet, Method, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) +start(Quiet, Async, Active, Method, ServerInfo, MsgID, MaxOutstanding, RunTime) when is_list(ServerInfo) -> Domain = local, socket_test_ttest_tcp_client:start(Quiet, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime); -start(Quiet, Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime) +start(Quiet, Async, Active, Method, ServerInfo = {Addr, _}, + MsgID, MaxOutstanding, RunTime) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> Domain = inet, socket_test_ttest_tcp_client:start(Quiet, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime); -start(Quiet, Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime) +start(Quiet, Async, Active, Method, ServerInfo = {Addr, _}, + MsgID, MaxOutstanding, RunTime) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> Domain = inet6, socket_test_ttest_tcp_client:start(Quiet, - ?MOD(Domain, Method), - ServerInfo, Active, + ?MOD(Domain, Async, Method), + Active, ServerInfo, MsgID, MaxOutstanding, RunTime). stop(Pid) -> diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl index e916fcb93e..f0a2ae73ec 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_server.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl @@ -90,7 +90,9 @@ start_monitor(_, Transport, Active) -> start(Transport, Active) -> do_start(self(), Transport, Active). - +%% Note that the Async option is actually only "used" for the +%% socket transport module (it details how to implement the +%% active feature). do_start(Parent, Transport, Active) when is_pid(Parent) andalso (is_atom(Transport) orelse is_tuple(Transport)) andalso @@ -167,7 +169,7 @@ server_init(Starter, Parent, Transport, Active) -> end. process_transport(Mod, _) when is_atom(Mod) -> - {Mod, fun(Port) -> Mod:listen(Port) end, infinity}; + {Mod, false, fun(Port) -> Mod:listen(Port) end, infinity}; process_transport({Mod, #{stats_interval := T} = Opts}, Active) when (Active =/= false) -> {Mod, fun(Port) -> Mod:listen(Port, Opts#{stats_to => self()}) end, T}; @@ -179,42 +181,59 @@ process_transport({Mod, Opts}, _Active) -> server_loop(State) -> - server_loop( server_handle_message( server_accept(State) ) ). + server_loop( server_handle_message( server_accept(State, ?ACC_TIMEOUT), 0) ). -server_accept(#{mod := Mod, - active := Active, - lsock := LSock, - handlers := Handlers} = State) -> - case Mod:accept(LSock, ?ACC_TIMEOUT) of +server_accept(#{mod := Mod, lsock := LSock} = State, Timeout) -> + case Mod:accept(LSock, Timeout) of {ok, Sock} -> - ?I("accepted connection from ~s", - [case Mod:peername(Sock) of - {ok, Peer} -> - format_peername(Peer); - {error, _} -> - "-" - end]), - {Pid, _} = handler_start(), - ?I("handler ~p started -> try transfer socket control", [Pid]), - case Mod:controlling_process(Sock, Pid) of - ok -> - maybe_start_stats_timer(State, Pid), - ?I("server-accept: handler ~p started", [Pid]), - handler_continue(Pid, Mod, Sock, Active), - Handlers2 = [Pid | Handlers], - State#{handlers => Handlers2}; - {error, CPReason} -> - (catch Mod:close(Sock)), - (catch Mod:close(LSock)), - exit({controlling_process, CPReason}) - end; - {error, timeout} -> + server_handle_accepted(State, Sock); + {error, timeout} when (Timeout =/= nowait) -> State; {error, AReason} -> (catch Mod:close(LSock)), exit({accept, AReason}) end. +%% server_accept(#{mod := Mod, +%% lsock := LSock} = State) -> +%% case Mod:accept(LSock, ?ACC_TIMEOUT) of +%% {ok, Sock} -> +%% server_handle_accepted(State, Sock); +%% {error, timeout} -> +%% State; +%% {error, AReason} -> +%% (catch Mod:close(LSock)), +%% exit({accept, AReason}) +%% end. + +server_handle_accepted(#{mod := Mod, + lsock := LSock, + active := Active, + handlers := Handlers} = State, + Sock) -> + ?I("accepted connection from ~s", + [case Mod:peername(Sock) of + {ok, Peer} -> + format_peername(Peer); + {error, _} -> + "-" + end]), + {Pid, _} = handler_start(), + ?I("handler ~p started -> try transfer socket control", [Pid]), + case Mod:controlling_process(Sock, Pid) of + ok -> + maybe_start_stats_timer(State, Pid), + ?I("server-accept: handler ~p started", [Pid]), + handler_continue(Pid, Mod, Sock, Active), + Handlers2 = [Pid | Handlers], + State#{handlers => Handlers2}; + {error, CPReason} -> + (catch Mod:close(Sock)), + (catch Mod:close(LSock)), + exit({controlling_process, CPReason}) + end. + + format_peername({Addr, Port}) -> case inet:gethostbyaddr(Addr) of {ok, #hostent{h_name = N}} -> @@ -237,7 +256,7 @@ start_stats_timer(Time, ProcStr, Pid) -> server_handle_message(#{mod := Mod, lsock := LSock, parent := Parent, - handlers := H} = State) -> + handlers := H} = State, Timeout) -> receive {timeout, _TRef, {stats, Interval, ProcStr, Pid}} -> case server_handle_stats(ProcStr, Pid) of @@ -247,7 +266,7 @@ server_handle_message(#{mod := Mod, ok end, State; - + {?MODULE, Ref, Parent, stop} -> reply(Parent, Ref, ok), lists:foreach(fun(P) -> handler_stop(P) end, H), @@ -257,7 +276,7 @@ server_handle_message(#{mod := Mod, {'DOWN', _MRef, process, Pid, Reason} -> server_handle_down(Pid, Reason, State) - after 0 -> + after Timeout -> State end. @@ -342,15 +361,15 @@ handler_init(Parent) -> ?I("received continue"), reply(Parent, Ref, ok), handler_initial_activation(Mod, Sock, Active), - handler_loop(#{parent => Parent, - mod => Mod, - sock => Sock, - active => Active, - start => ?T(), - mcnt => 0, - bcnt => 0, - last_reply => none, - acc => <<>>}) + handler_loop(#{parent => Parent, + mod => Mod, + sock => Sock, + active => Active, + start => ?T(), + mcnt => 0, + bcnt => 0, + last_reply => none, + acc => <<>>}) after 5000 -> ?I("timeout when message queue: " diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl index d1de230637..4045bf4e4e 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl @@ -21,18 +21,26 @@ -module(socket_test_ttest_tcp_server_socket). -export([ - start/3, + start/4, stop/1 ]). -define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). -%% -define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D, +%% -define(MOD(M), {?TRANSPORT_MOD, #{async => false, %% method => M, %% stats_interval => 10000}}). --define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D, method => M}}). +-define(MOD(D,M,A), {?TRANSPORT_MOD, #{domain => D, + async => A, + method => M}}). -start(Method, Domain, Active) -> - socket_test_ttest_tcp_server:start(?MOD(Domain, Method), Active). +start(Method, Domain, Async, Active) -> + socket_test_ttest_tcp_server:start(?MOD(Domain, Method, Async), Active). + %% {ok, {Pid, AddrPort}} -> + %% MRef = erlang:monitor(process, Pid), + %% {ok, {Pid, MRef, AddrPort}}; + %% {error, _} = ERROR -> + %% ERROR + %% end. stop(Pid) -> socket_test_ttest_tcp_server:stop(Pid). diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index cf68bfe591..65fbba44c6 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -57,7 +57,7 @@ %% ========================================================================== -%% This does not really work. Its just a placeholder for the time beeing... +%% This does not really work. Its just a placeholder for the time being... %% getopt(Sock, Opt) when is_atom(Opt) -> %% socket:getopt(Sock, socket, Opt). @@ -68,22 +68,32 @@ %% ========================================================================== -accept(#{sock := LSock, opts := #{method := Method} = Opts}) -> +%% The way we use server async its no point in doing a async accept call +%% (we do never actually run the test with more than one client). +accept(#{sock := LSock, opts := #{async := Async, + method := Method} = Opts}) -> case socket:accept(LSock) of - {ok, Sock} -> + {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. -accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) -> +%% If a timeout has been explictly specified, then we do not use +%% async here. We will pass it on to the reader process. +accept(#{sock := LSock, opts := #{async := Async, + method := Method} = Opts}, Timeout) -> case socket:accept(LSock, Timeout) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> @@ -153,7 +163,8 @@ connect(Addr, Port, #{domain := Domain} = Opts) -> do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => tcp}). do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain, - proto := Proto, + proto := Proto, + async := Async, method := Method} = Opts) -> try begin @@ -181,7 +192,9 @@ do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain, throw({error, {connect, CReason}}) end, Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}} end @@ -219,9 +232,9 @@ listen() -> listen(0). listen(Port) when is_integer(Port) -> - listen(Port, #{domain => inet, method => plain}); + listen(Port, #{domain => inet, async => false, method => plain}); listen(Path) when is_list(Path) -> - listen(Path, #{domain => local, method => plain}). + listen(Path, #{domain => local, async => false, method => plain}). listen(0, #{domain := local} = Opts) -> listen(mk_unique_path(), Opts); @@ -241,8 +254,10 @@ listen(Port, #{domain := Domain} = Opts) do_listen(SA, Cleanup, - #{domain := Domain, proto := Proto, method := Method} = Opts) - when (Method =:= plain) orelse (Method =:= msg) -> + #{domain := Domain, proto := Proto, + async := Async, method := Method} = Opts) + when (Method =:= plain) orelse (Method =:= msg) andalso + is_boolean(Async) -> try begin Sock = case socket:open(Domain, stream, Proto) of @@ -339,13 +354,18 @@ sockname(#{sock := Sock}) -> %% ========================================================================== -reader_init(ControllingProcess, Sock, Active, Method) +reader_init(ControllingProcess, Sock, Async, Active, Method) when is_pid(ControllingProcess) andalso + is_boolean(Async) andalso (is_boolean(Active) orelse (Active =:= once)) andalso ((Method =:= plain) orelse (Method =:= msg)) -> + put(verbose, false), MRef = erlang:monitor(process, ControllingProcess), reader_loop(#{ctrl_proc => ControllingProcess, ctrl_proc_mref => MRef, + async => Async, + select_info => undefined, + select_num => 0, % Count the number of select messages active => Active, sock => Sock, method => Method}). @@ -356,11 +376,11 @@ reader_loop(#{active := false, ctrl_proc := Pid} = State) -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, @@ -371,17 +391,16 @@ reader_loop(#{active := false, {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); MRef -> - exit({controlling_process, Reason}); + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end end; %% Read *once* and then change to false -reader_loop(#{active := once, +reader_loop(#{async := false, + active := once, sock := Sock, method := Method, ctrl_proc := Pid} = State) -> @@ -392,25 +411,23 @@ reader_loop(#{active := once, {error, timeout} -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), - MRef = erlang:monitor(process, NewPid), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, - ctrl_proc_mref => MRef}); + ctrl_proc_mref => NewMRef}); {?MODULE, active, NewActive} -> reader_loop(State#{active => NewActive}); {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); - MRef -> - exit({controlling_process, Reason}); + MRef -> + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end @@ -418,17 +435,86 @@ reader_loop(#{active := once, reader_loop(State) end; - {error, closed} -> + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{async := true, + select_info := undefined, + active := once, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock, nowait) of + {ok, {select, _, _} = SelectInfo} -> + reader_loop(State#{select_info => SelectInfo}); + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false}); + + {error, closed} = E1 -> Pid ! ?CLOSED_MSG(Sock, Method), - exit(normal); + reader_exit(State, E1); - {error, Reason} -> + {error, Reason} = E2 -> Pid ! ?ERROR_MSG(Sock, Method, Reason), - exit(Reason) + reader_exit(State, E2) + end; +reader_loop(#{async := true, + select_info := {select, _, Ref}, + select_num := N, + active := once, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end; + + {'$socket', Sock, select, Ref} -> + case do_recv(Method, Sock, nowait) of + {ok, Data} when is_binary(Data) -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false, + select_info => undefined, + select_num => N+1}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end end; %% Read and forward data continuously -reader_loop(#{active := true, +reader_loop(#{async := false, + active := true, sock := Sock, method := Method, ctrl_proc := Pid} = State) -> @@ -439,25 +525,23 @@ reader_loop(#{active := true, {error, timeout} -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), - MRef = erlang:monitor(process, NewPid), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, - ctrl_proc_mref => MRef}); + ctrl_proc_mref => NewMRef}); {?MODULE, active, NewActive} -> reader_loop(State#{active => NewActive}); {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); MRef -> - exit({controlling_process, Reason}); + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end @@ -465,27 +549,170 @@ reader_loop(#{active := true, reader_loop(State) end; - {error, closed} -> + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{async := true, + select_info := undefined, + active := true, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock) of + {ok, {select, _, _} = SelectInfo} -> + reader_loop(State#{select_info => SelectInfo}); + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State); + + {error, closed} = E1 -> Pid ! ?CLOSED_MSG(Sock, Method), - exit(normal); + reader_exit(State, E1); - {error, Reason} -> + {error, Reason} = E2 -> Pid ! ?ERROR_MSG(Sock, Method, Reason), - exit(Reason) + reader_exit(State, E2) + end; +reader_loop(#{async := true, + select_info := {select, _, Ref}, + select_num := N, + active := true, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end; + + {'$socket', Sock, select, Ref} -> + case do_recv(Method, Sock, nowait) of + {ok, Data} when is_binary(Data) -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{select_info => undefined, + select_num => N+1}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end end. -do_recv(plain, Sock) -> - socket:recv(Sock, 0, ?READER_RECV_TIMEOUT); -do_recv(msg, Sock) -> - case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of +do_recv(Method, Sock) -> + do_recv(Method, Sock, ?READER_RECV_TIMEOUT). + +do_recv(plain, Sock, Timeout) -> + socket:recv(Sock, 0, Timeout); +do_recv(msg, Sock, Timeout) -> + case socket:recvmsg(Sock, 0, 0, [], Timeout) of {ok, #{iov := [Bin]}} -> {ok, Bin}; + {ok, {select, _, _}} = OK -> + OK; {error, _} = ERROR -> ERROR end. - - + + +reader_exit(#{async := false, active := Active}, stop) -> + vp("reader stopped when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, stop) -> + vp("reader stopped when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {ctrl_exit, normal}) -> + vp("reader ctrl exit when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {ctrl_exit, normal}) -> + vp("reader ctrl exit when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {ctrl_exit, Reason}) -> + vp("reader exit when ctrl crash when active: ~w", [Active]), + exit({controlling_process, Reason}); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {ctrl_exit, Reason}) -> + vp("reader exit when ctrl crash when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit({controlling_process, Reason}); +reader_exit(#{async := false, active := Active}, {error, closed}) -> + vp("reader exit when socket closed when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {error, closed}) -> + vp("reader exit when socket closed when active: ~w " + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {error, Reason}) -> + vp("reader exit when socket error when active: ~w", [Active]), + exit(Reason); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {error, Reason}) -> + vp("reader exit when socket error when active: ~w: " + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(Reason). + + + + + %% ========================================================================== +vp(F, A) -> + vp(get(verbose), F, A). + +vp(true, F, A) -> + p(F, A); +vp(_, _, _) -> + ok. + +p(F, A) -> + io:format(F ++ "~n", A). + -- cgit v1.2.3