aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2018-09-11 14:33:58 +0200
committerLoïc Hoguin <[email protected]>2018-09-11 14:33:58 +0200
commit8404b1c908ac890925496ce839e5b2b2b407a6f7 (patch)
treeb717bdc70f5bcf3d355f00f01081a8f00ac5eee8
parent4b385749f2aab90b5c7e44e844159c0221a8790d (diff)
downloadcowboy-8404b1c908ac890925496ce839e5b2b2b407a6f7.tar.gz
cowboy-8404b1c908ac890925496ce839e5b2b2b407a6f7.tar.bz2
cowboy-8404b1c908ac890925496ce839e5b2b2b407a6f7.zip
Add a commands-based interface to Websocket handlers
This feature is currently experimental. It will become the preferred way to use Websocket handlers once it becomes documented. A commands-based interface enables adding commands without having to change the interface much. It mirrors the interface of stream handlers or gen_statem. It will enable adding commands that have been needed for some time but were not implemented for fear of making the interface too complex.
-rw-r--r--src/cowboy_websocket.erl46
-rw-r--r--test/handlers/ws_handle_commands_h.erl31
-rw-r--r--test/handlers/ws_info_commands_h.erl32
-rw-r--r--test/handlers/ws_init_commands_h.erl30
-rw-r--r--test/ws_handler_SUITE.erl207
5 files changed, 342 insertions, 4 deletions
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 067019c..2aa56b9 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -31,7 +31,12 @@
-export([system_terminate/4]).
-export([system_code_change/4]).
--type call_result(State) :: {ok, State}
+-type commands() :: [cow_ws:frame()].
+-export_type([commands/0]).
+
+-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}.
+
+-type deprecated_call_result(State) :: {ok, State}
| {ok, State, hibernate}
| {reply, cow_ws:frame() | [cow_ws:frame()], State}
| {reply, cow_ws:frame() | [cow_ws:frame()], State, hibernate}
@@ -48,13 +53,13 @@
when Req::cowboy_req:req().
-callback websocket_init(State)
- -> call_result(State) when State::any().
+ -> call_result(State) | deprecated_call_result(State) when State::any().
-optional_callbacks([websocket_init/1]).
-callback websocket_handle(ping | pong | {text | binary | ping | pong, binary()}, State)
- -> call_result(State) when State::any().
+ -> call_result(State) | deprecated_call_result(State) when State::any().
-callback websocket_info(any(), State)
- -> call_result(State) when State::any().
+ -> call_result(State) | deprecated_call_result(State) when State::any().
-callback terminate(any(), cowboy_req:req(), any()) -> ok.
-optional_callbacks([terminate/3]).
@@ -457,6 +462,13 @@ handler_call(State=#state{handler=Handler}, HandlerState,
websocket_init -> Handler:websocket_init(HandlerState);
_ -> Handler:Callback(Message, HandlerState)
end of
+ {Commands, HandlerState2} when is_list(Commands) ->
+ handler_call_result(State,
+ HandlerState2, ParseState, NextState, Commands);
+ {Commands, HandlerState2, hibernate} when is_list(Commands) ->
+ handler_call_result(State#state{hibernate=true},
+ HandlerState2, ParseState, NextState, Commands);
+ %% The following call results are deprecated.
{ok, HandlerState2} ->
NextState(State, HandlerState2, ParseState);
{ok, HandlerState2, hibernate} ->
@@ -488,6 +500,32 @@ handler_call(State=#state{handler=Handler}, HandlerState,
erlang:raise(Class, Reason, erlang:get_stacktrace())
end.
+-spec handler_call_result(#state{}, any(), parse_state(), fun(), commands()) -> no_return().
+handler_call_result(State0, HandlerState, ParseState, NextState, Commands) ->
+ case commands(Commands, State0, []) of
+ {ok, State} ->
+ NextState(State, HandlerState, ParseState);
+ {stop, State} ->
+ terminate(State, HandlerState, stop);
+ {Error = {error, _}, State} ->
+ terminate(State, HandlerState, Error)
+ end.
+
+commands([], State, []) ->
+ {ok, State};
+commands([], State, Data) ->
+ Result = transport_send(State, nofin, lists:reverse(Data)),
+ {Result, State};
+commands([Frame|Tail], State=#state{extensions=Extensions}, Data0) ->
+ Data = [cow_ws:frame(Frame, Extensions)|Data0],
+ case is_close_frame(Frame) of
+ true ->
+ _ = transport_send(State, fin, lists:reverse(Data)),
+ {stop, State};
+ false ->
+ commands(Tail, State, Data)
+ end.
+
transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
Pid ! {Stream, {data, IsFin, Data}},
ok;
diff --git a/test/handlers/ws_handle_commands_h.erl b/test/handlers/ws_handle_commands_h.erl
new file mode 100644
index 0000000..da3ffad
--- /dev/null
+++ b/test/handlers/ws_handle_commands_h.erl
@@ -0,0 +1,31 @@
+%% This module takes commands from the x-commands header
+%% and returns them in the websocket_handle/2 callback.
+
+-module(ws_handle_commands_h).
+-behavior(cowboy_websocket).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req=#{pid := Pid}, RunOrHibernate) ->
+ Commands0 = cowboy_req:header(<<"x-commands">>, Req),
+ Commands = binary_to_term(base64:decode(Commands0)),
+ case Commands of
+ bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ -> ok
+ end,
+ {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+
+websocket_init(State) ->
+ {[], State}.
+
+websocket_handle(_, State={Commands, run}) ->
+ {Commands, State};
+websocket_handle(_, State={Commands, hibernate}) ->
+ {Commands, State, hibernate}.
+
+websocket_info(_, State) ->
+ {[], State}.
+
diff --git a/test/handlers/ws_info_commands_h.erl b/test/handlers/ws_info_commands_h.erl
new file mode 100644
index 0000000..d596473
--- /dev/null
+++ b/test/handlers/ws_info_commands_h.erl
@@ -0,0 +1,32 @@
+%% This module takes commands from the x-commands header
+%% and returns them in the websocket_info/2 callback.
+%% This callback is triggered via a message.
+
+-module(ws_info_commands_h).
+-behavior(cowboy_websocket).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req=#{pid := Pid}, RunOrHibernate) ->
+ Commands0 = cowboy_req:header(<<"x-commands">>, Req),
+ Commands = binary_to_term(base64:decode(Commands0)),
+ case Commands of
+ bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ -> ok
+ end,
+ {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+
+websocket_init(State) ->
+ self() ! shoot,
+ {[], State}.
+
+websocket_handle(_, State) ->
+ {[], State}.
+
+websocket_info(_, State={Commands, run}) ->
+ {Commands, State};
+websocket_info(_, State={Commands, hibernate}) ->
+ {Commands, State, hibernate}.
diff --git a/test/handlers/ws_init_commands_h.erl b/test/handlers/ws_init_commands_h.erl
new file mode 100644
index 0000000..8bae352
--- /dev/null
+++ b/test/handlers/ws_init_commands_h.erl
@@ -0,0 +1,30 @@
+%% This module takes commands from the x-commands header
+%% and returns them in the websocket_init/1 callback.
+
+-module(ws_init_commands_h).
+-behavior(cowboy_websocket).
+
+-export([init/2]).
+-export([websocket_init/1]).
+-export([websocket_handle/2]).
+-export([websocket_info/2]).
+
+init(Req=#{pid := Pid}, RunOrHibernate) ->
+ Commands0 = cowboy_req:header(<<"x-commands">>, Req),
+ Commands = binary_to_term(base64:decode(Commands0)),
+ case Commands of
+ bad -> ct_helper_error_h:ignore(Pid, cowboy_websocket, handler_call, 6);
+ _ -> ok
+ end,
+ {cowboy_websocket, Req, {Commands, RunOrHibernate}}.
+
+websocket_init(State={Commands, run}) ->
+ {Commands, State};
+websocket_init(State={Commands, hibernate}) ->
+ {Commands, State, hibernate}.
+
+websocket_handle(_, State) ->
+ {[], State}.
+
+websocket_info(_, State) ->
+ {[], State}.
diff --git a/test/ws_handler_SUITE.erl b/test/ws_handler_SUITE.erl
new file mode 100644
index 0000000..4848847
--- /dev/null
+++ b/test/ws_handler_SUITE.erl
@@ -0,0 +1,207 @@
+%% Copyright (c) 2018, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(ws_handler_SUITE).
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-import(ct_helper, [config/2]).
+-import(ct_helper, [doc/1]).
+-import(cowboy_test, [gun_open/1]).
+-import(cowboy_test, [gun_down/1]).
+
+%% ct.
+
+all() ->
+ [{group, ws}, {group, ws_hibernate}].
+
+groups() ->
+ AllTests = ct_helper:all(?MODULE),
+ [{ws, [parallel], AllTests}, {ws_hibernate, [parallel], AllTests}].
+
+init_per_group(Name, Config) ->
+ cowboy_test:init_http(Name, #{
+ env => #{dispatch => init_dispatch(Name)}
+ }, Config).
+
+end_per_group(Name, _) ->
+ cowboy:stop_listener(Name).
+
+%% Dispatch configuration.
+
+init_dispatch(Name) ->
+ RunOrHibernate = case Name of
+ ws -> run;
+ ws_hibernate -> hibernate
+ end,
+ cowboy_router:compile([{'_', [
+ {"/init", ws_init_commands_h, RunOrHibernate},
+ {"/handle", ws_handle_commands_h, RunOrHibernate},
+ {"/info", ws_info_commands_h, RunOrHibernate}
+ ]}]).
+
+%% Support functions for testing using Gun.
+
+gun_open_ws(Config, Path, Commands) ->
+ ConnPid = gun_open(Config),
+ StreamRef = gun:ws_upgrade(ConnPid, Path, [
+ {<<"x-commands">>, base64:encode(term_to_binary(Commands))}
+ ]),
+ receive
+ {gun_upgrade, ConnPid, StreamRef, [<<"websocket">>], _} ->
+ {ok, ConnPid, StreamRef};
+ {gun_response, ConnPid, _, _, Status, Headers} ->
+ exit({ws_upgrade_failed, Status, Headers});
+ {gun_error, ConnPid, StreamRef, Reason} ->
+ exit({ws_upgrade_failed, Reason})
+ after 1000 ->
+ error(timeout)
+ end.
+
+receive_ws(ConnPid, StreamRef) ->
+ receive
+ {gun_ws, ConnPid, StreamRef, Frame} ->
+ {ok, Frame}
+ after 1000 ->
+ {error, timeout}
+ end.
+
+ensure_handle_is_called(ConnPid, "/handle") ->
+ gun:ws_send(ConnPid, {text, <<"Necessary to trigger websocket_handle/2.">>});
+ensure_handle_is_called(_, _) ->
+ ok.
+
+%% Tests.
+
+websocket_init_nothing(Config) ->
+ doc("Nothing happens when websocket_init/1 returns no commands."),
+ do_nothing(Config, "/init").
+
+websocket_handle_nothing(Config) ->
+ doc("Nothing happens when websocket_handle/2 returns no commands."),
+ do_nothing(Config, "/handle").
+
+websocket_info_nothing(Config) ->
+ doc("Nothing happens when websocket_info/2 returns no commands."),
+ do_nothing(Config, "/info").
+
+do_nothing(Config, Path) ->
+ {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, []),
+ ensure_handle_is_called(ConnPid, Path),
+ {error, timeout} = receive_ws(ConnPid, StreamRef),
+ ok.
+
+websocket_init_invalid(Config) ->
+ doc("The connection must be closed when websocket_init/1 returns an invalid command."),
+ do_invalid(Config, "/init").
+
+websocket_handle_invalid(Config) ->
+ doc("The connection must be closed when websocket_handle/2 returns an invalid command."),
+ do_invalid(Config, "/init").
+
+websocket_info_invalid(Config) ->
+ doc("The connection must be closed when websocket_info/2 returns an invalid command."),
+ do_invalid(Config, "/info").
+
+do_invalid(Config, Path) ->
+ {ok, ConnPid, _} = gun_open_ws(Config, Path, bad),
+ ensure_handle_is_called(ConnPid, Path),
+ gun_down(ConnPid).
+
+websocket_init_one_frame(Config) ->
+ doc("A single frame is received when websocket_init/1 returns it as a command."),
+ do_one_frame(Config, "/init").
+
+websocket_handle_one_frame(Config) ->
+ doc("A single frame is received when websocket_handle/2 returns it as a command."),
+ do_one_frame(Config, "/handle").
+
+websocket_info_one_frame(Config) ->
+ doc("A single frame is received when websocket_info/2 returns it as a command."),
+ do_one_frame(Config, "/info").
+
+do_one_frame(Config, Path) ->
+ {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [
+ {text, <<"One frame!">>}
+ ]),
+ ensure_handle_is_called(ConnPid, Path),
+ {ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef),
+ ok.
+
+websocket_init_many_frames(Config) ->
+ doc("Multiple frames are received when websocket_init/1 returns them as commands."),
+ do_many_frames(Config, "/init").
+
+websocket_handle_many_frames(Config) ->
+ doc("Multiple frames are received when websocket_handle/2 returns them as commands."),
+ do_many_frames(Config, "/handle").
+
+websocket_info_many_frames(Config) ->
+ doc("Multiple frames are received when websocket_info/2 returns them as commands."),
+ do_many_frames(Config, "/info").
+
+do_many_frames(Config, Path) ->
+ {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [
+ {text, <<"One frame!">>},
+ {binary, <<"Two frames!">>}
+ ]),
+ ensure_handle_is_called(ConnPid, Path),
+ {ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef),
+ {ok, {binary, <<"Two frames!">>}} = receive_ws(ConnPid, StreamRef),
+ ok.
+
+websocket_init_close_frame(Config) ->
+ doc("A single close frame is received when websocket_init/1 returns it as a command."),
+ do_close_frame(Config, "/init").
+
+websocket_handle_close_frame(Config) ->
+ doc("A single close frame is received when websocket_handle/2 returns it as a command."),
+ do_close_frame(Config, "/handle").
+
+websocket_info_close_frame(Config) ->
+ doc("A single close frame is received when websocket_info/2 returns it as a command."),
+ do_close_frame(Config, "/info").
+
+do_close_frame(Config, Path) ->
+ {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [close]),
+ ensure_handle_is_called(ConnPid, Path),
+ {ok, close} = receive_ws(ConnPid, StreamRef),
+ gun_down(ConnPid).
+
+websocket_init_many_frames_then_close_frame(Config) ->
+ doc("Multiple frames are received followed by a close frame "
+ "when websocket_init/1 returns them as commands."),
+ do_many_frames_then_close_frame(Config, "/init").
+
+websocket_handle_many_frames_then_close_frame(Config) ->
+ doc("Multiple frames are received followed by a close frame "
+ "when websocket_handle/2 returns them as commands."),
+ do_many_frames_then_close_frame(Config, "/handle").
+
+websocket_info_many_frames_then_close_frame(Config) ->
+ doc("Multiple frames are received followed by a close frame "
+ "when websocket_info/2 returns them as commands."),
+ do_many_frames_then_close_frame(Config, "/info").
+
+do_many_frames_then_close_frame(Config, Path) ->
+ {ok, ConnPid, StreamRef} = gun_open_ws(Config, Path, [
+ {text, <<"One frame!">>},
+ {binary, <<"Two frames!">>},
+ close
+ ]),
+ ensure_handle_is_called(ConnPid, Path),
+ {ok, {text, <<"One frame!">>}} = receive_ws(ConnPid, StreamRef),
+ {ok, {binary, <<"Two frames!">>}} = receive_ws(ConnPid, StreamRef),
+ {ok, close} = receive_ws(ConnPid, StreamRef),
+ gun_down(ConnPid).