diff options
Diffstat (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl')
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_socket.erl | 724 |
1 files changed, 724 insertions, 0 deletions
diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl new file mode 100644 index 0000000000..9112748b4c --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -0,0 +1,724 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2019. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_tcp_socket). + +-export([ + accept/1, accept/2, + active/2, + close/1, + connect/1, connect/2, connect/3, + controlling_process/2, + listen/0, listen/1, listen/2, + port/1, + peername/1, + recv/2, recv/3, + send/2, + shutdown/2, + sockname/1 + ]). + + +-define(LIB, socket_test_lib). + +-define(READER_RECV_TIMEOUT, 1000). + +-define(DATA_MSG(Sock, Method, Data), + {socket, + #{sock => Sock, reader => self(), method => Method}, + Data}). + +-define(CLOSED_MSG(Sock, Method), + {socket_closed, + #{sock => Sock, reader => self(), method => Method}}). + +-define(ERROR_MSG(Sock, Method, Reason), + {socket_error, + #{sock => Sock, reader => self(), method => Method}, + Reason}). + + +%% ========================================================================== + +%% This does not really work. Its just a placeholder for the time being... + +%% getopt(Sock, Opt) when is_atom(Opt) -> +%% socket:getopt(Sock, socket, Opt). + +%% setopt(Sock, Opt, Value) when is_atom(Opt) -> +%% socket:setopts(Sock, socket, Opt, Value). + + +%% ========================================================================== + +%% The way we use server async its no point in doing a async accept call +%% (we do never actually run the test with more than one client). +accept(#{sock := LSock, opts := #{async := Async, + method := Method} = Opts}) -> + case socket:accept(LSock) of + {ok, Sock} -> + Self = self(), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; + {error, _} = ERROR -> + ERROR + end. + +%% If a timeout has been explictly specified, then we do not use +%% async here. We will pass it on to the reader process. +accept(#{sock := LSock, opts := #{async := Async, + method := Method} = Opts}, Timeout) -> + case socket:accept(LSock, Timeout) of + {ok, Sock} -> + Self = self(), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; + {error, _} = ERROR -> + ERROR + end. + + +active(#{reader := Pid}, NewActive) + when (is_boolean(NewActive) orelse (NewActive =:= once)) -> + Pid ! {?MODULE, active, NewActive}, + ok. + + +close(#{sock := Sock, reader := Pid}) -> + Pid ! {?MODULE, stop}, + Unlink = case socket:sockname(Sock) of + {ok, #{family := local, path := Path}} -> + fun() -> os:cmd("unlink " ++ Path), ok end; + _ -> + fun() -> ok end + end, + Res = socket:close(Sock), + Unlink(), + Res. + +%% Create a socket and connect it to a peer +connect(ServerPath) when is_list(ServerPath) -> + Domain = local, + ClientPath = mk_unique_path(), + LocalSA = #{family => Domain, + path => ClientPath}, + ServerSA = #{family => Domain, path => ServerPath}, + Opts = #{domain => Domain, + proto => default, + method => plain}, + Cleanup = fun() -> os:cmd("unlink " ++ ClientPath), ok end, + do_connect(LocalSA, ServerSA, Cleanup, Opts). + +connect(Addr, Port) when is_tuple(Addr) andalso is_integer(Port) -> + Domain = inet, + LocalSA = any, + ServerSA = #{family => Domain, + addr => Addr, + port => Port}, + Opts = #{domain => Domain, + proto => tcp, + method => plain}, + Cleanup = fun() -> ok end, + do_connect(LocalSA, ServerSA, Cleanup, Opts); +connect(ServerPath, + #{domain := local = Domain} = Opts) + when is_list(ServerPath) -> + ClientPath = mk_unique_path(), + LocalSA = #{family => Domain, + path => ClientPath}, + ServerSA = #{family => Domain, + path => ServerPath}, + Cleanup = fun() -> os:cmd("unlink " ++ ClientPath), ok end, + do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => default}). + +connect(Addr, Port, #{domain := Domain} = Opts) -> + LocalSA = any, + ServerSA = #{family => Domain, + addr => Addr, + port => Port}, + Cleanup = fun() -> ok end, + do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => tcp}). + +do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain, + proto := Proto, + async := Async, + method := Method} = Opts) -> + try + begin + Sock = + case socket:open(Domain, stream, Proto) of + {ok, S} -> + S; + {error, OReason} -> + throw({error, {open, OReason}}) + end, + case socket:bind(Sock, LocalSA) of + {ok, _} -> + ok; + {error, BReason} -> + (catch socket:close(Sock)), + Cleanup(), + throw({error, {bind, BReason}}) + end, + case socket:connect(Sock, ServerSA) of + ok -> + ok; + {error, CReason} -> + (catch socket:close(Sock)), + Cleanup(), + throw({error, {connect, CReason}}) + end, + Self = self(), + Reader = spawn(fun() -> + reader_init(Self, Sock, Async, false, Method) + end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}} + end + catch + throw:ERROR:_ -> + ERROR + end. + +mk_unique_path() -> + [NodeName | _] = string:tokens(atom_to_list(node()), [$@]), + ?LIB:f("/tmp/esock_~s_~w", [NodeName, erlang:system_time(nanosecond)]). + +maybe_start_stats_timer(#{stats_to := Pid, + stats_interval := T}, + Reader) when is_pid(Pid) -> + erlang:start_timer(T, Pid, {stats, T, "reader", Reader}); +maybe_start_stats_timer(_O, _) -> + ok. + +controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> + case socket:setopt(Sock, otp, controlling_process, NewPid) of + ok -> + Pid ! {?MODULE, self(), controlling_process, NewPid}, + receive + {?MODULE, Pid, controlling_process} -> + ok + end; + {error, _} = ERROR -> + ERROR + end. + + +%% Create a listen socket +listen() -> + listen(0). + +listen(Port) when is_integer(Port) -> + listen(Port, #{domain => inet, async => false, method => plain}); +listen(Path) when is_list(Path) -> + listen(Path, #{domain => local, async => false, method => plain}). + +listen(0, #{domain := local} = Opts) -> + listen(mk_unique_path(), Opts); +listen(Path, #{domain := local = Domain} = Opts) + when is_list(Path) andalso (Path =/= []) -> + SA = #{family => Domain, + path => Path}, + Cleanup = fun() -> os:cmd("unlink " ++ Path), ok end, + do_listen(SA, Cleanup, Opts#{proto => default}); +listen(Port, #{domain := Domain} = Opts) + when is_integer(Port) andalso (Port >= 0) -> + %% Bind fills in the rest + case ?LIB:which_local_host_info(Domain) of + {ok, {_, _, Addr}} -> + SA = #{family => Domain, + addr => Addr, + port => Port}, + Cleanup = fun() -> ok end, + do_listen(SA, Cleanup, Opts#{proto => tcp}); + {error, _} = ERROR -> + ERROR + end. + +do_listen(SA, + Cleanup, + #{domain := Domain, proto := Proto, + async := Async, method := Method} = Opts) + when (Method =:= plain) orelse (Method =:= msg) andalso + is_boolean(Async) -> + try + begin + Sock = case socket:open(Domain, stream, Proto) of + {ok, S} -> + S; + {error, OReason} -> + throw({error, {open, OReason}}) + end, + case socket:bind(Sock, SA) of + {ok, _} -> + ok; + {error, BReason} -> + (catch socket:close(Sock)), + Cleanup(), + throw({error, {bind, BReason}}) + end, + case socket:listen(Sock) of + ok -> + ok; + {error, LReason} -> + (catch socket:close(Sock)), + Cleanup(), + throw({error, {listen, LReason}}) + end, + {ok, #{sock => Sock, opts => Opts}} + end + catch + throw:{error, Reason}:_ -> + {error, Reason} + end. + + +port(#{sock := Sock}) -> + case socket:sockname(Sock) of + {ok, #{family := local, path := Path}} -> + {ok, Path}; + {ok, #{port := Port}} -> + {ok, Port}; + {error, _} = ERROR -> + ERROR + end. + + +peername(#{sock := Sock}) -> + case socket:peername(Sock) of + {ok, #{family := local, path := Path}} -> + {ok, Path}; + {ok, #{addr := Addr, port := Port}} -> + {ok, {Addr, Port}}; + {error, _} = ERROR -> + ERROR + end. + + +recv(#{sock := Sock, method := plain}, Length) -> + socket:recv(Sock, Length); +recv(#{sock := Sock, method := msg}, Length) -> + case socket:recvmsg(Sock, Length, 0, [], infinity) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. + +recv(#{sock := Sock, method := plain}, Length, Timeout) -> + socket:recv(Sock, Length, Timeout); +recv(#{sock := Sock, method := msg}, Length, Timeout) -> + case socket:recvmsg(Sock, Length, 0, [], Timeout) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. + + +send(#{sock := Sock, method := plain}, Bin) -> + socket:send(Sock, Bin); +send(#{sock := Sock, method := msg}, Bin) -> + socket:sendmsg(Sock, #{iov => [Bin]}). + + +shutdown(#{sock := Sock}, How) -> + socket:shutdown(Sock, How). + + +sockname(#{sock := Sock}) -> + case socket:sockname(Sock) of + {ok, #{addr := Addr, port := Port}} -> + {ok, {Addr, Port}}; + {error, _} = ERROR -> + ERROR + end. + + +%% ========================================================================== + +reader_init(ControllingProcess, Sock, Async, Active, Method) + when is_pid(ControllingProcess) andalso + is_boolean(Async) andalso + (is_boolean(Active) orelse (Active =:= once)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> + put(verbose, false), + MRef = erlang:monitor(process, ControllingProcess), + reader_loop(#{ctrl_proc => ControllingProcess, + ctrl_proc_mref => MRef, + async => Async, + select_info => undefined, + select_num => 0, % Count the number of select messages + active => Active, + sock => Sock, + method => Method}). + + +%% Never read +reader_loop(#{active := false, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end + end; + +%% Read *once* and then change to false +reader_loop(#{active := once, + async := false, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock) of + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false}); + {error, timeout} -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end + after 0 -> + reader_loop(State) + end; + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{active := once, + async := true, + select_info := undefined, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock, nowait) of + {select, SelectInfo} -> + reader_loop(State#{select_info => SelectInfo}); + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{active := once, + async := true, + select_info := {select_info, _, Ref}, + select_num := N, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end; + + {'$socket', Sock, select, Ref} -> + case do_recv(Method, Sock, nowait) of + {ok, Data} when is_binary(Data) -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{active => false, + select_info => undefined, + select_num => N+1}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end + end; + +%% Read and forward data continuously +reader_loop(#{active := true, + async := false, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock) of + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State); + {error, timeout} -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end + after 0 -> + reader_loop(State) + end; + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{active := true, + async := true, + select_info := undefined, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + case do_recv(Method, Sock) of + {select, SelectInfo} -> + reader_loop(State#{select_info => SelectInfo}); + {ok, Data} -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end; +reader_loop(#{active := true, + async := true, + select_info := {select_info, _, Ref}, + select_num := N, + sock := Sock, + method := Method, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + reader_exit(State, stop); + + {?MODULE, Pid, controlling_process, NewPid} -> + OldMRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(OldMRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef -> + reader_exit(State, {ctrl_exit, Reason}); + _ -> + reader_loop(State) + end; + + {'$socket', Sock, select, Ref} -> + case do_recv(Method, Sock, nowait) of + {ok, Data} when is_binary(Data) -> + Pid ! ?DATA_MSG(Sock, Method, Data), + reader_loop(State#{select_info => undefined, + select_num => N+1}); + + {error, closed} = E1 -> + Pid ! ?CLOSED_MSG(Sock, Method), + reader_exit(State, E1); + + {error, Reason} = E2 -> + Pid ! ?ERROR_MSG(Sock, Method, Reason), + reader_exit(State, E2) + end + end. + + +do_recv(Method, Sock) -> + do_recv(Method, Sock, ?READER_RECV_TIMEOUT). + +do_recv(plain, Sock, Timeout) -> + socket:recv(Sock, 0, Timeout); +do_recv(msg, Sock, Timeout) -> + case socket:recvmsg(Sock, 0, 0, [], Timeout) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {select, _} = SELECT -> + SELECT; + {error, _} = ERROR -> + ERROR + end. + + +reader_exit(#{async := false, active := Active}, stop) -> + vp("reader stopped when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, stop) -> + vp("reader stopped when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {ctrl_exit, normal}) -> + vp("reader ctrl exit when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {ctrl_exit, normal}) -> + vp("reader ctrl exit when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {ctrl_exit, Reason}) -> + vp("reader exit when ctrl crash when active: ~w", [Active]), + exit({controlling_process, Reason}); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {ctrl_exit, Reason}) -> + vp("reader exit when ctrl crash when active: ~w" + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit({controlling_process, Reason}); +reader_exit(#{async := false, active := Active}, {error, closed}) -> + vp("reader exit when socket closed when active: ~w", [Active]), + exit(normal); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {error, closed}) -> + vp("reader exit when socket closed when active: ~w " + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(normal); +reader_exit(#{async := false, active := Active}, {error, Reason}) -> + vp("reader exit when socket error when active: ~w", [Active]), + exit(Reason); +reader_exit(#{async := true, + active := Active, + select_info := SelectInfo, + select_num := N}, {error, Reason}) -> + vp("reader exit when socket error when active: ~w: " + "~n Current select info: ~p" + "~n Number of select messages: ~p", [Active, SelectInfo, N]), + exit(Reason). + + + + + + +%% ========================================================================== + +vp(F, A) -> + vp(get(verbose), F, A). + +vp(true, F, A) -> + p(F, A); +vp(_, _, _) -> + ok. + +p(F, A) -> + io:format(F ++ "~n", A). + |