aboutsummaryrefslogtreecommitdiffstats
path: root/lib/mnesia/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mnesia/src')
-rw-r--r--lib/mnesia/src/Makefile2
-rw-r--r--lib/mnesia/src/mnesia.app.src2
-rw-r--r--lib/mnesia/src/mnesia.erl144
-rw-r--r--lib/mnesia/src/mnesia.hrl3
-rw-r--r--lib/mnesia/src/mnesia_backend_type.erl115
-rw-r--r--lib/mnesia/src/mnesia_bup.erl270
-rw-r--r--lib/mnesia/src/mnesia_checkpoint.erl23
-rw-r--r--lib/mnesia/src/mnesia_controller.erl14
-rw-r--r--lib/mnesia/src/mnesia_dumper.erl427
-rw-r--r--lib/mnesia/src/mnesia_ext_sup.erl59
-rw-r--r--lib/mnesia/src/mnesia_frag.erl67
-rw-r--r--lib/mnesia/src/mnesia_index.erl435
-rw-r--r--lib/mnesia/src/mnesia_lib.erl168
-rw-r--r--lib/mnesia/src/mnesia_loader.erl240
-rw-r--r--lib/mnesia/src/mnesia_log.erl25
-rw-r--r--lib/mnesia/src/mnesia_monitor.erl36
-rw-r--r--lib/mnesia/src/mnesia_schema.erl994
-rw-r--r--lib/mnesia/src/mnesia_sup.erl8
-rw-r--r--lib/mnesia/src/mnesia_tm.erl226
19 files changed, 2508 insertions, 750 deletions
diff --git a/lib/mnesia/src/Makefile b/lib/mnesia/src/Makefile
index ae24bc72de..08a00e6aba 100644
--- a/lib/mnesia/src/Makefile
+++ b/lib/mnesia/src/Makefile
@@ -43,6 +43,7 @@ RELSYSDIR = $(RELEASE_PATH)/lib/mnesia-$(VSN)
# ----------------------------------------------------
MODULES= \
mnesia \
+ mnesia_backend_type \
mnesia_backup \
mnesia_bup \
mnesia_checkpoint \
@@ -50,6 +51,7 @@ MODULES= \
mnesia_controller \
mnesia_dumper\
mnesia_event \
+ mnesia_ext_sup \
mnesia_frag \
mnesia_frag_hash \
mnesia_frag_old_hash \
diff --git a/lib/mnesia/src/mnesia.app.src b/lib/mnesia/src/mnesia.app.src
index c78a7cba1e..006ad4bac1 100644
--- a/lib/mnesia/src/mnesia.app.src
+++ b/lib/mnesia/src/mnesia.app.src
@@ -3,6 +3,7 @@
{vsn, "%VSN%"},
{modules, [
mnesia,
+ mnesia_backend_type,
mnesia_backup,
mnesia_bup,
mnesia_checkpoint,
@@ -10,6 +11,7 @@
mnesia_controller,
mnesia_dumper,
mnesia_event,
+ mnesia_ext_sup,
mnesia_frag,
mnesia_frag_hash,
mnesia_frag_old_hash,
diff --git a/lib/mnesia/src/mnesia.erl b/lib/mnesia/src/mnesia.erl
index e3d9bb1484..9586adbf93 100644
--- a/lib/mnesia/src/mnesia.erl
+++ b/lib/mnesia/src/mnesia.erl
@@ -82,7 +82,8 @@
system_info/0, % Not for public use
%% Database mgt
- create_schema/1, delete_schema/1,
+ create_schema/1, create_schema/2, delete_schema/1,
+ add_backend_type/2,
backup/1, backup/2, traverse_backup/4, traverse_backup/6,
install_fallback/1, install_fallback/2,
uninstall_fallback/0, uninstall_fallback/1,
@@ -197,6 +198,9 @@ e_has_var(X, Pos) ->
%% Start and stop
start() ->
+ start([]).
+
+start_() ->
{Time , Res} = timer:tc(application, start, [?APPLICATION, temporary]),
Secs = Time div 1000000,
@@ -232,7 +236,7 @@ patched_start([{Env, Val} | Tail]) when is_atom(Env) ->
patched_start([Head | _]) ->
{error, {bad_type, Head}};
patched_start([]) ->
- start().
+ start_().
stop() ->
case application:stop(?APPLICATION) of
@@ -297,6 +301,7 @@ ms() ->
%% Keep these last in the list, so
%% mnesia_sup kills these last
+ mnesia_ext_sup,
mnesia_monitor,
mnesia_event
].
@@ -556,22 +561,16 @@ write(_Tid, _Ts, Tab, Val, LockKind) ->
abort({bad_type, Tab, Val, LockKind}).
write_to_store(Tab, Store, Oid, Val) ->
- case ?catch_val({Tab, record_validation}) of
- {RecName, Arity, Type}
- when tuple_size(Val) == Arity, RecName == element(1, Val) ->
- case Type of
- bag ->
- ?ets_insert(Store, {Oid, Val, write});
- _ ->
- ?ets_delete(Store, Oid),
- ?ets_insert(Store, {Oid, Val, write})
- end,
- ok;
- {'EXIT', _} ->
- abort({no_exists, Tab});
- _ ->
- abort({bad_type, Val})
- end.
+ {_, _, Type} = mnesia_lib:validate_record(Tab, Val),
+ Oid = {Tab, element(2, Val)},
+ case Type of
+ bag ->
+ ?ets_insert(Store, {Oid, Val, write});
+ _ ->
+ ?ets_delete(Store, Oid),
+ ?ets_insert(Store, {Oid, Val, write})
+ end,
+ ok.
delete({Tab, Key}) ->
delete(Tab, Key, write);
@@ -1548,16 +1547,9 @@ dirty_write(Tab, Val) ->
do_dirty_write(SyncMode, Tab, Val)
when is_atom(Tab), Tab /= schema, is_tuple(Val), tuple_size(Val) > 2 ->
- case ?catch_val({Tab, record_validation}) of
- {RecName, Arity, _Type}
- when tuple_size(Val) == Arity, RecName == element(1, Val) ->
- Oid = {Tab, element(2, Val)},
- mnesia_tm:dirty(SyncMode, {Oid, Val, write});
- {'EXIT', _} ->
- abort({no_exists, Tab});
- _ ->
- abort({bad_type, Val})
- end;
+ {_, _, _} = mnesia_lib:validate_record(Tab, Val),
+ Oid = {Tab, element(2, Val)},
+ mnesia_tm:dirty(SyncMode, {Oid, Val, write});
do_dirty_write(_SyncMode, Tab, Val) ->
abort({bad_type, Tab, Val}).
@@ -1609,8 +1601,8 @@ dirty_update_counter(Tab, Key, Incr) ->
do_dirty_update_counter(SyncMode, Tab, Key, Incr)
when is_atom(Tab), Tab /= schema, is_integer(Incr) ->
- case ?catch_val({Tab, record_validation}) of
- {RecName, 3, set} ->
+ case mnesia_lib:validate_key(Tab, Key) of
+ {RecName, 3, Type} when Type == set; Type == ordered_set ->
Oid = {Tab, Key},
mnesia_tm:dirty(SyncMode, {Oid, {RecName, Incr}, update_counter});
_ ->
@@ -1910,6 +1902,8 @@ raw_table_info(Tab, Item) ->
info_reply(?ets_info(Tab, Item), Tab, Item);
disc_only_copies ->
info_reply(dets:info(Tab, Item), Tab, Item);
+ {ext, Alias, Mod} ->
+ info_reply(catch Mod:info(Alias, Tab, Item), Tab, Item);
unknown ->
bad_info_reply(Tab, Item)
end
@@ -2022,15 +2016,26 @@ display_tab_info() ->
MasterTabs = mnesia_recover:get_master_node_tables(),
io:format("master node tables = ~p~n", [lists:sort(MasterTabs)]),
+ case get_backend_types() of
+ [] -> ok;
+ Ts -> list_backend_types(Ts, "backend types = ")
+ end,
+
+ case get_index_plugins() of
+ [] -> ok;
+ Ps -> list_index_plugins(Ps, "index plugins = ")
+ end,
+
Tabs = system_info(tables),
- {Unknown, Ram, Disc, DiscOnly} =
- lists:foldl(fun storage_count/2, {[], [], [], []}, Tabs),
+ {Unknown, Ram, Disc, DiscOnly, Ext} =
+ lists:foldl(fun storage_count/2, {[], [], [], [], []}, Tabs),
io:format("remote = ~p~n", [lists:sort(Unknown)]),
io:format("ram_copies = ~p~n", [lists:sort(Ram)]),
io:format("disc_copies = ~p~n", [lists:sort(Disc)]),
io:format("disc_only_copies = ~p~n", [lists:sort(DiscOnly)]),
+ [io:format("~-19s= ~p~n", [atom_to_list(A), Ts]) || {A,Ts} <- Ext],
Rfoldl = fun(T, Acc) ->
Rpat =
@@ -2038,7 +2043,7 @@ display_tab_info() ->
read_only ->
lists:sort([{A, read_only} || A <- val({T, active_replicas})]);
read_write ->
- table_info(T, where_to_commit)
+ [fix_wtc(W) || W <- table_info(T, where_to_commit)]
end,
case lists:keysearch(Rpat, 1, Acc) of
{value, {_Rpat, Rtabs}} ->
@@ -2051,12 +2056,60 @@ display_tab_info() ->
Rdisp = fun({Rpat, Rtabs}) -> io:format("~p = ~p~n", [Rpat, Rtabs]) end,
lists:foreach(Rdisp, lists:sort(Repl)).
-storage_count(T, {U, R, D, DO}) ->
+get_backend_types() ->
+ case ?catch_val({schema, user_property, mnesia_backend_types}) of
+ {'EXIT', _} ->
+ [];
+ {mnesia_backend_types, Ts} ->
+ lists:sort(Ts)
+ end.
+
+get_index_plugins() ->
+ case ?catch_val({schema, user_property, mnesia_index_plugins}) of
+ {'EXIT', _} ->
+ [];
+ {mnesia_index_plugins, Ps} ->
+ lists:sort(Ps)
+ end.
+
+
+list_backend_types([{A,M} | T] = Ts, Legend) ->
+ Indent = [$\s || _ <- Legend],
+ W = integer_to_list(
+ lists:foldl(fun({Alias,_}, Wa) ->
+ erlang:max(Wa, length(atom_to_list(Alias)))
+ end, 0, Ts)),
+ io:fwrite(Legend ++ "~-" ++ W ++ "s - ~s~n",
+ [atom_to_list(A), atom_to_list(M)]),
+ [io:fwrite(Indent ++ "~-" ++ W ++ "s - ~s~n",
+ [atom_to_list(A1), atom_to_list(M1)])
+ || {A1,M1} <- T].
+
+list_index_plugins([{N,M,F} | T] = Ps, Legend) ->
+ Indent = [$\s || _ <- Legend],
+ W = integer_to_list(
+ lists:foldl(fun({N1,_,_}, Wa) ->
+ erlang:max(Wa, length(pp_ix_name(N1)))
+ end, 0, Ps)),
+ io:fwrite(Legend ++ "~-" ++ W ++ "s - ~s:~s~n",
+ [pp_ix_name(N), atom_to_list(M), atom_to_list(F)]),
+ [io:fwrite(Indent ++ "~-" ++ W ++ "s - ~s:~s~n",
+ [pp_ix_name(N1), atom_to_list(M1), atom_to_list(F1)])
+ || {N1,M1,F1} <- T].
+
+pp_ix_name(N) ->
+ lists:flatten(io_lib:fwrite("~w", [N])).
+
+fix_wtc({N, {ext,A,_}}) -> {N, A};
+fix_wtc({N,A}) when is_atom(A) -> {N, A}.
+
+storage_count(T, {U, R, D, DO, Ext}) ->
case table_info(T, storage_type) of
- unknown -> {[T | U], R, D, DO};
- ram_copies -> {U, [T | R], D, DO};
- disc_copies -> {U, R, [T | D], DO};
- disc_only_copies -> {U, R, D, [T | DO]}
+ unknown -> {[T | U], R, D, DO, Ext};
+ ram_copies -> {U, [T | R], D, DO, Ext};
+ disc_copies -> {U, R, [T | D], DO, Ext};
+ disc_only_copies -> {U, R, D, [T | DO], Ext};
+ {ext, A, _} -> {U, R, D, DO, orddict:append(A, T, Ext)}
end.
system_info(Item) ->
@@ -2071,9 +2124,10 @@ system_info2(all) ->
system_info2(db_nodes) ->
DiscNs = ?catch_val({schema, disc_copies}),
RamNs = ?catch_val({schema, ram_copies}),
+ ExtNs = ?catch_val({schema, external_copies}),
if
- is_list(DiscNs), is_list(RamNs) ->
- DiscNs ++ RamNs;
+ is_list(DiscNs), is_list(RamNs), is_list(ExtNs) ->
+ DiscNs ++ RamNs ++ ExtNs;
true ->
case mnesia_schema:read_nodes() of
{ok, Nodes} -> Nodes;
@@ -2177,6 +2231,7 @@ system_info2(access_module) -> mnesia_monitor:get_env(access_module);
system_info2(auto_repair) -> mnesia_monitor:get_env(auto_repair);
system_info2(is_running) -> mnesia_lib:is_running();
system_info2(backup_module) -> mnesia_monitor:get_env(backup_module);
+system_info2(backend_types) -> mnesia_schema:backend_types();
system_info2(event_module) -> mnesia_monitor:get_env(event_module);
system_info2(debug) -> mnesia_monitor:get_env(debug);
system_info2(dump_log_load_regulation) -> mnesia_monitor:get_env(dump_log_load_regulation);
@@ -2213,6 +2268,7 @@ system_info_items(yes) ->
[
access_module,
auto_repair,
+ backend_types,
backup_module,
checkpoints,
db_nodes,
@@ -2306,11 +2362,17 @@ load_mnesia_or_abort() ->
%% Database mgt
create_schema(Ns) ->
- mnesia_bup:create_schema(Ns).
+ create_schema(Ns, []).
+
+create_schema(Ns, Properties) ->
+ mnesia_bup:create_schema(Ns, Properties).
delete_schema(Ns) ->
mnesia_schema:delete_schema(Ns).
+add_backend_type(Alias, Module) ->
+ mnesia_schema:add_backend_type(Alias, Module).
+
backup(Opaque) ->
mnesia_log:backup(Opaque).
diff --git a/lib/mnesia/src/mnesia.hrl b/lib/mnesia/src/mnesia.hrl
index c58b4cfccd..0716dd87c8 100644
--- a/lib/mnesia/src/mnesia.hrl
+++ b/lib/mnesia/src/mnesia.hrl
@@ -68,6 +68,7 @@
ram_copies = [], % [Node]
disc_copies = [], % [Node]
disc_only_copies = [], % [Node]
+ external_copies = [], % [{{Alias,Mod},[Node]}]
load_order = 0, % Integer
access_mode = read_write, % read_write | read_only
majority = false, % true | false
@@ -103,7 +104,7 @@
ram_copies = [],
disc_copies = [],
disc_only_copies = [],
- snmp = [],
+ ext = [],
schema_ops = []
}).
diff --git a/lib/mnesia/src/mnesia_backend_type.erl b/lib/mnesia/src/mnesia_backend_type.erl
new file mode 100644
index 0000000000..4791b82a01
--- /dev/null
+++ b/lib/mnesia/src/mnesia_backend_type.erl
@@ -0,0 +1,115 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2015. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% Behaviour definition for mnesia backend types.
+%%
+
+%%%header_doc_include
+
+-module(mnesia_backend_type).
+
+-export([behaviour_info/1]).
+
+%%%header_doc_include
+
+%%%impl_doc_include
+
+%% Note that mnesia considers all callbacks mandatory!!
+%%
+behaviour_info(callbacks) ->
+ [
+ {add_aliases, 1}, % (Aliases) -> ok
+ {check_definition, 4}, % (TypeAlias, Tab, Nodes, Properties) -> ok
+ {close_table, 2}, % (TypeAlias, Tab) -> ok
+ {create_table, 3}, % (TypeAlias, Tab, Properties) -> ok
+ {delete, 3}, % (TypeAlias, Tab, Key) -> true | ok
+ {delete_table, 2}, % (TypeAlias, Tab) -> ok
+ {first, 2}, % (TypeAlias, Tab) -> Key::Term | '$end_of_table'
+ {fixtable, 3}, % (TypeAlias, Tab, Bool) -> ok | true
+ {last, 2}, % (TypeAlias, Tab) -> Key::Term | '$end_of_table'
+ {index_is_consistent,3}, % (TypeAlias, IxTag, Bool) -> ok
+ {init_backend, 0}, % () -> ok
+ {info, 3}, % (TypeAlias, Tab, Item) -> Term
+ {insert, 3}, % (TypeAlias, Tab, Object) -> ok
+ {lookup, 3}, % (TypeAlias, Tab, Key) -> [Objects]
+ {is_index_consistent,2}, % (TypeAlias, IxTag) -> Bool
+ {load_table, 4}, % (TypeAlias, Tab, Reason, CsList) -> ok
+ {match_delete, 3}, % (TypeAlias, Tab, Pattern) -> ok
+ {next, 3}, % (TypeAlias, Tab, Key) -> Key::Term | '$end_of_table'
+ {prev, 3}, % (TypeAlias, Tab, Key) -> Key::Term | '$end_of_table'
+ {receiver_first_message, 4}, % (Sender, FirstMsg, Alias, Tab) -> {Size, State}
+ {receive_data, 5}, % (Data, Alias, Name, Sender, State) -> {more, State} | {{more, Msg}, State}
+ {receive_done, 4}, % (Alias, Tab, Sender, State) -> ok
+ {real_suffixes, 0}, % () -> [FileSuffix]
+ {remove_aliases, 1}, % (Aliases) -> ok
+ {repair_continuation, 2}, % (Continuation, MatchSpec) -> Continuation
+ {select, 1}, % (Continuation) -> {[Match], Continuation'} | '$end_of_table'
+ {select, 3}, % (TypeAlias, Tab, Pattern) -> {[Match], Continuation'} | '$end_of_table'
+ {select, 4}, % (TypeAlias, Tab, MatchSpec, Limit) {[Match], Continuation'} | '$end_of_table'
+ {sender_init, 4}, % (TypeAlias, Tab, LoadReason, Pid) ->
+ % {standard, Init(), Chunk()} | {Init(), Chunk()}
+ {semantics, 2}, % (TypeAlias, storage | types | index_fun | index_types) ->
+ % ram_copies | disc_copies, set | ordered_set | bag, fun(), ordered | bag
+ {slot, 3}, % (TypeAlias, Tab, Pos) -> '$end_of_table' | Objects | {error, Reason}
+ {sync_close_table, 2}, % (TypeAlias, Tab) -> ok
+ {tmp_suffixes, 0}, % () -> [FileSuffix]
+ {update_counter, 4}, % (TypeAlias, Tab, Counter, Val) -> NewVal
+ {validate_key, 6}, % (TypeAlias, Tab, RecName, Arity, Type, Key) -> {RecName, Arity, Type}
+ {validate_record, 6} % (TypeAlias, Tab, RecName, Arity, Type, Obj) -> {RecName, Arity, Type}
+ ].
+
+%%%impl_doc_include
+
+%% -type tab() :: atom().
+%% -type alias() :: atom().
+%% -type rec_name() :: atom().
+%% -type type() :: set | bag | ordered_set.
+%% -type proplist() :: [{atom(), any()}].
+%% -type key() :: any().
+%% -type db_object() :: tuple().
+
+%% -type matchspec() :: ets:match_spec().
+%% -type limit() :: integer() | infinity.
+
+%% -type cont_fun() :: any().
+%% -type cont() :: '$end_of_table' | cont_fun().
+
+%% -callback check_definition(alias(), tab(), [node()], proplist()) -> ok.
+%% -callback create_table(alias(), tab(), proplist()) -> tab().
+%% -callback load_table(alias(), tab(), any()) -> ok.
+%% -callback delete_table(alias(), tab()) -> ok.
+%% -callback first(alias(), tab()) -> key().
+%% -callback last(alias(), tab()) -> key().
+%% -callback next(alias(), tab(), key()) -> key().
+%% -callback prev(alias(), tab(), key()) -> key().
+%% -callback insert(alias(), tab(), db_object()) -> ok.
+%% -callback lookup(alias(), tab(), key()) -> [db_object()].
+%% -callback delete(alias(), tab(), key()) -> ok.
+%% -callback update_counter(alias(), tab(), key(), integer()) -> integer().
+%% -callback select(cont()) -> {list(), cont()}.
+%% -callback select(alias(), tab(), matchspec()) -> list() | '$end_of_table'.
+%% -callback select(alias(), tab(), matchspec(), limit()) -> {list(), cont()}.
+%% -callback slot(alias(), tab(), integer()) -> key().
+%% -callback validate_key(alias(), tab(), rec_name(), arity(), type(), key()) ->
+%% {rec_name(), arity(), type()}.
+%% -callback validate_record(alias(),tab(),rec_name(),arity(),type(),db_obj()) ->
+%% {rec_name(), arity(), type()}.
+%% -callback repair_continuation(cont(), matchspec()) -> cont().
diff --git a/lib/mnesia/src/mnesia_bup.erl b/lib/mnesia/src/mnesia_bup.erl
index 8b1143a352..3e55deb958 100644
--- a/lib/mnesia/src/mnesia_bup.erl
+++ b/lib/mnesia/src/mnesia_bup.erl
@@ -28,6 +28,7 @@
fallback_exists/0,
tm_fallback_start/1,
create_schema/1,
+ create_schema/2,
install_fallback/1,
install_fallback/2,
uninstall_fallback/0,
@@ -46,7 +47,7 @@
uninstall_fallback_master/2,
local_uninstall_fallback/2,
do_traverse_backup/7,
- trav_apply/4
+ trav_apply/5
]).
-include("mnesia.hrl").
@@ -81,7 +82,8 @@ iterate(Mod, Fun, Opaque, Acc) ->
R = #restore{bup_module = Mod, bup_data = Opaque},
try read_schema_section(R) of
{R2, {Header, Schema, Rest}} ->
- try iter(R2, Header, Schema, Fun, Acc, Rest) of
+ Ext = get_ext_types(Schema),
+ try iter(R2, Header, Schema, Ext, Fun, Acc, Rest) of
{ok, R3, Res} ->
close_read(R3),
{ok, Res}
@@ -96,17 +98,32 @@ iterate(Mod, Fun, Opaque, Acc) ->
Err
end.
-iter(R, Header, Schema, Fun, Acc, []) ->
+get_ext_types(Schema) ->
+ try
+ List = lookup_schema(schema, Schema),
+ case lists:keyfind(user_properties, 1, List) of
+ {_, Props} ->
+ proplists:get_value(
+ mnesia_backend_types, Props, []);
+ false ->
+ []
+ end
+ catch
+ throw:{error, {"Cannot lookup",_}} ->
+ []
+ end.
+
+iter(R, Header, Schema, Ext, Fun, Acc, []) ->
case safe_apply(R, read, [R#restore.bup_data]) of
{R2, []} ->
- Res = Fun([], Header, Schema, Acc),
+ Res = Fun([], Header, Schema, Ext, Acc),
{ok, R2, Res};
{R2, BupItems} ->
- iter(R2, Header, Schema, Fun, Acc, BupItems)
+ iter(R2, Header, Schema, Ext, Fun, Acc, BupItems)
end;
-iter(R, Header, Schema, Fun, Acc, BupItems) ->
- Acc2 = Fun(BupItems, Header, Schema, Acc),
- iter(R, Header, Schema, Fun, Acc2, []).
+iter(R, Header, Schema, Ext, Fun, Acc, BupItems) ->
+ Acc2 = Fun(BupItems, Header, Schema, Ext, Acc),
+ iter(R, Header, Schema, Ext, Fun, Acc2, []).
-spec safe_apply(#restore{}, atom(), list()) -> tuple().
safe_apply(R, write, [_, Items]) when Items =:= [] ->
@@ -262,7 +279,7 @@ convert_0_1(Schema) ->
Cs = mnesia_schema:list2cs(List),
convert_0_1(Schema2, [], Cs);
false ->
- List = mnesia_schema:get_initial_schema(disc_copies, [node()]),
+ List = mnesia_schema:get_initial_schema(disc_copies, [node()], []),
Cs = mnesia_schema:list2cs(List),
convert_0_1(Schema, [], Cs)
end.
@@ -310,16 +327,19 @@ schema2bup({schema, Tab, TableDef}) ->
%% Create schema on the given nodes
%% Requires that old schemas has been deleted
%% Returns ok | {error, Reason}
-create_schema([]) ->
- create_schema([node()]);
-create_schema(Ns) when is_list(Ns) ->
+create_schema(Nodes) ->
+ create_schema(Nodes, []).
+
+create_schema([], Props) ->
+ create_schema([node()], Props);
+create_schema(Ns, Props) when is_list(Ns), is_list(Props) ->
case is_set(Ns) of
true ->
- create_schema(Ns, mnesia_schema:ensure_no_schema(Ns));
+ create_schema(Ns, mnesia_schema:ensure_no_schema(Ns), Props);
false ->
{error, {combine_error, Ns}}
end;
-create_schema(Ns) ->
+create_schema(Ns, _Props) ->
{error, {badarg, Ns}}.
is_set(List) when is_list(List) ->
@@ -327,7 +347,7 @@ is_set(List) when is_list(List) ->
is_set(_) ->
false.
-create_schema(Ns, ok) ->
+create_schema(Ns, ok, Props) ->
%% Ensure that we access the intended Mnesia
%% directory. This function may not be called
%% during startup since it will cause the
@@ -346,7 +366,7 @@ create_schema(Ns, ok) ->
Str = mk_str(),
File = mnesia_lib:dir(Str),
file:delete(File),
- try make_initial_backup(Ns, File, Mod) of
+ try make_initial_backup(Ns, File, Mod, Props) of
{ok, _Res} ->
case do_install_fallback(File, Mod) of
ok ->
@@ -363,9 +383,9 @@ create_schema(Ns, ok) ->
{error, Reason} ->
{error, Reason}
end;
-create_schema(_Ns, {error, Reason}) ->
+create_schema(_Ns, {error, Reason}, _) ->
{error, Reason};
-create_schema(_Ns, Reason) ->
+create_schema(_Ns, Reason, _) ->
{error, Reason}.
mk_str() ->
@@ -373,7 +393,10 @@ mk_str() ->
lists:concat([node()] ++ Now ++ ".TMP").
make_initial_backup(Ns, Opaque, Mod) ->
- Orig = mnesia_schema:get_initial_schema(disc_copies, Ns),
+ make_initial_backup(Ns, Opaque, Mod, []).
+
+make_initial_backup(Ns, Opaque, Mod, Props) ->
+ Orig = mnesia_schema:get_initial_schema(disc_copies, Ns, Props),
Modded = proplists:delete(storage_properties, proplists:delete(majority, Orig)),
Schema = [{schema, schema, Modded}],
O2 = do_apply(Mod, open_write, [Opaque], Opaque),
@@ -486,15 +509,15 @@ install_fallback_master(ClientPid, FA) ->
State = {start, FA},
Opaque = FA#fallback_args.opaque,
Mod = FA#fallback_args.module,
- Res = iterate(Mod, fun restore_recs/4, Opaque, State),
+ Res = iterate(Mod, fun restore_recs/5, Opaque, State),
unlink(ClientPid),
ClientPid ! {self(), Res},
exit(shutdown).
-restore_recs(_, _, _, stop) ->
+restore_recs(_, _, _, _, stop) ->
throw({error, "restore_recs already stopped"});
-restore_recs(Recs, Header, Schema, {start, FA}) ->
+restore_recs(Recs, Header, Schema, Ext, {start, FA}) ->
%% No records in backup
Schema2 = convert_schema(Header#log_header.log_version, Schema),
CreateList = lookup_schema(schema, Schema2),
@@ -505,19 +528,19 @@ restore_recs(Recs, Header, Schema, {start, FA}) ->
Args = [self(), FA],
Pids = [spawn_link(N, ?MODULE, fallback_receiver, Args) || N <- Ns],
send_fallback(Pids, {start, Header, Schema2}),
- Res = restore_recs(Recs, Header, Schema2, Pids),
+ Res = restore_recs(Recs, Header, Schema2, Ext, Pids),
global:del_lock({{mnesia_table_lock, schema}, self()}, Ns),
Res
catch _:Reason ->
throw({error, {"Bad schema in restore_recs", Reason}})
end;
-restore_recs([], _Header, _Schema, Pids) ->
+restore_recs([], _Header, _Schema, _Ext, Pids) ->
send_fallback(Pids, swap),
send_fallback(Pids, stop),
stop;
-restore_recs(Recs, _, _, Pids) ->
+restore_recs(Recs, _, _, _, Pids) ->
send_fallback(Pids, {records, Recs}),
Pids.
@@ -716,7 +739,7 @@ do_fallback_start(true, false) ->
BupFile = fallback_bup(),
Mod = mnesia_backup,
LocalTabs = ?ets_new_table(mnesia_local_tables, [set, public, {keypos, 2}]),
- case iterate(Mod, fun restore_tables/4, BupFile, {start, LocalTabs}) of
+ case iterate(Mod, fun restore_tables/5, BupFile, {start, LocalTabs}) of
{ok, _Res} ->
?SAFE(dets:close(schema)),
TmpSchema = mnesia_lib:tab2tmp(schema),
@@ -737,23 +760,24 @@ do_fallback_start(true, false) ->
{error, {"Cannot start from fallback", Reason}}
end.
-restore_tables(All=[Rec | Recs], Header, Schema, State={local, LocalTabs, LT}) ->
+restore_tables(All=[Rec | Recs], Header, Schema, Ext,
+ State={local, LocalTabs, LT}) ->
Tab = element(1, Rec),
if
Tab =:= LT#local_tab.name ->
Key = element(2, Rec),
(LT#local_tab.add)(Tab, Key, Rec, LT),
- restore_tables(Recs, Header, Schema, State);
+ restore_tables(Recs, Header, Schema, Ext, State);
true ->
NewState = {new, LocalTabs},
- restore_tables(All, Header, Schema, NewState)
+ restore_tables(All, Header, Schema, Ext, NewState)
end;
-restore_tables(All=[Rec | Recs], Header, Schema, {new, LocalTabs}) ->
+restore_tables(All=[Rec | Recs], Header, Schema, Ext, {new, LocalTabs}) ->
Tab = element(1, Rec),
case ?ets_lookup(LocalTabs, Tab) of
[] ->
State = {not_local, LocalTabs, Tab},
- restore_tables(Recs, Header, Schema, State);
+ restore_tables(Recs, Header, Schema, Ext, State);
[LT] when is_record(LT, local_tab) ->
State = {local, LocalTabs, LT},
case LT#local_tab.opened of
@@ -762,38 +786,39 @@ restore_tables(All=[Rec | Recs], Header, Schema, {new, LocalTabs}) ->
(LT#local_tab.open)(Tab, LT),
?ets_insert(LocalTabs,LT#local_tab{opened=true})
end,
- restore_tables(All, Header, Schema, State)
+ restore_tables(All, Header, Schema, Ext, State)
end;
-restore_tables(All=[Rec | Recs], Header, Schema, S = {not_local, LocalTabs, PrevTab}) ->
+restore_tables(All=[Rec | Recs], Header, Schema, Ext,
+ S = {not_local, LocalTabs, PrevTab}) ->
Tab = element(1, Rec),
if
Tab =:= PrevTab ->
- restore_tables(Recs, Header, Schema, S);
+ restore_tables(Recs, Header, Schema, Ext, S);
true ->
State = {new, LocalTabs},
- restore_tables(All, Header, Schema, State)
+ restore_tables(All, Header, Schema, Ext, State)
end;
-restore_tables(Recs, Header, Schema, {start, LocalTabs}) ->
+restore_tables(Recs, Header, Schema, Ext, {start, LocalTabs}) ->
Dir = mnesia_lib:dir(),
OldDir = filename:join([Dir, "OLD_DIR"]),
mnesia_schema:purge_dir(OldDir, []),
mnesia_schema:purge_dir(Dir, [fallback_name()]),
- init_dat_files(Schema, LocalTabs),
+ init_dat_files(Schema, Ext, LocalTabs),
State = {new, LocalTabs},
- restore_tables(Recs, Header, Schema, State);
-restore_tables([], _Header, _Schema, State) ->
+ restore_tables(Recs, Header, Schema, Ext, State);
+restore_tables([], _Header, _Schema, _Ext, State) ->
State.
%% Creates all neccessary dat files and inserts
%% the table definitions in the schema table
%%
%% Returns a list of local_tab tuples for all local tables
-init_dat_files(Schema, LocalTabs) ->
+init_dat_files(Schema, Ext, LocalTabs) ->
TmpFile = mnesia_lib:tab2tmp(schema),
Args = [{file, TmpFile}, {keypos, 2}, {type, set}],
case dets:open_file(schema, Args) of % Assume schema lock
{ok, _} ->
- create_dat_files(Schema, LocalTabs),
+ create_dat_files(Schema, Ext, LocalTabs),
ok = dets:close(schema),
LocalTab = #local_tab{name = schema,
storage_type = disc_copies,
@@ -808,10 +833,10 @@ init_dat_files(Schema, LocalTabs) ->
throw({error, {"Cannot open file", schema, Args, Reason}})
end.
-create_dat_files([{schema, schema, TabDef} | Tail], LocalTabs) ->
+create_dat_files([{schema, schema, TabDef} | Tail], Ext, LocalTabs) ->
ok = dets:insert(schema, {schema, schema, TabDef}),
- create_dat_files(Tail, LocalTabs);
-create_dat_files([{schema, Tab, TabDef} | Tail], LocalTabs) ->
+ create_dat_files(Tail, Ext, LocalTabs);
+create_dat_files([{schema, Tab, TabDef} | Tail], Ext, LocalTabs) ->
TmpFile = mnesia_lib:tab2tmp(Tab),
DatFile = mnesia_lib:tab2dat(Tab),
DclFile = mnesia_lib:tab2dcl(Tab),
@@ -824,56 +849,21 @@ create_dat_files([{schema, Tab, TabDef} | Tail], LocalTabs) ->
mnesia_lib:dets_sync_close(Tab),
file:delete(TmpFile),
- Cs = mnesia_schema:list2cs(TabDef),
+ Cs = mnesia_schema:list2cs(TabDef, Ext),
ok = dets:insert(schema, {schema, Tab, TabDef}),
RecName = Cs#cstruct.record_name,
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
+ delete_ext(Storage, Tab),
+ Semantics = mnesia_lib:semantics(Storage, storage),
if
- Storage =:= unknown ->
+ Semantics =:= undefined ->
ok = dets:delete(schema, {schema, Tab}),
- create_dat_files(Tail, LocalTabs);
- Storage =:= disc_only_copies ->
- Args = [{file, TmpFile}, {keypos, 2},
- {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
- Open = fun(T, LT) when T =:= LT#local_tab.name ->
- case mnesia_lib:dets_sync_open(T, Args) of
- {ok, _} ->
- ok;
- {error, Reason} ->
- throw({error, {"Cannot open file", T, Args, Reason}})
- end
- end,
- Add = fun(T, Key, Rec, LT) when T =:= LT#local_tab.name ->
- case Rec of
- {_T, Key} ->
- ok = dets:delete(T, Key);
- (Rec) when T =:= RecName ->
- ok = dets:insert(Tab, Rec);
- (Rec) ->
- Rec2 = setelement(1, Rec, RecName),
- ok = dets:insert(T, Rec2)
- end
- end,
- Close = fun(T, LT) when T =:= LT#local_tab.name ->
- mnesia_lib:dets_sync_close(T)
- end,
- Swap = fun(T, LT) when T =:= LT#local_tab.name ->
- Expunge(),
- case LT#local_tab.opened of
- true ->
- Close(T,LT);
- false ->
- Open(T,LT),
- Close(T,LT)
- end,
- case file:rename(TmpFile, DatFile) of
- ok ->
- ok;
- {error, Reason} ->
- mnesia_lib:fatal("Cannot rename file ~p -> ~p: ~p~n",
- [TmpFile, DatFile, Reason])
- end
- end,
+ create_dat_files(Tail, Ext, LocalTabs);
+ Semantics =:= disc_only_copies ->
+ Open = disc_only_open_fun(Storage, Cs),
+ Add = disc_only_add_fun(Storage, Cs),
+ Close = disc_only_close_fun(Storage),
+ Swap = disc_only_swap_fun(Storage, Expunge, Open, Close),
LocalTab = #local_tab{name = Tab,
storage_type = Storage,
open = Open,
@@ -883,8 +873,8 @@ create_dat_files([{schema, Tab, TabDef} | Tail], LocalTabs) ->
record_name = RecName,
opened = false},
?ets_insert(LocalTabs, LocalTab),
- create_dat_files(Tail, LocalTabs);
- Storage =:= ram_copies; Storage =:= disc_copies ->
+ create_dat_files(Tail, Ext, LocalTabs);
+ Semantics =:= ram_copies; Storage =:= disc_copies ->
Open = fun(T, LT) when T =:= LT#local_tab.name ->
mnesia_log:open_log({?MODULE, T},
mnesia_log:dcl_log_header(),
@@ -945,18 +935,97 @@ create_dat_files([{schema, Tab, TabDef} | Tail], LocalTabs) ->
opened = false
},
?ets_insert(LocalTabs, LocalTab),
- create_dat_files(Tail, LocalTabs)
+ create_dat_files(Tail, Ext, LocalTabs);
+ true ->
+ error({unknown_semantics, [{semantics, Semantics},
+ {tabdef, TabDef},
+ {ext, Ext}]})
end;
-create_dat_files([{schema, Tab} | Tail], LocalTabs) ->
+create_dat_files([{schema, Tab} | Tail], Ext, LocalTabs) ->
?ets_delete(LocalTabs, Tab),
ok = dets:delete(schema, {schema, Tab}),
TmpFile = mnesia_lib:tab2tmp(Tab),
mnesia_lib:dets_sync_close(Tab),
file:delete(TmpFile),
- create_dat_files(Tail, LocalTabs);
-create_dat_files([], _LocalTabs) ->
+ create_dat_files(Tail, Ext, LocalTabs);
+create_dat_files([], _Ext, _LocalTabs) ->
ok.
+delete_ext({ext, Alias, Mod}, Tab) ->
+ Mod:close_table(Alias, Tab),
+ Mod:delete_table(Alias, Tab),
+ ok;
+delete_ext(_, _) ->
+ ok.
+
+
+disc_only_open_fun(disc_only_copies, #cstruct{name = Tab} =Cs) ->
+ TmpFile = mnesia_lib:tab2tmp(Tab),
+ Args = [{file, TmpFile}, {keypos, 2},
+ {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)}],
+ fun(T, LT) when T =:= LT#local_tab.name ->
+ case mnesia_lib:dets_sync_open(T, Args) of
+ {ok, _} ->
+ ok;
+ {error, Reason} ->
+ throw({error, {"Cannot open file", T, Args, Reason}})
+ end
+ end;
+disc_only_open_fun({ext,Alias,Mod}, Cs) ->
+ fun(T, LT) when T =:= LT#local_tab.name ->
+ ok = Mod:load_table(Alias, T, restore, mnesia_schema:cs2list(Cs))
+ end.
+
+disc_only_add_fun(Storage, #cstruct{name = Tab,
+ record_name = RecName}) ->
+ fun(T, Key, Rec, #local_tab{name = T}) when T =:= Tab->
+ case Rec of
+ {_T, Key} ->
+ ok = mnesia_lib:db_erase(Storage, T, Key);
+ (Rec) when T =:= RecName ->
+ ok = mnesia_lib:db_put(Storage, T, Rec);
+ (Rec) ->
+ ok = mnesia_lib:db_put(Storage, T,
+ setelement(1, Rec, RecName))
+ end
+ end.
+
+disc_only_close_fun(disc_only_copies) ->
+ fun(T, LT) when T =:= LT#local_tab.name ->
+ mnesia_lib:dets_sync_close(T)
+ end;
+disc_only_close_fun({ext, Alias, Mod}) ->
+ fun(T, _LT) ->
+ Mod:sync_close_table(Alias, T)
+ end.
+
+
+disc_only_swap_fun(disc_only_copies, Expunge, Open, Close) ->
+ fun(T, LT) when T =:= LT#local_tab.name ->
+ TmpFile = mnesia_lib:tab2tmp(T),
+ DatFile = mnesia_lib:tab2dat(T),
+ Expunge(),
+ case LT#local_tab.opened of
+ true ->
+ Close(T,LT);
+ false ->
+ Open(T,LT),
+ Close(T,LT)
+ end,
+ case file:rename(TmpFile, DatFile) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ mnesia_lib:fatal("Cannot rename file ~p -> ~p: ~p~n",
+ [TmpFile, DatFile, Reason])
+ end
+ end;
+disc_only_swap_fun({ext, _Alias, _Mod}, _Expunge, _Open, Close) ->
+ fun(T, #local_tab{name = T} = LT) ->
+ Close(T, LT)
+ end.
+
+
uninstall_fallback() ->
uninstall_fallback([{scope, global}]).
@@ -1133,7 +1202,7 @@ do_traverse_backup(ClientPid, Source, SourceMod, Target, TargetMod, Fun, Acc) ->
end,
A = {start, Fun, Acc, TargetMod, Iter},
Res =
- case iterate(SourceMod, fun trav_apply/4, Source, A) of
+ case iterate(SourceMod, fun trav_apply/5, Source, A) of
{ok, {iter, _, Acc2, _, Iter2}} when TargetMod =/= read_only ->
try
do_apply(TargetMod, commit_write, [Iter2], Iter2),
@@ -1152,7 +1221,7 @@ do_traverse_backup(ClientPid, Source, SourceMod, Target, TargetMod, Fun, Acc) ->
unlink(ClientPid),
ClientPid ! {iter_done, self(), Res}.
-trav_apply(Recs, _Header, _Schema, {iter, Fun, Acc, Mod, Iter}) ->
+trav_apply(Recs, _Header, _Schema, _Ext, {iter, Fun, Acc, Mod, Iter}) ->
{NewRecs, Acc2} = filter_foldl(Fun, Acc, Recs),
if
Mod =/= read_only, NewRecs =/= [] ->
@@ -1161,7 +1230,7 @@ trav_apply(Recs, _Header, _Schema, {iter, Fun, Acc, Mod, Iter}) ->
true ->
{iter, Fun, Acc2, Mod, Iter}
end;
-trav_apply(Recs, Header, Schema, {start, Fun, Acc, Mod, Iter}) ->
+trav_apply(Recs, Header, Schema, Ext, {start, Fun, Acc, Mod, Iter}) ->
Iter2 =
if
Mod =/= read_only ->
@@ -1169,8 +1238,9 @@ trav_apply(Recs, Header, Schema, {start, Fun, Acc, Mod, Iter}) ->
true ->
Iter
end,
- TravAcc = trav_apply(Schema, Header, Schema, {iter, Fun, Acc, Mod, Iter2}),
- trav_apply(Recs, Header, Schema, TravAcc).
+ TravAcc = trav_apply(Schema, Header, Schema, Ext,
+ {iter, Fun, Acc, Mod, Iter2}),
+ trav_apply(Recs, Header, Schema, Ext, TravAcc).
filter_foldl(Fun, Acc, [Head|Tail]) ->
case Fun(Head, Acc) of
diff --git a/lib/mnesia/src/mnesia_checkpoint.erl b/lib/mnesia/src/mnesia_checkpoint.erl
index 33b28a2f29..9eb939e8d3 100644
--- a/lib/mnesia/src/mnesia_checkpoint.erl
+++ b/lib/mnesia/src/mnesia_checkpoint.erl
@@ -675,6 +675,16 @@ tab2retainer({Tab, Name}) ->
FlatName = lists:flatten(io_lib:write(Name)),
mnesia_lib:dir(lists:concat([?MODULE, "_", Tab, "_", FlatName, ".RET"])).
+retainer_create(_Cp, R, Tab, Name, Ext = {ext, Alias, Mod}) ->
+ T = {Tab, retainer, Name},
+ P = mnesia_schema:cs2list(val({Tab, cstruct})),
+ Mod:delete_table(Alias, T),
+ ok = Mod:create_table(Alias, T, P),
+ Cs = val({Tab, cstruct}),
+ Mod:load_table(Alias, T, {retainer, create_table},
+ mnesia_schema:cs2list(Cs)),
+ dbg_out("Checkpoint retainer created ~p ~p~n", [Name, Tab]),
+ R#retainer{store = {Ext, T}, really_retain = true};
retainer_create(_Cp, R, Tab, Name, disc_only_copies) ->
Fname = tab2retainer({Tab, Name}),
file:delete(Fname),
@@ -734,15 +744,23 @@ traverse_dcd({Cont, Recs}, Log, Fun) -> %% trashed data??
traverse_dcd(eof, _Log, _Fun) ->
ok.
+retainer_get({{ext, Alias, Mod}, Store}, Key) ->
+ Mod:lookup(Alias, Store, Key);
retainer_get({ets, Store}, Key) -> ?ets_lookup(Store, Key);
retainer_get({dets, Store}, Key) -> dets:lookup(Store, Key).
+retainer_put({{ext, Alias, Mod}, Store}, Val) ->
+ Mod:insert(Alias, Store, Val);
retainer_put({ets, Store}, Val) -> ?ets_insert(Store, Val);
retainer_put({dets, Store}, Val) -> dets:insert(Store, Val).
+retainer_first({{ext, Alias, Mod}, Store}) ->
+ Mod:first(Alias, Store);
retainer_first({ets, Store}) -> ?ets_first(Store);
retainer_first({dets, Store}) -> dets:first(Store).
+retainer_next({{ext, Alias, Mod}, Store}, Key) ->
+ Mod:next(Alias, Store, Key);
retainer_next({ets, Store}, Key) -> ?ets_next(Store, Key);
retainer_next({dets, Store}, Key) -> dets:next(Store, Key).
@@ -761,11 +779,16 @@ retainer_next({dets, Store}, Key) -> dets:next(Store, Key).
retainer_fixtable(Tab, Bool) when is_atom(Tab) ->
mnesia_lib:db_fixtable(val({Tab, storage_type}), Tab, Bool);
+retainer_fixtable({Ext = {ext, _, _}, Tab}, Bool) ->
+ mnesia_lib:db_fixtable(Ext, Tab, Bool);
retainer_fixtable({ets, Tab}, Bool) ->
mnesia_lib:db_fixtable(ram_copies, Tab, Bool);
retainer_fixtable({dets, Tab}, Bool) ->
mnesia_lib:db_fixtable(disc_only_copies, Tab, Bool).
+retainer_delete({{ext, Alias, Mod}, Store}) ->
+ Mod:close_table(Alias, Store),
+ Mod:delete_table(Alias, Store);
retainer_delete({ets, Store}) ->
?ets_delete_table(Store);
retainer_delete({dets, Store}) ->
diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl
index 58cc66e18b..4791e2e290 100644
--- a/lib/mnesia/src/mnesia_controller.erl
+++ b/lib/mnesia/src/mnesia_controller.erl
@@ -385,6 +385,8 @@ force_load_table(Tab) when is_atom(Tab), Tab /= schema ->
do_force_load_table(Tab);
disc_only_copies ->
do_force_load_table(Tab);
+ {ext, _, _} ->
+ do_force_load_table(Tab);
unknown ->
set({Tab, load_by_force}, true),
cast({force_load_updated, Tab}),
@@ -1533,8 +1535,8 @@ update_whereabouts(Tab, Node, State) ->
Storage == unknown ->
%% No own copy, continue to read remotely
add_active_replica(Tab, Node),
- NodeST = mnesia_lib:storage_type_at_node(Node, Tab),
- ReadST = mnesia_lib:storage_type_at_node(Read, Tab),
+ NodeST = mnesia_lib:semantics(mnesia_lib:storage_type_at_node(Node, Tab), storage),
+ ReadST = mnesia_lib:semantics(mnesia_lib:storage_type_at_node(Read, Tab), storage),
if %% Avoid reading from disc_only_copies
NodeST == disc_only_copies ->
ignore;
@@ -1588,6 +1590,7 @@ last_consistent_replica(Tab, Downs) ->
Ram = Cs#cstruct.ram_copies,
Disc = Cs#cstruct.disc_copies,
DiscOnly = Cs#cstruct.disc_only_copies,
+ Ext = Cs#cstruct.external_copies,
BetterCopies0 = mnesia_lib:remote_copy_holders(Cs) -- Downs,
BetterCopies = BetterCopies0 -- Ram,
AccessMode = Cs#cstruct.access_mode,
@@ -1620,7 +1623,7 @@ last_consistent_replica(Tab, Downs) ->
false;
Storage == ram_copies ->
if
- Disc == [], DiscOnly == [] ->
+ Disc == [], DiscOnly == [], Ext == [] ->
%% Nobody has copy on disc
{true, {Tab, ram_only}};
true ->
@@ -1865,6 +1868,11 @@ info([Tab | Tail]) ->
dets:info(Tab, size),
dets:info(Tab, file_size),
"bytes on disc");
+ {ext, Alias, Mod} ->
+ info_format(Tab,
+ Mod:info(Alias, Tab, size),
+ Mod:info(Alias, Tab, memory),
+ "words of mem");
_ ->
info_format(Tab,
?ets_info(Tab, size),
diff --git a/lib/mnesia/src/mnesia_dumper.erl b/lib/mnesia/src/mnesia_dumper.erl
index 7418ab7436..eb02a585a6 100644
--- a/lib/mnesia/src/mnesia_dumper.erl
+++ b/lib/mnesia/src/mnesia_dumper.erl
@@ -38,6 +38,8 @@
needs_dump_ets/1,
raw_dump_table/2,
raw_named_dump_table/2,
+ dump_to_logfile/2,
+ load_from_logfile/3,
start_regulator/0,
opt_dump_log/1,
update/3,
@@ -227,11 +229,11 @@ insert_rec(Rec, InPlace, InitBy, LogV) when is_record(Rec, commit) ->
D = Rec#commit.decision,
case mnesia_recover:wait_for_decision(D, InitBy) of
{Tid, committed} ->
- do_insert_rec(Tid, Rec, InPlace, InitBy, LogV);
+ do_insert_rec(Tid, mnesia_tm:new_cr_format(Rec), InPlace, InitBy, LogV);
{Tid, aborted} ->
case InitBy of
startup ->
- mnesia_schema:undo_prepare_commit(Tid, Rec);
+ mnesia_schema:undo_prepare_commit(Tid, mnesia_tm:new_cr_format(Rec));
_ ->
ok
end
@@ -271,15 +273,30 @@ do_insert_rec(Tid, Rec, InPlace, InitBy, LogV) ->
end
end,
D = Rec#commit.disc_copies,
+ ExtOps = commit_ext(Rec),
insert_ops(Tid, disc_copies, D, InPlace, InitBy, LogV),
+ [insert_ops(Tid, Ext, Ops, InPlace, InitBy, LogV) ||
+ {Ext, Ops} <- ExtOps,
+ storage_semantics(Ext) == disc_copies],
case InitBy of
startup ->
DO = Rec#commit.disc_only_copies,
- insert_ops(Tid, disc_only_copies, DO, InPlace, InitBy, LogV);
+ insert_ops(Tid, disc_only_copies, DO, InPlace, InitBy, LogV),
+ [insert_ops(Tid, Ext, Ops, InPlace, InitBy, LogV) ||
+ {Ext, Ops} <- ExtOps, storage_semantics(Ext) == disc_only_copies];
_ ->
ignore
end.
+commit_ext(#commit{ext = []}) -> [];
+commit_ext(#commit{ext = Ext}) ->
+ case lists:keyfind(ext_copies, 1, Ext) of
+ {_, C} ->
+ lists:foldl(fun({Ext0, Op}, D) ->
+ orddict:append(Ext0, Op, D)
+ end, orddict:new(), C);
+ false -> []
+ end.
update(_Tid, [], _DumperMode) ->
dumped;
@@ -330,14 +347,15 @@ insert_ops(Tid, Storage, [Op | Ops], InPlace, InitBy, Ver) when Ver < "4.3" ->
%% Normal ops
disc_insert(_Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
- case open_files(Tab, Storage, InPlace, InitBy) of
+ Semantics = storage_semantics(Storage),
+ case open_files(Tab, Semantics, Storage, InPlace, InitBy) of
true ->
- case Storage of
+ case Semantics of
disc_copies when Tab /= schema ->
mnesia_log:append({?MODULE,Tab}, {{Tab, Key}, Val, Op}),
ok;
_ ->
- dets_insert(Op,Tab,Key,Val)
+ dets_insert(Op,Tab,Key,Val,Storage)
end;
false ->
ignore
@@ -349,34 +367,37 @@ disc_insert(_Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
%% Otherwise we will get a double increment.
%% This is perfect but update_counter is a dirty op.
-dets_insert(Op,Tab,Key,Val) ->
+dets_insert(Op,Tab,Key,Val, Storage0) ->
+ Storage = if Tab == schema -> disc_only_copies;
+ true -> Storage0
+ end,
case Op of
write ->
dets_updated(Tab,Key),
- ok = dets:insert(Tab, Val);
+ ok = mnesia_lib:db_put(Storage, Tab, Val);
delete ->
dets_updated(Tab,Key),
- ok = dets:delete(Tab, Key);
+ ok = mnesia_lib:db_erase(Storage, Tab, Key);
update_counter ->
case dets_incr_counter(Tab,Key) of
true ->
{RecName, Incr} = Val,
- try _ = dets:update_counter(Tab, Key, Incr)
+ try _ = mnesia_lib:db_update_counter(Storage, Tab, Key, Incr)
catch error:_ when Incr < 0 ->
Zero = {RecName, Key, 0},
- ok = dets:insert(Tab, Zero);
+ ok = mnesia_lib:db_put(Storage, Tab, Zero);
error:_ ->
Init = {RecName, Key, Incr},
- ok = dets:insert(Tab, Init)
+ ok = mnesia_lib:db_put(Storage, Tab, Init)
end;
false -> ok
end;
delete_object ->
dets_updated(Tab,Key),
- ok = dets:delete_object(Tab, Val);
+ mnesia_lib:db_match_erase(Storage, Tab, Val);
clear_table ->
dets_cleared(Tab),
- ok = dets:delete_all_objects(Tab)
+ ok = mnesia_lib:db_match_erase(Storage, Tab, '_')
end.
dets_updated(Tab,Key) ->
@@ -431,23 +452,29 @@ insert(_Tid, _Storage, _Tab, _Key, [], _Op, _InPlace, _InitBy) ->
ok;
insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
+ Semantics = storage_semantics(Storage),
Item = {{Tab, Key}, Val, Op},
case InitBy of
startup ->
disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy);
- _ when Storage == ram_copies ->
+ _ when Semantics == ram_copies ->
mnesia_tm:do_update_op(Tid, Storage, Item),
Snmp = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
mnesia_tm:do_snmp(Tid, Snmp);
- _ when Storage == disc_copies ->
+ _ when Semantics == disc_copies ->
disc_insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy),
mnesia_tm:do_update_op(Tid, Storage, Item),
Snmp = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
mnesia_tm:do_snmp(Tid, Snmp);
- _ when Storage == disc_only_copies ->
+ _ when Semantics == disc_only_copies ->
+ mnesia_tm:do_update_op(Tid, Storage, Item),
+ Snmp = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
+ mnesia_tm:do_snmp(Tid, Snmp);
+
+ _ when element(1, Storage) == ext ->
mnesia_tm:do_update_op(Tid, Storage, Item),
Snmp = mnesia_tm:prepare_snmp(Tab, Key, [Item]),
mnesia_tm:do_snmp(Tid, Snmp);
@@ -456,6 +483,9 @@ insert(Tid, Storage, Tab, Key, Val, Op, InPlace, InitBy) ->
ignore
end.
+disc_delete_table(Tab, {ext, Alias, Mod}) ->
+ Mod:close_table(Alias, Tab),
+ Mod:delete_table(Alias, Tab);
disc_delete_table(Tab, Storage) ->
case mnesia_monitor:use_dir() of
true ->
@@ -485,11 +515,14 @@ disc_delete_table(Tab, Storage) ->
ok
end.
-disc_delete_indecies(_Tab, _Cs, Storage) when Storage /= disc_only_copies ->
- ok;
-disc_delete_indecies(Tab, Cs, disc_only_copies) ->
- Indecies = Cs#cstruct.index,
- mnesia_index:del_transient(Tab, Indecies, disc_only_copies).
+disc_delete_indecies(Tab, Cs, Storage) ->
+ case storage_semantics(Storage) of
+ disc_only_copies ->
+ Indecies = Cs#cstruct.index,
+ mnesia_index:del_transient(Tab, Indecies, Storage);
+ _ ->
+ ok
+ end.
insert_op(Tid, Storage, {{Tab, Key}, Val, Op}, InPlace, InitBy) ->
%% Propagate to disc only
@@ -515,6 +548,8 @@ insert_op(Tid, _, {op, change_table_copy_type, N, FromS, ToS, TabDef}, InPlace,
Cs = mnesia_schema:list2cs(TabDef),
Val = mnesia_schema:insert_cstruct(Tid, Cs, true), % Update ram only
{schema, Tab, _} = Val,
+ FromSem = storage_semantics(FromS),
+ ToSem = storage_semantics(ToS),
case lists:member(N, val({current, db_nodes})) of
true when InitBy /= startup ->
mnesia_controller:add_active_replica(Tab, N, Cs);
@@ -527,62 +562,118 @@ insert_op(Tid, _, {op, change_table_copy_type, N, FromS, ToS, TabDef}, InPlace,
Dat = mnesia_lib:tab2dat(Tab),
Dcd = mnesia_lib:tab2dcd(Tab),
Dcl = mnesia_lib:tab2dcl(Tab),
+ Logtmp = mnesia_lib:tab2logtmp(Tab),
case {FromS, ToS} of
- {ram_copies, disc_copies} when Tab == schema ->
- ok = ensure_rename(Dmp, Dat);
- {ram_copies, disc_copies} ->
- file:delete(Dcl),
- ok = ensure_rename(Dmp, Dcd);
- {disc_copies, ram_copies} when Tab == schema ->
- mnesia_lib:set(use_dir, false),
- mnesia_monitor:unsafe_close_dets(Tab),
- ok = file:delete(Dat);
- {disc_copies, ram_copies} ->
- _ = file:delete(Dcl),
- _ = file:delete(Dcd),
- ok;
- {ram_copies, disc_only_copies} ->
- ok = ensure_rename(Dmp, Dat),
- true = open_files(Tab, disc_only_copies, InPlace, InitBy),
- %% ram_delete_table must be done before init_indecies,
- %% it uses info which is reset in init_indecies,
- %% it doesn't matter, because init_indecies don't use
- %% the ram replica of the table when creating the disc
- %% index; Could be improved :)
- mnesia_schema:ram_delete_table(Tab, FromS),
- PosList = Cs#cstruct.index,
- mnesia_index:init_indecies(Tab, disc_only_copies, PosList);
- {disc_only_copies, ram_copies} ->
- mnesia_monitor:unsafe_close_dets(Tab),
- disc_delete_indecies(Tab, Cs, disc_only_copies),
- case InitBy of
- startup ->
- ignore;
+ {{ext,_FromAlias,_FromMod},{ext,ToAlias,ToMod}} ->
+ disc_delete_table(Tab, FromS),
+ ok = ToMod:delete_table(ToAlias, Tab),
+ ok = mnesia_monitor:unsafe_create_external(Tab, ToAlias, ToMod, TabDef),
+ ok = ToMod:load_table(ToAlias, Tab, {dumper,change_table_copy_type}, TabDef),
+ ok = load_from_logfile(ToS, Tab, Logtmp),
+ file:delete(Logtmp),
+ restore_indexes(Tab, ToS, Cs);
+
+ {_,{ext,ToAlias,ToMod}} ->
+ case FromSem of
+ ram_copies ->
+ mnesia_schema:ram_delete_table(Tab, FromS);
_ ->
- mnesia_controller:get_disc_copy(Tab),
- ok
+ if FromSem == disc_copies ->
+ mnesia_schema:ram_delete_table(
+ Tab, FromS);
+ true -> ok
+ end,
+ disc_delete_table(Tab, FromS)
end,
- disc_delete_table(Tab, disc_only_copies);
- {disc_copies, disc_only_copies} ->
- ok = ensure_rename(Dmp, Dat),
- true = open_files(Tab, disc_only_copies, InPlace, InitBy),
- mnesia_schema:ram_delete_table(Tab, FromS),
- PosList = Cs#cstruct.index,
- mnesia_index:init_indecies(Tab, disc_only_copies, PosList),
- _ = file:delete(Dcl),
- _ = file:delete(Dcd),
- ok;
- {disc_only_copies, disc_copies} ->
- mnesia_monitor:unsafe_close_dets(Tab),
- disc_delete_indecies(Tab, Cs, disc_only_copies),
- case InitBy of
- startup ->
- ignore;
- _ ->
- mnesia_log:ets2dcd(Tab),
- mnesia_controller:get_disc_copy(Tab),
- disc_delete_table(Tab, disc_only_copies)
- end
+
+ ok = ToMod:delete_table(ToAlias, Tab),
+ ok = mnesia_monitor:unsafe_create_external(Tab, ToAlias, ToMod, TabDef),
+ ok = ToMod:load_table(ToAlias, Tab, {dumper,change_table_copy_type}, TabDef),
+ ok = load_from_logfile(ToS, Tab, Logtmp),
+ file:delete(Logtmp),
+ restore_indexes(Tab, ToS, Cs);
+
+ {{ext,_FromAlias,_FromMod} = FromS, ToS} ->
+ disc_delete_table(Tab, FromS),
+ case ToS of
+ ram_copies ->
+ change_disc_to_ram(
+ Tab, Cs, FromS, ToS, Logtmp, InitBy);
+ disc_copies ->
+ Args = [{keypos, 2}, public, named_table,
+ Cs#cstruct.type],
+ mnesia_monitor:mktab(Tab, Args),
+ ok = load_from_logfile(ToS, Tab, Logtmp),
+ file:delete(Logtmp);
+ disc_only_copies ->
+ %% ok = ensure_rename(Dmp, Dat),
+ true = open_files(Tab, ToS, InPlace, InitBy),
+ ok = load_from_logfile(ToS, Tab, Logtmp),
+ file:delete(Logtmp)
+ end,
+ restore_indexes(Tab, ToS, Cs);
+
+ _NoneAreExt ->
+
+ case {FromSem, ToSem} of
+ {ram_copies, disc_copies} when Tab == schema ->
+ ok = ensure_rename(Dmp, Dat);
+ {ram_copies, disc_copies} ->
+ file:delete(Dcl),
+ ok = ensure_rename(Dmp, Dcd);
+ {disc_copies, ram_copies} when Tab == schema ->
+ mnesia_lib:set(use_dir, false),
+ mnesia_monitor:unsafe_close_dets(Tab),
+ ok = file:delete(Dat);
+ {disc_copies, ram_copies} ->
+ _ = file:delete(Dcl),
+ _ = file:delete(Dcd),
+ ok;
+ {ram_copies, disc_only_copies} ->
+ ok = ensure_rename(Dmp, Dat),
+ true = open_files(Tab, ToS, InPlace, InitBy),
+ %% ram_delete_table must be done before
+ %% init_indecies, it uses info which is reset
+ %% in init_indecies, it doesn't matter, because
+ %% init_indecies don't use the ram replica of
+ %% the table when creating the disc index;
+ %% Could be improved :)
+ mnesia_schema:ram_delete_table(Tab, FromS),
+ restore_indexes(Tab, ToS, Cs);
+ {disc_only_copies, ram_copies} when FromS == disc_only_copies ->
+ mnesia_monitor:unsafe_close_dets(Tab),
+ disc_delete_indecies(Tab, Cs, FromS),
+ case InitBy of
+ startup ->
+ ignore;
+ _ ->
+ mnesia_controller:get_disc_copy(Tab),
+ ok
+ end,
+ disc_delete_table(Tab, FromS);
+ {disc_only_copies, ram_copies} when element(1, FromS) == ext ->
+ change_disc_to_ram(
+ Tab, Cs, FromS, ToS, Logtmp, InitBy);
+ {disc_copies, disc_only_copies} ->
+ ok = ensure_rename(Dmp, Dat),
+ true = open_files(Tab, ToS, InPlace, InitBy),
+ mnesia_schema:ram_delete_table(Tab, FromS),
+ restore_indexes(Tab, ToS, Cs),
+ _ = file:delete(Dcl),
+ _ = file:delete(Dcd),
+ ok;
+ {disc_only_copies, disc_copies} ->
+ mnesia_monitor:unsafe_close_dets(Tab),
+ disc_delete_indecies(Tab, Cs, disc_only_copies),
+ case InitBy of
+ startup ->
+ ignore;
+ _ ->
+ mnesia_log:ets2dcd(Tab),
+ mnesia_controller:get_disc_copy(Tab),
+ disc_delete_table(Tab, disc_only_copies)
+ end
+ end
end;
true ->
ignore
@@ -607,6 +698,9 @@ insert_op(Tid, _, {op, restore_recreate, TabDef}, InPlace, InitBy) ->
Tab = Cs#cstruct.name,
Type = Cs#cstruct.type,
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
+ Semantics = if Storage==unknown -> unknown;
+ true -> storage_semantics(Storage)
+ end,
%% Delete all possibly existing files and tables
disc_delete_table(Tab, Storage),
disc_delete_indecies(Tab, Cs, Storage),
@@ -625,13 +719,13 @@ insert_op(Tid, _, {op, restore_recreate, TabDef}, InPlace, InitBy) ->
%% And create new ones..
if
- (InitBy == startup) or (Storage == unknown) ->
+ (InitBy == startup) or (Semantics == unknown) ->
ignore;
- Storage == ram_copies ->
+ Semantics == ram_copies ->
EtsProps = proplists:get_value(ets, StorageProps, []),
Args = [{keypos, 2}, public, named_table, Type | EtsProps],
mnesia_monitor:mktab(Tab, Args);
- Storage == disc_copies ->
+ Semantics == disc_copies ->
EtsProps = proplists:get_value(ets, StorageProps, []),
Args = [{keypos, 2}, public, named_table, Type | EtsProps],
mnesia_monitor:mktab(Tab, Args),
@@ -640,7 +734,7 @@ insert_op(Tid, _, {op, restore_recreate, TabDef}, InPlace, InitBy) ->
{repair, false}, {mode, read_write}],
{ok, Log} = mnesia_monitor:open_log(FArg),
mnesia_monitor:unsafe_close_log(Log);
- Storage == disc_only_copies ->
+ Storage == disc_only_copies -> % note: Storage, not Semantics
File = mnesia_lib:tab2dat(Tab),
file:delete(File),
DetsProps = proplists:get_value(dets, StorageProps, []),
@@ -649,7 +743,10 @@ insert_op(Tid, _, {op, restore_recreate, TabDef}, InPlace, InitBy) ->
{keypos, 2},
{repair, mnesia_monitor:get_env(auto_repair)}
| DetsProps ],
- mnesia_monitor:open_dets(Tab, Args)
+ mnesia_monitor:open_dets(Tab, Args);
+ element(1,Storage) == ext ->
+ {ext, Alias, Mod} = Storage,
+ Mod:create_table(Alias, Tab, [])
end,
insert_op(Tid, ignore, {op, create_table, TabDef}, InPlace, InitBy);
@@ -659,9 +756,10 @@ insert_op(Tid, _, {op, create_table, TabDef}, InPlace, InitBy) ->
Tab = Cs#cstruct.name,
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
StorageProps = Cs#cstruct.storage_properties,
+ Semantics = storage_semantics(Storage),
case InitBy of
startup ->
- case Storage of
+ case Semantics of
unknown ->
ignore;
ram_copies ->
@@ -679,20 +777,10 @@ insert_op(Tid, _, {op, create_table, TabDef}, InPlace, InitBy) ->
read_write),
mnesia_log:unsafe_close_log(temp)
end;
- _ ->
+ disc_only_copies ->
DetsProps = proplists:get_value(dets, StorageProps, []),
- Args = [{file, mnesia_lib:tab2dat(Tab)},
- {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)},
- {keypos, 2},
- {repair, mnesia_monitor:get_env(auto_repair)}
- | DetsProps ],
- case mnesia_monitor:open_dets(Tab, Args) of
- {ok, _} ->
- mnesia_monitor:unsafe_close_dets(Tab);
- {error, Error} ->
- exit({"Failed to create dets table", Error})
- end
+ try_create_disc_only_copy(Storage, Tab, Cs, DetsProps)
end;
_ ->
Copies = mnesia_lib:copy_holders(Cs),
@@ -715,7 +803,7 @@ insert_op(Tid, _, {op, create_table, TabDef}, InPlace, InitBy) ->
false ->
mnesia_lib:set({Tab, where_to_read}, node())
end,
- case Storage of
+ case Semantics of
ram_copies ->
ignore;
_ ->
@@ -838,7 +926,7 @@ insert_op(Tid, _, {op, del_table_copy, Storage, Node, TabDef}, InPlace, InitBy)
insert_cstruct(Tid, Cs, true, InPlace, InitBy);
Tab /= schema ->
mnesia_controller:del_active_replica(Tab, Node),
- mnesia_lib:del({Tab, Storage}, Node),
+ mnesia_lib:del({Tab, storage_alias(Storage)}, Node),
if
Node == node() ->
case Cs#cstruct.local_content of
@@ -896,9 +984,10 @@ insert_op(Tid, _, {op, add_index, Pos, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
Tab = insert_cstruct(Tid, Cs, true, InPlace, InitBy),
Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
+ Semantics = storage_semantics(Storage),
case InitBy of
- startup when Storage == disc_only_copies ->
- true = open_files(Tab, Storage, InPlace, InitBy),
+ startup when Semantics == disc_only_copies ->
+ true = open_files(Tab, Semantics, Storage, InPlace, InitBy),
mnesia_index:init_indecies(Tab, Storage, [Pos]);
startup ->
ignore;
@@ -919,6 +1008,8 @@ insert_op(Tid, _, {op, del_index, Pos, TabDef}, InPlace, InitBy) ->
mnesia_index:del_index_table(Tab, Storage, Pos);
startup ->
ignore;
+ _ when element(1, Storage) == ext ->
+ mnesia_index:del_index_table(Tab, Storage, Pos);
_ ->
mnesia_index:del_index_table(Tab, Storage, Pos)
end,
@@ -958,19 +1049,68 @@ insert_op(Tid, _, {op, change_table_frag, _Change, TabDef}, InPlace, InitBy) ->
Cs = mnesia_schema:list2cs(TabDef),
insert_cstruct(Tid, Cs, true, InPlace, InitBy).
-open_files(Tab, Storage, UpdateInPlace, InitBy)
- when Storage /= unknown, Storage /= ram_copies ->
+
+storage_semantics({ext, Alias, Mod}) ->
+ Mod:semantics(Alias, storage);
+storage_semantics(Storage) when is_atom(Storage) ->
+ Storage.
+
+storage_alias({ext, Alias, _}) ->
+ Alias;
+storage_alias(Storage) when is_atom(Storage) ->
+ Storage.
+
+change_disc_to_ram(Tab, Cs, FromS, ToS, Logtmp, InitBy) ->
+ disc_delete_indecies(Tab, Cs, FromS),
+ case InitBy of
+ startup ->
+ ignore;
+ _ ->
+ %% ram table will already have been created
+ Tab = ets:info(Tab, name), %% assertion
+ load_from_logfile(ToS, Tab, Logtmp),
+ PosList = Cs#cstruct.index,
+ mnesia_index:init_indecies(Tab, ToS, PosList)
+ end,
+ disc_delete_table(Tab, FromS).
+
+
+try_create_disc_only_copy({ext,Alias,Mod}, Tab, Cs, _) ->
+ Mod:create_table(Alias, Tab, mnesia_schema:cs2list(Cs));
+try_create_disc_only_copy(disc_only_copies, Tab, Cs, DetsProps) ->
+ Args = [{file, mnesia_lib:tab2dat(Tab)},
+ {type, mnesia_lib:disk_type(Tab, Cs#cstruct.type)},
+ {keypos, 2},
+ {repair, mnesia_monitor:get_env(auto_repair)}
+ | DetsProps],
+ case mnesia_monitor:open_dets(Tab, Args) of
+ {ok, _} ->
+ mnesia_monitor:unsafe_close_dets(Tab);
+ {error, Error} ->
+ exit({"Failed to create dets table", Error})
+ end.
+
+restore_indexes(Tab, ToS, Cs) ->
+ PosList = Cs#cstruct.index,
+ mnesia_index:init_indecies(Tab, ToS, PosList).
+
+
+open_files(Tab, Storage, UpdateInPlace, InitBy) ->
+ open_files(Tab, storage_semantics(Storage), Storage, UpdateInPlace, InitBy).
+
+open_files(Tab, Semantics, Storage, UpdateInPlace, InitBy)
+ when Storage /= unknown, Semantics /= ram_copies ->
case get({?MODULE, Tab}) of
undefined ->
case ?catch_val({Tab, setorbag}) of
{'EXIT', _} ->
false;
Type ->
- case Storage of
- disc_copies when Tab /= schema ->
+ Cs = val({Tab, cstruct}),
+ if Semantics == disc_copies, Tab /= schema ->
Bool = open_disc_copies(Tab, InitBy),
Bool;
- _ ->
+ Storage == disc_only_copies; Tab == schema ->
Props = val({Tab, storage_properties}),
DetsProps = proplists:get_value(dets, Props, []),
Fname = prepare_open(Tab, UpdateInPlace),
@@ -981,6 +1121,12 @@ open_files(Tab, Storage, UpdateInPlace, InitBy)
| DetsProps],
{ok, _} = mnesia_monitor:open_dets(Tab, Args),
put({?MODULE, Tab}, {opened_dumper, dat}),
+ true;
+ element(1, Storage) == ext ->
+ {ext, Alias, Mod} = Storage,
+ Mod:load_table(Alias, Tab, InitBy,
+ mnesia_schema:cs2list(Cs)),
+ put({?MODULE, Tab}, {opened_dumper, ext}),
true
end
end;
@@ -989,7 +1135,7 @@ open_files(Tab, Storage, UpdateInPlace, InitBy)
{opened_dumper, _} ->
true
end;
-open_files(_Tab, _Storage, _UpdateInPlace, _InitBy) ->
+open_files(_Tab, _Semantics, _Storage, _UpdateInPlace, _InitBy) ->
false.
open_disc_copies(Tab, InitBy) ->
@@ -1076,12 +1222,13 @@ close_files(InPlace, Outcome, InitBy, [{{?MODULE, Tab}, already_dumped} | Tail])
close_files(InPlace, Outcome, InitBy, Tail);
close_files(InPlace, Outcome, InitBy, [{{?MODULE, Tab}, {opened_dumper, Type}} | Tail]) ->
erase({?MODULE, Tab}),
- case val({Tab, storage_type}) of
+ Storage = val({Tab, storage_type}),
+ case storage_semantics(Storage) of
disc_only_copies when InitBy /= startup ->
ignore;
- disc_copies when Tab /= schema ->
+ disc_copies when Storage /= unknown, Tab /= schema ->
mnesia_log:close_log({?MODULE,Tab});
- Storage ->
+ _ ->
do_close(InPlace, Outcome, Tab, Type, Storage)
end,
close_files(InPlace, Outcome, InitBy, Tail);
@@ -1093,11 +1240,16 @@ close_files(_, _, _InitBy, []) ->
%% If storage is unknown during close clean up files, this can happen if timing
%% is right and dirty_write conflicts with schema operations.
+do_close(_, _, Tab, ext, {ext,Alias,Mod}) ->
+ Mod:close_table(Alias, Tab);
do_close(_, _, Tab, dcl, unknown) ->
mnesia_log:close_log({?MODULE,Tab}),
file:delete(mnesia_lib:tab2dcl(Tab));
do_close(_, _, Tab, dcl, _) -> %% To be safe, can it happen?
mnesia_log:close_log({?MODULE,Tab});
+do_close(_, _, _Tab, ext, unknown) ->
+ %% Not sure what to do here, but let's not crash
+ ok;
do_close(InPlace, Outcome, Tab, dat, Storage) ->
mnesia_monitor:close_dets(Tab),
@@ -1148,7 +1300,8 @@ temp_set_master_nodes() ->
Tabs = val({schema, local_tables}),
Masters = [{Tab, (val({Tab, disc_copies}) ++
val({Tab, ram_copies}) ++
- val({Tab, disc_only_copies})) -- [node()]}
+ val({Tab, disc_only_copies}) ++
+ external_copies(Tab)) -- [node()]}
|| Tab <- Tabs],
%% UseDir = false since we don't want to remember these
%% masternodes and we are running (really soon anyway) since we want this
@@ -1156,6 +1309,13 @@ temp_set_master_nodes() ->
mnesia_recover:log_master_nodes(Masters, false, yes),
ok.
+external_copies(Tab) ->
+ case ?catch_val({Tab, external_copies}) of
+ {'EXIT',_} -> [];
+ Ext ->
+ lists:concat([Ns || {_, Ns} <- Ext])
+ end.
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Raw dump of table. Dumper must have unique access to the ets table.
@@ -1207,6 +1367,59 @@ raw_named_dump_table(Tab, Ftype) ->
raw_dump_table(DetsRef, EtsRef) ->
dets:from_ets(DetsRef, EtsRef).
+dump_to_logfile(Storage, Tab) ->
+ case mnesia_monitor:use_dir() of
+ true ->
+ Logtmp = mnesia_lib:tab2logtmp(Tab),
+ file:delete(Logtmp),
+ case disk_log:open([{name, make_ref()},
+ {file, Logtmp},
+ {repair, false},
+ {linkto, self()}]) of
+ {ok, Fd} ->
+ mnesia_lib:db_fixtable(Storage, Tab, true),
+ try do_dump_to_logfile(Storage, Tab, Fd)
+ after
+ mnesia_lib:db_fixtable(Storage, Tab, false)
+ end;
+ {error, _} = Error ->
+ Error
+ end;
+ false ->
+ {error, {has_no_disc, node()}}
+ end.
+
+do_dump_to_logfile(Storage, Tab, Fd) ->
+ Pat = [{'_',[],['$_']}],
+ log_terms(mnesia_lib:db_select_init(Storage, Tab, Pat, 100), Storage, Tab, Pat, Fd).
+
+log_terms({Objs, Cont}, Storage, Tab, Pat, Fd) ->
+ ok = disk_log:alog_terms(Fd, Objs),
+ log_terms(mnesia_lib:db_select_cont(Storage, Cont, '_'), Storage, Tab, Pat, Fd);
+log_terms('$end_of_table', _, _, _, Fd) ->
+ disk_log:close(Fd).
+
+load_from_logfile(Storage, Tab, F) ->
+ case disk_log:open([{name, make_ref()},
+ {file, F},
+ {repair, true},
+ {linkto, self()}]) of
+ {ok, Fd} ->
+ chunk_from_log(disk_log:chunk(Fd, start), Fd, Storage, Tab);
+ {repaired, Fd, _, _} ->
+ chunk_from_log(disk_log:chunk(Fd, start), Fd, Storage, Tab);
+ {error, _} = E ->
+ E
+ end.
+
+chunk_from_log({Cont, Terms}, Fd, Storage, Tab) ->
+ _ = [mnesia_lib:db_put(Storage, Tab, T) || T <- Terms],
+ chunk_from_log(disk_log:chunk(Fd, Cont), Fd, Storage, Tab);
+chunk_from_log(eof, _, _, _) ->
+ ok.
+
+
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Load regulator
%%
diff --git a/lib/mnesia/src/mnesia_ext_sup.erl b/lib/mnesia/src/mnesia_ext_sup.erl
new file mode 100644
index 0000000000..3e6c72161c
--- /dev/null
+++ b/lib/mnesia/src/mnesia_ext_sup.erl
@@ -0,0 +1,59 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2015. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% This module implements a supervisor for external (plug-in) processes
+
+-module(mnesia_ext_sup).
+-behaviour(supervisor).
+
+-export([start/0,
+ init/1,
+ start_proc/4, start_proc/5,
+ stop_proc/1]).
+
+-define(SHUTDOWN, 120000). % 2 minutes
+
+start() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+start_proc(Name, M, F, A) ->
+ start_proc(Name, M, F, A, []).
+
+start_proc(Name, M, F, A, Opts) ->
+ [Restart, Shutdown, Type, Modules] =
+ [proplists:get_value(K, Opts, Default)
+ || {K, Default} <- [{restart, transient},
+ {shutdown, ?SHUTDOWN},
+ {type, worker},
+ {modules, [M]}]],
+ case supervisor:start_child(
+ ?MODULE, {Name, {M,F,A}, Restart, Shutdown, Type, Modules}) of
+ {error, already_present} ->
+ supervisor:restart_child(?MODULE, Name);
+ Other ->
+ Other
+ end.
+
+stop_proc(Name) ->
+ supervisor:terminate_child(?MODULE, Name).
+
+init(_) ->
+ {ok, {{one_for_all, 10, 60}, []}}.
diff --git a/lib/mnesia/src/mnesia_frag.erl b/lib/mnesia/src/mnesia_frag.erl
index c4de603460..c6e812b36d 100644
--- a/lib/mnesia/src/mnesia_frag.erl
+++ b/lib/mnesia/src/mnesia_frag.erl
@@ -183,6 +183,8 @@ table_info2(ActivityId, Opaque, Tab, Frag, Item) ->
length(val({Tab, disc_copies}));
n_disc_only_copies ->
length(val({Tab, disc_only_copies}));
+ n_external_copies ->
+ length(val({Tab, external_copies}));
frag_names ->
frag_names(Tab);
@@ -498,13 +500,13 @@ expand_cstruct(Cs, Mode) ->
%% Verify keys
ValidKeys = [foreign_key, n_fragments, node_pool,
n_ram_copies, n_disc_copies, n_disc_only_copies,
- hash_module, hash_state],
+ n_external_copies, hash_module, hash_state],
Keys = mnesia_schema:check_keys(Tab, Props, ValidKeys),
mnesia_schema:check_duplicates(Tab, Keys),
%% Pick fragmentation props
ForeignKey = mnesia_schema:pick(Tab, foreign_key, Props, undefined),
- {ForeignKey2, N, Pool, DefaultNR, DefaultND, DefaultNDO} =
+ {ForeignKey2, N, Pool, DefaultNR, DefaultND, DefaultNDO, DefaultNExt} =
pick_props(Tab, Cs, ForeignKey),
%% Verify node_pool
@@ -518,6 +520,7 @@ expand_cstruct(Cs, Mode) ->
NR = mnesia_schema:pick(Tab, n_ram_copies, Props, 0),
ND = mnesia_schema:pick(Tab, n_disc_copies, Props, 0),
NDO = mnesia_schema:pick(Tab, n_disc_only_copies, Props, 0),
+ NExt = mnesia_schema:pick(Tab, n_external_copies, Props, 0),
PosInt = fun(I) when is_integer(I), I >= 0 -> true;
(_I) -> false
@@ -528,6 +531,8 @@ expand_cstruct(Cs, Mode) ->
{bad_type, Tab, {n_disc_copies, ND}}),
mnesia_schema:verify(true, PosInt(NDO),
{bad_type, Tab, {n_disc_only_copies, NDO}}),
+ mnesia_schema:verify(true, PosInt(NExt),
+ {bad_type, Tab, {n_external_copies, NDO}}),
%% Verify n_fragments
Cs2 = verify_n_fragments(N, Cs, Mode),
@@ -542,13 +547,13 @@ expand_cstruct(Cs, Mode) ->
hash_module = HashMod,
hash_state = HashState2},
if
- NR == 0, ND == 0, NDO == 0 ->
- do_expand_cstruct(Cs2, FH, N, Pool, DefaultNR, DefaultND, DefaultNDO, Mode);
+ NR == 0, ND == 0, NDO == 0, NExt == 0 ->
+ do_expand_cstruct(Cs2, FH, N, Pool, DefaultNR, DefaultND, DefaultNDO, DefaultNExt, Mode);
true ->
- do_expand_cstruct(Cs2, FH, N, Pool, NR, ND, NDO, Mode)
+ do_expand_cstruct(Cs2, FH, N, Pool, NR, ND, NDO, NExt, Mode)
end.
-do_expand_cstruct(Cs, FH, N, Pool, NR, ND, NDO, Mode) ->
+do_expand_cstruct(Cs, FH, N, Pool, NR, ND, NDO, NExt, Mode) ->
Tab = Cs#cstruct.name,
LC = Cs#cstruct.local_content,
@@ -562,14 +567,15 @@ do_expand_cstruct(Cs, FH, N, Pool, NR, ND, NDO, Mode) ->
%% Add empty fragments
CommonProps = [{base_table, Tab}],
Cs2 = Cs#cstruct{frag_properties = lists:sort(CommonProps)},
- expand_frag_cstructs(N, NR, ND, NDO, Cs2, Pool, Pool, FH, Mode).
+ expand_frag_cstructs(N, NR, ND, NDO, NExt, Cs2, Pool, Pool, FH, Mode).
verify_n_fragments(N, Cs, Mode) when is_integer(N), N >= 1 ->
case Mode of
create ->
Cs#cstruct{ram_copies = [],
disc_copies = [],
- disc_only_copies = []};
+ disc_only_copies = [],
+ external_copies = []};
activate ->
Reason = {combine_error, Cs#cstruct.name, {n_fragments, N}},
mnesia_schema:verify(1, N, Reason),
@@ -607,7 +613,8 @@ pick_props(Tab, Cs, {ForeignTab, Attr}) ->
DefaultNR = length(val({ForeignTab, ram_copies})),
DefaultND = length(val({ForeignTab, disc_copies})),
DefaultNDO = length(val({ForeignTab, disc_only_copies})),
- {Key, N, Pool, DefaultNR, DefaultND, DefaultNDO};
+ DefaultNExt = length(val({ForeignTab, external_copies})),
+ {Key, N, Pool, DefaultNR, DefaultND, DefaultNDO, DefaultNExt};
pick_props(Tab, Cs, undefined) ->
Props = Cs#cstruct.frag_properties,
DefaultN = 1,
@@ -617,22 +624,23 @@ pick_props(Tab, Cs, undefined) ->
DefaultNR = 1,
DefaultND = 0,
DefaultNDO = 0,
- {undefined, N, Pool, DefaultNR, DefaultND, DefaultNDO};
+ DefaultNExt = 0,
+ {undefined, N, Pool, DefaultNR, DefaultND, DefaultNDO, DefaultNExt};
pick_props(Tab, _Cs, BadKey) ->
mnesia:abort({bad_type, Tab, {foreign_key, BadKey}}).
-expand_frag_cstructs(N, NR, ND, NDO, CommonCs, Dist, Pool, FH, Mode)
+expand_frag_cstructs(N, NR, ND, NDO, NExt, CommonCs, Dist, Pool, FH, Mode)
when N > 1, Mode == create ->
Frag = n_to_frag_name(CommonCs#cstruct.name, N),
Cs = CommonCs#cstruct{name = Frag},
- {Cs2, RevModDist, RestDist} = set_frag_nodes(NR, ND, NDO, Cs, Dist, []),
+ {Cs2, RevModDist, RestDist} = set_frag_nodes(NR, ND, NDO, NExt, Cs, Dist, []),
ModDist = lists:reverse(RevModDist),
Dist2 = rearrange_dist(Cs, ModDist, RestDist, Pool),
%% Adjusts backwards, but it doesn't matter.
{FH2, _FromFrags, _AdditionalWriteFrags} = adjust_before_split(FH),
- CsList = expand_frag_cstructs(N - 1, NR, ND, NDO, CommonCs, Dist2, Pool, FH2, Mode),
+ CsList = expand_frag_cstructs(N - 1, NR, ND, NDO, NExt, CommonCs, Dist2, Pool, FH2, Mode),
[Cs2 | CsList];
-expand_frag_cstructs(1, NR, ND, NDO, CommonCs, Dist, Pool, FH, Mode) ->
+expand_frag_cstructs(1, NR, ND, NDO, NExt, CommonCs, Dist, Pool, FH, Mode) ->
BaseProps = CommonCs#cstruct.frag_properties ++
[{foreign_key, FH#frag_state.foreign_key},
{hash_module, FH#frag_state.hash_module},
@@ -645,25 +653,29 @@ expand_frag_cstructs(1, NR, ND, NDO, CommonCs, Dist, Pool, FH, Mode) ->
activate ->
[BaseCs];
create ->
- {BaseCs2, _, _} = set_frag_nodes(NR, ND, NDO, BaseCs, Dist, []),
+ {BaseCs2, _, _} = set_frag_nodes(NR, ND, NDO, NExt, BaseCs, Dist, []),
[BaseCs2]
end.
-set_frag_nodes(NR, ND, NDO, Cs, [Head | Tail], Acc) when NR > 0 ->
+set_frag_nodes(NR, ND, NDO, NExt, Cs, [Head | Tail], Acc) when NR > 0 ->
Pos = #cstruct.ram_copies,
{Cs2, Head2} = set_frag_node(Cs, Pos, Head),
- set_frag_nodes(NR - 1, ND, NDO, Cs2, Tail, [Head2 | Acc]);
-set_frag_nodes(NR, ND, NDO, Cs, [Head | Tail], Acc) when ND > 0 ->
+ set_frag_nodes(NR - 1, ND, NDO, NExt, Cs2, Tail, [Head2 | Acc]);
+set_frag_nodes(NR, ND, NDO, NExt, Cs, [Head | Tail], Acc) when ND > 0 ->
Pos = #cstruct.disc_copies,
{Cs2, Head2} = set_frag_node(Cs, Pos, Head),
- set_frag_nodes(NR, ND - 1, NDO, Cs2, Tail, [Head2 | Acc]);
-set_frag_nodes(NR, ND, NDO, Cs, [Head | Tail], Acc) when NDO > 0 ->
+ set_frag_nodes(NR, ND - 1, NDO, NExt, Cs2, Tail, [Head2 | Acc]);
+set_frag_nodes(NR, ND, NDO, NExt, Cs, [Head | Tail], Acc) when NDO > 0 ->
Pos = #cstruct.disc_only_copies,
{Cs2, Head2} = set_frag_node(Cs, Pos, Head),
- set_frag_nodes(NR, ND, NDO - 1, Cs2, Tail, [Head2 | Acc]);
-set_frag_nodes(0, 0, 0, Cs, RestDist, ModDist) ->
+ set_frag_nodes(NR, ND, NDO - 1, NExt, Cs2, Tail, [Head2 | Acc]);
+set_frag_nodes(NR, ND, NDO, NExt, Cs, [Head | Tail], Acc) when NExt > 0 ->
+ Pos = #cstruct.external_copies,
+ {Cs2, Head2} = set_frag_node(Cs, Pos, Head),
+ set_frag_nodes(NR, ND, NDO, NExt - 1, Cs2, Tail, [Head2 | Acc]);
+set_frag_nodes(0, 0, 0, 0, Cs, RestDist, ModDist) ->
{Cs, ModDist, RestDist};
-set_frag_nodes(_, _, _, Cs, [], _) ->
+set_frag_nodes(_, _, _, _, Cs, [], _) ->
mnesia:abort({combine_error, Cs#cstruct.name, "Too few nodes in node_pool"}).
set_frag_node(Cs, Pos, Head) ->
@@ -840,13 +852,15 @@ make_add_frag(Tab, SortedNs) ->
NR = length(Cs#cstruct.ram_copies),
ND = length(Cs#cstruct.disc_copies),
NDO = length(Cs#cstruct.disc_only_copies),
+ NExt = length(Cs#cstruct.external_copies),
NewCs = Cs#cstruct{name = NewFrag,
frag_properties = [{base_table, Tab}],
ram_copies = [],
disc_copies = [],
- disc_only_copies = []},
+ disc_only_copies = [],
+ external_copies = []},
- {NewCs2, _, _} = set_frag_nodes(NR, ND, NDO, NewCs, SortedNs, []),
+ {NewCs2, _, _} = set_frag_nodes(NR, ND, NDO, NExt, NewCs, SortedNs, []),
[NewOp] = mnesia_schema:make_create_table(NewCs2),
SplitOps = split(Tab, FH2, FromIndecies, FragNames, []),
@@ -1319,7 +1333,8 @@ count_frag([Frag | Frags], Dist) ->
Dist2 = incr_nodes(val({Frag, ram_copies}), Dist),
Dist3 = incr_nodes(val({Frag, disc_copies}), Dist2),
Dist4 = incr_nodes(val({Frag, disc_only_copies}), Dist3),
- count_frag(Frags, Dist4);
+ Dist5 = incr_nodes(val({Frag, external_copies}), Dist4),
+ count_frag(Frags, Dist5);
count_frag([], Dist) ->
Dist.
diff --git a/lib/mnesia/src/mnesia_index.erl b/lib/mnesia/src/mnesia_index.erl
index be05cfbea3..73d170d1fa 100644
--- a/lib/mnesia/src/mnesia_index.erl
+++ b/lib/mnesia/src/mnesia_index.erl
@@ -23,8 +23,8 @@
-module(mnesia_index).
-export([read/5,
- add_index/5,
- delete_index/3,
+ add_index/6,
+ delete_index/4,
del_object_index/5,
clear_index/4,
dirty_match_object/3,
@@ -39,19 +39,21 @@
get_index_table/3,
tab2filename/2,
- tab2tmp_filename/2,
init_index/2,
init_indecies/3,
del_transient/2,
del_transient/3,
- del_index_table/3]).
+ del_index_table/3,
+
+ index_info/2,
+ ext_index_instances/1]).
-import(mnesia_lib, [val/1, verbose/2]).
-include("mnesia.hrl").
-record(index, {setorbag, pos_list}).
-%% read an object list throuh its index table
+%% read an object list through its index table
%% we assume that table Tab has index on attribute number Pos
read(Tid, Store, Tab, IxKey, Pos) ->
@@ -64,69 +66,103 @@ read(Tid, Store, Tab, IxKey, Pos) ->
ResList
end.
-add_index(Index, Tab, Key, Obj, Old) ->
- add_index2(Index#index.pos_list, Index#index.setorbag, Tab, Key, Obj, Old).
-
-add_index2([{Pos, Ixt} |Tail], bag, Tab, K, Obj, OldRecs) ->
- db_put(Ixt, {element(Pos, Obj), K}),
- add_index2(Tail, bag, Tab, K, Obj, OldRecs);
-add_index2([{Pos, Ixt} |Tail], Type, Tab, K, Obj, OldRecs0) ->
+ext_index_instances(Tab) ->
+ #index{pos_list = PosL} = val({Tab, index_info}),
+ lists:foldr(
+ fun({_, {{ext,Alias,Mod}, Tag}}, Acc) ->
+ [{Alias, Mod, Tag}|Acc];
+ (_, Acc) ->
+ Acc
+ end, [], PosL).
+
+
+add_index(#index{pos_list = PosL, setorbag = SorB},
+ Storage, Tab, Key, Obj, Old) ->
+ add_index2(PosL, SorB, Storage, Tab, Key, Obj, Old).
+
+add_index2([{{Pos,Type}, Ixt} |Tail], bag, Storage, Tab, K, Obj, OldRecs) ->
+ ValsF = index_vals_f(Storage, Tab, Pos),
+ Vals = ValsF(Obj),
+ put_index_vals(Type, Ixt, Vals, K),
+ add_index2(Tail, bag, Storage, Tab, K, Obj, OldRecs);
+add_index2([{{Pos, Type}, Ixt} |Tail], SorB, Storage, Tab, K, Obj, OldRecs0) ->
%% Remove old tuples in index if Tab is updated
+ ValsF = index_vals_f(Storage, Tab, Pos), NewVals = ValsF(Obj),
OldRecs1 = case OldRecs0 of
- undefined -> mnesia_lib:db_get(Tab, K);
+ undefined -> mnesia_lib:db_get(Storage, Tab, K);
_ -> OldRecs0
end,
- IdxVal = element(Pos, Obj),
- case [Old || Old <- OldRecs1, element(Pos, Old) =/= IdxVal] of
- [] when OldRecs1 =:= [] -> %% Write
- db_put(Ixt, {element(Pos, Obj), K}),
- add_index2(Tail, Type, Tab, K, Obj, OldRecs0);
+ IdxVal = ValsF(Obj),
+ case [Old || Old <- OldRecs1, ValsF(Old) =/= IdxVal] of
+ [] when OldRecs1 =:= [] -> % Write
+ put_index_vals(Type, Ixt, NewVals, K),
+ add_index2(Tail, SorB, Storage, Tab, K, Obj, OldRecs1);
[] -> %% when OldRecs1 =/= [] Update without modifying index field
- add_index2(Tail, Type, Tab, K, Obj, OldRecs0);
+ add_index2(Tail, SorB, Storage, Tab, K, Obj, OldRecs1);
OldRecs -> %% Update
- db_put(Ixt, {element(Pos, Obj), K}),
- del_ixes(Ixt, OldRecs, Pos, K),
- add_index2(Tail, Type, Tab, K, Obj, OldRecs0)
+ put_index_vals(Type, Ixt, NewVals, K),
+ [del_ixes(Type, Ixt, ValsF, OldObj, K) || OldObj <- OldRecs],
+ add_index2(Tail, SorB, Storage, Tab, K, Obj, OldRecs1)
end;
-add_index2([], _, _Tab, _K, _Obj, _) -> ok.
+add_index2([], _, _, _Tab, _K, _Obj, _) -> ok.
+
+delete_index(Index, Storage, Tab, K) ->
+ delete_index2(Index#index.pos_list, Storage, Tab, K).
-delete_index(Index, Tab, K) ->
- delete_index2(Index#index.pos_list, Tab, K).
+delete_index2([{{Pos, Type}, Ixt} | Tail], Storage, Tab, K) ->
+ DelObjs = mnesia_lib:db_get(Storage, Tab, K),
+ ValsF = index_vals_f(Storage, Tab, Pos),
+ [del_ixes(Type, Ixt, ValsF, Obj, K) || Obj <- DelObjs],
+ delete_index2(Tail, Storage, Tab, K);
+delete_index2([], _Storage, _Tab, _K) -> ok.
-delete_index2([{Pos, Ixt} | Tail], Tab, K) ->
- DelObjs = mnesia_lib:db_get(Tab, K),
- del_ixes(Ixt, DelObjs, Pos, K),
- delete_index2(Tail, Tab, K);
-delete_index2([], _Tab, _K) -> ok.
+put_index_vals(ordered, Ixt, Vals, K) ->
+ [db_put(Ixt, {{V, K}}) || V <- Vals];
+put_index_vals(bag, Ixt, Vals, K) ->
+ [db_put(Ixt, {V, K}) || V <- Vals].
-del_ixes(_Ixt, [], _Pos, _L) -> ok;
-del_ixes(Ixt, [Obj | Tail], Pos, Key) ->
- db_match_erase(Ixt, {element(Pos, Obj), Key}),
- del_ixes(Ixt, Tail, Pos, Key).
+del_ixes(bag, Ixt, ValsF, Obj, Key) ->
+ Vals = ValsF(Obj),
+ [db_match_erase(Ixt, {V, Key}) || V <- Vals];
+del_ixes(ordered, Ixt, ValsF, Obj, Key) ->
+ Vals = ValsF(Obj),
+ [db_erase(Ixt, {V,Key}) || V <- Vals].
-del_object_index(Index, Tab, K, Obj, Old) ->
- del_object_index2(Index#index.pos_list, Index#index.setorbag, Tab, K, Obj, Old).
+del_object_index(#index{pos_list = PosL, setorbag = SorB}, Storage, Tab, K, Obj) ->
+ del_object_index2(PosL, SorB, Storage, Tab, K, Obj).
-del_object_index2([], _, _Tab, _K, _Obj, _Old) -> ok;
-del_object_index2([{Pos, Ixt} | Tail], SoB, Tab, K, Obj, Old) ->
+del_object_index2([], _, _Storage, _Tab, _K, _Obj) -> ok;
+del_object_index2([{{Pos, Type}, Ixt} | Tail], SoB, Storage, Tab, K, Obj) ->
+ ValsF = index_vals_f(Storage, Tab, Pos),
case SoB of
bag ->
- del_object_bag(Tab, K, Obj, Pos, Ixt, Old);
+ del_object_bag(Type, ValsF, Tab, K, Obj, Ixt);
_ -> %% If set remove the tuple in index table
- del_ixes(Ixt, [Obj], Pos, K)
+ del_ixes(Type, Ixt, ValsF, Obj, K)
end,
- del_object_index2(Tail, SoB, Tab, K, Obj, Old).
-
-del_object_bag(Tab, Key, Obj, Pos, Ixt, undefined) ->
- IxKey = element(Pos, Obj),
- Old = [X || X <- mnesia_lib:db_get(Tab, Key), element(Pos, X) =:= IxKey],
- del_object_bag(Tab, Key, Obj, Pos, Ixt, Old);
-%% If Tab type is bag we need remove index identifier if the object being
-%% deleted was the last one
-del_object_bag(_Tab, Key, Obj, Pos, Ixt, Old) when Old =:= [Obj] ->
- del_ixes(Ixt, [Obj], Pos, Key);
-del_object_bag(_Tab, _Key, _Obj, _Pos, _Ixt, _Old) -> ok.
+ del_object_index2(Tail, SoB, Storage, Tab, K, Obj).
+
+del_object_bag(Type, ValsF, Tab, Key, Obj, Ixt) ->
+ IxKeys = ValsF(Obj),
+ Found = [{X, ValsF(X)} || X <- mnesia_lib:db_get(Tab, Key)],
+ del_object_bag_(IxKeys, Found, Type, Tab, Key, Obj, Ixt).
+
+del_object_bag_([IxK|IxKs], Found, Type, Tab, Key, Obj, Ixt) ->
+ case [X || {X, Ixes} <- Found, lists:member(IxK, Ixes)] of
+ [Old] when Old =:= Obj ->
+ case Type of
+ bag ->
+ db_match_erase(Ixt, {IxK, Key});
+ ordered ->
+ db_erase(Ixt, {{IxK, Key}})
+ end;
+ _ ->
+ ok
+ end,
+ del_object_bag_(IxKs, Found, Type, Tab, Key, Obj, Ixt);
+del_object_bag_([], _, _, _, _, _, _) ->
+ ok.
clear_index(Index, Tab, K, Obj) ->
clear_index2(Index#index.pos_list, Tab, K, Obj).
@@ -138,8 +174,9 @@ clear_index2([{_Pos, Ixt} | Tail], Tab, K, Obj) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-dirty_match_object(Tab, Pat, Pos) ->
+dirty_match_object(Tab, Pat, Pos) when is_integer(Pos) ->
%% Assume that we are on the node where the replica is
+ %% Cannot use index plugins here, as they don't map to match patterns
case element(2, Pat) of
'_' ->
IxKey = element(Pos, Pat),
@@ -161,7 +198,7 @@ realkeys(Tab, Pos, IxKey) ->
Index = get_index_table(Tab, Pos),
db_get(Index, IxKey). % a list on the form [{IxKey, RealKey1} , ....
-dirty_select(Tab, Spec, Pos) ->
+dirty_select(Tab, Spec, Pos) when is_integer(Pos) ->
%% Assume that we are on the node where the replica is
%% Returns the records without applying the match spec
%% The actual filtering is handled by the caller
@@ -171,59 +208,80 @@ dirty_select(Tab, Spec, Pos) ->
lists:append([mnesia_lib:db_get(StorageType, Tab, Key) || {_,Key} <- RealKeys]).
dirty_read(Tab, IxKey, Pos) ->
- ResList = mnesia:dirty_rpc(Tab, ?MODULE, dirty_read2,
- [Tab, IxKey, Pos]),
- case val({Tab, setorbag}) of
- bag ->
- %% Remove all tuples which don't include Ixkey
- mnesia_lib:key_search_all(IxKey, Pos, ResList);
- _ ->
- ResList
- end.
+ mnesia:dirty_rpc(Tab, ?MODULE, dirty_read2,
+ [Tab, IxKey, Pos]).
dirty_read2(Tab, IxKey, Pos) ->
- Ix = get_index_table(Tab, Pos),
- Keys = db_match(Ix, {IxKey, '$1'}),
- r_keys(Keys, Tab, []).
-
-r_keys([[H]|T],Tab,Ack) ->
- V = mnesia_lib:db_get(Tab, H),
- r_keys(T, Tab, V ++ Ack);
-r_keys([], _, Ack) ->
- Ack.
+ #index{pos_list = PosL} = val({Tab, index_info}),
+ Storage = val({Tab, storage_type}),
+ {Type, Ixt} = pick_index(PosL, Tab, Pos),
+ Pat = case Type of
+ ordered -> [{{{IxKey, '$1'}}, [], ['$1']}];
+ bag -> [{{IxKey, '$1'}, [], ['$1']}]
+ end,
+ Keys = db_select(Ixt, Pat),
+ ValsF = index_vals_f(Storage, Tab, Pos),
+ lists:reverse(
+ lists:foldl(
+ fun(K, Acc) ->
+ lists:foldl(
+ fun(Obj, Acc1) ->
+ case lists:member(IxKey, ValsF(Obj)) of
+ true -> [Obj|Acc1];
+ false -> Acc1
+ end
+ end, Acc, mnesia_lib:db_get(Storage, Tab, K))
+ end, [], Keys)).
+
+pick_index([{{{Pfx,_},IxType}, Ixt}|_], _Tab, {_} = Pfx) ->
+ {IxType, Ixt};
+pick_index([{{Pos,IxType}, Ixt}|_], _Tab, Pos) ->
+ {IxType, Ixt};
+pick_index([_|T], Tab, Pos) ->
+ pick_index(T, Tab, Pos);
+pick_index([], Tab, Pos) ->
+ mnesia:abort({no_exist, Tab, {index, Pos}}).
+
%%%%%%% Creation, Init and deletion routines for index tables
%% We can have several indexes on the same table
%% this can be a fairly costly operation if table is *very* large
-tab2filename(Tab, Pos) ->
+tab2filename(Tab, {A}) when is_atom(A) ->
+ mnesia_lib:dir(Tab) ++ "_-" ++ atom_to_list(A) ++ "-.DAT";
+tab2filename(Tab, T) when is_tuple(T) ->
+ tab2filename(Tab, element(1, T));
+tab2filename(Tab, Pos) when is_integer(Pos) ->
mnesia_lib:dir(Tab) ++ "_" ++ integer_to_list(Pos) ++ ".DAT".
-tab2tmp_filename(Tab, Pos) ->
- mnesia_lib:dir(Tab) ++ "_" ++ integer_to_list(Pos) ++ ".TMP".
-
init_index(Tab, Storage) ->
- PosList = val({Tab, index}),
+ Cs = val({Tab, cstruct}),
+ PosList = Cs#cstruct.index,
init_indecies(Tab, Storage, PosList).
init_indecies(Tab, Storage, PosList) ->
case Storage of
unknown ->
ignore;
+ {ext, Alias, Mod} ->
+ init_ext_index(Tab, Storage, Alias, Mod, PosList);
disc_only_copies ->
- init_disc_index(Tab, PosList);
+ init_disc_index(Tab, Storage, PosList);
ram_copies ->
- make_ram_index(Tab, PosList);
+ make_ram_index(Tab, Storage, PosList);
disc_copies ->
- make_ram_index(Tab, PosList)
+ make_ram_index(Tab, Storage, PosList)
end.
%% works for both ram and disc indexes
del_index_table(_, unknown, _) ->
ignore;
-del_index_table(Tab, Storage, Pos) ->
+del_index_table(Tab, Storage, {_} = Pos) ->
+ delete_transient_index(Tab, Pos, Storage),
+ mnesia_lib:del({Tab, index}, Pos);
+del_index_table(Tab, Storage, Pos) when is_integer(Pos) ->
delete_transient_index(Tab, Pos, Storage),
mnesia_lib:del({Tab, index}, Pos).
@@ -236,13 +294,24 @@ del_transient(Tab, [Pos | Tail], Storage) ->
delete_transient_index(Tab, Pos, Storage),
del_transient(Tab, Tail, Storage).
+delete_transient_index(Tab, Pos, {ext, Alias, Mod}) ->
+ PosInfo = case Pos of
+ _ when is_integer(Pos) ->
+ Cs = val({Tab, cstruct}),
+ lists:keyfind(Pos, 1, Cs#cstruct.index);
+ {P, T} -> {P, T}
+ end,
+ Tag = {Tab, index, PosInfo},
+ Mod:close_table(Alias, Tag),
+ Mod:delete_table(Alias, Tag),
+ del_index_info(Tab, Pos),
+ mnesia_lib:unset({Tab, {index, Pos}});
delete_transient_index(Tab, Pos, disc_only_copies) ->
Tag = {Tab, index, Pos},
mnesia_monitor:unsafe_close_dets(Tag),
_ = file:delete(tab2filename(Tab, Pos)),
del_index_info(Tab, Pos), %% Uses val(..)
mnesia_lib:unset({Tab, {index, Pos}});
-
delete_transient_index(Tab, Pos, _Storage) ->
Ixt = val({Tab, {index, Pos}}),
?ets_delete_table(Ixt),
@@ -252,11 +321,12 @@ delete_transient_index(Tab, Pos, _Storage) ->
%%%%% misc functions for the index create/init/delete functions above
%% assuming that the file exists.
-init_disc_index(_Tab, []) ->
+init_disc_index(_Tab, _Storage, []) ->
done;
-init_disc_index(Tab, [Pos | Tail]) when is_integer(Pos) ->
+init_disc_index(Tab, disc_only_copies, [{Pos,_Pref} | Tail]) ->
+ PosInfo = {Pos, bag},
Fn = tab2filename(Tab, Pos),
- IxTag = {Tab, index, Pos},
+ IxTag = {Tab, index, PosInfo},
_ = file:delete(Fn),
Args = [{file, Fn}, {keypos, 1}, {type, bag}],
mnesia_monitor:open_dets(IxTag, Args),
@@ -269,16 +339,59 @@ init_disc_index(Tab, [Pos | Tail]) when is_integer(Pos) ->
mnesia_lib:db_fixtable(Storage, Tab, true),
ok = dets:init_table(IxTag, create_fun(Init, Tab, Pos)),
mnesia_lib:db_fixtable(Storage, Tab, false),
- mnesia_lib:set({Tab, {index, Pos}}, IxTag),
- add_index_info(Tab, val({Tab, setorbag}), {Pos, {dets, IxTag}}),
- init_disc_index(Tab, Tail).
+ mnesia_lib:set({Tab, {index, PosInfo}}, IxTag),
+ add_index_info(Tab, val({Tab, setorbag}), {PosInfo, {dets, IxTag}}),
+ init_disc_index(Tab, Storage, Tail).
+
+init_ext_index(_, _, _, _, []) ->
+ done;
+init_ext_index(Tab, Storage, Alias, Mod, [{Pos,Type} | Tail]) ->
+ PosInfo = {Pos, Type},
+ IxTag = {Tab, index, PosInfo},
+ CS = val({Tab, cstruct}),
+ CsList = mnesia_schema:cs2list(CS),
+ _Res = mnesia_monitor:unsafe_create_external(IxTag, Alias, Mod, CsList),
+ Mod:load_table(Alias, IxTag, init_index, CsList),
+ case Mod:is_index_consistent(Alias, IxTag) of
+ false ->
+ Mod:index_is_consistent(Alias, IxTag, false),
+ Mod:match_delete(Alias, IxTag, '_'),
+ IxValsF = index_vals_f(Storage, Tab, Pos),
+ IxObjF = case Type of
+ bag -> fun(IxVal, Key) -> {IxVal, Key} end;
+ ordered -> fun(IxVal, Key) -> {{IxVal, Key}} end
+ end,
+ mnesia_lib:db_fixtable(Storage, Tab, true),
+ mnesia_lib:db_foldl(
+ Storage,
+ fun(Rec, Acc) ->
+ Key = element(2, Rec),
+ lists:foreach(
+ fun(V) ->
+ IxObj = IxObjF(V, Key),
+ Mod:insert(Alias, IxTag, IxObj)
+ end, IxValsF(Rec)),
+ Acc
+ end, ok, Tab,
+ [{'_', [], ['$_']}], 100),
+ Mod:index_is_consistent(Alias, IxTag, true);
+ true ->
+ ignore
+ end,
+
+ mnesia_lib:set({Tab, {index, PosInfo}}, IxTag),
+
+ add_index_info(Tab, val({Tab, setorbag}), {PosInfo, {Storage, IxTag}}),
+ init_ext_index(Tab, Storage, Alias, Mod, Tail).
create_fun(Cont, Tab, Pos) ->
+ IxF = index_vals_f(disc_only_copies, Tab, Pos),
fun(read) ->
Data =
case Cont of
{start, KeysPerChunk} ->
- mnesia_lib:db_init_chunk(disc_only_copies, Tab, KeysPerChunk);
+ mnesia_lib:db_init_chunk(
+ disc_only_copies, Tab, KeysPerChunk);
'$end_of_table' ->
'$end_of_table';
_Else ->
@@ -288,56 +401,82 @@ create_fun(Cont, Tab, Pos) ->
'$end_of_table' ->
end_of_input;
{Recs, Next} ->
- IdxElems = [{element(Pos, Obj), element(2, Obj)} || Obj <- Recs],
+ IdxElems = lists:flatmap(
+ fun(Obj) ->
+ PrimK = element(2, Obj),
+ [{V, PrimK} || V <- IxF(Obj)]
+ end, Recs),
{IdxElems, create_fun(Next, Tab, Pos)}
end;
(close) ->
ok
end.
-make_ram_index(_, []) ->
+make_ram_index(_, _, []) ->
done;
-make_ram_index(Tab, [Pos | Tail]) ->
- add_ram_index(Tab, Pos),
- make_ram_index(Tab, Tail).
+make_ram_index(Tab, Storage, [Pos | Tail]) ->
+ add_ram_index(Tab, Storage, Pos),
+ make_ram_index(Tab, Storage, Tail).
-add_ram_index(Tab, Pos) when is_integer(Pos) ->
- verbose("Creating index for ~w ~n", [Tab]),
+add_ram_index(Tab, Storage, {Pos, _Pref}) ->
+ Type = ordered,
+ verbose("Creating index for ~w ~p ~p~n", [Tab, Pos, Type]),
SetOrBag = val({Tab, setorbag}),
- IndexType = case SetOrBag of
- set -> duplicate_bag;
- ordered_set -> duplicate_bag;
- bag -> bag
- end,
- Index = mnesia_monitor:mktab(mnesia_index, [IndexType, public]),
+ IxValsF = index_vals_f(Storage, Tab, Pos),
+ IxFun = fun(Val, Key) -> {{Val, Key}} end,
+ Index = mnesia_monitor:mktab(mnesia_index, [ordered_set, public]),
Insert = fun(Rec, _Acc) ->
- true = ?ets_insert(Index, {element(Pos, Rec), element(2, Rec)})
+ PrimK = element(2, Rec),
+ true = ?ets_insert(
+ Index, [IxFun(V, PrimK)
+ || V <- IxValsF(Rec)])
end,
mnesia_lib:db_fixtable(ram_copies, Tab, true),
- true = ets:foldl(Insert, true, Tab),
+ true = mnesia_lib:db_foldl(Storage, Insert, true, Tab),
mnesia_lib:db_fixtable(ram_copies, Tab, false),
mnesia_lib:set({Tab, {index, Pos}}, Index),
- add_index_info(Tab, SetOrBag, {Pos, {ram, Index}});
-add_ram_index(_Tab, snmp) ->
+ add_index_info(Tab, SetOrBag, {{Pos, Type}, {ram, Index}});
+add_ram_index(_Tab, _, snmp) ->
ok.
-add_index_info(Tab, Type, IxElem) ->
+index_info(SetOrBag, PosList) ->
+ IxPlugins = mnesia_schema:index_plugins(),
+ ExpPosList = lists:map(
+ fun({{P,Type},Ixt} = PI) ->
+ case P of
+ {_} = IxN ->
+ {_, M, F} =
+ lists:keyfind(IxN, 1, IxPlugins),
+ {{{IxN,M,F}, Type}, Ixt};
+ _ ->
+ PI
+ end
+ end, PosList),
+ #index{setorbag = SetOrBag, pos_list = ExpPosList}.
+
+add_index_info(Tab, SetOrBag, IxElem) ->
Commit = val({Tab, commit_work}),
case lists:keysearch(index, 1, Commit) of
false ->
- Index = #index{setorbag = Type,
- pos_list = [IxElem]},
- %% Check later if mnesia_tm is sensative about the order
+ IndexInfo = index_info(SetOrBag, [IxElem]),
+ %% Check later if mnesia_tm is sensitive about the order
+ mnesia_lib:set({Tab, index_info}, IndexInfo),
+ mnesia_lib:set({Tab, index}, index_positions(IndexInfo)),
mnesia_lib:set({Tab, commit_work},
- mnesia_lib:sort_commit([Index | Commit]));
+ mnesia_lib:sort_commit([IndexInfo | Commit]));
{value, Old} ->
%% We could check for consistency here
Index = Old#index{pos_list = [IxElem | Old#index.pos_list]},
+ mnesia_lib:set({Tab, index_info}, Index),
+ mnesia_lib:set({Tab, index}, index_positions(Index)),
NewC = lists:keyreplace(index, 1, Commit, Index),
mnesia_lib:set({Tab, commit_work},
mnesia_lib:sort_commit(NewC))
end.
+index_positions(#index{pos_list = PL}) ->
+ [P || {{P,_},_} <- PL].
+
del_index_info(Tab, Pos) ->
Commit = val({Tab, commit_work}),
case lists:keysearch(index, 1, Commit) of
@@ -345,13 +484,21 @@ del_index_info(Tab, Pos) ->
%% Something is wrong ignore
skip;
{value, Old} ->
- case lists:keydelete(Pos, 1, Old#index.pos_list) of
+ case lists:filter(fun({P,_}) ->
+ element(1,P)=/=Pos
+ end,
+ Old#index.pos_list) of
[] ->
+ IndexInfo = index_info(Old#index.setorbag,[]),
+ mnesia_lib:set({Tab, index_info}, IndexInfo),
+ mnesia_lib:set({Tab, index}, index_positions(IndexInfo)),
NewC = lists:keydelete(index, 1, Commit),
mnesia_lib:set({Tab, commit_work},
mnesia_lib:sort_commit(NewC));
New ->
Index = Old#index{pos_list = New},
+ mnesia_lib:set({Tab, index_info}, Index),
+ mnesia_lib:set({Tab, index}, index_positions(Index)),
NewC = lists:keyreplace(index, 1, Commit, Index),
mnesia_lib:set({Tab, commit_work},
mnesia_lib:sort_commit(NewC))
@@ -360,33 +507,69 @@ del_index_info(Tab, Pos) ->
db_put({ram, Ixt}, V) ->
true = ?ets_insert(Ixt, V);
+db_put({{ext, _, _} = Ext, Ixt}, V) ->
+ mnesia_lib:db_put(Ext, Ixt, V);
db_put({dets, Ixt}, V) ->
ok = dets:insert(Ixt, V).
-db_get({ram, Ixt}, K) ->
- ?ets_lookup(Ixt, K);
+db_get({ram, _}=Ixt, IxKey) ->
+ Pat = [{{{IxKey, '$1'}}, [], [{{IxKey,'$1'}}]}],
+ db_select(Ixt, Pat);
+db_get({{ext,_,_} = _Storage, {_,_,{_,Type}}} = Ixt, IxKey) ->
+ Pat = case Type of
+ ordered -> [{{{IxKey, '$1'}}, [], [{{IxKey,'$1'}}]}];
+ bag -> [{{IxKey, '_'}, [], ['$_']}]
+ end,
+ db_select(Ixt, Pat);
db_get({dets, Ixt}, K) ->
dets:lookup(Ixt, K).
+db_erase({ram, Ixt}, K) ->
+ ?ets_delete(Ixt, K);
+db_erase({{ext,_,_} = Ext, Ixt}, K) ->
+ mnesia_lib:db_erase(Ext, Ixt, K);
+db_erase({dets, Ixt}, K) ->
+ dets:delete(Ixt, K).
+
db_match_erase({ram, Ixt}, Pat) ->
true = ?ets_match_delete(Ixt, Pat);
+db_match_erase({{ext,_,_} = Ext, Ixt}, Pat) ->
+ mnesia_lib:db_match_erase(Ext, Ixt, Pat);
db_match_erase({dets, Ixt}, Pat) ->
ok = dets:match_delete(Ixt, Pat).
-db_match({ram, Ixt}, Pat) ->
- ?ets_match(Ixt, Pat);
-db_match({dets, Ixt}, Pat) ->
- dets:match(Ixt, Pat).
+db_select({ram, Ixt}, Pat) ->
+ ets:select(Ixt, Pat);
+db_select({{ext,_,_} = Ext, Ixt}, Pat) ->
+ mnesia_lib:db_select(Ext, Ixt, Pat);
+db_select({dets, Ixt}, Pat) ->
+ dets:select(Ixt, Pat).
+
get_index_table(Tab, Pos) ->
get_index_table(Tab, val({Tab, storage_type}), Pos).
-get_index_table(Tab, ram_copies, Pos) ->
- {ram, val({Tab, {index, Pos}})};
-get_index_table(Tab, disc_copies, Pos) ->
- {ram, val({Tab, {index, Pos}})};
-get_index_table(Tab, disc_only_copies, Pos) ->
- {dets, val({Tab, {index, Pos}})};
-get_index_table(_Tab, unknown, _Pos) ->
- unknown.
-
+get_index_table(Tab, _Storage, Pos) ->
+ #index{pos_list = PosL} = val({Tab, index_info}),
+ {_IxType, Ixt} = pick_index(PosL, Tab, Pos),
+ Ixt.
+
+index_vals_f(Storage, Tab, {_} = Pos) ->
+ index_vals_f(Storage, Tab,
+ lists:keyfind(Pos, 1, mnesia_schema:index_plugins()));
+index_vals_f(_Storage, Tab, {Pos,M,F}) ->
+ fun(Obj) ->
+ M:F(Tab, Pos, Obj)
+ end;
+index_vals_f(Storage, Tab, Pos) when is_integer(Pos) ->
+ case mnesia_lib:semantics(Storage, index_fun) of
+ undefined ->
+ fun(Obj) ->
+ [element(Pos, Obj)]
+ end;
+ F when is_function(F, 4) ->
+ {ext, Alias, _Mod} = Storage,
+ fun(Obj) ->
+ F(Alias, Tab, Pos, Obj)
+ end
+ end.
diff --git a/lib/mnesia/src/mnesia_lib.erl b/lib/mnesia/src/mnesia_lib.erl
index a5eded4f0f..10e232c800 100644
--- a/lib/mnesia/src/mnesia_lib.erl
+++ b/lib/mnesia/src/mnesia_lib.erl
@@ -54,6 +54,7 @@
db_erase_tab/2,
db_first/1,
db_first/2,
+ db_foldl/3, db_foldl/4, db_foldl/6,
db_last/1,
db_last/2,
db_fixtable/3,
@@ -135,6 +136,7 @@
set_local_content_whereabouts/1,
set_remote_where_to_read/1,
set_remote_where_to_read/2,
+ semantics/2,
show/1,
show/2,
sort_commit/1,
@@ -144,6 +146,7 @@
tab2tmp/1,
tab2dcd/1,
tab2dcl/1,
+ tab2logtmp/1,
to_list/1,
union/2,
uniq/1,
@@ -151,6 +154,8 @@
unset/1,
%% update_counter/2,
val/1,
+ validate_key/2,
+ validate_record/2,
vcore/0,
vcore/1,
verbose/2,
@@ -319,15 +324,41 @@ tab2dcd(Tab) -> %% Disc copies data
tab2dcl(Tab) -> %% Disc copies log
dir(lists:concat([Tab, ".DCL"])).
+tab2logtmp(Tab) -> %% Disc copies log
+ dir(lists:concat([Tab, ".LOGTMP"])).
+
storage_type_at_node(Node, Tab) ->
search_key(Node, [{disc_copies, val({Tab, disc_copies})},
{ram_copies, val({Tab, ram_copies})},
- {disc_only_copies, val({Tab, disc_only_copies})}]).
+ {disc_only_copies, val({Tab, disc_only_copies})}|
+ wrap_external(val({Tab, external_copies}))]).
cs_to_storage_type(Node, Cs) ->
search_key(Node, [{disc_copies, Cs#cstruct.disc_copies},
{ram_copies, Cs#cstruct.ram_copies},
- {disc_only_copies, Cs#cstruct.disc_only_copies}]).
+ {disc_only_copies, Cs#cstruct.disc_only_copies} |
+ wrap_external(Cs#cstruct.external_copies)]).
+
+-define(native(T), T==ram_copies; T==disc_copies; T==disc_only_copies).
+
+semantics({ext,Alias,Mod}, Item) ->
+ Mod:semantics(Alias, Item);
+semantics({Alias,Mod}, Item) ->
+ Mod:semantics(Alias, Item);
+semantics(Type, storage) when ?native(Type) ->
+ Type;
+semantics(Type, types) when ?native(Type) ->
+ [set, ordered_set, bag];
+semantics(disc_only_copies, index_types) ->
+ [bag];
+semantics(Type, index_types) when ?native(Type) ->
+ [bag, ordered];
+semantics(_, _) ->
+ undefined.
+
+
+wrap_external(L) ->
+ [{{ext,Alias,Mod},Ns} || {{Alias,Mod},Ns} <- L].
schema_cs_to_storage_type(Node, Cs) ->
case cs_to_storage_type(Node, Cs) of
@@ -335,7 +366,6 @@ schema_cs_to_storage_type(Node, Cs) ->
Other -> Other
end.
-
search_key(Key, [{Val, List} | Tail]) ->
case lists:member(Key, List) of
true -> Val;
@@ -344,6 +374,33 @@ search_key(Key, [{Val, List} | Tail]) ->
search_key(_Key, []) ->
unknown.
+validate_key(Tab, Key) ->
+ case ?catch_val({Tab, record_validation}) of
+ {RecName, Arity, Type} ->
+ {RecName, Arity, Type};
+ {RecName, Arity, Type, Alias, Mod} ->
+ %% external type
+ Mod:validate_key(Alias, Tab, RecName, Arity, Type, Key);
+ {'EXIT', _} ->
+ mnesia:abort({no_exists, Tab})
+ end.
+
+
+validate_record(Tab, Obj) ->
+ case ?catch_val({Tab, record_validation}) of
+ {RecName, Arity, Type}
+ when tuple_size(Obj) == Arity, RecName == element(1, Obj) ->
+ {RecName, Arity, Type};
+ {RecName, Arity, Type, Alias, Mod}
+ when tuple_size(Obj) == Arity, RecName == element(1, Obj) ->
+ %% external type
+ Mod:validate_record(Alias, Tab, RecName, Arity, Type, Obj);
+ {'EXIT', _} ->
+ mnesia:abort({no_exists, Tab});
+ _ ->
+ mnesia:abort({bad_type, Obj})
+ end.
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% ops, we've got some global variables here :-)
@@ -410,9 +467,9 @@ pr_other(Var) ->
no -> {node_not_running, node()};
_ -> {no_exists, Var}
end,
- verbose("~p (~p) val(mnesia_gvar, ~w) -> ~p ~n",
+ verbose("~p (~p) val(mnesia_gvar, ~w) -> ~p ~p ~n",
[self(), process_info(self(), registered_name),
- Var, Why]),
+ Var, Why, erlang:get_stacktrace()]),
mnesia:abort(Why).
%% Some functions for list valued variables
@@ -549,10 +606,16 @@ read_counter(Name) ->
?ets_lookup_element(mnesia_stats, Name, 2).
cs_to_nodes(Cs) ->
+ ext_nodes(Cs#cstruct.external_copies) ++
Cs#cstruct.disc_only_copies ++
Cs#cstruct.disc_copies ++
Cs#cstruct.ram_copies.
+ext_nodes(Ext) ->
+ lists:flatmap(fun({_, Ns}) ->
+ Ns
+ end, Ext).
+
overload_types() ->
[mnesia_tm, mnesia_dump_log].
@@ -748,7 +811,7 @@ view(File) ->
true ->
view(File, dat);
false ->
- case suffix([".LOG", ".BUP", ".ETS"], File) of
+ case suffix([".LOG", ".BUP", ".ETS", ".LOGTMP"], File) of
true ->
view(File, log);
false ->
@@ -1044,18 +1107,24 @@ db_get(Tab, Key) ->
db_get(val({Tab, storage_type}), Tab, Key).
db_get(ram_copies, Tab, Key) -> ?ets_lookup(Tab, Key);
db_get(disc_copies, Tab, Key) -> ?ets_lookup(Tab, Key);
-db_get(disc_only_copies, Tab, Key) -> dets:lookup(Tab, Key).
+db_get(disc_only_copies, Tab, Key) -> dets:lookup(Tab, Key);
+db_get({ext, Alias, Mod}, Tab, Key) ->
+ Mod:lookup(Alias, Tab, Key).
db_init_chunk(Tab) ->
db_init_chunk(val({Tab, storage_type}), Tab, 1000).
db_init_chunk(Tab, N) ->
db_init_chunk(val({Tab, storage_type}), Tab, N).
+db_init_chunk({ext, Alias, Mod}, Tab, N) ->
+ Mod:select(Alias, Tab, [{'_', [], ['$_']}], N);
db_init_chunk(disc_only_copies, Tab, N) ->
dets:select(Tab, [{'_', [], ['$_']}], N);
db_init_chunk(_, Tab, N) ->
ets:select(Tab, [{'_', [], ['$_']}], N).
+db_chunk({ext, _Alias, Mod}, State) ->
+ Mod:select(State);
db_chunk(disc_only_copies, State) ->
dets:select(State);
db_chunk(_, State) ->
@@ -1066,7 +1135,9 @@ db_put(Tab, Val) ->
db_put(ram_copies, Tab, Val) -> ?ets_insert(Tab, Val), ok;
db_put(disc_copies, Tab, Val) -> ?ets_insert(Tab, Val), ok;
-db_put(disc_only_copies, Tab, Val) -> dets:insert(Tab, Val).
+db_put(disc_only_copies, Tab, Val) -> dets:insert(Tab, Val);
+db_put({ext, Alias, Mod}, Tab, Val) ->
+ Mod:insert(Alias, Tab, Val).
db_match_object(Tab, Pat) ->
db_match_object(val({Tab, storage_type}), Tab, Pat).
@@ -1075,12 +1146,36 @@ db_match_object(Storage, Tab, Pat) ->
try
case Storage of
disc_only_copies -> dets:match_object(Tab, Pat);
+ {ext, Alias, Mod} -> Mod:select(Alias, Tab, [{Pat, [], ['$_']}]);
_ -> ets:match_object(Tab, Pat)
end
after
db_fixtable(Storage, Tab, false)
end.
+db_foldl(Fun, Acc, Tab) ->
+ db_foldl(val({Tab, storage_type}), Fun, Acc, Tab).
+
+db_foldl(Storage, Fun, Acc, Tab) ->
+ Limit = mnesia_monitor:get_env(fold_chunk_size),
+ db_foldl(Storage, Fun, Acc, Tab, [{'_', [], ['$_']}], Limit).
+
+db_foldl(ram_copies, Fun, Acc, Tab, Pat, Limit) ->
+ mnesia_lib:db_fixtable(ram_copies, Tab, true),
+ try select_foldl(db_select_init(ram_copies, Tab, Pat, Limit),
+ Fun, Acc, ram_copies)
+ after
+ mnesia_lib:db_fixtable(ram_copies, Tab, false)
+ end;
+db_foldl(Storage, Fun, Acc, Tab, Pat, Limit) ->
+ select_foldl(mnesia_lib:db_select_init(Storage, Tab, Pat, Limit), Fun, Acc, Storage).
+
+select_foldl({Objs, Cont}, Fun, Acc, Storage) ->
+ select_foldl(mnesia_lib:db_select_cont(Storage, Cont, []),
+ Fun, lists:foldl(Fun, Acc, Objs), Storage);
+select_foldl('$end_of_table', _, Acc, _) ->
+ Acc.
+
db_select(Tab, Pat) ->
db_select(val({Tab, storage_type}), Tab, Pat).
@@ -1089,17 +1184,33 @@ db_select(Storage, Tab, Pat) ->
try
case Storage of
disc_only_copies -> dets:select(Tab, Pat);
+ {ext, Alias, Mod} -> Mod:select(Alias, Tab, Pat);
_ -> ets:select(Tab, Pat)
end
after
db_fixtable(Storage, Tab, false)
end.
+db_select_init({ext, Alias, Mod}, Tab, Pat, Limit) ->
+ case Mod:select(Alias, Tab, Pat, Limit) of
+ {Matches, Continuation} when is_list(Matches) ->
+ {Matches, {Alias, Continuation}};
+ R ->
+ R
+ end;
db_select_init(disc_only_copies, Tab, Pat, Limit) ->
dets:select(Tab, Pat, Limit);
db_select_init(_, Tab, Pat, Limit) ->
ets:select(Tab, Pat, Limit).
+db_select_cont({ext, Alias, Mod}, Cont0, Ms) ->
+ Cont = Mod:repair_continuation(Cont0, Ms),
+ case Mod:select(Cont) of
+ {Matches, Continuation} when is_list(Matches) ->
+ {Matches, {Alias, Continuation}};
+ R ->
+ R
+ end;
db_select_cont(disc_only_copies, Cont0, Ms) ->
Cont = dets:repair_continuation(Cont0, Ms),
dets:select(Cont);
@@ -1116,13 +1227,18 @@ db_fixtable(disc_copies, Tab, Bool) ->
db_fixtable(dets, Tab, Bool) ->
dets:safe_fixtable(Tab, Bool);
db_fixtable(disc_only_copies, Tab, Bool) ->
- dets:safe_fixtable(Tab, Bool).
+ dets:safe_fixtable(Tab, Bool);
+db_fixtable({ext, Alias, Mod}, Tab, Bool) ->
+ Mod:fixtable(Alias, Tab, Bool).
db_erase(Tab, Key) ->
db_erase(val({Tab, storage_type}), Tab, Key).
db_erase(ram_copies, Tab, Key) -> ?ets_delete(Tab, Key), ok;
db_erase(disc_copies, Tab, Key) -> ?ets_delete(Tab, Key), ok;
-db_erase(disc_only_copies, Tab, Key) -> dets:delete(Tab, Key).
+db_erase(disc_only_copies, Tab, Key) -> dets:delete(Tab, Key);
+db_erase({ext, Alias, Mod}, Tab, Key) ->
+ Mod:delete(Alias, Tab, Key),
+ ok.
db_match_erase(Tab, '_') ->
db_delete_all(val({Tab, storage_type}),Tab);
@@ -1130,7 +1246,10 @@ db_match_erase(Tab, Pat) ->
db_match_erase(val({Tab, storage_type}), Tab, Pat).
db_match_erase(ram_copies, Tab, Pat) -> ?ets_match_delete(Tab, Pat), ok;
db_match_erase(disc_copies, Tab, Pat) -> ?ets_match_delete(Tab, Pat), ok;
-db_match_erase(disc_only_copies, Tab, Pat) -> dets:match_delete(Tab, Pat).
+db_match_erase(disc_only_copies, Tab, Pat) -> dets:match_delete(Tab, Pat);
+db_match_erase({ext, Alias, Mod}, Tab, Pat) ->
+ Mod:match_delete(Alias, Tab, Pat),
+ ok.
db_delete_all(ram_copies, Tab) -> ets:delete_all_objects(Tab);
db_delete_all(disc_copies, Tab) -> ets:delete_all_objects(Tab);
@@ -1140,31 +1259,41 @@ db_first(Tab) ->
db_first(val({Tab, storage_type}), Tab).
db_first(ram_copies, Tab) -> ?ets_first(Tab);
db_first(disc_copies, Tab) -> ?ets_first(Tab);
-db_first(disc_only_copies, Tab) -> dets:first(Tab).
+db_first(disc_only_copies, Tab) -> dets:first(Tab);
+db_first({ext, Alias, Mod}, Tab) ->
+ Mod:first(Alias, Tab).
db_next_key(Tab, Key) ->
db_next_key(val({Tab, storage_type}), Tab, Key).
db_next_key(ram_copies, Tab, Key) -> ?ets_next(Tab, Key);
db_next_key(disc_copies, Tab, Key) -> ?ets_next(Tab, Key);
-db_next_key(disc_only_copies, Tab, Key) -> dets:next(Tab, Key).
+db_next_key(disc_only_copies, Tab, Key) -> dets:next(Tab, Key);
+db_next_key({ext, Alias, Mod}, Tab, Key) ->
+ Mod:next(Alias, Tab, Key).
db_last(Tab) ->
db_last(val({Tab, storage_type}), Tab).
db_last(ram_copies, Tab) -> ?ets_last(Tab);
db_last(disc_copies, Tab) -> ?ets_last(Tab);
-db_last(disc_only_copies, Tab) -> dets:first(Tab). %% Dets don't have order
+db_last(disc_only_copies, Tab) -> dets:first(Tab); %% Dets don't have order
+db_last({ext, Alias, Mod}, Tab) ->
+ Mod:last(Alias, Tab).
db_prev_key(Tab, Key) ->
db_prev_key(val({Tab, storage_type}), Tab, Key).
db_prev_key(ram_copies, Tab, Key) -> ?ets_prev(Tab, Key);
db_prev_key(disc_copies, Tab, Key) -> ?ets_prev(Tab, Key);
-db_prev_key(disc_only_copies, Tab, Key) -> dets:next(Tab, Key). %% Dets don't have order
+db_prev_key(disc_only_copies, Tab, Key) -> dets:next(Tab, Key); %% Dets don't have order
+db_prev_key({ext, Alias, Mod}, Tab, Key) ->
+ Mod:prev(Alias, Tab, Key).
db_slot(Tab, Pos) ->
db_slot(val({Tab, storage_type}), Tab, Pos).
db_slot(ram_copies, Tab, Pos) -> ?ets_slot(Tab, Pos);
db_slot(disc_copies, Tab, Pos) -> ?ets_slot(Tab, Pos);
-db_slot(disc_only_copies, Tab, Pos) -> dets:slot(Tab, Pos).
+db_slot(disc_only_copies, Tab, Pos) -> dets:slot(Tab, Pos);
+db_slot({ext, Alias, Mod}, Tab, Pos) ->
+ Mod:slot(Alias, Tab, Pos).
db_update_counter(Tab, C, Val) ->
db_update_counter(val({Tab, storage_type}), Tab, C, Val).
@@ -1173,13 +1302,16 @@ db_update_counter(ram_copies, Tab, C, Val) ->
db_update_counter(disc_copies, Tab, C, Val) ->
?ets_update_counter(Tab, C, Val);
db_update_counter(disc_only_copies, Tab, C, Val) ->
- dets:update_counter(Tab, C, Val).
+ dets:update_counter(Tab, C, Val);
+db_update_counter({ext, Alias, Mod}, Tab, C, Val) ->
+ Mod:update_counter(Alias, Tab, C, Val).
db_erase_tab(Tab) ->
db_erase_tab(val({Tab, storage_type}), Tab).
db_erase_tab(ram_copies, Tab) -> ?ets_delete_table(Tab);
db_erase_tab(disc_copies, Tab) -> ?ets_delete_table(Tab);
-db_erase_tab(disc_only_copies, _Tab) -> ignore.
+db_erase_tab(disc_only_copies, _Tab) -> ignore;
+db_erase_tab({ext, _Alias, _Mod}, _Tab) -> ignore.
%% assuming that Tab is a valid ets-table
dets_to_ets(Tabname, Tab, File, Type, Rep, Lock) ->
diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl
index bde5585139..71e5829c87 100644
--- a/lib/mnesia/src/mnesia_loader.erl
+++ b/lib/mnesia/src/mnesia_loader.erl
@@ -147,6 +147,19 @@ do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_only_copies -
{error, Error} ->
{not_loaded, {"Failed to create dets table", Error}}
end
+ end;
+
+do_get_disc_copy2(Tab, Reason, Storage = {ext, Alias, Mod}, _Type) ->
+ Cs = val({Tab, cstruct}),
+ case mnesia_monitor:unsafe_create_external(Tab, Alias, Mod, Cs) of
+ ok ->
+ ok = ext_load_table(Mod, Alias, Tab, Reason),
+ mnesia_index:init_index(Tab, Storage),
+ set({Tab, load_node}, node()),
+ set({Tab, load_reason}, Reason),
+ {loaded, ok};
+ Other ->
+ {not_loaded, Other}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -259,7 +272,7 @@ init_receiver(Node, Tab,Storage,Cs,Reason) ->
true = lists:member(Node, Active),
{SenderPid, TabSize, DetsData} =
start_remote_sender(Node,Tab,Storage),
- Init = table_init_fun(SenderPid),
+ Init = table_init_fun(SenderPid, Storage),
Args = [self(),Tab,Storage,Cs,SenderPid,
TabSize,DetsData,Init],
Pid = spawn_link(?MODULE, spawned_receiver, Args),
@@ -289,7 +302,12 @@ start_remote_sender(Node,Tab,Storage) ->
mnesia_controller:start_remote_sender(Node, Tab, self(), Storage),
put(mnesia_table_sender_node, {Tab, Node}),
receive
- {SenderPid, {first, TabSize}} ->
+ {SenderPid, {first, _} = Msg}
+ when is_pid(SenderPid), element(1, Storage) == ext ->
+ {ext, Alias, Mod} = Storage,
+ {Sz, Data} = Mod:receiver_first_message(SenderPid, Msg, Alias, Tab),
+ {SenderPid, Sz, Data};
+ {SenderPid, {first, TabSize}} =_M1 ->
{SenderPid, TabSize, false};
{SenderPid, {first, TabSize, DetsData}} ->
{SenderPid, TabSize, DetsData};
@@ -299,16 +317,18 @@ start_remote_sender(Node,Tab,Storage) ->
down(Tab, Storage)
end.
-table_init_fun(SenderPid) ->
+table_init_fun(SenderPid, Storage) ->
fun(read) ->
Receiver = self(),
SenderPid ! {Receiver, more},
- get_data(SenderPid, Receiver)
+ get_data(SenderPid, Receiver, Storage);
+ (close) ->
+ ok
end.
%% Add_table_copy get's it's own locks.
start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,{add_table_copy,_}}) ->
- Init = table_init_fun(SenderPid),
+ Init = table_init_fun(SenderPid, Storage),
case do_init_table(Tab,Storage,Cs,SenderPid,TabSize,DetsData,self(), Init) of
Err = {error, _} ->
SenderPid ! {copier_done, node()},
@@ -391,7 +411,15 @@ create_table(Tab, TabSize, Storage, Cs) ->
{Storage, Tab};
Else ->
Else
- end
+ end;
+ element(1, Storage) == ext ->
+ {_, Alias, Mod} = Storage,
+ case mnesia_monitor:unsafe_create_external(Tab, Alias, Mod, Cs) of
+ ok ->
+ {Storage, Tab};
+ Else ->
+ Else
+ end
end.
tab_receiver(Node, Tab, Storage, Cs, OrigTabRec) ->
@@ -409,21 +437,25 @@ tab_receiver(Node, Tab, Storage, Cs, OrigTabRec) ->
tab_receiver(Node, Tab, Storage, Cs, OrigTabRec)
end.
-make_table_fun(Pid, TabRec) ->
+make_table_fun(Pid, TabRec, Storage) ->
fun(close) ->
ok;
+ ({read, Msg}) ->
+ Pid ! {TabRec, Msg},
+ get_data(Pid, TabRec, Storage);
(read) ->
- get_data(Pid, TabRec)
+ get_data(Pid, TabRec, Storage)
end.
-get_data(Pid, TabRec) ->
+get_data(Pid, TabRec, Storage) ->
receive
{Pid, {more_z, CompressedRecs}} when is_binary(CompressedRecs) ->
- Pid ! {TabRec, more},
- {zlib_uncompress(CompressedRecs), make_table_fun(Pid,TabRec)};
+ maybe_reply(Pid, {TabRec, more}, Storage),
+ {zlib_uncompress(CompressedRecs),
+ make_table_fun(Pid, TabRec, Storage)};
{Pid, {more, Recs}} ->
- Pid ! {TabRec, more},
- {Recs, make_table_fun(Pid,TabRec)};
+ maybe_reply(Pid, {TabRec, more}, Storage),
+ {Recs, make_table_fun(Pid, TabRec, Storage)};
{Pid, no_more} ->
end_of_input;
{copier_done, Node} ->
@@ -431,13 +463,48 @@ get_data(Pid, TabRec) ->
Node ->
{copier_done, Node};
_ ->
- get_data(Pid, TabRec)
+ get_data(Pid, TabRec, Storage)
end;
{'EXIT', Pid, Reason} ->
handle_exit(Pid, Reason),
- get_data(Pid, TabRec)
+ get_data(Pid, TabRec, Storage)
end.
+maybe_reply(_, _, {ext, _, _}) ->
+ ignore;
+maybe_reply(Pid, Msg, _) ->
+ Pid ! Msg.
+
+ext_init_table(Alias, Mod, Tab, Fun, State, Sender) ->
+ ok = ext_load_table(Mod, Alias, Tab, {net_load, node(Sender)}),
+ ext_init_table(read, Alias, Mod, Tab, Fun, State, Sender).
+
+ext_load_table(Mod, Alias, Tab, Reason) ->
+ CS = val({Tab, cstruct}),
+ Mod:load_table(Alias, Tab, Reason, mnesia_schema:cs2list(CS)).
+
+
+ext_init_table(Action, Alias, Mod, Tab, Fun, State, Sender) ->
+ case Fun(Action) of
+ {copier_done, Node} ->
+ verbose("Receiver of table ~p crashed on ~p (more)~n", [Tab, Node]),
+ down(Tab, {ext,Alias,Mod});
+ {Data, NewFun} ->
+ case Mod:receive_data(Data, Alias, Tab, Sender, State) of
+ {more, NewState} ->
+ ext_init_table({read, more}, Alias, Mod,
+ Tab, NewFun, NewState, Sender);
+ {{more,Msg}, NewState} ->
+ ext_init_table({read, Msg}, Alias, Mod,
+ Tab, NewFun, NewState, Sender)
+ end;
+ end_of_input ->
+ Mod:receive_done(Alias, Tab, Sender, State),
+ ok = Fun(close)
+ end.
+
+init_table(Tab, {ext,Alias,Mod}, Fun, State, Sender) ->
+ ext_init_table(Alias, Mod, Tab, Fun, State, Sender);
init_table(Tab, disc_only_copies, Fun, DetsInfo,Sender) ->
ErtsVer = erlang:system_info(version),
case DetsInfo of
@@ -567,7 +634,10 @@ handle_last({ram_copies, Tab}, _Type, DatBin) ->
ok;
false ->
ok
- end.
+ end;
+
+handle_last(_Storage, _Type, nobin) ->
+ ok.
down(Tab, Storage) ->
case Storage of
@@ -578,7 +648,10 @@ down(Tab, Storage) ->
disc_only_copies ->
TmpFile = mnesia_lib:tab2tmp(Tab),
mnesia_lib:dets_sync_close(Tab),
- file:delete(TmpFile)
+ file:delete(TmpFile);
+ {ext, Alias, Mod} ->
+ catch Mod:close_table(Alias, Tab),
+ catch Mod:delete_table(Alias, Tab)
end,
mnesia_checkpoint:tm_del_copy(Tab, node()),
mnesia_controller:sync_del_table_copy_whereabouts(Tab, node()),
@@ -603,21 +676,35 @@ db_erase({ram_copies, Tab}, Key) ->
db_erase({disc_copies, Tab}, Key) ->
true = ?ets_delete(Tab, Key);
db_erase({disc_only_copies, Tab}, Key) ->
- ok = dets:delete(Tab, Key).
+ ok = dets:delete(Tab, Key);
+db_erase({{ext, Alias, Mod}, Tab}, Key) ->
+ ok = Mod:delete(Alias, Tab, Key).
db_match_erase({ram_copies, Tab} , Pat) ->
true = ?ets_match_delete(Tab, Pat);
db_match_erase({disc_copies, Tab} , Pat) ->
true = ?ets_match_delete(Tab, Pat);
db_match_erase({disc_only_copies, Tab}, Pat) ->
- ok = dets:match_delete(Tab, Pat).
+ ok = dets:match_delete(Tab, Pat);
+db_match_erase({{ext, Alias, Mod}, Tab}, Pat) ->
+ % "ets style" is to return true
+ % "dets style" is to return N | { error, Reason }
+ % or sometimes ok (?)
+ % be nice and accept both
+ case Mod:match_delete(Alias, Tab, Pat) of
+ N when is_integer (N) -> ok;
+ true -> ok;
+ ok -> ok
+ end.
db_put({ram_copies, Tab}, Val) ->
true = ?ets_insert(Tab, Val);
db_put({disc_copies, Tab}, Val) ->
true = ?ets_insert(Tab, Val);
db_put({disc_only_copies, Tab}, Val) ->
- ok = dets:insert(Tab, Val).
+ ok = dets:insert(Tab, Val);
+db_put({{ext, Alias, Mod}, Tab}, Val) ->
+ ok = Mod:insert(Alias, Tab, Val).
%% This code executes at the remote site where the data is
%% executes in a special copier process.
@@ -636,47 +723,62 @@ send_table(Pid, Tab, RemoteS) ->
unknown ->
{error, {no_exists, Tab}};
Storage ->
- %% Send first
- TabSize = mnesia:table_info(Tab, size),
- KeysPerTransfer = calc_nokeys(Storage, Tab),
- ChunkData = dets:info(Tab, bchunk_format),
-
- UseDetsChunk =
- Storage == RemoteS andalso
- Storage == disc_only_copies andalso
- ChunkData /= undefined,
- if
- UseDetsChunk == true ->
- DetsInfo = erlang:system_info(version),
- Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}};
- true ->
- Pid ! {self(), {first, TabSize}}
- end,
+ do_send_table(Pid, Tab, Storage, RemoteS)
+ end.
- %% Debug info
- put(mnesia_table_sender, {Tab, node(Pid), Pid}),
- {Init, Chunk} = reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer),
-
- SendIt = fun() ->
- NeedLock = need_lock(Tab),
- {atomic, ok} = prepare_copy(Pid, Tab, Storage, NeedLock),
- send_more(Pid, 1, Chunk, Init(), Tab),
- finish_copy(Pid, Tab, Storage, RemoteS, NeedLock)
- end,
-
- try SendIt() of
- {_, receiver_died} -> ok;
- {atomic, no_more} -> ok
- catch
- throw:receiver_died ->
- cleanup_tab_copier(Pid, Storage, Tab),
- ok;
- error:Reason -> %% Prepare failed
- cleanup_tab_copier(Pid, Storage, Tab),
- {error, {tab_copier, Tab, {Reason, erlang:get_stacktrace()}}}
- after
- unlink(whereis(mnesia_tm))
- end
+do_send_table(Pid, Tab, Storage, RemoteS) ->
+ {Init, Chunk} =
+ case Storage of
+ {ext, Alias, Mod} ->
+ case Mod:sender_init(Alias, Tab, RemoteS, Pid) of
+ {standard, I, C} ->
+ Pid ! {self(), {first, Mod:info(Alias, Tab, size)}},
+ {I, C};
+ {_, _} = Res ->
+ Res
+ end;
+ Storage ->
+ %% Send first
+ TabSize = mnesia:table_info(Tab, size),
+ KeysPerTransfer = calc_nokeys(Storage, Tab),
+ ChunkData = dets:info(Tab, bchunk_format),
+
+ UseDetsChunk =
+ Storage == RemoteS andalso
+ Storage == disc_only_copies andalso
+ ChunkData /= undefined,
+ if
+ UseDetsChunk == true ->
+ DetsInfo = erlang:system_info(version),
+ Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}};
+ true ->
+ Pid ! {self(), {first, TabSize}}
+ end,
+ {_I, _C} =
+ reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer)
+ end,
+ %% Debug info
+ put(mnesia_table_sender, {Tab, node(Pid), Pid}),
+
+ SendIt = fun() ->
+ NeedLock = need_lock(Tab),
+ {atomic, ok} = prepare_copy(Pid, Tab, Storage, NeedLock),
+ send_more(Pid, 1, Chunk, Init(), Tab, Storage),
+ finish_copy(Pid, Tab, Storage, RemoteS, NeedLock)
+ end,
+
+ try SendIt() of
+ {_, receiver_died} -> ok;
+ {atomic, no_more} -> ok
+ catch
+ throw:receiver_died ->
+ cleanup_tab_copier(Pid, Storage, Tab),
+ ok;
+ error:Reason -> %% Prepare failed
+ cleanup_tab_copier(Pid, Storage, Tab),
+ {error, {tab_copier, Tab, {Reason, erlang:get_stacktrace()}}}
+ after
+ unlink(whereis(mnesia_tm))
end.
prepare_copy(Pid, Tab, Storage, NeedLock) ->
@@ -726,20 +828,32 @@ update_where_to_write([H|T], Tab, AddNode) ->
[{update_where_to_write, [add, Tab, AddNode], self()}]),
update_where_to_write(T, Tab, AddNode).
-send_more(Pid, N, Chunk, DataState, Tab) ->
+send_more(Pid, N, Chunk, DataState, Tab, Storage) ->
receive
{NewPid, more} ->
case send_packet(N - 1, NewPid, Chunk, DataState) of
New when is_integer(New) ->
New - 1;
NewData ->
- send_more(NewPid, ?MAX_NOPACKETS, Chunk, NewData, Tab)
+ send_more(NewPid, ?MAX_NOPACKETS, Chunk, NewData,
+ Tab, Storage)
+ end;
+ {NewPid, {more, Msg}} when element(1, Storage) == ext ->
+ {ext, Alias, Mod} = Storage,
+ {NewChunk, NewState} =
+ Mod:sender_handle_info(Msg, Alias, Tab, NewPid, DataState),
+ case send_packet(N - 1, NewPid, NewChunk, NewState) of
+ New when is_integer(New) ->
+ New -1;
+ NewData ->
+ send_more(NewPid, N, NewChunk, NewData, Tab,
+ Storage)
end;
{_NewPid, {old_protocol, Tab}} ->
Storage = val({Tab, storage_type}),
{Init, NewChunk} =
reader_funcs(false, Tab, Storage, calc_nokeys(Storage, Tab)),
- send_more(Pid, 1, NewChunk, Init(), Tab);
+ send_more(Pid, 1, NewChunk, Init(), Tab, Storage);
{copier_done, Node} when Node == node(Pid)->
verbose("Receiver of table ~p crashed on ~p (more)~n", [Tab, Node]),
diff --git a/lib/mnesia/src/mnesia_log.erl b/lib/mnesia/src/mnesia_log.erl
index 36135418c8..9536effd42 100644
--- a/lib/mnesia/src/mnesia_log.erl
+++ b/lib/mnesia/src/mnesia_log.erl
@@ -224,17 +224,12 @@ sappend(Log, Term) ->
ok = disk_log:log(Log, Term).
%% Write commit records to the latest_log
-log(C) when C#commit.disc_copies == [],
- C#commit.disc_only_copies == [],
- C#commit.schema_ops == [] ->
- ignore;
log(C) ->
- case mnesia_monitor:use_dir() of
+ case need_log(C) andalso mnesia_monitor:use_dir() of
true ->
if
is_record(C, commit) ->
- C2 = C#commit{ram_copies = [], snmp = []},
- append(latest_log, C2);
+ append(latest_log, strip_snmp(C));
true ->
%% Either a commit record as binary
%% or some decision related info
@@ -247,17 +242,12 @@ log(C) ->
%% Synced
-slog(C) when C#commit.disc_copies == [],
- C#commit.disc_only_copies == [],
- C#commit.schema_ops == [] ->
- ignore;
slog(C) ->
- case mnesia_monitor:use_dir() of
+ case need_log(C) andalso mnesia_monitor:use_dir() of
true ->
if
is_record(C, commit) ->
- C2 = C#commit{ram_copies = [], snmp = []},
- sappend(latest_log, C2);
+ sappend(latest_log, strip_snmp(C));
true ->
%% Either a commit record as binary
%% or some decision related info
@@ -268,6 +258,13 @@ slog(C) ->
ignore
end.
+need_log(#commit{disc_copies=[], disc_only_copies=[], schema_ops=[], ext=Ext}) ->
+ lists:keymember(ext_copies, 1, Ext);
+need_log(_) -> true.
+
+strip_snmp(#commit{ext=[]}=CR) -> CR;
+strip_snmp(#commit{ext=Ext}=CR) ->
+ CR#commit{ext=lists:keydelete(snmp, 1, Ext)}.
%% Stuff related to the file LOG
diff --git a/lib/mnesia/src/mnesia_monitor.erl b/lib/mnesia/src/mnesia_monitor.erl
index 4069341700..ab78c9b13e 100644
--- a/lib/mnesia/src/mnesia_monitor.erl
+++ b/lib/mnesia/src/mnesia_monitor.erl
@@ -32,6 +32,7 @@
init/0,
mktab/2,
unsafe_mktab/2,
+ unsafe_create_external/4,
mnesia_down/2,
needs_protocol_conversion/1,
negotiate_protocol/1,
@@ -82,9 +83,9 @@
going_down = [], tm_started = false, early_connects = [],
connecting, mq = [], remote_node_status = []}).
--define(current_protocol_version, {8,2}).
+-define(current_protocol_version, {8,3}).
--define(previous_protocol_version, {8,1}).
+-define(previous_protocol_version, {8,2}).
start() ->
gen_server:start_link({local, ?MODULE}, ?MODULE,
@@ -129,6 +130,8 @@ close_log(Name) ->
unsafe_close_log(Name) ->
unsafe_call({unsafe_close_log, Name}).
+unsafe_create_external(Tab, Alias, Mod, Cs) ->
+ unsafe_call({unsafe_create_external, Tab, Alias, Mod, Cs}).
disconnect(Node) ->
cast({disconnect, Node}).
@@ -193,7 +196,7 @@ protocol_version() ->
%% A sorted list of acceptable protocols the
%% preferred protocols are first in the list
acceptable_protocol_versions() ->
- [protocol_version(), ?previous_protocol_version].
+ [protocol_version(), ?previous_protocol_version, {8,1}].
needs_protocol_conversion(Node) ->
case {?catch_val({protocol, Node}), protocol_version()} of
@@ -405,6 +408,14 @@ handle_call({unsafe_close_log, Name}, _From, State) ->
_ = disk_log:close(Name),
{reply, ok, State};
+handle_call({unsafe_create_external, Tab, Alias, Mod, Cs}, _From, State) ->
+ case catch Mod:create_table(Alias, Tab, mnesia_schema:cs2list(Cs)) of
+ {'EXIT', ExitReason} ->
+ {reply, {error, ExitReason}, State};
+ Reply ->
+ {reply, Reply, State}
+ end;
+
handle_call({negotiate_protocol, Mon, _Version, _Protocols}, _From, State)
when State#state.tm_started == false ->
State2 = State#state{early_connects = [node(Mon) | State#state.early_connects]},
@@ -658,6 +669,7 @@ get_env(E) ->
env() ->
[
access_module,
+ allow_index_on_key,
auto_repair,
backup_module,
debug,
@@ -671,19 +683,23 @@ env() ->
extra_db_nodes,
ignore_fallback_at_startup,
fallback_error_function,
+ fold_chunk_size,
max_wait_for_decision,
schema_location,
core_dir,
pid_sort_order,
no_table_loaders,
dc_dump_limit,
- send_compressed
+ send_compressed,
+ schema
].
default_env(access_module) ->
mnesia;
default_env(auto_repair) ->
true;
+default_env(allow_index_on_key) ->
+ false;
default_env(backup_module) ->
mnesia_backup;
default_env(debug) ->
@@ -709,6 +725,8 @@ default_env(ignore_fallback_at_startup) ->
false;
default_env(fallback_error_function) ->
{mnesia, lkill};
+default_env(fold_chunk_size) ->
+ 100;
default_env(max_wait_for_decision) ->
infinity;
default_env(schema_location) ->
@@ -722,7 +740,9 @@ default_env(no_table_loaders) ->
default_env(dc_dump_limit) ->
4;
default_env(send_compressed) ->
- 0.
+ 0;
+default_env(schema) ->
+ [].
check_type(Env, Val) ->
try do_check_type(Env, Val)
@@ -730,6 +750,7 @@ check_type(Env, Val) ->
end.
do_check_type(access_module, A) when is_atom(A) -> A;
+do_check_type(allow_index_on_key, B) -> bool(B);
do_check_type(auto_repair, B) -> bool(B);
do_check_type(backup_module, B) when is_atom(B) -> B;
do_check_type(debug, debug) -> debug;
@@ -753,6 +774,8 @@ do_check_type(extra_db_nodes, L) when is_list(L) ->
(A) when is_atom(A) -> true
end,
lists:filter(Fun, L);
+do_check_type(fold_chunk_size, I) when is_integer(I), I > 0;
+ I =:= infinity -> I;
do_check_type(max_wait_for_decision, infinity) -> infinity;
do_check_type(max_wait_for_decision, I) when is_integer(I), I > 0 -> I;
do_check_type(schema_location, M) -> media(M);
@@ -766,7 +789,8 @@ do_check_type(pid_sort_order, "standard") -> standard;
do_check_type(pid_sort_order, _) -> false;
do_check_type(no_table_loaders, N) when is_integer(N), N > 0 -> N;
do_check_type(dc_dump_limit,N) when is_number(N), N > 0 -> N;
-do_check_type(send_compressed, L) when is_integer(L), L >= 0, L =< 9 -> L.
+do_check_type(send_compressed, L) when is_integer(L), L >= 0, L =< 9 -> L;
+do_check_type(schema, L) when is_list(L) -> L.
bool(true) -> true;
bool(false) -> false.
diff --git a/lib/mnesia/src/mnesia_schema.erl b/lib/mnesia/src/mnesia_schema.erl
index 782493fb4f..0e4017e4c3 100644
--- a/lib/mnesia/src/mnesia_schema.erl
+++ b/lib/mnesia/src/mnesia_schema.erl
@@ -29,6 +29,16 @@
-module(mnesia_schema).
-export([
+ add_backend_type/2,
+ do_add_backend_type/2,
+ delete_backend_type/1,
+ do_delete_backend_type/1,
+ backend_types/0,
+ add_index_plugin/3,
+ do_add_index_plugin/3,
+ delete_index_plugin/1,
+ do_delete_index_plugin/1,
+ index_plugins/0,
add_snmp/2,
add_table_copy/3,
add_table_index/2,
@@ -55,14 +65,16 @@
dump_tables/1,
ensure_no_schema/1,
get_create_list/1,
- get_initial_schema/2,
+ get_initial_schema/3,
get_table_properties/1,
info/0,
info/1,
init/1,
+ init_backends/0,
insert_cstruct/3,
is_remote_member/1,
list2cs/1,
+ list2cs/2,
lock_schema/0,
merge_schema/0,
merge_schema/1,
@@ -110,7 +122,8 @@
do_delete_table/1,
do_read_table_property/2,
do_delete_table_property/2,
- do_write_table_property/2]).
+ do_write_table_property/2,
+ do_change_table_copy_type/3]).
-include("mnesia.hrl").
-include_lib("kernel/include/file.hrl").
@@ -138,12 +151,30 @@ init(IgnoreFallback) ->
verbose("Schema initiated from: ~p~n", [Source]),
set({schema, tables}, []),
set({schema, local_tables}, []),
+ do_set_schema(schema),
Tabs = set_schema(?ets_first(schema)),
lists:foreach(fun(Tab) -> clear_whereabouts(Tab) end, Tabs),
set({schema, where_to_read}, node()),
set({schema, load_node}, node()),
set({schema, load_reason}, initial),
- mnesia_controller:add_active_replica(schema, node()).
+ mnesia_controller:add_active_replica(schema, node()),
+ init_backends().
+
+
+init_backends() ->
+ Backends = lists:foldl(fun({Alias, Mod}, Acc) ->
+ orddict:append(Mod, Alias, Acc)
+ end, orddict:new(), get_ext_types()),
+ [init_backend(Mod, Aliases) || {Mod, Aliases} <- Backends],
+ ok.
+
+init_backend(Mod, [_|_] = Aliases) ->
+ case Mod:init_backend() of
+ ok ->
+ Mod:add_aliases(Aliases);
+ Error ->
+ mnesia:abort({backend_init_error, Error})
+ end.
exit_on_error({error, Reason}) ->
exit(Reason);
@@ -180,6 +211,7 @@ do_set_schema(Tab, Cs) ->
set({Tab, ram_copies}, Cs#cstruct.ram_copies),
set({Tab, disc_copies}, Cs#cstruct.disc_copies),
set({Tab, disc_only_copies}, Cs#cstruct.disc_only_copies),
+ set({Tab, external_copies}, Cs#cstruct.external_copies),
set({Tab, load_order}, Cs#cstruct.load_order),
set({Tab, access_mode}, Cs#cstruct.access_mode),
set({Tab, majority}, Cs#cstruct.majority),
@@ -195,15 +227,21 @@ do_set_schema(Tab, Cs) ->
set({Tab, arity}, Arity),
RecName = Cs#cstruct.record_name,
set({Tab, record_name}, RecName),
- set({Tab, record_validation}, {RecName, Arity, Type}),
set({Tab, wild_pattern}, wild(RecName, Arity)),
- set({Tab, index}, Cs#cstruct.index),
+ set({Tab, index}, [P || {P,_} <- Cs#cstruct.index]),
+ case Cs#cstruct.index of
+ [] ->
+ set({Tab, index_info}, mnesia_index:index_info(Type, []));
+ _ ->
+ ignore
+ end,
%% create actual index tabs later
set({Tab, cookie}, Cs#cstruct.cookie),
set({Tab, version}, Cs#cstruct.version),
set({Tab, cstruct}, Cs),
Storage = mnesia_lib:schema_cs_to_storage_type(node(), Cs),
set({Tab, storage_type}, Storage),
+ set_record_validation(Tab, Storage, RecName, Arity, Type),
mnesia_lib:add({schema, tables}, Tab),
Ns = mnesia_lib:cs_to_nodes(Cs),
case lists:member(node(), Ns) of
@@ -213,7 +251,24 @@ do_set_schema(Tab, Cs) ->
mnesia_lib:add({schema, local_tables}, Tab);
false ->
ignore
- end.
+ end,
+ set_ext_types(Tab, get_ext_types(), Cs#cstruct.external_copies).
+
+set_record_validation(Tab, {ext,Alias,Mod}, RecName, Arity, Type) ->
+ set({Tab, record_validation}, {RecName, Arity, Type, Alias, Mod});
+set_record_validation(Tab, _, RecName, Arity, Type) ->
+ set({Tab, record_validation}, {RecName, Arity, Type}).
+
+set_ext_types(Tab, ExtTypes, ExtCopies) ->
+ lists:foreach(
+ fun({Type, _} = Key) ->
+ Nodes = case lists:keyfind(Key, 1, ExtCopies) of
+ {_, Ns} -> Ns;
+ false -> []
+ end,
+ set({Tab, Type}, Nodes)
+ end, ExtTypes).
+
wild(RecName, Arity) ->
Wp0 = list_to_tuple(lists:duplicate(Arity, '_')),
@@ -525,9 +580,14 @@ do_read_disc_schema(Fname, Keep) ->
Res.
get_initial_schema(SchemaStorage, Nodes) ->
+ get_initial_schema(SchemaStorage, Nodes, []).
+
+get_initial_schema(SchemaStorage, Nodes, Properties) -> %
+ UserProps = initial_schema_properties(Properties),
Cs = #cstruct{name = schema,
record_name = schema,
- attributes = [table, cstruct]},
+ attributes = [table, cstruct],
+ user_properties = UserProps},
Cs2 =
case SchemaStorage of
ram_copies -> Cs#cstruct{ram_copies = Nodes};
@@ -535,6 +595,35 @@ get_initial_schema(SchemaStorage, Nodes) ->
end,
cs2list(Cs2).
+initial_schema_properties(Props0) ->
+ DefaultProps = remove_duplicates(mnesia_monitor:get_env(schema)),
+ Props = lists:foldl(
+ fun({K,V}, Acc) ->
+ lists:keystore(K, 1, Acc, {K,V})
+ end, DefaultProps, remove_duplicates(Props0)),
+ initial_schema_properties_(Props).
+
+initial_schema_properties_([{backend_types, Types}|Props]) ->
+ lists:foreach(fun({Name, Module}) ->
+ verify_backend_type(Name, Module)
+ end, Types),
+ [{mnesia_backend_types, Types}|initial_schema_properties_(Props)];
+initial_schema_properties_([{index_plugins, Plugins}|Props]) ->
+ lists:foreach(fun({Name, Module, Function}) ->
+ verify_index_plugin(Name, Module, Function)
+ end, Plugins),
+ [{mnesia_index_plugins, Plugins}|initial_schema_properties_(Props)];
+initial_schema_properties_([P|_Props]) ->
+ mnesia:abort({bad_schema_property, P});
+initial_schema_properties_([]) ->
+ [].
+
+remove_duplicates([{K,_} = H|T]) ->
+ [H | remove_duplicates([X || {K1,_} = X <- T,
+ K1 =/= K])];
+remove_duplicates([]) ->
+ [].
+
read_cstructs_from_disc() ->
%% Assumptions:
%% - local schema lock in global
@@ -551,8 +640,9 @@ read_cstructs_from_disc() ->
{type, set}],
case dets:open_file(make_ref(), Args) of
{ok, Tab} ->
+ ExtTypes = get_ext_types_disc(),
Fun = fun({_, _, List}) ->
- {continue, list2cs(List)}
+ {continue, list2cs(List, ExtTypes)}
end,
Cstructs = dets:traverse(Tab, Fun),
dets:close(Tab),
@@ -632,7 +722,7 @@ do_insert_schema_ops(_Store, []) ->
api_list2cs(List) when is_list(List) ->
Name = pick(unknown, name, List, must),
- Keys = check_keys(Name, List, record_info(fields, cstruct)),
+ Keys = check_keys(Name, List),
check_duplicates(Name, Keys),
list2cs(List);
api_list2cs(Other) ->
@@ -647,7 +737,15 @@ cs2list(Cs) when is_record(Cs, cstruct) ->
cs2list(CreateList) when is_list(CreateList) ->
CreateList;
-%% since 4.6
+cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 20 ->
+ Tags = [name,type,
+ ram_copies,disc_copies,disc_only_copies,external_copies,
+ load_order,access_mode,majority,index,snmp,local_content,
+ record_name,attributes,
+ user_properties,frag_properties,storage_properties,
+ cookie,version],
+ rec2list(Tags, Tags, 2, Cs);
+%% since vsn-4.6 (protocol 8.2 or older)
cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 19 ->
Tags = [name,type,ram_copies,disc_copies,disc_only_copies,
load_order,access_mode,majority,index,snmp,local_content,
@@ -657,8 +755,38 @@ cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 19 ->
rec2list(Tags, Tags, 2, Cs).
cs2list(false, Cs) ->
- cs2list(Cs).
+ cs2list(Cs);
+cs2list({8,3}, Cs) ->
+ cs2list(Cs);
+cs2list({8,Minor}, Cs) when Minor =:= 2; Minor =:= 1 ->
+ Orig = record_info(fields, cstruct),
+ Tags = [name,type,ram_copies,disc_copies,disc_only_copies,
+ load_order,access_mode,majority,index,snmp,local_content,
+ record_name,attributes,
+ user_properties,frag_properties,storage_properties,
+ cookie,version],
+ CsList = rec2list(Tags, Orig, 2, Cs),
+ case proplists:get_value(index, CsList, []) of
+ [] -> CsList;
+ NewFormat ->
+ OldFormat = [Pos || {Pos, _Pref} <- NewFormat],
+ lists:keyreplace(index, 1, CsList, {index, OldFormat})
+ end.
+rec2list([index | Tags], [index|Orig], Pos, Rec) ->
+ Val = element(Pos, Rec),
+ [{index, lists:map(
+ fun({_, _Type}=P) -> P;
+ (P) when is_integer(P); is_atom(P) -> {P, ordered}
+ end, Val)} | rec2list(Tags, Orig, Pos + 1, Rec)];
+rec2list([external_copies | Tags], Orig0, Pos, Rec) ->
+ Orig = case Orig0 of
+ [external_copies|Rest] -> Rest;
+ _ -> Orig0
+ end,
+ Val = element(Pos, Rec),
+ [{Alias, Ns} || {{Alias,_}, Ns} <- Val]
+ ++ rec2list(Tags, Orig, Pos+1, Rec);
rec2list([Tag | Tags], [Tag | Orig], Pos, Rec) ->
Val = element(Pos, Rec),
[{Tag, Val} | rec2list(Tags, Orig, Pos + 1, Rec)];
@@ -667,17 +795,33 @@ rec2list([], _, _Pos, _Rec) ->
rec2list(Tags, [_|Orig], Pos, Rec) ->
rec2list(Tags, Orig, Pos+1, Rec).
-normalize_cs(Cstructs, _Node) ->
- Cstructs.
+normalize_cs(Cstructs, Node) ->
+ %% backward-compatibility hack; normalize before returning
+ case need_old_cstructs([Node]) of
+ false ->
+ Cstructs;
+ Version ->
+ %% some other format
+ [convert_cs(Version, Cs) || Cs <- Cstructs]
+ end.
+
+convert_cs(Version, Cs) ->
+ Fields = [Value || {_, Value} <- cs2list(Version, Cs)],
+ list_to_tuple([cstruct|Fields]).
+
+list2cs(List) ->
+ list2cs(List, get_ext_types()).
-list2cs(List) when is_list(List) ->
+list2cs(List, ExtTypes) when is_list(List) ->
Name = pick(unknown, name, List, must),
Type = pick(Name, type, List, set),
Rc0 = pick(Name, ram_copies, List, []),
Dc = pick(Name, disc_copies, List, []),
Doc = pick(Name, disc_only_copies, List, []),
- Rc = case {Rc0, Dc, Doc} of
- {[], [], []} -> [node()];
+
+ Ext = pick_external_copies(List, ExtTypes),
+ Rc = case {Rc0, Dc, Doc, Ext} of
+ {[], [], [], []} -> [node()];
_ -> Rc0
end,
LC = pick(Name, local_content, List, false),
@@ -695,8 +839,6 @@ list2cs(List) when is_list(List) ->
Ix = pick(Name, index, List, []),
verify({alt, [nil, list]}, mnesia_lib:etype(Ix),
{bad_type, Name, {index, [Ix]}}),
- Ix2 = [attr_to_pos(I, Attrs) || I <- Ix],
-
Frag = pick(Name, frag_properties, List, []),
verify({alt, [nil, list]}, mnesia_lib:etype(Frag),
{badarg, Name, {frag_properties, Frag}}),
@@ -723,24 +865,48 @@ list2cs(List) when is_list(List) ->
DetsOpts = proplists:get_value(dets, BEProps, []),
is_list(DetsOpts) orelse mnesia:abort({badarg, Name, {dets, DetsOpts}}),
[CheckProp(Prop, BadDetsOpts) || Prop <- DetsOpts],
- #cstruct{name = Name,
- ram_copies = Rc,
- disc_copies = Dc,
- disc_only_copies = Doc,
- type = Type,
- index = Ix2,
- snmp = Snmp,
- load_order = LoadOrder,
- access_mode = AccessMode,
- majority = Majority,
- local_content = LC,
- record_name = RecName,
- attributes = Attrs,
- user_properties = lists:sort(UserProps),
- frag_properties = lists:sort(Frag),
- storage_properties = lists:sort(BEProps),
- cookie = Cookie,
- version = Version}.
+
+ case lists:keymember(mnesia, 1, application:which_applications()) of
+ true ->
+ Keys = check_keys(Name, List),
+ check_duplicates(Name, Keys);
+ false ->
+ %% check_keys/2 cannot be executed when mnesia is not
+ %% running, due to it not being possible to read what ext
+ %% backends are loaded.
+ %%% this doesn't work - disabled for now:
+ %%%Keys = check_keys(Name, List, record_info(fields, cstruct)),
+ %%%check_duplicates(Name, Keys)
+ ignore
+ end,
+
+ Cs0 = #cstruct{name = Name,
+ ram_copies = Rc,
+ disc_copies = Dc,
+ disc_only_copies = Doc,
+ external_copies = Ext,
+ type = Type,
+ index = Ix,
+ snmp = Snmp,
+ load_order = LoadOrder,
+ access_mode = AccessMode,
+ majority = Majority,
+ local_content = LC,
+ record_name = RecName,
+ attributes = Attrs,
+ user_properties = lists:sort(UserProps),
+ frag_properties = lists:sort(Frag),
+ storage_properties = lists:sort(BEProps),
+ cookie = Cookie,
+ version = Version},
+ case Ix of
+ [] -> Cs0;
+ [_|_] ->
+ Ix2 = expand_index_attrs(Cs0),
+ Cs0#cstruct{index = Ix2}
+ end;
+list2cs(Other, _ExtTypes) ->
+ mnesia:abort({badarg, Other}).
pick(Tab, Key, List, Default) ->
case lists:keysearch(Key, 1, List) of
@@ -754,6 +920,79 @@ pick(Tab, Key, List, Default) ->
mnesia:abort({bad_type, Tab, BadArg})
end.
+pick_external_copies(_List, []) ->
+ [];
+pick_external_copies(List, ExtTypes) ->
+ lists:foldr(
+ fun({K, Val}, Acc) ->
+ case lists:keyfind(K, 1, ExtTypes) of
+ false ->
+ Acc;
+ {_, Mod} ->
+ [{{K,Mod}, Val}|Acc]
+ end
+ end, [], List).
+
+expand_storage_type(S) when S==ram_copies;
+ S==disc_copies;
+ S==disc_only_copies ->
+ S;
+expand_storage_type(S) ->
+ case lists:keyfind(S, 1, get_ext_types()) of
+ false ->
+ mnesia:abort({bad_type, {storage_type, S}});
+ {Alias, Mod} ->
+ {ext, Alias, Mod}
+ end.
+
+get_ext_types() ->
+ get_schema_user_property(mnesia_backend_types).
+
+get_index_plugins() ->
+ get_schema_user_property(mnesia_index_plugins).
+
+get_schema_user_property(Key) ->
+ Tab = schema,
+ %% Must work reliably both within transactions and outside of transactions
+ Res = case get(mnesia_activity_state) of
+ undefined ->
+ dirty_read_table_property(Tab, Key);
+ _ ->
+ do_read_table_property(Tab, Key)
+ end,
+ case Res of
+ undefined ->
+ [];
+ {_, Types} ->
+ Types
+ end.
+
+get_ext_types_disc() ->
+ try get_ext_types_disc_()
+ catch
+ error:_ ->[]
+ end.
+
+get_ext_types_disc_() ->
+ case mnesia_schema:remote_read_schema() of
+ {ok, _, Prop} ->
+ K1 = user_properties,
+ case lists:keyfind(K1, 1, Prop) of
+ {K1, UserProp} ->
+ K2 = mnesia_backend_types,
+ case lists:keyfind(K2, 1, UserProp) of
+ {K2, Types} ->
+ Types;
+ _ ->
+ []
+ end;
+ _ ->
+ []
+ end;
+ _ ->
+ []
+ end.
+
%% Convert attribute name to integer if neccessary
attr_tab_to_pos(_Tab, Pos) when is_integer(Pos) ->
Pos;
@@ -761,6 +1000,7 @@ attr_tab_to_pos(Tab, Attr) ->
attr_to_pos(Attr, val({Tab, attributes})).
%% Convert attribute name to integer if neccessary
+attr_to_pos({_} = P, _) -> P;
attr_to_pos(Pos, _Attrs) when is_integer(Pos) ->
Pos;
attr_to_pos(Attr, Attrs) when is_atom(Attr) ->
@@ -775,8 +1015,18 @@ attr_to_pos(Attr, [_ | Attrs], Pos) ->
attr_to_pos(Attr, _, _) ->
mnesia:abort({bad_type, Attr}).
+check_keys(Tab, Attrs) ->
+ Types = [T || {T,_} <- get_ext_types()],
+ check_keys(Tab, Attrs, Types ++ record_info(fields, cstruct)).
+
check_keys(Tab, [{Key, _Val} | Tail], Items) ->
- case lists:member(Key, Items) of
+ Key1 = if
+ is_tuple(Key) ->
+ element(1, Key);
+ true ->
+ Key
+ end,
+ case lists:member(Key1, Items) of
true -> [Key | check_keys(Tab, Tail, Items)];
false -> mnesia:abort({badarg, Tab, Key})
end;
@@ -800,7 +1050,92 @@ has_duplicates([]) ->
false.
%% This is the only place where we check the validity of data
-verify_cstruct(Cs) when is_record(Cs, cstruct) ->
+
+verify_cstruct(#cstruct{} = Cs) ->
+ assert_correct_cstruct(Cs),
+ Cs1 = verify_external_copies(
+ Cs#cstruct{index = expand_index_attrs(Cs)}),
+ assert_correct_cstruct(Cs1),
+ Cs1.
+
+expand_index_attrs(#cstruct{index = Ix, attributes = Attrs,
+ name = Tab} = Cs) ->
+ Prefered = prefered_index_types(Cs),
+ expand_index_attrs(Ix, Tab, Attrs, Prefered).
+
+expand_index_attrs(Ix, Tab, Attrs, Prefered) ->
+ lists:map(fun(P) when is_integer(P); is_atom(P) ->
+ {attr_to_pos(P, Attrs), Prefered};
+ ({A} = P) when is_atom(A) ->
+ {P, Prefered};
+ ({P, Type}) ->
+ {attr_to_pos(P, Attrs), Type};
+ (_Other) ->
+ mnesia:abort({bad_type, Tab, {index, Ix}})
+ end, Ix).
+
+prefered_index_types(#cstruct{external_copies = Ext}) ->
+ ExtTypes = [mnesia_lib:semantics(S, index_types) ||
+ {S,Ns} <- Ext, Ns =/= []],
+ case intersect_types(ExtTypes) of
+ [] -> ordered;
+ [Pref|_] -> Pref
+ end.
+
+intersect_types([]) ->
+ [];
+intersect_types([S1, S2|Rest]) ->
+ intersect_types([S1 -- (S1 -- S2)|Rest]);
+intersect_types([S]) ->
+ S.
+
+verify_external_copies(#cstruct{external_copies = []} = Cs) ->
+ Cs;
+verify_external_copies(#cstruct{name = Tab, external_copies = EC} = Cs) ->
+ Bad = {bad_type, Tab, {external_copies, EC}},
+ AllECNodes = lists:concat([Ns || {_, Ns} <- EC,
+ is_list(Ns)]),
+ verify(true, length(lists:usort(AllECNodes)) == length(AllECNodes), Bad),
+ CsL = cs2list(Cs),
+ CsL1 = lists:foldl(
+ fun({{Alias, Mod}, Ns} = _X, CsLx) ->
+ BadTab = fun(Why) ->
+ {Why, Tab, {{ext, Alias, Mod},Ns}}
+ end,
+ verify(atom, mnesia_lib:etype(Mod), BadTab),
+ verify(true, fun() ->
+ lists:all(fun is_atom/1, Ns)
+ end, BadTab),
+ check_semantics(Mod, Alias, BadTab, Cs),
+ try Mod:check_definition(Alias, Tab, Ns, CsLx) of
+ ok ->
+ CsLx;
+ {ok, CsLx1} ->
+ CsLx1;
+ {error, Reason} ->
+ mnesia:abort(BadTab(Reason))
+ catch
+ error:E ->
+ mnesia:abort(BadTab(E))
+ end;
+ (_, CsLx) ->
+ CsLx
+ end, CsL, EC),
+ list2cs(CsL1).
+
+check_semantics(Mod, Alias, BadTab, #cstruct{type = Type}) ->
+ Ext = {ext, Alias, Mod},
+ case lists:member(mnesia_lib:semantics(Ext, storage), [ram_copies, disc_copies,
+ disc_only_copies]) of
+ false -> mnesia:abort(BadTab(invalid_storage));
+ true -> ok
+ end,
+ case lists:member(Type, mnesia_lib:semantics(Ext, types)) of
+ false -> mnesia:abort(BadTab(bad_type));
+ true -> ok
+ end.
+
+assert_correct_cstruct(Cs) when is_record(Cs, cstruct) ->
verify_nodes(Cs),
Tab = Cs#cstruct.name,
@@ -841,22 +1176,30 @@ verify_cstruct(Cs) when is_record(Cs, cstruct) ->
Attrs),
Index = Cs#cstruct.index,
+
verify({alt, [nil, list]}, mnesia_lib:etype(Index),
{bad_type, Tab, {index, Index}}),
+ IxPlugins = get_index_plugins(),
+ AllowIndexOnKey = check_if_allow_index_on_key(),
IxFun =
- fun(Pos) ->
- verify(true, fun() ->
- if
- is_integer(Pos),
- Pos > 2,
- Pos =< Arity ->
- true;
- true -> false
- end
- end,
- {bad_type, Tab, {index, [Pos]}})
- end,
+ fun(Pos) ->
+ verify(
+ true, fun() ->
+ I = index_pos(Pos),
+ case Pos of
+ {_, T} ->
+ (T==bag orelse T==ordered)
+ andalso good_ix_pos(
+ I, AllowIndexOnKey,
+ Arity, IxPlugins);
+ _ ->
+ good_ix_pos(Pos, AllowIndexOnKey,
+ Arity, IxPlugins)
+ end
+ end,
+ {bad_type, Tab, {index, [Pos]}})
+ end,
lists:foreach(IxFun, Index),
LC = Cs#cstruct.local_content,
@@ -880,7 +1223,9 @@ verify_cstruct(Cs) when is_record(Cs, cstruct) ->
{badarg, Tab, {snmp, Snmp}}),
CheckProp = fun(Prop) when is_tuple(Prop), size(Prop) >= 1 -> ok;
- (Prop) -> mnesia:abort({bad_type, Tab, {user_properties, [Prop]}})
+ (Prop) ->
+ mnesia:abort({bad_type, Tab,
+ {user_properties, [Prop]}})
end,
lists:foreach(CheckProp, Cs#cstruct.user_properties),
@@ -900,17 +1245,45 @@ verify_cstruct(Cs) when is_record(Cs, cstruct) ->
mnesia:abort({bad_type, Tab, {version, Version}})
end.
+good_ix_pos({_} = P, _, _, Plugins) ->
+ lists:keymember(P, 1, Plugins);
+good_ix_pos(I, true, Arity, _) when is_integer(I) ->
+ I >= 0 andalso I =< Arity;
+good_ix_pos(I, false, Arity, _) when is_integer(I) ->
+ I > 2 andalso I =< Arity;
+good_ix_pos(_, _, _, _) ->
+ false.
+
+
+check_if_allow_index_on_key() ->
+ case mnesia_monitor:get_env(allow_index_on_key) of
+ true ->
+ true;
+ _ ->
+ false
+ end.
+
verify_nodes(Cs) ->
Tab = Cs#cstruct.name,
Ram = Cs#cstruct.ram_copies,
Disc = Cs#cstruct.disc_copies,
DiscOnly = Cs#cstruct.disc_only_copies,
+ Ext = lists:append([Ns || {_,Ns} <- Cs#cstruct.external_copies]),
LoadOrder = Cs#cstruct.load_order,
verify({alt, [nil, list]}, mnesia_lib:etype(Ram),
{bad_type, Tab, {ram_copies, Ram}}),
verify({alt, [nil, list]}, mnesia_lib:etype(Disc),
{bad_type, Tab, {disc_copies, Disc}}),
+ lists:foreach(
+ fun({BE, Ns}) ->
+ verify({alt, [nil, list]}, mnesia_lib:etype(Ns),
+ {bad_type, Tab, {BE, Ns}}),
+ lists:foreach(fun(N) ->
+ verify(atom, mnesia_lib:etype(N),
+ {bad_type, Tab, {BE, Ns}})
+ end, Ns)
+ end, Cs#cstruct.external_copies),
case Tab of
schema ->
verify([], DiscOnly, {bad_type, Tab, {disc_only_copies, DiscOnly}});
@@ -922,12 +1295,15 @@ verify_nodes(Cs) ->
verify(integer, mnesia_lib:etype(LoadOrder),
{bad_type, Tab, {load_order, LoadOrder}}),
- Nodes = Ram ++ Disc ++ DiscOnly,
+ Nodes = Ram ++ Disc ++ DiscOnly ++ Ext,
verify(list, mnesia_lib:etype(Nodes),
{combine_error, Tab,
- [{ram_copies, []}, {disc_copies, []}, {disc_only_copies, []}]}),
+ [{ram_copies, []}, {disc_copies, []},
+ {disc_only_copies, []}, {external_copies, []}]}),
verify(false, has_duplicates(Nodes), {combine_error, Tab, Nodes}),
- AtomCheck = fun(N) -> verify(atom, mnesia_lib:etype(N), {bad_type, Tab, N}) end,
+ AtomCheck = fun(N) ->
+ verify(atom, mnesia_lib:etype(N), {bad_type, Tab, N})
+ end,
lists:foreach(AtomCheck, Nodes).
verify(Expected, Fun, Error) when is_function(Fun) ->
@@ -1010,28 +1386,194 @@ check_active([{badrpc, Reason} | _Replies], Expl, Tab) ->
check_active([], _Expl, _Tab) ->
ok.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Function for definining an external backend type
+
+add_backend_type(Name, Module) ->
+ case schema_transaction(fun() -> do_add_backend_type(Name, Module) end) of
+ {atomic, NeedsInit} ->
+ case NeedsInit of
+ true ->
+ Module:init_backend();
+ false ->
+ ignore
+ end,
+ Module:add_aliases([Name]),
+ {atomic, ok};
+ Other ->
+ Other
+ end.
+
+do_add_backend_type(Name, Module) ->
+ verify_backend_type(Name, Module),
+ Types = case do_read_table_property(schema, mnesia_backend_types) of
+ undefined ->
+ [];
+ {_, Ts} ->
+ case lists:keymember(Name, 1, Ts) of
+ true ->
+ mnesia:abort({backend_type_already_exists, Name});
+ false ->
+ Ts
+ end
+ end,
+ ModuleRegistered = lists:keymember(Module, 2, Types),
+ do_write_table_property(schema, {mnesia_backend_types,
+ [{Name, Module}|Types]}),
+ ModuleRegistered.
+
+delete_backend_type(Name) ->
+ schema_transaction(fun() -> do_delete_backend_type(Name) end).
+
+do_delete_backend_type(Name) ->
+ case do_read_table_property(schema, mnesia_backend_types) of
+ undefined ->
+ [];
+ {_, Ts} ->
+ case lists:keyfind(Name, 1, Ts) of
+ {_, Mod} ->
+ case using_backend_type(Name, Mod) of
+ [_|_] = Tabs ->
+ mnesia:abort({backend_in_use, {Name, Tabs}});
+ [] ->
+ do_write_table_property(
+ schema, {mnesia_backend_types,
+ lists:keydelete(Name, 1, Ts)})
+ end;
+ false ->
+ mnesia:abort({no_such_backend, Name})
+ end
+ end.
+
+using_backend_type(Name, Mod) ->
+ Ext = ets:select(mnesia_gvar,
+ [{ {{'$1',external_copies},'$2'}, [], [{{'$1','$2'}}] }]),
+ Entry = {Name, Mod},
+ [T || {T,C} <- Ext,
+ lists:keymember(Entry, 1, C)].
+
+verify_backend_type(Name, Module) ->
+ case legal_backend_name(Name) of
+ false ->
+ mnesia:abort({bad_type, {backend_type,Name,Module}});
+ true ->
+ ok
+ end,
+ ExpectedExports = mnesia_backend_type:behaviour_info(callbacks),
+ Exports = try Module:module_info(exports)
+ catch
+ error:_ ->
+ mnesia:abort({undef_backend, Module})
+ end,
+ case ExpectedExports -- Exports of
+ [] ->
+ ok;
+ _Other ->
+ io:fwrite(user, "Missing backend_type exports: ~p~n", [_Other]),
+ mnesia:abort({bad_type, {backend_type,Name,Module}})
+ end.
+
+legal_backend_name(Name) ->
+ is_atom(Name) andalso
+ (not lists:member(Name, record_info(fields, cstruct))).
+
+%% Used e.g. by mnesia:system_info(backend_types).
+backend_types() ->
+ [ram_copies, disc_copies, disc_only_copies |
+ [T || {T,_} <- get_ext_types()]].
+
+add_index_plugin(Name, Module, Function) ->
+ schema_transaction(
+ fun() -> do_add_index_plugin(Name, Module, Function) end).
+
+do_add_index_plugin(Name, Module, Function) ->
+ verify_index_plugin(Name, Module, Function),
+ Plugins = case do_read_table_property(schema, mnesia_index_plugins) of
+ undefined ->
+ [];
+ {_, Ps} ->
+ case lists:keymember(Name, 1, Ps) of
+ true ->
+ mnesia:abort({index_plugin_already_exists, Name});
+ false ->
+ Ps
+ end
+ end,
+ do_write_table_property(schema, {mnesia_index_plugins,
+ [{Name, Module, Function}|Plugins]}).
+
+delete_index_plugin(P) ->
+ schema_transaction(
+ fun() -> do_delete_index_plugin(P) end).
+
+do_delete_index_plugin({A} = P) when is_atom(A) ->
+ Plugins = get_index_plugins(),
+ case lists:keyfind(P, 1, Plugins) of
+ false ->
+ mnesia:abort({no_exists, {index_plugin, P}});
+ _Found ->
+ case ets:select(mnesia_gvar,
+ [{ {{'$1',{index,{P,'_'}}},'_'},[],['$1']},
+ { {{'$1',{index,P}},'_'},[],['$1']}], 1) of
+ {[_], _} ->
+ mnesia:abort({plugin_in_use, P});
+ '$end_of_table' ->
+ do_write_table_property(
+ schema, {mnesia_index_plugins,
+ lists:keydelete(P, 1, Plugins)})
+ end
+ end.
+
+verify_index_plugin({A} = Name, Module, Function)
+ when is_atom(A), is_atom(Module), is_atom(Function) ->
+ case code:ensure_loaded(Module) of
+ {error, nofile} ->
+ mnesia:abort({bad_type, {index_plugin,Name,Module,Function}});
+ {module,_} ->
+ %% Index plugins are called as Module:Function(Tab, Pos, Obj)
+ case erlang:function_exported(Module, Function, 3) of
+ true ->
+ ok;
+ false ->
+ mnesia:abort(
+ {bad_type, {index_plugin,Name,Module,Function}})
+ end
+ end;
+verify_index_plugin(Name, Module, Function) ->
+ mnesia:abort({bad_type, {index_plugin,Name,Module,Function}}).
+
+
+%% Used e.g. by mnesia:system_info(backend_types).
+index_plugins() ->
+ get_index_plugins().
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Here's the real interface function to create a table
-create_table(TabDef) ->
- schema_transaction(fun() -> do_multi_create_table(TabDef) end).
+create_table([_|_] = TabDef) ->
+ schema_transaction(fun() -> do_multi_create_table(TabDef) end);
+create_table(Arg) -> {aborted, {badarg, Arg}}.
%% And the corresponding do routines ....
do_multi_create_table(TabDef) ->
get_tid_ts_and_lock(schema, write),
ensure_writable(schema),
+ do_create_table(TabDef),
+ ok.
+
+do_create_table(TabDef) when is_list(TabDef) ->
Cs = api_list2cs(TabDef),
case Cs#cstruct.frag_properties of
[] ->
- do_create_table(Cs);
+ do_create_table_1(Cs);
_Props ->
CsList = mnesia_frag:expand_cstruct(Cs),
- lists:foreach(fun do_create_table/1, CsList)
- end,
- ok.
+ lists:foreach(fun do_create_table_1/1, CsList)
+ end.
-do_create_table(Cs) ->
+do_create_table_1(Cs) ->
{_Mod, _Tid, Ts} = get_tid_ts_and_lock(schema, none),
Store = Ts#tidstore.store,
do_insert_schema_ops(Store, make_create_table(Cs)).
@@ -1041,14 +1583,9 @@ make_create_table(Cs) ->
verify(false, check_if_exists(Tab), {already_exists, Tab}),
unsafe_make_create_table(Cs).
-% unsafe_do_create_table(Cs) ->
-% {_Mod, Tid, Ts} = get_tid_ts_and_lock(schema, none),
-% Store = Ts#tidstore.store,
-% do_insert_schema_ops(Store, unsafe_make_create_table(Cs)).
-
-unsafe_make_create_table(Cs) ->
+unsafe_make_create_table(Cs0) ->
{_Mod, Tid, Ts} = get_tid_ts_and_lock(schema, none),
- verify_cstruct(Cs),
+ Cs = verify_cstruct(Cs0),
Tab = Cs#cstruct.name,
%% Check that we have all disc replica nodes running
@@ -1196,8 +1733,7 @@ make_add_table_copy(Tab, Node, Storage) ->
Cs = incr_version(val({Tab, cstruct})),
Ns = mnesia_lib:cs_to_nodes(Cs),
verify(false, lists:member(Node, Ns), {already_exists, Tab, Node}),
- Cs2 = new_cs(Cs, Node, Storage, add),
- verify_cstruct(Cs2),
+ Cs2 = verify_cstruct(new_cs(Cs, Node, Storage, add)),
%% Check storage and if node is running
IsRunning = lists:member(Node, val({current, db_nodes})),
@@ -1245,14 +1781,14 @@ make_del_table_copy(Tab, Node) ->
_ when Tab == schema ->
%% ensure_active(Cs2),
ensure_not_active(Tab, Node),
- verify_cstruct(Cs2),
+ Cs3 = verify_cstruct(Cs2),
Ops = remove_node_from_tabs(val({schema, tables}), Node),
- [{op, del_table_copy, ram_copies, Node, vsn_cs2list(Cs2)} | Ops];
+ [{op, del_table_copy, ram_copies, Node, vsn_cs2list(Cs3)} | Ops];
_ ->
ensure_active(Cs),
- verify_cstruct(Cs2),
- get_tid_ts_and_lock(Tab, write),
- [{op, del_table_copy, Storage, Node, vsn_cs2list(Cs2)}]
+ Cs3 = verify_cstruct(Cs2),
+ get_tid_ts_and_lock(Tab, write),
+ [{op, del_table_copy, Storage, Node, vsn_cs2list(Cs3)}]
end.
remove_node_from_tabs([], _Node) ->
@@ -1278,9 +1814,9 @@ remove_node_from_tabs([Tab|Rest], Node) ->
[{op, delete_table, vsn_cs2list(Cs)} |
remove_node_from_tabs(Rest, Node)];
_Ns ->
- verify_cstruct(Cs2),
+ Cs3 = verify_cstruct(Cs2),
get_tid_ts_and_lock(Tab, write),
- [{op, del_table_copy, ram_copies, Node, vsn_cs2list(Cs2)}|
+ [{op, del_table_copy, ram_copies, Node, vsn_cs2list(Cs3)}|
remove_node_from_tabs(Rest, Node)]
end
end.
@@ -1298,8 +1834,34 @@ new_cs(Cs, Node, disc_copies, del) ->
new_cs(Cs, Node, disc_only_copies, del) ->
Cs#cstruct{disc_only_copies =
lists:delete(Node , Cs#cstruct.disc_only_copies)};
-new_cs(Cs, _Node, Storage, _Op) ->
- mnesia:abort({badarg, Cs#cstruct.name, Storage}).
+new_cs(#cstruct{external_copies = ExtCps} = Cs, Node, Storage0, Op) ->
+ Storage = case Storage0 of
+ {ext, Alias, _} -> Alias;
+ Alias -> Alias
+ end,
+ ExtTypes = get_ext_types(),
+ case lists:keyfind(Storage, 1, ExtTypes) of
+ false ->
+ mnesia:abort({badarg, Cs#cstruct.name, Storage});
+ {_, Mod} ->
+ Key = {Storage, Mod},
+ case {lists:keymember(Key, 1, ExtCps), Op} of
+ {false, del} ->
+ Cs;
+ {false, add} ->
+ Cs#cstruct{external_copies = [{Key, [Node]}|ExtCps]};
+ {true, _} ->
+ F = fun({K, Ns}) when K == Key ->
+ case Op of
+ del -> {K, lists:delete(Node, Ns)};
+ add -> {K, opt_add(Node, Ns)}
+ end;
+ (X) ->
+ X
+ end,
+ Cs#cstruct{external_copies = lists:map(F, ExtCps)}
+ end
+ end.
opt_add(N, L) -> [N | lists:delete(N, L)].
@@ -1335,8 +1897,7 @@ make_move_table(Tab, FromNode, ToNode) ->
verify(true, lists:member(ToNode, Running), {not_active, schema, ToNode}),
Cs2 = new_cs(Cs, ToNode, Storage, add),
- Cs3 = new_cs(Cs2, FromNode, Storage, del),
- verify_cstruct(Cs3),
+ Cs3 = verify_cstruct(new_cs(Cs2, FromNode, Storage, del)),
[{op, add_table_copy, Storage, ToNode, vsn_cs2list(Cs2)},
{op, sync_trans},
{op, del_table_copy, Storage, FromNode, vsn_cs2list(Cs3)}].
@@ -1363,9 +1924,11 @@ make_change_table_copy_type(Tab, Node, ToS) ->
Cs = incr_version(val({Tab, cstruct})),
FromS = mnesia_lib:storage_type_at_node(Node, Tab),
- case compare_storage_type(false, FromS, ToS) of
+ ToSExp = expand_storage_type(ToS),
+
+ case compare_storage_type(false, FromS, ToSExp) of
{same, _} ->
- mnesia:abort({already_exists, Tab, Node, ToS});
+ mnesia:abort({already_exists, Tab, Node, ToSExp});
{diff, _} ->
ignore;
incompatible ->
@@ -1373,10 +1936,8 @@ make_change_table_copy_type(Tab, Node, ToS) ->
end,
Cs2 = new_cs(Cs, Node, FromS, del),
- Cs3 = new_cs(Cs2, Node, ToS, add),
- verify_cstruct(Cs3),
-
- [{op, change_table_copy_type, Node, FromS, ToS, vsn_cs2list(Cs3)}].
+ Cs3 = verify_cstruct(new_cs(Cs2, Node, ToS, add)),
+ [{op, change_table_copy_type, Node, FromS, ToSExp, vsn_cs2list(Cs3)}].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% change index functions ....
@@ -1398,11 +1959,12 @@ make_add_table_index(Tab, Pos) ->
Cs = incr_version(val({Tab, cstruct})),
ensure_active(Cs),
Ix = Cs#cstruct.index,
- verify(false, lists:member(Pos, Ix), {already_exists, Tab, Pos}),
+ verify(false, lists:keymember(index_pos(Pos), 1, Ix),
+ {already_exists, Tab, Pos}),
Ix2 = lists:sort([Pos | Ix]),
- Cs2 = Cs#cstruct{index = Ix2},
- verify_cstruct(Cs2),
- [{op, add_index, Pos, vsn_cs2list(Cs2)}].
+ Cs2 = verify_cstruct(Cs#cstruct{index = Ix2}),
+ NewPosInfo = lists:keyfind(Pos, 1, Cs2#cstruct.index),
+ [{op, add_index, NewPosInfo, vsn_cs2list(Cs2)}].
del_table_index(Tab, Pos) ->
schema_transaction(fun() -> do_del_table_index(Tab, Pos) end).
@@ -1420,9 +1982,8 @@ make_del_table_index(Tab, Pos) ->
Cs = incr_version(val({Tab, cstruct})),
ensure_active(Cs),
Ix = Cs#cstruct.index,
- verify(true, lists:member(Pos, Ix), {no_exists, Tab, Pos}),
- Cs2 = Cs#cstruct{index = lists:delete(Pos, Ix)},
- verify_cstruct(Cs2),
+ verify(true, lists:keymember(Pos, 1, Ix), {no_exists, Tab, Pos}),
+ Cs2 = verify_cstruct(Cs#cstruct{index = lists:keydelete(Pos, 1, Ix)}),
[{op, del_index, Pos, vsn_cs2list(Cs2)}].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -1444,8 +2005,7 @@ make_add_snmp(Tab, Ustruct) ->
verify([], Cs#cstruct.snmp, {already_exists, Tab, snmp}),
Error = {badarg, Tab, snmp, Ustruct},
verify(true, mnesia_snmp_hook:check_ustruct(Ustruct), Error),
- Cs2 = Cs#cstruct{snmp = Ustruct},
- verify_cstruct(Cs2),
+ Cs2 = verify_cstruct(Cs#cstruct{snmp = Ustruct}),
[{op, add_snmp, Ustruct, vsn_cs2list(Cs2)}].
del_snmp(Tab) ->
@@ -1462,8 +2022,7 @@ make_del_snmp(Tab) ->
ensure_writable(schema),
Cs = incr_version(val({Tab, cstruct})),
ensure_active(Cs),
- Cs2 = Cs#cstruct{snmp = []},
- verify_cstruct(Cs2),
+ Cs2 = verify_cstruct(Cs#cstruct{snmp = []}),
[{op, del_snmp, vsn_cs2list(Cs2)}].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -1492,19 +2051,21 @@ make_transform(Tab, Fun, NewAttrs, NewRecName) ->
Cs = incr_version(val({Tab, cstruct})),
ensure_active(Cs),
ensure_writable(Tab),
- case mnesia_lib:val({Tab, index}) of
+ case Cs#cstruct.index of
[] ->
- Cs2 = Cs#cstruct{attributes = NewAttrs, record_name = NewRecName},
- verify_cstruct(Cs2),
+ Cs2 = verify_cstruct(
+ Cs#cstruct{attributes = NewAttrs,
+ record_name = NewRecName}),
[{op, transform, Fun, vsn_cs2list(Cs2)}];
PosList ->
- DelIdx = fun(Pos, Ncs) ->
+ DelIdx = fun({Pos,_}, Ncs) ->
Ix = Ncs#cstruct.index,
- Ncs1 = Ncs#cstruct{index = lists:delete(Pos, Ix)},
+ Ix2 = lists:keydelete(Pos, 1, Ix),
+ Ncs1 = Ncs#cstruct{index = Ix2},
Op = {op, del_index, Pos, vsn_cs2list(Ncs1)},
{Op, Ncs1}
end,
- AddIdx = fun(Pos, Ncs) ->
+ AddIdx = fun({_,_} = Pos, Ncs) ->
Ix = Ncs#cstruct.index,
Ix2 = lists:sort([Pos | Ix]),
Ncs1 = Ncs#cstruct{index = Ix2},
@@ -1514,10 +2075,16 @@ make_transform(Tab, Fun, NewAttrs, NewRecName) ->
{DelOps, Cs1} = lists:mapfoldl(DelIdx, Cs, PosList),
Cs2 = Cs1#cstruct{attributes = NewAttrs, record_name = NewRecName},
{AddOps, Cs3} = lists:mapfoldl(AddIdx, Cs2, PosList),
- verify_cstruct(Cs3),
- lists:flatten([DelOps, {op, transform, Fun, vsn_cs2list(Cs2)}, AddOps])
+ _ = verify_cstruct(Cs3), % just a sanity check
+ lists:flatten([DelOps, {op, transform, Fun, vsn_cs2list(Cs2)},
+ AddOps])
end.
+index_pos({Pos,_}) -> Pos;
+index_pos(Pos) when is_integer(Pos) -> Pos;
+index_pos({P} = Pos) when is_atom(P) -> Pos.
+
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
@@ -1537,8 +2104,7 @@ make_change_table_access_mode(Tab, Mode) ->
ensure_active(Cs),
OldMode = Cs#cstruct.access_mode,
verify(false, OldMode == Mode, {already_exists, Tab, Mode}),
- Cs2 = Cs#cstruct{access_mode = Mode},
- verify_cstruct(Cs2),
+ Cs2 = verify_cstruct(Cs#cstruct{access_mode = Mode}),
[{op, change_table_access_mode, vsn_cs2list(Cs2), OldMode, Mode}].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -1558,8 +2124,7 @@ make_change_table_load_order(Tab, LoadOrder) ->
Cs = incr_version(val({Tab, cstruct})),
ensure_active(Cs),
OldLoadOrder = Cs#cstruct.load_order,
- Cs2 = Cs#cstruct{load_order = LoadOrder},
- verify_cstruct(Cs2),
+ Cs2 = verify_cstruct(Cs#cstruct{load_order = LoadOrder}),
[{op, change_table_load_order, vsn_cs2list(Cs2), OldLoadOrder, LoadOrder}].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@@ -1605,6 +2170,7 @@ write_table_property(Tab, Prop) when is_tuple(Prop), size(Prop) >= 1 ->
schema_transaction(fun() -> do_write_table_property(Tab, Prop) end);
write_table_property(Tab, Prop) ->
{aborted, {bad_type, Tab, Prop}}.
+
do_write_table_property(Tab, Prop) ->
TidTs = get_tid_ts_and_lock(schema, write),
{_, _, Ts} = TidTs,
@@ -1636,8 +2202,7 @@ make_write_table_properties(Tab, [Prop | Props], Cs) ->
PropKey = element(1, Prop),
DelProps = lists:keydelete(PropKey, 1, OldProps),
MergedProps = lists:merge(DelProps, [Prop]),
- Cs2 = Cs#cstruct{user_properties = MergedProps},
- verify_cstruct(Cs2),
+ Cs2 = verify_cstruct(Cs#cstruct{user_properties = MergedProps}),
[{op, write_property, vsn_cs2list(Cs2), Prop} |
make_write_table_properties(Tab, Props, Cs2)];
make_write_table_properties(_Tab, [], _Cs) ->
@@ -1682,22 +2247,38 @@ do_read_table_property(Tab, Key) ->
{_, _, Ts} = TidTs,
Store = Ts#tidstore.store,
Props = ets:foldl(
- fun({op, create_table, [{name, T}|Opts]}, _Acc)
- when T==Tab ->
+ fun({op, announce_im_running,_,Opts,_,_}, _Acc) when Tab==schema ->
+ find_props(Opts);
+ ({op, create_table, [{name, T}|Opts]}, _Acc)
+ when T==Tab ->
find_props(Opts);
({op, Op, [{name,T}|Opts], _Prop}, _Acc)
- when T==Tab, Op==write_property; Op==delete_property ->
+ when T==Tab, Op==write_property;
+ T==Tab, Op==delete_property ->
find_props(Opts);
({op, delete_table, [{name,T}|_]}, _Acc)
when T==Tab ->
[];
(_Other, Acc) ->
Acc
- end, [], Store),
- case lists:keysearch(Key, 1, Props) of
- {value, Property} ->
- Property;
- false ->
+ end, undefined, Store),
+ case Props of
+ undefined ->
+ get_tid_ts_and_lock(Tab, read),
+ dirty_read_table_property(Tab, Key);
+ _ when is_list(Props) ->
+ case lists:keyfind(Key, 1, Props) of
+ false ->
+ undefined;
+ Other ->
+ Other
+ end
+ end.
+
+dirty_read_table_property(Tab, Key) ->
+ try ets:lookup_element(mnesia_gvar, {Tab,user_property,Key}, 2)
+ catch
+ error:_ ->
undefined
end.
@@ -1757,8 +2338,7 @@ make_delete_table_properties(Tab, PropKeys) ->
make_delete_table_properties(Tab, [PropKey | PropKeys], Cs) ->
OldProps = Cs#cstruct.user_properties,
Props = lists:keydelete(PropKey, 1, OldProps),
- Cs2 = Cs#cstruct{user_properties = Props},
- verify_cstruct(Cs2),
+ Cs2 = verify_cstruct(Cs#cstruct{user_properties = Props}),
[{op, delete_property, vsn_cs2list(Cs2), PropKey} |
make_delete_table_properties(Tab, PropKeys, Cs2)];
make_delete_table_properties(_Tab, [], _Cs) ->
@@ -1899,6 +2479,11 @@ prepare_op(Tid, {op, create_table, TabDef}, _WaitFor) ->
create_disc_only_table(Tab,Cs),
insert_cstruct(Tid, Cs, false),
{true, optional};
+ {ext, Alias, Mod} ->
+ mnesia_lib:set({Tab, create_table},true),
+ create_external_table(Alias, Tab, Mod, Cs),
+ insert_cstruct(Tid, Cs, false),
+ {true, optional};
unknown -> %% No replica on this node
mnesia_lib:set({Tab, create_table},true),
insert_cstruct(Tid, Cs, false),
@@ -1967,6 +2552,12 @@ prepare_op(_Tid, {op, change_table_copy_type, N, FromS, ToS, TabDef}, _WaitFor)
NotActive = mnesia_lib:not_active_here(Tab),
+ if Tab =/= schema ->
+ check_if_disc_required(FromS, ToS);
+ true ->
+ ok
+ end,
+
if
NotActive == true ->
mnesia:abort({not_active, Tab, node()});
@@ -2006,6 +2597,15 @@ prepare_op(_Tid, {op, change_table_copy_type, N, FromS, ToS, TabDef}, _WaitFor)
mnesia:abort({combine_error, Tab, ToS})
end;
+ element(1,FromS) == ext; element(1,ToS) == ext ->
+ if ToS == ram_copies ->
+ create_ram_table(Tab, Cs);
+ true ->
+ ok
+ end,
+ mnesia_dumper:dump_to_logfile(FromS, Tab),
+ mnesia_checkpoint:tm_change_table_copy_type(Tab, FromS, ToS);
+
FromS == ram_copies ->
case mnesia_monitor:use_dir() of
true ->
@@ -2021,7 +2621,9 @@ prepare_op(_Tid, {op, change_table_copy_type, N, FromS, ToS, TabDef}, _WaitFor)
disc_only_copies ->
mnesia_dumper:raw_named_dump_table(Tab, dmp)
end,
- mnesia_checkpoint:tm_change_table_copy_type(Tab, FromS, ToS)
+ mnesia_checkpoint:tm_change_table_copy_type(Tab,
+ FromS,
+ ToS)
end;
false ->
mnesia:abort({has_no_disc, node()})
@@ -2029,6 +2631,7 @@ prepare_op(_Tid, {op, change_table_copy_type, N, FromS, ToS, TabDef}, _WaitFor)
FromS == disc_copies, ToS == disc_only_copies ->
mnesia_dumper:raw_named_dump_table(Tab, dmp);
+
FromS == disc_only_copies ->
Type = Cs#cstruct.type,
create_ram_table(Tab, Cs),
@@ -2040,6 +2643,7 @@ prepare_op(_Tid, {op, change_table_copy_type, N, FromS, ToS, TabDef}, _WaitFor)
Err = "Failed to copy disc data to ram",
mnesia:abort({system_limit, Tab, {Err,Reason}})
end;
+
true ->
ignore
end,
@@ -2103,6 +2707,8 @@ prepare_op(_Tid, {op, transform, Fun, TabDef}, _WaitFor) ->
{true, Objs, mandatory}
catch _:Reason ->
mnesia_lib:db_fixtable(Storage, Tab, false),
+ mnesia_lib:important("Transform function failed: '~p' in '~p'",
+ [Reason, erlang:get_stacktrace()]),
exit({"Bad transform function", Tab, Fun, node(), Reason})
end
end;
@@ -2119,6 +2725,22 @@ prepare_op(_Tid, {op, merge_schema, TabDef}, _WaitFor) ->
prepare_op(_Tid, _Op, _WaitFor) ->
{true, optional}.
+check_if_disc_required(FromS, ToS) ->
+ FromSem = mnesia_lib:semantics(FromS, storage),
+ ToSem = mnesia_lib:semantics(ToS, storage),
+ case {FromSem, ToSem} of
+ {ram_copies, _} when ToSem == disc_copies;
+ ToSem == disc_only_copies ->
+ case mnesia_monitor:use_dir() of
+ true ->
+ ok;
+ false ->
+ mnesia:abort({has_no_disc, node()})
+ end;
+ _ ->
+ ok
+ end.
+
create_ram_table(Tab, #cstruct{type=Type, storage_properties=Props}) ->
EtsOpts = proplists:get_value(ets, Props, []),
Args = [{keypos, 2}, public, named_table, Type | EtsOpts],
@@ -2160,6 +2782,14 @@ create_disc_only_table(Tab, #cstruct{type=Type, storage_properties=Props}) ->
mnesia:abort({system_limit, Tab, {Err,Reason}})
end.
+create_external_table(Alias, Tab, Mod, Cs) ->
+ case mnesia_monitor:unsafe_create_external(Tab, Alias, Mod, Cs) of
+ ok ->
+ ok;
+ {error,Reason} ->
+ Err = "Failed to create external table",
+ mnesia:abort({system_limit, Tab, {Err,Reason}})
+ end.
receive_sync([], Pids) ->
Pids;
@@ -2294,7 +2924,10 @@ undo_prepare_op(Tid, {op, create_table, TabDef}) ->
mnesia_monitor:unsafe_close_dets(Tab),
Dat = mnesia_lib:tab2dat(Tab),
%% disc_delete_table(Tab, Storage),
- file:delete(Dat)
+ file:delete(Dat);
+ {ext, Alias, Mod} ->
+ Mod:close_table(Alias, Tab),
+ Mod:delete_table(Alias, Tab)
end;
undo_prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}) ->
@@ -2394,6 +3027,8 @@ ram_delete_table(Tab, Storage) ->
case Storage of
unknown ->
ignore;
+ {ext, _, _} ->
+ ignore;
disc_only_copies ->
ignore;
_Else ->
@@ -2455,13 +3090,27 @@ purge_known_files([File | Tail], KeepFiles, Dir, Suffixes) ->
ignore;
true ->
AbsFile = filename:join([Dir, File]),
- file:delete(AbsFile)
- end
+ delete_recursive(AbsFile)
+ end
end,
purge_known_files(Tail, KeepFiles, Dir, Suffixes);
purge_known_files([], _KeepFiles, _Dir, _Suffixes) ->
ok.
+%% Removes a directory or file recursively
+delete_recursive(Path) ->
+ case filelib:is_dir(Path) of
+ true ->
+ {ok, Names} = file:list_dir(Path),
+ lists:foreach(fun(Name) ->
+ delete_recursive(filename:join(Path, Name))
+ end,
+ Names),
+ file:del_dir(Path);
+ false ->
+ file:delete(Path)
+ end.
+
has_known_suffix(_File, _Suffixes, true) ->
true;
has_known_suffix(File, [Suffix | Tail], false) ->
@@ -2469,11 +3118,33 @@ has_known_suffix(File, [Suffix | Tail], false) ->
has_known_suffix(_File, [], Bool) ->
Bool.
-known_suffixes() -> real_suffixes() ++ tmp_suffixes().
+known_suffixes() -> known_suffixes(get_ext_types_disc()).
+
+known_suffixes(Ext) -> real_suffixes(Ext) ++ tmp_suffixes(Ext).
-real_suffixes() -> [".DAT", ".LOG", ".BUP", ".DCL", ".DCD"].
+real_suffixes(Ext) -> [".DAT", ".LOG", ".BUP", ".DCL", ".DCD"] ++ ext_real_suffixes(Ext).
-tmp_suffixes() -> [".TMP", ".BUPTMP", ".RET", ".DMP"].
+tmp_suffixes() -> tmp_suffixes(get_ext_types_disc()).
+
+tmp_suffixes(Ext) -> [".TMP", ".BUPTMP", ".RET", ".DMP", "."] ++ ext_tmp_suffixes(Ext).
+
+ext_real_suffixes(Ext) ->
+ try lists:foldl(fun(Mod, Acc) -> Acc++Mod:real_suffixes() end, [],
+ [M || {_,M} <- Ext])
+ catch
+ error:E ->
+ verbose("Cant find real ext suffixes (~p)~n", [E]),
+ []
+ end.
+
+ext_tmp_suffixes(Ext) ->
+ try lists:foldl(fun(Mod, Acc) -> Acc++Mod:tmp_suffixes() end, [],
+ [M || {_,M} <- Ext])
+ catch
+ error:E ->
+ verbose("Cant find tmp ext suffixes (~p)~n", [E]),
+ []
+ end.
info() ->
Tabs = lists:sort(val({schema, tables})),
@@ -2591,12 +3262,12 @@ do_restore(R, BupSchema) ->
arrange_restore(R, Fun, Recs) ->
R2 = R#r{insert_op = Fun, recs = Recs},
- case mnesia_bup:iterate(R#r.module, fun restore_items/4, R#r.opaque, R2) of
+ case mnesia_bup:iterate(R#r.module, fun restore_items/5, R#r.opaque, R2) of
{ok, R3} -> R3#r.recs;
{error, Reason} -> mnesia:abort(Reason)
end.
-restore_items([Rec | Recs], Header, Schema, R) ->
+restore_items([Rec | Recs], Header, Schema, Ext, R) ->
Tab = element(1, Rec),
case lists:keysearch(Tab, 1, R#r.tables) of
{value, {Tab, Where0, Snmp, RecName}} ->
@@ -2609,13 +3280,13 @@ restore_items([Rec | Recs], Header, Schema, R) ->
{Rest, NRecs} = restore_tab_items([Rec | Recs], Tab,
RecName, Where, Snmp,
R#r.recs, R#r.insert_op),
- restore_items(Rest, Header, Schema, R#r{recs = NRecs});
+ restore_items(Rest, Header, Schema, Ext, R#r{recs = NRecs});
false ->
Rest = skip_tab_items(Recs, Tab),
- restore_items(Rest, Header, Schema, R)
+ restore_items(Rest, Header, Schema, Ext, R)
end;
-restore_items([], _Header, _Schema, R) ->
+restore_items([], _Header, _Schema, _Ext, R) ->
R.
restore_func(Tab, R) ->
@@ -2629,8 +3300,14 @@ restore_func(Tab, R) ->
where_to_commit(Tab, CsList) ->
Ram = [{N, ram_copies} || N <- pick(Tab, ram_copies, CsList, [])],
Disc = [{N, disc_copies} || N <- pick(Tab, disc_copies, CsList, [])],
- DiscO = [{N, disc_only_copies} || N <- pick(Tab, disc_only_copies, CsList, [])],
- Ram ++ Disc ++ DiscO.
+ DiscO = [{N, disc_only_copies} ||
+ N <- pick(Tab, disc_only_copies, CsList, [])],
+ ExtNodes = [{Alias, Mod, pick(Tab, Alias, CsList, [])} ||
+ {Alias, Mod} <- get_ext_types()],
+ Ext = lists:foldl(fun({Alias, Mod, Ns}, Acc) ->
+ [{N, {ext, Alias, Mod}} || N <- Ns] ++ Acc
+ end, [], ExtNodes),
+ Ram ++ Disc ++ DiscO ++ Ext.
%% Changes of the Meta info of schema itself is not allowed
restore_schema([{schema, schema, _List} | Schema], R) ->
@@ -2709,7 +3386,13 @@ make_dump_tables([schema | _Tabs]) ->
make_dump_tables([Tab | Tabs]) ->
get_tid_ts_and_lock(Tab, read),
TabDef = get_create_list(Tab),
- DiscResident = val({Tab, disc_copies}) ++ val({Tab, disc_only_copies}),
+ DiscResident =
+ val({Tab, disc_copies}) ++
+ val({Tab, disc_only_copies}) ++
+ lists:concat([Ns || {{A,M},Ns} <- val({Tab, external_copies}),
+ lists:member(
+ mnesia_lib:semantics({ext,A,M},storage),
+ [disc_copies, disc_only_copies])]),
verify([], DiscResident,
{"Only allowed on ram_copies", Tab, DiscResident}),
[{op, dump_table, unknown, TabDef} | make_dump_tables(Tabs)];
@@ -2721,7 +3404,9 @@ merge_schema() ->
schema_transaction(fun() -> do_merge_schema([]) end).
merge_schema(UserFun) ->
- schema_transaction(fun() -> UserFun(fun(Arg) -> do_merge_schema(Arg) end) end).
+ schema_transaction(fun() ->
+ UserFun(fun(Arg) -> do_merge_schema(Arg) end)
+ end).
do_merge_schema(LockTabs0) ->
{_Mod, Tid, Ts} = get_tid_ts_and_lock(schema, write),
@@ -2795,11 +3480,23 @@ do_merge_schema(LockTabs0) ->
end.
fetch_cstructs(Node) ->
- rpc:call(Node, mnesia_controller, get_remote_cstructs, []).
+ Convert = mnesia_monitor:needs_protocol_conversion(Node),
+ case rpc:call(Node, mnesia_controller, get_remote_cstructs, []) of
+ {cstructs, Cs0, RemoteRunning1} when Convert ->
+ {cstructs, [list2cs(cs2list(Cs)) || Cs <- Cs0], RemoteRunning1};
+ Result ->
+ Result
+ end.
-need_old_cstructs() -> false.
+need_old_cstructs() ->
+ need_old_cstructs(val({schema, where_to_write})).
-need_old_cstructs(_Nodes) -> false.
+need_old_cstructs(Nodes) ->
+ Filter = fun(Node) -> mnesia_monitor:needs_protocol_conversion(Node) end,
+ case lists:filter(Filter, Nodes) of
+ [] -> false;
+ Ns -> lists:min([element(1, ?catch_val({protocol, Node})) || Node <- Ns])
+ end.
tab_to_nodes(Tab) when is_atom(Tab) ->
Cs = val({Tab, cstruct}),
@@ -2952,8 +3649,8 @@ do_make_merge_schema(Node, NeedsConv, RemoteCs = #cstruct{}) ->
%% invariants must be enforced in order to allow merge of cstructs.
%%
%% Returns a new cstruct or issues a fatal error
-merge_cstructs(Cs, RemoteCs, Force) ->
- verify_cstruct(Cs),
+merge_cstructs(Cs0, RemoteCs, Force) ->
+ Cs = verify_cstruct(Cs0),
try do_merge_cstructs(Cs, RemoteCs, Force) of
MergedCs when is_record(MergedCs, cstruct) ->
MergedCs
@@ -2963,15 +3660,15 @@ merge_cstructs(Cs, RemoteCs, Force) ->
error:Reason -> exit(Reason)
end.
-do_merge_cstructs(Cs, RemoteCs, Force) ->
- verify_cstruct(RemoteCs),
+do_merge_cstructs(Cs, RemoteCs0, Force) ->
+ RemoteCs = verify_cstruct(RemoteCs0),
Ns = mnesia_lib:uniq(mnesia_lib:cs_to_nodes(Cs) ++
mnesia_lib:cs_to_nodes(RemoteCs)),
{AnythingNew, MergedCs} =
merge_storage_type(Ns, false, Cs, RemoteCs, Force),
- MergedCs2 = merge_versions(AnythingNew, MergedCs, RemoteCs, Force),
- verify_cstruct(MergedCs2),
- MergedCs2.
+ verify_cstruct(
+ merge_versions(AnythingNew, MergedCs, RemoteCs, Force)).
+
merge_storage_type([N | Ns], AnythingNew, Cs, RemoteCs, Force) ->
Local = mnesia_lib:cs_to_storage_type(N, Cs),
@@ -3115,4 +3812,3 @@ unannounce_im_running([N | Ns]) ->
unannounce_im_running(Ns);
unannounce_im_running([]) ->
ok.
-
diff --git a/lib/mnesia/src/mnesia_sup.erl b/lib/mnesia/src/mnesia_sup.erl
index 5037a17ada..4aece81308 100644
--- a/lib/mnesia/src/mnesia_sup.erl
+++ b/lib/mnesia/src/mnesia_sup.erl
@@ -60,9 +60,10 @@ init() ->
Flags = {one_for_all, 0, 3600}, % Should be rest_for_one policy
Event = event_procs(),
+ Ext = ext_procs(),
Kernel = kernel_procs(),
- {ok, {Flags, Event ++ Kernel}}.
+ {ok, {Flags, Event ++ Ext ++ Kernel}}.
event_procs() ->
KillAfter = timer:seconds(30),
@@ -75,6 +76,11 @@ kernel_procs() ->
KA = infinity,
[{K, {K, start, []}, permanent, KA, supervisor, [K, supervisor]}].
+ext_procs() ->
+ K = mnesia_ext_sup,
+ KA = infinity,
+ [{K, {K, start, []}, permanent, KA, supervisor, [K, supervisor]}].
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% event handler
diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl
index 6888f61dbd..b116b48312 100644
--- a/lib/mnesia/src/mnesia_tm.erl
+++ b/lib/mnesia/src/mnesia_tm.erl
@@ -42,7 +42,8 @@
put_activity_id/2,
block_tab/1,
unblock_tab/1,
- fixtable/3
+ fixtable/3,
+ new_cr_format/1
]).
%% sys callback functions
@@ -206,10 +207,10 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
{_From, {async_dirty, Tid, Commit, Tab}} ->
case lists:member(Tab, State#state.blocked_tabs) of
false ->
- do_async_dirty(Tid, Commit, Tab),
+ do_async_dirty(Tid, new_cr_format(Commit), Tab),
doit_loop(State);
true ->
- Item = {async_dirty, Tid, Commit, Tab},
+ Item = {async_dirty, Tid, new_cr_format(Commit), Tab},
State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
doit_loop(State2)
end;
@@ -217,10 +218,10 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
{From, {sync_dirty, Tid, Commit, Tab}} ->
case lists:member(Tab, State#state.blocked_tabs) of
false ->
- do_sync_dirty(From, Tid, Commit, Tab),
+ do_sync_dirty(From, Tid, new_cr_format(Commit), Tab),
doit_loop(State);
true ->
- Item = {sync_dirty, From, Tid, Commit, Tab},
+ Item = {sync_dirty, From, Tid, new_cr_format(Commit), Tab},
State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
doit_loop(State2)
end;
@@ -241,10 +242,11 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
reply(From, {error, {system_limit, Msg, Reason}}, State)
end;
- {From, {ask_commit, Protocol, Tid, Commit, DiscNs, RamNs}} ->
+ {From, {ask_commit, Protocol, Tid, Commit0, DiscNs, RamNs}} ->
?eval_debug_fun({?MODULE, doit_ask_commit},
[{tid, Tid}, {prot, Protocol}]),
mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
+ Commit = new_cr_format(Commit0),
Pid =
case Protocol of
asym_trans when node(Tid#tid.pid) /= node() ->
@@ -1137,14 +1139,14 @@ arrange(Tid, Store, Type) ->
reverse([]) ->
[];
reverse([H=#commit{ram_copies=Ram, disc_copies=DC,
- disc_only_copies=DOC,snmp = Snmp}
+ disc_only_copies=DOC, ext=Ext}
|R]) ->
[
H#commit{
- ram_copies = lists:reverse(Ram),
- disc_copies = lists:reverse(DC),
- disc_only_copies = lists:reverse(DOC),
- snmp = lists:reverse(Snmp)
+ ram_copies = lists:reverse(Ram),
+ disc_copies = lists:reverse(DC),
+ disc_only_copies = lists:reverse(DOC),
+ ext = [{Type, lists:reverse(E)} || {Type,E} <- Ext]
}
| reverse(R)].
@@ -1311,8 +1313,13 @@ pick_node({dirty,_}, Node, [], Done) ->
pick_node(_Tid, Node, [], _Done) ->
mnesia:abort({bad_commit, {missing_lock, Node}}).
-prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind == snmp ->
- Rec2 = Rec#commit{snmp = [Item | Rec#commit.snmp]},
+prepare_node(Node, Storage, [Item | Items], #commit{ext=Ext0}=Rec, Kind) when Kind == snmp ->
+ Rec2 = case lists:keytake(snmp, 1, Ext0) of
+ false ->
+ Rec#commit{ext = [{snmp,[Item]}|Ext0]};
+ {_, {snmp,Snmp},Ext} ->
+ Rec#commit{ext = [{snmp,[Item|Snmp]}|Ext]}
+ end,
prepare_node(Node, Storage, Items, Rec2, Kind);
prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema ->
Rec2 =
@@ -1323,7 +1330,15 @@ prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema ->
Rec#commit{disc_copies = [Item | Rec#commit.disc_copies]};
disc_only_copies ->
Rec#commit{disc_only_copies =
- [Item | Rec#commit.disc_only_copies]}
+ [Item | Rec#commit.disc_only_copies]};
+ {ext, Alias, Mod} ->
+ Ext0 = Rec#commit.ext,
+ case lists:keytake(ext_copies, 1, Ext0) of
+ false ->
+ Rec#commit{ext = [{ext_copies, [{{ext,Alias,Mod}, Item}]}|Ext0]};
+ {_,{_,EC},Ext} ->
+ Rec#commit{ext = [{ext_copies, [{{ext,Alias,Mod}, Item}|EC]}|Ext]}
+ end
end,
prepare_node(Node, Storage, Items, Rec2, Kind);
prepare_node(_Node, _Storage, Items, Rec, Kind)
@@ -1750,16 +1765,30 @@ do_commit(Tid, Bin) when is_binary(Bin) ->
do_commit(Tid, binary_to_term(Bin));
do_commit(Tid, C) ->
do_commit(Tid, C, optional).
+
do_commit(Tid, Bin, DumperMode) when is_binary(Bin) ->
do_commit(Tid, binary_to_term(Bin), DumperMode);
do_commit(Tid, C, DumperMode) ->
mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode),
- R = do_snmp(Tid, C#commit.snmp),
+ R = do_snmp(Tid, proplists:get_value(snmp, C#commit.ext, [])),
R2 = do_update(Tid, ram_copies, C#commit.ram_copies, R),
R3 = do_update(Tid, disc_copies, C#commit.disc_copies, R2),
R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3),
+ R5 = do_update_ext(Tid, C#commit.ext, R4),
mnesia_subscr:report_activity(Tid),
- R4.
+ R5.
+
+%% This could/should be optimized
+do_update_ext(_Tid, [], OldRes) -> OldRes;
+do_update_ext(Tid, Ext, OldRes) ->
+ case lists:keyfind(ext_copies, 1, Ext) of
+ false -> OldRes;
+ {_, Ops} ->
+ Do = fun({{ext, _,_} = Storage, Op}, R) ->
+ do_update(Tid, Storage, [Op], R)
+ end,
+ lists:foldl(Do, OldRes, Ops)
+ end.
%% Update the items
do_update(Tid, Storage, [Op | Ops], OldRes) ->
@@ -1782,12 +1811,12 @@ do_update(_Tid, _Storage, [], Res) ->
Res.
do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) ->
- commit_write(?catch_val({Tab, commit_work}), Tid,
+ commit_write(?catch_val({Tab, commit_work}), Tid, Storage,
Tab, K, Obj, undefined),
mnesia_lib:db_put(Storage, Tab, Obj);
do_update_op(Tid, Storage, {{Tab, K}, Val, delete}) ->
- commit_delete(?catch_val({Tab, commit_work}), Tid, Tab, K, Val, undefined),
+ commit_delete(?catch_val({Tab, commit_work}), Tid, Storage, Tab, K, Val, undefined),
mnesia_lib:db_erase(Storage, Tab, K);
do_update_op(Tid, Storage, {{Tab, K}, {RecName, Incr}, update_counter}) ->
@@ -1805,86 +1834,84 @@ do_update_op(Tid, Storage, {{Tab, K}, {RecName, Incr}, update_counter}) ->
mnesia_lib:db_put(Storage, Tab, Zero),
{Zero, []}
end,
- commit_update(?catch_val({Tab, commit_work}), Tid, Tab,
+ commit_update(?catch_val({Tab, commit_work}), Tid, Storage, Tab,
K, NewObj, OldObjs),
element(3, NewObj);
do_update_op(Tid, Storage, {{Tab, Key}, Obj, delete_object}) ->
commit_del_object(?catch_val({Tab, commit_work}),
- Tid, Tab, Key, Obj, undefined),
+ Tid, Storage, Tab, Key, Obj),
mnesia_lib:db_match_erase(Storage, Tab, Obj);
do_update_op(Tid, Storage, {{Tab, Key}, Obj, clear_table}) ->
- commit_clear(?catch_val({Tab, commit_work}), Tid, Tab, Key, Obj),
+ commit_clear(?catch_val({Tab, commit_work}), Tid, Storage, Tab, Key, Obj),
mnesia_lib:db_match_erase(Storage, Tab, Obj).
-commit_write([], _, _, _, _, _) -> ok;
-commit_write([{checkpoints, CpList}|R], Tid, Tab, K, Obj, Old) ->
+commit_write([], _, _, _, _, _, _) -> ok;
+commit_write([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj, Old) ->
mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList),
- commit_write(R, Tid, Tab, K, Obj, Old);
-commit_write([H|R], Tid, Tab, K, Obj, Old)
+ commit_write(R, Tid, Storage, Tab, K, Obj, Old);
+commit_write([H|R], Tid, Storage, Tab, K, Obj, Old)
when element(1, H) == subscribers ->
mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old),
- commit_write(R, Tid, Tab, K, Obj, Old);
-commit_write([H|R], Tid, Tab, K, Obj, Old)
+ commit_write(R, Tid, Storage, Tab, K, Obj, Old);
+commit_write([H|R], Tid, Storage, Tab, K, Obj, Old)
when element(1, H) == index ->
- mnesia_index:add_index(H, Tab, K, Obj, Old),
- commit_write(R, Tid, Tab, K, Obj, Old).
+ mnesia_index:add_index(H, Storage, Tab, K, Obj, Old),
+ commit_write(R, Tid, Storage, Tab, K, Obj, Old).
-commit_update([], _, _, _, _, _) -> ok;
-commit_update([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) ->
+commit_update([], _, _, _, _, _, _) -> ok;
+commit_update([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj, _) ->
Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList),
- commit_update(R, Tid, Tab, K, Obj, Old);
-commit_update([H|R], Tid, Tab, K, Obj, Old)
+ commit_update(R, Tid, Storage, Tab, K, Obj, Old);
+commit_update([H|R], Tid, Storage, Tab, K, Obj, Old)
when element(1, H) == subscribers ->
mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old),
- commit_update(R, Tid, Tab, K, Obj, Old);
-commit_update([H|R], Tid, Tab, K, Obj, Old)
+ commit_update(R, Tid, Storage, Tab, K, Obj, Old);
+commit_update([H|R], Tid,Storage, Tab, K, Obj, Old)
when element(1, H) == index ->
- mnesia_index:add_index(H, Tab, K, Obj, Old),
- commit_update(R, Tid, Tab, K, Obj, Old).
+ mnesia_index:add_index(H, Storage, Tab, K, Obj, Old),
+ commit_update(R, Tid, Storage, Tab, K, Obj, Old).
-commit_delete([], _, _, _, _, _) -> ok;
-commit_delete([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) ->
+commit_delete([], _, _, _, _, _, _) -> ok;
+commit_delete([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj, _) ->
Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, delete, CpList),
- commit_delete(R, Tid, Tab, K, Obj, Old);
-commit_delete([H|R], Tid, Tab, K, Obj, Old)
+ commit_delete(R, Tid, Storage, Tab, K, Obj, Old);
+commit_delete([H|R], Tid, Storage, Tab, K, Obj, Old)
when element(1, H) == subscribers ->
mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete, Old),
- commit_delete(R, Tid, Tab, K, Obj, Old);
-commit_delete([H|R], Tid, Tab, K, Obj, Old)
+ commit_delete(R, Tid, Storage, Tab, K, Obj, Old);
+commit_delete([H|R], Tid, Storage, Tab, K, Obj, Old)
when element(1, H) == index ->
- mnesia_index:delete_index(H, Tab, K),
- commit_delete(R, Tid, Tab, K, Obj, Old).
+ mnesia_index:delete_index(H, Storage, Tab, K),
+ commit_delete(R, Tid, Storage, Tab, K, Obj, Old).
commit_del_object([], _, _, _, _, _) -> ok;
-commit_del_object([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) ->
- Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, delete_object, CpList),
- commit_del_object(R, Tid, Tab, K, Obj, Old);
-commit_del_object([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == subscribers ->
- mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete_object, Old),
- commit_del_object(R, Tid, Tab, K, Obj, Old);
-commit_del_object([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == index ->
- mnesia_index:del_object_index(H, Tab, K, Obj, Old),
- commit_del_object(R, Tid, Tab, K, Obj, Old).
-
-commit_clear([], _, _, _, _) -> ok;
-commit_clear([{checkpoints, CpList}|R], Tid, Tab, K, Obj) ->
+commit_del_object([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj) ->
+ mnesia_checkpoint:tm_retain(Tid, Tab, K, delete_object, CpList),
+ commit_del_object(R, Tid, Storage, Tab, K, Obj);
+commit_del_object([H|R], Tid, Storage, Tab, K, Obj) when element(1, H) == subscribers ->
+ mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete_object),
+ commit_del_object(R, Tid, Storage, Tab, K, Obj);
+commit_del_object([H|R], Tid, Storage, Tab, K, Obj) when element(1, H) == index ->
+ mnesia_index:del_object_index(H, Storage, Tab, K, Obj),
+ commit_del_object(R, Tid, Storage, Tab, K, Obj).
+
+commit_clear([], _, _, _, _, _) -> ok;
+commit_clear([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj) ->
mnesia_checkpoint:tm_retain(Tid, Tab, K, clear_table, CpList),
- commit_clear(R, Tid, Tab, K, Obj);
-commit_clear([H|R], Tid, Tab, K, Obj)
+ commit_clear(R, Tid, Storage, Tab, K, Obj);
+commit_clear([H|R], Tid, Storage, Tab, K, Obj)
when element(1, H) == subscribers ->
mnesia_subscr:report_table_event(H, Tab, Tid, Obj, clear_table, undefined),
- commit_clear(R, Tid, Tab, K, Obj);
-commit_clear([H|R], Tid, Tab, K, Obj)
+ commit_clear(R, Tid, Storage, Tab, K, Obj);
+commit_clear([H|R], Tid, Storage, Tab, K, Obj)
when element(1, H) == index ->
mnesia_index:clear_index(H, Tab, K, Obj),
- commit_clear(R, Tid, Tab, K, Obj).
+ commit_clear(R, Tid, Storage, Tab, K, Obj).
do_snmp(_, []) -> ok;
-do_snmp(Tid, [Head | Tail]) ->
+do_snmp(Tid, [Head|Tail]) ->
try mnesia_snmp_hook:update(Head)
catch _:Reason ->
%% This should only happen when we recently have
@@ -1896,31 +1923,34 @@ do_snmp(Tid, [Head | Tail]) ->
end,
do_snmp(Tid, Tail).
-commit_nodes([C | Tail], AccD, AccR)
- when C#commit.disc_copies == [],
- C#commit.disc_only_copies == [],
- C#commit.schema_ops == [] ->
- commit_nodes(Tail, AccD, [C#commit.node | AccR]);
commit_nodes([C | Tail], AccD, AccR) ->
- commit_nodes(Tail, [C#commit.node | AccD], AccR);
+ case C of
+ #commit{disc_copies=[], disc_only_copies=[], schema_ops=[], ext=Ext} ->
+ case lists:keyfind(ext_copies, 1, Ext) of
+ false -> commit_nodes(Tail, AccD, [C#commit.node | AccR]);
+ _ -> commit_nodes(Tail, [C#commit.node | AccD], AccR)
+ end;
+ _ ->
+ commit_nodes(Tail, [C#commit.node | AccD], AccR)
+ end;
commit_nodes([], AccD, AccR) ->
{AccD, AccR}.
commit_decision(D, [C | Tail], AccD, AccR) ->
N = C#commit.node,
{D2, Tail2} =
- case C#commit.schema_ops of
- [] when C#commit.disc_copies == [],
- C#commit.disc_only_copies == [] ->
- commit_decision(D, Tail, AccD, [N | AccR]);
- [] ->
+ case C of
+ #commit{disc_copies=[], disc_only_copies=[], schema_ops=[], ext=Ext} ->
+ case lists:keyfind(ext_copies, 1, Ext) of
+ false -> commit_decision(D, Tail, AccD, [N | AccR]);
+ _ -> commit_decision(D, Tail, [N | AccD], AccR)
+ end;
+ #commit{schema_ops=[]} ->
commit_decision(D, Tail, [N | AccD], AccR);
- Ops ->
+ #commit{schema_ops=Ops} ->
case ram_only_ops(N, Ops) of
- true ->
- commit_decision(D, Tail, AccD, [N | AccR]);
- false ->
- commit_decision(D, Tail, [N | AccD], AccR)
+ true -> commit_decision(D, Tail, AccD, [N | AccR]);
+ false -> commit_decision(D, Tail, [N | AccD], AccR)
end
end,
{D2, [C#commit{decision = D2} | Tail2]};
@@ -1948,7 +1978,7 @@ sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) ->
Res = do_dirty(Tid, Head),
{WF, Res};
true ->
- {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
+ {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}},
sync_send_dirty(Tid, Tail, Tab, [Node | WaitFor])
end;
sync_send_dirty(_Tid, [], _Tab, WaitFor) ->
@@ -1967,11 +1997,11 @@ async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) ->
NewRes = do_dirty(Tid, Head),
async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, NewRes);
ReadNode == Node ->
- {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
+ {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}},
NewRes = {'EXIT', {aborted, {node_not_running, Node}}},
async_send_dirty(Tid, Tail, Tab, ReadNode, [Node | WaitFor], NewRes);
true ->
- {?MODULE, Node} ! {self(), {async_dirty, Tid, Head, Tab}},
+ {?MODULE, Node} ! {self(), {async_dirty, Tid, ext_format(Head), Tab}},
async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res)
end;
async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) ->
@@ -2028,23 +2058,29 @@ ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
Node == node() ->
ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
true ->
- Bin = opt_term_to_binary(Protocol, Head, DiscNs++RamNs),
- Msg = {ask_commit, Protocol, Tid, Bin, DiscNs, RamNs},
+ CR = ext_format(Head),
+ Msg = {ask_commit, Protocol, Tid, CR, DiscNs, RamNs},
{?MODULE, Node} ! {self(), Msg},
ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
end;
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
{WaitFor, Local}.
-%% This used to test protocol conversion between mnesia-nodes
-%% but it is really dependent on the emulator version on the
-%% two nodes (if funs are sent which they are in transform table op).
-%% to be safe we let erts do the translation (many times maybe and thus
-%% slower but it works.
-% opt_term_to_binary(asym_trans, Head, Nodes) ->
-% opt_term_to_binary(Nodes, Head);
-opt_term_to_binary(_Protocol, Head, _Nodes) ->
- Head.
+ext_format(#commit{ext=[]}=CR) -> CR;
+ext_format(#commit{node=Node, ext=Ext}=CR) ->
+ case mnesia_monitor:needs_protocol_conversion(Node) of
+ true ->
+ case lists:keyfind(snmp, 1, Ext) of
+ false -> CR#commit{ext=[]};
+ {snmp, List} -> CR#commit{ext=List}
+ end;
+ false -> CR
+ end.
+
+new_cr_format(#commit{ext=[]}=Cr) -> Cr;
+new_cr_format(#commit{ext=[{_,_}|_]}=Cr) -> Cr;
+new_cr_format(#commit{ext=Snmp}=Cr) ->
+ Cr#commit{ext=[{snmp,Snmp}]}.
rec_all([Node | Tail], Tid, Res, Pids) ->
receive