aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun_http.erl')
-rw-r--r--src/gun_http.erl137
1 files changed, 88 insertions, 49 deletions
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 68f9e7d..ec268ad 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -18,12 +18,13 @@
-export([name/0]).
-export([init/4]).
-export([handle/4]).
+-export([update_flow/4]).
-export([close/4]).
-export([keepalive/1]).
--export([headers/10]).
--export([request/11]).
+-export([headers/11]).
+-export([request/12]).
-export([data/7]).
--export([connect/5]).
+-export([connect/6]).
-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
@@ -43,6 +44,7 @@
-record(stream, {
ref :: reference() | connect_info() | websocket_info(),
reply_to :: pid(),
+ flow :: integer() | infinity,
method :: binary(),
is_alive :: boolean(),
handler_state :: undefined | gun_content_handler:state()
@@ -52,6 +54,7 @@
owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ opts = #{} :: map(), %% @todo
version = 'HTTP/1.1' :: cow_http:version(),
content_handlers :: gun_content_handler:opt(),
connection = keepalive :: keepalive | close,
@@ -73,6 +76,8 @@ do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
ok -> do_check_options(Opts);
error -> {error, {options, {http, Opt}}}
end;
+do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 ->
+ do_check_options(Opts);
do_check_options([{keepalive, infinity}|Opts]) ->
do_check_options(Opts);
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
@@ -87,10 +92,11 @@ do_check_options([Opt|_]) ->
name() -> http.
init(Owner, Socket, Transport, Opts) ->
+ %% @todo If we keep the opts we don't need to add these to the state.
Version = maps:get(version, Opts, 'HTTP/1.1'),
Handlers = maps:get(content_handlers, Opts, [gun_data_h]),
TransformHeaderName = maps:get(transform_header_name, Opts, fun (N) -> N end),
- #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version,
+ #http_state{owner=Owner, socket=Socket, transport=Transport, opts=Opts, version=Version,
content_handlers=Handlers, transform_header_name=TransformHeaderName}.
%% Stop looping when we got no more data.
@@ -120,7 +126,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer,
end;
%% Everything sent to the socket until it closes is part of the response body.
handle(Data, State=#http_state{in=body_close}, _, EvHandlerState) ->
- {{state, send_data_if_alive(Data, State, nofin)}, EvHandlerState};
+ {send_data(Data, State, nofin), EvHandlerState};
%% Chunked transfer-encoding may contain both data and trailers.
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
buffer=Buffer, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_],
@@ -130,21 +136,15 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
more ->
{{state, State#http_state{buffer=Buffer2}}, EvHandlerState0};
{more, Data2, InState2} ->
- {{state, send_data_if_alive(Data2,
- State#http_state{buffer= <<>>, in_state=InState2},
- nofin)}, EvHandlerState0};
+ {send_data(Data2, State#http_state{buffer= <<>>, in_state=InState2}, nofin), EvHandlerState0};
{more, Data2, Length, InState2} when is_integer(Length) ->
%% @todo See if we can recv faster than one message at a time.
- {{state, send_data_if_alive(Data2,
- State#http_state{buffer= <<>>, in_state=InState2},
- nofin)}, EvHandlerState0};
+ {send_data(Data2, State#http_state{buffer= <<>>, in_state=InState2}, nofin), EvHandlerState0};
{more, Data2, Rest, InState2} ->
%% @todo See if we can recv faster than one message at a time.
- {{state, send_data_if_alive(Data2,
- State#http_state{buffer=Rest, in_state=InState2},
- nofin)}, EvHandlerState0};
+ {send_data(Data2, State#http_state{buffer=Rest, in_state=InState2}, nofin), EvHandlerState0};
{done, HasTrailers, Rest} ->
- %% @todo response_end should be called AFTER send_data_if_alive
+ %% @todo response_end should be called AFTER send_data
{IsFin, EvHandlerState} = case HasTrailers of
trailers ->
{nofin, EvHandlerState0};
@@ -156,7 +156,8 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{fin, EvHandlerState1}
end,
%% I suppose it doesn't hurt to append an empty binary.
- State1 = send_data_if_alive(<<>>, State, IsFin),
+ %% We ignore the active command because the stream ended.
+ [{state, State1}|_] = send_data(<<>>, State, IsFin),
case {HasTrailers, Conn} of
{trailers, _} ->
handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState);
@@ -166,7 +167,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{[{state, end_stream(State1)}, close], EvHandlerState}
end;
{done, Data2, HasTrailers, Rest} ->
- %% @todo response_end should be called AFTER send_data_if_alive
+ %% @todo response_end should be called AFTER send_data
{IsFin, EvHandlerState} = case HasTrailers of
trailers ->
{nofin, EvHandlerState0};
@@ -177,7 +178,8 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
}, EvHandlerState0),
{fin, EvHandlerState1}
end,
- State1 = send_data_if_alive(Data2, State, IsFin),
+ %% We ignore the active command because the stream ended.
+ [{state, State1}|_] = send_data(Data2, State, IsFin),
case {HasTrailers, Conn} of
{trailers, _} ->
handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState);
@@ -218,24 +220,24 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn,
if
%% More data coming.
DataSize < Length ->
- {{state, send_data_if_alive(Data,
- State#http_state{in={body, Length - DataSize}},
- nofin)}, EvHandlerState0};
+ {send_data(Data, State#http_state{in={body, Length - DataSize}}, nofin), EvHandlerState0};
%% Stream finished, no rest.
DataSize =:= Length ->
- State1 = send_data_if_alive(Data, State, fin),
+ %% We ignore the active command because the stream ended.
+ [{state, State1}|_] = send_data(Data, State, fin),
EvHandlerState = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState0),
case Conn of
- keepalive -> {{state, end_stream(State1)}, EvHandlerState};
+ keepalive -> {[{state, end_stream(State1)}, {active, true}], EvHandlerState};
close -> {[{state, end_stream(State1)}, close], EvHandlerState}
end;
%% Stream finished, rest.
true ->
<< Body:Length/binary, Rest/bits >> = Data,
- State1 = send_data_if_alive(Body, State, fin),
+ %% We ignore the active command because the stream ended.
+ [{state, State1}|_] = send_data(Body, State, fin),
EvHandlerState = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
@@ -420,20 +422,48 @@ stream_ref({connect, StreamRef, _}) -> StreamRef;
stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.
-send_data_if_alive(<<>>, State, nofin) ->
- State;
+%% The state must be first in order to retrieve it when the stream ended.
+send_data(<<>>, State, nofin) ->
+ [{state, State}, {active, true}];
%% @todo What if we receive data when the HEAD method was used?
-send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{
- is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) ->
- Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
- State#http_state{streams=[Stream#stream{handler_state=Handlers}|Tail]};
-send_data_if_alive(_, State, _) ->
- State.
+send_data(Data, State=#http_state{streams=[Stream=#stream{
+ flow=Flow0, is_alive=true, handler_state=Handlers0}|Tail]}, IsFin) ->
+ {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 - Dec
+ end,
+ [
+ {state, State#http_state{streams=[Stream#stream{flow=Flow, handler_state=Handlers}|Tail]}},
+ {active, Flow > 0}
+ ];
+send_data(_, State, _) ->
+ [{state, State}, {active, true}].
+
+%% We only update the active state when the current stream is being updated.
+update_flow(State=#http_state{streams=[Stream=#stream{ref=StreamRef, flow=Flow0}|Tail]},
+ _ReplyTo, StreamRef, Inc) ->
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 + Inc
+ end,
+ [
+ {state, State#http_state{streams=[Stream#stream{flow=Flow}|Tail]}},
+ {active, Flow > 0}
+ ];
+update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) ->
+ Streams = [case Ref of
+ StreamRef when Flow =/= infinity ->
+ Tuple#stream{flow=Flow + Inc};
+ _ ->
+ Tuple
+ end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0],
+ {state, State#http_state{streams=Streams}}.
%% @todo Use Reason.
close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
EvHandler, EvHandlerState0) ->
- _ = send_data_if_alive(<<>>, State, fin),
+ _ = send_data(<<>>, State, fin),
EvHandlerState = EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
@@ -461,23 +491,29 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) ->
keepalive(State) ->
State.
-headers(State=#http_state{out=head},
+headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
- EvHandler, EvHandlerState0) ->
+ InitialFlow0, EvHandler, EvHandlerState0) ->
{Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, undefined,
EvHandler, EvHandlerState0, ?FUNCTION_NAME),
- {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method),
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, InitialFlow),
EvHandlerState}.
-request(State=#http_state{out=head}, StreamRef, ReplyTo,
- Method, Host, Port, Path, Headers, Body, EvHandler, EvHandlerState0) ->
+request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers, Body,
+ InitialFlow0, EvHandler, EvHandlerState0) ->
{Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
EvHandler, EvHandlerState0, ?FUNCTION_NAME),
- {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method),
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method, InitialFlow),
EvHandlerState}.
+initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
+initial_flow(InitialFlow, _) -> InitialFlow.
+
send_request(State=#http_state{socket=Socket, transport=Transport, version=Version},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body,
EvHandler, EvHandlerState0, Function) ->
@@ -602,12 +638,12 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
-connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] ->
+connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _) when Streams =/= [] ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
State;
-connect(State=#http_state{socket=Socket, transport=Transport, version=Version},
- StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) ->
+connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
+ StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0, InitialFlow0) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
_ -> Host0
@@ -634,7 +670,8 @@ connect(State=#http_state{socket=Socket, transport=Transport, version=Version},
Transport:send(Socket, [
cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
]),
- new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>).
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>, InitialFlow).
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
@@ -735,9 +772,9 @@ response_io_from_headers(_, Version, _Status, Headers) ->
%% Streams.
-new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) ->
+new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method, InitialFlow) ->
State#http_state{streams=Streams
- ++ [#stream{ref=StreamRef, reply_to=ReplyTo,
+ ++ [#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
method=iolist_to_binary(Method), is_alive=true}]}.
is_stream(#http_state{streams=Streams}, StreamRef) ->
@@ -787,8 +824,9 @@ ws_upgrade(State=#http_state{owner=ReplyTo, out=head},
{Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
<<"GET">>, Host, Port, Path, Headers, undefined,
EvHandler, EvHandlerState0, ?FUNCTION_NAME),
+ InitialFlow = maps:get(flow, WsOpts, infinity),
{new_stream(State#http_state{connection=Conn, out=Out},
- {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>),
+ {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>, InitialFlow),
EvHandlerState}.
ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
@@ -853,8 +891,9 @@ ws_handshake_protocols(Buffer, State, StreamRef, Headers, Extensions, Opts) ->
end
end.
-ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport},
- StreamRef, Headers, Extensions, Handler, Opts) ->
+%% We know that the most recent stream is the Websocket one.
+ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport,
+ streams=[#stream{flow=InitialFlow}|_]}, StreamRef, Headers, Extensions, Handler, Opts) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
<<>> ->
@@ -863,4 +902,4 @@ ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Trans
{OK, _, _} = Transport:messages(),
self() ! {OK, Socket, Buffer}
end,
- gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts).
+ gun_ws:init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Handler, Opts).