From 94d8e2f1bf9508656f5b9b2c2c644128a9bdfb57 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Wed, 5 Dec 2018 10:51:19 +0100 Subject: [socket-nif|test] Added the proper time-test code --- .../emulator/test/socket_test_ttest_tcp_socket.erl | 345 +++++++++++++++++++++ 1 file changed, 345 insertions(+) create mode 100644 erts/emulator/test/socket_test_ttest_tcp_socket.erl (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl') diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl new file mode 100644 index 0000000000..12d9e052d7 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -0,0 +1,345 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_tcp_socket). + +-export([ + accept/1, accept/2, + active/2, + close/1, + connect/2, + controlling_process/2, + listen/0, listen/1, + port/1, + peername/1, + recv/2, recv/3, + send/2, + shutdown/2, + sockname/1 + ]). + + +-define(READER_RECV_TIMEOUT, 1000). + + +%% ========================================================================== + +accept(#{sock := LSock}) -> + case socket:accept(LSock) of + {ok, Sock} -> + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false) end), + {ok, #{sock => Sock, reader => Reader}}; + {error, _} = ERROR -> + ERROR + end. + +accept(#{sock := LSock}, Timeout) -> + case socket:accept(LSock, Timeout) of + {ok, Sock} -> + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false) end), + {ok, #{sock => Sock, reader => Reader}}; + {error, _} = ERROR -> + ERROR + end. + + +active(#{reader := Pid}, NewActive) + when (is_boolean(NewActive) orelse (NewActive =:= once)) -> + Pid ! {?MODULE, active, NewActive}, + ok. + + +close(#{sock := Sock, reader := Pid}) -> + Pid ! {?MODULE, stop}, + socket:close(Sock). + +%% Create a socket and connect it to a peer +connect(Addr, Port) -> + try + begin + Sock = + case socket:open(inet, stream, tcp) of + {ok, S} -> + S; + {error, OReason} -> + throw({error, {open, OReason}}) + end, + case socket:bind(Sock, any) of + {ok, _} -> + ok; + {error, BReason} -> + (catch socket:close(Sock)), + throw({error, {bind, BReason}}) + end, + SA = #{family => inet, + addr => Addr, + port => Port}, + case socket:connect(Sock, SA) of + ok -> + ok; + {error, CReason} -> + (catch socket:close(Sock)), + throw({error, {connect, CReason}}) + end, + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false) end), + {ok, #{sock => Sock, reader => Reader}} + end + catch + throw:ERROR:_ -> + ERROR + end. + + +controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> + case socket:setopt(Sock, otp, controlling_process, NewPid) of + ok -> + Pid ! {?MODULE, self(), controlling_process, NewPid}, + receive + {?MODULE, Pid, controlling_process} -> + ok + end; + {error, _} = ERROR -> + ERROR + end. + + +%% Create a listen socket +listen() -> + listen(0). + +listen(Port) when is_integer(Port) andalso (Port >= 0) -> + try + begin + Sock = case socket:open(inet, stream, tcp) of + {ok, S} -> + S; + {error, OReason} -> + throw({error, {open, OReason}}) + end, + SA = #{family => inet, + port => Port}, + case socket:bind(Sock, SA) of + {ok, _} -> + ok; + {error, BReason} -> + (catch socket:close(Sock)), + throw({error, {bind, BReason}}) + end, + case socket:listen(Sock) of + ok -> + ok; + {error, LReason} -> + (catch socket:close(Sock)), + throw({error, {listen, LReason}}) + end, + {ok, #{sock => Sock}} + end + catch + throw:{error, Reason}:_ -> + {error, Reason} + end. + + +port(#{sock := Sock}) -> + case socket:sockname(Sock) of + {ok, #{port := Port}} -> + {ok, Port}; + {error, _} = ERROR -> + ERROR + end. + + +peername(#{sock := Sock}) -> + case socket:peername(Sock) of + {ok, #{addr := Addr, port := Port}} -> + {ok, {Addr, Port}}; + {error, _} = ERROR -> + ERROR + end. + + +recv(#{sock := Sock}, Length) -> + socket:recv(Sock, Length). +recv(#{sock := Sock}, Length, Timeout) -> + socket:recv(Sock, Length, Timeout). + + +send(#{sock := Sock}, Length) -> + socket:send(Sock, Length). + + +shutdown(#{sock := Sock}, How) -> + socket:shutdown(Sock, How). + + +sockname(#{sock := Sock}) -> + case socket:sockname(Sock) of + {ok, #{addr := Addr, port := Port}} -> + {ok, {Addr, Port}}; + {error, _} = ERROR -> + ERROR + end. + + +%% ========================================================================== + +reader_init(ControllingProcess, Sock, Active) + when is_pid(ControllingProcess) andalso + (is_boolean(Active) orelse (Active =:= once)) -> + MRef = erlang:monitor(process, ControllingProcess), + reader_loop(#{ctrl_proc => ControllingProcess, + ctrl_proc_mref => MRef, + active => Active, + sock => Sock}). + + +%% Never read +reader_loop(#{active := false, + ctrl_proc := Pid} = State) -> + receive + {?MODULE, stop} -> + exit(normal); + + {?MODULE, Pid, controlling_process, NewPid} -> + MRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(MRef, [flush]), + NewMRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => NewMRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef when (Reason =:= normal) -> + exit(normal); + MRef -> + exit({controlling_process, Reason}); + _ -> + reader_loop(State) + end + end; + +%% Read *once* and then change to false +reader_loop(#{active := once, + sock := Sock, + ctrl_proc := Pid} = State) -> + case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + {ok, Data} -> + Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + reader_loop(State#{active => false}); + {error, timeout} -> + receive + {?MODULE, stop} -> + exit(normal); + + {?MODULE, Pid, controlling_process, NewPid} -> + MRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(MRef, [flush]), + MRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => MRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef when (Reason =:= normal) -> + exit(normal); + MRef -> + exit({controlling_process, Reason}); + _ -> + reader_loop(State) + end + after 0 -> + reader_loop(State) + end; + + {error, closed} -> + Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + exit(normal); + + {error, Reason} -> + Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + exit(Reason) + end; + +%% Read and forward data continuously +reader_loop(#{active := true, + sock := Sock, + ctrl_proc := Pid} = State) -> + case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + {ok, Data} -> + Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + reader_loop(State); + {error, timeout} -> + receive + {?MODULE, stop} -> + exit(normal); + + {?MODULE, Pid, controlling_process, NewPid} -> + MRef = maps:get(ctrl_proc_mref, State), + erlang:demonitor(MRef, [flush]), + MRef = erlang:monitor(process, NewPid), + Pid ! {?MODULE, self(), controlling_process}, + reader_loop(State#{ctrl_proc => NewPid, + ctrl_proc_mref => MRef}); + + {?MODULE, active, NewActive} -> + reader_loop(State#{active => NewActive}); + + {'DOWN', MRef, process, Pid, Reason} -> + case maps:get(ctrl_proc_mref, State) of + MRef when (Reason =:= normal) -> + exit(normal); + MRef -> + exit({controlling_process, Reason}); + _ -> + reader_loop(State) + end + after 0 -> + reader_loop(State) + end; + + {error, closed} -> + Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + exit(normal); + + {error, Reason} -> + Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + exit(Reason) + end. + + + + + + +%% ========================================================================== + + + -- cgit v1.2.3 From d0c3f79d22b4778f66ac1d8a2fc03e736f42e973 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Fri, 7 Dec 2018 18:40:24 +0100 Subject: [socket-nif|test] ttest improvements Added a ttest lib module for some common functions. Added a process (server handler and reader processes) stats printouts. So far only used by the server. There is still a "leak". Its a term leak. Some of the functions take a ref as argument (recv, send and accept for instance). This is stored internally, by way of a call to the enif_make_copy, in order to be used later in a select call. Its not "released" though, until the environment is released, which happens when the socket dtor callback function is called. Possible solution: We need to keep "temporary" environments (one for each of the queues), which we can clear (basically we need two, one that is currently used for new ref's and one for the old ref's). OTP-14831 --- .../emulator/test/socket_test_ttest_tcp_socket.erl | 129 +++++++++++++++------ 1 file changed, 95 insertions(+), 34 deletions(-) (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl') diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index 12d9e052d7..f314d01a63 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -24,9 +24,9 @@ accept/1, accept/2, active/2, close/1, - connect/2, + connect/2, connect/3, controlling_process/2, - listen/0, listen/1, + listen/0, listen/1, listen/2, port/1, peername/1, recv/2, recv/3, @@ -38,25 +38,41 @@ -define(READER_RECV_TIMEOUT, 1000). +-define(DATA_MSG(Sock, Method, Data), + {socket, + #{sock => Sock, reader => self(), method => Method}, + Data}). + +-define(CLOSED_MSG(Sock, Method), + {socket_closed, + #{sock => Sock, reader => self(), method => Method}}). + +-define(ERROR_MSG(Sock, Method, Reason), + {socket_error, + #{sock => Sock, reader => self(), method => Method}, + Reason}). + %% ========================================================================== -accept(#{sock := LSock}) -> +accept(#{sock := LSock, opts := #{method := Method} = Opts}) -> case socket:accept(LSock) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}}; + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. -accept(#{sock := LSock}, Timeout) -> +accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) -> case socket:accept(LSock, Timeout) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}}; + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. @@ -74,6 +90,9 @@ close(#{sock := Sock, reader := Pid}) -> %% Create a socket and connect it to a peer connect(Addr, Port) -> + connect(Addr, Port, #{method => plain}). + +connect(Addr, Port, #{method := Method} = Opts) -> try begin Sock = @@ -100,9 +119,10 @@ connect(Addr, Port) -> (catch socket:close(Sock)), throw({error, {connect, CReason}}) end, - Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}} + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}} end catch throw:ERROR:_ -> @@ -110,6 +130,16 @@ connect(Addr, Port) -> end. +maybe_start_stats_timer(#{stats_to := Pid, + stats_interval := T}, + Reader) when is_pid(Pid) -> + erlang:start_timer(T, Pid, {stats, T, "reader", Reader}); +maybe_start_stats_timer(O, _) -> + io:format("NO STATS: " + "~n ~p" + "~n", [O]), + ok. + controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> case socket:setopt(Sock, otp, controlling_process, NewPid) of ok -> @@ -125,9 +155,13 @@ controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> %% Create a listen socket listen() -> - listen(0). + listen(0, #{method => plain}). -listen(Port) when is_integer(Port) andalso (Port >= 0) -> +listen(Port) -> + listen(Port, #{method => plain}). +listen(Port, #{method := Method} = Opts) + when (is_integer(Port) andalso (Port >= 0)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> try begin Sock = case socket:open(inet, stream, tcp) of @@ -152,7 +186,7 @@ listen(Port) when is_integer(Port) andalso (Port >= 0) -> (catch socket:close(Sock)), throw({error, {listen, LReason}}) end, - {ok, #{sock => Sock}} + {ok, #{sock => Sock, opts => Opts}} end catch throw:{error, Reason}:_ -> @@ -178,14 +212,31 @@ peername(#{sock := Sock}) -> end. -recv(#{sock := Sock}, Length) -> - socket:recv(Sock, Length). -recv(#{sock := Sock}, Length, Timeout) -> - socket:recv(Sock, Length, Timeout). +recv(#{sock := Sock, method := plain}, Length) -> + socket:recv(Sock, Length); +recv(#{sock := Sock, method := msg}, Length) -> + case socket:recvmsg(Sock, Length, 0, [], infinity) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. + +recv(#{sock := Sock, method := plain}, Length, Timeout) -> + socket:recv(Sock, Length, Timeout); +recv(#{sock := Sock, method := msg}, Length, Timeout) -> + case socket:recvmsg(Sock, Length, 0, [], Timeout) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. -send(#{sock := Sock}, Length) -> - socket:send(Sock, Length). +send(#{sock := Sock, method := plain}, Bin) -> + socket:send(Sock, Bin); +send(#{sock := Sock, method := msg}, Bin) -> + socket:sendmsg(Sock, #{iov => [Bin]}). shutdown(#{sock := Sock}, How) -> @@ -203,14 +254,16 @@ sockname(#{sock := Sock}) -> %% ========================================================================== -reader_init(ControllingProcess, Sock, Active) +reader_init(ControllingProcess, Sock, Active, Method) when is_pid(ControllingProcess) andalso - (is_boolean(Active) orelse (Active =:= once)) -> + (is_boolean(Active) orelse (Active =:= once)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> MRef = erlang:monitor(process, ControllingProcess), reader_loop(#{ctrl_proc => ControllingProcess, ctrl_proc_mref => MRef, active => Active, - sock => Sock}). + sock => Sock, + method => Method}). %% Never read @@ -245,10 +298,11 @@ reader_loop(#{active := false, %% Read *once* and then change to false reader_loop(#{active := once, sock := Sock, + method := Method, ctrl_proc := Pid} = State) -> - case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + case do_recv(Method, Sock) of {ok, Data} -> - Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + Pid ! ?DATA_MSG(Sock, Method, Data), reader_loop(State#{active => false}); {error, timeout} -> receive @@ -280,21 +334,22 @@ reader_loop(#{active := once, end; {error, closed} -> - Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + Pid ! ?CLOSED_MSG(Sock, Method), exit(normal); {error, Reason} -> - Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + Pid ! ?ERROR_MSG(Sock, Method, Reason), exit(Reason) end; %% Read and forward data continuously reader_loop(#{active := true, sock := Sock, + method := Method, ctrl_proc := Pid} = State) -> - case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + case do_recv(Method, Sock) of {ok, Data} -> - Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + Pid ! ?DATA_MSG(Sock, Method, Data), reader_loop(State); {error, timeout} -> receive @@ -326,20 +381,26 @@ reader_loop(#{active := true, end; {error, closed} -> - Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + Pid ! ?CLOSED_MSG(Sock, Method), exit(normal); {error, Reason} -> - Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + Pid ! ?ERROR_MSG(Sock, Method, Reason), exit(Reason) end. - +do_recv(plain, Sock) -> + socket:recv(Sock, 0, ?READER_RECV_TIMEOUT); +do_recv(msg, Sock) -> + case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. %% ========================================================================== - - -- cgit v1.2.3 From 6781913e975e93a4a29d14e14794aae4526de9f7 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Fri, 14 Dec 2018 19:14:43 +0100 Subject: [socket-nif|test] Add test case based on the ttest modules Added a (first) test case based on the ttest modules. OTP-14831 --- erts/emulator/test/socket_test_ttest_tcp_socket.erl | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl') diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index f314d01a63..9af25c5f50 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -134,10 +134,7 @@ maybe_start_stats_timer(#{stats_to := Pid, stats_interval := T}, Reader) when is_pid(Pid) -> erlang:start_timer(T, Pid, {stats, T, "reader", Reader}); -maybe_start_stats_timer(O, _) -> - io:format("NO STATS: " - "~n ~p" - "~n", [O]), +maybe_start_stats_timer(_O, _) -> ok. controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> -- cgit v1.2.3 From 2dceec3057ff298cf6a7ff6af5a5a1b5f8b96e61 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Tue, 18 Dec 2018 15:21:43 +0100 Subject: [socket-nif|test] Add more ttest (gen_tcp) test cases Added a number of ttest test cases with transport gen_tcp. Server with transport = gen_tcp, active = false and client using transport = gen_tcp (and active = false, once and true). Also "fixed" the gen_tcp socket buffer size (default size was way to small for the "large" messages). OTP-14831 --- erts/emulator/test/socket_test_ttest_tcp_socket.erl | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl') diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index 9af25c5f50..0ae2412e4c 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -53,6 +53,17 @@ Reason}). +%% ========================================================================== + +%% This does not really work. Its just a placeholder for the time beeing... + +%% getopt(Sock, Opt) when is_atom(Opt) -> +%% socket:getopt(Sock, socket, Opt). + +%% setopt(Sock, Opt, Value) when is_atom(Opt) -> +%% socket:setopts(Sock, socket, Opt, Value). + + %% ========================================================================== accept(#{sock := LSock, opts := #{method := Method} = Opts}) -> -- cgit v1.2.3