From 15529b1392c3c3d452dcae3ce6786ff41fda9f13 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Fri, 17 Feb 2012 18:12:58 +0100 Subject: Generalize coordinator and worker --- lib/dialyzer/src/dialyzer_coordinator.erl | 215 ++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 lib/dialyzer/src/dialyzer_coordinator.erl (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl new file mode 100644 index 0000000000..a72d383365 --- /dev/null +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -0,0 +1,215 @@ +%% -*- erlang-indent-level: 2 -*- +%%----------------------------------------------------------------------- +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2006-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%%%------------------------------------------------------------------- +%%% File : dialyzer_coordinator.erl +%%% Authors : Stavros Aronis +%%% +%%% Description: +%%% +%%% The parallel version of Dialyzer's typesig analysis is spread over 4 modules +%%% with the intention to both minimize the changes on the original code and use +%%% a separate module for every kind of Erlang process that will be running. +%%% +%%% There are therefore 3 kinds of processes: +%%% +%%% - The original Dialyzer backend (in succ_typings module) +%%% - The worker process for the typesig analysis (in typesig and +%%% worker) +%%% - A coordinator of the worker processes (in coordinator) +%%% +%%% Operation guidelines: +%%% +%%% - The backend requests from the coordinator to spawn a worker for each SCC +%%% - The backend notifies the coordinator when all SCC have been spawned and +%%% waits for the server to report that the PLT has been updated +%%% - Each worker is responsible to notify all those who wait for it. +%%% +%%%------------------------------------------------------------------- + +-module(dialyzer_coordinator). + +-export([ + all_spawned/1, + scc_done/3, + scc_spawn/2, + sccs_to_pids_reply/0, + sccs_to_pids_request/2, + start/1, + receive_not_fixpoint/0 + ]). + +-behaviour(gen_server). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-type coordinator() :: pid(). +-type map() :: dict(). +-type scc() :: [mfa_or_funlbl()]. + +-record(state, {parent :: pid(), + spawn_count = 0 :: integer(), + all_spawned = false :: boolean(), + scc_to_pid = new_map() :: map(), + not_fixpoint = [] :: [mfa_or_funlbl()], + servers :: dialyzer_typesig:servers() + }). + +-include("dialyzer.hrl"). + +%%-------------------------------------------------------------------- + +-spec start(dialyzer_typesig:servers()) -> pid(). + +start(Servers) -> + {ok, Pid} = gen_server:start(?MODULE, {self(), Servers}, []), + Pid. + +-spec scc_spawn(scc(), coordinator()) -> ok. + +scc_spawn(SCC, Coordinator) -> + cast({scc_spawn, SCC}, Coordinator). + +-spec sccs_to_pids_request([scc()], coordinator()) -> ok. + +sccs_to_pids_request(SCCs, Coordinator) -> + cast({sccs_to_pids, SCCs, self()}, Coordinator). + +scc_to_pids_request_handle(Worker, SCCs, SCCtoPID) -> + Pids = [fetch_map(SCC, SCCtoPID) || SCC <- SCCs], + Worker ! {sccs_to_pids, Pids}, + ok. + +-spec sccs_to_pids_reply() -> [dialyzer_worker:worker()]. + +sccs_to_pids_reply() -> + receive {sccs_to_pids, Pids} -> Pids end. + +-spec scc_done(scc(), scc(), coordinator()) -> ok. + +scc_done(SCC, NotFixpoint, Coordinator) -> + cast({scc_done, SCC, NotFixpoint}, Coordinator). + +-spec all_spawned(coordinator()) -> ok. + +all_spawned(Coordinator) -> + cast(all_spawned, Coordinator). + +send_done_to_parent(#state{parent = Parent, not_fixpoint = NotFixpoint}) -> + Parent ! {not_fixpoint, NotFixpoint}. + +-spec receive_not_fixpoint() -> dialyzer_plt:plt(). + +receive_not_fixpoint() -> + receive {not_fixpoint, NotFixpoint} -> NotFixpoint end. + +%%-------------------------------------------------------------------- + +-spec init([]) -> {ok, #state{}}. + +init({Parent, Servers}) -> + {ok, #state{parent = Parent, servers = Servers}}. + +-spec handle_call(Query::term(), From::term(), #state{}) -> + {reply, Reply::term(), #state{}}. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +-spec handle_cast(Msg::term(), #state{}) -> + {noreply, #state{}} | {stop, normal, #state{}}. + +handle_cast({scc_done, _SCC, NotFixpoint}, + #state{spawn_count = SpawnCount, + all_spawned = AllSpawned, + not_fixpoint = OldNotFixpoint + } = State) -> + NewNotFixpoint = ordsets:union(OldNotFixpoint, NotFixpoint), + UpdatedState = State#state{not_fixpoint = NewNotFixpoint}, + Action = + case AllSpawned of + false -> reduce; + true -> + case SpawnCount of + 1 -> finish; + _ -> reduce + end + end, + case Action of + reduce -> + NewState = UpdatedState#state{spawn_count = SpawnCount - 1}, + {noreply, NewState}; + finish -> + send_done_to_parent(UpdatedState), + {stop, normal, State} + end; +handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> + case SpawnCount of + 0 -> + send_done_to_parent(State), + {stop, normal, State}; + _ -> + NewState = State#state{all_spawned = true}, + {noreply, NewState} + end; +handle_cast({sccs_to_pids, SCCs, Worker}, + #state{scc_to_pid = SCCtoPID} = State) -> + scc_to_pids_request_handle(Worker, SCCs, SCCtoPID), + {noreply, State}; +handle_cast({scc_spawn, SCC}, + #state{servers = Servers, + spawn_count = SpawnCount, + scc_to_pid = SCCtoPID + } = State) -> + Pid = dialyzer_worker:launch(SCC, Servers), + {noreply, + State#state{spawn_count = SpawnCount + 1, + scc_to_pid = store_map(SCC, Pid, SCCtoPID)} + }. + +-spec handle_info(term(), #state{}) -> {noreply, #state{}}. + +handle_info(_Info, State) -> + {noreply, State}. + +-spec terminate(term(), #state{}) -> ok. + +terminate(_Reason, _State) -> + ok. + +-spec code_change(term(), #state{}, term()) -> {ok, #state{}}. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- + +cast(Message, Coordinator) -> + gen_server:cast(Coordinator, Message). + +new_map() -> + dict:new(). + +store_map(Key, Value, Map) -> + dict:store(Key, Value, Map). + +fetch_map(Key, Map) -> + dict:fetch(Key, Map). -- cgit v1.2.3 From 12c5985b862c5e8e7e88033a21e909b51225d76f Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Fri, 17 Feb 2012 18:47:58 +0100 Subject: Parallel dataflow --- lib/dialyzer/src/dialyzer_coordinator.erl | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index a72d383365..fa78670883 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -52,7 +52,7 @@ scc_spawn/2, sccs_to_pids_reply/0, sccs_to_pids_request/2, - start/1, + start/2, receive_not_fixpoint/0 ]). @@ -64,8 +64,10 @@ -type coordinator() :: pid(). -type map() :: dict(). -type scc() :: [mfa_or_funlbl()]. +-type mode() :: typesig | dataflow. -record(state, {parent :: pid(), + mode :: mode(), spawn_count = 0 :: integer(), all_spawned = false :: boolean(), scc_to_pid = new_map() :: map(), @@ -77,10 +79,10 @@ %%-------------------------------------------------------------------- --spec start(dialyzer_typesig:servers()) -> pid(). +-spec start(mode(), dialyzer_typesig:servers()) -> pid(). -start(Servers) -> - {ok, Pid} = gen_server:start(?MODULE, {self(), Servers}, []), +start(Mode, Servers) -> + {ok, Pid} = gen_server:start(?MODULE, {self(), Mode, Servers}, []), Pid. -spec scc_spawn(scc(), coordinator()) -> ok. @@ -123,10 +125,10 @@ receive_not_fixpoint() -> %%-------------------------------------------------------------------- --spec init([]) -> {ok, #state{}}. +-spec init({pid(), mode(), dialyzer_succ_typings:servers()}) -> {ok, #state{}}. -init({Parent, Servers}) -> - {ok, #state{parent = Parent, servers = Servers}}. +init({Parent, Mode, Servers}) -> + {ok, #state{parent = Parent, mode = Mode, servers = Servers}}. -spec handle_call(Query::term(), From::term(), #state{}) -> {reply, Reply::term(), #state{}}. @@ -175,11 +177,12 @@ handle_cast({sccs_to_pids, SCCs, Worker}, scc_to_pids_request_handle(Worker, SCCs, SCCtoPID), {noreply, State}; handle_cast({scc_spawn, SCC}, - #state{servers = Servers, + #state{mode = Mode, + servers = Servers, spawn_count = SpawnCount, scc_to_pid = SCCtoPID } = State) -> - Pid = dialyzer_worker:launch(SCC, Servers), + Pid = dialyzer_worker:launch(Mode, SCC, Servers), {noreply, State#state{spawn_count = SpawnCount + 1, scc_to_pid = store_map(SCC, Pid, SCCtoPID)} -- cgit v1.2.3 From 08d6fa6c97be82c4b4a480ec04aa06ae8e781783 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Sun, 19 Feb 2012 01:40:54 +0100 Subject: Parallel compilation of files under analysis --- lib/dialyzer/src/dialyzer_coordinator.erl | 235 ++++++++++++++++++++---------- 1 file changed, 156 insertions(+), 79 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index fa78670883..d8a3ef2bd2 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -46,15 +46,26 @@ -module(dialyzer_coordinator). --export([ - all_spawned/1, - scc_done/3, - scc_spawn/2, +%%% Exports for all possible uses of coordinator +-export([start/2, + all_spawned/1]). + +%%% Exports for the typesig and dataflow analysis main process +-export([scc_spawn/2, + receive_not_fixpoint/0]). + +%%% Exports for the typesig and dataflow analysis workers +-export([scc_done/3, sccs_to_pids_reply/0, - sccs_to_pids_request/2, - start/2, - receive_not_fixpoint/0 - ]). + sccs_to_pids_request/2]). + +%%% Exports for the compilation main process +-export([compiler_spawn/2, + receive_compilation_data/0]). + +%%% Exports for the compilation workers +-export([compilation_done/3, + get_next_label/2]). -behaviour(gen_server). @@ -64,129 +75,192 @@ -type coordinator() :: pid(). -type map() :: dict(). -type scc() :: [mfa_or_funlbl()]. --type mode() :: typesig | dataflow. +-type mode() :: 'typesig' | 'dataflow' | 'compile'. -record(state, {parent :: pid(), mode :: mode(), spawn_count = 0 :: integer(), all_spawned = false :: boolean(), - scc_to_pid = new_map() :: map(), - not_fixpoint = [] :: [mfa_or_funlbl()], - servers :: dialyzer_typesig:servers() + job_to_pid :: map(), + next_label :: integer(), + result :: [mfa_or_funlbl()] | + dialyzer_analysis_callgraph:result(), + init_job_data :: dialyzer_typesig:servers() }). -include("dialyzer.hrl"). %%-------------------------------------------------------------------- --spec start(mode(), dialyzer_typesig:servers()) -> pid(). +-spec start('typesig' | 'dataflow', dialyzer_typesig:servers()) -> pid(); + ('compile', dialyzer_analysis_callgraph:servers()) -> pid(). start(Mode, Servers) -> - {ok, Pid} = gen_server:start(?MODULE, {self(), Mode, Servers}, []), - Pid. + {ok, Pid} = gen_server:start(?MODULE, {self(), Mode, Servers}, []), + Pid. -spec scc_spawn(scc(), coordinator()) -> ok. scc_spawn(SCC, Coordinator) -> - cast({scc_spawn, SCC}, Coordinator). + cast({scc_spawn, SCC}, Coordinator). -spec sccs_to_pids_request([scc()], coordinator()) -> ok. sccs_to_pids_request(SCCs, Coordinator) -> - cast({sccs_to_pids, SCCs, self()}, Coordinator). + cast({sccs_to_pids, SCCs, self()}, Coordinator). scc_to_pids_request_handle(Worker, SCCs, SCCtoPID) -> - Pids = [fetch_map(SCC, SCCtoPID) || SCC <- SCCs], - Worker ! {sccs_to_pids, Pids}, - ok. + Pids = [fetch_map(SCC, SCCtoPID) || SCC <- SCCs], + Worker ! {sccs_to_pids, Pids}, + ok. -spec sccs_to_pids_reply() -> [dialyzer_worker:worker()]. sccs_to_pids_reply() -> - receive {sccs_to_pids, Pids} -> Pids end. + receive {sccs_to_pids, Pids} -> Pids end. -spec scc_done(scc(), scc(), coordinator()) -> ok. scc_done(SCC, NotFixpoint, Coordinator) -> - cast({scc_done, SCC, NotFixpoint}, Coordinator). + cast({done, SCC, NotFixpoint}, Coordinator). + +-spec compilation_done(file:filename(), + dialyzer_analysis_callgraph:compilation_data(), + coordinator()) -> ok. + +compilation_done(Filename, CompilationData, Coordinator) -> + cast({done, Filename, CompilationData}, Coordinator). -spec all_spawned(coordinator()) -> ok. all_spawned(Coordinator) -> - cast(all_spawned, Coordinator). - -send_done_to_parent(#state{parent = Parent, not_fixpoint = NotFixpoint}) -> - Parent ! {not_fixpoint, NotFixpoint}. + cast(all_spawned, Coordinator). + +send_done_to_parent(#state{mode = Mode, + parent = Parent, + result = Result, + next_label = NextLabel}) -> + Msg = + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> {not_fixpoint, Result}; + 'compile' -> {compilation_data, Result, NextLabel} + end, + Parent ! Msg. -spec receive_not_fixpoint() -> dialyzer_plt:plt(). receive_not_fixpoint() -> - receive {not_fixpoint, NotFixpoint} -> NotFixpoint end. + receive {not_fixpoint, NotFixpoint} -> NotFixpoint end. -%%-------------------------------------------------------------------- +-spec receive_compilation_data() -> + {dialyzer_analysis_callgraph:result(), integer()}. + +receive_compilation_data() -> + receive {compilation_data, CompilationData, NextLabel} -> + {CompilationData, NextLabel} + end. + +-spec compiler_spawn(file:filename(), coordinator()) -> ok. --spec init({pid(), mode(), dialyzer_succ_typings:servers()}) -> {ok, #state{}}. +compiler_spawn(Filename, Coordinator) -> + cast({compiler_spawn, Filename}, Coordinator). -init({Parent, Mode, Servers}) -> - {ok, #state{parent = Parent, mode = Mode, servers = Servers}}. +-spec get_next_label(integer(), coordinator()) -> integer(). + +get_next_label(EstimatedSize, Coordinator) -> + call({get_next_label, EstimatedSize}, Coordinator). + +%%-------------------------------------------------------------------- + +-spec init({pid(), mode(), dialyzer_succ_typings:servers() | + dialyzer_analysis_callgraph:servers()}) -> {ok, #state{}}. + +init({Parent, Mode, InitJobData}) -> + InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData}, + State = + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> + InitState#state{result = [], job_to_pid = new_map()}; + 'compile' -> + InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(), + InitState#state{result = InitResult, next_label = 0} + end, + {ok, State}. -spec handle_call(Query::term(), From::term(), #state{}) -> - {reply, Reply::term(), #state{}}. + {reply, Reply::term(), #state{}}. -handle_call(_Request, _From, State) -> - {reply, ok, State}. +handle_call({get_next_label, EstimatedSize}, _From, + #state{next_label = NextLabel} = State) -> + {reply, NextLabel, State#state{next_label = NextLabel + EstimatedSize}}. -spec handle_cast(Msg::term(), #state{}) -> - {noreply, #state{}} | {stop, normal, #state{}}. + {noreply, #state{}} | {stop, normal, #state{}}. -handle_cast({scc_done, _SCC, NotFixpoint}, - #state{spawn_count = SpawnCount, +handle_cast({done, Job, NewData}, + #state{mode = Mode, + spawn_count = SpawnCount, all_spawned = AllSpawned, - not_fixpoint = OldNotFixpoint + result = OldResult } = State) -> - NewNotFixpoint = ordsets:union(OldNotFixpoint, NotFixpoint), - UpdatedState = State#state{not_fixpoint = NewNotFixpoint}, - Action = - case AllSpawned of - false -> reduce; - true -> - case SpawnCount of - 1 -> finish; - _ -> reduce - end - end, - case Action of - reduce -> - NewState = UpdatedState#state{spawn_count = SpawnCount - 1}, - {noreply, NewState}; - finish -> - send_done_to_parent(UpdatedState), - {stop, normal, State} - end; + NewResult = + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> + ordsets:union(OldResult, NewData); + 'compile' -> + dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult) + end, + UpdatedState = State#state{result = NewResult}, + Action = + case AllSpawned of + false -> reduce; + true -> + case SpawnCount of + 1 -> finish; + _ -> reduce + end + end, + case Action of + reduce -> + NewState = UpdatedState#state{spawn_count = SpawnCount - 1}, + {noreply, NewState}; + finish -> + send_done_to_parent(UpdatedState), + {stop, normal, State} + end; handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> - case SpawnCount of - 0 -> - send_done_to_parent(State), - {stop, normal, State}; - _ -> - NewState = State#state{all_spawned = true}, - {noreply, NewState} - end; + case SpawnCount of + 0 -> + send_done_to_parent(State), + {stop, normal, State}; + _ -> + NewState = State#state{all_spawned = true}, + {noreply, NewState} + end; handle_cast({sccs_to_pids, SCCs, Worker}, - #state{scc_to_pid = SCCtoPID} = State) -> - scc_to_pids_request_handle(Worker, SCCs, SCCtoPID), - {noreply, State}; + #state{job_to_pid = SCCtoPID} = State) -> + scc_to_pids_request_handle(Worker, SCCs, SCCtoPID), + {noreply, State}; handle_cast({scc_spawn, SCC}, #state{mode = Mode, - servers = Servers, + init_job_data = Servers, spawn_count = SpawnCount, - scc_to_pid = SCCtoPID + job_to_pid = SCCtoPID } = State) -> - Pid = dialyzer_worker:launch(Mode, SCC, Servers), - {noreply, - State#state{spawn_count = SpawnCount + 1, - scc_to_pid = store_map(SCC, Pid, SCCtoPID)} - }. + Pid = dialyzer_worker:launch(Mode, SCC, Servers), + {noreply, + State#state{spawn_count = SpawnCount + 1, + job_to_pid = store_map(SCC, Pid, SCCtoPID)} + }; +handle_cast({compiler_spawn, Filename}, + #state{mode = Mode, + init_job_data = Servers, + spawn_count = SpawnCount + } = State) -> + dialyzer_worker:launch(Mode, Filename, Servers), + {noreply, + State#state{spawn_count = SpawnCount + 1} + }. -spec handle_info(term(), #state{}) -> {noreply, #state{}}. @@ -206,13 +280,16 @@ code_change(_OldVsn, State, _Extra) -> %%-------------------------------------------------------------------- cast(Message, Coordinator) -> - gen_server:cast(Coordinator, Message). + gen_server:cast(Coordinator, Message). + +call(Message, Coordinator) -> + gen_server:call(Coordinator, Message). new_map() -> - dict:new(). + dict:new(). store_map(Key, Value, Map) -> - dict:store(Key, Value, Map). + dict:store(Key, Value, Map). fetch_map(Key, Map) -> - dict:fetch(Key, Map). + dict:fetch(Key, Map). -- cgit v1.2.3 From b8ab5bc3211fdee28000e2fd35a47dafa2a4316c Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Sun, 19 Feb 2012 22:05:47 +0100 Subject: Fix types and specs in Dialyzer --- lib/dialyzer/src/dialyzer_coordinator.erl | 63 +++++++++++++++++-------------- 1 file changed, 35 insertions(+), 28 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index d8a3ef2bd2..9bcd7e4c63 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -46,17 +46,19 @@ -module(dialyzer_coordinator). -%%% Exports for all possible uses of coordinator +%%% Exports for all possible uses of coordinator from main process -export([start/2, all_spawned/1]). +%%% Exports for all possible workers +-export([job_done/3]). + %%% Exports for the typesig and dataflow analysis main process -export([scc_spawn/2, receive_not_fixpoint/0]). %%% Exports for the typesig and dataflow analysis workers --export([scc_done/3, - sccs_to_pids_reply/0, +-export([sccs_to_pids_reply/0, sccs_to_pids_request/2]). %%% Exports for the compilation main process @@ -67,39 +69,44 @@ -export([compilation_done/3, get_next_label/2]). +-export_type([coordinator/0, mode/0]). + -behaviour(gen_server). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --type coordinator() :: pid(). --type map() :: dict(). --type scc() :: [mfa_or_funlbl()]. --type mode() :: 'typesig' | 'dataflow' | 'compile'. - --record(state, {parent :: pid(), - mode :: mode(), - spawn_count = 0 :: integer(), - all_spawned = false :: boolean(), - job_to_pid :: map(), - next_label :: integer(), - result :: [mfa_or_funlbl()] | - dialyzer_analysis_callgraph:result(), - init_job_data :: dialyzer_typesig:servers() +-type coordinator() :: pid(). %%opaque + +-type map() :: dict(). +-type scc() :: [mfa_or_funlbl()]. +-type mode() :: 'typesig' | 'dataflow' | 'compile'. +-type servers() :: dialyzer_succ_typings:servers() | + dialyzer_analysis_callgraph:servers(). + +-record(state, {parent :: pid(), + mode :: mode(), + spawn_count = 0 :: integer(), + all_spawned = false :: boolean(), + job_to_pid :: map(), + next_label :: integer(), + result :: [mfa_or_funlbl()] | + dialyzer_analysis_callgraph:result(), + init_job_data :: servers() }). -include("dialyzer.hrl"). %%-------------------------------------------------------------------- --spec start('typesig' | 'dataflow', dialyzer_typesig:servers()) -> pid(); +-spec start('typesig' | 'dataflow', dialyzer_succ_typings:servers()) -> pid(); ('compile', dialyzer_analysis_callgraph:servers()) -> pid(). start(Mode, Servers) -> {ok, Pid} = gen_server:start(?MODULE, {self(), Mode, Servers}, []), Pid. --spec scc_spawn(scc(), coordinator()) -> ok. +-spec scc_spawn(scc() | module(), coordinator()) -> ok. scc_spawn(SCC, Coordinator) -> cast({scc_spawn, SCC}, Coordinator). @@ -119,10 +126,10 @@ scc_to_pids_request_handle(Worker, SCCs, SCCtoPID) -> sccs_to_pids_reply() -> receive {sccs_to_pids, Pids} -> Pids end. --spec scc_done(scc(), scc(), coordinator()) -> ok. +-spec job_done(scc() | file:filename(), term(), coordinator()) -> ok. -scc_done(SCC, NotFixpoint, Coordinator) -> - cast({done, SCC, NotFixpoint}, Coordinator). +job_done(Job, Result, Coordinator) -> + cast({done, Job, Result}, Coordinator). -spec compilation_done(file:filename(), dialyzer_analysis_callgraph:compilation_data(), @@ -145,9 +152,10 @@ send_done_to_parent(#state{mode = Mode, X when X =:= 'typesig'; X =:= 'dataflow' -> {not_fixpoint, Result}; 'compile' -> {compilation_data, Result, NextLabel} end, - Parent ! Msg. + Parent ! Msg, + ok. --spec receive_not_fixpoint() -> dialyzer_plt:plt(). +-spec receive_not_fixpoint() -> [mfa_or_funlbl()]. receive_not_fixpoint() -> receive {not_fixpoint, NotFixpoint} -> NotFixpoint end. @@ -172,8 +180,7 @@ get_next_label(EstimatedSize, Coordinator) -> %%-------------------------------------------------------------------- --spec init({pid(), mode(), dialyzer_succ_typings:servers() | - dialyzer_analysis_callgraph:servers()}) -> {ok, #state{}}. +-spec init({pid(), mode(), servers()}) -> {ok, #state{}}. init({Parent, Mode, InitJobData}) -> InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData}, @@ -247,7 +254,7 @@ handle_cast({scc_spawn, SCC}, spawn_count = SpawnCount, job_to_pid = SCCtoPID } = State) -> - Pid = dialyzer_worker:launch(Mode, SCC, Servers), + Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), {noreply, State#state{spawn_count = SpawnCount + 1, job_to_pid = store_map(SCC, Pid, SCCtoPID)} @@ -257,7 +264,7 @@ handle_cast({compiler_spawn, Filename}, init_job_data = Servers, spawn_count = SpawnCount } = State) -> - dialyzer_worker:launch(Mode, Filename, Servers), + dialyzer_worker:launch(Mode, Filename, Servers, self()), {noreply, State#state{spawn_count = SpawnCount + 1} }. -- cgit v1.2.3 From 009710cec5e8b9786a5219dc3f682d6010352160 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Mon, 20 Feb 2012 15:54:43 +0100 Subject: Parallel warning generation --- lib/dialyzer/src/dialyzer_coordinator.erl | 38 +++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 12 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 9bcd7e4c63..93159ce7df 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -53,9 +53,14 @@ %%% Exports for all possible workers -export([job_done/3]). -%%% Exports for the typesig and dataflow analysis main process --export([scc_spawn/2, - receive_not_fixpoint/0]). +%%% Export for the typesig, dataflow and warnings main process +-export([scc_spawn/2]). + +%%% Export for the typesig and dataflow analysis main process +-export([receive_not_fixpoint/0]). + +%%% Export for warning main process +-export([receive_warnings/0]). %%% Exports for the typesig and dataflow analysis workers -export([sccs_to_pids_reply/0, @@ -79,10 +84,11 @@ -type coordinator() :: pid(). %%opaque -type map() :: dict(). --type scc() :: [mfa_or_funlbl()]. --type mode() :: 'typesig' | 'dataflow' | 'compile'. +-type scc() :: [mfa_or_funlbl()] | module(). +-type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. -type servers() :: dialyzer_succ_typings:servers() | - dialyzer_analysis_callgraph:servers(). + dialyzer_analysis_callgraph:servers() | + dialyzer_succ_typings:warning_servers(). -record(state, {parent :: pid(), mode :: mode(), @@ -91,7 +97,8 @@ job_to_pid :: map(), next_label :: integer(), result :: [mfa_or_funlbl()] | - dialyzer_analysis_callgraph:result(), + dialyzer_analysis_callgraph:result() | + [dial_warning()], init_job_data :: servers() }). @@ -99,8 +106,7 @@ %%-------------------------------------------------------------------- --spec start('typesig' | 'dataflow', dialyzer_succ_typings:servers()) -> pid(); - ('compile', dialyzer_analysis_callgraph:servers()) -> pid(). +-spec start(mode(), servers()) -> coordinator(). start(Mode, Servers) -> {ok, Pid} = gen_server:start(?MODULE, {self(), Mode, Servers}, []), @@ -150,7 +156,8 @@ send_done_to_parent(#state{mode = Mode, Msg = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> {not_fixpoint, Result}; - 'compile' -> {compilation_data, Result, NextLabel} + 'compile' -> {compilation_data, Result, NextLabel}; + 'warnings' -> {warnings, Result} end, Parent ! Msg, ok. @@ -168,6 +175,11 @@ receive_compilation_data() -> {CompilationData, NextLabel} end. +-spec receive_warnings() -> [dial_warning()]. + +receive_warnings() -> + receive {warnings, Warnings} -> Warnings end. + -spec compiler_spawn(file:filename(), coordinator()) -> ok. compiler_spawn(Filename, Coordinator) -> @@ -186,7 +198,7 @@ init({Parent, Mode, InitJobData}) -> InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData}, State = case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> + X when X =:= 'typesig'; X =:= 'dataflow'; X =:= 'warnings' -> InitState#state{result = [], job_to_pid = new_map()}; 'compile' -> InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(), @@ -215,7 +227,9 @@ handle_cast({done, Job, NewData}, X when X =:= 'typesig'; X =:= 'dataflow' -> ordsets:union(OldResult, NewData); 'compile' -> - dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult) + dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); + 'warnings' -> + NewData ++ OldResult end, UpdatedState = State#state{result = NewResult}, Action = -- cgit v1.2.3 From cf573e2ea378bae4c43007fb457dcd8379caf547 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Mon, 20 Feb 2012 21:29:22 +0100 Subject: Simplify behaviour checking code --- lib/dialyzer/src/dialyzer_coordinator.erl | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 93159ce7df..437af69f5a 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -220,18 +220,20 @@ handle_cast({done, Job, NewData}, #state{mode = Mode, spawn_count = SpawnCount, all_spawned = AllSpawned, - result = OldResult + result = OldResult, + job_to_pid = JobToPID } = State) -> - NewResult = + {NewResult, NewJobToPID} = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> - ordsets:union(OldResult, NewData); + {ordsets:union(OldResult, NewData), dict:erase(Job, JobToPID)}; 'compile' -> - dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); + {dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult), + JobToPID}; 'warnings' -> - NewData ++ OldResult + {NewData ++ OldResult, dict:erase(Job, JobToPID)} end, - UpdatedState = State#state{result = NewResult}, + UpdatedState = State#state{result = NewResult, job_to_pid = NewJobToPID}, Action = case AllSpawned of false -> reduce; -- cgit v1.2.3 From 0bae6c82503b348e62f11edfbfc880145ef06794 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Tue, 21 Feb 2012 16:01:50 +0100 Subject: Avoid digraph_utils:condensation and ordering in typesig --- lib/dialyzer/src/dialyzer_coordinator.erl | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 437af69f5a..9d31c739d0 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -123,8 +123,8 @@ sccs_to_pids_request(SCCs, Coordinator) -> cast({sccs_to_pids, SCCs, self()}, Coordinator). scc_to_pids_request_handle(Worker, SCCs, SCCtoPID) -> - Pids = [fetch_map(SCC, SCCtoPID) || SCC <- SCCs], - Worker ! {sccs_to_pids, Pids}, + Result = map_lookup(SCCs, SCCtoPID), + Worker ! {sccs_to_pids, Result}, ok. -spec sccs_to_pids_reply() -> [dialyzer_worker:worker()]. @@ -314,5 +314,12 @@ new_map() -> store_map(Key, Value, Map) -> dict:store(Key, Value, Map). -fetch_map(Key, Map) -> - dict:fetch(Key, Map). +map_lookup(SCCs, Map) -> + Fold = + fun(Key, {Mapped, Unknown}) -> + case dict:find(Key, Map) of + {ok, Value} -> {[Value|Mapped], Unknown}; + error -> {Mapped, [Key|Unknown]} + end + end, + lists:foldl(Fold, {[], []}, SCCs). -- cgit v1.2.3 From e200c6519651319ddcb874d0aff23cc21e1a9c83 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Wed, 22 Feb 2012 16:04:20 +0100 Subject: Fix specs --- lib/dialyzer/src/dialyzer_coordinator.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 9d31c739d0..1dc3e97f7c 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -127,7 +127,7 @@ scc_to_pids_request_handle(Worker, SCCs, SCCtoPID) -> Worker ! {sccs_to_pids, Result}, ok. --spec sccs_to_pids_reply() -> [dialyzer_worker:worker()]. +-spec sccs_to_pids_reply() -> {[dialyzer_worker:worker()], [scc()]}. sccs_to_pids_reply() -> receive {sccs_to_pids, Pids} -> Pids end. -- cgit v1.2.3 From 44b23610234bf4028aedd326388bec503aee1026 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Wed, 22 Feb 2012 16:48:42 +0100 Subject: Coordinator translates functions requiring further analysis --- lib/dialyzer/src/dialyzer_coordinator.erl | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 1dc3e97f7c..1921f96b78 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -221,12 +221,14 @@ handle_cast({done, Job, NewData}, spawn_count = SpawnCount, all_spawned = AllSpawned, result = OldResult, - job_to_pid = JobToPID + job_to_pid = JobToPID, + init_job_data = Servers } = State) -> {NewResult, NewJobToPID} = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> - {ordsets:union(OldResult, NewData), dict:erase(Job, JobToPID)}; + FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), + {ordsets:union(OldResult, FinalData), dict:erase(Job, JobToPID)}; 'compile' -> {dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult), JobToPID}; -- cgit v1.2.3 From dd1d17c1798a601dbe6378795c9a2fdd0effcc15 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Thu, 23 Feb 2012 13:46:51 +0100 Subject: Worker PIDs are stored in an ETS table --- lib/dialyzer/src/dialyzer_coordinator.erl | 89 +++++++++++++------------------ 1 file changed, 36 insertions(+), 53 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 1921f96b78..af85dfe82d 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -63,8 +63,7 @@ -export([receive_warnings/0]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids_reply/0, - sccs_to_pids_request/2]). +-export([sccs_to_pids/1]). %%% Exports for the compilation main process -export([compiler_spawn/2, @@ -81,9 +80,12 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%%-------------------------------------------------------------------- + +-define(MAP, dialyzer_coordinator_map). + -type coordinator() :: pid(). %%opaque --type map() :: dict(). -type scc() :: [mfa_or_funlbl()] | module(). -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. -type servers() :: dialyzer_succ_typings:servers() | @@ -94,7 +96,6 @@ mode :: mode(), spawn_count = 0 :: integer(), all_spawned = false :: boolean(), - job_to_pid :: map(), next_label :: integer(), result :: [mfa_or_funlbl()] | dialyzer_analysis_callgraph:result() | @@ -117,20 +118,17 @@ start(Mode, Servers) -> scc_spawn(SCC, Coordinator) -> cast({scc_spawn, SCC}, Coordinator). --spec sccs_to_pids_request([scc()], coordinator()) -> ok. +-spec sccs_to_pids([scc()]) -> {[dialyzer_worker:worker()], [scc()]}. -sccs_to_pids_request(SCCs, Coordinator) -> - cast({sccs_to_pids, SCCs, self()}, Coordinator). - -scc_to_pids_request_handle(Worker, SCCs, SCCtoPID) -> - Result = map_lookup(SCCs, SCCtoPID), - Worker ! {sccs_to_pids, Result}, - ok. +sccs_to_pids(SCCs) -> + lists:foldl(fun pid_partition/2, {[], []}, SCCs). --spec sccs_to_pids_reply() -> {[dialyzer_worker:worker()], [scc()]}. - -sccs_to_pids_reply() -> - receive {sccs_to_pids, Pids} -> Pids end. +pid_partition(SCC, {Pids, Unknown}) -> + try ets:lookup_element(?MAP, SCC, 2) of + Result -> {[Result|Pids], Unknown} + catch + _:_ -> {Pids, [SCC|Unknown]} + end. -spec job_done(scc() | file:filename(), term(), coordinator()) -> ok. @@ -198,8 +196,11 @@ init({Parent, Mode, InitJobData}) -> InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData}, State = case Mode of - X when X =:= 'typesig'; X =:= 'dataflow'; X =:= 'warnings' -> - InitState#state{result = [], job_to_pid = new_map()}; + X when X =:= 'typesig'; X =:= 'dataflow' -> + ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), + InitState#state{result = []}; + 'warnings' -> + InitState#state{result = []}; 'compile' -> InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(), InitState#state{result = InitResult, next_label = 0} @@ -221,21 +222,19 @@ handle_cast({done, Job, NewData}, spawn_count = SpawnCount, all_spawned = AllSpawned, result = OldResult, - job_to_pid = JobToPID, init_job_data = Servers } = State) -> - {NewResult, NewJobToPID} = + NewResult = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), - {ordsets:union(OldResult, FinalData), dict:erase(Job, JobToPID)}; + ordsets:union(OldResult, FinalData); 'compile' -> - {dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult), - JobToPID}; + dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); 'warnings' -> - {NewData ++ OldResult, dict:erase(Job, JobToPID)} + NewData ++ OldResult end, - UpdatedState = State#state{result = NewResult, job_to_pid = NewJobToPID}, + UpdatedState = State#state{result = NewResult}, Action = case AllSpawned of false -> reduce; @@ -262,21 +261,17 @@ handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> NewState = State#state{all_spawned = true}, {noreply, NewState} end; -handle_cast({sccs_to_pids, SCCs, Worker}, - #state{job_to_pid = SCCtoPID} = State) -> - scc_to_pids_request_handle(Worker, SCCs, SCCtoPID), - {noreply, State}; handle_cast({scc_spawn, SCC}, #state{mode = Mode, init_job_data = Servers, - spawn_count = SpawnCount, - job_to_pid = SCCtoPID - } = State) -> + spawn_count = SpawnCount} = State) -> Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), - {noreply, - State#state{spawn_count = SpawnCount + 1, - job_to_pid = store_map(SCC, Pid, SCCtoPID)} - }; + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> + true = ets:insert(?MAP, {SCC, Pid}); + _ -> true + end, + {noreply, State#state{spawn_count = SpawnCount + 1}}; handle_cast({compiler_spawn, Filename}, #state{mode = Mode, init_job_data = Servers, @@ -294,7 +289,11 @@ handle_info(_Info, State) -> -spec terminate(term(), #state{}) -> ok. -terminate(_Reason, _State) -> +terminate(_Reason, #state{mode = Mode}) -> + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> ets:delete(?MAP); + _ -> true + end, ok. -spec code_change(term(), #state{}, term()) -> {ok, #state{}}. @@ -309,19 +308,3 @@ cast(Message, Coordinator) -> call(Message, Coordinator) -> gen_server:call(Coordinator, Message). - -new_map() -> - dict:new(). - -store_map(Key, Value, Map) -> - dict:store(Key, Value, Map). - -map_lookup(SCCs, Map) -> - Fold = - fun(Key, {Mapped, Unknown}) -> - case dict:find(Key, Map) of - {ok, Value} -> {[Value|Mapped], Unknown}; - error -> {Mapped, [Key|Unknown]} - end - end, - lists:foldl(Fold, {[], []}, SCCs). -- cgit v1.2.3 From 8f56878ce7b2d9a4f2118663838287c6d2fa9502 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Tue, 28 Feb 2012 11:04:46 +0100 Subject: Infinity timeout for coordinator calls --- lib/dialyzer/src/dialyzer_coordinator.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index af85dfe82d..fa0b5dcd9d 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -307,4 +307,4 @@ cast(Message, Coordinator) -> gen_server:cast(Coordinator, Message). call(Message, Coordinator) -> - gen_server:call(Coordinator, Message). + gen_server:call(Coordinator, Message, infinity). -- cgit v1.2.3 From 0ecc1f181062da8b019f226ae2c567078ee2e860 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Fri, 24 Feb 2012 18:31:31 +0100 Subject: Ticket-based regulation of memory consumption --- lib/dialyzer/src/dialyzer_coordinator.erl | 113 +++++++++++++++++++++++------- 1 file changed, 89 insertions(+), 24 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index fa0b5dcd9d..91a0dd662d 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -47,11 +47,10 @@ -module(dialyzer_coordinator). %%% Exports for all possible uses of coordinator from main process --export([start/2, - all_spawned/1]). +-export([start/2, all_spawned/1]). %%% Exports for all possible workers --export([job_done/3]). +-export([wait_activation/0, job_done/3]). %%% Export for the typesig, dataflow and warnings main process -export([scc_spawn/2]). @@ -63,15 +62,13 @@ -export([receive_warnings/0]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids/1]). +-export([request_activation/1, sccs_to_pids/1]). %%% Exports for the compilation main process --export([compiler_spawn/2, - receive_compilation_data/0]). +-export([compiler_spawn/2, receive_compilation_data/0]). %%% Exports for the compilation workers --export([compilation_done/3, - get_next_label/2]). +-export([get_next_label/2, compilation_done/3]). -export_type([coordinator/0, mode/0]). @@ -100,7 +97,9 @@ result :: [mfa_or_funlbl()] | dialyzer_analysis_callgraph:result() | [dial_warning()], - init_job_data :: servers() + init_job_data :: servers(), + tickets :: integer(), + queue :: queue() }). -include("dialyzer.hrl"). @@ -188,12 +187,35 @@ compiler_spawn(Filename, Coordinator) -> get_next_label(EstimatedSize, Coordinator) -> call({get_next_label, EstimatedSize}, Coordinator). +-spec request_activation(coordinator()) -> ok. + +request_activation(Coordinator) -> + cast({request_activation, self()}, Coordinator). + +-spec wait_activation() -> ok. + +wait_activation() -> + receive activate -> ok end. + +activate_pid(Pid) -> + Pid ! activate. + %%-------------------------------------------------------------------- -spec init({pid(), mode(), servers()}) -> {ok, #state{}}. init({Parent, Mode, InitJobData}) -> - InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData}, + BaseTickets = erlang:system_info(logical_processors_available), + Tickets = + case Mode of + 'compile' -> 4*BaseTickets; + 'typesig' -> BaseTickets*BaseTickets; + 'dataflow' -> 4*BaseTickets; + 'warnings' -> 4*BaseTickets + end, + InitState = + #state{parent = Parent, mode = Mode, init_job_data = InitJobData, + tickets = Tickets, queue = queue:new()}, State = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> @@ -222,8 +244,18 @@ handle_cast({done, Job, NewData}, spawn_count = SpawnCount, all_spawned = AllSpawned, result = OldResult, - init_job_data = Servers + init_job_data = Servers, + tickets = Tickets, + queue = Queue } = State) -> + {Waiting, NewQueue} = queue:out(Queue), + NewTickets = + case Waiting of + empty -> Tickets+1; + {value, Pid} -> + activate_pid(Pid), + Tickets + end, NewResult = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> @@ -234,7 +266,8 @@ handle_cast({done, Job, NewData}, 'warnings' -> NewData ++ OldResult end, - UpdatedState = State#state{result = NewResult}, + UpdatedState = + State#state{result = NewResult, tickets = NewTickets, queue = NewQueue}, Action = case AllSpawned of false -> reduce; @@ -261,26 +294,58 @@ handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> NewState = State#state{all_spawned = true}, {noreply, NewState} end; +handle_cast({request_activation, Pid}, + #state{tickets = Tickets, queue = Queue} = State) -> + {NewTickets, NewQueue} = + case Tickets of + 0 -> {Tickets, queue:in(Pid, Queue)}; + N -> + activate_pid(Pid), + {N-1, Queue} + end, + {noreply, State#state{tickets = NewTickets, queue = NewQueue}}; handle_cast({scc_spawn, SCC}, #state{mode = Mode, init_job_data = Servers, - spawn_count = SpawnCount} = State) -> + spawn_count = SpawnCount, + tickets = Tickets, + queue = Queue} = State) -> Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - true = ets:insert(?MAP, {SCC, Pid}); - _ -> true - end, - {noreply, State#state{spawn_count = SpawnCount + 1}}; + {NewTickets, NewQueue} = + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> + true = ets:insert(?MAP, {SCC, Pid}), + {Tickets, Queue}; + warnings -> + case Tickets of + 0 -> {Tickets, queue:in(Pid, Queue)}; + N -> + activate_pid(Pid), + {N-1, Queue} + end + end, + {noreply, State#state{spawn_count = SpawnCount + 1, + tickets = NewTickets, + queue = NewQueue}}; handle_cast({compiler_spawn, Filename}, #state{mode = Mode, init_job_data = Servers, - spawn_count = SpawnCount + spawn_count = SpawnCount, + tickets = Tickets, + queue = Queue } = State) -> - dialyzer_worker:launch(Mode, Filename, Servers, self()), - {noreply, - State#state{spawn_count = SpawnCount + 1} - }. + Pid = dialyzer_worker:launch(Mode, Filename, Servers, self()), + NewSpawnCount = SpawnCount + 1, + {NewTickets, NewQueue} = + case Tickets of + 0 -> {Tickets, queue:in(Pid, Queue)}; + N -> + activate_pid(Pid), + {N-1, Queue} + end, + {noreply, State#state{spawn_count = NewSpawnCount, + tickets = NewTickets, + queue = NewQueue}}. -spec handle_info(term(), #state{}) -> {noreply, #state{}}. -- cgit v1.2.3 From e21f6ea9d28b0e8ed9609338499daaab306fa439 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Mon, 27 Feb 2012 21:41:01 +0100 Subject: Plain concatenation for typesig not-fixpoint list --- lib/dialyzer/src/dialyzer_coordinator.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 91a0dd662d..24bc38616c 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -260,7 +260,7 @@ handle_cast({done, Job, NewData}, case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), - ordsets:union(OldResult, FinalData); + FinalData ++ OldResult; 'compile' -> dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); 'warnings' -> -- cgit v1.2.3 From 6d0da21ad9f0bba67d5bd389ca76a67613884d5f Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Mon, 27 Feb 2012 21:41:34 +0100 Subject: Typesig and dataflow analyses no longer use ticket regulation --- lib/dialyzer/src/dialyzer_coordinator.erl | 86 +++++++++++++++++-------------- 1 file changed, 46 insertions(+), 40 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 24bc38616c..3663b3c737 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -209,9 +209,8 @@ init({Parent, Mode, InitJobData}) -> Tickets = case Mode of 'compile' -> 4*BaseTickets; - 'typesig' -> BaseTickets*BaseTickets; - 'dataflow' -> 4*BaseTickets; - 'warnings' -> 4*BaseTickets + 'warnings' -> 4*BaseTickets; + _ -> non_regulated end, InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData, @@ -239,15 +238,19 @@ handle_call({get_next_label, EstimatedSize}, _From, -spec handle_cast(Msg::term(), #state{}) -> {noreply, #state{}} | {stop, normal, #state{}}. +handle_cast({done, _Job, NewData}, + #state{mode = Mode, result = OldResult, + init_job_data = Servers} = State) when + Mode =:= 'typesig'; Mode =:= 'dataflow' -> + FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), + UpdatedState = State#state{result = FinalData ++ OldResult}, + reduce_or_stop(UpdatedState); handle_cast({done, Job, NewData}, #state{mode = Mode, - spawn_count = SpawnCount, - all_spawned = AllSpawned, result = OldResult, - init_job_data = Servers, tickets = Tickets, - queue = Queue - } = State) -> + queue = Queue} = State) when + Mode =:= 'compile'; Mode =:= 'warnings' -> {Waiting, NewQueue} = queue:out(Queue), NewTickets = case Waiting of @@ -258,9 +261,6 @@ handle_cast({done, Job, NewData}, end, NewResult = case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), - FinalData ++ OldResult; 'compile' -> dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); 'warnings' -> @@ -268,23 +268,7 @@ handle_cast({done, Job, NewData}, end, UpdatedState = State#state{result = NewResult, tickets = NewTickets, queue = NewQueue}, - Action = - case AllSpawned of - false -> reduce; - true -> - case SpawnCount of - 1 -> finish; - _ -> reduce - end - end, - case Action of - reduce -> - NewState = UpdatedState#state{spawn_count = SpawnCount - 1}, - {noreply, NewState}; - finish -> - send_done_to_parent(UpdatedState), - {stop, normal, State} - end; + reduce_or_stop(UpdatedState); handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> case SpawnCount of 0 -> @@ -306,23 +290,25 @@ handle_cast({request_activation, Pid}, {noreply, State#state{tickets = NewTickets, queue = NewQueue}}; handle_cast({scc_spawn, SCC}, #state{mode = Mode, + init_job_data = Servers, + spawn_count = SpawnCount} = State) when + Mode =:= 'typesig'; Mode =:= 'dataflow' -> + Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), + true = ets:insert(?MAP, {SCC, Pid}), + {noreply, State#state{spawn_count = SpawnCount + 1}}; +handle_cast({scc_spawn, SCC}, + #state{mode = 'warnings', init_job_data = Servers, spawn_count = SpawnCount, tickets = Tickets, queue = Queue} = State) -> - Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), + Pid = dialyzer_worker:launch('warnings', SCC, Servers, self()), {NewTickets, NewQueue} = - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - true = ets:insert(?MAP, {SCC, Pid}), - {Tickets, Queue}; - warnings -> - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end + case Tickets of + 0 -> {Tickets, queue:in(Pid, Queue)}; + N -> + activate_pid(Pid), + {N-1, Queue} end, {noreply, State#state{spawn_count = SpawnCount + 1, tickets = NewTickets, @@ -373,3 +359,23 @@ cast(Message, Coordinator) -> call(Message, Coordinator) -> gen_server:call(Coordinator, Message, infinity). + +reduce_or_stop(#state{all_spawned = AllSpawned, + spawn_count = SpawnCount} = State) -> + Action = + case AllSpawned of + false -> reduce; + true -> + case SpawnCount of + 1 -> finish; + _ -> reduce + end + end, + case Action of + reduce -> + NewState = State#state{spawn_count = SpawnCount - 1}, + {noreply, NewState}; + finish -> + send_done_to_parent(State), + {stop, normal, State} + end. -- cgit v1.2.3 From d0419ddb2e43b1ed957108712e9bf9505d7a0c01 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Tue, 28 Feb 2012 15:29:04 +0100 Subject: Fix race in coordinator --- lib/dialyzer/src/dialyzer_coordinator.erl | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 3663b3c737..f4f1fda8fd 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -152,7 +152,9 @@ send_done_to_parent(#state{mode = Mode, next_label = NextLabel}) -> Msg = case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> {not_fixpoint, Result}; + X when X =:= 'typesig'; X =:= 'dataflow' -> + ets:delete(?MAP), + {not_fixpoint, Result}; 'compile' -> {compilation_data, Result, NextLabel}; 'warnings' -> {warnings, Result} end, @@ -340,11 +342,7 @@ handle_info(_Info, State) -> -spec terminate(term(), #state{}) -> ok. -terminate(_Reason, #state{mode = Mode}) -> - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> ets:delete(?MAP); - _ -> true - end, +terminate(_Reason, _State) -> ok. -spec code_change(term(), #state{}, term()) -> {ok, #state{}}. -- cgit v1.2.3 From 913ee73601e3f7d0b27d833bd67cf6ee3868018a Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Wed, 22 Feb 2012 15:31:16 +0100 Subject: All spawns are now spawn_links --- lib/dialyzer/src/dialyzer_coordinator.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index f4f1fda8fd..c417b45717 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -109,7 +109,7 @@ -spec start(mode(), servers()) -> coordinator(). start(Mode, Servers) -> - {ok, Pid} = gen_server:start(?MODULE, {self(), Mode, Servers}, []), + {ok, Pid} = gen_server:start_link(?MODULE, {self(), Mode, Servers}, []), Pid. -spec scc_spawn(scc() | module(), coordinator()) -> ok. -- cgit v1.2.3 From 1630eaa34f2cb8d6fceefbb752cfe94ac8e44b6a Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Tue, 28 Feb 2012 22:49:04 +0100 Subject: Coordinator is no longer a separate process --- lib/dialyzer/src/dialyzer_coordinator.erl | 459 +++++++++++------------------- 1 file changed, 165 insertions(+), 294 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index c417b45717..490cfb8d49 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -21,61 +21,23 @@ %%%------------------------------------------------------------------- %%% File : dialyzer_coordinator.erl %%% Authors : Stavros Aronis -%%% -%%% Description: -%%% -%%% The parallel version of Dialyzer's typesig analysis is spread over 4 modules -%%% with the intention to both minimize the changes on the original code and use -%%% a separate module for every kind of Erlang process that will be running. -%%% -%%% There are therefore 3 kinds of processes: -%%% -%%% - The original Dialyzer backend (in succ_typings module) -%%% - The worker process for the typesig analysis (in typesig and -%%% worker) -%%% - A coordinator of the worker processes (in coordinator) -%%% -%%% Operation guidelines: -%%% -%%% - The backend requests from the coordinator to spawn a worker for each SCC -%%% - The backend notifies the coordinator when all SCC have been spawned and -%%% waits for the server to report that the PLT has been updated -%%% - Each worker is responsible to notify all those who wait for it. -%%% %%%------------------------------------------------------------------- -module(dialyzer_coordinator). -%%% Exports for all possible uses of coordinator from main process --export([start/2, all_spawned/1]). +%%% Export for dialyzer main process +-export([parallel_job/3]). %%% Exports for all possible workers -export([wait_activation/0, job_done/3]). -%%% Export for the typesig, dataflow and warnings main process --export([scc_spawn/2]). - -%%% Export for the typesig and dataflow analysis main process --export([receive_not_fixpoint/0]). - -%%% Export for warning main process --export([receive_warnings/0]). - %%% Exports for the typesig and dataflow analysis workers --export([request_activation/1, sccs_to_pids/1]). - -%%% Exports for the compilation main process --export([compiler_spawn/2, receive_compilation_data/0]). +-export([sccs_to_pids/1]). %%% Exports for the compilation workers -export([get_next_label/2, compilation_done/3]). --export_type([coordinator/0, mode/0]). - --behaviour(gen_server). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). +-export_type([coordinator/0, mode/0, init_data/0]). %%-------------------------------------------------------------------- @@ -83,41 +45,169 @@ -type coordinator() :: pid(). %%opaque --type scc() :: [mfa_or_funlbl()] | module(). +-type scc() :: [mfa_or_funlbl()]. -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. --type servers() :: dialyzer_succ_typings:servers() | - dialyzer_analysis_callgraph:servers() | - dialyzer_succ_typings:warning_servers(). - --record(state, {parent :: pid(), - mode :: mode(), - spawn_count = 0 :: integer(), - all_spawned = false :: boolean(), - next_label :: integer(), - result :: [mfa_or_funlbl()] | - dialyzer_analysis_callgraph:result() | - [dial_warning()], - init_job_data :: servers(), - tickets :: integer(), - queue :: queue() + +-type compile_jobs() :: [file:filename()]. +-type typesig_jobs() :: [scc()]. +-type dataflow_jobs() :: [module()]. +-type warnings_jobs() :: [module()]. + +-type compile_init_data() :: dialyzer_analysis_callgraph:compile_init_data(). +-type typesig_init_data() :: dialyzer_succ_typings:typesig_init_data(). +-type dataflow_init_data() :: dialyzer_succ_typings:dataflow_init_data(). +-type warnings_init_data() :: dialyzer_succ_typings:warnings_init_data(). + +-type compile_result() :: dialyzer_analysis_callgraph:compile_result(). +-type typesig_result() :: [mfa_or_funlbl()]. +-type dataflow_result() :: [mfa_or_funlbl()]. +-type warnings_result() :: [dial_warning()]. + +-type init_data() :: compile_init_data() | typesig_init_data() | + dataflow_init_data() | warnings_init_data(). + +-type result() :: compile_result() | typesig_result() | + dataflow_result() | warnings_result(). + +-record(state, {active = 0 :: integer(), + result :: result(), + next_label = 0 :: integer(), + tickets = 0 :: integer(), + queue = queue:new() :: queue(), + init_data :: init_data() }). -include("dialyzer.hrl"). %%-------------------------------------------------------------------- --spec start(mode(), servers()) -> coordinator(). +-spec parallel_job('compile', compile_jobs(), compile_init_data()) -> + {compile_result(), integer()}; + ('typesig', typesig_jobs(), typesig_init_data()) -> + typesig_result(); + ('dataflow', dataflow_jobs(), dataflow_init_data()) -> + dataflow_result(); + ('warnings', warnings_jobs(), warnings_init_data()) -> + warnings_result(). -start(Mode, Servers) -> - {ok, Pid} = gen_server:start_link(?MODULE, {self(), Mode, Servers}, []), - Pid. +parallel_job(Mode, Jobs, InitData) -> + State = spawn_jobs(Mode, Jobs, InitData), + collect_result(Mode, State). --spec scc_spawn(scc() | module(), coordinator()) -> ok. +spawn_jobs(Mode, Jobs, InitData) when + Mode =:= 'typesig'; Mode =:= 'dataflow' -> + Coordinator = self(), + ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), + Fold = + fun(Job, Count) -> + Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), + true = ets:insert(?MAP, {Job, Pid}), + Count + 1 + end, + JobCount = lists:foldl(Fold, 0, Jobs), + #state{active = JobCount, result = [], init_data = InitData}; +spawn_jobs(Mode, Jobs, InitData) when + Mode =:= 'compile'; Mode =:= 'warnings' -> + Coordinator = self(), + Fold = + fun(Job, {InTickets, InQueue, Count}) -> + Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), + NewCount = Count + 1, + case InTickets of + 0 -> {InTickets, queue:in(Pid, InQueue), NewCount}; + N -> activate_pid(Pid), {N-1, InQueue, NewCount} + end + end, + CPUs = erlang:system_info(logical_processors_available), + InitTickets = 4*CPUs, + {Tickets, Queue, JobCount} = + lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs), + InitResult = + case Mode of + 'warnings' -> []; + 'compile' -> dialyzer_analysis_callgraph:compile_init_result() + end, + #state{active = JobCount, result = InitResult, next_label = 0, + tickets = Tickets, queue = Queue, init_data = InitData}. + +collect_result(Mode, State) -> + case Mode of + 'compile' -> compile_loop(State); + 'typesig' -> not_fixpoint_loop(State); + 'dataflow' -> not_fixpoint_loop(State); + 'warnings' -> warnings_loop(State) + end. -scc_spawn(SCC, Coordinator) -> - cast({scc_spawn, SCC}, Coordinator). +compile_loop(#state{active = Active, result = Result, + next_label = NextLabel, tickets = Tickets, + queue = Queue, init_data = InitData} = State) -> + receive + {next_label_request, Estimation, Pid} -> + Pid ! {next_label_reply, NextLabel}, + compile_loop(State#state{next_label = NextLabel + Estimation}); + {done, Job, Data} -> + NewResult = + dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData), + case Active of + 1 -> + {NewResult, NextLabel}; + _ -> + NewActive = Active - 1, + {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), + NewState = + State#state{result = NewResult, active = NewActive, + queue = NewQueue, tickets = NewTickets}, + compile_loop(NewState) + end + end. --spec sccs_to_pids([scc()]) -> {[dialyzer_worker:worker()], [scc()]}. +not_fixpoint_loop(#state{active = Active, result = Result, + init_data = InitData} = State) -> + receive + {done, _Job, Data} -> + FinalData = dialyzer_succ_typings:lookup_names(Data, InitData), + NewResult = FinalData ++ Result, + case Active of + 1 -> + ets:delete(?MAP), + NewResult; + _ -> + NewActive = Active - 1, + NewState = State#state{active = NewActive, result = NewResult}, + not_fixpoint_loop(NewState) + end + end. + +warnings_loop(#state{active = Active, result = Result, tickets = Tickets, + queue = Queue} = State) -> + receive + {done, _Job, Data} -> + NewResult = Data ++ Result, + case Active of + 1 -> NewResult; + _ -> + NewActive = Active - 1, + {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), + NewState = + State#state{result = NewResult, active = NewActive, + queue = NewQueue, tickets = NewTickets}, + warnings_loop(NewState) + end + end. + +manage_waiting(Queue, Tickets) -> + {Waiting, NewQueue} = queue:out(Queue), + NewTickets = + case Waiting of + empty -> Tickets + 1; + {value, Pid} -> + activate_pid(Pid), + Tickets + end, + {NewQueue, NewTickets}. + +-spec sccs_to_pids([scc() | module()]) -> + {[dialyzer_worker:worker()], [scc() | module()]}. sccs_to_pids(SCCs) -> lists:foldl(fun pid_partition/2, {[], []}, SCCs). @@ -129,70 +219,27 @@ pid_partition(SCC, {Pids, Unknown}) -> _:_ -> {Pids, [SCC|Unknown]} end. --spec job_done(scc() | file:filename(), term(), coordinator()) -> ok. +-spec job_done(scc() | module() | file:filename(), term(), coordinator()) -> ok. job_done(Job, Result, Coordinator) -> - cast({done, Job, Result}, Coordinator). + Coordinator ! {done, Job, Result}, + ok. -spec compilation_done(file:filename(), - dialyzer_analysis_callgraph:compilation_data(), + dialyzer_analysis_callgraph:compile_result(), coordinator()) -> ok. compilation_done(Filename, CompilationData, Coordinator) -> - cast({done, Filename, CompilationData}, Coordinator). - --spec all_spawned(coordinator()) -> ok. - -all_spawned(Coordinator) -> - cast(all_spawned, Coordinator). - -send_done_to_parent(#state{mode = Mode, - parent = Parent, - result = Result, - next_label = NextLabel}) -> - Msg = - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - ets:delete(?MAP), - {not_fixpoint, Result}; - 'compile' -> {compilation_data, Result, NextLabel}; - 'warnings' -> {warnings, Result} - end, - Parent ! Msg, + Coordinator ! {done, Filename, CompilationData}, ok. --spec receive_not_fixpoint() -> [mfa_or_funlbl()]. - -receive_not_fixpoint() -> - receive {not_fixpoint, NotFixpoint} -> NotFixpoint end. - --spec receive_compilation_data() -> - {dialyzer_analysis_callgraph:result(), integer()}. - -receive_compilation_data() -> - receive {compilation_data, CompilationData, NextLabel} -> - {CompilationData, NextLabel} - end. - --spec receive_warnings() -> [dial_warning()]. - -receive_warnings() -> - receive {warnings, Warnings} -> Warnings end. - --spec compiler_spawn(file:filename(), coordinator()) -> ok. - -compiler_spawn(Filename, Coordinator) -> - cast({compiler_spawn, Filename}, Coordinator). - -spec get_next_label(integer(), coordinator()) -> integer(). get_next_label(EstimatedSize, Coordinator) -> - call({get_next_label, EstimatedSize}, Coordinator). - --spec request_activation(coordinator()) -> ok. - -request_activation(Coordinator) -> - cast({request_activation, self()}, Coordinator). + Coordinator ! {next_label_request, EstimatedSize, self()}, + receive + {next_label_reply, NextLabel} -> NextLabel + end. -spec wait_activation() -> ok. @@ -201,179 +248,3 @@ wait_activation() -> activate_pid(Pid) -> Pid ! activate. - -%%-------------------------------------------------------------------- - --spec init({pid(), mode(), servers()}) -> {ok, #state{}}. - -init({Parent, Mode, InitJobData}) -> - BaseTickets = erlang:system_info(logical_processors_available), - Tickets = - case Mode of - 'compile' -> 4*BaseTickets; - 'warnings' -> 4*BaseTickets; - _ -> non_regulated - end, - InitState = - #state{parent = Parent, mode = Mode, init_job_data = InitJobData, - tickets = Tickets, queue = queue:new()}, - State = - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), - InitState#state{result = []}; - 'warnings' -> - InitState#state{result = []}; - 'compile' -> - InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(), - InitState#state{result = InitResult, next_label = 0} - end, - {ok, State}. - --spec handle_call(Query::term(), From::term(), #state{}) -> - {reply, Reply::term(), #state{}}. - -handle_call({get_next_label, EstimatedSize}, _From, - #state{next_label = NextLabel} = State) -> - {reply, NextLabel, State#state{next_label = NextLabel + EstimatedSize}}. - --spec handle_cast(Msg::term(), #state{}) -> - {noreply, #state{}} | {stop, normal, #state{}}. - -handle_cast({done, _Job, NewData}, - #state{mode = Mode, result = OldResult, - init_job_data = Servers} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), - UpdatedState = State#state{result = FinalData ++ OldResult}, - reduce_or_stop(UpdatedState); -handle_cast({done, Job, NewData}, - #state{mode = Mode, - result = OldResult, - tickets = Tickets, - queue = Queue} = State) when - Mode =:= 'compile'; Mode =:= 'warnings' -> - {Waiting, NewQueue} = queue:out(Queue), - NewTickets = - case Waiting of - empty -> Tickets+1; - {value, Pid} -> - activate_pid(Pid), - Tickets - end, - NewResult = - case Mode of - 'compile' -> - dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); - 'warnings' -> - NewData ++ OldResult - end, - UpdatedState = - State#state{result = NewResult, tickets = NewTickets, queue = NewQueue}, - reduce_or_stop(UpdatedState); -handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> - case SpawnCount of - 0 -> - send_done_to_parent(State), - {stop, normal, State}; - _ -> - NewState = State#state{all_spawned = true}, - {noreply, NewState} - end; -handle_cast({request_activation, Pid}, - #state{tickets = Tickets, queue = Queue} = State) -> - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{tickets = NewTickets, queue = NewQueue}}; -handle_cast({scc_spawn, SCC}, - #state{mode = Mode, - init_job_data = Servers, - spawn_count = SpawnCount} = State) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), - true = ets:insert(?MAP, {SCC, Pid}), - {noreply, State#state{spawn_count = SpawnCount + 1}}; -handle_cast({scc_spawn, SCC}, - #state{mode = 'warnings', - init_job_data = Servers, - spawn_count = SpawnCount, - tickets = Tickets, - queue = Queue} = State) -> - Pid = dialyzer_worker:launch('warnings', SCC, Servers, self()), - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{spawn_count = SpawnCount + 1, - tickets = NewTickets, - queue = NewQueue}}; -handle_cast({compiler_spawn, Filename}, - #state{mode = Mode, - init_job_data = Servers, - spawn_count = SpawnCount, - tickets = Tickets, - queue = Queue - } = State) -> - Pid = dialyzer_worker:launch(Mode, Filename, Servers, self()), - NewSpawnCount = SpawnCount + 1, - {NewTickets, NewQueue} = - case Tickets of - 0 -> {Tickets, queue:in(Pid, Queue)}; - N -> - activate_pid(Pid), - {N-1, Queue} - end, - {noreply, State#state{spawn_count = NewSpawnCount, - tickets = NewTickets, - queue = NewQueue}}. - --spec handle_info(term(), #state{}) -> {noreply, #state{}}. - -handle_info(_Info, State) -> - {noreply, State}. - --spec terminate(term(), #state{}) -> ok. - -terminate(_Reason, _State) -> - ok. - --spec code_change(term(), #state{}, term()) -> {ok, #state{}}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- - -cast(Message, Coordinator) -> - gen_server:cast(Coordinator, Message). - -call(Message, Coordinator) -> - gen_server:call(Coordinator, Message, infinity). - -reduce_or_stop(#state{all_spawned = AllSpawned, - spawn_count = SpawnCount} = State) -> - Action = - case AllSpawned of - false -> reduce; - true -> - case SpawnCount of - 1 -> finish; - _ -> reduce - end - end, - case Action of - reduce -> - NewState = State#state{spawn_count = SpawnCount - 1}, - {noreply, NewState}; - finish -> - send_done_to_parent(State), - {stop, normal, State} - end. -- cgit v1.2.3 From 4a1fa04e5f57ad56e35aae8e9ff278bf1133889a Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Mon, 5 Mar 2012 11:39:28 +0100 Subject: Change --time to --statistics and include more info --- lib/dialyzer/src/dialyzer_coordinator.erl | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 490cfb8d49..63b2d8c3f2 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -105,6 +105,12 @@ spawn_jobs(Mode, Jobs, InitData) when Count + 1 end, JobCount = lists:foldl(Fold, 0, Jobs), + Unit = + case Mode of + 'typesig' -> "SCCs"; + 'dataflow' -> "modules" + end, + dialyzer_timing:send_size_info(JobCount, Unit), #state{active = JobCount, result = [], init_data = InitData}; spawn_jobs(Mode, Jobs, InitData) when Mode =:= 'compile'; Mode =:= 'warnings' -> @@ -122,6 +128,7 @@ spawn_jobs(Mode, Jobs, InitData) when InitTickets = 4*CPUs, {Tickets, Queue, JobCount} = lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs), + dialyzer_timing:send_size_info(JobCount, "modules"), InitResult = case Mode of 'warnings' -> []; -- cgit v1.2.3 From 4920bb64ad39fde53b02e73c22e7e771b016283f Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Mon, 5 Mar 2012 12:56:37 +0100 Subject: Remove unused function --- lib/dialyzer/src/dialyzer_coordinator.erl | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 63b2d8c3f2..1ae30d9862 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -35,7 +35,7 @@ -export([sccs_to_pids/1]). %%% Exports for the compilation workers --export([get_next_label/2, compilation_done/3]). +-export([get_next_label/2]). -export_type([coordinator/0, mode/0, init_data/0]). @@ -232,14 +232,6 @@ job_done(Job, Result, Coordinator) -> Coordinator ! {done, Job, Result}, ok. --spec compilation_done(file:filename(), - dialyzer_analysis_callgraph:compile_result(), - coordinator()) -> ok. - -compilation_done(Filename, CompilationData, Coordinator) -> - Coordinator ! {done, Filename, CompilationData}, - ok. - -spec get_next_label(integer(), coordinator()) -> integer(). get_next_label(EstimatedSize, Coordinator) -> -- cgit v1.2.3 From 76b7c72882ee521a7c5a39d33ecf1009a72ee4e3 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Mon, 5 Mar 2012 13:01:11 +0100 Subject: Fix types and specs --- lib/dialyzer/src/dialyzer_coordinator.erl | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 1ae30d9862..8ebbb11137 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -69,6 +69,10 @@ -type result() :: compile_result() | typesig_result() | dataflow_result() | warnings_result(). +-type job() :: scc() | module() | file:filename(). +-type job_result() :: dialyzer_analysis_callgraph:one_file_result() | + typesig_result() | dataflow_result() | warnings_result(). + -record(state, {active = 0 :: integer(), result :: result(), next_label = 0 :: integer(), @@ -226,7 +230,7 @@ pid_partition(SCC, {Pids, Unknown}) -> _:_ -> {Pids, [SCC|Unknown]} end. --spec job_done(scc() | module() | file:filename(), term(), coordinator()) -> ok. +-spec job_done(job(), job_result(), coordinator()) -> ok. job_done(Job, Result, Coordinator) -> Coordinator ! {done, Job, Result}, -- cgit v1.2.3 From 720b65deff021ddb17aaa125046f97ff13ade883 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Tue, 20 Mar 2012 12:07:42 +0100 Subject: Regulate all kinds of running workers up to the number of schedulers --- lib/dialyzer/src/dialyzer_coordinator.erl | 208 ++++++++++++++---------------- 1 file changed, 100 insertions(+), 108 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 8ebbb11137..e81da5c456 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -32,18 +32,18 @@ -export([wait_activation/0, job_done/3]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids/1]). +-export([sccs_to_pids/1, request_activation/1]). %%% Exports for the compilation workers -export([get_next_label/2]). --export_type([coordinator/0, mode/0, init_data/0]). +-export_type([coordinator/0, mode/0, init_data/0, result/0]). %%-------------------------------------------------------------------- -define(MAP, dialyzer_coordinator_map). --type coordinator() :: pid(). %%opaque +-type coordinator() :: {pid(), pid()}. %%opaque -type scc() :: [mfa_or_funlbl()]. -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. @@ -73,12 +73,12 @@ -type job_result() :: dialyzer_analysis_callgraph:one_file_result() | typesig_result() | dataflow_result() | warnings_result(). --record(state, {active = 0 :: integer(), - result :: result(), - next_label = 0 :: integer(), - tickets = 0 :: integer(), - queue = queue:new() :: queue(), - init_data :: init_data() +-record(state, {mode :: mode(), + active = 0 :: integer(), + result :: result(), + next_label = 0 :: integer(), + init_data :: init_data(), + regulator :: pid() }). -include("dialyzer.hrl"). @@ -96,127 +96,79 @@ parallel_job(Mode, Jobs, InitData) -> State = spawn_jobs(Mode, Jobs, InitData), - collect_result(Mode, State). - -spawn_jobs(Mode, Jobs, InitData) when - Mode =:= 'typesig'; Mode =:= 'dataflow' -> - Coordinator = self(), - ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), + collect_result(State). + +spawn_jobs(Mode, Jobs, InitData) -> + Collector = self(), + Regulator = spawn_regulator(), + Coordinator = {Collector, Regulator}, + TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'), + case TypesigOrDataflow of + true -> + ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]); + false -> ok + end, Fold = fun(Job, Count) -> Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), - true = ets:insert(?MAP, {Job, Pid}), + case TypesigOrDataflow of + true -> true = ets:insert(?MAP, {Job, Pid}); + false -> request_activation(Regulator, Pid) + end, Count + 1 end, JobCount = lists:foldl(Fold, 0, Jobs), Unit = case Mode of 'typesig' -> "SCCs"; - 'dataflow' -> "modules" + _ -> "modules" end, dialyzer_timing:send_size_info(JobCount, Unit), - #state{active = JobCount, result = [], init_data = InitData}; -spawn_jobs(Mode, Jobs, InitData) when - Mode =:= 'compile'; Mode =:= 'warnings' -> - Coordinator = self(), - Fold = - fun(Job, {InTickets, InQueue, Count}) -> - Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), - NewCount = Count + 1, - case InTickets of - 0 -> {InTickets, queue:in(Pid, InQueue), NewCount}; - N -> activate_pid(Pid), {N-1, InQueue, NewCount} - end - end, - CPUs = erlang:system_info(logical_processors_available), - InitTickets = 4*CPUs, - {Tickets, Queue, JobCount} = - lists:foldl(Fold, {InitTickets, queue:new(), 0}, Jobs), - dialyzer_timing:send_size_info(JobCount, "modules"), InitResult = case Mode of - 'warnings' -> []; - 'compile' -> dialyzer_analysis_callgraph:compile_init_result() + 'compile' -> dialyzer_analysis_callgraph:compile_init_result(); + _ -> [] end, - #state{active = JobCount, result = InitResult, next_label = 0, - tickets = Tickets, queue = Queue, init_data = InitData}. - -collect_result(Mode, State) -> - case Mode of - 'compile' -> compile_loop(State); - 'typesig' -> not_fixpoint_loop(State); - 'dataflow' -> not_fixpoint_loop(State); - 'warnings' -> warnings_loop(State) - end. + #state{mode = Mode, active = JobCount, result = InitResult, next_label = 0, + init_data = InitData, regulator = Regulator}. -compile_loop(#state{active = Active, result = Result, - next_label = NextLabel, tickets = Tickets, - queue = Queue, init_data = InitData} = State) -> +collect_result(#state{mode = Mode, active = Active, result = Result, + next_label = NextLabel, init_data = InitData, + regulator = Regulator} = State) -> receive {next_label_request, Estimation, Pid} -> Pid ! {next_label_reply, NextLabel}, - compile_loop(State#state{next_label = NextLabel + Estimation}); + collect_result(State#state{next_label = NextLabel + Estimation}); {done, Job, Data} -> - NewResult = - dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, InitData), + NewResult = update_result(Mode, InitData, Job, Data, Result), case Active of 1 -> - {NewResult, NextLabel}; - _ -> - NewActive = Active - 1, - {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), - NewState = - State#state{result = NewResult, active = NewActive, - queue = NewQueue, tickets = NewTickets}, - compile_loop(NewState) + kill_regulator(Regulator), + case Mode of + 'compile' -> + {NewResult, NextLabel}; + X when X =:= 'typesig'; X =:= 'dataflow' -> + ets:delete(?MAP), + NewResult; + 'warnings' -> + NewResult + end; + N -> + collect_result(State#state{result = NewResult, active = N - 1}) end end. -not_fixpoint_loop(#state{active = Active, result = Result, - init_data = InitData} = State) -> - receive - {done, _Job, Data} -> - FinalData = dialyzer_succ_typings:lookup_names(Data, InitData), - NewResult = FinalData ++ Result, - case Active of - 1 -> - ets:delete(?MAP), - NewResult; - _ -> - NewActive = Active - 1, - NewState = State#state{active = NewActive, result = NewResult}, - not_fixpoint_loop(NewState) - end - end. - -warnings_loop(#state{active = Active, result = Result, tickets = Tickets, - queue = Queue} = State) -> - receive - {done, _Job, Data} -> - NewResult = Data ++ Result, - case Active of - 1 -> NewResult; - _ -> - NewActive = Active - 1, - {NewQueue, NewTickets} = manage_waiting(Queue, Tickets), - NewState = - State#state{result = NewResult, active = NewActive, - queue = NewQueue, tickets = NewTickets}, - warnings_loop(NewState) - end +update_result(Mode, InitData, Job, Data, Result) -> + case Mode of + 'compile' -> + dialyzer_analysis_callgraph:add_to_result(Job, Data, Result, + InitData); + X when X =:= 'typesig'; X =:= 'dataflow' -> + dialyzer_succ_typings:lookup_names(Data, InitData) ++ Result; + 'warnings' -> + Data ++ Result end. -manage_waiting(Queue, Tickets) -> - {Waiting, NewQueue} = queue:out(Queue), - NewTickets = - case Waiting of - empty -> Tickets + 1; - {value, Pid} -> - activate_pid(Pid), - Tickets - end, - {NewQueue, NewTickets}. - -spec sccs_to_pids([scc() | module()]) -> {[dialyzer_worker:worker()], [scc() | module()]}. @@ -232,14 +184,15 @@ pid_partition(SCC, {Pids, Unknown}) -> -spec job_done(job(), job_result(), coordinator()) -> ok. -job_done(Job, Result, Coordinator) -> - Coordinator ! {done, Job, Result}, +job_done(Job, Result, {Collector, Regulator}) -> + Regulator ! done, + Collector ! {done, Job, Result}, ok. -spec get_next_label(integer(), coordinator()) -> integer(). -get_next_label(EstimatedSize, Coordinator) -> - Coordinator ! {next_label_request, EstimatedSize, self()}, +get_next_label(EstimatedSize, {Collector, _Regulator}) -> + Collector ! {next_label_request, EstimatedSize, self()}, receive {next_label_reply, NextLabel} -> NextLabel end. @@ -251,3 +204,42 @@ wait_activation() -> activate_pid(Pid) -> Pid ! activate. + +-spec request_activation(coordinator()) -> ok. + +request_activation({_Collector, Regulator}) -> + Regulator ! {req, self()}, + wait_activation(). + +request_activation(Regulator, Pid) -> + Regulator ! {req, Pid}. + +spawn_regulator() -> + InitTickets = dialyzer_utils:parallelism(), + spawn_link(fun() -> regulator_loop(InitTickets, queue:new()) end). + +regulator_loop(Tickets, Queue) -> + receive + {req, Pid} -> + case Tickets of + 0 -> + regulator_loop(0, queue:in(Pid, Queue)); + N -> + activate_pid(Pid), + regulator_loop(N-1, Queue) + end; + done -> + {Waiting, NewQueue} = queue:out(Queue), + NewTickets = + case Waiting of + empty -> Tickets + 1; + {value, Pid} -> + activate_pid(Pid), + Tickets + end, + regulator_loop(NewTickets, NewQueue); + stop -> ok + end. + +kill_regulator(Regulator) -> + Regulator ! stop. -- cgit v1.2.3 From 5c52ff6b5f10c7bd9ce06cdf607e88035c16e079 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Thu, 29 Mar 2012 14:44:08 +0200 Subject: Anonymous time server --- lib/dialyzer/src/dialyzer_coordinator.erl | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index e81da5c456..b91fc95959 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -26,7 +26,7 @@ -module(dialyzer_coordinator). %%% Export for dialyzer main process --export([parallel_job/3]). +-export([parallel_job/4]). %%% Exports for all possible workers -export([wait_activation/0, job_done/3]). @@ -44,6 +44,7 @@ -define(MAP, dialyzer_coordinator_map). -type coordinator() :: {pid(), pid()}. %%opaque +-type timing() :: dialyzer_timing:timing_server(). -type scc() :: [mfa_or_funlbl()]. -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. @@ -85,20 +86,20 @@ %%-------------------------------------------------------------------- --spec parallel_job('compile', compile_jobs(), compile_init_data()) -> +-spec parallel_job('compile', compile_jobs(), compile_init_data(), timing()) -> {compile_result(), integer()}; - ('typesig', typesig_jobs(), typesig_init_data()) -> + ('typesig', typesig_jobs(), typesig_init_data(), timing()) -> typesig_result(); - ('dataflow', dataflow_jobs(), dataflow_init_data()) -> - dataflow_result(); - ('warnings', warnings_jobs(), warnings_init_data()) -> - warnings_result(). + ('dataflow', dataflow_jobs(), dataflow_init_data(), + timing()) -> dataflow_result(); + ('warnings', warnings_jobs(), warnings_init_data(), + timing()) -> warnings_result(). -parallel_job(Mode, Jobs, InitData) -> - State = spawn_jobs(Mode, Jobs, InitData), +parallel_job(Mode, Jobs, InitData, Timing) -> + State = spawn_jobs(Mode, Jobs, InitData, Timing), collect_result(State). -spawn_jobs(Mode, Jobs, InitData) -> +spawn_jobs(Mode, Jobs, InitData, Timing) -> Collector = self(), Regulator = spawn_regulator(), Coordinator = {Collector, Regulator}, @@ -123,7 +124,7 @@ spawn_jobs(Mode, Jobs, InitData) -> 'typesig' -> "SCCs"; _ -> "modules" end, - dialyzer_timing:send_size_info(JobCount, Unit), + dialyzer_timing:send_size_info(Timing, JobCount, Unit), InitResult = case Mode of 'compile' -> dialyzer_analysis_callgraph:compile_init_result(); -- cgit v1.2.3 From 12b8ce08ece794e677fdd148723fbe0a707bef6f Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Thu, 29 Mar 2012 15:06:47 +0200 Subject: Anonymous SCCtoPID ETS table --- lib/dialyzer/src/dialyzer_coordinator.erl | 58 +++++++++++++++++-------------- 1 file changed, 31 insertions(+), 27 deletions(-) (limited to 'lib/dialyzer/src/dialyzer_coordinator.erl') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index b91fc95959..5719132215 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -32,7 +32,7 @@ -export([wait_activation/0, job_done/3]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids/1, request_activation/1]). +-export([sccs_to_pids/2, request_activation/1]). %%% Exports for the compilation workers -export([get_next_label/2]). @@ -41,9 +41,11 @@ %%-------------------------------------------------------------------- --define(MAP, dialyzer_coordinator_map). +-type collector() :: pid(). +-type regulator() :: pid(). +-type scc_to_pid() :: ets:tid() | 'unused'. --type coordinator() :: {pid(), pid()}. %%opaque +-type coordinator() :: {collector(), regulator(), scc_to_pid()}. %%opaque -type timing() :: dialyzer_timing:timing_server(). -type scc() :: [mfa_or_funlbl()]. @@ -79,7 +81,8 @@ result :: result(), next_label = 0 :: integer(), init_data :: init_data(), - regulator :: pid() + regulator :: regulator(), + scc_to_pid :: scc_to_pid() }). -include("dialyzer.hrl"). @@ -102,18 +105,18 @@ parallel_job(Mode, Jobs, InitData, Timing) -> spawn_jobs(Mode, Jobs, InitData, Timing) -> Collector = self(), Regulator = spawn_regulator(), - Coordinator = {Collector, Regulator}, TypesigOrDataflow = (Mode =:= 'typesig') orelse (Mode =:= 'dataflow'), - case TypesigOrDataflow of - true -> - ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]); - false -> ok - end, + SCCtoPID = + case TypesigOrDataflow of + true -> ets:new(scc_to_pid, [{read_concurrency, true}]); + false -> unused + end, + Coordinator = {Collector, Regulator, SCCtoPID}, Fold = fun(Job, Count) -> Pid = dialyzer_worker:launch(Mode, Job, InitData, Coordinator), case TypesigOrDataflow of - true -> true = ets:insert(?MAP, {Job, Pid}); + true -> true = ets:insert(SCCtoPID, {Job, Pid}); false -> request_activation(Regulator, Pid) end, Count + 1 @@ -131,11 +134,11 @@ spawn_jobs(Mode, Jobs, InitData, Timing) -> _ -> [] end, #state{mode = Mode, active = JobCount, result = InitResult, next_label = 0, - init_data = InitData, regulator = Regulator}. + init_data = InitData, regulator = Regulator, scc_to_pid = SCCtoPID}. collect_result(#state{mode = Mode, active = Active, result = Result, next_label = NextLabel, init_data = InitData, - regulator = Regulator} = State) -> + regulator = Regulator, scc_to_pid = SCCtoPID} = State) -> receive {next_label_request, Estimation, Pid} -> Pid ! {next_label_reply, NextLabel}, @@ -149,7 +152,7 @@ collect_result(#state{mode = Mode, active = Active, result = Result, 'compile' -> {NewResult, NextLabel}; X when X =:= 'typesig'; X =:= 'dataflow' -> - ets:delete(?MAP), + ets:delete(SCCtoPID), NewResult; 'warnings' -> NewResult @@ -170,29 +173,30 @@ update_result(Mode, InitData, Job, Data, Result) -> Data ++ Result end. --spec sccs_to_pids([scc() | module()]) -> +-spec sccs_to_pids([scc() | module()], coordinator()) -> {[dialyzer_worker:worker()], [scc() | module()]}. -sccs_to_pids(SCCs) -> - lists:foldl(fun pid_partition/2, {[], []}, SCCs). - -pid_partition(SCC, {Pids, Unknown}) -> - try ets:lookup_element(?MAP, SCC, 2) of - Result -> {[Result|Pids], Unknown} - catch - _:_ -> {Pids, [SCC|Unknown]} - end. +sccs_to_pids(SCCs, {_Collector, _Regulator, SCCtoPID}) -> + Fold = + fun(SCC, {Pids, Unknown}) -> + try ets:lookup_element(SCCtoPID, SCC, 2) of + Result -> {[Result|Pids], Unknown} + catch + _:_ -> {Pids, [SCC|Unknown]} + end + end, + lists:foldl(Fold, {[], []}, SCCs). -spec job_done(job(), job_result(), coordinator()) -> ok. -job_done(Job, Result, {Collector, Regulator}) -> +job_done(Job, Result, {Collector, Regulator, _SCCtoPID}) -> Regulator ! done, Collector ! {done, Job, Result}, ok. -spec get_next_label(integer(), coordinator()) -> integer(). -get_next_label(EstimatedSize, {Collector, _Regulator}) -> +get_next_label(EstimatedSize, {Collector, _Regulator, _SCCtoPID}) -> Collector ! {next_label_request, EstimatedSize, self()}, receive {next_label_reply, NextLabel} -> NextLabel @@ -208,7 +212,7 @@ activate_pid(Pid) -> -spec request_activation(coordinator()) -> ok. -request_activation({_Collector, Regulator}) -> +request_activation({_Collector, Regulator, _SCCtoPID}) -> Regulator ! {req, self()}, wait_activation(). -- cgit v1.2.3