aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/ssh/src/ssh_connection_handler.erl249
-rw-r--r--lib/ssh/test/Makefile3
-rw-r--r--lib/ssh/test/ssh_basic_SUITE.erl177
-rw-r--r--lib/ssh/test/ssh_relay.erl407
4 files changed, 734 insertions, 102 deletions
diff --git a/lib/ssh/src/ssh_connection_handler.erl b/lib/ssh/src/ssh_connection_handler.erl
index 2c7f132916..65208ae158 100644
--- a/lib/ssh/src/ssh_connection_handler.erl
+++ b/lib/ssh/src/ssh_connection_handler.erl
@@ -71,6 +71,7 @@
key_exchange_init_msg, % #ssh_msg_kexinit{}
renegotiate = false, % boolean()
last_size_rekey = 0,
+ event_queue = [],
connection_queue,
address,
port,
@@ -83,6 +84,11 @@
{next_state, state_name(), term(), timeout()} |
{stop, term(), term()}.
+-type gen_fsm_sync_return() :: {next_state, state_name(), term()} |
+ {next_state, state_name(), term(), timeout()} |
+ {reply, term(), state_name(), term()} |
+ {stop, term(), term(), term()}.
+
%%====================================================================
%% Internal application API
%%====================================================================
@@ -433,9 +439,7 @@ key_exchange(#ssh_msg_kex_dh_gex_reply{} = Msg,
new_keys(#ssh_msg_newkeys{} = Msg, #state{ssh_params = Ssh0} = State0) ->
{ok, Ssh} = ssh_transport:handle_new_keys(Msg, Ssh0),
- {NextStateName, State} =
- after_new_keys(State0#state{ssh_params = Ssh}),
- {next_state, NextStateName, next_packet(State)}.
+ after_new_keys(next_packet(State0#state{ssh_params = Ssh})).
%%--------------------------------------------------------------------
-spec userauth(#ssh_msg_service_request{} | #ssh_msg_service_accept{} |
@@ -559,11 +563,13 @@ userauth(#ssh_msg_userauth_banner{message = Msg},
-spec connected({#ssh_msg_kexinit{}, binary()}, %%| %% #ssh_msg_kexdh_init{},
#state{}) -> gen_fsm_state_return().
%%--------------------------------------------------------------------
-connected({#ssh_msg_kexinit{}, _Payload} = Event, State) ->
- kexinit(Event, State#state{renegotiate = true}).
-%% ;
-%% connected(#ssh_msg_kexdh_init{} = Event, State) ->
-%% key_exchange(Event, State#state{renegotiate = true}).
+connected({#ssh_msg_kexinit{}, _Payload} = Event, #state{ssh_params = Ssh0} = State0) ->
+ {KeyInitMsg, SshPacket, Ssh} = ssh_transport:key_exchange_init_msg(Ssh0),
+ State = State0#state{ssh_params = Ssh,
+ key_exchange_init_msg = KeyInitMsg,
+ renegotiate = true},
+ send_msg(SshPacket, State),
+ kexinit(Event, State).
%%--------------------------------------------------------------------
-spec handle_event(#ssh_msg_disconnect{} | #ssh_msg_ignore{} | #ssh_msg_debug{} |
@@ -592,33 +598,6 @@ handle_event(#ssh_msg_debug{always_display = Display, message = DbgMsg, language
handle_event(#ssh_msg_unimplemented{}, StateName, State) ->
{next_state, StateName, next_packet(State)};
-handle_event({adjust_window, ChannelId, Bytes}, StateName,
- #state{connection_state =
- #connection{channel_cache = Cache}} = State0) ->
- State =
- case ssh_channel:cache_lookup(Cache, ChannelId) of
- #channel{recv_window_size = WinSize, remote_id = Id} = Channel ->
- ssh_channel:cache_update(Cache, Channel#channel{recv_window_size =
- WinSize + Bytes}),
- Msg = ssh_connection:channel_adjust_window_msg(Id, Bytes),
- send_replies([{connection_reply, Msg}], State0);
- undefined ->
- State0
- end,
- {next_state, StateName, next_packet(State)};
-
-handle_event({reply_request, success, ChannelId}, StateName,
- #state{connection_state =
- #connection{channel_cache = Cache}} = State0) ->
- State = case ssh_channel:cache_lookup(Cache, ChannelId) of
- #channel{remote_id = RemoteId} ->
- Msg = ssh_connection:channel_success_msg(RemoteId),
- send_replies([{connection_reply, Msg}], State0);
- undefined ->
- State0
- end,
- {next_state, StateName, State};
-
handle_event(renegotiate, connected, #state{ssh_params = Ssh0}
= State) ->
{KeyInitMsg, SshPacket, Ssh} = ssh_transport:key_exchange_init_msg(Ssh0),
@@ -630,8 +609,7 @@ handle_event(renegotiate, connected, #state{ssh_params = Ssh0}
renegotiate = true})};
handle_event(renegotiate, StateName, State) ->
- timer:apply_after(?REKEY_TIMOUT, gen_fsm, send_all_state_event, [self(), renegotiate]),
- %% Allready in keyexcahange so ignore
+ %% Already in key-exchange so safe to ignore
{next_state, StateName, State};
%% Rekey due to sent data limit reached?
@@ -653,6 +631,38 @@ handle_event(data_size, connected, #state{ssh_params = Ssh0} = State) ->
{next_state, connected, next_packet(State)}
end;
handle_event(data_size, StateName, State) ->
+ %% Already in key-exchange so safe to ignore
+ {next_state, StateName, State};
+
+handle_event(Event, StateName, State) when StateName /= connected ->
+ Events = [{event, Event} | State#state.event_queue],
+ {next_state, StateName, State#state{event_queue = Events}};
+
+handle_event({adjust_window, ChannelId, Bytes}, StateName,
+ #state{connection_state =
+ #connection{channel_cache = Cache}} = State0) ->
+ State =
+ case ssh_channel:cache_lookup(Cache, ChannelId) of
+ #channel{recv_window_size = WinSize, remote_id = Id} = Channel ->
+ ssh_channel:cache_update(Cache, Channel#channel{recv_window_size =
+ WinSize + Bytes}),
+ Msg = ssh_connection:channel_adjust_window_msg(Id, Bytes),
+ send_replies([{connection_reply, Msg}], State0);
+ undefined ->
+ State0
+ end,
+ {next_state, StateName, next_packet(State)};
+
+handle_event({reply_request, success, ChannelId}, StateName,
+ #state{connection_state =
+ #connection{channel_cache = Cache}} = State0) ->
+ State = case ssh_channel:cache_lookup(Cache, ChannelId) of
+ #channel{remote_id = RemoteId} ->
+ Msg = ssh_connection:channel_success_msg(RemoteId),
+ send_replies([{connection_reply, Msg}], State0);
+ undefined ->
+ State0
+ end,
{next_state, StateName, State};
handle_event({request, ChannelPid, ChannelId, Type, Data}, StateName, State0) ->
@@ -683,8 +693,65 @@ handle_event({unknown, Data}, StateName, State) ->
sockname]} | {channel_info, channel_id(), [recv_window |
send_window]} |
{close, channel_id()} | stop, term(), state_name(), #state{})
- -> gen_fsm_state_return().
+ -> gen_fsm_sync_return().
%%--------------------------------------------------------------------
+handle_sync_event(get_print_info, _From, StateName, State) ->
+ Reply =
+ try
+ {inet:sockname(State#state.socket),
+ inet:peername(State#state.socket)
+ }
+ of
+ {{ok,Local}, {ok,Remote}} -> {{Local,Remote},io_lib:format("statename=~p",[StateName])};
+ _ -> {{"-",0},"-"}
+ catch
+ _:_ -> {{"?",0},"?"}
+ end,
+ {reply, Reply, StateName, State};
+
+handle_sync_event({connection_info, Options}, _From, StateName, State) ->
+ Info = ssh_info(Options, State, []),
+ {reply, Info, StateName, State};
+
+handle_sync_event({channel_info, ChannelId, Options}, _From, StateName,
+ #state{connection_state = #connection{channel_cache = Cache}} = State) ->
+ case ssh_channel:cache_lookup(Cache, ChannelId) of
+ #channel{} = Channel ->
+ Info = ssh_channel_info(Options, Channel, []),
+ {reply, Info, StateName, State};
+ undefined ->
+ {reply, [], StateName, State}
+ end;
+
+handle_sync_event({info, ChannelPid}, _From, StateName,
+ #state{connection_state =
+ #connection{channel_cache = Cache}} = State) ->
+ Result = ssh_channel:cache_foldl(
+ fun(Channel, Acc) when ChannelPid == all;
+ Channel#channel.user == ChannelPid ->
+ [Channel | Acc];
+ (_, Acc) ->
+ Acc
+ end, [], Cache),
+ {reply, {ok, Result}, StateName, State};
+
+handle_sync_event(stop, _, _StateName, #state{connection_state = Connection0,
+ role = Role,
+ opts = Opts} = State0) ->
+ {disconnect, Reason, {{replies, Replies}, Connection}} =
+ ssh_connection:handle_msg(#ssh_msg_disconnect{code = ?SSH_DISCONNECT_BY_APPLICATION,
+ description = "User closed down connection",
+ language = "en"}, Connection0, Role),
+ State = send_replies(Replies, State0),
+ SSHOpts = proplists:get_value(ssh_opts, Opts),
+ disconnect_fun(Reason, SSHOpts),
+ {stop, normal, ok, State#state{connection_state = Connection}};
+
+
+handle_sync_event(Event, From, StateName, State) when StateName /= connected ->
+ Events = [{sync, Event, From} | State#state.event_queue],
+ {next_state, StateName, State#state{event_queue = Events}};
+
handle_sync_event({request, ChannelPid, ChannelId, Type, Data, Timeout}, From, StateName, State0) ->
{{replies, Replies}, State1} = handle_request(ChannelPid,
ChannelId, Type, Data,
@@ -787,46 +854,6 @@ handle_sync_event({recv_window, ChannelId}, _From, StateName,
end,
{reply, Reply, StateName, next_packet(State)};
-handle_sync_event(get_print_info, _From, StateName, State) ->
- Reply =
- try
- {inet:sockname(State#state.socket),
- inet:peername(State#state.socket)
- }
- of
- {{ok,Local}, {ok,Remote}} -> {{Local,Remote},io_lib:format("statename=~p",[StateName])};
- _ -> {{"-",0},"-"}
- catch
- _:_ -> {{"?",0},"?"}
- end,
- {reply, Reply, StateName, State};
-
-handle_sync_event({connection_info, Options}, _From, StateName, State) ->
- Info = ssh_info(Options, State, []),
- {reply, Info, StateName, State};
-
-handle_sync_event({channel_info, ChannelId, Options}, _From, StateName,
- #state{connection_state = #connection{channel_cache = Cache}} = State) ->
- case ssh_channel:cache_lookup(Cache, ChannelId) of
- #channel{} = Channel ->
- Info = ssh_channel_info(Options, Channel, []),
- {reply, Info, StateName, State};
- undefined ->
- {reply, [], StateName, State}
- end;
-
-handle_sync_event({info, ChannelPid}, _From, StateName,
- #state{connection_state =
- #connection{channel_cache = Cache}} = State) ->
- Result = ssh_channel:cache_foldl(
- fun(Channel, Acc) when ChannelPid == all;
- Channel#channel.user == ChannelPid ->
- [Channel | Acc];
- (_, Acc) ->
- Acc
- end, [], Cache),
- {reply, {ok, Result}, StateName, State};
-
handle_sync_event({close, ChannelId}, _, StateName,
#state{connection_state =
#connection{channel_cache = Cache}} = State0) ->
@@ -841,19 +868,7 @@ handle_sync_event({close, ChannelId}, _, StateName,
undefined ->
State0
end,
- {reply, ok, StateName, next_packet(State)};
-
-handle_sync_event(stop, _, _StateName, #state{connection_state = Connection0,
- role = Role,
- opts = Opts} = State0) ->
- {disconnect, Reason, {{replies, Replies}, Connection}} =
- ssh_connection:handle_msg(#ssh_msg_disconnect{code = ?SSH_DISCONNECT_BY_APPLICATION,
- description = "User closed down connection",
- language = "en"}, Connection0, Role),
- State = send_replies(Replies, State0),
- SSHOpts = proplists:get_value(ssh_opts, Opts),
- disconnect_fun(Reason, SSHOpts),
- {stop, normal, ok, State#state{connection_state = Connection}}.
+ {reply, ok, StateName, next_packet(State)}.
%%--------------------------------------------------------------------
-spec handle_info({atom(), port(), binary()} | {atom(), port()} |
@@ -1282,8 +1297,17 @@ generate_event(<<?BYTE(Byte), _/binary>> = Msg, StateName,
ConnectionMsg = ssh_message:decode(Msg),
State1 = generate_event_new_state(State0, EncData),
try ssh_connection:handle_msg(ConnectionMsg, Connection0, Role) of
- {{replies, Replies}, Connection} ->
- State = send_replies(Replies, State1#state{connection_state = Connection}),
+ {{replies, Replies0}, Connection} ->
+ if StateName == connected ->
+ Replies = Replies0,
+ State2 = State1;
+ true ->
+ {ConnReplies, Replies} =
+ lists:splitwith(fun not_connected_filter/1, Replies0),
+ Q = State1#state.event_queue ++ ConnReplies,
+ State2 = State1#state{ event_queue = Q }
+ end,
+ State = send_replies(Replies, State2#state{connection_state = Connection}),
{next_state, StateName, next_packet(State)};
{noreply, Connection} ->
{next_state, StateName, next_packet(State1#state{connection_state = Connection})};
@@ -1456,15 +1480,43 @@ next_packet(#state{socket = Socket} = State) ->
State.
after_new_keys(#state{renegotiate = true} = State) ->
- {connected, State#state{renegotiate = false}};
+ State1 = State#state{renegotiate = false, event_queue = []},
+ lists:foldr(fun after_new_keys_events/2, {next_state, connected, State1}, State#state.event_queue);
after_new_keys(#state{renegotiate = false,
ssh_params = #ssh{role = client} = Ssh0} = State) ->
{Msg, Ssh} = ssh_auth:service_request_msg(Ssh0),
send_msg(Msg, State),
- {userauth, State#state{ssh_params = Ssh}};
+ {next_state, userauth, State#state{ssh_params = Ssh}};
after_new_keys(#state{renegotiate = false,
ssh_params = #ssh{role = server}} = State) ->
- {userauth, State}.
+ {next_state, userauth, State}.
+
+after_new_keys_events({sync, _Event, From}, {stop, _Reason, _StateData}=Terminator) ->
+ gen_fsm:reply(From, {error, closed}),
+ Terminator;
+after_new_keys_events(_, {stop, _Reason, _StateData}=Terminator) ->
+ Terminator;
+after_new_keys_events({sync, Event, From}, {next_state, StateName, StateData}) ->
+ case handle_sync_event(Event, From, StateName, StateData) of
+ {reply, Reply, NextStateName, NewStateData} ->
+ gen_fsm:reply(From, Reply),
+ {next_state, NextStateName, NewStateData};
+ {next_state, NextStateName, NewStateData}->
+ {next_state, NextStateName, NewStateData};
+ {stop, Reason, Reply, NewStateData} ->
+ gen_fsm:reply(From, Reply),
+ {stop, Reason, NewStateData}
+ end;
+after_new_keys_events({event, Event}, {next_state, StateName, StateData}) ->
+ case handle_event(Event, StateName, StateData) of
+ {next_state, NextStateName, NewStateData}->
+ {next_state, NextStateName, NewStateData};
+ {stop, Reason, NewStateData} ->
+ {stop, Reason, NewStateData}
+ end;
+after_new_keys_events({connection_reply, _Data} = Reply, {StateName, State}) ->
+ NewState = send_replies([Reply], State),
+ {next_state, StateName, NewState}.
handle_ssh_packet_data(RemainingSshPacketLen, DecData, EncData, StateName,
State) ->
@@ -1625,6 +1677,11 @@ log_error(Reason) ->
error_logger:error_report(Report),
"Internal error".
+not_connected_filter({connection_reply, _Data}) ->
+ true;
+not_connected_filter(_) ->
+ false.
+
send_replies([], State) ->
State;
send_replies([{connection_reply, Data} | Rest], #state{ssh_params = Ssh0} = State) ->
diff --git a/lib/ssh/test/Makefile b/lib/ssh/test/Makefile
index 740dbd0235..39b2f57d26 100644
--- a/lib/ssh/test/Makefile
+++ b/lib/ssh/test/Makefile
@@ -40,7 +40,8 @@ MODULES= \
ssh_connection_SUITE \
ssh_echo_server \
ssh_peername_sockname_server \
- ssh_test_cli
+ ssh_test_cli \
+ ssh_relay
HRL_FILES_NEEDED_IN_TEST= \
$(ERL_TOP)/lib/ssh/src/ssh.hrl \
diff --git a/lib/ssh/test/ssh_basic_SUITE.erl b/lib/ssh/test/ssh_basic_SUITE.erl
index 242c9a3bd9..aaf0fa9905 100644
--- a/lib/ssh/test/ssh_basic_SUITE.erl
+++ b/lib/ssh/test/ssh_basic_SUITE.erl
@@ -29,6 +29,7 @@
-define(NEWLINE, <<"\r\n">>).
+-define(REKEY_DATA_TMO, 65000).
%%--------------------------------------------------------------------
%% Common Test interface functions -----------------------------------
%%--------------------------------------------------------------------
@@ -44,6 +45,7 @@ all() ->
{group, dsa_pass_key},
{group, rsa_pass_key},
{group, internal_error},
+ {group, renegotiate},
daemon_already_started,
server_password_option,
server_userpassword_option,
@@ -69,6 +71,7 @@ groups() ->
{dsa_pass_key, [], [pass_phrase]},
{rsa_pass_key, [], [pass_phrase]},
{internal_error, [], [internal_error]},
+ {renegotiate, [], [rekey, rekey_limit, renegotiate1, renegotiate2]},
{hardening_tests, [], [ssh_connect_nonegtimeout_connected_parallel,
ssh_connect_nonegtimeout_connected_sequential,
ssh_connect_negtimeout_parallel,
@@ -84,8 +87,7 @@ groups() ->
basic_tests() ->
[send, close, peername_sockname,
exec, exec_compressed, shell, cli, known_hosts,
- idle_time, rekey, openssh_zlib_basic_test,
- misc_ssh_options, inet_option].
+ idle_time, openssh_zlib_basic_test, misc_ssh_options, inet_option].
%%--------------------------------------------------------------------
@@ -333,25 +335,175 @@ idle_time(Config) ->
rekey() ->
[{doc, "Idle timeout test"}].
rekey(Config) ->
- SystemDir = filename:join(?config(priv_dir, Config), system),
+ SystemDir = ?config(data_dir, Config),
UserDir = ?config(priv_dir, Config),
{Pid, Host, Port} = ssh_test_lib:daemon([{system_dir, SystemDir},
- {user_dir, UserDir},
+ {user_dir, UserDir},
{failfun, fun ssh_test_lib:failfun/2},
+ {user_passwords,
+ [{"simon", "says"}]},
{rekey_limit, 0}]),
+
ConnectionRef =
ssh_test_lib:connect(Host, Port, [{silently_accept_hosts, true},
{user_dir, UserDir},
+ {user, "simon"},
+ {password, "says"},
{user_interaction, false},
{rekey_limit, 0}]),
receive
- after 200000 ->
+ after ?REKEY_DATA_TMO ->
%%By this time rekeying would have been done
ssh:close(ConnectionRef),
ssh:stop_daemon(Pid)
end.
%%--------------------------------------------------------------------
+rekey_limit() ->
+ [{doc, "Test rekeying by data volume"}].
+rekey_limit(Config) ->
+ SystemDir = ?config(data_dir, Config),
+ UserDir = ?config(priv_dir, Config),
+ DataFile = filename:join(UserDir, "rekey.data"),
+
+ {Pid, Host, Port} = ssh_test_lib:daemon([{system_dir, SystemDir},
+ {user_dir, UserDir},
+ {user_passwords,
+ [{"simon", "says"}]}]),
+ {ok, SftpPid, ConnectionRef} =
+ ssh_sftp:start_channel(Host, Port, [{system_dir, SystemDir},
+ {user_dir, UserDir},
+ {user, "simon"},
+ {password, "says"},
+ {rekey_limit, 2500},
+ {user_interaction, false},
+ {silently_accept_hosts, true}]),
+
+ Kex1 = get_kex_init(ConnectionRef),
+
+ ct:sleep(?REKEY_DATA_TMO),
+ Kex1 = get_kex_init(ConnectionRef),
+
+ Data = lists:duplicate(9000,1),
+ ok = ssh_sftp:write_file(SftpPid, DataFile, Data),
+
+ ct:sleep(?REKEY_DATA_TMO),
+ Kex2 = get_kex_init(ConnectionRef),
+
+ false = (Kex2 == Kex1),
+
+ ct:sleep(?REKEY_DATA_TMO),
+ Kex2 = get_kex_init(ConnectionRef),
+
+ ok = ssh_sftp:write_file(SftpPid, DataFile, "hi\n"),
+
+ ct:sleep(?REKEY_DATA_TMO),
+ Kex2 = get_kex_init(ConnectionRef),
+
+ false = (Kex2 == Kex1),
+
+ ct:sleep(?REKEY_DATA_TMO),
+ Kex2 = get_kex_init(ConnectionRef),
+
+
+ ssh_sftp:stop_channel(SftpPid),
+ ssh:close(ConnectionRef),
+ ssh:stop_daemon(Pid).
+
+%%--------------------------------------------------------------------
+renegotiate1() ->
+ [{doc, "Test rekeying with simulataneous send request"}].
+renegotiate1(Config) ->
+ SystemDir = ?config(data_dir, Config),
+ UserDir = ?config(priv_dir, Config),
+ DataFile = filename:join(UserDir, "renegotiate1.data"),
+
+ {Pid, Host, DPort} = ssh_test_lib:daemon([{system_dir, SystemDir},
+ {user_dir, UserDir},
+ {user_passwords,
+ [{"simon", "says"}]}]),
+ RPort = ssh_test_lib:inet_port(),
+
+ {ok,RelayPid} = ssh_relay:start_link({0,0,0,0}, RPort, Host, DPort),
+
+ {ok, SftpPid, ConnectionRef} =
+ ssh_sftp:start_channel(Host, RPort, [{system_dir, SystemDir},
+ {user_dir, UserDir},
+ {user, "simon"},
+ {password, "says"},
+ {user_interaction, false},
+ {silently_accept_hosts, true}]),
+
+ Kex1 = get_kex_init(ConnectionRef),
+
+ {ok, Handle} = ssh_sftp:open(SftpPid, DataFile, [write]),
+
+ ok = ssh_sftp:write(SftpPid, Handle, "hi\n"),
+
+ ssh_relay:hold(RelayPid, rx, 20, 1000),
+ ssh_connection_handler:renegotiate(ConnectionRef),
+ spawn(fun() -> ok=ssh_sftp:write(SftpPid, Handle, "another hi\n") end),
+
+ ct:sleep(2000),
+
+ Kex2 = get_kex_init(ConnectionRef),
+
+ false = (Kex2 == Kex1),
+
+ ssh_relay:stop(RelayPid),
+ ssh_sftp:stop_channel(SftpPid),
+ ssh:close(ConnectionRef),
+ ssh:stop_daemon(Pid).
+
+%%--------------------------------------------------------------------
+renegotiate2() ->
+ [{doc, "Test rekeying with inflight messages from peer"}].
+renegotiate2(Config) ->
+ SystemDir = ?config(data_dir, Config),
+ UserDir = ?config(priv_dir, Config),
+ DataFile = filename:join(UserDir, "renegotiate1.data"),
+
+ {Pid, Host, DPort} = ssh_test_lib:daemon([{system_dir, SystemDir},
+ {user_dir, UserDir},
+ {user_passwords,
+ [{"simon", "says"}]}]),
+ RPort = ssh_test_lib:inet_port(),
+
+ {ok,RelayPid} = ssh_relay:start_link({0,0,0,0}, RPort, Host, DPort),
+
+ {ok, SftpPid, ConnectionRef} =
+ ssh_sftp:start_channel(Host, RPort, [{system_dir, SystemDir},
+ {user_dir, UserDir},
+ {user, "simon"},
+ {password, "says"},
+ {user_interaction, false},
+ {silently_accept_hosts, true}]),
+
+ Kex1 = get_kex_init(ConnectionRef),
+
+ {ok, Handle} = ssh_sftp:open(SftpPid, DataFile, [write]),
+
+ ok = ssh_sftp:write(SftpPid, Handle, "hi\n"),
+
+ ssh_relay:hold(RelayPid, rx, 20, infinity),
+ spawn(fun() -> ok=ssh_sftp:write(SftpPid, Handle, "another hi\n") end),
+ %% need a small pause here to ensure ssh_sftp:write is executed
+ ct:sleep(10),
+ ssh_connection_handler:renegotiate(ConnectionRef),
+ ssh_relay:release(RelayPid, rx),
+
+ ct:sleep(2000),
+
+ Kex2 = get_kex_init(ConnectionRef),
+
+ false = (Kex2 == Kex1),
+
+ ssh_relay:stop(RelayPid),
+ ssh_sftp:stop_channel(SftpPid),
+ ssh:close(ConnectionRef),
+ ssh:stop_daemon(Pid).
+
+%%--------------------------------------------------------------------
shell() ->
[{doc, "Test that ssh:shell/2 works"}].
shell(Config) when is_list(Config) ->
@@ -1300,3 +1452,18 @@ fake_daemon(_Config) ->
{sockname,Server,ServerHost,ServerPort} -> {Server, ServerHost, ServerPort}
end.
+%% get_kex_init - helper function to get key_exchange_init_msg
+get_kex_init(Conn) ->
+ %% First, validate the key exchange is complete (StateName == connected)
+ {connected,S} = sys:get_state(Conn),
+ %% Next, walk through the elements of the #state record looking
+ %% for the #ssh_msg_kexinit record. This method is robust against
+ %% changes to either record. The KEXINIT message contains a cookie
+ %% unique to each invocation of the key exchange procedure (RFC4253)
+ SL = tuple_to_list(S),
+ case lists:keyfind(ssh_msg_kexinit, 1, SL) of
+ false ->
+ throw(not_found);
+ KexInit ->
+ KexInit
+ end.
diff --git a/lib/ssh/test/ssh_relay.erl b/lib/ssh/test/ssh_relay.erl
new file mode 100644
index 0000000000..a4f2bad2e2
--- /dev/null
+++ b/lib/ssh/test/ssh_relay.erl
@@ -0,0 +1,407 @@
+%%%-------------------------------------------------------------------
+%%% @author Simon Cornish <[email protected]>
+%%% @copyright (C) 2015, Simon Cornish
+%%% @doc
+%%% Provide manipulatable TCP-level relaying for testing SSH
+%%% @end
+%%% Created : 7 May 2015 by Simon Cornish <[email protected]>
+%%%-------------------------------------------------------------------
+-module(ssh_relay).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/4]).
+-export([stop/1]).
+-export([hold/4, release/2, release_next/3]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(hold, {
+ port,
+ n,
+ tmo,
+ tref,
+ q = []
+ }).
+
+-record(state, {
+ local_addr,
+ local_port,
+ peer_addr,
+ peer_port,
+ lpid,
+ local,
+ peer,
+ tx_hold,
+ rx_hold
+ }).
+
+-define(ACCEPT_TMO, 200).
+%%%===================================================================
+%%% API
+%%%===================================================================
+%%--------------------------------------------------------------------
+%% @doc
+%% Hold N (or 'all') messages in given direction.
+%% Messages will be released after the N+1th message or
+%% Tmo ms or 'infinity'
+%%
+%% Dir is 'tx' for direction local -> peer
+%% and 'rx' for direction peer -> local
+%%
+%% An Error, ealready, is returned if there is already a hold
+%% in the given direction
+%%
+%% @spec hold(Srv, Dir, N, Tmo) -> ok | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+hold(Srv, Dir, N, Tmo) ->
+ gen_server:call(Srv, {hold, Dir, N, Tmo}).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Release all held messages in given direction.
+%%
+%% An Error, enoent, is returned if there is no hold
+%% in the given direction
+%%
+%% @spec release(Srv, Dir) -> ok | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+release(Srv, Dir) ->
+ gen_server:call(Srv, {release, Dir}).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Release all held messages in given direction after the
+%% next message in the trigger direction
+%%
+%% An Error, enoent, is returned if there is no hold
+%% in the given direction
+%%
+%% @spec release_next(Srv, Dir, TriggerDir) -> ok | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+release_next(Srv, Dir, TriggerDir) ->
+ gen_server:call(Srv, {release_next, Dir, TriggerDir}).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Starts the server
+%%
+%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
+%% @end
+%%--------------------------------------------------------------------
+start_link(ListenAddr, ListenPort, PeerAddr, PeerPort) ->
+ gen_server:start_link(?MODULE, [ListenAddr, ListenPort, PeerAddr, PeerPort], []).
+
+stop(Srv) ->
+ unlink(Srv),
+ Srv ! stop.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Initializes the server
+%%
+%% @spec init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% @end
+%%--------------------------------------------------------------------
+init([ListenAddr, ListenPort, PeerAddr, PeerPort | Options]) ->
+ IfAddr = case ListenAddr of
+ {0,0,0,0} ->
+ [];
+ _ ->
+ [{ifaddr, ListenAddr}]
+ end,
+ case gen_tcp:listen(ListenPort, [{reuseaddr, true}, {backlog, 1}, {active, false}, binary | IfAddr]) of
+ {ok, LSock} ->
+ Parent = self(),
+ {LPid, _LMod} = spawn_monitor(fun() -> listen(Parent, LSock) end),
+ S = #state{local_addr = ListenAddr,
+ local_port = ListenPort,
+ lpid = LPid,
+ peer_addr = PeerAddr,
+ peer_port = PeerPort
+ },
+ {ok, S};
+ Error ->
+ {stop, Error}
+ end.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling call messages
+%%
+%% @spec handle_call(Request, From, State) ->
+%% {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_call({hold, Dir, N, Tmo}, _From, State) ->
+ case Dir of
+ tx ->
+ do_hold(#state.tx_hold, State#state.peer, N, Tmo, State);
+ rx ->
+ do_hold(#state.rx_hold, State#state.local, N, Tmo, State);
+ _ ->
+ {reply, {error, einval}, State}
+ end;
+handle_call({release, Dir}, _From, State) ->
+ case Dir of
+ tx ->
+ do_release(#state.tx_hold, State);
+ rx ->
+ do_release(#state.rx_hold, State);
+ _ ->
+ {reply, {error, einval}, State}
+ end;
+handle_call({release_next, _Dir, _TriggerDir}, _From, State) ->
+ {reply, {error, nyi}, State};
+
+handle_call(Request, _From, State) ->
+ Reply = {unhandled, Request},
+ {reply, Reply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling cast messages
+%%
+%% @spec handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Handling all non call/cast messages
+%%
+%% @spec handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% @end
+%%--------------------------------------------------------------------
+handle_info({tcp, Local, Data}, S) when S#state.local == Local ->
+ S1 = do_local(Data, S),
+ {noreply, S1};
+
+handle_info({tcp_error, Local, Error}, S) when S#state.local == Local ->
+ S1 = do_local({error, Error}, S),
+ {noreply, S1};
+
+handle_info({tcp_closed, Local}, S) when S#state.local == Local ->
+ S1 = do_local(closed, S),
+ {noreply, S1};
+
+handle_info({tcp, Peer, Data}, S) when S#state.peer == Peer ->
+ S1 = do_peer(Data, S),
+ {noreply, S1};
+
+handle_info({tcp_error, Peer, Error}, S) when S#state.peer == Peer ->
+ S1 = do_peer({error, Error}, S),
+ {noreply, S1};
+
+handle_info({tcp_closed, Peer}, S) when S#state.peer == Peer ->
+ S1 = do_peer(closed, S),
+ {noreply, S1};
+
+handle_info({accept, Local}, S) ->
+ S1 = do_accept(Local, S),
+ {noreply, S1};
+
+handle_info({activate, Local}, State) ->
+ inet:setopts(Local, [{active, true}]),
+ {noreply, State};
+
+handle_info({release, Pos}, S) ->
+ {reply, _, S1} = do_release(Pos,S),
+ {noreply, S1};
+
+handle_info(stop, State) ->
+ {stop, normal, State};
+
+handle_info({'DOWN', _Ref, _process, LPid, Reason}, S) when S#state.lpid == LPid ->
+ io:format("Acceptor has finished: ~p~n", [Reason]),
+ {noreply, S};
+
+handle_info(_Info, State) ->
+ io:format("Unhandled info: ~p~n", [_Info]),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any
+%% necessary cleaning up. When it returns, the gen_server terminates
+%% with Reason. The return value is ignored.
+%%
+%% @spec terminate(Reason, State) -> void()
+%% @end
+%%--------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ok.
+
+%%--------------------------------------------------------------------
+%% @private
+%% @doc
+%% Convert process state when code is changed
+%%
+%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% @end
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%%===================================================================
+%%% Internal functions
+%%%===================================================================
+do_hold(Pos, _Port, _N, _Tmo, S) when element(Pos, S) /= undefined ->
+ {reply, {error, ealready}, S};
+do_hold(Pos, Port, N, Tmo, S) ->
+ TRef = if is_integer(Tmo) andalso Tmo > 0 ->
+ erlang:send_after(Tmo, self(), {release, Pos});
+ true ->
+ undefined
+ end,
+ Hold = #hold{port = Port, n = N, tmo = Tmo, tref = TRef},
+ {reply, ok, setelement(Pos, S, Hold)}.
+
+do_release(HPos, S) when element(HPos, S) == undefined ->
+ {reply, {error, enoent}, S};
+do_release(HPos, S) ->
+ #hold{port = Port, tref = TRef, q = Q} = element(HPos, S),
+ lists:foreach(fun(M) -> gen_tcp:send(Port, M), erlang:yield() end, Q),
+ catch erlang:cancel_timer(TRef),
+ receive
+ {release, HPos} -> ok
+ after 0 ->
+ ok
+ end,
+ {reply, ok, setelement(HPos, S, undefined)}.
+
+listen(Parent, LSock) ->
+ monitor(process, Parent),
+ do_listen(Parent, LSock).
+
+do_listen(Parent, LSock) ->
+ %% So annoying there is no select-like sematic for this
+ case gen_tcp:accept(LSock, ?ACCEPT_TMO) of
+ {ok, Sock} ->
+ Parent ! {accept, Sock},
+ gen_tcp:controlling_process(Sock, Parent),
+ Parent ! {activate, Sock},
+ do_flush(Parent, Sock),
+ gen_tcp:close(LSock);
+ {error, timeout} ->
+ receive
+ DOWN when element(1, DOWN) == 'DOWN' ->
+ ok;
+ stop ->
+ ok
+ after 1 ->
+ do_listen(Parent, LSock)
+ end;
+ Error ->
+ gen_tcp:close(LSock),
+ exit({accept,Error})
+ end.
+
+do_flush(Parent, Sock) ->
+ receive
+ {Tcp, Sock, _} = Msg when Tcp == tcp; Tcp == tcp_error ->
+ Parent ! Msg,
+ do_flush(Parent, Sock);
+ {tcp_closed, Sock} = Msg ->
+ Parent ! Msg,
+ do_flush(Parent, Sock)
+ after 1 ->
+ ok
+ end.
+
+do_accept(Local, S) ->
+ case gen_tcp:connect(S#state.peer_addr, S#state.peer_port, [{active, true}, binary]) of
+ {ok, Peer} ->
+ S#state{local = Local, peer = Peer};
+ Error ->
+ exit({connect, Error})
+ end.
+
+do_local(Data, S) when is_binary(Data) ->
+ TxH = S#state.tx_hold,
+ if TxH == undefined ->
+ gen_tcp:send(S#state.peer, Data),
+ S;
+ TxH#hold.n == 0 ->
+ lists:foreach(fun(M) -> gen_tcp:send(S#state.peer, M) end, TxH#hold.q),
+ gen_tcp:send(S#state.peer, Data),
+ catch erlang:cancel_timer(TxH#hold.tref),
+ TxP = #state.tx_hold,
+ receive
+ {release, TxP} ->
+ ok
+ after 0 ->
+ ok
+ end,
+ S#state{tx_hold = undefined};
+ true ->
+ Q = TxH#hold.q ++ [Data],
+ N = if is_integer(TxH#hold.n) ->
+ TxH#hold.n -1;
+ true ->
+ TxH#hold.n
+ end,
+ S#state{tx_hold = TxH#hold{q = Q, n = N}}
+ end;
+do_local(Error, _S) ->
+ exit({local, Error}).
+
+do_peer(Data, S) when is_binary(Data) ->
+ RxH = S#state.rx_hold,
+ if RxH == undefined ->
+ gen_tcp:send(S#state.local, Data),
+ S;
+ RxH#hold.n == 0 ->
+ lists:foreach(fun(M) -> gen_tcp:send(S#state.local, M) end, RxH#hold.q),
+ gen_tcp:send(S#state.local, Data),
+ catch erlang:cancel_timer(RxH#hold.tref),
+ RxP = #state.rx_hold,
+ receive
+ {release, RxP} ->
+ ok
+ after 0 ->
+ ok
+ end,
+ S#state{rx_hold = undefined};
+ true ->
+ Q = RxH#hold.q ++ [Data],
+ N = if is_integer(RxH#hold.n) ->
+ RxH#hold.n -1;
+ true ->
+ RxH#hold.n
+ end,
+ S#state{rx_hold = RxH#hold{q = Q, n = N}}
+ end;
+do_peer(Error, _S) ->
+ exit({peer, Error}).
+