diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy_http.erl | 45 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 107 |
2 files changed, 116 insertions, 36 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index abaab06..bba2108 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -47,6 +47,7 @@ middlewares => [module()], proxy_header => boolean(), request_timeout => timeout(), + reset_idle_timeout_on_send => boolean(), sendfile => boolean(), shutdown_timeout => timeout(), stream_handlers => [module()], @@ -294,6 +295,14 @@ set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) -> end, State#state{timer=TimerRef}. +maybe_reset_idle_timeout(State=#state{opts=Opts}) -> + case maps:get(reset_idle_timeout_on_send, Opts, false) of + true -> + set_timeout(State, idle_timeout); + false -> + State + end. + cancel_timeout(State=#state{timer=TimerRef}) -> ok = case TimerRef of undefined -> @@ -366,6 +375,11 @@ after_parse({request, Req=#{streamid := StreamID, method := Method, cowboy:log(cowboy_stream:make_error_log(init, [StreamID, Req, Opts], Class, Exception, Stacktrace), Opts), + %% We do not reset the idle timeout on send here + %% because an error occurred in the application. While we + %% are keeping the connection open for further requests we + %% do not want to keep the connection up too long if no + %% additional requests come in. early_error(500, State0, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:init/3.'}, Req), parse(Buffer, State0) @@ -1012,19 +1026,20 @@ commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID, commands(State, StreamID, [{error_response, _, _, _}|Tail]) -> commands(State, StreamID, Tail); %% Send an informational response. -commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, +commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID, [{inform, StatusCode, Headers}|Tail]) -> %% @todo I'm pretty sure the last stream in the list is the one we want %% considering all others are queued. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), _ = case Version of 'HTTP/1.1' -> - ok = maybe_socket_error(State, Transport:send(Socket, + ok = maybe_socket_error(State0, Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))); %% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2) 'HTTP/1.0' -> ok end, + State = maybe_reset_idle_timeout(State0), commands(State, StreamID, Tail); %% Send a full response. %% @@ -1037,17 +1052,18 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea %% considering all others are queued. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), {State1, Headers} = connection(State0, Headers0, StreamID, Version), - State = State1#state{out_state=done}, + State2 = State1#state{out_state=done}, %% @todo Ensure content-length is set. 204 must never have content-length set. Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)), %% @todo 204 and 304 responses must not include a response body. (RFC7230 3.3.1, RFC7230 3.3.2) case Body of {sendfile, _, _, _} -> - ok = maybe_socket_error(State, Transport:send(Socket, Response)), - sendfile(State, Body); + ok = maybe_socket_error(State2, Transport:send(Socket, Response)), + sendfile(State2, Body); _ -> - ok = maybe_socket_error(State, Transport:send(Socket, [Response, Body])) + ok = maybe_socket_error(State2, Transport:send(Socket, [Response, Body])) end, + State = maybe_reset_idle_timeout(State2), commands(State, StreamID, Tail); %% Send response headers and initiate chunked encoding or streaming. commands(State0=#state{socket=Socket, transport=Transport, @@ -1084,9 +1100,10 @@ commands(State0=#state{socket=Socket, transport=Transport, trailers -> Headers1; _ -> maps:remove(<<"trailer">>, Headers1) end, - {State, Headers} = connection(State1, Headers2, StreamID, Version), - ok = maybe_socket_error(State, Transport:send(Socket, + {State2, Headers} = connection(State1, Headers2, StreamID, Version), + ok = maybe_socket_error(State2, Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))), + State = maybe_reset_idle_timeout(State2), commands(State, StreamID, Tail); %% Send a response body chunk. %% @todo We need to kill the stream if it tries to send data before headers. @@ -1147,17 +1164,18 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out end, Stream0#stream{local_sent_size=SentSize} end, - State = case IsFin of + State1 = case IsFin of fin -> State0#state{out_state=done}; nofin -> State0 end, + State = maybe_reset_idle_timeout(State1), Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream), commands(State#state{streams=Streams}, StreamID, Tail); -commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState}, +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState}, StreamID, [{trailers, Trailers}|Tail]) -> case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of trailers -> - ok = maybe_socket_error(State, + ok = maybe_socket_error(State0, Transport:send(Socket, [ <<"0\r\n">>, cow_http:headers(maps:to_list(Trailers)), @@ -1165,12 +1183,13 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s ]) ); no_trailers -> - ok = maybe_socket_error(State, + ok = maybe_socket_error(State0, Transport:send(Socket, <<"0\r\n\r\n">>)); not_chunked -> ok end, - commands(State#state{out_state=done}, StreamID, Tail); + State = maybe_reset_idle_timeout(State0#state{out_state=done}), + commands(State, StreamID, Tail); %% Protocol takeover. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 04935fb..59fd76b 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -57,6 +57,7 @@ middlewares => [module()], preface_timeout => timeout(), proxy_header => boolean(), + reset_idle_timeout_on_send => boolean(), sendfile => boolean(), settings_timeout => timeout(), shutdown_timeout => timeout(), @@ -318,6 +319,14 @@ set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) -> end, State#state{timer=TimerRef}. +maybe_reset_idle_timeout(State=#state{opts=Opts}) -> + case maps:get(reset_idle_timeout_on_send, Opts, false) of + true -> + set_idle_timeout(State); + false -> + State + end. + %% HTTP/2 protocol parsing. parse(State=#state{http2_status=sequence}, Data) -> @@ -394,10 +403,11 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> goaway(State#state{http2_machine=HTTP2Machine}, GoAway); {send, SendData, HTTP2Machine} -> %% We may need to send an alarm for each of the streams sending data. - lists:foldl( + State1 = lists:foldl( fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end, send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData, []), - SendData); + SendData), + maybe_reset_idle_timeout(State1); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> reset_stream(State#state{http2_machine=HTTP2Machine}, StreamID, {stream_error, Reason, Human}); @@ -409,6 +419,9 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> %% if we were still waiting for a SETTINGS frame. maybe_ack(State=#state{http2_status=settings}, Frame) -> maybe_ack(State#state{http2_status=connected}, Frame); +%% We do not reset the idle timeout on send here because we are +%% sending data as a consequence of receiving data, which means +%% we already resetted the idle timeout. maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> case Frame of {settings, _} -> @@ -419,7 +432,7 @@ maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> end, State. -data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) -> +data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFin, Data) -> case Streams of #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of @@ -428,11 +441,26 @@ data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin %% We may receive more data than we requested. We ensure %% that the flow value doesn't go lower than 0. Size = byte_size(Data), - State = update_window(State0#state{flow=max(0, Flow - Size), + Flow = max(0, Flow0 - Size), + %% We would normally update the window when changing the flow + %% value. But because we are running commands, which themselves + %% may update the window, and we want to avoid updating the + %% window twice in a row, we first run the commands and then + %% only update the window a flow command was executed. We know + %% that it was because the flow value changed in the state. + State1 = State0#state{flow=Flow, streams=Streams#{StreamID => Stream#stream{ flow=max(0, StreamFlow - Size), state=StreamState}}}, - StreamID), - commands(State, StreamID, Commands) + State = commands(State1, StreamID, Commands), + case State of + %% No flow command was executed. We must update the window + %% because we changed the flow value earlier. + #state{flow=Flow} -> + update_window(State, StreamID); + %% Otherwise the window was updated already. + _ -> + State + end catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], @@ -686,23 +714,37 @@ commands(State=#state{http2_machine=HTTP2Machine}, StreamID, end; %% Send an informational response. commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamID, idle, StatusCode, Headers), + State1 = send_headers(State0, StreamID, idle, StatusCode, Headers), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send response headers. commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) -> - State = send_response(State0, StreamID, StatusCode, Headers, Body), + State1 = send_response(State0, StreamID, StatusCode, Headers, Body), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send response headers. commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamID, nofin, StatusCode, Headers), + State1 = send_headers(State0, StreamID, nofin, StatusCode, Headers), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send a response body chunk. commands(State0, StreamID, [{data, IsFin, Data}|Tail]) -> - State = maybe_send_data(State0, StreamID, IsFin, Data, []), + State = case maybe_send_data(State0, StreamID, IsFin, Data, []) of + {data_sent, State1} -> + maybe_reset_idle_timeout(State1); + {no_data_sent, State1} -> + State1 + end, commands(State, StreamID, Tail); %% Send trailers. commands(State0, StreamID, [{trailers, Trailers}|Tail]) -> - State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}, []), + State = case maybe_send_data(State0, StreamID, fin, + {trailers, maps:to_list(Trailers)}, []) of + {data_sent, State1} -> + maybe_reset_idle_timeout(State1); + {no_data_sent, State1} -> + State1 + end, commands(State, StreamID, Tail); %% Send a push promise. %% @@ -737,7 +779,8 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State1 = State0#state{http2_machine=HTTP2Machine}, ok = maybe_socket_error(State1, Transport:send(Socket, cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock))), - headers_frame(State1, PromisedStreamID, fin, Headers, PseudoHeaders, 0); + State2 = maybe_reset_idle_timeout(State1), + headers_frame(State2, PromisedStreamID, fin, Headers, PseudoHeaders, 0); {error, no_push} -> State0 end, @@ -760,6 +803,9 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) -> %% @todo Only reset when the stream still exists. reset_stream(State, StreamID, Error); %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself. +%% +%% We do not need to reset the idle timeout on send because it +%% hasn't been set yet. This is called from init/12. commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> %% @todo This 101 response needs to be passed through stream handlers. @@ -798,10 +844,12 @@ update_window(State0=#state{socket=Socket, transport=Transport, end, State = State0#state{http2_machine=HTTP2Machine}, case {Data1, Data2} of - {<<>>, <<>>} -> ok; - _ -> ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2])) - end, - State. + {<<>>, <<>>} -> + State; + _ -> + ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2])), + maybe_reset_idle_timeout(State) + end. %% Send the response, trailers or data. @@ -821,8 +869,9 @@ send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode, = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, nofin, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), - maybe_send_data(State0#state{http2_machine=HTTP2Machine}, StreamID, fin, Body, - [cow_http2:headers(StreamID, nofin, HeaderBlock)]) + {_, State} = maybe_send_data(State0#state{http2_machine=HTTP2Machine}, + StreamID, fin, Body, [cow_http2:headers(StreamID, nofin, HeaderBlock)]), + State end. send_headers(State0=#state{socket=Socket, transport=Transport, @@ -854,11 +903,15 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, State1 = State0#state{http2_machine=HTTP2Machine}, %% If we have prefix data (like a HEADERS frame) we need to send it %% even if we do not send any DATA frames. - case Prefix of - [] -> ok; - _ -> ok = maybe_socket_error(State1, Transport:send(Socket, Prefix)) + WasDataSent = case Prefix of + [] -> + no_data_sent; + _ -> + ok = maybe_socket_error(State1, Transport:send(Socket, Prefix)), + data_sent end, - maybe_send_data_alarm(State1, HTTP2Machine0, StreamID); + State = maybe_send_data_alarm(State1, HTTP2Machine0, StreamID), + {WasDataSent, State}; {send, SendData, HTTP2Machine} -> State = #state{http2_status=Status, streams=Streams} = send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix), @@ -867,7 +920,7 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); true -> - maybe_send_data_alarm(State, HTTP2Machine0, StreamID) + {data_sent, maybe_send_data_alarm(State, HTTP2Machine0, StreamID)} end end. @@ -983,6 +1036,10 @@ stream_alarm(State, StreamID, Name, Value) -> %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. +%% +%% We do not reset the idle timeout on send here. We already +%% disabled it if we initiated shutdown; and we already reset +%% it if the client sent a GOAWAY frame. goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) when Status =:= connected; Status =:= closing_initiated; Status =:= closing -> @@ -1159,6 +1216,10 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> end. %% @todo Don't send an RST_STREAM if one was already sent. +%% +%% When resetting the stream we are technically sending data +%% on the socket. However due to implementation complexities +%% we do not attempt to reset the idle timeout on send. reset_stream(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, Error) -> Reason = case Error of |