diff options
author | Stavros Aronis <[email protected]> | 2012-03-20 12:07:42 +0100 |
---|---|---|
committer | Henrik Nord <[email protected]> | 2012-05-21 15:31:22 +0200 |
commit | 720b65deff021ddb17aaa125046f97ff13ade883 (patch) | |
tree | 612318f9445bcbdf997de037b4c2d85553ff77e0 /lib/dialyzer/src/dialyzer_coordinator.erl | |
parent | 4e1ed3a5666c13d442759e710d9d08280362c0bb (diff) | |
download | otp-720b65deff021ddb17aaa125046f97ff13ade883.tar.gz otp-720b65deff021ddb17aaa125046f97ff13ade883.tar.bz2 otp-720b65deff021ddb17aaa125046f97ff13ade883.zip |
Regulate all kinds of running workers up to the number of schedulers
Diffstat (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl')
-rw-r--r-- | lib/dialyzer/src/dialyzer_coordinator.erl | 208 |
1 files changed, 100 insertions, 108 deletions
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 8ebbb11137..e81da5c456 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -32,18 +32,18 @@ -export([wait_activation/0, job_done/3]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids/1]). +-export([sccs_to_pids/1, request_activation/1]). %%% Exports for the compilation workers -export([get_next_label/2]). --export_type([coordinator/0, mode/0, init_data/0]). +-export_type([coordinator/0, mode/0, init_data/0, result/0]). %%-------------------------------------------------------------------- -define(MAP, dialyzer_coordinator_map). --type coordinator() :: pid(). %%opaque +-type coordinator() :: {pid(), pid()}. %%opaque -type scc() :: [mfa_or_funlbl()]. -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. @@ -73,12 +73,12 @@ -type job_result() :: dialyzer_analysis_callgraph:one_file_result() | typesig_result() | dataflow_result() | warnings_result(). --record(state, {active = 0 :: integer(), - result :: result(), - next_label = 0 :: integer(), - tickets = 0 :: integer(), - queue = queue:new() :: queue(), - init_data :: init_data() +-record(state, {mode :: mode(), + active = 0 :: integer(), + result :: result(), + next_label = 0 :: integer(), + init_data :: init_data(), + regulator :: pid() }). -include("dialyzer.hrl"). @@ -96,127 +96,79 @@ parallel_job(Mode, Jobs, InitData) -> State = spawn_jobs(Mode, Jobs, InitData), - collect_result(Mode, State). - -spawn_jobs(Mode, Jobs, InitData) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - Coordinator = self(), - ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), + collect_result(State). + +spawn_jobs(Mode, Jobs, InitData) -> + Collector = self(), + Regulator = spawn_regulator(), + Coordinator = {Collector, Regulator}, + TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'), + case TypesigOrDataflow of + true -> + ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]); + false -> ok + end, Fold = fun(Job, Count) -> Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), - true = ets:insert(?MAP, {Job, Pid}), + case TypesigOrDataflow of + true -> true = ets:insert(?MAP, {Job, Pid}); + false -> request_activation(Regulator, Pid) + end, Count + 1 end, JobCount = lists:foldl(Fold, 0, Jobs), Unit = case Mode of 'typesig' -> "SCCs"; - 'dataflow' -> "modules" + _ -> "modules" end, dialyzer_timing:send_size_info(JobCount, Unit), - #state{active = JobCount, result = [], init_data = InitData}; -spawn_jobs(Mode, Jobs, InitData) when - Mode =:= 'compile'; Mode =:= 'warnings' -> - Coordinator = self(), - Fold = - fun(Job, {InTickets, InQueue, Count}) -> - Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), - NewCount = Count + 1, - case InTickets of - 0 -> {InTickets, queue:in(Pid, InQueue), NewCount}; - N -> activate_pid(Pid), {N-1, InQueue, NewCount} - end - end, - CPUs = erlang:system_info(logical_processors_available), - InitTickets = 4*CPUs, - {Tickets, Queue, JobCount} = - lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs), - dialyzer_timing:send_size_info(JobCount, "modules"), InitResult = case Mode of - 'warnings' -> []; - 'compile' -> dialyzer_analysis_callgraph:compile_init_result() + 'compile' -> dialyzer_analysis_callgraph:compile_init_result(); + _ -> [] end, - #state{active = JobCount, result = InitResult, next_label = 0, - tickets = Tickets, queue = Queue, init_data = InitData}. - -collect_result(Mode, State) -> - case Mode of - 'compile' -> compile_loop(State); - 'typesig' -> not_fixpoint_loop(State); - 'dataflow' -> not_fixpoint_loop(State); - 'warnings' -> warnings_loop(State) - end. + #state{mode = Mode, active = JobCount, result = InitResult, next_label = 0, + init_data = InitData, regulator = Regulator}. -compile_loop(#state{active = Active, result = Result, - next_label = NextLabel, tickets = Tickets, - queue = Queue, init_data = InitData} = State) -> +collect_result(#state{mode = Mode, active = Active, result = Result, + next_label = NextLabel, init_data = InitData, + regulator = Regulator} = State) -> receive {next_label_request, Estimation, Pid} -> Pid ! {next_label_reply, NextLabel}, - compile_loop(State#state{next_label = NextLabel + Estimation}); + collect_result(State#state{next_label = NextLabel + Estimation}); {done, Job, Data} -> - NewResult = - dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData), + NewResult = update_result(Mode, InitData, Job, Data, Result), case Active of 1 -> - {NewResult, NextLabel}; - _ -> - NewActive = Active - 1, - {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), - NewState = - State#state{result = NewResult, active = NewActive, - queue = NewQueue, tickets = NewTickets}, - compile_loop(NewState) + kill_regulator(Regulator), + case Mode of + 'compile' -> + {NewResult, NextLabel}; + X when X =:= 'typesig'; X =:= 'dataflow' -> + ets:delete(?MAP), + NewResult; + 'warnings' -> + NewResult + end; + N -> + collect_result(State#state{result = NewResult, active = N - 1}) end end. -not_fixpoint_loop(#state{active = Active, result = Result, - init_data = InitData} = State) -> - receive - {done, _Job, Data} -> - FinalData = dialyzer_succ_typings:lookup_names(Data, InitData), - NewResult = FinalData ++ Result, - case Active of - 1 -> - ets:delete(?MAP), - NewResult; - _ -> - NewActive = Active - 1, - NewState = State#state{active = NewActive, result = NewResult}, - not_fixpoint_loop(NewState) - end - end. - -warnings_loop(#state{active = Active, result = Result, tickets = Tickets, - queue = Queue} = State) -> - receive - {done, _Job, Data} -> - NewResult = Data ++ Result, - case Active of - 1 -> NewResult; - _ -> - NewActive = Active - 1, - {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), - NewState = - State#state{result = NewResult, active = NewActive, - queue = NewQueue, tickets = NewTickets}, - warnings_loop(NewState) - end +update_result(Mode, InitData, Job, Data, Result) -> + case Mode of + 'compile' -> + dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, + InitData); + X when X =:= 'typesig'; X =:= 'dataflow' -> + dialyzer_succ_typings:lookup_names(Data, InitData) ++ Result; + 'warnings' -> + Data ++ Result end. -manage_waiting(Queue, Tickets) -> - {Waiting, NewQueue} = queue:out(Queue), - NewTickets = - case Waiting of - empty -> Tickets + 1; - {value, Pid} -> - activate_pid(Pid), - Tickets - end, - {NewQueue, NewTickets}. - -spec sccs_to_pids([scc() | module()]) -> {[dialyzer_worker:worker()], [scc() | module()]}. @@ -232,14 +184,15 @@ pid_partition(SCC, {Pids, Unknown}) -> -spec job_done(job(), job_result(), coordinator()) -> ok. -job_done(Job, Result, Coordinator) -> - Coordinator ! {done, Job, Result}, +job_done(Job, Result, {Collector, Regulator}) -> + Regulator ! done, + Collector ! {done, Job, Result}, ok. -spec get_next_label(integer(), coordinator()) -> integer(). -get_next_label(EstimatedSize, Coordinator) -> - Coordinator ! {next_label_request, EstimatedSize, self()}, +get_next_label(EstimatedSize, {Collector, _Regulator}) -> + Collector ! {next_label_request, EstimatedSize, self()}, receive {next_label_reply, NextLabel} -> NextLabel end. @@ -251,3 +204,42 @@ wait_activation() -> activate_pid(Pid) -> Pid ! activate. + +-spec request_activation(coordinator()) -> ok. + +request_activation({_Collector, Regulator}) -> + Regulator ! {req, self()}, + wait_activation(). + +request_activation(Regulator, Pid) -> + Regulator ! {req, Pid}. + +spawn_regulator() -> + InitTickets = dialyzer_utils:parallelism(), + spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end). + +regulator_loop(Tickets, Queue) -> + receive + {req, Pid} -> + case Tickets of + 0 -> + regulator_loop(0, queue:in(Pid, Queue)); + N -> + activate_pid(Pid), + regulator_loop(N-1, Queue) + end; + done -> + {Waiting, NewQueue} = queue:out(Queue), + NewTickets = + case Waiting of + empty -> Tickets + 1; + {value, Pid} -> + activate_pid(Pid), + Tickets + end, + regulator_loop(NewTickets, NewQueue); + stop -> ok + end. + +kill_regulator(Regulator) -> + Regulator ! stop. |