path: root/src
diff options
authorLoïc Hoguin <[email protected]>2018-11-09 17:42:37 +0100
committerLoïc Hoguin <[email protected]>2018-11-09 17:42:37 +0100
commitd7b7580b3913c17b404319cc4c153748d5e59194 (patch)
tree3026f38e4bcfdcb26d067a4e5aec6d401700c3cc /src
parent29043aa7b4d11e377bc76d453f592ea5a6df1f43 (diff)
Add sendfile support to cowboy_req:stream_body
It is now possible to stream one or more sendfile tuples. A simple example of what can now be done would be for example to build a tar file on the fly using the sendfile syscall for sending the files, or to support Range requests with more than one range with the sendfile syscall. When using cowboy_compress_h unfortunately we have to read the file in order to send it. More options will be added at a later time to make sure users don't read too much into memory. This is a new feature however so existing code is not affected. Also rework cowboy_http's data sending to be flatter.
Diffstat (limited to 'src')
6 files changed, 128 insertions, 92 deletions
diff --git a/src/cowboy_compress_h.erl b/src/cowboy_compress_h.erl
index 781cc0a..ed81db6 100644
--- a/src/cowboy_compress_h.erl
+++ b/src/cowboy_compress_h.erl
@@ -98,7 +98,7 @@ fold(Commands, State) ->
fold([], State, Acc) ->
{lists:reverse(Acc), State};
-%% We do not compress sendfile bodies.
+%% We do not compress full sendfile bodies.
fold([Response={response, _, _, {sendfile, _, _, _}}|Tail], State, Acc) ->
fold(Tail, State, [Response|Acc]);
%% We compress full responses directly, unless they are lower than
@@ -171,6 +171,21 @@ gzip_headers({headers, Status, Headers0}, State) ->
<<"content-encoding">> => <<"gzip">>
}}, State#state{deflate=Z}}.
+%% It is not possible to combine zlib and the sendfile
+%% syscall as far as I can tell, because the zlib format
+%% includes a checksum at the end of the stream. We have
+%% to read the file in memory, making this not suitable for
+%% large files.
+gzip_data({data, nofin, Sendfile={sendfile, _, _, _}}, State=#state{deflate=Z}) ->
+ {ok, Data0} = read_file(Sendfile),
+ Data = zlib:deflate(Z, Data0),
+ {{data, nofin, Data}, State};
+gzip_data({data, fin, Sendfile={sendfile, _, _, _}}, State=#state{deflate=Z}) ->
+ {ok, Data0} = read_file(Sendfile),
+ Data = zlib:deflate(Z, Data0, finish),
+ zlib:deflateEnd(Z),
+ zlib:close(Z),
+ {{data, fin, Data}, State#state{deflate=undefined}};
gzip_data({data, nofin, Data0}, State=#state{deflate=Z}) ->
Data = zlib:deflate(Z, Data0),
{{data, nofin, Data}, State};
@@ -179,3 +194,15 @@ gzip_data({data, fin, Data0}, State=#state{deflate=Z}) ->
{{data, fin, Data}, State#state{deflate=undefined}}.
+read_file({sendfile, Offset, Bytes, Path}) ->
+ {ok, IoDevice} = file:open(Path, [read, raw, binary]),
+ try
+ _ = case Offset of
+ 0 -> ok;
+ _ -> file:position(IoDevice, {bof, Offset})
+ end,
+ file:read(IoDevice, Bytes)
+ after
+ file:close(IoDevice)
+ end.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 9ce7aa8..c4a4e79 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -940,17 +940,18 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
%% @todo I'm pretty sure the last stream in the list is the one we want
%% considering all others are queued.
#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
- {State, Headers} = connection(State0, Headers0, StreamID, Version),
+ {State1, Headers} = connection(State0, Headers0, StreamID, Version),
+ State = State1#state{out_state=done},
%% @todo Ensure content-length is set.
Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
case Body of
- {sendfile, O, B, P} ->
+ {sendfile, _, _, _} ->
Transport:send(Socket, Response),
- commands(State, StreamID, [{sendfile, fin, O, B, P}|Tail]);
+ sendfile(State, Body);
_ ->
- Transport:send(Socket, [Response, Body]),
- commands(State#state{out_state=done}, StreamID, Tail)
- end;
+ Transport:send(Socket, [Response, Body])
+ end,
+ commands(State, StreamID, Tail);
%% Send response headers and initiate chunked encoding or streaming.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
StreamID, [{headers, StatusCode, Headers0}|Tail]) ->
@@ -981,53 +982,57 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out
Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
commands(State, StreamID, Tail);
%% Send a response body chunk.
-%% @todo WINDOW_UPDATE stuff require us to buffer some data.
-%% @todo We probably want to allow Data to be the {sendfile, ...} tuple also.
+%% @todo We need to kill the stream if it tries to send data before headers.
commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out_state=OutState},
StreamID, [{data, IsFin, Data}|Tail]) ->
%% Do not send anything when the user asks to send an empty
%% data frame, as that would break the protocol.
- Size = iolist_size(Data),
- Stream0 = lists:keyfind(StreamID, #stream.id, Streams0),
- Stream = case Size of
- 0 ->
- %% We send the last chunk only if version is HTTP/1.1 and IsFin=fin.
- case {OutState, Stream0} of
- {_, #stream{method= <<"HEAD">>}} ->
- ok;
- {chunked, _} when IsFin =:= fin ->
- Transport:send(Socket, <<"0\r\n\r\n">>);
- _ ->
- ok
- end,
+ Size = case Data of
+ {sendfile, _, B, _} -> B;
+ _ -> iolist_size(Data)
+ end,
+ %% Depending on the current state we may need to send nothing,
+ %% the last chunk, chunked data with/without the last chunk,
+ %% or just the data as-is.
+ Stream = case lists:keyfind(StreamID, #stream.id, Streams0) of
+ Stream0=#stream{method= <<"HEAD">>} ->
- _ ->
- %% @todo We need to kill the stream if it tries to send data before headers.
- %% @todo Same as above.
- case {OutState, Stream0} of
- {_, #stream{method= <<"HEAD">>}} ->
- Stream0;
- {chunked, _} ->
- Transport:send(Socket, [
- integer_to_binary(Size, 16), <<"\r\n">>, Data,
- case IsFin of
- fin -> <<"\r\n0\r\n\r\n">>;
- nofin -> <<"\r\n">>
- end
- ]),
- Stream0;
- {streaming, #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize}} ->
- SentSize = SentSize0 + Size,
- if
- %% undefined is > any integer value.
- SentSize > ExpectedSize ->
- terminate(State0, response_body_too_large);
- true ->
- Transport:send(Socket, Data),
- Stream0#stream{local_sent_size=SentSize}
- end
- end
+ Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
+ Transport:send(Socket, <<"0\r\n\r\n">>),
+ Stream0;
+ Stream0 when Size =:= 0 ->
+ Stream0;
+ Stream0 when is_tuple(Data), OutState =:= chunked ->
+ Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
+ sendfile(State0, Data),
+ Transport:send(Socket,
+ case IsFin of
+ fin -> <<"\r\n0\r\n\r\n">>;
+ nofin -> <<"\r\n">>
+ end),
+ Stream0;
+ Stream0 when OutState =:= chunked ->
+ Transport:send(Socket, [
+ integer_to_binary(Size, 16), <<"\r\n">>, Data,
+ case IsFin of
+ fin -> <<"\r\n0\r\n\r\n">>;
+ nofin -> <<"\r\n">>
+ end
+ ]),
+ Stream0;
+ Stream0 when OutState =:= streaming ->
+ #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0,
+ SentSize = SentSize0 + Size,
+ if
+ %% ExpectedSize may be undefined, which is > any integer value.
+ SentSize > ExpectedSize ->
+ terminate(State0, response_body_too_large);
+ is_tuple(Data) ->
+ sendfile(State0, Data);
+ true ->
+ Transport:send(Socket, Data)
+ end,
+ Stream0#stream{local_sent_size=SentSize}
State = case IsFin of
fin -> State0#state{out_state=done};
@@ -1050,38 +1055,6 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s
commands(State#state{out_state=done}, StreamID, Tail);
-%% Send a file.
-commands(State0=#state{socket=Socket, transport=Transport, opts=Opts}, StreamID,
- [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
- %% @todo exit with response_body_too_large if we exceed content-length
- %% We wrap the sendfile call into a try/catch because on OTP-20
- %% and earlier a few different crashes could occur for sockets
- %% that were closing or closed. For example a badarg in
- %% erlang:port_get_data(#Port<...>) or a badmatch like
- %% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
- %%
- %% OTP-21 uses a NIF instead of a port so the implementation
- %% and behavior has dramatically changed and it is unclear
- %% whether it will be necessary in the future.
- %%
- %% This try/catch prevents some noisy logs to be written
- %% when these errors occur.
- try
- %% When sendfile is disabled we explicitly use the fallback.
- _ = case maps:get(sendfile, Opts, true) of
- true -> Transport:sendfile(Socket, Path, Offset, Bytes);
- false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
- end,
- State = case IsFin of
- fin -> State0#state{out_state=done}
-%% @todo Add the sendfile command.
-% nofin -> State0
- end,
- commands(State, StreamID, Tail)
- catch _:_ ->
- terminate(State0, {socket_error, sendfile_crash,
- 'An error occurred when using the sendfile function.'})
- end;
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
out_state=OutState, opts=Opts, children=Children}, StreamID,
@@ -1136,6 +1109,32 @@ headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) ->
headers_to_list(Headers) ->
+%% We wrap the sendfile call into a try/catch because on OTP-20
+%% and earlier a few different crashes could occur for sockets
+%% that were closing or closed. For example a badarg in
+%% erlang:port_get_data(#Port<...>) or a badmatch like
+%% {{badmatch,{error,einval}},[{prim_file,sendfile,8,[]}...
+%% OTP-21 uses a NIF instead of a port so the implementation
+%% and behavior has dramatically changed and it is unclear
+%% whether it will be necessary in the future.
+%% This try/catch prevents some noisy logs to be written
+%% when these errors occur.
+sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts},
+ {sendfile, Offset, Bytes, Path}) ->
+ try
+ %% When sendfile is disabled we explicitly use the fallback.
+ _ = case maps:get(sendfile, Opts, true) of
+ true -> Transport:sendfile(Socket, Path, Offset, Bytes);
+ false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
+ end,
+ ok
+ catch _:_ ->
+ terminate(State, {socket_error, sendfile_crash,
+ 'An error occurred when using the sendfile function.'})
+ end.
%% Flush messages specific to cowboy_http before handing over the
%% connection to another protocol.
flush(Parent) ->
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index d461595..fc1a3a0 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -510,12 +510,6 @@ commands(State0, StreamID, [{data, IsFin, Data}|Tail]) ->
commands(State0, StreamID, [{trailers, Trailers}|Tail]) ->
State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}),
commands(State, StreamID, Tail);
-%% Send a file.
-%% @todo Add the sendfile command.
-%commands(State0, Stream0=#stream{local=nofin},
-% [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
-% {State, Stream} = maybe_send_data(State0, Stream0, IsFin, {sendfile, Offset, Bytes, Path}),
-% commands(State, Stream, Tail);
%% Send a push promise.
%% @todo Responses sent as a result of a push_promise request
diff --git a/src/cowboy_metrics_h.erl b/src/cowboy_metrics_h.erl
index f9a1363..309afde 100644
--- a/src/cowboy_metrics_h.erl
+++ b/src/cowboy_metrics_h.erl
@@ -243,6 +243,8 @@ fold([{headers, Status, Headers}|Tail],
+%% @todo It might be worthwhile to keep the sendfile information around,
+%% especially if these frames ultimately result in a sendfile syscall.
fold([{data, nofin, Data}|Tail], State=#state{resp_body_length=RespBodyLen}) ->
fold(Tail, State#state{
resp_body_length=RespBodyLen + resp_body_length(Data)
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index 46bc62a..2c3de06 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -808,21 +808,35 @@ stream_reply(Status, Headers=#{}, Req=#{pid := Pid, streamid := StreamID})
Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}},
done_replying(Req, headers).
--spec stream_body(iodata(), fin | nofin, req()) -> ok.
+-spec stream_body(resp_body(), fin | nofin, req()) -> ok.
%% Error out if headers were not sent.
%% Don't send any body for HEAD responses.
stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) ->
%% Don't send a message if the data is empty, except for the
-%% very last message with IsFin=fin.
-stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+%% very last message with IsFin=fin. When using sendfile this
+%% is converted to a data tuple, however.
+stream_body({sendfile, _, 0, _}, nofin, _) ->
+ ok;
+stream_body({sendfile, _, 0, _}, IsFin=fin,
+ #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+ Pid ! {{Pid, StreamID}, {data, IsFin, <<>>}},
+ ok;
+stream_body({sendfile, O, B, P}, IsFin,
+ #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+ when is_integer(O), O >= 0, is_integer(B), B > 0 ->
+ Pid ! {{Pid, StreamID}, {data, IsFin, {sendfile, O, B, P}}},
+ ok;
+stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+ when not is_tuple(Data) ->
case iolist_size(Data) of
0 -> ok;
_ ->
Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
-stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) ->
+stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers})
+ when not is_tuple(Data) ->
Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl
index 92f66ba..49d1bb2 100644
--- a/src/cowboy_stream.erl
+++ b/src/cowboy_stream.erl
@@ -32,7 +32,7 @@
-type commands() :: [{inform, cowboy:http_status(), cowboy:http_headers()}
| resp_command()
| {headers, cowboy:http_status(), cowboy:http_headers()}
- | {data, fin(), iodata()}
+ | {data, fin(), cowboy_req:resp_body()}
| {trailers, cowboy:http_headers()}
| {push, binary(), binary(), binary(), inet:port_number(),
binary(), binary(), cowboy:http_headers()}