From dd1d17c1798a601dbe6378795c9a2fdd0effcc15 Mon Sep 17 00:00:00 2001 From: Stavros Aronis Date: Thu, 23 Feb 2012 13:46:51 +0100 Subject: Worker PIDs are stored in an ETS table --- lib/dialyzer/src/dialyzer_coordinator.erl | 89 +++++++++++++------------------ lib/dialyzer/src/dialyzer_worker.erl | 25 ++++----- 2 files changed, 46 insertions(+), 68 deletions(-) (limited to 'lib/dialyzer/src') diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index 1921f96b78..af85dfe82d 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -63,8 +63,7 @@ -export([receive_warnings/0]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids_reply/0, - sccs_to_pids_request/2]). +-export([sccs_to_pids/1]). %%% Exports for the compilation main process -export([compiler_spawn/2, @@ -81,9 +80,12 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%%-------------------------------------------------------------------- + +-define(MAP, dialyzer_coordinator_map). + -type coordinator() :: pid(). %%opaque --type map() :: dict(). -type scc() :: [mfa_or_funlbl()] | module(). -type mode() :: 'typesig' | 'dataflow' | 'compile' | 'warnings'. -type servers() :: dialyzer_succ_typings:servers() | @@ -94,7 +96,6 @@ mode :: mode(), spawn_count = 0 :: integer(), all_spawned = false :: boolean(), - job_to_pid :: map(), next_label :: integer(), result :: [mfa_or_funlbl()] | dialyzer_analysis_callgraph:result() | @@ -117,20 +118,17 @@ start(Mode, Servers) -> scc_spawn(SCC, Coordinator) -> cast({scc_spawn, SCC}, Coordinator). --spec sccs_to_pids_request([scc()], coordinator()) -> ok. +-spec sccs_to_pids([scc()]) -> {[dialyzer_worker:worker()], [scc()]}. -sccs_to_pids_request(SCCs, Coordinator) -> - cast({sccs_to_pids, SCCs, self()}, Coordinator). - -scc_to_pids_request_handle(Worker, SCCs, SCCtoPID) -> - Result = map_lookup(SCCs, SCCtoPID), - Worker ! {sccs_to_pids, Result}, - ok. +sccs_to_pids(SCCs) -> + lists:foldl(fun pid_partition/2, {[], []}, SCCs). --spec sccs_to_pids_reply() -> {[dialyzer_worker:worker()], [scc()]}. - -sccs_to_pids_reply() -> - receive {sccs_to_pids, Pids} -> Pids end. +pid_partition(SCC, {Pids, Unknown}) -> + try ets:lookup_element(?MAP, SCC, 2) of + Result -> {[Result|Pids], Unknown} + catch + _:_ -> {Pids, [SCC|Unknown]} + end. -spec job_done(scc() | file:filename(), term(), coordinator()) -> ok. @@ -198,8 +196,11 @@ init({Parent, Mode, InitJobData}) -> InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData}, State = case Mode of - X when X =:= 'typesig'; X =:= 'dataflow'; X =:= 'warnings' -> - InitState#state{result = [], job_to_pid = new_map()}; + X when X =:= 'typesig'; X =:= 'dataflow' -> + ?MAP = ets:new(?MAP, [named_table, {read_concurrency, true}]), + InitState#state{result = []}; + 'warnings' -> + InitState#state{result = []}; 'compile' -> InitResult = dialyzer_analysis_callgraph:compile_coordinator_init(), InitState#state{result = InitResult, next_label = 0} @@ -221,21 +222,19 @@ handle_cast({done, Job, NewData}, spawn_count = SpawnCount, all_spawned = AllSpawned, result = OldResult, - job_to_pid = JobToPID, init_job_data = Servers } = State) -> - {NewResult, NewJobToPID} = + NewResult = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> FinalData = dialyzer_succ_typings:lookup_names(NewData, Servers), - {ordsets:union(OldResult, FinalData), dict:erase(Job, JobToPID)}; + ordsets:union(OldResult, FinalData); 'compile' -> - {dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult), - JobToPID}; + dialyzer_analysis_callgraph:add_to_result(Job, NewData, OldResult); 'warnings' -> - {NewData ++ OldResult, dict:erase(Job, JobToPID)} + NewData ++ OldResult end, - UpdatedState = State#state{result = NewResult, job_to_pid = NewJobToPID}, + UpdatedState = State#state{result = NewResult}, Action = case AllSpawned of false -> reduce; @@ -262,21 +261,17 @@ handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> NewState = State#state{all_spawned = true}, {noreply, NewState} end; -handle_cast({sccs_to_pids, SCCs, Worker}, - #state{job_to_pid = SCCtoPID} = State) -> - scc_to_pids_request_handle(Worker, SCCs, SCCtoPID), - {noreply, State}; handle_cast({scc_spawn, SCC}, #state{mode = Mode, init_job_data = Servers, - spawn_count = SpawnCount, - job_to_pid = SCCtoPID - } = State) -> + spawn_count = SpawnCount} = State) -> Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), - {noreply, - State#state{spawn_count = SpawnCount + 1, - job_to_pid = store_map(SCC, Pid, SCCtoPID)} - }; + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> + true = ets:insert(?MAP, {SCC, Pid}); + _ -> true + end, + {noreply, State#state{spawn_count = SpawnCount + 1}}; handle_cast({compiler_spawn, Filename}, #state{mode = Mode, init_job_data = Servers, @@ -294,7 +289,11 @@ handle_info(_Info, State) -> -spec terminate(term(), #state{}) -> ok. -terminate(_Reason, _State) -> +terminate(_Reason, #state{mode = Mode}) -> + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> ets:delete(?MAP); + _ -> true + end, ok. -spec code_change(term(), #state{}, term()) -> {ok, #state{}}. @@ -309,19 +308,3 @@ cast(Message, Coordinator) -> call(Message, Coordinator) -> gen_server:call(Coordinator, Message). - -new_map() -> - dict:new(). - -store_map(Key, Value, Map) -> - dict:store(Key, Value, Map). - -map_lookup(SCCs, Map) -> - Fold = - fun(Key, {Mapped, Unknown}) -> - case dict:find(Key, Map) of - {ok, Value} -> {[Value|Mapped], Unknown}; - error -> {Mapped, [Key|Unknown]} - end - end, - lists:foldl(Fold, {[], []}, SCCs). diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl index 74e227f587..37ca6854b1 100644 --- a/lib/dialyzer/src/dialyzer_worker.erl +++ b/lib/dialyzer/src/dialyzer_worker.erl @@ -101,7 +101,6 @@ loop(running, #state{mode = 'warnings'} = State) -> loop(running, #state{mode = Mode} = State) when Mode =:= 'typesig'; Mode =:= 'dataflow' -> ?debug("Run: ~p\n",[State#state.job]), - ok = ask_coordinator_for_callers(State), NotFixpoint = do_work(State), ok = broadcast_done(State), report_to_coordinator(NotFixpoint, State). @@ -112,29 +111,25 @@ waits_more_success_typings(#state{depends_on = Depends}) -> _ -> true end. -ask_coordinator_for_callers(#state{job = SCC, - servers = Servers, - coordinator = Coordinator}) -> +broadcast_done(#state{job = SCC, servers = Servers}) -> RequiredBy = dialyzer_succ_typings:find_required_by(SCC, Servers), - ?debug("Waiting for me~p: ~p\n",[SCC, RequiredBy]), - dialyzer_coordinator:sccs_to_pids_request(RequiredBy, Coordinator). - -broadcast_done(#state{job = SCC, coordinator = Coordinator}) -> - {Callers, Unknown} = dialyzer_coordinator:sccs_to_pids_reply(), + {Callers, Unknown} = dialyzer_coordinator:sccs_to_pids(RequiredBy), send_done(Callers, SCC), - continue_broadcast_done(Unknown, SCC, Coordinator). + continue_broadcast_done(Unknown, SCC). send_done(Callers, SCC) -> ?debug("Sending ~p: ~p\n",[SCC, Callers]), SendSTFun = fun(PID) -> PID ! {done, SCC} end, lists:foreach(SendSTFun, Callers). -continue_broadcast_done([], _SCC, _Coordinator) -> ok; -continue_broadcast_done(Rest, SCC, Coordinator) -> - dialyzer_coordinator:sccs_to_pids_request(Rest, Coordinator), - {Callers, Unknown} = dialyzer_coordinator:sccs_to_pids_reply(), +continue_broadcast_done([], _SCC) -> ok; +continue_broadcast_done(Rest, SCC) -> + %% This time limit should be greater than the time required + %% by the coordinator to spawn all processes. + timer:sleep(500), + {Callers, Unknown} = dialyzer_coordinator:sccs_to_pids(Rest), send_done(Callers, SCC), - continue_broadcast_done(Unknown, SCC, Coordinator). + continue_broadcast_done(Unknown, SCC). wait_for_success_typings(#state{depends_on = DependsOn} = State) -> receive -- cgit v1.2.3