aboutsummaryrefslogtreecommitdiffstats
path: root/lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_loader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_loader.erl')
-rw-r--r--lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_loader.erl805
1 files changed, 0 insertions, 805 deletions
diff --git a/lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_loader.erl b/lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_loader.erl
deleted file mode 100644
index df3309cfa6..0000000000
--- a/lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_loader.erl
+++ /dev/null
@@ -1,805 +0,0 @@
-%% ``The contents of this file are subject to the Erlang Public License,
-%% Version 1.1, (the "License"); you may not use this file except in
-%% compliance with the License. You should have received a copy of the
-%% Erlang Public License along with this software. If not, it can be
-%% retrieved via the world wide web at http://www.erlang.org/.
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
-%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
-%% AB. All Rights Reserved.''
-%%
-%% $Id: mnesia_loader.erl,v 1.2 2010/03/04 13:54:19 maria Exp $
-%%
-%%% Purpose : Loads tables from local disc or from remote node
-
--module(mnesia_loader).
-
-%% Mnesia internal stuff
--export([disc_load_table/2,
- net_load_table/4,
- send_table/3]).
-
--export([old_node_init_table/6]). %% Spawned old node protocol conversion hack
--export([spawned_receiver/8]). %% Spawned lock taking process
-
--import(mnesia_lib, [set/2, fatal/2, verbose/2, dbg_out/2]).
-
--include("mnesia.hrl").
-
-val(Var) ->
- case ?catch_val(Var) of
- {'EXIT', Reason} -> mnesia_lib:other_val(Var, Reason);
- Value -> Value
- end.
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%% Load a table from local disc
-
-disc_load_table(Tab, Reason) ->
- Storage = val({Tab, storage_type}),
- Type = val({Tab, setorbag}),
- dbg_out("Getting table ~p (~p) from disc: ~p~n",
- [Tab, Storage, Reason]),
- ?eval_debug_fun({?MODULE, do_get_disc_copy},
- [{tab, Tab},
- {reason, Reason},
- {storage, Storage},
- {type, Type}]),
- do_get_disc_copy2(Tab, Reason, Storage, Type).
-
-do_get_disc_copy2(Tab, _Reason, Storage, _Type) when Storage == unknown ->
- verbose("Local table copy of ~p has recently been deleted, ignored.~n",
- [Tab]),
- {loaded, ok}; %% ?
-do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_copies ->
- %% NOW we create the actual table
- Repair = mnesia_monitor:get_env(auto_repair),
- Args = [{keypos, 2}, public, named_table, Type],
- case Reason of
- {dumper, _} -> %% Resources allready allocated
- ignore;
- _ ->
- mnesia_monitor:mktab(Tab, Args),
- Count = mnesia_log:dcd2ets(Tab, Repair),
- case ets:info(Tab, size) of
- X when X < Count * 4 ->
- ok = mnesia_log:ets2dcd(Tab);
- _ ->
- ignore
- end
- end,
- mnesia_index:init_index(Tab, Storage),
- snmpify(Tab, Storage),
- set({Tab, load_node}, node()),
- set({Tab, load_reason}, Reason),
- {loaded, ok};
-
-do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == ram_copies ->
- Args = [{keypos, 2}, public, named_table, Type],
- case Reason of
- {dumper, _} -> %% Resources allready allocated
- ignore;
- _ ->
- mnesia_monitor:mktab(Tab, Args),
- Fname = mnesia_lib:tab2dcd(Tab),
- Datname = mnesia_lib:tab2dat(Tab),
- Repair = mnesia_monitor:get_env(auto_repair),
- case mnesia_monitor:use_dir() of
- true ->
- case mnesia_lib:exists(Fname) of
- true -> mnesia_log:dcd2ets(Tab, Repair);
- false ->
- case mnesia_lib:exists(Datname) of
- true ->
- mnesia_lib:dets_to_ets(Tab, Tab, Datname,
- Type, Repair, no);
- false ->
- false
- end
- end;
- false ->
- false
- end
- end,
- mnesia_index:init_index(Tab, Storage),
- snmpify(Tab, Storage),
- set({Tab, load_node}, node()),
- set({Tab, load_reason}, Reason),
- {loaded, ok};
-
-do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_only_copies ->
- Args = [{file, mnesia_lib:tab2dat(Tab)},
- {type, mnesia_lib:disk_type(Tab, Type)},
- {keypos, 2},
- {repair, mnesia_monitor:get_env(auto_repair)}],
- case Reason of
- {dumper, _} ->
- mnesia_index:init_index(Tab, Storage),
- snmpify(Tab, Storage),
- set({Tab, load_node}, node()),
- set({Tab, load_reason}, Reason),
- {loaded, ok};
- _ ->
- case mnesia_monitor:open_dets(Tab, Args) of
- {ok, _} ->
- mnesia_index:init_index(Tab, Storage),
- snmpify(Tab, Storage),
- set({Tab, load_node}, node()),
- set({Tab, load_reason}, Reason),
- {loaded, ok};
- {error, Error} ->
- {not_loaded, {"Failed to create dets table", Error}}
- end
- end.
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%% Load a table from a remote node
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%%
-%% Receiver Sender
-%% -------- ------
-%% Grab schema lock on table
-%% Determine table size
-%% Create empty pre-grown table
-%% Grab read lock on table
-%% Let receiver subscribe on updates done on sender node
-%% Disable rehashing of table
-%% Release read lock on table
-%% Send table to receiver in chunks
-%%
-%% Grab read lock on table
-%% Block dirty updates
-%% Update wherabouts
-%%
-%% Cancel the update subscription
-%% Process the subscription events
-%% Optionally dump to disc
-%% Unblock dirty updates
-%% Release read lock on table
-%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-
--define(MAX_TRANSFER_SIZE, 7500).
--define(MAX_RAM_FILE_SIZE, 1000000).
--define(MAX_RAM_TRANSFERS, (?MAX_RAM_FILE_SIZE div ?MAX_TRANSFER_SIZE) + 1).
--define(MAX_NOPACKETS, 20).
-
-net_load_table(Tab, Reason, Ns, Cs)
- when Reason == {dumper,add_table_copy} ->
- try_net_load_table(Tab, Reason, Ns, Cs);
-net_load_table(Tab, Reason, Ns, _Cs) ->
- try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})).
-
-try_net_load_table(Tab, _Reason, [], _Cs) ->
- verbose("Copy failed. No active replicas of ~p are available.~n", [Tab]),
- {not_loaded, none_active};
-try_net_load_table(Tab, Reason, Ns, Cs) ->
- Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
- do_get_network_copy(Tab, Reason, Ns, Storage, Cs).
-
-do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
- verbose("Local table copy of ~p has recently been deleted, ignored.~n", [Tab]),
- {not_loaded, storage_unknown};
-do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
- [Node | Tail] = Ns,
- dbg_out("Getting table ~p (~p) from node ~p: ~p~n",
- [Tab, Storage, Node, Reason]),
- ?eval_debug_fun({?MODULE, do_get_network_copy},
- [{tab, Tab}, {reason, Reason},
- {nodes, Ns}, {storage, Storage}]),
- mnesia_controller:start_remote_sender(Node, Tab, self(), Storage),
- put(mnesia_table_sender_node, {Tab, Node}),
- case init_receiver(Node, Tab, Storage, Cs, Reason) of
- ok ->
- set({Tab, load_node}, Node),
- set({Tab, load_reason}, Reason),
- mnesia_controller:i_have_tab(Tab),
- dbg_out("Table ~p copied from ~p to ~p~n", [Tab, Node, node()]),
- {loaded, ok};
- Err = {error, _} when element(1, Reason) == dumper ->
- {not_loaded,Err};
- restart ->
- try_net_load_table(Tab, Reason, Tail, Cs);
- down ->
- try_net_load_table(Tab, Reason, Tail, Cs)
- end.
-
-snmpify(Tab, Storage) ->
- do_snmpify(Tab, val({Tab, snmp}), Storage).
-
-do_snmpify(_Tab, [], _Storage) ->
- ignore;
-do_snmpify(Tab, Us, Storage) ->
- Snmp = mnesia_snmp_hook:create_table(Us, Tab, Storage),
- set({Tab, {index, snmp}}, Snmp).
-
-%% Start the recieiver
-%% Sender should be started first, so we don't have the schema-read
-%% lock to long (or get stuck in a deadlock)
-init_receiver(Node, Tab, Storage, Cs, Reason) ->
- receive
- {SenderPid, {first, TabSize}} ->
- spawn_receiver(Tab,Storage,Cs,SenderPid,
- TabSize,false,Reason);
- {SenderPid, {first, TabSize, DetsData}} ->
- spawn_receiver(Tab,Storage,Cs,SenderPid,
- TabSize,DetsData,Reason);
- %% Protocol conversion hack
- {copier_done, Node} ->
- dbg_out("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
- down(Tab, Storage)
- end.
-
-
-table_init_fun(SenderPid) ->
- PConv = mnesia_monitor:needs_protocol_conversion(node(SenderPid)),
- MeMyselfAndI = self(),
- fun(read) ->
- Receiver =
- if
- PConv == true ->
- MeMyselfAndI ! {actual_tabrec, self()},
- MeMyselfAndI; %% Old mnesia
- PConv == false -> self()
- end,
- SenderPid ! {Receiver, more},
- get_data(SenderPid, Receiver)
- end.
-
-
-%% Add_table_copy get's it's own locks.
-spawn_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,add_table_copy}) ->
- Init = table_init_fun(SenderPid),
- case do_init_table(Tab,Storage,Cs,SenderPid,TabSize,DetsData,self(), Init) of
- Err = {error, _} ->
- SenderPid ! {copier_done, node()},
- Err;
- Else ->
- Else
- end;
-
-spawn_receiver(Tab,Storage,Cs,SenderPid,
- TabSize,DetsData,Reason) ->
- %% Grab a schema lock to avoid deadlock between table_loader and schema_commit dumping.
- %% Both may grab tables-locks in different order.
- Load = fun() ->
- {_,Tid,Ts} = get(mnesia_activity_state),
- mnesia_locker:rlock(Tid, Ts#tidstore.store,
- {schema, Tab}),
- Init = table_init_fun(SenderPid),
- Pid = spawn_link(?MODULE, spawned_receiver,
- [self(),Tab,Storage,Cs,
- SenderPid,TabSize,DetsData,
- Init]),
- put(mnesia_real_loader, Pid),
- wait_on_load_complete(Pid)
- end,
- Res = case mnesia:transaction(Load, 20) of
- {'atomic', {error,Result}} when element(1,Reason) == dumper ->
- SenderPid ! {copier_done, node()},
- {error,Result};
- {'atomic', {error,Result}} ->
- SenderPid ! {copier_done, node()},
- fatal("Cannot create table ~p: ~p~n",
- [[Tab, Storage], Result]);
- {'atomic', Result} -> Result;
- {aborted, nomore} ->
- SenderPid ! {copier_done, node()},
- restart;
- {aborted, _ } ->
- SenderPid ! {copier_done, node()},
- down %% either this node or sender is dying
- end,
- unlink(whereis(mnesia_tm)), %% Avoid late unlink from tm
- Res.
-
-spawned_receiver(ReplyTo,Tab,Storage,Cs,
- SenderPid,TabSize,DetsData, Init) ->
- process_flag(trap_exit, true),
- Done = do_init_table(Tab,Storage,Cs,
- SenderPid,TabSize,DetsData,
- ReplyTo, Init),
- ReplyTo ! {self(),Done},
- unlink(ReplyTo),
- unlink(whereis(mnesia_controller)),
- exit(normal).
-
-wait_on_load_complete(Pid) ->
- receive
- {Pid, Res} ->
- Res;
- {'EXIT', Pid, Reason} ->
- exit(Reason);
- Else ->
- Pid ! Else,
- wait_on_load_complete(Pid)
- end.
-
-tab_receiver(Node, Tab, Storage, Cs, PConv, OrigTabRec) ->
- receive
- {SenderPid, {no_more, DatBin}} when PConv == false ->
- finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec);
-
- %% Protocol conversion hack
- {SenderPid, {no_more, DatBin}} when pid(PConv) ->
- PConv ! {SenderPid, no_more},
- receive
- {old_init_table_complete, ok} ->
- finish_copy(Storage, Tab, Cs, SenderPid, DatBin,OrigTabRec);
- {old_init_table_complete, Reason} ->
- Msg = "OLD: [d]ets:init table failed",
- dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
- down(Tab, Storage)
- end;
-
- {actual_tabrec, Pid} ->
- tab_receiver(Node, Tab, Storage, Cs, Pid,OrigTabRec);
-
- {SenderPid, {more, [Recs]}} when pid(PConv) ->
- PConv ! {SenderPid, {more, Recs}}, %% Forward Msg to OldNodes
- tab_receiver(Node, Tab, Storage, Cs, PConv,OrigTabRec);
-
- {'EXIT', PConv, Reason} -> %% [d]ets:init process crashed
- Msg = "Receiver crashed",
- dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
- down(Tab, Storage);
-
- %% Protocol conversion hack
- {copier_done, Node} ->
- dbg_out("Sender of table ~p crashed on node ~p ~n", [Tab, Node]),
- down(Tab, Storage);
-
- {'EXIT', Pid, Reason} ->
- handle_exit(Pid, Reason),
- tab_receiver(Node, Tab, Storage, Cs, PConv,OrigTabRec)
- end.
-
-create_table(Tab, TabSize, Storage, Cs) ->
- if
- Storage == disc_only_copies ->
- mnesia_lib:lock_table(Tab),
- Tmp = mnesia_lib:tab2tmp(Tab),
- Size = lists:max([TabSize, 256]),
- Args = [{file, Tmp},
- {keypos, 2},
-%% {ram_file, true},
- {estimated_no_objects, Size},
- {repair, mnesia_monitor:get_env(auto_repair)},
- {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
- file:delete(Tmp),
- case mnesia_lib:dets_sync_open(Tab, Args) of
- {ok, _} ->
- mnesia_lib:unlock_table(Tab),
- {Storage, Tab};
- Else ->
- mnesia_lib:unlock_table(Tab),
- Else
- end;
- (Storage == ram_copies) or (Storage == disc_copies) ->
- Args = [{keypos, 2}, public, named_table, Cs#cstruct.type],
- case mnesia_monitor:unsafe_mktab(Tab, Args) of
- Tab ->
- {Storage, Tab};
- Else ->
- Else
- end
- end.
-
-do_init_table(Tab,Storage,Cs,SenderPid,
- TabSize,DetsInfo,OrigTabRec,Init) ->
- case create_table(Tab, TabSize, Storage, Cs) of
- {Storage,Tab} ->
- %% Debug info
- Node = node(SenderPid),
- put(mnesia_table_receiver, {Tab, Node, SenderPid}),
- mnesia_tm:block_tab(Tab),
- PConv = mnesia_monitor:needs_protocol_conversion(Node),
-
- case init_table(Tab,Storage,Init,PConv,DetsInfo,SenderPid) of
- ok ->
- tab_receiver(Node,Tab,Storage,Cs,PConv,OrigTabRec);
- Reason ->
- Msg = "[d]ets:init table failed",
- dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
- down(Tab, Storage)
- end;
- Error ->
- Error
- end.
-
-make_table_fun(Pid, TabRec) ->
- fun(close) ->
- ok;
- (read) ->
- get_data(Pid, TabRec)
- end.
-
-get_data(Pid, TabRec) ->
- receive
- {Pid, {more, Recs}} ->
- Pid ! {TabRec, more},
- {Recs, make_table_fun(Pid,TabRec)};
- {Pid, no_more} ->
- end_of_input;
- {copier_done, Node} ->
- case node(Pid) of
- Node ->
- {copier_done, Node};
- _ ->
- get_data(Pid, TabRec)
- end;
- {'EXIT', Pid, Reason} ->
- handle_exit(Pid, Reason),
- get_data(Pid, TabRec)
- end.
-
-init_table(Tab, disc_only_copies, Fun, false, DetsInfo,Sender) ->
- ErtsVer = erlang:system_info(version),
- case DetsInfo of
- {ErtsVer, DetsData} ->
- Res = (catch dets:is_compatible_bchunk_format(Tab, DetsData)),
- case Res of
- {'EXIT',{undef,[{dets,_,_}|_]}} ->
- Sender ! {self(), {old_protocol, Tab}},
- dets:init_table(Tab, Fun); %% Old dets version
- {'EXIT', What} ->
- exit(What);
- false ->
- Sender ! {self(), {old_protocol, Tab}},
- dets:init_table(Tab, Fun); %% Old dets version
- true ->
- dets:init_table(Tab, Fun, [{format, bchunk}])
- end;
- Old when Old /= false ->
- Sender ! {self(), {old_protocol, Tab}},
- dets:init_table(Tab, Fun); %% Old dets version
- _ ->
- dets:init_table(Tab, Fun)
- end;
-init_table(Tab, _, Fun, false, _DetsInfo,_) ->
- case catch ets:init_table(Tab, Fun) of
- true ->
- ok;
- {'EXIT', Else} -> Else
- end;
-init_table(Tab, Storage, Fun, true, _DetsInfo, Sender) -> %% Old Nodes
- spawn_link(?MODULE, old_node_init_table,
- [Tab, Storage, Fun, self(), false, Sender]),
- ok.
-
-old_node_init_table(Tab, Storage, Fun, TabReceiver, DetsInfo,Sender) ->
- Res = init_table(Tab, Storage, Fun, false, DetsInfo,Sender),
- TabReceiver ! {old_init_table_complete, Res},
- unlink(TabReceiver),
- ok.
-
-finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) ->
- TabRef = {Storage, Tab},
- subscr_receiver(TabRef, Cs#cstruct.record_name),
- case handle_last(TabRef, Cs#cstruct.type, DatBin) of
- ok ->
- mnesia_index:init_index(Tab, Storage),
- snmpify(Tab, Storage),
- %% OrigTabRec must not be the spawned tab-receiver
- %% due to old protocol.
- SenderPid ! {OrigTabRec, no_more},
- mnesia_tm:unblock_tab(Tab),
- ok;
- {error, Reason} ->
- Msg = "Failed to handle last",
- dbg_out("~s: ~p: ~p~n", [Msg, Tab, Reason]),
- down(Tab, Storage)
- end.
-
-subscr_receiver(TabRef = {_, Tab}, RecName) ->
- receive
- {mnesia_table_event, {Op, Val, _Tid}} ->
- if
- Tab == RecName ->
- handle_event(TabRef, Op, Val);
- true ->
- handle_event(TabRef, Op, setelement(1, Val, RecName))
- end,
- subscr_receiver(TabRef, RecName);
-
- {'EXIT', Pid, Reason} ->
- handle_exit(Pid, Reason),
- subscr_receiver(TabRef, RecName)
- after 0 ->
- ok
- end.
-
-handle_event(TabRef, write, Rec) ->
- db_put(TabRef, Rec);
-handle_event(TabRef, delete, {_Tab, Key}) ->
- db_erase(TabRef, Key);
-handle_event(TabRef, delete_object, OldRec) ->
- db_match_erase(TabRef, OldRec);
-handle_event(TabRef, clear_table, {_Tab, _Key}) ->
- db_match_erase(TabRef, '_').
-
-handle_last({disc_copies, Tab}, _Type, nobin) ->
- Ret = mnesia_log:ets2dcd(Tab),
- Fname = mnesia_lib:tab2dat(Tab),
- case mnesia_lib:exists(Fname) of
- true -> %% Remove old .DAT files.
- file:delete(Fname);
- false ->
- ok
- end,
- Ret;
-
-handle_last({disc_only_copies, Tab}, Type, nobin) ->
- case mnesia_lib:swap_tmp_files([Tab]) of
- [] ->
- Args = [{file, mnesia_lib:tab2dat(Tab)},
- {type, mnesia_lib:disk_type(Tab, Type)},
- {keypos, 2},
- {repair, mnesia_monitor:get_env(auto_repair)}],
- mnesia_monitor:open_dets(Tab, Args),
- ok;
- L when list(L) ->
- {error, {"Cannot swap tmp files", Tab, L}}
- end;
-
-handle_last({ram_copies, _Tab}, _Type, nobin) ->
- ok;
-handle_last({ram_copies, Tab}, _Type, DatBin) ->
- case mnesia_monitor:use_dir() of
- true ->
- mnesia_lib:lock_table(Tab),
- Tmp = mnesia_lib:tab2tmp(Tab),
- ok = file:write_file(Tmp, DatBin),
- ok = file:rename(Tmp, mnesia_lib:tab2dcd(Tab)),
- mnesia_lib:unlock_table(Tab),
- ok;
- false ->
- ok
- end.
-
-down(Tab, Storage) ->
- case Storage of
- ram_copies ->
- catch ?ets_delete_table(Tab);
- disc_copies ->
- catch ?ets_delete_table(Tab);
- disc_only_copies ->
- mnesia_lib:cleanup_tmp_files([Tab])
- end,
- mnesia_checkpoint:tm_del_copy(Tab, node()),
- mnesia_controller:sync_del_table_copy_whereabouts(Tab, node()),
- mnesia_tm:unblock_tab(Tab),
- flush_subcrs(),
- down.
-
-flush_subcrs() ->
- receive
- {mnesia_table_event, _} ->
- flush_subcrs();
-
- {'EXIT', Pid, Reason} ->
- handle_exit(Pid, Reason),
- flush_subcrs()
- after 0 ->
- done
- end.
-
-db_erase({ram_copies, Tab}, Key) ->
- true = ?ets_delete(Tab, Key);
-db_erase({disc_copies, Tab}, Key) ->
- true = ?ets_delete(Tab, Key);
-db_erase({disc_only_copies, Tab}, Key) ->
- ok = dets:delete(Tab, Key).
-
-db_match_erase({ram_copies, Tab} , Pat) ->
- true = ?ets_match_delete(Tab, Pat);
-db_match_erase({disc_copies, Tab} , Pat) ->
- true = ?ets_match_delete(Tab, Pat);
-db_match_erase({disc_only_copies, Tab}, Pat) ->
- ok = dets:match_delete(Tab, Pat).
-
-db_put({ram_copies, Tab}, Val) ->
- true = ?ets_insert(Tab, Val);
-db_put({disc_copies, Tab}, Val) ->
- true = ?ets_insert(Tab, Val);
-db_put({disc_only_copies, Tab}, Val) ->
- ok = dets:insert(Tab, Val).
-
-%% This code executes at the remote site where the data is
-%% executes in a special copier process.
-
-calc_nokeys(Storage, Tab) ->
- %% Calculate #keys per transfer
- Key = mnesia_lib:db_first(Storage, Tab),
- Recs = mnesia_lib:db_get(Storage, Tab, Key),
- BinSize = size(term_to_binary(Recs)),
- (?MAX_TRANSFER_SIZE div BinSize) + 1.
-
-send_table(Pid, Tab, RemoteS) ->
- case ?catch_val({Tab, storage_type}) of
- {'EXIT', _} ->
- {error, {no_exists, Tab}};
- unknown ->
- {error, {no_exists, Tab}};
- Storage ->
- %% Send first
- TabSize = mnesia:table_info(Tab, size),
- Pconvert = mnesia_monitor:needs_protocol_conversion(node(Pid)),
- KeysPerTransfer = calc_nokeys(Storage, Tab),
- ChunkData = dets:info(Tab, bchunk_format),
-
- UseDetsChunk =
- Storage == RemoteS andalso
- Storage == disc_only_copies andalso
- ChunkData /= undefined andalso
- Pconvert == false,
- if
- UseDetsChunk == true ->
- DetsInfo = erlang:system_info(version),
- Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}};
- true ->
- Pid ! {self(), {first, TabSize}}
- end,
-
- %% Debug info
- put(mnesia_table_sender, {Tab, node(Pid), Pid}),
- {Init, Chunk} = reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer),
-
- SendIt = fun() ->
- prepare_copy(Pid, Tab, Storage),
- send_more(Pid, 1, Chunk, Init(), Tab, Pconvert),
- finish_copy(Pid, Tab, Storage, RemoteS)
- end,
-
- case catch SendIt() of
- receiver_died ->
- cleanup_tab_copier(Pid, Storage, Tab),
- unlink(whereis(mnesia_tm)),
- ok;
- {_, receiver_died} ->
- unlink(whereis(mnesia_tm)),
- ok;
- {'atomic', no_more} ->
- unlink(whereis(mnesia_tm)),
- ok;
- Reason ->
- cleanup_tab_copier(Pid, Storage, Tab),
- unlink(whereis(mnesia_tm)),
- {error, Reason}
- end
- end.
-
-prepare_copy(Pid, Tab, Storage) ->
- Trans =
- fun() ->
- mnesia:write_lock_table(Tab),
- mnesia_subscr:subscribe(Pid, {table, Tab}),
- update_where_to_write(Tab, node(Pid)),
- mnesia_lib:db_fixtable(Storage, Tab, true),
- ok
- end,
- case mnesia:transaction(Trans) of
- {'atomic', ok} ->
- ok;
- {aborted, Reason} ->
- exit({tab_copier_prepare, Tab, Reason})
- end.
-
-update_where_to_write(Tab, Node) ->
- case val({Tab, access_mode}) of
- read_only ->
- ignore;
- read_write ->
- Current = val({current, db_nodes}),
- Ns =
- case lists:member(Node, Current) of
- true -> Current;
- false -> [Node | Current]
- end,
- update_where_to_write(Ns, Tab, Node)
- end.
-
-update_where_to_write([], _, _) ->
- ok;
-update_where_to_write([H|T], Tab, AddNode) ->
- rpc:call(H, mnesia_controller, call,
- [{update_where_to_write, [add, Tab, AddNode], self()}]),
- update_where_to_write(T, Tab, AddNode).
-
-send_more(Pid, N, Chunk, DataState, Tab, OldNode) ->
- receive
- {NewPid, more} ->
- case send_packet(N - 1, NewPid, Chunk, DataState, OldNode) of
- New when integer(New) ->
- New - 1;
- NewData ->
- send_more(NewPid, ?MAX_NOPACKETS, Chunk, NewData, Tab, OldNode)
- end;
- {_NewPid, {old_protocol, Tab}} ->
- Storage = val({Tab, storage_type}),
- {Init, NewChunk} =
- reader_funcs(false, Tab, Storage, calc_nokeys(Storage, Tab)),
- send_more(Pid, 1, NewChunk, Init(), Tab, OldNode);
-
- {copier_done, Node} when Node == node(Pid)->
- verbose("Receiver of table ~p crashed on ~p (more)~n", [Tab, Node]),
- throw(receiver_died)
- end.
-
-reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer) ->
- case UseDetsChunk of
- false ->
- {fun() -> mnesia_lib:db_init_chunk(Storage, Tab, KeysPerTransfer) end,
- fun(Cont) -> mnesia_lib:db_chunk(Storage, Cont) end};
- true ->
- {fun() -> dets_bchunk(Tab, start) end,
- fun(Cont) -> dets_bchunk(Tab, Cont) end}
- end.
-
-dets_bchunk(Tab, Chunk) -> %% Arrg
- case dets:bchunk(Tab, Chunk) of
- {Cont, Data} -> {Data, Cont};
- Else -> Else
- end.
-
-send_packet(N, Pid, _Chunk, '$end_of_table', OldNode) ->
- case OldNode of
- true -> ignore; %% Old nodes can't handle the new no_more
- false -> Pid ! {self(), no_more}
- end,
- N;
-send_packet(N, Pid, Chunk, {[], Cont}, OldNode) ->
- send_packet(N, Pid, Chunk, Chunk(Cont), OldNode);
-send_packet(N, Pid, Chunk, {Recs, Cont}, OldNode) when N < ?MAX_NOPACKETS ->
- case OldNode of
- true -> Pid ! {self(), {more, [Recs]}}; %% Old need's wrapping list
- false -> Pid ! {self(), {more, Recs}}
- end,
- send_packet(N+1, Pid, Chunk, Chunk(Cont), OldNode);
-send_packet(_N, _Pid, _Chunk, DataState, _OldNode) ->
- DataState.
-
-finish_copy(Pid, Tab, Storage, RemoteS) ->
- RecNode = node(Pid),
- DatBin = dat2bin(Tab, Storage, RemoteS),
- Trans =
- fun() ->
- mnesia:read_lock_table(Tab),
- A = val({Tab, access_mode}),
- mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A),
- cleanup_tab_copier(Pid, Storage, Tab),
- mnesia_checkpoint:tm_add_copy(Tab, RecNode),
- Pid ! {self(), {no_more, DatBin}},
- receive
- {Pid, no_more} -> % Dont bother about the spurious 'more' message
- no_more;
- {copier_done, Node} when Node == node(Pid)->
- verbose("Tab receiver ~p crashed (more): ~p~n", [Tab, Node]),
- receiver_died
- end
- end,
- mnesia:transaction(Trans).
-
-cleanup_tab_copier(Pid, Storage, Tab) ->
- mnesia_lib:db_fixtable(Storage, Tab, false),
- mnesia_subscr:unsubscribe(Pid, {table, Tab}).
-
-dat2bin(Tab, ram_copies, ram_copies) ->
- mnesia_lib:lock_table(Tab),
- Res = file:read_file(mnesia_lib:tab2dcd(Tab)),
- mnesia_lib:unlock_table(Tab),
- case Res of
- {ok, DatBin} -> DatBin;
- _ -> nobin
- end;
-dat2bin(_Tab, _LocalS, _RemoteS) ->
- nobin.
-
-handle_exit(Pid, Reason) when node(Pid) == node() ->
- exit(Reason);
-handle_exit(_Pid, _Reason) -> %% Not from our node, this will be handled by
- ignore. %% mnesia_down soon.