aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorStavros Aronis <[email protected]>2012-02-28 22:49:04 +0100
committerHenrik Nord <[email protected]>2012-05-21 15:31:21 +0200
commit1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a (patch)
tree7f30f58e3e4ca2befda694a04db8e55966cf5786 /lib
parent913ee73601e3f7d0b27d833bd67cf6ee3868018a (diff)
downloadotp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.tar.gz
otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.tar.bz2
otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.zip
Coordinator is no longer a separate process
Diffstat (limited to 'lib')
-rw-r--r--lib/dialyzer/src/dialyzer_analysis_callgraph.erl91
-rw-r--r--lib/dialyzer/src/dialyzer_coordinator.erl459
-rw-r--r--lib/dialyzer/src/dialyzer_succ_typings.erl76
-rw-r--r--lib/dialyzer/src/dialyzer_worker.erl33
4 files changed, 258 insertions, 401 deletions
diff --git a/lib/dialyzer/src/dialyzer_analysis_callgraph.erl b/lib/dialyzer/src/dialyzer_analysis_callgraph.erl
index 02108d6e38..e2e65d2e25 100644
--- a/lib/dialyzer/src/dialyzer_analysis_callgraph.erl
+++ b/lib/dialyzer/src/dialyzer_analysis_callgraph.erl
@@ -30,14 +30,14 @@
-export([start/3]).
--export([compile_coordinator_init/0,
- add_to_result/3,
+-export([compile_init_result/0,
+ add_to_result/4,
start_compilation/2,
continue_compilation/2]).
--export_type([compilation_data/0,
- result/0,
- servers/0]).
+-export_type([compile_mid_data/0,
+ compile_result/0,
+ compile_init_data/0]).
-include("dialyzer.hrl").
@@ -210,28 +210,37 @@ analyze_callgraph(Callgraph, State) ->
%% Build the callgraph and fill the codeserver.
%%--------------------------------------------------------------------
+-record(compile_init,{
+ callgraph :: dialyzer_callgraph:callgraph(),
+ codeserver :: dialyzer_codeserver:codeserver(),
+ defines = [] :: [dial_define()],
+ include_dirs = [] :: [file:filename()],
+ start_from = byte_code :: start_from(),
+ use_contracts = true :: boolean()
+ }).
+
+make_compile_init(#analysis_state{codeserver = Codeserver,
+ defines = Defs,
+ include_dirs = Dirs,
+ use_contracts = UseContracts,
+ start_from = StartFrom}, Callgraph) ->
+ #compile_init{callgraph = Callgraph,
+ codeserver = Codeserver,
+ defines = [{d, Macro, Val} || {Macro, Val} <- Defs],
+ include_dirs = [{i, D} || D <- Dirs],
+ use_contracts = UseContracts,
+ start_from = StartFrom}.
+
compile_and_store(Files, #analysis_state{codeserver = CServer,
- defines = Defs,
- include_dirs = Dirs,
- parent = Parent,
- use_contracts = UseContracts,
- start_from = StartFrom
- } = State) ->
+ parent = Parent} = State) ->
send_log(Parent, "Reading files and computing callgraph... "),
{T1, _} = statistics(runtime),
- Includes = [{i, D} || D <- Dirs],
- Defines = [{d, Macro, Val} || {Macro, Val} <- Defs],
Callgraph = dialyzer_callgraph:new(),
- Servers = {Callgraph, CServer, StartFrom, Includes, Defines, UseContracts},
- Coordinator = dialyzer_coordinator:start(compile, Servers),
- Spawner = fun(F) -> dialyzer_coordinator:compiler_spawn(F, Coordinator) end,
- lists:foreach(Spawner, Files),
- dialyzer_coordinator:all_spawned(Coordinator),
- {{V, E, Failed, NoWarn, Modules}, NextLabel} =
- ?timing("compile", _C1, dialyzer_coordinator:receive_compilation_data()),
+ CompileInit = make_compile_init(State, Callgraph),
+ {{Failed, NoWarn, Modules}, NextLabel} =
+ ?timing("compile",
+ dialyzer_coordinator:parallel_job(compile, Files, CompileInit)),
CServer2 = dialyzer_codeserver:set_next_core_label(NextLabel, CServer),
- Callgraph =
- ?timing("digraph", _C2, dialyzer_callgraph:add_edges(E, V, Callgraph)),
case Failed =:= [] of
true ->
NewFiles = lists:zip(lists:reverse(Modules), Files),
@@ -255,33 +264,39 @@ compile_and_store(Files, #analysis_state{codeserver = CServer,
send_log(Parent, Msg2),
{Callgraph, sets:from_list(NoWarn), CServer2}.
--type servers() :: term(). %%opaque
--type result() :: term(). %%opaque
--type file_result() :: term(). %%opaque
--type compilation_data() :: term(). %%opaque
+-type compile_init_data() :: #compile_init{}.
+-type compile_result() :: {list(), list(), list()}. %%opaque
+-type one_file_result() :: term(). %%opaque
+-type compile_mid_data() :: term(). %%opaque
--spec compile_coordinator_init() -> result().
+-spec compile_init_result() -> compile_result().
-compile_coordinator_init() -> {[], [], [], [], []}.
+compile_init_result() -> {[], [], []}.
--spec add_to_result(file:filename(), file_result(), result()) -> result().
+-spec add_to_result(file:filename(), one_file_result(), compile_result(),
+ compile_init_data()) -> compile_result().
-add_to_result(File, NewData, {V, E, Failed, NoWarn, Mods}) ->
+add_to_result(File, NewData, {Failed, NoWarn, Mods}, InitData) ->
case NewData of
{error, Reason} ->
{[{File, Reason}|Failed], NoWarn, Mods};
- {ok, NV, NE, NewNoWarn, Mod} ->
- {NV ++ V, NE ++ E, Failed, NewNoWarn ++ NoWarn, [Mod|Mods]}
+ {ok, V, E, NewNoWarn, Mod} ->
+ Callgraph = InitData#compile_init.callgraph,
+ dialyzer_callgraph:add_edges(E, V, Callgraph),
+ {Failed, NewNoWarn ++ NoWarn, [Mod|Mods]}
end.
--spec start_compilation(file:filename(), servers()) ->
- {error, term()} |{ok, integer(), compilation_data()}.
+-spec start_compilation(file:filename(), compile_init_data()) ->
+ {error, term()} |{ok, integer(), compile_mid_data()}.
-start_compilation(File, {Callgraph, Codeserver, StartFrom,
- Includes, Defines, UseContracts}) ->
+start_compilation(File,
+ #compile_init{callgraph = Callgraph, codeserver = Codeserver,
+ defines = Defines, include_dirs = IncludeD,
+ use_contracts = UseContracts,
+ start_from = StartFrom}) ->
case StartFrom of
src_code ->
- compile_src(File, Includes, Defines, Callgraph, Codeserver, UseContracts);
+ compile_src(File, IncludeD, Defines, Callgraph, Codeserver, UseContracts);
byte_code ->
compile_byte(File, Callgraph, Codeserver, UseContracts)
end.
@@ -379,7 +394,7 @@ store_core(Mod, Core, NoWarn, Callgraph, CServer) ->
CoreTree = cerl:from_records(Core),
{ok, cerl_trees:size(CoreTree), {Mod, CoreTree, NoWarn, Callgraph, CServer}}.
--spec continue_compilation(integer(), compilation_data()) -> file_result().
+-spec continue_compilation(integer(), compile_mid_data()) -> one_file_result().
continue_compilation(NextLabel, {Mod, CoreTree, NoWarn, Callgraph, CServer}) ->
{LabeledTree, _NewNextLabel} = cerl_trees:label(CoreTree, NextLabel),
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl
index c417b45717..490cfb8d49 100644
--- a/lib/dialyzer/src/dialyzer_coordinator.erl
+++ b/lib/dialyzer/src/dialyzer_coordinator.erl
@@ -21,61 +21,23 @@
%%%-------------------------------------------------------------------
%%% File : dialyzer_coordinator.erl
%%% Authors : Stavros Aronis <[email protected]>
-%%%
-%%% Description:
-%%%
-%%% The parallel version of Dialyzer's typesig analysis is spread over 4 modules
-%%% with the intention to both minimize the changes on the original code and use
-%%% a separate module for every kind of Erlang process that will be running.
-%%%
-%%% There are therefore 3 kinds of processes:
-%%%
-%%% - The original Dialyzer backend (in succ_typings module)
-%%% - The worker process for the typesig analysis (in typesig and
-%%% worker)
-%%% - A coordinator of the worker processes (in coordinator)
-%%%
-%%% Operation guidelines:
-%%%
-%%% - The backend requests from the coordinator to spawn a worker for each SCC
-%%% - The backend notifies the coordinator when all SCC have been spawned and
-%%% waits for the server to report that the PLT has been updated
-%%% - Each worker is responsible to notify all those who wait for it.
-%%%
%%%-------------------------------------------------------------------
-module(dialyzer_coordinator).
-%%% Exports for all possible uses of coordinator from main process
--export([start/2, all_spawned/1]).
+%%% Export for dialyzer main process
+-export([parallel_job/3]).
%%% Exports for all possible workers
-export([wait_activation/0, job_done/3]).
-%%% Export for the typesig, dataflow and warnings main process
--export([scc_spawn/2]).
-
-%%% Export for the typesig and dataflow analysis main process
--export([receive_not_fixpoint/0]).
-
-%%% Export for warning main process
--export([receive_warnings/0]).
-
%%% Exports for the typesig and dataflow analysis workers
--export([request_activation/1, sccs_to_pids/1]).
-
-%%% Exports for the compilation main process
--export([compiler_spawn/2, receive_compilation_data/0]).
+-export([sccs_to_pids/1]).
%%% Exports for the compilation workers
-export([get_next_label/2, compilation_done/3]).
--export_type([coordinator/0, mode/0]).
-
--behaviour(gen_server).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
+-export_type([coordinator/0, mode/0, init_data/0]).
%%--------------------------------------------------------------------
@@ -83,41 +45,169 @@
-type coordinator() :: pid(). %%opaque
--type scc() :: [mfa_or_funlbl()] | module().
+-type scc() :: [mfa_or_funlbl()].
-type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'.
--type servers() :: dialyzer_succ_typings:servers() |
- dialyzer_analysis_callgraph:servers() |
- dialyzer_succ_typings:warning_servers().
-
--record(state, {parent :: pid(),
- mode :: mode(),
- spawn_count = 0 :: integer(),
- all_spawned = false :: boolean(),
- next_label :: integer(),
- result :: [mfa_or_funlbl()] |
- dialyzer_analysis_callgraph:result() |
- [dial_warning()],
- init_job_data :: servers(),
- tickets :: integer(),
- queue :: queue()
+
+-type compile_jobs() :: [file:filename()].
+-type typesig_jobs() :: [scc()].
+-type dataflow_jobs() :: [module()].
+-type warnings_jobs() :: [module()].
+
+-type compile_init_data() :: dialyzer_analysis_callgraph:compile_init_data().
+-type typesig_init_data() :: dialyzer_succ_typings:typesig_init_data().
+-type dataflow_init_data() :: dialyzer_succ_typings:dataflow_init_data().
+-type warnings_init_data() :: dialyzer_succ_typings:warnings_init_data().
+
+-type compile_result() :: dialyzer_analysis_callgraph:compile_result().
+-type typesig_result() :: [mfa_or_funlbl()].
+-type dataflow_result() :: [mfa_or_funlbl()].
+-type warnings_result() :: [dial_warning()].
+
+-type init_data() :: compile_init_data() | typesig_init_data() |
+ dataflow_init_data() | warnings_init_data().
+
+-type result() :: compile_result() | typesig_result() |
+ dataflow_result() | warnings_result().
+
+-record(state, {active = 0 :: integer(),
+ result :: result(),
+ next_label = 0 :: integer(),
+ tickets = 0 :: integer(),
+ queue = queue:new() :: queue(),
+ init_data :: init_data()
}).
-include("dialyzer.hrl").
%%--------------------------------------------------------------------
--spec start(mode(), servers()) -> coordinator().
+-spec parallel_job('compile', compile_jobs(), compile_init_data()) ->
+ {compile_result(), integer()};
+ ('typesig', typesig_jobs(), typesig_init_data()) ->
+ typesig_result();
+ ('dataflow', dataflow_jobs(), dataflow_init_data()) ->
+ dataflow_result();
+ ('warnings', warnings_jobs(), warnings_init_data()) ->
+ warnings_result().
-start(Mode, Servers) ->
- {ok, Pid} = gen_server:start_link(?MODULE, {self(), Mode, Servers}, []),
- Pid.
+parallel_job(Mode, Jobs, InitData) ->
+ State = spawn_jobs(Mode, Jobs, InitData),
+ collect_result(Mode, State).
--spec scc_spawn(scc() | module(), coordinator()) -> ok.
+spawn_jobs(Mode, Jobs, InitData) when
+ Mode =:= 'typesig'; Mode =:= 'dataflow' ->
+ Coordinator = self(),
+ ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]),
+ Fold =
+ fun(Job, Count) ->
+ Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
+ true = ets:insert(?MAP, {Job, Pid}),
+ Count + 1
+ end,
+ JobCount = lists:foldl(Fold, 0, Jobs),
+ #state{active = JobCount, result = [], init_data = InitData};
+spawn_jobs(Mode, Jobs, InitData) when
+ Mode =:= 'compile'; Mode =:= 'warnings' ->
+ Coordinator = self(),
+ Fold =
+ fun(Job, {InTickets, InQueue, Count}) ->
+ Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
+ NewCount = Count + 1,
+ case InTickets of
+ 0 -> {InTickets, queue:in(Pid, InQueue), NewCount};
+ N -> activate_pid(Pid), {N-1, InQueue, NewCount}
+ end
+ end,
+ CPUs = erlang:system_info(logical_processors_available),
+ InitTickets = 4*CPUs,
+ {Tickets, Queue, JobCount} =
+ lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs),
+ InitResult =
+ case Mode of
+ 'warnings' -> [];
+ 'compile' -> dialyzer_analysis_callgraph:compile_init_result()
+ end,
+ #state{active = JobCount, result = InitResult, next_label = 0,
+ tickets = Tickets, queue = Queue, init_data = InitData}.
+
+collect_result(Mode, State) ->
+ case Mode of
+ 'compile' -> compile_loop(State);
+ 'typesig' -> not_fixpoint_loop(State);
+ 'dataflow' -> not_fixpoint_loop(State);
+ 'warnings' -> warnings_loop(State)
+ end.
-scc_spawn(SCC, Coordinator) ->
- cast({scc_spawn, SCC}, Coordinator).
+compile_loop(#state{active = Active, result = Result,
+ next_label = NextLabel, tickets = Tickets,
+ queue = Queue, init_data = InitData} = State) ->
+ receive
+ {next_label_request, Estimation, Pid} ->
+ Pid ! {next_label_reply, NextLabel},
+ compile_loop(State#state{next_label = NextLabel + Estimation});
+ {done, Job, Data} ->
+ NewResult =
+ dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData),
+ case Active of
+ 1 ->
+ {NewResult, NextLabel};
+ _ ->
+ NewActive = Active - 1,
+ {NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
+ NewState =
+ State#state{result = NewResult, active = NewActive,
+ queue = NewQueue, tickets = NewTickets},
+ compile_loop(NewState)
+ end
+ end.
--spec sccs_to_pids([scc()]) -> {[dialyzer_worker:worker()], [scc()]}.
+not_fixpoint_loop(#state{active = Active, result = Result,
+ init_data = InitData} = State) ->
+ receive
+ {done, _Job, Data} ->
+ FinalData = dialyzer_succ_typings:lookup_names(Data, InitData),
+ NewResult = FinalData ++ Result,
+ case Active of
+ 1 ->
+ ets:delete(?MAP),
+ NewResult;
+ _ ->
+ NewActive = Active - 1,
+ NewState = State#state{active = NewActive, result = NewResult},
+ not_fixpoint_loop(NewState)
+ end
+ end.
+
+warnings_loop(#state{active = Active, result = Result, tickets = Tickets,
+ queue = Queue} = State) ->
+ receive
+ {done, _Job, Data} ->
+ NewResult = Data ++ Result,
+ case Active of
+ 1 -> NewResult;
+ _ ->
+ NewActive = Active - 1,
+ {NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
+ NewState =
+ State#state{result = NewResult, active = NewActive,
+ queue = NewQueue, tickets = NewTickets},
+ warnings_loop(NewState)
+ end
+ end.
+
+manage_waiting(Queue, Tickets) ->
+ {Waiting, NewQueue} = queue:out(Queue),
+ NewTickets =
+ case Waiting of
+ empty -> Tickets + 1;
+ {value, Pid} ->
+ activate_pid(Pid),
+ Tickets
+ end,
+ {NewQueue, NewTickets}.
+
+-spec sccs_to_pids([scc() | module()]) ->
+ {[dialyzer_worker:worker()], [scc() | module()]}.
sccs_to_pids(SCCs) ->
lists:foldl(fun pid_partition/2, {[], []}, SCCs).
@@ -129,70 +219,27 @@ pid_partition(SCC, {Pids, Unknown}) ->
_:_ -> {Pids, [SCC|Unknown]}
end.
--spec job_done(scc() | file:filename(), term(), coordinator()) -> ok.
+-spec job_done(scc() | module() | file:filename(), term(), coordinator()) -> ok.
job_done(Job, Result, Coordinator) ->
- cast({done, Job, Result}, Coordinator).
+ Coordinator ! {done, Job, Result},
+ ok.
-spec compilation_done(file:filename(),
- dialyzer_analysis_callgraph:compilation_data(),
+ dialyzer_analysis_callgraph:compile_result(),
coordinator()) -> ok.
compilation_done(Filename, CompilationData, Coordinator) ->
- cast({done, Filename, CompilationData}, Coordinator).
-
--spec all_spawned(coordinator()) -> ok.
-
-all_spawned(Coordinator) ->
- cast(all_spawned, Coordinator).
-
-send_done_to_parent(#state{mode = Mode,
- parent = Parent,
- result = Result,
- next_label = NextLabel}) ->
- Msg =
- case Mode of
- X when X =:= 'typesig'; X =:= 'dataflow' ->
- ets:delete(?MAP),
- {not_fixpoint, Result};
- 'compile' -> {compilation_data, Result, NextLabel};
- 'warnings' -> {warnings, Result}
- end,
- Parent ! Msg,
+ Coordinator ! {done, Filename, CompilationData},
ok.
--spec receive_not_fixpoint() -> [mfa_or_funlbl()].
-
-receive_not_fixpoint() ->
- receive {not_fixpoint, NotFixpoint} -> NotFixpoint end.
-
--spec receive_compilation_data() ->
- {dialyzer_analysis_callgraph:result(), integer()}.
-
-receive_compilation_data() ->
- receive {compilation_data, CompilationData, NextLabel} ->
- {CompilationData, NextLabel}
- end.
-
--spec receive_warnings() -> [dial_warning()].
-
-receive_warnings() ->
- receive {warnings, Warnings} -> Warnings end.
-
--spec compiler_spawn(file:filename(), coordinator()) -> ok.
-
-compiler_spawn(Filename, Coordinator) ->
- cast({compiler_spawn, Filename}, Coordinator).
-
-spec get_next_label(integer(), coordinator()) -> integer().
get_next_label(EstimatedSize, Coordinator) ->
- call({get_next_label, EstimatedSize}, Coordinator).
-
--spec request_activation(coordinator()) -> ok.
-
-request_activation(Coordinator) ->
- cast({request_activation, self()}, Coordinator).
+ Coordinator ! {next_label_request, EstimatedSize, self()},
+ receive
+ {next_label_reply, NextLabel} -> NextLabel
+ end.
-spec wait_activation() -> ok.
@@ -201,179 +248,3 @@ wait_activation() ->
activate_pid(Pid) ->
Pid ! activate.
-
-%%--------------------------------------------------------------------
-
--spec init({pid(), mode(), servers()}) -> {ok, #state{}}.
-
-init({Parent, Mode, InitJobData}) ->
- BaseTickets = erlang:system_info(logical_processors_available),
- Tickets =
- case Mode of
- 'compile' -> 4*BaseTickets;
- 'warnings' -> 4*BaseTickets;
- _ -> non_regulated
- end,
- InitState =
- #state{parent = Parent, mode = Mode, init_job_data = InitJobData,
- tickets = Tickets, queue = queue:new()},
- State =
- case Mode of
- X when X =:= 'typesig'; X =:= 'dataflow' ->
- ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]),
- InitState#state{result = []};
- 'warnings' ->
- InitState#state{result = []};
- 'compile' ->
- InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(),
- InitState#state{result = InitResult, next_label = 0}
- end,
- {ok, State}.
-
--spec handle_call(Query::term(), From::term(), #state{}) ->
- {reply, Reply::term(), #state{}}.
-
-handle_call({get_next_label, EstimatedSize}, _From,
- #state{next_label = NextLabel} = State) ->
- {reply, NextLabel, State#state{next_label = NextLabel + EstimatedSize}}.
-
--spec handle_cast(Msg::term(), #state{}) ->
- {noreply, #state{}} | {stop, normal, #state{}}.
-
-handle_cast({done, _Job, NewData},
- #state{mode = Mode, result = OldResult,
- init_job_data = Servers} = State) when
- Mode =:= 'typesig'; Mode =:= 'dataflow' ->
- FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers),
- UpdatedState = State#state{result = FinalData ++ OldResult},
- reduce_or_stop(UpdatedState);
-handle_cast({done, Job, NewData},
- #state{mode = Mode,
- result = OldResult,
- tickets = Tickets,
- queue = Queue} = State) when
- Mode =:= 'compile'; Mode =:= 'warnings' ->
- {Waiting, NewQueue} = queue:out(Queue),
- NewTickets =
- case Waiting of
- empty -> Tickets+1;
- {value, Pid} ->
- activate_pid(Pid),
- Tickets
- end,
- NewResult =
- case Mode of
- 'compile' ->
- dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult);
- 'warnings' ->
- NewData ++ OldResult
- end,
- UpdatedState =
- State#state{result = NewResult, tickets = NewTickets, queue = NewQueue},
- reduce_or_stop(UpdatedState);
-handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) ->
- case SpawnCount of
- 0 ->
- send_done_to_parent(State),
- {stop, normal, State};
- _ ->
- NewState = State#state{all_spawned = true},
- {noreply, NewState}
- end;
-handle_cast({request_activation, Pid},
- #state{tickets = Tickets, queue = Queue} = State) ->
- {NewTickets, NewQueue} =
- case Tickets of
- 0 -> {Tickets, queue:in(Pid, Queue)};
- N ->
- activate_pid(Pid),
- {N-1, Queue}
- end,
- {noreply, State#state{tickets = NewTickets, queue = NewQueue}};
-handle_cast({scc_spawn, SCC},
- #state{mode = Mode,
- init_job_data = Servers,
- spawn_count = SpawnCount} = State) when
- Mode =:= 'typesig'; Mode =:= 'dataflow' ->
- Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()),
- true = ets:insert(?MAP, {SCC, Pid}),
- {noreply, State#state{spawn_count = SpawnCount + 1}};
-handle_cast({scc_spawn, SCC},
- #state{mode = 'warnings',
- init_job_data = Servers,
- spawn_count = SpawnCount,
- tickets = Tickets,
- queue = Queue} = State) ->
- Pid = dialyzer_worker:launch('warnings', SCC, Servers, self()),
- {NewTickets, NewQueue} =
- case Tickets of
- 0 -> {Tickets, queue:in(Pid, Queue)};
- N ->
- activate_pid(Pid),
- {N-1, Queue}
- end,
- {noreply, State#state{spawn_count = SpawnCount + 1,
- tickets = NewTickets,
- queue = NewQueue}};
-handle_cast({compiler_spawn, Filename},
- #state{mode = Mode,
- init_job_data = Servers,
- spawn_count = SpawnCount,
- tickets = Tickets,
- queue = Queue
- } = State) ->
- Pid = dialyzer_worker:launch(Mode, Filename, Servers, self()),
- NewSpawnCount = SpawnCount + 1,
- {NewTickets, NewQueue} =
- case Tickets of
- 0 -> {Tickets, queue:in(Pid, Queue)};
- N ->
- activate_pid(Pid),
- {N-1, Queue}
- end,
- {noreply, State#state{spawn_count = NewSpawnCount,
- tickets = NewTickets,
- queue = NewQueue}}.
-
--spec handle_info(term(), #state{}) -> {noreply, #state{}}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
--spec terminate(term(), #state{}) -> ok.
-
-terminate(_Reason, _State) ->
- ok.
-
--spec code_change(term(), #state{}, term()) -> {ok, #state{}}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-
-cast(Message, Coordinator) ->
- gen_server:cast(Coordinator, Message).
-
-call(Message, Coordinator) ->
- gen_server:call(Coordinator, Message, infinity).
-
-reduce_or_stop(#state{all_spawned = AllSpawned,
- spawn_count = SpawnCount} = State) ->
- Action =
- case AllSpawned of
- false -> reduce;
- true ->
- case SpawnCount of
- 1 -> finish;
- _ -> reduce
- end
- end,
- case Action of
- reduce ->
- NewState = State#state{spawn_count = SpawnCount - 1},
- {noreply, NewState};
- finish ->
- send_done_to_parent(State),
- {stop, normal, State}
- end.
diff --git a/lib/dialyzer/src/dialyzer_succ_typings.erl b/lib/dialyzer/src/dialyzer_succ_typings.erl
index ff4102f5a3..b980e43ecc 100644
--- a/lib/dialyzer/src/dialyzer_succ_typings.erl
+++ b/lib/dialyzer/src/dialyzer_succ_typings.erl
@@ -41,7 +41,7 @@
lookup_names/2
]).
--export_type([servers/0, warning_servers/0]).
+-export_type([typesig_init_data/0, dataflow_init_data/0, warnings_init_data/0]).
%%-define(DEBUG, true).
@@ -61,7 +61,12 @@
%% State record -- local to this module
-type parent() :: 'none' | pid().
--type servers() :: term(). %%opaque
+-type typesig_init_data() :: term().
+-type dataflow_init_data() :: term().
+-type warnings_init_data() :: term().
+
+-type fixpoint_init_data() :: typesig_init_data() | dataflow_init_data().
+
-type scc() :: [mfa_or_funlbl()] | [module()].
@@ -146,12 +151,8 @@ get_warnings(Callgraph, Plt, DocPlt, Codeserver, NoWarnUnused, Parent) ->
get_warnings_from_modules(Mods, State, DocPlt) ->
#st{callgraph = Callgraph, codeserver = Codeserver,
no_warn_unused = NoWarnUnused, plt = Plt} = State,
- Servers = {Callgraph, Codeserver, NoWarnUnused, Plt, DocPlt},
- Coordinator = dialyzer_coordinator:start('warnings', Servers),
- Spawner = fun(M) -> dialyzer_coordinator:scc_spawn(M, Coordinator) end,
- lists:foreach(Spawner, Mods),
- dialyzer_coordinator:all_spawned(Coordinator),
- dialyzer_coordinator:receive_warnings().
+ Init = {Callgraph, Codeserver, NoWarnUnused, Plt, DocPlt},
+ dialyzer_coordinator:parallel_job(warnings, Mods, Init).
-type warning_servers() :: term().
@@ -210,45 +211,36 @@ postprocess_dataflow_warns([{?WARN_CONTRACT_RANGE, {CallF, CallL}, Msg}|Rest],
postprocess_dataflow_warns(Rest, Codeserver, WAcc, [W|Acc])
end.
-refine_succ_typings(ModulePostorder, #st{codeserver = Codeserver,
- callgraph = Callgraph,
- plt = Plt} = State) ->
+refine_succ_typings(Modules, #st{codeserver = Codeserver,
+ callgraph = Callgraph,
+ plt = Plt} = State) ->
?debug("Module postorder: ~p\n", [ModulePostorder]),
- Servers = {Codeserver, Callgraph, Plt},
- Coordinator = dialyzer_coordinator:start(dataflow, Servers),
- ?timing("refine",refine_succ_typings(ModulePostorder, State, Coordinator)).
-
-refine_succ_typings([M|Rest], State, Coordinator) ->
- Msg = io_lib:format("Dataflow of module: ~w\n", [M]),
- send_log(State#st.parent, Msg),
- ?debug("~s\n", [Msg]),
- dialyzer_coordinator:scc_spawn(M, Coordinator),
- refine_succ_typings(Rest, State, Coordinator);
-refine_succ_typings([], State, Coordinator) ->
- dialyzer_coordinator:all_spawned(Coordinator),
- NotFixpoint = dialyzer_coordinator:receive_not_fixpoint(),
+ Init = {Codeserver, Callgraph, Plt},
+ NotFixpoint =
+ ?timing("refine",
+ dialyzer_coordinator:parallel_job(dataflow, Modules, Init)),
?debug("==================== Dataflow done ====================\n\n", []),
case NotFixpoint =:= [] of
true -> {fixpoint, State};
false -> {not_fixpoint, NotFixpoint, State}
end.
--spec find_depends_on(scc() | module(), servers()) -> [scc()].
+-spec find_depends_on(scc() | module(), fixpoint_init_data()) -> [scc()].
find_depends_on(SCC, {_Codeserver, Callgraph, _Plt}) ->
dialyzer_callgraph:get_depends_on(SCC, Callgraph).
--spec find_required_by(scc() | module(), servers()) -> [scc()].
+-spec find_required_by(scc() | module(), fixpoint_init_data()) -> [scc()].
find_required_by(SCC, {_Codeserver, Callgraph, _Plt}) ->
dialyzer_callgraph:get_required_by(SCC, Callgraph).
--spec lookup_names([label()], servers()) -> [mfa_or_funlbl()].
+-spec lookup_names([label()], fixpoint_init_data()) -> [mfa_or_funlbl()].
lookup_names(Labels, {_Codeserver, Callgraph, _Plt}) ->
[lookup_name(F, Callgraph) || F <- Labels].
--spec refine_one_module(module(), servers()) -> [label()]. % ordset
+-spec refine_one_module(module(), dataflow_init_data()) -> [label()]. % ordset
refine_one_module(M, {CodeServer, Callgraph, Plt}) ->
ModCode = dialyzer_codeserver:lookup_mod_code(M, CodeServer),
@@ -322,28 +314,17 @@ compare_types_1([], [], _Strict, NotFixpoint) ->
find_succ_typings(SCCs, #st{codeserver = Codeserver, callgraph = Callgraph,
plt = Plt} = State) ->
- Servers = {Codeserver, dialyzer_callgraph:mini_callgraph(Callgraph), Plt},
- Coordinator = dialyzer_coordinator:start(typesig, Servers),
- ?timing("spawn", _C1, find_succ_typings(SCCs, State, Coordinator)),
- dialyzer_coordinator:all_spawned(Coordinator),
+ Init = {Codeserver, dialyzer_callgraph:mini_callgraph(Callgraph), Plt},
NotFixpoint =
- ?timing("typesig", _C2, dialyzer_coordinator:receive_not_fixpoint()),
+ ?timing("typesig",
+ dialyzer_coordinator:parallel_job(typesig, SCCs, Init)),
?debug("==================== Typesig done ====================\n\n", []),
case NotFixpoint =:= [] of
true -> {fixpoint, State};
false -> {not_fixpoint, NotFixpoint, State}
end.
-find_succ_typings([SCC|Rest], #st{parent = Parent} = State, Coordinator) ->
- Msg = io_lib:format("Typesig analysis for SCC: ~w\n", [format_scc(SCC)]),
- ?debug("~s", [Msg]),
- send_log(Parent, Msg),
- dialyzer_coordinator:scc_spawn(SCC, Coordinator),
- find_succ_typings(Rest, State, Coordinator);
-find_succ_typings([], _State, _Coordinator) ->
- ok.
-
--spec find_succ_types_for_scc(scc(), servers()) -> [mfa_or_funlbl()].
+-spec find_succ_types_for_scc(scc(), typesig_init_data()) -> [mfa_or_funlbl()].
find_succ_types_for_scc(SCC, {Codeserver, Callgraph, Plt}) ->
SCC_Info = [{MFA,
@@ -459,12 +440,3 @@ lookup_name(F, CG) ->
error -> F;
{ok, Name} -> Name
end.
-
-send_log(none, _Msg) ->
- ok;
-send_log(Parent, Msg) ->
- Parent ! {self(), log, lists:flatten(Msg)},
- ok.
-
-format_scc(SCC) ->
- [MFA || {_M, _F, _A} = MFA <- SCC].
diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl
index 0ef30cf940..cc4032d154 100644
--- a/lib/dialyzer/src/dialyzer_worker.erl
+++ b/lib/dialyzer/src/dialyzer_worker.erl
@@ -28,14 +28,13 @@
-type mode() :: dialyzer_coordinator:mode().
-type coordinator() :: dialyzer_coordinator:coordinator().
--type servers() :: dialyzer_succ_typings:servers() |
- dialyzer_analysis_callgraph:servers().
+-type init_data() :: dialyzer_coordinator:init_data().
-record(state, {
mode :: mode(),
job :: mfa_or_funlbl() | file:filename(),
coordinator :: coordinator(),
- servers :: servers(),
+ init_data :: init_data(),
depends_on = [] :: list()
}).
@@ -51,12 +50,12 @@
%%--------------------------------------------------------------------
--spec launch(mode(), [mfa_or_funlbl()], servers(), coordinator()) -> worker().
+-spec launch(mode(), [mfa_or_funlbl()], init_data(), coordinator()) -> worker().
-launch(Mode, Job, Servers, Coordinator) ->
+launch(Mode, Job, InitData, Coordinator) ->
State = #state{mode = Mode,
job = Job,
- servers = Servers,
+ init_data = InitData,
coordinator = Coordinator},
InitState =
case Mode of
@@ -75,8 +74,8 @@ loop(updating, State) ->
false -> running
end,
loop(NextStatus, State);
-loop(initializing, #state{job = SCC, servers = Servers} = State) ->
- DependsOn = dialyzer_succ_typings:find_depends_on(SCC, Servers),
+loop(initializing, #state{job = SCC, init_data = InitData} = State) ->
+ DependsOn = dialyzer_succ_typings:find_depends_on(SCC, InitData),
?debug("Deps ~p: ~p\n",[State#state.job, DependsOn]),
loop(updating, State#state{depends_on = DependsOn});
loop(waiting, State) ->
@@ -113,8 +112,8 @@ waits_more_success_typings(#state{depends_on = Depends}) ->
_ -> true
end.
-broadcast_done(#state{job = SCC, servers = Servers}) ->
- RequiredBy = dialyzer_succ_typings:find_required_by(SCC, Servers),
+broadcast_done(#state{job = SCC, init_data = InitData}) ->
+ RequiredBy = dialyzer_succ_typings:find_required_by(SCC, InitData),
{Callers, Unknown} = dialyzer_coordinator:sccs_to_pids(RequiredBy),
send_done(Callers, SCC),
continue_broadcast_done(Unknown, SCC).
@@ -144,18 +143,18 @@ wait_for_success_typings(#state{depends_on = DependsOn} = State) ->
State
end.
-do_work(#state{mode = Mode, job = Job, servers = Servers}) ->
+do_work(#state{mode = Mode, job = Job, init_data = InitData}) ->
case Mode of
- typesig -> dialyzer_succ_typings:find_succ_types_for_scc(Job, Servers);
- dataflow -> dialyzer_succ_typings:refine_one_module(Job, Servers)
+ typesig -> dialyzer_succ_typings:find_succ_types_for_scc(Job, InitData);
+ dataflow -> dialyzer_succ_typings:refine_one_module(Job, InitData)
end.
report_to_coordinator(Result, #state{job = Job, coordinator = Coordinator}) ->
?debug("Done: ~p\n",[Job]),
dialyzer_coordinator:job_done(Job, Result, Coordinator).
-start_compilation(#state{job = Job, servers = Servers}) ->
- dialyzer_analysis_callgraph:start_compilation(Job, Servers).
+start_compilation(#state{job = Job, init_data = InitData}) ->
+ dialyzer_analysis_callgraph:start_compilation(Job, InitData).
ask_coordinator_for_label(EstimatedSize, #state{coordinator = Coordinator}) ->
dialyzer_coordinator:get_next_label(EstimatedSize, Coordinator).
@@ -163,5 +162,5 @@ ask_coordinator_for_label(EstimatedSize, #state{coordinator = Coordinator}) ->
continue_compilation(Label, Data) ->
dialyzer_analysis_callgraph:continue_compilation(Label, Data).
-collect_warnings(#state{job = Job, servers = Servers}) ->
- dialyzer_succ_typings:collect_warnings(Job, Servers).
+collect_warnings(#state{job = Job, init_data = InitData}) ->
+ dialyzer_succ_typings:collect_warnings(Job, InitData).