%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1998-2010. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
-module(mnesia_registry).
%%%----------------------------------------------------------------------
%%% File : mnesia_registry.erl
%%% Purpose : Support dump and restore of a registry on a C-node
%%% This is an OTP internal module and is not public available.
%%%
%%% Example : Dump some hardcoded records into the Mnesia table Tab
%%%
%%% case rpc:call(Node, mnesia_registry, start_dump, [Tab, self()]) of
%%% Pid when pid(Pid) ->
%%% Pid ! {write, key1, key_size1, val_type1, val_size1, val1},
%%% Pid ! {delete, key3},
%%% Pid ! {write, key2, key_size2, val_type2, val_size2, val2},
%%% Pid ! {write, key4, key_size4, val_type4, val_size4, val4},
%%% Pid ! {commit, self()},
%%% receive
%%% {ok, Pid} ->
%%% ok;
%%% {'EXIT', Pid, Reason} ->
%%% exit(Reason)
%%% end;
%%% {badrpc, Reason} ->
%%% exit(Reason)
%%% end.
%%%
%%% Example : Restore the corresponding Mnesia table Tab
%%%
%%% case rpc:call(Node, mnesia_registry, start_restore, [Tab, self()]) of
%%% {size, Pid, N, LargestKey, LargestVal} ->
%%% Pid ! {send_records, self()},
%%% Fun = fun() ->
%%% receive
%%% {restore, KeySize, ValSize, ValType, Key, Val} ->
%%% {Key, Val};
%%% {'EXIT', Pid, Reason} ->
%%% exit(Reason)
%%% end
%%% end,
%%% lists:map(Fun, lists:seq(1, N));
%%% {badrpc, Reason} ->
%%% exit(Reason)
%%% end.
%%%
%%%----------------------------------------------------------------------
%% External exports
%% Avoid warning for local function max/2 clashing with autoimported BIF.
-compile({no_auto_import,[max/2]}).
-export([start_dump/2, start_restore/2]).
-export([create_table/1, create_table/2]).
%% Internal exports
-export([init/4]).
-record(state, {table, ops = [], link_to}).
-record(registry_entry, {key, key_size, val_type, val_size, val}).
-record(size, {pid = self(), n_values = 0, largest_key = 0, largest_val = 0}).
%%%----------------------------------------------------------------------
%%% Client
%%%----------------------------------------------------------------------
start(Type, Tab, LinkTo) ->
Starter = self(),
Args = [Type, Starter, LinkTo, Tab],
Pid = spawn_link(?MODULE, init, Args),
%% The receiver process may unlink the current process
receive
{ok, Res} ->
Res;
{'EXIT', Pid, Reason} when LinkTo == Starter ->
exit(Reason)
end.
%% Starts a receiver process and optionally creates a Mnesia table
%% with suitable default values. Returns the Pid of the receiver process
%%
%% The receiver process accumulates Mnesia operations and performs
%% all operations or none at commit. The understood messages are:
%%
%% {write, Key, KeySize, ValType, ValSize, Val} ->
%% accumulates mnesia:write({Tab, Key, KeySize, ValType, ValSize, Val})
%% (no reply)
%% {delete, Key} ->
%% accumulates mnesia:delete({Tab, Key}) (no reply)
%% {commit, ReplyTo} ->
%% commits all accumulated operations
%% and stops the process (replies {ok, Pid})
%% abort ->
%% stops the process (no reply)
%%
%% The receiver process is linked to the process with the process identifier
%% LinkTo. If some error occurs the receiver process will invoke exit(Reason)
%% and it is up to he LinkTo process to act properly when it receives an exit
%% signal.
start_dump(Tab, LinkTo) ->
start(dump, Tab, LinkTo).
%% Starts a sender process which sends restore messages back to the
%% LinkTo process. But first are some statistics about the table
%% determined and returned as a 5-tuple:
%%
%% {size, SenderPid, N, LargestKeySize, LargestValSize}
%%
%% where N is the number of records in the table. Then the sender process
%% waits for a 2-tuple message:
%%
%% {send_records, ReplyTo}
%%
%% At last N 6-tuple messages is sent to the ReplyTo process:
%%
%% ReplyTo ! {restore, KeySize, ValSize, ValType, Key, Val}
%%
%% If some error occurs the receiver process will invoke exit(Reason)
%% and it is up to he LinkTo process to act properly when it receives an
%% exit signal.
start_restore(Tab, LinkTo) ->
start(restore, Tab, LinkTo).
%% Optionally creates the Mnesia table Tab with suitable default values.
%% Returns ok or EXIT's
create_table(Tab) ->
Storage = mnesia:table_info(schema, storage_type),
create_table(Tab, [{Storage, [node()]}]).
create_table(Tab, TabDef) ->
Attrs = record_info(fields, registry_entry),
case mnesia:create_table(Tab, [{attributes, Attrs} | TabDef]) of
{atomic, ok} ->
ok;
{aborted, {already_exists, Tab}} ->
ok;
{aborted, Reason} ->
exit(Reason)
end.
%%%----------------------------------------------------------------------
%%% Server
%%%----------------------------------------------------------------------
init(Type, Starter, LinkTo, Tab) ->
if
LinkTo /= Starter ->
link(LinkTo),
unlink(Starter);
true ->
ignore
end,
case Type of
dump ->
Starter ! {ok, self()},
dump_loop(#state{table = Tab, link_to = LinkTo});
restore ->
restore_table(Tab, Starter, LinkTo)
end.
%%%----------------------------------------------------------------------
%%% Dump loop
%%%----------------------------------------------------------------------
dump_loop(S) ->
Tab = S#state.table,
Ops = S#state.ops,
receive
{write, Key, KeySize, ValType, ValSize, Val} ->
RE = #registry_entry{key = Key,
key_size = KeySize,
val_type = ValType,
val_size = ValSize,
val = Val},
dump_loop(S#state{ops = [{write, RE} | Ops]});
{delete, Key} ->
dump_loop(S#state{ops = [{delete, Key} | Ops]});
{commit, ReplyTo} ->
create_table(Tab),
RecName = mnesia:table_info(Tab, record_name),
%% The Ops are in reverse order, but there is no need
%% for reversing the list of accumulated operations
case mnesia:transaction(fun handle_ops/3, [Tab, RecName, Ops]) of
{atomic, ok} ->
ReplyTo ! {ok, self()},
stop(S#state.link_to);
{aborted, Reason} ->
exit({aborted, Reason})
end;
abort ->
stop(S#state.link_to);
BadMsg ->
exit({bad_message, BadMsg})
end.
stop(LinkTo) ->
unlink(LinkTo),
exit(normal).
%% Grab a write lock for the entire table
%% and iterate over all accumulated operations
handle_ops(Tab, RecName, Ops) ->
mnesia:write_lock_table(Tab),
do_handle_ops(Tab, RecName, Ops).
do_handle_ops(Tab, RecName, [{write, RegEntry} | Ops]) ->
Record = setelement(1, RegEntry, RecName),
mnesia:write(Tab, Record, write),
do_handle_ops(Tab, RecName, Ops);
do_handle_ops(Tab, RecName, [{delete, Key} | Ops]) ->
mnesia:delete(Tab, Key, write),
do_handle_ops(Tab, RecName, Ops);
do_handle_ops(_Tab, _RecName, []) ->
ok.
%%%----------------------------------------------------------------------
%%% Restore table
%%%----------------------------------------------------------------------
restore_table(Tab, Starter, LinkTo) ->
Pat = mnesia:table_info(Tab, wild_pattern),
Fun = fun() -> mnesia:match_object(Tab, Pat, read) end,
case mnesia:transaction(Fun) of
{atomic, AllRecords} ->
Size = calc_size(AllRecords, #size{}),
Starter ! {ok, Size},
receive
{send_records, ReplyTo} ->
send_records(AllRecords, ReplyTo),
unlink(LinkTo),
exit(normal);
BadMsg ->
exit({bad_message, BadMsg})
end;
{aborted, Reason} ->
exit(Reason)
end.
calc_size([H | T], S) ->
KeySize = max(element(#registry_entry.key_size, H), S#size.largest_key),
ValSize = max(element(#registry_entry.val_size, H), S#size.largest_val),
N = S#size.n_values + 1,
calc_size(T, S#size{n_values = N, largest_key = KeySize, largest_val = ValSize});
calc_size([], Size) ->
Size.
max(New, Old) when New > Old -> New;
max(_New, Old) -> Old.
send_records([H | T], ReplyTo) ->
KeySize = element(#registry_entry.key_size, H),
ValSize = element(#registry_entry.val_size, H),
ValType = element(#registry_entry.val_type, H),
Key = element(#registry_entry.key, H),
Val = element(#registry_entry.val, H),
ReplyTo ! {restore, KeySize, ValSize, ValType, Key, Val},
send_records(T, ReplyTo);
send_records([], _ReplyTo) ->
ok.