diff options
author | Loïc Hoguin <[email protected]> | 2018-11-09 17:42:37 +0100 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2018-11-09 17:42:37 +0100 |
commit | d7b7580b3913c17b404319cc4c153748d5e59194 (patch) | |
tree | 3026f38e4bcfdcb26d067a4e5aec6d401700c3cc /src | |
parent | 29043aa7b4d11e377bc76d453f592ea5a6df1f43 (diff) | |
download | cowboy-d7b7580b3913c17b404319cc4c153748d5e59194.tar.gz cowboy-d7b7580b3913c17b404319cc4c153748d5e59194.tar.bz2 cowboy-d7b7580b3913c17b404319cc4c153748d5e59194.zip |
Add sendfile support to cowboy_req:stream_body
It is now possible to stream one or more sendfile tuples.
For example, it is now possible to build a tar file on the
fly using the sendfile syscall for sending the files, or to
support Range requests with more than one range while still
using the sendfile syscall.
When using cowboy_compress_h, unfortunately, we have to read
the file into memory in order to compress and send it. More
options will be added at a later time to make sure users
don't read too much into memory. This is a new feature,
however, so existing code is not affected.
Also rework cowboy_http's data sending to be flatter.
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy_compress_h.erl | 29 | ||||
-rw-r--r-- | src/cowboy_http.erl | 159 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 6 | ||||
-rw-r--r-- | src/cowboy_metrics_h.erl | 2 | ||||
-rw-r--r-- | src/cowboy_req.erl | 22 | ||||
-rw-r--r-- | src/cowboy_stream.erl | 2 |
6 files changed, 128 insertions, 92 deletions
diff --git a/src/cowboy_compress_h.erl b/src/cowboy_compress_h.erl index 781cc0a..ed81db6 100644 --- a/src/cowboy_compress_h.erl +++ b/src/cowboy_compress_h.erl @@ -98,7 +98,7 @@ fold(Commands, State) -> fold([], State, Acc) -> {lists:reverse(Acc), State}; -%% We do not compress sendfile bodies. +%% We do not compress full sendfile bodies. fold([Response={response, _, _, {sendfile, _, _, _}}|Tail], State, Acc) -> fold(Tail, State, [Response|Acc]); %% We compress full responses directly, unless they are lower than @@ -171,6 +171,21 @@ gzip_headers({headers, Status, Headers0}, State) -> <<"content-encoding">> => <<"gzip">> }}, State#state{deflate=Z}}. +%% It is not possible to combine zlib and the sendfile +%% syscall as far as I can tell, because the zlib format +%% includes a checksum at the end of the stream. We have +%% to read the file in memory, making this not suitable for +%% large files. +gzip_data({data, nofin, Sendfile={sendfile, _, _, _}}, State=#state{deflate=Z}) -> + {ok, Data0} = read_file(Sendfile), + Data = zlib:deflate(Z, Data0), + {{data, nofin, Data}, State}; +gzip_data({data, fin, Sendfile={sendfile, _, _, _}}, State=#state{deflate=Z}) -> + {ok, Data0} = read_file(Sendfile), + Data = zlib:deflate(Z, Data0, finish), + zlib:deflateEnd(Z), + zlib:close(Z), + {{data, fin, Data}, State#state{deflate=undefined}}; gzip_data({data, nofin, Data0}, State=#state{deflate=Z}) -> Data = zlib:deflate(Z, Data0), {{data, nofin, Data}, State}; @@ -179,3 +194,15 @@ gzip_data({data, fin, Data0}, State=#state{deflate=Z}) -> zlib:deflateEnd(Z), zlib:close(Z), {{data, fin, Data}, State#state{deflate=undefined}}. + +read_file({sendfile, Offset, Bytes, Path}) -> + {ok, IoDevice} = file:open(Path, [read, raw, binary]), + try + _ = case Offset of + 0 -> ok; + _ -> file:position(IoDevice, {bof, Offset}) + end, + file:read(IoDevice, Bytes) + after + file:close(IoDevice) + end. 
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 9ce7aa8..c4a4e79 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -940,17 +940,18 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea %% @todo I'm pretty sure the last stream in the list is the one we want %% considering all others are queued. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), - {State, Headers} = connection(State0, Headers0, StreamID, Version), + {State1, Headers} = connection(State0, Headers0, StreamID, Version), + State = State1#state{out_state=done}, %% @todo Ensure content-length is set. Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)), case Body of - {sendfile, O, B, P} -> + {sendfile, _, _, _} -> Transport:send(Socket, Response), - commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]); + sendfile(State, Body); _ -> - Transport:send(Socket, [Response, Body]), - commands(State#state{out_state=done}, StreamID, Tail) - end; + Transport:send(Socket, [Response, Body]) + end, + commands(State, StreamID, Tail); %% Send response headers and initiate chunked encoding or streaming. commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState}, StreamID, [{headers, StatusCode, Headers0}|Tail]) -> @@ -981,53 +982,57 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))), commands(State, StreamID, Tail); %% Send a response body chunk. -%% -%% @todo WINDOW_UPDATE stuff require us to buffer some data. -%% @todo We probably want to allow Data to be the {sendfile, ...} tuple also. +%% @todo We need to kill the stream if it tries to send data before headers. 
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState}, StreamID, [{data, IsFin, Data}|Tail]) -> %% Do not send anything when the user asks to send an empty %% data frame, as that would break the protocol. - Size = iolist_size(Data), - Stream0 = lists:keyfind(StreamID, #stream.id, Streams0), - Stream = case Size of - 0 -> - %% We send the last chunk only if version is HTTP/1.1 and IsFin=fin. - case {OutState, Stream0} of - {_, #stream{method= <<"HEAD">>}} -> - ok; - {chunked, _} when IsFin =:= fin -> - Transport:send(Socket, <<"0\r\n\r\n">>); - _ -> - ok - end, + Size = case Data of + {sendfile, _, B, _} -> B; + _ -> iolist_size(Data) + end, + %% Depending on the current state we may need to send nothing, + %% the last chunk, chunked data with/without the last chunk, + %% or just the data as-is. + Stream = case lists:keyfind(StreamID, #stream.id, Streams0) of + Stream0=#stream{method= <<"HEAD">>} -> Stream0; - _ -> - %% @todo We need to kill the stream if it tries to send data before headers. - %% @todo Same as above. - case {OutState, Stream0} of - {_, #stream{method= <<"HEAD">>}} -> - Stream0; - {chunked, _} -> - Transport:send(Socket, [ - integer_to_binary(Size, 16), <<"\r\n">>, Data, - case IsFin of - fin -> <<"\r\n0\r\n\r\n">>; - nofin -> <<"\r\n">> - end - ]), - Stream0; - {streaming, #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize}} -> - SentSize = SentSize0 + Size, - if - %% undefined is > any integer value. 
- SentSize > ExpectedSize -> - terminate(State0, response_body_too_large); - true -> - Transport:send(Socket, Data), - Stream0#stream{local_sent_size=SentSize} - end - end + Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked -> + Transport:send(Socket, <<"0\r\n\r\n">>), + Stream0; + Stream0 when Size =:= 0 -> + Stream0; + Stream0 when is_tuple(Data), OutState =:= chunked -> + Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]), + sendfile(State0, Data), + Transport:send(Socket, + case IsFin of + fin -> <<"\r\n0\r\n\r\n">>; + nofin -> <<"\r\n">> + end), + Stream0; + Stream0 when OutState =:= chunked -> + Transport:send(Socket, [ + integer_to_binary(Size, 16), <<"\r\n">>, Data, + case IsFin of + fin -> <<"\r\n0\r\n\r\n">>; + nofin -> <<"\r\n">> + end + ]), + Stream0; + Stream0 when OutState =:= streaming -> + #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0, + SentSize = SentSize0 + Size, + if + %% ExpectedSize may be undefined, which is > any integer value. + SentSize > ExpectedSize -> + terminate(State0, response_body_too_large); + is_tuple(Data) -> + sendfile(State0, Data); + true -> + Transport:send(Socket, Data) + end, + Stream0#stream{local_sent_size=SentSize} end, State = case IsFin of fin -> State0#state{out_state=done}; @@ -1050,38 +1055,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s ok end, commands(State#state{out_state=done}, StreamID, Tail); -%% Send a file. -commands(State0=#state{socket=Socket, transport=Transport, opts=Opts}, StreamID, - [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> - %% @todo exit with response_body_too_large if we exceed content-length - %% We wrap the sendfile call into a try/catch because on OTP-20 - %% and earlier a few different crashes could occur for sockets - %% that were closing or closed. For example a badarg in - %% erlang:port_get_data(#Port<...>) or a badmatch like - %% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}... 
- %% - %% OTP-21 uses a NIF instead of a port so the implementation - %% and behavior has dramatically changed and it is unclear - %% whether it will be necessary in the future. - %% - %% This try/catch prevents some noisy logs to be written - %% when these errors occur. - try - %% When sendfile is disabled we explicitly use the fallback. - _ = case maps:get(sendfile, Opts, true) of - true -> Transport:sendfile(Socket, Path, Offset, Bytes); - false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) - end, - State = case IsFin of - fin -> State0#state{out_state=done} -%% @todo Add the sendfile command. -% nofin -> State0 - end, - commands(State, StreamID, Tail) - catch _:_ -> - terminate(State0, {socket_error, sendfile_crash, - 'An error occurred when using the sendfile function.'}) - end; %% Protocol takeover. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, out_state=OutState, opts=Opts, children=Children}, StreamID, @@ -1136,6 +1109,32 @@ headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) -> headers_to_list(Headers) -> maps:to_list(Headers). +%% We wrap the sendfile call into a try/catch because on OTP-20 +%% and earlier a few different crashes could occur for sockets +%% that were closing or closed. For example a badarg in +%% erlang:port_get_data(#Port<...>) or a badmatch like +%% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}... +%% +%% OTP-21 uses a NIF instead of a port so the implementation +%% and behavior has dramatically changed and it is unclear +%% whether it will be necessary in the future. +%% +%% This try/catch prevents some noisy logs to be written +%% when these errors occur. +sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts}, + {sendfile, Offset, Bytes, Path}) -> + try + %% When sendfile is disabled we explicitly use the fallback. 
+ _ = case maps:get(sendfile, Opts, true) of + true -> Transport:sendfile(Socket, Path, Offset, Bytes); + false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) + end, + ok + catch _:_ -> + terminate(State, {socket_error, sendfile_crash, + 'An error occurred when using the sendfile function.'}) + end. + %% Flush messages specific to cowboy_http before handing over the %% connection to another protocol. flush(Parent) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index d461595..fc1a3a0 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -510,12 +510,6 @@ commands(State0, StreamID, [{data, IsFin, Data}|Tail]) -> commands(State0, StreamID, [{trailers, Trailers}|Tail]) -> State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}), commands(State, StreamID, Tail); -%% Send a file. -%% @todo Add the sendfile command. -%commands(State0, Stream0=#stream{local=nofin}, -% [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> -% {State, Stream} = maybe_send_data(State0, Stream0, IsFin, {sendfile, Offset, Bytes, Path}), -% commands(State, Stream, Tail); %% Send a push promise. %% %% @todo Responses sent as a result of a push_promise request diff --git a/src/cowboy_metrics_h.erl b/src/cowboy_metrics_h.erl index f9a1363..309afde 100644 --- a/src/cowboy_metrics_h.erl +++ b/src/cowboy_metrics_h.erl @@ -243,6 +243,8 @@ fold([{headers, Status, Headers}|Tail], end, resp_start=RespStart }); +%% @todo It might be worthwhile to keep the sendfile information around, +%% especially if these frames ultimately result in a sendfile syscall. 
fold([{data, nofin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) -> fold(Tail, State#state{ resp_body_length=RespBodyLen + resp_body_length(Data) diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 46bc62a..2c3de06 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -808,21 +808,35 @@ stream_reply(Status, Headers=#{}, Req=#{pid := Pid, streamid := StreamID}) Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}}, done_replying(Req, headers). --spec stream_body(iodata(), fin | nofin, req()) -> ok. +-spec stream_body(resp_body(), fin | nofin, req()) -> ok. %% Error out if headers were not sent. %% Don't send any body for HEAD responses. stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) -> ok; %% Don't send a message if the data is empty, except for the -%% very last message with IsFin=fin. -stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> +%% very last message with IsFin=fin. When using sendfile this +%% is converted to a data tuple, however. +stream_body({sendfile, _, 0, _}, nofin, _) -> + ok; +stream_body({sendfile, _, 0, _}, IsFin=fin, + #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> + Pid ! {{Pid, StreamID}, {data, IsFin, <<>>}}, + ok; +stream_body({sendfile, O, B, P}, IsFin, + #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) + when is_integer(O), O >= 0, is_integer(B), B > 0 -> + Pid ! {{Pid, StreamID}, {data, IsFin, {sendfile, O, B, P}}}, + ok; +stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) + when not is_tuple(Data) -> case iolist_size(Data) of 0 -> ok; _ -> Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, ok end; -stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> +stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) + when not is_tuple(Data) -> Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, ok. 
diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl index 92f66ba..49d1bb2 100644 --- a/src/cowboy_stream.erl +++ b/src/cowboy_stream.erl @@ -32,7 +32,7 @@ -type commands() :: [{inform, cowboy:http_status(), cowboy:http_headers()} | resp_command() | {headers, cowboy:http_status(), cowboy:http_headers()} - | {data, fin(), iodata()} + | {data, fin(), cowboy_req:resp_body()} | {trailers, cowboy:http_headers()} | {push, binary(), binary(), binary(), inet:port_number(), binary(), binary(), cowboy:http_headers()} |