aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_spdy.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_spdy.erl')
-rw-r--r--src/cowboy_spdy.erl82
1 files changed, 63 insertions, 19 deletions
diff --git a/src/cowboy_spdy.erl b/src/cowboy_spdy.erl
index dd7882c..ce75419 100644
--- a/src/cowboy_spdy.erl
+++ b/src/cowboy_spdy.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2013, Loïc Hoguin <[email protected]>
+%% Copyright (c) 2013-2014, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -12,10 +12,6 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-%% @doc SPDY protocol handler.
-%%
-%% Note that there is no need to monitor these processes when using Cowboy as
-%% an application as it already supervises them under the listener supervisor.
-module(cowboy_spdy).
%% API.
@@ -37,17 +33,22 @@
%% Internal transport functions.
-export([name/0]).
+-export([messages/0]).
-export([recv/3]).
-export([send/2]).
-export([sendfile/2]).
+-export([setopts/2]).
+
+-type streamid() :: non_neg_integer().
+-type socket() :: {pid(), streamid()}.
-record(child, {
- streamid :: non_neg_integer(),
+ streamid :: streamid(),
pid :: pid(),
input = nofin :: fin | nofin,
in_buffer = <<>> :: binary(),
- is_recv = false :: {true, {non_neg_integer(), pid()},
- pid(), non_neg_integer(), reference()} | false,
+ is_recv = false :: false | {active, socket(), pid()}
+ | {passive, socket(), pid(), non_neg_integer(), reference()},
output = nofin :: fin | nofin
}).
@@ -63,7 +64,7 @@
peer,
zdef,
zinf,
- last_streamid = 0 :: non_neg_integer(),
+ last_streamid = 0 :: streamid(),
children = [] :: [#child{}]
}).
@@ -75,7 +76,6 @@
%% API.
-%% @doc Start a SPDY protocol process.
-spec start_link(any(), inet:socket(), module(), any()) -> {ok, pid()}.
start_link(Ref, Socket, Transport, Opts) ->
proc_lib:start_link(?MODULE, init,
@@ -83,15 +83,13 @@ start_link(Ref, Socket, Transport, Opts) ->
%% Internal.
-%% @doc Faster alternative to proplists:get_value/3.
-%% @private
+%% Faster alternative to proplists:get_value/3.
get_value(Key, Opts, Default) ->
case lists:keyfind(Key, 1, Opts) of
{_, Value} -> Value;
_ -> Default
end.
-%% @private
-spec init(pid(), ranch:ref(), inet:socket(), module(), opts()) -> ok.
init(Parent, Ref, Socket, Transport, Opts) ->
process_flag(trap_exit, true),
@@ -142,15 +140,15 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
TRef = erlang:send_after(Timeout, self(),
{recv_timeout, FromSocket}),
loop(replace_child(Child#child{
- is_recv={true, FromSocket, FromPid, Length, TRef}},
+ is_recv={passive, FromSocket, FromPid, Length, TRef}},
State))
end;
{recv_timeout, {Pid, StreamID}}
when Pid =:= self() ->
- Child = #child{is_recv={true, FromSocket, FromPid, _, _}}
+ Child = #child{is_recv={passive, FromSocket, FromPid, _, _}}
= get_child(StreamID, State),
FromPid ! {recv, FromSocket, {error, timeout}},
- loop(replace_child(Child#child{is_recv=false}, State));
+ loop(replace_child(Child#child{is_recv=passive}, State));
{reply, {Pid, StreamID}, Status, Headers}
when Pid =:= self() ->
Child = #child{output=nofin} = get_child(StreamID, State),
@@ -182,6 +180,22 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
Child = #child{output=nofin} = get_child(StreamID, State),
data_from_file(State, StreamID, Filepath),
loop(replace_child(Child#child{output=fin}, State));
+ {active, FromSocket = {Pid, StreamID}, FromPid} when Pid =:= self() ->
+ Child = #child{in_buffer=InBuffer, is_recv=false}
+ = get_child(StreamID, State),
+ case InBuffer of
+ <<>> ->
+ loop(replace_child(Child#child{
+ is_recv={active, FromSocket, FromPid}}, State));
+ _ ->
+ FromPid ! {spdy, FromSocket, InBuffer},
+ loop(replace_child(Child#child{in_buffer= <<>>}, State))
+ end;
+ {passive, FromSocket = {Pid, StreamID}, FromPid} when Pid =:= self() ->
+ Child = #child{is_recv=IsRecv} = get_child(StreamID, State),
+ %% Make sure we aren't in the middle of a recv call.
+ case IsRecv of false -> ok; {active, FromSocket, FromPid} -> ok end,
+ loop(replace_child(Child#child{is_recv=false}, State));
{'EXIT', Parent, Reason} ->
exit(Reason);
{'EXIT', Pid, _} ->
@@ -209,6 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
terminate(State)
end.
+-spec system_continue(_, _, #state{}) -> ok.
system_continue(_, _, State) ->
loop(State).
@@ -216,6 +231,7 @@ system_continue(_, _, State) ->
system_terminate(Reason, _, _, _) ->
exit(Reason).
+-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::#state{}.
system_code_change(Misc, _, _, _) ->
{ok, Misc}.
@@ -264,11 +280,14 @@ handle_frame(State, {data, StreamID, IsFin, Data}) ->
Data2 = << Buffer/binary, Data/binary >>,
IsFin2 = if IsFin -> fin; true -> nofin end,
Child2 = case IsRecv of
- {true, FromSocket, FromPid, 0, TRef} ->
+ {active, FromSocket, FromPid} ->
+ FromPid ! {spdy, FromSocket, Data},
+ Child#child{input=IsFin2, is_recv=false};
+ {passive, FromSocket, FromPid, 0, TRef} ->
FromPid ! {recv, FromSocket, {ok, Data2}},
cancel_recv_timeout(StreamID, TRef),
Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false};
- {true, FromSocket, FromPid, Length, TRef}
+ {passive, FromSocket, FromPid, Length, TRef}
when byte_size(Data2) >= Length ->
<< Data3:Length/binary, Rest/binary >> = Data2,
FromPid ! {recv, FromSocket, {ok, Data3}},
@@ -358,6 +377,11 @@ delete_child(Pid, State=#state{children=Children}) ->
%% Request process.
+-spec request_init(socket(), {inet:ip_address(), inet:port_number()},
+ cowboy:onrequest_fun(), cowboy:onresponse_fun(),
+ cowboy_middleware:env(), [module()],
+ binary(), binary(), binary(), binary(), [{binary(), binary()}])
+ -> ok.
request_init(FakeSocket, Peer, OnRequest, OnResponse,
Env, Middlewares, Method, Host, Path, Version, Headers) ->
{Host2, Port} = cow_http:parse_fullhost(Host),
@@ -394,7 +418,6 @@ execute(Req, Env, [Middleware|Tail]) ->
cowboy_req:maybe_reply(Status, Req2)
end.
-%% @private
-spec resume(cowboy_middleware:env(), [module()],
module(), module(), [any()]) -> ok.
resume(Env, Tail, Module, Function, Args) ->
@@ -412,6 +435,7 @@ resume(Env, Tail, Module, Function, Args) ->
%% Reply functions used by cowboy_req.
+-spec reply(socket(), binary(), cowboy:http_headers(), iodata()) -> ok.
reply(Socket = {Pid, _}, Status, Headers, Body) ->
_ = case iolist_size(Body) of
0 -> Pid ! {reply, Socket, Status, Headers};
@@ -419,23 +443,33 @@ reply(Socket = {Pid, _}, Status, Headers, Body) ->
end,
ok.
+-spec stream_reply(socket(), binary(), cowboy:http_headers()) -> ok.
stream_reply(Socket = {Pid, _}, Status, Headers) ->
_ = Pid ! {stream_reply, Socket, Status, Headers},
ok.
+-spec stream_data(socket(), iodata()) -> ok.
stream_data(Socket = {Pid, _}, Data) ->
_ = Pid ! {stream_data, Socket, Data},
ok.
+-spec stream_close(socket()) -> ok.
stream_close(Socket = {Pid, _}) ->
_ = Pid ! {stream_close, Socket},
ok.
%% Internal transport functions.
+-spec name() -> spdy.
name() ->
spdy.
+-spec messages() -> {spdy, spdy_closed, spdy_error}.
+messages() ->
+ {spdy, spdy_closed, spdy_error}.
+
+-spec recv(socket(), non_neg_integer(), timeout())
+ -> {ok, binary()} | {error, timeout}.
recv(Socket = {Pid, _}, Length, Timeout) ->
_ = Pid ! {recv, Socket, self(), Length, Timeout},
receive
@@ -443,12 +477,22 @@ recv(Socket = {Pid, _}, Length, Timeout) ->
Ret
end.
+-spec send(socket(), iodata()) -> ok.
send(Socket, Data) ->
stream_data(Socket, Data).
%% We don't wait for the result of the actual sendfile call,
%% therefore we can't know how much was actually sent.
%% This isn't a problem as we don't use this value in Cowboy.
+-spec sendfile(socket(), file:name_all()) -> {ok, undefined}.
sendfile(Socket = {Pid, _}, Filepath) ->
_ = Pid ! {sendfile, Socket, Filepath},
{ok, undefined}.
+
+-spec setopts(inet:socket(), list()) -> ok.
+setopts(Socket = {Pid, _}, [{active, once}]) ->
+ _ = Pid ! {active, Socket, self()},
+ ok;
+setopts(Socket = {Pid, _}, [{active, false}]) ->
+ _ = Pid ! {passive, Socket, self()},
+ ok.