From 4abd27adb55f8c2d2f29629c5d4645b126481252 Mon Sep 17 00:00:00 2001 From: juhlig Date: Fri, 10 May 2019 15:50:13 +0200 Subject: Add experimental num_listen_sockets option --- src/ranch.erl | 1 + src/ranch_acceptors_sup.erl | 46 +++++++++++++++----- test/acceptor_SUITE.erl | 104 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 138 insertions(+), 13 deletions(-) diff --git a/src/ranch.erl b/src/ranch.erl index 4198997..fc6a3c8 100644 --- a/src/ranch.erl +++ b/src/ranch.erl @@ -53,6 +53,7 @@ logger => module(), num_acceptors => pos_integer(), num_conns_sups => pos_integer(), + num_listen_sockets => pos_integer(), shutdown => timeout() | brutal_kill, socket_opts => any() }. diff --git a/src/ranch_acceptors_sup.erl b/src/ranch_acceptors_sup.erl index f96b19b..8da93e3 100644 --- a/src/ranch_acceptors_sup.erl +++ b/src/ranch_acceptors_sup.erl @@ -26,27 +26,51 @@ start_link(Ref, NumAcceptors, Transport) -> init([Ref, NumAcceptors, Transport]) -> TransOpts = ranch_server:get_transport_options(Ref), Logger = maps:get(logger, TransOpts, error_logger), + NumListenSockets = maps:get(num_listen_sockets, TransOpts, 1), SocketOpts = maps:get(socket_opts, TransOpts, []), %% We temporarily put the logger in the process dictionary %% so that it can be used from ranch:filter_options. The %% interface as it currently is does not allow passing it %% down otherwise. put(logger, Logger), - LSocket = case Transport:listen(SocketOpts) of + LSockets = start_listen_sockets(Ref, NumListenSockets, Transport, SocketOpts, Logger), + erase(logger), + Procs = [begin + LSocketId = (AcceptorId rem NumListenSockets) + 1, + {_, LSocket} = lists:keyfind(LSocketId, 1, LSockets), + { + {acceptor, self(), AcceptorId}, + {ranch_acceptor, start_link, [Ref, AcceptorId, LSocket, Transport, Logger]}, + permanent, brutal_kill, worker, [] + } + end || AcceptorId <- lists:seq(1, NumAcceptors)], + {ok, {{one_for_one, 1, 5}, Procs}}. + +-spec start_listen_sockets(any(), pos_integer(), module(), list(), module()) + -> [{pos_integer(), inet:socket()}]. +start_listen_sockets(Ref, NumListenSockets, Transport, SocketOpts0, Logger) when NumListenSockets > 0 -> + BaseSocket = start_listen_socket(Ref, Transport, SocketOpts0, Logger), + {ok, Addr={_, Port}} = Transport:sockname(BaseSocket), + ranch_server:set_addr(Ref, Addr), + SocketOpts = case lists:keyfind(port, 1, SocketOpts0) of + {port, Port} -> + SocketOpts0; + _ -> + [{port, Port}|lists:keydelete(port, 1, SocketOpts0)] + end, + ExtraSockets = [ + {N, start_listen_socket(Ref, Transport, SocketOpts, Logger)} + || N <- lists:seq(2, NumListenSockets)], + [{1, BaseSocket}|ExtraSockets]. + +-spec start_listen_socket(any(), module(), list(), module()) -> inet:socket(). +start_listen_socket(Ref, Transport, SocketOpts, Logger) -> + case Transport:listen(SocketOpts) of {ok, Socket} -> - erase(logger), Socket; {error, Reason} -> listen_error(Ref, Transport, SocketOpts, Reason, Logger) - end, - {ok, Addr} = Transport:sockname(LSocket), - ranch_server:set_addr(Ref, Addr), - Procs = [ - {{acceptor, self(), N}, {ranch_acceptor, start_link, [ - Ref, N, LSocket, Transport, Logger - ]}, permanent, brutal_kill, worker, []} - || N <- lists:seq(1, NumAcceptors)], - {ok, {{one_for_one, 1, 5}, Procs}}. + end. -spec listen_error(any(), module(), any(), atom(), module()) -> no_return(). listen_error(Ref, Transport, SocketOpts0, Reason, Logger) -> diff --git a/test/acceptor_SUITE.erl b/test/acceptor_SUITE.erl index 0b59855..41939b0 100644 --- a/test/acceptor_SUITE.erl +++ b/test/acceptor_SUITE.erl @@ -41,6 +41,8 @@ groups() -> tcp_getopts_capability, tcp_getstat_capability, tcp_upgrade, + tcp_10_acceptors_10_listen_sockets, + tcp_many_listen_sockets_no_reuseport, tcp_error_eaddrinuse, tcp_error_eacces ]}, {ssl, [ @@ -53,6 +55,8 @@ groups() -> ssl_upgrade_from_tcp, ssl_getopts_capability, ssl_getstat_capability, + ssl_10_acceptors_10_listen_sockets, + ssl_many_listen_sockets_no_reuseport, ssl_error_eaddrinuse, ssl_error_no_cert, ssl_error_eacces @@ -399,6 +403,50 @@ ssl_accept_error(_) -> true = is_process_alive(AcceptorPid), ok = ranch:stop_listener(Name). +ssl_10_acceptors_10_listen_sockets(_) -> + case do_os_supports_reuseport() of + true -> + ok = do_ssl_10_acceptors_10_listen_sockets(); + false -> + {skip, "No SO_REUSEPORT support."} + end. + +do_ssl_10_acceptors_10_listen_sockets() -> + doc("Ensure that we can use 10 listen sockets across 10 acceptors with SSL."), + Name = name(), + Opts = ct_helper:get_certs_from_ets(), + {ok, ListenerSupPid} = ranch:start_listener(Name, + ranch_ssl, #{ + num_acceptors => 10, + num_listen_sockets => 10, + socket_opts => [{raw, 1, 15, <<1:32/native>>}|Opts]}, + echo_protocol, []), + 10 = length(do_get_listener_sockets(ListenerSupPid)), + ok = ranch:stop_listener(Name), + {'EXIT', _} = begin catch ranch:get_port(Name) end, + ok. + +ssl_many_listen_sockets_no_reuseport(_) -> + case do_os_supports_reuseport() of + true -> + ok = do_ssl_many_listen_sockets_no_reuseport(); + false -> + {skip, "No SO_REUSEPORT support."} + end. + +do_ssl_many_listen_sockets_no_reuseport() -> + doc("Confirm that ranch:start_listener/5 fails when SO_REUSEPORT is not available with SSL."), + Name = name(), + Opts = ct_helper:get_certs_from_ets(), + {error, eaddrinuse} = ranch:start_listener(Name, + ranch_ssl, #{ + num_acceptors => 10, + num_listen_sockets => 10, + socket_opts => [{raw, 1, 15, <<0:32/native>>}|Opts]}, + echo_protocol, []), + {'EXIT', _} = begin catch ranch:get_port(Name) end, + ok. + ssl_active_echo(_) -> doc("Ensure that active mode works with SSL transport."), Name = name(), @@ -622,6 +670,48 @@ ssl_error_eacces(_) -> %% tcp. +tcp_10_acceptors_10_listen_sockets(_) -> + case do_os_supports_reuseport() of + true -> + ok = do_tcp_10_acceptors_10_listen_sockets(); + false -> + {skip, "No SO_REUSEPORT support."} + end. + +do_tcp_10_acceptors_10_listen_sockets() -> + doc("Ensure that we can use 10 listen sockets across 10 acceptors with TCP."), + Name = name(), + {ok, ListenerSupPid} = ranch:start_listener(Name, + ranch_tcp, #{ + num_acceptors => 10, + num_listen_sockets => 10, + socket_opts => [{raw, 1, 15, <<1:32/native>>}]}, + echo_protocol, []), + 10 = length(do_get_listener_sockets(ListenerSupPid)), + ok = ranch:stop_listener(Name), + {'EXIT', _} = begin catch ranch:get_port(Name) end, + ok. + +tcp_many_listen_sockets_no_reuseport(_) -> + case do_os_supports_reuseport() of + true -> + ok = do_tcp_many_listen_sockets_no_reuseport(); + false -> + {skip, "No SO_REUSEPORT support."} + end. + +do_tcp_many_listen_sockets_no_reuseport() -> + doc("Confirm that ranch:start_listener/5 fails when SO_REUSEPORT is not available with TCP."), + Name = name(), + {error, eaddrinuse} = ranch:start_listener(Name, + ranch_tcp, #{ + num_acceptors => 10, + num_listen_sockets => 10, + socket_opts => [{raw, 1, 15, <<0:32/native>>}]}, + echo_protocol, []), + {'EXIT', _} = begin catch ranch:get_port(Name) end, + ok. + tcp_active_echo(_) -> doc("Ensure that active mode works with TCP transport."), Name = name(), @@ -1246,11 +1336,14 @@ clean_traces() -> end. do_get_listener_socket(ListenerSupPid) -> + [LSocket] = do_get_listener_sockets(ListenerSupPid), + LSocket. + +do_get_listener_sockets(ListenerSupPid) -> [AcceptorsSupPid] = [Pid || {ranch_acceptors_sup, Pid, supervisor, _} <- supervisor:which_children(ListenerSupPid)], {links, Links} = erlang:process_info(AcceptorsSupPid, links), - [LSocket] = [P || P <- Links, is_port(P)], - LSocket. + [P || P <- Links, is_port(P)]. do_conns_which_children(Name) -> Conns = [supervisor:which_children(ConnsSup) || @@ -1273,3 +1366,10 @@ do_conns_count_children(Name) -> [supervisor:count_children(ConnsSup) || {_, ConnsSup} <- ranch_server:get_connections_sups(Name)] ). + +do_os_supports_reuseport() -> + case {os:type(), os:version()} of + {{unix, linux}, {Major, _, _}} when Major > 3 -> true; + {{unix, linux}, {3, Minor, _}} when Minor >= 9 -> true; + _ -> false + end. -- cgit v1.2.3