aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjuhlig <[email protected]>2020-09-10 16:18:06 +0200
committerLoïc Hoguin <[email protected]>2021-09-07 12:27:34 +0200
commit50f6191bf8646147b303f8a9e1a5a9efa1d6dc5a (patch)
treee56ce37ea3a6db5efd98a0bc9dc960330712d038
parentaf1508c4a729098bc1545358e09878d4946c789d (diff)
downloadranch-50f6191bf8646147b303f8a9e1a5a9efa1d6dc5a.tar.gz
ranch-50f6191bf8646147b303f8a9e1a5a9efa1d6dc5a.tar.bz2
ranch-50f6191bf8646147b303f8a9e1a5a9efa1d6dc5a.zip
Enable connection count alarms
-rw-r--r--doc/src/guide/listeners.asciidoc61
-rw-r--r--src/ranch.erl28
-rw-r--r--src/ranch_conns_sup.erl126
-rw-r--r--test/acceptor_SUITE.erl76
4 files changed, 282 insertions, 9 deletions
diff --git a/doc/src/guide/listeners.asciidoc b/doc/src/guide/listeners.asciidoc
index 3835c64..4437143 100644
--- a/doc/src/guide/listeners.asciidoc
+++ b/doc/src/guide/listeners.asciidoc
@@ -328,6 +328,67 @@ processes.
echo_protocol, []
).
+=== Setting connection count alarms
+
+The `alarms` transport options allows you to configure alarms
+which will be triggered when the number of connections under a connection
+supervisor reaches or exceeds the defined treshold.
+
+The `alarms` transport option takes a map with alarm names as keys and alarm
+options as values.
+
+Any term is allowed as an alarm name.
+
+Alarm options, defining the alarm behavior, are again a map with the following
+keys, all of which are mandatory:
+
+`type`::
+The alarm type. Currently, `num_connections` is the only allowed type.
+
+`treshold`::
+The alarm treshold. When the number of connections under a connection
+supervisor reaches or exceeds this value, the alarm will trigger and
+call the function given in the `callback` key.
+
+`callback`::
+The alarm function, that is, the function which will be called when the
+alarm is triggered. Its arguments are the listener name, the alarm
+name, the Pid of the triggering connection supervisor, and the Pids of
+all the connection processes under that supervisor.
+
+`cooldown`::
+The minimum time to elapse before the alarm can trigger again, in
+milliseconds.
+
+.Setting an alarm to log warnings when the number of connections exceed 100
+
+[source,erlang]
+----
+Alarms = #{
+ my_alarm => #{
+ type => num_connections,
+ treshold => 100,
+ callback => fun(Ref, Name, ConnSup, ConnPids]) ->
+ logger:warning("Warning (~s): "
+ "Supervisor ~s of listener ~s "
+ "has ~b connections",
+ [Name, Ref, ConnSup, length(ConnPids)])
+ end,
+ cooldown => 5000
+ }
+},
+{ok, _} = ranch:start_listener(tcp_echo,
+ ranch_tcp, #{alarms => Alarms, socket_opts => [{port, 5555}]},
+ echo_protocol, []
+).
+----
+
+In the example code, an alarm named `my_alarm` is defined, which will
+call the given function when the number of connections under a
+connection supervisor reaches or exceeds 100. When the number of
+connections is still (or again) above 100 after 5 seconds, the
+alarm will trigger again.
+
=== When running out of file descriptors
Operating systems have limits on the number of sockets
diff --git a/src/ranch.erl b/src/ranch.erl
index f36c145..576f122 100644
--- a/src/ranch.erl
+++ b/src/ranch.erl
@@ -55,7 +55,17 @@
-type opts() :: any() | transport_opts(any()).
-export_type([opts/0]).
+-type alarm(Type, Callback) :: #{
+ type := Type,
+ callback := Callback,
+ treshold := non_neg_integer(),
+ cooldown := non_neg_integer()
+}.
+
+-type alarm_num_connections() :: alarm(num_connections, fun((ref(), term(), pid(), [pid()]) -> any())).
+
-type transport_opts(SocketOpts) :: #{
+ alarms => #{term() => alarm_num_connections()},
connection_type => worker | supervisor,
handshake_timeout => timeout(),
logger => module(),
@@ -123,6 +133,16 @@ validate_transport_opt(max_connections, infinity, _) ->
true;
validate_transport_opt(max_connections, Value, _) ->
is_integer(Value) andalso Value >= 0;
+validate_transport_opt(alarms, Alarms, _) ->
+ maps:fold(
+ fun
+ (_, Opts, true) ->
+ validate_alarm(Opts);
+ (_, _, false) ->
+ false
+ end,
+ true,
+ Alarms);
validate_transport_opt(logger, Value, _) ->
is_atom(Value);
validate_transport_opt(num_acceptors, Value, _) ->
@@ -145,6 +165,14 @@ validate_transport_opt(socket_opts, _, _) ->
validate_transport_opt(_, _, _) ->
false.
+validate_alarm(#{type := num_connections, treshold := Treshold,
+ callback := Callback, cooldown := Cooldown}) ->
+ is_integer(Treshold) andalso Treshold >= 0
+ andalso is_integer(Cooldown) andalso Cooldown >= 0
+ andalso is_function(Callback, 4);
+validate_alarm(_) ->
+ false.
+
maybe_started({error, {{shutdown,
{failed_to_start_child, ranch_acceptors_sup,
{listen_error, _, Reason}}}, _}} = Error) ->
diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl
index 5242446..d5f5200 100644
--- a/src/ranch_conns_sup.erl
+++ b/src/ranch_conns_sup.erl
@@ -43,6 +43,7 @@
handshake_timeout :: timeout(),
max_conns = undefined :: ranch:max_conns(),
stats_counters_ref :: counters:counters_ref(),
+ alarms = #{} :: #{term() => {undefined | reference(), map()}},
logger = undefined :: module()
}).
@@ -106,6 +107,7 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) ->
process_flag(trap_exit, true),
ok = ranch_server:set_connections_sup(Ref, Id, self()),
MaxConns = ranch_server:get_max_connections(Ref),
+ Alarms = get_alarms(TransOpts),
ConnType = maps:get(connection_type, TransOpts, worker),
Shutdown = maps:get(shutdown, TransOpts, 5000),
HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
@@ -116,11 +118,12 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) ->
shutdown=Shutdown, transport=Transport, protocol=Protocol,
opts=ProtoOpts, stats_counters_ref=StatsCounters,
handshake_timeout=HandshakeTimeout,
- max_conns=MaxConns, logger=Logger}, 0, 0, []).
+ max_conns=MaxConns, alarms=Alarms,
+ logger=Logger}, 0, 0, []).
loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
transport=Transport, protocol=Protocol, opts=Opts, stats_counters_ref=StatsCounters,
- max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) ->
+ alarms=Alarms, max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) ->
receive
{?MODULE, start_protocol, To, Socket} ->
try Protocol:start_link(Ref, Transport, Opts) of
@@ -181,6 +184,12 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
{set_protocol_options, Opts2} ->
loop(State#state{opts=Opts2},
CurConns, NbChildren, Sleepers);
+ {timeout, _, {activate_alarm, AlarmName}} when is_map_key(AlarmName, Alarms) ->
+ {AlarmOpts, _} = maps:get(AlarmName, Alarms),
+ NewAlarm = trigger_alarm(Ref, AlarmName, {AlarmOpts, undefined}, CurConns),
+ loop(State#state{alarms=Alarms#{AlarmName => NewAlarm}}, CurConns, NbChildren, Sleepers);
+ {timeout, _, {activate_alarm, _}} ->
+ loop(State, CurConns, NbChildren, Sleepers);
{'EXIT', Parent, Reason} ->
terminate(State, Reason, NbChildren);
{'EXIT', Pid, Reason} when Sleepers =:= [] ->
@@ -245,18 +254,20 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType,
end.
handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout,
- max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
+ max_conns=MaxConns, alarms=Alarms0}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) ->
case Transport:controlling_process(Socket, ProtocolPid) of
ok ->
ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout},
put(SupPid, active),
CurConns2 = CurConns + 1,
- if CurConns2 < MaxConns ->
+ Sleepers2 = if CurConns2 < MaxConns ->
To ! self(),
- loop(State, CurConns2, NbChildren + 1, Sleepers);
+ Sleepers;
true ->
- loop(State, CurConns2, NbChildren + 1, [To|Sleepers])
- end;
+ [To|Sleepers]
+ end,
+ Alarms1 = trigger_alarms(Ref, Alarms0, CurConns2),
+ loop(State#state{alarms=Alarms1}, CurConns2, NbChildren + 1, Sleepers2);
{error, _} ->
Transport:close(Socket),
%% Only kill the supervised pid, because the connection's pid,
@@ -266,6 +277,45 @@ handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=Handshake
loop(State, CurConns, NbChildren, Sleepers)
end.
+trigger_alarms(Ref, Alarms, CurConns) ->
+ maps:map(
+ fun
+ (AlarmName, Alarm) ->
+ trigger_alarm(Ref, AlarmName, Alarm, CurConns)
+ end,
+ Alarms
+ ).
+
+trigger_alarm(Ref, AlarmName, {Opts=#{treshold := Treshold, callback := Callback}, undefined}, CurConns) when CurConns >= Treshold ->
+ ActiveConns = [Pid || {Pid, active} <- get()],
+ case Callback of
+ {Mod, Fun} ->
+ spawn(Mod, Fun, [Ref, AlarmName, self(), ActiveConns]);
+ _ ->
+ Self = self(),
+ spawn(fun () -> Callback(Ref, AlarmName, Self, ActiveConns) end)
+ end,
+ {Opts, schedule_activate_alarm(AlarmName, Opts)};
+trigger_alarm(_, _, Alarm, _) ->
+ Alarm.
+
+schedule_activate_alarm(AlarmName, #{cooldown := Cooldown}) when Cooldown > 0 ->
+ erlang:start_timer(Cooldown, self(), {activate_alarm, AlarmName});
+schedule_activate_alarm(_, _) ->
+ undefined.
+
+get_alarms(#{alarms := Alarms}) when is_map(Alarms) ->
+ maps:fold(
+ fun
+ (Name, Opts = #{type := num_connections}, Acc) -> Acc#{Name => {Opts, undefined}};
+ (_, _, Acc) -> Acc
+ end,
+ #{},
+ Alarms
+ );
+get_alarms(_) ->
+ #{}.
+
set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, Sleepers0, TransOpts) ->
MaxConns1 = maps:get(max_connections, TransOpts, 1024),
HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000),
@@ -277,9 +327,69 @@ set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, S
false ->
Sleepers0
end,
- loop(State#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown},
+ State1=set_alarm_option(State, TransOpts, CurConns),
+ loop(State1#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown},
CurConns, NbChildren, Sleepers1).
+set_alarm_option(State=#state{ref=Ref, alarms=OldAlarms}, TransOpts, CurConns) ->
+ NewAlarms0 = get_alarms(TransOpts),
+ NewAlarms1 = merge_alarms(OldAlarms, NewAlarms0),
+ NewAlarms2 = trigger_alarms(Ref, NewAlarms1, CurConns),
+ State#state{alarms=NewAlarms2}.
+
+merge_alarms(Old, New) ->
+ OldList = lists:sort(maps:to_list(Old)),
+ NewList = lists:sort(maps:to_list(New)),
+ Merged = merge_alarms(OldList, NewList, []),
+ maps:from_list(Merged).
+
+merge_alarms([], News, Acc) ->
+ News ++ Acc;
+merge_alarms([{_, {_, undefined}}|Olds], [], Acc) ->
+ merge_alarms(Olds, [], Acc);
+merge_alarms([{_, {_, Timer}}|Olds], [], Acc) ->
+ _ = cancel_alarm_reactivation_timer(Timer),
+ merge_alarms(Olds, [], Acc);
+merge_alarms([{Name, {OldOpts, Timer}}|Olds], [{Name, {NewOpts, _}}|News], Acc) ->
+ merge_alarms(Olds, News, [{Name, {NewOpts, adapt_alarm_timer(Name, Timer, OldOpts, NewOpts)}}|Acc]);
+merge_alarms([{OldName, {_, Timer}}|Olds], News=[{NewName, _}|_], Acc) when OldName < NewName ->
+ _ = cancel_alarm_reactivation_timer(Timer),
+ merge_alarms(Olds, News, Acc);
+merge_alarms(Olds, [New|News], Acc) ->
+ merge_alarms(Olds, News, [New|Acc]).
+
+%% Not in cooldown.
+adapt_alarm_timer(_, undefined, _, _) ->
+ undefined;
+%% Cooldown unchanged.
+adapt_alarm_timer(_, Timer, #{cooldown := Cooldown}, #{cooldown := Cooldown}) ->
+ Timer;
+%% Cooldown changed to no cooldown, cancel cooldown timer.
+adapt_alarm_timer(_, Timer, _, #{cooldown := 0}) ->
+ _ = cancel_alarm_reactivation_timer(Timer),
+ undefined;
+%% Cooldown changed, cancel current and start new timer taking the already elapsed time into account.
+adapt_alarm_timer(Name, Timer, #{cooldown := OldCooldown}, #{cooldown := NewCooldown}) ->
+ OldTimeLeft = cancel_alarm_reactivation_timer(Timer),
+ case NewCooldown-OldCooldown+OldTimeLeft of
+ NewTimeLeft when NewTimeLeft>0 ->
+ erlang:start_timer(NewTimeLeft, self(), {activate_alarm, Name});
+ _ ->
+ undefined
+ end.
+
+cancel_alarm_reactivation_timer(Timer) ->
+ case erlang:cancel_timer(Timer) of
+ %% Timer had already expired when we tried to cancel it, so we flush the
+ %% reactivation message it sent and return 0 as remaining time.
+ false ->
+ ok = receive {timeout, Timer, {activate_alarm, _}} -> ok after 0 -> ok end,
+ 0;
+ %% Timer has not yet expired, we return the amount of time that was remaining.
+ TimeLeft ->
+ TimeLeft
+ end.
+
-spec terminate(#state{}, any(), non_neg_integer()) -> no_return().
terminate(#state{shutdown=brutal_kill, id=Id,
stats_counters_ref=StatsCounters}, Reason, NbChildren) ->
diff --git a/test/acceptor_SUITE.erl b/test/acceptor_SUITE.erl
index 49625c6..53a1223 100644
--- a/test/acceptor_SUITE.erl
+++ b/test/acceptor_SUITE.erl
@@ -109,7 +109,8 @@ groups() ->
misc_post_listen_callback_error,
misc_set_transport_options,
misc_wait_for_connections,
- misc_multiple_ip_local_socket_opts
+ misc_multiple_ip_local_socket_opts,
+ misc_connection_alarms
]}, {supervisor, [
connection_type_supervisor,
connection_type_supervisor_separate_from_connection,
@@ -576,6 +577,79 @@ do_misc_multiple_ip_local_socket_opts() ->
{error, enoent} = file:read_file_info(SockFile2),
ok.
+misc_connection_alarms(_) ->
+ doc("Ensure that connection alarms work."),
+ Name = name(),
+
+ Self = self(),
+ TransOpts0 = #{num_conns_sups => 1},
+ AlarmCallback = fun (Ref, AlarmName, _, ActiveConns) ->
+ Self ! {connection_alarm, {Ref, AlarmName, length(ActiveConns)}}
+ end,
+ Alarms0 = #{
+ test1 => Alarm1 = #{type => num_connections, treshold => 2, cooldown => 0, callback => AlarmCallback},
+ test2 => Alarm2 = #{type => num_connections, treshold => 3, cooldown => 0, callback => AlarmCallback}
+ },
+ ConnectOpts = [binary, {active, false}, {packet, raw}],
+
+ {ok, _} = ranch:start_listener(Name, ranch_tcp,
+ TransOpts0#{alarms => Alarms0}, notify_and_wait_protocol, #{pid => self()}),
+ Port = ranch:get_port(Name),
+
+ {ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts),
+ {1, [Conn1]} = receive_loop(connected, 100),
+ #{test1 := undefined, test2 := undefined} = do_recv_connection_alarms(Name, 100),
+
+ {ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts),
+ {1, [Conn2]} = receive_loop(connected, 100),
+ #{test1 := 2, test2 := undefined} = do_recv_connection_alarms(Name, 100),
+
+ {ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts),
+ {1, [Conn3]} = receive_loop(connected, 100),
+ #{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100),
+
+ Alarms1 = #{
+ test1 => Alarm1#{cooldown => 100},
+ test2 => Alarm2#{cooldown => 100}
+ },
+ ok = ranch:set_transport_options(Name, TransOpts0#{alarms => Alarms1}),
+ ok = do_flush_connection_alarms(Name),
+ #{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100),
+ ok = do_flush_connection_alarms(Name),
+ #{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100),
+
+ Conn3 ! stop,
+ timer:sleep(100),
+ ok = do_flush_connection_alarms(Name),
+ #{test1 := 2, test2 := undefined} = do_recv_connection_alarms(Name, 100),
+
+ Conn2 ! stop,
+ timer:sleep(100),
+ ok = do_flush_connection_alarms(Name),
+ #{test1 := undefined, test2 := undefined} = do_recv_connection_alarms(Name, 100),
+
+ Conn1 ! stop,
+
+ ok = ranch:stop_listener(Name),
+ ok.
+
+do_recv_connection_alarms(Name, Timeout) ->
+ do_recv_connection_alarms(Name, Timeout, #{test1 => undefined, test2 => undefined}).
+
+do_recv_connection_alarms(Name, Timeout, Acc) ->
+ receive {connection_alarm, {Name, AlarmName, N}} ->
+ do_recv_connection_alarms(Name, Timeout, Acc#{AlarmName => N})
+ after Timeout ->
+ Acc
+ end.
+
+do_flush_connection_alarms(Name) ->
+ receive {connection_alarm, {Name, _, _}} ->
+ do_flush_connection_alarms(Name)
+ after 0 ->
+ ok
+ end.
+
%% ssl.
ssl_accept_error(_) ->