From 85d05fff340198bb9af332b7fd503f7c8883e634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 29 Jan 2013 19:12:34 +0100 Subject: Fix chunked streaming of request body and improve speed --- src/cowboy_http.erl | 28 ++++++++++++++-------------- src/cowboy_req.erl | 16 ++++++++++------ test/http_SUITE.erl | 17 +++++++++++++++++ 3 files changed, 41 insertions(+), 20 deletions(-) diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index e1e1487..6f42ab9 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -804,22 +804,23 @@ qvalue(Data, Fun, Q, _M) -> %% Decoding. %% @doc Decode a stream of chunks. --spec te_chunked(binary(), {non_neg_integer(), non_neg_integer()}) - -> more | {ok, binary(), {non_neg_integer(), non_neg_integer()}} - | {ok, binary(), binary(), {non_neg_integer(), non_neg_integer()}} - | {done, non_neg_integer(), binary()} | {error, badarg}. -te_chunked(<<>>, _) -> - more; +-spec te_chunked(Bin, TransferState) + -> more | {more, non_neg_integer(), Bin, TransferState} + | {ok, Bin, TransferState} | {ok, Bin, Bin, TransferState} + | {done, non_neg_integer(), Bin} | {error, badarg} + when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}. te_chunked(<< "0\r\n\r\n", Rest/binary >>, {0, Streamed}) -> {done, Streamed, Rest}; te_chunked(Data, {0, Streamed}) -> %% @todo We are expecting an hex size, not a general token. token(Data, - fun (Rest, _) when byte_size(Rest) < 4 -> - more; - (<< "\r\n", Rest/binary >>, BinLen) -> + fun (<< "\r\n", Rest/binary >>, BinLen) -> Len = list_to_integer(binary_to_list(BinLen), 16), te_chunked(Rest, {Len, Streamed}); + %% Chunk size shouldn't take too many bytes, + %% don't try to stream forever. + (Rest, _) when byte_size(Rest) < 16 -> + more; (_, _) -> {error, badarg} end); @@ -827,13 +828,12 @@ te_chunked(Data, {ChunkRem, Streamed}) when byte_size(Data) >= ChunkRem + 2 -> << Chunk:ChunkRem/binary, "\r\n", Rest/binary >> = Data, {ok, Chunk, Rest, {0, Streamed + byte_size(Chunk)}}; te_chunked(Data, {ChunkRem, Streamed}) -> - Size = byte_size(Data), - {ok, Data, {ChunkRem - Size, Streamed + Size}}. + {more, ChunkRem + 2, Data, {ChunkRem, Streamed}}. %% @doc Decode an identity stream. --spec te_identity(binary(), {non_neg_integer(), non_neg_integer()}) - -> {ok, binary(), {non_neg_integer(), non_neg_integer()}} - | {done, binary(), non_neg_integer(), binary()}. +-spec te_identity(Bin, TransferState) + -> {ok, Bin, TransferState} | {done, Bin, non_neg_integer(), Bin} + when Bin::binary(), TransferState::{non_neg_integer(), non_neg_integer()}. te_identity(Data, {Streamed, Total}) when Streamed + byte_size(Data) < Total -> {ok, Data, {Streamed + byte_size(Data), Total}}; diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index e9d5158..4ae28e9 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -639,17 +639,18 @@ stream_body(Req=#http_req{buffer=Buffer, body_state={stream, _, _, _}}) when Buffer =/= <<>> -> transfer_decode(Buffer, Req#http_req{buffer= <<>>}); stream_body(Req=#http_req{body_state={stream, _, _, _}}) -> - stream_body_recv(Req); + stream_body_recv(0, Req); stream_body(Req=#http_req{body_state=done}) -> {done, Req}. --spec stream_body_recv(Req) +-spec stream_body_recv(non_neg_integer(), Req) -> {ok, binary(), Req} | {error, atom()} when Req::req(). -stream_body_recv(Req=#http_req{ +stream_body_recv(Length, Req=#http_req{ transport=Transport, socket=Socket, buffer=Buffer}) -> %% @todo Allow configuring the timeout. - case Transport:recv(Socket, 0, 5000) of - {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, Req); + case Transport:recv(Socket, Length, 5000) of + {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, + Req#http_req{buffer= <<>>}); {error, Reason} -> {error, Reason} end. @@ -667,7 +668,10 @@ transfer_decode(Data, Req=#http_req{ {stream, TransferDecode, TransferState2, ContentDecode}}); %% @todo {header(s) for chunked more -> - stream_body_recv(Req#http_req{buffer=Data}); + stream_body_recv(0, Req#http_req{buffer=Data}); + {more, Length, Rest, TransferState2} -> + stream_body_recv(Length, Req#http_req{buffer=Rest, body_state= + {stream, TransferDecode, TransferState2, ContentDecode}}); {done, Length, Rest} -> Req2 = transfer_decode_done(Length, Rest, Req), {done, Req2}; diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl index 25ce595..637bb0d 100644 --- a/test/http_SUITE.erl +++ b/test/http_SUITE.erl @@ -73,6 +73,7 @@ -export([stream_body_set_resp/1]). -export([stream_body_set_resp_close/1]). -export([te_chunked/1]). +-export([te_chunked_chopped/1]). -export([te_chunked_delayed/1]). -export([te_identity/1]). @@ -133,6 +134,7 @@ groups() -> stream_body_set_resp, stream_body_set_resp_close, te_chunked, + te_chunked_chopped, te_chunked_delayed, te_identity ], @@ -1037,6 +1039,21 @@ te_chunked(Config) -> {ok, 200, _, Client3} = cowboy_client:response(Client2), {ok, Body, _} = cowboy_client:response_body(Client3). +te_chunked_chopped(Config) -> + Client = ?config(client, Config), + Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])), + Body2 = iolist_to_binary(body_to_chunks(50, Body, [])), + {ok, Client2} = cowboy_client:request(<<"GET">>, + build_url("/echo/body", Config), + [{<<"transfer-encoding">>, <<"chunked">>}], Client), + {ok, Transport, Socket} = cowboy_client:transport(Client2), + _ = [begin + ok = Transport:send(Socket, << C >>), + ok = timer:sleep(10) + end || << C >> <= Body2], + {ok, 200, _, Client3} = cowboy_client:response(Client2), + {ok, Body, _} = cowboy_client:response_body(Client3). + te_chunked_delayed(Config) -> Client = ?config(client, Config), Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])), -- cgit v1.2.3