aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-07-03 15:46:30 +0200
committerMicael Karlberg <[email protected]>2018-09-18 14:50:18 +0200
commitdce68cf27f2dd1721bd316594a29ff99a0de7bb9 (patch)
tree8c0f28a0480628a2c7796628a376c43b50e2e0b2
parent24be0729fe3a1ccfd5f0713b565463d6557d8aa7 (diff)
downloadotp-dce68cf27f2dd1721bd316594a29ff99a0de7bb9.tar.gz
otp-dce68cf27f2dd1721bd316594a29ff99a0de7bb9.tar.bz2
otp-dce68cf27f2dd1721bd316594a29ff99a0de7bb9.zip
[socket-nif] Restructure of the socket (test) server
-rw-r--r--lib/kernel/test/socket_server.erl301
1 files changed, 252 insertions, 49 deletions
diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl
index 64bd6396e4..dde605b624 100644
--- a/lib/kernel/test/socket_server.erl
+++ b/lib/kernel/test/socket_server.erl
@@ -13,7 +13,9 @@
-define(REQ, 0).
-define(REP, 1).
--record(handler, {socket, parent}).
+-record(manager, {acceptor, handler_id, handlers}).
+-record(acceptor, {socket, manager}).
+-record(handler, {socket, manager}).
start() ->
start_tcp().
@@ -23,17 +25,169 @@ start_tcp() ->
start(Domain, Type, Proto) ->
put(sname, "starter"),
- try do_init(Domain, Type, Proto) of
+ i("try start manager"),
+ {Pid, MRef} = manager_start(Domain, Type, Proto),
+ i("manager (~p) started", [Pid]),
+ loop(Pid, MRef).
+
+loop(Pid, MRef) ->
+ receive
+ {'DOWN', MRef, process, Pid, Reason} ->
+ i("manager process exited: "
+ "~n ~p", [Reason]),
+ ok
+ end.
+
+
+%% =========================================================================
+
+manager_start(Domain, Type, Proto) ->
+ spawn_monitor(fun() -> manager_init(Domain, Type, Proto) end).
+
+manager_start_handler(Pid, Sock) ->
+ manager_request(Pid, {start_handler, Sock}).
+
+manager_stop(Pid, Reason) ->
+ manager_request(Pid, {stop, Reason}).
+
+manager_request(Pid, Request) ->
+ request(manager, Pid, Request).
+
+manager_reply(Pid, Ref, Reply) ->
+ reply(manager, Pid, Ref, Reply).
+
+
+manager_init(Domain, Type, Proto) ->
+ put(sname, "manager"),
+ i("try start acceptor"),
+ case acceptor_start(Domain, Type, Proto) of
+ {ok, {Pid, MRef}} ->
+ i("acceptor started"),
+ manager_loop(#manager{acceptor = {Pid, MRef},
+ handler_id = 1,
+ handlers = []});
+ {error, Reason} ->
+ exit({failed_starting_acceptor, Reason})
+ end.
+
+manager_loop(M) ->
+ receive
+ {'DOWN', MRef, process, Pid, Reason} ->
+ M2 = manager_handle_down(M, MRef, Pid, Reason),
+ manager_loop(M2);
+
+ {manager, Pid, Ref, Request} ->
+ M2 = manager_handle_request(M, Pid, Ref, Request),
+ manager_loop(M2)
+ end.
+
+
+manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason)
+ when (Reason =/= normal) ->
+ e("acceptor died: "
+ "~n ~p", [Reason]),
+ exit({acceptor_died, Reason});
+manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) ->
+ exit(Reason);
+manager_handle_down(#manager{handlers = Handlers} = M, _MRef, Pid, Reason) ->
+ if
+ (Reason =/= normal) ->
+ e("handler ~p died: "
+ "~n ~p", [Pid, Reason]);
+ true ->
+ i("handler ~p terminated", [Pid])
+ end,
+ Handlers2 = lists:keydelete(Pid, 1, Handlers),
+ M#manager{handlers = Handlers2}.
+
+
+manager_handle_request(#manager{handler_id = HID,
+ handlers = Handlers} = M, Pid, Ref,
+ {start_handler, Sock}) ->
+ i("try start handler (~w)", [HID]),
+ case handler_start(HID, Sock) of
+ {ok, {HPid, HMRef}} ->
+ i("handler ~w started", [HID]),
+ manager_reply(Pid, Ref, {ok, HPid}),
+ M#manager{handler_id = HID+1,
+ handlers = [{HPid, HMRef, HID}|Handlers]};
+ {error, Reason} = ERROR ->
+ e("Failed starting new handler: "
+ "~n Sock: ~p"
+ "~n Reason: ~p", [Sock, Reason]),
+ manager_reply(Pid, Ref, ERROR),
+ M
+ end;
+manager_handle_request(#manager{acceptor = {Pid, MRef},
+ handlers = Handlers}, Pid, Ref,
+ {stop, Reason}) ->
+ i("stop"),
+ manager_reply(Pid, Ref, ok),
+ manager_stop_handlers(Handlers, Reason),
+ i("try stop acceptor ~p: ~p", [Pid, Reason]),
+ erlang:demonitor(MRef, [flush]),
+ acceptor_stop(Pid, Reason),
+ i("stop", []),
+ exit(Reason).
+
+
+manager_stop_handlers(Handlers, Reason) ->
+ lists:foreach(fun({P,M,ID}) ->
+ manager_stop_handler(P, M, ID, Reason)
+ end, Handlers).
+
+manager_stop_handler(Pid, MRef, ID, Reason) ->
+ i("try stop handler ~w (~p): ~p", [ID, Pid, Reason]),
+ erlang:demonitor(MRef, [flush]),
+ handler_stop(Pid, Reason),
+ ok.
+
+
+
+%% =========================================================================
+
+acceptor_start(Domain, Type, Proto) ->
+ Self = self(),
+ A = {Pid, _} = spawn_monitor(fun() ->
+ acceptor_init(Self, Domain, Type, Proto)
+ end),
+ receive
+ {acceptor, Pid, ok} ->
+ {ok, A};
+ {acceptor, Pid, {error, _} = Error} ->
+ exit(Pid, kill), % Just in case
+ Error;
+ {'DOWN', _MRef, process, Pid, Reason} ->
+ {error, {crashed, Reason}}
+ end.
+
+acceptor_stop(Pid, _Reason) ->
+ %% acceptor_request(Pid, {stop, Reason}).
+ exit(Pid, kill).
+
+%% acceptor_request(Pid, Request) ->
+%% request(acceptor, Pid, Request).
+
+%% acceptor_reply(Pid, Ref, Reply) ->
+%% reply(acceptor, Pid, Ref, Reply).
+
+
+acceptor_init(Manager, Domain, Type, Proto) ->
+ put(sname, "acceptor"),
+ try acceptor_do_init(Domain, Type, Proto) of
Sock ->
- accept_loop(Sock)
+ Manager ! {acceptor, self(), ok},
+ acceptor_loop(#acceptor{manager = Manager,
+ socket = Sock})
catch
throw:E:P ->
e("Failed initiate: "
"~n Error: ~p"
- "~n Path: ~p", [E, P])
+ "~n Path: ~p", [E, P]),
+ Manager ! {acceptor, self(), {error, {catched, E, P}}}
end.
-do_init(Domain, Type, Proto) ->
+acceptor_do_init(Domain, Type, Proto) ->
i("try (socket) open"),
Sock = case socket:open(Domain, Type, Proto) of
{ok, S} ->
@@ -41,18 +195,18 @@ do_init(Domain, Type, Proto) ->
{error, OReason} ->
throw({open, OReason})
end,
- i("opened - now try find (local) address"),
+ i("(socket) open - try find (local) address"),
Addr = which_addr(Domain),
SA = #{family => Domain,
addr => Addr},
- i("addr ~p - now try (socket) bind", [Addr]),
+ i("found (~p) - try (socket) bind", [Addr]),
Port = case socket:bind(Sock, SA) of
{ok, P} ->
P;
{error, BReason} ->
throw({bind, BReason})
end,
- i("bound to ~w - now try (socket) listen", [Port]),
+ i("bound (~w) - try (socket) listen", [Port]),
case socket:listen(Sock) of
ok ->
Sock;
@@ -84,11 +238,7 @@ which_addr2(Domain, [_|IFO]) ->
which_addr2(Domain, IFO).
-accept_loop(LSock) ->
- put(sname, "acceptor"),
- accept_loop(LSock, []).
-
-accept_loop(LSock, Handlers) ->
+acceptor_loop(#acceptor{socket = LSock} = A) ->
i("try accept"),
case socket:accept(LSock, infinity) of
{ok, Sock} ->
@@ -96,14 +246,14 @@ accept_loop(LSock, Handlers) ->
"~n ~p"
"~nwhen"
"~n ~p", [Sock, socket:info()]),
- case handle_accept_success(Sock) of
- {ok, Handler} ->
- accept_loop(LSock, [Handler|Handlers]);
- {error, HReason} ->
+ case acceptor_handle_accept_success(A, Sock) of
+ ok ->
+ acceptor_loop(A);
+ {error, Reason} ->
e("Failed starting handler: "
- "~n ~p", [HReason]),
+ "~n ~p", [Reason]),
socket:close(Sock),
- exit({failed_starting_handler, HReason})
+ exit({failed_starting_handler, Reason})
end;
{error, Reason} ->
e("accept failure: "
@@ -111,41 +261,75 @@ accept_loop(LSock, Handlers) ->
exit({accept, Reason})
end.
+acceptor_handle_accept_success(#acceptor{manager = Manager}, Sock) ->
+ i("try start handler"),
+ case manager_start_handler(Manager, Sock) of
+ {ok, Pid} ->
+ i("handler (~p) started - now change 'ownership'", [Pid]),
+ case socket:setopt(Sock, otp, controlling_process, Pid) of
+ ok ->
+ %% Normally we should have a msgs collection here
+ %% (of messages we receive before the control was
+ %% handled over to Handler), but since we don't
+ %% have active implemented yet...
+ i("new handler (~p) now controlling process", [Pid]),
+ handler_continue(Pid),
+ ok;
+ {error, _} = ERROR ->
+ exit(Pid, kill),
+ ERROR
+ end;
+ {error, Reason2} ->
+ e("failed starting handler: "
+ "~n (new) Socket: ~p"
+ "~n Reason: ~p", [Sock, Reason2]),
+ exit({failed_starting_handler, Reason2})
+ end.
-handle_accept_success(Sock) ->
- Self = self(),
- Handler = spawn_link(fun() -> handler_init(Self, Sock) end),
- case socket:setopt(Sock, otp, controlling_process, Handler) of
- ok ->
- %% Normally we should have a msgs collection here
- %% (of messages we receive before the control was
- %% handled over to Handler), but since we don't
- %% have active implemented yet...
- handler_continue(Handler),
- {ok, {Handler, Sock}};
- {error, _} = ERROR ->
- exit(Handler, kill),
+
+
+%% =========================================================================
+
+handler_start(ID, Sock) ->
+ Self = self(),
+ H = {Pid, _} = spawn_monitor(fun() -> handler_init(Self, ID, Sock) end),
+ receive
+ {handler, Pid, ok} ->
+ {ok, H};
+ {handler, Pid, {error, _} = ERROR} ->
+ exit(Pid, kill), % Just in case
ERROR
end.
+handler_stop(Pid, _Reason) ->
+ %% handler_request(Pid, {stop, Reason}).
+ exit(Pid, kill).
+
+handler_continue(Pid) ->
+ handler_request(Pid, continue).
+
+handler_request(Pid, Request) ->
+ request(handler, Pid, Request).
+
+handler_reply(Pid, Ref, Reply) ->
+ reply(handler, Pid, Ref, Reply).
+
-handler_init(Parent, Socket) ->
- put(sname, "handler"),
+handler_init(Manager, ID, Sock) ->
+ put(sname, f("handler:~w", [ID])),
+ i("starting"),
+ Manager ! {handler, self(), ok},
receive
- {handler, Parent, continue} ->
- socket:setopt(Socket, otp, debug, true),
- handler_loop(#handler{parent = Parent,
- socket = Socket})
+ {handler, Pid, Ref, continue} ->
+ i("continue"),
+ handler_reply(Pid, Ref, ok),
+ %% socket:setopt(Socket, otp, debug, true),
+ handler_loop(#handler{manager = Manager,
+ socket = Sock})
end.
-handler_continue(Handler) ->
- Handler ! {handler, self(), continue}.
-
handler_loop(#handler{socket = Socket} = H) ->
- case socket:recv(Socket, 0) of
- {ok, Msg} when (size(Msg) =:= 0) ->
- i("received empty msg - hickup? - try again", []),
- handler_loop(H);
+ case socket:recv(Socket) of
{ok, Msg} ->
i("received ~w bytes of data", [size(Msg)]),
case dec_msg(Msg) of
@@ -174,9 +358,10 @@ handler_loop(#handler{socket = Socket} = H) ->
"~n ~p", [RReason]),
exit({failed_reading_request, RReason})
end.
-
-
-%% ---
+
+
+
+%% =========================================================================
enc_req_msg(N, Data) ->
enc_msg(?REQ, N, Data).
@@ -196,6 +381,21 @@ dec_msg(<<?REP:32/integer, N:32/integer, Data/binary>>) ->
{reply, N, Data}.
+
+%% ---
+
+request(Tag, Pid, Request) ->
+ Ref = make_ref(),
+ Pid ! {Tag, self(), Ref, Request},
+ receive
+ {Tag, Pid, Ref, Reply} ->
+ Reply
+ end.
+
+reply(Tag, Pid, Ref, Reply) ->
+ Pid ! {Tag, self(), Ref, Reply}.
+
+
%% ---
formated_timestamp() ->
@@ -226,6 +426,9 @@ format_timestamp(N, N2T, FormatExtra, ArgsExtra) ->
%% ---
+f(F, A) ->
+ lists:flatten(io_lib:format(F, A)).
+
e(F, A) ->
p("<ERROR> " ++ F, A).
@@ -238,6 +441,6 @@ p(F, A) ->
p(get(sname), F, A).
p(SName, F, A) ->
- io:format("[server:~s,~p][~s] " ++ F ++ "~n",
+ io:format("[~s,~p][~s] " ++ F ++ "~n",
[SName,self(),formated_timestamp()|A]).