From b7f81aa55ffa161be01929b8d156e40bf751de17 Mon Sep 17 00:00:00 2001 From: Hans Nilsson Date: Tue, 26 Apr 2016 15:52:51 +0200 Subject: ssh: Channel request timer refactoring --- lib/ssh/src/ssh_connection_handler.erl | 121 +++++++++++++++++---------------- 1 file changed, 62 insertions(+), 59 deletions(-) (limited to 'lib/ssh/src') diff --git a/lib/ssh/src/ssh_connection_handler.erl b/lib/ssh/src/ssh_connection_handler.erl index 6f9b2b3e22..e5229eb954 100644 --- a/lib/ssh/src/ssh_connection_handler.erl +++ b/lib/ssh/src/ssh_connection_handler.erl @@ -63,7 +63,7 @@ %%% Behaviour callbacks -export([handle_event/4, terminate/3, format_status/2, code_change/4]). -%%% Exports not intended to be used :) +%%% Exports not intended to be used :). They are used for spawning and tests -export([init_connection_handler/3, % proc_lib:spawn needs this init_ssh_record/3, % Export of this internal function % intended for low-level protocol test suites @@ -99,6 +99,8 @@ stop(ConnectionHandler)-> %% Internal application API %%==================================================================== +-define(DefaultTransport, {tcp, gen_tcp, tcp_closed} ). + %%-------------------------------------------------------------------- -spec start_connection(role(), inet:socket(), @@ -109,11 +111,8 @@ stop(ConnectionHandler)-> start_connection(client = Role, Socket, Options, Timeout) -> try {ok, Pid} = sshc_sup:start_child([Role, Socket, Options]), - {_, Callback, _} = - proplists:get_value(transport, Options, {tcp, gen_tcp, tcp_closed}), - ok = socket_control(Socket, Pid, Callback), - Ref = erlang:monitor(process, Pid), - handshake(Pid, Ref, Timeout) + ok = socket_control(Socket, Pid, Options), + handshake(Pid, erlang:monitor(process,Pid), Timeout) catch exit:{noproc, _} -> {error, ssh_not_started}; @@ -383,7 +382,7 @@ init_connection_handler(Role, Socket, Opts) -> S0 = init_process_state(Role, Socket, Opts), try {Protocol, Callback, CloseTag} = - proplists:get_value(transport, Opts, {tcp, gen_tcp, tcp_closed}), + proplists:get_value(transport, Opts, ?DefaultTransport), S0#data{ssh_params = init_ssh_record(Role, Socket, Opts), transport_protocol = Protocol, transport_cb = Callback, @@ -1070,13 +1069,13 @@ handle_event({call,_}, _, StateName, _) when StateName /= {connected,server}, handle_event({call,From}, {request, ChannelPid, ChannelId, Type, Data, Timeout}, {connected,_}, D0) -> D = handle_request(ChannelPid, ChannelId, Type, Data, true, From, D0), %% Note reply to channel will happen later when reply is recived from peer on the socket - start_timeout(ChannelId, From, Timeout), + start_channel_request_timer(ChannelId, From, Timeout), {keep_state, cache_request_idle_timer_check(D)}; handle_event({call,From}, {request, ChannelId, Type, Data, Timeout}, {connected,_}, D0) -> D = handle_request(ChannelId, Type, Data, true, From, D0), %% Note reply to channel will happen later when reply is recived from peer on the socket - start_timeout(ChannelId, From, Timeout), + start_channel_request_timer(ChannelId, From, Timeout), {keep_state, cache_request_idle_timer_check(D)}; handle_event({call,From}, {global_request, Pid, _, _, _} = Request, {connected,_}, D0) -> @@ -1089,7 +1088,7 @@ handle_event({call,From}, {data, ChannelId, Type, Data, Timeout}, {connected,_}, {{replies, Replies}, Connection} = ssh_connection:channel_data(ChannelId, Type, Data, D0#data.connection_state, From), {Repls,D} = send_replies(Replies, D0#data{connection_state = Connection}), - start_timeout(ChannelId, From, Timeout), + start_channel_request_timer(ChannelId, From, Timeout), % FIXME: No message exchange so why? {keep_state, D, Repls}; handle_event({call,From}, {eof, ChannelId}, {connected,_}, D0) -> @@ -1121,7 +1120,7 @@ handle_event({call,From}, send_buf = queue:new() }), D = add_request(true, ChannelId, From, D2), - start_timeout(ChannelId, From, Timeout), + start_channel_request_timer(ChannelId, From, Timeout), {keep_state, cache_cancel_idle_timer(D)}; handle_event({call,From}, {send_window, ChannelId}, {connected,_}, D) -> @@ -1243,12 +1242,15 @@ handle_event(info, {CloseTag,Socket}, StateName, StateName, D); handle_event(info, {timeout, {_, From} = Request}, _, - #data{connection_state = #connection{requests = Requests} = C0} = D) -> + #data{connection_state = #connection{requests = Requests} = C0} = D) -> case lists:member(Request, Requests) of true -> + %% A channel request is not answered in time. Answer {error,timeout} + %% to the caller C = C0#connection{requests = lists:delete(Request, Requests)}, {keep_state, D#data{connection_state=C}, [{reply,From,{error,timeout}}]}; false -> + %% The request is answered - just ignore the timeout keep_state_and_data end; @@ -1424,8 +1426,7 @@ start_the_connection_child(UserPid, Role, Socket, Options) -> ConnectionSup = proplists:get_value(connection_sup, Sups), Opts = [{supervisors, Sups}, {user_pid, UserPid} | proplists:get_value(ssh_opts, Options, [])], {ok, Pid} = ssh_connection_sup:start_child(ConnectionSup, [Role, Socket, Opts]), - {_, Callback, _} = proplists:get_value(transport, Options, {tcp, gen_tcp, tcp_closed}), - socket_control(Socket, Pid, Callback), + ok = socket_control(Socket, Pid, Options), Pid. %%-------------------------------------------------------------------- @@ -1698,42 +1699,6 @@ counterpart_versions(NumVsn, StrVsn, #ssh{role = server} = Ssh) -> counterpart_versions(NumVsn, StrVsn, #ssh{role = client} = Ssh) -> Ssh#ssh{s_vsn = NumVsn , s_version = StrVsn}. -connected_fun(User, Method, #data{ssh_params = #ssh{peer = {_,Peer}}, - opts = Opts}) -> - case proplists:get_value(connectfun, Opts) of - undefined -> - ok; - Fun -> - catch Fun(User, Peer, Method) - end. - -retry_fun(_, undefined, _) -> - ok; -retry_fun(User, Reason, #data{ssh_params = #ssh{opts = Opts, - peer = {_,Peer} - }}) -> - {Tag,Info} = - case Reason of - {error, Error} -> - {failfun, Error}; - _ -> - {infofun, Reason} - end, - Fun = proplists:get_value(Tag, Opts, fun(_,_)-> ok end), - try erlang:fun_info(Fun, arity) - of - {arity, 2} -> %% Backwards compatible - catch Fun(User, Info); - {arity, 3} -> - catch Fun(User, Peer, Info); - _ -> - ok - catch - _:_ -> - ok - end. - - ssh_info([], _State, Acc) -> Acc; ssh_info([client_version | Rest], #data{ssh_params = #ssh{c_vsn = IntVsn, @@ -1812,8 +1777,6 @@ get_repl(X, Acc) -> %%%---------------------------------------------------------------- disconnect_fun({disconnect,Msg}, D) -> disconnect_fun(Msg, D); -%% disconnect_fun(_, undefined) -> -%% ok; disconnect_fun(Reason, #data{opts=Opts}) -> case proplists:get_value(disconnectfun, Opts) of undefined -> @@ -1845,6 +1808,41 @@ debug_fun(#ssh_msg_debug{always_display = Display, end. +connected_fun(User, Method, #data{ssh_params = #ssh{peer = {_,Peer}}, + opts = Opts}) -> + case proplists:get_value(connectfun, Opts) of + undefined -> + ok; + Fun -> + catch Fun(User, Peer, Method) + end. + +retry_fun(_, undefined, _) -> + ok; +retry_fun(User, Reason, #data{ssh_params = #ssh{opts = Opts, + peer = {_,Peer} + }}) -> + {Tag,Info} = + case Reason of + {error, Error} -> + {failfun, Error}; + _ -> + {infofun, Reason} + end, + Fun = proplists:get_value(Tag, Opts, fun(_,_)-> ok end), + try erlang:fun_info(Fun, arity) + of + {arity, 2} -> %% Backwards compatible + catch Fun(User, Info); + {arity, 3} -> + catch Fun(User, Peer, Info); + _ -> + ok + catch + _:_ -> + ok + end. + %%%---------------------------------------------------------------- %%% Cache idle timer that closes the connection if there are no %%% channels open for a while. @@ -1904,8 +1902,18 @@ cache_request_idle_timer_check(D = #data{idle_timer_value = IdleTime}) -> D. %%%---------------------------------------------------------------- -socket_control(Socket, Pid, Transport) -> - case Transport:controlling_process(Socket, Pid) of +start_channel_request_timer(_,_, infinity) -> + ok; +start_channel_request_timer(Channel, From, Time) -> + erlang:send_after(Time, self(), {timeout, {Channel, From}}). + +%%%---------------------------------------------------------------- +%%% Connection start and initalization helpers + +socket_control(Socket, Pid, Options) -> + {_, TransportCallback, _} = % For example {_,gen_tcp,_} + proplists:get_value(transport, Options, ?DefaultTransport), + case TransportCallback:controlling_process(Socket, Pid) of ok -> gen_statem:cast(Pid, socket_control); {error, Reason} -> @@ -1936,8 +1944,3 @@ handshake(Pid, Ref, Timeout) -> {error, timeout} end. -start_timeout(_,_, infinity) -> - ok; -start_timeout(Channel, From, Time) -> - erlang:send_after(Time, self(), {timeout, {Channel, From}}). - -- cgit v1.2.3