aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/test/socket_test_ttest_tcp_server.erl
blob: e8d626e3d82fa9a76b6dbfff065f8558821bc730 (plain) (tree)



































                                                                             





                                                                


               




                   


                                        

                            
 








                                               


                                                                             



























                                                                          
                                                       



                                                                         

                                            
                                               



                                         
                                             
                                            

                 
    






                                                                             
                                                  
                  
                           
                                                  

                                                                        

                                   
                             
                                                                       
                                      


                                                            
                                                                    





                                                                  













                                                 










                                                                           

                     





                                                                 

                                           






                                              
                                       
                                                                           

                                                      

                                                                   

















                                                             
                                                         
                     
                                      
        









                                                                             
                                                                    
           








                                                              











                                                            








                                                                         



                                                                   
                                                                     















                                                                                  





















                                                              




                                                 

                                 

















                                                                             
                   

                                                                
                                    





                                                          
                                              





                                               



                                                                            



                                


                                                                          





                                                     








                                                    



                                                   
















                                                                          












                                                      





                                                           



                                                            




                                                                   
















                                                                  
                                                                           







                                                           




                                              

                                                      

                                                             

                                                                


                                                       



                                  

                                                  























                                                                            






                                          

                               







                                    
                              






                                       
                                                  


























































































                                                                             

                      
 



                                                     
 

                                        
 





                                                                    
 


                                                    



                                                                             

                                          
 

                                         
 

                
 

                                        
 



                                                                              
 
%%
%% %CopyrightBegin%
%% 
%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
%% 
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% 
%% %CopyrightEnd%
%%

%% ==========================================================================
%%
%% This is the "simple" server using gen_tcp. The server is supposed to be
%% as simple as possible in order to incur as little overhead as possible.
%%
%% There are three ways to run the server: active, passive or active-once.
%%
%% The server does only two things: accept connections and then reply
%% to requests (actually the handler(s) do that). No timing or counting.
%% That is all done by the clients.
%%
%% ==========================================================================

-module(socket_test_ttest_tcp_server).

-export([
         %% These are for the test suite
         start_monitor/3,

         %% These are for starting in a shell when run "manually"
         start/2,

         stop/1
        ]).

%% Internal exports
-export([
         do_start/3
        ]).

-include_lib("kernel/include/inet.hrl").
-include("socket_test_ttest.hrl").

-define(ACC_TIMEOUT,  5000).
-define(RECV_TIMEOUT, 5000).

-define(LIB,            socket_test_ttest_lib).
-define(I(F),           ?LIB:info(F)).
-define(I(F,A),         ?LIB:info(F, A)).
-define(E(F,A),         ?LIB:error(F, A)).
-define(F(F,A),         ?LIB:format(F, A)).
-define(FORMAT_TIME(T), ?LIB:format_time(T)).
-define(T(),            ?LIB:t()).
-define(TDIFF(T1,T2),   ?LIB:tdiff(T1, T2)).


%% ==========================================================================

%% Start the server (on Node, via rpc, when Node is not the local node)
%% and set up a monitor on it from the calling process.
%%
%% Returns {ok, {{Pid, MRef}, AddrPort}} on success, {error, Reason}
%% when the start fails, or {error, {badrpc, _}} when the rpc call
%% itself fails.
start_monitor(Node, Transport, Active) when (Node =/= node()) ->
    Args = [self(), Transport, Active],
    case rpc:call(Node, ?MODULE, do_start, Args) of
        {ok, {ServerPid, AddrPort}} ->
            ServerMRef = erlang:monitor(process, ServerPid),
            {ok, {{ServerPid, ServerMRef}, AddrPort}};
        {error, _} = Error ->
            Error;
        {badrpc, _} = BadRpc ->
            {error, BadRpc}
    end;
start_monitor(_LocalNode, Transport, Active) ->
    case do_start(self(), Transport, Active) of
        {error, _} = Error ->
            Error;
        {ok, {ServerPid, AddrPort}} ->
            ServerMRef = erlang:monitor(process, ServerPid),
            {ok, {{ServerPid, ServerMRef}, AddrPort}}
    end.
            


%% Start the server from a shell ("manual" use); the calling process
%% becomes the server's parent. No monitor is set up.
start(Transport, Active) ->
    Parent = self(),
    do_start(Parent, Transport, Active).


%% Spawn the server process and wait for it to report the address and
%% port it listens on.
%%
%%   Parent    - pid recorded by the server as its parent (the only
%%               process whose stop request it accepts).
%%   Transport - Mod | {Mod, Opts}, see process_transport/2.
%%   Active    - true | false | once.
%%
%% Returns {ok, {Pid, {Addr, Port}}} or {error, Reason}. The server is
%% only monitored while waiting for its start result; the temporary
%% monitor is always removed *with* flush so that a 'DOWN' message that
%% raced with the result cannot be left behind in the caller's mailbox.
do_start(Parent, Transport, Active)
  when is_pid(Parent) andalso
       (is_atom(Transport) orelse is_tuple(Transport)) andalso
       (is_boolean(Active) orelse (Active =:= once)) ->
    Starter    = self(),
    ServerInit = fun() -> put(sname, "server"),
                          server_init(Starter, Parent, Transport, Active)
                 end,
    {Pid, MRef} = spawn_monitor(ServerInit),
    receive
        {'DOWN', MRef, process, Pid, Reason} ->
            %% The server died before it could report anything
            {error, Reason};
        {?MODULE, Pid, {ok, AddrPort}} ->
            erlang:demonitor(MRef, [flush]),
            {ok, {Pid, AddrPort}};
        {?MODULE, Pid, {error, _} = ERROR} ->
            erlang:demonitor(MRef, [flush]),
            ERROR
    end.
    

%% Ask the server process Pid to stop; returns whatever req/2 returns
%% for the 'stop' request.
stop(Pid) when is_pid(Pid) ->
    Request = stop,
    req(Pid, Request).


%% ==========================================================================

%% Body of the server process: create the listen socket on an ephemeral
%% port (0), report {Addr, Port} to the starter and enter the server
%% loop. Exits with {listen, Reason} or {port, Reason} on failure.
server_init(Starter, Parent, Transport, Active) ->
    ?I("init with"
       "~n   Transport: ~p"
       "~n   Active:    ~p", [Transport, Active]),
    {Mod, Listen, StatsInterval} = process_transport(Transport, Active),
    LSock =
        case Listen(0) of
            {ok, ListenSock} ->
                ListenSock;
            {error, LReason} ->
                exit({listen, LReason})
        end,
    Port =
        case Mod:port(LSock) of
            {ok, PortNo} ->
                PortNo;
            {error, PReason} ->
                (catch Mod:close(LSock)),
                exit({port, PReason})
        end,
    %% The address is only looked up for the convenience of the caller
    Addr = which_addr(),
    ?I("listening on:"
       "~n   Addr: ~p (~s)"
       "~n   Port: ~w"
       "~n", [Addr, inet:ntoa(Addr), Port]),
    Starter ! {?MODULE, self(), {ok, {Addr, Port}}},
    InitialState = #{parent         => Parent,
                     mod            => Mod,
                     active         => Active,
                     lsock          => LSock,
                     handlers       => [],
                     stats_interval => StatsInterval,
                     %% Accumulated results of finished handlers
                     runtime        => 0,
                     mcnt           => 0,
                     bcnt           => 0,
                     hcnt           => 0},
    server_loop(InitialState).

%% Translate the transport specification into
%% {Mod, ListenFun, StatsInterval}.
%%
%% A bare module is listened on without options. For {Mod, Opts}, a
%% 'stats_interval' entry in Opts becomes the statistics interval
%% (otherwise 'infinity'), and when the server is not passive the
%% process calling ListenFun is added to the options as 'stats_to'.
process_transport(Mod, _Active) when is_atom(Mod) ->
    {Mod, fun(Port) -> Mod:listen(Port) end, infinity};
process_transport({Mod, Opts}, Active) ->
    case Opts of
        #{stats_interval := Interval} when (Active =/= false) ->
            ListenWithStats =
                fun(Port) ->
                        Mod:listen(Port, Opts#{stats_to => self()})
                end,
            {Mod, ListenWithStats, Interval};
        #{stats_interval := Interval} ->
            {Mod, fun(Port) -> Mod:listen(Port, Opts) end, Interval};
        _NoStatsInterval ->
            {Mod, fun(Port) -> Mod:listen(Port, Opts) end, infinity}
    end.



%% Main server loop: one accept attempt followed by the handling of at
%% most one pending message, forever.
server_loop(State0) ->
    State1 = server_accept(State0),
    State2 = server_handle_message(State1),
    server_loop(State2).

%% Wait (at most ?ACC_TIMEOUT ms) for a connection. A new connection is
%% handed over to a freshly started handler, which is added to the
%% 'handlers' list of the returned state. An accept timeout returns the
%% state unchanged; any other accept error closes the listen socket and
%% terminates the server with {accept, Reason}.
server_accept(#{mod      := Mod,
                active   := _,
                lsock    := LSock,
                handlers := _} = State) ->
    case Mod:accept(LSock, ?ACC_TIMEOUT) of
        {error, timeout} ->
            State;
        {error, AReason} ->
            (catch Mod:close(LSock)),
            exit({accept, AReason});
        {ok, Sock} ->
            server_accepted(Sock, State)
    end.

%% Start a handler for the accepted socket, transfer socket ownership
%% to it and tell it to continue. Failure to transfer ownership closes
%% both sockets and terminates the server with
%% {controlling_process, Reason}.
server_accepted(Sock, #{mod      := Mod,
                        active   := Active,
                        lsock    := LSock,
                        handlers := Handlers} = State) ->
    PeerStr =
        case Mod:peername(Sock) of
            {ok, Peer} ->
                format_peername(Peer);
            {error, _} ->
                "-"
        end,
    ?I("accepted connection from ~s", [PeerStr]),
    {Handler, _HandlerMRef} = handler_start(),
    ?I("handler ~p started -> try transfer socket control", [Handler]),
    case Mod:controlling_process(Sock, Handler) of
        {error, CPReason} ->
            (catch Mod:close(Sock)),
            (catch Mod:close(LSock)),
            exit({controlling_process, CPReason});
        ok ->
            maybe_start_stats_timer(State, Handler),
            ?I("server-accept: handler ~p started", [Handler]),
            handler_continue(Handler, Mod, Sock, Active),
            State#{handlers => [Handler | Handlers]}
    end.

%% Format a peer {Addr, Port} for logging; the host name is included
%% when a reverse lookup of the address succeeds.
format_peername({Addr, Port}) ->
    format_peername(inet:gethostbyaddr(Addr), Addr, Port).

format_peername({ok, #hostent{h_name = HostName}}, Addr, Port) ->
    ?F("~s (~s:~w)", [HostName, inet:ntoa(Addr), Port]);
format_peername({error, _}, Addr, Port) ->
    ?F("~p, ~p", [Addr, Port]).

%% Start the periodic statistics timer for Handler, but only when the
%% server is not passive and the configured interval is a positive
%% integer. In every other case this is a no-op returning 'ok'.
maybe_start_stats_timer(State, Handler) ->
    case State of
        #{active := Active, stats_interval := Time}
          when (Active =/= false), is_integer(Time), (Time > 0) ->
            start_stats_timer(Time, "handler", Handler);
        _ ->
            ok
    end.

%% Arm a timer that, after Time ms, delivers
%% {timeout, TRef, {stats, Time, ProcStr, Pid}} to the calling process.
%% Returns the timer reference.
start_stats_timer(Time, ProcStr, Pid) ->
    TimerMsg = {stats, Time, ProcStr, Pid},
    erlang:start_timer(Time, self(), TimerMsg).

%% Handle at most one pending message without blocking:
%%   - a handler (or other monitored process) went down,
%%   - the parent asked the server to stop: reply, stop every handler
%%     and exit 'normal',
%%   - a statistics timer fired: print the statistics and re-arm the
%%     timer unless there was nothing to print.
%% Returns the (possibly updated) state.
server_handle_message(#{parent := Parent, handlers := Handlers} = State) ->
    receive
        {'DOWN', _MRef, process, Pid, Reason} ->
            server_handle_down(Pid, Reason, State);

        {?MODULE, Ref, Parent, stop} ->
            reply(Parent, Ref, ok),
            lists:foreach(fun handler_stop/1, Handlers),
            exit(normal);

        {timeout, _TRef, {stats, Interval, ProcStr, Pid}} ->
            case server_handle_stats(ProcStr, Pid) of
                skip ->
                    ok;
                ok ->
                    start_stats_timer(Interval, ProcStr, Pid)
            end,
            State

    after 0 ->
            State
    end.

%% Print the formatted process statistics of Pid. Returns 'skip' when
%% the library produced an empty string (nothing to print), otherwise
%% 'ok'.
server_handle_stats(ProcStr, Pid) ->
    server_print_stats(?LIB:formated_process_stats(Pid), ProcStr, Pid).

server_print_stats("", _ProcStr, _Pid) ->
    skip;
server_print_stats(FormatedStats, ProcStr, Pid) ->
    ?I("Statistics for ~s ~p:~s", [ProcStr, Pid, FormatedStats]),
    ok.


%% A monitored process terminated. If it is one of our handlers it is
%% removed from the handler list and its exit reason is processed;
%% otherwise the event is only logged.
server_handle_down(Pid, Reason, #{handlers := Handlers} = State) ->
    case lists:member(Pid, Handlers) of
        true ->
            Remaining = lists:delete(Pid, Handlers),
            server_handle_handler_down(Pid, Reason,
                                       State#{handlers => Remaining});
        false ->
            ?I("unknown process ~p died", [Pid]),
            State
    end.


%% Process the exit reason of a handler. A handler that finished
%% cleanly exits with {done, RunTime, MCnt, BCnt}; these values are
%% added to the accumulated totals, which are then printed. Any other
%% exit reason is only logged.
server_handle_handler_down(Pid,
                           {done, RunTime, MCnt, BCnt},
                           #{runtime := RunTime0,
                             mcnt    := MCnt0,
                             bcnt    := BCnt0,
                             hcnt    := HCnt0} = State) ->
    TotRunTime = RunTime0 + RunTime,
    TotMCnt    = MCnt0 + MCnt,
    TotBCnt    = BCnt0 + BCnt,
    TotHCnt    = HCnt0 + 1,
    ?I("handler ~p (~w) done => accumulated results: "
       "~n   Run Time:      ~s ms"
       "~n   Message Count: ~s"
       "~n   Byte Count:    ~s",
       [Pid, TotHCnt,
        ?FORMAT_TIME(TotRunTime),
        server_format_rate("~w => ~w (~w) msgs / ms",
                           TotMCnt, TotHCnt, TotRunTime),
        server_format_rate("~w => ~w (~w) bytes / ms",
                           TotBCnt, TotHCnt, TotRunTime)]),
    State#{runtime => TotRunTime,
           mcnt    => TotMCnt,
           bcnt    => TotBCnt,
           hcnt    => TotHCnt};
server_handle_handler_down(Pid, Reason, State) ->
    ?I("handler ~p terminated: "
       "~n   ~p", [Pid, Reason]),
    State.

%% Format a total together with its rate per ms (overall and per
%% handler). Only the bare total is printed when no run time has been
%% accumulated, which also avoids a division by zero.
server_format_rate(Format, Total, HCnt, RunTime) when (RunTime > 0) ->
    ?F(Format, [Total,
                Total div RunTime,
                (Total div HCnt) div RunTime]);
server_format_rate(_Format, Total, _HCnt, _RunTime) ->
    ?F("~w", [Total]).



%% ==========================================================================

%% Spawn (and monitor) a handler process with the caller as its
%% parent. Returns {Pid, MRef}.
handler_start() ->
    Server = self(),
    spawn_monitor(fun() ->
                          put(sname, "handler"),
                          handler_init(Server)
                  end).

%% Hand the handler what it needs to start working (transport module,
%% socket and active mode) and wait for its acknowledgement.
handler_continue(Pid, Mod, Sock, Active) ->
    Request = {continue, Mod, Sock, Active},
    req(Pid, Request).

%% Send a 'stop' request to the handler Pid and wait for the result of
%% that request (see req/2).
handler_stop(Pid) ->
    Request = stop,
    req(Pid, Request).

%% Body of a handler process: wait for the 'continue' request from the
%% parent, acknowledge it, activate the socket (if not passive) and
%% enter the handler loop. If nothing arrives within 5 seconds the
%% message queue is logged and the wait starts over.
handler_init(Parent) ->
    ?I("starting"),
    receive
        {?MODULE, Ref, Parent, {continue, Mod, Sock, Active}} ->
            ?I("received continue"),
            reply(Parent, Ref, ok),
            handler_initial_activation(Mod, Sock, Active),
            %% The start time is taken after the activation
            InitialState = #{parent     => Parent,
                             mod        => Mod,
                             sock       => Sock,
                             active     => Active,
                             start      => ?T(),
                             mcnt       => 0,
                             bcnt       => 0,
                             last_reply => none,
                             acc        => <<>>},
            handler_loop(InitialState)

    after 5000 ->
            Queue = process_info(self(), messages),
            ?I("timeout when message queue: "
               "~n   ~p"
               "~nwhen"
               "~n   Parent: ~p", [Queue, Parent]),
            handler_init(Parent)
    end.

%% Main handler loop: receive (and reply to) request data, then check
%% for a pending control message, forever.
handler_loop(State0) ->
    State1 = handler_recv_message(State0),
    State2 = handler_handle_message(State1),
    handler_loop(State2).

%% Passive mode: read exactly *one* request from the socket and reply
%% to it. A closed connection finishes the handler; a timeout is only
%% logged.
handler_recv_message(#{mod        := Mod,
                       sock       := Sock,
                       active     := false,
                       mcnt       := MCnt,
                       bcnt       := BCnt,
                       last_reply := LID} = State) ->
    case handler_recv_message2(Mod, Sock) of
        {error, timeout} ->
            ?I("timeout when: "
               "~n   MCnt: ~p"
               "~n   BCnt: ~p"
               "~n   LID:  ~p", [MCnt, BCnt, LID]),
            State;
        {error, closed} ->
            handler_done(State);
        {ok, {MsgSz, ID, Body}} ->
            handler_send_reply(Mod, Sock, ID, Body),
            State#{mcnt       := MCnt + 1,
                   bcnt       := BCnt + MsgSz,
                   last_reply := ID}
    end;


%% "Active" mode (once or true): one data "message" is received, which
%% may contain any number of requests or only part of a request. This
%% data is processed together with whatever was "accumulated" from
%% previous messages. Each complete request is extracted and replied
%% to. Data left over (not enough for a complete request) is stored in
%% 'acc' for the next round. A connection closed while 'acc' is
%% non-empty is an error (the client stopped in the middle of a
%% request).
handler_recv_message(#{mod        := Mod,
                       sock       := Sock,
                       active     := Active,
                       mcnt       := MCnt,
                       bcnt       := BCnt,
                       last_reply := LID,
                       acc        := Acc} = State) ->
    case handler_recv_message3(Mod, Sock, Acc, LID) of
        {error, timeout} ->
            ?I("timeout when: "
               "~n   MCnt:      ~p"
               "~n   BCnt:      ~p"
               "~n   LID:       ~p"
               "~n   size(Acc): ~p", [MCnt, BCnt, LID, byte_size(Acc)]),
            State;

        {error, closed} ->
            case Acc of
                <<>> ->
                    handler_done(State);
                _Partial ->
                    ?E("client done with partial message: "
                       "~n   Last Reply Sent: ~w"
                       "~n   Message Count:   ~w"
                       "~n   Byte    Count:   ~w"
                       "~n   Partial Message: ~w bytes",
                       [LID, MCnt, BCnt, byte_size(Acc)]),
                    exit({closed_with_partial_message, LID})
            end;

        {ok, {NewMsgs, NewBytes, NewLID}, NewAcc} ->
            %% In 'once' mode the socket has to be re-armed
            handler_maybe_activate(Mod, Sock, Active),
            State#{mcnt       := MCnt + NewMsgs,
                   bcnt       := BCnt + NewBytes,
                   last_reply := NewLID,
                   acc        := NewAcc}
    end.

%% Reply to every complete request found at the start of Acc.
%% Returns {ok, {MsgCount, ByteCount, LastRepliedID}, Leftover}, where
%% Leftover is the data that does not (yet) form a complete request,
%% or the error returned by handler_send_reply/4.
handler_process_data(Acc, Mod, Sock, LID) ->
    handler_process_data(Acc, Mod, Sock, 0, 0, LID).

%% One complete request (16 byte header plus SZ bytes of body) is
%% consumed per iteration.
handler_process_data(<<?TTEST_TAG:32,
                       ?TTEST_TYPE_REQUEST:32,
                       ID:32,
                       SZ:32,
                       Body:SZ/binary,
                       Tail/binary>>,
                     Mod, Sock,
                     MCnt, BCnt, _LID) ->
    case handler_send_reply(Mod, Sock, ID, Body) of
        {error, _} = Error ->
            Error;
        ok ->
            handler_process_data(Tail, Mod, Sock,
                                 MCnt + 1, BCnt + 16 + SZ, ID)
    end;
handler_process_data(Leftover, _Mod, _Sock, MCnt, BCnt, LID) ->
    {ok, {MCnt, BCnt, LID}, Leftover}.
	    
	    
%% Passive mode: read one request, i.e. a 16 byte header followed by a
%% body of the size given in the header.
%%
%% Returns {ok, {TotalSize, ID, Body}}, {error, timeout} or
%% {error, closed} (the latter two only for the header). Any other
%% header error, and every error while reading the body, terminates
%% the handler.
handler_recv_message2(Mod, Sock) ->
    case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of
        {ok, <<?TTEST_TAG:32,
               ?TTEST_TYPE_REQUEST:32,
               ID:32,
               SZ:32>> = Hdr} ->
            handler_recv_body(Mod, Sock, ID, SZ, byte_size(Hdr));
        {error, timeout} = ERROR ->
            ERROR;
        {error, closed} = ERROR ->
            ERROR;
        {error, HReason} ->
            ?E("Failed reading header of message:"
               "~n   ~p", [HReason]),
            exit({recv, header, HReason})
    end.

%% Read the SZ byte body of request ID.
%%
%% An empty body must not be read from the socket: for gen_tcp:recv/3
%% and socket:recv/3 a length of 0 means "whatever is available", so
%% the read would block until data arrives (or time out) and then
%% return bytes belonging to the *next* request.
%% NOTE(review): assumes Mod:recv/3 passes the length on unchanged to
%% one of those functions - confirm against the transport modules.
handler_recv_body(_Mod, _Sock, ID, 0, HdrSz) ->
    {ok, {HdrSz, ID, <<>>}};
handler_recv_body(Mod, Sock, ID, SZ, HdrSz) ->
    case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of
        {ok, Body} when (SZ =:= byte_size(Body)) ->
            {ok, {HdrSz + SZ, ID, Body}};
        {error, BReason} ->
            ?E("failed reading body (~w) of message ~w:"
               "~n   ~p", [SZ, ID, BReason]),
            exit({recv, body, ID, SZ, BReason})
    end.


%% Active mode: wait (at most ?RECV_TIMEOUT ms) for one socket message
%% for Sock, as delivered with either the tcp_* or the socket_* tags.
%% Data is appended to Acc and processed; closed and error messages
%% are returned as {error, closed} and {error, Reason} respectively.
handler_recv_message3(Mod, Sock, Acc, LID) ->
    receive
        {tcp_closed, Sock} ->
            {error, closed};
        {socket_closed, Sock} ->
            {error, closed};

        {tcp_error, Sock, Reason} ->
            {error, Reason};
        {socket_error, Sock, Reason} ->
            {error, Reason};

        {DataTag, Sock, Data} when (DataTag =:= tcp);
                                   (DataTag =:= socket) ->
            Buffer = <<Acc/binary, Data/binary>>,
            handler_process_data(Buffer, Mod, Sock, LID)

    after ?RECV_TIMEOUT ->
            {error, timeout}
    end.



%% Send the reply to request ID: a reply header (tag, reply type, ID
%% and body size) followed by Data. Returns 'ok'; a send failure
%% closes the socket and terminates the handler with {send, Reason}.
handler_send_reply(Mod, Sock, ID, Data) ->
    Reply = <<?TTEST_TAG:32,
              ?TTEST_TYPE_REPLY:32,
              ID:32,
              (byte_size(Data)):32,
              Data/binary>>,
    case Mod:send(Sock, Reply) of
        {error, Reason} ->
            (catch Mod:close(Sock)),
            exit({send, Reason});
        ok ->
            ok
    end.


%% The handler is finished: close the socket and terminate with
%% {done, RunTime, MCnt, BCnt}, which the server picks up from the
%% 'DOWN' message of its monitor.
handler_done(State) ->
    Stop = ?T(),
    handler_done(State, Stop).

handler_done(#{start := Start,
               mod   := Mod,
               sock  := Sock,
               mcnt  := MCnt,
               bcnt  := BCnt}, Stop) ->
    (catch Mod:close(Sock)),
    RunTime = ?TDIFF(Start, Stop),
    exit({done, RunTime, MCnt, BCnt}).


%% Handle at most one pending control message without blocking.
%%
%% The parent (the server) stops its handlers with a 'stop' request
%% sent through req/2, which waits for a reply. The request is
%% therefore acknowledged here before the handler finishes; without
%% this clause the request was never answered and the server got
%% stuck in its own stop sequence as soon as one handler was alive.
%% The handler then terminates via handler_done/1, i.e. the socket is
%% closed and the exit reason carries the counters collected so far.
%%
%% Returns the state unchanged when there is nothing to handle.
handler_handle_message(#{parent := Parent} = State) ->
    receive
        {?MODULE, Ref, Parent, stop} ->
            reply(Parent, Ref, ok),
            handler_done(State);
        {'EXIT', Parent, Reason} ->
            exit({parent_exit, Reason})
    after 0 ->
            State
    end.


%% Put the socket in the requested active mode when the handler
%% starts. Nothing needs to be done for a passive socket.
handler_initial_activation(Mod, Sock, Active) ->
    case Active of
        false ->
            ok;
        _ ->
            Mod:active(Sock, Active)
    end.


%% Re-arm the socket after a received message; only needed (and only
%% done) in active 'once' mode.
handler_maybe_activate(Mod, Sock, Active) when (Active =:= once) ->
    Mod:active(Sock, once);
handler_maybe_activate(_Mod, _Sock, _Active) ->
    ok.



%% ==========================================================================

%% Pick an IPv4 address of this host among the addresses reported by
%% inet:getifaddrs/0. Exits with {getifaddrs, Reason} when the
%% interface list cannot be retrieved.
which_addr() ->
    IfAddrs =
        case inet:getifaddrs() of
            {error, Reason} ->
                exit({getifaddrs, Reason});
            {ok, List} ->
                List
        end,
    which_addrs(inet, IfAddrs).

%% Return the first address of Family found on an interface that is
%% not ignored (see is_ignored_if/2). Exits with
%% {getifaddrs, not_found} when no interface provides such an address.
which_addrs(_Family, []) ->
    exit({getifaddrs, not_found});
which_addrs(Family, [{IfName, IfOpts} | IfAddrs]) ->
    case is_ignored_if(IfName, IfOpts) of
        true ->
            which_addrs(Family, IfAddrs);
        false ->
            case which_addr(Family, IfOpts) of
                {ok, Addr} ->
                    Addr;
                error ->
                    which_addrs(Family, IfAddrs)
            end
    end.

%% Interfaces that must never be picked: loopback and the docker
%% (bridge) interfaces. Loopback is recognised by its 'loopback' flag
%% and not only by the (Linux) name "lo"; it is for instance named
%% "lo0" on the BSDs and macOS, where the name test alone let
%% 127.0.0.1 through.
is_ignored_if("lo", _IfOpts) ->
    true;
is_ignored_if("docker" ++ _, _IfOpts) ->
    true;
is_ignored_if("br-" ++ _, _IfOpts) ->
    true;
is_ignored_if(_IfName, IfOpts) ->
    lists:member(loopback, proplists:get_value(flags, IfOpts, [])).

%% Find the first address of the wanted family among the options of
%% one interface: a 4-tuple for inet, an 8-tuple for inet6.
%% Returns {ok, Addr} or 'error'.
which_addr(_Family, []) ->
    error;
which_addr(inet, [{addr, {_, _, _, _} = Addr} | _]) ->
    {ok, Addr};
which_addr(inet6, [{addr, {_, _, _, _, _, _, _, _} = Addr} | _]) ->
    {ok, Addr};
which_addr(Family, [_Other | IfOpts]) ->
    which_addr(Family, IfOpts).


%% ==========================================================================

%% Send the request Req to Pid and wait for the reply.
%%
%% Returns the reply, or {error, {exit, Reason}} when Pid terminates
%% (or is already dead) before replying.
%%
%% The target is monitored for the duration of the request: no process
%% in this module is linked to its peer or traps exits, so waiting for
%% an 'EXIT' message alone would block the caller forever when the
%% target is gone. The 'EXIT' clause is kept for callers that do link
%% and trap exits.
req(Pid, Req) ->
    MRef = erlang:monitor(process, Pid),
    Ref  = make_ref(),
    Pid ! {?MODULE, Ref, self(), Req},
    receive
        {?MODULE, Ref, Reply} ->
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, {exit, Reason}};
        {'EXIT', Pid, Reason} ->
            erlang:demonitor(MRef, [flush]),
            {error, {exit, Reason}}
    end.

%% Send the reply to the request identified by Ref back to Pid, in the
%% form expected by req/2.
reply(Pid, Ref, Reply) ->
    ReplyMsg = {?MODULE, Ref, Reply},
    Pid ! ReplyMsg.


%% ==========================================================================

%% t() ->
%%     os:timestamp().

%% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) ->
%%     T1 = A1*1000000000+B1*1000+(C1 div 1000), 
%%     T2 = A2*1000000000+B2*1000+(C2 div 1000), 
%%     T2 - T1.

%% formated_timestamp() ->
%%     format_timestamp(os:timestamp()).

%% format_timestamp({_N1, _N2, N3} = TS) ->
%%     {_Date, Time}  = calendar:now_to_local_time(TS),
%%     {Hour,Min,Sec} = Time,
%%     FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w",
%%                              [Hour, Min, Sec, round(N3/1000)]),  
%%     lists:flatten(FormatTS).

%% %% Time is always in number of ms (milliseconds)
%% format_time(T) ->
%%     f("~p", [T]).


%% ==========================================================================

%% f(F, A) ->
%%     lists:flatten(io_lib:format(F, A)).

%% e(F, A) ->
%%     p(get(sname), "<ERROR> " ++ F, A).

%% i(F) ->
%%     i(F, []).

%% i(F, A) ->
%%     p(get(sname), "<INFO> " ++ F, A).

%% p(undefined, F, A) ->
%%     p("- ", F, A);
%% p(Prefix, F, A) ->
%%     io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).