%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 1997-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %%%********************************************************************* %%% %%% Description: Module containing the interface towards ETS tables, %%% i.e., handling the polling and thereafter sending the %%% result to the database part of the table tool. %%% %%%********************************************************************* -module(tv_etsread). -compile([{nowarn_deprecated_function,{gs,destroy,1}}, {nowarn_deprecated_function,{gs,start,0}}, {nowarn_deprecated_function,{gs,window,3}}]). -export([etsread/2]). -include("tv_int_def.hrl"). -include("tv_int_msg.hrl"). %%%********************************************************************* %%% EXTERNAL FUNCTIONS %%%********************************************************************* etsread(MasterPid, ErrorMsgMode) -> process_flag(trap_exit, true), put(error_msg_mode, ErrorMsgMode), blocked(MasterPid). %%%********************************************************************* %%% INTERNAL FUNCTIONS %%%********************************************************************* blocked(MasterPid) -> receive Msg -> case Msg of #etsread_deblock{} -> deblock(Msg, MasterPid); {'EXIT', Pid, Reason} -> exit_signals({Pid, Reason}, MasterPid), blocked(MasterPid); {error_msg_mode, Mode} -> put(error_msg_mode, Mode), blocked(MasterPid); _Other -> %% io:format("Received signal ~p~n", [_Other]), blocked(MasterPid) end end. deblock(Msg, MasterPid) -> #etsread_deblock{dbs_pid = DbsPid, table_type = KindOfTable, node = Node, local_node = LocalNode, table_id = TableId, poll_interval = PollInt} = Msg, PollInterval = case PollInt of infinity -> PollInt; _Other -> PollInt * 1000 end, %% Get table info! case catch get_table_info(Node, LocalNode, TableId, KindOfTable) of nodedown -> MasterPid ! #pc_nodedown{sender = self(), automatic_polling = false}, blocked(MasterPid); no_table -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}, blocked(MasterPid); mnesia_not_started -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}, blocked(MasterPid); {unexpected_error,_Reason} -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}, blocked(MasterPid); {Type, Pos, Protection} -> MasterPid ! #etsread_deblock_cfm{sender = self(), type = Type, keypos = Pos, protection = Protection }, timer:sleep(500), case catch read_table(Node, LocalNode, TableId, KindOfTable, DbsPid) of nodedown -> MasterPid ! #pc_nodedown{sender = self(), automatic_polling = false}, blocked(MasterPid); no_table -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}, blocked(MasterPid); mnesia_not_started -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}, blocked(MasterPid); {unexpected_error,_Reason} -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}, blocked(MasterPid); _ElapsedTime -> deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, PollInterval) end end. get_table_info(Node, LocalNode, TableId, KindOfTable) -> case KindOfTable of ets -> % Check whether table is 'bag' or 'set' type. Type = tv_ets_rpc:info(Node, LocalNode, TableId, type), % Get position for the key. Pos = tv_ets_rpc:info(Node, LocalNode, TableId, keypos), Protection = tv_ets_rpc:info(Node, LocalNode, TableId, protection), {Type, Pos, Protection}; mnesia -> Type = tv_mnesia_rpc:table_info(Node, LocalNode, TableId, type), Pos = 2, %% All Mnesia tables are regarded as being public! {Type, Pos, public} end. deblocked_loop(MasterPid,DbsPid,Node,LocalNode,TableId,KindOfTable,PollInterval) -> receive Msg -> case Msg of #etsread_poll_table{} -> case catch read_table(Node, LocalNode, TableId, KindOfTable, DbsPid) of %% No automatic polling here! nodedown -> MasterPid ! #pc_nodedown{sender = self(), automatic_polling = false}; no_table -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}; mnesia_not_started -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}; {unexpected_error,_Reason} -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = false}; _ElapsedTime -> done end, deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, PollInterval); #etsread_set_poll_interval{interval = PollInt} -> NewPollInterval = case PollInt of infinity -> PollInt; _Other -> PollInt * 1000 end, deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, NewPollInterval); #etsread_deblock{} -> deblock(Msg, MasterPid); #etsread_update_object{key_no=KeyNo, object=Obj, old_object=OldObj} -> update_object(KindOfTable, Node, LocalNode, TableId, DbsPid, KeyNo, Obj, OldObj, MasterPid, PollInterval), deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, PollInterval); #etsread_new_object{object=Obj} -> new_object(KindOfTable, Node, LocalNode, TableId, DbsPid, Obj, MasterPid, PollInterval), deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, PollInterval); #etsread_delete_object{object=Obj} -> delete_object(KindOfTable, Node, LocalNode, TableId, DbsPid, Obj, MasterPid, PollInterval), deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, PollInterval); #ip_dead_table{} -> AutoPoll = case PollInterval of infinity -> false; _Other -> true end, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}, deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, infinity); #etsread_nodedown{} -> AutoPoll = case PollInterval of infinity -> false; _Other -> true end, MasterPid ! #pc_nodedown{sender = self(), automatic_polling = AutoPoll}, deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, infinity); {error_msg_mode, Mode} -> put(error_msg_mode, Mode), deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, PollInterval); {'EXIT', Pid, Reason} -> exit_signals({Pid, Reason}, MasterPid), deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, PollInterval) end after PollInterval -> %% Automatic polling must be on, otherwise these %% lines would never be executed! NewPollInterval = case catch read_table(Node,LocalNode,TableId,KindOfTable,DbsPid) of nodedown -> MasterPid ! #pc_nodedown{sender = self(), automatic_polling = true}, infinity; no_table -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = true}, infinity; mnesia_not_started -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = true}, infinity; {unexpected_error,_Reason} -> MasterPid ! #pc_dead_table{sender = self(), automatic_polling = true}, infinity; ElapsedMilliseconds -> if (ElapsedMilliseconds * 1000) >= PollInterval -> infinity; true -> PollInterval end end, deblocked_loop(MasterPid, DbsPid, Node, LocalNode, TableId, KindOfTable, NewPollInterval) end. exit_signals(ExitInfo, MasterPid) -> case ExitInfo of {MasterPid, _Reason} -> exit(normal); _Other -> done end. update_object(KindOfTable, Node, LocalNode, TableId, DbsPid, KeyNo, Obj, OldObj, MasterPid, PollInterval) -> AutoPoll = case PollInterval of infinity -> false; _Other -> true end, case check_record_format(KindOfTable, Node, LocalNode, TableId, Obj) of bad_format -> DbsPid ! #etsread_update_object_cfm{sender = self(), success = false}; ok -> %% Check that we are allowed to edit the table! case catch update_object2(KindOfTable, Node, LocalNode, TableId, DbsPid, KeyNo, Obj, OldObj) of nodedown -> DbsPid ! #etsread_update_object_cfm{sender = self(), success = false}, MasterPid ! #pc_nodedown{sender = self(), automatic_polling = AutoPoll}; no_table -> DbsPid ! #etsread_update_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; mnesia_not_started -> DbsPid ! #etsread_update_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; {unexpected_error,_Reason} -> DbsPid ! #etsread_update_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; ok -> DbsPid ! #etsread_update_object_cfm{sender = self(), success = true} end end. update_object2(ets, Node, LocalNode, Tab, _DbsPid, KeyNo, Obj, OldObj) -> %% We shall update a specific object! If the table is a 'set' table, %% it is just to insert the altered object. However, if the table %% is a 'bag', or a 'duplicate_bag', we first have to remove the %% old object, and then insert the altered one. %% But, we aren't finished with that... we also want to preserve %% the time order, meaning we have to delete *ALL* objects having the %% very same key, and then insert them again! (Actually we would have %% to do this anyhow, due to limitations in the interface functions, %% but this remark has to be noted!) OldKey = element(KeyNo, OldObj), InsertList = case tv_ets_rpc:info(Node, LocalNode, Tab, type) of set -> %% Have to remove old object, because the key may be what's changed. tv_ets_rpc:delete(Node, LocalNode, Tab, OldKey), [Obj]; ordered_set -> %% Have to remove old object, because the key may be what's changed. tv_ets_rpc:delete(Node, LocalNode, Tab, OldKey), [Obj]; _Other -> %% 'bag' or 'duplicate_bag' OldList = tv_ets_rpc:lookup(Node, LocalNode, Tab, OldKey), tv_ets_rpc:delete(Node, LocalNode, Tab, OldKey), %% Have to beware of duplicate_bag tables, %% i.e., the same object may occur more than %% once, but we only want to replace it once! {_Replaced, TmpList} = lists:foldl( fun(Data, {Replaced,Acc}) when Data =/= OldObj -> {Replaced, [Data | Acc]}; (_Data, {Replaced,Acc}) when not Replaced -> {true, [Obj | Acc]}; (Data, {Replaced,Acc}) -> {Replaced, [Data | Acc]} end, {false, []}, OldList), lists:reverse(TmpList) end, lists:foreach(fun(H) -> tv_ets_rpc:insert(Node, LocalNode, Tab, H) end, InsertList), ok; update_object2(mnesia, Node, LocalNode, Tab, _DbsPid, KeyNo, Obj, OldObj) -> OldKey = element(KeyNo, OldObj), InsertList = case tv_mnesia_rpc:table_info(Node, LocalNode, Tab, type) of set -> tv_mnesia_rpc:transaction( Node, LocalNode, fun() -> mnesia:delete(Tab,OldKey,write) end), [Obj]; ordered_set -> tv_mnesia_rpc:transaction( Node, LocalNode, fun() -> mnesia:delete(Tab,OldKey,write) end), [Obj]; _Other -> %% 'bag' or 'duplicate_bag' {atomic, OldList} = tv_mnesia_rpc:transaction( Node, LocalNode, fun() -> mnesia:read(Tab,OldKey,read) end), %% We can't use mnesia:delete_object here, because %% time order wouldn't be preserved then!!! tv_mnesia_rpc:transaction( Node, LocalNode, fun() -> mnesia:delete(Tab,OldKey,write) end), ChangeFun = fun(H) when H =:= OldObj -> Obj; (H) -> H end, [ChangeFun(X) || X <- OldList] end, lists:foreach(fun(H) -> tv_mnesia_rpc:transaction( Node, LocalNode, fun() -> %% This mnesia call shall not be distributed, %% since the transaction sees to that it is %% executed on the right node!!! mnesia:write(Tab,H,write) end) end, InsertList), ok. delete_object(KindOfTable, Node, LocalNode, TableId, DbsPid, Obj, MasterPid, PollInterval) -> AutoPoll = case PollInterval of infinity -> false; _Other -> true end, case catch delete_object2(KindOfTable, Node, LocalNode, TableId, DbsPid, Obj) of nodedown -> DbsPid ! #etsread_delete_object_cfm{sender = self(), success = false}, MasterPid ! #pc_nodedown{sender = self(), automatic_polling = AutoPoll}; no_table -> DbsPid ! #etsread_delete_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; mnesia_not_started -> DbsPid ! #etsread_delete_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; {unexpected_error,_Reason} -> DbsPid ! #etsread_delete_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; ok -> DbsPid ! #etsread_delete_object_cfm{sender = self(), success = true} end. delete_object2(ets, Node, LocalNode, Tab, _DbsPid, Obj) -> KeyNo = tv_ets_rpc:info(Node, LocalNode, Tab, keypos), Key = element(KeyNo, Obj), InsertList = case tv_ets_rpc:info(Node, LocalNode, Tab, type) of set -> %% Have to remove old object, because the key may be what's changed. tv_ets_rpc:delete(Node, LocalNode, Tab, Key), []; ordered_set -> %% Have to remove old object, because the key may be what's changed. tv_ets_rpc:delete(Node, LocalNode, Tab, Key), []; _Other -> %% 'bag' or 'duplicate_bag' OldList = tv_ets_rpc:lookup(Node, LocalNode, Tab, Key), tv_ets_rpc:delete(Node, LocalNode, Tab, Key), OldList -- [Obj] end, lists:foreach(fun(H) -> tv_ets_rpc:insert(Node, LocalNode, Tab, H) end, InsertList), ok; delete_object2(mnesia, Node, LocalNode, Tab, _DbsPid, Obj) -> tv_mnesia_rpc:transaction( Node, LocalNode, fun() -> %% This mnesia call shall not be distributed, %% since the transaction sees to that it is %% executed on the right node!!! mnesia:delete_object(Tab,Obj,write) end), ok. new_object(KindOfTable, Node, LocalNode, TableId, DbsPid, Obj, MasterPid, PollInterval) -> AutoPoll = case PollInterval of infinity -> false; _Other -> true end, case check_record_format(KindOfTable, Node, LocalNode, TableId, Obj) of bad_format -> DbsPid ! #etsread_new_object_cfm{sender = self(), success = false}; ok -> %% Check that we are allowed to edit the table! case catch new_object2(KindOfTable, Node, LocalNode, TableId, DbsPid, Obj) of nodedown -> DbsPid ! #etsread_new_object_cfm{sender = self(), success = false}, MasterPid ! #pc_nodedown{sender = self(), automatic_polling = AutoPoll}; no_table -> DbsPid ! #etsread_new_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; mnesia_not_started -> DbsPid ! #etsread_new_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; {unexpected_error,_Reason} -> DbsPid ! #etsread_new_object_cfm{sender = self(), success = false}, MasterPid ! #pc_dead_table{sender = self(), automatic_polling = AutoPoll}; ok -> DbsPid ! #etsread_new_object_cfm{sender = self(), success = true} end end. new_object2(ets, Node, LocalNode, Tab, _DbsPid, Obj) -> tv_ets_rpc:insert(Node, LocalNode, Tab, Obj), ok; new_object2(mnesia, Node, LocalNode, Tab, _DbsPid, Obj) -> tv_mnesia_rpc:transaction( Node, LocalNode, fun() -> %% This mnesia call shall not be distributed, %% since the transaction sees to that it is %% executed on the right node!!! mnesia:write(Tab,Obj,write) end), ok. check_record_format(mnesia, Node, LocalNode, Tab, Obj) -> Arity = tv_mnesia_rpc:table_info(Node, LocalNode, Tab, arity), case size(Obj) of Arity -> ok; _Other -> gs:window(etsreadwin, gs:start(), []), case get(error_msg_mode) of normal -> tv_utils:notify(etsreadwin, "TV Notification", ["The record is not complete,", "too few fields are specified!"]); haiku -> tv_utils:notify(etsreadwin, "TV Notification", ["The attempt to change", "The specified record size", "Is simply ignored."]) end, gs:destroy(etsreadwin), bad_format end; check_record_format(ets, _Node, _LocalNode, _Tab, _Obj) -> ok. read_table(Node, LocalNode, Tab, KindOfTable, DbsPid) -> T1 = time(), {TableContent, ListOfKeys} = case KindOfTable of ets -> {tv_ets_rpc:tab2list(Node, LocalNode, Tab), [tv_ets_rpc:info(Node, LocalNode, Tab, keypos)] }; mnesia -> %% It may be tempting to use Mnesia event subscription, %% but will this really save the day? The main drawback %% is that we will then have to update the table copy we %% store internally in two different ways: one for the %% Mnesia tables, and one for the ETS tables. Also, if %% the Mnesia tables are frequently updated, this will %% cause TV to work all the time too (either updating the %% table copy for each inserted/deleted object, or storing %% these objects until polling is ordered). To make this %% work smoothly requires a bit of work... %% The second drawback is that it doesn't seem clear in all %% circumstances how the subscription actually works - i.e., %% if we only use subscriptions, can we actually be sure that %% the *real* state of the table is the same as the one kept %% in TV? For example, imagine the scenario that Mnesia is %% stopped, all Mnesia directories are removed (from the UNIX %% shell), and then Mnesia once again is started. The first %% problem is that we have to check for start/stop of Mnesia, %% the second is that we then have to rescan the actual table. %% The logic for this may require som effort to write! %% Also, what will happen if the table is killed/dies? %% Will we get messages for each element in the table? %% (I havent't checked this last issue, this is just som thoughts.) %% And generally, there is always a risk that a message is lost, %% which will result in TV showing an erroneous table content. %% %% All in all, using Mnesia subscriptions *may* be a sub-optimization. %% The current solution works fine, is also easy to control, and is %% mainly the same for both ETS and Mnesia tables. %% My suggestion is that it is used until someone actually complains %% about the polling time being too long for huge tables! :-) %% (However, it shall be emphasized that it is this module that %% actually polls the Mnesia/ETS tables, meaning that it is %% mainly this module that has to be modified, should the usage of %% subscriptions be desired. The other module that has to be modified %% is the one maintaining the internal copy of the table.) WildPattern = tv_mnesia_rpc:table_info(Node,LocalNode,Tab,wild_pattern), {atomic, Content} = tv_mnesia_rpc:transaction( Node, LocalNode, fun() -> %% This mnesia call shall not be distributed, %% since the transaction sees to that it is %% executed on the right node!!! mnesia:match_object(Tab, WildPattern, read) end), {Content, [2 | tv_mnesia_rpc:table_info(Node, LocalNode,Tab, index)]} end, T2 = time(), ElapsedTime = compute_elapsed_seconds(T1, T2), DbsPid ! #dbs_new_data{sender = self(), data = TableContent, keys = ListOfKeys, time_to_read_table = ElapsedTime }, ElapsedTime. compute_elapsed_seconds({H1, M1, S1}, {H2, M2, S2}) -> ElapsedHours = get_time_diff(hours, H1, H2), ElapsedMinutes = get_time_diff(minutes, M1, M2), ElapsedSeconds = get_time_diff(seconds, S1, S2), (ElapsedHours * 3600) + (ElapsedMinutes * 60) + ElapsedSeconds + 1. get_time_diff(_Type, T1, T2) when T1 =< T2 -> T2 - T1; get_time_diff(hours, T1, T2) -> T2 + 24 - T1; get_time_diff(minutes, T1, T2) -> T2 + 60 - T1; get_time_diff(seconds, T1, T2) -> T2 + 60 - T1.