diff options
-rw-r--r-- | doc/src/manual/gun.asciidoc | 7 | ||||
-rw-r--r-- | src/gun.erl | 48 | ||||
-rw-r--r-- | src/gun_data_h.erl | 2 | ||||
-rw-r--r-- | src/gun_http.erl | 54 | ||||
-rw-r--r-- | src/gun_http2.erl | 64 | ||||
-rw-r--r-- | src/gun_http3.erl | 14 | ||||
-rw-r--r-- | src/gun_raw.erl | 2 | ||||
-rw-r--r-- | src/gun_socks.erl | 4 | ||||
-rw-r--r-- | src/gun_sse_h.erl | 6 | ||||
-rw-r--r-- | src/gun_tunnel.erl | 6 | ||||
-rw-r--r-- | src/gun_ws.erl | 8 | ||||
-rw-r--r-- | src/gun_ws_h.erl | 6 | ||||
-rw-r--r-- | test/gun_SUITE.erl | 47 |
13 files changed, 160 insertions, 108 deletions
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc index 60f217a..ece9d2a 100644 --- a/doc/src/manual/gun.asciidoc +++ b/doc/src/manual/gun.asciidoc @@ -442,7 +442,8 @@ Request headers. ---- req_opts() :: #{ flow => pos_integer(), - reply_to => pid() + reply_to => pid() | {module(), atom(), list()} + | fun((_) -> _) | {fun(), list()} } ---- @@ -457,7 +458,8 @@ flow control is disabled. reply_to (`self()`):: -The pid of the process that will receive the response messages. +The pid of the process that will receive the response messages, +or a function that will be called with the message as argument. === socks_opts() @@ -598,6 +600,7 @@ By default no user option is defined. == Changelog +* *2.2*: The `reply_to` option now accepts functions. * *2.1*: The HTTP/2 option list was updated with new options. * *2.0*: The `default_protocol` and `user_opts` Websocket options were added. diff --git a/src/gun.erl b/src/gun.erl index 5aa42d0..3fd5529 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -112,6 +112,7 @@ -export([connected_ws_only/3]). -export([closing/3]). -export([terminate/3]). +-export([reply/2]). -type req_headers() :: [{binary() | string() | atom(), iodata()}] | #{binary() | string() | atom() => iodata()}. @@ -198,9 +199,15 @@ }. -export_type([raw_opts/0]). +-type reply_to() :: pid() + | fun((_) -> _) + | {fun(), list()} + | {module(), atom(), list()}. +-export_type([reply_to/0]). + -type req_opts() :: #{ flow => pos_integer(), - reply_to => pid(), + reply_to => reply_to(), tunnel => stream_ref() }. -export_type([req_opts/0]). @@ -1221,7 +1228,7 @@ tls_handshake(internal, {tls_handshake, HandshakeEvent, Protocols, ReplyTo}, NewProtocolName -> {NewProtocolName, #{tunnel_transport => tls}} end, Protocol = gun_protocols:handler(NewProtocol), - ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()}, + reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Protocol:name()}), commands([ {switch_transport, gun_tls, TLSSocket}, {switch_protocol, NewProtocol, ReplyTo} @@ -1256,7 +1263,7 @@ tls_handshake(info, {gun_tls_proxy, Socket, {ok, Negotiated}, {HandshakeEvent, P NewProtocolName -> {NewProtocolName, #{tunnel_transport => tls}} end, Protocol = gun_protocols:handler(NewProtocol), - ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()}, + reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Protocol:name()}), EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ socket => Socket, protocol => Protocol:name() @@ -1318,7 +1325,7 @@ connected_protocol_init(internal, {connected, Retries, Socket, NewProtocol}, {next_event, internal, {retries, Retries, Reason}}}; {ok, StateName, ProtoState} -> %% @todo Don't send gun_up and gun_down if active/1 fails here. - Owner ! {gun_up, self(), Protocol:name()}, + reply(Owner, {gun_up, self(), Protocol:name()}), State1 = State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}, case active(State1) of {ok, State2} -> @@ -1340,9 +1347,9 @@ connected_data_only(cast, Msg, _) element(1, Msg) =:= connect; element(1, Msg) =:= ws_upgrade; element(1, Msg) =:= ws_send -> ReplyTo = element(2, Msg), - ReplyTo ! {gun_error, self(), {badstate, + reply(ReplyTo, {gun_error, self(), {badstate, "This connection does not accept new requests to be opened " - "nor does it accept Websocket frames."}}, + "nor does it accept Websocket frames."}}), keep_state_and_data; connected_data_only(Type, Event, State) -> handle_common_connected(Type, Event, ?FUNCTION_NAME, State). @@ -1358,8 +1365,8 @@ connected_ws_only(cast, Msg, _) when element(1, Msg) =:= headers; element(1, Msg) =:= request; element(1, Msg) =:= data; element(1, Msg) =:= connect; element(1, Msg) =:= ws_upgrade -> ReplyTo = element(2, Msg), - ReplyTo ! {gun_error, self(), {badstate, - "This connection only accepts Websocket frames."}}, + reply(ReplyTo, {gun_error, self(), {badstate, + "This connection only accepts Websocket frames."}}), keep_state_and_data; connected_ws_only(Type, Event, State) -> handle_common_connected_no_input(Type, Event, ?FUNCTION_NAME, State). @@ -1462,23 +1469,23 @@ closing(state_timeout, closing_timeout, State=#state{status=Status}) -> %% When reconnect is disabled, fail HTTP/Websocket operations immediately. closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow}, State=#state{opts=#{retry := 0}}) -> - ReplyTo ! {gun_error, self(), StreamRef, closing}, + reply(ReplyTo, {gun_error, self(), StreamRef, closing}), {keep_state, State}; closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow}, State=#state{opts=#{retry := 0}}) -> - ReplyTo ! {gun_error, self(), StreamRef, closing}, + reply(ReplyTo, {gun_error, self(), StreamRef, closing}), {keep_state, State}; closing(cast, {connect, ReplyTo, StreamRef, _Destination, _Headers, _InitialFlow}, State=#state{opts=#{retry := 0}}) -> - ReplyTo ! {gun_error, self(), StreamRef, closing}, + reply(ReplyTo, {gun_error, self(), StreamRef, closing}), {keep_state, State}; closing(cast, {ws_upgrade, ReplyTo, StreamRef, _Path, _Headers}, State=#state{opts=#{retry := 0}}) -> - ReplyTo ! {gun_error, self(), StreamRef, closing}, + reply(ReplyTo, {gun_error, self(), StreamRef, closing}), {keep_state, State}; closing(cast, {ws_upgrade, ReplyTo, StreamRef, _Path, _Headers, _WsOpts}, State=#state{opts=#{retry := 0}}) -> - ReplyTo ! {gun_error, self(), StreamRef, closing}, + reply(ReplyTo, {gun_error, self(), StreamRef, closing}), {keep_state, State}; closing(Type, Event, State) -> handle_common_connected(Type, Event, ?FUNCTION_NAME, State). @@ -1697,8 +1704,8 @@ handle_common(cast, {set_owner, CurrentOwner, NewOwner}, _, {keep_state, State#state{owner=NewOwner, status={up, NewOwnerRef}}}; %% We cannot change the owner when we are shutting down. handle_common(cast, {set_owner, CurrentOwner, _}, _, #state{owner=CurrentOwner}) -> - CurrentOwner ! {gun_error, self(), {badstate, - "The owner of the connection cannot be changed when the connection is shutting down."}}, + reply(CurrentOwner, {gun_error, self(), {badstate, + "The owner of the connection cannot be changed when the connection is shutting down."}}), keep_state_and_state; handle_common(cast, shutdown, StateName, State=#state{ status=Status, socket=Socket, transport=Transport, protocol=Protocol}) -> @@ -1851,7 +1858,7 @@ disconnect(State0=#state{owner=Owner, status=Status, opts=Opts, %% We closed the socket, discard any remaining socket events. disconnect_flush(State1), KilledStreams = Protocol:down(ProtoState), - Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams}, + reply(Owner, {gun_down, self(), Protocol:name(), Reason, KilledStreams}), Retry = maps:get(retry, Opts, 5), State2 = keepalive_cancel(State1#state{ socket=undefined, protocol=undefined, protocol_state=undefined}), @@ -1928,3 +1935,12 @@ terminate(Reason, StateName, #state{event_handler=EvHandler, reason => Reason }, EvHandler:terminate(TerminateEvent, EvHandlerState). + +reply(Pid, Reply) when is_pid(Pid) -> + Pid ! Reply; +reply({M, F, A}, Reply) when is_atom(M), is_atom(F), is_list(A) -> + apply(M, F, [Reply|A]); +reply(Fun, Reply) when is_function(Fun, 1) -> + Fun(Reply); +reply({Fun, A}, Reply) when is_list(A), is_function(Fun, length(A) + 1) -> + apply(Fun, [Reply|A]). diff --git a/src/gun_data_h.erl b/src/gun_data_h.erl index 17019d2..930265e 100644 --- a/src/gun_data_h.erl +++ b/src/gun_data_h.erl @@ -29,5 +29,5 @@ init(ReplyTo, StreamRef, _, _, _) -> -spec handle(fin | nofin, binary(), State) -> {done, 1, State} when State::#state{}. handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) -> - ReplyTo ! {gun_data, self(), StreamRef, IsFin, Data}, + gun:reply(ReplyTo, {gun_data, self(), StreamRef, IsFin, Data}), {done, 1, State}. diff --git a/src/gun_http.erl b/src/gun_http.erl index 14edfeb..2dc2d67 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -45,7 +45,7 @@ -record(websocket, { ref :: gun:stream_ref(), - reply_to :: pid(), + reply_to :: gun:reply_to(), key :: binary(), extensions :: [binary()], opts :: gun:ws_opts() @@ -53,7 +53,7 @@ -record(stream, { ref :: gun:stream_ref() | connect_info() | #websocket{}, - reply_to :: pid(), + reply_to :: gun:reply_to(), flow :: integer() | infinity, method :: binary(), @@ -238,7 +238,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? RealStreamRef = stream_ref(State, StreamRef), - ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, + gun:reply(ReplyTo, {gun_trailers, self(), RealStreamRef, Trailers}), ResponseEvent = #{ stream_ref => RealStreamRef, reply_to => ReplyTo @@ -319,7 +319,7 @@ handle_connect(Rest, State=#http_state{ %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. _ = case Stream of #stream{is_alive=false} -> ok; - _ -> ReplyTo ! {gun_response, self(), RealStreamRef, fin, Status, Headers} + _ -> gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, fin, Status, Headers}) end, %% @todo Figure out whether the event should trigger if the stream was cancelled. EvHandlerState1 = EvHandler:response_headers(#{ @@ -355,7 +355,7 @@ handle_connect(Rest, State=#http_state{ [NewProtocol0] = maps:get(protocols, Destination, [http]), NewProtocol = gun_protocols:add_stream_ref(NewProtocol0, RealStreamRef), Protocol = gun_protocols:handler(NewProtocol), - ReplyTo ! {gun_tunnel_up, self(), RealStreamRef, Protocol:name()}, + gun:reply(ReplyTo, {gun_tunnel_up, self(), RealStreamRef, Protocol:name()}), {[ {origin, <<"http">>, NewHost, NewPort, connect}, {switch_protocol, NewProtocol, ReplyTo} @@ -382,17 +382,17 @@ handle_inform(Rest, State=#http_state{ %% @todo We shouldn't ignore Rest. {_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers), Upgrade = cow_http_hd:parse_upgrade(Upgrade0), - ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers}, + gun:reply(ReplyTo, {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers}), %% @todo We probably need to add_stream_ref? {{switch_protocol, raw, ReplyTo}, CookieStore, EvHandlerState0} catch _:_ -> %% When the Upgrade header is missing or invalid we treat %% the response as any other informational response. - ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, + gun:reply(ReplyTo, {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}), handle(Rest, State, CookieStore, EvHandler, EvHandlerState) end; _ -> - ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, + gun:reply(ReplyTo, {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}), handle(Rest, State, CookieStore, EvHandler, EvHandlerState) end. @@ -407,7 +407,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec false -> {undefined, EvHandlerState0}; true -> - ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, + gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, IsFin, Status, Headers}), EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, @@ -546,7 +546,7 @@ close_streams(_, [], _) -> close_streams(State, [#stream{is_alive=false}|Tail], Reason) -> close_streams(State, Tail, Reason); close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), Reason}), close_streams(State, Tail, Reason). %% We don't send a keep-alive when a CONNECT request was initiated. @@ -563,8 +563,8 @@ keepalive(_State, _, EvHandlerState) -> headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerState) when is_list(StreamRef) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), - {badstate, "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), + {badstate, "The stream is not a tunnel."}}), {[], CookieStore, EvHandlerState}; headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, @@ -584,8 +584,8 @@ headers(State=#http_state{opts=Opts, out=head}, request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState) when is_list(StreamRef) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), - {badstate, "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), + {badstate, "The stream is not a tunnel."}}), {[], CookieStore, EvHandlerState}; request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, @@ -762,13 +762,13 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) when is_list(StreamRef) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), - {badstate, "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), + {badstate, "The stream is not a tunnel."}}), {[], EvHandlerState}; connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) when Streams =/= [] -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}), {[], EvHandlerState}; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0, @@ -863,13 +863,13 @@ down(#http_state{streams=Streams}) -> end || #stream{ref=Ref} <- Streams]. error_stream_closed(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream has already been closed."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream has already been closed."}}), ok. error_stream_not_found(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream cannot be found."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream cannot be found."}}), ok. %% Headers information retrieval. @@ -959,13 +959,13 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState) when is_list(StreamRef) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), - {badstate, "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), + {badstate, "The stream is not a tunnel."}}), {[], CookieStore, EvHandlerState}; ws_upgrade(State=#http_state{version='HTTP/1.0'}, StreamRef, ReplyTo, _, _, _, _, _, CookieStore, _, EvHandlerState) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "Websocket cannot be used over an HTTP/1.0 connection."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "Websocket cannot be used over an HTTP/1.0 connection."}}), {[], CookieStore, EvHandlerState}; ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, Host, Port, Path, Headers0, WsOpts, CookieStore0, EvHandler, EvHandlerState0) -> @@ -1047,7 +1047,7 @@ ws_handshake_end(Buffer, end, %% Inform the user that the upgrade was successful and switch the protocol. RealStreamRef = stream_ref(State, StreamRef), - ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}, + gun:reply(ReplyTo, {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}), {switch_protocol, {ws, #{ stream_ref => RealStreamRef, headers => Headers, diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 9b6353d..0774e46 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -66,7 +66,7 @@ ref :: reference(), %% Process to send messages to. - reply_to :: pid(), + reply_to :: gun:reply_to(), %% Flow control. flow :: integer() | infinity, @@ -83,7 +83,7 @@ }). -record(http2_state, { - reply_to :: pid(), + reply_to :: gun:reply_to(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: gun:http2_opts(), @@ -359,8 +359,8 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, %% We notify remote settings changes only if the user requested it. _ = case Opts of #{notify_settings_changed := true} -> - ReplyTo ! {gun_notify, self(), settings_changed, - cow_http2_machine:get_remote_settings(HTTP2Machine)}; + gun:reply(ReplyTo, {gun_notify, self(), settings_changed, + cow_http2_machine:get_remote_settings(HTTP2Machine)}); _ -> ok end, @@ -468,8 +468,8 @@ tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel}, State, EvHandler, EvHandlerState); tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, State, _EvHandler, EvHandlerState) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), - {stream_error, Reason, 'Tunnel closed unexpectedly.'}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), + {stream_error, Reason, 'Tunnel closed unexpectedly.'}}), {{state, delete_stream(State, StreamID)}, EvHandlerState}; %% @todo Set a timeout for closing the Websocket stream. tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) -> @@ -512,7 +512,7 @@ headers_frame(State0=#http2_state{opts=Opts}, headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Status, Headers, EvHandler, EvHandlerState0) -> RealStreamRef = stream_ref(State, StreamRef), - ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, + gun:reply(ReplyTo, {gun_inform, self(), RealStreamRef, Status, Headers}), EvHandlerState = EvHandler:response_inform(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, @@ -566,7 +566,7 @@ headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_ origin_host => DestHost, origin_port => DestPort }, - ReplyTo ! {gun_response, self(), RealStreamRef, fin, Status, Headers}, + gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, fin, Status, Headers}), EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, @@ -656,7 +656,7 @@ headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=Re stream_ref => RealStreamRef, handle_continue_stream_ref => ContinueStreamRef }, - ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}, + gun:reply(ReplyTo, {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}), Proto = gun_ws, EvHandlerState = EvHandler:protocol_changed(#{ stream_ref => RealStreamRef, @@ -681,7 +681,7 @@ headers_frame_response(State=#http2_state{content_handlers=Handlers0}, Stream=#stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, IsFin, Status, Headers, EvHandler, EvHandlerState0) -> RealStreamRef = stream_ref(State, StreamRef), - ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, + gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, IsFin, Status, Headers}), EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, @@ -708,7 +708,7 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? RealStreamRef = stream_ref(State, StreamRef), - ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, + gun:reply(ReplyTo, {gun_trailers, self(), RealStreamRef, Trailers}), ResponseEvent = #{ stream_ref => RealStreamRef, reply_to => ReplyTo @@ -720,8 +720,8 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> case take_stream(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), stream_ref(State0, StreamRef), - {stream_error, Reason, 'Stream reset by server.'}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State0, StreamRef), + {stream_error, Reason, 'Stream reset by server.'}}), EvHandlerState = EvHandler:cancel(#{ stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, @@ -753,8 +753,8 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, }, PushPromiseEvent = case Status of connected -> - ReplyTo ! {gun_push, self(), stream_ref(State, StreamRef), - RealPromisedStreamRef, Method, URI, Headers}, + gun:reply(ReplyTo, {gun_push, self(), stream_ref(State, StreamRef), + RealPromisedStreamRef, Method, URI, Headers}), PushPromiseEvent0#{promised_stream_ref => RealPromisedStreamRef}; _ -> PushPromiseEvent0 @@ -931,7 +931,7 @@ close_reason(Reason) -> {closed, Reason}. %% @todo Do we want an event for this? close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), Reason}), ok. keepalive(State=#http2_state{pings_unack=PingsUnack, opts=Opts}, _, EvHandlerState) @@ -996,8 +996,8 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, State, EvHandler, EvHandlerState1), {ResCommands, CookieStore, EvHandlerState}; #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}), {[], CookieStore0, EvHandlerState0}; error -> error_stream_not_found(State, StreamRef, ReplyTo), @@ -1068,8 +1068,8 @@ request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, Stream, State, EvHandler, EvHandlerState1), {ResCommands, CookieStore, EvHandlerState}; #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}), {[], CookieStore0, EvHandlerState0}; error -> error_stream_not_found(State, StreamRef, ReplyTo), @@ -1149,8 +1149,8 @@ data(State, RealStreamRef=[StreamRef|_], ReplyTo, IsFin, Data, EvHandler, EvHand ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1); #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}), {[], EvHandlerState0}; error -> error_stream_not_found(State, StreamRef, ReplyTo), @@ -1240,7 +1240,7 @@ reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, ok -> case take_stream(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), StreamError}), {state, State}; error -> {state, State0} @@ -1318,8 +1318,8 @@ connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, He EvHandler, EvHandlerState0), tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1); #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}), {[], EvHandlerState0}; error -> error_stream_not_found(State, StreamRef, ReplyTo), @@ -1357,8 +1357,8 @@ cancel(State, RealStreamRef=[StreamRef|_], ReplyTo, EvHandler, EvHandlerState0) RealStreamRef, ReplyTo, EvHandler, EvHandlerState0), tunnel_commands(Commands, Stream, State, EvHandler, EvHandlerState1); #stream{tunnel=undefined} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream is not a tunnel."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream is not a tunnel."}}), {[], EvHandlerState0}; error -> error_stream_not_found(State, StreamRef, ReplyTo), @@ -1536,7 +1536,7 @@ connection_error(#http2_state{socket=Socket, transport=Transport, Pids = lists:usort(maps:fold( fun(_, #stream{reply_to=ReplyTo}, Acc) -> [ReplyTo|Acc] end, [], Streams)), - _ = [Pid ! {gun_error, self(), {Reason, HumanReadable}} || Pid <- Pids], + _ = [gun:reply(Pid, {gun_error, self(), {Reason, HumanReadable}}) || Pid <- Pids], Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), Reason, <<>>)), @@ -1545,13 +1545,13 @@ connection_error(#http2_state{socket=Socket, transport=Transport, %% Stream functions. error_stream_closed(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream has already been closed."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream has already been closed."}}), ok. error_stream_not_found(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, - "The stream cannot be found."}}, + gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate, + "The stream cannot be found."}}), ok. %% Streams. diff --git a/src/gun_http3.erl b/src/gun_http3.erl index 9994186..92b2118 100644 --- a/src/gun_http3.erl +++ b/src/gun_http3.erl @@ -49,7 +49,7 @@ ref :: reference(), %% Process to send messages to. - reply_to :: undefined | pid(), + reply_to :: undefined | gun:reply_to(), %% Whether the stream is currently in a special state. status :: header | {unidi, control | encoder | decoder} @@ -67,7 +67,7 @@ }). -record(http3_state, { - reply_to :: pid(), + reply_to :: gun:reply_to(), conn :: gun_quicer:quicer_connection_handle(), transport :: module(), opts = #{} :: gun:http2_opts(), @@ -382,7 +382,7 @@ headers_frame(State0=#http3_state{opts=Opts}, Stream, IsFin, Headers, headers_frame_inform(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Status, Headers, EvHandler, EvHandlerState0) -> RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), - ReplyTo ! {gun_inform, self(), RealStreamRef, Status, Headers}, + gun:reply(ReplyTo, {gun_inform, self(), RealStreamRef, Status, Headers}), EvHandlerState = EvHandler:response_inform(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, @@ -395,7 +395,7 @@ headers_frame_response(State=#http3_state{content_handlers=Handlers0}, Stream=#stream{ref=StreamRef, reply_to=ReplyTo}, IsFin, Status, Headers, EvHandler, EvHandlerState0) -> RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), - ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, + gun:reply(ReplyTo, {gun_response, self(), RealStreamRef, IsFin, Status, Headers}), EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => RealStreamRef, reply_to => ReplyTo, @@ -426,7 +426,7 @@ trailers_frame(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Trailers, EvHandler, EvHandlerState0) -> %% @todo We probably want to pass this to gun_content_handler? RealStreamRef = StreamRef, %% @todo stream_ref(State, StreamRef), - ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, + gun:reply(ReplyTo, {gun_trailers, self(), RealStreamRef, Trailers}), ResponseEvent = #{ stream_ref => RealStreamRef, reply_to => ReplyTo @@ -719,8 +719,8 @@ stream_update(State=#http3_state{streams=Streams}, stream_aborted(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> case stream_take(State0, StreamID) of {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), StreamRef, %% @todo stream_ref(State0, StreamRef), - {stream_error, Reason, 'Stream reset by server.'}}, + gun:reply(ReplyTo, {gun_error, self(), StreamRef, %% @todo stream_ref(State0, StreamRef), + {stream_error, Reason, 'Stream reset by server.'}}), EvHandlerState = EvHandler:cancel(#{ stream_ref => StreamRef, %% @todo stream_ref(State, StreamRef), reply_to => ReplyTo, diff --git a/src/gun_raw.erl b/src/gun_raw.erl index 50786e3..a3f1ef5 100644 --- a/src/gun_raw.erl +++ b/src/gun_raw.erl @@ -57,7 +57,7 @@ init(ReplyTo, Socket, Transport, Opts) -> handle(Data, State=#raw_state{ref=StreamRef, reply_to=ReplyTo, flow=Flow0}, CookieStore, _, EvHandlerState) -> %% When we take over the entire connection there is no stream reference. - ReplyTo ! {gun_data, self(), StreamRef, nofin, Data}, + gun:reply(ReplyTo, {gun_data, self(), StreamRef, nofin, Data}), Flow = case Flow0 of infinity -> infinity; _ -> Flow0 - 1 diff --git a/src/gun_socks.erl b/src/gun_socks.erl index 1b868a2..90b7042 100644 --- a/src/gun_socks.erl +++ b/src/gun_socks.erl @@ -27,7 +27,7 @@ -record(socks_state, { ref :: undefined | gun:stream_ref(), - reply_to :: pid(), + reply_to :: gun:reply_to(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: gun:socks_opts(), @@ -167,7 +167,7 @@ handle(<<5, 0, 0, Rest0/bits>>, #socks_state{ref=StreamRef, reply_to=ReplyTo, op [NewProtocol0] = maps:get(protocols, Opts, [http]), NewProtocol = gun_protocols:add_stream_ref(NewProtocol0, StreamRef), Protocol = gun_protocols:handler(NewProtocol), - ReplyTo ! {gun_tunnel_up, self(), StreamRef, Protocol:name()}, + gun:reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Protocol:name()}), [{origin, <<"http">>, NewHost, NewPort, socks5}, {switch_protocol, NewProtocol, ReplyTo}] end; diff --git a/src/gun_sse_h.erl b/src/gun_sse_h.erl index 03d190b..3efa832 100644 --- a/src/gun_sse_h.erl +++ b/src/gun_sse_h.erl @@ -19,7 +19,7 @@ -export([handle/3]). -record(state, { - reply_to :: pid(), + reply_to :: gun:reply_to(), stream_ref :: reference(), sse_state :: cow_sse:state() }). @@ -49,12 +49,12 @@ handle(IsFin, Data, State) -> handle(IsFin, Data, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, sse_state=SSE0}, Flow) -> case cow_sse:parse(Data, SSE0) of {event, Event, SSE} -> - ReplyTo ! {gun_sse, self(), StreamRef, Event}, + gun:reply(ReplyTo, {gun_sse, self(), StreamRef, Event}), handle(IsFin, <<>>, State#state{sse_state=SSE}, Flow + 1); {more, SSE} -> Inc = case IsFin of fin -> - ReplyTo ! {gun_sse, self(), StreamRef, fin}, + gun:reply(ReplyTo, {gun_sse, self(), StreamRef, fin}), 1; _ -> 0 diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index 789d1e3..559e8f7 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -41,7 +41,7 @@ %% We accept 'undefined' only to simplify the init code. socket = undefined :: #{ gun_pid := pid(), - reply_to := pid(), + reply_to := gun:reply_to(), stream_ref := gun:stream_ref(), handle_continue_stream_ref := gun:stream_ref() } | pid() | undefined, @@ -125,7 +125,7 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun _ = case TunnelProtocol of http -> ok; socks -> ok; - _ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()} + _ -> gun:reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Proto:name()}) end, {tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport, protocol=Proto, protocol_state=ProtoState}, @@ -202,7 +202,7 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, case Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}) of {ok, _, ProtoState} -> - ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}, + gun:reply(ReplyTo, {gun_tunnel_up, self(), StreamRef, Proto:name()}), {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, CookieStore, EvHandlerState}; Error={error, _} -> diff --git a/src/gun_ws.erl b/src/gun_ws.erl index c59686e..dfe1139 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -42,7 +42,7 @@ }). -record(ws_state, { - reply_to :: pid(), + reply_to :: gun:reply_to(), stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), @@ -84,6 +84,12 @@ do_check_options([Opt={protocols, L}|Opts]) when is_list(L) -> end; do_check_options([{reply_to, P}|Opts]) when is_pid(P) -> do_check_options(Opts); +do_check_options([{reply_to, F}|Opts]) when is_function(F, 1) -> + do_check_options(Opts); +do_check_options([{reply_to, {F, A}}|Opts]) when is_function(F, 1 + length(A)) -> + do_check_options(Opts); +do_check_options([{reply_to, {M, F, A}}|Opts]) when is_atom(M), is_atom(F), is_list(A) -> + do_check_options(Opts); do_check_options([{silence_pings, B}|Opts]) when is_boolean(B) -> do_check_options(Opts); do_check_options([{user_opts, _}|Opts]) -> diff --git a/src/gun_ws_h.erl b/src/gun_ws_h.erl index a7dc9fc..7549906 100644 --- a/src/gun_ws_h.erl +++ b/src/gun_ws_h.erl @@ -19,7 +19,7 @@ -export([handle/2]). -record(state, { - reply_to :: pid(), + reply_to :: gun:reply_to(), stream_ref :: reference(), frag_buffer = <<>> :: binary(), silence_pings :: boolean() @@ -34,11 +34,11 @@ handle({fragment, nofin, _, Payload}, {ok, 0, State#state{frag_buffer= << SoFar/binary, Payload/binary >>}}; handle({fragment, fin, Type, Payload}, State=#state{reply_to=ReplyTo, stream_ref=StreamRef, frag_buffer=SoFar}) -> - ReplyTo ! {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}}, + gun:reply(ReplyTo, {gun_ws, self(), StreamRef, {Type, << SoFar/binary, Payload/binary >>}}), {ok, 1, State#state{frag_buffer= <<>>}}; handle(Frame, State=#state{silence_pings=true}) when Frame =:= ping; Frame =:= pong; element(1, Frame) =:= ping; element(1, Frame) =:= pong -> {ok, 0, State}; handle(Frame, State=#state{reply_to=ReplyTo, stream_ref=StreamRef}) -> - ReplyTo ! {gun_ws, self(), StreamRef, Frame}, + gun:reply(ReplyTo, {gun_ws, self(), StreamRef, Frame}), {ok, 1, State}. diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl index 230bd99..98799c9 100644 --- a/test/gun_SUITE.erl +++ b/test/gun_SUITE.erl @@ -264,13 +264,25 @@ postpone_request_while_not_connected(_) -> reply_to_http(_) -> doc("The reply_to option allows using a separate process for requests."), - do_reply_to(http). + do_reply_to(http, pid). reply_to_http2(_) -> doc("The reply_to option allows using a separate process for requests."), - do_reply_to(http2). + do_reply_to(http2, pid). -do_reply_to(Protocol) -> +reply_to_http_f(_) -> + doc("The reply_to option allows using a fun for requests."), + do_reply_to(http, f). + +reply_to_http_fa(_) -> + doc("The reply_to option allows using a fun/args tuple for requests."), + do_reply_to(http, fa). + +reply_to_http_mfa(_) -> + doc("The reply_to option allows using an MFA tuple for requests."), + do_reply_to(http, mfa). + +do_reply_to(Protocol, ReplyToType) -> {ok, OriginPid, OriginPort} = init_origin(tcp, Protocol, fun(_, _, ClientSocket, ClientTransport) -> {ok, _} = ClientTransport:recv(ClientSocket, 0, infinity), @@ -302,23 +314,38 @@ do_reply_to(Protocol) -> ok = ClientTransport:send(ClientSocket, ResponseData), timer:sleep(1000) end), - {ok, Pid} = gun:open("localhost", OriginPort, #{protocols => [Protocol]}), - {ok, Protocol} = gun:await_up(Pid), + {ok, GunPid} = gun:open("localhost", OriginPort, #{protocols => [Protocol]}), + {ok, Protocol} = gun:await_up(GunPid), handshake_completed = receive_from(OriginPid), + do_reply_to_req(GunPid, ReplyToType), + gun:close(GunPid). + +do_reply_to_req(GunPid, pid) -> Self = self(), ReplyTo = spawn(fun() -> receive Ref when is_reference(Ref) -> - Response = gun:await(Pid, Ref, infinity), + Response = gun:await(GunPid, Ref, infinity), Self ! Response end end), - Ref = gun:get(Pid, "/", [], #{reply_to => ReplyTo}), + Ref = gun:get(GunPid, "/", [], #{reply_to => ReplyTo}), ReplyTo ! Ref, receive Msg -> - {response, _, _, _} = Msg, - gun:close(Pid) - end. + {response, _, _, _} = Msg + end; +do_reply_to_req(GunPid, ReplyToType) -> + ReplyTo = case ReplyToType of + f -> Self = self(), fun(Reply) -> Self ! Reply end; + fa -> {fun do_reply_to_fun/2, [self()]}; + mfa -> {?MODULE, do_reply_to_fun, [self()]} + end, + Ref = gun:get(GunPid, "/", [], #{reply_to => ReplyTo}), + {response, _, _, _} = gun:await(GunPid, Ref, infinity), + ok. + +do_reply_to_fun(Reply, DstPid) -> + DstPid ! Reply. retry_0(_) -> doc("Ensure Gun gives up immediately with retry=0."), |