diff options
Diffstat (limited to 'src/cowboy_spdy.erl')
-rw-r--r-- | src/cowboy_spdy.erl | 359 |
1 files changed, 119 insertions, 240 deletions
diff --git a/src/cowboy_spdy.erl b/src/cowboy_spdy.erl index cc4d867..425a422 100644 --- a/src/cowboy_spdy.erl +++ b/src/cowboy_spdy.erl @@ -32,7 +32,7 @@ -export([system_code_change/4]). %% Internal request process. --export([request_init/9]). +-export([request_init/11]). -export([resume/5]). -export([reply/4]). -export([stream_reply/3]). @@ -41,6 +41,7 @@ %% Internal transport functions. -export([name/0]). +-export([recv/3]). -export([send/2]). -export([sendfile/2]). @@ -48,6 +49,8 @@ streamid :: non_neg_integer(), pid :: pid(), input = nofin :: fin | nofin, + in_buffer = <<>> :: binary(), + is_recv = false :: {true, non_neg_integer()} | false, output = nofin :: fin | nofin }). @@ -67,19 +70,9 @@ children = [] :: [#child{}] }). --record(special_headers, { - method, - path, - version, - host, - scheme %% @todo We don't use it. -}). - -type opts() :: []. -export_type([opts/0]). --include("cowboy_spdy.hrl"). - %% API. %% @doc Start a SPDY protocol process. @@ -108,41 +101,59 @@ init(Parent, Ref, Socket, Transport, Opts) -> Env = [{listener, Ref}|get_value(env, Opts, [])], OnRequest = get_value(onrequest, Opts, undefined), OnResponse = get_value(onresponse, Opts, undefined), - Zdef = zlib:open(), - ok = zlib:deflateInit(Zdef), - _ = zlib:deflateSetDictionary(Zdef, ?ZDICT), - Zinf = zlib:open(), - ok = zlib:inflateInit(Zinf), + Zdef = cow_spdy:deflate_init(), + Zinf = cow_spdy:inflate_init(), ok = ranch:accept_ack(Ref), loop(#state{parent=Parent, socket=Socket, transport=Transport, middlewares=Middlewares, env=Env, onrequest=OnRequest, onresponse=OnResponse, peer=Peer, zdef=Zdef, zinf=Zinf}). loop(State=#state{parent=Parent, socket=Socket, transport=Transport, - buffer=Buffer, children=Children}) -> + buffer=Buffer, zinf=Zinf, children=Children}) -> {OK, Closed, Error} = Transport:messages(), Transport:setopts(Socket, [{active, once}]), receive {OK, Socket, Data} -> Data2 = << Buffer/binary, Data/binary >>, - case Data2 of - << _:40, Length:24, _/bits >> - when byte_size(Data2) >= Length + 8 -> - Length2 = Length + 8, - << Frame:Length2/binary, Rest/bits >> = Data2, - control_frame(State#state{buffer=Rest}, Frame); - Rest -> - loop(State#state{buffer=Rest}) + case cow_spdy:split(Data2) of + {true, Frame, Rest} -> + P = cow_spdy:parse(Frame, Zinf), + handle_frame(State#state{buffer=Rest}, P); + false -> + loop(State#state{buffer=Data2}) end; {Closed, Socket} -> terminate(State); {Error, Socket, _Reason} -> terminate(State); + %% @todo Timeout (send a message to self). + {recv, FromSocket = {Pid, StreamID}, FromPid, Length, _Timeout} + when Pid =:= self() -> + Child = #child{in_buffer=InBuffer, is_recv=false} + = lists:keyfind(StreamID, #child.streamid, Children), + if + Length =:= 0, InBuffer =/= <<>> -> + FromPid ! {recv, FromSocket, {ok, InBuffer}}, + Children2 = lists:keyreplace(StreamID, #child.streamid, + Children, Child#child{in_buffer= <<>>}), + loop(State#state{children=Children2}); + byte_size(InBuffer) >= Length -> + << Data:Length/binary, Rest/binary >> = InBuffer, + FromPid ! {recv, FromSocket, {ok, Data}}, + Children2 = lists:keyreplace(StreamID, #child.streamid, + Children, Child#child{in_buffer=Rest}), + loop(State#state{children=Children2}); + true -> + Children2 = lists:keyreplace(StreamID, #child.streamid, + Children, Child#child{is_recv= + {true, FromSocket, FromPid, Length}}), + loop(State#state{children=Children2}) + end; {reply, {Pid, StreamID}, Status, Headers} when Pid =:= self() -> Child = #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - syn_reply(State, fin, StreamID, Status, Headers), + syn_reply(State, StreamID, true, Status, Headers), Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child#child{output=fin}), loop(State#state{children=Children2}); @@ -150,8 +161,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, when Pid =:= self() -> Child = #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - syn_reply(State, nofin, StreamID, Status, Headers), - data(State, fin, StreamID, Body), + syn_reply(State, StreamID, false, Status, Headers), + data(State, StreamID, true, Body), Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child#child{output=fin}), loop(State#state{children=Children2}); @@ -159,19 +170,19 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, when Pid =:= self() -> #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - syn_reply(State, nofin, StreamID, Status, Headers), + syn_reply(State, StreamID, false, Status, Headers), loop(State); {stream_data, {Pid, StreamID}, Data} when Pid =:= self() -> #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - data(State, nofin, StreamID, Data), + data(State, StreamID, false, Data), loop(State); {stream_close, {Pid, StreamID}} when Pid =:= self() -> Child = #child{output=nofin} = lists:keyfind(StreamID, #child.streamid, Children), - data(State, fin, StreamID), + data(State, StreamID, true, <<>>), Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child#child{output=fin}), loop(State#state{children=Children2}); @@ -186,6 +197,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, {'EXIT', Parent, Reason} -> exit(Reason); {'EXIT', Pid, _} -> + %% @todo Report the error if any. Children2 = lists:keydelete(Pid, #child.pid, Children), loop(State#state{children=Children2}); {system, From, Request} -> @@ -220,231 +232,95 @@ system_terminate(Reason, _, _, _) -> system_code_change(Misc, _, _, _) -> {ok, Misc}. -%% We do not support SYN_STREAM with FLAG_UNIDIRECTIONAL set. -control_frame(State, << 1:1, 3:15, 1:16, _:6, 1:1, _:26, - StreamID:31, _/bits >>) -> - rst_stream(State, StreamID, internal_error), +%% FLAG_UNIDIRECTIONAL can only be set by the server. +handle_frame(State, {syn_stream, StreamID, _, _, true, + _, _, _, _, _, _, _}) -> + rst_stream(State, StreamID, protocol_error), loop(State); -%% We do not support Associated-To-Stream-ID and CREDENTIAL Slot. -control_frame(State, << 1:1, 3:15, 1:16, _:33, StreamID:31, _:1, - AssocToStreamID:31, _:8, Slot:8, _/bits >>) - when AssocToStreamID =/= 0; Slot =/= 0 -> +%% We do not support Associated-To-Stream-ID. +handle_frame(State, {syn_stream, StreamID, AssocToStreamID, + _, _, _, _, _, _, _, _, _}) when AssocToStreamID =/= 0 -> rst_stream(State, StreamID, internal_error), loop(State); -%% SYN_STREAM +%% SYN_STREAM. %% %% Erlang does not allow us to control the priority of processes %% so we ignore that value entirely. -control_frame(State=#state{middlewares=Middlewares, env=Env, +handle_frame(State=#state{middlewares=Middlewares, env=Env, onrequest=OnRequest, onresponse=OnResponse, peer=Peer, - zinf=Zinf, children=Children}, - << 1:1, 3:15, 1:16, Flags:8, _:25, StreamID:31, - _:32, _Priority:3, _:13, Rest/bits >>) -> - IsFin = case Flags of - 1 -> fin; - 0 -> nofin - end, - [<< NbHeaders:32, Rest2/bits >>] = try - zlib:inflate(Zinf, Rest) - catch _:_ -> - ok = zlib:inflateSetDictionary(Zinf, ?ZDICT), - zlib:inflate(Zinf, <<>>) - end, - case syn_stream_headers(Rest2, NbHeaders, [], #special_headers{}) of - {ok, Headers, Special} -> - Pid = spawn_link(?MODULE, request_init, - [self(), StreamID, Peer, Headers, - OnRequest, OnResponse, Env, Middlewares, Special]), - loop(State#state{last_streamid=StreamID, - children=[#child{streamid=StreamID, pid=Pid, - input=IsFin, output=nofin}|Children]}); - {error, badname} -> - rst_stream(State, StreamID, protocol_error), - loop(State#state{last_streamid=StreamID}); - {error, special} -> - rst_stream(State, StreamID, protocol_error), - loop(State#state{last_streamid=StreamID}) - end; -%% SYN_REPLY -control_frame(State, << 1:1, 3:15, 2:16, _/bits >>) -> - error_logger:error_msg("Ignored SYN_REPLY control frame~n"), - loop(State); -%% RST_STREAM -control_frame(State, << 1:1, 3:15, 3:16, _Flags:8, _Length:24, - _:1, _StreamID:31, StatusCode:32 >>) -> - Status = case StatusCode of - 1 -> protocol_error; - 2 -> invalid_stream; - 3 -> refused_stream; - 4 -> unsupported_version; - 5 -> cancel; - 6 -> internal_error; - 7 -> flow_control_error; - 8 -> stream_in_use; - 9 -> stream_already_closed; - 10 -> invalid_credentials; - 11 -> frame_too_large - end, - error_logger:error_msg("Received RST_STREAM control frame: ~p~n", [Status]), + children=Children}, {syn_stream, StreamID, _, IsFin, + _, _, Method, _, Host, Path, Version, Headers}) -> + Pid = spawn_link(?MODULE, request_init, [ + {self(), StreamID}, Peer, OnRequest, OnResponse, + Env, Middlewares, Method, Host, Path, Version, Headers + ]), + IsFin2 = if IsFin -> fin; true -> nofin end, + loop(State#state{last_streamid=StreamID, + children=[#child{streamid=StreamID, pid=Pid, + input=IsFin2, output=nofin}|Children]}); +%% RST_STREAM. +handle_frame(State, {rst_stream, StreamID, Status}) -> + error_logger:error_msg("Received RST_STREAM frame ~p ~p", + [StreamID, Status]), %% @todo Stop StreamID. loop(State); -%% SETTINGS -control_frame(State, << 1:1, 3:15, 4:16, 0:8, _:24, - NbEntries:32, Rest/bits >>) -> - Settings = [begin - Name = case ID of - 1 -> upload_bandwidth; - 2 -> download_bandwidth; - 3 -> round_trip_time; - 4 -> max_concurrent_streams; - 5 -> current_cwnd; - 6 -> download_retrans_rate; - 7 -> initial_window_size; - 8 -> client_certificate_vector_size - end, - {Flags, Name, Value} - end || << Flags:8, ID:24, Value:32 >> <= Rest], - if - NbEntries =/= length(Settings) -> - goaway(State, protocol_error), - terminate(State); - true -> - error_logger:error_msg("Ignored SETTINGS control frame: ~p~n", - [Settings]), - loop(State) - end; -%% PING initiated by the server; ignore, we don't send any -control_frame(State, << 1:1, 3:15, 6:16, 0:8, 4:24, PingID:32 >>) - when PingID rem 2 =:= 0 -> +%% PING initiated by the server; ignore, we don't send any. +handle_frame(State, {ping, PingID}) when PingID rem 2 =:= 0 -> error_logger:error_msg("Ignored PING control frame: ~p~n", [PingID]), loop(State); -%% PING initiated by the client; send it back -control_frame(State=#state{socket=Socket, transport=Transport}, - Data = << 1:1, 3:15, 6:16, 0:8, 4:24, _:32 >>) -> - Transport:send(Socket, Data), - loop(State); -%% GOAWAY -control_frame(State, << 1:1, 3:15, 7:16, _/bits >>) -> - error_logger:error_msg("Ignored GOAWAY control frame~n"), - loop(State); -%% HEADERS -control_frame(State, << 1:1, 3:15, 8:16, _/bits >>) -> - error_logger:error_msg("Ignored HEADERS control frame~n"), - loop(State); -%% WINDOW_UPDATE -control_frame(State, << 1:1, 3:15, 9:16, 0:8, _/bits >>) -> - error_logger:error_msg("Ignored WINDOW_UPDATE control frame~n"), +%% PING initiated by the client; send it back. +handle_frame(State=#state{socket=Socket, transport=Transport}, + {ping, PingID}) -> + Transport:send(Socket, cow_spdy:ping(PingID)), loop(State); -%% CREDENTIAL -control_frame(State, << 1:1, 3:15, 10:16, _/bits >>) -> - error_logger:error_msg("Ignored CREDENTIAL control frame~n"), - loop(State); -%% ??? -control_frame(State, _) -> +%% Data received for a stream. +handle_frame(State=#state{children=Children}, + {data, StreamID, IsFin, Data}) -> + Child = #child{input=nofin, in_buffer=Buffer, is_recv=IsRecv} + = lists:keyfind(StreamID, #child.streamid, Children), + Data2 = << Buffer/binary, Data/binary >>, + IsFin2 = if IsFin -> fin; true -> nofin end, + Child2 = case IsRecv of + {true, FromSocket, FromPid, 0} -> + FromPid ! {recv, FromSocket, {ok, Data2}}, + Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false}; + {true, FromSocket, FromPid, Length} when byte_size(Data2) >= Length -> + << Data3:Length/binary, Rest/binary >> = Data2, + FromPid ! {recv, FromSocket, {ok, Data3}}, + Child#child{input=IsFin2, in_buffer=Rest, is_recv=false}; + _ -> + Child#child{input=IsFin2, in_buffer=Data2} + end, + Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child2), + loop(State#state{children=Children2}); +%% General error, can't recover. +handle_frame(State, {error, badprotocol}) -> goaway(State, protocol_error), - terminate(State). + terminate(State); +%% Ignore all other frames for now. +handle_frame(State, Frame) -> + error_logger:error_msg("Ignored frame ~p", [Frame]), + loop(State). %% @todo We must wait for the children to finish here, %% but only up to N milliseconds. Then we shutdown. terminate(_State) -> ok. -syn_stream_headers(<<>>, 0, Acc, Special=#special_headers{ - method=Method, path=Path, version=Version, host=Host, scheme=Scheme}) -> - if - Method =:= undefined; Path =:= undefined; Version =:= undefined; - Host =:= undefined; Scheme =:= undefined -> - {error, special}; - true -> - {ok, lists:reverse(Acc), Special} - end; -syn_stream_headers(<< 0:32, _Rest/bits >>, _NbHeaders, _Acc, _Special) -> - {error, badname}; -syn_stream_headers(<< NameLen:32, Rest/bits >>, NbHeaders, Acc, Special) -> - << Name:NameLen/binary, ValueLen:32, Rest2/bits >> = Rest, - << Value:ValueLen/binary, Rest3/bits >> = Rest2, - case Name of - <<":host">> -> - syn_stream_headers(Rest3, NbHeaders - 1, - [{<<"host">>, Value}|Acc], - Special#special_headers{host=Value}); - <<":method">> -> - syn_stream_headers(Rest3, NbHeaders - 1, Acc, - Special#special_headers{method=Value}); - <<":path">> -> - syn_stream_headers(Rest3, NbHeaders - 1, Acc, - Special#special_headers{path=Value}); - <<":version">> -> - syn_stream_headers(Rest3, NbHeaders - 1, Acc, - Special#special_headers{version=Value}); - <<":scheme">> -> - syn_stream_headers(Rest3, NbHeaders - 1, Acc, - Special#special_headers{scheme=Value}); - _ -> - syn_stream_headers(Rest3, NbHeaders - 1, - [{Name, Value}|Acc], Special) - end. - syn_reply(#state{socket=Socket, transport=Transport, zdef=Zdef}, - IsFin, StreamID, Status, Headers) -> - Headers2 = [{<<":status">>, Status}, - {<<":version">>, <<"HTTP/1.1">>}|Headers], - NbHeaders = length(Headers2), - HeaderBlock = [begin - NameLen = byte_size(Name), - ValueLen = iolist_size(Value), - [<< NameLen:32, Name/binary, ValueLen:32 >>, Value] - end || {Name, Value} <- Headers2], - HeaderBlock2 = [<< NbHeaders:32 >>, HeaderBlock], - HeaderBlock3 = zlib:deflate(Zdef, HeaderBlock2, full), - Flags = case IsFin of - fin -> 1; - nofin -> 0 - end, - Len = 4 + iolist_size(HeaderBlock3), - Transport:send(Socket, [ - << 1:1, 3:15, 2:16, Flags:8, Len:24, 0:1, StreamID:31 >>, - HeaderBlock3]). + StreamID, IsFin, Status, Headers) -> + Transport:send(Socket, cow_spdy:syn_reply(Zdef, StreamID, IsFin, + Status, <<"HTTP/1.1">>, Headers)). rst_stream(#state{socket=Socket, transport=Transport}, StreamID, Status) -> - StatusCode = case Status of - protocol_error -> 1; -%% invalid_stream -> 2; -%% refused_stream -> 3; -%% unsupported_version -> 4; -%% cancel -> 5; - internal_error -> 6 -%% flow_control_error -> 7; -%% stream_in_use -> 8; -%% stream_already_closed -> 9; -%% invalid_credentials -> 10; -%% frame_too_large -> 11 - end, - Transport:send(Socket, << 1:1, 3:15, 3:16, 0:8, 8:24, - 0:1, StreamID:31, StatusCode:32 >>). + Transport:send(Socket, cow_spdy:rst_stream(StreamID, Status)). goaway(#state{socket=Socket, transport=Transport, last_streamid=LastStreamID}, Status) -> - StatusCode = case Status of - ok -> 0; - protocol_error -> 1 -%% internal_error -> 2 - end, - Transport:send(Socket, << 1:1, 3:15, 7:16, 0:8, 8:24, - 0:1, LastStreamID:31, StatusCode:32 >>). + Transport:send(Socket, cow_spdy:goaway(LastStreamID, Status)). -data(#state{socket=Socket, transport=Transport}, fin, StreamID) -> - Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>). - -data(#state{socket=Socket, transport=Transport}, IsFin, StreamID, Data) -> - Flags = case IsFin of - fin -> 1; - nofin -> 0 - end, - Len = iolist_size(Data), - Transport:send(Socket, [ - << 0:1, StreamID:31, Flags:8, Len:24 >>, - Data]). +data(#state{socket=Socket, transport=Transport}, StreamID, IsFin, Data) -> + Transport:send(Socket, cow_spdy:data(StreamID, IsFin, Data)). data_from_file(#state{socket=Socket, transport=Transport}, StreamID, Filepath) -> @@ -454,12 +330,10 @@ data_from_file(#state{socket=Socket, transport=Transport}, data_from_file(Socket, Transport, StreamID, IoDevice) -> case file:read(IoDevice, 16#1fff) of eof -> - _ = Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>), + _ = Transport:send(Socket, cow_spdy:data(StreamID, true, <<>>)), ok; {ok, Data} -> - Len = byte_size(Data), - Data2 = [<< 0:1, StreamID:31, 0:8, Len:24 >>, Data], - case Transport:send(Socket, Data2) of + case Transport:send(Socket, cow_spdy:data(StreamID, false, Data)) of ok -> data_from_file(Socket, Transport, StreamID, IoDevice); {error, _} -> @@ -469,14 +343,12 @@ data_from_file(Socket, Transport, StreamID, IoDevice) -> %% Request process. -request_init(Parent, StreamID, Peer, - Headers, OnRequest, OnResponse, Env, Middlewares, - #special_headers{method=Method, path=Path, version=Version, - host=Host}) -> +request_init(FakeSocket, Peer, OnRequest, OnResponse, + Env, Middlewares, Method, Host, Path, Version, Headers) -> Version2 = parse_version(Version), {Host2, Port} = cowboy_protocol:parse_host(Host, <<>>), {Path2, Query} = parse_path(Path, <<>>), - Req = cowboy_req:new({Parent, StreamID}, ?MODULE, Peer, + Req = cowboy_req:new(FakeSocket, ?MODULE, Peer, Method, Path2, Query, Version2, Headers, Host2, Port, <<>>, true, false, OnResponse), case OnRequest of @@ -562,16 +434,23 @@ stream_close(Socket = {Pid, _}) -> ok. %% Internal transport functions. -%% @todo recv name() -> spdy. +recv(Socket = {Pid, _}, Length, Timeout) -> + _ = Pid ! {recv, Socket, self(), Length, Timeout}, + receive + {recv, Socket, Ret} -> + Ret + end. + send(Socket, Data) -> stream_data(Socket, Data). %% We don't wait for the result of the actual sendfile call, %% therefore we can't know how much was actually sent. +%% This isn't a problem as we don't use this value in Cowboy. sendfile(Socket = {Pid, _}, Filepath) -> _ = Pid ! {sendfile, Socket, Filepath}, {ok, undefined}. |