From 08f4df99bbaafbba86a216734fa603bd2996f2a3 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Wed, 17 Apr 2019 16:53:08 +0200 Subject: [socket|test] Add async to ttest Make it possible for the tttest server to run with async. --- .../emulator/test/socket_test_ttest_tcp_socket.erl | 331 +++++++++++++++++---- 1 file changed, 279 insertions(+), 52 deletions(-) (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl') diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index cf68bfe591..65fbba44c6 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -57,7 +57,7 @@ %% ========================================================================== -%% This does not really work. Its just a placeholder for the time beeing... +%% This does not really work. Its just a placeholder for the time being... %% getopt(Sock, Opt) when is_atom(Opt) -> %% socket:getopt(Sock, socket, Opt). @@ -68,22 +68,32 @@ %% ========================================================================== -accept(#{sock := LSock, opts := #{method := Method} = Opts}) -> +%% The way we use server async its no point in doing a async accept call +%% (we do never actually run the test with more than one client). +accept(#{sock := LSock, opts := #{async := Async, + method := Method} = Opts}) -> case socket:accept(LSock) of - {ok, Sock} -> + {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. -accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) -> +%% If a timeout has been explictly specified, then we do not use +%% async here. We will pass it on to the reader process. +accept(#{sock := LSock, opts := #{async := Async, + method := Method} = Opts}, Timeout) -> case socket:accept(LSock, Timeout) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> @@ -153,7 +163,8 @@ connect(Addr, Port, #{domain := Domain} = Opts) -> do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => tcp}). do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain, - proto := Proto, + proto := Proto, + async := Async, method := Method} = Opts) -> try begin @@ -181,7 +192,9 @@ do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain, throw({error, {connect, CReason}}) end, Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), maybe_start_stats_timer(Opts, Reader), {ok, #{sock => Sock, reader => Reader, method => Method}} end @@ -219,9 +232,9 @@ listen() -> listen(0). listen(Port) when is_integer(Port) -> - listen(Port, #{domain => inet, method => plain}); + listen(Port, #{domain => inet, async => false, method => plain}); listen(Path) when is_list(Path) -> - listen(Path, #{domain => local, method => plain}). + listen(Path, #{domain => local, async => false, method => plain}). listen(0, #{domain := local} = Opts) -> listen(mk_unique_path(), Opts); @@ -241,8 +254,10 @@ listen(Port, #{domain := Domain} = Opts) do_listen(SA, Cleanup, - #{domain := Domain, proto := Proto, method := Method} = Opts) - when (Method =:= plain) orelse (Method =:= msg) -> + #{domain := Domain, proto := Proto, + async := Async, method := Method} = Opts) + when (Method =:= plain) orelse (Method =:= msg) andalso + is_boolean(Async) -> try begin Sock = case socket:open(Domain, stream, Proto) of @@ -339,13 +354,18 @@ sockname(#{sock := Sock}) -> %% ========================================================================== -reader_init(ControllingProcess, Sock, Active, Method) +reader_init(ControllingProcess, Sock, Async, Active, Method) when is_pid(ControllingProcess) andalso + is_boolean(Async) andalso (is_boolean(Active) orelse (Active =:= once)) andalso ((Method =:= plain) orelse (Method =:= msg)) -> + put(verbose, false), MRef = erlang:monitor(process, ControllingProcess), reader_loop(#{ctrl_proc => ControllingProcess, ctrl_proc_mref => MRef, + async => Async, + select_info => undefined, + select_num => 0, % Count the number of select messages active => Active, sock => Sock, method => Method}). @@ -356,11 +376,11 @@ reader_loop(#{active := false, ctrl_proc := Pid} = State) -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, @@ -371,17 +391,16 @@ reader_loop(#{active := false, {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); MRef -> - exit({controlling_process, Reason}); + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end end; %% Read *once* and then change to false -reader_loop(#{active := once, +reader_loop(#{async := false, + active := once, sock := Sock, method := Method, ctrl_proc := Pid} = State) -> @@ -392,25 +411,23 @@ reader_loop(#{active := once, {error, timeout} -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), - MRef = erlang:monitor(process, NewPid), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, - ctrl_proc_mref => MRef}); + ctrl_proc_mref => NewMRef}); {?MODULE, active, NewActive} -> reader_loop(State#{active => NewActive}); {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); - MRef -> - exit({controlling_process, Reason}); + MRef -> + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end @@ -418,17 +435,86 @@ reader_loop(#{active := once, reader_loop(State) end; - {error, closed} -> + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{async := true, + select_info := undefined, + active := once, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock, nowait) of + {ok, {select, _, _} = SelectInfo} -> + reader_loop(State#{select_info => SelectInfo}); + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false}); + + {error, closed} = E1 -> Pid ! ?CLOSED_MSG(Sock, Method), - exit(normal); + reader_exit(State, E1); - {error, Reason} -> + {error, Reason} = E2 -> Pid ! ?ERROR_MSG(Sock, Method, Reason), - exit(Reason) + reader_exit(State, E2) + end; +reader_loop(#{async := true, + select_info := {select, _, Ref}, + select_num := N, + active := once, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end; + + {'$socket', Sock, select, Ref} -> + case do_recv(Method, Sock, nowait) of + {ok, Data} when is_binary(Data) -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false, + select_info => undefined, + select_num => N+1}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end end; %% Read and forward data continuously -reader_loop(#{active := true, +reader_loop(#{async := false, + active := true, sock := Sock, method := Method, ctrl_proc := Pid} = State) -> @@ -439,25 +525,23 @@ reader_loop(#{active := true, {error, timeout} -> receive {?MODULE, stop} -> - exit(normal); + reader_exit(State, stop); {?MODULE, Pid, controlling_process, NewPid} -> - MRef = maps:get(ctrl_proc_mref, State), - erlang:demonitor(MRef, [flush]), - MRef = erlang:monitor(process, NewPid), + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), Pid ! {?MODULE, self(), controlling_process}, reader_loop(State#{ctrl_proc => NewPid, - ctrl_proc_mref => MRef}); + ctrl_proc_mref => NewMRef}); {?MODULE, active, NewActive} -> reader_loop(State#{active => NewActive}); {'DOWN', MRef, process, Pid, Reason} -> case maps:get(ctrl_proc_mref, State) of - MRef when (Reason =:= normal) -> - exit(normal); MRef -> - exit({controlling_process, Reason}); + reader_exit(State, {ctrl_exit, Reason}); _ -> reader_loop(State) end @@ -465,27 +549,170 @@ reader_loop(#{active := true, reader_loop(State) end; - {error, closed} -> + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{async := true, + select_info := undefined, + active := true, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock) of + {ok, {select, _, _} = SelectInfo} -> + reader_loop(State#{select_info => SelectInfo}); + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State); + + {error, closed} = E1 -> Pid ! ?CLOSED_MSG(Sock, Method), - exit(normal); + reader_exit(State, E1); - {error, Reason} -> + {error, Reason} = E2 -> Pid ! ?ERROR_MSG(Sock, Method, Reason), - exit(Reason) + reader_exit(State, E2) + end; +reader_loop(#{async := true, + select_info := {select, _, Ref}, + select_num := N, + active := true, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end; + + {'$socket', Sock, select, Ref} -> + case do_recv(Method, Sock, nowait) of + {ok, Data} when is_binary(Data) -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{select_info => undefined, + select_num => N+1}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end end. -do_recv(plain, Sock) -> - socket:recv(Sock, 0, ?READER_RECV_TIMEOUT); -do_recv(msg, Sock) -> - case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of +do_recv(Method, Sock) -> + do_recv(Method, Sock, ?READER_RECV_TIMEOUT). + +do_recv(plain, Sock, Timeout) -> + socket:recv(Sock, 0, Timeout); +do_recv(msg, Sock, Timeout) -> + case socket:recvmsg(Sock, 0, 0, [], Timeout) of {ok, #{iov := [Bin]}} -> {ok, Bin}; + {ok, {select, _, _}} = OK -> + OK; {error, _} = ERROR -> ERROR end. - - + + +reader_exit(#{async := false, active := Active}, stop) -> + vp("reader stopped when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, stop) -> + vp("reader stopped when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {ctrl_exit, normal}) -> + vp("reader ctrl exit when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {ctrl_exit, normal}) -> + vp("reader ctrl exit when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {ctrl_exit, Reason}) -> + vp("reader exit when ctrl crash when active: ~w", [Active]), + exit({controlling_process, Reason}); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {ctrl_exit, Reason}) -> + vp("reader exit when ctrl crash when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit({controlling_process, Reason}); +reader_exit(#{async := false, active := Active}, {error, closed}) -> + vp("reader exit when socket closed when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {error, closed}) -> + vp("reader exit when socket closed when active: ~w " + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {error, Reason}) -> + vp("reader exit when socket error when active: ~w", [Active]), + exit(Reason); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {error, Reason}) -> + vp("reader exit when socket error when active: ~w: " + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(Reason). + + + + + %% ========================================================================== +vp(F, A) -> + vp(get(verbose), F, A). + +vp(true, F, A) -> + p(F, A); +vp(_, _, _) -> + ok. + +p(F, A) -> + io:format(F ++ "~n", A). + -- cgit v1.2.3 From d5a78e4fab8e0039f59bfd20368e5c05a19b8439 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 2 May 2019 15:07:41 +0200 Subject: [esock|test] Fixed async ttest Updated the ttest test(s) to handle async properly. OTP-15731 --- .../emulator/test/socket_test_ttest_tcp_socket.erl | 42 +++++++++++----------- 1 file changed, 21 insertions(+), 21 deletions(-) (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl') diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index 65fbba44c6..3aa3b2c504 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -399,9 +399,9 @@ reader_loop(#{active := false, end; %% Read *once* and then change to false -reader_loop(#{async := false, - active := once, - sock := Sock, +reader_loop(#{active := once, + async := false, + sock := Sock, method := Method, ctrl_proc := Pid} = State) -> case do_recv(Method, Sock) of @@ -443,14 +443,14 @@ reader_loop(#{async := false, Pid ! ?ERROR_MSG(Sock, Method, Reason), reader_exit(State, E2) end; -reader_loop(#{async := true, +reader_loop(#{active := once, + async := true, select_info := undefined, - active := once, - sock := Sock, + sock := Sock, method := Method, ctrl_proc := Pid} = State) -> case do_recv(Method, Sock, nowait) of - {ok, {select, _, _} = SelectInfo} -> + {select, SelectInfo} -> reader_loop(State#{select_info => SelectInfo}); {ok, Data} -> Pid ! ?DATA_MSG(Sock, Method, Data), @@ -464,11 +464,11 @@ reader_loop(#{async := true, Pid ! ?ERROR_MSG(Sock, Method, Reason), reader_exit(State, E2) end; -reader_loop(#{async := true, - select_info := {select, _, Ref}, +reader_loop(#{active := once, + async := true, + select_info := {select_info, _, Ref}, select_num := N, - active := once, - sock := Sock, + sock := Sock, method := Method, ctrl_proc := Pid} = State) -> receive @@ -513,8 +513,8 @@ reader_loop(#{async := true, end; %% Read and forward data continuously -reader_loop(#{async := false, - active := true, +reader_loop(#{active := true, + async := false, sock := Sock, method := Method, ctrl_proc := Pid} = State) -> @@ -557,14 +557,14 @@ reader_loop(#{async := false, Pid ! ?ERROR_MSG(Sock, Method, Reason), reader_exit(State, E2) end; -reader_loop(#{async := true, +reader_loop(#{active := true, + async := true, select_info := undefined, - active := true, sock := Sock, method := Method, ctrl_proc := Pid} = State) -> case do_recv(Method, Sock) of - {ok, {select, _, _} = SelectInfo} -> + {select, SelectInfo} -> reader_loop(State#{select_info => SelectInfo}); {ok, Data} -> Pid ! ?DATA_MSG(Sock, Method, Data), @@ -578,10 +578,10 @@ reader_loop(#{async := true, Pid ! ?ERROR_MSG(Sock, Method, Reason), reader_exit(State, E2) end; -reader_loop(#{async := true, - select_info := {select, _, Ref}, +reader_loop(#{active := true, + async := true, + select_info := {select_info, _, Ref}, select_num := N, - active := true, sock := Sock, method := Method, ctrl_proc := Pid} = State) -> @@ -635,8 +635,8 @@ do_recv(msg, Sock, Timeout) -> case socket:recvmsg(Sock, 0, 0, [], Timeout) of {ok, #{iov := [Bin]}} -> {ok, Bin}; - {ok, {select, _, _}} = OK -> - OK; + {select, _} = SELECT -> + SELECT; {error, _} = ERROR -> ERROR end. -- cgit v1.2.3