diff options
author | Micael Karlberg <[email protected]> | 2018-12-05 18:28:19 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-12-05 18:28:19 +0100 |
commit | a862cc8ab56616e182c959a5a6a06e4aefa09b08 (patch) | |
tree | b6794282cc47a5eaf4aebe4b77b238d3310f3a3e /erts/emulator/test/socket_test_ttest_tcp_client.erl | |
parent | a76dbf18074b33774d4c8a58280b690c6f462962 (diff) | |
parent | 94d8e2f1bf9508656f5b9b2c2c644128a9bdfb57 (diff) | |
download | otp-a862cc8ab56616e182c959a5a6a06e4aefa09b08.tar.gz otp-a862cc8ab56616e182c959a5a6a06e4aefa09b08.tar.bz2 otp-a862cc8ab56616e182c959a5a6a06e4aefa09b08.zip |
Merge branch 'bmk/20181205/nififying_inet_ttest/OTP-14831' into bmk/20180918/nififying_inet/OTP-14831
Diffstat (limited to 'erts/emulator/test/socket_test_ttest_tcp_client.erl')
-rw-r--r-- | erts/emulator/test/socket_test_ttest_tcp_client.erl | 617 |
1 files changed, 617 insertions, 0 deletions
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client.erl b/erts/emulator/test/socket_test_ttest_tcp_client.erl new file mode 100644 index 0000000000..1bd1bc54e9 --- /dev/null +++ b/erts/emulator/test/socket_test_ttest_tcp_client.erl @@ -0,0 +1,617 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% ========================================================================== +%% +%% This is the "simple" client using gen_tcp. The client is supposed to be +%% as simple as possible in order to incur as little overhead as possible. +%% +%% There are three ways to run the client: active, passive or active-once. +%% +%% The client is the entity that controls the test, timing and counting. +%% +%% ========================================================================== +%% +%% Before the actual test starts, the client performs a "warmup". +%% The warmup has two functions. First, to ensure that everything is "loaded" +%% and, second, to calculate an approximate roundtrip time, in order to +%% "know" how many iterations we should make (to run for the expected time). +%% This is not intended to be exact, but just to ensure that all tests take +%% approx the same time to run. +%% +%% ========================================================================== + +-module(socket_test_ttest_tcp_client). + +-export([ + start_monitor/4, start_monitor/5, start_monitor/7, + stop/1 + ]). + +-include_lib("kernel/include/inet.hrl"). +-include("socket_test_ttest.hrl"). +-include("socket_test_ttest_client.hrl"). + +-define(RECV_TIMEOUT, 10000). +-define(MAX_OUTSTANDING_DEFAULT_1, 100). +-define(MAX_OUTSTANDING_DEFAULT_2, 10). +-define(MAX_OUTSTANDING_DEFAULT_3, 3). + + +-type active() :: once | boolean(). +-type msg_id() :: 1..3. +-type max_outstanding() :: pos_integer(). +-type runtime() :: pos_integer(). + + +%% ========================================================================== + +-spec start_monitor(Mod, Active, Addr, Port) -> term() when + Mod :: atom(), + Active :: active(), + Addr :: inet:ip_address(), + Port :: inet:port_number(). + +%% RunTime is in number of ms. +start_monitor(Mod, Active, Addr, Port) -> + start_monitor(Mod, Active, Addr, Port, ?MSG_ID_DEFAULT). + +-spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when + Mod :: atom(), + Active :: active(), + Addr :: inet:ip_address(), + Port :: inet:port_number(), + MsgID :: msg_id(). + +%% RunTime is in number of ms. +start_monitor(Mod, Active, Addr, Port, 1 = MsgID) -> + start_monitor(Mod, Active, Addr, Port, MsgID, + ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT); +start_monitor(Mod, Active, Addr, Port, 2 = MsgID) -> + start_monitor(Mod, Active, Addr, Port, MsgID, + ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT); +start_monitor(Mod, Active, Addr, Port, 3 = MsgID) -> + start_monitor(Mod, Active, Addr, Port, MsgID, + ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT). + +-spec start_monitor(Mod, + Active, + Addr, + Port, + MsgID, + MaxOutstanding, + RunTime) -> term() when + Mod :: atom(), + Active :: active(), + Addr :: inet:ip_address(), + Port :: inet:port_number(), + MsgID :: msg_id(), + MaxOutstanding :: max_outstanding(), + RunTime :: runtime(). + +%% RunTime is in number of ms. +start_monitor(Mod, Active, Addr, Port, + MsgID, MaxOutstanding, RunTime) + when is_atom(Mod) andalso + (is_boolean(Active) orelse (Active =:= once)) andalso + is_tuple(Addr) andalso + (is_integer(Port) andalso (Port > 0)) andalso + (is_integer(MsgID) andalso (MsgID >= 1) andalso (MsgID =< 3)) andalso + (is_integer(MaxOutstanding) andalso (MaxOutstanding > 0)) andalso + (is_integer(RunTime) andalso (RunTime > 0)) -> + Self = self(), + ClientInit = fun() -> put(sname, "client"), + init(Self, + Mod, Active, Addr, Port, + MsgID, MaxOutstanding, RunTime) + end, + {Pid, MRef} = spawn_monitor(ClientInit), + receive + {?MODULE, Pid, ok} -> + erlang:demonitor(MRef, [flush]), + {ok, {Pid, MRef}}; + + {?MODULE, Pid, {error, _} = ERROR} -> + erlang:demonitor(MRef, [flush]), + ERROR; + + {'DOWN', MRef, process, Pid, normal} -> + ok; + {'DOWN', MRef, process, Pid, Reason} -> + {error, {exit, Reason}} + + end. + + +%% We should not normally stop this (it terminates when its done). +stop(Pid) when is_pid(Pid) -> + req(Pid, stop). + + +%% ========================================================================== + +init(Parent, Mod, Active, Addr, Port, + MsgID, MaxOutstanding, RunTime) -> + i("init -> entry with" + "~n Parent: ~p" + "~n Mod: ~p" + "~n Active: ~p" + "~n Addr: ~s" + "~n Port: ~p" + "~n Msg ID: ~p (=> 16 + ~w bytes)" + "~n Max Outstanding: ~p" + "~n (Suggested) Run Time: ~p ms", + [Parent, + Mod, Active, inet:ntoa(Addr), Port, + MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]), + case Mod:connect(Addr, Port) of + {ok, Sock} -> + i("init -> connected"), + Parent ! {?MODULE, self(), ok}, + initial_activation(Mod, Sock, Active), + Results = loop(#{slogan => run, + runtime => RunTime, + start => t(), + parent => Parent, + mod => Mod, + sock => Sock, + active => Active, + msg_data => which_msg_data(MsgID), + outstanding => 0, + max_outstanding => MaxOutstanding, + sid => 1, + rid => 1, + scnt => 0, + rcnt => 0, + bcnt => 0, + num => undefined, + acc => <<>>}), + present_results(Results), + (catch Mod:close(Sock)), + exit(normal); + {error, Reason} -> + i("init -> connect failed: ~p", [Reason]), + exit({connect, Reason}) + end. + +which_msg_data(1) -> ?MSG_DATA1; +which_msg_data(2) -> ?MSG_DATA2; +which_msg_data(3) -> ?MSG_DATA3. + + +present_results(#{status := ok, + runtime := RunTime, + bcnt := ByteCnt, + cnt := NumIterations}) -> + i("Results: " + "~n Run Time: ~s" + "~n ByteCnt: ~s" + "~n NumIterations: ~s", + [format_time(RunTime), + if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) -> + f("~w, ~w", [ByteCnt, RunTime]); + true -> + f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime]) + end, + if (RunTime =:= 0) -> + "-"; + true -> + f("~p => ~p iterations / ms", + [NumIterations, NumIterations div RunTime]) + end]), + ok; +present_results(#{status := Failure, + runtime := RunTime, + sid := SID, + rid := RID, + scnt := SCnt, + rcnt := RCnt, + bcnt := BCnt, + num := Num}) -> + i("Time Test failed: " + "~n ~p" + "~n" + "~nwhen" + "~n" + "~n Run Time: ~s" + "~n Send ID: ~p" + "~n Recv ID: ~p" + "~n Send Count: ~p" + "~n Recv Count: ~p" + "~n Byte Count: ~p" + "~n Num Iterations: ~p", + [Failure, + format_time(RunTime), + SID, RID, SCnt, RCnt, BCnt, Num]). + + + +loop(#{runtime := RunTime} = State) -> + erlang:start_timer(RunTime, self(), stop), + try do_loop(State) + catch + throw:Results -> + Results + end. + +do_loop(State) -> + do_loop( handle_message( msg_exchange(State) ) ). + +msg_exchange(#{rcnt := Num, num := Num} = State) -> + %% i("we are done"), + finish(ok, State); +msg_exchange(#{scnt := Num, num := Num} = State) -> + %% We are done sending more requests - now we will just await + %% the replies for the (still) outstanding replies. + %% i("we have sent all requests - (only) wait for replies"), + msg_exchange( recv_reply(State) ); +msg_exchange(#{outstanding := Outstanding, + max_outstanding := MaxOutstanding} = State) + when (Outstanding < MaxOutstanding) -> + %% i("send the (initial) requests (~w, ~w)", [Outstanding, MaxOutstanding]), + msg_exchange( send_request(State) ); +msg_exchange(State) -> + send_request( recv_reply(State) ). + + +finish(ok, + #{start := Start, bcnt := BCnt, num := Num}) -> + Stop = t(), + throw(#{status => ok, + runtime => tdiff(Start, Stop), + bcnt => BCnt, + cnt => Num}); +finish(Reason, + #{start := Start, + sid := SID, rid := RID, + scnt := SCnt, rcnt := RCnt, bcnt := BCnt, + num := Num}) -> + Stop = t(), + throw(#{status => Reason, + runtime => tdiff(Start, Stop), + sid => SID, + rid => RID, + scnt => SCnt, + rcnt => RCnt, + bcnt => BCnt, + num => Num}). + +send_request(#{mod := Mod, + sock := Sock, + sid := ID, + scnt := Cnt, + outstanding := Outstanding, + max_outstanding := MaxOutstanding, + msg_data := Data} = State) + when (MaxOutstanding > Outstanding) -> + %% i("send request -> entry when" + %% "~n ID: ~p" + %% "~n Cnt: ~p" + %% "~n Outstanding: ~p" + %% "~n MaxOutstanding: ~p", [ID, Cnt, Outstanding, MaxOutstanding]), + SZ = size(Data), + Req = <<?TTEST_TAG:32, + ?TTEST_TYPE_REQUEST:32, + ID:32, + SZ:32, + Data/binary>>, + case Mod:send(Sock, Req) of + ok -> + %% i("~w bytes sent", [size(Req)]), + State#{sid => next_id(ID), + scnt => Cnt + 1, + outstanding => Outstanding + 1}; + {error, Reason} -> + e("Failed sending request: ~p", [Reason]), + exit({send, Reason}) + end; +send_request(State) -> + State. + + + +recv_reply(#{mod := Mod, + sock := Sock, + rid := ID, + active := false, + bcnt := BCnt, + rcnt := Cnt, + outstanding := Outstanding} = State) -> + %% i("recv-reply(false) -> entry with" + %% "~n (R)ID: ~p" + %% "~n (R)Cnt: ~p" + %% "~n BCnt: ~p" + %% "~n Outstanding: ~p", [ID, Cnt, BCnt, Outstanding]), + case recv_reply_message1(Mod, Sock, ID) of + {ok, MsgSz} -> + State#{rid => next_id(ID), + bcnt => BCnt + MsgSz, + rcnt => Cnt + 1, + outstanding => Outstanding - 1}; + + {error, timeout} -> + i("recv_reply(false) -> error: timeout"), + State; + + {error, Reason} -> + finish(Reason, State) + end; +recv_reply(#{mod := Mod, + sock := Sock, + rid := ID, + active := Active, + bcnt := BCnt, + scnt := SCnt, + rcnt := RCnt, + outstanding := Outstanding, + acc := Acc} = State) -> + %% i("recv-reply(~w) -> entry with" + %% "~n (R)ID: ~p" + %% "~n RCnt: ~p" + %% "~n BCnt: ~p" + %% "~n Outstanding: ~p", [Active, ID, RCnt, BCnt, Outstanding]), + case recv_reply_message2(Mod, Sock, ID, Acc) of + {ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) -> + maybe_activate(Mod, Sock, Active), + State#{rid => next_id(ID), + bcnt => BCnt + MsgSz, + rcnt => RCnt + 1, + outstanding => Outstanding - 1, + acc => NewAcc}; + + ok -> + State; + + {error, stop} -> + i("recv_reply(~w) -> stop", [Active]), + %% This will have the effect that no more requests are sent... + State#{num => SCnt, stop_started => t()}; + + {error, timeout} -> + i("recv_reply(~w) -> error: timeout", [Active]), + State; + + {error, Reason} -> + finish(Reason, State) + end. + + +%% This function reads exactly one (reply) message. No more no less. +recv_reply_message1(Mod, Sock, ID) -> + %% i("recv_reply_message1 -> entry with" + %% "~n ID: ~w", [ID]), + case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of + {ok, <<?TTEST_TAG:32, + ?TTEST_TYPE_REPLY:32, + ID:32, + SZ:32>> = Hdr} -> + %% Receive the ping-pong reply boby + %% i("recv_reply_message1 -> try read body" + %% "~n ID: ~w", [ID]), + case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of + {ok, Data} when (size(Data) =:= SZ) -> + {ok, size(Hdr) + size(Data)}; + {error, Reason2} -> + i("recv_reply_message1 -> body error: " + "~n ~p: ~p", [Reason2]), + {error, {recv_hdr, Reason2}} + end; + + {ok, <<BadTag:32, + BadType:32, + BadID:32, + BadSZ:32>>} -> + {error, {invalid_hdr, + {?TTEST_TAG, BadTag}, + {?TTEST_TYPE_REPLY, BadType}, + {ID, BadID}, + BadSZ}}; + {ok, _InvHdr} -> + {error, invalid_hdr}; + + {error, Reason1} -> + i("recv_reply_message1 -> hdr error: " + "~n ~p", [Reason1]), + {error, {recv_hdr, Reason1}} + end. + + +%% This function first attempts to process the data we have already +%% accumulated. If that is not enough for a (complete) reply, it +%% will attempt to receive more. +recv_reply_message2(Mod, Sock, ID, Acc) -> + %% i("recv_reply_message2 -> entry with" + %% "~n ID: ~w", [ID]), + case process_acc_data(ID, Acc) of + ok -> + %% No or insufficient data, so get more + recv_reply_message3(Mod, Sock, ID, Acc); + + {ok, _} = OK -> % We already had a reply accumulated - no need to read more + OK; + + {error, _} = ERROR -> + ERROR + end. + +%% This function receives a "chunk" of data, then it tries to extract +%% one (reply) message from the accumulated and new data (combined). +recv_reply_message3(_Mod, Sock, ID, Acc) -> + receive + {timeout, _TRef, stop} -> + %% i("stop - when messages: ~p", [process_info(self(), messages)]), + {error, stop}; + + {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse + (TagClosed =:= socket_closed) -> + {error, closed}; + + {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse + (TagErr =:= socket_error) -> + {error, Reason}; + + {Tag, Sock, Msg} when (Tag =:= tcp) orelse + (Tag =:= socket) -> + %% i("recv_reply_message3 -> got ~w byte message", [size(Msg)]), + process_acc_data(ID, <<Acc/binary, Msg/binary>>) + + %% after ?RECV_TIMEOUT -> + %% {error, timeout} + end. + + +process_acc_data(ID, <<?TTEST_TAG:32, + ?TTEST_TYPE_REPLY:32, + ID:32, + SZ:32, + Data/binary>>) when (SZ =< size(Data)) -> + %% i("process_acc_data -> entry with" + %% "~n ID: ~w" + %% "~n SZ: ~w", [ID, SZ]), + <<_Body:SZ/binary, Rest/binary>> = Data, + {ok, {4*4+SZ, Rest}}; +process_acc_data(ID, <<BadTag:32, + BadType:32, + BadID:32, + BadSZ:32, + _Data/binary>>) + when ((BadTag =/= ?TTEST_TAG) orelse + (BadType =/= ?TTEST_TYPE_REPLY) orelse + (BadID =/= ID)) -> + {error, {invalid_hdr, + {?TTEST_TAG, BadTag}, + {?TTEST_TYPE_REPLY, BadType}, + {ID, BadID}, + BadSZ}}; +%% Not enough for an entire (reply) message +process_acc_data(_ID, _Data) -> + ok. + + +handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) -> + receive + {timeout, _TRef, stop} -> + i("stop"), + %% This will have the effect that no more requests are sent... + State#{num => SCnt, stop_started => t()}; + + {?MODULE, Ref, Parent, stop} -> + %% This *aborts* the test + reply(Parent, Ref, ok), + exit(normal); + + %% Only when active + {TagClosed, Sock, Reason} when (TagClosed =:= tcp_closed) orelse + (TagClosed =:= socket_closed) -> + %% We should never get this (unless the server crashed) + exit({closed, Reason}); + + %% Only when active + {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse + (TagErr =:= socket_error) -> + exit({error, Reason}) + + after 0 -> + State + end. + + +initial_activation(_Mod, _Sock, false = _Active) -> + ok; +initial_activation(Mod, Sock, Active) -> + Mod:active(Sock, Active). + + +maybe_activate(Mod, Sock, once = Active) -> + Mod:active(Sock, Active); +maybe_activate(_, _, _) -> + ok. + + +%% ========================================================================== + +req(Pid, Req) -> + Ref = make_ref(), + Pid ! {?MODULE, Ref, Pid, Req}, + receive + {'EXIT', Pid, Reason} -> + {error, {exit, Reason}}; + {?MODULE, Ref, Reply} -> + Reply + end. + +reply(Pid, Ref, Reply) -> + Pid ! {?MODULE, Ref, Reply}. + + +%% ========================================================================== + +next_id(ID) when (ID < ?MAX_ID) -> + ID + 1; +next_id(_) -> + 1. + + +%% ========================================================================== + +t() -> + os:timestamp(). + +tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> + T1 = A1*1000000000+B1*1000+(C1 div 1000), + T2 = A2*1000000000+B2*1000+(C2 div 1000), + T2 - T1. + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp({_N1, _N2, N3} = TS) -> + {_Date, Time} = calendar:now_to_local_time(TS), + {Hour,Min,Sec} = Time, + FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", + [Hour, Min, Sec, round(N3/1000)]), + lists:flatten(FormatTS). + +%% Time is always in number os ms (milli seconds) +format_time(T) -> + f("~p", [T]). + + +%% ========================================================================== + +f(F, A) -> + lists:flatten(io_lib:format(F, A)). + +%% e(F) -> +%% i("<ERROR> " ++ F). + +e(F, A) -> + p(get(sname), "<ERROR> " ++ F, A). + +i(F) -> + i(F, []). + +i(F, A) -> + p(get(sname), "<INFO> " ++ F, A). + +p(undefined, F, A) -> + p("- ", F, A); +p(Prefix, F, A) -> + io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]). |