From faeb37ed80e1f2ae2bfa0c096b2a660c271adddb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Sat, 5 Jan 2013 23:35:30 +0100 Subject: Add cowboy_req:set_resp_body_fun/2 This allows streaming a body without knowing the length in advance. Also allows {stream, StreamFun} response body in the REST code. --- src/cowboy_req.erl | 50 ++++++++++++++++++++++++++++++--------- src/cowboy_rest.erl | 6 +++-- test/http_SUITE.erl | 22 +++++++++++++++++ test/http_handler_stream_body.erl | 11 ++++++--- 4 files changed, 73 insertions(+), 16 deletions(-) diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 5af1bf5..dab9410 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -89,6 +89,7 @@ -export([set_resp_cookie/4]). -export([set_resp_header/3]). -export([set_resp_body/2]). +-export([set_resp_body_fun/2]). -export([set_resp_body_fun/3]). -export([has_resp_header/2]). -export([has_resp_body/1]). @@ -157,7 +158,8 @@ %% Response. resp_state = waiting :: locked | waiting | chunks | done, resp_headers = [] :: cowboy_http:headers(), - resp_body = <<>> :: iodata() | {non_neg_integer(), resp_body_fun()}, + resp_body = <<>> :: iodata() | resp_body_fun() + | {non_neg_integer(), resp_body_fun()}, %% Functions. onresponse = undefined :: undefined | cowboy_protocol:onresponse_fun() @@ -821,20 +823,33 @@ set_resp_header(Name, Value, Req=#http_req{resp_headers=RespHeaders}) -> set_resp_body(Body, Req) -> Req#http_req{resp_body=Body}. +%% @doc Add a body stream function to the response. +%% +%% The body set here is ignored if the response is later sent using +%% anything other than reply/2 or reply/3. +%% +%% Setting a response stream function without a length means that the +%% body will be sent until the connection is closed. Cowboy will make +%% sure that the connection is closed with no extra step required. +%% +%% To inform the client that a body has been sent with this request, +%% Cowboy will add a "Transfer-Encoding: identity" header to the +%% response. +-spec set_resp_body_fun(resp_body_fun(), Req) -> Req when Req::req(). +set_resp_body_fun(StreamFun, Req) -> + Req#http_req{resp_body=StreamFun}. + %% @doc Add a body function to the response. %% -%% The response body may also be set to a content-length - stream-function pair. -%% If the response body is of this type normal response headers will be sent. -%% After the response headers has been sent the body function is applied. -%% The body function is expected to write the response body directly to the -%% socket using the transport module. +%% The body set here is ignored if the response is later sent using +%% anything other than reply/2 or reply/3. %% -%% If the body function crashes while writing the response body or writes fewer -%% bytes than declared the behaviour is undefined. The body set here is ignored -%% if the response is later sent using anything other than `reply/2' or -%% `reply/3'. +%% Cowboy will call the given response stream function after sending the +%% headers. This function must send the specified number of bytes to the +%% socket it will receive as argument. %% -%% @see cowboy_req:transport/1. +%% If the body function crashes while writing the response body or writes +%% fewer bytes than declared the behaviour is undefined. -spec set_resp_body_fun(non_neg_integer(), resp_body_fun(), Req) -> Req when Req::req(). set_resp_body_fun(StreamLen, StreamFun, Req) -> @@ -884,7 +899,20 @@ reply(Status, Headers, Body, Req=#http_req{ _ -> [] end, case Body of + BodyFun when is_function(BodyFun) -> + %% We stream the response body until we close the connection. + {RespType, Req2} = response(Status, Headers, RespHeaders, [ + {<<"connection">>, <<"close">>}, + {<<"date">>, cowboy_clock:rfc1123()}, + {<<"server">>, <<"Cowboy">>}, + {<<"transfer-encoding">>, <<"identity">>} + ], <<>>, Req#http_req{connection=close}), + if RespType =/= hook, Method =/= <<"HEAD">> -> + BodyFun(Socket, Transport); + true -> ok + end; {ContentLength, BodyFun} -> + %% We stream the response body for ContentLength bytes. {RespType, Req2} = response(Status, Headers, RespHeaders, [ {<<"content-length">>, integer_to_list(ContentLength)}, {<<"date">>, cowboy_clock:rfc1123()}, diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl index 511cd20..963b2f7 100644 --- a/src/cowboy_rest.erl +++ b/src/cowboy_rest.erl @@ -796,8 +796,10 @@ set_resp_body(Req, State=#state{handler=Handler, handler_state=HandlerState, {Body, Req6, HandlerState} -> State5 = State4#state{handler_state=HandlerState}, Req7 = case Body of - {stream, Len, Fun1} -> - cowboy_req:set_resp_body_fun(Len, Fun1, Req6); + {stream, StreamFun} -> + cowboy_req:set_resp_body_fun(StreamFun, Req6); + {stream, Len, StreamFun} -> + cowboy_req:set_resp_body_fun(Len, StreamFun, Req6); _Contents -> cowboy_req:set_resp_body(Body, Req6) end, diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl index cd29c87..7ce0835 100644 --- a/test/http_SUITE.erl +++ b/test/http_SUITE.erl @@ -68,6 +68,7 @@ -export([static_test_file/1]). -export([static_test_file_css/1]). -export([stream_body_set_resp/1]). +-export([stream_body_set_resp_close/1]). -export([te_chunked/1]). -export([te_chunked_delayed/1]). -export([te_identity/1]). @@ -117,6 +118,7 @@ groups() -> static_test_file, static_test_file_css, stream_body_set_resp, + stream_body_set_resp_close, te_chunked, te_chunked_delayed, te_identity @@ -235,6 +237,10 @@ init_dispatch(Config) -> [{body, <<"A flameless dance does not equal a cycle">>}]}, {[<<"stream_body">>, <<"set_resp">>], http_handler_stream_body, [{reply, set_resp}, {body, <<"stream_body_set_resp">>}]}, + {[<<"stream_body">>, <<"set_resp_close">>], + http_handler_stream_body, [ + {reply, set_resp_close}, + {body, <<"stream_body_set_resp_close">>}]}, {[<<"static">>, '...'], cowboy_static, [{directory, ?config(static_dir, Config)}, {mimetypes, [{<<".css">>, [<<"text/css">>]}]}]}, @@ -892,6 +898,22 @@ stream_body_set_resp(Config) -> {ok, <<"stream_body_set_resp">>, _} = cowboy_client:response_body(Client3). +stream_body_set_resp_close(Config) -> + Client = ?config(client, Config), + {ok, Client2} = cowboy_client:request(<<"GET">>, + build_url("/stream_body/set_resp_close", Config), Client), + {ok, 200, _, Client3} = cowboy_client:response(Client2), + {ok, Transport, Socket} = cowboy_client:transport(Client3), + case element(7, Client3) of + <<"stream_body_set_resp_close">> -> + ok; + Buffer -> + {ok, Rest} = Transport:recv(Socket, 26 - size(Buffer), 1000), + <<"stream_body_set_resp_close">> = << Buffer/binary, Rest/binary >>, + ok + end, + {error, closed} = Transport:recv(Socket, 0, 1000). + te_chunked(Config) -> Client = ?config(client, Config), Body = list_to_binary(io_lib:format("~p", [lists:seq(1, 100)])), diff --git a/test/http_handler_stream_body.erl b/test/http_handler_stream_body.erl index 15870a8..d8f7d91 100644 --- a/test/http_handler_stream_body.erl +++ b/test/http_handler_stream_body.erl @@ -12,10 +12,15 @@ init({_Transport, http}, Req, Opts) -> Reply = proplists:get_value(reply, Opts), {ok, Req, #state{headers=Headers, body=Body, reply=Reply}}. -handle(Req, State=#state{headers=_Headers, body=Body, reply=set_resp}) -> +handle(Req, State=#state{headers=_Headers, body=Body, reply=Reply}) -> SFun = fun(Socket, Transport) -> Transport:send(Socket, Body) end, - SLen = iolist_size(Body), - Req2 = cowboy_req:set_resp_body_fun(SLen, SFun, Req), + Req2 = case Reply of + set_resp -> + SLen = iolist_size(Body), + cowboy_req:set_resp_body_fun(SLen, SFun, Req); + set_resp_close -> + cowboy_req:set_resp_body_fun(SFun, Req) + end, {ok, Req3} = cowboy_req:reply(200, Req2), {ok, Req3, State}. -- cgit v1.2.3