diff options
author | Loïc Hoguin <[email protected]> | 2013-08-22 10:39:50 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2013-08-22 10:40:59 +0200 |
commit | 018e392f3a47a82bb41eb345933ec5cfa2490d38 (patch) | |
tree | 68b2d6bf25525b634d1c21ec5c60f10919a2e712 | |
download | gun-018e392f3a47a82bb41eb345933ec5cfa2490d38.tar.gz gun-018e392f3a47a82bb41eb345933ec5cfa2490d38.tar.bz2 gun-018e392f3a47a82bb41eb345933ec5cfa2490d38.zip |
Initial commit with working SPDY client
-rw-r--r-- | LICENSE | 13 | ||||
-rw-r--r-- | Makefile | 18 | ||||
-rw-r--r-- | README.md | 24 | ||||
-rw-r--r-- | erlang.mk | 224 | ||||
-rw-r--r-- | src/gun.app.src | 28 | ||||
-rw-r--r-- | src/gun.erl | 373 | ||||
-rw-r--r-- | src/gun_app.erl | 29 | ||||
-rw-r--r-- | src/gun_spdy.erl | 293 | ||||
-rw-r--r-- | src/gun_sup.erl | 39 | ||||
-rw-r--r-- | test/twitter_SUITE.erl | 70 |
10 files changed, 1111 insertions, 0 deletions
@@ -0,0 +1,13 @@ +Copyright (c) 2013, Loïc Hoguin <[email protected]> + +Permission to use, copy, modify, and/or distribute this software for any +purpose with or without fee is hereby granted, provided that the above +copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9d8a30f --- /dev/null +++ b/Makefile @@ -0,0 +1,18 @@ +# See LICENSE for licensing information. + +PROJECT = gun + +# Options. + +CT_SUITES = twitter +PLT_APPS = ssl + +# Dependencies. + +DEPS = cowlib ranch +dep_cowlib = pkg://cowlib master +dep_ranch = pkg://ranch master + +# Standard targets. + +include erlang.mk diff --git a/README.md b/README.md new file mode 100644 index 0000000..fef5da4 --- /dev/null +++ b/README.md @@ -0,0 +1,24 @@ +Gun +=== + +Gun is an asynchronous SPDY, HTTP and Websocket client. + +Goals +----- + +Gun aims to provide an **easy to use** client compatible with +HTTP, SPDY and Websocket. + +Gun is **always connected**. It will maintain a permanent +connection to the server, reopening it as soon as the server +closes it, saving time for the requests that come in. + +All connections are **supervised** automatically, allowing +the developer to focus on writing his code without worrying. + +Support +------- + + * Official IRC Channel: #ninenines on irc.freenode.net + * [Mailing Lists](http://lists.ninenines.eu) + * [Commercial Support](http://ninenines.eu/support) diff --git a/erlang.mk b/erlang.mk new file mode 100644 index 0000000..f073155 --- /dev/null +++ b/erlang.mk @@ -0,0 +1,224 @@ +# Copyright (c) 2013, Loïc Hoguin <[email protected]> +# +# Permission to use, copy, modify, and/or distribute this software for any +# purpose with or without fee is hereby granted, provided that the above +# copyright notice and this permission notice appear in all copies. +# +# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +# Project. + +PROJECT ?= $(notdir $(CURDIR)) + +# Packages database file. + +PKG_FILE ?= $(CURDIR)/.erlang.mk.packages.v1 +export PKG_FILE + +PKG_FILE_URL ?= https://raw.github.com/extend/erlang.mk/master/packages.v1.txt + +define get_pkg_file + wget -O $(PKG_FILE) $(PKG_FILE_URL) +endef + +# Verbosity and tweaks. + +V ?= 0 + +appsrc_verbose_0 = @echo " APP " $(PROJECT).app.src; +appsrc_verbose = $(appsrc_verbose_$(V)) + +erlc_verbose_0 = @echo " ERLC " $(filter %.erl %.core,$(?F)); +erlc_verbose = $(erlc_verbose_$(V)) + +xyrl_verbose_0 = @echo " XYRL " $(filter %.xrl %.yrl,$(?F)); +xyrl_verbose = $(xyrl_verbose_$(V)) + +dtl_verbose_0 = @echo " DTL " $(filter %.dtl,$(?F)); +dtl_verbose = $(dtl_verbose_$(V)) + +gen_verbose_0 = @echo " GEN " $@; +gen_verbose = $(gen_verbose_$(V)) + +.PHONY: all clean-all app clean deps clean-deps docs clean-docs \ + build-tests tests build-plt dialyze + +# Deps directory. + +DEPS_DIR ?= $(CURDIR)/deps +export DEPS_DIR + +REBAR_DEPS_DIR = $(DEPS_DIR) +export REBAR_DEPS_DIR + +ALL_DEPS_DIRS = $(addprefix $(DEPS_DIR)/,$(DEPS)) +ALL_TEST_DEPS_DIRS = $(addprefix $(DEPS_DIR)/,$(TEST_DEPS)) + +# Application. + +ERLC_OPTS ?= -Werror +debug_info +warn_export_all +warn_export_vars \ + +warn_shadow_vars +warn_obsolete_guard # +bin_opt_info +warn_missing_spec +COMPILE_FIRST ?= +COMPILE_FIRST_PATHS = $(addprefix src/,$(addsuffix .erl,$(COMPILE_FIRST))) + +all: deps app + +clean-all: clean clean-deps clean-docs + $(gen_verbose) rm -rf .$(PROJECT).plt $(DEPS_DIR) logs + +app: ebin/$(PROJECT).app + $(eval MODULES := $(shell find ebin -name \*.beam \ + | sed 's/ebin\///;s/\.beam/,/' | sed '$$s/.$$//')) + $(appsrc_verbose) cat src/$(PROJECT).app.src \ + | sed 's/{modules, \[\]}/{modules, \[$(MODULES)\]}/' \ + > ebin/$(PROJECT).app + +define compile_erl + $(erlc_verbose) ERL_LIBS=$(DEPS_DIR) erlc -v $(ERLC_OPTS) -o ebin/ \ + -pa ebin/ -I include/ $(COMPILE_FIRST_PATHS) $(1) +endef + +define compile_xyrl + $(xyrl_verbose) erlc -v -o ebin/ $(1) + $(xyrl_verbose) erlc $(ERLC_OPTS) -o ebin/ ebin/*.erl + @rm ebin/*.erl +endef + +define compile_dtl + $(dtl_verbose) erl -noshell -pa ebin/ $(DEPS_DIR)/erlydtl/ebin/ -eval ' \ + Compile = fun(F) -> \ + Module = list_to_atom( \ + string:to_lower(filename:basename(F, ".dtl")) ++ "_dtl"), \ + erlydtl_compiler:compile(F, Module, [{out_dir, "ebin/"}]) \ + end, \ + _ = [Compile(F) || F <- string:tokens("$(1)", " ")], \ + init:stop()' +endef + +ebin/$(PROJECT).app: src/*.erl $(wildcard src/*.core) \ + $(wildcard src/*.xrl) $(wildcard src/*.yrl) \ + $(wildcard templates/*.dtl) + @mkdir -p ebin/ + $(if $(strip $(filter %.erl %.core,$?)), \ + $(call compile_erl,$(filter %.erl %.core,$?))) + $(if $(strip $(filter %.xrl %.yrl,$?)), \ + $(call compile_xyrl,$(filter %.xrl %.yrl,$?))) + $(if $(strip $(filter %.dtl,$?)), \ + $(call compile_dtl,$(filter %.dtl,$?))) + +clean: + $(gen_verbose) rm -rf ebin/ test/*.beam erl_crash.dump + +# Dependencies. + +define get_dep + @mkdir -p $(DEPS_DIR) +ifeq (,$(findstring pkg://,$(word 1,$(dep_$(1))))) + git clone -n -- $(word 1,$(dep_$(1))) $(DEPS_DIR)/$(1) +else + @if [ ! -f $(PKG_FILE) ]; then $(call get_pkg_file); fi + git clone -n -- `awk 'BEGIN { FS = "\t" }; \ + $$$$1 == "$(subst pkg://,,$(word 1,$(dep_$(1))))" { print $$$$2 }' \ + $(PKG_FILE)` $(DEPS_DIR)/$(1) +endif + cd $(DEPS_DIR)/$(1) ; git checkout -q $(word 2,$(dep_$(1))) +endef + +define dep_target +$(DEPS_DIR)/$(1): + $(call get_dep,$(1)) +endef + +$(foreach dep,$(DEPS),$(eval $(call dep_target,$(dep)))) + +deps: $(ALL_DEPS_DIRS) + @for dep in $(ALL_DEPS_DIRS) ; do \ + if [ -f $$dep/Makefile ] ; then \ + $(MAKE) -C $$dep ; \ + else \ + echo "include $(CURDIR)/erlang.mk" | $(MAKE) -f - -C $$dep ; \ + fi ; \ + done + +clean-deps: + @for dep in $(ALL_DEPS_DIRS) ; do $(MAKE) -C $$dep clean; done + +# Documentation. + +docs: clean-docs + $(gen_verbose) erl -noshell \ + -eval 'edoc:application($(PROJECT), ".", []), init:stop().' + +clean-docs: + $(gen_verbose) rm -f doc/*.css doc/*.html doc/*.png doc/edoc-info + +# Tests. + +$(foreach dep,$(TEST_DEPS),$(eval $(call dep_target,$(dep)))) + +build-test-deps: $(ALL_TEST_DEPS_DIRS) + @for dep in $(ALL_TEST_DEPS_DIRS) ; do $(MAKE) -C $$dep; done + +build-tests: build-test-deps + $(gen_verbose) ERL_LIBS=$(DEPS_DIR) erlc -v $(ERLC_OPTS) -o test/ \ + $(wildcard test/*.erl test/*/*.erl) -pa ebin/ + +CT_RUN = ct_run \ + -no_auto_compile \ + -noshell \ + -pa ebin $(DEPS_DIR)/*/ebin \ + -dir test \ + -logdir logs +# -cover test/cover.spec + +CT_SUITES ?= +CT_SUITES_FULL = $(addsuffix _SUITE,$(CT_SUITES)) + +tests: ERLC_OPTS += -DTEST=1 +'{parse_transform, eunit_autoexport}' +tests: clean deps app build-tests + @mkdir -p logs/ + @$(CT_RUN) -suite $(CT_SUITES_FULL) + $(gen_verbose) rm -f test/*.beam + +# Dialyzer. + +PLT_APPS ?= +DIALYZER_OPTS ?= -Werror_handling -Wrace_conditions \ + -Wunmatched_returns # -Wunderspecs + +build-plt: deps app + @dialyzer --build_plt --output_plt .$(PROJECT).plt \ + --apps erts kernel stdlib $(PLT_APPS) $(ALL_DEPS_DIRS) + +dialyze: + @dialyzer --src src --plt .$(PROJECT).plt --no_native $(DIALYZER_OPTS) + +# Packages. + +$(PKG_FILE): + @$(call get_pkg_file) + +pkg-list: $(PKG_FILE) + @cat $(PKG_FILE) | awk 'BEGIN { FS = "\t" }; { print \ + "Name:\t\t" $$1 "\n" \ + "Repository:\t" $$2 "\n" \ + "Website:\t" $$3 "\n" \ + "Description:\t" $$4 "\n" }' + +ifdef q +pkg-search: $(PKG_FILE) + @cat $(PKG_FILE) | grep -i ${q} | awk 'BEGIN { FS = "\t" }; { print \ + "Name:\t\t" $$1 "\n" \ + "Repository:\t" $$2 "\n" \ + "Website:\t" $$3 "\n" \ + "Description:\t" $$4 "\n" }' +else +pkg-search: + @echo "Usage: make pkg-search q=STRING" +endif diff --git a/src/gun.app.src b/src/gun.app.src new file mode 100644 index 0000000..9351b3e --- /dev/null +++ b/src/gun.app.src @@ -0,0 +1,28 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +{application, gun, [ + {description, "Asynchronous SPDY, HTTP and Websocket client."}, + {vsn, "0.1.0"}, + {modules, []}, + {registered, [gun_sup]}, + {applications, [ + kernel, + stdlib, + ranch, + ssl + ]}, + {mod, {gun_app, []}}, + {env, []} +]}. diff --git a/src/gun.erl b/src/gun.erl new file mode 100644 index 0000000..011eed1 --- /dev/null +++ b/src/gun.erl @@ -0,0 +1,373 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun). + +%% Connection. +-export([open/3]). +-export([close/1]). +-export([shutdown/1]). + +%% Requests. +-export([delete/2]). +-export([delete/3]). +-export([get/2]). +-export([get/3]). +-export([head/2]). +-export([head/3]). +-export([options/2]). +-export([options/3]). +-export([patch/3]). +-export([patch/4]). +-export([post/3]). +-export([post/4]). +-export([put/3]). +-export([put/4]). +-export([request/4]). +-export([request/5]). + +%% Responses. +-export([response/4]). +-export([response/5]). + +%% Streaming data. +-export([data/4]). + +%% Cancelling a stream. +-export([cancel/2]). + +%% Websocket. +-export([ws_upgrade/2]). +-export([ws_upgrade/3]). +-export([ws_send/2]). + +%% Internals. +-export([start_link/4]). +-export([init/5]). + +-record(state, { + parent, + owner, + host, + port, + keepalive, + type, + retry, + retry_timeout, + socket, + transport, + protocol, + protocol_state +}). + +%% Connection. + +open(Host, Port, Opts) -> + case open_opts(Opts) of + ok -> + supervisor:start_child(gun_sup, [self(), Host, Port, Opts]); + Error -> + Error + end. + +%% @private +open_opts([]) -> + ok; +open_opts([{keepalive, K}|Opts]) when is_integer(K) -> + open_opts(Opts); +open_opts([{retry, R}|Opts]) when is_integer(R) -> + open_opts(Opts); +open_opts([{retry_timeout, T}|Opts]) when is_integer(T) -> + open_opts(Opts); +open_opts([{type, T}|Opts]) + when T =:= tcp; T =:= tcp_spdy; T =:= ssl -> + open_opts(Opts); +open_opts([Opt|_]) -> + {error, {options, Opt}}. + +close(ServerPid) -> + supervisor:terminate_child(gun_sup, ServerPid). + +shutdown(ServerPid) -> + gen_server:call(ServerPid, {shutdown, self()}). + +%% Requests. + +delete(ServerPid, Path) -> + request(ServerPid, <<"DELETE">>, Path, []). +delete(ServerPid, Path, Headers) -> + request(ServerPid, <<"DELETE">>, Path, Headers). + +get(ServerPid, Path) -> + request(ServerPid, <<"GET">>, Path, []). +get(ServerPid, Path, Headers) -> + request(ServerPid, <<"GET">>, Path, Headers). + +head(ServerPid, Path) -> + request(ServerPid, <<"HEAD">>, Path, []). +head(ServerPid, Path, Headers) -> + request(ServerPid, <<"HEAD">>, Path, Headers). + +options(ServerPid, Path) -> + request(ServerPid, <<"OPTIONS">>, Path, []). +options(ServerPid, Path, Headers) -> + request(ServerPid, <<"OPTIONS">>, Path, Headers). + +patch(ServerPid, Path, Headers) -> + request(ServerPid, <<"PATCH">>, Path, Headers). +patch(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"PATCH">>, Path, Headers, Body). + +post(ServerPid, Path, Headers) -> + request(ServerPid, <<"POST">>, Path, Headers). +post(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"POST">>, Path, Headers, Body). + +put(ServerPid, Path, Headers) -> + request(ServerPid, <<"PUT">>, Path, Headers). +put(ServerPid, Path, Headers, Body) -> + request(ServerPid, <<"PUT">>, Path, Headers, Body). + +request(ServerPid, Method, Path, Headers) -> + StreamRef = make_ref(), + _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers}, + StreamRef. +request(ServerPid, Method, Path, Headers, Body) -> + StreamRef = make_ref(), + _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers, Body}, + StreamRef. + +%% Responses. + +response(ServerPid, StreamRef, Status, Headers) -> + _ = ServerPid ! {response, self(), StreamRef, Status, Headers}, + ok. +response(ServerPid, StreamRef, Status, Headers, Body) -> + _ = ServerPid ! {response, self(), StreamRef, Status, Headers, Body}, + ok. + +%% Streaming data. + +data(ServerPid, StreamRef, IsFin, Data) -> + _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, + ok. + +%% Cancelling a stream. + +cancel(ServerPid, StreamRef) -> + _ = ServerPid ! {cancel, self(), StreamRef}, + ok. + +%% Websocket. + +ws_upgrade(ServerPid, Path) -> + ws_upgrade(ServerPid, Path, []). +ws_upgrade(ServerPid, Path, Headers) -> + _ = ServerPid ! {ws_upgrade, self(), Path, Headers}, + ok. + +ws_send(ServerPid, Payload) -> + _ = ServerPid ! {ws_send, self(), Payload}, + ok. + +%% Internals. + +start_link(Owner, Host, Port, Opts) -> + proc_lib:start_link(?MODULE, init, + [self(), Owner, Host, Port, Opts]). + +%% @doc Faster alternative to proplists:get_value/3. +%% @private +get_value(Key, Opts, Default) -> + case lists:keyfind(Key, 1, Opts) of + {_, Value} -> Value; + _ -> Default + end. + +init(Parent, Owner, Host, Port, Opts) -> + try + ok = proc_lib:init_ack(Parent, {ok, self()}), + Keepalive = get_value(keepalive, Opts, 5000), + Retry = get_value(retry, Opts, 5), + RetryTimeout = get_value(retry_timeout, Opts, 5000), + Type = get_value(type, Opts, ssl), + connect(#state{parent=Parent, owner=Owner, host=Host, port=Port, + keepalive=Keepalive, type=Type, + retry=Retry, retry_timeout=RetryTimeout}, Retry) + catch Class:Reason -> + Owner ! {gun, error, self(), {{Class, Reason, erlang:get_stacktrace()}, + "An unexpected error occurred."}} + end. + +connect(State=#state{owner=Owner, host=Host, port=Port, type=ssl}, Retries) -> + Transport = ranch_ssl, + Opts = [binary, {active, false}, {client_preferred_next_protocols, + client, [<<"spdy/3">>, <<"http/1.1">>], <<"http/1.1">>}], + case Transport:connect(Host, Port, Opts) of + {ok, Socket} -> + Protocol = gun_spdy, +%% @todo For some reasons this function doesn't work? Bug submitted. +% Protocol = case ssl:negotiated_next_protocol(Socket) of +% {ok, <<"spdy/3">>} -> gun_spdy; +% _ -> gun_http +% end, + ProtoState = Protocol:init(Owner, Socket, Transport), + before_loop(State#state{socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}); + {error, _} -> + retry_loop(State, Retries - 1) + end; +connect(State=#state{owner=Owner, host=Host, port=Port, type=Type}, Retries) -> + Transport = ranch_tcp, + Opts = [binary, {active, false}], + case Transport:connect(Host, Port, Opts) of + {ok, Socket} -> + Protocol = case Type of + tcp_spdy -> gun_spdy; + tcp -> gun_http + end, + ProtoState = Protocol:init(Owner, Socket, Transport), + before_loop(State#state{socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}); + {error, _} -> + retry_loop(State, Retries - 1) + end. + +%% Too many failures, give up. +retry_loop(_, 0) -> + error(too_many_retries); +retry_loop(State=#state{parent=Parent, retry_timeout=RetryTimeout}, Retries) -> + _ = erlang:send_after(RetryTimeout, self(), retry), + receive + retry -> + connect(State, Retries); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {retry_loop, [State, Retries]}) + end. + +before_loop(State=#state{keepalive=Keepalive}) -> + _ = erlang:send_after(Keepalive, self(), keepalive), + loop(State). + +loop(State=#state{parent=Parent, owner=Owner, host=Host, + retry=Retry, socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}) -> + {OK, Closed, Error} = Transport:messages(), + ok = Transport:setopts(Socket, [{active, once}]), + receive + {OK, Socket, Data} -> + case Protocol:handle(Data, ProtoState) of + error -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, + transport=undefined, protocol=undefined}, Retry); + ProtoState2 -> + loop(State#state{protocol_state=ProtoState2}) + end; + {Closed, Socket} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {Error, Socket, _} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + keepalive -> + ProtoState2 = Protocol:keepalive(ProtoState), + before_loop(State#state{protocol_state=ProtoState2}); + {request, Owner, StreamRef, Method, Path, Headers} -> + ProtoState2 = Protocol:request(ProtoState, + StreamRef, Method, Host, Path, Headers), + loop(State#state{protocol_state=ProtoState2}); + {request, Owner, StreamRef, Method, Path, Headers, Body} -> + ProtoState2 = Protocol:request(ProtoState, + StreamRef, Method, Host, Path, Headers, Body), + loop(State#state{protocol_state=ProtoState2}); + {response, Owner, StreamRef, Status, Headers} -> + ProtoState2 = Protocol:response(ProtoState, + StreamRef, Status, Headers), + loop(State#state{protocol_state=ProtoState2}); + {response, Owner, StreamRef, Status, Headers, Body} -> + ProtoState2 = Protocol:response(ProtoState, + StreamRef, Status, Headers, Body), + loop(State#state{protocol_state=ProtoState2}); + {data, Owner, StreamRef, IsFin, Data} -> + ProtoState2 = Protocol:data(ProtoState, + StreamRef, IsFin, Data), + loop(State#state{protocol_state=ProtoState2}); + {cancel, Owner, StreamRef} -> + ProtoState2 = Protocol:cancel(ProtoState, StreamRef), + loop(State#state{protocol_state=ProtoState2}); + {ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy -> + %% @todo + ProtoState2 = Protocol:ws_upgrade(ProtoState, + Path, Headers), + ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2}); + {shutdown, Owner} -> + %% @todo Protocol:shutdown? + ok; + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {loop, [State]}); + Any when is_tuple(Any), is_pid(element(2, Any)) -> + element(2, Any) ! {gun, error, self(), {notowner, + "Operations are restricted to the owner of the connection."}}, + loop(State); + {ws_upgrade, _, _, _} -> + Owner ! {gun, error, self(), {badstate, + "Websocket over SPDY isn't supported."}}, + loop(State); + {ws_send, _, _} -> + Owner ! {gun, error, self(), {badstate, + "Connection needs to be upgraded to Websocket " + "before the gun:ws_send/1 function can be used."}}, + loop(State); + Any -> + error_logger:error_msg("Unexpected message: ~w~n", [Any]) + end. + +ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket, + transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> + {OK, Closed, Error} = Transport:messages(), + ok = Transport:setopts(Socket, [{active, once}]), + receive + {OK, Socket, Data} -> + ProtoState2 = Protocol:handle(ProtoState, Data), + ws_loop(State#state{protocol_state=ProtoState2}); + {Closed, Socket} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {Error, Socket, _} -> + Transport:close(Socket), + retry_loop(State#state{socket=undefined, transport=undefined, + protocol=undefined}, Retry); + {ws_send, Owner, Frames} when is_list(Frames) -> + todo; %% @todo + {ws_send, Owner, Frame} -> + {todo, Frame}; %% @todo + {shutdown, Owner} -> + %% @todo Protocol:shutdown? + ok; + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {loop, [State]}); + Any when is_tuple(Any), is_pid(element(2, Any)) -> + element(2, Any) ! {gun, error, self(), {notowner, + "Operations are restricted to the owner of the connection."}}, + loop(State); + Any -> + error_logger:error_msg("Unexpected message: ~w~n", [Any]) + end. diff --git a/src/gun_app.erl b/src/gun_app.erl new file mode 100644 index 0000000..35ff79a --- /dev/null +++ b/src/gun_app.erl @@ -0,0 +1,29 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(gun_app). +-behaviour(application). + +%% API. +-export([start/2]). +-export([stop/1]). + +%% API. + +start(_Type, _Args) -> + gun_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/gun_spdy.erl b/src/gun_spdy.erl new file mode 100644 index 0000000..6b0dd94 --- /dev/null +++ b/src/gun_spdy.erl @@ -0,0 +1,293 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_spdy). + +-export([init/3]). +-export([handle/2]). +-export([keepalive/1]). +-export([request/6]). +-export([request/7]). +-export([response/4]). +-export([response/5]). +-export([data/4]). +-export([cancel/2]). + +-record(stream, { + id :: non_neg_integer(), + ref :: reference(), + in :: boolean(), %% true = open + out :: boolean(), %% true = open + version :: binary() +}). + +-record(spdy_state, { + owner :: pid(), + socket :: inet:socket() | ssl:sslsocket(), + transport :: module(), + buffer = <<>> :: binary(), + zdef :: zlib:zstream(), + zinf :: zlib:zstream(), + streams = [] :: [#stream{}], + stream_id = 1 :: non_neg_integer(), + ping_id = 1 :: non_neg_integer() +}). + +init(Owner, Socket, Transport) -> + #spdy_state{owner=Owner, socket=Socket, transport=Transport, + zdef=cow_spdy:deflate_init(), zinf=cow_spdy:inflate_init()}. + +handle(Data, State=#spdy_state{buffer=Buffer}) -> + handle_loop(<< Buffer/binary, Data/binary >>, + State#spdy_state{buffer= <<>>}). + +handle_loop(Data, State=#spdy_state{zinf=Zinf}) -> + case cow_spdy:split(Data) of + {true, Frame, Rest} -> + P = cow_spdy:parse(Frame, Zinf), + handle_frame(Rest, State, P); + {false, Rest} -> + State#spdy_state{buffer=Rest} + end. + +handle_frame(Rest, State=#spdy_state{owner=Owner, + socket=Socket, transport=Transport}, + {data, StreamID, IsFin, Data}) -> + case get_stream_by_id(StreamID, State) of + #stream{in=false} -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, stream_already_closed)), + handle_loop(Rest, delete_stream(StreamID, State)); + S = #stream{ref=StreamRef} when IsFin -> + Owner ! {gun, data, self(), StreamRef, fin, Data}, + handle_loop(Rest, in_fin_stream(S, State)); + #stream{ref=StreamRef} -> + Owner ! {gun, data, self(), StreamRef, nofin, Data}, + handle_loop(Rest, State); + false -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, invalid_stream)), + handle_loop(Rest, delete_stream(StreamID, State)) + end; +handle_frame(Rest, State=#spdy_state{owner=Owner, + socket=Socket, transport=Transport}, + {syn_stream, StreamID, _, IsFin, IsUnidirectional, + _, Method, _, Host, Path, Version, Headers}) -> + case get_stream_by_id(StreamID, State) of + false -> + StreamRef = make_ref(), + Owner ! {gun, request, self(), StreamRef, + Method, Host, Path, Headers}, + handle_loop(Rest, new_stream(StreamID, StreamRef, + not IsFin, not IsUnidirectional, Version, State)); + #stream{} -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, stream_in_use)), + handle_loop(Rest, State) + end; +handle_frame(Rest, State=#spdy_state{owner=Owner, + socket=Socket, transport=Transport}, + {syn_reply, StreamID, IsFin, Status, _, Headers}) -> + case get_stream_by_id(StreamID, State) of + #stream{in=false} -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, stream_already_closed)), + handle_loop(Rest, delete_stream(StreamID, State)); + S = #stream{ref=StreamRef} -> + Owner ! {gun, response, self(), StreamRef, Status, Headers}, + if IsFin -> + handle_loop(Rest, in_fin_stream(S, State)); + true -> + handle_loop(Rest, State) + end; + false -> + Transport:send(Socket, + cow_spdy:rst_stream(StreamID, invalid_stream)), + handle_loop(Rest, delete_stream(StreamID, State)) + end; +handle_frame(Rest, State=#spdy_state{owner=Owner}, + {rst_stream, StreamID, Status}) -> + case get_stream_by_id(StreamID, State) of + #stream{} -> + Owner ! {gun, error, self(), StreamID, Status}, + handle_loop(Rest, delete_stream(StreamID, State)); + false -> + handle_loop(Rest, State) + end; +handle_frame(Rest, State, {settings, ClearSettings, Settings}) -> + error_logger:error_msg("Ignored SETTINGS control frame ~p ~p~n", + [ClearSettings, Settings]), + handle_loop(Rest, State); +%% Server PING. +handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, + {ping, PingID}) when PingID rem 2 =:= 0 -> + Transport:send(Socket, cow_spdy:ping(PingID)), + handle_loop(Rest, State); +%% Client PING. +handle_frame(Rest, State, {ping, _}) -> + handle_loop(Rest, State); +handle_frame(Rest, State, {goaway, LastGoodStreamID, Status}) -> + error_logger:error_msg("Ignored GOAWAY control frame ~p ~p~n", + [LastGoodStreamID, Status]), + handle_loop(Rest, State); +handle_frame(Rest, State, {headers, StreamID, IsFin, Headers}) -> + error_logger:error_msg("Ignored HEADERS control frame ~p ~p ~p~n", + [StreamID, IsFin, Headers]), + handle_loop(Rest, State); +handle_frame(Rest, State, {window_update, StreamID, DeltaWindowSize}) -> + error_logger:error_msg("Ignored WINDOW_UPDATE control frame ~p ~p~n", + [StreamID, DeltaWindowSize]), + handle_loop(Rest, State); +handle_frame(_, #spdy_state{owner=Owner, socket=Socket, transport=Transport}, + {error, badprotocol}) -> + Owner ! {gun, error, self(), {badprotocol, + "The remote endpoint sent invalid data."}}, + %% @todo LastGoodStreamID + Transport:send(Socket, cow_spdy:goaway(0, protocol_error)), + error. + +keepalive(State=#spdy_state{socket=Socket, transport=Transport, + ping_id=PingID}) -> + Transport:send(Socket, cow_spdy:ping(PingID)), + State#spdy_state{ping_id=PingID + 2}. + +request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, + stream_id=StreamID}, StreamRef, Method, Host, Path, Headers) -> + Out = false =/= lists:keyfind(<<"content-type">>, 1, Headers), + Transport:send(Socket, cow_spdy:syn_stream(Zdef, + StreamID, 0, not Out, false, 0, + Method, <<"https">>, Host, Path, <<"HTTP/1.1">>, Headers)), + new_stream(StreamID, StreamRef, true, Out, <<"HTTP/1.1">>, + State#spdy_state{stream_id=StreamID + 2}). + +%% @todo Handle Body > 16MB. +request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, + stream_id=StreamID}, StreamRef, Method, Host, Path, Headers, Body) -> + Transport:send(Socket, [ + cow_spdy:syn_stream(Zdef, + StreamID, 0, false, false, 0, + Method, <<"https">>, Host, Path, <<"HTTP/1.1">>, Headers), + cow_spdy:data(StreamID, true, Body) + ]), + new_stream(StreamID, StreamRef, true, false, <<"HTTP/1.1">>, + State#spdy_state{stream_id=StreamID + 2}). + +response(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef}, + StreamRef, Status, Headers) -> + case get_stream_by_ref(StreamRef, State) of + #stream{out=false} -> + error_stream_closed(State); + S = #stream{id=StreamID, version=Version} -> + Out = false =/= lists:keyfind(<<"content-type">>, 1, Headers), + Transport:send(Socket, cow_spdy:syn_reply(Zdef, + StreamID, not Out, Status, Version, Headers)), + if Out -> + State; + true -> + out_fin_stream(S, State) + end; + false -> + error_stream_not_found(State) + end. + +response(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef}, + StreamRef, Status, Headers, Body) -> + case get_stream_by_ref(StreamRef, State) of + #stream{out=false} -> + error_stream_closed(State); + S = #stream{id=StreamID, version=Version} -> + Transport:send(Socket, [ + cow_spdy:syn_reply(Zdef, + StreamID, false, Status, Version, Headers), + cow_spdy:data(S#stream.id, true, Body) + ]), + out_fin_stream(S, State); + false -> + error_stream_not_found(State) + end. + +data(State=#spdy_state{socket=Socket, transport=Transport}, + StreamRef, IsFin, Data) -> + case get_stream_by_ref(StreamRef, State) of + #stream{out=false} -> + error_stream_closed(State); + S = #stream{} -> + IsFin2 = IsFin =:= fin, + Transport:send(Socket, cow_spdy:data(S#stream.id, IsFin2, Data)), + if IsFin2 -> + out_fin_stream(S, State); + true -> + State + end; + false -> + error_stream_not_found(State) + end. + +cancel(State=#spdy_state{socket=Socket, transport=Transport}, + StreamRef) -> + case get_stream_by_ref(StreamRef, State) of + #stream{id=StreamID} -> + Transport:send(Socket, cow_spdy:rst_stream(StreamID, cancel)), + delete_stream(StreamID, State); + false -> + error_stream_not_found(State) + end. + +error_stream_closed(State=#spdy_state{owner=Owner}) -> + Owner ! {gun, error, self(), {badstate, + "The stream has already been closed."}}, + State. + +error_stream_not_found(State=#spdy_state{owner=Owner}) -> + Owner ! {gun, error, self(), {badstate, + "The stream cannot be found."}}, + State. + +%% Streams. + +new_stream(StreamID, StreamRef, In, Out, Version, + State=#spdy_state{streams=Streams}) -> + New = #stream{id=StreamID, ref=StreamRef, + in=In, out=Out, version=Version}, + State#spdy_state{streams=[New|Streams]}. + +get_stream_by_id(StreamID, #spdy_state{streams=Streams}) -> + case lists:keyfind(StreamID, #stream.id, Streams) of + false -> false; + S -> S + end. + +get_stream_by_ref(StreamRef, #spdy_state{streams=Streams}) -> + case lists:keyfind(StreamRef, #stream.id, Streams) of + false -> false; + S -> S + end. + +delete_stream(StreamID, State=#spdy_state{streams=Streams}) -> + Streams2 = lists:keydelete(StreamID, #stream.id, Streams), + State#spdy_state{streams=Streams2}. + +in_fin_stream(S=#stream{out=false}, State) -> + delete_stream(S#stream.id, State); +in_fin_stream(S, State=#spdy_state{streams=Streams}) -> + Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, + S#stream{in=false}), + State#spdy_state{streams=Streams2}. + +out_fin_stream(S=#stream{in=false}, State) -> + delete_stream(S#stream.id, State); +out_fin_stream(S, State=#spdy_state{streams=Streams}) -> + Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, + S#stream{out=false}), + State#spdy_state{streams=Streams2}. diff --git a/src/gun_sup.erl b/src/gun_sup.erl new file mode 100644 index 0000000..b7a9c82 --- /dev/null +++ b/src/gun_sup.erl @@ -0,0 +1,39 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @private +-module(gun_sup). +-behaviour(supervisor). + +%% API. +-export([start_link/0]). + +%% supervisor. +-export([init/1]). + +-define(SUPERVISOR, ?MODULE). + +%% API. + +-spec start_link() -> {ok, pid()}. +start_link() -> + supervisor:start_link({local, ?SUPERVISOR}, ?MODULE, []). + +%% supervisor. + +init([]) -> + Procs = [ + {gun, {gun, start_link, []}, + transient, 5000, worker, [gun]}], + {ok, {{simple_one_for_one, 10, 10}, Procs}}. diff --git a/test/twitter_SUITE.erl b/test/twitter_SUITE.erl new file mode 100644 index 0000000..26afdc3 --- /dev/null +++ b/test/twitter_SUITE.erl @@ -0,0 +1,70 @@ +%% Copyright (c) 2013, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(twitter_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +%% ct. +-export([all/0]). +-export([init_per_suite/1]). +-export([end_per_suite/1]). + +%% Tests. +-export([spdy/1]). + +%% ct. + +all() -> + [spdy]. + +init_per_suite(Config) -> + ok = application:start(ranch), + ok = application:start(crypto), + ok = application:start(asn1), + ok = application:start(public_key), + ok = application:start(ssl), + ok = application:start(gun), + Config. + +end_per_suite(_) -> + ok = application:stop(gun), + ok = application:stop(ssl), + ok = application:stop(public_key), + ok = application:stop(asn1), + ok = application:stop(crypto), + ok = application:stop(ranch), + ok. + +spdy(_) -> + {ok, Pid} = gun:open("twitter.com", 443, []), + Ref = gun:get(Pid, "/"), + receive + {gun, response, Pid, Ref, Status, Headers} -> + ct:print("response ~p ~p", [Status, Headers]), + data_loop(Pid, Ref) + after 5000 -> + error(timeout) + end. + +data_loop(Pid, Ref) -> + receive + {gun, data, Pid, Ref, nofin, Data} -> + ct:print("data ~p", [Data]), + data_loop(Pid, Ref); + {gun, data, Pid, Ref, fin, Data} -> + ct:print("data ~p~nend", [Data]) + after 5000 -> + error(timeout) + end. |