@@ -469,8 +469,9 @@ next_request(Req, State=#state{req_keepalive=Keepalive, timeout=Timeout},
close ->
_ ->
- Buffer = case cowboy_req:skip_body(Req) of
- {ok, Req2} -> cowboy_req:get(buffer, Req2);
+ %% Skip the body if it is reasonably sized. Close otherwise.
+ Buffer = case cowboy_req:body(Req) of
+ {ok, _, Req2} -> cowboy_req:get(buffer, Req2);
_ -> close
%% Flush the resp_sent message before moving on.
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index eec4e88..a651515 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -49,10 +49,6 @@
%% Request body API.
@@ -79,6 +75,7 @@
@@ -93,6 +90,16 @@
+%% Deprecated API.
+-deprecated({init_stream, 4}).
+-deprecated({stream_body, 1}).
+-deprecated({stream_body, 2}).
+-deprecated({skip_body, 1}).
-type cookie_opts() :: cow_cookie:cookie_opts().
@@ -102,6 +109,14 @@
-type transfer_decode_fun() :: fun((binary(), any())
-> cow_http_te:decode_ret()).
+-type body_opts() :: [{continue, boolean()}
+ | {length, non_neg_integer()}
+ | {read_length, non_neg_integer()}
+ | {read_timeout, timeout()}
+ | {transfer_decode, transfer_decode_fun(), any()}
+ | {content_decode, content_decode_fun()}].
-type resp_body_fun() :: fun((any(), module()) -> ok).
-type send_chunk_fun() :: fun((iodata()) -> ok | {error, atom()}).
-type resp_chunked_fun() :: fun((send_chunk_fun()) -> ok).
@@ -483,6 +498,107 @@ body_length(Req) ->
{undefined, Req2}
+-spec body(Req)
+ -> {ok, binary(), Req} | {more, binary(), Req}
+ | {error, atom()} when Req::req().
+body(Req) ->
+ body(Req, []).
+-spec body(Req, body_opts())
+ -> {ok, binary(), Req} | {more, binary(), Req}
+ | {error, atom()} when Req::req().
+%% @todo This clause is kept for compatibility reasons, to be removed in 1.0.
+body(MaxBodyLength, Req) when is_integer(MaxBodyLength) ->
+ body(Req, [{length, MaxBodyLength}]);
+body(Req=#http_req{body_state=waiting}, Opts) ->
+ %% Send a 100 continue if needed (enabled by default).
+ Req1 = case lists:keyfind(continue, 1, Opts) of
+ {_, false} ->
+ Req;
+ _ ->
+ {ok, ExpectHeader, Req0} = parse_header(<<"expect">>, Req),
+ ok = case ExpectHeader of
+ [<<"100-continue">>] -> continue(Req0);
+ _ -> ok
+ end,
+ Req0
+ end,
+ %% Initialize body streaming state.
+ CFun = case lists:keyfind(content_decode, 1, Opts) of
+ false ->
+ fun cowboy_http:ce_identity/1;
+ {_, CFun0} ->
+ CFun0
+ end,
+ case lists:keyfind(transfer_decode, 1, Opts) of
+ false ->
+ case parse_header(<<"transfer-encoding">>, Req1) of
+ {ok, [<<"chunked">>], Req2} ->
+ body(Req2#http_req{body_state={stream, 0,
+ fun cow_http_te:stream_chunked/2, {0, 0}, CFun}}, Opts);
+ {ok, [<<"identity">>], Req2} ->
+ {Len, Req3} = body_length(Req2),
+ case Len of
+ 0 ->
+ {ok, <<>>, Req3#http_req{body_state=done}};
+ _ ->
+ body(Req3#http_req{body_state={stream, Len,
+ fun cow_http_te:stream_identity/2, {0, Len},
+ CFun}}, Opts)
+ end
+ end;
+ {_, TFun, TState} ->
+ body(Req1#http_req{body_state={stream, 0,
+ TFun, TState, CFun}}, Opts)
+ end;
+body(Req=#http_req{body_state=done}, _) ->
+ {ok, <<>>, Req};
+body(Req, Opts) ->
+ ChunkLen = case lists:keyfind(length, 1, Opts) of
+ false -> 8000000;
+ {_, ChunkLen0} -> ChunkLen0
+ end,
+ ReadLen = case lists:keyfind(read_length, 1, Opts) of
+ false -> 1000000;
+ {_, ReadLen0} -> ReadLen0
+ end,
+ ReadTimeout = case lists:keyfind(read_timeout, 1, Opts) of
+ false -> 15000;
+ {_, ReadTimeout0} -> ReadTimeout0
+ end,
+ body_loop(Req, ReadTimeout, ReadLen, ChunkLen, <<>>).
+body_loop(Req=#http_req{buffer=Buffer, body_state={stream, Length, _, _, _}},
+ ReadTimeout, ReadLength, ChunkLength, Acc) ->
+ {Tag, Res, Req2} = case Buffer of
+ <<>> ->
+ body_recv(Req, ReadTimeout, min(Length, ReadLength));
+ _ ->
+ body_decode(Req, ReadTimeout)
+ end,
+ case {Tag, Res} of
+ {ok, {ok, Data}} ->
+ {ok, << Acc/binary, Data/binary >>, Req2};
+ {more, {ok, Data}} ->
+ Acc2 = << Acc/binary, Data/binary >>,
+ case byte_size(Acc2) >= ChunkLength of
+ true -> {more, Acc2, Req2};
+ false -> body_loop(Req2, ReadTimeout, ReadLength, ChunkLength, Acc2)
+ end;
+ _ -> %% Error.
+ Res
+ end.
+body_recv(Req=#http_req{transport=Transport, socket=Socket, buffer=Buffer},
+ ReadTimeout, ReadLength) ->
+ case Transport:recv(Socket, ReadLength, ReadTimeout) of
+ {ok, Data} ->
+ body_decode(Req#http_req{buffer= << Buffer/binary, Data/binary >>},
+ ReadTimeout);
+ Error = {error, _} ->
+ {error, Error, Req}
+ end.
%% Two decodings happen. First a decoding function is applied to the
%% transferred data, and then another is applied to the actual content.
@@ -491,149 +607,92 @@ body_length(Req) ->
%% also initialized through this function.
%% Content encoding is generally used for compression.
--spec init_stream(transfer_decode_fun(), any(), content_decode_fun(), Req)
- -> {ok, Req} when Req::req().
-init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
- {ok, Req#http_req{body_state=
- {stream, 0, TransferDecode, TransferState, ContentDecode}}}.
--spec stream_body(Req) -> {ok, binary(), Req}
- | {done, Req} | {error, atom()} when Req::req().
-stream_body(Req) ->
- stream_body(1000000, Req).
--spec stream_body(non_neg_integer(), Req) -> {ok, binary(), Req}
- | {done, Req} | {error, atom()} when Req::req().
-stream_body(MaxLength, Req=#http_req{body_state=waiting, version=Version,
- transport=Transport, socket=Socket}) ->
- {ok, ExpectHeader, Req1} = parse_header(<<"expect">>, Req),
- case ExpectHeader of
- [<<"100-continue">>] ->
- HTTPVer = atom_to_binary(Version, latin1),
- Transport:send(Socket,
- << HTTPVer/binary, " ", (status(100))/binary, "\r\n\r\n" >>);
- undefined ->
- ok
- end,
- case parse_header(<<"transfer-encoding">>, Req1) of
- {ok, [<<"chunked">>], Req2} ->
- stream_body(MaxLength, Req2#http_req{body_state=
- {stream, 0,
- fun cow_http_te:stream_chunked/2, {0, 0},
- fun cowboy_http:ce_identity/1}});
- {ok, [<<"identity">>], Req2} ->
- {Length, Req3} = body_length(Req2),
- case Length of
- 0 ->
- {done, Req3#http_req{body_state=done}};
- Length ->
- stream_body(MaxLength, Req3#http_req{body_state=
- {stream, Length,
- fun cow_http_te:stream_identity/2, {0, Length},
- fun cowboy_http:ce_identity/1}})
- end
- end;
-stream_body(_, Req=#http_req{body_state=done}) ->
- {done, Req};
-stream_body(_, Req=#http_req{buffer=Buffer})
- when Buffer =/= <<>> ->
- transfer_decode(Buffer, Req#http_req{buffer= <<>>});
-stream_body(MaxLength, Req) ->
- stream_body_recv(MaxLength, Req).
--spec stream_body_recv(non_neg_integer(), Req)
- -> {ok, binary(), Req} | {error, atom()} when Req::req().
-stream_body_recv(MaxLength, Req=#http_req{
- transport=Transport, socket=Socket, buffer=Buffer,
- body_state={stream, Length, _, _, _}}) ->
- %% @todo Allow configuring the timeout.
- case Transport:recv(Socket, min(Length, MaxLength), 5000) of
- {ok, Data} -> transfer_decode(<< Buffer/binary, Data/binary >>,
- Req#http_req{buffer= <<>>});
- {error, Reason} -> {error, Reason}
- end.
%% @todo Handle chunked after-the-facts headers.
%% @todo Depending on the length returned we might want to 0 or +5 it.
--spec transfer_decode(binary(), Req)
- -> {ok, binary(), Req} | {error, atom()} when Req::req().
-transfer_decode(Data, Req=#http_req{body_state={stream, _,
- TransferDecode, TransferState, ContentDecode}}) ->
- case TransferDecode(Data, TransferState) of
+body_decode(Req=#http_req{buffer=Data, body_state={stream, _,
+ TDecode, TState, CDecode}}, ReadTimeout) ->
+ case TDecode(Data, TState) of
more ->
- stream_body_recv(0, Req#http_req{buffer=Data, body_state={stream,
- 0, TransferDecode, TransferState, ContentDecode}});
- {more, Data2, TransferState2} ->
- content_decode(ContentDecode, Data2,
- Req#http_req{body_state={stream, 0,
- TransferDecode, TransferState2, ContentDecode}});
- {more, Data2, Length, TransferState2} when is_integer(Length) ->
- content_decode(ContentDecode, Data2,
- Req#http_req{body_state={stream, Length,
- TransferDecode, TransferState2, ContentDecode}});
- {more, Data2, Rest, TransferState2} ->
- content_decode(ContentDecode, Data2,
- Req#http_req{buffer=Rest, body_state={stream, 0,
- TransferDecode, TransferState2, ContentDecode}});
- {done, Length, Rest} ->
- Req2 = transfer_decode_done(Length, Rest, Req),
- {done, Req2};
- {done, Data2, Length, Rest} ->
- Req2 = transfer_decode_done(Length, Rest, Req),
- content_decode(ContentDecode, Data2, Req2)
+ body_recv(Req#http_req{body_state={stream, 0,
+ TDecode, TState, CDecode}}, ReadTimeout, 0);
+ {more, Data2, TState2} ->
+ {more, CDecode(Data2), Req#http_req{body_state={stream, 0,
+ TDecode, TState2, CDecode}, buffer= <<>>}};
+ {more, Data2, Length, TState2} when is_integer(Length) ->
+ {more, CDecode(Data2), Req#http_req{body_state={stream, Length,
+ TDecode, TState2, CDecode}, buffer= <<>>}};
+ {more, Data2, Rest, TState2} ->
+ {more, CDecode(Data2), Req#http_req{body_state={stream, 0,
+ TDecode, TState2, CDecode}, buffer=Rest}};
+ {done, TotalLength, Rest} ->
+ {ok, {ok, <<>>}, body_decode_end(Req, TotalLength, Rest)};
+ {done, Data2, TotalLength, Rest} ->
+ {ok, CDecode(Data2), body_decode_end(Req, TotalLength, Rest)}
--spec transfer_decode_done(non_neg_integer(), binary(), Req)
- -> Req when Req::req().
-transfer_decode_done(Length, Rest, Req=#http_req{
- headers=Headers, p_headers=PHeaders}) ->
+body_decode_end(Req=#http_req{headers=Headers, p_headers=PHeaders},
+ TotalLength, Rest) ->
Headers2 = lists:keystore(<<"content-length">>, 1, Headers,
- {<<"content-length">>, list_to_binary(integer_to_list(Length))}),
+ {<<"content-length">>, list_to_binary(integer_to_list(TotalLength))}),
%% At this point we just assume TEs were all decoded.
Headers3 = lists:keydelete(<<"transfer-encoding">>, 1, Headers2),
PHeaders2 = lists:keystore(<<"content-length">>, 1, PHeaders,
- {<<"content-length">>, Length}),
+ {<<"content-length">>, TotalLength}),
PHeaders3 = lists:keydelete(<<"transfer-encoding">>, 1, PHeaders2),
Req#http_req{buffer=Rest, body_state=done,
headers=Headers3, p_headers=PHeaders3}.
--spec content_decode(content_decode_fun(), binary(), Req)
- -> {ok, binary(), Req} | {error, atom()} when Req::req().
-content_decode(ContentDecode, Data, Req) ->
- case ContentDecode(Data) of
- {ok, Data2} -> {ok, Data2, Req};
- {error, Reason} -> {error, Reason}
+-spec body_qs(Req)
+ -> {ok, [{binary(), binary() | true}], Req} | {error, atom()}
+ when Req::req().
+body_qs(Req) ->
+ body_qs(Req, [
+ {length, 64000},
+ {read_length, 64000},
+ {read_timeout, 5000}]).
+-spec body_qs(Req, body_opts()) -> {ok, [{binary(), binary() | true}], Req}
+ | {badlength, Req} | {error, atom()} when Req::req().
+%% @todo This clause is kept for compatibility reasons, to be removed in 1.0.
+body_qs(MaxBodyLength, Req) when is_integer(MaxBodyLength) ->
+ body_qs(Req, [{length, MaxBodyLength}]);
+body_qs(Req, Opts) ->
+ case body(Req, Opts) of
+ {ok, Body, Req2} ->
+ {ok, cow_qs:parse_qs(Body), Req2};
+ {more, _, Req2} ->
+ {badlength, Req2};
+ {error, Reason} ->
+ {error, Reason}
--spec body(Req) -> {ok, binary(), Req} | {error, atom()} when Req::req().
-body(Req) ->
- body(8000000, Req).
+%% Deprecated body API.
+%% @todo The following 4 functions will be removed in Cowboy 1.0.
--spec body(non_neg_integer() | infinity, Req)
- -> {ok, binary(), Req} | {error, atom()} when Req::req().
-body(MaxBodyLength, Req) ->
- case parse_header(<<"transfer-encoding">>, Req) of
- {ok, [<<"identity">>], Req2} ->
- {ok, Length, Req3} = parse_header(<<"content-length">>, Req2, 0),
- if Length > MaxBodyLength ->
- {error, badlength};
- true ->
- read_body(Req3, <<>>)
- end;
- {ok, _, _} ->
- {error, chunked}
- end.
+-spec init_stream(transfer_decode_fun(), any(), content_decode_fun(), Req)
+ -> {ok, Req} when Req::req().
+init_stream(TransferDecode, TransferState, ContentDecode, Req) ->
+ {ok, Req#http_req{body_state=
+ {stream, 0, TransferDecode, TransferState, ContentDecode}}}.
--spec read_body(Req, binary())
- -> {ok, binary(), Req} | {error, atom()} when Req::req().
-read_body(Req, Acc) ->
- case stream_body(Req) of
+-spec stream_body(Req) -> {ok, binary(), Req}
+ | {done, Req} | {error, atom()} when Req::req().
+stream_body(Req) ->
+ stream_body(1000000, Req).
+-spec stream_body(non_neg_integer(), Req) -> {ok, binary(), Req}
+ | {done, Req} | {error, atom()} when Req::req().
+stream_body(ChunkLength, Req) ->
+ case body(Req, [{length, ChunkLength}]) of
+ {ok, <<>>, Req2} ->
+ {done, Req2};
{ok, Data, Req2} ->
- read_body(Req2, << Acc/binary, Data/binary >>);
- {done, Req2} ->
- {ok, Acc, Req2};
- {error, Reason} ->
- {error, Reason}
+ {ok, Data, Req2};
+ {more, Data, Req2} ->
+ {ok, Data, Req2};
+ Error = {error, _} ->
+ Error
-spec skip_body(Req) -> {ok, Req} | {error, atom()} when Req::req().
@@ -644,43 +703,34 @@ skip_body(Req) ->
{error, Reason} -> {error, Reason}
--spec body_qs(Req)
- -> {ok, [{binary(), binary() | true}], Req} | {error, atom()}
- when Req::req().
-body_qs(Req) ->
- body_qs(16000, Req).
-%% Essentially a POST query string.
--spec body_qs(non_neg_integer() | infinity, Req)
- -> {ok, [{binary(), binary() | true}], Req} | {error, atom()}
- when Req::req().
-body_qs(MaxBodyLength, Req) ->
- case body(MaxBodyLength, Req) of
- {ok, Body, Req2} ->
- {ok, cow_qs:parse_qs(Body), Req2};
- {error, Reason} ->
- {error, Reason}
- end.
%% Multipart API.
-spec part(Req)
-> {ok, cow_multipart:headers(), Req} | {done, Req}
when Req::req().
-part(Req=#http_req{multipart=undefined}) ->
- part(init_multipart(Req));
part(Req) ->
- {ok, Data, Req2} = stream_multipart(Req),
- part(Data, Req2).
+ part(Req, [
+ {length, 64000},
+ {read_length, 64000},
+ {read_timeout, 5000}]).
-part(Buffer, Req=#http_req{multipart={Boundary, _}}) ->
+-spec part(Req, body_opts())
+ -> {ok, cow_multipart:headers(), Req} | {done, Req}
+ when Req::req().
+part(Req=#http_req{multipart=undefined}, Opts) ->
+ part(init_multipart(Req), Opts);
+part(Req, Opts) ->
+ {Data, Req2} = stream_multipart(Req, Opts),
+ part(Data, Opts, Req2).
+part(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}) ->
case cow_multipart:parse_headers(Buffer, Boundary) of
more ->
- {ok, Data, Req2} = stream_multipart(Req),
- part(<< Buffer/binary, Data/binary >>, Req2);
+ {Data, Req2} = stream_multipart(Req, Opts),
+ part(<< Buffer/binary, Data/binary >>, Opts, Req2);
{more, Buffer2} ->
- {ok, Data, Req2} = stream_multipart(Req),
- part(<< Buffer2/binary, Data/binary >>, Req2);
+ {Data, Req2} = stream_multipart(Req, Opts),
+ part(<< Buffer2/binary, Data/binary >>, Opts, Req2);
{ok, Headers, Rest} ->
{ok, Headers, Req#http_req{multipart={Boundary, Rest}}};
%% Ignore epilogue.
@@ -692,33 +742,39 @@ part(Buffer, Req=#http_req{multipart={Boundary, _}}) ->
-> {ok, binary(), Req} | {more, binary(), Req}
when Req::req().
part_body(Req) ->
- part_body(8000000, Req).
+ part_body(Req, []).
--spec part_body(non_neg_integer(), Req)
+-spec part_body(Req, body_opts())
-> {ok, binary(), Req} | {more, binary(), Req}
when Req::req().
-part_body(MaxLength, Req=#http_req{multipart=undefined}) ->
- part_body(MaxLength, init_multipart(Req));
-part_body(MaxLength, Req) ->
- part_body(<<>>, MaxLength, Req, <<>>).
-part_body(Buffer, MaxLength, Req=#http_req{multipart={Boundary, _}}, Acc)
- when byte_size(Acc) > MaxLength ->
- {more, Acc, Req#http_req{multipart={Boundary, Buffer}}};
-part_body(Buffer, MaxLength, Req=#http_req{multipart={Boundary, _}}, Acc) ->
- {ok, Data, Req2} = stream_multipart(Req),
- case cow_multipart:parse_body(<< Buffer/binary, Data/binary >>, Boundary) of
- {ok, Body} ->
- part_body(<<>>, MaxLength, Req2, << Acc/binary, Body/binary >>);
- {ok, Body, Rest} ->
- part_body(Rest, MaxLength, Req2, << Acc/binary, Body/binary >>);
- done ->
- {ok, Acc, Req2};
- {done, Body} ->
- {ok, << Acc/binary, Body/binary >>, Req2};
- {done, Body, Rest} ->
- {ok, << Acc/binary, Body/binary >>,
- Req2#http_req{multipart={Boundary, Rest}}}
+part_body(Req=#http_req{multipart=undefined}, Opts) ->
+ part_body(init_multipart(Req), Opts);
+part_body(Req, Opts) ->
+ part_body(<<>>, Opts, Req, <<>>).
+part_body(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}, Acc) ->
+ ChunkLen = case lists:keyfind(length, 1, Opts) of
+ false -> 8000000;
+ {_, ChunkLen0} -> ChunkLen0
+ end,
+ case byte_size(Acc) > ChunkLen of
+ true ->
+ {more, Acc, Req#http_req{multipart={Boundary, Buffer}}};
+ false ->
+ {Data, Req2} = stream_multipart(Req, Opts),
+ case cow_multipart:parse_body(<< Buffer/binary, Data/binary >>, Boundary) of
+ {ok, Body} ->
+ part_body(<<>>, Opts, Req2, << Acc/binary, Body/binary >>);
+ {ok, Body, Rest} ->
+ part_body(Rest, Opts, Req2, << Acc/binary, Body/binary >>);
+ done ->
+ {ok, Acc, Req2};
+ {done, Body} ->
+ {ok, << Acc/binary, Body/binary >>, Req2};
+ {done, Body, Rest} ->
+ {ok, << Acc/binary, Body/binary >>,
+ Req2#http_req{multipart={Boundary, Rest}}}
+ end
init_multipart(Req) ->
@@ -727,10 +783,12 @@ init_multipart(Req) ->
{_, Boundary} = lists:keyfind(<<"boundary">>, 1, Params),
Req2#http_req{multipart={Boundary, <<>>}}.
-stream_multipart(Req=#http_req{multipart={_, <<>>}}) ->
- stream_body(Req);
-stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}) ->
- {ok, Buffer, Req#http_req{multipart={Boundary, <<>>}}}.
+stream_multipart(Req=#http_req{body_state=BodyState, multipart={_, <<>>}}, Opts) ->
+ true = BodyState =/= done,
+ {_, Data, Req2} = body(Req, Opts),
+ {Data, Req2};
+stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}, _) ->
+ {Buffer, Req#http_req{multipart={Boundary, <<>>}}}.
%% Response API.
@@ -970,6 +1028,13 @@ upgrade_reply(Status, Headers, Req=#http_req{transport=Transport,
], <<>>, Req),
{ok, Req2#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}}.
+-spec continue(req()) -> ok | {error, atom()}.
+continue(#http_req{socket=Socket, transport=Transport,
+ version=Version}) ->
+ HTTPVer = atom_to_binary(Version, latin1),
+ Transport:send(Socket,
+ << HTTPVer/binary, " ", (status(100))/binary, "\r\n\r\n" >>).
%% Meant to be used internally for sending errors after crashes.
-spec maybe_reply(cowboy:http_status(), req()) -> ok.
maybe_reply(Status, Req) ->