aboutsummaryrefslogtreecommitdiffstats
path: root/src/ranch_conns_sup.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ranch_conns_sup.erl')
-rw-r--r--src/ranch_conns_sup.erl126
1 files changed, 118 insertions, 8 deletions
diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl
index 5242446..d5f5200 100644
--- a/src/ranch_conns_sup.erl
+++ b/src/ranch_conns_sup.erl
@@ -43,6 +43,7 @@
handshake_timeout :: timeout(),
max_conns = undefined :: ranch:max_conns(),
stats_counters_ref :: counters:counters_ref(),
+ alarms = #{} :: #{term() => {undefined | reference(), map()}},
logger = undefined :: module()
}).
@@ -106,6 +107,7 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) ->
process_flag(trap_exit, true),
ok = ranch_server:set_connections_sup(Ref, Id, self()),
MaxConns = ranch_server:get_max_connections(Ref),
+ Alarms = get_alarms(TransOpts),
ConnType = maps:get(connection_type, TransOpts, worker),
Shutdown = maps:get(shutdown, TransOpts, 5000),
HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
@@ -116,11 +118,12 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) ->
shutdown=Shutdown, transport=Transport, protocol=Protocol,
opts=ProtoOpts, stats_counters_ref=StatsCounters,
handshake_timeout=HandshakeTimeout,
- max_conns=MaxConns, logger=Logger}, 0, 0, []).
+ max_conns=MaxConns, alarms=Alarms,
+ logger=Logger}, 0, 0, []).
loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
transport=Transport, protocol=Protocol, opts=Opts, stats_counters_ref=StatsCounters,
- max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) ->
+ alarms=Alarms, max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) ->
receive
{?MODULE, start_protocol, To, Socket} ->
try Protocol:start_link(Ref, Transport, Opts) of
@@ -181,6 +184,12 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
{set_protocol_options, Opts2} ->
loop(State#state{opts=Opts2},
CurConns, NbChildren, Sleepers);
+ {timeout, _, {activate_alarm, AlarmName}} when is_map_key(AlarmName, Alarms) ->
+ {AlarmOpts, _} = maps:get(AlarmName, Alarms),
+ NewAlarm = trigger_alarm(Ref, AlarmName, {AlarmOpts, undefined}, CurConns),
+ loop(State#state{alarms=Alarms#{AlarmName => NewAlarm}}, CurConns, NbChildren, Sleepers);
+ {timeout, _, {activate_alarm, _}} ->
+ loop(State, CurConns, NbChildren, Sleepers);
{'EXIT', Parent, Reason} ->
terminate(State, Reason, NbChildren);
{'EXIT', Pid, Reason} when Sleepers =:= [] ->
@@ -245,18 +254,20 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
end.
handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout,
- max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
+ max_conns=MaxConns, alarms=Alarms0}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
case Transport:controlling_process(Socket, ProtocolPid) of
ok ->
ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout},
put(SupPid, active),
CurConns2 = CurConns + 1,
- if CurConns2 < MaxConns ->
+ Sleepers2 = if CurConns2 < MaxConns ->
To ! self(),
- loop(State, CurConns2, NbChildren + 1, Sleepers);
+ Sleepers;
true ->
- loop(State, CurConns2, NbChildren + 1, [To|Sleepers])
- end;
+ [To|Sleepers]
+ end,
+ Alarms1 = trigger_alarms(Ref, Alarms0, CurConns2),
+ loop(State#state{alarms=Alarms1}, CurConns2, NbChildren + 1, Sleepers2);
{error, _} ->
Transport:close(Socket),
%% Only kill the supervised pid, because the connection's pid,
@@ -266,6 +277,45 @@ handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=Handshake
loop(State, CurConns, NbChildren, Sleepers)
end.
+trigger_alarms(Ref, Alarms, CurConns) ->
+ maps:map(
+ fun
+ (AlarmName, Alarm) ->
+ trigger_alarm(Ref, AlarmName, Alarm, CurConns)
+ end,
+ Alarms
+ ).
+
+trigger_alarm(Ref, AlarmName, {Opts=#{treshold := Treshold, callback := Callback}, undefined}, CurConns) when CurConns >= Treshold ->
+ ActiveConns = [Pid || {Pid, active} <- get()],
+ case Callback of
+ {Mod, Fun} ->
+ spawn(Mod, Fun, [Ref, AlarmName, self(), ActiveConns]);
+ _ ->
+ Self = self(),
+ spawn(fun () -> Callback(Ref, AlarmName, Self, ActiveConns) end)
+ end,
+ {Opts, schedule_activate_alarm(AlarmName, Opts)};
+trigger_alarm(_, _, Alarm, _) ->
+ Alarm.
+
+schedule_activate_alarm(AlarmName, #{cooldown := Cooldown}) when Cooldown > 0 ->
+ erlang:start_timer(Cooldown, self(), {activate_alarm, AlarmName});
+schedule_activate_alarm(_, _) ->
+ undefined.
+
+get_alarms(#{alarms := Alarms}) when is_map(Alarms) ->
+ maps:fold(
+ fun
+ (Name, Opts = #{type := num_connections}, Acc) -> Acc#{Name => {Opts, undefined}};
+ (_, _, Acc) -> Acc
+ end,
+ #{},
+ Alarms
+ );
+get_alarms(_) ->
+ #{}.
+
set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, Sleepers0, TransOpts) ->
MaxConns1 = maps:get(max_connections, TransOpts, 1024),
HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
@@ -277,9 +327,69 @@ set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, S
false ->
Sleepers0
end,
- loop(State#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown},
+ State1=set_alarm_option(State, TransOpts, CurConns),
+ loop(State1#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown},
CurConns, NbChildren, Sleepers1).
+set_alarm_option(State=#state{ref=Ref, alarms=OldAlarms}, TransOpts, CurConns) ->
+ NewAlarms0 = get_alarms(TransOpts),
+ NewAlarms1 = merge_alarms(OldAlarms, NewAlarms0),
+ NewAlarms2 = trigger_alarms(Ref, NewAlarms1, CurConns),
+ State#state{alarms=NewAlarms2}.
+
+merge_alarms(Old, New) ->
+ OldList = lists:sort(maps:to_list(Old)),
+ NewList = lists:sort(maps:to_list(New)),
+ Merged = merge_alarms(OldList, NewList, []),
+ maps:from_list(Merged).
+
+merge_alarms([], News, Acc) ->
+ News ++ Acc;
+merge_alarms([{_, {_, undefined}}|Olds], [], Acc) ->
+ merge_alarms(Olds, [], Acc);
+merge_alarms([{_, {_, Timer}}|Olds], [], Acc) ->
+ _ = cancel_alarm_reactivation_timer(Timer),
+ merge_alarms(Olds, [], Acc);
+merge_alarms([{Name, {OldOpts, Timer}}|Olds], [{Name, {NewOpts, _}}|News], Acc) ->
+ merge_alarms(Olds, News, [{Name, {NewOpts, adapt_alarm_timer(Name, Timer, OldOpts, NewOpts)}}|Acc]);
+merge_alarms([{OldName, {_, Timer}}|Olds], News=[{NewName, _}|_], Acc) when OldName < NewName ->
+ _ = cancel_alarm_reactivation_timer(Timer),
+ merge_alarms(Olds, News, Acc);
+merge_alarms(Olds, [New|News], Acc) ->
+ merge_alarms(Olds, News, [New|Acc]).
+
+%% Not in cooldown.
+adapt_alarm_timer(_, undefined, _, _) ->
+ undefined;
+%% Cooldown unchanged.
+adapt_alarm_timer(_, Timer, #{cooldown := Cooldown}, #{cooldown := Cooldown}) ->
+ Timer;
+%% Cooldown changed to no cooldown, cancel cooldown timer.
+adapt_alarm_timer(_, Timer, _, #{cooldown := 0}) ->
+ _ = cancel_alarm_reactivation_timer(Timer),
+ undefined;
+%% Cooldown changed, cancel current and start new timer taking the already elapsed time into account.
+adapt_alarm_timer(Name, Timer, #{cooldown := OldCooldown}, #{cooldown := NewCooldown}) ->
+ OldTimeLeft = cancel_alarm_reactivation_timer(Timer),
+ case NewCooldown-OldCooldown+OldTimeLeft of
+ NewTimeLeft when NewTimeLeft>0 ->
+ erlang:start_timer(NewTimeLeft, self(), {activate_alarm, Name});
+ _ ->
+ undefined
+ end.
+
+cancel_alarm_reactivation_timer(Timer) ->
+ case erlang:cancel_timer(Timer) of
+ %% Timer had already expired when we tried to cancel it, so we flush the
+ %% reactivation message it sent and return 0 as remaining time.
+ false ->
+ ok = receive {timeout, Timer, {activate_alarm, _}} -> ok after 0 -> ok end,
+ 0;
+ %% Timer has not yet expired, we return the amount of time that was remaining.
+ TimeLeft ->
+ TimeLeft
+ end.
+
-spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
terminate(#state{shutdown=brutal_kill, id=Id,
stats_counters_ref=StatsCounters}, Reason, NbChildren) ->