From ce1d8862c093b31a2e3ba0a072b58b697a6b55de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 2 Apr 2013 18:43:37 +0200 Subject: Replace init_stream/5 with stream_body/2 This allows us to change the max chunk length on a per chunk basis instead of for the whole stream. It's also much easier to use this way even if we don't want to change the chunk size. --- src/cowboy_req.erl | 67 +++++++++++++++++++++++++++++------------------------- 1 file changed, 36 insertions(+), 31 deletions(-) (limited to 'src/cowboy_req.erl') diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 966e463..bdebddd 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -78,8 +78,8 @@ -export([has_body/1]). -export([body_length/1]). -export([init_stream/4]). --export([init_stream/5]). -export([stream_body/1]). +-export([stream_body/2]). -export([skip_body/1]). -export([body/1]). -export([body/2]). @@ -155,8 +155,8 @@ meta = [] :: [{atom(), any()}], %% Request body. - body_state = waiting :: waiting | done | {stream, - non_neg_integer(), non_neg_integer(), fun(), any(), fun()}, + body_state = waiting :: waiting | done + | {stream, non_neg_integer(), fun(), any(), fun()}, multipart = undefined :: undefined | {non_neg_integer(), fun()}, buffer = <<>> :: binary(), @@ -591,17 +591,11 @@ body_length(Req) -> {undefined, Req2} end. -%% @equiv init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req) --spec init_stream(fun(), any(), fun(), Req) - -> {ok, Req} when Req::req(). -init_stream(TransferDecode, TransferState, ContentDecode, Req) -> - init_stream(1000000, TransferDecode, TransferState, ContentDecode, Req). - %% @doc Initialize body streaming and set custom decoding functions. %% %% Calling this function is optional. It should only be used if you %% need to override the default behavior of Cowboy. Otherwise you -%% should call stream_body/1 directly. +%% should call stream_body/{1,2} directly. %% %% Two decodings happen. First a decoding function is applied to the %% transferred data, and then another is applied to the actual content. @@ -613,27 +607,36 @@ init_stream(TransferDecode, TransferState, ContentDecode, Req) -> %% Content encoding is generally used for compression. %% %% Standard encodings can be found in cowboy_http. --spec init_stream(non_neg_integer(), fun(), any(), fun(), Req) +-spec init_stream(fun(), any(), fun(), Req) -> {ok, Req} when Req::req(). -init_stream(MaxLength, TransferDecode, TransferState, ContentDecode, Req) -> +init_stream(TransferDecode, TransferState, ContentDecode, Req) -> {ok, Req#http_req{body_state= - {stream, 0, MaxLength, TransferDecode, TransferState, ContentDecode}}}. + {stream, 0, TransferDecode, TransferState, ContentDecode}}}. + +%% @equiv stream_body(Req, 1000000) +-spec stream_body(Req) -> {ok, binary(), Req} + | {done, Req} | {error, atom()} when Req::req(). +stream_body(Req) -> + stream_body(Req, 1000000). %% @doc Stream the request's body. %% %% This is the most low level function to read the request body. %% -%% In most cases, if they weren't defined before using stream_body/4, +%% In most cases, if they weren't defined before using init_stream/4, %% this function will guess which transfer and content encodings were %% used for building the request body, and configure the decoding %% functions that will be used when streaming. %% %% It then starts streaming the body, returning {ok, Data, Req} %% for each streamed part, and {done, Req} when it's finished streaming. --spec stream_body(Req) -> {ok, binary(), Req} +%% +%% You can limit the size of the chunks being returned by using the +%% second argument which is the size in bytes. It defaults to 1000000 bytes. +-spec stream_body(Req, non_neg_integer()) -> {ok, binary(), Req} | {done, Req} | {error, atom()} when Req::req(). -stream_body(Req=#http_req{body_state=waiting, - version=Version, transport=Transport, socket=Socket}) -> +stream_body(Req=#http_req{body_state=waiting, version=Version, + transport=Transport, socket=Socket}, MaxLength) -> {ok, ExpectHeader, Req1} = parse_header(<<"expect">>, Req), case ExpectHeader of [<<"100-continue">>] -> @@ -646,9 +649,10 @@ stream_body(Req=#http_req{body_state=waiting, case parse_header(<<"transfer-encoding">>, Req1) of {ok, [<<"chunked">>], Req2} -> stream_body(Req2#http_req{body_state= - {stream, 0, 1000000, + {stream, 0, fun cowboy_http:te_chunked/2, {0, 0}, - fun cowboy_http:ce_identity/1}}); + fun cowboy_http:ce_identity/1}}, + MaxLength); {ok, [<<"identity">>], Req2} -> {Length, Req3} = body_length(Req2), case Length of @@ -656,24 +660,25 @@ stream_body(Req=#http_req{body_state=waiting, {done, Req3#http_req{body_state=done}}; Length -> stream_body(Req3#http_req{body_state= - {stream, Length, 1000000, + {stream, Length, fun cowboy_http:te_identity/2, {0, Length}, - fun cowboy_http:ce_identity/1}}) + fun cowboy_http:ce_identity/1}}, + MaxLength) end end; -stream_body(Req=#http_req{body_state=done}) -> +stream_body(Req=#http_req{body_state=done}, _) -> {done, Req}; -stream_body(Req=#http_req{buffer=Buffer}) +stream_body(Req=#http_req{buffer=Buffer}, _) when Buffer =/= <<>> -> transfer_decode(Buffer, Req#http_req{buffer= <<>>}); -stream_body(Req) -> - stream_body_recv(Req). +stream_body(Req, MaxLength) -> + stream_body_recv(Req, MaxLength). --spec stream_body_recv(Req) +-spec stream_body_recv(Req, non_neg_integer()) -> {ok, binary(), Req} | {error, atom()} when Req::req(). stream_body_recv(Req=#http_req{ transport=Transport, socket=Socket, buffer=Buffer, - body_state={stream, Length, MaxLength, _, _, _}}) -> + body_state={stream, Length, _, _, _}}, MaxLength) -> %% @todo Allow configuring the timeout. case Transport:recv(Socket, min(Length, MaxLength), 5000) of {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, @@ -683,20 +688,20 @@ stream_body_recv(Req=#http_req{ -spec transfer_decode(binary(), Req) -> {ok, binary(), Req} | {error, atom()} when Req::req(). -transfer_decode(Data, Req=#http_req{body_state={stream, _, MaxLength, +transfer_decode(Data, Req=#http_req{body_state={stream, _, TransferDecode, TransferState, ContentDecode}}) -> case TransferDecode(Data, TransferState) of {ok, Data2, Rest, TransferState2} -> content_decode(ContentDecode, Data2, - Req#http_req{buffer=Rest, body_state={stream, 0, MaxLength, + Req#http_req{buffer=Rest, body_state={stream, 0, TransferDecode, TransferState2, ContentDecode}}); %% @todo {header(s) for chunked more -> stream_body_recv(Req#http_req{buffer=Data, body_state={stream, - 0, MaxLength, TransferDecode, TransferState, ContentDecode}}); + 0, TransferDecode, TransferState, ContentDecode}}, 0); {more, Length, Data2, TransferState2} -> content_decode(ContentDecode, Data2, - Req#http_req{body_state={stream, Length, MaxLength, + Req#http_req{body_state={stream, Length, TransferDecode, TransferState2, ContentDecode}}); {done, Length, Rest} -> Req2 = transfer_decode_done(Length, Rest, Req), -- cgit v1.2.3