aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-11-02 14:17:09 +0100
committerMicael Karlberg <[email protected]>2018-11-02 14:17:09 +0100
commitc6767f0a6dc66971df4425c216024c47993a310b (patch)
treec3e501b997bdf253414dc165fe0a7b165c4c39df
parentd0df643ad994bbc609c52f83b814fc79624af501 (diff)
downloadotp-c6767f0a6dc66971df4425c216024c47993a310b.tar.gz
otp-c6767f0a6dc66971df4425c216024c47993a310b.tar.bz2
otp-c6767f0a6dc66971df4425c216024c47993a310b.zip
[socket-nif|test] Biffer init and message sizes in ping-pong case
The ping-pong test case(s) now initiates the socket buffers before they are connected (server: before listen is called on the listen socket and client: before connect is called). Also, we now include a length indicator in the messages, so that we know how much to read. OTP-14831
-rw-r--r--erts/emulator/test/socket_SUITE.erl213
1 files changed, 150 insertions, 63 deletions
diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl
index 90e9407f6a..c2b8a8349f 100644
--- a/erts/emulator/test/socket_SUITE.erl
+++ b/erts/emulator/test/socket_SUITE.erl
@@ -6712,7 +6712,7 @@ traffic_ping_pong_small_send_and_recv_tcp4(_Config) when is_list(_Config) ->
?TT(?SECS(15)),
Send = fun(Sock, Data) -> socket:send(Sock, Data) end,
%% Recv = fun(Sock) -> socket:recv(Sock, 0, 5000) end,
- Recv = fun(Sock) -> socket:recv(Sock) end,
+ Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end,
InitState = #{domain => inet,
send => Send, % Send function
recv => Recv % Receive function
@@ -6740,7 +6740,7 @@ traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) ->
not_yet_implemented(),
?TT(?SECS(30)),
Send = fun(Sock, Data) -> socket:send(Sock, Data) end,
- Recv = fun(Sock) -> socket:recv(Sock) end,
+ Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end,
InitState = #{domain => inet6,
send => Send, % Send function
recv => Recv % Receive function
@@ -6768,7 +6768,7 @@ traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) ->
%% not_yet_implemented(),
?TT(?SECS(30)),
Send = fun(Sock, Data) -> socket:send(Sock, Data) end,
- Recv = fun(Sock) -> socket:recv(Sock) end,
+ Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end,
InitState = #{domain => inet,
send => Send, % Send function
recv => Recv % Receive function
@@ -6796,7 +6796,7 @@ traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) ->
not_yet_implemented(),
?TT(?SECS(30)),
Send = fun(Sock, Data) -> socket:send(Sock, Data) end,
- Recv = fun(Sock) -> socket:recv(Sock) end,
+ Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end,
InitState = #{domain => inet6,
send => Send, % Send function
recv => Recv % Receive function
@@ -6825,7 +6825,7 @@ traffic_ping_pong_large_send_and_recv_tcp4(_Config) when is_list(_Config) ->
%% not_yet_implemented(),
?TT(?MINS(5)),
Send = fun(Sock, Data) -> socket:send(Sock, Data) end,
- Recv = fun(Sock) -> socket:recv(Sock) end,
+ Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end,
InitState = #{domain => inet,
send => Send, % Send function
recv => Recv % Receive function
@@ -6853,7 +6853,7 @@ traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) ->
not_yet_implemented(),
?TT(?MINS(5)),
Send = fun(Sock, Data) -> socket:send(Sock, Data) end,
- Recv = fun(Sock) -> socket:recv(Sock) end,
+ Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end,
InitState = #{domain => inet6,
send => Send, % Send function
recv => Recv % Receive function
@@ -6887,12 +6887,31 @@ traffic_ping_pong_medium_send_and_receive_tcp(InitState) ->
traffic_ping_pong_large_send_and_receive_tcp(InitState) ->
Msg = l2b(?LARGE),
- Num = 10,
+ Num = 1000,
Fun = fun(Sock) ->
- ok = socket:setopt(Sock, socket, rcvbuf, 64*1024*1024),
- ok = socket:setopt(Sock, socket, sndbuf, 64*1024*1024),
- %% ok = socket:setopt(Sock, otp, rcvbuf, 64*1024*1024),
- %% ok = socket:setopt(Sock, otp, sndbuf, 64*1024*1024),
+ %% ?SEV_IPRINT("Socket buffers (before): "
+ %% "~n Rcv: ~p"
+ %% "~n Snd: ~p",
+ %% [socket:getopt(Sock, socket, rcvbuf),
+ %% socket:getopt(Sock, socket, sndbuf)]),
+ {ok, RcvSz} = socket:getopt(Sock, socket, rcvbuf),
+ if (RcvSz < size(Msg)) ->
+ ok = socket:setopt(Sock, socket, rcvbuf, 1024+size(Msg));
+ true ->
+ ok
+ end,
+ {ok, SndSz} = socket:getopt(Sock, socket, sndbuf),
+ if (SndSz < size(Msg)) ->
+ ok = socket:setopt(Sock, socket, sndbuf, 1024+size(Msg));
+ true ->
+ ok
+ end,
+ ok = socket:setopt(Sock, otp, rcvbuf, 8*1024),
+ %% ?SEV_IPRINT("Socket buffers (after): "
+ %% "~n Rcv: ~p"
+ %% "~n Snd: ~p",
+ %% [socket:getopt(Sock, socket, rcvbuf),
+ %% socket:getopt(Sock, socket, sndbuf)]),
ok
end, %% Fun to update the buffers: NEEDED here!!!
traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg,
@@ -6939,6 +6958,10 @@ traffic_ping_pong_send_and_receive_tcp(InitState) ->
ERROR
end
end},
+ #{desc => "maybe init buffers",
+ cmd => fun(#{lsock := LSock, buf_init := BufInit} = State) ->
+ BufInit(LSock)
+ end},
#{desc => "make listen socket",
cmd => fun(#{lsock := LSock}) ->
socket:listen(LSock)
@@ -6982,11 +7005,9 @@ traffic_ping_pong_send_and_receive_tcp(InitState) ->
#{desc => "start handler",
cmd => fun(#{handler := Handler,
csock := Sock,
- buf_init := BufInit,
send := Send,
recv := Recv} = _State) ->
- ?SEV_ANNOUNCE_START(Handler,
- {Sock, BufInit, Send, Recv}),
+ ?SEV_ANNOUNCE_START(Handler, {Sock, Send, Recv}),
ok
end},
#{desc => "await handler ready (init)",
@@ -7346,25 +7367,35 @@ traffic_ping_pong_send_and_receive_tcp(InitState) ->
{CSent, CReceived, CStart, CStop} = CRes,
STime = tdiff(SStart, SStop),
CTime = tdiff(CStart, CStop),
- ?SEV_IPRINT("Results: "
+ %% Note that the sizes we are counting is only
+ %% the "data" part of the messages. There is also
+ %% fixed header for each message, which of cource
+ %% is small for the large messages, but comparatively
+ %% big for the small messages!
+ ?SEV_IPRINT("Results: ~w messages exchanged"
"~n Server: ~w msec"
- "~n ~w messages/msec exchanged"
+ "~n ~.2f msec/message (roundtrip)"
+ "~n ~.2f messages/msec (roundtrip)"
"~n ~w bytes/msec sent"
"~n ~w bytes/msec received"
"~n Client: ~w msec"
- "~n ~w messages/msec exchanged"
+ "~n ~.2f msec/message (roundtrip)"
+ "~n ~.2f messages/msec (roundtrip)"
"~n ~w bytes/msec sent"
"~n ~w bytes/msec received",
- [STime,
- Num div STime,
+ [Num,
+ STime,
+ STime / Num,
+ Num / STime,
SSent div STime,
SReceived div STime,
CTime,
- Num div CTime,
+ CTime / Num,
+ Num / CTime,
CSent div CTime,
CReceived div CTime]),
State1 = maps:remove(server_result, State),
- State2 = maps:remove(client_result, State),
+ State2 = maps:remove(client_result, State1),
{ok, State2}
end},
@@ -7432,10 +7463,10 @@ tpp_tcp_handler_create() ->
tpp_tcp_handler(Parent) ->
tpp_tcp_handler_init(Parent),
- {Sock, BufInit, Send, Recv} = tpp_tcp_handler_await_start(Parent),
+ {Sock, Send, Recv} = tpp_tcp_handler_await_start(Parent),
tpp_tcp_handler_announce_ready(Parent, init),
tpp_tcp_handler_await_continue(Parent, recv),
- Result = tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv),
+ Result = tpp_tcp_handler_msg_exchange(Sock, Send, Recv),
tpp_tcp_handler_announce_ready(Parent, recv, Result),
Reason = tpp_tcp_handler_await_terminate(Parent),
exit(Reason).
@@ -7478,25 +7509,24 @@ tpp_tcp_handler_await_terminate(Parent) ->
Reason
end.
-tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv) ->
- ok = BufInit(Sock),
- socket:setopt(Sock, otp, debug, true),
+tpp_tcp_handler_msg_exchange(Sock, Send, Recv) ->
+ %% socket:setopt(Sock, otp, debug, true),
tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined).
tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) ->
%% if (N =:= 1000) -> socket:setopt(Sock, otp, debug, true); true -> ok end,
- ?SEV_IPRINT("[~w] try receive", [N]),
- case Recv(Sock) of
- {ok, Msg} ->
+ %% ?SEV_IPRINT("[~w] try receive", [N]),
+ case tpp_tcp_recv_req(Sock, Recv) of
+ {ok, Msg, RecvSz} ->
NewStart = if (Start =:= undefined) -> ?LIB:timestamp();
true -> Start end,
- ?SEV_IPRINT("[~w] received - now try send", [N]),
- case Send(Sock, Msg) of
- ok ->
+ %% ?SEV_IPRINT("[~w] received - now try send", [N]),
+ case tpp_tcp_send_rep(Sock, Send, Msg) of
+ {ok, SendSz} ->
tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv,
N+1,
- Sent+size(Msg),
- Received+size(Msg),
+ Sent+SendSz,
+ Received+RecvSz,
NewStart);
{error, SReason} ->
?SEV_EPRINT("send (~w): ~p", [N, SReason]),
@@ -7531,14 +7561,14 @@ tpp_tcp_client(Parent, GL) ->
tpp_tcp_client_init(Parent, GL),
{ServerSA, BufInit, Send, Recv} = tpp_tcp_client_await_start(Parent),
Domain = maps:get(family, ServerSA),
- Sock = tpp_tcp_client_sock_open(Domain),
+ Sock = tpp_tcp_client_sock_open(Domain, BufInit),
tpp_tcp_client_sock_bind(Sock, Domain),
tpp_tcp_client_announce_ready(Parent, init),
tpp_tcp_client_await_continue(Parent, connect),
tpp_tcp_client_sock_connect(Sock, ServerSA),
tpp_tcp_client_announce_ready(Parent, connect),
{InitMsg, Num} = tpp_tcp_client_await_continue(Parent, send),
- Result = tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num),
+ Result = tpp_tcp_client_msg_exchange(Sock, Send, Recv, InitMsg, Num),
tpp_tcp_client_announce_ready(Parent, send, Result),
Reason = tpp_tcp_client_await_terminate(Parent),
tpp_tcp_client_sock_close(Sock),
@@ -7586,8 +7616,7 @@ tpp_tcp_client_await_terminate(Parent) ->
Reason
end.
-tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num) ->
- ok = BufInit(Sock),
+tpp_tcp_client_msg_exchange(Sock, Send, Recv, InitMsg, Num) ->
Start = ?LIB:timestamp(),
tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, InitMsg,
Num, 0, 0, 0, Start).
@@ -7602,18 +7631,18 @@ tpp_tcp_client_msg_exchange_loop(Sock, _Send, _Recv, _Msg,
{error, Reason} ->
exit({failed_closing, Reason})
end;
-tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Msg,
+tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Data,
Num, N, Sent, Received, Start) ->
- d("[~w,~w] try send", [Num,N]),
- case Send(Sock, Msg) of
- ok ->
- d("[~w,~w] sent - no try recv", [Num,N]),
- case Recv(Sock) of
- {ok, NewMsg} ->
+ %% d("[~w,~w] try send", [Num,N]),
+ case tpp_tcp_send_req(Sock, Send, Data) of
+ {ok, SendSz} ->
+ %% d("[~w,~w] sent - no try recv", [Num,N]),
+ case tpp_tcp_recv_rep(Sock, Recv) of
+ {ok, NewData, RecvSz} ->
tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv,
- NewMsg, Num, N+1,
- Sent+size(Msg),
- Received+size(NewMsg),
+ NewData, Num, N+1,
+ Sent+SendSz,
+ Received+RecvSz,
Start);
{error, RReason} ->
?SEV_EPRINT("recv (~w of ~w): ~p", [N, Num, RReason]),
@@ -7624,9 +7653,10 @@ tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Msg,
exit({send, SReason, N})
end.
-tpp_tcp_client_sock_open(Domain) ->
+tpp_tcp_client_sock_open(Domain, BufInit) ->
case socket:open(Domain, stream, tcp) of
{ok, Sock} ->
+ ok = BufInit(Sock),
Sock;
{error, Reason} ->
exit({open_failed, Reason})
@@ -7659,6 +7689,63 @@ tpp_tcp_client_sock_close(Sock) ->
exit({close, Reason})
end.
+-define(TPP_REQUEST, 1).
+-define(TPP_REPLY, 2).
+
+tpp_tcp_recv_req(Sock, Recv) ->
+ tpp_tcp_recv(Sock, Recv, ?TPP_REQUEST).
+
+tpp_tcp_recv_rep(Sock, Recv) ->
+ tpp_tcp_recv(Sock, Recv, ?TPP_REPLY).
+
+tpp_tcp_recv(Sock, Recv, Tag) ->
+ case Recv(Sock, 0) of
+ {ok, <<Tag:32/integer, Sz:32/integer, Data/binary>> = Msg}
+ when (Sz =:= size(Data)) ->
+ %% We got it all
+ {ok, Data, size(Msg)};
+ {ok, <<Tag:32/integer, Sz:32/integer, Data/binary>> = Msg} ->
+ Remains = Sz - size(Data),
+ tpp_tcp_recv(Sock, Recv, Tag, Remains, size(Msg), [Data]);
+ {ok, <<Tag:32/integer, _/binary>>} ->
+ {error, {invalid_msg_tag, Tag}};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+tpp_tcp_recv(Sock, Recv, Tag, Remaining, AccSz, Acc) ->
+ case Recv(Sock, Remaining) of
+ {ok, Data} when (Remaining =:= size(Data)) ->
+ %% We got the rest
+ TotSz = AccSz + size(Data),
+ {ok, erlang:iolist_to_binary(lists:reverse([Data | Acc])), TotSz};
+ {ok, Data} when (Remaining > size(Data)) ->
+ tpp_tcp_recv(Sock, Recv, Tag,
+ Remaining - size(Data), AccSz + size(Data),
+ [Data | Acc]);
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+tpp_tcp_send_req(Sock, Send, Data) ->
+ tpp_tcp_send(Sock, Send, ?TPP_REQUEST, Data).
+
+tpp_tcp_send_rep(Sock, Send, Data) ->
+ tpp_tcp_send(Sock, Send, ?TPP_REPLY, Data).
+
+tpp_tcp_send(Sock, Send, Tag, Data) ->
+ DataSz = size(Data),
+ Msg = <<Tag:32/integer, DataSz:32/integer, Data/binary>>,
+ case Send(Sock, Msg) of
+ ok ->
+ {ok, size(Msg)};
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% This gets the local address (not 127.0...)
@@ -7951,21 +8038,21 @@ f(F, A) ->
%% i(Before ++ FStr ++ After, []).
-d(F, A) ->
- d(get(dbg_fd), F, A).
+%% d(F, A) ->
+%% d(get(dbg_fd), F, A).
-d(undefined, F, A) ->
- [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]),
- DbgFileName = f("~s-dbg.txt", [NodeNameStr]),
- case file:open(DbgFileName, [write]) of
- {ok, FD} ->
- put(dbg_fd, FD),
- d(FD, F, A);
- {error, Reason} ->
- exit({failed_open_dbg_file, Reason})
- end;
-d(FD, F, A) ->
- io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]).
+%% d(undefined, F, A) ->
+%% [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]),
+%% DbgFileName = f("~s-dbg.txt", [NodeNameStr]),
+%% case file:open(DbgFileName, [write]) of
+%% {ok, FD} ->
+%% put(dbg_fd, FD),
+%% d(FD, F, A);
+%% {error, Reason} ->
+%% exit({failed_open_dbg_file, Reason})
+%% end;
+%% d(FD, F, A) ->
+%% io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]).
i(F) ->
i(F, []).