%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%
%%----------------------------------------------------------------------
%% Purpose: The HDLT client module.
%% This is the traffic generator
%%----------------------------------------------------------------------
-module(hdlt_client).
-export([
start/1,
stop/0,
start_inets/0,
start_service/1,
release/0,
node_info/0
]).
-export([
proxy/1
]).
-include("hdlt_logger.hrl").
-define(CTRL, hdlt_ctrl).
-define(PROXY, hdlt_proxy).
%% Loop state of the client (traffic generator) process.
-record(state,
{
%% Current phase of the client. Defaults to initial; client/1 sets it to
%% idle and client_loop/1 moves it on to generating and then stopping.
mode = initial,
%% Number of requests sent in the first burst when `go' is received.
send_rate,
%% Delay (milliseconds, as it is handed to erlang:send_after/3) before the
%% `stop' message is delivered; also used as the receive timeout while in
%% the stopping phase.
time,
%% Timestamp taken when the `stop' message was received.
stop_time,
%% Base of the request URL; an element of `sizes' is appended to it.
url,
%% Number of requests issued: set to send_rate after the first burst and
%% incremented for every request sent in response to a reply.
nof_reqs = 0,
%% Number of 200 replies received while in the generating phase.
nof_reps = 0,
%% Request id returned by httpc:request/4 for the most recent request.
last_req,
%% List of sizes used round-robin: the head is used for the next request
%% and then moved to the end of the list.
sizes,
%% ip_comm for plain connections; ssl, ossl or essl select the ssl
%% option format passed to httpc.
socket_type,
%% File passed as both certfile and keyfile in the ssl options.
cert_file
}).
%% Starts the proxy process (proxy/1) linked to the caller.
%% Debug is handed to the proxy, which uses it as its log level.
%% Returns whatever the proxy acknowledges via proc_lib:init_ack/1.
start(Debug) ->
    ProxyArgs = [Debug],
    proc_lib:start_link(?MODULE, proxy, ProxyArgs).
%% Asks the registered proxy process to stop.
%% Always returns ok, including when no proxy is registered on this node.
-spec stop() -> ok.
stop() ->
    try erlang:send(?PROXY, stop) of
        _Sent ->
            ok
    catch
        %% erlang:send/2 raises badarg when the registered name does not
        %% exist, i.e. there is no proxy to stop.
        error:badarg ->
            ok
    end.
%% Tells the proxy to start the inets service framework.
%% Asynchronous: returns the message that was sent, and fails with badarg
%% if no proxy is registered.
start_inets() ->
    erlang:send(?PROXY, start_inets).
%% Asks the proxy to spawn the client process with Args and blocks until
%% the proxy confirms with `client_started'. Returns ok.
%% Note that there is no timeout on the wait.
start_service(Args) ->
    Request = {start_client, Args, self()},
    erlang:send(?PROXY, Request),
    receive
        client_started -> ok
    end.
%% Tells the proxy to release its client, i.e. forward the `go' message so
%% that the client starts generating traffic.
%% Asynchronous: returns the message that was sent.
release() ->
    erlang:send(?PROXY, release).
%% Fetches node information from the proxy process.
%% Blocks (without timeout) until the proxy answers and returns the
%% information it sent.
node_info() ->
    Request = {node_info, self()},
    erlang:send(?PROXY, Request),
    receive
        {node_info, Info} -> Info
    end.
%% ---------------------------------------------------------------------
%%
%% The proxy process
%%
%% Entry point of the proxy process (started through proc_lib by start/1).
%%
%% Registers itself under the ?PROXY name, sets up logging, waits for the
%% controller to show up, monitors both the controller process and its
%% node, acknowledges the start to the parent and enters the proxy loop
%% with no client started yet.
proxy(Debug) ->
    process_flag(trap_exit, true),
    register(?PROXY, self()),
    Label = lists:flatten(io_lib:format("HDLT PROXY[~p,~p]", [self(), node()])),
    ?SET_NAME(Label),
    ?SET_LEVEL(Debug),
    ?LOG("starting", []),
    %% The returned monitor reference also identifies the controller node.
    CtrlRef = await_for_controller(10),
    CtrlNode = node(CtrlRef),
    erlang:monitor_node(CtrlNode, true),
    proc_lib:init_ack({ok, self()}),
    ?DEBUG("started", []),
    proxy_loop(CtrlRef, CtrlNode, undefined).
%% Waits for the globally registered controller to become visible.
%%
%% Looks the controller up at most N times, sleeping one second between
%% attempts. On success the controller process is monitored and the
%% monitor reference is returned.
%%
%% When the attempts are exhausted the failure is reported to the parent
%% through proc_lib:init_ack/1, the node is asked to shut down and this
%% process exits normally. init:stop/0 returns immediately, so without
%% the explicit exit the caller would continue with `ok' in place of the
%% monitor reference and crash with a misleading badarg.
await_for_controller(N) when N > 0 ->
    case global:whereis_name(?CTRL) of
        Pid when is_pid(Pid) ->
            erlang:monitor(process, Pid);
        undefined ->
            timer:sleep(1000),
            await_for_controller(N - 1)
    end;
await_for_controller(_) ->
    proc_lib:init_ack({error, controller_not_found, nodes()}),
    %% Give the acknowledgement a chance to be delivered before stopping.
    timer:sleep(500),
    init:stop(),
    exit(normal).
%% Main loop of the proxy process.
%%
%% Ref      - monitor reference for the controller process
%% CtrlNode - the node the controller runs on (monitored)
%% Client   - pid of the client process, or undefined until one has been
%%            started with a start_client request
%%
%% Handles the commands sent by the API functions of this module and
%% reacts to the termination of the client, the controller process or
%% the controller node.
proxy_loop(Ref, CtrlNode, Client) ->
    ?DEBUG("await command", []),
    receive
        stop ->
            ?LOG("stop", []),
            timer:sleep(1000),
            halt();

        start_inets ->
            ?LOG("start the inets service framework", []),
            case (catch inets:start()) of
                ok ->
                    ?LOG("framework started", []),
                    proxy_loop(Ref, CtrlNode, Client);
                Error ->
                    ?LOG("failed starting inets service framework: "
                         "~n Error: ~p", [Error]),
                    timer:sleep(1000),
                    halt()
            end;

        {start_client, Args, From} ->
            ?LOG("start client with"
                 "~n Args: ~p", [Args]),
            Client2 = spawn_link(fun() -> client(Args) end),
            From ! client_started,
            proxy_loop(Ref, CtrlNode, Client2);

        release when is_pid(Client) ->
            ?LOG("release", []),
            Client ! go,
            proxy_loop(Ref, CtrlNode, Client);

        release ->
            %% No client has been started, so there is nobody to release.
            %% Sending to `undefined' would fail with an opaque badarg;
            %% terminate with a reason that names the actual problem.
            ?INFO("received release before any client was started", []),
            exit(release_without_client);

        {node_info, Pid} ->
            ?LOG("received requets for node info", []),
            NodeInfo = get_node_info(),
            Pid ! {node_info, NodeInfo},
            proxy_loop(Ref, CtrlNode, Client);

        {'EXIT', Client, normal} ->
            ?LOG("received normal exit message from client (~p)",
                 [Client]),
            exit(normal);

        {'EXIT', Client, Reason} ->
            ?INFO("received exit message from client (~p)"
                  "~n Reason: ~p", [Client, Reason]),
            %% Unexpected client termination, inform the controller and die
            global:send(?CTRL, {client_exit, Client, node(), Reason}),
            exit({client_exit, Reason});

        {nodedown, CtrlNode} ->
            ?LOG("received nodedown for controller node - terminate", []),
            halt();

        {'DOWN', Ref, process, _, _} ->
            ?INFO("received DOWN message for controller - terminate", []),
            %% The controller has terminated, the reason does not matter:
            %% time to die.
            halt()
    end.
%% ---------------------------------------------------------------------
%%
%% The client process
%%
%% Entry point of the client (traffic generator) process.
%%
%% Sets up logging, disables httpc pipelining, makes sure crypto and ssl
%% are started for the ssl-based socket types and then enters the client
%% loop in idle mode, where it waits for the `go' message.
client([SocketType, CertFile, URLBase, Sizes, Time, SendRate, Debug]) ->
    Label = lists:flatten(io_lib:format("HDLT CLIENT[~p,~p]", [self(), node()])),
    ?SET_NAME(Label),
    ?SET_LEVEL(Debug),
    ?LOG("starting with"
         "~n SocketType: ~p"
         "~n Time: ~p"
         "~n SendRate: ~p", [SocketType, Time, SendRate]),
    httpc:set_options([{max_pipeline_length, 0}]),
    case lists:member(SocketType, [ssl, ossl, essl]) of
        true ->
            %% Ensure crypto and ssl are started.
            crypto:start(),
            ssl:start();
        false ->
            ok
    end,
    Initial = #state{mode        = idle,
                     socket_type = SocketType,
                     cert_file   = CertFile,
                     url         = URLBase,
                     sizes       = Sizes,
                     time        = Time,
                     send_rate   = SendRate},
    ?DEBUG("started", []),
    client_loop(Initial).
%% The point is to first start all client nodes and then this
%% process. Then, when they are all started, the go-ahead, go,
%% message is sent to let them loose at the same time.
%% The client state machine, one clause per mode.
%%
%% idle: wait for the `go' message, arm the timer that will deliver
%% `stop' to this process and send the initial burst of requests.
client_loop(#state{mode = idle, time = Duration, send_rate = Burst} = Idle) ->
    ?DEBUG("[idle] awaiting the go command", []),
    receive
        go ->
            ?LOG("[idle] received go", []),
            erlang:send_after(Duration, self(), stop),
            Started = send_requests(Idle, Burst),
            client_loop(Started#state{mode = generating, nof_reqs = Burst})
    end;

%% generating: every 200 reply triggers one new request. This goes on
%% until the `stop' message arrives, at which point the request/reply
%% counters are reported and the client switches to stopping mode.
%% A failed request terminates the client.
client_loop(#state{mode = generating,
                   nof_reqs = Sent,
                   nof_reps = Answered} = Running) ->
    receive
        stop ->
            ?LOG("[generating] received stop", []),
            StopTime = timestamp(),
            req_reply(Running),
            client_loop(Running#state{mode = stopping, stop_time = StopTime});

        {http, {_, {{_, 200, _}, _, _}}} ->
            #state{nof_reqs = Reqs, nof_reps = Reps} = Next =
                send_requests(Running, 1),
            client_loop(Next#state{nof_reqs = Reqs + 1, nof_reps = Reps + 1});

        {http, {ReqId, {error, Reason}}} ->
            ?INFO("[generating] request ~p failed: "
                  "~n Reason: ~p"
                  "~n NofReqs: ~p"
                  "~n NofReps: ~p",
                  [ReqId, Reason, Sent, Answered]),
            exit({Reason, generating, Sent, Answered});

        Unexpected ->
            ?LOG("[generating] received unexpected message: "
                 "~n~p", [Unexpected]),
            unexpected_data(Unexpected),
            client_loop(Running)
    end;

%% stopping: no new requests are issued. The client waits for the reply
%% to the last request it sent, reports the time to complete and returns.
%% Replies to other outstanding requests are just logged. If nothing
%% arrives within the configured time the client exits with a timeout.
client_loop(#state{mode = stopping,
                   time = Patience,
                   last_req = FinalReqId,
                   nof_reqs = Sent,
                   nof_reps = Answered} = Draining) ->
    receive
        %% FinalReqId is already bound, so this clause only matches the
        %% reply to the last request and must come before the general one.
        {http, {FinalReqId, {{_, 200, _}, _, _}}} ->
            ?DEBUG("[stopping] received reply for last request (~p)", [FinalReqId]),
            time_to_complete(Draining),
            ok;

        {http, {ReqId, {{_, 200, _}, _, _}}} ->
            ?DEBUG("[stopping] received reply ~p", [ReqId]),
            client_loop(Draining);

        {http, {ReqId, {error, Reason}}} ->
            ?INFO("[stopping] request ~p failed: "
                  "~n Reason: ~p"
                  "~n NofReqs: ~p"
                  "~n NofReps: ~p",
                  [ReqId, Reason, Sent, Answered]),
            exit({Reason, stopping, Sent, Answered});

        Unexpected ->
            ?LOG("[stopping] received unexpected message: "
                 "~n~p", [Unexpected]),
            unexpected_data(Unexpected),
            client_loop(Draining)

    after Patience ->
            ?INFO("timeout when"
                  "~n Number of requests: ~p"
                  "~n Number of replies: ~p",
                  [Sent, Answered]),
            exit({timeout, Sent, Answered})
    end.
%% Reports the request and reply counters of this node to the controller.
req_reply(#state{nof_reqs = Sent, nof_reps = Answered}) ->
    Report = {req_reply, node(), Sent, Answered},
    load_data(Report).
%% Reports to the controller when `stop' was received and when this
%% function was called (by the stopping clause of client_loop/1, once the
%% reply to the last request has arrived).
time_to_complete(#state{stop_time = StopRequestedAt}) ->
    LastReplyAt = os:timestamp(),
    Report = {time_to_complete, node(), StopRequestedAt, LastReplyAt},
    load_data(Report).
%% Sends Data, tagged as load_data, to the globally registered controller.
load_data(Data) ->
    Message = {load_data, Data},
    global:send(?CTRL, Message).
%% Forwards a message the client did not expect to the globally
%% registered controller, tagged as unexpected_data.
unexpected_data(Else) ->
    Message = {unexpected_data, Else},
    global:send(?CTRL, Message).
%% Sends N asynchronous requests, taking the sizes to request from the
%% state. Returns the updated state.
send_requests(State = #state{sizes = SizeCycle}, N) ->
    send_requests(State, N, SizeCycle).
%% Sends N asynchronous GET requests, one per recursion step.
%%
%% The URL of each request is the URL base followed by the size at the
%% head of the size list; that size is then moved to the end of the list
%% so the sizes are used round-robin. When all requests have been sent,
%% the state is returned with the rotated size list and the id of the
%% last request issued.
send_requests(State, 0, SizeCycle) ->
    State#state{sizes = SizeCycle};
send_requests(#state{url         = Base,
                     socket_type = SocketType,
                     cert_file   = CertFile} = State,
              N, [Size | Rest]) ->
    Url = lists:flatten(io_lib:format("~s~w", [Base, Size])),
    HttpOpts = request_http_options(SocketType, CertFile),
    %% {sync, false}: the reply is delivered later as an {http, _} message.
    {ok, ReqId} = httpc:request(get, {Url, []}, HttpOpts, [{sync, false}]),
    send_requests(State#state{last_req = ReqId}, N - 1, Rest ++ [Size]).

%% Builds the HTTP options for a request: none for ip_comm, otherwise ssl
%% options in the format selected by the socket type.
request_http_options(ip_comm, _CertFile) ->
    [];
request_http_options(SocketType, CertFile) ->
    SslOpts = [{verify, 0},
               {certfile, CertFile},
               {keyfile, CertFile}],
    case SocketType of
        ssl  -> [{ssl, SslOpts}];
        ossl -> [{ssl, {ossl, SslOpts}}];
        essl -> [{ssl, {essl, SslOpts}}]
    end.
%% Returns the current OS time as a {MegaSecs, Secs, MicroSecs} tuple.
timestamp() ->
    Now = os:timestamp(),
    Now.
%% Collects information about the runtime system of this node.
%% Returns a list of {Tag, Value} tuples, where Value is the result of
%% erlang:system_info/1 for the item that corresponds to Tag.
get_node_info() ->
    %% {Tag used in the result, Item asked of erlang:system_info/1}
    Items = [{cpu_topology,        cpu_topology},
             {heap_type,           heap_type},
             {nof_schedulers,      schedulers},
             {otp_release,         otp_release},
             {version,             version},
             {system_version,      system_version},
             {system_architecture, system_architecture}],
    [{Tag, erlang:system_info(Item)} || {Tag, Item} <- Items].