From c409897f508eedff8ecc6f0860c9379fcc11bf23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Thu, 12 Mar 2015 19:22:19 +0100 Subject: Add initial Websocket support All autobahntestsuite tests pass including the permessage-deflate compression tests. Some of the tests pass in a non-strict fashion. They are testing for protocol errors and expect events to happen in a particular order, which is not respected by Gun. Gun fails earlier than is expected due to concurrent processing of frames. The implementation when error occurs during handshake is probably a bit rough at this point. The documentation is also incomplete and/or wrong at this time, though this is the general state of the Gun documentation and will be resolved in a separate commit. --- src/gun.erl | 84 ++++++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 53 insertions(+), 31 deletions(-) (limited to 'src/gun.erl') diff --git a/src/gun.erl b/src/gun.erl index 71af26e..3b98732 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -58,6 +58,7 @@ %% Websocket. -export([ws_upgrade/2]). -export([ws_upgrade/3]). +-export([ws_upgrade/4]). -export([ws_send/2]). %% Debug. @@ -85,6 +86,8 @@ | {type, conn_type()}]. -export_type([opts/0]). +-type ws_opts() :: [{compress, boolean()}]. + -record(state, { parent :: pid(), owner :: pid(), @@ -98,7 +101,7 @@ socket :: inet:socket() | ssl:sslsocket(), transport :: module(), protocol :: module(), - proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY and WS too. + proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY too. protocol_state :: any() }). @@ -338,14 +341,19 @@ cancel(ServerPid, StreamRef) -> %% Websocket. --spec ws_upgrade(pid(), iodata()) -> ok. +-spec ws_upgrade(pid(), iodata()) -> reference(). ws_upgrade(ServerPid, Path) -> - ws_upgrade(ServerPid, Path, []). + ws_upgrade(ServerPid, Path, [], []). --spec ws_upgrade(pid(), iodata(), headers()) -> ok. +-spec ws_upgrade(pid(), iodata(), headers()) -> reference(). ws_upgrade(ServerPid, Path, Headers) -> - _ = ServerPid ! {ws_upgrade, self(), Path, Headers}, - ok. + ws_upgrade(ServerPid, Path, Headers, []). + +-spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference(). +ws_upgrade(ServerPid, Path, Headers, Opts) -> + StreamRef = make_ref(), + _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts}, + StreamRef. -spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok. ws_send(ServerPid, Frames) -> @@ -428,8 +436,10 @@ connect(State=#state{owner=Owner, host=Host, port=Port, type=Type, retry(State, Retries - 1) end. -retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when - is_reference(KeepaliveRef) -> +%% Exit normally if the retry functionality has been disabled. +retry(_, 0) -> + ok; +retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) -> _ = erlang:cancel_timer(KeepaliveRef), %% Flush if we have a keepalive message receive @@ -458,7 +468,7 @@ before_loop(State=#state{keepalive=Keepalive}) -> KeepaliveRef = erlang:send_after(Keepalive, self(), keepalive), loop(State#state{keepalive_ref=KeepaliveRef}). -loop(State=#state{parent=Parent, owner=Owner, host=Host, +loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port, retry=Retry, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> {OK, Closed, Error} = Transport:messages(), @@ -470,7 +480,9 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, Transport:close(Socket), retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); - ProtoState2 -> + {upgrade, Protocol2, ProtoState2} -> + ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2}); + ProtoState2 -> loop(State#state{protocol_state=ProtoState2}) end; {Closed, Socket} -> @@ -494,11 +506,11 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, before_loop(State#state{protocol_state=ProtoState2}); {request, Owner, StreamRef, Method, Path, Headers} -> ProtoState2 = Protocol:request(ProtoState, - StreamRef, Method, Host, Path, Headers), + StreamRef, Method, Host, Port, Path, Headers), loop(State#state{protocol_state=ProtoState2}); {request, Owner, StreamRef, Method, Path, Headers, Body} -> ProtoState2 = Protocol:request(ProtoState, - StreamRef, Method, Host, Path, Headers, Body), + StreamRef, Method, Host, Port, Path, Headers, Body), loop(State#state{protocol_state=ProtoState2}); {data, Owner, StreamRef, IsFin, Data} -> ProtoState2 = Protocol:data(ProtoState, @@ -507,11 +519,10 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, {cancel, Owner, StreamRef} -> ProtoState2 = Protocol:cancel(ProtoState, StreamRef), loop(State#state{protocol_state=ProtoState2}); - {ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy -> - %% @todo - ProtoState2 = Protocol:ws_upgrade(ProtoState, - Path, Headers), - ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2}); + {ws_upgrade, Owner, StreamRef, Path, Headers, Opts} when Protocol =/= gun_spdy -> + ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, Opts), + loop(State#state{protocol_state=ProtoState2}); + %% @todo can fail if http/1.0 {shutdown, Owner} -> %% @todo Protocol:shutdown? ok; @@ -525,9 +536,9 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, element(2, Any) ! {gun_error, self(), {notowner, "Operations are restricted to the owner of the connection."}}, loop(State); - {ws_upgrade, _, _, _} -> - Owner ! {gun_error, self(), {badstate, - "Websocket over SPDY isn't supported."}}, + {ws_upgrade, _, StreamRef, _, _} -> + Owner ! {gun_error, self(), StreamRef, {badstate, + "Websocket is only supported over HTTP/1.1."}}, loop(State); {ws_send, _, _} -> Owner ! {gun_error, self(), {badstate, @@ -545,23 +556,34 @@ ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket, ok = Transport:setopts(Socket, [{active, once}]), receive {OK, Socket, Data} -> - ProtoState2 = Protocol:handle(ProtoState, Data), - ws_loop(State#state{protocol_state=ProtoState2}); + case Protocol:handle(Data, ProtoState) of + close -> + Transport:close(Socket), + retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); + ProtoState2 -> + ws_loop(State#state{protocol_state=ProtoState2}) + end; {Closed, Socket} -> Transport:close(Socket), - retry(State#state{socket=undefined, transport=undefined, - protocol=undefined}, Retry); + retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); {Error, Socket, _} -> Transport:close(Socket), - retry(State#state{socket=undefined, transport=undefined, - protocol=undefined}, Retry); - %% @todo keepalive - {ws_send, Owner, Frames} when is_list(Frames) -> - todo; %% @todo + retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); + %% Ignore any previous HTTP keep-alive. + keepalive -> + ws_loop(State); +% {ws_send, Owner, Frames} when is_list(Frames) -> +% todo; %% @todo {ws_send, Owner, Frame} -> - {todo, Frame}; %% @todo + case Protocol:send(Frame, ProtoState) of + close -> + Transport:close(Socket), + retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry); + ProtoState2 -> + ws_loop(State#state{protocol_state=ProtoState2}) + end; {shutdown, Owner} -> - %% @todo Protocol:shutdown? + %% @todo Protocol:shutdown? %% @todo close frame ok; {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], -- cgit v1.2.3