aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_spdy.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_spdy.erl')
-rw-r--r--src/cowboy_spdy.erl359
1 files changed, 119 insertions, 240 deletions
diff --git a/src/cowboy_spdy.erl b/src/cowboy_spdy.erl
index cc4d867..425a422 100644
--- a/src/cowboy_spdy.erl
+++ b/src/cowboy_spdy.erl
@@ -32,7 +32,7 @@
-export([system_code_change/4]).
%% Internal request process.
--export([request_init/9]).
+-export([request_init/11]).
-export([resume/5]).
-export([reply/4]).
-export([stream_reply/3]).
@@ -41,6 +41,7 @@
%% Internal transport functions.
-export([name/0]).
+-export([recv/3]).
-export([send/2]).
-export([sendfile/2]).
@@ -48,6 +49,8 @@
streamid :: non_neg_integer(),
pid :: pid(),
input = nofin :: fin | nofin,
+ in_buffer = <<>> :: binary(),
+ is_recv = false :: {true, non_neg_integer()} | false,
output = nofin :: fin | nofin
}).
@@ -67,19 +70,9 @@
children = [] :: [#child{}]
}).
--record(special_headers, {
- method,
- path,
- version,
- host,
- scheme %% @todo We don't use it.
-}).
-
-type opts() :: [].
-export_type([opts/0]).
--include("cowboy_spdy.hrl").
-
%% API.
%% @doc Start a SPDY protocol process.
@@ -108,41 +101,59 @@ init(Parent, Ref, Socket, Transport, Opts) ->
Env = [{listener, Ref}|get_value(env, Opts, [])],
OnRequest = get_value(onrequest, Opts, undefined),
OnResponse = get_value(onresponse, Opts, undefined),
- Zdef = zlib:open(),
- ok = zlib:deflateInit(Zdef),
- _ = zlib:deflateSetDictionary(Zdef, ?ZDICT),
- Zinf = zlib:open(),
- ok = zlib:inflateInit(Zinf),
+ Zdef = cow_spdy:deflate_init(),
+ Zinf = cow_spdy:inflate_init(),
ok = ranch:accept_ack(Ref),
loop(#state{parent=Parent, socket=Socket, transport=Transport,
middlewares=Middlewares, env=Env, onrequest=OnRequest,
onresponse=OnResponse, peer=Peer, zdef=Zdef, zinf=Zinf}).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
- buffer=Buffer, children=Children}) ->
+ buffer=Buffer, zinf=Zinf, children=Children}) ->
{OK, Closed, Error} = Transport:messages(),
Transport:setopts(Socket, [{active, once}]),
receive
{OK, Socket, Data} ->
Data2 = << Buffer/binary, Data/binary >>,
- case Data2 of
- << _:40, Length:24, _/bits >>
- when byte_size(Data2) >= Length + 8 ->
- Length2 = Length + 8,
- << Frame:Length2/binary, Rest/bits >> = Data2,
- control_frame(State#state{buffer=Rest}, Frame);
- Rest ->
- loop(State#state{buffer=Rest})
+ case cow_spdy:split(Data2) of
+ {true, Frame, Rest} ->
+ P = cow_spdy:parse(Frame, Zinf),
+ handle_frame(State#state{buffer=Rest}, P);
+ false ->
+ loop(State#state{buffer=Data2})
end;
{Closed, Socket} ->
terminate(State);
{Error, Socket, _Reason} ->
terminate(State);
+ %% @todo Timeout (send a message to self).
+ {recv, FromSocket = {Pid, StreamID}, FromPid, Length, _Timeout}
+ when Pid =:= self() ->
+ Child = #child{in_buffer=InBuffer, is_recv=false}
+ = lists:keyfind(StreamID, #child.streamid, Children),
+ if
+ Length =:= 0, InBuffer =/= <<>> ->
+ FromPid ! {recv, FromSocket, {ok, InBuffer}},
+ Children2 = lists:keyreplace(StreamID, #child.streamid,
+ Children, Child#child{in_buffer= <<>>}),
+ loop(State#state{children=Children2});
+ byte_size(InBuffer) >= Length ->
+ << Data:Length/binary, Rest/binary >> = InBuffer,
+ FromPid ! {recv, FromSocket, {ok, Data}},
+ Children2 = lists:keyreplace(StreamID, #child.streamid,
+ Children, Child#child{in_buffer=Rest}),
+ loop(State#state{children=Children2});
+ true ->
+ Children2 = lists:keyreplace(StreamID, #child.streamid,
+ Children, Child#child{is_recv=
+ {true, FromSocket, FromPid, Length}}),
+ loop(State#state{children=Children2})
+ end;
{reply, {Pid, StreamID}, Status, Headers}
when Pid =:= self() ->
Child = #child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- syn_reply(State, fin, StreamID, Status, Headers),
+ syn_reply(State, StreamID, true, Status, Headers),
Children2 = lists:keyreplace(StreamID,
#child.streamid, Children, Child#child{output=fin}),
loop(State#state{children=Children2});
@@ -150,8 +161,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
when Pid =:= self() ->
Child = #child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- syn_reply(State, nofin, StreamID, Status, Headers),
- data(State, fin, StreamID, Body),
+ syn_reply(State, StreamID, false, Status, Headers),
+ data(State, StreamID, true, Body),
Children2 = lists:keyreplace(StreamID,
#child.streamid, Children, Child#child{output=fin}),
loop(State#state{children=Children2});
@@ -159,19 +170,19 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
when Pid =:= self() ->
#child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- syn_reply(State, nofin, StreamID, Status, Headers),
+ syn_reply(State, StreamID, false, Status, Headers),
loop(State);
{stream_data, {Pid, StreamID}, Data}
when Pid =:= self() ->
#child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- data(State, nofin, StreamID, Data),
+ data(State, StreamID, false, Data),
loop(State);
{stream_close, {Pid, StreamID}}
when Pid =:= self() ->
Child = #child{output=nofin} = lists:keyfind(StreamID,
#child.streamid, Children),
- data(State, fin, StreamID),
+ data(State, StreamID, true, <<>>),
Children2 = lists:keyreplace(StreamID,
#child.streamid, Children, Child#child{output=fin}),
loop(State#state{children=Children2});
@@ -186,6 +197,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
{'EXIT', Parent, Reason} ->
exit(Reason);
{'EXIT', Pid, _} ->
+ %% @todo Report the error if any.
Children2 = lists:keydelete(Pid, #child.pid, Children),
loop(State#state{children=Children2});
{system, From, Request} ->
@@ -220,231 +232,95 @@ system_terminate(Reason, _, _, _) ->
system_code_change(Misc, _, _, _) ->
{ok, Misc}.
-%% We do not support SYN_STREAM with FLAG_UNIDIRECTIONAL set.
-control_frame(State, << 1:1, 3:15, 1:16, _:6, 1:1, _:26,
- StreamID:31, _/bits >>) ->
- rst_stream(State, StreamID, internal_error),
+%% FLAG_UNIDIRECTIONAL can only be set by the server.
+handle_frame(State, {syn_stream, StreamID, _, _, true,
+ _, _, _, _, _, _, _}) ->
+ rst_stream(State, StreamID, protocol_error),
loop(State);
-%% We do not support Associated-To-Stream-ID and CREDENTIAL Slot.
-control_frame(State, << 1:1, 3:15, 1:16, _:33, StreamID:31, _:1,
- AssocToStreamID:31, _:8, Slot:8, _/bits >>)
- when AssocToStreamID =/= 0; Slot =/= 0 ->
+%% We do not support Associated-To-Stream-ID.
+handle_frame(State, {syn_stream, StreamID, AssocToStreamID,
+ _, _, _, _, _, _, _, _, _}) when AssocToStreamID =/= 0 ->
rst_stream(State, StreamID, internal_error),
loop(State);
-%% SYN_STREAM
+%% SYN_STREAM.
%%
%% Erlang does not allow us to control the priority of processes
%% so we ignore that value entirely.
-control_frame(State=#state{middlewares=Middlewares, env=Env,
+handle_frame(State=#state{middlewares=Middlewares, env=Env,
onrequest=OnRequest, onresponse=OnResponse, peer=Peer,
- zinf=Zinf, children=Children},
- << 1:1, 3:15, 1:16, Flags:8, _:25, StreamID:31,
- _:32, _Priority:3, _:13, Rest/bits >>) ->
- IsFin = case Flags of
- 1 -> fin;
- 0 -> nofin
- end,
- [<< NbHeaders:32, Rest2/bits >>] = try
- zlib:inflate(Zinf, Rest)
- catch _:_ ->
- ok = zlib:inflateSetDictionary(Zinf, ?ZDICT),
- zlib:inflate(Zinf, <<>>)
- end,
- case syn_stream_headers(Rest2, NbHeaders, [], #special_headers{}) of
- {ok, Headers, Special} ->
- Pid = spawn_link(?MODULE, request_init,
- [self(), StreamID, Peer, Headers,
- OnRequest, OnResponse, Env, Middlewares, Special]),
- loop(State#state{last_streamid=StreamID,
- children=[#child{streamid=StreamID, pid=Pid,
- input=IsFin, output=nofin}|Children]});
- {error, badname} ->
- rst_stream(State, StreamID, protocol_error),
- loop(State#state{last_streamid=StreamID});
- {error, special} ->
- rst_stream(State, StreamID, protocol_error),
- loop(State#state{last_streamid=StreamID})
- end;
-%% SYN_REPLY
-control_frame(State, << 1:1, 3:15, 2:16, _/bits >>) ->
- error_logger:error_msg("Ignored SYN_REPLY control frame~n"),
- loop(State);
-%% RST_STREAM
-control_frame(State, << 1:1, 3:15, 3:16, _Flags:8, _Length:24,
- _:1, _StreamID:31, StatusCode:32 >>) ->
- Status = case StatusCode of
- 1 -> protocol_error;
- 2 -> invalid_stream;
- 3 -> refused_stream;
- 4 -> unsupported_version;
- 5 -> cancel;
- 6 -> internal_error;
- 7 -> flow_control_error;
- 8 -> stream_in_use;
- 9 -> stream_already_closed;
- 10 -> invalid_credentials;
- 11 -> frame_too_large
- end,
- error_logger:error_msg("Received RST_STREAM control frame: ~p~n", [Status]),
+ children=Children}, {syn_stream, StreamID, _, IsFin,
+ _, _, Method, _, Host, Path, Version, Headers}) ->
+ Pid = spawn_link(?MODULE, request_init, [
+ {self(), StreamID}, Peer, OnRequest, OnResponse,
+ Env, Middlewares, Method, Host, Path, Version, Headers
+ ]),
+ IsFin2 = if IsFin -> fin; true -> nofin end,
+ loop(State#state{last_streamid=StreamID,
+ children=[#child{streamid=StreamID, pid=Pid,
+ input=IsFin2, output=nofin}|Children]});
+%% RST_STREAM.
+handle_frame(State, {rst_stream, StreamID, Status}) ->
+ error_logger:error_msg("Received RST_STREAM frame ~p ~p",
+ [StreamID, Status]),
%% @todo Stop StreamID.
loop(State);
-%% SETTINGS
-control_frame(State, << 1:1, 3:15, 4:16, 0:8, _:24,
- NbEntries:32, Rest/bits >>) ->
- Settings = [begin
- Name = case ID of
- 1 -> upload_bandwidth;
- 2 -> download_bandwidth;
- 3 -> round_trip_time;
- 4 -> max_concurrent_streams;
- 5 -> current_cwnd;
- 6 -> download_retrans_rate;
- 7 -> initial_window_size;
- 8 -> client_certificate_vector_size
- end,
- {Flags, Name, Value}
- end || << Flags:8, ID:24, Value:32 >> <= Rest],
- if
- NbEntries =/= length(Settings) ->
- goaway(State, protocol_error),
- terminate(State);
- true ->
- error_logger:error_msg("Ignored SETTINGS control frame: ~p~n",
- [Settings]),
- loop(State)
- end;
-%% PING initiated by the server; ignore, we don't send any
-control_frame(State, << 1:1, 3:15, 6:16, 0:8, 4:24, PingID:32 >>)
- when PingID rem 2 =:= 0 ->
+%% PING initiated by the server; ignore, we don't send any.
+handle_frame(State, {ping, PingID}) when PingID rem 2 =:= 0 ->
error_logger:error_msg("Ignored PING control frame: ~p~n", [PingID]),
loop(State);
-%% PING initiated by the client; send it back
-control_frame(State=#state{socket=Socket, transport=Transport},
- Data = << 1:1, 3:15, 6:16, 0:8, 4:24, _:32 >>) ->
- Transport:send(Socket, Data),
- loop(State);
-%% GOAWAY
-control_frame(State, << 1:1, 3:15, 7:16, _/bits >>) ->
- error_logger:error_msg("Ignored GOAWAY control frame~n"),
- loop(State);
-%% HEADERS
-control_frame(State, << 1:1, 3:15, 8:16, _/bits >>) ->
- error_logger:error_msg("Ignored HEADERS control frame~n"),
- loop(State);
-%% WINDOW_UPDATE
-control_frame(State, << 1:1, 3:15, 9:16, 0:8, _/bits >>) ->
- error_logger:error_msg("Ignored WINDOW_UPDATE control frame~n"),
+%% PING initiated by the client; send it back.
+handle_frame(State=#state{socket=Socket, transport=Transport},
+ {ping, PingID}) ->
+ Transport:send(Socket, cow_spdy:ping(PingID)),
loop(State);
-%% CREDENTIAL
-control_frame(State, << 1:1, 3:15, 10:16, _/bits >>) ->
- error_logger:error_msg("Ignored CREDENTIAL control frame~n"),
- loop(State);
-%% ???
-control_frame(State, _) ->
+%% Data received for a stream.
+handle_frame(State=#state{children=Children},
+ {data, StreamID, IsFin, Data}) ->
+ Child = #child{input=nofin, in_buffer=Buffer, is_recv=IsRecv}
+ = lists:keyfind(StreamID, #child.streamid, Children),
+ Data2 = << Buffer/binary, Data/binary >>,
+ IsFin2 = if IsFin -> fin; true -> nofin end,
+ Child2 = case IsRecv of
+ {true, FromSocket, FromPid, 0} ->
+ FromPid ! {recv, FromSocket, {ok, Data2}},
+ Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false};
+ {true, FromSocket, FromPid, Length} when byte_size(Data2) >= Length ->
+ << Data3:Length/binary, Rest/binary >> = Data2,
+ FromPid ! {recv, FromSocket, {ok, Data3}},
+ Child#child{input=IsFin2, in_buffer=Rest, is_recv=false};
+ _ ->
+ Child#child{input=IsFin2, in_buffer=Data2}
+ end,
+ Children2 = lists:keyreplace(StreamID, #child.streamid, Children, Child2),
+ loop(State#state{children=Children2});
+%% General error, can't recover.
+handle_frame(State, {error, badprotocol}) ->
goaway(State, protocol_error),
- terminate(State).
+ terminate(State);
+%% Ignore all other frames for now.
+handle_frame(State, Frame) ->
+ error_logger:error_msg("Ignored frame ~p", [Frame]),
+ loop(State).
%% @todo We must wait for the children to finish here,
%% but only up to N milliseconds. Then we shutdown.
terminate(_State) ->
ok.
-syn_stream_headers(<<>>, 0, Acc, Special=#special_headers{
- method=Method, path=Path, version=Version, host=Host, scheme=Scheme}) ->
- if
- Method =:= undefined; Path =:= undefined; Version =:= undefined;
- Host =:= undefined; Scheme =:= undefined ->
- {error, special};
- true ->
- {ok, lists:reverse(Acc), Special}
- end;
-syn_stream_headers(<< 0:32, _Rest/bits >>, _NbHeaders, _Acc, _Special) ->
- {error, badname};
-syn_stream_headers(<< NameLen:32, Rest/bits >>, NbHeaders, Acc, Special) ->
- << Name:NameLen/binary, ValueLen:32, Rest2/bits >> = Rest,
- << Value:ValueLen/binary, Rest3/bits >> = Rest2,
- case Name of
- <<":host">> ->
- syn_stream_headers(Rest3, NbHeaders - 1,
- [{<<"host">>, Value}|Acc],
- Special#special_headers{host=Value});
- <<":method">> ->
- syn_stream_headers(Rest3, NbHeaders - 1, Acc,
- Special#special_headers{method=Value});
- <<":path">> ->
- syn_stream_headers(Rest3, NbHeaders - 1, Acc,
- Special#special_headers{path=Value});
- <<":version">> ->
- syn_stream_headers(Rest3, NbHeaders - 1, Acc,
- Special#special_headers{version=Value});
- <<":scheme">> ->
- syn_stream_headers(Rest3, NbHeaders - 1, Acc,
- Special#special_headers{scheme=Value});
- _ ->
- syn_stream_headers(Rest3, NbHeaders - 1,
- [{Name, Value}|Acc], Special)
- end.
-
syn_reply(#state{socket=Socket, transport=Transport, zdef=Zdef},
- IsFin, StreamID, Status, Headers) ->
- Headers2 = [{<<":status">>, Status},
- {<<":version">>, <<"HTTP/1.1">>}|Headers],
- NbHeaders = length(Headers2),
- HeaderBlock = [begin
- NameLen = byte_size(Name),
- ValueLen = iolist_size(Value),
- [<< NameLen:32, Name/binary, ValueLen:32 >>, Value]
- end || {Name, Value} <- Headers2],
- HeaderBlock2 = [<< NbHeaders:32 >>, HeaderBlock],
- HeaderBlock3 = zlib:deflate(Zdef, HeaderBlock2, full),
- Flags = case IsFin of
- fin -> 1;
- nofin -> 0
- end,
- Len = 4 + iolist_size(HeaderBlock3),
- Transport:send(Socket, [
- << 1:1, 3:15, 2:16, Flags:8, Len:24, 0:1, StreamID:31 >>,
- HeaderBlock3]).
+ StreamID, IsFin, Status, Headers) ->
+ Transport:send(Socket, cow_spdy:syn_reply(Zdef, StreamID, IsFin,
+ Status, <<"HTTP/1.1">>, Headers)).
rst_stream(#state{socket=Socket, transport=Transport}, StreamID, Status) ->
- StatusCode = case Status of
- protocol_error -> 1;
-%% invalid_stream -> 2;
-%% refused_stream -> 3;
-%% unsupported_version -> 4;
-%% cancel -> 5;
- internal_error -> 6
-%% flow_control_error -> 7;
-%% stream_in_use -> 8;
-%% stream_already_closed -> 9;
-%% invalid_credentials -> 10;
-%% frame_too_large -> 11
- end,
- Transport:send(Socket, << 1:1, 3:15, 3:16, 0:8, 8:24,
- 0:1, StreamID:31, StatusCode:32 >>).
+ Transport:send(Socket, cow_spdy:rst_stream(StreamID, Status)).
goaway(#state{socket=Socket, transport=Transport, last_streamid=LastStreamID},
Status) ->
- StatusCode = case Status of
- ok -> 0;
- protocol_error -> 1
-%% internal_error -> 2
- end,
- Transport:send(Socket, << 1:1, 3:15, 7:16, 0:8, 8:24,
- 0:1, LastStreamID:31, StatusCode:32 >>).
+ Transport:send(Socket, cow_spdy:goaway(LastStreamID, Status)).
-data(#state{socket=Socket, transport=Transport}, fin, StreamID) ->
- Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>).
-
-data(#state{socket=Socket, transport=Transport}, IsFin, StreamID, Data) ->
- Flags = case IsFin of
- fin -> 1;
- nofin -> 0
- end,
- Len = iolist_size(Data),
- Transport:send(Socket, [
- << 0:1, StreamID:31, Flags:8, Len:24 >>,
- Data]).
+data(#state{socket=Socket, transport=Transport}, StreamID, IsFin, Data) ->
+ Transport:send(Socket, cow_spdy:data(StreamID, IsFin, Data)).
data_from_file(#state{socket=Socket, transport=Transport},
StreamID, Filepath) ->
@@ -454,12 +330,10 @@ data_from_file(#state{socket=Socket, transport=Transport},
data_from_file(Socket, Transport, StreamID, IoDevice) ->
case file:read(IoDevice, 16#1fff) of
eof ->
- _ = Transport:send(Socket, << 0:1, StreamID:31, 1:8, 0:24 >>),
+ _ = Transport:send(Socket, cow_spdy:data(StreamID, true, <<>>)),
ok;
{ok, Data} ->
- Len = byte_size(Data),
- Data2 = [<< 0:1, StreamID:31, 0:8, Len:24 >>, Data],
- case Transport:send(Socket, Data2) of
+ case Transport:send(Socket, cow_spdy:data(StreamID, false, Data)) of
ok ->
data_from_file(Socket, Transport, StreamID, IoDevice);
{error, _} ->
@@ -469,14 +343,12 @@ data_from_file(Socket, Transport, StreamID, IoDevice) ->
%% Request process.
-request_init(Parent, StreamID, Peer,
- Headers, OnRequest, OnResponse, Env, Middlewares,
- #special_headers{method=Method, path=Path, version=Version,
- host=Host}) ->
+request_init(FakeSocket, Peer, OnRequest, OnResponse,
+ Env, Middlewares, Method, Host, Path, Version, Headers) ->
Version2 = parse_version(Version),
{Host2, Port} = cowboy_protocol:parse_host(Host, <<>>),
{Path2, Query} = parse_path(Path, <<>>),
- Req = cowboy_req:new({Parent, StreamID}, ?MODULE, Peer,
+ Req = cowboy_req:new(FakeSocket, ?MODULE, Peer,
Method, Path2, Query, Version2, Headers,
Host2, Port, <<>>, true, false, OnResponse),
case OnRequest of
@@ -562,16 +434,23 @@ stream_close(Socket = {Pid, _}) ->
ok.
%% Internal transport functions.
-%% @todo recv
name() ->
spdy.
+recv(Socket = {Pid, _}, Length, Timeout) ->
+ _ = Pid ! {recv, Socket, self(), Length, Timeout},
+ receive
+ {recv, Socket, Ret} ->
+ Ret
+ end.
+
send(Socket, Data) ->
stream_data(Socket, Data).
%% We don't wait for the result of the actual sendfile call,
%% therefore we can't know how much was actually sent.
+%% This isn't a problem as we don't use this value in Cowboy.
sendfile(Socket = {Pid, _}, Filepath) ->
_ = Pid ! {sendfile, Socket, Filepath},
{ok, undefined}.