aboutsummaryrefslogtreecommitdiffstats
path: root/lib/stdlib/src/dets.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stdlib/src/dets.erl')
-rw-r--r--lib/stdlib/src/dets.erl2989
1 files changed, 2989 insertions, 0 deletions
diff --git a/lib/stdlib/src/dets.erl b/lib/stdlib/src/dets.erl
new file mode 100644
index 0000000000..7f1c13770b
--- /dev/null
+++ b/lib/stdlib/src/dets.erl
@@ -0,0 +1,2989 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(dets).
+
+%% Disk based linear hashing lookup dictionary.
+
+%% Public.
+-export([all/0,
+ bchunk/2,
+ close/1,
+ delete/2,
+ delete_all_objects/1,
+ delete_object/2,
+ first/1,
+ foldl/3,
+ foldr/3,
+ from_ets/2,
+ info/1,
+ info/2,
+ init_table/2,
+ init_table/3,
+ insert/2,
+ insert_new/2,
+ is_compatible_bchunk_format/2,
+ is_dets_file/1,
+ lookup/2,
+ match/1,
+ match/2,
+ match/3,
+ match_delete/2,
+ match_object/1,
+ match_object/2,
+ match_object/3,
+ member/2,
+ next/2,
+ open_file/1,
+ open_file/2,
+ pid2name/1,
+ repair_continuation/2,
+ safe_fixtable/2,
+ select/1,
+ select/2,
+ select/3,
+ select_delete/2,
+ slot/2,
+ sync/1,
+ table/1,
+ table/2,
+ to_ets/2,
+ traverse/2,
+ update_counter/3]).
+
+%% Server export.
+-export([start/0, stop/0]).
+
+%% Internal exports.
+-export([istart_link/1, init/2, internal_open/3, add_user/3,
+ internal_close/1, remove_user/2,
+ system_continue/3, system_terminate/4, system_code_change/4]).
+
+%% Debug.
+-export([file_info/1,
+ fsck/1,
+ fsck/2,
+ get_head_field/2,
+ view/1,
+ where/2,
+ verbose/0,
+ verbose/1
+ ]).
+
+%% Not documented, or not ready for publication.
+-export([lookup_keys/2]).
+
+
+-compile({inline, [{einval,2},{badarg,2},{undefined,1},
+ {badarg_exit,2},{lookup_reply,2}]}).
+
+-include_lib("kernel/include/file.hrl").
+
+-include("dets.hrl").
+
+-type object() :: tuple().
+-type pattern() :: atom() | tuple().
+-type tab_name() :: atom() | reference().
+
+%%% This is the implementation of the mnesia file storage. Each (non
+%%% ram-copy) table is maintained in a corresponding .DAT file. The
+%%% dat file is organized as a segmented linear hashlist. The head of
+%%% the file with the split indicator, size etc is held in ram by the
+%%% server at all times.
+%%%
+%%% The parts specific for formats up to and including 8(c) are
+%%% implemented in dets_v8.erl, parts specific for format 9 are
+%%% implemented in dets_v9.erl.
+
+%% The method of hashing is the so called linear hashing algorithm
+%% with segments.
+%%
+%% Linear hashing:
+%%
+%% - n indicates next bucket to split (initially zero);
+%% - m is the size of the hash table
+%% - initially next = m and n = 0
+%%
+%% - to insert:
+%% - hash = key mod m
+%% - if hash < n then hash = key mod 2m
+%% - when the number of objects exceeds the initial size
+%% of the hash table, each insertion of an object
+%% causes bucket n to be split:
+%% - add a new bucket to the end of the table
+%% - redistribute the contents of bucket n
+%% using hash = key mod 2m
+%% - increment n
+%% - if n = m then m = 2m, n = 0
+%% - to search:
+%% hash = key mod m
+%% if hash < n then hash = key mod 2m
+%% do linear scan of the bucket
+%%
+
+%%% If a file error occurs on a working dets file, update_mode is set
+%%% to the error tuple. When in 'error' mode, the free lists are not
+%%% written, and a repair is forced next time the file is opened.
+
+-record(dets_cont, {
+ what, % object | bindings | select | bchunk
+ no_objs, % requested number of objects: default | integer() > 0
+ bin, % small chunk not consumed, or 'eof' at end-of-file
+ alloc, % the part of the file not yet scanned, mostly a binary
+ tab,
+ match_program % true | compiled_match_spec() | undefined
+ }).
+
+-record(open_args, {
+ file,
+ type,
+ keypos,
+ repair,
+ min_no_slots,
+ max_no_slots,
+ ram_file,
+ delayed_write,
+ auto_save,
+ access,
+ version,
+ debug
+ }).
+
+-define(PATTERN_TO_OBJECT_MATCH_SPEC(Pat), [{Pat,[],['$_']}]).
+-define(PATTERN_TO_BINDINGS_MATCH_SPEC(Pat), [{Pat,[],['$$']}]).
+-define(PATTERN_TO_TRUE_MATCH_SPEC(Pat), [{Pat,[],[true]}]).
+
+%%-define(DEBUGM(X, Y), io:format(X, Y)).
+-define(DEBUGM(X, Y), true).
+
+%%-define(DEBUGF(X,Y), io:format(X, Y)).
+-define(DEBUGF(X,Y), void).
+
+%%-define(PROFILE(C), C).
+-define(PROFILE(C), void).
+
+%%% Some further debug code was added in R12B-1 (stdlib-1.15.1):
+%%% - there is a new open_file() option 'debug';
+%%% - there is a new OS environment variable 'DETS_DEBUG';
+%%% - verbose(true) implies that info messages are written onto
+%%% the error log whenever an unsafe traversal is started.
+%%% The 'debug' mode (set by the open_file() option 'debug' or
+%%% by os:putenv("DETS_DEBUG", "true")) implies that the results of
+%%% calling pwrite() and pread() are tested to some extent. It also
+%%% means a considerable overhead when it comes to RAM usage. The
+%%% operation of Dets is also slowed down a bit. Note that in debug
+%%% mode terms will be output on the error logger.
+
+%%%----------------------------------------------------------------------
+%%% API
+%%%----------------------------------------------------------------------
+
+add_user(Pid, Tab, Args) ->
+ req(Pid, {add_user, Tab, Args}).
+
+-spec all() -> [tab_name()].
+
+all() ->
+ dets_server:all().
+
+-type cont() :: #dets_cont{}.
+-spec bchunk(tab_name(), 'start' | cont()) ->
+ {cont(), binary() | tuple()} | '$end_of_table' | {'error', term()}.
+
+bchunk(Tab, start) ->
+ badarg(treq(Tab, {bchunk_init, Tab}), [Tab, start]);
+bchunk(Tab, #dets_cont{bin = eof, tab = Tab}) ->
+ '$end_of_table';
+bchunk(Tab, #dets_cont{what = bchunk, tab = Tab} = State) ->
+ badarg(treq(Tab, {bchunk, State}), [Tab, State]);
+bchunk(Tab, Term) ->
+ erlang:error(badarg, [Tab, Term]).
+
+-spec close(tab_name()) -> 'ok' | {'error', term()}.
+
+close(Tab) ->
+ case dets_server:close(Tab) of
+ badarg -> % Should not happen.
+ {error, not_owner}; % Backwards compatibility...
+ Reply ->
+ Reply
+ end.
+
+-spec delete(tab_name(), term()) -> 'ok' | {'error', term()}.
+
+delete(Tab, Key) ->
+ badarg(treq(Tab, {delete_key, [Key]}), [Tab, Key]).
+
+-spec delete_all_objects(tab_name()) -> 'ok' | {'error', term()}.
+
+delete_all_objects(Tab) ->
+ case treq(Tab, delete_all_objects) of
+ badarg ->
+ erlang:error(badarg, [Tab]);
+ fixed ->
+ match_delete(Tab, '_');
+ Reply ->
+ Reply
+ end.
+
+-spec delete_object(tab_name(), object()) -> 'ok' | {'error', term()}.
+
+delete_object(Tab, O) ->
+ badarg(treq(Tab, {delete_object, [O]}), [Tab, O]).
+
+%% Given a filename, fsck it. Debug.
+fsck(Fname) ->
+ fsck(Fname, default).
+
+fsck(Fname, Version) ->
+ catch begin
+ {ok, Fd, FH} = read_file_header(Fname, read, false),
+ ?DEBUGF("FileHeader: ~p~n", [FH]),
+ case (FH#fileheader.mod):check_file_header(FH, Fd) of
+ {error, not_closed} ->
+ fsck(Fd, make_ref(), Fname, FH, default, default, Version);
+ {ok, _Head, _Extra} ->
+ fsck(Fd, make_ref(), Fname, FH, default, default, Version);
+ Error ->
+ Error
+ end
+ end.
+
+-spec first(tab_name()) -> term() | '$end_of_table'.
+
+first(Tab) ->
+ badarg_exit(treq(Tab, first), [Tab]).
+
+-spec foldr(fun((object(), Acc) -> Acc), Acc, tab_name()) -> Acc | {'error', term()}.
+
+foldr(Fun, Acc, Tab) ->
+ foldl(Fun, Acc, Tab).
+
+-spec foldl(fun((object(), Acc) -> Acc), Acc, tab_name()) -> Acc | {'error', term()}.
+
+foldl(Fun, Acc, Tab) ->
+ Ref = make_ref(),
+ do_traverse(Fun, Acc, Tab, Ref).
+
+-spec from_ets(tab_name(), ets:tab()) -> 'ok' | {'error', term()}.
+
+from_ets(DTab, ETab) ->
+ ets:safe_fixtable(ETab, true),
+ Spec = ?PATTERN_TO_OBJECT_MATCH_SPEC('_'),
+ LC = ets:select(ETab, Spec, 100),
+ InitFun = from_ets_fun(LC, ETab),
+ Reply = treq(DTab, {initialize, InitFun, term, default}),
+ ets:safe_fixtable(ETab, false),
+ case Reply of
+ {thrown, Thrown} -> throw(Thrown);
+ Else -> badarg(Else, [DTab, ETab])
+ end.
+
+from_ets_fun(LC, ETab) ->
+ fun(close) ->
+ ok;
+ (read) when LC =:= '$end_of_table' ->
+ end_of_input;
+ (read) ->
+ {L, C} = LC,
+ {L, from_ets_fun(ets:select(C), ETab)}
+ end.
+
+info(Tab) ->
+ case catch dets_server:get_pid(Tab) of
+ {'EXIT', _Reason} ->
+ undefined;
+ Pid ->
+ undefined(req(Pid, info))
+ end.
+
+info(Tab, owner) ->
+ case catch dets_server:get_pid(Tab) of
+ Pid when is_pid(Pid) ->
+ Pid;
+ _ ->
+ undefined
+ end;
+info(Tab, users) -> % undocumented
+ case dets_server:users(Tab) of
+ [] ->
+ undefined;
+ Users ->
+ Users
+ end;
+info(Tab, Tag) ->
+ case catch dets_server:get_pid(Tab) of
+ {'EXIT', _Reason} ->
+ undefined;
+ Pid ->
+ undefined(req(Pid, {info, Tag}))
+ end.
+
+init_table(Tab, InitFun) ->
+ init_table(Tab, InitFun, []).
+
+init_table(Tab, InitFun, Options) when is_function(InitFun) ->
+ case options(Options, [format, min_no_slots]) of
+ {badarg,_} ->
+ erlang:error(badarg, [Tab, InitFun, Options]);
+ [Format, MinNoSlots] ->
+ case treq(Tab, {initialize, InitFun, Format, MinNoSlots}) of
+ {thrown, Thrown} -> throw(Thrown);
+ Else -> badarg(Else, [Tab, InitFun, Options])
+ end
+ end;
+init_table(Tab, InitFun, Options) ->
+ erlang:error(badarg, [Tab, InitFun, Options]).
+
+insert(Tab, Objs) when is_list(Objs) ->
+ badarg(treq(Tab, {insert, Objs}), [Tab, Objs]);
+insert(Tab, Obj) ->
+ badarg(treq(Tab, {insert, [Obj]}), [Tab, Obj]).
+
+insert_new(Tab, Objs) when is_list(Objs) ->
+ badarg(treq(Tab, {insert_new, Objs}), [Tab, Objs]);
+insert_new(Tab, Obj) ->
+ badarg(treq(Tab, {insert_new, [Obj]}), [Tab, Obj]).
+
+internal_close(Pid) ->
+ req(Pid, close).
+
+internal_open(Pid, Ref, Args) ->
+ req(Pid, {internal_open, Ref, Args}).
+
+is_compatible_bchunk_format(Tab, Term) ->
+ badarg(treq(Tab, {is_compatible_bchunk_format, Term}), [Tab, Term]).
+
+is_dets_file(FileName) ->
+ case catch read_file_header(FileName, read, false) of
+ {ok, Fd, FH} ->
+ file:close(Fd),
+ FH#fileheader.cookie =:= ?MAGIC;
+ {error, {tooshort, _}} ->
+ false;
+ {error, {not_a_dets_file, _}} ->
+ false;
+ Other ->
+ Other
+ end.
+
+lookup(Tab, Key) ->
+ badarg(treq(Tab, {lookup_keys, [Key]}), [Tab, Key]).
+
+%% Not public.
+lookup_keys(Tab, Keys) ->
+ case catch lists:usort(Keys) of
+ UKeys when is_list(UKeys), UKeys =/= [] ->
+ badarg(treq(Tab, {lookup_keys, UKeys}), [Tab, Keys]);
+ _Else ->
+ erlang:error(badarg, [Tab, Keys])
+ end.
+
+match(Tab, Pat) ->
+ badarg(safe_match(Tab, Pat, bindings), [Tab, Pat]).
+
+match(Tab, Pat, N) ->
+ badarg(init_chunk_match(Tab, Pat, bindings, N), [Tab, Pat, N]).
+
+match(State) when State#dets_cont.what =:= bindings ->
+ badarg(chunk_match(State), [State]);
+match(Term) ->
+ erlang:error(badarg, [Term]).
+
+-spec match_delete(tab_name(), pattern()) ->
+ non_neg_integer() | 'ok' | {'error', term()}.
+
+match_delete(Tab, Pat) ->
+ badarg(match_delete(Tab, Pat, delete), [Tab, Pat]).
+
+match_delete(Tab, Pat, What) ->
+ safe_fixtable(Tab, true),
+ case compile_match_spec(What, Pat) of
+ {Spec, MP} ->
+ Proc = dets_server:get_pid(Tab),
+ R = req(Proc, {match_delete_init, MP, Spec}),
+ do_match_delete(Tab, Proc, R, What, 0);
+ badarg ->
+ badarg
+ end.
+
+do_match_delete(Tab, _Proc, {done, N1}, select, N) ->
+ safe_fixtable(Tab, false),
+ N + N1;
+do_match_delete(Tab, _Proc, {done, _N1}, _What, _N) ->
+ safe_fixtable(Tab, false),
+ ok;
+do_match_delete(Tab, Proc, {cont, State, N1}, What, N) ->
+ do_match_delete(Tab, Proc, req(Proc, {match_delete, State}), What, N+N1);
+do_match_delete(Tab, _Proc, Error, _What, _N) ->
+ safe_fixtable(Tab, false),
+ Error.
+
+match_object(Tab, Pat) ->
+ badarg(safe_match(Tab, Pat, object), [Tab, Pat]).
+
+match_object(Tab, Pat, N) ->
+ badarg(init_chunk_match(Tab, Pat, object, N), [Tab, Pat, N]).
+
+match_object(State) when State#dets_cont.what =:= object ->
+ badarg(chunk_match(State), [State]);
+match_object(Term) ->
+ erlang:error(badarg, [Term]).
+
+member(Tab, Key) ->
+ badarg(treq(Tab, {member, Key}), [Tab, Key]).
+
+next(Tab, Key) ->
+ badarg_exit(treq(Tab, {next, Key}), [Tab, Key]).
+
+%% Assuming that a file already exists, open it with the
+%% parameters as already specified in the file itself.
+%% Return a ref leading to the file.
+open_file(File) ->
+ case dets_server:open_file(to_list(File)) of
+ badarg -> % Should not happen.
+ erlang:error(dets_process_died, [File]);
+ Reply ->
+ einval(Reply, [File])
+ end.
+
+open_file(Tab, Args) when is_list(Args) ->
+ case catch defaults(Tab, Args) of
+ OpenArgs when is_record(OpenArgs, open_args) ->
+ case dets_server:open_file(Tab, OpenArgs) of
+ badarg -> % Should not happen.
+ erlang:error(dets_process_died, [Tab, Args]);
+ Reply ->
+ einval(Reply, [Tab, Args])
+ end;
+ _ ->
+ erlang:error(badarg, [Tab, Args])
+ end;
+open_file(Tab, Arg) ->
+ open_file(Tab, [Arg]).
+
+pid2name(Pid) ->
+ dets_server:pid2name(Pid).
+
+remove_user(Pid, From) ->
+ req(Pid, {close, From}).
+
+repair_continuation(#dets_cont{match_program = B}=Cont, MS)
+ when is_binary(B) ->
+ case ets:is_compiled_ms(B) of
+ true ->
+ Cont;
+ false ->
+ Cont#dets_cont{match_program = ets:match_spec_compile(MS)}
+ end;
+repair_continuation(#dets_cont{}=Cont, _MS) ->
+ Cont;
+repair_continuation(T, MS) ->
+ erlang:error(badarg, [T, MS]).
+
+safe_fixtable(Tab, Bool) when Bool; not Bool ->
+ badarg(treq(Tab, {safe_fixtable, Bool}), [Tab, Bool]);
+safe_fixtable(Tab, Term) ->
+ erlang:error(badarg, [Tab, Term]).
+
+select(Tab, Pat) ->
+ badarg(safe_match(Tab, Pat, select), [Tab, Pat]).
+
+select(Tab, Pat, N) ->
+ badarg(init_chunk_match(Tab, Pat, select, N), [Tab, Pat, N]).
+
+select(State) when State#dets_cont.what =:= select ->
+ badarg(chunk_match(State), [State]);
+select(Term) ->
+ erlang:error(badarg, [Term]).
+
+select_delete(Tab, Pat) ->
+ badarg(match_delete(Tab, Pat, select), [Tab, Pat]).
+
+slot(Tab, Slot) when is_integer(Slot), Slot >= 0 ->
+ badarg(treq(Tab, {slot, Slot}), [Tab, Slot]);
+slot(Tab, Term) ->
+ erlang:error(badarg, [Tab, Term]).
+
+start() ->
+ dets_server:start().
+
+stop() ->
+ dets_server:stop().
+
+istart_link(Server) ->
+ {ok, proc_lib:spawn_link(dets, init, [self(), Server])}.
+
+sync(Tab) ->
+ badarg(treq(Tab, sync), [Tab]).
+
+table(Tab) ->
+ table(Tab, []).
+
+table(Tab, Opts) ->
+ case options(Opts, [traverse, n_objects]) of
+ {badarg,_} ->
+ erlang:error(badarg, [Tab, Opts]);
+ [Traverse, NObjs] ->
+ TF = case Traverse of
+ first_next ->
+ fun() -> qlc_next(Tab, first(Tab)) end;
+ select ->
+ fun(MS) -> qlc_select(select(Tab, MS, NObjs)) end;
+ {select, MS} ->
+ fun() -> qlc_select(select(Tab, MS, NObjs)) end
+ end,
+ PreFun = fun(_) -> safe_fixtable(Tab, true) end,
+ PostFun = fun() -> safe_fixtable(Tab, false) end,
+ InfoFun = fun(Tag) -> table_info(Tab, Tag) end,
+ %% lookup_keys is not public, but convenient
+ LookupFun =
+ case Traverse of
+ {select, _MS} ->
+ undefined;
+ _ ->
+ fun(_KeyPos, [K]) -> lookup(Tab, K);
+ (_KeyPos, Ks) -> lookup_keys(Tab, Ks)
+ end
+ end,
+ FormatFun =
+ fun({all, _NElements, _ElementFun}) ->
+ As = [Tab | [Opts || _ <- [[]], Opts =/= []]],
+ {?MODULE, table, As};
+ ({match_spec, MS}) ->
+ {?MODULE, table, [Tab, [{traverse, {select, MS}} |
+ listify(Opts)]]};
+ ({lookup, _KeyPos, [Value], _NElements, ElementFun}) ->
+ io_lib:format("~w:lookup(~w, ~w)",
+ [?MODULE, Tab, ElementFun(Value)]);
+ ({lookup, _KeyPos, Values, _NElements, ElementFun}) ->
+ Vals = [ElementFun(V) || V <- Values],
+ io_lib:format("lists:flatmap(fun(V) -> "
+ "~w:lookup(~w, V) end, ~w)",
+ [?MODULE, Tab, Vals])
+ end,
+ qlc:table(TF, [{pre_fun, PreFun}, {post_fun, PostFun},
+ {info_fun, InfoFun}, {format_fun, FormatFun},
+ {key_equality, '=:='},
+ {lookup_fun, LookupFun}])
+ end.
+
+qlc_next(_Tab, '$end_of_table') ->
+ [];
+qlc_next(Tab, Key) ->
+ case lookup(Tab, Key) of
+ Objects when is_list(Objects) ->
+ Objects ++ fun() -> qlc_next(Tab, next(Tab, Key)) end;
+ Error ->
+ %% Do what first and next do.
+ exit(Error)
+ end.
+
+qlc_select('$end_of_table') ->
+ [];
+qlc_select({Objects, Cont}) when is_list(Objects) ->
+ Objects ++ fun() -> qlc_select(select(Cont)) end;
+qlc_select(Error) ->
+ Error.
+
+table_info(Tab, num_of_objects) ->
+ info(Tab, size);
+table_info(Tab, keypos) ->
+ info(Tab, keypos);
+table_info(Tab, is_unique_objects) ->
+ info(Tab, type) =/= duplicate_bag;
+table_info(_Tab, _) ->
+ undefined.
+
+%% End of table/2.
+
+to_ets(DTab, ETab) ->
+ case ets:info(ETab, protection) of
+ undefined ->
+ erlang:error(badarg, [DTab, ETab]);
+ _ ->
+ Fun = fun(X, T) -> true = ets:insert(T, X), T end,
+ foldl(Fun, ETab, DTab)
+ end.
+
+traverse(Tab, Fun) ->
+ Ref = make_ref(),
+ TFun =
+ fun(O, Acc) ->
+ case Fun(O) of
+ continue ->
+ Acc;
+ {continue, Val} ->
+ [Val | Acc];
+ {done, Value} ->
+ throw({Ref, [Value | Acc]});
+ Other ->
+ throw({Ref, Other})
+ end
+ end,
+ do_traverse(TFun, [], Tab, Ref).
+
+update_counter(Tab, Key, C) ->
+ badarg(treq(Tab, {update_counter, Key, C}), [Tab, Key, C]).
+
+verbose() ->
+ verbose(true).
+
+verbose(What) ->
+ ok = dets_server:verbose(What),
+ All = dets_server:all(),
+ Fun = fun(Tab) -> treq(Tab, {set_verbose, What}) end,
+ lists:foreach(Fun, All),
+ All.
+
+%% Where in the (open) table is Object located?
+%% The address of the first matching object is returned.
+%% Format 9 returns the address of the object collection.
+%% -> {ok, Address} | false
+where(Tab, Object) ->
+ badarg(treq(Tab, {where, Object}), [Tab, Object]).
+
+do_traverse(Fun, Acc, Tab, Ref) ->
+ safe_fixtable(Tab, true),
+ Proc = dets_server:get_pid(Tab),
+ try
+ do_trav(Proc, Acc, Fun)
+ catch {Ref, Result} ->
+ Result
+ after
+ safe_fixtable(Tab, false)
+ end.
+
+do_trav(Proc, Acc, Fun) ->
+ {Spec, MP} = compile_match_spec(object, '_'),
+ %% MP not used
+ case req(Proc, {match, MP, Spec, default}) of
+ {cont, State} ->
+ do_trav(State, Proc, Acc, Fun);
+ Error ->
+ Error
+ end.
+
+do_trav(#dets_cont{bin = eof}, _Proc, Acc, _Fun) ->
+ Acc;
+do_trav(State, Proc, Acc, Fun) ->
+ case req(Proc, {match_init, State}) of
+ {cont, {Bins, NewState}} ->
+ do_trav_bins(NewState, Proc, Acc, Fun, lists:reverse(Bins));
+ Error ->
+ Error
+ end.
+
+do_trav_bins(State, Proc, Acc, Fun, []) ->
+ do_trav(State, Proc, Acc, Fun);
+do_trav_bins(State, Proc, Acc, Fun, [Bin | Bins]) ->
+ %% Unpack one binary at a time, using the client's heap.
+ case catch binary_to_term(Bin) of
+ {'EXIT', _} ->
+ req(Proc, {corrupt, dets_utils:bad_object(do_trav_bins, Bin)});
+ Term ->
+ NewAcc = Fun(Term, Acc),
+ do_trav_bins(State, Proc, NewAcc, Fun, Bins)
+ end.
+
+safe_match(Tab, Pat, What) ->
+ safe_fixtable(Tab, true),
+ R = do_safe_match(init_chunk_match(Tab, Pat, What, default), []),
+ safe_fixtable(Tab, false),
+ R.
+
+do_safe_match({error, Error}, _L) ->
+ {error, Error};
+do_safe_match({L, C}, LL) ->
+ do_safe_match(chunk_match(C), L++LL);
+do_safe_match('$end_of_table', L) ->
+ L;
+do_safe_match(badarg, _L) ->
+ badarg.
+
+%% What = object | bindings | select
+init_chunk_match(Tab, Pat, What, N) when is_integer(N), N >= 0;
+ N =:= default ->
+ case compile_match_spec(What, Pat) of
+ {Spec, MP} ->
+ case req(dets_server:get_pid(Tab), {match, MP, Spec, N}) of
+ {done, L} ->
+ {L, #dets_cont{tab = Tab, what = What, bin = eof}};
+ {cont, State} ->
+ chunk_match(State#dets_cont{what = What, tab = Tab});
+ Error ->
+ Error
+ end;
+ badarg ->
+ badarg
+ end;
+init_chunk_match(_Tab, _Pat, _What, _) ->
+ badarg.
+
+chunk_match(State) ->
+ case catch dets_server:get_pid(State#dets_cont.tab) of
+ {'EXIT', _Reason} ->
+ badarg;
+ _Proc when State#dets_cont.bin =:= eof ->
+ '$end_of_table';
+ Proc ->
+ case req(Proc, {match_init, State}) of
+ {cont, {Bins, NewState}} ->
+ MP = NewState#dets_cont.match_program,
+ case catch do_foldl_bins(Bins, MP) of
+ {'EXIT', _} ->
+ case ets:is_compiled_ms(MP) of
+ true ->
+ Bad = dets_utils:bad_object(chunk_match,
+ Bins),
+ req(Proc, {corrupt, Bad});
+ false ->
+ badarg
+ end;
+ [] ->
+ chunk_match(NewState);
+ Terms ->
+ {Terms, NewState}
+ end;
+ Error ->
+ Error
+ end
+ end.
+
+do_foldl_bins(Bins, true) ->
+ foldl_bins(Bins, []);
+do_foldl_bins(Bins, MP) ->
+ foldl_bins(Bins, MP, []).
+
+foldl_bins([], Terms) ->
+ %% Preserve time order (version 9).
+ Terms;
+foldl_bins([Bin | Bins], Terms) ->
+ foldl_bins(Bins, [binary_to_term(Bin) | Terms]).
+
+foldl_bins([], _MP, Terms) ->
+ %% Preserve time order (version 9).
+ Terms;
+foldl_bins([Bin | Bins], MP, Terms) ->
+ Term = binary_to_term(Bin),
+ case ets:match_spec_run([Term], MP) of
+ [] ->
+ foldl_bins(Bins, MP, Terms);
+ [Result] ->
+ foldl_bins(Bins, MP, [Result | Terms])
+ end.
+
+%% -> {Spec, binary()} | badarg
+compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC('_') = Spec) ->
+ {Spec, true};
+compile_match_spec(select, Spec) ->
+ case catch ets:match_spec_compile(Spec) of
+ X when is_binary(X) ->
+ {Spec, X};
+ _ ->
+ badarg
+ end;
+compile_match_spec(object, Pat) ->
+ compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC(Pat));
+compile_match_spec(bindings, Pat) ->
+ compile_match_spec(select, ?PATTERN_TO_BINDINGS_MATCH_SPEC(Pat));
+compile_match_spec(delete, Pat) ->
+ compile_match_spec(select, ?PATTERN_TO_TRUE_MATCH_SPEC(Pat)).
+
+%% Process the args list as provided to open_file/2.
+defaults(Tab, Args) ->
+ Defaults0 = #open_args{file = to_list(Tab),
+ type = set,
+ keypos = 1,
+ repair = true,
+ min_no_slots = default,
+ max_no_slots = default,
+ ram_file = false,
+ delayed_write = ?DEFAULT_CACHE,
+ auto_save = timer:minutes(?DEFAULT_AUTOSAVE),
+ access = read_write,
+ version = default,
+ debug = false},
+ Fun = fun repl/2,
+ Defaults = lists:foldl(Fun, Defaults0, Args),
+ case Defaults#open_args.version of
+ 8 ->
+ Defaults#open_args{max_no_slots = default};
+ _ ->
+ is_comp_min_max(Defaults)
+ end.
+
+to_list(T) when is_atom(T) -> atom_to_list(T);
+to_list(T) -> T.
+
+repl({access, A}, Defs) ->
+ mem(A, [read, read_write]),
+ Defs#open_args{access = A};
+repl({auto_save, Int}, Defs) when is_integer(Int), Int >= 0 ->
+ Defs#open_args{auto_save = Int};
+repl({auto_save, infinity}, Defs) ->
+ Defs#open_args{auto_save =infinity};
+repl({cache_size, Int}, Defs) when is_integer(Int), Int >= 0 ->
+ %% Recognized, but ignored.
+ Defs;
+repl({cache_size, infinity}, Defs) ->
+ Defs;
+repl({delayed_write, default}, Defs) ->
+ Defs#open_args{delayed_write = ?DEFAULT_CACHE};
+repl({delayed_write, {Delay,Size} = C}, Defs)
+ when is_integer(Delay), Delay >= 0, is_integer(Size), Size >= 0 ->
+ Defs#open_args{delayed_write = C};
+repl({estimated_no_objects, I}, Defs) ->
+ repl({min_no_slots, I}, Defs);
+repl({file, File}, Defs) ->
+ Defs#open_args{file = to_list(File)};
+repl({keypos, P}, Defs) when is_integer(P), P > 0 ->
+ Defs#open_args{keypos =P};
+repl({max_no_slots, I}, Defs) ->
+ %% Version 9 only.
+ MaxSlots = is_max_no_slots(I),
+ Defs#open_args{max_no_slots = MaxSlots};
+repl({min_no_slots, I}, Defs) ->
+ MinSlots = is_min_no_slots(I),
+ Defs#open_args{min_no_slots = MinSlots};
+repl({ram_file, Bool}, Defs) ->
+ mem(Bool, [true, false]),
+ Defs#open_args{ram_file = Bool};
+repl({repair, T}, Defs) ->
+ mem(T, [true, false, force]),
+ Defs#open_args{repair = T};
+repl({type, T}, Defs) ->
+ mem(T, [set, bag, duplicate_bag]),
+ Defs#open_args{type =T};
+repl({version, Version}, Defs) ->
+ V = is_version(Version),
+ Defs#open_args{version = V};
+repl({debug, Bool}, Defs) ->
+ %% Not documented.
+ mem(Bool, [true, false]),
+ Defs#open_args{debug = Bool};
+repl({_, _}, _) ->
+ exit(badarg).
+
+is_min_no_slots(default) -> default;
+is_min_no_slots(I) when is_integer(I), I >= ?DEFAULT_MIN_NO_SLOTS -> I;
+is_min_no_slots(I) when is_integer(I), I >= 0 -> ?DEFAULT_MIN_NO_SLOTS.
+
+is_max_no_slots(default) -> default;
+is_max_no_slots(I) when is_integer(I), I > 0, I < 1 bsl 31 -> I.
+
+is_comp_min_max(Defs) ->
+ #open_args{max_no_slots = Max, min_no_slots = Min, version = V} = Defs,
+ case V of
+ _ when Min =:= default -> Defs;
+ _ when Max =:= default -> Defs;
+ _ -> true = Min =< Max, Defs
+ end.
+
+is_version(default) -> default;
+is_version(8) -> 8;
+is_version(9) -> 9.
+
+mem(X, L) ->
+ case lists:member(X, L) of
+ true -> true;
+ false -> exit(badarg)
+ end.
+
+options(Options, Keys) when is_list(Options) ->
+ options(Options, Keys, []);
+options(Option, Keys) ->
+ options([Option], Keys, []).
+
+options(Options, [Key | Keys], L) when is_list(Options) ->
+ V = case lists:keysearch(Key, 1, Options) of
+ {value, {format, Format}} when Format =:= term;
+ Format =:= bchunk ->
+ {ok, Format};
+ {value, {min_no_slots, I}} ->
+ case catch is_min_no_slots(I) of
+ {'EXIT', _} -> badarg;
+ MinNoSlots -> {ok, MinNoSlots}
+ end;
+ {value, {n_objects, default}} ->
+ {ok, default_option(Key)};
+ {value, {n_objects, NObjs}} when is_integer(NObjs),
+ NObjs >= 1 ->
+ {ok, NObjs};
+ {value, {traverse, select}} ->
+ {ok, select};
+ {value, {traverse, {select, MS}}} ->
+ {ok, {select, MS}};
+ {value, {traverse, first_next}} ->
+ {ok, first_next};
+ {value, {Key, _}} ->
+ badarg;
+ false ->
+ Default = default_option(Key),
+ {ok, Default}
+ end,
+ case V of
+ badarg ->
+ {badarg, Key};
+ {ok, Value} ->
+ NewOptions = lists:keydelete(Key, 1, Options),
+ options(NewOptions, Keys, [Value | L])
+ end;
+options([], [], L) ->
+ lists:reverse(L);
+options(Options, _, _L) ->
+ {badarg,Options}.
+
+default_option(format) -> term;
+default_option(min_no_slots) -> default;
+default_option(traverse) -> select;
+default_option(n_objects) -> default.
+
+listify(L) when is_list(L) ->
+ L;
+listify(T) ->
+ [T].
+
+treq(Tab, R) ->
+ case catch dets_server:get_pid(Tab) of
+ Pid when is_pid(Pid) ->
+ req(Pid, R);
+ _ ->
+ badarg
+ end.
+
+req(Proc, R) ->
+ Ref = erlang:monitor(process, Proc),
+ Proc ! ?DETS_CALL(self(), R),
+ receive
+ {'DOWN', Ref, process, Proc, _Info} ->
+ badarg;
+ {Proc, Reply} ->
+ erlang:demonitor(Ref),
+ receive
+ {'DOWN', Ref, process, Proc, _Reason} ->
+ Reply
+ after 0 ->
+ Reply
+ end
+ end.
+
+%% Inlined.
+einval({error, {file_error, _, einval}}, A) ->
+ erlang:error(badarg, A);
+einval({error, {file_error, _, badarg}}, A) ->
+ erlang:error(badarg, A);
+einval(Reply, _A) ->
+ Reply.
+
+%% Inlined.
+badarg(badarg, A) ->
+ erlang:error(badarg, A);
+badarg(Reply, _A) ->
+ Reply.
+
+%% Inlined.
+undefined(badarg) ->
+ undefined;
+undefined(Reply) ->
+ Reply.
+
+%% Inlined.
+badarg_exit(badarg, A) ->
+ erlang:error(badarg, A);
+badarg_exit({ok, Reply}, _A) ->
+ Reply;
+badarg_exit(Reply, _A) ->
+ exit(Reply).
+
+%%%-----------------------------------------------------------------
+%%% Server functions
+%%%-----------------------------------------------------------------
+
+init(Parent, Server) ->
+ process_flag(trap_exit, true),
+ open_file_loop(#head{parent = Parent, server = Server}).
+
+open_file_loop(Head) ->
+ open_file_loop(Head, 0).
+
+open_file_loop(Head, N) when element(1, Head#head.update_mode) =:= error ->
+ open_file_loop2(Head, N);
+open_file_loop(Head, N) ->
+ receive
+ %% When the table is fixed it can be assumed that at least one
+ %% traversal is in progress. To speed the traversal up three
+ %% things have been done:
+ %% - prioritize match_init, bchunk, next, and match_delete_init;
+ %% - do not peek the message queue for updates;
+ %% - wait 1 ms after each update.
+ %% next is normally followed by lookup, but since lookup is also
+ %% used when not traversing the table, it is not prioritized.
+ ?DETS_CALL(From, {match_init, _State} = Op) ->
+ do_apply_op(Op, From, Head, N);
+ ?DETS_CALL(From, {bchunk, _State} = Op) ->
+ do_apply_op(Op, From, Head, N);
+ ?DETS_CALL(From, {next, _Key} = Op) ->
+ do_apply_op(Op, From, Head, N);
+ ?DETS_CALL(From, {match_delete_init, _MP, _Spec} = Op) ->
+ do_apply_op(Op, From, Head, N);
+ {'EXIT', Pid, Reason} when Pid =:= Head#head.parent ->
+ %% Parent orders shutdown.
+ _NewHead = do_stop(Head),
+ exit(Reason);
+ {'EXIT', Pid, Reason} when Pid =:= Head#head.server ->
+ %% The server is gone.
+ _NewHead = do_stop(Head),
+ exit(Reason);
+ {'EXIT', Pid, _Reason} ->
+ %% A process fixing the table exits.
+ H2 = remove_fix(Head, Pid, close),
+ open_file_loop(H2, N);
+ {system, From, Req} ->
+ sys:handle_system_msg(Req, From, Head#head.parent,
+ ?MODULE, [], Head)
+ after 0 ->
+ open_file_loop2(Head, N)
+ end.
+
+open_file_loop2(Head, N) ->
+ receive
+ ?DETS_CALL(From, Op) ->
+ do_apply_op(Op, From, Head, N);
+ {'EXIT', Pid, Reason} when Pid =:= Head#head.parent ->
+ %% Parent orders shutdown.
+ _NewHead = do_stop(Head),
+ exit(Reason);
+ {'EXIT', Pid, Reason} when Pid =:= Head#head.server ->
+ %% The server is gone.
+ _NewHead = do_stop(Head),
+ exit(Reason);
+ {'EXIT', Pid, _Reason} ->
+ %% A process fixing the table exits.
+ H2 = remove_fix(Head, Pid, close),
+ open_file_loop(H2, N);
+ {system, From, Req} ->
+ sys:handle_system_msg(Req, From, Head#head.parent,
+ ?MODULE, [], Head);
+ Message ->
+ error_logger:format("** dets: unexpected message"
+ "(ignored): ~w~n", [Message]),
+ open_file_loop(Head, N)
+ end.
+
+do_apply_op(Op, From, Head, N) ->
+ try apply_op(Op, From, Head, N) of
+ ok ->
+ open_file_loop(Head, N);
+ {N2, H2} when is_record(H2, head), is_integer(N2) ->
+ open_file_loop(H2, N2);
+ H2 when is_record(H2, head) ->
+ open_file_loop(H2, N)
+ catch
+ exit:normal ->
+ exit(normal);
+ _:Bad ->
+ Name = Head#head.name,
+ case dets_utils:debug_mode() of
+ true ->
+ %% If stream_op/5 found more requests, this is not
+ %% the last operation.
+ error_logger:format
+ ("** dets: Bug was found when accessing table ~w,~n"
+ "** dets: operation was ~p and reply was ~w.~n"
+ "** dets: Stacktrace: ~w~n",
+ [Name, Op, Bad, erlang:get_stacktrace()]);
+ false ->
+ error_logger:format
+ ("** dets: Bug was found when accessing table ~w~n",
+ [Name])
+ end,
+ if
+ From =/= self() ->
+ From ! {self(), {error, {dets_bug, Name, Op, Bad}}};
+ true -> % auto_save | may_grow | {delayed_write, _}
+ ok
+ end,
+ open_file_loop(Head, N)
+ end.
+
+apply_op(Op, From, Head, N) ->
+ case Op of
+ {add_user, Tab, OpenArgs}->
+ #open_args{file = Fname, type = Type, keypos = Keypos,
+ ram_file = Ram, access = Access,
+ version = Version} = OpenArgs,
+ VersionOK = (Version =:= default) or
+ (Head#head.version =:= Version),
+ %% min_no_slots and max_no_slots are not tested
+ Res = if
+ Tab =:= Head#head.name,
+ Head#head.keypos =:= Keypos,
+ Head#head.type =:= Type,
+ Head#head.ram_file =:= Ram,
+ Head#head.access =:= Access,
+ VersionOK,
+ Fname =:= Head#head.filename ->
+ ok;
+ true ->
+ err({error, incompatible_arguments})
+ end,
+ From ! {self(), Res},
+ ok;
+ auto_save ->
+ case Head#head.update_mode of
+ saved ->
+ Head;
+ {error, _Reason} ->
+ Head;
+ _Dirty when N =:= 0 -> % dirty or new_dirty
+ %% The updates seems to have declined
+ dets_utils:vformat("** dets: Auto save of ~p\n",
+ [Head#head.name]),
+ {NewHead, _Res} = perform_save(Head, true),
+ erlang:garbage_collect(),
+ {0, NewHead};
+ dirty ->
+ %% Reset counter and try later
+ start_auto_save_timer(Head),
+ {0, Head}
+ end;
+ close ->
+ From ! {self(), fclose(Head)},
+ _NewHead = unlink_fixing_procs(Head),
+ ?PROFILE(ep:done()),
+ exit(normal);
+ {close, Pid} ->
+ %% Used from dets_server when Pid has closed the table,
+ %% but the table is still opened by some process.
+ NewHead = remove_fix(Head, Pid, close),
+ From ! {self(), status(NewHead)},
+ NewHead;
+ {corrupt, Reason} ->
+ {H2, Error} = dets_utils:corrupt_reason(Head, Reason),
+ From ! {self(), Error},
+ H2;
+ {delayed_write, WrTime} ->
+ delayed_write(Head, WrTime);
+ info ->
+ {H2, Res} = finfo(Head),
+ From ! {self(), Res},
+ H2;
+ {info, Tag} ->
+ {H2, Res} = finfo(Head, Tag),
+ From ! {self(), Res},
+ H2;
+ {is_compatible_bchunk_format, Term} ->
+ Res = test_bchunk_format(Head, Term),
+ From ! {self(), Res},
+ ok;
+ {internal_open, Ref, Args} ->
+ ?PROFILE(ep:do()),
+ case do_open_file(Args, Head#head.parent, Head#head.server,Ref) of
+ {ok, H2} ->
+ From ! {self(), ok},
+ H2;
+ Error ->
+ From ! {self(), Error},
+ exit(normal)
+ end;
+ may_grow when Head#head.update_mode =/= saved ->
+ if
+ Head#head.update_mode =:= dirty ->
+ %% Won't grow more if the table is full.
+ {H2, _Res} =
+ (Head#head.mod):may_grow(Head, 0, many_times),
+ {N + 1, H2};
+ true ->
+ ok
+ end;
+ {set_verbose, What} ->
+ set_verbose(What),
+ From ! {self(), ok},
+ ok;
+ {where, Object} ->
+ {H2, Res} = where_is_object(Head, Object),
+ From ! {self(), Res},
+ H2;
+ _Message when element(1, Head#head.update_mode) =:= error ->
+ From ! {self(), status(Head)},
+ ok;
+ %% The following messages assume that the status of the table is OK.
+ {bchunk_init, Tab} ->
+ {H2, Res} = do_bchunk_init(Head, Tab),
+ From ! {self(), Res},
+ H2;
+ {bchunk, State} ->
+ {H2, Res} = do_bchunk(Head, State),
+ From ! {self(), Res},
+ H2;
+ delete_all_objects ->
+ {H2, Res} = fdelete_all_objects(Head),
+ From ! {self(), Res},
+ erlang:garbage_collect(),
+ {0, H2};
+ {delete_key, Keys} when Head#head.update_mode =:= dirty ->
+ if
+ Head#head.version =:= 8 ->
+ {H2, Res} = fdelete_key(Head, Keys),
+ From ! {self(), Res},
+ {N + 1, H2};
+ true ->
+ stream_op(Op, From, [], Head, N)
+ end;
+ {delete_object, Objs} when Head#head.update_mode =:= dirty ->
+ case check_objects(Objs, Head#head.keypos) of
+ true when Head#head.version =:= 8 ->
+ {H2, Res} = fdelete_object(Head, Objs),
+ From ! {self(), Res},
+ {N + 1, H2};
+ true ->
+ stream_op(Op, From, [], Head, N);
+ false ->
+ From ! {self(), badarg},
+ ok
+ end;
+ first ->
+ {H2, Res} = ffirst(Head),
+ From ! {self(), Res},
+ H2;
+ {initialize, InitFun, Format, MinNoSlots} ->
+ {H2, Res} = finit(Head, InitFun, Format, MinNoSlots),
+ From ! {self(), Res},
+ erlang:garbage_collect(),
+ H2;
+ {insert, Objs} when Head#head.update_mode =:= dirty ->
+ case check_objects(Objs, Head#head.keypos) of
+ true when Head#head.version =:= 8 ->
+ {H2, Res} = finsert(Head, Objs),
+ From ! {self(), Res},
+ {N + 1, H2};
+ true ->
+ stream_op(Op, From, [], Head, N);
+ false ->
+ From ! {self(), badarg},
+ ok
+ end;
+ {insert_new, Objs} when Head#head.update_mode =:= dirty ->
+ {H2, Res} = finsert_new(Head, Objs),
+ From ! {self(), Res},
+ {N + 1, H2};
+ {lookup_keys, Keys} when Head#head.version =:= 8 ->
+ {H2, Res} = flookup_keys(Head, Keys),
+ From ! {self(), Res},
+ H2;
+ {lookup_keys, _Keys} ->
+ stream_op(Op, From, [], Head, N);
+ {match_init, State} ->
+ {H2, Res} = fmatch_init(Head, State),
+ From ! {self(), Res},
+ H2;
+ {match, MP, Spec, NObjs} ->
+ {H2, Res} = fmatch(Head, MP, Spec, NObjs),
+ From ! {self(), Res},
+ H2;
+ {member, Key} when Head#head.version =:= 8 ->
+ {H2, Res} = fmember(Head, Key),
+ From ! {self(), Res},
+ H2;
+ {member, _Key} = Op ->
+ stream_op(Op, From, [], Head, N);
+ {next, Key} ->
+ {H2, Res} = fnext(Head, Key),
+ From ! {self(), Res},
+ H2;
+ {match_delete, State} when Head#head.update_mode =:= dirty ->
+ {H2, Res} = fmatch_delete(Head, State),
+ From ! {self(), Res},
+ {N + 1, H2};
+ {match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty ->
+ {H2, Res} = fmatch_delete_init(Head, MP, Spec),
+ From ! {self(), Res},
+ {N + 1, H2};
+ {safe_fixtable, Bool} ->
+ NewHead = do_safe_fixtable(Head, From, Bool),
+ From ! {self(), ok},
+ NewHead;
+ {slot, Slot} ->
+ {H2, Res} = fslot(Head, Slot),
+ From ! {self(), Res},
+ H2;
+ sync ->
+ {NewHead, Res} = perform_save(Head, true),
+ From ! {self(), Res},
+ erlang:garbage_collect(),
+ {0, NewHead};
+ {update_counter, Key, Incr} when Head#head.update_mode =:= dirty ->
+ {NewHead, Res} = do_update_counter(Head, Key, Incr),
+ From ! {self(), Res},
+ {N + 1, NewHead};
+ WriteOp when Head#head.update_mode =:= new_dirty ->
+ H2 = Head#head{update_mode = dirty},
+ apply_op(WriteOp, From, H2, 0);
+ WriteOp when Head#head.access =:= read_write,
+ Head#head.update_mode =:= saved ->
+ case catch (Head#head.mod):mark_dirty(Head) of
+ ok ->
+ start_auto_save_timer(Head),
+ H2 = Head#head{update_mode = dirty},
+ apply_op(WriteOp, From, H2, 0);
+ {NewHead, Error} when is_record(NewHead, head) ->
+ From ! {self(), Error},
+ NewHead
+ end;
+ WriteOp when is_tuple(WriteOp), Head#head.access =:= read ->
+ Reason = {access_mode, Head#head.filename},
+ From ! {self(), err({error, Reason})},
+ ok
+ end.
+
+start_auto_save_timer(Head) when Head#head.auto_save =:= infinity ->
+ ok;
+start_auto_save_timer(Head) ->
+ Millis = Head#head.auto_save,
+ erlang:send_after(Millis, self(), ?DETS_CALL(self(), auto_save)).
+
+%% Version 9: Peek the message queue and try to evaluate several
+%% lookup requests in parallel. Evalute delete_object, delete and
+%% insert as well.
+stream_op(Op, Pid, Pids, Head, N) ->
+ stream_op(Head, Pids, [], N, Pid, Op, Head#head.fixed).
+
+stream_loop(Head, Pids, C, N, false = Fxd) ->
+ receive
+ ?DETS_CALL(From, Message) ->
+ stream_op(Head, Pids, C, N, From, Message, Fxd)
+ after 0 ->
+ stream_end(Head, Pids, C, N, no_more)
+ end;
+stream_loop(Head, Pids, C, N, _Fxd) ->
+ stream_end(Head, Pids, C, N, no_more).
+
+stream_op(Head, Pids, C, N, Pid, {lookup_keys,Keys}, Fxd) ->
+ NC = [{{lookup,Pid},Keys} | C],
+ stream_loop(Head, Pids, NC, N, Fxd);
+stream_op(Head, Pids, C, N, Pid, {insert, _Objects} = Op, Fxd) ->
+ NC = [Op | C],
+ stream_loop(Head, [Pid | Pids], NC, N, Fxd);
+stream_op(Head, Pids, C, N, Pid, {insert_new, _Objects} = Op, Fxd) ->
+ NC = [Op | C],
+ stream_loop(Head, [Pid | Pids], NC, N, Fxd);
+stream_op(Head, Pids, C, N, Pid, {delete_key, _Keys} = Op, Fxd) ->
+ NC = [Op | C],
+ stream_loop(Head, [Pid | Pids], NC, N, Fxd);
+stream_op(Head, Pids, C, N, Pid, {delete_object, _Objects} = Op, Fxd) ->
+ NC = [Op | C],
+ stream_loop(Head, [Pid | Pids], NC, N, Fxd);
+stream_op(Head, Pids, C, N, Pid, {member, Key}, Fxd) ->
+ NC = [{{lookup,[Pid]},[Key]} | C],
+ stream_loop(Head, Pids, NC, N, Fxd);
+stream_op(Head, Pids, C, N, Pid, Op, _Fxd) ->
+ stream_end(Head, Pids, C, N, {Pid,Op}).
+
+stream_end(Head, Pids0, C, N, Next) ->
+ case catch update_cache(Head, lists:reverse(C)) of
+ {Head1, [], PwriteList} ->
+ stream_end1(Pids0, Next, N, C, Head1, PwriteList);
+ {Head1, Found, PwriteList} ->
+ %% Possibly an optimization: reply to lookup requests
+ %% first, then write stuff. This makes it possible for
+ %% clients to continue while the disk is accessed.
+ %% (Replies to lookup requests are sent earlier than
+ %% replies to delete and insert requests even if the
+ %% latter requests were made before the lookup requests,
+ %% which can be confusing.)
+ lookup_replies(Found),
+ stream_end1(Pids0, Next, N, C, Head1, PwriteList);
+ Head1 when is_record(Head1, head) ->
+ stream_end2(Pids0, Pids0, Next, N, C, Head1, ok);
+ {Head1, Error} when is_record(Head1, head) ->
+ %% Dig out the processes that did lookup or member.
+ Fun = fun({{lookup,[Pid]},_Keys}, L) -> [Pid | L];
+ ({{lookup,Pid},_Keys}, L) -> [Pid | L];
+ (_, L) -> L
+ end,
+ LPs0 = lists:foldl(Fun, [], C),
+ LPs = lists:usort(lists:flatten(LPs0)),
+ stream_end2(Pids0 ++ LPs, Pids0, Next, N, C, Head1, Error);
+ DetsError ->
+ throw(DetsError)
+ end.
+
+stream_end1(Pids, Next, N, C, Head, []) ->
+ stream_end2(Pids, Pids, Next, N, C, Head, ok);
+stream_end1(Pids, Next, N, C, Head, PwriteList) ->
+ {Head1, PR} = (catch dets_utils:pwrite(Head, PwriteList)),
+ stream_end2(Pids, Pids, Next, N, C, Head1, PR).
+
+stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) ->
+ Pid ! {self(), Reply},
+ stream_end2(Pids, Ps, Next, N+1, C, Head, Reply);
+stream_end2([], Ps, no_more, N, C, Head, _Reply) ->
+ penalty(Head, Ps, C),
+ {N, Head};
+stream_end2([], _Ps, {From, Op}, N, _C, Head, _Reply) ->
+ apply_op(Op, From, Head, N).
+
+penalty(H, _Ps, _C) when H#head.fixed =:= false ->
+ ok;
+penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) ->
+ ok;
+penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) ->
+ ok;
+penalty(_H, _Ps, _C) ->
+ timer:sleep(1).
+
+lookup_replies([{P,O}]) ->
+ lookup_reply(P, O);
+lookup_replies(Q) ->
+ [{P,O} | L] = dets_utils:family(Q),
+ lookup_replies(P, lists:append(O), L).
+
+lookup_replies(P, O, []) ->
+ lookup_reply(P, O);
+lookup_replies(P, O, [{P2,O2} | L]) ->
+ lookup_reply(P, O),
+ lookup_replies(P2, lists:append(O2), L).
+
+%% If a list of Pid then op was {member, Key}. Inlined.
+lookup_reply([P], O) ->
+ P ! {self(), O =/= []};
+lookup_reply(P, O) ->
+ P ! {self(), O}.
+
+%%-----------------------------------------------------------------
+%% Callback functions for system messages handling.
+%%-----------------------------------------------------------------
+system_continue(_Parent, _, Head) ->
+ open_file_loop(Head).
+
+system_terminate(Reason, _Parent, _, Head) ->
+ _NewHead = do_stop(Head),
+ exit(Reason).
+
+%%-----------------------------------------------------------------
+%% Code for upgrade.
+%%-----------------------------------------------------------------
+system_code_change(State, _Module, _OldVsn, _Extra) ->
+ {ok, State}.
+
+
+%%%----------------------------------------------------------------------
+%%% Internal functions
+%%%----------------------------------------------------------------------
+
+constants(FH, FileName) ->
+ Version = FH#fileheader.version,
+ if
+ Version =< 8 ->
+ dets_v8:constants();
+ Version =:= 9 ->
+ dets_v9:constants();
+ true ->
+ throw({error, {not_a_dets_file, FileName}})
+ end.
+
+%% -> {ok, Fd, fileheader()} | throw(Error)
+read_file_header(FileName, Access, RamFile) ->
+ BF = if
+ RamFile ->
+ case file:read_file(FileName) of
+ {ok, B} -> B;
+ Err -> dets_utils:file_error(FileName, Err)
+ end;
+ true ->
+ FileName
+ end,
+ {ok, Fd} = dets_utils:open(BF, open_args(Access, RamFile)),
+ {ok, <<Version:32>>} =
+ dets_utils:pread_close(Fd, FileName, ?FILE_FORMAT_VERSION_POS, 4),
+ if
+ Version =< 8 ->
+ dets_v8:read_file_header(Fd, FileName);
+ Version =:= 9 ->
+ dets_v9:read_file_header(Fd, FileName);
+ true ->
+ throw({error, {not_a_dets_file, FileName}})
+ end.
+
+fclose(Head) ->
+ {Head1, Res} = perform_save(Head, false),
+ case Head1#head.ram_file of
+ true ->
+ ignore;
+ false ->
+ dets_utils:stop_disk_map(),
+ file:close(Head1#head.fptr)
+ end,
+ Res.
+
+%% -> {NewHead, Res}
+perform_save(Head, DoSync) when Head#head.update_mode =:= dirty;
+ Head#head.update_mode =:= new_dirty ->
+ case catch begin
+ {Head1, []} = write_cache(Head),
+ {Head2, ok} = (Head1#head.mod):do_perform_save(Head1),
+ ok = ensure_written(Head2, DoSync),
+ {Head2#head{update_mode = saved}, ok}
+ end of
+ {NewHead, _} = Reply when is_record(NewHead, head) ->
+ Reply
+ end;
+perform_save(Head, _DoSync) ->
+ {Head, status(Head)}.
+
+ensure_written(Head, DoSync) when Head#head.ram_file ->
+ {ok, EOF} = dets_utils:position(Head, eof),
+ {ok, Bin} = dets_utils:pread(Head, 0, EOF, 0),
+ if
+ DoSync ->
+ dets_utils:write_file(Head, Bin);
+ not DoSync ->
+ case file:write_file(Head#head.filename, Bin) of
+ ok ->
+ ok;
+ Error ->
+ dets_utils:corrupt_file(Head, Error)
+ end
+ end;
+ensure_written(Head, true) when not Head#head.ram_file ->
+ dets_utils:sync(Head);
+ensure_written(Head, false) when not Head#head.ram_file ->
+ ok.
+
+%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error}
+do_bchunk_init(Head, Tab) ->
+ case catch write_cache(Head) of
+ {H2, []} ->
+ case (H2#head.mod):table_parameters(H2) of
+ undefined ->
+ {H2, {error, old_version}};
+ Parms ->
+ L = dets_utils:all_allocated(H2),
+ C0 = #dets_cont{no_objs = default, bin = <<>>, alloc = L},
+ BinParms = term_to_binary(Parms),
+ {H2, {C0#dets_cont{tab = Tab, what = bchunk}, [BinParms]}}
+ end;
+ {NewHead, _} = HeadError when is_record(NewHead, head) ->
+ HeadError
+ end.
+
+%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error}
+do_bchunk(Head, State) ->
+ case dets_v9:read_bchunks(Head, State#dets_cont.alloc) of
+ {error, Reason} ->
+ dets_utils:corrupt_reason(Head, Reason);
+ {finished, Bins} ->
+ {Head, {State#dets_cont{bin = eof}, Bins}};
+ {Bins, NewL} ->
+ {Head, {State#dets_cont{alloc = NewL}, Bins}}
+ end.
+
+%% -> {NewHead, Result}
+fdelete_all_objects(Head) when Head#head.fixed =:= false ->
+ case catch do_delete_all_objects(Head) of
+ {ok, NewHead} ->
+ start_auto_save_timer(NewHead),
+ {NewHead, ok};
+ {error, Reason} ->
+ dets_utils:corrupt_reason(Head, Reason)
+ end;
+fdelete_all_objects(Head) ->
+ {Head, fixed}.
+
+do_delete_all_objects(Head) ->
+ #head{fptr = Fd, name = Tab, filename = Fname, type = Type, keypos = Kp,
+ ram_file = Ram, auto_save = Auto, min_no_slots = MinSlots,
+ max_no_slots = MaxSlots, cache = Cache} = Head,
+ CacheSz = dets_utils:cache_size(Cache),
+ ok = dets_utils:truncate(Fd, Fname, bof),
+ (Head#head.mod):initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots,
+ Ram, CacheSz, Auto, true).
+
+%% -> {NewHead, Reply}, Reply = ok | Error.
+fdelete_key(Head, Keys) ->
+ do_delete(Head, Keys, delete_key).
+
+%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
+fdelete_object(Head, Objects) ->
+ do_delete(Head, Objects, delete_object).
+
+ffirst(H) ->
+ Ref = make_ref(),
+ case catch {Ref, ffirst1(H)} of
+ {Ref, {NH, R}} ->
+ {NH, {ok, R}};
+ {NH, R} when is_record(NH, head) ->
+ {NH, {error, R}}
+ end.
+
+ffirst1(H) ->
+ check_safe_fixtable(H),
+ {NH, []} = write_cache(H),
+ ffirst(NH, 0).
+
+ffirst(H, Slot) ->
+ case (H#head.mod):slot_objs(H, Slot) of
+ '$end_of_table' -> {H, '$end_of_table'};
+ [] -> ffirst(H, Slot+1);
+ [X|_] -> {H, element(H#head.keypos, X)}
+ end.
+
+%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
+finsert(Head, Objects) ->
+ case catch update_cache(Head, Objects, insert) of
+ {NewHead, []} ->
+ {NewHead, ok};
+ {NewHead, _} = HeadError when is_record(NewHead, head) ->
+ HeadError
+ end.
+
+%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
+finsert_new(Head, Objects) ->
+ KeyPos = Head#head.keypos,
+ case catch lists:map(fun(Obj) -> element(KeyPos, Obj) end, Objects) of
+ Keys when is_list(Keys) ->
+ case catch update_cache(Head, Keys, {lookup, nopid}) of
+ {Head1, PidObjs} when is_list(PidObjs) ->
+ case lists:all(fun({_P,OL}) -> OL =:= [] end, PidObjs) of
+ true ->
+ case catch update_cache(Head1, Objects, insert) of
+ {NewHead, []} ->
+ {NewHead, true};
+ {NewHead, Error} when is_record(NewHead, head) ->
+ {NewHead, Error}
+ end;
+ false=Reply ->
+ {Head1, Reply}
+ end;
+ {NewHead, _} = HeadError when is_record(NewHead, head) ->
+ HeadError
+ end;
+ _ ->
+ {Head, badarg}
+ end.
+
+do_safe_fixtable(Head, Pid, true) ->
+ case Head#head.fixed of
+ false ->
+ link(Pid),
+ Fixed = {erlang:now(), [{Pid, 1}]},
+ Ftab = dets_utils:get_freelists(Head),
+ Head#head{fixed = Fixed, freelists = {Ftab, Ftab}};
+ {TimeStamp, Counters} ->
+ case lists:keysearch(Pid, 1, Counters) of
+ {value, {Pid, Counter}} -> % when Counter > 1
+ NewCounters = lists:keyreplace(Pid, 1, Counters,
+ {Pid, Counter+1}),
+ Head#head{fixed = {TimeStamp, NewCounters}};
+ false ->
+ link(Pid),
+ Fixed = {TimeStamp, [{Pid, 1} | Counters]},
+ Head#head{fixed = Fixed}
+ end
+ end;
+do_safe_fixtable(Head, Pid, false) ->
+ remove_fix(Head, Pid, false).
+
+remove_fix(Head, Pid, How) ->
+ case Head#head.fixed of
+ false ->
+ Head;
+ {TimeStamp, Counters} ->
+ case lists:keysearch(Pid, 1, Counters) of
+ %% How =:= close when Pid closes the table.
+ {value, {Pid, Counter}} when Counter =:= 1; How =:= close ->
+ unlink(Pid),
+ case lists:keydelete(Pid, 1, Counters) of
+ [] ->
+ check_growth(Head),
+ erlang:garbage_collect(),
+ Head#head{fixed = false,
+ freelists = dets_utils:get_freelists(Head)};
+ NewCounters ->
+ Head#head{fixed = {TimeStamp, NewCounters}}
+ end;
+ {value, {Pid, Counter}} ->
+ NewCounters = lists:keyreplace(Pid, 1, Counters,
+ {Pid, Counter-1}),
+ Head#head{fixed = {TimeStamp, NewCounters}};
+ false ->
+ Head
+ end
+ end.
+
+do_stop(Head) ->
+ unlink_fixing_procs(Head),
+ fclose(Head).
+
+unlink_fixing_procs(Head) ->
+ case Head#head.fixed of
+ false ->
+ Head;
+ {_, Counters} ->
+ lists:map(fun({Pid, _Counter}) -> unlink(Pid) end, Counters),
+ Head#head{fixed = false,
+ freelists = dets_utils:get_freelists(Head)}
+ end.
+
+check_growth(#head{access = read}) ->
+ ok;
+check_growth(Head) ->
+ NoThings = no_things(Head),
+ if
+ NoThings > Head#head.next ->
+ erlang:send_after(200, self(),
+ ?DETS_CALL(self(), may_grow)); % Catch up.
+ true ->
+ ok
+ end.
+
+finfo(H) ->
+ case catch write_cache(H) of
+ {H2, []} ->
+ Info = (catch [{type, H2#head.type},
+ {keypos, H2#head.keypos},
+ {size, H2#head.no_objects},
+ {file_size,
+ file_size(H2#head.fptr, H2#head.filename)},
+ {filename, H2#head.filename}]),
+ {H2, Info};
+ {H2, _} = HeadError when is_record(H2, head) ->
+ HeadError
+ end.
+
+finfo(H, access) -> {H, H#head.access};
+finfo(H, auto_save) -> {H, H#head.auto_save};
+finfo(H, bchunk_format) ->
+ case catch write_cache(H) of
+ {H2, []} ->
+ case (H2#head.mod):table_parameters(H2) of
+ undefined = Undef ->
+ {H2, Undef};
+ Parms ->
+ {H2, term_to_binary(Parms)}
+ end;
+ {H2, _} = HeadError when is_record(H2, head) ->
+ HeadError
+ end;
+finfo(H, delayed_write) -> % undocumented
+ {H, dets_utils:cache_size(H#head.cache)};
+finfo(H, filename) -> {H, H#head.filename};
+finfo(H, file_size) ->
+ case catch write_cache(H) of
+ {H2, []} ->
+ {H2, catch file_size(H#head.fptr, H#head.filename)};
+ {H2, _} = HeadError when is_record(H2, head) ->
+ HeadError
+ end;
+finfo(H, fixed) ->
+ %% true if fixtable/2 has been called
+ {H, not (H#head.fixed =:= false)};
+finfo(H, hash) -> {H, H#head.hash_bif};
+finfo(H, keypos) -> {H, H#head.keypos};
+finfo(H, memory) -> finfo(H, file_size);
+finfo(H, no_objects) -> finfo(H, size);
+finfo(H, no_keys) ->
+ case catch write_cache(H) of
+ {H2, []} ->
+ {H2, H2#head.no_keys};
+ {H2, _} = HeadError when is_record(H2, head) ->
+ HeadError
+ end;
+finfo(H, no_slots) -> {H, (H#head.mod):no_slots(H)};
+finfo(H, pid) -> {H, self()};
+finfo(H, ram_file) -> {H, H#head.ram_file};
+finfo(H, safe_fixed) -> {H, H#head.fixed};
+finfo(H, size) ->
+ case catch write_cache(H) of
+ {H2, []} ->
+ {H2, H2#head.no_objects};
+ {H2, _} = HeadError when is_record(H2, head) ->
+ HeadError
+ end;
+finfo(H, type) -> {H, H#head.type};
+finfo(H, version) -> {H, H#head.version};
+finfo(H, _) -> {H, undefined}.
+
+file_size(Fd, FileName) ->
+ {ok, Pos} = dets_utils:position(Fd, FileName, eof),
+ Pos.
+
+test_bchunk_format(_Head, undefined) ->
+ false;
+test_bchunk_format(Head, _Term) when Head#head.version =:= 8 ->
+ false;
+test_bchunk_format(Head, Term) ->
+ dets_v9:try_bchunk_header(Term, Head) =/= not_ok.
+
+do_open_file([Fname, Verbose], Parent, Server, Ref) ->
+ case catch fopen2(Fname, Ref) of
+ {error, _Reason} = Error ->
+ err(Error);
+ {ok, Head} ->
+ maybe_put(verbose, Verbose),
+ {ok, Head#head{parent = Parent, server = Server}};
+ {'EXIT', _Reason} = Error ->
+ Error;
+ Bad ->
+ error_logger:format
+ ("** dets: Bug was found in open_file/1, reply was ~w.~n",
+ [Bad]),
+ {error, {dets_bug, Fname, Bad}}
+ end;
+do_open_file([Tab, OpenArgs, Verb], Parent, Server, Ref) ->
+ case catch fopen3(Tab, OpenArgs) of
+ {error, {tooshort, _}} ->
+ file:delete(OpenArgs#open_args.file),
+ do_open_file([Tab, OpenArgs, Verb], Parent, Server, Ref);
+ {error, _Reason} = Error ->
+ err(Error);
+ {ok, Head} ->
+ maybe_put(verbose, Verb),
+ {ok, Head#head{parent = Parent, server = Server}};
+ {'EXIT', _Reason} = Error ->
+ Error;
+ Bad ->
+ error_logger:format
+ ("** dets: Bug was found in open_file/2, arguments were~n"
+ "** dets: ~w and reply was ~w.~n",
+ [OpenArgs, Bad]),
+ {error, {dets_bug, Tab, {open_file, OpenArgs}, Bad}}
+ end.
+
+maybe_put(_, undefined) ->
+ ignore;
+maybe_put(K, V) ->
+ put(K, V).
+
+%% -> {Head, Result}, Result = ok | Error | {thrown, Error} | badarg
+finit(Head, InitFun, _Format, _NoSlots) when Head#head.access =:= read ->
+ _ = (catch InitFun(close)),
+ {Head, {error, {access_mode, Head#head.filename}}};
+finit(Head, InitFun, _Format, _NoSlots) when Head#head.fixed =/= false ->
+ _ = (catch InitFun(close)),
+ {Head, {error, {fixed_table, Head#head.name}}};
+finit(Head, InitFun, Format, NoSlots) ->
+ case catch do_finit(Head, InitFun, Format, NoSlots) of
+ {ok, NewHead} ->
+ check_growth(NewHead),
+ start_auto_save_timer(NewHead),
+ {NewHead, ok};
+ badarg ->
+ {Head, badarg};
+ Error ->
+ dets_utils:corrupt(Head, Error)
+ end.
+
+%% -> {ok, NewHead} | throw(badarg) | throw(Error)
+do_finit(Head, Init, Format, NoSlots) ->
+ #head{fptr = Fd, type = Type, keypos = Kp, auto_save = Auto,
+ cache = Cache, filename = Fname, ram_file = Ram,
+ min_no_slots = MinSlots0, max_no_slots = MaxSlots,
+ name = Tab, update_mode = UpdateMode, mod = HMod} = Head,
+ CacheSz = dets_utils:cache_size(Cache),
+ {How, Head1} =
+ case Format of
+ term when is_integer(NoSlots), NoSlots > MaxSlots ->
+ throw(badarg);
+ term ->
+ MinSlots = choose_no_slots(NoSlots, MinSlots0),
+ if
+ UpdateMode =:= new_dirty, MinSlots =:= MinSlots0 ->
+ {general_init, Head};
+ true ->
+ ok = dets_utils:truncate(Fd, Fname, bof),
+ {ok, H} = HMod:initiate_file(Fd, Tab, Fname, Type, Kp,
+ MinSlots, MaxSlots, Ram,
+ CacheSz, Auto, false),
+ {general_init, H}
+ end;
+ bchunk ->
+ ok = dets_utils:truncate(Fd, Fname, bof),
+ {bchunk_init, Head}
+ end,
+ case How of
+ bchunk_init ->
+ case HMod:bchunk_init(Head1, Init) of
+ {ok, NewHead} ->
+ {ok, NewHead#head{update_mode = dirty}};
+ Error ->
+ Error
+ end;
+ general_init ->
+ Cntrs = ets:new(dets_init, []),
+ Input = HMod:bulk_input(Head1, Init, Cntrs),
+ SlotNumbers = {Head1#head.min_no_slots, bulk_init, MaxSlots},
+ {Reply, SizeData} =
+ do_sort(Head1, SlotNumbers, Input, Cntrs, Fname, not_used),
+ Bulk = true,
+ case Reply of
+ {ok, NoDups, H1} ->
+ fsck_copy(SizeData, H1, Bulk, NoDups);
+ Else ->
+ close_files(Bulk, SizeData, Head1),
+ Else
+ end
+ end.
+
+%% -> {NewHead, [LookedUpObject]} | {NewHead, Error}
+flookup_keys(Head, Keys) ->
+ case catch update_cache(Head, Keys, {lookup, nopid}) of
+ {NewHead, [{_NoPid,Objs}]} ->
+ {NewHead, Objs};
+ {NewHead, L} when is_list(L) ->
+ {NewHead, lists:flatmap(fun({_Pid,OL}) -> OL end, L)};
+ {NewHead, _} = HeadError when is_record(NewHead, head) ->
+ HeadError
+ end.
+
+%% -> {NewHead, Result}
+fmatch_init(Head, C) ->
+ case scan(Head, C) of
+ {scan_error, Reason} ->
+ dets_utils:corrupt_reason(Head, Reason);
+ {Ts, NC} ->
+ {Head, {cont, {Ts, NC}}}
+ end.
+
+%% -> {NewHead, Result}
+fmatch(Head, MP, Spec, N) ->
+ KeyPos = Head#head.keypos,
+ case find_all_keys(Spec, KeyPos, []) of
+ [] ->
+ %% Complete match
+ case catch write_cache(Head) of
+ {NewHead, []} ->
+ C0 = init_scan(NewHead, N),
+ {NewHead, {cont, C0#dets_cont{match_program = MP}}};
+ {NewHead, _} = HeadError when is_record(NewHead, head) ->
+ HeadError
+ end;
+ List ->
+ Keys = lists:usort(List),
+ {NewHead, Reply} = flookup_keys(Head, Keys),
+ case Reply of
+ Objs when is_list(Objs) ->
+ MatchingObjs = ets:match_spec_run(Objs, MP),
+ {NewHead, {done, MatchingObjs}};
+ Error ->
+ {NewHead, Error}
+ end
+ end.
+
+find_all_keys([], _, Ks) ->
+ Ks;
+find_all_keys([{H,_,_} | T], KeyPos, Ks) when is_tuple(H) ->
+ case tuple_size(H) of
+ Enough when Enough >= KeyPos ->
+ Key = element(KeyPos, H),
+ case contains_variable(Key) of
+ true ->
+ [];
+ false ->
+ find_all_keys(T, KeyPos, [Key | Ks])
+ end;
+ _ ->
+ find_all_keys(T, KeyPos, Ks)
+ end;
+find_all_keys(_, _, _) ->
+ [].
+
+contains_variable('_') ->
+ true;
+contains_variable(A) when is_atom(A) ->
+ case atom_to_list(A) of
+ [$$ | T] ->
+ case (catch list_to_integer(T)) of
+ {'EXIT', _} ->
+ false;
+ _ ->
+ true
+ end;
+ _ ->
+ false
+ end;
+contains_variable(T) when is_tuple(T) ->
+ contains_variable(tuple_to_list(T));
+contains_variable([]) ->
+ false;
+contains_variable([H|T]) ->
+ case contains_variable(H) of
+ true ->
+ true;
+ false ->
+ contains_variable(T)
+ end;
+contains_variable(_) ->
+ false.
+
+%% -> {NewHead, Res}
+fmatch_delete_init(Head, MP, Spec) ->
+ KeyPos = Head#head.keypos,
+ case catch
+ case find_all_keys(Spec, KeyPos, []) of
+ [] ->
+ do_fmatch_delete_var_keys(Head, MP, Spec);
+ List ->
+ Keys = lists:usort(List),
+ do_fmatch_constant_keys(Head, Keys, MP)
+ end of
+ {NewHead, _} = Reply when is_record(NewHead, head) ->
+ Reply
+ end.
+
+%% A note: If deleted objects reside in a bucket with other objects
+%% that are not deleted, the bucket is moved. If the address of the
+%% moved bucket is greater than original bucket address the kept
+%% objects will be read once again later on.
+%% -> {NewHead, Res}
+fmatch_delete(Head, C) ->
+ case scan(Head, C) of
+ {scan_error, Reason} ->
+ dets_utils:corrupt_reason(Head, Reason);
+ {[], _} ->
+ {Head, {done, 0}};
+ {RTs, NC} ->
+ MP = C#dets_cont.match_program,
+ case catch filter_binary_terms(RTs, MP, []) of
+ {'EXIT', _} ->
+ Bad = dets_utils:bad_object(fmatch_delete, RTs),
+ dets_utils:corrupt_reason(Head, Bad);
+ Terms ->
+ do_fmatch_delete(Head, Terms, NC)
+ end
+ end.
+
+do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_'))
+ when Head#head.fixed =:= false ->
+ %% Handle the case where the file is emptied efficiently.
+ %% Empty the cache just to get the number of objects right.
+ {Head1, []} = write_cache(Head),
+ N = Head1#head.no_objects,
+ case fdelete_all_objects(Head1) of
+ {NewHead, ok} ->
+ {NewHead, {done, N}};
+ Reply ->
+ Reply
+ end;
+do_fmatch_delete_var_keys(Head, MP, _Spec) ->
+ {NewHead, []} = write_cache(Head),
+ C0 = init_scan(NewHead, default),
+ {NewHead, {cont, C0#dets_cont{match_program = MP}, 0}}.
+
+do_fmatch_constant_keys(Head, Keys, MP) ->
+ case flookup_keys(Head, Keys) of
+ {NewHead, ReadTerms} when is_list(ReadTerms) ->
+ Terms = filter_terms(ReadTerms, MP, []),
+ do_fmatch_delete(NewHead, Terms, fixed);
+ Reply ->
+ Reply
+ end.
+
+filter_binary_terms([Bin | Bins], MP, L) ->
+ Term = binary_to_term(Bin),
+ case ets:match_spec_run([Term], MP) of
+ [true] ->
+ filter_binary_terms(Bins, MP, [Term | L]);
+ _ ->
+ filter_binary_terms(Bins, MP, L)
+ end;
+filter_binary_terms([], _MP, L) ->
+ L.
+
+filter_terms([Term | Terms], MP, L) ->
+ case ets:match_spec_run([Term], MP) of
+ [true] ->
+ filter_terms(Terms, MP, [Term | L]);
+ _ ->
+ filter_terms(Terms, MP, L)
+ end;
+filter_terms([], _MP, L) ->
+ L.
+
+do_fmatch_delete(Head, Terms, What) ->
+ N = length(Terms),
+ case do_delete(Head, Terms, delete_object) of
+ {NewHead, ok} when What =:= fixed ->
+ {NewHead, {done, N}};
+ {NewHead, ok} ->
+ {NewHead, {cont, What, N}};
+ Reply ->
+ Reply
+ end.
+
+do_delete(Head, Things, What) ->
+ case catch update_cache(Head, Things, What) of
+ {NewHead, []} ->
+ {NewHead, ok};
+ {NewHead, _} = HeadError when is_record(NewHead, head) ->
+ HeadError
+ end.
+
+fmember(Head, Key) ->
+ case catch begin
+ {Head2, [{_NoPid,Objs}]} =
+ update_cache(Head, [Key], {lookup, nopid}),
+ {Head2, Objs =/= []}
+ end of
+ {NewHead, _} = Reply when is_record(NewHead, head) ->
+ Reply
+ end.
+
+fnext(Head, Key) ->
+ Slot = (Head#head.mod):db_hash(Key, Head),
+ Ref = make_ref(),
+ case catch {Ref, fnext(Head, Key, Slot)} of
+ {Ref, {H, R}} ->
+ {H, {ok, R}};
+ {NewHead, _} = HeadError when is_record(NewHead, head) ->
+ HeadError
+ end.
+
+fnext(H, Key, Slot) ->
+ {NH, []} = write_cache(H),
+ case (H#head.mod):slot_objs(NH, Slot) of
+ '$end_of_table' -> {NH, '$end_of_table'};
+ L -> fnext_search(NH, Key, Slot, L)
+ end.
+
+fnext_search(H, K, Slot, L) ->
+ Kp = H#head.keypos,
+ case beyond_key(K, Kp, L) of
+ [] -> fnext_slot(H, K, Slot+1);
+ L2 -> {H, element(H#head.keypos, hd(L2))}
+ end.
+
+%% We've got to continue to search for the next key in the next slot
+fnext_slot(H, K, Slot) ->
+ case (H#head.mod):slot_objs(H, Slot) of
+ '$end_of_table' -> {H, '$end_of_table'};
+ [] -> fnext_slot(H, K, Slot+1);
+ L -> {H, element(H#head.keypos, hd(L))}
+ end.
+
+beyond_key(_K, _Kp, []) -> [];
+beyond_key(K, Kp, [H|T]) ->
+ case dets_utils:cmp(element(Kp, H), K) of
+ 0 -> beyond_key2(K, Kp, T);
+ _ -> beyond_key(K, Kp, T)
+ end.
+
+beyond_key2(_K, _Kp, []) -> [];
+beyond_key2(K, Kp, [H|T]=L) ->
+ case dets_utils:cmp(element(Kp, H), K) of
+ 0 -> beyond_key2(K, Kp, T);
+ _ -> L
+ end.
+
+%% Open an already existing file, no arguments
+%% -> {ok, head()} | throw(Error)
+fopen2(Fname, Tab) ->
+ case file:read_file_info(Fname) of
+ {ok, _} ->
+ Acc = read_write,
+ Ram = false,
+ %% Fd is not always closed upon error, but exit is soon called.
+ {ok, Fd, FH} = read_file_header(Fname, Acc, Ram),
+ Mod = FH#fileheader.mod,
+ case Mod:check_file_header(FH, Fd) of
+ {error, not_closed} ->
+ io:format(user,"dets: file ~p not properly closed, "
+ "repairing ...~n", [Fname]),
+ Version = default,
+ case fsck(Fd, Tab, Fname, FH, default, default, Version) of
+ ok ->
+ fopen2(Fname, Tab);
+ Error ->
+ throw(Error)
+ end;
+ {ok, Head, ExtraInfo} ->
+ open_final(Head, Fname, Acc, Ram, ?DEFAULT_CACHE,
+ Tab, ExtraInfo, false);
+ {error, Reason} ->
+ throw({error, {Reason, Fname}})
+ end;
+ Error ->
+ dets_utils:file_error(Fname, Error)
+ end.
+
+%% Open and possibly create and initialize a file
+%% -> {ok, head()} | throw(Error)
+fopen3(Tab, OpenArgs) ->
+ FileName = OpenArgs#open_args.file,
+ case file:read_file_info(FileName) of
+ {ok, _} ->
+ fopen_existing_file(Tab, OpenArgs);
+ Error when OpenArgs#open_args.access =:= read ->
+ dets_utils:file_error(FileName, Error);
+ _Error ->
+ fopen_init_file(Tab, OpenArgs)
+ end.
+
+fopen_existing_file(Tab, OpenArgs) ->
+ #open_args{file = Fname, type = Type, keypos = Kp, repair = Rep,
+ min_no_slots = MinSlots, max_no_slots = MaxSlots,
+ ram_file = Ram, delayed_write = CacheSz, auto_save =
+ Auto, access = Acc, version = Version, debug = Debug} =
+ OpenArgs,
+ %% Fd is not always closed upon error, but exit is soon called.
+ {ok, Fd, FH} = read_file_header(Fname, Acc, Ram),
+ V9 = (Version =:= 9) or (Version =:= default),
+ MinF = (MinSlots =:= default) or (MinSlots =:= FH#fileheader.min_no_slots),
+ MaxF = (MaxSlots =:= default) or (MaxSlots =:= FH#fileheader.max_no_slots),
+ Do = case (FH#fileheader.mod):check_file_header(FH, Fd) of
+ {ok, Head, true} when Rep =:= force, Acc =:= read_write,
+ FH#fileheader.version =:= 9,
+ FH#fileheader.no_colls =/= undefined,
+ MinF, MaxF, V9 ->
+ {compact, Head};
+ {ok, _Head, _Extra} when Rep =:= force, Acc =:= read ->
+ throw({error, {access_mode, Fname}});
+ {ok, Head, need_compacting} when Acc =:= read ->
+ {final, Head, true}; % Version 8 only.
+ {ok, _Head, need_compacting} when Rep =:= true ->
+ %% The file needs to be compacted due to a very big
+ %% and fragmented free_list. Version 8 only.
+ M = " is now compacted ...",
+ {repair, M};
+ {ok, _Head, _Extra} when Rep =:= force ->
+ M = ", repair forced.",
+ {repair, M};
+ {ok, Head, ExtraInfo} ->
+ {final, Head, ExtraInfo};
+ {error, not_closed} when Rep =:= force, Acc =:= read_write ->
+ M = ", repair forced.",
+ {repair, M};
+ {error, not_closed} when Rep =:= true, Acc =:= read_write ->
+ M = " not properly closed, repairing ...",
+ {repair, M};
+ {error, not_closed} when Rep =:= false ->
+ throw({error, {needs_repair, Fname}});
+ {error, version_bump} when Rep =:= true, Acc =:= read_write ->
+ %% Version 8 only
+ M = " old version, upgrading ...",
+ {repair, M};
+ {error, Reason} ->
+ throw({error, {Reason, Fname}})
+ end,
+ case Do of
+ _ when FH#fileheader.type =/= Type ->
+ throw({error, {type_mismatch, Fname}});
+ _ when FH#fileheader.keypos =/= Kp ->
+ throw({error, {keypos_mismatch, Fname}});
+ {compact, SourceHead} ->
+ io:format(user, "dets: file ~p is now compacted ...~n", [Fname]),
+ {ok, NewSourceHead} = open_final(SourceHead, Fname, read, false,
+ ?DEFAULT_CACHE, Tab, true,
+ Debug),
+ case catch compact(NewSourceHead) of
+ ok ->
+ erlang:garbage_collect(),
+ fopen3(Tab, OpenArgs#open_args{repair = false});
+ _Err ->
+ _ = file:close(Fd),
+ dets_utils:stop_disk_map(),
+ io:format(user, "dets: compaction of file ~p failed, "
+ "now repairing ...~n", [Fname]),
+ {ok, Fd2, _FH} = read_file_header(Fname, Acc, Ram),
+ do_repair(Fd2, Tab, Fname, FH, MinSlots, MaxSlots,
+ Version, OpenArgs)
+ end;
+ {repair, Mess} ->
+ io:format(user, "dets: file ~p~s~n", [Fname, Mess]),
+ do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots,
+ Version, OpenArgs);
+ _ when FH#fileheader.version =/= Version, Version =/= default ->
+ throw({error, {version_mismatch, Fname}});
+ {final, H, EI} ->
+ H1 = H#head{auto_save = Auto},
+ open_final(H1, Fname, Acc, Ram, CacheSz, Tab, EI, Debug)
+ end.
+
+do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, Version, OpenArgs) ->
+ case fsck(Fd, Tab, Fname, FH, MinSlots, MaxSlots, Version) of
+ ok ->
+ %% No need to update 'version'.
+ erlang:garbage_collect(),
+ fopen3(Tab, OpenArgs#open_args{repair = false});
+ Error ->
+ throw(Error)
+ end.
+
+%% -> {ok, head()} | throw(Error)
+open_final(Head, Fname, Acc, Ram, CacheSz, Tab, ExtraInfo, Debug) ->
+ Head1 = Head#head{access = Acc,
+ ram_file = Ram,
+ filename = Fname,
+ name = Tab,
+ cache = dets_utils:new_cache(CacheSz)},
+ init_disk_map(Head1#head.version, Tab, Debug),
+ Mod = Head#head.mod,
+ Mod:cache_segps(Head1#head.fptr, Fname, Head1#head.next),
+ Ftab = Mod:init_freelist(Head1, ExtraInfo),
+ check_growth(Head1),
+ NewHead = Head1#head{freelists = Ftab},
+ {ok, NewHead}.
+
+%% -> {ok, head()} | throw(Error)
+fopen_init_file(Tab, OpenArgs) ->
+ #open_args{file = Fname, type = Type, keypos = Kp,
+ min_no_slots = MinSlotsArg, max_no_slots = MaxSlotsArg,
+ ram_file = Ram, delayed_write = CacheSz, auto_save = Auto,
+ version = UseVersion, debug = Debug} = OpenArgs,
+ MinSlots = choose_no_slots(MinSlotsArg, ?DEFAULT_MIN_NO_SLOTS),
+ MaxSlots = choose_no_slots(MaxSlotsArg, ?DEFAULT_MAX_NO_SLOTS),
+ FileSpec = if
+ Ram -> [];
+ true -> Fname
+ end,
+ {ok, Fd} = dets_utils:open(FileSpec, open_args(read_write, Ram)),
+ Version = if
+ UseVersion =:= default ->
+ case os:getenv("DETS_USE_FILE_FORMAT") of
+ "8" -> 8;
+ _ -> 9
+ end;
+ true ->
+ UseVersion
+ end,
+ Mod = version2module(Version),
+ %% No need to truncate an empty file.
+ init_disk_map(Version, Tab, Debug),
+ case catch Mod:initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots,
+ Ram, CacheSz, Auto, true) of
+ {error, Reason} when Ram ->
+ file:close(Fd),
+ throw({error, Reason});
+ {error, Reason} ->
+ file:close(Fd),
+ file:delete(Fname),
+ throw({error, Reason});
+ {ok, Head} ->
+ start_auto_save_timer(Head),
+ %% init_table does not need to truncate and write header
+ {ok, Head#head{update_mode = new_dirty}}
+ end.
+
+%% Debug.
+init_disk_map(9, Name, Debug) ->
+ case Debug orelse dets_utils:debug_mode() of
+ true ->
+ dets_utils:init_disk_map(Name);
+ false ->
+ ok
+ end;
+init_disk_map(_Version, _Name, _Debug) ->
+ ok.
+
+open_args(Access, RamFile) ->
+ A1 = case Access of
+ read -> [];
+ read_write -> [write]
+ end,
+ A2 = case RamFile of
+ true -> [ram];
+ false -> [raw]
+ end,
+ A1 ++ A2 ++ [binary, read].
+
+version2module(V) when V =< 8 -> dets_v8;
+version2module(9) -> dets_v9.
+
+module2version(dets_v8) -> 8;
+module2version(dets_v9) -> 9;
+module2version(not_used) -> 9.
+
+%% -> ok | throw(Error)
+%% For version 9 tables only.
+compact(SourceHead) ->
+ #head{name = Tab, filename = Fname, fptr = SFd, type = Type, keypos = Kp,
+ ram_file = Ram, auto_save = Auto} = SourceHead,
+ Tmp = tempfile(Fname),
+ TblParms = dets_v9:table_parameters(SourceHead),
+ {ok, Fd} = dets_utils:open(Tmp, open_args(read_write, false)),
+ CacheSz = ?DEFAULT_CACHE,
+ %% It is normally not possible to have two open tables in the same
+ %% process since the process dictionary is used for caching
+ %% segment pointers, but here is works anyway--when reading a file
+ %% serially the pointers to not need to be used.
+ Head = case catch dets_v9:prep_table_copy(Fd, Tab, Tmp, Type, Kp, Ram,
+ CacheSz, Auto, TblParms) of
+ {ok, H} ->
+ H;
+ Error ->
+ file:close(Fd),
+ file:delete(Tmp),
+ throw(Error)
+ end,
+
+ case dets_v9:compact_init(SourceHead, Head, TblParms) of
+ {ok, NewHead} ->
+ R = case fclose(NewHead) of
+ ok ->
+ ok = file:close(SFd),
+ %% Save (rename) Fname first?
+ dets_utils:rename(Tmp, Fname);
+ E ->
+ E
+ end,
+ if
+ R =:= ok -> ok;
+ true ->
+ file:delete(Tmp),
+ throw(R)
+ end;
+ Err ->
+ file:close(Fd),
+ file:delete(Tmp),
+ throw(Err)
+ end.
+
+%% -> ok | Error
+%% Closes Fd.
+fsck(Fd, Tab, Fname, FH, MinSlotsArg, MaxSlotsArg, Version) ->
+ %% MinSlots and MaxSlots are the option values.
+ #fileheader{min_no_slots = MinSlotsFile,
+ max_no_slots = MaxSlotsFile} = FH,
+ EstNoSlots0 = file_no_things(FH),
+ MinSlots = choose_no_slots(MinSlotsArg, MinSlotsFile),
+ MaxSlots = choose_no_slots(MaxSlotsArg, MaxSlotsFile),
+ EstNoSlots = erlang:min(MaxSlots, erlang:max(MinSlots, EstNoSlots0)),
+ SlotNumbers = {MinSlots, EstNoSlots, MaxSlots},
+ %% When repairing: We first try and sort on slots using MinSlots.
+ %% If the number of objects (keys) turns out to be significantly
+ %% different from NoSlots, we try again with the correct number of
+ %% objects (keys).
+ case fsck_try(Fd, Tab, FH, Fname, SlotNumbers, Version) of
+ {try_again, BetterNoSlots} ->
+ BetterSlotNumbers = {MinSlots, BetterNoSlots, MaxSlots},
+ case fsck_try(Fd, Tab, FH, Fname, BetterSlotNumbers, Version) of
+ {try_again, _} ->
+ file:close(Fd),
+ {error, {cannot_repair, Fname}};
+ Else ->
+ Else
+ end;
+ Else ->
+ Else
+ end.
+
+choose_no_slots(default, NoSlots) -> NoSlots;
+choose_no_slots(NoSlots, _) -> NoSlots.
+
+%% -> ok | {try_again, integer()} | Error
+%% Closes Fd unless {try_again, _} is returned.
+%% Initiating a table using a fun and repairing (or converting) a
+%% file are completely different things, but nevertheless the same
+%% method is used in both cases...
+fsck_try(Fd, Tab, FH, Fname, SlotNumbers, Version) ->
+ Tmp = tempfile(Fname),
+ #fileheader{type = Type, keypos = KeyPos} = FH,
+ {_MinSlots, EstNoSlots, MaxSlots} = SlotNumbers,
+ OpenArgs = #open_args{file = Tmp, type = Type, keypos = KeyPos,
+ repair = false, min_no_slots = EstNoSlots,
+ max_no_slots = MaxSlots,
+ ram_file = false, delayed_write = ?DEFAULT_CACHE,
+ auto_save = infinity, access = read_write,
+ version = Version, debug = false},
+ case catch fopen3(Tab, OpenArgs) of
+ {ok, Head} ->
+ case fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) of
+ {ok, NewHead} ->
+ R = case fclose(NewHead) of
+ ok ->
+ %% Save (rename) Fname first?
+ dets_utils:rename(Tmp, Fname);
+ Error ->
+ Error
+ end,
+ if
+ R =:= ok -> ok;
+ true ->
+ file:delete(Tmp),
+ R
+ end;
+ TryAgainOrError ->
+ file:delete(Tmp),
+ TryAgainOrError
+ end;
+ Error ->
+ file:close(Fd),
+ Error
+ end.
+
+tempfile(Fname) ->
+ Tmp = lists:concat([Fname, ".TMP"]),
+ case file:delete(Tmp) of
+ {error, eacces} -> % 'dets_process_died' happened anyway... (W-nd-ws)
+ timer:sleep(5000),
+ file:delete(Tmp);
+ _ ->
+ ok
+ end,
+ Tmp.
+
+%% -> {ok, NewHead} | {try_again, integer()} | Error
+fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) ->
+ %% Mod is the module to use for reading input when repairing.
+ Mod = FH#fileheader.mod,
+ Cntrs = ets:new(dets_repair, []),
+ Input = Mod:fsck_input(Head, Fd, Cntrs, FH),
+ {Reply, SizeData} = do_sort(Head, SlotNumbers, Input, Cntrs, Fname, Mod),
+ Bulk = false,
+ case Reply of
+ {ok, NoDups, H1} ->
+ file:close(Fd),
+ fsck_copy(SizeData, H1, Bulk, NoDups);
+ {try_again, _} = Return ->
+ close_files(Bulk, SizeData, Head),
+ Return;
+ Else ->
+ file:close(Fd),
+ close_files(Bulk, SizeData, Head),
+ Else
+ end.
+
+do_sort(Head, SlotNumbers, Input, Cntrs, Fname, Mod) ->
+ OldV = module2version(Mod),
+ %% output_objs/4 replaces {LogSize,NoObjects} in Cntrs by
+ %% {LogSize,Position,Data,NoObjects | NoCollections}.
+ %% Data = {FileName,FileDescriptor} | [object()]
+ %% For small tables Data may be a list of objects which is more
+ %% efficient since no temporary files are created.
+ Output = (Head#head.mod):output_objs(OldV, Head, SlotNumbers, Cntrs),
+ TmpDir = filename:dirname(Fname),
+ Reply = (catch file_sorter:sort(Input, Output,
+ [{format, binary},{tmpdir, TmpDir}])),
+ L = ets:tab2list(Cntrs),
+ ets:delete(Cntrs),
+ {Reply, lists:reverse(lists:keysort(1, L))}.
+
+fsck_copy([{_LogSz, Pos, Bins, _NoObjects} | SizeData], Head, _Bulk, NoDups)
+ when is_list(Bins) ->
+ true = NoDups =:= 0,
+ PWs = [{Pos,Bins} | lists:map(fun({_, P, B, _}) -> {P, B} end, SizeData)],
+ #head{fptr = Fd, filename = FileName} = Head,
+ dets_utils:pwrite(Fd, FileName, PWs),
+ {ok, Head#head{update_mode = dirty}};
+fsck_copy(SizeData, Head, Bulk, NoDups) ->
+ catch fsck_copy1(SizeData, Head, Bulk, NoDups).
+
+fsck_copy1([SzData | L], Head, Bulk, NoDups) ->
+ Out = Head#head.fptr,
+ {LogSz, Pos, {FileName, Fd}, NoObjects} = SzData,
+ Size = if NoObjects =:= 0 -> 0; true -> ?POW(LogSz-1) end,
+ ExpectedSize = Size * NoObjects,
+ close_tmp(Fd),
+ case file:position(Out, Pos) of
+ {ok, Pos} -> ok;
+ PError -> dets_utils:file_error(FileName, PError)
+ end,
+ {ok, Pos} = file:position(Out, Pos),
+ CR = file:copy({FileName, [raw,binary]}, Out),
+ file:delete(FileName),
+ case CR of
+ {ok, Copied} when Copied =:= ExpectedSize;
+ NoObjects =:= 0 -> % the segments
+ fsck_copy1(L, Head, Bulk, NoDups);
+ {ok, Copied} when Bulk, Head#head.version =:= 8 ->
+ NoZeros = ExpectedSize - Copied,
+ Dups = NoZeros div Size,
+ Addr = Pos+Copied,
+ NewHead = free_n_objects(Head, Addr, Size-1, NoDups),
+ NewNoDups = NoDups - Dups,
+ fsck_copy1(L, NewHead, Bulk, NewNoDups);
+ {ok, _Copied} -> % should never happen
+ close_files(Bulk, L, Head),
+ Reason = if Bulk -> initialization_failed;
+ true -> repair_failed end,
+ {error, {Reason, Head#head.filename}};
+ FError ->
+ close_files(Bulk, L, Head),
+ dets_utils:file_error(FileName, FError)
+ end;
+fsck_copy1([], Head, _Bulk, NoDups) when NoDups =/= 0 ->
+ {error, {initialization_failed, Head#head.filename}};
+fsck_copy1([], Head, _Bulk, _NoDups) ->
+ {ok, Head#head{update_mode = dirty}}.
+
+free_n_objects(Head, _Addr, _Size, 0) ->
+ Head;
+free_n_objects(Head, Addr, Size, N) ->
+ {NewHead, _} = dets_utils:free(Head, Addr, Size),
+ NewAddr = Addr + Size + 1,
+ free_n_objects(NewHead, NewAddr, Size, N-1).
+
+close_files(false, SizeData, Head) ->
+ file:close(Head#head.fptr),
+ close_files(true, SizeData, Head);
+close_files(true, SizeData, _Head) ->
+ Fun = fun({_Size, _Pos, {FileName, Fd}, _No}) ->
+ close_tmp(Fd),
+ file:delete(FileName);
+ (_) ->
+ ok
+ end,
+ lists:foreach(Fun, SizeData).
+
+close_tmp(Fd) ->
+ file:close(Fd).
+
+fslot(H, Slot) ->
+ case catch begin
+ {NH, []} = write_cache(H),
+ Objs = (NH#head.mod):slot_objs(NH, Slot),
+ {NH, Objs}
+ end of
+ {NewHead, _Objects} = Reply when is_record(NewHead, head) ->
+ Reply
+ end.
+
+do_update_counter(Head, _Key, _Incr) when Head#head.type =/= set ->
+ {Head, badarg};
+do_update_counter(Head, Key, Incr) ->
+ case flookup_keys(Head, [Key]) of
+ {H1, [O]} ->
+ Kp = H1#head.keypos,
+ case catch try_update_tuple(O, Kp, Incr) of
+ {'EXIT', _} ->
+ {H1, badarg};
+ {New, Term} ->
+ case finsert(H1, [Term]) of
+ {H2, ok} ->
+ {H2, New};
+ Reply ->
+ Reply
+ end
+ end;
+ {H1, []} ->
+ {H1, badarg};
+ HeadError ->
+ HeadError
+ end.
+
+try_update_tuple(O, _Kp, {Pos, Incr}) ->
+ try_update_tuple2(O, Pos, Incr);
+try_update_tuple(O, Kp, Incr) ->
+ try_update_tuple2(O, Kp+1, Incr).
+
+try_update_tuple2(O, Pos, Incr) ->
+ New = element(Pos, O) + Incr,
+ {New, setelement(Pos, O, New)}.
+
+set_verbose(true) ->
+ put(verbose, yes);
+set_verbose(_) ->
+ erase(verbose).
+
+where_is_object(Head, Object) ->
+ Keypos = Head#head.keypos,
+ case check_objects([Object], Keypos) of
+ true ->
+ case catch write_cache(Head) of
+ {NewHead, []} ->
+ {NewHead, (Head#head.mod):find_object(NewHead, Object)};
+ {NewHead, _} = HeadError when is_record(NewHead, head) ->
+ HeadError
+ end;
+ false ->
+ {Head, badarg}
+ end.
+
+check_objects([T | Ts], Kp) when tuple_size(T) >= Kp ->
+ check_objects(Ts, Kp);
+check_objects(L, _Kp) ->
+ L =:= [].
+
+no_things(Head) when Head#head.no_keys =:= undefined ->
+ Head#head.no_objects;
+no_things(Head) ->
+ Head#head.no_keys.
+
+file_no_things(FH) when FH#fileheader.no_keys =:= undefined ->
+ FH#fileheader.no_objects;
+file_no_things(FH) ->
+ FH#fileheader.no_keys.
+
+%%% The write cache is list of {Key, [Item]} where Item is one of
+%%% {Seq, delete_key}, {Seq, {lookup,Pid}}, {Seq, {delete_object,object()}},
+%%% or {Seq, {insert,object()}}. Seq is a number that increases
+%%% monotonically for each item put in the cache. The purpose is to
+%%% make sure that items are sorted correctly. Sequences of delete and
+%%% insert operations are inserted in the cache without doing any file
+%%% operations. When the cache is considered full, a lookup operation
+%%% is requested, or after some delay, the contents of the cache are
+%%% written to the file, and the cache emptied.
+%%%
+%%% Data is not allowed to linger more than 'delay' milliseconds in
+%%% the write cache. A delayed_write message is received when some
+%%% datum has become too old. If 'wrtime' is equal to 'undefined',
+%%% then the cache is empty and no such delayed_write message has been
+%%% scheduled. Otherwise there is a delayed_write message scheduled,
+%%% and the value of 'wrtime' is the time when the cache was last
+%%% written, or when it was first updated after the cache was last
+%%% written.
+
+update_cache(Head, KeysOrObjects, What) ->
+ {Head1, LU, PwriteList} = update_cache(Head, [{What,KeysOrObjects}]),
+ {NewHead, ok} = dets_utils:pwrite(Head1, PwriteList),
+ {NewHead, LU}.
+
+%% -> {NewHead, [object()], pwrite_list()} | throw({Head, Error})
+update_cache(Head, ToAdd) ->
+ Cache = Head#head.cache,
+ #cache{cache = C, csize = Size0, inserts = Ins} = Cache,
+ NewSize = Size0 + erlang:external_size(ToAdd),
+ %% The size is used as a sequence number here; it increases monotonically.
+ {NewC, NewIns, Lookup, Found} =
+ cache_binary(Head, ToAdd, C, Size0, Ins, false, []),
+ NewCache = Cache#cache{cache = NewC, csize = NewSize, inserts = NewIns},
+ Head1 = Head#head{cache = NewCache},
+ if
+ Lookup; NewSize >= Cache#cache.tsize ->
+ %% The cache is considered full, or some lookup.
+ {NewHead, LU, PwriteList} = (Head#head.mod):write_cache(Head1),
+ {NewHead, Found ++ LU, PwriteList};
+ NewC =:= [] ->
+ {Head1, Found, []};
+ Cache#cache.wrtime =:= undefined ->
+ %% Empty cache. Schedule a delayed write.
+ Now = now(), Me = self(),
+ Call = ?DETS_CALL(Me, {delayed_write, Now}),
+ erlang:send_after(Cache#cache.delay, Me, Call),
+ {Head1#head{cache = NewCache#cache{wrtime = Now}}, Found, []};
+ Size0 =:= 0 ->
+ %% Empty cache that has been written after the
+ %% currently scheduled delayed write.
+ {Head1#head{cache = NewCache#cache{wrtime = now()}}, Found, []};
+ true ->
+ %% Cache is not empty, delayed write has been scheduled.
+ {Head1, Found, []}
+ end.
+
+cache_binary(Head, [{Q,Os} | L], C, Seq, Ins, Lu,F) when Q =:= delete_object ->
+ cache_obj_op(Head, L, C, Seq, Ins, Lu, F, Os, Head#head.keypos, Q);
+cache_binary(Head, [{Q,Os} | L], C, Seq, Ins, Lu, F) when Q =:= insert ->
+ NewIns = Ins + length(Os),
+ cache_obj_op(Head, L, C, Seq, NewIns, Lu, F, Os, Head#head.keypos, Q);
+cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, Lu, F) when Q =:= delete_key ->
+ cache_key_op(Head, L, C, Seq, Ins, Lu, F, Ks, Q);
+cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, _Lu, F) when C =:= [] -> % lookup
+ cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q);
+cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, Lu, F) -> % lookup
+ case dets_utils:cache_lookup(Head#head.type, Ks, C, []) of
+ false ->
+ cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q);
+ Found ->
+ {lookup,Pid} = Q,
+ cache_binary(Head, L, C, Seq, Ins, Lu, [{Pid,Found} | F])
+ end;
+cache_binary(_Head, [], C, _Seq, Ins, Lu, F) ->
+ {C, Ins, Lu, F}.
+
+cache_key_op(Head, L, C, Seq, Ins, Lu, F, [K | Ks], Q) ->
+ E = {K, {Seq, Q}},
+ cache_key_op(Head, L, [E | C], Seq+1, Ins, Lu, F, Ks, Q);
+cache_key_op(Head, L, C, Seq, Ins, Lu, F, [], _Q) ->
+ cache_binary(Head, L, C, Seq, Ins, Lu, F).
+
+cache_obj_op(Head, L, C, Seq, Ins, Lu, F, [O | Os], Kp, Q) ->
+ E = {element(Kp, O), {Seq, {Q, O}}},
+ cache_obj_op(Head, L, [E | C], Seq+1, Ins, Lu, F, Os, Kp, Q);
+cache_obj_op(Head, L, C, Seq, Ins, Lu, F, [], _Kp, _Q) ->
+ cache_binary(Head, L, C, Seq, Ins, Lu, F).
+
+%% Called after some delay.
+%% -> NewHead
+delayed_write(Head, WrTime) ->
+ Cache = Head#head.cache,
+ LastWrTime = Cache#cache.wrtime,
+ if
+ LastWrTime =:= WrTime ->
+ %% The cache was not emptied during the last delay.
+ case catch write_cache(Head) of
+ {Head2, []} ->
+ NewCache = (Head2#head.cache)#cache{wrtime = undefined},
+ Head2#head{cache = NewCache};
+ {NewHead, _Error} -> % Head.update_mode has been updated
+ NewHead
+ end;
+ true ->
+ %% The cache was emptied during the delay.
+ %% Has anything been written since then?
+ if
+ Cache#cache.csize =:= 0 ->
+ %% No, further delayed write not needed.
+ NewCache = Cache#cache{wrtime = undefined},
+ Head#head{cache = NewCache};
+ true ->
+ %% Yes, schedule a new delayed write.
+ {MS1,S1,M1} = WrTime,
+ {MS2,S2,M2} = LastWrTime,
+ WrT = M1+1000000*(S1+1000000*MS1),
+ LastWrT = M2+1000000*(S2+1000000*MS2),
+ When = round((LastWrT - WrT)/1000), Me = self(),
+ Call = ?DETS_CALL(Me, {delayed_write, LastWrTime}),
+ erlang:send_after(When, Me, Call),
+ Head
+ end
+ end.
+
+%% -> {NewHead, [LookedUpObject]} | throw({NewHead, Error})
+write_cache(Head) ->
+ {Head1, LU, PwriteList} = (Head#head.mod):write_cache(Head),
+ {NewHead, ok} = dets_utils:pwrite(Head1, PwriteList),
+ {NewHead, LU}.
+
+status(Head) ->
+ case Head#head.update_mode of
+ saved -> ok;
+ dirty -> ok;
+ new_dirty -> ok;
+ Error -> Error
+ end.
+
+%%% Scan the file from start to end by reading chunks.
+
+%% -> dets_cont()
+init_scan(Head, NoObjs) ->
+ check_safe_fixtable(Head),
+ FreeLists = dets_utils:get_freelists(Head),
+ Base = Head#head.base,
+ {From, To} = dets_utils:find_next_allocated(FreeLists, Base, Base),
+ #dets_cont{no_objs = NoObjs, bin = <<>>, alloc = {From, To, <<>>}}.
+
+check_safe_fixtable(Head) ->
+ case (Head#head.fixed =:= false) andalso
+ ((get(verbose) =:= yes) orelse dets_utils:debug_mode()) of
+ true ->
+ error_logger:format
+ ("** dets: traversal of ~p needs safe_fixtable~n",
+ [Head#head.name]);
+ false ->
+ ok
+ end.
+
+%% -> {[RTerm], dets_cont()} | {scan_error, Reason}
+%% RTerm = {Pos, Next, Size, Status, Term}
+scan(_Head, #dets_cont{alloc = <<>>}=C) ->
+ {[], C};
+scan(Head, C) -> % when is_record(C, dets_cont)
+ #dets_cont{no_objs = No, alloc = L0, bin = Bin} = C,
+ {From, To, L} = L0,
+ R = case No of
+ default ->
+ 0;
+ _ when is_integer(No) ->
+ -No-1
+ end,
+ scan(Bin, Head, From, To, L, [], R, {C, Head#head.type}).
+
+scan(Bin, H, From, To, L, Ts, R, {C0, Type} = C) ->
+ case (H#head.mod):scan_objs(H, Bin, From, To, L, Ts, R, Type) of
+ {more, NFrom, NTo, NL, NTs, NR, Sz} ->
+ scan_read(H, NFrom, NTo, Sz, NL, NTs, NR, C);
+ {stop, <<>>=B, NFrom, NTo, <<>>=NL, NTs} ->
+ Ftab = dets_utils:get_freelists(H),
+ case dets_utils:find_next_allocated(Ftab, NFrom, H#head.base) of
+ none ->
+ {NTs, C0#dets_cont{bin = eof, alloc = B}};
+ _ ->
+ {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}}
+ end;
+ {stop, B, NFrom, NTo, NL, NTs} ->
+ {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}};
+ bad_object ->
+ {scan_error, dets_utils:bad_object(scan, {From, To, Bin})}
+ end.
+
+scan_read(_H, From, To, _Min, L0, Ts,
+ R, {C, _Type}) when R >= ?CHUNK_SIZE ->
+ %% We may have read (much) more than CHUNK_SIZE, if there are holes.
+ L = {From, To, L0},
+ {Ts, C#dets_cont{bin = <<>>, alloc = L}};
+scan_read(H, From, _To, Min, _L, Ts, R, C) ->
+ Max = if
+ Min < ?CHUNK_SIZE -> ?CHUNK_SIZE;
+ true -> Min
+ end,
+ FreeLists = dets_utils:get_freelists(H),
+ case dets_utils:find_allocated(FreeLists, From, Max, H#head.base) of
+ <<>>=Bin0 ->
+ {Cont, _} = C,
+ {Ts, Cont#dets_cont{bin = eof, alloc = Bin0}};
+ <<From1:32,To1:32,L1/binary>> ->
+ case dets_utils:pread_n(H#head.fptr, From1, Max) of
+ eof ->
+ {scan_error, premature_eof};
+ NewBin ->
+ scan(NewBin, H, From1, To1, L1, Ts, R, C)
+ end
+ end.
+
+err(Error) ->
+ case get(verbose) of
+ yes ->
+ error_logger:format("** dets: failed with ~w~n", [Error]),
+ Error;
+ undefined ->
+ Error
+ end.
+
+%%%%%%%%%%%%%%%%% DEBUG functions %%%%%%%%%%%%%%%%
+
+file_info(FileName) ->
+ case catch read_file_header(FileName, read, false) of
+ {ok, Fd, FH} ->
+ file:close(Fd),
+ (FH#fileheader.mod):file_info(FH);
+ Other ->
+ Other
+ end.
+
+get_head_field(Fd, Field) ->
+ dets_utils:read_4(Fd, Field).
+
+%% Dump the contents of a DAT file to the tty
+%% internal debug function which ignores the closed properly thingie
+%% and just tries anyway
+
+view(FileName) ->
+ case catch read_file_header(FileName, read, false) of
+ {ok, Fd, FH} ->
+ Mod = FH#fileheader.mod,
+ case Mod:check_file_header(FH, Fd) of
+ {ok, H0, ExtraInfo} ->
+ Ftab = Mod:init_freelist(H0, ExtraInfo),
+ {_Bump, Base} = constants(FH, FileName),
+ H = H0#head{freelists=Ftab, base = Base},
+ v_free_list(H),
+ Mod:v_segments(H),
+ file:close(Fd);
+ X ->
+ file:close(Fd),
+ X
+ end;
+ X ->
+ X
+ end.
+
+v_free_list(Head) ->
+ io:format("FREE LIST ...... \n",[]),
+ io:format("~p~n", [dets_utils:all_free(Head)]),
+ io:format("END OF FREE LIST \n",[]).