aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2018-06-03 20:31:09 +0200
committerLoïc Hoguin <[email protected]>2018-06-03 20:31:09 +0200
commit34307a584149abbf0b2e5b33beb2fca4c585b0d1 (patch)
treef44f3236287271ca0868262cb4401d1c1b88da38 /src
parentca57baf279a164c4b9813267e6bbc96ed5b45ee8 (diff)
downloadgun-34307a584149abbf0b2e5b33beb2fca4c585b0d1.tar.gz
gun-34307a584149abbf0b2e5b33beb2fca4c585b0d1.tar.bz2
gun-34307a584149abbf0b2e5b33beb2fca4c585b0d1.zip
Change messages to gun_upgrade and gun_ws with stream reference
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl9
-rw-r--r--src/gun_http.erl36
-rw-r--r--src/gun_ws.erl8
-rw-r--r--src/gun_ws_h.erl (renamed from src/gun_ws_handler.erl)21
4 files changed, 44 insertions, 30 deletions
diff --git a/src/gun.erl b/src/gun.erl
index cad1da1..1224c82 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -381,6 +381,7 @@ await(ServerPid, StreamRef, Timeout) ->
demonitor(MRef, [flush]),
Res.
+%% @todo Add gun_upgrade and gun_ws?
await(ServerPid, StreamRef, Timeout, MRef) ->
receive
{gun_inform, ServerPid, StreamRef, Status, Headers} ->
@@ -494,9 +495,9 @@ flush_pid(ServerPid) ->
flush_pid(ServerPid);
{gun_error, ServerPid, _} ->
flush_pid(ServerPid);
- {gun_ws_upgrade, ServerPid, _, _} ->
+ {gun_upgrade, ServerPid, _, _, _} ->
flush_pid(ServerPid);
- {gun_ws, ServerPid, _} ->
+ {gun_ws, ServerPid, _, _} ->
flush_pid(ServerPid);
{'DOWN', _, process, ServerPid, _} ->
flush_pid(ServerPid)
@@ -517,6 +518,10 @@ flush_ref(StreamRef) ->
{gun_push, _, StreamRef, _, _, _, _, _} ->
flush_ref(StreamRef);
{gun_error, _, StreamRef, _} ->
+ flush_ref(StreamRef);
+ {gun_upgrade, _, StreamRef, _, _} ->
+ flush_ref(StreamRef);
+ {gun_ws, _, StreamRef, _} ->
flush_ref(StreamRef)
after 0 ->
ok
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 7e7bed7..4fc6a17 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -201,8 +201,8 @@ handle_head(Data, State=#http_state{version=ClientVersion,
{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
{Headers, Rest2} = cow_http:parse_headers(Rest),
case {Status, StreamRef} of
- {101, {websocket, _, WsKey, WsExtensions, WsOpts}} ->
- ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsOpts);
+ {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} ->
+ ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts);
{_, _} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
handle(Rest2, State);
@@ -511,26 +511,31 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, ou
new_stream(State#http_state{connection=keepalive, out=head},
{websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>).
-ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) ->
+ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
%% @todo check upgrade, connection
case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
false ->
close;
{_, Accept} ->
case cow_ws:encode_key(Key) of
- Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts);
- _ -> close
+ Accept ->
+ ws_handshake_extensions(Buffer, State, StreamRef,
+ Headers, GunExtensions, Opts);
+ _ ->
+ close
end
end.
-ws_handshake_extensions(Buffer, State, Headers, GunExtensions, Opts) ->
+ws_handshake_extensions(Buffer, State, StreamRef, Headers, GunExtensions, Opts) ->
case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
false ->
- ws_handshake_protocols(Buffer, State, Headers, #{}, Opts);
+ ws_handshake_protocols(Buffer, State, StreamRef, Headers, #{}, Opts);
{_, ExtHd} ->
case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of
- close -> close;
- Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts)
+ close ->
+ close;
+ Extensions ->
+ ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts)
end
end.
@@ -552,23 +557,24 @@ ws_validate_extensions(_, _, _, _) ->
close.
%% @todo Validate protocols.
-ws_handshake_protocols(Buffer, State, Headers, Extensions, Opts) ->
+ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) ->
case lists:keyfind(<<"sec-websocket-protocol">>, 1, Headers) of
false ->
- ws_handshake_end(Buffer, State, Headers, Extensions,
- maps:get(default_protocol, Opts, gun_ws_handler), Opts);
+ ws_handshake_end(Buffer, State, StreamRef, Headers, Extensions,
+ maps:get(default_protocol, Opts, gun_ws_h), Opts);
{_, Proto} ->
ProtoOpt = maps:get(protocols, Opts, []),
case lists:keyfind(Proto, 1, ProtoOpt) of
{_, Handler} ->
- ws_handshake_end(Buffer, State, Headers, Extensions, Handler, Opts);
+ ws_handshake_end(Buffer, State, StreamRef,
+ Headers, Extensions, Handler, Opts);
false ->
close
end
end.
ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport},
- Headers, Extensions, Handler, Opts) ->
+ StreamRef, Headers, Extensions, Handler, Opts) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
<<>> ->
@@ -577,4 +583,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
{OK, _, _} = Transport:messages(),
self() ! {OK, Socket, Buffer}
end,
- gun_ws:init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts).
+ gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts).
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 6501893..fe12448 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -16,7 +16,7 @@
-export([check_options/1]).
-export([name/0]).
--export([init/7]).
+-export([init/8]).
-export([handle/2]).
-export([close/2]).
-export([send/2]).
@@ -66,9 +66,9 @@ do_check_options([Opt|_]) ->
name() -> ws.
-init(Owner, Socket, Transport, Headers, Extensions, Handler, Opts) ->
- Owner ! {gun_ws_upgrade, self(), ok, Headers},
- HandlerState = Handler:init(Owner, Headers, Opts),
+init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
+ Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
+ HandlerState = Handler:init(Owner, StreamRef, Headers, Opts),
{upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
extensions=Extensions, handler=Handler, handler_state=HandlerState}}.
diff --git a/src/gun_ws_handler.erl b/src/gun_ws_h.erl
index 4356ab5..fb6e3d2 100644
--- a/src/gun_ws_handler.erl
+++ b/src/gun_ws_h.erl
@@ -12,24 +12,27 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
--module(gun_ws_handler).
+-module(gun_ws_h).
--export([init/3]).
+-export([init/4]).
-export([handle/2]).
-record(state, {
reply_to :: pid(),
+ stream_ref :: reference(),
frag_buffer = <<>> :: binary()
}).
-init(ReplyTo, _, _) ->
- #state{reply_to=ReplyTo}.
+init(ReplyTo, StreamRef, _, _) ->
+ #state{reply_to=ReplyTo, stream_ref=StreamRef}.
-handle({fragment, nofin, _, Payload}, State=#state{frag_buffer=SoFar}) ->
+handle({fragment, nofin, _, Payload},
+ State=#state{frag_buffer=SoFar}) ->
State#state{frag_buffer= << SoFar/binary, Payload/binary >>};
-handle({fragment, fin, Type, Payload}, State=#state{reply_to=ReplyTo, frag_buffer=SoFar}) ->
- ReplyTo ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}},
+handle({fragment, fin, Type, Payload},
+ State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) ->
+ ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}},
State#state{frag_buffer= <<>>};
-handle(Frame, State=#state{reply_to=ReplyTo}) ->
- ReplyTo ! {gun_ws, self(), Frame},
+handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) ->
+ ReplyTo ! {gun_ws, self(), StreamRef, Frame},
State.