aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorViktor Söderqvist <[email protected]>2022-03-07 21:07:35 +0100
committerLoïc Hoguin <[email protected]>2022-03-08 12:02:39 +0100
commitb85a3894f3b8f86e255668a3ca3b1722c5d9d94e (patch)
tree918934a7814a34d9ac7bee9f1740d51e2fbdc02f /src
parentf9175998687678e227bdd49669e2d83f0648fa57 (diff)
downloadgun-b85a3894f3b8f86e255668a3ca3b1722c5d9d94e.tar.gz
gun-b85a3894f3b8f86e255668a3ca3b1722c5d9d94e.tar.bz2
gun-b85a3894f3b8f86e255668a3ca3b1722c5d9d94e.zip
Return commands instead of state in remaining callbacks
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl37
-rw-r--r--src/gun_http.erl56
-rw-r--r--src/gun_http2.erl98
-rw-r--r--src/gun_raw.erl8
-rw-r--r--src/gun_tunnel.erl53
-rw-r--r--src/gun_ws.erl3
6 files changed, 133 insertions, 122 deletions
diff --git a/src/gun.erl b/src/gun.erl
index d808750..ddd6cec 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -1249,32 +1249,31 @@ connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, CookieStore, EvHandlerState} = Protocol:headers(ProtoState,
+ {Commands, CookieStore, EvHandlerState} = Protocol:headers(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
- event_handler_state=EvHandlerState}};
+ commands(Commands, State#state{cookie_store=CookieStore,
+ event_handler_state=EvHandlerState});
connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState, cookie_store=CookieStore0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, CookieStore, EvHandlerState} = Protocol:request(ProtoState,
+ {Commands, CookieStore, EvHandlerState} = Protocol:request(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
- event_handler_state=EvHandlerState}};
+ commands(Commands, State#state{cookie_store=CookieStore,
+ event_handler_state=EvHandlerState});
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:connect(ProtoState,
+ {Commands, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
Headers, InitialFlow, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2,
- event_handler_state=EvHandlerState}};
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
%% Public Websocket interface.
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
WsOpts = maps:get(ws_opts, Opts, #{}),
@@ -1289,11 +1288,11 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers, WsOpts},
opts => WsOpts
}, EvHandlerState0),
%% @todo Can fail if HTTP/1.0.
- {ProtoState2, CookieStore, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
+ {Commands, CookieStore, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState1),
- {keep_state, State#state{protocol_state=ProtoState2, cookie_store=CookieStore,
- event_handler_state=EvHandlerState}};
+ commands(Commands, State#state{cookie_store=CookieStore,
+ event_handler_state=EvHandlerState});
%% @todo Maybe better standardize the protocol callbacks argument orders.
connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
protocol=Protocol, protocol_state=ProtoState,
@@ -1370,10 +1369,10 @@ closing(Type, Event, State) ->
handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
+ {Commands, EvHandlerState} = Protocol:data(ProtoState,
dereference_stream_ref(StreamRef, State),
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
handle_common_connected(info, {timeout, TRef, Name}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
Commands = Protocol:timeout(ProtoState, Name, TRef),
@@ -1419,9 +1418,9 @@ handle_common_connected_no_input(info, {handle_continue, StreamRef, Msg}, _,
handle_common_connected_no_input(info, keepalive, _,
State=#state{protocol=Protocol, protocol_state=ProtoState0,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0),
- {keep_state, keepalive_timeout(State#state{
- protocol_state=ProtoState, event_handler_state=EvHandlerState})};
+ {Commands, EvHandlerState} = Protocol:keepalive(ProtoState0, EvHandler, EvHandlerState0),
+ commands(Commands, keepalive_timeout(State#state{
+ event_handler_state=EvHandlerState}));
handle_common_connected_no_input(cast, {update_flow, ReplyTo, StreamRef, Flow}, _,
State0=#state{protocol=Protocol, protocol_state=ProtoState}) ->
Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
@@ -1429,9 +1428,9 @@ handle_common_connected_no_input(cast, {update_flow, ReplyTo, StreamRef, Flow},
handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
+ {Commands, EvHandlerState} = Protocol:cancel(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _,
State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) ->
Intermediaries = [I || I=#{protocol := http} <- Intermediaries0],
diff --git a/src/gun_http.erl b/src/gun_http.erl
index aaa6d15..ec29b71 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -546,20 +546,20 @@ close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
close_streams(State, Tail, Reason).
%% We don't send a keep-alive when a CONNECT request was initiated.
-keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
- {State, EvHandlerState};
+keepalive(#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
+ {[], EvHandlerState};
%% We can only keep-alive by sending an empty line in-between streams.
-keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) ->
+keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) ->
Transport:send(Socket, <<"\r\n">>),
- {State, EvHandlerState};
-keepalive(State, _, EvHandlerState) ->
- {State, EvHandlerState}.
+ {[], EvHandlerState};
+keepalive(_State, _, EvHandlerState) ->
+ {[], EvHandlerState}.
headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
- {State, CookieStore, EvHandlerState};
+ {[], CookieStore, EvHandlerState};
headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
@@ -567,15 +567,15 @@ headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = initial_flow(InitialFlow0, Opts),
- {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo,
- Method, Authority, Path, InitialFlow),
+ {{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
+ ReplyTo, Method, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.
request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
- {State, CookieStore, EvHandlerState};
+ {[], CookieStore, EvHandlerState};
request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) ->
@@ -583,8 +583,8 @@ request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = initial_flow(InitialFlow0, Opts),
- {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo,
- Method, Authority, Path, InitialFlow),
+ {{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef,
+ ReplyTo, Method, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.
initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
@@ -704,10 +704,10 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
- {State#http_state{out=head}, EvHandlerState};
+ {{state, State#http_state{out=head}}, EvHandlerState};
body_chunked when Version =:= 'HTTP/1.1' ->
Transport:send(Socket, cow_http_te:chunk(Data)),
- {State, EvHandlerState0};
+ {[], EvHandlerState0};
{body, Length} when DataLength =< Length ->
Transport:send(Socket, Data),
Length2 = Length - DataLength,
@@ -718,13 +718,13 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
- {State#http_state{out=head}, EvHandlerState};
+ {{state, State#http_state{out=head}}, EvHandlerState};
Length2 > 0, IsFin =:= nofin ->
- {State#http_state{out={body, Length2}}, EvHandlerState0}
+ {{state, State#http_state{out={body, Length2}}}, EvHandlerState0}
end;
body_chunked -> %% HTTP/1.0
Transport:send(Socket, Data),
- {State, EvHandlerState0}
+ {[], EvHandlerState0}
end;
_ ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
@@ -734,12 +734,12 @@ connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
- {State, EvHandlerState};
+ {[], EvHandlerState};
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
- {State, EvHandlerState};
+ {[], EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
EvHandler, EvHandlerState0) ->
@@ -786,8 +786,8 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
- {new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
- <<"CONNECT">>, Authority, <<>>, InitialFlow),
+ {{state, new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
+ <<"CONNECT">>, Authority, <<>>, InitialFlow)},
EvHandlerState}.
%% We can't cancel anything, we can just stop forwarding messages to the owner.
@@ -801,7 +801,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
endpoint => local,
reason => cancel
}, EvHandlerState0),
- {State, EvHandlerState};
+ {{state, State}, EvHandlerState};
false ->
{error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0}
end.
@@ -831,12 +831,12 @@ down(#http_state{streams=Streams}) ->
error_stream_closed(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
- State.
+ {state, State}.
error_stream_not_found(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream cannot be found."}},
- State.
+ {state, State}.
%% Headers information retrieval.
@@ -927,12 +927,12 @@ ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerSt
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
- {State, CookieStore, EvHandlerState};
+ {[], CookieStore, EvHandlerState};
ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"Websocket cannot be used over an HTTP/1.0 connection."}},
- {State, CookieStore, EvHandlerState};
+ {[], CookieStore, EvHandlerState};
ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
Host, Port, Path, Headers0, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
@@ -960,9 +960,9 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
StreamRef, ReplyTo, <<"GET">>, Host, Port, Path, Headers, undefined,
CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME),
InitialFlow = maps:get(flow, WsOpts, infinity),
- {new_stream(State#http_state{connection=Conn, out=Out},
+ {{state, new_stream(State#http_state{connection=Conn, out=Out},
#websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts},
- ReplyTo, <<"GET">>, Authority, Path, InitialFlow),
+ ReplyTo, <<"GET">>, Authority, Path, InitialFlow)},
CookieStore, EvHandlerState}.
ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index ec51235..23755b9 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -851,9 +851,9 @@ close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
ok.
-keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) ->
+keepalive(#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) ->
Transport:send(Socket, cow_http2:ping(0)),
- {State, EvHandlerState}.
+ {[], EvHandlerState}.
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
@@ -881,24 +881,25 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path=Path},
- {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream),
+ {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
CookieStore, EvHandlerState};
%% Tunneled request.
headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
- {ProtoState, CookieStore, EvHandlerState} = Proto:headers(ProtoState0, RealStreamRef,
+ {Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
- CookieStore, EvHandlerState};
+ {State1, EvHandlerState} = tunnel_commands(Commands, Stream,
+ State, EvHandler, EvHandlerState1),
+ {{state, State1}, CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
- {State, CookieStore0, EvHandlerState0};
+ {[], CookieStore0, EvHandlerState0};
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
CookieStore0, EvHandlerState0}
@@ -944,28 +945,29 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
stream_ref => RealStreamRef,
reply_to => ReplyTo
},
- {State, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
+ {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
nofin ->
{StateRet, EvHandlerStateRet} = maybe_send_data(
State, StreamID, fin, Body, EvHandler, EvHandlerState),
- {StateRet, CookieStore, EvHandlerStateRet}
+ {{state, StateRet}, CookieStore, EvHandlerStateRet}
end;
%% Tunneled request.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo We should send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
origin_host := OriginHost, origin_port := OriginPort}}} ->
- {ProtoState, CookieStore, EvHandlerState} = Proto:request(ProtoState0, RealStreamRef,
+ {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
- CookieStore, EvHandlerState};
+ {State1, EvHandlerState} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState1),
+ {{state, State1}, CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
- {State, CookieStore0, EvHandlerState0};
+ {[], CookieStore0, EvHandlerState0};
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
CookieStore0, EvHandlerState0}
@@ -1020,13 +1022,16 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin,
{ok, _, fin} ->
{error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState};
{ok, _, _} when Tunnel =:= undefined ->
- maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState);
+ {State1, EvHandlerStateRet} = maybe_send_data(State,
+ StreamID, IsFin, Data, EvHandler, EvHandlerState),
+ {{state, State1}, EvHandlerStateRet};
{ok, _, _} ->
#tunnel{protocol=Proto, protocol_state=ProtoState0} = Tunnel,
- {ProtoState, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
+ {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
ReplyTo, IsFin, Data, EvHandler, EvHandlerState),
- {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
- EvHandlerState1}
+ {State1, EvHandlerStateRet} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState1),
+ {{state, State1}, EvHandlerStateRet}
end;
error ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState}
@@ -1034,15 +1039,16 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin,
%% Tunneled data.
data(State, RealStreamRef=[StreamRef|_], ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- {ProtoState, EvHandlerState} = Proto:data(ProtoState0, RealStreamRef,
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {Commands, EvHandlerState1} = Proto:data(ProtoState0, RealStreamRef,
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
- EvHandlerState};
+ {State1, EvHandlerState} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState1),
+ {{state, State1}, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
- {State, EvHandlerState0};
+ {[], EvHandlerState0};
error ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
@@ -1165,23 +1171,24 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
- {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream),
+ {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
EvHandlerState};
%% Tunneled request.
connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
%% @todo Should we send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, RealStreamRef,
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {Commands, EvHandlerState1} = Proto:connect(ProtoState0, RealStreamRef,
ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
- EvHandlerState};
+ {State1, EvHandlerState} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState1),
+ {{state, State1}, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
- {State, EvHandlerState0};
+ {[], EvHandlerState0};
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
EvHandlerState0}
@@ -1200,7 +1207,7 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
endpoint => local,
reason => cancel
}, EvHandlerState0),
- {delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID),
+ {{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID)},
EvHandlerState};
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
@@ -1209,15 +1216,16 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
%% Tunneled request.
cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, RealStreamRef,
- ReplyTo, EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
- EvHandlerState};
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {Commands, EvHandlerState1} = Proto:cancel(ProtoState0,
+ RealStreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {State1, EvHandlerState} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState1),
+ {{state, State1}, EvHandlerState};
#stream{tunnel=undefined} ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream is not a tunnel."}},
- {State, EvHandlerState0};
+ {[], EvHandlerState0};
error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
EvHandlerState0}
@@ -1352,19 +1360,19 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
authority=Authority, path=Path, tunnel=#tunnel{info=#websocket_info{
extensions=GunExtensions, opts=WsOpts}}},
- {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream),
+ {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
CookieStore, EvHandlerState};
ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
- Stream=#stream{tunnel=Tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
- {ProtoState, CookieStore, EvHandlerState} = Proto:ws_upgrade(
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
+ {Commands, CookieStore, EvHandlerState1} = Proto:ws_upgrade(
ProtoState0, RealStreamRef, ReplyTo,
Host, Port, Path, Headers, WsOpts,
CookieStore0, EvHandler, EvHandlerState0),
- {store_stream(State, Stream#stream{
- tunnel=Tunnel#tunnel{protocol_state=ProtoState}}),
- CookieStore, EvHandlerState}
+ {State1, EvHandlerState} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState1),
+ {{state, State1}, CookieStore, EvHandlerState}
%% @todo Error conditions?
end.
@@ -1399,12 +1407,12 @@ connection_error(#http2_state{socket=Socket, transport=Transport,
error_stream_closed(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
- State.
+ {state, State}.
error_stream_not_found(State, StreamRef, ReplyTo) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream cannot be found."}},
- State.
+ {state, State}.
%% Streams.
diff --git a/src/gun_raw.erl b/src/gun_raw.erl
index 0988d1b..f92c777 100644
--- a/src/gun_raw.erl
+++ b/src/gun_raw.erl
@@ -44,10 +44,10 @@ init(ReplyTo, Socket, Transport, Opts) ->
StreamRef = maps:get(stream_ref, Opts, undefined),
{connected_data_only, #raw_state{ref=StreamRef, reply_to=ReplyTo, socket=Socket, transport=Transport}}.
-handle(Data, State=#raw_state{ref=StreamRef, reply_to=ReplyTo}, CookieStore, _, EvHandlerState) ->
+handle(Data, #raw_state{ref=StreamRef, reply_to=ReplyTo}, CookieStore, _, EvHandlerState) ->
%% When we take over the entire connection there is no stream reference.
ReplyTo ! {gun_data, self(), StreamRef, nofin, Data},
- {{state, State}, CookieStore, EvHandlerState}.
+ {[], CookieStore, EvHandlerState}.
%% We can always close immediately.
closing(_, _, _, EvHandlerState) ->
@@ -57,7 +57,7 @@ close(_, _, _, EvHandlerState) ->
EvHandlerState.
%% @todo Initiate closing on IsFin=fin.
-data(State=#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef,
+data(#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef,
_ReplyTo, _IsFin, Data, _EvHandler, EvHandlerState) ->
Transport:send(Socket, Data),
- {State, EvHandlerState}.
+ {[], EvHandlerState}.
diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl
index 43e8767..40addf6 100644
--- a/src/gun_tunnel.erl
+++ b/src/gun_tunnel.erl
@@ -273,32 +273,32 @@ close(_Reason, _State, _EvHandler, EvHandlerState) ->
%% @todo Closing must be propagated to tunnels.
EvHandlerState.
-keepalive(State, _EvHandler, EvHandlerState) ->
+keepalive(_State, _EvHandler, EvHandlerState) ->
%% @todo Need to figure out how to handle keepalive for tunnels.
- {State, EvHandlerState}.
+ {[], EvHandlerState}.
%% We pass the headers forward and optionally dereference StreamRef.
-headers(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
+headers(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
- StreamRef = maybe_dereference(State, StreamRef0),
- {ProtoState, CookieStore, EvHandlerState} = Proto:headers(ProtoState0, StreamRef,
+ StreamRef = maybe_dereference(State0, StreamRef0),
+ {Commands, CookieStore, EvHandlerState1} = Proto:headers(ProtoState0, StreamRef,
ReplyTo, Method, Host, Port, Path, Headers,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {State#tunnel_state{protocol_state=ProtoState},
- CookieStore, EvHandlerState}.
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, CookieStore, EvHandlerState}.
%% We pass the request forward and optionally dereference StreamRef.
-request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0,
+request(State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState0,
info=#{origin_host := OriginHost, origin_port := OriginPort}},
StreamRef0, ReplyTo, Method, _Host, _Port, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
- StreamRef = maybe_dereference(State, StreamRef0),
- {ProtoState, CookieStore, EvHandlerState} = Proto:request(ProtoState0, StreamRef,
+ StreamRef = maybe_dereference(State0, StreamRef0),
+ {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, StreamRef,
ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {State#tunnel_state{protocol_state=ProtoState},
- CookieStore, EvHandlerState}.
+ {State, EvHandlerState} = commands(Commands, State0, EvHandler, EvHandlerState1),
+ {{state, State}, CookieStore, EvHandlerState}.
%% When the next tunnel is SOCKS we pass the data forward directly.
%% This is needed because SOCKS has no StreamRef and the data cannot
@@ -306,9 +306,10 @@ request(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0,
data(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0,
protocol_origin={origin, _, _, _, socks5}},
StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0) ->
- {ProtoState, EvHandlerState} = Proto:data(ProtoState0, StreamRef,
+ {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
- {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState};
+ {State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
+ {{state, State1}, EvHandlerState};
%% CONNECT tunnels pass the data forward and dereference StreamRef
%% unless they are the recipient of the callback, in which case the
%% data is sent to the socket.
@@ -319,12 +320,14 @@ data(State=#tunnel_state{socket=Socket, transport=Transport,
case StreamRef0 of
TunnelStreamRef ->
ok = Transport:send(Socket, Data),
- {State, EvHandlerState0};
+ {[], EvHandlerState0};
_ ->
StreamRef = maybe_dereference(State, StreamRef0),
- {ProtoState, EvHandlerState} = Proto:data(ProtoState0, StreamRef,
+ {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef,
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
- {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}
+ {State1, EvHandlerState} = commands(Commands, State,
+ EvHandler, EvHandlerState1),
+ {{state, State1}, EvHandlerState}
end.
%% We pass the CONNECT request forward and optionally dereference StreamRef.
@@ -333,17 +336,19 @@ connect(State=#tunnel_state{info=#{origin_host := Host, origin_port := Port},
StreamRef0, ReplyTo, Destination, _, Headers, InitialFlow,
EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State, StreamRef0),
- {ProtoState, EvHandlerState} = Proto:connect(ProtoState0, StreamRef,
+ {Commands, EvHandlerState1} = Proto:connect(ProtoState0, StreamRef,
ReplyTo, Destination, #{host => Host, port => Port}, Headers, InitialFlow,
EvHandler, EvHandlerState0),
- {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
+ {State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
+ {{state, State1}, EvHandlerState}.
cancel(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0},
StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
StreamRef = maybe_dereference(State, StreamRef0),
- {ProtoState, EvHandlerState} = Proto:cancel(ProtoState0, StreamRef,
+ {Commands, EvHandlerState1} = Proto:cancel(ProtoState0, StreamRef,
ReplyTo, EvHandler, EvHandlerState0),
- {State#tunnel_state{protocol_state=ProtoState}, EvHandlerState}.
+ {State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
+ {{state, State1}, EvHandlerState}.
timeout(State=#tunnel_state{protocol=Proto, protocol_state=ProtoState0}, Msg, TRef) ->
case Proto:timeout(ProtoState0, Msg, TRef) of
@@ -426,11 +431,11 @@ ws_upgrade(State=#tunnel_state{info=TunnelInfo, protocol=Proto, protocol_state=P
origin_host := Host,
origin_port := Port
} = TunnelInfo,
- {ProtoState, CookieStore, EvHandlerState} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo,
+ {Commands, CookieStore, EvHandlerState1} = Proto:ws_upgrade(ProtoState0, StreamRef, ReplyTo,
Host, Port, Path, Headers, WsOpts,
CookieStore0, EvHandler, EvHandlerState0),
- {State#tunnel_state{protocol_state=ProtoState},
- CookieStore, EvHandlerState}.
+ {State1, EvHandlerState} = commands(Commands, State, EvHandler, EvHandlerState1),
+ {{state, State1}, CookieStore, EvHandlerState}.
ws_send(Frames, State0=#tunnel_state{protocol=Proto, protocol_state=ProtoState},
StreamRef0, ReplyTo, EvHandler, EvHandlerState0) ->
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 3a3969e..8cf2b49 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -299,8 +299,7 @@ close(_, _, _, EvHandlerState) ->
EvHandlerState.
keepalive(State=#ws_state{reply_to=ReplyTo}, EvHandler, EvHandlerState0) ->
- {[], EvHandlerState} = send(ping, State, ReplyTo, EvHandler, EvHandlerState0),
- {State, EvHandlerState}.
+ send(ping, State, ReplyTo, EvHandler, EvHandlerState0).
%% Send one frame.
send(Frame, State=#ws_state{stream_ref=StreamRef,