aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2013-03-05 17:49:58 +0100
committerLoïc Hoguin <[email protected]>2013-03-05 21:54:35 +0100
commit233cf43ab9c6c16d22e14039a79606fc935693d6 (patch)
tree22ab1570534f9ec2dc00995d797eafe97377dbf7
parent55e98f4f61b8a7da470bed5e1473c1a186cf8c1f (diff)
downloadcowboy-233cf43ab9c6c16d22e14039a79606fc935693d6.tar.gz
cowboy-233cf43ab9c6c16d22e14039a79606fc935693d6.tar.bz2
cowboy-233cf43ab9c6c16d22e14039a79606fc935693d6.zip
Make streamed chunk size configurable
Defaults to a maximum of 1000000 bytes. Also standardize the te_identity and te_chunked decoding functions. Now they both try to read as much as possible (up to the limit), making body reading much faster when not using chunked encoding.
-rw-r--r--guide/req.md3
-rw-r--r--src/cowboy_http.erl8
-rw-r--r--src/cowboy_req.erl69
3 files changed, 47 insertions, 33 deletions
diff --git a/guide/req.md b/guide/req.md
index e13d3a5..8fd854a 100644
--- a/guide/req.md
+++ b/guide/req.md
@@ -94,7 +94,8 @@ If you know the request contains a body, and that it is
of appropriate size, then you can read it directly with
either `body/1` or `body_qs/1`. Otherwise, you will want
to stream it with `stream_body/1` and `skip_body/1`, with
-the streaming process optionally initialized using `init_stream/4`.
+the streaming process optionally initialized using `init_stream/4`
+or `init_stream/5`.
Multipart request body
----------------------
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 1d19838..57dac0b 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -853,7 +853,7 @@ authorization_basic_password(<<C, Rest/binary>>, Fun, Acc) ->
%% @doc Decode a stream of chunks.
-spec te_chunked(Bin, TransferState)
-> more | {more, non_neg_integer(), Bin, TransferState}
- | {ok, Bin, TransferState} | {ok, Bin, Bin, TransferState}
+ | {ok, Bin, Bin, TransferState}
| {done, non_neg_integer(), Bin} | {error, badarg}
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) ->
@@ -879,11 +879,13 @@ te_chunked(Data, {ChunkRem, Streamed}) ->
%% @doc Decode an identity stream.
-spec te_identity(Bin, TransferState)
- -> {ok, Bin, TransferState} | {done, Bin, non_neg_integer(), Bin}
+ -> {more, non_neg_integer(), Bin, TransferState}
+ | {done, Bin, non_neg_integer(), Bin}
when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}.
te_identity(Data, {Streamed, Total})
when Streamed + byte_size(Data) < Total ->
- {ok, Data, {Streamed + byte_size(Data), Total}};
+ Streamed2 = Streamed + byte_size(Data),
+ {more, Total - Streamed2, Data, {Streamed2, Total}};
te_identity(Data, {Streamed, Total}) ->
Size = Total - Streamed,
<< Data2:Size/binary, Rest/binary >> = Data,
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index c800595..93c8656 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -78,6 +78,7 @@
-export([has_body/1]).
-export([body_length/1]).
-export([init_stream/4]).
+-export([init_stream/5]).
-export([stream_body/1]).
-export([skip_body/1]).
-export([body/1]).
@@ -152,7 +153,8 @@
meta = [] :: [{atom(), any()}],
%% Request body.
- body_state = waiting :: waiting | done | {stream, fun(), any(), fun()},
+ body_state = waiting :: waiting | done | {stream,
+ non_neg_integer(), non_neg_integer(), fun(), any(), fun()},
multipart = undefined :: undefined | {non_neg_integer(), fun()},
buffer = <<>> :: binary(),
@@ -587,6 +589,12 @@ body_length(Req) ->
{undefined, Req2}
end.
+%% @equiv init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req)
+-spec init_stream(fun(), any(), fun(), Req)
+ -> {ok, Req} when Req::req().
+init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
+ init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req).
+
%% @doc Initialize body streaming and set custom decoding functions.
%%
%% Calling this function is optional. It should only be used if you
@@ -603,10 +611,11 @@ body_length(Req) ->
%% Content encoding is generally used for compression.
%%
%% Standard encodings can be found in cowboy_http.
--spec init_stream(fun(), any(), fun(), Req) -> {ok, Req} when Req::req().
-init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
+-spec init_stream(non_neg_integer(), fun(), any(), fun(), Req)
+ -> {ok, Req} when Req::req().
+init_stream(MaxLength, TransferDecode, TransferState, ContentDecode, Req) ->
{ok, Req#http_req{body_state=
- {stream, TransferDecode, TransferState, ContentDecode}}}.
+ {stream, 0, MaxLength, TransferDecode, TransferState, ContentDecode}}}.
%% @doc Stream the request's body.
%%
@@ -635,8 +644,9 @@ stream_body(Req=#http_req{body_state=waiting,
case parse_header(<<"transfer-encoding">>, Req1) of
{ok, [<<"chunked">>], Req2} ->
stream_body(Req2#http_req{body_state=
- {stream, fun cowboy_http:te_chunked/2, {0, 0},
- fun cowboy_http:ce_identity/1}});
+ {stream, 0, 1000000,
+ fun cowboy_http:te_chunked/2, {0, 0},
+ fun cowboy_http:ce_identity/1}});
{ok, [<<"identity">>], Req2} ->
{Length, Req3} = body_length(Req2),
case Length of
@@ -644,24 +654,26 @@ stream_body(Req=#http_req{body_state=waiting,
{done, Req3#http_req{body_state=done}};
Length ->
stream_body(Req3#http_req{body_state=
- {stream, fun cowboy_http:te_identity/2, {0, Length},
- fun cowboy_http:ce_identity/1}})
+ {stream, Length, 1000000,
+ fun cowboy_http:te_identity/2, {0, Length},
+ fun cowboy_http:ce_identity/1}})
end
end;
-stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}})
+stream_body(Req=#http_req{body_state=done}) ->
+ {done, Req};
+stream_body(Req=#http_req{buffer=Buffer})
when Buffer =/= <<>> ->
transfer_decode(Buffer, Req#http_req{buffer= <<>>});
-stream_body(Req=#http_req{body_state={stream, _, _, _}}) ->
- stream_body_recv(0, Req);
-stream_body(Req=#http_req{body_state=done}) ->
- {done, Req}.
+stream_body(Req) ->
+ stream_body_recv(Req).
--spec stream_body_recv(non_neg_integer(), Req)
+-spec stream_body_recv(Req)
-> {ok, binary(), Req} | {error, atom()} when Req::req().
-stream_body_recv(Length, Req=#http_req{
- transport=Transport, socket=Socket, buffer=Buffer}) ->
+stream_body_recv(Req=#http_req{
+ transport=Transport, socket=Socket, buffer=Buffer,
+ body_state={stream, Length, MaxLength, _, _, _}}) ->
%% @todo Allow configuring the timeout.
- case Transport:recv(Socket, Length, 5000) of
+ case Transport:recv(Socket, min(Length, MaxLength), 5000) of
{ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>,
Req#http_req{buffer= <<>>});
{error, Reason} -> {error, Reason}
@@ -669,22 +681,21 @@ stream_body_recv(Length, Req=#http_req{
-spec transfer_decode(binary(), Req)
-> {ok, binary(), Req} | {error, atom()} when Req::req().
-transfer_decode(Data, Req=#http_req{
- body_state={stream, TransferDecode, TransferState, ContentDecode}}) ->
+transfer_decode(Data, Req=#http_req{body_state={stream, _, MaxLength,
+ TransferDecode, TransferState, ContentDecode}}) ->
case TransferDecode(Data, TransferState) of
- {ok, Data2, TransferState2} ->
- content_decode(ContentDecode, Data2, Req#http_req{body_state=
- {stream, TransferDecode, TransferState2, ContentDecode}});
{ok, Data2, Rest, TransferState2} ->
- content_decode(ContentDecode, Data2, Req#http_req{
- buffer=Rest, body_state=
- {stream, TransferDecode, TransferState2, ContentDecode}});
+ content_decode(ContentDecode, Data2,
+ Req#http_req{buffer=Rest, body_state={stream, 0, MaxLength,
+ TransferDecode, TransferState2, ContentDecode}});
%% @todo {header(s) for chunked
more ->
- stream_body_recv(0, Req#http_req{buffer=Data});
- {more, Length, Rest, TransferState2} ->
- stream_body_recv(Length, Req#http_req{buffer=Rest, body_state=
- {stream, TransferDecode, TransferState2, ContentDecode}});
+ stream_body_recv(Req#http_req{buffer=Data, body_state={stream,
+ 0, MaxLength, TransferDecode, TransferState, ContentDecode}});
+ {more, Length, Data2, TransferState2} ->
+ content_decode(ContentDecode, Data2,
+ Req#http_req{body_state={stream, Length, MaxLength,
+ TransferDecode, TransferState2, ContentDecode}});
{done, Length, Rest} ->
Req2 = transfer_decode_done(Length, Rest, Req),
{done, Req2};