diff options
-rw-r--r-- | src/cowboy_handler.erl | 19 | ||||
-rw-r--r-- | test/http_SUITE.erl | 24 | ||||
-rw-r--r-- | test/http_SUITE_data/http_loop_stream_recv.erl | 41 |
3 files changed, 82 insertions, 2 deletions
diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl index fcbfe55..e431ba8 100644 --- a/src/cowboy_handler.erl +++ b/src/cowboy_handler.erl @@ -211,7 +211,7 @@ handler_loop(Req, State=#state{loop_buffer_size=NbBytes, handler_after_loop(Req, State, Handler, HandlerState, {normal, timeout}); {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> - handler_before_loop(Req, State, Handler, HandlerState); + handler_loop(Req, State, Handler, HandlerState); Message -> %% We set the socket back to {active, false} mode in case %% the handler is going to call recv. We also flush any @@ -280,8 +280,14 @@ handler_after_loop(Req, State, Handler, HandlerState, Reason) -> -spec terminate_request(Req, #state{}, module(), any(), {normal, timeout | shutdown} | {error, atom()}) -> {ok, Req, cowboy_middleware:env()} when Req::cowboy_req:req(). -terminate_request(Req, #state{env=Env}, Handler, HandlerState, Reason) -> +terminate_request(Req, #state{env=Env, loop_timeout_ref=TRef}, + Handler, HandlerState, Reason) -> HandlerRes = handler_terminate(Req, Handler, HandlerState, Reason), + _ = case TRef of + undefined -> ignore; + TRef -> erlang:cancel_timer(TRef) + end, + flush_timeouts(), {ok, Req, [{result, HandlerRes}|Env]}. -spec handler_terminate(cowboy_req:req(), module(), any(), @@ -299,3 +305,12 @@ handler_terminate(Req, Handler, HandlerState, Reason) -> {terminate_reason, Reason} ]) end. + +-spec flush_timeouts() -> ok. +flush_timeouts() -> + receive + {timeout, TRef, ?MODULE} when is_reference(TRef) -> + flush_timeouts() + after 0 -> + ok + end. diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl index f0196ec..977cc1a 100644 --- a/test/http_SUITE.erl +++ b/test/http_SUITE.erl @@ -47,6 +47,7 @@ -export([http10_hostless/1]). -export([keepalive_max/1]). -export([keepalive_nl/1]). +-export([keepalive_stream_loop/1]). -export([multipart/1]). -export([nc_rand/1]). -export([nc_zero/1]). @@ -132,6 +133,7 @@ groups() -> http10_hostless, keepalive_max, keepalive_nl, + keepalive_stream_loop, multipart, nc_rand, nc_zero, @@ -407,6 +409,7 @@ init_dispatch(Config) -> {"/rest_expires", rest_expires, []}, {"/rest_empty_resource", rest_empty_resource, []}, {"/loop_recv", http_loop_recv, []}, + {"/loop_stream_recv", http_loop_stream_recv, []}, {"/loop_timeout", http_loop_timeout, []}, {"/", http_handler, []} ]} @@ -725,6 +728,27 @@ keepalive_nl_loop(Client, URL, N) -> ok = Transport:send(Socket, <<"\r\n">>), %% empty line keepalive_nl_loop(Client3, URL, N - 1). +keepalive_stream_loop(Config) -> + Client = ?config(client, Config), + Transport = ?config(transport, Config), + {ok, Client2} = cowboy_client:connect( + Transport, "localhost", ?config(port, Config), Client), + keepalive_stream_loop(Client2, 10). + +keepalive_stream_loop(Client, 0) -> + {error, closed} = cowboy_client:response(Client), + ok; +keepalive_stream_loop(Client, N) -> + {ok, _} = cowboy_client:raw_request("POST /loop_stream_recv HTTP/1.1\r\n" + "Host: localhost\r\n" + "Connection: keepalive\r\n" + "Transfer-Encoding: chunked\r\n\r\n", Client), + _ = [{ok, _} = cowboy_client:raw_request(<<"4\r\n",Id:32,"\r\n">>, Client) || + Id <- lists:seq(1, 250)], + {ok, _} = cowboy_client:raw_request(<<"0\r\n\r\n">>, Client), + {ok, 200, _, _} = cowboy_client:response(Client), + keepalive_stream_loop(Client, N-1). + multipart(Config) -> Client = ?config(client, Config), Body = << diff --git a/test/http_SUITE_data/http_loop_stream_recv.erl b/test/http_SUITE_data/http_loop_stream_recv.erl new file mode 100644 index 0000000..87113c6 --- /dev/null +++ b/test/http_SUITE_data/http_loop_stream_recv.erl @@ -0,0 +1,41 @@ +%% Feel free to use, reuse and abuse the code in this file. + +-module(http_loop_stream_recv). +-export([init/3]). +-export([info/3]). +-export([terminate/3]). + +init({_, http}, Req, _) -> + receive after 100 -> ok end, + self() ! stream, + {loop, Req, 1, 100}. + +info(stream, Req, Id) -> + case stream_next(Req) of + {ok, Id, Req2} -> + info(stream, Req2, Id+1); + {done, Req2} -> + {ok, Req3} = cowboy_req:reply(200, Req2), + {ok, Req3, Id} + end. + +terminate({normal, shutdown}, _, _) -> + ok. + +stream_next(Req) -> + stream_next(<<>>, Req). + +stream_next(Buffer, Req) -> + case cowboy_req:stream_body(Req) of + {ok, Packet, Req2} -> + case <<Buffer/binary, Packet/binary>> of + <<Id:32>> -> + {ok, Id, Req2}; + Buffer2 when byte_size(Buffer2) < 4 -> + stream_next(Buffer2, Req2); + _InvalidBuffer -> + {error, invalid_chunk} + end; + Other -> + Other + end. |