diff options
author | Micael Karlberg <[email protected]> | 2012-01-10 14:52:00 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2012-01-12 12:13:18 +0100 |
commit | 411f6240932aab3721ff842c674d9610216cfd88 (patch) | |
tree | 348e767c70bae544a5b30d96aaf1bd7159f7f578 /lib/snmp/src/agent/snmpa_agent.erl | |
parent | daf5b0eeb6f0d8c805f7a0e2fc117c8c788b855c (diff) | |
download | otp-411f6240932aab3721ff842c674d9610216cfd88.tar.gz otp-411f6240932aab3721ff842c674d9610216cfd88.tar.bz2 otp-411f6240932aab3721ff842c674d9610216cfd88.zip |
[snmp/agent] Synchronization feature added
OTP-9851
Diffstat (limited to 'lib/snmp/src/agent/snmpa_agent.erl')
-rw-r--r-- | lib/snmp/src/agent/snmpa_agent.erl | 110 |
1 files changed, 104 insertions, 6 deletions
diff --git a/lib/snmp/src/agent/snmpa_agent.erl b/lib/snmp/src/agent/snmpa_agent.erl index 46c634969d..b78f520c50 100644 --- a/lib/snmp/src/agent/snmpa_agent.erl +++ b/lib/snmp/src/agent/snmpa_agent.erl @@ -69,7 +69,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, tr_var/2, tr_varbind/1, handle_pdu/7, worker/2, worker_loop/1, - do_send_trap/7, do_send_trap/8]). + do_send_trap/7, do_send_trap/8, serialize/2, serialize/3]). -include("snmpa_internal.hrl"). @@ -271,6 +271,14 @@ update_mibs_cache_age(Agent, Age) -> call(Agent, {mibs_cache_request, {update_age, Age}}). +serialize(Tag, Exec) -> + serialize(snmp_master_agent, Tag, Exec). + +serialize(Agent, Tag, Exec) + when (is_pid(Agent) orelse is_atom(Agent)) andalso is_function(Exec, 0) -> + call(Agent, {serialize, Tag, Exec}). + + increment_counter(Counter, Initial, Max) -> %% This is to make sure no one else increments our counter Key = {Counter, self()}, @@ -934,6 +942,7 @@ handle_info({'EXIT', Pid, Reason}, S) -> end, {noreply, S} end; + handle_info({'DOWN', Ref, process, Pid, {mibs_cache_reply, Reply}}, #state{mibs_cache_request = {Pid, Ref, From}} = S) -> ?vlog("reply from the mibs cache request handler (~p): ~n~p", @@ -941,6 +950,11 @@ handle_info({'DOWN', Ref, process, Pid, {mibs_cache_reply, Reply}}, gen_server:reply(From, Reply), {noreply, S#state{mibs_cache_request = undefined}}; +handle_info({'DOWN', Mon, process, _Pid, {serializer_done, Tag}}, S) -> + ?vlog("handle_info(DOWN) -> serializer done (~w)", [Tag]), + handle_serializer_down(Mon, Tag), + {noreply, S}; + handle_info(Info, S) -> warning_msg("received unexpected info: ~n~p", [Info]), {noreply, S}. @@ -1283,27 +1297,33 @@ handle_call({me_of, Oid}, _From, S) -> {reply, Reply, S}; handle_call(get_log_type, _From, S) -> - ?vlog("get_log_type", []), + ?vlog("handle_call(get_log_type) -> entry with", []), Reply = handle_get_log_type(S), {reply, Reply, S}; handle_call({set_log_type, NewType}, _From, S) -> - ?vlog("set_log_type -> " + ?vlog("handle_call(set_log_type) -> entry with" "~n NewType: ~p", [NewType]), Reply = handle_set_log_type(S, NewType), {reply, Reply, S}; handle_call(get_request_limit, _From, S) -> - ?vlog("get_request_limit", []), + ?vlog("handle_call(get_request_limit) -> entry with", []), Reply = handle_get_request_limit(S), {reply, Reply, S}; handle_call({set_request_limit, NewLimit}, _From, S) -> - ?vlog("set_request_limit -> " + ?vlog("handle_call(set_request_limit) -> entry with" "~n NewLimit: ~p", [NewLimit]), Reply = handle_set_request_limit(S, NewLimit), {reply, Reply, S}; - + +handle_call({serialize, Tag, Exec}, From, S) -> + ?vlog("handle_call(serialize) -> entry with" + "~n Tag: ~p", [Tag]), + handle_serialize(Tag, Exec, From), + {noreply, S}; + handle_call(stop, _From, S) -> {stop, normal, ok, S}; @@ -3859,6 +3879,7 @@ mapfoldl(F, Eas, Accu0, [Hd|Tail]) -> {Accu2,[R|Rs]}; mapfoldl(_F, _Eas, Accu, []) -> {Accu,[]}. + %%----------------------------------------------------------------- %% Runtime debugging of the agent. %%----------------------------------------------------------------- @@ -3983,6 +4004,83 @@ handle_set_request_limit(_, _) -> {error, not_supported}. +%% ------------------------------------------------------------------------ +%% Funtions handling the serialize mechanism +%% ------------------------------------------------------------------------ + +serialize_key(Mon) -> {Mon, serialize}. +storage_key(Tag) -> {Tag, storage}. + +start_serializer(Exec, Tag) -> + Serializer = fun() -> + (catch Exec()), + exit({serializer_done, Tag}) + end, + {_Pid, Mon} = erlang:spawn_opt(Serializer, [monitor]), + Mon. + +handle_serialize(Tag, Exec, From) -> + StorageKey = storage_key(Tag), + case ets:lookup(snmp_agent_table, StorageKey) of + [] -> + %% We are first + ?vtrace("handle_serialize -> nothing found" + "~n (for ~p)", [StorageKey]), + Mon = start_serializer(Exec, Tag), + SerializeKey = serialize_key(Mon), + %% These are waiting for reply + ets:insert(snmp_agent_table, {SerializeKey, [From]}), + %% These are waiting for exec: That is none at the moment + ets:insert(snmp_agent_table, {StorageKey, {Exec, []}}), + ok; + [{_StorageKey, {_Exec, Froms}}] -> + %% There is a process already running, so store the request + ?vtrace("handle_serialize -> found" + "~n (for ~p)" + "~n Forms: ~p", [StorageKey]), + NewStorage = {Exec, [From|Froms]}, + ets:insert(snmp_agent_table, {StorageKey, NewStorage}), + ok + end. + + +deliver_serialize_reply([From]) -> + %% This is the normal case, so we handle it here to optimize + gen_server:reply(From, ok); +deliver_serialize_reply(SerializeReqs) -> + lists:foreach(fun(From) -> gen_server:reply(From, ok) end, + lists:reverse(SerializeReqs)). + +handle_serializer_down(Mon, Tag) -> + SerializeKey = serialize_key(Mon), + [{_, SerializeReqs}] = ets:lookup(snmp_agent_table, SerializeKey), + ?vtrace("handle_serializer_down -> found" + "~n (for ~p)" + "~n ~p", + [SerializeKey, SerializeReqs]), + deliver_serialize_reply(SerializeReqs), + ets:delete(snmp_agent_table, SerializeKey), + StorageKey = storage_key(Tag), + case ets:lookup(snmp_agent_table, StorageKey) of + [{_StorageKey, {_Exec, []}}] -> + %% We are done + ?vtrace("handle_serializer_down -> we are done", []), + ets:delete(snmp_agent_table, StorageKey), + ok; + [{_StorageKey, {Exec, Froms}}] -> + %% We have some waiting, so start a new one + ?vtrace("handle_serializer_down -> waiting requests for ~p: " + "~n ~p", [StorageKey, Froms]), + Mon2 = start_serializer(Exec, Tag), + SerializeKey2 = serialize_key(Mon2), + ets:insert(snmp_agent_table, {SerializeKey2, Froms}), + ets:insert(snmp_agent_table, {StorageKey, {Exec, []}}), + ok + end. + + +%% ------------------------------------------------------------------------ + agent_info(#state{worker = W, set_worker = SW}) -> case (catch get_agent_info(W, SW)) of Info when is_list(Info) -> |