aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2017-10-20 13:16:04 +0100
committerLoïc Hoguin <[email protected]>2017-10-20 13:16:04 +0100
commitc602871f86c795da8463908d12f7bf966bfeec12 (patch)
tree24795ef05335e4a23df9c2695b8a4431ccea1ac1
parentb9526a1745cad73624a1a01a89f6498797c18443 (diff)
downloadcowboy-c602871f86c795da8463908d12f7bf966bfeec12.tar.gz
cowboy-c602871f86c795da8463908d12f7bf966bfeec12.tar.bz2
cowboy-c602871f86c795da8463908d12f7bf966bfeec12.zip
Fix HTTP/1.1 stopping streams too early
It is possible in some cases to move on to the next request without waiting, but that can be done as an optimization later on if necessary.
-rw-r--r--src/cowboy_http.erl42
-rw-r--r--test/handlers/stream_handler_h.erl7
-rw-r--r--test/stream_handler_SUITE.erl20
3 files changed, 50 insertions, 19 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 0be949e..bfbd290 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -98,7 +98,7 @@
out_streamid = 1 :: pos_integer(),
%% Whether we finished writing data for the current stream.
- out_state = wait :: wait | headers | chunked | done,
+ out_state = wait :: wait | chunked | done,
%% The connection will be closed after this stream.
last_streamid = undefined :: pos_integer(),
@@ -809,12 +809,10 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
case Body of
{sendfile, O, B, P} ->
Transport:send(Socket, Response),
- commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]);
+ commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]);
_ ->
Transport:send(Socket, [Response, Body]),
- %% @todo If max number of requests, close connection.
- %% @todo If IsFin, maybe skip body of current request.
- maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
+ commands(State#state{out_state=done}, StreamID, Tail)
end;
%% Send response headers and initiate chunked encoding.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
@@ -836,7 +834,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str
%%
%% @todo WINDOW_UPDATE stuff require us to buffer some data.
%% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
-commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
[{data, IsFin, Data}|Tail]) ->
%% Do not send anything when the user asks to send an empty
%% data frame, as that would break the protocol.
@@ -853,12 +851,20 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, Stre
Transport:send(Socket, Data)
end
end,
- maybe_terminate(State, StreamID, Tail, IsFin);
+ State = case IsFin of
+ fin -> State0#state{out_state=done};
+ nofin -> State0
+ end,
+ commands(State, StreamID, Tail);
%% Send a file.
-commands(State=#state{socket=Socket, transport=Transport}, StreamID,
+commands(State0=#state{socket=Socket, transport=Transport}, StreamID,
[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
Transport:sendfile(Socket, Path, Offset, Bytes),
- maybe_terminate(State, StreamID, Tail, IsFin);
+ State = case IsFin of
+ fin -> State0#state{out_state=done};
+ nofin -> State0
+ end,
+ commands(State, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
opts=Opts, children=Children}, StreamID,
@@ -886,11 +892,13 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
%% Stream shutdown.
commands(State, StreamID, [stop|Tail]) ->
%% @todo Do we want to run the commands after a stop?
-% commands(stream_terminate(State, StreamID, stop), StreamID, Tail).
-
- %% @todo I think that's where we need to terminate streams.
-
- maybe_terminate(State, StreamID, Tail, fin);
+ %% @todo We currently wait for the stop command before we
+ %% continue with the next request/response. In theory, if
+ %% the request body was read fully and the response body
+ %% was sent fully we should be able to start working on
+ %% the next request concurrently. This can be done as a
+ %% future optimization.
+ maybe_terminate(State, StreamID, Tail);
%% HTTP/1.1 does not support push; ignore.
commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
commands(State, StreamID, Tail).
@@ -905,12 +913,10 @@ headers_to_list(Headers) ->
flush() ->
receive _ -> flush() after 0 -> ok end.
-maybe_terminate(State, StreamID, Tail, nofin) ->
- commands(State, StreamID, Tail);
%% @todo In these cases I'm not sure if we should continue processing commands.
-maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) ->
+maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
-maybe_terminate(State, StreamID, _Tail, fin) ->
+maybe_terminate(State, StreamID, _Tail) ->
stream_terminate(State, StreamID, normal).
stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
diff --git a/test/handlers/stream_handler_h.erl b/test/handlers/stream_handler_h.erl
index 74fc478..23d6b15 100644
--- a/test/handlers/stream_handler_h.erl
+++ b/test/handlers/stream_handler_h.erl
@@ -43,6 +43,8 @@ init_commands(_, _, State=#state{test=shutdown_timeout_on_stream_stop}) ->
init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) ->
Spawn = init_process(true, State),
[{headers, 200, #{}}, {spawn, Spawn, 2000}];
+init_commands(_, _, State=#state{test=terminate_on_stop}) ->
+ [{response, 204, #{}, <<>>}];
init_commands(_, _, _) ->
[{headers, 200, #{}}].
@@ -72,7 +74,10 @@ info(_, crash, #state{test=crash_in_info}) ->
error(crash);
info(StreamID, Info, State=#state{pid=Pid}) ->
Pid ! {Pid, self(), info, StreamID, Info, State},
- {[], State}.
+ case Info of
+ please_stop -> {[stop], State};
+ _ -> {[], State}
+ end.
terminate(StreamID, Reason, State=#state{pid=Pid, test=crash_in_terminate}) ->
Pid ! {Pid, self(), terminate, StreamID, Reason, State},
diff --git a/test/stream_handler_SUITE.erl b/test/stream_handler_SUITE.erl
index 632adff..594e025 100644
--- a/test/stream_handler_SUITE.erl
+++ b/test/stream_handler_SUITE.erl
@@ -341,3 +341,23 @@ terminate_on_socket_close(Config) ->
%% Confirm terminate/3 is called.
receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
ok.
+
+terminate_on_stop(Config) ->
+ doc("Confirm terminate/3 is called after stop is returned."),
+ Self = self(),
+ ConnPid = gun_open(Config),
+ Ref = gun:get(ConnPid, "/long_polling", [
+ {<<"accept-encoding">>, <<"gzip">>},
+ {<<"x-test-case">>, <<"terminate_on_stop">>},
+ {<<"x-test-pid">>, pid_to_list(Self)}
+ ]),
+ %% Confirm init/3 is called and receive the response.
+ Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end,
+ {response, fin, 204, _} = gun:await(ConnPid, Ref),
+ %% Confirm the stream is still alive even though we
+ %% received the response fully, and tell it to stop.
+ Pid ! {{Pid, 1}, please_stop},
+ receive {Self, Pid, info, _, please_stop, _} -> ok after 1000 -> error(timeout) end,
+ %% Confirm terminate/3 is called.
+ receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
+ ok.