From 8404b1c908ac890925496ce839e5b2b2b407a6f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 11 Sep 2018 14:33:58 +0200 Subject: Add a commands-based interface to Websocket handlers This feature is currently experimental. It will become the preferred way to use Websocket handlers once it becomes documented. A commands-based interface enables adding commands without having to change the interface much. It mirrors the interface of stream handlers or gen_statem. It will enable adding commands that have been needed for some time but were not implemented for fear of making the interface too complex. --- src/cowboy_websocket.erl | 46 +++++++- test/handlers/ws_handle_commands_h.erl | 31 +++++ test/handlers/ws_info_commands_h.erl | 32 +++++ test/handlers/ws_init_commands_h.erl | 30 +++++ test/ws_handler_SUITE.erl | 207 +++++++++++++++++++++++++++++++++ 5 files changed, 342 insertions(+), 4 deletions(-) create mode 100644 test/handlers/ws_handle_commands_h.erl create mode 100644 test/handlers/ws_info_commands_h.erl create mode 100644 test/handlers/ws_init_commands_h.erl create mode 100644 test/ws_handler_SUITE.erl diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 067019c..2aa56b9 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -31,7 +31,12 @@ -export([system_terminate/4]). -export([system_code_change/4]). --type call_result(State) :: {ok, State} +-type commands() :: [cow_ws:frame()]. +-export_type([commands/0]). + +-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}. + +-type deprecated_call_result(State) :: {ok, State} | {ok, State, hibernate} | {reply, cow_ws:frame() | [cow_ws:frame()], State} | {reply, cow_ws:frame() | [cow_ws:frame()], State, hibernate} @@ -48,13 +53,13 @@ when Req::cowboy_req:req(). -callback websocket_init(State) - -> call_result(State) when State::any(). + -> call_result(State) | deprecated_call_result(State) when State::any(). -optional_callbacks([websocket_init/1]). -callback websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State) - -> call_result(State) when State::any(). + -> call_result(State) | deprecated_call_result(State) when State::any(). -callback websocket_info(any(), State) - -> call_result(State) when State::any(). + -> call_result(State) | deprecated_call_result(State) when State::any(). -callback terminate(any(), cowboy_req:req(), any()) -> ok. -optional_callbacks([terminate/3]). @@ -457,6 +462,13 @@ handler_call(State=#state{handler=Handler}, HandlerState, websocket_init -> Handler:websocket_init(HandlerState); _ -> Handler:Callback(Message, HandlerState) end of + {Commands, HandlerState2} when is_list(Commands) -> + handler_call_result(State, + HandlerState2, ParseState, NextState, Commands); + {Commands, HandlerState2, hibernate} when is_list(Commands) -> + handler_call_result(State#state{hibernate=true}, + HandlerState2, ParseState, NextState, Commands); + %% The following call results are deprecated. {ok, HandlerState2} -> NextState(State, HandlerState2, ParseState); {ok, HandlerState2, hibernate} -> @@ -488,6 +500,32 @@ handler_call(State=#state{handler=Handler}, HandlerState, erlang:raise(Class, Reason, erlang:get_stacktrace()) end. +-spec handler_call_result(#state{}, any(), parse_state(), fun(), commands()) -> no_return(). +handler_call_result(State0, HandlerState, ParseState, NextState, Commands) -> + case commands(Commands, State0, []) of + {ok, State} -> + NextState(State, HandlerState, ParseState); + {stop, State} -> + terminate(State, HandlerState, stop); + {Error = {error, _}, State} -> + terminate(State, HandlerState, Error) + end. + +commands([], State, []) -> + {ok, State}; +commands([], State, Data) -> + Result = transport_send(State, nofin, lists:reverse(Data)), + {Result, State}; +commands([Frame|Tail], State=#state{extensions=Extensions}, Data0) -> + Data = [cow_ws:frame(Frame, Extensions)|Data0], + case is_close_frame(Frame) of + true -> + _ = transport_send(State, fin, lists:reverse(Data)), + {stop, State}; + false -> + commands(Tail, State, Data) + end. + transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) -> Pid ! {Stream, {data, IsFin, Data}}, ok; diff --git a/test/handlers/ws_handle_commands_h.erl b/test/handlers/ws_handle_commands_h.erl new file mode 100644 index 0000000..da3ffad --- /dev/null +++ b/test/handlers/ws_handle_commands_h.erl @@ -0,0 +1,31 @@ +%% This module takes commands from the x-commands header +%% and returns them in the websocket_handle/2 callback. + +-module(ws_handle_commands_h). +-behavior(cowboy_websocket). + +-export([init/2]). +-export([websocket_init/1]). +-export([websocket_handle/2]). +-export([websocket_info/2]). + +init(Req=#{pid := Pid}, RunOrHibernate) -> + Commands0 = cowboy_req:header(<<"x-commands">>, Req), + Commands = binary_to_term(base64:decode(Commands0)), + case Commands of + bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> ok + end, + {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + +websocket_init(State) -> + {[], State}. + +websocket_handle(_, State={Commands, run}) -> + {Commands, State}; +websocket_handle(_, State={Commands, hibernate}) -> + {Commands, State, hibernate}. + +websocket_info(_, State) -> + {[], State}. + diff --git a/test/handlers/ws_info_commands_h.erl b/test/handlers/ws_info_commands_h.erl new file mode 100644 index 0000000..d596473 --- /dev/null +++ b/test/handlers/ws_info_commands_h.erl @@ -0,0 +1,32 @@ +%% This module takes commands from the x-commands header +%% and returns them in the websocket_info/2 callback. +%% This callback is triggered via a message. + +-module(ws_info_commands_h). +-behavior(cowboy_websocket). + +-export([init/2]). +-export([websocket_init/1]). +-export([websocket_handle/2]). +-export([websocket_info/2]). + +init(Req=#{pid := Pid}, RunOrHibernate) -> + Commands0 = cowboy_req:header(<<"x-commands">>, Req), + Commands = binary_to_term(base64:decode(Commands0)), + case Commands of + bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> ok + end, + {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + +websocket_init(State) -> + self() ! shoot, + {[], State}. + +websocket_handle(_, State) -> + {[], State}. + +websocket_info(_, State={Commands, run}) -> + {Commands, State}; +websocket_info(_, State={Commands, hibernate}) -> + {Commands, State, hibernate}. diff --git a/test/handlers/ws_init_commands_h.erl b/test/handlers/ws_init_commands_h.erl new file mode 100644 index 0000000..8bae352 --- /dev/null +++ b/test/handlers/ws_init_commands_h.erl @@ -0,0 +1,30 @@ +%% This module takes commands from the x-commands header +%% and returns them in the websocket_init/1 callback. + +-module(ws_init_commands_h). +-behavior(cowboy_websocket). + +-export([init/2]). +-export([websocket_init/1]). +-export([websocket_handle/2]). +-export([websocket_info/2]). + +init(Req=#{pid := Pid}, RunOrHibernate) -> + Commands0 = cowboy_req:header(<<"x-commands">>, Req), + Commands = binary_to_term(base64:decode(Commands0)), + case Commands of + bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6); + _ -> ok + end, + {cowboy_websocket, Req, {Commands, RunOrHibernate}}. + +websocket_init(State={Commands, run}) -> + {Commands, State}; +websocket_init(State={Commands, hibernate}) -> + {Commands, State, hibernate}. + +websocket_handle(_, State) -> + {[], State}. + +websocket_info(_, State) -> + {[], State}. diff --git a/test/ws_handler_SUITE.erl b/test/ws_handler_SUITE.erl new file mode 100644 index 0000000..4848847 --- /dev/null +++ b/test/ws_handler_SUITE.erl @@ -0,0 +1,207 @@ +%% Copyright (c) 2018, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(ws_handler_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-import(ct_helper, [config/2]). +-import(ct_helper, [doc/1]). +-import(cowboy_test, [gun_open/1]). +-import(cowboy_test, [gun_down/1]). + +%% ct. + +all() -> + [{group, ws}, {group, ws_hibernate}]. + +groups() -> + AllTests = ct_helper:all(?MODULE), + [{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}]. + +init_per_group(Name, Config) -> + cowboy_test:init_http(Name, #{ + env => #{dispatch => init_dispatch(Name)} + }, Config). + +end_per_group(Name, _) -> + cowboy:stop_listener(Name). + +%% Dispatch configuration. + +init_dispatch(Name) -> + RunOrHibernate = case Name of + ws -> run; + ws_hibernate -> hibernate + end, + cowboy_router:compile([{'_', [ + {"/init", ws_init_commands_h, RunOrHibernate}, + {"/handle", ws_handle_commands_h, RunOrHibernate}, + {"/info", ws_info_commands_h, RunOrHibernate} + ]}]). + +%% Support functions for testing using Gun. + +gun_open_ws(Config, Path, Commands) -> + ConnPid = gun_open(Config), + StreamRef = gun:ws_upgrade(ConnPid, Path, [ + {<<"x-commands">>, base64:encode(term_to_binary(Commands))} + ]), + receive + {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} -> + {ok, ConnPid, StreamRef}; + {gun_response, ConnPid, _, _, Status, Headers} -> + exit({ws_upgrade_failed, Status, Headers}); + {gun_error, ConnPid, StreamRef, Reason} -> + exit({ws_upgrade_failed, Reason}) + after 1000 -> + error(timeout) + end. + +receive_ws(ConnPid, StreamRef) -> + receive + {gun_ws, ConnPid, StreamRef, Frame} -> + {ok, Frame} + after 1000 -> + {error, timeout} + end. + +ensure_handle_is_called(ConnPid, "/handle") -> + gun:ws_send(ConnPid, {text, <<"Necessary to trigger websocket_handle/2.">>}); +ensure_handle_is_called(_, _) -> + ok. + +%% Tests. + +websocket_init_nothing(Config) -> + doc("Nothing happens when websocket_init/1 returns no commands."), + do_nothing(Config, "/init"). + +websocket_handle_nothing(Config) -> + doc("Nothing happens when websocket_handle/2 returns no commands."), + do_nothing(Config, "/handle"). + +websocket_info_nothing(Config) -> + doc("Nothing happens when websocket_info/2 returns no commands."), + do_nothing(Config, "/info"). + +do_nothing(Config, Path) -> + {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, []), + ensure_handle_is_called(ConnPid, Path), + {error, timeout} = receive_ws(ConnPid, StreamRef), + ok. + +websocket_init_invalid(Config) -> + doc("The connection must be closed when websocket_init/1 returns an invalid command."), + do_invalid(Config, "/init"). + +websocket_handle_invalid(Config) -> + doc("The connection must be closed when websocket_handle/2 returns an invalid command."), + do_invalid(Config, "/init"). + +websocket_info_invalid(Config) -> + doc("The connection must be closed when websocket_info/2 returns an invalid command."), + do_invalid(Config, "/info"). + +do_invalid(Config, Path) -> + {ok, ConnPid, _} = gun_open_ws(Config, Path, bad), + ensure_handle_is_called(ConnPid, Path), + gun_down(ConnPid). + +websocket_init_one_frame(Config) -> + doc("A single frame is received when websocket_init/1 returns it as a command."), + do_one_frame(Config, "/init"). + +websocket_handle_one_frame(Config) -> + doc("A single frame is received when websocket_handle/2 returns it as a command."), + do_one_frame(Config, "/handle"). + +websocket_info_one_frame(Config) -> + doc("A single frame is received when websocket_info/2 returns it as a command."), + do_one_frame(Config, "/info"). + +do_one_frame(Config, Path) -> + {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [ + {text, <<"One frame!">>} + ]), + ensure_handle_is_called(ConnPid, Path), + {ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef), + ok. + +websocket_init_many_frames(Config) -> + doc("Multiple frames are received when websocket_init/1 returns them as commands."), + do_many_frames(Config, "/init"). + +websocket_handle_many_frames(Config) -> + doc("Multiple frames are received when websocket_handle/2 returns them as commands."), + do_many_frames(Config, "/handle"). + +websocket_info_many_frames(Config) -> + doc("Multiple frames are received when websocket_info/2 returns them as commands."), + do_many_frames(Config, "/info"). + +do_many_frames(Config, Path) -> + {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [ + {text, <<"One frame!">>}, + {binary, <<"Two frames!">>} + ]), + ensure_handle_is_called(ConnPid, Path), + {ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef), + {ok, {binary, <<"Two frames!">>}} = receive_ws(ConnPid, StreamRef), + ok. + +websocket_init_close_frame(Config) -> + doc("A single close frame is received when websocket_init/1 returns it as a command."), + do_close_frame(Config, "/init"). + +websocket_handle_close_frame(Config) -> + doc("A single close frame is received when websocket_handle/2 returns it as a command."), + do_close_frame(Config, "/handle"). + +websocket_info_close_frame(Config) -> + doc("A single close frame is received when websocket_info/2 returns it as a command."), + do_close_frame(Config, "/info"). + +do_close_frame(Config, Path) -> + {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [close]), + ensure_handle_is_called(ConnPid, Path), + {ok, close} = receive_ws(ConnPid, StreamRef), + gun_down(ConnPid). + +websocket_init_many_frames_then_close_frame(Config) -> + doc("Multiple frames are received followed by a close frame " + "when websocket_init/1 returns them as commands."), + do_many_frames_then_close_frame(Config, "/init"). + +websocket_handle_many_frames_then_close_frame(Config) -> + doc("Multiple frames are received followed by a close frame " + "when websocket_handle/2 returns them as commands."), + do_many_frames_then_close_frame(Config, "/handle"). + +websocket_info_many_frames_then_close_frame(Config) -> + doc("Multiple frames are received followed by a close frame " + "when websocket_info/2 returns them as commands."), + do_many_frames_then_close_frame(Config, "/info"). + +do_many_frames_then_close_frame(Config, Path) -> + {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [ + {text, <<"One frame!">>}, + {binary, <<"Two frames!">>}, + close + ]), + ensure_handle_is_called(ConnPid, Path), + {ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef), + {ok, {binary, <<"Two frames!">>}} = receive_ws(ConnPid, StreamRef), + {ok, close} = receive_ws(ConnPid, StreamRef), + gun_down(ConnPid). -- cgit v1.2.3