diff options
-rw-r--r-- | ebin/gun.app | 2 | ||||
-rw-r--r-- | src/gun_http.erl | 73 | ||||
-rw-r--r-- | src/gun_ws.erl | 38 | ||||
-rw-r--r-- | src/gun_ws_handler.erl | 35 |
4 files changed, 101 insertions, 47 deletions
diff --git a/ebin/gun.app b/ebin/gun.app index eefe4cc..7fd12ea 100644 --- a/ebin/gun.app +++ b/ebin/gun.app @@ -1,7 +1,7 @@ {application, gun, [ {description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."}, {vsn, "1.0.0-pre.2"}, - {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_spdy','gun_sse','gun_sup','gun_ws']}, + {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_spdy','gun_sse','gun_sup','gun_ws','gun_ws_handler']}, {registered, [gun_sup]}, {applications, [kernel,stdlib,ssl,cowlib,ranch]}, {mod, {gun_app, []}}, diff --git a/src/gun_http.erl b/src/gun_http.erl index f5eef71..8ebd42e 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -29,7 +29,8 @@ -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked. --type websocket_info() :: {websocket, reference(), binary(), [binary()], [], gun:ws_opts()}. %% key, extensions, protocols, options +%% @todo Make that a record. +-type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options -record(stream, { ref :: reference() | websocket_info(), @@ -164,8 +165,8 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), case {Status, StreamRef} of - {101, {websocket, _, WsKey, WsExtensions, WsProtocols, WsOpts}} -> - ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsProtocols, WsOpts); + {101, {websocket, _, WsKey, WsExtensions, WsOpts}} -> + ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsOpts); _ -> In = response_io_from_headers(Method, Version, Status, Headers), IsFin = case In of head -> fin; _ -> nofin end, @@ -174,7 +175,7 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, ok; true -> StreamRef2 = case StreamRef of - {websocket, SR, _, _, _, _} -> SR; + {websocket, SR, _, _, _} -> SR; _ -> StreamRef end, Owner ! {gun_response, self(), StreamRef2, @@ -330,7 +331,7 @@ cancel(State, StreamRef) -> %% HTTP does not provide any way to figure out what streams are unprocessed. down(#http_state{streams=Streams}) -> KilledStreams = [case Ref of - {websocket, Ref2, _, _, _, _} -> Ref2; + {websocket, Ref2, _, _, _} -> Ref2; _ -> Ref end || #stream{ref=Ref} <- Streams], {KilledStreams, []}. @@ -425,53 +426,59 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) -> error; %% @todo ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head}, - StreamRef, Host, Port, Path, Headers, WsOpts) -> + StreamRef, Host, Port, Path, Headers0, WsOpts) -> {Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of true -> {[{<<"sec-websocket-extensions">>, <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>} - |Headers], + |Headers0], [<<"permessage-deflate">>]}; - false -> {Headers, []} + false -> {Headers0, []} + end, + Headers2 = case maps:get(protocols, WsOpts, []) of + [] -> Headers1; + ProtoOpt -> + << _, _, Proto/bits >> = iolist_to_binary([[<<", ">>, P] || {P, _} <- ProtoOpt]), + [{<<"sec-websocket-protocol">>, Proto}|Headers1] end, Key = cow_ws:key(), - Headers2 = [ + Headers3 = [ {<<"connection">>, <<"upgrade">>}, {<<"upgrade">>, <<"websocket">>}, {<<"sec-websocket-version">>, <<"13">>}, {<<"sec-websocket-key">>, Key} - |Headers1 + |Headers2 ], IsSecure = Transport:secure(), - Headers3 = case lists:keymember(<<"host">>, 1, Headers) of - true -> Headers2; - false when Port =:= 80, not IsSecure -> [{<<"host">>, Host}|Headers2]; - false when Port =:= 443, IsSecure -> [{<<"host">>, Host}|Headers2]; - false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2] + Headers = case lists:keymember(<<"host">>, 1, Headers0) of + true -> Headers3; + false when Port =:= 80, not IsSecure -> [{<<"host">>, Host}|Headers3]; + false when Port =:= 443, IsSecure -> [{<<"host">>, Host}|Headers3]; + false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers3] end, - Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers3)), + Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)), new_stream(State#http_state{connection=keepalive, out=head}, - {websocket, StreamRef, Key, GunExtensions, [], WsOpts}, <<"GET">>). + {websocket, StreamRef, Key, GunExtensions, WsOpts}, <<"GET">>). -ws_handshake(Buffer, State, Headers, Key, GunExtensions, GunProtocols, Opts) -> +ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) -> %% @todo check upgrade, connection case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of false -> close; {_, Accept} -> case cow_ws:encode_key(Key) of - Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts); + Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts); _ -> close end end. -ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts) -> +ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts) -> case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of false -> - ws_handshake_protocols(Buffer, State, Headers, #{}, GunProtocols); + ws_handshake_protocols(Buffer, State, Headers, #{}, Opts); {_, ExtHd} -> case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of close -> close; - Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, GunProtocols) + Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts) end end. @@ -493,11 +500,23 @@ ws_validate_extensions(_, _, _, _) -> close. %% @todo Validate protocols. -ws_handshake_protocols(Buffer, State, Headers, Extensions, _GunProtocols = []) -> - Protocols = [], - ws_handshake_end(Buffer, State, Headers, Extensions, Protocols). +ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts) -> + case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of + false -> + ws_handshake_end(Buffer, State, Headers, Extensions, + maps:get(default_protocol, Opts, gun_ws_handler), Opts); + {_, Proto} -> + ProtoOpt = maps:get(protocols, Opts, []), + case lists:keyfind(Proto, 1, ProtoOpt) of + {_, Handler} -> + ws_handshake_end(Buffer, State, Headers, Extensions, Handler, Opts); + false -> + close + end + end. -ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, Headers, Extensions, Protocols) -> +ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, + Headers, Extensions, Handler, Opts) -> %% Send ourselves the remaining buffer, if any. _ = case Buffer of <<>> -> @@ -506,4 +525,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans {OK, _, _} = Transport:messages(), self() ! {OK, Socket, Buffer} end, - gun_ws:init(Owner, Socket, Transport, Headers, Extensions, Protocols). + gun_ws:init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts). diff --git a/src/gun_ws.erl b/src/gun_ws.erl index a8154d6..a3ec618 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -16,7 +16,7 @@ -export([check_options/1]). -export([name/0]). --export([init/6]). +-export([init/7]). -export([handle/2]). -export([send/2]). -export([down/1]). @@ -38,9 +38,10 @@ buffer = <<>> :: binary(), in = head :: head | #payload{} | close, frag_state = undefined :: cow_ws:frag_state(), - frag_buffer = <<>> :: binary(), utf8_state = 0 :: cow_ws:utf8_state(), - extensions = #{} :: cow_ws:extensions() + extensions = #{} :: cow_ws:extensions(), + handler :: module(), + handler_state :: any() }). check_options(Opts) -> @@ -55,10 +56,11 @@ do_check_options([Opt|_]) -> name() -> ws. -%% @todo Protocols -init(Owner, Socket, Transport, Headers, Extensions, _Protocols) -> +init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts) -> Owner ! {gun_ws_upgrade, self(), ok, Headers}, - {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, extensions=Extensions}}. + HandlerState = Handler:init(Owner, Headers, Opts), + {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, + extensions=Extensions, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. handle(_, State=#ws_state{in=close}) -> @@ -94,29 +96,27 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke close(Error, State) end. -dispatch(Rest, State=#ws_state{owner=Owner, frag_state=FragState, frag_buffer=SoFar}, +dispatch(Rest, State0=#ws_state{frag_state=FragState, + handler=Handler, handler_state=HandlerState0}, Type0, Payload0, CloseCode0) -> case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of - {fragment, nofin, _, Payload} -> - handle(Rest, State#ws_state{frag_buffer= << SoFar/binary, Payload/binary >>}); - {fragment, fin, Type, Payload} -> - Owner ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}}, - handle(Rest, State#ws_state{frag_state=undefined, frag_buffer= <<>>}); ping -> - State2 = send(pong, State), - handle(Rest, State2); + State = send(pong, State0), + handle(Rest, State); {ping, Payload} -> - State2 = send({pong, Payload}, State), - handle(Rest, State2); - pong -> + State = send({pong, Payload}, State0), handle(Rest, State); + pong -> + handle(Rest, State0); {pong, _} -> - handle(Rest, State); + handle(Rest, State0); Frame -> - Owner ! {gun_ws, self(), Frame}, + HandlerState = Handler:handle(Frame, HandlerState0), + State = State0#ws_state{handler_state=HandlerState}, case Frame of close -> handle(Rest, State#ws_state{in=close}); {close, _, _} -> handle(Rest, State#ws_state{in=close}); + {fragment, fin, _, _} -> handle(Rest, State#ws_state{frag_state=undefined}); _ -> handle(Rest, State) end end. diff --git a/src/gun_ws_handler.erl b/src/gun_ws_handler.erl new file mode 100644 index 0000000..4356ab5 --- /dev/null +++ b/src/gun_ws_handler.erl @@ -0,0 +1,35 @@ +%% Copyright (c) 2017, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_ws_handler). + +-export([init/3]). +-export([handle/2]). + +-record(state, { + reply_to :: pid(), + frag_buffer = <<>> :: binary() +}). + +init(ReplyTo, _, _) -> + #state{reply_to=ReplyTo}. + +handle({fragment, nofin, _, Payload}, State=#state{frag_buffer=SoFar}) -> + State#state{frag_buffer= << SoFar/binary, Payload/binary >>}; +handle({fragment, fin, Type, Payload}, State=#state{reply_to=ReplyTo, frag_buffer=SoFar}) -> + ReplyTo ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}}, + State#state{frag_buffer= <<>>}; +handle(Frame, State=#state{reply_to=ReplyTo}) -> + ReplyTo ! {gun_ws, self(), Frame}, + State. |