aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-07-16 18:21:48 +0200
committerMicael Karlberg <[email protected]>2018-09-18 14:50:18 +0200
commit017565203f40860d24b80a54136a160aee460dbe (patch)
tree7dd8d8a9426cd918a1d41db7dcd99bcb7cc35a51 /lib/kernel
parent8de18e84deaed4c9e6e7242ae2550fc6618dc44d (diff)
downloadotp-017565203f40860d24b80a54136a160aee460dbe.tar.gz
otp-017565203f40860d24b80a54136a160aee460dbe.tar.bz2
otp-017565203f40860d24b80a54136a160aee460dbe.zip
[socket-nif] Add support for multiple acceptor processes
Its now possible to have multiple (simultaneous) acceptor processes for the same listening socket. OTP-14831
Diffstat (limited to 'lib/kernel')
-rw-r--r--lib/kernel/test/socket_server.erl313
1 files changed, 184 insertions, 129 deletions
diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl
index d9bbf00e85..8a77b9b3c9 100644
--- a/lib/kernel/test/socket_server.erl
+++ b/lib/kernel/test/socket_server.erl
@@ -28,10 +28,12 @@
-define(LIB, socket_lib).
--record(manager, {peek, acceptor, handler_id, handlers}).
--record(acceptor, {socket, manager}).
+-record(manager, {socket, peek, acceptors, handler_id, handlers}).
+-record(acceptor, {id, socket, manager}).
-record(handler, {socket, peek, type, manager}).
+-define(NUM_ACCEPTORS, 5).
+
start() ->
start_tcp().
@@ -83,17 +85,13 @@ manager_reply(Pid, Ref, Reply) ->
manager_init(Domain, stream = Type, Proto, Peek) ->
put(sname, "manager"),
- i("try start acceptor"),
- case acceptor_start(Domain, Type, Proto) of
- {ok, {Pid, MRef}} ->
- i("acceptor started"),
- manager_loop(#manager{peek = Peek,
- acceptor = {Pid, MRef},
- handler_id = 1,
- handlers = []});
- {error, Reason} ->
- exit({failed_starting_acceptor, Reason})
- end;
+ i("try start acceptor(s)"),
+ {Sock, Acceptors} = manager_stream_init(Domain, Type, Proto),
+ manager_loop(#manager{socket = Sock,
+ peek = Peek,
+ acceptors = Acceptors,
+ handler_id = 1,
+ handlers = []});
manager_init(Domain, dgram = Type, Proto, Peek) ->
put(sname, "manager"),
i("try open socket"),
@@ -142,7 +140,7 @@ manager_init(Domain, dgram = Type, Proto, Peek) ->
handler_continue(Pid),
manager_loop(#manager{peek = Peek,
handler_id = 2, % Just in case
- handlers = [{Pid, MRef, 1}]});
+ handlers = [{1, Pid, MRef}]});
{error, SReason} ->
e("Failed starting handler: "
"~n ~p", [SReason]),
@@ -155,35 +153,142 @@ manager_init(Domain, dgram = Type, Proto, Peek) ->
end.
+manager_stream_init(Domain, Type, Proto) ->
+ i("try (socket) open"),
+ Sock = case socket:open(Domain, Type, Proto) of
+ {ok, S} ->
+ S;
+ {error, OReason} ->
+ throw({open, OReason})
+ end,
+ F = fun(X) -> case socket:getopt(Sock, socket, X) of
+ {ok, V} -> f("~p", [V]);
+ {error, R} -> f("error: ~p", [R])
+ end
+ end,
+ i("(socket) open (~s,~s,~s): "
+ "~n debug: ~s"
+ "~n prio: ~s"
+ "~n => try find (local) address",
+ [F(domain), F(type), F(protocol), F(debug), F(priority)]),
+ Addr = which_addr(Domain),
+ SA = #{family => Domain,
+ addr => Addr},
+ i("found: "
+ "~n ~p"
+ "~n => try (socket) bind", [Addr]),
+ %% ok = socket:setopt(Sock, otp, debug, true),
+ %% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!!
+ Port = case socket:bind(Sock, SA) of
+ {ok, P} ->
+ %% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!!
+ %% ok = socket:setopt(Sock, otp, debug, false),
+ P;
+ {error, BReason} ->
+ throw({bind, BReason})
+ end,
+ i("bound to: "
+ "~n ~p"
+ "~n => try (socket) listen (acceptconn: ~s)",
+ [Port, F(acceptconn)]),
+ case socket:listen(Sock) of
+ ok ->
+ i("listening (acceptconn: ~s)",
+ [F(acceptconn)]),
+ manager_stream_init(Sock, 1, ?NUM_ACCEPTORS, []);
+ {error, LReason} ->
+ throw({listen, LReason})
+ end.
+
+which_addr(Domain) ->
+ Iflist = case inet:getifaddrs() of
+ {ok, IFL} ->
+ IFL;
+ {error, Reason} ->
+ throw({inet,getifaddrs,Reason})
+ end,
+ which_addr(Domain, Iflist).
+
+which_addr(_Domain, []) ->
+ throw(no_address);
+which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") ->
+ which_addr2(Domain, IFO);
+which_addr(Domain, [_|IFL]) ->
+ which_addr(Domain, IFL).
+
+which_addr2(_, []) ->
+ throw(no_address);
+which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) ->
+ Addr;
+which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) ->
+ Addr;
+which_addr2(Domain, [_|IFO]) ->
+ which_addr2(Domain, IFO).
+
+
+manager_stream_init(Sock, ID, NumAcceptors, Acc)
+ when (NumAcceptors > 0) ->
+ i("try start acceptor"),
+ case acceptor_start(Sock, ID) of
+ {ok, {Pid, MRef}} ->
+ i("acceptor ~w (~p) started", [ID, Pid]),
+ ?LIB:sleep(5000),
+ manager_stream_init(Sock, ID+1, NumAcceptors-1,
+ [{ID, Pid, MRef}|Acc]);
+ {error, Reason} ->
+ exit({failed_starting_acceptor, Reason})
+ end;
+manager_stream_init(Sock, _ID, 0, Acc) ->
+ %% Req = {kill_acceptor, length(Acc)}, % Last in the queue
+ %% Req = {kill_acceptor, 3}, % In the "middle" of the queue
+ %% Req = {kill_acceptor, 2}, % The first in the queue
+ %% Req = {kill_acceptor, 1}, % Current acceptor
+ %% Msg = {manager, self(), make_ref(), Req},
+ %% erlang:send_after(timer:seconds(10), self(), Msg),
+ {Sock, lists:reverse(Acc)}.
+
+
manager_loop(M) ->
receive
{'DOWN', MRef, process, Pid, Reason} ->
M2 = manager_handle_down(M, MRef, Pid, Reason),
manager_loop(M2);
-
+
{manager, Pid, Ref, Request} ->
M2 = manager_handle_request(M, Pid, Ref, Request),
manager_loop(M2)
end.
-manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason)
- when (Reason =/= normal) ->
- e("acceptor died: "
- "~n ~p", [Reason]),
- exit({acceptor_died, Reason});
-manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) ->
- exit(Reason);
-manager_handle_down(#manager{handlers = Handlers} = M, _MRef, Pid, Reason) ->
- if
- (Reason =/= normal) ->
- e("handler ~p died: "
- "~n ~p", [Pid, Reason]);
- true ->
- i("handler ~p terminated", [Pid])
- end,
- Handlers2 = lists:keydelete(Pid, 1, Handlers),
- M#manager{handlers = Handlers2}.
+manager_handle_down(#manager{acceptors = Acceptors,
+ handlers = Handlers} = M, MRef, Pid, Reason) ->
+ case lists:keysearch(Pid, 2, Acceptors) of
+ {value, {ID, Pid, MRef}} when (Reason =:= normal) ->
+ i("acceptor ~w exited (normally)", [ID]),
+ case lists:keydelete(Pid, 2, Acceptors) of
+ [] ->
+ %% We are done
+ i("the last acceptor - we are done"),
+ exit(normal);
+ Acceptors2 ->
+ M#manager{acceptors = Acceptors2}
+ end;
+ {value, {ID, Pid, MRef}} ->
+ e("acceptor ~w crashed: "
+ "~n ~p", [ID, Reason]),
+ exit({acceptor_died, Reason});
+
+ false -> %% handler!
+ if
+ (Reason =/= normal) ->
+ e("handler ~p died: "
+ "~n ~p", [Pid, Reason]);
+ true ->
+ i("handler ~p terminated", [Pid])
+ end,
+ Handlers2 = lists:keydelete(Pid, 2, Handlers),
+ M#manager{handlers = Handlers2}
+ end.
manager_handle_request(#manager{peek = Peek,
@@ -196,7 +301,7 @@ manager_handle_request(#manager{peek = Peek,
i("handler ~w started", [HID]),
manager_reply(Pid, Ref, {ok, HPid}),
M#manager{handler_id = HID+1,
- handlers = [{HPid, HMRef, HID}|Handlers]};
+ handlers = [{HID, HPid, HMRef}|Handlers]};
{error, Reason} = ERROR ->
e("Failed starting new handler: "
"~n Sock: ~p"
@@ -204,21 +309,50 @@ manager_handle_request(#manager{peek = Peek,
manager_reply(Pid, Ref, ERROR),
M
end;
-manager_handle_request(#manager{acceptor = {Pid, MRef},
- handlers = Handlers}, Pid, Ref,
+manager_handle_request(#manager{socket = Sock,
+ acceptors = [{AID, APid, AMRef}]} = M, _Pid, _Ref,
+ {kill_acceptor, AID}) ->
+ i("try kill (only remeining) acceptor ~w", [AID]),
+ socket:setopt(Sock, otp, debug, true),
+ manager_stop_acceptor(APid, AMRef, AID, kill),
+ M#manager{acceptors = []};
+manager_handle_request(#manager{socket = Sock,
+ acceptors = Acceptors} = M, _Pid, _Ref,
+ {kill_acceptor, AID}) ->
+ i("try kill acceptor ~w", [AID]),
+ case lists:keysearch(AID, 1, Acceptors) of
+ {value, {AID, APid, AMRef}} ->
+ socket:setopt(Sock, otp, debug, true),
+ manager_stop_acceptor(APid, AMRef, AID, kill),
+ Acceptors2 = lists:keydelete(AID, 1, Acceptors),
+ M#manager{acceptors = Acceptors2};
+ false ->
+ e("no such acceptor"),
+ M
+ end;
+manager_handle_request(#manager{acceptors = Acceptors,
+ handlers = Handlers}, Pid, Ref,
{stop, Reason}) ->
i("stop"),
manager_reply(Pid, Ref, ok),
manager_stop_handlers(Handlers, Reason),
- i("try stop acceptor ~p: ~p", [Pid, Reason]),
- erlang:demonitor(MRef, [flush]),
- acceptor_stop(Pid, Reason),
- i("stop", []),
+ manager_stop_acceptors(Acceptors, Reason),
+ i("stopped", []),
exit(Reason).
+manager_stop_acceptors(Acceptors, Reason) ->
+ lists:foreach(fun({ID,P,M}) ->
+ manager_stop_acceptor(P, M, ID, Reason)
+ end, Acceptors).
+
+manager_stop_acceptor(Pid, MRef, ID, Reason) ->
+ i("try stop acceptor ~w (~p): ~p", [ID, Pid, Reason]),
+ erlang:demonitor(MRef, [flush]),
+ acceptor_stop(Pid, Reason),
+ ok.
manager_stop_handlers(Handlers, Reason) ->
- lists:foreach(fun({P,M,ID}) ->
+ lists:foreach(fun({ID,P,M}) ->
manager_stop_handler(P, M, ID, Reason)
end, Handlers).
@@ -232,10 +366,10 @@ manager_stop_handler(Pid, MRef, ID, Reason) ->
%% =========================================================================
-acceptor_start(Domain, Type, Proto) ->
+acceptor_start(Sock, ID) ->
Self = self(),
A = {Pid, _} = spawn_monitor(fun() ->
- acceptor_init(Self, Domain, Type, Proto)
+ acceptor_init(Self, Sock, ID)
end),
receive
{acceptor, Pid, ok} ->
@@ -258,93 +392,12 @@ acceptor_stop(Pid, _Reason) ->
%% reply(acceptor, Pid, Ref, Reply).
-acceptor_init(Manager, Domain, Type, Proto) ->
- put(sname, "acceptor"),
- try acceptor_do_init(Domain, Type, Proto) of
- Sock ->
- Manager ! {acceptor, self(), ok},
- acceptor_loop(#acceptor{manager = Manager,
- socket = Sock})
- catch
- throw:E:P ->
- e("Failed initiate: "
- "~n Error: ~p"
- "~n Path: ~p", [E, P]),
- Manager ! {acceptor, self(), {error, {catched, E, P}}}
- end.
-
-acceptor_do_init(Domain, Type, Proto) ->
- i("try (socket) open"),
- Sock = case socket:open(Domain, Type, Proto) of
- {ok, S} ->
- S;
- {error, OReason} ->
- throw({open, OReason})
- end,
- F = fun(X) -> case socket:getopt(Sock, socket, X) of
- {ok, V} -> f("~p", [V]);
- {error, R} -> f("error: ~p", [R])
- end
- end,
- i("(socket) open (~s,~s,~s): "
- "~n debug: ~s"
- "~n prio: ~s"
- "~n => try find (local) address",
- [F(domain), F(type), F(protocol), F(debug), F(priority)]),
- Addr = which_addr(Domain),
- SA = #{family => Domain,
- addr => Addr},
- i("found: "
- "~n ~p"
- "~n => try (socket) bind", [Addr]),
- %% ok = socket:setopt(Sock, otp, debug, true),
- %% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!!
- Port = case socket:bind(Sock, SA) of
- {ok, P} ->
- %% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!!
- %% ok = socket:setopt(Sock, otp, debug, false),
- P;
- {error, BReason} ->
- throw({bind, BReason})
- end,
- i("bound to: "
- "~n ~p"
- "~n => try (socket) listen (acceptconn: ~s)",
- [Port, F(acceptconn)]),
- case socket:listen(Sock) of
- ok ->
- i("listening (acceptconn: ~s)",
- [F(acceptconn)]),
- Sock;
- {error, LReason} ->
- throw({listen, LReason})
- end.
-
-which_addr(Domain) ->
- Iflist = case inet:getifaddrs() of
- {ok, IFL} ->
- IFL;
- {error, Reason} ->
- throw({inet,getifaddrs,Reason})
- end,
- which_addr(Domain, Iflist).
-
-which_addr(_Domain, []) ->
- throw(no_address);
-which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") ->
- which_addr2(Domain, IFO);
-which_addr(Domain, [_|IFL]) ->
- which_addr(Domain, IFL).
-
-which_addr2(_, []) ->
- throw(no_address);
-which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) ->
- Addr;
-which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) ->
- Addr;
-which_addr2(Domain, [_|IFO]) ->
- which_addr2(Domain, IFO).
-
+acceptor_init(Manager, Sock, ID) ->
+ put(sname, f("acceptor[~w]", [ID])),
+ Manager ! {acceptor, self(), ok},
+ acceptor_loop(#acceptor{id = ID,
+ manager = Manager,
+ socket = Sock}).
acceptor_loop(#acceptor{socket = LSock} = A) ->
i("try accept"),
@@ -600,6 +653,8 @@ send(#handler{socket = Sock, type = dgram}, Msg, Dest) ->
f(F, A) ->
?LIB:f(F, A).
+e(F) ->
+ e(F, []).
e(F, A) ->
?LIB:e(F, A).