diff options
-rw-r--r-- | src/cowboy_http.erl | 42 | ||||
-rw-r--r-- | test/handlers/stream_handler_h.erl | 7 | ||||
-rw-r--r-- | test/stream_handler_SUITE.erl | 20 |
3 files changed, 50 insertions, 19 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 0be949e..bfbd290 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -98,7 +98,7 @@ out_streamid = 1 :: pos_integer(), %% Whether we finished writing data for the current stream. - out_state = wait :: wait | headers | chunked | done, + out_state = wait :: wait | chunked | done, %% The connection will be closed after this stream. last_streamid = undefined :: pos_integer(), @@ -809,12 +809,10 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea case Body of {sendfile, O, B, P} -> Transport:send(Socket, Response), - commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]); + commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]); _ -> Transport:send(Socket, [Response, Body]), - %% @todo If max number of requests, close connection. - %% @todo If IsFin, maybe skip body of current request. - maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin) + commands(State#state{out_state=done}, StreamID, Tail) end; %% Send response headers and initiate chunked encoding. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, @@ -836,7 +834,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str %% %% @todo WINDOW_UPDATE stuff require us to buffer some data. %% @todo We probably want to allow Data to be the {sendfile, ...} tuple also. -commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, [{data, IsFin, Data}|Tail]) -> %% Do not send anything when the user asks to send an empty %% data frame, as that would break the protocol. @@ -853,12 +851,20 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, Stre Transport:send(Socket, Data) end end, - maybe_terminate(State, StreamID, Tail, IsFin); + State = case IsFin of + fin -> State0#state{out_state=done}; + nofin -> State0 + end, + commands(State, StreamID, Tail); %% Send a file. -commands(State=#state{socket=Socket, transport=Transport}, StreamID, +commands(State0=#state{socket=Socket, transport=Transport}, StreamID, [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> Transport:sendfile(Socket, Path, Offset, Bytes), - maybe_terminate(State, StreamID, Tail, IsFin); + State = case IsFin of + fin -> State0#state{out_state=done}; + nofin -> State0 + end, + commands(State, StreamID, Tail); %% Protocol takeover. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, opts=Opts, children=Children}, StreamID, @@ -886,11 +892,13 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor %% Stream shutdown. commands(State, StreamID, [stop|Tail]) -> %% @todo Do we want to run the commands after a stop? -% commands(stream_terminate(State, StreamID, stop), StreamID, Tail). - - %% @todo I think that's where we need to terminate streams. - - maybe_terminate(State, StreamID, Tail, fin); + %% @todo We currently wait for the stop command before we + %% continue with the next request/response. In theory, if + %% the request body was read fully and the response body + %% was sent fully we should be able to start working on + %% the next request concurrently. This can be done as a + %% future optimization. + maybe_terminate(State, StreamID, Tail); %% HTTP/1.1 does not support push; ignore. commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) -> commands(State, StreamID, Tail). @@ -905,12 +913,10 @@ headers_to_list(Headers) -> flush() -> receive _ -> flush() after 0 -> ok end. -maybe_terminate(State, StreamID, Tail, nofin) -> - commands(State, StreamID, Tail); %% @todo In these cases I'm not sure if we should continue processing commands. -maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) -> +maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail) -> terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok? -maybe_terminate(State, StreamID, _Tail, fin) -> +maybe_terminate(State, StreamID, _Tail) -> stream_terminate(State, StreamID, normal). stream_reset(State, StreamID, StreamError={internal_error, _, _}) -> diff --git a/test/handlers/stream_handler_h.erl b/test/handlers/stream_handler_h.erl index 74fc478..23d6b15 100644 --- a/test/handlers/stream_handler_h.erl +++ b/test/handlers/stream_handler_h.erl @@ -43,6 +43,8 @@ init_commands(_, _, State=#state{test=shutdown_timeout_on_stream_stop}) -> init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) -> Spawn = init_process(true, State), [{headers, 200, #{}}, {spawn, Spawn, 2000}]; +init_commands(_, _, State=#state{test=terminate_on_stop}) -> + [{response, 204, #{}, <<>>}]; init_commands(_, _, _) -> [{headers, 200, #{}}]. @@ -72,7 +74,10 @@ info(_, crash, #state{test=crash_in_info}) -> error(crash); info(StreamID, Info, State=#state{pid=Pid}) -> Pid ! {Pid, self(), info, StreamID, Info, State}, - {[], State}. + case Info of + please_stop -> {[stop], State}; + _ -> {[], State} + end. terminate(StreamID, Reason, State=#state{pid=Pid, test=crash_in_terminate}) -> Pid ! {Pid, self(), terminate, StreamID, Reason, State}, diff --git a/test/stream_handler_SUITE.erl b/test/stream_handler_SUITE.erl index 632adff..594e025 100644 --- a/test/stream_handler_SUITE.erl +++ b/test/stream_handler_SUITE.erl @@ -341,3 +341,23 @@ terminate_on_socket_close(Config) -> %% Confirm terminate/3 is called. receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, ok. + +terminate_on_stop(Config) -> + doc("Confirm terminate/3 is called after stop is returned."), + Self = self(), + ConnPid = gun_open(Config), + Ref = gun:get(ConnPid, "/long_polling", [ + {<<"accept-encoding">>, <<"gzip">>}, + {<<"x-test-case">>, <<"terminate_on_stop">>}, + {<<"x-test-pid">>, pid_to_list(Self)} + ]), + %% Confirm init/3 is called and receive the response. + Pid = receive {Self, P, init, _, _, _} -> P after 1000 -> error(timeout) end, + {response, fin, 204, _} = gun:await(ConnPid, Ref), + %% Confirm the stream is still alive even though we + %% received the response fully, and tell it to stop. + Pid ! {{Pid, 1}, please_stop}, + receive {Self, Pid, info, _, please_stop, _} -> ok after 1000 -> error(timeout) end, + %% Confirm terminate/3 is called. + receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, + ok. |