diff options
Diffstat (limited to 'lib/observer/src')
-rw-r--r-- | lib/observer/src/ttb.erl | 711 |
1 files changed, 504 insertions, 207 deletions
diff --git a/lib/observer/src/ttb.erl b/lib/observer/src/ttb.erl index 221b71df6a..072aa165e7 100644 --- a/lib/observer/src/ttb.erl +++ b/lib/observer/src/ttb.erl @@ -18,9 +18,11 @@ %% -module(ttb). -author('[email protected]'). +-author('[email protected]'). %% API --export([tracer/0,tracer/1,tracer/2,p/2,stop/0,stop/1]). +-export([tracer/0,tracer/1,tracer/2,p/2,stop/0,stop/1,start_trace/4]). +-export([get_et_handler/0]). -export([tp/2, tp/3, tp/4, ctp/0, ctp/1, ctp/2, ctp/3, tpl/2, tpl/3, tpl/4, ctpl/0, ctpl/1, ctpl/2, ctpl/3, ctpg/0, ctpg/1, ctpg/2, ctpg/3]). -export([seq_trigger_ms/0,seq_trigger_ms/1]). @@ -34,24 +36,38 @@ -include_lib("kernel/include/file.hrl"). -define(meta_time,5000). +-define(fetch_time, 10000). -define(history_table,ttb_history_table). -define(seq_trace_flags,[send,'receive',print,timestamp]). --define(upload_dir,"ttb_upload"). +-define(upload_dir(Logname),"ttb_upload_"++Logname). +-define(last_config, "ttb_last_config"). +-define(partial_dir, "ttb_partial_result"). -ifdef(debug). --define(get_status,;get_status -> erlang:display(dict:to_list(NodeInfo)),loop(NodeInfo)). +-define(get_status,;get_status -> erlang:display(dict:to_list(NodeInfo),loop(NodeInfo, TraceInfo)). -else. -define(get_status,). -endif. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Shortcut +start_trace(Nodes, Patterns, {Procs, Flags}, Options) -> + {ok, _} = tracer(Nodes, Options), + [{ok, _} = apply(?MODULE, tpl, tuple_to_list(Args)) || Args <- Patterns], + {ok, _} = p(Procs, Flags). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Open a trace port on all given nodes and create the meta data file tracer() -> tracer(node()). +tracer(shell) -> tracer(node(), shell); +tracer(dbg) -> tracer(node(), {shell, only}); tracer(Nodes) -> tracer(Nodes,[]). tracer(Nodes,Opt) -> - start(), - store(tracer,[Nodes,Opt]), {PI,Client,Traci} = opt(Opt), - do_tracer(Nodes,PI,Client,Traci). + %%We use initial Traci as SessionInfo for loop/2 + Pid = start(Traci), + store(tracer,[Nodes,Opt]), + do_tracer(Nodes,PI,Client,[{ttb_control, Pid}|Traci]). do_tracer(Nodes0,PI,Client,Traci) -> Nodes = nods(Nodes0), @@ -59,9 +75,14 @@ do_tracer(Nodes0,PI,Client,Traci) -> do_tracer(Clients,PI,Traci). do_tracer(Clients,PI,Traci) -> + ShellOutput = proplists:get_value(shell, Traci, false), {ClientSucc,Succ} = lists:foldl( fun({N,{local,File},TF},{CS,S}) -> + TF2 = case ShellOutput of + only -> none; + _ -> TF + end, [_Sname,Host] = string:tokens(atom_to_list(N),"@"), case catch dbg:tracer(N,port,dbg:trace_port(ip,0)) of {ok,N} -> @@ -69,8 +90,8 @@ do_tracer(Clients,PI,Traci) -> {ok,T} = dbg:get_tracer(N), rpc:call(N,seq_trace,set_system_tracer,[T]), dbg:trace_client(ip,{Host,Port}, - {fun ip_to_file/2,{file,File}}), - {[{N,{local,File,Port},TF}|CS], [N|S]}; + {fun ip_to_file/2,{{file,File}, ShellOutput}}), + {[{N,{local,File,Port},TF2}|CS], [N|S]}; Other -> display_warning(N,{cannot_open_ip_trace_port, Host, @@ -98,17 +119,54 @@ do_tracer(Clients,PI,Traci) -> {ok,Succ} end. +opt(Opt) when is_list(Opt) -> + opt(Opt,{true,?MODULE,[]}); opt(Opt) -> - opt(Opt,{true,?MODULE,[]}). + opt([Opt]). opt([{process_info,PI}|O],{_,Client,Traci}) -> opt(O,{PI,Client,Traci}); opt([{file,Client}|O],{PI,_,Traci}) -> - opt(O,{PI,Client,Traci}); + opt(O,{PI,Client,[{logfile,get_logname(Client)}|Traci]}); opt([{handler,Handler}|O],{PI,Client,Traci}) -> opt(O,{PI,Client,[{handler,Handler}|Traci]}); +opt([{timer, {MSec, StopOpts}}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{timer,{MSec, StopOpts}}|Traci]}); +opt([{timer, MSec}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{timer,{MSec, []}}|Traci]}); +opt([{overload_check, {MSec,M,F}}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{overload_check,{MSec,M,F}}|Traci]}); +opt([shell|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{shell, true}|Traci]}); +opt([{shell,Type}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{shell, Type}|Traci]}); +opt([resume|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{resume, {true, ?fetch_time}}|Traci]}); +opt([{resume,MSec}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{resume, {true, MSec}}|Traci]}); +opt([{flush,MSec}|O],{PI,Client,Traci}) -> + opt(O,{PI,Client,[{flush, MSec}|Traci]}); opt([],Opt) -> - Opt. + ensure_opt(Opt). + +ensure_opt({PI,Client,Traci}) -> + case {proplists:get_value(flush, Traci), Client} of + {undefined, _} -> ok; + {_, {local, _}} -> exit(flush_unsupported_with_ip_trace_port); + {_,_} -> ok + end, + NeedIpTracer = proplists:get_value(shell, Traci, false) /= false, + case {NeedIpTracer, Client} of + {false, _} -> {PI, Client, Traci}; + {true, ?MODULE} -> {PI, {local, ?MODULE}, Traci}; + {true, {local, File}} -> {PI, {local, File}, Traci}; + {true, _} -> exit(local_client_required_on_shell_tracing) + end. + +get_logname({local, F}) -> get_logname(F); +get_logname({wrap, F}) -> filename:basename(F); +get_logname({wrap, F, _, _}) -> filename:basename(F); +get_logname(F) -> filename:basename(F). nods(all) -> Nodes1 = remove_active([node()|nodes()]), @@ -205,17 +263,29 @@ run_history([H|T]) -> ok -> run_history(T); {error,not_found} -> {error,{not_found,H}} end; + +run_history(all) -> + CurrentHist = ets:tab2list(?history_table), + ets:delete_all_objects(?history_table), + [run_printed(MFA,true) || {_, MFA} <- CurrentHist]; +run_history(all_silent) -> + CurrentHist = ets:tab2list(?history_table), + ets:delete_all_objects(?history_table), + [run_printed(MFA,false) || {_, MFA} <- CurrentHist]; run_history([]) -> ok; run_history(N) -> case catch ets:lookup(?history_table,N) of [{N,{M,F,A}}] -> - print_func(M,F,A), - R = apply(M,F,A), - print_result(R); + run_printed({M,F,A},true); _ -> {error, not_found} end. + +run_printed({M,F,A},Verbose) -> + Verbose andalso print_func(M,F,A), + R = apply(M,F,A), + Verbose andalso print_result(R). write_config(ConfigFile,all) -> write_config(ConfigFile,['_']); @@ -223,6 +293,8 @@ write_config(ConfigFile,Config) -> write_config(ConfigFile,Config,[]). write_config(ConfigFile,all,Opt) -> write_config(ConfigFile,['_'],Opt); +write_config(ConfigFile,Config,Opt) when not(is_list(Opt)) -> + write_config(ConfigFile,Config,[Opt]); write_config(ConfigFile,Nums,Opt) when is_list(Nums), is_integer(hd(Nums)); Nums=:=['_'] -> F = fun(N) -> ets:select(?history_table, @@ -313,6 +385,7 @@ arg_list([A1|A],Acc) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Set trace flags on processes p(Procs0,Flags0) -> + ensure_no_overloaded_nodes(), store(p,[Procs0,Flags0]), no_store_p(Procs0,Flags0). no_store_p(Procs0,Flags0) -> @@ -327,11 +400,12 @@ no_store_p(Procs0,Flags0) -> {error,Reason} -> display_warning(P,Reason), {PMatched,Ps} - end + end end,{[],[]},Procs) of {[],[]} -> {error, no_match}; {SuccMatched,Succ} -> no_store_write_trace_info(flags,{Succ,Flags}), + ?MODULE ! trace_started, {ok,SuccMatched} end end. @@ -339,7 +413,7 @@ no_store_p(Procs0,Flags0) -> transform_flags([clear]) -> [clear]; transform_flags(Flags) -> - dbg:transform_flags(Flags). + dbg:transform_flags([timestamp | Flags]). procs(Procs) when is_list(Procs) -> @@ -365,24 +439,30 @@ proc({global,Name}) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Trace pattern tp(A,B) -> - store(tp,[A,B]), - dbg:tp(A,B). + ensure_no_overloaded_nodes(), + store(tp,[A,ms(B)]), + dbg:tp(A,ms(B)). tp(A,B,C) -> - store(tp,[A,B,C]), - dbg:tp(A,B,C). + ensure_no_overloaded_nodes(), + store(tp,[A,B,ms(C)]), + dbg:tp(A,B,ms(C)). tp(A,B,C,D) -> - store(tp,[A,B,C,D]), - dbg:tp(A,B,C,D). + ensure_no_overloaded_nodes(), + store(tp,[A,B,C,ms(D)]), + dbg:tp(A,B,C,ms(D)). tpl(A,B) -> - store(tpl,[A,B]), - dbg:tpl(A,B). + ensure_no_overloaded_nodes(), + store(tpl,[A,ms(B)]), + dbg:tpl(A,ms(B)). tpl(A,B,C) -> - store(tpl,[A,B,C]), - dbg:tpl(A,B,C). + ensure_no_overloaded_nodes(), + store(tpl,[A,B,ms(C)]), + dbg:tpl(A,B,ms(C)). tpl(A,B,C,D) -> - store(tpl,[A,B,C,D]), - dbg:tpl(A,B,C,D). + ensure_no_overloaded_nodes(), + store(tpl,[A,B,C,ms(D)]), + dbg:tpl(A,B,C,ms(D)). ctp() -> store(ctp,[]), @@ -423,6 +503,56 @@ ctpg(A,B,C) -> store(ctpg,[A,B,C]), dbg:ctpg(A,B,C). +ms(return) -> + [{'_',[],[{return_trace}]}]; +ms(caller) -> + [{'_',[],[{message,{caller}}]}]; +ms({codestr, FunStr}) -> + {ok, MS} = string2ms(FunStr), + MS; +ms(Other) -> + Other. + +ensure_no_overloaded_nodes() -> + Overloaded = case whereis(?MODULE) of + undefined -> + []; + _ -> + ?MODULE ! {get_overloaded, self()}, + receive O -> O end + end, + case Overloaded of + [] -> ok; + Overloaded -> exit({error, overload_protection_active, Overloaded}) + end. + +-spec string2ms(string()) -> {ok, list()} | {error, fun_format}. +string2ms(FunStr) -> + case erl_scan:string(fix_dot(FunStr)) of + {ok, Tokens, _} -> + case erl_parse:parse_exprs(Tokens) of + {ok, [Expression]} -> + case Expression of + {_, _, {clauses, Clauses}} -> + {ok, ms_transform:transform_from_shell(dbg, Clauses, [])}; + _ -> + {error, fun_format} + end; + _ -> + {error, fun_format} + end; + _ ->{error, fun_format} + end. + +-spec fix_dot(string()) -> string(). +fix_dot(FunStr) -> + [H | Rest] = lists:reverse(FunStr), + case H of + $. -> + FunStr; + H -> + lists:reverse([$., H | Rest]) + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Support for sequential trace @@ -457,66 +587,109 @@ no_store_write_trace_info(Key,What) -> %%% Stop tracing on all nodes stop() -> stop([]). -stop(Opts) -> +stop(Opts) when is_list(Opts) -> Fetch = stop_opts(Opts), - case whereis(?MODULE) of - undefined -> ok; - Pid when is_pid(Pid) -> - ?MODULE ! {stop,Fetch,self()}, - receive {?MODULE,stopped} -> ok end + Result = + case whereis(?MODULE) of + undefined -> ok; + Pid when is_pid(Pid) -> + ?MODULE ! {stop,Fetch,self()}, + receive {?MODULE,R} -> R end + end, + case {Fetch, Result} of + {nofetch, _} -> + ok; + {_, {stopped, _}} -> + %% Printout moved out of the ttb loop to avoid occasional deadlock + io:format("Stored logs in ~s~n", [element(2, Result)]); + {_, _} -> + ok end, - stopped. + stop_return(Result,Opts); +stop(Opts) -> + stop([Opts]). stop_opts(Opts) -> - case lists:member(format,Opts) of - true -> - format; % format implies fetch - false -> - case lists:member(fetch,Opts) of - true -> fetch; - false -> nofetch - end + FetchDir = proplists:get_value(fetch_dir, Opts), + ensure_fetch_dir(FetchDir), + FormatData = case proplists:get_value(format, Opts) of + undefined -> false; + true -> {format, []}; + FOpts -> {format, FOpts} + end, + case {FormatData, lists:member(return_fetch_dir, Opts)} of + {false, true} -> + {fetch, FetchDir}; % if we specify return_fetch_dir, the data should be fetched + {false, false} -> + case lists:member(nofetch,Opts) of + false -> {fetch, FetchDir}; + true -> nofetch + end; + {FormatData, _} -> + {FormatData, FetchDir} + end. + +ensure_fetch_dir(undefined) -> ok; +ensure_fetch_dir(Dir) -> + case filelib:is_file(Dir) of + true -> + throw({error, exists, Dir}); + false -> + ok + end. + +stop_return(R,Opts) -> + case {lists:member(return_fetch_dir,Opts),R} of + {true,_} -> + R; + {false,{stopped,_}} -> + stopped; + {false,_} -> + %% Anything other than 'stopped' would not be bw compatible... + stopped end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Process implementation -start() -> +start(SessionInfo) -> case whereis(?MODULE) of undefined -> Parent = self(), - Pid = spawn(fun() -> init(Parent) end), - receive {started,Pid} -> ok end; + Pid = spawn(fun() -> init(Parent, SessionInfo) end), + receive {started,Pid} -> ok end, + Pid; Pid when is_pid(Pid) -> - ok + Pid end. - -init(Parent) -> +init(Parent, SessionInfo) -> register(?MODULE,self()), ets:new(?history_table,[ordered_set,named_table,public]), Parent ! {started,self()}, - loop(dict:new()). + NewSessionInfo = [{partials, 0}, {dead_nodes, []} | SessionInfo], + try_send_flush_tick(NewSessionInfo), + loop(dict:new(), NewSessionInfo). -loop(NodeInfo) -> +loop(NodeInfo, SessionInfo) -> receive {init_node,Node,MetaFile,PI,Traci} -> erlang:monitor_node(Node,true), - MetaPid = + {AbsoluteMetaFile, MetaPid} = case rpc:call(Node, observer_backend, ttb_init_node, [MetaFile,PI,Traci]) of - {ok,MP} -> - MP; + {ok,MF,MP} -> + {MF,MP}; {badrpc,nodedown} -> %% We will get a nodedown message - undefined + {MetaFile,undefined} end, - loop(dict:store(Node,{MetaFile,MetaPid},NodeInfo)); + loop(dict:store(Node,{AbsoluteMetaFile,MetaPid},NodeInfo), SessionInfo); {get_nodes,Sender} -> Sender ! {?MODULE,dict:fetch_keys(NodeInfo)}, - loop(NodeInfo); + loop(NodeInfo, SessionInfo); {write_trace_info,Key,What} -> dict:fold(fun(Node,{_MetaFile,MetaPid},_) -> rpc:call(Node,observer_backend, @@ -524,55 +697,121 @@ loop(NodeInfo) -> end, ok, NodeInfo), - loop(NodeInfo); + loop(NodeInfo, SessionInfo); {nodedown,Node} -> - loop(dict:erase(Node,NodeInfo)); - {stop,nofetch,Sender} -> - dict:fold( - fun(Node,{_,MetaPid},_) -> - rpc:call(Node,observer_backend,ttb_stop,[MetaPid]) - end, - ok, - NodeInfo), - dbg:stop_clear(), - ets:delete(?history_table), - Sender ! {?MODULE,stopped}; - {stop,FetchOrFormat,Sender} -> - Localhost = host(node()), - Dir = ?upload_dir++ts(), - file:make_dir(Dir), - %% The nodes are traversed twice here because - %% the meta tracing in observer_backend must be - %% stopped before dbg is stopped, and dbg must - %% be stopped before the trace logs are moved orelse - %% windows complains. - AllNodesAndMeta = - dict:fold( - fun(Node,{MetaFile,MetaPid},Nodes) -> - rpc:call(Node,observer_backend,ttb_stop,[MetaPid]), - [{Node,MetaFile}|Nodes] - end, - [], - NodeInfo), - dbg:stop_clear(), - AllNodes = - lists:map( - fun({Node,MetaFile}) -> - spawn(fun() -> fetch(Localhost,Dir,Node,MetaFile) end), - Node - end, - AllNodesAndMeta), - ets:delete(?history_table), - wait_for_fetch(AllNodes), - io:format("Stored logs in ~s~n",[filename:absname(Dir)]), - case FetchOrFormat of - format -> format(Dir); - fetch -> ok + NewState = make_node_dead(Node, NodeInfo, SessionInfo), + loop(dict:erase(Node,NodeInfo), NewState); + {noderesumed,Node,Reporter} -> + {MetaFile, CurrentSuffix, NewState} = make_node_alive(Node, SessionInfo), + fetch_partial_result(Node, MetaFile, CurrentSuffix), + spawn(fun() -> resume_trace(Reporter) end), + loop(NodeInfo, NewState); + {timeout, StopOpts} -> + spawn(?MODULE, stop, [StopOpts]), + loop(NodeInfo, SessionInfo); + {node_overloaded, Node} -> + io:format("Overload check activated on node: ~p.~n", [Node]), + {Overloaded, SI} = {proplists:get_value(overloaded, SessionInfo, []), + lists:keydelete(overloaded, 1, SessionInfo)}, + loop(NodeInfo, [{overloaded, [Node|Overloaded]} | SI]); + {get_overloaded, Pid} -> + Pid ! proplists:get_value(overloaded, SessionInfo, []), + loop(NodeInfo, SessionInfo); + trace_started -> + case proplists:get_value(timer, SessionInfo) of + undefined -> ok; + {MSec, StopOpts} -> erlang:send_after(MSec, self(), {timeout, StopOpts}) end, - Sender ! {?MODULE,stopped} - ?get_status + loop(NodeInfo, SessionInfo); + flush_timeout -> + [ dbg:flush_trace_port(Node) || Node <- dict:fetch_keys(NodeInfo) ], + try_send_flush_tick(SessionInfo), + loop(NodeInfo, SessionInfo); + {stop,nofetch,Sender} -> + do_stop(nofetch, Sender, NodeInfo, SessionInfo); + {stop,FetchSpec,Sender} -> + case proplists:get_value(shell, SessionInfo, false) of + only -> do_stop(nofetch, Sender, NodeInfo, SessionInfo); + _ -> do_stop(FetchSpec, Sender, NodeInfo, SessionInfo) + end + end. + +do_stop(nofetch, Sender, NodeInfo, _) -> + write_config(?last_config, all), + dict:fold( + fun(Node,{_,MetaPid},_) -> + rpc:call(Node,observer_backend,ttb_stop,[MetaPid]) + end, + ok, + NodeInfo), + dbg:stop_clear(), + ets:delete(?history_table), + Sender ! {?MODULE, stopped}; + +do_stop({FetchOrFormat, UserDir}, Sender, NodeInfo, SessionInfo) -> + write_config(?last_config, all), + Localhost = host(node()), + Dir = get_fetch_dir(UserDir, proplists:get_value(logfile, SessionInfo)), + file:make_dir(Dir), + %% The nodes are traversed twice here because + %% the meta tracing in observer_backend must be + %% stopped before dbg is stopped, and dbg must + %% be stopped before the trace logs are moved orelse + %% windows complains. + AllNodesAndMeta = + dict:fold( + fun(Node,{MetaFile,MetaPid},Nodes) -> + rpc:call(Node,observer_backend,ttb_stop,[MetaPid]), + [{Node,MetaFile}|Nodes] + end, + [], + NodeInfo), + dbg:stop_clear(), + AllNodes = + lists:map( + fun({Node,MetaFile}) -> + spawn(fun() -> fetch_report(Localhost,Dir,Node,MetaFile) end), + Node + end, + AllNodesAndMeta), + ets:delete(?history_table), + wait_for_fetch(AllNodes), + copy_partials(Dir, proplists:get_value(partials, SessionInfo)), + Absname = filename:absname(Dir), + case FetchOrFormat of + fetch -> ok; + {format, Opts} -> format(Dir, Opts) + end, + Sender ! {?MODULE,{stopped,Absname}}. + +make_node_dead(Node, NodeInfo, SessionInfo) -> + {MetaFile,_} = dict:fetch(Node, NodeInfo), + NewDeadNodes = [{Node, MetaFile} | proplists:get_value(dead_nodes, SessionInfo)], + [{dead_nodes, NewDeadNodes} | lists:keydelete(dead_nodes, 1, SessionInfo)]. + +make_node_alive(Node, SessionInfo) -> + DeadNodes = proplists:get_value(dead_nodes, SessionInfo), + Partials = proplists:get_value(partials, SessionInfo), + {value, {_, MetaFile}, Dn2} = lists:keytake(Node, 1, DeadNodes), + SessionInfo2 = lists:keyreplace(dead_nodes, 1, SessionInfo, {dead_nodes, Dn2}), + {MetaFile, Partials + 1, lists:keyreplace(partials, 1, SessionInfo2, {partials, Partials + 1})}. + +try_send_flush_tick(State) -> + case proplists:get_value(flush, State) of + undefined -> + ok; + MSec -> + erlang:send_after(MSec, self(), flush_timeout) end. +get_fetch_dir(undefined,undefined) -> ?upload_dir(?MODULE_STRING) ++ ts(); +get_fetch_dir(undefined,Logname) -> ?upload_dir(Logname) ++ ts(); +get_fetch_dir(Dir,_) -> Dir. + +resume_trace(Reporter) -> + ?MODULE:run_history(all_silent), + Reporter ! trace_resumed. + get_nodes() -> ?MODULE ! {get_nodes,self()}, receive {?MODULE,Nodes} -> Nodes end. @@ -582,19 +821,40 @@ ts() -> io_lib:format("-~4.4.0w~2.2.0w~2.2.0w-~2.2.0w~2.2.0w~2.2.0w", [Y,M,D,H,Min,S]). +copy_partials(_, 0) -> + ok; +copy_partials(Dir, Num) -> + PartialDir = ?partial_dir ++ integer_to_list(Num), + file:rename(PartialDir, filename:join(Dir,PartialDir)), + copy_partials(Dir, Num - 1). + +fetch_partial_result(Node,MetaFile,Current) -> + DirName = ?partial_dir ++ integer_to_list(Current), + case file:list_dir(DirName) of + {error, enoent} -> + ok; + {ok, Files} -> + [ file:delete(filename:join(DirName, File)) || File <- Files ], + file:del_dir(DirName) + end, + file:make_dir(DirName), + fetch(host(node()), DirName, Node, MetaFile). +fetch_report(Localhost, Dir, Node, MetaFile) -> + fetch(Localhost,Dir,Node,MetaFile), + ?MODULE ! {fetch_complete,Node}. fetch(Localhost,Dir,Node,MetaFile) -> - case host(Node) of - Localhost -> % same host, just move the files - Files = rpc:call(Node,observer_backend,ttb_get_filenames,[MetaFile]), + case (host(Node) == Localhost) orelse is_local(MetaFile) of + true -> % same host, just move the files + Files = get_filenames(Node,MetaFile), lists:foreach( - fun(File0) -> - File = filename:join(Dir,filename:basename(File0)), - file:rename(File0,File) - end, - Files); - _Otherhost -> + fun(File0) -> + Dest = filename:join(Dir,filename:basename(File0)), + file:rename(File0, Dest) + end, + Files); + false -> {ok, LSock} = gen_tcp:listen(0, [binary,{packet,2},{active,false}]), {ok,Port} = inet:port(LSock), rpc:cast(Node,observer_backend,ttb_fetch, @@ -603,8 +863,17 @@ fetch(Localhost,Dir,Node,MetaFile) -> receive_files(Dir,Sock,undefined), ok = gen_tcp:close(LSock), ok = gen_tcp:close(Sock) - end, - ?MODULE ! {fetch_complete,Node}. + end. + +is_local({local, _, _}) -> + true; +is_local(_) -> + false. + +get_filenames(_N, {local,F,_}) -> + observer_backend:ttb_get_filenames(F); +get_filenames(N, F) -> + rpc:call(N, observer_backend,ttb_get_filenames,[F]). receive_files(Dir,Sock,Fd) -> case gen_tcp:recv(Sock, 0) of @@ -646,9 +915,16 @@ wait_for_fetch(Nodes) -> %%% - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - write_info(Nodes,PI,Traci) -> - lists:foreach(fun({N,{local,C,_},F}) -> - MetaFile = F ++ ".ti", - file:delete(MetaFile), + {ok, Cwd} = file:get_cwd(), + lists:foreach(fun({N,{local,C,_},F}) -> + MetaFile = case F of + none -> + none; + F -> + AbsFile = filename:join(Cwd, F) ++ ".ti", + file:delete(AbsFile), + AbsFile + end, Traci1 = [{node,N},{file,C}|Traci], {ok,Port} = dbg:get_tracer(N), ?MODULE ! @@ -662,38 +938,35 @@ write_info(Nodes,PI,Traci) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Format binary trace logs +get_et_handler() -> + {fun ttb_et:handler/4, initial}. + format(Files) -> format(Files,[]). format(Files,Opt) -> - {Out,Handler} = format_opt(Opt), + {Out,Handler,DisableSort} = format_opt(Opt), ets:new(?MODULE,[named_table]), - format(Files,Out,Handler). -format(File,Out,Handler) when is_list(File), is_integer(hd(File)) -> + format(Files,Out,Handler, DisableSort). +format(File,Out,Handler,DisableSort) when is_list(File), is_integer(hd(File)) -> Files = case filelib:is_dir(File) of true -> % will merge all files in the directory - MetaFiles = filelib:wildcard(filename:join(File,"*.ti")), - lists:map(fun(M) -> - Sub = string:left(M,length(M)-3), - case filelib:is_file(Sub) of - true -> Sub; - false -> Sub++".*.wrp" - end - end, - MetaFiles); + List = filelib:wildcard(filename:join(File, ?partial_dir++"*")), + lists:append(collect_files([File | List])); false -> % format one file [File] end, - format(Files,Out,Handler); -format(Files,Out,Handler) when is_list(Files), is_list(hd(Files)) -> + format(Files,Out,Handler,DisableSort); +format(Files,Out,Handler,DisableSort) when is_list(Files), is_list(hd(Files)) -> StopDbg = case whereis(dbg) of undefined -> true; _ -> false end, - Details = lists:foldl(fun(File,Acc) -> [prepare(File,Handler)|Acc] end, + Details = lists:foldl(fun(File,Acc) -> [prepare(File)|Acc] end, [],Files), Fd = get_fd(Out), - R = do_format(Fd,Details), + RealHandler = get_handler(Handler, Files), + R = do_format(Fd,Details,DisableSort,RealHandler), file:close(Fd), ets:delete(?MODULE), case StopDbg of @@ -702,7 +975,30 @@ format(Files,Out,Handler) when is_list(Files), is_list(hd(Files)) -> end, R. -prepare(File,Handler) -> +collect_files(Dirs) -> + lists:map(fun(Dir) -> + MetaFiles = filelib:wildcard(filename:join(Dir,"*.ti")), + lists:map(fun(M) -> + Sub = string:left(M,length(M)-3), + case filelib:is_file(Sub) of + true -> Sub; + false -> Sub++".*.wrp" + end + end, + MetaFiles) + end, Dirs). + +get_handler(undefined, Files) -> + %%We retrieve traci from the first available file + {Traci, _} = read_traci(hd(Files)), + case dict:find(handler, Traci) of + error -> {fun defaulthandler/4, initial}; + {ok, [Handler]} -> Handler + end; +get_handler(Handler, _) -> + Handler. + +prepare(File) -> {Traci,Proci} = read_traci(File), Node = get_node(Traci), lists:foreach(fun({Pid,PI}) -> @@ -714,19 +1010,21 @@ prepare(File,Handler) -> ets:insert(?MODULE,{Pid,PI,Node}) end,Proci), FileOrWrap = get_file(File,Traci), - Handler1 = get_handler(Handler,Traci), - {FileOrWrap,Traci,Handler1}. + {FileOrWrap,Traci}. -format_opt(Opt) -> +format_opt(Opt) when is_list(Opt) -> Out = case lists:keysearch(out,1,Opt) of {value,{out,O}} -> O; _ -> standard_io end, Handler = case lists:keysearch(handler,1,Opt) of - {value,{handler,H}} -> H; - _ -> undefined + {value,{handler,H}} -> H; + _ -> undefined end, - {Out,Handler}. + DisableSort = proplists:get_value(disable_sort, Opt, false), + {Out,Handler,DisableSort}; +format_opt(Opt) -> + format_opt([Opt]). read_traci(File) -> @@ -800,75 +1098,61 @@ check_client(Client,File) when is_tuple(Client),element(2,Client)==wrap -> check_exists(File) -> case file:read_file_info(File) of {ok,#file_info{type=regular}} -> File; - _ -> + _ -> exit({error,no_file}) end. - -get_handler(Handler,Traci) -> - case Handler of - undefined -> - case dict:find(handler,Traci) of - {ok,[H]} -> H; - error -> undefined - end; - _ -> - Handler - end. -do_format(Fd,Details) -> - Clients = lists:foldl(fun({FileOrWrap,Traci,Handler},Acc) -> - [start_client(FileOrWrap,Traci,Handler) - |Acc] +do_format(Fd,Details,DisableSort,Handler) -> + Clients = lists:foldl(fun({FileOrWrap,Traci},Acc) -> + [start_client(FileOrWrap,Traci)|Acc] end,[],Details), - init_collector(Fd,Clients). - - -start_client(FileOrWrap,Traci,et) -> - dbg:trace_client(file, FileOrWrap, - {fun handler/2, - {dict:to_list(Traci),{{ttb_et,handler},initial}}}); -start_client(FileOrWrap,Traci,undefined) -> - dbg:trace_client(file, FileOrWrap, - {fun handler/2, - {dict:to_list(Traci),{fun defaulthandler/4,initial}}}); -start_client(FileOrWrap,Traci,Handler) -> - dbg:trace_client(file, FileOrWrap, - {fun handler/2, {dict:to_list(Traci),Handler}}). - -handler(Trace,State) -> - %% State here is only used for the initial state. The accumulated - %% State is maintained by collector!!! - receive - {get,Collector} -> Collector ! {self(),{Trace,State}}; + init_collector(Fd,Clients,DisableSort,Handler). + +start_client(FileOrWrap,Traci) -> + dbg:trace_client(file, FileOrWrap, + {fun handler/2, dict:to_list(Traci)}). + +handler(Trace,Traci) -> + %%We return our own Traci so that it not necesarry to look it up + %%This may take time if something huge has been written to it + receive + {get,Collector} -> Collector ! {self(),{Trace,Traci}}; done -> ok end, - State. + Traci. -handler1(Trace,{Fd,{Traci,{Fun,State}}}) when is_function(Fun) -> - {Traci,{Fun,Fun(Fd,Trace,Traci,State)}}; -handler1(Trace,{Fd,{Traci,{{M,F},State}}}) when is_atom(M), is_atom(F) -> - {Traci,{{M,F},M:F(Fd,Trace,Traci,State)}}. +%%Used to handle common state (the same for all clients) +handler2(Trace,{Fd,Traci,{Fun,State}}) when is_function(Fun) -> + {Fun, Fun(Fd, Trace, Traci, State)}; +handler2(Trace,{Fd,Traci,{{M,F},State}}) when is_atom(M), is_atom(F) -> + {{M,F}, M:F(Fd, Trace, Traci, State)}. defaulthandler(Fd,Trace,_Traci,initial) -> dbg:dhandler(Trace,Fd); defaulthandler(_Fd,Trace,_Traci,State) -> dbg:dhandler(Trace,State). -init_collector(Fd,Clients) -> +init_collector(Fd,Clients,DisableSort,Handler) -> Collected = get_first(Clients), - collector(Fd,sort(Collected)). + case DisableSort of + true -> collector(Fd,Collected, DisableSort, Handler); + false -> collector(Fd,sort(Collected), DisableSort, Handler) + end. -collector(Fd,[{_,{Client,{Trace,State}}}|Rest]) -> +collector(Fd,[{_,{Client,{Trace,Traci}}} |Rest], DisableSort, CommonState) -> Trace1 = update_procinfo(Trace), - State1 = handler1(Trace1,{Fd,State}), - case get_next(Client,State1) of - end_of_trace -> - handler1(end_of_trace,{Fd,State1}), - collector(Fd,Rest); - Next -> collector(Fd,sort([Next|Rest])) + CommonState2 = handler2(Trace1, {Fd, Traci, CommonState}), + case get_next(Client) of + end_of_trace -> + collector(Fd,Rest,DisableSort, CommonState2); + Next -> case DisableSort of + false -> collector(Fd,sort([Next|Rest]), DisableSort, CommonState2); + true -> collector(Fd,[Next|Rest], DisableSort, CommonState2) + end end; -collector(_Fd,[]) -> +collector(Fd,[], _, CommonState) -> + handler2(end_of_trace, {Fd, end_of_trace, CommonState}), ok. update_procinfo({drop,_N}=Trace) -> @@ -895,7 +1179,7 @@ update_procinfo(Trace) -> ProcInfo = get_procinfo(Pid), setelement(2,Trace,ProcInfo). -get_procinfo(Pid) when is_pid(Pid) -> +get_procinfo(Pid) when is_pid(Pid); is_port(Pid) -> case ets:lookup(?MODULE,Pid) of [PI] -> PI; [] -> Pid @@ -913,21 +1197,21 @@ get_procinfo({Name,Node}) when is_atom(Name) -> get_first([Client|Clients]) -> Client ! {get,self()}, - receive - {Client,{end_of_trace,_}} -> + receive + {Client,{end_of_trace,_}} -> get_first(Clients); - {Client,{Trace,_State}}=Next -> + {Client,{Trace,_}}=Next -> [{timestamp(Trace),Next}|get_first(Clients)] end; get_first([]) -> []. -get_next(Client,State) when is_pid(Client) -> +get_next(Client) when is_pid(Client) -> Client ! {get,self()}, - receive - {Client,{end_of_trace,_}} -> + receive + {Client,{end_of_trace,_}} -> end_of_trace; - {Client,{Trace,_OldState}} -> - {timestamp(Trace),{Client,{Trace,State}}} % inserting new state!! + {Client,{Trace, Traci}} -> + {timestamp(Trace),{Client,{Trace,Traci}}} end. sort(List) -> @@ -971,19 +1255,34 @@ display_warning(Item,Warning) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Trace client which reads an IP port and puts data directly to a file. %%% This is used when tracing remote nodes with no file system. -ip_to_file(Trace,{file,File}) -> +ip_to_file({metadata,_,_},{_, only} = State) -> + State; +ip_to_file(Trace, {_, only} = State) -> + dbg:dhandler(Trace, standard_io), + State; +ip_to_file(Trace,{{file,File}, ShellOutput}) -> Fun = dbg:trace_port(file,File), %File can be a filename or a wrap spec Port = Fun(), - ip_to_file(Trace,Port); -ip_to_file({metadata,MetaFile,MetaData},Port) -> + case Trace of + {metadata, _, _} -> ok; + Trace -> show_trace(Trace, ShellOutput) + end, + ip_to_file(Trace,{Port,ShellOutput}); +ip_to_file({metadata,MetaFile,MetaData},State) -> {ok,MetaFd} = file:open(MetaFile,[write,raw,append]), file:write(MetaFd,MetaData), file:close(MetaFd), - Port; -ip_to_file(Trace,Port) -> + State; +ip_to_file(Trace,{Port, ShellOutput}) -> + show_trace(Trace, ShellOutput), B = term_to_binary(Trace), erlang:port_command(Port,B), - Port. + {Port, ShellOutput}. + +show_trace(Trace, true) -> + dbg:dhandler(Trace, standard_io); +show_trace(_, _) -> + ok. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% For debugging @@ -996,5 +1295,3 @@ dump_ti(<<>>,Acc) -> dump_ti(B,Acc) -> {Term,Rest} = get_term(B), dump_ti(Rest,[Term|Acc]). - - |