diff options
author | Stavros Aronis <[email protected]> | 2012-02-24 18:31:31 +0100 |
---|---|---|
committer | Henrik Nord <[email protected]> | 2012-05-21 15:31:20 +0200 |
commit | 0ecc1f181062da8b019f226ae2c567078ee2e860 (patch) | |
tree | 00c1eba159f7471375c362d93c9906826468ad2b | |
parent | 8f56878ce7b2d9a4f2118663838287c6d2fa9502 (diff) | |
download | otp-0ecc1f181062da8b019f226ae2c567078ee2e860.tar.gz otp-0ecc1f181062da8b019f226ae2c567078ee2e860.tar.bz2 otp-0ecc1f181062da8b019f226ae2c567078ee2e860.zip |
Ticket-based regulation of memory consumption
-rw-r--r-- | lib/dialyzer/src/dialyzer_coordinator.erl | 113 | ||||
-rw-r--r-- | lib/dialyzer/src/dialyzer_worker.erl | 7 |
2 files changed, 96 insertions, 24 deletions
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl index fa0b5dcd9d..91a0dd662d 100644 --- a/lib/dialyzer/src/dialyzer_coordinator.erl +++ b/lib/dialyzer/src/dialyzer_coordinator.erl @@ -47,11 +47,10 @@ -module(dialyzer_coordinator). %%% Exports for all possible uses of coordinator from main process --export([start/2, - all_spawned/1]). +-export([start/2, all_spawned/1]). %%% Exports for all possible workers --export([job_done/3]). +-export([wait_activation/0, job_done/3]). %%% Export for the typesig, dataflow and warnings main process -export([scc_spawn/2]). @@ -63,15 +62,13 @@ -export([receive_warnings/0]). %%% Exports for the typesig and dataflow analysis workers --export([sccs_to_pids/1]). +-export([request_activation/1, sccs_to_pids/1]). %%% Exports for the compilation main process --export([compiler_spawn/2, - receive_compilation_data/0]). +-export([compiler_spawn/2, receive_compilation_data/0]). %%% Exports for the compilation workers --export([compilation_done/3, - get_next_label/2]). +-export([get_next_label/2, compilation_done/3]). -export_type([coordinator/0, mode/0]). @@ -100,7 +97,9 @@ result :: [mfa_or_funlbl()] | dialyzer_analysis_callgraph:result() | [dial_warning()], - init_job_data :: servers() + init_job_data :: servers(), + tickets :: integer(), + queue :: queue() }). -include("dialyzer.hrl"). @@ -188,12 +187,35 @@ compiler_spawn(Filename, Coordinator) -> get_next_label(EstimatedSize, Coordinator) -> call({get_next_label, EstimatedSize}, Coordinator). +-spec request_activation(coordinator()) -> ok. + +request_activation(Coordinator) -> + cast({request_activation, self()}, Coordinator). + +-spec wait_activation() -> ok. + +wait_activation() -> + receive activate -> ok end. + +activate_pid(Pid) -> + Pid ! activate. + %%-------------------------------------------------------------------- -spec init({pid(), mode(), servers()}) -> {ok, #state{}}. init({Parent, Mode, InitJobData}) -> - InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData}, + BaseTickets = erlang:system_info(logical_processors_available), + Tickets = + case Mode of + 'compile' -> 4*BaseTickets; + 'typesig' -> BaseTickets*BaseTickets; + 'dataflow' -> 4*BaseTickets; + 'warnings' -> 4*BaseTickets + end, + InitState = + #state{parent = Parent, mode = Mode, init_job_data = InitJobData, + tickets = Tickets, queue = queue:new()}, State = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> @@ -222,8 +244,18 @@ handle_cast({done, Job, NewData}, spawn_count = SpawnCount, all_spawned = AllSpawned, result = OldResult, - init_job_data = Servers + init_job_data = Servers, + tickets = Tickets, + queue = Queue } = State) -> + {Waiting, NewQueue} = queue:out(Queue), + NewTickets = + case Waiting of + empty -> Tickets+1; + {value, Pid} -> + activate_pid(Pid), + Tickets + end, NewResult = case Mode of X when X =:= 'typesig'; X =:= 'dataflow' -> @@ -234,7 +266,8 @@ handle_cast({done, Job, NewData}, 'warnings' -> NewData ++ OldResult end, - UpdatedState = State#state{result = NewResult}, + UpdatedState = + State#state{result = NewResult, tickets = NewTickets, queue = NewQueue}, Action = case AllSpawned of false -> reduce; @@ -261,26 +294,58 @@ handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) -> NewState = State#state{all_spawned = true}, {noreply, NewState} end; +handle_cast({request_activation, Pid}, + #state{tickets = Tickets, queue = Queue} = State) -> + {NewTickets, NewQueue} = + case Tickets of + 0 -> {Tickets, queue:in(Pid, Queue)}; + N -> + activate_pid(Pid), + {N-1, Queue} + end, + {noreply, State#state{tickets = NewTickets, queue = NewQueue}}; handle_cast({scc_spawn, SCC}, #state{mode = Mode, init_job_data = Servers, - spawn_count = SpawnCount} = State) -> + spawn_count = SpawnCount, + tickets = Tickets, + queue = Queue} = State) -> Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()), - case Mode of - X when X =:= 'typesig'; X =:= 'dataflow' -> - true = ets:insert(?MAP, {SCC, Pid}); - _ -> true - end, - {noreply, State#state{spawn_count = SpawnCount + 1}}; + {NewTickets, NewQueue} = + case Mode of + X when X =:= 'typesig'; X =:= 'dataflow' -> + true = ets:insert(?MAP, {SCC, Pid}), + {Tickets, Queue}; + warnings -> + case Tickets of + 0 -> {Tickets, queue:in(Pid, Queue)}; + N -> + activate_pid(Pid), + {N-1, Queue} + end + end, + {noreply, State#state{spawn_count = SpawnCount + 1, + tickets = NewTickets, + queue = NewQueue}}; handle_cast({compiler_spawn, Filename}, #state{mode = Mode, init_job_data = Servers, - spawn_count = SpawnCount + spawn_count = SpawnCount, + tickets = Tickets, + queue = Queue } = State) -> - dialyzer_worker:launch(Mode, Filename, Servers, self()), - {noreply, - State#state{spawn_count = SpawnCount + 1} - }. + Pid = dialyzer_worker:launch(Mode, Filename, Servers, self()), + NewSpawnCount = SpawnCount + 1, + {NewTickets, NewQueue} = + case Tickets of + 0 -> {Tickets, queue:in(Pid, Queue)}; + N -> + activate_pid(Pid), + {N-1, Queue} + end, + {noreply, State#state{spawn_count = NewSpawnCount, + tickets = NewTickets, + queue = NewQueue}}. -spec handle_info(term(), #state{}) -> {noreply, #state{}}. diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl index 37ca6854b1..6392854101 100644 --- a/lib/dialyzer/src/dialyzer_worker.erl +++ b/lib/dialyzer/src/dialyzer_worker.erl @@ -84,6 +84,7 @@ loop(waiting, State) -> NewState = wait_for_success_typings(State), loop(updating, NewState); loop(running, #state{mode = 'compile'} = State) -> + dialyzer_coordinator:wait_activation(), ?debug("Compile: ~s\n",[State#state.job]), Result = case start_compilation(State) of @@ -95,16 +96,22 @@ loop(running, #state{mode = 'compile'} = State) -> end, report_to_coordinator(Result, State); loop(running, #state{mode = 'warnings'} = State) -> + dialyzer_coordinator:wait_activation(), ?debug("Warning: ~s\n",[State#state.job]), Result = collect_warnings(State), report_to_coordinator(Result, State); loop(running, #state{mode = Mode} = State) when Mode =:= 'typesig'; Mode =:= 'dataflow' -> + request_activation(State), + dialyzer_coordinator:wait_activation(), ?debug("Run: ~p\n",[State#state.job]), NotFixpoint = do_work(State), ok = broadcast_done(State), report_to_coordinator(NotFixpoint, State). +request_activation(#state{coordinator = Coordinator}) -> + dialyzer_coordinator:request_activation(Coordinator). + waits_more_success_typings(#state{depends_on = Depends}) -> case Depends of [] -> false; |