%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1998-2010. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%%% Purpose : Loads tables from local disc or from remote node
-module(mnesia_loader).
%% Mnesia internal stuff
-export([disc_load_table/2,
net_load_table/4,
send_table/3]).
-export([old_node_init_table/6]). %% Spawned old node protocol conversion hack
-export([spawned_receiver/8]). %% Spawned lock taking process
-import(mnesia_lib, [set/2, fatal/2, verbose/2, dbg_out/2]).
-include("mnesia.hrl").
val(Var) ->
case ?catch_val(Var) of
{'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
Value -> Value
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load a table from local disc
disc_load_table(Tab, Reason) ->
Storage = val({Tab, storage_type}),
Type = val({Tab, setorbag}),
dbg_out("Getting table ~p (~p) from disc: ~p~n",
[Tab, Storage, Reason]),
?eval_debug_fun({?MODULE, do_get_disc_copy},
[{tab, Tab},
{reason, Reason},
{storage, Storage},
{type, Type}]),
do_get_disc_copy2(Tab, Reason, Storage, Type).
do_get_disc_copy2(Tab, _Reason, Storage, _Type) when Storage == unknown ->
verbose("Local table copy of ~p has recently been deleted, ignored.~n",
[Tab]),
{loaded, ok}; %% ?
do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_copies ->
%% NOW we create the actual table
Repair = mnesia_monitor:get_env(auto_repair),
Args = [{keypos, 2}, public, named_table, Type],
case Reason of
{dumper, _} -> %% Resources allready allocated
ignore;
_ ->
mnesia_monitor:mktab(Tab, Args),
Count = mnesia_log:dcd2ets(Tab, Repair),
case ets:info(Tab, size) of
X when X < Count * 4 ->
ok = mnesia_log:ets2dcd(Tab);
_ ->
ignore
end
end,
mnesia_index:init_index(Tab, Storage),
snmpify(Tab, Storage),
set({Tab, load_node}, node()),
set({Tab, load_reason}, Reason),
{loaded, ok};
do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == ram_copies ->
Args = [{keypos, 2}, public, named_table, Type],
case Reason of
{dumper, _} -> %% Resources allready allocated
ignore;
_ ->
mnesia_monitor:mktab(Tab, Args),
Fname = mnesia_lib:tab2dcd(Tab),
Datname = mnesia_lib:tab2dat(Tab),
Repair = mnesia_monitor:get_env(auto_repair),
case mnesia_monitor:use_dir() of
true ->
case mnesia_lib:exists(Fname) of
true -> mnesia_log:dcd2ets(Tab, Repair);
false ->
case mnesia_lib:exists(Datname) of
true ->
mnesia_lib:dets_to_ets(Tab, Tab, Datname,
Type, Repair, no);
false ->
false
end
end;
false ->
false
end
end,
mnesia_index:init_index(Tab, Storage),
snmpify(Tab, Storage),
set({Tab, load_node}, node()),
set({Tab, load_reason}, Reason),
{loaded, ok};
do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_only_copies ->
Args = [{file, mnesia_lib:tab2dat(Tab)},
{type, mnesia_lib:disk_type(Tab, Type)},
{keypos, 2},
{repair, mnesia_monitor:get_env(auto_repair)}],
case Reason of
{dumper, _} ->
mnesia_index:init_index(Tab, Storage),
snmpify(Tab, Storage),
set({Tab, load_node}, node()),
set({Tab, load_reason}, Reason),
{loaded, ok};
_ ->
case mnesia_monitor:open_dets(Tab, Args) of
{ok, _} ->
mnesia_index:init_index(Tab, Storage),
snmpify(Tab, Storage),
set({Tab, load_node}, node()),
set({Tab, load_reason}, Reason),
{loaded, ok};
{error, Error} ->
{not_loaded, {"Failed to create dets table", Error}}
end
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load a table from a remote node
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% Receiver Sender
%% -------- ------
%% Grab schema lock on table
%% Determine table size
%% Create empty pre-grown table
%% Grab read lock on table
%% Let receiver subscribe on updates done on sender node
%% Disable rehashing of table
%% Release read lock on table
%% Send table to receiver in chunks
%%
%% Grab read lock on table
%% Block dirty updates
%% Update wherabouts
%%
%% Cancel the update subscription
%% Process the subscription events
%% Optionally dump to disc
%% Unblock dirty updates
%% Release read lock on table
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-define(MAX_TRANSFER_SIZE, 7500).
-define(MAX_RAM_FILE_SIZE, 1000000).
-define(MAX_RAM_TRANSFERS, (?MAX_RAM_FILE_SIZE div ?MAX_TRANSFER_SIZE) + 1).
-define(MAX_NOPACKETS, 20).
net_load_table(Tab, Reason, Ns, Cs)
when Reason == {dumper,add_table_copy} ->
try_net_load_table(Tab, Reason, Ns, Cs);
net_load_table(Tab, Reason, Ns, _Cs) ->
try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})).
try_net_load_table(Tab, _Reason, [], _Cs) ->
verbose("Copy failed. No active replicas of ~p are available.~n", [Tab]),
{not_loaded, none_active};
try_net_load_table(Tab, Reason, Ns, Cs) ->
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
do_get_network_copy(Tab, Reason, Ns, Storage, Cs).
do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
verbose("Local table copy of ~p has recently been deleted, ignored.~n", [Tab]),
{not_loaded, storage_unknown};
do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
[Node | Tail] = Ns,
case lists:member(Node,val({current, db_nodes})) of
true ->
dbg_out("Getting table ~p (~p) from node ~p: ~p~n",
[Tab, Storage, Node, Reason]),
?eval_debug_fun({?MODULE, do_get_network_copy},
[{tab, Tab}, {reason, Reason},
{nodes, Ns}, {storage, Storage}]),
case init_receiver(Node, Tab, Storage, Cs, Reason) of
ok ->
set({Tab, load_node}, Node),
set({Tab, load_reason}, Reason),
mnesia_controller:i_have_tab(Tab),
dbg_out("Table ~p copied from ~p to ~p~n", [Tab, Node, node()]),
{loaded, ok};
Err = {error, _} when element(1, Reason) == dumper ->
{not_loaded,Err};
restart ->
try_net_load_table(Tab, Reason, Tail ++ [Node], Cs);
down ->
try_net_load_table(Tab, Reason, Tail, Cs)
end;
false ->
try_net_load_table(Tab, Reason, Tail, Cs)
end.
snmpify(Tab, Storage) ->
do_snmpify(Tab, val({Tab, snmp}), Storage).
do_snmpify(_Tab, [], _Storage) ->
ignore;
do_snmpify(Tab, Us, Storage) ->
Snmp = mnesia_snmp_hook:create_table(Us, Tab, Storage),
set({Tab, {index, snmp}}, Snmp).
%% Start the recieiver
init_receiver(Node, Tab, Storage, Cs, Reas={dumper,add_table_copy}) ->
case start_remote_sender(Node, Tab, Storage) of
{SenderPid, TabSize, DetsData} ->
start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,Reas);
Else ->
Else
end;
init_receiver(Node, Tab,Storage,Cs,Reason) ->
%% Grab a schema lock to avoid deadlock between table_loader and schema_commit dumping.
%% Both may grab tables-locks in different order.
Load =
fun() ->
{_,Tid,Ts} = get(mnesia_activity_state),
mnesia_locker:rlock(Tid, Ts#tidstore.store, {schema, Tab}),
%% Check that table still exists
Active = val({Tab, active_replicas}),
%% Check that we havn't loaded it already
case val({Tab,where_to_read}) == node() of
true -> ok;
_ ->
%% And that sender still got a copy
%% (something might have happend while
%% we where waiting for the lock)
true = lists:member(Node, Active),
{SenderPid, TabSize, DetsData} =
start_remote_sender(Node,Tab,Storage),
Init = table_init_fun(SenderPid),
Args = [self(),Tab,Storage,Cs,SenderPid,
TabSize,DetsData,Init],
Pid = spawn_link(?MODULE, spawned_receiver, Args),
put(mnesia_real_loader, Pid),
wait_on_load_complete(Pid)
end
end,
Res =
case mnesia:transaction(Load, 20) of
{atomic, {error,Result}} when
element(1,Reason) == dumper ->
{error,Result};
{atomic, {error,Result}} ->
fatal("Cannot create table ~p: ~p~n",
[[Tab, Storage], Result]);
{atomic, Result} -> Result;
{aborted, nomore} -> restart;
{aborted, _Reas} ->
verbose("Receiver failed on ~p from ~p:~nReason: ~p~n",
[Tab,Node,_Reas]),
down %% either this node or sender is dying
end,
unlink(whereis(mnesia_tm)), %% Avoid late unlink from tm
Res.
start_remote_sender(Node,Tab,Storage) ->
mnesia_controller:start_remote_sender(Node, Tab, self(), Storage),
put(mnesia_table_sender_node, {Tab, Node}),
receive
{SenderPid, {first, TabSize}} ->
{SenderPid, TabSize, false};
{SenderPid, {first, TabSize, DetsData}} ->
{SenderPid, TabSize, DetsData};
%% Protocol conversion hack
{copier_done, Node} ->
verbose("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
down(Tab, Storage)
end.
table_init_fun(SenderPid) ->
PConv = mnesia_monitor:needs_protocol_conversion(node(SenderPid)),
MeMyselfAndI = self(),
fun(read) ->
Receiver =
if
PConv == true ->
MeMyselfAndI ! {actual_tabrec, self()},
MeMyselfAndI; %% Old mnesia
PConv == false -> self()
end,
SenderPid ! {Receiver, more},
get_data(SenderPid, Receiver)
end.
%% Add_table_copy get's it's own locks.
start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,add_table_copy}) ->
Init = table_init_fun(SenderPid),
case do_init_table(Tab,Storage,Cs,SenderPid,TabSize,DetsData,self(), Init) of
Err = {error, _} ->
SenderPid ! {copier_done, node()},
Err;
Else ->
Else
end.
spawned_receiver(ReplyTo,Tab,Storage,Cs, SenderPid,TabSize,DetsData, Init) ->
process_flag(trap_exit, true),
Done = do_init_table(Tab,Storage,Cs,
SenderPid,TabSize,DetsData,
ReplyTo, Init),
ReplyTo ! {self(),Done},
unlink(ReplyTo),
unlink(whereis(mnesia_controller)),
exit(normal).
wait_on_load_complete(Pid) ->
receive
{Pid, Res} ->
Res;
{'EXIT', Pid, Reason} ->
exit(Reason);
Else ->
Pid ! Else,
wait_on_load_complete(Pid)
end.
do_init_table(Tab,Storage,Cs,SenderPid,
TabSize,DetsInfo,OrigTabRec,Init) ->
case create_table(Tab, TabSize, Storage, Cs) of
{Storage,Tab} ->
%% Debug info
Node = node(SenderPid),
put(mnesia_table_receiver, {Tab, Node, SenderPid}),
mnesia_tm:block_tab(Tab),
PConv = mnesia_monitor:needs_protocol_conversion(Node),
case init_table(Tab,Storage,Init,PConv,DetsInfo,SenderPid) of
ok ->
tab_receiver(Node,Tab,Storage,Cs,PConv,OrigTabRec);
Reason ->
Msg = "[d]ets:init table failed",
verbose("~s: ~p: ~p~n", [Msg, Tab, Reason]),
down(Tab, Storage)
end;
Error ->
Error
end.
create_table(Tab, TabSize, Storage, Cs) ->
if
Storage == disc_only_copies ->
mnesia_lib:lock_table(Tab),
Tmp = mnesia_lib:tab2tmp(Tab),
Size = lists:max([TabSize, 256]),
Args = [{file, Tmp},
{keypos, 2},
%% {ram_file, true},
{estimated_no_objects, Size},
{repair, mnesia_monitor:get_env(auto_repair)},
{type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
file:delete(Tmp),
case mnesia_lib:dets_sync_open(Tab, Args) of
{ok, _} ->
mnesia_lib:unlock_table(Tab),
{Storage, Tab};
Else ->
mnesia_lib:unlock_table(Tab),
Else
end;
(Storage == ram_copies) or (Storage == disc_copies) ->
Args = [{keypos, 2}, public, named_table, Cs#cstruct.type],
case mnesia_monitor:unsafe_mktab(Tab, Args) of
Tab ->
{Storage, Tab};
Else ->
Else
end
end.
tab_receiver(Node, Tab, Storage, Cs, PConv, OrigTabRec) ->
receive
{SenderPid, {no_more, DatBin}} when PConv == false ->
finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec);
%% Protocol conversion hack
{SenderPid, {no_more, DatBin}} when is_pid(PConv) ->
PConv ! {SenderPid, no_more},
receive
{old_init_table_complete, ok} ->
finish_copy(Storage, Tab, Cs, SenderPid, DatBin,OrigTabRec);
{old_init_table_complete, Reason} ->
Msg = "OLD: [d]ets:init table failed",
verbose("~s: ~p: ~p~n", [Msg, Tab, Reason]),
down(Tab, Storage)
end;
{actual_tabrec, Pid} ->
tab_receiver(Node, Tab, Storage, Cs, Pid,OrigTabRec);
{SenderPid, {more, [Recs]}} when is_pid(PConv) ->
PConv ! {SenderPid, {more, Recs}}, %% Forward Msg to OldNodes
tab_receiver(Node, Tab, Storage, Cs, PConv,OrigTabRec);
{'EXIT', PConv, Reason} -> %% [d]ets:init process crashed
Msg = "Receiver crashed",
verbose("~s: ~p: ~p~n", [Msg, Tab, Reason]),
down(Tab, Storage);
%% Protocol conversion hack
{copier_done, Node} ->
verbose("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
down(Tab, Storage);
{'EXIT', Pid, Reason} ->
handle_exit(Pid, Reason),
tab_receiver(Node, Tab, Storage, Cs, PConv,OrigTabRec)
end.
make_table_fun(Pid, TabRec) ->
fun(close) ->
ok;
(read) ->
get_data(Pid, TabRec)
end.
get_data(Pid, TabRec) ->
receive
{Pid, {more_z, CompressedRecs}} when is_binary(CompressedRecs) ->
Pid ! {TabRec, more},
{zlib_uncompress(CompressedRecs), make_table_fun(Pid,TabRec)};
{Pid, {more, Recs}} ->
Pid ! {TabRec, more},
{Recs, make_table_fun(Pid,TabRec)};
{Pid, no_more} ->
end_of_input;
{copier_done, Node} ->
case node(Pid) of
Node ->
{copier_done, Node};
_ ->
get_data(Pid, TabRec)
end;
{'EXIT', Pid, Reason} ->
handle_exit(Pid, Reason),
get_data(Pid, TabRec)
end.
init_table(Tab, disc_only_copies, Fun, false, DetsInfo,Sender) ->
ErtsVer = erlang:system_info(version),
case DetsInfo of
{ErtsVer, DetsData} ->
Res = (catch dets:is_compatible_bchunk_format(Tab, DetsData)),
case Res of
{'EXIT',{undef,[{dets,_,_,_}|_]}} ->
Sender ! {self(), {old_protocol, Tab}},
dets:init_table(Tab, Fun); %% Old dets version
{'EXIT', What} ->
exit(What);
false ->
Sender ! {self(), {old_protocol, Tab}},
dets:init_table(Tab, Fun); %% Old dets version
true ->
dets:init_table(Tab, Fun, [{format, bchunk}])
end;
Old when Old /= false ->
Sender ! {self(), {old_protocol, Tab}},
dets:init_table(Tab, Fun); %% Old dets version
_ ->
dets:init_table(Tab, Fun)
end;
init_table(Tab, _, Fun, false, _DetsInfo,_) ->
case catch ets:init_table(Tab, Fun) of
true ->
ok;
{'EXIT', Else} -> Else
end;
init_table(Tab, Storage, Fun, true, _DetsInfo, Sender) -> %% Old Nodes
spawn_link(?MODULE, old_node_init_table,
[Tab, Storage, Fun, self(), false, Sender]),
ok.
old_node_init_table(Tab, Storage, Fun, TabReceiver, DetsInfo,Sender) ->
Res = init_table(Tab, Storage, Fun, false, DetsInfo,Sender),
TabReceiver ! {old_init_table_complete, Res},
unlink(TabReceiver),
ok.
finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) ->
TabRef = {Storage, Tab},
subscr_receiver(TabRef, Cs#cstruct.record_name),
case handle_last(TabRef, Cs#cstruct.type, DatBin) of
ok ->
mnesia_index:init_index(Tab, Storage),
snmpify(Tab, Storage),
%% OrigTabRec must not be the spawned tab-receiver
%% due to old protocol.
SenderPid ! {OrigTabRec, no_more},
mnesia_tm:unblock_tab(Tab),
ok;
{error, Reason} ->
Msg = "Failed to handle last",
verbose("~s: ~p: ~p~n", [Msg, Tab, Reason]),
down(Tab, Storage)
end.
subscr_receiver(TabRef = {_, Tab}, RecName) ->
receive
{mnesia_table_event, {Op, Val, _Tid}} ->
if
Tab == RecName ->
handle_event(TabRef, Op, Val);
true ->
handle_event(TabRef, Op, setelement(1, Val, RecName))
end,
subscr_receiver(TabRef, RecName);
{'EXIT', Pid, Reason} ->
handle_exit(Pid, Reason),
subscr_receiver(TabRef, RecName)
after 0 ->
ok
end.
handle_event(TabRef, write, Rec) ->
db_put(TabRef, Rec);
handle_event(TabRef, delete, {_Tab, Key}) ->
db_erase(TabRef, Key);
handle_event(TabRef, delete_object, OldRec) ->
db_match_erase(TabRef, OldRec);
handle_event(TabRef, clear_table, {_Tab, _Key}) ->
db_match_erase(TabRef, '_').
handle_last({disc_copies, Tab}, _Type, nobin) ->
Ret = mnesia_log:ets2dcd(Tab),
Fname = mnesia_lib:tab2dat(Tab),
case mnesia_lib:exists(Fname) of
true -> %% Remove old .DAT files.
file:delete(Fname);
false ->
ok
end,
Ret;
handle_last({disc_only_copies, Tab}, Type, nobin) ->
mnesia_lib:dets_sync_close(Tab),
Tmp = mnesia_lib:tab2tmp(Tab),
Dat = mnesia_lib:tab2dat(Tab),
case file:rename(Tmp, Dat) of
ok ->
Args = [{file, mnesia_lib:tab2dat(Tab)},
{type, mnesia_lib:disk_type(Tab, Type)},
{keypos, 2},
{repair, mnesia_monitor:get_env(auto_repair)}],
mnesia_monitor:open_dets(Tab, Args),
ok;
{error, Reason} ->
{error, {"Cannot swap tmp files", Tab, Reason}}
end;
handle_last({ram_copies, _Tab}, _Type, nobin) ->
ok;
handle_last({ram_copies, Tab}, _Type, DatBin) ->
case mnesia_monitor:use_dir() of
true ->
mnesia_lib:lock_table(Tab),
Tmp = mnesia_lib:tab2tmp(Tab),
ok = file:write_file(Tmp, DatBin),
ok = file:rename(Tmp, mnesia_lib:tab2dcd(Tab)),
mnesia_lib:unlock_table(Tab),
ok;
false ->
ok
end.
down(Tab, Storage) ->
case Storage of
ram_copies ->
catch ?ets_delete_table(Tab);
disc_copies ->
catch ?ets_delete_table(Tab);
disc_only_copies ->
TmpFile = mnesia_lib:tab2tmp(Tab),
mnesia_lib:dets_sync_close(Tab),
file:delete(TmpFile)
end,
mnesia_checkpoint:tm_del_copy(Tab, node()),
mnesia_controller:sync_del_table_copy_whereabouts(Tab, node()),
mnesia_tm:unblock_tab(Tab),
flush_subcrs(),
down.
flush_subcrs() ->
receive
{mnesia_table_event, _} ->
flush_subcrs();
{'EXIT', Pid, Reason} ->
handle_exit(Pid, Reason),
flush_subcrs()
after 0 ->
done
end.
db_erase({ram_copies, Tab}, Key) ->
true = ?ets_delete(Tab, Key);
db_erase({disc_copies, Tab}, Key) ->
true = ?ets_delete(Tab, Key);
db_erase({disc_only_copies, Tab}, Key) ->
ok = dets:delete(Tab, Key).
db_match_erase({ram_copies, Tab} , Pat) ->
true = ?ets_match_delete(Tab, Pat);
db_match_erase({disc_copies, Tab} , Pat) ->
true = ?ets_match_delete(Tab, Pat);
db_match_erase({disc_only_copies, Tab}, Pat) ->
ok = dets:match_delete(Tab, Pat).
db_put({ram_copies, Tab}, Val) ->
true = ?ets_insert(Tab, Val);
db_put({disc_copies, Tab}, Val) ->
true = ?ets_insert(Tab, Val);
db_put({disc_only_copies, Tab}, Val) ->
ok = dets:insert(Tab, Val).
%% This code executes at the remote site where the data is
%% executes in a special copier process.
calc_nokeys(Storage, Tab) ->
%% Calculate #keys per transfer
Key = mnesia_lib:db_first(Storage, Tab),
Recs = mnesia_lib:db_get(Storage, Tab, Key),
BinSize = size(term_to_binary(Recs)),
(?MAX_TRANSFER_SIZE div BinSize) + 1.
send_table(Pid, Tab, RemoteS) ->
case ?catch_val({Tab, storage_type}) of
{'EXIT', _} ->
{error, {no_exists, Tab}};
unknown ->
{error, {no_exists, Tab}};
Storage ->
%% Send first
TabSize = mnesia:table_info(Tab, size),
Pconvert = mnesia_monitor:needs_protocol_conversion(node(Pid)),
KeysPerTransfer = calc_nokeys(Storage, Tab),
ChunkData = dets:info(Tab, bchunk_format),
UseDetsChunk =
Storage == RemoteS andalso
Storage == disc_only_copies andalso
ChunkData /= undefined andalso
Pconvert == false,
if
UseDetsChunk == true ->
DetsInfo = erlang:system_info(version),
Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}};
true ->
Pid ! {self(), {first, TabSize}}
end,
%% Debug info
put(mnesia_table_sender, {Tab, node(Pid), Pid}),
{Init, Chunk} = reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer),
SendIt = fun() ->
prepare_copy(Pid, Tab, Storage),
send_more(Pid, 1, Chunk, Init(), Tab, Pconvert),
finish_copy(Pid, Tab, Storage, RemoteS)
end,
case catch SendIt() of
receiver_died ->
cleanup_tab_copier(Pid, Storage, Tab),
unlink(whereis(mnesia_tm)),
ok;
{_, receiver_died} ->
unlink(whereis(mnesia_tm)),
ok;
{atomic, no_more} ->
unlink(whereis(mnesia_tm)),
ok;
Reason ->
cleanup_tab_copier(Pid, Storage, Tab),
unlink(whereis(mnesia_tm)),
{error, Reason}
end
end.
prepare_copy(Pid, Tab, Storage) ->
Trans =
fun() ->
mnesia:lock_table(Tab, load),
mnesia_subscr:subscribe(Pid, {table, Tab}),
update_where_to_write(Tab, node(Pid)),
mnesia_lib:db_fixtable(Storage, Tab, true),
ok
end,
case mnesia:transaction(Trans) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
exit({tab_copier_prepare, Tab, Reason})
end.
update_where_to_write(Tab, Node) ->
case val({Tab, access_mode}) of
read_only ->
ignore;
read_write ->
Current = val({current, db_nodes}),
Ns =
case lists:member(Node, Current) of
true -> Current;
false -> [Node | Current]
end,
update_where_to_write(Ns, Tab, Node)
end.
update_where_to_write([], _, _) ->
ok;
update_where_to_write([H|T], Tab, AddNode) ->
rpc:call(H, mnesia_controller, call,
[{update_where_to_write, [add, Tab, AddNode], self()}]),
update_where_to_write(T, Tab, AddNode).
send_more(Pid, N, Chunk, DataState, Tab, OldNode) ->
receive
{NewPid, more} ->
case send_packet(N - 1, NewPid, Chunk, DataState, OldNode) of
New when is_integer(New) ->
New - 1;
NewData ->
send_more(NewPid, ?MAX_NOPACKETS, Chunk, NewData, Tab, OldNode)
end;
{_NewPid, {old_protocol, Tab}} ->
Storage = val({Tab, storage_type}),
{Init, NewChunk} =
reader_funcs(false, Tab, Storage, calc_nokeys(Storage, Tab)),
send_more(Pid, 1, NewChunk, Init(), Tab, OldNode);
{copier_done, Node} when Node == node(Pid)->
verbose("Receiver of table ~p crashed on ~p (more)~n", [Tab, Node]),
throw(receiver_died)
end.
reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer) ->
case UseDetsChunk of
false ->
{fun() -> mnesia_lib:db_init_chunk(Storage, Tab, KeysPerTransfer) end,
fun(Cont) -> mnesia_lib:db_chunk(Storage, Cont) end};
true ->
{fun() -> dets_bchunk(Tab, start) end,
fun(Cont) -> dets_bchunk(Tab, Cont) end}
end.
dets_bchunk(Tab, Chunk) -> %% Arrg
case dets:bchunk(Tab, Chunk) of
{Cont, Data} -> {Data, Cont};
Else -> Else
end.
zlib_compress(Data, Level) ->
BinData = term_to_binary(Data),
Z = zlib:open(),
zlib:deflateInit(Z, Level),
Bs = zlib:deflate(Z, BinData, finish),
zlib:deflateEnd(Z),
zlib:close(Z),
list_to_binary(Bs).
zlib_uncompress(Data) when is_binary(Data) ->
binary_to_term(zlib:uncompress(Data)).
compression_level() ->
NoCompression = 0,
case ?catch_val(send_compressed) of
{'EXIT', _} ->
mnesia_lib:set(send_compressed, NoCompression),
NoCompression;
Val -> Val
end.
send_packet(N, Pid, _Chunk, '$end_of_table', OldNode) ->
case OldNode of
true -> ignore; %% Old nodes can't handle the new no_more
false -> Pid ! {self(), no_more}
end,
N;
send_packet(N, Pid, Chunk, {[], Cont}, OldNode) ->
send_packet(N, Pid, Chunk, Chunk(Cont), OldNode);
send_packet(N, Pid, Chunk, {Recs, Cont}, OldNode) when N < ?MAX_NOPACKETS ->
case OldNode of
true ->
Pid ! {self(), {more, [Recs]}}; %% Old need's wrapping list
false ->
case compression_level() of
0 ->
Pid ! {self(), {more, Recs}};
Level ->
Pid ! {self(), {more_z, zlib_compress(Recs, Level)}}
end
end,
send_packet(N+1, Pid, Chunk, Chunk(Cont), OldNode);
send_packet(_N, _Pid, _Chunk, DataState, _OldNode) ->
DataState.
finish_copy(Pid, Tab, Storage, RemoteS) ->
RecNode = node(Pid),
DatBin = dat2bin(Tab, Storage, RemoteS),
Trans =
fun() ->
mnesia:read_lock_table(Tab),
A = val({Tab, access_mode}),
mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A),
cleanup_tab_copier(Pid, Storage, Tab),
mnesia_checkpoint:tm_add_copy(Tab, RecNode),
Pid ! {self(), {no_more, DatBin}},
receive
{Pid, no_more} -> % Dont bother about the spurious 'more' message
no_more;
{copier_done, Node} when Node == node(Pid)->
verbose("Tab receiver ~p crashed (more): ~p~n", [Tab, Node]),
receiver_died
end
end,
mnesia:transaction(Trans).
cleanup_tab_copier(Pid, Storage, Tab) ->
mnesia_lib:db_fixtable(Storage, Tab, false),
mnesia_subscr:unsubscribe(Pid, {table, Tab}).
dat2bin(Tab, ram_copies, ram_copies) ->
mnesia_lib:lock_table(Tab),
Res = file:read_file(mnesia_lib:tab2dcd(Tab)),
mnesia_lib:unlock_table(Tab),
case Res of
{ok, DatBin} -> DatBin;
_ -> nobin
end;
dat2bin(_Tab, _LocalS, _RemoteS) ->
nobin.
handle_exit(Pid, Reason) when node(Pid) == node() ->
exit(Reason);
handle_exit(_Pid, _Reason) -> %% Not from our node, this will be handled by
ignore. %% mnesia_down soon.