%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2018-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(socket_server).
-export([
start/0, start/4,
start_tcp/0, start_tcp/1, start_tcp/2,
start_tcp4/0, start_tcp4/1, start_tcp6/0, start_tcp6/1,
start_udp/0, start_udp/1, start_udp/2,
start_udp4/0, start_udp4/1, start_udp6/0, start_udp6/1,
start_sctp/0, start_sctp/1
]).
-define(LIB, socket_lib).
-record(manager, {socket, peek, acceptors, handler_id, handlers}).
-record(acceptor, {id, socket, manager}).
-record(handler, {socket, peek, type, manager}).
-define(NUM_ACCEPTORS, 5).
start() ->
start_tcp().
start_tcp() ->
start_tcp4().
start_tcp(Peek) ->
start_tcp4(Peek).
start_tcp4() ->
start_tcp4(false).
start_tcp4(Peek) ->
start_tcp(inet, Peek).
start_tcp6() ->
start_tcp6(false).
start_tcp6(Peek) ->
start_tcp(inet6, Peek).
start_tcp(Domain, Peek) when is_boolean(Peek) ->
start(Domain, stream, tcp, Peek).
start_udp() ->
start_udp4().
start_udp(Peek) ->
start_udp4(Peek).
start_udp4() ->
start_udp4(false).
start_udp4(Peek) ->
start_udp(inet, Peek).
start_udp6() ->
start_udp6(false).
start_udp6(Peek) ->
start_udp(inet6, Peek).
start_udp(Domain, Peek) when is_boolean(Peek) ->
start(Domain, dgram, udp, Peek).
start_sctp() ->
start_sctp(inet).
start_sctp(Domain) when ((Domain =:= inet) orelse (Domain =:= inet6)) ->
start(Domain, seqpacket, sctp, false).
start(Domain, Type, Proto, Peek) ->
put(sname, "starter"),
i("try start manager"),
{Pid, MRef} = manager_start(Domain, Type, Proto, Peek),
i("manager (~p) started", [Pid]),
loop(Pid, MRef).
loop(Pid, MRef) ->
receive
{'DOWN', MRef, process, Pid, Reason} ->
i("manager process exited: "
"~n ~p", [Reason]),
ok
end.
%% =========================================================================
manager_start(Domain, Type, Proto, Peek) ->
spawn_monitor(fun() -> manager_init(Domain, Type, Proto, Peek) end).
manager_start_handler(Pid, Sock) ->
manager_request(Pid, {start_handler, Sock}).
manager_stop(Pid, Reason) ->
manager_request(Pid, {stop, Reason}).
manager_request(Pid, Request) ->
?LIB:request(manager, Pid, Request).
manager_reply(Pid, Ref, Reply) ->
?LIB:reply(manager, Pid, Ref, Reply).
manager_init(Domain, Type, Proto, Peek) ->
put(sname, "manager"),
do_manager_init(Domain, Type, Proto, Peek).
do_manager_init(Domain, stream = Type, Proto, Peek) ->
i("try start acceptor(s)"),
{Sock, Acceptors} = manager_stream_init(Domain, Type, Proto),
manager_loop(#manager{socket = Sock,
peek = Peek,
acceptors = Acceptors,
handler_id = 1,
handlers = []});
do_manager_init(Domain, dgram = Type, Proto, Peek) ->
i("try open socket"),
case socket:open(Domain, Type, Proto) of
{ok, Sock} ->
F = fun(X) -> case socket:getopt(Sock, socket, X) of
{ok, V} -> f("~p", [V]);
{error, R} -> f("error: ~p", [R])
end
end,
i("socket opened (~s,~s,~s): "
"~n broadcast: ~s"
"~n dontroute: ~s"
"~n keepalive: ~s"
"~n reuseaddr: ~s"
"~n linger: ~s"
"~n debug: ~s"
"~n prio: ~s"
"~n rcvbuf: ~s"
"~n rcvtimeo: ~s"
"~n sndbuf: ~s"
"~n sndtimeo: ~s"
"~n => try find (local) address",
[F(domain), F(type), F(protocol),
F(broadcast), F(dontroute), F(keepalive), F(reuseaddr), F(linger),
F(debug), F(priority),
F(rcvbuf), F(rcvtimeo), F(sndbuf), F(sndtimeo)]),
Addr = which_addr(Domain),
SA = #{family => Domain,
addr => Addr},
i("try bind to: "
"~n ~p", [Addr]),
case socket:bind(Sock, SA) of
{ok, _P} ->
ok;
{error, BReason} ->
throw({bind, BReason})
end,
i("bound to: "
"~n ~s"
"~n => try start handler",
[case socket:sockname(Sock) of
{ok, Name} -> f("~p", [Name]);
{error, R} -> f("error: ~p", [R])
end]),
case handler_start(1, Sock, Peek) of
{ok, {Pid, MRef}} ->
i("handler (~p) started", [Pid]),
handler_continue(Pid),
manager_loop(#manager{peek = Peek,
handler_id = 2, % Just in case
handlers = [{1, Pid, MRef}]});
{error, SReason} ->
e("Failed starting handler: "
"~n ~p", [SReason]),
exit({failed_start_handler, SReason})
end;
{error, OReason} ->
e("Failed open socket: "
"~n ~p", [OReason]),
exit({failed_open_socket, OReason})
end;
do_manager_init(Domain, seqpacket = Type, sctp = Proto, _Peek) ->
%% This is as far as I have got with SCTP at the moment...
case socket:open(Domain, Type, Proto) of
{ok, Sock} ->
i("(sctp) socket opened: "
"~n ~p", [Sock]),
EXP = fun(_Desc, Expect, Expect) ->
Expect;
(Desc, Expect, Actual) ->
e("Unexpected result ~w: "
"~n Expect: ~p"
"~n Actual: ~p", [Desc, Expect, Actual]),
exit({Desc, Expect, Actual})
end,
GO = fun(O) -> case socket:getopt(Sock, sctp, O) of
{ok, V} -> f("~p", [V]);
{error, R} -> f("error: ~p", [R])
end
end,
%% ok = socket:setopt(Sock, otp, debug, true),
i("Miscellaneous options: "
"~n associnfo: ~s"
"~n autoclose: ~s"
"~n disable-fragments: ~s"
"~n initmsg: ~s"
"~n maxseg: ~s"
"~n nodelay: ~s"
"~n rtoinfo: ~s",
[GO(associnfo),
GO(autoclose),
GO(disable_fragments),
GO(initmsg),
GO(maxseg),
GO(nodelay),
GO(rtoinfo)]),
Events = #{data_in => true,
association => true,
address => true,
send_failure => true,
peer_error => true,
shutdown => true,
partial_delivery => true,
adaptation_layer => true,
authentication => true,
sender_dry => true},
EXP(set_sctp_events, ok, socket:setopt(Sock, sctp, events, Events)),
EXP(close_socket, ok, socket:close(Sock));
{error, Reason} ->
exit({failed_open, Reason})
end.
manager_stream_init(Domain, Type, Proto) ->
i("try (socket) open"),
Sock = case socket:open(Domain, Type, Proto) of
{ok, S} ->
S;
{error, OReason} ->
throw({open, OReason})
end,
F = fun(X) -> case socket:getopt(Sock, socket, X) of
{ok, V} -> f("~p", [V]);
{error, R} -> f("error: ~p", [R])
end
end,
i("(socket) open (~s,~s,~s): "
"~n debug: ~s"
"~n prio: ~s"
"~n => try find (local) address",
[F(domain), F(type), F(protocol), F(debug), F(priority)]),
Addr = which_addr(Domain),
SA = #{family => Domain,
addr => Addr},
i("found: "
"~n ~p"
"~n => try (socket) bind", [Addr]),
%% ok = socket:setopt(Sock, otp, debug, true),
%% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!!
Port = case socket:bind(Sock, SA) of
{ok, P} ->
%% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!!
%% ok = socket:setopt(Sock, otp, debug, false),
P;
{error, BReason} ->
throw({bind, BReason})
end,
i("bound to: "
"~n ~p"
"~n => try (socket) listen (acceptconn: ~s)",
[Port, F(acceptconn)]),
case socket:listen(Sock) of
ok ->
i("listening (acceptconn: ~s)",
[F(acceptconn)]),
manager_stream_init(Sock, 1, ?NUM_ACCEPTORS, []);
{error, LReason} ->
throw({listen, LReason})
end.
which_addr(Domain) ->
Iflist = case inet:getifaddrs() of
{ok, IFL} ->
IFL;
{error, Reason} ->
throw({inet,getifaddrs,Reason})
end,
which_addr(Domain, Iflist).
which_addr(_Domain, []) ->
throw(no_address);
which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") ->
which_addr2(Domain, IFO);
which_addr(Domain, [_|IFL]) ->
which_addr(Domain, IFL).
which_addr2(_, []) ->
throw(no_address);
which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) ->
Addr;
which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) ->
Addr;
which_addr2(Domain, [_|IFO]) ->
which_addr2(Domain, IFO).
manager_stream_init(Sock, ID, NumAcceptors, Acc)
when (NumAcceptors > 0) ->
i("try start acceptor"),
case acceptor_start(Sock, ID) of
{ok, {Pid, MRef}} ->
i("acceptor ~w (~p) started", [ID, Pid]),
?LIB:sleep(2000),
manager_stream_init(Sock, ID+1, NumAcceptors-1,
[{ID, Pid, MRef}|Acc]);
{error, Reason} ->
exit({failed_starting_acceptor, Reason})
end;
manager_stream_init(Sock, _ID, 0, Acc) ->
%% Req = {kill_acceptor, length(Acc)}, % Last in the queue
%% Req = {kill_acceptor, 3}, % In the "middle" of the queue
%% Req = {kill_acceptor, 2}, % The first in the queue
%% Req = {kill_acceptor, 1}, % Current acceptor
%% Msg = {manager, self(), make_ref(), Req},
%% erlang:send_after(timer:seconds(10), self(), Msg),
{Sock, lists:reverse(Acc)}.
manager_loop(M) ->
receive
{'DOWN', MRef, process, Pid, Reason} ->
M2 = manager_handle_down(M, MRef, Pid, Reason),
manager_loop(M2);
{manager, Pid, Ref, Request} ->
M2 = manager_handle_request(M, Pid, Ref, Request),
manager_loop(M2)
end.
manager_handle_down(#manager{acceptors = Acceptors,
handlers = Handlers} = M, MRef, Pid, Reason) ->
case lists:keysearch(Pid, 2, Acceptors) of
{value, {ID, Pid, MRef}} when (Reason =:= normal) ->
i("acceptor ~w exited (normally)", [ID]),
case lists:keydelete(Pid, 2, Acceptors) of
[] ->
%% We are done
i("the last acceptor - we are done"),
exit(normal);
Acceptors2 ->
M#manager{acceptors = Acceptors2}
end;
{value, {ID, Pid, MRef}} ->
e("acceptor ~w crashed: "
"~n ~p", [ID, Reason]),
exit({acceptor_died, Reason});
false -> %% handler!
if
(Reason =/= normal) ->
e("handler ~p died: "
"~n ~p", [Pid, Reason]);
true ->
i("handler ~p terminated", [Pid])
end,
Handlers2 = lists:keydelete(Pid, 2, Handlers),
M#manager{handlers = Handlers2}
end.
manager_handle_request(#manager{peek = Peek,
handler_id = HID,
handlers = Handlers} = M, Pid, Ref,
{start_handler, Sock}) ->
i("try start handler (~w)", [HID]),
case handler_start(HID, Sock, Peek) of
{ok, {HPid, HMRef}} ->
i("handler ~w started", [HID]),
manager_reply(Pid, Ref, {ok, HPid}),
M#manager{handler_id = HID+1,
handlers = [{HID, HPid, HMRef}|Handlers]};
{error, Reason} = ERROR ->
e("Failed starting new handler: "
"~n Sock: ~p"
"~n Reason: ~p", [Sock, Reason]),
manager_reply(Pid, Ref, ERROR),
M
end;
manager_handle_request(#manager{socket = Sock,
acceptors = [{AID, APid, AMRef}]} = M, _Pid, _Ref,
{kill_acceptor, AID}) ->
i("try kill (only remeining) acceptor ~w", [AID]),
socket:setopt(Sock, otp, debug, true),
manager_stop_acceptor(APid, AMRef, AID, kill),
M#manager{acceptors = []};
manager_handle_request(#manager{socket = Sock,
acceptors = Acceptors} = M, _Pid, _Ref,
{kill_acceptor, AID}) ->
i("try kill acceptor ~w", [AID]),
case lists:keysearch(AID, 1, Acceptors) of
{value, {AID, APid, AMRef}} ->
socket:setopt(Sock, otp, debug, true),
manager_stop_acceptor(APid, AMRef, AID, kill),
Acceptors2 = lists:keydelete(AID, 1, Acceptors),
M#manager{acceptors = Acceptors2};
false ->
e("no such acceptor"),
M
end;
manager_handle_request(#manager{acceptors = Acceptors,
handlers = Handlers}, Pid, Ref,
{stop, Reason}) ->
i("stop"),
manager_reply(Pid, Ref, ok),
manager_stop_handlers(Handlers, Reason),
manager_stop_acceptors(Acceptors, Reason),
i("stopped", []),
exit(Reason).
manager_stop_acceptors(Acceptors, Reason) ->
lists:foreach(fun({ID,P,M}) ->
manager_stop_acceptor(P, M, ID, Reason)
end, Acceptors).
manager_stop_acceptor(Pid, MRef, ID, Reason) ->
i("try stop acceptor ~w (~p): ~p", [ID, Pid, Reason]),
erlang:demonitor(MRef, [flush]),
acceptor_stop(Pid, Reason),
ok.
manager_stop_handlers(Handlers, Reason) ->
lists:foreach(fun({ID,P,M}) ->
manager_stop_handler(P, M, ID, Reason)
end, Handlers).
manager_stop_handler(Pid, MRef, ID, Reason) ->
i("try stop handler ~w (~p): ~p", [ID, Pid, Reason]),
erlang:demonitor(MRef, [flush]),
handler_stop(Pid, Reason),
ok.
%% =========================================================================
acceptor_start(Sock, ID) ->
Self = self(),
A = {Pid, _} = spawn_monitor(fun() ->
acceptor_init(Self, Sock, ID)
end),
receive
{acceptor, Pid, ok} ->
{ok, A};
{acceptor, Pid, {error, _} = Error} ->
exit(Pid, kill), % Just in case
Error;
{'DOWN', _MRef, process, Pid, Reason} ->
{error, {crashed, Reason}}
end.
acceptor_stop(Pid, _Reason) ->
%% acceptor_request(Pid, {stop, Reason}).
exit(Pid, kill).
%% acceptor_request(Pid, Request) ->
%% request(acceptor, Pid, Request).
%% acceptor_reply(Pid, Ref, Reply) ->
%% reply(acceptor, Pid, Ref, Reply).
acceptor_init(Manager, Sock, ID) ->
put(sname, f("acceptor[~w]", [ID])),
Manager ! {acceptor, self(), ok},
acceptor_loop(#acceptor{id = ID,
manager = Manager,
socket = Sock}).
acceptor_loop(#acceptor{socket = LSock} = A) ->
i("try accept"),
case socket:accept(LSock, infinity) of
{ok, Sock} ->
i("accepted: "
"~n ~p"
"~nwhen"
"~n ~p", [Sock, socket:info()]),
case acceptor_handle_accept_success(A, Sock) of
ok ->
acceptor_loop(A);
{error, Reason} ->
e("Failed starting handler: "
"~n ~p", [Reason]),
socket:close(Sock),
exit({failed_starting_handler, Reason})
end;
{error, Reason} ->
e("accept failure: "
"~n ~p", [Reason]),
exit({accept, Reason})
end.
acceptor_handle_accept_success(#acceptor{manager = Manager}, Sock) ->
i("try start handler for peer"
"~n ~p", [case socket:peername(Sock) of
{ok, Peer} -> Peer;
{error, _} = E -> E
end]),
case manager_start_handler(Manager, Sock) of
{ok, Pid} ->
i("handler (~p) started - now change 'ownership'", [Pid]),
case socket:setopt(Sock, otp, controlling_process, Pid) of
ok ->
%% Normally we should have a msgs collection here
%% (of messages we receive before the control was
%% handled over to Handler), but since we don't
%% have active implemented yet...
i("new handler (~p) now controlling process", [Pid]),
handler_continue(Pid),
ok;
{error, _} = ERROR ->
exit(Pid, kill),
ERROR
end;
{error, Reason2} ->
e("failed starting handler: "
"~n (new) Socket: ~p"
"~n Reason: ~p", [Sock, Reason2]),
exit({failed_starting_handler, Reason2})
end.
%% =========================================================================
handler_start(ID, Sock, Peek) ->
Self = self(),
H = {Pid, _} = spawn_monitor(fun() ->
handler_init(Self, ID, Peek, Sock)
end),
receive
{handler, Pid, ok} ->
{ok, H};
{handler, Pid, {error, _} = ERROR} ->
exit(Pid, kill), % Just in case
ERROR
end.
handler_stop(Pid, _Reason) ->
%% handler_request(Pid, {stop, Reason}).
exit(Pid, kill).
handler_continue(Pid) ->
handler_request(Pid, continue).
handler_request(Pid, Request) ->
?LIB:request(handler, Pid, Request).
handler_reply(Pid, Ref, Reply) ->
?LIB:reply(handler, Pid, Ref, Reply).
handler_init(Manager, ID, Peek, Sock) ->
put(sname, f("handler:~w", [ID])),
i("starting"),
Manager ! {handler, self(), ok},
receive
{handler, Pid, Ref, continue} ->
i("got continue"),
handler_reply(Pid, Ref, ok),
G = fun(L, O) -> case socket:getopt(Sock, L, O) of
{ok, Val} ->
f("~p", [Val]);
{error, R} when is_atom(R) ->
f("error: ~w", [R]);
{error, {T, R}} when is_atom(T) ->
f("error: ~w, ~p", [T, R]);
{error, R} ->
f("error: ~p", [R])
end
end,
GSO = fun(O) -> G(socket, O) end,
GIP4 = fun(O) -> G(ip, O) end,
GIP6 = fun(O) -> G(ipv6, O) end,
{ok, Domain} = socket:getopt(Sock, socket, domain),
{ok, Type} = socket:getopt(Sock, socket, type),
{ok, Proto} = socket:getopt(Sock, socket, protocol),
B2D = GSO(bindtodevice),
RA = GSO(reuseaddr),
RP = GSO(reuseport),
OOBI = GSO(oobinline),
RcvBuf = GSO(rcvbuf),
RcvLW = GSO(rcvlowat),
RcvTO = GSO(rcvtimeo),
SndBuf = GSO(sndbuf),
SndLW = GSO(sndlowat),
SndTO = GSO(sndtimeo),
Linger = GSO(linger),
Timestamp = GSO(timestamp),
FreeBind = GIP4(freebind),
MTU = GIP4(mtu),
MTUDisc = GIP4(mtu_discover),
MALL = GIP4(multicast_all),
MIF4 = GIP4(multicast_if),
MLoop4 = GIP4(multicast_loop),
MTTL = GIP4(multicast_ttl),
NF = GIP4(nodefrag), % raw only
RecvIF = GIP4(recvif), % Only dgram and raw (and FreeBSD)
RecvOPTS = GIP4(recvopts), % Not stream
RecvTOS = GIP4(recvtos),
RecvTTL = GIP4(recvttl), % not stream
MHops = GIP6(multicast_hops),
MIF6 = GIP6(multicast_if), % Only dgram and raw
MLoop6 = GIP6(multicast_loop),
RecvPktInfo = GIP6(recvpktinfo),
RtHdr = GIP6(rthdr),
AuthHdr = GIP6(authhdr),
HopLimit = GIP6(hoplimit),
HopOpts = GIP6(hopopts),
DstOpts = GIP6(dstopts),
FlowInfo = GIP6(flowinfo),
UHops = GIP6(unicast_hops),
i("got continue when: "
"~n (socket) Domain: ~p"
"~n (socket) Type: ~p"
"~n (socket) Protocol: ~p"
"~n (socket) Reuse Address: ~s"
"~n (socket) Reuse Port: ~s"
"~n (socket) Bind To Device: ~s"
"~n (socket) OOBInline: ~s"
"~n (socket) RcvBuf: ~s"
"~n (socket) RcvLW: ~s"
"~n (socket) RcvTO: ~s"
"~n (socket) SndBuf: ~s"
"~n (socket) SndLW: ~s"
"~n (socket) SndTO: ~s"
"~n (socket) Linger: ~s"
"~n (socket) Timestamp: ~s"
"~n (ip) FreeBind: ~s"
"~n (ip) MTU: ~s"
"~n (ip) MTU Discovery: ~s"
"~n (ip) Multicast ALL: ~s"
"~n (ip) Multicast IF: ~s"
"~n (ip) Multicast Loop: ~s"
"~n (ip) Multicast TTL: ~s"
"~n (ip) Node Frag: ~s"
"~n (ip) Recv IF: ~s"
"~n (ip) Recv OPTS: ~s"
"~n (ip) Recv TOS: ~s"
"~n (ip) Recv TTL: ~s"
"~n (ipv6) Multicast Hops: ~s"
"~n (ipv6) Multicast IF: ~s"
"~n (ipv6) Multicast Loop: ~s"
"~n (ipv6) Recv Pkt Info: ~s"
"~n (ipv6) RT Hdr: ~s"
"~n (ipv6) Auth Hdr: ~s"
"~n (ipv6) Hop Limit: ~s"
"~n (ipv6) Hop Opts: ~s"
"~n (ipv6) Dst Opts: ~s"
"~n (ipv6) Flow Info: ~s"
"~n (ipv6) Unicast Hops: ~s",
[Domain, Type, Proto,
RA, RP, B2D, OOBI,
RcvBuf, RcvLW, RcvTO, SndBuf, SndLW, SndTO,
Linger, Timestamp,
FreeBind, MTU, MTUDisc, MALL, MIF4, MLoop4, MTTL,
NF, RecvIF, RecvOPTS, RecvTOS, RecvTTL,
MHops, MIF6, MLoop6, RecvPktInfo,
RtHdr, AuthHdr, HopLimit, HopOpts, DstOpts, FlowInfo,
UHops]),
handler_loop(#handler{peek = Peek,
manager = Manager,
type = Type,
socket = Sock})
end.
handler_loop(H) ->
i("try read message"),
case recv(H) of
{ok, {Source, Msg}} ->
i("received ~w bytes of data~s",
[size(Msg), case Source of
undefined -> "";
_ -> f(" from:~n ~p", [Source])
end]),
case ?LIB:dec_msg(Msg) of
{request, N, Req} ->
i("received request ~w: "
"~n ~p", [N, Req]),
Reply = ?LIB:enc_rep_msg(N, "hoppsan"),
case send(H, Reply, Source) of
ok ->
i("successfully sent reply ~w", [N]),
handler_loop(H);
{error, SReason} ->
e("failed sending reply ~w:"
"~n ~p", [N, SReason]),
exit({failed_sending_reply, SReason})
end
end;
{error, closed} ->
i("closed when"
"~n ~p", [socket:info()]),
exit(normal);
{error, RReason} ->
e("failed reading request: "
"~n ~p", [RReason]),
exit({failed_reading_request, RReason})
end.
recv(#handler{peek = true, socket = Sock, type = stream}) ->
peek_recv(Sock);
recv(#handler{peek = false, socket = Sock, type = stream}) ->
do_recv(Sock);
recv(#handler{peek = Peek, socket = Sock, type = dgram})
when (Peek =:= true) ->
%% ok = socket:setopt(Sock, otp, debug, true),
RES = peek_recvfrom(Sock, 5),
%% ok = socket:setopt(Sock, otp, debug, false),
RES;
recv(#handler{peek = Peek, socket = Sock, type = dgram})
when (Peek =:= false) ->
%% ok = socket:setopt(Sock, otp, debug, true),
socket:recvfrom(Sock).
do_recv(Sock) ->
case socket:recv(Sock) of
{ok, Msg} ->
{ok, {undefined, Msg}};
{error, _} = ERROR ->
ERROR
end.
peek_recv(Sock) ->
i("try peek on the message type (expect request)"),
Type = ?LIB:req(),
case socket:recv(Sock, 4, [peek]) of
{ok, <<Type:32>>} ->
i("was request - do proper recv"),
do_recv(Sock);
{error, _} = ERROR ->
ERROR
end.
peek_recvfrom(Sock, BufSz) ->
i("try peek recvfrom with buffer size ~w", [BufSz]),
case socket:recvfrom(Sock, BufSz, [peek]) of
{ok, {_Source, Msg}} when (BufSz =:= size(Msg)) ->
%% i("we filled the buffer: "
%% "~n ~p", [Msg]),
%% It *may not* fit => try again with double size
peek_recvfrom(Sock, BufSz*2);
{ok, _} ->
%% It fits => read for real
i("we did *not* fill the buffer - do the 'real' read"),
socket:recvfrom(Sock);
{error, _} = ERROR ->
ERROR
end.
send(#handler{socket = Sock, type = stream}, Msg, _) ->
socket:send(Sock, Msg);
send(#handler{socket = Sock, type = dgram}, Msg, Dest) ->
socket:sendto(Sock, Msg, Dest).
%% =========================================================================
f(F, A) ->
?LIB:f(F, A).
e(F) ->
e(F, []).
e(F, A) ->
?LIB:e(F, A).
i(F) ->
?LIB:i(F).
i(F, A) ->
?LIB:i(F, A).