From 7006c50c3ed6c3cbcb24e9e88a76ebd1aaf3a5f8 Mon Sep 17 00:00:00 2001 From: "j.uhlig" Date: Mon, 9 Apr 2018 12:53:02 +0200 Subject: Add suspend/resume of listeners and update of transport options This allows graceful draining of connections, updating transport options on a running listener without having to drop connections and other similar scenarios. Note that when updating transport options the listener must be suspended which means that new connections will be rejected until the listener is resumed. --- src/ranch.erl | 81 ++++++++++++++++++++++++++++++++++++++++++--- src/ranch_acceptors_sup.erl | 11 +++--- src/ranch_listener_sup.erl | 4 +-- src/ranch_server.erl | 41 +++++++++++++++-------- 4 files changed, 113 insertions(+), 24 deletions(-) (limited to 'src') diff --git a/src/ranch.erl b/src/ranch.erl index 70fb64c..5af4b74 100644 --- a/src/ranch.erl +++ b/src/ranch.erl @@ -17,17 +17,23 @@ -export([start_listener/5]). -export([start_listener/6]). -export([stop_listener/1]). +-export([suspend_listener/1]). +-export([resume_listener/1]). -export([child_spec/5]). -export([child_spec/6]). -export([accept_ack/1]). -export([remove_connection/1]). +-export([get_status/1]). -export([get_addr/1]). -export([get_port/1]). -export([get_max_connections/1]). -export([set_max_connections/2]). +-export([get_transport_options/1]). +-export([set_transport_options/2]). -export([get_protocol_options/1]). -export([set_protocol_options/2]). -export([info/0]). +-export([info/1]). -export([procs/2]). -export([filter_options/3]). -export([set_option_default/3]). @@ -110,6 +116,37 @@ stop_listener(Ref) -> {error, Reason} end. +-spec suspend_listener(ref()) -> ok | {error, term()}. +suspend_listener(Ref) -> + case get_status(Ref) of + running -> + ListenerSup = ranch_server:get_listener_sup(Ref), + ok = ranch_server:set_addr(Ref, {undefined, undefined}), + supervisor:terminate_child(ListenerSup, ranch_acceptors_sup); + suspended -> + ok + end. + +-spec resume_listener(ref()) -> ok | {error, term()}. +resume_listener(Ref) -> + case get_status(Ref) of + running -> + ok; + suspended -> + ListenerSup = ranch_server:get_listener_sup(Ref), + Res = supervisor:restart_child(ListenerSup, ranch_acceptors_sup), + maybe_resumed(Res) + end. + +maybe_resumed(Error={error, {listen_error, _, Reason}}) -> + start_error(Reason, Error); +maybe_resumed({ok, _}) -> + ok; +maybe_resumed({ok, _, _}) -> + ok; +maybe_resumed(Res) -> + Res. + -spec child_spec(ref(), module(), any(), module(), any()) -> supervisor:child_spec(). child_spec(Ref, Transport, TransOpts, Protocol, ProtoOpts) -> @@ -137,11 +174,22 @@ remove_connection(Ref) -> ConnsSup ! {remove_connection, Ref, self()}, ok. --spec get_addr(ref()) -> {inet:ip_address(), inet:port_number()}. +-spec get_status(ref()) -> running | suspended | restarting. +get_status(Ref) -> + ListenerSup = ranch_server:get_listener_sup(Ref), + Children = supervisor:which_children(ListenerSup), + case lists:keyfind(ranch_acceptors_sup, 1, Children) of + {_, undefined, _, _} -> + suspended; + {_, AcceptorsSup, _, _} when is_pid(AcceptorsSup) -> + running + end. + +-spec get_addr(ref()) -> {inet:ip_address(), inet:port_number()} | {undefined, undefined}. get_addr(Ref) -> ranch_server:get_addr(Ref). --spec get_port(ref()) -> inet:port_number(). +-spec get_port(ref()) -> inet:port_number() | undefined. get_port(Ref) -> {_, Port} = get_addr(Ref), Port. @@ -154,6 +202,19 @@ get_max_connections(Ref) -> set_max_connections(Ref, MaxConnections) -> ranch_server:set_max_connections(Ref, MaxConnections). +-spec get_transport_options(ref()) -> any(). +get_transport_options(Ref) -> + ranch_server:get_transport_options(Ref). + +-spec set_transport_options(ref(), any()) -> ok | {error, running}. +set_transport_options(Ref, TransOpts) -> + case get_status(Ref) of + suspended -> + ok = ranch_server:set_transport_options(Ref, TransOpts); + running -> + {error, running} + end. + -spec get_protocol_options(ref()) -> any(). get_protocol_options(Ref) -> ranch_server:get_protocol_options(Ref). @@ -167,14 +228,22 @@ info() -> [{Ref, listener_info(Ref, Pid)} || {Ref, Pid} <- ranch_server:get_listener_sups()]. +-spec info(ref()) -> [{atom(), any()}]. +info(Ref) -> + Pid = ranch_server:get_listener_sup(Ref), + listener_info(Ref, Pid). + listener_info(Ref, Pid) -> - [_, NumAcceptors, Transport, TransOpts, Protocol, _] = ranch_server:get_listener_start_args(Ref), + [_, NumAcceptors, Transport, _, Protocol, _] = ranch_server:get_listener_start_args(Ref), ConnsSup = ranch_server:get_connections_sup(Ref), + Status = get_status(Ref), {IP, Port} = get_addr(Ref), MaxConns = get_max_connections(Ref), + TransOpts = ranch_server:get_transport_options(Ref), ProtoOpts = get_protocol_options(Ref), [ {pid, Pid}, + {status, Status}, {ip, IP}, {port, Port}, {num_acceptors, NumAcceptors}, @@ -197,7 +266,11 @@ procs1(Ref, Sup) -> ListenerSup = ranch_server:get_listener_sup(Ref), {_, SupPid, _, _} = lists:keyfind(Sup, 1, supervisor:which_children(ListenerSup)), - [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)]. + try + [Pid || {_, Pid, _, _} <- supervisor:which_children(SupPid)] + catch exit:{noproc, _} when Sup =:= ranch_acceptors_sup -> + [] + end. -spec filter_options([inet | inet6 | {atom(), any()} | {raw, any(), any(), any()}], [atom()], Acc) -> Acc when Acc :: [any()]. diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl index f4b2fd8..9e39f2d 100644 --- a/src/ranch_acceptors_sup.erl +++ b/src/ranch_acceptors_sup.erl @@ -15,16 +15,17 @@ -module(ranch_acceptors_sup). -behaviour(supervisor). --export([start_link/4]). +-export([start_link/3]). -export([init/1]). --spec start_link(ranch:ref(), non_neg_integer(), module(), any()) +-spec start_link(ranch:ref(), non_neg_integer(), module()) -> {ok, pid()}. -start_link(Ref, NumAcceptors, Transport, TransOpts) -> - supervisor:start_link(?MODULE, [Ref, NumAcceptors, Transport, TransOpts]). +start_link(Ref, NumAcceptors, Transport) -> + supervisor:start_link(?MODULE, [Ref, NumAcceptors, Transport]). -init([Ref, NumAcceptors, Transport, TransOpts]) -> +init([Ref, NumAcceptors, Transport]) -> ConnsSup = ranch_server:get_connections_sup(Ref), + TransOpts = ranch_server:get_transport_options(Ref), LSocket = case proplists:get_value(socket, TransOpts) of undefined -> TransOpts2 = proplists:delete(ack_timeout, diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index d3bc59d..502df44 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -22,7 +22,7 @@ -> {ok, pid()}. start_link(Ref, NumAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> MaxConns = proplists:get_value(max_connections, TransOpts, 1024), - ranch_server:set_new_listener_opts(Ref, MaxConns, ProtoOpts, + ranch_server:set_new_listener_opts(Ref, MaxConns, TransOpts, ProtoOpts, [Ref, NumAcceptors, Transport, TransOpts, Protocol, ProtoOpts]), supervisor:start_link(?MODULE, { Ref, NumAcceptors, Transport, TransOpts, Protocol @@ -38,7 +38,7 @@ init({Ref, NumAcceptors, Transport, TransOpts, Protocol}) -> [Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol]}, permanent, infinity, supervisor, [ranch_conns_sup]}, {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, - [Ref, NumAcceptors, Transport, TransOpts]}, + [Ref, NumAcceptors, Transport]}, permanent, infinity, supervisor, [ranch_acceptors_sup]} ], {ok, {{rest_for_one, 1, 5}, ChildSpecs}}. diff --git a/src/ranch_server.erl b/src/ranch_server.erl index 89c508c..80f82d6 100644 --- a/src/ranch_server.erl +++ b/src/ranch_server.erl @@ -17,7 +17,7 @@ %% API. -export([start_link/0]). --export([set_new_listener_opts/4]). +-export([set_new_listener_opts/5]). -export([cleanup_listener_opts/1]). -export([set_connections_sup/2]). -export([get_connections_sup/1]). @@ -29,6 +29,8 @@ -export([get_addr/1]). -export([set_max_connections/2]). -export([get_max_connections/1]). +-export([set_transport_options/2]). +-export([get_transport_options/1]). -export([set_protocol_options/2]). -export([get_protocol_options/1]). -export([get_listener_start_args/1]). @@ -55,15 +57,16 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). --spec set_new_listener_opts(ranch:ref(), ranch:max_conns(), any(), [any()]) -> ok. -set_new_listener_opts(Ref, MaxConns, ProtoOpts, StartArgs) -> - gen_server:call(?MODULE, {set_new_listener_opts, Ref, MaxConns, ProtoOpts, StartArgs}). +-spec set_new_listener_opts(ranch:ref(), ranch:max_conns(), any(), any(), [any()]) -> ok. +set_new_listener_opts(Ref, MaxConns, TransOpts, ProtoOpts, StartArgs) -> + gen_server:call(?MODULE, {set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartArgs}). -spec cleanup_listener_opts(ranch:ref()) -> ok. cleanup_listener_opts(Ref) -> _ = ets:delete(?TAB, {addr, Ref}), _ = ets:delete(?TAB, {max_conns, Ref}), - _ = ets:delete(?TAB, {opts, Ref}), + _ = ets:delete(?TAB, {trans_opts, Ref}), + _ = ets:delete(?TAB, {proto_opts, Ref}), _ = ets:delete(?TAB, {listener_start_args, Ref}), %% We also remove the pid of the connections supervisor. %% Depending on the timing, it might already have been deleted @@ -103,11 +106,11 @@ get_listener_sup(Ref) -> get_listener_sups() -> [{Ref, Pid} || [Ref, Pid] <- ets:match(?TAB, {{listener_sup, '$1'}, '$2'})]. --spec set_addr(ranch:ref(), {inet:ip_address(), inet:port_number()}) -> ok. +-spec set_addr(ranch:ref(), {inet:ip_address(), inet:port_number()} | {undefined, undefined}) -> ok. set_addr(Ref, Addr) -> gen_server:call(?MODULE, {set_addr, Ref, Addr}). --spec get_addr(ranch:ref()) -> {inet:ip_address(), inet:port_number()}. +-spec get_addr(ranch:ref()) -> {inet:ip_address(), inet:port_number()} | {undefined, undefined}. get_addr(Ref) -> ets:lookup_element(?TAB, {addr, Ref}, 2). @@ -119,13 +122,21 @@ set_max_connections(Ref, MaxConnections) -> get_max_connections(Ref) -> ets:lookup_element(?TAB, {max_conns, Ref}, 2). +-spec set_transport_options(ranch:ref(), any()) -> ok. +set_transport_options(Ref, TransOpts) -> + gen_server:call(?MODULE, {set_trans_opts, Ref, TransOpts}). + +-spec get_transport_options(ranch:ref()) -> any(). +get_transport_options(Ref) -> + ets:lookup_element(?TAB, {trans_opts, Ref}, 2). + -spec set_protocol_options(ranch:ref(), any()) -> ok. set_protocol_options(Ref, ProtoOpts) -> - gen_server:call(?MODULE, {set_opts, Ref, ProtoOpts}). + gen_server:call(?MODULE, {set_proto_opts, Ref, ProtoOpts}). -spec get_protocol_options(ranch:ref()) -> any(). get_protocol_options(Ref) -> - ets:lookup_element(?TAB, {opts, Ref}, 2). + ets:lookup_element(?TAB, {proto_opts, Ref}, 2). -spec get_listener_start_args(ranch:ref()) -> [any()]. get_listener_start_args(Ref) -> @@ -144,9 +155,10 @@ init([]) -> [Ref, Pid] <- ets:match(?TAB, {{listener_sup, '$1'}, '$2'})], {ok, #state{monitors=ConnMonitors++ListenerMonitors}}. -handle_call({set_new_listener_opts, Ref, MaxConns, ProtoOpts, StartArgs}, _, State) -> +handle_call({set_new_listener_opts, Ref, MaxConns, TransOpts, ProtoOpts, StartArgs}, _, State) -> ets:insert(?TAB, {{max_conns, Ref}, MaxConns}), - ets:insert(?TAB, {{opts, Ref}, ProtoOpts}), + ets:insert(?TAB, {{trans_opts, Ref}, TransOpts}), + ets:insert(?TAB, {{proto_opts, Ref}, ProtoOpts}), ets:insert(?TAB, {{listener_start_args, Ref}, StartArgs}), {reply, ok, State}; handle_call({set_connections_sup, Ref, Pid}, _, @@ -177,8 +189,11 @@ handle_call({set_max_conns, Ref, MaxConns}, _, State) -> ConnsSup = get_connections_sup(Ref), ConnsSup ! {set_max_conns, MaxConns}, {reply, ok, State}; -handle_call({set_opts, Ref, Opts}, _, State) -> - ets:insert(?TAB, {{opts, Ref}, Opts}), +handle_call({set_trans_opts, Ref, Opts}, _, State) -> + ets:insert(?TAB, {{trans_opts, Ref}, Opts}), + {reply, ok, State}; +handle_call({set_proto_opts, Ref, Opts}, _, State) -> + ets:insert(?TAB, {{proto_opts, Ref}, Opts}), ConnsSup = get_connections_sup(Ref), ConnsSup ! {set_opts, Opts}, {reply, ok, State}; -- cgit v1.2.3