aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl9
-rw-r--r--src/gun_http.erl17
-rw-r--r--src/gun_http2.erl205
-rw-r--r--src/gun_http3.erl7
-rw-r--r--src/gun_tunnel.erl10
5 files changed, 126 insertions, 122 deletions
diff --git a/src/gun.erl b/src/gun.erl
index a7c0239..5a3c233 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -1401,13 +1401,14 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi
event_handler_state=EvHandlerState});
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
- protocol=Protocol, protocol_state=ProtoState,
+ protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {Commands, EvHandlerState} = Protocol:connect(ProtoState,
+ {Commands, CookieStore, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
- Headers, InitialFlow, EvHandler, EvHandlerState0),
- commands(Commands, State#state{event_handler_state=EvHandlerState});
+ Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{cookie_store=CookieStore,
+ event_handler_state=EvHandlerState});
%% Public Websocket interface.
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
WsOpts = maps:get(ws_opts, Opts, #{}),
diff --git a/src/gun_http.erl b/src/gun_http.erl
index d4304b0..98acbc1 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -29,7 +29,7 @@
-export([headers/12]).
-export([request/13]).
-export([data/7]).
--export([connect/9]).
+-export([connect/10]).
-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
@@ -759,19 +759,20 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{[], EvHandlerState0}
end.
-connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
+connect(State, StreamRef, ReplyTo, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}}),
- {[], EvHandlerState};
-connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
+ {[], CookieStore, EvHandlerState};
+connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _,
+ CookieStore, _, EvHandlerState)
when Streams =/= [] ->
gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}}),
- {[], EvHandlerState};
+ {[], CookieStore, EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
- EvHandler, EvHandlerState0) ->
+ CookieStore, EvHandler, EvHandlerState0) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
_ -> Host0
@@ -817,9 +818,9 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
InitialFlow = initial_flow(InitialFlow0, Opts),
{{state, new_stream(State, {connect, StreamRef, Destination},
ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)},
- EvHandlerState};
+ CookieStore, EvHandlerState};
Error={error, _} ->
- {Error, EvHandlerState1}
+ {Error, CookieStore, EvHandlerState1}
end.
%% We can't cancel anything, we can just stop forwarding messages to the owner.
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index d8853e7..15010f5 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -30,7 +30,7 @@
-export([headers/12]).
-export([request/13]).
-export([data/7]).
--export([connect/9]).
+-export([connect/10]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
@@ -948,10 +948,24 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport, pings_unack=Pin
{Error, EvHandlerState}
end.
-headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
+headers(State, StreamRef, ReplyTo, Method, Host, Port, Path,
+ Headers, InitialFlow, CookieStore, EvHandler, EvHandlerState) ->
+ request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState,
+ fun() ->
+ headers1(State, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers,
+ InitialFlow, CookieStore, EvHandler, EvHandlerState)
+ end,
+ fun(#tunnel{protocol=Proto, protocol_state=ProtoState0,
+ info=#{origin_host := OriginHost, origin_port := OriginPort}}) ->
+ Proto:headers(ProtoState0, StreamRef, ReplyTo,
+ Method, OriginHost, OriginPort, Path, Headers,
+ InitialFlow, CookieStore, EvHandler, EvHandlerState)
+ end).
+
+headers1(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
- Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
- when is_reference(StreamRef) ->
+ Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers, CookieStore} = prepare_headers(
@@ -960,7 +974,7 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
RequestEvent = #{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
- function => ?FUNCTION_NAME,
+ function => headers,
method => Method,
authority => Authority,
path => Path,
@@ -981,69 +995,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
EvHandlerState};
Error={error, _} ->
{Error, CookieStore, EvHandlerState1}
- end;
-%% Tunneled request.
-headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
- Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
- case get_stream_by_ref(State, StreamRef) of
- %% @todo We should send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
- origin_host := OriginHost, origin_port := OriginPort}}} ->
- {Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, RealStreamRef,
- ReplyTo, Method, OriginHost, OriginPort, Path, Headers,
- InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {ResCommands, EvHandlerState} = tunnel_commands(Commands, Stream,
- State, EvHandler, EvHandlerState1),
- {ResCommands, CookieStore, EvHandlerState};
- #stream{tunnel=undefined} ->
- gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}}),
- {[], CookieStore0, EvHandlerState0};
- error ->
- error_stream_not_found(State, StreamRef, ReplyTo),
- {[], CookieStore0, EvHandlerState0}
end.
-request(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, Method, Host, Port,
- Path, Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState)
- when is_reference(StreamRef) ->
- case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of
- true ->
- gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
- {stream_error, too_many_streams,
- 'Maximum concurrency limit has been reached.'}}),
- {[], CookieStore, EvHandlerState};
- false ->
- request1(State, StreamRef, ReplyTo, Method, Host, Port,
- Path, Headers, Body, InitialFlow, CookieStore,
- EvHandler, EvHandlerState)
- end;
-%% Tunneled request.
-request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
- Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
- case get_stream_by_ref(State, StreamRef) of
- %% @todo We should send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
- origin_host := OriginHost, origin_port := OriginPort}}} ->
- {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
- ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
- InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {ResCommands, EvHandlerState} = tunnel_commands(Commands,
- Stream, State, EvHandler, EvHandlerState1),
- {ResCommands, CookieStore, EvHandlerState};
- #stream{tunnel=undefined} ->
- gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}}),
- {[], CookieStore0, EvHandlerState0};
- error ->
- error_stream_not_found(State, StreamRef, ReplyTo),
- {[], CookieStore0, EvHandlerState0}
- end.
+request(State, StreamRef, ReplyTo, Method, Host, Port, Path,
+ Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState) ->
+ request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState,
+ fun() ->
+ request1(State, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers, Body,
+ InitialFlow, CookieStore, EvHandler, EvHandlerState)
+ end,
+ fun(#tunnel{protocol=Proto, protocol_state=ProtoState0,
+ info=#{origin_host := OriginHost, origin_port := OriginPort}}) ->
+ Proto:request(ProtoState0, StreamRef, ReplyTo,
+ Method, OriginHost, OriginPort, Path, Headers, Body,
+ InitialFlow, CookieStore, EvHandler, EvHandlerState)
+ end).
request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
- Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
- when is_reference(StreamRef) ->
+ Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
@@ -1091,6 +1062,39 @@ request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
{Error, CookieStore, EvHandlerState1}
end.
+%% Normal request.
+request_common(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef,
+ ReplyTo, CookieStore, _, EvHandlerState, OnRequest, _)
+ when is_reference(StreamRef) ->
+ case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of
+ true ->
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
+ {stream_error, too_many_streams,
+ 'Maximum concurrency limit has been reached.'}}),
+ {[], CookieStore, EvHandlerState};
+ false ->
+ OnRequest()
+ end;
+%% Tunneled request.
+request_common(State, [StreamRef|_], ReplyTo,
+ CookieStore0, EvHandler, EvHandlerState0, _, OnTunnel)
+ when is_reference(StreamRef) ->
+ case get_stream_by_ref(State, StreamRef) of
+ %% @todo We should send an error to the user if the stream isn't ready.
+ Stream=#stream{tunnel=Tunnel=#tunnel{}} ->
+ {Commands, CookieStore, EvHandlerState1} = OnTunnel(Tunnel),
+ {ResCommands, EvHandlerState} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState1),
+ {ResCommands, CookieStore, EvHandlerState};
+ #stream{tunnel=undefined} ->
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}}),
+ {[], CookieStore0, EvHandlerState0};
+ error ->
+ error_stream_not_found(State, StreamRef, ReplyTo),
+ {[], CookieStore0, EvHandlerState0}
+ end.
+
initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
initial_flow(InitialFlow, _) -> InitialFlow.
@@ -1264,10 +1268,24 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
Error
end.
-connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
+connect(State, StreamRef, ReplyTo, Destination, TunnelInfo, Headers,
+ InitialFlow, CookieStore, EvHandler, EvHandlerState) ->
+ request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState,
+ fun() ->
+ connect1(State, StreamRef, ReplyTo,
+ Destination, TunnelInfo, Headers,
+ InitialFlow, CookieStore, EvHandler, EvHandlerState)
+ end,
+ fun(#tunnel{protocol=Proto, protocol_state=ProtoState0}) ->
+ Proto:connect(ProtoState0, StreamRef, ReplyTo,
+ Destination, TunnelInfo, Headers,
+ InitialFlow, CookieStore, EvHandler, EvHandlerState)
+ end).
+
+connect1(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0,
- EvHandler, EvHandlerState0)
+ CookieStore, EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
@@ -1318,27 +1336,9 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
flow=InitialFlow, authority=Authority, path= <<>>,
tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
{{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
- EvHandlerState};
+ CookieStore, EvHandlerState};
Error={error, _} ->
- {Error, EvHandlerState1}
- end;
-%% Tunneled request.
-connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
- EvHandler, EvHandlerState0) ->
- case get_stream_by_ref(State, StreamRef) of
- %% @todo Should we send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- {Commands, EvHandlerState1} = Proto:connect(ProtoState0, RealStreamRef,
- ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
- EvHandler, EvHandlerState0),
- tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
- #stream{tunnel=undefined} ->
- gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}}),
- {[], EvHandlerState0};
- error ->
- error_stream_not_found(State, StreamRef, ReplyTo),
- {[], EvHandlerState0}
+ {Error, CookieStore, EvHandlerState1}
end.
cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
@@ -1459,11 +1459,25 @@ stream_info(State, RealStreamRef=[StreamRef|_]) ->
down(#http2_state{stream_refs=Refs}) ->
maps:keys(Refs).
-ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
+ws_upgrade(State, StreamRef, ReplyTo, Host, Port, Path,
+ Headers, WsOpts, CookieStore, EvHandler, EvHandlerState) ->
+ request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState,
+ fun() ->
+ ws_upgrade1(State, StreamRef, ReplyTo,
+ Host, Port, Path, Headers, WsOpts,
+ CookieStore, EvHandler, EvHandlerState)
+ end,
+ fun(#tunnel{protocol=Proto, protocol_state=ProtoState0,
+ info=#{origin_host := OriginHost, origin_port := OriginPort}}) ->
+ Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo,
+ OriginHost, OriginPort, Path, Headers, WsOpts,
+ CookieStore, EvHandler, EvHandlerState)
+ end).
+
+ws_upgrade1(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo,
Host, Port, Path, Headers0, WsOpts,
- CookieStore0, EvHandler, EvHandlerState0)
- when is_reference(StreamRef) ->
+ CookieStore0, EvHandler, EvHandlerState0) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
<<"CONNECT">>, HTTP2Machine0),
{ok, PseudoHeaders, Headers1, CookieStore} = prepare_headers(State,
@@ -1489,7 +1503,7 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
RequestEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
- function => ?FUNCTION_NAME,
+ function => ws_upgrade,
method => <<"CONNECT">>,
authority => Authority,
path => Path,
@@ -1517,19 +1531,6 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
Stream)}, CookieStore, EvHandlerState};
Error={error, _} ->
{Error, EvHandlerState1}
- end;
-ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
- Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
- case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- {Commands, CookieStore, EvHandlerState1} = Proto:ws_upgrade(
- ProtoState0, RealStreamRef, ReplyTo,
- Host, Port, Path, Headers, WsOpts,
- CookieStore0, EvHandler, EvHandlerState0),
- {ResCommands, EvHandlerState} = tunnel_commands(Commands,
- Stream, State, EvHandler, EvHandlerState1),
- {ResCommands, CookieStore, EvHandlerState}
- %% @todo Error conditions?
end.
ws_send(Frames, State, RealStreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
diff --git a/src/gun_http3.erl b/src/gun_http3.erl
index 92b2118..49930c7 100644
--- a/src/gun_http3.erl
+++ b/src/gun_http3.erl
@@ -31,7 +31,7 @@
-export([headers/12]).
-export([request/13]).
-export([data/7]).
--export([connect/9]).
+-export([connect/10]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
@@ -637,10 +637,11 @@ data(State=#http3_state{conn=Conn, transport=Transport}, StreamRef, _ReplyTo, Is
% {[], EvHandlerState}
end.
--spec connect(_, _, _, _, _, _, _, _, _) -> no_return().
+-spec connect(_, _, _, _, _, _, _, _, _, _) -> no_return().
connect(_State, StreamRef, _ReplyTo, _Destination, _TunnelInfo, _Headers0,
- _InitialFlow0, _EvHandler, _EvHandlerState0) when is_reference(StreamRef) ->
+ _InitialFlow0, _CookieStore, _EvHandler, _EvHandlerState0)
+ when is_reference(StreamRef) ->
error(unimplemented).
-spec cancel(_, _, _, _, _) -> no_return().
diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl
index 4d7a904..a6d51e3 100644
--- a/src/gun_tunnel.erl
+++ b/src/gun_tunnel.erl
@@ -27,7 +27,7 @@
-export([headers/12]).
-export([request/13]).
-export([data/7]).
--export([connect/9]).
+-export([connect/10]).
-export([cancel/5]).
-export([timeout/3]).
-export([stream_info/2]).
@@ -344,13 +344,13 @@ data(State=#tunnel_state{socket=Socket, transport=Transport,
connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port},
protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow,
- EvHandler, EvHandlerState0) ->
+ CookieStore0, EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State, StreamRef0),
- {Commands, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef,
+ {Commands, CookieStore, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef,
ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow,
- EvHandler, EvHandlerState0),
+ CookieStore0, EvHandler, EvHandlerState0),
{ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
- {ResCommands, EvHandlerState}.
+ {ResCommands, CookieStore, EvHandlerState}.
cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->