aboutsummaryrefslogtreecommitdiffstats
path: root/lib/dialyzer/src/dialyzer_coordinator.erl
diff options
context:
space:
mode:
authorStavros Aronis <[email protected]>2012-02-28 22:49:04 +0100
committerHenrik Nord <[email protected]>2012-05-21 15:31:21 +0200
commit1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a (patch)
tree7f30f58e3e4ca2befda694a04db8e55966cf5786 /lib/dialyzer/src/dialyzer_coordinator.erl
parent913ee73601e3f7d0b27d833bd67cf6ee3868018a (diff)
downloadotp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.tar.gz
otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.tar.bz2
otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.zip
Coordinator is no longer a separate process
Diffstat (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl')
-rw-r--r--lib/dialyzer/src/dialyzer_coordinator.erl459
1 files changed, 165 insertions, 294 deletions
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl
index c417b45717..490cfb8d49 100644
--- a/lib/dialyzer/src/dialyzer_coordinator.erl
+++ b/lib/dialyzer/src/dialyzer_coordinator.erl
@@ -21,61 +21,23 @@
%%%-------------------------------------------------------------------
%%% File : dialyzer_coordinator.erl
%%% Authors : Stavros Aronis <[email protected]>
-%%%
-%%% Description:
-%%%
-%%% The parallel version of Dialyzer's typesig analysis is spread over 4 modules
-%%% with the intention to both minimize the changes on the original code and use
-%%% a separate module for every kind of Erlang process that will be running.
-%%%
-%%% There are therefore 3 kinds of processes:
-%%%
-%%% - The original Dialyzer backend (in succ_typings module)
-%%% - The worker process for the typesig analysis (in typesig and
-%%% worker)
-%%% - A coordinator of the worker processes (in coordinator)
-%%%
-%%% Operation guidelines:
-%%%
-%%% - The backend requests from the coordinator to spawn a worker for each SCC
-%%% - The backend notifies the coordinator when all SCC have been spawned and
-%%% waits for the server to report that the PLT has been updated
-%%% - Each worker is responsible to notify all those who wait for it.
-%%%
%%%-------------------------------------------------------------------
-module(dialyzer_coordinator).
-%%% Exports for all possible uses of coordinator from main process
--export([start/2, all_spawned/1]).
+%%% Export for dialyzer main process
+-export([parallel_job/3]).
%%% Exports for all possible workers
-export([wait_activation/0, job_done/3]).
-%%% Export for the typesig, dataflow and warnings main process
--export([scc_spawn/2]).
-
-%%% Export for the typesig and dataflow analysis main process
--export([receive_not_fixpoint/0]).
-
-%%% Export for warning main process
--export([receive_warnings/0]).
-
%%% Exports for the typesig and dataflow analysis workers
--export([request_activation/1, sccs_to_pids/1]).
-
-%%% Exports for the compilation main process
--export([compiler_spawn/2, receive_compilation_data/0]).
+-export([sccs_to_pids/1]).
%%% Exports for the compilation workers
-export([get_next_label/2, compilation_done/3]).
--export_type([coordinator/0, mode/0]).
-
--behaviour(gen_server).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
+-export_type([coordinator/0, mode/0, init_data/0]).
%%--------------------------------------------------------------------
@@ -83,41 +45,169 @@
-type coordinator() :: pid(). %%opaque
--type scc() :: [mfa_or_funlbl()] | module().
+-type scc() :: [mfa_or_funlbl()].
-type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'.
--type servers() :: dialyzer_succ_typings:servers() |
- dialyzer_analysis_callgraph:servers() |
- dialyzer_succ_typings:warning_servers().
-
--record(state, {parent :: pid(),
- mode :: mode(),
- spawn_count = 0 :: integer(),
- all_spawned = false :: boolean(),
- next_label :: integer(),
- result :: [mfa_or_funlbl()] |
- dialyzer_analysis_callgraph:result() |
- [dial_warning()],
- init_job_data :: servers(),
- tickets :: integer(),
- queue :: queue()
+
+-type compile_jobs() :: [file:filename()].
+-type typesig_jobs() :: [scc()].
+-type dataflow_jobs() :: [module()].
+-type warnings_jobs() :: [module()].
+
+-type compile_init_data() :: dialyzer_analysis_callgraph:compile_init_data().
+-type typesig_init_data() :: dialyzer_succ_typings:typesig_init_data().
+-type dataflow_init_data() :: dialyzer_succ_typings:dataflow_init_data().
+-type warnings_init_data() :: dialyzer_succ_typings:warnings_init_data().
+
+-type compile_result() :: dialyzer_analysis_callgraph:compile_result().
+-type typesig_result() :: [mfa_or_funlbl()].
+-type dataflow_result() :: [mfa_or_funlbl()].
+-type warnings_result() :: [dial_warning()].
+
+-type init_data() :: compile_init_data() | typesig_init_data() |
+ dataflow_init_data() | warnings_init_data().
+
+-type result() :: compile_result() | typesig_result() |
+ dataflow_result() | warnings_result().
+
+-record(state, {active = 0 :: integer(),
+ result :: result(),
+ next_label = 0 :: integer(),
+ tickets = 0 :: integer(),
+ queue = queue:new() :: queue(),
+ init_data :: init_data()
}).
-include("dialyzer.hrl").
%%--------------------------------------------------------------------
--spec start(mode(), servers()) -> coordinator().
+-spec parallel_job('compile', compile_jobs(), compile_init_data()) ->
+ {compile_result(), integer()};
+ ('typesig', typesig_jobs(), typesig_init_data()) ->
+ typesig_result();
+ ('dataflow', dataflow_jobs(), dataflow_init_data()) ->
+ dataflow_result();
+ ('warnings', warnings_jobs(), warnings_init_data()) ->
+ warnings_result().
-start(Mode, Servers) ->
- {ok, Pid} = gen_server:start_link(?MODULE, {self(), Mode, Servers}, []),
- Pid.
+parallel_job(Mode, Jobs, InitData) ->
+ State = spawn_jobs(Mode, Jobs, InitData),
+ collect_result(Mode, State).
--spec scc_spawn(scc() | module(), coordinator()) -> ok.
+spawn_jobs(Mode, Jobs, InitData) when
+ Mode =:= 'typesig'; Mode =:= 'dataflow' ->
+ Coordinator = self(),
+ ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]),
+ Fold =
+ fun(Job, Count) ->
+ Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
+ true = ets:insert(?MAP, {Job, Pid}),
+ Count + 1
+ end,
+ JobCount = lists:foldl(Fold, 0, Jobs),
+ #state{active = JobCount, result = [], init_data = InitData};
+spawn_jobs(Mode, Jobs, InitData) when
+ Mode =:= 'compile'; Mode =:= 'warnings' ->
+ Coordinator = self(),
+ Fold =
+ fun(Job, {InTickets, InQueue, Count}) ->
+ Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
+ NewCount = Count + 1,
+ case InTickets of
+ 0 -> {InTickets, queue:in(Pid, InQueue), NewCount};
+ N -> activate_pid(Pid), {N-1, InQueue, NewCount}
+ end
+ end,
+ CPUs = erlang:system_info(logical_processors_available),
+ InitTickets = 4*CPUs,
+ {Tickets, Queue, JobCount} =
+ lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs),
+ InitResult =
+ case Mode of
+ 'warnings' -> [];
+ 'compile' -> dialyzer_analysis_callgraph:compile_init_result()
+ end,
+ #state{active = JobCount, result = InitResult, next_label = 0,
+ tickets = Tickets, queue = Queue, init_data = InitData}.
+
+collect_result(Mode, State) ->
+ case Mode of
+ 'compile' -> compile_loop(State);
+ 'typesig' -> not_fixpoint_loop(State);
+ 'dataflow' -> not_fixpoint_loop(State);
+ 'warnings' -> warnings_loop(State)
+ end.
-scc_spawn(SCC, Coordinator) ->
- cast({scc_spawn, SCC}, Coordinator).
+compile_loop(#state{active = Active, result = Result,
+ next_label = NextLabel, tickets = Tickets,
+ queue = Queue, init_data = InitData} = State) ->
+ receive
+ {next_label_request, Estimation, Pid} ->
+ Pid ! {next_label_reply, NextLabel},
+ compile_loop(State#state{next_label = NextLabel + Estimation});
+ {done, Job, Data} ->
+ NewResult =
+ dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData),
+ case Active of
+ 1 ->
+ {NewResult, NextLabel};
+ _ ->
+ NewActive = Active - 1,
+ {NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
+ NewState =
+ State#state{result = NewResult, active = NewActive,
+ queue = NewQueue, tickets = NewTickets},
+ compile_loop(NewState)
+ end
+ end.
--spec sccs_to_pids([scc()]) -> {[dialyzer_worker:worker()], [scc()]}.
+not_fixpoint_loop(#state{active = Active, result = Result,
+ init_data = InitData} = State) ->
+ receive
+ {done, _Job, Data} ->
+ FinalData = dialyzer_succ_typings:lookup_names(Data, InitData),
+ NewResult = FinalData ++ Result,
+ case Active of
+ 1 ->
+ ets:delete(?MAP),
+ NewResult;
+ _ ->
+ NewActive = Active - 1,
+ NewState = State#state{active = NewActive, result = NewResult},
+ not_fixpoint_loop(NewState)
+ end
+ end.
+
+warnings_loop(#state{active = Active, result = Result, tickets = Tickets,
+ queue = Queue} = State) ->
+ receive
+ {done, _Job, Data} ->
+ NewResult = Data ++ Result,
+ case Active of
+ 1 -> NewResult;
+ _ ->
+ NewActive = Active - 1,
+ {NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
+ NewState =
+ State#state{result = NewResult, active = NewActive,
+ queue = NewQueue, tickets = NewTickets},
+ warnings_loop(NewState)
+ end
+ end.
+
+manage_waiting(Queue, Tickets) ->
+ {Waiting, NewQueue} = queue:out(Queue),
+ NewTickets =
+ case Waiting of
+ empty -> Tickets + 1;
+ {value, Pid} ->
+ activate_pid(Pid),
+ Tickets
+ end,
+ {NewQueue, NewTickets}.
+
+-spec sccs_to_pids([scc() | module()]) ->
+ {[dialyzer_worker:worker()], [scc() | module()]}.
sccs_to_pids(SCCs) ->
lists:foldl(fun pid_partition/2, {[], []}, SCCs).
@@ -129,70 +219,27 @@ pid_partition(SCC, {Pids, Unknown}) ->
_:_ -> {Pids, [SCC|Unknown]}
end.
--spec job_done(scc() | file:filename(), term(), coordinator()) -> ok.
+-spec job_done(scc() | module() | file:filename(), term(), coordinator()) -> ok.
job_done(Job, Result, Coordinator) ->
- cast({done, Job, Result}, Coordinator).
+ Coordinator ! {done, Job, Result},
+ ok.
-spec compilation_done(file:filename(),
- dialyzer_analysis_callgraph:compilation_data(),
+ dialyzer_analysis_callgraph:compile_result(),
coordinator()) -> ok.
compilation_done(Filename, CompilationData, Coordinator) ->
- cast({done, Filename, CompilationData}, Coordinator).
-
--spec all_spawned(coordinator()) -> ok.
-
-all_spawned(Coordinator) ->
- cast(all_spawned, Coordinator).
-
-send_done_to_parent(#state{mode = Mode,
- parent = Parent,
- result = Result,
- next_label = NextLabel}) ->
- Msg =
- case Mode of
- X when X =:= 'typesig'; X =:= 'dataflow' ->
- ets:delete(?MAP),
- {not_fixpoint, Result};
- 'compile' -> {compilation_data, Result, NextLabel};
- 'warnings' -> {warnings, Result}
- end,
- Parent ! Msg,
+ Coordinator ! {done, Filename, CompilationData},
ok.
--spec receive_not_fixpoint() -> [mfa_or_funlbl()].
-
-receive_not_fixpoint() ->
- receive {not_fixpoint, NotFixpoint} -> NotFixpoint end.
-
--spec receive_compilation_data() ->
- {dialyzer_analysis_callgraph:result(), integer()}.
-
-receive_compilation_data() ->
- receive {compilation_data, CompilationData, NextLabel} ->
- {CompilationData, NextLabel}
- end.
-
--spec receive_warnings() -> [dial_warning()].
-
-receive_warnings() ->
- receive {warnings, Warnings} -> Warnings end.
-
--spec compiler_spawn(file:filename(), coordinator()) -> ok.
-
-compiler_spawn(Filename, Coordinator) ->
- cast({compiler_spawn, Filename}, Coordinator).
-
-spec get_next_label(integer(), coordinator()) -> integer().
get_next_label(EstimatedSize, Coordinator) ->
- call({get_next_label, EstimatedSize}, Coordinator).
-
--spec request_activation(coordinator()) -> ok.
-
-request_activation(Coordinator) ->
- cast({request_activation, self()}, Coordinator).
+ Coordinator ! {next_label_request, EstimatedSize, self()},
+ receive
+ {next_label_reply, NextLabel} -> NextLabel
+ end.
-spec wait_activation() -> ok.
@@ -201,179 +248,3 @@ wait_activation() ->
activate_pid(Pid) ->
Pid ! activate.
-
-%%--------------------------------------------------------------------
-
--spec init({pid(), mode(), servers()}) -> {ok, #state{}}.
-
-init({Parent, Mode, InitJobData}) ->
- BaseTickets = erlang:system_info(logical_processors_available),
- Tickets =
- case Mode of
- 'compile' -> 4*BaseTickets;
- 'warnings' -> 4*BaseTickets;
- _ -> non_regulated
- end,
- InitState =
- #state{parent = Parent, mode = Mode, init_job_data = InitJobData,
- tickets = Tickets, queue = queue:new()},
- State =
- case Mode of
- X when X =:= 'typesig'; X =:= 'dataflow' ->
- ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]),
- InitState#state{result = []};
- 'warnings' ->
- InitState#state{result = []};
- 'compile' ->
- InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(),
- InitState#state{result = InitResult, next_label = 0}
- end,
- {ok, State}.
-
--spec handle_call(Query::term(), From::term(), #state{}) ->
- {reply, Reply::term(), #state{}}.
-
-handle_call({get_next_label, EstimatedSize}, _From,
- #state{next_label = NextLabel} = State) ->
- {reply, NextLabel, State#state{next_label = NextLabel + EstimatedSize}}.
-
--spec handle_cast(Msg::term(), #state{}) ->
- {noreply, #state{}} | {stop, normal, #state{}}.
-
-handle_cast({done, _Job, NewData},
- #state{mode = Mode, result = OldResult,
- init_job_data = Servers} = State) when
- Mode =:= 'typesig'; Mode =:= 'dataflow' ->
- FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers),
- UpdatedState = State#state{result = FinalData ++ OldResult},
- reduce_or_stop(UpdatedState);
-handle_cast({done, Job, NewData},
- #state{mode = Mode,
- result = OldResult,
- tickets = Tickets,
- queue = Queue} = State) when
- Mode =:= 'compile'; Mode =:= 'warnings' ->
- {Waiting, NewQueue} = queue:out(Queue),
- NewTickets =
- case Waiting of
- empty -> Tickets+1;
- {value, Pid} ->
- activate_pid(Pid),
- Tickets
- end,
- NewResult =
- case Mode of
- 'compile' ->
- dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult);
- 'warnings' ->
- NewData ++ OldResult
- end,
- UpdatedState =
- State#state{result = NewResult, tickets = NewTickets, queue = NewQueue},
- reduce_or_stop(UpdatedState);
-handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) ->
- case SpawnCount of
- 0 ->
- send_done_to_parent(State),
- {stop, normal, State};
- _ ->
- NewState = State#state{all_spawned = true},
- {noreply, NewState}
- end;
-handle_cast({request_activation, Pid},
- #state{tickets = Tickets, queue = Queue} = State) ->
- {NewTickets, NewQueue} =
- case Tickets of
- 0 -> {Tickets, queue:in(Pid, Queue)};
- N ->
- activate_pid(Pid),
- {N-1, Queue}
- end,
- {noreply, State#state{tickets = NewTickets, queue = NewQueue}};
-handle_cast({scc_spawn, SCC},
- #state{mode = Mode,
- init_job_data = Servers,
- spawn_count = SpawnCount} = State) when
- Mode =:= 'typesig'; Mode =:= 'dataflow' ->
- Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()),
- true = ets:insert(?MAP, {SCC, Pid}),
- {noreply, State#state{spawn_count = SpawnCount + 1}};
-handle_cast({scc_spawn, SCC},
- #state{mode = 'warnings',
- init_job_data = Servers,
- spawn_count = SpawnCount,
- tickets = Tickets,
- queue = Queue} = State) ->
- Pid = dialyzer_worker:launch('warnings', SCC, Servers, self()),
- {NewTickets, NewQueue} =
- case Tickets of
- 0 -> {Tickets, queue:in(Pid, Queue)};
- N ->
- activate_pid(Pid),
- {N-1, Queue}
- end,
- {noreply, State#state{spawn_count = SpawnCount + 1,
- tickets = NewTickets,
- queue = NewQueue}};
-handle_cast({compiler_spawn, Filename},
- #state{mode = Mode,
- init_job_data = Servers,
- spawn_count = SpawnCount,
- tickets = Tickets,
- queue = Queue
- } = State) ->
- Pid = dialyzer_worker:launch(Mode, Filename, Servers, self()),
- NewSpawnCount = SpawnCount + 1,
- {NewTickets, NewQueue} =
- case Tickets of
- 0 -> {Tickets, queue:in(Pid, Queue)};
- N ->
- activate_pid(Pid),
- {N-1, Queue}
- end,
- {noreply, State#state{spawn_count = NewSpawnCount,
- tickets = NewTickets,
- queue = NewQueue}}.
-
--spec handle_info(term(), #state{}) -> {noreply, #state{}}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
--spec terminate(term(), #state{}) -> ok.
-
-terminate(_Reason, _State) ->
- ok.
-
--spec code_change(term(), #state{}, term()) -> {ok, #state{}}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%--------------------------------------------------------------------
-
-cast(Message, Coordinator) ->
- gen_server:cast(Coordinator, Message).
-
-call(Message, Coordinator) ->
- gen_server:call(Coordinator, Message, infinity).
-
-reduce_or_stop(#state{all_spawned = AllSpawned,
- spawn_count = SpawnCount} = State) ->
- Action =
- case AllSpawned of
- false -> reduce;
- true ->
- case SpawnCount of
- 1 -> finish;
- _ -> reduce
- end
- end,
- case Action of
- reduce ->
- NewState = State#state{spawn_count = SpawnCount - 1},
- {noreply, NewState};
- finish ->
- send_done_to_parent(State),
- {stop, normal, State}
- end.