aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2019-04-17 16:53:08 +0200
committerMicael Karlberg <[email protected]>2019-05-29 15:13:48 +0200
commit08f4df99bbaafbba86a216734fa603bd2996f2a3 (patch)
tree6c5152923c920de2d2273fb0476125dcadba71ac
parent22258359ff6633300ea6f28638eaae66deb0649a (diff)
downloadotp-08f4df99bbaafbba86a216734fa603bd2996f2a3.tar.gz
otp-08f4df99bbaafbba86a216734fa603bd2996f2a3.tar.bz2
otp-08f4df99bbaafbba86a216734fa603bd2996f2a3.zip
[socket|test] Add async to ttest
Make it possible for the tttest server to run with async.
-rwxr-xr-xerts/emulator/test/esock_ttest/esock-ttest22
-rwxr-xr-xerts/emulator/test/esock_ttest/esock-ttest-server-sock24
-rw-r--r--erts/emulator/test/socket_SUITE.erl46
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client_socket.erl82
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server.erl103
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server_socket.erl18
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_socket.erl331
7 files changed, 458 insertions, 168 deletions
diff --git a/erts/emulator/test/esock_ttest/esock-ttest b/erts/emulator/test/esock_ttest/esock-ttest
index cf1d9cd9ab..9adc51fc8b 100755
--- a/erts/emulator/test/esock_ttest/esock-ttest
+++ b/erts/emulator/test/esock_ttest/esock-ttest
@@ -60,6 +60,9 @@ usage() ->
"~n Which domain to use."
"~n Only valid for server."
"~n Defaults to: inet"
+ "~n --async Asynchronous mode (Timeout = nowait)"
+ "~n This option is only valid for transport = sock."
+ "~n Also, its only used when active =/= false."
"~n --active <active> boolean() | once."
"~n Valid for both client and server."
"~n Defaults to: false"
@@ -111,6 +114,7 @@ process_args(Args) ->
process_server_args(Args) ->
Defaults = #{role => server,
domain => inet,
+ async => false,
active => false,
transport => {sock, plain}},
process_server_args(Args, Defaults).
@@ -124,6 +128,9 @@ process_server_args(["--domain", Domain|Args], State)
(Domain =:= "inet6")) ->
process_server_args(Args, State#{domain => list_to_atom(Domain)});
+process_server_args(["--async"|Args], State) ->
+ process_server_args(Args, State#{async => true});
+
process_server_args(["--active", Active|Args], State)
when ((Active =:= "false") orelse
(Active =:= "once") orelse
@@ -145,6 +152,7 @@ process_server_args([Arg|_], _State) ->
process_client_args(Args) ->
Defaults = #{role => client,
+ async => false,
active => false,
transport => {sock, plain},
%% Will cause error if not provided
@@ -159,10 +167,13 @@ process_client_args(Args) ->
process_client_args([], State) ->
process_client_args_ensure_max_outstanding(State);
+process_client_args(["--async"|Args], State) ->
+ process_client_args(Args, State#{async => true});
+
process_client_args(["--active", Active|Args], State)
- when ((Active =:= "false") orelse
- (Active =:= "once") orelse
- (Active =:= "true")) ->
+ when (Active =:= "false") orelse
+ (Active =:= "once") orelse
+ (Active =:= "true") ->
process_client_args(Args, State#{active => list_to_atom(Active)});
process_client_args(["--transport", "gen" | Args], State) ->
@@ -280,9 +291,10 @@ exec(#{role := server,
end;
exec(#{role := server,
domain := Domain,
+ async := Async,
active := Active,
transport := {sock, Method}}) ->
- case socket_test_ttest_tcp_server_socket:start(Method, Domain, Active) of
+ case socket_test_ttest_tcp_server_socket:start(Method, Domain, Async, Active) of
{ok, {Pid, _}} ->
MRef = erlang:monitor(process, Pid),
receive
@@ -323,12 +335,14 @@ exec(#{role := client,
end;
exec(#{role := client,
server := ServerInfo,
+ async := Async,
active := Active,
transport := {sock, Method},
msg_id := MsgID,
max_outstanding := MaxOutstanding,
runtime := RunTime}) ->
case socket_test_ttest_tcp_client_socket:start(true,
+ Async,
Method,
ServerInfo,
Active,
diff --git a/erts/emulator/test/esock_ttest/esock-ttest-server-sock b/erts/emulator/test/esock_ttest/esock-ttest-server-sock
index 4ec0d335d9..4ea8ce0185 100755
--- a/erts/emulator/test/esock_ttest/esock-ttest-server-sock
+++ b/erts/emulator/test/esock_ttest/esock-ttest-server-sock
@@ -24,9 +24,27 @@ EMU=$ERL_TOP/erts/emulator
EMU_TEST=$EMU/test
ESOCK_TTEST=$EMU_TEST/esock_ttest
-if [ $# = 1 ]; then
- ACTIVE="--active $1"
+# $1 - async - boolean()
+# $2 - active - once | boolean()
+if [ $# = 2 ]; then
+
+ async=$1
+ active=$2
+
+ if [ $async = true ]; then
+ ASYNC="--async"
+ else
+ ASYNC=
+ fi
+
+ ACTIVE="--active $async"
+
+else
+ echo "<ERROR> Missing args: async and active"
+ echo ""
+ exit 1
fi
-$ESOCK_TTEST/esock-ttest --server --transport sock $ACTIVE
+
+$ESOCK_TTEST/esock-ttest --server $ASYNC --transport sock $ACTIVE
diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl
index f5757d5ce6..d4478bd9e3 100644
--- a/erts/emulator/test/socket_SUITE.erl
+++ b/erts/emulator/test/socket_SUITE.erl
@@ -2729,8 +2729,7 @@ api_a_send_and_recv_udp(InitState) ->
%% *** Init part ***
#{desc => "which local address",
cmd => fun(#{domain := Domain} = State) ->
- LAddr = which_local_addr(Domain),
- LSA = #{family => Domain, addr => LAddr},
+ LSA = which_local_socket_addr(Domain),
{ok, State#{local_sa => LSA}}
end},
#{desc => "create socket",
@@ -2864,8 +2863,7 @@ api_a_send_and_recv_udp(InitState) ->
%% *** Init part ***
#{desc => "local address",
cmd => fun(#{domain := Domain} = State) ->
- LAddr = which_local_addr(Domain),
- LSA = #{family => Domain, addr => LAddr},
+ LSA = which_local_socket_addr(Domain),
{ok, State#{lsa => LSA}}
end},
#{desc => "open socket",
@@ -2876,8 +2874,12 @@ api_a_send_and_recv_udp(InitState) ->
end},
#{desc => "bind socket (to local address)",
cmd => fun(#{sock := Sock, lsa := LSA}) ->
- sock_bind(Sock, LSA),
- ok
+ case socket:bind(Sock, LSA) of
+ {ok, _Port} ->
+ ok;
+ {error, _} = ERROR ->
+ ERROR
+ end
end},
#{desc => "announce ready (init)",
cmd => fun(#{tester := Tester}) ->
@@ -3209,8 +3211,7 @@ api_a_send_and_recv_tcp(InitState) ->
%% *** Init part ***
#{desc => "which local address",
cmd => fun(#{domain := Domain} = State) ->
- LAddr = which_local_addr(Domain),
- LSA = #{family => Domain, addr => LAddr},
+ LSA = which_local_socket_addr(Domain),
{ok, State#{lsa => LSA}}
end},
#{desc => "create listen socket",
@@ -3401,10 +3402,8 @@ api_a_send_and_recv_tcp(InitState) ->
%% *** The init part ***
#{desc => "which server (local) address",
cmd => fun(#{domain := Domain, server_port := Port} = State) ->
- LAddr = which_local_addr(Domain),
- LSA = #{family => Domain,
- addr => LAddr},
- SSA = LSA#{port => Port},
+ LSA = which_local_socket_addr(Domain),
+ SSA = LSA#{port => Port},
{ok, State#{local_sa => LSA, server_sa => SSA}}
end},
#{desc => "create socket",
@@ -3764,8 +3763,7 @@ api_a_recv_cancel_udp(InitState) ->
%% *** Init part ***
#{desc => "which local address",
cmd => fun(#{domain := Domain} = State) ->
- LAddr = which_local_addr(Domain),
- LSA = #{family => Domain, addr => LAddr},
+ LSA = which_local_socket_addr(Domain),
{ok, State#{local_sa => LSA}}
end},
#{desc => "create socket",
@@ -3999,8 +3997,7 @@ api_a_accept_cancel_tcp(InitState) ->
%% *** Init part ***
#{desc => "which local address",
cmd => fun(#{domain := Domain} = State) ->
- LAddr = which_local_addr(Domain),
- LSA = #{family => Domain, addr => LAddr},
+ LSA = which_local_socket_addr(Domain),
{ok, State#{lsa => LSA}}
end},
#{desc => "create listen socket",
@@ -4245,8 +4242,7 @@ api_a_recv_cancel_tcp(InitState) ->
%% *** Init part ***
#{desc => "which local address",
cmd => fun(#{domain := Domain} = State) ->
- LAddr = which_local_addr(Domain),
- LSA = #{family => Domain, addr => LAddr},
+ LSA = which_local_socket_addr(Domain),
{ok, State#{lsa => LSA}}
end},
#{desc => "create listen socket",
@@ -4389,10 +4385,8 @@ api_a_recv_cancel_tcp(InitState) ->
%% *** The init part ***
#{desc => "which server (local) address",
cmd => fun(#{domain := Domain, server_port := Port} = State) ->
- LAddr = which_local_addr(Domain),
- LSA = #{family => Domain,
- addr => LAddr},
- SSA = LSA#{port => Port},
+ LSA = which_local_socket_addr(Domain),
+ SSA = LSA#{port => Port},
{ok, State#{local_sa => LSA, server_sa => SSA}}
end},
#{desc => "create socket",
@@ -21692,7 +21686,9 @@ ttest_tcp_server_start(Node, _Domain, gen, Active) ->
socket_test_ttest_tcp_server:start_monitor(Node, Transport, Active);
ttest_tcp_server_start(Node, Domain, sock, Active) ->
TransportMod = socket_test_ttest_tcp_socket,
- Transport = {TransportMod, #{domain => Domain, method => plain}},
+ Transport = {TransportMod, #{domain => Domain,
+ async => true,
+ method => plain}},
socket_test_ttest_tcp_server:start_monitor(Node, Transport, Active).
ttest_tcp_server_stop(Pid) ->
@@ -21714,7 +21710,9 @@ ttest_tcp_client_start(Node,
Domain, sock,
ServerInfo, Active, MsgID, MaxOutstanding, RunTime) ->
TransportMod = socket_test_ttest_tcp_socket,
- Transport = {TransportMod, #{domain => Domain, method => plain}},
+ Transport = {TransportMod, #{domain => Domain,
+ async => true,
+ method => plain}},
socket_test_ttest_tcp_client:start_monitor(Node,
Notify,
Transport,
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
index ccace2a560..ca7eff4437 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
@@ -21,89 +21,95 @@
-module(socket_test_ttest_tcp_client_socket).
-export([
- start/3, start/4, start/6, start/7,
+ start/4, start/5, start/7, start/8,
stop/1
]).
-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket).
--define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D, method => M}}).
+-define(MOD(D, A, M), {?TRANSPORT_MOD, #{domain => D,
+ async => A,
+ method => M}}).
-start(Method, ServerInfo, Active)
+start(Method, Async, Active, ServerInfo)
when is_list(ServerInfo) ->
Domain = local,
- socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method),
- ServerInfo, Active);
-start(Method, ServerInfo = {Addr, _}, Active)
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method),
+ Active, ServerInfo);
+start(Method, Async, Active, ServerInfo = {Addr, _})
when is_tuple(Addr) andalso (size(Addr) =:= 4) ->
Domain = inet,
- socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method),
- ServerInfo, Active);
-start(Method, ServerInfo = {Addr, _}, Active)
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method),
+ Active, ServerInfo);
+start(Method, Async, Active, ServerInfo = {Addr, _})
when is_tuple(Addr) andalso (size(Addr) =:= 8) ->
Domain = inet6,
- socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Method),
- ServerInfo, Active).
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Domain, Async, Method),
+ Active, ServerInfo).
-start(Method, ServerInfo, Active, MsgID)
+start(Method, Async, Active, ServerInfo, MsgID)
when is_list(ServerInfo) ->
%% This is just a simplification
Domain = local,
- socket_test_ttest_tcp_client:start(?MOD(Domain, Method),
- ServerInfo, Active, MsgID);
-start(Method, ServerInfo = {Addr, _}, Active, MsgID)
+ socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method),
+ Active, ServerInfo, MsgID);
+start(Method, Async, Active, ServerInfo = {Addr, _}, MsgID)
when is_tuple(Addr) andalso (size(Addr) =:= 4) ->
%% This is just a simplification
Domain = inet,
- socket_test_ttest_tcp_client:start(?MOD(Domain, Method),
- ServerInfo, Active, MsgID);
-start(Method, ServerInfo = {Addr, _}, Active, MsgID)
+ socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method),
+ Active, ServerInfo, MsgID);
+start(Method, Async, Active, ServerInfo = {Addr, _}, MsgID)
when is_tuple(Addr) andalso (size(Addr) =:= 8) ->
Domain = inet6,
- socket_test_ttest_tcp_client:start(?MOD(Domain, Method),
- ServerInfo, Active, MsgID).
+ socket_test_ttest_tcp_client:start(?MOD(Domain, Async, Method),
+ Active, ServerInfo, MsgID).
-start(Method, ServerInfo, Active, MsgID, MaxOutstanding, RunTime)
+start(Method, Async, Active, ServerInfo, MsgID, MaxOutstanding, RunTime)
when is_list(ServerInfo) ->
Domain = local,
socket_test_ttest_tcp_client:start(false,
- ?MOD(Domain, Method),
- ServerInfo, Active,
+ ?MOD(Domain, Async, Method),
+ Active, ServerInfo,
MsgID, MaxOutstanding, RunTime);
-start(Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime)
+start(Method, Async, Active, ServerInfo = {Addr, _},
+ MsgID, MaxOutstanding, RunTime)
when is_tuple(Addr) andalso (size(Addr) =:= 4) ->
Domain = inet,
socket_test_ttest_tcp_client:start(false,
- ?MOD(Domain, Method),
- ServerInfo, Active,
+ ?MOD(Domain, Async, Method),
+ Active, ServerInfo,
MsgID, MaxOutstanding, RunTime);
-start(Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime)
+start(Method, Async, Active, ServerInfo = {Addr, _},
+ MsgID, MaxOutstanding, RunTime)
when is_tuple(Addr) andalso (size(Addr) =:= 8) ->
Domain = inet6,
socket_test_ttest_tcp_client:start(false,
- ?MOD(Domain, Method),
- ServerInfo, Active,
+ ?MOD(Domain, Async, Method),
+ Active, ServerInfo,
MsgID, MaxOutstanding, RunTime).
-start(Quiet, Method, ServerInfo, Active, MsgID, MaxOutstanding, RunTime)
+start(Quiet, Async, Active, Method, ServerInfo, MsgID, MaxOutstanding, RunTime)
when is_list(ServerInfo) ->
Domain = local,
socket_test_ttest_tcp_client:start(Quiet,
- ?MOD(Domain, Method),
- ServerInfo, Active,
+ ?MOD(Domain, Async, Method),
+ Active, ServerInfo,
MsgID, MaxOutstanding, RunTime);
-start(Quiet, Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime)
+start(Quiet, Async, Active, Method, ServerInfo = {Addr, _},
+ MsgID, MaxOutstanding, RunTime)
when is_tuple(Addr) andalso (size(Addr) =:= 4) ->
Domain = inet,
socket_test_ttest_tcp_client:start(Quiet,
- ?MOD(Domain, Method),
- ServerInfo, Active,
+ ?MOD(Domain, Async, Method),
+ Active, ServerInfo,
MsgID, MaxOutstanding, RunTime);
-start(Quiet, Method, ServerInfo = {Addr, _}, Active, MsgID, MaxOutstanding, RunTime)
+start(Quiet, Async, Active, Method, ServerInfo = {Addr, _},
+ MsgID, MaxOutstanding, RunTime)
when is_tuple(Addr) andalso (size(Addr) =:= 8) ->
Domain = inet6,
socket_test_ttest_tcp_client:start(Quiet,
- ?MOD(Domain, Method),
- ServerInfo, Active,
+ ?MOD(Domain, Async, Method),
+ Active, ServerInfo,
MsgID, MaxOutstanding, RunTime).
stop(Pid) ->
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl
index e916fcb93e..f0a2ae73ec 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_server.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl
@@ -90,7 +90,9 @@ start_monitor(_, Transport, Active) ->
start(Transport, Active) ->
do_start(self(), Transport, Active).
-
+%% Note that the Async option is actually only "used" for the
+%% socket transport module (it details how to implement the
+%% active feature).
do_start(Parent, Transport, Active)
when is_pid(Parent) andalso
(is_atom(Transport) orelse is_tuple(Transport)) andalso
@@ -167,7 +169,7 @@ server_init(Starter, Parent, Transport, Active) ->
end.
process_transport(Mod, _) when is_atom(Mod) ->
- {Mod, fun(Port) -> Mod:listen(Port) end, infinity};
+ {Mod, false, fun(Port) -> Mod:listen(Port) end, infinity};
process_transport({Mod, #{stats_interval := T} = Opts}, Active)
when (Active =/= false) ->
{Mod, fun(Port) -> Mod:listen(Port, Opts#{stats_to => self()}) end, T};
@@ -179,42 +181,59 @@ process_transport({Mod, Opts}, _Active) ->
server_loop(State) ->
- server_loop( server_handle_message( server_accept(State) ) ).
+ server_loop( server_handle_message( server_accept(State, ?ACC_TIMEOUT), 0) ).
-server_accept(#{mod := Mod,
- active := Active,
- lsock := LSock,
- handlers := Handlers} = State) ->
- case Mod:accept(LSock, ?ACC_TIMEOUT) of
+server_accept(#{mod := Mod, lsock := LSock} = State, Timeout) ->
+ case Mod:accept(LSock, Timeout) of
{ok, Sock} ->
- ?I("accepted connection from ~s",
- [case Mod:peername(Sock) of
- {ok, Peer} ->
- format_peername(Peer);
- {error, _} ->
- "-"
- end]),
- {Pid, _} = handler_start(),
- ?I("handler ~p started -> try transfer socket control", [Pid]),
- case Mod:controlling_process(Sock, Pid) of
- ok ->
- maybe_start_stats_timer(State, Pid),
- ?I("server-accept: handler ~p started", [Pid]),
- handler_continue(Pid, Mod, Sock, Active),
- Handlers2 = [Pid | Handlers],
- State#{handlers => Handlers2};
- {error, CPReason} ->
- (catch Mod:close(Sock)),
- (catch Mod:close(LSock)),
- exit({controlling_process, CPReason})
- end;
- {error, timeout} ->
+ server_handle_accepted(State, Sock);
+ {error, timeout} when (Timeout =/= nowait) ->
State;
{error, AReason} ->
(catch Mod:close(LSock)),
exit({accept, AReason})
end.
+%% server_accept(#{mod := Mod,
+%% lsock := LSock} = State) ->
+%% case Mod:accept(LSock, ?ACC_TIMEOUT) of
+%% {ok, Sock} ->
+%% server_handle_accepted(State, Sock);
+%% {error, timeout} ->
+%% State;
+%% {error, AReason} ->
+%% (catch Mod:close(LSock)),
+%% exit({accept, AReason})
+%% end.
+
+server_handle_accepted(#{mod := Mod,
+ lsock := LSock,
+ active := Active,
+ handlers := Handlers} = State,
+ Sock) ->
+ ?I("accepted connection from ~s",
+ [case Mod:peername(Sock) of
+ {ok, Peer} ->
+ format_peername(Peer);
+ {error, _} ->
+ "-"
+ end]),
+ {Pid, _} = handler_start(),
+ ?I("handler ~p started -> try transfer socket control", [Pid]),
+ case Mod:controlling_process(Sock, Pid) of
+ ok ->
+ maybe_start_stats_timer(State, Pid),
+ ?I("server-accept: handler ~p started", [Pid]),
+ handler_continue(Pid, Mod, Sock, Active),
+ Handlers2 = [Pid | Handlers],
+ State#{handlers => Handlers2};
+ {error, CPReason} ->
+ (catch Mod:close(Sock)),
+ (catch Mod:close(LSock)),
+ exit({controlling_process, CPReason})
+ end.
+
+
format_peername({Addr, Port}) ->
case inet:gethostbyaddr(Addr) of
{ok, #hostent{h_name = N}} ->
@@ -237,7 +256,7 @@ start_stats_timer(Time, ProcStr, Pid) ->
server_handle_message(#{mod := Mod,
lsock := LSock,
parent := Parent,
- handlers := H} = State) ->
+ handlers := H} = State, Timeout) ->
receive
{timeout, _TRef, {stats, Interval, ProcStr, Pid}} ->
case server_handle_stats(ProcStr, Pid) of
@@ -247,7 +266,7 @@ server_handle_message(#{mod := Mod,
ok
end,
State;
-
+
{?MODULE, Ref, Parent, stop} ->
reply(Parent, Ref, ok),
lists:foreach(fun(P) -> handler_stop(P) end, H),
@@ -257,7 +276,7 @@ server_handle_message(#{mod := Mod,
{'DOWN', _MRef, process, Pid, Reason} ->
server_handle_down(Pid, Reason, State)
- after 0 ->
+ after Timeout ->
State
end.
@@ -342,15 +361,15 @@ handler_init(Parent) ->
?I("received continue"),
reply(Parent, Ref, ok),
handler_initial_activation(Mod, Sock, Active),
- handler_loop(#{parent => Parent,
- mod => Mod,
- sock => Sock,
- active => Active,
- start => ?T(),
- mcnt => 0,
- bcnt => 0,
- last_reply => none,
- acc => <<>>})
+ handler_loop(#{parent => Parent,
+ mod => Mod,
+ sock => Sock,
+ active => Active,
+ start => ?T(),
+ mcnt => 0,
+ bcnt => 0,
+ last_reply => none,
+ acc => <<>>})
after 5000 ->
?I("timeout when message queue: "
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
index d1de230637..4045bf4e4e 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
@@ -21,18 +21,26 @@
-module(socket_test_ttest_tcp_server_socket).
-export([
- start/3,
+ start/4,
stop/1
]).
-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket).
-%% -define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D,
+%% -define(MOD(M), {?TRANSPORT_MOD, #{async => false,
%% method => M,
%% stats_interval => 10000}}).
--define(MOD(D, M), {?TRANSPORT_MOD, #{domain => D, method => M}}).
+-define(MOD(D,M,A), {?TRANSPORT_MOD, #{domain => D,
+ async => A,
+ method => M}}).
-start(Method, Domain, Active) ->
- socket_test_ttest_tcp_server:start(?MOD(Domain, Method), Active).
+start(Method, Domain, Async, Active) ->
+ socket_test_ttest_tcp_server:start(?MOD(Domain, Method, Async), Active).
+ %% {ok, {Pid, AddrPort}} ->
+ %% MRef = erlang:monitor(process, Pid),
+ %% {ok, {Pid, MRef, AddrPort}};
+ %% {error, _} = ERROR ->
+ %% ERROR
+ %% end.
stop(Pid) ->
socket_test_ttest_tcp_server:stop(Pid).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
index cf68bfe591..65fbba44c6 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
@@ -57,7 +57,7 @@
%% ==========================================================================
-%% This does not really work. Its just a placeholder for the time beeing...
+%% This does not really work. Its just a placeholder for the time being...
%% getopt(Sock, Opt) when is_atom(Opt) ->
%% socket:getopt(Sock, socket, Opt).
@@ -68,22 +68,32 @@
%% ==========================================================================
-accept(#{sock := LSock, opts := #{method := Method} = Opts}) ->
+%% The way we use server async its no point in doing a async accept call
+%% (we do never actually run the test with more than one client).
+accept(#{sock := LSock, opts := #{async := Async,
+ method := Method} = Opts}) ->
case socket:accept(LSock) of
- {ok, Sock} ->
+ {ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ Reader = spawn(fun() ->
+ reader_init(Self, Sock, Async, false, Method)
+ end),
maybe_start_stats_timer(Opts, Reader),
{ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
-accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) ->
+%% If a timeout has been explictly specified, then we do not use
+%% async here. We will pass it on to the reader process.
+accept(#{sock := LSock, opts := #{async := Async,
+ method := Method} = Opts}, Timeout) ->
case socket:accept(LSock, Timeout) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ Reader = spawn(fun() ->
+ reader_init(Self, Sock, Async, false, Method)
+ end),
maybe_start_stats_timer(Opts, Reader),
{ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
@@ -153,7 +163,8 @@ connect(Addr, Port, #{domain := Domain} = Opts) ->
do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => tcp}).
do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain,
- proto := Proto,
+ proto := Proto,
+ async := Async,
method := Method} = Opts) ->
try
begin
@@ -181,7 +192,9 @@ do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain,
throw({error, {connect, CReason}})
end,
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ Reader = spawn(fun() ->
+ reader_init(Self, Sock, Async, false, Method)
+ end),
maybe_start_stats_timer(Opts, Reader),
{ok, #{sock => Sock, reader => Reader, method => Method}}
end
@@ -219,9 +232,9 @@ listen() ->
listen(0).
listen(Port) when is_integer(Port) ->
- listen(Port, #{domain => inet, method => plain});
+ listen(Port, #{domain => inet, async => false, method => plain});
listen(Path) when is_list(Path) ->
- listen(Path, #{domain => local, method => plain}).
+ listen(Path, #{domain => local, async => false, method => plain}).
listen(0, #{domain := local} = Opts) ->
listen(mk_unique_path(), Opts);
@@ -241,8 +254,10 @@ listen(Port, #{domain := Domain} = Opts)
do_listen(SA,
Cleanup,
- #{domain := Domain, proto := Proto, method := Method} = Opts)
- when (Method =:= plain) orelse (Method =:= msg) ->
+ #{domain := Domain, proto := Proto,
+ async := Async, method := Method} = Opts)
+ when (Method =:= plain) orelse (Method =:= msg) andalso
+ is_boolean(Async) ->
try
begin
Sock = case socket:open(Domain, stream, Proto) of
@@ -339,13 +354,18 @@ sockname(#{sock := Sock}) ->
%% ==========================================================================
-reader_init(ControllingProcess, Sock, Active, Method)
+reader_init(ControllingProcess, Sock, Async, Active, Method)
when is_pid(ControllingProcess) andalso
+ is_boolean(Async) andalso
(is_boolean(Active) orelse (Active =:= once)) andalso
((Method =:= plain) orelse (Method =:= msg)) ->
+ put(verbose, false),
MRef = erlang:monitor(process, ControllingProcess),
reader_loop(#{ctrl_proc => ControllingProcess,
ctrl_proc_mref => MRef,
+ async => Async,
+ select_info => undefined,
+ select_num => 0, % Count the number of select messages
active => Active,
sock => Sock,
method => Method}).
@@ -356,11 +376,11 @@ reader_loop(#{active := false,
ctrl_proc := Pid} = State) ->
receive
{?MODULE, stop} ->
- exit(normal);
+ reader_exit(State, stop);
{?MODULE, Pid, controlling_process, NewPid} ->
- MRef = maps:get(ctrl_proc_mref, State),
- erlang:demonitor(MRef, [flush]),
+ OldMRef = maps:get(ctrl_proc_mref, State),
+ erlang:demonitor(OldMRef, [flush]),
NewMRef = erlang:monitor(process, NewPid),
Pid ! {?MODULE, self(), controlling_process},
reader_loop(State#{ctrl_proc => NewPid,
@@ -371,17 +391,16 @@ reader_loop(#{active := false,
{'DOWN', MRef, process, Pid, Reason} ->
case maps:get(ctrl_proc_mref, State) of
- MRef when (Reason =:= normal) ->
- exit(normal);
MRef ->
- exit({controlling_process, Reason});
+ reader_exit(State, {ctrl_exit, Reason});
_ ->
reader_loop(State)
end
end;
%% Read *once* and then change to false
-reader_loop(#{active := once,
+reader_loop(#{async := false,
+ active := once,
sock := Sock,
method := Method,
ctrl_proc := Pid} = State) ->
@@ -392,25 +411,23 @@ reader_loop(#{active := once,
{error, timeout} ->
receive
{?MODULE, stop} ->
- exit(normal);
+ reader_exit(State, stop);
{?MODULE, Pid, controlling_process, NewPid} ->
- MRef = maps:get(ctrl_proc_mref, State),
- erlang:demonitor(MRef, [flush]),
- MRef = erlang:monitor(process, NewPid),
+ OldMRef = maps:get(ctrl_proc_mref, State),
+ erlang:demonitor(OldMRef, [flush]),
+ NewMRef = erlang:monitor(process, NewPid),
Pid ! {?MODULE, self(), controlling_process},
reader_loop(State#{ctrl_proc => NewPid,
- ctrl_proc_mref => MRef});
+ ctrl_proc_mref => NewMRef});
{?MODULE, active, NewActive} ->
reader_loop(State#{active => NewActive});
{'DOWN', MRef, process, Pid, Reason} ->
case maps:get(ctrl_proc_mref, State) of
- MRef when (Reason =:= normal) ->
- exit(normal);
- MRef ->
- exit({controlling_process, Reason});
+ MRef ->
+ reader_exit(State, {ctrl_exit, Reason});
_ ->
reader_loop(State)
end
@@ -418,17 +435,86 @@ reader_loop(#{active := once,
reader_loop(State)
end;
- {error, closed} ->
+ {error, closed} = E1 ->
+ Pid ! ?CLOSED_MSG(Sock, Method),
+ reader_exit(State, E1);
+
+ {error, Reason} = E2 ->
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
+ reader_exit(State, E2)
+ end;
+reader_loop(#{async := true,
+ select_info := undefined,
+ active := once,
+ sock := Sock,
+ method := Method,
+ ctrl_proc := Pid} = State) ->
+ case do_recv(Method, Sock, nowait) of
+ {ok, {select, _, _} = SelectInfo} ->
+ reader_loop(State#{select_info => SelectInfo});
+ {ok, Data} ->
+ Pid ! ?DATA_MSG(Sock, Method, Data),
+ reader_loop(State#{active => false});
+
+ {error, closed} = E1 ->
Pid ! ?CLOSED_MSG(Sock, Method),
- exit(normal);
+ reader_exit(State, E1);
- {error, Reason} ->
+ {error, Reason} = E2 ->
Pid ! ?ERROR_MSG(Sock, Method, Reason),
- exit(Reason)
+ reader_exit(State, E2)
+ end;
+reader_loop(#{async := true,
+ select_info := {select, _, Ref},
+ select_num := N,
+ active := once,
+ sock := Sock,
+ method := Method,
+ ctrl_proc := Pid} = State) ->
+ receive
+ {?MODULE, stop} ->
+ reader_exit(State, stop);
+
+ {?MODULE, Pid, controlling_process, NewPid} ->
+ OldMRef = maps:get(ctrl_proc_mref, State),
+ erlang:demonitor(OldMRef, [flush]),
+ NewMRef = erlang:monitor(process, NewPid),
+ Pid ! {?MODULE, self(), controlling_process},
+ reader_loop(State#{ctrl_proc => NewPid,
+ ctrl_proc_mref => NewMRef});
+
+ {?MODULE, active, NewActive} ->
+ reader_loop(State#{active => NewActive});
+
+ {'DOWN', MRef, process, Pid, Reason} ->
+ case maps:get(ctrl_proc_mref, State) of
+ MRef ->
+ reader_exit(State, {ctrl_exit, Reason});
+ _ ->
+ reader_loop(State)
+ end;
+
+ {'$socket', Sock, select, Ref} ->
+ case do_recv(Method, Sock, nowait) of
+ {ok, Data} when is_binary(Data) ->
+ Pid ! ?DATA_MSG(Sock, Method, Data),
+ reader_loop(State#{active => false,
+ select_info => undefined,
+ select_num => N+1});
+
+ {error, closed} = E1 ->
+ Pid ! ?CLOSED_MSG(Sock, Method),
+ reader_exit(State, E1);
+
+ {error, Reason} = E2 ->
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
+ reader_exit(State, E2)
+ end
end;
%% Read and forward data continuously
-reader_loop(#{active := true,
+reader_loop(#{async := false,
+ active := true,
sock := Sock,
method := Method,
ctrl_proc := Pid} = State) ->
@@ -439,25 +525,23 @@ reader_loop(#{active := true,
{error, timeout} ->
receive
{?MODULE, stop} ->
- exit(normal);
+ reader_exit(State, stop);
{?MODULE, Pid, controlling_process, NewPid} ->
- MRef = maps:get(ctrl_proc_mref, State),
- erlang:demonitor(MRef, [flush]),
- MRef = erlang:monitor(process, NewPid),
+ OldMRef = maps:get(ctrl_proc_mref, State),
+ erlang:demonitor(OldMRef, [flush]),
+ NewMRef = erlang:monitor(process, NewPid),
Pid ! {?MODULE, self(), controlling_process},
reader_loop(State#{ctrl_proc => NewPid,
- ctrl_proc_mref => MRef});
+ ctrl_proc_mref => NewMRef});
{?MODULE, active, NewActive} ->
reader_loop(State#{active => NewActive});
{'DOWN', MRef, process, Pid, Reason} ->
case maps:get(ctrl_proc_mref, State) of
- MRef when (Reason =:= normal) ->
- exit(normal);
MRef ->
- exit({controlling_process, Reason});
+ reader_exit(State, {ctrl_exit, Reason});
_ ->
reader_loop(State)
end
@@ -465,27 +549,170 @@ reader_loop(#{active := true,
reader_loop(State)
end;
- {error, closed} ->
+ {error, closed} = E1 ->
+ Pid ! ?CLOSED_MSG(Sock, Method),
+ reader_exit(State, E1);
+
+ {error, Reason} = E2 ->
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
+ reader_exit(State, E2)
+ end;
+reader_loop(#{async := true,
+ select_info := undefined,
+ active := true,
+ sock := Sock,
+ method := Method,
+ ctrl_proc := Pid} = State) ->
+ case do_recv(Method, Sock) of
+ {ok, {select, _, _} = SelectInfo} ->
+ reader_loop(State#{select_info => SelectInfo});
+ {ok, Data} ->
+ Pid ! ?DATA_MSG(Sock, Method, Data),
+ reader_loop(State);
+
+ {error, closed} = E1 ->
Pid ! ?CLOSED_MSG(Sock, Method),
- exit(normal);
+ reader_exit(State, E1);
- {error, Reason} ->
+ {error, Reason} = E2 ->
Pid ! ?ERROR_MSG(Sock, Method, Reason),
- exit(Reason)
+ reader_exit(State, E2)
+ end;
+reader_loop(#{async := true,
+ select_info := {select, _, Ref},
+ select_num := N,
+ active := true,
+ sock := Sock,
+ method := Method,
+ ctrl_proc := Pid} = State) ->
+ receive
+ {?MODULE, stop} ->
+ reader_exit(State, stop);
+
+ {?MODULE, Pid, controlling_process, NewPid} ->
+ OldMRef = maps:get(ctrl_proc_mref, State),
+ erlang:demonitor(OldMRef, [flush]),
+ NewMRef = erlang:monitor(process, NewPid),
+ Pid ! {?MODULE, self(), controlling_process},
+ reader_loop(State#{ctrl_proc => NewPid,
+ ctrl_proc_mref => NewMRef});
+
+ {?MODULE, active, NewActive} ->
+ reader_loop(State#{active => NewActive});
+
+ {'DOWN', MRef, process, Pid, Reason} ->
+ case maps:get(ctrl_proc_mref, State) of
+ MRef ->
+ reader_exit(State, {ctrl_exit, Reason});
+ _ ->
+ reader_loop(State)
+ end;
+
+ {'$socket', Sock, select, Ref} ->
+ case do_recv(Method, Sock, nowait) of
+ {ok, Data} when is_binary(Data) ->
+ Pid ! ?DATA_MSG(Sock, Method, Data),
+ reader_loop(State#{select_info => undefined,
+ select_num => N+1});
+
+ {error, closed} = E1 ->
+ Pid ! ?CLOSED_MSG(Sock, Method),
+ reader_exit(State, E1);
+
+ {error, Reason} = E2 ->
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
+ reader_exit(State, E2)
+ end
end.
-do_recv(plain, Sock) ->
- socket:recv(Sock, 0, ?READER_RECV_TIMEOUT);
-do_recv(msg, Sock) ->
- case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of
+do_recv(Method, Sock) ->
+ do_recv(Method, Sock, ?READER_RECV_TIMEOUT).
+
+do_recv(plain, Sock, Timeout) ->
+ socket:recv(Sock, 0, Timeout);
+do_recv(msg, Sock, Timeout) ->
+ case socket:recvmsg(Sock, 0, 0, [], Timeout) of
{ok, #{iov := [Bin]}} ->
{ok, Bin};
+ {ok, {select, _, _}} = OK ->
+ OK;
{error, _} = ERROR ->
ERROR
end.
-
-
+
+
+reader_exit(#{async := false, active := Active}, stop) ->
+ vp("reader stopped when active: ~w", [Active]),
+ exit(normal);
+reader_exit(#{async := true,
+ active := Active,
+ select_info := SelectInfo,
+ select_num := N}, stop) ->
+ vp("reader stopped when active: ~w"
+ "~n Current select info: ~p"
+ "~n Number of select messages: ~p", [Active, SelectInfo, N]),
+ exit(normal);
+reader_exit(#{async := false, active := Active}, {ctrl_exit, normal}) ->
+ vp("reader ctrl exit when active: ~w", [Active]),
+ exit(normal);
+reader_exit(#{async := true,
+ active := Active,
+ select_info := SelectInfo,
+ select_num := N}, {ctrl_exit, normal}) ->
+ vp("reader ctrl exit when active: ~w"
+ "~n Current select info: ~p"
+ "~n Number of select messages: ~p", [Active, SelectInfo, N]),
+ exit(normal);
+reader_exit(#{async := false, active := Active}, {ctrl_exit, Reason}) ->
+ vp("reader exit when ctrl crash when active: ~w", [Active]),
+ exit({controlling_process, Reason});
+reader_exit(#{async := true,
+ active := Active,
+ select_info := SelectInfo,
+ select_num := N}, {ctrl_exit, Reason}) ->
+ vp("reader exit when ctrl crash when active: ~w"
+ "~n Current select info: ~p"
+ "~n Number of select messages: ~p", [Active, SelectInfo, N]),
+ exit({controlling_process, Reason});
+reader_exit(#{async := false, active := Active}, {error, closed}) ->
+ vp("reader exit when socket closed when active: ~w", [Active]),
+ exit(normal);
+reader_exit(#{async := true,
+ active := Active,
+ select_info := SelectInfo,
+ select_num := N}, {error, closed}) ->
+ vp("reader exit when socket closed when active: ~w "
+ "~n Current select info: ~p"
+ "~n Number of select messages: ~p", [Active, SelectInfo, N]),
+ exit(normal);
+reader_exit(#{async := false, active := Active}, {error, Reason}) ->
+ vp("reader exit when socket error when active: ~w", [Active]),
+ exit(Reason);
+reader_exit(#{async := true,
+ active := Active,
+ select_info := SelectInfo,
+ select_num := N}, {error, Reason}) ->
+ vp("reader exit when socket error when active: ~w: "
+ "~n Current select info: ~p"
+ "~n Number of select messages: ~p", [Active, SelectInfo, N]),
+ exit(Reason).
+
+
+
+
+
%% ==========================================================================
+vp(F, A) ->
+ vp(get(verbose), F, A).
+
+vp(true, F, A) ->
+ p(F, A);
+vp(_, _, _) ->
+ ok.
+
+p(F, A) ->
+ io:format(F ++ "~n", A).
+