From 1cc3b32b8ef6c187b8be3601319e21c1ba04fa27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Mon, 24 Oct 2022 14:09:59 +0200 Subject: Handle send errors --- src/gun_http2.erl | 282 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 176 insertions(+), 106 deletions(-) (limited to 'src/gun_http2.erl') diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 46093e0..292504b 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -182,11 +182,14 @@ init(ReplyTo, Socket, Transport, Opts0) -> BaseStreamRef = maps:get(stream_ref, Opts, undefined), TunnelTransport = maps:get(tunnel_transport, Opts, undefined), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts#{message_tag => BaseStreamRef}), - State = #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport, - opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport, - content_handlers=Handlers, http2_machine=HTTP2Machine}, - Transport:send(Socket, Preface), - {ok, connected, State}. + case Transport:send(Socket, Preface) of + ok -> + {ok, connected, #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport, + opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport, + content_handlers=Handlers, http2_machine=HTTP2Machine}}; + Error={error, _Reason} -> + Error + end. switch_transport(Transport, Socket, State) -> State#http2_state{socket=Socket, transport=Transport}. @@ -349,13 +352,18 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket, _ -> ok end, - Transport:send(Socket, cow_http2:settings_ack()); + case Transport:send(Socket, cow_http2:settings_ack()) of + ok -> {state, State}; + Error={error, _} -> Error + end; {ping, Opaque} -> - Transport:send(Socket, cow_http2:ping_ack(Opaque)); + case Transport:send(Socket, cow_http2:ping_ack(Opaque)) of + ok -> {state, State}; + Error={error, _} -> Error + end; _ -> - ok - end, - {state, State}. + {state, State} + end. data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_id(State0, StreamID) of @@ -373,6 +381,7 @@ data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerStat {ResCommands, CookieStore, EvHandlerState} end. +%% Send errors are returned. Other errors cause the stream to be deleted. tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState) when not is_list(Command) -> tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState); @@ -601,11 +610,15 @@ headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_ } } end, - {tunnel, ProtoState, EvHandlerState} = Proto:init( - ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState3), - {{state, store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, - info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}})}, - EvHandlerState}. + case Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState3) of + {tunnel, ProtoState, EvHandlerState} -> + {{state, store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established, + info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}})}, + EvHandlerState}; + %% @todo We should not error out the entire connection on tunnel errors. + Error={error, _} -> + {Error, EvHandlerState3} + end. headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=ReplyTo, tunnel=Tunnel=#tunnel{info=#websocket_info{opts=WsOpts}}}, @@ -730,8 +743,12 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, %% We cancel the push_promise immediately when we are shutting down. _ -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0), - Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)), - {{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState} + case Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)) of + ok -> + {{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState} + end end. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> @@ -788,12 +805,14 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, ok -> {state, State}; {ok, Increment, HTTP2Machine} -> - Transport:send(Socket, cow_http2:window_update(Increment)), - {state, State#http2_state{http2_machine=HTTP2Machine}} + case Transport:send(Socket, cow_http2:window_update(Increment)) of + ok -> {state, State#http2_state{http2_machine=HTTP2Machine}}; + Error={error, _} -> Error + end end. %% Update both the connection and the stream's window. -update_window(State=#http2_state{socket=Socket, transport=Transport, +update_window(State0=#http2_state{socket=Socket, transport=Transport, opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow}, http2_machine=HTTP2Machine0}, StreamID) -> {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of @@ -804,11 +823,16 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, ok -> {<<>>, HTTP2Machine2}; {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} end, + State = State0#http2_state{http2_machine=HTTP2Machine}, case {Data1, Data2} of - {<<>>, <<>>} -> ok; - _ -> Transport:send(Socket, [Data1, Data2]) - end, - {state, State#http2_state{http2_machine=HTTP2Machine}}. + {<<>>, <<>>} -> + {state, State}; + _ -> + case Transport:send(Socket, [Data1, Data2]) of + ok -> {state, State}; + Error={error, _} -> Error + end + end. %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than @@ -823,10 +847,12 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT }, case Status of connected -> - Transport:send(Socket, cow_http2:goaway( - cow_http2_machine:get_last_streamid(HTTP2Machine), - no_error, <<>>)), - {state, State#http2_state{status=goaway}}; + case Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + no_error, <<>>)) of + ok -> {state, State#http2_state{status=goaway}}; + Error={error, _} -> Error + end; _ -> {state, State} end. @@ -851,13 +877,17 @@ closing(Reason0, State=#http2_state{socket=Socket, transport=Transport, owner_down -> no_error; _ -> internal_error end, - Transport:send(Socket, cow_http2:goaway( - cow_http2_machine:get_last_streamid(HTTP2Machine), - Reason, <<>>)), - {[ - {state, State#http2_state{status=closing}}, - closing(State) - ], EvHandlerState}. + case Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + Reason, <<>>)) of + ok -> + {[ + {state, State#http2_state{status=closing}}, + closing(State) + ], EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState} + end. closing(#http2_state{opts=Opts}) -> Timeout = maps:get(closing_timeout, Opts, 15000), @@ -879,8 +909,10 @@ close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> ok. keepalive(#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) -> - Transport:send(Socket, cow_http2:ping(0)), - {[], EvHandlerState}. + case Transport:send(Socket, cow_http2:ping(0)) of + ok -> {[], EvHandlerState}; + Error={error, _} -> {Error, EvHandlerState} + end. headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, @@ -903,13 +935,19 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), - InitialFlow = initial_flow(InitialFlow0, Opts), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path=Path}, - {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, - CookieStore, EvHandlerState}; + case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of + ok -> + EvHandlerState = EvHandler:request_headers(RequestEvent, + EvHandlerState1), + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + flow=InitialFlow, authority=Authority, path=Path}, + {{state, create_stream(State#http2_state{ + http2_machine=HTTP2Machine}, Stream)}, CookieStore, + EvHandlerState}; + Error={error, _} -> + {Error, CookieStore, EvHandlerState1} + end; %% Tunneled request. headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) -> @@ -960,23 +998,27 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, end, {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, IsFin0, PseudoHeaders, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), - InitialFlow = initial_flow(InitialFlow0, Opts), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path=Path}, - State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream), - case IsFin of - fin -> - RequestEndEvent = #{ - stream_ref => RealStreamRef, - reply_to => ReplyTo - }, - {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; - nofin -> - {StateOrError, EvHandlerStateRet} = maybe_send_data( - State, StreamID, fin, Body, EvHandler, EvHandlerState), - {StateOrError, CookieStore, EvHandlerStateRet} + case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of + ok -> + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, + authority=Authority, path=Path}, + State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream), + case IsFin of + fin -> + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)}; + nofin -> + {StateOrError, EvHandlerStateRet} = maybe_send_data( + State, StreamID, fin, Body, EvHandler, EvHandlerState), + {StateOrError, CookieStore, EvHandlerStateRet} + end; + Error={error, _} -> + {Error, CookieStore, EvHandlerState1} end; %% Tunneled request. request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port, @@ -1132,8 +1174,12 @@ send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) -> send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, StreamID, IsFin, {data, Data}) -> - Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)), - {state, State}; + case Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)) of + ok -> + {state, State}; + Error={error, _} -> + Error + end; %% @todo Uncomment this once sendfile is supported. %send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, % StreamID, IsFin, {sendfile, Offset, Bytes, Path}) -> @@ -1145,18 +1191,26 @@ send_data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) -> {ok, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers), - Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), - {state, State#http2_state{http2_machine=HTTP2Machine}}. + case Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)) of + ok -> + {state, State#http2_state{http2_machine=HTTP2Machine}}; + Error={error, _} -> + Error + end. reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, StreamID, StreamError={stream_error, Reason, _}) -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), - case take_stream(State0, StreamID) of - {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> - ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, - {state, State}; - error -> - {state, State0} + case Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)) of + ok -> + case take_stream(State0, StreamID) of + {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError}, + {state, State}; + error -> + {state, State0} + end; + Error={error, _} -> + Error end. connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, @@ -1200,18 +1254,23 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), - EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), - RequestEndEvent = #{ - stream_ref => RealStreamRef, - reply_to => ReplyTo - }, - EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), - InitialFlow = initial_flow(InitialFlow0, Opts), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, - {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, - EvHandlerState}; + case Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)) of + ok -> + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), + InitialFlow = initial_flow(InitialFlow0, Opts), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + flow=InitialFlow, authority=Authority, path= <<>>, + tunnel=#tunnel{destination=Destination, info=TunnelInfo}}, + {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, + EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState1} + end; %% Tunneled request. connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow, EvHandler, EvHandlerState0) -> @@ -1237,15 +1296,19 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), - Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), - EvHandlerState = EvHandler:cancel(#{ - stream_ref => stream_ref(State, StreamRef), - reply_to => ReplyTo, - endpoint => local, - reason => cancel - }, EvHandlerState0), - {{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID)}, - EvHandlerState}; + case Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)) of + ok -> + EvHandlerState = EvHandler:cancel(#{ + stream_ref => stream_ref(State, StreamRef), + reply_to => ReplyTo, + endpoint => local, + reason => cancel + }, EvHandlerState0), + {{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine}, + StreamID)}, EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState0} + end; error -> error_stream_not_found(State, StreamRef, ReplyTo), {[], EvHandlerState0} @@ -1384,19 +1447,26 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders#{protocol => <<"websocket">>}, Headers), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), - RequestEndEvent = #{ - stream_ref => RealStreamRef, - reply_to => ReplyTo - }, - EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), - InitialFlow = maps:get(flow, WsOpts, infinity), - Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow, - authority=Authority, path=Path, tunnel=#tunnel{info=#websocket_info{ - extensions=GunExtensions, opts=WsOpts}}}, - {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)}, - CookieStore, EvHandlerState}; + case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of + ok -> + EvHandlerState2 = EvHandler:request_headers(RequestEvent, + EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, + EvHandlerState2), + InitialFlow = maps:get(flow, WsOpts, infinity), + Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, + flow=InitialFlow, authority=Authority, path=Path, + tunnel=#tunnel{info=#websocket_info{ + extensions=GunExtensions, opts=WsOpts}}}, + {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, + Stream)}, CookieStore, EvHandlerState}; + Error={error, _} -> + {Error, EvHandlerState1} + end; ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo, Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of -- cgit v1.2.3