From 12c5985b862c5e8e7e88033a21e909b51225d76f Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Fri, 17 Feb 2012 18:47:58 +0100 Subject: Parallel dataflow --- lib/dialyzer/src/dialyzer_callgraph.erl | 9 ++++---- lib/dialyzer/src/dialyzer_coordinator.erl | 21 ++++++++++-------- lib/dialyzer/src/dialyzer_succ_typings.erl | 35 ++++++++++++++++++------------ lib/dialyzer/src/dialyzer_worker.erl | 35 ++++++++++++++++++++---------- 4 files changed, 60 insertions(+), 40 deletions(-) diff --git a/lib/dialyzer/src/dialyzer_callgraph.erl b/lib/dialyzer/src/dialyzer_callgraph.erl index 7379fbc3c4..385f5621d6 100644 --- a/lib/dialyzer/src/dialyzer_callgraph.erl +++ b/lib/dialyzer/src/dialyzer_callgraph.erl @@ -262,9 +262,8 @@ module_postorder(#callgraph{digraph = DG}) -> MDG = digraph:new([acyclic]), MDG1 = digraph_confirm_vertices(Nodes, MDG), MDG2 = create_module_digraph(Edges, MDG1), - PostOrder = digraph_utils:postorder(MDG2), - digraph:delete(MDG2), - PostOrder. + PostOrder = digraph_utils:topsort(MDG2), + {PostOrder, MDG2}. %% The module deps of a module are modules that depend on the module -spec module_deps(callgraph()) -> dict(). @@ -341,9 +340,9 @@ reset_from_funs(Funs, #callgraph{digraph = DG, module_postorder_from_funs(Funs, #callgraph{digraph = DG} = CG) -> SubGraph = digraph_reaching_subgraph(Funs, DG), - PO = module_postorder(CG#callgraph{digraph = SubGraph}), + {PO, Active} = module_postorder(CG#callgraph{digraph = SubGraph}), digraph_delete(SubGraph), - PO. + {PO, CG#callgraph{active_digraph = Active}}. ets_lookup_dict(Key, Table) -> try ets:lookup_element(Table, Key, 2) of diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index a72d383365..fa78670883 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -52,7 +52,7 @@ scc_spawn/2, sccs_to_pids_reply/0, sccs_to_pids_request/2, - start/1, + start/2, receive_not_fixpoint/0 ]). @@ -64,8 +64,10 @@ -type coordinator() :: pid(). -type map() :: dict(). -type scc() :: [mfa_or_funlbl()]. +-type mode() :: typesig | dataflow. -record(state, {parent :: pid(), + mode :: mode(), spawn_count = 0 :: integer(), all_spawned = false :: boolean(), scc_to_pid = new_map() :: map(), @@ -77,10 +79,10 @@ %%-------------------------------------------------------------------- --spec start(dialyzer_typesig:servers()) -> pid(). +-spec start(mode(), dialyzer_typesig:servers()) -> pid(). -start(Servers) -> - {ok, Pid} = gen_server:start(?MODULE, {self(), Servers}, []), +start(Mode, Servers) -> + {ok, Pid} = gen_server:start(?MODULE, {self(), Mode, Servers}, []), Pid. -spec scc_spawn(scc(), coordinator()) -> ok. @@ -123,10 +125,10 @@ receive_not_fixpoint() -> %%-------------------------------------------------------------------- --spec init([]) -> {ok, #state{}}. +-spec init({pid(), mode(), dialyzer_succ_typings:servers()}) -> {ok, #state{}}. -init({Parent, Servers}) -> - {ok, #state{parent = Parent, servers = Servers}}. +init({Parent, Mode, Servers}) -> + {ok, #state{parent = Parent, mode = Mode, servers = Servers}}. -spec handle_call(Query::term(), From::term(), #state{}) -> {reply, Reply::term(), #state{}}. @@ -175,11 +177,12 @@ handle_cast({sccs_to_pids, SCCs, Worker}, scc_to_pids_request_handle(Worker, SCCs, SCCtoPID), {noreply, State}; handle_cast({scc_spawn, SCC}, - #state{servers = Servers, + #state{mode = Mode, + servers = Servers, spawn_count = SpawnCount, scc_to_pid = SCCtoPID } = State) -> - Pid = dialyzer_worker:launch(SCC, Servers), + Pid = dialyzer_worker:launch(Mode, SCC, Servers), {noreply, State#state{spawn_count = SpawnCount + 1, scc_to_pid = store_map(SCC, Pid, SCCtoPID)} diff --git a/lib/dialyzer/src/dialyzer_succ_typings.erl b/lib/dialyzer/src/dialyzer_succ_typings.erl index 237adb36ea..90d851bc3c 100644 --- a/lib/dialyzer/src/dialyzer_succ_typings.erl +++ b/lib/dialyzer/src/dialyzer_succ_typings.erl @@ -31,7 +31,9 @@ analyze_callgraph/4, get_warnings/6, find_succ_types_for_scc/1, + refine_one_module/1, collect_scc_data/2, + collect_refine_scc_data/2, find_required_by/2, find_depends_on/2 ]). @@ -98,9 +100,10 @@ get_refined_success_typings(SCCs, State) -> {not_fixpoint, NotFixpoint1, State1} -> Callgraph = State1#st.callgraph, NotFixpoint2 = [lookup_name(F, Callgraph) || F <- NotFixpoint1], - ModulePostorder = + {ModulePostorder, ModCallgraph} = dialyzer_callgraph:module_postorder_from_funs(NotFixpoint2, Callgraph), - case refine_succ_typings(ModulePostorder, State1) of + ModState = State1#st{callgraph = ModCallgraph}, + case refine_succ_typings(ModulePostorder, ModState) of {fixpoint, State2} -> State2; {not_fixpoint, NotFixpoint3, State2} -> @@ -187,23 +190,27 @@ postprocess_dataflow_warns([{?WARN_CONTRACT_RANGE, {CallF, CallL}, Msg}|Rest], postprocess_dataflow_warns([W|Rest], State, Wacc, Acc) -> postprocess_dataflow_warns(Rest, State, Wacc, [W|Acc]). -refine_succ_typings(ModulePostorder, State) -> +refine_succ_typings(ModulePostorder, #st{codeserver = Codeserver, + callgraph = Callgraph, + plt = Plt} = State) -> ?debug("Module postorder: ~p\n", [ModulePostorder]), - refine_succ_typings(ModulePostorder, State, []). + Servers = {Codeserver, Callgraph, Plt}, + Coordinator = dialyzer_coordinator:start(dataflow, Servers), + refine_succ_typings(ModulePostorder, State, Coordinator). -refine_succ_typings([M|Rest], State, Fixpoint) -> - #st{callgraph = Callgraph, codeserver = CodeServer, plt = PLT} = State, +refine_succ_typings([M|Rest], State, Coordinator) -> Msg = io_lib:format("Dataflow of module: ~w\n", [M]), send_log(State#st.parent, Msg), ?debug("~s\n", [Msg]), - Data = collect_refine_scc_data(M, {CodeServer, Callgraph, PLT}), - FixpointFromScc = refine_one_module(Data), - NewFixpoint = ordsets:union(Fixpoint, FixpointFromScc), - refine_succ_typings(Rest, State, NewFixpoint); -refine_succ_typings([], State, Fixpoint) -> - case Fixpoint =:= [] of + dialyzer_coordinator:scc_spawn(M, Coordinator), + refine_succ_typings(Rest, State, Coordinator); +refine_succ_typings([], State, Coordinator) -> + dialyzer_coordinator:all_spawned(Coordinator), + NotFixpoint = dialyzer_coordinator:receive_not_fixpoint(), + ?debug("==================== Dataflow done ====================\n\n", []), + case NotFixpoint =:= [] of true -> {fixpoint, State}; - false -> {not_fixpoint, Fixpoint, State} + false -> {not_fixpoint, NotFixpoint, State} end. -type servers() :: term(). @@ -303,7 +310,7 @@ compare_types_1([], [], _Strict, NotFixpoint) -> find_succ_typings(SCCs, #st{codeserver = Codeserver, callgraph = Callgraph, plt = Plt} = State) -> Servers = {Codeserver, dialyzer_callgraph:mini_callgraph(Callgraph), Plt}, - Coordinator = dialyzer_coordinator:start(Servers), + Coordinator = dialyzer_coordinator:start(typesig, Servers), find_succ_typings(SCCs, State, Coordinator). find_succ_typings([SCC|Rest], #st{parent = Parent} = State, Coordinator) -> diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl index 0bfdcf6bdb..a2d30c27d3 100644 --- a/lib/dialyzer/src/dialyzer_worker.erl +++ b/lib/dialyzer/src/dialyzer_worker.erl @@ -20,11 +20,12 @@ -module(dialyzer_worker). --export([launch/2]). +-export([launch/3]). -type worker() :: pid(). -record(state, { + mode :: dialyzer_coordinator:mode(), scc = [] :: mfa_or_funlbl(), depends_on = [] :: list(), coordinator :: dialyzer_coordinator:coordinator(), @@ -44,10 +45,12 @@ %%-------------------------------------------------------------------- --spec launch([mfa_or_funlbl()], dialyzer_typesig:servers()) -> worker(). +-spec launch(dialyzer_coordinator:mode(), [mfa_or_funlbl()], + dialyzer_typesig:servers()) -> worker(). -launch(SCC, Servers) -> - State = #state{scc = SCC, +launch(Mode, SCC, Servers) -> + State = #state{mode = Mode, + scc = SCC, servers = Servers, coordinator = self()}, spawn(fun() -> loop(initializing, State) end). @@ -81,13 +84,13 @@ loop(waiting, State) -> loop(updating, NewState); loop(getting_data, State) -> ?debug("Data: ~p\n",[State#state.scc]), - loop(updating, get_typesig_data(State)); + loop(updating, get_data(State)); loop(running, State) -> ?debug("Run: ~p\n",[State#state.scc]), ok = ask_coordinator_for_callers(State), - NotFixpoint = find_succ_typings(State), + NotFixpoint = do_work(State), Callers = get_callers_reply_from_coordinator(), - ok = broadcast_own_succ_typings(State, Callers), + ok = broadcast_done(State, Callers), report_to_coordinator(NotFixpoint, State). waits_more_success_typings(#state{depends_on = Depends}) -> @@ -103,8 +106,13 @@ has_data(#state{scc_data = Data}) -> _ -> true end. -get_typesig_data(#state{scc = SCC, servers = Servers} = State) -> - State#state{scc_data = dialyzer_succ_typings:collect_scc_data(SCC, Servers)}. +get_data(#state{mode = Mode, scc = SCC, servers = Servers} = State) -> + Data = + case Mode of + typesig -> dialyzer_succ_typings:collect_scc_data(SCC, Servers); + dataflow -> dialyzer_succ_typings:collect_refine_scc_data(SCC, Servers) + end, + State#state{scc_data = Data}. ask_coordinator_for_callers(#state{scc = SCC, servers = Servers, @@ -117,7 +125,7 @@ ask_coordinator_for_callers(#state{scc = SCC, get_callers_reply_from_coordinator() -> dialyzer_coordinator:sccs_to_pids_reply(). -broadcast_own_succ_typings(#state{scc = SCC}, Callers) -> +broadcast_done(#state{scc = SCC}, Callers) -> ?debug("Sending ~p: ~p\n",[SCC, Callers]), SendSTFun = fun(PID) -> PID ! {done, SCC} end, lists:foreach(SendSTFun, Callers). @@ -133,8 +141,11 @@ wait_for_success_typings(#state{depends_on = DependsOn} = State) -> State end. -find_succ_typings(#state{scc_data = SCCData}) -> - dialyzer_succ_typings:find_succ_types_for_scc(SCCData). +do_work(#state{mode = Mode, scc_data = SCCData}) -> + case Mode of + typesig -> dialyzer_succ_typings:find_succ_types_for_scc(SCCData); + dataflow -> dialyzer_succ_typings:refine_one_module(SCCData) + end. report_to_coordinator(NotFixpoint, #state{scc = SCC, coordinator = Coordinator}) -> -- cgit v1.2.3