diff options
author | Micael Karlberg <[email protected]> | 2012-01-10 14:52:00 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2012-01-12 12:13:18 +0100 |
commit | 411f6240932aab3721ff842c674d9610216cfd88 (patch) | |
tree | 348e767c70bae544a5b30d96aaf1bd7159f7f578 | |
parent | daf5b0eeb6f0d8c805f7a0e2fc117c8c788b855c (diff) | |
download | otp-411f6240932aab3721ff842c674d9610216cfd88.tar.gz otp-411f6240932aab3721ff842c674d9610216cfd88.tar.bz2 otp-411f6240932aab3721ff842c674d9610216cfd88.zip |
[snmp/agent] Synchronization feature added
OTP-9851
-rw-r--r-- | lib/snmp/src/agent/snmpa_agent.erl | 110 | ||||
-rw-r--r-- | lib/snmp/src/agent/snmpa_vacm.erl | 77 |
2 files changed, 150 insertions, 37 deletions
diff --git a/lib/snmp/src/agent/snmpa_agent.erl b/lib/snmp/src/agent/snmpa_agent.erl index 46c634969d..b78f520c50 100644 --- a/lib/snmp/src/agent/snmpa_agent.erl +++ b/lib/snmp/src/agent/snmpa_agent.erl @@ -69,7 +69,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, tr_var/2, tr_varbind/1, handle_pdu/7, worker/2, worker_loop/1, - do_send_trap/7, do_send_trap/8]). + do_send_trap/7, do_send_trap/8, serialize/2, serialize/3]). -include("snmpa_internal.hrl"). @@ -271,6 +271,14 @@ update_mibs_cache_age(Agent, Age) -> call(Agent, {mibs_cache_request, {update_age, Age}}). +serialize(Tag, Exec) -> + serialize(snmp_master_agent, Tag, Exec). + +serialize(Agent, Tag, Exec) + when (is_pid(Agent) orelse is_atom(Agent)) andalso is_function(Exec, 0) -> + call(Agent, {serialize, Tag, Exec}). + + increment_counter(Counter, Initial, Max) -> %% This is to make sure no one else increments our counter Key = {Counter, self()}, @@ -934,6 +942,7 @@ handle_info({'EXIT', Pid, Reason}, S) -> end, {noreply, S} end; + handle_info({'DOWN', Ref, process, Pid, {mibs_cache_reply, Reply}}, #state{mibs_cache_request = {Pid, Ref, From}} = S) -> ?vlog("reply from the mibs cache request handler (~p): ~n~p", @@ -941,6 +950,11 @@ handle_info({'DOWN', Ref, process, Pid, {mibs_cache_reply, Reply}}, gen_server:reply(From, Reply), {noreply, S#state{mibs_cache_request = undefined}}; +handle_info({'DOWN', Mon, process, _Pid, {serializer_done, Tag}}, S) -> + ?vlog("handle_info(DOWN) -> serializer done (~w)", [Tag]), + handle_serializer_down(Mon, Tag), + {noreply, S}; + handle_info(Info, S) -> warning_msg("received unexpected info: ~n~p", [Info]), {noreply, S}. @@ -1283,27 +1297,33 @@ handle_call({me_of, Oid}, _From, S) -> {reply, Reply, S}; handle_call(get_log_type, _From, S) -> - ?vlog("get_log_type", []), + ?vlog("handle_call(get_log_type) -> entry with", []), Reply = handle_get_log_type(S), {reply, Reply, S}; handle_call({set_log_type, NewType}, _From, S) -> - ?vlog("set_log_type -> " + ?vlog("handle_call(set_log_type) -> entry with" "~n NewType: ~p", [NewType]), Reply = handle_set_log_type(S, NewType), {reply, Reply, S}; handle_call(get_request_limit, _From, S) -> - ?vlog("get_request_limit", []), + ?vlog("handle_call(get_request_limit) -> entry with", []), Reply = handle_get_request_limit(S), {reply, Reply, S}; handle_call({set_request_limit, NewLimit}, _From, S) -> - ?vlog("set_request_limit -> " + ?vlog("handle_call(set_request_limit) -> entry with" "~n NewLimit: ~p", [NewLimit]), Reply = handle_set_request_limit(S, NewLimit), {reply, Reply, S}; - + +handle_call({serialize, Tag, Exec}, From, S) -> + ?vlog("handle_call(serialize) -> entry with" + "~n Tag: ~p", [Tag]), + handle_serialize(Tag, Exec, From), + {noreply, S}; + handle_call(stop, _From, S) -> {stop, normal, ok, S}; @@ -3859,6 +3879,7 @@ mapfoldl(F, Eas, Accu0, [Hd|Tail]) -> {Accu2,[R|Rs]}; mapfoldl(_F, _Eas, Accu, []) -> {Accu,[]}. + %%----------------------------------------------------------------- %% Runtime debugging of the agent. %%----------------------------------------------------------------- @@ -3983,6 +4004,83 @@ handle_set_request_limit(_, _) -> {error, not_supported}. +%% ------------------------------------------------------------------------ +%% Funtions handling the serialize mechanism +%% ------------------------------------------------------------------------ + +serialize_key(Mon) -> {Mon, serialize}. +storage_key(Tag) -> {Tag, storage}. + +start_serializer(Exec, Tag) -> + Serializer = fun() -> + (catch Exec()), + exit({serializer_done, Tag}) + end, + {_Pid, Mon} = erlang:spawn_opt(Serializer, [monitor]), + Mon. + +handle_serialize(Tag, Exec, From) -> + StorageKey = storage_key(Tag), + case ets:lookup(snmp_agent_table, StorageKey) of + [] -> + %% We are first + ?vtrace("handle_serialize -> nothing found" + "~n (for ~p)", [StorageKey]), + Mon = start_serializer(Exec, Tag), + SerializeKey = serialize_key(Mon), + %% These are waiting for reply + ets:insert(snmp_agent_table, {SerializeKey, [From]}), + %% These are waiting for exec: That is none at the moment + ets:insert(snmp_agent_table, {StorageKey, {Exec, []}}), + ok; + [{_StorageKey, {_Exec, Froms}}] -> + %% There is a process already running, so store the request + ?vtrace("handle_serialize -> found" + "~n (for ~p)" + "~n Forms: ~p", [StorageKey]), + NewStorage = {Exec, [From|Froms]}, + ets:insert(snmp_agent_table, {StorageKey, NewStorage}), + ok + end. + + +deliver_serialize_reply([From]) -> + %% This is the normal case, so we handle it here to optimize + gen_server:reply(From, ok); +deliver_serialize_reply(SerializeReqs) -> + lists:foreach(fun(From) -> gen_server:reply(From, ok) end, + lists:reverse(SerializeReqs)). + +handle_serializer_down(Mon, Tag) -> + SerializeKey = serialize_key(Mon), + [{_, SerializeReqs}] = ets:lookup(snmp_agent_table, SerializeKey), + ?vtrace("handle_serializer_down -> found" + "~n (for ~p)" + "~n ~p", + [SerializeKey, SerializeReqs]), + deliver_serialize_reply(SerializeReqs), + ets:delete(snmp_agent_table, SerializeKey), + StorageKey = storage_key(Tag), + case ets:lookup(snmp_agent_table, StorageKey) of + [{_StorageKey, {_Exec, []}}] -> + %% We are done + ?vtrace("handle_serializer_down -> we are done", []), + ets:delete(snmp_agent_table, StorageKey), + ok; + [{_StorageKey, {Exec, Froms}}] -> + %% We have some waiting, so start a new one + ?vtrace("handle_serializer_down -> waiting requests for ~p: " + "~n ~p", [StorageKey, Froms]), + Mon2 = start_serializer(Exec, Tag), + SerializeKey2 = serialize_key(Mon2), + ets:insert(snmp_agent_table, {SerializeKey2, Froms}), + ets:insert(snmp_agent_table, {StorageKey, {Exec, []}}), + ok + end. + + +%% ------------------------------------------------------------------------ + agent_info(#state{worker = W, set_worker = SW}) -> case (catch get_agent_info(W, SW)) of Info when is_list(Info) -> diff --git a/lib/snmp/src/agent/snmpa_vacm.erl b/lib/snmp/src/agent/snmpa_vacm.erl index c31b8e61a3..efe6378105 100644 --- a/lib/snmp/src/agent/snmpa_vacm.erl +++ b/lib/snmp/src/agent/snmpa_vacm.erl @@ -276,45 +276,60 @@ dump_table(_) -> %% If there is already a running dumper process, instead increment %% the dump_request counter. %% When the dumper process exits, the master agent checks the -%% the dump_request counter, and if that is greated than zero, +%% the dump_request counter, and if that is greater than zero, %% create another dumper process and resets the counter. -%% In this way the dumping is serializede, but the master-agent -%% process is not burdened by the dumping. +%% In this way the dumping is serialized, but the master-agent +%% process is not burdend by the dumping. %% </SUGGESTION> dump_table() -> - [{_, FName}] = ets:lookup(snmp_agent_table, snmpa_vacm_file), - TmpName = unique(FName), - case ets:tab2file(snmpa_vacm, TmpName) of - ok -> - case file:rename(TmpName, FName) of - ok -> - ok; - Else -> % What is this? Undocumented return code... - user_err("Warning: could not move VACM db ~p" - " (~p)", [FName, Else]) - end; - {error, Reason} -> - user_err("Warning: could not save vacm db ~p (~p)", - [FName, Reason]) - end. + %% The dumper fun is executed in a specially started process, + %% that does that one thing and then exits. + %% Also, to prevent the system to run "wild" (keep calling + %% dump function before they are done), the agents serialize + %% function return when that dump is done! + Dumper = + fun() -> + [{_, FName}] = ets:lookup(snmp_agent_table, snmpa_vacm_file), + %% TmpName = FName ++ ".tmp", + TmpName = unique_name(FName), + case ets:tab2file(snmpa_vacm, TmpName) of + ok -> + case file:rename(TmpName, FName) of + ok -> + ok; + Else -> % What is this? Undocumented return code... + user_err("Warning: could not move VACM db ~p" + " (~p)", [FName, Else]) + end; + {error, Reason} -> + user_err("Warning: could not save vacm db ~p (~p)", + [FName, Reason]) + end + end, + snmpa_agent:serialize(snmpa_vacm_dump_request, Dumper). + %% This little thing is an attempt to create a "unique" filename %% in order to minimize the risk of two processes at the same %% time dumping the table. -unique(Pre) -> - {A, B, C} = os:timestamp(), - {D, _} = erlang:statistics(reductions), - {E, _} = erlang:statistics(runtime), - {F, _} = erlang:statistics(wall_clock), - {G, H, _} = erlang:statistics(garbage_collection), - Data = [A, B, C, D, E, F, G, H], - unique(Pre, Data, 0). - -unique(Pre, [], Unique) -> +%% The serialization handled by the agent does this much better, +%% but this also gives us a "timestamp" which could be usefull for +%% debugging reasons. +unique_name(Pre) -> + unique_name(Pre, os:timestamp()). + +unique_name(Pre, {_A, _B, C} = Timestamp) -> + {Date, Time} = calendar:now_to_datetime(Timestamp), + {YYYY, MM, DD} = Date, + {Hour, Min, Sec} = Time, + FormatDate = + io_lib:format("~.4w~.2.0w~.2.0w_~.2.0w~.2.0w~.2.0w_~w", + [YYYY, MM, DD, Hour, Min, Sec, round(C/1000)]), + unique_name2(Pre, FormatDate). + +unique_name2(Pre, FormatedDate) -> PidPart = unique_pid(), - lists:flatten(io_lib:format("~s.~s~w.tmp", [Pre, PidPart, Unique])); -unique(Pre, [H|T], Unique) -> - unique(Pre, T, Unique bxor H). + lists:flatten(io_lib:format("~s.~s~s.tmp", [Pre, PidPart, FormatedDate])). unique_pid() -> case string:tokens(pid_to_list(self()), [$<,$.,$>]) of |