%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2018-2018. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% %% ========================================================================== %% %% This is the "simple" client using gen_tcp. The client is supposed to be %% as simple as possible in order to incur as little overhead as possible. %% %% There are three ways to run the client: active, passive or active-once. %% %% The client is the entity that controls the test, timing and counting. %% %% ========================================================================== %% %% Before the actual test starts, the client performs a "warmup". %% The warmup has two functions. First, to ensure that everything is "loaded" %% and, second, to calculate an approximate roundtrip time, in order to %% "know" how many iterations we should make (to run for the expected time). %% This is not intended to be exact, but just to ensure that all tests take %% approx the same time to run. %% %% ========================================================================== -module(socket_test_ttest_tcp_client). -export([ start_monitor/4, start_monitor/5, start_monitor/7, stop/1 ]). -include_lib("kernel/include/inet.hrl"). -include("socket_test_ttest.hrl"). -include("socket_test_ttest_client.hrl"). -define(RECV_TIMEOUT, 10000). -define(MAX_OUTSTANDING_DEFAULT_1, 100). -define(MAX_OUTSTANDING_DEFAULT_2, 10). -define(MAX_OUTSTANDING_DEFAULT_3, 3). 
-type active()          :: once | boolean().
-type msg_id()          :: 1..3.
-type max_outstanding() :: pos_integer().
-type runtime()         :: pos_integer().


%% ==========================================================================

%% start_monitor/4
%% Starts a client with the default message id (?MSG_ID_DEFAULT); the
%% remaining settings are chosen by start_monitor/5.
-spec start_monitor(Mod, Active, Addr, Port) -> term() when
      Mod    :: atom(),
      Active :: active(),
      Addr   :: inet:ip_address(),
      Port   :: inet:port_number().

start_monitor(Mod, Active, Addr, Port) ->
    MsgID = ?MSG_ID_DEFAULT,
    start_monitor(Mod, Active, Addr, Port, MsgID).


%% start_monitor/5
%% Starts a client for the given message id, using the default number of
%% outstanding requests for that id and the default run time
%% (?RUNTIME_DEFAULT). Only message ids 1, 2 and 3 are accepted.
-spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when
      Mod    :: atom(),
      Active :: active(),
      Addr   :: inet:ip_address(),
      Port   :: inet:port_number(),
      MsgID  :: msg_id().

start_monitor(Mod, Active, Addr, Port, MsgID)
  when (MsgID =:= 1) orelse (MsgID =:= 2) orelse (MsgID =:= 3) ->
    start_monitor(Mod, Active, Addr, Port, MsgID,
                  default_max_outstanding(MsgID), ?RUNTIME_DEFAULT).

%% Default max number of outstanding requests for each message id.
default_max_outstanding(1) -> ?MAX_OUTSTANDING_DEFAULT_1;
default_max_outstanding(2) -> ?MAX_OUTSTANDING_DEFAULT_2;
default_max_outstanding(3) -> ?MAX_OUTSTANDING_DEFAULT_3.


%% start_monitor/7
%% The fully specified variant: nothing is defaulted.
-spec start_monitor(Mod, Active, Addr, Port,
                    MsgID, MaxOutstanding, RunTime) -> term() when
      Mod            :: atom(),
      Active         :: active(),
      Addr           :: inet:ip_address(),
      Port           :: inet:port_number(),
      MsgID          :: msg_id(),
      MaxOutstanding :: max_outstanding(),
      RunTime        :: runtime().

%% RunTime is in number of ms.
%% Spawns (and monitors) the client process and waits for its first sign
%% of life.
%%
%% Returns:
%%   {ok, {Pid, MRef}}       - the client connected; MRef is a *live* monitor
%%                             that the caller can use to await termination.
%%   {error, _}              - the client reported a start error.
%%   ok                      - the client terminated normally before
%%                             reporting anything.
%%   {error, {exit, Reason}} - the client died before reporting anything
%%                             (e.g. connect failure, see init/8).
start_monitor(Mod, Active, Addr, Port, MsgID, MaxOutstanding, RunTime)
  when is_atom(Mod) andalso
       (is_boolean(Active) orelse (Active =:= once)) andalso
       is_tuple(Addr) andalso
       (is_integer(Port) andalso (Port > 0)) andalso
       (is_integer(MsgID) andalso (MsgID >= 1) andalso (MsgID =< 3)) andalso
       (is_integer(MaxOutstanding) andalso (MaxOutstanding > 0)) andalso
       (is_integer(RunTime) andalso (RunTime > 0)) ->
    Self       = self(),
    ClientInit = fun() ->
                         put(sname, "client"),
                         init(Self,
                              Mod, Active, Addr, Port,
                              MsgID, MaxOutstanding, RunTime)
                 end,
    {Pid, MRef} = spawn_monitor(ClientInit),
    receive
        {?MODULE, Pid, ok} ->
            %% The monitor must stay active: the reference is handed to the
            %% caller, which uses it to detect when the client is done.
            {ok, {Pid, MRef}};
        {?MODULE, Pid, {error, _} = ERROR} ->
            erlang:demonitor(MRef, [flush]),
            ERROR;
        {'DOWN', MRef, process, Pid, normal} ->
            ok;
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, {exit, Reason}}
    end.


%% We should not normally stop this (it terminates when its done).
stop(Pid) when is_pid(Pid) ->
    req(Pid, stop).


%% ==========================================================================

%% Entry point of the client process: connect, tell the parent that we are
%% up, run the message exchange for (approx) RunTime ms, present the result,
%% close the socket and terminate.
%% A failed connect makes the process exit with {connect, Reason}.
init(Parent,
     Mod, Active, Addr, Port,
     MsgID, MaxOutstanding, RunTime) ->
    i("init -> entry with"
      "~n Parent: ~p"
      "~n Mod: ~p"
      "~n Active: ~p"
      "~n Addr: ~s"
      "~n Port: ~p"
      "~n Msg ID: ~p (=> 16 + ~w bytes)"
      "~n Max Outstanding: ~p"
      "~n (Suggested) Run Time: ~p ms",
      [Parent,
       Mod, Active, inet:ntoa(Addr), Port,
       MsgID, byte_size(which_msg_data(MsgID)), MaxOutstanding, RunTime]),
    case Mod:connect(Addr, Port) of
        {ok, Sock} ->
            i("init -> connected"),
            Parent ! {?MODULE, self(), ok},
            initial_activation(Mod, Sock, Active),
            Results = loop(#{slogan          => run,
                             runtime         => RunTime,
                             start           => t(),
                             parent          => Parent,
                             mod             => Mod,
                             sock            => Sock,
                             active          => Active,
                             msg_data        => which_msg_data(MsgID),
                             outstanding     => 0,
                             max_outstanding => MaxOutstanding,
                             sid             => 1,
                             rid             => 1,
                             scnt            => 0,
                             rcnt            => 0,
                             bcnt            => 0,
                             %% 'num' is set (to the send count) once the
                             %% stop timer has fired.
                             num             => undefined,
                             acc             => <<>>}),
            present_results(Results),
            %% Best effort; we are terminating anyway.
            (catch Mod:close(Sock)),
            exit(normal);
        {error, Reason} ->
            i("init -> connect failed: ~p", [Reason]),
            exit({connect, Reason})
    end.


%% Maps a message id to the payload sent in each request.
which_msg_data(1) -> ?MSG_DATA1;
which_msg_data(2) -> ?MSG_DATA2;
which_msg_data(3) -> ?MSG_DATA3.


%% Prints the outcome of a run: throughput figures for a successful run
%% (status = ok), or the failure reason and all counters otherwise.
present_results(#{status  := ok,
                  runtime := RunTime,
                  bcnt    := ByteCnt,
                  cnt     := NumIterations}) ->
    i("Results: "
      "~n Run Time: ~s"
      "~n ByteCnt: ~s"
      "~n NumIterations: ~s",
      [format_time(RunTime),
       if
           ((ByteCnt =:= 0) orelse (RunTime =:= 0)) ->
               %% No meaningful rate can be calculated
               f("~w, ~w", [ByteCnt, RunTime]);
           true ->
               f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime])
       end,
       if
           (RunTime =:= 0) ->
               "-";
           true ->
               f("~p => ~p iterations / ms",
                 [NumIterations, NumIterations div RunTime])
       end]),
    ok;
present_results(#{status  := Failure,
                  runtime := RunTime,
                  sid     := SID,
                  rid     := RID,
                  scnt    := SCnt,
                  rcnt    := RCnt,
                  bcnt    := BCnt,
                  num     := Num}) ->
    i("Time Test failed: "
      "~n ~p"
      "~n"
      "~nwhen"
      "~n"
      "~n Run Time: ~s"
      "~n Send ID: ~p"
      "~n Recv ID: ~p"
      "~n Send Count: ~p"
      "~n Recv Count: ~p"
      "~n Byte Count: ~p"
      "~n Num Iterations: ~p",
      [Failure,
       format_time(RunTime), SID, RID, SCnt, RCnt, BCnt, Num]).


%% Starts the stop timer (RunTime ms) and runs the exchange loop.
%% The loop never returns normally; finish/2 throws the result map, which
%% is caught here and returned.
loop(#{runtime := RunTime} = State) ->
    erlang:start_timer(RunTime, self(), stop),
    try
        do_loop(State)
    catch
        throw:Results ->
            Results
    end.

%% One turn: exchange messages, then poll the mailbox for control messages.
do_loop(State) ->
    do_loop( handle_message( msg_exchange(State) ) ).


%% Decides what to do next, based on the counters:
%%   1) all replies received after stop     => done
%%   2) all requests sent after stop        => only await replies
%%   3) room for more outstanding requests  => send one more
%%   4) window full                         => await a reply, then send
msg_exchange(#{rcnt := Num, num := Num} = State) ->
    finish(ok, State);
msg_exchange(#{scnt := Num, num := Num} = State) ->
    %% We are done sending requests - now we just await
    %% the (still) outstanding replies.
    msg_exchange( recv_reply(State) );
msg_exchange(#{outstanding     := Outstanding,
               max_outstanding := MaxOutstanding} = State)
  when (Outstanding < MaxOutstanding) ->
    msg_exchange( send_request(State) );
msg_exchange(State) ->
    send_request( recv_reply(State) ).


%% Terminates the exchange loop by throwing the result map (see loop/1).
%% The run time is the difference between the 'start' stamp and now, as
%% calculated by tdiff/2.
finish(ok,
       #{start := Start,
         bcnt  := BCnt,
         num   := Num}) ->
    Stop = t(),
    throw(#{status  => ok,
            runtime => tdiff(Start, Stop),
            bcnt    => BCnt,
            cnt     => Num});
finish(Reason,
       #{start := Start,
         sid   := SID,
         rid   := RID,
         scnt  := SCnt,
         rcnt  := RCnt,
         bcnt  := BCnt,
         num   := Num}) ->
    Stop = t(),
    throw(#{status  => Reason,
            runtime => tdiff(Start, Stop),
            sid     => SID,
            rid     => RID,
            scnt    => SCnt,
            rcnt    => RCnt,
            bcnt    => BCnt,
            num     => Num}).


%% Sends one request, provided the window (max_outstanding) is not full;
%% otherwise the state is returned unchanged.
%% A request is a 16 byte header (four 32-bit fields: tag, type, id and
%% payload size) followed by the payload.
%% A send failure terminates the process with {send, Reason}.
send_request(#{mod             := Mod,
               sock            := Sock,
               sid             := ID,
               scnt            := Cnt,
               outstanding     := Outstanding,
               max_outstanding := MaxOutstanding,
               msg_data        := Data} = State)
  when (MaxOutstanding > Outstanding) ->
    SZ  = byte_size(Data),
    %% NOTE(review): the request type macro is assumed to be
    %% ?TTEST_TYPE_REQUEST (the counterpart of ?TTEST_TYPE_REPLY) from
    %% socket_test_ttest.hrl - confirm.
    Req = <<?TTEST_TAG:32,
            ?TTEST_TYPE_REQUEST:32,
            ID:32,
            SZ:32,
            Data/binary>>,
    case Mod:send(Sock, Req) of
        ok ->
            State#{sid         => next_id(ID),
                   scnt        => Cnt + 1,
                   outstanding => Outstanding + 1};
        {error, Reason} ->
            e("Failed sending request: ~p", [Reason]),
            exit({send, Reason})
    end;
send_request(State) ->
    State.


%% Receives (at most) one reply and updates the counters accordingly.
%%
%% Passive socket (active = false): read exactly one reply from the socket.
%% Active socket (true | once): first try the accumulated data, then wait
%% for a data message; the 'stop' timer may also show up here, in which
%% case 'num' is set to the send count so that no more requests are sent.
%%
%% Any error other than timeout/stop ends the test through finish/2.
recv_reply(#{active      := false,
             mod         := Mod,
             sock        := Sock,
             rid         := ID,
             bcnt        := _BCnt,
             rcnt        := _RCnt,
             outstanding := _Outstanding} = State) ->
    case recv_reply_message1(Mod, Sock, ID) of
        {ok, MsgSz} ->
            reply_counted(MsgSz, State);
        {error, timeout} ->
            i("recv_reply(false) -> error: timeout"),
            State;
        {error, Reason} ->
            finish(Reason, State)
    end;
recv_reply(#{active      := Active,
             mod         := Mod,
             sock        := Sock,
             rid         := ID,
             bcnt        := _BCnt,
             scnt        := SendCnt,
             rcnt        := _RCnt,
             outstanding := _Outstanding,
             acc         := Acc} = State) ->
    case recv_reply_message2(Mod, Sock, ID, Acc) of
        {ok, {MsgSz, Rest}} when is_integer(MsgSz) andalso is_binary(Rest) ->
            %% Re-arm the socket (only has an effect for active = once)
            maybe_activate(Mod, Sock, Active),
            Counted = reply_counted(MsgSz, State),
            Counted#{acc => Rest};
        ok ->
            State;
        {error, stop} ->
            i("recv_reply(~w) -> stop", [Active]),
            %% This will have the effect that no more requests are sent...
            State#{num => SendCnt, stop_started => t()};
        {error, timeout} ->
            i("recv_reply(~w) -> error: timeout", [Active]),
            State;
        {error, Reason} ->
            finish(Reason, State)
    end.

%% Book-keeping for one received reply of MsgSz bytes: step the expected
%% reply id, add to the byte and reply counters, and free one slot in the
%% window of outstanding requests.
reply_counted(MsgSz, #{rid         := ID,
                       bcnt        := Bytes,
                       rcnt        := Replies,
                       outstanding := Pending} = State) ->
    State#{rid         => next_id(ID),
           bcnt        => Bytes + MsgSz,
           rcnt        => Replies + 1,
           outstanding => Pending - 1}.


%% This function reads exactly one (reply) message. No more no less.
recv_reply_message1(Mod, Sock, ID) ->
    %% Passive mode: first read the 16 byte header (tag, type, id and size,
    %% 32 bits each), then the body of the size given by the header.
    %% Returns {ok, TotalNumberOfBytes} | {error, Reason}.
    case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
        {ok, <<?TTEST_TAG:32,
               ?TTEST_TYPE_REPLY:32,
               ID:32,
               SZ:32>> = Hdr} ->
            %% Receive the ping-pong reply body
            case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
                {ok, Data} when (byte_size(Data) =:= SZ) ->
                    {ok, byte_size(Hdr) + byte_size(Data)};
                {error, Reason2} ->
                    %% One control sequence per argument; a mismatch makes
                    %% io:format fail with badarg.
                    i("recv_reply_message1 -> body error: "
                      "~n ~p", [Reason2]),
                    %% Tagged as a *body* failure, so that it can be told
                    %% apart from a header failure.
                    {error, {recv_body, Reason2}}
            end;
        {ok, <<BadTag:32,
               BadType:32,
               BadID:32,
               BadSZ:32>>} ->
            %% A complete header, but not the one we expected:
            %% report {Expected, Actual} for each field.
            {error, {invalid_hdr,
                     {?TTEST_TAG,        BadTag},
                     {?TTEST_TYPE_REPLY, BadType},
                     {ID,                BadID},
                     BadSZ}};
        {ok, _InvHdr} ->
            {error, invalid_hdr};
        {error, Reason1} ->
            i("recv_reply_message1 -> hdr error: "
              "~n ~p", [Reason1]),
            {error, {recv_hdr, Reason1}}
    end.


%% This function first attempts to process the data we have already
%% accumulated. If that is not enough for a (complete) reply, it
%% will attempt to receive more.
recv_reply_message2(Mod, Sock, ID, Acc) ->
    case process_acc_data(ID, Acc) of
        ok ->
            %% No or insufficient data, so get more
            recv_reply_message3(Mod, Sock, ID, Acc);
        {ok, _} = OK ->
            %% We already had a reply accumulated - no need to read more
            OK;
        {error, _} = ERROR ->
            ERROR
    end.


%% This function receives "chunks" of data (active mode messages) until one
%% (reply) message can be extracted from the accumulated and new data
%% (combined), or until the stop timer fires or the socket fails.
%%
%% A chunk that does not complete a reply is kept and combined with the
%% next chunk. It must not be dropped, since the caller only stores the
%% accumulator returned together with a complete reply.
%% NOTE(review): with active = once the socket is only re-armed by the
%% caller after a complete reply, so a reply split over several chunks
%% will presumably still stall in that mode - confirm.
recv_reply_message3(Mod, Sock, ID, Acc) ->
    receive
        {timeout, _TRef, stop} ->
            {error, stop};

        {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse
                               (TagClosed =:= socket_closed) ->
            {error, closed};

        {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
                                    (TagErr =:= socket_error) ->
            {error, Reason};

        {Tag, Sock, Msg} when (Tag =:= tcp) orelse
                              (Tag =:= socket) ->
            NewAcc = <<Acc/binary, Msg/binary>>,
            case process_acc_data(ID, NewAcc) of
                ok ->
                    %% Still not a complete reply; wait for more
                    recv_reply_message3(Mod, Sock, ID, NewAcc);
                Result ->
                    Result
            end
    end.


%% Tries to extract one reply from the accumulated data.
%% A reply is a 16 byte header (tag, type, id and size, 32 bits each)
%% followed by 'size' bytes of body.
%% Returns:
%%   {ok, {NumBytesInReply, Rest}} - a complete reply with the expected id
%%   {error, {invalid_hdr, ...}}   - a complete header with unexpected
%%                                   content; each field is reported as
%%                                   {Expected, Actual}
%%   ok                            - not enough data (yet)
process_acc_data(ID, <<?TTEST_TAG:32,
                       ?TTEST_TYPE_REPLY:32,
                       ID:32,
                       SZ:32,
                       Data/binary>>) when (SZ =< byte_size(Data)) ->
    <<_Body:SZ/binary, Rest/binary>> = Data,
    {ok, {4*4+SZ, Rest}};
process_acc_data(ID, <<BadTag:32,
                       BadType:32,
                       BadID:32,
                       BadSZ:32,
                       _Data/binary>>)
  when ((BadTag  =/= ?TTEST_TAG) orelse
        (BadType =/= ?TTEST_TYPE_REPLY) orelse
        (BadID   =/= ID)) ->
    {error, {invalid_hdr,
             {?TTEST_TAG,        BadTag},
             {?TTEST_TYPE_REPLY, BadType},
             {ID,                BadID},
             BadSZ}};
%% Not enough for an entire (reply) message
process_acc_data(_ID, _Data) ->
    ok.


%% Polls the mailbox (without blocking: 'after 0') for control messages:
%% the stop timer, a stop request from the parent, or socket close/error
%% notifications. Returns the (possibly updated) state, or exits.
handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) ->
    receive
        {timeout, _TRef, stop} ->
            i("stop"),
            %% This will have the effect that no more requests are sent...
            State#{num => SCnt, stop_started => t()};

        {?MODULE, Ref, Parent, stop} ->
            %% This *aborts* the test
            reply(Parent, Ref, ok),
            exit(normal);

        %% Only when active
        %% NOTE(review): gen_tcp delivers {tcp_closed, Socket} (a 2-tuple,
        %% as matched in recv_reply_message3/4), so this 3-tuple clause
        %% presumably never matches it - confirm against the Mod wrappers.
        {TagClosed, Sock, Reason} when (TagClosed =:= tcp_closed) orelse
                                       (TagClosed =:= socket_closed) ->
            %% We should never get this (unless the server crashed)
            exit({closed, Reason});

        %% Only when active
        {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
                                    (TagErr =:= socket_error) ->
            exit({error, Reason})

    after 0 ->
            State
    end.


%% A passive socket needs no activation; otherwise set the requested mode.
initial_activation(_Mod, _Sock, false = _Active) ->
    ok;
initial_activation(Mod, Sock, Active) ->
    Mod:active(Sock, Active).


%% Re-arm the socket, which is only needed in 'once' mode.
maybe_activate(Mod, Sock, once = Active) ->
    Mod:active(Sock, Active);
maybe_activate(_, _, _) ->
    ok.


%% ==========================================================================

%% Sends a request to the client process and awaits the reply.
%% The message carries the pid of the *caller* (self()), since that is what
%% handle_message/1 matches against (its parent) and replies to.
%% The client is monitored while waiting, so that a client that is already
%% dead, or dies before replying, results in {error, {exit, Reason}}
%% instead of a caller that waits forever.
req(Pid, Req) ->
    Ref  = make_ref(),
    MRef = erlang:monitor(process, Pid),
    Pid ! {?MODULE, Ref, self(), Req},
    receive
        {?MODULE, Ref, Reply} ->
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, {exit, Reason}};
        {'EXIT', Pid, Reason} ->
            erlang:demonitor(MRef, [flush]),
            {error, {exit, Reason}}
    end.

%% Sends the reply to a request made with req/2.
reply(Pid, Ref, Reply) ->
    Pid ! {?MODULE, Ref, Reply}.


%% ==========================================================================

%% Message ids wrap around to 1 after ?MAX_ID.
next_id(ID) when (ID < ?MAX_ID) ->
    ID + 1;
next_id(_) ->
    1.


%% ==========================================================================

%% The time stamp used for measuring the run time (see tdiff/2).
t() ->
    os:timestamp().

%% The difference, in milliseconds, between two os:timestamp() style
%% {MegaSecs, Secs, MicroSecs} time stamps (T2 - T1).
tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
    T1 = A1*1000000000+B1*1000+(C1 div 1000),
    T2 = A2*1000000000+B2*1000+(C2 div 1000),
    T2 - T1.


%% The current (local) time of day, formatted by format_timestamp/1.
formated_timestamp() ->
    format_timestamp(os:timestamp()).

%% Formats a {MegaSecs, Secs, MicroSecs} time stamp as local time,
%% "HH:MM:SS.mmm".
%% The millisecond part is truncated (div) rather than rounded, so that it
%% always stays within 0..999, and is zero padded to three digits.
format_timestamp({_N1, _N2, N3} = TS) ->
    {_Date, Time}  = calendar:now_to_local_time(TS),
    {Hour,Min,Sec} = Time,
    FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.~.3.0w",
                             [Hour, Min, Sec, N3 div 1000]),
    lists:flatten(FormatTS).

%% Time is always in number of ms (milli seconds)
format_time(T) ->
    f("~p", [T]).


%% ==========================================================================

%% Format into a flat string.
f(F, A) ->
    lists:flatten(io_lib:format(F, A)).

%% Print an error line. The line is tagged, so that errors can be told
%% apart from info lines in the output.
e(F, A) ->
    p(get(sname), "<ERROR> " ++ F, A).

%% Print an info line (tagged, see e/2).
i(F) ->
    i(F, []).

i(F, A) ->
    p(get(sname), "<INFO> " ++ F, A).

%% Print one line, prefixed with a time stamp and the short name of the
%% process (the 'sname' process dictionary entry, or "- " when not set).
p(undefined, F, A) ->
    p("- ", F, A);
p(Prefix, F, A) ->
    io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).