aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHans Nilsson <[email protected]>2016-04-26 15:52:51 +0200
committerHans Nilsson <[email protected]>2016-04-27 13:39:29 +0200
commitb7f81aa55ffa161be01929b8d156e40bf751de17 (patch)
treee8cdf85b7b81df5ae931d1df41ea1e2842b40f0c
parentbbf8fb6e42e730a4037485c3313e63733d8c100b (diff)
downloadotp-b7f81aa55ffa161be01929b8d156e40bf751de17.tar.gz
otp-b7f81aa55ffa161be01929b8d156e40bf751de17.tar.bz2
otp-b7f81aa55ffa161be01929b8d156e40bf751de17.zip
ssh: Channel request timer refactoring
-rw-r--r--lib/ssh/src/ssh_connection_handler.erl121
1 files changed, 62 insertions, 59 deletions
diff --git a/lib/ssh/src/ssh_connection_handler.erl b/lib/ssh/src/ssh_connection_handler.erl
index 6f9b2b3e22..e5229eb954 100644
--- a/lib/ssh/src/ssh_connection_handler.erl
+++ b/lib/ssh/src/ssh_connection_handler.erl
@@ -63,7 +63,7 @@
%%% Behaviour callbacks
-export([handle_event/4, terminate/3, format_status/2, code_change/4]).
-%%% Exports not intended to be used :)
+%%% Exports not intended to be used :). They are used for spawning and tests
-export([init_connection_handler/3, % proc_lib:spawn needs this
init_ssh_record/3, % Export of this internal function
% intended for low-level protocol test suites
@@ -99,6 +99,8 @@ stop(ConnectionHandler)->
%% Internal application API
%%====================================================================
+-define(DefaultTransport, {tcp, gen_tcp, tcp_closed} ).
+
%%--------------------------------------------------------------------
-spec start_connection(role(),
inet:socket(),
@@ -109,11 +111,8 @@ stop(ConnectionHandler)->
start_connection(client = Role, Socket, Options, Timeout) ->
try
{ok, Pid} = sshc_sup:start_child([Role, Socket, Options]),
- {_, Callback, _} =
- proplists:get_value(transport, Options, {tcp, gen_tcp, tcp_closed}),
- ok = socket_control(Socket, Pid, Callback),
- Ref = erlang:monitor(process, Pid),
- handshake(Pid, Ref, Timeout)
+ ok = socket_control(Socket, Pid, Options),
+ handshake(Pid, erlang:monitor(process,Pid), Timeout)
catch
exit:{noproc, _} ->
{error, ssh_not_started};
@@ -383,7 +382,7 @@ init_connection_handler(Role, Socket, Opts) ->
S0 = init_process_state(Role, Socket, Opts),
try
{Protocol, Callback, CloseTag} =
- proplists:get_value(transport, Opts, {tcp, gen_tcp, tcp_closed}),
+ proplists:get_value(transport, Opts, ?DefaultTransport),
S0#data{ssh_params = init_ssh_record(Role, Socket, Opts),
transport_protocol = Protocol,
transport_cb = Callback,
@@ -1070,13 +1069,13 @@ handle_event({call,_}, _, StateName, _) when StateName /= {connected,server},
handle_event({call,From}, {request, ChannelPid, ChannelId, Type, Data, Timeout}, {connected,_}, D0) ->
D = handle_request(ChannelPid, ChannelId, Type, Data, true, From, D0),
%% Note reply to channel will happen later when reply is recived from peer on the socket
- start_timeout(ChannelId, From, Timeout),
+ start_channel_request_timer(ChannelId, From, Timeout),
{keep_state, cache_request_idle_timer_check(D)};
handle_event({call,From}, {request, ChannelId, Type, Data, Timeout}, {connected,_}, D0) ->
D = handle_request(ChannelId, Type, Data, true, From, D0),
%% Note reply to channel will happen later when reply is recived from peer on the socket
- start_timeout(ChannelId, From, Timeout),
+ start_channel_request_timer(ChannelId, From, Timeout),
{keep_state, cache_request_idle_timer_check(D)};
handle_event({call,From}, {global_request, Pid, _, _, _} = Request, {connected,_}, D0) ->
@@ -1089,7 +1088,7 @@ handle_event({call,From}, {data, ChannelId, Type, Data, Timeout}, {connected,_},
{{replies, Replies}, Connection} =
ssh_connection:channel_data(ChannelId, Type, Data, D0#data.connection_state, From),
{Repls,D} = send_replies(Replies, D0#data{connection_state = Connection}),
- start_timeout(ChannelId, From, Timeout),
+ start_channel_request_timer(ChannelId, From, Timeout), % FIXME: No message exchange so why?
{keep_state, D, Repls};
handle_event({call,From}, {eof, ChannelId}, {connected,_}, D0) ->
@@ -1121,7 +1120,7 @@ handle_event({call,From},
send_buf = queue:new()
}),
D = add_request(true, ChannelId, From, D2),
- start_timeout(ChannelId, From, Timeout),
+ start_channel_request_timer(ChannelId, From, Timeout),
{keep_state, cache_cancel_idle_timer(D)};
handle_event({call,From}, {send_window, ChannelId}, {connected,_}, D) ->
@@ -1243,12 +1242,15 @@ handle_event(info, {CloseTag,Socket}, StateName,
StateName, D);
handle_event(info, {timeout, {_, From} = Request}, _,
- #data{connection_state = #connection{requests = Requests} = C0} = D) ->
+ #data{connection_state = #connection{requests = Requests} = C0} = D) ->
case lists:member(Request, Requests) of
true ->
+ %% A channel request is not answered in time. Answer {error,timeout}
+ %% to the caller
C = C0#connection{requests = lists:delete(Request, Requests)},
{keep_state, D#data{connection_state=C}, [{reply,From,{error,timeout}}]};
false ->
+ %% The request is answered - just ignore the timeout
keep_state_and_data
end;
@@ -1424,8 +1426,7 @@ start_the_connection_child(UserPid, Role, Socket, Options) ->
ConnectionSup = proplists:get_value(connection_sup, Sups),
Opts = [{supervisors, Sups}, {user_pid, UserPid} | proplists:get_value(ssh_opts, Options, [])],
{ok, Pid} = ssh_connection_sup:start_child(ConnectionSup, [Role, Socket, Opts]),
- {_, Callback, _} = proplists:get_value(transport, Options, {tcp, gen_tcp, tcp_closed}),
- socket_control(Socket, Pid, Callback),
+ ok = socket_control(Socket, Pid, Options),
Pid.
%%--------------------------------------------------------------------
@@ -1698,42 +1699,6 @@ counterpart_versions(NumVsn, StrVsn, #ssh{role = server} = Ssh) ->
counterpart_versions(NumVsn, StrVsn, #ssh{role = client} = Ssh) ->
Ssh#ssh{s_vsn = NumVsn , s_version = StrVsn}.
-connected_fun(User, Method, #data{ssh_params = #ssh{peer = {_,Peer}},
- opts = Opts}) ->
- case proplists:get_value(connectfun, Opts) of
- undefined ->
- ok;
- Fun ->
- catch Fun(User, Peer, Method)
- end.
-
-retry_fun(_, undefined, _) ->
- ok;
-retry_fun(User, Reason, #data{ssh_params = #ssh{opts = Opts,
- peer = {_,Peer}
- }}) ->
- {Tag,Info} =
- case Reason of
- {error, Error} ->
- {failfun, Error};
- _ ->
- {infofun, Reason}
- end,
- Fun = proplists:get_value(Tag, Opts, fun(_,_)-> ok end),
- try erlang:fun_info(Fun, arity)
- of
- {arity, 2} -> %% Backwards compatible
- catch Fun(User, Info);
- {arity, 3} ->
- catch Fun(User, Peer, Info);
- _ ->
- ok
- catch
- _:_ ->
- ok
- end.
-
-
ssh_info([], _State, Acc) ->
Acc;
ssh_info([client_version | Rest], #data{ssh_params = #ssh{c_vsn = IntVsn,
@@ -1812,8 +1777,6 @@ get_repl(X, Acc) ->
%%%----------------------------------------------------------------
disconnect_fun({disconnect,Msg}, D) ->
disconnect_fun(Msg, D);
-%% disconnect_fun(_, undefined) ->
-%% ok;
disconnect_fun(Reason, #data{opts=Opts}) ->
case proplists:get_value(disconnectfun, Opts) of
undefined ->
@@ -1845,6 +1808,41 @@ debug_fun(#ssh_msg_debug{always_display = Display,
end.
+connected_fun(User, Method, #data{ssh_params = #ssh{peer = {_,Peer}},
+ opts = Opts}) ->
+ case proplists:get_value(connectfun, Opts) of
+ undefined ->
+ ok;
+ Fun ->
+ catch Fun(User, Peer, Method)
+ end.
+
+retry_fun(_, undefined, _) ->
+ ok;
+retry_fun(User, Reason, #data{ssh_params = #ssh{opts = Opts,
+ peer = {_,Peer}
+ }}) ->
+ {Tag,Info} =
+ case Reason of
+ {error, Error} ->
+ {failfun, Error};
+ _ ->
+ {infofun, Reason}
+ end,
+ Fun = proplists:get_value(Tag, Opts, fun(_,_)-> ok end),
+ try erlang:fun_info(Fun, arity)
+ of
+ {arity, 2} -> %% Backwards compatible
+ catch Fun(User, Info);
+ {arity, 3} ->
+ catch Fun(User, Peer, Info);
+ _ ->
+ ok
+ catch
+ _:_ ->
+ ok
+ end.
+
%%%----------------------------------------------------------------
%%% Cache idle timer that closes the connection if there are no
%%% channels open for a while.
@@ -1904,8 +1902,18 @@ cache_request_idle_timer_check(D = #data{idle_timer_value = IdleTime}) ->
D.
%%%----------------------------------------------------------------
-socket_control(Socket, Pid, Transport) ->
- case Transport:controlling_process(Socket, Pid) of
+start_channel_request_timer(_,_, infinity) ->
+ ok;
+start_channel_request_timer(Channel, From, Time) ->
+ erlang:send_after(Time, self(), {timeout, {Channel, From}}).
+
+%%%----------------------------------------------------------------
+%%% Connection start and initalization helpers
+
+socket_control(Socket, Pid, Options) ->
+ {_, TransportCallback, _} = % For example {_,gen_tcp,_}
+ proplists:get_value(transport, Options, ?DefaultTransport),
+ case TransportCallback:controlling_process(Socket, Pid) of
ok ->
gen_statem:cast(Pid, socket_control);
{error, Reason} ->
@@ -1936,8 +1944,3 @@ handshake(Pid, Ref, Timeout) ->
{error, timeout}
end.
-start_timeout(_,_, infinity) ->
- ok;
-start_timeout(Channel, From, Time) ->
- erlang:send_after(Time, self(), {timeout, {Channel, From}}).
-