From 34307a584149abbf0b2e5b33beb2fca4c585b0d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Sun, 3 Jun 2018 20:31:09 +0200 Subject: Change messages to gun_upgrade and gun_ws with stream reference --- ebin/gun.app | 2 +- src/gun.erl | 9 ++++++-- src/gun_http.erl | 36 +++++++++++++++++------------- src/gun_ws.erl | 8 +++---- src/gun_ws_h.erl | 38 ++++++++++++++++++++++++++++++++ src/gun_ws_handler.erl | 35 ----------------------------- test/gun_SUITE.erl | 2 +- test/ws_SUITE.erl | 60 +++++++++++++++++++++++++------------------------- 8 files changed, 102 insertions(+), 88 deletions(-) create mode 100644 src/gun_ws_h.erl delete mode 100644 src/gun_ws_handler.erl diff --git a/ebin/gun.app b/ebin/gun.app index f41a23e..de3896c 100644 --- a/ebin/gun.app +++ b/ebin/gun.app @@ -1,7 +1,7 @@ {application, 'gun', [ {description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."}, {vsn, "1.0.0-pre.5"}, - {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_sse','gun_sup','gun_ws','gun_ws_handler']}, + {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_sse','gun_sup','gun_ws','gun_ws_h']}, {registered, [gun_sup]}, {applications, [kernel,stdlib,ssl,cowlib,ranch]}, {mod, {gun_app, []}}, diff --git a/src/gun.erl b/src/gun.erl index cad1da1..1224c82 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -381,6 +381,7 @@ await(ServerPid, StreamRef, Timeout) -> demonitor(MRef, [flush]), Res. +%% @todo Add gun_upgrade and gun_ws? await(ServerPid, StreamRef, Timeout, MRef) -> receive {gun_inform, ServerPid, StreamRef, Status, Headers} -> @@ -494,9 +495,9 @@ flush_pid(ServerPid) -> flush_pid(ServerPid); {gun_error, ServerPid, _} -> flush_pid(ServerPid); - {gun_ws_upgrade, ServerPid, _, _} -> + {gun_upgrade, ServerPid, _, _, _} -> flush_pid(ServerPid); - {gun_ws, ServerPid, _} -> + {gun_ws, ServerPid, _, _} -> flush_pid(ServerPid); {'DOWN', _, process, ServerPid, _} -> flush_pid(ServerPid) @@ -517,6 +518,10 @@ flush_ref(StreamRef) -> {gun_push, _, StreamRef, _, _, _, _, _} -> flush_ref(StreamRef); {gun_error, _, StreamRef, _} -> + flush_ref(StreamRef); + {gun_upgrade, _, StreamRef, _, _} -> + flush_ref(StreamRef); + {gun_ws, _, StreamRef, _} -> flush_ref(StreamRef) after 0 -> ok diff --git a/src/gun_http.erl b/src/gun_http.erl index 7e7bed7..4fc6a17 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -201,8 +201,8 @@ handle_head(Data, State=#http_state{version=ClientVersion, {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), case {Status, StreamRef} of - {101, {websocket, _, WsKey, WsExtensions, WsOpts}} -> - ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsOpts); + {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} -> + ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts); {_, _} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, handle(Rest2, State); @@ -511,26 +511,31 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, ou new_stream(State#http_state{connection=keepalive, out=head}, {websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>). -ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) -> +ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) -> %% @todo check upgrade, connection case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of false -> close; {_, Accept} -> case cow_ws:encode_key(Key) of - Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts); - _ -> close + Accept -> + ws_handshake_extensions(Buffer, State, StreamRef, + Headers, GunExtensions, Opts); + _ -> + close end end. -ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts) -> +ws_handshake_extensions(Buffer, State, StreamRef, Headers, GunExtensions, Opts) -> case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of false -> - ws_handshake_protocols(Buffer, State, Headers, #{}, Opts); + ws_handshake_protocols(Buffer, State, StreamRef, Headers, #{}, Opts); {_, ExtHd} -> case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of - close -> close; - Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts) + close -> + close; + Extensions -> + ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) end end. @@ -552,23 +557,24 @@ ws_validate_extensions(_, _, _, _) -> close. %% @todo Validate protocols. -ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts) -> +ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) -> case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of false -> - ws_handshake_end(Buffer, State, Headers, Extensions, - maps:get(default_protocol, Opts, gun_ws_handler), Opts); + ws_handshake_end(Buffer, State, StreamRef, Headers, Extensions, + maps:get(default_protocol, Opts, gun_ws_h), Opts); {_, Proto} -> ProtoOpt = maps:get(protocols, Opts, []), case lists:keyfind(Proto, 1, ProtoOpt) of {_, Handler} -> - ws_handshake_end(Buffer, State, Headers, Extensions, Handler, Opts); + ws_handshake_end(Buffer, State, StreamRef, + Headers, Extensions, Handler, Opts); false -> close end end. ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, - Headers, Extensions, Handler, Opts) -> + StreamRef, Headers, Extensions, Handler, Opts) -> %% Send ourselves the remaining buffer, if any. _ = case Buffer of <<>> -> @@ -577,4 +583,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans {OK, _, _} = Transport:messages(), self() ! {OK, Socket, Buffer} end, - gun_ws:init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts). + gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts). diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 6501893..fe12448 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -16,7 +16,7 @@ -export([check_options/1]). -export([name/0]). --export([init/7]). +-export([init/8]). -export([handle/2]). -export([close/2]). -export([send/2]). @@ -66,9 +66,9 @@ do_check_options([Opt|_]) -> name() -> ws. -init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts) -> - Owner ! {gun_ws_upgrade, self(), ok, Headers}, - HandlerState = Handler:init(Owner, Headers, Opts), +init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> + Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, + HandlerState = Handler:init(Owner, StreamRef, Headers, Opts), {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, extensions=Extensions, handler=Handler, handler_state=HandlerState}}. diff --git a/src/gun_ws_h.erl b/src/gun_ws_h.erl new file mode 100644 index 0000000..fb6e3d2 --- /dev/null +++ b/src/gun_ws_h.erl @@ -0,0 +1,38 @@ +%% Copyright (c) 2017, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_ws_h). + +-export([init/4]). +-export([handle/2]). + +-record(state, { + reply_to :: pid(), + stream_ref :: reference(), + frag_buffer = <<>> :: binary() +}). + +init(ReplyTo, StreamRef, _, _) -> + #state{reply_to=ReplyTo, stream_ref=StreamRef}. + +handle({fragment, nofin, _, Payload}, + State=#state{frag_buffer=SoFar}) -> + State#state{frag_buffer= << SoFar/binary, Payload/binary >>}; +handle({fragment, fin, Type, Payload}, + State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) -> + ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}}, + State#state{frag_buffer= <<>>}; +handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) -> + ReplyTo ! {gun_ws, self(), StreamRef, Frame}, + State. diff --git a/src/gun_ws_handler.erl b/src/gun_ws_handler.erl deleted file mode 100644 index 4356ab5..0000000 --- a/src/gun_ws_handler.erl +++ /dev/null @@ -1,35 +0,0 @@ -%% Copyright (c) 2017, Loïc Hoguin -%% -%% Permission to use, copy, modify, and/or distribute this software for any -%% purpose with or without fee is hereby granted, provided that the above -%% copyright notice and this permission notice appear in all copies. -%% -%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR -%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF -%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - --module(gun_ws_handler). - --export([init/3]). --export([handle/2]). - --record(state, { - reply_to :: pid(), - frag_buffer = <<>> :: binary() -}). - -init(ReplyTo, _, _) -> - #state{reply_to=ReplyTo}. - -handle({fragment, nofin, _, Payload}, State=#state{frag_buffer=SoFar}) -> - State#state{frag_buffer= << SoFar/binary, Payload/binary >>}; -handle({fragment, fin, Type, Payload}, State=#state{reply_to=ReplyTo, frag_buffer=SoFar}) -> - ReplyTo ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}}, - State#state{frag_buffer= <<>>}; -handle(Frame, State=#state{reply_to=ReplyTo}) -> - ReplyTo ! {gun_ws, self(), Frame}, - State. diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl index 4b00559..797b7e6 100644 --- a/test/gun_SUITE.erl +++ b/test/gun_SUITE.erl @@ -73,7 +73,7 @@ detect_owner_gone_ws(_) -> gun:await_up(ConnPid), gun:ws_upgrade(ConnPid, "/", []), receive - {gun_ws_upgrade, ConnPid, ok, _} -> + {gun_upgrade, ConnPid, _, [<<"websocket">>], _} -> ok after 1000 -> error(timeout) diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 0630c1b..8a29110 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -31,7 +31,7 @@ init_per_group(autobahn, Config) -> Out = os:cmd("pip show autobahntestsuite ; pip2 show autobahntestsuite"), case string:str(Out, "autobahntestsuite") of 0 -> - ct:print("Skipping the autobahn group because the " + ct:pal("Skipping the autobahn group because the " "Autobahn Test Suite is not installed.~nTo install it, " "please follow the instructions on this page:~n~n " "http://autobahn.ws/testsuite/installation.html"), @@ -91,13 +91,13 @@ receive_infinity(Port) -> end. get_case_count() -> - {Pid, Ref} = connect("/getCaseCount"), + {Pid, MRef, StreamRef} = connect("/getCaseCount"), receive - {gun_ws, Pid, {text, N}} -> - close(Pid, Ref), + {gun_ws, Pid, StreamRef, {text, N}} -> + close(Pid, MRef), binary_to_integer(N); Msg -> - ct:print("Unexpected message ~p", [Msg]), + ct:pal("Unexpected message ~p", [Msg]), terminate(), error(failed) end. @@ -105,36 +105,36 @@ get_case_count() -> run_cases(Total, Total) -> ok; run_cases(N, Total) -> - {Pid, Ref} = connect(["/runCase?case=", integer_to_binary(N + 1), "&agent=Gun"]), - loop(Pid, Ref), + {Pid, MRef, StreamRef} = connect(["/runCase?case=", integer_to_binary(N + 1), "&agent=Gun"]), + loop(Pid, MRef, StreamRef), update_reports(), run_cases(N + 1, Total). -loop(Pid, Ref) -> +loop(Pid, MRef, StreamRef) -> receive - {gun_ws, Pid, close} -> + {gun_ws, Pid, StreamRef, close} -> gun:ws_send(Pid, close), - loop(Pid, Ref); - {gun_ws, Pid, {close, Code, _}} -> + loop(Pid, MRef, StreamRef); + {gun_ws, Pid, StreamRef, {close, Code, _}} -> gun:ws_send(Pid, {close, Code, <<>>}), - loop(Pid, Ref); - {gun_ws, Pid, Frame} -> + loop(Pid, MRef, StreamRef); + {gun_ws, Pid, StreamRef, Frame} -> gun:ws_send(Pid, Frame), - loop(Pid, Ref); + loop(Pid, MRef, StreamRef); {gun_down, Pid, ws, _, _, _} -> - close(Pid, Ref); - {'DOWN', Ref, process, Pid, normal} -> - close(Pid, Ref); + close(Pid, MRef); + {'DOWN', MRef, process, Pid, normal} -> + close(Pid, MRef); Msg -> - ct:print("Unexpected message ~p", [Msg]), - close(Pid, Ref) + ct:pal("Unexpected message ~p", [Msg]), + close(Pid, MRef) end. update_reports() -> - {Pid, Ref} = connect("/updateReports?agent=Gun"), + {Pid, MRef, StreamRef} = connect("/updateReports?agent=Gun"), receive - {gun_ws, Pid, close} -> - close(Pid, Ref) + {gun_ws, Pid, StreamRef, close} -> + close(Pid, MRef) after 5000 -> error(failed) end. @@ -143,22 +143,22 @@ log_output() -> ok. connect(Path) -> - {ok, Pid} = gun:open("127.0.0.1", 33080, #{retry=>0}), + {ok, Pid} = gun:open("127.0.0.1", 33080, #{retry => 0}), {ok, http} = gun:await_up(Pid), - Ref = monitor(process, Pid), - gun:ws_upgrade(Pid, Path, [], #{compress => true}), + MRef = monitor(process, Pid), + StreamRef = gun:ws_upgrade(Pid, Path, [], #{compress => true}), receive - {gun_ws_upgrade, Pid, ok, _} -> + {gun_upgrade, Pid, StreamRef, [<<"websocket">>], _} -> ok; Msg -> - ct:print("Unexpected message ~p", [Msg]), + ct:pal("Unexpected message ~p", [Msg]), terminate(), error(failed) end, - {Pid, Ref}. + {Pid, MRef, StreamRef}. -close(Pid, Ref) -> - demonitor(Ref), +close(Pid, MRef) -> + demonitor(MRef), gun:close(Pid), gun:flush(Pid). -- cgit v1.2.3