aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2018-06-03 20:31:09 +0200
committerLoïc Hoguin <[email protected]>2018-06-03 20:31:09 +0200
commit34307a584149abbf0b2e5b33beb2fca4c585b0d1 (patch)
treef44f3236287271ca0868262cb4401d1c1b88da38
parentca57baf279a164c4b9813267e6bbc96ed5b45ee8 (diff)
downloadgun-34307a584149abbf0b2e5b33beb2fca4c585b0d1.tar.gz
gun-34307a584149abbf0b2e5b33beb2fca4c585b0d1.tar.bz2
gun-34307a584149abbf0b2e5b33beb2fca4c585b0d1.zip
Change messages to gun_upgrade and gun_ws with stream reference
-rw-r--r--ebin/gun.app2
-rw-r--r--src/gun.erl9
-rw-r--r--src/gun_http.erl36
-rw-r--r--src/gun_ws.erl8
-rw-r--r--src/gun_ws_h.erl (renamed from src/gun_ws_handler.erl)21
-rw-r--r--test/gun_SUITE.erl2
-rw-r--r--test/ws_SUITE.erl60
7 files changed, 76 insertions, 62 deletions
diff --git a/ebin/gun.app b/ebin/gun.app
index f41a23e..de3896c 100644
--- a/ebin/gun.app
+++ b/ebin/gun.app
@@ -1,7 +1,7 @@
{application, 'gun', [
{description, "HTTP/1.1, HTTP/2 and Websocket client for Erlang/OTP."},
{vsn, "1.0.0-pre.5"},
- {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_sse','gun_sup','gun_ws','gun_ws_handler']},
+ {modules, ['gun','gun_app','gun_content_handler','gun_data','gun_http','gun_http2','gun_sse','gun_sup','gun_ws','gun_ws_h']},
{registered, [gun_sup]},
{applications, [kernel,stdlib,ssl,cowlib,ranch]},
{mod, {gun_app, []}},
diff --git a/src/gun.erl b/src/gun.erl
index cad1da1..1224c82 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -381,6 +381,7 @@ await(ServerPid, StreamRef, Timeout) ->
demonitor(MRef, [flush]),
Res.
+%% @todo Add gun_upgrade and gun_ws?
await(ServerPid, StreamRef, Timeout, MRef) ->
receive
{gun_inform, ServerPid, StreamRef, Status, Headers} ->
@@ -494,9 +495,9 @@ flush_pid(ServerPid) ->
flush_pid(ServerPid);
{gun_error, ServerPid, _} ->
flush_pid(ServerPid);
- {gun_ws_upgrade, ServerPid, _, _} ->
+ {gun_upgrade, ServerPid, _, _, _} ->
flush_pid(ServerPid);
- {gun_ws, ServerPid, _} ->
+ {gun_ws, ServerPid, _, _} ->
flush_pid(ServerPid);
{'DOWN', _, process, ServerPid, _} ->
flush_pid(ServerPid)
@@ -517,6 +518,10 @@ flush_ref(StreamRef) ->
{gun_push, _, StreamRef, _, _, _, _, _} ->
flush_ref(StreamRef);
{gun_error, _, StreamRef, _} ->
+ flush_ref(StreamRef);
+ {gun_upgrade, _, StreamRef, _, _} ->
+ flush_ref(StreamRef);
+ {gun_ws, _, StreamRef, _} ->
flush_ref(StreamRef)
after 0 ->
ok
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 7e7bed7..4fc6a17 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -201,8 +201,8 @@ handle_head(Data, State=#http_state{version=ClientVersion,
{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
{Headers, Rest2} = cow_http:parse_headers(Rest),
case {Status, StreamRef} of
- {101, {websocket, _, WsKey, WsExtensions, WsOpts}} ->
- ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsOpts);
+ {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} ->
+ ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts);
{_, _} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
handle(Rest2, State);
@@ -511,26 +511,31 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, ou
new_stream(State#http_state{connection=keepalive, out=head},
{websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>).
-ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) ->
+ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
%% @todo check upgrade, connection
case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
false ->
close;
{_, Accept} ->
case cow_ws:encode_key(Key) of
- Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts);
- _ -> close
+ Accept ->
+ ws_handshake_extensions(Buffer, State, StreamRef,
+ Headers, GunExtensions, Opts);
+ _ ->
+ close
end
end.
-ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts) ->
+ws_handshake_extensions(Buffer, State, StreamRef, Headers, GunExtensions, Opts) ->
case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
false ->
- ws_handshake_protocols(Buffer, State, Headers, #{}, Opts);
+ ws_handshake_protocols(Buffer, State, StreamRef, Headers, #{}, Opts);
{_, ExtHd} ->
case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of
- close -> close;
- Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts)
+ close ->
+ close;
+ Extensions ->
+ ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts)
end
end.
@@ -552,23 +557,24 @@ ws_validate_extensions(_, _, _, _) ->
close.
%% @todo Validate protocols.
-ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts) ->
+ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) ->
case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of
false ->
- ws_handshake_end(Buffer, State, Headers, Extensions,
- maps:get(default_protocol, Opts, gun_ws_handler), Opts);
+ ws_handshake_end(Buffer, State, StreamRef, Headers, Extensions,
+ maps:get(default_protocol, Opts, gun_ws_h), Opts);
{_, Proto} ->
ProtoOpt = maps:get(protocols, Opts, []),
case lists:keyfind(Proto, 1, ProtoOpt) of
{_, Handler} ->
- ws_handshake_end(Buffer, State, Headers, Extensions, Handler, Opts);
+ ws_handshake_end(Buffer, State, StreamRef,
+ Headers, Extensions, Handler, Opts);
false ->
close
end
end.
ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport},
- Headers, Extensions, Handler, Opts) ->
+ StreamRef, Headers, Extensions, Handler, Opts) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
<<>> ->
@@ -577,4 +583,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
{OK, _, _} = Transport:messages(),
self() ! {OK, Socket, Buffer}
end,
- gun_ws:init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts).
+ gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts).
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 6501893..fe12448 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -16,7 +16,7 @@
-export([check_options/1]).
-export([name/0]).
--export([init/7]).
+-export([init/8]).
-export([handle/2]).
-export([close/2]).
-export([send/2]).
@@ -66,9 +66,9 @@ do_check_options([Opt|_]) ->
name() -> ws.
-init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts) ->
- Owner ! {gun_ws_upgrade, self(), ok, Headers},
- HandlerState = Handler:init(Owner, Headers, Opts),
+init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
+ Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
+ HandlerState = Handler:init(Owner, StreamRef, Headers, Opts),
{upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
extensions=Extensions, handler=Handler, handler_state=HandlerState}}.
diff --git a/src/gun_ws_handler.erl b/src/gun_ws_h.erl
index 4356ab5..fb6e3d2 100644
--- a/src/gun_ws_handler.erl
+++ b/src/gun_ws_h.erl
@@ -12,24 +12,27 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
--module(gun_ws_handler).
+-module(gun_ws_h).
--export([init/3]).
+-export([init/4]).
-export([handle/2]).
-record(state, {
reply_to :: pid(),
+ stream_ref :: reference(),
frag_buffer = <<>> :: binary()
}).
-init(ReplyTo, _, _) ->
- #state{reply_to=ReplyTo}.
+init(ReplyTo, StreamRef, _, _) ->
+ #state{reply_to=ReplyTo, stream_ref=StreamRef}.
-handle({fragment, nofin, _, Payload}, State=#state{frag_buffer=SoFar}) ->
+handle({fragment, nofin, _, Payload},
+ State=#state{frag_buffer=SoFar}) ->
State#state{frag_buffer= << SoFar/binary, Payload/binary >>};
-handle({fragment, fin, Type, Payload}, State=#state{reply_to=ReplyTo, frag_buffer=SoFar}) ->
- ReplyTo ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}},
+handle({fragment, fin, Type, Payload},
+ State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) ->
+ ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}},
State#state{frag_buffer= <<>>};
-handle(Frame, State=#state{reply_to=ReplyTo}) ->
- ReplyTo ! {gun_ws, self(), Frame},
+handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) ->
+ ReplyTo ! {gun_ws, self(), StreamRef, Frame},
State.
diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl
index 4b00559..797b7e6 100644
--- a/test/gun_SUITE.erl
+++ b/test/gun_SUITE.erl
@@ -73,7 +73,7 @@ detect_owner_gone_ws(_) ->
gun:await_up(ConnPid),
gun:ws_upgrade(ConnPid, "/", []),
receive
- {gun_ws_upgrade, ConnPid, ok, _} ->
+ {gun_upgrade, ConnPid, _, [<<"websocket">>], _} ->
ok
after 1000 ->
error(timeout)
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 0630c1b..8a29110 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -31,7 +31,7 @@ init_per_group(autobahn, Config) ->
Out = os:cmd("pip show autobahntestsuite ; pip2 show autobahntestsuite"),
case string:str(Out, "autobahntestsuite") of
0 ->
- ct:print("Skipping the autobahn group because the "
+ ct:pal("Skipping the autobahn group because the "
"Autobahn Test Suite is not installed.~nTo install it, "
"please follow the instructions on this page:~n~n "
"http://autobahn.ws/testsuite/installation.html"),
@@ -91,13 +91,13 @@ receive_infinity(Port) ->
end.
get_case_count() ->
- {Pid, Ref} = connect("/getCaseCount"),
+ {Pid, MRef, StreamRef} = connect("/getCaseCount"),
receive
- {gun_ws, Pid, {text, N}} ->
- close(Pid, Ref),
+ {gun_ws, Pid, StreamRef, {text, N}} ->
+ close(Pid, MRef),
binary_to_integer(N);
Msg ->
- ct:print("Unexpected message ~p", [Msg]),
+ ct:pal("Unexpected message ~p", [Msg]),
terminate(),
error(failed)
end.
@@ -105,36 +105,36 @@ get_case_count() ->
run_cases(Total, Total) ->
ok;
run_cases(N, Total) ->
- {Pid, Ref} = connect(["/runCase?case=", integer_to_binary(N + 1), "&agent=Gun"]),
- loop(Pid, Ref),
+ {Pid, MRef, StreamRef} = connect(["/runCase?case=", integer_to_binary(N + 1), "&agent=Gun"]),
+ loop(Pid, MRef, StreamRef),
update_reports(),
run_cases(N + 1, Total).
-loop(Pid, Ref) ->
+loop(Pid, MRef, StreamRef) ->
receive
- {gun_ws, Pid, close} ->
+ {gun_ws, Pid, StreamRef, close} ->
gun:ws_send(Pid, close),
- loop(Pid, Ref);
- {gun_ws, Pid, {close, Code, _}} ->
+ loop(Pid, MRef, StreamRef);
+ {gun_ws, Pid, StreamRef, {close, Code, _}} ->
gun:ws_send(Pid, {close, Code, <<>>}),
- loop(Pid, Ref);
- {gun_ws, Pid, Frame} ->
+ loop(Pid, MRef, StreamRef);
+ {gun_ws, Pid, StreamRef, Frame} ->
gun:ws_send(Pid, Frame),
- loop(Pid, Ref);
+ loop(Pid, MRef, StreamRef);
{gun_down, Pid, ws, _, _, _} ->
- close(Pid, Ref);
- {'DOWN', Ref, process, Pid, normal} ->
- close(Pid, Ref);
+ close(Pid, MRef);
+ {'DOWN', MRef, process, Pid, normal} ->
+ close(Pid, MRef);
Msg ->
- ct:print("Unexpected message ~p", [Msg]),
- close(Pid, Ref)
+ ct:pal("Unexpected message ~p", [Msg]),
+ close(Pid, MRef)
end.
update_reports() ->
- {Pid, Ref} = connect("/updateReports?agent=Gun"),
+ {Pid, MRef, StreamRef} = connect("/updateReports?agent=Gun"),
receive
- {gun_ws, Pid, close} ->
- close(Pid, Ref)
+ {gun_ws, Pid, StreamRef, close} ->
+ close(Pid, MRef)
after 5000 ->
error(failed)
end.
@@ -143,22 +143,22 @@ log_output() ->
ok.
connect(Path) ->
- {ok, Pid} = gun:open("127.0.0.1", 33080, #{retry=>0}),
+ {ok, Pid} = gun:open("127.0.0.1", 33080, #{retry => 0}),
{ok, http} = gun:await_up(Pid),
- Ref = monitor(process, Pid),
- gun:ws_upgrade(Pid, Path, [], #{compress => true}),
+ MRef = monitor(process, Pid),
+ StreamRef = gun:ws_upgrade(Pid, Path, [], #{compress => true}),
receive
- {gun_ws_upgrade, Pid, ok, _} ->
+ {gun_upgrade, Pid, StreamRef, [<<"websocket">>], _} ->
ok;
Msg ->
- ct:print("Unexpected message ~p", [Msg]),
+ ct:pal("Unexpected message ~p", [Msg]),
terminate(),
error(failed)
end,
- {Pid, Ref}.
+ {Pid, MRef, StreamRef}.
-close(Pid, Ref) ->
- demonitor(Ref),
+close(Pid, MRef) ->
+ demonitor(MRef),
gun:close(Pid),
gun:flush(Pid).