diff options
author | Stavros Aronis <[email protected]> | 2012-02-28 22:49:04 +0100 |
---|---|---|
committer | Henrik Nord <[email protected]> | 2012-05-21 15:31:21 +0200 |
commit | 1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a (patch) | |
tree | 7f30f58e3e4ca2befda694a04db8e55966cf5786 /lib/dialyzer/src | |
parent | 913ee73601e3f7d0b27d833bd67cf6ee3868018a (diff) | |
download | otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.tar.gz otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.tar.bz2 otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.zip |
Coordinator is no longer a separate process
Diffstat (limited to 'lib/dialyzer/src')
-rw-r--r-- | lib/dialyzer/src/dialyzer_analysis_callgraph.erl | 91 | ||||
-rw-r--r-- | lib/dialyzer/src/dialyzer_coordinator.erl | 459 | ||||
-rw-r--r-- | lib/dialyzer/src/dialyzer_succ_typings.erl | 76 | ||||
-rw-r--r-- | lib/dialyzer/src/dialyzer_worker.erl | 33 |
4 files changed, 258 insertions, 401 deletions
diff --git a/lib/dialyzer/src/dialyzer_analysis_callgraph.erl b/lib/dialyzer/src/dialyzer_analysis_callgraph.erl index 02108d6e38..e2e65d2e25 100644 --- a/lib/dialyzer/src/dialyzer_analysis_callgraph.erl +++ b/lib/dialyzer/src/dialyzer_analysis_callgraph.erl @@ -30,14 +30,14 @@ -export([start/3]). --export([compile_coordinator_init/0, - add_to_result/3, +-export([compile_init_result/0, + add_to_result/4, start_compilation/2, continue_compilation/2]). --export_type([compilation_data/0, - result/0, - servers/0]). +-export_type([compile_mid_data/0, + compile_result/0, + compile_init_data/0]). -include("dialyzer.hrl"). @@ -210,28 +210,37 @@ analyze_callgraph(Callgraph, State) -> %% Build the callgraph and fill the codeserver. %%-------------------------------------------------------------------- +-record(compile_init,{ + callgraph :: dialyzer_callgraph:callgraph(), + codeserver :: dialyzer_codeserver:codeserver(), + defines = [] :: [dial_define()], + include_dirs = [] :: [file:filename()], + start_from = byte_code :: start_from(), + use_contracts = true :: boolean() + }). + +make_compile_init(#analysis_state{codeserver = Codeserver, + defines = Defs, + include_dirs = Dirs, + use_contracts = UseContracts, + start_from = StartFrom}, Callgraph) -> + #compile_init{callgraph = Callgraph, + codeserver = Codeserver, + defines = [{d, Macro, Val} || {Macro, Val} <- Defs], + include_dirs = [{i, D} || D <- Dirs], + use_contracts = UseContracts, + start_from = StartFrom}. + compile_and_store(Files, #analysis_state{codeserver = CServer, - defines = Defs, - include_dirs = Dirs, - parent = Parent, - use_contracts = UseContracts, - start_from = StartFrom - } = State) -> + parent = Parent} = State) -> send_log(Parent, "Reading files and computing callgraph... "), {T1, _} = statistics(runtime), - Includes = [{i, D} || D <- Dirs], - Defines = [{d, Macro, Val} || {Macro, Val} <- Defs], Callgraph = dialyzer_callgraph:new(), - Servers = {Callgraph, CServer, StartFrom, Includes, Defines, UseContracts}, - Coordinator = dialyzer_coordinator:start(compile, Servers), - Spawner = fun(F) -> dialyzer_coordinator:compiler_spawn(F, Coordinator) end, - lists:foreach(Spawner, Files), - dialyzer_coordinator:all_spawned(Coordinator), - {{V, E, Failed, NoWarn, Modules}, NextLabel} = - ?timing("compile", _C1, dialyzer_coordinator:receive_compilation_data()), + CompileInit = make_compile_init(State, Callgraph), + {{Failed, NoWarn, Modules}, NextLabel} = + ?timing("compile", + dialyzer_coordinator:parallel_job(compile, Files, CompileInit)), CServer2 = dialyzer_codeserver:set_next_core_label(NextLabel, CServer), - Callgraph = - ?timing("digraph", _C2, dialyzer_callgraph:add_edges(E, V, Callgraph)), case Failed =:= [] of true -> NewFiles = lists:zip(lists:reverse(Modules), Files), @@ -255,33 +264,39 @@ compile_and_store(Files, #analysis_state{codeserver = CServer, send_log(Parent, Msg2), {Callgraph, sets:from_list(NoWarn), CServer2}. --type servers() :: term(). %%opaque --type result() :: term(). %%opaque --type file_result() :: term(). %%opaque --type compilation_data() :: term(). %%opaque +-type compile_init_data() :: #compile_init{}. +-type compile_result() :: {list(), list(), list()}. %%opaque +-type one_file_result() :: term(). %%opaque +-type compile_mid_data() :: term(). %%opaque --spec compile_coordinator_init() -> result(). +-spec compile_init_result() -> compile_result(). -compile_coordinator_init() -> {[], [], [], [], []}. +compile_init_result() -> {[], [], []}. --spec add_to_result(file:filename(), file_result(), result()) -> result(). +-spec add_to_result(file:filename(), one_file_result(), compile_result(), + compile_init_data()) -> compile_result(). -add_to_result(File, NewData, {V, E, Failed, NoWarn, Mods}) -> +add_to_result(File, NewData, {Failed, NoWarn, Mods}, InitData) -> case NewData of {error, Reason} -> {[{File, Reason}|Failed], NoWarn, Mods}; - {ok, NV, NE, NewNoWarn, Mod} -> - {NV ++ V, NE ++ E, Failed, NewNoWarn ++ NoWarn, [Mod|Mods]} + {ok, V, E, NewNoWarn, Mod} -> + Callgraph = InitData#compile_init.callgraph, + dialyzer_callgraph:add_edges(E, V, Callgraph), + {Failed, NewNoWarn ++ NoWarn, [Mod|Mods]} end. --spec start_compilation(file:filename(), servers()) -> - {error, term()} |{ok, integer(), compilation_data()}. +-spec start_compilation(file:filename(), compile_init_data()) -> + {error, term()} |{ok, integer(), compile_mid_data()}. -start_compilation(File, {Callgraph, Codeserver, StartFrom, - Includes, Defines, UseContracts}) -> +start_compilation(File, + #compile_init{callgraph = Callgraph, codeserver = Codeserver, + defines = Defines, include_dirs = IncludeD, + use_contracts = UseContracts, + start_from = StartFrom}) -> case StartFrom of src_code -> - compile_src(File, Includes, Defines, Callgraph, Codeserver, UseContracts); + compile_src(File, IncludeD, Defines, Callgraph, Codeserver, UseContracts); byte_code -> compile_byte(File, Callgraph, Codeserver, UseContracts) end. @@ -379,7 +394,7 @@ store_core(Mod, Core, NoWarn, Callgraph, CServer) -> CoreTree = cerl:from_records(Core), {ok, cerl_trees:size(CoreTree), {Mod, CoreTree, NoWarn, Callgraph, CServer}}. --spec continue_compilation(integer(), compilation_data()) -> file_result(). +-spec continue_compilation(integer(), compile_mid_data()) -> one_file_result(). continue_compilation(NextLabel, {Mod, CoreTree, NoWarn, Callgraph, CServer}) -> {LabeledTree, _NewNextLabel} = cerl_trees:label(CoreTree, NextLabel), diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index c417b45717..490cfb8d49 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -21,61 +21,23 @@ %%%------------------------------------------------------------------- %%% File : dialyzer_coordinator.erl %%% Authors : Stavros Aronis <[email protected]> -%%% -%%% Description: -%%% -%%% The parallel version of Dialyzer's typesig analysis is spread over 4 modules -%%% with the intention to both minimize the changes on the original code and use -%%% a separate module for every kind of Erlang process that will be running. -%%% -%%% There are therefore 3 kinds of processes: -%%% -%%% - The original Dialyzer backend (in succ_typings module) -%%% - The worker process for the typesig analysis (in typesig and -%%% worker) -%%% - A coordinator of the worker processes (in coordinator) -%%% -%%% Operation guidelines: -%%% -%%% - The backend requests from the coordinator to spawn a worker for each SCC -%%% - The backend notifies the coordinator when all SCC have been spawned and -%%% waits for the server to report that the PLT has been updated -%%% - Each worker is responsible to notify all those who wait for it. -%%% %%%------------------------------------------------------------------- -module(dialyzer_coordinator). -%%% Exports for all possible uses of coordinator from main process --export([start/2, all_spawned/1]). +%%% Export for dialyzer main process +-export([parallel_job/3]). %%% Exports for all possible workers -export([wait_activation/0, job_done/3]). -%%% Export for the typesig, dataflow and warnings main process --export([scc_spawn/2]). - -%%% Export for the typesig and dataflow analysis main process --export([receive_not_fixpoint/0]). - -%%% Export for warning main process --export([receive_warnings/0]). - %%% Exports for the typesig and dataflow analysis workers --export([request_activation/1, sccs_to_pids/1]). - -%%% Exports for the compilation main process --export([compiler_spawn/2, receive_compilation_data/0]). +-export([sccs_to_pids/1]). %%% Exports for the compilation workers -export([get_next_label/2, compilation_done/3]). --export_type([coordinator/0, mode/0]). - --behaviour(gen_server). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export_type([coordinator/0, mode/0, init_data/0]). %%-------------------------------------------------------------------- @@ -83,41 +45,169 @@ -type coordinator() :: pid(). %%opaque --type scc() :: [mfa_or_funlbl()] | module(). +-type scc() :: [mfa_or_funlbl()]. -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. --type servers() :: dialyzer_succ_typings:servers() | - dialyzer_analysis_callgraph:servers() | - dialyzer_succ_typings:warning_servers(). - --record(state, {parent :: pid(), - mode :: mode(), - spawn_count = 0 :: integer(), - all_spawned = false :: boolean(), - next_label :: integer(), - result :: [mfa_or_funlbl()] | - dialyzer_analysis_callgraph:result() | - [dial_warning()], - init_job_data :: servers(), - tickets :: integer(), - queue :: queue() + +-type compile_jobs() :: [file:filename()]. +-type typesig_jobs() :: [scc()]. +-type dataflow_jobs() :: [module()]. +-type warnings_jobs() :: [module()]. + +-type compile_init_data() :: dialyzer_analysis_callgraph:compile_init_data(). +-type typesig_init_data() :: dialyzer_succ_typings:typesig_init_data(). +-type dataflow_init_data() :: dialyzer_succ_typings:dataflow_init_data(). +-type warnings_init_data() :: dialyzer_succ_typings:warnings_init_data(). + +-type compile_result() :: dialyzer_analysis_callgraph:compile_result(). +-type typesig_result() :: [mfa_or_funlbl()]. +-type dataflow_result() :: [mfa_or_funlbl()]. +-type warnings_result() :: [dial_warning()]. + +-type init_data() :: compile_init_data() | typesig_init_data() | + dataflow_init_data() | warnings_init_data(). + +-type result() :: compile_result() | typesig_result() | + dataflow_result() | warnings_result(). + +-record(state, {active = 0 :: integer(), + result :: result(), + next_label = 0 :: integer(), + tickets = 0 :: integer(), + queue = queue:new() :: queue(), + init_data :: init_data() }). -include("dialyzer.hrl"). %%-------------------------------------------------------------------- --spec start(mode(), servers()) -> coordinator(). +-spec parallel_job('compile', compile_jobs(), compile_init_data()) -> + {compile_result(), integer()}; + ('typesig', typesig_jobs(), typesig_init_data()) -> + typesig_result(); + ('dataflow', dataflow_jobs(), dataflow_init_data()) -> + dataflow_result(); + ('warnings', warnings_jobs(), warnings_init_data()) -> + warnings_result(). -start(Mode, Servers) -> - {ok, Pid} = gen_server:start_link(?MODULE, {self(), Mode, Servers}, []), - Pid. +parallel_job(Mode, Jobs, InitData) -> + State = spawn_jobs(Mode, Jobs, InitData), + collect_result(Mode, State). --spec scc_spawn(scc() | module(), coordinator()) -> ok. +spawn_jobs(Mode, Jobs, InitData) when + Mode =:= 'typesig'; Mode =:= 'dataflow' -> + Coordinator = self(), + ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), + Fold = + fun(Job, Count) -> + Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), + true = ets:insert(?MAP, {Job, Pid}), + Count + 1 + end, + JobCount = lists:foldl(Fold, 0, Jobs), + #state{active = JobCount, result = [], init_data = InitData}; +spawn_jobs(Mode, Jobs, InitData) when + Mode =:= 'compile'; Mode =:= 'warnings' -> + Coordinator = self(), + Fold = + fun(Job, {InTickets, InQueue, Count}) -> + Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), + NewCount = Count + 1, + case InTickets of + 0 -> {InTickets, queue:in(Pid, InQueue), NewCount}; + N -> activate_pid(Pid), {N-1, InQueue, NewCount} + end + end, + CPUs = erlang:system_info(logical_processors_available), + InitTickets = 4*CPUs, + {Tickets, Queue, JobCount} = + lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs), + InitResult = + case Mode of + 'warnings' -> []; + 'compile' -> dialyzer_analysis_callgraph:compile_init_result() + end, + #state{active = JobCount, result = InitResult, next_label = 0, + tickets = Tickets, queue = Queue, init_data = InitData}. + +collect_result(Mode, State) -> + case Mode of + 'compile' -> compile_loop(State); + 'typesig' -> not_fixpoint_loop(State); + 'dataflow' -> not_fixpoint_loop(State); + 'warnings' -> warnings_loop(State) + end. -scc_spawn(SCC, Coordinator) -> - cast({scc_spawn, SCC}, Coordinator). +compile_loop(#state{active = Active, result = Result, + next_label = NextLabel, tickets = Tickets, + queue = Queue, init_data = InitData} = State) -> + receive + {next_label_request, Estimation, Pid} -> + Pid ! {next_label_reply, NextLabel}, + compile_loop(State#state{next_label = NextLabel + Estimation}); + {done, Job, Data} -> + NewResult = + dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData), + case Active of + 1 -> + {NewResult, NextLabel}; + _ -> + NewActive = Active - 1, + {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), + NewState = + State#state{result = NewResult, active = NewActive, + queue = NewQueue, tickets = NewTickets}, + compile_loop(NewState) + end + end. --spec sccs_to_pids([scc()]) -> {[dialyzer_worker:worker()], [scc()]}. +not_fixpoint_loop(#state{active = Active, result = Result, + init_data = InitData} = State) -> + receive + {done, _Job, Data} -> + FinalData = dialyzer_succ_typings:lookup_names(Data, InitData), + NewResult = FinalData ++ Result, + case Active of + 1 -> + ets:delete(?MAP), + NewResult; + _ -> + NewActive = Active - 1, + NewState = State#state{active = NewActive, result = NewResult}, + not_fixpoint_loop(NewState) + end + end. + +warnings_loop(#state{active = Active, result = Result, tickets = Tickets, + queue = Queue} = State) -> + receive + {done, _Job, Data} -> + NewResult = Data ++ Result, + case Active of + 1 -> NewResult; + _ -> + NewActive = Active - 1, + {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), + NewState = + State#state{result = NewResult, active = NewActive, + queue = NewQueue, tickets = NewTickets}, + warnings_loop(NewState) + end + end. + +manage_waiting(Queue, Tickets) -> + {Waiting, NewQueue} = queue:out(Queue), + NewTickets = + case Waiting of + empty -> Tickets + 1; + {value, Pid} -> + activate_pid(Pid), + Tickets + end, + {NewQueue, NewTickets}. + +-spec sccs_to_pids([scc() | module()]) -> + {[dialyzer_worker:worker()], [scc() | module()]}. sccs_to_pids(SCCs) -> lists:foldl(fun pid_partition/2, {[], []}, SCCs). @@ -129,70 +219,27 @@ pid_partition(SCC, {Pids, Unknown}) -> _:_ -> {Pids, [SCC|Unknown]} end. --spec job_done(scc() | file:filename(), term(), coordinator()) -> ok. +-spec job_done(scc() | module() | file:filename(), term(), coordinator()) -> ok. job_done(Job, Result, Coordinator) -> - cast({done, Job, Result}, Coordinator). + Coordinator ! {done, Job, Result}, + ok. -spec compilation_done(file:filename(), - dialyzer_analysis_callgraph:compilation_data(), + dialyzer_analysis_callgraph:compile_result(), coordinator()) -> ok. compilation_done(Filename, CompilationData, Coordinator) -> - cast({done, Filename, CompilationData}, Coordinator). - --spec all_spawned(coordinator()) -> ok. - -all_spawned(Coordinator) -> - cast(all_spawned, Coordinator). - -send_done_to_parent(#state{mode = Mode, - parent = Parent, - result = Result, - next_label = NextLabel}) -> - Msg = - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - ets:delete(?MAP), - {not_fixpoint, Result}; - 'compile' -> {compilation_data, Result, NextLabel}; - 'warnings' -> {warnings, Result} - end, - Parent ! Msg, + Coordinator ! {done, Filename, CompilationData}, ok. --spec receive_not_fixpoint() -> [mfa_or_funlbl()]. - -receive_not_fixpoint() -> - receive {not_fixpoint, NotFixpoint} -> NotFixpoint end. - --spec receive_compilation_data() -> - {dialyzer_analysis_callgraph:result(), integer()}. - -receive_compilation_data() -> - receive {compilation_data, CompilationData, NextLabel} -> - {CompilationData, NextLabel} - end. - --spec receive_warnings() -> [dial_warning()]. - -receive_warnings() -> - receive {warnings, Warnings} -> Warnings end. - --spec compiler_spawn(file:filename(), coordinator()) -> ok. - -compiler_spawn(Filename, Coordinator) -> - cast({compiler_spawn, Filename}, Coordinator). - -spec get_next_label(integer(), coordinator()) -> integer(). get_next_label(EstimatedSize, Coordinator) -> - call({get_next_label, EstimatedSize}, Coordinator). - --spec request_activation(coordinator()) -> ok. - -request_activation(Coordinator) -> - cast({request_activation, self()}, Coordinator). + Coordinator ! {next_label_request, EstimatedSize, self()}, + receive + {next_label_reply, NextLabel} -> NextLabel + end. -spec wait_activation() -> ok. @@ -201,179 +248,3 @@ wait_activation() -> activate_pid(Pid) -> Pid ! activate. - -%%-------------------------------------------------------------------- - --spec init({pid(), mode(), servers()}) -> {ok, #state{}}. - -init({Parent, Mode, InitJobData}) -> - BaseTickets = erlang:system_info(logical_processors_available), - Tickets = - case Mode of - 'compile' -> 4*BaseTickets; - 'warnings' -> 4*BaseTickets; - _ -> non_regulated - end, - InitState = - #state{parent = Parent, mode = Mode, init_job_data = InitJobData, - tickets = Tickets, queue = queue:new()}, - State = - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), - InitState#state{result = []}; - 'warnings' -> - InitState#state{result = []}; - 'compile' -> - InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(), - InitState#state{result = InitResult, next_label = 0} - end, - {ok, State}. - --spec handle_call(Query::term(), From::term(), #state{}) -> - {reply, Reply::term(), #state{}}. - -handle_call({get_next_label, EstimatedSize}, _From, - #state{next_label = NextLabel} = State) -> - {reply, NextLabel, State#state{next_label = NextLabel + EstimatedSize}}. - --spec handle_cast(Msg::term(), #state{}) -> - {noreply, #state{}} | {stop, normal, #state{}}. - -handle_cast({done, _Job, NewData}, - #state{mode = Mode, result = OldResult, - init_job_data = Servers} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), - UpdatedState = State#state{result = FinalData ++ OldResult}, - reduce_or_stop(UpdatedState); -handle_cast({done, Job, NewData}, - #state{mode = Mode, - result = OldResult, - tickets = Tickets, - queue = Queue} = State) when - Mode =:= 'compile'; Mode =:= 'warnings' -> - {Waiting, NewQueue} = queue:out(Queue), - NewTickets = - case Waiting of - empty -> Tickets+1; - {value, Pid} -> - activate_pid(Pid), - Tickets - end, - NewResult = - case Mode of - 'compile' -> - dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); - 'warnings' -> - NewData ++ OldResult - end, - UpdatedState = - State#state{result = NewResult, tickets = NewTickets, queue = NewQueue}, - reduce_or_stop(UpdatedState); -handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> - case SpawnCount of - 0 -> - send_done_to_parent(State), - {stop, normal, State}; - _ -> - NewState = State#state{all_spawned = true}, - {noreply, NewState} - end; -handle_cast({request_activation, Pid}, - #state{tickets = Tickets, queue = Queue} = State) -> - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{tickets = NewTickets, queue = NewQueue}}; -handle_cast({scc_spawn, SCC}, - #state{mode = Mode, - init_job_data = Servers, - spawn_count = SpawnCount} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), - true = ets:insert(?MAP, {SCC, Pid}), - {noreply, State#state{spawn_count = SpawnCount + 1}}; -handle_cast({scc_spawn, SCC}, - #state{mode = 'warnings', - init_job_data = Servers, - spawn_count = SpawnCount, - tickets = Tickets, - queue = Queue} = State) -> - Pid = dialyzer_worker:launch('warnings', SCC, Servers, self()), - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{spawn_count = SpawnCount + 1, - tickets = NewTickets, - queue = NewQueue}}; -handle_cast({compiler_spawn, Filename}, - #state{mode = Mode, - init_job_data = Servers, - spawn_count = SpawnCount, - tickets = Tickets, - queue = Queue - } = State) -> - Pid = dialyzer_worker:launch(Mode, Filename, Servers, self()), - NewSpawnCount = SpawnCount + 1, - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{spawn_count = NewSpawnCount, - tickets = NewTickets, - queue = NewQueue}}. - --spec handle_info(term(), #state{}) -> {noreply, #state{}}. - -handle_info(_Info, State) -> - {noreply, State}. - --spec terminate(term(), #state{}) -> ok. - -terminate(_Reason, _State) -> - ok. - --spec code_change(term(), #state{}, term()) -> {ok, #state{}}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- - -cast(Message, Coordinator) -> - gen_server:cast(Coordinator, Message). - -call(Message, Coordinator) -> - gen_server:call(Coordinator, Message, infinity). - -reduce_or_stop(#state{all_spawned = AllSpawned, - spawn_count = SpawnCount} = State) -> - Action = - case AllSpawned of - false -> reduce; - true -> - case SpawnCount of - 1 -> finish; - _ -> reduce - end - end, - case Action of - reduce -> - NewState = State#state{spawn_count = SpawnCount - 1}, - {noreply, NewState}; - finish -> - send_done_to_parent(State), - {stop, normal, State} - end. diff --git a/lib/dialyzer/src/dialyzer_succ_typings.erl b/lib/dialyzer/src/dialyzer_succ_typings.erl index ff4102f5a3..b980e43ecc 100644 --- a/lib/dialyzer/src/dialyzer_succ_typings.erl +++ b/lib/dialyzer/src/dialyzer_succ_typings.erl @@ -41,7 +41,7 @@ lookup_names/2 ]). --export_type([servers/0, warning_servers/0]). +-export_type([typesig_init_data/0, dataflow_init_data/0, warnings_init_data/0]). %%-define(DEBUG, true). @@ -61,7 +61,12 @@ %% State record -- local to this module -type parent() :: 'none' | pid(). --type servers() :: term(). %%opaque +-type typesig_init_data() :: term(). +-type dataflow_init_data() :: term(). +-type warnings_init_data() :: term(). + +-type fixpoint_init_data() :: typesig_init_data() | dataflow_init_data(). + -type scc() :: [mfa_or_funlbl()] | [module()]. @@ -146,12 +151,8 @@ get_warnings(Callgraph, Plt, DocPlt, Codeserver, NoWarnUnused, Parent) -> get_warnings_from_modules(Mods, State, DocPlt) -> #st{callgraph = Callgraph, codeserver = Codeserver, no_warn_unused = NoWarnUnused, plt = Plt} = State, - Servers = {Callgraph, Codeserver, NoWarnUnused, Plt, DocPlt}, - Coordinator = dialyzer_coordinator:start('warnings', Servers), - Spawner = fun(M) -> dialyzer_coordinator:scc_spawn(M, Coordinator) end, - lists:foreach(Spawner, Mods), - dialyzer_coordinator:all_spawned(Coordinator), - dialyzer_coordinator:receive_warnings(). + Init = {Callgraph, Codeserver, NoWarnUnused, Plt, DocPlt}, + dialyzer_coordinator:parallel_job(warnings, Mods, Init). -type warning_servers() :: term(). @@ -210,45 +211,36 @@ postprocess_dataflow_warns([{?WARN_CONTRACT_RANGE, {CallF, CallL}, Msg}|Rest], postprocess_dataflow_warns(Rest, Codeserver, WAcc, [W|Acc]) end. -refine_succ_typings(ModulePostorder, #st{codeserver = Codeserver, - callgraph = Callgraph, - plt = Plt} = State) -> +refine_succ_typings(Modules, #st{codeserver = Codeserver, + callgraph = Callgraph, + plt = Plt} = State) -> ?debug("Module postorder: ~p\n", [ModulePostorder]), - Servers = {Codeserver, Callgraph, Plt}, - Coordinator = dialyzer_coordinator:start(dataflow, Servers), - ?timing("refine",refine_succ_typings(ModulePostorder, State, Coordinator)). - -refine_succ_typings([M|Rest], State, Coordinator) -> - Msg = io_lib:format("Dataflow of module: ~w\n", [M]), - send_log(State#st.parent, Msg), - ?debug("~s\n", [Msg]), - dialyzer_coordinator:scc_spawn(M, Coordinator), - refine_succ_typings(Rest, State, Coordinator); -refine_succ_typings([], State, Coordinator) -> - dialyzer_coordinator:all_spawned(Coordinator), - NotFixpoint = dialyzer_coordinator:receive_not_fixpoint(), + Init = {Codeserver, Callgraph, Plt}, + NotFixpoint = + ?timing("refine", + dialyzer_coordinator:parallel_job(dataflow, Modules, Init)), ?debug("==================== Dataflow done ====================\n\n", []), case NotFixpoint =:= [] of true -> {fixpoint, State}; false -> {not_fixpoint, NotFixpoint, State} end. --spec find_depends_on(scc() | module(), servers()) -> [scc()]. +-spec find_depends_on(scc() | module(), fixpoint_init_data()) -> [scc()]. find_depends_on(SCC, {_Codeserver, Callgraph, _Plt}) -> dialyzer_callgraph:get_depends_on(SCC, Callgraph). --spec find_required_by(scc() | module(), servers()) -> [scc()]. +-spec find_required_by(scc() | module(), fixpoint_init_data()) -> [scc()]. find_required_by(SCC, {_Codeserver, Callgraph, _Plt}) -> dialyzer_callgraph:get_required_by(SCC, Callgraph). --spec lookup_names([label()], servers()) -> [mfa_or_funlbl()]. +-spec lookup_names([label()], fixpoint_init_data()) -> [mfa_or_funlbl()]. lookup_names(Labels, {_Codeserver, Callgraph, _Plt}) -> [lookup_name(F, Callgraph) || F <- Labels]. --spec refine_one_module(module(), servers()) -> [label()]. % ordset +-spec refine_one_module(module(), dataflow_init_data()) -> [label()]. % ordset refine_one_module(M, {CodeServer, Callgraph, Plt}) -> ModCode = dialyzer_codeserver:lookup_mod_code(M, CodeServer), @@ -322,28 +314,17 @@ compare_types_1([], [], _Strict, NotFixpoint) -> find_succ_typings(SCCs, #st{codeserver = Codeserver, callgraph = Callgraph, plt = Plt} = State) -> - Servers = {Codeserver, dialyzer_callgraph:mini_callgraph(Callgraph), Plt}, - Coordinator = dialyzer_coordinator:start(typesig, Servers), - ?timing("spawn", _C1, find_succ_typings(SCCs, State, Coordinator)), - dialyzer_coordinator:all_spawned(Coordinator), + Init = {Codeserver, dialyzer_callgraph:mini_callgraph(Callgraph), Plt}, NotFixpoint = - ?timing("typesig", _C2, dialyzer_coordinator:receive_not_fixpoint()), + ?timing("typesig", + dialyzer_coordinator:parallel_job(typesig, SCCs, Init)), ?debug("==================== Typesig done ====================\n\n", []), case NotFixpoint =:= [] of true -> {fixpoint, State}; false -> {not_fixpoint, NotFixpoint, State} end. -find_succ_typings([SCC|Rest], #st{parent = Parent} = State, Coordinator) -> - Msg = io_lib:format("Typesig analysis for SCC: ~w\n", [format_scc(SCC)]), - ?debug("~s", [Msg]), - send_log(Parent, Msg), - dialyzer_coordinator:scc_spawn(SCC, Coordinator), - find_succ_typings(Rest, State, Coordinator); -find_succ_typings([], _State, _Coordinator) -> - ok. - --spec find_succ_types_for_scc(scc(), servers()) -> [mfa_or_funlbl()]. +-spec find_succ_types_for_scc(scc(), typesig_init_data()) -> [mfa_or_funlbl()]. find_succ_types_for_scc(SCC, {Codeserver, Callgraph, Plt}) -> SCC_Info = [{MFA, @@ -459,12 +440,3 @@ lookup_name(F, CG) -> error -> F; {ok, Name} -> Name end. - -send_log(none, _Msg) -> - ok; -send_log(Parent, Msg) -> - Parent ! {self(), log, lists:flatten(Msg)}, - ok. - -format_scc(SCC) -> - [MFA || {_M, _F, _A} = MFA <- SCC]. diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl index 0ef30cf940..cc4032d154 100644 --- a/lib/dialyzer/src/dialyzer_worker.erl +++ b/lib/dialyzer/src/dialyzer_worker.erl @@ -28,14 +28,13 @@ -type mode() :: dialyzer_coordinator:mode(). -type coordinator() :: dialyzer_coordinator:coordinator(). --type servers() :: dialyzer_succ_typings:servers() | - dialyzer_analysis_callgraph:servers(). +-type init_data() :: dialyzer_coordinator:init_data(). -record(state, { mode :: mode(), job :: mfa_or_funlbl() | file:filename(), coordinator :: coordinator(), - servers :: servers(), + init_data :: init_data(), depends_on = [] :: list() }). @@ -51,12 +50,12 @@ %%-------------------------------------------------------------------- --spec launch(mode(), [mfa_or_funlbl()], servers(), coordinator()) -> worker(). +-spec launch(mode(), [mfa_or_funlbl()], init_data(), coordinator()) -> worker(). -launch(Mode, Job, Servers, Coordinator) -> +launch(Mode, Job, InitData, Coordinator) -> State = #state{mode = Mode, job = Job, - servers = Servers, + init_data = InitData, coordinator = Coordinator}, InitState = case Mode of @@ -75,8 +74,8 @@ loop(updating, State) -> false -> running end, loop(NextStatus, State); -loop(initializing, #state{job = SCC, servers = Servers} = State) -> - DependsOn = dialyzer_succ_typings:find_depends_on(SCC, Servers), +loop(initializing, #state{job = SCC, init_data = InitData} = State) -> + DependsOn = dialyzer_succ_typings:find_depends_on(SCC, InitData), ?debug("Deps ~p: ~p\n",[State#state.job, DependsOn]), loop(updating, State#state{depends_on = DependsOn}); loop(waiting, State) -> @@ -113,8 +112,8 @@ waits_more_success_typings(#state{depends_on = Depends}) -> _ -> true end. -broadcast_done(#state{job = SCC, servers = Servers}) -> - RequiredBy = dialyzer_succ_typings:find_required_by(SCC, Servers), +broadcast_done(#state{job = SCC, init_data = InitData}) -> + RequiredBy = dialyzer_succ_typings:find_required_by(SCC, InitData), {Callers, Unknown} = dialyzer_coordinator:sccs_to_pids(RequiredBy), send_done(Callers, SCC), continue_broadcast_done(Unknown, SCC). @@ -144,18 +143,18 @@ wait_for_success_typings(#state{depends_on = DependsOn} = State) -> State end. -do_work(#state{mode = Mode, job = Job, servers = Servers}) -> +do_work(#state{mode = Mode, job = Job, init_data = InitData}) -> case Mode of - typesig -> dialyzer_succ_typings:find_succ_types_for_scc(Job, Servers); - dataflow -> dialyzer_succ_typings:refine_one_module(Job, Servers) + typesig -> dialyzer_succ_typings:find_succ_types_for_scc(Job, InitData); + dataflow -> dialyzer_succ_typings:refine_one_module(Job, InitData) end. report_to_coordinator(Result, #state{job = Job, coordinator = Coordinator}) -> ?debug("Done: ~p\n",[Job]), dialyzer_coordinator:job_done(Job, Result, Coordinator). -start_compilation(#state{job = Job, servers = Servers}) -> - dialyzer_analysis_callgraph:start_compilation(Job, Servers). +start_compilation(#state{job = Job, init_data = InitData}) -> + dialyzer_analysis_callgraph:start_compilation(Job, InitData). ask_coordinator_for_label(EstimatedSize, #state{coordinator = Coordinator}) -> dialyzer_coordinator:get_next_label(EstimatedSize, Coordinator). @@ -163,5 +162,5 @@ ask_coordinator_for_label(EstimatedSize, #state{coordinator = Coordinator}) -> continue_compilation(Label, Data) -> dialyzer_analysis_callgraph:continue_compilation(Label, Data). -collect_warnings(#state{job = Job, servers = Servers}) -> - dialyzer_succ_typings:collect_warnings(Job, Servers). +collect_warnings(#state{job = Job, init_data = InitData}) -> + dialyzer_succ_typings:collect_warnings(Job, InitData). |