aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2015-03-12 19:22:19 +0100
committerLoïc Hoguin <[email protected]>2015-03-12 19:22:19 +0100
commitc409897f508eedff8ecc6f0860c9379fcc11bf23 (patch)
treed5a651df4ef5e8f9c6c6bb77366defa53c686e20 /src/gun.erl
parentea2de24f18741fc89d7a1dd6a3a0a43f3ccb1fd4 (diff)
downloadgun-c409897f508eedff8ecc6f0860c9379fcc11bf23.tar.gz
gun-c409897f508eedff8ecc6f0860c9379fcc11bf23.tar.bz2
gun-c409897f508eedff8ecc6f0860c9379fcc11bf23.zip
Add initial Websocket support
All autobahntestsuite tests pass including the permessage-deflate compression tests. Some of the tests pass in a non-strict fashion. They are testing for protocol errors and expect events to happen in a particular order, which is not respected by Gun. Gun fails earlier than is expected due to concurrent processing of frames. The implementation when error occurs during handshake is probably a bit rough at this point. The documentation is also incomplete and/or wrong at this time, though this is the general state of the Gun documentation and will be resolved in a separate commit.
Diffstat (limited to 'src/gun.erl')
-rw-r--r--src/gun.erl84
1 files changed, 53 insertions, 31 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 71af26e..3b98732 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -58,6 +58,7 @@
%% Websocket.
-export([ws_upgrade/2]).
-export([ws_upgrade/3]).
+-export([ws_upgrade/4]).
-export([ws_send/2]).
%% Debug.
@@ -85,6 +86,8 @@
| {type, conn_type()}].
-export_type([opts/0]).
+-type ws_opts() :: [{compress, boolean()}].
+
-record(state, {
parent :: pid(),
owner :: pid(),
@@ -98,7 +101,7 @@
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
protocol :: module(),
- proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY and WS too.
+ proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY too.
protocol_state :: any()
}).
@@ -338,14 +341,19 @@ cancel(ServerPid, StreamRef) ->
%% Websocket.
--spec ws_upgrade(pid(), iodata()) -> ok.
+-spec ws_upgrade(pid(), iodata()) -> reference().
ws_upgrade(ServerPid, Path) ->
- ws_upgrade(ServerPid, Path, []).
+ ws_upgrade(ServerPid, Path, [], []).
--spec ws_upgrade(pid(), iodata(), headers()) -> ok.
+-spec ws_upgrade(pid(), iodata(), headers()) -> reference().
ws_upgrade(ServerPid, Path, Headers) ->
- _ = ServerPid ! {ws_upgrade, self(), Path, Headers},
- ok.
+ ws_upgrade(ServerPid, Path, Headers, []).
+
+-spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference().
+ws_upgrade(ServerPid, Path, Headers, Opts) ->
+ StreamRef = make_ref(),
+ _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts},
+ StreamRef.
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
@@ -428,8 +436,10 @@ connect(State=#state{owner=Owner, host=Host, port=Port, type=Type,
retry(State, Retries - 1)
end.
-retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when
- is_reference(KeepaliveRef) ->
+%% Exit normally if the retry functionality has been disabled.
+retry(_, 0) ->
+ ok;
+retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) ->
_ = erlang:cancel_timer(KeepaliveRef),
%% Flush if we have a keepalive message
receive
@@ -458,7 +468,7 @@ before_loop(State=#state{keepalive=Keepalive}) ->
KeepaliveRef = erlang:send_after(Keepalive, self(), keepalive),
loop(State#state{keepalive_ref=KeepaliveRef}).
-loop(State=#state{parent=Parent, owner=Owner, host=Host,
+loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port,
retry=Retry, socket=Socket, transport=Transport,
protocol=Protocol, protocol_state=ProtoState}) ->
{OK, Closed, Error} = Transport:messages(),
@@ -470,7 +480,9 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host,
Transport:close(Socket),
retry(State#state{socket=undefined, transport=undefined,
protocol=undefined}, Retry);
- ProtoState2 ->
+ {upgrade, Protocol2, ProtoState2} ->
+ ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2});
+ ProtoState2 ->
loop(State#state{protocol_state=ProtoState2})
end;
{Closed, Socket} ->
@@ -494,11 +506,11 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host,
before_loop(State#state{protocol_state=ProtoState2});
{request, Owner, StreamRef, Method, Path, Headers} ->
ProtoState2 = Protocol:request(ProtoState,
- StreamRef, Method, Host, Path, Headers),
+ StreamRef, Method, Host, Port, Path, Headers),
loop(State#state{protocol_state=ProtoState2});
{request, Owner, StreamRef, Method, Path, Headers, Body} ->
ProtoState2 = Protocol:request(ProtoState,
- StreamRef, Method, Host, Path, Headers, Body),
+ StreamRef, Method, Host, Port, Path, Headers, Body),
loop(State#state{protocol_state=ProtoState2});
{data, Owner, StreamRef, IsFin, Data} ->
ProtoState2 = Protocol:data(ProtoState,
@@ -507,11 +519,10 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host,
{cancel, Owner, StreamRef} ->
ProtoState2 = Protocol:cancel(ProtoState, StreamRef),
loop(State#state{protocol_state=ProtoState2});
- {ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy ->
- %% @todo
- ProtoState2 = Protocol:ws_upgrade(ProtoState,
- Path, Headers),
- ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2});
+ {ws_upgrade, Owner, StreamRef, Path, Headers, Opts} when Protocol =/= gun_spdy ->
+ ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, Opts),
+ loop(State#state{protocol_state=ProtoState2});
+ %% @todo can fail if http/1.0
{shutdown, Owner} ->
%% @todo Protocol:shutdown?
ok;
@@ -525,9 +536,9 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host,
element(2, Any) ! {gun_error, self(), {notowner,
"Operations are restricted to the owner of the connection."}},
loop(State);
- {ws_upgrade, _, _, _} ->
- Owner ! {gun_error, self(), {badstate,
- "Websocket over SPDY isn't supported."}},
+ {ws_upgrade, _, StreamRef, _, _} ->
+ Owner ! {gun_error, self(), StreamRef, {badstate,
+ "Websocket is only supported over HTTP/1.1."}},
loop(State);
{ws_send, _, _} ->
Owner ! {gun_error, self(), {badstate,
@@ -545,23 +556,34 @@ ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket,
ok = Transport:setopts(Socket, [{active, once}]),
receive
{OK, Socket, Data} ->
- ProtoState2 = Protocol:handle(ProtoState, Data),
- ws_loop(State#state{protocol_state=ProtoState2});
+ case Protocol:handle(Data, ProtoState) of
+ close ->
+ Transport:close(Socket),
+ retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry);
+ ProtoState2 ->
+ ws_loop(State#state{protocol_state=ProtoState2})
+ end;
{Closed, Socket} ->
Transport:close(Socket),
- retry(State#state{socket=undefined, transport=undefined,
- protocol=undefined}, Retry);
+ retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry);
{Error, Socket, _} ->
Transport:close(Socket),
- retry(State#state{socket=undefined, transport=undefined,
- protocol=undefined}, Retry);
- %% @todo keepalive
- {ws_send, Owner, Frames} when is_list(Frames) ->
- todo; %% @todo
+ retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry);
+ %% Ignore any previous HTTP keep-alive.
+ keepalive ->
+ ws_loop(State);
+% {ws_send, Owner, Frames} when is_list(Frames) ->
+% todo; %% @todo
{ws_send, Owner, Frame} ->
- {todo, Frame}; %% @todo
+ case Protocol:send(Frame, ProtoState) of
+ close ->
+ Transport:close(Socket),
+ retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry);
+ ProtoState2 ->
+ ws_loop(State#state{protocol_state=ProtoState2})
+ end;
{shutdown, Owner} ->
- %% @todo Protocol:shutdown?
+ %% @todo Protocol:shutdown? %% @todo close frame
ok;
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],