diff options
author | Loïc Hoguin <[email protected]> | 2013-08-22 10:39:50 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2013-08-22 10:40:59 +0200 |
commit | 018e392f3a47a82bb41eb345933ec5cfa2490d38 (patch) | |
tree | 68b2d6bf25525b634d1c21ec5c60f10919a2e712 /src | |
download | gun-018e392f3a47a82bb41eb345933ec5cfa2490d38.tar.gz gun-018e392f3a47a82bb41eb345933ec5cfa2490d38.tar.bz2 gun-018e392f3a47a82bb41eb345933ec5cfa2490d38.zip |
Initial commit with working SPDY client
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.app.src | 28 | ||||
-rw-r--r-- | src/gun.erl | 373 | ||||
-rw-r--r-- | src/gun_app.erl | 29 | ||||
-rw-r--r-- | src/gun_spdy.erl | 293 | ||||
-rw-r--r-- | src/gun_sup.erl | 39 |
5 files changed, 762 insertions, 0 deletions
diff --git a/src/gun.app.src b/src/gun.app.src new file mode 100644 index 0000000..9351b3e --- /dev/null +++ b/src/gun.app.src @@ -0,0 +1,28 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +{application, gun, [ + {description, "Asynchronous SPDY, HTTP and Websocket client."}, + {vsn, "0.1.0"}, + {modules, []}, + {registered, [gun_sup]}, + {applications, [ + kernel, + stdlib, + ranch, + ssl + ]}, + {mod, {gun_app, []}}, + {env, []} +]}. diff --git a/src/gun.erl b/src/gun.erl new file mode 100644 index 0000000..011eed1 --- /dev/null +++ b/src/gun.erl @@ -0,0 +1,373 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun). + +%% Connection. +-export([open/3]). +-export([close/1]). +-export([shutdown/1]). + +%% Requests. +-export([delete/2]). +-export([delete/3]). +-export([get/2]). +-export([get/3]). +-export([head/2]). +-export([head/3]). +-export([options/2]). +-export([options/3]). +-export([patch/3]). +-export([patch/4]). +-export([post/3]). +-export([post/4]). +-export([put/3]). +-export([put/4]). +-export([request/4]). +-export([request/5]). + +%% Responses. +-export([response/4]). +-export([response/5]). + +%% Streaming data. +-export([data/4]). + +%% Cancelling a stream. +-export([cancel/2]). + +%% Websocket. +-export([ws_upgrade/2]). +-export([ws_upgrade/3]). +-export([ws_send/2]). + +%% Internals. +-export([start_link/4]). +-export([init/5]). + +-record(state, { + parent, + owner, + host, + port, + keepalive, + type, + retry, + retry_timeout, + socket, + transport, + protocol, + protocol_state +}). + +%% Connection. + +open(Host, Port, Opts) -> + case open_opts(Opts) of + ok -> + supervisor:start_child(gun_sup, [self(), Host, Port, Opts]); + Error -> + Error + end. + +%% @private +open_opts([]) -> + ok; +open_opts([{keepalive, K}|Opts]) when is_integer(K) -> + open_opts(Opts); +open_opts([{retry, R}|Opts]) when is_integer(R) -> + open_opts(Opts); +open_opts([{retry_timeout, T}|Opts]) when is_integer(T) -> + open_opts(Opts); +open_opts([{type, T}|Opts]) + when T =:= tcp; T =:= tcp_spdy; T =:= ssl -> + open_opts(Opts); +open_opts([Opt|_]) -> + {error, {options, Opt}}. + +close(ServerPid) -> + supervisor:terminate_child(gun_sup, ServerPid). + +shutdown(ServerPid) -> + gen_server:call(ServerPid, {shutdown, self()}). + +%% Requests. + +delete(ServerPid, Path) -> + request(ServerPid, <<"DELETE">>, Path, []). +delete(ServerPid, Path, Headers) -> + request(ServerPid, <<"DELETE">>, Path, Headers). + +get(ServerPid, Path) -> + request(ServerPid, <<"GET">>, Path, []). +get(ServerPid, Path, Headers) -> + request(ServerPid, <<"GET">>, Path, Headers). + +head(ServerPid, Path) -> + request(ServerPid, <<"HEAD">>, Path, []). +head(ServerPid, Path, Headers) -> + request(ServerPid, <<"HEAD">>, Path, Headers). + +options(ServerPid, Path) -> + request(ServerPid, <<"OPTIONS">>, Path, []). +options(ServerPid, Path, Headers) -> + request(ServerPid, <<"OPTIONS">>, Path, Headers). + +patch(ServerPid, Path, Headers) -> + request(ServerPid, <<"PATCH">>, Path, Headers). +patch(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"PATCH">>, Path, Headers, Body). + +post(ServerPid, Path, Headers) -> + request(ServerPid, <<"POST">>, Path, Headers). +post(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"POST">>, Path, Headers, Body). + +put(ServerPid, Path, Headers) -> + request(ServerPid, <<"PUT">>, Path, Headers). +put(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"PUT">>, Path, Headers, Body). + +request(ServerPid, Method, Path, Headers) -> + StreamRef = make_ref(), + _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers}, + StreamRef. +request(ServerPid, Method, Path, Headers, Body) -> + StreamRef = make_ref(), + _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers, Body}, + StreamRef. + +%% Responses. + +response(ServerPid, StreamRef, Status, Headers) -> + _ = ServerPid ! {response, self(), StreamRef, Status, Headers}, + ok. +response(ServerPid, StreamRef, Status, Headers, Body) -> + _ = ServerPid ! {response, self(), StreamRef, Status, Headers, Body}, + ok. + +%% Streaming data. + +data(ServerPid, StreamRef, IsFin, Data) -> + _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, + ok. + +%% Cancelling a stream. + +cancel(ServerPid, StreamRef) -> + _ = ServerPid ! {cancel, self(), StreamRef}, + ok. + +%% Websocket. + +ws_upgrade(ServerPid, Path) -> + ws_upgrade(ServerPid, Path, []). +ws_upgrade(ServerPid, Path, Headers) -> + _ = ServerPid ! {ws_upgrade, self(), Path, Headers}, + ok. + +ws_send(ServerPid, Payload) -> + _ = ServerPid ! {ws_send, self(), Payload}, + ok. + +%% Internals. + +start_link(Owner, Host, Port, Opts) -> + proc_lib:start_link(?MODULE, init, + [self(), Owner, Host, Port, Opts]). + +%% @doc Faster alternative to proplists:get_value/3. +%% @private +get_value(Key, Opts, Default) -> + case lists:keyfind(Key, 1, Opts) of + {_, Value} -> Value; + _ -> Default + end. + +init(Parent, Owner, Host, Port, Opts) -> + try + ok = proc_lib:init_ack(Parent, {ok, self()}), + Keepalive = get_value(keepalive, Opts, 5000), + Retry = get_value(retry, Opts, 5), + RetryTimeout = get_value(retry_timeout, Opts, 5000), + Type = get_value(type, Opts, ssl), + connect(#state{parent=Parent, owner=Owner, host=Host, port=Port, + keepalive=Keepalive, type=Type, + retry=Retry, retry_timeout=RetryTimeout}, Retry) + catch Class:Reason -> + Owner ! {gun, error, self(), {{Class, Reason, erlang:get_stacktrace()}, + "An unexpected error occurred."}} + end. + +connect(State=#state{owner=Owner, host=Host, port=Port, type=ssl}, Retries) -> + Transport = ranch_ssl, + Opts = [binary, {active, false}, {client_preferred_next_protocols, + client, [<<"spdy/3">>, <<"http/1.1">>], <<"http/1.1">>}], + case Transport:connect(Host, Port, Opts) of + {ok, Socket} -> + Protocol = gun_spdy, +%% @todo For some reasons this function doesn't work? Bug submitted. +% Protocol = case ssl:negotiated_next_protocol(Socket) of +% {ok, <<"spdy/3">>} -> gun_spdy; +% _ -> gun_http +% end, + ProtoState = Protocol:init(Owner, Socket, Transport), + before_loop(State#state{socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}); + {error, _} -> + retry_loop(State, Retries - 1) + end; +connect(State=#state{owner=Owner, host=Host, port=Port, type=Type}, Retries) -> + Transport = ranch_tcp, + Opts = [binary, {active, false}], + case Transport:connect(Host, Port, Opts) of + {ok, Socket} -> + Protocol = case Type of + tcp_spdy -> gun_spdy; + tcp -> gun_http + end, + ProtoState = Protocol:init(Owner, Socket, Transport), + before_loop(State#state{socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}); + {error, _} -> + retry_loop(State, Retries - 1) + end. + +%% Too many failures, give up. +retry_loop(_, 0) -> + error(too_many_retries); +retry_loop(State=#state{parent=Parent, retry_timeout=RetryTimeout}, Retries) -> + _ = erlang:send_after(RetryTimeout, self(), retry), + receive + retry -> + connect(State, Retries); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {retry_loop, [State, Retries]}) + end. + +before_loop(State=#state{keepalive=Keepalive}) -> + _ = erlang:send_after(Keepalive, self(), keepalive), + loop(State). + +loop(State=#state{parent=Parent, owner=Owner, host=Host, + retry=Retry, socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}) -> + {OK, Closed, Error} = Transport:messages(), + ok = Transport:setopts(Socket, [{active, once}]), + receive + {OK, Socket, Data} -> + case Protocol:handle(Data, ProtoState) of + error -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, + transport=undefined, protocol=undefined}, Retry); + ProtoState2 -> + loop(State#state{protocol_state=ProtoState2}) + end; + {Closed, Socket} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {Error, Socket, _} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + keepalive -> + ProtoState2 = Protocol:keepalive(ProtoState), + before_loop(State#state{protocol_state=ProtoState2}); + {request, Owner, StreamRef, Method, Path, Headers} -> + ProtoState2 = Protocol:request(ProtoState, + StreamRef, Method, Host, Path, Headers), + loop(State#state{protocol_state=ProtoState2}); + {request, Owner, StreamRef, Method, Path, Headers, Body} -> + ProtoState2 = Protocol:request(ProtoState, + StreamRef, Method, Host, Path, Headers, Body), + loop(State#state{protocol_state=ProtoState2}); + {response, Owner, StreamRef, Status, Headers} -> + ProtoState2 = Protocol:response(ProtoState, + StreamRef, Status, Headers), + loop(State#state{protocol_state=ProtoState2}); + {response, Owner, StreamRef, Status, Headers, Body} -> + ProtoState2 = Protocol:response(ProtoState, + StreamRef, Status, Headers, Body), + loop(State#state{protocol_state=ProtoState2}); + {data, Owner, StreamRef, IsFin, Data} -> + ProtoState2 = Protocol:data(ProtoState, + StreamRef, IsFin, Data), + loop(State#state{protocol_state=ProtoState2}); + {cancel, Owner, StreamRef} -> + ProtoState2 = Protocol:cancel(ProtoState, StreamRef), + loop(State#state{protocol_state=ProtoState2}); + {ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy -> + %% @todo + ProtoState2 = Protocol:ws_upgrade(ProtoState, + Path, Headers), + ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2}); + {shutdown, Owner} -> + %% @todo Protocol:shutdown? + ok; + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {loop, [State]}); + Any when is_tuple(Any), is_pid(element(2, Any)) -> + element(2, Any) ! {gun, error, self(), {notowner, + "Operations are restricted to the owner of the connection."}}, + loop(State); + {ws_upgrade, _, _, _} -> + Owner ! {gun, error, self(), {badstate, + "Websocket over SPDY isn't supported."}}, + loop(State); + {ws_send, _, _} -> + Owner ! {gun, error, self(), {badstate, + "Connection needs to be upgraded to Websocket " + "before the gun:ws_send/1 function can be used."}}, + loop(State); + Any -> + error_logger:error_msg("Unexpected message: ~w~n", [Any]) + end. + +ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket, + transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> + {OK, Closed, Error} = Transport:messages(), + ok = Transport:setopts(Socket, [{active, once}]), + receive + {OK, Socket, Data} -> + ProtoState2 = Protocol:handle(ProtoState, Data), + ws_loop(State#state{protocol_state=ProtoState2}); + {Closed, Socket} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {Error, Socket, _} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {ws_send, Owner, Frames} when is_list(Frames) -> + todo; %% @todo + {ws_send, Owner, Frame} -> + {todo, Frame}; %% @todo + {shutdown, Owner} -> + %% @todo Protocol:shutdown? + ok; + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {loop, [State]}); + Any when is_tuple(Any), is_pid(element(2, Any)) -> + element(2, Any) ! {gun, error, self(), {notowner, + "Operations are restricted to the owner of the connection."}}, + loop(State); + Any -> + error_logger:error_msg("Unexpected message: ~w~n", [Any]) + end. diff --git a/src/gun_app.erl b/src/gun_app.erl new file mode 100644 index 0000000..35ff79a --- /dev/null +++ b/src/gun_app.erl @@ -0,0 +1,29 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(gun_app). +-behaviour(application). + +%% API. +-export([start/2]). +-export([stop/1]). + +%% API. + +start(_Type, _Args) -> + gun_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/gun_spdy.erl b/src/gun_spdy.erl new file mode 100644 index 0000000..6b0dd94 --- /dev/null +++ b/src/gun_spdy.erl @@ -0,0 +1,293 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_spdy). + +-export([init/3]). +-export([handle/2]). +-export([keepalive/1]). +-export([request/6]). +-export([request/7]). +-export([response/4]). +-export([response/5]). +-export([data/4]). +-export([cancel/2]). + +-record(stream, { + id :: non_neg_integer(), + ref :: reference(), + in :: boolean(), %% true = open + out :: boolean(), %% true = open + version :: binary() +}). + +-record(spdy_state, { + owner :: pid(), + socket :: inet:socket() | ssl:sslsocket(), + transport :: module(), + buffer = <<>> :: binary(), + zdef :: zlib:zstream(), + zinf :: zlib:zstream(), + streams = [] :: [#stream{}], + stream_id = 1 :: non_neg_integer(), + ping_id = 1 :: non_neg_integer() +}). + +init(Owner, Socket, Transport) -> + #spdy_state{owner=Owner, socket=Socket, transport=Transport, + zdef=cow_spdy:deflate_init(), zinf=cow_spdy:inflate_init()}. + +handle(Data, State=#spdy_state{buffer=Buffer}) -> + handle_loop(<< Buffer/binary, Data/binary >>, + State#spdy_state{buffer= <<>>}). + +handle_loop(Data, State=#spdy_state{zinf=Zinf}) -> + case cow_spdy:split(Data) of + {true, Frame, Rest} -> + P = cow_spdy:parse(Frame, Zinf), + handle_frame(Rest, State, P); + {false, Rest} -> + State#spdy_state{buffer=Rest} + end. + +handle_frame(Rest, State=#spdy_state{owner=Owner, + socket=Socket, transport=Transport}, + {data, StreamID, IsFin, Data}) -> + case get_stream_by_id(StreamID, State) of + #stream{in=false} -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, stream_already_closed)), + handle_loop(Rest, delete_stream(StreamID, State)); + S = #stream{ref=StreamRef} when IsFin -> + Owner ! {gun, data, self(), StreamRef, fin, Data}, + handle_loop(Rest, in_fin_stream(S, State)); + #stream{ref=StreamRef} -> + Owner ! {gun, data, self(), StreamRef, nofin, Data}, + handle_loop(Rest, State); + false -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, invalid_stream)), + handle_loop(Rest, delete_stream(StreamID, State)) + end; +handle_frame(Rest, State=#spdy_state{owner=Owner, + socket=Socket, transport=Transport}, + {syn_stream, StreamID, _, IsFin, IsUnidirectional, + _, Method, _, Host, Path, Version, Headers}) -> + case get_stream_by_id(StreamID, State) of + false -> + StreamRef = make_ref(), + Owner ! {gun, request, self(), StreamRef, + Method, Host, Path, Headers}, + handle_loop(Rest, new_stream(StreamID, StreamRef, + not IsFin, not IsUnidirectional, Version, State)); + #stream{} -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, stream_in_use)), + handle_loop(Rest, State) + end; +handle_frame(Rest, State=#spdy_state{owner=Owner, + socket=Socket, transport=Transport}, + {syn_reply, StreamID, IsFin, Status, _, Headers}) -> + case get_stream_by_id(StreamID, State) of + #stream{in=false} -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, stream_already_closed)), + handle_loop(Rest, delete_stream(StreamID, State)); + S = #stream{ref=StreamRef} -> + Owner ! {gun, response, self(), StreamRef, Status, Headers}, + if IsFin -> + handle_loop(Rest, in_fin_stream(S, State)); + true -> + handle_loop(Rest, State) + end; + false -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, invalid_stream)), + handle_loop(Rest, delete_stream(StreamID, State)) + end; +handle_frame(Rest, State=#spdy_state{owner=Owner}, + {rst_stream, StreamID, Status}) -> + case get_stream_by_id(StreamID, State) of + #stream{} -> + Owner ! {gun, error, self(), StreamID, Status}, + handle_loop(Rest, delete_stream(StreamID, State)); + false -> + handle_loop(Rest, State) + end; +handle_frame(Rest, State, {settings, ClearSettings, Settings}) -> + error_logger:error_msg("Ignored SETTINGS control frame ~p ~p~n", + [ClearSettings, Settings]), + handle_loop(Rest, State); +%% Server PING. +handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, + {ping, PingID}) when PingID rem 2 =:= 0 -> + Transport:send(Socket, cow_spdy:ping(PingID)), + handle_loop(Rest, State); +%% Client PING. +handle_frame(Rest, State, {ping, _}) -> + handle_loop(Rest, State); +handle_frame(Rest, State, {goaway, LastGoodStreamID, Status}) -> + error_logger:error_msg("Ignored GOAWAY control frame ~p ~p~n", + [LastGoodStreamID, Status]), + handle_loop(Rest, State); +handle_frame(Rest, State, {headers, StreamID, IsFin, Headers}) -> + error_logger:error_msg("Ignored HEADERS control frame ~p ~p ~p~n", + [StreamID, IsFin, Headers]), + handle_loop(Rest, State); +handle_frame(Rest, State, {window_update, StreamID, DeltaWindowSize}) -> + error_logger:error_msg("Ignored WINDOW_UPDATE control frame ~p ~p~n", + [StreamID, DeltaWindowSize]), + handle_loop(Rest, State); +handle_frame(_, #spdy_state{owner=Owner, socket=Socket, transport=Transport}, + {error, badprotocol}) -> + Owner ! {gun, error, self(), {badprotocol, + "The remote endpoint sent invalid data."}}, + %% @todo LastGoodStreamID + Transport:send(Socket, cow_spdy:goaway(0, protocol_error)), + error. + +keepalive(State=#spdy_state{socket=Socket, transport=Transport, + ping_id=PingID}) -> + Transport:send(Socket, cow_spdy:ping(PingID)), + State#spdy_state{ping_id=PingID + 2}. + +request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, + stream_id=StreamID}, StreamRef, Method, Host, Path, Headers) -> + Out = false =/= lists:keyfind(<<"content-type">>, 1, Headers), + Transport:send(Socket, cow_spdy:syn_stream(Zdef, + StreamID, 0, not Out, false, 0, + Method, <<"https">>, Host, Path, <<"HTTP/1.1">>, Headers)), + new_stream(StreamID, StreamRef, true, Out, <<"HTTP/1.1">>, + State#spdy_state{stream_id=StreamID + 2}). + +%% @todo Handle Body > 16MB. +request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, + stream_id=StreamID}, StreamRef, Method, Host, Path, Headers, Body) -> + Transport:send(Socket, [ + cow_spdy:syn_stream(Zdef, + StreamID, 0, false, false, 0, + Method, <<"https">>, Host, Path, <<"HTTP/1.1">>, Headers), + cow_spdy:data(StreamID, true, Body) + ]), + new_stream(StreamID, StreamRef, true, false, <<"HTTP/1.1">>, + State#spdy_state{stream_id=StreamID + 2}). + +response(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef}, + StreamRef, Status, Headers) -> + case get_stream_by_ref(StreamRef, State) of + #stream{out=false} -> + error_stream_closed(State); + S = #stream{id=StreamID, version=Version} -> + Out = false =/= lists:keyfind(<<"content-type">>, 1, Headers), + Transport:send(Socket, cow_spdy:syn_reply(Zdef, + StreamID, not Out, Status, Version, Headers)), + if Out -> + State; + true -> + out_fin_stream(S, State) + end; + false -> + error_stream_not_found(State) + end. + +response(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef}, + StreamRef, Status, Headers, Body) -> + case get_stream_by_ref(StreamRef, State) of + #stream{out=false} -> + error_stream_closed(State); + S = #stream{id=StreamID, version=Version} -> + Transport:send(Socket, [ + cow_spdy:syn_reply(Zdef, + StreamID, false, Status, Version, Headers), + cow_spdy:data(S#stream.id, true, Body) + ]), + out_fin_stream(S, State); + false -> + error_stream_not_found(State) + end. + +data(State=#spdy_state{socket=Socket, transport=Transport}, + StreamRef, IsFin, Data) -> + case get_stream_by_ref(StreamRef, State) of + #stream{out=false} -> + error_stream_closed(State); + S = #stream{} -> + IsFin2 = IsFin =:= fin, + Transport:send(Socket, cow_spdy:data(S#stream.id, IsFin2, Data)), + if IsFin2 -> + out_fin_stream(S, State); + true -> + State + end; + false -> + error_stream_not_found(State) + end. + +cancel(State=#spdy_state{socket=Socket, transport=Transport}, + StreamRef) -> + case get_stream_by_ref(StreamRef, State) of + #stream{id=StreamID} -> + Transport:send(Socket, cow_spdy:rst_stream(StreamID, cancel)), + delete_stream(StreamID, State); + false -> + error_stream_not_found(State) + end. + +error_stream_closed(State=#spdy_state{owner=Owner}) -> + Owner ! {gun, error, self(), {badstate, + "The stream has already been closed."}}, + State. + +error_stream_not_found(State=#spdy_state{owner=Owner}) -> + Owner ! {gun, error, self(), {badstate, + "The stream cannot be found."}}, + State. + +%% Streams. + +new_stream(StreamID, StreamRef, In, Out, Version, + State=#spdy_state{streams=Streams}) -> + New = #stream{id=StreamID, ref=StreamRef, + in=In, out=Out, version=Version}, + State#spdy_state{streams=[New|Streams]}. + +get_stream_by_id(StreamID, #spdy_state{streams=Streams}) -> + case lists:keyfind(StreamID, #stream.id, Streams) of + false -> false; + S -> S + end. + +get_stream_by_ref(StreamRef, #spdy_state{streams=Streams}) -> + case lists:keyfind(StreamRef, #stream.id, Streams) of + false -> false; + S -> S + end. + +delete_stream(StreamID, State=#spdy_state{streams=Streams}) -> + Streams2 = lists:keydelete(StreamID, #stream.id, Streams), + State#spdy_state{streams=Streams2}. + +in_fin_stream(S=#stream{out=false}, State) -> + delete_stream(S#stream.id, State); +in_fin_stream(S, State=#spdy_state{streams=Streams}) -> + Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, + S#stream{in=false}), + State#spdy_state{streams=Streams2}. + +out_fin_stream(S=#stream{in=false}, State) -> + delete_stream(S#stream.id, State); +out_fin_stream(S, State=#spdy_state{streams=Streams}) -> + Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, + S#stream{out=false}), + State#spdy_state{streams=Streams2}. diff --git a/src/gun_sup.erl b/src/gun_sup.erl new file mode 100644 index 0000000..b7a9c82 --- /dev/null +++ b/src/gun_sup.erl @@ -0,0 +1,39 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(gun_sup). +-behaviour(supervisor). + +%% API. +-export([start_link/0]). + +%% supervisor. +-export([init/1]). + +-define(SUPERVISOR, ?MODULE). + +%% API. + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). + +%% supervisor. + +init([]) -> + Procs = [ + {gun, {gun, start_link, []}, + transient, 5000, worker, [gun]}], + {ok, {{simple_one_for_one, 10, 10}, Procs}}. |