diff options
-rw-r--r-- | Makefile | 2 | ||||
-rw-r--r-- | rebar.config | 2 | ||||
-rw-r--r-- | src/cowboy_websocket.erl | 167 | ||||
-rw-r--r-- | test/ws_SUITE.erl | 4 |
4 files changed, 84 insertions, 91 deletions
@@ -11,7 +11,7 @@ PLT_APPS = crypto public_key ssl # Dependencies. DEPS = cowlib ranch -dep_cowlib = git https://github.com/ninenines/cowlib 1.1.0 +dep_cowlib = git https://github.com/ninenines/cowlib 1.2.0 TEST_DEPS = ct_helper gun dep_ct_helper = git https://github.com/extend/ct_helper.git master diff --git a/rebar.config b/rebar.config index 92cf75c..4dafa26 100644 --- a/rebar.config +++ b/rebar.config @@ -1,4 +1,4 @@ {deps, [ - {cowlib, ".*", {git, "git://github.com/ninenines/cowlib.git", "1.1.0"}}, + {cowlib, ".*", {git, "git://github.com/ninenines/cowlib.git", "1.2.0"}}, {ranch, ".*", {git, "git://github.com/ninenines/ranch.git", "1.0.0"}} ]}. diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 681470f..9ce4f9d 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -59,9 +59,8 @@ hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), frag_buffer = <<>> :: binary(), - utf8_state = <<>> :: binary(), - recv_extensions = #{} :: map(), - send_extensions = #{} :: map() + utf8_state = 0 :: cow_ws:utf8_state(), + extensions = #{} :: map() }). -spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate) @@ -104,47 +103,55 @@ websocket_upgrade(State, Req) -> -spec websocket_extensions(#state{}, Req) -> {ok, #state{}, Req} when Req::cowboy_req:req(). websocket_extensions(State, Req) -> - case cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req) of - undefined -> - {ok, State, cowboy_req:set_meta(websocket_compress, false, Req)}; - Extensions -> - [Compress] = cowboy_req:get([resp_compress], Req), - case lists:keyfind(<<"x-webkit-deflate-frame">>, 1, Extensions) of - {<<"x-webkit-deflate-frame">>, []} when Compress =:= true -> - Inflate = zlib:open(), - Deflate = zlib:open(), - % Since we are negotiating an unconstrained deflate-frame - % then we must be willing to accept frames using the - % maximum window size which is 2^15. The negative value - % indicates that zlib headers are not used. - ok = zlib:inflateInit(Inflate, -15), - % Initialize the deflater with a window size of 2^15 bits and disable - % the zlib headers. - ok = zlib:deflateInit(Deflate, best_compression, deflated, -15, 8, default), - {ok, State#state{ - recv_extensions = #{deflate_frame => Inflate}, - send_extensions = #{deflate_frame => Deflate} - }, cowboy_req:set_meta(websocket_compress, true, Req)}; - _ -> - {ok, State, cowboy_req:set_meta(websocket_compress, false, Req)} - end + [Compress] = cowboy_req:get([resp_compress], Req), + Req2 = cowboy_req:set_meta(websocket_compress, false, Req), + case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req2)} of + {true, Extensions} when Extensions =/= undefined -> + websocket_extensions(State, Req2, Extensions, []); + _ -> + {ok, State, Req2} end. +websocket_extensions(State, Req, [], []) -> + {ok, State, Req}; +websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) -> + {ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)}; +websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"permessage-deflate">>, Params}|Tail], RespHeader) -> + %% @todo Make deflate options configurable. + Opts = #{level => best_compression, mem_level => 8, strategy => default}, + case cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of + {ok, RespExt, Extensions2} -> + Req2 = cowboy_req:set_meta(websocket_compress, true, Req), + websocket_extensions(State#state{extensions=Extensions2}, + Req2, Tail, [<<", ">>, RespExt|RespHeader]); + ignore -> + websocket_extensions(State, Req, Tail, RespHeader) + end; +websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) -> + %% @todo Make deflate options configurable. + Opts = #{level => best_compression, mem_level => 8, strategy => default}, + case cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts) of + {ok, RespExt, Extensions2} -> + Req2 = cowboy_req:set_meta(websocket_compress, true, Req), + websocket_extensions(State#state{extensions=Extensions2}, + Req2, Tail, [<<", ">>, RespExt|RespHeader]); + ignore -> + websocket_extensions(State, Req, Tail, RespHeader) + end; +websocket_extensions(State, Req, [_|Tail], RespHeader) -> + websocket_extensions(State, Req, Tail, RespHeader). + -spec websocket_handshake(#state{}, Req, any()) -> {ok, Req, cowboy_middleware:env()} | {suspend, module(), atom(), [any()]} when Req::cowboy_req:req(). -websocket_handshake(State=#state{ - transport=Transport, key=Key, recv_extensions=Extensions}, - Req, HandlerState) -> +websocket_handshake(State=#state{transport=Transport, key=Key}, Req, HandlerState) -> Challenge = base64:encode(crypto:hash(sha, << Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)), - ExtHeader = case Extensions of - #{deflate_frame := _} -> [{<<"sec-websocket-extensions">>, <<"x-webkit-deflate-frame">>}]; - _ -> [] - end, - Req2 = cowboy_req:upgrade_reply(101, [{<<"upgrade">>, <<"websocket">>}, - {<<"sec-websocket-accept">>, Challenge}|ExtHeader], Req), + Req2 = cowboy_req:upgrade_reply(101, [ + {<<"upgrade">>, <<"websocket">>}, + {<<"sec-websocket-accept">>, Challenge} + ], Req), %% Flush the resp_sent message before moving on. receive {cowboy_req, resp_sent} -> ok after 0 -> ok end, State2 = handler_loop_timeout(State), @@ -203,44 +210,38 @@ handler_loop(State=#state{socket=Socket, messages={OK, Closed, Error}, -> {ok, Req, cowboy_middleware:env()} | {suspend, module(), atom(), [any()]} when Req::cowboy_req:req(). -websocket_data(State=#state{frag_state=FragState, recv_extensions=Extensions}, Req, HandlerState, Data) -> +websocket_data(State=#state{frag_state=FragState, extensions=Extensions}, Req, HandlerState, Data) -> case cow_ws:parse_header(Data, Extensions, FragState) of %% All frames sent from the client to the server are masked. {_, _, _, _, undefined, _} -> websocket_close(State, Req, HandlerState, {error, badframe}); - %% No payload. - {Type, FragState2, _, 0, _, Rest} -> - websocket_dispatch(State#state{frag_state=FragState2}, Req, HandlerState, Type, <<>>, undefined, Rest); {Type, FragState2, Rsv, Len, MaskKey, Rest} -> - websocket_payload(State#state{frag_state=FragState2}, Req, HandlerState, Type, Len, MaskKey, Rsv, Rest); + websocket_payload(State#state{frag_state=FragState2}, Req, HandlerState, Type, Len, MaskKey, Rsv, undefined, <<>>, 0, Rest); more -> handler_before_loop(State, Req, HandlerState, Data); error -> websocket_close(State, Req, HandlerState, {error, badframe}) end. -websocket_payload(State, Req, HandlerState, Type = close, Len, MaskKey, Rsv, Data) -> - case cow_ws:parse_close_code(Data, MaskKey) of - {ok, CloseCode, Rest} -> - websocket_payload(State, Req, HandlerState, Type, Len - 2, MaskKey, Rsv, CloseCode, <<>>, 2, Rest); - error -> - websocket_close(State, Req, HandlerState, {error, badframe}) - end; -websocket_payload(State, Req, HandlerState, Type, Len, MaskKey, Rsv, Data) -> - websocket_payload(State, Req, HandlerState, Type, Len, MaskKey, Rsv, undefined, <<>>, 0, Data). - -websocket_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, recv_extensions=Extensions}, +websocket_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensions=Extensions}, Req, HandlerState, Type, Len, MaskKey, Rsv, CloseCode, Unmasked, UnmaskedLen, Data) -> case cow_ws:parse_payload(Data, MaskKey, Incomplete, UnmaskedLen, Type, Len, FragState, Extensions, Rsv) of + {ok, CloseCode2, Payload, Utf8State, Rest} -> + websocket_dispatch(State#state{utf8_state=Utf8State}, + Req, HandlerState, Type, << Unmasked/binary, Payload/binary >>, CloseCode2, Rest); {ok, Payload, Utf8State, Rest} -> websocket_dispatch(State#state{utf8_state=Utf8State}, Req, HandlerState, Type, << Unmasked/binary, Payload/binary >>, CloseCode, Rest); + {more, CloseCode2, Payload, Utf8State} -> + websocket_payload_loop(State#state{utf8_state=Utf8State}, + Req, HandlerState, Type, Len - byte_size(Data), MaskKey, Rsv, CloseCode2, + << Unmasked/binary, Payload/binary >>, UnmaskedLen + byte_size(Data)); {more, Payload, Utf8State} -> websocket_payload_loop(State#state{utf8_state=Utf8State}, Req, HandlerState, Type, Len - byte_size(Data), MaskKey, Rsv, CloseCode, << Unmasked/binary, Payload/binary >>, UnmaskedLen + byte_size(Data)); - error -> - websocket_close(State, Req, HandlerState, {error, badencoding}) + Error = {error, _Reason} -> + websocket_close(State, Req, HandlerState, Error) end. websocket_payload_loop(State=#state{socket=Socket, transport=Transport, @@ -270,38 +271,28 @@ websocket_payload_loop(State=#state{socket=Socket, transport=Transport, end) end. -%% Continuation frame. -websocket_dispatch(State=#state{frag_state={nofin, _}, frag_buffer=SoFar}, - Req, HandlerState, fragment, Payload, _, RemainingData) -> - websocket_data(State#state{frag_buffer= << SoFar/binary, Payload/binary >>}, Req, HandlerState, RemainingData); -%% Last continuation frame. -websocket_dispatch(State=#state{frag_state={fin, Type}, frag_buffer=SoFar}, - Req, HandlerState, fragment, Payload, CloseCode, RemainingData) -> - websocket_dispatch(State#state{frag_state=undefined, frag_buffer= <<>>}, Req, HandlerState, - Type, << SoFar/binary, Payload/binary >>, CloseCode, RemainingData); -%% Text frame. -websocket_dispatch(State, Req, HandlerState, text, Payload, _, RemainingData) -> - handler_call(State, Req, HandlerState, RemainingData, - websocket_handle, {text, Payload}, fun websocket_data/4); -%% Binary frame. -websocket_dispatch(State, Req, HandlerState, binary, Payload, _, RemainingData) -> - handler_call(State, Req, HandlerState, RemainingData, - websocket_handle, {binary, Payload}, fun websocket_data/4); -%% Close control frame. -websocket_dispatch(State, Req, HandlerState, close, _, undefined, _) -> - websocket_close(State, Req, HandlerState, remote); -websocket_dispatch(State, Req, HandlerState, close, Payload, Code, _) -> - websocket_close(State, Req, HandlerState, {remote, Code, Payload}); -%% Ping control frame. Send a pong back and forward the ping to the handler. -websocket_dispatch(State=#state{socket=Socket, transport=Transport, send_extensions=Extensions}, - Req, HandlerState, ping, Payload, _, RemainingData) -> - Transport:send(Socket, cow_ws:frame({pong, Payload}, Extensions)), - handler_call(State, Req, HandlerState, RemainingData, - websocket_handle, {ping, Payload}, fun websocket_data/4); -%% Pong control frame. -websocket_dispatch(State, Req, HandlerState, pong, Payload, _, RemainingData) -> - handler_call(State, Req, HandlerState, RemainingData, - websocket_handle, {pong, Payload}, fun websocket_data/4). +websocket_dispatch(State=#state{socket=Socket, transport=Transport, frag_state=FragState, frag_buffer=SoFar, extensions=Extensions}, + Req, HandlerState, Type0, Payload0, CloseCode0, RemainingData) -> + case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of + %% @todo Allow receiving fragments. + {fragment, nofin, _, Payload} -> + websocket_data(State#state{frag_buffer= << SoFar/binary, Payload/binary >>}, Req, HandlerState, RemainingData); + {fragment, fin, Type, Payload} -> + handler_call(State#state{frag_state=undefined, frag_buffer= <<>>}, Req, HandlerState, RemainingData, + websocket_handle, {Type, << SoFar/binary, Payload/binary >>}, fun websocket_data/4); + close -> + websocket_close(State, Req, HandlerState, remote); + {close, CloseCode, Payload} -> + websocket_close(State, Req, HandlerState, {remote, CloseCode, Payload}); + Frame = ping -> + Transport:send(Socket, cow_ws:frame(pong, Extensions)), + handler_call(State, Req, HandlerState, RemainingData, websocket_handle, Frame, fun websocket_data/4); + Frame = {ping, Payload} -> + Transport:send(Socket, cow_ws:frame({pong, Payload}, Extensions)), + handler_call(State, Req, HandlerState, RemainingData, websocket_handle, Frame, fun websocket_data/4); + Frame -> + handler_call(State, Req, HandlerState, RemainingData, websocket_handle, Frame, fun websocket_data/4) + end. -spec handler_call(#state{}, Req, any(), binary(), atom(), any(), fun()) -> {ok, Req, cowboy_middleware:env()} @@ -370,7 +361,7 @@ handler_call(State=#state{handler=Handler}, Req, HandlerState, end. -spec websocket_send(cow_ws:frame(), #state{}) -> ok | stop | {error, atom()}. -websocket_send(Frame, #state{socket=Socket, transport=Transport, send_extensions=Extensions}) -> +websocket_send(Frame, #state{socket=Socket, transport=Transport, extensions=Extensions}) -> Res = Transport:send(Socket, cow_ws:frame(Frame, Extensions)), case Frame of close -> stop; @@ -392,7 +383,7 @@ websocket_send_many([Frame|Tail], State) -> -spec websocket_close(#state{}, Req, any(), terminate_reason()) -> {ok, Req, cowboy_middleware:env()} when Req::cowboy_req:req(). -websocket_close(State=#state{socket=Socket, transport=Transport, send_extensions=Extensions}, +websocket_close(State=#state{socket=Socket, transport=Transport, extensions=Extensions}, Req, HandlerState, Reason) -> case Reason of Normal when Normal =:= stop; Normal =:= timeout -> diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index e341e10..c47c293 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -41,7 +41,9 @@ init_per_group(Name = autobahn, Config) -> {skip, "Autobahn Test Suite not installed."}; _ -> {ok, _} = cowboy:start_http(Name, 100, [{port, 33080}], [ - {env, [{dispatch, init_dispatch()}]}]), + {env, [{dispatch, init_dispatch()}]}, + {compress, true} + ]), Config end; init_per_group(Name = ws, Config) -> |