aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2013-08-22 10:39:50 +0200
committerLoïc Hoguin <[email protected]>2013-08-22 10:40:59 +0200
commit018e392f3a47a82bb41eb345933ec5cfa2490d38 (patch)
tree68b2d6bf25525b634d1c21ec5c60f10919a2e712 /src/gun.erl
downloadgun-018e392f3a47a82bb41eb345933ec5cfa2490d38.tar.gz
gun-018e392f3a47a82bb41eb345933ec5cfa2490d38.tar.bz2
gun-018e392f3a47a82bb41eb345933ec5cfa2490d38.zip
Initial commit with working SPDY client
Diffstat (limited to 'src/gun.erl')
-rw-r--r--src/gun.erl373
1 files changed, 373 insertions, 0 deletions
diff --git a/src/gun.erl b/src/gun.erl
new file mode 100644
index 0000000..011eed1
--- /dev/null
+++ b/src/gun.erl
@@ -0,0 +1,373 @@
+%% Copyright (c) 2013, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun).
+
+%% Connection.
+-export([open/3]).
+-export([close/1]).
+-export([shutdown/1]).
+
+%% Requests.
+-export([delete/2]).
+-export([delete/3]).
+-export([get/2]).
+-export([get/3]).
+-export([head/2]).
+-export([head/3]).
+-export([options/2]).
+-export([options/3]).
+-export([patch/3]).
+-export([patch/4]).
+-export([post/3]).
+-export([post/4]).
+-export([put/3]).
+-export([put/4]).
+-export([request/4]).
+-export([request/5]).
+
+%% Responses.
+-export([response/4]).
+-export([response/5]).
+
+%% Streaming data.
+-export([data/4]).
+
+%% Cancelling a stream.
+-export([cancel/2]).
+
+%% Websocket.
+-export([ws_upgrade/2]).
+-export([ws_upgrade/3]).
+-export([ws_send/2]).
+
+%% Internals.
+-export([start_link/4]).
+-export([init/5]).
+
+-record(state, {
+ parent,
+ owner,
+ host,
+ port,
+ keepalive,
+ type,
+ retry,
+ retry_timeout,
+ socket,
+ transport,
+ protocol,
+ protocol_state
+}).
+
+%% Connection.
+
+open(Host, Port, Opts) ->
+ case open_opts(Opts) of
+ ok ->
+ supervisor:start_child(gun_sup, [self(), Host, Port, Opts]);
+ Error ->
+ Error
+ end.
+
+%% @private
+open_opts([]) ->
+ ok;
+open_opts([{keepalive, K}|Opts]) when is_integer(K) ->
+ open_opts(Opts);
+open_opts([{retry, R}|Opts]) when is_integer(R) ->
+ open_opts(Opts);
+open_opts([{retry_timeout, T}|Opts]) when is_integer(T) ->
+ open_opts(Opts);
+open_opts([{type, T}|Opts])
+ when T =:= tcp; T =:= tcp_spdy; T =:= ssl ->
+ open_opts(Opts);
+open_opts([Opt|_]) ->
+ {error, {options, Opt}}.
+
+close(ServerPid) ->
+ supervisor:terminate_child(gun_sup, ServerPid).
+
+shutdown(ServerPid) ->
+ gen_server:call(ServerPid, {shutdown, self()}).
+
+%% Requests.
+
+delete(ServerPid, Path) ->
+ request(ServerPid, <<"DELETE">>, Path, []).
+delete(ServerPid, Path, Headers) ->
+ request(ServerPid, <<"DELETE">>, Path, Headers).
+
+get(ServerPid, Path) ->
+ request(ServerPid, <<"GET">>, Path, []).
+get(ServerPid, Path, Headers) ->
+ request(ServerPid, <<"GET">>, Path, Headers).
+
+head(ServerPid, Path) ->
+ request(ServerPid, <<"HEAD">>, Path, []).
+head(ServerPid, Path, Headers) ->
+ request(ServerPid, <<"HEAD">>, Path, Headers).
+
+options(ServerPid, Path) ->
+ request(ServerPid, <<"OPTIONS">>, Path, []).
+options(ServerPid, Path, Headers) ->
+ request(ServerPid, <<"OPTIONS">>, Path, Headers).
+
+patch(ServerPid, Path, Headers) ->
+ request(ServerPid, <<"PATCH">>, Path, Headers).
+patch(ServerPid, Path, Headers, Body) ->
+ request(ServerPid, <<"PATCH">>, Path, Headers, Body).
+
+post(ServerPid, Path, Headers) ->
+ request(ServerPid, <<"POST">>, Path, Headers).
+post(ServerPid, Path, Headers, Body) ->
+ request(ServerPid, <<"POST">>, Path, Headers, Body).
+
+put(ServerPid, Path, Headers) ->
+ request(ServerPid, <<"PUT">>, Path, Headers).
+put(ServerPid, Path, Headers, Body) ->
+ request(ServerPid, <<"PUT">>, Path, Headers, Body).
+
+request(ServerPid, Method, Path, Headers) ->
+ StreamRef = make_ref(),
+ _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers},
+ StreamRef.
+request(ServerPid, Method, Path, Headers, Body) ->
+ StreamRef = make_ref(),
+ _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers, Body},
+ StreamRef.
+
+%% Responses.
+
+response(ServerPid, StreamRef, Status, Headers) ->
+ _ = ServerPid ! {response, self(), StreamRef, Status, Headers},
+ ok.
+response(ServerPid, StreamRef, Status, Headers, Body) ->
+ _ = ServerPid ! {response, self(), StreamRef, Status, Headers, Body},
+ ok.
+
+%% Streaming data.
+
+data(ServerPid, StreamRef, IsFin, Data) ->
+ _ = ServerPid ! {data, self(), StreamRef, IsFin, Data},
+ ok.
+
+%% Cancelling a stream.
+
+cancel(ServerPid, StreamRef) ->
+ _ = ServerPid ! {cancel, self(), StreamRef},
+ ok.
+
+%% Websocket.
+
+ws_upgrade(ServerPid, Path) ->
+ ws_upgrade(ServerPid, Path, []).
+ws_upgrade(ServerPid, Path, Headers) ->
+ _ = ServerPid ! {ws_upgrade, self(), Path, Headers},
+ ok.
+
+ws_send(ServerPid, Payload) ->
+ _ = ServerPid ! {ws_send, self(), Payload},
+ ok.
+
+%% Internals.
+
+start_link(Owner, Host, Port, Opts) ->
+ proc_lib:start_link(?MODULE, init,
+ [self(), Owner, Host, Port, Opts]).
+
+%% @doc Faster alternative to proplists:get_value/3.
+%% @private
+get_value(Key, Opts, Default) ->
+ case lists:keyfind(Key, 1, Opts) of
+ {_, Value} -> Value;
+ _ -> Default
+ end.
+
+init(Parent, Owner, Host, Port, Opts) ->
+ try
+ ok = proc_lib:init_ack(Parent, {ok, self()}),
+ Keepalive = get_value(keepalive, Opts, 5000),
+ Retry = get_value(retry, Opts, 5),
+ RetryTimeout = get_value(retry_timeout, Opts, 5000),
+ Type = get_value(type, Opts, ssl),
+ connect(#state{parent=Parent, owner=Owner, host=Host, port=Port,
+ keepalive=Keepalive, type=Type,
+ retry=Retry, retry_timeout=RetryTimeout}, Retry)
+ catch Class:Reason ->
+ Owner ! {gun, error, self(), {{Class, Reason, erlang:get_stacktrace()},
+ "An unexpected error occurred."}}
+ end.
+
+connect(State=#state{owner=Owner, host=Host, port=Port, type=ssl}, Retries) ->
+ Transport = ranch_ssl,
+ Opts = [binary, {active, false}, {client_preferred_next_protocols,
+ client, [<<"spdy/3">>, <<"http/1.1">>], <<"http/1.1">>}],
+ case Transport:connect(Host, Port, Opts) of
+ {ok, Socket} ->
+ Protocol = gun_spdy,
+%% @todo For some reasons this function doesn't work? Bug submitted.
+% Protocol = case ssl:negotiated_next_protocol(Socket) of
+% {ok, <<"spdy/3">>} -> gun_spdy;
+% _ -> gun_http
+% end,
+ ProtoState = Protocol:init(Owner, Socket, Transport),
+ before_loop(State#state{socket=Socket, transport=Transport,
+ protocol=Protocol, protocol_state=ProtoState});
+ {error, _} ->
+ retry_loop(State, Retries - 1)
+ end;
+connect(State=#state{owner=Owner, host=Host, port=Port, type=Type}, Retries) ->
+ Transport = ranch_tcp,
+ Opts = [binary, {active, false}],
+ case Transport:connect(Host, Port, Opts) of
+ {ok, Socket} ->
+ Protocol = case Type of
+ tcp_spdy -> gun_spdy;
+ tcp -> gun_http
+ end,
+ ProtoState = Protocol:init(Owner, Socket, Transport),
+ before_loop(State#state{socket=Socket, transport=Transport,
+ protocol=Protocol, protocol_state=ProtoState});
+ {error, _} ->
+ retry_loop(State, Retries - 1)
+ end.
+
+%% Too many failures, give up.
+retry_loop(_, 0) ->
+ error(too_many_retries);
+retry_loop(State=#state{parent=Parent, retry_timeout=RetryTimeout}, Retries) ->
+ _ = erlang:send_after(RetryTimeout, self(), retry),
+ receive
+ retry ->
+ connect(State, Retries);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {retry_loop, [State, Retries]})
+ end.
+
+before_loop(State=#state{keepalive=Keepalive}) ->
+ _ = erlang:send_after(Keepalive, self(), keepalive),
+ loop(State).
+
+loop(State=#state{parent=Parent, owner=Owner, host=Host,
+ retry=Retry, socket=Socket, transport=Transport,
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ {OK, Closed, Error} = Transport:messages(),
+ ok = Transport:setopts(Socket, [{active, once}]),
+ receive
+ {OK, Socket, Data} ->
+ case Protocol:handle(Data, ProtoState) of
+ error ->
+ Transport:close(Socket),
+ retry_loop(State#state{socket=undefined,
+ transport=undefined, protocol=undefined}, Retry);
+ ProtoState2 ->
+ loop(State#state{protocol_state=ProtoState2})
+ end;
+ {Closed, Socket} ->
+ Transport:close(Socket),
+ retry_loop(State#state{socket=undefined, transport=undefined,
+ protocol=undefined}, Retry);
+ {Error, Socket, _} ->
+ Transport:close(Socket),
+ retry_loop(State#state{socket=undefined, transport=undefined,
+ protocol=undefined}, Retry);
+ keepalive ->
+ ProtoState2 = Protocol:keepalive(ProtoState),
+ before_loop(State#state{protocol_state=ProtoState2});
+ {request, Owner, StreamRef, Method, Path, Headers} ->
+ ProtoState2 = Protocol:request(ProtoState,
+ StreamRef, Method, Host, Path, Headers),
+ loop(State#state{protocol_state=ProtoState2});
+ {request, Owner, StreamRef, Method, Path, Headers, Body} ->
+ ProtoState2 = Protocol:request(ProtoState,
+ StreamRef, Method, Host, Path, Headers, Body),
+ loop(State#state{protocol_state=ProtoState2});
+ {response, Owner, StreamRef, Status, Headers} ->
+ ProtoState2 = Protocol:response(ProtoState,
+ StreamRef, Status, Headers),
+ loop(State#state{protocol_state=ProtoState2});
+ {response, Owner, StreamRef, Status, Headers, Body} ->
+ ProtoState2 = Protocol:response(ProtoState,
+ StreamRef, Status, Headers, Body),
+ loop(State#state{protocol_state=ProtoState2});
+ {data, Owner, StreamRef, IsFin, Data} ->
+ ProtoState2 = Protocol:data(ProtoState,
+ StreamRef, IsFin, Data),
+ loop(State#state{protocol_state=ProtoState2});
+ {cancel, Owner, StreamRef} ->
+ ProtoState2 = Protocol:cancel(ProtoState, StreamRef),
+ loop(State#state{protocol_state=ProtoState2});
+ {ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy ->
+ %% @todo
+ ProtoState2 = Protocol:ws_upgrade(ProtoState,
+ Path, Headers),
+ ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2});
+ {shutdown, Owner} ->
+ %% @todo Protocol:shutdown?
+ ok;
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {loop, [State]});
+ Any when is_tuple(Any), is_pid(element(2, Any)) ->
+ element(2, Any) ! {gun, error, self(), {notowner,
+ "Operations are restricted to the owner of the connection."}},
+ loop(State);
+ {ws_upgrade, _, _, _} ->
+ Owner ! {gun, error, self(), {badstate,
+ "Websocket over SPDY isn't supported."}},
+ loop(State);
+ {ws_send, _, _} ->
+ Owner ! {gun, error, self(), {badstate,
+ "Connection needs to be upgraded to Websocket "
+ "before the gun:ws_send/1 function can be used."}},
+ loop(State);
+ Any ->
+ error_logger:error_msg("Unexpected message: ~w~n", [Any])
+ end.
+
+ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket,
+ transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
+ {OK, Closed, Error} = Transport:messages(),
+ ok = Transport:setopts(Socket, [{active, once}]),
+ receive
+ {OK, Socket, Data} ->
+ ProtoState2 = Protocol:handle(ProtoState, Data),
+ ws_loop(State#state{protocol_state=ProtoState2});
+ {Closed, Socket} ->
+ Transport:close(Socket),
+ retry_loop(State#state{socket=undefined, transport=undefined,
+ protocol=undefined}, Retry);
+ {Error, Socket, _} ->
+ Transport:close(Socket),
+ retry_loop(State#state{socket=undefined, transport=undefined,
+ protocol=undefined}, Retry);
+ {ws_send, Owner, Frames} when is_list(Frames) ->
+ todo; %% @todo
+ {ws_send, Owner, Frame} ->
+ {todo, Frame}; %% @todo
+ {shutdown, Owner} ->
+ %% @todo Protocol:shutdown?
+ ok;
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {loop, [State]});
+ Any when is_tuple(Any), is_pid(element(2, Any)) ->
+ element(2, Any) ! {gun, error, self(), {notowner,
+ "Operations are restricted to the owner of the connection."}},
+ loop(State);
+ Any ->
+ error_logger:error_msg("Unexpected message: ~w~n", [Any])
+ end.