diff options
Diffstat (limited to 'src/ranch_conns_sup.erl')
-rw-r--r-- | src/ranch_conns_sup.erl | 126 |
1 files changed, 118 insertions, 8 deletions
diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index 5242446..d5f5200 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -43,6 +43,7 @@ handshake_timeout :: timeout(), max_conns = undefined :: ranch:max_conns(), stats_counters_ref :: counters:counters_ref(), + alarms = #{} :: #{term() => {undefined | reference(), map()}}, logger = undefined :: module() }). @@ -106,6 +107,7 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) -> process_flag(trap_exit, true), ok = ranch_server:set_connections_sup(Ref, Id, self()), MaxConns = ranch_server:get_max_connections(Ref), + Alarms = get_alarms(TransOpts), ConnType = maps:get(connection_type, TransOpts, worker), Shutdown = maps:get(shutdown, TransOpts, 5000), HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000), @@ -116,11 +118,12 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) -> shutdown=Shutdown, transport=Transport, protocol=Protocol, opts=ProtoOpts, stats_counters_ref=StatsCounters, handshake_timeout=HandshakeTimeout, - max_conns=MaxConns, logger=Logger}, 0, 0, []). + max_conns=MaxConns, alarms=Alarms, + logger=Logger}, 0, 0, []). loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType, transport=Transport, protocol=Protocol, opts=Opts, stats_counters_ref=StatsCounters, - max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) -> + alarms=Alarms, max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) -> receive {?MODULE, start_protocol, To, Socket} -> try Protocol:start_link(Ref, Transport, Opts) of @@ -181,6 +184,12 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType, {set_protocol_options, Opts2} -> loop(State#state{opts=Opts2}, CurConns, NbChildren, Sleepers); + {timeout, _, {activate_alarm, AlarmName}} when is_map_key(AlarmName, Alarms) -> + {AlarmOpts, _} = maps:get(AlarmName, Alarms), + NewAlarm = trigger_alarm(Ref, AlarmName, {AlarmOpts, undefined}, CurConns), + loop(State#state{alarms=Alarms#{AlarmName => NewAlarm}}, CurConns, NbChildren, Sleepers); + {timeout, _, {activate_alarm, _}} -> + loop(State, CurConns, NbChildren, Sleepers); {'EXIT', Parent, Reason} -> terminate(State, Reason, NbChildren); {'EXIT', Pid, Reason} when Sleepers =:= [] -> @@ -245,18 +254,20 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType, end. handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout, - max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) -> + max_conns=MaxConns, alarms=Alarms0}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) -> case Transport:controlling_process(Socket, ProtocolPid) of ok -> ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout}, put(SupPid, active), CurConns2 = CurConns + 1, - if CurConns2 < MaxConns -> + Sleepers2 = if CurConns2 < MaxConns -> To ! self(), - loop(State, CurConns2, NbChildren + 1, Sleepers); + Sleepers; true -> - loop(State, CurConns2, NbChildren + 1, [To|Sleepers]) - end; + [To|Sleepers] + end, + Alarms1 = trigger_alarms(Ref, Alarms0, CurConns2), + loop(State#state{alarms=Alarms1}, CurConns2, NbChildren + 1, Sleepers2); {error, _} -> Transport:close(Socket), %% Only kill the supervised pid, because the connection's pid, @@ -266,6 +277,45 @@ handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=Handshake loop(State, CurConns, NbChildren, Sleepers) end. +trigger_alarms(Ref, Alarms, CurConns) -> + maps:map( + fun + (AlarmName, Alarm) -> + trigger_alarm(Ref, AlarmName, Alarm, CurConns) + end, + Alarms + ). + +trigger_alarm(Ref, AlarmName, {Opts=#{treshold := Treshold, callback := Callback}, undefined}, CurConns) when CurConns >= Treshold -> + ActiveConns = [Pid || {Pid, active} <- get()], + case Callback of + {Mod, Fun} -> + spawn(Mod, Fun, [Ref, AlarmName, self(), ActiveConns]); + _ -> + Self = self(), + spawn(fun () -> Callback(Ref, AlarmName, Self, ActiveConns) end) + end, + {Opts, schedule_activate_alarm(AlarmName, Opts)}; +trigger_alarm(_, _, Alarm, _) -> + Alarm. + +schedule_activate_alarm(AlarmName, #{cooldown := Cooldown}) when Cooldown > 0 -> + erlang:start_timer(Cooldown, self(), {activate_alarm, AlarmName}); +schedule_activate_alarm(_, _) -> + undefined. + +get_alarms(#{alarms := Alarms}) when is_map(Alarms) -> + maps:fold( + fun + (Name, Opts = #{type := num_connections}, Acc) -> Acc#{Name => {Opts, undefined}}; + (_, _, Acc) -> Acc + end, + #{}, + Alarms + ); +get_alarms(_) -> + #{}. + set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, Sleepers0, TransOpts) -> MaxConns1 = maps:get(max_connections, TransOpts, 1024), HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000), @@ -277,9 +327,69 @@ set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, S false -> Sleepers0 end, - loop(State#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown}, + State1=set_alarm_option(State, TransOpts, CurConns), + loop(State1#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown}, CurConns, NbChildren, Sleepers1). +set_alarm_option(State=#state{ref=Ref, alarms=OldAlarms}, TransOpts, CurConns) -> + NewAlarms0 = get_alarms(TransOpts), + NewAlarms1 = merge_alarms(OldAlarms, NewAlarms0), + NewAlarms2 = trigger_alarms(Ref, NewAlarms1, CurConns), + State#state{alarms=NewAlarms2}. + +merge_alarms(Old, New) -> + OldList = lists:sort(maps:to_list(Old)), + NewList = lists:sort(maps:to_list(New)), + Merged = merge_alarms(OldList, NewList, []), + maps:from_list(Merged). + +merge_alarms([], News, Acc) -> + News ++ Acc; +merge_alarms([{_, {_, undefined}}|Olds], [], Acc) -> + merge_alarms(Olds, [], Acc); +merge_alarms([{_, {_, Timer}}|Olds], [], Acc) -> + _ = cancel_alarm_reactivation_timer(Timer), + merge_alarms(Olds, [], Acc); +merge_alarms([{Name, {OldOpts, Timer}}|Olds], [{Name, {NewOpts, _}}|News], Acc) -> + merge_alarms(Olds, News, [{Name, {NewOpts, adapt_alarm_timer(Name, Timer, OldOpts, NewOpts)}}|Acc]); +merge_alarms([{OldName, {_, Timer}}|Olds], News=[{NewName, _}|_], Acc) when OldName < NewName -> + _ = cancel_alarm_reactivation_timer(Timer), + merge_alarms(Olds, News, Acc); +merge_alarms(Olds, [New|News], Acc) -> + merge_alarms(Olds, News, [New|Acc]). + +%% Not in cooldown. +adapt_alarm_timer(_, undefined, _, _) -> + undefined; +%% Cooldown unchanged. +adapt_alarm_timer(_, Timer, #{cooldown := Cooldown}, #{cooldown := Cooldown}) -> + Timer; +%% Cooldown changed to no cooldown, cancel cooldown timer. +adapt_alarm_timer(_, Timer, _, #{cooldown := 0}) -> + _ = cancel_alarm_reactivation_timer(Timer), + undefined; +%% Cooldown changed, cancel current and start new timer taking the already elapsed time into account. +adapt_alarm_timer(Name, Timer, #{cooldown := OldCooldown}, #{cooldown := NewCooldown}) -> + OldTimeLeft = cancel_alarm_reactivation_timer(Timer), + case NewCooldown-OldCooldown+OldTimeLeft of + NewTimeLeft when NewTimeLeft>0 -> + erlang:start_timer(NewTimeLeft, self(), {activate_alarm, Name}); + _ -> + undefined + end. + +cancel_alarm_reactivation_timer(Timer) -> + case erlang:cancel_timer(Timer) of + %% Timer had already expired when we tried to cancel it, so we flush the + %% reactivation message it sent and return 0 as remaining time. + false -> + ok = receive {timeout, Timer, {activate_alarm, _}} -> ok after 0 -> ok end, + 0; + %% Timer has not yet expired, we return the amount of time that was remaining. + TimeLeft -> + TimeLeft + end. + -spec terminate(#state{}, any(), non_neg_integer()) -> no_return(). terminate(#state{shutdown=brutal_kill, id=Id, stats_counters_ref=StatsCounters}, Reason, NbChildren) -> |