aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-12-07 19:12:38 +0100
committerMicael Karlberg <[email protected]>2018-12-07 19:12:38 +0100
commita33acd53524160d65929d06f06e387ce8419b1c0 (patch)
treefee37591fb85039583e58858ade959d0de360142
parenta862cc8ab56616e182c959a5a6a06e4aefa09b08 (diff)
parentd0c3f79d22b4778f66ac1d8a2fc03e736f42e973 (diff)
downloadotp-a33acd53524160d65929d06f06e387ce8419b1c0.tar.gz
otp-a33acd53524160d65929d06f06e387ce8419b1c0.tar.bz2
otp-a33acd53524160d65929d06f06e387ce8419b1c0.zip
Merge branch 'bmk/20181206/nififying_inet_valgrind/OTP-14831' into bmk/20180918/nififying_inet/OTP-14831
-rw-r--r--erts/emulator/nifs/common/socket_int.h1
-rw-r--r--erts/emulator/nifs/common/socket_nif.c60
-rw-r--r--erts/emulator/test/Makefile1
-rw-r--r--erts/emulator/test/socket_test_ttest_lib.erl125
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client.erl261
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client_socket.erl15
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server.erl391
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_server_socket.erl9
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_socket.erl129
9 files changed, 562 insertions, 430 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index 7d223b8259..ec17e45f25 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -378,6 +378,7 @@ extern ERL_NIF_TERM esock_atom_einval;
#define ALLOC_BIN(SZ, BP) enif_alloc_binary((SZ), (BP))
#define REALLOC_BIN(SZ, BP) enif_realloc_binary((SZ), (BP))
+#define FREE_BIN(BP) enif_release_binary((BP))
#endif // SOCKET_INT_H__
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 3da895e644..1e0533535c 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -4233,8 +4233,13 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
descP->type = type;
descP->protocol = protocol;
+ /*
+ * Should we keep track of sockets (resources) in some way?
+ * Doing it here will require mutex to ensure data integrity,
+ * which will be costly. Send it somewhere?
+ */
res = enif_make_resource(env, descP);
- enif_release_resource(descP); // We should really store a reference ...
+ enif_release_resource(descP);
/* Keep track of the creator
* This should not be a problem but just in case
@@ -13592,6 +13597,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
recv_error_current_reader(env, descP, res);
+ FREE_BIN(bufP);
+
return res;
}
@@ -13633,6 +13640,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
return esock_make_error_str(env, xres);
+ /* This transfers "ownership" of the *allocated* binary to an
+ * erlang term (no need for an explicit free).
+ */
data = MKBIN(env, bufP);
SSDBG( descP,
@@ -13664,6 +13674,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
recv_update_current_reader(env, descP);
+ /* This transfers "ownership" of the *allocated* binary to an
+ * erlang term (no need for an explicit free).
+ */
data = MKBIN(env, bufP);
return esock_make_ok3(env, atom_true, data);
@@ -13707,6 +13720,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
(ERL_NIF_SELECT_STOP),
descP, NULL, recvRef);
+ FREE_BIN(bufP);
+
return res;
} else if ((saveErrno == ERRNO_BLOCK) ||
@@ -13721,16 +13736,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT for more\r\n") );
- /*
- SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
- descP, NULL, recvRef);
- */
-
sres = enif_select(env, descP->sock, (ERL_NIF_SELECT_READ),
descP, NULL, recvRef);
- SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) );
+ SSDBG( descP,
+ ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) );
+ FREE_BIN(bufP);
+
return esock_make_error(env, esock_atom_eagain);
} else {
ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno);
@@ -13740,6 +13753,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
recv_error_current_reader(env, descP, res);
+ FREE_BIN(bufP);
+
return res;
}
@@ -13768,6 +13783,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
recv_update_current_reader(env, descP);
+ /* This transfers "ownership" of the *allocated* binary to an
+ * erlang term (no need for an explicit free).
+ */
data = MKBIN(env, bufP);
data = MKSBIN(env, data, 0, read);
@@ -13792,6 +13810,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
cnt_inc(&descP->readByteCnt, read);
+ /* This transfers "ownership" of the *allocated* binary to an
+ * erlang term (no need for an explicit free).
+ */
data = MKBIN(env, bufP);
data = MKSBIN(env, data, 0, read);
@@ -13862,6 +13883,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
(ERL_NIF_SELECT_STOP),
descP, NULL, recvRef);
+ FREE_BIN(bufP);
+
return res;
} else if ((saveErrno == ERRNO_BLOCK) ||
@@ -13875,7 +13898,10 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
descP, NULL, recvRef);
+ FREE_BIN(bufP);
+
return esock_make_error(env, esock_atom_eagain);
+
} else {
ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno);
@@ -13885,6 +13911,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
recv_error_current_reader(env, descP, res);
+ FREE_BIN(bufP);
+
return res;
}
@@ -13962,6 +13990,9 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
* *We* do never actually try to read 0 bytes from a stream socket!
*/
+
+ FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
+
return esock_make_error(env, atom_closed);
}
@@ -14002,6 +14033,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
(ERL_NIF_SELECT_STOP),
descP, NULL, recvRef);
+ FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
+
return res;
} else if ((saveErrno == ERRNO_BLOCK) ||
@@ -14016,7 +14049,10 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
descP, NULL, recvRef);
+ FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
+
return esock_make_error(env, esock_atom_eagain);
+
} else {
ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno);
@@ -14026,6 +14062,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
recv_error_current_reader(env, descP, res);
+ FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
+
return res;
}
@@ -14057,6 +14095,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
recv_update_current_reader(env, descP);
+ FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
+
return esock_make_error_str(env, xres);
} else {
@@ -15713,6 +15753,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) {
char buf[64]; /* Buffer used for building the mutex name */
+ // This needs to be released when the socket is closed!
descP->env = enif_alloc_env();
sprintf(buf, "socket[w,%d]", sock);
@@ -16870,10 +16911,15 @@ void socket_dtor(ErlNifEnv* env, void* obj)
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
+ enif_clear_env(descP->env);
+ enif_free_env(descP->env);
+ descP->env = NULL;
+
MDESTROY(descP->writeMtx);
MDESTROY(descP->readMtx);
MDESTROY(descP->accMtx);
MDESTROY(descP->closeMtx);
+
}
diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile
index a910588381..09bfe6f104 100644
--- a/erts/emulator/test/Makefile
+++ b/erts/emulator/test/Makefile
@@ -34,6 +34,7 @@ SOCKET_MODULES = \
socket_client \
socket_test_lib \
socket_test_evaluator \
+ socket_test_ttest_lib \
socket_test_ttest_tcp_gen \
socket_test_ttest_tcp_socket \
socket_test_ttest_tcp_client \
diff --git a/erts/emulator/test/socket_test_ttest_lib.erl b/erts/emulator/test/socket_test_ttest_lib.erl
new file mode 100644
index 0000000000..71679bc6d6
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_lib.erl
@@ -0,0 +1,125 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(socket_test_ttest_lib).
+
+-compile({no_auto_import, [error/2]}).
+
+-export([
+ t/0, tdiff/2,
+ formated_timestamp/0, format_timestamp/1,
+ format_time/1,
+
+ formated_process_stats/1, formated_process_stats/2,
+
+ format/2,
+ error/1, error/2,
+ info/1, info/2
+ ]).
+
+%% ==========================================================================
+
+t() ->
+ os:timestamp().
+
+tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+ T1 = A1*1000000000+B1*1000+(C1 div 1000),
+ T2 = A2*1000000000+B2*1000+(C2 div 1000),
+ T2 - T1.
+
+formated_timestamp() ->
+ format_timestamp(os:timestamp()).
+
+format_timestamp({_N1, _N2, N3} = TS) ->
+ {_Date, Time} = calendar:now_to_local_time(TS),
+ {Hour,Min,Sec} = Time,
+ FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+ [Hour, Min, Sec, round(N3/1000)]),
+ lists:flatten(FormatTS).
+
+%% Time is always in number os ms (milli seconds)
+%% At some point, we should convert this to a more readable format...
+format_time(T) ->
+ format("~p", [T]).
+
+
+formated_process_stats(Pid) ->
+ formated_process_stats("", Pid).
+
+formated_process_stats(Prefix, Pid) when is_list(Prefix) andalso is_pid(Pid) ->
+ try
+ begin
+ TotHeapSz = pi(Pid, total_heap_size),
+ HeapSz = pi(Pid, heap_size),
+ StackSz = pi(Pid, stack_size),
+ Reds = pi(Pid, reductions),
+ GCInfo = pi(Pid, garbage_collection),
+ MinBinVHeapSz = proplists:get_value(min_bin_vheap_size, GCInfo),
+ MinHeapSz = proplists:get_value(min_heap_size, GCInfo),
+ MinGCS = proplists:get_value(minor_gcs, GCInfo),
+ format("~n ~sTotal Heap Size: ~p"
+ "~n ~sHeap Size: ~p"
+ "~n ~sStack Size: ~p"
+ "~n ~sReductions: ~p"
+ "~n ~s[GC] Min Bin VHeap Size: ~p"
+ "~n ~s[GC] Min Heap Size: ~p"
+ "~n ~s[GC] Minor GCS: ~p",
+ [Prefix, TotHeapSz,
+ Prefix, HeapSz,
+ Prefix, StackSz,
+ Prefix, Reds,
+ Prefix, MinBinVHeapSz,
+ Prefix, MinHeapSz,
+ Prefix, MinGCS])
+ end
+ catch
+ _:_:_ ->
+ ""
+ end.
+
+
+pi(Pid, Item) ->
+ {Item, Info} = process_info(Pid, Item),
+ Info.
+
+
+
+%% ==========================================================================
+
+format(F, A) ->
+ lists:flatten(io_lib:format(F, A)).
+
+error(F) ->
+ error(F, []).
+
+error(F, A) ->
+ print(get(sname), "<ERROR> " ++ F, A).
+
+info(F) ->
+ info(F, []).
+
+info(F, A) ->
+ print(get(sname), "<INFO> " ++ F, A).
+
+print(undefined, F, A) ->
+ print("- ", F, A);
+print(Prefix, F, A) ->
+ io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
+
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client.erl b/erts/emulator/test/socket_test_ttest_tcp_client.erl
index 1bd1bc54e9..9c43c41841 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_client.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_client.erl
@@ -54,6 +54,14 @@
-define(MAX_OUTSTANDING_DEFAULT_2, 10).
-define(MAX_OUTSTANDING_DEFAULT_3, 3).
+-define(LIB, socket_test_ttest_lib).
+-define(I(F), ?LIB:info(F)).
+-define(I(F,A), ?LIB:info(F, A)).
+-define(E(F,A), ?LIB:error(F, A)).
+-define(F(F,A), ?LIB:format(F, A)).
+-define(FORMAT_TIME(T), ?LIB:format_time(T)).
+-define(T(), ?LIB:t()).
+-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)).
-type active() :: once | boolean().
-type msg_id() :: 1..3.
@@ -63,42 +71,28 @@
%% ==========================================================================
--spec start_monitor(Mod, Active, Addr, Port) -> term() when
- Mod :: atom(),
- Active :: active(),
- Addr :: inet:ip_address(),
- Port :: inet:port_number().
+start_monitor(Transport, Active, Addr, Port) ->
+ start_monitor(Transport, Active, Addr, Port, ?MSG_ID_DEFAULT).
%% RunTime is in number of ms.
-start_monitor(Mod, Active, Addr, Port) ->
- start_monitor(Mod, Active, Addr, Port, ?MSG_ID_DEFAULT).
-
--spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when
- Mod :: atom(),
- Active :: active(),
- Addr :: inet:ip_address(),
- Port :: inet:port_number(),
- MsgID :: msg_id().
-
-%% RunTime is in number of ms.
-start_monitor(Mod, Active, Addr, Port, 1 = MsgID) ->
- start_monitor(Mod, Active, Addr, Port, MsgID,
+start_monitor(Transport, Active, Addr, Port, 1 = MsgID) ->
+ start_monitor(Transport, Active, Addr, Port, MsgID,
?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT);
-start_monitor(Mod, Active, Addr, Port, 2 = MsgID) ->
- start_monitor(Mod, Active, Addr, Port, MsgID,
+start_monitor(Transport, Active, Addr, Port, 2 = MsgID) ->
+ start_monitor(Transport, Active, Addr, Port, MsgID,
?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT);
-start_monitor(Mod, Active, Addr, Port, 3 = MsgID) ->
- start_monitor(Mod, Active, Addr, Port, MsgID,
+start_monitor(Transport, Active, Addr, Port, 3 = MsgID) ->
+ start_monitor(Transport, Active, Addr, Port, MsgID,
?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT).
--spec start_monitor(Mod,
+-spec start_monitor(Transport,
Active,
Addr,
Port,
MsgID,
MaxOutstanding,
RunTime) -> term() when
- Mod :: atom(),
+ Transport :: atom() | tuple(),
Active :: active(),
Addr :: inet:ip_address(),
Port :: inet:port_number(),
@@ -107,9 +101,9 @@ start_monitor(Mod, Active, Addr, Port, 3 = MsgID) ->
RunTime :: runtime().
%% RunTime is in number of ms.
-start_monitor(Mod, Active, Addr, Port,
+start_monitor(Transport, Active, Addr, Port,
MsgID, MaxOutstanding, RunTime)
- when is_atom(Mod) andalso
+ when (is_atom(Transport) orelse is_tuple(Transport)) andalso
(is_boolean(Active) orelse (Active =:= once)) andalso
is_tuple(Addr) andalso
(is_integer(Port) andalso (Port > 0)) andalso
@@ -119,7 +113,7 @@ start_monitor(Mod, Active, Addr, Port,
Self = self(),
ClientInit = fun() -> put(sname, "client"),
init(Self,
- Mod, Active, Addr, Port,
+ Transport, Active, Addr, Port,
MsgID, MaxOutstanding, RunTime)
end,
{Pid, MRef} = spawn_monitor(ClientInit),
@@ -147,28 +141,29 @@ stop(Pid) when is_pid(Pid) ->
%% ==========================================================================
-init(Parent, Mod, Active, Addr, Port,
+init(Parent, Transport, Active, Addr, Port,
MsgID, MaxOutstanding, RunTime) ->
- i("init -> entry with"
- "~n Parent: ~p"
- "~n Mod: ~p"
- "~n Active: ~p"
- "~n Addr: ~s"
- "~n Port: ~p"
- "~n Msg ID: ~p (=> 16 + ~w bytes)"
- "~n Max Outstanding: ~p"
- "~n (Suggested) Run Time: ~p ms",
- [Parent,
- Mod, Active, inet:ntoa(Addr), Port,
- MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]),
- case Mod:connect(Addr, Port) of
+ ?I("init with"
+ "~n Parent: ~p"
+ "~n Transport: ~p"
+ "~n Active: ~p"
+ "~n Addr: ~s"
+ "~n Port: ~p"
+ "~n Msg ID: ~p (=> 16 + ~w bytes)"
+ "~n Max Outstanding: ~p"
+ "~n (Suggested) Run Time: ~p ms",
+ [Parent,
+ Transport, Active, inet:ntoa(Addr), Port,
+ MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]),
+ {Mod, Connect} = process_transport(Transport),
+ case Connect(Addr, Port) of
{ok, Sock} ->
- i("init -> connected"),
+ ?I("connected"),
Parent ! {?MODULE, self(), ok},
initial_activation(Mod, Sock, Active),
Results = loop(#{slogan => run,
runtime => RunTime,
- start => t(),
+ start => ?T(),
parent => Parent,
mod => Mod,
sock => Sock,
@@ -187,10 +182,16 @@ init(Parent, Mod, Active, Addr, Port,
(catch Mod:close(Sock)),
exit(normal);
{error, Reason} ->
- i("init -> connect failed: ~p", [Reason]),
+ ?E("connect failed: ~p", [Reason]),
exit({connect, Reason})
end.
+process_transport(Mod) when is_atom(Mod) ->
+ {Mod, fun(A, P) -> Mod:connect(A, P) end};
+process_transport({Mod, Opts}) ->
+ {Mod, fun(A, P) -> Mod:connect(A, P, Opts) end}.
+
+
which_msg_data(1) -> ?MSG_DATA1;
which_msg_data(2) -> ?MSG_DATA2;
which_msg_data(3) -> ?MSG_DATA3.
@@ -200,21 +201,21 @@ present_results(#{status := ok,
runtime := RunTime,
bcnt := ByteCnt,
cnt := NumIterations}) ->
- i("Results: "
- "~n Run Time: ~s"
- "~n ByteCnt: ~s"
- "~n NumIterations: ~s",
- [format_time(RunTime),
- if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) ->
- f("~w, ~w", [ByteCnt, RunTime]);
+ ?I("Results: "
+ "~n Run Time: ~s"
+ "~n ByteCnt: ~s"
+ "~n NumIterations: ~s",
+ [?FORMAT_TIME(RunTime),
+ if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) ->
+ ?F("~w, ~w", [ByteCnt, RunTime]);
true ->
- f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime])
+ ?F("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime])
end,
if (RunTime =:= 0) ->
"-";
true ->
- f("~p => ~p iterations / ms",
- [NumIterations, NumIterations div RunTime])
+ ?F("~p => ~p iterations / ms",
+ [NumIterations, NumIterations div RunTime])
end]),
ok;
present_results(#{status := Failure,
@@ -225,21 +226,21 @@ present_results(#{status := Failure,
rcnt := RCnt,
bcnt := BCnt,
num := Num}) ->
- i("Time Test failed: "
- "~n ~p"
- "~n"
- "~nwhen"
- "~n"
- "~n Run Time: ~s"
- "~n Send ID: ~p"
- "~n Recv ID: ~p"
- "~n Send Count: ~p"
- "~n Recv Count: ~p"
- "~n Byte Count: ~p"
- "~n Num Iterations: ~p",
- [Failure,
- format_time(RunTime),
- SID, RID, SCnt, RCnt, BCnt, Num]).
+ ?I("Time Test failed: "
+ "~n ~p"
+ "~n"
+ "~nwhen"
+ "~n"
+ "~n Run Time: ~s"
+ "~n Send ID: ~p"
+ "~n Recv ID: ~p"
+ "~n Send Count: ~p"
+ "~n Recv Count: ~p"
+ "~n Byte Count: ~p"
+ "~n Num Iterations: ~p",
+ [Failure,
+ ?FORMAT_TIME(RunTime),
+ SID, RID, SCnt, RCnt, BCnt, Num]).
@@ -255,17 +256,14 @@ do_loop(State) ->
do_loop( handle_message( msg_exchange(State) ) ).
msg_exchange(#{rcnt := Num, num := Num} = State) ->
- %% i("we are done"),
finish(ok, State);
msg_exchange(#{scnt := Num, num := Num} = State) ->
%% We are done sending more requests - now we will just await
%% the replies for the (still) outstanding replies.
- %% i("we have sent all requests - (only) wait for replies"),
msg_exchange( recv_reply(State) );
msg_exchange(#{outstanding := Outstanding,
max_outstanding := MaxOutstanding} = State)
when (Outstanding < MaxOutstanding) ->
- %% i("send the (initial) requests (~w, ~w)", [Outstanding, MaxOutstanding]),
msg_exchange( send_request(State) );
msg_exchange(State) ->
send_request( recv_reply(State) ).
@@ -273,9 +271,9 @@ msg_exchange(State) ->
finish(ok,
#{start := Start, bcnt := BCnt, num := Num}) ->
- Stop = t(),
+ Stop = ?T(),
throw(#{status => ok,
- runtime => tdiff(Start, Stop),
+ runtime => ?TDIFF(Start, Stop),
bcnt => BCnt,
cnt => Num});
finish(Reason,
@@ -283,9 +281,9 @@ finish(Reason,
sid := SID, rid := RID,
scnt := SCnt, rcnt := RCnt, bcnt := BCnt,
num := Num}) ->
- Stop = t(),
+ Stop = ?T(),
throw(#{status => Reason,
- runtime => tdiff(Start, Stop),
+ runtime => ?TDIFF(Start, Stop),
sid => SID,
rid => RID,
scnt => SCnt,
@@ -301,11 +299,6 @@ send_request(#{mod := Mod,
max_outstanding := MaxOutstanding,
msg_data := Data} = State)
when (MaxOutstanding > Outstanding) ->
- %% i("send request -> entry when"
- %% "~n ID: ~p"
- %% "~n Cnt: ~p"
- %% "~n Outstanding: ~p"
- %% "~n MaxOutstanding: ~p", [ID, Cnt, Outstanding, MaxOutstanding]),
SZ = size(Data),
Req = <<?TTEST_TAG:32,
?TTEST_TYPE_REQUEST:32,
@@ -314,12 +307,11 @@ send_request(#{mod := Mod,
Data/binary>>,
case Mod:send(Sock, Req) of
ok ->
- %% i("~w bytes sent", [size(Req)]),
State#{sid => next_id(ID),
scnt => Cnt + 1,
outstanding => Outstanding + 1};
{error, Reason} ->
- e("Failed sending request: ~p", [Reason]),
+ ?E("Failed sending request: ~p", [Reason]),
exit({send, Reason})
end;
send_request(State) ->
@@ -334,11 +326,6 @@ recv_reply(#{mod := Mod,
bcnt := BCnt,
rcnt := Cnt,
outstanding := Outstanding} = State) ->
- %% i("recv-reply(false) -> entry with"
- %% "~n (R)ID: ~p"
- %% "~n (R)Cnt: ~p"
- %% "~n BCnt: ~p"
- %% "~n Outstanding: ~p", [ID, Cnt, BCnt, Outstanding]),
case recv_reply_message1(Mod, Sock, ID) of
{ok, MsgSz} ->
State#{rid => next_id(ID),
@@ -347,7 +334,7 @@ recv_reply(#{mod := Mod,
outstanding => Outstanding - 1};
{error, timeout} ->
- i("recv_reply(false) -> error: timeout"),
+ ?I("receive timeout"),
State;
{error, Reason} ->
@@ -362,11 +349,6 @@ recv_reply(#{mod := Mod,
rcnt := RCnt,
outstanding := Outstanding,
acc := Acc} = State) ->
- %% i("recv-reply(~w) -> entry with"
- %% "~n (R)ID: ~p"
- %% "~n RCnt: ~p"
- %% "~n BCnt: ~p"
- %% "~n Outstanding: ~p", [Active, ID, RCnt, BCnt, Outstanding]),
case recv_reply_message2(Mod, Sock, ID, Acc) of
{ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) ->
maybe_activate(Mod, Sock, Active),
@@ -380,12 +362,12 @@ recv_reply(#{mod := Mod,
State;
{error, stop} ->
- i("recv_reply(~w) -> stop", [Active]),
+ ?I("receive [~w] -> stop", [Active]),
%% This will have the effect that no more requests are sent...
- State#{num => SCnt, stop_started => t()};
+ State#{num => SCnt, stop_started => ?T()};
{error, timeout} ->
- i("recv_reply(~w) -> error: timeout", [Active]),
+ ?I("receive[~w] -> timeout", [Active]),
State;
{error, Reason} ->
@@ -395,23 +377,19 @@ recv_reply(#{mod := Mod,
%% This function reads exactly one (reply) message. No more no less.
recv_reply_message1(Mod, Sock, ID) ->
- %% i("recv_reply_message1 -> entry with"
- %% "~n ID: ~w", [ID]),
case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
{ok, <<?TTEST_TAG:32,
?TTEST_TYPE_REPLY:32,
ID:32,
SZ:32>> = Hdr} ->
%% Receive the ping-pong reply boby
- %% i("recv_reply_message1 -> try read body"
- %% "~n ID: ~w", [ID]),
case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
{ok, Data} when (size(Data) =:= SZ) ->
{ok, size(Hdr) + size(Data)};
{error, Reason2} ->
- i("recv_reply_message1 -> body error: "
- "~n ~p: ~p", [Reason2]),
- {error, {recv_hdr, Reason2}}
+ ?E("Failed reading body: "
+ "~n ~p: ~p", [Reason2]),
+ {error, {recv_body, Reason2}}
end;
{ok, <<BadTag:32,
@@ -427,7 +405,7 @@ recv_reply_message1(Mod, Sock, ID) ->
{error, invalid_hdr};
{error, Reason1} ->
- i("recv_reply_message1 -> hdr error: "
+ ?E("Feiled reading header: "
"~n ~p", [Reason1]),
{error, {recv_hdr, Reason1}}
end.
@@ -437,8 +415,6 @@ recv_reply_message1(Mod, Sock, ID) ->
%% accumulated. If that is not enough for a (complete) reply, it
%% will attempt to receive more.
recv_reply_message2(Mod, Sock, ID, Acc) ->
- %% i("recv_reply_message2 -> entry with"
- %% "~n ID: ~w", [ID]),
case process_acc_data(ID, Acc) of
ok ->
%% No or insufficient data, so get more
@@ -456,7 +432,6 @@ recv_reply_message2(Mod, Sock, ID, Acc) ->
recv_reply_message3(_Mod, Sock, ID, Acc) ->
receive
{timeout, _TRef, stop} ->
- %% i("stop - when messages: ~p", [process_info(self(), messages)]),
{error, stop};
{TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse
@@ -469,7 +444,6 @@ recv_reply_message3(_Mod, Sock, ID, Acc) ->
{Tag, Sock, Msg} when (Tag =:= tcp) orelse
(Tag =:= socket) ->
- %% i("recv_reply_message3 -> got ~w byte message", [size(Msg)]),
process_acc_data(ID, <<Acc/binary, Msg/binary>>)
%% after ?RECV_TIMEOUT ->
@@ -482,9 +456,6 @@ process_acc_data(ID, <<?TTEST_TAG:32,
ID:32,
SZ:32,
Data/binary>>) when (SZ =< size(Data)) ->
- %% i("process_acc_data -> entry with"
- %% "~n ID: ~w"
- %% "~n SZ: ~w", [ID, SZ]),
<<_Body:SZ/binary, Rest/binary>> = Data,
{ok, {4*4+SZ, Rest}};
process_acc_data(ID, <<BadTag:32,
@@ -508,9 +479,9 @@ process_acc_data(_ID, _Data) ->
handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) ->
receive
{timeout, _TRef, stop} ->
- i("stop"),
+ ?I("STOP"),
%% This will have the effect that no more requests are sent...
- State#{num => SCnt, stop_started => t()};
+ State#{num => SCnt, stop_started => ?T()};
{?MODULE, Ref, Parent, stop} ->
%% This *aborts* the test
@@ -571,47 +542,47 @@ next_id(_) ->
%% ==========================================================================
-t() ->
- os:timestamp().
+%% t() ->
+%% os:timestamp().
-tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
- T1 = A1*1000000000+B1*1000+(C1 div 1000),
- T2 = A2*1000000000+B2*1000+(C2 div 1000),
- T2 - T1.
+%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+%% T1 = A1*1000000000+B1*1000+(C1 div 1000),
+%% T2 = A2*1000000000+B2*1000+(C2 div 1000),
+%% T2 - T1.
-formated_timestamp() ->
- format_timestamp(os:timestamp()).
+%% formated_timestamp() ->
+%% format_timestamp(os:timestamp()).
-format_timestamp({_N1, _N2, N3} = TS) ->
- {_Date, Time} = calendar:now_to_local_time(TS),
- {Hour,Min,Sec} = Time,
- FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
- [Hour, Min, Sec, round(N3/1000)]),
- lists:flatten(FormatTS).
+%% format_timestamp({_N1, _N2, N3} = TS) ->
+%% {_Date, Time} = calendar:now_to_local_time(TS),
+%% {Hour,Min,Sec} = Time,
+%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+%% [Hour, Min, Sec, round(N3/1000)]),
+%% lists:flatten(FormatTS).
-%% Time is always in number os ms (milli seconds)
-format_time(T) ->
- f("~p", [T]).
+%% %% Time is always in number os ms (milli seconds)
+%% format_time(T) ->
+%% f("~p", [T]).
%% ==========================================================================
-f(F, A) ->
- lists:flatten(io_lib:format(F, A)).
+%% f(F, A) ->
+%% lists:flatten(io_lib:format(F, A)).
-%% e(F) ->
-%% i("<ERROR> " ++ F).
+%% %% e(F) ->
+%% %% i("<ERROR> " ++ F).
-e(F, A) ->
- p(get(sname), "<ERROR> " ++ F, A).
+%% e(F, A) ->
+%% p(get(sname), "<ERROR> " ++ F, A).
-i(F) ->
- i(F, []).
+%% i(F) ->
+%% i(F, []).
-i(F, A) ->
- p(get(sname), "<INFO> " ++ F, A).
+%% i(F, A) ->
+%% p(get(sname), "<INFO> " ++ F, A).
-p(undefined, F, A) ->
- p("- ", F, A);
-p(Prefix, F, A) ->
- io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
+%% p(undefined, F, A) ->
+%% p("- ", F, A);
+%% p(Prefix, F, A) ->
+%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
index 0d2ec38876..ef980e8d32 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl
@@ -21,23 +21,24 @@
-module(socket_test_ttest_tcp_client_socket).
-export([
- start_monitor/3, start_monitor/4, start_monitor/6,
+ start_monitor/4, start_monitor/5, start_monitor/7,
stop/1
]).
-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket).
+-define(MOD(M), {?TRANSPORT_MOD, #{method => Method}}).
-start_monitor(Active, Addr, Port) ->
- socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+start_monitor(Method, Active, Addr, Port) ->
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Method),
Active, Addr, Port).
-start_monitor(Active, Addr, Port, MsgID) ->
- socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+start_monitor(Method, Active, Addr, Port, MsgID) ->
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Method),
Active, Addr, Port,
MsgID).
-start_monitor(Active, Addr, Port, MsgID, MaxOutstanding, RunTime) ->
- socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD,
+start_monitor(Method, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) ->
+ socket_test_ttest_tcp_client:start_monitor(?MOD(Method),
Active, Addr, Port,
MsgID, MaxOutstanding, RunTime).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl
index cb503a1feb..b248b063a3 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_server.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl
@@ -44,13 +44,25 @@
-define(ACC_TIMEOUT, 10000).
-define(RECV_TIMEOUT, 10000).
+-define(LIB, socket_test_ttest_lib).
+-define(I(F), ?LIB:info(F)).
+-define(I(F,A), ?LIB:info(F, A)).
+-define(E(F,A), ?LIB:error(F, A)).
+-define(F(F,A), ?LIB:format(F, A)).
+-define(FORMAT_TIME(T), ?LIB:format_time(T)).
+-define(T(), ?LIB:t()).
+-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)).
+
%% ==========================================================================
-start_monitor(Mod, Active)
- when is_atom(Mod) andalso is_boolean(Active) orelse (Active =:= once) ->
+start_monitor(Transport, Active)
+ when (is_atom(Transport) orelse is_tuple(Transport)) andalso
+ (is_boolean(Active) orelse (Active =:= once)) ->
Self = self(),
- ServerInit = fun() -> put(sname, "server"), server_init(Self, Mod, Active) end,
+ ServerInit = fun() -> put(sname, "server"),
+ server_init(Self, Transport, Active)
+ end,
{Pid, MRef} = spawn_monitor(ServerInit),
receive
{'DOWN', MRef, process, Pid, normal} ->
@@ -73,26 +85,28 @@ stop(Pid) when is_pid(Pid) ->
%% ==========================================================================
-server_init(Parent, Mod, Active) ->
- i("init -> entry with"
- "~n Parent: ~p"
- "~n Mod: ~p"
- "~n Active: ~p", [Parent, Mod, Active]),
- case Mod:listen(0) of
+server_init(Parent, Transport, Active) ->
+ ?I("init -> entry with"
+ "~n Parent: ~p"
+ "~n Transport: ~p"
+ "~n Active: ~p", [Parent, Transport, Active]),
+ {Mod, Listen, StatsInterval} = process_transport(Transport, Active),
+ case Listen(0) of
{ok, LSock} ->
case Mod:port(LSock) of
{ok, Port} = OK ->
Addr = which_addr(), % This is just for convenience
- i("init -> listening on:"
- "~n Addr: ~p (~s)"
- "~n Port: ~w"
- "~n", [Addr, inet:ntoa(Addr), Port]),
+ ?I("init -> listening on:"
+ "~n Addr: ~p (~s)"
+ "~n Port: ~w"
+ "~n", [Addr, inet:ntoa(Addr), Port]),
Parent ! {?MODULE, self(), OK},
- server_loop(#{parent => Parent,
- mod => Mod,
- active => Active,
- lsock => LSock,
- handlers => [],
+ server_loop(#{parent => Parent,
+ mod => Mod,
+ active => Active,
+ lsock => LSock,
+ handlers => [],
+ stats_interval => StatsInterval,
%% Accumulation
runtime => 0,
mcnt => 0,
@@ -107,37 +121,40 @@ server_init(Parent, Mod, Active) ->
exit({listen, LReason})
end.
+process_transport(Mod, _) when is_atom(Mod) ->
+ {Mod, fun(Port) -> Mod:listen(Port) end, infinity};
+process_transport({Mod, #{stats_interval := T} = Opts}, Active)
+ when (Active =/= false) ->
+ {Mod, fun(Port) -> Mod:listen(Port, Opts#{stats_to => self()}) end, T};
+process_transport({Mod, #{stats_interval := T} = Opts}, _Active) ->
+ {Mod, fun(Port) -> Mod:listen(Port, Opts) end, T};
+process_transport({Mod, Opts}, _Active) ->
+ {Mod, fun(Port) -> Mod:listen(Port, Opts) end, infinity}.
+
+
server_loop(State) ->
- %% i("loop -> enter"),
server_loop( server_handle_message( server_accept(State) ) ).
server_accept(#{mod := Mod,
active := Active,
lsock := LSock,
handlers := Handlers} = State) ->
-
- %% i("server-accept -> entry with"
- %% "~n Mod: ~p"
- %% "~n Active: ~p"
- %% "~n LSock: ~p", [Mod, Active, LSock]),
-
case Mod:accept(LSock, ?ACC_TIMEOUT) of
{ok, Sock} ->
- %% i("server-accept -> accepted: "
- %% "~n Sock: ~p", [Sock]),
- i("accepted connection from ~s",
- [case Mod:peername(Sock) of
- {ok, Peer} ->
- format_peername(Peer);
- {error, _} ->
- "-"
- end]),
+ ?I("accepted connection from ~s",
+ [case Mod:peername(Sock) of
+ {ok, Peer} ->
+ format_peername(Peer);
+ {error, _} ->
+ "-"
+ end]),
{Pid, _} = handler_start(),
- i("handler ~p started -> try transfer socket control", [Pid]),
+ ?I("handler ~p started -> try transfer socket control", [Pid]),
case Mod:controlling_process(Sock, Pid) of
ok ->
- i("server-accept: handler ~p started", [Pid]),
+ maybe_start_stats_timer(State, Pid),
+ ?I("server-accept: handler ~p started", [Pid]),
handler_continue(Pid, Mod, Sock, Active),
Handlers2 = [Pid | Handlers],
State#{handlers => Handlers2};
@@ -156,14 +173,31 @@ server_accept(#{mod := Mod,
format_peername({Addr, Port}) ->
case inet:gethostbyaddr(Addr) of
{ok, #hostent{h_name = N}} ->
- f("~s (~s:~w)", [N, inet:ntoa(Addr), Port]);
+ ?F("~s (~s:~w)", [N, inet:ntoa(Addr), Port]);
{error, _} ->
- f("~p, ~p", [Addr, Port])
+ ?F("~p, ~p", [Addr, Port])
end.
-
+
+maybe_start_stats_timer(#{active := Active, stats_interval := Time}, Handler)
+ when (Active =/= false) andalso (is_integer(Time) andalso (Time > 0)) ->
+ start_stats_timer(Time, "handler", Handler);
+maybe_start_stats_timer(_, _) ->
+ ok.
+
+start_stats_timer(Time, ProcStr, Pid) ->
+ erlang:start_timer(Time, self(), {stats, Time, ProcStr, Pid}).
+
server_handle_message(#{parent := Parent, handlers := H} = State) ->
- %% i("server_handle_message -> enter"),
receive
+ {timeout, _TRef, {stats, Interval, ProcStr, Pid}} ->
+ case server_handle_stats(ProcStr, Pid) of
+ ok ->
+ start_stats_timer(Interval, ProcStr, Pid);
+ skip ->
+ ok
+ end,
+ State;
+
{?MODULE, Ref, Parent, stop} ->
reply(Parent, Ref, ok),
lists:foreach(fun(P) -> handler_stop(P) end, H),
@@ -176,11 +210,20 @@ server_handle_message(#{parent := Parent, handlers := H} = State) ->
State
end.
+server_handle_stats(ProcStr, Pid) ->
+ case ?LIB:formated_process_stats(Pid) of
+ "" ->
+ skip;
+ FormatedStats ->
+ ?I("Statistics for ~s ~p:~s", [ProcStr, Pid, FormatedStats]),
+ ok
+ end.
+
server_handle_down(Pid, Reason, #{handlers := Handlers} = State) ->
case lists:delete(Pid, Handlers) of
Handlers ->
- i("unknown process ~p died", [Pid]),
+ ?I("unknown process ~p died", [Pid]),
State;
Handlers2 ->
server_handle_handler_down(Pid, Reason, State#{handlers => Handlers2})
@@ -197,35 +240,35 @@ server_handle_handler_down(Pid,
AccMCnt2 = AccMCnt + MCnt,
AccBCnt2 = AccBCnt + BCnt,
AccHCnt2 = AccHCnt + 1,
- i("handler ~p (~w) done => accumulated results: "
- "~n Run Time: ~s ms"
- "~n Message Count: ~s"
- "~n Byte Count: ~s",
- [Pid, AccHCnt2,
- format_time(AccRunTime2),
- if (AccRunTime2 > 0) ->
- f("~w => ~w (~w) msgs / ms",
- [AccMCnt2,
- AccMCnt2 div AccRunTime2,
- (AccMCnt2 div AccHCnt2) div AccRunTime2]);
- true ->
- f("~w", [AccMCnt2])
- end,
- if (AccRunTime2 > 0) ->
- f("~w => ~w (~w) bytes / ms",
- [AccBCnt2,
- AccBCnt2 div AccRunTime2,
- (AccBCnt2 div AccHCnt2) div AccRunTime2]);
- true ->
- f("~w", [AccBCnt2])
- end]),
+ ?I("handler ~p (~w) done => accumulated results: "
+ "~n Run Time: ~s ms"
+ "~n Message Count: ~s"
+ "~n Byte Count: ~s",
+ [Pid, AccHCnt2,
+ ?FORMAT_TIME(AccRunTime2),
+ if (AccRunTime2 > 0) ->
+ ?F("~w => ~w (~w) msgs / ms",
+ [AccMCnt2,
+ AccMCnt2 div AccRunTime2,
+ (AccMCnt2 div AccHCnt2) div AccRunTime2]);
+ true ->
+ ?F("~w", [AccMCnt2])
+ end,
+ if (AccRunTime2 > 0) ->
+ ?F("~w => ~w (~w) bytes / ms",
+ [AccBCnt2,
+ AccBCnt2 div AccRunTime2,
+ (AccBCnt2 div AccHCnt2) div AccRunTime2]);
+ true ->
+ ?F("~w", [AccBCnt2])
+ end]),
State#{runtime => AccRunTime2,
mcnt => AccMCnt2,
bcnt => AccBCnt2,
hcnt => AccHCnt2};
server_handle_handler_down(Pid, Reason, State) ->
- i("handler ~p terminated: "
- "~n ~p", [Pid, Reason]),
+ ?I("handler ~p terminated: "
+ "~n ~p", [Pid, Reason]),
State.
@@ -244,32 +287,31 @@ handler_stop(Pid) ->
req(Pid, stop).
handler_init(Parent) ->
- i("starting"),
+ ?I("starting"),
receive
{?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} ->
- i("received continue"),
+ ?I("received continue"),
reply(Parent, Ref, ok),
handler_initial_activation(Mod, Sock, Active),
handler_loop(#{parent => Parent,
mod => Mod,
sock => Sock,
active => Active,
- start => t(),
+ start => ?T(),
mcnt => 0,
bcnt => 0,
last_reply => none,
acc => <<>>})
after 5000 ->
- i("timeout when message queue: "
- "~n ~p"
- "~nwhen"
- "~n Parent: ~p", [process_info(self(), messages), Parent]),
+ ?I("timeout when message queue: "
+ "~n ~p"
+ "~nwhen"
+ "~n Parent: ~p", [process_info(self(), messages), Parent]),
handler_init(Parent)
end.
handler_loop(State) ->
- %% i("handler-loop"),
handler_loop( handler_handle_message( handler_recv_message(State) ) ).
%% When passive, we read *one* request and then attempt to reply to it.
@@ -278,7 +320,6 @@ handler_recv_message(#{mod := Mod,
active := false,
mcnt := MCnt,
bcnt := BCnt} = State) ->
- %% i("handler_recv_message(false) -> entry"),
case handler_recv_message2(Mod, Sock) of
{ok, {MsgSz, ID, Body}} ->
handler_send_reply(Mod, Sock, ID, Body),
@@ -305,7 +346,6 @@ handler_recv_message(#{mod := Mod,
bcnt := BCnt,
last_reply := LID,
acc := Acc} = State) ->
- %% i("handler_recv_message(~w) -> entry", [Active]),
case handler_recv_message3(Mod, Sock, Acc, LID) of
{ok, {MCnt2, BCnt2, LID2}, NewAcc} ->
handler_maybe_activate(Mod, Sock, Active),
@@ -319,12 +359,12 @@ handler_recv_message(#{mod := Mod,
(size(Acc) =:= 0) ->
handler_done(State);
true ->
- e("client done with partial message: "
- "~n Last Reply Sent: ~w"
- "~n Message Count: ~w"
- "~n Byte Count: ~w"
- "~n Partial Message: ~w bytes",
- [LID, MCnt, BCnt, size(Acc)]),
+ ?E("client done with partial message: "
+ "~n Last Reply Sent: ~w"
+ "~n Message Count: ~w"
+ "~n Byte Count: ~w"
+ "~n Partial Message: ~w bytes",
+ [LID, MCnt, BCnt, size(Acc)]),
exit({closed_with_partial_message, LID})
end;
@@ -355,32 +395,27 @@ handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) ->
handler_recv_message2(Mod, Sock) ->
- %% i("handler_recv_message2 -> entry"),
case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
{ok, <<?TTEST_TAG:32,
?TTEST_TYPE_REQUEST:32,
ID:32,
SZ:32>> = Hdr} ->
- %% i("handler_recv_message2 -> got request header: "
- %% "~n ID: ~p"
- %% "~n SZ: ~p", [ID, SZ]),
case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
{ok, Body} when (SZ =:= size(Body)) ->
- %% i("handler_recv_message2 -> got body"),
{ok, {size(Hdr) + size(Body), ID, Body}};
{error, BReason} ->
- e("failed reading body (~w) of message ~w:"
- "~n ~p", [SZ, ID, BReason]),
+ ?E("failed reading body (~w) of message ~w:"
+ "~n ~p", [SZ, ID, BReason]),
exit({recv, body, ID, SZ, BReason})
end;
{error, timeout} = ERROR ->
- i("handler_recv_message2 -> timeout"),
+ ?I("timeout"),
ERROR;
{error, closed} = ERROR ->
ERROR;
{error, HReason} ->
- e("failed reading header of message:"
- "~n ~p", [HReason]),
+ ?E("Failed reading header of message:"
+ "~n ~p", [HReason]),
exit({recv, header, HReason})
end.
@@ -405,116 +440,6 @@ handler_recv_message3(Mod, Sock, Acc, LID) ->
-%% %% Socket in passive mode => we need to read explicitly
-%% handler_loop(#{sock := Sock, active := false} = State, MCnt, BCnt) ->
-%% %% i("try recv msg header"),
-%% MsgSz =
-%% case gen_tcp:recv(Sock, 4*4) of
-%% {ok, <<?SOCKET_SIMPLE_TAG:32,
-%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
-%% ID:32,
-%% SZ:32>> = Hdr} ->
-%% %% i("msg ~w header received (data sz is ~w bytes)", [ID, SZ]),
-%% case gen_tcp:recv(Sock, SZ) of
-%% {ok, Data} when (size(Data) =:= SZ) ->
-%% handler_send_reply(Sock, ID, Data),
-%% size(Hdr)+SZ;
-%% {ok, InvData} ->
-%% i("invalid data"),
-%% (catch gen_tcp:close(Sock)),
-%% exit({invalid_data, SZ, size(InvData)});
-%% {error, Reason2} ->
-%% i("Data read failed: ~p", [Reason2]),
-%% (catch gen_tcp:close(Sock)),
-%% exit({recv_data, Reason2})
-%% end;
-%% {ok, _InvHdr} ->
-%% (catch gen_tcp:close(Sock)),
-%% exit(invalid_hdr);
-%% {error, closed} ->
-%% i("we are done: "
-%% "~n Message Count: ~p"
-%% "~n Byte Count: ~p", [MCnt, BCnt]),
-%% exit(normal);
-%% {error, Reason1} ->
-%% i("Header read failed: ~p", [Reason1]),
-%% (catch gen_tcp:close(Sock)),
-%% exit({recv_hdr, Reason1})
-%% end,
-%% handler_loop(State, MCnt+1, BCnt+MsgSz);
-
-%% %% Socket in active mode (once | true) => data messages arrive
-%% handler_loop(#{sock := Sock, active := Active} = State, MCnt, BCnt) ->
-%% %% i("await msg"),
-%% try handler_recv_request(Sock, Active) of
-%% {ID, Data, MsgSz} ->
-%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]),
-%% handler_send_reply(Sock, ID, Data),
-%% handler_maybe_activate(Sock, Active),
-%% handler_loop(State, MCnt+1, BCnt+MsgSz)
-%% catch
-%% throw:tcp_closed ->
-%% i("we are done: "
-%% "~n Message Count: ~p"
-%% "~n Byte Count: ~p", [MCnt, BCnt]),
-%% exit(normal);
-%% throw:{tcp_error, Reason} ->
-%% i("<ERROR> TCP error ~p when: "
-%% "~n Message Count: ~p"
-%% "~n Byte Count: ~p", [Reason, MCnt, BCnt]),
-%% exit({tcp_error, Reason})
-%% end.
-%% %% {ID, Data, MsgSz} = handler_recv_request(Sock, Active),
-%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]),
-%% %% handler_send_reply(Sock, ID, Data),
-%% %% handler_maybe_activate(Sock, Active),
-%% %% handler_loop(State, MCnt+1, BCnt+MsgSz).
-
-
-%% handler_recv_request(Sock, Active) ->
-%% %% In theory we should also be ready for a partial header,
-%% %% but I can't be bothered...
-%% receive
-%% {tcp_closed, Sock} ->
-%% throw(tcp_closed);
-%% {tcp_error, Sock, Reason} ->
-%% throw({tcp_error, Reason});
-%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32,
-%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
-%% ID:32,
-%% SZ:32,
-%% Data/binary>> = Msg} when (size(Data) =:= SZ) ->
-%% %% i("[complete] msg ~w received (data sz is ~w bytes)", [ID, SZ]),
-%% {ID, Data, size(Msg)};
-%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32,
-%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
-%% ID:32,
-%% SZ:32,
-%% Data/binary>> = Msg} when (size(Data) < SZ) ->
-%% %% i("[incomplete] msg ~w received (data sz is ~w bytes)", [ID, SZ]),
-%% handler_recv_request_data(Sock, Active, ID, SZ, Data, size(Msg))
-%% end.
-
-%% handler_recv_request_data(Sock, Active, ID, SZ, AccData, AccMsgSz) ->
-%% handler_maybe_activate(Sock, Active),
-%% receive
-%% {tcp_closed, Sock} ->
-%% %% i("we are done (incomplete data)"),
-%% throw(tcp_closed);
-%% {tcp_error, Sock, Reason} ->
-%% throw({tcp_error, Reason});
-%% {tcp, Sock, Data} when (SZ =:= (size(AccData) + size(Data))) ->
-%% %% i("[complete] received the remaining data (~w bytes) for msg ~w",
-%% %% [size(Data), ID]),
-%% {ID, <<AccData/binary, Data/binary>>, AccMsgSz+size(Data)};
-%% {tcp, Sock, Data} ->
-%% %% i("[incomplete] received ~w bytes of data for for msg ~w",
-%% %% [size(Data), ID]),
-%% handler_recv_request_data(Sock, Active, ID, SZ,
-%% <<AccData/binary, Data/binary>>,
-%% AccMsgSz+size(Data))
-%% end.
-
handler_send_reply(Mod, Sock, ID, Data) ->
SZ = size(Data),
Msg = <<?TTEST_TAG:32,
@@ -522,10 +447,8 @@ handler_send_reply(Mod, Sock, ID, Data) ->
ID:32,
SZ:32,
Data/binary>>,
- %% i("handler-send-reply -> try send reply ~w: ~w bytes", [ID, size(Msg)]),
case Mod:send(Sock, Msg) of
ok ->
- %% i("handler-send-reply -> reply ~w (~w bytes) sent", [ID, size(Msg)]),
ok;
{error, Reason} ->
(catch Mod:close(Sock)),
@@ -534,7 +457,7 @@ handler_send_reply(Mod, Sock, ID, Data) ->
handler_done(State) ->
- handler_done(State, t()).
+ handler_done(State, ?T()).
handler_done(#{start := Start,
mod := Mod,
@@ -542,7 +465,7 @@ handler_done(#{start := Start,
mcnt := MCnt,
bcnt := BCnt}, Stop) ->
(catch Mod:close(Sock)),
- exit({done, tdiff(Start, Stop), MCnt, BCnt}).
+ exit({done, ?TDIFF(Start, Stop), MCnt, BCnt}).
handler_handle_message(#{parent := Parent} = State) ->
@@ -634,45 +557,45 @@ reply(Pid, Ref, Reply) ->
%% ==========================================================================
-t() ->
- os:timestamp().
+%% t() ->
+%% os:timestamp().
-tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
- T1 = A1*1000000000+B1*1000+(C1 div 1000),
- T2 = A2*1000000000+B2*1000+(C2 div 1000),
- T2 - T1.
+%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+%% T1 = A1*1000000000+B1*1000+(C1 div 1000),
+%% T2 = A2*1000000000+B2*1000+(C2 div 1000),
+%% T2 - T1.
-formated_timestamp() ->
- format_timestamp(os:timestamp()).
+%% formated_timestamp() ->
+%% format_timestamp(os:timestamp()).
-format_timestamp({_N1, _N2, N3} = TS) ->
- {_Date, Time} = calendar:now_to_local_time(TS),
- {Hour,Min,Sec} = Time,
- FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
- [Hour, Min, Sec, round(N3/1000)]),
- lists:flatten(FormatTS).
+%% format_timestamp({_N1, _N2, N3} = TS) ->
+%% {_Date, Time} = calendar:now_to_local_time(TS),
+%% {Hour,Min,Sec} = Time,
+%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+%% [Hour, Min, Sec, round(N3/1000)]),
+%% lists:flatten(FormatTS).
-%% Time is always in number os ms (milli seconds)
-format_time(T) ->
- f("~p", [T]).
+%% %% Time is always in number os ms (milli seconds)
+%% format_time(T) ->
+%% f("~p", [T]).
%% ==========================================================================
-f(F, A) ->
- lists:flatten(io_lib:format(F, A)).
+%% f(F, A) ->
+%% lists:flatten(io_lib:format(F, A)).
-e(F, A) ->
- p(get(sname), "<ERROR> " ++ F, A).
+%% e(F, A) ->
+%% p(get(sname), "<ERROR> " ++ F, A).
-i(F) ->
- i(F, []).
+%% i(F) ->
+%% i(F, []).
-i(F, A) ->
- p(get(sname), "<INFO> " ++ F, A).
+%% i(F, A) ->
+%% p(get(sname), "<INFO> " ++ F, A).
-p(undefined, F, A) ->
- p("- ", F, A);
-p(Prefix, F, A) ->
- io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
+%% p(undefined, F, A) ->
+%% p("- ", F, A);
+%% p(Prefix, F, A) ->
+%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
index de9df857fe..d62f4d86a6 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl
@@ -21,14 +21,17 @@
-module(socket_test_ttest_tcp_server_socket).
-export([
- start_monitor/1,
+ start_monitor/2,
stop/1
]).
-define(TRANSPORT_MOD, socket_test_ttest_tcp_socket).
+%% -define(MOD(M), {?TRANSPORT_MOD, #{method => M,
+%% stats_interval => 10000}}).
+-define(MOD(M), {?TRANSPORT_MOD, #{method => M}}).
-start_monitor(Active) ->
- socket_test_ttest_tcp_server:start_monitor(?TRANSPORT_MOD, Active).
+start_monitor(Method, Active) ->
+ socket_test_ttest_tcp_server:start_monitor(?MOD(Method), Active).
stop(Pid) ->
socket_test_ttest_tcp_server:stop(Pid).
diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
index 12d9e052d7..f314d01a63 100644
--- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl
+++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl
@@ -24,9 +24,9 @@
accept/1, accept/2,
active/2,
close/1,
- connect/2,
+ connect/2, connect/3,
controlling_process/2,
- listen/0, listen/1,
+ listen/0, listen/1, listen/2,
port/1,
peername/1,
recv/2, recv/3,
@@ -38,25 +38,41 @@
-define(READER_RECV_TIMEOUT, 1000).
+-define(DATA_MSG(Sock, Method, Data),
+ {socket,
+ #{sock => Sock, reader => self(), method => Method},
+ Data}).
+
+-define(CLOSED_MSG(Sock, Method),
+ {socket_closed,
+ #{sock => Sock, reader => self(), method => Method}}).
+
+-define(ERROR_MSG(Sock, Method, Reason),
+ {socket_error,
+ #{sock => Sock, reader => self(), method => Method},
+ Reason}).
+
%% ==========================================================================
-accept(#{sock := LSock}) ->
+accept(#{sock := LSock, opts := #{method := Method} = Opts}) ->
case socket:accept(LSock) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}};
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
-accept(#{sock := LSock}, Timeout) ->
+accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) ->
case socket:accept(LSock, Timeout) of
{ok, Sock} ->
Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}};
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}};
{error, _} = ERROR ->
ERROR
end.
@@ -74,6 +90,9 @@ close(#{sock := Sock, reader := Pid}) ->
%% Create a socket and connect it to a peer
connect(Addr, Port) ->
+ connect(Addr, Port, #{method => plain}).
+
+connect(Addr, Port, #{method := Method} = Opts) ->
try
begin
Sock =
@@ -100,9 +119,10 @@ connect(Addr, Port) ->
(catch socket:close(Sock)),
throw({error, {connect, CReason}})
end,
- Self = self(),
- Reader = spawn(fun() -> reader_init(Self, Sock, false) end),
- {ok, #{sock => Sock, reader => Reader}}
+ Self = self(),
+ Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end),
+ maybe_start_stats_timer(Opts, Reader),
+ {ok, #{sock => Sock, reader => Reader, method => Method}}
end
catch
throw:ERROR:_ ->
@@ -110,6 +130,16 @@ connect(Addr, Port) ->
end.
+maybe_start_stats_timer(#{stats_to := Pid,
+ stats_interval := T},
+ Reader) when is_pid(Pid) ->
+ erlang:start_timer(T, Pid, {stats, T, "reader", Reader});
+maybe_start_stats_timer(O, _) ->
+ io:format("NO STATS: "
+ "~n ~p"
+ "~n", [O]),
+ ok.
+
controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
case socket:setopt(Sock, otp, controlling_process, NewPid) of
ok ->
@@ -125,9 +155,13 @@ controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
%% Create a listen socket
listen() ->
- listen(0).
+ listen(0, #{method => plain}).
-listen(Port) when is_integer(Port) andalso (Port >= 0) ->
+listen(Port) ->
+ listen(Port, #{method => plain}).
+listen(Port, #{method := Method} = Opts)
+ when (is_integer(Port) andalso (Port >= 0)) andalso
+ ((Method =:= plain) orelse (Method =:= msg)) ->
try
begin
Sock = case socket:open(inet, stream, tcp) of
@@ -152,7 +186,7 @@ listen(Port) when is_integer(Port) andalso (Port >= 0) ->
(catch socket:close(Sock)),
throw({error, {listen, LReason}})
end,
- {ok, #{sock => Sock}}
+ {ok, #{sock => Sock, opts => Opts}}
end
catch
throw:{error, Reason}:_ ->
@@ -178,14 +212,31 @@ peername(#{sock := Sock}) ->
end.
-recv(#{sock := Sock}, Length) ->
- socket:recv(Sock, Length).
-recv(#{sock := Sock}, Length, Timeout) ->
- socket:recv(Sock, Length, Timeout).
+recv(#{sock := Sock, method := plain}, Length) ->
+ socket:recv(Sock, Length);
+recv(#{sock := Sock, method := msg}, Length) ->
+ case socket:recvmsg(Sock, Length, 0, [], infinity) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+recv(#{sock := Sock, method := plain}, Length, Timeout) ->
+ socket:recv(Sock, Length, Timeout);
+recv(#{sock := Sock, method := msg}, Length, Timeout) ->
+ case socket:recvmsg(Sock, Length, 0, [], Timeout) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
-send(#{sock := Sock}, Length) ->
- socket:send(Sock, Length).
+send(#{sock := Sock, method := plain}, Bin) ->
+ socket:send(Sock, Bin);
+send(#{sock := Sock, method := msg}, Bin) ->
+ socket:sendmsg(Sock, #{iov => [Bin]}).
shutdown(#{sock := Sock}, How) ->
@@ -203,14 +254,16 @@ sockname(#{sock := Sock}) ->
%% ==========================================================================
-reader_init(ControllingProcess, Sock, Active)
+reader_init(ControllingProcess, Sock, Active, Method)
when is_pid(ControllingProcess) andalso
- (is_boolean(Active) orelse (Active =:= once)) ->
+ (is_boolean(Active) orelse (Active =:= once)) andalso
+ ((Method =:= plain) orelse (Method =:= msg)) ->
MRef = erlang:monitor(process, ControllingProcess),
reader_loop(#{ctrl_proc => ControllingProcess,
ctrl_proc_mref => MRef,
active => Active,
- sock => Sock}).
+ sock => Sock,
+ method => Method}).
%% Never read
@@ -245,10 +298,11 @@ reader_loop(#{active := false,
%% Read *once* and then change to false
reader_loop(#{active := once,
sock := Sock,
+ method := Method,
ctrl_proc := Pid} = State) ->
- case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ case do_recv(Method, Sock) of
{ok, Data} ->
- Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ Pid ! ?DATA_MSG(Sock, Method, Data),
reader_loop(State#{active => false});
{error, timeout} ->
receive
@@ -280,21 +334,22 @@ reader_loop(#{active := once,
end;
{error, closed} ->
- Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ Pid ! ?CLOSED_MSG(Sock, Method),
exit(normal);
{error, Reason} ->
- Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
exit(Reason)
end;
%% Read and forward data continuously
reader_loop(#{active := true,
sock := Sock,
+ method := Method,
ctrl_proc := Pid} = State) ->
- case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of
+ case do_recv(Method, Sock) of
{ok, Data} ->
- Pid ! {socket, #{sock => Sock, reader => self()}, Data},
+ Pid ! ?DATA_MSG(Sock, Method, Data),
reader_loop(State);
{error, timeout} ->
receive
@@ -326,20 +381,26 @@ reader_loop(#{active := true,
end;
{error, closed} ->
- Pid ! {socket_closed, #{sock => Sock, reader => self()}},
+ Pid ! ?CLOSED_MSG(Sock, Method),
exit(normal);
{error, Reason} ->
- Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason},
+ Pid ! ?ERROR_MSG(Sock, Method, Reason),
exit(Reason)
end.
-
+do_recv(plain, Sock) ->
+ socket:recv(Sock, 0, ?READER_RECV_TIMEOUT);
+do_recv(msg, Sock) ->
+ case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of
+ {ok, #{iov := [Bin]}} ->
+ {ok, Bin};
+ {error, _} = ERROR ->
+ ERROR
+ end.
%% ==========================================================================
-
-