From 233cf43ab9c6c16d22e14039a79606fc935693d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 5 Mar 2013 17:49:58 +0100 Subject: Make streamed chunk size configurable Defaults to a maximum of 1000000 bytes. Also standardize the te_identity and te_chunked decoding functions. Now they both try to read as much as possible (up to the limit), making body reading much faster when not using chunked encoding. --- src/cowboy_req.erl | 69 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 40 insertions(+), 29 deletions(-) (limited to 'src/cowboy_req.erl') diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index c800595..93c8656 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -78,6 +78,7 @@ -export([has_body/1]). -export([body_length/1]). -export([init_stream/4]). +-export([init_stream/5]). -export([stream_body/1]). -export([skip_body/1]). -export([body/1]). @@ -152,7 +153,8 @@ meta = [] :: [{atom(), any()}], %% Request body. - body_state = waiting :: waiting | done | {stream, fun(), any(), fun()}, + body_state = waiting :: waiting | done | {stream, + non_neg_integer(), non_neg_integer(), fun(), any(), fun()}, multipart = undefined :: undefined | {non_neg_integer(), fun()}, buffer = <<>> :: binary(), @@ -587,6 +589,12 @@ body_length(Req) -> {undefined, Req2} end. +%% @equiv init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req) +-spec init_stream(fun(), any(), fun(), Req) + -> {ok, Req} when Req::req(). +init_stream(TransferDecode, TransferState, ContentDecode, Req) -> + init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req). + %% @doc Initialize body streaming and set custom decoding functions. %% %% Calling this function is optional. It should only be used if you @@ -603,10 +611,11 @@ body_length(Req) -> %% Content encoding is generally used for compression. %% %% Standard encodings can be found in cowboy_http. --spec init_stream(fun(), any(), fun(), Req) -> {ok, Req} when Req::req(). -init_stream(TransferDecode, TransferState, ContentDecode, Req) -> +-spec init_stream(non_neg_integer(), fun(), any(), fun(), Req) + -> {ok, Req} when Req::req(). +init_stream(MaxLength, TransferDecode, TransferState, ContentDecode, Req) -> {ok, Req#http_req{body_state= - {stream, TransferDecode, TransferState, ContentDecode}}}. + {stream, 0, MaxLength, TransferDecode, TransferState, ContentDecode}}}. %% @doc Stream the request's body. %% @@ -635,8 +644,9 @@ stream_body(Req=#http_req{body_state=waiting, case parse_header(<<"transfer-encoding">>, Req1) of {ok, [<<"chunked">>], Req2} -> stream_body(Req2#http_req{body_state= - {stream, fun cowboy_http:te_chunked/2, {0, 0}, - fun cowboy_http:ce_identity/1}}); + {stream, 0, 1000000, + fun cowboy_http:te_chunked/2, {0, 0}, + fun cowboy_http:ce_identity/1}}); {ok, [<<"identity">>], Req2} -> {Length, Req3} = body_length(Req2), case Length of @@ -644,24 +654,26 @@ stream_body(Req=#http_req{body_state=waiting, {done, Req3#http_req{body_state=done}}; Length -> stream_body(Req3#http_req{body_state= - {stream, fun cowboy_http:te_identity/2, {0, Length}, - fun cowboy_http:ce_identity/1}}) + {stream, Length, 1000000, + fun cowboy_http:te_identity/2, {0, Length}, + fun cowboy_http:ce_identity/1}}) end end; -stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}}) +stream_body(Req=#http_req{body_state=done}) -> + {done, Req}; +stream_body(Req=#http_req{buffer=Buffer}) when Buffer =/= <<>> -> transfer_decode(Buffer, Req#http_req{buffer= <<>>}); -stream_body(Req=#http_req{body_state={stream, _, _, _}}) -> - stream_body_recv(0, Req); -stream_body(Req=#http_req{body_state=done}) -> - {done, Req}. +stream_body(Req) -> + stream_body_recv(Req). --spec stream_body_recv(non_neg_integer(), Req) +-spec stream_body_recv(Req) -> {ok, binary(), Req} | {error, atom()} when Req::req(). -stream_body_recv(Length, Req=#http_req{ - transport=Transport, socket=Socket, buffer=Buffer}) -> +stream_body_recv(Req=#http_req{ + transport=Transport, socket=Socket, buffer=Buffer, + body_state={stream, Length, MaxLength, _, _, _}}) -> %% @todo Allow configuring the timeout. - case Transport:recv(Socket, Length, 5000) of + case Transport:recv(Socket, min(Length, MaxLength), 5000) of {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, Req#http_req{buffer= <<>>}); {error, Reason} -> {error, Reason} @@ -669,22 +681,21 @@ stream_body_recv(Length, Req=#http_req{ -spec transfer_decode(binary(), Req) -> {ok, binary(), Req} | {error, atom()} when Req::req(). -transfer_decode(Data, Req=#http_req{ - body_state={stream, TransferDecode, TransferState, ContentDecode}}) -> +transfer_decode(Data, Req=#http_req{body_state={stream, _, MaxLength, + TransferDecode, TransferState, ContentDecode}}) -> case TransferDecode(Data, TransferState) of - {ok, Data2, TransferState2} -> - content_decode(ContentDecode, Data2, Req#http_req{body_state= - {stream, TransferDecode, TransferState2, ContentDecode}}); {ok, Data2, Rest, TransferState2} -> - content_decode(ContentDecode, Data2, Req#http_req{ - buffer=Rest, body_state= - {stream, TransferDecode, TransferState2, ContentDecode}}); + content_decode(ContentDecode, Data2, + Req#http_req{buffer=Rest, body_state={stream, 0, MaxLength, + TransferDecode, TransferState2, ContentDecode}}); %% @todo {header(s) for chunked more -> - stream_body_recv(0, Req#http_req{buffer=Data}); - {more, Length, Rest, TransferState2} -> - stream_body_recv(Length, Req#http_req{buffer=Rest, body_state= - {stream, TransferDecode, TransferState2, ContentDecode}}); + stream_body_recv(Req#http_req{buffer=Data, body_state={stream, + 0, MaxLength, TransferDecode, TransferState, ContentDecode}}); + {more, Length, Data2, TransferState2} -> + content_decode(ContentDecode, Data2, + Req#http_req{body_state={stream, Length, MaxLength, + TransferDecode, TransferState2, ContentDecode}}); {done, Length, Rest} -> Req2 = transfer_decode_done(Length, Rest, Req), {done, Req2}; -- cgit v1.2.3