%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2018-2018. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% -module(socket_server). -export([start/0, start_tcp/0, start_udp/0]). -define(LIB, socket_lib). -record(manager, {acceptor, handler_id, handlers}). -record(acceptor, {socket, manager}). -record(handler, {socket, type, manager}). start() -> start_tcp(). start_tcp() -> start(inet, stream, tcp). start_udp() -> start(inet, dgram, udp). start(Domain, Type, Proto) -> put(sname, "starter"), i("try start manager"), {Pid, MRef} = manager_start(Domain, Type, Proto), i("manager (~p) started", [Pid]), loop(Pid, MRef). loop(Pid, MRef) -> receive {'DOWN', MRef, process, Pid, Reason} -> i("manager process exited: " "~n ~p", [Reason]), ok end. %% ========================================================================= manager_start(Domain, Type, Proto) -> spawn_monitor(fun() -> manager_init(Domain, Type, Proto) end). manager_start_handler(Pid, Sock) -> manager_request(Pid, {start_handler, Sock}). manager_stop(Pid, Reason) -> manager_request(Pid, {stop, Reason}). manager_request(Pid, Request) -> ?LIB:request(manager, Pid, Request). manager_reply(Pid, Ref, Reply) -> ?LIB:reply(manager, Pid, Ref, Reply). manager_init(Domain, stream = Type, Proto) -> put(sname, "manager"), i("try start acceptor"), case acceptor_start(Domain, Type, Proto) of {ok, {Pid, MRef}} -> i("acceptor started"), manager_loop(#manager{acceptor = {Pid, MRef}, handler_id = 1, handlers = []}); {error, Reason} -> exit({failed_starting_acceptor, Reason}) end; manager_init(Domain, dgram = Type, Proto) -> put(sname, "manager"), i("try open socket"), case socket:open(Domain, Type, Proto) of {ok, Sock} -> Addr = which_addr(Domain), SA = #{family => Domain, addr => Addr}, case socket:bind(Sock, SA) of {ok, _P} -> ok; {error, BReason} -> throw({bind, BReason}) end, i("try start handler for" "~n ~p", [case socket:sockname(Sock) of {ok, Name} -> Name; {error, _} = E -> E end]), case handler_start(1, Sock) of {ok, {Pid, MRef}} -> i("handler (~p) started", [Pid]), handler_continue(Pid), manager_loop(#manager{handler_id = 2, % Just in case handlers = [{Pid, MRef, 1}]}); {error, SReason} -> e("Failed starting handler: " "~n ~p", [SReason]), exit({failed_start_handler, SReason}) end; {error, OReason} -> e("Failed open socket: " "~n ~p", [OReason]), exit({failed_open_socket, OReason}) end. manager_loop(M) -> receive {'DOWN', MRef, process, Pid, Reason} -> M2 = manager_handle_down(M, MRef, Pid, Reason), manager_loop(M2); {manager, Pid, Ref, Request} -> M2 = manager_handle_request(M, Pid, Ref, Request), manager_loop(M2) end. manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) when (Reason =/= normal) -> e("acceptor died: " "~n ~p", [Reason]), exit({acceptor_died, Reason}); manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) -> exit(Reason); manager_handle_down(#manager{handlers = Handlers} = M, _MRef, Pid, Reason) -> if (Reason =/= normal) -> e("handler ~p died: " "~n ~p", [Pid, Reason]); true -> i("handler ~p terminated", [Pid]) end, Handlers2 = lists:keydelete(Pid, 1, Handlers), M#manager{handlers = Handlers2}. manager_handle_request(#manager{handler_id = HID, handlers = Handlers} = M, Pid, Ref, {start_handler, Sock}) -> i("try start handler (~w)", [HID]), case handler_start(HID, Sock) of {ok, {HPid, HMRef}} -> i("handler ~w started", [HID]), manager_reply(Pid, Ref, {ok, HPid}), M#manager{handler_id = HID+1, handlers = [{HPid, HMRef, HID}|Handlers]}; {error, Reason} = ERROR -> e("Failed starting new handler: " "~n Sock: ~p" "~n Reason: ~p", [Sock, Reason]), manager_reply(Pid, Ref, ERROR), M end; manager_handle_request(#manager{acceptor = {Pid, MRef}, handlers = Handlers}, Pid, Ref, {stop, Reason}) -> i("stop"), manager_reply(Pid, Ref, ok), manager_stop_handlers(Handlers, Reason), i("try stop acceptor ~p: ~p", [Pid, Reason]), erlang:demonitor(MRef, [flush]), acceptor_stop(Pid, Reason), i("stop", []), exit(Reason). manager_stop_handlers(Handlers, Reason) -> lists:foreach(fun({P,M,ID}) -> manager_stop_handler(P, M, ID, Reason) end, Handlers). manager_stop_handler(Pid, MRef, ID, Reason) -> i("try stop handler ~w (~p): ~p", [ID, Pid, Reason]), erlang:demonitor(MRef, [flush]), handler_stop(Pid, Reason), ok. %% ========================================================================= acceptor_start(Domain, Type, Proto) -> Self = self(), A = {Pid, _} = spawn_monitor(fun() -> acceptor_init(Self, Domain, Type, Proto) end), receive {acceptor, Pid, ok} -> {ok, A}; {acceptor, Pid, {error, _} = Error} -> exit(Pid, kill), % Just in case Error; {'DOWN', _MRef, process, Pid, Reason} -> {error, {crashed, Reason}} end. acceptor_stop(Pid, _Reason) -> %% acceptor_request(Pid, {stop, Reason}). exit(Pid, kill). %% acceptor_request(Pid, Request) -> %% request(acceptor, Pid, Request). %% acceptor_reply(Pid, Ref, Reply) -> %% reply(acceptor, Pid, Ref, Reply). acceptor_init(Manager, Domain, Type, Proto) -> put(sname, "acceptor"), try acceptor_do_init(Domain, Type, Proto) of Sock -> Manager ! {acceptor, self(), ok}, acceptor_loop(#acceptor{manager = Manager, socket = Sock}) catch throw:E:P -> e("Failed initiate: " "~n Error: ~p" "~n Path: ~p", [E, P]), Manager ! {acceptor, self(), {error, {catched, E, P}}} end. acceptor_do_init(Domain, Type, Proto) -> i("try (socket) open"), Sock = case socket:open(Domain, Type, Proto) of {ok, S} -> S; {error, OReason} -> throw({open, OReason}) end, i("(socket) open - try find (local) address"), Addr = which_addr(Domain), SA = #{family => Domain, addr => Addr}, i("found (~p) - try (socket) bind", [Addr]), Port = case socket:bind(Sock, SA) of {ok, P} -> P; {error, BReason} -> throw({bind, BReason}) end, i("bound (~w) - try (socket) listen", [Port]), case socket:listen(Sock) of ok -> Sock; {error, LReason} -> throw({listen, LReason}) end. which_addr(Domain) -> Iflist = case inet:getifaddrs() of {ok, IFL} -> IFL; {error, Reason} -> throw({inet,getifaddrs,Reason}) end, which_addr(Domain, Iflist). which_addr(_Domain, []) -> throw(no_address); which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> which_addr2(Domain, IFO); which_addr(Domain, [_|IFL]) -> which_addr(Domain, IFL). which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> Addr; which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> Addr; which_addr2(Domain, [_|IFO]) -> which_addr2(Domain, IFO). acceptor_loop(#acceptor{socket = LSock} = A) -> i("try accept"), case socket:accept(LSock, infinity) of {ok, Sock} -> i("accepted: " "~n ~p" "~nwhen" "~n ~p", [Sock, socket:info()]), case acceptor_handle_accept_success(A, Sock) of ok -> acceptor_loop(A); {error, Reason} -> e("Failed starting handler: " "~n ~p", [Reason]), socket:close(Sock), exit({failed_starting_handler, Reason}) end; {error, Reason} -> e("accept failure: " "~n ~p", [Reason]), exit({accept, Reason}) end. acceptor_handle_accept_success(#acceptor{manager = Manager}, Sock) -> i("try start handler for peer" "~n ~p", [case socket:peername(Sock) of {ok, Peer} -> Peer; {error, _} = E -> E end]), case manager_start_handler(Manager, Sock) of {ok, Pid} -> i("handler (~p) started - now change 'ownership'", [Pid]), case socket:setopt(Sock, otp, controlling_process, Pid) of ok -> %% Normally we should have a msgs collection here %% (of messages we receive before the control was %% handled over to Handler), but since we don't %% have active implemented yet... i("new handler (~p) now controlling process", [Pid]), handler_continue(Pid), ok; {error, _} = ERROR -> exit(Pid, kill), ERROR end; {error, Reason2} -> e("failed starting handler: " "~n (new) Socket: ~p" "~n Reason: ~p", [Sock, Reason2]), exit({failed_starting_handler, Reason2}) end. %% ========================================================================= handler_start(ID, Sock) -> Self = self(), H = {Pid, _} = spawn_monitor(fun() -> handler_init(Self, ID, Sock) end), receive {handler, Pid, ok} -> {ok, H}; {handler, Pid, {error, _} = ERROR} -> exit(Pid, kill), % Just in case ERROR end. handler_stop(Pid, _Reason) -> %% handler_request(Pid, {stop, Reason}). exit(Pid, kill). handler_continue(Pid) -> handler_request(Pid, continue). handler_request(Pid, Request) -> ?LIB:request(handler, Pid, Request). handler_reply(Pid, Ref, Reply) -> ?LIB:reply(handler, Pid, Ref, Reply). handler_init(Manager, ID, Sock) -> put(sname, f("handler:~w", [ID])), i("starting"), Manager ! {handler, self(), ok}, receive {handler, Pid, Ref, continue} -> i("got continue"), handler_reply(Pid, Ref, ok), {ok, Type} = socket:getopt(Sock, socket, type), %% socket:setopt(Socket, otp, debug, true), handler_loop(#handler{manager = Manager, type = Type, socket = Sock}) end. handler_loop(H) -> i("try read message"), case recv(H) of {ok, {Source, Msg}} -> i("received ~w bytes of data~s", [size(Msg), case Source of undefined -> ""; _ -> f(" from:~n ~p", [Source]) end]), case ?LIB:dec_msg(Msg) of {request, N, Req} -> i("received request ~w: " "~n ~p", [N, Req]), Reply = ?LIB:enc_rep_msg(N, "hoppsan"), case send(H, Reply, Source) of ok -> i("successfully sent reply ~w", [N]), handler_loop(H); {error, SReason} -> e("failed sending reply ~w:" "~n ~p", [N, SReason]), exit({failed_sending_reply, SReason}) end end; {error, closed} -> i("closed when" "~n ~p", [socket:info()]), exit(normal); {error, RReason} -> e("failed reading request: " "~n ~p", [RReason]), exit({failed_reading_request, RReason}) end. recv(#handler{socket = Sock, type = stream}) -> case socket:recv(Sock) of {ok, Msg} -> {ok, {undefined, Msg}}; {error, _} = ERROR -> ERROR end; recv(#handler{socket = Sock, type = dgram}) -> %% ok = socket:setopt(Sock, otp, debug, true), socket:recvfrom(Sock). send(#handler{socket = Sock, type = stream}, Msg, _) -> socket:send(Sock, Msg); send(#handler{socket = Sock, type = dgram}, Msg, Dest) -> socket:sendto(Sock, Msg, Dest). %% ========================================================================= %% enc_req_msg(N, Data) -> %% enc_msg(?REQ, N, Data). %% enc_rep_msg(N, Data) -> %% enc_msg(?REP, N, Data). %% enc_msg(Type, N, Data) when is_list(Data) -> %% enc_msg(Type, N, list_to_binary(Data)); %% enc_msg(Type, N, Data) %% when is_integer(Type) andalso is_integer(N) andalso is_binary(Data) -> %% <>. %% dec_msg(<>) -> %% {request, N, Data}; %% dec_msg(<>) -> %% {reply, N, Data}. %% --- %% request(Tag, Pid, Request) -> %% Ref = make_ref(), %% Pid ! {Tag, self(), Ref, Request}, %% receive %% {Tag, Pid, Ref, Reply} -> %% Reply %% end. %% reply(Tag, Pid, Ref, Reply) -> %% Pid ! {Tag, self(), Ref, Reply}. %% --- %% formated_timestamp() -> %% format_timestamp(os:timestamp()). %% format_timestamp(Now) -> %% N2T = fun(N) -> calendar:now_to_local_time(N) end, %% format_timestamp(Now, N2T, true). %% format_timestamp({_N1, _N2, N3} = N, N2T, true) -> %% FormatExtra = ".~.2.0w", %% ArgsExtra = [N3 div 10000], %% format_timestamp(N, N2T, FormatExtra, ArgsExtra); %% format_timestamp({_N1, _N2, _N3} = N, N2T, false) -> %% FormatExtra = "", %% ArgsExtra = [], %% format_timestamp(N, N2T, FormatExtra, ArgsExtra). %% format_timestamp(N, N2T, FormatExtra, ArgsExtra) -> %% {Date, Time} = N2T(N), %% {YYYY,MM,DD} = Date, %% {Hour,Min,Sec} = Time, %% FormatDate = %% io_lib:format("~.4w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w" ++ FormatExtra, %% [YYYY, MM, DD, Hour, Min, Sec] ++ ArgsExtra), %% lists:flatten(FormatDate). %% --- f(F, A) -> ?LIB:f(F, A). e(F, A) -> ?LIB:e(F, A). i(F) -> ?LIB:i(F). i(F, A) -> ?LIB:i(F, A).