aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/test/socket_test_ttest_tcp_socket.erl
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/test/socket_test_ttest_tcp_socket.erl')
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_socket.erl129
1 files changed, 95 insertions, 34 deletions
diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
index 12d9e052d7..f314d01a63 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
@@ -24,9 +24,9 @@
accept/1, accept/2,
active/2,
close/1,
- connect/2,
+ connect/2, connect/3,
controlling_process/2,
- listen/0, listen/1,
+ listen/0, listen/1, listen/2,
port/1,
peername/1,
recv/2, recv/3,
@@ -38,25 +38,41 @@
-define(READER_RECV_TIMEOUT, 1000).
+-define(DATA_MSG(Sock, Method, Data),
+ {socket,
+ #{sock => Sock, reader => self(), method => Method},
+ Data}).
+
+-define(CLOSED_MSG(Sock, Method),
+ {socket_closed,
+ #{sock => Sock, reader => self(), method => Method}}).
+
+-define(ERROR_MSG(Sock, Method, Reason),
+ {socket_error,
+ #{sock => Sock, reader => self(), method => Method},
+ Reason}).
+
%% ==========================================================================
-accept(#{sock := LSock}) ->
+accept(#{sock := LSock, opts := #{method := Method} = Opts}) ->
case socket:accept(LSock) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}};
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
-accept(#{sock := LSock}, Timeout) ->
+accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) ->
case socket:accept(LSock, Timeout) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}};
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
@@ -74,6 +90,9 @@ close(#{sock := Sock, reader := Pid}) ->
%% Create a socket and connect it to a peer
connect(Addr, Port) ->
+ connect(Addr, Port, #{method => plain}).
+
+connect(Addr, Port, #{method := Method} = Opts) ->
try
begin
Sock =
@@ -100,9 +119,10 @@ connect(Addr, Port) ->
(catch socket:close(Sock)),
throw({error, {connect, CReason}})
end,
- Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}}
+ Self = self(),
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}}
end
catch
throw:ERROR:_ ->
@@ -110,6 +130,16 @@ connect(Addr, Port) ->
end.
+maybe_start_stats_timer(#{stats_to := Pid,
+ stats_interval := T},
+ Reader) when is_pid(Pid) ->
+ erlang:start_timer(T, Pid, {stats, T, "reader", Reader});
+maybe_start_stats_timer(O, _) ->
+ io:format("NO STATS: "
+ "~n ~p"
+ "~n", [O]),
+ ok.
+
controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
case socket:setopt(Sock, otp, controlling_process, NewPid) of
ok ->
@@ -125,9 +155,13 @@ controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
%% Create a listen socket
listen() ->
- listen(0).
+ listen(0, #{method => plain}).
-listen(Port) when is_integer(Port) andalso (Port >= 0) ->
+listen(Port) ->
+ listen(Port, #{method => plain}).
+listen(Port, #{method := Method} = Opts)
+ when (is_integer(Port) andalso (Port >= 0)) andalso
+ ((Method =:= plain) orelse (Method =:= msg)) ->
try
begin
Sock = case socket:open(inet, stream, tcp) of
@@ -152,7 +186,7 @@ listen(Port) when is_integer(Port) andalso (Port >= 0) ->
(catch socket:close(Sock)),
throw({error, {listen, LReason}})
end,
- {ok, #{sock => Sock}}
+ {ok, #{sock => Sock, opts => Opts}}
end
catch
throw:{error, Reason}:_ ->
@@ -178,14 +212,31 @@ peername(#{sock := Sock}) ->
end.
-recv(#{sock := Sock}, Length) ->
- socket:recv(Sock, Length).
-recv(#{sock := Sock}, Length, Timeout) ->
- socket:recv(Sock, Length, Timeout).
+recv(#{sock := Sock, method := plain}, Length) ->
+ socket:recv(Sock, Length);
+recv(#{sock := Sock, method := msg}, Length) ->
+ case socket:recvmsg(Sock, Length, 0, [], infinity) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+recv(#{sock := Sock, method := plain}, Length, Timeout) ->
+ socket:recv(Sock, Length, Timeout);
+recv(#{sock := Sock, method := msg}, Length, Timeout) ->
+ case socket:recvmsg(Sock, Length, 0, [], Timeout) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
-send(#{sock := Sock}, Length) ->
- socket:send(Sock, Length).
+send(#{sock := Sock, method := plain}, Bin) ->
+ socket:send(Sock, Bin);
+send(#{sock := Sock, method := msg}, Bin) ->
+ socket:sendmsg(Sock, #{iov => [Bin]}).
shutdown(#{sock := Sock}, How) ->
@@ -203,14 +254,16 @@ sockname(#{sock := Sock}) ->
%% ==========================================================================
-reader_init(ControllingProcess, Sock, Active)
+reader_init(ControllingProcess, Sock, Active, Method)
when is_pid(ControllingProcess) andalso
- (is_boolean(Active) orelse (Active =:= once)) ->
+ (is_boolean(Active) orelse (Active =:= once)) andalso
+ ((Method =:= plain) orelse (Method =:= msg)) ->
MRef = erlang:monitor(process, ControllingProcess),
reader_loop(#{ctrl_proc => ControllingProcess,
ctrl_proc_mref => MRef,
active => Active,
- sock => Sock}).
+ sock => Sock,
+ method => Method}).
%% Never read
@@ -245,10 +298,11 @@ reader_loop(#{active := false,
%% Read *once* and then change to false
reader_loop(#{active := once,
sock := Sock,
+ method := Method,
ctrl_proc := Pid} = State) ->
- case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ case do_recv(Method, Sock) of
{ok, Data} ->
- Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ Pid ! ?DATA_MSG(Sock, Method, Data),
reader_loop(State#{active => false});
{error, timeout} ->
receive
@@ -280,21 +334,22 @@ reader_loop(#{active := once,
end;
{error, closed} ->
- Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ Pid ! ?CLOSED_MSG(Sock, Method),
exit(normal);
{error, Reason} ->
- Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
exit(Reason)
end;
%% Read and forward data continuously
reader_loop(#{active := true,
sock := Sock,
+ method := Method,
ctrl_proc := Pid} = State) ->
- case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ case do_recv(Method, Sock) of
{ok, Data} ->
- Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ Pid ! ?DATA_MSG(Sock, Method, Data),
reader_loop(State);
{error, timeout} ->
receive
@@ -326,20 +381,26 @@ reader_loop(#{active := true,
end;
{error, closed} ->
- Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ Pid ! ?CLOSED_MSG(Sock, Method),
exit(normal);
{error, Reason} ->
- Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
exit(Reason)
end.
-
+do_recv(plain, Sock) ->
+ socket:recv(Sock, 0, ?READER_RECV_TIMEOUT);
+do_recv(msg, Sock) ->
+ case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
%% ==========================================================================
-
-