aboutsummaryrefslogtreecommitdiffstats
path: root/lib/dialyzer/src/dialyzer_coordinator.erl
diff options
context:
space:
mode:
authorStavros Aronis <[email protected]>2012-03-20 12:07:42 +0100
committerHenrik Nord <[email protected]>2012-05-21 15:31:22 +0200
commit720b65deff021ddb17aaa125046f97ff13ade883 (patch)
tree612318f9445bcbdf997de037b4c2d85553ff77e0 /lib/dialyzer/src/dialyzer_coordinator.erl
parent4e1ed3a5666c13d442759e710d9d08280362c0bb (diff)
downloadotp-720b65deff021ddb17aaa125046f97ff13ade883.tar.gz
otp-720b65deff021ddb17aaa125046f97ff13ade883.tar.bz2
otp-720b65deff021ddb17aaa125046f97ff13ade883.zip
Regulate all kinds of running workers up to the number of schedulers
Diffstat (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl')
-rw-r--r--lib/dialyzer/src/dialyzer_coordinator.erl208
1 files changed, 100 insertions, 108 deletions
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl
index 8ebbb11137..e81da5c456 100644
--- a/lib/dialyzer/src/dialyzer_coordinator.erl
+++ b/lib/dialyzer/src/dialyzer_coordinator.erl
@@ -32,18 +32,18 @@
-export([wait_activation/0, job_done/3]).
%%% Exports for the typesig and dataflow analysis workers
--export([sccs_to_pids/1]).
+-export([sccs_to_pids/1, request_activation/1]).
%%% Exports for the compilation workers
-export([get_next_label/2]).
--export_type([coordinator/0, mode/0, init_data/0]).
+-export_type([coordinator/0, mode/0, init_data/0, result/0]).
%%--------------------------------------------------------------------
-define(MAP, dialyzer_coordinator_map).
--type coordinator() :: pid(). %%opaque
+-type coordinator() :: {pid(), pid()}. %%opaque
-type scc() :: [mfa_or_funlbl()].
-type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'.
@@ -73,12 +73,12 @@
-type job_result() :: dialyzer_analysis_callgraph:one_file_result() |
typesig_result() | dataflow_result() | warnings_result().
--record(state, {active = 0 :: integer(),
- result :: result(),
- next_label = 0 :: integer(),
- tickets = 0 :: integer(),
- queue = queue:new() :: queue(),
- init_data :: init_data()
+-record(state, {mode :: mode(),
+ active = 0 :: integer(),
+ result :: result(),
+ next_label = 0 :: integer(),
+ init_data :: init_data(),
+ regulator :: pid()
}).
-include("dialyzer.hrl").
@@ -96,127 +96,79 @@
parallel_job(Mode, Jobs, InitData) ->
State = spawn_jobs(Mode, Jobs, InitData),
- collect_result(Mode, State).
-
-spawn_jobs(Mode, Jobs, InitData) when
- Mode =:= 'typesig'; Mode =:= 'dataflow' ->
- Coordinator = self(),
- ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]),
+ collect_result(State).
+
+spawn_jobs(Mode, Jobs, InitData) ->
+ Collector = self(),
+ Regulator = spawn_regulator(),
+ Coordinator = {Collector, Regulator},
+ TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'),
+ case TypesigOrDataflow of
+ true ->
+ ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]);
+ false -> ok
+ end,
Fold =
fun(Job, Count) ->
Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
- true = ets:insert(?MAP, {Job, Pid}),
+ case TypesigOrDataflow of
+ true -> true = ets:insert(?MAP, {Job, Pid});
+ false -> request_activation(Regulator, Pid)
+ end,
Count + 1
end,
JobCount = lists:foldl(Fold, 0, Jobs),
Unit =
case Mode of
'typesig' -> "SCCs";
- 'dataflow' -> "modules"
+ _ -> "modules"
end,
dialyzer_timing:send_size_info(JobCount, Unit),
- #state{active = JobCount, result = [], init_data = InitData};
-spawn_jobs(Mode, Jobs, InitData) when
- Mode =:= 'compile'; Mode =:= 'warnings' ->
- Coordinator = self(),
- Fold =
- fun(Job, {InTickets, InQueue, Count}) ->
- Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator),
- NewCount = Count + 1,
- case InTickets of
- 0 -> {InTickets, queue:in(Pid, InQueue), NewCount};
- N -> activate_pid(Pid), {N-1, InQueue, NewCount}
- end
- end,
- CPUs = erlang:system_info(logical_processors_available),
- InitTickets = 4*CPUs,
- {Tickets, Queue, JobCount} =
- lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs),
- dialyzer_timing:send_size_info(JobCount, "modules"),
InitResult =
case Mode of
- 'warnings' -> [];
- 'compile' -> dialyzer_analysis_callgraph:compile_init_result()
+ 'compile' -> dialyzer_analysis_callgraph:compile_init_result();
+ _ -> []
end,
- #state{active = JobCount, result = InitResult, next_label = 0,
- tickets = Tickets, queue = Queue, init_data = InitData}.
-
-collect_result(Mode, State) ->
- case Mode of
- 'compile' -> compile_loop(State);
- 'typesig' -> not_fixpoint_loop(State);
- 'dataflow' -> not_fixpoint_loop(State);
- 'warnings' -> warnings_loop(State)
- end.
+ #state{mode = Mode, active = JobCount, result = InitResult, next_label = 0,
+ init_data = InitData, regulator = Regulator}.
-compile_loop(#state{active = Active, result = Result,
- next_label = NextLabel, tickets = Tickets,
- queue = Queue, init_data = InitData} = State) ->
+collect_result(#state{mode = Mode, active = Active, result = Result,
+ next_label = NextLabel, init_data = InitData,
+ regulator = Regulator} = State) ->
receive
{next_label_request, Estimation, Pid} ->
Pid ! {next_label_reply, NextLabel},
- compile_loop(State#state{next_label = NextLabel + Estimation});
+ collect_result(State#state{next_label = NextLabel + Estimation});
{done, Job, Data} ->
- NewResult =
- dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData),
+ NewResult = update_result(Mode, InitData, Job, Data, Result),
case Active of
1 ->
- {NewResult, NextLabel};
- _ ->
- NewActive = Active - 1,
- {NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
- NewState =
- State#state{result = NewResult, active = NewActive,
- queue = NewQueue, tickets = NewTickets},
- compile_loop(NewState)
+ kill_regulator(Regulator),
+ case Mode of
+ 'compile' ->
+ {NewResult, NextLabel};
+ X when X =:= 'typesig'; X =:= 'dataflow' ->
+ ets:delete(?MAP),
+ NewResult;
+ 'warnings' ->
+ NewResult
+ end;
+ N ->
+ collect_result(State#state{result = NewResult, active = N - 1})
end
end.
-not_fixpoint_loop(#state{active = Active, result = Result,
- init_data = InitData} = State) ->
- receive
- {done, _Job, Data} ->
- FinalData = dialyzer_succ_typings:lookup_names(Data, InitData),
- NewResult = FinalData ++ Result,
- case Active of
- 1 ->
- ets:delete(?MAP),
- NewResult;
- _ ->
- NewActive = Active - 1,
- NewState = State#state{active = NewActive, result = NewResult},
- not_fixpoint_loop(NewState)
- end
- end.
-
-warnings_loop(#state{active = Active, result = Result, tickets = Tickets,
- queue = Queue} = State) ->
- receive
- {done, _Job, Data} ->
- NewResult = Data ++ Result,
- case Active of
- 1 -> NewResult;
- _ ->
- NewActive = Active - 1,
- {NewQueue, NewTickets} = manage_waiting(Queue, Tickets),
- NewState =
- State#state{result = NewResult, active = NewActive,
- queue = NewQueue, tickets = NewTickets},
- warnings_loop(NewState)
- end
+update_result(Mode, InitData, Job, Data, Result) ->
+ case Mode of
+ 'compile' ->
+ dialyzer_analysis_callgraph:add_to_result(Job, Data, Result,
+ InitData);
+ X when X =:= 'typesig'; X =:= 'dataflow' ->
+ dialyzer_succ_typings:lookup_names(Data, InitData) ++ Result;
+ 'warnings' ->
+ Data ++ Result
end.
-manage_waiting(Queue, Tickets) ->
- {Waiting, NewQueue} = queue:out(Queue),
- NewTickets =
- case Waiting of
- empty -> Tickets + 1;
- {value, Pid} ->
- activate_pid(Pid),
- Tickets
- end,
- {NewQueue, NewTickets}.
-
-spec sccs_to_pids([scc() | module()]) ->
{[dialyzer_worker:worker()], [scc() | module()]}.
@@ -232,14 +184,15 @@ pid_partition(SCC, {Pids, Unknown}) ->
-spec job_done(job(), job_result(), coordinator()) -> ok.
-job_done(Job, Result, Coordinator) ->
- Coordinator ! {done, Job, Result},
+job_done(Job, Result, {Collector, Regulator}) ->
+ Regulator ! done,
+ Collector ! {done, Job, Result},
ok.
-spec get_next_label(integer(), coordinator()) -> integer().
-get_next_label(EstimatedSize, Coordinator) ->
- Coordinator ! {next_label_request, EstimatedSize, self()},
+get_next_label(EstimatedSize, {Collector, _Regulator}) ->
+ Collector ! {next_label_request, EstimatedSize, self()},
receive
{next_label_reply, NextLabel} -> NextLabel
end.
@@ -251,3 +204,42 @@ wait_activation() ->
activate_pid(Pid) ->
Pid ! activate.
+
+-spec request_activation(coordinator()) -> ok.
+
+request_activation({_Collector, Regulator}) ->
+ Regulator ! {req, self()},
+ wait_activation().
+
+request_activation(Regulator, Pid) ->
+ Regulator ! {req, Pid}.
+
+spawn_regulator() ->
+ InitTickets = dialyzer_utils:parallelism(),
+ spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end).
+
+regulator_loop(Tickets, Queue) ->
+ receive
+ {req, Pid} ->
+ case Tickets of
+ 0 ->
+ regulator_loop(0, queue:in(Pid, Queue));
+ N ->
+ activate_pid(Pid),
+ regulator_loop(N-1, Queue)
+ end;
+ done ->
+ {Waiting, NewQueue} = queue:out(Queue),
+ NewTickets =
+ case Waiting of
+ empty -> Tickets + 1;
+ {value, Pid} ->
+ activate_pid(Pid),
+ Tickets
+ end,
+ regulator_loop(NewTickets, NewQueue);
+ stop -> ok
+ end.
+
+kill_regulator(Regulator) ->
+ Regulator ! stop.