From 1cc3b32b8ef6c187b8be3601319e21c1ba04fa27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 24 Oct 2022 14:09:59 +0200 Subject: Handle send errors --- src/gun.erl | 30 +++--- src/gun_http.erl | 136 ++++++++++++++++---------- src/gun_http2.erl | 282 +++++++++++++++++++++++++++++++++-------------------- src/gun_raw.erl | 6 +- src/gun_socks.erl | 30 ++++-- src/gun_tunnel.erl | 108 +++++++++++--------- src/gun_ws.erl | 33 ++++--- 7 files changed, 386 insertions(+), 239 deletions(-) (limited to 'src') diff --git a/src/gun.erl b/src/gun.erl index bfb1bc4..4108b93 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -1237,17 +1237,25 @@ connected_ws_only(Type, Event, State) -> connected(internal, {connected, Socket, NewProtocol}, State0=#state{owner=Owner, opts=Opts, transport=Transport}) -> {Protocol, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), - %% @todo Handle error result from Protocol:init/4 - {ok, StateName, ProtoState} = Protocol:init(Owner, Socket, Transport, ProtoOpts), - Owner ! {gun_up, self(), Protocol:name()}, - case active(State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}) of - {ok, State} -> - case Protocol:has_keepalive() of - true -> {next_state, StateName, keepalive_timeout(State)}; - false -> {next_state, StateName, State} - end; - Disconnect -> - Disconnect + case Protocol:init(Owner, Socket, Transport, ProtoOpts) of + Error={error, _} -> + %% @todo Don't send gun_up and gun_down if Protocol:init/4 failes here. + Owner ! {gun_up, self(), Protocol:name()}, + disconnect(State0, Error); + {ok, StateName, ProtoState} -> + %% @todo Don't send gun_up and gun_down if active/1 failes here. + Owner ! {gun_up, self(), Protocol:name()}, + State1 = State0#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}, + case active(State1) of + {ok, State2} -> + State = case Protocol:has_keepalive() of + true -> keepalive_timeout(State2); + false -> State2 + end, + {next_state, StateName, State}; + Disconnect -> + Disconnect + end end; %% Public HTTP interface. %% diff --git a/src/gun_http.erl b/src/gun_http.erl index 4fc07ee..38806d0 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -550,8 +550,10 @@ keepalive(#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState {[], EvHandlerState}; %% We can only keep-alive by sending an empty line in-between streams. keepalive(#http_state{socket=Socket, transport=Transport, out=head}, _, EvHandlerState) -> - Transport:send(Socket, <<"\r\n">>), - {[], EvHandlerState}; + case Transport:send(Socket, <<"\r\n">>) of + ok -> {[], EvHandlerState}; + Error={error, _} -> {Error, EvHandlerState} + end; keepalive(_State, _, EvHandlerState) -> {[], EvHandlerState}. @@ -563,13 +565,18 @@ headers(State, StreamRef, ReplyTo, _, _, _, _, _, _, CookieStore, _, EvHandlerSt headers(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) -> - {Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State, + {SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, undefined, CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME), - InitialFlow = initial_flow(InitialFlow0, Opts), - {{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, - ReplyTo, Method, Authority, Path, InitialFlow)}, - CookieStore, EvHandlerState}. + Command = case SendResult of + ok -> + InitialFlow = initial_flow(InitialFlow0, Opts), + {state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, + ReplyTo, Method, Authority, Path, InitialFlow)}; + Error={error, _} -> + Error + end, + {Command, CookieStore, EvHandlerState}. request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandlerState) when is_list(StreamRef) -> @@ -579,13 +586,18 @@ request(State, StreamRef, ReplyTo, _, _, _, _, _, _, _, CookieStore, _, EvHandle request(State=#http_state{opts=Opts, out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0) -> - {Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State, + {SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME), - InitialFlow = initial_flow(InitialFlow0, Opts), - {{state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, - ReplyTo, Method, Authority, Path, InitialFlow)}, - CookieStore, EvHandlerState}. + Command = case SendResult of + ok -> + InitialFlow = initial_flow(InitialFlow0, Opts), + {state, new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, + ReplyTo, Method, Authority, Path, InitialFlow)}; + Error={error, _} -> + Error + end, + {Command, CookieStore, EvHandlerState}. initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; initial_flow(InitialFlow, _) -> InitialFlow. @@ -632,7 +644,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), - Transport:send(Socket, [ + SendResult = Transport:send(Socket, [ cow_http:request(Method, Path, Version, Headers), [Body || Body =/= undefined]]), EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), @@ -646,7 +658,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi _ -> EvHandlerState2 end, - {Authority, Conn, Out, CookieStore, EvHandlerState}. + {SendResult, Authority, Conn, Out, CookieStore, EvHandlerState}. host_header(TransportName, Host0, Port) -> Host = case Host0 of @@ -692,41 +704,52 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, DataLength = iolist_size(Data), case Out of body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin -> - if + DataToSend = if DataLength =:= 0 -> - Transport:send(Socket, cow_http_te:last_chunk()); + cow_http_te:last_chunk(); true -> - Transport:send(Socket, [ + [ cow_http_te:chunk(Data), cow_http_te:last_chunk() - ]) + ] end, - RequestEndEvent = #{ - stream_ref => stream_ref(State, StreamRef), - reply_to => ReplyTo - }, - EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), - {{state, State#http_state{out=head}}, EvHandlerState}; + case Transport:send(Socket, DataToSend) of + ok -> + RequestEndEvent = #{ + stream_ref => stream_ref(State, StreamRef), + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, + EvHandlerState0), + {{state, State#http_state{out=head}}, EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState0} + end; body_chunked when Version =:= 'HTTP/1.1' -> - Transport:send(Socket, cow_http_te:chunk(Data)), - {[], EvHandlerState0}; + case Transport:send(Socket, cow_http_te:chunk(Data)) of + ok -> {[], EvHandlerState0}; + Error={error, _} -> {Error, EvHandlerState0} + end; {body, Length} when DataLength =< Length -> - Transport:send(Socket, Data), Length2 = Length - DataLength, - if - Length2 =:= 0, IsFin =:= fin -> + case Transport:send(Socket, Data) of + ok when Length2 =:= 0, IsFin =:= fin -> RequestEndEvent = #{ stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), {{state, State#http_state{out=head}}, EvHandlerState}; - Length2 > 0, IsFin =:= nofin -> - {{state, State#http_state{out={body, Length2}}}, EvHandlerState0} + ok when Length2 > 0, IsFin =:= nofin -> + {{state, State#http_state{out={body, Length2}}}, EvHandlerState0}; + Error={error, _} -> + {Error, EvHandlerState0} end; body_chunked -> %% HTTP/1.0 - Transport:send(Socket, Data), - {[], EvHandlerState0} + case Transport:send(Socket, Data) of + ok -> {[], EvHandlerState0}; + Error={error, _} -> {Error, EvHandlerState0} + end end; _ -> error_stream_not_found(State, StreamRef, ReplyTo), @@ -779,19 +802,22 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), - Transport:send(Socket, [ - cow_http:request(<<"CONNECT">>, Authority, Version, Headers) - ]), - EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), - RequestEndEvent = #{ - stream_ref => RealStreamRef, - reply_to => ReplyTo - }, - EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), - InitialFlow = initial_flow(InitialFlow0, Opts), - {{state, new_stream(State, {connect, StreamRef, Destination}, ReplyTo, - <<"CONNECT">>, Authority, <<>>, InitialFlow)}, - EvHandlerState}. + case Transport:send(Socket, cow_http:request(<<"CONNECT">>, + Authority, Version, Headers)) of + ok -> + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), + InitialFlow = initial_flow(InitialFlow0, Opts), + {{state, new_stream(State, {connect, StreamRef, Destination}, + ReplyTo, <<"CONNECT">>, Authority, <<>>, InitialFlow)}, + EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState1} + end. %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> @@ -960,14 +986,20 @@ ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, {<<"sec-websocket-key">>, Key} |Headers2 ], - {Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State, + {SendResult, Authority, Conn, Out, CookieStore, EvHandlerState} = send_request(State, StreamRef, ReplyTo, <<"GET">>, Host, Port, Path, Headers, undefined, CookieStore0, EvHandler, EvHandlerState0, ?FUNCTION_NAME), - InitialFlow = maps:get(flow, WsOpts, infinity), - {{state, new_stream(State#http_state{connection=Conn, out=Out}, - #websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, extensions=GunExtensions, opts=WsOpts}, - ReplyTo, <<"GET">>, Authority, Path, InitialFlow)}, - CookieStore, EvHandlerState}. + Command = case SendResult of + ok -> + InitialFlow = maps:get(flow, WsOpts, infinity), + {state, new_stream(State#http_state{connection=Conn, out=Out}, + #websocket{ref=StreamRef, reply_to=ReplyTo, key=Key, + extensions=GunExtensions, opts=WsOpts}, + ReplyTo, <<"GET">>, Authority, Path, InitialFlow)}; + Error={error, _} -> + Error + end, + {Command, CookieStore, EvHandlerState}. ws_handshake(Buffer, State, Ws=#websocket{key=Key}, Headers) -> %% @todo check upgrade, connection diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 46093e0..292504b 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -182,11 +182,14 @@ init(ReplyTo, Socket, Transport, Opts0) -> BaseStreamRef = maps:get(stream_ref, Opts, undefined), TunnelTransport = maps:get(tunnel_transport, Opts, undefined), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts#{message_tag => BaseStreamRef}), - State = #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport, - opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport, - content_handlers=Handlers, http2_machine=HTTP2Machine}, - Transport:send(Socket, Preface), - {ok, connected, State}. + case Transport:send(Socket, Preface) of + ok -> + {ok, connected, #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport, + opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport, + content_handlers=Handlers, http2_machine=HTTP2Machine}}; + Error={error, _Reason} -> + Error + end. switch_transport(Transport, Socket, State) -> State#http2_state{socket=Socket, transport=Transport}. @@ -349,13 +352,18 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, _ -> ok end, - Transport:send(Socket, cow_http2:settings_ack()); + case Transport:send(Socket, cow_http2:settings_ack()) of + ok -> {state, State}; + Error={error, _} -> Error + end; {ping, Opaque} -> - Transport:send(Socket, cow_http2:ping_ack(Opaque)); + case Transport:send(Socket, cow_http2:ping_ack(Opaque)) of + ok -> {state, State}; + Error={error, _} -> Error + end; _ -> - ok - end, - {state, State}. + {state, State} + end. data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_id(State0, StreamID) of @@ -373,6 +381,7 @@ data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerStat {ResCommands, CookieStore, EvHandlerState} end. +%% Send errors are returned. Other errors cause the stream to be deleted. tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState) when not is_list(Command) -> tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState); @@ -601,11 +610,15 @@ headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_ } } end, - {tunnel, ProtoState, EvHandlerState} = Proto:init( - ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState3), - {{state, store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, - info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}})}, - EvHandlerState}. + case Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState3) of + {tunnel, ProtoState, EvHandlerState} -> + {{state, store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, + info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}})}, + EvHandlerState}; + %% @todo We should not error out the entire connection on tunnel errors. + Error={error, _} -> + {Error, EvHandlerState3} + end. headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=ReplyTo, tunnel=Tunnel=#tunnel{info=#websocket_info{opts=WsOpts}}}, @@ -730,8 +743,12 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, %% We cancel the push_promise immediately when we are shutting down. _ -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0), - Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)), - {{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState} + case Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)) of + ok -> + {{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState} + end end. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> @@ -788,12 +805,14 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, ok -> {state, State}; {ok, Increment, HTTP2Machine} -> - Transport:send(Socket, cow_http2:window_update(Increment)), - {state, State#http2_state{http2_machine=HTTP2Machine}} + case Transport:send(Socket, cow_http2:window_update(Increment)) of + ok -> {state, State#http2_state{http2_machine=HTTP2Machine}}; + Error={error, _} -> Error + end end. %% Update both the connection and the stream's window. -update_window(State=#http2_state{socket=Socket, transport=Transport, +update_window(State0=#http2_state{socket=Socket, transport=Transport, opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow}, http2_machine=HTTP2Machine0}, StreamID) -> {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of @@ -804,11 +823,16 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, ok -> {<<>>, HTTP2Machine2}; {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} end, + State = State0#http2_state{http2_machine=HTTP2Machine}, case {Data1, Data2} of - {<<>>, <<>>} -> ok; - _ -> Transport:send(Socket, [Data1, Data2]) - end, - {state, State#http2_state{http2_machine=HTTP2Machine}}. + {<<>>, <<>>} -> + {state, State}; + _ -> + case Transport:send(Socket, [Data1, Data2]) of + ok -> {state, State}; + Error={error, _} -> Error + end + end. %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than @@ -823,10 +847,12 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT }, case Status of connected -> - Transport:send(Socket, cow_http2:goaway( - cow_http2_machine:get_last_streamid(HTTP2Machine), - no_error, <<>>)), - {state, State#http2_state{status=goaway}}; + case Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + no_error, <<>>)) of + ok -> {state, State#http2_state{status=goaway}}; + Error={error, _} -> Error + end; _ -> {state, State} end. @@ -851,13 +877,17 @@ closing(Reason0, State=#http2_state{socket=Socket, transport=Transport, owner_down -> no_error; _ -> internal_error end, - Transport:send(Socket, cow_http2:goaway( - cow_http2_machine:get_last_streamid(HTTP2Machine), - Reason, <<>>)), - {[ - {state, State#http2_state{status=closing}}, - closing(State) - ], EvHandlerState}. + case Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + Reason, <<>>)) of + ok -> + {[ + {state, State#http2_state{status=closing}}, + closing(State) + ], EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState} + end. closing(#http2_state{opts=Opts}) -> Timeout = maps:get(closing_timeout, Opts, 15000), @@ -879,8 +909,10 @@ close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> ok. keepalive(#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) -> - Transport:send(Socket, cow_http2:ping(0)), - {[], EvHandlerState}. + case Transport:send(Socket, cow_http2:ping(0)) of + ok -> {[], EvHandlerState}; + Error={error, _} -> {Error, EvHandlerState} + end. headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, @@ -903,13 +935,19 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), - InitialFlow = initial_flow(InitialFlow0, Opts), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path=Path}, - {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, - CookieStore, EvHandlerState}; + case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of + ok -> + EvHandlerState = EvHandler:request_headers(RequestEvent, + EvHandlerState1), + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + flow=InitialFlow, authority=Authority, path=Path}, + {{state, create_stream(State#http2_state{ + http2_machine=HTTP2Machine}, Stream)}, CookieStore, + EvHandlerState}; + Error={error, _} -> + {Error, CookieStore, EvHandlerState1} + end; %% Tunneled request. headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> @@ -960,23 +998,27 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, end, {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, IsFin0, PseudoHeaders, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), - InitialFlow = initial_flow(InitialFlow0, Opts), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path=Path}, - State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream), - case IsFin of - fin -> - RequestEndEvent = #{ - stream_ref => RealStreamRef, - reply_to => ReplyTo - }, - {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; - nofin -> - {StateOrError, EvHandlerStateRet} = maybe_send_data( - State, StreamID, fin, Body, EvHandler, EvHandlerState), - {StateOrError, CookieStore, EvHandlerStateRet} + case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of + ok -> + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, + authority=Authority, path=Path}, + State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream), + case IsFin of + fin -> + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; + nofin -> + {StateOrError, EvHandlerStateRet} = maybe_send_data( + State, StreamID, fin, Body, EvHandler, EvHandlerState), + {StateOrError, CookieStore, EvHandlerStateRet} + end; + Error={error, _} -> + {Error, CookieStore, EvHandlerState1} end; %% Tunneled request. request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, @@ -1132,8 +1174,12 @@ send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) -> send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, StreamID, IsFin, {data, Data}) -> - Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)), - {state, State}; + case Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)) of + ok -> + {state, State}; + Error={error, _} -> + Error + end; %% @todo Uncomment this once sendfile is supported. %send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, % StreamID, IsFin, {sendfile, Offset, Bytes, Path}) -> @@ -1145,18 +1191,26 @@ send_data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) -> {ok, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers), - Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), - {state, State#http2_state{http2_machine=HTTP2Machine}}. + case Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)) of + ok -> + {state, State#http2_state{http2_machine=HTTP2Machine}}; + Error={error, _} -> + Error + end. reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, StreamID, StreamError={stream_error, Reason, _}) -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), - case take_stream(State0, StreamID) of - {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, - {state, State}; - error -> - {state, State0} + case Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)) of + ok -> + case take_stream(State0, StreamID) of + {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, + {state, State}; + error -> + {state, State0} + end; + Error={error, _} -> + Error end. connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, @@ -1200,18 +1254,23 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), - EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), - RequestEndEvent = #{ - stream_ref => RealStreamRef, - reply_to => ReplyTo - }, - EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), - InitialFlow = initial_flow(InitialFlow0, Opts), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, - {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, - EvHandlerState}; + case Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)) of + ok -> + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + flow=InitialFlow, authority=Authority, path= <<>>, + tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, + {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, + EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState1} + end; %% Tunneled request. connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, EvHandler, EvHandlerState0) -> @@ -1237,15 +1296,19 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), - Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), - EvHandlerState = EvHandler:cancel(#{ - stream_ref => stream_ref(State, StreamRef), - reply_to => ReplyTo, - endpoint => local, - reason => cancel - }, EvHandlerState0), - {{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID)}, - EvHandlerState}; + case Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)) of + ok -> + EvHandlerState = EvHandler:cancel(#{ + stream_ref => stream_ref(State, StreamRef), + reply_to => ReplyTo, + endpoint => local, + reason => cancel + }, EvHandlerState0), + {{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine}, + StreamID)}, EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState0} + end; error -> error_stream_not_found(State, StreamRef, ReplyTo), {[], EvHandlerState0} @@ -1384,19 +1447,26 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders#{protocol => <<"websocket">>}, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), - RequestEndEvent = #{ - stream_ref => RealStreamRef, - reply_to => ReplyTo - }, - EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), - InitialFlow = maps:get(flow, WsOpts, infinity), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path=Path, tunnel=#tunnel{info=#websocket_info{ - extensions=GunExtensions, opts=WsOpts}}}, - {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, - CookieStore, EvHandlerState}; + case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of + ok -> + EvHandlerState2 = EvHandler:request_headers(RequestEvent, + EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, + EvHandlerState2), + InitialFlow = maps:get(flow, WsOpts, infinity), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + flow=InitialFlow, authority=Authority, path=Path, + tunnel=#tunnel{info=#websocket_info{ + extensions=GunExtensions, opts=WsOpts}}}, + {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, + Stream)}, CookieStore, EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState1} + end; ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of diff --git a/src/gun_raw.erl b/src/gun_raw.erl index 464b637..480d6bc 100644 --- a/src/gun_raw.erl +++ b/src/gun_raw.erl @@ -59,5 +59,7 @@ close(_, _, _, EvHandlerState) -> %% @todo Initiate closing on IsFin=fin. data(#raw_state{ref=StreamRef, socket=Socket, transport=Transport}, StreamRef, _ReplyTo, _IsFin, Data, _EvHandler, EvHandlerState) -> - Transport:send(Socket, Data), - {[], EvHandlerState}. + case Transport:send(Socket, Data) of + ok -> {[], EvHandlerState}; + Error={error, _} -> {Error, EvHandlerState} + end. diff --git a/src/gun_socks.erl b/src/gun_socks.erl index b94bc2c..51f1e0b 100644 --- a/src/gun_socks.erl +++ b/src/gun_socks.erl @@ -92,10 +92,14 @@ init(ReplyTo, Socket, Transport, Opts) -> {username_password, _, _} -> <<2>>; none -> <<0>> end || A <- Auth>>, - Transport:send(Socket, [<<5, (length(Auth))>>, Methods]), - {ok, connected_no_input, #socks_state{ref=StreamRef, reply_to=ReplyTo, - socket=Socket, transport=Transport, - opts=Opts, version=Version, status=auth_method_select}}. + case Transport:send(Socket, [<<5, (length(Auth))>>, Methods]) of + ok -> + {ok, connected_no_input, #socks_state{ref=StreamRef, reply_to=ReplyTo, + socket=Socket, transport=Transport, + opts=Opts, version=Version, status=auth_method_select}}; + Error={error, _Reason} -> + Error + end. switch_transport(Transport, Socket, State) -> State#socks_state{socket=Socket, transport=Transport}. @@ -105,20 +109,26 @@ handle(Data, State, CookieStore, _, EvHandlerState) -> %% No authentication. handle(<<5, 0>>, State=#socks_state{version=5, status=auth_method_select}) -> - send_socks5_connect(State), - {state, State#socks_state{status=connect}}; + case send_socks5_connect(State) of + ok -> {state, State#socks_state{status=connect}}; + Error={error, _} -> Error + end; %% Username/password authentication. handle(<<5, 2>>, State=#socks_state{socket=Socket, transport=Transport, opts=#{auth := AuthMethods}, version=5, status=auth_method_select}) -> [{username_password, Username, Password}] = [Method || Method <- AuthMethods], ULen = byte_size(Username), PLen = byte_size(Password), - Transport:send(Socket, <<1, ULen, Username/binary, PLen, Password/binary>>), - {state, State#socks_state{status=auth_username_password}}; + case Transport:send(Socket, <<1, ULen, Username/binary, PLen, Password/binary>>) of + ok -> {state, State#socks_state{status=auth_username_password}}; + Error={error, _} -> Error + end; %% Username/password authentication successful. handle(<<1, 0>>, State=#socks_state{version=5, status=auth_username_password}) -> - send_socks5_connect(State), - {state, State#socks_state{status=connect}}; + case send_socks5_connect(State) of + ok -> {state, State#socks_state{status=connect}}; + Error={error, _} -> Error + end; %% Username/password authentication error. handle(<<1, _>>, #socks_state{version=5, status=auth_username_password}) -> {error, {socks5, username_password_auth_failure}}; diff --git a/src/gun_tunnel.erl b/src/gun_tunnel.erl index 1582a9d..3f7babf 100644 --- a/src/gun_tunnel.erl +++ b/src/gun_tunnel.erl @@ -113,23 +113,26 @@ init(ReplyTo, OriginSocket, OriginTransport, Opts=#{stream_ref := StreamRef, tun %% Initialize the protocol. #{new_protocol := NewProtocol} -> {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), - %% @todo Handle error result from Proto:init/4 - {ok, _, ProtoState} = Proto:init(ReplyTo, OriginSocket, OriginTransport, - ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tcp}), - EvHandlerState = EvHandler:protocol_changed(#{ - stream_ref => StreamRef, - protocol => Proto:name() - }, EvHandlerState0), - %% When the tunnel protocol is HTTP/1.1 or SOCKS - %% the gun_tunnel_up message was already sent. - _ = case TunnelProtocol of - http -> ok; - socks -> ok; - _ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()} - end, - {tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport, - protocol=Proto, protocol_state=ProtoState}, - EvHandlerState}; + case Proto:init(ReplyTo, OriginSocket, OriginTransport, + ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tcp}) of + {ok, _, ProtoState} -> + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => Proto:name() + }, EvHandlerState0), + %% When the tunnel protocol is HTTP/1.1 or SOCKS + %% the gun_tunnel_up message was already sent. + _ = case TunnelProtocol of + http -> ok; + socks -> ok; + _ -> ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()} + end, + {tunnel, State#tunnel_state{socket=OriginSocket, transport=OriginTransport, + protocol=Proto, protocol_state=ProtoState}, + EvHandlerState}; + Error={error, _} -> + Error + end; %% We can't initialize the protocol until the TLS handshake has completed. #{handshake_event := HandshakeEvent0, protocols := Protocols} -> #{handle_continue_stream_ref := ContinueStreamRef} = OriginSocket, @@ -196,12 +199,15 @@ handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {ok, Negotiated}, reply_to => ReplyTo, stream_ref => StreamRef }, - %% @todo Handle error result from Proto:init/4 - {ok, _, ProtoState} = Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, - ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}), - ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}, - {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, - CookieStore, EvHandlerState}; + case Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, + ProtoOpts#{stream_ref => StreamRef, tunnel_transport => tls}) of + {ok, _, ProtoState} -> + ReplyTo ! {gun_tunnel_up, self(), StreamRef, Proto:name()}, + {{state, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}}, + CookieStore, EvHandlerState}; + Error={error, _} -> + {Error, CookieStore, EvHandlerState} + end; handle_continue(ContinueStreamRef, {gun_tls_proxy, ProxyPid, {error, Reason}, {handle_continue, _, HandshakeEvent, _}}, #tunnel_state{socket=ProxyPid}, CookieStore, EvHandler, EvHandlerState0) @@ -321,8 +327,10 @@ data(State=#tunnel_state{socket=Socket, transport=Transport, TunnelStreamRef = outer_stream_ref(TunnelStreamRef0), case StreamRef0 of TunnelStreamRef -> - ok = Transport:send(Socket, Data), - {[], EvHandlerState0}; + case Transport:send(Socket, Data) of + ok -> {[], EvHandlerState0}; + Error={error, _} -> {Error, EvHandlerState0} + end; _ -> StreamRef = maybe_dereference(State, StreamRef0), {Commands, EvHandlerState1} = Proto:data(ProtoState0, StreamRef, @@ -449,6 +457,7 @@ ws_send(Frames, State=#tunnel_state{protocol=Proto, protocol_state=ProtoState}, %% Internal. +%% Returns an error on send errors, a state otherwise commands(Command, State, EvHandler, EvHandlerState) when not is_list(Command) -> commands([Command], State, EvHandler, EvHandlerState); commands([], State, _, EvHandlerState) -> @@ -468,8 +477,10 @@ commands([{state, ProtoState}|Tail], State, EvHandler, EvHandlerState) -> commands([{send, _IsFin, Data}|Tail], State=#tunnel_state{socket=Socket, transport=Transport}, EvHandler, EvHandlerState) -> - Transport:send(Socket, Data), - commands(Tail, State, EvHandler, EvHandlerState); + case Transport:send(Socket, Data) of + ok -> commands(Tail, State, EvHandler, EvHandlerState); + Error={error, _} -> {Error, EvHandlerState} + end; commands([Origin={origin, Scheme, Host, Port, Type}|Tail], State=#tunnel_state{stream_ref=StreamRef}, EvHandler, EvHandlerState0) -> @@ -487,15 +498,18 @@ commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], EvHandler, EvHandlerState0) -> {Proto, ProtoOpts} = gun_protocols:handler_and_opts(NewProtocol, Opts), %% This should only apply to Websocket for the time being. - %% @todo Handle error result from Proto:init/4 - {ok, connected_ws_only, ProtoState} = Proto:init(ReplyTo, Socket, Transport, ProtoOpts), - #{stream_ref := StreamRef} = ProtoOpts, - EvHandlerState = EvHandler:protocol_changed(#{ - stream_ref => StreamRef, - protocol => Proto:name() - }, EvHandlerState0), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, - EvHandler, EvHandlerState); + case Proto:init(ReplyTo, Socket, Transport, ProtoOpts) of + {ok, connected_ws_only, ProtoState} -> + #{stream_ref := StreamRef} = ProtoOpts, + EvHandlerState = EvHandler:protocol_changed(#{ + stream_ref => StreamRef, + protocol => Proto:name() + }, EvHandlerState0), + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); + Error={error, _} -> + {Error, EvHandlerState0} + end; commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], State=#tunnel_state{transport=Transport, stream_ref=TunnelStreamRef, info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto, @@ -535,10 +549,13 @@ commands([{switch_protocol, NewProtocol, ReplyTo}|Tail], } }, Proto = gun_tunnel, - {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo, - OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, - EvHandler, EvHandlerState); + case Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0) of + {tunnel, ProtoState, EvHandlerState} -> + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); + Error={error, _} -> + {Error, EvHandlerState0} + end; commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail], State=#tunnel_state{transport=Transport, info=#{origin_host := Host, origin_port := Port}, opts=Opts, protocol=CurrentProto, @@ -583,10 +600,13 @@ commands([{tls_handshake, HandshakeEvent0, Protocols, ReplyTo}|Tail], } }, Proto = gun_tunnel, - {tunnel, ProtoState, EvHandlerState} = Proto:init(ReplyTo, - OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0), - commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, - EvHandler, EvHandlerState); + case Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState0) of + {tunnel, ProtoState, EvHandlerState} -> + commands(Tail, State#tunnel_state{protocol=Proto, protocol_state=ProtoState}, + EvHandler, EvHandlerState); + Error={error, _} -> + {Error, EvHandlerState0} + end; commands([{active, true}|Tail], State, EvHandler, EvHandlerState) -> commands(Tail, State, EvHandler, EvHandlerState). diff --git a/src/gun_ws.erl b/src/gun_ws.erl index e02179a..e864351 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -312,20 +312,25 @@ send(Frame, State=#ws_state{stream_ref=StreamRef, frame => Frame }, EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0), - Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), - EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), - if - Frame =:= close; element(1, Frame) =:= close -> - {[ - {state, State#ws_state{out=close}}, - %% We can close immediately if we already received a close frame. - case In of - close -> close; - _ -> closing(State) - end - ], EvHandlerState}; - true -> - {[], EvHandlerState} + case Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)) of + ok -> + EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), + if + Frame =:= close; element(1, Frame) =:= close -> + {[ + {state, State#ws_state{out=close}}, + %% We can close immediately if we already + %% received a close frame. + case In of + close -> close; + _ -> closing(State) + end + ], EvHandlerState}; + true -> + {[], EvHandlerState} + end; + Error={error, _} -> + {Error, EvHandlerState0} end. %% Send many frames. -- cgit v1.2.3