diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ranch.app.src | 2 | ||||
-rw-r--r-- | src/ranch.erl | 10 | ||||
-rw-r--r-- | src/ranch_acceptor.erl | 11 | ||||
-rw-r--r-- | src/ranch_conns_sup.erl | 89 | ||||
-rw-r--r-- | src/ranch_listener_sup.erl | 10 | ||||
-rw-r--r-- | src/ranch_ssl.erl | 33 | ||||
-rw-r--r-- | src/ranch_tcp.erl | 7 | ||||
-rw-r--r-- | src/ranch_transport.erl | 7 |
8 files changed, 119 insertions, 50 deletions
diff --git a/src/ranch.app.src b/src/ranch.app.src index 03b8dae..bb6db94 100644 --- a/src/ranch.app.src +++ b/src/ranch.app.src @@ -14,7 +14,7 @@ {application, ranch, [ {description, "Socket acceptor pool for TCP protocols."}, - {vsn, "0.8.5"}, + {vsn, "0.9.0"}, {modules, []}, {registered, [ranch_sup, ranch_server]}, {applications, [ diff --git a/src/ranch.erl b/src/ranch.erl index 74497f0..641fc4d 100644 --- a/src/ranch.erl +++ b/src/ranch.erl @@ -120,7 +120,7 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) andalso is_atom(Protocol) -> {{ranch_listener_sup, Ref}, {ranch_listener_sup, start_link, [ Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts - ]}, permanent, 5000, supervisor, [ranch_listener_sup]}. + ]}, permanent, infinity, supervisor, [ranch_listener_sup]}. %% @doc Acknowledge the accepted connection. %% @@ -128,7 +128,9 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) %% the protocol process before starting to use it. -spec accept_ack(ref()) -> ok. accept_ack(Ref) -> - receive {shoot, Ref} -> ok end. + receive {shoot, Ref, Transport, Socket, AckTimeout} -> + Transport:accept_ack(Socket, AckTimeout) + end. %% @doc Remove the calling process' connection from the pool. %% @@ -175,7 +177,7 @@ set_protocol_options(Ref, Opts) -> %% It takes a list of options, a list of allowed keys and an accumulator. %% This accumulator can be used to set default options that should never %% be overriden. --spec filter_options([{atom(), any()} | {atom(), any(), any(), any()}], +-spec filter_options([{atom(), any()} | {raw, any(), any(), any()}], [atom()], Acc) -> Acc when Acc :: [any()]. filter_options([], _, Acc) -> Acc; @@ -200,7 +202,7 @@ set_option_default(Opts, Key, Value) -> end. %% @doc Start the given applications if they were not already started. --spec require(list(module())) -> ok. +-spec require([atom()]) -> ok. require([]) -> ok; require([App|Tail]) -> diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index f838b7d..da1aac5 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -48,4 +48,15 @@ loop(LSocket, Transport, ConnsSup) -> {error, Reason} when Reason =/= closed -> ok end, + flush(), ?MODULE:loop(LSocket, Transport, ConnsSup). + +flush() -> + receive Msg -> + error_logger:error_msg( + "Ranch acceptor received unexpected message: ~p~n", + [Msg]), + flush() + after 0 -> + ok + end. diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl index 245a5e0..308a1ab 100644 --- a/src/ranch_conns_sup.erl +++ b/src/ranch_conns_sup.erl @@ -20,34 +20,38 @@ -module(ranch_conns_sup). %% API. --export([start_link/4]). +-export([start_link/6]). -export([start_protocol/2]). -export([active_connections/1]). %% Supervisor internals. --export([init/5]). +-export([init/7]). -export([system_continue/3]). -export([system_terminate/4]). -export([system_code_change/4]). -type conn_type() :: worker | supervisor. +-type shutdown() :: brutal_kill | timeout(). -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), conn_type :: conn_type(), + shutdown :: shutdown(), transport = undefined :: module(), protocol = undefined :: module(), opts :: any(), - max_conns = undefined :: non_neg_integer() | infinity + ack_timeout :: timeout(), + max_conns = undefined :: ranch:max_conns() }). %% API. --spec start_link(ranch:ref(), conn_type(), module(), module()) -> {ok, pid()}. -start_link(Ref, ConnType, Transport, Protocol) -> +-spec start_link(ranch:ref(), conn_type(), shutdown(), module(), + timeout(), module()) -> {ok, pid()}. +start_link(Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol) -> proc_lib:start_link(?MODULE, init, - [self(), Ref, ConnType, Transport, Protocol]). + [self(), Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol]). %% We can safely assume we are on the same node as the supervisor. %% @@ -92,26 +96,28 @@ active_connections(SupPid) -> %% Supervisor internals. --spec init(pid(), ranch:ref(), conn_type(), module(), module()) -> no_return(). -init(Parent, Ref, ConnType, Transport, Protocol) -> +-spec init(pid(), ranch:ref(), conn_type(), shutdown(), + module(), timeout(), module()) -> no_return(). +init(Parent, Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol) -> process_flag(trap_exit, true), ok = ranch_server:set_connections_sup(Ref, self()), MaxConns = ranch_server:get_max_connections(Ref), Opts = ranch_server:get_protocol_options(Ref), ok = proc_lib:init_ack(Parent, {ok, self()}), loop(#state{parent=Parent, ref=Ref, conn_type=ConnType, - transport=Transport, protocol=Protocol, opts=Opts, - max_conns=MaxConns}, 0, 0, []). + shutdown=Shutdown, transport=Transport, protocol=Protocol, + opts=Opts, ack_timeout=AckTimeout, max_conns=MaxConns}, 0, 0, []). loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, transport=Transport, protocol=Protocol, opts=Opts, - max_conns=MaxConns}, CurConns, NbChildren, Sleepers) -> + ack_timeout=AckTimeout, max_conns=MaxConns}, + CurConns, NbChildren, Sleepers) -> receive {?MODULE, start_protocol, To, Socket} -> case Protocol:start_link(Ref, Socket, Transport, Opts) of {ok, Pid} -> Transport:controlling_process(Socket, Pid), - Pid ! {shoot, Ref}, + Pid ! {shoot, Ref, Transport, Socket, AckTimeout}, put(Pid, true), CurConns2 = CurConns + 1, if CurConns2 < MaxConns -> @@ -122,8 +128,12 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, loop(State, CurConns2, NbChildren + 1, [To|Sleepers]) end; - _ -> + Ret -> To ! self(), + error_logger:error_msg( + "Ranch listener ~p connection process start failure; " + "~p:start_link/4 returned: ~999999p~n", + [Ref, Protocol, Ret]), Transport:close(Socket), loop(State, CurConns, NbChildren, Sleepers) end; @@ -147,7 +157,7 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, loop(State#state{opts=Opts2}, CurConns, NbChildren, Sleepers); {'EXIT', Parent, Reason} -> - exit(Reason); + terminate(State, Reason, NbChildren); {'EXIT', Pid, Reason} when Sleepers =:= [] -> report_error(Ref, Protocol, Pid, Reason), erase(Pid), @@ -186,12 +196,59 @@ loop(State=#state{parent=Parent, ref=Ref, conn_type=ConnType, [Ref, Msg]) end. +-spec terminate(#state{}, any(), non_neg_integer()) -> no_return(). +%% Kill all children and then exit. We unlink first to avoid +%% getting a message for each child getting killed. +terminate(#state{shutdown=brutal_kill}, Reason, _) -> + Pids = get_keys(true), + _ = [begin + unlink(P), + exit(P, kill) + end || P <- Pids], + exit(Reason); +%% Attempt to gracefully shutdown all children. +terminate(#state{shutdown=Shutdown}, Reason, NbChildren) -> + shutdown_children(), + _ = if + Shutdown =:= infinity -> + ok; + true -> + erlang:send_after(Shutdown, self(), kill) + end, + wait_children(NbChildren), + exit(Reason). + +%% Monitor processes so we can know which ones have shutdown +%% before the timeout. Unlink so we avoid receiving an extra +%% message. Then send a shutdown exit signal. +shutdown_children() -> + Pids = get_keys(true), + _ = [begin + monitor(process, P), + unlink(P), + exit(P, shutdown) + end || P <- Pids], + ok. + +wait_children(0) -> + ok; +wait_children(NbChildren) -> + receive + {'DOWN', _, process, Pid, _} -> + _ = erase(Pid), + wait_children(NbChildren - 1); + kill -> + Pids = get_keys(true), + _ = [exit(P, kill) || P <- Pids], + ok + end. + system_continue(_, _, {State, CurConns, NbChildren, Sleepers}) -> loop(State, CurConns, NbChildren, Sleepers). -spec system_terminate(any(), _, _, _) -> no_return(). -system_terminate(Reason, _, _, _) -> - exit(Reason). +system_terminate(Reason, _, _, {State, _, NbChildren, _}) -> + terminate(State, Reason, NbChildren). system_code_change(Misc, _, _, _) -> {ok, Misc}. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl index 0392105..30017d0 100644 --- a/src/ranch_listener_sup.erl +++ b/src/ranch_listener_sup.erl @@ -36,15 +36,15 @@ start_link(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> %% supervisor. init({Ref, NbAcceptors, Transport, TransOpts, Protocol}) -> + AckTimeout = proplists:get_value(ack_timeout, TransOpts, 5000), ConnType = proplists:get_value(connection_type, TransOpts, worker), + Shutdown = proplists:get_value(shutdown, TransOpts, 5000), ChildSpecs = [ - %% conns_sup {ranch_conns_sup, {ranch_conns_sup, start_link, - [Ref, ConnType, Transport, Protocol]}, + [Ref, ConnType, Shutdown, Transport, AckTimeout, Protocol]}, permanent, infinity, supervisor, [ranch_conns_sup]}, - %% acceptors_sup {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, - [Ref, NbAcceptors, Transport, TransOpts] - }, permanent, infinity, supervisor, [ranch_acceptors_sup]} + [Ref, NbAcceptors, Transport, TransOpts]}, + permanent, infinity, supervisor, [ranch_acceptors_sup]} ], {ok, {{rest_for_one, 10, 10}, ChildSpecs}}. diff --git a/src/ranch_ssl.erl b/src/ranch_ssl.erl index 097b31c..dc29a18 100644 --- a/src/ranch_ssl.erl +++ b/src/ranch_ssl.erl @@ -30,6 +30,7 @@ -export([messages/0]). -export([listen/1]). -export([accept/2]). +-export([accept_ack/2]). -export([connect/3]). -export([connect/4]). -export([recv/3]). @@ -165,13 +166,18 @@ listen(Opts) -> %% @see ssl:transport_accept/2 %% @see ssl:ssl_accept/2 -spec accept(ssl:sslsocket(), timeout()) - -> {ok, ssl:sslsocket()} | {error, closed | timeout | atom() | tuple()}. + -> {ok, ssl:sslsocket()} | {error, closed | timeout | atom()}. accept(LSocket, Timeout) -> - case ssl:transport_accept(LSocket, Timeout) of - {ok, CSocket} -> - ssl_accept(CSocket, Timeout); + ssl:transport_accept(LSocket, Timeout). + +-spec accept_ack(ssl:sslsocket(), timeout()) -> ok. +accept_ack(CSocket, Timeout) -> + case ssl:ssl_accept(CSocket, Timeout) of + ok -> + ok; {error, Reason} -> - {error, Reason} + ok = close(CSocket), + error(Reason) end. %% @private Experimental. Open a connection to the given host and port number. @@ -209,7 +215,7 @@ send(Socket, Packet) -> ssl:send(Socket, Packet). %% @equiv sendfile(Socket, Filename, 0, 0, []) --spec sendfile(ssl:sslsocket(), file:name_all()) +-spec sendfile(ssl:sslsocket(), file:name_all() | file:fd()) -> {ok, non_neg_integer()} | {error, atom()}. sendfile(Socket, Filename) -> sendfile(Socket, Filename, 0, 0, []). @@ -275,21 +281,6 @@ close(Socket) -> %% Internal. -%% This call always times out, either because a numeric timeout value -%% was given, or because we've decided to use 5000ms instead of infinity. -%% This value should be reasonable enough for the moment. --spec ssl_accept(ssl:sslsocket(), timeout()) - -> {ok, ssl:sslsocket()} | {error, {ssl_accept, atom()}}. -ssl_accept(Socket, infinity) -> - ssl_accept(Socket, 5000); -ssl_accept(Socket, Timeout) -> - case ssl:ssl_accept(Socket, Timeout) of - ok -> - {ok, Socket}; - {error, Reason} -> - {error, {ssl_accept, Reason}} - end. - %% Unfortunately the implementation of elliptic-curve ciphers that has %% been introduced in R16B01 is incomplete. Depending on the particular %% client, this can cause the TLS handshake to break during key diff --git a/src/ranch_tcp.erl b/src/ranch_tcp.erl index abf7612..d5d5003 100644 --- a/src/ranch_tcp.erl +++ b/src/ranch_tcp.erl @@ -24,6 +24,7 @@ -export([messages/0]). -export([listen/1]). -export([accept/2]). +-export([accept_ack/2]). -export([connect/3]). -export([connect/4]). -export([recv/3]). @@ -90,6 +91,10 @@ listen(Opts) -> accept(LSocket, Timeout) -> gen_tcp:accept(LSocket, Timeout). +-spec accept_ack(inet:socket(), timeout()) -> ok. +accept_ack(_, _) -> + ok. + %% @private Experimental. Open a connection to the given host and port number. %% @see gen_tcp:connect/3 %% @todo Probably filter Opts? @@ -126,7 +131,7 @@ send(Socket, Packet) -> gen_tcp:send(Socket, Packet). %% @equiv sendfile(Socket, File, Offset, Bytes, []) --spec sendfile(inet:socket(), file:name_all()) +-spec sendfile(inet:socket(), file:name_all() | file:fd()) -> {ok, non_neg_integer()} | {error, atom()}. sendfile(Socket, Filename) -> sendfile(Socket, Filename, 0, 0, []). diff --git a/src/ranch_transport.erl b/src/ranch_transport.erl index fe06420..5cf10d1 100644 --- a/src/ranch_transport.erl +++ b/src/ranch_transport.erl @@ -46,7 +46,10 @@ %% Accept connections with the given listening socket. -callback accept(socket(), timeout()) - -> {ok, socket()} | {error, closed | timeout | atom() | tuple()}. + -> {ok, socket()} | {error, closed | timeout | atom()}. + +%% Perform post-accept operations on the socket. +-callback accept_ack(socket(), timeout()) -> ok. %% Experimental. Open a connection to the given host and port number. -callback connect(string(), inet:port_number(), opts()) @@ -65,7 +68,7 @@ -callback send(socket(), iodata()) -> ok | {error, atom()}. %% Send a file on a socket. --callback sendfile(socket(), file:name()) +-callback sendfile(socket(), file:name() | file:fd()) -> {ok, non_neg_integer()} | {error, atom()}. %% Send part of a file on a socket. |