From c6767f0a6dc66971df4425c216024c47993a310b Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Fri, 2 Nov 2018 14:17:09 +0100 Subject: [socket-nif|test] Biffer init and message sizes in ping-pong case The ping-pong test case(s) now initiates the socket buffers before they are connected (server: before listen is called on the listen socket and client: before connect is called). Also, we now include a length indicator in the messages, so that we know how much to read. OTP-14831 --- erts/emulator/test/socket_SUITE.erl | 213 +++++++++++++++++++++++++----------- 1 file changed, 150 insertions(+), 63 deletions(-) diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 90e9407f6a..c2b8a8349f 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -6712,7 +6712,7 @@ traffic_ping_pong_small_send_and_recv_tcp4(_Config) when is_list(_Config) -> ?TT(?SECS(15)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, %% Recv = fun(Sock) -> socket:recv(Sock, 0, 5000) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet, send => Send, % Send function recv => Recv % Receive function @@ -6740,7 +6740,7 @@ traffic_ping_pong_small_send_and_recv_tcp6(_Config) when is_list(_Config) -> not_yet_implemented(), ?TT(?SECS(30)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet6, send => Send, % Send function recv => Recv % Receive function @@ -6768,7 +6768,7 @@ traffic_ping_pong_medium_send_and_recv_tcp4(_Config) when is_list(_Config) -> %% not_yet_implemented(), ?TT(?SECS(30)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet, send => Send, % Send function recv => Recv % Receive function @@ -6796,7 +6796,7 @@ traffic_ping_pong_medium_send_and_recv_tcp6(_Config) when is_list(_Config) -> not_yet_implemented(), ?TT(?SECS(30)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet6, send => Send, % Send function recv => Recv % Receive function @@ -6825,7 +6825,7 @@ traffic_ping_pong_large_send_and_recv_tcp4(_Config) when is_list(_Config) -> %% not_yet_implemented(), ?TT(?MINS(5)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet, send => Send, % Send function recv => Recv % Receive function @@ -6853,7 +6853,7 @@ traffic_ping_pong_large_send_and_recv_tcp6(_Config) when is_list(_Config) -> not_yet_implemented(), ?TT(?MINS(5)), Send = fun(Sock, Data) -> socket:send(Sock, Data) end, - Recv = fun(Sock) -> socket:recv(Sock) end, + Recv = fun(Sock, Sz) -> socket:recv(Sock, Sz) end, InitState = #{domain => inet6, send => Send, % Send function recv => Recv % Receive function @@ -6887,12 +6887,31 @@ traffic_ping_pong_medium_send_and_receive_tcp(InitState) -> traffic_ping_pong_large_send_and_receive_tcp(InitState) -> Msg = l2b(?LARGE), - Num = 10, + Num = 1000, Fun = fun(Sock) -> - ok = socket:setopt(Sock, socket, rcvbuf, 64*1024*1024), - ok = socket:setopt(Sock, socket, sndbuf, 64*1024*1024), - %% ok = socket:setopt(Sock, otp, rcvbuf, 64*1024*1024), - %% ok = socket:setopt(Sock, otp, sndbuf, 64*1024*1024), + %% ?SEV_IPRINT("Socket buffers (before): " + %% "~n Rcv: ~p" + %% "~n Snd: ~p", + %% [socket:getopt(Sock, socket, rcvbuf), + %% socket:getopt(Sock, socket, sndbuf)]), + {ok, RcvSz} = socket:getopt(Sock, socket, rcvbuf), + if (RcvSz < size(Msg)) -> + ok = socket:setopt(Sock, socket, rcvbuf, 1024+size(Msg)); + true -> + ok + end, + {ok, SndSz} = socket:getopt(Sock, socket, sndbuf), + if (SndSz < size(Msg)) -> + ok = socket:setopt(Sock, socket, sndbuf, 1024+size(Msg)); + true -> + ok + end, + ok = socket:setopt(Sock, otp, rcvbuf, 8*1024), + %% ?SEV_IPRINT("Socket buffers (after): " + %% "~n Rcv: ~p" + %% "~n Snd: ~p", + %% [socket:getopt(Sock, socket, rcvbuf), + %% socket:getopt(Sock, socket, sndbuf)]), ok end, %% Fun to update the buffers: NEEDED here!!! traffic_ping_pong_send_and_receive_tcp(InitState#{msg => Msg, @@ -6939,6 +6958,10 @@ traffic_ping_pong_send_and_receive_tcp(InitState) -> ERROR end end}, + #{desc => "maybe init buffers", + cmd => fun(#{lsock := LSock, buf_init := BufInit} = State) -> + BufInit(LSock) + end}, #{desc => "make listen socket", cmd => fun(#{lsock := LSock}) -> socket:listen(LSock) @@ -6982,11 +7005,9 @@ traffic_ping_pong_send_and_receive_tcp(InitState) -> #{desc => "start handler", cmd => fun(#{handler := Handler, csock := Sock, - buf_init := BufInit, send := Send, recv := Recv} = _State) -> - ?SEV_ANNOUNCE_START(Handler, - {Sock, BufInit, Send, Recv}), + ?SEV_ANNOUNCE_START(Handler, {Sock, Send, Recv}), ok end}, #{desc => "await handler ready (init)", @@ -7346,25 +7367,35 @@ traffic_ping_pong_send_and_receive_tcp(InitState) -> {CSent, CReceived, CStart, CStop} = CRes, STime = tdiff(SStart, SStop), CTime = tdiff(CStart, CStop), - ?SEV_IPRINT("Results: " + %% Note that the sizes we are counting is only + %% the "data" part of the messages. There is also + %% fixed header for each message, which of cource + %% is small for the large messages, but comparatively + %% big for the small messages! + ?SEV_IPRINT("Results: ~w messages exchanged" "~n Server: ~w msec" - "~n ~w messages/msec exchanged" + "~n ~.2f msec/message (roundtrip)" + "~n ~.2f messages/msec (roundtrip)" "~n ~w bytes/msec sent" "~n ~w bytes/msec received" "~n Client: ~w msec" - "~n ~w messages/msec exchanged" + "~n ~.2f msec/message (roundtrip)" + "~n ~.2f messages/msec (roundtrip)" "~n ~w bytes/msec sent" "~n ~w bytes/msec received", - [STime, - Num div STime, + [Num, + STime, + STime / Num, + Num / STime, SSent div STime, SReceived div STime, CTime, - Num div CTime, + CTime / Num, + Num / CTime, CSent div CTime, CReceived div CTime]), State1 = maps:remove(server_result, State), - State2 = maps:remove(client_result, State), + State2 = maps:remove(client_result, State1), {ok, State2} end}, @@ -7432,10 +7463,10 @@ tpp_tcp_handler_create() -> tpp_tcp_handler(Parent) -> tpp_tcp_handler_init(Parent), - {Sock, BufInit, Send, Recv} = tpp_tcp_handler_await_start(Parent), + {Sock, Send, Recv} = tpp_tcp_handler_await_start(Parent), tpp_tcp_handler_announce_ready(Parent, init), tpp_tcp_handler_await_continue(Parent, recv), - Result = tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv), + Result = tpp_tcp_handler_msg_exchange(Sock, Send, Recv), tpp_tcp_handler_announce_ready(Parent, recv, Result), Reason = tpp_tcp_handler_await_terminate(Parent), exit(Reason). @@ -7478,25 +7509,24 @@ tpp_tcp_handler_await_terminate(Parent) -> Reason end. -tpp_tcp_handler_msg_exchange(Sock, BufInit, Send, Recv) -> - ok = BufInit(Sock), - socket:setopt(Sock, otp, debug, true), +tpp_tcp_handler_msg_exchange(Sock, Send, Recv) -> + %% socket:setopt(Sock, otp, debug, true), tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined). tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> %% if (N =:= 1000) -> socket:setopt(Sock, otp, debug, true); true -> ok end, - ?SEV_IPRINT("[~w] try receive", [N]), - case Recv(Sock) of - {ok, Msg} -> + %% ?SEV_IPRINT("[~w] try receive", [N]), + case tpp_tcp_recv_req(Sock, Recv) of + {ok, Msg, RecvSz} -> NewStart = if (Start =:= undefined) -> ?LIB:timestamp(); true -> Start end, - ?SEV_IPRINT("[~w] received - now try send", [N]), - case Send(Sock, Msg) of - ok -> + %% ?SEV_IPRINT("[~w] received - now try send", [N]), + case tpp_tcp_send_rep(Sock, Send, Msg) of + {ok, SendSz} -> tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N+1, - Sent+size(Msg), - Received+size(Msg), + Sent+SendSz, + Received+RecvSz, NewStart); {error, SReason} -> ?SEV_EPRINT("send (~w): ~p", [N, SReason]), @@ -7531,14 +7561,14 @@ tpp_tcp_client(Parent, GL) -> tpp_tcp_client_init(Parent, GL), {ServerSA, BufInit, Send, Recv} = tpp_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), - Sock = tpp_tcp_client_sock_open(Domain), + Sock = tpp_tcp_client_sock_open(Domain, BufInit), tpp_tcp_client_sock_bind(Sock, Domain), tpp_tcp_client_announce_ready(Parent, init), tpp_tcp_client_await_continue(Parent, connect), tpp_tcp_client_sock_connect(Sock, ServerSA), tpp_tcp_client_announce_ready(Parent, connect), {InitMsg, Num} = tpp_tcp_client_await_continue(Parent, send), - Result = tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num), + Result = tpp_tcp_client_msg_exchange(Sock, Send, Recv, InitMsg, Num), tpp_tcp_client_announce_ready(Parent, send, Result), Reason = tpp_tcp_client_await_terminate(Parent), tpp_tcp_client_sock_close(Sock), @@ -7586,8 +7616,7 @@ tpp_tcp_client_await_terminate(Parent) -> Reason end. -tpp_tcp_client_msg_exchange(Sock, BufInit, Send, Recv, InitMsg, Num) -> - ok = BufInit(Sock), +tpp_tcp_client_msg_exchange(Sock, Send, Recv, InitMsg, Num) -> Start = ?LIB:timestamp(), tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, InitMsg, Num, 0, 0, 0, Start). @@ -7602,18 +7631,18 @@ tpp_tcp_client_msg_exchange_loop(Sock, _Send, _Recv, _Msg, {error, Reason} -> exit({failed_closing, Reason}) end; -tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Msg, +tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Data, Num, N, Sent, Received, Start) -> - d("[~w,~w] try send", [Num,N]), - case Send(Sock, Msg) of - ok -> - d("[~w,~w] sent - no try recv", [Num,N]), - case Recv(Sock) of - {ok, NewMsg} -> + %% d("[~w,~w] try send", [Num,N]), + case tpp_tcp_send_req(Sock, Send, Data) of + {ok, SendSz} -> + %% d("[~w,~w] sent - no try recv", [Num,N]), + case tpp_tcp_recv_rep(Sock, Recv) of + {ok, NewData, RecvSz} -> tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, - NewMsg, Num, N+1, - Sent+size(Msg), - Received+size(NewMsg), + NewData, Num, N+1, + Sent+SendSz, + Received+RecvSz, Start); {error, RReason} -> ?SEV_EPRINT("recv (~w of ~w): ~p", [N, Num, RReason]), @@ -7624,9 +7653,10 @@ tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Msg, exit({send, SReason, N}) end. -tpp_tcp_client_sock_open(Domain) -> +tpp_tcp_client_sock_open(Domain, BufInit) -> case socket:open(Domain, stream, tcp) of {ok, Sock} -> + ok = BufInit(Sock), Sock; {error, Reason} -> exit({open_failed, Reason}) @@ -7659,6 +7689,63 @@ tpp_tcp_client_sock_close(Sock) -> exit({close, Reason}) end. +-define(TPP_REQUEST, 1). +-define(TPP_REPLY, 2). + +tpp_tcp_recv_req(Sock, Recv) -> + tpp_tcp_recv(Sock, Recv, ?TPP_REQUEST). + +tpp_tcp_recv_rep(Sock, Recv) -> + tpp_tcp_recv(Sock, Recv, ?TPP_REPLY). + +tpp_tcp_recv(Sock, Recv, Tag) -> + case Recv(Sock, 0) of + {ok, <> = Msg} + when (Sz =:= size(Data)) -> + %% We got it all + {ok, Data, size(Msg)}; + {ok, <> = Msg} -> + Remains = Sz - size(Data), + tpp_tcp_recv(Sock, Recv, Tag, Remains, size(Msg), [Data]); + {ok, <>} -> + {error, {invalid_msg_tag, Tag}}; + {error, _} = ERROR -> + ERROR + end. + +tpp_tcp_recv(Sock, Recv, Tag, Remaining, AccSz, Acc) -> + case Recv(Sock, Remaining) of + {ok, Data} when (Remaining =:= size(Data)) -> + %% We got the rest + TotSz = AccSz + size(Data), + {ok, erlang:iolist_to_binary(lists:reverse([Data | Acc])), TotSz}; + {ok, Data} when (Remaining > size(Data)) -> + tpp_tcp_recv(Sock, Recv, Tag, + Remaining - size(Data), AccSz + size(Data), + [Data | Acc]); + {error, _} = ERROR -> + ERROR + end. + + +tpp_tcp_send_req(Sock, Send, Data) -> + tpp_tcp_send(Sock, Send, ?TPP_REQUEST, Data). + +tpp_tcp_send_rep(Sock, Send, Data) -> + tpp_tcp_send(Sock, Send, ?TPP_REPLY, Data). + +tpp_tcp_send(Sock, Send, Tag, Data) -> + DataSz = size(Data), + Msg = <>, + case Send(Sock, Msg) of + ok -> + {ok, size(Msg)}; + {error, _} = ERROR -> + ERROR + end. + + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This gets the local address (not 127.0...) @@ -7951,21 +8038,21 @@ f(F, A) -> %% i(Before ++ FStr ++ After, []). -d(F, A) -> - d(get(dbg_fd), F, A). +%% d(F, A) -> +%% d(get(dbg_fd), F, A). -d(undefined, F, A) -> - [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), - DbgFileName = f("~s-dbg.txt", [NodeNameStr]), - case file:open(DbgFileName, [write]) of - {ok, FD} -> - put(dbg_fd, FD), - d(FD, F, A); - {error, Reason} -> - exit({failed_open_dbg_file, Reason}) - end; -d(FD, F, A) -> - io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). +%% d(undefined, F, A) -> +%% [NodeNameStr|_] = string:split(atom_to_list(node()), [$@]), +%% DbgFileName = f("~s-dbg.txt", [NodeNameStr]), +%% case file:open(DbgFileName, [write]) of +%% {ok, FD} -> +%% put(dbg_fd, FD), +%% d(FD, F, A); +%% {error, Reason} -> +%% exit({failed_open_dbg_file, Reason}) +%% end; +%% d(FD, F, A) -> +%% io:format(FD, "~s~n", [f("[~s] " ++ F, [formated_timestamp()|A])]). i(F) -> i(F, []). -- cgit v1.2.3