diff options
author | Micael Karlberg <[email protected]> | 2018-07-03 15:46:30 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 14:50:18 +0200 |
commit | dce68cf27f2dd1721bd316594a29ff99a0de7bb9 (patch) | |
tree | 8c0f28a0480628a2c7796628a376c43b50e2e0b2 | |
parent | 24be0729fe3a1ccfd5f0713b565463d6557d8aa7 (diff) | |
download | otp-dce68cf27f2dd1721bd316594a29ff99a0de7bb9.tar.gz otp-dce68cf27f2dd1721bd316594a29ff99a0de7bb9.tar.bz2 otp-dce68cf27f2dd1721bd316594a29ff99a0de7bb9.zip |
[socket-nif] Restructure of the socket (test) server
-rw-r--r-- | lib/kernel/test/socket_server.erl | 301 |
1 files changed, 252 insertions, 49 deletions
diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index 64bd6396e4..dde605b624 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -13,7 +13,9 @@ -define(REQ, 0). -define(REP, 1). --record(handler, {socket, parent}). +-record(manager, {acceptor, handler_id, handlers}). +-record(acceptor, {socket, manager}). +-record(handler, {socket, manager}). start() -> start_tcp(). @@ -23,17 +25,169 @@ start_tcp() -> start(Domain, Type, Proto) -> put(sname, "starter"), - try do_init(Domain, Type, Proto) of + i("try start manager"), + {Pid, MRef} = manager_start(Domain, Type, Proto), + i("manager (~p) started", [Pid]), + loop(Pid, MRef). + +loop(Pid, MRef) -> + receive + {'DOWN', MRef, process, Pid, Reason} -> + i("manager process exited: " + "~n ~p", [Reason]), + ok + end. + + +%% ========================================================================= + +manager_start(Domain, Type, Proto) -> + spawn_monitor(fun() -> manager_init(Domain, Type, Proto) end). + +manager_start_handler(Pid, Sock) -> + manager_request(Pid, {start_handler, Sock}). + +manager_stop(Pid, Reason) -> + manager_request(Pid, {stop, Reason}). + +manager_request(Pid, Request) -> + request(manager, Pid, Request). + +manager_reply(Pid, Ref, Reply) -> + reply(manager, Pid, Ref, Reply). + + +manager_init(Domain, Type, Proto) -> + put(sname, "manager"), + i("try start acceptor"), + case acceptor_start(Domain, Type, Proto) of + {ok, {Pid, MRef}} -> + i("acceptor started"), + manager_loop(#manager{acceptor = {Pid, MRef}, + handler_id = 1, + handlers = []}); + {error, Reason} -> + exit({failed_starting_acceptor, Reason}) + end. + +manager_loop(M) -> + receive + {'DOWN', MRef, process, Pid, Reason} -> + M2 = manager_handle_down(M, MRef, Pid, Reason), + manager_loop(M2); + + {manager, Pid, Ref, Request} -> + M2 = manager_handle_request(M, Pid, Ref, Request), + manager_loop(M2) + end. + + +manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) + when (Reason =/= normal) -> + e("acceptor died: " + "~n ~p", [Reason]), + exit({acceptor_died, Reason}); +manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) -> + exit(Reason); +manager_handle_down(#manager{handlers = Handlers} = M, _MRef, Pid, Reason) -> + if + (Reason =/= normal) -> + e("handler ~p died: " + "~n ~p", [Pid, Reason]); + true -> + i("handler ~p terminated", [Pid]) + end, + Handlers2 = lists:keydelete(Pid, 1, Handlers), + M#manager{handlers = Handlers2}. + + +manager_handle_request(#manager{handler_id = HID, + handlers = Handlers} = M, Pid, Ref, + {start_handler, Sock}) -> + i("try start handler (~w)", [HID]), + case handler_start(HID, Sock) of + {ok, {HPid, HMRef}} -> + i("handler ~w started", [HID]), + manager_reply(Pid, Ref, {ok, HPid}), + M#manager{handler_id = HID+1, + handlers = [{HPid, HMRef, HID}|Handlers]}; + {error, Reason} = ERROR -> + e("Failed starting new handler: " + "~n Sock: ~p" + "~n Reason: ~p", [Sock, Reason]), + manager_reply(Pid, Ref, ERROR), + M + end; +manager_handle_request(#manager{acceptor = {Pid, MRef}, + handlers = Handlers}, Pid, Ref, + {stop, Reason}) -> + i("stop"), + manager_reply(Pid, Ref, ok), + manager_stop_handlers(Handlers, Reason), + i("try stop acceptor ~p: ~p", [Pid, Reason]), + erlang:demonitor(MRef, [flush]), + acceptor_stop(Pid, Reason), + i("stop", []), + exit(Reason). + + +manager_stop_handlers(Handlers, Reason) -> + lists:foreach(fun({P,M,ID}) -> + manager_stop_handler(P, M, ID, Reason) + end, Handlers). + +manager_stop_handler(Pid, MRef, ID, Reason) -> + i("try stop handler ~w (~p): ~p", [ID, Pid, Reason]), + erlang:demonitor(MRef, [flush]), + handler_stop(Pid, Reason), + ok. + + + +%% ========================================================================= + +acceptor_start(Domain, Type, Proto) -> + Self = self(), + A = {Pid, _} = spawn_monitor(fun() -> + acceptor_init(Self, Domain, Type, Proto) + end), + receive + {acceptor, Pid, ok} -> + {ok, A}; + {acceptor, Pid, {error, _} = Error} -> + exit(Pid, kill), % Just in case + Error; + {'DOWN', _MRef, process, Pid, Reason} -> + {error, {crashed, Reason}} + end. + +acceptor_stop(Pid, _Reason) -> + %% acceptor_request(Pid, {stop, Reason}). + exit(Pid, kill). + +%% acceptor_request(Pid, Request) -> +%% request(acceptor, Pid, Request). + +%% acceptor_reply(Pid, Ref, Reply) -> +%% reply(acceptor, Pid, Ref, Reply). + + +acceptor_init(Manager, Domain, Type, Proto) -> + put(sname, "acceptor"), + try acceptor_do_init(Domain, Type, Proto) of Sock -> - accept_loop(Sock) + Manager ! {acceptor, self(), ok}, + acceptor_loop(#acceptor{manager = Manager, + socket = Sock}) catch throw:E:P -> e("Failed initiate: " "~n Error: ~p" - "~n Path: ~p", [E, P]) + "~n Path: ~p", [E, P]), + Manager ! {acceptor, self(), {error, {catched, E, P}}} end. -do_init(Domain, Type, Proto) -> +acceptor_do_init(Domain, Type, Proto) -> i("try (socket) open"), Sock = case socket:open(Domain, Type, Proto) of {ok, S} -> @@ -41,18 +195,18 @@ do_init(Domain, Type, Proto) -> {error, OReason} -> throw({open, OReason}) end, - i("opened - now try find (local) address"), + i("(socket) open - try find (local) address"), Addr = which_addr(Domain), SA = #{family => Domain, addr => Addr}, - i("addr ~p - now try (socket) bind", [Addr]), + i("found (~p) - try (socket) bind", [Addr]), Port = case socket:bind(Sock, SA) of {ok, P} -> P; {error, BReason} -> throw({bind, BReason}) end, - i("bound to ~w - now try (socket) listen", [Port]), + i("bound (~w) - try (socket) listen", [Port]), case socket:listen(Sock) of ok -> Sock; @@ -84,11 +238,7 @@ which_addr2(Domain, [_|IFO]) -> which_addr2(Domain, IFO). -accept_loop(LSock) -> - put(sname, "acceptor"), - accept_loop(LSock, []). - -accept_loop(LSock, Handlers) -> +acceptor_loop(#acceptor{socket = LSock} = A) -> i("try accept"), case socket:accept(LSock, infinity) of {ok, Sock} -> @@ -96,14 +246,14 @@ accept_loop(LSock, Handlers) -> "~n ~p" "~nwhen" "~n ~p", [Sock, socket:info()]), - case handle_accept_success(Sock) of - {ok, Handler} -> - accept_loop(LSock, [Handler|Handlers]); - {error, HReason} -> + case acceptor_handle_accept_success(A, Sock) of + ok -> + acceptor_loop(A); + {error, Reason} -> e("Failed starting handler: " - "~n ~p", [HReason]), + "~n ~p", [Reason]), socket:close(Sock), - exit({failed_starting_handler, HReason}) + exit({failed_starting_handler, Reason}) end; {error, Reason} -> e("accept failure: " @@ -111,41 +261,75 @@ accept_loop(LSock, Handlers) -> exit({accept, Reason}) end. +acceptor_handle_accept_success(#acceptor{manager = Manager}, Sock) -> + i("try start handler"), + case manager_start_handler(Manager, Sock) of + {ok, Pid} -> + i("handler (~p) started - now change 'ownership'", [Pid]), + case socket:setopt(Sock, otp, controlling_process, Pid) of + ok -> + %% Normally we should have a msgs collection here + %% (of messages we receive before the control was + %% handled over to Handler), but since we don't + %% have active implemented yet... + i("new handler (~p) now controlling process", [Pid]), + handler_continue(Pid), + ok; + {error, _} = ERROR -> + exit(Pid, kill), + ERROR + end; + {error, Reason2} -> + e("failed starting handler: " + "~n (new) Socket: ~p" + "~n Reason: ~p", [Sock, Reason2]), + exit({failed_starting_handler, Reason2}) + end. -handle_accept_success(Sock) -> - Self = self(), - Handler = spawn_link(fun() -> handler_init(Self, Sock) end), - case socket:setopt(Sock, otp, controlling_process, Handler) of - ok -> - %% Normally we should have a msgs collection here - %% (of messages we receive before the control was - %% handled over to Handler), but since we don't - %% have active implemented yet... - handler_continue(Handler), - {ok, {Handler, Sock}}; - {error, _} = ERROR -> - exit(Handler, kill), + + +%% ========================================================================= + +handler_start(ID, Sock) -> + Self = self(), + H = {Pid, _} = spawn_monitor(fun() -> handler_init(Self, ID, Sock) end), + receive + {handler, Pid, ok} -> + {ok, H}; + {handler, Pid, {error, _} = ERROR} -> + exit(Pid, kill), % Just in case ERROR end. +handler_stop(Pid, _Reason) -> + %% handler_request(Pid, {stop, Reason}). + exit(Pid, kill). + +handler_continue(Pid) -> + handler_request(Pid, continue). + +handler_request(Pid, Request) -> + request(handler, Pid, Request). + +handler_reply(Pid, Ref, Reply) -> + reply(handler, Pid, Ref, Reply). + -handler_init(Parent, Socket) -> - put(sname, "handler"), +handler_init(Manager, ID, Sock) -> + put(sname, f("handler:~w", [ID])), + i("starting"), + Manager ! {handler, self(), ok}, receive - {handler, Parent, continue} -> - socket:setopt(Socket, otp, debug, true), - handler_loop(#handler{parent = Parent, - socket = Socket}) + {handler, Pid, Ref, continue} -> + i("continue"), + handler_reply(Pid, Ref, ok), + %% socket:setopt(Socket, otp, debug, true), + handler_loop(#handler{manager = Manager, + socket = Sock}) end. -handler_continue(Handler) -> - Handler ! {handler, self(), continue}. - handler_loop(#handler{socket = Socket} = H) -> - case socket:recv(Socket, 0) of - {ok, Msg} when (size(Msg) =:= 0) -> - i("received empty msg - hickup? - try again", []), - handler_loop(H); + case socket:recv(Socket) of {ok, Msg} -> i("received ~w bytes of data", [size(Msg)]), case dec_msg(Msg) of @@ -174,9 +358,10 @@ handler_loop(#handler{socket = Socket} = H) -> "~n ~p", [RReason]), exit({failed_reading_request, RReason}) end. - - -%% --- + + + +%% ========================================================================= enc_req_msg(N, Data) -> enc_msg(?REQ, N, Data). @@ -196,6 +381,21 @@ dec_msg(<<?REP:32/integer, N:32/integer, Data/binary>>) -> {reply, N, Data}. + +%% --- + +request(Tag, Pid, Request) -> + Ref = make_ref(), + Pid ! {Tag, self(), Ref, Request}, + receive + {Tag, Pid, Ref, Reply} -> + Reply + end. + +reply(Tag, Pid, Ref, Reply) -> + Pid ! {Tag, self(), Ref, Reply}. + + %% --- formated_timestamp() -> @@ -226,6 +426,9 @@ format_timestamp(N, N2T, FormatExtra, ArgsExtra) -> %% --- +f(F, A) -> + lists:flatten(io_lib:format(F, A)). + e(F, A) -> p("<ERROR> " ++ F, A). @@ -238,6 +441,6 @@ p(F, A) -> p(get(sname), F, A). p(SName, F, A) -> - io:format("[server:~s,~p][~s] " ++ F ++ "~n", + io:format("[~s,~p][~s] " ++ F ++ "~n", [SName,self(),formated_timestamp()|A]). |