diff options
-rw-r--r-- | lib/mnesia/src/mnesia_loader.erl | 236 |
1 files changed, 172 insertions, 64 deletions
diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl index 41fcd76fcb..93660007f1 100644 --- a/lib/mnesia/src/mnesia_loader.erl +++ b/lib/mnesia/src/mnesia_loader.erl @@ -147,7 +147,14 @@ do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_only_copies - {error, Error} -> {not_loaded, {"Failed to create dets table", Error}} end - end. + end; + +do_get_disc_copy2(Tab, Reason, Storage = {ext, Alias, Mod}, _Type) -> + ok = ext_load_table(Mod, Alias, Tab, Reason), + mnesia_index:init_index(Tab, Storage), + set({Tab, load_node}, node()), + set({Tab, load_reason}, Reason), + {loaded, ok}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Load a table from a remote node @@ -259,7 +266,7 @@ init_receiver(Node, Tab,Storage,Cs,Reason) -> true = lists:member(Node, Active), {SenderPid, TabSize, DetsData} = start_remote_sender(Node,Tab,Storage), - Init = table_init_fun(SenderPid), + Init = table_init_fun(SenderPid, Storage), Args = [self(),Tab,Storage,Cs,SenderPid, TabSize,DetsData,Init], Pid = spawn_link(?MODULE, spawned_receiver, Args), @@ -289,7 +296,12 @@ start_remote_sender(Node,Tab,Storage) -> mnesia_controller:start_remote_sender(Node, Tab, self(), Storage), put(mnesia_table_sender_node, {Tab, Node}), receive - {SenderPid, {first, TabSize}} -> + {SenderPid, {first, _} = Msg} + when is_pid(SenderPid), element(1, Storage) == ext -> + {ext, Alias, Mod} = Storage, + {Sz, Data} = Mod:receiver_first_message(SenderPid, Msg, Alias, Tab), + {SenderPid, Sz, Data}; + {SenderPid, {first, TabSize}} =_M1 -> {SenderPid, TabSize, false}; {SenderPid, {first, TabSize, DetsData}} -> {SenderPid, TabSize, DetsData}; @@ -299,16 +311,18 @@ start_remote_sender(Node,Tab,Storage) -> down(Tab, Storage) end. -table_init_fun(SenderPid) -> +table_init_fun(SenderPid, Storage) -> fun(read) -> Receiver = self(), SenderPid ! {Receiver, more}, - get_data(SenderPid, Receiver) + get_data(SenderPid, Receiver, Storage); + (close) -> + ok end. %% Add_table_copy get's it's own locks. start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,{add_table_copy,_}}) -> - Init = table_init_fun(SenderPid), + Init = table_init_fun(SenderPid, Storage), case do_init_table(Tab,Storage,Cs,SenderPid,TabSize,DetsData,self(), Init) of Err = {error, _} -> SenderPid ! {copier_done, node()}, @@ -391,7 +405,15 @@ create_table(Tab, TabSize, Storage, Cs) -> {Storage, Tab}; Else -> Else - end + end; + element(1, Storage) == ext -> + {_, Alias, Mod} = Storage, + case mnesia_monitor:unsafe_create_external(Tab, Alias, Mod, Cs) of + ok -> + {Storage, Tab}; + Else -> + Else + end end. tab_receiver(Node, Tab, Storage, Cs, OrigTabRec) -> @@ -409,21 +431,25 @@ tab_receiver(Node, Tab, Storage, Cs, OrigTabRec) -> tab_receiver(Node, Tab, Storage, Cs, OrigTabRec) end. -make_table_fun(Pid, TabRec) -> +make_table_fun(Pid, TabRec, Storage) -> fun(close) -> ok; + ({read, Msg}) -> + Pid ! {TabRec, Msg}, + get_data(Pid, TabRec, Storage); (read) -> - get_data(Pid, TabRec) + get_data(Pid, TabRec, Storage) end. -get_data(Pid, TabRec) -> +get_data(Pid, TabRec, Storage) -> receive {Pid, {more_z, CompressedRecs}} when is_binary(CompressedRecs) -> - Pid ! {TabRec, more}, - {zlib_uncompress(CompressedRecs), make_table_fun(Pid,TabRec)}; + maybe_reply(Pid, {TabRec, more}, Storage), + {zlib_uncompress(CompressedRecs), + make_table_fun(Pid, TabRec, Storage)}; {Pid, {more, Recs}} -> - Pid ! {TabRec, more}, - {Recs, make_table_fun(Pid,TabRec)}; + maybe_reply(Pid, {TabRec, more}, Storage), + {Recs, make_table_fun(Pid, TabRec, Storage)}; {Pid, no_more} -> end_of_input; {copier_done, Node} -> @@ -431,13 +457,48 @@ get_data(Pid, TabRec) -> Node -> {copier_done, Node}; _ -> - get_data(Pid, TabRec) + get_data(Pid, TabRec, Storage) end; {'EXIT', Pid, Reason} -> handle_exit(Pid, Reason), - get_data(Pid, TabRec) + get_data(Pid, TabRec, Storage) + end. + +maybe_reply(_, _, {ext, _, _}) -> + ignore; +maybe_reply(Pid, Msg, _) -> + Pid ! Msg. + +ext_init_table(Alias, Mod, Tab, Fun, State, Sender) -> + ok = ext_load_table(Mod, Alias, Tab, {net_load, node(Sender)}), + ext_init_table(read, Alias, Mod, Tab, Fun, State, Sender). + +ext_load_table(Mod, Alias, Tab, Reason) -> + CS = val({Tab, cstruct}), + Mod:load_table(Alias, Tab, Reason, mnesia_schema:cs2list(CS)). + + +ext_init_table(Action, Alias, Mod, Tab, Fun, State, Sender) -> + case Fun(Action) of + {copier_done, Node} -> + verbose("Receiver of table ~p crashed on ~p (more)~n", [Tab, Node]), + down(Tab, {ext,Alias,Mod}); + {Data, NewFun} -> + case Mod:receive_data(Data, Alias, Tab, Sender, State) of + {more, NewState} -> + ext_init_table({read, more}, Alias, Mod, + Tab, NewFun, NewState, Sender); + {{more,Msg}, NewState} -> + ext_init_table({read, Msg}, Alias, Mod, + Tab, NewFun, NewState, Sender) + end; + end_of_input -> + Mod:receive_done(Alias, Tab, Sender, State), + ok = Fun(close) end. +init_table(Tab, {ext,Alias,Mod}, Fun, State, Sender) -> + ext_init_table(Alias, Mod, Tab, Fun, State, Sender); init_table(Tab, disc_only_copies, Fun, DetsInfo,Sender) -> ErtsVer = erlang:system_info(version), case DetsInfo of @@ -567,7 +628,10 @@ handle_last({ram_copies, Tab}, _Type, DatBin) -> ok; false -> ok - end. + end; + +handle_last(_Storage, _Type, nobin) -> + ok. down(Tab, Storage) -> case Storage of @@ -578,7 +642,10 @@ down(Tab, Storage) -> disc_only_copies -> TmpFile = mnesia_lib:tab2tmp(Tab), mnesia_lib:dets_sync_close(Tab), - file:delete(TmpFile) + file:delete(TmpFile); + {ext, Alias, Mod} -> + catch Mod:close_table(Alias, Tab), + catch Mod:delete_table(Alias, Tab) end, mnesia_checkpoint:tm_del_copy(Tab, node()), mnesia_controller:sync_del_table_copy_whereabouts(Tab, node()), @@ -603,21 +670,35 @@ db_erase({ram_copies, Tab}, Key) -> db_erase({disc_copies, Tab}, Key) -> true = ?ets_delete(Tab, Key); db_erase({disc_only_copies, Tab}, Key) -> - ok = dets:delete(Tab, Key). + ok = dets:delete(Tab, Key); +db_erase({{ext, Alias, Mod}, Tab}, Key) -> + ok = Mod:delete(Alias, Tab, Key). db_match_erase({ram_copies, Tab} , Pat) -> true = ?ets_match_delete(Tab, Pat); db_match_erase({disc_copies, Tab} , Pat) -> true = ?ets_match_delete(Tab, Pat); db_match_erase({disc_only_copies, Tab}, Pat) -> - ok = dets:match_delete(Tab, Pat). + ok = dets:match_delete(Tab, Pat); +db_match_erase({{ext, Alias, Mod}, Tab}, Pat) -> + % "ets style" is to return true + % "dets style" is to return N | { error, Reason } + % or sometimes ok (?) + % be nice and accept both + case Mod:match_delete(Alias, Tab, Pat) of + N when is_integer (N) -> ok; + true -> ok; + ok -> ok + end. db_put({ram_copies, Tab}, Val) -> true = ?ets_insert(Tab, Val); db_put({disc_copies, Tab}, Val) -> true = ?ets_insert(Tab, Val); db_put({disc_only_copies, Tab}, Val) -> - ok = dets:insert(Tab, Val). + ok = dets:insert(Tab, Val); +db_put({{ext, Alias, Mod}, Tab}, Val) -> + ok = Mod:insert(Alias, Tab, Val). %% This code executes at the remote site where the data is %% executes in a special copier process. @@ -636,47 +717,62 @@ send_table(Pid, Tab, RemoteS) -> unknown -> {error, {no_exists, Tab}}; Storage -> - %% Send first - TabSize = mnesia:table_info(Tab, size), - KeysPerTransfer = calc_nokeys(Storage, Tab), - ChunkData = dets:info(Tab, bchunk_format), - - UseDetsChunk = - Storage == RemoteS andalso - Storage == disc_only_copies andalso - ChunkData /= undefined, - if - UseDetsChunk == true -> - DetsInfo = erlang:system_info(version), - Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}}; - true -> - Pid ! {self(), {first, TabSize}} - end, + do_send_table(Pid, Tab, Storage, RemoteS) + end. - %% Debug info - put(mnesia_table_sender, {Tab, node(Pid), Pid}), - {Init, Chunk} = reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer), - - SendIt = fun() -> - NeedLock = need_lock(Tab), - {atomic, ok} = prepare_copy(Pid, Tab, Storage, NeedLock), - send_more(Pid, 1, Chunk, Init(), Tab), - finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) - end, - - try SendIt() of - {_, receiver_died} -> ok; - {atomic, no_more} -> ok - catch - throw:receiver_died -> - cleanup_tab_copier(Pid, Storage, Tab), - ok; - error:Reason -> %% Prepare failed - cleanup_tab_copier(Pid, Storage, Tab), - {error, {tab_copier, Tab, {Reason, erlang:get_stacktrace()}}} - after - unlink(whereis(mnesia_tm)) - end +do_send_table(Pid, Tab, Storage, RemoteS) -> + {Init, Chunk} = + case Storage of + {ext, Alias, Mod} -> + case Mod:sender_init(Alias, Tab, RemoteS, Pid) of + {standard, I, C} -> + Pid ! {self(), {first, Mod:info(Alias, Tab, size)}}, + {I, C}; + {_, _} = Res -> + Res + end; + Storage -> + %% Send first + TabSize = mnesia:table_info(Tab, size), + KeysPerTransfer = calc_nokeys(Storage, Tab), + ChunkData = dets:info(Tab, bchunk_format), + + UseDetsChunk = + Storage == RemoteS andalso + Storage == disc_only_copies andalso + ChunkData /= undefined, + if + UseDetsChunk == true -> + DetsInfo = erlang:system_info(version), + Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}}; + true -> + Pid ! {self(), {first, TabSize}} + end, + {_I, _C} = + reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer) + end, + %% Debug info + put(mnesia_table_sender, {Tab, node(Pid), Pid}), + + SendIt = fun() -> + NeedLock = need_lock(Tab), + {atomic, ok} = prepare_copy(Pid, Tab, Storage, NeedLock), + send_more(Pid, 1, Chunk, Init(), Tab, Storage), + finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) + end, + + try SendIt() of + {_, receiver_died} -> ok; + {atomic, no_more} -> ok + catch + throw:receiver_died -> + cleanup_tab_copier(Pid, Storage, Tab), + ok; + error:Reason -> %% Prepare failed + cleanup_tab_copier(Pid, Storage, Tab), + {error, {tab_copier, Tab, {Reason, erlang:get_stacktrace()}}} + after + unlink(whereis(mnesia_tm)) end. prepare_copy(Pid, Tab, Storage, NeedLock) -> @@ -726,20 +822,32 @@ update_where_to_write([H|T], Tab, AddNode) -> [{update_where_to_write, [add, Tab, AddNode], self()}]), update_where_to_write(T, Tab, AddNode). -send_more(Pid, N, Chunk, DataState, Tab) -> +send_more(Pid, N, Chunk, DataState, Tab, Storage) -> receive {NewPid, more} -> case send_packet(N - 1, NewPid, Chunk, DataState) of New when is_integer(New) -> New - 1; NewData -> - send_more(NewPid, ?MAX_NOPACKETS, Chunk, NewData, Tab) + send_more(NewPid, ?MAX_NOPACKETS, Chunk, NewData, + Tab, Storage) + end; + {NewPid, {more, Msg}} when element(1, Storage) == ext -> + {ext, Alias, Mod} = Storage, + {NewChunk, NewState} = + Mod:sender_handle_info(Msg, Alias, Tab, NewPid, DataState), + case send_packet(N - 1, NewPid, NewChunk, NewState) of + New when is_integer(New) -> + New -1; + NewData -> + send_more(NewPid, N, NewChunk, NewData, Tab, + Storage) end; {_NewPid, {old_protocol, Tab}} -> Storage = val({Tab, storage_type}), {Init, NewChunk} = reader_funcs(false, Tab, Storage, calc_nokeys(Storage, Tab)), - send_more(Pid, 1, NewChunk, Init(), Tab); + send_more(Pid, 1, NewChunk, Init(), Tab, Storage); {copier_done, Node} when Node == node(Pid)-> verbose("Receiver of table ~p crashed on ~p (more)~n", [Tab, Node]), |