From 0b15ed914dd84e15a7d354807696d9bfa82bbebb Mon Sep 17 00:00:00 2001 From: Magnus Klaar Date: Sun, 1 Apr 2012 18:58:28 -0700 Subject: Add support for fragmented websocket messages --- src/cowboy_http_websocket.erl | 138 ++++++++++++++++++++++++++++++++++-------- test/ws_SUITE.erl | 63 ++++++++++++++++++- 2 files changed, 172 insertions(+), 29 deletions(-) diff --git a/src/cowboy_http_websocket.erl b/src/cowboy_http_websocket.erl index 40fef23..1b9a591 100644 --- a/src/cowboy_http_websocket.erl +++ b/src/cowboy_http_websocket.erl @@ -46,6 +46,14 @@ -type opcode() :: 0 | 1 | 2 | 8 | 9 | 10. -type mask_key() :: 0..16#ffffffff. +%% The websocket_data/4 function may be called multiple times for a message. +%% The websocket_dispatch/4 function is only called once for each message. +-type frag_state() :: + undefined | %% no fragmentation has been seen. + {nofin, opcode()} | %% first fragment has been seen. + {nofin, opcode(), binary()} | %% first fragment has been unmasked. + {fin, opcode(), binary()}. %% last fragment has been seen. + -record(state, { version :: 0 | 7 | 8 | 13, handler :: module(), @@ -56,7 +64,8 @@ messages = undefined :: undefined | {atom(), atom(), atom()}, hibernate = false :: boolean(), eop :: undefined | tuple(), %% hixie-76 specific. - origin = undefined :: undefined | binary() %% hixie-76 specific. + origin = undefined :: undefined | binary(), %% hixie-76 specific. + frag_state = undefined :: frag_state() }). %% @doc Upgrade a HTTP request to the WebSocket protocol. @@ -266,31 +275,94 @@ websocket_data(State=#state{version=0, eop=EOP}, Req, HandlerState, websocket_data(State=#state{version=Version}, Req, HandlerState, Data) when Version =/= 0, byte_size(Data) =:= 1 -> handler_before_loop(State, Req, HandlerState, Data); -%% hybi data frame. -%% @todo Handle Fin. -websocket_data(State=#state{version=Version}, Req, HandlerState, Data) - when Version =/= 0 -> - << 1:1, 0:3, Opcode:4, Mask:1, PayloadLen:7, Rest/bits >> = Data, - case {PayloadLen, Rest} of - {126, _} when Opcode >= 8 -> websocket_close( - State, Req, HandlerState, {error, protocol}); - {127, _} when Opcode >= 8 -> websocket_close( - State, Req, HandlerState, {error, protocol}); - {126, << L:16, R/bits >>} -> websocket_before_unmask( - State, Req, HandlerState, Data, R, Opcode, Mask, L); - {126, Rest} -> websocket_before_unmask( - State, Req, HandlerState, Data, Rest, Opcode, Mask, undefined); - {127, << 0:1, L:63, R/bits >>} -> websocket_before_unmask( - State, Req, HandlerState, Data, R, Opcode, Mask, L); - {127, Rest} -> websocket_before_unmask( - State, Req, HandlerState, Data, Rest, Opcode, Mask, undefined); - {PayloadLen, Rest} -> websocket_before_unmask( - State, Req, HandlerState, Data, Rest, Opcode, Mask, PayloadLen) - end; -%% Something was wrong with the frame. Close the connection. -websocket_data(State, Req, HandlerState, _Bad) -> +%% 7 bit payload length prefix exists +websocket_data(State, Req, HandlerState, + << Fin:1, Rsv:3, Opcode:4, Mask:1, PayloadLen:7, Rest/bits >> + = Data) when PayloadLen < 126 -> + websocket_data(State, Req, HandlerState, + Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data); +%% 7+16 bits payload length prefix exists +websocket_data(State, Req, HandlerState, + << Fin:1, Rsv:3, Opcode:4, Mask:1, 126:7, PayloadLen:16, Rest/bits >> + = Data) when PayloadLen > 125 -> + websocket_data(State, Req, HandlerState, + Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data); +%% 7+16 bits payload length prefix missing +websocket_data(State, Req, HandlerState, + << _Fin:1, _Rsv:3, _Opcode:4, _Mask:1, 126:7, Rest/bits >> + = Data) when byte_size(Rest) < 2 -> + handler_before_loop(State, Req, HandlerState, Data); +%% 7+64 bits payload length prefix exists +websocket_data(State, Req, HandlerState, + << Fin:1, Rsv:3, Opcode:4, Mask:1, 127:7, 0:1, PayloadLen:63, + Rest/bits >> = Data) when PayloadLen > 16#FFFF -> + websocket_data(State, Req, HandlerState, + Fin, Rsv, Opcode, Mask, PayloadLen, Rest, Data); +%% 7+64 bits payload length prefix missing +websocket_data(State, Req, HandlerState, + << _Fin:1, _Rsv:3, _Opcode:4, _Mask:1, 127:7, Rest/bits >> + = Data) when byte_size(Rest) < 8 -> + handler_before_loop(State, Req, HandlerState, Data); +%% invalid payload length prefix. +websocket_data(State, Req, HandlerState, _Data) -> websocket_close(State, Req, HandlerState, {error, badframe}). + +-spec websocket_data(#state{}, #http_req{}, any(), non_neg_integer(), + non_neg_integer(), non_neg_integer(), non_neg_integer(), + non_neg_integer(), binary(), binary()) -> closed. +%% A fragmented message MUST start a non-zero opcode. +websocket_data(State=#state{frag_state=undefined}, Req, HandlerState, + _Fin=0, _Rsv=0, _Opcode=0, _Mask, _PayloadLen, _Rest, _Buffer) -> + websocket_close(State, Req, HandlerState, {error, badframe}); +%% A control message MUST NOT be fragmented. +websocket_data(State, Req, HandlerState, _Fin=0, _Rsv=0, Opcode, _Mask, + _PayloadLen, _Rest, _Buffer) when Opcode >= 8 -> + websocket_close(State, Req, HandlerState, {error, badframe}); +%% The opcode is only included in the first message fragment. +websocket_data(State=#state{frag_state=undefined}, Req, HandlerState, + _Fin=0, _Rsv=0, Opcode, Mask, PayloadLen, Rest, Data) -> + websocket_before_unmask( + State#state{frag_state={nofin, Opcode}}, Req, HandlerState, + Data, Rest, 0, Mask, PayloadLen); +%% non-control opcode when expecting control message or next fragment. +websocket_data(State=#state{frag_state={nofin, _, _}}, Req, HandlerState, _Fin, + _Rsv=0, Opcode, _Mask, _Ln, _Rest, _Data) when Opcode > 0, Opcode < 8 -> + websocket_close(State, Req, HandlerState, {error, badframe}); +%% If the first message fragment was incomplete, retry unmasking. +websocket_data(State=#state{frag_state={nofin, Opcode}}, Req, HandlerState, + _Fin=0, _Rsv=0, Opcode, Mask, PayloadLen, Rest, Data) -> + websocket_before_unmask( + State#state{frag_state={nofin, Opcode}}, Req, HandlerState, + Data, Rest, 0, Mask, PayloadLen); +%% if the opcode is zero and the fin flag is zero, unmask and await next. +websocket_data(State=#state{frag_state={nofin, _Opcode, _Payloads}}, Req, + HandlerState, _Fin=0, _Rsv=0, _Opcode2=0, Mask, PayloadLen, Rest, + Data) -> + websocket_before_unmask( + State, Req, HandlerState, Data, Rest, 0, Mask, PayloadLen); +%% when the last fragment is seen. Update the fragmentation status. +websocket_data(State=#state{frag_state={nofin, Opcode, Payloads}}, Req, + HandlerState, _Fin=1, _Rsv=0, _Opcode=0, Mask, PayloadLen, Rest, + Data) -> + websocket_before_unmask( + State#state{frag_state={fin, Opcode, Payloads}}, + Req, HandlerState, Data, Rest, 0, Mask, PayloadLen); +%% control messages MUST NOT use 7+16 bits or 7+64 bits payload length prefixes +websocket_data(State, Req, HandlerState, _Fin, _Rsv, Opcode, _Mask, PayloadLen, + _Rest, _Data) when Opcode >= 8, PayloadLen > 125 -> + websocket_close(State, Req, HandlerState, {error, protocol}); +%% unfragmented message. unmask and dispatch the message. +websocket_data(State=#state{version=Version}, Req, HandlerState, _Fin=1, _Rsv=0, + Opcode, Mask, PayloadLen, Rest, Data) when Version =/= 0 -> + websocket_before_unmask( + State, Req, HandlerState, Data, Rest, Opcode, Mask, PayloadLen); +%% Something was wrong with the frame. Close the connection. +websocket_data(State, Req, HandlerState, _Fin, _Rsv, _Opcode, _Mask, + _PayloadLen, _Rest, _Data) -> + websocket_close(State, Req, HandlerState, {error, badframe}). + + %% hybi routing depending on whether unmasking is needed. -spec websocket_before_unmask(#state{}, #http_req{}, any(), binary(), binary(), opcode(), 0 | 1, non_neg_integer() | undefined) -> closed. @@ -349,8 +421,22 @@ websocket_unmask(State, Req, HandlerState, RemainingData, %% hybi dispatching. -spec websocket_dispatch(#state{}, #http_req{}, any(), binary(), opcode(), binary()) -> closed. -%% @todo Fragmentation. -%~ websocket_dispatch(State, Req, HandlerState, RemainingData, 0, Payload) -> +%% First frame of a fragmented message unmasked. Expect intermediate or last. +websocket_dispatch(State=#state{frag_state={nofin, Opcode}}, Req, HandlerState, + RemainingData, 0, Payload) -> + websocket_data(State#state{frag_state={nofin, Opcode, Payload}}, + Req, HandlerState, RemainingData); +%% Intermediate frame of a fragmented message unmasked. Add payload to buffer. +websocket_dispatch(State=#state{frag_state={nofin, Opcode, Payloads}}, Req, + HandlerState, RemainingData, 0, Payload) -> + websocket_data(State#state{frag_state={nofin, Opcode, + <>}}, Req, HandlerState, + RemainingData); +%% Last frame of a fragmented message unmasked. Dispatch to handler. +websocket_dispatch(State=#state{frag_state={fin, Opcode, Payloads}}, Req, + HandlerState, RemainingData, 0, Payload) -> + websocket_dispatch(State#state{frag_state=undefined}, Req, HandlerState, + RemainingData, Opcode, <>); %% Text frame. websocket_dispatch(State, Req, HandlerState, RemainingData, 1, Payload) -> handler_call(State, Req, HandlerState, RemainingData, diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 136833f..0c5e2c8 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -19,7 +19,7 @@ -export([all/0, groups/0, init_per_suite/1, end_per_suite/1, init_per_group/2, end_per_group/2]). %% ct. -export([ws0/1, ws8/1, ws8_single_bytes/1, ws8_init_shutdown/1, - ws13/1, ws_timeout_hibernate/1]). %% ws. + ws13/1, ws_timeout_hibernate/1, ws_text_fragments/1]). %% ws. %% ct. @@ -28,7 +28,7 @@ all() -> groups() -> BaseTests = [ws0, ws8, ws8_single_bytes, ws8_init_shutdown, ws13, - ws_timeout_hibernate], + ws_timeout_hibernate, ws_text_fragments], [{ws, [], BaseTests}]. init_per_suite(Config) -> @@ -60,7 +60,8 @@ init_dispatch() -> {[<<"localhost">>], [ {[<<"websocket">>], websocket_handler, []}, {[<<"ws_timeout_hibernate">>], ws_timeout_hibernate_handler, []}, - {[<<"ws_init_shutdown">>], websocket_handler_init_shutdown, []} + {[<<"ws_init_shutdown">>], websocket_handler_init_shutdown, []}, + {[<<"ws_echo_handler">>], websocket_echo_handler, []} ]} ]. @@ -310,6 +311,62 @@ ws13(Config) -> {error, closed} = gen_tcp:recv(Socket, 0, 6000), ok. +ws_text_fragments(Config) -> + {port, Port} = lists:keyfind(port, 1, Config), + {ok, Socket} = gen_tcp:connect("localhost", Port, + [binary, {active, false}, {packet, raw}]), + ok = gen_tcp:send(Socket, [ + "GET /ws_echo_handler HTTP/1.1\r\n" + "Host: localhost\r\n" + "Connection: Upgrade\r\n" + "Upgrade: websocket\r\n" + "Sec-WebSocket-Origin: http://localhost\r\n" + "Sec-WebSocket-Version: 8\r\n" + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n" + "\r\n"]), + {ok, Handshake} = gen_tcp:recv(Socket, 0, 6000), + {ok, {http_response, {1, 1}, 101, "Switching Protocols"}, Rest} + = erlang:decode_packet(http, Handshake, []), + [Headers, <<>>] = websocket_headers( + erlang:decode_packet(httph, Rest, []), []), + {'Connection', "Upgrade"} = lists:keyfind('Connection', 1, Headers), + {'Upgrade', "websocket"} = lists:keyfind('Upgrade', 1, Headers), + {"sec-websocket-accept", "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="} + = lists:keyfind("sec-websocket-accept", 1, Headers), + + ok = gen_tcp:send(Socket, [ + << 0:1, 0:3, 1:4, 1:1, 5:7 >>, + << 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>, + << 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]), + ok = gen_tcp:send(Socket, [ + << 1:1, 0:3, 0:4, 1:1, 5:7 >>, + << 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>, + << 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]), + {ok, << 1:1, 0:3, 1:4, 0:1, 10:7, "HelloHello" >>} + = gen_tcp:recv(Socket, 0, 6000), + + ok = gen_tcp:send(Socket, [ + %% #1 + << 0:1, 0:3, 1:4, 1:1, 5:7 >>, + << 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>, + << 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>, + %% #2 + << 0:1, 0:3, 0:4, 1:1, 5:7 >>, + << 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>, + << 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>, + %% #3 + << 1:1, 0:3, 0:4, 1:1, 5:7 >>, + << 16#37 >>, << 16#fa >>, << 16#21 >>, << 16#3d >>, << 16#7f >>, + << 16#9f >>, << 16#4d >>, << 16#51 >>, << 16#58 >>]), + {ok, << 1:1, 0:3, 1:4, 0:1, 15:7, "HelloHelloHello" >>} + = gen_tcp:recv(Socket, 0, 6000), + + ok = gen_tcp:send(Socket, << 1:1, 0:3, 8:4, 0:8 >>), %% close + {ok, << 1:1, 0:3, 8:4, 0:8 >>} = gen_tcp:recv(Socket, 0, 6000), + {error, closed} = gen_tcp:recv(Socket, 0, 6000), + ok. + + websocket_headers({ok, http_eoh, Rest}, Acc) -> [Acc, Rest]; websocket_headers({ok, {http_header, _I, Key, _R, Value}, Rest}, Acc) -> -- cgit v1.2.3 From 80c67bff33939a14e445ee93e37e8db5778fdfdc Mon Sep 17 00:00:00 2001 From: Magnus Klaar Date: Fri, 6 Apr 2012 17:53:53 +0200 Subject: Update autobahn suite to use autobahntestsuite --- test/autobahn_SUITE_data/test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/autobahn_SUITE_data/test.py b/test/autobahn_SUITE_data/test.py index 422cb41..c528c64 100755 --- a/test/autobahn_SUITE_data/test.py +++ b/test/autobahn_SUITE_data/test.py @@ -29,7 +29,7 @@ def install_env(env): subprocess.check_call(["curl", "-sS", VIRTUALENV_URL, "-o", VIRTUALENV_BIN]) subprocess.check_call(["python", VIRTUALENV_BIN, env]) activate_env(env) - subprocess.check_call([PIP_BIN, "install", "Autobahn"]) + subprocess.check_call([PIP_BIN, "install", "AutobahnTestSuite"]) def client_config(): """ @@ -54,7 +54,7 @@ def run_test(env, config): activate_env(env) from twisted.python import log from twisted.internet import reactor - from autobahn.fuzzing import FuzzingClientFactory + from autobahntestsuite.fuzzing import FuzzingClientFactory os.chdir(AB_TESTS_PRIV) log.startLogging(sys.stdout) fuzzer = FuzzingClientFactory(config) -- cgit v1.2.3