aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-12-07 18:40:24 +0100
committerMicael Karlberg <[email protected]>2018-12-07 18:43:00 +0100
commitd0c3f79d22b4778f66ac1d8a2fc03e736f42e973 (patch)
treefee37591fb85039583e58858ade959d0de360142
parentc5378517cccf29d1708c71a0949664605743b478 (diff)
downloadotp-d0c3f79d22b4778f66ac1d8a2fc03e736f42e973.tar.gz
otp-d0c3f79d22b4778f66ac1d8a2fc03e736f42e973.tar.bz2
otp-d0c3f79d22b4778f66ac1d8a2fc03e736f42e973.zip
[socket-nif|test] ttest improvements
Added a ttest lib module for some common functions. Added a process (server handler and reader processes) stats printouts. So far only used by the server. There is still a "leak". Its a term leak. Some of the functions take a ref as argument (recv, send and accept for instance). This is stored internally, by way of a call to the enif_make_copy, in order to be used later in a select call. Its not "released" though, until the environment is released, which happens when the socket dtor callback function is called. Possible solution: We need to keep "temporary" environments (one for each of the queues), which we can clear (basically we need two, one that is currently used for new ref's and one for the old ref's). OTP-14831
-rw-r--r--erts/emulator/test/Makefile1
-rw-r--r--erts/emulator/test/socket_test_ttest_lib.erl125
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client.erl261
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client_socket.erl15
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server.erl391
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server_socket.erl9
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_socket.erl129
7 files changed, 508 insertions, 423 deletions
diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile
index a910588381..09bfe6f104 100644
--- a/erts/emulator/test/Makefile
+++ b/erts/emulator/test/Makefile
@@ -34,6 +34,7 @@ SOCKET_MODULES = \
socket_client \
socket_test_lib \
socket_test_evaluator \
+ socket_test_ttest_lib \
socket_test_ttest_tcp_gen \
socket_test_ttest_tcp_socket \
socket_test_ttest_tcp_client \
diff --git a/erts/emulator/test/socket_test_ttest_lib.erl b/erts/emulator/test/socket_test_ttest_lib.erl
new file mode 100644
index 0000000000..71679bc6d6
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_lib.erl
@@ -0,0 +1,125 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(socket_test_ttest_lib).
+
+-compile({no_auto_import, [error/2]}).
+
+-export([
+ t/0, tdiff/2,
+ formated_timestamp/0, format_timestamp/1,
+ format_time/1,
+
+ formated_process_stats/1, formated_process_stats/2,
+
+ format/2,
+ error/1, error/2,
+ info/1, info/2
+ ]).
+
+%% ==========================================================================
+
+t() ->
+ os:timestamp().
+
+tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+ T1 = A1*1000000000+B1*1000+(C1 div 1000),
+ T2 = A2*1000000000+B2*1000+(C2 div 1000),
+ T2 - T1.
+
+formated_timestamp() ->
+ format_timestamp(os:timestamp()).
+
+format_timestamp({_N1, _N2, N3} = TS) ->
+ {_Date, Time} = calendar:now_to_local_time(TS),
+ {Hour,Min,Sec} = Time,
+ FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+ [Hour, Min, Sec, round(N3/1000)]),
+ lists:flatten(FormatTS).
+
+%% Time is always in number os ms (milli seconds)
+%% At some point, we should convert this to a more readable format...
+format_time(T) ->
+ format("~p", [T]).
+
+
+formated_process_stats(Pid) ->
+ formated_process_stats("", Pid).
+
+formated_process_stats(Prefix, Pid) when is_list(Prefix) andalso is_pid(Pid) ->
+ try
+ begin
+ TotHeapSz = pi(Pid, total_heap_size),
+ HeapSz = pi(Pid, heap_size),
+ StackSz = pi(Pid, stack_size),
+ Reds = pi(Pid, reductions),
+ GCInfo = pi(Pid, garbage_collection),
+ MinBinVHeapSz = proplists:get_value(min_bin_vheap_size, GCInfo),
+ MinHeapSz = proplists:get_value(min_heap_size, GCInfo),
+ MinGCS = proplists:get_value(minor_gcs, GCInfo),
+ format("~n ~sTotal Heap Size: ~p"
+ "~n ~sHeap Size: ~p"
+ "~n ~sStack Size: ~p"
+ "~n ~sReductions: ~p"
+ "~n ~s[GC] Min Bin VHeap Size: ~p"
+ "~n ~s[GC] Min Heap Size: ~p"
+ "~n ~s[GC] Minor GCS: ~p",
+ [Prefix, TotHeapSz,
+ Prefix, HeapSz,
+ Prefix, StackSz,
+ Prefix, Reds,
+ Prefix, MinBinVHeapSz,
+ Prefix, MinHeapSz,
+ Prefix, MinGCS])
+ end
+ catch
+ _:_:_ ->
+ ""
+ end.
+
+
+pi(Pid, Item) ->
+ {Item, Info} = process_info(Pid, Item),
+ Info.
+
+
+
+%% ==========================================================================
+
+format(F, A) ->
+ lists:flatten(io_lib:format(F, A)).
+
+error(F) ->
+ error(F, []).
+
+error(F, A) ->
+ print(get(sname), "<ERROR> " ++ F, A).
+
+info(F) ->
+ info(F, []).
+
+info(F, A) ->
+ print(get(sname), "<INFO> " ++ F, A).
+
+print(undefined, F, A) ->
+ print("- ", F, A);
+print(Prefix, F, A) ->
+ io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
+
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client.erl b/erts/emulator/test/socket_test_ttest_tcp_client.erl
index 1bd1bc54e9..9c43c41841 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_client.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_client.erl
@@ -54,6 +54,14 @@
-define(MAX_OUTSTANDING_DEFAULT_2, 10).
-define(MAX_OUTSTANDING_DEFAULT_3, 3).
+-define(LIB, socket_test_ttest_lib).
+-define(I(F), ?LIB:info(F)).
+-define(I(F,A), ?LIB:info(F, A)).
+-define(E(F,A), ?LIB:error(F, A)).
+-define(F(F,A), ?LIB:format(F, A)).
+-define(FORMAT_TIME(T), ?LIB:format_time(T)).
+-define(T(), ?LIB:t()).
+-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)).
-type active() :: once | boolean().
-type msg_id() :: 1..3.
@@ -63,42 +71,28 @@
%% ==========================================================================
--spec start_monitor(Mod, Active, Addr, Port) -> term() when
- Mod :: atom(),
- Active :: active(),
- Addr :: inet:ip_address(),
- Port :: inet:port_number().
+start_monitor(Transport, Active, Addr, Port) ->
+ start_monitor(Transport, Active, Addr, Port, ?MSG_ID_DEFAULT).
%% RunTime is in number of ms.
-start_monitor(Mod, Active, Addr, Port) ->
- start_monitor(Mod, Active, Addr, Port, ?MSG_ID_DEFAULT).
-
--spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when
- Mod :: atom(),
- Active :: active(),
- Addr :: inet:ip_address(),
- Port :: inet:port_number(),
- MsgID :: msg_id().
-
-%% RunTime is in number of ms.
-start_monitor(Mod, Active, Addr, Port, 1 = MsgID) ->
- start_monitor(Mod, Active, Addr, Port, MsgID,
+start_monitor(Transport, Active, Addr, Port, 1 = MsgID) ->
+ start_monitor(Transport, Active, Addr, Port, MsgID,
?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT);
-start_monitor(Mod, Active, Addr, Port, 2 = MsgID) ->
- start_monitor(Mod, Active, Addr, Port, MsgID,
+start_monitor(Transport, Active, Addr, Port, 2 = MsgID) ->
+ start_monitor(Transport, Active, Addr, Port, MsgID,
?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT);
-start_monitor(Mod, Active, Addr, Port, 3 = MsgID) ->
- start_monitor(Mod, Active, Addr, Port, MsgID,
+start_monitor(Transport, Active, Addr, Port, 3 = MsgID) ->
+ start_monitor(Transport, Active, Addr, Port, MsgID,
?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT).
--spec start_monitor(Mod,
+-spec start_monitor(Transport,
Active,
Addr,
Port,
MsgID,
MaxOutstanding,
RunTime) -> term() when
- Mod :: atom(),
+ Transport :: atom() | tuple(),
Active :: active(),
Addr :: inet:ip_address(),
Port :: inet:port_number(),
@@ -107,9 +101,9 @@ start_monitor(Mod, Active, Addr, Port, 3 = MsgID) ->
RunTime :: runtime().
%% RunTime is in number of ms.
-start_monitor(Mod, Active, Addr, Port,
+start_monitor(Transport, Active, Addr, Port,
MsgID, MaxOutstanding, RunTime)
- when is_atom(Mod) andalso
+ when (is_atom(Transport) orelse is_tuple(Transport)) andalso
(is_boolean(Active) orelse (Active =:= once)) andalso
is_tuple(Addr) andalso
(is_integer(Port) andalso (Port > 0)) andalso
@@ -119,7 +113,7 @@ start_monitor(Mod, Active, Addr, Port,
Self = self(),
ClientInit = fun() -> put(sname, "client"),
init(Self,
- Mod, Active, Addr, Port,
+ Transport, Active, Addr, Port,
MsgID, MaxOutstanding, RunTime)
end,
{Pid, MRef} = spawn_monitor(ClientInit),
@@ -147,28 +141,29 @@ stop(Pid) when is_pid(Pid) ->
%% ==========================================================================
-init(Parent, Mod, Active, Addr, Port,
+init(Parent, Transport, Active, Addr, Port,
MsgID, MaxOutstanding, RunTime) ->
- i("init -> entry with"
- "~n Parent: ~p"
- "~n Mod: ~p"
- "~n Active: ~p"
- "~n Addr: ~s"
- "~n Port: ~p"
- "~n Msg ID: ~p (=> 16 + ~w bytes)"
- "~n Max Outstanding: ~p"
- "~n (Suggested) Run Time: ~p ms",
- [Parent,
- Mod, Active, inet:ntoa(Addr), Port,
- MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]),
- case Mod:connect(Addr, Port) of
+ ?I("init with"
+ "~n Parent: ~p"
+ "~n Transport: ~p"
+ "~n Active: ~p"
+ "~n Addr: ~s"
+ "~n Port: ~p"
+ "~n Msg ID: ~p (=> 16 + ~w bytes)"
+ "~n Max Outstanding: ~p"
+ "~n (Suggested) Run Time: ~p ms",
+ [Parent,
+ Transport, Active, inet:ntoa(Addr), Port,
+ MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]),
+ {Mod, Connect} = process_transport(Transport),
+ case Connect(Addr, Port) of
{ok, Sock} ->
- i("init -> connected"),
+ ?I("connected"),
Parent ! {?MODULE, self(), ok},
initial_activation(Mod, Sock, Active),
Results = loop(#{slogan => run,
runtime => RunTime,
- start => t(),
+ start => ?T(),
parent => Parent,
mod => Mod,
sock => Sock,
@@ -187,10 +182,16 @@ init(Parent, Mod, Active, Addr, Port,
(catch Mod:close(Sock)),
exit(normal);
{error, Reason} ->
- i("init -> connect failed: ~p", [Reason]),
+ ?E("connect failed: ~p", [Reason]),
exit({connect, Reason})
end.
+process_transport(Mod) when is_atom(Mod) ->
+ {Mod, fun(A, P) -> Mod:connect(A, P) end};
+process_transport({Mod, Opts}) ->
+ {Mod, fun(A, P) -> Mod:connect(A, P, Opts) end}.
+
+
which_msg_data(1) -> ?MSG_DATA1;
which_msg_data(2) -> ?MSG_DATA2;
which_msg_data(3) -> ?MSG_DATA3.
@@ -200,21 +201,21 @@ present_results(#{status := ok,
runtime := RunTime,
bcnt := ByteCnt,
cnt := NumIterations}) ->
- i("Results: "
- "~n Run Time: ~s"
- "~n ByteCnt: ~s"
- "~n NumIterations: ~s",
- [format_time(RunTime),
- if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) ->
- f("~w, ~w", [ByteCnt, RunTime]);
+ ?I("Results: "
+ "~n Run Time: ~s"
+ "~n ByteCnt: ~s"
+ "~n NumIterations: ~s",
+ [?FORMAT_TIME(RunTime),
+ if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) ->
+ ?F("~w, ~w", [ByteCnt, RunTime]);
true ->
- f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime])
+ ?F("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime])
end,
if (RunTime =:= 0) ->
"-";
true ->
- f("~p => ~p iterations / ms",
- [NumIterations, NumIterations div RunTime])
+ ?F("~p => ~p iterations / ms",
+ [NumIterations, NumIterations div RunTime])
end]),
ok;
present_results(#{status := Failure,
@@ -225,21 +226,21 @@ present_results(#{status := Failure,
rcnt := RCnt,
bcnt := BCnt,
num := Num}) ->
- i("Time Test failed: "
- "~n ~p"
- "~n"
- "~nwhen"
- "~n"
- "~n Run Time: ~s"
- "~n Send ID: ~p"
- "~n Recv ID: ~p"
- "~n Send Count: ~p"
- "~n Recv Count: ~p"
- "~n Byte Count: ~p"
- "~n Num Iterations: ~p",
- [Failure,
- format_time(RunTime),
- SID, RID, SCnt, RCnt, BCnt, Num]).
+ ?I("Time Test failed: "
+ "~n ~p"
+ "~n"
+ "~nwhen"
+ "~n"
+ "~n Run Time: ~s"
+ "~n Send ID: ~p"
+ "~n Recv ID: ~p"
+ "~n Send Count: ~p"
+ "~n Recv Count: ~p"
+ "~n Byte Count: ~p"
+ "~n Num Iterations: ~p",
+ [Failure,
+ ?FORMAT_TIME(RunTime),
+ SID, RID, SCnt, RCnt, BCnt, Num]).
@@ -255,17 +256,14 @@ do_loop(State) ->
do_loop( handle_message( msg_exchange(State) ) ).
msg_exchange(#{rcnt := Num, num := Num} = State) ->
- %% i("we are done"),
finish(ok, State);
msg_exchange(#{scnt := Num, num := Num} = State) ->
%% We are done sending more requests - now we will just await
%% the replies for the (still) outstanding replies.
- %% i("we have sent all requests - (only) wait for replies"),
msg_exchange( recv_reply(State) );
msg_exchange(#{outstanding := Outstanding,
max_outstanding := MaxOutstanding} = State)
when (Outstanding < MaxOutstanding) ->
- %% i("send the (initial) requests (~w, ~w)", [Outstanding, MaxOutstanding]),
msg_exchange( send_request(State) );
msg_exchange(State) ->
send_request( recv_reply(State) ).
@@ -273,9 +271,9 @@ msg_exchange(State) ->
finish(ok,
#{start := Start, bcnt := BCnt, num := Num}) ->
- Stop = t(),
+ Stop = ?T(),
throw(#{status => ok,
- runtime => tdiff(Start, Stop),
+ runtime => ?TDIFF(Start, Stop),
bcnt => BCnt,
cnt => Num});
finish(Reason,
@@ -283,9 +281,9 @@ finish(Reason,
sid := SID, rid := RID,
scnt := SCnt, rcnt := RCnt, bcnt := BCnt,
num := Num}) ->
- Stop = t(),
+ Stop = ?T(),
throw(#{status => Reason,
- runtime => tdiff(Start, Stop),
+ runtime => ?TDIFF(Start, Stop),
sid => SID,
rid => RID,
scnt => SCnt,
@@ -301,11 +299,6 @@ send_request(#{mod := Mod,
max_outstanding := MaxOutstanding,
msg_data := Data} = State)
when (MaxOutstanding > Outstanding) ->
- %% i("send request -> entry when"
- %% "~n ID: ~p"
- %% "~n Cnt: ~p"
- %% "~n Outstanding: ~p"
- %% "~n MaxOutstanding: ~p", [ID, Cnt, Outstanding, MaxOutstanding]),
SZ = size(Data),
Req = <<?TTEST_TAG:32,
?TTEST_TYPE_REQUEST:32,
@@ -314,12 +307,11 @@ send_request(#{mod := Mod,
Data/binary>>,
case Mod:send(Sock, Req) of
ok ->
- %% i("~w bytes sent", [size(Req)]),
State#{sid => next_id(ID),
scnt => Cnt + 1,
outstanding => Outstanding + 1};
{error, Reason} ->
- e("Failed sending request: ~p", [Reason]),
+ ?E("Failed sending request: ~p", [Reason]),
exit({send, Reason})
end;
send_request(State) ->
@@ -334,11 +326,6 @@ recv_reply(#{mod := Mod,
bcnt := BCnt,
rcnt := Cnt,
outstanding := Outstanding} = State) ->
- %% i("recv-reply(false) -> entry with"
- %% "~n (R)ID: ~p"
- %% "~n (R)Cnt: ~p"
- %% "~n BCnt: ~p"
- %% "~n Outstanding: ~p", [ID, Cnt, BCnt, Outstanding]),
case recv_reply_message1(Mod, Sock, ID) of
{ok, MsgSz} ->
State#{rid => next_id(ID),
@@ -347,7 +334,7 @@ recv_reply(#{mod := Mod,
outstanding => Outstanding - 1};
{error, timeout} ->
- i("recv_reply(false) -> error: timeout"),
+ ?I("receive timeout"),
State;
{error, Reason} ->
@@ -362,11 +349,6 @@ recv_reply(#{mod := Mod,
rcnt := RCnt,
outstanding := Outstanding,
acc := Acc} = State) ->
- %% i("recv-reply(~w) -> entry with"
- %% "~n (R)ID: ~p"
- %% "~n RCnt: ~p"
- %% "~n BCnt: ~p"
- %% "~n Outstanding: ~p", [Active, ID, RCnt, BCnt, Outstanding]),
case recv_reply_message2(Mod, Sock, ID, Acc) of
{ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) ->
maybe_activate(Mod, Sock, Active),
@@ -380,12 +362,12 @@ recv_reply(#{mod := Mod,
State;
{error, stop} ->
- i("recv_reply(~w) -> stop", [Active]),
+ ?I("receive [~w] -> stop", [Active]),
%% This will have the effect that no more requests are sent...
- State#{num => SCnt, stop_started => t()};
+ State#{num => SCnt, stop_started => ?T()};
{error, timeout} ->
- i("recv_reply(~w) -> error: timeout", [Active]),
+ ?I("receive[~w] -> timeout", [Active]),
State;
{error, Reason} ->
@@ -395,23 +377,19 @@ recv_reply(#{mod := Mod,
%% This function reads exactly one (reply) message. No more no less.
recv_reply_message1(Mod, Sock, ID) ->
- %% i("recv_reply_message1 -> entry with"
- %% "~n ID: ~w", [ID]),
case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
{ok, <<?TTEST_TAG:32,
?TTEST_TYPE_REPLY:32,
ID:32,
SZ:32>> = Hdr} ->
%% Receive the ping-pong reply boby
- %% i("recv_reply_message1 -> try read body"
- %% "~n ID: ~w", [ID]),
case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
{ok, Data} when (size(Data) =:= SZ) ->
{ok, size(Hdr) + size(Data)};
{error, Reason2} ->
- i("recv_reply_message1 -> body error: "
- "~n ~p: ~p", [Reason2]),
- {error, {recv_hdr, Reason2}}
+ ?E("Failed reading body: "
+ "~n ~p: ~p", [Reason2]),
+ {error, {recv_body, Reason2}}
end;
{ok, <<BadTag:32,
@@ -427,7 +405,7 @@ recv_reply_message1(Mod, Sock, ID) ->
{error, invalid_hdr};
{error, Reason1} ->
- i("recv_reply_message1 -> hdr error: "
+ ?E("Feiled reading header: "
"~n ~p", [Reason1]),
{error, {recv_hdr, Reason1}}
end.
@@ -437,8 +415,6 @@ recv_reply_message1(Mod, Sock, ID) ->
%% accumulated. If that is not enough for a (complete) reply, it
%% will attempt to receive more.
recv_reply_message2(Mod, Sock, ID, Acc) ->
- %% i("recv_reply_message2 -> entry with"
- %% "~n ID: ~w", [ID]),
case process_acc_data(ID, Acc) of
ok ->
%% No or insufficient data, so get more
@@ -456,7 +432,6 @@ recv_reply_message2(Mod, Sock, ID, Acc) ->
recv_reply_message3(_Mod, Sock, ID, Acc) ->
receive
{timeout, _TRef, stop} ->
- %% i("stop - when messages: ~p", [process_info(self(), messages)]),
{error, stop};
{TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse
@@ -469,7 +444,6 @@ recv_reply_message3(_Mod, Sock, ID, Acc) ->
{Tag, Sock, Msg} when (Tag =:= tcp) orelse
(Tag =:= socket) ->
- %% i("recv_reply_message3 -> got ~w byte message", [size(Msg)]),
process_acc_data(ID, <<Acc/binary, Msg/binary>>)
%% after ?RECV_TIMEOUT ->
@@ -482,9 +456,6 @@ process_acc_data(ID, <<?TTEST_TAG:32,
ID:32,
SZ:32,
Data/binary>>) when (SZ =< size(Data)) ->
- %% i("process_acc_data -> entry with"
- %% "~n ID: ~w"
- %% "~n SZ: ~w", [ID, SZ]),
<<_Body:SZ/binary, Rest/binary>> = Data,
{ok, {4*4+SZ, Rest}};
process_acc_data(ID, <<BadTag:32,
@@ -508,9 +479,9 @@ process_acc_data(_ID, _Data) ->
handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) ->
receive
{timeout, _TRef, stop} ->
- i("stop"),
+ ?I("STOP"),
%% This will have the effect that no more requests are sent...
- State#{num => SCnt, stop_started => t()};
+ State#{num => SCnt, stop_started => ?T()};
{?MODULE, Ref, Parent, stop} ->
%% This *aborts* the test
@@ -571,47 +542,47 @@ next_id(_) ->
%% ==========================================================================
-t() ->
- os:timestamp().
+%% t() ->
+%% os:timestamp().
-tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
- T1 = A1*1000000000+B1*1000+(C1 div 1000),
- T2 = A2*1000000000+B2*1000+(C2 div 1000),
- T2 - T1.
+%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+%% T1 = A1*1000000000+B1*1000+(C1 div 1000),
+%% T2 = A2*1000000000+B2*1000+(C2 div 1000),
+%% T2 - T1.
-formated_timestamp() ->
- format_timestamp(os:timestamp()).
+%% formated_timestamp() ->
+%% format_timestamp(os:timestamp()).
-format_timestamp({_N1, _N2, N3} = TS) ->
- {_Date, Time} = calendar:now_to_local_time(TS),
- {Hour,Min,Sec} = Time,
- FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
- [Hour, Min, Sec, round(N3/1000)]),
- lists:flatten(FormatTS).
+%% format_timestamp({_N1, _N2, N3} = TS) ->
+%% {_Date, Time} = calendar:now_to_local_time(TS),
+%% {Hour,Min,Sec} = Time,
+%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+%% [Hour, Min, Sec, round(N3/1000)]),
+%% lists:flatten(FormatTS).
-%% Time is always in number os ms (milli seconds)
-format_time(T) ->
- f("~p", [T]).
+%% %% Time is always in number os ms (milli seconds)
+%% format_time(T) ->
+%% f("~p", [T]).
%% ==========================================================================
-f(F, A) ->
- lists:flatten(io_lib:format(F, A)).
+%% f(F, A) ->
+%% lists:flatten(io_lib:format(F, A)).
-%% e(F) ->
-%% i("<ERROR> " ++ F).
+%% %% e(F) ->
+%% %% i("<ERROR> " ++ F).
-e(F, A) ->
- p(get(sname), "<ERROR> " ++ F, A).
+%% e(F, A) ->
+%% p(get(sname), "<ERROR> " ++ F, A).
-i(F) ->
- i(F, []).
+%% i(F) ->
+%% i(F, []).
-i(F, A) ->
- p(get(sname), "<INFO> " ++ F, A).
+%% i(F, A) ->
+%% p(get(sname), "<INFO> " ++ F, A).
-p(undefined, F, A) ->
- p("- ", F, A);
-p(Prefix, F, A) ->
- io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
+%% p(undefined, F, A) ->
+%% p("- ", F, A);
+%% p(Prefix, F, A) ->
+%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
index 0d2ec38876..ef980e8d32 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
@@ -21,23 +21,24 @@
-module(socket_test_ttest_tcp_client_socket).
-export([
- start_monitor/3, start_monitor/4, start_monitor/6,
+ start_monitor/4, start_monitor/5, start_monitor/7,
stop/1
]).
-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket).
+-define(MOD(M), {?TRANSPORT_MOD, #{method => Method}}).
-start_monitor(Active, Addr, Port) ->
- socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+start_monitor(Method, Active, Addr, Port) ->
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Method),
Active, Addr, Port).
-start_monitor(Active, Addr, Port, MsgID) ->
- socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+start_monitor(Method, Active, Addr, Port, MsgID) ->
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Method),
Active, Addr, Port,
MsgID).
-start_monitor(Active, Addr, Port, MsgID, MaxOutstanding, RunTime) ->
- socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+start_monitor(Method, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) ->
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Method),
Active, Addr, Port,
MsgID, MaxOutstanding, RunTime).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl
index cb503a1feb..b248b063a3 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_server.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl
@@ -44,13 +44,25 @@
-define(ACC_TIMEOUT, 10000).
-define(RECV_TIMEOUT, 10000).
+-define(LIB, socket_test_ttest_lib).
+-define(I(F), ?LIB:info(F)).
+-define(I(F,A), ?LIB:info(F, A)).
+-define(E(F,A), ?LIB:error(F, A)).
+-define(F(F,A), ?LIB:format(F, A)).
+-define(FORMAT_TIME(T), ?LIB:format_time(T)).
+-define(T(), ?LIB:t()).
+-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)).
+
%% ==========================================================================
-start_monitor(Mod, Active)
- when is_atom(Mod) andalso is_boolean(Active) orelse (Active =:= once) ->
+start_monitor(Transport, Active)
+ when (is_atom(Transport) orelse is_tuple(Transport)) andalso
+ (is_boolean(Active) orelse (Active =:= once)) ->
Self = self(),
- ServerInit = fun() -> put(sname, "server"), server_init(Self, Mod, Active) end,
+ ServerInit = fun() -> put(sname, "server"),
+ server_init(Self, Transport, Active)
+ end,
{Pid, MRef} = spawn_monitor(ServerInit),
receive
{'DOWN', MRef, process, Pid, normal} ->
@@ -73,26 +85,28 @@ stop(Pid) when is_pid(Pid) ->
%% ==========================================================================
-server_init(Parent, Mod, Active) ->
- i("init -> entry with"
- "~n Parent: ~p"
- "~n Mod: ~p"
- "~n Active: ~p", [Parent, Mod, Active]),
- case Mod:listen(0) of
+server_init(Parent, Transport, Active) ->
+ ?I("init -> entry with"
+ "~n Parent: ~p"
+ "~n Transport: ~p"
+ "~n Active: ~p", [Parent, Transport, Active]),
+ {Mod, Listen, StatsInterval} = process_transport(Transport, Active),
+ case Listen(0) of
{ok, LSock} ->
case Mod:port(LSock) of
{ok, Port} = OK ->
Addr = which_addr(), % This is just for convenience
- i("init -> listening on:"
- "~n Addr: ~p (~s)"
- "~n Port: ~w"
- "~n", [Addr, inet:ntoa(Addr), Port]),
+ ?I("init -> listening on:"
+ "~n Addr: ~p (~s)"
+ "~n Port: ~w"
+ "~n", [Addr, inet:ntoa(Addr), Port]),
Parent ! {?MODULE, self(), OK},
- server_loop(#{parent => Parent,
- mod => Mod,
- active => Active,
- lsock => LSock,
- handlers => [],
+ server_loop(#{parent => Parent,
+ mod => Mod,
+ active => Active,
+ lsock => LSock,
+ handlers => [],
+ stats_interval => StatsInterval,
%% Accumulation
runtime => 0,
mcnt => 0,
@@ -107,37 +121,40 @@ server_init(Parent, Mod, Active) ->
exit({listen, LReason})
end.
+process_transport(Mod, _) when is_atom(Mod) ->
+ {Mod, fun(Port) -> Mod:listen(Port) end, infinity};
+process_transport({Mod, #{stats_interval := T} = Opts}, Active)
+ when (Active =/= false) ->
+ {Mod, fun(Port) -> Mod:listen(Port, Opts#{stats_to => self()}) end, T};
+process_transport({Mod, #{stats_interval := T} = Opts}, _Active) ->
+ {Mod, fun(Port) -> Mod:listen(Port, Opts) end, T};
+process_transport({Mod, Opts}, _Active) ->
+ {Mod, fun(Port) -> Mod:listen(Port, Opts) end, infinity}.
+
+
server_loop(State) ->
- %% i("loop -> enter"),
server_loop( server_handle_message( server_accept(State) ) ).
server_accept(#{mod := Mod,
active := Active,
lsock := LSock,
handlers := Handlers} = State) ->
-
- %% i("server-accept -> entry with"
- %% "~n Mod: ~p"
- %% "~n Active: ~p"
- %% "~n LSock: ~p", [Mod, Active, LSock]),
-
case Mod:accept(LSock, ?ACC_TIMEOUT) of
{ok, Sock} ->
- %% i("server-accept -> accepted: "
- %% "~n Sock: ~p", [Sock]),
- i("accepted connection from ~s",
- [case Mod:peername(Sock) of
- {ok, Peer} ->
- format_peername(Peer);
- {error, _} ->
- "-"
- end]),
+ ?I("accepted connection from ~s",
+ [case Mod:peername(Sock) of
+ {ok, Peer} ->
+ format_peername(Peer);
+ {error, _} ->
+ "-"
+ end]),
{Pid, _} = handler_start(),
- i("handler ~p started -> try transfer socket control", [Pid]),
+ ?I("handler ~p started -> try transfer socket control", [Pid]),
case Mod:controlling_process(Sock, Pid) of
ok ->
- i("server-accept: handler ~p started", [Pid]),
+ maybe_start_stats_timer(State, Pid),
+ ?I("server-accept: handler ~p started", [Pid]),
handler_continue(Pid, Mod, Sock, Active),
Handlers2 = [Pid | Handlers],
State#{handlers => Handlers2};
@@ -156,14 +173,31 @@ server_accept(#{mod := Mod,
format_peername({Addr, Port}) ->
case inet:gethostbyaddr(Addr) of
{ok, #hostent{h_name = N}} ->
- f("~s (~s:~w)", [N, inet:ntoa(Addr), Port]);
+ ?F("~s (~s:~w)", [N, inet:ntoa(Addr), Port]);
{error, _} ->
- f("~p, ~p", [Addr, Port])
+ ?F("~p, ~p", [Addr, Port])
end.
-
+
+maybe_start_stats_timer(#{active := Active, stats_interval := Time}, Handler)
+ when (Active =/= false) andalso (is_integer(Time) andalso (Time > 0)) ->
+ start_stats_timer(Time, "handler", Handler);
+maybe_start_stats_timer(_, _) ->
+ ok.
+
+start_stats_timer(Time, ProcStr, Pid) ->
+ erlang:start_timer(Time, self(), {stats, Time, ProcStr, Pid}).
+
server_handle_message(#{parent := Parent, handlers := H} = State) ->
- %% i("server_handle_message -> enter"),
receive
+ {timeout, _TRef, {stats, Interval, ProcStr, Pid}} ->
+ case server_handle_stats(ProcStr, Pid) of
+ ok ->
+ start_stats_timer(Interval, ProcStr, Pid);
+ skip ->
+ ok
+ end,
+ State;
+
{?MODULE, Ref, Parent, stop} ->
reply(Parent, Ref, ok),
lists:foreach(fun(P) -> handler_stop(P) end, H),
@@ -176,11 +210,20 @@ server_handle_message(#{parent := Parent, handlers := H} = State) ->
State
end.
+server_handle_stats(ProcStr, Pid) ->
+ case ?LIB:formated_process_stats(Pid) of
+ "" ->
+ skip;
+ FormatedStats ->
+ ?I("Statistics for ~s ~p:~s", [ProcStr, Pid, FormatedStats]),
+ ok
+ end.
+
server_handle_down(Pid, Reason, #{handlers := Handlers} = State) ->
case lists:delete(Pid, Handlers) of
Handlers ->
- i("unknown process ~p died", [Pid]),
+ ?I("unknown process ~p died", [Pid]),
State;
Handlers2 ->
server_handle_handler_down(Pid, Reason, State#{handlers => Handlers2})
@@ -197,35 +240,35 @@ server_handle_handler_down(Pid,
AccMCnt2 = AccMCnt + MCnt,
AccBCnt2 = AccBCnt + BCnt,
AccHCnt2 = AccHCnt + 1,
- i("handler ~p (~w) done => accumulated results: "
- "~n Run Time: ~s ms"
- "~n Message Count: ~s"
- "~n Byte Count: ~s",
- [Pid, AccHCnt2,
- format_time(AccRunTime2),
- if (AccRunTime2 > 0) ->
- f("~w => ~w (~w) msgs / ms",
- [AccMCnt2,
- AccMCnt2 div AccRunTime2,
- (AccMCnt2 div AccHCnt2) div AccRunTime2]);
- true ->
- f("~w", [AccMCnt2])
- end,
- if (AccRunTime2 > 0) ->
- f("~w => ~w (~w) bytes / ms",
- [AccBCnt2,
- AccBCnt2 div AccRunTime2,
- (AccBCnt2 div AccHCnt2) div AccRunTime2]);
- true ->
- f("~w", [AccBCnt2])
- end]),
+ ?I("handler ~p (~w) done => accumulated results: "
+ "~n Run Time: ~s ms"
+ "~n Message Count: ~s"
+ "~n Byte Count: ~s",
+ [Pid, AccHCnt2,
+ ?FORMAT_TIME(AccRunTime2),
+ if (AccRunTime2 > 0) ->
+ ?F("~w => ~w (~w) msgs / ms",
+ [AccMCnt2,
+ AccMCnt2 div AccRunTime2,
+ (AccMCnt2 div AccHCnt2) div AccRunTime2]);
+ true ->
+ ?F("~w", [AccMCnt2])
+ end,
+ if (AccRunTime2 > 0) ->
+ ?F("~w => ~w (~w) bytes / ms",
+ [AccBCnt2,
+ AccBCnt2 div AccRunTime2,
+ (AccBCnt2 div AccHCnt2) div AccRunTime2]);
+ true ->
+ ?F("~w", [AccBCnt2])
+ end]),
State#{runtime => AccRunTime2,
mcnt => AccMCnt2,
bcnt => AccBCnt2,
hcnt => AccHCnt2};
server_handle_handler_down(Pid, Reason, State) ->
- i("handler ~p terminated: "
- "~n ~p", [Pid, Reason]),
+ ?I("handler ~p terminated: "
+ "~n ~p", [Pid, Reason]),
State.
@@ -244,32 +287,31 @@ handler_stop(Pid) ->
req(Pid, stop).
handler_init(Parent) ->
- i("starting"),
+ ?I("starting"),
receive
{?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} ->
- i("received continue"),
+ ?I("received continue"),
reply(Parent, Ref, ok),
handler_initial_activation(Mod, Sock, Active),
handler_loop(#{parent => Parent,
mod => Mod,
sock => Sock,
active => Active,
- start => t(),
+ start => ?T(),
mcnt => 0,
bcnt => 0,
last_reply => none,
acc => <<>>})
after 5000 ->
- i("timeout when message queue: "
- "~n ~p"
- "~nwhen"
- "~n Parent: ~p", [process_info(self(), messages), Parent]),
+ ?I("timeout when message queue: "
+ "~n ~p"
+ "~nwhen"
+ "~n Parent: ~p", [process_info(self(), messages), Parent]),
handler_init(Parent)
end.
handler_loop(State) ->
- %% i("handler-loop"),
handler_loop( handler_handle_message( handler_recv_message(State) ) ).
%% When passive, we read *one* request and then attempt to reply to it.
@@ -278,7 +320,6 @@ handler_recv_message(#{mod := Mod,
active := false,
mcnt := MCnt,
bcnt := BCnt} = State) ->
- %% i("handler_recv_message(false) -> entry"),
case handler_recv_message2(Mod, Sock) of
{ok, {MsgSz, ID, Body}} ->
handler_send_reply(Mod, Sock, ID, Body),
@@ -305,7 +346,6 @@ handler_recv_message(#{mod := Mod,
bcnt := BCnt,
last_reply := LID,
acc := Acc} = State) ->
- %% i("handler_recv_message(~w) -> entry", [Active]),
case handler_recv_message3(Mod, Sock, Acc, LID) of
{ok, {MCnt2, BCnt2, LID2}, NewAcc} ->
handler_maybe_activate(Mod, Sock, Active),
@@ -319,12 +359,12 @@ handler_recv_message(#{mod := Mod,
(size(Acc) =:= 0) ->
handler_done(State);
true ->
- e("client done with partial message: "
- "~n Last Reply Sent: ~w"
- "~n Message Count: ~w"
- "~n Byte Count: ~w"
- "~n Partial Message: ~w bytes",
- [LID, MCnt, BCnt, size(Acc)]),
+ ?E("client done with partial message: "
+ "~n Last Reply Sent: ~w"
+ "~n Message Count: ~w"
+ "~n Byte Count: ~w"
+ "~n Partial Message: ~w bytes",
+ [LID, MCnt, BCnt, size(Acc)]),
exit({closed_with_partial_message, LID})
end;
@@ -355,32 +395,27 @@ handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) ->
handler_recv_message2(Mod, Sock) ->
- %% i("handler_recv_message2 -> entry"),
case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
{ok, <<?TTEST_TAG:32,
?TTEST_TYPE_REQUEST:32,
ID:32,
SZ:32>> = Hdr} ->
- %% i("handler_recv_message2 -> got request header: "
- %% "~n ID: ~p"
- %% "~n SZ: ~p", [ID, SZ]),
case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
{ok, Body} when (SZ =:= size(Body)) ->
- %% i("handler_recv_message2 -> got body"),
{ok, {size(Hdr) + size(Body), ID, Body}};
{error, BReason} ->
- e("failed reading body (~w) of message ~w:"
- "~n ~p", [SZ, ID, BReason]),
+ ?E("failed reading body (~w) of message ~w:"
+ "~n ~p", [SZ, ID, BReason]),
exit({recv, body, ID, SZ, BReason})
end;
{error, timeout} = ERROR ->
- i("handler_recv_message2 -> timeout"),
+ ?I("timeout"),
ERROR;
{error, closed} = ERROR ->
ERROR;
{error, HReason} ->
- e("failed reading header of message:"
- "~n ~p", [HReason]),
+ ?E("Failed reading header of message:"
+ "~n ~p", [HReason]),
exit({recv, header, HReason})
end.
@@ -405,116 +440,6 @@ handler_recv_message3(Mod, Sock, Acc, LID) ->
-%% %% Socket in passive mode => we need to read explicitly
-%% handler_loop(#{sock := Sock, active := false} = State, MCnt, BCnt) ->
-%% %% i("try recv msg header"),
-%% MsgSz =
-%% case gen_tcp:recv(Sock, 4*4) of
-%% {ok, <<?SOCKET_SIMPLE_TAG:32,
-%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
-%% ID:32,
-%% SZ:32>> = Hdr} ->
-%% %% i("msg ~w header received (data sz is ~w bytes)", [ID, SZ]),
-%% case gen_tcp:recv(Sock, SZ) of
-%% {ok, Data} when (size(Data) =:= SZ) ->
-%% handler_send_reply(Sock, ID, Data),
-%% size(Hdr)+SZ;
-%% {ok, InvData} ->
-%% i("invalid data"),
-%% (catch gen_tcp:close(Sock)),
-%% exit({invalid_data, SZ, size(InvData)});
-%% {error, Reason2} ->
-%% i("Data read failed: ~p", [Reason2]),
-%% (catch gen_tcp:close(Sock)),
-%% exit({recv_data, Reason2})
-%% end;
-%% {ok, _InvHdr} ->
-%% (catch gen_tcp:close(Sock)),
-%% exit(invalid_hdr);
-%% {error, closed} ->
-%% i("we are done: "
-%% "~n Message Count: ~p"
-%% "~n Byte Count: ~p", [MCnt, BCnt]),
-%% exit(normal);
-%% {error, Reason1} ->
-%% i("Header read failed: ~p", [Reason1]),
-%% (catch gen_tcp:close(Sock)),
-%% exit({recv_hdr, Reason1})
-%% end,
-%% handler_loop(State, MCnt+1, BCnt+MsgSz);
-
-%% %% Socket in active mode (once | true) => data messages arrive
-%% handler_loop(#{sock := Sock, active := Active} = State, MCnt, BCnt) ->
-%% %% i("await msg"),
-%% try handler_recv_request(Sock, Active) of
-%% {ID, Data, MsgSz} ->
-%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]),
-%% handler_send_reply(Sock, ID, Data),
-%% handler_maybe_activate(Sock, Active),
-%% handler_loop(State, MCnt+1, BCnt+MsgSz)
-%% catch
-%% throw:tcp_closed ->
-%% i("we are done: "
-%% "~n Message Count: ~p"
-%% "~n Byte Count: ~p", [MCnt, BCnt]),
-%% exit(normal);
-%% throw:{tcp_error, Reason} ->
-%% i("<ERROR> TCP error ~p when: "
-%% "~n Message Count: ~p"
-%% "~n Byte Count: ~p", [Reason, MCnt, BCnt]),
-%% exit({tcp_error, Reason})
-%% end.
-%% %% {ID, Data, MsgSz} = handler_recv_request(Sock, Active),
-%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]),
-%% %% handler_send_reply(Sock, ID, Data),
-%% %% handler_maybe_activate(Sock, Active),
-%% %% handler_loop(State, MCnt+1, BCnt+MsgSz).
-
-
-%% handler_recv_request(Sock, Active) ->
-%% %% In theory we should also be ready for a partial header,
-%% %% but I can't be bothered...
-%% receive
-%% {tcp_closed, Sock} ->
-%% throw(tcp_closed);
-%% {tcp_error, Sock, Reason} ->
-%% throw({tcp_error, Reason});
-%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32,
-%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
-%% ID:32,
-%% SZ:32,
-%% Data/binary>> = Msg} when (size(Data) =:= SZ) ->
-%% %% i("[complete] msg ~w received (data sz is ~w bytes)", [ID, SZ]),
-%% {ID, Data, size(Msg)};
-%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32,
-%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
-%% ID:32,
-%% SZ:32,
-%% Data/binary>> = Msg} when (size(Data) < SZ) ->
-%% %% i("[incomplete] msg ~w received (data sz is ~w bytes)", [ID, SZ]),
-%% handler_recv_request_data(Sock, Active, ID, SZ, Data, size(Msg))
-%% end.
-
-%% handler_recv_request_data(Sock, Active, ID, SZ, AccData, AccMsgSz) ->
-%% handler_maybe_activate(Sock, Active),
-%% receive
-%% {tcp_closed, Sock} ->
-%% %% i("we are done (incomplete data)"),
-%% throw(tcp_closed);
-%% {tcp_error, Sock, Reason} ->
-%% throw({tcp_error, Reason});
-%% {tcp, Sock, Data} when (SZ =:= (size(AccData) + size(Data))) ->
-%% %% i("[complete] received the remaining data (~w bytes) for msg ~w",
-%% %% [size(Data), ID]),
-%% {ID, <<AccData/binary, Data/binary>>, AccMsgSz+size(Data)};
-%% {tcp, Sock, Data} ->
-%% %% i("[incomplete] received ~w bytes of data for for msg ~w",
-%% %% [size(Data), ID]),
-%% handler_recv_request_data(Sock, Active, ID, SZ,
-%% <<AccData/binary, Data/binary>>,
-%% AccMsgSz+size(Data))
-%% end.
-
handler_send_reply(Mod, Sock, ID, Data) ->
SZ = size(Data),
Msg = <<?TTEST_TAG:32,
@@ -522,10 +447,8 @@ handler_send_reply(Mod, Sock, ID, Data) ->
ID:32,
SZ:32,
Data/binary>>,
- %% i("handler-send-reply -> try send reply ~w: ~w bytes", [ID, size(Msg)]),
case Mod:send(Sock, Msg) of
ok ->
- %% i("handler-send-reply -> reply ~w (~w bytes) sent", [ID, size(Msg)]),
ok;
{error, Reason} ->
(catch Mod:close(Sock)),
@@ -534,7 +457,7 @@ handler_send_reply(Mod, Sock, ID, Data) ->
handler_done(State) ->
- handler_done(State, t()).
+ handler_done(State, ?T()).
handler_done(#{start := Start,
mod := Mod,
@@ -542,7 +465,7 @@ handler_done(#{start := Start,
mcnt := MCnt,
bcnt := BCnt}, Stop) ->
(catch Mod:close(Sock)),
- exit({done, tdiff(Start, Stop), MCnt, BCnt}).
+ exit({done, ?TDIFF(Start, Stop), MCnt, BCnt}).
handler_handle_message(#{parent := Parent} = State) ->
@@ -634,45 +557,45 @@ reply(Pid, Ref, Reply) ->
%% ==========================================================================
-t() ->
- os:timestamp().
+%% t() ->
+%% os:timestamp().
-tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
- T1 = A1*1000000000+B1*1000+(C1 div 1000),
- T2 = A2*1000000000+B2*1000+(C2 div 1000),
- T2 - T1.
+%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+%% T1 = A1*1000000000+B1*1000+(C1 div 1000),
+%% T2 = A2*1000000000+B2*1000+(C2 div 1000),
+%% T2 - T1.
-formated_timestamp() ->
- format_timestamp(os:timestamp()).
+%% formated_timestamp() ->
+%% format_timestamp(os:timestamp()).
-format_timestamp({_N1, _N2, N3} = TS) ->
- {_Date, Time} = calendar:now_to_local_time(TS),
- {Hour,Min,Sec} = Time,
- FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
- [Hour, Min, Sec, round(N3/1000)]),
- lists:flatten(FormatTS).
+%% format_timestamp({_N1, _N2, N3} = TS) ->
+%% {_Date, Time} = calendar:now_to_local_time(TS),
+%% {Hour,Min,Sec} = Time,
+%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+%% [Hour, Min, Sec, round(N3/1000)]),
+%% lists:flatten(FormatTS).
-%% Time is always in number os ms (milli seconds)
-format_time(T) ->
- f("~p", [T]).
+%% %% Time is always in number os ms (milli seconds)
+%% format_time(T) ->
+%% f("~p", [T]).
%% ==========================================================================
-f(F, A) ->
- lists:flatten(io_lib:format(F, A)).
+%% f(F, A) ->
+%% lists:flatten(io_lib:format(F, A)).
-e(F, A) ->
- p(get(sname), "<ERROR> " ++ F, A).
+%% e(F, A) ->
+%% p(get(sname), "<ERROR> " ++ F, A).
-i(F) ->
- i(F, []).
+%% i(F) ->
+%% i(F, []).
-i(F, A) ->
- p(get(sname), "<INFO> " ++ F, A).
+%% i(F, A) ->
+%% p(get(sname), "<INFO> " ++ F, A).
-p(undefined, F, A) ->
- p("- ", F, A);
-p(Prefix, F, A) ->
- io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
+%% p(undefined, F, A) ->
+%% p("- ", F, A);
+%% p(Prefix, F, A) ->
+%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
index de9df857fe..d62f4d86a6 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
@@ -21,14 +21,17 @@
-module(socket_test_ttest_tcp_server_socket).
-export([
- start_monitor/1,
+ start_monitor/2,
stop/1
]).
-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket).
+%% -define(MOD(M), {?TRANSPORT_MOD, #{method => M,
+%% stats_interval => 10000}}).
+-define(MOD(M), {?TRANSPORT_MOD, #{method => M}}).
-start_monitor(Active) ->
- socket_test_ttest_tcp_server:start_monitor(?TRANSPORT_MOD, Active).
+start_monitor(Method, Active) ->
+ socket_test_ttest_tcp_server:start_monitor(?MOD(Method), Active).
stop(Pid) ->
socket_test_ttest_tcp_server:stop(Pid).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
index 12d9e052d7..f314d01a63 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
@@ -24,9 +24,9 @@
accept/1, accept/2,
active/2,
close/1,
- connect/2,
+ connect/2, connect/3,
controlling_process/2,
- listen/0, listen/1,
+ listen/0, listen/1, listen/2,
port/1,
peername/1,
recv/2, recv/3,
@@ -38,25 +38,41 @@
-define(READER_RECV_TIMEOUT, 1000).
+-define(DATA_MSG(Sock, Method, Data),
+ {socket,
+ #{sock => Sock, reader => self(), method => Method},
+ Data}).
+
+-define(CLOSED_MSG(Sock, Method),
+ {socket_closed,
+ #{sock => Sock, reader => self(), method => Method}}).
+
+-define(ERROR_MSG(Sock, Method, Reason),
+ {socket_error,
+ #{sock => Sock, reader => self(), method => Method},
+ Reason}).
+
%% ==========================================================================
-accept(#{sock := LSock}) ->
+accept(#{sock := LSock, opts := #{method := Method} = Opts}) ->
case socket:accept(LSock) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}};
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
-accept(#{sock := LSock}, Timeout) ->
+accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) ->
case socket:accept(LSock, Timeout) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}};
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
@@ -74,6 +90,9 @@ close(#{sock := Sock, reader := Pid}) ->
%% Create a socket and connect it to a peer
connect(Addr, Port) ->
+ connect(Addr, Port, #{method => plain}).
+
+connect(Addr, Port, #{method := Method} = Opts) ->
try
begin
Sock =
@@ -100,9 +119,10 @@ connect(Addr, Port) ->
(catch socket:close(Sock)),
throw({error, {connect, CReason}})
end,
- Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}}
+ Self = self(),
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}}
end
catch
throw:ERROR:_ ->
@@ -110,6 +130,16 @@ connect(Addr, Port) ->
end.
+maybe_start_stats_timer(#{stats_to := Pid,
+ stats_interval := T},
+ Reader) when is_pid(Pid) ->
+ erlang:start_timer(T, Pid, {stats, T, "reader", Reader});
+maybe_start_stats_timer(O, _) ->
+ io:format("NO STATS: "
+ "~n ~p"
+ "~n", [O]),
+ ok.
+
controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
case socket:setopt(Sock, otp, controlling_process, NewPid) of
ok ->
@@ -125,9 +155,13 @@ controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
%% Create a listen socket
listen() ->
- listen(0).
+ listen(0, #{method => plain}).
-listen(Port) when is_integer(Port) andalso (Port >= 0) ->
+listen(Port) ->
+ listen(Port, #{method => plain}).
+listen(Port, #{method := Method} = Opts)
+ when (is_integer(Port) andalso (Port >= 0)) andalso
+ ((Method =:= plain) orelse (Method =:= msg)) ->
try
begin
Sock = case socket:open(inet, stream, tcp) of
@@ -152,7 +186,7 @@ listen(Port) when is_integer(Port) andalso (Port >= 0) ->
(catch socket:close(Sock)),
throw({error, {listen, LReason}})
end,
- {ok, #{sock => Sock}}
+ {ok, #{sock => Sock, opts => Opts}}
end
catch
throw:{error, Reason}:_ ->
@@ -178,14 +212,31 @@ peername(#{sock := Sock}) ->
end.
-recv(#{sock := Sock}, Length) ->
- socket:recv(Sock, Length).
-recv(#{sock := Sock}, Length, Timeout) ->
- socket:recv(Sock, Length, Timeout).
+recv(#{sock := Sock, method := plain}, Length) ->
+ socket:recv(Sock, Length);
+recv(#{sock := Sock, method := msg}, Length) ->
+ case socket:recvmsg(Sock, Length, 0, [], infinity) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+recv(#{sock := Sock, method := plain}, Length, Timeout) ->
+ socket:recv(Sock, Length, Timeout);
+recv(#{sock := Sock, method := msg}, Length, Timeout) ->
+ case socket:recvmsg(Sock, Length, 0, [], Timeout) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
-send(#{sock := Sock}, Length) ->
- socket:send(Sock, Length).
+send(#{sock := Sock, method := plain}, Bin) ->
+ socket:send(Sock, Bin);
+send(#{sock := Sock, method := msg}, Bin) ->
+ socket:sendmsg(Sock, #{iov => [Bin]}).
shutdown(#{sock := Sock}, How) ->
@@ -203,14 +254,16 @@ sockname(#{sock := Sock}) ->
%% ==========================================================================
-reader_init(ControllingProcess, Sock, Active)
+reader_init(ControllingProcess, Sock, Active, Method)
when is_pid(ControllingProcess) andalso
- (is_boolean(Active) orelse (Active =:= once)) ->
+ (is_boolean(Active) orelse (Active =:= once)) andalso
+ ((Method =:= plain) orelse (Method =:= msg)) ->
MRef = erlang:monitor(process, ControllingProcess),
reader_loop(#{ctrl_proc => ControllingProcess,
ctrl_proc_mref => MRef,
active => Active,
- sock => Sock}).
+ sock => Sock,
+ method => Method}).
%% Never read
@@ -245,10 +298,11 @@ reader_loop(#{active := false,
%% Read *once* and then change to false
reader_loop(#{active := once,
sock := Sock,
+ method := Method,
ctrl_proc := Pid} = State) ->
- case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ case do_recv(Method, Sock) of
{ok, Data} ->
- Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ Pid ! ?DATA_MSG(Sock, Method, Data),
reader_loop(State#{active => false});
{error, timeout} ->
receive
@@ -280,21 +334,22 @@ reader_loop(#{active := once,
end;
{error, closed} ->
- Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ Pid ! ?CLOSED_MSG(Sock, Method),
exit(normal);
{error, Reason} ->
- Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
exit(Reason)
end;
%% Read and forward data continuously
reader_loop(#{active := true,
sock := Sock,
+ method := Method,
ctrl_proc := Pid} = State) ->
- case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ case do_recv(Method, Sock) of
{ok, Data} ->
- Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ Pid ! ?DATA_MSG(Sock, Method, Data),
reader_loop(State);
{error, timeout} ->
receive
@@ -326,20 +381,26 @@ reader_loop(#{active := true,
end;
{error, closed} ->
- Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ Pid ! ?CLOSED_MSG(Sock, Method),
exit(normal);
{error, Reason} ->
- Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
exit(Reason)
end.
-
+do_recv(plain, Sock) ->
+ socket:recv(Sock, 0, ?READER_RECV_TIMEOUT);
+do_recv(msg, Sock) ->
+ case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
%% ==========================================================================
-
-