diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy_websocket.erl | 68 |
1 files changed, 35 insertions, 33 deletions
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 913c116..e1cb2d4 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -66,6 +66,7 @@ -type opts() :: #{ compress => boolean(), + deflate_opts => cow_ws:deflate_opts(), idle_timeout => timeout(), max_frame_size => non_neg_integer() | infinity, req_filter => fun((cowboy_req:req()) -> map()) @@ -77,13 +78,11 @@ ref :: ranch:ref(), socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined, transport = undefined :: module() | undefined, + opts = #{} :: opts(), active = true :: boolean(), handler :: module(), key = undefined :: undefined | binary(), - timeout = infinity :: timeout(), timeout_ref = undefined :: undefined | reference(), - compress = false :: boolean(), - max_frame_size :: non_neg_integer() | infinity, messages = undefined :: undefined | {atom(), atom(), atom()}, hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), @@ -125,15 +124,11 @@ upgrade(Req, Env, Handler, HandlerState) -> when Req::cowboy_req:req(), Env::cowboy_middleware:env(). %% @todo Immediately crash if a response has already been sent. upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) -> - Timeout = maps:get(idle_timeout, Opts, 60000), - MaxFrameSize = maps:get(max_frame_size, Opts, infinity), - Compress = maps:get(compress, Opts, false), FilteredReq = case maps:get(req_filter, Opts, undefined) of undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0); FilterFun -> FilterFun(Req0) end, - State0 = #state{handler=Handler, timeout=Timeout, compress=Compress, - max_frame_size=MaxFrameSize, req=FilteredReq}, + State0 = #state{opts=Opts, handler=Handler, req=FilteredReq}, try websocket_upgrade(State0, Req0) of {ok, State, Req} -> websocket_handshake(State, Req, HandlerState, Env); @@ -174,13 +169,14 @@ websocket_version(State, Req) -> end, websocket_extensions(State, Req#{websocket_version => WsVersion}). -websocket_extensions(State=#state{compress=Compress}, Req) -> +websocket_extensions(State=#state{opts=Opts}, Req) -> %% @todo We want different options for this. For example %% * compress everything auto %% * compress only text auto %% * compress only binary auto %% * compress nothing auto (but still enabled it) %% * disable compression + Compress = maps:get(compress, Opts, false), case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of {true, Extensions} when Extensions =/= undefined -> websocket_extensions(State, Req, Extensions, []); @@ -193,15 +189,15 @@ websocket_extensions(State, Req, [], []) -> websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) -> {ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)}; %% For HTTP/2 we ARE on the controlling process and do NOT want to update the owner. -websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, version := Version}, +websocket_extensions(State=#state{opts=Opts, extensions=Extensions}, + Req=#{pid := Pid, version := Version}, [{<<"permessage-deflate">>, Params}|Tail], RespHeader) -> - %% @todo Make deflate options configurable. - Opts0 = #{level => best_compression, mem_level => 8, strategy => default}, - Opts = case Version of - 'HTTP/1.1' -> Opts0#{owner => Pid}; - _ -> Opts0 + DeflateOpts0 = maps:get(deflate_opts, Opts, #{}), + DeflateOpts = case Version of + 'HTTP/1.1' -> DeflateOpts0#{owner => Pid}; + _ -> DeflateOpts0 end, - try cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of + try cow_ws:negotiate_permessage_deflate(Params, Extensions, DeflateOpts) of {ok, RespExt, Extensions2} -> websocket_extensions(State#state{extensions=Extensions2}, Req, Tail, [<<", ">>, RespExt|RespHeader]); @@ -210,15 +206,15 @@ websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, vers catch exit:{error, incompatible_zlib_version, _} -> websocket_extensions(State, Req, Tail, RespHeader) end; -websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid, version := Version}, +websocket_extensions(State=#state{opts=Opts, extensions=Extensions}, + Req=#{pid := Pid, version := Version}, [{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) -> - %% @todo Make deflate options configurable. - Opts0 = #{level => best_compression, mem_level => 8, strategy => default}, - Opts = case Version of - 'HTTP/1.1' -> Opts0#{owner => Pid}; - _ -> Opts0 + DeflateOpts0 = maps:get(deflate_opts, Opts, #{}), + DeflateOpts = case Version of + 'HTTP/1.1' -> DeflateOpts0#{owner => Pid}; + _ -> DeflateOpts0 end, - try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts) of + try cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, DeflateOpts) of {ok, RespExt, Extensions2} -> websocket_extensions(State#state{extensions=Extensions2}, Req, Tail, [<<", ">>, RespExt|RespHeader]); @@ -317,13 +313,18 @@ before_loop(State=#state{socket=Socket, transport=Transport}, loop(State, HandlerState, ParseState). -spec loop_timeout(#state{}) -> #state{}. -loop_timeout(State=#state{timeout=infinity}) -> - State#state{timeout_ref=undefined}; -loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) -> - _ = case PrevRef of undefined -> ignore; PrevRef -> - erlang:cancel_timer(PrevRef) end, - TRef = erlang:start_timer(Timeout, self(), ?MODULE), - State#state{timeout_ref=TRef}. +loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) -> + _ = case PrevRef of + undefined -> ignore; + PrevRef -> erlang:cancel_timer(PrevRef) + end, + case maps:get(idle_timeout, Opts, 60000) of + infinity -> + State#state{timeout_ref=undefined}; + Timeout -> + TRef = erlang:start_timer(Timeout, self(), ?MODULE), + State#state{timeout_ref=TRef} + end. -spec loop(#state{}, any(), parse_state()) -> no_return(). loop(State=#state{parent=Parent, socket=Socket, messages=Messages, @@ -377,9 +378,9 @@ parse(State, HandlerState, PS=#ps_payload{buffer=Buffer}, Data) -> parse_payload(State, HandlerState, PS#ps_payload{buffer= <<>>}, <<Buffer/binary, Data/binary>>). -parse_header(State=#state{max_frame_size=MaxFrameSize, - frag_state=FragState, extensions=Extensions}, +parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions}, HandlerState, ParseState=#ps_header{buffer=Data}) -> + MaxFrameSize = maps:get(max_frame_size, Opts, infinity), case cow_ws:parse_header(Data, Extensions, FragState) of %% All frames sent from the client to the server are masked. {_, _, _, _, undefined, _} -> @@ -423,10 +424,11 @@ parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensio websocket_close(State, HandlerState, Error) end. -dispatch_frame(State=#state{max_frame_size=MaxFrameSize, frag_state=FragState, +dispatch_frame(State=#state{opts=Opts, frag_state=FragState, frag_buffer=SoFar, extensions=Extensions}, HandlerState, #ps_payload{type=Type0, unmasked=Payload0, close_code=CloseCode0}, RemainingData) -> + MaxFrameSize = maps:get(max_frame_size, Opts, infinity), case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of %% @todo Allow receiving fragments. {fragment, _, _, Payload} when byte_size(Payload) + byte_size(SoFar) > MaxFrameSize -> |