From 9a2d35c2e800ee73c27b6d6cc324453c5219f715 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 30 May 2013 20:21:01 +0200 Subject: Add experimental and incomplete SPDY support The SPDY connection processes are also supervisors. Missing: * sendfile support * request body reading support --- src/cowboy_req.erl | 111 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 77 insertions(+), 34 deletions(-) (limited to 'src/cowboy_req.erl') diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index d0f2a35..093663c 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -130,13 +130,13 @@ | {done, binary(), non_neg_integer(), binary()} | {error, atom()}). --type resp_body_fun() :: fun((inet:socket(), module()) -> ok). +-type resp_body_fun() :: fun((any(), module()) -> ok). -type send_chunk_fun() :: fun((iodata()) -> ok | {error, atom()}). -type resp_chunked_fun() :: fun((send_chunk_fun()) -> ok). -record(http_req, { %% Transport. - socket = undefined :: undefined | inet:socket(), + socket = undefined :: any(), transport = undefined :: undefined | module(), connection = keepalive :: keepalive | close, @@ -189,7 +189,7 @@ %% %% Since we always need to parse the Connection header, we do it %% in an optimized way and add the parsed value to p_headers' cache. --spec new(inet:socket(), module(), +-spec new(any(), module(), undefined | {inet:ip_address(), inet:port_number()}, binary(), binary(), binary(), cowboy:http_version(), cowboy:http_headers(), binary(), @@ -917,7 +917,7 @@ has_resp_body(#http_req{resp_body={Length, _}}) -> has_resp_body(#http_req{resp_body=RespBody}) -> iolist_size(RespBody) > 0. -%% Remove a header previously set for the response. +%% @doc Remove a header previously set for the response. -spec delete_resp_header(binary(), Req) -> Req when Req::req(). delete_resp_header(Name, Req=#http_req{resp_headers=RespHeaders}) -> @@ -944,20 +944,30 @@ reply(Status, Headers, Body, Req=#http_req{ version=Version, connection=Connection, method=Method, resp_compress=Compress, resp_state=waiting, resp_headers=RespHeaders}) -> - HTTP11Headers = case Version of - 'HTTP/1.1' -> [{<<"connection">>, atom_to_connection(Connection)}]; - _ -> [] + HTTP11Headers = if + Transport =/= cowboy_spdy, Version =:= 'HTTP/1.1' -> + [{<<"connection">>, atom_to_connection(Connection)}]; + true -> + [] end, Req3 = case Body of BodyFun when is_function(BodyFun) -> %% We stream the response body until we close the connection. RespConn = close, - {RespType, Req2} = response(Status, Headers, RespHeaders, [ - {<<"connection">>, <<"close">>}, - {<<"date">>, cowboy_clock:rfc1123()}, - {<<"server">>, <<"Cowboy">>}, - {<<"transfer-encoding">>, <<"identity">>} - ], <<>>, Req), + {RespType, Req2} = if + Transport =:= cowboy_spdy -> + response(Status, Headers, RespHeaders, [ + {<<"date">>, cowboy_clock:rfc1123()}, + {<<"server">>, <<"Cowboy">>} + ], stream, Req); + true -> + response(Status, Headers, RespHeaders, [ + {<<"connection">>, <<"close">>}, + {<<"date">>, cowboy_clock:rfc1123()}, + {<<"server">>, <<"Cowboy">>}, + {<<"transfer-encoding">>, <<"identity">>} + ], <<>>, Req) + end, if RespType =/= hook, Method =/= <<"HEAD">> -> BodyFun(Socket, Transport); true -> ok @@ -970,13 +980,12 @@ reply(Status, Headers, Body, Req=#http_req{ ChunkFun = fun(IoData) -> chunk(IoData, Req2) end, BodyFun(ChunkFun), %% Terminate the chunked body for HTTP/1.1 only. - _ = case Version of - 'HTTP/1.0' -> ok; - _ -> Transport:send(Socket, <<"0\r\n\r\n">>) + case Version of + 'HTTP/1.0' -> Req2; + _ -> last_chunk(Req2) end; - true -> ok - end, - Req2; + true -> Req2 + end; {ContentLength, BodyFun} -> %% We stream the response body for ContentLength bytes. RespConn = response_connection(Headers, Connection), @@ -984,7 +993,7 @@ reply(Status, Headers, Body, Req=#http_req{ {<<"content-length">>, integer_to_list(ContentLength)}, {<<"date">>, cowboy_clock:rfc1123()}, {<<"server">>, <<"Cowboy">>} - |HTTP11Headers], <<>>, Req), + |HTTP11Headers], stream, Req), if RespType =/= hook, Method =/= <<"HEAD">> -> BodyFun(Socket, Transport); true -> ok @@ -1001,7 +1010,7 @@ reply(Status, Headers, Body, Req=#http_req{ RespHeaders, HTTP11Headers, Method, iolist_size(Body)), Req2#http_req{connection=RespConn} end, - {ok, Req3#http_req{resp_state=done,resp_headers=[], resp_body= <<>>}}. + {ok, Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}}. reply_may_compress(Status, Headers, Body, Req, RespHeaders, HTTP11Headers, Method) -> @@ -1065,18 +1074,34 @@ chunked_reply(Status, Headers, Req) -> -spec chunk(iodata(), req()) -> ok | {error, atom()}. chunk(_Data, #http_req{method= <<"HEAD">>}) -> ok; -chunk(Data, #http_req{socket=Socket, transport=Transport, version='HTTP/1.0'}) -> +chunk(Data, #http_req{socket=Socket, transport=cowboy_spdy, + resp_state=chunks}) -> + cowboy_spdy:stream_data(Socket, Data); +chunk(Data, #http_req{socket=Socket, transport=Transport, + resp_state=chunks, version='HTTP/1.0'}) -> Transport:send(Socket, Data); -chunk(Data, #http_req{socket=Socket, transport=Transport, resp_state=chunks}) -> +chunk(Data, #http_req{socket=Socket, transport=Transport, + resp_state=chunks}) -> Transport:send(Socket, [integer_to_list(iolist_size(Data), 16), <<"\r\n">>, Data, <<"\r\n">>]). +%% @doc Finish the chunked reply. +%% @todo If ever made public, need to send nothing if HEAD. +-spec last_chunk(Req) -> Req when Req::req(). +last_chunk(Req=#http_req{socket=Socket, transport=cowboy_spdy}) -> + _ = cowboy_spdy:stream_close(Socket), + Req#http_req{resp_state=done}; +last_chunk(Req=#http_req{socket=Socket, transport=Transport}) -> + _ = Transport:send(Socket, <<"0\r\n\r\n">>), + Req#http_req{resp_state=done}. + %% @doc Send an upgrade reply. %% @private -spec upgrade_reply(cowboy:http_status(), cowboy:http_headers(), Req) -> {ok, Req} when Req::req(). -upgrade_reply(Status, Headers, Req=#http_req{ - resp_state=waiting, resp_headers=RespHeaders}) -> +upgrade_reply(Status, Headers, Req=#http_req{transport=Transport, + resp_state=waiting, resp_headers=RespHeaders}) + when Transport =/= cowboy_spdy -> {_, Req2} = response(Status, Headers, RespHeaders, [ {<<"connection">>, <<"Upgrade">>} ], <<>>, Req), @@ -1098,9 +1123,8 @@ ensure_response(#http_req{method= <<"HEAD">>, resp_state=chunks}, _) -> ok; ensure_response(#http_req{version='HTTP/1.0', resp_state=chunks}, _) -> ok; -ensure_response(#http_req{socket=Socket, transport=Transport, - resp_state=chunks}, _) -> - Transport:send(Socket, <<"0\r\n\r\n">>), +ensure_response(Req=#http_req{resp_state=chunks}, _) -> + _ = last_chunk(Req), ok. %% Private setter/getter API. @@ -1212,6 +1236,15 @@ to_list(Req) -> -spec chunked_response(cowboy:http_status(), cowboy:http_headers(), Req) -> {normal | hook, Req} when Req::req(). +chunked_response(Status, Headers, Req=#http_req{ + transport=cowboy_spdy, resp_state=waiting, + resp_headers=RespHeaders}) -> + {RespType, Req2} = response(Status, Headers, RespHeaders, [ + {<<"date">>, cowboy_clock:rfc1123()}, + {<<"server">>, <<"Cowboy">>} + ], stream, Req), + {RespType, Req2#http_req{resp_state=chunks, + resp_headers=[], resp_body= <<>>}}; chunked_response(Status, Headers, Req=#http_req{ version=Version, connection=Connection, resp_state=waiting, resp_headers=RespHeaders}) -> @@ -1230,7 +1263,7 @@ chunked_response(Status, Headers, Req=#http_req{ resp_headers=[], resp_body= <<>>}}. -spec response(cowboy:http_status(), cowboy:http_headers(), - cowboy:http_headers(), cowboy:http_headers(), iodata(), Req) + cowboy:http_headers(), cowboy:http_headers(), stream | iodata(), Req) -> {normal | hook, Req} when Req::req(). response(Status, Headers, RespHeaders, DefaultHeaders, Body, Req=#http_req{ socket=Socket, transport=Transport, version=Version, @@ -1239,22 +1272,32 @@ response(Status, Headers, RespHeaders, DefaultHeaders, Body, Req=#http_req{ already_called -> Headers; _ -> response_merge_headers(Headers, RespHeaders, DefaultHeaders) end, + Body2 = case Body of stream -> <<>>; _ -> Body end, Req2 = case OnResponse of already_called -> Req; undefined -> Req; - OnResponse -> OnResponse(Status, FullHeaders, Body, - %% Don't call 'onresponse' from the hook itself. - Req#http_req{resp_headers=[], resp_body= <<>>, - onresponse=already_called}) + OnResponse -> + OnResponse(Status, FullHeaders, Body2, + %% Don't call 'onresponse' from the hook itself. + Req#http_req{resp_headers=[], resp_body= <<>>, + onresponse=already_called}) end, ReplyType = case Req2#http_req.resp_state of + waiting when Transport =:= cowboy_spdy, Body =:= stream -> + cowboy_spdy:stream_reply(Socket, status(Status), FullHeaders), + ReqPid ! {?MODULE, resp_sent}, + normal; + waiting when Transport =:= cowboy_spdy -> + cowboy_spdy:reply(Socket, status(Status), FullHeaders, Body), + ReqPid ! {?MODULE, resp_sent}, + normal; waiting -> HTTPVer = atom_to_binary(Version, latin1), StatusLine = << HTTPVer/binary, " ", (status(Status))/binary, "\r\n" >>, HeaderLines = [[Key, <<": ">>, Value, <<"\r\n">>] || {Key, Value} <- FullHeaders], - Transport:send(Socket, [StatusLine, HeaderLines, <<"\r\n">>, Body]), + Transport:send(Socket, [StatusLine, HeaderLines, <<"\r\n">>, Body2]), ReqPid ! {?MODULE, resp_sent}, normal; _ -> -- cgit v1.2.3