diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy_protocol.erl | 5 | ||||
-rw-r--r-- | src/cowboy_req.erl | 433 |
2 files changed, 252 insertions, 186 deletions
diff --git a/src/cowboy_protocol.erl b/src/cowboy_protocol.erl index 22faf1b..3ac967e 100644 --- a/src/cowboy_protocol.erl +++ b/src/cowboy_protocol.erl @@ -469,8 +469,9 @@ next_request(Req, State=#state{req_keepalive=Keepalive, timeout=Timeout}, close -> terminate(State); _ -> - Buffer = case cowboy_req:skip_body(Req) of - {ok, Req2} -> cowboy_req:get(buffer, Req2); + %% Skip the body if it is reasonably sized. Close otherwise. + Buffer = case cowboy_req:body(Req) of + {ok, _, Req2} -> cowboy_req:get(buffer, Req2); _ -> close end, %% Flush the resp_sent message before moving on. diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index eec4e88..a651515 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -49,10 +49,6 @@ %% Request body API. -export([has_body/1]). -export([body_length/1]). --export([init_stream/4]). --export([stream_body/1]). --export([stream_body/2]). --export([skip_body/1]). -export([body/1]). -export([body/2]). -export([body_qs/1]). @@ -79,6 +75,7 @@ -export([chunked_reply/3]). -export([chunk/2]). -export([upgrade_reply/3]). +-export([continue/1]). -export([maybe_reply/2]). -export([ensure_response/2]). @@ -93,6 +90,16 @@ -export([lock/1]). -export([to_list/1]). +%% Deprecated API. +-export([init_stream/4]). +-deprecated({init_stream, 4}). +-export([stream_body/1]). +-deprecated({stream_body, 1}). +-export([stream_body/2]). +-deprecated({stream_body, 2}). +-export([skip_body/1]). +-deprecated({skip_body, 1}). + -type cookie_opts() :: cow_cookie:cookie_opts(). -export_type([cookie_opts/0]). @@ -102,6 +109,14 @@ -type transfer_decode_fun() :: fun((binary(), any()) -> cow_http_te:decode_ret()). +-type body_opts() :: [{continue, boolean()} + | {length, non_neg_integer()} + | {read_length, non_neg_integer()} + | {read_timeout, timeout()} + | {transfer_decode, transfer_decode_fun(), any()} + | {content_decode, content_decode_fun()}]. +-export_type([body_opts/0]). + -type resp_body_fun() :: fun((any(), module()) -> ok). -type send_chunk_fun() :: fun((iodata()) -> ok | {error, atom()}). -type resp_chunked_fun() :: fun((send_chunk_fun()) -> ok). @@ -483,6 +498,107 @@ body_length(Req) -> {undefined, Req2} end. +-spec body(Req) + -> {ok, binary(), Req} | {more, binary(), Req} + | {error, atom()} when Req::req(). +body(Req) -> + body(Req, []). + +-spec body(Req, body_opts()) + -> {ok, binary(), Req} | {more, binary(), Req} + | {error, atom()} when Req::req(). +%% @todo This clause is kept for compatibility reasons, to be removed in 1.0. +body(MaxBodyLength, Req) when is_integer(MaxBodyLength) -> + body(Req, [{length, MaxBodyLength}]); +body(Req=#http_req{body_state=waiting}, Opts) -> + %% Send a 100 continue if needed (enabled by default). + Req1 = case lists:keyfind(continue, 1, Opts) of + {_, false} -> + Req; + _ -> + {ok, ExpectHeader, Req0} = parse_header(<<"expect">>, Req), + ok = case ExpectHeader of + [<<"100-continue">>] -> continue(Req0); + _ -> ok + end, + Req0 + end, + %% Initialize body streaming state. + CFun = case lists:keyfind(content_decode, 1, Opts) of + false -> + fun cowboy_http:ce_identity/1; + {_, CFun0} -> + CFun0 + end, + case lists:keyfind(transfer_decode, 1, Opts) of + false -> + case parse_header(<<"transfer-encoding">>, Req1) of + {ok, [<<"chunked">>], Req2} -> + body(Req2#http_req{body_state={stream, 0, + fun cow_http_te:stream_chunked/2, {0, 0}, CFun}}, Opts); + {ok, [<<"identity">>], Req2} -> + {Len, Req3} = body_length(Req2), + case Len of + 0 -> + {ok, <<>>, Req3#http_req{body_state=done}}; + _ -> + body(Req3#http_req{body_state={stream, Len, + fun cow_http_te:stream_identity/2, {0, Len}, + CFun}}, Opts) + end + end; + {_, TFun, TState} -> + body(Req1#http_req{body_state={stream, 0, + TFun, TState, CFun}}, Opts) + end; +body(Req=#http_req{body_state=done}, _) -> + {ok, <<>>, Req}; +body(Req, Opts) -> + ChunkLen = case lists:keyfind(length, 1, Opts) of + false -> 8000000; + {_, ChunkLen0} -> ChunkLen0 + end, + ReadLen = case lists:keyfind(read_length, 1, Opts) of + false -> 1000000; + {_, ReadLen0} -> ReadLen0 + end, + ReadTimeout = case lists:keyfind(read_timeout, 1, Opts) of + false -> 15000; + {_, ReadTimeout0} -> ReadTimeout0 + end, + body_loop(Req, ReadTimeout, ReadLen, ChunkLen, <<>>). + +body_loop(Req=#http_req{buffer=Buffer, body_state={stream, Length, _, _, _}}, + ReadTimeout, ReadLength, ChunkLength, Acc) -> + {Tag, Res, Req2} = case Buffer of + <<>> -> + body_recv(Req, ReadTimeout, min(Length, ReadLength)); + _ -> + body_decode(Req, ReadTimeout) + end, + case {Tag, Res} of + {ok, {ok, Data}} -> + {ok, << Acc/binary, Data/binary >>, Req2}; + {more, {ok, Data}} -> + Acc2 = << Acc/binary, Data/binary >>, + case byte_size(Acc2) >= ChunkLength of + true -> {more, Acc2, Req2}; + false -> body_loop(Req2, ReadTimeout, ReadLength, ChunkLength, Acc2) + end; + _ -> %% Error. + Res + end. + +body_recv(Req=#http_req{transport=Transport, socket=Socket, buffer=Buffer}, + ReadTimeout, ReadLength) -> + case Transport:recv(Socket, ReadLength, ReadTimeout) of + {ok, Data} -> + body_decode(Req#http_req{buffer= << Buffer/binary, Data/binary >>}, + ReadTimeout); + Error = {error, _} -> + {error, Error, Req} + end. + %% Two decodings happen. First a decoding function is applied to the %% transferred data, and then another is applied to the actual content. %% @@ -491,149 +607,92 @@ body_length(Req) -> %% also initialized through this function. %% %% Content encoding is generally used for compression. --spec init_stream(transfer_decode_fun(), any(), content_decode_fun(), Req) - -> {ok, Req} when Req::req(). -init_stream(TransferDecode, TransferState, ContentDecode, Req) -> - {ok, Req#http_req{body_state= - {stream, 0, TransferDecode, TransferState, ContentDecode}}}. - --spec stream_body(Req) -> {ok, binary(), Req} - | {done, Req} | {error, atom()} when Req::req(). -stream_body(Req) -> - stream_body(1000000, Req). - --spec stream_body(non_neg_integer(), Req) -> {ok, binary(), Req} - | {done, Req} | {error, atom()} when Req::req(). -stream_body(MaxLength, Req=#http_req{body_state=waiting, version=Version, - transport=Transport, socket=Socket}) -> - {ok, ExpectHeader, Req1} = parse_header(<<"expect">>, Req), - case ExpectHeader of - [<<"100-continue">>] -> - HTTPVer = atom_to_binary(Version, latin1), - Transport:send(Socket, - << HTTPVer/binary, " ", (status(100))/binary, "\r\n\r\n" >>); - undefined -> - ok - end, - case parse_header(<<"transfer-encoding">>, Req1) of - {ok, [<<"chunked">>], Req2} -> - stream_body(MaxLength, Req2#http_req{body_state= - {stream, 0, - fun cow_http_te:stream_chunked/2, {0, 0}, - fun cowboy_http:ce_identity/1}}); - {ok, [<<"identity">>], Req2} -> - {Length, Req3} = body_length(Req2), - case Length of - 0 -> - {done, Req3#http_req{body_state=done}}; - Length -> - stream_body(MaxLength, Req3#http_req{body_state= - {stream, Length, - fun cow_http_te:stream_identity/2, {0, Length}, - fun cowboy_http:ce_identity/1}}) - end - end; -stream_body(_, Req=#http_req{body_state=done}) -> - {done, Req}; -stream_body(_, Req=#http_req{buffer=Buffer}) - when Buffer =/= <<>> -> - transfer_decode(Buffer, Req#http_req{buffer= <<>>}); -stream_body(MaxLength, Req) -> - stream_body_recv(MaxLength, Req). - --spec stream_body_recv(non_neg_integer(), Req) - -> {ok, binary(), Req} | {error, atom()} when Req::req(). -stream_body_recv(MaxLength, Req=#http_req{ - transport=Transport, socket=Socket, buffer=Buffer, - body_state={stream, Length, _, _, _}}) -> - %% @todo Allow configuring the timeout. - case Transport:recv(Socket, min(Length, MaxLength), 5000) of - {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>, - Req#http_req{buffer= <<>>}); - {error, Reason} -> {error, Reason} - end. - +%% %% @todo Handle chunked after-the-facts headers. %% @todo Depending on the length returned we might want to 0 or +5 it. --spec transfer_decode(binary(), Req) - -> {ok, binary(), Req} | {error, atom()} when Req::req(). -transfer_decode(Data, Req=#http_req{body_state={stream, _, - TransferDecode, TransferState, ContentDecode}}) -> - case TransferDecode(Data, TransferState) of +body_decode(Req=#http_req{buffer=Data, body_state={stream, _, + TDecode, TState, CDecode}}, ReadTimeout) -> + case TDecode(Data, TState) of more -> - stream_body_recv(0, Req#http_req{buffer=Data, body_state={stream, - 0, TransferDecode, TransferState, ContentDecode}}); - {more, Data2, TransferState2} -> - content_decode(ContentDecode, Data2, - Req#http_req{body_state={stream, 0, - TransferDecode, TransferState2, ContentDecode}}); - {more, Data2, Length, TransferState2} when is_integer(Length) -> - content_decode(ContentDecode, Data2, - Req#http_req{body_state={stream, Length, - TransferDecode, TransferState2, ContentDecode}}); - {more, Data2, Rest, TransferState2} -> - content_decode(ContentDecode, Data2, - Req#http_req{buffer=Rest, body_state={stream, 0, - TransferDecode, TransferState2, ContentDecode}}); - {done, Length, Rest} -> - Req2 = transfer_decode_done(Length, Rest, Req), - {done, Req2}; - {done, Data2, Length, Rest} -> - Req2 = transfer_decode_done(Length, Rest, Req), - content_decode(ContentDecode, Data2, Req2) + body_recv(Req#http_req{body_state={stream, 0, + TDecode, TState, CDecode}}, ReadTimeout, 0); + {more, Data2, TState2} -> + {more, CDecode(Data2), Req#http_req{body_state={stream, 0, + TDecode, TState2, CDecode}, buffer= <<>>}}; + {more, Data2, Length, TState2} when is_integer(Length) -> + {more, CDecode(Data2), Req#http_req{body_state={stream, Length, + TDecode, TState2, CDecode}, buffer= <<>>}}; + {more, Data2, Rest, TState2} -> + {more, CDecode(Data2), Req#http_req{body_state={stream, 0, + TDecode, TState2, CDecode}, buffer=Rest}}; + {done, TotalLength, Rest} -> + {ok, {ok, <<>>}, body_decode_end(Req, TotalLength, Rest)}; + {done, Data2, TotalLength, Rest} -> + {ok, CDecode(Data2), body_decode_end(Req, TotalLength, Rest)} end. --spec transfer_decode_done(non_neg_integer(), binary(), Req) - -> Req when Req::req(). -transfer_decode_done(Length, Rest, Req=#http_req{ - headers=Headers, p_headers=PHeaders}) -> +body_decode_end(Req=#http_req{headers=Headers, p_headers=PHeaders}, + TotalLength, Rest) -> Headers2 = lists:keystore(<<"content-length">>, 1, Headers, - {<<"content-length">>, list_to_binary(integer_to_list(Length))}), + {<<"content-length">>, list_to_binary(integer_to_list(TotalLength))}), %% At this point we just assume TEs were all decoded. Headers3 = lists:keydelete(<<"transfer-encoding">>, 1, Headers2), PHeaders2 = lists:keystore(<<"content-length">>, 1, PHeaders, - {<<"content-length">>, Length}), + {<<"content-length">>, TotalLength}), PHeaders3 = lists:keydelete(<<"transfer-encoding">>, 1, PHeaders2), Req#http_req{buffer=Rest, body_state=done, headers=Headers3, p_headers=PHeaders3}. --spec content_decode(content_decode_fun(), binary(), Req) - -> {ok, binary(), Req} | {error, atom()} when Req::req(). -content_decode(ContentDecode, Data, Req) -> - case ContentDecode(Data) of - {ok, Data2} -> {ok, Data2, Req}; - {error, Reason} -> {error, Reason} +-spec body_qs(Req) + -> {ok, [{binary(), binary() | true}], Req} | {error, atom()} + when Req::req(). +body_qs(Req) -> + body_qs(Req, [ + {length, 64000}, + {read_length, 64000}, + {read_timeout, 5000}]). + +-spec body_qs(Req, body_opts()) -> {ok, [{binary(), binary() | true}], Req} + | {badlength, Req} | {error, atom()} when Req::req(). +%% @todo This clause is kept for compatibility reasons, to be removed in 1.0. +body_qs(MaxBodyLength, Req) when is_integer(MaxBodyLength) -> + body_qs(Req, [{length, MaxBodyLength}]); +body_qs(Req, Opts) -> + case body(Req, Opts) of + {ok, Body, Req2} -> + {ok, cow_qs:parse_qs(Body), Req2}; + {more, _, Req2} -> + {badlength, Req2}; + {error, Reason} -> + {error, Reason} end. --spec body(Req) -> {ok, binary(), Req} | {error, atom()} when Req::req(). -body(Req) -> - body(8000000, Req). +%% Deprecated body API. +%% @todo The following 4 functions will be removed in Cowboy 1.0. --spec body(non_neg_integer() | infinity, Req) - -> {ok, binary(), Req} | {error, atom()} when Req::req(). -body(MaxBodyLength, Req) -> - case parse_header(<<"transfer-encoding">>, Req) of - {ok, [<<"identity">>], Req2} -> - {ok, Length, Req3} = parse_header(<<"content-length">>, Req2, 0), - if Length > MaxBodyLength -> - {error, badlength}; - true -> - read_body(Req3, <<>>) - end; - {ok, _, _} -> - {error, chunked} - end. +-spec init_stream(transfer_decode_fun(), any(), content_decode_fun(), Req) + -> {ok, Req} when Req::req(). +init_stream(TransferDecode, TransferState, ContentDecode, Req) -> + {ok, Req#http_req{body_state= + {stream, 0, TransferDecode, TransferState, ContentDecode}}}. --spec read_body(Req, binary()) - -> {ok, binary(), Req} | {error, atom()} when Req::req(). -read_body(Req, Acc) -> - case stream_body(Req) of +-spec stream_body(Req) -> {ok, binary(), Req} + | {done, Req} | {error, atom()} when Req::req(). +stream_body(Req) -> + stream_body(1000000, Req). + +-spec stream_body(non_neg_integer(), Req) -> {ok, binary(), Req} + | {done, Req} | {error, atom()} when Req::req(). +stream_body(ChunkLength, Req) -> + case body(Req, [{length, ChunkLength}]) of + {ok, <<>>, Req2} -> + {done, Req2}; {ok, Data, Req2} -> - read_body(Req2, << Acc/binary, Data/binary >>); - {done, Req2} -> - {ok, Acc, Req2}; - {error, Reason} -> - {error, Reason} + {ok, Data, Req2}; + {more, Data, Req2} -> + {ok, Data, Req2}; + Error = {error, _} -> + Error end. -spec skip_body(Req) -> {ok, Req} | {error, atom()} when Req::req(). @@ -644,43 +703,34 @@ skip_body(Req) -> {error, Reason} -> {error, Reason} end. --spec body_qs(Req) - -> {ok, [{binary(), binary() | true}], Req} | {error, atom()} - when Req::req(). -body_qs(Req) -> - body_qs(16000, Req). - -%% Essentially a POST query string. --spec body_qs(non_neg_integer() | infinity, Req) - -> {ok, [{binary(), binary() | true}], Req} | {error, atom()} - when Req::req(). -body_qs(MaxBodyLength, Req) -> - case body(MaxBodyLength, Req) of - {ok, Body, Req2} -> - {ok, cow_qs:parse_qs(Body), Req2}; - {error, Reason} -> - {error, Reason} - end. - %% Multipart API. -spec part(Req) -> {ok, cow_multipart:headers(), Req} | {done, Req} when Req::req(). -part(Req=#http_req{multipart=undefined}) -> - part(init_multipart(Req)); part(Req) -> - {ok, Data, Req2} = stream_multipart(Req), - part(Data, Req2). + part(Req, [ + {length, 64000}, + {read_length, 64000}, + {read_timeout, 5000}]). -part(Buffer, Req=#http_req{multipart={Boundary, _}}) -> +-spec part(Req, body_opts()) + -> {ok, cow_multipart:headers(), Req} | {done, Req} + when Req::req(). +part(Req=#http_req{multipart=undefined}, Opts) -> + part(init_multipart(Req), Opts); +part(Req, Opts) -> + {Data, Req2} = stream_multipart(Req, Opts), + part(Data, Opts, Req2). + +part(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}) -> case cow_multipart:parse_headers(Buffer, Boundary) of more -> - {ok, Data, Req2} = stream_multipart(Req), - part(<< Buffer/binary, Data/binary >>, Req2); + {Data, Req2} = stream_multipart(Req, Opts), + part(<< Buffer/binary, Data/binary >>, Opts, Req2); {more, Buffer2} -> - {ok, Data, Req2} = stream_multipart(Req), - part(<< Buffer2/binary, Data/binary >>, Req2); + {Data, Req2} = stream_multipart(Req, Opts), + part(<< Buffer2/binary, Data/binary >>, Opts, Req2); {ok, Headers, Rest} -> {ok, Headers, Req#http_req{multipart={Boundary, Rest}}}; %% Ignore epilogue. @@ -692,33 +742,39 @@ part(Buffer, Req=#http_req{multipart={Boundary, _}}) -> -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). part_body(Req) -> - part_body(8000000, Req). + part_body(Req, []). --spec part_body(non_neg_integer(), Req) +-spec part_body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). -part_body(MaxLength, Req=#http_req{multipart=undefined}) -> - part_body(MaxLength, init_multipart(Req)); -part_body(MaxLength, Req) -> - part_body(<<>>, MaxLength, Req, <<>>). - -part_body(Buffer, MaxLength, Req=#http_req{multipart={Boundary, _}}, Acc) - when byte_size(Acc) > MaxLength -> - {more, Acc, Req#http_req{multipart={Boundary, Buffer}}}; -part_body(Buffer, MaxLength, Req=#http_req{multipart={Boundary, _}}, Acc) -> - {ok, Data, Req2} = stream_multipart(Req), - case cow_multipart:parse_body(<< Buffer/binary, Data/binary >>, Boundary) of - {ok, Body} -> - part_body(<<>>, MaxLength, Req2, << Acc/binary, Body/binary >>); - {ok, Body, Rest} -> - part_body(Rest, MaxLength, Req2, << Acc/binary, Body/binary >>); - done -> - {ok, Acc, Req2}; - {done, Body} -> - {ok, << Acc/binary, Body/binary >>, Req2}; - {done, Body, Rest} -> - {ok, << Acc/binary, Body/binary >>, - Req2#http_req{multipart={Boundary, Rest}}} +part_body(Req=#http_req{multipart=undefined}, Opts) -> + part_body(init_multipart(Req), Opts); +part_body(Req, Opts) -> + part_body(<<>>, Opts, Req, <<>>). + +part_body(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}, Acc) -> + ChunkLen = case lists:keyfind(length, 1, Opts) of + false -> 8000000; + {_, ChunkLen0} -> ChunkLen0 + end, + case byte_size(Acc) > ChunkLen of + true -> + {more, Acc, Req#http_req{multipart={Boundary, Buffer}}}; + false -> + {Data, Req2} = stream_multipart(Req, Opts), + case cow_multipart:parse_body(<< Buffer/binary, Data/binary >>, Boundary) of + {ok, Body} -> + part_body(<<>>, Opts, Req2, << Acc/binary, Body/binary >>); + {ok, Body, Rest} -> + part_body(Rest, Opts, Req2, << Acc/binary, Body/binary >>); + done -> + {ok, Acc, Req2}; + {done, Body} -> + {ok, << Acc/binary, Body/binary >>, Req2}; + {done, Body, Rest} -> + {ok, << Acc/binary, Body/binary >>, + Req2#http_req{multipart={Boundary, Rest}}} + end end. init_multipart(Req) -> @@ -727,10 +783,12 @@ init_multipart(Req) -> {_, Boundary} = lists:keyfind(<<"boundary">>, 1, Params), Req2#http_req{multipart={Boundary, <<>>}}. -stream_multipart(Req=#http_req{multipart={_, <<>>}}) -> - stream_body(Req); -stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}) -> - {ok, Buffer, Req#http_req{multipart={Boundary, <<>>}}}. +stream_multipart(Req=#http_req{body_state=BodyState, multipart={_, <<>>}}, Opts) -> + true = BodyState =/= done, + {_, Data, Req2} = body(Req, Opts), + {Data, Req2}; +stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}, _) -> + {Buffer, Req#http_req{multipart={Boundary, <<>>}}}. %% Response API. @@ -970,6 +1028,13 @@ upgrade_reply(Status, Headers, Req=#http_req{transport=Transport, ], <<>>, Req), {ok, Req2#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}}. +-spec continue(req()) -> ok | {error, atom()}. +continue(#http_req{socket=Socket, transport=Transport, + version=Version}) -> + HTTPVer = atom_to_binary(Version, latin1), + Transport:send(Socket, + << HTTPVer/binary, " ", (status(100))/binary, "\r\n\r\n" >>). + %% Meant to be used internally for sending errors after crashes. -spec maybe_reply(cowboy:http_status(), req()) -> ok. maybe_reply(Status, Req) -> |