From b85a3894f3b8f86e255668a3ca3b1722c5d9d94e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 7 Mar 2022 21:07:35 +0100 Subject: Return commands instead of state in remaining callbacks --- src/gun.erl | 37 ++++++++++----------- src/gun_http.erl | 56 +++++++++++++++---------------- src/gun_http2.erl | 98 +++++++++++++++++++++++++++++------------------------- src/gun_raw.erl | 8 ++--- src/gun_tunnel.erl | 53 ++++++++++++++++------------- src/gun_ws.erl | 3 +- 6 files changed, 133 insertions(+), 122 deletions(-) (limited to 'src') diff --git a/src/gun.erl b/src/gun.erl index d808750..ddd6cec 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -1249,32 +1249,31 @@ connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, CookieStore, EvHandlerState} = Protocol:headers(ProtoState, + {Commands, CookieStore, EvHandlerState} = Protocol:headers(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Method, Host, Port, Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore, - event_handler_state=EvHandlerState}}; + commands(Commands, State#state{cookie_store=CookieStore, + event_handler_state=EvHandlerState}); connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, CookieStore, EvHandlerState} = Protocol:request(ProtoState, + {Commands, CookieStore, EvHandlerState} = Protocol:request(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Method, Host, Port, Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore, - event_handler_state=EvHandlerState}}; + commands(Commands, State#state{cookie_store=CookieStore, + event_handler_state=EvHandlerState}); connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:connect(ProtoState, + {Commands, EvHandlerState} = Protocol:connect(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, - event_handler_state=EvHandlerState}}; + commands(Commands, State#state{event_handler_state=EvHandlerState}); %% Public Websocket interface. connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) -> WsOpts = maps:get(ws_opts, Opts, #{}), @@ -1289,11 +1288,11 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts}, opts => WsOpts }, EvHandlerState0), %% @todo Can fail if HTTP/1.0. - {ProtoState2, CookieStore, EvHandlerState} = Protocol:ws_upgrade(ProtoState, + {Commands, CookieStore, EvHandlerState} = Protocol:ws_upgrade(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState1), - {keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore, - event_handler_state=EvHandlerState}}; + commands(Commands, State#state{cookie_store=CookieStore, + event_handler_state=EvHandlerState}); %% @todo Maybe better standardize the protocol callbacks argument orders. connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ protocol=Protocol, protocol_state=ProtoState, @@ -1370,10 +1369,10 @@ closing(Type, Event, State) -> handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _, State=#state{protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, + {Commands, EvHandlerState} = Protocol:data(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; + commands(Commands, State#state{event_handler_state=EvHandlerState}); handle_common_connected(info, {timeout, TRef, Name}, _, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> Commands = Protocol:timeout(ProtoState, Name, TRef), @@ -1419,9 +1418,9 @@ handle_common_connected_no_input(info, {handle_continue, StreamRef, Msg}, _, handle_common_connected_no_input(info, keepalive, _, State=#state{protocol=Protocol, protocol_state=ProtoState0, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0), - {keep_state, keepalive_timeout(State#state{ - protocol_state=ProtoState, event_handler_state=EvHandlerState})}; + {Commands, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0), + commands(Commands, keepalive_timeout(State#state{ + event_handler_state=EvHandlerState})); handle_common_connected_no_input(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{protocol=Protocol, protocol_state=ProtoState}) -> Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), @@ -1429,9 +1428,9 @@ handle_common_connected_no_input(cast, {update_flow, ReplyTo, StreamRef, Flow}, handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, + {Commands, EvHandlerState} = Protocol:cancel(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; + commands(Commands, State#state{event_handler_state=EvHandlerState}); handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _, State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) -> Intermediaries = [I || I=#{protocol := http} <- Intermediaries0], diff --git a/src/gun_http.erl b/src/gun_http.erl index aaa6d15..ec29b71 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -546,20 +546,20 @@ close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> close_streams(State, Tail, Reason). %% We don't send a keep-alive when a CONNECT request was initiated. -keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) -> - {State, EvHandlerState}; +keepalive(#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) -> + {[], EvHandlerState}; %% We can only keep-alive by sending an empty line in-between streams. -keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) -> +keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) -> Transport:send(Socket, <<"\r\n">>), - {State, EvHandlerState}; -keepalive(State, _, EvHandlerState) -> - {State, EvHandlerState}. + {[], EvHandlerState}; +keepalive(_State, _, EvHandlerState) -> + {[], EvHandlerState}. headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, CookieStore, EvHandlerState}; + {[], CookieStore, EvHandlerState}; headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) -> @@ -567,15 +567,15 @@ headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined, CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME), InitialFlow = initial_flow(InitialFlow0, Opts), - {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, - Method, Authority, Path, InitialFlow), + {{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, + ReplyTo, Method, Authority, Path, InitialFlow)}, CookieStore, EvHandlerState}. request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, CookieStore, EvHandlerState}; + {[], CookieStore, EvHandlerState}; request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) -> @@ -583,8 +583,8 @@ request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME), InitialFlow = initial_flow(InitialFlow0, Opts), - {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, - Method, Authority, Path, InitialFlow), + {{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, + ReplyTo, Method, Authority, Path, InitialFlow)}, CookieStore, EvHandlerState}. initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; @@ -704,10 +704,10 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), - {State#http_state{out=head}, EvHandlerState}; + {{state, State#http_state{out=head}}, EvHandlerState}; body_chunked when Version =:= 'HTTP/1.1' -> Transport:send(Socket, cow_http_te:chunk(Data)), - {State, EvHandlerState0}; + {[], EvHandlerState0}; {body, Length} when DataLength =< Length -> Transport:send(Socket, Data), Length2 = Length - DataLength, @@ -718,13 +718,13 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), - {State#http_state{out=head}, EvHandlerState}; + {{state, State#http_state{out=head}}, EvHandlerState}; Length2 > 0, IsFin =:= nofin -> - {State#http_state{out={body, Length2}}, EvHandlerState0} + {{state, State#http_state{out={body, Length2}}}, EvHandlerState0} end; body_chunked -> %% HTTP/1.0 Transport:send(Socket, Data), - {State, EvHandlerState0} + {[], EvHandlerState0} end; _ -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} @@ -734,12 +734,12 @@ connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, EvHandlerState}; + {[], EvHandlerState}; connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) when Streams =/= [] -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, - {State, EvHandlerState}; + {[], EvHandlerState}; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0, EvHandler, EvHandlerState0) -> @@ -786,8 +786,8 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), - {new_stream(State, {connect, StreamRef, Destination}, ReplyTo, - <<"CONNECT">>, Authority, <<>>, InitialFlow), + {{state, new_stream(State, {connect, StreamRef, Destination}, ReplyTo, + <<"CONNECT">>, Authority, <<>>, InitialFlow)}, EvHandlerState}. %% We can't cancel anything, we can just stop forwarding messages to the owner. @@ -801,7 +801,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> endpoint => local, reason => cancel }, EvHandlerState0), - {State, EvHandlerState}; + {{state, State}, EvHandlerState}; false -> {error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0} end. @@ -831,12 +831,12 @@ down(#http_state{streams=Streams}) -> error_stream_closed(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream has already been closed."}}, - State. + {state, State}. error_stream_not_found(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream cannot be found."}}, - State. + {state, State}. %% Headers information retrieval. @@ -927,12 +927,12 @@ ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerSt when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, CookieStore, EvHandlerState}; + {[], CookieStore, EvHandlerState}; ws_upgrade(State=#http_state{version='HTTP/1.0'}, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "Websocket cannot be used over an HTTP/1.0 connection."}}, - {State, CookieStore, EvHandlerState}; + {[], CookieStore, EvHandlerState}; ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, Host, Port, Path, Headers0, WsOpts, CookieStore0, EvHandler, EvHandlerState0) -> {Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of @@ -960,9 +960,9 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, StreamRef, ReplyTo, <<"GET">>, Host, Port, Path, Headers, undefined, CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME), InitialFlow = maps:get(flow, WsOpts, infinity), - {new_stream(State#http_state{connection=Conn, out=Out}, + {{state, new_stream(State#http_state{connection=Conn, out=Out}, #websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts}, - ReplyTo, <<"GET">>, Authority, Path, InitialFlow), + ReplyTo, <<"GET">>, Authority, Path, InitialFlow)}, CookieStore, EvHandlerState}. ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index ec51235..23755b9 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -851,9 +851,9 @@ close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason}, ok. -keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) -> +keepalive(#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) -> Transport:send(Socket, cow_http2:ping(0)), - {State, EvHandlerState}. + {[], EvHandlerState}. headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, @@ -881,24 +881,25 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path=Path}, - {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), + {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, CookieStore, EvHandlerState}; %% Tunneled request. headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of %% @todo We should send an error to the user if the stream isn't ready. - Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ origin_host := OriginHost, origin_port := OriginPort}}} -> - {ProtoState, CookieStore, EvHandlerState} = Proto:headers(ProtoState0, RealStreamRef, + {Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, RealStreamRef, ReplyTo, Method, OriginHost, OriginPort, Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), - CookieStore, EvHandlerState}; + {State1, EvHandlerState} = tunnel_commands(Commands, Stream, + State, EvHandler, EvHandlerState1), + {{state, State1}, CookieStore, EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, CookieStore0, EvHandlerState0}; + {[], CookieStore0, EvHandlerState0}; error -> {error_stream_not_found(State, StreamRef, ReplyTo), CookieStore0, EvHandlerState0} @@ -944,28 +945,29 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, stream_ref => RealStreamRef, reply_to => ReplyTo }, - {State, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; + {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; nofin -> {StateRet, EvHandlerStateRet} = maybe_send_data( State, StreamID, fin, Body, EvHandler, EvHandlerState), - {StateRet, CookieStore, EvHandlerStateRet} + {{state, StateRet}, CookieStore, EvHandlerStateRet} end; %% Tunneled request. request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of %% @todo We should send an error to the user if the stream isn't ready. - Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{ origin_host := OriginHost, origin_port := OriginPort}}} -> - {ProtoState, CookieStore, EvHandlerState} = Proto:request(ProtoState0, RealStreamRef, + {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef, ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), - CookieStore, EvHandlerState}; + {State1, EvHandlerState} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState1), + {{state, State1}, CookieStore, EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, CookieStore0, EvHandlerState0}; + {[], CookieStore0, EvHandlerState0}; error -> {error_stream_not_found(State, StreamRef, ReplyTo), CookieStore0, EvHandlerState0} @@ -1020,13 +1022,16 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, {ok, _, fin} -> {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; {ok, _, _} when Tunnel =:= undefined -> - maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState); + {State1, EvHandlerStateRet} = maybe_send_data(State, + StreamID, IsFin, Data, EvHandler, EvHandlerState), + {{state, State1}, EvHandlerStateRet}; {ok, _, _} -> #tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel, - {ProtoState, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, + {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), - EvHandlerState1} + {State1, EvHandlerStateRet} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState1), + {{state, State1}, EvHandlerStateRet} end; error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState} @@ -1034,15 +1039,16 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, %% Tunneled data. data(State, RealStreamRef=[StreamRef|_], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of - Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - {ProtoState, EvHandlerState} = Proto:data(ProtoState0, RealStreamRef, + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {Commands, EvHandlerState1} = Proto:data(ProtoState0, RealStreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), - EvHandlerState}; + {State1, EvHandlerState} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState1), + {{state, State1}, EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, EvHandlerState0}; + {[], EvHandlerState0}; error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. @@ -1165,23 +1171,24 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, - {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), + {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, EvHandlerState}; %% Tunneled request. connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of %% @todo Should we send an error to the user if the stream isn't ready. - Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, RealStreamRef, + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {Commands, EvHandlerState1} = Proto:connect(ProtoState0, RealStreamRef, ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), - EvHandlerState}; + {State1, EvHandlerState} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState1), + {{state, State1}, EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, EvHandlerState0}; + {[], EvHandlerState0}; error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} @@ -1200,7 +1207,7 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP endpoint => local, reason => cancel }, EvHandlerState0), - {delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID), + {{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID)}, EvHandlerState}; error -> {error_stream_not_found(State, StreamRef, ReplyTo), @@ -1209,15 +1216,16 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP %% Tunneled request. cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of - Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, RealStreamRef, - ReplyTo, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), - EvHandlerState}; + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {Commands, EvHandlerState1} = Proto:cancel(ProtoState0, + RealStreamRef, ReplyTo, EvHandler, EvHandlerState0), + {State1, EvHandlerState} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState1), + {{state, State1}, EvHandlerState}; #stream{tunnel=undefined} -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - {State, EvHandlerState0}; + {[], EvHandlerState0}; error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} @@ -1352,19 +1360,19 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport, Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, authority=Authority, path=Path, tunnel=#tunnel{info=#websocket_info{ extensions=GunExtensions, opts=WsOpts}}}, - {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), + {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, CookieStore, EvHandlerState}; ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of - Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> - {ProtoState, CookieStore, EvHandlerState} = Proto:ws_upgrade( + Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} -> + {Commands, CookieStore, EvHandlerState1} = Proto:ws_upgrade( ProtoState0, RealStreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0), - {store_stream(State, Stream#stream{ - tunnel=Tunnel#tunnel{protocol_state=ProtoState}}), - CookieStore, EvHandlerState} + {State1, EvHandlerState} = tunnel_commands(Commands, + Stream, State, EvHandler, EvHandlerState1), + {{state, State1}, CookieStore, EvHandlerState} %% @todo Error conditions? end. @@ -1399,12 +1407,12 @@ connection_error(#http2_state{socket=Socket, transport=Transport, error_stream_closed(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream has already been closed."}}, - State. + {state, State}. error_stream_not_found(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream cannot be found."}}, - State. + {state, State}. %% Streams. diff --git a/src/gun_raw.erl b/src/gun_raw.erl index 0988d1b..f92c777 100644 --- a/src/gun_raw.erl +++ b/src/gun_raw.erl @@ -44,10 +44,10 @@ init(ReplyTo, Socket, Transport, Opts) -> StreamRef = maps:get(stream_ref, Opts, undefined), {connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo, socket=Socket, transport=Transport}}. -handle(Data, State=#raw_state{ref=StreamRef, reply_to=ReplyTo}, CookieStore, _, EvHandlerState) -> +handle(Data, #raw_state{ref=StreamRef, reply_to=ReplyTo}, CookieStore, _, EvHandlerState) -> %% When we take over the entire connection there is no stream reference. ReplyTo ! {gun_data, self(), StreamRef, nofin, Data}, - {{state, State}, CookieStore, EvHandlerState}. + {[], CookieStore, EvHandlerState}. %% We can always close immediately. closing(_, _, _, EvHandlerState) -> @@ -57,7 +57,7 @@ close(_, _, _, EvHandlerState) -> EvHandlerState. %% @todo Initiate closing on IsFin=fin. -data(State=#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef, +data(#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef, _ReplyTo, _IsFin, Data, _EvHandler, EvHandlerState) -> Transport:send(Socket, Data), - {State, EvHandlerState}. + {[], EvHandlerState}. diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index 43e8767..40addf6 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -273,32 +273,32 @@ close(_Reason, _State, _EvHandler, EvHandlerState) -> %% @todo Closing must be propagated to tunnels. EvHandlerState. -keepalive(State, _EvHandler, EvHandlerState) -> +keepalive(_State, _EvHandler, EvHandlerState) -> %% @todo Need to figure out how to handle keepalive for tunnels. - {State, EvHandlerState}. + {[], EvHandlerState}. %% We pass the headers forward and optionally dereference StreamRef. -headers(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, +headers(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, StreamRef0, ReplyTo, Method, Host, Port, Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> - StreamRef = maybe_dereference(State, StreamRef0), - {ProtoState, CookieStore, EvHandlerState} = Proto:headers(ProtoState0, StreamRef, + StreamRef = maybe_dereference(State0, StreamRef0), + {Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {State#tunnel_state{protocol_state=ProtoState}, - CookieStore, EvHandlerState}. + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, CookieStore, EvHandlerState}. %% We pass the request forward and optionally dereference StreamRef. -request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0, +request(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState0, info=#{origin_host := OriginHost, origin_port := OriginPort}}, StreamRef0, ReplyTo, Method, _Host, _Port, Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> - StreamRef = maybe_dereference(State, StreamRef0), - {ProtoState, CookieStore, EvHandlerState} = Proto:request(ProtoState0, StreamRef, + StreamRef = maybe_dereference(State0, StreamRef0), + {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, StreamRef, ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0), - {State#tunnel_state{protocol_state=ProtoState}, - CookieStore, EvHandlerState}. + {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1), + {{state, State}, CookieStore, EvHandlerState}. %% When the next tunnel is SOCKS we pass the data forward directly. %% This is needed because SOCKS has no StreamRef and the data cannot @@ -306,9 +306,10 @@ request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0, data(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0, protocol_origin={origin, _, _, _, socks5}}, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) -> - {ProtoState, EvHandlerState} = Proto:data(ProtoState0, StreamRef, + {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}; + {State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1), + {{state, State1}, EvHandlerState}; %% CONNECT tunnels pass the data forward and dereference StreamRef %% unless they are the recipient of the callback, in which case the %% data is sent to the socket. @@ -319,12 +320,14 @@ data(State=#tunnel_state{socket=Socket, transport=Transport, case StreamRef0 of TunnelStreamRef -> ok = Transport:send(Socket, Data), - {State, EvHandlerState0}; + {[], EvHandlerState0}; _ -> StreamRef = maybe_dereference(State, StreamRef0), - {ProtoState, EvHandlerState} = Proto:data(ProtoState0, StreamRef, + {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState} + {State1, EvHandlerState} = commands(Commands, State, + EvHandler, EvHandlerState1), + {{state, State1}, EvHandlerState} end. %% We pass the CONNECT request forward and optionally dereference StreamRef. @@ -333,17 +336,19 @@ connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port}, StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow, EvHandler, EvHandlerState0) -> StreamRef = maybe_dereference(State, StreamRef0), - {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, StreamRef, + {Commands, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef, ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow, EvHandler, EvHandlerState0), - {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. + {State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1), + {{state, State1}, EvHandlerState}. cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> StreamRef = maybe_dereference(State, StreamRef0), - {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, StreamRef, + {Commands, EvHandlerState1} = Proto:cancel(ProtoState0, StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}. + {State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1), + {{state, State1}, EvHandlerState}. timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) -> case Proto:timeout(ProtoState0, Msg, TRef) of @@ -426,11 +431,11 @@ ws_upgrade(State=#tunnel_state{info=TunnelInfo, protocol=Proto, protocol_state=P origin_host := Host, origin_port := Port } = TunnelInfo, - {ProtoState, CookieStore, EvHandlerState} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo, + {Commands, CookieStore, EvHandlerState1} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0), - {State#tunnel_state{protocol_state=ProtoState}, - CookieStore, EvHandlerState}. + {State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1), + {{state, State1}, CookieStore, EvHandlerState}. ws_send(Frames, State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, StreamRef0, ReplyTo, EvHandler, EvHandlerState0) -> diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 3a3969e..8cf2b49 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -299,8 +299,7 @@ close(_, _, _, EvHandlerState) -> EvHandlerState. keepalive(State=#ws_state{reply_to=ReplyTo}, EvHandler, EvHandlerState0) -> - {[], EvHandlerState} = send(ping, State, ReplyTo, EvHandler, EvHandlerState0), - {State, EvHandlerState}. + send(ping, State, ReplyTo, EvHandler, EvHandlerState0). %% Send one frame. send(Frame, State=#ws_state{stream_ref=StreamRef, -- cgit v1.2.3