From 50f6191bf8646147b303f8a9e1a5a9efa1d6dc5a Mon Sep 17 00:00:00 2001 From: juhlig Date: Thu, 10 Sep 2020 16:18:06 +0200 Subject: Enable connection count alarms --- doc/src/guide/listeners.asciidoc | 61 +++++++++++++++++++ src/ranch.erl | 28 +++++++++ src/ranch_conns_sup.erl | 126 ++++++++++++++++++++++++++++++++++++--- test/acceptor_SUITE.erl | 76 ++++++++++++++++++++++- 4 files changed, 282 insertions(+), 9 deletions(-) diff --git a/doc/src/guide/listeners.asciidoc b/doc/src/guide/listeners.asciidoc index 3835c64..4437143 100644 --- a/doc/src/guide/listeners.asciidoc +++ b/doc/src/guide/listeners.asciidoc @@ -328,6 +328,67 @@ processes. echo_protocol, [] ). +=== Setting connection count alarms + +The `alarms` transport options allows you to configure alarms +which will be triggered when the number of connections under a connection +supervisor reaches or exceeds the defined treshold. + +The `alarms` transport option takes a map with alarm names as keys and alarm +options as values. + +Any term is allowed as an alarm name. + +Alarm options, defining the alarm behavior, are again a map with the following +keys, all of which are mandatory: + +`type`:: +The alarm type. Currently, `num_connections` is the only allowed type. + +`treshold`:: +The alarm treshold. When the number of connections under a connection +supervisor reaches or exceeds this value, the alarm will trigger and +call the function given in the `callback` key. + +`callback`:: +The alarm function, that is, the function which will be called when the +alarm is triggered. Its arguments are the listener name, the alarm +name, the Pid of the triggering connection supervisor, and the Pids of +all the connection processes under that supervisor. + +`cooldown`:: +The minimum time to elapse before the alarm can trigger again, in +milliseconds. + +.Setting an alarm to log warnings when the number of connections exceed 100 + +[source,erlang] +---- +Alarms = #{ + my_alarm => #{ + type => num_connections, + treshold => 100, + callback => fun(Ref, Name, ConnSup, ConnPids]) -> + logger:warning("Warning (~s): " + "Supervisor ~s of listener ~s " + "has ~b connections", + [Name, Ref, ConnSup, length(ConnPids)]) + end, + cooldown => 5000 + } +}, +{ok, _} = ranch:start_listener(tcp_echo, + ranch_tcp, #{alarms => Alarms, socket_opts => [{port, 5555}]}, + echo_protocol, [] +). +---- + +In the example code, an alarm named `my_alarm` is defined, which will +call the given function when the number of connections under a +connection supervisor reaches or exceeds 100. When the number of +connections is still (or again) above 100 after 5 seconds, the +alarm will trigger again. + === When running out of file descriptors Operating systems have limits on the number of sockets diff --git a/src/ranch.erl b/src/ranch.erl index f36c145..576f122 100644 --- a/src/ranch.erl +++ b/src/ranch.erl @@ -55,7 +55,17 @@ -type opts() :: any() | transport_opts(any()). -export_type([opts/0]). +-type alarm(Type, Callback) :: #{ + type := Type, + callback := Callback, + treshold := non_neg_integer(), + cooldown := non_neg_integer() +}. + +-type alarm_num_connections() :: alarm(num_connections, fun((ref(), term(), pid(), [pid()]) -> any())). + -type transport_opts(SocketOpts) :: #{ + alarms => #{term() => alarm_num_connections()}, connection_type => worker | supervisor, handshake_timeout => timeout(), logger => module(), @@ -123,6 +133,16 @@ validate_transport_opt(max_connections, infinity, _) -> true; validate_transport_opt(max_connections, Value, _) -> is_integer(Value) andalso Value >= 0; +validate_transport_opt(alarms, Alarms, _) -> + maps:fold( + fun + (_, Opts, true) -> + validate_alarm(Opts); + (_, _, false) -> + false + end, + true, + Alarms); validate_transport_opt(logger, Value, _) -> is_atom(Value); validate_transport_opt(num_acceptors, Value, _) -> @@ -145,6 +165,14 @@ validate_transport_opt(socket_opts, _, _) -> validate_transport_opt(_, _, _) -> false. +validate_alarm(#{type := num_connections, treshold := Treshold, + callback := Callback, cooldown := Cooldown}) -> + is_integer(Treshold) andalso Treshold >= 0 + andalso is_integer(Cooldown) andalso Cooldown >= 0 + andalso is_function(Callback, 4); +validate_alarm(_) -> + false. + maybe_started({error, {{shutdown, {failed_to_start_child, ranch_acceptors_sup, {listen_error, _, Reason}}}, _}} = Error) -> diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index 5242446..d5f5200 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -43,6 +43,7 @@ handshake_timeout :: timeout(), max_conns = undefined :: ranch:max_conns(), stats_counters_ref :: counters:counters_ref(), + alarms = #{} :: #{term() => {undefined | reference(), map()}}, logger = undefined :: module() }). @@ -106,6 +107,7 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) -> process_flag(trap_exit, true), ok = ranch_server:set_connections_sup(Ref, Id, self()), MaxConns = ranch_server:get_max_connections(Ref), + Alarms = get_alarms(TransOpts), ConnType = maps:get(connection_type, TransOpts, worker), Shutdown = maps:get(shutdown, TransOpts, 5000), HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000), @@ -116,11 +118,12 @@ init(Parent, Ref, Id, Transport, TransOpts, Protocol, Logger) -> shutdown=Shutdown, transport=Transport, protocol=Protocol, opts=ProtoOpts, stats_counters_ref=StatsCounters, handshake_timeout=HandshakeTimeout, - max_conns=MaxConns, logger=Logger}, 0, 0, []). + max_conns=MaxConns, alarms=Alarms, + logger=Logger}, 0, 0, []). loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType, transport=Transport, protocol=Protocol, opts=Opts, stats_counters_ref=StatsCounters, - max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) -> + alarms=Alarms, max_conns=MaxConns, logger=Logger}, CurConns, NbChildren, Sleepers) -> receive {?MODULE, start_protocol, To, Socket} -> try Protocol:start_link(Ref, Transport, Opts) of @@ -181,6 +184,12 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType, {set_protocol_options, Opts2} -> loop(State#state{opts=Opts2}, CurConns, NbChildren, Sleepers); + {timeout, _, {activate_alarm, AlarmName}} when is_map_key(AlarmName, Alarms) -> + {AlarmOpts, _} = maps:get(AlarmName, Alarms), + NewAlarm = trigger_alarm(Ref, AlarmName, {AlarmOpts, undefined}, CurConns), + loop(State#state{alarms=Alarms#{AlarmName => NewAlarm}}, CurConns, NbChildren, Sleepers); + {timeout, _, {activate_alarm, _}} -> + loop(State, CurConns, NbChildren, Sleepers); {'EXIT', Parent, Reason} -> terminate(State, Reason, NbChildren); {'EXIT', Pid, Reason} when Sleepers =:= [] -> @@ -245,18 +254,20 @@ loop(State=#state{parent=Parent, ref=Ref, id=Id, conn_type=ConnType, end. handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=HandshakeTimeout, - max_conns=MaxConns}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) -> + max_conns=MaxConns, alarms=Alarms0}, CurConns, NbChildren, Sleepers, To, Socket, SupPid, ProtocolPid) -> case Transport:controlling_process(Socket, ProtocolPid) of ok -> ProtocolPid ! {handshake, Ref, Transport, Socket, HandshakeTimeout}, put(SupPid, active), CurConns2 = CurConns + 1, - if CurConns2 < MaxConns -> + Sleepers2 = if CurConns2 < MaxConns -> To ! self(), - loop(State, CurConns2, NbChildren + 1, Sleepers); + Sleepers; true -> - loop(State, CurConns2, NbChildren + 1, [To|Sleepers]) - end; + [To|Sleepers] + end, + Alarms1 = trigger_alarms(Ref, Alarms0, CurConns2), + loop(State#state{alarms=Alarms1}, CurConns2, NbChildren + 1, Sleepers2); {error, _} -> Transport:close(Socket), %% Only kill the supervised pid, because the connection's pid, @@ -266,6 +277,45 @@ handshake(State=#state{ref=Ref, transport=Transport, handshake_timeout=Handshake loop(State, CurConns, NbChildren, Sleepers) end. +trigger_alarms(Ref, Alarms, CurConns) -> + maps:map( + fun + (AlarmName, Alarm) -> + trigger_alarm(Ref, AlarmName, Alarm, CurConns) + end, + Alarms + ). + +trigger_alarm(Ref, AlarmName, {Opts=#{treshold := Treshold, callback := Callback}, undefined}, CurConns) when CurConns >= Treshold -> + ActiveConns = [Pid || {Pid, active} <- get()], + case Callback of + {Mod, Fun} -> + spawn(Mod, Fun, [Ref, AlarmName, self(), ActiveConns]); + _ -> + Self = self(), + spawn(fun () -> Callback(Ref, AlarmName, Self, ActiveConns) end) + end, + {Opts, schedule_activate_alarm(AlarmName, Opts)}; +trigger_alarm(_, _, Alarm, _) -> + Alarm. + +schedule_activate_alarm(AlarmName, #{cooldown := Cooldown}) when Cooldown > 0 -> + erlang:start_timer(Cooldown, self(), {activate_alarm, AlarmName}); +schedule_activate_alarm(_, _) -> + undefined. + +get_alarms(#{alarms := Alarms}) when is_map(Alarms) -> + maps:fold( + fun + (Name, Opts = #{type := num_connections}, Acc) -> Acc#{Name => {Opts, undefined}}; + (_, _, Acc) -> Acc + end, + #{}, + Alarms + ); +get_alarms(_) -> + #{}. + set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, Sleepers0, TransOpts) -> MaxConns1 = maps:get(max_connections, TransOpts, 1024), HandshakeTimeout = maps:get(handshake_timeout, TransOpts, 5000), @@ -277,9 +327,69 @@ set_transport_options(State=#state{max_conns=MaxConns0}, CurConns, NbChildren, S false -> Sleepers0 end, - loop(State#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown}, + State1=set_alarm_option(State, TransOpts, CurConns), + loop(State1#state{max_conns=MaxConns1, handshake_timeout=HandshakeTimeout, shutdown=Shutdown}, CurConns, NbChildren, Sleepers1). +set_alarm_option(State=#state{ref=Ref, alarms=OldAlarms}, TransOpts, CurConns) -> + NewAlarms0 = get_alarms(TransOpts), + NewAlarms1 = merge_alarms(OldAlarms, NewAlarms0), + NewAlarms2 = trigger_alarms(Ref, NewAlarms1, CurConns), + State#state{alarms=NewAlarms2}. + +merge_alarms(Old, New) -> + OldList = lists:sort(maps:to_list(Old)), + NewList = lists:sort(maps:to_list(New)), + Merged = merge_alarms(OldList, NewList, []), + maps:from_list(Merged). + +merge_alarms([], News, Acc) -> + News ++ Acc; +merge_alarms([{_, {_, undefined}}|Olds], [], Acc) -> + merge_alarms(Olds, [], Acc); +merge_alarms([{_, {_, Timer}}|Olds], [], Acc) -> + _ = cancel_alarm_reactivation_timer(Timer), + merge_alarms(Olds, [], Acc); +merge_alarms([{Name, {OldOpts, Timer}}|Olds], [{Name, {NewOpts, _}}|News], Acc) -> + merge_alarms(Olds, News, [{Name, {NewOpts, adapt_alarm_timer(Name, Timer, OldOpts, NewOpts)}}|Acc]); +merge_alarms([{OldName, {_, Timer}}|Olds], News=[{NewName, _}|_], Acc) when OldName < NewName -> + _ = cancel_alarm_reactivation_timer(Timer), + merge_alarms(Olds, News, Acc); +merge_alarms(Olds, [New|News], Acc) -> + merge_alarms(Olds, News, [New|Acc]). + +%% Not in cooldown. +adapt_alarm_timer(_, undefined, _, _) -> + undefined; +%% Cooldown unchanged. +adapt_alarm_timer(_, Timer, #{cooldown := Cooldown}, #{cooldown := Cooldown}) -> + Timer; +%% Cooldown changed to no cooldown, cancel cooldown timer. +adapt_alarm_timer(_, Timer, _, #{cooldown := 0}) -> + _ = cancel_alarm_reactivation_timer(Timer), + undefined; +%% Cooldown changed, cancel current and start new timer taking the already elapsed time into account. +adapt_alarm_timer(Name, Timer, #{cooldown := OldCooldown}, #{cooldown := NewCooldown}) -> + OldTimeLeft = cancel_alarm_reactivation_timer(Timer), + case NewCooldown-OldCooldown+OldTimeLeft of + NewTimeLeft when NewTimeLeft>0 -> + erlang:start_timer(NewTimeLeft, self(), {activate_alarm, Name}); + _ -> + undefined + end. + +cancel_alarm_reactivation_timer(Timer) -> + case erlang:cancel_timer(Timer) of + %% Timer had already expired when we tried to cancel it, so we flush the + %% reactivation message it sent and return 0 as remaining time. + false -> + ok = receive {timeout, Timer, {activate_alarm, _}} -> ok after 0 -> ok end, + 0; + %% Timer has not yet expired, we return the amount of time that was remaining. + TimeLeft -> + TimeLeft + end. + -spec terminate(#state{}, any(), non_neg_integer()) -> no_return(). terminate(#state{shutdown=brutal_kill, id=Id, stats_counters_ref=StatsCounters}, Reason, NbChildren) -> diff --git a/test/acceptor_SUITE.erl b/test/acceptor_SUITE.erl index 49625c6..53a1223 100644 --- a/test/acceptor_SUITE.erl +++ b/test/acceptor_SUITE.erl @@ -109,7 +109,8 @@ groups() -> misc_post_listen_callback_error, misc_set_transport_options, misc_wait_for_connections, - misc_multiple_ip_local_socket_opts + misc_multiple_ip_local_socket_opts, + misc_connection_alarms ]}, {supervisor, [ connection_type_supervisor, connection_type_supervisor_separate_from_connection, @@ -576,6 +577,79 @@ do_misc_multiple_ip_local_socket_opts() -> {error, enoent} = file:read_file_info(SockFile2), ok. +misc_connection_alarms(_) -> + doc("Ensure that connection alarms work."), + Name = name(), + + Self = self(), + TransOpts0 = #{num_conns_sups => 1}, + AlarmCallback = fun (Ref, AlarmName, _, ActiveConns) -> + Self ! {connection_alarm, {Ref, AlarmName, length(ActiveConns)}} + end, + Alarms0 = #{ + test1 => Alarm1 = #{type => num_connections, treshold => 2, cooldown => 0, callback => AlarmCallback}, + test2 => Alarm2 = #{type => num_connections, treshold => 3, cooldown => 0, callback => AlarmCallback} + }, + ConnectOpts = [binary, {active, false}, {packet, raw}], + + {ok, _} = ranch:start_listener(Name, ranch_tcp, + TransOpts0#{alarms => Alarms0}, notify_and_wait_protocol, #{pid => self()}), + Port = ranch:get_port(Name), + + {ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts), + {1, [Conn1]} = receive_loop(connected, 100), + #{test1 := undefined, test2 := undefined} = do_recv_connection_alarms(Name, 100), + + {ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts), + {1, [Conn2]} = receive_loop(connected, 100), + #{test1 := 2, test2 := undefined} = do_recv_connection_alarms(Name, 100), + + {ok, _} = gen_tcp:connect("localhost", Port, ConnectOpts), + {1, [Conn3]} = receive_loop(connected, 100), + #{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100), + + Alarms1 = #{ + test1 => Alarm1#{cooldown => 100}, + test2 => Alarm2#{cooldown => 100} + }, + ok = ranch:set_transport_options(Name, TransOpts0#{alarms => Alarms1}), + ok = do_flush_connection_alarms(Name), + #{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100), + ok = do_flush_connection_alarms(Name), + #{test1 := 3, test2 := 3} = do_recv_connection_alarms(Name, 100), + + Conn3 ! stop, + timer:sleep(100), + ok = do_flush_connection_alarms(Name), + #{test1 := 2, test2 := undefined} = do_recv_connection_alarms(Name, 100), + + Conn2 ! stop, + timer:sleep(100), + ok = do_flush_connection_alarms(Name), + #{test1 := undefined, test2 := undefined} = do_recv_connection_alarms(Name, 100), + + Conn1 ! stop, + + ok = ranch:stop_listener(Name), + ok. + +do_recv_connection_alarms(Name, Timeout) -> + do_recv_connection_alarms(Name, Timeout, #{test1 => undefined, test2 => undefined}). + +do_recv_connection_alarms(Name, Timeout, Acc) -> + receive {connection_alarm, {Name, AlarmName, N}} -> + do_recv_connection_alarms(Name, Timeout, Acc#{AlarmName => N}) + after Timeout -> + Acc + end. + +do_flush_connection_alarms(Name) -> + receive {connection_alarm, {Name, _, _}} -> + do_flush_connection_alarms(Name) + after 0 -> + ok + end. + %% ssl. ssl_accept_error(_) -> -- cgit v1.2.3