%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2003-2010. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% %%% @doc Generic connection owner process. %%% %%% @type handle() = pid(). A handle for using a connection implemented %%% with ct_gen_conn.erl. -module(ct_gen_conn). -compile(export_all). -export([start/4, stop/1]). -export([call/2, call/3, do_within_time/2]). -ifdef(debug). -define(dbg,true). -else. -define(dbg,false). -endif. -record(gen_opts,{callback, name, address, init_data, conn_pid, cb_state, ct_util_server}). %%%----------------------------------------------------------------- %%% @spec start(Name,Address,InitData,CallbackMod) -> %%% {ok,Handle} | {error,Reason} %%% Name = term() %%% CallbackMod = atom() %%% InitData = term() %%% Address = term() %%% %%% @doc Open a connection and start the generic connection owner process. %%% %%%
%%% The <code>CallbackMod</code> is a specific callback module for
%%% each type of connection (e.g. telnet, ftp,...). It must export the
%%% function <code>init/3</code> which takes the arguments
%%% <code>Name</code>, <code>Address</code> and
%%% <code>InitData</code> and returns
%%% <code>{ok,ConnectionPid,State}</code> or
%%% <code>{error,Reason}</code>.
%%% Execute the given <code>Fun</code>, but interrupt if it takes
%%% more than <code>Timeout</code> milliseconds.
%%%
%%% The execution is also interrupted if the connection is
%%% closed.
%% do_within_time(Fun,Timeout) -> FunResult | {error,connection_closed}
%%                                          | {error,timeout}
%%
%% Runs Fun() in a linked helper process and waits at most Timeout
%% milliseconds for its result. The wait is aborted early if an exit
%% signal from the connection process (the pid stored under the
%% process dictionary key conn_pid) arrives.
%%
%% The 'EXIT' patterns below only match if the calling process traps
%% exits - presumably this is always called from within the
%% connection owner process started by init_gen/2; verify against
%% callers.
%%
%% NOTE(review): if Fun() itself crashes, the helper's 'EXIT' message
%% carries the crash reason rather than 'killed', so neither receive
%% below matches it and the call appears to block forever - confirm
%% whether this needs handling.
do_within_time(Fun,Timeout) ->
    Self = self(),
    %% The helper inherits the caller's 'silent' flag.
    Silent = get(silent),
    TmpPid = spawn_link(fun() ->
                                put(silent,Silent),
                                R = Fun(),
                                Self ! {self(),R}
                        end),
    ConnPid = get(conn_pid),
    receive
        {TmpPid,Result} ->
            Result;
        {'EXIT',ConnPid,_Reason}=M ->
            %% Connection went down: drop the helper without getting
            %% its exit message, and put the connection's 'EXIT' back
            %% in the mailbox so it is still there for later receives.
            unlink(TmpPid),
            exit(TmpPid,kill),
            self() ! M,
            {error,connection_closed}
    after
        Timeout ->
            exit(TmpPid,kill),
            receive
                {TmpPid,Result} ->
                    %% TmpPid just managed to send the result at the
                    %% same time as the timeout expired. Consume its
                    %% exit message too before returning.
                    receive
                        {'EXIT',TmpPid,_reason} -> ok
                    end,
                    Result;
                {'EXIT',TmpPid,killed} ->
                    %% TmpPid did not send the result before the
                    %% timeout expired.
                    {error,timeout}
            end
    end.

%%%=================================================================
%%% Internal functions

%% call(Pid,Msg) -> Result
%%
%% Same as call(Pid,Msg,infinity).
call(Pid,Msg) ->
    call(Pid,Msg,infinity).

%% call(Pid,Msg,Timeout) -> Result | {error,{process_down,Pid,Reason}}
%%
%% Sends {Msg,{self(),Ref}} to Pid and waits for the matching
%% {Ref,Result} reply while monitoring Pid.
%%
%% - A reply of the form {retry,Data} is sent straight back to Pid as
%%   a new request, using the same Timeout as the original request.
%% - If Pid terminates before replying, its exit reason is returned
%%   in {error,{process_down,Pid,Reason}}.
%% - If no reply arrives within Timeout, Pid is killed and
%%   {error,{process_down,Pid,forced_termination}} is returned.
call(Pid,Msg,Timeout) ->
    MRef = erlang:monitor(process,Pid),
    Ref = make_ref(),
    Pid ! {Msg,{self(),Ref}},
    receive
        {Ref,Result} ->
            erlang:demonitor(MRef,[flush]),
            case Result of
                {retry,_Data} ->
                    %% Keep the caller's timeout for the retry; falling
                    %% back to call/2 would silently make it infinite.
                    call(Pid,Result,Timeout);
                Other ->
                    Other
            end;
        {'DOWN',MRef,process,_,Reason} ->
            {error,{process_down,Pid,Reason}}
    after
        Timeout ->
            %% Remove the monitor before killing Pid, otherwise the
            %% resulting 'DOWN' message is left in the caller's
            %% mailbox.
            erlang:demonitor(MRef,[flush]),
            log("ct_gen_conn",
                "Connection process ~p not responding. Killing now!",
                [Pid]),
            exit(Pid,kill),
            {error,{process_down,Pid,forced_termination}}
    end.

%% return({To,Ref},Result) -> {Ref,Result}
%%
%% Sends Result back to the process that issued a request with
%% call/3, tagged with the request reference.
return({To,Ref},Result) ->
    To ! {Ref,Result}.

%% init_gen(Parent,Opts)
%%
%% Entry point of the connection owner process. Opts is a #gen_opts{}
%% record. The process traps exits, links to the registered
%% ct_util_server process and asks the callback module to open the
%% connection.
%%
%% On success it links to the connection process, stores its pid
%% under the process dictionary key conn_pid, sends
%% {connected,self()} to Parent and enters loop/1. If the callback
%% returns {error,Reason}, {{error,Reason},self()} is sent to Parent
%% and the function returns. Any other result of the callback's
%% init/3 (including a caught crash) matches no clause, so this
%% process fails with a case_clause error.
init_gen(Parent,Opts) ->
    process_flag(trap_exit,true),
    CtUtilServer = whereis(ct_util_server),
    link(CtUtilServer),
    put(silent,false),
    case catch (Opts#gen_opts.callback):init(Opts#gen_opts.name,
                                             Opts#gen_opts.address,
                                             Opts#gen_opts.init_data) of
        {ok,ConnPid,State} when is_pid(ConnPid) ->
            link(ConnPid),
            put(conn_pid,ConnPid),
            Parent ! {connected,self()},
            loop(Opts#gen_opts{conn_pid=ConnPid,
                               cb_state=State,
                               ct_util_server=CtUtilServer});
        {error,Reason} ->
            Parent ! {{error,Reason},self()}
    end.
%% loop(Opts)
%%
%% Message loop of the connection owner process. Opts is the
%% #gen_opts{} record holding the callback module, the current
%% connection pid, the callback state and the ct_util_server pid.
%%
%% Handled messages:
%%   {'EXIT',ConnPid,_}   - connection process died: try to reconnect
%%                          via the callback; give up (and leave the
%%                          loop) if that fails.
%%   {'EXIT',CtUtil,Why}  - ct_util_server died: exit with its reason.
%%   {'EXIT',_,_}         - any other linked process: ignored.
%%   {stop,From}          - unregister, terminate the callback, reply
%%                          ok and leave the loop.
%%   {{retry,Data},From}  - a request handed back by call/3, see the
%%                          clauses below.
%%   {Msg,From}           - ordinary request: passed to the callback's
%%                          handle_msg/2 and answered with its result.
loop(#gen_opts{callback=Mod,
               conn_pid=ConnPid,
               cb_state=CbState,
               ct_util_server=CtUtil}=Opts) ->
    receive
        {'EXIT',ConnPid,Why} ->
            log("Connection down!\nOpening new!",
                "Reason: ~p\nAddress: ~p\n",
                [Why,Opts#gen_opts.address]),
            case reconnect(Opts) of
                {ok,FreshPid,FreshState} ->
                    link(FreshPid),
                    put(conn_pid,FreshPid),
                    loop(Opts#gen_opts{conn_pid=FreshPid,
                                       cb_state=FreshState});
                Failure ->
                    ct_util:unregister_connection(self()),
                    log("Reconnect failed. Giving up!",
                        "Reason: ~p\n",
                        [Failure])
            end;
        {'EXIT',CtUtil,Why} ->
            exit(Why);
        {'EXIT',_Other,_Why} ->
            loop(Opts);
        {stop,Client} ->
            ct_util:unregister_connection(self()),
            Mod:terminate(ConnPid,CbState),
            return(Client,ok),
            ok;
        {{retry,{Failure,_Name,ConnPid,_Msg}},Client} ->
            %% The failed request was made on the connection that is
            %% still current, so no reconnection has happened since:
            %% report the error instead of rerunning the command.
            return(Client,retry_error(Failure)),
            loop(Opts);
        {{retry,{_Failure,_Name,_OldPid,Msg}},Client} ->
            %% The request failed on an older connection pid; rerun
            %% it on the current one.
            log("Rerunning command",
                "Connection reestablished. Rerunning command...",
                []),
            loop(handle_and_reply(Msg,Client,Opts));
        {Msg,{Sender,_Ref}=Client} when is_pid(Sender) ->
            loop(handle_and_reply(Msg,Client,Opts))
    end.

%% Lets the callback module handle Msg, sends its reply to Client and
%% returns Opts updated with the callback's new state.
handle_and_reply(Msg,Client,#gen_opts{callback=Mod,cb_state=CbState}=Opts) ->
    {Reply,NextState} = Mod:handle_msg(Msg,CbState),
    return(Client,Reply),
    Opts#gen_opts{cb_state=NextState}.

%% Wraps a failure in an {error,_} tuple unless it already is one.
retry_error({error,_}=Failure) -> Failure;
retry_error(Failure) -> {error,Failure}.

%% nozero(Result) -> Result'
%%
%% Strips NUL (0) and carriage return (13) characters from the list
%% in an {ok,List} result. Anything else is returned unchanged.
nozero({ok,Chars}) when is_list(Chars) ->
    {ok,[Ch || Ch <- Chars, not lists:member(Ch,[0,$\r])]};
nozero(Other) ->
    Other.

%% reconnect(Opts) -> term()
%%
%% Asks the callback module to reopen the connection, passing the
%% address and the current callback state. The callback's result is
%% returned as is.
reconnect(Opts) ->
    Mod = Opts#gen_opts.callback,
    Address = Opts#gen_opts.address,
    CbState = Opts#gen_opts.cb_state,
    Mod:reconnect(Address,CbState).

%% log(Func,Args) -> ok | term()
%%
%% Calls ct_logs:Func with the argument list Args, unless the
%% process dictionary flag 'silent' is true and the module was
%% compiled without debug enabled, in which case nothing is logged.
log(Func,Args) ->
    Silenced = (get(silent) =:= true) andalso (not ?dbg),
    if
        Silenced -> ok;
        true -> apply(ct_logs,Func,Args)
    end.