aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl84
-rw-r--r--src/gun_http.erl160
-rw-r--r--src/gun_ws.erl125
3 files changed, 307 insertions, 62 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 71af26e..3b98732 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -58,6 +58,7 @@
%% Websocket.
-export([ws_upgrade/2]).
-export([ws_upgrade/3]).
+-export([ws_upgrade/4]).
-export([ws_send/2]).
%% Debug.
@@ -85,6 +86,8 @@
| {type, conn_type()}].
-export_type([opts/0]).
+-type ws_opts() :: [{compress, boolean()}].
+
-record(state, {
parent :: pid(),
owner :: pid(),
@@ -98,7 +101,7 @@
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
protocol :: module(),
- proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY and WS too.
+ proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY too.
protocol_state :: any()
}).
@@ -338,14 +341,19 @@ cancel(ServerPid, StreamRef) ->
%% Websocket.
--spec ws_upgrade(pid(), iodata()) -> ok.
+-spec ws_upgrade(pid(), iodata()) -> reference().
ws_upgrade(ServerPid, Path) ->
- ws_upgrade(ServerPid, Path, []).
+ ws_upgrade(ServerPid, Path, [], []).
--spec ws_upgrade(pid(), iodata(), headers()) -> ok.
+-spec ws_upgrade(pid(), iodata(), headers()) -> reference().
ws_upgrade(ServerPid, Path, Headers) ->
- _ = ServerPid ! {ws_upgrade, self(), Path, Headers},
- ok.
+ ws_upgrade(ServerPid, Path, Headers, []).
+
+-spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference().
+ws_upgrade(ServerPid, Path, Headers, Opts) ->
+ StreamRef = make_ref(),
+ _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts},
+ StreamRef.
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
@@ -428,8 +436,10 @@ connect(State=#state{owner=Owner, host=Host, port=Port, type=Type,
retry(State, Retries - 1)
end.
-retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when
- is_reference(KeepaliveRef) ->
+%% Exit normally if the retry functionality has been disabled.
+retry(_, 0) ->
+ ok;
+retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) ->
_ = erlang:cancel_timer(KeepaliveRef),
%% Flush if we have a keepalive message
receive
@@ -458,7 +468,7 @@ before_loop(State=#state{keepalive=Keepalive}) ->
KeepaliveRef = erlang:send_after(Keepalive, self(), keepalive),
loop(State#state{keepalive_ref=KeepaliveRef}).
-loop(State=#state{parent=Parent, owner=Owner, host=Host,
+loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port,
retry=Retry, socket=Socket, transport=Transport,
protocol=Protocol, protocol_state=ProtoState}) ->
{OK, Closed, Error} = Transport:messages(),
@@ -470,7 +480,9 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host,
Transport:close(Socket),
retry(State#state{socket=undefined, transport=undefined,
protocol=undefined}, Retry);
- ProtoState2 ->
+ {upgrade, Protocol2, ProtoState2} ->
+ ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2});
+ ProtoState2 ->
loop(State#state{protocol_state=ProtoState2})
end;
{Closed, Socket} ->
@@ -494,11 +506,11 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host,
before_loop(State#state{protocol_state=ProtoState2});
{request, Owner, StreamRef, Method, Path, Headers} ->
ProtoState2 = Protocol:request(ProtoState,
- StreamRef, Method, Host, Path, Headers),
+ StreamRef, Method, Host, Port, Path, Headers),
loop(State#state{protocol_state=ProtoState2});
{request, Owner, StreamRef, Method, Path, Headers, Body} ->
ProtoState2 = Protocol:request(ProtoState,
- StreamRef, Method, Host, Path, Headers, Body),
+ StreamRef, Method, Host, Port, Path, Headers, Body),
loop(State#state{protocol_state=ProtoState2});
{data, Owner, StreamRef, IsFin, Data} ->
ProtoState2 = Protocol:data(ProtoState,
@@ -507,11 +519,10 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host,
{cancel, Owner, StreamRef} ->
ProtoState2 = Protocol:cancel(ProtoState, StreamRef),
loop(State#state{protocol_state=ProtoState2});
- {ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy ->
- %% @todo
- ProtoState2 = Protocol:ws_upgrade(ProtoState,
- Path, Headers),
- ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2});
+ {ws_upgrade, Owner, StreamRef, Path, Headers, Opts} when Protocol =/= gun_spdy ->
+ ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, Opts),
+ loop(State#state{protocol_state=ProtoState2});
+ %% @todo can fail if http/1.0
{shutdown, Owner} ->
%% @todo Protocol:shutdown?
ok;
@@ -525,9 +536,9 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host,
element(2, Any) ! {gun_error, self(), {notowner,
"Operations are restricted to the owner of the connection."}},
loop(State);
- {ws_upgrade, _, _, _} ->
- Owner ! {gun_error, self(), {badstate,
- "Websocket over SPDY isn't supported."}},
+ {ws_upgrade, _, StreamRef, _, _} ->
+ Owner ! {gun_error, self(), StreamRef, {badstate,
+ "Websocket is only supported over HTTP/1.1."}},
loop(State);
{ws_send, _, _} ->
Owner ! {gun_error, self(), {badstate,
@@ -545,23 +556,34 @@ ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket,
ok = Transport:setopts(Socket, [{active, once}]),
receive
{OK, Socket, Data} ->
- ProtoState2 = Protocol:handle(ProtoState, Data),
- ws_loop(State#state{protocol_state=ProtoState2});
+ case Protocol:handle(Data, ProtoState) of
+ close ->
+ Transport:close(Socket),
+ retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry);
+ ProtoState2 ->
+ ws_loop(State#state{protocol_state=ProtoState2})
+ end;
{Closed, Socket} ->
Transport:close(Socket),
- retry(State#state{socket=undefined, transport=undefined,
- protocol=undefined}, Retry);
+ retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry);
{Error, Socket, _} ->
Transport:close(Socket),
- retry(State#state{socket=undefined, transport=undefined,
- protocol=undefined}, Retry);
- %% @todo keepalive
- {ws_send, Owner, Frames} when is_list(Frames) ->
- todo; %% @todo
+ retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry);
+ %% Ignore any previous HTTP keep-alive.
+ keepalive ->
+ ws_loop(State);
+% {ws_send, Owner, Frames} when is_list(Frames) ->
+% todo; %% @todo
{ws_send, Owner, Frame} ->
- {todo, Frame}; %% @todo
+ case Protocol:send(Frame, ProtoState) of
+ close ->
+ Transport:close(Socket),
+ retry(State#state{socket=undefined, transport=undefined, protocol=undefined}, Retry);
+ ProtoState2 ->
+ ws_loop(State#state{protocol_state=ProtoState2})
+ end;
{shutdown, Owner} ->
- %% @todo Protocol:shutdown?
+ %% @todo Protocol:shutdown? %% @todo close frame
ok;
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
diff --git a/src/gun_http.erl b/src/gun_http.erl
index bd6565c..f2a2d23 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -18,16 +18,19 @@
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
--export([request/6]).
-export([request/7]).
+-export([request/8]).
-export([data/4]).
-export([cancel/2]).
+-export([ws_upgrade/7]).
-type opts() :: [{version, cow_http:version()}].
-export_type([opts/0]).
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked.
+-type websocket_info() :: {websocket, reference(), binary(), [], []}. %% key, extensions, protocols
+
-record(http_state, {
owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
@@ -35,7 +38,7 @@
version = 'HTTP/1.1' :: cow_http:version(),
connection = keepalive :: keepalive | close,
buffer = <<>> :: binary(),
- streams = [] :: [{reference(), boolean()}], %% ref + whether stream is alive
+ streams = [] :: [{reference() | websocket_info(), boolean()}], %% ref + whether stream is alive
in = head :: io(),
in_state :: {non_neg_integer(), non_neg_integer()},
out = head :: io()
@@ -128,31 +131,40 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
connection=Conn, streams=[{StreamRef, IsAlive}|_]}) ->
{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
{Headers, Rest2} = cow_http:parse_headers(Rest),
- In = io_from_headers(Version, Headers),
- IsFin = case In of head -> fin; _ -> nofin end,
- case IsAlive of
- false ->
- ok;
- true ->
- Owner ! {gun_response, self(), StreamRef,
- IsFin, Status, Headers},
- ok
- end,
- Conn2 = if
- Conn =:= close -> close;
- Version =:= 'HTTP/1.0' -> close;
- ClientVersion =:= 'HTTP/1.0' -> close;
- true -> conn_from_headers(Headers)
- end,
- %% We always reset in_state even if not chunked.
- if
- IsFin =:= fin, Conn2 =:= close ->
- close;
- IsFin =:= fin ->
- handle(Rest2, end_stream(State#http_state{in=In,
- in_state={0, 0}, connection=Conn2}));
- true ->
- handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2})
+ case {Status, StreamRef} of
+ {101, {websocket, _, WsKey, WsExtensions, WsProtocols, WsOpts}} ->
+ ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsProtocols, WsOpts);
+ _ ->
+ In = io_from_headers(Version, Headers),
+ IsFin = case In of head -> fin; _ -> nofin end,
+ case IsAlive of
+ false ->
+ ok;
+ true ->
+ StreamRef2 = case StreamRef of
+ {websocket, SR, _, _, _} -> SR;
+ _ -> StreamRef
+ end,
+ Owner ! {gun_response, self(), StreamRef2,
+ IsFin, Status, Headers},
+ ok
+ end,
+ Conn2 = if
+ Conn =:= close -> close;
+ Version =:= 'HTTP/1.0' -> close;
+ ClientVersion =:= 'HTTP/1.0' -> close;
+ true -> conn_from_headers(Headers)
+ end,
+ %% We always reset in_state even if not chunked.
+ if
+ IsFin =:= fin, Conn2 =:= close ->
+ close;
+ IsFin =:= fin ->
+ handle(Rest2, end_stream(State#http_state{in=In,
+ in_state={0, 0}, connection=Conn2}));
+ true ->
+ handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2})
+ end
end.
send_data_if_alive(<<>>, _, nofin) ->
@@ -187,27 +199,28 @@ keepalive(State) ->
State.
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=head}, StreamRef, Method, Host, Path, Headers) ->
+ out=head}, StreamRef, Method, Host, Port, Path, Headers) ->
Headers2 = case Version of
'HTTP/1.0' -> lists:keydelete(<<"transfer-encoding">>, 1, Headers);
'HTTP/1.1' -> Headers
end,
Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
- false -> [{<<"host">>, Host}|Headers2];
+ false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2];
true -> Headers2
end,
%% We use Headers2 because this is the smallest list.
Conn = conn_from_headers(Headers2),
+ %% @todo This should probably also check for content-type like SPDY.
Out = io_from_headers(Version, Headers2),
Transport:send(Socket, cow_http:request(Method, Path, Version, Headers3)),
new_stream(State#http_state{connection=Conn, out=Out}, StreamRef).
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=head}, StreamRef, Method, Host, Path, Headers, Body) ->
+ out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) ->
Headers2 = lists:keydelete(<<"content-length">>, 1,
lists:keydelete(<<"transfer-encoding">>, 1, Headers)),
Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
- false -> [{<<"host">>, Host}|Headers2];
+ false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2];
true -> Headers2
end,
%% We use Headers2 because this is the smallest list.
@@ -334,3 +347,88 @@ cancel_stream(State=#http_state{streams=Streams}, StreamRef) ->
end_stream(State=#http_state{streams=[_|Tail]}) ->
State#http_state{in=head, streams=Tail}.
+
+%% Websocket upgrade.
+
+%% Ensure version is 1.1.
+ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) ->
+ error; %% @todo
+ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head},
+ StreamRef, Host, Port, Path, Headers, WsOpts) ->
+ %% @todo Add option for setting protocol.
+ {ExtHeaders, GunExtensions} = case maps:get(compress, WsOpts, false) of
+ true -> {[{<<"sec-websocket-extensions">>, <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}],
+ [<<"permessage-deflate">>]};
+ false -> {[], []}
+ end,
+ Key = cow_ws:key(),
+ Headers2 = [
+ {<<"connection">>, <<"upgrade">>},
+ {<<"upgrade">>, <<"websocket">>},
+ {<<"sec-websocket-version">>, <<"13">>},
+ {<<"sec-websocket-key">>, Key}
+ |ExtHeaders
+ ],
+ Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
+ false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2];
+ true -> Headers2
+ end,
+ Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers3)),
+ new_stream(State#http_state{connection=keepalive, out=head},
+ {websocket, StreamRef, Key, GunExtensions, [], WsOpts}).
+
+ws_handshake(Buffer, State, Headers, Key, GunExtensions, GunProtocols, Opts) ->
+ %% @todo check upgrade, connection
+ case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
+ false ->
+ close;
+ {_, Accept} ->
+ case cow_ws:encode_key(Key) of
+ Accept -> ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts);
+ _ -> close
+ end
+ end.
+
+ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts) ->
+ case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
+ false ->
+ ws_handshake_protocols(Buffer, State, Headers, #{}, GunProtocols);
+ {_, ExtHd} ->
+ case ws_validate_extensions(cow_http_hd:parse_sec_websocket_extensions(ExtHd), GunExtensions, #{}, Opts) of
+ close -> close;
+ Extensions -> ws_handshake_protocols(Buffer, State, Headers, Extensions, GunProtocols)
+ end
+ end.
+
+ws_validate_extensions([], _, Acc, _) ->
+ Acc;
+ws_validate_extensions([{Name = <<"permessage-deflate">>, Params}|Tail], GunExts, Acc, Opts) ->
+ case lists:member(Name, GunExts) of
+ true ->
+ case cow_ws:validate_permessage_deflate(Params, Acc, Opts) of
+ {ok, Acc2} -> ws_validate_extensions(Tail, GunExts, Acc2, Opts);
+ error -> close
+ end;
+ %% Fail the connection if extension was not requested.
+ false ->
+ close
+ end;
+%% Fail the connection on unknown extension.
+ws_validate_extensions(_, _, _, _) ->
+ close.
+
+%% @todo Validate protocols.
+ws_handshake_protocols(Buffer, State, _Headers, Extensions, _GunProtocols = []) ->
+ Protocols = [],
+ ws_handshake_end(Buffer, State, Extensions, Protocols).
+
+ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, Extensions, Protocols) ->
+ %% Send ourselves the remaining buffer, if any.
+ _ = case Buffer of
+ <<>> ->
+ ok;
+ _ ->
+ {OK, _, _} = Transport:messages(),
+ self() ! {OK, Socket, Buffer}
+ end,
+ gun_ws:init(Owner, Socket, Transport, Extensions, Protocols).
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
new file mode 100644
index 0000000..5379362
--- /dev/null
+++ b/src/gun_ws.erl
@@ -0,0 +1,125 @@
+%% Copyright (c) 2015, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(gun_ws).
+
+-export([init/5]).
+-export([handle/2]).
+-export([send/2]).
+
+-record(payload, {
+ type = undefined :: cow_ws:frame_type(),
+ rsv = undefined :: cow_ws:rsv(),
+ len = undefined :: non_neg_integer(),
+ mask_key = undefined :: cow_ws:mask_key(),
+ close_code = undefined :: undefined | cow_ws:close_code(),
+ unmasked = <<>> :: binary(),
+ unmasked_len = 0 :: non_neg_integer()
+}).
+
+-record(ws_state, {
+ owner :: pid(),
+ socket :: inet:socket() | ssl:sslsocket(),
+ transport :: module(),
+ buffer = <<>> :: binary(),
+ in = head :: head | #payload{} | close,
+ frag_state = undefined :: cow_ws:frag_state(),
+ frag_buffer = <<>> :: binary(),
+ utf8_state = 0 :: cow_ws:utf8_state(),
+ extensions = #{} :: cow_ws:extensions()
+}).
+
+%% @todo Protocols
+init(Owner, Socket, Transport, Extensions, _Protocols) ->
+ Owner ! {gun_ws_upgrade, self(), ok},
+ {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, extensions=Extensions}}.
+
+%% Do not handle anything if we received a close frame.
+handle(_, State=#ws_state{in=close}) ->
+ State;
+%% Shortcut for common case when Data is empty after processing a frame.
+handle(<<>>, State=#ws_state{in=head}) ->
+ State;
+handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}) ->
+ Data2 = << Buffer/binary, Data/binary >>,
+ case cow_ws:parse_header(Data2, Extensions, FragState) of
+ {Type, FragState2, Rsv, Len, MaskKey, Rest} ->
+ handle(Rest, State#ws_state{buffer= <<>>,
+ in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2});
+ more ->
+ State#ws_state{buffer=Data2};
+ error ->
+ close({error, badframe}, State)
+ end;
+handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode,
+ unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, utf8_state=Utf8State, extensions=Extensions}) ->
+ case cow_ws:parse_payload(Data, MaskKey, Utf8State, UnmaskedLen, Type, Len, FragState, Extensions, Rsv) of
+ {ok, CloseCode2, Payload, Utf8State2, Rest} ->
+ dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode2);
+ {ok, Payload, Utf8State2, Rest} ->
+ dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode);
+ {more, CloseCode2, Payload, Utf8State2} ->
+ State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>,
+ len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2};
+ {more, Payload, Utf8State2} ->
+ State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>,
+ len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2};
+ Error = {error, _Reason} ->
+ close(Error, State)
+ end.
+
+dispatch(Rest, State=#ws_state{owner=Owner, frag_state=FragState, frag_buffer=SoFar},
+ Type0, Payload0, CloseCode0) ->
+ case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of
+ {fragment, nofin, _, Payload} ->
+ handle(Rest, State#ws_state{frag_buffer= << SoFar/binary, Payload/binary >>});
+ {fragment, fin, Type, Payload} ->
+ Owner ! {gun_ws, self(), {Type, << SoFar/binary, Payload/binary >>}},
+ handle(Rest, State#ws_state{frag_state=undefined, frag_buffer= <<>>});
+ ping ->
+ State2 = send(pong, State),
+ handle(Rest, State2);
+ {ping, Payload} ->
+ State2 = send({pong, Payload}, State),
+ handle(Rest, State2);
+ pong ->
+ handle(Rest, State);
+ {pong, _} ->
+ handle(Rest, State);
+ Frame ->
+ Owner ! {gun_ws, self(), Frame},
+ case Frame of
+ close -> handle(Rest, State#ws_state{in=close});
+ {close, _, _} -> handle(Rest, State#ws_state{in=close});
+ _ -> handle(Rest, State)
+ end
+ end.
+
+close(Reason, State) ->
+ case Reason of
+ Normal when Normal =:= stop; Normal =:= timeout ->
+ send({close, 1000, <<>>}, State);
+ {error, badframe} ->
+ send({close, 1002, <<>>}, State);
+ {error, badencoding} ->
+ send({close, 1007, <<>>}, State)
+ end.
+
+send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Extensions}) ->
+ Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)),
+ case Frame of
+ close -> close;
+ {close, _, _} -> close;
+ _ -> State
+ end.