diff options
author | Stavros Aronis <[email protected]> | 2012-02-28 22:49:04 +0100 |
---|---|---|
committer | Henrik Nord <[email protected]> | 2012-05-21 15:31:21 +0200 |
commit | 1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a (patch) | |
tree | 7f30f58e3e4ca2befda694a04db8e55966cf5786 /lib/dialyzer/src/dialyzer_coordinator.erl | |
parent | 913ee73601e3f7d0b27d833bd67cf6ee3868018a (diff) | |
download | otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.tar.gz otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.tar.bz2 otp-1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a.zip |
Coordinator is no longer a separate process
Diffstat (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl')
-rw-r--r-- | lib/dialyzer/src/dialyzer_coordinator.erl | 459 |
1 files changed, 165 insertions, 294 deletions
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index c417b45717..490cfb8d49 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -21,61 +21,23 @@ %%%------------------------------------------------------------------- %%% File : dialyzer_coordinator.erl %%% Authors : Stavros Aronis <[email protected]> -%%% -%%% Description: -%%% -%%% The parallel version of Dialyzer's typesig analysis is spread over 4 modules -%%% with the intention to both minimize the changes on the original code and use -%%% a separate module for every kind of Erlang process that will be running. -%%% -%%% There are therefore 3 kinds of processes: -%%% -%%% - The original Dialyzer backend (in succ_typings module) -%%% - The worker process for the typesig analysis (in typesig and -%%% worker) -%%% - A coordinator of the worker processes (in coordinator) -%%% -%%% Operation guidelines: -%%% -%%% - The backend requests from the coordinator to spawn a worker for each SCC -%%% - The backend notifies the coordinator when all SCC have been spawned and -%%% waits for the server to report that the PLT has been updated -%%% - Each worker is responsible to notify all those who wait for it. -%%% %%%------------------------------------------------------------------- -module(dialyzer_coordinator). -%%% Exports for all possible uses of coordinator from main process --export([start/2, all_spawned/1]). +%%% Export for dialyzer main process +-export([parallel_job/3]). %%% Exports for all possible workers -export([wait_activation/0, job_done/3]). -%%% Export for the typesig, dataflow and warnings main process --export([scc_spawn/2]). - -%%% Export for the typesig and dataflow analysis main process --export([receive_not_fixpoint/0]). - -%%% Export for warning main process --export([receive_warnings/0]). - %%% Exports for the typesig and dataflow analysis workers --export([request_activation/1, sccs_to_pids/1]). - -%%% Exports for the compilation main process --export([compiler_spawn/2, receive_compilation_data/0]). +-export([sccs_to_pids/1]). %%% Exports for the compilation workers -export([get_next_label/2, compilation_done/3]). --export_type([coordinator/0, mode/0]). - --behaviour(gen_server). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export_type([coordinator/0, mode/0, init_data/0]). %%-------------------------------------------------------------------- @@ -83,41 +45,169 @@ -type coordinator() :: pid(). %%opaque --type scc() :: [mfa_or_funlbl()] | module(). +-type scc() :: [mfa_or_funlbl()]. -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. --type servers() :: dialyzer_succ_typings:servers() | - dialyzer_analysis_callgraph:servers() | - dialyzer_succ_typings:warning_servers(). - --record(state, {parent :: pid(), - mode :: mode(), - spawn_count = 0 :: integer(), - all_spawned = false :: boolean(), - next_label :: integer(), - result :: [mfa_or_funlbl()] | - dialyzer_analysis_callgraph:result() | - [dial_warning()], - init_job_data :: servers(), - tickets :: integer(), - queue :: queue() + +-type compile_jobs() :: [file:filename()]. +-type typesig_jobs() :: [scc()]. +-type dataflow_jobs() :: [module()]. +-type warnings_jobs() :: [module()]. + +-type compile_init_data() :: dialyzer_analysis_callgraph:compile_init_data(). +-type typesig_init_data() :: dialyzer_succ_typings:typesig_init_data(). +-type dataflow_init_data() :: dialyzer_succ_typings:dataflow_init_data(). +-type warnings_init_data() :: dialyzer_succ_typings:warnings_init_data(). + +-type compile_result() :: dialyzer_analysis_callgraph:compile_result(). +-type typesig_result() :: [mfa_or_funlbl()]. +-type dataflow_result() :: [mfa_or_funlbl()]. +-type warnings_result() :: [dial_warning()]. + +-type init_data() :: compile_init_data() | typesig_init_data() | + dataflow_init_data() | warnings_init_data(). + +-type result() :: compile_result() | typesig_result() | + dataflow_result() | warnings_result(). + +-record(state, {active = 0 :: integer(), + result :: result(), + next_label = 0 :: integer(), + tickets = 0 :: integer(), + queue = queue:new() :: queue(), + init_data :: init_data() }). -include("dialyzer.hrl"). %%-------------------------------------------------------------------- --spec start(mode(), servers()) -> coordinator(). +-spec parallel_job('compile', compile_jobs(), compile_init_data()) -> + {compile_result(), integer()}; + ('typesig', typesig_jobs(), typesig_init_data()) -> + typesig_result(); + ('dataflow', dataflow_jobs(), dataflow_init_data()) -> + dataflow_result(); + ('warnings', warnings_jobs(), warnings_init_data()) -> + warnings_result(). -start(Mode, Servers) -> - {ok, Pid} = gen_server:start_link(?MODULE, {self(), Mode, Servers}, []), - Pid. +parallel_job(Mode, Jobs, InitData) -> + State = spawn_jobs(Mode, Jobs, InitData), + collect_result(Mode, State). --spec scc_spawn(scc() | module(), coordinator()) -> ok. +spawn_jobs(Mode, Jobs, InitData) when + Mode =:= 'typesig'; Mode =:= 'dataflow' -> + Coordinator = self(), + ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), + Fold = + fun(Job, Count) -> + Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), + true = ets:insert(?MAP, {Job, Pid}), + Count + 1 + end, + JobCount = lists:foldl(Fold, 0, Jobs), + #state{active = JobCount, result = [], init_data = InitData}; +spawn_jobs(Mode, Jobs, InitData) when + Mode =:= 'compile'; Mode =:= 'warnings' -> + Coordinator = self(), + Fold = + fun(Job, {InTickets, InQueue, Count}) -> + Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), + NewCount = Count + 1, + case InTickets of + 0 -> {InTickets, queue:in(Pid, InQueue), NewCount}; + N -> activate_pid(Pid), {N-1, InQueue, NewCount} + end + end, + CPUs = erlang:system_info(logical_processors_available), + InitTickets = 4*CPUs, + {Tickets, Queue, JobCount} = + lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs), + InitResult = + case Mode of + 'warnings' -> []; + 'compile' -> dialyzer_analysis_callgraph:compile_init_result() + end, + #state{active = JobCount, result = InitResult, next_label = 0, + tickets = Tickets, queue = Queue, init_data = InitData}. + +collect_result(Mode, State) -> + case Mode of + 'compile' -> compile_loop(State); + 'typesig' -> not_fixpoint_loop(State); + 'dataflow' -> not_fixpoint_loop(State); + 'warnings' -> warnings_loop(State) + end. -scc_spawn(SCC, Coordinator) -> - cast({scc_spawn, SCC}, Coordinator). +compile_loop(#state{active = Active, result = Result, + next_label = NextLabel, tickets = Tickets, + queue = Queue, init_data = InitData} = State) -> + receive + {next_label_request, Estimation, Pid} -> + Pid ! {next_label_reply, NextLabel}, + compile_loop(State#state{next_label = NextLabel + Estimation}); + {done, Job, Data} -> + NewResult = + dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData), + case Active of + 1 -> + {NewResult, NextLabel}; + _ -> + NewActive = Active - 1, + {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), + NewState = + State#state{result = NewResult, active = NewActive, + queue = NewQueue, tickets = NewTickets}, + compile_loop(NewState) + end + end. --spec sccs_to_pids([scc()]) -> {[dialyzer_worker:worker()], [scc()]}. +not_fixpoint_loop(#state{active = Active, result = Result, + init_data = InitData} = State) -> + receive + {done, _Job, Data} -> + FinalData = dialyzer_succ_typings:lookup_names(Data, InitData), + NewResult = FinalData ++ Result, + case Active of + 1 -> + ets:delete(?MAP), + NewResult; + _ -> + NewActive = Active - 1, + NewState = State#state{active = NewActive, result = NewResult}, + not_fixpoint_loop(NewState) + end + end. + +warnings_loop(#state{active = Active, result = Result, tickets = Tickets, + queue = Queue} = State) -> + receive + {done, _Job, Data} -> + NewResult = Data ++ Result, + case Active of + 1 -> NewResult; + _ -> + NewActive = Active - 1, + {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), + NewState = + State#state{result = NewResult, active = NewActive, + queue = NewQueue, tickets = NewTickets}, + warnings_loop(NewState) + end + end. + +manage_waiting(Queue, Tickets) -> + {Waiting, NewQueue} = queue:out(Queue), + NewTickets = + case Waiting of + empty -> Tickets + 1; + {value, Pid} -> + activate_pid(Pid), + Tickets + end, + {NewQueue, NewTickets}. + +-spec sccs_to_pids([scc() | module()]) -> + {[dialyzer_worker:worker()], [scc() | module()]}. sccs_to_pids(SCCs) -> lists:foldl(fun pid_partition/2, {[], []}, SCCs). @@ -129,70 +219,27 @@ pid_partition(SCC, {Pids, Unknown}) -> _:_ -> {Pids, [SCC|Unknown]} end. --spec job_done(scc() | file:filename(), term(), coordinator()) -> ok. +-spec job_done(scc() | module() | file:filename(), term(), coordinator()) -> ok. job_done(Job, Result, Coordinator) -> - cast({done, Job, Result}, Coordinator). + Coordinator ! {done, Job, Result}, + ok. -spec compilation_done(file:filename(), - dialyzer_analysis_callgraph:compilation_data(), + dialyzer_analysis_callgraph:compile_result(), coordinator()) -> ok. compilation_done(Filename, CompilationData, Coordinator) -> - cast({done, Filename, CompilationData}, Coordinator). - --spec all_spawned(coordinator()) -> ok. - -all_spawned(Coordinator) -> - cast(all_spawned, Coordinator). - -send_done_to_parent(#state{mode = Mode, - parent = Parent, - result = Result, - next_label = NextLabel}) -> - Msg = - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - ets:delete(?MAP), - {not_fixpoint, Result}; - 'compile' -> {compilation_data, Result, NextLabel}; - 'warnings' -> {warnings, Result} - end, - Parent ! Msg, + Coordinator ! {done, Filename, CompilationData}, ok. --spec receive_not_fixpoint() -> [mfa_or_funlbl()]. - -receive_not_fixpoint() -> - receive {not_fixpoint, NotFixpoint} -> NotFixpoint end. - --spec receive_compilation_data() -> - {dialyzer_analysis_callgraph:result(), integer()}. - -receive_compilation_data() -> - receive {compilation_data, CompilationData, NextLabel} -> - {CompilationData, NextLabel} - end. - --spec receive_warnings() -> [dial_warning()]. - -receive_warnings() -> - receive {warnings, Warnings} -> Warnings end. - --spec compiler_spawn(file:filename(), coordinator()) -> ok. - -compiler_spawn(Filename, Coordinator) -> - cast({compiler_spawn, Filename}, Coordinator). - -spec get_next_label(integer(), coordinator()) -> integer(). get_next_label(EstimatedSize, Coordinator) -> - call({get_next_label, EstimatedSize}, Coordinator). - --spec request_activation(coordinator()) -> ok. - -request_activation(Coordinator) -> - cast({request_activation, self()}, Coordinator). + Coordinator ! {next_label_request, EstimatedSize, self()}, + receive + {next_label_reply, NextLabel} -> NextLabel + end. -spec wait_activation() -> ok. @@ -201,179 +248,3 @@ wait_activation() -> activate_pid(Pid) -> Pid ! activate. - -%%-------------------------------------------------------------------- - --spec init({pid(), mode(), servers()}) -> {ok, #state{}}. - -init({Parent, Mode, InitJobData}) -> - BaseTickets = erlang:system_info(logical_processors_available), - Tickets = - case Mode of - 'compile' -> 4*BaseTickets; - 'warnings' -> 4*BaseTickets; - _ -> non_regulated - end, - InitState = - #state{parent = Parent, mode = Mode, init_job_data = InitJobData, - tickets = Tickets, queue = queue:new()}, - State = - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), - InitState#state{result = []}; - 'warnings' -> - InitState#state{result = []}; - 'compile' -> - InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(), - InitState#state{result = InitResult, next_label = 0} - end, - {ok, State}. - --spec handle_call(Query::term(), From::term(), #state{}) -> - {reply, Reply::term(), #state{}}. - -handle_call({get_next_label, EstimatedSize}, _From, - #state{next_label = NextLabel} = State) -> - {reply, NextLabel, State#state{next_label = NextLabel + EstimatedSize}}. - --spec handle_cast(Msg::term(), #state{}) -> - {noreply, #state{}} | {stop, normal, #state{}}. - -handle_cast({done, _Job, NewData}, - #state{mode = Mode, result = OldResult, - init_job_data = Servers} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), - UpdatedState = State#state{result = FinalData ++ OldResult}, - reduce_or_stop(UpdatedState); -handle_cast({done, Job, NewData}, - #state{mode = Mode, - result = OldResult, - tickets = Tickets, - queue = Queue} = State) when - Mode =:= 'compile'; Mode =:= 'warnings' -> - {Waiting, NewQueue} = queue:out(Queue), - NewTickets = - case Waiting of - empty -> Tickets+1; - {value, Pid} -> - activate_pid(Pid), - Tickets - end, - NewResult = - case Mode of - 'compile' -> - dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); - 'warnings' -> - NewData ++ OldResult - end, - UpdatedState = - State#state{result = NewResult, tickets = NewTickets, queue = NewQueue}, - reduce_or_stop(UpdatedState); -handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> - case SpawnCount of - 0 -> - send_done_to_parent(State), - {stop, normal, State}; - _ -> - NewState = State#state{all_spawned = true}, - {noreply, NewState} - end; -handle_cast({request_activation, Pid}, - #state{tickets = Tickets, queue = Queue} = State) -> - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{tickets = NewTickets, queue = NewQueue}}; -handle_cast({scc_spawn, SCC}, - #state{mode = Mode, - init_job_data = Servers, - spawn_count = SpawnCount} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), - true = ets:insert(?MAP, {SCC, Pid}), - {noreply, State#state{spawn_count = SpawnCount + 1}}; -handle_cast({scc_spawn, SCC}, - #state{mode = 'warnings', - init_job_data = Servers, - spawn_count = SpawnCount, - tickets = Tickets, - queue = Queue} = State) -> - Pid = dialyzer_worker:launch('warnings', SCC, Servers, self()), - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{spawn_count = SpawnCount + 1, - tickets = NewTickets, - queue = NewQueue}}; -handle_cast({compiler_spawn, Filename}, - #state{mode = Mode, - init_job_data = Servers, - spawn_count = SpawnCount, - tickets = Tickets, - queue = Queue - } = State) -> - Pid = dialyzer_worker:launch(Mode, Filename, Servers, self()), - NewSpawnCount = SpawnCount + 1, - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{spawn_count = NewSpawnCount, - tickets = NewTickets, - queue = NewQueue}}. - --spec handle_info(term(), #state{}) -> {noreply, #state{}}. - -handle_info(_Info, State) -> - {noreply, State}. - --spec terminate(term(), #state{}) -> ok. - -terminate(_Reason, _State) -> - ok. - --spec code_change(term(), #state{}, term()) -> {ok, #state{}}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- - -cast(Message, Coordinator) -> - gen_server:cast(Coordinator, Message). - -call(Message, Coordinator) -> - gen_server:call(Coordinator, Message, infinity). - -reduce_or_stop(#state{all_spawned = AllSpawned, - spawn_count = SpawnCount} = State) -> - Action = - case AllSpawned of - false -> reduce; - true -> - case SpawnCount of - 1 -> finish; - _ -> reduce - end - end, - case Action of - reduce -> - NewState = State#state{spawn_count = SpawnCount - 1}, - {noreply, NewState}; - finish -> - send_done_to_parent(State), - {stop, normal, State} - end. |