diff options
-rw-r--r-- | lib/observer/src/ttb.erl | 353 |
1 files changed, 262 insertions, 91 deletions
diff --git a/lib/observer/src/ttb.erl b/lib/observer/src/ttb.erl index d5f4b52b42..dfba0411a0 100644 --- a/lib/observer/src/ttb.erl +++ b/lib/observer/src/ttb.erl @@ -35,10 +35,12 @@ -include_lib("kernel/include/file.hrl"). -define(meta_time,5000). +-define(fetch_time, 10000). -define(history_table,ttb_history_table). -define(seq_trace_flags,[send,'receive',print,timestamp]). -define(upload_dir,"ttb_upload"). -define(last_config, "ttb_last_config"). +-define(partial_dir, "ttb_partial_result"). -ifdef(debug). -define(get_status,;get_status -> erlang:display(dict:to_list(NodeInfo),loop(NodeInfo, TraceInfo)). -else. @@ -49,21 +51,22 @@ %%% Shortcut start_trace(Nodes, Patterns, {Procs, Flags}, Options) -> {ok, _} = tracer(Nodes, Options), - {ok, _} = p(Procs, Flags), - [{ok, _} = apply(?MODULE, tpl, tuple_to_list(Args)) || Args <- Patterns]. - + [{ok, _} = apply(?MODULE, tpl, tuple_to_list(Args)) || Args <- Patterns], + {ok, _} = p(Procs, Flags). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Open a trace port on all given nodes and create the meta data file tracer() -> tracer(node()). +tracer(shell) -> tracer(node(), shell); +tracer(dbg) -> tracer(node(), {shell, only}); tracer(Nodes) -> tracer(Nodes,[]). tracer(Nodes,Opt) -> {PI,Client,Traci} = opt(Opt), %%We use initial Traci as SessionInfo for loop/2 - start(Traci), + Pid = start(Traci), store(tracer,[Nodes,Opt]), - do_tracer(Nodes,PI,Client,Traci). + do_tracer(Nodes,PI,Client,[{ttb_control, Pid}|Traci]). do_tracer(Nodes0,PI,Client,Traci) -> Nodes = nods(Nodes0), @@ -71,9 +74,14 @@ do_tracer(Nodes0,PI,Client,Traci) -> do_tracer(Clients,PI,Traci). do_tracer(Clients,PI,Traci) -> + ShellOutput = proplists:get_value(shell, Traci, false), {ClientSucc,Succ} = lists:foldl( fun({N,{local,File},TF},{CS,S}) -> + TF2 = case ShellOutput of + only -> none; + _ -> TF + end, [_Sname,Host] = string:tokens(atom_to_list(N),"@"), case catch dbg:tracer(N,port,dbg:trace_port(ip,0)) of {ok,N} -> @@ -81,8 +89,8 @@ do_tracer(Clients,PI,Traci) -> {ok,T} = dbg:get_tracer(N), rpc:call(N,seq_trace,set_system_tracer,[T]), dbg:trace_client(ip,{Host,Port}, - {fun ip_to_file/2,{file,File}}), - {[{N,{local,File,Port},TF}|CS], [N|S]}; + {fun ip_to_file/2,{{file,File}, ShellOutput}}), + {[{N,{local,File,Port},TF2}|CS], [N|S]}; Other -> display_warning(N,{cannot_open_ip_trace_port, Host, @@ -125,8 +133,34 @@ opt([{timer, {MSec, StopOpts}}|O],{PI,Client,Traci}) -> opt(O,{PI,Client,[{timer,{MSec, StopOpts}}|Traci]}); opt([{timer, MSec}|O],{PI,Client,Traci}) -> opt(O,{PI,Client,[{timer,{MSec, []}}|Traci]}); +opt([{overload_check, {MSec,M,F}}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{overload_check,{MSec,M,F}}|Traci]}); +opt([shell|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{shell, true}|Traci]}); +opt([{shell,Type}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{shell, Type}|Traci]}); +opt([resume|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{resume, {true, ?fetch_time}}|Traci]}); +opt([{resume,MSec}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{resume, {true, MSec}}|Traci]}); +opt([{flush,MSec}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{flush, MSec}|Traci]}); opt([],Opt) -> - Opt. + ensure_opt(Opt). + +ensure_opt({PI,Client,Traci}) -> + case {proplists:get_value(flush, Traci), Client} of + {undefined, _} -> ok; + {_, {local, _}} -> exit(flush_unsupported_with_ip_trace_port); + {_,_} -> ok + end, + NeedIpTracer = proplists:get_value(shell, Traci, false) /= false, + case {NeedIpTracer, Client} of + {false, _} -> {PI, Client, Traci}; + {true, ?MODULE} -> {PI, {local, ?MODULE}, Traci}; + {true, {local, File}} -> {PI, {local, File}, Traci}; + {true, _} -> exit(local_client_required_on_shell_tracing) + end. nods(all) -> Nodes1 = remove_active([node()|nodes()]), @@ -223,17 +257,29 @@ run_history([H|T]) -> ok -> run_history(T); {error,not_found} -> {error,{not_found,H}} end; + +run_history(all) -> + CurrentHist = ets:tab2list(?history_table), + ets:delete_all_objects(?history_table), + [run_printed(MFA,true) || {_, MFA} <- CurrentHist]; +run_history(all_silent) -> + CurrentHist = ets:tab2list(?history_table), + ets:delete_all_objects(?history_table), + [run_printed(MFA,false) || {_, MFA} <- CurrentHist]; run_history([]) -> ok; run_history(N) -> case catch ets:lookup(?history_table,N) of [{N,{M,F,A}}] -> - print_func(M,F,A), - R = apply(M,F,A), - print_result(R); + run_printed({M,F,A},true); _ -> {error, not_found} end. + +run_printed({M,F,A},Verbose) -> + Verbose andalso print_func(M,F,A), + R = apply(M,F,A), + Verbose andalso print_result(R). write_config(ConfigFile,all) -> write_config(ConfigFile,['_']); @@ -331,6 +377,7 @@ arg_list([A1|A],Acc) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Set trace flags on processes p(Procs0,Flags0) -> + ensure_no_overloaded_nodes(), store(p,[Procs0,Flags0]), no_store_p(Procs0,Flags0). no_store_p(Procs0,Flags0) -> @@ -384,22 +431,28 @@ proc({global,Name}) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Trace pattern tp(A,B) -> + ensure_no_overloaded_nodes(), store(tp,[A,ms(B)]), dbg:tp(A,ms(B)). tp(A,B,C) -> + ensure_no_overloaded_nodes(), store(tp,[A,B,ms(C)]), dbg:tp(A,B,ms(C)). tp(A,B,C,D) -> + ensure_no_overloaded_nodes(), store(tp,[A,B,C,ms(D)]), dbg:tp(A,B,C,ms(D)). tpl(A,B) -> + ensure_no_overloaded_nodes(), store(tpl,[A,ms(B)]), dbg:tpl(A,ms(B)). tpl(A,B,C) -> + ensure_no_overloaded_nodes(), store(tpl,[A,B,ms(C)]), dbg:tpl(A,B,ms(C)). tpl(A,B,C,D) -> + ensure_no_overloaded_nodes(), store(tpl,[A,B,C,ms(D)]), dbg:tpl(A,B,C,ms(D)). @@ -452,6 +505,14 @@ ms({codestr, FunStr}) -> ms(Other) -> Other. +ensure_no_overloaded_nodes() -> + ttb ! {get_overloaded, self()}, + Overloaded = receive O -> O after 300 -> [] end, + case Overloaded of + [] -> ok; + Overloaded -> exit({error, overload_protection_active, Overloaded}) + end. + -spec string2ms(string()) -> {ok, list()} | {error, fun_format}. string2ms(FunStr) -> case erl_scan:string(fix_dot(FunStr)) of @@ -537,12 +598,12 @@ stop_opts(Opts) -> case {FormatData, lists:member(return, Opts)} of {false, true} -> {fetch, FetchDir}; % if we specify return, the data should be fetched - {false, false} -> - case lists:member(fetch,Opts) of - true -> {fetch, FetchDir}; - false -> nofetch + {false, false} -> + case lists:member(nofetch,Opts) of + false -> {fetch, FetchDir}; + true -> nofetch end; - {FormatData, _} -> + {FormatData, _} -> {FormatData, FetchDir} end. @@ -558,6 +619,8 @@ ensure_fetch_dir(Dir) -> stop_return(R,Opts) -> case {lists:member(return,Opts),R} of {true,_} -> + %%Printout moved out of the ttb loop to avoid occasional deadlock + io:format("Stored logs in ~s~n",[element(2, R)]), R; {false,{stopped,_}} -> stopped; @@ -574,16 +637,19 @@ start(SessionInfo) -> undefined -> Parent = self(), Pid = spawn(fun() -> init(Parent, SessionInfo) end), - receive {started,Pid} -> ok end; + receive {started,Pid} -> ok end, + Pid; Pid when is_pid(Pid) -> - ok + Pid end. init(Parent, SessionInfo) -> register(?MODULE,self()), ets:new(?history_table,[ordered_set,named_table,public]), Parent ! {started,self()}, - loop(dict:new(), SessionInfo). + NewSessionInfo = [{partials, 0}, {dead_nodes, []} | SessionInfo], + try_send_flush_tick(NewSessionInfo), + loop(dict:new(), NewSessionInfo). loop(NodeInfo, SessionInfo) -> receive @@ -613,68 +679,118 @@ loop(NodeInfo, SessionInfo) -> NodeInfo), loop(NodeInfo, SessionInfo); {nodedown,Node} -> - loop(dict:erase(Node,NodeInfo), SessionInfo); + NewState = make_node_dead(Node, NodeInfo, SessionInfo), + loop(dict:erase(Node,NodeInfo), NewState); + {noderesumed,Node,Reporter} -> + {MetaFile, CurrentSuffix, NewState} = make_node_alive(Node, SessionInfo), + fetch_partial_result(Node, MetaFile, CurrentSuffix), + spawn(fun() -> resume_trace(Reporter) end), + loop(NodeInfo, NewState); {timeout, StopOpts} -> spawn(?MODULE, stop, [StopOpts]), loop(NodeInfo, SessionInfo); + {node_overloaded, Node} -> + io:format("Overload check activated on node: ~p.~n", [Node]), + {Overloaded, SI} = {proplists:get_value(overloaded, SessionInfo, []), + lists:keydelete(overloaded, 1, SessionInfo)}, + loop(NodeInfo, [{overloaded, [Node|Overloaded]} | SI]); + {get_overloaded, Pid} -> + Pid ! proplists:get_value(overloaded, SessionInfo, []), + loop(NodeInfo, SessionInfo); trace_started -> case proplists:get_value(timer, SessionInfo) of undefined -> ok; {MSec, StopOpts} -> erlang:send_after(MSec, self(), {timeout, StopOpts}) end, loop(NodeInfo, SessionInfo); + flush_timeout -> + [ dbg:flush_trace_port(Node) || Node <- dict:fetch_keys(NodeInfo) ], + try_send_flush_tick(SessionInfo), + loop(NodeInfo, SessionInfo); {stop,nofetch,Sender} -> - write_config(?last_config, all), - dict:fold( - fun(Node,{_,MetaPid},_) -> - rpc:call(Node,observer_backend,ttb_stop,[MetaPid]) - end, - ok, - NodeInfo), - dbg:stop_clear(), - ets:delete(?history_table), - Sender ! {?MODULE,stopped}; - {stop,{FetchOrFormat, UserDir} ,Sender} -> - write_config(?last_config, all), - Localhost = host(node()), - Dir = get_fetch_dir(UserDir), - file:make_dir(Dir), - %% The nodes are traversed twice here because - %% the meta tracing in observer_backend must be - %% stopped before dbg is stopped, and dbg must - %% be stopped before the trace logs are moved orelse - %% windows complains. - AllNodesAndMeta = - dict:fold( - fun(Node,{MetaFile,MetaPid},Nodes) -> - rpc:call(Node,observer_backend,ttb_stop,[MetaPid]), - [{Node,MetaFile}|Nodes] - end, - [], - NodeInfo), - dbg:stop_clear(), - AllNodes = - lists:map( - fun({Node,MetaFile}) -> - spawn(fun() -> fetch(Localhost,Dir,Node,MetaFile) end), - Node - end, - AllNodesAndMeta), - ets:delete(?history_table), - wait_for_fetch(AllNodes), - Absname = filename:absname(Dir), - io:format("Stored logs in ~s~n",[Absname]), - case FetchOrFormat of - fetch -> ok; - {format, Opts} -> format(Dir, Opts) - end, - Sender ! {?MODULE,{stopped,Absname}} - ?get_status + do_stop(nofetch, Sender, NodeInfo, SessionInfo); + {stop,FetchSpec,Sender} -> + case proplists:get_value(shell, SessionInfo, false) of + only -> do_stop(nofetch, Sender, NodeInfo, SessionInfo); + _ -> do_stop(FetchSpec, Sender, NodeInfo, SessionInfo) + end + end. + +do_stop(nofetch, Sender, NodeInfo, _) -> + write_config(?last_config, all), + dict:fold( + fun(Node,{_,MetaPid},_) -> + rpc:call(Node,observer_backend,ttb_stop,[MetaPid]) + end, + ok, + NodeInfo), + dbg:stop_clear(), + ets:delete(?history_table), + Sender ! {?MODULE, stopped}; + +do_stop({FetchOrFormat, UserDir}, Sender, NodeInfo, SessionInfo) -> + write_config(?last_config, all), + Localhost = host(node()), + Dir = get_fetch_dir(UserDir), + file:make_dir(Dir), + %% The nodes are traversed twice here because + %% the meta tracing in observer_backend must be + %% stopped before dbg is stopped, and dbg must + %% be stopped before the trace logs are moved orelse + %% windows complains. + AllNodesAndMeta = + dict:fold( + fun(Node,{MetaFile,MetaPid},Nodes) -> + rpc:call(Node,observer_backend,ttb_stop,[MetaPid]), + [{Node,MetaFile}|Nodes] + end, + [], + NodeInfo), + dbg:stop_clear(), + AllNodes = + lists:map( + fun({Node,MetaFile}) -> + spawn(fun() -> fetch_report(Localhost,Dir,Node,MetaFile) end), + Node + end, + AllNodesAndMeta), + ets:delete(?history_table), + wait_for_fetch(AllNodes), + copy_partials(Dir, proplists:get_value(partials, SessionInfo)), + Absname = filename:absname(Dir), + case FetchOrFormat of + fetch -> ok; + {format, Opts} -> format(Dir, Opts) + end, + Sender ! {?MODULE,{stopped,Absname}}. + +make_node_dead(Node, NodeInfo, SessionInfo) -> + {MetaFile,_} = dict:fetch(Node, NodeInfo), + NewDeadNodes = [{Node, MetaFile} | proplists:get_value(dead_nodes, SessionInfo)], + [{dead_nodes, NewDeadNodes} | lists:keydelete(dead_nodes, 1, SessionInfo)]. + +make_node_alive(Node, SessionInfo) -> + DeadNodes = proplists:get_value(dead_nodes, SessionInfo), + Partials = proplists:get_value(partials, SessionInfo), + {value, {_, MetaFile}, Dn2} = lists:keytake(Node, 1, DeadNodes), + SessionInfo2 = lists:keyreplace(dead_nodes, 1, SessionInfo, {dead_nodes, Dn2}), + {MetaFile, Partials + 1, lists:keyreplace(partials, 1, SessionInfo2, {partials, Partials + 1})}. + +try_send_flush_tick(State) -> + case proplists:get_value(flush, State) of + undefined -> + ok; + MSec -> + erlang:send_after(MSec, self(), flush_timeout) end. get_fetch_dir(undefined) -> ?upload_dir ++ ts(); get_fetch_dir(Dir) -> Dir. +resume_trace(Reporter) -> + ?MODULE:run_history(all_silent), + Reporter ! trace_resumed. + get_nodes() -> ?MODULE ! {get_nodes,self()}, receive {?MODULE,Nodes} -> Nodes end. @@ -684,17 +800,46 @@ ts() -> io_lib:format("-~4.4.0w~2.2.0w~2.2.0w-~2.2.0w~2.2.0w~2.2.0w", [Y,M,D,H,Min,S]). +copy_partials(_, 0) -> + ok; +copy_partials(Dir, Num) -> + PartialDir = ?partial_dir ++ integer_to_list(Num), + file:rename(PartialDir, filename:join(Dir,PartialDir)), + copy_partials(Dir, Num - 1). + +fetch_partial_result(Node,MetaFile,Current) -> + DirName = ?partial_dir ++ integer_to_list(Current), + case file:list_dir(DirName) of + {error, enoent} -> + ok; + {ok, Files} -> + [ file:delete(filename:join(DirName, File)) || File <- Files ], + file:del_dir(DirName) + end, + file:make_dir(DirName), + fetch(host(node()), DirName, Node, MetaFile). + +fetch_report(Localhost, Dir, Node, MetaFile) -> + fetch(Localhost,Dir,Node,MetaFile), + ?MODULE ! {fetch_complete,Node}. + fetch(Localhost,Dir,Node,MetaFile) -> case (host(Node) == Localhost) orelse is_local(MetaFile) of true -> % same host, just move the files Files = get_filenames(Node,MetaFile), lists:foreach( fun(File0) -> - %%Other nodes may still have different CWD - {ok, Cwd} = rpc:call(Node, file, get_cwd, []), - File1 = filename:join(Cwd, File0), - File = filename:join(Dir,filename:basename(File1)), - file:rename(File1,File) + case MetaFile of + {local, _, _} -> + File = filename:join(Dir,filename:basename(File0)), + file:rename(File0, File); + _ -> + %%Other nodes may still have different CWD + {ok, Cwd} = rpc:call(Node, file, get_cwd, []), + File1 = filename:join(Cwd, File0), + File = filename:join(Dir,filename:basename(File1)), + file:rename(File1,File) + end end, Files); false -> @@ -706,8 +851,7 @@ fetch(Localhost,Dir,Node,MetaFile) -> receive_files(Dir,Sock,undefined), ok = gen_tcp:close(LSock), ok = gen_tcp:close(Sock) - end, - ?MODULE ! {fetch_complete,Node}. + end. is_local({local, _, _}) -> true; @@ -759,9 +903,14 @@ wait_for_fetch(Nodes) -> %%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - write_info(Nodes,PI,Traci) -> - lists:foreach(fun({N,{local,C,_},F}) -> - MetaFile = F ++ ".ti", - file:delete(MetaFile), + lists:foreach(fun({N,{local,C,_},F}) -> + MetaFile = case F of + none -> + none; + F -> + file:delete(F ++ ".ti"), + F ++ ".ti" + end, Traci1 = [{node,N},{file,C}|Traci], {ok,Port} = dbg:get_tracer(N), ?MODULE ! @@ -788,15 +937,8 @@ format(File,Out,Handler,DisableSort) when is_list(File), is_integer(hd(File)) -> Files = case filelib:is_dir(File) of true -> % will merge all files in the directory - MetaFiles = filelib:wildcard(filename:join(File,"*.ti")), - lists:map(fun(M) -> - Sub = string:left(M,length(M)-3), - case filelib:is_file(Sub) of - true -> Sub; - false -> Sub++".*.wrp" - end - end, - MetaFiles); + List = filelib:wildcard(filename:join(File, ?partial_dir++"*")), + lists:append(collect_files([File | List])); false -> % format one file [File] end, @@ -819,6 +961,19 @@ format(Files,Out,Handler,DisableSort) when is_list(Files), is_list(hd(Files)) -> end, R. +collect_files(Dirs) -> + lists:map(fun(Dir) -> + MetaFiles = filelib:wildcard(filename:join(Dir,"*.ti")), + lists:map(fun(M) -> + Sub = string:left(M,length(M)-3), + case filelib:is_file(Sub) of + true -> Sub; + false -> Sub++".*.wrp" + end + end, + MetaFiles) + end, Dirs). + get_handler(undefined, Files) -> %%We retrieve traci from the first available file {Traci, _} = read_traci(hd(Files)), @@ -933,6 +1088,7 @@ check_exists(File) -> exit({error,no_file}) end. + do_format(Fd,Details,DisableSort,Handler) -> Clients = lists:foldl(fun({FileOrWrap,Traci},Acc) -> [start_client(FileOrWrap,Traci)|Acc] @@ -1085,19 +1241,34 @@ display_warning(Item,Warning) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Trace client which reads an IP port and puts data directly to a file. %%% This is used when tracing remote nodes with no file system. -ip_to_file(Trace,{file,File}) -> +ip_to_file({metadata,_,_},{_, only} = State) -> + State; +ip_to_file(Trace, {_, only} = State) -> + dbg:dhandler(Trace, standard_io), + State; +ip_to_file(Trace,{{file,File}, ShellOutput}) -> Fun = dbg:trace_port(file,File), %File can be a filename or a wrap spec Port = Fun(), - ip_to_file(Trace,Port); -ip_to_file({metadata,MetaFile,MetaData},Port) -> + case Trace of + {metadata, _, _} -> ok; + Trace -> show_trace(Trace, ShellOutput) + end, + ip_to_file(Trace,{Port,ShellOutput}); +ip_to_file({metadata,MetaFile,MetaData},State) -> {ok,MetaFd} = file:open(MetaFile,[write,raw,append]), file:write(MetaFd,MetaData), file:close(MetaFd), - Port; -ip_to_file(Trace,Port) -> + State; +ip_to_file(Trace,{Port, ShellOutput}) -> + show_trace(Trace, ShellOutput), B = term_to_binary(Trace), erlang:port_command(Port,B), - Port. + {Port, ShellOutput}. + +show_trace(Trace, true) -> + dbg:dhandler(Trace, standard_io); +show_trace(_,_) -> + ok. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% For debugging |