aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/test/socket_test_ttest_tcp_socket.erl
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-12-07 18:40:24 +0100
committerMicael Karlberg <[email protected]>2018-12-07 18:43:00 +0100
commitd0c3f79d22b4778f66ac1d8a2fc03e736f42e973 (patch)
treefee37591fb85039583e58858ade959d0de360142 /erts/emulator/test/socket_test_ttest_tcp_socket.erl
parentc5378517cccf29d1708c71a0949664605743b478 (diff)
downloadotp-d0c3f79d22b4778f66ac1d8a2fc03e736f42e973.tar.gz
otp-d0c3f79d22b4778f66ac1d8a2fc03e736f42e973.tar.bz2
otp-d0c3f79d22b4778f66ac1d8a2fc03e736f42e973.zip
[socket-nif|test] ttest improvements
Added a ttest lib module for some common functions. Added a process (server handler and reader processes) stats printouts. So far only used by the server. There is still a "leak". Its a term leak. Some of the functions take a ref as argument (recv, send and accept for instance). This is stored internally, by way of a call to the enif_make_copy, in order to be used later in a select call. Its not "released" though, until the environment is released, which happens when the socket dtor callback function is called. Possible solution: We need to keep "temporary" environments (one for each of the queues), which we can clear (basically we need two, one that is currently used for new ref's and one for the old ref's). OTP-14831
Diffstat (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl')
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_socket.erl129
1 files changed, 95 insertions, 34 deletions
diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
index 12d9e052d7..f314d01a63 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
@@ -24,9 +24,9 @@
accept/1, accept/2,
active/2,
close/1,
- connect/2,
+ connect/2, connect/3,
controlling_process/2,
- listen/0, listen/1,
+ listen/0, listen/1, listen/2,
port/1,
peername/1,
recv/2, recv/3,
@@ -38,25 +38,41 @@
-define(READER_RECV_TIMEOUT, 1000).
+-define(DATA_MSG(Sock, Method, Data),
+ {socket,
+ #{sock => Sock, reader => self(), method => Method},
+ Data}).
+
+-define(CLOSED_MSG(Sock, Method),
+ {socket_closed,
+ #{sock => Sock, reader => self(), method => Method}}).
+
+-define(ERROR_MSG(Sock, Method, Reason),
+ {socket_error,
+ #{sock => Sock, reader => self(), method => Method},
+ Reason}).
+
%% ==========================================================================
-accept(#{sock := LSock}) ->
+accept(#{sock := LSock, opts := #{method := Method} = Opts}) ->
case socket:accept(LSock) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}};
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
-accept(#{sock := LSock}, Timeout) ->
+accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) ->
case socket:accept(LSock, Timeout) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}};
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
@@ -74,6 +90,9 @@ close(#{sock := Sock, reader := Pid}) ->
%% Create a socket and connect it to a peer
connect(Addr, Port) ->
+ connect(Addr, Port, #{method => plain}).
+
+connect(Addr, Port, #{method := Method} = Opts) ->
try
begin
Sock =
@@ -100,9 +119,10 @@ connect(Addr, Port) ->
(catch socket:close(Sock)),
throw({error, {connect, CReason}})
end,
- Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}}
+ Self = self(),
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}}
end
catch
throw:ERROR:_ ->
@@ -110,6 +130,16 @@ connect(Addr, Port) ->
end.
+maybe_start_stats_timer(#{stats_to := Pid,
+ stats_interval := T},
+ Reader) when is_pid(Pid) ->
+ erlang:start_timer(T, Pid, {stats, T, "reader", Reader});
+maybe_start_stats_timer(O, _) ->
+ io:format("NO STATS: "
+ "~n ~p"
+ "~n", [O]),
+ ok.
+
controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
case socket:setopt(Sock, otp, controlling_process, NewPid) of
ok ->
@@ -125,9 +155,13 @@ controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
%% Create a listen socket
listen() ->
- listen(0).
+ listen(0, #{method => plain}).
-listen(Port) when is_integer(Port) andalso (Port >= 0) ->
+listen(Port) ->
+ listen(Port, #{method => plain}).
+listen(Port, #{method := Method} = Opts)
+ when (is_integer(Port) andalso (Port >= 0)) andalso
+ ((Method =:= plain) orelse (Method =:= msg)) ->
try
begin
Sock = case socket:open(inet, stream, tcp) of
@@ -152,7 +186,7 @@ listen(Port) when is_integer(Port) andalso (Port >= 0) ->
(catch socket:close(Sock)),
throw({error, {listen, LReason}})
end,
- {ok, #{sock => Sock}}
+ {ok, #{sock => Sock, opts => Opts}}
end
catch
throw:{error, Reason}:_ ->
@@ -178,14 +212,31 @@ peername(#{sock := Sock}) ->
end.
-recv(#{sock := Sock}, Length) ->
- socket:recv(Sock, Length).
-recv(#{sock := Sock}, Length, Timeout) ->
- socket:recv(Sock, Length, Timeout).
+recv(#{sock := Sock, method := plain}, Length) ->
+ socket:recv(Sock, Length);
+recv(#{sock := Sock, method := msg}, Length) ->
+ case socket:recvmsg(Sock, Length, 0, [], infinity) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+recv(#{sock := Sock, method := plain}, Length, Timeout) ->
+ socket:recv(Sock, Length, Timeout);
+recv(#{sock := Sock, method := msg}, Length, Timeout) ->
+ case socket:recvmsg(Sock, Length, 0, [], Timeout) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
-send(#{sock := Sock}, Length) ->
- socket:send(Sock, Length).
+send(#{sock := Sock, method := plain}, Bin) ->
+ socket:send(Sock, Bin);
+send(#{sock := Sock, method := msg}, Bin) ->
+ socket:sendmsg(Sock, #{iov => [Bin]}).
shutdown(#{sock := Sock}, How) ->
@@ -203,14 +254,16 @@ sockname(#{sock := Sock}) ->
%% ==========================================================================
-reader_init(ControllingProcess, Sock, Active)
+reader_init(ControllingProcess, Sock, Active, Method)
when is_pid(ControllingProcess) andalso
- (is_boolean(Active) orelse (Active =:= once)) ->
+ (is_boolean(Active) orelse (Active =:= once)) andalso
+ ((Method =:= plain) orelse (Method =:= msg)) ->
MRef = erlang:monitor(process, ControllingProcess),
reader_loop(#{ctrl_proc => ControllingProcess,
ctrl_proc_mref => MRef,
active => Active,
- sock => Sock}).
+ sock => Sock,
+ method => Method}).
%% Never read
@@ -245,10 +298,11 @@ reader_loop(#{active := false,
%% Read *once* and then change to false
reader_loop(#{active := once,
sock := Sock,
+ method := Method,
ctrl_proc := Pid} = State) ->
- case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ case do_recv(Method, Sock) of
{ok, Data} ->
- Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ Pid ! ?DATA_MSG(Sock, Method, Data),
reader_loop(State#{active => false});
{error, timeout} ->
receive
@@ -280,21 +334,22 @@ reader_loop(#{active := once,
end;
{error, closed} ->
- Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ Pid ! ?CLOSED_MSG(Sock, Method),
exit(normal);
{error, Reason} ->
- Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
exit(Reason)
end;
%% Read and forward data continuously
reader_loop(#{active := true,
sock := Sock,
+ method := Method,
ctrl_proc := Pid} = State) ->
- case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ case do_recv(Method, Sock) of
{ok, Data} ->
- Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ Pid ! ?DATA_MSG(Sock, Method, Data),
reader_loop(State);
{error, timeout} ->
receive
@@ -326,20 +381,26 @@ reader_loop(#{active := true,
end;
{error, closed} ->
- Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ Pid ! ?CLOSED_MSG(Sock, Method),
exit(normal);
{error, Reason} ->
- Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
exit(Reason)
end.
-
+do_recv(plain, Sock) ->
+ socket:recv(Sock, 0, ?READER_RECV_TIMEOUT);
+do_recv(msg, Sock) ->
+ case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
%% ==========================================================================
-
-