%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2018-2018. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% %% ========================================================================== %% %% This is the "simple" server using gen_tcp. The server is supposed to be %% as simple as possible in order to incur as little overhead as possible. %% %% There are three ways to run the server: active, passive or active-once. %% %% The server does only two things; accept connnections and then reply %% to requests (actually the handler(s) does that). No timing or counting. %% That is all done by the clients. %% %% ========================================================================== -module(socket_test_ttest_tcp_server). -export([ start_monitor/2, stop/1 ]). -include_lib("kernel/include/inet.hrl"). -include("socket_test_ttest.hrl"). -define(ACC_TIMEOUT, 10000). -define(RECV_TIMEOUT, 10000). %% ========================================================================== start_monitor(Mod, Active) when is_atom(Mod) andalso is_boolean(Active) orelse (Active =:= once) -> Self = self(), ServerInit = fun() -> put(sname, "server"), server_init(Self, Mod, Active) end, {Pid, MRef} = spawn_monitor(ServerInit), receive {'DOWN', MRef, process, Pid, normal} -> ok; {'DOWN', MRef, process, Pid, Reason} -> {error, {exit, Reason}}; {?MODULE, Pid, {ok, Port}} -> erlang:demonitor(MRef, [flush]), {ok, {Pid, MRef, Port}}; {?MODULE, Pid, {error, _} = ERROR} -> erlang:demonitor(MRef, [flush]), ERROR end. stop(Pid) when is_pid(Pid) -> req(Pid, stop). %% ========================================================================== server_init(Parent, Mod, Active) -> i("init -> entry with" "~n Parent: ~p" "~n Mod: ~p" "~n Active: ~p", [Parent, Mod, Active]), case Mod:listen(0) of {ok, LSock} -> case Mod:port(LSock) of {ok, Port} = OK -> Addr = which_addr(), % This is just for convenience i("init -> listening on:" "~n Addr: ~p (~s)" "~n Port: ~w" "~n", [Addr, inet:ntoa(Addr), Port]), Parent ! {?MODULE, self(), OK}, server_loop(#{parent => Parent, mod => Mod, active => Active, lsock => LSock, handlers => [], %% Accumulation runtime => 0, mcnt => 0, bcnt => 0, hcnt => 0 }); {error, PReason} -> (catch Mod:close(LSock)), exit({port, PReason}) end; {error, LReason} -> exit({listen, LReason}) end. server_loop(State) -> %% i("loop -> enter"), server_loop( server_handle_message( server_accept(State) ) ). server_accept(#{mod := Mod, active := Active, lsock := LSock, handlers := Handlers} = State) -> %% i("server-accept -> entry with" %% "~n Mod: ~p" %% "~n Active: ~p" %% "~n LSock: ~p", [Mod, Active, LSock]), case Mod:accept(LSock, ?ACC_TIMEOUT) of {ok, Sock} -> %% i("server-accept -> accepted: " %% "~n Sock: ~p", [Sock]), i("accepted connection from ~s", [case Mod:peername(Sock) of {ok, Peer} -> format_peername(Peer); {error, _} -> "-" end]), {Pid, _} = handler_start(), i("handler ~p started -> try transfer socket control", [Pid]), case Mod:controlling_process(Sock, Pid) of ok -> i("server-accept: handler ~p started", [Pid]), handler_continue(Pid, Mod, Sock, Active), Handlers2 = [Pid | Handlers], State#{handlers => Handlers2}; {error, CPReason} -> (catch Mod:close(Sock)), (catch Mod:close(LSock)), exit({controlling_process, CPReason}) end; {error, timeout} -> State; {error, AReason} -> (catch Mod:close(LSock)), exit({accept, AReason}) end. format_peername({Addr, Port}) -> case inet:gethostbyaddr(Addr) of {ok, #hostent{h_name = N}} -> f("~s (~s:~w)", [N, inet:ntoa(Addr), Port]); {error, _} -> f("~p, ~p", [Addr, Port]) end. server_handle_message(#{parent := Parent, handlers := H} = State) -> %% i("server_handle_message -> enter"), receive {?MODULE, Ref, Parent, stop} -> reply(Parent, Ref, ok), lists:foreach(fun(P) -> handler_stop(P) end, H), exit(normal); {'DOWN', _MRef, process, Pid, Reason} -> server_handle_down(Pid, Reason, State) after 0 -> State end. server_handle_down(Pid, Reason, #{handlers := Handlers} = State) -> case lists:delete(Pid, Handlers) of Handlers -> i("unknown process ~p died", [Pid]), State; Handlers2 -> server_handle_handler_down(Pid, Reason, State#{handlers => Handlers2}) end. server_handle_handler_down(Pid, {done, RunTime, MCnt, BCnt}, #{runtime := AccRunTime, mcnt := AccMCnt, bcnt := AccBCnt, hcnt := AccHCnt} = State) -> AccRunTime2 = AccRunTime + RunTime, AccMCnt2 = AccMCnt + MCnt, AccBCnt2 = AccBCnt + BCnt, AccHCnt2 = AccHCnt + 1, i("handler ~p (~w) done => accumulated results: " "~n Run Time: ~s ms" "~n Message Count: ~s" "~n Byte Count: ~s", [Pid, AccHCnt2, format_time(AccRunTime2), if (AccRunTime2 > 0) -> f("~w => ~w (~w) msgs / ms", [AccMCnt2, AccMCnt2 div AccRunTime2, (AccMCnt2 div AccHCnt2) div AccRunTime2]); true -> f("~w", [AccMCnt2]) end, if (AccRunTime2 > 0) -> f("~w => ~w (~w) bytes / ms", [AccBCnt2, AccBCnt2 div AccRunTime2, (AccBCnt2 div AccHCnt2) div AccRunTime2]); true -> f("~w", [AccBCnt2]) end]), State#{runtime => AccRunTime2, mcnt => AccMCnt2, bcnt => AccBCnt2, hcnt => AccHCnt2}; server_handle_handler_down(Pid, Reason, State) -> i("handler ~p terminated: " "~n ~p", [Pid, Reason]), State. %% ========================================================================== handler_start() -> Self = self(), HandlerInit = fun() -> put(sname, "handler"), handler_init(Self) end, spawn_monitor(HandlerInit). handler_continue(Pid, Mod, Sock, Active) -> req(Pid, {continue, Mod, Sock, Active}). handler_stop(Pid) -> req(Pid, stop). handler_init(Parent) -> i("starting"), receive {?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} -> i("received continue"), reply(Parent, Ref, ok), handler_initial_activation(Mod, Sock, Active), handler_loop(#{parent => Parent, mod => Mod, sock => Sock, active => Active, start => t(), mcnt => 0, bcnt => 0, last_reply => none, acc => <<>>}) after 5000 -> i("timeout when message queue: " "~n ~p" "~nwhen" "~n Parent: ~p", [process_info(self(), messages), Parent]), handler_init(Parent) end. handler_loop(State) -> %% i("handler-loop"), handler_loop( handler_handle_message( handler_recv_message(State) ) ). %% When passive, we read *one* request and then attempt to reply to it. handler_recv_message(#{mod := Mod, sock := Sock, active := false, mcnt := MCnt, bcnt := BCnt} = State) -> %% i("handler_recv_message(false) -> entry"), case handler_recv_message2(Mod, Sock) of {ok, {MsgSz, ID, Body}} -> handler_send_reply(Mod, Sock, ID, Body), State#{mcnt => MCnt + 1, bcnt => BCnt + MsgSz, last_reply => ID}; {error, closed} -> handler_done(State); {error, timeout} -> State end; %% When "active" (once or true), we receive one data "message", which may %% contain any number of requests or only part of a request. Then we %% process this data together with whatever we had "accumulated" from %% prevous messages. Each request will be extracted and replied to. If %% there is some data left, not enough for a complete request, we store %% this in 'acc' (accumulate it). handler_recv_message(#{mod := Mod, sock := Sock, active := Active, mcnt := MCnt, bcnt := BCnt, last_reply := LID, acc := Acc} = State) -> %% i("handler_recv_message(~w) -> entry", [Active]), case handler_recv_message3(Mod, Sock, Acc, LID) of {ok, {MCnt2, BCnt2, LID2}, NewAcc} -> handler_maybe_activate(Mod, Sock, Active), State#{mcnt => MCnt + MCnt2, bcnt => BCnt + BCnt2, last_reply => LID2, acc => NewAcc}; {error, closed} -> if (size(Acc) =:= 0) -> handler_done(State); true -> e("client done with partial message: " "~n Last Reply Sent: ~w" "~n Message Count: ~w" "~n Byte Count: ~w" "~n Partial Message: ~w bytes", [LID, MCnt, BCnt, size(Acc)]), exit({closed_with_partial_message, LID}) end; {error, timeout} -> State end. handler_process_data(Acc, Mod, Sock, LID) -> handler_process_data(Acc, Mod, Sock, 0, 0, LID). %% Extract each complete request, one at a time. handler_process_data(<>, Mod, Sock, MCnt, BCnt, _LID) when (size(Rest) >= SZ) -> <> = Rest, case handler_send_reply(Mod, Sock, ID, Body) of ok -> handler_process_data(Rest2, Mod, Sock, MCnt+1, BCnt+SZ, ID); {error, _} = ERROR -> ERROR end; handler_process_data(Data, _Mod, _Sock, MCnt, BCnt, LID) -> {ok, {MCnt, BCnt, LID}, Data}. handler_recv_message2(Mod, Sock) -> %% i("handler_recv_message2 -> entry"), case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of {ok, <> = Hdr} -> %% i("handler_recv_message2 -> got request header: " %% "~n ID: ~p" %% "~n SZ: ~p", [ID, SZ]), case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of {ok, Body} when (SZ =:= size(Body)) -> %% i("handler_recv_message2 -> got body"), {ok, {size(Hdr) + size(Body), ID, Body}}; {error, BReason} -> e("failed reading body (~w) of message ~w:" "~n ~p", [SZ, ID, BReason]), exit({recv, body, ID, SZ, BReason}) end; {error, timeout} = ERROR -> i("handler_recv_message2 -> timeout"), ERROR; {error, closed} = ERROR -> ERROR; {error, HReason} -> e("failed reading header of message:" "~n ~p", [HReason]), exit({recv, header, HReason}) end. handler_recv_message3(Mod, Sock, Acc, LID) -> receive {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse (TagClosed =:= socket_closed) -> {error, closed}; {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse (TagErr =:= socket_error) -> {error, Reason}; {Tag, Sock, Msg} when (Tag =:= tcp) orelse (Tag =:= socket) -> handler_process_data(<>, Mod, Sock, LID) after ?RECV_TIMEOUT -> {error, timeout} end. %% %% Socket in passive mode => we need to read explicitly %% handler_loop(#{sock := Sock, active := false} = State, MCnt, BCnt) -> %% %% i("try recv msg header"), %% MsgSz = %% case gen_tcp:recv(Sock, 4*4) of %% {ok, <> = Hdr} -> %% %% i("msg ~w header received (data sz is ~w bytes)", [ID, SZ]), %% case gen_tcp:recv(Sock, SZ) of %% {ok, Data} when (size(Data) =:= SZ) -> %% handler_send_reply(Sock, ID, Data), %% size(Hdr)+SZ; %% {ok, InvData} -> %% i("invalid data"), %% (catch gen_tcp:close(Sock)), %% exit({invalid_data, SZ, size(InvData)}); %% {error, Reason2} -> %% i("Data read failed: ~p", [Reason2]), %% (catch gen_tcp:close(Sock)), %% exit({recv_data, Reason2}) %% end; %% {ok, _InvHdr} -> %% (catch gen_tcp:close(Sock)), %% exit(invalid_hdr); %% {error, closed} -> %% i("we are done: " %% "~n Message Count: ~p" %% "~n Byte Count: ~p", [MCnt, BCnt]), %% exit(normal); %% {error, Reason1} -> %% i("Header read failed: ~p", [Reason1]), %% (catch gen_tcp:close(Sock)), %% exit({recv_hdr, Reason1}) %% end, %% handler_loop(State, MCnt+1, BCnt+MsgSz); %% %% Socket in active mode (once | true) => data messages arrive %% handler_loop(#{sock := Sock, active := Active} = State, MCnt, BCnt) -> %% %% i("await msg"), %% try handler_recv_request(Sock, Active) of %% {ID, Data, MsgSz} -> %% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]), %% handler_send_reply(Sock, ID, Data), %% handler_maybe_activate(Sock, Active), %% handler_loop(State, MCnt+1, BCnt+MsgSz) %% catch %% throw:tcp_closed -> %% i("we are done: " %% "~n Message Count: ~p" %% "~n Byte Count: ~p", [MCnt, BCnt]), %% exit(normal); %% throw:{tcp_error, Reason} -> %% i(" TCP error ~p when: " %% "~n Message Count: ~p" %% "~n Byte Count: ~p", [Reason, MCnt, BCnt]), %% exit({tcp_error, Reason}) %% end. %% %% {ID, Data, MsgSz} = handler_recv_request(Sock, Active), %% %% i("msg ~w received (data sz is ~w bytes)", [ID, size(Data)]), %% %% handler_send_reply(Sock, ID, Data), %% %% handler_maybe_activate(Sock, Active), %% %% handler_loop(State, MCnt+1, BCnt+MsgSz). %% handler_recv_request(Sock, Active) -> %% %% In theory we should also be ready for a partial header, %% %% but I can't be bothered... %% receive %% {tcp_closed, Sock} -> %% throw(tcp_closed); %% {tcp_error, Sock, Reason} -> %% throw({tcp_error, Reason}); %% {tcp, Sock, <> = Msg} when (size(Data) =:= SZ) -> %% %% i("[complete] msg ~w received (data sz is ~w bytes)", [ID, SZ]), %% {ID, Data, size(Msg)}; %% {tcp, Sock, <> = Msg} when (size(Data) < SZ) -> %% %% i("[incomplete] msg ~w received (data sz is ~w bytes)", [ID, SZ]), %% handler_recv_request_data(Sock, Active, ID, SZ, Data, size(Msg)) %% end. %% handler_recv_request_data(Sock, Active, ID, SZ, AccData, AccMsgSz) -> %% handler_maybe_activate(Sock, Active), %% receive %% {tcp_closed, Sock} -> %% %% i("we are done (incomplete data)"), %% throw(tcp_closed); %% {tcp_error, Sock, Reason} -> %% throw({tcp_error, Reason}); %% {tcp, Sock, Data} when (SZ =:= (size(AccData) + size(Data))) -> %% %% i("[complete] received the remaining data (~w bytes) for msg ~w", %% %% [size(Data), ID]), %% {ID, <>, AccMsgSz+size(Data)}; %% {tcp, Sock, Data} -> %% %% i("[incomplete] received ~w bytes of data for for msg ~w", %% %% [size(Data), ID]), %% handler_recv_request_data(Sock, Active, ID, SZ, %% <>, %% AccMsgSz+size(Data)) %% end. handler_send_reply(Mod, Sock, ID, Data) -> SZ = size(Data), Msg = <>, %% i("handler-send-reply -> try send reply ~w: ~w bytes", [ID, size(Msg)]), case Mod:send(Sock, Msg) of ok -> %% i("handler-send-reply -> reply ~w (~w bytes) sent", [ID, size(Msg)]), ok; {error, Reason} -> (catch Mod:close(Sock)), exit({send, Reason}) end. handler_done(State) -> handler_done(State, t()). handler_done(#{start := Start, mod := Mod, sock := Sock, mcnt := MCnt, bcnt := BCnt}, Stop) -> (catch Mod:close(Sock)), exit({done, tdiff(Start, Stop), MCnt, BCnt}). handler_handle_message(#{parent := Parent} = State) -> receive {'EXIT', Parent, Reason} -> exit({parent_exit, Reason}) after 0 -> State end. handler_initial_activation(_Mod, _Sock, false = _Active) -> ok; handler_initial_activation(Mod, Sock, Active) -> Mod:active(Sock, Active). handler_maybe_activate(Mod, Sock, once = Active) -> Mod:active(Sock, Active); handler_maybe_activate(_, _, _) -> ok. %% ========================================================================== which_addr() -> case inet:getifaddrs() of {ok, IfAddrs} -> which_addrs(inet, IfAddrs); {error, Reason} -> exit({getifaddrs, Reason}) end. which_addrs(_Family, []) -> exit({getifaddrs, not_found}); which_addrs(Family, [{"lo", _} | IfAddrs]) -> %% Skip which_addrs(Family, IfAddrs); which_addrs(Family, [{"docker" ++ _, _} | IfAddrs]) -> %% Skip docker which_addrs(Family, IfAddrs); which_addrs(Family, [{"br-" ++ _, _} | IfAddrs]) -> %% Skip docker which_addrs(Family, IfAddrs); which_addrs(Family, [{"en" ++ _, IfOpts} | IfAddrs]) -> %% Maybe take this one case which_addr(Family, IfOpts) of {ok, Addr} -> Addr; error -> which_addrs(Family, IfAddrs) end; which_addrs(Family, [{_IfName, IfOpts} | IfAddrs]) -> case which_addr(Family, IfOpts) of {ok, Addr} -> Addr; error -> which_addrs(Family, IfAddrs) end. which_addr(_, []) -> error; which_addr(inet, [{addr, Addr}|_]) when is_tuple(Addr) andalso (size(Addr) =:= 4) -> {ok, Addr}; which_addr(inet6, [{addr, Addr}|_]) when is_tuple(Addr) andalso (size(Addr) =:= 8) -> {ok, Addr}; which_addr(Family, [_|IfOpts]) -> which_addr(Family, IfOpts). %% ========================================================================== req(Pid, Req) -> Ref = make_ref(), Pid ! {?MODULE, Ref, self(), Req}, receive {'EXIT', Pid, Reason} -> {error, {exit, Reason}}; {?MODULE, Ref, Reply} -> Reply end. reply(Pid, Ref, Reply) -> Pid ! {?MODULE, Ref, Reply}. %% ========================================================================== t() -> os:timestamp(). tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> T1 = A1*1000000000+B1*1000+(C1 div 1000), T2 = A2*1000000000+B2*1000+(C2 div 1000), T2 - T1. formated_timestamp() -> format_timestamp(os:timestamp()). format_timestamp({_N1, _N2, N3} = TS) -> {_Date, Time} = calendar:now_to_local_time(TS), {Hour,Min,Sec} = Time, FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", [Hour, Min, Sec, round(N3/1000)]), lists:flatten(FormatTS). %% Time is always in number os ms (milli seconds) format_time(T) -> f("~p", [T]). %% ========================================================================== f(F, A) -> lists:flatten(io_lib:format(F, A)). e(F, A) -> p(get(sname), " " ++ F, A). i(F) -> i(F, []). i(F, A) -> p(get(sname), " " ++ F, A). p(undefined, F, A) -> p("- ", F, A); p(Prefix, F, A) -> io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).