From 720b65deff021ddb17aaa125046f97ff13ade883 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Tue, 20 Mar 2012 12:07:42 +0100 Subject: Regulate all kinds of running workers up to the number of schedulers --- lib/dialyzer/src/dialyzer_callgraph.erl | 3 +- lib/dialyzer/src/dialyzer_coordinator.erl | 208 ++++++++++++++---------------- lib/dialyzer/src/dialyzer_typesig.erl | 2 +- lib/dialyzer/src/dialyzer_utils.erl | 12 +- lib/dialyzer/src/dialyzer_worker.erl | 37 +++++- 5 files changed, 145 insertions(+), 117 deletions(-) (limited to 'lib') diff --git a/lib/dialyzer/src/dialyzer_callgraph.erl b/lib/dialyzer/src/dialyzer_callgraph.erl index 4ff9f5920b..64e0ee88af 100644 --- a/lib/dialyzer/src/dialyzer_callgraph.erl +++ b/lib/dialyzer/src/dialyzer_callgraph.erl @@ -265,8 +265,7 @@ module_postorder(#callgraph{digraph = DG}) -> digraph_confirm_vertices(sets:to_list(Nodes), MDG), Foreach = fun({M1,M2}) -> digraph:add_edge(MDG, M1, M2) end, lists:foreach(Foreach, sets:to_list(Edges)), - PostOrder = digraph_utils:topsort(MDG), - {PostOrder, {'d', MDG}}. + {digraph_utils:topsort(MDG), {'d', MDG}}. edge_fold({{M1,_,_},{M2,_,_}}, Set) -> case M1 =/= M2 of diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 8ebbb11137..e81da5c456 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -32,18 +32,18 @@ -export([wait_activation/0, job_done/3]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids/1]). +-export([sccs_to_pids/1, request_activation/1]). %%% Exports for the compilation workers -export([get_next_label/2]). --export_type([coordinator/0, mode/0, init_data/0]). +-export_type([coordinator/0, mode/0, init_data/0, result/0]). %%-------------------------------------------------------------------- -define(MAP, dialyzer_coordinator_map). --type coordinator() :: pid(). %%opaque +-type coordinator() :: {pid(), pid()}. %%opaque -type scc() :: [mfa_or_funlbl()]. -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. @@ -73,12 +73,12 @@ -type job_result() :: dialyzer_analysis_callgraph:one_file_result() | typesig_result() | dataflow_result() | warnings_result(). --record(state, {active = 0 :: integer(), - result :: result(), - next_label = 0 :: integer(), - tickets = 0 :: integer(), - queue = queue:new() :: queue(), - init_data :: init_data() +-record(state, {mode :: mode(), + active = 0 :: integer(), + result :: result(), + next_label = 0 :: integer(), + init_data :: init_data(), + regulator :: pid() }). -include("dialyzer.hrl"). @@ -96,127 +96,79 @@ parallel_job(Mode, Jobs, InitData) -> State = spawn_jobs(Mode, Jobs, InitData), - collect_result(Mode, State). - -spawn_jobs(Mode, Jobs, InitData) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - Coordinator = self(), - ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), + collect_result(State). + +spawn_jobs(Mode, Jobs, InitData) -> + Collector = self(), + Regulator = spawn_regulator(), + Coordinator = {Collector, Regulator}, + TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'), + case TypesigOrDataflow of + true -> + ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]); + false -> ok + end, Fold = fun(Job, Count) -> Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), - true = ets:insert(?MAP, {Job, Pid}), + case TypesigOrDataflow of + true -> true = ets:insert(?MAP, {Job, Pid}); + false -> request_activation(Regulator, Pid) + end, Count + 1 end, JobCount = lists:foldl(Fold, 0, Jobs), Unit = case Mode of 'typesig' -> "SCCs"; - 'dataflow' -> "modules" + _ -> "modules" end, dialyzer_timing:send_size_info(JobCount, Unit), - #state{active = JobCount, result = [], init_data = InitData}; -spawn_jobs(Mode, Jobs, InitData) when - Mode =:= 'compile'; Mode =:= 'warnings' -> - Coordinator = self(), - Fold = - fun(Job, {InTickets, InQueue, Count}) -> - Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), - NewCount = Count + 1, - case InTickets of - 0 -> {InTickets, queue:in(Pid, InQueue), NewCount}; - N -> activate_pid(Pid), {N-1, InQueue, NewCount} - end - end, - CPUs = erlang:system_info(logical_processors_available), - InitTickets = 4*CPUs, - {Tickets, Queue, JobCount} = - lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs), - dialyzer_timing:send_size_info(JobCount, "modules"), InitResult = case Mode of - 'warnings' -> []; - 'compile' -> dialyzer_analysis_callgraph:compile_init_result() + 'compile' -> dialyzer_analysis_callgraph:compile_init_result(); + _ -> [] end, - #state{active = JobCount, result = InitResult, next_label = 0, - tickets = Tickets, queue = Queue, init_data = InitData}. - -collect_result(Mode, State) -> - case Mode of - 'compile' -> compile_loop(State); - 'typesig' -> not_fixpoint_loop(State); - 'dataflow' -> not_fixpoint_loop(State); - 'warnings' -> warnings_loop(State) - end. + #state{mode = Mode, active = JobCount, result = InitResult, next_label = 0, + init_data = InitData, regulator = Regulator}. -compile_loop(#state{active = Active, result = Result, - next_label = NextLabel, tickets = Tickets, - queue = Queue, init_data = InitData} = State) -> +collect_result(#state{mode = Mode, active = Active, result = Result, + next_label = NextLabel, init_data = InitData, + regulator = Regulator} = State) -> receive {next_label_request, Estimation, Pid} -> Pid ! {next_label_reply, NextLabel}, - compile_loop(State#state{next_label = NextLabel + Estimation}); + collect_result(State#state{next_label = NextLabel + Estimation}); {done, Job, Data} -> - NewResult = - dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData), + NewResult = update_result(Mode, InitData, Job, Data, Result), case Active of 1 -> - {NewResult, NextLabel}; - _ -> - NewActive = Active - 1, - {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), - NewState = - State#state{result = NewResult, active = NewActive, - queue = NewQueue, tickets = NewTickets}, - compile_loop(NewState) + kill_regulator(Regulator), + case Mode of + 'compile' -> + {NewResult, NextLabel}; + X when X =:= 'typesig'; X =:= 'dataflow' -> + ets:delete(?MAP), + NewResult; + 'warnings' -> + NewResult + end; + N -> + collect_result(State#state{result = NewResult, active = N - 1}) end end. -not_fixpoint_loop(#state{active = Active, result = Result, - init_data = InitData} = State) -> - receive - {done, _Job, Data} -> - FinalData = dialyzer_succ_typings:lookup_names(Data, InitData), - NewResult = FinalData ++ Result, - case Active of - 1 -> - ets:delete(?MAP), - NewResult; - _ -> - NewActive = Active - 1, - NewState = State#state{active = NewActive, result = NewResult}, - not_fixpoint_loop(NewState) - end - end. - -warnings_loop(#state{active = Active, result = Result, tickets = Tickets, - queue = Queue} = State) -> - receive - {done, _Job, Data} -> - NewResult = Data ++ Result, - case Active of - 1 -> NewResult; - _ -> - NewActive = Active - 1, - {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), - NewState = - State#state{result = NewResult, active = NewActive, - queue = NewQueue, tickets = NewTickets}, - warnings_loop(NewState) - end +update_result(Mode, InitData, Job, Data, Result) -> + case Mode of + 'compile' -> + dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, + InitData); + X when X =:= 'typesig'; X =:= 'dataflow' -> + dialyzer_succ_typings:lookup_names(Data, InitData) ++ Result; + 'warnings' -> + Data ++ Result end. -manage_waiting(Queue, Tickets) -> - {Waiting, NewQueue} = queue:out(Queue), - NewTickets = - case Waiting of - empty -> Tickets + 1; - {value, Pid} -> - activate_pid(Pid), - Tickets - end, - {NewQueue, NewTickets}. - -spec sccs_to_pids([scc() | module()]) -> {[dialyzer_worker:worker()], [scc() | module()]}. @@ -232,14 +184,15 @@ pid_partition(SCC, {Pids, Unknown}) -> -spec job_done(job(), job_result(), coordinator()) -> ok. -job_done(Job, Result, Coordinator) -> - Coordinator ! {done, Job, Result}, +job_done(Job, Result, {Collector, Regulator}) -> + Regulator ! done, + Collector ! {done, Job, Result}, ok. -spec get_next_label(integer(), coordinator()) -> integer(). -get_next_label(EstimatedSize, Coordinator) -> - Coordinator ! {next_label_request, EstimatedSize, self()}, +get_next_label(EstimatedSize, {Collector, _Regulator}) -> + Collector ! {next_label_request, EstimatedSize, self()}, receive {next_label_reply, NextLabel} -> NextLabel end. @@ -251,3 +204,42 @@ wait_activation() -> activate_pid(Pid) -> Pid ! activate. + +-spec request_activation(coordinator()) -> ok. + +request_activation({_Collector, Regulator}) -> + Regulator ! {req, self()}, + wait_activation(). + +request_activation(Regulator, Pid) -> + Regulator ! {req, Pid}. + +spawn_regulator() -> + InitTickets = dialyzer_utils:parallelism(), + spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end). + +regulator_loop(Tickets, Queue) -> + receive + {req, Pid} -> + case Tickets of + 0 -> + regulator_loop(0, queue:in(Pid, Queue)); + N -> + activate_pid(Pid), + regulator_loop(N-1, Queue) + end; + done -> + {Waiting, NewQueue} = queue:out(Queue), + NewTickets = + case Waiting of + empty -> Tickets + 1; + {value, Pid} -> + activate_pid(Pid), + Tickets + end, + regulator_loop(NewTickets, NewQueue); + stop -> ok + end. + +kill_regulator(Regulator) -> + Regulator ! stop. diff --git a/lib/dialyzer/src/dialyzer_typesig.erl b/lib/dialyzer/src/dialyzer_typesig.erl index 3f73afb971..6ee1795fc5 100644 --- a/lib/dialyzer/src/dialyzer_typesig.erl +++ b/lib/dialyzer/src/dialyzer_typesig.erl @@ -1743,7 +1743,7 @@ parallel_split(SCC) -> case Length > 2*?worth_it of false -> false; true -> - case min(erlang:system_info(logical_processors_available), 8) of + case min(dialyzer_utils:parallelism(), 8) of 1 -> false; CPUs -> FullShare = Length div CPUs + 1, diff --git a/lib/dialyzer/src/dialyzer_utils.erl b/lib/dialyzer/src/dialyzer_utils.erl index 2a248fb028..149e777e1f 100644 --- a/lib/dialyzer/src/dialyzer_utils.erl +++ b/lib/dialyzer/src/dialyzer_utils.erl @@ -43,7 +43,8 @@ pp_hook/0, process_record_remote_types/1, sets_filter/2, - src_compiler_opts/0 + src_compiler_opts/0, + parallelism/0 ]). -include("dialyzer.hrl"). @@ -536,3 +537,12 @@ pp_unit(Unit, Ctxt, Cont) -> pp_atom(Atom) -> String = atom_to_list(cerl:atom_val(Atom)), prettypr:text(String). + +%%------------------------------------------------------------------------------ + +-spec parallelism() -> integer(). + +parallelism() -> + CPUs = erlang:system_info(logical_processors_available), + Schedulers = erlang:system_info(schedulers), + min(CPUs, Schedulers). diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl index 453bb76dfc..cccf1d144b 100644 --- a/lib/dialyzer/src/dialyzer_worker.erl +++ b/lib/dialyzer/src/dialyzer_worker.erl @@ -20,15 +20,16 @@ -module(dialyzer_worker). --export([launch/4]). +-export([launch/4, sequential/4]). -export_type([worker/0]). -type worker() :: pid(). %%opaque --type mode() :: dialyzer_coordinator:mode(). +-type mode() :: dialyzer_coordinator:mode(). -type coordinator() :: dialyzer_coordinator:coordinator(). --type init_data() :: dialyzer_coordinator:init_data(). +-type init_data() :: dialyzer_coordinator:init_data(). +-type result() :: dialyzer_coordinator:result(). -record(state, { mode :: mode(), @@ -55,7 +56,7 @@ launch(Mode, Job, InitData, Coordinator) -> State = #state{mode = Mode, job = Job, - init_data = InitData, + init_data = InitData, coordinator = Coordinator}, InitState = case Mode of @@ -89,7 +90,7 @@ loop(running, #state{mode = 'compile'} = State) -> case start_compilation(State) of {ok, EstimatedSize, Data} -> Label = ask_coordinator_for_label(EstimatedSize, State), - dialyzer_analysis_callgraph:continue_compilation(Label, Data); + continue_compilation(Label, Data); {error, _Reason} = Error -> Error end, @@ -101,6 +102,7 @@ loop(running, #state{mode = 'warnings'} = State) -> report_to_coordinator(Result, State); loop(running, #state{mode = Mode} = State) when Mode =:= 'typesig'; Mode =:= 'dataflow' -> + request_activation(State), ?debug("Run: ~p\n",[State#state.job]), NotFixpoint = do_work(State), ok = broadcast_done(State), @@ -140,6 +142,9 @@ wait_for_success_typings(#state{depends_on = DependsOn} = State) -> State end. +request_activation(#state{coordinator = Coordinator}) -> + dialyzer_coordinator:request_activation(Coordinator). + do_work(#state{mode = Mode, job = Job, init_data = InitData}) -> case Mode of typesig -> dialyzer_succ_typings:find_succ_types_for_scc(Job, InitData); @@ -156,5 +161,27 @@ start_compilation(#state{job = Job, init_data = InitData}) -> ask_coordinator_for_label(EstimatedSize, #state{coordinator = Coordinator}) -> dialyzer_coordinator:get_next_label(EstimatedSize, Coordinator). +continue_compilation(Label, Data) -> + dialyzer_analysis_callgraph:continue_compilation(Label, Data). + collect_warnings(#state{job = Job, init_data = InitData}) -> dialyzer_succ_typings:collect_warnings(Job, InitData). + +%%------------------------------------------------------------------------------ + +-type extra() :: label() | 'unused'. + +-spec sequential(mode(), [mfa_or_funlbl()], init_data(), extra()) -> result(). + +sequential('compile', Job, InitData, Extra) -> + case dialyzer_analysis_callgraph:start_compilation(Job, InitData) of + {ok, EstimatedSize, Data} -> + {EstimatedSize, continue_compilation(Extra, Data)}; + {error, _Reason} = Error -> {0, Error} + end; +sequential('typesig', Job, InitData, _Extra) -> + dialyzer_succ_typings:find_succ_types_for_scc(Job, InitData); +sequential('dataflow', Job, InitData, _Extra) -> + dialyzer_succ_typings:refine_one_module(Job, InitData); +sequential('warnings', Job, InitData, _Extra) -> + dialyzer_succ_typings:collect_warnings(Job, InitData). -- cgit v1.2.3