From 21ecb1a4f3e037411af0c72eb93948ecafbd2984 Mon Sep 17 00:00:00 2001
From: Simon Cornish <simon@cali.local>
Date: Mon, 11 May 2015 15:55:02 -0700
Subject: Add tests for ssh rekeying

---
 lib/ssh/test/Makefile            |   3 +-
 lib/ssh/test/ssh_basic_SUITE.erl | 177 ++++++++++++++++-
 lib/ssh/test/ssh_relay.erl       | 407 +++++++++++++++++++++++++++++++++++++++
 3 files changed, 581 insertions(+), 6 deletions(-)
 create mode 100644 lib/ssh/test/ssh_relay.erl

(limited to 'lib')

diff --git a/lib/ssh/test/Makefile b/lib/ssh/test/Makefile
index 740dbd0235..39b2f57d26 100644
--- a/lib/ssh/test/Makefile
+++ b/lib/ssh/test/Makefile
@@ -40,7 +40,8 @@ MODULES= \
 	ssh_connection_SUITE \
 	ssh_echo_server \
 	ssh_peername_sockname_server \
-	ssh_test_cli 
+	ssh_test_cli \
+	ssh_relay
 
 HRL_FILES_NEEDED_IN_TEST= \
 	$(ERL_TOP)/lib/ssh/src/ssh.hrl \
diff --git a/lib/ssh/test/ssh_basic_SUITE.erl b/lib/ssh/test/ssh_basic_SUITE.erl
index 242c9a3bd9..aaf0fa9905 100644
--- a/lib/ssh/test/ssh_basic_SUITE.erl
+++ b/lib/ssh/test/ssh_basic_SUITE.erl
@@ -29,6 +29,7 @@
 
 -define(NEWLINE, <<"\r\n">>).
 
+-define(REKEY_DATA_TMO, 65000).
 %%--------------------------------------------------------------------
 %% Common Test interface functions -----------------------------------
 %%--------------------------------------------------------------------
@@ -44,6 +45,7 @@ all() ->
      {group, dsa_pass_key},
      {group, rsa_pass_key},
      {group, internal_error},
+     {group, renegotiate},
      daemon_already_started,
      server_password_option,
      server_userpassword_option,
@@ -69,6 +71,7 @@ groups() ->
      {dsa_pass_key, [], [pass_phrase]},
      {rsa_pass_key, [], [pass_phrase]},
      {internal_error, [], [internal_error]},
+     {renegotiate, [], [rekey, rekey_limit, renegotiate1, renegotiate2]},
      {hardening_tests, [], [ssh_connect_nonegtimeout_connected_parallel,
 			    ssh_connect_nonegtimeout_connected_sequential,
 			    ssh_connect_negtimeout_parallel,
@@ -84,8 +87,7 @@ groups() ->
 basic_tests() ->
     [send, close, peername_sockname,
      exec, exec_compressed, shell, cli, known_hosts, 
-     idle_time, rekey, openssh_zlib_basic_test,
-     misc_ssh_options, inet_option].
+     idle_time, openssh_zlib_basic_test, misc_ssh_options, inet_option].
 
 
 %%--------------------------------------------------------------------
@@ -333,25 +335,175 @@ idle_time(Config) ->
 rekey() ->
     [{doc, "Idle timeout test"}].
 rekey(Config) ->
-    SystemDir = filename:join(?config(priv_dir, Config), system),
+    SystemDir = ?config(data_dir, Config),
     UserDir = ?config(priv_dir, Config),
 
     {Pid, Host, Port} = ssh_test_lib:daemon([{system_dir, SystemDir},
-					     {user_dir, UserDir},
+    					     {user_dir, UserDir},
 					     {failfun, fun ssh_test_lib:failfun/2},
+    					     {user_passwords,
+    					      [{"simon", "says"}]},
 					     {rekey_limit, 0}]),
+
     ConnectionRef =
 	ssh_test_lib:connect(Host, Port, [{silently_accept_hosts, true},
 					  {user_dir, UserDir},
+					  {user, "simon"},
+					  {password, "says"},
 					  {user_interaction, false},
 					  {rekey_limit, 0}]),
     receive
-    after 200000 ->
+    after ?REKEY_DATA_TMO ->
 	    %%By this time rekeying would have been done
 	    ssh:close(ConnectionRef),
 	    ssh:stop_daemon(Pid)
     end.
 %%--------------------------------------------------------------------
+rekey_limit() ->
+    [{doc, "Test rekeying by data volume"}].
+rekey_limit(Config) ->
+    SystemDir = ?config(data_dir, Config),
+    UserDir = ?config(priv_dir, Config),
+    DataFile = filename:join(UserDir, "rekey.data"),
+
+    {Pid, Host, Port} = ssh_test_lib:daemon([{system_dir, SystemDir},
+    					     {user_dir, UserDir},
+    					     {user_passwords,
+    					      [{"simon", "says"}]}]),
+    {ok, SftpPid, ConnectionRef} =
+    	ssh_sftp:start_channel(Host, Port, [{system_dir, SystemDir},
+    					    {user_dir, UserDir},
+    					    {user, "simon"},
+    					    {password, "says"},
+    					    {rekey_limit, 2500},
+    					    {user_interaction, false},
+    					    {silently_accept_hosts, true}]),
+
+    Kex1 = get_kex_init(ConnectionRef),
+
+    ct:sleep(?REKEY_DATA_TMO),
+    Kex1 = get_kex_init(ConnectionRef),
+
+    Data = lists:duplicate(9000,1),
+    ok = ssh_sftp:write_file(SftpPid, DataFile, Data),
+
+    ct:sleep(?REKEY_DATA_TMO),
+    Kex2 = get_kex_init(ConnectionRef),
+
+    false = (Kex2 == Kex1),
+
+    ct:sleep(?REKEY_DATA_TMO),
+    Kex2 = get_kex_init(ConnectionRef),
+
+    ok = ssh_sftp:write_file(SftpPid, DataFile, "hi\n"),
+
+    ct:sleep(?REKEY_DATA_TMO),
+    Kex2 = get_kex_init(ConnectionRef),
+
+    false = (Kex2 == Kex1),
+
+    ct:sleep(?REKEY_DATA_TMO),
+    Kex2 = get_kex_init(ConnectionRef),
+
+
+    ssh_sftp:stop_channel(SftpPid),
+    ssh:close(ConnectionRef),
+    ssh:stop_daemon(Pid).
+
+%%--------------------------------------------------------------------
+renegotiate1() ->
+    [{doc, "Test rekeying with simulataneous send request"}].
+renegotiate1(Config) ->
+    SystemDir = ?config(data_dir, Config),
+    UserDir = ?config(priv_dir, Config),
+    DataFile = filename:join(UserDir, "renegotiate1.data"),
+
+    {Pid, Host, DPort} = ssh_test_lib:daemon([{system_dir, SystemDir},
+					       {user_dir, UserDir},
+					       {user_passwords,
+						[{"simon", "says"}]}]),
+    RPort = ssh_test_lib:inet_port(),
+
+    {ok,RelayPid} = ssh_relay:start_link({0,0,0,0}, RPort, Host, DPort),
+
+    {ok, SftpPid, ConnectionRef} =
+	ssh_sftp:start_channel(Host, RPort, [{system_dir, SystemDir},
+					     {user_dir, UserDir},
+					     {user, "simon"},
+					     {password, "says"},
+					     {user_interaction, false},
+					     {silently_accept_hosts, true}]),
+
+    Kex1 = get_kex_init(ConnectionRef),
+
+    {ok, Handle} = ssh_sftp:open(SftpPid, DataFile, [write]),
+
+    ok = ssh_sftp:write(SftpPid, Handle, "hi\n"),
+
+    ssh_relay:hold(RelayPid, rx, 20, 1000),
+    ssh_connection_handler:renegotiate(ConnectionRef),
+    spawn(fun() -> ok=ssh_sftp:write(SftpPid, Handle, "another hi\n") end),
+
+    ct:sleep(2000),
+
+    Kex2 = get_kex_init(ConnectionRef),
+
+    false = (Kex2 == Kex1),
+    
+    ssh_relay:stop(RelayPid),
+    ssh_sftp:stop_channel(SftpPid),
+    ssh:close(ConnectionRef),
+    ssh:stop_daemon(Pid).
+
+%%--------------------------------------------------------------------
+renegotiate2() ->
+    [{doc, "Test rekeying with inflight messages from peer"}].
+renegotiate2(Config) ->
+    SystemDir = ?config(data_dir, Config),
+    UserDir = ?config(priv_dir, Config),
+    DataFile = filename:join(UserDir, "renegotiate1.data"),
+
+    {Pid, Host, DPort} = ssh_test_lib:daemon([{system_dir, SystemDir},
+					       {user_dir, UserDir},
+					       {user_passwords,
+						[{"simon", "says"}]}]),
+    RPort = ssh_test_lib:inet_port(),
+
+    {ok,RelayPid} = ssh_relay:start_link({0,0,0,0}, RPort, Host, DPort),
+
+    {ok, SftpPid, ConnectionRef} =
+	ssh_sftp:start_channel(Host, RPort, [{system_dir, SystemDir},
+					     {user_dir, UserDir},
+					     {user, "simon"},
+					     {password, "says"},
+					     {user_interaction, false},
+					     {silently_accept_hosts, true}]),
+
+    Kex1 = get_kex_init(ConnectionRef),
+
+    {ok, Handle} = ssh_sftp:open(SftpPid, DataFile, [write]),
+
+    ok = ssh_sftp:write(SftpPid, Handle, "hi\n"),
+
+    ssh_relay:hold(RelayPid, rx, 20, infinity),
+    spawn(fun() -> ok=ssh_sftp:write(SftpPid, Handle, "another hi\n") end),
+    %% need a small pause here to ensure ssh_sftp:write is executed
+    ct:sleep(10),
+    ssh_connection_handler:renegotiate(ConnectionRef),
+    ssh_relay:release(RelayPid, rx),
+
+    ct:sleep(2000),
+
+    Kex2 = get_kex_init(ConnectionRef),
+
+    false = (Kex2 == Kex1),
+
+    ssh_relay:stop(RelayPid),
+    ssh_sftp:stop_channel(SftpPid),
+    ssh:close(ConnectionRef),
+    ssh:stop_daemon(Pid).
+
+%%--------------------------------------------------------------------
 shell() ->
     [{doc, "Test that ssh:shell/2 works"}].
 shell(Config) when is_list(Config) ->
@@ -1300,3 +1452,18 @@ fake_daemon(_Config) ->
 	{sockname,Server,ServerHost,ServerPort} -> {Server, ServerHost, ServerPort}
     end.
 
+%% get_kex_init - helper function to get key_exchange_init_msg
+get_kex_init(Conn) ->
+    %% First, validate the key exchange is complete (StateName == connected)
+    {connected,S} = sys:get_state(Conn),
+    %% Next, walk through the elements of the #state record looking
+    %% for the #ssh_msg_kexinit record. This method is robust against
+    %% changes to either record. The KEXINIT message contains a cookie
+    %% unique to each invocation of the key exchange procedure (RFC4253)
+    SL = tuple_to_list(S),
+    case lists:keyfind(ssh_msg_kexinit, 1, SL) of
+	false ->
+	    throw(not_found);
+	KexInit ->
+	    KexInit
+    end.
diff --git a/lib/ssh/test/ssh_relay.erl b/lib/ssh/test/ssh_relay.erl
new file mode 100644
index 0000000000..a4f2bad2e2
--- /dev/null
+++ b/lib/ssh/test/ssh_relay.erl
@@ -0,0 +1,407 @@
+%%%-------------------------------------------------------------------
+%%% @author Simon Cornish <simon@cali.coffee>
+%%% @copyright (C) 2015, Simon Cornish
+%%% @doc
+%%% Provide manipulatable TCP-level relaying for testing SSH
+%%% @end
+%%% Created :  7 May 2015 by Simon Cornish <simon@cali.coffee>
+%%%-------------------------------------------------------------------
+-module(ssh_relay).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/4]).
+-export([stop/1]).
+-export([hold/4, release/2, release_next/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+	 terminate/2, code_change/3]).
+
+-record(hold, {
+	  port,
+	  n,
+	  tmo,
+	  tref,
+	  q = []
+	 }).
+
+-record(state, {
+	  local_addr,
+	  local_port,
+	  peer_addr,
+	  peer_port,
+	  lpid,
+	  local,
+	  peer,
+	  tx_hold,
+	  rx_hold
+	 }).
+
+-define(ACCEPT_TMO, 200).
+%%%===================================================================
+%%% API
+%%%===================================================================
+%%--------------------------------------------------------------------
+%% @doc
+%% Hold N (or 'all') messages in given direction.
+%% Messages will be released after the N+1th message or
+%% Tmo ms or 'infinity'
+%%
+%% Dir is 'tx' for direction local -> peer
+%%    and 'rx' for direction peer  -> local
+%%
+%% An Error, ealready, is returned if there is already a hold
+%% in the given direction
+%%
+%% @spec hold(Srv, Dir, N, Tmo) -> ok | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+hold(Srv, Dir, N, Tmo) ->
+    gen_server:call(Srv, {hold, Dir, N, Tmo}).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Release all held messages in given direction.
+%%
+%% An Error, enoent, is returned if there is no hold
+%% in the given direction
+%%
+%% @spec release(Srv, Dir) -> ok | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+release(Srv, Dir) ->
+    gen_server:call(Srv, {release, Dir}).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Release all held messages in given direction after the 
+%% next message in the trigger direction
+%%
+%% An Error, enoent, is returned if there is no hold
+%% in the given direction
+%%
+%% @spec release_next(Srv, Dir, TriggerDir) -> ok | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+release_next(Srv, Dir, TriggerDir) ->
+    gen_server:call(Srv, {release_next, Dir, TriggerDir}).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+start_link(ListenAddr, ListenPort, PeerAddr, PeerPort) ->
+    gen_server:start_link(?MODULE, [ListenAddr, ListenPort, PeerAddr, PeerPort], []).
+
+stop(Srv) ->
+    unlink(Srv),
+    Srv ! stop.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%%                     {ok, State, Timeout} |
+%%                     ignore |
+%%                     {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+init([ListenAddr, ListenPort, PeerAddr, PeerPort | Options]) ->
+    IfAddr = case ListenAddr of
+		 {0,0,0,0} ->
+		     [];
+		 _ ->
+		     [{ifaddr, ListenAddr}]
+	     end,
+    case gen_tcp:listen(ListenPort, [{reuseaddr, true}, {backlog, 1}, {active, false}, binary | IfAddr]) of
+	{ok, LSock} ->
+	    Parent = self(),
+	    {LPid, _LMod} = spawn_monitor(fun() -> listen(Parent, LSock) end),
+	    S = #state{local_addr = ListenAddr,
+		       local_port = ListenPort,
+		       lpid = LPid,
+		       peer_addr = PeerAddr,
+		       peer_port = PeerPort
+		      },
+	    {ok, S};
+	Error ->
+	    {stop, Error}
+    end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @spec handle_call(Request, From, State) ->
+%%                                   {reply, Reply, State} |
+%%                                   {reply, Reply, State, Timeout} |
+%%                                   {noreply, State} |
+%%                                   {noreply, State, Timeout} |
+%%                                   {stop, Reason, Reply, State} |
+%%                                   {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_call({hold, Dir, N, Tmo}, _From, State) ->
+    case Dir of
+	tx ->
+	    do_hold(#state.tx_hold, State#state.peer, N, Tmo, State);
+	rx ->
+	    do_hold(#state.rx_hold, State#state.local, N, Tmo, State);
+	_ ->
+	    {reply, {error, einval}, State}
+    end;
+handle_call({release, Dir}, _From, State) ->
+    case Dir of
+	tx ->
+	    do_release(#state.tx_hold, State);
+	rx ->
+	    do_release(#state.rx_hold, State);
+	_ ->
+	    {reply, {error, einval}, State}
+    end;
+handle_call({release_next, _Dir, _TriggerDir}, _From, State) ->
+    {reply, {error, nyi}, State};
+
+handle_call(Request, _From, State) ->
+    Reply = {unhandled, Request},
+    {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%%                                  {noreply, State, Timeout} |
+%%                                  {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%%                                   {noreply, State, Timeout} |
+%%                                   {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_info({tcp, Local, Data}, S) when S#state.local == Local ->
+    S1 = do_local(Data, S),
+    {noreply, S1};
+
+handle_info({tcp_error, Local, Error}, S) when S#state.local == Local ->
+    S1 = do_local({error, Error}, S),
+    {noreply, S1};
+
+handle_info({tcp_closed, Local}, S) when S#state.local == Local ->
+    S1 = do_local(closed, S),
+    {noreply, S1};
+
+handle_info({tcp, Peer, Data}, S) when S#state.peer == Peer ->
+    S1 = do_peer(Data, S),
+    {noreply, S1};
+
+handle_info({tcp_error, Peer, Error}, S) when S#state.peer == Peer ->
+    S1 = do_peer({error, Error}, S),
+    {noreply, S1};
+
+handle_info({tcp_closed, Peer}, S) when S#state.peer == Peer ->
+    S1 = do_peer(closed, S),
+    {noreply, S1};
+
+handle_info({accept, Local}, S) ->
+    S1 = do_accept(Local, S),
+    {noreply, S1};
+
+handle_info({activate, Local}, State) ->
+    inet:setopts(Local, [{active, true}]),
+    {noreply, State};
+
+handle_info({release, Pos}, S) ->
+    {reply, _, S1} = do_release(Pos,S),
+    {noreply, S1};
+
+handle_info(stop, State) ->
+    {stop, normal, State};
+
+handle_info({'DOWN', _Ref, _process, LPid, Reason}, S) when S#state.lpid == LPid ->
+    io:format("Acceptor has finished: ~p~n", [Reason]),
+    {noreply, S};
+
+handle_info(_Info, State) ->
+    io:format("Unhandled info: ~p~n", [_Info]),
+    {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+    ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+do_hold(Pos, _Port, _N, _Tmo, S) when element(Pos, S) /= undefined ->
+    {reply, {error, ealready}, S};
+do_hold(Pos, Port, N, Tmo, S) ->
+    TRef = if is_integer(Tmo) andalso Tmo > 0 ->
+		   erlang:send_after(Tmo, self(), {release, Pos});
+	      true ->
+		   undefined
+	   end,
+    Hold = #hold{port = Port, n = N, tmo = Tmo, tref = TRef},
+    {reply, ok, setelement(Pos, S, Hold)}.
+
+do_release(HPos, S) when element(HPos, S) == undefined ->
+    {reply, {error, enoent}, S};
+do_release(HPos, S) ->
+    #hold{port = Port, tref = TRef, q = Q} = element(HPos, S),
+    lists:foreach(fun(M) -> gen_tcp:send(Port, M), erlang:yield() end, Q),
+    catch erlang:cancel_timer(TRef),
+    receive
+	{release, HPos} -> ok
+    after 0 ->
+	    ok
+    end,
+    {reply, ok, setelement(HPos, S, undefined)}.
+
+listen(Parent, LSock) ->
+    monitor(process, Parent),
+    do_listen(Parent, LSock).
+
+do_listen(Parent, LSock) ->
+    %% So annoying there is no select-like sematic for this
+    case gen_tcp:accept(LSock, ?ACCEPT_TMO) of
+	{ok, Sock} ->
+	    Parent ! {accept, Sock},
+	    gen_tcp:controlling_process(Sock, Parent),
+	    Parent ! {activate, Sock},
+	    do_flush(Parent, Sock),
+	    gen_tcp:close(LSock);
+	{error, timeout} ->
+	    receive 
+		DOWN when element(1, DOWN) == 'DOWN' ->
+		    ok;
+		stop ->
+		    ok
+	    after 1 ->
+		    do_listen(Parent, LSock)
+	    end;
+	Error ->
+	    gen_tcp:close(LSock),
+	    exit({accept,Error})
+    end.
+
+do_flush(Parent, Sock) ->
+    receive 
+	{Tcp, Sock, _} = Msg when Tcp == tcp; Tcp == tcp_error ->
+	    Parent ! Msg,
+	    do_flush(Parent, Sock);
+	{tcp_closed, Sock} = Msg ->
+	    Parent ! Msg,
+	    do_flush(Parent, Sock)
+    after 1 ->
+	    ok
+    end.
+
+do_accept(Local, S) ->
+    case gen_tcp:connect(S#state.peer_addr, S#state.peer_port, [{active, true}, binary]) of
+	{ok, Peer} ->
+	    S#state{local = Local, peer = Peer};
+	Error ->
+	    exit({connect, Error})
+    end.
+
+do_local(Data, S) when is_binary(Data) ->
+    TxH = S#state.tx_hold,
+    if TxH == undefined ->
+	    gen_tcp:send(S#state.peer, Data),
+	    S;
+       TxH#hold.n == 0 ->
+	    lists:foreach(fun(M) -> gen_tcp:send(S#state.peer, M) end, TxH#hold.q),
+	    gen_tcp:send(S#state.peer, Data),
+	    catch erlang:cancel_timer(TxH#hold.tref),
+	    TxP = #state.tx_hold,
+	    receive
+		{release, TxP} ->
+		    ok
+	    after 0 ->
+		    ok
+	    end,
+	    S#state{tx_hold = undefined};
+       true ->
+	    Q = TxH#hold.q ++ [Data],
+	    N = if is_integer(TxH#hold.n) ->
+			TxH#hold.n -1;
+		   true ->
+			TxH#hold.n
+		end,
+	    S#state{tx_hold = TxH#hold{q = Q, n = N}}
+    end;
+do_local(Error, _S) ->
+    exit({local, Error}).
+
+do_peer(Data, S) when is_binary(Data) ->
+    RxH = S#state.rx_hold,
+    if RxH == undefined ->
+	    gen_tcp:send(S#state.local, Data),
+	    S;
+       RxH#hold.n == 0 ->
+	    lists:foreach(fun(M) -> gen_tcp:send(S#state.local, M) end, RxH#hold.q),
+	    gen_tcp:send(S#state.local, Data),
+	    catch erlang:cancel_timer(RxH#hold.tref),
+	    RxP = #state.rx_hold,
+	    receive
+		{release, RxP} ->
+		    ok
+	    after 0 ->
+		    ok
+	    end,
+	    S#state{rx_hold = undefined};
+       true ->
+	    Q = RxH#hold.q ++ [Data],
+	    N = if is_integer(RxH#hold.n) ->
+			RxH#hold.n -1;
+		   true ->
+			RxH#hold.n
+		end,
+	    S#state{rx_hold = RxH#hold{q = Q, n = N}}
+    end;
+do_peer(Error, _S) ->
+    exit({peer, Error}).
+
-- 
cgit v1.2.3