Loïc Hoguin <[email protected]>2013-09-02 19:14:28 +0200
committerLoïc Hoguin <[email protected]>2013-09-02 19:14:28 +0200
commit9eab26d8353f1546ad8209196c36e42d616f952e (patch)
tree29dfd120a023582f6814b320ff75ac778a0ea49e /src/cowboy_spdy.erl
parentd68b3de9d96dbe0fab56b78cdfeb699055446746 (diff)
Add request body support for SPDY
And various other improvements following the addition of two tests. New dependency cowlib that will gradually receive most of the parse code from SPDY but also HTTP and its headers.
1 files changed, 119 insertions, 240 deletions
diff --git a/src/cowboy_spdy.erl b/src/cowboy_spdy.erl
index cc4d867..425a422 100644
--- a/src/cowboy_spdy.erl
+++ b/src/cowboy_spdy.erl
@@ -32,7 +32,7 @@
%% Internal request process.
@@ -41,6 +41,7 @@
%% Internal transport functions.
@@ -48,6 +49,8 @@
streamid :: non_neg_integer(),
pid :: pid(),
input = nofin :: fin | nofin,
+ in_buffer = <<>> :: binary(),
+ is_recv = false :: {true, non_neg_integer()} | false,
output = nofin :: fin | nofin
@@ -67,19 +70,9 @@
children = [] :: [#child{}]
--record(special_headers, {
- method,
- path,
- version,
- host,
- scheme %% @todo We don't use it.
-type opts() :: [].
%% API.
%% @doc Start a SPDY protocol process.
@@ -108,41 +101,59 @@ init(Parent, Ref, Socket, Transport, Opts) ->
Env = [{listener, Ref}|get_value(env, Opts, [])],
OnRequest = get_value(onrequest, Opts, undefined),
OnResponse = get_value(onresponse, Opts, undefined),
- Zdef = zlib:open(),
- ok = zlib:deflateInit(Zdef),
- _ = zlib:deflateSetDictionary(Zdef, ?ZDICT),
- Zinf = zlib:open(),
- ok = zlib:inflateInit(Zinf),
+ Zdef = cow_spdy:deflate_init(),
+ Zinf = cow_spdy:inflate_init(),
ok = ranch:accept_ack(Ref),
loop(#state{parent=Parent, socket=Socket, transport=Transport,
middlewares=Middlewares, env=Env, onrequest=OnRequest,
onresponse=OnResponse, peer=Peer, zdef=Zdef, zinf=Zinf}).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
- buffer=Buffer, children=Children}) ->
+ buffer=Buffer, zinf=Zinf, children=Children}) ->
{OK, Closed, Error} = Transport:messages(),
Transport:setopts(Socket, [{active, once}]),
{OK, Socket, Data} ->
Data2 = << Buffer/binary, Data/binary >>,
- case Data2 of
- << _:40, Length:24, _/bits >>
- when byte_size(Data2) >= Length + 8 ->
- Length2 = Length + 8,
- << Frame:Length2/binary, Rest/bits >> = Data2,
- control_frame(State#state{buffer=Rest}, Frame);
- Rest ->
- loop(State#state{buffer=Rest})
+ case cow_spdy:split(Data2) of
+ {true, Frame, Rest} ->
+ P = cow_spdy:parse(Frame, Zinf),
+ handle_frame(State#state{buffer=Rest}, P);
+ false ->
+ loop(State#state{buffer=Data2})
{Closed, Socket} ->
{Error, Socket, _Reason} ->
+ %% @todo Timeout (send a message to self).
+ {recv, FromSocket = {Pid, StreamID}, FromPid, Length, _Timeout}
+ when Pid =:= self() ->
+ Child = #child{in_buffer=InBuffer, is_recv=false}
+ = lists:keyfind(StreamID, #child.streamid, Children),
+ if
+ Length =:= 0, InBuffer =/= <<>> ->
+ FromPid ! {recv, FromSocket, {ok, InBuffer}},
+ Children2 = lists:keyreplace(StreamID, #child.streamid,
+ Children, Child#child{in_buffer= <<>>}),
+ loop(State#state{children=Children2});
+ byte_size(InBuffer) >= Length ->
+ << Data:Length/binary, Rest/binary >> = InBuffer,
+ FromPid ! {recv, FromSocket, {ok, Data}},
+ Children2 = lists:keyreplace(StreamID, #child.streamid,
+ Children, Child#child{in_buffer=Rest}),
+ loop(State#state{children=Children2});
+ true ->
+ Children2 = lists:keyreplace(StreamID, #child.streamid,
+ Children, Child#child{is_recv=
+ {true, FromSocket, FromPid, Length}}),
+ loop(State#state{children=Children2})
+ end;
{reply, {Pid, StreamID}, Status, Headers}
when Pid =:= self() ->
Child = #child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- syn_reply(State, fin, StreamID, Status, Headers),
+ syn_reply(State, StreamID, true, Status, Headers),
Children2 = lists:keyreplace(StreamID,
#child.streamid, Children, Child#child{output=fin}),
@@ -150,8 +161,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
when Pid =:= self() ->
Child = #child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- syn_reply(State, nofin, StreamID, Status, Headers),
- data(State, fin, StreamID, Body),
+ syn_reply(State, StreamID, false, Status, Headers),
+ data(State, StreamID, true, Body),
Children2 = lists:keyreplace(StreamID,
#child.streamid, Children, Child#child{output=fin}),
@@ -159,19 +170,19 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
when Pid =:= self() ->
#child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- syn_reply(State, nofin, StreamID, Status, Headers),
+ syn_reply(State, StreamID, false, Status, Headers),
{stream_data, {Pid, StreamID}, Data}
when Pid =:= self() ->
#child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- data(State, nofin, StreamID, Data),
+ data(State, StreamID, false, Data),
{stream_close, {Pid, StreamID}}
when Pid =:= self() ->
Child = #child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- data(State, fin, StreamID),
+ data(State, StreamID, true, <<>>),
Children2 = lists:keyreplace(StreamID,
#child.streamid, Children, Child#child{output=fin}),
@@ -186,6 +197,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
{'EXIT', Parent, Reason} ->
{'EXIT', Pid, _} ->
+ %% @todo Report the error if any.
Children2 = lists:keydelete(Pid, #child.pid, Children),
{system, From, Request} ->
@@ -220,231 +232,95 @@ system_terminate(Reason, _, _, _) ->
system_code_change(Misc, _, _, _) ->
{ok, Misc}.
-%% We do not support SYN_STREAM with FLAG_UNIDIRECTIONAL set.
-control_frame(State, << 1:1, 3:15, 1:16, _:6, 1:1, _:26,
- StreamID:31, _/bits >>) ->
- rst_stream(State, StreamID, internal_error),
+%% FLAG_UNIDIRECTIONAL can only be set by the server.
+handle_frame(State, {syn_stream, StreamID, _, _, true,
+ _, _, _, _, _, _, _}) ->
+ rst_stream(State, StreamID, protocol_error),
-%% We do not support Associated-To-Stream-ID and CREDENTIAL Slot.
-control_frame(State, << 1:1, 3:15, 1:16, _:33, StreamID:31, _:1,
- AssocToStreamID:31, _:8, Slot:8, _/bits >>)
- when AssocToStreamID =/= 0; Slot =/= 0 ->
+%% We do not support Associated-To-Stream-ID.
+handle_frame(State, {syn_stream, StreamID, AssocToStreamID,
+ _, _, _, _, _, _, _, _, _}) when AssocToStreamID =/= 0 ->
rst_stream(State, StreamID, internal_error),
%% Erlang does not allow us to control the priority of processes
%% so we ignore that value entirely.
-control_frame(State=#state{middlewares=Middlewares, env=Env,
+handle_frame(State=#state{middlewares=Middlewares, env=Env,
onrequest=OnRequest, onresponse=OnResponse, peer=Peer,
- zinf=Zinf, children=Children},
- << 1:1, 3:15, 1:16, Flags:8, _:25, StreamID:31,
- _:32, _Priority:3, _:13, Rest/bits >>) ->
- IsFin = case Flags of
- 1 -> fin;
- 0 -> nofin
- end,
- [<< NbHeaders:32, Rest2/bits >>] = try
- zlib:inflate(Zinf, Rest)
- catch _:_ ->
- ok = zlib:inflateSetDictionary(Zinf, ?ZDICT),
- zlib:inflate(Zinf, <<>>)
- end,
- case syn_stream_headers(Rest2, NbHeaders, [], #special_headers{}) of
- {ok, Headers, Special} ->
- Pid = spawn_link(?MODULE, request_init,
- [self(), StreamID, Peer, Headers,
- OnRequest, OnResponse, Env, Middlewares, Special]),
- loop(State#state{last_streamid=StreamID,
- children=[#child{streamid=StreamID, pid=Pid,
- input=IsFin, output=nofin}|Children]});
- {error, badname} ->
- rst_stream(State, StreamID, protocol_error),
- loop(State#state{last_streamid=StreamID});
- {error, special} ->
- rst_stream(State, StreamID, protocol_error),
- loop(State#state{last_streamid=StreamID})
- end;
-control_frame(State, << 1:1, 3:15, 2:16, _/bits >>) ->
- error_logger:error_msg("Ignored SYN_REPLY control frame~n"),
- loop(State);
-control_frame(State, << 1:1, 3:15, 3:16, _Flags:8, _Length:24,
- _:1, _StreamID:31, StatusCode:32 >>) ->
- Status = case StatusCode of
- 1 -> protocol_error;
- 2 -> invalid_stream;
- 3 -> refused_stream;
- 4 -> unsupported_version;
- 5 -> cancel;
- 6 -> internal_error;
- 7 -> flow_control_error;
- 8 -> stream_in_use;
- 9 -> stream_already_closed;
- 10 -> invalid_credentials;
- 11 -> frame_too_large
- end,
- error_logger:error_msg("Received RST_STREAM control frame: ~p~n", [Status]),
+ children=Children}, {syn_stream, StreamID, _, IsFin,
+ _, _, Method, _, Host, Path, Version, Headers}) ->
+ Pid = spawn_link(?MODULE, request_init, [
+ {self(), StreamID}, Peer, OnRequest, OnResponse,
+ Env, Middlewares, Method, Host, Path, Version, Headers
+ ]),
+ IsFin2 = if IsFin -> fin; true -> nofin end,
+ loop(State#state{last_streamid=StreamID,
+ children=[#child{streamid=StreamID, pid=Pid,
+ input=IsFin2, output=nofin}|Children]});
+handle_frame(State, {rst_stream, StreamID, Status}) ->
+ error_logger:error_msg("Received RST_STREAM frame ~p ~p",
+ [StreamID, Status]),
%% @todo Stop StreamID.
-control_frame(State, << 1:1, 3:15, 4:16, 0:8, _:24,
- NbEntries:32, Rest/bits >>) ->
- Settings = [begin
- Name = case ID of
- 1 -> upload_bandwidth;
- 2 -> download_bandwidth;
- 3 -> round_trip_time;
- 4 -> max_concurrent_streams;
- 5 -> current_cwnd;
- 6 -> download_retrans_rate;
- 7 -> initial_window_size;
- 8 -> client_certificate_vector_size
- end,
- {Flags, Name, Value}
- end || << Flags:8, ID:24, Value:32 >> <= Rest],
- if
- NbEntries =/= length(Settings) ->
- goaway(State, protocol_error),
- terminate(State);
- true ->
- error_logger:error_msg("Ignored SETTINGS control frame: ~p~n",
- [Settings]),
- loop(State)
- end;
-%% PING initiated by the server; ignore, we don't send any
-control_frame(State, << 1:1, 3:15, 6:16, 0:8, 4:24, PingID:32 >>)
- when PingID rem 2 =:= 0 ->
+%% PING initiated by the server; ignore, we don't send any.
+handle_frame(State, {ping, PingID}) when PingID rem 2 =:= 0 ->
error_logger:error_msg("Ignored PING control frame: ~p~n", [PingID]),
-%% PING initiated by the client; send it back
-control_frame(State=#state{socket=Socket, transport=Transport},
- Data = << 1:1, 3:15, 6:16, 0:8, 4:24, _:32 >>) ->
- Transport:send(Socket, Data),
- loop(State);
-control_frame(State, << 1:1, 3:15, 7:16, _/bits >>) ->
- error_logger:error_msg("Ignored GOAWAY control frame~n"),
- loop(State);
-control_frame(State, << 1:1, 3:15, 8:16, _/bits >>) ->
- error_logger:error_msg("Ignored HEADERS control frame~n"),
- loop(State);
-control_frame(State, << 1:1, 3:15, 9:16, 0:8, _/bits >>) ->
- error_logger:error_msg("Ignored WINDOW_UPDATE control frame~n"),
+%% PING initiated by the client; send it back.
+handle_frame(State=#state{socket=Socket, transport=Transport},
+ {ping, PingID}) ->
+ Transport:send(Socket, cow_spdy:ping(PingID)),
-control_frame(State, << 1:1, 3:15, 10:16, _/bits >>) ->
- error_logger:error_msg("Ignored CREDENTIAL control frame~n"),
- loop(State);
-%% ???
-control_frame(State, _) ->
+%% Data received for a stream.
+ {data, StreamID, IsFin, Data}) ->
+ Child = #child{input=nofin, in_buffer=Buffer, is_recv=IsRecv}
+ = lists:keyfind(StreamID, #child.streamid, Children),
+ Data2 = << Buffer/binary, Data/binary >>,
+ IsFin2 = if IsFin -> fin; true -> nofin end,
+ Child2 = case IsRecv of
+ {true, FromSocket, FromPid, 0} ->
+ FromPid ! {recv, FromSocket, {ok, Data2}},
+ Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false};
+ {true, FromSocket, FromPid, Length} when byte_size(Data2) >= Length ->
+ << Data3:Length/binary, Rest/binary >> = Data2,
+ FromPid ! {recv, FromSocket, {ok, Data3}},
+ Child#child{input=IsFin2, in_buffer=Rest, is_recv=false};
+ _ ->
+ Child#child{input=IsFin2, in_buffer=Data2}
+ end,
+ Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child2),
+ loop(State#state{children=Children2});
+%% General error, can't recover.
+handle_frame(State, {error, badprotocol}) ->
goaway(State, protocol_error),
- terminate(State).
+ terminate(State);
+%% Ignore all other frames for now.
+handle_frame(State, Frame) ->
+ error_logger:error_msg("Ignored frame ~p", [Frame]),
+ loop(State).
%% @todo We must wait for the children to finish here,
%% but only up to N milliseconds. Then we shutdown.
terminate(_State) ->
-syn_stream_headers(<<>>, 0, Acc, Special=#special_headers{
- method=Method, path=Path, version=Version, host=Host, scheme=Scheme}) ->
- if
- Method =:= undefined; Path =:= undefined; Version =:= undefined;
- Host =:= undefined; Scheme =:= undefined ->
- {error, special};
- true ->
- {ok, lists:reverse(Acc), Special}
- end;
-syn_stream_headers(<< 0:32, _Rest/bits >>, _NbHeaders, _Acc, _Special) ->
- {error, badname};
-syn_stream_headers(<< NameLen:32, Rest/bits >>, NbHeaders, Acc, Special) ->
- << Name:NameLen/binary, ValueLen:32, Rest2/bits >> = Rest,
- << Value:ValueLen/binary, Rest3/bits >> = Rest2,
- case Name of
- <<":host">> ->
- syn_stream_headers(Rest3, NbHeaders - 1,
- [{<<"host">>, Value}|Acc],
- Special#special_headers{host=Value});
- <<":method">> ->
- syn_stream_headers(Rest3, NbHeaders - 1, Acc,
- Special#special_headers{method=Value});
- <<":path">> ->
- syn_stream_headers(Rest3, NbHeaders - 1, Acc,
- Special#special_headers{path=Value});
- <<":version">> ->
- syn_stream_headers(Rest3, NbHeaders - 1, Acc,
- Special#special_headers{version=Value});
- <<":scheme">> ->
- syn_stream_headers(Rest3, NbHeaders - 1, Acc,
- Special#special_headers{scheme=Value});
- _ ->
- syn_stream_headers(Rest3, NbHeaders - 1,
- [{Name, Value}|Acc], Special)
- end.
syn_reply(#state{socket=Socket, transport=Transport, zdef=Zdef},
- IsFin, StreamID, Status, Headers) ->
- Headers2 = [{<<":status">>, Status},
- {<<":version">>, <<"HTTP/1.1">>}|Headers],
- NbHeaders = length(Headers2),
- HeaderBlock = [begin
- NameLen = byte_size(Name),
- ValueLen = iolist_size(Value),
- [<< NameLen:32, Name/binary, ValueLen:32 >>, Value]
- end || {Name, Value} <- Headers2],
- HeaderBlock2 = [<< NbHeaders:32 >>, HeaderBlock],
- HeaderBlock3 = zlib:deflate(Zdef, HeaderBlock2, full),
- Flags = case IsFin of
- fin -> 1;
- nofin -> 0
- end,
- Len = 4 + iolist_size(HeaderBlock3),
- Transport:send(Socket, [
- << 1:1, 3:15, 2:16, Flags:8, Len:24, 0:1, StreamID:31 >>,
- HeaderBlock3]).
+ StreamID, IsFin, Status, Headers) ->
+ Transport:send(Socket, cow_spdy:syn_reply(Zdef, StreamID, IsFin,
+ Status, <<"HTTP/1.1">>, Headers)).
rst_stream(#state{socket=Socket, transport=Transport}, StreamID, Status) ->
- StatusCode = case Status of
- protocol_error -> 1;
-%% invalid_stream -> 2;
-%% refused_stream -> 3;
-%% unsupported_version -> 4;
-%% cancel -> 5;
- internal_error -> 6
-%% flow_control_error -> 7;
-%% stream_in_use -> 8;
-%% stream_already_closed -> 9;
-%% invalid_credentials -> 10;
-%% frame_too_large -> 11
- end,
- Transport:send(Socket, << 1:1, 3:15, 3:16, 0:8, 8:24,
- 0:1, StreamID:31, StatusCode:32 >>).
+ Transport:send(Socket, cow_spdy:rst_stream(StreamID, Status)).
goaway(#state{socket=Socket, transport=Transport, last_streamid=LastStreamID},
Status) ->
- StatusCode = case Status of
- ok -> 0;
- protocol_error -> 1
-%% internal_error -> 2
- end,
- Transport:send(Socket, << 1:1, 3:15, 7:16, 0:8, 8:24,
- 0:1, LastStreamID:31, StatusCode:32 >>).
+ Transport:send(Socket, cow_spdy:goaway(LastStreamID, Status)).
-data(#state{socket=Socket, transport=Transport}, fin, StreamID) ->
- Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>).
-data(#state{socket=Socket, transport=Transport}, IsFin, StreamID, Data) ->
- Flags = case IsFin of
- fin -> 1;
- nofin -> 0
- end,
- Len = iolist_size(Data),
- Transport:send(Socket, [
- << 0:1, StreamID:31, Flags:8, Len:24 >>,
- Data]).
+data(#state{socket=Socket, transport=Transport}, StreamID, IsFin, Data) ->
+ Transport:send(Socket, cow_spdy:data(StreamID, IsFin, Data)).
data_from_file(#state{socket=Socket, transport=Transport},
StreamID, Filepath) ->
@@ -454,12 +330,10 @@ data_from_file(#state{socket=Socket, transport=Transport},
data_from_file(Socket, Transport, StreamID, IoDevice) ->
case file:read(IoDevice, 16#1fff) of
eof ->
- _ = Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>),
+ _ = Transport:send(Socket, cow_spdy:data(StreamID, true, <<>>)),
{ok, Data} ->
- Len = byte_size(Data),
- Data2 = [<< 0:1, StreamID:31, 0:8, Len:24 >>, Data],
- case Transport:send(Socket, Data2) of
+ case Transport:send(Socket, cow_spdy:data(StreamID, false, Data)) of
ok ->
data_from_file(Socket, Transport, StreamID, IoDevice);
{error, _} ->
@@ -469,14 +343,12 @@ data_from_file(Socket, Transport, StreamID, IoDevice) ->
%% Request process.
-request_init(Parent, StreamID, Peer,
- Headers, OnRequest, OnResponse, Env, Middlewares,
- #special_headers{method=Method, path=Path, version=Version,
- host=Host}) ->
+request_init(FakeSocket, Peer, OnRequest, OnResponse,
+ Env, Middlewares, Method, Host, Path, Version, Headers) ->
Version2 = parse_version(Version),
{Host2, Port} = cowboy_protocol:parse_host(Host, <<>>),
{Path2, Query} = parse_path(Path, <<>>),
- Req = cowboy_req:new({Parent, StreamID}, ?MODULE, Peer,
+ Req = cowboy_req:new(FakeSocket, ?MODULE, Peer,
Method, Path2, Query, Version2, Headers,
Host2, Port, <<>>, true, false, OnResponse),
case OnRequest of
@@ -562,16 +434,23 @@ stream_close(Socket = {Pid, _}) ->
%% Internal transport functions.
-%% @todo recv
name() ->
+recv(Socket = {Pid, _}, Length, Timeout) ->
+ _ = Pid ! {recv, Socket, self(), Length, Timeout},
+ receive
+ {recv, Socket, Ret} ->
+ Ret
+ end.
send(Socket, Data) ->
stream_data(Socket, Data).
%% We don't wait for the result of the actual sendfile call,
%% therefore we can't know how much was actually sent.
+%% This isn't a problem as we don't use this value in Cowboy.
sendfile(Socket = {Pid, _}, Filepath) ->
_ = Pid ! {sendfile, Socket, Filepath},
{ok, undefined}.