diff options
Diffstat (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl')
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_socket.erl | 129 |
1 files changed, 95 insertions, 34 deletions
diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index 12d9e052d7..f314d01a63 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -24,9 +24,9 @@ accept/1, accept/2, active/2, close/1, - connect/2, + connect/2, connect/3, controlling_process/2, - listen/0, listen/1, + listen/0, listen/1, listen/2, port/1, peername/1, recv/2, recv/3, @@ -38,25 +38,41 @@ -define(READER_RECV_TIMEOUT, 1000). +-define(DATA_MSG(Sock, Method, Data), + {socket, + #{sock => Sock, reader => self(), method => Method}, + Data}). + +-define(CLOSED_MSG(Sock, Method), + {socket_closed, + #{sock => Sock, reader => self(), method => Method}}). + +-define(ERROR_MSG(Sock, Method, Reason), + {socket_error, + #{sock => Sock, reader => self(), method => Method}, + Reason}). + %% ========================================================================== -accept(#{sock := LSock}) -> +accept(#{sock := LSock, opts := #{method := Method} = Opts}) -> case socket:accept(LSock) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}}; + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. -accept(#{sock := LSock}, Timeout) -> +accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) -> case socket:accept(LSock, Timeout) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}}; + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. @@ -74,6 +90,9 @@ close(#{sock := Sock, reader := Pid}) -> %% Create a socket and connect it to a peer connect(Addr, Port) -> + connect(Addr, Port, #{method => plain}). + +connect(Addr, Port, #{method := Method} = Opts) -> try begin Sock = @@ -100,9 +119,10 @@ connect(Addr, Port) -> (catch socket:close(Sock)), throw({error, {connect, CReason}}) end, - Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}} + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}} end catch throw:ERROR:_ -> @@ -110,6 +130,16 @@ connect(Addr, Port) -> end. +maybe_start_stats_timer(#{stats_to := Pid, + stats_interval := T}, + Reader) when is_pid(Pid) -> + erlang:start_timer(T, Pid, {stats, T, "reader", Reader}); +maybe_start_stats_timer(O, _) -> + io:format("NO STATS: " + "~n ~p" + "~n", [O]), + ok. + controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> case socket:setopt(Sock, otp, controlling_process, NewPid) of ok -> @@ -125,9 +155,13 @@ controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> %% Create a listen socket listen() -> - listen(0). + listen(0, #{method => plain}). -listen(Port) when is_integer(Port) andalso (Port >= 0) -> +listen(Port) -> + listen(Port, #{method => plain}). +listen(Port, #{method := Method} = Opts) + when (is_integer(Port) andalso (Port >= 0)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> try begin Sock = case socket:open(inet, stream, tcp) of @@ -152,7 +186,7 @@ listen(Port) when is_integer(Port) andalso (Port >= 0) -> (catch socket:close(Sock)), throw({error, {listen, LReason}}) end, - {ok, #{sock => Sock}} + {ok, #{sock => Sock, opts => Opts}} end catch throw:{error, Reason}:_ -> @@ -178,14 +212,31 @@ peername(#{sock := Sock}) -> end. -recv(#{sock := Sock}, Length) -> - socket:recv(Sock, Length). -recv(#{sock := Sock}, Length, Timeout) -> - socket:recv(Sock, Length, Timeout). +recv(#{sock := Sock, method := plain}, Length) -> + socket:recv(Sock, Length); +recv(#{sock := Sock, method := msg}, Length) -> + case socket:recvmsg(Sock, Length, 0, [], infinity) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. + +recv(#{sock := Sock, method := plain}, Length, Timeout) -> + socket:recv(Sock, Length, Timeout); +recv(#{sock := Sock, method := msg}, Length, Timeout) -> + case socket:recvmsg(Sock, Length, 0, [], Timeout) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. -send(#{sock := Sock}, Length) -> - socket:send(Sock, Length). +send(#{sock := Sock, method := plain}, Bin) -> + socket:send(Sock, Bin); +send(#{sock := Sock, method := msg}, Bin) -> + socket:sendmsg(Sock, #{iov => [Bin]}). shutdown(#{sock := Sock}, How) -> @@ -203,14 +254,16 @@ sockname(#{sock := Sock}) -> %% ========================================================================== -reader_init(ControllingProcess, Sock, Active) +reader_init(ControllingProcess, Sock, Active, Method) when is_pid(ControllingProcess) andalso - (is_boolean(Active) orelse (Active =:= once)) -> + (is_boolean(Active) orelse (Active =:= once)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> MRef = erlang:monitor(process, ControllingProcess), reader_loop(#{ctrl_proc => ControllingProcess, ctrl_proc_mref => MRef, active => Active, - sock => Sock}). + sock => Sock, + method => Method}). %% Never read @@ -245,10 +298,11 @@ reader_loop(#{active := false, %% Read *once* and then change to false reader_loop(#{active := once, sock := Sock, + method := Method, ctrl_proc := Pid} = State) -> - case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + case do_recv(Method, Sock) of {ok, Data} -> - Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + Pid ! ?DATA_MSG(Sock, Method, Data), reader_loop(State#{active => false}); {error, timeout} -> receive @@ -280,21 +334,22 @@ reader_loop(#{active := once, end; {error, closed} -> - Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + Pid ! ?CLOSED_MSG(Sock, Method), exit(normal); {error, Reason} -> - Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + Pid ! ?ERROR_MSG(Sock, Method, Reason), exit(Reason) end; %% Read and forward data continuously reader_loop(#{active := true, sock := Sock, + method := Method, ctrl_proc := Pid} = State) -> - case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + case do_recv(Method, Sock) of {ok, Data} -> - Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + Pid ! ?DATA_MSG(Sock, Method, Data), reader_loop(State); {error, timeout} -> receive @@ -326,20 +381,26 @@ reader_loop(#{active := true, end; {error, closed} -> - Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + Pid ! ?CLOSED_MSG(Sock, Method), exit(normal); {error, Reason} -> - Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + Pid ! ?ERROR_MSG(Sock, Method, Reason), exit(Reason) end. - +do_recv(plain, Sock) -> + socket:recv(Sock, 0, ?READER_RECV_TIMEOUT); +do_recv(msg, Sock) -> + case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. %% ========================================================================== - - |