aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStavros Aronis <[email protected]>2012-03-20 12:07:42 +0100
committerHenrik Nord <[email protected]>2012-05-21 15:31:22 +0200
commit720b65deff021ddb17aaa125046f97ff13ade883 (patch)
tree612318f9445bcbdf997de037b4c2d85553ff77e0
parent4e1ed3a5666c13d442759e710d9d08280362c0bb (diff)
downloadotp-720b65deff021ddb17aaa125046f97ff13ade883.tar.gz
otp-720b65deff021ddb17aaa125046f97ff13ade883.tar.bz2
otp-720b65deff021ddb17aaa125046f97ff13ade883.zip
Regulate all kinds of running workers up to the number of schedulers
-rw-r--r--lib/dialyzer/src/dialyzer_callgraph.erl3
-rw-r--r--lib/dialyzer/src/dialyzer_coordinator.erl208
-rw-r--r--lib/dialyzer/src/dialyzer_typesig.erl2
-rw-r--r--lib/dialyzer/src/dialyzer_utils.erl12
-rw-r--r--lib/dialyzer/src/dialyzer_worker.erl37
5 files changed, 145 insertions, 117 deletions
diff --git a/lib/dialyzer/src/dialyzer_callgraph.erl b/lib/dialyzer/src/dialyzer_callgraph.erl
index 4ff9f5920b..64e0ee88af 100644
--- a/lib/dialyzer/src/dialyzer_callgraph.erl
+++ b/lib/dialyzer/src/dialyzer_callgraph.erl
@@ -265,8 +265,7 @@ module_postorder(#callgraph{digraph = DG}) ->
digraph_confirm_vertices(sets:to_list(Nodes), MDG),
Foreach = fun({M1,M2}) -> digraph:add_edge(MDG, M1, M2) end,
lists:foreach(Foreach, sets:to_list(Edges)),
- PostOrder = digraph_utils:topsort(MDG),
- {PostOrder, {'d', MDG}}.
+ {digraph_utils:topsort(MDG), {'d', MDG}}.
edge_fold({{M1,_,_},{M2,_,_}}, Set) ->
case M1 =/= M2 of
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl
index 8ebbb11137..e81da5c456 100644
--- a/lib/dialyzer/src/dialyzer_coordinator.erl
+++ b/lib/dialyzer/src/dialyzer_coordinator.erl
@@ -32,18 +32,18 @@
-export([wait_activation/0, job_done/3]).
%%% Exports for the typesig and dataflow analysis workers
--export([sccs_to_pids/1]).
+-export([sccs_to_pids/1, request_activation/1]).
%%% Exports for the compilation workers
-export([get_next_label/2]).
--export_type([coordinator/0, mode/0, init_data/0]).
+-export_type([coordinator/0, mode/0, init_data/0, result/0]).
%%--------------------------------------------------------------------
-define(MAP, dialyzer_coordinator_map).
--type coordinator() :: pid(). %%opaque
+-type coordinator() :: {pid(), pid()}. %%opaque
-type scc() :: [mfa_or_funlbl()].
-type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'.
@@ -73,12 +73,12 @@
-type job_result() :: dialyzer_analysis_callgraph:one_file_result() |
typesig_result() | dataflow_result() | warnings_result().
--record(state, {active = 0 :: integer(),
- result :: result(),
- next_label = 0 :: integer(),
- tickets = 0 :: integer(),
- queue = queue:new() :: queue(),
- init_data :: init_data()
+-record(state, {mode :: mode(),
+ active = 0 :: integer(),
+ result :: result(),
+ next_label = 0 :: integer(),
+ init_data :: init_data(),
+ regulator :: pid()
}).
-include("dialyzer.hrl").
@@ -96,127 +96,79 @@
parallel_job(Mode, Jobs, InitData) ->
State = spawn_jobs(Mode, Jobs, InitData),
- collect_result(Mode, State).
-
-spawn_jobs(Mode, Jobs, InitData) when
- Mode =:= 'typesig'; Mode =:= 'dataflow' ->
- Coordinator = self(),
- ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]),
+ collect_result(State).
+
+spawn_jobs(Mode, Jobs, InitData) ->
+ Collector = self(),
+ Regulator = spawn_regulator(),
+ Coordinator = {Collector, Regulator},
+ TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'),
+ case TypesigOrDataflow of
+ true ->
+ ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]);
+ false -> ok
+ end,
Fold =
fun(Job, Count) ->
Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
- true = ets:insert(?MAP, {Job, Pid}),
+ case TypesigOrDataflow of
+ true -> true = ets:insert(?MAP, {Job, Pid});
+ false -> request_activation(Regulator, Pid)
+ end,
Count + 1
end,
JobCount = lists:foldl(Fold, 0, Jobs),
Unit =
case Mode of
'typesig' -> "SCCs";
- 'dataflow' -> "modules"
+ _ -> "modules"
end,
dialyzer_timing:send_size_info(JobCount, Unit),
- #state{active = JobCount, result = [], init_data = InitData};
-spawn_jobs(Mode, Jobs, InitData) when
- Mode =:= 'compile'; Mode =:= 'warnings' ->
- Coordinator = self(),
- Fold =
- fun(Job, {InTickets, InQueue, Count}) ->
- Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
- NewCount = Count + 1,
- case InTickets of
- 0 -> {InTickets, queue:in(Pid, InQueue), NewCount};
- N -> activate_pid(Pid), {N-1, InQueue, NewCount}
- end
- end,
- CPUs = erlang:system_info(logical_processors_available),
- InitTickets = 4*CPUs,
- {Tickets, Queue, JobCount} =
- lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs),
- dialyzer_timing:send_size_info(JobCount, "modules"),
InitResult =
case Mode of
- 'warnings' -> [];
- 'compile' -> dialyzer_analysis_callgraph:compile_init_result()
+ 'compile' -> dialyzer_analysis_callgraph:compile_init_result();
+ _ -> []
end,
- #state{active = JobCount, result = InitResult, next_label = 0,
- tickets = Tickets, queue = Queue, init_data = InitData}.
-
-collect_result(Mode, State) ->
- case Mode of
- 'compile' -> compile_loop(State);
- 'typesig' -> not_fixpoint_loop(State);
- 'dataflow' -> not_fixpoint_loop(State);
- 'warnings' -> warnings_loop(State)
- end.
+ #state{mode = Mode, active = JobCount, result = InitResult, next_label = 0,
+ init_data = InitData, regulator = Regulator}.
-compile_loop(#state{active = Active, result = Result,
- next_label = NextLabel, tickets = Tickets,
- queue = Queue, init_data = InitData} = State) ->
+collect_result(#state{mode = Mode, active = Active, result = Result,
+ next_label = NextLabel, init_data = InitData,
+ regulator = Regulator} = State) ->
receive
{next_label_request, Estimation, Pid} ->
Pid ! {next_label_reply, NextLabel},
- compile_loop(State#state{next_label = NextLabel + Estimation});
+ collect_result(State#state{next_label = NextLabel + Estimation});
{done, Job, Data} ->
- NewResult =
- dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData),
+ NewResult = update_result(Mode, InitData, Job, Data, Result),
case Active of
1 ->
- {NewResult, NextLabel};
- _ ->
- NewActive = Active - 1,
- {NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
- NewState =
- State#state{result = NewResult, active = NewActive,
- queue = NewQueue, tickets = NewTickets},
- compile_loop(NewState)
+ kill_regulator(Regulator),
+ case Mode of
+ 'compile' ->
+ {NewResult, NextLabel};
+ X when X =:= 'typesig'; X =:= 'dataflow' ->
+ ets:delete(?MAP),
+ NewResult;
+ 'warnings' ->
+ NewResult
+ end;
+ N ->
+ collect_result(State#state{result = NewResult, active = N - 1})
end
end.
-not_fixpoint_loop(#state{active = Active, result = Result,
- init_data = InitData} = State) ->
- receive
- {done, _Job, Data} ->
- FinalData = dialyzer_succ_typings:lookup_names(Data, InitData),
- NewResult = FinalData ++ Result,
- case Active of
- 1 ->
- ets:delete(?MAP),
- NewResult;
- _ ->
- NewActive = Active - 1,
- NewState = State#state{active = NewActive, result = NewResult},
- not_fixpoint_loop(NewState)
- end
- end.
-
-warnings_loop(#state{active = Active, result = Result, tickets = Tickets,
- queue = Queue} = State) ->
- receive
- {done, _Job, Data} ->
- NewResult = Data ++ Result,
- case Active of
- 1 -> NewResult;
- _ ->
- NewActive = Active - 1,
- {NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
- NewState =
- State#state{result = NewResult, active = NewActive,
- queue = NewQueue, tickets = NewTickets},
- warnings_loop(NewState)
- end
+update_result(Mode, InitData, Job, Data, Result) ->
+ case Mode of
+ 'compile' ->
+ dialyzer_analysis_callgraph:add_to_result(Job, Data, Result,
+ InitData);
+ X when X =:= 'typesig'; X =:= 'dataflow' ->
+ dialyzer_succ_typings:lookup_names(Data, InitData) ++ Result;
+ 'warnings' ->
+ Data ++ Result
end.
-manage_waiting(Queue, Tickets) ->
- {Waiting, NewQueue} = queue:out(Queue),
- NewTickets =
- case Waiting of
- empty -> Tickets + 1;
- {value, Pid} ->
- activate_pid(Pid),
- Tickets
- end,
- {NewQueue, NewTickets}.
-
-spec sccs_to_pids([scc() | module()]) ->
{[dialyzer_worker:worker()], [scc() | module()]}.
@@ -232,14 +184,15 @@ pid_partition(SCC, {Pids, Unknown}) ->
-spec job_done(job(), job_result(), coordinator()) -> ok.
-job_done(Job, Result, Coordinator) ->
- Coordinator ! {done, Job, Result},
+job_done(Job, Result, {Collector, Regulator}) ->
+ Regulator ! done,
+ Collector ! {done, Job, Result},
ok.
-spec get_next_label(integer(), coordinator()) -> integer().
-get_next_label(EstimatedSize, Coordinator) ->
- Coordinator ! {next_label_request, EstimatedSize, self()},
+get_next_label(EstimatedSize, {Collector, _Regulator}) ->
+ Collector ! {next_label_request, EstimatedSize, self()},
receive
{next_label_reply, NextLabel} -> NextLabel
end.
@@ -251,3 +204,42 @@ wait_activation() ->
activate_pid(Pid) ->
Pid ! activate.
+
+-spec request_activation(coordinator()) -> ok.
+
+request_activation({_Collector, Regulator}) ->
+ Regulator ! {req, self()},
+ wait_activation().
+
+request_activation(Regulator, Pid) ->
+ Regulator ! {req, Pid}.
+
+spawn_regulator() ->
+ InitTickets = dialyzer_utils:parallelism(),
+ spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end).
+
+regulator_loop(Tickets, Queue) ->
+ receive
+ {req, Pid} ->
+ case Tickets of
+ 0 ->
+ regulator_loop(0, queue:in(Pid, Queue));
+ N ->
+ activate_pid(Pid),
+ regulator_loop(N-1, Queue)
+ end;
+ done ->
+ {Waiting, NewQueue} = queue:out(Queue),
+ NewTickets =
+ case Waiting of
+ empty -> Tickets + 1;
+ {value, Pid} ->
+ activate_pid(Pid),
+ Tickets
+ end,
+ regulator_loop(NewTickets, NewQueue);
+ stop -> ok
+ end.
+
+kill_regulator(Regulator) ->
+ Regulator ! stop.
diff --git a/lib/dialyzer/src/dialyzer_typesig.erl b/lib/dialyzer/src/dialyzer_typesig.erl
index 3f73afb971..6ee1795fc5 100644
--- a/lib/dialyzer/src/dialyzer_typesig.erl
+++ b/lib/dialyzer/src/dialyzer_typesig.erl
@@ -1743,7 +1743,7 @@ parallel_split(SCC) ->
case Length > 2*?worth_it of
false -> false;
true ->
- case min(erlang:system_info(logical_processors_available), 8) of
+ case min(dialyzer_utils:parallelism(), 8) of
1 -> false;
CPUs ->
FullShare = Length div CPUs + 1,
diff --git a/lib/dialyzer/src/dialyzer_utils.erl b/lib/dialyzer/src/dialyzer_utils.erl
index 2a248fb028..149e777e1f 100644
--- a/lib/dialyzer/src/dialyzer_utils.erl
+++ b/lib/dialyzer/src/dialyzer_utils.erl
@@ -43,7 +43,8 @@
pp_hook/0,
process_record_remote_types/1,
sets_filter/2,
- src_compiler_opts/0
+ src_compiler_opts/0,
+ parallelism/0
]).
-include("dialyzer.hrl").
@@ -536,3 +537,12 @@ pp_unit(Unit, Ctxt, Cont) ->
pp_atom(Atom) ->
String = atom_to_list(cerl:atom_val(Atom)),
prettypr:text(String).
+
+%%------------------------------------------------------------------------------
+
+-spec parallelism() -> integer().
+
+parallelism() ->
+ CPUs = erlang:system_info(logical_processors_available),
+ Schedulers = erlang:system_info(schedulers),
+ min(CPUs, Schedulers).
diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl
index 453bb76dfc..cccf1d144b 100644
--- a/lib/dialyzer/src/dialyzer_worker.erl
+++ b/lib/dialyzer/src/dialyzer_worker.erl
@@ -20,15 +20,16 @@
-module(dialyzer_worker).
--export([launch/4]).
+-export([launch/4, sequential/4]).
-export_type([worker/0]).
-type worker() :: pid(). %%opaque
--type mode() :: dialyzer_coordinator:mode().
+-type mode() :: dialyzer_coordinator:mode().
-type coordinator() :: dialyzer_coordinator:coordinator().
--type init_data() :: dialyzer_coordinator:init_data().
+-type init_data() :: dialyzer_coordinator:init_data().
+-type result() :: dialyzer_coordinator:result().
-record(state, {
mode :: mode(),
@@ -55,7 +56,7 @@
launch(Mode, Job, InitData, Coordinator) ->
State = #state{mode = Mode,
job = Job,
- init_data = InitData,
+ init_data = InitData,
coordinator = Coordinator},
InitState =
case Mode of
@@ -89,7 +90,7 @@ loop(running, #state{mode = 'compile'} = State) ->
case start_compilation(State) of
{ok, EstimatedSize, Data} ->
Label = ask_coordinator_for_label(EstimatedSize, State),
- dialyzer_analysis_callgraph:continue_compilation(Label, Data);
+ continue_compilation(Label, Data);
{error, _Reason} = Error ->
Error
end,
@@ -101,6 +102,7 @@ loop(running, #state{mode = 'warnings'} = State) ->
report_to_coordinator(Result, State);
loop(running, #state{mode = Mode} = State) when
Mode =:= 'typesig'; Mode =:= 'dataflow' ->
+ request_activation(State),
?debug("Run: ~p\n",[State#state.job]),
NotFixpoint = do_work(State),
ok = broadcast_done(State),
@@ -140,6 +142,9 @@ wait_for_success_typings(#state{depends_on = DependsOn} = State) ->
State
end.
+request_activation(#state{coordinator = Coordinator}) ->
+ dialyzer_coordinator:request_activation(Coordinator).
+
do_work(#state{mode = Mode, job = Job, init_data = InitData}) ->
case Mode of
typesig -> dialyzer_succ_typings:find_succ_types_for_scc(Job, InitData);
@@ -156,5 +161,27 @@ start_compilation(#state{job = Job, init_data = InitData}) ->
ask_coordinator_for_label(EstimatedSize, #state{coordinator = Coordinator}) ->
dialyzer_coordinator:get_next_label(EstimatedSize, Coordinator).
+continue_compilation(Label, Data) ->
+ dialyzer_analysis_callgraph:continue_compilation(Label, Data).
+
collect_warnings(#state{job = Job, init_data = InitData}) ->
dialyzer_succ_typings:collect_warnings(Job, InitData).
+
+%%------------------------------------------------------------------------------
+
+-type extra() :: label() | 'unused'.
+
+-spec sequential(mode(), [mfa_or_funlbl()], init_data(), extra()) -> result().
+
+sequential('compile', Job, InitData, Extra) ->
+ case dialyzer_analysis_callgraph:start_compilation(Job, InitData) of
+ {ok, EstimatedSize, Data} ->
+ {EstimatedSize, continue_compilation(Extra, Data)};
+ {error, _Reason} = Error -> {0, Error}
+ end;
+sequential('typesig', Job, InitData, _Extra) ->
+ dialyzer_succ_typings:find_succ_types_for_scc(Job, InitData);
+sequential('dataflow', Job, InitData, _Extra) ->
+ dialyzer_succ_typings:refine_one_module(Job, InitData);
+sequential('warnings', Job, InitData, _Extra) ->
+ dialyzer_succ_typings:collect_warnings(Job, InitData).