aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/test/socket_test_ttest_tcp_socket.erl
blob: 3aa3b2c50411182bac1b5f235c5ffdf06f59b108 (plain) (tree)
1
2
3
4


                   
                                                        





















                                                                           
                                         
                               
                                      








                        

                              

                                   













                                                               


                                                                             
                                                                          









                                                                             



                                                                        
                                
                     
                          


                                                                                

                                                                      



                             



                                                                        


                                         


                                                                                

                                                                      












                                                             








                                                                

                                           
                                               








                                                               
                                                 


                                                                   
                   





                                  

                                                 
                   
                                           
                             





                                                                  
                                                                    
 

                                                  


                                  

                                                                
 
                                                          

                                                         
                                                                     


                  
                                                          




                                                       
                                              



                                               
                              

                                                   
                                                  



                                               
                              

                                                      
                            


                                                                                

                                                                     





                        


                                                                           
 



                                                             
                                 

       














                                                                 


                                     
                                                                     
                                  
                                                                      


















                                                         



                                                         

             
                                                             




                                                          




                                               
                              



                                                   

                                   
                                               


                                                     
                                               








                                  

                                                 








                                 

                                                 






                                              


















                                                          

 



                                              
















                                                                             
                                                             
                                         
                                

                                                             
                        


                                                       


                                                                            
                                           

                                             






                                           
                                     

                                                      

                                                      









                                                           
                       
                                                            





                                       


                                 
                                  
                                           
                                 
                     
                                                



                                                 
                                             

                                                              


                                                              

                                                                 
                                                                   





                                                             

                                                                    






                                              







                                                   

                                  
                                       
                                  


                                             
                               





                                                           
                                            
                                   
 
                               
                                                   

                                  


                                                   
                               
                                  








































                                                                


                                     

                                 
                                
                                  
                                           
                                 
                     
                                                



                                  
                                             

                                                              


                                                              

                                                                 
                                                                   





                                                             
                               
                                                                    






                                              







                                                   

                                  
                                       



                                             
                               





                                                           
                                            
                                   
 
                               
                                                   

                                  


                                                   
                               








































                                                                


        






                                                   

                                

                               


                             





























































                                                                        


                                                                             










                            
%%
%% %CopyrightBegin%
%% 
%% Copyright Ericsson AB 2018-2019. All Rights Reserved.
%% 
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% 
%% %CopyrightEnd%
%%

-module(socket_test_ttest_tcp_socket).

-export([
	 accept/1, accept/2,
	 active/2,
	 close/1,
	 connect/1, connect/2, connect/3,
	 controlling_process/2,
	 listen/0, listen/1, listen/2,
	 port/1,
	 peername/1,
	 recv/2, recv/3,
	 send/2,
	 shutdown/2,
	 sockname/1
	]).


-define(LIB, socket_test_lib).

-define(READER_RECV_TIMEOUT, 1000).

-define(DATA_MSG(Sock, Method, Data),
        {socket,
         #{sock => Sock, reader => self(), method => Method},
         Data}).

-define(CLOSED_MSG(Sock, Method),
        {socket_closed,
         #{sock => Sock, reader => self(), method => Method}}).

-define(ERROR_MSG(Sock, Method, Reason),
        {socket_error,
         #{sock => Sock, reader => self(), method => Method},
         Reason}).


%% ==========================================================================

%% This does not really work. Its just a placeholder for the time being...

%% getopt(Sock, Opt) when is_atom(Opt) ->
%%     socket:getopt(Sock, socket, Opt).

%% setopt(Sock, Opt, Value) when is_atom(Opt) ->
%%     socket:setopts(Sock, socket, Opt, Value).


%% ==========================================================================

%% The way we use server async its no point in doing a async accept call
%% (we do never actually run the test with more than one client).
accept(#{sock := LSock, opts := #{async  := Async,
                                  method := Method} = Opts}) ->
    case socket:accept(LSock) of
        {ok, Sock} ->
	    Self = self(),
	    Reader = spawn(fun() ->
                                   reader_init(Self, Sock, Async, false, Method)
                           end),
            maybe_start_stats_timer(Opts, Reader),
	    {ok, #{sock => Sock, reader => Reader, method => Method}};
	{error, _} = ERROR ->
	    ERROR
    end.

%% If a timeout has been explictly specified, then we do not use
%% async here. We will pass it on to the reader process.
accept(#{sock := LSock, opts := #{async  := Async,
                                  method := Method} = Opts}, Timeout) ->
    case socket:accept(LSock, Timeout) of
	{ok, Sock} ->
	    Self = self(),
	    Reader = spawn(fun() ->
                                   reader_init(Self, Sock, Async, false, Method)
                           end),
            maybe_start_stats_timer(Opts, Reader),
	    {ok, #{sock => Sock, reader => Reader, method => Method}};
	{error, _} = ERROR ->
	    ERROR
    end.


active(#{reader := Pid}, NewActive) 
  when (is_boolean(NewActive) orelse (NewActive =:= once)) ->
    Pid ! {?MODULE, active, NewActive},
    ok.


close(#{sock := Sock, reader := Pid}) ->
    Pid ! {?MODULE, stop},
    Unlink = case socket:sockname(Sock) of
                 {ok, #{family := local, path := Path}} ->
                     fun() -> os:cmd("unlink " ++ Path), ok end;
                 _ ->
                     fun() -> ok end
             end,
    Res = socket:close(Sock),
    Unlink(),
    Res.

%% Create a socket and connect it to a peer
connect(ServerPath) when is_list(ServerPath) ->
    Domain     = local,
    ClientPath = mk_unique_path(),
    LocalSA    = #{family => Domain,
                   path   => ClientPath},
    ServerSA   = #{family => Domain, path => ServerPath},
    Opts       = #{domain => Domain,
                   proto  => default,
                   method => plain},
    Cleanup = fun() -> os:cmd("unlink " ++ ClientPath), ok end,
    do_connect(LocalSA, ServerSA, Cleanup, Opts).

connect(Addr, Port) when is_tuple(Addr) andalso is_integer(Port) ->
    Domain   = inet,
    LocalSA  = any,
    ServerSA = #{family => Domain,
                 addr   => Addr,
                 port   => Port},
    Opts     = #{domain => Domain,
                 proto  => tcp,
                 method => plain},
    Cleanup  = fun() -> ok end,
    do_connect(LocalSA, ServerSA, Cleanup, Opts);
connect(ServerPath,
        #{domain := local = Domain} = Opts)
  when is_list(ServerPath) ->
    ClientPath = mk_unique_path(),
    LocalSA    = #{family => Domain,
                   path   => ClientPath},
    ServerSA   = #{family => Domain,
                   path   => ServerPath},
    Cleanup    = fun() -> os:cmd("unlink " ++ ClientPath), ok end,
    do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => default}).

connect(Addr, Port, #{domain := Domain} = Opts) ->
    LocalSA  = any,
    ServerSA = #{family => Domain,
                 addr   => Addr,
                 port   => Port},
    Cleanup  = fun() -> ok end,
    do_connect(LocalSA, ServerSA, Cleanup, Opts#{proto => tcp}).

do_connect(LocalSA, ServerSA, Cleanup, #{domain := Domain,
                                         proto  := Proto,
                                         async  := Async,
                                         method := Method} = Opts) ->
    try
	begin
	    Sock =
		case socket:open(Domain, stream, Proto) of
		    {ok, S} ->
			S;
		    {error, OReason} ->
			throw({error, {open, OReason}})
		end,
	    case socket:bind(Sock, LocalSA) of
		{ok, _} ->
		    ok;
		{error, BReason} ->
		    (catch socket:close(Sock)),
                    Cleanup(),
		    throw({error, {bind, BReason}})
	    end,
	    case socket:connect(Sock, ServerSA) of
		ok ->
		    ok;
		{error, CReason} ->
		    (catch socket:close(Sock)),
                    Cleanup(),
		    throw({error, {connect, CReason}})
	    end,
	    Self   = self(),
	    Reader = spawn(fun() ->
                                   reader_init(Self, Sock, Async, false, Method)
                           end),
            maybe_start_stats_timer(Opts, Reader),
	    {ok, #{sock => Sock, reader => Reader, method => Method}}
	end
    catch
	throw:ERROR:_ ->
	    ERROR
    end.

mk_unique_path() ->
    [NodeName | _] = string:tokens(atom_to_list(node()), [$@]),
    ?LIB:f("/tmp/esock_~s_~w", [NodeName, erlang:system_time(nanosecond)]).

maybe_start_stats_timer(#{stats_to       := Pid,
                          stats_interval := T},
                        Reader) when is_pid(Pid) ->
    erlang:start_timer(T, Pid, {stats, T, "reader", Reader});
maybe_start_stats_timer(_O, _) ->
    ok.

controlling_process(#{sock := Sock, reader := Pid}, NewPid) ->
    case socket:setopt(Sock, otp, controlling_process, NewPid) of
	ok ->
	    Pid ! {?MODULE, self(), controlling_process, NewPid},
	    receive
		{?MODULE, Pid, controlling_process} ->
		    ok
	    end;
	{error, _} = ERROR ->
	    ERROR
    end.


%% Create a listen socket
listen() ->
    listen(0).

listen(Port) when is_integer(Port) ->
    listen(Port, #{domain => inet, async => false, method => plain});
listen(Path) when is_list(Path) ->
    listen(Path, #{domain => local, async => false, method => plain}).

listen(0, #{domain := local} = Opts) ->
    listen(mk_unique_path(), Opts);
listen(Path, #{domain := local = Domain} = Opts)
  when is_list(Path) andalso (Path =/= []) ->
    SA = #{family => Domain,
           path   => Path},
    Cleanup = fun() -> os:cmd("unlink " ++ Path), ok end,
    do_listen(SA, Cleanup, Opts#{proto => default});
listen(Port, #{domain := Domain} = Opts)
  when is_integer(Port) andalso (Port >= 0) ->
    %% Bind fills in the rest
    SA = #{family => Domain,
           port   => Port},
    Cleanup = fun() -> ok end,
    do_listen(SA, Cleanup, Opts#{proto => tcp}).

do_listen(SA,
          Cleanup,
          #{domain := Domain, proto  := Proto, 
            async  := Async,  method := Method} = Opts)
  when (Method =:= plain) orelse (Method =:= msg) andalso
       is_boolean(Async) ->
    try
	begin
	    Sock = case socket:open(Domain, stream, Proto) of
		       {ok, S} ->
			   S;
		       {error, OReason} ->
			   throw({error, {open, OReason}})
		   end,
	    case socket:bind(Sock, SA) of
		{ok, _} ->
		    ok;
		{error, BReason} ->
		    (catch socket:close(Sock)),
                    Cleanup(),
		    throw({error, {bind, BReason}})
	    end,
	    case socket:listen(Sock) of
		ok ->
                    ok;
                {error, LReason} ->
		    (catch socket:close(Sock)),
                    Cleanup(),
                    throw({error, {listen, LReason}})
            end,
	    {ok, #{sock => Sock, opts => Opts}}
	end
    catch
	throw:{error, Reason}:_ ->
	    {error, Reason}
    end.


port(#{sock := Sock}) ->
    case socket:sockname(Sock) of
	{ok, #{family := local, path := Path}} ->
	    {ok, Path};
	{ok, #{port := Port}} ->
	    {ok, Port};
	{error, _} = ERROR ->
	    ERROR
    end.


peername(#{sock := Sock}) ->
    case socket:peername(Sock) of
	{ok, #{family := local, path := Path}} ->
	    {ok, Path};
	{ok, #{addr := Addr, port := Port}} ->
	    {ok, {Addr, Port}};
	{error, _} = ERROR ->
	    ERROR
    end.


recv(#{sock := Sock, method := plain}, Length) ->
    socket:recv(Sock, Length);
recv(#{sock := Sock, method := msg}, Length) ->
    case socket:recvmsg(Sock, Length, 0, [], infinity) of
        {ok, #{iov := [Bin]}} ->
            {ok, Bin};
        {error, _} = ERROR ->
            ERROR
    end.

recv(#{sock := Sock, method := plain}, Length, Timeout) ->
    socket:recv(Sock, Length, Timeout);
recv(#{sock := Sock, method := msg}, Length, Timeout) ->
    case socket:recvmsg(Sock, Length, 0, [], Timeout) of
        {ok, #{iov := [Bin]}} ->
            {ok, Bin};
        {error, _} = ERROR ->
            ERROR
    end.


send(#{sock := Sock, method := plain}, Bin) ->
    socket:send(Sock, Bin);
send(#{sock := Sock, method := msg}, Bin) ->
    socket:sendmsg(Sock, #{iov => [Bin]}).


shutdown(#{sock := Sock}, How) ->
    socket:shutdown(Sock, How).


sockname(#{sock := Sock}) ->
    case socket:sockname(Sock) of
	{ok, #{addr := Addr, port := Port}} ->
	    {ok, {Addr, Port}};
	{error, _} = ERROR ->
	    ERROR
    end.


%% ==========================================================================

reader_init(ControllingProcess, Sock, Async, Active, Method) 
  when is_pid(ControllingProcess) andalso
       is_boolean(Async) andalso
       (is_boolean(Active) orelse (Active =:= once)) andalso 
       ((Method =:= plain) orelse (Method =:= msg)) ->
    put(verbose, false),
    MRef = erlang:monitor(process, ControllingProcess),
    reader_loop(#{ctrl_proc      => ControllingProcess,
		  ctrl_proc_mref => MRef,
                  async          => Async,
                  select_info    => undefined,
                  select_num     => 0, % Count the number of select messages
		  active         => Active,
		  sock           => Sock,
                  method         => Method}).


%% Never read
reader_loop(#{active    := false,
	      ctrl_proc := Pid} = State) ->
    receive
	{?MODULE, stop} ->
            reader_exit(State, stop);

	{?MODULE, Pid, controlling_process, NewPid} ->
	    OldMRef = maps:get(ctrl_proc_mref, State),
	    erlang:demonitor(OldMRef, [flush]),
	    NewMRef = erlang:monitor(process, NewPid),
	    Pid ! {?MODULE, self(), controlling_process},
	    reader_loop(State#{ctrl_proc      => NewPid,
			       ctrl_proc_mref => NewMRef});

	{?MODULE, active, NewActive} ->
	    reader_loop(State#{active => NewActive});

	{'DOWN', MRef, process, Pid, Reason} ->
	    case maps:get(ctrl_proc_mref, State) of
		MRef ->
                    reader_exit(State, {ctrl_exit, Reason});
		_ ->
		    reader_loop(State)
	    end
    end;

%% Read *once* and then change to false
reader_loop(#{active    := once,
	      async     := false,
              sock      := Sock,
              method    := Method,
	      ctrl_proc := Pid} = State) ->
    case do_recv(Method, Sock) of
	{ok, Data} ->
	    Pid ! ?DATA_MSG(Sock, Method, Data),
	    reader_loop(State#{active => false});
	{error, timeout} ->
	    receive
		{?MODULE, stop} ->
		    reader_exit(State, stop);

		{?MODULE, Pid, controlling_process, NewPid} ->
		    OldMRef = maps:get(ctrl_proc_mref, State),
		    erlang:demonitor(OldMRef, [flush]),
		    NewMRef = erlang:monitor(process, NewPid),
		    Pid ! {?MODULE, self(), controlling_process},
		    reader_loop(State#{ctrl_proc      => NewPid,
				       ctrl_proc_mref => NewMRef});

		{?MODULE, active, NewActive} ->
		    reader_loop(State#{active => NewActive});

		{'DOWN', MRef, process, Pid, Reason} ->
		    case maps:get(ctrl_proc_mref, State) of
                        MRef ->
                            reader_exit(State, {ctrl_exit, Reason});
			_ ->
			    reader_loop(State)
		    end
	    after 0 ->
		    reader_loop(State)
	    end;

	{error, closed} = E1 ->
            Pid ! ?CLOSED_MSG(Sock, Method),
            reader_exit(State, E1);

	{error, Reason} = E2 ->
	    Pid ! ?ERROR_MSG(Sock, Method, Reason),
            reader_exit(State, E2)
    end;
reader_loop(#{active      := once,
	      async       := true,
              select_info := undefined,
              sock        := Sock,
              method      := Method,
	      ctrl_proc   := Pid} = State) ->
    case do_recv(Method, Sock, nowait) of
        {select, SelectInfo} ->
            reader_loop(State#{select_info => SelectInfo});
	{ok, Data} ->
	    Pid ! ?DATA_MSG(Sock, Method, Data),
	    reader_loop(State#{active => false});

	{error, closed} = E1 ->
	    Pid ! ?CLOSED_MSG(Sock, Method),
            reader_exit(State, E1);

	{error, Reason} = E2 ->
	    Pid ! ?ERROR_MSG(Sock, Method, Reason),
            reader_exit(State, E2)
    end;
reader_loop(#{active      := once,
	      async       := true,
              select_info := {select_info, _, Ref},
              select_num  := N,
              sock        := Sock,
              method      := Method,
	      ctrl_proc   := Pid} = State) ->
    receive
        {?MODULE, stop} ->
            reader_exit(State, stop);
        
        {?MODULE, Pid, controlling_process, NewPid} ->
            OldMRef = maps:get(ctrl_proc_mref, State),
            erlang:demonitor(OldMRef, [flush]),
            NewMRef = erlang:monitor(process, NewPid),
            Pid ! {?MODULE, self(), controlling_process},
            reader_loop(State#{ctrl_proc      => NewPid,
                               ctrl_proc_mref => NewMRef});
        
        {?MODULE, active, NewActive} ->
            reader_loop(State#{active => NewActive});
        
        {'DOWN', MRef, process, Pid, Reason} ->
            case maps:get(ctrl_proc_mref, State) of
                MRef ->
                    reader_exit(State, {ctrl_exit, Reason});
                _ ->
                    reader_loop(State)
            end;

        {'$socket', Sock, select, Ref} ->
            case do_recv(Method, Sock, nowait) of
                {ok, Data} when is_binary(Data) ->
                    Pid ! ?DATA_MSG(Sock, Method, Data),
                    reader_loop(State#{active      => false,
                                       select_info => undefined,
                                       select_num  => N+1});
                
                {error, closed} = E1 ->
                    Pid ! ?CLOSED_MSG(Sock, Method),
                    reader_exit(State, E1);
                
                {error, Reason} = E2 ->
                    Pid ! ?ERROR_MSG(Sock, Method, Reason),
                    reader_exit(State, E2)
            end
    end;

%% Read and forward data continuously
reader_loop(#{active    := true,
	      async     := false,
	      sock      := Sock,
              method    := Method,
	      ctrl_proc := Pid} = State) ->
    case do_recv(Method, Sock) of
	{ok, Data} ->
	    Pid ! ?DATA_MSG(Sock, Method, Data),
	    reader_loop(State);
	{error, timeout} ->
	    receive
		{?MODULE, stop} ->
                    reader_exit(State, stop);

		{?MODULE, Pid, controlling_process, NewPid} ->
		    OldMRef = maps:get(ctrl_proc_mref, State),
		    erlang:demonitor(OldMRef, [flush]),
		    NewMRef = erlang:monitor(process, NewPid),
		    Pid ! {?MODULE, self(), controlling_process},
		    reader_loop(State#{ctrl_proc      => NewPid,
				       ctrl_proc_mref => NewMRef});

		{?MODULE, active, NewActive} ->
		    reader_loop(State#{active => NewActive});

		{'DOWN', MRef, process, Pid, Reason} ->
		    case maps:get(ctrl_proc_mref, State) of
			MRef ->
                            reader_exit(State, {ctrl_exit, Reason});
			_ ->
			    reader_loop(State)
		    end
	    after 0 ->
		    reader_loop(State)
	    end;

	{error, closed} = E1 ->
	    Pid ! ?CLOSED_MSG(Sock, Method),
            reader_exit(State, E1);

	{error, Reason} = E2 ->
	    Pid ! ?ERROR_MSG(Sock, Method, Reason),
            reader_exit(State, E2)
    end;
reader_loop(#{active      := true,
	      async       := true,
              select_info := undefined,
	      sock        := Sock,
              method      := Method,
	      ctrl_proc   := Pid} = State) ->
    case do_recv(Method, Sock) of
        {select, SelectInfo} ->
            reader_loop(State#{select_info => SelectInfo});
	{ok, Data} ->
	    Pid ! ?DATA_MSG(Sock, Method, Data),
	    reader_loop(State);

	{error, closed} = E1 ->
	    Pid ! ?CLOSED_MSG(Sock, Method),
            reader_exit(State, E1);

	{error, Reason} = E2 ->
	    Pid ! ?ERROR_MSG(Sock, Method, Reason),
            reader_exit(State, E2)
    end;
reader_loop(#{active      := true,
	      async       := true,
              select_info := {select_info, _, Ref},
              select_num  := N,
	      sock        := Sock,
              method      := Method,
	      ctrl_proc   := Pid} = State) ->
    receive
        {?MODULE, stop} ->
            reader_exit(State, stop);
        
        {?MODULE, Pid, controlling_process, NewPid} ->
            OldMRef = maps:get(ctrl_proc_mref, State),
            erlang:demonitor(OldMRef, [flush]),
            NewMRef = erlang:monitor(process, NewPid),
            Pid ! {?MODULE, self(), controlling_process},
            reader_loop(State#{ctrl_proc      => NewPid,
                               ctrl_proc_mref => NewMRef});
        
        {?MODULE, active, NewActive} ->
            reader_loop(State#{active => NewActive});
        
        {'DOWN', MRef, process, Pid, Reason} ->
            case maps:get(ctrl_proc_mref, State) of
                MRef ->
                    reader_exit(State, {ctrl_exit, Reason});
                _ ->
                    reader_loop(State)
            end;

        {'$socket', Sock, select, Ref} ->
            case do_recv(Method, Sock, nowait) of
                {ok, Data} when is_binary(Data) ->
                    Pid ! ?DATA_MSG(Sock, Method, Data),
                    reader_loop(State#{select_info => undefined,
                                       select_num  => N+1});
                
                {error, closed} = E1 ->
                    Pid ! ?CLOSED_MSG(Sock, Method),
                    reader_exit(State, E1);
                
                {error, Reason} = E2 ->
                    Pid ! ?ERROR_MSG(Sock, Method, Reason),
                    reader_exit(State, E2)
            end
    end.


do_recv(Method, Sock) ->
    do_recv(Method, Sock, ?READER_RECV_TIMEOUT).

do_recv(plain, Sock, Timeout) ->
    socket:recv(Sock, 0, Timeout);
do_recv(msg, Sock, Timeout) ->
    case socket:recvmsg(Sock, 0, 0, [], Timeout) of
        {ok, #{iov := [Bin]}} ->
            {ok, Bin};
        {select, _} = SELECT ->
            SELECT;
        {error, _} = ERROR ->
            ERROR
    end.


reader_exit(#{async := false, active := Active}, stop) ->
    vp("reader stopped when active: ~w", [Active]),
    exit(normal);
reader_exit(#{async       := true, 
              active      := Active,
              select_info := SelectInfo,
              select_num  := N}, stop) ->
    vp("reader stopped when active: ~w"
       "~n   Current select info:       ~p"
       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
    exit(normal);
reader_exit(#{async := false, active := Active}, {ctrl_exit, normal}) ->
    vp("reader ctrl exit when active: ~w", [Active]),
    exit(normal);
reader_exit(#{async       := true, 
              active      := Active,
              select_info := SelectInfo,
              select_num  := N}, {ctrl_exit, normal}) ->
    vp("reader ctrl exit when active: ~w"
       "~n   Current select info:       ~p"
       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
    exit(normal);
reader_exit(#{async := false, active := Active}, {ctrl_exit, Reason}) ->
    vp("reader exit when ctrl crash when active: ~w", [Active]),
    exit({controlling_process, Reason});
reader_exit(#{async       := true, 
              active      := Active,
              select_info := SelectInfo,
              select_num  := N}, {ctrl_exit, Reason}) ->
    vp("reader exit when ctrl crash when active: ~w"
       "~n   Current select info:       ~p"
       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
    exit({controlling_process, Reason});
reader_exit(#{async := false, active := Active}, {error, closed}) ->
    vp("reader exit when socket closed when active: ~w", [Active]),
    exit(normal);
reader_exit(#{async       := true, 
              active      := Active,
              select_info := SelectInfo,
              select_num  := N}, {error, closed}) ->
    vp("reader exit when socket closed when active: ~w "
       "~n   Current select info:       ~p"
       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
    exit(normal);
reader_exit(#{async := false, active := Active}, {error, Reason}) ->
    vp("reader exit when socket error when active: ~w", [Active]),
    exit(Reason);
reader_exit(#{async       := true, 
              active      := Active,
              select_info := SelectInfo,
              select_num  := N}, {error, Reason}) ->
    vp("reader exit when socket error when active: ~w: "
       "~n   Current select info:       ~p"
       "~n   Number of select messages: ~p", [Active, SelectInfo, N]),
    exit(Reason).






%% ==========================================================================

vp(F, A) ->
    vp(get(verbose), F, A).

vp(true, F, A) ->
    p(F, A);
vp(_, _, _) ->
    ok.

p(F, A) ->
    io:format(F ++ "~n", A).