diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.erl | 100 | ||||
-rw-r--r-- | src/gun_http.erl | 112 | ||||
-rw-r--r-- | src/gun_http2.erl | 2 | ||||
-rw-r--r-- | src/gun_tls.erl | 6 | ||||
-rw-r--r-- | src/gun_ws.erl | 2 |
5 files changed, 188 insertions, 34 deletions
diff --git a/src/gun.erl b/src/gun.erl index 115f603..af576fb 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -55,6 +55,11 @@ %% Streaming data. -export([data/4]). +%% Tunneling. +-export([connect/2]). +-export([connect/3]). +-export([connect/4]). + %% Awaiting gun messages. -export([await/2]). -export([await/3]). @@ -110,6 +115,22 @@ -export_type([opts/0]). %% @todo Add an option to disable/enable the notowner behavior. +-type connect_destination() :: #{ + host := inet:hostname() | inet:ip_address(), + port := inet:port_number(), + username => iodata(), + password => iodata(), + protocol => http | http2, + transport => tcp | tls, + tls_opts => [ssl:connect_option()], + tls_handshake_timeout => timeout() +}. +-export_type([connect_destination/0]). + +%% @todo When/if HTTP/2 CONNECT gets implemented, we will want an option here +%% to indicate that the request must be sent on an existing CONNECT stream. +%% This is of course not required for HTTP/1.1 since the CONNECT takes over +%% the entire connection. -type req_opts() :: #{ reply_to => pid() }. @@ -137,8 +158,10 @@ parent :: pid(), owner :: pid(), owner_ref :: reference(), - host :: inet:hostname(), + host :: inet:hostname() | inet:ip_address(), port :: inet:port_number(), + origin_host :: inet:hostname() | inet:ip_address(), + origin_port :: inet:port_number(), opts :: opts(), keepalive_ref :: undefined | reference(), socket :: undefined | inet:socket() | ssl:sslsocket(), @@ -358,6 +381,7 @@ request(ServerPid, Method, Path, Headers) -> request(ServerPid, Method, Path, Headers, Body) -> request(ServerPid, Method, Path, Headers, Body, #{}). +%% @todo Accept header names as maps. -spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference(). request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef = make_ref(), @@ -372,6 +396,23 @@ data(ServerPid, StreamRef, IsFin, Data) -> _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, ok. +%% Tunneling. + +-spec connect(pid(), connect_destination()) -> reference(). +connect(ServerPid, Destination) -> + connect(ServerPid, Destination, [], #{}). + +-spec connect(pid(), connect_destination(), headers()) -> reference(). +connect(ServerPid, Destination, Headers) -> + connect(ServerPid, Destination, Headers, #{}). + +-spec connect(pid(), connect_destination(), headers(), req_opts()) -> reference(). +connect(ServerPid, Destination, Headers, ReqOpts) -> + StreamRef = make_ref(), + ReplyTo = maps:get(reply_to, ReqOpts, self()), + _ = ServerPid ! {connect, ReplyTo, StreamRef, Destination, Headers}, + StreamRef. + %% Awaiting gun messages. %% @todo spec await await_body @@ -565,6 +606,8 @@ ws_upgrade(ServerPid, Path, Headers, Opts) -> _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts}, StreamRef. +%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef. +%% But it can be kept for the time being since it can still work for HTTP/1.1. -spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok. ws_send(ServerPid, Frames) -> _ = ServerPid ! {ws_send, self(), Frames}, @@ -601,13 +644,14 @@ init(Parent, Owner, Host, Port, Opts) -> tls -> gun_tls end, OwnerRef = monitor(process, Owner), - connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, - host=Host, port=Port, opts=Opts, transport=Transport}, Retry). + transport_connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, + host=Host, port=Port, origin_host=Host, origin_port=Port, + opts=Opts, transport=Transport}, Retry). default_transport(443) -> tls; default_transport(_) -> tcp. -connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) -> +transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) -> Protocols = [case P of http -> <<"http/1.1">>; http2 -> <<"h2">> @@ -626,7 +670,7 @@ connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tl {error, Reason} -> retry(State#state{last_error=Reason}, Retries) end; -connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> +transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> TransportOpts = [binary, {active, false} |maps:get(transport_opts, Opts, [])], case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of @@ -670,7 +714,7 @@ retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) -> _ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry), receive retry -> - connect(State, Retries); + transport_connect(State, Retries); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {retry_loop, State, Retries}) @@ -690,20 +734,18 @@ before_loop(State=#state{opts=Opts, protocol=Protocol}) -> end, loop(State#state{keepalive_ref=KeepaliveRef}). -loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, port=Port, opts=Opts, - socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> +loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, + origin_host=Host, origin_port=Port, opts=Opts, socket=Socket, + transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> {OK, Closed, Error} = Transport:messages(), Transport:setopts(Socket, [{active, once}]), receive {OK, Socket, Data} -> case Protocol:handle(Data, ProtoState) of - close -> - Transport:close(Socket), - down(State, normal); - {upgrade, Protocol2, ProtoState2} -> - ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2}); - ProtoState2 -> - loop(State#state{protocol_state=ProtoState2}) + Commands when is_list(Commands) -> + commands(Commands, State); + Command -> + commands([Command], State) end; {Closed, Socket} -> Protocol:close(ProtoState), @@ -736,6 +778,9 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por ProtoState2 = Protocol:data(ProtoState, StreamRef, ReplyTo, IsFin, Data), loop(State#state{protocol_state=ProtoState2}); + {connect, ReplyTo, StreamRef, Destination, Headers} -> + ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers), + loop(State#state{protocol_state=ProtoState2}); {cancel, ReplyTo, StreamRef} -> ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo), loop(State#state{protocol_state=ProtoState2}); @@ -786,6 +831,31 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por loop(State) end. +commands([], State) -> + loop(State); +commands([close|_], State=#state{socket=Socket, transport=Transport}) -> + Transport:close(Socket), + down(State, normal); +commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) -> + Transport:close(Socket), + down(State, Error); +commands([{state, ProtoState}|Tail], State) -> + commands(Tail, State#state{protocol_state=ProtoState}); +%% @todo The scheme should probably not be ignored. +commands([{origin, _Scheme, Host, Port}|Tail], State) -> + commands(Tail, State#state{origin_host=Host, origin_port=Port}); +commands([{switch_transport, Transport, Socket}|Tail], State) -> + commands(Tail, State#state{socket=Socket, transport=Transport}); +%% @todo The two loops should be reunified and this clause generalized. +commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) -> + ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState}); +%% @todo And this state should probably not be ignored. +commands([{switch_protocol, Protocol, _ProtoState0}|Tail], + State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) -> + ProtoOpts = maps:get(http2_opts, Opts, #{}), + ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), + commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}). + ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> {OK, Closed, Error} = Transport:messages(), diff --git a/src/gun_http.erl b/src/gun_http.erl index d07502e..ee9d04f 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -23,6 +23,7 @@ -export([request/8]). -export([request/9]). -export([data/5]). +-export([connect/5]). -export([cancel/3]). -export([down/1]). -export([ws_upgrade/7]). @@ -30,10 +31,13 @@ -type io() :: head | {body, non_neg_integer()} | body_close | body_chunked | body_trailer. %% @todo Make that a record. +-type connect_info() :: {connect, reference(), gun:connect_destination()}. + +%% @todo Make that a record. -type websocket_info() :: {websocket, reference(), binary(), [binary()], gun:ws_opts()}. %% key, extensions, options -record(stream, { - ref :: reference() | websocket_info(), + ref :: reference() | connect_info() | websocket_info(), reply_to :: pid(), method :: binary(), is_alive :: boolean(), @@ -87,7 +91,7 @@ init(Owner, Socket, Transport, Opts) -> %% Stop looping when we got no more data. handle(<<>>, State) -> - State; + {state, State}; %% Close when server responds and we don't have any open streams. handle(_, #http_state{streams=[]}) -> close; @@ -95,33 +99,33 @@ handle(_, #http_state{streams=[]}) -> handle(Data, State=#http_state{in=head, buffer=Buffer}) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> State#http_state{buffer=Data2}; + nomatch -> {state, State#http_state{buffer=Data2}}; {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>}) end; %% Everything sent to the socket until it closes is part of the response body. handle(Data, State=#http_state{in=body_close}) -> - send_data_if_alive(Data, State, nofin); + {state, send_data_if_alive(Data, State, nofin)}; %% Chunked transfer-encoding may contain both data and trailers. handle(Data, State=#http_state{in=body_chunked, in_state=InState, buffer=Buffer, connection=Conn}) -> Buffer2 = << Buffer/binary, Data/binary >>, case cow_http_te:stream_chunked(Buffer2, InState) of more -> - State#http_state{buffer=Buffer2}; + {state, State#http_state{buffer=Buffer2}}; {more, Data2, InState2} -> - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin); + nofin)}; {more, Data2, Length, InState2} when is_integer(Length) -> %% @todo See if we can recv faster than one message at a time. - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin); + nofin)}; {more, Data2, Rest, InState2} -> %% @todo See if we can recv faster than one message at a time. - send_data_if_alive(Data2, + {state, send_data_if_alive(Data2, State#http_state{buffer=Rest, in_state=InState2}, - nofin); + nofin)}; {done, HasTrailers, Rest} -> IsFin = case HasTrailers of trailers -> nofin; @@ -156,7 +160,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> State#http_state{buffer=Data2}; + nomatch -> {state, State#http_state{buffer=Data2}}; {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? @@ -174,14 +178,14 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> if %% More data coming. DataSize < Length -> - send_data_if_alive(Data, + {state, send_data_if_alive(Data, State#http_state{in={body, Length - DataSize}}, - nofin); + nofin)}; %% Stream finished, no rest. DataSize =:= Length -> State1 = send_data_if_alive(Data, State, fin), case Conn of - keepalive -> end_stream(State1); + keepalive -> {state, end_stream(State1)}; close -> close end; %% Stream finished, rest. @@ -194,7 +198,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> end end. -handle_head(Data, State=#http_state{version=ClientVersion, +handle_head(Data, State=#http_state{socket=Socket, version=ClientVersion, content_handlers=Handlers0, connection=Conn, streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo, method=Method, is_alive=IsAlive}|Tail]}) -> @@ -203,6 +207,44 @@ handle_head(Data, State=#http_state{version=ClientVersion, case {Status, StreamRef} of {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} -> ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts); + {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 -> + case IsAlive of + false -> + ok; + true -> + ReplyTo ! {gun_response, self(), RealStreamRef, + fin, Status, Headers}, + ok + end, + %% We expect there to be no additional data after the CONNECT response. + <<>> = Rest2, + State2 = end_stream(State#http_state{streams=[Stream|Tail]}), + NewHost = maps:get(host, Destination), + NewPort = maps:get(port, Destination), + DestProtocol = maps:get(protocol, Destination, http), + case Destination of + #{transport := tls} -> + TLSOpts = maps:get(tls_opts, Destination, []), + TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity), + case gun_tls:connect(Socket, TLSOpts, TLSTimeout) of + {ok, TLSSocket} when DestProtocol =:= http2 -> + [{switch_transport, gun_tls, TLSSocket}, + {switch_protocol, gun_http2, State2}, + {origin, <<"https">>, NewHost, NewPort}]; + {ok, TLSSocket} -> + [{state, State2#http_state{socket=TLSSocket, transport=gun_tls}}, + {switch_transport, gun_tls, TLSSocket}, + {origin, <<"https">>, NewHost, NewPort}]; + Error -> + Error + end; + _ when DestProtocol =:= http2 -> + [{switch_protocol, gun_http2, State2}, + {origin, <<"http">>, NewHost, NewPort}]; + _ -> + [{state, State2}, + {origin, <<"http">>, NewHost, NewPort}] + end; {_, _} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, handle(Rest2, State); @@ -211,7 +253,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, IsFin = case In of head -> fin; _ -> nofin end, Handlers = case IsAlive of false -> - ok; + undefined; true -> ReplyTo ! {gun_response, self(), stream_ref(StreamRef), IsFin, Status, Headers}, @@ -243,6 +285,7 @@ handle_head(Data, State=#http_state{version=ClientVersion, end end. +stream_ref({connect, StreamRef, _}) -> StreamRef; stream_ref({websocket, StreamRef, _, _, _}) -> StreamRef; stream_ref(StreamRef) -> StreamRef. @@ -372,6 +415,40 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, error_stream_not_found(State, StreamRef, ReplyTo) end. +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, + "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, + State; +connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, + StreamRef, ReplyTo, Destination=#{host := Host0}, Headers0) -> + Host = case Host0 of + Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); + _ -> Host0 + end, + Port = maps:get(port, Destination, 1080), + Authority = [Host, $:, integer_to_binary(Port)], + Headers1 = lists:keydelete(<<"content-length">>, 1, + lists:keydelete(<<"transfer-encoding">>, 1, Headers0)), + Headers2 = case lists:keymember(<<"host">>, 1, Headers1) of + false -> [{<<"host">>, Authority}|Headers1]; + true -> Headers1 + end, + HasProxyAuthorization = lists:keymember(<<"proxy-authorization">>, 1, Headers2), + Headers3 = case {HasProxyAuthorization, Destination} of + {false, #{username := UserID, password := Password}} -> + [{<<"proxy-authorization">>, [ + <<"Basic ">>, + base64:encode(iolist_to_binary([UserID, $:, Password]))]} + |Headers2]; + _ -> + Headers2 + end, + Headers = transform_header_names(State, Headers3), + Transport:send(Socket, [ + cow_http:request(<<"CONNECT">>, Authority, Version, Headers) + ]), + new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>). + %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State, StreamRef, ReplyTo) -> case is_stream(State, StreamRef) of @@ -384,6 +461,7 @@ cancel(State, StreamRef, ReplyTo) -> %% HTTP does not provide any way to figure out what streams are unprocessed. down(#http_state{streams=Streams}) -> KilledStreams = [case Ref of + {connect, Ref2, _} -> Ref2; {websocket, Ref2, _, _, _} -> Ref2; _ -> Ref end || #stream{ref=Ref} <- Streams], diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 89deea4..edbc7c0 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -124,7 +124,7 @@ parse(Data0, State0=#http2_state{buffer=Buffer}) -> Error = {connection_error, _, _} -> terminate(State0, Error); more -> - State0#http2_state{buffer=Data} + {state, State0#http2_state{buffer=Data}} end. %% DATA frame. diff --git a/src/gun_tls.erl b/src/gun_tls.erl index 6d749aa..f58620f 100644 --- a/src/gun_tls.erl +++ b/src/gun_tls.erl @@ -15,6 +15,7 @@ -module(gun_tls). -export([messages/0]). +-export([connect/3]). -export([connect/4]). -export([send/2]). -export([setopts/2]). @@ -23,6 +24,11 @@ messages() -> {ssl, ssl_closed, ssl_error}. +-spec connect(inet:socket(), any(), timeout()) + -> {ok, ssl:sslsocket()} | {error, atom()}. +connect(Socket, Opts, Timeout) -> + ssl:connect(Socket, Opts, Timeout). + -spec connect(inet:ip_address() | inet:hostname(), inet:port_number(), any(), timeout()) -> {ok, ssl:sslsocket()} | {error, atom()}. diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 5b6962b..b89840e 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -69,7 +69,7 @@ name() -> ws. init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, HandlerState = Handler:init(Owner, StreamRef, Headers, Opts), - {upgrade, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, + {switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, extensions=Extensions, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. |