aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/test/socket_client.erl
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-09-20 15:37:37 +0200
committerMicael Karlberg <[email protected]>2018-09-20 16:29:05 +0200
commit13d10bc60a41f98647d802524ea8ef8fa9af6b39 (patch)
tree2a56b2fa3b0d63c05c49aea93427a343e149835f /lib/kernel/test/socket_client.erl
parente01a856c993b55c3fbc76fd429783d4aad5bfc80 (diff)
downloadotp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.tar.gz
otp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.tar.bz2
otp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.zip
[socket-nif] Add proper send timeout handling
Added proper send timeout handling. Made use of the enif_select(mode = cancel) feature. Each time a timeout expires, the "active" send (the surrent write select) has to be cancelled. OTP-14831
Diffstat (limited to 'lib/kernel/test/socket_client.erl')
-rw-r--r--lib/kernel/test/socket_client.erl344
1 files changed, 227 insertions, 117 deletions
diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl
index 6cd353fd07..1c07e799b8 100644
--- a/lib/kernel/test/socket_client.erl
+++ b/lib/kernel/test/socket_client.erl
@@ -21,53 +21,126 @@
-module(socket_client).
-export([
- start/1, start/5,
- start_tcp/1, start_tcp/2, start_tcp4/1, start_tcp6/1,
- start_udp/1, start_udp/2, start_udp4/1, start_udp6/1
+ start/1, start/2, start/5, start/6,
+ start_tcp/1, start_tcp/2, start_tcp/3,
+ start_tcp4/1, start_tcp4/2, start_tcp6/1, start_tcp6/2,
+ start_udp/1, start_udp/2, start_udp/3,
+ start_udp4/1, start_udp4/2, start_udp6/1, start_udp6/2
]).
-define(LIB, socket_lib).
--record(client, {socket, msg = true, type, dest, msg_id = 1}).
+-record(client, {socket, verbose = true, msg = true, type, dest, msg_id = 1}).
start(Port) ->
- start_tcp(Port).
+ start(Port, 1).
+
+start(Port, Num) ->
+ start_tcp(Port, Num).
start_tcp(Port) ->
- start_tcp4(Port).
+ start_tcp(Port, 1).
+
+start_tcp(Port, Num) ->
+ start_tcp4(Port, Num).
start_tcp4(Port) ->
- start(inet, stream, tcp, Port).
+ start_tcp4(Port, 1).
+
+start_tcp4(Port, Num) ->
+ start(inet, stream, tcp, Port, Num).
start_tcp6(Port) ->
- start(inet6, stream, tcp, Port).
+ start_tcp6(Port, 1).
+
+start_tcp6(Port, Num) ->
+ start(inet6, stream, tcp, Port, Num).
-start_tcp(Addr, Port) when (size(Addr) =:= 4) ->
- start(inet, stream, tcp, Addr, Port);
-start_tcp(Addr, Port) when (size(Addr) =:= 8) ->
- start(inet6, stream, tcp, Addr, Port).
+start_tcp(Addr, Port, Num) when (size(Addr) =:= 4) andalso
+ is_integer(Num) andalso
+ (Num > 0) ->
+ start(inet, stream, tcp, Addr, Port, Num);
+start_tcp(Addr, Port, Num) when (size(Addr) =:= 8) andalso
+ is_integer(Num) andalso
+ (Num > 0) ->
+ start(inet6, stream, tcp, Addr, Port, Num).
start_udp(Port) ->
- start_udp4(Port).
+ start_udp(Port, 1).
+
+start_udp(Port, Num) ->
+ start_udp4(Port, Num).
start_udp4(Port) ->
- start(inet, dgram, udp, Port).
+ start_udp4(Port, 1).
+
+start_udp4(Port, Num) ->
+ start(inet, dgram, udp, Port, Num).
start_udp6(Port) ->
- start(inet6, dgram, udp, Port).
+ start_udp6(Port, 1).
-start_udp(Addr, Port) when (size(Addr) =:= 4) ->
- start(inet, dgram, udp, Addr, Port);
-start_udp(Addr, Port) when (size(Addr) =:= 8) ->
- start(inet6, dgram, udp, Addr, Port).
+start_udp6(Port, Num) ->
+ start(inet6, dgram, udp, Port, Num).
+start_udp(Addr, Port, Num) when (size(Addr) =:= 4) ->
+ start(inet, dgram, udp, Addr, Port, Num);
+start_udp(Addr, Port, Num) when (size(Addr) =:= 8) ->
+ start(inet6, dgram, udp, Addr, Port, Num).
-start(Domain, Type, Proto, Port) ->
- start(Domain, Type, Proto, which_addr(Domain), Port).
+
+start(Domain, Type, Proto, Port, Num)
+ when is_integer(Port) andalso is_integer(Num) ->
+ start(Domain, Type, Proto, which_addr(Domain), Port, Num);
start(Domain, Type, Proto, Addr, Port) ->
+ start(Domain, Type, Proto, Addr, Port, 1).
+
+start(Domain, Type, Proto, Addr, Port, 1 = Num) ->
+ start(Domain, Type, Proto, Addr, Port, Num, true);
+start(Domain, Type, Proto, Addr, Port, Num)
+ when is_integer(Num) andalso (Num > 1) ->
+ start(Domain, Type, Proto, Addr, Port, Num, false).
+
+start(Domain, Type, Proto, Addr, Port, Num, Verbose) ->
put(sname, "starter"),
+ Clients = start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose),
+ await_clients(Clients).
+
+start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose) ->
+ start_clients(Num, 1, Domain, Type, Proto, Addr, Port, Verbose, []).
+
+start_clients(Num, ID, Domain, Type, Proto, Addr, Port, Verbose, Acc)
+ when (Num > 0) ->
+ StartClient = fun() ->
+ start_client(ID, Domain, Type, Proto, Addr, Port, Verbose)
+ end,
+ {Pid, _} = spawn_monitor(StartClient),
+ ?LIB:sleep(500),
+ i("start client ~w", [ID]),
+ start_clients(Num-1, ID+1, Domain, Type, Proto, Addr, Port, Verbose, [Pid|Acc]);
+start_clients(_, _, _, _, _, _, _, _, Acc) ->
+ i("all client(s) started"),
+ lists:reverse(Acc).
+
+await_clients([]) ->
+ i("all clients done");
+await_clients(Clients) ->
+ receive
+ {'DOWN', _MRef, process, Pid, _Reason} ->
+ case lists:delete(Pid, Clients) of
+ Clients2 when (Clients2 =/= Clients) ->
+ i("client ~p done", [Pid]),
+ await_clients(Clients2);
+ _ ->
+ await_clients(Clients)
+ end
+ end.
+
+
+start_client(ID, Domain, Type, Proto, Addr, Port, Verbose) ->
+ put(sname, ?LIB:f("client[~w]", [ID])),
SA = #{family => Domain,
addr => Addr,
port => Port},
@@ -75,110 +148,119 @@ start(Domain, Type, Proto, Addr, Port) ->
%% send so few messages (a new value for every
%% message).
tos_init(),
- do_start(Domain, Type, Proto, SA).
+ do_start(Domain, Type, Proto, SA, Verbose).
-do_start(Domain, stream = Type, Proto, SA) ->
+do_start(Domain, stream = Type, Proto, SA, Verbose) ->
try do_init(Domain, Type, Proto) of
Sock ->
connect(Sock, SA),
- {ok, Name} = socket:sockname(Sock),
- {ok, Peer} = socket:peername(Sock),
- {ok, Domain} = socket:getopt(Sock, socket, domain),
- {ok, Type} = socket:getopt(Sock, socket, type),
- {ok, Proto} = socket:getopt(Sock, socket, protocol),
- {ok, OOBI} = socket:getopt(Sock, socket, oobinline),
- {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf),
- {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf),
- {ok, Linger} = socket:getopt(Sock, socket, linger),
- {ok, MTU} = socket:getopt(Sock, ip, mtu),
- {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover),
- {ok, MALL} = socket:getopt(Sock, ip, multicast_all),
- {ok, MIF} = socket:getopt(Sock, ip, multicast_if),
- {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop),
- {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl),
- {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos),
- i("connected: "
- "~n From: ~p"
- "~n To: ~p"
- "~nwhen"
- "~n (socket) Domain: ~p"
- "~n (socket) Type: ~p"
- "~n (socket) Protocol: ~p"
- "~n (socket) OOBInline: ~p"
- "~n (socket) SndBuf: ~p"
- "~n (socket) RcvBuf: ~p"
- "~n (socket) Linger: ~p"
- "~n (ip) MTU: ~p"
- "~n (ip) MTU Discovery: ~p"
- "~n (ip) Multicast ALL: ~p"
- "~n (ip) Multicast IF: ~p"
- "~n (ip) Multicast Loop: ~p"
- "~n (ip) Multicast TTL: ~p"
- "~n (ip) RecvTOS: ~p"
- "~n => wait some",
- [Name, Peer,
- Domain, Type, Proto,
- OOBI, SndBuf, RcvBuf, Linger,
- MTU, MTUDisc, MALL, MIF, MLoop, MTTL,
- RecvTOS]),
+ maybe_print_start_info(Verbose, Sock, Type),
%% Give the server some time...
?LIB:sleep(5000),
%% ok = socket:close(Sock),
- send_loop(#client{socket = Sock,
- type = Type})
+ send_loop(#client{socket = Sock,
+ type = Type,
+ verbose = Verbose})
catch
throw:E ->
e("Failed initiate: "
"~n Error: ~p", [E])
end;
-do_start(Domain, dgram = Type, Proto, SA) ->
+do_start(Domain, dgram = Type, Proto, SA, Verbose) ->
try do_init(Domain, Type, Proto) of
Sock ->
+ maybe_print_start_info(Verbose, Sock, Type),
%% Give the server some time...
- {ok, Domain} = socket:getopt(Sock, socket, domain),
- {ok, Type} = socket:getopt(Sock, socket, type),
- {ok, Proto} = socket:getopt(Sock, socket, protocol),
- {ok, OOBI} = socket:getopt(Sock, socket, oobinline),
- {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf),
- {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf),
- {ok, Linger} = socket:getopt(Sock, socket, linger),
- {ok, MALL} = socket:getopt(Sock, ip, multicast_all),
- {ok, MIF} = socket:getopt(Sock, ip, multicast_if),
- {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop),
- {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl),
- {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos),
- {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl),
- i("initiated when: "
- "~n (socket) Domain: ~p"
- "~n (socket) Type: ~p"
- "~n (socket) Protocol: ~p"
- "~n (socket) OOBInline: ~p"
- "~n (socket) SndBuf: ~p"
- "~n (socket) RcvBuf: ~p"
- "~n (socket) Linger: ~p"
- "~n (ip) Multicast ALL: ~p"
- "~n (ip) Multicast IF: ~p"
- "~n (ip) Multicast Loop: ~p"
- "~n (ip) Multicast TTL: ~p"
- "~n (ip) RecvTOS: ~p"
- "~n (ip) RecvTTL: ~p"
- "~n => wait some",
- [Domain, Type, Proto,
- OOBI, SndBuf, RcvBuf, Linger,
- MALL, MIF, MLoop, MTTL,
- RecvTOS, RecvTTL]),
?LIB:sleep(5000),
%% ok = socket:close(Sock),
- send_loop(#client{socket = Sock,
- type = Type,
- dest = SA})
+ send_loop(#client{socket = Sock,
+ type = Type,
+ dest = SA,
+ verbose = Verbose})
catch
throw:E ->
e("Failed initiate: "
"~n Error: ~p", [E])
end.
+maybe_print_start_info(true = _Verbose, Sock, stream = _Type) ->
+ {ok, Name} = socket:sockname(Sock),
+ {ok, Peer} = socket:peername(Sock),
+ {ok, Domain} = socket:getopt(Sock, socket, domain),
+ {ok, Type} = socket:getopt(Sock, socket, type),
+ {ok, Proto} = socket:getopt(Sock, socket, protocol),
+ {ok, OOBI} = socket:getopt(Sock, socket, oobinline),
+ {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf),
+ {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf),
+ {ok, Linger} = socket:getopt(Sock, socket, linger),
+ {ok, MTU} = socket:getopt(Sock, ip, mtu),
+ {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover),
+ {ok, MALL} = socket:getopt(Sock, ip, multicast_all),
+ {ok, MIF} = socket:getopt(Sock, ip, multicast_if),
+ {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop),
+ {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl),
+ {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos),
+ i("connected: "
+ "~n From: ~p"
+ "~n To: ~p"
+ "~nwhen"
+ "~n (socket) Domain: ~p"
+ "~n (socket) Type: ~p"
+ "~n (socket) Protocol: ~p"
+ "~n (socket) OOBInline: ~p"
+ "~n (socket) SndBuf: ~p"
+ "~n (socket) RcvBuf: ~p"
+ "~n (socket) Linger: ~p"
+ "~n (ip) MTU: ~p"
+ "~n (ip) MTU Discovery: ~p"
+ "~n (ip) Multicast ALL: ~p"
+ "~n (ip) Multicast IF: ~p"
+ "~n (ip) Multicast Loop: ~p"
+ "~n (ip) Multicast TTL: ~p"
+ "~n (ip) RecvTOS: ~p"
+ "~n => wait some",
+ [Name, Peer,
+ Domain, Type, Proto,
+ OOBI, SndBuf, RcvBuf, Linger,
+ MTU, MTUDisc, MALL, MIF, MLoop, MTTL,
+ RecvTOS]);
+maybe_print_start_info(true = _Verbose, Sock, dgram = _Type) ->
+ {ok, Domain} = socket:getopt(Sock, socket, domain),
+ {ok, Type} = socket:getopt(Sock, socket, type),
+ {ok, Proto} = socket:getopt(Sock, socket, protocol),
+ {ok, OOBI} = socket:getopt(Sock, socket, oobinline),
+ {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf),
+ {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf),
+ {ok, Linger} = socket:getopt(Sock, socket, linger),
+ {ok, MALL} = socket:getopt(Sock, ip, multicast_all),
+ {ok, MIF} = socket:getopt(Sock, ip, multicast_if),
+ {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop),
+ {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl),
+ {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos),
+ {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl),
+ i("initiated when: "
+ "~n (socket) Domain: ~p"
+ "~n (socket) Type: ~p"
+ "~n (socket) Protocol: ~p"
+ "~n (socket) OOBInline: ~p"
+ "~n (socket) SndBuf: ~p"
+ "~n (socket) RcvBuf: ~p"
+ "~n (socket) Linger: ~p"
+ "~n (ip) Multicast ALL: ~p"
+ "~n (ip) Multicast IF: ~p"
+ "~n (ip) Multicast Loop: ~p"
+ "~n (ip) Multicast TTL: ~p"
+ "~n (ip) RecvTOS: ~p"
+ "~n (ip) RecvTTL: ~p"
+ "~n => wait some",
+ [Domain, Type, Proto,
+ OOBI, SndBuf, RcvBuf, Linger,
+ MALL, MIF, MLoop, MTTL,
+ RecvTOS, RecvTTL]);
+maybe_print_start_info(_Verbose, _Sock, _Type) ->
+ ok.
+
do_init(Domain, stream = Type, Proto) ->
i("try (socket) open"),
Sock = case socket:open(Domain, Type, Proto) of
@@ -248,14 +330,25 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) ->
i("request ~w sent - now try read answer", [N]),
case recv(C) of
{ok, {Source, Msg}} ->
- i("received ~w bytes of data~s",
- [size(Msg), case Source of
- undefined -> "";
- _ -> ?LIB:f(" from:~n ~p", [Source])
- end]),
+ if
+ (C#client.verbose =:= true) ->
+ i("received ~w bytes of data~s",
+ [size(Msg), case Source of
+ undefined -> "";
+ _ -> ?LIB:f(" from:~n ~p", [Source])
+ end]);
+ true ->
+ i("received ~w bytes", [size(Msg)])
+ end,
case ?LIB:dec_msg(Msg) of
{reply, N, Reply} ->
- i("received reply ~w: ~p", [N, Reply]),
+ if
+ (C#client.verbose =:= true) ->
+ i("received reply ~w: ~p", [N, Reply]);
+ true ->
+ i("received reply ~w", [N])
+ end,
+ ?LIB:sleep(500), % Just to spread it out a bit
send_loop(C#client{msg_id = N+1})
end;
{error, RReason} ->
@@ -268,13 +361,20 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) ->
"~n ~p", [N, SReason]),
exit({failed_send, SReason})
end;
-send_loop(#client{socket = Sock}) ->
+send_loop(Client) ->
+ sock_close(Client).
+
+sock_close(#client{socket = Sock, verbose = true}) ->
i("we are done - close the socket when: "
"~n ~p", [socket:info()]),
ok = socket:close(Sock),
i("we are done - socket closed when: "
- "~n ~p", [socket:info()]).
+ "~n ~p", [socket:info()]);
+sock_close(#client{socket = Sock}) ->
+ i("we are done"),
+ ok = socket:close(Sock).
+
send(#client{socket = Sock, type = stream}, Msg) ->
socket:send(Sock, Msg);
@@ -298,31 +398,41 @@ recv(#client{socket = Sock, type = stream, msg = false}) ->
{error, _} = ERROR ->
ERROR
end;
-recv(#client{socket = Sock, type = stream, msg = true}) ->
+recv(#client{socket = Sock, verbose = Verbose, type = stream, msg = true}) ->
case socket:recvmsg(Sock) of
%% An iov of length 1 is an simplification...
{ok, #{addr := undefined = Source,
iov := [Msg],
ctrl := CMsgHdrs,
flags := Flags}} ->
- i("received message: "
- "~n CMsgHdr: ~p"
- "~n Flags: ~p", [CMsgHdrs, Flags]),
+ if
+ (Verbose =:= true) ->
+ i("received message: "
+ "~n CMsgHdr: ~p"
+ "~n Flags: ~p", [CMsgHdrs, Flags]);
+ true ->
+ ok
+ end,
{ok, {Source, Msg}};
{error, _} = ERROR ->
ERROR
end;
recv(#client{socket = Sock, type = dgram, msg = false}) ->
socket:recvfrom(Sock);
-recv(#client{socket = Sock, type = dgram, msg = true}) ->
+recv(#client{socket = Sock, verbose = Verbose, type = dgram, msg = true}) ->
case socket:recvmsg(Sock) of
{ok, #{addr := Source,
iov := [Msg],
ctrl := CMsgHdrs,
flags := Flags}} ->
- i("received message: "
- "~n CMsgHdr: ~p"
- "~n Flags: ~p", [CMsgHdrs, Flags]),
+ if
+ (Verbose =:= true) ->
+ i("received message: "
+ "~n CMsgHdr: ~p"
+ "~n Flags: ~p", [CMsgHdrs, Flags]);
+ true ->
+ ok
+ end,
{ok, {Source, Msg}};
{error, _} = ERROR ->
ERROR