From 018e392f3a47a82bb41eb345933ec5cfa2490d38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 22 Aug 2013 10:39:50 +0200 Subject: Initial commit with working SPDY client --- src/gun.erl | 373 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 373 insertions(+) create mode 100644 src/gun.erl (limited to 'src/gun.erl') diff --git a/src/gun.erl b/src/gun.erl new file mode 100644 index 0000000..011eed1 --- /dev/null +++ b/src/gun.erl @@ -0,0 +1,373 @@ +%% Copyright (c) 2013, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun). + +%% Connection. +-export([open/3]). +-export([close/1]). +-export([shutdown/1]). + +%% Requests. +-export([delete/2]). +-export([delete/3]). +-export([get/2]). +-export([get/3]). +-export([head/2]). +-export([head/3]). +-export([options/2]). +-export([options/3]). +-export([patch/3]). +-export([patch/4]). +-export([post/3]). +-export([post/4]). +-export([put/3]). +-export([put/4]). +-export([request/4]). +-export([request/5]). + +%% Responses. +-export([response/4]). +-export([response/5]). + +%% Streaming data. +-export([data/4]). + +%% Cancelling a stream. +-export([cancel/2]). + +%% Websocket. +-export([ws_upgrade/2]). +-export([ws_upgrade/3]). +-export([ws_send/2]). + +%% Internals. +-export([start_link/4]). +-export([init/5]). + +-record(state, { + parent, + owner, + host, + port, + keepalive, + type, + retry, + retry_timeout, + socket, + transport, + protocol, + protocol_state +}). + +%% Connection. + +open(Host, Port, Opts) -> + case open_opts(Opts) of + ok -> + supervisor:start_child(gun_sup, [self(), Host, Port, Opts]); + Error -> + Error + end. + +%% @private +open_opts([]) -> + ok; +open_opts([{keepalive, K}|Opts]) when is_integer(K) -> + open_opts(Opts); +open_opts([{retry, R}|Opts]) when is_integer(R) -> + open_opts(Opts); +open_opts([{retry_timeout, T}|Opts]) when is_integer(T) -> + open_opts(Opts); +open_opts([{type, T}|Opts]) + when T =:= tcp; T =:= tcp_spdy; T =:= ssl -> + open_opts(Opts); +open_opts([Opt|_]) -> + {error, {options, Opt}}. + +close(ServerPid) -> + supervisor:terminate_child(gun_sup, ServerPid). + +shutdown(ServerPid) -> + gen_server:call(ServerPid, {shutdown, self()}). + +%% Requests. + +delete(ServerPid, Path) -> + request(ServerPid, <<"DELETE">>, Path, []). +delete(ServerPid, Path, Headers) -> + request(ServerPid, <<"DELETE">>, Path, Headers). + +get(ServerPid, Path) -> + request(ServerPid, <<"GET">>, Path, []). +get(ServerPid, Path, Headers) -> + request(ServerPid, <<"GET">>, Path, Headers). + +head(ServerPid, Path) -> + request(ServerPid, <<"HEAD">>, Path, []). +head(ServerPid, Path, Headers) -> + request(ServerPid, <<"HEAD">>, Path, Headers). + +options(ServerPid, Path) -> + request(ServerPid, <<"OPTIONS">>, Path, []). +options(ServerPid, Path, Headers) -> + request(ServerPid, <<"OPTIONS">>, Path, Headers). + +patch(ServerPid, Path, Headers) -> + request(ServerPid, <<"PATCH">>, Path, Headers). +patch(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"PATCH">>, Path, Headers, Body). + +post(ServerPid, Path, Headers) -> + request(ServerPid, <<"POST">>, Path, Headers). +post(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"POST">>, Path, Headers, Body). + +put(ServerPid, Path, Headers) -> + request(ServerPid, <<"PUT">>, Path, Headers). +put(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"PUT">>, Path, Headers, Body). + +request(ServerPid, Method, Path, Headers) -> + StreamRef = make_ref(), + _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers}, + StreamRef. +request(ServerPid, Method, Path, Headers, Body) -> + StreamRef = make_ref(), + _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers, Body}, + StreamRef. + +%% Responses. + +response(ServerPid, StreamRef, Status, Headers) -> + _ = ServerPid ! {response, self(), StreamRef, Status, Headers}, + ok. +response(ServerPid, StreamRef, Status, Headers, Body) -> + _ = ServerPid ! {response, self(), StreamRef, Status, Headers, Body}, + ok. + +%% Streaming data. + +data(ServerPid, StreamRef, IsFin, Data) -> + _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, + ok. + +%% Cancelling a stream. + +cancel(ServerPid, StreamRef) -> + _ = ServerPid ! {cancel, self(), StreamRef}, + ok. + +%% Websocket. + +ws_upgrade(ServerPid, Path) -> + ws_upgrade(ServerPid, Path, []). +ws_upgrade(ServerPid, Path, Headers) -> + _ = ServerPid ! {ws_upgrade, self(), Path, Headers}, + ok. + +ws_send(ServerPid, Payload) -> + _ = ServerPid ! {ws_send, self(), Payload}, + ok. + +%% Internals. + +start_link(Owner, Host, Port, Opts) -> + proc_lib:start_link(?MODULE, init, + [self(), Owner, Host, Port, Opts]). + +%% @doc Faster alternative to proplists:get_value/3. +%% @private +get_value(Key, Opts, Default) -> + case lists:keyfind(Key, 1, Opts) of + {_, Value} -> Value; + _ -> Default + end. + +init(Parent, Owner, Host, Port, Opts) -> + try + ok = proc_lib:init_ack(Parent, {ok, self()}), + Keepalive = get_value(keepalive, Opts, 5000), + Retry = get_value(retry, Opts, 5), + RetryTimeout = get_value(retry_timeout, Opts, 5000), + Type = get_value(type, Opts, ssl), + connect(#state{parent=Parent, owner=Owner, host=Host, port=Port, + keepalive=Keepalive, type=Type, + retry=Retry, retry_timeout=RetryTimeout}, Retry) + catch Class:Reason -> + Owner ! {gun, error, self(), {{Class, Reason, erlang:get_stacktrace()}, + "An unexpected error occurred."}} + end. + +connect(State=#state{owner=Owner, host=Host, port=Port, type=ssl}, Retries) -> + Transport = ranch_ssl, + Opts = [binary, {active, false}, {client_preferred_next_protocols, + client, [<<"spdy/3">>, <<"http/1.1">>], <<"http/1.1">>}], + case Transport:connect(Host, Port, Opts) of + {ok, Socket} -> + Protocol = gun_spdy, +%% @todo For some reasons this function doesn't work? Bug submitted. +% Protocol = case ssl:negotiated_next_protocol(Socket) of +% {ok, <<"spdy/3">>} -> gun_spdy; +% _ -> gun_http +% end, + ProtoState = Protocol:init(Owner, Socket, Transport), + before_loop(State#state{socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}); + {error, _} -> + retry_loop(State, Retries - 1) + end; +connect(State=#state{owner=Owner, host=Host, port=Port, type=Type}, Retries) -> + Transport = ranch_tcp, + Opts = [binary, {active, false}], + case Transport:connect(Host, Port, Opts) of + {ok, Socket} -> + Protocol = case Type of + tcp_spdy -> gun_spdy; + tcp -> gun_http + end, + ProtoState = Protocol:init(Owner, Socket, Transport), + before_loop(State#state{socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}); + {error, _} -> + retry_loop(State, Retries - 1) + end. + +%% Too many failures, give up. +retry_loop(_, 0) -> + error(too_many_retries); +retry_loop(State=#state{parent=Parent, retry_timeout=RetryTimeout}, Retries) -> + _ = erlang:send_after(RetryTimeout, self(), retry), + receive + retry -> + connect(State, Retries); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {retry_loop, [State, Retries]}) + end. + +before_loop(State=#state{keepalive=Keepalive}) -> + _ = erlang:send_after(Keepalive, self(), keepalive), + loop(State). + +loop(State=#state{parent=Parent, owner=Owner, host=Host, + retry=Retry, socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}) -> + {OK, Closed, Error} = Transport:messages(), + ok = Transport:setopts(Socket, [{active, once}]), + receive + {OK, Socket, Data} -> + case Protocol:handle(Data, ProtoState) of + error -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, + transport=undefined, protocol=undefined}, Retry); + ProtoState2 -> + loop(State#state{protocol_state=ProtoState2}) + end; + {Closed, Socket} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {Error, Socket, _} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + keepalive -> + ProtoState2 = Protocol:keepalive(ProtoState), + before_loop(State#state{protocol_state=ProtoState2}); + {request, Owner, StreamRef, Method, Path, Headers} -> + ProtoState2 = Protocol:request(ProtoState, + StreamRef, Method, Host, Path, Headers), + loop(State#state{protocol_state=ProtoState2}); + {request, Owner, StreamRef, Method, Path, Headers, Body} -> + ProtoState2 = Protocol:request(ProtoState, + StreamRef, Method, Host, Path, Headers, Body), + loop(State#state{protocol_state=ProtoState2}); + {response, Owner, StreamRef, Status, Headers} -> + ProtoState2 = Protocol:response(ProtoState, + StreamRef, Status, Headers), + loop(State#state{protocol_state=ProtoState2}); + {response, Owner, StreamRef, Status, Headers, Body} -> + ProtoState2 = Protocol:response(ProtoState, + StreamRef, Status, Headers, Body), + loop(State#state{protocol_state=ProtoState2}); + {data, Owner, StreamRef, IsFin, Data} -> + ProtoState2 = Protocol:data(ProtoState, + StreamRef, IsFin, Data), + loop(State#state{protocol_state=ProtoState2}); + {cancel, Owner, StreamRef} -> + ProtoState2 = Protocol:cancel(ProtoState, StreamRef), + loop(State#state{protocol_state=ProtoState2}); + {ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy -> + %% @todo + ProtoState2 = Protocol:ws_upgrade(ProtoState, + Path, Headers), + ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2}); + {shutdown, Owner} -> + %% @todo Protocol:shutdown? + ok; + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {loop, [State]}); + Any when is_tuple(Any), is_pid(element(2, Any)) -> + element(2, Any) ! {gun, error, self(), {notowner, + "Operations are restricted to the owner of the connection."}}, + loop(State); + {ws_upgrade, _, _, _} -> + Owner ! {gun, error, self(), {badstate, + "Websocket over SPDY isn't supported."}}, + loop(State); + {ws_send, _, _} -> + Owner ! {gun, error, self(), {badstate, + "Connection needs to be upgraded to Websocket " + "before the gun:ws_send/1 function can be used."}}, + loop(State); + Any -> + error_logger:error_msg("Unexpected message: ~w~n", [Any]) + end. + +ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket, + transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> + {OK, Closed, Error} = Transport:messages(), + ok = Transport:setopts(Socket, [{active, once}]), + receive + {OK, Socket, Data} -> + ProtoState2 = Protocol:handle(ProtoState, Data), + ws_loop(State#state{protocol_state=ProtoState2}); + {Closed, Socket} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {Error, Socket, _} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {ws_send, Owner, Frames} when is_list(Frames) -> + todo; %% @todo + {ws_send, Owner, Frame} -> + {todo, Frame}; %% @todo + {shutdown, Owner} -> + %% @todo Protocol:shutdown? + ok; + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {loop, [State]}); + Any when is_tuple(Any), is_pid(element(2, Any)) -> + element(2, Any) ! {gun, error, self(), {notowner, + "Operations are restricted to the owner of the connection."}}, + loop(State); + Any -> + error_logger:error_msg("Unexpected message: ~w~n", [Any]) + end. -- cgit v1.2.3