aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--doc/src/manual/gun.asciidoc7
-rw-r--r--src/gun.erl48
-rw-r--r--src/gun_data_h.erl2
-rw-r--r--src/gun_http.erl54
-rw-r--r--src/gun_http2.erl64
-rw-r--r--src/gun_http3.erl14
-rw-r--r--src/gun_raw.erl2
-rw-r--r--src/gun_socks.erl4
-rw-r--r--src/gun_sse_h.erl6
-rw-r--r--src/gun_tunnel.erl6
-rw-r--r--src/gun_ws.erl8
-rw-r--r--src/gun_ws_h.erl6
-rw-r--r--test/gun_SUITE.erl47
13 files changed, 160 insertions, 108 deletions
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc
index 60f217a..ece9d2a 100644
--- a/doc/src/manual/gun.asciidoc
+++ b/doc/src/manual/gun.asciidoc
@@ -442,7 +442,8 @@ Request headers.
----
req_opts() :: #{
flow => pos_integer(),
- reply_to => pid()
+ reply_to => pid() | {module(), atom(), list()}
+ | fun((_) -> _) | {fun(), list()}
}
----
@@ -457,7 +458,8 @@ flow control is disabled.
reply_to (`self()`)::
-The pid of the process that will receive the response messages.
+The pid of the process that will receive the response messages,
+or a function that will be called with the message as argument.
=== socks_opts()
@@ -598,6 +600,7 @@ By default no user option is defined.
== Changelog
+* *2.2*: The `reply_to` option now accepts functions.
* *2.1*: The HTTP/2 option list was updated with new options.
* *2.0*: The `default_protocol` and `user_opts` Websocket
options were added.
diff --git a/src/gun.erl b/src/gun.erl
index 5aa42d0..3fd5529 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -112,6 +112,7 @@
-export([connected_ws_only/3]).
-export([closing/3]).
-export([terminate/3]).
+-export([reply/2]).
-type req_headers() :: [{binary() | string() | atom(), iodata()}]
| #{binary() | string() | atom() => iodata()}.
@@ -198,9 +199,15 @@
}.
-export_type([raw_opts/0]).
+-type reply_to() :: pid()
+ | fun((_) -> _)
+ | {fun(), list()}
+ | {module(), atom(), list()}.
+-export_type([reply_to/0]).
+
-type req_opts() :: #{
flow => pos_integer(),
- reply_to => pid(),
+ reply_to => reply_to(),
tunnel => stream_ref()
}.
-export_type([req_opts/0]).
@@ -1221,7 +1228,7 @@ tls_handshake(internal, {tls_handshake, HandshakeEvent, Protocols, ReplyTo},
NewProtocolName -> {NewProtocolName, #{tunnel_transport => tls}}
end,
Protocol = gun_protocols:handler(NewProtocol),
- ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()},
+ reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Protocol:name()}),
commands([
{switch_transport, gun_tls, TLSSocket},
{switch_protocol, NewProtocol, ReplyTo}
@@ -1256,7 +1263,7 @@ tls_handshake(info, {gun_tls_proxy, Socket, {ok, Negotiated}, {HandshakeEvent, P
NewProtocolName -> {NewProtocolName, #{tunnel_transport => tls}}
end,
Protocol = gun_protocols:handler(NewProtocol),
- ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()},
+ reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Protocol:name()}),
EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
socket => Socket,
protocol => Protocol:name()
@@ -1318,7 +1325,7 @@ connected_protocol_init(internal, {connected, Retries, Socket, NewProtocol},
{next_event, internal, {retries, Retries, Reason}}};
{ok, StateName, ProtoState} ->
%% @todo Don't send gun_up and gun_down if active/1 fails here.
- Owner ! {gun_up, self(), Protocol:name()},
+ reply(Owner, {gun_up, self(), Protocol:name()}),
State1 = State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState},
case active(State1) of
{ok, State2} ->
@@ -1340,9 +1347,9 @@ connected_data_only(cast, Msg, _)
element(1, Msg) =:= connect; element(1, Msg) =:= ws_upgrade;
element(1, Msg) =:= ws_send ->
ReplyTo = element(2, Msg),
- ReplyTo ! {gun_error, self(), {badstate,
+ reply(ReplyTo, {gun_error, self(), {badstate,
"This connection does not accept new requests to be opened "
- "nor does it accept Websocket frames."}},
+ "nor does it accept Websocket frames."}}),
keep_state_and_data;
connected_data_only(Type, Event, State) ->
handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
@@ -1358,8 +1365,8 @@ connected_ws_only(cast, Msg, _)
when element(1, Msg) =:= headers; element(1, Msg) =:= request; element(1, Msg) =:= data;
element(1, Msg) =:= connect; element(1, Msg) =:= ws_upgrade ->
ReplyTo = element(2, Msg),
- ReplyTo ! {gun_error, self(), {badstate,
- "This connection only accepts Websocket frames."}},
+ reply(ReplyTo, {gun_error, self(), {badstate,
+ "This connection only accepts Websocket frames."}}),
keep_state_and_data;
connected_ws_only(Type, Event, State) ->
handle_common_connected_no_input(Type, Event, ?FUNCTION_NAME, State).
@@ -1462,23 +1469,23 @@ closing(state_timeout, closing_timeout, State=#state{status=Status}) ->
%% When reconnect is disabled, fail HTTP/Websocket operations immediately.
closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow},
State=#state{opts=#{retry := 0}}) ->
- ReplyTo ! {gun_error, self(), StreamRef, closing},
+ reply(ReplyTo, {gun_error, self(), StreamRef, closing}),
{keep_state, State};
closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow},
State=#state{opts=#{retry := 0}}) ->
- ReplyTo ! {gun_error, self(), StreamRef, closing},
+ reply(ReplyTo, {gun_error, self(), StreamRef, closing}),
{keep_state, State};
closing(cast, {connect, ReplyTo, StreamRef, _Destination, _Headers, _InitialFlow},
State=#state{opts=#{retry := 0}}) ->
- ReplyTo ! {gun_error, self(), StreamRef, closing},
+ reply(ReplyTo, {gun_error, self(), StreamRef, closing}),
{keep_state, State};
closing(cast, {ws_upgrade, ReplyTo, StreamRef, _Path, _Headers},
State=#state{opts=#{retry := 0}}) ->
- ReplyTo ! {gun_error, self(), StreamRef, closing},
+ reply(ReplyTo, {gun_error, self(), StreamRef, closing}),
{keep_state, State};
closing(cast, {ws_upgrade, ReplyTo, StreamRef, _Path, _Headers, _WsOpts},
State=#state{opts=#{retry := 0}}) ->
- ReplyTo ! {gun_error, self(), StreamRef, closing},
+ reply(ReplyTo, {gun_error, self(), StreamRef, closing}),
{keep_state, State};
closing(Type, Event, State) ->
handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
@@ -1697,8 +1704,8 @@ handle_common(cast, {set_owner, CurrentOwner, NewOwner}, _,
{keep_state, State#state{owner=NewOwner, status={up, NewOwnerRef}}};
%% We cannot change the owner when we are shutting down.
handle_common(cast, {set_owner, CurrentOwner, _}, _, #state{owner=CurrentOwner}) ->
- CurrentOwner ! {gun_error, self(), {badstate,
- "The owner of the connection cannot be changed when the connection is shutting down."}},
+ reply(CurrentOwner, {gun_error, self(), {badstate,
+ "The owner of the connection cannot be changed when the connection is shutting down."}}),
keep_state_and_state;
handle_common(cast, shutdown, StateName, State=#state{
status=Status, socket=Socket, transport=Transport, protocol=Protocol}) ->
@@ -1851,7 +1858,7 @@ disconnect(State0=#state{owner=Owner, status=Status, opts=Opts,
%% We closed the socket, discard any remaining socket events.
disconnect_flush(State1),
KilledStreams = Protocol:down(ProtoState),
- Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams},
+ reply(Owner, {gun_down, self(), Protocol:name(), Reason, KilledStreams}),
Retry = maps:get(retry, Opts, 5),
State2 = keepalive_cancel(State1#state{
socket=undefined, protocol=undefined, protocol_state=undefined}),
@@ -1928,3 +1935,12 @@ terminate(Reason, StateName, #state{event_handler=EvHandler,
reason => Reason
},
EvHandler:terminate(TerminateEvent, EvHandlerState).
+
+reply(Pid, Reply) when is_pid(Pid) ->
+ Pid ! Reply;
+reply({M, F, A}, Reply) when is_atom(M), is_atom(F), is_list(A) ->
+ apply(M, F, [Reply|A]);
+reply(Fun, Reply) when is_function(Fun, 1) ->
+ Fun(Reply);
+reply({Fun, A}, Reply) when is_list(A), is_function(Fun, length(A) + 1) ->
+ apply(Fun, [Reply|A]).
diff --git a/src/gun_data_h.erl b/src/gun_data_h.erl
index 17019d2..930265e 100644
--- a/src/gun_data_h.erl
+++ b/src/gun_data_h.erl
@@ -29,5 +29,5 @@ init(ReplyTo, StreamRef, _, _, _) ->
-spec handle(fin | nofin, binary(), State) -> {done, 1, State} when State::#state{}.
handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) ->
- ReplyTo ! {gun_data, self(), StreamRef, IsFin, Data},
+ gun:reply(ReplyTo, {gun_data, self(), StreamRef, IsFin, Data}),
{done, 1, State}.
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 14edfeb..2dc2d67 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -45,7 +45,7 @@
-record(websocket, {
ref :: gun:stream_ref(),
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
key :: binary(),
extensions :: [binary()],
opts :: gun:ws_opts()
@@ -53,7 +53,7 @@
-record(stream, {
ref :: gun:stream_ref() | connect_info() | #websocket{},
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
flow :: integer() | infinity,
method :: binary(),
@@ -238,7 +238,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
RealStreamRef = stream_ref(State, StreamRef),
- ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
+ gun:reply(ReplyTo, {gun_trailers, self(), RealStreamRef, Trailers}),
ResponseEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo
@@ -319,7 +319,7 @@ handle_connect(Rest, State=#http_state{
%% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup.
_ = case Stream of
#stream{is_alive=false} -> ok;
- _ -> ReplyTo ! {gun_response, self(), RealStreamRef, fin, Status, Headers}
+ _ -> gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, fin, Status, Headers})
end,
%% @todo Figure out whether the event should trigger if the stream was cancelled.
EvHandlerState1 = EvHandler:response_headers(#{
@@ -355,7 +355,7 @@ handle_connect(Rest, State=#http_state{
[NewProtocol0] = maps:get(protocols, Destination, [http]),
NewProtocol = gun_protocols:add_stream_ref(NewProtocol0, RealStreamRef),
Protocol = gun_protocols:handler(NewProtocol),
- ReplyTo ! {gun_tunnel_up, self(), RealStreamRef, Protocol:name()},
+ gun:reply(ReplyTo, {gun_tunnel_up, self(), RealStreamRef, Protocol:name()}),
{[
{origin, <<"http">>, NewHost, NewPort, connect},
{switch_protocol, NewProtocol, ReplyTo}
@@ -382,17 +382,17 @@ handle_inform(Rest, State=#http_state{
%% @todo We shouldn't ignore Rest.
{_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers),
Upgrade = cow_http_hd:parse_upgrade(Upgrade0),
- ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers},
+ gun:reply(ReplyTo, {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers}),
%% @todo We probably need to add_stream_ref?
{{switch_protocol, raw, ReplyTo}, CookieStore, EvHandlerState0}
catch _:_ ->
%% When the Upgrade header is missing or invalid we treat
%% the response as any other informational response.
- ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
+ gun:reply(ReplyTo, {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}),
handle(Rest, State, CookieStore, EvHandler, EvHandlerState)
end;
_ ->
- ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
+ gun:reply(ReplyTo, {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}),
handle(Rest, State, CookieStore, EvHandler, EvHandlerState)
end.
@@ -407,7 +407,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
false ->
{undefined, EvHandlerState0};
true ->
- ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
+ gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, IsFin, Status, Headers}),
EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
@@ -546,7 +546,7 @@ close_streams(_, [], _) ->
close_streams(State, [#stream{is_alive=false}|Tail], Reason) ->
close_streams(State, Tail, Reason);
close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), Reason}),
close_streams(State, Tail, Reason).
%% We don't send a keep-alive when a CONNECT request was initiated.
@@ -563,8 +563,8 @@ keepalive(_State, _, EvHandlerState) ->
headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
- {badstate, "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
+ {badstate, "The stream is not a tunnel."}}),
{[], CookieStore, EvHandlerState};
headers(State=#http_state{opts=Opts, out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
@@ -584,8 +584,8 @@ headers(State=#http_state{opts=Opts, out=head},
request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
- {badstate, "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
+ {badstate, "The stream is not a tunnel."}}),
{[], CookieStore, EvHandlerState};
request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo,
Method, Host, Port, Path, Headers, Body,
@@ -762,13 +762,13 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when is_list(StreamRef) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
- {badstate, "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
+ {badstate, "The stream is not a tunnel."}}),
{[], EvHandlerState};
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when Streams =/= [] ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}),
{[], EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
@@ -863,13 +863,13 @@ down(#http_state{streams=Streams}) ->
end || #stream{ref=Ref} <- Streams].
error_stream_closed(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream has already been closed."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream has already been closed."}}),
ok.
error_stream_not_found(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream cannot be found."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream cannot be found."}}),
ok.
%% Headers information retrieval.
@@ -959,13 +959,13 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState)
when is_list(StreamRef) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
- {badstate, "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
+ {badstate, "The stream is not a tunnel."}}),
{[], CookieStore, EvHandlerState};
ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "Websocket cannot be used over an HTTP/1.0 connection."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "Websocket cannot be used over an HTTP/1.0 connection."}}),
{[], CookieStore, EvHandlerState};
ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
Host, Port, Path, Headers0, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
@@ -1047,7 +1047,7 @@ ws_handshake_end(Buffer,
end,
%% Inform the user that the upgrade was successful and switch the protocol.
RealStreamRef = stream_ref(State, StreamRef),
- ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers},
+ gun:reply(ReplyTo, {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}),
{switch_protocol, {ws, #{
stream_ref => RealStreamRef,
headers => Headers,
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 9b6353d..0774e46 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -66,7 +66,7 @@
ref :: reference(),
%% Process to send messages to.
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
%% Flow control.
flow :: integer() | infinity,
@@ -83,7 +83,7 @@
}).
-record(http2_state, {
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
opts = #{} :: gun:http2_opts(),
@@ -359,8 +359,8 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket,
%% We notify remote settings changes only if the user requested it.
_ = case Opts of
#{notify_settings_changed := true} ->
- ReplyTo ! {gun_notify, self(), settings_changed,
- cow_http2_machine:get_remote_settings(HTTP2Machine)};
+ gun:reply(ReplyTo, {gun_notify, self(), settings_changed,
+ cow_http2_machine:get_remote_settings(HTTP2Machine)});
_ ->
ok
end,
@@ -468,8 +468,8 @@ tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},
State, EvHandler, EvHandlerState);
tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
State, _EvHandler, EvHandlerState) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
- {stream_error, Reason, 'Tunnel closed unexpectedly.'}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
+ {stream_error, Reason, 'Tunnel closed unexpectedly.'}}),
{{state, delete_stream(State, StreamID)}, EvHandlerState};
%% @todo Set a timeout for closing the Websocket stream.
tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
@@ -512,7 +512,7 @@ headers_frame(State0=#http2_state{opts=Opts},
headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo},
Status, Headers, EvHandler, EvHandlerState0) ->
RealStreamRef = stream_ref(State, StreamRef),
- ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers},
+ gun:reply(ReplyTo, {gun_inform, self(), RealStreamRef, Status, Headers}),
EvHandlerState = EvHandler:response_inform(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
@@ -566,7 +566,7 @@ headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_
origin_host => DestHost,
origin_port => DestPort
},
- ReplyTo ! {gun_response, self(), RealStreamRef, fin, Status, Headers},
+ gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, fin, Status, Headers}),
EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
@@ -656,7 +656,7 @@ headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=Re
stream_ref => RealStreamRef,
handle_continue_stream_ref => ContinueStreamRef
},
- ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers},
+ gun:reply(ReplyTo, {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}),
Proto = gun_ws,
EvHandlerState = EvHandler:protocol_changed(#{
stream_ref => RealStreamRef,
@@ -681,7 +681,7 @@ headers_frame_response(State=#http2_state{content_handlers=Handlers0},
Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
IsFin, Status, Headers, EvHandler, EvHandlerState0) ->
RealStreamRef = stream_ref(State, StreamRef),
- ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
+ gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, IsFin, Status, Headers}),
EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
@@ -708,7 +708,7 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
%% @todo We probably want to pass this to gun_content_handler?
RealStreamRef = stream_ref(State, StreamRef),
- ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
+ gun:reply(ReplyTo, {gun_trailers, self(), RealStreamRef, Trailers}),
ResponseEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo
@@ -720,8 +720,8 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
- ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef),
- {stream_error, Reason, 'Stream reset by server.'}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State0, StreamRef),
+ {stream_error, Reason, 'Stream reset by server.'}}),
EvHandlerState = EvHandler:cancel(#{
stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
@@ -753,8 +753,8 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
},
PushPromiseEvent = case Status of
connected ->
- ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef),
- RealPromisedStreamRef, Method, URI, Headers},
+ gun:reply(ReplyTo, {gun_push, self(), stream_ref(State, StreamRef),
+ RealPromisedStreamRef, Method, URI, Headers}),
PushPromiseEvent0#{promised_stream_ref => RealPromisedStreamRef};
_ ->
PushPromiseEvent0
@@ -931,7 +931,7 @@ close_reason(Reason) -> {closed, Reason}.
%% @todo Do we want an event for this?
close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), Reason}),
ok.
keepalive(State=#http2_state{pings_unack=PingsUnack, opts=Opts}, _, EvHandlerState)
@@ -996,8 +996,8 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}}),
{[], CookieStore0, EvHandlerState0};
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
@@ -1068,8 +1068,8 @@ request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Stream, State, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState};
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}}),
{[], CookieStore0, EvHandlerState0};
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
@@ -1149,8 +1149,8 @@ data(State, RealStreamRef=[StreamRef|_], ReplyTo, IsFin, Data, EvHandler, EvHand
ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}}),
{[], EvHandlerState0};
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
@@ -1240,7 +1240,7 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
ok ->
case take_stream(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), StreamError}),
{state, State};
error ->
{state, State0}
@@ -1318,8 +1318,8 @@ connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, He
EvHandler, EvHandlerState0),
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}}),
{[], EvHandlerState0};
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
@@ -1357,8 +1357,8 @@ cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0)
RealStreamRef, ReplyTo, EvHandler, EvHandlerState0),
tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1);
#stream{tunnel=undefined} ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}}),
{[], EvHandlerState0};
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
@@ -1536,7 +1536,7 @@ connection_error(#http2_state{socket=Socket, transport=Transport,
Pids = lists:usort(maps:fold(
fun(_, #stream{reply_to=ReplyTo}, Acc) -> [ReplyTo|Acc] end,
[], Streams)),
- _ = [Pid ! {gun_error, self(), {Reason, HumanReadable}} || Pid <- Pids],
+ _ = [gun:reply(Pid, {gun_error, self(), {Reason, HumanReadable}}) || Pid <- Pids],
Transport:send(Socket, cow_http2:goaway(
cow_http2_machine:get_last_streamid(HTTP2Machine),
Reason, <<>>)),
@@ -1545,13 +1545,13 @@ connection_error(#http2_state{socket=Socket, transport=Transport,
%% Stream functions.
error_stream_closed(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream has already been closed."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream has already been closed."}}),
ok.
error_stream_not_found(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream cannot be found."}},
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream cannot be found."}}),
ok.
%% Streams.
diff --git a/src/gun_http3.erl b/src/gun_http3.erl
index 9994186..92b2118 100644
--- a/src/gun_http3.erl
+++ b/src/gun_http3.erl
@@ -49,7 +49,7 @@
ref :: reference(),
%% Process to send messages to.
- reply_to :: undefined | pid(),
+ reply_to :: undefined | gun:reply_to(),
%% Whether the stream is currently in a special state.
status :: header | {unidi, control | encoder | decoder}
@@ -67,7 +67,7 @@
}).
-record(http3_state, {
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
conn :: gun_quicer:quicer_connection_handle(),
transport :: module(),
opts = #{} :: gun:http2_opts(),
@@ -382,7 +382,7 @@ headers_frame(State0=#http3_state{opts=Opts}, Stream, IsFin, Headers,
headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo},
Status, Headers, EvHandler, EvHandlerState0) ->
RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef),
- ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers},
+ gun:reply(ReplyTo, {gun_inform, self(), RealStreamRef, Status, Headers}),
EvHandlerState = EvHandler:response_inform(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
@@ -395,7 +395,7 @@ headers_frame_response(State=#http3_state{content_handlers=Handlers0},
Stream=#stream{ref=StreamRef, reply_to=ReplyTo},
IsFin, Status, Headers, EvHandler, EvHandlerState0) ->
RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef),
- ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
+ gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, IsFin, Status, Headers}),
EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
@@ -426,7 +426,7 @@ trailers_frame(State, #stream{ref=StreamRef, reply_to=ReplyTo},
Trailers, EvHandler, EvHandlerState0) ->
%% @todo We probably want to pass this to gun_content_handler?
RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef),
- ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
+ gun:reply(ReplyTo, {gun_trailers, self(), RealStreamRef, Trailers}),
ResponseEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo
@@ -719,8 +719,8 @@ stream_update(State=#http3_state{streams=Streams},
stream_aborted(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
case stream_take(State0, StreamID) of
{#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
- ReplyTo ! {gun_error, self(), StreamRef, %% @todo stream_ref(State0, StreamRef),
- {stream_error, Reason, 'Stream reset by server.'}},
+ gun:reply(ReplyTo, {gun_error, self(), StreamRef, %% @todo stream_ref(State0, StreamRef),
+ {stream_error, Reason, 'Stream reset by server.'}}),
EvHandlerState = EvHandler:cancel(#{
stream_ref => StreamRef, %% @todo stream_ref(State, StreamRef),
reply_to => ReplyTo,
diff --git a/src/gun_raw.erl b/src/gun_raw.erl
index 50786e3..a3f1ef5 100644
--- a/src/gun_raw.erl
+++ b/src/gun_raw.erl
@@ -57,7 +57,7 @@ init(ReplyTo, Socket, Transport, Opts) ->
handle(Data, State=#raw_state{ref=StreamRef, reply_to=ReplyTo, flow=Flow0},
CookieStore, _, EvHandlerState) ->
%% When we take over the entire connection there is no stream reference.
- ReplyTo ! {gun_data, self(), StreamRef, nofin, Data},
+ gun:reply(ReplyTo, {gun_data, self(), StreamRef, nofin, Data}),
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 - 1
diff --git a/src/gun_socks.erl b/src/gun_socks.erl
index 1b868a2..90b7042 100644
--- a/src/gun_socks.erl
+++ b/src/gun_socks.erl
@@ -27,7 +27,7 @@
-record(socks_state, {
ref :: undefined | gun:stream_ref(),
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
opts = #{} :: gun:socks_opts(),
@@ -167,7 +167,7 @@ handle(<<5, 0, 0, Rest0/bits>>, #socks_state{ref=StreamRef, reply_to=ReplyTo, op
[NewProtocol0] = maps:get(protocols, Opts, [http]),
NewProtocol = gun_protocols:add_stream_ref(NewProtocol0, StreamRef),
Protocol = gun_protocols:handler(NewProtocol),
- ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()},
+ gun:reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Protocol:name()}),
[{origin, <<"http">>, NewHost, NewPort, socks5},
{switch_protocol, NewProtocol, ReplyTo}]
end;
diff --git a/src/gun_sse_h.erl b/src/gun_sse_h.erl
index 03d190b..3efa832 100644
--- a/src/gun_sse_h.erl
+++ b/src/gun_sse_h.erl
@@ -19,7 +19,7 @@
-export([handle/3]).
-record(state, {
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
stream_ref :: reference(),
sse_state :: cow_sse:state()
}).
@@ -49,12 +49,12 @@ handle(IsFin, Data, State) ->
handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, sse_state=SSE0}, Flow) ->
case cow_sse:parse(Data, SSE0) of
{event, Event, SSE} ->
- ReplyTo ! {gun_sse, self(), StreamRef, Event},
+ gun:reply(ReplyTo, {gun_sse, self(), StreamRef, Event}),
handle(IsFin, <<>>, State#state{sse_state=SSE}, Flow + 1);
{more, SSE} ->
Inc = case IsFin of
fin ->
- ReplyTo ! {gun_sse, self(), StreamRef, fin},
+ gun:reply(ReplyTo, {gun_sse, self(), StreamRef, fin}),
1;
_ ->
0
diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl
index 789d1e3..559e8f7 100644
--- a/src/gun_tunnel.erl
+++ b/src/gun_tunnel.erl
@@ -41,7 +41,7 @@
%% We accept 'undefined' only to simplify the init code.
socket = undefined :: #{
gun_pid := pid(),
- reply_to := pid(),
+ reply_to := gun:reply_to(),
stream_ref := gun:stream_ref(),
handle_continue_stream_ref := gun:stream_ref()
} | pid() | undefined,
@@ -125,7 +125,7 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun
_ = case TunnelProtocol of
http -> ok;
socks -> ok;
- _ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}
+ _ -> gun:reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Proto:name()})
end,
{tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport,
protocol=Proto, protocol_state=ProtoState},
@@ -202,7 +202,7 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated},
case Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy,
ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}) of
{ok, _, ProtoState} ->
- ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()},
+ gun:reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Proto:name()}),
{{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}},
CookieStore, EvHandlerState};
Error={error, _} ->
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index c59686e..dfe1139 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -42,7 +42,7 @@
}).
-record(ws_state, {
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
stream_ref :: reference(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
@@ -84,6 +84,12 @@ do_check_options([Opt={protocols, L}|Opts]) when is_list(L) ->
end;
do_check_options([{reply_to, P}|Opts]) when is_pid(P) ->
do_check_options(Opts);
+do_check_options([{reply_to, F}|Opts]) when is_function(F, 1) ->
+ do_check_options(Opts);
+do_check_options([{reply_to, {F, A}}|Opts]) when is_function(F, 1 + length(A)) ->
+ do_check_options(Opts);
+do_check_options([{reply_to, {M, F, A}}|Opts]) when is_atom(M), is_atom(F), is_list(A) ->
+ do_check_options(Opts);
do_check_options([{silence_pings, B}|Opts]) when is_boolean(B) ->
do_check_options(Opts);
do_check_options([{user_opts, _}|Opts]) ->
diff --git a/src/gun_ws_h.erl b/src/gun_ws_h.erl
index a7dc9fc..7549906 100644
--- a/src/gun_ws_h.erl
+++ b/src/gun_ws_h.erl
@@ -19,7 +19,7 @@
-export([handle/2]).
-record(state, {
- reply_to :: pid(),
+ reply_to :: gun:reply_to(),
stream_ref :: reference(),
frag_buffer = <<>> :: binary(),
silence_pings :: boolean()
@@ -34,11 +34,11 @@ handle({fragment, nofin, _, Payload},
{ok, 0, State#state{frag_buffer= << SoFar/binary, Payload/binary >>}};
handle({fragment, fin, Type, Payload},
State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) ->
- ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}},
+ gun:reply(ReplyTo, {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}}),
{ok, 1, State#state{frag_buffer= <<>>}};
handle(Frame, State=#state{silence_pings=true}) when Frame =:= ping; Frame =:= pong;
element(1, Frame) =:= ping; element(1, Frame) =:= pong ->
{ok, 0, State};
handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) ->
- ReplyTo ! {gun_ws, self(), StreamRef, Frame},
+ gun:reply(ReplyTo, {gun_ws, self(), StreamRef, Frame}),
{ok, 1, State}.
diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl
index 230bd99..98799c9 100644
--- a/test/gun_SUITE.erl
+++ b/test/gun_SUITE.erl
@@ -264,13 +264,25 @@ postpone_request_while_not_connected(_) ->
reply_to_http(_) ->
doc("The reply_to option allows using a separate process for requests."),
- do_reply_to(http).
+ do_reply_to(http, pid).
reply_to_http2(_) ->
doc("The reply_to option allows using a separate process for requests."),
- do_reply_to(http2).
+ do_reply_to(http2, pid).
-do_reply_to(Protocol) ->
+reply_to_http_f(_) ->
+ doc("The reply_to option allows using a fun for requests."),
+ do_reply_to(http, f).
+
+reply_to_http_fa(_) ->
+ doc("The reply_to option allows using a fun/args tuple for requests."),
+ do_reply_to(http, fa).
+
+reply_to_http_mfa(_) ->
+ doc("The reply_to option allows using an MFA tuple for requests."),
+ do_reply_to(http, mfa).
+
+do_reply_to(Protocol, ReplyToType) ->
{ok, OriginPid, OriginPort} = init_origin(tcp, Protocol,
fun(_, _, ClientSocket, ClientTransport) ->
{ok, _} = ClientTransport:recv(ClientSocket, 0, infinity),
@@ -302,23 +314,38 @@ do_reply_to(Protocol) ->
ok = ClientTransport:send(ClientSocket, ResponseData),
timer:sleep(1000)
end),
- {ok, Pid} = gun:open("localhost", OriginPort, #{protocols => [Protocol]}),
- {ok, Protocol} = gun:await_up(Pid),
+ {ok, GunPid} = gun:open("localhost", OriginPort, #{protocols => [Protocol]}),
+ {ok, Protocol} = gun:await_up(GunPid),
handshake_completed = receive_from(OriginPid),
+ do_reply_to_req(GunPid, ReplyToType),
+ gun:close(GunPid).
+
+do_reply_to_req(GunPid, pid) ->
Self = self(),
ReplyTo = spawn(fun() ->
receive Ref when is_reference(Ref) ->
- Response = gun:await(Pid, Ref, infinity),
+ Response = gun:await(GunPid, Ref, infinity),
Self ! Response
end
end),
- Ref = gun:get(Pid, "/", [], #{reply_to => ReplyTo}),
+ Ref = gun:get(GunPid, "/", [], #{reply_to => ReplyTo}),
ReplyTo ! Ref,
receive
Msg ->
- {response, _, _, _} = Msg,
- gun:close(Pid)
- end.
+ {response, _, _, _} = Msg
+ end;
+do_reply_to_req(GunPid, ReplyToType) ->
+ ReplyTo = case ReplyToType of
+ f -> Self = self(), fun(Reply) -> Self ! Reply end;
+ fa -> {fun do_reply_to_fun/2, [self()]};
+ mfa -> {?MODULE, do_reply_to_fun, [self()]}
+ end,
+ Ref = gun:get(GunPid, "/", [], #{reply_to => ReplyTo}),
+ {response, _, _, _} = gun:await(GunPid, Ref, infinity),
+ ok.
+
+do_reply_to_fun(Reply, DstPid) ->
+ DstPid ! Reply.
retry_0(_) ->
doc("Ensure Gun gives up immediately with retry=0."),