aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http2.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r--src/gun_http2.erl282
1 files changed, 176 insertions, 106 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 46093e0..292504b 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -182,11 +182,14 @@ init(ReplyTo, Socket, Transport, Opts0) ->
BaseStreamRef = maps:get(stream_ref, Opts, undefined),
TunnelTransport = maps:get(tunnel_transport, Opts, undefined),
{ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts#{message_tag => BaseStreamRef}),
- State = #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport,
- opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport,
- content_handlers=Handlers, http2_machine=HTTP2Machine},
- Transport:send(Socket, Preface),
- {ok, connected, State}.
+ case Transport:send(Socket, Preface) of
+ ok ->
+ {ok, connected, #http2_state{reply_to=ReplyTo, socket=Socket, transport=Transport,
+ opts=Opts, base_stream_ref=BaseStreamRef, tunnel_transport=TunnelTransport,
+ content_handlers=Handlers, http2_machine=HTTP2Machine}};
+ Error={error, _Reason} ->
+ Error
+ end.
switch_transport(Transport, Socket, State) ->
State#http2_state{socket=Socket, transport=Transport}.
@@ -349,13 +352,18 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket,
_ ->
ok
end,
- Transport:send(Socket, cow_http2:settings_ack());
+ case Transport:send(Socket, cow_http2:settings_ack()) of
+ ok -> {state, State};
+ Error={error, _} -> Error
+ end;
{ping, Opaque} ->
- Transport:send(Socket, cow_http2:ping_ack(Opaque));
+ case Transport:send(Socket, cow_http2:ping_ack(Opaque)) of
+ ok -> {state, State};
+ Error={error, _} -> Error
+ end;
_ ->
- ok
- end,
- {state, State}.
+ {state, State}
+ end.
data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_id(State0, StreamID) of
@@ -373,6 +381,7 @@ data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerStat
{ResCommands, CookieStore, EvHandlerState}
end.
+%% Send errors are returned. Other errors cause the stream to be deleted.
tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState)
when not is_list(Command) ->
tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState);
@@ -601,11 +610,15 @@ headers_frame_connect(State=#http2_state{transport=Transport, opts=Opts, tunnel_
}
}
end,
- {tunnel, ProtoState, EvHandlerState} = Proto:init(
- ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState3),
- {{state, store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established,
- info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}})},
- EvHandlerState}.
+ case Proto:init(ReplyTo, OriginSocket, gun_tcp_proxy, ProtoOpts, EvHandler, EvHandlerState3) of
+ {tunnel, ProtoState, EvHandlerState} ->
+ {{state, store_stream(State, Stream#stream{tunnel=Tunnel#tunnel{state=established,
+ info=TunnelInfo, protocol=Proto, protocol_state=ProtoState}})},
+ EvHandlerState};
+ %% @todo We should not error out the entire connection on tunnel errors.
+ Error={error, _} ->
+ {Error, EvHandlerState3}
+ end.
headers_frame_connect_websocket(State, Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
tunnel=Tunnel=#tunnel{info=#websocket_info{opts=WsOpts}}},
@@ -730,8 +743,12 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
%% We cancel the push_promise immediately when we are shutting down.
_ ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0),
- Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)),
- {{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState}
+ case Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)) of
+ ok ->
+ {{state, State#http2_state{http2_machine=HTTP2Machine}}, EvHandlerState};
+ Error={error, _} ->
+ {Error, EvHandlerState}
+ end
end.
ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
@@ -788,12 +805,14 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
ok ->
{state, State};
{ok, Increment, HTTP2Machine} ->
- Transport:send(Socket, cow_http2:window_update(Increment)),
- {state, State#http2_state{http2_machine=HTTP2Machine}}
+ case Transport:send(Socket, cow_http2:window_update(Increment)) of
+ ok -> {state, State#http2_state{http2_machine=HTTP2Machine}};
+ Error={error, _} -> Error
+ end
end.
%% Update both the connection and the stream's window.
-update_window(State=#http2_state{socket=Socket, transport=Transport,
+update_window(State0=#http2_state{socket=Socket, transport=Transport,
opts=#{initial_connection_window_size := ConnWindow, initial_stream_window_size := StreamWindow},
http2_machine=HTTP2Machine0}, StreamID) ->
{Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(ConnWindow, HTTP2Machine0) of
@@ -804,11 +823,16 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
ok -> {<<>>, HTTP2Machine2};
{ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3}
end,
+ State = State0#http2_state{http2_machine=HTTP2Machine},
case {Data1, Data2} of
- {<<>>, <<>>} -> ok;
- _ -> Transport:send(Socket, [Data1, Data2])
- end,
- {state, State#http2_state{http2_machine=HTTP2Machine}}.
+ {<<>>, <<>>} ->
+ {state, State};
+ _ ->
+ case Transport:send(Socket, [Data1, Data2]) of
+ ok -> {state, State};
+ Error={error, _} -> Error
+ end
+ end.
%% We may have to cancel streams even if we receive multiple
%% GOAWAY frames as the LastStreamID value may be lower than
@@ -823,10 +847,12 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT
},
case Status of
connected ->
- Transport:send(Socket, cow_http2:goaway(
- cow_http2_machine:get_last_streamid(HTTP2Machine),
- no_error, <<>>)),
- {state, State#http2_state{status=goaway}};
+ case Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ no_error, <<>>)) of
+ ok -> {state, State#http2_state{status=goaway}};
+ Error={error, _} -> Error
+ end;
_ ->
{state, State}
end.
@@ -851,13 +877,17 @@ closing(Reason0, State=#http2_state{socket=Socket, transport=Transport,
owner_down -> no_error;
_ -> internal_error
end,
- Transport:send(Socket, cow_http2:goaway(
- cow_http2_machine:get_last_streamid(HTTP2Machine),
- Reason, <<>>)),
- {[
- {state, State#http2_state{status=closing}},
- closing(State)
- ], EvHandlerState}.
+ case Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ Reason, <<>>)) of
+ ok ->
+ {[
+ {state, State#http2_state{status=closing}},
+ closing(State)
+ ], EvHandlerState};
+ Error={error, _} ->
+ {Error, EvHandlerState}
+ end.
closing(#http2_state{opts=Opts}) ->
Timeout = maps:get(closing_timeout, Opts, 15000),
@@ -879,8 +909,10 @@ close_stream(State, #stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
ok.
keepalive(#http2_state{socket=Socket, transport=Transport}, _, EvHandlerState) ->
- Transport:send(Socket, cow_http2:ping(0)),
- {[], EvHandlerState}.
+ case Transport:send(Socket, cow_http2:ping(0)) of
+ ok -> {[], EvHandlerState};
+ Error={error, _} -> {Error, EvHandlerState}
+ end.
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
@@ -903,13 +935,19 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
- Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
- EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- InitialFlow = initial_flow(InitialFlow0, Opts),
- Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
- authority=Authority, path=Path},
- {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
- CookieStore, EvHandlerState};
+ case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of
+ ok ->
+ EvHandlerState = EvHandler:request_headers(RequestEvent,
+ EvHandlerState1),
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
+ flow=InitialFlow, authority=Authority, path=Path},
+ {{state, create_stream(State#http2_state{
+ http2_machine=HTTP2Machine}, Stream)}, CookieStore,
+ EvHandlerState};
+ Error={error, _} ->
+ {Error, CookieStore, EvHandlerState1}
+ end;
%% Tunneled request.
headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
Path, Headers, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
@@ -960,23 +998,27 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
end,
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, IsFin0, PseudoHeaders, Headers),
- Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
- EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- InitialFlow = initial_flow(InitialFlow0, Opts),
- Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
- authority=Authority, path=Path},
- State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream),
- case IsFin of
- fin ->
- RequestEndEvent = #{
- stream_ref => RealStreamRef,
- reply_to => ReplyTo
- },
- {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
- nofin ->
- {StateOrError, EvHandlerStateRet} = maybe_send_data(
- State, StreamID, fin, Body, EvHandler, EvHandlerState),
- {StateOrError, CookieStore, EvHandlerStateRet}
+ case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of
+ ok ->
+ EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
+ authority=Authority, path=Path},
+ State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream),
+ case IsFin of
+ fin ->
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ {{state, State}, CookieStore, EvHandler:request_end(RequestEndEvent, EvHandlerState)};
+ nofin ->
+ {StateOrError, EvHandlerStateRet} = maybe_send_data(
+ State, StreamID, fin, Body, EvHandler, EvHandlerState),
+ {StateOrError, CookieStore, EvHandlerStateRet}
+ end;
+ Error={error, _} ->
+ {Error, CookieStore, EvHandlerState1}
end;
%% Tunneled request.
request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
@@ -1132,8 +1174,12 @@ send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) ->
send_data_frame(State=#http2_state{socket=Socket, transport=Transport},
StreamID, IsFin, {data, Data}) ->
- Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)),
- {state, State};
+ case Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)) of
+ ok ->
+ {state, State};
+ Error={error, _} ->
+ Error
+ end;
%% @todo Uncomment this once sendfile is supported.
%send_data_frame(State=#http2_state{socket=Socket, transport=Transport},
% StreamID, IsFin, {sendfile, Offset, Bytes, Path}) ->
@@ -1145,18 +1191,26 @@ send_data_frame(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) ->
{ok, HeaderBlock, HTTP2Machine}
= cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers),
- Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
- {state, State#http2_state{http2_machine=HTTP2Machine}}.
+ case Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)) of
+ ok ->
+ {state, State#http2_state{http2_machine=HTTP2Machine}};
+ Error={error, _} ->
+ Error
+ end.
reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
StreamID, StreamError={stream_error, Reason, _}) ->
- Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
- case take_stream(State0, StreamID) of
- {#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
- ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError},
- {state, State};
- error ->
- {state, State0}
+ case Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)) of
+ ok ->
+ case take_stream(State0, StreamID) of
+ {#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), StreamError},
+ {state, State};
+ error ->
+ {state, State0}
+ end;
+ Error={error, _} ->
+ Error
end.
connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
@@ -1200,18 +1254,23 @@ connect(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, nofin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
- Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
- EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- RequestEndEvent = #{
- stream_ref => RealStreamRef,
- reply_to => ReplyTo
- },
- EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
- InitialFlow = initial_flow(InitialFlow0, Opts),
- Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
- authority=Authority, path= <<>>, tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
- {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
- EvHandlerState};
+ case Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)) of
+ ok ->
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
+ InitialFlow = initial_flow(InitialFlow0, Opts),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
+ flow=InitialFlow, authority=Authority, path= <<>>,
+ tunnel=#tunnel{destination=Destination, info=TunnelInfo}},
+ {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
+ EvHandlerState};
+ Error={error, _} ->
+ {Error, EvHandlerState1}
+ end;
%% Tunneled request.
connect(State, RealStreamRef=[StreamRef|_], ReplyTo, Destination, TunnelInfo, Headers0, InitialFlow,
EvHandler, EvHandlerState0) ->
@@ -1237,15 +1296,19 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID} ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
- Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
- EvHandlerState = EvHandler:cancel(#{
- stream_ref => stream_ref(State, StreamRef),
- reply_to => ReplyTo,
- endpoint => local,
- reason => cancel
- }, EvHandlerState0),
- {{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID)},
- EvHandlerState};
+ case Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)) of
+ ok ->
+ EvHandlerState = EvHandler:cancel(#{
+ stream_ref => stream_ref(State, StreamRef),
+ reply_to => ReplyTo,
+ endpoint => local,
+ reason => cancel
+ }, EvHandlerState0),
+ {{state, delete_stream(State#http2_state{http2_machine=HTTP2Machine},
+ StreamID)}, EvHandlerState};
+ Error={error, _} ->
+ {Error, EvHandlerState0}
+ end;
error ->
error_stream_not_found(State, StreamRef, ReplyTo),
{[], EvHandlerState0}
@@ -1384,19 +1447,26 @@ ws_upgrade(State=#http2_state{socket=Socket, transport=Transport,
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders#{protocol => <<"websocket">>}, Headers),
- Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
- EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- RequestEndEvent = #{
- stream_ref => RealStreamRef,
- reply_to => ReplyTo
- },
- EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
- InitialFlow = maps:get(flow, WsOpts, infinity),
- Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow,
- authority=Authority, path=Path, tunnel=#tunnel{info=#websocket_info{
- extensions=GunExtensions, opts=WsOpts}}},
- {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream)},
- CookieStore, EvHandlerState};
+ case Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)) of
+ ok ->
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent,
+ EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent,
+ EvHandlerState2),
+ InitialFlow = maps:get(flow, WsOpts, infinity),
+ Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
+ flow=InitialFlow, authority=Authority, path=Path,
+ tunnel=#tunnel{info=#websocket_info{
+ extensions=GunExtensions, opts=WsOpts}}},
+ {{state, create_stream(State#http2_state{http2_machine=HTTP2Machine},
+ Stream)}, CookieStore, EvHandlerState};
+ Error={error, _} ->
+ {Error, EvHandlerState1}
+ end;
ws_upgrade(State, RealStreamRef=[StreamRef|_], ReplyTo,
Host, Port, Path, Headers, WsOpts, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of