From 9a2d35c2e800ee73c27b6d6cc324453c5219f715 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 30 May 2013 20:21:01 +0200 Subject: Add experimental and incomplete SPDY support The SPDY connection processes are also supervisors. Missing: * sendfile support * request body reading support --- src/cowboy_spdy.erl | 544 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 544 insertions(+) create mode 100644 src/cowboy_spdy.erl (limited to 'src/cowboy_spdy.erl') diff --git a/src/cowboy_spdy.erl b/src/cowboy_spdy.erl new file mode 100644 index 0000000..ba02706 --- /dev/null +++ b/src/cowboy_spdy.erl @@ -0,0 +1,544 @@ +%% Copyright (c) 2013, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @doc SPDY protocol handler. +%% +%% The available options are: +%%
+%%
+%% +%% Note that there is no need to monitor these processes when using Cowboy as +%% an application as it already supervises them under the listener supervisor. +-module(cowboy_spdy). + +%% API. +-export([start_link/4]). + +%% Internal. +-export([init/5]). +-export([system_continue/3]). +-export([system_terminate/4]). +-export([system_code_change/4]). + +%% Internal request process. +-export([request_init/9]). +-export([resume/5]). +-export([reply/4]). +-export([stream_reply/3]). +-export([stream_data/2]). +-export([stream_close/1]). + +%% Internal transport functions. +-export([name/0]). +-export([send/2]). + +-record(child, { + streamid :: non_neg_integer(), + pid :: pid(), + input = nofin :: fin | nofin, + output = nofin :: fin | nofin +}). + +-record(state, { + parent = undefined :: pid(), + socket, + transport, + buffer = <<>> :: binary(), + middlewares, + env, + onrequest, + onresponse, + peer, + zdef, + zinf, + last_streamid = 0 :: non_neg_integer(), + children = [] :: [#child{}] +}). + +-record(special_headers, { + method, + path, + version, + host, + scheme %% @todo We don't use it. +}). + +-type opts() :: []. +-export_type([opts/0]). + +-include("cowboy_spdy.hrl"). + +%% API. + +%% @doc Start a SPDY protocol process. +-spec start_link(any(), inet:socket(), module(), any()) -> {ok, pid()}. +start_link(Ref, Socket, Transport, Opts) -> + proc_lib:start_link(?MODULE, init, + [self(), Ref, Socket, Transport, Opts]). + +%% Internal. + +%% @doc Faster alternative to proplists:get_value/3. +%% @private +get_value(Key, Opts, Default) -> + case lists:keyfind(Key, 1, Opts) of + {_, Value} -> Value; + _ -> Default + end. + +%% @private +-spec init(pid(), ranch:ref(), inet:socket(), module(), opts()) -> ok. +init(Parent, Ref, Socket, Transport, Opts) -> + process_flag(trap_exit, true), + ok = proc_lib:init_ack(Parent, {ok, self()}), + {ok, Peer} = Transport:peername(Socket), + Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]), + Env = [{listener, Ref}|get_value(env, Opts, [])], + OnRequest = get_value(onrequest, Opts, undefined), + OnResponse = get_value(onresponse, Opts, undefined), + Zdef = zlib:open(), + ok = zlib:deflateInit(Zdef), + _ = zlib:deflateSetDictionary(Zdef, ?ZDICT), + Zinf = zlib:open(), + ok = zlib:inflateInit(Zinf), + ok = ranch:accept_ack(Ref), + loop(#state{parent=Parent, socket=Socket, transport=Transport, + middlewares=Middlewares, env=Env, onrequest=OnRequest, + onresponse=OnResponse, peer=Peer, zdef=Zdef, zinf=Zinf}). + +loop(State=#state{parent=Parent, socket=Socket, transport=Transport, + buffer=Buffer, children=Children}) -> + {OK, Closed, Error} = Transport:messages(), + Transport:setopts(Socket, [{active, once}]), + receive + {OK, Socket, Data} -> + Data2 = << Buffer/binary, Data/binary >>, + case Data2 of + << _:40, Length:24, _/bits >> + when byte_size(Data2) >= Length + 8 -> + Length2 = Length + 8, + << Frame:Length2/binary, Rest/bits >> = Data2, + control_frame(State#state{buffer=Rest}, Frame); + Rest -> + loop(State#state{buffer=Rest}) + end; + {Closed, Socket} -> + terminate(State); + {Error, Socket, _Reason} -> + terminate(State); + {reply, {Pid, StreamID}, Status, Headers} + when Pid =:= self() -> + Child = #child{output=nofin} = lists:keyfind(StreamID, + #child.streamid, Children), + syn_reply(State, fin, StreamID, Status, Headers), + Children2 = lists:keyreplace(StreamID, + #child.streamid, Children, Child#child{output=fin}), + loop(State#state{children=Children2}); + {reply, {Pid, StreamID}, Status, Headers, Body} + when Pid =:= self() -> + Child = #child{output=nofin} = lists:keyfind(StreamID, + #child.streamid, Children), + syn_reply(State, nofin, StreamID, Status, Headers), + data(State, fin, StreamID, Body), + Children2 = lists:keyreplace(StreamID, + #child.streamid, Children, Child#child{output=fin}), + loop(State#state{children=Children2}); + {stream_reply, {Pid, StreamID}, Status, Headers} + when Pid =:= self() -> + #child{output=nofin} = lists:keyfind(StreamID, + #child.streamid, Children), + syn_reply(State, nofin, StreamID, Status, Headers), + loop(State); + {stream_data, {Pid, StreamID}, Data} + when Pid =:= self() -> + #child{output=nofin} = lists:keyfind(StreamID, + #child.streamid, Children), + data(State, nofin, StreamID, Data), + loop(State); + {stream_close, {Pid, StreamID}} + when Pid =:= self() -> + Child = #child{output=nofin} = lists:keyfind(StreamID, + #child.streamid, Children), + data(State, fin, StreamID), + Children2 = lists:keyreplace(StreamID, + #child.streamid, Children, Child#child{output=fin}), + loop(State#state{children=Children2}); + {'EXIT', Parent, Reason} -> + exit(Reason); + {'EXIT', Pid, _} -> + Children2 = lists:keydelete(Pid, #child.pid, Children), + loop(State#state{children=Children2}); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); + %% Calls from the supervisor module. + {'$gen_call', {To, Tag}, which_children} -> + Children = [{?MODULE, Pid, worker, [?MODULE]} + || #child{pid=Pid} <- Children], + To ! {Tag, Children}, + loop(State); + {'$gen_call', {To, Tag}, count_children} -> + NbChildren = length(Children), + Counts = [{specs, 1}, {active, NbChildren}, + {supervisors, 0}, {workers, NbChildren}], + To ! {Tag, Counts}, + loop(State); + {'$gen_call', {To, Tag}, _} -> + To ! {Tag, {error, ?MODULE}}, + loop(State) + after 60000 -> + goaway(State, ok), + terminate(State) + end. + +system_continue(_, _, State) -> + loop(State). + +-spec system_terminate(any(), _, _, _) -> no_return(). +system_terminate(Reason, _, _, _) -> + exit(Reason). + +system_code_change(Misc, _, _, _) -> + {ok, Misc}. + +%% We do not support SYN_STREAM with FLAG_UNIDIRECTIONAL set. +control_frame(State, << _:38, 1:1, _:26, StreamID:31, _/bits >>) -> + rst_stream(State, StreamID, internal_error), + loop(State); +%% We do not support Associated-To-Stream-ID and CREDENTIAL Slot. +control_frame(State, << _:65, StreamID:31, _:1, AssocToStreamID:31, + _:8, Slot:8, _/bits >>) when AssocToStreamID =/= 0; Slot =/= 0 -> + rst_stream(State, StreamID, internal_error), + loop(State); +%% SYN_STREAM +%% +%% Erlang does not allow us to control the priority of processes +%% so we ignore that value entirely. +control_frame(State=#state{middlewares=Middlewares, env=Env, + onrequest=OnRequest, onresponse=OnResponse, peer=Peer, + zinf=Zinf, children=Children}, + << 1:1, 3:15, 1:16, Flags:8, _:25, StreamID:31, + _:32, _Priority:3, _:13, Rest/bits >>) -> + IsFin = case Flags of + 1 -> fin; + 0 -> nofin + end, + [<< NbHeaders:32, Rest2/bits >>] = try + zlib:inflate(Zinf, Rest) + catch _:_ -> + ok = zlib:inflateSetDictionary(Zinf, ?ZDICT), + zlib:inflate(Zinf, <<>>) + end, + case syn_stream_headers(Rest2, NbHeaders, [], #special_headers{}) of + {ok, Headers, Special} -> + Pid = spawn_link(?MODULE, request_init, + [self(), StreamID, Peer, Headers, + OnRequest, OnResponse, Env, Middlewares, Special]), + loop(State#state{last_streamid=StreamID, + children=[#child{streamid=StreamID, pid=Pid, + input=IsFin, output=nofin}|Children]}); + {error, special} -> + rst_stream(State, StreamID, protocol_error), + loop(State#state{last_streamid=StreamID}) + end; +%% SYN_REPLY +control_frame(State, << 1:1, 3:15, 2:16, _/bits >>) -> + error_logger:error_msg("Ignored SYN_REPLY control frame~n"), + loop(State); +%% RST_STREAM +control_frame(State, << 1:1, 3:15, 3:16, _Flags:8, _Length:24, + _:1, _StreamID:31, StatusCode:32 >>) -> + Status = case StatusCode of + 1 -> protocol_error; + 2 -> invalid_stream; + 3 -> refused_stream; + 4 -> unsupported_version; + 5 -> cancel; + 6 -> internal_error; + 7 -> flow_control_error; + 8 -> stream_in_use; + 9 -> stream_already_closed; + 10 -> invalid_credentials; + 11 -> frame_too_large + end, + error_logger:error_msg("Received RST_STREAM control frame: ~p~n", [Status]), + %% @todo Stop StreamID. + loop(State); +%% SETTINGS +control_frame(State, << 1:1, 3:15, 4:16, 0:8, _:24, + NbEntries:32, Rest/bits >>) -> + Settings = [begin + Name = case ID of + 1 -> upload_bandwidth; + 2 -> download_bandwidth; + 3 -> round_trip_time; + 4 -> max_concurrent_streams; + 5 -> current_cwnd; + 6 -> download_retrans_rate; + 7 -> initial_window_size; + 8 -> client_certificate_vector_size + end, + {Flags, Name, Value} + end || << Flags:8, ID:24, Value:32 >> <= Rest], + if + NbEntries =/= length(Settings) -> + goaway(State, protocol_error), + terminate(State); + true -> + error_logger:error_msg("Ignored SETTINGS control frame: ~p~n", + [Settings]), + loop(State) + end; +%% PING initiated by the server; ignore, we don't send any +control_frame(State, << 1:1, 3:15, 6:16, 0:8, 4:24, PingID:32 >>) + when PingID rem 2 =:= 0 -> + error_logger:error_msg("Ignored PING control frame: ~p~n", [PingID]), + loop(State); +%% PING initiated by the client; send it back +control_frame(State=#state{socket=Socket, transport=Transport}, + Data = << 1:1, 3:15, 6:16, 0:8, 4:24, _:32 >>) -> + Transport:send(Socket, Data), + loop(State); +%% GOAWAY +control_frame(State, << 1:1, 3:15, 7:16, _/bits >>) -> + error_logger:error_msg("Ignored GOAWAY control frame~n"), + loop(State); +%% HEADERS +control_frame(State, << 1:1, 3:15, 8:16, _/bits >>) -> + error_logger:error_msg("Ignored HEADERS control frame~n"), + loop(State); +%% WINDOW_UPDATE +control_frame(State, << 1:1, 3:15, 9:16, 0:8, _/bits >>) -> + error_logger:error_msg("Ignored WINDOW_UPDATE control frame~n"), + loop(State); +%% CREDENTIAL +control_frame(State, << 1:1, 3:15, 10:16, _/bits >>) -> + error_logger:error_msg("Ignored CREDENTIAL control frame~n"), + loop(State); +%% ??? +control_frame(State, _) -> + goaway(State, protocol_error), + terminate(State). + +%% @todo We must wait for the children to finish here, +%% but only up to N milliseconds. Then we shutdown. +terminate(_State) -> + ok. + +syn_stream_headers(<<>>, 0, Acc, Special=#special_headers{ + method=Method, path=Path, version=Version, host=Host, scheme=Scheme}) -> + if + Method =:= undefined; Path =:= undefined; Version =:= undefined; + Host =:= undefined; Scheme =:= undefined -> + {error, special}; + true -> + {ok, lists:reverse(Acc), Special} + end; +syn_stream_headers(<< NameLen:32, Rest/bits >>, NbHeaders, Acc, Special) -> + << Name:NameLen/binary, ValueLen:32, Rest2/bits >> = Rest, + << Value:ValueLen/binary, Rest3/bits >> = Rest2, + case Name of + <<":host">> -> + syn_stream_headers(Rest3, NbHeaders - 1, + [{<<"host">>, Value}|Acc], + Special#special_headers{host=Value}); + <<":method">> -> + syn_stream_headers(Rest3, NbHeaders - 1, Acc, + Special#special_headers{method=Value}); + <<":path">> -> + syn_stream_headers(Rest3, NbHeaders - 1, Acc, + Special#special_headers{path=Value}); + <<":version">> -> + syn_stream_headers(Rest3, NbHeaders - 1, Acc, + Special#special_headers{version=Value}); + <<":scheme">> -> + syn_stream_headers(Rest3, NbHeaders - 1, Acc, + Special#special_headers{scheme=Value}); + _ -> + syn_stream_headers(Rest3, NbHeaders - 1, + [{Name, Value}|Acc], Special) + end. + +syn_reply(#state{socket=Socket, transport=Transport, zdef=Zdef}, + IsFin, StreamID, Status, Headers) -> + Headers2 = [{<<":status">>, Status}, + {<<":version">>, <<"HTTP/1.1">>}|Headers], + NbHeaders = length(Headers2), + HeaderBlock = [begin + NameLen = byte_size(Name), + ValueLen = iolist_size(Value), + [<< NameLen:32, Name/binary, ValueLen:32 >>, Value] + end || {Name, Value} <- Headers2], + HeaderBlock2 = [<< NbHeaders:32 >>, HeaderBlock], + HeaderBlock3 = zlib:deflate(Zdef, HeaderBlock2, full), + Flags = case IsFin of + fin -> 1; + nofin -> 0 + end, + Len = 4 + iolist_size(HeaderBlock3), + Transport:send(Socket, [ + << 1:1, 3:15, 2:16, Flags:8, Len:24, 0:1, StreamID:31 >>, + HeaderBlock3]). + +rst_stream(#state{socket=Socket, transport=Transport}, StreamID, Status) -> + StatusCode = case Status of + protocol_error -> 1; +%% invalid_stream -> 2; +%% refused_stream -> 3; +%% unsupported_version -> 4; +%% cancel -> 5; + internal_error -> 6 +%% flow_control_error -> 7; +%% stream_in_use -> 8; +%% stream_already_closed -> 9; +%% invalid_credentials -> 10; +%% frame_too_large -> 11 + end, + Transport:send(Socket, << 1:1, 3:15, 3:16, 0:8, 8:24, + 0:1, StreamID:31, StatusCode:32 >>). + +goaway(#state{socket=Socket, transport=Transport, last_streamid=LastStreamID}, + Status) -> + StatusCode = case Status of + ok -> 0; + protocol_error -> 1 +%% internal_error -> 2 + end, + Transport:send(Socket, << 1:1, 3:15, 7:16, 0:8, 8:24, + 0:1, LastStreamID:31, StatusCode:32 >>). + +data(#state{socket=Socket, transport=Transport}, fin, StreamID) -> + Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>). + +data(#state{socket=Socket, transport=Transport}, IsFin, StreamID, Data) -> + Flags = case IsFin of + fin -> 1; + nofin -> 0 + end, + Len = iolist_size(Data), + Transport:send(Socket, [ + << 0:1, StreamID:31, Flags:8, Len:24 >>, + Data]). + +%% Request process. + +request_init(Parent, StreamID, Peer, + Headers, OnRequest, OnResponse, Env, Middlewares, + #special_headers{method=Method, path=Path, version=Version, + host=Host}) -> + Version2 = parse_version(Version), + {Host2, Port} = cowboy_protocol:parse_host(Host, <<>>), + {Path2, Query} = parse_path(Path, <<>>), + Req = cowboy_req:new({Parent, StreamID}, ?MODULE, Peer, + Method, Path2, Query, Version2, Headers, + Host2, Port, <<>>, true, false, OnResponse), + case OnRequest of + undefined -> + execute(Req, Env, Middlewares); + _ -> + Req2 = OnRequest(Req), + case cowboy_req:get(resp_state, Req2) of + waiting -> execute(Req2, Env, Middlewares); + _ -> ok + end + end. + +parse_version(<<"HTTP/1.1">>) -> + 'HTTP/1.1'; +parse_version(<<"HTTP/1.0">>) -> + 'HTTP/1.0'. + +parse_path(<<>>, Path) -> + {Path, <<>>}; +parse_path(<< $?, Rest/binary >>, Path) -> + parse_query(Rest, Path, <<>>); +parse_path(<< C, Rest/binary >>, SoFar) -> + parse_path(Rest, << SoFar/binary, C >>). + +parse_query(<<>>, Path, Query) -> + {Path, Query}; +parse_query(<< C, Rest/binary >>, Path, SoFar) -> + parse_query(Rest, Path, << SoFar/binary, C >>). + +-spec execute(cowboy_req:req(), cowboy_middleware:env(), [module()]) + -> ok. +execute(Req, _, []) -> + cowboy_req:ensure_response(Req, 204); +execute(Req, Env, [Middleware|Tail]) -> + case Middleware:execute(Req, Env) of + {ok, Req2, Env2} -> + execute(Req2, Env2, Tail); + {suspend, Module, Function, Args} -> + erlang:hibernate(?MODULE, resume, + [Env, Tail, Module, Function, Args]); + {halt, Req2} -> + cowboy_req:ensure_response(Req2, 204); + {error, Code, Req2} -> + error_terminate(Code, Req2) + end. + +%% @private +-spec resume(cowboy_middleware:env(), [module()], + module(), module(), [any()]) -> ok. +resume(Env, Tail, Module, Function, Args) -> + case apply(Module, Function, Args) of + {ok, Req2, Env2} -> + execute(Req2, Env2, Tail); + {suspend, Module2, Function2, Args2} -> + erlang:hibernate(?MODULE, resume, + [Env, Tail, Module2, Function2, Args2]); + {halt, Req2} -> + cowboy_req:ensure_response(Req2, 204); + {error, Code, Req2} -> + error_terminate(Code, Req2) + end. + +%% Only send an error reply if there is no resp_sent message. +-spec error_terminate(cowboy:http_status(), cowboy_req:req()) -> ok. +error_terminate(Code, Req) -> + receive + {cowboy_req, resp_sent} -> ok + after 0 -> + _ = cowboy_req:reply(Code, Req), + ok + end. + +%% Reply functions used by cowboy_req. + +reply(Socket = {Pid, _}, Status, Headers, Body) -> + _ = case iolist_size(Body) of + 0 -> Pid ! {reply, Socket, Status, Headers}; + _ -> Pid ! {reply, Socket, Status, Headers, Body} + end, + ok. + +stream_reply(Socket = {Pid, _}, Status, Headers) -> + _ = Pid ! {stream_reply, Socket, Status, Headers}, + ok. + +stream_data(Socket = {Pid, _}, Data) -> + _ = Pid ! {stream_data, Socket, Data}, + ok. + +stream_close(Socket = {Pid, _}) -> + _ = Pid ! {stream_close, Socket}, + ok. + +%% Internal transport functions. +%% @todo recv, sendfile + +name() -> + spdy. + +send(Socket, Data) -> + stream_data(Socket, Data). -- cgit v1.2.3