%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%% ==========================================================================
%%
%% This is the "simple" server using gen_tcp. The server is supposed to be
%% as simple as possible in order to incur as little overhead as possible.
%%
%% There are three ways to run the server: active, passive or active-once.
%%
%% The server does only two things: accept connections and then reply
%% to requests (actually the handler(s) do that). No timing or counting.
%% That is all done by the clients.
%%
%% ==========================================================================
-module(socket_test_ttest_tcp_server).
-export([
start_monitor/2,
stop/1
]).
-include_lib("kernel/include/inet.hrl").
-include("socket_test_ttest.hrl").
-define(ACC_TIMEOUT, 10000).
-define(RECV_TIMEOUT, 10000).
%% ==========================================================================
%% start_monitor(Mod, Active) -> ok | {ok, {Pid, MRef, Port}} | {error, Reason}
%%
%% Spawns the server process with a monitor and waits for it to report
%% either the port it is listening on or a failure.
%%
%%   Mod    - atom; the transport module the server calls (listen/1, port/1,
%%            accept/2, ...).
%%   Active - true | false | once; the socket activation mode handed to
%%            every handler.
%%
%% On success the monitor is left in place, since MRef is part of the
%% result: the caller is expected to use it to detect server termination.
%% The guard is parenthesised so that Mod must be an atom for *every*
%% accepted value of Active (andalso binds tighter than orelse).
start_monitor(Mod, Active)
  when is_atom(Mod) andalso
       (is_boolean(Active) orelse (Active =:= once)) ->
    Self = self(),
    ServerInit = fun() ->
                         put(sname, "server"),
                         server_init(Self, Mod, Active)
                 end,
    {Pid, MRef} = spawn_monitor(ServerInit),
    receive
        {'DOWN', MRef, process, Pid, normal} ->
            ok;
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, {exit, Reason}};
        {?MODULE, Pid, {ok, Port}} ->
            %% Do NOT demonitor here: the reference is returned to the
            %% caller and must still be live.
            {ok, {Pid, MRef, Port}};
        {?MODULE, Pid, {error, _} = ERROR} ->
            %% Nothing is handed out on failure, so drop the monitor.
            erlang:demonitor(MRef, [flush]),
            ERROR
    end.
%% stop(Pid) -> Reply
%%
%% Sends a 'stop' request to the server process and returns whatever
%% req/2 returns for it.
stop(Server) when is_pid(Server) ->
    Request = stop,
    req(Server, Request).
%% ==========================================================================
%% server_init(Parent, Mod, Active) -> no_return()
%%
%% Entry point of the server process: creates the listen socket, looks up
%% its port, reports {ok, Port} to Parent and then enters the server loop.
%% Exits with {listen, Reason} or {port, Reason} if the socket cannot be
%% set up (the listen socket is closed first in the latter case).
server_init(Parent, Mod, Active) ->
    i("init -> entry with"
      "~n Parent: ~p"
      "~n Mod: ~p"
      "~n Active: ~p", [Parent, Mod, Active]),
    LSock =
        case Mod:listen(0) of
            {ok, LS} ->
                LS;
            {error, LReason} ->
                exit({listen, LReason})
        end,
    Port =
        case Mod:port(LSock) of
            {ok, PortNo} ->
                PortNo;
            {error, PReason} ->
                (catch Mod:close(LSock)),
                exit({port, PReason})
        end,
    %% The address is only looked up so that it can be printed
    Addr = which_addr(),
    i("init -> listening on:"
      "~n Addr: ~p (~s)"
      "~n Port: ~w"
      "~n", [Addr, inet:ntoa(Addr), Port]),
    Parent ! {?MODULE, self(), {ok, Port}},
    InitialState = #{parent   => Parent,
                     mod      => Mod,
                     active   => Active,
                     lsock    => LSock,
                     handlers => [],
                     %% Accumulated results from finished handlers
                     runtime  => 0,
                     mcnt     => 0,
                     bcnt     => 0,
                     hcnt     => 0},
    server_loop(InitialState).
%% server_loop(State) -> no_return()
%%
%% One iteration = try to accept a connection, then poll the mailbox for
%% control messages; repeat forever with the resulting state.
server_loop(State0) ->
    State1 = server_accept(State0),
    State2 = server_handle_message(State1),
    server_loop(State2).
%% server_accept(State) -> State'
%%
%% Tries to accept one connection, passing ?ACC_TIMEOUT as the accept
%% timeout. On success a handler process is started, given control of the
%% new socket and told to continue; its pid is added to 'handlers'.
%% An accept timeout leaves the state unchanged. Any other accept error,
%% or a failed controlling_process transfer, closes the socket(s) and
%% terminates the server.
server_accept(#{mod      := Mod,
                active   := Active,
                lsock    := LSock,
                handlers := Handlers} = State) ->
    case Mod:accept(LSock, ?ACC_TIMEOUT) of
        {error, timeout} ->
            State;
        {error, AReason} ->
            (catch Mod:close(LSock)),
            exit({accept, AReason});
        {ok, Sock} ->
            PeerStr =
                case Mod:peername(Sock) of
                    {ok, Peer} -> format_peername(Peer);
                    {error, _} -> "-"
                end,
            i("accepted connection from ~s", [PeerStr]),
            {Handler, _} = handler_start(),
            i("handler ~p started -> try transfer socket control", [Handler]),
            case Mod:controlling_process(Sock, Handler) of
                {error, CPReason} ->
                    (catch Mod:close(Sock)),
                    (catch Mod:close(LSock)),
                    exit({controlling_process, CPReason});
                ok ->
                    i("server-accept: handler ~p started", [Handler]),
                    handler_continue(Handler, Mod, Sock, Active),
                    State#{handlers => [Handler | Handlers]}
            end
    end.
%% format_peername({Addr, Port}) -> string()
%%
%% Renders a peer as "Host (IP:Port)" when the address can be resolved to
%% a host name, and as "Addr, Port" (raw terms) otherwise.
format_peername({IP, PortNo}) ->
    Lookup = inet:gethostbyaddr(IP),
    case Lookup of
        {error, _} ->
            f("~p, ~p", [IP, PortNo]);
        {ok, #hostent{h_name = HostName}} ->
            f("~s (~s:~w)", [HostName, inet:ntoa(IP), PortNo])
    end.
%% server_handle_message(State) -> State'
%%
%% Non-blocking poll of the server mailbox:
%%   * a 'stop' request from the parent is acknowledged, every handler is
%%     asked to stop and the server exits normally;
%%   * a 'DOWN' message is passed on to server_handle_down/3;
%%   * if nothing matches, the state is returned unchanged.
server_handle_message(#{parent := Parent, handlers := Handlers} = State) ->
    receive
        {?MODULE, Ref, Parent, stop} ->
            reply(Parent, Ref, ok),
            lists:foreach(fun handler_stop/1, Handlers),
            exit(normal);
        {'DOWN', _MRef, process, DownPid, DownReason} ->
            server_handle_down(DownPid, DownReason, State)
    after 0 ->
        State
    end.
%% server_handle_down(Pid, Reason, State) -> State'
%%
%% Handles the termination of a monitored process. A known handler is
%% removed from 'handlers' and its exit reason is processed further; any
%% other process is only reported.
server_handle_down(Pid, Reason, #{handlers := Handlers} = State) ->
    case lists:member(Pid, Handlers) of
        true ->
            Remaining = lists:delete(Pid, Handlers),
            server_handle_handler_down(Pid, Reason,
                                       State#{handlers => Remaining});
        false ->
            i("unknown process ~p died", [Pid]),
            State
    end.
%% server_handle_handler_down(Pid, Reason, State) -> State'
%%
%% A handler that exits with {done, RunTime, MCnt, BCnt} has its figures
%% added to the accumulated totals, which are then printed together with
%% per-ms rates (total, and per handler in parentheses). Rates are left
%% out while the accumulated run time is not positive, which also avoids
%% a division by zero. Any other exit reason is just reported.
server_handle_handler_down(Pid,
                           {done, RunTime, MCnt, BCnt},
                           #{runtime := RunTime0,
                             mcnt    := MCnt0,
                             bcnt    := BCnt0,
                             hcnt    := HCnt0} = State) ->
    TotRunTime = RunTime0 + RunTime,
    TotMCnt    = MCnt0 + MCnt,
    TotBCnt    = BCnt0 + BCnt,
    TotHCnt    = HCnt0 + 1,
    %% Formats one counter, with rates when there is a run time to divide by
    Summary =
        fun(Cnt, RateFmt) when (TotRunTime > 0) ->
                f(RateFmt,
                  [Cnt,
                   Cnt div TotRunTime,
                   (Cnt div TotHCnt) div TotRunTime]);
           (Cnt, _RateFmt) ->
                f("~w", [Cnt])
        end,
    i("handler ~p (~w) done => accumulated results: "
      "~n Run Time: ~s ms"
      "~n Message Count: ~s"
      "~n Byte Count: ~s",
      [Pid, TotHCnt,
       format_time(TotRunTime),
       Summary(TotMCnt, "~w => ~w (~w) msgs / ms"),
       Summary(TotBCnt, "~w => ~w (~w) bytes / ms")]),
    State#{runtime => TotRunTime,
           mcnt    => TotMCnt,
           bcnt    => TotBCnt,
           hcnt    => TotHCnt};
server_handle_handler_down(Pid, Reason, State) ->
    i("handler ~p terminated: "
      "~n ~p", [Pid, Reason]),
    State.
%% ==========================================================================
%% handler_start() -> {Pid, MRef}
%%
%% Spawns a monitored handler process. The handler tags itself "handler"
%% for logging and gets the calling process as its parent.
handler_start() ->
    Server = self(),
    spawn_monitor(fun() ->
                          put(sname, "handler"),
                          handler_init(Server)
                  end).
%% handler_continue(Pid, Mod, Sock, Active) -> Reply
%%
%% Hands the handler what it needs to start serving (transport module,
%% socket and activation mode) and waits for its acknowledgement.
handler_continue(Handler, Mod, Sock, Active) ->
    Request = {continue, Mod, Sock, Active},
    req(Handler, Request).
%% handler_stop(Pid) -> Reply
%%
%% Sends a 'stop' request to a handler and waits for the answer.
handler_stop(Handler) ->
    Request = stop,
    req(Handler, Request).
%% handler_init(Parent) -> no_return()
%%
%% Waits for the 'continue' request from the parent, acknowledges it,
%% performs the initial socket activation and enters the handler loop
%% with a fresh state. While no request has arrived, the current message
%% queue is printed every 5000 ms and the wait is restarted.
handler_init(Parent) ->
    i("starting"),
    receive
        {?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} ->
            i("received continue"),
            reply(Parent, Ref, ok),
            handler_initial_activation(Mod, Sock, Active),
            InitialState = #{parent     => Parent,
                             mod        => Mod,
                             sock       => Sock,
                             active     => Active,
                             start      => t(),
                             mcnt       => 0,
                             bcnt       => 0,
                             last_reply => none,
                             acc        => <<>>},
            handler_loop(InitialState)
    after 5000 ->
        Queue = process_info(self(), messages),
        i("timeout when message queue: "
          "~n ~p"
          "~nwhen"
          "~n Parent: ~p", [Queue, Parent]),
        handler_init(Parent)
    end.
%% handler_loop(State) -> no_return()
%%
%% One iteration = receive and answer request data, then poll the mailbox
%% for control messages; repeat forever with the resulting state.
handler_loop(State0) ->
    State1 = handler_recv_message(State0),
    State2 = handler_handle_message(State1),
    handler_loop(State2).
%% handler_recv_message(State) -> State'
%%
%% Receives request data and replies to it; returns the state with
%% updated counters. A timeout leaves the state unchanged. When the peer
%% closes the connection the handler terminates through handler_done/1.
%%
%% When passive, we read *one* request and then attempt to reply to it.
handler_recv_message(#{mod    := Mod,
                       sock   := Sock,
                       active := false,
                       mcnt   := MCnt,
                       bcnt   := BCnt} = State) ->
    case handler_recv_message2(Mod, Sock) of
        {ok, {MsgSz, ID, Body}} ->
            handler_send_reply(Mod, Sock, ID, Body),
            State#{mcnt       => MCnt + 1,
                   bcnt       => BCnt + MsgSz,
                   last_reply => ID};
        {error, closed} ->
            handler_done(State);
        {error, timeout} ->
            State
    end;

%% When "active" (once or true), we receive one data "message", which may
%% contain any number of requests or only part of a request. Then we
%% process this data together with whatever we had "accumulated" from
%% previous messages. Each request will be extracted and replied to. If
%% there is some data left, not enough for a complete request, we store
%% this in 'acc' (accumulate it).
handler_recv_message(#{mod        := Mod,
                       sock       := Sock,
                       active     := Active,
                       mcnt       := MCnt,
                       bcnt       := BCnt,
                       last_reply := LID,
                       acc        := Acc} = State) ->
    case handler_recv_message3(Mod, Sock, Acc, LID) of
        {ok, {MCnt2, BCnt2, LID2}, NewAcc} ->
            handler_maybe_activate(Mod, Sock, Active),
            State#{mcnt       => MCnt + MCnt2,
                   bcnt       => BCnt + BCnt2,
                   last_reply => LID2,
                   acc        => NewAcc};
        {error, closed} when (byte_size(Acc) =:= 0) ->
            handler_done(State);
        {error, closed} ->
            %% The peer went away in the middle of a request
            e("client done with partial message: "
              "~n Last Reply Sent: ~w"
              "~n Message Count: ~w"
              "~n Byte Count: ~w"
              "~n Partial Message: ~w bytes",
              [LID, MCnt, BCnt, byte_size(Acc)]),
            exit({closed_with_partial_message, LID});
        {error, timeout} ->
            State;
        {error, Reason} ->
            %% A tcp_error/socket_error message from the socket: report it
            %% and terminate with a tagged reason instead of dying from an
            %% anonymous case_clause.
            e("failed receiving message:"
              "~n Last Reply Sent: ~w"
              "~n Message Count: ~w"
              "~n Byte Count: ~w"
              "~n Reason: ~p",
              [LID, MCnt, BCnt, Reason]),
            exit({recv, Reason})
    end.
%% handler_process_data(Data, Mod, Sock, LID) ->
%%     {ok, {MCnt, BCnt, LastID}, Rest} | {error, Reason}
%%
%% Extracts every complete request found at the start of Data, replying
%% to each one as it is extracted.
%%
%%   MCnt   - number of requests processed in this call
%%   BCnt   - number of bytes processed in this call; header *and* body,
%%            the same way the passive-mode receive counts a message
%%   LastID - id of the last request replied to (LID if there was none)
%%   Rest   - trailing data that does not (yet) form a complete request
handler_process_data(Acc, Mod, Sock, LID) ->
    handler_process_data(Acc, Mod, Sock, 0, 0, LID).

%% Extract each complete request, one at a time.
handler_process_data(<<?TTEST_TAG:32,
                       ?TTEST_TYPE_REQUEST:32,
                       ID:32,
                       SZ:32,
                       Rest/binary>>,
                     Mod, Sock,
                     MCnt, BCnt, _LID) when (byte_size(Rest) >= SZ) ->
    %% The header consists of four 32-bit fields
    MsgSz = 4*4 + SZ,
    <<Body:SZ/binary, Rest2/binary>> = Rest,
    case handler_send_reply(Mod, Sock, ID, Body) of
        ok ->
            handler_process_data(Rest2, Mod, Sock,
                                 MCnt + 1, BCnt + MsgSz, ID);
        {error, _} = ERROR ->
            ERROR
    end;
%% Not enough data for a complete request: hand back what is left
handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) ->
    {ok, {MCnt, BCnt, LID}, Data}.
%% handler_recv_message2(Mod, Sock) ->
%%     {ok, {MsgSz, ID, Body}} | {error, timeout} | {error, closed}
%%
%% Passive-mode read of exactly one request: first the 16-byte header
%% (four 32-bit fields), then the body whose size the header announces.
%% MsgSz is the size of header plus body. Any read failure other than a
%% header timeout/close terminates the handler.
handler_recv_message2(Mod, Sock) ->
    case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
        {ok, <<?TTEST_TAG:32,
               ?TTEST_TYPE_REQUEST:32,
               ID:32,
               0:32>> = Hdr} ->
            %% Empty body: there is nothing more to read. Asking a
            %% gen_tcp-style recv for 0 bytes would mean "whatever is
            %% available" and so eat into the next request.
            %% NOTE(review): assumes Mod:recv/3 follows that convention.
            {ok, {byte_size(Hdr), ID, <<>>}};
        {ok, <<?TTEST_TAG:32,
               ?TTEST_TYPE_REQUEST:32,
               ID:32,
               SZ:32>> = Hdr} ->
            case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
                {ok, Body} when (SZ =:= byte_size(Body)) ->
                    {ok, {byte_size(Hdr) + byte_size(Body), ID, Body}};
                {error, BReason} ->
                    e("failed reading body (~w) of message ~w:"
                      "~n ~p", [SZ, ID, BReason]),
                    exit({recv, body, ID, SZ, BReason})
            end;
        {error, timeout} = ERROR ->
            i("handler_recv_message2 -> timeout"),
            ERROR;
        {error, closed} = ERROR ->
            ERROR;
        {error, HReason} ->
            e("failed reading header of message:"
              "~n ~p", [HReason]),
            exit({recv, header, HReason})
    end.
%% handler_recv_message3(Mod, Sock, Acc, LID) ->
%%     {ok, {MCnt, BCnt, LastID}, NewAcc} | {error, Reason}
%%
%% Active-mode receive: waits up to ?RECV_TIMEOUT for one socket message
%% (tagged tcp* or socket*, depending on the transport). Received data is
%% appended to Acc and processed; close, error and timeout are reported
%% as {error, closed | Reason | timeout}.
handler_recv_message3(Mod, Sock, Acc, LID) ->
    Event =
        receive
            {tcp_closed, Sock}           -> closed;
            {socket_closed, Sock}        -> closed;
            {tcp_error, Sock, Reason}    -> {failed, Reason};
            {socket_error, Sock, Reason} -> {failed, Reason};
            {tcp, Sock, Data}            -> {data, Data};
            {socket, Sock, Data}         -> {data, Data}
        after ?RECV_TIMEOUT ->
            timeout
        end,
    case Event of
        closed ->
            {error, closed};
        {failed, Why} ->
            {error, Why};
        {data, Bin} ->
            handler_process_data(<<Acc/binary, Bin/binary>>, Mod, Sock, LID);
        timeout ->
            {error, timeout}
    end.
%% %% Socket in passive mode => we need to read explicitly
%% handler_loop(#{sock := Sock, active := false} = State, MCnt, BCnt) ->
%% %% i("try recv msg header"),
%% MsgSz =
%% case gen_tcp:recv(Sock, 4*4) of
%% {ok, <<?SOCKET_SIMPLE_TAG:32,
%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
%% ID:32,
%% SZ:32>> = Hdr} ->
%% %% i("msg ~w header received (data sz is ~w bytes)", [ID, SZ]),
%% case gen_tcp:recv(Sock, SZ) of
%% {ok, Data} when (size(Data) =:= SZ) ->
%% handler_send_reply(Sock, ID, Data),
%% size(Hdr)+SZ;
%% {ok, InvData} ->
%% i("invalid data"),
%% (catch gen_tcp:close(Sock)),
%% exit({invalid_data, SZ, size(InvData)});
%% {error, Reason2} ->
%% i("Data read failed: ~p", [Reason2]),
%% (catch gen_tcp:close(Sock)),
%% exit({recv_data, Reason2})
%% end;
%% {ok, _InvHdr} ->
%% (catch gen_tcp:close(Sock)),
%% exit(invalid_hdr);
%% {error, closed} ->
%% i("we are done: "
%% "~n Message Count: ~p"
%% "~n Byte Count: ~p", [MCnt, BCnt]),
%% exit(normal);
%% {error, Reason1} ->
%% i("Header read failed: ~p", [Reason1]),
%% (catch gen_tcp:close(Sock)),
%% exit({recv_hdr, Reason1})
%% end,
%% handler_loop(State, MCnt+1, BCnt+MsgSz);
%% %% Socket in active mode (once | true) => data messages arrive
%% handler_loop(#{sock := Sock, active := Active} = State, MCnt, BCnt) ->
%% %% i("await msg"),
%% try handler_recv_request(Sock, Active) of
%% {ID, Data, MsgSz} ->
%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]),
%% handler_send_reply(Sock, ID, Data),
%% handler_maybe_activate(Sock, Active),
%% handler_loop(State, MCnt+1, BCnt+MsgSz)
%% catch
%% throw:tcp_closed ->
%% i("we are done: "
%% "~n Message Count: ~p"
%% "~n Byte Count: ~p", [MCnt, BCnt]),
%% exit(normal);
%% throw:{tcp_error, Reason} ->
%% i("<ERROR> TCP error ~p when: "
%% "~n Message Count: ~p"
%% "~n Byte Count: ~p", [Reason, MCnt, BCnt]),
%% exit({tcp_error, Reason})
%% end.
%% %% {ID, Data, MsgSz} = handler_recv_request(Sock, Active),
%% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]),
%% %% handler_send_reply(Sock, ID, Data),
%% %% handler_maybe_activate(Sock, Active),
%% %% handler_loop(State, MCnt+1, BCnt+MsgSz).
%% handler_recv_request(Sock, Active) ->
%% %% In theory we should also be ready for a partial header,
%% %% but I can't be bothered...
%% receive
%% {tcp_closed, Sock} ->
%% throw(tcp_closed);
%% {tcp_error, Sock, Reason} ->
%% throw({tcp_error, Reason});
%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32,
%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
%% ID:32,
%% SZ:32,
%% Data/binary>> = Msg} when (size(Data) =:= SZ) ->
%% %% i("[complete] msg ~w received (data sz is ~w bytes)", [ID, SZ]),
%% {ID, Data, size(Msg)};
%% {tcp, Sock, <<?SOCKET_SIMPLE_TAG:32,
%% ?SOCKET_SIMPLE_TYPE_REQUEST:32,
%% ID:32,
%% SZ:32,
%% Data/binary>> = Msg} when (size(Data) < SZ) ->
%% %% i("[incomplete] msg ~w received (data sz is ~w bytes)", [ID, SZ]),
%% handler_recv_request_data(Sock, Active, ID, SZ, Data, size(Msg))
%% end.
%% handler_recv_request_data(Sock, Active, ID, SZ, AccData, AccMsgSz) ->
%% handler_maybe_activate(Sock, Active),
%% receive
%% {tcp_closed, Sock} ->
%% %% i("we are done (incomplete data)"),
%% throw(tcp_closed);
%% {tcp_error, Sock, Reason} ->
%% throw({tcp_error, Reason});
%% {tcp, Sock, Data} when (SZ =:= (size(AccData) + size(Data))) ->
%% %% i("[complete] received the remaining data (~w bytes) for msg ~w",
%% %% [size(Data), ID]),
%% {ID, <<AccData/binary, Data/binary>>, AccMsgSz+size(Data)};
%% {tcp, Sock, Data} ->
%% %% i("[incomplete] received ~w bytes of data for for msg ~w",
%% %% [size(Data), ID]),
%% handler_recv_request_data(Sock, Active, ID, SZ,
%% <<AccData/binary, Data/binary>>,
%% AccMsgSz+size(Data))
%% end.
%% handler_send_reply(Mod, Sock, ID, Data) -> ok
%%
%% Builds the reply for request ID (header followed by Data, i.e. the
%% request body is echoed back) and sends it. If the send fails the
%% socket is closed and the handler exits with {send, Reason}.
handler_send_reply(Mod, Sock, ID, Data) ->
    Len    = byte_size(Data),
    Header = <<?TTEST_TAG:32,
               ?TTEST_TYPE_REPLY:32,
               ID:32,
               Len:32>>,
    Reply  = <<Header/binary, Data/binary>>,
    case Mod:send(Sock, Reply) of
        {error, Reason} ->
            (catch Mod:close(Sock)),
            exit({send, Reason});
        ok ->
            ok
    end.
%% handler_done(State) -> no_return()
%%
%% Terminates the handler, using "now" as the stop time.
handler_done(State) ->
    Now = t(),
    handler_done(State, Now).

%% handler_done(State, Stop) -> no_return()
%%
%% Closes the socket and exits with {done, RunTime, MCnt, BCnt}, where
%% RunTime is the time between the stored start time and Stop. This exit
%% reason is how the results reach the server.
handler_done(#{start := Start,
               mod   := Mod,
               sock  := Sock,
               mcnt  := MCnt,
               bcnt  := BCnt}, Stop) ->
    (catch Mod:close(Sock)),
    RunTime = tdiff(Start, Stop),
    exit({done, RunTime, MCnt, BCnt}).
%% handler_handle_message(State) -> State
%%
%% Non-blocking poll of the handler mailbox for control messages:
%%   * a 'stop' request from the parent (sent by the server when it shuts
%%     down) is acknowledged, the socket is closed and the handler exits
%%     normally. Without the acknowledgement the server would wait
%%     forever for a reply;
%%   * an 'EXIT' from the parent terminates the handler.
%%     NOTE(review): this can only arrive if the handler is linked to the
%%     parent and traps exits, which is not set up in this module.
%% If nothing matches, the state is returned unchanged.
handler_handle_message(#{parent := Parent,
                         mod    := Mod,
                         sock   := Sock} = State) ->
    receive
        {?MODULE, Ref, Parent, stop} ->
            reply(Parent, Ref, ok),
            (catch Mod:close(Sock)),
            exit(normal);
        {'EXIT', Parent, Reason} ->
            exit({parent_exit, Reason})
    after 0 ->
        State
    end.
%% handler_initial_activation(Mod, Sock, Active) -> ok | term()
%%
%% Sets the initial activation mode of the socket. A passive socket
%% (Active = false) is left as it is; for any other mode the result of
%% Mod:active/2 is returned.
handler_initial_activation(Mod, Sock, Active) ->
    case Active of
        false ->
            ok;
        _ ->
            Mod:active(Sock, Active)
    end.
%% handler_maybe_activate(Mod, Sock, Active) -> ok | term()
%%
%% Re-arms the socket after a received message, which is only needed in
%% 'once' mode; every other mode is a no-op.
handler_maybe_activate(Mod, Sock, Active) ->
    case Active of
        once ->
            Mod:active(Sock, once);
        _ ->
            ok
    end.
%% ==========================================================================
%% which_addr() -> inet:ip4_address()
%%
%% Picks an IPv4 address from the interfaces of this host. Exits with
%% {getifaddrs, Reason} if the interface list cannot be retrieved.
which_addr() ->
    Result = inet:getifaddrs(),
    case Result of
        {error, Reason} ->
            exit({getifaddrs, Reason});
        {ok, IfAddrs} ->
            which_addrs(inet, IfAddrs)
    end.
%% which_addrs(Family, IfAddrs) -> Addr
%%
%% Walks the interface list and returns the first address of Family found
%% on an interface that is not skipped. Skipped are the interface named
%% "lo" and those whose name starts with "docker" or "br-". Exits with
%% {getifaddrs, not_found} when the list is exhausted.
which_addrs(_Family, []) ->
    exit({getifaddrs, not_found});
which_addrs(Family, [{IfName, IfOpts} | Rest]) ->
    Skip =
        case IfName of
            "lo"          -> true;
            "docker" ++ _ -> true;
            "br-" ++ _    -> true;
            _             -> false
        end,
    case Skip of
        true ->
            which_addrs(Family, Rest);
        false ->
            case which_addr(Family, IfOpts) of
                {ok, Addr} ->
                    Addr;
                error ->
                    which_addrs(Family, Rest)
            end
    end.
%% which_addr(Family, IfOpts) -> {ok, Addr} | error
%%
%% Returns the first 'addr' option of the wanted family: a 4-tuple for
%% inet, an 8-tuple for inet6. 'error' means no such option was found.
which_addr(_Family, []) ->
    error;
which_addr(inet, [{addr, {_, _, _, _} = IPv4} | _]) ->
    {ok, IPv4};
which_addr(inet6, [{addr, {_, _, _, _, _, _, _, _} = IPv6} | _]) ->
    {ok, IPv6};
which_addr(Family, [_Other | Opts]) ->
    which_addr(Family, Opts).
%% ==========================================================================
%% req(Pid, Req) -> Reply | {error, {exit, Reason}}
%%
%% Sends the request {?MODULE, Ref, self(), Req} to Pid and waits for the
%% matching {?MODULE, Ref, Reply}.
%%
%% The target is monitored for the duration of the call, so that a
%% process that is already dead, or dies before it has replied, yields
%% {error, {exit, Reason}} instead of blocking the caller forever. An
%% 'EXIT' message can only arrive if the caller is linked to Pid and
%% traps exits; the clause is kept for callers that do.
req(Pid, Req) ->
    Ref  = make_ref(),
    MRef = erlang:monitor(process, Pid),
    Pid ! {?MODULE, Ref, self(), Req},
    receive
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, {exit, Reason}};
        {'EXIT', Pid, Reason} ->
            erlang:demonitor(MRef, [flush]),
            {error, {exit, Reason}};
        {?MODULE, Ref, Reply} ->
            %% Flush, so that no stale 'DOWN' is left in the mailbox
            erlang:demonitor(MRef, [flush]),
            Reply
    end.
%% reply(Pid, Ref, Reply) -> Msg
%%
%% Answers a request made with req/2; Ref is the reference that came with
%% the request. Returns the message that was sent.
reply(To, Ref, Result) ->
    Msg = {?MODULE, Ref, Result},
    erlang:send(To, Msg).
%% ==========================================================================
%% t() -> {MegaSecs, Secs, MicroSecs}
%%
%% Current OS time; used as start/stop marker when timing a handler.
t() ->
    Now = os:timestamp(),
    Now.
%% tdiff(T1, T2) -> integer()
%%
%% Time from T1 to T2 in milliseconds. Both arguments are
%% {MegaSecs, Secs, MicroSecs} tuples; each one is converted to whole
%% milliseconds (micro seconds truncated) before subtracting.
tdiff({_, _, _} = T1, {_, _, _} = T2) ->
    ToMs = fun({MegaSecs, Secs, MicroSecs}) ->
                   MegaSecs*1000000000 + Secs*1000 + (MicroSecs div 1000)
           end,
    ToMs(T2) - ToMs(T1).
%% formated_timestamp() -> string()
%%
%% The current OS time, formatted for use in printouts.
formated_timestamp() ->
    Now = os:timestamp(),
    format_timestamp(Now).
%% format_timestamp({MegaSecs, Secs, MicroSecs}) -> string()
%%
%% Formats the time-of-day part of a timestamp, in local time, as
%% "HH:MM:SS.mmm".
%%
%% The milliseconds are truncated (div) rather than rounded so that they
%% always stay within 0..999, and are zero-padded to three digits so that
%% e.g. 7 ms is printed as ".007" and not ".7".
format_timestamp({_N1, _N2, N3} = TS) ->
    {_Date, {Hour, Min, Sec}} = calendar:now_to_local_time(TS),
    MSec = N3 div 1000,
    FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.~.3.0w",
                             [Hour, Min, Sec, MSec]),
    lists:flatten(FormatTS).
%% Time is always in number of ms (milliseconds)
%% format_time(T) -> string()
%%
%% Renders a time value as a flat string, using ~p.
format_time(T) ->
    Chars = io_lib:format("~p", [T]),
    lists:flatten(Chars).
%% ==========================================================================
%% f(Format, Args) -> string()
%%
%% io_lib:format/2 with the result flattened to a plain string.
f(Format, Args) ->
    Chars = io_lib:format(Format, Args),
    lists:flatten(Chars).
%% e(Format, Args) -> ok
%%
%% Prints an error message, prefixed with the name the process has
%% stored under 'sname' in its process dictionary.
e(Format, Args) ->
    Name = get(sname),
    p(Name, "<ERROR> " ++ Format, Args).
%% i(Format) -> ok
%%
%% Prints an info message that takes no arguments.
i(Format) ->
    i(Format, []).

%% i(Format, Args) -> ok
%%
%% Prints an info message, prefixed with the name the process has stored
%% under 'sname' in its process dictionary.
i(Format, Args) ->
    Name = get(sname),
    p(Name, "<INFO> " ++ Format, Args).
%% p(Prefix, Format, Args) -> ok
%%
%% Prints one line of the form "[Timestamp, Prefix] Message". A process
%% without a name (Prefix = undefined) is shown as "- ".
p(Prefix0, Format, Args) ->
    Prefix =
        case Prefix0 of
            undefined -> "- ";
            _         -> Prefix0
        end,
    io:format("[~s, ~s] " ++ Format ++ "~n",
              [formated_timestamp(), Prefix | Args]).