aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ebin/gun.app2
-rw-r--r--src/gun_http.erl73
-rw-r--r--src/gun_ws.erl38
-rw-r--r--src/gun_ws_handler.erl35
4 files changed, 101 insertions, 47 deletions
diff --git a/ebin/gun.app b/ebin/gun.app
index eefe4cc..7fd12ea 100644
--- a/ebin/gun.app
+++ b/ebin/gun.app
@@ -1,7 +1,7 @@
{application, gun, [
{description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."},
{vsn, "1.0.0-pre.2"},
- {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_spdy','gun_sse','gun_sup','gun_ws']},
+ {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_spdy','gun_sse','gun_sup','gun_ws','gun_ws_handler']},
{registered, [gun_sup]},
{applications, [kernel,stdlib,ssl,cowlib,ranch]},
{mod, {gun_app, []}},
diff --git a/src/gun_http.erl b/src/gun_http.erl
index f5eef71..8ebd42e 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -29,7 +29,8 @@
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked.
--type websocket_info() :: {websocket, reference(), binary(), [binary()], [], gun:ws_opts()}. %% key, extensions, protocols, options
+%% @todo Make that a record.
+-type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options
-record(stream, {
ref :: reference() | websocket_info(),
@@ -164,8 +165,8 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
{Headers, Rest2} = cow_http:parse_headers(Rest),
case {Status, StreamRef} of
- {101, {websocket, _, WsKey, WsExtensions, WsProtocols, WsOpts}} ->
- ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsProtocols, WsOpts);
+ {101, {websocket, _, WsKey, WsExtensions, WsOpts}} ->
+ ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsOpts);
_ ->
In = response_io_from_headers(Method, Version, Status, Headers),
IsFin = case In of head -> fin; _ -> nofin end,
@@ -174,7 +175,7 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
ok;
true ->
StreamRef2 = case StreamRef of
- {websocket, SR, _, _, _, _} -> SR;
+ {websocket, SR, _, _, _} -> SR;
_ -> StreamRef
end,
Owner ! {gun_response, self(), StreamRef2,
@@ -330,7 +331,7 @@ cancel(State, StreamRef) ->
%% HTTP does not provide any way to figure out what streams are unprocessed.
down(#http_state{streams=Streams}) ->
KilledStreams = [case Ref of
- {websocket, Ref2, _, _, _, _} -> Ref2;
+ {websocket, Ref2, _, _, _} -> Ref2;
_ -> Ref
end || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
@@ -425,53 +426,59 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) ->
error; %% @todo
ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head},
- StreamRef, Host, Port, Path, Headers, WsOpts) ->
+ StreamRef, Host, Port, Path, Headers0, WsOpts) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
true -> {[{<<"sec-websocket-extensions">>,
<<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}
- |Headers],
+ |Headers0],
[<<"permessage-deflate">>]};
- false -> {Headers, []}
+ false -> {Headers0, []}
+ end,
+ Headers2 = case maps:get(protocols, WsOpts, []) of
+ [] -> Headers1;
+ ProtoOpt ->
+ << _, _, Proto/bits >> = iolist_to_binary([[<<", ">>, P] || {P, _} <- ProtoOpt]),
+ [{<<"sec-websocket-protocol">>, Proto}|Headers1]
end,
Key = cow_ws:key(),
- Headers2 = [
+ Headers3 = [
{<<"connection">>, <<"upgrade">>},
{<<"upgrade">>, <<"websocket">>},
{<<"sec-websocket-version">>, <<"13">>},
{<<"sec-websocket-key">>, Key}
- |Headers1
+ |Headers2
],
IsSecure = Transport:secure(),
- Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
- true -> Headers2;
- false when Port =:= 80, not IsSecure -> [{<<"host">>, Host}|Headers2];
- false when Port =:= 443, IsSecure -> [{<<"host">>, Host}|Headers2];
- false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]
+ Headers = case lists:keymember(<<"host">>, 1, Headers0) of
+ true -> Headers3;
+ false when Port =:= 80, not IsSecure -> [{<<"host">>, Host}|Headers3];
+ false when Port =:= 443, IsSecure -> [{<<"host">>, Host}|Headers3];
+ false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers3]
end,
- Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers3)),
+ Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)),
new_stream(State#http_state{connection=keepalive, out=head},
- {websocket, StreamRef, Key, GunExtensions, [], WsOpts}, <<"GET">>).
+ {websocket, StreamRef, Key, GunExtensions, WsOpts}, <<"GET">>).
-ws_handshake(Buffer, State, Headers, Key, GunExtensions, GunProtocols, Opts) ->
+ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) ->
%% @todo check upgrade, connection
case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
false ->
close;
{_, Accept} ->
case cow_ws:encode_key(Key) of
- Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts);
+ Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts);
_ -> close
end
end.
-ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts) ->
+ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts) ->
case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
false ->
- ws_handshake_protocols(Buffer, State, Headers, #{}, GunProtocols);
+ ws_handshake_protocols(Buffer, State, Headers, #{}, Opts);
{_, ExtHd} ->
case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of
close -> close;
- Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, GunProtocols)
+ Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts)
end
end.
@@ -493,11 +500,23 @@ ws_validate_extensions(_, _, _, _) ->
close.
%% @todo Validate protocols.
-ws_handshake_protocols(Buffer, State, Headers, Extensions, _GunProtocols = []) ->
- Protocols = [],
- ws_handshake_end(Buffer, State, Headers, Extensions, Protocols).
+ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts) ->
+ case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of
+ false ->
+ ws_handshake_end(Buffer, State, Headers, Extensions,
+ maps:get(default_protocol, Opts, gun_ws_handler), Opts);
+ {_, Proto} ->
+ ProtoOpt = maps:get(protocols, Opts, []),
+ case lists:keyfind(Proto, 1, ProtoOpt) of
+ {_, Handler} ->
+ ws_handshake_end(Buffer, State, Headers, Extensions, Handler, Opts);
+ false ->
+ close
+ end
+ end.
-ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, Headers, Extensions, Protocols) ->
+ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport},
+ Headers, Extensions, Handler, Opts) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
<<>> ->
@@ -506,4 +525,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
{OK, _, _} = Transport:messages(),
self() ! {OK, Socket, Buffer}
end,
- gun_ws:init(Owner, Socket, Transport, Headers, Extensions, Protocols).
+ gun_ws:init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts).
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index a8154d6..a3ec618 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -16,7 +16,7 @@
-export([check_options/1]).
-export([name/0]).
--export([init/6]).
+-export([init/7]).
-export([handle/2]).
-export([send/2]).
-export([down/1]).
@@ -38,9 +38,10 @@
buffer = <<>> :: binary(),
in = head :: head | #payload{} | close,
frag_state = undefined :: cow_ws:frag_state(),
- frag_buffer = <<>> :: binary(),
utf8_state = 0 :: cow_ws:utf8_state(),
- extensions = #{} :: cow_ws:extensions()
+ extensions = #{} :: cow_ws:extensions(),
+ handler :: module(),
+ handler_state :: any()
}).
check_options(Opts) ->
@@ -55,10 +56,11 @@ do_check_options([Opt|_]) ->
name() -> ws.
-%% @todo Protocols
-init(Owner, Socket, Transport, Headers, Extensions, _Protocols) ->
+init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts) ->
Owner ! {gun_ws_upgrade, self(), ok, Headers},
- {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, extensions=Extensions}}.
+ HandlerState = Handler:init(Owner, Headers, Opts),
+ {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
+ extensions=Extensions, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
handle(_, State=#ws_state{in=close}) ->
@@ -94,29 +96,27 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
close(Error, State)
end.
-dispatch(Rest, State=#ws_state{owner=Owner, frag_state=FragState, frag_buffer=SoFar},
+dispatch(Rest, State0=#ws_state{frag_state=FragState,
+ handler=Handler, handler_state=HandlerState0},
Type0, Payload0, CloseCode0) ->
case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of
- {fragment, nofin, _, Payload} ->
- handle(Rest, State#ws_state{frag_buffer= << SoFar/binary, Payload/binary >>});
- {fragment, fin, Type, Payload} ->
- Owner ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}},
- handle(Rest, State#ws_state{frag_state=undefined, frag_buffer= <<>>});
ping ->
- State2 = send(pong, State),
- handle(Rest, State2);
+ State = send(pong, State0),
+ handle(Rest, State);
{ping, Payload} ->
- State2 = send({pong, Payload}, State),
- handle(Rest, State2);
- pong ->
+ State = send({pong, Payload}, State0),
handle(Rest, State);
+ pong ->
+ handle(Rest, State0);
{pong, _} ->
- handle(Rest, State);
+ handle(Rest, State0);
Frame ->
- Owner ! {gun_ws, self(), Frame},
+ HandlerState = Handler:handle(Frame, HandlerState0),
+ State = State0#ws_state{handler_state=HandlerState},
case Frame of
close -> handle(Rest, State#ws_state{in=close});
{close, _, _} -> handle(Rest, State#ws_state{in=close});
+ {fragment, fin, _, _} -> handle(Rest, State#ws_state{frag_state=undefined});
_ -> handle(Rest, State)
end
end.
diff --git a/src/gun_ws_handler.erl b/src/gun_ws_handler.erl
new file mode 100644
index 0000000..4356ab5
--- /dev/null
+++ b/src/gun_ws_handler.erl
@@ -0,0 +1,35 @@
+%% Copyright (c) 2017, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_ws_handler).
+
+-export([init/3]).
+-export([handle/2]).
+
+-record(state, {
+ reply_to :: pid(),
+ frag_buffer = <<>> :: binary()
+}).
+
+init(ReplyTo, _, _) ->
+ #state{reply_to=ReplyTo}.
+
+handle({fragment, nofin, _, Payload}, State=#state{frag_buffer=SoFar}) ->
+ State#state{frag_buffer= << SoFar/binary, Payload/binary >>};
+handle({fragment, fin, Type, Payload}, State=#state{reply_to=ReplyTo, frag_buffer=SoFar}) ->
+ ReplyTo ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}},
+ State#state{frag_buffer= <<>>};
+handle(Frame, State=#state{reply_to=ReplyTo}) ->
+ ReplyTo ! {gun_ws, self(), Frame},
+ State.