aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_raw.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun_raw.erl')
-rw-r--r--src/gun_raw.erl48
1 files changed, 40 insertions, 8 deletions
diff --git a/src/gun_raw.erl b/src/gun_raw.erl
index 480d6bc..c5fbf45 100644
--- a/src/gun_raw.erl
+++ b/src/gun_raw.erl
@@ -20,21 +20,29 @@
-export([has_keepalive/0]).
-export([init/4]).
-export([handle/5]).
+-export([update_flow/4]).
-export([closing/4]).
-export([close/4]).
-export([data/7]).
-%% @todo down
+-export([down/1]).
-record(raw_state, {
ref :: undefined | gun:stream_ref(),
reply_to :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
- transport :: module()
+ transport :: module(),
+ flow :: integer() | infinity
}).
-%% @todo Reject ALL options.
-check_options(_) ->
- ok.
+check_options(Opts) ->
+ do_check_options(maps:to_list(Opts)).
+
+do_check_options([]) ->
+ ok;
+do_check_options([{flow, Flow}|Opts]) when is_integer(Flow); Flow == infinity ->
+ do_check_options(Opts);
+do_check_options([Opt|_]) ->
+ {error, {options, {raw, Opt}}}.
name() -> raw.
opts_name() -> raw_opts.
@@ -42,12 +50,32 @@ has_keepalive() -> false.
init(ReplyTo, Socket, Transport, Opts) ->
StreamRef = maps:get(stream_ref, Opts, undefined),
- {ok, connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo, socket=Socket, transport=Transport}}.
+ InitialFlow = maps:get(flow, Opts, infinity),
+ {ok, connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo,
+ socket=Socket, transport=Transport, flow=InitialFlow}}.
-handle(Data, #raw_state{ref=StreamRef, reply_to=ReplyTo}, CookieStore, _, EvHandlerState) ->
+handle(Data, State=#raw_state{ref=StreamRef, reply_to=ReplyTo, flow=Flow0},
+ CookieStore, _, EvHandlerState) ->
%% When we take over the entire connection there is no stream reference.
ReplyTo ! {gun_data, self(), StreamRef, nofin, Data},
- {[], CookieStore, EvHandlerState}.
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 - 1
+ end,
+ {[
+ {state, State#raw_state{flow=Flow}},
+ {active, Flow > 0}
+ ], CookieStore, EvHandlerState}.
+
+update_flow(State=#raw_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 + Inc
+ end,
+ [
+ {state, State#raw_state{flow=Flow}},
+ {active, Flow > 0}
+ ].
%% We can always close immediately.
closing(_, _, _, EvHandlerState) ->
@@ -63,3 +91,7 @@ data(#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef,
ok -> {[], EvHandlerState};
Error={error, _} -> {Error, EvHandlerState}
end.
+
+%% raw has no concept of streams.
+down(_) ->
+ [].