From 4156fa3ebb978a21b52abf94202bb3d7676f4c03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 12 Apr 2012 23:22:52 +0200 Subject: Import the acceptor code from Cowboy Modules were renamed. The 'cowboy_' prefix became 'ranch_'. At the same time, ranch_ssl_transport became ranch_ssl, and ranch_tcp_transport became ranch_tcp, because appending '_transport' felt a bit redundant considering SSL and TCP clearly are transports. One test has been added to make sure everything is working. --- src/ranch.app.src | 26 +++++ src/ranch.erl | 114 ++++++++++++++++++++++ src/ranch_acceptor.erl | 57 +++++++++++ src/ranch_acceptors_sup.erl | 48 ++++++++++ src/ranch_app.erl | 53 +++++++++++ src/ranch_conns_sup.erl | 44 +++++++++ src/ranch_listener.erl | 224 ++++++++++++++++++++++++++++++++++++++++++++ src/ranch_listener_sup.erl | 46 +++++++++ src/ranch_ssl.erl | 170 +++++++++++++++++++++++++++++++++ src/ranch_sup.erl | 40 ++++++++ src/ranch_tcp.erl | 113 ++++++++++++++++++++++ 11 files changed, 935 insertions(+) create mode 100644 src/ranch.app.src create mode 100644 src/ranch.erl create mode 100644 src/ranch_acceptor.erl create mode 100644 src/ranch_acceptors_sup.erl create mode 100644 src/ranch_app.erl create mode 100644 src/ranch_conns_sup.erl create mode 100644 src/ranch_listener.erl create mode 100644 src/ranch_listener_sup.erl create mode 100644 src/ranch_ssl.erl create mode 100644 src/ranch_sup.erl create mode 100644 src/ranch_tcp.erl (limited to 'src') diff --git a/src/ranch.app.src b/src/ranch.app.src new file mode 100644 index 0000000..becbe4c --- /dev/null +++ b/src/ranch.app.src @@ -0,0 +1,26 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +{application, ranch, [ + {description, "Socket acceptor pool for TCP protocols."}, + {vsn, "0.1.0"}, + {modules, []}, + {registered, [ranch_sup]}, + {applications, [ + kernel, + stdlib + ]}, + {mod, {ranch_app, []}}, + {env, []} +]}. diff --git a/src/ranch.erl b/src/ranch.erl new file mode 100644 index 0000000..ab9d767 --- /dev/null +++ b/src/ranch.erl @@ -0,0 +1,114 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc Ranch API to start and stop listeners. +-module(ranch). + +-export([start_listener/6, stop_listener/1, child_spec/6, accept_ack/1, + get_protocol_options/1, set_protocol_options/2]). + +%% @doc Start a listener for the given transport and protocol. +%% +%% A listener is effectively a pool of NbAcceptors acceptors. +%% Acceptors accept connections on the given Transport and forward +%% connections to the given Protocol handler. Both transport and +%% protocol modules can be given options through the TransOpts and +%% the ProtoOpts arguments. Available options are documented in the +%% listen transport function and in the protocol module of your choice. +%% +%% All acceptor and connection processes are supervised by the listener. +%% +%% It is recommended to set a large enough number of acceptors to improve +%% performance. The exact number depends of course on your hardware, on the +%% protocol used and on the number of expected simultaneous connections. +%% +%% The Transport option max_connections allows you to define +%% the maximum number of simultaneous connections for this listener. It defaults +%% to 1024. See ranch_listener for more details on limiting the number +%% of connections. +%% +%% Ref can be used to stop the listener later on. +-spec start_listener(any(), non_neg_integer(), module(), any(), module(), any()) + -> {ok, pid()}. +start_listener(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) + when is_integer(NbAcceptors) andalso is_atom(Transport) + andalso is_atom(Protocol) -> + supervisor:start_child(ranch_sup, child_spec(Ref, NbAcceptors, + Transport, TransOpts, Protocol, ProtoOpts)). + +%% @doc Stop a listener identified by Ref. +%% +%% Note that stopping the listener will close all currently running +%% connections abruptly. +-spec stop_listener(any()) -> ok | {error, not_found}. +stop_listener(Ref) -> + case supervisor:terminate_child(ranch_sup, {ranch_listener_sup, Ref}) of + ok -> + supervisor:delete_child(ranch_sup, {ranch_listener_sup, Ref}); + {error, Reason} -> + {error, Reason} + end. + +%% @doc Return a child spec suitable for embedding. +%% +%% When you want to embed Ranch in another application, you can use this +%% function to create a ChildSpec suitable for use in a supervisor. +%% The parameters are the same as in start_listener/6 but rather +%% than hooking the listener to the Ranch internal supervisor, it just returns +%% the spec. +-spec child_spec(any(), non_neg_integer(), module(), any(), module(), any()) + -> supervisor:child_spec(). +child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) + when is_integer(NbAcceptors) andalso is_atom(Transport) + andalso is_atom(Protocol) -> + {{ranch_listener_sup, Ref}, {ranch_listener_sup, start_link, [ + NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts + ]}, permanent, 5000, supervisor, [ranch_listener_sup]}. + +%% @doc Acknowledge the accepted connection. +%% +%% Effectively used to make sure the socket control has been given to +%% the protocol process before starting to use it. +-spec accept_ack(pid()) -> ok. +accept_ack(ListenerPid) -> + receive {shoot, ListenerPid} -> ok end. + +%% @doc Return the current protocol options for the given listener. +-spec get_protocol_options(any()) -> any(). +get_protocol_options(Ref) -> + ListenerPid = ref_to_listener_pid(Ref), + {ok, ProtoOpts} = ranch_listener:get_protocol_options(ListenerPid), + ProtoOpts. + +%% @doc Upgrade the protocol options for the given listener. +%% +%% The upgrade takes place at the acceptor level, meaning that only the +%% newly accepted connections receive the new protocol options. This has +%% no effect on the currently opened connections. +-spec set_protocol_options(any(), any()) -> ok. +set_protocol_options(Ref, ProtoOpts) -> + ListenerPid = ref_to_listener_pid(Ref), + ok = ranch_listener:set_protocol_options(ListenerPid, ProtoOpts). + +%% Internal. + +-spec ref_to_listener_pid(any()) -> pid(). +ref_to_listener_pid(Ref) -> + Children = supervisor:which_children(ranch_sup), + {_, ListenerSupPid, _, _} = lists:keyfind( + {ranch_listener_sup, Ref}, 1, Children), + ListenerSupChildren = supervisor:which_children(ListenerSupPid), + {_, ListenerPid, _, _} = lists:keyfind( + ranch_listener, 1, ListenerSupChildren), + ListenerPid. diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl new file mode 100644 index 0000000..f35b025 --- /dev/null +++ b/src/ranch_acceptor.erl @@ -0,0 +1,57 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(ranch_acceptor). + +-export([start_link/6]). %% API. +-export([acceptor/7]). %% Internal. + +%% API. + +-spec start_link(inet:socket(), module(), module(), any(), + pid(), pid()) -> {ok, pid()}. +start_link(LSocket, Transport, Protocol, Opts, + ListenerPid, ConnsSup) -> + Pid = spawn_link(?MODULE, acceptor, + [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]), + {ok, Pid}. + +%% Internal. + +-spec acceptor(inet:socket(), module(), module(), any(), + non_neg_integer(), pid(), pid()) -> no_return(). +acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ConnsSup) -> + Res = case Transport:accept(LSocket, 2000) of + {ok, CSocket} -> + {ok, Pid} = supervisor:start_child(ConnsSup, + [ListenerPid, CSocket, Transport, Protocol, Opts]), + Transport:controlling_process(CSocket, Pid), + ranch_listener:add_connection(ListenerPid, + default, Pid, OptsVsn); + {error, timeout} -> + ranch_listener:check_upgrades(ListenerPid, OptsVsn); + {error, _Reason} -> + %% @todo Probably do something here. If the socket was closed, + %% we may want to try and listen again on the port? + ok + end, + case Res of + ok -> + ?MODULE:acceptor(LSocket, Transport, Protocol, + Opts, OptsVsn, ListenerPid, ConnsSup); + {upgrade, Opts2, OptsVsn2} -> + ?MODULE:acceptor(LSocket, Transport, Protocol, + Opts2, OptsVsn2, ListenerPid, ConnsSup) + end. diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl new file mode 100644 index 0000000..3b32668 --- /dev/null +++ b/src/ranch_acceptors_sup.erl @@ -0,0 +1,48 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(ranch_acceptors_sup). +-behaviour(supervisor). + +-export([start_link/7]). %% API. +-export([init/1]). %% supervisor. + +%% API. + +-spec start_link(non_neg_integer(), module(), any(), + module(), any(), pid(), pid()) -> {ok, pid()}. +start_link(NbAcceptors, Transport, TransOpts, + Protocol, ProtoOpts, ListenerPid, ConnsPid) -> + supervisor:start_link(?MODULE, [NbAcceptors, Transport, TransOpts, + Protocol, ProtoOpts, ListenerPid, ConnsPid]). + +%% supervisor. + +-spec init([any()]) -> {'ok', {{'one_for_one', 10, 10}, [{ + any(), {atom() | tuple(), atom(), 'undefined' | [any()]}, + 'permanent' | 'temporary' | 'transient', + 'brutal_kill' | 'infinity' | non_neg_integer(), + 'supervisor' | 'worker', + 'dynamic' | [atom() | tuple()]}] +}}. +init([NbAcceptors, Transport, TransOpts, + Protocol, ProtoOpts, ListenerPid, ConnsPid]) -> + {ok, LSocket} = Transport:listen(TransOpts), + Procs = [{{acceptor, self(), N}, {ranch_acceptor, start_link, [ + LSocket, Transport, Protocol, ProtoOpts, + ListenerPid, ConnsPid + ]}, permanent, brutal_kill, worker, []} + || N <- lists:seq(1, NbAcceptors)], + {ok, {{one_for_one, 10, 10}, Procs}}. diff --git a/src/ranch_app.erl b/src/ranch_app.erl new file mode 100644 index 0000000..0dc687c --- /dev/null +++ b/src/ranch_app.erl @@ -0,0 +1,53 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(ranch_app). +-behaviour(application). + +-export([start/2, stop/1, profile_output/0]). %% API. + +-type application_start_type() :: normal + | {takeover, node()} | {failover, node()}. + +%% API. + +-spec start(application_start_type(), any()) -> {ok, pid()}. +start(_Type, _Args) -> + consider_profiling(), + ranch_sup:start_link(). + +-spec stop(any()) -> ok. +stop(_State) -> + ok. + +-spec profile_output() -> ok. +profile_output() -> + eprof:stop_profiling(), + eprof:log("procs.profile"), + eprof:analyze(procs), + eprof:log("total.profile"), + eprof:analyze(total). + +%% Internal. + +-spec consider_profiling() -> profiling | not_profiling. +consider_profiling() -> + case application:get_env(profile) of + {ok, true} -> + {ok, _Pid} = eprof:start(), + eprof:start_profiling([self()]); + _ -> + not_profiling + end. diff --git a/src/ranch_conns_sup.erl b/src/ranch_conns_sup.erl new file mode 100644 index 0000000..41c1919 --- /dev/null +++ b/src/ranch_conns_sup.erl @@ -0,0 +1,44 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(ranch_conns_sup). +-behaviour(supervisor). + +-export([start_link/0, start_protocol/5]). %% API. +-export([init/1]). %% supervisor. + +%% API. + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link(?MODULE, []). + +-spec start_protocol(pid(), inet:socket(), module(), module(), any()) + -> {ok, pid()}. +start_protocol(ListenerPid, Socket, Transport, Protocol, Opts) -> + Protocol:start_link(ListenerPid, Socket, Transport, Opts). + +%% supervisor. + +-spec init([]) -> {'ok', {{'simple_one_for_one', 0, 1}, [{ + any(), {atom() | tuple(), atom(), 'undefined' | [any()]}, + 'permanent' | 'temporary' | 'transient', + 'brutal_kill' | 'infinity' | non_neg_integer(), + 'supervisor' | 'worker', + 'dynamic' | [atom() | tuple()]}] +}}. +init([]) -> + {ok, {{simple_one_for_one, 0, 1}, [{?MODULE, {?MODULE, start_protocol, []}, + temporary, brutal_kill, worker, [?MODULE]}]}}. diff --git a/src/ranch_listener.erl b/src/ranch_listener.erl new file mode 100644 index 0000000..fbcb87a --- /dev/null +++ b/src/ranch_listener.erl @@ -0,0 +1,224 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc Public API for managing listeners. +-module(ranch_listener). +-behaviour(gen_server). + +-export([start_link/2, stop/1, + add_connection/4, move_connection/3, remove_connection/2, check_upgrades/2, + get_protocol_options/1, set_protocol_options/2]). %% API. +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3]). %% gen_server. + +-type pools() :: [{atom(), non_neg_integer()}]. + +-record(state, { + conn_pools = [] :: pools(), + conns_table :: ets:tid(), + queue = undefined :: queue(), + max_conns = undefined :: non_neg_integer(), + proto_opts :: any(), + proto_opts_vsn = 1 :: non_neg_integer() +}). + +%% API. + +%% @private +%% +%% We set the process priority to high because ranch_listener is the central +%% gen_server in Ranch and is used to manage all the incoming connections. +%% Setting the process priority to high ensures the connection-related code +%% will always be executed when a connection needs it, allowing Ranch to +%% scale far beyond what it would with a normal priority. +-spec start_link(non_neg_integer(), any()) -> {ok, pid()}. +start_link(MaxConns, ProtoOpts) -> + gen_server:start_link(?MODULE, [MaxConns, ProtoOpts], + [{spawn_opt, [{priority, high}]}]). + +%% @private +-spec stop(pid()) -> stopped. +stop(ServerPid) -> + gen_server:call(ServerPid, stop). + +%% @doc Add a connection to the given pool in the listener. +%% +%% Pools of connections are used to restrict the maximum number of connections +%% depending on their type. By default, Ranch add all connections to the +%% pool default. It also checks for the maximum number of connections +%% in that pool before accepting again. This function only returns when there +%% is free space in the pool. +%% +%% When a process managing a connection dies, the process is removed from the +%% pool. If the socket has been sent to another process, it is up to the +%% protocol code to inform the listener of the new ConnPid by removing +%% the previous and adding the new one. +%% +%% This function also returns whether the protocol options have been modified. +%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of +%% the atom 'ok'. The acceptor can then continue with the new protocol options. +-spec add_connection(pid(), atom(), pid(), non_neg_integer()) + -> ok | {upgrade, any(), non_neg_integer()}. +add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> + gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, + infinity). + +%% @doc Move a connection from one pool to another. +-spec move_connection(pid(), atom(), pid()) -> ok. +move_connection(ServerPid, DestPool, ConnPid) -> + gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}). + +%% @doc Remove the given connection from its pool. +-spec remove_connection(pid(), pid()) -> ok. +remove_connection(ServerPid, ConnPid) -> + gen_server:cast(ServerPid, {remove_connection, ConnPid}). + +%% @doc Return whether a protocol upgrade is required. +-spec check_upgrades(pid(), non_neg_integer()) + -> ok | {upgrade, any(), non_neg_integer()}. +check_upgrades(ServerPid, OptsVsn) -> + gen_server:call(ServerPid, {check_upgrades, OptsVsn}). + +%% @doc Return the current protocol options. +-spec get_protocol_options(pid()) -> {ok, any()}. +get_protocol_options(ServerPid) -> + gen_server:call(ServerPid, get_protocol_options). + +%% @doc Upgrade the protocol options. +-spec set_protocol_options(pid(), any()) -> ok. +set_protocol_options(ServerPid, ProtoOpts) -> + gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}). + +%% gen_server. + +%% @private +-spec init(list()) -> {ok, #state{}}. +init([MaxConns, ProtoOpts]) -> + ConnsTable = ets:new(connections_table, [set, private]), + Queue = queue:new(), + {ok, #state{conns_table=ConnsTable, max_conns=MaxConns, + proto_opts=ProtoOpts, queue=Queue}}. + +%% @private +-spec handle_call(_, _, State) + -> {reply, ignored, State} | {stop, normal, stopped, State}. +handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ + conn_pools=Pools, conns_table=ConnsTable, + queue=Queue, max_conns=MaxConns, + proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> + {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ConnsTable), + State2 = State#state{conn_pools=Pools2}, + if AccOptsVsn =/= LisOptsVsn -> + {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; + NbConns > MaxConns -> + Queue2 = queue:in(From, Queue), + {noreply, State2#state{queue=Queue2}}; + true -> + {reply, ok, State2} + end; +handle_call({check_upgrades, AccOptsVsn}, _From, State=#state{ + proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> + if AccOptsVsn =/= LisOptsVsn -> + {reply, {upgrade, ProtoOpts, LisOptsVsn}, State}; + true -> + {reply, ok, State} + end; +handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> + {reply, {ok, ProtoOpts}, State}; +handle_call({set_protocol_options, ProtoOpts}, _From, + State=#state{proto_opts_vsn=OptsVsn}) -> + {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}}; +handle_call(stop, _From, State) -> + {stop, normal, stopped, State}; +handle_call(_, _From, State) -> + {reply, ignored, State}. + +%% @private +-spec handle_cast(_, State) -> {noreply, State}. +handle_cast({move_connection, DestPool, ConnPid}, State=#state{ + conn_pools=Pools, conns_table=ConnsTable}) -> + Pools2 = move_pid(ConnPid, DestPool, Pools, ConnsTable), + {noreply, State#state{conn_pools=Pools2}}; +handle_cast({remove_connection, ConnPid}, State=#state{ + conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> + {Pools2, Queue2} = remove_pid(ConnPid, Pools, ConnsTable, Queue), + {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%% @private +-spec handle_info(_, State) -> {noreply, State}. +handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{ + conn_pools=Pools, conns_table=ConnsTable, queue=Queue}) -> + {Pools2, Queue2} = remove_pid(Pid, Pools, ConnsTable, Queue), + {noreply, State#state{conn_pools=Pools2, queue=Queue2}}; +handle_info(_Info, State) -> + {noreply, State}. + +%% @private +-spec terminate(_, _) -> ok. +terminate(_Reason, _State) -> + ok. + +%% @private +-spec code_change(_, State, _) -> {ok, State}. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% Internal. + +%% @private +-spec add_pid(pid(), atom(), pools(), ets:tid()) + -> {non_neg_integer(), pools()}. +add_pid(ConnPid, Pool, Pools, ConnsTable) -> + MonitorRef = erlang:monitor(process, ConnPid), + ConnPid ! {shoot, self()}, + {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of + false -> + {1, [{Pool, 1}|Pools]}; + {Pool, NbConns} -> + NbConns2 = NbConns + 1, + {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} + end, + ets:insert(ConnsTable, {ConnPid, {MonitorRef, Pool}}), + {NbConnsRet, Pools2}. + +%% @private +-spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools(). +move_pid(ConnPid, DestPool, Pools, ConnsTable) -> + {MonitorRef, SrcPool} = ets:lookup_element(ConnsTable, ConnPid, 2), + ets:insert(ConnsTable, {ConnPid, {MonitorRef, DestPool}}), + {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), + DestNbConns = case lists:keyfind(DestPool, 1, Pools) of + false -> 1; + {DestPool, NbConns} -> NbConns + 1 + end, + Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), + [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2]. + +%% @private +-spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}. +remove_pid(Pid, Pools, ConnsTable, Queue) -> + {MonitorRef, Pool} = ets:lookup_element(ConnsTable, Pid, 2), + erlang:demonitor(MonitorRef, [flush]), + {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), + Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], + ets:delete(ConnsTable, Pid), + case queue:out(Queue) of + {{value, Client}, Queue2} -> + gen_server:reply(Client, ok), + {Pools2, Queue2}; + _ -> + {Pools2, Queue} + end. diff --git a/src/ranch_listener_sup.erl b/src/ranch_listener_sup.erl new file mode 100644 index 0000000..c33194f --- /dev/null +++ b/src/ranch_listener_sup.erl @@ -0,0 +1,46 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(ranch_listener_sup). +-behaviour(supervisor). + +-export([start_link/5]). %% API. +-export([init/1]). %% supervisor. + +%% API. + +-spec start_link(non_neg_integer(), module(), any(), module(), any()) + -> {ok, pid()}. +start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> + MaxConns = proplists:get_value(max_connections, TransOpts, 1024), + {ok, SupPid} = supervisor:start_link(?MODULE, []), + {ok, ListenerPid} = supervisor:start_child(SupPid, + {ranch_listener, {ranch_listener, start_link, [MaxConns, ProtoOpts]}, + permanent, 5000, worker, [ranch_listener]}), + {ok, ConnsPid} = supervisor:start_child(SupPid, + {ranch_conns_sup, {ranch_conns_sup, start_link, []}, + permanent, 5000, supervisor, [ranch_conns_sup]}), + {ok, _PoolPid} = supervisor:start_child(SupPid, + {ranch_acceptors_sup, {ranch_acceptors_sup, start_link, [ + NbAcceptors, Transport, TransOpts, + Protocol, ProtoOpts, ListenerPid, ConnsPid + ]}, permanent, 5000, supervisor, [ranch_acceptors_sup]}), + {ok, SupPid}. + +%% supervisor. + +-spec init([]) -> {ok, {{one_for_all, 10, 10}, []}}. +init([]) -> + {ok, {{one_for_all, 10, 10}, []}}. diff --git a/src/ranch_ssl.erl b/src/ranch_ssl.erl new file mode 100644 index 0000000..10cba53 --- /dev/null +++ b/src/ranch_ssl.erl @@ -0,0 +1,170 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc SSL transport API. +%% +%% Wrapper around ssl implementing the Ranch transport API. +%% +%% This transport requires the crypto, public_key +%% and ssl applications to be started. If they aren't started, +%% it will try to start them itself before opening a port to listen. +%% Applications aren't stopped when the listening socket is closed, though. +%% +%% @see ssl +-module(ranch_ssl). +-export([name/0, messages/0, listen/1, accept/2, recv/3, send/2, setopts/2, + controlling_process/2, peername/1, close/1, sockname/1]). + +%% @doc Name of this transport API, ssl. +-spec name() -> ssl. +name() -> ssl. + +%% @doc Atoms used in the process messages sent by this API. +%% +%% They identify incoming data, closed connection and errors when receiving +%% data in active mode. +-spec messages() -> {ssl, ssl_closed, ssl_error}. +messages() -> {ssl, ssl_closed, ssl_error}. + +%% @doc Setup a socket to listen on the given port on the local host. +%% +%% The available options are: +%%
+%%
port
Mandatory. TCP port number to open.
+%%
backlog
Maximum length of the pending connections queue. +%% Defaults to 1024.
+%%
ip
Interface to listen on. Listen on all interfaces +%% by default.
+%%
certfile
Mandatory. Path to a file containing the user's +%% certificate.
+%%
keyfile
Optional. Path to the file containing the user's +%% private PEM encoded key.
+%%
cacertfile
Optional. Path to file containing PEM encoded +%% CA certificates (trusted certificates used for verifying a peer +%% certificate).
+%%
password
Optional. String containing the user's password. +%% All private keyfiles must be password protected currently.
+%%
ciphers
Optional. The cipher suites that should be supported. +%% The function ssl:cipher_suites/0 can be used to find all available +%% ciphers.
+%%
+%% +%% @see ssl:listen/2 +-spec listen([{port, inet:port_number()} | {certfile, string()} + | {keyfile, string()} | {password, string()} + | {cacertfile, string()} | {ip, inet:ip_address()}]) + -> {ok, ssl:sslsocket()} | {error, atom()}. +listen(Opts) -> + require([crypto, public_key, ssl]), + {port, Port} = lists:keyfind(port, 1, Opts), + Backlog = proplists:get_value(backlog, Opts, 1024), + {certfile, CertFile} = lists:keyfind(certfile, 1, Opts), + + ListenOpts0 = [binary, {active, false}, + {backlog, Backlog}, {packet, raw}, {reuseaddr, true}, + {certfile, CertFile}], + ListenOpts = lists:foldl(fun + ({ip, _} = Ip, Acc) -> [Ip | Acc]; + ({keyfile, _} = KeyFile, Acc) -> [KeyFile | Acc]; + ({cacertfile, _} = CACertFile, Acc) -> [CACertFile | Acc]; + ({password, _} = Password, Acc) -> [Password | Acc]; + ({ciphers, _} = Ciphers, Acc) -> [Ciphers | Acc]; + (_, Acc) -> Acc + end, ListenOpts0, Opts), + ssl:listen(Port, ListenOpts). + +%% @doc Accept an incoming connection on a listen socket. +%% +%% Note that this function does both the transport accept and +%% the SSL handshake. +%% +%% @see ssl:transport_accept/2 +%% @see ssl:ssl_accept/2 +-spec accept(ssl:sslsocket(), timeout()) + -> {ok, ssl:sslsocket()} | {error, closed | timeout | atom()}. +accept(LSocket, Timeout) -> + case ssl:transport_accept(LSocket, Timeout) of + {ok, CSocket} -> + ssl_accept(CSocket, Timeout); + {error, Reason} -> + {error, Reason} + end. + +%% @doc Receive a packet from a socket in passive mode. +%% @see ssl:recv/3 +-spec recv(ssl:sslsocket(), non_neg_integer(), timeout()) + -> {ok, any()} | {error, closed | atom()}. +recv(Socket, Length, Timeout) -> + ssl:recv(Socket, Length, Timeout). + +%% @doc Send a packet on a socket. +%% @see ssl:send/2 +-spec send(ssl:sslsocket(), iolist()) -> ok | {error, atom()}. +send(Socket, Packet) -> + ssl:send(Socket, Packet). + +%% @doc Set one or more options for a socket. +%% @see ssl:setopts/2 +-spec setopts(ssl:sslsocket(), list()) -> ok | {error, atom()}. +setopts(Socket, Opts) -> + ssl:setopts(Socket, Opts). + +%% @doc Assign a new controlling process Pid to Socket. +%% @see ssl:controlling_process/2 +-spec controlling_process(ssl:sslsocket(), pid()) + -> ok | {error, closed | not_owner | atom()}. +controlling_process(Socket, Pid) -> + ssl:controlling_process(Socket, Pid). + +%% @doc Return the address and port for the other end of a connection. +%% @see ssl:peername/1 +-spec peername(ssl:sslsocket()) + -> {ok, {inet:ip_address(), inet:port_number()}} | {error, atom()}. +peername(Socket) -> + ssl:peername(Socket). + +%% @doc Close a TCP socket. +%% @see ssl:close/1 +-spec close(ssl:sslsocket()) -> ok. +close(Socket) -> + ssl:close(Socket). + +%% @doc Get the local address and port of a socket +%% @see ssl:sockname/1 +-spec sockname(ssl:sslsocket()) + -> {ok, {inet:ip_address(), inet:port_number()}} | {error, atom()}. +sockname(Socket) -> + ssl:sockname(Socket). + +%% Internal. + +-spec require(list(module())) -> ok. +require([]) -> + ok; +require([App|Tail]) -> + case application:start(App) of + ok -> ok; + {error, {already_started, App}} -> ok + end, + require(Tail). + +-spec ssl_accept(ssl:sslsocket(), timeout()) + -> {ok, ssl:sslsocket()} | {error, closed | timeout | atom()}. +ssl_accept(Socket, Timeout) -> + case ssl:ssl_accept(Socket, Timeout) of + ok -> + {ok, Socket}; + {error, Reason} -> + {error, Reason} + end. diff --git a/src/ranch_sup.erl b/src/ranch_sup.erl new file mode 100644 index 0000000..942522d --- /dev/null +++ b/src/ranch_sup.erl @@ -0,0 +1,40 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(ranch_sup). +-behaviour(supervisor). + +-export([start_link/0]). %% API. +-export([init/1]). %% supervisor. + +-define(SUPERVISOR, ?MODULE). + +%% API. + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). + +%% supervisor. + +-spec init([]) -> {'ok', {{'one_for_one', 10, 10}, [{ + any(), {atom() | tuple(), atom(), 'undefined' | [any()]}, + 'permanent' | 'temporary' | 'transient', + 'brutal_kill' | 'infinity' | non_neg_integer(), + 'supervisor' | 'worker', + 'dynamic' | [atom() | tuple()]}] +}}. +init([]) -> + {ok, {{one_for_one, 10, 10}, []}}. diff --git a/src/ranch_tcp.erl b/src/ranch_tcp.erl new file mode 100644 index 0000000..5c2a61d --- /dev/null +++ b/src/ranch_tcp.erl @@ -0,0 +1,113 @@ +%% Copyright (c) 2011-2012, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc TCP transport API. +%% +%% Wrapper around gen_tcp implementing the Ranch transport API. +%% +%% @see gen_tcp +-module(ranch_tcp). + +-export([name/0, messages/0, listen/1, accept/2, recv/3, send/2, setopts/2, + controlling_process/2, peername/1, close/1, sockname/1]). + +%% @doc Name of this transport API, tcp. +-spec name() -> tcp. +name() -> tcp. + +%% @doc Atoms used in the process messages sent by this API. +%% +%% They identify incoming data, closed connection and errors when receiving +%% data in active mode. +-spec messages() -> {tcp, tcp_closed, tcp_error}. +messages() -> {tcp, tcp_closed, tcp_error}. + +%% @doc Setup a socket to listen on the given port on the local host. +%% +%% The available options are: +%%
+%%
port
Mandatory. TCP port number to open.
+%%
backlog
Maximum length of the pending connections queue. +%% Defaults to 1024.
+%%
ip
Interface to listen on. Listen on all interfaces +%% by default.
+%%
+%% +%% @see gen_tcp:listen/2 +-spec listen([{port, inet:port_number()} | {ip, inet:ip_address()}]) + -> {ok, inet:socket()} | {error, atom()}. +listen(Opts) -> + {port, Port} = lists:keyfind(port, 1, Opts), + Backlog = proplists:get_value(backlog, Opts, 1024), + ListenOpts0 = [binary, {active, false}, + {backlog, Backlog}, {packet, raw}, {reuseaddr, true}], + ListenOpts = + case lists:keyfind(ip, 1, Opts) of + false -> ListenOpts0; + Ip -> [Ip|ListenOpts0] + end, + gen_tcp:listen(Port, ListenOpts). + +%% @doc Accept an incoming connection on a listen socket. +%% @see gen_tcp:accept/2 +-spec accept(inet:socket(), timeout()) + -> {ok, inet:socket()} | {error, closed | timeout | atom()}. +accept(LSocket, Timeout) -> + gen_tcp:accept(LSocket, Timeout). + +%% @doc Receive a packet from a socket in passive mode. +%% @see gen_tcp:recv/3 +-spec recv(inet:socket(), non_neg_integer(), timeout()) + -> {ok, any()} | {error, closed | atom()}. +recv(Socket, Length, Timeout) -> + gen_tcp:recv(Socket, Length, Timeout). + +%% @doc Send a packet on a socket. +%% @see gen_tcp:send/2 +-spec send(inet:socket(), iolist()) -> ok | {error, atom()}. +send(Socket, Packet) -> + gen_tcp:send(Socket, Packet). + +%% @doc Set one or more options for a socket. +%% @see inet:setopts/2 +-spec setopts(inet:socket(), list()) -> ok | {error, atom()}. +setopts(Socket, Opts) -> + inet:setopts(Socket, Opts). + +%% @doc Assign a new controlling process Pid to Socket. +%% @see gen_tcp:controlling_process/2 +-spec controlling_process(inet:socket(), pid()) + -> ok | {error, closed | not_owner | atom()}. +controlling_process(Socket, Pid) -> + gen_tcp:controlling_process(Socket, Pid). + +%% @doc Return the address and port for the other end of a connection. +%% @see inet:peername/1 +-spec peername(inet:socket()) + -> {ok, {inet:ip_address(), inet:port_number()}} | {error, atom()}. +peername(Socket) -> + inet:peername(Socket). + +%% @doc Close a TCP socket. +%% @see gen_tcp:close/1 +-spec close(inet:socket()) -> ok. +close(Socket) -> + gen_tcp:close(Socket). + +%% @doc Get the local address and port of a socket +%% @see inet:sockname/1 +-spec sockname(inet:socket()) + -> {ok, {inet:ip_address(), inet:port_number()}} | {error, atom()}. +sockname(Socket) -> + inet:sockname(Socket). -- cgit v1.2.3