From 233cf43ab9c6c16d22e14039a79606fc935693d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 5 Mar 2013 17:49:58 +0100 Subject: Make streamed chunk size configurable Defaults to a maximum of 1000000 bytes. Also standardize the te_identity and te_chunked decoding functions. Now they both try to read as much as possible (up to the limit), making body reading much faster when not using chunked encoding. --- guide/req.md | 3 ++- src/cowboy_http.erl | 8 ++++--- src/cowboy_req.erl | 69 +++++++++++++++++++++++++++++++---------------------- 3 files changed, 47 insertions(+), 33 deletions(-) diff --git a/guide/req.md b/guide/req.md index e13d3a5..8fd854a 100644 --- a/guide/req.md +++ b/guide/req.md @@ -94,7 +94,8 @@ If you know the request contains a body, and that it is of appropriate size, then you can read it directly with either `body/1` or `body_qs/1`. Otherwise, you will want to stream it with `stream_body/1` and `skip_body/1`, with -the streaming process optionally initialized using `init_stream/4`. +the streaming process optionally initialized using `init_stream/4` +or `init_stream/5`. Multipart request body ---------------------- diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 1d19838..57dac0b 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -853,7 +853,7 @@ authorization_basic_password(<>, Fun, Acc) -> %% @doc Decode a stream of chunks. -spec te_chunked(Bin, TransferState) -> more | {more, non_neg_integer(), Bin, TransferState} - | {ok, Bin, TransferState} | {ok, Bin, Bin, TransferState} + | {ok, Bin, Bin, TransferState} | {done, non_neg_integer(), Bin} | {error, badarg} when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}. te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) -> @@ -879,11 +879,13 @@ te_chunked(Data, {ChunkRem, Streamed}) -> %% @doc Decode an identity stream. -spec te_identity(Bin, TransferState) - -> {ok, Bin, TransferState} | {done, Bin, non_neg_integer(), Bin} + -> {more, non_neg_integer(), Bin, TransferState} + | {done, Bin, non_neg_integer(), Bin} when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}. te_identity(Data, {Streamed, Total}) when Streamed + byte_size(Data) < Total -> - {ok, Data, {Streamed + byte_size(Data), Total}}; + Streamed2 = Streamed + byte_size(Data), + {more, Total - Streamed2, Data, {Streamed2, Total}}; te_identity(Data, {Streamed, Total}) -> Size = Total - Streamed, << Data2:Size/binary, Rest/binary >> = Data, diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index c800595..93c8656 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -78,6 +78,7 @@ -export([has_body/1]). -export([body_length/1]). -export([init_stream/4]). +-export([init_stream/5]). -export([stream_body/1]). -export([skip_body/1]). -export([body/1]). @@ -152,7 +153,8 @@ meta = [] :: [{atom(), any()}], %% Request body. - body_state = waiting :: waiting | done | {stream, fun(), any(), fun()}, + body_state = waiting :: waiting | done | {stream, + non_neg_integer(), non_neg_integer(), fun(), any(), fun()}, multipart = undefined :: undefined | {non_neg_integer(), fun()}, buffer = <<>> :: binary(), @@ -587,6 +589,12 @@ body_length(Req) -> {undefined, Req2} end. +%% @equiv init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req) +-spec init_stream(fun(), any(), fun(), Req) + -> {ok, Req} when Req::req(). +init_stream(TransferDecode, TransferState, ContentDecode, Req) -> + init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req). + %% @doc Initialize body streaming and set custom decoding functions. %% %% Calling this function is optional. It should only be used if you @@ -603,10 +611,11 @@ body_length(Req) -> %% Content encoding is generally used for compression. %% %% Standard encodings can be found in cowboy_http. --spec init_stream(fun(), any(), fun(), Req) -> {ok, Req} when Req::req(). -init_stream(TransferDecode, TransferState, ContentDecode, Req) -> +-spec init_stream(non_neg_integer(), fun(), any(), fun(), Req) + -> {ok, Req} when Req::req(). +init_stream(MaxLength, TransferDecode, TransferState, ContentDecode, Req) -> {ok, Req#http_req{body_state= - {stream, TransferDecode, TransferState, ContentDecode}}}. + {stream, 0, MaxLength, TransferDecode, TransferState, ContentDecode}}}. %% @doc Stream the request's body. %% @@ -635,8 +644,9 @@ stream_body(Req=#http_req{body_state=waiting, case parse_header(<<"transfer-encoding">>, Req1) of {ok, [<<"chunked">>], Req2} -> stream_body(Req2#http_req{body_state= - {stream, fun cowboy_http:te_chunked/2, {0, 0}, - fun cowboy_http:ce_identity/1}}); + {stream, 0, 1000000, + fun cowboy_http:te_chunked/2, {0, 0}, + fun cowboy_http:ce_identity/1}}); {ok, [<<"identity">>], Req2} -> {Length, Req3} = body_length(Req2), case Length of @@ -644,24 +654,26 @@ stream_body(Req=#http_req{body_state=waiting, {done, Req3#http_req{body_state=done}}; Length -> stream_body(Req3#http_req{body_state= - {stream, fun cowboy_http:te_identity/2, {0, Length}, - fun cowboy_http:ce_identity/1}}) + {stream, Length, 1000000, + fun cowboy_http:te_identity/2, {0, Length}, + fun cowboy_http:ce_identity/1}}) end end; -stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}}) +stream_body(Req=#http_req{body_state=done}) -> + {done, Req}; +stream_body(Req=#http_req{buffer=Buffer}) when Buffer =/= <<>> -> transfer_decode(Buffer, Req#http_req{buffer= <<>>}); -stream_body(Req=#http_req{body_state={stream, _, _, _}}) -> - stream_body_recv(0, Req); -stream_body(Req=#http_req{body_state=done}) -> - {done, Req}. +stream_body(Req) -> + stream_body_recv(Req). --spec stream_body_recv(non_neg_integer(), Req) +-spec stream_body_recv(Req) -> {ok, binary(), Req} | {error, atom()} when Req::req(). -stream_body_recv(Length, Req=#http_req{ - transport=Transport, socket=Socket, buffer=Buffer}) -> +stream_body_recv(Req=#http_req{ + transport=Transport, socket=Socket, buffer=Buffer, + body_state={stream, Length, MaxLength, _, _, _}}) -> %% @todo Allow configuring the timeout. - case Transport:recv(Socket, Length, 5000) of + case Transport:recv(Socket, min(Length, MaxLength), 5000) of {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, Req#http_req{buffer= <<>>}); {error, Reason} -> {error, Reason} @@ -669,22 +681,21 @@ stream_body_recv(Length, Req=#http_req{ -spec transfer_decode(binary(), Req) -> {ok, binary(), Req} | {error, atom()} when Req::req(). -transfer_decode(Data, Req=#http_req{ - body_state={stream, TransferDecode, TransferState, ContentDecode}}) -> +transfer_decode(Data, Req=#http_req{body_state={stream, _, MaxLength, + TransferDecode, TransferState, ContentDecode}}) -> case TransferDecode(Data, TransferState) of - {ok, Data2, TransferState2} -> - content_decode(ContentDecode, Data2, Req#http_req{body_state= - {stream, TransferDecode, TransferState2, ContentDecode}}); {ok, Data2, Rest, TransferState2} -> - content_decode(ContentDecode, Data2, Req#http_req{ - buffer=Rest, body_state= - {stream, TransferDecode, TransferState2, ContentDecode}}); + content_decode(ContentDecode, Data2, + Req#http_req{buffer=Rest, body_state={stream, 0, MaxLength, + TransferDecode, TransferState2, ContentDecode}}); %% @todo {header(s) for chunked more -> - stream_body_recv(0, Req#http_req{buffer=Data}); - {more, Length, Rest, TransferState2} -> - stream_body_recv(Length, Req#http_req{buffer=Rest, body_state= - {stream, TransferDecode, TransferState2, ContentDecode}}); + stream_body_recv(Req#http_req{buffer=Data, body_state={stream, + 0, MaxLength, TransferDecode, TransferState, ContentDecode}}); + {more, Length, Data2, TransferState2} -> + content_decode(ContentDecode, Data2, + Req#http_req{body_state={stream, Length, MaxLength, + TransferDecode, TransferState2, ContentDecode}}); {done, Length, Rest} -> Req2 = transfer_decode_done(Length, Rest, Req), {done, Req2}; -- cgit v1.2.3