aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun.erl
blob: 011eed18170e4a1b139e68b100c7a39ba7e57476 (plain) (tree)




















































































































































































































































































































































































                                                                                                 
%% Copyright (c) 2013, Loïc Hoguin <essen@ninenines.eu>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun).

%% Connection.
-export([open/3]).
-export([close/1]).
-export([shutdown/1]).

%% Requests.
-export([delete/2]).
-export([delete/3]).
-export([get/2]).
-export([get/3]).
-export([head/2]).
-export([head/3]).
-export([options/2]).
-export([options/3]).
-export([patch/3]).
-export([patch/4]).
-export([post/3]).
-export([post/4]).
-export([put/3]).
-export([put/4]).
-export([request/4]).
-export([request/5]).

%% Responses.
-export([response/4]).
-export([response/5]).

%% Streaming data.
-export([data/4]).

%% Cancelling a stream.
-export([cancel/2]).

%% Websocket.
-export([ws_upgrade/2]).
-export([ws_upgrade/3]).
-export([ws_send/2]).

%% Internals.
-export([start_link/4]).
-export([init/5]).

%% State of a connection process. It is built in init/5 and threaded
%% through connect/2, retry_loop/2, before_loop/1, loop/1 and ws_loop/1.
-record(state, {
	%% First argument of init/5; handed to sys:handle_system_msg/6.
	parent,
	%% Process whose messages are accepted by the loops; also given to
	%% Protocol:init/3 and used as the recipient of error messages.
	owner,
	%% Host given to Transport:connect/3 and to Protocol:request/6,7.
	host,
	%% Port given to Transport:connect/3.
	port,
	%% Delay given to erlang:send_after/3 for the 'keepalive' message
	%% (defaults to 5000 in init/5).
	keepalive,
	%% Connection type: ssl, tcp or tcp_spdy (defaults to ssl in init/5).
	type,
	%% Retry count used when reconnecting (defaults to 5 in init/5).
	retry,
	%% Delay given to erlang:send_after/3 before a new connection attempt
	%% (defaults to 5000 in init/5).
	retry_timeout,
	%% Socket returned by Transport:connect/3; reset to undefined before
	%% entering retry_loop/2.
	socket,
	%% Transport module (ranch_ssl or ranch_tcp); reset to undefined before
	%% entering retry_loop/2.
	transport,
	%% Protocol module (gun_spdy or gun_http, gun_ws after an upgrade);
	%% reset to undefined before entering retry_loop/2.
	protocol,
	%% Opaque value returned by Protocol:init/3 and replaced by the value
	%% returned from each protocol callback.
	protocol_state
}).

%% Connection.

%% @doc Open a connection to Host on Port.
%%
%% The options are validated with open_opts/1 first. When they are accepted,
%% a child is started under gun_sup with the calling process as owner and
%% the result of supervisor:start_child/2 is returned. Otherwise the
%% validation result is returned unchanged and nothing is started.
open(Host, Port, Opts) ->
	Owner = self(),
	case open_opts(Opts) of
		ok ->
			ChildArgs = [Owner, Host, Port, Opts],
			supervisor:start_child(gun_sup, ChildArgs);
		Rejected ->
			Rejected
	end.

%% @private
%% @doc Validate the option list given to open/3.
%%
%% Returns ok when every element is a recognised option, and
%% {error, {options, Opt}} for the first element that is not.
open_opts([]) ->
	ok;
open_opts([Opt|Tail]) ->
	case is_open_opt(Opt) of
		true -> open_opts(Tail);
		false -> {error, {options, Opt}}
	end.

%% @private
%% @doc Tell whether a single term is an option accepted by open/3.
is_open_opt({keepalive, K}) -> is_integer(K);
is_open_opt({retry, R}) -> is_integer(R);
is_open_opt({retry_timeout, T}) -> is_integer(T);
is_open_opt({type, T}) -> T =:= tcp orelse T =:= tcp_spdy orelse T =:= ssl;
is_open_opt(_) -> false.

%% @doc Terminate the connection process ServerPid through the gun_sup
%% supervisor. Returns the result of supervisor:terminate_child/2.
close(ServerPid) ->
	Supervisor = gun_sup,
	supervisor:terminate_child(Supervisor, ServerPid).

%% @doc Ask the connection process ServerPid to shut down.
%%
%% The connection process is a plain receive loop, not a gen_server: it
%% matches the bare {shutdown, Owner} message and never answers gen_server
%% calls. The request is therefore sent as a regular message, like every
%% other operation of this module, instead of being wrapped in a
%% gen_server call that the loop can neither recognise nor reply to.
%%
%% Always returns ok; the message is only acted upon when the caller is the
%% owner of the connection.
shutdown(ServerPid) ->
	_ = ServerPid ! {shutdown, self()},
	ok.

%% Requests.

%% @doc Send a DELETE request for Path without any header.
delete(ServerPid, Path) ->
	delete(ServerPid, Path, []).

%% @doc Send a DELETE request for Path with the given Headers.
%% Returns the stream reference created by request/4.
delete(ServerPid, Path, Headers) ->
	Method = <<"DELETE">>,
	request(ServerPid, Method, Path, Headers).

%% @doc Send a GET request for Path without any header.
get(ServerPid, Path) ->
	get(ServerPid, Path, []).

%% @doc Send a GET request for Path with the given Headers.
%% Returns the stream reference created by request/4.
get(ServerPid, Path, Headers) ->
	Method = <<"GET">>,
	request(ServerPid, Method, Path, Headers).

%% @doc Send a HEAD request for Path without any header.
head(ServerPid, Path) ->
	head(ServerPid, Path, []).

%% @doc Send a HEAD request for Path with the given Headers.
%% Returns the stream reference created by request/4.
head(ServerPid, Path, Headers) ->
	Method = <<"HEAD">>,
	request(ServerPid, Method, Path, Headers).

%% @doc Send an OPTIONS request for Path without any header.
options(ServerPid, Path) ->
	options(ServerPid, Path, []).

%% @doc Send an OPTIONS request for Path with the given Headers.
%% Returns the stream reference created by request/4.
options(ServerPid, Path, Headers) ->
	Method = <<"OPTIONS">>,
	request(ServerPid, Method, Path, Headers).

%% @doc Send a PATCH request for Path with the given Headers; no body is
%% handed over with the request (request/4 is used).
patch(ServerPid, Path, Headers) ->
	Method = <<"PATCH">>,
	request(ServerPid, Method, Path, Headers).

%% @doc Send a PATCH request for Path with the given Headers and Body.
%% Returns the stream reference created by request/5.
patch(ServerPid, Path, Headers, Body) ->
	Method = <<"PATCH">>,
	request(ServerPid, Method, Path, Headers, Body).

%% @doc Send a POST request for Path with the given Headers; no body is
%% handed over with the request (request/4 is used).
post(ServerPid, Path, Headers) ->
	Method = <<"POST">>,
	request(ServerPid, Method, Path, Headers).

%% @doc Send a POST request for Path with the given Headers and Body.
%% Returns the stream reference created by request/5.
post(ServerPid, Path, Headers, Body) ->
	Method = <<"POST">>,
	request(ServerPid, Method, Path, Headers, Body).

%% @doc Send a PUT request for Path with the given Headers; no body is
%% handed over with the request (request/4 is used).
put(ServerPid, Path, Headers) ->
	Method = <<"PUT">>,
	request(ServerPid, Method, Path, Headers).

%% @doc Send a PUT request for Path with the given Headers and Body.
%% Returns the stream reference created by request/5.
put(ServerPid, Path, Headers, Body) ->
	Method = <<"PUT">>,
	request(ServerPid, Method, Path, Headers, Body).

%% @doc Send a request without a body to the connection process.
%%
%% A fresh reference identifying the stream is created, a
%% {request, Caller, Ref, Method, Path, Headers} message is sent to
%% ServerPid, and the reference is returned. The call does not wait for
%% any answer.
request(ServerPid, Method, Path, Headers) ->
	Ref = make_ref(),
	Msg = {request, self(), Ref, Method, Path, Headers},
	_ = ServerPid ! Msg,
	Ref.

%% @doc Same as request/4, with Body appended to the message sent to the
%% connection process.
request(ServerPid, Method, Path, Headers, Body) ->
	Ref = make_ref(),
	Msg = {request, self(), Ref, Method, Path, Headers, Body},
	_ = ServerPid ! Msg,
	Ref.

%% Responses.

%% @doc Send a {response, Caller, StreamRef, Status, Headers} message to
%% the connection process. Always returns ok without waiting for an answer.
response(ServerPid, StreamRef, Status, Headers) ->
	Msg = {response, self(), StreamRef, Status, Headers},
	_ = ServerPid ! Msg,
	ok.

%% @doc Same as response/4, with Body appended to the message sent to the
%% connection process.
response(ServerPid, StreamRef, Status, Headers, Body) ->
	Msg = {response, self(), StreamRef, Status, Headers, Body},
	_ = ServerPid ! Msg,
	ok.

%% Streaming data.

%% @doc Send a {data, Caller, StreamRef, IsFin, Data} message to the
%% connection process. IsFin and Data are forwarded untouched. Always
%% returns ok without waiting for an answer.
data(ServerPid, StreamRef, IsFin, Data) ->
	Msg = {data, self(), StreamRef, IsFin, Data},
	_ = ServerPid ! Msg,
	ok.

%% Cancelling a stream.

%% @doc Send a {cancel, Caller, StreamRef} message to the connection
%% process. Always returns ok without waiting for an answer.
cancel(ServerPid, StreamRef) ->
	Msg = {cancel, self(), StreamRef},
	_ = ServerPid ! Msg,
	ok.

%% Websocket.

%% @doc Request a Websocket upgrade on Path without any header.
ws_upgrade(ServerPid, Path) ->
	_ = ServerPid ! {ws_upgrade, self(), Path, []},
	ok.

%% @doc Send a {ws_upgrade, Caller, Path, Headers} message to the
%% connection process. Always returns ok without waiting for an answer.
ws_upgrade(ServerPid, Path, Headers) ->
	Msg = {ws_upgrade, self(), Path, Headers},
	_ = ServerPid ! Msg,
	ok.

%% @doc Send a {ws_send, Caller, Payload} message to the connection
%% process. Always returns ok without waiting for an answer.
ws_send(ServerPid, Payload) ->
	Msg = {ws_send, self(), Payload},
	_ = ServerPid ! Msg,
	ok.

%% Internals.

%% @doc Start a linked connection process running init/5.
%%
%% The calling process is passed to init/5 as its Parent argument, ahead of
%% Owner, Host, Port and Opts. Returns the result of proc_lib:start_link/3.
start_link(Owner, Host, Port, Opts) ->
	InitArgs = [self(), Owner, Host, Port, Opts],
	proc_lib:start_link(?MODULE, init, InitArgs).

%% @doc Lightweight replacement for proplists:get_value/3.
%%
%% Looks up the first tuple of Opts whose first element is Key. When that
%% tuple has exactly two elements its second element is returned; in every
%% other case (nothing found, or a tuple of another size) Default is
%% returned.
%% @private
get_value(Key, Opts, Default) ->
	case lists:keyfind(Key, 1, Opts) of
		Found when tuple_size(Found) =:= 2 -> element(2, Found);
		_ -> Default
	end.

%% Entry point of the connection process started by start_link/4.
%%
%% Acknowledges the start to Parent, reads the options (keepalive 5000,
%% retry 5, retry_timeout 5000 and type ssl when absent) and tries to
%% connect. Any exception raised from there on is caught and reported to
%% Owner as a {gun, error, Pid, {{Class, Reason, Stacktrace}, Text}}
%% message, after which the function returns.
%%
%% NOTE(review): erlang:get_stacktrace/0 is deprecated since OTP 21 in
%% favour of the Class:Reason:Stacktrace catch pattern; revisit once older
%% releases no longer need to be supported.
init(Parent, Owner, Host, Port, Opts) ->
	try
		ok = proc_lib:init_ack(Parent, {ok, self()}),
		Retry = get_value(retry, Opts, 5),
		InitialState = #state{
			parent=Parent,
			owner=Owner,
			host=Host,
			port=Port,
			keepalive=get_value(keepalive, Opts, 5000),
			type=get_value(type, Opts, ssl),
			retry=Retry,
			retry_timeout=get_value(retry_timeout, Opts, 5000)
		},
		connect(InitialState, Retry)
	catch Class:Reason ->
		Stacktrace = erlang:get_stacktrace(),
		Details = {Class, Reason, Stacktrace},
		Owner ! {gun, error, self(), {Details, "An unexpected error occurred."}}
	end.

%% @private
%% @doc Open the socket for the configured connection type and enter the
%% main loop.
%%
%% Type ssl connects with ranch_ssl, any other type with ranch_tcp. When the
%% connection attempt fails, retry_loop/2 is entered with one retry less.
connect(State=#state{type=ssl}, Retries) ->
	%% The NPN preference has to be a single {Key, Value} option whose value
	%% is the {Precedence, ClientPrefs, Default} tuple. Flattening these
	%% elements into the option tuple itself is not a form the ssl
	%% application documents, so the option could not take effect.
	Opts = [binary, {active, false}, {client_preferred_next_protocols,
		{client, [<<"spdy/3">>, <<"http/1.1">>], <<"http/1.1">>}}],
	do_connect(State, Retries, ranch_ssl, Opts);
connect(State, Retries) ->
	do_connect(State, Retries, ranch_tcp, [binary, {active, false}]).

%% @private
%% @doc Connect with Transport and, on success, initialise the protocol and
%% start looping; on failure go through retry_loop/2.
do_connect(State=#state{owner=Owner, host=Host, port=Port, type=Type},
		Retries, Transport, Opts) ->
	case Transport:connect(Host, Port, Opts) of
		{ok, Socket} ->
			Protocol = protocol_for(Type),
			ProtoState = Protocol:init(Owner, Socket, Transport),
			before_loop(State#state{socket=Socket, transport=Transport,
				protocol=Protocol, protocol_state=ProtoState});
		{error, _} ->
			retry_loop(State, Retries - 1)
	end.

%% @private
%% @doc Protocol module used for each connection type.
%% @todo For ssl, select gun_spdy or gun_http from the protocol that was
%% actually negotiated on the socket instead of always assuming SPDY.
protocol_for(ssl) -> gun_spdy;
protocol_for(tcp_spdy) -> gun_spdy;
protocol_for(tcp) -> gun_http.

%% @private
%% @doc Wait for retry_timeout, then try to connect again.
%%
%% Too many failures, give up: the function raises error too_many_retries
%% as soon as no retry is left. The check is "zero or less" rather than
%% "exactly zero" because connect/2 hands over Retries - 1: with a retry
%% option of 0 (or a negative one) the counter would otherwise start below
%% zero and never reach the give-up clause, retrying forever.
%%
%% While waiting, only the 'retry' timer message and system messages are
%% received; anything else stays in the mailbox.
%%
%% NOTE(review): sys:handle_system_msg/6 calls back into ?MODULE
%% (system_continue/3 and friends); no such function appears in the export
%% list of this module - confirm system messages are really supported.
retry_loop(_, Retries) when Retries =< 0 ->
	error(too_many_retries);
retry_loop(State=#state{parent=Parent, retry_timeout=RetryTimeout}, Retries) ->
	_ = erlang:send_after(RetryTimeout, self(), retry),
	receive
		retry ->
			connect(State, Retries);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{retry_loop, [State, Retries]})
	end.

%% @private
%% @doc Schedule the next 'keepalive' message to this process, then enter
%% the main loop.
before_loop(#state{keepalive=Interval}=State) ->
	Self = self(),
	_ = erlang:send_after(Interval, Self, keepalive),
	loop(State).

%% @private
%% @doc Main loop of a connected (non-Websocket) connection process.
%%
%% Each iteration re-arms the socket with {active, once} and then handles
%% exactly one message:
%%
%% - socket data is given to Protocol:handle/2; when it returns 'error', the
%%   socket is closed and retry_loop/2 is entered;
%% - socket close or socket error closes the socket and enters retry_loop/2;
%% - 'keepalive' calls Protocol:keepalive/1 and schedules the next one;
%% - operations sent by the owner are forwarded to the protocol module;
%% - {shutdown, Owner} ends the loop;
%% - operations sent by any other process are answered with a notowner
%%   error;
%% - anything else is logged and ignored.
%%
%% The badstate clauses for ws_upgrade and ws_send are placed before the
%% generic "tuple carrying a pid" clause and match on Owner. Placed after
%% it they could never be reached by a message built through this module's
%% API, and the owner was told it was not the owner.
loop(State=#state{parent=Parent, owner=Owner, host=Host,
		retry=Retry, socket=Socket, transport=Transport,
		protocol=Protocol, protocol_state=ProtoState}) ->
	{OK, Closed, Error} = Transport:messages(),
	ok = Transport:setopts(Socket, [{active, once}]),
	receive
		{OK, Socket, Data} ->
			case Protocol:handle(Data, ProtoState) of
				error ->
					Transport:close(Socket),
					retry_loop(State#state{socket=undefined,
						transport=undefined, protocol=undefined}, Retry);
				ProtoState2 ->
					loop(State#state{protocol_state=ProtoState2})
			end;
		{Closed, Socket} ->
			Transport:close(Socket),
			retry_loop(State#state{socket=undefined, transport=undefined,
				protocol=undefined}, Retry);
		{Error, Socket, _} ->
			Transport:close(Socket),
			retry_loop(State#state{socket=undefined, transport=undefined,
				protocol=undefined}, Retry);
		keepalive ->
			ProtoState2 = Protocol:keepalive(ProtoState),
			before_loop(State#state{protocol_state=ProtoState2});
		{request, Owner, StreamRef, Method, Path, Headers} ->
			ProtoState2 = Protocol:request(ProtoState,
				StreamRef, Method, Host, Path, Headers),
			loop(State#state{protocol_state=ProtoState2});
		{request, Owner, StreamRef, Method, Path, Headers, Body} ->
			ProtoState2 = Protocol:request(ProtoState,
				StreamRef, Method, Host, Path, Headers, Body),
			loop(State#state{protocol_state=ProtoState2});
		{response, Owner, StreamRef, Status, Headers} ->
			ProtoState2 = Protocol:response(ProtoState,
				StreamRef, Status, Headers),
			loop(State#state{protocol_state=ProtoState2});
		{response, Owner, StreamRef, Status, Headers, Body} ->
			ProtoState2 = Protocol:response(ProtoState,
				StreamRef, Status, Headers, Body),
			loop(State#state{protocol_state=ProtoState2});
		{data, Owner, StreamRef, IsFin, Data} ->
			ProtoState2 = Protocol:data(ProtoState,
				StreamRef, IsFin, Data),
			loop(State#state{protocol_state=ProtoState2});
		{cancel, Owner, StreamRef} ->
			ProtoState2 = Protocol:cancel(ProtoState, StreamRef),
			loop(State#state{protocol_state=ProtoState2});
		{ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy ->
			%% @todo
			ProtoState2 = Protocol:ws_upgrade(ProtoState,
				Path, Headers),
			ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2});
		%% Only reached when the protocol is gun_spdy (see the guard above).
		{ws_upgrade, Owner, _, _} ->
			Owner ! {gun, error, self(), {badstate,
				"Websocket over SPDY isn't supported."}},
			loop(State);
		%% Websocket frames cannot be sent before the upgrade.
		{ws_send, Owner, _} ->
			Owner ! {gun, error, self(), {badstate,
				"Connection needs to be upgraded to Websocket "
				"before the gun:ws_send/2 function can be used."}},
			loop(State);
		{shutdown, Owner} ->
			%% @todo Protocol:shutdown?
			ok;
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{loop, [State]});
		%% Any other tuple carrying a pid in second position is treated as
		%% an operation attempted by a process that is not the owner.
		Any when is_tuple(Any), is_pid(element(2, Any)) ->
			element(2, Any) ! {gun, error, self(), {notowner,
				"Operations are restricted to the owner of the connection."}},
			loop(State);
		Any ->
			%% Log and keep serving: returning here would end the connection
			%% process on any stray message.
			error_logger:error_msg("Unexpected message: ~w~n", [Any]),
			loop(State)
	end.

%% @private
%% @doc Main loop of a connection process after a Websocket upgrade.
%%
%% Each iteration re-arms the socket with {active, once} and then handles
%% exactly one message: socket data goes to Protocol:handle/2, socket close
%% or error closes the socket and enters retry_loop/2, {shutdown, Owner}
%% ends the loop, operations from other processes are answered with a
%% notowner error, and anything else is logged and ignored.
%%
%% Every continuation stays in ws_loop/1. Falling back to loop/1 (as the
%% notowner clause and the system message continuation used to do) would
%% run the HTTP/SPDY message handling against the Websocket protocol state.
%%
%% Sending frames is not implemented yet: both ws_send clauses return a
%% placeholder value, which ends the loop.
ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket,
		transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
	{OK, Closed, Error} = Transport:messages(),
	ok = Transport:setopts(Socket, [{active, once}]),
	receive
		{OK, Socket, Data} ->
			%% NOTE(review): the arguments are (State, Data) here while loop/1
			%% calls Protocol:handle(Data, State) - confirm against gun_ws.
			ProtoState2 = Protocol:handle(ProtoState, Data),
			ws_loop(State#state{protocol_state=ProtoState2});
		{Closed, Socket} ->
			Transport:close(Socket),
			retry_loop(State#state{socket=undefined, transport=undefined,
				protocol=undefined}, Retry);
		{Error, Socket, _} ->
			Transport:close(Socket),
			retry_loop(State#state{socket=undefined, transport=undefined,
				protocol=undefined}, Retry);
		{ws_send, Owner, Frames} when is_list(Frames) ->
			todo; %% @todo
		{ws_send, Owner, Frame} ->
			{todo, Frame}; %% @todo
		{shutdown, Owner} ->
			%% @todo Protocol:shutdown?
			ok;
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{ws_loop, [State]});
		%% Any other tuple carrying a pid in second position is treated as
		%% an operation attempted by a process that is not the owner.
		Any when is_tuple(Any), is_pid(element(2, Any)) ->
			element(2, Any) ! {gun, error, self(), {notowner,
				"Operations are restricted to the owner of the connection."}},
			ws_loop(State);
		Any ->
			%% Log and keep serving: returning here would end the connection
			%% process on any stray message.
			error_logger:error_msg("Unexpected message: ~w~n", [Any]),
			ws_loop(State)
	end.