diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.erl | 9 | ||||
-rw-r--r-- | src/gun_http.erl | 17 | ||||
-rw-r--r-- | src/gun_http2.erl | 205 | ||||
-rw-r--r-- | src/gun_http3.erl | 7 | ||||
-rw-r--r-- | src/gun_tunnel.erl | 10 |
5 files changed, 126 insertions, 122 deletions
diff --git a/src/gun.erl b/src/gun.erl index a7c0239..5a3c233 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -1401,13 +1401,14 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi event_handler_state=EvHandlerState}); connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, - protocol=Protocol, protocol_state=ProtoState, + protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:connect(ProtoState, + {Commands, CookieStore, EvHandlerState} = Protocol:connect(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Destination, #{host => Host, port => Port}, - Headers, InitialFlow, EvHandler, EvHandlerState0), - commands(Commands, State#state{event_handler_state=EvHandlerState}); + Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0), + commands(Commands, State#state{cookie_store=CookieStore, + event_handler_state=EvHandlerState}); %% Public Websocket interface. connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) -> WsOpts = maps:get(ws_opts, Opts, #{}), diff --git a/src/gun_http.erl b/src/gun_http.erl index d4304b0..98acbc1 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -29,7 +29,7 @@ -export([headers/12]). -export([request/13]). -export([data/7]). --export([connect/9]). +-export([connect/10]). -export([cancel/5]). -export([stream_info/2]). -export([down/1]). @@ -759,19 +759,20 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, {[], EvHandlerState0} end. -connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) +connect(State, StreamRef, ReplyTo, _, _, _, _, CookieStore, _, EvHandlerState) when is_list(StreamRef) -> gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}), - {[], EvHandlerState}; -connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) + {[], CookieStore, EvHandlerState}; +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, + CookieStore, _, EvHandlerState) when Streams =/= [] -> gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}), - {[], EvHandlerState}; + {[], CookieStore, EvHandlerState}; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0, - EvHandler, EvHandlerState0) -> + CookieStore, EvHandler, EvHandlerState0) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 @@ -817,9 +818,9 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version InitialFlow = initial_flow(InitialFlow0, Opts), {{state, new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)}, - EvHandlerState}; + CookieStore, EvHandlerState}; Error={error, _} -> - {Error, EvHandlerState1} + {Error, CookieStore, EvHandlerState1} end. %% We can't cancel anything, we can just stop forwarding messages to the owner. diff --git a/src/gun_http2.erl b/src/gun_http2.erl index d8853e7..15010f5 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -30,7 +30,7 @@ -export([headers/12]). -export([request/13]). -export([data/7]). --export([connect/9]). +-export([connect/10]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). @@ -948,10 +948,24 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport, pings_unack=Pin {Error, EvHandlerState} end. -headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, +headers(State, StreamRef, ReplyTo, Method, Host, Port, Path, + Headers, InitialFlow, CookieStore, EvHandler, EvHandlerState) -> + request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState, + fun() -> + headers1(State, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers, + InitialFlow, CookieStore, EvHandler, EvHandlerState) + end, + fun(#tunnel{protocol=Proto, protocol_state=ProtoState0, + info=#{origin_host := OriginHost, origin_port := OriginPort}}) -> + Proto:headers(ProtoState0, StreamRef, ReplyTo, + Method, OriginHost, OriginPort, Path, Headers, + InitialFlow, CookieStore, EvHandler, EvHandlerState) + end). + +headers1(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) - when is_reference(StreamRef) -> + Path, Headers0, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers, CookieStore} = prepare_headers( @@ -960,7 +974,7 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, RequestEvent = #{ stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, - function => ?FUNCTION_NAME, + function => headers, method => Method, authority => Authority, path => Path, @@ -981,69 +995,26 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState}; Error={error, _} -> {Error, CookieStore, EvHandlerState1} - end; -%% Tunneled request. -headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, - Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> - case get_stream_by_ref(State, StreamRef) of - %% @todo We should send an error to the user if the stream isn't ready. - Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ - origin_host := OriginHost, origin_port := OriginPort}}} -> - {Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, RealStreamRef, - ReplyTo, Method, OriginHost, OriginPort, Path, Headers, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {ResCommands, EvHandlerState} = tunnel_commands(Commands, Stream, - State, EvHandler, EvHandlerState1), - {ResCommands, CookieStore, EvHandlerState}; - #stream{tunnel=undefined} -> - gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}), - {[], CookieStore0, EvHandlerState0}; - error -> - error_stream_not_found(State, StreamRef, ReplyTo), - {[], CookieStore0, EvHandlerState0} end. -request(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState) - when is_reference(StreamRef) -> - case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of - true -> - gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), - {stream_error, too_many_streams, - 'Maximum concurrency limit has been reached.'}}), - {[], CookieStore, EvHandlerState}; - false -> - request1(State, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers, Body, InitialFlow, CookieStore, - EvHandler, EvHandlerState) - end; -%% Tunneled request. -request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, - Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> - case get_stream_by_ref(State, StreamRef) of - %% @todo We should send an error to the user if the stream isn't ready. - Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ - origin_host := OriginHost, origin_port := OriginPort}}} -> - {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef, - ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, - InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {ResCommands, EvHandlerState} = tunnel_commands(Commands, - Stream, State, EvHandler, EvHandlerState1), - {ResCommands, CookieStore, EvHandlerState}; - #stream{tunnel=undefined} -> - gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}), - {[], CookieStore0, EvHandlerState0}; - error -> - error_stream_not_found(State, StreamRef, ReplyTo), - {[], CookieStore0, EvHandlerState0} - end. +request(State, StreamRef, ReplyTo, Method, Host, Port, Path, + Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState) -> + request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState, + fun() -> + request1(State, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers, Body, + InitialFlow, CookieStore, EvHandler, EvHandlerState) + end, + fun(#tunnel{protocol=Proto, protocol_state=ProtoState0, + info=#{origin_host := OriginHost, origin_port := OriginPort}}) -> + Proto:request(ProtoState0, StreamRef, ReplyTo, + Method, OriginHost, OriginPort, Path, Headers, Body, + InitialFlow, CookieStore, EvHandler, EvHandlerState) + end). request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, - Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) - when is_reference(StreamRef) -> + Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( @@ -1091,6 +1062,39 @@ request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, {Error, CookieStore, EvHandlerState1} end. +%% Normal request. +request_common(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, + ReplyTo, CookieStore, _, EvHandlerState, OnRequest, _) + when is_reference(StreamRef) -> + case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of + true -> + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), + {stream_error, too_many_streams, + 'Maximum concurrency limit has been reached.'}}), + {[], CookieStore, EvHandlerState}; + false -> + OnRequest() + end; +%% Tunneled request. +request_common(State, [StreamRef|_], ReplyTo, + CookieStore0, EvHandler, EvHandlerState0, _, OnTunnel) + when is_reference(StreamRef) -> + case get_stream_by_ref(State, StreamRef) of + %% @todo We should send an error to the user if the stream isn't ready. + Stream=#stream{tunnel=Tunnel=#tunnel{}} -> + {Commands, CookieStore, EvHandlerState1} = OnTunnel(Tunnel), + {ResCommands, EvHandlerState} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState1), + {ResCommands, CookieStore, EvHandlerState}; + #stream{tunnel=undefined} -> + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}), + {[], CookieStore0, EvHandlerState0}; + error -> + error_stream_not_found(State, StreamRef, ReplyTo), + {[], CookieStore0, EvHandlerState0} + end. + initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; initial_flow(InitialFlow, _) -> InitialFlow. @@ -1264,10 +1268,24 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, Error end. -connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, +connect(State, StreamRef, ReplyTo, Destination, TunnelInfo, Headers, + InitialFlow, CookieStore, EvHandler, EvHandlerState) -> + request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState, + fun() -> + connect1(State, StreamRef, ReplyTo, + Destination, TunnelInfo, Headers, + InitialFlow, CookieStore, EvHandler, EvHandlerState) + end, + fun(#tunnel{protocol=Proto, protocol_state=ProtoState0}) -> + Proto:connect(ProtoState0, StreamRef, ReplyTo, + Destination, TunnelInfo, Headers, + InitialFlow, CookieStore, EvHandler, EvHandlerState) + end). + +connect1(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Destination=#{host := Host0}, TunnelInfo, Headers0, InitialFlow0, - EvHandler, EvHandlerState0) + CookieStore, EvHandler, EvHandlerState0) when is_reference(StreamRef) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); @@ -1318,27 +1336,9 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, flow=InitialFlow, authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, - EvHandlerState}; + CookieStore, EvHandlerState}; Error={error, _} -> - {Error, EvHandlerState1} - end; -%% Tunneled request. -connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, - EvHandler, EvHandlerState0) -> - case get_stream_by_ref(State, StreamRef) of - %% @todo Should we send an error to the user if the stream isn't ready. - Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - {Commands, EvHandlerState1} = Proto:connect(ProtoState0, RealStreamRef, - ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, - EvHandler, EvHandlerState0), - tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1); - #stream{tunnel=undefined} -> - gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}), - {[], EvHandlerState0}; - error -> - error_stream_not_found(State, StreamRef, ReplyTo), - {[], EvHandlerState0} + {Error, CookieStore, EvHandlerState1} end. cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, @@ -1459,11 +1459,25 @@ stream_info(State, RealStreamRef=[StreamRef|_]) -> down(#http2_state{stream_refs=Refs}) -> maps:keys(Refs). -ws_upgrade(State=#http2_state{socket=Socket, transport=Transport, +ws_upgrade(State, StreamRef, ReplyTo, Host, Port, Path, + Headers, WsOpts, CookieStore, EvHandler, EvHandlerState) -> + request_common(State, StreamRef, ReplyTo, CookieStore, EvHandler, EvHandlerState, + fun() -> + ws_upgrade1(State, StreamRef, ReplyTo, + Host, Port, Path, Headers, WsOpts, + CookieStore, EvHandler, EvHandlerState) + end, + fun(#tunnel{protocol=Proto, protocol_state=ProtoState0, + info=#{origin_host := OriginHost, origin_port := OriginPort}}) -> + Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo, + OriginHost, OriginPort, Path, Headers, WsOpts, + CookieStore, EvHandler, EvHandlerState) + end). + +ws_upgrade1(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Host, Port, Path, Headers0, WsOpts, - CookieStore0, EvHandler, EvHandlerState0) - when is_reference(StreamRef) -> + CookieStore0, EvHandler, EvHandlerState0) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( <<"CONNECT">>, HTTP2Machine0), {ok, PseudoHeaders, Headers1, CookieStore} = prepare_headers(State, @@ -1489,7 +1503,7 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport, RequestEvent = #{ stream_ref => RealStreamRef, reply_to => ReplyTo, - function => ?FUNCTION_NAME, + function => ws_upgrade, method => <<"CONNECT">>, authority => Authority, path => Path, @@ -1517,19 +1531,6 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport, Stream)}, CookieStore, EvHandlerState}; Error={error, _} -> {Error, EvHandlerState1} - end; -ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, - Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) -> - case get_stream_by_ref(State, StreamRef) of - Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - {Commands, CookieStore, EvHandlerState1} = Proto:ws_upgrade( - ProtoState0, RealStreamRef, ReplyTo, - Host, Port, Path, Headers, WsOpts, - CookieStore0, EvHandler, EvHandlerState0), - {ResCommands, EvHandlerState} = tunnel_commands(Commands, - Stream, State, EvHandler, EvHandlerState1), - {ResCommands, CookieStore, EvHandlerState} - %% @todo Error conditions? end. ws_send(Frames, State, RealStreamRef, ReplyTo, EvHandler, EvHandlerState0) -> diff --git a/src/gun_http3.erl b/src/gun_http3.erl index 92b2118..49930c7 100644 --- a/src/gun_http3.erl +++ b/src/gun_http3.erl @@ -31,7 +31,7 @@ -export([headers/12]). -export([request/13]). -export([data/7]). --export([connect/9]). +-export([connect/10]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). @@ -637,10 +637,11 @@ data(State=#http3_state{conn=Conn, transport=Transport}, StreamRef, _ReplyTo, Is % {[], EvHandlerState} end. --spec connect(_, _, _, _, _, _, _, _, _) -> no_return(). +-spec connect(_, _, _, _, _, _, _, _, _, _) -> no_return(). connect(_State, StreamRef, _ReplyTo, _Destination, _TunnelInfo, _Headers0, - _InitialFlow0, _EvHandler, _EvHandlerState0) when is_reference(StreamRef) -> + _InitialFlow0, _CookieStore, _EvHandler, _EvHandlerState0) + when is_reference(StreamRef) -> error(unimplemented). -spec cancel(_, _, _, _, _) -> no_return(). diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index 4d7a904..a6d51e3 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -27,7 +27,7 @@ -export([headers/12]). -export([request/13]). -export([data/7]). --export([connect/9]). +-export([connect/10]). -export([cancel/5]). -export([timeout/3]). -export([stream_info/2]). @@ -344,13 +344,13 @@ data(State=#tunnel_state{socket=Socket, transport=Transport, connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port}, protocol=Proto, protocol_state=ProtoState0}, StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow, - EvHandler, EvHandlerState0) -> + CookieStore0, EvHandler, EvHandlerState0) -> StreamRef = maybe_dereference(State, StreamRef0), - {Commands, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef, + {Commands, CookieStore, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef, ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow, - EvHandler, EvHandlerState0), + CookieStore0, EvHandler, EvHandlerState0), {ResCommands, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1), - {ResCommands, EvHandlerState}. + {ResCommands, CookieStore, EvHandlerState}. cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> |