diff options
author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/stdlib/src/dets.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/stdlib/src/dets.erl')
-rw-r--r-- | lib/stdlib/src/dets.erl | 2989 |
1 files changed, 2989 insertions, 0 deletions
diff --git a/lib/stdlib/src/dets.erl b/lib/stdlib/src/dets.erl new file mode 100644 index 0000000000..7f1c13770b --- /dev/null +++ b/lib/stdlib/src/dets.erl @@ -0,0 +1,2989 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(dets). + +%% Disk based linear hashing lookup dictionary. + +%% Public. +-export([all/0, + bchunk/2, + close/1, + delete/2, + delete_all_objects/1, + delete_object/2, + first/1, + foldl/3, + foldr/3, + from_ets/2, + info/1, + info/2, + init_table/2, + init_table/3, + insert/2, + insert_new/2, + is_compatible_bchunk_format/2, + is_dets_file/1, + lookup/2, + match/1, + match/2, + match/3, + match_delete/2, + match_object/1, + match_object/2, + match_object/3, + member/2, + next/2, + open_file/1, + open_file/2, + pid2name/1, + repair_continuation/2, + safe_fixtable/2, + select/1, + select/2, + select/3, + select_delete/2, + slot/2, + sync/1, + table/1, + table/2, + to_ets/2, + traverse/2, + update_counter/3]). + +%% Server export. +-export([start/0, stop/0]). + +%% Internal exports. +-export([istart_link/1, init/2, internal_open/3, add_user/3, + internal_close/1, remove_user/2, + system_continue/3, system_terminate/4, system_code_change/4]). + +%% Debug. +-export([file_info/1, + fsck/1, + fsck/2, + get_head_field/2, + view/1, + where/2, + verbose/0, + verbose/1 + ]). + +%% Not documented, or not ready for publication. +-export([lookup_keys/2]). + + +-compile({inline, [{einval,2},{badarg,2},{undefined,1}, + {badarg_exit,2},{lookup_reply,2}]}). + +-include_lib("kernel/include/file.hrl"). + +-include("dets.hrl"). + +-type object() :: tuple(). +-type pattern() :: atom() | tuple(). +-type tab_name() :: atom() | reference(). + +%%% This is the implementation of the mnesia file storage. Each (non +%%% ram-copy) table is maintained in a corresponding .DAT file. The +%%% dat file is organized as a segmented linear hashlist. The head of +%%% the file with the split indicator, size etc is held in ram by the +%%% server at all times. +%%% +%%% The parts specific for formats up to and including 8(c) are +%%% implemented in dets_v8.erl, parts specific for format 9 are +%%% implemented in dets_v9.erl. + +%% The method of hashing is the so called linear hashing algorithm +%% with segments. +%% +%% Linear hashing: +%% +%% - n indicates next bucket to split (initially zero); +%% - m is the size of the hash table +%% - initially next = m and n = 0 +%% +%% - to insert: +%% - hash = key mod m +%% - if hash < n then hash = key mod 2m +%% - when the number of objects exceeds the initial size +%% of the hash table, each insertion of an object +%% causes bucket n to be split: +%% - add a new bucket to the end of the table +%% - redistribute the contents of bucket n +%% using hash = key mod 2m +%% - increment n +%% - if n = m then m = 2m, n = 0 +%% - to search: +%% hash = key mod m +%% if hash < n then hash = key mod 2m +%% do linear scan of the bucket +%% + +%%% If a file error occurs on a working dets file, update_mode is set +%%% to the error tuple. When in 'error' mode, the free lists are not +%%% written, and a repair is forced next time the file is opened. + +-record(dets_cont, { + what, % object | bindings | select | bchunk + no_objs, % requested number of objects: default | integer() > 0 + bin, % small chunk not consumed, or 'eof' at end-of-file + alloc, % the part of the file not yet scanned, mostly a binary + tab, + match_program % true | compiled_match_spec() | undefined + }). + +-record(open_args, { + file, + type, + keypos, + repair, + min_no_slots, + max_no_slots, + ram_file, + delayed_write, + auto_save, + access, + version, + debug + }). + +-define(PATTERN_TO_OBJECT_MATCH_SPEC(Pat), [{Pat,[],['$_']}]). +-define(PATTERN_TO_BINDINGS_MATCH_SPEC(Pat), [{Pat,[],['$$']}]). +-define(PATTERN_TO_TRUE_MATCH_SPEC(Pat), [{Pat,[],[true]}]). + +%%-define(DEBUGM(X, Y), io:format(X, Y)). +-define(DEBUGM(X, Y), true). + +%%-define(DEBUGF(X,Y), io:format(X, Y)). +-define(DEBUGF(X,Y), void). + +%%-define(PROFILE(C), C). +-define(PROFILE(C), void). + +%%% Some further debug code was added in R12B-1 (stdlib-1.15.1): +%%% - there is a new open_file() option 'debug'; +%%% - there is a new OS environment variable 'DETS_DEBUG'; +%%% - verbose(true) implies that info messages are written onto +%%% the error log whenever an unsafe traversal is started. +%%% The 'debug' mode (set by the open_file() option 'debug' or +%%% by os:putenv("DETS_DEBUG", "true")) implies that the results of +%%% calling pwrite() and pread() are tested to some extent. It also +%%% means a considerable overhead when it comes to RAM usage. The +%%% operation of Dets is also slowed down a bit. Note that in debug +%%% mode terms will be output on the error logger. + +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- + +add_user(Pid, Tab, Args) -> + req(Pid, {add_user, Tab, Args}). + +-spec all() -> [tab_name()]. + +all() -> + dets_server:all(). + +-type cont() :: #dets_cont{}. +-spec bchunk(tab_name(), 'start' | cont()) -> + {cont(), binary() | tuple()} | '$end_of_table' | {'error', term()}. + +bchunk(Tab, start) -> + badarg(treq(Tab, {bchunk_init, Tab}), [Tab, start]); +bchunk(Tab, #dets_cont{bin = eof, tab = Tab}) -> + '$end_of_table'; +bchunk(Tab, #dets_cont{what = bchunk, tab = Tab} = State) -> + badarg(treq(Tab, {bchunk, State}), [Tab, State]); +bchunk(Tab, Term) -> + erlang:error(badarg, [Tab, Term]). + +-spec close(tab_name()) -> 'ok' | {'error', term()}. + +close(Tab) -> + case dets_server:close(Tab) of + badarg -> % Should not happen. + {error, not_owner}; % Backwards compatibility... + Reply -> + Reply + end. + +-spec delete(tab_name(), term()) -> 'ok' | {'error', term()}. + +delete(Tab, Key) -> + badarg(treq(Tab, {delete_key, [Key]}), [Tab, Key]). + +-spec delete_all_objects(tab_name()) -> 'ok' | {'error', term()}. + +delete_all_objects(Tab) -> + case treq(Tab, delete_all_objects) of + badarg -> + erlang:error(badarg, [Tab]); + fixed -> + match_delete(Tab, '_'); + Reply -> + Reply + end. + +-spec delete_object(tab_name(), object()) -> 'ok' | {'error', term()}. + +delete_object(Tab, O) -> + badarg(treq(Tab, {delete_object, [O]}), [Tab, O]). + +%% Given a filename, fsck it. Debug. +fsck(Fname) -> + fsck(Fname, default). + +fsck(Fname, Version) -> + catch begin + {ok, Fd, FH} = read_file_header(Fname, read, false), + ?DEBUGF("FileHeader: ~p~n", [FH]), + case (FH#fileheader.mod):check_file_header(FH, Fd) of + {error, not_closed} -> + fsck(Fd, make_ref(), Fname, FH, default, default, Version); + {ok, _Head, _Extra} -> + fsck(Fd, make_ref(), Fname, FH, default, default, Version); + Error -> + Error + end + end. + +-spec first(tab_name()) -> term() | '$end_of_table'. + +first(Tab) -> + badarg_exit(treq(Tab, first), [Tab]). + +-spec foldr(fun((object(), Acc) -> Acc), Acc, tab_name()) -> Acc | {'error', term()}. + +foldr(Fun, Acc, Tab) -> + foldl(Fun, Acc, Tab). + +-spec foldl(fun((object(), Acc) -> Acc), Acc, tab_name()) -> Acc | {'error', term()}. + +foldl(Fun, Acc, Tab) -> + Ref = make_ref(), + do_traverse(Fun, Acc, Tab, Ref). + +-spec from_ets(tab_name(), ets:tab()) -> 'ok' | {'error', term()}. + +from_ets(DTab, ETab) -> + ets:safe_fixtable(ETab, true), + Spec = ?PATTERN_TO_OBJECT_MATCH_SPEC('_'), + LC = ets:select(ETab, Spec, 100), + InitFun = from_ets_fun(LC, ETab), + Reply = treq(DTab, {initialize, InitFun, term, default}), + ets:safe_fixtable(ETab, false), + case Reply of + {thrown, Thrown} -> throw(Thrown); + Else -> badarg(Else, [DTab, ETab]) + end. + +from_ets_fun(LC, ETab) -> + fun(close) -> + ok; + (read) when LC =:= '$end_of_table' -> + end_of_input; + (read) -> + {L, C} = LC, + {L, from_ets_fun(ets:select(C), ETab)} + end. + +info(Tab) -> + case catch dets_server:get_pid(Tab) of + {'EXIT', _Reason} -> + undefined; + Pid -> + undefined(req(Pid, info)) + end. + +info(Tab, owner) -> + case catch dets_server:get_pid(Tab) of + Pid when is_pid(Pid) -> + Pid; + _ -> + undefined + end; +info(Tab, users) -> % undocumented + case dets_server:users(Tab) of + [] -> + undefined; + Users -> + Users + end; +info(Tab, Tag) -> + case catch dets_server:get_pid(Tab) of + {'EXIT', _Reason} -> + undefined; + Pid -> + undefined(req(Pid, {info, Tag})) + end. + +init_table(Tab, InitFun) -> + init_table(Tab, InitFun, []). + +init_table(Tab, InitFun, Options) when is_function(InitFun) -> + case options(Options, [format, min_no_slots]) of + {badarg,_} -> + erlang:error(badarg, [Tab, InitFun, Options]); + [Format, MinNoSlots] -> + case treq(Tab, {initialize, InitFun, Format, MinNoSlots}) of + {thrown, Thrown} -> throw(Thrown); + Else -> badarg(Else, [Tab, InitFun, Options]) + end + end; +init_table(Tab, InitFun, Options) -> + erlang:error(badarg, [Tab, InitFun, Options]). + +insert(Tab, Objs) when is_list(Objs) -> + badarg(treq(Tab, {insert, Objs}), [Tab, Objs]); +insert(Tab, Obj) -> + badarg(treq(Tab, {insert, [Obj]}), [Tab, Obj]). + +insert_new(Tab, Objs) when is_list(Objs) -> + badarg(treq(Tab, {insert_new, Objs}), [Tab, Objs]); +insert_new(Tab, Obj) -> + badarg(treq(Tab, {insert_new, [Obj]}), [Tab, Obj]). + +internal_close(Pid) -> + req(Pid, close). + +internal_open(Pid, Ref, Args) -> + req(Pid, {internal_open, Ref, Args}). + +is_compatible_bchunk_format(Tab, Term) -> + badarg(treq(Tab, {is_compatible_bchunk_format, Term}), [Tab, Term]). + +is_dets_file(FileName) -> + case catch read_file_header(FileName, read, false) of + {ok, Fd, FH} -> + file:close(Fd), + FH#fileheader.cookie =:= ?MAGIC; + {error, {tooshort, _}} -> + false; + {error, {not_a_dets_file, _}} -> + false; + Other -> + Other + end. + +lookup(Tab, Key) -> + badarg(treq(Tab, {lookup_keys, [Key]}), [Tab, Key]). + +%% Not public. +lookup_keys(Tab, Keys) -> + case catch lists:usort(Keys) of + UKeys when is_list(UKeys), UKeys =/= [] -> + badarg(treq(Tab, {lookup_keys, UKeys}), [Tab, Keys]); + _Else -> + erlang:error(badarg, [Tab, Keys]) + end. + +match(Tab, Pat) -> + badarg(safe_match(Tab, Pat, bindings), [Tab, Pat]). + +match(Tab, Pat, N) -> + badarg(init_chunk_match(Tab, Pat, bindings, N), [Tab, Pat, N]). + +match(State) when State#dets_cont.what =:= bindings -> + badarg(chunk_match(State), [State]); +match(Term) -> + erlang:error(badarg, [Term]). + +-spec match_delete(tab_name(), pattern()) -> + non_neg_integer() | 'ok' | {'error', term()}. + +match_delete(Tab, Pat) -> + badarg(match_delete(Tab, Pat, delete), [Tab, Pat]). + +match_delete(Tab, Pat, What) -> + safe_fixtable(Tab, true), + case compile_match_spec(What, Pat) of + {Spec, MP} -> + Proc = dets_server:get_pid(Tab), + R = req(Proc, {match_delete_init, MP, Spec}), + do_match_delete(Tab, Proc, R, What, 0); + badarg -> + badarg + end. + +do_match_delete(Tab, _Proc, {done, N1}, select, N) -> + safe_fixtable(Tab, false), + N + N1; +do_match_delete(Tab, _Proc, {done, _N1}, _What, _N) -> + safe_fixtable(Tab, false), + ok; +do_match_delete(Tab, Proc, {cont, State, N1}, What, N) -> + do_match_delete(Tab, Proc, req(Proc, {match_delete, State}), What, N+N1); +do_match_delete(Tab, _Proc, Error, _What, _N) -> + safe_fixtable(Tab, false), + Error. + +match_object(Tab, Pat) -> + badarg(safe_match(Tab, Pat, object), [Tab, Pat]). + +match_object(Tab, Pat, N) -> + badarg(init_chunk_match(Tab, Pat, object, N), [Tab, Pat, N]). + +match_object(State) when State#dets_cont.what =:= object -> + badarg(chunk_match(State), [State]); +match_object(Term) -> + erlang:error(badarg, [Term]). + +member(Tab, Key) -> + badarg(treq(Tab, {member, Key}), [Tab, Key]). + +next(Tab, Key) -> + badarg_exit(treq(Tab, {next, Key}), [Tab, Key]). + +%% Assuming that a file already exists, open it with the +%% parameters as already specified in the file itself. +%% Return a ref leading to the file. +open_file(File) -> + case dets_server:open_file(to_list(File)) of + badarg -> % Should not happen. + erlang:error(dets_process_died, [File]); + Reply -> + einval(Reply, [File]) + end. + +open_file(Tab, Args) when is_list(Args) -> + case catch defaults(Tab, Args) of + OpenArgs when is_record(OpenArgs, open_args) -> + case dets_server:open_file(Tab, OpenArgs) of + badarg -> % Should not happen. + erlang:error(dets_process_died, [Tab, Args]); + Reply -> + einval(Reply, [Tab, Args]) + end; + _ -> + erlang:error(badarg, [Tab, Args]) + end; +open_file(Tab, Arg) -> + open_file(Tab, [Arg]). + +pid2name(Pid) -> + dets_server:pid2name(Pid). + +remove_user(Pid, From) -> + req(Pid, {close, From}). + +repair_continuation(#dets_cont{match_program = B}=Cont, MS) + when is_binary(B) -> + case ets:is_compiled_ms(B) of + true -> + Cont; + false -> + Cont#dets_cont{match_program = ets:match_spec_compile(MS)} + end; +repair_continuation(#dets_cont{}=Cont, _MS) -> + Cont; +repair_continuation(T, MS) -> + erlang:error(badarg, [T, MS]). + +safe_fixtable(Tab, Bool) when Bool; not Bool -> + badarg(treq(Tab, {safe_fixtable, Bool}), [Tab, Bool]); +safe_fixtable(Tab, Term) -> + erlang:error(badarg, [Tab, Term]). + +select(Tab, Pat) -> + badarg(safe_match(Tab, Pat, select), [Tab, Pat]). + +select(Tab, Pat, N) -> + badarg(init_chunk_match(Tab, Pat, select, N), [Tab, Pat, N]). + +select(State) when State#dets_cont.what =:= select -> + badarg(chunk_match(State), [State]); +select(Term) -> + erlang:error(badarg, [Term]). + +select_delete(Tab, Pat) -> + badarg(match_delete(Tab, Pat, select), [Tab, Pat]). + +slot(Tab, Slot) when is_integer(Slot), Slot >= 0 -> + badarg(treq(Tab, {slot, Slot}), [Tab, Slot]); +slot(Tab, Term) -> + erlang:error(badarg, [Tab, Term]). + +start() -> + dets_server:start(). + +stop() -> + dets_server:stop(). + +istart_link(Server) -> + {ok, proc_lib:spawn_link(dets, init, [self(), Server])}. + +sync(Tab) -> + badarg(treq(Tab, sync), [Tab]). + +table(Tab) -> + table(Tab, []). + +table(Tab, Opts) -> + case options(Opts, [traverse, n_objects]) of + {badarg,_} -> + erlang:error(badarg, [Tab, Opts]); + [Traverse, NObjs] -> + TF = case Traverse of + first_next -> + fun() -> qlc_next(Tab, first(Tab)) end; + select -> + fun(MS) -> qlc_select(select(Tab, MS, NObjs)) end; + {select, MS} -> + fun() -> qlc_select(select(Tab, MS, NObjs)) end + end, + PreFun = fun(_) -> safe_fixtable(Tab, true) end, + PostFun = fun() -> safe_fixtable(Tab, false) end, + InfoFun = fun(Tag) -> table_info(Tab, Tag) end, + %% lookup_keys is not public, but convenient + LookupFun = + case Traverse of + {select, _MS} -> + undefined; + _ -> + fun(_KeyPos, [K]) -> lookup(Tab, K); + (_KeyPos, Ks) -> lookup_keys(Tab, Ks) + end + end, + FormatFun = + fun({all, _NElements, _ElementFun}) -> + As = [Tab | [Opts || _ <- [[]], Opts =/= []]], + {?MODULE, table, As}; + ({match_spec, MS}) -> + {?MODULE, table, [Tab, [{traverse, {select, MS}} | + listify(Opts)]]}; + ({lookup, _KeyPos, [Value], _NElements, ElementFun}) -> + io_lib:format("~w:lookup(~w, ~w)", + [?MODULE, Tab, ElementFun(Value)]); + ({lookup, _KeyPos, Values, _NElements, ElementFun}) -> + Vals = [ElementFun(V) || V <- Values], + io_lib:format("lists:flatmap(fun(V) -> " + "~w:lookup(~w, V) end, ~w)", + [?MODULE, Tab, Vals]) + end, + qlc:table(TF, [{pre_fun, PreFun}, {post_fun, PostFun}, + {info_fun, InfoFun}, {format_fun, FormatFun}, + {key_equality, '=:='}, + {lookup_fun, LookupFun}]) + end. + +qlc_next(_Tab, '$end_of_table') -> + []; +qlc_next(Tab, Key) -> + case lookup(Tab, Key) of + Objects when is_list(Objects) -> + Objects ++ fun() -> qlc_next(Tab, next(Tab, Key)) end; + Error -> + %% Do what first and next do. + exit(Error) + end. + +qlc_select('$end_of_table') -> + []; +qlc_select({Objects, Cont}) when is_list(Objects) -> + Objects ++ fun() -> qlc_select(select(Cont)) end; +qlc_select(Error) -> + Error. + +table_info(Tab, num_of_objects) -> + info(Tab, size); +table_info(Tab, keypos) -> + info(Tab, keypos); +table_info(Tab, is_unique_objects) -> + info(Tab, type) =/= duplicate_bag; +table_info(_Tab, _) -> + undefined. + +%% End of table/2. + +to_ets(DTab, ETab) -> + case ets:info(ETab, protection) of + undefined -> + erlang:error(badarg, [DTab, ETab]); + _ -> + Fun = fun(X, T) -> true = ets:insert(T, X), T end, + foldl(Fun, ETab, DTab) + end. + +traverse(Tab, Fun) -> + Ref = make_ref(), + TFun = + fun(O, Acc) -> + case Fun(O) of + continue -> + Acc; + {continue, Val} -> + [Val | Acc]; + {done, Value} -> + throw({Ref, [Value | Acc]}); + Other -> + throw({Ref, Other}) + end + end, + do_traverse(TFun, [], Tab, Ref). + +update_counter(Tab, Key, C) -> + badarg(treq(Tab, {update_counter, Key, C}), [Tab, Key, C]). + +verbose() -> + verbose(true). + +verbose(What) -> + ok = dets_server:verbose(What), + All = dets_server:all(), + Fun = fun(Tab) -> treq(Tab, {set_verbose, What}) end, + lists:foreach(Fun, All), + All. + +%% Where in the (open) table is Object located? +%% The address of the first matching object is returned. +%% Format 9 returns the address of the object collection. +%% -> {ok, Address} | false +where(Tab, Object) -> + badarg(treq(Tab, {where, Object}), [Tab, Object]). + +do_traverse(Fun, Acc, Tab, Ref) -> + safe_fixtable(Tab, true), + Proc = dets_server:get_pid(Tab), + try + do_trav(Proc, Acc, Fun) + catch {Ref, Result} -> + Result + after + safe_fixtable(Tab, false) + end. + +do_trav(Proc, Acc, Fun) -> + {Spec, MP} = compile_match_spec(object, '_'), + %% MP not used + case req(Proc, {match, MP, Spec, default}) of + {cont, State} -> + do_trav(State, Proc, Acc, Fun); + Error -> + Error + end. + +do_trav(#dets_cont{bin = eof}, _Proc, Acc, _Fun) -> + Acc; +do_trav(State, Proc, Acc, Fun) -> + case req(Proc, {match_init, State}) of + {cont, {Bins, NewState}} -> + do_trav_bins(NewState, Proc, Acc, Fun, lists:reverse(Bins)); + Error -> + Error + end. + +do_trav_bins(State, Proc, Acc, Fun, []) -> + do_trav(State, Proc, Acc, Fun); +do_trav_bins(State, Proc, Acc, Fun, [Bin | Bins]) -> + %% Unpack one binary at a time, using the client's heap. + case catch binary_to_term(Bin) of + {'EXIT', _} -> + req(Proc, {corrupt, dets_utils:bad_object(do_trav_bins, Bin)}); + Term -> + NewAcc = Fun(Term, Acc), + do_trav_bins(State, Proc, NewAcc, Fun, Bins) + end. + +safe_match(Tab, Pat, What) -> + safe_fixtable(Tab, true), + R = do_safe_match(init_chunk_match(Tab, Pat, What, default), []), + safe_fixtable(Tab, false), + R. + +do_safe_match({error, Error}, _L) -> + {error, Error}; +do_safe_match({L, C}, LL) -> + do_safe_match(chunk_match(C), L++LL); +do_safe_match('$end_of_table', L) -> + L; +do_safe_match(badarg, _L) -> + badarg. + +%% What = object | bindings | select +init_chunk_match(Tab, Pat, What, N) when is_integer(N), N >= 0; + N =:= default -> + case compile_match_spec(What, Pat) of + {Spec, MP} -> + case req(dets_server:get_pid(Tab), {match, MP, Spec, N}) of + {done, L} -> + {L, #dets_cont{tab = Tab, what = What, bin = eof}}; + {cont, State} -> + chunk_match(State#dets_cont{what = What, tab = Tab}); + Error -> + Error + end; + badarg -> + badarg + end; +init_chunk_match(_Tab, _Pat, _What, _) -> + badarg. + +chunk_match(State) -> + case catch dets_server:get_pid(State#dets_cont.tab) of + {'EXIT', _Reason} -> + badarg; + _Proc when State#dets_cont.bin =:= eof -> + '$end_of_table'; + Proc -> + case req(Proc, {match_init, State}) of + {cont, {Bins, NewState}} -> + MP = NewState#dets_cont.match_program, + case catch do_foldl_bins(Bins, MP) of + {'EXIT', _} -> + case ets:is_compiled_ms(MP) of + true -> + Bad = dets_utils:bad_object(chunk_match, + Bins), + req(Proc, {corrupt, Bad}); + false -> + badarg + end; + [] -> + chunk_match(NewState); + Terms -> + {Terms, NewState} + end; + Error -> + Error + end + end. + +do_foldl_bins(Bins, true) -> + foldl_bins(Bins, []); +do_foldl_bins(Bins, MP) -> + foldl_bins(Bins, MP, []). + +foldl_bins([], Terms) -> + %% Preserve time order (version 9). + Terms; +foldl_bins([Bin | Bins], Terms) -> + foldl_bins(Bins, [binary_to_term(Bin) | Terms]). + +foldl_bins([], _MP, Terms) -> + %% Preserve time order (version 9). + Terms; +foldl_bins([Bin | Bins], MP, Terms) -> + Term = binary_to_term(Bin), + case ets:match_spec_run([Term], MP) of + [] -> + foldl_bins(Bins, MP, Terms); + [Result] -> + foldl_bins(Bins, MP, [Result | Terms]) + end. + +%% -> {Spec, binary()} | badarg +compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC('_') = Spec) -> + {Spec, true}; +compile_match_spec(select, Spec) -> + case catch ets:match_spec_compile(Spec) of + X when is_binary(X) -> + {Spec, X}; + _ -> + badarg + end; +compile_match_spec(object, Pat) -> + compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC(Pat)); +compile_match_spec(bindings, Pat) -> + compile_match_spec(select, ?PATTERN_TO_BINDINGS_MATCH_SPEC(Pat)); +compile_match_spec(delete, Pat) -> + compile_match_spec(select, ?PATTERN_TO_TRUE_MATCH_SPEC(Pat)). + +%% Process the args list as provided to open_file/2. +defaults(Tab, Args) -> + Defaults0 = #open_args{file = to_list(Tab), + type = set, + keypos = 1, + repair = true, + min_no_slots = default, + max_no_slots = default, + ram_file = false, + delayed_write = ?DEFAULT_CACHE, + auto_save = timer:minutes(?DEFAULT_AUTOSAVE), + access = read_write, + version = default, + debug = false}, + Fun = fun repl/2, + Defaults = lists:foldl(Fun, Defaults0, Args), + case Defaults#open_args.version of + 8 -> + Defaults#open_args{max_no_slots = default}; + _ -> + is_comp_min_max(Defaults) + end. + +to_list(T) when is_atom(T) -> atom_to_list(T); +to_list(T) -> T. + +repl({access, A}, Defs) -> + mem(A, [read, read_write]), + Defs#open_args{access = A}; +repl({auto_save, Int}, Defs) when is_integer(Int), Int >= 0 -> + Defs#open_args{auto_save = Int}; +repl({auto_save, infinity}, Defs) -> + Defs#open_args{auto_save =infinity}; +repl({cache_size, Int}, Defs) when is_integer(Int), Int >= 0 -> + %% Recognized, but ignored. + Defs; +repl({cache_size, infinity}, Defs) -> + Defs; +repl({delayed_write, default}, Defs) -> + Defs#open_args{delayed_write = ?DEFAULT_CACHE}; +repl({delayed_write, {Delay,Size} = C}, Defs) + when is_integer(Delay), Delay >= 0, is_integer(Size), Size >= 0 -> + Defs#open_args{delayed_write = C}; +repl({estimated_no_objects, I}, Defs) -> + repl({min_no_slots, I}, Defs); +repl({file, File}, Defs) -> + Defs#open_args{file = to_list(File)}; +repl({keypos, P}, Defs) when is_integer(P), P > 0 -> + Defs#open_args{keypos =P}; +repl({max_no_slots, I}, Defs) -> + %% Version 9 only. + MaxSlots = is_max_no_slots(I), + Defs#open_args{max_no_slots = MaxSlots}; +repl({min_no_slots, I}, Defs) -> + MinSlots = is_min_no_slots(I), + Defs#open_args{min_no_slots = MinSlots}; +repl({ram_file, Bool}, Defs) -> + mem(Bool, [true, false]), + Defs#open_args{ram_file = Bool}; +repl({repair, T}, Defs) -> + mem(T, [true, false, force]), + Defs#open_args{repair = T}; +repl({type, T}, Defs) -> + mem(T, [set, bag, duplicate_bag]), + Defs#open_args{type =T}; +repl({version, Version}, Defs) -> + V = is_version(Version), + Defs#open_args{version = V}; +repl({debug, Bool}, Defs) -> + %% Not documented. + mem(Bool, [true, false]), + Defs#open_args{debug = Bool}; +repl({_, _}, _) -> + exit(badarg). + +is_min_no_slots(default) -> default; +is_min_no_slots(I) when is_integer(I), I >= ?DEFAULT_MIN_NO_SLOTS -> I; +is_min_no_slots(I) when is_integer(I), I >= 0 -> ?DEFAULT_MIN_NO_SLOTS. + +is_max_no_slots(default) -> default; +is_max_no_slots(I) when is_integer(I), I > 0, I < 1 bsl 31 -> I. + +is_comp_min_max(Defs) -> + #open_args{max_no_slots = Max, min_no_slots = Min, version = V} = Defs, + case V of + _ when Min =:= default -> Defs; + _ when Max =:= default -> Defs; + _ -> true = Min =< Max, Defs + end. + +is_version(default) -> default; +is_version(8) -> 8; +is_version(9) -> 9. + +mem(X, L) -> + case lists:member(X, L) of + true -> true; + false -> exit(badarg) + end. + +options(Options, Keys) when is_list(Options) -> + options(Options, Keys, []); +options(Option, Keys) -> + options([Option], Keys, []). + +options(Options, [Key | Keys], L) when is_list(Options) -> + V = case lists:keysearch(Key, 1, Options) of + {value, {format, Format}} when Format =:= term; + Format =:= bchunk -> + {ok, Format}; + {value, {min_no_slots, I}} -> + case catch is_min_no_slots(I) of + {'EXIT', _} -> badarg; + MinNoSlots -> {ok, MinNoSlots} + end; + {value, {n_objects, default}} -> + {ok, default_option(Key)}; + {value, {n_objects, NObjs}} when is_integer(NObjs), + NObjs >= 1 -> + {ok, NObjs}; + {value, {traverse, select}} -> + {ok, select}; + {value, {traverse, {select, MS}}} -> + {ok, {select, MS}}; + {value, {traverse, first_next}} -> + {ok, first_next}; + {value, {Key, _}} -> + badarg; + false -> + Default = default_option(Key), + {ok, Default} + end, + case V of + badarg -> + {badarg, Key}; + {ok, Value} -> + NewOptions = lists:keydelete(Key, 1, Options), + options(NewOptions, Keys, [Value | L]) + end; +options([], [], L) -> + lists:reverse(L); +options(Options, _, _L) -> + {badarg,Options}. + +default_option(format) -> term; +default_option(min_no_slots) -> default; +default_option(traverse) -> select; +default_option(n_objects) -> default. + +listify(L) when is_list(L) -> + L; +listify(T) -> + [T]. + +treq(Tab, R) -> + case catch dets_server:get_pid(Tab) of + Pid when is_pid(Pid) -> + req(Pid, R); + _ -> + badarg + end. + +req(Proc, R) -> + Ref = erlang:monitor(process, Proc), + Proc ! ?DETS_CALL(self(), R), + receive + {'DOWN', Ref, process, Proc, _Info} -> + badarg; + {Proc, Reply} -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, process, Proc, _Reason} -> + Reply + after 0 -> + Reply + end + end. + +%% Inlined. +einval({error, {file_error, _, einval}}, A) -> + erlang:error(badarg, A); +einval({error, {file_error, _, badarg}}, A) -> + erlang:error(badarg, A); +einval(Reply, _A) -> + Reply. + +%% Inlined. +badarg(badarg, A) -> + erlang:error(badarg, A); +badarg(Reply, _A) -> + Reply. + +%% Inlined. +undefined(badarg) -> + undefined; +undefined(Reply) -> + Reply. + +%% Inlined. +badarg_exit(badarg, A) -> + erlang:error(badarg, A); +badarg_exit({ok, Reply}, _A) -> + Reply; +badarg_exit(Reply, _A) -> + exit(Reply). + +%%%----------------------------------------------------------------- +%%% Server functions +%%%----------------------------------------------------------------- + +init(Parent, Server) -> + process_flag(trap_exit, true), + open_file_loop(#head{parent = Parent, server = Server}). + +open_file_loop(Head) -> + open_file_loop(Head, 0). + +open_file_loop(Head, N) when element(1, Head#head.update_mode) =:= error -> + open_file_loop2(Head, N); +open_file_loop(Head, N) -> + receive + %% When the table is fixed it can be assumed that at least one + %% traversal is in progress. To speed the traversal up three + %% things have been done: + %% - prioritize match_init, bchunk, next, and match_delete_init; + %% - do not peek the message queue for updates; + %% - wait 1 ms after each update. + %% next is normally followed by lookup, but since lookup is also + %% used when not traversing the table, it is not prioritized. + ?DETS_CALL(From, {match_init, _State} = Op) -> + do_apply_op(Op, From, Head, N); + ?DETS_CALL(From, {bchunk, _State} = Op) -> + do_apply_op(Op, From, Head, N); + ?DETS_CALL(From, {next, _Key} = Op) -> + do_apply_op(Op, From, Head, N); + ?DETS_CALL(From, {match_delete_init, _MP, _Spec} = Op) -> + do_apply_op(Op, From, Head, N); + {'EXIT', Pid, Reason} when Pid =:= Head#head.parent -> + %% Parent orders shutdown. + _NewHead = do_stop(Head), + exit(Reason); + {'EXIT', Pid, Reason} when Pid =:= Head#head.server -> + %% The server is gone. + _NewHead = do_stop(Head), + exit(Reason); + {'EXIT', Pid, _Reason} -> + %% A process fixing the table exits. + H2 = remove_fix(Head, Pid, close), + open_file_loop(H2, N); + {system, From, Req} -> + sys:handle_system_msg(Req, From, Head#head.parent, + ?MODULE, [], Head) + after 0 -> + open_file_loop2(Head, N) + end. + +open_file_loop2(Head, N) -> + receive + ?DETS_CALL(From, Op) -> + do_apply_op(Op, From, Head, N); + {'EXIT', Pid, Reason} when Pid =:= Head#head.parent -> + %% Parent orders shutdown. + _NewHead = do_stop(Head), + exit(Reason); + {'EXIT', Pid, Reason} when Pid =:= Head#head.server -> + %% The server is gone. + _NewHead = do_stop(Head), + exit(Reason); + {'EXIT', Pid, _Reason} -> + %% A process fixing the table exits. + H2 = remove_fix(Head, Pid, close), + open_file_loop(H2, N); + {system, From, Req} -> + sys:handle_system_msg(Req, From, Head#head.parent, + ?MODULE, [], Head); + Message -> + error_logger:format("** dets: unexpected message" + "(ignored): ~w~n", [Message]), + open_file_loop(Head, N) + end. + +do_apply_op(Op, From, Head, N) -> + try apply_op(Op, From, Head, N) of + ok -> + open_file_loop(Head, N); + {N2, H2} when is_record(H2, head), is_integer(N2) -> + open_file_loop(H2, N2); + H2 when is_record(H2, head) -> + open_file_loop(H2, N) + catch + exit:normal -> + exit(normal); + _:Bad -> + Name = Head#head.name, + case dets_utils:debug_mode() of + true -> + %% If stream_op/5 found more requests, this is not + %% the last operation. + error_logger:format + ("** dets: Bug was found when accessing table ~w,~n" + "** dets: operation was ~p and reply was ~w.~n" + "** dets: Stacktrace: ~w~n", + [Name, Op, Bad, erlang:get_stacktrace()]); + false -> + error_logger:format + ("** dets: Bug was found when accessing table ~w~n", + [Name]) + end, + if + From =/= self() -> + From ! {self(), {error, {dets_bug, Name, Op, Bad}}}; + true -> % auto_save | may_grow | {delayed_write, _} + ok + end, + open_file_loop(Head, N) + end. + +apply_op(Op, From, Head, N) -> + case Op of + {add_user, Tab, OpenArgs}-> + #open_args{file = Fname, type = Type, keypos = Keypos, + ram_file = Ram, access = Access, + version = Version} = OpenArgs, + VersionOK = (Version =:= default) or + (Head#head.version =:= Version), + %% min_no_slots and max_no_slots are not tested + Res = if + Tab =:= Head#head.name, + Head#head.keypos =:= Keypos, + Head#head.type =:= Type, + Head#head.ram_file =:= Ram, + Head#head.access =:= Access, + VersionOK, + Fname =:= Head#head.filename -> + ok; + true -> + err({error, incompatible_arguments}) + end, + From ! {self(), Res}, + ok; + auto_save -> + case Head#head.update_mode of + saved -> + Head; + {error, _Reason} -> + Head; + _Dirty when N =:= 0 -> % dirty or new_dirty + %% The updates seems to have declined + dets_utils:vformat("** dets: Auto save of ~p\n", + [Head#head.name]), + {NewHead, _Res} = perform_save(Head, true), + erlang:garbage_collect(), + {0, NewHead}; + dirty -> + %% Reset counter and try later + start_auto_save_timer(Head), + {0, Head} + end; + close -> + From ! {self(), fclose(Head)}, + _NewHead = unlink_fixing_procs(Head), + ?PROFILE(ep:done()), + exit(normal); + {close, Pid} -> + %% Used from dets_server when Pid has closed the table, + %% but the table is still opened by some process. + NewHead = remove_fix(Head, Pid, close), + From ! {self(), status(NewHead)}, + NewHead; + {corrupt, Reason} -> + {H2, Error} = dets_utils:corrupt_reason(Head, Reason), + From ! {self(), Error}, + H2; + {delayed_write, WrTime} -> + delayed_write(Head, WrTime); + info -> + {H2, Res} = finfo(Head), + From ! {self(), Res}, + H2; + {info, Tag} -> + {H2, Res} = finfo(Head, Tag), + From ! {self(), Res}, + H2; + {is_compatible_bchunk_format, Term} -> + Res = test_bchunk_format(Head, Term), + From ! {self(), Res}, + ok; + {internal_open, Ref, Args} -> + ?PROFILE(ep:do()), + case do_open_file(Args, Head#head.parent, Head#head.server,Ref) of + {ok, H2} -> + From ! {self(), ok}, + H2; + Error -> + From ! {self(), Error}, + exit(normal) + end; + may_grow when Head#head.update_mode =/= saved -> + if + Head#head.update_mode =:= dirty -> + %% Won't grow more if the table is full. + {H2, _Res} = + (Head#head.mod):may_grow(Head, 0, many_times), + {N + 1, H2}; + true -> + ok + end; + {set_verbose, What} -> + set_verbose(What), + From ! {self(), ok}, + ok; + {where, Object} -> + {H2, Res} = where_is_object(Head, Object), + From ! {self(), Res}, + H2; + _Message when element(1, Head#head.update_mode) =:= error -> + From ! {self(), status(Head)}, + ok; + %% The following messages assume that the status of the table is OK. + {bchunk_init, Tab} -> + {H2, Res} = do_bchunk_init(Head, Tab), + From ! {self(), Res}, + H2; + {bchunk, State} -> + {H2, Res} = do_bchunk(Head, State), + From ! {self(), Res}, + H2; + delete_all_objects -> + {H2, Res} = fdelete_all_objects(Head), + From ! {self(), Res}, + erlang:garbage_collect(), + {0, H2}; + {delete_key, Keys} when Head#head.update_mode =:= dirty -> + if + Head#head.version =:= 8 -> + {H2, Res} = fdelete_key(Head, Keys), + From ! {self(), Res}, + {N + 1, H2}; + true -> + stream_op(Op, From, [], Head, N) + end; + {delete_object, Objs} when Head#head.update_mode =:= dirty -> + case check_objects(Objs, Head#head.keypos) of + true when Head#head.version =:= 8 -> + {H2, Res} = fdelete_object(Head, Objs), + From ! {self(), Res}, + {N + 1, H2}; + true -> + stream_op(Op, From, [], Head, N); + false -> + From ! {self(), badarg}, + ok + end; + first -> + {H2, Res} = ffirst(Head), + From ! {self(), Res}, + H2; + {initialize, InitFun, Format, MinNoSlots} -> + {H2, Res} = finit(Head, InitFun, Format, MinNoSlots), + From ! {self(), Res}, + erlang:garbage_collect(), + H2; + {insert, Objs} when Head#head.update_mode =:= dirty -> + case check_objects(Objs, Head#head.keypos) of + true when Head#head.version =:= 8 -> + {H2, Res} = finsert(Head, Objs), + From ! {self(), Res}, + {N + 1, H2}; + true -> + stream_op(Op, From, [], Head, N); + false -> + From ! {self(), badarg}, + ok + end; + {insert_new, Objs} when Head#head.update_mode =:= dirty -> + {H2, Res} = finsert_new(Head, Objs), + From ! {self(), Res}, + {N + 1, H2}; + {lookup_keys, Keys} when Head#head.version =:= 8 -> + {H2, Res} = flookup_keys(Head, Keys), + From ! {self(), Res}, + H2; + {lookup_keys, _Keys} -> + stream_op(Op, From, [], Head, N); + {match_init, State} -> + {H2, Res} = fmatch_init(Head, State), + From ! {self(), Res}, + H2; + {match, MP, Spec, NObjs} -> + {H2, Res} = fmatch(Head, MP, Spec, NObjs), + From ! {self(), Res}, + H2; + {member, Key} when Head#head.version =:= 8 -> + {H2, Res} = fmember(Head, Key), + From ! {self(), Res}, + H2; + {member, _Key} = Op -> + stream_op(Op, From, [], Head, N); + {next, Key} -> + {H2, Res} = fnext(Head, Key), + From ! {self(), Res}, + H2; + {match_delete, State} when Head#head.update_mode =:= dirty -> + {H2, Res} = fmatch_delete(Head, State), + From ! {self(), Res}, + {N + 1, H2}; + {match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty -> + {H2, Res} = fmatch_delete_init(Head, MP, Spec), + From ! {self(), Res}, + {N + 1, H2}; + {safe_fixtable, Bool} -> + NewHead = do_safe_fixtable(Head, From, Bool), + From ! {self(), ok}, + NewHead; + {slot, Slot} -> + {H2, Res} = fslot(Head, Slot), + From ! {self(), Res}, + H2; + sync -> + {NewHead, Res} = perform_save(Head, true), + From ! {self(), Res}, + erlang:garbage_collect(), + {0, NewHead}; + {update_counter, Key, Incr} when Head#head.update_mode =:= dirty -> + {NewHead, Res} = do_update_counter(Head, Key, Incr), + From ! {self(), Res}, + {N + 1, NewHead}; + WriteOp when Head#head.update_mode =:= new_dirty -> + H2 = Head#head{update_mode = dirty}, + apply_op(WriteOp, From, H2, 0); + WriteOp when Head#head.access =:= read_write, + Head#head.update_mode =:= saved -> + case catch (Head#head.mod):mark_dirty(Head) of + ok -> + start_auto_save_timer(Head), + H2 = Head#head{update_mode = dirty}, + apply_op(WriteOp, From, H2, 0); + {NewHead, Error} when is_record(NewHead, head) -> + From ! {self(), Error}, + NewHead + end; + WriteOp when is_tuple(WriteOp), Head#head.access =:= read -> + Reason = {access_mode, Head#head.filename}, + From ! {self(), err({error, Reason})}, + ok + end. + +start_auto_save_timer(Head) when Head#head.auto_save =:= infinity -> + ok; +start_auto_save_timer(Head) -> + Millis = Head#head.auto_save, + erlang:send_after(Millis, self(), ?DETS_CALL(self(), auto_save)). + +%% Version 9: Peek the message queue and try to evaluate several +%% lookup requests in parallel. Evalute delete_object, delete and +%% insert as well. +stream_op(Op, Pid, Pids, Head, N) -> + stream_op(Head, Pids, [], N, Pid, Op, Head#head.fixed). + +stream_loop(Head, Pids, C, N, false = Fxd) -> + receive + ?DETS_CALL(From, Message) -> + stream_op(Head, Pids, C, N, From, Message, Fxd) + after 0 -> + stream_end(Head, Pids, C, N, no_more) + end; +stream_loop(Head, Pids, C, N, _Fxd) -> + stream_end(Head, Pids, C, N, no_more). + +stream_op(Head, Pids, C, N, Pid, {lookup_keys,Keys}, Fxd) -> + NC = [{{lookup,Pid},Keys} | C], + stream_loop(Head, Pids, NC, N, Fxd); +stream_op(Head, Pids, C, N, Pid, {insert, _Objects} = Op, Fxd) -> + NC = [Op | C], + stream_loop(Head, [Pid | Pids], NC, N, Fxd); +stream_op(Head, Pids, C, N, Pid, {insert_new, _Objects} = Op, Fxd) -> + NC = [Op | C], + stream_loop(Head, [Pid | Pids], NC, N, Fxd); +stream_op(Head, Pids, C, N, Pid, {delete_key, _Keys} = Op, Fxd) -> + NC = [Op | C], + stream_loop(Head, [Pid | Pids], NC, N, Fxd); +stream_op(Head, Pids, C, N, Pid, {delete_object, _Objects} = Op, Fxd) -> + NC = [Op | C], + stream_loop(Head, [Pid | Pids], NC, N, Fxd); +stream_op(Head, Pids, C, N, Pid, {member, Key}, Fxd) -> + NC = [{{lookup,[Pid]},[Key]} | C], + stream_loop(Head, Pids, NC, N, Fxd); +stream_op(Head, Pids, C, N, Pid, Op, _Fxd) -> + stream_end(Head, Pids, C, N, {Pid,Op}). + +stream_end(Head, Pids0, C, N, Next) -> + case catch update_cache(Head, lists:reverse(C)) of + {Head1, [], PwriteList} -> + stream_end1(Pids0, Next, N, C, Head1, PwriteList); + {Head1, Found, PwriteList} -> + %% Possibly an optimization: reply to lookup requests + %% first, then write stuff. This makes it possible for + %% clients to continue while the disk is accessed. + %% (Replies to lookup requests are sent earlier than + %% replies to delete and insert requests even if the + %% latter requests were made before the lookup requests, + %% which can be confusing.) + lookup_replies(Found), + stream_end1(Pids0, Next, N, C, Head1, PwriteList); + Head1 when is_record(Head1, head) -> + stream_end2(Pids0, Pids0, Next, N, C, Head1, ok); + {Head1, Error} when is_record(Head1, head) -> + %% Dig out the processes that did lookup or member. + Fun = fun({{lookup,[Pid]},_Keys}, L) -> [Pid | L]; + ({{lookup,Pid},_Keys}, L) -> [Pid | L]; + (_, L) -> L + end, + LPs0 = lists:foldl(Fun, [], C), + LPs = lists:usort(lists:flatten(LPs0)), + stream_end2(Pids0 ++ LPs, Pids0, Next, N, C, Head1, Error); + DetsError -> + throw(DetsError) + end. + +stream_end1(Pids, Next, N, C, Head, []) -> + stream_end2(Pids, Pids, Next, N, C, Head, ok); +stream_end1(Pids, Next, N, C, Head, PwriteList) -> + {Head1, PR} = (catch dets_utils:pwrite(Head, PwriteList)), + stream_end2(Pids, Pids, Next, N, C, Head1, PR). + +stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) -> + Pid ! {self(), Reply}, + stream_end2(Pids, Ps, Next, N+1, C, Head, Reply); +stream_end2([], Ps, no_more, N, C, Head, _Reply) -> + penalty(Head, Ps, C), + {N, Head}; +stream_end2([], _Ps, {From, Op}, N, _C, Head, _Reply) -> + apply_op(Op, From, Head, N). + +penalty(H, _Ps, _C) when H#head.fixed =:= false -> + ok; +penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) -> + ok; +penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) -> + ok; +penalty(_H, _Ps, _C) -> + timer:sleep(1). + +lookup_replies([{P,O}]) -> + lookup_reply(P, O); +lookup_replies(Q) -> + [{P,O} | L] = dets_utils:family(Q), + lookup_replies(P, lists:append(O), L). + +lookup_replies(P, O, []) -> + lookup_reply(P, O); +lookup_replies(P, O, [{P2,O2} | L]) -> + lookup_reply(P, O), + lookup_replies(P2, lists:append(O2), L). + +%% If a list of Pid then op was {member, Key}. Inlined. +lookup_reply([P], O) -> + P ! {self(), O =/= []}; +lookup_reply(P, O) -> + P ! {self(), O}. + +%%----------------------------------------------------------------- +%% Callback functions for system messages handling. +%%----------------------------------------------------------------- +system_continue(_Parent, _, Head) -> + open_file_loop(Head). + +system_terminate(Reason, _Parent, _, Head) -> + _NewHead = do_stop(Head), + exit(Reason). + +%%----------------------------------------------------------------- +%% Code for upgrade. +%%----------------------------------------------------------------- +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- + +constants(FH, FileName) -> + Version = FH#fileheader.version, + if + Version =< 8 -> + dets_v8:constants(); + Version =:= 9 -> + dets_v9:constants(); + true -> + throw({error, {not_a_dets_file, FileName}}) + end. + +%% -> {ok, Fd, fileheader()} | throw(Error) +read_file_header(FileName, Access, RamFile) -> + BF = if + RamFile -> + case file:read_file(FileName) of + {ok, B} -> B; + Err -> dets_utils:file_error(FileName, Err) + end; + true -> + FileName + end, + {ok, Fd} = dets_utils:open(BF, open_args(Access, RamFile)), + {ok, <<Version:32>>} = + dets_utils:pread_close(Fd, FileName, ?FILE_FORMAT_VERSION_POS, 4), + if + Version =< 8 -> + dets_v8:read_file_header(Fd, FileName); + Version =:= 9 -> + dets_v9:read_file_header(Fd, FileName); + true -> + throw({error, {not_a_dets_file, FileName}}) + end. + +fclose(Head) -> + {Head1, Res} = perform_save(Head, false), + case Head1#head.ram_file of + true -> + ignore; + false -> + dets_utils:stop_disk_map(), + file:close(Head1#head.fptr) + end, + Res. + +%% -> {NewHead, Res} +perform_save(Head, DoSync) when Head#head.update_mode =:= dirty; + Head#head.update_mode =:= new_dirty -> + case catch begin + {Head1, []} = write_cache(Head), + {Head2, ok} = (Head1#head.mod):do_perform_save(Head1), + ok = ensure_written(Head2, DoSync), + {Head2#head{update_mode = saved}, ok} + end of + {NewHead, _} = Reply when is_record(NewHead, head) -> + Reply + end; +perform_save(Head, _DoSync) -> + {Head, status(Head)}. + +ensure_written(Head, DoSync) when Head#head.ram_file -> + {ok, EOF} = dets_utils:position(Head, eof), + {ok, Bin} = dets_utils:pread(Head, 0, EOF, 0), + if + DoSync -> + dets_utils:write_file(Head, Bin); + not DoSync -> + case file:write_file(Head#head.filename, Bin) of + ok -> + ok; + Error -> + dets_utils:corrupt_file(Head, Error) + end + end; +ensure_written(Head, true) when not Head#head.ram_file -> + dets_utils:sync(Head); +ensure_written(Head, false) when not Head#head.ram_file -> + ok. + +%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error} +do_bchunk_init(Head, Tab) -> + case catch write_cache(Head) of + {H2, []} -> + case (H2#head.mod):table_parameters(H2) of + undefined -> + {H2, {error, old_version}}; + Parms -> + L = dets_utils:all_allocated(H2), + C0 = #dets_cont{no_objs = default, bin = <<>>, alloc = L}, + BinParms = term_to_binary(Parms), + {H2, {C0#dets_cont{tab = Tab, what = bchunk}, [BinParms]}} + end; + {NewHead, _} = HeadError when is_record(NewHead, head) -> + HeadError + end. + +%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error} +do_bchunk(Head, State) -> + case dets_v9:read_bchunks(Head, State#dets_cont.alloc) of + {error, Reason} -> + dets_utils:corrupt_reason(Head, Reason); + {finished, Bins} -> + {Head, {State#dets_cont{bin = eof}, Bins}}; + {Bins, NewL} -> + {Head, {State#dets_cont{alloc = NewL}, Bins}} + end. + +%% -> {NewHead, Result} +fdelete_all_objects(Head) when Head#head.fixed =:= false -> + case catch do_delete_all_objects(Head) of + {ok, NewHead} -> + start_auto_save_timer(NewHead), + {NewHead, ok}; + {error, Reason} -> + dets_utils:corrupt_reason(Head, Reason) + end; +fdelete_all_objects(Head) -> + {Head, fixed}. + +do_delete_all_objects(Head) -> + #head{fptr = Fd, name = Tab, filename = Fname, type = Type, keypos = Kp, + ram_file = Ram, auto_save = Auto, min_no_slots = MinSlots, + max_no_slots = MaxSlots, cache = Cache} = Head, + CacheSz = dets_utils:cache_size(Cache), + ok = dets_utils:truncate(Fd, Fname, bof), + (Head#head.mod):initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, + Ram, CacheSz, Auto, true). + +%% -> {NewHead, Reply}, Reply = ok | Error. +fdelete_key(Head, Keys) -> + do_delete(Head, Keys, delete_key). + +%% -> {NewHead, Reply}, Reply = ok | badarg | Error. +fdelete_object(Head, Objects) -> + do_delete(Head, Objects, delete_object). + +ffirst(H) -> + Ref = make_ref(), + case catch {Ref, ffirst1(H)} of + {Ref, {NH, R}} -> + {NH, {ok, R}}; + {NH, R} when is_record(NH, head) -> + {NH, {error, R}} + end. + +ffirst1(H) -> + check_safe_fixtable(H), + {NH, []} = write_cache(H), + ffirst(NH, 0). + +ffirst(H, Slot) -> + case (H#head.mod):slot_objs(H, Slot) of + '$end_of_table' -> {H, '$end_of_table'}; + [] -> ffirst(H, Slot+1); + [X|_] -> {H, element(H#head.keypos, X)} + end. + +%% -> {NewHead, Reply}, Reply = ok | badarg | Error. +finsert(Head, Objects) -> + case catch update_cache(Head, Objects, insert) of + {NewHead, []} -> + {NewHead, ok}; + {NewHead, _} = HeadError when is_record(NewHead, head) -> + HeadError + end. + +%% -> {NewHead, Reply}, Reply = ok | badarg | Error. +finsert_new(Head, Objects) -> + KeyPos = Head#head.keypos, + case catch lists:map(fun(Obj) -> element(KeyPos, Obj) end, Objects) of + Keys when is_list(Keys) -> + case catch update_cache(Head, Keys, {lookup, nopid}) of + {Head1, PidObjs} when is_list(PidObjs) -> + case lists:all(fun({_P,OL}) -> OL =:= [] end, PidObjs) of + true -> + case catch update_cache(Head1, Objects, insert) of + {NewHead, []} -> + {NewHead, true}; + {NewHead, Error} when is_record(NewHead, head) -> + {NewHead, Error} + end; + false=Reply -> + {Head1, Reply} + end; + {NewHead, _} = HeadError when is_record(NewHead, head) -> + HeadError + end; + _ -> + {Head, badarg} + end. + +do_safe_fixtable(Head, Pid, true) -> + case Head#head.fixed of + false -> + link(Pid), + Fixed = {erlang:now(), [{Pid, 1}]}, + Ftab = dets_utils:get_freelists(Head), + Head#head{fixed = Fixed, freelists = {Ftab, Ftab}}; + {TimeStamp, Counters} -> + case lists:keysearch(Pid, 1, Counters) of + {value, {Pid, Counter}} -> % when Counter > 1 + NewCounters = lists:keyreplace(Pid, 1, Counters, + {Pid, Counter+1}), + Head#head{fixed = {TimeStamp, NewCounters}}; + false -> + link(Pid), + Fixed = {TimeStamp, [{Pid, 1} | Counters]}, + Head#head{fixed = Fixed} + end + end; +do_safe_fixtable(Head, Pid, false) -> + remove_fix(Head, Pid, false). + +remove_fix(Head, Pid, How) -> + case Head#head.fixed of + false -> + Head; + {TimeStamp, Counters} -> + case lists:keysearch(Pid, 1, Counters) of + %% How =:= close when Pid closes the table. + {value, {Pid, Counter}} when Counter =:= 1; How =:= close -> + unlink(Pid), + case lists:keydelete(Pid, 1, Counters) of + [] -> + check_growth(Head), + erlang:garbage_collect(), + Head#head{fixed = false, + freelists = dets_utils:get_freelists(Head)}; + NewCounters -> + Head#head{fixed = {TimeStamp, NewCounters}} + end; + {value, {Pid, Counter}} -> + NewCounters = lists:keyreplace(Pid, 1, Counters, + {Pid, Counter-1}), + Head#head{fixed = {TimeStamp, NewCounters}}; + false -> + Head + end + end. + +do_stop(Head) -> + unlink_fixing_procs(Head), + fclose(Head). + +unlink_fixing_procs(Head) -> + case Head#head.fixed of + false -> + Head; + {_, Counters} -> + lists:map(fun({Pid, _Counter}) -> unlink(Pid) end, Counters), + Head#head{fixed = false, + freelists = dets_utils:get_freelists(Head)} + end. + +check_growth(#head{access = read}) -> + ok; +check_growth(Head) -> + NoThings = no_things(Head), + if + NoThings > Head#head.next -> + erlang:send_after(200, self(), + ?DETS_CALL(self(), may_grow)); % Catch up. + true -> + ok + end. + +finfo(H) -> + case catch write_cache(H) of + {H2, []} -> + Info = (catch [{type, H2#head.type}, + {keypos, H2#head.keypos}, + {size, H2#head.no_objects}, + {file_size, + file_size(H2#head.fptr, H2#head.filename)}, + {filename, H2#head.filename}]), + {H2, Info}; + {H2, _} = HeadError when is_record(H2, head) -> + HeadError + end. + +finfo(H, access) -> {H, H#head.access}; +finfo(H, auto_save) -> {H, H#head.auto_save}; +finfo(H, bchunk_format) -> + case catch write_cache(H) of + {H2, []} -> + case (H2#head.mod):table_parameters(H2) of + undefined = Undef -> + {H2, Undef}; + Parms -> + {H2, term_to_binary(Parms)} + end; + {H2, _} = HeadError when is_record(H2, head) -> + HeadError + end; +finfo(H, delayed_write) -> % undocumented + {H, dets_utils:cache_size(H#head.cache)}; +finfo(H, filename) -> {H, H#head.filename}; +finfo(H, file_size) -> + case catch write_cache(H) of + {H2, []} -> + {H2, catch file_size(H#head.fptr, H#head.filename)}; + {H2, _} = HeadError when is_record(H2, head) -> + HeadError + end; +finfo(H, fixed) -> + %% true if fixtable/2 has been called + {H, not (H#head.fixed =:= false)}; +finfo(H, hash) -> {H, H#head.hash_bif}; +finfo(H, keypos) -> {H, H#head.keypos}; +finfo(H, memory) -> finfo(H, file_size); +finfo(H, no_objects) -> finfo(H, size); +finfo(H, no_keys) -> + case catch write_cache(H) of + {H2, []} -> + {H2, H2#head.no_keys}; + {H2, _} = HeadError when is_record(H2, head) -> + HeadError + end; +finfo(H, no_slots) -> {H, (H#head.mod):no_slots(H)}; +finfo(H, pid) -> {H, self()}; +finfo(H, ram_file) -> {H, H#head.ram_file}; +finfo(H, safe_fixed) -> {H, H#head.fixed}; +finfo(H, size) -> + case catch write_cache(H) of + {H2, []} -> + {H2, H2#head.no_objects}; + {H2, _} = HeadError when is_record(H2, head) -> + HeadError + end; +finfo(H, type) -> {H, H#head.type}; +finfo(H, version) -> {H, H#head.version}; +finfo(H, _) -> {H, undefined}. + +file_size(Fd, FileName) -> + {ok, Pos} = dets_utils:position(Fd, FileName, eof), + Pos. + +test_bchunk_format(_Head, undefined) -> + false; +test_bchunk_format(Head, _Term) when Head#head.version =:= 8 -> + false; +test_bchunk_format(Head, Term) -> + dets_v9:try_bchunk_header(Term, Head) =/= not_ok. + +do_open_file([Fname, Verbose], Parent, Server, Ref) -> + case catch fopen2(Fname, Ref) of + {error, _Reason} = Error -> + err(Error); + {ok, Head} -> + maybe_put(verbose, Verbose), + {ok, Head#head{parent = Parent, server = Server}}; + {'EXIT', _Reason} = Error -> + Error; + Bad -> + error_logger:format + ("** dets: Bug was found in open_file/1, reply was ~w.~n", + [Bad]), + {error, {dets_bug, Fname, Bad}} + end; +do_open_file([Tab, OpenArgs, Verb], Parent, Server, Ref) -> + case catch fopen3(Tab, OpenArgs) of + {error, {tooshort, _}} -> + file:delete(OpenArgs#open_args.file), + do_open_file([Tab, OpenArgs, Verb], Parent, Server, Ref); + {error, _Reason} = Error -> + err(Error); + {ok, Head} -> + maybe_put(verbose, Verb), + {ok, Head#head{parent = Parent, server = Server}}; + {'EXIT', _Reason} = Error -> + Error; + Bad -> + error_logger:format + ("** dets: Bug was found in open_file/2, arguments were~n" + "** dets: ~w and reply was ~w.~n", + [OpenArgs, Bad]), + {error, {dets_bug, Tab, {open_file, OpenArgs}, Bad}} + end. + +maybe_put(_, undefined) -> + ignore; +maybe_put(K, V) -> + put(K, V). + +%% -> {Head, Result}, Result = ok | Error | {thrown, Error} | badarg +finit(Head, InitFun, _Format, _NoSlots) when Head#head.access =:= read -> + _ = (catch InitFun(close)), + {Head, {error, {access_mode, Head#head.filename}}}; +finit(Head, InitFun, _Format, _NoSlots) when Head#head.fixed =/= false -> + _ = (catch InitFun(close)), + {Head, {error, {fixed_table, Head#head.name}}}; +finit(Head, InitFun, Format, NoSlots) -> + case catch do_finit(Head, InitFun, Format, NoSlots) of + {ok, NewHead} -> + check_growth(NewHead), + start_auto_save_timer(NewHead), + {NewHead, ok}; + badarg -> + {Head, badarg}; + Error -> + dets_utils:corrupt(Head, Error) + end. + +%% -> {ok, NewHead} | throw(badarg) | throw(Error) +do_finit(Head, Init, Format, NoSlots) -> + #head{fptr = Fd, type = Type, keypos = Kp, auto_save = Auto, + cache = Cache, filename = Fname, ram_file = Ram, + min_no_slots = MinSlots0, max_no_slots = MaxSlots, + name = Tab, update_mode = UpdateMode, mod = HMod} = Head, + CacheSz = dets_utils:cache_size(Cache), + {How, Head1} = + case Format of + term when is_integer(NoSlots), NoSlots > MaxSlots -> + throw(badarg); + term -> + MinSlots = choose_no_slots(NoSlots, MinSlots0), + if + UpdateMode =:= new_dirty, MinSlots =:= MinSlots0 -> + {general_init, Head}; + true -> + ok = dets_utils:truncate(Fd, Fname, bof), + {ok, H} = HMod:initiate_file(Fd, Tab, Fname, Type, Kp, + MinSlots, MaxSlots, Ram, + CacheSz, Auto, false), + {general_init, H} + end; + bchunk -> + ok = dets_utils:truncate(Fd, Fname, bof), + {bchunk_init, Head} + end, + case How of + bchunk_init -> + case HMod:bchunk_init(Head1, Init) of + {ok, NewHead} -> + {ok, NewHead#head{update_mode = dirty}}; + Error -> + Error + end; + general_init -> + Cntrs = ets:new(dets_init, []), + Input = HMod:bulk_input(Head1, Init, Cntrs), + SlotNumbers = {Head1#head.min_no_slots, bulk_init, MaxSlots}, + {Reply, SizeData} = + do_sort(Head1, SlotNumbers, Input, Cntrs, Fname, not_used), + Bulk = true, + case Reply of + {ok, NoDups, H1} -> + fsck_copy(SizeData, H1, Bulk, NoDups); + Else -> + close_files(Bulk, SizeData, Head1), + Else + end + end. + +%% -> {NewHead, [LookedUpObject]} | {NewHead, Error} +flookup_keys(Head, Keys) -> + case catch update_cache(Head, Keys, {lookup, nopid}) of + {NewHead, [{_NoPid,Objs}]} -> + {NewHead, Objs}; + {NewHead, L} when is_list(L) -> + {NewHead, lists:flatmap(fun({_Pid,OL}) -> OL end, L)}; + {NewHead, _} = HeadError when is_record(NewHead, head) -> + HeadError + end. + +%% -> {NewHead, Result} +fmatch_init(Head, C) -> + case scan(Head, C) of + {scan_error, Reason} -> + dets_utils:corrupt_reason(Head, Reason); + {Ts, NC} -> + {Head, {cont, {Ts, NC}}} + end. + +%% -> {NewHead, Result} +fmatch(Head, MP, Spec, N) -> + KeyPos = Head#head.keypos, + case find_all_keys(Spec, KeyPos, []) of + [] -> + %% Complete match + case catch write_cache(Head) of + {NewHead, []} -> + C0 = init_scan(NewHead, N), + {NewHead, {cont, C0#dets_cont{match_program = MP}}}; + {NewHead, _} = HeadError when is_record(NewHead, head) -> + HeadError + end; + List -> + Keys = lists:usort(List), + {NewHead, Reply} = flookup_keys(Head, Keys), + case Reply of + Objs when is_list(Objs) -> + MatchingObjs = ets:match_spec_run(Objs, MP), + {NewHead, {done, MatchingObjs}}; + Error -> + {NewHead, Error} + end + end. + +find_all_keys([], _, Ks) -> + Ks; +find_all_keys([{H,_,_} | T], KeyPos, Ks) when is_tuple(H) -> + case tuple_size(H) of + Enough when Enough >= KeyPos -> + Key = element(KeyPos, H), + case contains_variable(Key) of + true -> + []; + false -> + find_all_keys(T, KeyPos, [Key | Ks]) + end; + _ -> + find_all_keys(T, KeyPos, Ks) + end; +find_all_keys(_, _, _) -> + []. + +contains_variable('_') -> + true; +contains_variable(A) when is_atom(A) -> + case atom_to_list(A) of + [$$ | T] -> + case (catch list_to_integer(T)) of + {'EXIT', _} -> + false; + _ -> + true + end; + _ -> + false + end; +contains_variable(T) when is_tuple(T) -> + contains_variable(tuple_to_list(T)); +contains_variable([]) -> + false; +contains_variable([H|T]) -> + case contains_variable(H) of + true -> + true; + false -> + contains_variable(T) + end; +contains_variable(_) -> + false. + +%% -> {NewHead, Res} +fmatch_delete_init(Head, MP, Spec) -> + KeyPos = Head#head.keypos, + case catch + case find_all_keys(Spec, KeyPos, []) of + [] -> + do_fmatch_delete_var_keys(Head, MP, Spec); + List -> + Keys = lists:usort(List), + do_fmatch_constant_keys(Head, Keys, MP) + end of + {NewHead, _} = Reply when is_record(NewHead, head) -> + Reply + end. + +%% A note: If deleted objects reside in a bucket with other objects +%% that are not deleted, the bucket is moved. If the address of the +%% moved bucket is greater than original bucket address the kept +%% objects will be read once again later on. +%% -> {NewHead, Res} +fmatch_delete(Head, C) -> + case scan(Head, C) of + {scan_error, Reason} -> + dets_utils:corrupt_reason(Head, Reason); + {[], _} -> + {Head, {done, 0}}; + {RTs, NC} -> + MP = C#dets_cont.match_program, + case catch filter_binary_terms(RTs, MP, []) of + {'EXIT', _} -> + Bad = dets_utils:bad_object(fmatch_delete, RTs), + dets_utils:corrupt_reason(Head, Bad); + Terms -> + do_fmatch_delete(Head, Terms, NC) + end + end. + +do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_')) + when Head#head.fixed =:= false -> + %% Handle the case where the file is emptied efficiently. + %% Empty the cache just to get the number of objects right. + {Head1, []} = write_cache(Head), + N = Head1#head.no_objects, + case fdelete_all_objects(Head1) of + {NewHead, ok} -> + {NewHead, {done, N}}; + Reply -> + Reply + end; +do_fmatch_delete_var_keys(Head, MP, _Spec) -> + {NewHead, []} = write_cache(Head), + C0 = init_scan(NewHead, default), + {NewHead, {cont, C0#dets_cont{match_program = MP}, 0}}. + +do_fmatch_constant_keys(Head, Keys, MP) -> + case flookup_keys(Head, Keys) of + {NewHead, ReadTerms} when is_list(ReadTerms) -> + Terms = filter_terms(ReadTerms, MP, []), + do_fmatch_delete(NewHead, Terms, fixed); + Reply -> + Reply + end. + +filter_binary_terms([Bin | Bins], MP, L) -> + Term = binary_to_term(Bin), + case ets:match_spec_run([Term], MP) of + [true] -> + filter_binary_terms(Bins, MP, [Term | L]); + _ -> + filter_binary_terms(Bins, MP, L) + end; +filter_binary_terms([], _MP, L) -> + L. + +filter_terms([Term | Terms], MP, L) -> + case ets:match_spec_run([Term], MP) of + [true] -> + filter_terms(Terms, MP, [Term | L]); + _ -> + filter_terms(Terms, MP, L) + end; +filter_terms([], _MP, L) -> + L. + +do_fmatch_delete(Head, Terms, What) -> + N = length(Terms), + case do_delete(Head, Terms, delete_object) of + {NewHead, ok} when What =:= fixed -> + {NewHead, {done, N}}; + {NewHead, ok} -> + {NewHead, {cont, What, N}}; + Reply -> + Reply + end. + +do_delete(Head, Things, What) -> + case catch update_cache(Head, Things, What) of + {NewHead, []} -> + {NewHead, ok}; + {NewHead, _} = HeadError when is_record(NewHead, head) -> + HeadError + end. + +fmember(Head, Key) -> + case catch begin + {Head2, [{_NoPid,Objs}]} = + update_cache(Head, [Key], {lookup, nopid}), + {Head2, Objs =/= []} + end of + {NewHead, _} = Reply when is_record(NewHead, head) -> + Reply + end. + +fnext(Head, Key) -> + Slot = (Head#head.mod):db_hash(Key, Head), + Ref = make_ref(), + case catch {Ref, fnext(Head, Key, Slot)} of + {Ref, {H, R}} -> + {H, {ok, R}}; + {NewHead, _} = HeadError when is_record(NewHead, head) -> + HeadError + end. + +fnext(H, Key, Slot) -> + {NH, []} = write_cache(H), + case (H#head.mod):slot_objs(NH, Slot) of + '$end_of_table' -> {NH, '$end_of_table'}; + L -> fnext_search(NH, Key, Slot, L) + end. + +fnext_search(H, K, Slot, L) -> + Kp = H#head.keypos, + case beyond_key(K, Kp, L) of + [] -> fnext_slot(H, K, Slot+1); + L2 -> {H, element(H#head.keypos, hd(L2))} + end. + +%% We've got to continue to search for the next key in the next slot +fnext_slot(H, K, Slot) -> + case (H#head.mod):slot_objs(H, Slot) of + '$end_of_table' -> {H, '$end_of_table'}; + [] -> fnext_slot(H, K, Slot+1); + L -> {H, element(H#head.keypos, hd(L))} + end. + +beyond_key(_K, _Kp, []) -> []; +beyond_key(K, Kp, [H|T]) -> + case dets_utils:cmp(element(Kp, H), K) of + 0 -> beyond_key2(K, Kp, T); + _ -> beyond_key(K, Kp, T) + end. + +beyond_key2(_K, _Kp, []) -> []; +beyond_key2(K, Kp, [H|T]=L) -> + case dets_utils:cmp(element(Kp, H), K) of + 0 -> beyond_key2(K, Kp, T); + _ -> L + end. + +%% Open an already existing file, no arguments +%% -> {ok, head()} | throw(Error) +fopen2(Fname, Tab) -> + case file:read_file_info(Fname) of + {ok, _} -> + Acc = read_write, + Ram = false, + %% Fd is not always closed upon error, but exit is soon called. + {ok, Fd, FH} = read_file_header(Fname, Acc, Ram), + Mod = FH#fileheader.mod, + case Mod:check_file_header(FH, Fd) of + {error, not_closed} -> + io:format(user,"dets: file ~p not properly closed, " + "repairing ...~n", [Fname]), + Version = default, + case fsck(Fd, Tab, Fname, FH, default, default, Version) of + ok -> + fopen2(Fname, Tab); + Error -> + throw(Error) + end; + {ok, Head, ExtraInfo} -> + open_final(Head, Fname, Acc, Ram, ?DEFAULT_CACHE, + Tab, ExtraInfo, false); + {error, Reason} -> + throw({error, {Reason, Fname}}) + end; + Error -> + dets_utils:file_error(Fname, Error) + end. + +%% Open and possibly create and initialize a file +%% -> {ok, head()} | throw(Error) +fopen3(Tab, OpenArgs) -> + FileName = OpenArgs#open_args.file, + case file:read_file_info(FileName) of + {ok, _} -> + fopen_existing_file(Tab, OpenArgs); + Error when OpenArgs#open_args.access =:= read -> + dets_utils:file_error(FileName, Error); + _Error -> + fopen_init_file(Tab, OpenArgs) + end. + +fopen_existing_file(Tab, OpenArgs) -> + #open_args{file = Fname, type = Type, keypos = Kp, repair = Rep, + min_no_slots = MinSlots, max_no_slots = MaxSlots, + ram_file = Ram, delayed_write = CacheSz, auto_save = + Auto, access = Acc, version = Version, debug = Debug} = + OpenArgs, + %% Fd is not always closed upon error, but exit is soon called. + {ok, Fd, FH} = read_file_header(Fname, Acc, Ram), + V9 = (Version =:= 9) or (Version =:= default), + MinF = (MinSlots =:= default) or (MinSlots =:= FH#fileheader.min_no_slots), + MaxF = (MaxSlots =:= default) or (MaxSlots =:= FH#fileheader.max_no_slots), + Do = case (FH#fileheader.mod):check_file_header(FH, Fd) of + {ok, Head, true} when Rep =:= force, Acc =:= read_write, + FH#fileheader.version =:= 9, + FH#fileheader.no_colls =/= undefined, + MinF, MaxF, V9 -> + {compact, Head}; + {ok, _Head, _Extra} when Rep =:= force, Acc =:= read -> + throw({error, {access_mode, Fname}}); + {ok, Head, need_compacting} when Acc =:= read -> + {final, Head, true}; % Version 8 only. + {ok, _Head, need_compacting} when Rep =:= true -> + %% The file needs to be compacted due to a very big + %% and fragmented free_list. Version 8 only. + M = " is now compacted ...", + {repair, M}; + {ok, _Head, _Extra} when Rep =:= force -> + M = ", repair forced.", + {repair, M}; + {ok, Head, ExtraInfo} -> + {final, Head, ExtraInfo}; + {error, not_closed} when Rep =:= force, Acc =:= read_write -> + M = ", repair forced.", + {repair, M}; + {error, not_closed} when Rep =:= true, Acc =:= read_write -> + M = " not properly closed, repairing ...", + {repair, M}; + {error, not_closed} when Rep =:= false -> + throw({error, {needs_repair, Fname}}); + {error, version_bump} when Rep =:= true, Acc =:= read_write -> + %% Version 8 only + M = " old version, upgrading ...", + {repair, M}; + {error, Reason} -> + throw({error, {Reason, Fname}}) + end, + case Do of + _ when FH#fileheader.type =/= Type -> + throw({error, {type_mismatch, Fname}}); + _ when FH#fileheader.keypos =/= Kp -> + throw({error, {keypos_mismatch, Fname}}); + {compact, SourceHead} -> + io:format(user, "dets: file ~p is now compacted ...~n", [Fname]), + {ok, NewSourceHead} = open_final(SourceHead, Fname, read, false, + ?DEFAULT_CACHE, Tab, true, + Debug), + case catch compact(NewSourceHead) of + ok -> + erlang:garbage_collect(), + fopen3(Tab, OpenArgs#open_args{repair = false}); + _Err -> + _ = file:close(Fd), + dets_utils:stop_disk_map(), + io:format(user, "dets: compaction of file ~p failed, " + "now repairing ...~n", [Fname]), + {ok, Fd2, _FH} = read_file_header(Fname, Acc, Ram), + do_repair(Fd2, Tab, Fname, FH, MinSlots, MaxSlots, + Version, OpenArgs) + end; + {repair, Mess} -> + io:format(user, "dets: file ~p~s~n", [Fname, Mess]), + do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, + Version, OpenArgs); + _ when FH#fileheader.version =/= Version, Version =/= default -> + throw({error, {version_mismatch, Fname}}); + {final, H, EI} -> + H1 = H#head{auto_save = Auto}, + open_final(H1, Fname, Acc, Ram, CacheSz, Tab, EI, Debug) + end. + +do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, Version, OpenArgs) -> + case fsck(Fd, Tab, Fname, FH, MinSlots, MaxSlots, Version) of + ok -> + %% No need to update 'version'. + erlang:garbage_collect(), + fopen3(Tab, OpenArgs#open_args{repair = false}); + Error -> + throw(Error) + end. + +%% -> {ok, head()} | throw(Error) +open_final(Head, Fname, Acc, Ram, CacheSz, Tab, ExtraInfo, Debug) -> + Head1 = Head#head{access = Acc, + ram_file = Ram, + filename = Fname, + name = Tab, + cache = dets_utils:new_cache(CacheSz)}, + init_disk_map(Head1#head.version, Tab, Debug), + Mod = Head#head.mod, + Mod:cache_segps(Head1#head.fptr, Fname, Head1#head.next), + Ftab = Mod:init_freelist(Head1, ExtraInfo), + check_growth(Head1), + NewHead = Head1#head{freelists = Ftab}, + {ok, NewHead}. + +%% -> {ok, head()} | throw(Error) +fopen_init_file(Tab, OpenArgs) -> + #open_args{file = Fname, type = Type, keypos = Kp, + min_no_slots = MinSlotsArg, max_no_slots = MaxSlotsArg, + ram_file = Ram, delayed_write = CacheSz, auto_save = Auto, + version = UseVersion, debug = Debug} = OpenArgs, + MinSlots = choose_no_slots(MinSlotsArg, ?DEFAULT_MIN_NO_SLOTS), + MaxSlots = choose_no_slots(MaxSlotsArg, ?DEFAULT_MAX_NO_SLOTS), + FileSpec = if + Ram -> []; + true -> Fname + end, + {ok, Fd} = dets_utils:open(FileSpec, open_args(read_write, Ram)), + Version = if + UseVersion =:= default -> + case os:getenv("DETS_USE_FILE_FORMAT") of + "8" -> 8; + _ -> 9 + end; + true -> + UseVersion + end, + Mod = version2module(Version), + %% No need to truncate an empty file. + init_disk_map(Version, Tab, Debug), + case catch Mod:initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, + Ram, CacheSz, Auto, true) of + {error, Reason} when Ram -> + file:close(Fd), + throw({error, Reason}); + {error, Reason} -> + file:close(Fd), + file:delete(Fname), + throw({error, Reason}); + {ok, Head} -> + start_auto_save_timer(Head), + %% init_table does not need to truncate and write header + {ok, Head#head{update_mode = new_dirty}} + end. + +%% Debug. +init_disk_map(9, Name, Debug) -> + case Debug orelse dets_utils:debug_mode() of + true -> + dets_utils:init_disk_map(Name); + false -> + ok + end; +init_disk_map(_Version, _Name, _Debug) -> + ok. + +open_args(Access, RamFile) -> + A1 = case Access of + read -> []; + read_write -> [write] + end, + A2 = case RamFile of + true -> [ram]; + false -> [raw] + end, + A1 ++ A2 ++ [binary, read]. + +version2module(V) when V =< 8 -> dets_v8; +version2module(9) -> dets_v9. + +module2version(dets_v8) -> 8; +module2version(dets_v9) -> 9; +module2version(not_used) -> 9. + +%% -> ok | throw(Error) +%% For version 9 tables only. +compact(SourceHead) -> + #head{name = Tab, filename = Fname, fptr = SFd, type = Type, keypos = Kp, + ram_file = Ram, auto_save = Auto} = SourceHead, + Tmp = tempfile(Fname), + TblParms = dets_v9:table_parameters(SourceHead), + {ok, Fd} = dets_utils:open(Tmp, open_args(read_write, false)), + CacheSz = ?DEFAULT_CACHE, + %% It is normally not possible to have two open tables in the same + %% process since the process dictionary is used for caching + %% segment pointers, but here is works anyway--when reading a file + %% serially the pointers to not need to be used. + Head = case catch dets_v9:prep_table_copy(Fd, Tab, Tmp, Type, Kp, Ram, + CacheSz, Auto, TblParms) of + {ok, H} -> + H; + Error -> + file:close(Fd), + file:delete(Tmp), + throw(Error) + end, + + case dets_v9:compact_init(SourceHead, Head, TblParms) of + {ok, NewHead} -> + R = case fclose(NewHead) of + ok -> + ok = file:close(SFd), + %% Save (rename) Fname first? + dets_utils:rename(Tmp, Fname); + E -> + E + end, + if + R =:= ok -> ok; + true -> + file:delete(Tmp), + throw(R) + end; + Err -> + file:close(Fd), + file:delete(Tmp), + throw(Err) + end. + +%% -> ok | Error +%% Closes Fd. +fsck(Fd, Tab, Fname, FH, MinSlotsArg, MaxSlotsArg, Version) -> + %% MinSlots and MaxSlots are the option values. + #fileheader{min_no_slots = MinSlotsFile, + max_no_slots = MaxSlotsFile} = FH, + EstNoSlots0 = file_no_things(FH), + MinSlots = choose_no_slots(MinSlotsArg, MinSlotsFile), + MaxSlots = choose_no_slots(MaxSlotsArg, MaxSlotsFile), + EstNoSlots = erlang:min(MaxSlots, erlang:max(MinSlots, EstNoSlots0)), + SlotNumbers = {MinSlots, EstNoSlots, MaxSlots}, + %% When repairing: We first try and sort on slots using MinSlots. + %% If the number of objects (keys) turns out to be significantly + %% different from NoSlots, we try again with the correct number of + %% objects (keys). + case fsck_try(Fd, Tab, FH, Fname, SlotNumbers, Version) of + {try_again, BetterNoSlots} -> + BetterSlotNumbers = {MinSlots, BetterNoSlots, MaxSlots}, + case fsck_try(Fd, Tab, FH, Fname, BetterSlotNumbers, Version) of + {try_again, _} -> + file:close(Fd), + {error, {cannot_repair, Fname}}; + Else -> + Else + end; + Else -> + Else + end. + +choose_no_slots(default, NoSlots) -> NoSlots; +choose_no_slots(NoSlots, _) -> NoSlots. + +%% -> ok | {try_again, integer()} | Error +%% Closes Fd unless {try_again, _} is returned. +%% Initiating a table using a fun and repairing (or converting) a +%% file are completely different things, but nevertheless the same +%% method is used in both cases... +fsck_try(Fd, Tab, FH, Fname, SlotNumbers, Version) -> + Tmp = tempfile(Fname), + #fileheader{type = Type, keypos = KeyPos} = FH, + {_MinSlots, EstNoSlots, MaxSlots} = SlotNumbers, + OpenArgs = #open_args{file = Tmp, type = Type, keypos = KeyPos, + repair = false, min_no_slots = EstNoSlots, + max_no_slots = MaxSlots, + ram_file = false, delayed_write = ?DEFAULT_CACHE, + auto_save = infinity, access = read_write, + version = Version, debug = false}, + case catch fopen3(Tab, OpenArgs) of + {ok, Head} -> + case fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) of + {ok, NewHead} -> + R = case fclose(NewHead) of + ok -> + %% Save (rename) Fname first? + dets_utils:rename(Tmp, Fname); + Error -> + Error + end, + if + R =:= ok -> ok; + true -> + file:delete(Tmp), + R + end; + TryAgainOrError -> + file:delete(Tmp), + TryAgainOrError + end; + Error -> + file:close(Fd), + Error + end. + +tempfile(Fname) -> + Tmp = lists:concat([Fname, ".TMP"]), + case file:delete(Tmp) of + {error, eacces} -> % 'dets_process_died' happened anyway... (W-nd-ws) + timer:sleep(5000), + file:delete(Tmp); + _ -> + ok + end, + Tmp. + +%% -> {ok, NewHead} | {try_again, integer()} | Error +fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) -> + %% Mod is the module to use for reading input when repairing. + Mod = FH#fileheader.mod, + Cntrs = ets:new(dets_repair, []), + Input = Mod:fsck_input(Head, Fd, Cntrs, FH), + {Reply, SizeData} = do_sort(Head, SlotNumbers, Input, Cntrs, Fname, Mod), + Bulk = false, + case Reply of + {ok, NoDups, H1} -> + file:close(Fd), + fsck_copy(SizeData, H1, Bulk, NoDups); + {try_again, _} = Return -> + close_files(Bulk, SizeData, Head), + Return; + Else -> + file:close(Fd), + close_files(Bulk, SizeData, Head), + Else + end. + +do_sort(Head, SlotNumbers, Input, Cntrs, Fname, Mod) -> + OldV = module2version(Mod), + %% output_objs/4 replaces {LogSize,NoObjects} in Cntrs by + %% {LogSize,Position,Data,NoObjects | NoCollections}. + %% Data = {FileName,FileDescriptor} | [object()] + %% For small tables Data may be a list of objects which is more + %% efficient since no temporary files are created. + Output = (Head#head.mod):output_objs(OldV, Head, SlotNumbers, Cntrs), + TmpDir = filename:dirname(Fname), + Reply = (catch file_sorter:sort(Input, Output, + [{format, binary},{tmpdir, TmpDir}])), + L = ets:tab2list(Cntrs), + ets:delete(Cntrs), + {Reply, lists:reverse(lists:keysort(1, L))}. + +fsck_copy([{_LogSz, Pos, Bins, _NoObjects} | SizeData], Head, _Bulk, NoDups) + when is_list(Bins) -> + true = NoDups =:= 0, + PWs = [{Pos,Bins} | lists:map(fun({_, P, B, _}) -> {P, B} end, SizeData)], + #head{fptr = Fd, filename = FileName} = Head, + dets_utils:pwrite(Fd, FileName, PWs), + {ok, Head#head{update_mode = dirty}}; +fsck_copy(SizeData, Head, Bulk, NoDups) -> + catch fsck_copy1(SizeData, Head, Bulk, NoDups). + +fsck_copy1([SzData | L], Head, Bulk, NoDups) -> + Out = Head#head.fptr, + {LogSz, Pos, {FileName, Fd}, NoObjects} = SzData, + Size = if NoObjects =:= 0 -> 0; true -> ?POW(LogSz-1) end, + ExpectedSize = Size * NoObjects, + close_tmp(Fd), + case file:position(Out, Pos) of + {ok, Pos} -> ok; + PError -> dets_utils:file_error(FileName, PError) + end, + {ok, Pos} = file:position(Out, Pos), + CR = file:copy({FileName, [raw,binary]}, Out), + file:delete(FileName), + case CR of + {ok, Copied} when Copied =:= ExpectedSize; + NoObjects =:= 0 -> % the segments + fsck_copy1(L, Head, Bulk, NoDups); + {ok, Copied} when Bulk, Head#head.version =:= 8 -> + NoZeros = ExpectedSize - Copied, + Dups = NoZeros div Size, + Addr = Pos+Copied, + NewHead = free_n_objects(Head, Addr, Size-1, NoDups), + NewNoDups = NoDups - Dups, + fsck_copy1(L, NewHead, Bulk, NewNoDups); + {ok, _Copied} -> % should never happen + close_files(Bulk, L, Head), + Reason = if Bulk -> initialization_failed; + true -> repair_failed end, + {error, {Reason, Head#head.filename}}; + FError -> + close_files(Bulk, L, Head), + dets_utils:file_error(FileName, FError) + end; +fsck_copy1([], Head, _Bulk, NoDups) when NoDups =/= 0 -> + {error, {initialization_failed, Head#head.filename}}; +fsck_copy1([], Head, _Bulk, _NoDups) -> + {ok, Head#head{update_mode = dirty}}. + +free_n_objects(Head, _Addr, _Size, 0) -> + Head; +free_n_objects(Head, Addr, Size, N) -> + {NewHead, _} = dets_utils:free(Head, Addr, Size), + NewAddr = Addr + Size + 1, + free_n_objects(NewHead, NewAddr, Size, N-1). + +close_files(false, SizeData, Head) -> + file:close(Head#head.fptr), + close_files(true, SizeData, Head); +close_files(true, SizeData, _Head) -> + Fun = fun({_Size, _Pos, {FileName, Fd}, _No}) -> + close_tmp(Fd), + file:delete(FileName); + (_) -> + ok + end, + lists:foreach(Fun, SizeData). + +close_tmp(Fd) -> + file:close(Fd). + +fslot(H, Slot) -> + case catch begin + {NH, []} = write_cache(H), + Objs = (NH#head.mod):slot_objs(NH, Slot), + {NH, Objs} + end of + {NewHead, _Objects} = Reply when is_record(NewHead, head) -> + Reply + end. + +do_update_counter(Head, _Key, _Incr) when Head#head.type =/= set -> + {Head, badarg}; +do_update_counter(Head, Key, Incr) -> + case flookup_keys(Head, [Key]) of + {H1, [O]} -> + Kp = H1#head.keypos, + case catch try_update_tuple(O, Kp, Incr) of + {'EXIT', _} -> + {H1, badarg}; + {New, Term} -> + case finsert(H1, [Term]) of + {H2, ok} -> + {H2, New}; + Reply -> + Reply + end + end; + {H1, []} -> + {H1, badarg}; + HeadError -> + HeadError + end. + +try_update_tuple(O, _Kp, {Pos, Incr}) -> + try_update_tuple2(O, Pos, Incr); +try_update_tuple(O, Kp, Incr) -> + try_update_tuple2(O, Kp+1, Incr). + +try_update_tuple2(O, Pos, Incr) -> + New = element(Pos, O) + Incr, + {New, setelement(Pos, O, New)}. + +set_verbose(true) -> + put(verbose, yes); +set_verbose(_) -> + erase(verbose). + +where_is_object(Head, Object) -> + Keypos = Head#head.keypos, + case check_objects([Object], Keypos) of + true -> + case catch write_cache(Head) of + {NewHead, []} -> + {NewHead, (Head#head.mod):find_object(NewHead, Object)}; + {NewHead, _} = HeadError when is_record(NewHead, head) -> + HeadError + end; + false -> + {Head, badarg} + end. + +check_objects([T | Ts], Kp) when tuple_size(T) >= Kp -> + check_objects(Ts, Kp); +check_objects(L, _Kp) -> + L =:= []. + +no_things(Head) when Head#head.no_keys =:= undefined -> + Head#head.no_objects; +no_things(Head) -> + Head#head.no_keys. + +file_no_things(FH) when FH#fileheader.no_keys =:= undefined -> + FH#fileheader.no_objects; +file_no_things(FH) -> + FH#fileheader.no_keys. + +%%% The write cache is list of {Key, [Item]} where Item is one of +%%% {Seq, delete_key}, {Seq, {lookup,Pid}}, {Seq, {delete_object,object()}}, +%%% or {Seq, {insert,object()}}. Seq is a number that increases +%%% monotonically for each item put in the cache. The purpose is to +%%% make sure that items are sorted correctly. Sequences of delete and +%%% insert operations are inserted in the cache without doing any file +%%% operations. When the cache is considered full, a lookup operation +%%% is requested, or after some delay, the contents of the cache are +%%% written to the file, and the cache emptied. +%%% +%%% Data is not allowed to linger more than 'delay' milliseconds in +%%% the write cache. A delayed_write message is received when some +%%% datum has become too old. If 'wrtime' is equal to 'undefined', +%%% then the cache is empty and no such delayed_write message has been +%%% scheduled. Otherwise there is a delayed_write message scheduled, +%%% and the value of 'wrtime' is the time when the cache was last +%%% written, or when it was first updated after the cache was last +%%% written. + +update_cache(Head, KeysOrObjects, What) -> + {Head1, LU, PwriteList} = update_cache(Head, [{What,KeysOrObjects}]), + {NewHead, ok} = dets_utils:pwrite(Head1, PwriteList), + {NewHead, LU}. + +%% -> {NewHead, [object()], pwrite_list()} | throw({Head, Error}) +update_cache(Head, ToAdd) -> + Cache = Head#head.cache, + #cache{cache = C, csize = Size0, inserts = Ins} = Cache, + NewSize = Size0 + erlang:external_size(ToAdd), + %% The size is used as a sequence number here; it increases monotonically. + {NewC, NewIns, Lookup, Found} = + cache_binary(Head, ToAdd, C, Size0, Ins, false, []), + NewCache = Cache#cache{cache = NewC, csize = NewSize, inserts = NewIns}, + Head1 = Head#head{cache = NewCache}, + if + Lookup; NewSize >= Cache#cache.tsize -> + %% The cache is considered full, or some lookup. + {NewHead, LU, PwriteList} = (Head#head.mod):write_cache(Head1), + {NewHead, Found ++ LU, PwriteList}; + NewC =:= [] -> + {Head1, Found, []}; + Cache#cache.wrtime =:= undefined -> + %% Empty cache. Schedule a delayed write. + Now = now(), Me = self(), + Call = ?DETS_CALL(Me, {delayed_write, Now}), + erlang:send_after(Cache#cache.delay, Me, Call), + {Head1#head{cache = NewCache#cache{wrtime = Now}}, Found, []}; + Size0 =:= 0 -> + %% Empty cache that has been written after the + %% currently scheduled delayed write. + {Head1#head{cache = NewCache#cache{wrtime = now()}}, Found, []}; + true -> + %% Cache is not empty, delayed write has been scheduled. + {Head1, Found, []} + end. + +cache_binary(Head, [{Q,Os} | L], C, Seq, Ins, Lu,F) when Q =:= delete_object -> + cache_obj_op(Head, L, C, Seq, Ins, Lu, F, Os, Head#head.keypos, Q); +cache_binary(Head, [{Q,Os} | L], C, Seq, Ins, Lu, F) when Q =:= insert -> + NewIns = Ins + length(Os), + cache_obj_op(Head, L, C, Seq, NewIns, Lu, F, Os, Head#head.keypos, Q); +cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, Lu, F) when Q =:= delete_key -> + cache_key_op(Head, L, C, Seq, Ins, Lu, F, Ks, Q); +cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, _Lu, F) when C =:= [] -> % lookup + cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q); +cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, Lu, F) -> % lookup + case dets_utils:cache_lookup(Head#head.type, Ks, C, []) of + false -> + cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q); + Found -> + {lookup,Pid} = Q, + cache_binary(Head, L, C, Seq, Ins, Lu, [{Pid,Found} | F]) + end; +cache_binary(_Head, [], C, _Seq, Ins, Lu, F) -> + {C, Ins, Lu, F}. + +cache_key_op(Head, L, C, Seq, Ins, Lu, F, [K | Ks], Q) -> + E = {K, {Seq, Q}}, + cache_key_op(Head, L, [E | C], Seq+1, Ins, Lu, F, Ks, Q); +cache_key_op(Head, L, C, Seq, Ins, Lu, F, [], _Q) -> + cache_binary(Head, L, C, Seq, Ins, Lu, F). + +cache_obj_op(Head, L, C, Seq, Ins, Lu, F, [O | Os], Kp, Q) -> + E = {element(Kp, O), {Seq, {Q, O}}}, + cache_obj_op(Head, L, [E | C], Seq+1, Ins, Lu, F, Os, Kp, Q); +cache_obj_op(Head, L, C, Seq, Ins, Lu, F, [], _Kp, _Q) -> + cache_binary(Head, L, C, Seq, Ins, Lu, F). + +%% Called after some delay. +%% -> NewHead +delayed_write(Head, WrTime) -> + Cache = Head#head.cache, + LastWrTime = Cache#cache.wrtime, + if + LastWrTime =:= WrTime -> + %% The cache was not emptied during the last delay. + case catch write_cache(Head) of + {Head2, []} -> + NewCache = (Head2#head.cache)#cache{wrtime = undefined}, + Head2#head{cache = NewCache}; + {NewHead, _Error} -> % Head.update_mode has been updated + NewHead + end; + true -> + %% The cache was emptied during the delay. + %% Has anything been written since then? + if + Cache#cache.csize =:= 0 -> + %% No, further delayed write not needed. + NewCache = Cache#cache{wrtime = undefined}, + Head#head{cache = NewCache}; + true -> + %% Yes, schedule a new delayed write. + {MS1,S1,M1} = WrTime, + {MS2,S2,M2} = LastWrTime, + WrT = M1+1000000*(S1+1000000*MS1), + LastWrT = M2+1000000*(S2+1000000*MS2), + When = round((LastWrT - WrT)/1000), Me = self(), + Call = ?DETS_CALL(Me, {delayed_write, LastWrTime}), + erlang:send_after(When, Me, Call), + Head + end + end. + +%% -> {NewHead, [LookedUpObject]} | throw({NewHead, Error}) +write_cache(Head) -> + {Head1, LU, PwriteList} = (Head#head.mod):write_cache(Head), + {NewHead, ok} = dets_utils:pwrite(Head1, PwriteList), + {NewHead, LU}. + +status(Head) -> + case Head#head.update_mode of + saved -> ok; + dirty -> ok; + new_dirty -> ok; + Error -> Error + end. + +%%% Scan the file from start to end by reading chunks. + +%% -> dets_cont() +init_scan(Head, NoObjs) -> + check_safe_fixtable(Head), + FreeLists = dets_utils:get_freelists(Head), + Base = Head#head.base, + {From, To} = dets_utils:find_next_allocated(FreeLists, Base, Base), + #dets_cont{no_objs = NoObjs, bin = <<>>, alloc = {From, To, <<>>}}. + +check_safe_fixtable(Head) -> + case (Head#head.fixed =:= false) andalso + ((get(verbose) =:= yes) orelse dets_utils:debug_mode()) of + true -> + error_logger:format + ("** dets: traversal of ~p needs safe_fixtable~n", + [Head#head.name]); + false -> + ok + end. + +%% -> {[RTerm], dets_cont()} | {scan_error, Reason} +%% RTerm = {Pos, Next, Size, Status, Term} +scan(_Head, #dets_cont{alloc = <<>>}=C) -> + {[], C}; +scan(Head, C) -> % when is_record(C, dets_cont) + #dets_cont{no_objs = No, alloc = L0, bin = Bin} = C, + {From, To, L} = L0, + R = case No of + default -> + 0; + _ when is_integer(No) -> + -No-1 + end, + scan(Bin, Head, From, To, L, [], R, {C, Head#head.type}). + +scan(Bin, H, From, To, L, Ts, R, {C0, Type} = C) -> + case (H#head.mod):scan_objs(H, Bin, From, To, L, Ts, R, Type) of + {more, NFrom, NTo, NL, NTs, NR, Sz} -> + scan_read(H, NFrom, NTo, Sz, NL, NTs, NR, C); + {stop, <<>>=B, NFrom, NTo, <<>>=NL, NTs} -> + Ftab = dets_utils:get_freelists(H), + case dets_utils:find_next_allocated(Ftab, NFrom, H#head.base) of + none -> + {NTs, C0#dets_cont{bin = eof, alloc = B}}; + _ -> + {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}} + end; + {stop, B, NFrom, NTo, NL, NTs} -> + {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}}; + bad_object -> + {scan_error, dets_utils:bad_object(scan, {From, To, Bin})} + end. + +scan_read(_H, From, To, _Min, L0, Ts, + R, {C, _Type}) when R >= ?CHUNK_SIZE -> + %% We may have read (much) more than CHUNK_SIZE, if there are holes. + L = {From, To, L0}, + {Ts, C#dets_cont{bin = <<>>, alloc = L}}; +scan_read(H, From, _To, Min, _L, Ts, R, C) -> + Max = if + Min < ?CHUNK_SIZE -> ?CHUNK_SIZE; + true -> Min + end, + FreeLists = dets_utils:get_freelists(H), + case dets_utils:find_allocated(FreeLists, From, Max, H#head.base) of + <<>>=Bin0 -> + {Cont, _} = C, + {Ts, Cont#dets_cont{bin = eof, alloc = Bin0}}; + <<From1:32,To1:32,L1/binary>> -> + case dets_utils:pread_n(H#head.fptr, From1, Max) of + eof -> + {scan_error, premature_eof}; + NewBin -> + scan(NewBin, H, From1, To1, L1, Ts, R, C) + end + end. + +err(Error) -> + case get(verbose) of + yes -> + error_logger:format("** dets: failed with ~w~n", [Error]), + Error; + undefined -> + Error + end. + +%%%%%%%%%%%%%%%%% DEBUG functions %%%%%%%%%%%%%%%% + +file_info(FileName) -> + case catch read_file_header(FileName, read, false) of + {ok, Fd, FH} -> + file:close(Fd), + (FH#fileheader.mod):file_info(FH); + Other -> + Other + end. + +get_head_field(Fd, Field) -> + dets_utils:read_4(Fd, Field). + +%% Dump the contents of a DAT file to the tty +%% internal debug function which ignores the closed properly thingie +%% and just tries anyway + +view(FileName) -> + case catch read_file_header(FileName, read, false) of + {ok, Fd, FH} -> + Mod = FH#fileheader.mod, + case Mod:check_file_header(FH, Fd) of + {ok, H0, ExtraInfo} -> + Ftab = Mod:init_freelist(H0, ExtraInfo), + {_Bump, Base} = constants(FH, FileName), + H = H0#head{freelists=Ftab, base = Base}, + v_free_list(H), + Mod:v_segments(H), + file:close(Fd); + X -> + file:close(Fd), + X + end; + X -> + X + end. + +v_free_list(Head) -> + io:format("FREE LIST ...... \n",[]), + io:format("~p~n", [dets_utils:all_free(Head)]), + io:format("END OF FREE LIST \n",[]). |