From 7923c2c7ce4deba75d845501eb96ddce07dc5ea0 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 6 Dec 2018 12:40:37 +0100 Subject: [socket-nif] Valgrind: plugged memory leaks in nif_recv Running valgrind, found a couple of cases when allocated binaries where not released. Mostly when the recv failed for some reason. Also clear and free the environment associated with the socket (resource) (in the _dtor callback function). OTP-14831 --- erts/emulator/nifs/common/socket_int.h | 1 + erts/emulator/nifs/common/socket_nif.c | 41 ++++++++++++++++++++++++++++------ 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index 7d223b8259..ec17e45f25 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -378,6 +378,7 @@ extern ERL_NIF_TERM esock_atom_einval; #define ALLOC_BIN(SZ, BP) enif_alloc_binary((SZ), (BP)) #define REALLOC_BIN(SZ, BP) enif_realloc_binary((SZ), (BP)) +#define FREE_BIN(BP) enif_release_binary((BP)) #endif // SOCKET_INT_H__ diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 3da895e644..57ce92d77e 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -4233,8 +4233,13 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, descP->type = type; descP->protocol = protocol; + /* + * Should we keep track of sockets (resources) in some way? + * Doing it here will require mutex to ensure data integrity, + * which will be costly. Send it somewhere? + */ res = enif_make_resource(env, descP); - enif_release_resource(descP); // We should really store a reference ... + enif_release_resource(descP); /* Keep track of the creator * This should not be a problem but just in case @@ -13592,6 +13597,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, res); + FREE_BIN(bufP); + return res; } @@ -13633,6 +13640,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) return esock_make_error_str(env, xres); + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ data = MKBIN(env, bufP); SSDBG( descP, @@ -13664,6 +13674,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_update_current_reader(env, descP); + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ data = MKBIN(env, bufP); return esock_make_ok3(env, atom_true, data); @@ -13707,6 +13720,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, (ERL_NIF_SELECT_STOP), descP, NULL, recvRef); + FREE_BIN(bufP); + return res; } else if ((saveErrno == ERRNO_BLOCK) || @@ -13721,16 +13736,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT for more\r\n") ); - /* - SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); - */ - sres = enif_select(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); - SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) ); + SSDBG( descP, + ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) ); + FREE_BIN(bufP); + return esock_make_error(env, esock_atom_eagain); } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); @@ -13740,6 +13753,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, res); + FREE_BIN(bufP); + return res; } @@ -13768,6 +13783,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_update_current_reader(env, descP); + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ data = MKBIN(env, bufP); data = MKSBIN(env, data, 0, read); @@ -13792,6 +13810,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, cnt_inc(&descP->readByteCnt, read); + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ data = MKBIN(env, bufP); data = MKSBIN(env, data, 0, read); @@ -15713,6 +15734,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) { char buf[64]; /* Buffer used for building the mutex name */ + // This needs to be released when the socket is closed! descP->env = enif_alloc_env(); sprintf(buf, "socket[w,%d]", sock); @@ -16870,10 +16892,15 @@ void socket_dtor(ErlNifEnv* env, void* obj) { SocketDescriptor* descP = (SocketDescriptor*) obj; + enif_clear_env(descP->env); + enif_free_env(descP->env); + descP->env = NULL; + MDESTROY(descP->writeMtx); MDESTROY(descP->readMtx); MDESTROY(descP->accMtx); MDESTROY(descP->closeMtx); + } -- cgit v1.2.3 From c5378517cccf29d1708c71a0949664605743b478 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 6 Dec 2018 14:24:06 +0100 Subject: [socket-nif] Valgrind: plugged memory leaks in nif_[recvfromsock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); + FREE_BIN(bufP); + return esock_make_error(env, esock_atom_eagain); + } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); @@ -13906,6 +13911,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, res); + FREE_BIN(bufP); + return res; } @@ -13983,6 +13990,9 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, * *We* do never actually try to read 0 bytes from a stream socket! */ + + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return esock_make_error(env, atom_closed); } @@ -14023,6 +14033,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, (ERL_NIF_SELECT_STOP), descP, NULL, recvRef); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return res; } else if ((saveErrno == ERRNO_BLOCK) || @@ -14037,7 +14049,10 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return esock_make_error(env, esock_atom_eagain); + } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); @@ -14047,6 +14062,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, res); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return res; } @@ -14078,6 +14095,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, recv_update_current_reader(env, descP); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return esock_make_error_str(env, xres); } else { -- cgit v1.2.3 From d0c3f79d22b4778f66ac1d8a2fc03e736f42e973 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Fri, 7 Dec 2018 18:40:24 +0100 Subject: [socket-nif|test] ttest improvements Added a ttest lib module for some common functions. Added a process (server handler and reader processes) stats printouts. So far only used by the server. There is still a "leak". Its a term leak. Some of the functions take a ref as argument (recv, send and accept for instance). This is stored internally, by way of a call to the enif_make_copy, in order to be used later in a select call. Its not "released" though, until the environment is released, which happens when the socket dtor callback function is called. Possible solution: We need to keep "temporary" environments (one for each of the queues), which we can clear (basically we need two, one that is currently used for new ref's and one for the old ref's). OTP-14831 --- erts/emulator/test/Makefile | 1 + erts/emulator/test/socket_test_ttest_lib.erl | 125 +++++++ .../emulator/test/socket_test_ttest_tcp_client.erl | 261 ++++++-------- .../test/socket_test_ttest_tcp_client_socket.erl | 15 +- .../emulator/test/socket_test_ttest_tcp_server.erl | 391 +++++++++------------ .../test/socket_test_ttest_tcp_server_socket.erl | 9 +- .../emulator/test/socket_test_ttest_tcp_socket.erl | 129 +++++-- 7 files changed, 508 insertions(+), 423 deletions(-) create mode 100644 erts/emulator/test/socket_test_ttest_lib.erl diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index a910588381..09bfe6f104 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -34,6 +34,7 @@ SOCKET_MODULES = \ socket_client \ socket_test_lib \ socket_test_evaluator \ + socket_test_ttest_lib \ socket_test_ttest_tcp_gen \ socket_test_ttest_tcp_socket \ socket_test_ttest_tcp_client \ diff --git a/erts/emulator/test/socket_test_ttest_lib.erl b/erts/emulator/test/socket_test_ttest_lib.erl new file mode 100644 index 0000000000..71679bc6d6 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_lib.erl @@ -0,0 +1,125 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_lib). + +-compile({no_auto_import, [error/2]}). + +-export([ + t/0, tdiff/2, + formated_timestamp/0, format_timestamp/1, + format_time/1, + + formated_process_stats/1, formated_process_stats/2, + + format/2, + error/1, error/2, + info/1, info/2 + ]). + +%% ========================================================================== + +t() -> + os:timestamp(). + +tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> + T1 = A1*1000000000+B1*1000+(C1 div 1000), + T2 = A2*1000000000+B2*1000+(C2 div 1000), + T2 - T1. + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp({_N1, _N2, N3} = TS) -> + {_Date, Time} = calendar:now_to_local_time(TS), + {Hour,Min,Sec} = Time, + FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", + [Hour, Min, Sec, round(N3/1000)]), + lists:flatten(FormatTS). + +%% Time is always in number os ms (milli seconds) +%% At some point, we should convert this to a more readable format... +format_time(T) -> + format("~p", [T]). + + +formated_process_stats(Pid) -> + formated_process_stats("", Pid). + +formated_process_stats(Prefix, Pid) when is_list(Prefix) andalso is_pid(Pid) -> + try + begin + TotHeapSz = pi(Pid, total_heap_size), + HeapSz = pi(Pid, heap_size), + StackSz = pi(Pid, stack_size), + Reds = pi(Pid, reductions), + GCInfo = pi(Pid, garbage_collection), + MinBinVHeapSz = proplists:get_value(min_bin_vheap_size, GCInfo), + MinHeapSz = proplists:get_value(min_heap_size, GCInfo), + MinGCS = proplists:get_value(minor_gcs, GCInfo), + format("~n ~sTotal Heap Size: ~p" + "~n ~sHeap Size: ~p" + "~n ~sStack Size: ~p" + "~n ~sReductions: ~p" + "~n ~s[GC] Min Bin VHeap Size: ~p" + "~n ~s[GC] Min Heap Size: ~p" + "~n ~s[GC] Minor GCS: ~p", + [Prefix, TotHeapSz, + Prefix, HeapSz, + Prefix, StackSz, + Prefix, Reds, + Prefix, MinBinVHeapSz, + Prefix, MinHeapSz, + Prefix, MinGCS]) + end + catch + _:_:_ -> + "" + end. + + +pi(Pid, Item) -> + {Item, Info} = process_info(Pid, Item), + Info. + + + +%% ========================================================================== + +format(F, A) -> + lists:flatten(io_lib:format(F, A)). + +error(F) -> + error(F, []). + +error(F, A) -> + print(get(sname), " " ++ F, A). + +info(F) -> + info(F, []). + +info(F, A) -> + print(get(sname), " " ++ F, A). + +print(undefined, F, A) -> + print("- ", F, A); +print(Prefix, F, A) -> + io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). + diff --git a/erts/emulator/test/socket_test_ttest_tcp_client.erl b/erts/emulator/test/socket_test_ttest_tcp_client.erl index 1bd1bc54e9..9c43c41841 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_client.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_client.erl @@ -54,6 +54,14 @@ -define(MAX_OUTSTANDING_DEFAULT_2, 10). -define(MAX_OUTSTANDING_DEFAULT_3, 3). +-define(LIB, socket_test_ttest_lib). +-define(I(F), ?LIB:info(F)). +-define(I(F,A), ?LIB:info(F, A)). +-define(E(F,A), ?LIB:error(F, A)). +-define(F(F,A), ?LIB:format(F, A)). +-define(FORMAT_TIME(T), ?LIB:format_time(T)). +-define(T(), ?LIB:t()). +-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)). -type active() :: once | boolean(). -type msg_id() :: 1..3. @@ -63,42 +71,28 @@ %% ========================================================================== --spec start_monitor(Mod, Active, Addr, Port) -> term() when - Mod :: atom(), - Active :: active(), - Addr :: inet:ip_address(), - Port :: inet:port_number(). +start_monitor(Transport, Active, Addr, Port) -> + start_monitor(Transport, Active, Addr, Port, ?MSG_ID_DEFAULT). %% RunTime is in number of ms. -start_monitor(Mod, Active, Addr, Port) -> - start_monitor(Mod, Active, Addr, Port, ?MSG_ID_DEFAULT). - --spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when - Mod :: atom(), - Active :: active(), - Addr :: inet:ip_address(), - Port :: inet:port_number(), - MsgID :: msg_id(). - -%% RunTime is in number of ms. -start_monitor(Mod, Active, Addr, Port, 1 = MsgID) -> - start_monitor(Mod, Active, Addr, Port, MsgID, +start_monitor(Transport, Active, Addr, Port, 1 = MsgID) -> + start_monitor(Transport, Active, Addr, Port, MsgID, ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT); -start_monitor(Mod, Active, Addr, Port, 2 = MsgID) -> - start_monitor(Mod, Active, Addr, Port, MsgID, +start_monitor(Transport, Active, Addr, Port, 2 = MsgID) -> + start_monitor(Transport, Active, Addr, Port, MsgID, ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT); -start_monitor(Mod, Active, Addr, Port, 3 = MsgID) -> - start_monitor(Mod, Active, Addr, Port, MsgID, +start_monitor(Transport, Active, Addr, Port, 3 = MsgID) -> + start_monitor(Transport, Active, Addr, Port, MsgID, ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT). --spec start_monitor(Mod, +-spec start_monitor(Transport, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> term() when - Mod :: atom(), + Transport :: atom() | tuple(), Active :: active(), Addr :: inet:ip_address(), Port :: inet:port_number(), @@ -107,9 +101,9 @@ start_monitor(Mod, Active, Addr, Port, 3 = MsgID) -> RunTime :: runtime(). %% RunTime is in number of ms. -start_monitor(Mod, Active, Addr, Port, +start_monitor(Transport, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) - when is_atom(Mod) andalso + when (is_atom(Transport) orelse is_tuple(Transport)) andalso (is_boolean(Active) orelse (Active =:= once)) andalso is_tuple(Addr) andalso (is_integer(Port) andalso (Port > 0)) andalso @@ -119,7 +113,7 @@ start_monitor(Mod, Active, Addr, Port, Self = self(), ClientInit = fun() -> put(sname, "client"), init(Self, - Mod, Active, Addr, Port, + Transport, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) end, {Pid, MRef} = spawn_monitor(ClientInit), @@ -147,28 +141,29 @@ stop(Pid) when is_pid(Pid) -> %% ========================================================================== -init(Parent, Mod, Active, Addr, Port, +init(Parent, Transport, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> - i("init -> entry with" - "~n Parent: ~p" - "~n Mod: ~p" - "~n Active: ~p" - "~n Addr: ~s" - "~n Port: ~p" - "~n Msg ID: ~p (=> 16 + ~w bytes)" - "~n Max Outstanding: ~p" - "~n (Suggested) Run Time: ~p ms", - [Parent, - Mod, Active, inet:ntoa(Addr), Port, - MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]), - case Mod:connect(Addr, Port) of + ?I("init with" + "~n Parent: ~p" + "~n Transport: ~p" + "~n Active: ~p" + "~n Addr: ~s" + "~n Port: ~p" + "~n Msg ID: ~p (=> 16 + ~w bytes)" + "~n Max Outstanding: ~p" + "~n (Suggested) Run Time: ~p ms", + [Parent, + Transport, Active, inet:ntoa(Addr), Port, + MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]), + {Mod, Connect} = process_transport(Transport), + case Connect(Addr, Port) of {ok, Sock} -> - i("init -> connected"), + ?I("connected"), Parent ! {?MODULE, self(), ok}, initial_activation(Mod, Sock, Active), Results = loop(#{slogan => run, runtime => RunTime, - start => t(), + start => ?T(), parent => Parent, mod => Mod, sock => Sock, @@ -187,10 +182,16 @@ init(Parent, Mod, Active, Addr, Port, (catch Mod:close(Sock)), exit(normal); {error, Reason} -> - i("init -> connect failed: ~p", [Reason]), + ?E("connect failed: ~p", [Reason]), exit({connect, Reason}) end. +process_transport(Mod) when is_atom(Mod) -> + {Mod, fun(A, P) -> Mod:connect(A, P) end}; +process_transport({Mod, Opts}) -> + {Mod, fun(A, P) -> Mod:connect(A, P, Opts) end}. + + which_msg_data(1) -> ?MSG_DATA1; which_msg_data(2) -> ?MSG_DATA2; which_msg_data(3) -> ?MSG_DATA3. @@ -200,21 +201,21 @@ present_results(#{status := ok, runtime := RunTime, bcnt := ByteCnt, cnt := NumIterations}) -> - i("Results: " - "~n Run Time: ~s" - "~n ByteCnt: ~s" - "~n NumIterations: ~s", - [format_time(RunTime), - if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) -> - f("~w, ~w", [ByteCnt, RunTime]); + ?I("Results: " + "~n Run Time: ~s" + "~n ByteCnt: ~s" + "~n NumIterations: ~s", + [?FORMAT_TIME(RunTime), + if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) -> + ?F("~w, ~w", [ByteCnt, RunTime]); true -> - f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime]) + ?F("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime]) end, if (RunTime =:= 0) -> "-"; true -> - f("~p => ~p iterations / ms", - [NumIterations, NumIterations div RunTime]) + ?F("~p => ~p iterations / ms", + [NumIterations, NumIterations div RunTime]) end]), ok; present_results(#{status := Failure, @@ -225,21 +226,21 @@ present_results(#{status := Failure, rcnt := RCnt, bcnt := BCnt, num := Num}) -> - i("Time Test failed: " - "~n ~p" - "~n" - "~nwhen" - "~n" - "~n Run Time: ~s" - "~n Send ID: ~p" - "~n Recv ID: ~p" - "~n Send Count: ~p" - "~n Recv Count: ~p" - "~n Byte Count: ~p" - "~n Num Iterations: ~p", - [Failure, - format_time(RunTime), - SID, RID, SCnt, RCnt, BCnt, Num]). + ?I("Time Test failed: " + "~n ~p" + "~n" + "~nwhen" + "~n" + "~n Run Time: ~s" + "~n Send ID: ~p" + "~n Recv ID: ~p" + "~n Send Count: ~p" + "~n Recv Count: ~p" + "~n Byte Count: ~p" + "~n Num Iterations: ~p", + [Failure, + ?FORMAT_TIME(RunTime), + SID, RID, SCnt, RCnt, BCnt, Num]). @@ -255,17 +256,14 @@ do_loop(State) -> do_loop( handle_message( msg_exchange(State) ) ). msg_exchange(#{rcnt := Num, num := Num} = State) -> - %% i("we are done"), finish(ok, State); msg_exchange(#{scnt := Num, num := Num} = State) -> %% We are done sending more requests - now we will just await %% the replies for the (still) outstanding replies. - %% i("we have sent all requests - (only) wait for replies"), msg_exchange( recv_reply(State) ); msg_exchange(#{outstanding := Outstanding, max_outstanding := MaxOutstanding} = State) when (Outstanding < MaxOutstanding) -> - %% i("send the (initial) requests (~w, ~w)", [Outstanding, MaxOutstanding]), msg_exchange( send_request(State) ); msg_exchange(State) -> send_request( recv_reply(State) ). @@ -273,9 +271,9 @@ msg_exchange(State) -> finish(ok, #{start := Start, bcnt := BCnt, num := Num}) -> - Stop = t(), + Stop = ?T(), throw(#{status => ok, - runtime => tdiff(Start, Stop), + runtime => ?TDIFF(Start, Stop), bcnt => BCnt, cnt => Num}); finish(Reason, @@ -283,9 +281,9 @@ finish(Reason, sid := SID, rid := RID, scnt := SCnt, rcnt := RCnt, bcnt := BCnt, num := Num}) -> - Stop = t(), + Stop = ?T(), throw(#{status => Reason, - runtime => tdiff(Start, Stop), + runtime => ?TDIFF(Start, Stop), sid => SID, rid => RID, scnt => SCnt, @@ -301,11 +299,6 @@ send_request(#{mod := Mod, max_outstanding := MaxOutstanding, msg_data := Data} = State) when (MaxOutstanding > Outstanding) -> - %% i("send request -> entry when" - %% "~n ID: ~p" - %% "~n Cnt: ~p" - %% "~n Outstanding: ~p" - %% "~n MaxOutstanding: ~p", [ID, Cnt, Outstanding, MaxOutstanding]), SZ = size(Data), Req = <>, case Mod:send(Sock, Req) of ok -> - %% i("~w bytes sent", [size(Req)]), State#{sid => next_id(ID), scnt => Cnt + 1, outstanding => Outstanding + 1}; {error, Reason} -> - e("Failed sending request: ~p", [Reason]), + ?E("Failed sending request: ~p", [Reason]), exit({send, Reason}) end; send_request(State) -> @@ -334,11 +326,6 @@ recv_reply(#{mod := Mod, bcnt := BCnt, rcnt := Cnt, outstanding := Outstanding} = State) -> - %% i("recv-reply(false) -> entry with" - %% "~n (R)ID: ~p" - %% "~n (R)Cnt: ~p" - %% "~n BCnt: ~p" - %% "~n Outstanding: ~p", [ID, Cnt, BCnt, Outstanding]), case recv_reply_message1(Mod, Sock, ID) of {ok, MsgSz} -> State#{rid => next_id(ID), @@ -347,7 +334,7 @@ recv_reply(#{mod := Mod, outstanding => Outstanding - 1}; {error, timeout} -> - i("recv_reply(false) -> error: timeout"), + ?I("receive timeout"), State; {error, Reason} -> @@ -362,11 +349,6 @@ recv_reply(#{mod := Mod, rcnt := RCnt, outstanding := Outstanding, acc := Acc} = State) -> - %% i("recv-reply(~w) -> entry with" - %% "~n (R)ID: ~p" - %% "~n RCnt: ~p" - %% "~n BCnt: ~p" - %% "~n Outstanding: ~p", [Active, ID, RCnt, BCnt, Outstanding]), case recv_reply_message2(Mod, Sock, ID, Acc) of {ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) -> maybe_activate(Mod, Sock, Active), @@ -380,12 +362,12 @@ recv_reply(#{mod := Mod, State; {error, stop} -> - i("recv_reply(~w) -> stop", [Active]), + ?I("receive [~w] -> stop", [Active]), %% This will have the effect that no more requests are sent... - State#{num => SCnt, stop_started => t()}; + State#{num => SCnt, stop_started => ?T()}; {error, timeout} -> - i("recv_reply(~w) -> error: timeout", [Active]), + ?I("receive[~w] -> timeout", [Active]), State; {error, Reason} -> @@ -395,23 +377,19 @@ recv_reply(#{mod := Mod, %% This function reads exactly one (reply) message. No more no less. recv_reply_message1(Mod, Sock, ID) -> - %% i("recv_reply_message1 -> entry with" - %% "~n ID: ~w", [ID]), case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of {ok, <> = Hdr} -> %% Receive the ping-pong reply boby - %% i("recv_reply_message1 -> try read body" - %% "~n ID: ~w", [ID]), case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of {ok, Data} when (size(Data) =:= SZ) -> {ok, size(Hdr) + size(Data)}; {error, Reason2} -> - i("recv_reply_message1 -> body error: " - "~n ~p: ~p", [Reason2]), - {error, {recv_hdr, Reason2}} + ?E("Failed reading body: " + "~n ~p: ~p", [Reason2]), + {error, {recv_body, Reason2}} end; {ok, < {error, invalid_hdr}; {error, Reason1} -> - i("recv_reply_message1 -> hdr error: " + ?E("Feiled reading header: " "~n ~p", [Reason1]), {error, {recv_hdr, Reason1}} end. @@ -437,8 +415,6 @@ recv_reply_message1(Mod, Sock, ID) -> %% accumulated. If that is not enough for a (complete) reply, it %% will attempt to receive more. recv_reply_message2(Mod, Sock, ID, Acc) -> - %% i("recv_reply_message2 -> entry with" - %% "~n ID: ~w", [ID]), case process_acc_data(ID, Acc) of ok -> %% No or insufficient data, so get more @@ -456,7 +432,6 @@ recv_reply_message2(Mod, Sock, ID, Acc) -> recv_reply_message3(_Mod, Sock, ID, Acc) -> receive {timeout, _TRef, stop} -> - %% i("stop - when messages: ~p", [process_info(self(), messages)]), {error, stop}; {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse @@ -469,7 +444,6 @@ recv_reply_message3(_Mod, Sock, ID, Acc) -> {Tag, Sock, Msg} when (Tag =:= tcp) orelse (Tag =:= socket) -> - %% i("recv_reply_message3 -> got ~w byte message", [size(Msg)]), process_acc_data(ID, <>) %% after ?RECV_TIMEOUT -> @@ -482,9 +456,6 @@ process_acc_data(ID, <>) when (SZ =< size(Data)) -> - %% i("process_acc_data -> entry with" - %% "~n ID: ~w" - %% "~n SZ: ~w", [ID, SZ]), <<_Body:SZ/binary, Rest/binary>> = Data, {ok, {4*4+SZ, Rest}}; process_acc_data(ID, < handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) -> receive {timeout, _TRef, stop} -> - i("stop"), + ?I("STOP"), %% This will have the effect that no more requests are sent... - State#{num => SCnt, stop_started => t()}; + State#{num => SCnt, stop_started => ?T()}; {?MODULE, Ref, Parent, stop} -> %% This *aborts* the test @@ -571,47 +542,47 @@ next_id(_) -> %% ========================================================================== -t() -> - os:timestamp(). +%% t() -> +%% os:timestamp(). -tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> - T1 = A1*1000000000+B1*1000+(C1 div 1000), - T2 = A2*1000000000+B2*1000+(C2 div 1000), - T2 - T1. +%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> +%% T1 = A1*1000000000+B1*1000+(C1 div 1000), +%% T2 = A2*1000000000+B2*1000+(C2 div 1000), +%% T2 - T1. -formated_timestamp() -> - format_timestamp(os:timestamp()). +%% formated_timestamp() -> +%% format_timestamp(os:timestamp()). -format_timestamp({_N1, _N2, N3} = TS) -> - {_Date, Time} = calendar:now_to_local_time(TS), - {Hour,Min,Sec} = Time, - FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", - [Hour, Min, Sec, round(N3/1000)]), - lists:flatten(FormatTS). +%% format_timestamp({_N1, _N2, N3} = TS) -> +%% {_Date, Time} = calendar:now_to_local_time(TS), +%% {Hour,Min,Sec} = Time, +%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", +%% [Hour, Min, Sec, round(N3/1000)]), +%% lists:flatten(FormatTS). -%% Time is always in number os ms (milli seconds) -format_time(T) -> - f("~p", [T]). +%% %% Time is always in number os ms (milli seconds) +%% format_time(T) -> +%% f("~p", [T]). %% ========================================================================== -f(F, A) -> - lists:flatten(io_lib:format(F, A)). +%% f(F, A) -> +%% lists:flatten(io_lib:format(F, A)). -%% e(F) -> -%% i(" " ++ F). +%% %% e(F) -> +%% %% i(" " ++ F). -e(F, A) -> - p(get(sname), " " ++ F, A). +%% e(F, A) -> +%% p(get(sname), " " ++ F, A). -i(F) -> - i(F, []). +%% i(F) -> +%% i(F, []). -i(F, A) -> - p(get(sname), " " ++ F, A). +%% i(F, A) -> +%% p(get(sname), " " ++ F, A). -p(undefined, F, A) -> - p("- ", F, A); -p(Prefix, F, A) -> - io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). +%% p(undefined, F, A) -> +%% p("- ", F, A); +%% p(Prefix, F, A) -> +%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl index 0d2ec38876..ef980e8d32 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl @@ -21,23 +21,24 @@ -module(socket_test_ttest_tcp_client_socket). -export([ - start_monitor/3, start_monitor/4, start_monitor/6, + start_monitor/4, start_monitor/5, start_monitor/7, stop/1 ]). -define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). +-define(MOD(M), {?TRANSPORT_MOD, #{method => Method}}). -start_monitor(Active, Addr, Port) -> - socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, +start_monitor(Method, Active, Addr, Port) -> + socket_test_ttest_tcp_client:start_monitor(?MOD(Method), Active, Addr, Port). -start_monitor(Active, Addr, Port, MsgID) -> - socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, +start_monitor(Method, Active, Addr, Port, MsgID) -> + socket_test_ttest_tcp_client:start_monitor(?MOD(Method), Active, Addr, Port, MsgID). -start_monitor(Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> - socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, +start_monitor(Method, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> + socket_test_ttest_tcp_client:start_monitor(?MOD(Method), Active, Addr, Port, MsgID, MaxOutstanding, RunTime). diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl index cb503a1feb..b248b063a3 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_server.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl @@ -44,13 +44,25 @@ -define(ACC_TIMEOUT, 10000). -define(RECV_TIMEOUT, 10000). +-define(LIB, socket_test_ttest_lib). +-define(I(F), ?LIB:info(F)). +-define(I(F,A), ?LIB:info(F, A)). +-define(E(F,A), ?LIB:error(F, A)). +-define(F(F,A), ?LIB:format(F, A)). +-define(FORMAT_TIME(T), ?LIB:format_time(T)). +-define(T(), ?LIB:t()). +-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)). + %% ========================================================================== -start_monitor(Mod, Active) - when is_atom(Mod) andalso is_boolean(Active) orelse (Active =:= once) -> +start_monitor(Transport, Active) + when (is_atom(Transport) orelse is_tuple(Transport)) andalso + (is_boolean(Active) orelse (Active =:= once)) -> Self = self(), - ServerInit = fun() -> put(sname, "server"), server_init(Self, Mod, Active) end, + ServerInit = fun() -> put(sname, "server"), + server_init(Self, Transport, Active) + end, {Pid, MRef} = spawn_monitor(ServerInit), receive {'DOWN', MRef, process, Pid, normal} -> @@ -73,26 +85,28 @@ stop(Pid) when is_pid(Pid) -> %% ========================================================================== -server_init(Parent, Mod, Active) -> - i("init -> entry with" - "~n Parent: ~p" - "~n Mod: ~p" - "~n Active: ~p", [Parent, Mod, Active]), - case Mod:listen(0) of +server_init(Parent, Transport, Active) -> + ?I("init -> entry with" + "~n Parent: ~p" + "~n Transport: ~p" + "~n Active: ~p", [Parent, Transport, Active]), + {Mod, Listen, StatsInterval} = process_transport(Transport, Active), + case Listen(0) of {ok, LSock} -> case Mod:port(LSock) of {ok, Port} = OK -> Addr = which_addr(), % This is just for convenience - i("init -> listening on:" - "~n Addr: ~p (~s)" - "~n Port: ~w" - "~n", [Addr, inet:ntoa(Addr), Port]), + ?I("init -> listening on:" + "~n Addr: ~p (~s)" + "~n Port: ~w" + "~n", [Addr, inet:ntoa(Addr), Port]), Parent ! {?MODULE, self(), OK}, - server_loop(#{parent => Parent, - mod => Mod, - active => Active, - lsock => LSock, - handlers => [], + server_loop(#{parent => Parent, + mod => Mod, + active => Active, + lsock => LSock, + handlers => [], + stats_interval => StatsInterval, %% Accumulation runtime => 0, mcnt => 0, @@ -107,37 +121,40 @@ server_init(Parent, Mod, Active) -> exit({listen, LReason}) end. +process_transport(Mod, _) when is_atom(Mod) -> + {Mod, fun(Port) -> Mod:listen(Port) end, infinity}; +process_transport({Mod, #{stats_interval := T} = Opts}, Active) + when (Active =/= false) -> + {Mod, fun(Port) -> Mod:listen(Port, Opts#{stats_to => self()}) end, T}; +process_transport({Mod, #{stats_interval := T} = Opts}, _Active) -> + {Mod, fun(Port) -> Mod:listen(Port, Opts) end, T}; +process_transport({Mod, Opts}, _Active) -> + {Mod, fun(Port) -> Mod:listen(Port, Opts) end, infinity}. + + server_loop(State) -> - %% i("loop -> enter"), server_loop( server_handle_message( server_accept(State) ) ). server_accept(#{mod := Mod, active := Active, lsock := LSock, handlers := Handlers} = State) -> - - %% i("server-accept -> entry with" - %% "~n Mod: ~p" - %% "~n Active: ~p" - %% "~n LSock: ~p", [Mod, Active, LSock]), - case Mod:accept(LSock, ?ACC_TIMEOUT) of {ok, Sock} -> - %% i("server-accept -> accepted: " - %% "~n Sock: ~p", [Sock]), - i("accepted connection from ~s", - [case Mod:peername(Sock) of - {ok, Peer} -> - format_peername(Peer); - {error, _} -> - "-" - end]), + ?I("accepted connection from ~s", + [case Mod:peername(Sock) of + {ok, Peer} -> + format_peername(Peer); + {error, _} -> + "-" + end]), {Pid, _} = handler_start(), - i("handler ~p started -> try transfer socket control", [Pid]), + ?I("handler ~p started -> try transfer socket control", [Pid]), case Mod:controlling_process(Sock, Pid) of ok -> - i("server-accept: handler ~p started", [Pid]), + maybe_start_stats_timer(State, Pid), + ?I("server-accept: handler ~p started", [Pid]), handler_continue(Pid, Mod, Sock, Active), Handlers2 = [Pid | Handlers], State#{handlers => Handlers2}; @@ -156,14 +173,31 @@ server_accept(#{mod := Mod, format_peername({Addr, Port}) -> case inet:gethostbyaddr(Addr) of {ok, #hostent{h_name = N}} -> - f("~s (~s:~w)", [N, inet:ntoa(Addr), Port]); + ?F("~s (~s:~w)", [N, inet:ntoa(Addr), Port]); {error, _} -> - f("~p, ~p", [Addr, Port]) + ?F("~p, ~p", [Addr, Port]) end. - + +maybe_start_stats_timer(#{active := Active, stats_interval := Time}, Handler) + when (Active =/= false) andalso (is_integer(Time) andalso (Time > 0)) -> + start_stats_timer(Time, "handler", Handler); +maybe_start_stats_timer(_, _) -> + ok. + +start_stats_timer(Time, ProcStr, Pid) -> + erlang:start_timer(Time, self(), {stats, Time, ProcStr, Pid}). + server_handle_message(#{parent := Parent, handlers := H} = State) -> - %% i("server_handle_message -> enter"), receive + {timeout, _TRef, {stats, Interval, ProcStr, Pid}} -> + case server_handle_stats(ProcStr, Pid) of + ok -> + start_stats_timer(Interval, ProcStr, Pid); + skip -> + ok + end, + State; + {?MODULE, Ref, Parent, stop} -> reply(Parent, Ref, ok), lists:foreach(fun(P) -> handler_stop(P) end, H), @@ -176,11 +210,20 @@ server_handle_message(#{parent := Parent, handlers := H} = State) -> State end. +server_handle_stats(ProcStr, Pid) -> + case ?LIB:formated_process_stats(Pid) of + "" -> + skip; + FormatedStats -> + ?I("Statistics for ~s ~p:~s", [ProcStr, Pid, FormatedStats]), + ok + end. + server_handle_down(Pid, Reason, #{handlers := Handlers} = State) -> case lists:delete(Pid, Handlers) of Handlers -> - i("unknown process ~p died", [Pid]), + ?I("unknown process ~p died", [Pid]), State; Handlers2 -> server_handle_handler_down(Pid, Reason, State#{handlers => Handlers2}) @@ -197,35 +240,35 @@ server_handle_handler_down(Pid, AccMCnt2 = AccMCnt + MCnt, AccBCnt2 = AccBCnt + BCnt, AccHCnt2 = AccHCnt + 1, - i("handler ~p (~w) done => accumulated results: " - "~n Run Time: ~s ms" - "~n Message Count: ~s" - "~n Byte Count: ~s", - [Pid, AccHCnt2, - format_time(AccRunTime2), - if (AccRunTime2 > 0) -> - f("~w => ~w (~w) msgs / ms", - [AccMCnt2, - AccMCnt2 div AccRunTime2, - (AccMCnt2 div AccHCnt2) div AccRunTime2]); - true -> - f("~w", [AccMCnt2]) - end, - if (AccRunTime2 > 0) -> - f("~w => ~w (~w) bytes / ms", - [AccBCnt2, - AccBCnt2 div AccRunTime2, - (AccBCnt2 div AccHCnt2) div AccRunTime2]); - true -> - f("~w", [AccBCnt2]) - end]), + ?I("handler ~p (~w) done => accumulated results: " + "~n Run Time: ~s ms" + "~n Message Count: ~s" + "~n Byte Count: ~s", + [Pid, AccHCnt2, + ?FORMAT_TIME(AccRunTime2), + if (AccRunTime2 > 0) -> + ?F("~w => ~w (~w) msgs / ms", + [AccMCnt2, + AccMCnt2 div AccRunTime2, + (AccMCnt2 div AccHCnt2) div AccRunTime2]); + true -> + ?F("~w", [AccMCnt2]) + end, + if (AccRunTime2 > 0) -> + ?F("~w => ~w (~w) bytes / ms", + [AccBCnt2, + AccBCnt2 div AccRunTime2, + (AccBCnt2 div AccHCnt2) div AccRunTime2]); + true -> + ?F("~w", [AccBCnt2]) + end]), State#{runtime => AccRunTime2, mcnt => AccMCnt2, bcnt => AccBCnt2, hcnt => AccHCnt2}; server_handle_handler_down(Pid, Reason, State) -> - i("handler ~p terminated: " - "~n ~p", [Pid, Reason]), + ?I("handler ~p terminated: " + "~n ~p", [Pid, Reason]), State. @@ -244,32 +287,31 @@ handler_stop(Pid) -> req(Pid, stop). handler_init(Parent) -> - i("starting"), + ?I("starting"), receive {?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} -> - i("received continue"), + ?I("received continue"), reply(Parent, Ref, ok), handler_initial_activation(Mod, Sock, Active), handler_loop(#{parent => Parent, mod => Mod, sock => Sock, active => Active, - start => t(), + start => ?T(), mcnt => 0, bcnt => 0, last_reply => none, acc => <<>>}) after 5000 -> - i("timeout when message queue: " - "~n ~p" - "~nwhen" - "~n Parent: ~p", [process_info(self(), messages), Parent]), + ?I("timeout when message queue: " + "~n ~p" + "~nwhen" + "~n Parent: ~p", [process_info(self(), messages), Parent]), handler_init(Parent) end. handler_loop(State) -> - %% i("handler-loop"), handler_loop( handler_handle_message( handler_recv_message(State) ) ). %% When passive, we read *one* request and then attempt to reply to it. @@ -278,7 +320,6 @@ handler_recv_message(#{mod := Mod, active := false, mcnt := MCnt, bcnt := BCnt} = State) -> - %% i("handler_recv_message(false) -> entry"), case handler_recv_message2(Mod, Sock) of {ok, {MsgSz, ID, Body}} -> handler_send_reply(Mod, Sock, ID, Body), @@ -305,7 +346,6 @@ handler_recv_message(#{mod := Mod, bcnt := BCnt, last_reply := LID, acc := Acc} = State) -> - %% i("handler_recv_message(~w) -> entry", [Active]), case handler_recv_message3(Mod, Sock, Acc, LID) of {ok, {MCnt2, BCnt2, LID2}, NewAcc} -> handler_maybe_activate(Mod, Sock, Active), @@ -319,12 +359,12 @@ handler_recv_message(#{mod := Mod, (size(Acc) =:= 0) -> handler_done(State); true -> - e("client done with partial message: " - "~n Last Reply Sent: ~w" - "~n Message Count: ~w" - "~n Byte Count: ~w" - "~n Partial Message: ~w bytes", - [LID, MCnt, BCnt, size(Acc)]), + ?E("client done with partial message: " + "~n Last Reply Sent: ~w" + "~n Message Count: ~w" + "~n Byte Count: ~w" + "~n Partial Message: ~w bytes", + [LID, MCnt, BCnt, size(Acc)]), exit({closed_with_partial_message, LID}) end; @@ -355,32 +395,27 @@ handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) -> handler_recv_message2(Mod, Sock) -> - %% i("handler_recv_message2 -> entry"), case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of {ok, <> = Hdr} -> - %% i("handler_recv_message2 -> got request header: " - %% "~n ID: ~p" - %% "~n SZ: ~p", [ID, SZ]), case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of {ok, Body} when (SZ =:= size(Body)) -> - %% i("handler_recv_message2 -> got body"), {ok, {size(Hdr) + size(Body), ID, Body}}; {error, BReason} -> - e("failed reading body (~w) of message ~w:" - "~n ~p", [SZ, ID, BReason]), + ?E("failed reading body (~w) of message ~w:" + "~n ~p", [SZ, ID, BReason]), exit({recv, body, ID, SZ, BReason}) end; {error, timeout} = ERROR -> - i("handler_recv_message2 -> timeout"), + ?I("timeout"), ERROR; {error, closed} = ERROR -> ERROR; {error, HReason} -> - e("failed reading header of message:" - "~n ~p", [HReason]), + ?E("Failed reading header of message:" + "~n ~p", [HReason]), exit({recv, header, HReason}) end. @@ -405,116 +440,6 @@ handler_recv_message3(Mod, Sock, Acc, LID) -> -%% %% Socket in passive mode => we need to read explicitly -%% handler_loop(#{sock := Sock, active := false} = State, MCnt, BCnt) -> -%% %% i("try recv msg header"), -%% MsgSz = -%% case gen_tcp:recv(Sock, 4*4) of -%% {ok, <> = Hdr} -> -%% %% i("msg ~w header received (data sz is ~w bytes)", [ID, SZ]), -%% case gen_tcp:recv(Sock, SZ) of -%% {ok, Data} when (size(Data) =:= SZ) -> -%% handler_send_reply(Sock, ID, Data), -%% size(Hdr)+SZ; -%% {ok, InvData} -> -%% i("invalid data"), -%% (catch gen_tcp:close(Sock)), -%% exit({invalid_data, SZ, size(InvData)}); -%% {error, Reason2} -> -%% i("Data read failed: ~p", [Reason2]), -%% (catch gen_tcp:close(Sock)), -%% exit({recv_data, Reason2}) -%% end; -%% {ok, _InvHdr} -> -%% (catch gen_tcp:close(Sock)), -%% exit(invalid_hdr); -%% {error, closed} -> -%% i("we are done: " -%% "~n Message Count: ~p" -%% "~n Byte Count: ~p", [MCnt, BCnt]), -%% exit(normal); -%% {error, Reason1} -> -%% i("Header read failed: ~p", [Reason1]), -%% (catch gen_tcp:close(Sock)), -%% exit({recv_hdr, Reason1}) -%% end, -%% handler_loop(State, MCnt+1, BCnt+MsgSz); - -%% %% Socket in active mode (once | true) => data messages arrive -%% handler_loop(#{sock := Sock, active := Active} = State, MCnt, BCnt) -> -%% %% i("await msg"), -%% try handler_recv_request(Sock, Active) of -%% {ID, Data, MsgSz} -> -%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]), -%% handler_send_reply(Sock, ID, Data), -%% handler_maybe_activate(Sock, Active), -%% handler_loop(State, MCnt+1, BCnt+MsgSz) -%% catch -%% throw:tcp_closed -> -%% i("we are done: " -%% "~n Message Count: ~p" -%% "~n Byte Count: ~p", [MCnt, BCnt]), -%% exit(normal); -%% throw:{tcp_error, Reason} -> -%% i(" TCP error ~p when: " -%% "~n Message Count: ~p" -%% "~n Byte Count: ~p", [Reason, MCnt, BCnt]), -%% exit({tcp_error, Reason}) -%% end. -%% %% {ID, Data, MsgSz} = handler_recv_request(Sock, Active), -%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]), -%% %% handler_send_reply(Sock, ID, Data), -%% %% handler_maybe_activate(Sock, Active), -%% %% handler_loop(State, MCnt+1, BCnt+MsgSz). - - -%% handler_recv_request(Sock, Active) -> -%% %% In theory we should also be ready for a partial header, -%% %% but I can't be bothered... -%% receive -%% {tcp_closed, Sock} -> -%% throw(tcp_closed); -%% {tcp_error, Sock, Reason} -> -%% throw({tcp_error, Reason}); -%% {tcp, Sock, <> = Msg} when (size(Data) =:= SZ) -> -%% %% i("[complete] msg ~w received (data sz is ~w bytes)", [ID, SZ]), -%% {ID, Data, size(Msg)}; -%% {tcp, Sock, <> = Msg} when (size(Data) < SZ) -> -%% %% i("[incomplete] msg ~w received (data sz is ~w bytes)", [ID, SZ]), -%% handler_recv_request_data(Sock, Active, ID, SZ, Data, size(Msg)) -%% end. - -%% handler_recv_request_data(Sock, Active, ID, SZ, AccData, AccMsgSz) -> -%% handler_maybe_activate(Sock, Active), -%% receive -%% {tcp_closed, Sock} -> -%% %% i("we are done (incomplete data)"), -%% throw(tcp_closed); -%% {tcp_error, Sock, Reason} -> -%% throw({tcp_error, Reason}); -%% {tcp, Sock, Data} when (SZ =:= (size(AccData) + size(Data))) -> -%% %% i("[complete] received the remaining data (~w bytes) for msg ~w", -%% %% [size(Data), ID]), -%% {ID, <>, AccMsgSz+size(Data)}; -%% {tcp, Sock, Data} -> -%% %% i("[incomplete] received ~w bytes of data for for msg ~w", -%% %% [size(Data), ID]), -%% handler_recv_request_data(Sock, Active, ID, SZ, -%% <>, -%% AccMsgSz+size(Data)) -%% end. - handler_send_reply(Mod, Sock, ID, Data) -> SZ = size(Data), Msg = < ID:32, SZ:32, Data/binary>>, - %% i("handler-send-reply -> try send reply ~w: ~w bytes", [ID, size(Msg)]), case Mod:send(Sock, Msg) of ok -> - %% i("handler-send-reply -> reply ~w (~w bytes) sent", [ID, size(Msg)]), ok; {error, Reason} -> (catch Mod:close(Sock)), @@ -534,7 +457,7 @@ handler_send_reply(Mod, Sock, ID, Data) -> handler_done(State) -> - handler_done(State, t()). + handler_done(State, ?T()). handler_done(#{start := Start, mod := Mod, @@ -542,7 +465,7 @@ handler_done(#{start := Start, mcnt := MCnt, bcnt := BCnt}, Stop) -> (catch Mod:close(Sock)), - exit({done, tdiff(Start, Stop), MCnt, BCnt}). + exit({done, ?TDIFF(Start, Stop), MCnt, BCnt}). handler_handle_message(#{parent := Parent} = State) -> @@ -634,45 +557,45 @@ reply(Pid, Ref, Reply) -> %% ========================================================================== -t() -> - os:timestamp(). +%% t() -> +%% os:timestamp(). -tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> - T1 = A1*1000000000+B1*1000+(C1 div 1000), - T2 = A2*1000000000+B2*1000+(C2 div 1000), - T2 - T1. +%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> +%% T1 = A1*1000000000+B1*1000+(C1 div 1000), +%% T2 = A2*1000000000+B2*1000+(C2 div 1000), +%% T2 - T1. -formated_timestamp() -> - format_timestamp(os:timestamp()). +%% formated_timestamp() -> +%% format_timestamp(os:timestamp()). -format_timestamp({_N1, _N2, N3} = TS) -> - {_Date, Time} = calendar:now_to_local_time(TS), - {Hour,Min,Sec} = Time, - FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", - [Hour, Min, Sec, round(N3/1000)]), - lists:flatten(FormatTS). +%% format_timestamp({_N1, _N2, N3} = TS) -> +%% {_Date, Time} = calendar:now_to_local_time(TS), +%% {Hour,Min,Sec} = Time, +%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", +%% [Hour, Min, Sec, round(N3/1000)]), +%% lists:flatten(FormatTS). -%% Time is always in number os ms (milli seconds) -format_time(T) -> - f("~p", [T]). +%% %% Time is always in number os ms (milli seconds) +%% format_time(T) -> +%% f("~p", [T]). %% ========================================================================== -f(F, A) -> - lists:flatten(io_lib:format(F, A)). +%% f(F, A) -> +%% lists:flatten(io_lib:format(F, A)). -e(F, A) -> - p(get(sname), " " ++ F, A). +%% e(F, A) -> +%% p(get(sname), " " ++ F, A). -i(F) -> - i(F, []). +%% i(F) -> +%% i(F, []). -i(F, A) -> - p(get(sname), " " ++ F, A). +%% i(F, A) -> +%% p(get(sname), " " ++ F, A). -p(undefined, F, A) -> - p("- ", F, A); -p(Prefix, F, A) -> - io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). +%% p(undefined, F, A) -> +%% p("- ", F, A); +%% p(Prefix, F, A) -> +%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl index de9df857fe..d62f4d86a6 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl @@ -21,14 +21,17 @@ -module(socket_test_ttest_tcp_server_socket). -export([ - start_monitor/1, + start_monitor/2, stop/1 ]). -define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). +%% -define(MOD(M), {?TRANSPORT_MOD, #{method => M, +%% stats_interval => 10000}}). +-define(MOD(M), {?TRANSPORT_MOD, #{method => M}}). -start_monitor(Active) -> - socket_test_ttest_tcp_server:start_monitor(?TRANSPORT_MOD, Active). +start_monitor(Method, Active) -> + socket_test_ttest_tcp_server:start_monitor(?MOD(Method), Active). stop(Pid) -> socket_test_ttest_tcp_server:stop(Pid). diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index 12d9e052d7..f314d01a63 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -24,9 +24,9 @@ accept/1, accept/2, active/2, close/1, - connect/2, + connect/2, connect/3, controlling_process/2, - listen/0, listen/1, + listen/0, listen/1, listen/2, port/1, peername/1, recv/2, recv/3, @@ -38,25 +38,41 @@ -define(READER_RECV_TIMEOUT, 1000). +-define(DATA_MSG(Sock, Method, Data), + {socket, + #{sock => Sock, reader => self(), method => Method}, + Data}). + +-define(CLOSED_MSG(Sock, Method), + {socket_closed, + #{sock => Sock, reader => self(), method => Method}}). + +-define(ERROR_MSG(Sock, Method, Reason), + {socket_error, + #{sock => Sock, reader => self(), method => Method}, + Reason}). + %% ========================================================================== -accept(#{sock := LSock}) -> +accept(#{sock := LSock, opts := #{method := Method} = Opts}) -> case socket:accept(LSock) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}}; + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. -accept(#{sock := LSock}, Timeout) -> +accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) -> case socket:accept(LSock, Timeout) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}}; + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. @@ -74,6 +90,9 @@ close(#{sock := Sock, reader := Pid}) -> %% Create a socket and connect it to a peer connect(Addr, Port) -> + connect(Addr, Port, #{method => plain}). + +connect(Addr, Port, #{method := Method} = Opts) -> try begin Sock = @@ -100,9 +119,10 @@ connect(Addr, Port) -> (catch socket:close(Sock)), throw({error, {connect, CReason}}) end, - Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}} + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}} end catch throw:ERROR:_ -> @@ -110,6 +130,16 @@ connect(Addr, Port) -> end. +maybe_start_stats_timer(#{stats_to := Pid, + stats_interval := T}, + Reader) when is_pid(Pid) -> + erlang:start_timer(T, Pid, {stats, T, "reader", Reader}); +maybe_start_stats_timer(O, _) -> + io:format("NO STATS: " + "~n ~p" + "~n", [O]), + ok. + controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> case socket:setopt(Sock, otp, controlling_process, NewPid) of ok -> @@ -125,9 +155,13 @@ controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> %% Create a listen socket listen() -> - listen(0). + listen(0, #{method => plain}). -listen(Port) when is_integer(Port) andalso (Port >= 0) -> +listen(Port) -> + listen(Port, #{method => plain}). +listen(Port, #{method := Method} = Opts) + when (is_integer(Port) andalso (Port >= 0)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> try begin Sock = case socket:open(inet, stream, tcp) of @@ -152,7 +186,7 @@ listen(Port) when is_integer(Port) andalso (Port >= 0) -> (catch socket:close(Sock)), throw({error, {listen, LReason}}) end, - {ok, #{sock => Sock}} + {ok, #{sock => Sock, opts => Opts}} end catch throw:{error, Reason}:_ -> @@ -178,14 +212,31 @@ peername(#{sock := Sock}) -> end. -recv(#{sock := Sock}, Length) -> - socket:recv(Sock, Length). -recv(#{sock := Sock}, Length, Timeout) -> - socket:recv(Sock, Length, Timeout). +recv(#{sock := Sock, method := plain}, Length) -> + socket:recv(Sock, Length); +recv(#{sock := Sock, method := msg}, Length) -> + case socket:recvmsg(Sock, Length, 0, [], infinity) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. + +recv(#{sock := Sock, method := plain}, Length, Timeout) -> + socket:recv(Sock, Length, Timeout); +recv(#{sock := Sock, method := msg}, Length, Timeout) -> + case socket:recvmsg(Sock, Length, 0, [], Timeout) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. -send(#{sock := Sock}, Length) -> - socket:send(Sock, Length). +send(#{sock := Sock, method := plain}, Bin) -> + socket:send(Sock, Bin); +send(#{sock := Sock, method := msg}, Bin) -> + socket:sendmsg(Sock, #{iov => [Bin]}). shutdown(#{sock := Sock}, How) -> @@ -203,14 +254,16 @@ sockname(#{sock := Sock}) -> %% ========================================================================== -reader_init(ControllingProcess, Sock, Active) +reader_init(ControllingProcess, Sock, Active, Method) when is_pid(ControllingProcess) andalso - (is_boolean(Active) orelse (Active =:= once)) -> + (is_boolean(Active) orelse (Active =:= once)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> MRef = erlang:monitor(process, ControllingProcess), reader_loop(#{ctrl_proc => ControllingProcess, ctrl_proc_mref => MRef, active => Active, - sock => Sock}). + sock => Sock, + method => Method}). %% Never read @@ -245,10 +298,11 @@ reader_loop(#{active := false, %% Read *once* and then change to false reader_loop(#{active := once, sock := Sock, + method := Method, ctrl_proc := Pid} = State) -> - case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + case do_recv(Method, Sock) of {ok, Data} -> - Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + Pid ! ?DATA_MSG(Sock, Method, Data), reader_loop(State#{active => false}); {error, timeout} -> receive @@ -280,21 +334,22 @@ reader_loop(#{active := once, end; {error, closed} -> - Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + Pid ! ?CLOSED_MSG(Sock, Method), exit(normal); {error, Reason} -> - Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + Pid ! ?ERROR_MSG(Sock, Method, Reason), exit(Reason) end; %% Read and forward data continuously reader_loop(#{active := true, sock := Sock, + method := Method, ctrl_proc := Pid} = State) -> - case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + case do_recv(Method, Sock) of {ok, Data} -> - Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + Pid ! ?DATA_MSG(Sock, Method, Data), reader_loop(State); {error, timeout} -> receive @@ -326,20 +381,26 @@ reader_loop(#{active := true, end; {error, closed} -> - Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + Pid ! ?CLOSED_MSG(Sock, Method), exit(normal); {error, Reason} -> - Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + Pid ! ?ERROR_MSG(Sock, Method, Reason), exit(Reason) end. - +do_recv(plain, Sock) -> + socket:recv(Sock, 0, ?READER_RECV_TIMEOUT); +do_recv(msg, Sock) -> + case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. %% ========================================================================== - - -- cgit v1.2.3