From 017565203f40860d24b80a54136a160aee460dbe Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 16 Jul 2018 18:21:48 +0200 Subject: [socket-nif] Add support for multiple acceptor processes Its now possible to have multiple (simultaneous) acceptor processes for the same listening socket. OTP-14831 --- lib/kernel/test/socket_server.erl | 313 ++++++++++++++++++++++---------------- 1 file changed, 184 insertions(+), 129 deletions(-) (limited to 'lib/kernel') diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index d9bbf00e85..8a77b9b3c9 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -28,10 +28,12 @@ -define(LIB, socket_lib). --record(manager, {peek, acceptor, handler_id, handlers}). --record(acceptor, {socket, manager}). +-record(manager, {socket, peek, acceptors, handler_id, handlers}). +-record(acceptor, {id, socket, manager}). -record(handler, {socket, peek, type, manager}). +-define(NUM_ACCEPTORS, 5). + start() -> start_tcp(). @@ -83,17 +85,13 @@ manager_reply(Pid, Ref, Reply) -> manager_init(Domain, stream = Type, Proto, Peek) -> put(sname, "manager"), - i("try start acceptor"), - case acceptor_start(Domain, Type, Proto) of - {ok, {Pid, MRef}} -> - i("acceptor started"), - manager_loop(#manager{peek = Peek, - acceptor = {Pid, MRef}, - handler_id = 1, - handlers = []}); - {error, Reason} -> - exit({failed_starting_acceptor, Reason}) - end; + i("try start acceptor(s)"), + {Sock, Acceptors} = manager_stream_init(Domain, Type, Proto), + manager_loop(#manager{socket = Sock, + peek = Peek, + acceptors = Acceptors, + handler_id = 1, + handlers = []}); manager_init(Domain, dgram = Type, Proto, Peek) -> put(sname, "manager"), i("try open socket"), @@ -142,7 +140,7 @@ manager_init(Domain, dgram = Type, Proto, Peek) -> handler_continue(Pid), manager_loop(#manager{peek = Peek, handler_id = 2, % Just in case - handlers = [{Pid, MRef, 1}]}); + handlers = [{1, Pid, MRef}]}); {error, SReason} -> e("Failed starting handler: " "~n ~p", [SReason]), @@ -155,35 +153,142 @@ manager_init(Domain, dgram = Type, Proto, Peek) -> end. +manager_stream_init(Domain, Type, Proto) -> + i("try (socket) open"), + Sock = case socket:open(Domain, Type, Proto) of + {ok, S} -> + S; + {error, OReason} -> + throw({open, OReason}) + end, + F = fun(X) -> case socket:getopt(Sock, socket, X) of + {ok, V} -> f("~p", [V]); + {error, R} -> f("error: ~p", [R]) + end + end, + i("(socket) open (~s,~s,~s): " + "~n debug: ~s" + "~n prio: ~s" + "~n => try find (local) address", + [F(domain), F(type), F(protocol), F(debug), F(priority)]), + Addr = which_addr(Domain), + SA = #{family => Domain, + addr => Addr}, + i("found: " + "~n ~p" + "~n => try (socket) bind", [Addr]), + %% ok = socket:setopt(Sock, otp, debug, true), + %% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!! + Port = case socket:bind(Sock, SA) of + {ok, P} -> + %% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!! + %% ok = socket:setopt(Sock, otp, debug, false), + P; + {error, BReason} -> + throw({bind, BReason}) + end, + i("bound to: " + "~n ~p" + "~n => try (socket) listen (acceptconn: ~s)", + [Port, F(acceptconn)]), + case socket:listen(Sock) of + ok -> + i("listening (acceptconn: ~s)", + [F(acceptconn)]), + manager_stream_init(Sock, 1, ?NUM_ACCEPTORS, []); + {error, LReason} -> + throw({listen, LReason}) + end. + +which_addr(Domain) -> + Iflist = case inet:getifaddrs() of + {ok, IFL} -> + IFL; + {error, Reason} -> + throw({inet,getifaddrs,Reason}) + end, + which_addr(Domain, Iflist). + +which_addr(_Domain, []) -> + throw(no_address); +which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> + which_addr2(Domain, IFO); +which_addr(Domain, [_|IFL]) -> + which_addr(Domain, IFL). + +which_addr2(_, []) -> + throw(no_address); +which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> + Addr; +which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> + Addr; +which_addr2(Domain, [_|IFO]) -> + which_addr2(Domain, IFO). + + +manager_stream_init(Sock, ID, NumAcceptors, Acc) + when (NumAcceptors > 0) -> + i("try start acceptor"), + case acceptor_start(Sock, ID) of + {ok, {Pid, MRef}} -> + i("acceptor ~w (~p) started", [ID, Pid]), + ?LIB:sleep(5000), + manager_stream_init(Sock, ID+1, NumAcceptors-1, + [{ID, Pid, MRef}|Acc]); + {error, Reason} -> + exit({failed_starting_acceptor, Reason}) + end; +manager_stream_init(Sock, _ID, 0, Acc) -> + %% Req = {kill_acceptor, length(Acc)}, % Last in the queue + %% Req = {kill_acceptor, 3}, % In the "middle" of the queue + %% Req = {kill_acceptor, 2}, % The first in the queue + %% Req = {kill_acceptor, 1}, % Current acceptor + %% Msg = {manager, self(), make_ref(), Req}, + %% erlang:send_after(timer:seconds(10), self(), Msg), + {Sock, lists:reverse(Acc)}. + + manager_loop(M) -> receive {'DOWN', MRef, process, Pid, Reason} -> M2 = manager_handle_down(M, MRef, Pid, Reason), manager_loop(M2); - + {manager, Pid, Ref, Request} -> M2 = manager_handle_request(M, Pid, Ref, Request), manager_loop(M2) end. -manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) - when (Reason =/= normal) -> - e("acceptor died: " - "~n ~p", [Reason]), - exit({acceptor_died, Reason}); -manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) -> - exit(Reason); -manager_handle_down(#manager{handlers = Handlers} = M, _MRef, Pid, Reason) -> - if - (Reason =/= normal) -> - e("handler ~p died: " - "~n ~p", [Pid, Reason]); - true -> - i("handler ~p terminated", [Pid]) - end, - Handlers2 = lists:keydelete(Pid, 1, Handlers), - M#manager{handlers = Handlers2}. +manager_handle_down(#manager{acceptors = Acceptors, + handlers = Handlers} = M, MRef, Pid, Reason) -> + case lists:keysearch(Pid, 2, Acceptors) of + {value, {ID, Pid, MRef}} when (Reason =:= normal) -> + i("acceptor ~w exited (normally)", [ID]), + case lists:keydelete(Pid, 2, Acceptors) of + [] -> + %% We are done + i("the last acceptor - we are done"), + exit(normal); + Acceptors2 -> + M#manager{acceptors = Acceptors2} + end; + {value, {ID, Pid, MRef}} -> + e("acceptor ~w crashed: " + "~n ~p", [ID, Reason]), + exit({acceptor_died, Reason}); + + false -> %% handler! + if + (Reason =/= normal) -> + e("handler ~p died: " + "~n ~p", [Pid, Reason]); + true -> + i("handler ~p terminated", [Pid]) + end, + Handlers2 = lists:keydelete(Pid, 2, Handlers), + M#manager{handlers = Handlers2} + end. manager_handle_request(#manager{peek = Peek, @@ -196,7 +301,7 @@ manager_handle_request(#manager{peek = Peek, i("handler ~w started", [HID]), manager_reply(Pid, Ref, {ok, HPid}), M#manager{handler_id = HID+1, - handlers = [{HPid, HMRef, HID}|Handlers]}; + handlers = [{HID, HPid, HMRef}|Handlers]}; {error, Reason} = ERROR -> e("Failed starting new handler: " "~n Sock: ~p" @@ -204,21 +309,50 @@ manager_handle_request(#manager{peek = Peek, manager_reply(Pid, Ref, ERROR), M end; -manager_handle_request(#manager{acceptor = {Pid, MRef}, - handlers = Handlers}, Pid, Ref, +manager_handle_request(#manager{socket = Sock, + acceptors = [{AID, APid, AMRef}]} = M, _Pid, _Ref, + {kill_acceptor, AID}) -> + i("try kill (only remeining) acceptor ~w", [AID]), + socket:setopt(Sock, otp, debug, true), + manager_stop_acceptor(APid, AMRef, AID, kill), + M#manager{acceptors = []}; +manager_handle_request(#manager{socket = Sock, + acceptors = Acceptors} = M, _Pid, _Ref, + {kill_acceptor, AID}) -> + i("try kill acceptor ~w", [AID]), + case lists:keysearch(AID, 1, Acceptors) of + {value, {AID, APid, AMRef}} -> + socket:setopt(Sock, otp, debug, true), + manager_stop_acceptor(APid, AMRef, AID, kill), + Acceptors2 = lists:keydelete(AID, 1, Acceptors), + M#manager{acceptors = Acceptors2}; + false -> + e("no such acceptor"), + M + end; +manager_handle_request(#manager{acceptors = Acceptors, + handlers = Handlers}, Pid, Ref, {stop, Reason}) -> i("stop"), manager_reply(Pid, Ref, ok), manager_stop_handlers(Handlers, Reason), - i("try stop acceptor ~p: ~p", [Pid, Reason]), - erlang:demonitor(MRef, [flush]), - acceptor_stop(Pid, Reason), - i("stop", []), + manager_stop_acceptors(Acceptors, Reason), + i("stopped", []), exit(Reason). +manager_stop_acceptors(Acceptors, Reason) -> + lists:foreach(fun({ID,P,M}) -> + manager_stop_acceptor(P, M, ID, Reason) + end, Acceptors). + +manager_stop_acceptor(Pid, MRef, ID, Reason) -> + i("try stop acceptor ~w (~p): ~p", [ID, Pid, Reason]), + erlang:demonitor(MRef, [flush]), + acceptor_stop(Pid, Reason), + ok. manager_stop_handlers(Handlers, Reason) -> - lists:foreach(fun({P,M,ID}) -> + lists:foreach(fun({ID,P,M}) -> manager_stop_handler(P, M, ID, Reason) end, Handlers). @@ -232,10 +366,10 @@ manager_stop_handler(Pid, MRef, ID, Reason) -> %% ========================================================================= -acceptor_start(Domain, Type, Proto) -> +acceptor_start(Sock, ID) -> Self = self(), A = {Pid, _} = spawn_monitor(fun() -> - acceptor_init(Self, Domain, Type, Proto) + acceptor_init(Self, Sock, ID) end), receive {acceptor, Pid, ok} -> @@ -258,93 +392,12 @@ acceptor_stop(Pid, _Reason) -> %% reply(acceptor, Pid, Ref, Reply). -acceptor_init(Manager, Domain, Type, Proto) -> - put(sname, "acceptor"), - try acceptor_do_init(Domain, Type, Proto) of - Sock -> - Manager ! {acceptor, self(), ok}, - acceptor_loop(#acceptor{manager = Manager, - socket = Sock}) - catch - throw:E:P -> - e("Failed initiate: " - "~n Error: ~p" - "~n Path: ~p", [E, P]), - Manager ! {acceptor, self(), {error, {catched, E, P}}} - end. - -acceptor_do_init(Domain, Type, Proto) -> - i("try (socket) open"), - Sock = case socket:open(Domain, Type, Proto) of - {ok, S} -> - S; - {error, OReason} -> - throw({open, OReason}) - end, - F = fun(X) -> case socket:getopt(Sock, socket, X) of - {ok, V} -> f("~p", [V]); - {error, R} -> f("error: ~p", [R]) - end - end, - i("(socket) open (~s,~s,~s): " - "~n debug: ~s" - "~n prio: ~s" - "~n => try find (local) address", - [F(domain), F(type), F(protocol), F(debug), F(priority)]), - Addr = which_addr(Domain), - SA = #{family => Domain, - addr => Addr}, - i("found: " - "~n ~p" - "~n => try (socket) bind", [Addr]), - %% ok = socket:setopt(Sock, otp, debug, true), - %% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!! - Port = case socket:bind(Sock, SA) of - {ok, P} -> - %% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!! - %% ok = socket:setopt(Sock, otp, debug, false), - P; - {error, BReason} -> - throw({bind, BReason}) - end, - i("bound to: " - "~n ~p" - "~n => try (socket) listen (acceptconn: ~s)", - [Port, F(acceptconn)]), - case socket:listen(Sock) of - ok -> - i("listening (acceptconn: ~s)", - [F(acceptconn)]), - Sock; - {error, LReason} -> - throw({listen, LReason}) - end. - -which_addr(Domain) -> - Iflist = case inet:getifaddrs() of - {ok, IFL} -> - IFL; - {error, Reason} -> - throw({inet,getifaddrs,Reason}) - end, - which_addr(Domain, Iflist). - -which_addr(_Domain, []) -> - throw(no_address); -which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> - which_addr2(Domain, IFO); -which_addr(Domain, [_|IFL]) -> - which_addr(Domain, IFL). - -which_addr2(_, []) -> - throw(no_address); -which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> - Addr; -which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> - Addr; -which_addr2(Domain, [_|IFO]) -> - which_addr2(Domain, IFO). - +acceptor_init(Manager, Sock, ID) -> + put(sname, f("acceptor[~w]", [ID])), + Manager ! {acceptor, self(), ok}, + acceptor_loop(#acceptor{id = ID, + manager = Manager, + socket = Sock}). acceptor_loop(#acceptor{socket = LSock} = A) -> i("try accept"), @@ -600,6 +653,8 @@ send(#handler{socket = Sock, type = dgram}, Msg, Dest) -> f(F, A) -> ?LIB:f(F, A). +e(F) -> + e(F, []). e(F, A) -> ?LIB:e(F, A). -- cgit v1.2.3