diff options
author | Loïc Hoguin <[email protected]> | 2013-09-02 19:14:28 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2013-09-02 19:14:28 +0200 |
commit | 9eab26d8353f1546ad8209196c36e42d616f952e (patch) | |
tree | 29dfd120a023582f6814b320ff75ac778a0ea49e /src/cowboy_spdy.erl | |
parent | d68b3de9d96dbe0fab56b78cdfeb699055446746 (diff) | |
download | cowboy-9eab26d8353f1546ad8209196c36e42d616f952e.tar.gz cowboy-9eab26d8353f1546ad8209196c36e42d616f952e.tar.bz2 cowboy-9eab26d8353f1546ad8209196c36e42d616f952e.zip |
Add request body support for SPDY
And various other improvements following the addition of two tests.
New dependency cowlib that will gradually receive most of the parse
code from SPDY but also HTTP and its headers.
Diffstat (limited to 'src/cowboy_spdy.erl')
-rw-r--r-- | src/cowboy_spdy.erl | 359 |
1 files changed, 119 insertions, 240 deletions
diff --git a/src/cowboy_spdy.erl b/src/cowboy_spdy.erl index cc4d867..425a422 100644 --- a/src/cowboy_spdy.erl +++ b/src/cowboy_spdy.erl @@ -32,7 +32,7 @@ -export([system_code_change/4]). %% Internal request process. --export([request_init/9]). +-export([request_init/11]). -export([resume/5]). -export([reply/4]). -export([stream_reply/3]). @@ -41,6 +41,7 @@ %% Internal transport functions. -export([name/0]). +-export([recv/3]). -export([send/2]). -export([sendfile/2]). @@ -48,6 +49,8 @@ streamid :: non_neg_integer(), pid :: pid(), input = nofin :: fin | nofin, + in_buffer = <<>> :: binary(), + is_recv = false :: {true, non_neg_integer()} | false, output = nofin :: fin | nofin }). @@ -67,19 +70,9 @@ children = [] :: [#child{}] }). --record(special_headers, { - method, - path, - version, - host, - scheme %% @todo We don't use it. -}). - -type opts() :: []. -export_type([opts/0]). --include("cowboy_spdy.hrl"). - %% API. %% @doc Start a SPDY protocol process. @@ -108,41 +101,59 @@ init(Parent, Ref, Socket, Transport, Opts) -> Env = [{listener, Ref}|get_value(env, Opts, [])], OnRequest = get_value(onrequest, Opts, undefined), OnResponse = get_value(onresponse, Opts, undefined), - Zdef = zlib:open(), - ok = zlib:deflateInit(Zdef), - _ = zlib:deflateSetDictionary(Zdef, ?ZDICT), - Zinf = zlib:open(), - ok = zlib:inflateInit(Zinf), + Zdef = cow_spdy:deflate_init(), + Zinf = cow_spdy:inflate_init(), ok = ranch:accept_ack(Ref), loop(#state{parent=Parent, socket=Socket, transport=Transport, middlewares=Middlewares, env=Env, onrequest=OnRequest, onresponse=OnResponse, peer=Peer, zdef=Zdef, zinf=Zinf}). loop(State=#state{parent=Parent, socket=Socket, transport=Transport, - buffer=Buffer, children=Children}) -> + buffer=Buffer, zinf=Zinf, children=Children}) -> {OK, Closed, Error} = Transport:messages(), Transport:setopts(Socket, [{active, once}]), receive {OK, Socket, Data} -> Data2 = << Buffer/binary, Data/binary >>, - case Data2 of - << _:40, Length:24, _/bits >> - when byte_size(Data2) >= Length + 8 -> - Length2 = Length + 8, - << Frame:Length2/binary, Rest/bits >> = Data2, - control_frame(State#state{buffer=Rest}, Frame); - Rest -> - loop(State#state{buffer=Rest}) + case cow_spdy:split(Data2) of + {true, Frame, Rest} -> + P = cow_spdy:parse(Frame, Zinf), + handle_frame(State#state{buffer=Rest}, P); + false -> + loop(State#state{buffer=Data2}) end; {Closed, Socket} -> terminate(State); {Error, Socket, _Reason} -> terminate(State); + %% @todo Timeout (send a message to self). + {recv, FromSocket = {Pid, StreamID}, FromPid, Length, _Timeout} + when Pid =:= self() -> + Child = #child{in_buffer=InBuffer, is_recv=false} + = lists:keyfind(StreamID, #child.streamid, Children), + if + Length =:= 0, InBuffer =/= <<>> -> + FromPid ! {recv, FromSocket, {ok, InBuffer}}, + Children2 = lists:keyreplace(StreamID, #child.streamid, + Children, Child#child{in_buffer= <<>>}), + loop(State#state{children=Children2}); + byte_size(InBuffer) >= Length -> + << Data:Length/binary, Rest/binary >> = InBuffer, + FromPid ! {recv, FromSocket, {ok, Data}}, + Children2 = lists:keyreplace(StreamID, #child.streamid, + Children, Child#child{in_buffer=Rest}), + loop(State#state{children=Children2}); + true -> + Children2 = lists:keyreplace(StreamID, #child.streamid, + Children, Child#child{is_recv= + {true, FromSocket, FromPid, Length}}), + loop(State#state{children=Children2}) + end; {reply, {Pid, StreamID}, Status, Headers} when Pid =:= self() -> Child = #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - syn_reply(State, fin, StreamID, Status, Headers), + syn_reply(State, StreamID, true, Status, Headers), Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child#child{output=fin}), loop(State#state{children=Children2}); @@ -150,8 +161,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, when Pid =:= self() -> Child = #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - syn_reply(State, nofin, StreamID, Status, Headers), - data(State, fin, StreamID, Body), + syn_reply(State, StreamID, false, Status, Headers), + data(State, StreamID, true, Body), Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child#child{output=fin}), loop(State#state{children=Children2}); @@ -159,19 +170,19 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, when Pid =:= self() -> #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - syn_reply(State, nofin, StreamID, Status, Headers), + syn_reply(State, StreamID, false, Status, Headers), loop(State); {stream_data, {Pid, StreamID}, Data} when Pid =:= self() -> #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - data(State, nofin, StreamID, Data), + data(State, StreamID, false, Data), loop(State); {stream_close, {Pid, StreamID}} when Pid =:= self() -> Child = #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - data(State, fin, StreamID), + data(State, StreamID, true, <<>>), Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child#child{output=fin}), loop(State#state{children=Children2}); @@ -186,6 +197,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, {'EXIT', Parent, Reason} -> exit(Reason); {'EXIT', Pid, _} -> + %% @todo Report the error if any. Children2 = lists:keydelete(Pid, #child.pid, Children), loop(State#state{children=Children2}); {system, From, Request} -> @@ -220,231 +232,95 @@ system_terminate(Reason, _, _, _) -> system_code_change(Misc, _, _, _) -> {ok, Misc}. -%% We do not support SYN_STREAM with FLAG_UNIDIRECTIONAL set. -control_frame(State, << 1:1, 3:15, 1:16, _:6, 1:1, _:26, - StreamID:31, _/bits >>) -> - rst_stream(State, StreamID, internal_error), +%% FLAG_UNIDIRECTIONAL can only be set by the server. +handle_frame(State, {syn_stream, StreamID, _, _, true, + _, _, _, _, _, _, _}) -> + rst_stream(State, StreamID, protocol_error), loop(State); -%% We do not support Associated-To-Stream-ID and CREDENTIAL Slot. -control_frame(State, << 1:1, 3:15, 1:16, _:33, StreamID:31, _:1, - AssocToStreamID:31, _:8, Slot:8, _/bits >>) - when AssocToStreamID =/= 0; Slot =/= 0 -> +%% We do not support Associated-To-Stream-ID. +handle_frame(State, {syn_stream, StreamID, AssocToStreamID, + _, _, _, _, _, _, _, _, _}) when AssocToStreamID =/= 0 -> rst_stream(State, StreamID, internal_error), loop(State); -%% SYN_STREAM +%% SYN_STREAM. %% %% Erlang does not allow us to control the priority of processes %% so we ignore that value entirely. -control_frame(State=#state{middlewares=Middlewares, env=Env, +handle_frame(State=#state{middlewares=Middlewares, env=Env, onrequest=OnRequest, onresponse=OnResponse, peer=Peer, - zinf=Zinf, children=Children}, - << 1:1, 3:15, 1:16, Flags:8, _:25, StreamID:31, - _:32, _Priority:3, _:13, Rest/bits >>) -> - IsFin = case Flags of - 1 -> fin; - 0 -> nofin - end, - [<< NbHeaders:32, Rest2/bits >>] = try - zlib:inflate(Zinf, Rest) - catch _:_ -> - ok = zlib:inflateSetDictionary(Zinf, ?ZDICT), - zlib:inflate(Zinf, <<>>) - end, - case syn_stream_headers(Rest2, NbHeaders, [], #special_headers{}) of - {ok, Headers, Special} -> - Pid = spawn_link(?MODULE, request_init, - [self(), StreamID, Peer, Headers, - OnRequest, OnResponse, Env, Middlewares, Special]), - loop(State#state{last_streamid=StreamID, - children=[#child{streamid=StreamID, pid=Pid, - input=IsFin, output=nofin}|Children]}); - {error, badname} -> - rst_stream(State, StreamID, protocol_error), - loop(State#state{last_streamid=StreamID}); - {error, special} -> - rst_stream(State, StreamID, protocol_error), - loop(State#state{last_streamid=StreamID}) - end; -%% SYN_REPLY -control_frame(State, << 1:1, 3:15, 2:16, _/bits >>) -> - error_logger:error_msg("Ignored SYN_REPLY control frame~n"), - loop(State); -%% RST_STREAM -control_frame(State, << 1:1, 3:15, 3:16, _Flags:8, _Length:24, - _:1, _StreamID:31, StatusCode:32 >>) -> - Status = case StatusCode of - 1 -> protocol_error; - 2 -> invalid_stream; - 3 -> refused_stream; - 4 -> unsupported_version; - 5 -> cancel; - 6 -> internal_error; - 7 -> flow_control_error; - 8 -> stream_in_use; - 9 -> stream_already_closed; - 10 -> invalid_credentials; - 11 -> frame_too_large - end, - error_logger:error_msg("Received RST_STREAM control frame: ~p~n", [Status]), + children=Children}, {syn_stream, StreamID, _, IsFin, + _, _, Method, _, Host, Path, Version, Headers}) -> + Pid = spawn_link(?MODULE, request_init, [ + {self(), StreamID}, Peer, OnRequest, OnResponse, + Env, Middlewares, Method, Host, Path, Version, Headers + ]), + IsFin2 = if IsFin -> fin; true -> nofin end, + loop(State#state{last_streamid=StreamID, + children=[#child{streamid=StreamID, pid=Pid, + input=IsFin2, output=nofin}|Children]}); +%% RST_STREAM. +handle_frame(State, {rst_stream, StreamID, Status}) -> + error_logger:error_msg("Received RST_STREAM frame ~p ~p", + [StreamID, Status]), %% @todo Stop StreamID. loop(State); -%% SETTINGS -control_frame(State, << 1:1, 3:15, 4:16, 0:8, _:24, - NbEntries:32, Rest/bits >>) -> - Settings = [begin - Name = case ID of - 1 -> upload_bandwidth; - 2 -> download_bandwidth; - 3 -> round_trip_time; - 4 -> max_concurrent_streams; - 5 -> current_cwnd; - 6 -> download_retrans_rate; - 7 -> initial_window_size; - 8 -> client_certificate_vector_size - end, - {Flags, Name, Value} - end || << Flags:8, ID:24, Value:32 >> <= Rest], - if - NbEntries =/= length(Settings) -> - goaway(State, protocol_error), - terminate(State); - true -> - error_logger:error_msg("Ignored SETTINGS control frame: ~p~n", - [Settings]), - loop(State) - end; -%% PING initiated by the server; ignore, we don't send any -control_frame(State, << 1:1, 3:15, 6:16, 0:8, 4:24, PingID:32 >>) - when PingID rem 2 =:= 0 -> +%% PING initiated by the server; ignore, we don't send any. +handle_frame(State, {ping, PingID}) when PingID rem 2 =:= 0 -> error_logger:error_msg("Ignored PING control frame: ~p~n", [PingID]), loop(State); -%% PING initiated by the client; send it back -control_frame(State=#state{socket=Socket, transport=Transport}, - Data = << 1:1, 3:15, 6:16, 0:8, 4:24, _:32 >>) -> - Transport:send(Socket, Data), - loop(State); -%% GOAWAY -control_frame(State, << 1:1, 3:15, 7:16, _/bits >>) -> - error_logger:error_msg("Ignored GOAWAY control frame~n"), - loop(State); -%% HEADERS -control_frame(State, << 1:1, 3:15, 8:16, _/bits >>) -> - error_logger:error_msg("Ignored HEADERS control frame~n"), - loop(State); -%% WINDOW_UPDATE -control_frame(State, << 1:1, 3:15, 9:16, 0:8, _/bits >>) -> - error_logger:error_msg("Ignored WINDOW_UPDATE control frame~n"), +%% PING initiated by the client; send it back. +handle_frame(State=#state{socket=Socket, transport=Transport}, + {ping, PingID}) -> + Transport:send(Socket, cow_spdy:ping(PingID)), loop(State); -%% CREDENTIAL -control_frame(State, << 1:1, 3:15, 10:16, _/bits >>) -> - error_logger:error_msg("Ignored CREDENTIAL control frame~n"), - loop(State); -%% ??? -control_frame(State, _) -> +%% Data received for a stream. +handle_frame(State=#state{children=Children}, + {data, StreamID, IsFin, Data}) -> + Child = #child{input=nofin, in_buffer=Buffer, is_recv=IsRecv} + = lists:keyfind(StreamID, #child.streamid, Children), + Data2 = << Buffer/binary, Data/binary >>, + IsFin2 = if IsFin -> fin; true -> nofin end, + Child2 = case IsRecv of + {true, FromSocket, FromPid, 0} -> + FromPid ! {recv, FromSocket, {ok, Data2}}, + Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false}; + {true, FromSocket, FromPid, Length} when byte_size(Data2) >= Length -> + << Data3:Length/binary, Rest/binary >> = Data2, + FromPid ! {recv, FromSocket, {ok, Data3}}, + Child#child{input=IsFin2, in_buffer=Rest, is_recv=false}; + _ -> + Child#child{input=IsFin2, in_buffer=Data2} + end, + Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child2), + loop(State#state{children=Children2}); +%% General error, can't recover. +handle_frame(State, {error, badprotocol}) -> goaway(State, protocol_error), - terminate(State). + terminate(State); +%% Ignore all other frames for now. +handle_frame(State, Frame) -> + error_logger:error_msg("Ignored frame ~p", [Frame]), + loop(State). %% @todo We must wait for the children to finish here, %% but only up to N milliseconds. Then we shutdown. terminate(_State) -> ok. -syn_stream_headers(<<>>, 0, Acc, Special=#special_headers{ - method=Method, path=Path, version=Version, host=Host, scheme=Scheme}) -> - if - Method =:= undefined; Path =:= undefined; Version =:= undefined; - Host =:= undefined; Scheme =:= undefined -> - {error, special}; - true -> - {ok, lists:reverse(Acc), Special} - end; -syn_stream_headers(<< 0:32, _Rest/bits >>, _NbHeaders, _Acc, _Special) -> - {error, badname}; -syn_stream_headers(<< NameLen:32, Rest/bits >>, NbHeaders, Acc, Special) -> - << Name:NameLen/binary, ValueLen:32, Rest2/bits >> = Rest, - << Value:ValueLen/binary, Rest3/bits >> = Rest2, - case Name of - <<":host">> -> - syn_stream_headers(Rest3, NbHeaders - 1, - [{<<"host">>, Value}|Acc], - Special#special_headers{host=Value}); - <<":method">> -> - syn_stream_headers(Rest3, NbHeaders - 1, Acc, - Special#special_headers{method=Value}); - <<":path">> -> - syn_stream_headers(Rest3, NbHeaders - 1, Acc, - Special#special_headers{path=Value}); - <<":version">> -> - syn_stream_headers(Rest3, NbHeaders - 1, Acc, - Special#special_headers{version=Value}); - <<":scheme">> -> - syn_stream_headers(Rest3, NbHeaders - 1, Acc, - Special#special_headers{scheme=Value}); - _ -> - syn_stream_headers(Rest3, NbHeaders - 1, - [{Name, Value}|Acc], Special) - end. - syn_reply(#state{socket=Socket, transport=Transport, zdef=Zdef}, - IsFin, StreamID, Status, Headers) -> - Headers2 = [{<<":status">>, Status}, - {<<":version">>, <<"HTTP/1.1">>}|Headers], - NbHeaders = length(Headers2), - HeaderBlock = [begin - NameLen = byte_size(Name), - ValueLen = iolist_size(Value), - [<< NameLen:32, Name/binary, ValueLen:32 >>, Value] - end || {Name, Value} <- Headers2], - HeaderBlock2 = [<< NbHeaders:32 >>, HeaderBlock], - HeaderBlock3 = zlib:deflate(Zdef, HeaderBlock2, full), - Flags = case IsFin of - fin -> 1; - nofin -> 0 - end, - Len = 4 + iolist_size(HeaderBlock3), - Transport:send(Socket, [ - << 1:1, 3:15, 2:16, Flags:8, Len:24, 0:1, StreamID:31 >>, - HeaderBlock3]). + StreamID, IsFin, Status, Headers) -> + Transport:send(Socket, cow_spdy:syn_reply(Zdef, StreamID, IsFin, + Status, <<"HTTP/1.1">>, Headers)). rst_stream(#state{socket=Socket, transport=Transport}, StreamID, Status) -> - StatusCode = case Status of - protocol_error -> 1; -%% invalid_stream -> 2; -%% refused_stream -> 3; -%% unsupported_version -> 4; -%% cancel -> 5; - internal_error -> 6 -%% flow_control_error -> 7; -%% stream_in_use -> 8; -%% stream_already_closed -> 9; -%% invalid_credentials -> 10; -%% frame_too_large -> 11 - end, - Transport:send(Socket, << 1:1, 3:15, 3:16, 0:8, 8:24, - 0:1, StreamID:31, StatusCode:32 >>). + Transport:send(Socket, cow_spdy:rst_stream(StreamID, Status)). goaway(#state{socket=Socket, transport=Transport, last_streamid=LastStreamID}, Status) -> - StatusCode = case Status of - ok -> 0; - protocol_error -> 1 -%% internal_error -> 2 - end, - Transport:send(Socket, << 1:1, 3:15, 7:16, 0:8, 8:24, - 0:1, LastStreamID:31, StatusCode:32 >>). + Transport:send(Socket, cow_spdy:goaway(LastStreamID, Status)). -data(#state{socket=Socket, transport=Transport}, fin, StreamID) -> - Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>). - -data(#state{socket=Socket, transport=Transport}, IsFin, StreamID, Data) -> - Flags = case IsFin of - fin -> 1; - nofin -> 0 - end, - Len = iolist_size(Data), - Transport:send(Socket, [ - << 0:1, StreamID:31, Flags:8, Len:24 >>, - Data]). +data(#state{socket=Socket, transport=Transport}, StreamID, IsFin, Data) -> + Transport:send(Socket, cow_spdy:data(StreamID, IsFin, Data)). data_from_file(#state{socket=Socket, transport=Transport}, StreamID, Filepath) -> @@ -454,12 +330,10 @@ data_from_file(#state{socket=Socket, transport=Transport}, data_from_file(Socket, Transport, StreamID, IoDevice) -> case file:read(IoDevice, 16#1fff) of eof -> - _ = Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>), + _ = Transport:send(Socket, cow_spdy:data(StreamID, true, <<>>)), ok; {ok, Data} -> - Len = byte_size(Data), - Data2 = [<< 0:1, StreamID:31, 0:8, Len:24 >>, Data], - case Transport:send(Socket, Data2) of + case Transport:send(Socket, cow_spdy:data(StreamID, false, Data)) of ok -> data_from_file(Socket, Transport, StreamID, IoDevice); {error, _} -> @@ -469,14 +343,12 @@ data_from_file(Socket, Transport, StreamID, IoDevice) -> %% Request process. -request_init(Parent, StreamID, Peer, - Headers, OnRequest, OnResponse, Env, Middlewares, - #special_headers{method=Method, path=Path, version=Version, - host=Host}) -> +request_init(FakeSocket, Peer, OnRequest, OnResponse, + Env, Middlewares, Method, Host, Path, Version, Headers) -> Version2 = parse_version(Version), {Host2, Port} = cowboy_protocol:parse_host(Host, <<>>), {Path2, Query} = parse_path(Path, <<>>), - Req = cowboy_req:new({Parent, StreamID}, ?MODULE, Peer, + Req = cowboy_req:new(FakeSocket, ?MODULE, Peer, Method, Path2, Query, Version2, Headers, Host2, Port, <<>>, true, false, OnResponse), case OnRequest of @@ -562,16 +434,23 @@ stream_close(Socket = {Pid, _}) -> ok. %% Internal transport functions. -%% @todo recv name() -> spdy. +recv(Socket = {Pid, _}, Length, Timeout) -> + _ = Pid ! {recv, Socket, self(), Length, Timeout}, + receive + {recv, Socket, Ret} -> + Ret + end. + send(Socket, Data) -> stream_data(Socket, Data). %% We don't wait for the result of the actual sendfile call, %% therefore we can't know how much was actually sent. +%% This isn't a problem as we don't use this value in Cowboy. sendfile(Socket = {Pid, _}, Filepath) -> _ = Pid ! {sendfile, Socket, Filepath}, {ok, undefined}. |