aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/test/socket_test_ttest_tcp_client.erl
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/test/socket_test_ttest_tcp_client.erl')
-rw-r--r--erts/emulator/test/socket_test_ttest_tcp_client.erl617
1 files changed, 617 insertions, 0 deletions
diff --git a/erts/emulator/test/socket_test_ttest_tcp_client.erl b/erts/emulator/test/socket_test_ttest_tcp_client.erl
new file mode 100644
index 0000000000..1bd1bc54e9
--- /dev/null
+++ b/erts/emulator/test/socket_test_ttest_tcp_client.erl
@@ -0,0 +1,617 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%% ==========================================================================
+%%
+%% This is the "simple" client using gen_tcp. The client is supposed to be
+%% as simple as possible in order to incur as little overhead as possible.
+%%
+%% There are three ways to run the client: active, passive or active-once.
+%%
+%% The client is the entity that controls the test, timing and counting.
+%%
+%% ==========================================================================
+%%
+%% Before the actual test starts, the client performs a "warmup".
+%% The warmup has two functions. First, to ensure that everything is "loaded"
+%% and, second, to calculate an approximate roundtrip time, in order to
+%% "know" how many iterations we should make (to run for the expected time).
+%% This is not intended to be exact, but just to ensure that all tests take
+%% approx the same time to run.
+%%
+%% ==========================================================================
+
+-module(socket_test_ttest_tcp_client).
+
+-export([
+ start_monitor/4, start_monitor/5, start_monitor/7,
+ stop/1
+ ]).
+
+-include_lib("kernel/include/inet.hrl").
+-include("socket_test_ttest.hrl").
+-include("socket_test_ttest_client.hrl").
+
+-define(RECV_TIMEOUT, 10000).
+-define(MAX_OUTSTANDING_DEFAULT_1, 100).
+-define(MAX_OUTSTANDING_DEFAULT_2, 10).
+-define(MAX_OUTSTANDING_DEFAULT_3, 3).
+
+
+-type active() :: once | boolean().
+-type msg_id() :: 1..3.
+-type max_outstanding() :: pos_integer().
+-type runtime() :: pos_integer().
+
+
+%% ==========================================================================
+
+-spec start_monitor(Mod, Active, Addr, Port) -> term() when
+ Mod :: atom(),
+ Active :: active(),
+ Addr :: inet:ip_address(),
+ Port :: inet:port_number().
+
+%% RunTime is in number of ms.
+start_monitor(Mod, Active, Addr, Port) ->
+ start_monitor(Mod, Active, Addr, Port, ?MSG_ID_DEFAULT).
+
+-spec start_monitor(Mod, Active, Addr, Port, MsgID) -> term() when
+ Mod :: atom(),
+ Active :: active(),
+ Addr :: inet:ip_address(),
+ Port :: inet:port_number(),
+ MsgID :: msg_id().
+
+%% RunTime is in number of ms.
+start_monitor(Mod, Active, Addr, Port, 1 = MsgID) ->
+ start_monitor(Mod, Active, Addr, Port, MsgID,
+ ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT);
+start_monitor(Mod, Active, Addr, Port, 2 = MsgID) ->
+ start_monitor(Mod, Active, Addr, Port, MsgID,
+ ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT);
+start_monitor(Mod, Active, Addr, Port, 3 = MsgID) ->
+ start_monitor(Mod, Active, Addr, Port, MsgID,
+ ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT).
+
+-spec start_monitor(Mod,
+ Active,
+ Addr,
+ Port,
+ MsgID,
+ MaxOutstanding,
+ RunTime) -> term() when
+ Mod :: atom(),
+ Active :: active(),
+ Addr :: inet:ip_address(),
+ Port :: inet:port_number(),
+ MsgID :: msg_id(),
+ MaxOutstanding :: max_outstanding(),
+ RunTime :: runtime().
+
+%% RunTime is in number of ms.
+start_monitor(Mod, Active, Addr, Port,
+ MsgID, MaxOutstanding, RunTime)
+ when is_atom(Mod) andalso
+ (is_boolean(Active) orelse (Active =:= once)) andalso
+ is_tuple(Addr) andalso
+ (is_integer(Port) andalso (Port > 0)) andalso
+ (is_integer(MsgID) andalso (MsgID >= 1) andalso (MsgID =< 3)) andalso
+ (is_integer(MaxOutstanding) andalso (MaxOutstanding > 0)) andalso
+ (is_integer(RunTime) andalso (RunTime > 0)) ->
+ Self = self(),
+ ClientInit = fun() -> put(sname, "client"),
+ init(Self,
+ Mod, Active, Addr, Port,
+ MsgID, MaxOutstanding, RunTime)
+ end,
+ {Pid, MRef} = spawn_monitor(ClientInit),
+ receive
+ {?MODULE, Pid, ok} ->
+ erlang:demonitor(MRef, [flush]),
+ {ok, {Pid, MRef}};
+
+ {?MODULE, Pid, {error, _} = ERROR} ->
+ erlang:demonitor(MRef, [flush]),
+ ERROR;
+
+ {'DOWN', MRef, process, Pid, normal} ->
+ ok;
+ {'DOWN', MRef, process, Pid, Reason} ->
+ {error, {exit, Reason}}
+
+ end.
+
+
+%% We should not normally stop this (it terminates when its done).
+stop(Pid) when is_pid(Pid) ->
+ req(Pid, stop).
+
+
+%% ==========================================================================
+
+init(Parent, Mod, Active, Addr, Port,
+ MsgID, MaxOutstanding, RunTime) ->
+ i("init -> entry with"
+ "~n Parent: ~p"
+ "~n Mod: ~p"
+ "~n Active: ~p"
+ "~n Addr: ~s"
+ "~n Port: ~p"
+ "~n Msg ID: ~p (=> 16 + ~w bytes)"
+ "~n Max Outstanding: ~p"
+ "~n (Suggested) Run Time: ~p ms",
+ [Parent,
+ Mod, Active, inet:ntoa(Addr), Port,
+ MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]),
+ case Mod:connect(Addr, Port) of
+ {ok, Sock} ->
+ i("init -> connected"),
+ Parent ! {?MODULE, self(), ok},
+ initial_activation(Mod, Sock, Active),
+ Results = loop(#{slogan => run,
+ runtime => RunTime,
+ start => t(),
+ parent => Parent,
+ mod => Mod,
+ sock => Sock,
+ active => Active,
+ msg_data => which_msg_data(MsgID),
+ outstanding => 0,
+ max_outstanding => MaxOutstanding,
+ sid => 1,
+ rid => 1,
+ scnt => 0,
+ rcnt => 0,
+ bcnt => 0,
+ num => undefined,
+ acc => <<>>}),
+ present_results(Results),
+ (catch Mod:close(Sock)),
+ exit(normal);
+ {error, Reason} ->
+ i("init -> connect failed: ~p", [Reason]),
+ exit({connect, Reason})
+ end.
+
+which_msg_data(1) -> ?MSG_DATA1;
+which_msg_data(2) -> ?MSG_DATA2;
+which_msg_data(3) -> ?MSG_DATA3.
+
+
+present_results(#{status := ok,
+ runtime := RunTime,
+ bcnt := ByteCnt,
+ cnt := NumIterations}) ->
+ i("Results: "
+ "~n Run Time: ~s"
+ "~n ByteCnt: ~s"
+ "~n NumIterations: ~s",
+ [format_time(RunTime),
+ if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) ->
+ f("~w, ~w", [ByteCnt, RunTime]);
+ true ->
+ f("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime])
+ end,
+ if (RunTime =:= 0) ->
+ "-";
+ true ->
+ f("~p => ~p iterations / ms",
+ [NumIterations, NumIterations div RunTime])
+ end]),
+ ok;
+present_results(#{status := Failure,
+ runtime := RunTime,
+ sid := SID,
+ rid := RID,
+ scnt := SCnt,
+ rcnt := RCnt,
+ bcnt := BCnt,
+ num := Num}) ->
+ i("Time Test failed: "
+ "~n ~p"
+ "~n"
+ "~nwhen"
+ "~n"
+ "~n Run Time: ~s"
+ "~n Send ID: ~p"
+ "~n Recv ID: ~p"
+ "~n Send Count: ~p"
+ "~n Recv Count: ~p"
+ "~n Byte Count: ~p"
+ "~n Num Iterations: ~p",
+ [Failure,
+ format_time(RunTime),
+ SID, RID, SCnt, RCnt, BCnt, Num]).
+
+
+
+loop(#{runtime := RunTime} = State) ->
+ erlang:start_timer(RunTime, self(), stop),
+ try do_loop(State)
+ catch
+ throw:Results ->
+ Results
+ end.
+
+do_loop(State) ->
+ do_loop( handle_message( msg_exchange(State) ) ).
+
+msg_exchange(#{rcnt := Num, num := Num} = State) ->
+ %% i("we are done"),
+ finish(ok, State);
+msg_exchange(#{scnt := Num, num := Num} = State) ->
+ %% We are done sending more requests - now we will just await
+ %% the replies for the (still) outstanding replies.
+ %% i("we have sent all requests - (only) wait for replies"),
+ msg_exchange( recv_reply(State) );
+msg_exchange(#{outstanding := Outstanding,
+ max_outstanding := MaxOutstanding} = State)
+ when (Outstanding < MaxOutstanding) ->
+ %% i("send the (initial) requests (~w, ~w)", [Outstanding, MaxOutstanding]),
+ msg_exchange( send_request(State) );
+msg_exchange(State) ->
+ send_request( recv_reply(State) ).
+
+
+finish(ok,
+ #{start := Start, bcnt := BCnt, num := Num}) ->
+ Stop = t(),
+ throw(#{status => ok,
+ runtime => tdiff(Start, Stop),
+ bcnt => BCnt,
+ cnt => Num});
+finish(Reason,
+ #{start := Start,
+ sid := SID, rid := RID,
+ scnt := SCnt, rcnt := RCnt, bcnt := BCnt,
+ num := Num}) ->
+ Stop = t(),
+ throw(#{status => Reason,
+ runtime => tdiff(Start, Stop),
+ sid => SID,
+ rid => RID,
+ scnt => SCnt,
+ rcnt => RCnt,
+ bcnt => BCnt,
+ num => Num}).
+
+send_request(#{mod := Mod,
+ sock := Sock,
+ sid := ID,
+ scnt := Cnt,
+ outstanding := Outstanding,
+ max_outstanding := MaxOutstanding,
+ msg_data := Data} = State)
+ when (MaxOutstanding > Outstanding) ->
+ %% i("send request -> entry when"
+ %% "~n ID: ~p"
+ %% "~n Cnt: ~p"
+ %% "~n Outstanding: ~p"
+ %% "~n MaxOutstanding: ~p", [ID, Cnt, Outstanding, MaxOutstanding]),
+ SZ = size(Data),
+ Req = <<?TTEST_TAG:32,
+ ?TTEST_TYPE_REQUEST:32,
+ ID:32,
+ SZ:32,
+ Data/binary>>,
+ case Mod:send(Sock, Req) of
+ ok ->
+ %% i("~w bytes sent", [size(Req)]),
+ State#{sid => next_id(ID),
+ scnt => Cnt + 1,
+ outstanding => Outstanding + 1};
+ {error, Reason} ->
+ e("Failed sending request: ~p", [Reason]),
+ exit({send, Reason})
+ end;
+send_request(State) ->
+ State.
+
+
+
+recv_reply(#{mod := Mod,
+ sock := Sock,
+ rid := ID,
+ active := false,
+ bcnt := BCnt,
+ rcnt := Cnt,
+ outstanding := Outstanding} = State) ->
+ %% i("recv-reply(false) -> entry with"
+ %% "~n (R)ID: ~p"
+ %% "~n (R)Cnt: ~p"
+ %% "~n BCnt: ~p"
+ %% "~n Outstanding: ~p", [ID, Cnt, BCnt, Outstanding]),
+ case recv_reply_message1(Mod, Sock, ID) of
+ {ok, MsgSz} ->
+ State#{rid => next_id(ID),
+ bcnt => BCnt + MsgSz,
+ rcnt => Cnt + 1,
+ outstanding => Outstanding - 1};
+
+ {error, timeout} ->
+ i("recv_reply(false) -> error: timeout"),
+ State;
+
+ {error, Reason} ->
+ finish(Reason, State)
+ end;
+recv_reply(#{mod := Mod,
+ sock := Sock,
+ rid := ID,
+ active := Active,
+ bcnt := BCnt,
+ scnt := SCnt,
+ rcnt := RCnt,
+ outstanding := Outstanding,
+ acc := Acc} = State) ->
+ %% i("recv-reply(~w) -> entry with"
+ %% "~n (R)ID: ~p"
+ %% "~n RCnt: ~p"
+ %% "~n BCnt: ~p"
+ %% "~n Outstanding: ~p", [Active, ID, RCnt, BCnt, Outstanding]),
+ case recv_reply_message2(Mod, Sock, ID, Acc) of
+ {ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) ->
+ maybe_activate(Mod, Sock, Active),
+ State#{rid => next_id(ID),
+ bcnt => BCnt + MsgSz,
+ rcnt => RCnt + 1,
+ outstanding => Outstanding - 1,
+ acc => NewAcc};
+
+ ok ->
+ State;
+
+ {error, stop} ->
+ i("recv_reply(~w) -> stop", [Active]),
+ %% This will have the effect that no more requests are sent...
+ State#{num => SCnt, stop_started => t()};
+
+ {error, timeout} ->
+ i("recv_reply(~w) -> error: timeout", [Active]),
+ State;
+
+ {error, Reason} ->
+ finish(Reason, State)
+ end.
+
+
+%% This function reads exactly one (reply) message. No more no less.
+recv_reply_message1(Mod, Sock, ID) ->
+ %% i("recv_reply_message1 -> entry with"
+ %% "~n ID: ~w", [ID]),
+ case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
+ {ok, <<?TTEST_TAG:32,
+ ?TTEST_TYPE_REPLY:32,
+ ID:32,
+ SZ:32>> = Hdr} ->
+ %% Receive the ping-pong reply boby
+ %% i("recv_reply_message1 -> try read body"
+ %% "~n ID: ~w", [ID]),
+ case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
+ {ok, Data} when (size(Data) =:= SZ) ->
+ {ok, size(Hdr) + size(Data)};
+ {error, Reason2} ->
+ i("recv_reply_message1 -> body error: "
+ "~n ~p: ~p", [Reason2]),
+ {error, {recv_hdr, Reason2}}
+ end;
+
+ {ok, <<BadTag:32,
+ BadType:32,
+ BadID:32,
+ BadSZ:32>>} ->
+ {error, {invalid_hdr,
+ {?TTEST_TAG, BadTag},
+ {?TTEST_TYPE_REPLY, BadType},
+ {ID, BadID},
+ BadSZ}};
+ {ok, _InvHdr} ->
+ {error, invalid_hdr};
+
+ {error, Reason1} ->
+ i("recv_reply_message1 -> hdr error: "
+ "~n ~p", [Reason1]),
+ {error, {recv_hdr, Reason1}}
+ end.
+
+
+%% This function first attempts to process the data we have already
+%% accumulated. If that is not enough for a (complete) reply, it
+%% will attempt to receive more.
+recv_reply_message2(Mod, Sock, ID, Acc) ->
+ %% i("recv_reply_message2 -> entry with"
+ %% "~n ID: ~w", [ID]),
+ case process_acc_data(ID, Acc) of
+ ok ->
+ %% No or insufficient data, so get more
+ recv_reply_message3(Mod, Sock, ID, Acc);
+
+ {ok, _} = OK -> % We already had a reply accumulated - no need to read more
+ OK;
+
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+%% This function receives a "chunk" of data, then it tries to extract
+%% one (reply) message from the accumulated and new data (combined).
+recv_reply_message3(_Mod, Sock, ID, Acc) ->
+ receive
+ {timeout, _TRef, stop} ->
+ %% i("stop - when messages: ~p", [process_info(self(), messages)]),
+ {error, stop};
+
+ {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse
+ (TagClosed =:= socket_closed) ->
+ {error, closed};
+
+ {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
+ (TagErr =:= socket_error) ->
+ {error, Reason};
+
+ {Tag, Sock, Msg} when (Tag =:= tcp) orelse
+ (Tag =:= socket) ->
+ %% i("recv_reply_message3 -> got ~w byte message", [size(Msg)]),
+ process_acc_data(ID, <<Acc/binary, Msg/binary>>)
+
+ %% after ?RECV_TIMEOUT ->
+ %% {error, timeout}
+ end.
+
+
+process_acc_data(ID, <<?TTEST_TAG:32,
+ ?TTEST_TYPE_REPLY:32,
+ ID:32,
+ SZ:32,
+ Data/binary>>) when (SZ =< size(Data)) ->
+ %% i("process_acc_data -> entry with"
+ %% "~n ID: ~w"
+ %% "~n SZ: ~w", [ID, SZ]),
+ <<_Body:SZ/binary, Rest/binary>> = Data,
+ {ok, {4*4+SZ, Rest}};
+process_acc_data(ID, <<BadTag:32,
+ BadType:32,
+ BadID:32,
+ BadSZ:32,
+ _Data/binary>>)
+ when ((BadTag =/= ?TTEST_TAG) orelse
+ (BadType =/= ?TTEST_TYPE_REPLY) orelse
+ (BadID =/= ID)) ->
+ {error, {invalid_hdr,
+ {?TTEST_TAG, BadTag},
+ {?TTEST_TYPE_REPLY, BadType},
+ {ID, BadID},
+ BadSZ}};
+%% Not enough for an entire (reply) message
+process_acc_data(_ID, _Data) ->
+ ok.
+
+
+handle_message(#{parent := Parent, sock := Sock, scnt := SCnt} = State) ->
+ receive
+ {timeout, _TRef, stop} ->
+ i("stop"),
+ %% This will have the effect that no more requests are sent...
+ State#{num => SCnt, stop_started => t()};
+
+ {?MODULE, Ref, Parent, stop} ->
+ %% This *aborts* the test
+ reply(Parent, Ref, ok),
+ exit(normal);
+
+ %% Only when active
+ {TagClosed, Sock, Reason} when (TagClosed =:= tcp_closed) orelse
+ (TagClosed =:= socket_closed) ->
+ %% We should never get this (unless the server crashed)
+ exit({closed, Reason});
+
+ %% Only when active
+ {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse
+ (TagErr =:= socket_error) ->
+ exit({error, Reason})
+
+ after 0 ->
+ State
+ end.
+
+
+initial_activation(_Mod, _Sock, false = _Active) ->
+ ok;
+initial_activation(Mod, Sock, Active) ->
+ Mod:active(Sock, Active).
+
+
+maybe_activate(Mod, Sock, once = Active) ->
+ Mod:active(Sock, Active);
+maybe_activate(_, _, _) ->
+ ok.
+
+
+%% ==========================================================================
+
+req(Pid, Req) ->
+ Ref = make_ref(),
+ Pid ! {?MODULE, Ref, Pid, Req},
+ receive
+ {'EXIT', Pid, Reason} ->
+ {error, {exit, Reason}};
+ {?MODULE, Ref, Reply} ->
+ Reply
+ end.
+
+reply(Pid, Ref, Reply) ->
+ Pid ! {?MODULE, Ref, Reply}.
+
+
+%% ==========================================================================
+
+next_id(ID) when (ID < ?MAX_ID) ->
+ ID + 1;
+next_id(_) ->
+ 1.
+
+
+%% ==========================================================================
+
+t() ->
+ os:timestamp().
+
+tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
+ T1 = A1*1000000000+B1*1000+(C1 div 1000),
+ T2 = A2*1000000000+B2*1000+(C2 div 1000),
+ T2 - T1.
+
+formated_timestamp() ->
+ format_timestamp(os:timestamp()).
+
+format_timestamp({_N1, _N2, N3} = TS) ->
+ {_Date, Time} = calendar:now_to_local_time(TS),
+ {Hour,Min,Sec} = Time,
+ FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
+ [Hour, Min, Sec, round(N3/1000)]),
+ lists:flatten(FormatTS).
+
+%% Time is always in number os ms (milli seconds)
+format_time(T) ->
+ f("~p", [T]).
+
+
+%% ==========================================================================
+
+f(F, A) ->
+ lists:flatten(io_lib:format(F, A)).
+
+%% e(F) ->
+%% i("<ERROR> " ++ F).
+
+e(F, A) ->
+ p(get(sname), "<ERROR> " ++ F, A).
+
+i(F) ->
+ i(F, []).
+
+i(F, A) ->
+ p(get(sname), "<INFO> " ++ F, A).
+
+p(undefined, F, A) ->
+ p("- ", F, A);
+p(Prefix, F, A) ->
+ io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).