aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRaimo Niskanen <[email protected]>2018-10-22 11:48:35 +0200
committerRaimo Niskanen <[email protected]>2018-10-22 11:48:35 +0200
commit2dbf5bf8c3b91c17ed79f13df3eb08c0b943218c (patch)
tree9ca4a8d3f87f45b17757a55bdc925955b856fa1c
parent57e9c998574695433fa21f8c42e2bccba77448ef (diff)
parent6cf477d189ba993a5cb66aad421c9d7505da250c (diff)
downloadotp-2dbf5bf8c3b91c17ed79f13df3eb08c0b943218c.tar.gz
otp-2dbf5bf8c3b91c17ed79f13df3eb08c0b943218c.tar.bz2
otp-2dbf5bf8c3b91c17ed79f13df3eb08c0b943218c.zip
Merge branch 'raimo/tcp-close-while-send/maint/ERL-561/OTP-12242' into maint
* raimo/tcp-close-while-send/maint/ERL-561/OTP-12242: Write test case Fix hanging gen_tcp send vs close race Conflicts: erts/preloaded/ebin/prim_inet.beam
-rw-r--r--erts/preloaded/ebin/prim_inet.beambin80864 -> 82260 bytes
-rw-r--r--erts/preloaded/src/prim_inet.erl143
-rw-r--r--lib/kernel/test/gen_tcp_misc_SUITE.erl146
3 files changed, 260 insertions, 29 deletions
diff --git a/erts/preloaded/ebin/prim_inet.beam b/erts/preloaded/ebin/prim_inet.beam
index 52bab031ff..eaa1e2cdf8 100644
--- a/erts/preloaded/ebin/prim_inet.beam
+++ b/erts/preloaded/ebin/prim_inet.beam
Binary files differ
diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl
index 963e8933bc..f1d938c9a4 100644
--- a/erts/preloaded/src/prim_inet.erl
+++ b/erts/preloaded/src/prim_inet.erl
@@ -49,9 +49,15 @@
-include("inet_sctp.hrl").
-include("inet_int.hrl").
-%-define(DEBUG, 1).
+%%%-define(DEBUG, 1).
-ifdef(DEBUG).
--define(DBG_FORMAT(Format, Args), (io:format((Format), (Args)))).
+-define(
+ DBG_FORMAT(Format, Args),
+ begin
+ %% io:format((Format), (Args)),
+ erlang:display(lists:flatten(io_lib:format((Format), (Args)))),
+ ok
+ end).
-else.
-define(DBG_FORMAT(Format, Args), ok).
-endif.
@@ -150,39 +156,96 @@ shutdown_1(S, How) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
close(S) when is_port(S) ->
+ ?DBG_FORMAT("prim_inet:close(~p)~n", [S]),
case getopt(S, linger) of
{ok,{true,0}} ->
close_port(S);
- _ ->
- case subscribe(S, [subs_empty_out_q]) of
- {ok, [{subs_empty_out_q,N}]} when N > 0 ->
- close_pend_loop(S, N); %% wait for pending output to be sent
- _ ->
- close_port(S)
- end
+ {ok,{true,T}} ->
+ %% Wait for T seconds for pending output to be sent
+ %%
+ %% Note that this handling of Linger may look ok,
+ %% but sweeps some problems under the rug since
+ %% there are OS buffers that may have remaining data
+ %% after the inet driver has emptied its buffers.
+ %% But Linger for nonblocking sockets is broken
+ %% anyway on all OS:es, according to hearsay,
+ %% and is a contradiction in itself.
+ %% We have hereby done our best...
+ %%
+ Tref = erlang:start_timer(T * 1000, self(), close_port),
+ close_pend_loop(S, Tref, undefined);
+ _ -> % Regard this as {ok,{false,_}}
+ case subscribe(S, [subs_empty_out_q]) of
+ {ok, [{subs_empty_out_q,N}]} when N > 0 ->
+ %% Wait for pending output to be sent
+ DefaultT = 180000, % Arbitrary system timeout 3 min
+ Tref = erlang:start_timer(DefaultT, self(), close_port),
+ close_pend_loop(S, Tref, N);
+ _ ->
+ %% Subscribe failed or empty out q - give up or done
+ close_port(S)
+ end
end.
-close_pend_loop(S, N) ->
+close_pend_loop(S, Tref, N) ->
+ ?DBG_FORMAT("prim_inet:close_pend_loop(~p, _, ~p)~n", [S,N]),
receive
- {empty_out_q,S} ->
- close_port(S)
+ {timeout,Tref,_} -> % Linger timeout
+ ?DBG_FORMAT("prim_inet:close_pend_loop(~p, _, _) timeout~n", [S]),
+ close_port(S);
+ {empty_out_q,S} when N =/= undefined ->
+ ?DBG_FORMAT(
+ "prim_inet:close_pend_loop(~p, _, _) empty_out_q~n", [S]),
+ close_port(S, Tref)
after ?INET_CLOSE_TIMEOUT ->
case getstat(S, [send_pend]) of
{ok, [{send_pend,N1}]} ->
+ ?DBG_FORMAT(
+ "prim_inet:close_pend_loop(~p, _, _) send_pend ~p~n",
+ [S,N1]),
if
- N1 =:= N ->
- close_port(S);
- true ->
- close_pend_loop(S, N1)
+ N1 =:= 0 ->
+ %% Empty outq - done
+ close_port(S, Tref);
+ N =:= undefined ->
+ %% Within linger time - wait some more
+ close_pend_loop(S, Tref, N);
+ N1 =:= N ->
+ %% Inactivity - give up
+ close_port(S, Tref);
+ true ->
+ %% Still moving - wait some more
+ close_pend_loop(S, Tref, N)
end;
- _ ->
- close_port(S)
- end
+ _Stat ->
+ %% Failed getstat - give up
+ ?DBG_FORMAT(
+ "prim_inet:close_pend_loop(~p, _, _) getstat ~p~n",
+ [S,_Stat]),
+ close_port(S, Tref)
+ end
end.
+
+close_port(S, Tref) ->
+ ?DBG_FORMAT("prim_inet:close_port(~p, _)~n", [S]),
+ case erlang:cancel_timer(Tref) of
+ false ->
+ receive
+ {timeout,Tref,_} ->
+ ok
+ end;
+ _N ->
+ ok
+ end,
+ close_port(S).
+%%
close_port(S) ->
- catch erlang:port_close(S),
- receive {'EXIT',S,_} -> ok after 0 -> ok end.
+ ?DBG_FORMAT("prim_inet:close_port(~p)~n", [S]),
+ _Closed = (catch erlang:port_close(S)),
+ receive {'EXIT',S,_} -> ok after 0 -> ok end,
+ ?DBG_FORMAT("prim_inet:close_port(~p) ~p~n", [S,_Closed]),
+ ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
@@ -424,23 +487,49 @@ peeloff(S, AssocId) ->
%% be called directly -- use "sendmsg" instead:
%%
send(S, Data, OptList) when is_port(S), is_list(OptList) ->
- ?DBG_FORMAT("prim_inet:send(~p, ~p)~n", [S,Data]),
+ ?DBG_FORMAT("prim_inet:send(~p, _, ~p)~n", [S,OptList]),
try erlang:port_command(S, Data, OptList) of
false -> % Port busy and nosuspend option passed
?DBG_FORMAT("prim_inet:send() -> {error,busy}~n", []),
{error,busy};
true ->
- receive
- {inet_reply,S,Status} ->
- ?DBG_FORMAT("prim_inet:send() -> ~p~n", [Status]),
- Status
- end
+ send_recv_reply(S, undefined)
catch
error:_Error ->
?DBG_FORMAT("prim_inet:send() -> {error,einval}~n", []),
{error,einval}
end.
+send_recv_reply(S, Mref) ->
+ ReplyTimeout =
+ case Mref of
+ undefined ->
+ ?INET_CLOSE_TIMEOUT;
+ _ ->
+ infinity
+ end,
+ receive
+ {inet_reply,S,Status} ->
+ ?DBG_FORMAT(
+ "prim_inet:send_recv_reply(~p, _): inet_reply ~p~n",
+ [S,Status]),
+ case Mref of
+ undefined -> ok;
+ _ ->
+ demonitor(Mref, [flush]),
+ ok
+ end,
+ Status;
+ {'DOWN',Mref,_,_,_Reason} when Mref =/= undefined ->
+ ?DBG_FORMAT(
+ "prim_inet:send_recv_reply(~p, _) 'DOWN' ~p~n",
+ [S,_Reason]),
+ {error,closed}
+ after ReplyTimeout ->
+ send_recv_reply(S, monitor(port, S))
+ end.
+
+
send(S, Data) ->
send(S, Data, []).
diff --git a/lib/kernel/test/gen_tcp_misc_SUITE.erl b/lib/kernel/test/gen_tcp_misc_SUITE.erl
index 194522c009..04c0c48e3a 100644
--- a/lib/kernel/test/gen_tcp_misc_SUITE.erl
+++ b/lib/kernel/test/gen_tcp_misc_SUITE.erl
@@ -52,7 +52,8 @@
several_accepts_in_one_go/1, accept_system_limit/1,
active_once_closed/1, send_timeout/1, send_timeout_active/1,
otp_7731/1, zombie_sockets/1, otp_7816/1, otp_8102/1,
- wrapping_oct/0, wrapping_oct/1, otp_9389/1, otp_13939/1]).
+ wrapping_oct/0, wrapping_oct/1, otp_9389/1, otp_13939/1,
+ otp_12242/1]).
%% Internal exports.
-export([sender/3, not_owner/1, passive_sockets_server/2, priority_server/1,
@@ -95,7 +96,8 @@ all() ->
killing_multi_acceptors2, several_accepts_in_one_go, accept_system_limit,
active_once_closed, send_timeout, send_timeout_active, otp_7731,
wrapping_oct,
- zombie_sockets, otp_7816, otp_8102, otp_9389].
+ zombie_sockets, otp_7816, otp_8102, otp_9389,
+ otp_12242].
groups() ->
[].
@@ -3284,3 +3286,143 @@ otp_13939(Config) when is_list(Config) ->
exit(Pid, normal),
ct:fail("Server process blocked on send.")
end.
+
+otp_12242(Config) when is_list(Config) ->
+ case os:type() of
+ {win32,_} ->
+ %% Even if we set sndbuf and recbuf to small sizes
+ %% Windows either happily accepts to send GBytes of data
+ %% in no time, so the second send below that is supposed
+ %% to time out just succedes, or the first send that
+ %% is supposed to fill the inet_drv I/O queue and
+ %% start waiting for when more data can be sent
+ %% instead sends all data but suffers a send
+ %% failure that closes the socket
+ {skipped,backpressure_broken_on_win32};
+ _ ->
+ %% Find the IPv4 address of an up and running interface
+ %% that is not loopback nor pointtopoint
+ {ok,IFList} = inet:getifaddrs(),
+ ct:pal("IFList ~p~n", [IFList]),
+ case
+ lists:flatten(
+ [lists:filtermap(
+ fun ({addr,Addr}) when tuple_size(Addr) =:= 4 ->
+ {true,Addr};
+ (_) ->
+ false
+ end, Opts)
+ || {_,Opts} <- IFList,
+ case lists:keyfind(flags, 1, Opts) of
+ {_,Flags} ->
+ lists:member(up, Flags)
+ andalso
+ lists:member(running, Flags)
+ andalso
+ not lists:member(loopback, Flags)
+ andalso
+ not lists:member(pointtopoint, Flags);
+ false ->
+ false
+ end])
+ of
+ [Addr|_] ->
+ otp_12242(Addr);
+ Other ->
+ {skipped,{no_external_address,Other}}
+ end
+ end;
+%%
+otp_12242(Addr) when tuple_size(Addr) =:= 4 ->
+ ct:timetrap(30000),
+ ct:pal("Using address ~p~n", [Addr]),
+ Bufsize = 16 * 1024,
+ Datasize = 128 * 1024 * 1024, % At least 1 s on GBit interface
+ Blob = binary:copy(<<$x>>, Datasize),
+ LOpts =
+ [{backlog,4},{reuseaddr,true},{ip,Addr},
+ binary,{active,false},
+ {recbuf,Bufsize},{sndbuf,Bufsize},{buffer,Bufsize}],
+ COpts =
+ [binary,{active,false},{ip,Addr},
+ {linger,{true,1}}, % 1 s
+ {send_timeout,500},
+ {recbuf,Bufsize},{sndbuf,Bufsize},{buffer,Bufsize}],
+ Dir = filename:dirname(code:which(?MODULE)),
+ {ok,ListenerNode} =
+ test_server:start_node(
+ ?UNIQ_NODE_NAME, slave, [{args,"-pa " ++ Dir}]),
+ Tester = self(),
+ Listener =
+ spawn(
+ ListenerNode,
+ fun () ->
+ {ok,L} = gen_tcp:listen(0, LOpts),
+ {ok,LPort} = inet:port(L),
+ Tester ! {self(),port,LPort},
+ {ok,A} = gen_tcp:accept(L),
+ ok = gen_tcp:close(L),
+ receive
+ {Tester,stop} ->
+ ok = gen_tcp:close(A)
+ end
+ end),
+ ListenerMref = monitor(process, Listener),
+ LPort = receive {Listener,port,P} -> P end,
+ {ok,C} = gen_tcp:connect(Addr, LPort, COpts, infinity),
+ {ok,ReadCOpts} = inet:getopts(C, [recbuf,sndbuf,buffer]),
+ ct:pal("ReadCOpts ~p~n", [ReadCOpts]),
+ %%
+ %% Fill the buffers
+ ct:pal("Sending ~p bytes~n", [Datasize]),
+ ok = gen_tcp:send(C, Blob),
+ ct:pal("Sent ~p bytes~n", [Datasize]),
+ %% Spawn the Closer,
+ %% try to ensure that the close call is in progress
+ %% before the owner proceeds with sending
+ Owner = self(),
+ {_Closer,CloserMref} =
+ spawn_opt(
+ fun () ->
+ Owner ! {tref, erlang:start_timer(50, Owner, closing)},
+ ct:pal("Calling gen_tcp:close(C)~n"),
+ try gen_tcp:close(C) of
+ Result ->
+ ct:pal("gen_tcp:close(C) -> ~p~n", [Result]),
+ ok = Result
+ catch
+ Class:Reason:Stacktrace ->
+ ct:pal(
+ "gen_tcp:close(C) >< ~p:~p~n ~p~n",
+ [Class,Reason,Stacktrace]),
+ erlang:raise(Class, Reason, Stacktrace)
+ end
+ end, [link,monitor]),
+ receive
+ {tref,Tref} ->
+ receive {timeout,Tref,_} -> ok end,
+ ct:pal("Sending ~p bytes again~n", [Datasize]),
+ %% Now should the close be in progress...
+ %% All buffers are full, remote end is not reading,
+ %% and the send timeout is 1 s so this will timeout:
+ {error,timeout} = gen_tcp:send(C, Blob),
+ ct:pal("Sending ~p bytes again timed out~n", [Datasize]),
+ ok = inet:setopts(C, [{send_timeout,10000}]),
+ %% There is a hidden timeout here. Port close is sampled
+ %% every 5 s by prim_inet:send_recv_reply.
+ %% Linger is 3 s so the Closer will finish this send:
+ ct:pal("Sending ~p bytes with 10 s timeout~n", [Datasize]),
+ {error,closed} = gen_tcp:send(C, Blob),
+ ct:pal("Sending ~p bytes with 10 s timeout was closed~n",
+ [Datasize]),
+ normal = wait(CloserMref),
+ ct:pal("The Closer has exited~n"),
+ Listener ! {Tester,stop},
+ receive {'DOWN',ListenerMref,_,_,_} -> ok end,
+ ct:pal("The Listener has exited~n"),
+ test_server:stop_node(ListenerNode),
+ ok
+ end.
+
+wait(Mref) ->
+ receive {'DOWN',Mref,_,_,Reason} -> Reason end.