diff options
Diffstat (limited to 'src/cowboy_spdy.erl')
-rw-r--r-- | src/cowboy_spdy.erl | 82 |
1 files changed, 63 insertions, 19 deletions
diff --git a/src/cowboy_spdy.erl b/src/cowboy_spdy.erl index dd7882c..ce75419 100644 --- a/src/cowboy_spdy.erl +++ b/src/cowboy_spdy.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% Copyright (c) 2013-2014, Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -12,10 +12,6 @@ %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -%% @doc SPDY protocol handler. -%% -%% Note that there is no need to monitor these processes when using Cowboy as -%% an application as it already supervises them under the listener supervisor. -module(cowboy_spdy). %% API. @@ -37,17 +33,22 @@ %% Internal transport functions. -export([name/0]). +-export([messages/0]). -export([recv/3]). -export([send/2]). -export([sendfile/2]). +-export([setopts/2]). + +-type streamid() :: non_neg_integer(). +-type socket() :: {pid(), streamid()}. -record(child, { - streamid :: non_neg_integer(), + streamid :: streamid(), pid :: pid(), input = nofin :: fin | nofin, in_buffer = <<>> :: binary(), - is_recv = false :: {true, {non_neg_integer(), pid()}, - pid(), non_neg_integer(), reference()} | false, + is_recv = false :: false | {active, socket(), pid()} + | {passive, socket(), pid(), non_neg_integer(), reference()}, output = nofin :: fin | nofin }). @@ -63,7 +64,7 @@ peer, zdef, zinf, - last_streamid = 0 :: non_neg_integer(), + last_streamid = 0 :: streamid(), children = [] :: [#child{}] }). @@ -75,7 +76,6 @@ %% API. -%% @doc Start a SPDY protocol process. -spec start_link(any(), inet:socket(), module(), any()) -> {ok, pid()}. start_link(Ref, Socket, Transport, Opts) -> proc_lib:start_link(?MODULE, init, @@ -83,15 +83,13 @@ start_link(Ref, Socket, Transport, Opts) -> %% Internal. -%% @doc Faster alternative to proplists:get_value/3. -%% @private +%% Faster alternative to proplists:get_value/3. get_value(Key, Opts, Default) -> case lists:keyfind(Key, 1, Opts) of {_, Value} -> Value; _ -> Default end. -%% @private -spec init(pid(), ranch:ref(), inet:socket(), module(), opts()) -> ok. init(Parent, Ref, Socket, Transport, Opts) -> process_flag(trap_exit, true), @@ -142,15 +140,15 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, TRef = erlang:send_after(Timeout, self(), {recv_timeout, FromSocket}), loop(replace_child(Child#child{ - is_recv={true, FromSocket, FromPid, Length, TRef}}, + is_recv={passive, FromSocket, FromPid, Length, TRef}}, State)) end; {recv_timeout, {Pid, StreamID}} when Pid =:= self() -> - Child = #child{is_recv={true, FromSocket, FromPid, _, _}} + Child = #child{is_recv={passive, FromSocket, FromPid, _, _}} = get_child(StreamID, State), FromPid ! {recv, FromSocket, {error, timeout}}, - loop(replace_child(Child#child{is_recv=false}, State)); + loop(replace_child(Child#child{is_recv=passive}, State)); {reply, {Pid, StreamID}, Status, Headers} when Pid =:= self() -> Child = #child{output=nofin} = get_child(StreamID, State), @@ -182,6 +180,22 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, Child = #child{output=nofin} = get_child(StreamID, State), data_from_file(State, StreamID, Filepath), loop(replace_child(Child#child{output=fin}, State)); + {active, FromSocket = {Pid, StreamID}, FromPid} when Pid =:= self() -> + Child = #child{in_buffer=InBuffer, is_recv=false} + = get_child(StreamID, State), + case InBuffer of + <<>> -> + loop(replace_child(Child#child{ + is_recv={active, FromSocket, FromPid}}, State)); + _ -> + FromPid ! {spdy, FromSocket, InBuffer}, + loop(replace_child(Child#child{in_buffer= <<>>}, State)) + end; + {passive, FromSocket = {Pid, StreamID}, FromPid} when Pid =:= self() -> + Child = #child{is_recv=IsRecv} = get_child(StreamID, State), + %% Make sure we aren't in the middle of a recv call. + case IsRecv of false -> ok; {active, FromSocket, FromPid} -> ok end, + loop(replace_child(Child#child{is_recv=false}, State)); {'EXIT', Parent, Reason} -> exit(Reason); {'EXIT', Pid, _} -> @@ -209,6 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, terminate(State) end. +-spec system_continue(_, _, #state{}) -> ok. system_continue(_, _, State) -> loop(State). @@ -216,6 +231,7 @@ system_continue(_, _, State) -> system_terminate(Reason, _, _, _) -> exit(Reason). +-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::#state{}. system_code_change(Misc, _, _, _) -> {ok, Misc}. @@ -264,11 +280,14 @@ handle_frame(State, {data, StreamID, IsFin, Data}) -> Data2 = << Buffer/binary, Data/binary >>, IsFin2 = if IsFin -> fin; true -> nofin end, Child2 = case IsRecv of - {true, FromSocket, FromPid, 0, TRef} -> + {active, FromSocket, FromPid} -> + FromPid ! {spdy, FromSocket, Data}, + Child#child{input=IsFin2, is_recv=false}; + {passive, FromSocket, FromPid, 0, TRef} -> FromPid ! {recv, FromSocket, {ok, Data2}}, cancel_recv_timeout(StreamID, TRef), Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false}; - {true, FromSocket, FromPid, Length, TRef} + {passive, FromSocket, FromPid, Length, TRef} when byte_size(Data2) >= Length -> << Data3:Length/binary, Rest/binary >> = Data2, FromPid ! {recv, FromSocket, {ok, Data3}}, @@ -358,6 +377,11 @@ delete_child(Pid, State=#state{children=Children}) -> %% Request process. +-spec request_init(socket(), {inet:ip_address(), inet:port_number()}, + cowboy:onrequest_fun(), cowboy:onresponse_fun(), + cowboy_middleware:env(), [module()], + binary(), binary(), binary(), binary(), [{binary(), binary()}]) + -> ok. request_init(FakeSocket, Peer, OnRequest, OnResponse, Env, Middlewares, Method, Host, Path, Version, Headers) -> {Host2, Port} = cow_http:parse_fullhost(Host), @@ -394,7 +418,6 @@ execute(Req, Env, [Middleware|Tail]) -> cowboy_req:maybe_reply(Status, Req2) end. -%% @private -spec resume(cowboy_middleware:env(), [module()], module(), module(), [any()]) -> ok. resume(Env, Tail, Module, Function, Args) -> @@ -412,6 +435,7 @@ resume(Env, Tail, Module, Function, Args) -> %% Reply functions used by cowboy_req. +-spec reply(socket(), binary(), cowboy:http_headers(), iodata()) -> ok. reply(Socket = {Pid, _}, Status, Headers, Body) -> _ = case iolist_size(Body) of 0 -> Pid ! {reply, Socket, Status, Headers}; @@ -419,23 +443,33 @@ reply(Socket = {Pid, _}, Status, Headers, Body) -> end, ok. +-spec stream_reply(socket(), binary(), cowboy:http_headers()) -> ok. stream_reply(Socket = {Pid, _}, Status, Headers) -> _ = Pid ! {stream_reply, Socket, Status, Headers}, ok. +-spec stream_data(socket(), iodata()) -> ok. stream_data(Socket = {Pid, _}, Data) -> _ = Pid ! {stream_data, Socket, Data}, ok. +-spec stream_close(socket()) -> ok. stream_close(Socket = {Pid, _}) -> _ = Pid ! {stream_close, Socket}, ok. %% Internal transport functions. +-spec name() -> spdy. name() -> spdy. +-spec messages() -> {spdy, spdy_closed, spdy_error}. +messages() -> + {spdy, spdy_closed, spdy_error}. + +-spec recv(socket(), non_neg_integer(), timeout()) + -> {ok, binary()} | {error, timeout}. recv(Socket = {Pid, _}, Length, Timeout) -> _ = Pid ! {recv, Socket, self(), Length, Timeout}, receive @@ -443,12 +477,22 @@ recv(Socket = {Pid, _}, Length, Timeout) -> Ret end. +-spec send(socket(), iodata()) -> ok. send(Socket, Data) -> stream_data(Socket, Data). %% We don't wait for the result of the actual sendfile call, %% therefore we can't know how much was actually sent. %% This isn't a problem as we don't use this value in Cowboy. +-spec sendfile(socket(), file:name_all()) -> {ok, undefined}. sendfile(Socket = {Pid, _}, Filepath) -> _ = Pid ! {sendfile, Socket, Filepath}, {ok, undefined}. + +-spec setopts(inet:socket(), list()) -> ok. +setopts(Socket = {Pid, _}, [{active, once}]) -> + _ = Pid ! {active, Socket, self()}, + ok; +setopts(Socket = {Pid, _}, [{active, false}]) -> + _ = Pid ! {passive, Socket, self()}, + ok. |