From 299c93f661f6edc8f726ece6d8c29a357e19fafe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Sat, 7 Sep 2013 14:01:19 +0200 Subject: Implement recv timeout for SPDY --- src/cowboy_spdy.erl | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/cowboy_spdy.erl b/src/cowboy_spdy.erl index dfac29c..bc843ec 100644 --- a/src/cowboy_spdy.erl +++ b/src/cowboy_spdy.erl @@ -51,7 +51,7 @@ input = nofin :: fin | nofin, in_buffer = <<>> :: binary(), is_recv = false :: {true, {non_neg_integer(), pid()}, - pid(), non_neg_integer()} | false, + pid(), non_neg_integer(), reference()} | false, output = nofin :: fin | nofin }). @@ -127,8 +127,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, terminate(State); {Error, Socket, _Reason} -> terminate(State); - %% @todo Timeout (send a message to self). - {recv, FromSocket = {Pid, StreamID}, FromPid, Length, _Timeout} + {recv, FromSocket = {Pid, StreamID}, FromPid, Length, Timeout} when Pid =:= self() -> Child = #child{in_buffer=InBuffer, is_recv=false} = get_child(StreamID, State), @@ -141,9 +140,18 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, FromPid ! {recv, FromSocket, {ok, Data}}, loop(replace_child(Child#child{in_buffer=Rest}, State)); true -> + TRef = erlang:send_after(Timeout, self(), + {recv_timeout, FromSocket}), loop(replace_child(Child#child{ - is_recv={true, FromSocket, FromPid, Length}}, State)) + is_recv={true, FromSocket, FromPid, Length, TRef}}, + State)) end; + {recv_timeout, {Pid, StreamID}} + when Pid =:= self() -> + Child = #child{is_recv={true, FromSocket, FromPid, _, _}} + = get_child(StreamID, State), + FromPid ! {recv, FromSocket, {error, timeout}}, + loop(replace_child(Child#child{is_recv=false}, State)); {reply, {Pid, StreamID}, Status, Headers} when Pid =:= self() -> Child = #child{output=nofin} = get_child(StreamID, State), @@ -257,12 +265,15 @@ handle_frame(State, {data, StreamID, IsFin, Data}) -> Data2 = << Buffer/binary, Data/binary >>, IsFin2 = if IsFin -> fin; true -> nofin end, Child2 = case IsRecv of - {true, FromSocket, FromPid, 0} -> + {true, FromSocket, FromPid, 0, TRef} -> FromPid ! {recv, FromSocket, {ok, Data2}}, + cancel_recv_timeout(StreamID, TRef), Child#child{input=IsFin2, in_buffer= <<>>, is_recv=false}; - {true, FromSocket, FromPid, Length} when byte_size(Data2) >= Length -> + {true, FromSocket, FromPid, Length, TRef} + when byte_size(Data2) >= Length -> << Data3:Length/binary, Rest/binary >> = Data2, FromPid ! {recv, FromSocket, {ok, Data3}}, + cancel_recv_timeout(StreamID, TRef), Child#child{input=IsFin2, in_buffer=Rest, is_recv=false}; _ -> Child#child{input=IsFin2, in_buffer=Data2} @@ -277,6 +288,16 @@ handle_frame(State, Frame) -> error_logger:error_msg("Ignored frame ~p", [Frame]), loop(State). +cancel_recv_timeout(StreamID, TRef) -> + _ = erlang:cancel_timer(TRef), + receive + {recv_timeout, {Pid, StreamID}} + when Pid =:= self() -> + ok + after 0 -> + ok + end. + %% @todo We must wait for the children to finish here, %% but only up to N milliseconds. Then we shutdown. terminate(_State) -> -- cgit v1.2.3