diff options
author | Micael Karlberg <[email protected]> | 2018-12-07 19:12:38 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-12-07 19:12:38 +0100 |
commit | a33acd53524160d65929d06f06e387ce8419b1c0 (patch) | |
tree | fee37591fb85039583e58858ade959d0de360142 | |
parent | a862cc8ab56616e182c959a5a6a06e4aefa09b08 (diff) | |
parent | d0c3f79d22b4778f66ac1d8a2fc03e736f42e973 (diff) | |
download | otp-a33acd53524160d65929d06f06e387ce8419b1c0.tar.gz otp-a33acd53524160d65929d06f06e387ce8419b1c0.tar.bz2 otp-a33acd53524160d65929d06f06e387ce8419b1c0.zip |
Merge branch 'bmk/20181206/nififying_inet_valgrind/OTP-14831' into bmk/20180918/nififying_inet/OTP-14831
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 1 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 60 | ||||
-rw-r--r-- | erts/emulator/test/Makefile | 1 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_lib.erl | 125 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_client.erl | 261 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_client_socket.erl | 15 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_server.erl | 391 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_server_socket.erl | 9 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_socket.erl | 129 |
9 files changed, 562 insertions, 430 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index 7d223b8259..ec17e45f25 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -378,6 +378,7 @@ extern ERL_NIF_TERM esock_atom_einval; #define ALLOC_BIN(SZ, BP) enif_alloc_binary((SZ), (BP)) #define REALLOC_BIN(SZ, BP) enif_realloc_binary((SZ), (BP)) +#define FREE_BIN(BP) enif_release_binary((BP)) #endif // SOCKET_INT_H__ diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 3da895e644..1e0533535c 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -4233,8 +4233,13 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, descP->type = type; descP->protocol = protocol; + /* + * Should we keep track of sockets (resources) in some way? + * Doing it here will require mutex to ensure data integrity, + * which will be costly. Send it somewhere? + */ res = enif_make_resource(env, descP); - enif_release_resource(descP); // We should really store a reference ... + enif_release_resource(descP); /* Keep track of the creator * This should not be a problem but just in case @@ -13592,6 +13597,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, res); + FREE_BIN(bufP); + return res; } @@ -13633,6 +13640,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) return esock_make_error_str(env, xres); + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ data = MKBIN(env, bufP); SSDBG( descP, @@ -13664,6 +13674,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_update_current_reader(env, descP); + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ data = MKBIN(env, bufP); return esock_make_ok3(env, atom_true, data); @@ -13707,6 +13720,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, (ERL_NIF_SELECT_STOP), descP, NULL, recvRef); + FREE_BIN(bufP); + return res; } else if ((saveErrno == ERRNO_BLOCK) || @@ -13721,16 +13736,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT for more\r\n") ); - /* - SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); - */ - sres = enif_select(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); - SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) ); + SSDBG( descP, + ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) ); + FREE_BIN(bufP); + return esock_make_error(env, esock_atom_eagain); } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); @@ -13740,6 +13753,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, res); + FREE_BIN(bufP); + return res; } @@ -13768,6 +13783,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_update_current_reader(env, descP); + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ data = MKBIN(env, bufP); data = MKSBIN(env, data, 0, read); @@ -13792,6 +13810,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, cnt_inc(&descP->readByteCnt, read); + /* This transfers "ownership" of the *allocated* binary to an + * erlang term (no need for an explicit free). + */ data = MKBIN(env, bufP); data = MKSBIN(env, data, 0, read); @@ -13862,6 +13883,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, (ERL_NIF_SELECT_STOP), descP, NULL, recvRef); + FREE_BIN(bufP); + return res; } else if ((saveErrno == ERRNO_BLOCK) || @@ -13875,7 +13898,10 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); + FREE_BIN(bufP); + return esock_make_error(env, esock_atom_eagain); + } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); @@ -13885,6 +13911,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, res); + FREE_BIN(bufP); + return res; } @@ -13962,6 +13990,9 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, * *We* do never actually try to read 0 bytes from a stream socket! */ + + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return esock_make_error(env, atom_closed); } @@ -14002,6 +14033,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, (ERL_NIF_SELECT_STOP), descP, NULL, recvRef); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return res; } else if ((saveErrno == ERRNO_BLOCK) || @@ -14016,7 +14049,10 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return esock_make_error(env, esock_atom_eagain); + } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); @@ -14026,6 +14062,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, res); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return res; } @@ -14057,6 +14095,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, recv_update_current_reader(env, descP); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + return esock_make_error_str(env, xres); } else { @@ -15713,6 +15753,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) { char buf[64]; /* Buffer used for building the mutex name */ + // This needs to be released when the socket is closed! descP->env = enif_alloc_env(); sprintf(buf, "socket[w,%d]", sock); @@ -16870,10 +16911,15 @@ void socket_dtor(ErlNifEnv* env, void* obj) { SocketDescriptor* descP = (SocketDescriptor*) obj; + enif_clear_env(descP->env); + enif_free_env(descP->env); + descP->env = NULL; + MDESTROY(descP->writeMtx); MDESTROY(descP->readMtx); MDESTROY(descP->accMtx); MDESTROY(descP->closeMtx); + } diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index a910588381..09bfe6f104 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -34,6 +34,7 @@ SOCKET_MODULES = \ socket_client \ socket_test_lib \ socket_test_evaluator \ + socket_test_ttest_lib \ socket_test_ttest_tcp_gen \ socket_test_ttest_tcp_socket \ socket_test_ttest_tcp_client \ diff --git a/erts/emulator/test/socket_test_ttest_lib.erl b/erts/emulator/test/socket_test_ttest_lib.erl new file mode 100644 index 0000000000..71679bc6d6 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_lib.erl @@ -0,0 +1,125 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_ttest_lib). + +-compile({no_auto_import, [error/2]}). + +-export([ + t/0, tdiff/2, + formated_timestamp/0, format_timestamp/1, + format_time/1, + + formated_process_stats/1, formated_process_stats/2, + + format/2, + error/1, error/2, + info/1, info/2 + ]). + +%% ========================================================================== + +t() -> + os:timestamp(). + +tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> + T1 = A1*1000000000+B1*1000+(C1 div 1000), + T2 = A2*1000000000+B2*1000+(C2 div 1000), + T2 - T1. + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp({_N1, _N2, N3} = TS) -> + {_Date, Time} = calendar:now_to_local_time(TS), + {Hour,Min,Sec} = Time, + FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", + [Hour, Min, Sec, round(N3/1000)]), + lists:flatten(FormatTS). + +%% Time is always in number os ms (milli seconds) +%% At some point, we should convert this to a more readable format... +format_time(T) -> + format("~p", [T]). + + +formated_process_stats(Pid) -> + formated_process_stats("", Pid). + +formated_process_stats(Prefix, Pid) when is_list(Prefix) andalso is_pid(Pid) -> + try + begin + TotHeapSz = pi(Pid, total_heap_size), + HeapSz = pi(Pid, heap_size), + StackSz = pi(Pid, stack_size), + Reds = pi(Pid, reductions), + GCInfo = pi(Pid, garbage_collection), + MinBinVHeapSz = proplists:get_value(min_bin_vheap_size, GCInfo), + MinHeapSz = proplists:get_value(min_heap_size, GCInfo), + MinGCS = proplists:get_value(minor_gcs, GCInfo), + format("~n ~sTotal Heap Size: ~p" + "~n ~sHeap Size: ~p" + "~n ~sStack Size: ~p" + "~n ~sReductions: ~p" + "~n ~s[GC] Min Bin VHeap Size: ~p" + "~n ~s[GC] Min Heap Size: ~p" + "~n ~s[GC] Minor GCS: ~p", + [Prefix, TotHeapSz, + Prefix, HeapSz, + Prefix, StackSz, + Prefix, Reds, + Prefix, MinBinVHeapSz, + Prefix, MinHeapSz, + Prefix, MinGCS]) + end + catch + _:_:_ -> + "" + end. + + +pi(Pid, Item) -> + {Item, Info} = process_info(Pid, Item), + Info. + + + +%% ========================================================================== + +format(F, A) -> + lists:flatten(io_lib:format(F, A)). + +error(F) -> + error(F, []). + +error(F, A) -> + print(get(sname), "<ERROR> " ++ F, A). + +info(F) -> + info(F, []). + +info(F, A) -> + print(get(sname), "<INFO> " ++ F, A). + +print(undefined, F, A) -> + print("- ", F, A); +print(Prefix, F, A) -> + io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). + diff --git a/erts/emulator/test/socket_test_ttest_tcp_client.erl b/erts/emulator/test/socket_test_ttest_tcp_client.erl index 1bd1bc54e9..9c43c41841 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_client.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_client.erl @@ -54,6 +54,14 @@ -define(MAX_OUTSTANDING_DEFAULT_2, 10). -define(MAX_OUTSTANDING_DEFAULT_3, 3). +-define(LIB, socket_test_ttest_lib). +-define(I(F), ?LIB:info(F)). +-define(I(F,A), ?LIB:info(F, A)). +-define(E(F,A), ?LIB:error(F, A)). +-define(F(F,A), ?LIB:format(F, A)). +-define(FORMAT_TIME(T), ?LIB:format_time(T)). +-define(T(), ?LIB:t()). +-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)). -type active() :: once | boolean(). -type msg_id() :: 1..3. @@ -63,42 +71,28 @@ %% ========================================================================== --spec start_monitor(Mod, Active, Addr, Port) -> term() when - Mod :: atom(), - Active :: active(), - Addr :: inet:ip_address(), - Port :: inet:port_number(). +start_monitor(Transport, Active, Addr, Port) -> + start_monitor(Transport, Active, Addr, Port, ?MSG_ID_DEFAULT). %% RunTime is in number of ms. -start_monitor(Mod, Active, Addr, Port) -> - start_monitor(Mod, Active, Addr, Port, ?MSG_ID_DEFAULT). - --spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when - Mod :: atom(), - Active :: active(), - Addr :: inet:ip_address(), - Port :: inet:port_number(), - MsgID :: msg_id(). - -%% RunTime is in number of ms. -start_monitor(Mod, Active, Addr, Port, 1 = MsgID) -> - start_monitor(Mod, Active, Addr, Port, MsgID, +start_monitor(Transport, Active, Addr, Port, 1 = MsgID) -> + start_monitor(Transport, Active, Addr, Port, MsgID, ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT); -start_monitor(Mod, Active, Addr, Port, 2 = MsgID) -> - start_monitor(Mod, Active, Addr, Port, MsgID, +start_monitor(Transport, Active, Addr, Port, 2 = MsgID) -> + start_monitor(Transport, Active, Addr, Port, MsgID, ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT); -start_monitor(Mod, Active, Addr, Port, 3 = MsgID) -> - start_monitor(Mod, Active, Addr, Port, MsgID, +start_monitor(Transport, Active, Addr, Port, 3 = MsgID) -> + start_monitor(Transport, Active, Addr, Port, MsgID, ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT). --spec start_monitor(Mod, +-spec start_monitor(Transport, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> term() when - Mod :: atom(), + Transport :: atom() | tuple(), Active :: active(), Addr :: inet:ip_address(), Port :: inet:port_number(), @@ -107,9 +101,9 @@ start_monitor(Mod, Active, Addr, Port, 3 = MsgID) -> RunTime :: runtime(). %% RunTime is in number of ms. -start_monitor(Mod, Active, Addr, Port, +start_monitor(Transport, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) - when is_atom(Mod) andalso + when (is_atom(Transport) orelse is_tuple(Transport)) andalso (is_boolean(Active) orelse (Active =:= once)) andalso is_tuple(Addr) andalso (is_integer(Port) andalso (Port > 0)) andalso @@ -119,7 +113,7 @@ start_monitor(Mod, Active, Addr, Port, Self = self(), ClientInit = fun() -> put(sname, "client"), init(Self, - Mod, Active, Addr, Port, + Transport, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) end, {Pid, MRef} = spawn_monitor(ClientInit), @@ -147,28 +141,29 @@ stop(Pid) when is_pid(Pid) -> %% ========================================================================== -init(Parent, Mod, Active, Addr, Port, +init(Parent, Transport, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> - i("init -> entry with" - "~n Parent: ~p" - "~n Mod: ~p" - "~n Active: ~p" - "~n Addr: ~s" - "~n Port: ~p" - "~n Msg ID: ~p (=> 16 + ~w bytes)" - "~n Max Outstanding: ~p" - "~n (Suggested) Run Time: ~p ms", - [Parent, - Mod, Active, inet:ntoa(Addr), Port, - MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]), - case Mod:connect(Addr, Port) of + ?I("init with" + "~n Parent: ~p" + "~n Transport: ~p" + "~n Active: ~p" + "~n Addr: ~s" + "~n Port: ~p" + "~n Msg ID: ~p (=> 16 + ~w bytes)" + "~n Max Outstanding: ~p" + "~n (Suggested) Run Time: ~p ms", + [Parent, + Transport, Active, inet:ntoa(Addr), Port, + MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]), + {Mod, Connect} = process_transport(Transport), + case Connect(Addr, Port) of {ok, Sock} -> - i("init -> connected"), + ?I("connected"), Parent ! {?MODULE, self(), ok}, initial_activation(Mod, Sock, Active), Results = loop(#{slogan => run, runtime => RunTime, - start => t(), + start => ?T(), parent => Parent, mod => Mod, sock => Sock, @@ -187,10 +182,16 @@ init(Parent, Mod, Active, Addr, Port, (catch Mod:close(Sock)), exit(normal); {error, Reason} -> - i("init -> connect failed: ~p", [Reason]), + ?E("connect failed: ~p", [Reason]), exit({connect, Reason}) end. +process_transport(Mod) when is_atom(Mod) -> + {Mod, fun(A, P) -> Mod:connect(A, P) end}; +process_transport({Mod, Opts}) -> + {Mod, fun(A, P) -> Mod:connect(A, P, Opts) end}. + + which_msg_data(1) -> ?MSG_DATA1; which_msg_data(2) -> ?MSG_DATA2; which_msg_data(3) -> ?MSG_DATA3. @@ -200,21 +201,21 @@ present_results(#{status := ok, runtime := RunTime, bcnt := ByteCnt, cnt := NumIterations}) -> - i("Results: " - "~n Run Time: ~s" - "~n ByteCnt: ~s" - "~n NumIterations: ~s", - [format_time(RunTime), - if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) -> - f("~w, ~w", [ByteCnt, RunTime]); + ?I("Results: " + "~n Run Time: ~s" + "~n ByteCnt: ~s" + "~n NumIterations: ~s", + [?FORMAT_TIME(RunTime), + if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) -> + ?F("~w, ~w", [ByteCnt, RunTime]); true -> - f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime]) + ?F("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime]) end, if (RunTime =:= 0) -> "-"; true -> - f("~p => ~p iterations / ms", - [NumIterations, NumIterations div RunTime]) + ?F("~p => ~p iterations / ms", + [NumIterations, NumIterations div RunTime]) end]), ok; present_results(#{status := Failure, @@ -225,21 +226,21 @@ present_results(#{status := Failure, rcnt := RCnt, bcnt := BCnt, num := Num}) -> - i("Time Test failed: " - "~n ~p" - "~n" - "~nwhen" - "~n" - "~n Run Time: ~s" - "~n Send ID: ~p" - "~n Recv ID: ~p" - "~n Send Count: ~p" - "~n Recv Count: ~p" - "~n Byte Count: ~p" - "~n Num Iterations: ~p", - [Failure, - format_time(RunTime), - SID, RID, SCnt, RCnt, BCnt, Num]). + ?I("Time Test failed: " + "~n ~p" + "~n" + "~nwhen" + "~n" + "~n Run Time: ~s" + "~n Send ID: ~p" + "~n Recv ID: ~p" + "~n Send Count: ~p" + "~n Recv Count: ~p" + "~n Byte Count: ~p" + "~n Num Iterations: ~p", + [Failure, + ?FORMAT_TIME(RunTime), + SID, RID, SCnt, RCnt, BCnt, Num]). @@ -255,17 +256,14 @@ do_loop(State) -> do_loop( handle_message( msg_exchange(State) ) ). msg_exchange(#{rcnt := Num, num := Num} = State) -> - %% i("we are done"), finish(ok, State); msg_exchange(#{scnt := Num, num := Num} = State) -> %% We are done sending more requests - now we will just await %% the replies for the (still) outstanding replies. - %% i("we have sent all requests - (only) wait for replies"), msg_exchange( recv_reply(State) ); msg_exchange(#{outstanding := Outstanding, max_outstanding := MaxOutstanding} = State) when (Outstanding < MaxOutstanding) -> - %% i("send the (initial) requests (~w, ~w)", [Outstanding, MaxOutstanding]), msg_exchange( send_request(State) ); msg_exchange(State) -> send_request( recv_reply(State) ). @@ -273,9 +271,9 @@ msg_exchange(State) -> finish(ok, #{start := Start, bcnt := BCnt, num := Num}) -> - Stop = t(), + Stop = ?T(), throw(#{status => ok, - runtime => tdiff(Start, Stop), + runtime => ?TDIFF(Start, Stop), bcnt => BCnt, cnt => Num}); finish(Reason, @@ -283,9 +281,9 @@ finish(Reason, sid := SID, rid := RID, scnt := SCnt, rcnt := RCnt, bcnt := BCnt, num := Num}) -> - Stop = t(), + Stop = ?T(), throw(#{status => Reason, - runtime => tdiff(Start, Stop), + runtime => ?TDIFF(Start, Stop), sid => SID, rid => RID, scnt => SCnt, @@ -301,11 +299,6 @@ send_request(#{mod := Mod, max_outstanding := MaxOutstanding, msg_data := Data} = State) when (MaxOutstanding > Outstanding) -> - %% i("send request -> entry when" - %% "~n ID: ~p" - %% "~n Cnt: ~p" - %% "~n Outstanding: ~p" - %% "~n MaxOutstanding: ~p", [ID, Cnt, Outstanding, MaxOutstanding]), SZ = size(Data), Req = <<?TTEST_TAG:32, ?TTEST_TYPE_REQUEST:32, @@ -314,12 +307,11 @@ send_request(#{mod := Mod, Data/binary>>, case Mod:send(Sock, Req) of ok -> - %% i("~w bytes sent", [size(Req)]), State#{sid => next_id(ID), scnt => Cnt + 1, outstanding => Outstanding + 1}; {error, Reason} -> - e("Failed sending request: ~p", [Reason]), + ?E("Failed sending request: ~p", [Reason]), exit({send, Reason}) end; send_request(State) -> @@ -334,11 +326,6 @@ recv_reply(#{mod := Mod, bcnt := BCnt, rcnt := Cnt, outstanding := Outstanding} = State) -> - %% i("recv-reply(false) -> entry with" - %% "~n (R)ID: ~p" - %% "~n (R)Cnt: ~p" - %% "~n BCnt: ~p" - %% "~n Outstanding: ~p", [ID, Cnt, BCnt, Outstanding]), case recv_reply_message1(Mod, Sock, ID) of {ok, MsgSz} -> State#{rid => next_id(ID), @@ -347,7 +334,7 @@ recv_reply(#{mod := Mod, outstanding => Outstanding - 1}; {error, timeout} -> - i("recv_reply(false) -> error: timeout"), + ?I("receive timeout"), State; {error, Reason} -> @@ -362,11 +349,6 @@ recv_reply(#{mod := Mod, rcnt := RCnt, outstanding := Outstanding, acc := Acc} = State) -> - %% i("recv-reply(~w) -> entry with" - %% "~n (R)ID: ~p" - %% "~n RCnt: ~p" - %% "~n BCnt: ~p" - %% "~n Outstanding: ~p", [Active, ID, RCnt, BCnt, Outstanding]), case recv_reply_message2(Mod, Sock, ID, Acc) of {ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) -> maybe_activate(Mod, Sock, Active), @@ -380,12 +362,12 @@ recv_reply(#{mod := Mod, State; {error, stop} -> - i("recv_reply(~w) -> stop", [Active]), + ?I("receive [~w] -> stop", [Active]), %% This will have the effect that no more requests are sent... - State#{num => SCnt, stop_started => t()}; + State#{num => SCnt, stop_started => ?T()}; {error, timeout} -> - i("recv_reply(~w) -> error: timeout", [Active]), + ?I("receive[~w] -> timeout", [Active]), State; {error, Reason} -> @@ -395,23 +377,19 @@ recv_reply(#{mod := Mod, %% This function reads exactly one (reply) message. No more no less. recv_reply_message1(Mod, Sock, ID) -> - %% i("recv_reply_message1 -> entry with" - %% "~n ID: ~w", [ID]), case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of {ok, <<?TTEST_TAG:32, ?TTEST_TYPE_REPLY:32, ID:32, SZ:32>> = Hdr} -> %% Receive the ping-pong reply boby - %% i("recv_reply_message1 -> try read body" - %% "~n ID: ~w", [ID]), case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of {ok, Data} when (size(Data) =:= SZ) -> {ok, size(Hdr) + size(Data)}; {error, Reason2} -> - i("recv_reply_message1 -> body error: " - "~n ~p: ~p", [Reason2]), - {error, {recv_hdr, Reason2}} + ?E("Failed reading body: " + "~n ~p: ~p", [Reason2]), + {error, {recv_body, Reason2}} end; {ok, <<BadTag:32, @@ -427,7 +405,7 @@ recv_reply_message1(Mod, Sock, ID) -> {error, invalid_hdr}; {error, Reason1} -> - i("recv_reply_message1 -> hdr error: " + ?E("Feiled reading header: " "~n ~p", [Reason1]), {error, {recv_hdr, Reason1}} end. @@ -437,8 +415,6 @@ recv_reply_message1(Mod, Sock, ID) -> %% accumulated. If that is not enough for a (complete) reply, it %% will attempt to receive more. recv_reply_message2(Mod, Sock, ID, Acc) -> - %% i("recv_reply_message2 -> entry with" - %% "~n ID: ~w", [ID]), case process_acc_data(ID, Acc) of ok -> %% No or insufficient data, so get more @@ -456,7 +432,6 @@ recv_reply_message2(Mod, Sock, ID, Acc) -> recv_reply_message3(_Mod, Sock, ID, Acc) -> receive {timeout, _TRef, stop} -> - %% i("stop - when messages: ~p", [process_info(self(), messages)]), {error, stop}; {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse @@ -469,7 +444,6 @@ recv_reply_message3(_Mod, Sock, ID, Acc) -> {Tag, Sock, Msg} when (Tag =:= tcp) orelse (Tag =:= socket) -> - %% i("recv_reply_message3 -> got ~w byte message", [size(Msg)]), process_acc_data(ID, <<Acc/binary, Msg/binary>>) %% after ?RECV_TIMEOUT -> @@ -482,9 +456,6 @@ process_acc_data(ID, <<?TTEST_TAG:32, ID:32, SZ:32, Data/binary>>) when (SZ =< size(Data)) -> - %% i("process_acc_data -> entry with" - %% "~n ID: ~w" - %% "~n SZ: ~w", [ID, SZ]), <<_Body:SZ/binary, Rest/binary>> = Data, {ok, {4*4+SZ, Rest}}; process_acc_data(ID, <<BadTag:32, @@ -508,9 +479,9 @@ process_acc_data(_ID, _Data) -> handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) -> receive {timeout, _TRef, stop} -> - i("stop"), + ?I("STOP"), %% This will have the effect that no more requests are sent... - State#{num => SCnt, stop_started => t()}; + State#{num => SCnt, stop_started => ?T()}; {?MODULE, Ref, Parent, stop} -> %% This *aborts* the test @@ -571,47 +542,47 @@ next_id(_) -> %% ========================================================================== -t() -> - os:timestamp(). +%% t() -> +%% os:timestamp(). -tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> - T1 = A1*1000000000+B1*1000+(C1 div 1000), - T2 = A2*1000000000+B2*1000+(C2 div 1000), - T2 - T1. +%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> +%% T1 = A1*1000000000+B1*1000+(C1 div 1000), +%% T2 = A2*1000000000+B2*1000+(C2 div 1000), +%% T2 - T1. -formated_timestamp() -> - format_timestamp(os:timestamp()). +%% formated_timestamp() -> +%% format_timestamp(os:timestamp()). -format_timestamp({_N1, _N2, N3} = TS) -> - {_Date, Time} = calendar:now_to_local_time(TS), - {Hour,Min,Sec} = Time, - FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", - [Hour, Min, Sec, round(N3/1000)]), - lists:flatten(FormatTS). +%% format_timestamp({_N1, _N2, N3} = TS) -> +%% {_Date, Time} = calendar:now_to_local_time(TS), +%% {Hour,Min,Sec} = Time, +%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", +%% [Hour, Min, Sec, round(N3/1000)]), +%% lists:flatten(FormatTS). -%% Time is always in number os ms (milli seconds) -format_time(T) -> - f("~p", [T]). +%% %% Time is always in number os ms (milli seconds) +%% format_time(T) -> +%% f("~p", [T]). %% ========================================================================== -f(F, A) -> - lists:flatten(io_lib:format(F, A)). +%% f(F, A) -> +%% lists:flatten(io_lib:format(F, A)). -%% e(F) -> -%% i("<ERROR> " ++ F). +%% %% e(F) -> +%% %% i("<ERROR> " ++ F). -e(F, A) -> - p(get(sname), "<ERROR> " ++ F, A). +%% e(F, A) -> +%% p(get(sname), "<ERROR> " ++ F, A). -i(F) -> - i(F, []). +%% i(F) -> +%% i(F, []). -i(F, A) -> - p(get(sname), "<INFO> " ++ F, A). +%% i(F, A) -> +%% p(get(sname), "<INFO> " ++ F, A). -p(undefined, F, A) -> - p("- ", F, A); -p(Prefix, F, A) -> - io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). +%% p(undefined, F, A) -> +%% p("- ", F, A); +%% p(Prefix, F, A) -> +%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). diff --git a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl index 0d2ec38876..ef980e8d32 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_client_socket.erl @@ -21,23 +21,24 @@ -module(socket_test_ttest_tcp_client_socket). -export([ - start_monitor/3, start_monitor/4, start_monitor/6, + start_monitor/4, start_monitor/5, start_monitor/7, stop/1 ]). -define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). +-define(MOD(M), {?TRANSPORT_MOD, #{method => Method}}). -start_monitor(Active, Addr, Port) -> - socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, +start_monitor(Method, Active, Addr, Port) -> + socket_test_ttest_tcp_client:start_monitor(?MOD(Method), Active, Addr, Port). -start_monitor(Active, Addr, Port, MsgID) -> - socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, +start_monitor(Method, Active, Addr, Port, MsgID) -> + socket_test_ttest_tcp_client:start_monitor(?MOD(Method), Active, Addr, Port, MsgID). -start_monitor(Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> - socket_test_ttest_tcp_client:start_monitor(?TRANSPORT_MOD, +start_monitor(Method, Active, Addr, Port, MsgID, MaxOutstanding, RunTime) -> + socket_test_ttest_tcp_client:start_monitor(?MOD(Method), Active, Addr, Port, MsgID, MaxOutstanding, RunTime). diff --git a/erts/emulator/test/socket_test_ttest_tcp_server.erl b/erts/emulator/test/socket_test_ttest_tcp_server.erl index cb503a1feb..b248b063a3 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_server.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_server.erl @@ -44,13 +44,25 @@ -define(ACC_TIMEOUT, 10000). -define(RECV_TIMEOUT, 10000). +-define(LIB, socket_test_ttest_lib). +-define(I(F), ?LIB:info(F)). +-define(I(F,A), ?LIB:info(F, A)). +-define(E(F,A), ?LIB:error(F, A)). +-define(F(F,A), ?LIB:format(F, A)). +-define(FORMAT_TIME(T), ?LIB:format_time(T)). +-define(T(), ?LIB:t()). +-define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)). + %% ========================================================================== -start_monitor(Mod, Active) - when is_atom(Mod) andalso is_boolean(Active) orelse (Active =:= once) -> +start_monitor(Transport, Active) + when (is_atom(Transport) orelse is_tuple(Transport)) andalso + (is_boolean(Active) orelse (Active =:= once)) -> Self = self(), - ServerInit = fun() -> put(sname, "server"), server_init(Self, Mod, Active) end, + ServerInit = fun() -> put(sname, "server"), + server_init(Self, Transport, Active) + end, {Pid, MRef} = spawn_monitor(ServerInit), receive {'DOWN', MRef, process, Pid, normal} -> @@ -73,26 +85,28 @@ stop(Pid) when is_pid(Pid) -> %% ========================================================================== -server_init(Parent, Mod, Active) -> - i("init -> entry with" - "~n Parent: ~p" - "~n Mod: ~p" - "~n Active: ~p", [Parent, Mod, Active]), - case Mod:listen(0) of +server_init(Parent, Transport, Active) -> + ?I("init -> entry with" + "~n Parent: ~p" + "~n Transport: ~p" + "~n Active: ~p", [Parent, Transport, Active]), + {Mod, Listen, StatsInterval} = process_transport(Transport, Active), + case Listen(0) of {ok, LSock} -> case Mod:port(LSock) of {ok, Port} = OK -> Addr = which_addr(), % This is just for convenience - i("init -> listening on:" - "~n Addr: ~p (~s)" - "~n Port: ~w" - "~n", [Addr, inet:ntoa(Addr), Port]), + ?I("init -> listening on:" + "~n Addr: ~p (~s)" + "~n Port: ~w" + "~n", [Addr, inet:ntoa(Addr), Port]), Parent ! {?MODULE, self(), OK}, - server_loop(#{parent => Parent, - mod => Mod, - active => Active, - lsock => LSock, - handlers => [], + server_loop(#{parent => Parent, + mod => Mod, + active => Active, + lsock => LSock, + handlers => [], + stats_interval => StatsInterval, %% Accumulation runtime => 0, mcnt => 0, @@ -107,37 +121,40 @@ server_init(Parent, Mod, Active) -> exit({listen, LReason}) end. +process_transport(Mod, _) when is_atom(Mod) -> + {Mod, fun(Port) -> Mod:listen(Port) end, infinity}; +process_transport({Mod, #{stats_interval := T} = Opts}, Active) + when (Active =/= false) -> + {Mod, fun(Port) -> Mod:listen(Port, Opts#{stats_to => self()}) end, T}; +process_transport({Mod, #{stats_interval := T} = Opts}, _Active) -> + {Mod, fun(Port) -> Mod:listen(Port, Opts) end, T}; +process_transport({Mod, Opts}, _Active) -> + {Mod, fun(Port) -> Mod:listen(Port, Opts) end, infinity}. + + server_loop(State) -> - %% i("loop -> enter"), server_loop( server_handle_message( server_accept(State) ) ). server_accept(#{mod := Mod, active := Active, lsock := LSock, handlers := Handlers} = State) -> - - %% i("server-accept -> entry with" - %% "~n Mod: ~p" - %% "~n Active: ~p" - %% "~n LSock: ~p", [Mod, Active, LSock]), - case Mod:accept(LSock, ?ACC_TIMEOUT) of {ok, Sock} -> - %% i("server-accept -> accepted: " - %% "~n Sock: ~p", [Sock]), - i("accepted connection from ~s", - [case Mod:peername(Sock) of - {ok, Peer} -> - format_peername(Peer); - {error, _} -> - "-" - end]), + ?I("accepted connection from ~s", + [case Mod:peername(Sock) of + {ok, Peer} -> + format_peername(Peer); + {error, _} -> + "-" + end]), {Pid, _} = handler_start(), - i("handler ~p started -> try transfer socket control", [Pid]), + ?I("handler ~p started -> try transfer socket control", [Pid]), case Mod:controlling_process(Sock, Pid) of ok -> - i("server-accept: handler ~p started", [Pid]), + maybe_start_stats_timer(State, Pid), + ?I("server-accept: handler ~p started", [Pid]), handler_continue(Pid, Mod, Sock, Active), Handlers2 = [Pid | Handlers], State#{handlers => Handlers2}; @@ -156,14 +173,31 @@ server_accept(#{mod := Mod, format_peername({Addr, Port}) -> case inet:gethostbyaddr(Addr) of {ok, #hostent{h_name = N}} -> - f("~s (~s:~w)", [N, inet:ntoa(Addr), Port]); + ?F("~s (~s:~w)", [N, inet:ntoa(Addr), Port]); {error, _} -> - f("~p, ~p", [Addr, Port]) + ?F("~p, ~p", [Addr, Port]) end. - + +maybe_start_stats_timer(#{active := Active, stats_interval := Time}, Handler) + when (Active =/= false) andalso (is_integer(Time) andalso (Time > 0)) -> + start_stats_timer(Time, "handler", Handler); +maybe_start_stats_timer(_, _) -> + ok. + +start_stats_timer(Time, ProcStr, Pid) -> + erlang:start_timer(Time, self(), {stats, Time, ProcStr, Pid}). + server_handle_message(#{parent := Parent, handlers := H} = State) -> - %% i("server_handle_message -> enter"), receive + {timeout, _TRef, {stats, Interval, ProcStr, Pid}} -> + case server_handle_stats(ProcStr, Pid) of + ok -> + start_stats_timer(Interval, ProcStr, Pid); + skip -> + ok + end, + State; + {?MODULE, Ref, Parent, stop} -> reply(Parent, Ref, ok), lists:foreach(fun(P) -> handler_stop(P) end, H), @@ -176,11 +210,20 @@ server_handle_message(#{parent := Parent, handlers := H} = State) -> State end. +server_handle_stats(ProcStr, Pid) -> + case ?LIB:formated_process_stats(Pid) of + "" -> + skip; + FormatedStats -> + ?I("Statistics for ~s ~p:~s", [ProcStr, Pid, FormatedStats]), + ok + end. + server_handle_down(Pid, Reason, #{handlers := Handlers} = State) -> case lists:delete(Pid, Handlers) of Handlers -> - i("unknown process ~p died", [Pid]), + ?I("unknown process ~p died", [Pid]), State; Handlers2 -> server_handle_handler_down(Pid, Reason, State#{handlers => Handlers2}) @@ -197,35 +240,35 @@ server_handle_handler_down(Pid, AccMCnt2 = AccMCnt + MCnt, AccBCnt2 = AccBCnt + BCnt, AccHCnt2 = AccHCnt + 1, - i("handler ~p (~w) done => accumulated results: " - "~n Run Time: ~s ms" - "~n Message Count: ~s" - "~n Byte Count: ~s", - [Pid, AccHCnt2, - format_time(AccRunTime2), - if (AccRunTime2 > 0) -> - f("~w => ~w (~w) msgs / ms", - [AccMCnt2, - AccMCnt2 div AccRunTime2, - (AccMCnt2 div AccHCnt2) div AccRunTime2]); - true -> - f("~w", [AccMCnt2]) - end, - if (AccRunTime2 > 0) -> - f("~w => ~w (~w) bytes / ms", - [AccBCnt2, - AccBCnt2 div AccRunTime2, - (AccBCnt2 div AccHCnt2) div AccRunTime2]); - true -> - f("~w", [AccBCnt2]) - end]), + ?I("handler ~p (~w) done => accumulated results: " + "~n Run Time: ~s ms" + "~n Message Count: ~s" + "~n Byte Count: ~s", + [Pid, AccHCnt2, + ?FORMAT_TIME(AccRunTime2), + if (AccRunTime2 > 0) -> + ?F("~w => ~w (~w) msgs / ms", + [AccMCnt2, + AccMCnt2 div AccRunTime2, + (AccMCnt2 div AccHCnt2) div AccRunTime2]); + true -> + ?F("~w", [AccMCnt2]) + end, + if (AccRunTime2 > 0) -> + ?F("~w => ~w (~w) bytes / ms", + [AccBCnt2, + AccBCnt2 div AccRunTime2, + (AccBCnt2 div AccHCnt2) div AccRunTime2]); + true -> + ?F("~w", [AccBCnt2]) + end]), State#{runtime => AccRunTime2, mcnt => AccMCnt2, bcnt => AccBCnt2, hcnt => AccHCnt2}; server_handle_handler_down(Pid, Reason, State) -> - i("handler ~p terminated: " - "~n ~p", [Pid, Reason]), + ?I("handler ~p terminated: " + "~n ~p", [Pid, Reason]), State. @@ -244,32 +287,31 @@ handler_stop(Pid) -> req(Pid, stop). handler_init(Parent) -> - i("starting"), + ?I("starting"), receive {?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} -> - i("received continue"), + ?I("received continue"), reply(Parent, Ref, ok), handler_initial_activation(Mod, Sock, Active), handler_loop(#{parent => Parent, mod => Mod, sock => Sock, active => Active, - start => t(), + start => ?T(), mcnt => 0, bcnt => 0, last_reply => none, acc => <<>>}) after 5000 -> - i("timeout when message queue: " - "~n ~p" - "~nwhen" - "~n Parent: ~p", [process_info(self(), messages), Parent]), + ?I("timeout when message queue: " + "~n ~p" + "~nwhen" + "~n Parent: ~p", [process_info(self(), messages), Parent]), handler_init(Parent) end. handler_loop(State) -> - %% i("handler-loop"), handler_loop( handler_handle_message( handler_recv_message(State) ) ). %% When passive, we read *one* request and then attempt to reply to it. @@ -278,7 +320,6 @@ handler_recv_message(#{mod := Mod, active := false, mcnt := MCnt, bcnt := BCnt} = State) -> - %% i("handler_recv_message(false) -> entry"), case handler_recv_message2(Mod, Sock) of {ok, {MsgSz, ID, Body}} -> handler_send_reply(Mod, Sock, ID, Body), @@ -305,7 +346,6 @@ handler_recv_message(#{mod := Mod, bcnt := BCnt, last_reply := LID, acc := Acc} = State) -> - %% i("handler_recv_message(~w) -> entry", [Active]), case handler_recv_message3(Mod, Sock, Acc, LID) of {ok, {MCnt2, BCnt2, LID2}, NewAcc} -> handler_maybe_activate(Mod, Sock, Active), @@ -319,12 +359,12 @@ handler_recv_message(#{mod := Mod, (size(Acc) =:= 0) -> handler_done(State); true -> - e("client done with partial message: " - "~n Last Reply Sent: ~w" - "~n Message Count: ~w" - "~n Byte Count: ~w" - "~n Partial Message: ~w bytes", - [LID, MCnt, BCnt, size(Acc)]), + ?E("client done with partial message: " + "~n Last Reply Sent: ~w" + "~n Message Count: ~w" + "~n Byte Count: ~w" + "~n Partial Message: ~w bytes", + [LID, MCnt, BCnt, size(Acc)]), exit({closed_with_partial_message, LID}) end; @@ -355,32 +395,27 @@ handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) -> handler_recv_message2(Mod, Sock) -> - %% i("handler_recv_message2 -> entry"), case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of {ok, <<?TTEST_TAG:32, ?TTEST_TYPE_REQUEST:32, ID:32, SZ:32>> = Hdr} -> - %% i("handler_recv_message2 -> got request header: " - %% "~n ID: ~p" - %% "~n SZ: ~p", [ID, SZ]), case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of {ok, Body} when (SZ =:= size(Body)) -> - %% i("handler_recv_message2 -> got body"), {ok, {size(Hdr) + size(Body), ID, Body}}; {error, BReason} -> - e("failed reading body (~w) of message ~w:" - "~n ~p", [SZ, ID, BReason]), + ?E("failed reading body (~w) of message ~w:" + "~n ~p", [SZ, ID, BReason]), exit({recv, body, ID, SZ, BReason}) end; {error, timeout} = ERROR -> - i("handler_recv_message2 -> timeout"), + ?I("timeout"), ERROR; {error, closed} = ERROR -> ERROR; {error, HReason} -> - e("failed reading header of message:" - "~n ~p", [HReason]), + ?E("Failed reading header of message:" + "~n ~p", [HReason]), exit({recv, header, HReason}) end. @@ -405,116 +440,6 @@ handler_recv_message3(Mod, Sock, Acc, LID) -> -%% %% Socket in passive mode => we need to read explicitly -%% handler_loop(#{sock := Sock, active := false} = State, MCnt, BCnt) -> -%% %% i("try recv msg header"), -%% MsgSz = -%% case gen_tcp:recv(Sock, 4*4) of -%% {ok, <<?SOCKET_SIMPLE_TAG:32, -%% ?SOCKET_SIMPLE_TYPE_REQUEST:32, -%% ID:32, -%% SZ:32>> = Hdr} -> -%% %% i("msg ~w header received (data sz is ~w bytes)", [ID, SZ]), -%% case gen_tcp:recv(Sock, SZ) of -%% {ok, Data} when (size(Data) =:= SZ) -> -%% handler_send_reply(Sock, ID, Data), -%% size(Hdr)+SZ; -%% {ok, InvData} -> -%% i("invalid data"), -%% (catch gen_tcp:close(Sock)), -%% exit({invalid_data, SZ, size(InvData)}); -%% {error, Reason2} -> -%% i("Data read failed: ~p", [Reason2]), -%% (catch gen_tcp:close(Sock)), -%% exit({recv_data, Reason2}) -%% end; -%% {ok, _InvHdr} -> -%% (catch gen_tcp:close(Sock)), -%% exit(invalid_hdr); -%% {error, closed} -> -%% i("we are done: " -%% "~n Message Count: ~p" -%% "~n Byte Count: ~p", [MCnt, BCnt]), -%% exit(normal); -%% {error, Reason1} -> -%% i("Header read failed: ~p", [Reason1]), -%% (catch gen_tcp:close(Sock)), -%% exit({recv_hdr, Reason1}) -%% end, -%% handler_loop(State, MCnt+1, BCnt+MsgSz); - -%% %% Socket in active mode (once | true) => data messages arrive -%% handler_loop(#{sock := Sock, active := Active} = State, MCnt, BCnt) -> -%% %% i("await msg"), -%% try handler_recv_request(Sock, Active) of -%% {ID, Data, MsgSz} -> -%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]), -%% handler_send_reply(Sock, ID, Data), -%% handler_maybe_activate(Sock, Active), -%% handler_loop(State, MCnt+1, BCnt+MsgSz) -%% catch -%% throw:tcp_closed -> -%% i("we are done: " -%% "~n Message Count: ~p" -%% "~n Byte Count: ~p", [MCnt, BCnt]), -%% exit(normal); -%% throw:{tcp_error, Reason} -> -%% i("<ERROR> TCP error ~p when: " -%% "~n Message Count: ~p" -%% "~n Byte Count: ~p", [Reason, MCnt, BCnt]), -%% exit({tcp_error, Reason}) -%% end. -%% %% {ID, Data, MsgSz} = handler_recv_request(Sock, Active), -%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]), -%% %% handler_send_reply(Sock, ID, Data), -%% %% handler_maybe_activate(Sock, Active), -%% %% handler_loop(State, MCnt+1, BCnt+MsgSz). - - -%% handler_recv_request(Sock, Active) -> -%% %% In theory we should also be ready for a partial header, -%% %% but I can't be bothered... -%% receive -%% {tcp_closed, Sock} -> -%% throw(tcp_closed); -%% {tcp_error, Sock, Reason} -> -%% throw({tcp_error, Reason}); -%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32, -%% ?SOCKET_SIMPLE_TYPE_REQUEST:32, -%% ID:32, -%% SZ:32, -%% Data/binary>> = Msg} when (size(Data) =:= SZ) -> -%% %% i("[complete] msg ~w received (data sz is ~w bytes)", [ID, SZ]), -%% {ID, Data, size(Msg)}; -%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32, -%% ?SOCKET_SIMPLE_TYPE_REQUEST:32, -%% ID:32, -%% SZ:32, -%% Data/binary>> = Msg} when (size(Data) < SZ) -> -%% %% i("[incomplete] msg ~w received (data sz is ~w bytes)", [ID, SZ]), -%% handler_recv_request_data(Sock, Active, ID, SZ, Data, size(Msg)) -%% end. - -%% handler_recv_request_data(Sock, Active, ID, SZ, AccData, AccMsgSz) -> -%% handler_maybe_activate(Sock, Active), -%% receive -%% {tcp_closed, Sock} -> -%% %% i("we are done (incomplete data)"), -%% throw(tcp_closed); -%% {tcp_error, Sock, Reason} -> -%% throw({tcp_error, Reason}); -%% {tcp, Sock, Data} when (SZ =:= (size(AccData) + size(Data))) -> -%% %% i("[complete] received the remaining data (~w bytes) for msg ~w", -%% %% [size(Data), ID]), -%% {ID, <<AccData/binary, Data/binary>>, AccMsgSz+size(Data)}; -%% {tcp, Sock, Data} -> -%% %% i("[incomplete] received ~w bytes of data for for msg ~w", -%% %% [size(Data), ID]), -%% handler_recv_request_data(Sock, Active, ID, SZ, -%% <<AccData/binary, Data/binary>>, -%% AccMsgSz+size(Data)) -%% end. - handler_send_reply(Mod, Sock, ID, Data) -> SZ = size(Data), Msg = <<?TTEST_TAG:32, @@ -522,10 +447,8 @@ handler_send_reply(Mod, Sock, ID, Data) -> ID:32, SZ:32, Data/binary>>, - %% i("handler-send-reply -> try send reply ~w: ~w bytes", [ID, size(Msg)]), case Mod:send(Sock, Msg) of ok -> - %% i("handler-send-reply -> reply ~w (~w bytes) sent", [ID, size(Msg)]), ok; {error, Reason} -> (catch Mod:close(Sock)), @@ -534,7 +457,7 @@ handler_send_reply(Mod, Sock, ID, Data) -> handler_done(State) -> - handler_done(State, t()). + handler_done(State, ?T()). handler_done(#{start := Start, mod := Mod, @@ -542,7 +465,7 @@ handler_done(#{start := Start, mcnt := MCnt, bcnt := BCnt}, Stop) -> (catch Mod:close(Sock)), - exit({done, tdiff(Start, Stop), MCnt, BCnt}). + exit({done, ?TDIFF(Start, Stop), MCnt, BCnt}). handler_handle_message(#{parent := Parent} = State) -> @@ -634,45 +557,45 @@ reply(Pid, Ref, Reply) -> %% ========================================================================== -t() -> - os:timestamp(). +%% t() -> +%% os:timestamp(). -tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> - T1 = A1*1000000000+B1*1000+(C1 div 1000), - T2 = A2*1000000000+B2*1000+(C2 div 1000), - T2 - T1. +%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> +%% T1 = A1*1000000000+B1*1000+(C1 div 1000), +%% T2 = A2*1000000000+B2*1000+(C2 div 1000), +%% T2 - T1. -formated_timestamp() -> - format_timestamp(os:timestamp()). +%% formated_timestamp() -> +%% format_timestamp(os:timestamp()). -format_timestamp({_N1, _N2, N3} = TS) -> - {_Date, Time} = calendar:now_to_local_time(TS), - {Hour,Min,Sec} = Time, - FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", - [Hour, Min, Sec, round(N3/1000)]), - lists:flatten(FormatTS). +%% format_timestamp({_N1, _N2, N3} = TS) -> +%% {_Date, Time} = calendar:now_to_local_time(TS), +%% {Hour,Min,Sec} = Time, +%% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", +%% [Hour, Min, Sec, round(N3/1000)]), +%% lists:flatten(FormatTS). -%% Time is always in number os ms (milli seconds) -format_time(T) -> - f("~p", [T]). +%% %% Time is always in number os ms (milli seconds) +%% format_time(T) -> +%% f("~p", [T]). %% ========================================================================== -f(F, A) -> - lists:flatten(io_lib:format(F, A)). +%% f(F, A) -> +%% lists:flatten(io_lib:format(F, A)). -e(F, A) -> - p(get(sname), "<ERROR> " ++ F, A). +%% e(F, A) -> +%% p(get(sname), "<ERROR> " ++ F, A). -i(F) -> - i(F, []). +%% i(F) -> +%% i(F, []). -i(F, A) -> - p(get(sname), "<INFO> " ++ F, A). +%% i(F, A) -> +%% p(get(sname), "<INFO> " ++ F, A). -p(undefined, F, A) -> - p("- ", F, A); -p(Prefix, F, A) -> - io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). +%% p(undefined, F, A) -> +%% p("- ", F, A); +%% p(Prefix, F, A) -> +%% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). diff --git a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl index de9df857fe..d62f4d86a6 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_server_socket.erl @@ -21,14 +21,17 @@ -module(socket_test_ttest_tcp_server_socket). -export([ - start_monitor/1, + start_monitor/2, stop/1 ]). -define(TRANSPORT_MOD, socket_test_ttest_tcp_socket). +%% -define(MOD(M), {?TRANSPORT_MOD, #{method => M, +%% stats_interval => 10000}}). +-define(MOD(M), {?TRANSPORT_MOD, #{method => M}}). -start_monitor(Active) -> - socket_test_ttest_tcp_server:start_monitor(?TRANSPORT_MOD, Active). +start_monitor(Method, Active) -> + socket_test_ttest_tcp_server:start_monitor(?MOD(Method), Active). stop(Pid) -> socket_test_ttest_tcp_server:stop(Pid). diff --git a/erts/emulator/test/socket_test_ttest_tcp_socket.erl b/erts/emulator/test/socket_test_ttest_tcp_socket.erl index 12d9e052d7..f314d01a63 100644 --- a/erts/emulator/test/socket_test_ttest_tcp_socket.erl +++ b/erts/emulator/test/socket_test_ttest_tcp_socket.erl @@ -24,9 +24,9 @@ accept/1, accept/2, active/2, close/1, - connect/2, + connect/2, connect/3, controlling_process/2, - listen/0, listen/1, + listen/0, listen/1, listen/2, port/1, peername/1, recv/2, recv/3, @@ -38,25 +38,41 @@ -define(READER_RECV_TIMEOUT, 1000). +-define(DATA_MSG(Sock, Method, Data), + {socket, + #{sock => Sock, reader => self(), method => Method}, + Data}). + +-define(CLOSED_MSG(Sock, Method), + {socket_closed, + #{sock => Sock, reader => self(), method => Method}}). + +-define(ERROR_MSG(Sock, Method, Reason), + {socket_error, + #{sock => Sock, reader => self(), method => Method}, + Reason}). + %% ========================================================================== -accept(#{sock := LSock}) -> +accept(#{sock := LSock, opts := #{method := Method} = Opts}) -> case socket:accept(LSock) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}}; + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. -accept(#{sock := LSock}, Timeout) -> +accept(#{sock := LSock, opts := #{method := Method} = Opts}, Timeout) -> case socket:accept(LSock, Timeout) of {ok, Sock} -> Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}}; + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}}; {error, _} = ERROR -> ERROR end. @@ -74,6 +90,9 @@ close(#{sock := Sock, reader := Pid}) -> %% Create a socket and connect it to a peer connect(Addr, Port) -> + connect(Addr, Port, #{method => plain}). + +connect(Addr, Port, #{method := Method} = Opts) -> try begin Sock = @@ -100,9 +119,10 @@ connect(Addr, Port) -> (catch socket:close(Sock)), throw({error, {connect, CReason}}) end, - Self = self(), - Reader = spawn(fun() -> reader_init(Self, Sock, false) end), - {ok, #{sock => Sock, reader => Reader}} + Self = self(), + Reader = spawn(fun() -> reader_init(Self, Sock, false, Method) end), + maybe_start_stats_timer(Opts, Reader), + {ok, #{sock => Sock, reader => Reader, method => Method}} end catch throw:ERROR:_ -> @@ -110,6 +130,16 @@ connect(Addr, Port) -> end. +maybe_start_stats_timer(#{stats_to := Pid, + stats_interval := T}, + Reader) when is_pid(Pid) -> + erlang:start_timer(T, Pid, {stats, T, "reader", Reader}); +maybe_start_stats_timer(O, _) -> + io:format("NO STATS: " + "~n ~p" + "~n", [O]), + ok. + controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> case socket:setopt(Sock, otp, controlling_process, NewPid) of ok -> @@ -125,9 +155,13 @@ controlling_process(#{sock := Sock, reader := Pid}, NewPid) -> %% Create a listen socket listen() -> - listen(0). + listen(0, #{method => plain}). -listen(Port) when is_integer(Port) andalso (Port >= 0) -> +listen(Port) -> + listen(Port, #{method => plain}). +listen(Port, #{method := Method} = Opts) + when (is_integer(Port) andalso (Port >= 0)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> try begin Sock = case socket:open(inet, stream, tcp) of @@ -152,7 +186,7 @@ listen(Port) when is_integer(Port) andalso (Port >= 0) -> (catch socket:close(Sock)), throw({error, {listen, LReason}}) end, - {ok, #{sock => Sock}} + {ok, #{sock => Sock, opts => Opts}} end catch throw:{error, Reason}:_ -> @@ -178,14 +212,31 @@ peername(#{sock := Sock}) -> end. -recv(#{sock := Sock}, Length) -> - socket:recv(Sock, Length). -recv(#{sock := Sock}, Length, Timeout) -> - socket:recv(Sock, Length, Timeout). +recv(#{sock := Sock, method := plain}, Length) -> + socket:recv(Sock, Length); +recv(#{sock := Sock, method := msg}, Length) -> + case socket:recvmsg(Sock, Length, 0, [], infinity) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. + +recv(#{sock := Sock, method := plain}, Length, Timeout) -> + socket:recv(Sock, Length, Timeout); +recv(#{sock := Sock, method := msg}, Length, Timeout) -> + case socket:recvmsg(Sock, Length, 0, [], Timeout) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. -send(#{sock := Sock}, Length) -> - socket:send(Sock, Length). +send(#{sock := Sock, method := plain}, Bin) -> + socket:send(Sock, Bin); +send(#{sock := Sock, method := msg}, Bin) -> + socket:sendmsg(Sock, #{iov => [Bin]}). shutdown(#{sock := Sock}, How) -> @@ -203,14 +254,16 @@ sockname(#{sock := Sock}) -> %% ========================================================================== -reader_init(ControllingProcess, Sock, Active) +reader_init(ControllingProcess, Sock, Active, Method) when is_pid(ControllingProcess) andalso - (is_boolean(Active) orelse (Active =:= once)) -> + (is_boolean(Active) orelse (Active =:= once)) andalso + ((Method =:= plain) orelse (Method =:= msg)) -> MRef = erlang:monitor(process, ControllingProcess), reader_loop(#{ctrl_proc => ControllingProcess, ctrl_proc_mref => MRef, active => Active, - sock => Sock}). + sock => Sock, + method => Method}). %% Never read @@ -245,10 +298,11 @@ reader_loop(#{active := false, %% Read *once* and then change to false reader_loop(#{active := once, sock := Sock, + method := Method, ctrl_proc := Pid} = State) -> - case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + case do_recv(Method, Sock) of {ok, Data} -> - Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + Pid ! ?DATA_MSG(Sock, Method, Data), reader_loop(State#{active => false}); {error, timeout} -> receive @@ -280,21 +334,22 @@ reader_loop(#{active := once, end; {error, closed} -> - Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + Pid ! ?CLOSED_MSG(Sock, Method), exit(normal); {error, Reason} -> - Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + Pid ! ?ERROR_MSG(Sock, Method, Reason), exit(Reason) end; %% Read and forward data continuously reader_loop(#{active := true, sock := Sock, + method := Method, ctrl_proc := Pid} = State) -> - case socket:recv(Sock, 0, ?READER_RECV_TIMEOUT) of + case do_recv(Method, Sock) of {ok, Data} -> - Pid ! {socket, #{sock => Sock, reader => self()}, Data}, + Pid ! ?DATA_MSG(Sock, Method, Data), reader_loop(State); {error, timeout} -> receive @@ -326,20 +381,26 @@ reader_loop(#{active := true, end; {error, closed} -> - Pid ! {socket_closed, #{sock => Sock, reader => self()}}, + Pid ! ?CLOSED_MSG(Sock, Method), exit(normal); {error, Reason} -> - Pid ! {socket_error, #{sock => Sock, reader => self()}, Reason}, + Pid ! ?ERROR_MSG(Sock, Method, Reason), exit(Reason) end. - +do_recv(plain, Sock) -> + socket:recv(Sock, 0, ?READER_RECV_TIMEOUT); +do_recv(msg, Sock) -> + case socket:recvmsg(Sock, 0, 0, [], ?READER_RECV_TIMEOUT) of + {ok, #{iov := [Bin]}} -> + {ok, Bin}; + {error, _} = ERROR -> + ERROR + end. %% ========================================================================== - - |