aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cowboy_http.erl45
-rw-r--r--src/cowboy_http2.erl107
-rw-r--r--test/handlers/streamed_result_h.erl20
-rw-r--r--test/http2_SUITE.erl12
-rw-r--r--test/http_SUITE.erl79
5 files changed, 225 insertions, 38 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index abaab06..bba2108 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -47,6 +47,7 @@
middlewares => [module()],
proxy_header => boolean(),
request_timeout => timeout(),
+ reset_idle_timeout_on_send => boolean(),
sendfile => boolean(),
shutdown_timeout => timeout(),
stream_handlers => [module()],
@@ -294,6 +295,14 @@ set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
end,
State#state{timer=TimerRef}.
+maybe_reset_idle_timeout(State=#state{opts=Opts}) ->
+ case maps:get(reset_idle_timeout_on_send, Opts, false) of
+ true ->
+ set_timeout(State, idle_timeout);
+ false ->
+ State
+ end.
+
cancel_timeout(State=#state{timer=TimerRef}) ->
ok = case TimerRef of
undefined ->
@@ -366,6 +375,11 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
cowboy:log(cowboy_stream:make_error_log(init,
[StreamID, Req, Opts],
Class, Exception, Stacktrace), Opts),
+ %% We do not reset the idle timeout on send here
+ %% because an error occurred in the application. While we
+ %% are keeping the connection open for further requests we
+ %% do not want to keep the connection up too long if no
+ %% additional requests come in.
early_error(500, State0, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:init/3.'}, Req),
parse(Buffer, State0)
@@ -1012,19 +1026,20 @@ commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
commands(State, StreamID, Tail);
%% Send an informational response.
-commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
+commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
StreamID, [{inform, StatusCode, Headers}|Tail]) ->
%% @todo I'm pretty sure the last stream in the list is the one we want
%% considering all others are queued.
#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
_ = case Version of
'HTTP/1.1' ->
- ok = maybe_socket_error(State, Transport:send(Socket,
+ ok = maybe_socket_error(State0, Transport:send(Socket,
cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))));
%% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
'HTTP/1.0' ->
ok
end,
+ State = maybe_reset_idle_timeout(State0),
commands(State, StreamID, Tail);
%% Send a full response.
%%
@@ -1037,17 +1052,18 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
%% considering all others are queued.
#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
{State1, Headers} = connection(State0, Headers0, StreamID, Version),
- State = State1#state{out_state=done},
+ State2 = State1#state{out_state=done},
%% @todo Ensure content-length is set. 204 must never have content-length set.
Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
%% @todo 204 and 304 responses must not include a response body. (RFC7230 3.3.1, RFC7230 3.3.2)
case Body of
{sendfile, _, _, _} ->
- ok = maybe_socket_error(State, Transport:send(Socket, Response)),
- sendfile(State, Body);
+ ok = maybe_socket_error(State2, Transport:send(Socket, Response)),
+ sendfile(State2, Body);
_ ->
- ok = maybe_socket_error(State, Transport:send(Socket, [Response, Body]))
+ ok = maybe_socket_error(State2, Transport:send(Socket, [Response, Body]))
end,
+ State = maybe_reset_idle_timeout(State2),
commands(State, StreamID, Tail);
%% Send response headers and initiate chunked encoding or streaming.
commands(State0=#state{socket=Socket, transport=Transport,
@@ -1084,9 +1100,10 @@ commands(State0=#state{socket=Socket, transport=Transport,
trailers -> Headers1;
_ -> maps:remove(<<"trailer">>, Headers1)
end,
- {State, Headers} = connection(State1, Headers2, StreamID, Version),
- ok = maybe_socket_error(State, Transport:send(Socket,
+ {State2, Headers} = connection(State1, Headers2, StreamID, Version),
+ ok = maybe_socket_error(State2, Transport:send(Socket,
cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))),
+ State = maybe_reset_idle_timeout(State2),
commands(State, StreamID, Tail);
%% Send a response body chunk.
%% @todo We need to kill the stream if it tries to send data before headers.
@@ -1147,17 +1164,18 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out
end,
Stream0#stream{local_sent_size=SentSize}
end,
- State = case IsFin of
+ State1 = case IsFin of
fin -> State0#state{out_state=done};
nofin -> State0
end,
+ State = maybe_reset_idle_timeout(State1),
Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
commands(State#state{streams=Streams}, StreamID, Tail);
-commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
StreamID, [{trailers, Trailers}|Tail]) ->
case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
trailers ->
- ok = maybe_socket_error(State,
+ ok = maybe_socket_error(State0,
Transport:send(Socket, [
<<"0\r\n">>,
cow_http:headers(maps:to_list(Trailers)),
@@ -1165,12 +1183,13 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
])
);
no_trailers ->
- ok = maybe_socket_error(State,
+ ok = maybe_socket_error(State0,
Transport:send(Socket, <<"0\r\n\r\n">>));
not_chunked ->
ok
end,
- commands(State#state{out_state=done}, StreamID, Tail);
+ State = maybe_reset_idle_timeout(State0#state{out_state=done}),
+ commands(State, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 04935fb..59fd76b 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -57,6 +57,7 @@
middlewares => [module()],
preface_timeout => timeout(),
proxy_header => boolean(),
+ reset_idle_timeout_on_send => boolean(),
sendfile => boolean(),
settings_timeout => timeout(),
shutdown_timeout => timeout(),
@@ -318,6 +319,14 @@ set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) ->
end,
State#state{timer=TimerRef}.
+maybe_reset_idle_timeout(State=#state{opts=Opts}) ->
+ case maps:get(reset_idle_timeout_on_send, Opts, false) of
+ true ->
+ set_idle_timeout(State);
+ false ->
+ State
+ end.
+
%% HTTP/2 protocol parsing.
parse(State=#state{http2_status=sequence}, Data) ->
@@ -394,10 +403,11 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
goaway(State#state{http2_machine=HTTP2Machine}, GoAway);
{send, SendData, HTTP2Machine} ->
%% We may need to send an alarm for each of the streams sending data.
- lists:foldl(
+ State1 = lists:foldl(
fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end,
send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData, []),
- SendData);
+ SendData),
+ maybe_reset_idle_timeout(State1);
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
reset_stream(State#state{http2_machine=HTTP2Machine},
StreamID, {stream_error, Reason, Human});
@@ -409,6 +419,9 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) ->
%% if we were still waiting for a SETTINGS frame.
maybe_ack(State=#state{http2_status=settings}, Frame) ->
maybe_ack(State#state{http2_status=connected}, Frame);
+%% We do not reset the idle timeout on send here because we are
+%% sending data as a consequence of receiving data, which means
+%% we already resetted the idle timeout.
maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
case Frame of
{settings, _} ->
@@ -419,7 +432,7 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) ->
end,
State.
-data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) ->
+data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFin, Data) ->
case Streams of
#{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} ->
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
@@ -428,11 +441,26 @@ data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin
%% We may receive more data than we requested. We ensure
%% that the flow value doesn't go lower than 0.
Size = byte_size(Data),
- State = update_window(State0#state{flow=max(0, Flow - Size),
+ Flow = max(0, Flow0 - Size),
+ %% We would normally update the window when changing the flow
+ %% value. But because we are running commands, which themselves
+ %% may update the window, and we want to avoid updating the
+ %% window twice in a row, we first run the commands and then
+ %% only update the window a flow command was executed. We know
+ %% that it was because the flow value changed in the state.
+ State1 = State0#state{flow=Flow,
streams=Streams#{StreamID => Stream#stream{
flow=max(0, StreamFlow - Size), state=StreamState}}},
- StreamID),
- commands(State, StreamID, Commands)
+ State = commands(State1, StreamID, Commands),
+ case State of
+ %% No flow command was executed. We must update the window
+ %% because we changed the flow value earlier.
+ #state{flow=Flow} ->
+ update_window(State, StreamID);
+ %% Otherwise the window was updated already.
+ _ ->
+ State
+ end
catch Class:Exception:Stacktrace ->
cowboy:log(cowboy_stream:make_error_log(data,
[StreamID, IsFin, Data, StreamState0],
@@ -686,23 +714,37 @@ commands(State=#state{http2_machine=HTTP2Machine}, StreamID,
end;
%% Send an informational response.
commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) ->
- State = send_headers(State0, StreamID, idle, StatusCode, Headers),
+ State1 = send_headers(State0, StreamID, idle, StatusCode, Headers),
+ State = maybe_reset_idle_timeout(State1),
commands(State, StreamID, Tail);
%% Send response headers.
commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) ->
- State = send_response(State0, StreamID, StatusCode, Headers, Body),
+ State1 = send_response(State0, StreamID, StatusCode, Headers, Body),
+ State = maybe_reset_idle_timeout(State1),
commands(State, StreamID, Tail);
%% Send response headers.
commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) ->
- State = send_headers(State0, StreamID, nofin, StatusCode, Headers),
+ State1 = send_headers(State0, StreamID, nofin, StatusCode, Headers),
+ State = maybe_reset_idle_timeout(State1),
commands(State, StreamID, Tail);
%% Send a response body chunk.
commands(State0, StreamID, [{data, IsFin, Data}|Tail]) ->
- State = maybe_send_data(State0, StreamID, IsFin, Data, []),
+ State = case maybe_send_data(State0, StreamID, IsFin, Data, []) of
+ {data_sent, State1} ->
+ maybe_reset_idle_timeout(State1);
+ {no_data_sent, State1} ->
+ State1
+ end,
commands(State, StreamID, Tail);
%% Send trailers.
commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->
- State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}, []),
+ State = case maybe_send_data(State0, StreamID, fin,
+ {trailers, maps:to_list(Trailers)}, []) of
+ {data_sent, State1} ->
+ maybe_reset_idle_timeout(State1);
+ {no_data_sent, State1} ->
+ State1
+ end,
commands(State, StreamID, Tail);
%% Send a push promise.
%%
@@ -737,7 +779,8 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma
State1 = State0#state{http2_machine=HTTP2Machine},
ok = maybe_socket_error(State1, Transport:send(Socket,
cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock))),
- headers_frame(State1, PromisedStreamID, fin, Headers, PseudoHeaders, 0);
+ State2 = maybe_reset_idle_timeout(State1),
+ headers_frame(State2, PromisedStreamID, fin, Headers, PseudoHeaders, 0);
{error, no_push} ->
State0
end,
@@ -760,6 +803,9 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) ->
%% @todo Only reset when the stream still exists.
reset_stream(State, StreamID, Error);
%% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself.
+%%
+%% We do not need to reset the idle timeout on send because it
+%% hasn't been set yet. This is called from init/12.
commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) ->
%% @todo This 101 response needs to be passed through stream handlers.
@@ -798,10 +844,12 @@ update_window(State0=#state{socket=Socket, transport=Transport,
end,
State = State0#state{http2_machine=HTTP2Machine},
case {Data1, Data2} of
- {<<>>, <<>>} -> ok;
- _ -> ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2]))
- end,
- State.
+ {<<>>, <<>>} ->
+ State;
+ _ ->
+ ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2])),
+ maybe_reset_idle_timeout(State)
+ end.
%% Send the response, trailers or data.
@@ -821,8 +869,9 @@ send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode,
= cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, nofin,
#{status => cow_http:status_to_integer(StatusCode)},
headers_to_list(Headers)),
- maybe_send_data(State0#state{http2_machine=HTTP2Machine}, StreamID, fin, Body,
- [cow_http2:headers(StreamID, nofin, HeaderBlock)])
+ {_, State} = maybe_send_data(State0#state{http2_machine=HTTP2Machine},
+ StreamID, fin, Body, [cow_http2:headers(StreamID, nofin, HeaderBlock)]),
+ State
end.
send_headers(State0=#state{socket=Socket, transport=Transport,
@@ -854,11 +903,15 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport,
State1 = State0#state{http2_machine=HTTP2Machine},
%% If we have prefix data (like a HEADERS frame) we need to send it
%% even if we do not send any DATA frames.
- case Prefix of
- [] -> ok;
- _ -> ok = maybe_socket_error(State1, Transport:send(Socket, Prefix))
+ WasDataSent = case Prefix of
+ [] ->
+ no_data_sent;
+ _ ->
+ ok = maybe_socket_error(State1, Transport:send(Socket, Prefix)),
+ data_sent
end,
- maybe_send_data_alarm(State1, HTTP2Machine0, StreamID);
+ State = maybe_send_data_alarm(State1, HTTP2Machine0, StreamID),
+ {WasDataSent, State};
{send, SendData, HTTP2Machine} ->
State = #state{http2_status=Status, streams=Streams}
= send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix),
@@ -867,7 +920,7 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport,
Status =:= closing, Streams =:= #{} ->
terminate(State, {stop, normal, 'The connection is going away.'});
true ->
- maybe_send_data_alarm(State, HTTP2Machine0, StreamID)
+ {data_sent, maybe_send_data_alarm(State, HTTP2Machine0, StreamID)}
end
end.
@@ -983,6 +1036,10 @@ stream_alarm(State, StreamID, Name, Value) ->
%% We may have to cancel streams even if we receive multiple
%% GOAWAY frames as the LastStreamID value may be lower than
%% the one previously received.
+%%
+%% We do not reset the idle timeout on send here. We already
+%% disabled it if we initiated shutdown; and we already reset
+%% it if the client sent a GOAWAY frame.
goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0,
http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _})
when Status =:= connected; Status =:= closing_initiated; Status =:= closing ->
@@ -1159,6 +1216,10 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
end.
%% @todo Don't send an RST_STREAM if one was already sent.
+%%
+%% When resetting the stream we are technically sending data
+%% on the socket. However due to implementation complexities
+%% we do not attempt to reset the idle timeout on send.
reset_stream(State0=#state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamID, Error) ->
Reason = case Error of
diff --git a/test/handlers/streamed_result_h.erl b/test/handlers/streamed_result_h.erl
new file mode 100644
index 0000000..ea6f492
--- /dev/null
+++ b/test/handlers/streamed_result_h.erl
@@ -0,0 +1,20 @@
+-module(streamed_result_h).
+
+-export([init/2]).
+
+init(Req, Opts) ->
+ N = list_to_integer(binary_to_list(cowboy_req:binding(n, Req))),
+ Interval = list_to_integer(binary_to_list(cowboy_req:binding(interval, Req))),
+ chunked(N, Interval, Req, Opts).
+
+chunked(N, Interval, Req0, Opts) ->
+ Req = cowboy_req:stream_reply(200, Req0),
+ {ok, loop(N, Interval, Req), Opts}.
+
+loop(0, _Interval, Req) ->
+ ok = cowboy_req:stream_body("Finished!\n", fin, Req),
+ Req;
+loop(N, Interval, Req) ->
+ ok = cowboy_req:stream_body(iolist_to_binary([integer_to_list(N), <<"\n">>]), nofin, Req),
+ timer:sleep(Interval),
+ loop(N-1, Interval, Req).
diff --git a/test/http2_SUITE.erl b/test/http2_SUITE.erl
index 9a12ee1..37dfea3 100644
--- a/test/http2_SUITE.erl
+++ b/test/http2_SUITE.erl
@@ -29,7 +29,8 @@ init_dispatch(_) ->
cowboy_router:compile([{"localhost", [
{"/", hello_h, []},
{"/echo/:key", echo_h, []},
- {"/resp_iolist_body", resp_iolist_body_h, []}
+ {"/resp_iolist_body", resp_iolist_body_h, []},
+ {"/streamed_result/:n/:interval", streamed_result_h, []}
]}]).
%% Do a prior knowledge handshake (function originally copied from rfc7540_SUITE).
@@ -116,6 +117,15 @@ idle_timeout_reset_on_data(Config) ->
cowboy:stop_listener(?FUNCTION_NAME)
end.
+idle_timeout_on_send(Config) ->
+ doc("Ensure the idle timeout is not reset when sending (by default)."),
+ http_SUITE:do_idle_timeout_on_send(Config, http2).
+
+idle_timeout_reset_on_send(Config) ->
+ doc("Ensure the reset_idle_timeout_on_send results in the "
+ "idle timeout resetting when sending ."),
+ http_SUITE:do_idle_timeout_reset_on_send(Config, http2).
+
inactivity_timeout(Config) ->
doc("Terminate when the inactivity timeout is reached."),
ProtoOpts = #{
diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl
index ccac04f..8b8473f 100644
--- a/test/http_SUITE.erl
+++ b/test/http_SUITE.erl
@@ -45,7 +45,8 @@ init_dispatch(_) ->
{"/", hello_h, []},
{"/echo/:key", echo_h, []},
{"/resp/:key[/:arg]", resp_h, []},
- {"/set_options/:key", set_options_h, []}
+ {"/set_options/:key", set_options_h, []},
+ {"/streamed_result/:n/:interval", streamed_result_h, []}
]}]).
chunked_false(Config) ->
@@ -252,6 +253,82 @@ idle_timeout_infinity(Config) ->
cowboy:stop_listener(?FUNCTION_NAME)
end.
+idle_timeout_on_send(Config) ->
+ doc("Ensure the idle timeout is not reset when sending (by default)."),
+ do_idle_timeout_on_send(Config, http).
+
+%% Also used by http2_SUITE.
+do_idle_timeout_on_send(Config, Protocol) ->
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{
+ env => #{dispatch => init_dispatch(Config)},
+ idle_timeout => 1000
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ ConnPid = gun_open([{type, tcp}, {protocol, Protocol}, {port, Port}|Config]),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ #{socket := Socket} = gun:info(ConnPid),
+ Pid = get_remote_pid_tcp(Socket),
+ StreamRef = gun:get(ConnPid, "/streamed_result/10/250"),
+ Ref = erlang:monitor(process, Pid),
+ receive
+ {gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} ->
+ do_idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, false)
+ after 2000 ->
+ error(timeout)
+ end
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+idle_timeout_reset_on_send(Config) ->
+ doc("Ensure the reset_idle_timeout_on_send results in the "
+ "idle timeout resetting when sending ."),
+ do_idle_timeout_reset_on_send(Config, http).
+
+%% Also used by http2_SUITE.
+do_idle_timeout_reset_on_send(Config, Protocol) ->
+ {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [{port, 0}], #{
+ env => #{dispatch => init_dispatch(Config)},
+ idle_timeout => 1000,
+ reset_idle_timeout_on_send => true
+ }),
+ Port = ranch:get_port(?FUNCTION_NAME),
+ try
+ ConnPid = gun_open([{type, tcp}, {protocol, Protocol}, {port, Port}|Config]),
+ {ok, Protocol} = gun:await_up(ConnPid),
+ #{socket := Socket} = gun:info(ConnPid),
+ Pid = get_remote_pid_tcp(Socket),
+ StreamRef = gun:get(ConnPid, "/streamed_result/10/250"),
+ Ref = erlang:monitor(process, Pid),
+ receive
+ {gun_response, ConnPid, StreamRef, nofin, _Status, _Headers} ->
+ do_idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, true)
+ after 2000 ->
+ error(timeout)
+ end
+ after
+ cowboy:stop_listener(?FUNCTION_NAME)
+ end.
+
+do_idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion) ->
+ receive
+ {gun_data, ConnPid, StreamRef, nofin, _Data} ->
+ do_idle_timeout_recv_loop(Ref, Pid, ConnPid, StreamRef, ExpectCompletion);
+ {gun_data, ConnPid, StreamRef, fin, _Data} when ExpectCompletion ->
+ gun:close(ConnPid);
+ {gun_data, ConnPid, StreamRef, fin, _Data} ->
+ gun:close(ConnPid),
+ error(completed);
+ {'DOWN', Ref, process, Pid, _} when ExpectCompletion ->
+ gun:close(ConnPid),
+ error(exited);
+ {'DOWN', Ref, process, Pid, _} ->
+ ok
+ after 2000 ->
+ error(timeout)
+ end.
+
persistent_term_router(Config) ->
doc("The router can retrieve the routes from persistent_term storage."),
case erlang:function_exported(persistent_term, get, 1) of