From ec98f78f154bcf71afbf198386dc0ffb765839f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 17 Oct 2019 15:05:57 +0200 Subject: Use a map instead of pdict to keep pid active/removed state The performance seems to be equal as far as Cowboy is concerned, with a possible tiny improvement in the favor of maps. The code is smaller and the runtime introspection far cleaner because the process dictionary is small again. --- src/ranch_conns_sup.erl | 147 +++++++++++++++++++++++------------------------- test/acceptor_SUITE.erl | 2 +- 2 files changed, 70 insertions(+), 79 deletions(-) diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index b87a7bb..02a3b26 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -112,18 +112,18 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) -> loop(#state{parent=Parent, ref=Ref, conn_type=ConnType, shutdown=Shutdown, transport=Transport, protocol=Protocol, opts=ProtoOpts, handshake_timeout=HandshakeTimeout, - max_conns=MaxConns, logger=Logger}, 0, 0, []). + max_conns=MaxConns, logger=Logger}, #{}, 0, 0, []). loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, transport=Transport, protocol=Protocol, opts=Opts, - max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) -> + max_conns=MaxConns, logger=Logger}, Conns, CurConns, NbChildren, Sleepers) -> receive {?MODULE, start_protocol, To, Socket} -> try Protocol:start_link(Ref, Transport, Opts) of {ok, Pid} -> - handshake(State, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid); + handshake(State, Conns, CurConns, NbChildren, Sleepers, To, Socket, Pid, Pid); {ok, SupPid, ProtocolPid} when ConnType =:= supervisor -> - handshake(State, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid); + handshake(State, Conns, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid); Ret -> To ! self(), ranch:log(error, @@ -131,7 +131,7 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, "~p:start_link/3 returned: ~999999p~n", [Ref, Protocol, Ret], Logger), Transport:close(Socket), - loop(State, CurConns, NbChildren, Sleepers) + loop(State, Conns, CurConns, NbChildren, Sleepers) catch Class:Reason -> To ! self(), ranch:log(error, @@ -139,82 +139,78 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, "~p:start_link/3 crashed with reason: ~p:~999999p~n", [Ref, Protocol, Class, Reason], Logger), Transport:close(Socket), - loop(State, CurConns, NbChildren, Sleepers) + loop(State, Conns, CurConns, NbChildren, Sleepers) end; {?MODULE, active_connections, To, Tag} -> To ! {Tag, CurConns}, - loop(State, CurConns, NbChildren, Sleepers); + loop(State, Conns, CurConns, NbChildren, Sleepers); %% Remove a connection from the count of connections. {remove_connection, Ref, Pid} -> - case put(Pid, removed) of - active when Sleepers =:= [] -> - loop(State, CurConns - 1, NbChildren, Sleepers); - active -> + case Conns of + #{Pid := active} when Sleepers =:= [] -> + loop(State, Conns#{Pid => removed}, CurConns - 1, NbChildren, Sleepers); + #{Pid := active} -> [To|Sleepers2] = Sleepers, To ! self(), - loop(State, CurConns - 1, NbChildren, Sleepers2); - removed -> - loop(State, CurConns, NbChildren, Sleepers); - undefined -> - _ = erase(Pid), - loop(State, CurConns, NbChildren, Sleepers) + loop(State, Conns#{Pid => removed}, CurConns - 1, NbChildren, Sleepers2); + #{Pid := removed} -> + loop(State, Conns, CurConns, NbChildren, Sleepers); + _ -> + loop(State, Conns, CurConns, NbChildren, Sleepers) end; %% Upgrade the max number of connections allowed concurrently. %% We resume all sleeping acceptors if this number increases. {set_max_conns, MaxConns2} when MaxConns2 > MaxConns -> _ = [To ! self() || To <- Sleepers], - loop(State#state{max_conns=MaxConns2}, - CurConns, NbChildren, []); + loop(State#state{max_conns=MaxConns2}, Conns, CurConns, NbChildren, []); {set_max_conns, MaxConns2} -> - loop(State#state{max_conns=MaxConns2}, - CurConns, NbChildren, Sleepers); + loop(State#state{max_conns=MaxConns2}, Conns, CurConns, NbChildren, Sleepers); %% Upgrade the transport options. {set_transport_options, TransOpts} -> - set_transport_options(State, CurConns, NbChildren, Sleepers, TransOpts); + set_transport_options(State, Conns, CurConns, NbChildren, Sleepers, TransOpts); %% Upgrade the protocol options. {set_protocol_options, Opts2} -> - loop(State#state{opts=Opts2}, - CurConns, NbChildren, Sleepers); + loop(State#state{opts=Opts2}, Conns, CurConns, NbChildren, Sleepers); {'EXIT', Parent, Reason} -> - terminate(State, Reason, NbChildren); + terminate(State, Conns, Reason, NbChildren); {'EXIT', Pid, Reason} when Sleepers =:= [] -> - case erase(Pid) of - active -> + case maps:take(Pid, Conns) of + {active, Conns1} -> report_error(Logger, Ref, Protocol, Pid, Reason), - loop(State, CurConns - 1, NbChildren - 1, Sleepers); - removed -> + loop(State, Conns1, CurConns - 1, NbChildren - 1, Sleepers); + {removed, Conns1} -> report_error(Logger, Ref, Protocol, Pid, Reason), - loop(State, CurConns, NbChildren - 1, Sleepers); - undefined -> - loop(State, CurConns, NbChildren, Sleepers) + loop(State, Conns1, CurConns, NbChildren - 1, Sleepers); + error -> + loop(State, Conns, CurConns, NbChildren, Sleepers) end; %% Resume a sleeping acceptor if needed. {'EXIT', Pid, Reason} -> - case erase(Pid) of - active when CurConns > MaxConns -> + case maps:take(Pid, Conns) of + {active, Conns1} when CurConns > MaxConns -> report_error(Logger, Ref, Protocol, Pid, Reason), - loop(State, CurConns - 1, NbChildren - 1, Sleepers); - active -> + loop(State, Conns1, CurConns - 1, NbChildren - 1, Sleepers); + {active, Conns1} -> report_error(Logger, Ref, Protocol, Pid, Reason), [To|Sleepers2] = Sleepers, To ! self(), - loop(State, CurConns - 1, NbChildren - 1, Sleepers2); - removed -> + loop(State, Conns1, CurConns - 1, NbChildren - 1, Sleepers2); + {removed, Conns1} -> report_error(Logger, Ref, Protocol, Pid, Reason), - loop(State, CurConns, NbChildren - 1, Sleepers); - undefined -> - loop(State, CurConns, NbChildren, Sleepers) + loop(State, Conns1, CurConns, NbChildren - 1, Sleepers); + error -> + loop(State, Conns, CurConns, NbChildren, Sleepers) end; {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], - {State, CurConns, NbChildren, Sleepers}); + {State, Conns, CurConns, NbChildren, Sleepers}); %% Calls from the supervisor module. {'$gen_call', {To, Tag}, which_children} -> - Children = [{Protocol, Pid, ConnType, [Protocol]} - || {Pid, Type} <- get(), - Type =:= active orelse Type =:= removed], + Children = maps:fold(fun(Pid, _, Acc) -> + [{Protocol, Pid, ConnType, [Protocol]}|Acc] + end, [], Conns), To ! {Tag, Children}, - loop(State, CurConns, NbChildren, Sleepers); + loop(State, Conns, CurConns, NbChildren, Sleepers); {'$gen_call', {To, Tag}, count_children} -> Counts = case ConnType of worker -> [{supervisors, 0}, {workers, NbChildren}]; @@ -222,29 +218,29 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, end, Counts2 = [{specs, 1}, {active, NbChildren}|Counts], To ! {Tag, Counts2}, - loop(State, CurConns, NbChildren, Sleepers); + loop(State, Conns, CurConns, NbChildren, Sleepers); {'$gen_call', {To, Tag}, _} -> To ! {Tag, {error, ?MODULE}}, - loop(State, CurConns, NbChildren, Sleepers); + loop(State, Conns, CurConns, NbChildren, Sleepers); Msg -> ranch:log(error, "Ranch listener ~p received unexpected message ~p~n", [Ref, Msg], Logger), - loop(State, CurConns, NbChildren, Sleepers) + loop(State, Conns, CurConns, NbChildren, Sleepers) end. handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout, - max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) -> + max_conns=MaxConns}, Conns0, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) -> case Transport:controlling_process(Socket, ProtocolPid) of ok -> ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout}, - put(SupPid, active), + Conns = Conns0#{SupPid => active}, CurConns2 = CurConns + 1, if CurConns2 < MaxConns -> To ! self(), - loop(State, CurConns2, NbChildren + 1, Sleepers); + loop(State, Conns, CurConns2, NbChildren + 1, Sleepers); true -> - loop(State, CurConns2, NbChildren + 1, [To|Sleepers]) + loop(State, Conns, CurConns2, NbChildren + 1, [To|Sleepers]) end; {error, _} -> Transport:close(Socket), @@ -252,10 +248,10 @@ handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=Handshake %% when different, is supposed to be sitting under it and linked. exit(SupPid, kill), To ! self(), - loop(State, CurConns, NbChildren, Sleepers) + loop(State, Conns0, CurConns, NbChildren, Sleepers) end. -set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, Sleepers0, TransOpts) -> +set_transport_options(State=#state{max_conns=MaxConns0}, Conns, CurConns, NbChildren, Sleepers0, TransOpts) -> MaxConns1 = maps:get(max_connections, TransOpts, 1024), HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000), Shutdown = maps:get(shutdown, TransOpts, 5000), @@ -267,24 +263,22 @@ set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, S Sleepers0 end, loop(State#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown}, - CurConns, NbChildren, Sleepers1). + Conns, CurConns, NbChildren, Sleepers1). --spec terminate(#state{}, any(), non_neg_integer()) -> no_return(). -terminate(#state{shutdown=brutal_kill}, Reason, _) -> - kill_children(get_keys(active)), - kill_children(get_keys(removed)), +-spec terminate(#state{}, #{pid() => active | removed}, any(), non_neg_integer()) -> no_return(). +terminate(#state{shutdown=brutal_kill}, Conns, Reason, _) -> + kill_children(maps:keys(Conns)), exit(Reason); %% Attempt to gracefully shutdown all children. -terminate(#state{shutdown=Shutdown}, Reason, NbChildren) -> - shutdown_children(get_keys(active)), - shutdown_children(get_keys(removed)), +terminate(#state{shutdown=Shutdown}, Conns, Reason, NbChildren) -> + shutdown_children(maps:keys(Conns)), _ = if Shutdown =:= infinity -> ok; true -> erlang:send_after(Shutdown, self(), kill) end, - wait_children(NbChildren), + wait_children(NbChildren, Conns), exit(Reason). %% Kill all children and then exit. We unlink first to avoid @@ -307,31 +301,28 @@ shutdown_children(Pids) -> end || P <- Pids], ok. -wait_children(0) -> +wait_children(0, _) -> ok; -wait_children(NbChildren) -> +wait_children(NbChildren, Conns0) -> receive {'DOWN', _, process, Pid, _} -> - case erase(Pid) of - active -> wait_children(NbChildren - 1); - removed -> wait_children(NbChildren - 1); - _ -> wait_children(NbChildren) + case maps:take(Pid, Conns0) of + {active, Conns} -> wait_children(NbChildren - 1, Conns); + {removed, Conns} -> wait_children(NbChildren - 1, Conns); + error -> wait_children(NbChildren, Conns0) end; kill -> - Active = get_keys(active), - _ = [exit(P, kill) || P <- Active], - Removed = get_keys(removed), - _ = [exit(P, kill) || P <- Removed], + _ = [exit(P, kill) || P <- maps:keys(Conns0)], ok end. -spec system_continue(_, _, any()) -> no_return(). -system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) -> - loop(State, CurConns, NbChildren, Sleepers). +system_continue(_, _, {State, Conns, CurConns, NbChildren, Sleepers}) -> + loop(State, Conns, CurConns, NbChildren, Sleepers). -spec system_terminate(any(), _, _, _) -> no_return(). -system_terminate(Reason, _, _, {State, _, NbChildren, _}) -> - terminate(State, Reason, NbChildren). +system_terminate(Reason, _, _, {State, Conns, _, NbChildren, _}) -> + terminate(State, Conns, Reason, NbChildren). -spec system_code_change(any(), _, _, _) -> {ok, any()}. system_code_change(Misc, _, _, _) -> diff --git a/test/acceptor_SUITE.erl b/test/acceptor_SUITE.erl index 7fc26f4..217ba56 100644 --- a/test/acceptor_SUITE.erl +++ b/test/acceptor_SUITE.erl @@ -319,7 +319,7 @@ misc_set_transport_options(_) -> num_acceptors => 2, shutdown => 1001, socket_opts => [{send_timeout, 5002}]}), ConnsSups = [ConnsSup || {_, ConnsSup} <- ranch_server:get_connections_sups(Name)], _ = [begin - {State, _, _, _} = sys:get_state(ConnsSup), + {State, _, _, _, _} = sys:get_state(ConnsSup), 20 = element(10, State), 5001 = element(9, State), 1001 = element(5, State) -- cgit v1.2.3