aboutsummaryrefslogtreecommitdiffstats
path: root/lib/inviso/src/inviso_c.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/inviso/src/inviso_c.erl')
-rw-r--r--lib/inviso/src/inviso_c.erl1335
1 files changed, 1335 insertions, 0 deletions
diff --git a/lib/inviso/src/inviso_c.erl b/lib/inviso/src/inviso_c.erl
new file mode 100644
index 0000000000..ecbd5b0778
--- /dev/null
+++ b/lib/inviso/src/inviso_c.erl
@@ -0,0 +1,1335 @@
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+%%
+%% $Id$
+%%
+%% Author: Ann-Marie L�f, [email protected]
+%% Lennart �hman, [email protected]
+%%
+%% Description: The controlling part of the trace tool Inviso
+%%
+%% This code implements the inviso control component meant to be run on an Erlang
+%% node doing any tracing. It can also be used in a non distributed system where
+%% the control component and the runtime component will run on the same virtual
+%% machine.
+%% The control component is not meant to be started by a supervisor but rather
+%% directly by the user when needed.
+%% This module does not provide any APIs to users. Those are found in inviso.erl.
+%% This module merly has the gen_server call-backs.
+%% ------------------------------------------------------------------------------
+
+-module(inviso_c).
+-behavior(gen_server).
+
+
+%% ------------------------------------------------------------------------------
+%% gen_server callbacks.
+%% ------------------------------------------------------------------------------
+-export([init/1,
+ handle_call/3,handle_cast/2,handle_info/2,
+ terminate/2,
+ code_change/3]).
+%% ------------------------------------------------------------------------------
+
+%% ------------------------------------------------------------------------------
+%% Exported internal functions (used in spawn of help process).
+%% ------------------------------------------------------------------------------
+-export([log_rec_init/4]).
+
+%% ------------------------------------------------------------------------------
+
+%% ------------------------------------------------------------------------------
+%% Records.
+%% ------------------------------------------------------------------------------
+
+%% #state
+%% Record used in the loopdata.
+-record(state,{
+ nodes=[], % [#node,...]
+ distributed, % false | true
+ subscribers=[], % [{pid(),monitor_ref()},...]
+ rt_options=[{dependency,{infinity,node()}},{overload, default}]
+ }).
+%% ------------------------------------------------------------------------------
+
+%% #node
+%% Record storing information about a runtime component connected to this control
+%% component.
+-record(node,{
+ node, % [atom(),...]
+ pid, % pid()
+ vsn,
+ ref, % monitor_ref()
+ tag % term()
+ }).
+%% ------------------------------------------------------------------------------
+
+%% ------------------------------------------------------------------------------
+%% Macros used in this module.
+%% ------------------------------------------------------------------------------
+
+-define(RUNTIME,inviso_rt). % The module API name of the runtime.
+%% ------------------------------------------------------------------------------
+
+
+
+%% ==============================================================================
+%% Controller component implmentation.
+%% ==============================================================================
+
+init({_Parent,Options}) ->
+ process_flag(trap_exit,true),
+ case check_options(Options,start) of
+ {ok,Options2} ->
+ LoopData=initiate_state(Options2),
+ {ok,LoopData};
+ Error ->
+ {stop,Error}
+ end.
+%% ------------------------------------------------------------------------------
+
+handle_call({subscribe,Pid},_From,LD) when pid(Pid) ->
+ MRef=erlang:monitor(process,Pid),
+ {reply,ok,LD#state{subscribers=[{Pid,MRef}|LD#state.subscribers]}};
+handle_call({subscribe,Faulty},_From,LD) ->
+ {reply,{error,{badarg,Faulty}},LD};
+handle_call({unsubscribe,Pid},_From,LD) ->
+ case lists:keysearch(Pid,1,LD#state.subscribers) of
+ {value,{_,MRef}} ->
+ erlang:demonitor(MRef),
+ {reply,ok,LD#state{subscribers=lists:keydelete(Pid,1,LD#state.subscribers)}};
+ false ->
+ {reply,ok,LD}
+ end;
+handle_call({add_nodes,Nodes,Opts,Tag,Condition},_From,LD) ->
+ case check_options(Opts,add_node) of
+ {ok,Opts2} ->
+ Opts3=merge_options(LD#state.rt_options,Opts2),
+ {NewLD,Reply}=do_add_nodes(Nodes,LD,Opts3,Tag,Condition),
+ {reply,adapt_reply(NewLD,Reply),NewLD};
+ Error ->
+ {reply,Error,LD}
+ end;
+handle_call({change_options,Nodes,Opts},_From,LD) ->
+ case check_options(Opts,add_node) of
+ {ok,Opts2} ->
+ {reply,adapt_reply(LD,do_change_option(Nodes,Opts2,LD)),LD};
+ Error ->
+ {reply,Error,LD}
+ end;
+handle_call({init_tracing,TracerDataList},_From,LD) ->
+ {reply,adapt_reply(LD,do_init_tracing(TracerDataList,LD)),LD};
+handle_call({init_tracing,Nodes,TracerData},_From,LD) when list(Nodes) ->
+ TracerDataList=
+ lists:map(fun(N)->{N,TracerData} end,started_trace_nodes(Nodes,LD)),
+ {reply,adapt_reply(LD,do_init_tracing(TracerDataList,LD)),LD};
+handle_call({init_tracing,Nodes,_TracerData},_From,LD) ->
+ {reply,{error,{badarg,Nodes}},LD};
+handle_call({trace_pattern,Nodes,Patterns,FlagList},_From,LD) ->
+ {reply,adapt_reply(LD,distribute_tp(Nodes,Patterns,FlagList,LD)),LD};
+handle_call({trace_flags,Nodes,Args,How},_From,LD) ->
+ {reply,adapt_reply(LD,distribute_tf(Nodes,Args,How,LD)),LD};
+handle_call({trace_flags,NodeArgs,How},_From,LD) ->
+ {reply,distribute_tf(NodeArgs,How,LD),LD}; % Always distributed here.
+handle_call({trace_flags,NodeArgHows},_From,LD) ->
+ {reply,distribute_tf(NodeArgHows,LD),LD}; % Always distributed here.
+handle_call({meta_pattern,Nodes,Args},_From,LD) ->
+ {reply,adapt_reply(LD,distribute_metapattern(Nodes,Args,LD)),LD};
+handle_call({ctp_all,Nodes},_From,LD) ->
+ {reply,adapt_reply(LD,do_ctp_all(Nodes,LD)),LD};
+handle_call({suspend,Reason,Nodes},_From,LD) ->
+ {reply,adapt_reply(LD,do_suspend(Nodes,Reason,LD)),LD};
+handle_call({cancel_suspension,Nodes},_From,LD) ->
+ {reply,adapt_reply(LD,do_cancel_suspension(Nodes,LD)),LD};
+handle_call({stop_tracing,Nodes},_From,LD) ->
+ {reply,adapt_reply(LD,do_stop_tracing(Nodes,LD)),LD};
+handle_call({get_status,Nodes},_From,LD) ->
+ {reply,adapt_reply(LD,do_get_status(Nodes,LD)),LD};
+handle_call({get_tracerdata,Nodes},_From,LD) ->
+ {reply,adapt_reply(LD,do_get_tracerdata(Nodes,LD)),LD};
+handle_call(list_logs,_From,LD) ->
+ {reply,adapt_reply(LD,do_list_logs(all,LD)),LD};
+handle_call({list_logs,TracerDataOrNodesList},_From,LD) ->
+ {reply,adapt_reply(LD,do_list_logs(TracerDataOrNodesList,LD)),LD};
+handle_call({fetch_log,ToNode,Spec,Dest,Prefix},From,LD) ->
+ case LD#state.distributed of
+ true -> % It is a distributed system.
+ do_fetch_log(ToNode,Spec,Dest,Prefix,From,LD),
+ {noreply,LD}; % Reply will come from collector pid.
+ false -> % Stupidity! you dont want this!
+ {reply,{error,not_distributed},LD}
+ end;
+handle_call({delete_log,NodesOrNodeSpecs},_From,LD) ->
+ {reply,adapt_reply(LD,do_delete_log(NodesOrNodeSpecs,LD)),LD};
+handle_call({delete_log,Nodes,Specs},_From,LD) when list(Nodes) ->
+ Reply=do_delete_log(lists:map(fun(N)->{N,Specs} end,Nodes),LD),
+ {reply,adapt_reply(LD,Reply),LD};
+handle_call({delete_log,FaultyNodes,_Specs},_From,LD) ->
+ {reply,{error,{badarg,FaultyNodes}},LD};
+handle_call({clear,Nodes,Options},_From,LD) ->
+ {reply,adapt_reply(LD,do_clear(Nodes,LD,Options)),LD};
+handle_call({flush,Nodes},_From,LD) ->
+ {reply,adapt_reply(LD,do_flush(Nodes,LD)),LD};
+handle_call({stop_nodes,Nodes},_From,LD) ->
+ {NewLD,Reply}=do_stop_nodes(Nodes,LD),
+ {reply,adapt_reply(NewLD,Reply),NewLD};
+handle_call(stop,_From,LD) ->
+ {stop,normal,shutdown,LD};
+handle_call(stop_all,_From,LD) ->
+ {NewLD,Reply}=do_stop_nodes(started_trace_nodes(all,LD),LD),
+ {stop,normal,adapt_reply(NewLD,Reply),NewLD};
+handle_call(Request,_From,LD) -> %% for debug purpose only
+ {reply,{error,{invalid_request,Request}},LD}.
+
+handle_cast(_Request,LD) -> % There are no casts.
+ {noreply,LD}.
+
+handle_info({connect,_Node,Pid,_VSN,Tag},LD) -> % From connecting runtime.
+ {noreply,do_confirm_connection(Pid,Tag,LD)};
+handle_info({trace_event,Event},LD) -> % A runtime component issues an event.
+ send_to_subscribers(Event,LD), % Relay to our subscribers.
+ {noreply,LD};
+handle_info({'DOWN',Ref,process,_From,Info},LD) -> % A runtime component died?
+ {noreply,do_down_msg(Ref,Info,LD)};
+handle_info(state,LD) -> % For debug purposes.
+ io:format("trace_c state: ~p~n",[LD]),
+ {noreply,LD};
+handle_info(_Msg,LD) ->
+ {noreply,LD}.
+
+terminate(_Reason, _LD) ->
+ ok.
+
+code_change(_OldVsn, LoopData, _Extra) ->
+ {ok, LoopData}.
+%% -----------------------------------------------------------------------------
+
+
+%% -----------------------------------------------------------------------------
+%% Handle call help functions.
+%% -----------------------------------------------------------------------------
+
+%% Help function which adapts a reply based on if this is a distributed system
+%% or not. The first argument indicates distribution (='true').
+%% If we are not running a distributed system, all node references are removed.
+adapt_reply(#state{distributed=Distributed},Reply) ->
+ adapt_reply(Distributed,Reply);
+adapt_reply(true,Reply) -> % We are distributed.
+ Reply;
+adapt_reply(false,{ok,[{_Node,LocalReply}]}) ->
+ LocalReply;
+adapt_reply(false,{ok,[]}) ->
+ {error,not_an_added_node}; % �R DET H�R VERKLIGEN RIKTIGT?
+adapt_reply(false,Reply) ->
+ Reply.
+%% ------------------------------------------------------------------------------
+
+
+%% ==============================================================================
+%% First level help functions to handler functions.
+%% ==============================================================================
+
+%% Function starting a runtime component at the nodes in Nodes. If doing non
+%% distributed, Nodes will be a list of nonode@nohost.
+%% Returns {NewLD,{ok,Replies}} or {LD,{error,Reason}}.
+do_add_nodes(Nodes,LD,Options,Tag,Condition) ->
+ do_add_nodes_2(Nodes,LD,Options,Tag,Condition,[]).
+
+do_add_nodes_2([Node|Tail],LD,Options,Tag,Condition,Replies) ->
+ case find(fun is_started/2,LD#state.nodes,Node) of
+ {value,true} -> % Already started by us.
+ do_add_nodes_2(Tail,LD,Options,Tag,Condition,[{Node,{ok,already_added}}|Replies]);
+ no_match ->
+ case ?RUNTIME:start(Node,Options,Tag,Condition) of
+ {node_info,_Node,Pid,VSN,State,Status,new} ->
+ NewLD=do_add_nodes_3(Node,LD,Tag,Pid,VSN,State,Status),
+ do_add_nodes_2(Tail,NewLD,Options,Tag,Condition,[{Node,{ok,new}}|Replies]);
+ {node_info,_Node,Pid,VSN,State,Status,{tag,Tag2}} ->
+ NewLD=do_add_nodes_3(Node,LD,Tag,Pid,VSN,State,Status),
+ do_add_nodes_2(Tail,NewLD,Options,Tag,Condition,
+ [{Node,{ok,{adopted,State,Status,Tag2}}}|Replies]);
+ Error ->
+ do_add_nodes_2(Tail,LD,Options,Tag,Condition,[{Node,Error}|Replies])
+ end
+ end;
+do_add_nodes_2([],LD,_,_,_,Replies) ->
+ {LD,{ok,lists:reverse(Replies)}};
+do_add_nodes_2(Faulty,LD,_,_,_,_) -> % Not a list of nodes.
+ {LD,{error,{badarg,Faulty}}}.
+
+do_add_nodes_3(Node,LD,Tag,Pid,VSN,State,Status) ->
+ MRef=erlang:monitor(process,Pid),
+ NodeRec=#node{node=Node,pid=Pid,vsn=VSN,ref=MRef,tag=Tag},
+ send_to_subscribers({connected,Node,{Tag,{State,Status}}},LD),
+ _NewLD=set_node_rec(NodeRec,LD).
+%% ------------------------------------------------------------------------------
+
+%% Function calling change_options sequensially on all nodes in Nodes.
+%% Returns {ok,Replies} or {error,Reason}.
+do_change_option(Nodes,Options,LD) ->
+ do_change_option_2(started_trace_nodes(Nodes,LD#state.nodes),Options,LD,[]).
+
+do_change_option_2([Node|Tail],Options,LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ Rec when record(Rec,node) ->
+ Answer=?RUNTIME:change_options(Rec#node.pid,Options),
+ do_change_option_2(Tail,Options,LD,[{Node,Answer}|Replies]);
+ Error ->
+ do_change_option_2(Tail,Options,LD,[{Node,Error}|Replies])
+ end;
+do_change_option_2([],_Options,_LD,Replies) ->
+ {ok,Replies};
+do_change_option_2(Faulty,_,_,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function which calls the runtime components in TracerDataList and initiates
+%% the tracing. TracerDataList may either be just one tracer data which shall be
+%% applied to all runtime components, or a list of nodes and tracerdata.
+do_init_tracing(TracerDataList,LD) ->
+ case inviso_rt_lib:is_tracerdata(TracerDataList) of
+ true -> % Then we must add all our nodes.
+ List=lists:map(fun(N)->{N,TracerDataList} end,started_trace_nodes(all,LD)),
+ do_init_tracing_2(List,LD,[]);
+ false -> % Assume it is a list of {Node,Tracerdata}
+ do_init_tracing_2(TracerDataList,LD,[])
+ end.
+
+do_init_tracing_2([{Node,TracerData}|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ Rec when is_record(Rec,node) ->
+ case check_modify_tracerdata(TracerData,LD) of
+ {ok,NewTracerData} -> % Tracerdata ok and node->pid.
+ Reply=?RUNTIME:init_tracing(Rec#node.pid,NewTracerData),
+ do_init_tracing_2(Tail,LD,[{Node,Reply}|Replies]);
+ {error,Reason} -> % Unknown tracerdata.
+ do_init_tracing_2(Tail,LD,[{Node,{error,Reason}}|Replies])
+ end;
+ {error,Reason} ->
+ do_init_tracing_2(Tail,LD,[{Node,{error,Reason}}|Replies])
+ end;
+do_init_tracing_2([_|Tail],LD,Replies) -> % Just ignore item we don't understand.
+ do_init_tracing_2(Tail,LD,Replies); % Will not end up in Replies either.
+do_init_tracing_2([],_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_init_tracing_2(What,_LD,_) ->
+ {error,{badarg,What}}.
+%% -----------------------------------------------------------------------------
+
+%% Function setting trace patterns on all nodes mentioned in Nodes. Uses a
+%% parallel mechanism in the runtime component.
+%% Returns {ok,Reply} or {error,Reason}.
+distribute_tp(all,Patterns,FlagList,LD) ->
+ distribute_tp(started_trace_nodes(all,LD),Patterns,FlagList,LD);
+distribute_tp(Nodes,Patterns,FlagList,LD) when list(Nodes) ->
+ RTpids=lists:map(fun(N)->case get_node_rec(N,LD) of
+ #node{pid=Pid} ->
+ {Pid,N};
+ Error ->
+ {Error,N}
+ end
+ end,
+ Nodes),
+ {ok,?RUNTIME:trace_patterns_parallel(RTpids,Patterns,FlagList)};
+distribute_tp(Faulty,_,_,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function which in parallel sets trace flags on all nodes in Nodes. Can be
+%% either a list of node names or 'all'. Note that in the reply list there will be an
+%% error indicating nodes not reachable. Either because such a node disappeared or
+%% because it is not one of "our" nodes.
+%% Returns {ok,Reply} or {error,Reason}.
+distribute_tf(all,Args,How,LD) ->
+ distribute_tf(started_trace_nodes(all,LD),Args,How,LD);
+distribute_tf(Nodes,Args,How,LD) when list(Nodes) ->
+ RTpids=lists:map(fun(Node)->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ {Pid,Node};
+ Error -> % Not an added node then!
+ {Error,Node}
+ end
+ end,
+ Nodes),
+ {ok,?RUNTIME:trace_flags_parallel(RTpids,Args,How)};
+distribute_tf(Faulty,_,_,_) ->
+ {error,{badarg,Faulty}}.
+
+%% As above but specific args for each node.
+distribute_tf(NodeArgs,How,LD) when list(NodeArgs) ->
+ RTpidArgs=lists:map(fun({Node,Args})->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ {Pid,Node,Args};
+ Error -> % Not an added node then!
+ {Error,Node}
+ end
+ end,
+ NodeArgs),
+ {ok,?RUNTIME:trace_flags_parallel(RTpidArgs,How)};
+distribute_tf(Faulty,_,_) ->
+ {error,{badarg,Faulty}}.
+
+%% As above but both specific args for each node and How (set or remove flag).
+distribute_tf(NodeArgHows,LD) when list(NodeArgHows) ->
+ RTpidArgHows=
+ lists:map(fun({Node,Args,How}) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ {Pid,Node,Args,How};
+ Error -> % Not an added node then!
+ {Error,Node}
+ end
+ end,
+ NodeArgHows),
+ {ok,?RUNTIME:trace_flags_parallel(RTpidArgHows)};
+distribute_tf(Faulty,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function making a parallel call to all nodes in Nodes, calling the generic
+%% meta-tracer function stated in Args.
+%% Returns {ok,Reply} or {error,Reason}.
+distribute_metapattern(all,Args,LD) ->
+ distribute_metapattern(started_trace_nodes(all,LD),Args,LD);
+distribute_metapattern(Nodes,Args,LD) when list(Nodes) ->
+ RTpids=lists:map(fun(N)->case get_node_rec(N,LD) of
+ #node{pid=Pid} ->
+ {Pid,N};
+ Error ->
+ {Error,N}
+ end
+ end,
+ Nodes),
+ {ok,?RUNTIME:meta_tracer_call_parallel(RTpids,Args)};
+distribute_metapattern(Faulty,_,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function clearing all trace patterns on all node mentioned in Nodes.
+%% Returns {ok,Reply} or {error,Reason}.
+do_ctp_all(Nodes,LD) ->
+ do_ctp_all_2(started_trace_nodes(Nodes,LD),LD,[]).
+
+do_ctp_all_2([Node|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ do_ctp_all_2(Tail,LD,[{Node,?RUNTIME:clear_all_tp(Pid)}|Replies]);
+ Error ->
+ do_ctp_all_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_ctp_all_2([],_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_ctp_all_2(Faulty,_,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function suspending all runtime components mentioned in Nodes.
+%% Returns {ok,Reply} or {error,Reason}.
+do_suspend(Nodes,Reason,LD) ->
+ do_suspend_2(started_trace_nodes(Nodes,LD),Reason,LD,[]).
+
+do_suspend_2([Node|Tail],Reason,LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ Answer=?RUNTIME:call_suspend(Pid,Reason),
+ do_suspend_2(Tail,Reason,LD,[{Node,Answer}|Replies]);
+ Error ->
+ do_suspend_2(Tail,Reason,LD,[{Node,Error}|Replies])
+ end;
+do_suspend_2([],_Reason,_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_suspend_2(Faulty,_,_,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function cancelling a suspension at the runtime components mentioned in Nodes.
+%% Returns {ok,Reply} or {error,Reason}.
+do_cancel_suspension(Nodes,LD) ->
+ do_cancel_suspension_2(started_trace_nodes(Nodes,LD),LD,[]).
+
+do_cancel_suspension_2([Node|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ Answer=?RUNTIME:cancel_suspension(Pid),
+ do_cancel_suspension_2(Tail,LD,[{Node,Answer}|Replies]);
+ Error ->
+ do_cancel_suspension_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_cancel_suspension_2([],_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_cancel_suspension_2(Faulty,_,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function which stops tracing on all nodes in Nodes. The function is performed
+%% in parallel over the nodes to get a more precise stop-time.
+%% Return {ok,Reply} or {error,Reason}.
+do_stop_tracing(all,LD) ->
+ do_stop_tracing(started_trace_nodes(all,LD),LD);
+do_stop_tracing(Nodes,LD) when list(Nodes) ->
+ RTpids=lists:map(fun(N)->case get_node_rec(N,LD) of
+ #node{pid=Pid} ->
+ {Pid,N};
+ Error ->
+ {Error,N}
+ end
+ end,
+ Nodes),
+ {ok,?RUNTIME:stop_tracing_parallel(RTpids)};
+do_stop_tracing(Faulty,_) ->
+ {error,{badarg,Faulty}}.
+%% -----------------------------------------------------------------------------
+
+%% Function fetching the current status of the runtime components in Nodes.
+%% Returns {ok,Reply} or {error,Reason}.
+do_get_status(Nodes,LD) ->
+ do_get_status_2(started_trace_nodes(Nodes,LD),LD,[]).
+
+do_get_status_2([Node|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ do_get_status_2(Tail,LD,[{Node,?RUNTIME:get_status(Pid)}|Replies]);
+ Error ->
+ do_get_status_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_get_status_2([],_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_get_status_2(Faulty,_,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function retrieving the tracerdata for the nodes in Nodes.
+%% Returns {ok,Reply} or {error,Reason}.
+do_get_tracerdata(Nodes,LD) ->
+ do_get_tracerdata_2(started_trace_nodes(Nodes,LD),LD,[]).
+
+do_get_tracerdata_2([Node|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ do_get_tracerdata_2(Tail,LD,[{Node,?RUNTIME:get_tracerdata(Pid)}|Replies]);
+ Error ->
+ do_get_tracerdata_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_get_tracerdata_2([],_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_get_tracerdata_2(Faulty,_,_) ->
+ {error,{badarg,Faulty}}.
+%% ------------------------------------------------------------------------------
+
+%% Function that lists all logfiles associated with a certain tracerdata
+%% or the current tracerdata should no tracerdata be mentioned.
+%% Returns {ok,Replies} or {error,Reason}.
+do_list_logs(all,LD) -> % When doing all known nodes.
+ do_list_logs_2(started_trace_nodes(all,LD),LD,[]);
+do_list_logs(TracerDataOrNodesList,LD) ->
+ case inviso_rt_lib:is_tracerdata(TracerDataOrNodesList) of
+ true -> % It is tracerdata for this node.
+ do_list_logs_2([{node(),TracerDataOrNodesList}],LD,[]);
+ false ->
+ do_list_logs_2(TracerDataOrNodesList,LD,[])
+ end.
+
+do_list_logs_2([{Node,TD}|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ case check_modify_tracerdata(TD,LD) of
+ {ok,TracerData} ->
+ Answer=?RUNTIME:list_logs(Pid,TracerData),
+ do_list_logs_2(Tail,LD,[{Node,Answer}|Replies]);
+ Error ->
+ do_list_logs_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+ Error ->
+ do_list_logs_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_list_logs_2([Node|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ Answer=?RUNTIME:list_logs(Pid),
+ do_list_logs_2(Tail,LD,[{Node,Answer}|Replies]);
+ Error ->
+ do_list_logs_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_list_logs_2([],_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_list_logs_2(Other,_LD,_Replies) ->
+ {error,{badarg,Other}}.
+%% -----------------------------------------------------------------------------
+
+%% Function fetching logfiles using distributed erlang. This function does not
+%% return anything significant. This since the reply to the client is always
+%% sent by the CollectPid unless there is badarg fault detected before the
+%% CollectPid is spawned. Note that this function sends a list of fetchers from
+%% which the CollectPid shall expect replies.
+%% We try to catch some bad arguments like Destination and Prefix not being
+%% strings. However the fact that they are lists does not guarantee they are
+%% proper strings.
+do_fetch_log(ToNode,all,Dest,Prefix,From,LD) ->
+ do_fetch_log(ToNode,started_trace_nodes(all,LD),Dest,Prefix,From,LD);
+do_fetch_log(ToNode,Specs,Dest,Prefix,From,LD) when list(Dest),list(Prefix) ->
+ CollectPid=spawn_link(ToNode,?MODULE,log_rec_init,[self(),Dest,Prefix,From]),
+ do_fetch_log_2(Specs,LD,CollectPid,[],[]);
+do_fetch_log(_ToNode,_Specs,Dest,Prefix,From,_LD) ->
+ gen_server:reply(From,{error,{badarg,[Dest,Prefix]}}).
+
+do_fetch_log_2([{Node,Spec}|Rest],LD,CollectPid,Fetchers,Replies) ->
+ if
+ Node==node() -> % This is plain stupid!
+ do_fetch_log_2(Rest,LD,CollectPid,Fetchers,[{Node,{error,own_node}}|Replies]);
+ true ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ {NewFetchers,NewReplies}=
+ do_fetch_log_3(Fetchers,
+ Replies,
+ Node,
+ ?RUNTIME:fetch_log(Pid,CollectPid,Spec)),
+ do_fetch_log_2(Rest,LD,CollectPid,NewFetchers,NewReplies);
+ Error -> % Most likely the node does not exist.
+ do_fetch_log_2(Rest,LD,CollectPid,Fetchers,[{Node,Error}|Replies])
+ end
+ end;
+do_fetch_log_2([Node|Rest],LD,CollectPid,Fetchers,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ {NewFetchers,NewReplies}=
+ do_fetch_log_3(Fetchers,Replies,Node,?RUNTIME:fetch_log(Pid,CollectPid)),
+ do_fetch_log_2(Rest,LD,CollectPid,NewFetchers,NewReplies);
+ Error -> % Most likely the node does not exist.
+ do_fetch_log_2(Rest,LD,CollectPid,Fetchers,[{Node,Error}|Replies])
+ end;
+do_fetch_log_2([],_,CollectPid,Fetchers,Replies) ->
+ CollectPid ! {?MODULE,self(),Fetchers,Replies};
+do_fetch_log_2(FaultySpec,_,CollectPid,_Fetchers,_Replies) ->
+ CollectPid ! {?MODULE,self(),[],{error,{badarg,FaultySpec}}}.
+
+do_fetch_log_3(Fetchers,Replies,Node,{ok,Fetcher}) ->
+ {[{Node,Fetcher}|Fetchers],Replies};
+do_fetch_log_3(Fetchers,Replies,Node,{complete,no_log}) ->
+ {Fetchers,[{Node,{complete,no_log}}|Replies]};
+do_fetch_log_3(Fetchers,Replies,Node,{error,Reason}) ->
+ {Fetchers,[{Node,{error,Reason}}|Replies]}.
+%% -----------------------------------------------------------------------------
+
+%% Function removing files from the runtime components. We can either ask for
+%% all files associated with the current tracerdata to be removed, or provide
+%% tracerdata or a list of files to be removed.
+%% Returns the client reply.
+do_delete_log(all,LD) ->
+ do_delete_log_2(started_trace_nodes(all,LD),LD,[]);
+do_delete_log(NodeSpecs,LD) ->
+ case inviso_rt_lib:is_tracerdata(NodeSpecs) of
+ true -> % It is tracerdata for all nodes.
+ do_delete_log_2(lists:map(fun(N)->{N,NodeSpecs} end,
+ started_trace_nodes(all,LD)),
+ LD,[]);
+ false ->
+ if
+ list(NodeSpecs),list(hd(NodeSpecs)) -> % A list of files.
+ do_delete_log_2(lists:map(fun(N)->{N,NodeSpecs} end,
+ started_trace_nodes(all,LD)),
+ LD,[]);
+ true -> % Then use it as is.
+ do_delete_log_2(NodeSpecs,LD,[])
+ end
+ end.
+
+do_delete_log_2([{Node,Spec}|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ do_delete_log_2(Tail,LD,[{Node,?RUNTIME:delete_log(Pid,Spec)}|Replies]);
+ Error ->
+ do_delete_log_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_delete_log_2([Node|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ do_delete_log_2(Tail,LD,[{Node,?RUNTIME:delete_log(Pid)}|Replies]);
+ Error ->
+ do_delete_log_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_delete_log_2([],_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_delete_log_2(Faulty,_,_) ->
+ {error,{badarg,Faulty}}.
+%% -----------------------------------------------------------------------------
+
+%% Function removing files from runtime components.
+%% Returns {ok,Replies} or {error,Reason}.
+do_clear(Nodes,LD,Options) ->
+ do_clear_2(started_trace_nodes(Nodes,LD),LD,Options,[]).
+
+do_clear_2([Node|Tail],LD,Options,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ do_clear_2(Tail,LD,Options,[{Node,?RUNTIME:clear(Pid,Options)}|Replies]);
+ Error ->
+ do_clear_2(Tail,LD,Options,[{Node,Error}|Replies])
+ end;
+do_clear_2([],_LD,_Options,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_clear_2(FaultyNodes,_LD,_Options,_Replies) ->
+ {error,{badarg,FaultyNodes}}.
+%% -----------------------------------------------------------------------------
+
+%% Function doing a flush trace-port.
+%% Returns {ok,Replies} or {error,Reason}.
+do_flush(Nodes,LD) ->
+ do_flush_2(started_trace_nodes(Nodes,LD),LD,[]).
+
+do_flush_2([Node|Rest],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{pid=Pid} ->
+ do_flush_2(Rest,LD,[{Node,?RUNTIME:flush(Pid)}|Replies]);
+ Error ->
+ do_flush_2(Rest,LD,[{Node,Error}|Replies])
+ end;
+do_flush_2([],_LD,Replies) ->
+ {ok,lists:reverse(Replies)};
+do_flush_2(FaultyNodes,_LD,_Replies) ->
+ {error,{badarg,FaultyNodes}}.
+%% -----------------------------------------------------------------------------
+
+%% Function stopping runtime components. We can only stop runtime components
+%% belonging to this control component.
+%% Returns {NewLoopdata,Reply}.
+do_stop_nodes(Nodes,LD) ->
+ do_stop_nodes_2(started_trace_nodes(Nodes,LD),LD,[]).
+
+do_stop_nodes_2([Node|Tail],LD,Replies) ->
+ case get_node_rec(Node,LD) of
+ #node{ref=MRef} ->
+ erlang:demonitor(MRef),
+ case ?RUNTIME:stop(Node) of
+ ok ->
+ NewLD=delete_node_rec(Node,LD),
+ send_to_subscribers({disconnected,Node,void},LD),
+ do_stop_nodes_2(Tail,NewLD,[{Node,ok}|Replies]);
+ Error ->
+ do_stop_nodes_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+ Error ->
+ do_stop_nodes_2(Tail,LD,[{Node,Error}|Replies])
+ end;
+do_stop_nodes_2([],LD,Replies) ->
+ {LD,{ok,Replies}};
+do_stop_nodes_2(Faulty,LD,_) ->
+ {LD,{error,{badarg,Faulty}}}.
+%% -----------------------------------------------------------------------------
+
+%% Function being called when a runtime component sends a connect message to
+%% the controlcomponent. The control component then confirms that is has indeed
+%% taken on that runtime component.
+%% Returns a new loopdata structure.
+do_confirm_connection(Pid,Tag,LD) ->
+ case ?RUNTIME:confirm_connection(Pid,Tag) of
+ {node_info,Node,_Pid,VSN,State,Status,_Tag} ->
+ MRef=erlang:monitor(process,Pid), % We must monitor it from now on.
+ Rec=#node{node=Node,vsn=VSN,tag=Tag,ref=MRef,pid=Pid},
+ send_to_subscribers({connected,Node,{Tag,{State,Status}}},LD),
+ set_node_rec(Rec,LD); % Makes new loopdata.
+ _Error -> % Strange, it wanted us as control!?
+ LD
+ end.
+%% ------------------------------------------------------------------------------
+
+%% Function handling an incomming DOWN message. This can be one of our runtime
+%% components terminating, or a process subscribing to events. Send a trace-event
+%% to subscribers if it was a runtime terminating and remove it from
+%% our list of runtime components.
+%% Note that if a subscriber has subscribed multiple times to events, we will get
+%% multiple DOWN messages too, since we have monitored that process multiple
+%% times. It is therefore sufficient to remove just one subscription entry here
+%% each time (remove on the monitor reference!).
+%% Returns a new LoopData structure.
+do_down_msg(Ref,Info,LD) ->
+ case find(fun ref/2,LD#state.nodes,Ref) of
+ {value,Node} -> % Yes it was one of our nodes.
+ send_to_subscribers({disconnected,Node,Info},LD),
+ delete_node_rec(Node,LD);
+ no_match -> % Not one of our nodes.
+ case lists:keysearch(Ref,2,LD#state.subscribers) of
+ {value,{_Pid,_}} -> % It was a subscriber terminating.
+ LD#state{subscribers=lists:keydelete(Ref,2,LD#state.subscribers)};
+ false -> % Not one of our subscribers either.
+ LD % Do nothing then.
+ end
+ end.
+%% ------------------------------------------------------------------------------
+
+
+
+%% ==============================================================================
+%% Help functions.
+%% ==============================================================================
+
+
+%% Help function which inspects options to start and add_node. Returns a new
+%% list of options in {ok,Options} or {error,Reason}.
+check_options(Options, Context) ->
+ check_options_2(Options, Context, []).
+
+check_options_2([],_Context,Result) ->
+ {ok,Result};
+check_options_2([{subscribe,Pid}|OptionsTail],start,Result) when pid(Pid) ->
+ check_options_2(OptionsTail,start,[{subscribe,Pid}|Result]);
+check_options_2([{unsubscribe,Pid}|OptionsTail],start,Result) when pid(Pid) ->
+ check_options_2(OptionsTail,start,[{unsubscribe,Pid}|Result]);
+check_options_2([{dependency,How}|OptionsTail],Context,Result) ->
+ check_options_2(OptionsTail,Context,[{dependency,How}|Result]);
+check_options_2([{overload,How}|OptionsTail],Context,Result) ->
+ check_options_2(OptionsTail,Context,[{overload,How}|Result]);
+check_options_2([overload|OptionsTail],Context,Result) ->
+ check_options_2(OptionsTail,Context,[overload|Result]);
+check_options_2([UnKnown|_],_Context,_Result) ->
+ {error,{unknown_option,UnKnown}};
+check_options_2(UnKnown,_Context,_Result) ->
+ {error,{unknown_option,UnKnown}}.
+%% ------------------------------------------------------------------------------
+
+%% Help function initiating the #state structure, i.e the loopdata. Since there
+%% are some initial values from the record defaults when creating a new #state,
+%% those must be compared with possibly specified Options. Specified Options shall
+%% of course override defaults.
+%% Note that it is not the control component's responsibility to understand all
+%% options later given to a runtime component. It mearly stores them in rt_options
+%% so they can be passed to the runtime component at start-up.
+%% Returns a loopdata structure.
+initiate_state(Options) ->
+ ResultingOptions=merge_options((#state{})#state.rt_options,Options),
+ LD1=initiate_state_2(ResultingOptions,#state{rt_options=[]}),
+ case node() of % Finally set the distribution flag.
+ nonode@nohost ->
+ LD1#state{distributed=false};
+ _ ->
+ LD1#state{distributed=true}
+ end.
+
+initiate_state_2([{subscribe,Proc}|Tail],LD) when pid(Proc);atom(Proc)->
+ MRef=erlang:monitor(process,Proc),
+ initiate_state_2(Tail,LD#state{subscribers=[{Proc,MRef}|LD#state.subscribers]});
+initiate_state_2([Opt|Tail],LD) when tuple(Opt),size(Opt)>=1 ->
+ initiate_state_2(Tail,initiate_state_3(element(1,Opt),Opt,LD));
+initiate_state_2([Opt|Tail],LD) when atom(Opt) ->
+ initiate_state_2(Tail,initiate_state_3(Opt,Opt,LD));
+initiate_state_2([_|Tail],LD) ->
+ initiate_state_2(Tail,LD);
+initiate_state_2([],LD) ->
+ LD.
+
+initiate_state_3(OptName,Opt,LD) ->
+ case initiate_state_is_rt_option(OptName) of
+ true -> % Yes, it shall be part of the rt_options.
+ LD#state{rt_options=[Opt|LD#state.rt_options]};
+ false -> % Ignore the option then.
+ LD
+ end.
+
+%% This is the only place you have to change should there be more rt_options
+%% introduced.
+initiate_state_is_rt_option(overload) -> true;
+initiate_state_is_rt_option(dependency) -> true;
+initiate_state_is_rt_option(_) -> false.
+%% ------------------------------------------------------------------------------
+
+%% Help function which takes a list of default options and a list of overriding
+%% options. The function builds a return value consisting of all default options
+%% unless they are overridden by a overriding option.
+%% An option can be either {Param,.....} or Param. I.e either a tuple with zero
+%% or more values associated with the Parameter, or just an atom.
+merge_options([], Options) ->
+ Options;
+merge_options([T|DefaultTail],Options) when tuple(T),size(T)>=1 ->
+ merge_options(DefaultTail,merge_options_2(element(1,T),T,Options));
+merge_options([Param|DefaultTail],Options) when atom(Param) ->
+ merge_options(DefaultTail,merge_options_2(Param,Param,Options));
+merge_options([_|DefaultTail],Options) -> % Strange, bad default option!
+ merge_options(DefaultTail,Options).
+
+merge_options_2(Param,Opt,Options) ->
+ case merge_options_find(Param,Options) of
+ true ->
+ Options;
+ false ->
+ [Opt|Options]
+ end.
+
+merge_options_find(Param,[T|_]) when tuple(T),element(1,T)==Param ->
+ true;
+merge_options_find(Param,[Param|_]) ->
+ true;
+merge_options_find(Param,[_|Rest]) ->
+ merge_options_find(Param,Rest);
+merge_options_find(_,[]) ->
+ false.
+%% ------------------------------------------------------------------------------
+
+%% Help function which transforms certain parts of a tracer data. Those are
+%% parts which must be transformed at the controlnode like node to pid mappings.
+%% It also checks the formatting of the tracerdata since runtime components
+%% does not accept too badly formatted tracerdata.
+%% Returns {ok,NewTraceData} or {error,Reason}.
+check_modify_tracerdata(TracerData,LoopData) when list(TracerData) ->
+ case lists:keysearch(trace,1,TracerData) of
+ {value,{_,TraceTD}} -> % Examine the trace part.
+ case check_modify_tracerdata(TraceTD,LoopData) of
+ {ok,NewTraceTD} ->
+ {ok,lists:keyreplace(trace,1,TracerData,{trace,NewTraceTD})};
+ {error,Reason} -> % The trace part was faulty.
+ {error,Reason} % Ruins everything :-)
+ end;
+ false -> % Unusual, but no trace part.
+ {ok,TracerData} % No modifications necessary.
+ end;
+check_modify_tracerdata(collector,_LoopData) ->
+ {ok, collector};
+check_modify_tracerdata({relayer,Collector},_LoopData) when is_pid(Collector) ->
+ {ok,{relayer,Collector}};
+check_modify_tracerdata({relayer,Collector},LoopData) when is_atom(Collector) ->
+ case get_node_rec(Collector,LoopData) of
+ Rec when is_record(Rec,node) -> % Collector is a known node.
+ {ok,{relayer,Rec#node.pid}};
+ {error,not_an_added_node} ->
+ {error,{not_an_added_node,Collector}}
+ end;
+check_modify_tracerdata({Type,Data},_LoopData) when Type==ip;Type==file ->
+ {ok,{Type,Data}};
+check_modify_tracerdata({Handler,Data},_LoopData) when function(Handler) ->
+ {ok,{Handler,Data}};
+check_modify_tracerdata(Data,_LoopData) ->
+ {error,{bad_tracerdata,Data}}.
+%% -----------------------------------------------------------------------------
+
+%% Help function sending an event to all processes subscribing to events.
+%% Note that the function manipulates the from Node indicator incase we are not
+%% running in a distributed system.
+%% Returns nothing significant.
+send_to_subscribers(Msg={Event,_Node,Data},LD) ->
+ AdaptedMsg=
+ case LD#state.distributed of
+ true ->
+ Msg;
+ false ->
+ {Event,local_runtime,Data}
+ end,
+ TraceEvent={inviso_event,self(),erlang:localtime(),AdaptedMsg},
+ send_to_subscribers_2(LD#state.subscribers,TraceEvent).
+
+send_to_subscribers_2([],_) ->
+ ok;
+send_to_subscribers_2([{Subscriber,_}|Tail],TraceEvent) ->
+ Subscriber ! TraceEvent,
+ send_to_subscribers_2(Tail,TraceEvent).
+%% -----------------------------------------------------------------------------
+
+%% Help function converting the Nodes parameter to known nodes. Actually today
+%% it only converts the all atom to all known nodes.
+%% Returns a list of nodes.
+started_trace_nodes(all,LoopData) ->
+ lists:map(fun(N)->N#node.node end,LoopData#state.nodes);
+started_trace_nodes(Nodes,_) ->
+ Nodes.
+%% ------------------------------------------------------------------------------
+
+%% Help function searching through a list of elements looking for an element
+%% containing Key. How the element shall be interpreted is done by the Fun.
+%% Returns {value,Element} or 'no_match'.
+%% Fun=fun(Element,Key)={return,Value} | false
+find(_,[],_Key) ->
+ no_match;
+find(Fun,[H|T],Key) ->
+ case Fun(H,Key) of
+ {return,Value}->
+ {value,Value};
+ _ ->
+ find(Fun,T,Key)
+ end.
+%% -----------------------------------------------------------------------------
+
+
+%% -----------------------------------------------------------------------------
+%% Functions handling the nodes datastructure part of the #state.
+%% #state.nodes is a list of #node.
+%% -----------------------------------------------------------------------------
+
+%% Function used to build find fun, looking for a certain #node with its monitoring
+%% reference set to Ref. Useful when finding out if a DOWN message comes from one
+%% of our runtime components.
+ref(#node{ref=Ref,node=Node},Ref) ->
+ {return,Node};
+ref(_,_) ->
+ false.
+%% -----------------------------------------------------------------------------
+
+%% use in find/3
+%% Function used to build find fun, finding out if we have a node with the node
+%% name Node.
+is_started(#node{node=Node},Node) ->
+ {return,true};
+is_started(_,_) ->
+ false.
+%% -----------------------------------------------------------------------------
+
+%% Help function replacing or adding an entry for a node. Works on either a list
+%% of #node or a loopdata structure. Returns a new list of #node or a loopdata struct.
+set_node_rec(Rec,LD=#state{nodes=NodeList}) ->
+ LD#state{nodes=set_node_rec_2(Rec,NodeList)}.
+
+set_node_rec_2(Rec,[]) ->
+ [Rec];
+set_node_rec_2(Rec,[NodeRec|Tail]) when NodeRec#node.node==Rec#node.node ->
+ [Rec|Tail];
+set_node_rec_2(Rec,[NodeRec|Tail]) ->
+ [NodeRec|set_node_rec_2(Rec,Tail)].
+%% ------------------------------------------------------------------------------
+
+%% Help function finding a node record for Node in a list of #node or in loopdata.
+%% Returns the #node in question or {error,not_an_added_node}.
+get_node_rec(Node,NodeList) when list(NodeList) ->
+ get_node_rec_2(Node,NodeList);
+get_node_rec(Node,#state{nodes=NodeList}) ->
+ get_node_rec_2(Node,NodeList).
+
+get_node_rec_2(_Node,[]) ->
+ {error,not_an_added_node};
+get_node_rec_2(Node,[NodeRec|_]) when NodeRec#node.node==Node ->
+ NodeRec;
+get_node_rec_2(Node,[_NodeRec|Tail]) ->
+ get_node_rec_2(Node,Tail).
+%% ------------------------------------------------------------------------------
+
+%% Help function removing a #node from either a list of #node or from a loopdata
+%% structure. Returns a new list of #node or a new loopdata structure.
+delete_node_rec(Node,LD=#state{nodes=NodeList}) ->
+ LD#state{nodes=delete_node_rec_2(Node,NodeList)};
+delete_node_rec(Node,NodeList) when list(NodeList) ->
+ delete_node_rec_2(Node,NodeList).
+
+delete_node_rec_2(_,[]) ->
+ [];
+delete_node_rec_2(#node{node=Node},[#node{node=Node}|Tail]) ->
+ Tail;
+delete_node_rec_2(Node,[#node{node=Node}|Tail]) ->
+ Tail;
+delete_node_rec_2(Node,[NRec|Tail]) ->
+ [NRec|delete_node_rec_2(Node,Tail)].
+%% ------------------------------------------------------------------------------
+
+
+%% =============================================================================
+%% Implementation of the help process receiving all logs from the runtime
+%% components. This process is referred to as the CollectPid.
+%% It is responsible for sending the reply back to the control component
+%% client. If a runtime component becomes suspended, the CollectPid is
+%% alerted by the DOWN message.
+%% Note that it may take some time before this process responds back to the client.
+%% Therefore the client must wait for 'infinity'. The job of transferring the
+%% files can be costly. Therefore it is a good idea to stop if no one is really
+%% interested in the result. This collector process monitors the From client in
+%% order to learn if the job can be cancelled. That will also be a possibility
+%% for a client to willfully cancel a fetch job.
+%% =============================================================================
+
+%% Intitial function on which the control component spawns. Note that the start
+%% must be done in two steps since the runtime components must be informed of
+%% the CollectPid. But the CollectPid must also know from which runtime components
+%% it can expect files from.
+%% InitialReplies: contains {Node,Result} for nodes from where there will be no
+%% files, but which must be part of the final reply.
+log_rec_init(Parent,Dest,Prefix,From={ClientPid,_}) ->
+ receive
+ {?MODULE,Parent,Fetchers,InitialReplies} ->
+ RTs=lists:map(fun({N,F})->
+ {N,erlang:monitor(process,F),void,void,void}
+ end,
+ Fetchers),
+ CMRef=erlang:monitor(process,ClientPid), % Monitor the client.
+ case log_rec_loop(Dest,Prefix,RTs,InitialReplies,CMRef) of
+ Reply when list(Reply) -> % It is an ok value.
+ gen_server:reply(From,{ok,Reply});
+ {error,Reason} ->
+ gen_server:reply(From,{error,Reason});
+ false -> % The client terminated, no response.
+ true % Simply terminate, fetchers will notice.
+ end
+ end.
+
+log_rec_loop(_Dest,_Prefix,[],Replies,_CMRef) -> % All nodes done!
+ Replies; % This is the final reply.
+log_rec_loop(Dest,Prefix,RTs,Replies,CMRef) ->
+ receive
+ {Node,open,{FType,RemoteFName}} ->
+ case lists:keysearch(Node,1,RTs) of
+ {value,{_,MRef,_,_,_}} ->
+ {NewRTs,NewReplies}=
+ log_rec_open(Dest,Prefix,Node,MRef,FType,RemoteFName,RTs,Replies),
+ log_rec_loop(Dest,Prefix,NewRTs,NewReplies,CMRef);
+ false ->
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end;
+ {Node,open_failure,{FType,RemoteFName}} ->
+ case lists:keysearch(Node,1,RTs) of
+ {value,{_,MRef,_,_,_}} ->
+ {NewRTs,NewReplies}=
+ log_rec_open_failure(Node,MRef,FType,RemoteFName,RTs,Replies),
+ log_rec_loop(Dest,Prefix,NewRTs,NewReplies,CMRef);
+ false ->
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end;
+ {Node,payload,Bin,FPid} -> % A chunk of data from a fetcher.
+ case lists:keysearch(Node,1,RTs) of
+ {value,{_,_,_,_,void}} -> % Node has no file open here.
+ FPid ! {self(),cancel_transmission}, % No use sending more to me.
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef); % Simply ignore payload.
+ {value,{_Node,MRef,FType,FName,FD}} ->
+ case log_rec_payload(Node,MRef,FType,FName,FD,Bin,RTs,Replies) of
+ {ok,{NewRTs,NewReplies}} ->
+ FPid ! {self(),chunk_ack}, % For flow control.
+ log_rec_loop(Dest,Prefix,NewRTs,NewReplies,CMRef);
+ {error,{NewRTs,NewReplies}} ->
+ FPid ! {self(),cancel_transmission}, % No use sending more to me.
+ log_rec_loop(Dest,Prefix,NewRTs,NewReplies,CMRef)
+ end;
+ false -> % Node is not part of transfere.
+ FPid ! {self(),cancel_transmission}, % No use sending more to me.
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end;
+ {Node,end_of_file} ->
+ case lists:keysearch(Node,1,RTs) of
+ {value,{_,_,_,_,void}} -> % Node has no file open here.
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef);
+ {value,{_,MRef,FType,FName,FD}} ->
+ {NewRTs,NewReplies}=
+ log_rec_eof(Node,MRef,FType,FName,FD,RTs,Replies),
+ log_rec_loop(Dest,Prefix,NewRTs,NewReplies,CMRef);
+ false ->
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end;
+ {Node,end_of_transmission} -> % This runtime is done!
+ case lists:keysearch(Node,1,RTs) of
+ {value,{_Node,MRef,_,_,_}} ->
+ erlang:demonitor(MRef),
+ log_rec_loop(Dest,Prefix,
+ lists:keydelete(Node,1,RTs),
+ log_rec_mkreply(Node,complete,Replies),
+ CMRef);
+ false -> % Strange, not one of our nodes.
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end;
+ {Node,incomplete} -> % This runtime is done (with errors).
+ case lists:keysearch(Node,1,RTs) of
+ {value,{_,MRef,FType,FName,FD}} ->
+ erlang:demonitor(MRef),
+ {NewRTs,NewReplies}=
+ log_rec_incomplete(Node,FType,FName,FD,RTs,Replies),
+ log_rec_loop(Dest,Prefix,NewRTs,NewReplies,CMRef);
+ false -> % Not our, or not anylonger.
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end;
+ {Node,{error,Reason}} -> % Remote file read_error.
+ case lists:keysearch(Node,1,RTs) of
+ {value,{_,MRef,FType,FName,FD}} ->
+ {NewRTs,NewReplies}=
+ log_rec_error(Node,MRef,FType,FName,FD,RTs,Reason,Replies),
+ log_rec_loop(Dest,Prefix,NewRTs,NewReplies,CMRef);
+ false ->
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end;
+ {'DOWN',CMRef,process,_,_} -> % The client got tired waiting.
+ log_rec_cancel(Dest,RTs,Replies), % Close and remove all files.
+ false; % Indicate no response message.
+ {'DOWN',Ref,process,_P,_Info} ->
+ case lists:keysearch(Ref,2,RTs) of
+ {value,{Node,_,FType,FName,FD}} ->
+ {NewRTs,NewReplies}=
+ log_rec_incomplete(Node,FType,FName,FD,RTs,Replies),
+ log_rec_loop(Dest,Prefix,NewRTs,NewReplies,CMRef);
+ false -> % Not our, or not anylonger.
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end;
+ _ ->
+ log_rec_loop(Dest,Prefix,RTs,Replies,CMRef)
+ end.
+
+%% Help function opening a new target file on the receiver. It returns
+%% {NewRTs,NewReplies}.
+%% Note that we must protect us against that some of the strings are not proper
+%% strings, but contains garbage.
+log_rec_open(Dest,Prefix,Node,MRef,FType,RemoteFName,RTs,Replies) ->
+ case catch log_rec_open_2(Dest,Prefix,Node,MRef,FType,RemoteFName,RTs,Replies) of
+ {'EXIT',Reason} ->
+ NewRTs=lists:keyreplace(Node,1,RTs,{Node,MRef,void,void,void}),
+ NewReplies=
+ log_rec_addreply(Node,
+ FType,
+ {error,{file_open,{Reason,[Dest,Prefix,RemoteFName]}}},Replies),
+ {NewRTs,NewReplies};
+ Result ->
+ Result
+ end.
+
+log_rec_open_2(Dest,Prefix,Node,MRef,FType,RemoteFName,RTs,Replies) ->
+ FName=Prefix++RemoteFName, % Our file name.
+ case file:open(filename:join([Dest,FName]),[write]) of
+ {ok,FD} ->
+ NewRTs=lists:keyreplace(Node,1,RTs,{Node,MRef,FType,FName,FD}),
+ {NewRTs,Replies};
+ {error,Reason} ->
+ NewRTs=lists:keyreplace(Node,1,RTs,{Node,MRef,void,void,void}),
+ NewReplies=
+ log_rec_addreply(Node,FType,{error,{file_open,{Reason,FName}}},Replies),
+ {NewRTs,NewReplies}
+ end.
+
+%% Help function adding a file that was unsuccessfully opened as failed.
+log_rec_open_failure(Node,MRef,FType,RemoteFName,RTs,Replies) ->
+ NewRTs=lists:keyreplace(Node,1,RTs,{Node,MRef,void,void,void}),
+ NewReplies=
+ log_rec_addreply(Node,
+ FType,
+ {error,{remote_open,RemoteFName}},Replies),
+ {NewRTs,NewReplies}.
+
+%% Help function whih writes the Bin to the FD file. If writing was unsuccessful,
+%% close the file and modify RTs and add a reply to Replies. Note that we can not
+%% stop the runtime from sending us more data belonging to this file. But we will
+%% simply just inore it from now on.
+%% Returns {SuccessCode,{NewRTs,NewReplies}}.
+log_rec_payload(Node,MRef,FType,FName,FD,Bin,RTs,Replies) ->
+ case file:write(FD,Bin) of
+ ok ->
+ {ok,{RTs,Replies}};
+ {error,Reason} ->
+ file:close(FD),
+ NewRTs=lists:keyreplace(Node,1,RTs,{Node,MRef,void,void,void}),
+ NewReplies=
+ log_rec_addreply(Node,FType,{error,{file_write,{Reason,FName}}},Replies),
+ {error,{NewRTs,NewReplies}}
+ end.
+
+%% Help function whih shall be used when a file has been successfully transfered.
+%% This function closes the output file and updates RTs and the Replies.
+%% Returns {NewRTs,NewReplies}.
+log_rec_eof(Node,MRef,FType,FName,FD,RTs,Replies) ->
+ file:close(FD),
+ NewRTs=lists:keyreplace(Node,1,RTs,{Node,MRef,void,void,void}),
+ {NewRTs,log_rec_addreply(Node,FType,{ok,FName},Replies)}.
+
+%% Help function which, if there is an open file, indicates it as truncated in the
+%% replies. And finalize the reply for Node assuming that the node is in total incomplete
+%% Returns {NewRTs,NewReplies}.
+log_rec_incomplete(Node,_FType,_FName,void,RTs,Replies) ->
+ NewRTs=lists:keydelete(Node,1,RTs), % The node is done.
+ {NewRTs,log_rec_mkreply(Node,incomplete,Replies)};
+log_rec_incomplete(Node,FType,FName,FD,RTs,Replies) ->
+ file:close(FD), % Not going to write anymore in this file.
+ NewRTs=lists:keydelete(Node,1,RTs), % The node is done.
+ NewReplies=log_rec_addreply(Node,FType,{error,{truncated,FName}},Replies),
+ {NewRTs,log_rec_mkreply(Node,incomplete,NewReplies)}.
+
+%% Help function handling the case when runtime component experiences an error
+%% transferering the file. That means that there will be no more chunks of this
+%% file. Hence it works a bit like EOF.
+%% Returns {NewRTs,NewReplies}.
+log_rec_error(Node,MRef,FType,FName,FD,RTs,Reason,Replies) ->
+ file:close(FD),
+ NewRTs=lists:keyreplace(Node,1,RTs,{Node,MRef,void,void,void}),
+ {NewRTs,log_rec_addreply(Node,FType,{error,{truncated,{Reason,FName}}},Replies)}.
+%% -----------------------------------------------------------------------------
+
+%% Help function adding a reply to the list of replies.
+%% Replies is a list {Node,FType,Reply} for each file handled, sucessfully or not.
+%% The list may also contain finalized nodes, which will be on the format:
+%% {Node,{Conclusion,[{trace_log,TraceLogReplies},{ti_log,TiLogReplies}]}}.
+log_rec_addreply(Node,FType,Reply,Replies) ->
+ [{Node,FType,Reply}|Replies].
+
+%% Help function which converts the {Node,FType,Reply} tuples in Replies to
+%% a finalized reply.
+log_rec_mkreply(Node,Conclusion,Replies) ->
+ {RemainingReplies,TiReplies,TraceReplies}=
+ log_rec_mkreply_node_ftype(Node,Replies,[],[],[]),
+ [{Node,{Conclusion,[{trace_log,TraceReplies},{ti_log,TiReplies}]}}|
+ RemainingReplies].
+
+%% Help function taking out the ti_log and trace_log file-types replies for
+%% Node. Returns {RemainingReplies,Ti,Trace}.
+log_rec_mkreply_node_ftype(Node,[{Node,ti_log,Result}|Rest],Replies,Ti,Trace) ->
+ log_rec_mkreply_node_ftype(Node,Rest,Replies,[Result|Ti],Trace);
+log_rec_mkreply_node_ftype(Node,[{Node,trace_log,Result}|Rest],Replies,Ti,Trace) ->
+ log_rec_mkreply_node_ftype(Node,Rest,Replies,Ti,[Result|Trace]);
+log_rec_mkreply_node_ftype(Node,[Reply|Rest],Replies,Ti,Trace) ->
+ log_rec_mkreply_node_ftype(Node,Rest,[Reply|Replies],Ti,Trace);
+log_rec_mkreply_node_ftype(_,[],Replies,Ti,Trace) ->
+ {Replies,Ti,Trace}.
+%% -----------------------------------------------------------------------------
+
+%% If the fetching job shall be cancelled, we must close all open files and
+%% remove them including all already closed files. Returns nothing significant.
+log_rec_cancel(Dest,RTs,Replies) ->
+ log_rec_cancel_open(Dest,RTs), % First close and remove all open files.
+ log_rec_cancel_finished(Dest,Replies). % Remove all already closed files.
+
+log_rec_cancel_open(Dest,[{_Node,_MRef,_FType,_FName,void}|Rest]) ->
+ log_rec_cancel_open(Dest,Rest); % There is no open file to close.
+log_rec_cancel_open(Dest,[{_Node,_MRef,_FType,FName,FD}|Rest]) ->
+ file:close(FD),
+ catch file:delete(filename:join(Dest,FName)), % Will just try to do my best.
+ log_rec_cancel_open(Dest,Rest);
+log_rec_cancel_open(_Dest,[]) ->
+ true.
+
+log_rec_cancel_finished(Dest,[{_N,_FT,Reply}|Rest]) ->
+ [FName]=log_rec_cancel_finished_get_fname([Reply]),
+ catch file:delete(filename:join(Dest,FName)),
+ log_rec_cancel_finished(Dest,Rest);
+log_rec_cancel_finished(Dest,[{_N,{_Conclusion,[{_,Replies1},{_,Replies2}]}}|Rest]) ->
+ FNames1=log_rec_cancel_finished_get_fname(Replies1),
+ lists:foreach(fun(FName)->
+ catch file:delete(filename:join(Dest,FName))
+ end,
+ FNames1),
+ FNames2=log_rec_cancel_finished_get_fname(Replies2),
+ lists:foreach(fun(FName)->
+ catch file:delete(filename:join(Dest,FName))
+ end,
+ FNames2),
+ log_rec_cancel_finished(Dest,Rest);
+log_rec_cancel_finished(_Dest,[]) ->
+ true.
+
+%% Help function going through all possible reply values for a file. So
+%% consequently there must be a clause here for every possible log_rec_addreply
+%% call above. Returns a list of filenames that shall be removed in order to
+%% restore the disk since the fetch job is cancelled.
+log_rec_cancel_finished_get_fname([{error,{file_open,{_,FName}}}|Rest]) ->
+ [FName|log_rec_cancel_finished_get_fname(Rest)];
+log_rec_cancel_finished_get_fname([{error,{file_write,{_,FName}}}|Rest]) ->
+ [FName|log_rec_cancel_finished_get_fname(Rest)];
+log_rec_cancel_finished_get_fname([{ok,FName}|Rest]) ->
+ [FName|log_rec_cancel_finished_get_fname(Rest)];
+log_rec_cancel_finished_get_fname([{error,{truncated,{_,FName}}}|Rest]) ->
+ [FName|log_rec_cancel_finished_get_fname(Rest)];
+log_rec_cancel_finished_get_fname([{error,{truncated,FName}}|Rest]) ->
+ [FName|log_rec_cancel_finished_get_fname(Rest)];
+log_rec_cancel_finished_get_fname([_|Rest]) -> % This shall not happend.
+ log_rec_cancel_finished_get_fname(Rest);
+log_rec_cancel_finished_get_fname([]) ->
+ [].
+%% -----------------------------------------------------------------------------
+
+%% EOF