diff options
author | Micael Karlberg <[email protected]> | 2018-09-20 15:37:37 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-20 16:29:05 +0200 |
commit | 13d10bc60a41f98647d802524ea8ef8fa9af6b39 (patch) | |
tree | 2a56b2fa3b0d63c05c49aea93427a343e149835f /lib | |
parent | e01a856c993b55c3fbc76fd429783d4aad5bfc80 (diff) | |
download | otp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.tar.gz otp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.tar.bz2 otp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.zip |
[socket-nif] Add proper send timeout handling
Added proper send timeout handling.
Made use of the enif_select(mode = cancel) feature. Each
time a timeout expires, the "active" send (the surrent write
select) has to be cancelled.
OTP-14831
Diffstat (limited to 'lib')
-rw-r--r-- | lib/kernel/test/socket_client.erl | 344 | ||||
-rw-r--r-- | lib/kernel/test/socket_server.erl | 24 |
2 files changed, 241 insertions, 127 deletions
diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl index 6cd353fd07..1c07e799b8 100644 --- a/lib/kernel/test/socket_client.erl +++ b/lib/kernel/test/socket_client.erl @@ -21,53 +21,126 @@ -module(socket_client). -export([ - start/1, start/5, - start_tcp/1, start_tcp/2, start_tcp4/1, start_tcp6/1, - start_udp/1, start_udp/2, start_udp4/1, start_udp6/1 + start/1, start/2, start/5, start/6, + start_tcp/1, start_tcp/2, start_tcp/3, + start_tcp4/1, start_tcp4/2, start_tcp6/1, start_tcp6/2, + start_udp/1, start_udp/2, start_udp/3, + start_udp4/1, start_udp4/2, start_udp6/1, start_udp6/2 ]). -define(LIB, socket_lib). --record(client, {socket, msg = true, type, dest, msg_id = 1}). +-record(client, {socket, verbose = true, msg = true, type, dest, msg_id = 1}). start(Port) -> - start_tcp(Port). + start(Port, 1). + +start(Port, Num) -> + start_tcp(Port, Num). start_tcp(Port) -> - start_tcp4(Port). + start_tcp(Port, 1). + +start_tcp(Port, Num) -> + start_tcp4(Port, Num). start_tcp4(Port) -> - start(inet, stream, tcp, Port). + start_tcp4(Port, 1). + +start_tcp4(Port, Num) -> + start(inet, stream, tcp, Port, Num). start_tcp6(Port) -> - start(inet6, stream, tcp, Port). + start_tcp6(Port, 1). + +start_tcp6(Port, Num) -> + start(inet6, stream, tcp, Port, Num). -start_tcp(Addr, Port) when (size(Addr) =:= 4) -> - start(inet, stream, tcp, Addr, Port); -start_tcp(Addr, Port) when (size(Addr) =:= 8) -> - start(inet6, stream, tcp, Addr, Port). +start_tcp(Addr, Port, Num) when (size(Addr) =:= 4) andalso + is_integer(Num) andalso + (Num > 0) -> + start(inet, stream, tcp, Addr, Port, Num); +start_tcp(Addr, Port, Num) when (size(Addr) =:= 8) andalso + is_integer(Num) andalso + (Num > 0) -> + start(inet6, stream, tcp, Addr, Port, Num). start_udp(Port) -> - start_udp4(Port). + start_udp(Port, 1). + +start_udp(Port, Num) -> + start_udp4(Port, Num). start_udp4(Port) -> - start(inet, dgram, udp, Port). + start_udp4(Port, 1). + +start_udp4(Port, Num) -> + start(inet, dgram, udp, Port, Num). start_udp6(Port) -> - start(inet6, dgram, udp, Port). + start_udp6(Port, 1). -start_udp(Addr, Port) when (size(Addr) =:= 4) -> - start(inet, dgram, udp, Addr, Port); -start_udp(Addr, Port) when (size(Addr) =:= 8) -> - start(inet6, dgram, udp, Addr, Port). +start_udp6(Port, Num) -> + start(inet6, dgram, udp, Port, Num). +start_udp(Addr, Port, Num) when (size(Addr) =:= 4) -> + start(inet, dgram, udp, Addr, Port, Num); +start_udp(Addr, Port, Num) when (size(Addr) =:= 8) -> + start(inet6, dgram, udp, Addr, Port, Num). -start(Domain, Type, Proto, Port) -> - start(Domain, Type, Proto, which_addr(Domain), Port). + +start(Domain, Type, Proto, Port, Num) + when is_integer(Port) andalso is_integer(Num) -> + start(Domain, Type, Proto, which_addr(Domain), Port, Num); start(Domain, Type, Proto, Addr, Port) -> + start(Domain, Type, Proto, Addr, Port, 1). + +start(Domain, Type, Proto, Addr, Port, 1 = Num) -> + start(Domain, Type, Proto, Addr, Port, Num, true); +start(Domain, Type, Proto, Addr, Port, Num) + when is_integer(Num) andalso (Num > 1) -> + start(Domain, Type, Proto, Addr, Port, Num, false). + +start(Domain, Type, Proto, Addr, Port, Num, Verbose) -> put(sname, "starter"), + Clients = start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose), + await_clients(Clients). + +start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose) -> + start_clients(Num, 1, Domain, Type, Proto, Addr, Port, Verbose, []). + +start_clients(Num, ID, Domain, Type, Proto, Addr, Port, Verbose, Acc) + when (Num > 0) -> + StartClient = fun() -> + start_client(ID, Domain, Type, Proto, Addr, Port, Verbose) + end, + {Pid, _} = spawn_monitor(StartClient), + ?LIB:sleep(500), + i("start client ~w", [ID]), + start_clients(Num-1, ID+1, Domain, Type, Proto, Addr, Port, Verbose, [Pid|Acc]); +start_clients(_, _, _, _, _, _, _, _, Acc) -> + i("all client(s) started"), + lists:reverse(Acc). + +await_clients([]) -> + i("all clients done"); +await_clients(Clients) -> + receive + {'DOWN', _MRef, process, Pid, _Reason} -> + case lists:delete(Pid, Clients) of + Clients2 when (Clients2 =/= Clients) -> + i("client ~p done", [Pid]), + await_clients(Clients2); + _ -> + await_clients(Clients) + end + end. + + +start_client(ID, Domain, Type, Proto, Addr, Port, Verbose) -> + put(sname, ?LIB:f("client[~w]", [ID])), SA = #{family => Domain, addr => Addr, port => Port}, @@ -75,110 +148,119 @@ start(Domain, Type, Proto, Addr, Port) -> %% send so few messages (a new value for every %% message). tos_init(), - do_start(Domain, Type, Proto, SA). + do_start(Domain, Type, Proto, SA, Verbose). -do_start(Domain, stream = Type, Proto, SA) -> +do_start(Domain, stream = Type, Proto, SA, Verbose) -> try do_init(Domain, Type, Proto) of Sock -> connect(Sock, SA), - {ok, Name} = socket:sockname(Sock), - {ok, Peer} = socket:peername(Sock), - {ok, Domain} = socket:getopt(Sock, socket, domain), - {ok, Type} = socket:getopt(Sock, socket, type), - {ok, Proto} = socket:getopt(Sock, socket, protocol), - {ok, OOBI} = socket:getopt(Sock, socket, oobinline), - {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), - {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), - {ok, Linger} = socket:getopt(Sock, socket, linger), - {ok, MTU} = socket:getopt(Sock, ip, mtu), - {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover), - {ok, MALL} = socket:getopt(Sock, ip, multicast_all), - {ok, MIF} = socket:getopt(Sock, ip, multicast_if), - {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), - {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), - {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), - i("connected: " - "~n From: ~p" - "~n To: ~p" - "~nwhen" - "~n (socket) Domain: ~p" - "~n (socket) Type: ~p" - "~n (socket) Protocol: ~p" - "~n (socket) OOBInline: ~p" - "~n (socket) SndBuf: ~p" - "~n (socket) RcvBuf: ~p" - "~n (socket) Linger: ~p" - "~n (ip) MTU: ~p" - "~n (ip) MTU Discovery: ~p" - "~n (ip) Multicast ALL: ~p" - "~n (ip) Multicast IF: ~p" - "~n (ip) Multicast Loop: ~p" - "~n (ip) Multicast TTL: ~p" - "~n (ip) RecvTOS: ~p" - "~n => wait some", - [Name, Peer, - Domain, Type, Proto, - OOBI, SndBuf, RcvBuf, Linger, - MTU, MTUDisc, MALL, MIF, MLoop, MTTL, - RecvTOS]), + maybe_print_start_info(Verbose, Sock, Type), %% Give the server some time... ?LIB:sleep(5000), %% ok = socket:close(Sock), - send_loop(#client{socket = Sock, - type = Type}) + send_loop(#client{socket = Sock, + type = Type, + verbose = Verbose}) catch throw:E -> e("Failed initiate: " "~n Error: ~p", [E]) end; -do_start(Domain, dgram = Type, Proto, SA) -> +do_start(Domain, dgram = Type, Proto, SA, Verbose) -> try do_init(Domain, Type, Proto) of Sock -> + maybe_print_start_info(Verbose, Sock, Type), %% Give the server some time... - {ok, Domain} = socket:getopt(Sock, socket, domain), - {ok, Type} = socket:getopt(Sock, socket, type), - {ok, Proto} = socket:getopt(Sock, socket, protocol), - {ok, OOBI} = socket:getopt(Sock, socket, oobinline), - {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), - {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), - {ok, Linger} = socket:getopt(Sock, socket, linger), - {ok, MALL} = socket:getopt(Sock, ip, multicast_all), - {ok, MIF} = socket:getopt(Sock, ip, multicast_if), - {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), - {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), - {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), - {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl), - i("initiated when: " - "~n (socket) Domain: ~p" - "~n (socket) Type: ~p" - "~n (socket) Protocol: ~p" - "~n (socket) OOBInline: ~p" - "~n (socket) SndBuf: ~p" - "~n (socket) RcvBuf: ~p" - "~n (socket) Linger: ~p" - "~n (ip) Multicast ALL: ~p" - "~n (ip) Multicast IF: ~p" - "~n (ip) Multicast Loop: ~p" - "~n (ip) Multicast TTL: ~p" - "~n (ip) RecvTOS: ~p" - "~n (ip) RecvTTL: ~p" - "~n => wait some", - [Domain, Type, Proto, - OOBI, SndBuf, RcvBuf, Linger, - MALL, MIF, MLoop, MTTL, - RecvTOS, RecvTTL]), ?LIB:sleep(5000), %% ok = socket:close(Sock), - send_loop(#client{socket = Sock, - type = Type, - dest = SA}) + send_loop(#client{socket = Sock, + type = Type, + dest = SA, + verbose = Verbose}) catch throw:E -> e("Failed initiate: " "~n Error: ~p", [E]) end. +maybe_print_start_info(true = _Verbose, Sock, stream = _Type) -> + {ok, Name} = socket:sockname(Sock), + {ok, Peer} = socket:peername(Sock), + {ok, Domain} = socket:getopt(Sock, socket, domain), + {ok, Type} = socket:getopt(Sock, socket, type), + {ok, Proto} = socket:getopt(Sock, socket, protocol), + {ok, OOBI} = socket:getopt(Sock, socket, oobinline), + {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), + {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), + {ok, Linger} = socket:getopt(Sock, socket, linger), + {ok, MTU} = socket:getopt(Sock, ip, mtu), + {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover), + {ok, MALL} = socket:getopt(Sock, ip, multicast_all), + {ok, MIF} = socket:getopt(Sock, ip, multicast_if), + {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), + {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), + {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), + i("connected: " + "~n From: ~p" + "~n To: ~p" + "~nwhen" + "~n (socket) Domain: ~p" + "~n (socket) Type: ~p" + "~n (socket) Protocol: ~p" + "~n (socket) OOBInline: ~p" + "~n (socket) SndBuf: ~p" + "~n (socket) RcvBuf: ~p" + "~n (socket) Linger: ~p" + "~n (ip) MTU: ~p" + "~n (ip) MTU Discovery: ~p" + "~n (ip) Multicast ALL: ~p" + "~n (ip) Multicast IF: ~p" + "~n (ip) Multicast Loop: ~p" + "~n (ip) Multicast TTL: ~p" + "~n (ip) RecvTOS: ~p" + "~n => wait some", + [Name, Peer, + Domain, Type, Proto, + OOBI, SndBuf, RcvBuf, Linger, + MTU, MTUDisc, MALL, MIF, MLoop, MTTL, + RecvTOS]); +maybe_print_start_info(true = _Verbose, Sock, dgram = _Type) -> + {ok, Domain} = socket:getopt(Sock, socket, domain), + {ok, Type} = socket:getopt(Sock, socket, type), + {ok, Proto} = socket:getopt(Sock, socket, protocol), + {ok, OOBI} = socket:getopt(Sock, socket, oobinline), + {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), + {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), + {ok, Linger} = socket:getopt(Sock, socket, linger), + {ok, MALL} = socket:getopt(Sock, ip, multicast_all), + {ok, MIF} = socket:getopt(Sock, ip, multicast_if), + {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), + {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), + {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), + {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl), + i("initiated when: " + "~n (socket) Domain: ~p" + "~n (socket) Type: ~p" + "~n (socket) Protocol: ~p" + "~n (socket) OOBInline: ~p" + "~n (socket) SndBuf: ~p" + "~n (socket) RcvBuf: ~p" + "~n (socket) Linger: ~p" + "~n (ip) Multicast ALL: ~p" + "~n (ip) Multicast IF: ~p" + "~n (ip) Multicast Loop: ~p" + "~n (ip) Multicast TTL: ~p" + "~n (ip) RecvTOS: ~p" + "~n (ip) RecvTTL: ~p" + "~n => wait some", + [Domain, Type, Proto, + OOBI, SndBuf, RcvBuf, Linger, + MALL, MIF, MLoop, MTTL, + RecvTOS, RecvTTL]); +maybe_print_start_info(_Verbose, _Sock, _Type) -> + ok. + do_init(Domain, stream = Type, Proto) -> i("try (socket) open"), Sock = case socket:open(Domain, Type, Proto) of @@ -248,14 +330,25 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) -> i("request ~w sent - now try read answer", [N]), case recv(C) of {ok, {Source, Msg}} -> - i("received ~w bytes of data~s", - [size(Msg), case Source of - undefined -> ""; - _ -> ?LIB:f(" from:~n ~p", [Source]) - end]), + if + (C#client.verbose =:= true) -> + i("received ~w bytes of data~s", + [size(Msg), case Source of + undefined -> ""; + _ -> ?LIB:f(" from:~n ~p", [Source]) + end]); + true -> + i("received ~w bytes", [size(Msg)]) + end, case ?LIB:dec_msg(Msg) of {reply, N, Reply} -> - i("received reply ~w: ~p", [N, Reply]), + if + (C#client.verbose =:= true) -> + i("received reply ~w: ~p", [N, Reply]); + true -> + i("received reply ~w", [N]) + end, + ?LIB:sleep(500), % Just to spread it out a bit send_loop(C#client{msg_id = N+1}) end; {error, RReason} -> @@ -268,13 +361,20 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) -> "~n ~p", [N, SReason]), exit({failed_send, SReason}) end; -send_loop(#client{socket = Sock}) -> +send_loop(Client) -> + sock_close(Client). + +sock_close(#client{socket = Sock, verbose = true}) -> i("we are done - close the socket when: " "~n ~p", [socket:info()]), ok = socket:close(Sock), i("we are done - socket closed when: " - "~n ~p", [socket:info()]). + "~n ~p", [socket:info()]); +sock_close(#client{socket = Sock}) -> + i("we are done"), + ok = socket:close(Sock). + send(#client{socket = Sock, type = stream}, Msg) -> socket:send(Sock, Msg); @@ -298,31 +398,41 @@ recv(#client{socket = Sock, type = stream, msg = false}) -> {error, _} = ERROR -> ERROR end; -recv(#client{socket = Sock, type = stream, msg = true}) -> +recv(#client{socket = Sock, verbose = Verbose, type = stream, msg = true}) -> case socket:recvmsg(Sock) of %% An iov of length 1 is an simplification... {ok, #{addr := undefined = Source, iov := [Msg], ctrl := CMsgHdrs, flags := Flags}} -> - i("received message: " - "~n CMsgHdr: ~p" - "~n Flags: ~p", [CMsgHdrs, Flags]), + if + (Verbose =:= true) -> + i("received message: " + "~n CMsgHdr: ~p" + "~n Flags: ~p", [CMsgHdrs, Flags]); + true -> + ok + end, {ok, {Source, Msg}}; {error, _} = ERROR -> ERROR end; recv(#client{socket = Sock, type = dgram, msg = false}) -> socket:recvfrom(Sock); -recv(#client{socket = Sock, type = dgram, msg = true}) -> +recv(#client{socket = Sock, verbose = Verbose, type = dgram, msg = true}) -> case socket:recvmsg(Sock) of {ok, #{addr := Source, iov := [Msg], ctrl := CMsgHdrs, flags := Flags}} -> - i("received message: " - "~n CMsgHdr: ~p" - "~n Flags: ~p", [CMsgHdrs, Flags]), + if + (Verbose =:= true) -> + i("received message: " + "~n CMsgHdr: ~p" + "~n Flags: ~p", [CMsgHdrs, Flags]); + true -> + ok + end, {ok, {Source, Msg}}; {error, _} = ERROR -> ERROR diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index 3e5c4e5d95..9142942428 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -34,8 +34,10 @@ -define(LIB, socket_lib). -record(manager, {socket, msg, peek, acceptors, handler_id, handlers}). --record(acceptor, {id, socket, manager, atimeout = 5000}). --record(handler, {socket, peek, msg, type, manager}). +-record(acceptor, {id, socket, manager, + atimeout = 5000}). +-record(handler, {socket, peek, msg, type, manager, + stimeout = 5000, rtimeout = 5000}). -define(NUM_ACCEPTORS, 5). @@ -904,28 +906,30 @@ peek_recvfrom(Sock, BufSz) -> end. -send(#handler{socket = Sock, msg = true, type = stream}, Msg, _) -> +send(#handler{socket = Sock, msg = true, type = stream, stimeout = Timeout}, + Msg, _) -> CMsgHdr = #{level => ip, type => tos, data => reliability}, CMsgHdrs = [CMsgHdr], MsgHdr = #{iov => [Msg], ctrl => CMsgHdrs}, %% socket:setopt(Sock, otp, debug, true), - Res = socket:sendmsg(Sock, MsgHdr), + Res = socket:sendmsg(Sock, MsgHdr, Timeout), %% socket:setopt(Sock, otp, debug, false), Res; -send(#handler{socket = Sock, type = stream}, Msg, _) -> - socket:send(Sock, Msg); -send(#handler{socket = Sock, msg = true, type = dgram}, Msg, Dest) -> +send(#handler{socket = Sock, type = stream, stimeout = Timeout}, Msg, _) -> + socket:send(Sock, Msg, Timeout); +send(#handler{socket = Sock, msg = true, type = dgram, stimeout = Timeout}, + Msg, Dest) -> CMsgHdr = #{level => ip, type => tos, data => reliability}, CMsgHdrs = [CMsgHdr], MsgHdr = #{addr => Dest, iov => [Msg], ctrl => CMsgHdrs}, %% ok = socket:setopt(Sock, otp, debug, true), - Res = socket:sendmsg(Sock, MsgHdr), + Res = socket:sendmsg(Sock, MsgHdr, Timeout), %% ok = socket:setopt(Sock, otp, debug, false), Res; -send(#handler{socket = Sock, type = dgram}, Msg, Dest) -> - socket:sendto(Sock, Msg, Dest). +send(#handler{socket = Sock, type = dgram, stimeout = Timeout}, Msg, Dest) -> + socket:sendto(Sock, Msg, Dest, Timeout). %% filler() -> %% list_to_binary(lists:duplicate(2048, " FILLER ")). |