aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/cowboy_http.erl42
-rw-r--r--test/handlers/stream_handler_h.erl7
-rw-r--r--test/stream_handler_SUITE.erl20
3 files changed, 50 insertions, 19 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 0be949e..bfbd290 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -98,7 +98,7 @@
out_streamid = 1 :: pos_integer(),
%% Whether we finished writing data for the current stream.
- out_state = wait :: wait | headers | chunked | done,
+ out_state = wait :: wait | chunked | done,
%% The connection will be closed after this stream.
last_streamid = undefined :: pos_integer(),
@@ -809,12 +809,10 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
case Body of
{sendfile, O, B, P} ->
Transport:send(Socket, Response),
- commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]);
+ commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]);
_ ->
Transport:send(Socket, [Response, Body]),
- %% @todo If max number of requests, close connection.
- %% @todo If IsFin, maybe skip body of current request.
- maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
+ commands(State#state{out_state=done}, StreamID, Tail)
end;
%% Send response headers and initiate chunked encoding.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
@@ -836,7 +834,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str
%%
%% @todo WINDOW_UPDATE stuff require us to buffer some data.
%% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
-commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
[{data, IsFin, Data}|Tail]) ->
%% Do not send anything when the user asks to send an empty
%% data frame, as that would break the protocol.
@@ -853,12 +851,20 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, Stre
Transport:send(Socket, Data)
end
end,
- maybe_terminate(State, StreamID, Tail, IsFin);
+ State = case IsFin of
+ fin -> State0#state{out_state=done};
+ nofin -> State0
+ end,
+ commands(State, StreamID, Tail);
%% Send a file.
-commands(State=#state{socket=Socket, transport=Transport}, StreamID,
+commands(State0=#state{socket=Socket, transport=Transport}, StreamID,
[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
Transport:sendfile(Socket, Path, Offset, Bytes),
- maybe_terminate(State, StreamID, Tail, IsFin);
+ State = case IsFin of
+ fin -> State0#state{out_state=done};
+ nofin -> State0
+ end,
+ commands(State, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
opts=Opts, children=Children}, StreamID,
@@ -886,11 +892,13 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
%% Stream shutdown.
commands(State, StreamID, [stop|Tail]) ->
%% @todo Do we want to run the commands after a stop?
-% commands(stream_terminate(State, StreamID, stop), StreamID, Tail).
-
- %% @todo I think that's where we need to terminate streams.
-
- maybe_terminate(State, StreamID, Tail, fin);
+ %% @todo We currently wait for the stop command before we
+ %% continue with the next request/response. In theory, if
+ %% the request body was read fully and the response body
+ %% was sent fully we should be able to start working on
+ %% the next request concurrently. This can be done as a
+ %% future optimization.
+ maybe_terminate(State, StreamID, Tail);
%% HTTP/1.1 does not support push; ignore.
commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) ->
commands(State, StreamID, Tail).
@@ -905,12 +913,10 @@ headers_to_list(Headers) ->
flush() ->
receive _ -> flush() after 0 -> ok end.
-maybe_terminate(State, StreamID, Tail, nofin) ->
- commands(State, StreamID, Tail);
%% @todo In these cases I'm not sure if we should continue processing commands.
-maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) ->
+maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) ->
terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
-maybe_terminate(State, StreamID, _Tail, fin) ->
+maybe_terminate(State, StreamID, _Tail) ->
stream_terminate(State, StreamID, normal).
stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
diff --git a/test/handlers/stream_handler_h.erl b/test/handlers/stream_handler_h.erl
index 74fc478..23d6b15 100644
--- a/test/handlers/stream_handler_h.erl
+++ b/test/handlers/stream_handler_h.erl
@@ -43,6 +43,8 @@ init_commands(_, _, State=#state{test=shutdown_timeout_on_stream_stop}) ->
init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) ->
Spawn = init_process(true, State),
[{headers, 200, #{}}, {spawn, Spawn, 2000}];
+init_commands(_, _, State=#state{test=terminate_on_stop}) ->
+ [{response, 204, #{}, <<>>}];
init_commands(_, _, _) ->
[{headers, 200, #{}}].
@@ -72,7 +74,10 @@ info(_, crash, #state{test=crash_in_info}) ->
error(crash);
info(StreamID, Info, State=#state{pid=Pid}) ->
Pid ! {Pid, self(), info, StreamID, Info, State},
- {[], State}.
+ case Info of
+ please_stop -> {[stop], State};
+ _ -> {[], State}
+ end.
terminate(StreamID, Reason, State=#state{pid=Pid, test=crash_in_terminate}) ->
Pid ! {Pid, self(), terminate, StreamID, Reason, State},
diff --git a/test/stream_handler_SUITE.erl b/test/stream_handler_SUITE.erl
index 632adff..594e025 100644
--- a/test/stream_handler_SUITE.erl
+++ b/test/stream_handler_SUITE.erl
@@ -341,3 +341,23 @@ terminate_on_socket_close(Config) ->
%% Confirm terminate/3 is called.
receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
ok.
+
+terminate_on_stop(Config) ->
+ doc("Confirm terminate/3 is called after stop is returned."),
+ Self = self(),
+ ConnPid = gun_open(Config),
+ Ref = gun:get(ConnPid, "/long_polling", [
+ {<<"accept-encoding">>, <<"gzip">>},
+ {<<"x-test-case">>, <<"terminate_on_stop">>},
+ {<<"x-test-pid">>, pid_to_list(Self)}
+ ]),
+ %% Confirm init/3 is called and receive the response.
+ Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end,
+ {response, fin, 204, _} = gun:await(ConnPid, Ref),
+ %% Confirm the stream is still alive even though we
+ %% received the response fully, and tell it to stop.
+ Pid ! {{Pid, 1}, please_stop},
+ receive {Self, Pid, info, _, please_stop, _} -> ok after 1000 -> error(timeout) end,
+ %% Confirm terminate/3 is called.
+ receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end,
+ ok.