From 85d05fff340198bb9af332b7fd503f7c8883e634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 29 Jan 2013 19:12:34 +0100 Subject: Fix chunked streaming of request body and improve speed --- src/cowboy_http.erl | 28 ++++++++++++++-------------- src/cowboy_req.erl | 16 ++++++++++------ 2 files changed, 24 insertions(+), 20 deletions(-) (limited to 'src') diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index e1e1487..6f42ab9 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -804,22 +804,23 @@ qvalue(Data, Fun, Q, _M) -> %% Decoding. %% @doc Decode a stream of chunks. --spec te_chunked(binary(), {non_neg_integer(), non_neg_integer()}) - -> more | {ok, binary(), {non_neg_integer(), non_neg_integer()}} - | {ok, binary(), binary(), {non_neg_integer(), non_neg_integer()}} - | {done, non_neg_integer(), binary()} | {error, badarg}. -te_chunked(<<>>, _) -> - more; +-spec te_chunked(Bin, TransferState) + -> more | {more, non_neg_integer(), Bin, TransferState} + | {ok, Bin, TransferState} | {ok, Bin, Bin, TransferState} + | {done, non_neg_integer(), Bin} | {error, badarg} + when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}. te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) -> {done, Streamed, Rest}; te_chunked(Data, {0, Streamed}) -> %% @todo We are expecting an hex size, not a general token. token(Data, - fun (Rest, _) when byte_size(Rest) < 4 -> - more; - (<< "\r\n", Rest/binary >>, BinLen) -> + fun (<< "\r\n", Rest/binary >>, BinLen) -> Len = list_to_integer(binary_to_list(BinLen), 16), te_chunked(Rest, {Len, Streamed}); + %% Chunk size shouldn't take too many bytes, + %% don't try to stream forever. + (Rest, _) when byte_size(Rest) < 16 -> + more; (_, _) -> {error, badarg} end); @@ -827,13 +828,12 @@ te_chunked(Data, {ChunkRem, Streamed}) when byte_size(Data) >= ChunkRem + 2 -> << Chunk:ChunkRem/binary, "\r\n", Rest/binary >> = Data, {ok, Chunk, Rest, {0, Streamed + byte_size(Chunk)}}; te_chunked(Data, {ChunkRem, Streamed}) -> - Size = byte_size(Data), - {ok, Data, {ChunkRem - Size, Streamed + Size}}. + {more, ChunkRem + 2, Data, {ChunkRem, Streamed}}. %% @doc Decode an identity stream. --spec te_identity(binary(), {non_neg_integer(), non_neg_integer()}) - -> {ok, binary(), {non_neg_integer(), non_neg_integer()}} - | {done, binary(), non_neg_integer(), binary()}. +-spec te_identity(Bin, TransferState) + -> {ok, Bin, TransferState} | {done, Bin, non_neg_integer(), Bin} + when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}. te_identity(Data, {Streamed, Total}) when Streamed + byte_size(Data) < Total -> {ok, Data, {Streamed + byte_size(Data), Total}}; diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index e9d5158..4ae28e9 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -639,17 +639,18 @@ stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}}) when Buffer =/= <<>> -> transfer_decode(Buffer, Req#http_req{buffer= <<>>}); stream_body(Req=#http_req{body_state={stream, _, _, _}}) -> - stream_body_recv(Req); + stream_body_recv(0, Req); stream_body(Req=#http_req{body_state=done}) -> {done, Req}. --spec stream_body_recv(Req) +-spec stream_body_recv(non_neg_integer(), Req) -> {ok, binary(), Req} | {error, atom()} when Req::req(). -stream_body_recv(Req=#http_req{ +stream_body_recv(Length, Req=#http_req{ transport=Transport, socket=Socket, buffer=Buffer}) -> %% @todo Allow configuring the timeout. - case Transport:recv(Socket, 0, 5000) of - {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, Req); + case Transport:recv(Socket, Length, 5000) of + {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, + Req#http_req{buffer= <<>>}); {error, Reason} -> {error, Reason} end. @@ -667,7 +668,10 @@ transfer_decode(Data, Req=#http_req{ {stream, TransferDecode, TransferState2, ContentDecode}}); %% @todo {header(s) for chunked more -> - stream_body_recv(Req#http_req{buffer=Data}); + stream_body_recv(0, Req#http_req{buffer=Data}); + {more, Length, Rest, TransferState2} -> + stream_body_recv(Length, Req#http_req{buffer=Rest, body_state= + {stream, TransferDecode, TransferState2, ContentDecode}}); {done, Length, Rest} -> Req2 = transfer_decode_done(Length, Rest, Req), {done, Req2}; -- cgit v1.2.3