aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorStavros Aronis <[email protected]>2012-02-24 18:31:31 +0100
committerHenrik Nord <[email protected]>2012-05-21 15:31:20 +0200
commit0ecc1f181062da8b019f226ae2c567078ee2e860 (patch)
tree00c1eba159f7471375c362d93c9906826468ad2b
parent8f56878ce7b2d9a4f2118663838287c6d2fa9502 (diff)
downloadotp-0ecc1f181062da8b019f226ae2c567078ee2e860.tar.gz
otp-0ecc1f181062da8b019f226ae2c567078ee2e860.tar.bz2
otp-0ecc1f181062da8b019f226ae2c567078ee2e860.zip
Ticket-based regulation of memory consumption
-rw-r--r--lib/dialyzer/src/dialyzer_coordinator.erl113
-rw-r--r--lib/dialyzer/src/dialyzer_worker.erl7
2 files changed, 96 insertions, 24 deletions
diff --git a/lib/dialyzer/src/dialyzer_coordinator.erl b/lib/dialyzer/src/dialyzer_coordinator.erl
index fa0b5dcd9d..91a0dd662d 100644
--- a/lib/dialyzer/src/dialyzer_coordinator.erl
+++ b/lib/dialyzer/src/dialyzer_coordinator.erl
@@ -47,11 +47,10 @@
-module(dialyzer_coordinator).
%%% Exports for all possible uses of coordinator from main process
--export([start/2,
- all_spawned/1]).
+-export([start/2, all_spawned/1]).
%%% Exports for all possible workers
--export([job_done/3]).
+-export([wait_activation/0, job_done/3]).
%%% Export for the typesig, dataflow and warnings main process
-export([scc_spawn/2]).
@@ -63,15 +62,13 @@
-export([receive_warnings/0]).
%%% Exports for the typesig and dataflow analysis workers
--export([sccs_to_pids/1]).
+-export([request_activation/1, sccs_to_pids/1]).
%%% Exports for the compilation main process
--export([compiler_spawn/2,
- receive_compilation_data/0]).
+-export([compiler_spawn/2, receive_compilation_data/0]).
%%% Exports for the compilation workers
--export([compilation_done/3,
- get_next_label/2]).
+-export([get_next_label/2, compilation_done/3]).
-export_type([coordinator/0, mode/0]).
@@ -100,7 +97,9 @@
result :: [mfa_or_funlbl()] |
dialyzer_analysis_callgraph:result() |
[dial_warning()],
- init_job_data :: servers()
+ init_job_data :: servers(),
+ tickets :: integer(),
+ queue :: queue()
}).
-include("dialyzer.hrl").
@@ -188,12 +187,35 @@ compiler_spawn(Filename, Coordinator) ->
get_next_label(EstimatedSize, Coordinator) ->
call({get_next_label, EstimatedSize}, Coordinator).
+-spec request_activation(coordinator()) -> ok.
+
+request_activation(Coordinator) ->
+ cast({request_activation, self()}, Coordinator).
+
+-spec wait_activation() -> ok.
+
+wait_activation() ->
+ receive activate -> ok end.
+
+activate_pid(Pid) ->
+ Pid ! activate.
+
%%--------------------------------------------------------------------
-spec init({pid(), mode(), servers()}) -> {ok, #state{}}.
init({Parent, Mode, InitJobData}) ->
- InitState = #state{parent = Parent, mode = Mode, init_job_data = InitJobData},
+ BaseTickets = erlang:system_info(logical_processors_available),
+ Tickets =
+ case Mode of
+ 'compile' -> 4*BaseTickets;
+ 'typesig' -> BaseTickets*BaseTickets;
+ 'dataflow' -> 4*BaseTickets;
+ 'warnings' -> 4*BaseTickets
+ end,
+ InitState =
+ #state{parent = Parent, mode = Mode, init_job_data = InitJobData,
+ tickets = Tickets, queue = queue:new()},
State =
case Mode of
X when X =:= 'typesig'; X =:= 'dataflow' ->
@@ -222,8 +244,18 @@ handle_cast({done, Job, NewData},
spawn_count = SpawnCount,
all_spawned = AllSpawned,
result = OldResult,
- init_job_data = Servers
+ init_job_data = Servers,
+ tickets = Tickets,
+ queue = Queue
} = State) ->
+ {Waiting, NewQueue} = queue:out(Queue),
+ NewTickets =
+ case Waiting of
+ empty -> Tickets+1;
+ {value, Pid} ->
+ activate_pid(Pid),
+ Tickets
+ end,
NewResult =
case Mode of
X when X =:= 'typesig'; X =:= 'dataflow' ->
@@ -234,7 +266,8 @@ handle_cast({done, Job, NewData},
'warnings' ->
NewData ++ OldResult
end,
- UpdatedState = State#state{result = NewResult},
+ UpdatedState =
+ State#state{result = NewResult, tickets = NewTickets, queue = NewQueue},
Action =
case AllSpawned of
false -> reduce;
@@ -261,26 +294,58 @@ handle_cast(all_spawned, #state{spawn_count = SpawnCount} = State) ->
NewState = State#state{all_spawned = true},
{noreply, NewState}
end;
+handle_cast({request_activation, Pid},
+ #state{tickets = Tickets, queue = Queue} = State) ->
+ {NewTickets, NewQueue} =
+ case Tickets of
+ 0 -> {Tickets, queue:in(Pid, Queue)};
+ N ->
+ activate_pid(Pid),
+ {N-1, Queue}
+ end,
+ {noreply, State#state{tickets = NewTickets, queue = NewQueue}};
handle_cast({scc_spawn, SCC},
#state{mode = Mode,
init_job_data = Servers,
- spawn_count = SpawnCount} = State) ->
+ spawn_count = SpawnCount,
+ tickets = Tickets,
+ queue = Queue} = State) ->
Pid = dialyzer_worker:launch(Mode, SCC, Servers, self()),
- case Mode of
- X when X =:= 'typesig'; X =:= 'dataflow' ->
- true = ets:insert(?MAP, {SCC, Pid});
- _ -> true
- end,
- {noreply, State#state{spawn_count = SpawnCount + 1}};
+ {NewTickets, NewQueue} =
+ case Mode of
+ X when X =:= 'typesig'; X =:= 'dataflow' ->
+ true = ets:insert(?MAP, {SCC, Pid}),
+ {Tickets, Queue};
+ warnings ->
+ case Tickets of
+ 0 -> {Tickets, queue:in(Pid, Queue)};
+ N ->
+ activate_pid(Pid),
+ {N-1, Queue}
+ end
+ end,
+ {noreply, State#state{spawn_count = SpawnCount + 1,
+ tickets = NewTickets,
+ queue = NewQueue}};
handle_cast({compiler_spawn, Filename},
#state{mode = Mode,
init_job_data = Servers,
- spawn_count = SpawnCount
+ spawn_count = SpawnCount,
+ tickets = Tickets,
+ queue = Queue
} = State) ->
- dialyzer_worker:launch(Mode, Filename, Servers, self()),
- {noreply,
- State#state{spawn_count = SpawnCount + 1}
- }.
+ Pid = dialyzer_worker:launch(Mode, Filename, Servers, self()),
+ NewSpawnCount = SpawnCount + 1,
+ {NewTickets, NewQueue} =
+ case Tickets of
+ 0 -> {Tickets, queue:in(Pid, Queue)};
+ N ->
+ activate_pid(Pid),
+ {N-1, Queue}
+ end,
+ {noreply, State#state{spawn_count = NewSpawnCount,
+ tickets = NewTickets,
+ queue = NewQueue}}.
-spec handle_info(term(), #state{}) -> {noreply, #state{}}.
diff --git a/lib/dialyzer/src/dialyzer_worker.erl b/lib/dialyzer/src/dialyzer_worker.erl
index 37ca6854b1..6392854101 100644
--- a/lib/dialyzer/src/dialyzer_worker.erl
+++ b/lib/dialyzer/src/dialyzer_worker.erl
@@ -84,6 +84,7 @@ loop(waiting, State) ->
NewState = wait_for_success_typings(State),
loop(updating, NewState);
loop(running, #state{mode = 'compile'} = State) ->
+ dialyzer_coordinator:wait_activation(),
?debug("Compile: ~s\n",[State#state.job]),
Result =
case start_compilation(State) of
@@ -95,16 +96,22 @@ loop(running, #state{mode = 'compile'} = State) ->
end,
report_to_coordinator(Result, State);
loop(running, #state{mode = 'warnings'} = State) ->
+ dialyzer_coordinator:wait_activation(),
?debug("Warning: ~s\n",[State#state.job]),
Result = collect_warnings(State),
report_to_coordinator(Result, State);
loop(running, #state{mode = Mode} = State) when
Mode =:= 'typesig'; Mode =:= 'dataflow' ->
+ request_activation(State),
+ dialyzer_coordinator:wait_activation(),
?debug("Run: ~p\n",[State#state.job]),
NotFixpoint = do_work(State),
ok = broadcast_done(State),
report_to_coordinator(NotFixpoint, State).
+request_activation(#state{coordinator = Coordinator}) ->
+ dialyzer_coordinator:request_activation(Coordinator).
+
waits_more_success_typings(#state{depends_on = Depends}) ->
case Depends of
[] -> false;