%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 1996-2010. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% -module(dets). %% Disk based linear hashing lookup dictionary. %% Public. -export([all/0, bchunk/2, close/1, delete/2, delete_all_objects/1, delete_object/2, first/1, foldl/3, foldr/3, from_ets/2, info/1, info/2, init_table/2, init_table/3, insert/2, insert_new/2, is_compatible_bchunk_format/2, is_dets_file/1, lookup/2, match/1, match/2, match/3, match_delete/2, match_object/1, match_object/2, match_object/3, member/2, next/2, open_file/1, open_file/2, pid2name/1, repair_continuation/2, safe_fixtable/2, select/1, select/2, select/3, select_delete/2, slot/2, sync/1, table/1, table/2, to_ets/2, traverse/2, update_counter/3]). %% Server export. -export([start/0, stop/0]). %% Internal exports. -export([istart_link/1, init/2, internal_open/3, add_user/3, internal_close/1, remove_user/2, system_continue/3, system_terminate/4, system_code_change/4]). %% Debug. -export([file_info/1, fsck/1, fsck/2, get_head_field/2, view/1, where/2, verbose/0, verbose/1 ]). %% Not documented, or not ready for publication. -export([lookup_keys/2]). -export_type([tab_name/0]). -compile({inline, [{einval,2},{badarg,2},{undefined,1}, {badarg_exit,2},{lookup_reply,2}]}). -include_lib("kernel/include/file.hrl"). -include("dets.hrl"). -type object() :: tuple(). -type pattern() :: atom() | tuple(). 
-type tab_name() :: atom() | reference().

%%% This is the implementation of the mnesia file storage. Each (non
%%% ram-copy) table is maintained in a corresponding .DAT file. The
%%% dat file is organized as a segmented linear hashlist. The head of
%%% the file with the split indicator, size etc is held in ram by the
%%% server at all times.
%%%
%%% The parts specific for formats up to and including 8(c) are
%%% implemented in dets_v8.erl, parts specific for format 9 are
%%% implemented in dets_v9.erl.

%%  The method of hashing is the so called linear hashing algorithm
%%  with segments.
%%
%%  Linear hashing:
%%
%%         - n indicates next bucket to split (initially zero);
%%         - m is the size of the hash table
%%         - initially next = m and n = 0
%%
%%         - to insert:
%%                - hash = key mod m
%%                - if hash < n then hash = key mod 2m
%%                - when the number of objects exceeds the initial size
%%                  of the hash table, each insertion of an object
%%                  causes bucket n to be split:
%%                      - add a new bucket to the end of the table
%%                      - redistribute the contents of bucket n
%%                        using hash = key mod 2m
%%                      - increment n
%%                      - if n = m then m = 2m, n = 0
%%         - to search:
%%                hash = key mod m
%%                if hash < n then hash = key mod 2m
%%                do linear scan of the bucket
%%

%%% If a file error occurs on a working dets file, update_mode is set
%%% to the error tuple. When in 'error' mode, the free lists are not
%%% written, and a repair is forced next time the file is opened.

%% Continuation handed back to clients by the chunked traversal
%% functions (bchunk/2, match/1,3, match_object/1,3, select/1,3).
-record(dets_cont, {
          what,         % object | bindings | select | bchunk
          no_objs,      % requested number of objects: default | integer() > 0
          bin,          % small chunk not consumed, or 'eof' at end-of-file
          alloc,        % the part of the file not yet scanned, mostly a binary
          tab,          % the table the continuation belongs to
          proc,         % the pid of the Dets process
          match_program % true | compiled_match_spec() | undefined
         }).

%% Validated open_file/2 options; built by defaults/2 and repl/2.
-record(open_args, {
          file,
          type,
          keypos,
          repair,
          min_no_slots,
          max_no_slots,
          ram_file,
          delayed_write,
          auto_save,
          access,
          version,
          debug
         }).

%% Match specification returning every whole object that matches Pat.
-define(PATTERN_TO_OBJECT_MATCH_SPEC(Pat), [{Pat,[],['$_']}]).
-define(PATTERN_TO_BINDINGS_MATCH_SPEC(Pat), [{Pat,[],['$$']}]).
-define(PATTERN_TO_TRUE_MATCH_SPEC(Pat), [{Pat,[],[true]}]).

%%-define(DEBUGM(X, Y), io:format(X, Y)).
-define(DEBUGM(X, Y), true).

%%-define(DEBUGF(X,Y), io:format(X, Y)).
-define(DEBUGF(X,Y), void).

%%-define(PROFILE(C), C).
-define(PROFILE(C), void).

%%% Some further debug code was added in R12B-1 (stdlib-1.15.1):
%%% - there is a new open_file() option 'debug';
%%% - there is a new OS environment variable 'DETS_DEBUG';
%%% - verbose(true) implies that info messages are written onto
%%%   the error log whenever an unsafe traversal is started.
%%% The 'debug' mode (set by the open_file() option 'debug' or
%%% by os:putenv("DETS_DEBUG", "true")) implies that the results of
%%% calling pwrite() and pread() are tested to some extent. It also
%%% means a considerable overhead when it comes to RAM usage. The
%%% operation of Dets is also slowed down a bit. Note that in debug
%%% mode terms will be output on the error logger.

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------

%% Sends an add_user request for Tab with the given open arguments
%% to the table process Pid and returns its reply.
add_user(Pid, Tab, Args) ->
    Request = {add_user, Tab, Args},
    req(Pid, Request).

-spec all() -> [tab_name()].

%% Delegates to dets_server:all/0.
all() ->
    dets_server:all().

-type cont() :: #dets_cont{}.
-spec bchunk(tab_name(), 'start' | cont()) ->
        {cont(), binary() | tuple()} | '$end_of_table' | {'error', term()}.

%% Starts ('start') or continues (a bchunk continuation created for
%% the same table) a bchunk traversal. Any other second argument, or
%% a 'badarg' reply from the table process, raises badarg.
bchunk(Tab, start) ->
    Reply = treq(Tab, {bchunk_init, Tab}),
    badarg(Reply, [Tab, start]);
bchunk(Tab, #dets_cont{what = bchunk, tab = Tab} = Cont) ->
    Reply = treq(Tab, {bchunk, Cont}),
    badarg(Reply, [Tab, Cont]);
bchunk(Tab, Other) ->
    erlang:error(badarg, [Tab, Other]).

-spec close(tab_name()) -> 'ok' | {'error', term()}.

%% Closes Tab through dets_server. A 'badarg' reply should not happen;
%% it is mapped to {error, not_owner} for backwards compatibility.
close(Tab) ->
    Result = dets_server:close(Tab),
    if
        Result =:= badarg ->
            {error, not_owner};
        true ->
            Result
    end.

-spec delete(tab_name(), term()) -> 'ok' | {'error', term()}.

%% Issues a delete_key request for the single key Key.
delete(Tab, Key) ->
    Reply = treq(Tab, {delete_key, [Key]}),
    badarg(Reply, [Tab, Key]).
-spec delete_all_objects(tab_name()) -> 'ok' | {'error', term()}.

%% Asks the table process to drop every object. When the process
%% answers 'fixed', the objects are removed with match_delete/2 and
%% the pattern '_' instead.
delete_all_objects(Tab) ->
    case treq(Tab, delete_all_objects) of
        fixed ->
            match_delete(Tab, '_');
        badarg ->
            erlang:error(badarg, [Tab]);
        Other ->
            Other
    end.

-spec delete_object(tab_name(), object()) -> 'ok' | {'error', term()}.

%% Issues a delete_object request for the single object O.
delete_object(Tab, O) ->
    Reply = treq(Tab, {delete_object, [O]}),
    badarg(Reply, [Tab, O]).

%% Given a filename, fsck it. Debug.
fsck(Fname) ->
    fsck(Fname, default).

%% Reads the file header and, when the header check answers
%% {error, not_closed} or {ok, _, _}, hands over to fsck/7. Any other
%% header check result is returned as is. The whole body runs under
%% 'catch', so failures come back as values.
fsck(Fname, Version) ->
    catch begin
              {ok, Fd, FH} = read_file_header(Fname, read, false),
              ?DEBUGF("FileHeader: ~p~n", [FH]),
              Mod = FH#fileheader.mod,
              Check = fun() ->
                              fsck(Fd, make_ref(), Fname, FH,
                                   default, default, Version)
                      end,
              case Mod:check_file_header(FH, Fd) of
                  {error, not_closed} ->
                      Check();
                  {ok, _Head, _Extra} ->
                      Check();
                  Other ->
                      Other
              end
          end.

-spec first(tab_name()) -> term() | '$end_of_table'.

%% Issues a 'first' request; see badarg_exit/2 for how the reply is
%% turned into a return value or an exit.
first(Tab) ->
    Reply = treq(Tab, first),
    badarg_exit(Reply, [Tab]).

-spec foldr(fun((object(), Acc) -> Acc), Acc, tab_name()) -> Acc | {'error', term()}.

%% Implemented as foldl/3.
foldr(Fun, Acc, Tab) ->
    foldl(Fun, Acc, Tab).

-spec foldl(fun((object(), Acc) -> Acc), Acc, tab_name()) -> Acc | {'error', term()}.

%% Folds Fun over the objects of Tab using do_traverse/4.
foldl(Fun, Acc, Tab) ->
    do_traverse(Fun, Acc, Tab, make_ref()).

-spec from_ets(tab_name(), ets:tab()) -> 'ok' | {'error', term()}.

%% Initializes DTab from the objects of the ETS table ETab, read in
%% chunks of 100. ETab is kept fixed while the table process runs the
%% initialization. A {thrown, T} reply is re-thrown in the caller.
from_ets(DTab, ETab) ->
    ets:safe_fixtable(ETab, true),
    FirstChunk = ets:select(ETab, ?PATTERN_TO_OBJECT_MATCH_SPEC('_'), 100),
    Request = {initialize, from_ets_fun(FirstChunk, ETab), term, default},
    Reply = treq(DTab, Request),
    ets:safe_fixtable(ETab, false),
    case Reply of
        {thrown, Thrown} ->
            throw(Thrown);
        Other ->
            badarg(Other, [DTab, ETab])
    end.

%% Builds the input fun for the 'initialize' request from the ets
%% select result LC: 'read' yields the current chunk together with a
%% fun for the next chunk, or end_of_input; 'close' yields ok.
from_ets_fun(LC, ETab) ->
    fun(close) ->
            ok;
       (read) ->
            if
                LC =:= '$end_of_table' ->
                    end_of_input;
                true ->
                    {Objs, Cont} = LC,
                    {Objs, from_ets_fun(ets:select(Cont), ETab)}
            end
    end.

%% Returns the 'info' reply of the table process, or 'undefined' when
%% the pid lookup fails or the process answers 'badarg'.
info(Tab) ->
    case catch dets_server:get_pid(Tab) of
        {'EXIT', _Why} ->
            undefined;
        TabPid ->
            undefined(req(TabPid, info))
    end.
%% info(Tab, owner) returns the pid found by dets_server:get_pid/1,
%% info(Tab, users) the non-empty user list; any other tag is passed
%% to the table process. 'undefined' is returned when nothing is found.
info(Tab, owner) ->
    case catch dets_server:get_pid(Tab) of
        Owner when is_pid(Owner) ->
            Owner;
        _ ->
            undefined
    end;
info(Tab, users) -> % undocumented
    case dets_server:users(Tab) of
        [] ->
            undefined;
        UserList ->
            UserList
    end;
info(Tab, Tag) ->
    case catch dets_server:get_pid(Tab) of
        {'EXIT', _Why} ->
            undefined;
        TabPid ->
            undefined(req(TabPid, {info, Tag}))
    end.

init_table(Tab, InitFun) ->
    init_table(Tab, InitFun, []).

%% Replaces the contents of Tab with the objects produced by InitFun.
%% Recognized options are 'format' and 'min_no_slots'. A {thrown, T}
%% reply from the table process is re-thrown in the caller.
init_table(Tab, InitFun, Options) when is_function(InitFun) ->
    CallArgs = [Tab, InitFun, Options],
    case options(Options, [format, min_no_slots]) of
        {badarg, _} ->
            erlang:error(badarg, CallArgs);
        [Format, MinNoSlots] ->
            Request = {initialize, InitFun, Format, MinNoSlots},
            case treq(Tab, Request) of
                {thrown, Thrown} ->
                    throw(Thrown);
                Other ->
                    badarg(Other, CallArgs)
            end
    end;
init_table(Tab, InitFun, Options) ->
    erlang:error(badarg, [Tab, InitFun, Options]).

%% Inserts one object or a list of objects.
insert(Tab, ObjOrObjs) ->
    Objs = if
               is_list(ObjOrObjs) -> ObjOrObjs;
               true -> [ObjOrObjs]
           end,
    badarg(treq(Tab, {insert, Objs}), [Tab, ObjOrObjs]).

%% Same calling convention as insert/2, using the insert_new request.
insert_new(Tab, ObjOrObjs) ->
    Objs = if
               is_list(ObjOrObjs) -> ObjOrObjs;
               true -> [ObjOrObjs]
           end,
    badarg(treq(Tab, {insert_new, Objs}), [Tab, ObjOrObjs]).

%% Sends a 'close' request to the table process Pid.
internal_close(Pid) ->
    req(Pid, close).

%% Sends an internal_open request to the table process Pid.
internal_open(Pid, Ref, Args) ->
    Request = {internal_open, Ref, Args},
    req(Pid, Request).

is_compatible_bchunk_format(Tab, Term) ->
    Reply = treq(Tab, {is_compatible_bchunk_format, Term}),
    badarg(Reply, [Tab, Term]).

%% True when the header of FileName carries the Dets magic cookie.
%% Too short files and files reported as not being Dets files give
%% 'false'; every other outcome of reading the header is returned as is.
is_dets_file(FileName) ->
    case catch read_file_header(FileName, read, false) of
        {ok, Fd, Header} ->
            file:close(Fd),
            Header#fileheader.cookie =:= ?MAGIC;
        {error, {tooshort, _}} ->
            false;
        {error, {not_a_dets_file, _}} ->
            false;
        Unexpected ->
            Unexpected
    end.

%% Looks up the objects with key Key.
lookup(Tab, Key) ->
    Reply = treq(Tab, {lookup_keys, [Key]}),
    badarg(Reply, [Tab, Key]).

%% Not public.
%% Looks up several keys at once. Keys must be a non-empty list;
%% duplicates are removed before the request is sent.
lookup_keys(Tab, Keys) ->
    case catch lists:usort(Keys) of
        [_ | _] = UniqueKeys ->
            badarg(treq(Tab, {lookup_keys, UniqueKeys}), [Tab, Keys]);
        _ ->
            erlang:error(badarg, [Tab, Keys])
    end.

%% Returns the bindings of all objects matching Pat.
match(Tab, Pat) ->
    Result = safe_match(Tab, Pat, bindings),
    badarg(Result, [Tab, Pat]).
%% Starts a chunked match returning bindings, N objects at a time.
match(Tab, Pat, N) ->
    badarg(init_chunk_match(Tab, Pat, bindings, N), [Tab, Pat, N]).

%% Continues a chunked match started by match/3. Only continuations
%% whose 'what' field is 'bindings' are accepted.
match(State) when State#dets_cont.what =:= bindings ->
    badarg(chunk_match(State), [State]);
match(Term) ->
    erlang:error(badarg, [Term]).

-spec match_delete(tab_name(), pattern()) ->
        non_neg_integer() | 'ok' | {'error', term()}.

%% Deletes all objects matching Pat.
match_delete(Tab, Pat) ->
    badarg(match_delete(Tab, Pat, delete), [Tab, Pat]).

%% Common implementation of match_delete/2 (What = delete, Pat is a
%% pattern) and select_delete/2 (What = select, Pat is a match
%% specification).
%%
%% The pattern is compiled BEFORE the table is fixed. Fixing first
%% would leave the table fixed by the calling process whenever the
%% pattern is invalid, since the 'badarg' branch never reaches
%% do_match_delete/5, which is where the fix is released.
match_delete(Tab, Pat, What) ->
    case compile_match_spec(What, Pat) of
        {Spec, MP} ->
            safe_fixtable(Tab, true),
            Proc = dets_server:get_pid(Tab),
            R = req(Proc, {match_delete_init, MP, Spec}),
            do_match_delete(Tab, Proc, R, What, 0);
        badarg ->
            badarg
    end.

%% Drives the deletion until the table process answers {done, N1} or
%% something unexpected. The table is unfixed on every way out.
%% For What = select the accumulated count is returned, otherwise ok.
do_match_delete(Tab, _Proc, {done, N1}, select, N) ->
    safe_fixtable(Tab, false),
    N + N1;
do_match_delete(Tab, _Proc, {done, _N1}, _What, _N) ->
    safe_fixtable(Tab, false),
    ok;
do_match_delete(Tab, Proc, {cont, State, N1}, What, N) ->
    do_match_delete(Tab, Proc, req(Proc, {match_delete, State}), What, N+N1);
do_match_delete(Tab, _Proc, Error, _What, _N) ->
    safe_fixtable(Tab, false),
    Error.

%% Returns all objects matching Pat.
match_object(Tab, Pat) ->
    badarg(safe_match(Tab, Pat, object), [Tab, Pat]).

%% Starts a chunked match returning objects, N objects at a time.
match_object(Tab, Pat, N) ->
    badarg(init_chunk_match(Tab, Pat, object, N), [Tab, Pat, N]).

%% Continues a chunked match started by match_object/3.
match_object(State) when State#dets_cont.what =:= object ->
    badarg(chunk_match(State), [State]);
match_object(Term) ->
    erlang:error(badarg, [Term]).

member(Tab, Key) ->
    badarg(treq(Tab, {member, Key}), [Tab, Key]).

next(Tab, Key) ->
    badarg_exit(treq(Tab, {next, Key}), [Tab, Key]).

%% Assuming that a file already exists, open it with the
%% parameters as already specified in the file itself.
%% Return a ref leading to the file.
open_file(File) ->
    case dets_server:open_file(to_list(File)) of
        badarg -> % Should not happen.
            erlang:error(dets_process_died, [File]);
        Reply ->
            einval(Reply, [File])
    end.
%% Opens the table Tab with the option list Args (a single option is
%% wrapped in a list). Options are validated by defaults/2; anything
%% but a complete #open_args{} record from it raises badarg.
open_file(Tab, Args) when is_list(Args) ->
    CallArgs = [Tab, Args],
    case catch defaults(Tab, Args) of
        #open_args{} = OpenArgs ->
            case dets_server:open_file(Tab, OpenArgs) of
                badarg -> % Should not happen.
                    erlang:error(dets_process_died, CallArgs);
                Reply ->
                    einval(Reply, CallArgs)
            end;
        _ ->
            erlang:error(badarg, CallArgs)
    end;
open_file(Tab, Arg) ->
    open_file(Tab, [Arg]).

%% Delegates to dets_server:pid2name/1.
pid2name(Pid) ->
    dets_server:pid2name(Pid).

%% Sends a {close, From} request to the table process Pid.
remove_user(Pid, From) ->
    Request = {close, From},
    req(Pid, Request).

%% Makes a continuation usable again: when its match program is a
%% binary that ets:is_compiled_ms/1 rejects, the match specification MS
%% is compiled anew. Other continuations are returned unchanged.
repair_continuation(#dets_cont{match_program = Prog} = Cont, MS)
  when is_binary(Prog) ->
    case ets:is_compiled_ms(Prog) of
        false ->
            Cont#dets_cont{match_program = ets:match_spec_compile(MS)};
        true ->
            Cont
    end;
repair_continuation(#dets_cont{} = Cont, _MS) ->
    Cont;
repair_continuation(Other, MS) ->
    erlang:error(badarg, [Other, MS]).

%% Fixes (true) or releases (false) the table for the calling process.
safe_fixtable(Tab, Bool) when is_boolean(Bool) ->
    Reply = treq(Tab, {safe_fixtable, Bool}),
    badarg(Reply, [Tab, Bool]);
safe_fixtable(Tab, Other) ->
    erlang:error(badarg, [Tab, Other]).

%% Returns the results of applying the match specification Pat to all
%% objects of Tab.
select(Tab, Pat) ->
    Result = safe_match(Tab, Pat, select),
    badarg(Result, [Tab, Pat]).

%% Starts a chunked select, N objects at a time.
select(Tab, Pat, N) ->
    Result = init_chunk_match(Tab, Pat, select, N),
    badarg(Result, [Tab, Pat, N]).

%% Continues a chunked select started by select/3.
select(Cont) when Cont#dets_cont.what =:= select ->
    badarg(chunk_match(Cont), [Cont]);
select(Other) ->
    erlang:error(badarg, [Other]).

%% Deletes the objects selected by the match specification Pat.
select_delete(Tab, Pat) ->
    Result = match_delete(Tab, Pat, select),
    badarg(Result, [Tab, Pat]).

%% Returns the contents of slot number Slot (a non-negative integer).
slot(Tab, Slot) when is_integer(Slot), Slot >= 0 ->
    Reply = treq(Tab, {slot, Slot}),
    badarg(Reply, [Tab, Slot]);
slot(Tab, Other) ->
    erlang:error(badarg, [Tab, Other]).

start() ->
    dets_server:start().

stop() ->
    dets_server:stop().

%% Spawns a linked table process running init/2 with the caller as
%% its parent.
istart_link(Server) ->
    Parent = self(),
    {ok, proc_lib:spawn_link(dets, init, [Parent, Server])}.

sync(Tab) ->
    Reply = treq(Tab, sync),
    badarg(Reply, [Tab]).

table(Tab) ->
    table(Tab, []).
%% Returns a QLC query handle for Tab. Recognized options are
%% 'traverse' (first_next | select | {select, MatchSpec}) and
%% 'n_objects'; invalid options raise badarg.
table(Tab, Opts) ->
    case options(Opts, [traverse, n_objects]) of
        {badarg,_} ->
            erlang:error(badarg, [Tab, Opts]);
        [Traverse, NObjs] ->
            %% The traverse fun takes a match specification only for
            %% the plain 'select' option; the other two take no
            %% argument.
            TF = case Traverse of
                     first_next ->
                         fun() -> qlc_next(Tab, first(Tab)) end;
                     select ->
                         fun(MS) -> qlc_select(select(Tab, MS, NObjs)) end;
                     {select, MS} ->
                         fun() -> qlc_select(select(Tab, MS, NObjs)) end
                 end,
            %% The table is fixed by the pre fun and released by the
            %% post fun.
            PreFun = fun(_) -> safe_fixtable(Tab, true) end,
            PostFun = fun() -> safe_fixtable(Tab, false) end,
            InfoFun = fun(Tag) -> table_info(Tab, Tag) end,
            %% lookup_keys is not public, but convenient
            LookupFun =
                case Traverse of
                    {select, _MS} ->
                        %% No key lookup when the match specification
                        %% is fixed by the option.
                        undefined;
                    _ ->
                        fun(_KeyPos, [K]) -> lookup(Tab, K);
                           (_KeyPos, Ks) -> lookup_keys(Tab, Ks)
                        end
                end,
            FormatFun =
                fun({all, _NElements, _ElementFun}) ->
                        %% Opts is included only when it is not [].
                        As = [Tab | [Opts || _ <- [[]], Opts =/= []]],
                        {?MODULE, table, As};
                   ({match_spec, MS}) ->
                        {?MODULE, table,
                         [Tab, [{traverse, {select, MS}} | listify(Opts)]]};
                   ({lookup, _KeyPos, [Value], _NElements, ElementFun}) ->
                        io_lib:format("~w:lookup(~w, ~w)",
                                      [?MODULE, Tab, ElementFun(Value)]);
                   ({lookup, _KeyPos, Values, _NElements, ElementFun}) ->
                        Vals = [ElementFun(V) || V <- Values],
                        io_lib:format("lists:flatmap(fun(V) -> "
                                      "~w:lookup(~w, V) end, ~w)",
                                      [?MODULE, Tab, Vals])
                end,
            qlc:table(TF, [{pre_fun, PreFun}, {post_fun, PostFun},
                           {info_fun, InfoFun}, {format_fun, FormatFun},
                           {key_equality, '=:='},
                           {lookup_fun, LookupFun}])
    end.

%% Lazy list for the first_next traversal: the objects stored under
%% Key followed by a fun producing the rest.
qlc_next(_Tab, '$end_of_table') ->
    [];
qlc_next(Tab, Key) ->
    case lookup(Tab, Key) of
        Objects when is_list(Objects) ->
            Objects ++ fun() -> qlc_next(Tab, next(Tab, Key)) end;
        Error ->
            %% Do what first and next do.
            exit(Error)
    end.

%% Lazy list for the select traversal: one chunk of objects followed
%% by a fun that continues the select. Anything that is neither
%% '$end_of_table' nor {Objects, Cont} is returned as is.
qlc_select('$end_of_table') ->
    [];
qlc_select({Objects, Cont}) when is_list(Objects) ->
    Objects ++ fun() -> qlc_select(select(Cont)) end;
qlc_select(Error) ->
    Error.

%% Answers the info fun tags handled here using info/2; every other
%% tag gives 'undefined'.
table_info(Tab, num_of_objects) ->
    info(Tab, size);
table_info(Tab, keypos) ->
    info(Tab, keypos);
table_info(Tab, is_unique_objects) ->
    info(Tab, type) =/= duplicate_bag;
table_info(_Tab, _) ->
    undefined.

%% End of table/2.
%% Inserts every object of DTab into the ETS table ETab and returns
%% ETab. Raises badarg when ets:info/2 knows nothing about ETab.
to_ets(DTab, ETab) ->
    case ets:info(ETab, protection) of
        undefined ->
            erlang:error(badarg, [DTab, ETab]);
        _Protection ->
            Insert = fun(Obj, EtsTab) ->
                             true = ets:insert(EtsTab, Obj),
                             EtsTab
                     end,
            foldl(Insert, ETab, DTab)
    end.

%% Applies Fun to each object. Fun answers 'continue',
%% {continue, Val} (Val is collected), {done, Value} (stop, returning
%% Value in front of what has been collected), or anything else (stop,
%% returning that term). Stopping is done by throwing a term tagged
%% with a fresh reference that do_traverse/4 catches.
traverse(Tab, Fun) ->
    Tag = make_ref(),
    Step = fun(Obj, Collected) ->
                   case Fun(Obj) of
                       continue ->
                           Collected;
                       {continue, Val} ->
                           [Val | Collected];
                       {done, Value} ->
                           throw({Tag, [Value | Collected]});
                       Other ->
                           throw({Tag, Other})
                   end
           end,
    do_traverse(Step, [], Tab, Tag).

update_counter(Tab, Key, C) ->
    Reply = treq(Tab, {update_counter, Key, C}),
    badarg(Reply, [Tab, Key, C]).

verbose() ->
    verbose(true).

%% Sets the verbose flag in dets_server and in every open table.
%% Returns the list of tables.
verbose(What) ->
    ok = dets_server:verbose(What),
    Tabs = dets_server:all(),
    lists:foreach(fun(Tab) -> treq(Tab, {set_verbose, What}) end, Tabs),
    Tabs.

%% Where in the (open) table is Object located?
%% The address of the first matching object is returned.
%% Format 9 returns the address of the object collection.
%% -> {ok, Address} | false
where(Tab, Object) ->
    Reply = treq(Tab, {where, Object}),
    badarg(Reply, [Tab, Object]).

%% Runs the traversal with the table fixed. A thrown {Ref, Result}
%% ends the traversal with Result; the fix is always released.
do_traverse(Fun, Acc, Tab, Ref) ->
    safe_fixtable(Tab, true),
    TabPid = dets_server:get_pid(Tab),
    try
        do_trav(TabPid, Acc, Fun)
    catch
        {Ref, Result} ->
            Result
    after
        safe_fixtable(Tab, false)
    end.

%% Starts a match on all objects and folds Fun over the result.
do_trav(Proc, Acc, Fun) ->
    {Spec, MP} = compile_match_spec(object, '_'),
    %% MP not used
    Request = {match, MP, Spec, default},
    case req(Proc, Request) of
        {cont, Cont} ->
            do_trav(Cont, Proc, Acc, Fun);
        Other ->
            Other
    end.

%% Fetches the next batch of binaries until the continuation says eof.
do_trav(#dets_cont{bin = eof}, _Proc, Acc, _Fun) ->
    Acc;
do_trav(Cont, Proc, Acc, Fun) ->
    case req(Proc, {match_init, Cont}) of
        {cont, {Bins, NextCont}} ->
            do_trav_bins(NextCont, Proc, Acc, Fun, lists:reverse(Bins));
        Other ->
            Other
    end.

%% Applies Fun to each decoded binary of the batch. A binary that
%% cannot be decoded is reported to the table process as corrupt.
do_trav_bins(Cont, Proc, Acc, Fun, []) ->
    do_trav(Cont, Proc, Acc, Fun);
do_trav_bins(Cont, Proc, Acc, Fun, [Bin | Rest]) ->
    %% Unpack one binary at a time, using the client's heap.
    case catch binary_to_term(Bin) of
        {'EXIT', _} ->
            Bad = dets_utils:bad_object(do_trav_bins, Bin),
            req(Proc, {corrupt, Bad});
        Term ->
            do_trav_bins(Cont, Proc, Fun(Term, Acc), Fun, Rest)
    end.
%% Collects every match for Pat with the table fixed during the scan.
safe_match(Tab, Pat, What) ->
    safe_fixtable(Tab, true),
    FirstChunk = init_chunk_match(Tab, Pat, What, default),
    Result = do_safe_match(FirstChunk, []),
    safe_fixtable(Tab, false),
    Result.

%% Keeps asking for chunks, putting each new chunk in front of what
%% has been collected so far, until the end of the table, an error or
%% 'badarg' is seen.
do_safe_match('$end_of_table', Collected) ->
    Collected;
do_safe_match(badarg, _Collected) ->
    badarg;
do_safe_match({error, Reason}, _Collected) ->
    {error, Reason};
do_safe_match({Chunk, Cont}, Collected) ->
    do_safe_match(chunk_match(Cont), Chunk ++ Collected).

%% What = object | bindings | select
%% Compiles the pattern, starts the match in the table process and
%% returns the first chunk. N is 'default' or a non-negative integer;
%% anything else, like a pattern that does not compile, gives 'badarg'.
init_chunk_match(Tab, Pat, What, N) when is_integer(N), N >= 0;
                                         N =:= default ->
    case compile_match_spec(What, Pat) of
        badarg ->
            badarg;
        {Spec, MP} ->
            TabPid = dets_server:get_pid(Tab),
            case req(TabPid, {match, MP, Spec, N}) of
                {done, Objs} ->
                    Finished = #dets_cont{tab = Tab, proc = TabPid,
                                          what = What, bin = eof},
                    {Objs, Finished};
                {cont, Cont0} ->
                    Cont = Cont0#dets_cont{what = What, tab = Tab,
                                           proc = TabPid},
                    chunk_match(Cont);
                Other ->
                    Other
            end
    end;
init_chunk_match(_Tab, _Pat, _What, _) ->
    badarg.

%% Asks the table process for the next batch of binaries and runs the
%% match program on them in the client. Empty results are skipped. If
%% decoding or matching fails, the batch is reported as corrupt when
%% the match program is a valid compiled match specification,
%% otherwise 'badarg' is returned.
chunk_match(#dets_cont{proc = TabPid} = Cont) ->
    case req(TabPid, {match_init, Cont}) of
        '$end_of_table' = Eot ->
            Eot;
        {cont, {Bins, NextCont}} ->
            Prog = NextCont#dets_cont.match_program,
            case catch do_foldl_bins(Bins, Prog) of
                {'EXIT', _} ->
                    case ets:is_compiled_ms(Prog) of
                        true ->
                            Bad = dets_utils:bad_object(chunk_match, Bins),
                            req(TabPid, {corrupt, Bad});
                        false ->
                            badarg
                    end;
                [] ->
                    chunk_match(NextCont);
                Matches ->
                    {Matches, NextCont}
            end;
        Other ->
            Other
    end.

%% A match program of 'true' means that every object is returned.
do_foldl_bins(Bins, true) ->
    foldl_bins(Bins, []);
do_foldl_bins(Bins, MP) ->
    foldl_bins(Bins, MP, []).

%% Decodes each binary, pushing the term onto Terms.
%% Preserve time order (version 9).
foldl_bins(Bins, Terms) ->
    lists:foldl(fun(Bin, Acc) -> [binary_to_term(Bin) | Acc] end,
                Terms, Bins).

%% Decodes each binary and runs the match program MP on it, pushing
%% the result, if any, onto Terms.
%% Preserve time order (version 9).
foldl_bins(Bins, MP, Terms) ->
    lists:foldl(fun(Bin, Acc) ->
                        Term = binary_to_term(Bin),
                        case ets:match_spec_run([Term], MP) of
                            [] ->
                                Acc;
                            [Result] ->
                                [Result | Acc]
                        end
                end, Terms, Bins).
%% -> {Spec, binary()} | badarg
%% For object, bindings and delete the pattern is first wrapped in the
%% corresponding match specification. The specification selecting all
%% objects is not compiled; 'true' is returned in place of a program.
compile_match_spec(object, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC(Pat));
compile_match_spec(bindings, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_BINDINGS_MATCH_SPEC(Pat));
compile_match_spec(delete, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_TRUE_MATCH_SPEC(Pat));
compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC('_') = Spec) ->
    {Spec, true};
compile_match_spec(select, Spec) ->
    case catch ets:match_spec_compile(Spec) of
        Compiled when is_binary(Compiled) ->
            {Spec, Compiled};
        _ ->
            badarg
    end.

%% Process the args list as provided to open_file/2.
%% Starts from the default option values and applies each element of
%% Args with repl/2. Invalid options make this function fail; the
%% caller runs it under 'catch'.
defaults(Tab, Args) ->
    Initial = #open_args{file = to_list(Tab),
                         type = set,
                         keypos = 1,
                         repair = true,
                         min_no_slots = default,
                         max_no_slots = default,
                         ram_file = false,
                         delayed_write = ?DEFAULT_CACHE,
                         auto_save = timer:minutes(?DEFAULT_AUTOSAVE),
                         access = read_write,
                         version = default,
                         debug = false},
    OpenArgs = lists:foldl(fun repl/2, Initial, Args),
    case OpenArgs#open_args.version of
        8 ->
            OpenArgs#open_args{max_no_slots = default};
        _ ->
            is_comp_min_max(OpenArgs)
    end.

%% Atoms are converted to strings; other terms are returned as is.
to_list(T) when is_atom(T) ->
    atom_to_list(T);
to_list(T) ->
    T.

%% Validates one open_file/2 option and stores it in the #open_args{}
%% record. Unknown {Key, Value} options exit with badarg; other bad
%% input fails in some other way.
repl({access, Access}, Defs) ->
    mem(Access, [read, read_write]),
    Defs#open_args{access = Access};
repl({auto_save, Millis}, Defs) when is_integer(Millis), Millis >= 0 ->
    Defs#open_args{auto_save = Millis};
repl({auto_save, infinity}, Defs) ->
    Defs#open_args{auto_save = infinity};
repl({cache_size, Size}, Defs) when is_integer(Size), Size >= 0 ->
    %% Recognized, but ignored.
    Defs;
repl({cache_size, infinity}, Defs) ->
    Defs;
repl({delayed_write, default}, Defs) ->
    Defs#open_args{delayed_write = ?DEFAULT_CACHE};
repl({delayed_write, {Delay, Size} = DelayedWrite}, Defs)
  when is_integer(Delay), Delay >= 0, is_integer(Size), Size >= 0 ->
    Defs#open_args{delayed_write = DelayedWrite};
repl({estimated_no_objects, NoObjects}, Defs) ->
    repl({min_no_slots, NoObjects}, Defs);
repl({file, File}, Defs) ->
    Defs#open_args{file = to_list(File)};
repl({keypos, Pos}, Defs) when is_integer(Pos), Pos > 0 ->
    Defs#open_args{keypos = Pos};
repl({max_no_slots, NoSlots}, Defs) ->
    %% Version 9 only.
    Defs#open_args{max_no_slots = is_max_no_slots(NoSlots)};
repl({min_no_slots, NoSlots}, Defs) ->
    Defs#open_args{min_no_slots = is_min_no_slots(NoSlots)};
repl({ram_file, Flag}, Defs) ->
    mem(Flag, [true, false]),
    Defs#open_args{ram_file = Flag};
repl({repair, Repair}, Defs) ->
    mem(Repair, [true, false, force]),
    Defs#open_args{repair = Repair};
repl({type, Type}, Defs) ->
    mem(Type, [set, bag, duplicate_bag]),
    Defs#open_args{type = Type};
repl({version, Version}, Defs) ->
    Defs#open_args{version = is_version(Version)};
repl({debug, Flag}, Defs) ->
    %% Not documented.
    mem(Flag, [true, false]),
    Defs#open_args{debug = Flag};
repl({_, _}, _) ->
    exit(badarg).

%% Non-negative integers below the default minimum are raised to the
%% default minimum.
is_min_no_slots(default) ->
    default;
is_min_no_slots(I) when is_integer(I), I >= ?DEFAULT_MIN_NO_SLOTS ->
    I;
is_min_no_slots(I) when is_integer(I), I >= 0 ->
    ?DEFAULT_MIN_NO_SLOTS.

is_max_no_slots(default) ->
    default;
is_max_no_slots(I) when is_integer(I), I > 0, I < 1 bsl 31 ->
    I.

%% When both limits are given explicitly the minimum must not exceed
%% the maximum; the match fails otherwise.
is_comp_min_max(Defs) ->
    #open_args{max_no_slots = Max, min_no_slots = Min} = Defs,
    if
        Min =:= default ->
            Defs;
        Max =:= default ->
            Defs;
        true ->
            true = Min =< Max,
            Defs
    end.

is_version(default) ->
    default;
is_version(8) ->
    8;
is_version(9) ->
    9.

%% Returns true when X is a member of L, exits with badarg otherwise.
mem(X, L) ->
    lists:member(X, L) orelse exit(badarg).

%% Extracts the values of Keys from an option list (a single option is
%% wrapped in a list). See options/3 for the result.
options(Options, Keys) ->
    options(listify(Options), Keys, []).
%% Takes the value of each key in turn out of Options, using the
%% default when the key is absent. Returns the values in the order of
%% the keys, {badarg, Key} for an invalid value, or
%% {badarg, Options} when something is left over or Options is not
%% a list.
options(Options, [Key | Keys], Acc) when is_list(Options) ->
    Checked =
        case lists:keyfind(Key, 1, Options) of
            {format, Format} when Format =:= term; Format =:= bchunk ->
                {ok, Format};
            {min_no_slots, I} ->
                case catch is_min_no_slots(I) of
                    {'EXIT', _} ->
                        badarg;
                    MinNoSlots ->
                        {ok, MinNoSlots}
                end;
            {n_objects, default} ->
                {ok, default_option(Key)};
            {n_objects, NObjs} when is_integer(NObjs), NObjs >= 1 ->
                {ok, NObjs};
            {traverse, select} ->
                {ok, select};
            {traverse, {select, MS}} ->
                {ok, {select, MS}};
            {traverse, first_next} ->
                {ok, first_next};
            {Key, _} ->
                badarg;
            false ->
                {ok, default_option(Key)}
        end,
    case Checked of
        {ok, Value} ->
            Remaining = lists:keydelete(Key, 1, Options),
            options(Remaining, Keys, [Value | Acc]);
        badarg ->
            {badarg, Key}
    end;
options([], [], Acc) ->
    lists:reverse(Acc);
options(Options, _, _Acc) ->
    {badarg, Options}.

default_option(format) ->
    term;
default_option(min_no_slots) ->
    default;
default_option(traverse) ->
    select;
default_option(n_objects) ->
    default.

%% Wraps anything that is not a list in a list.
listify(L) when is_list(L) ->
    L;
listify(T) ->
    [T].

%% Sends request R to the process serving Tab. Returns 'badarg' when
%% no pid can be found for Tab.
treq(Tab, R) ->
    case catch dets_server:get_pid(Tab) of
        TabPid when is_pid(TabPid) ->
            req(TabPid, R);
        _ ->
            badarg
    end.

%% Sends request R to Proc and waits for the reply. Proc is monitored
%% while waiting; 'badarg' is returned if it goes down before replying.
req(Proc, R) ->
    MRef = erlang:monitor(process, Proc),
    Proc ! ?DETS_CALL(self(), R),
    receive
        {'DOWN', MRef, process, Proc, _Info} ->
            badarg;
        {Proc, Reply} ->
            %% 'flush' also removes a 'DOWN' message that may already
            %% have been delivered.
            erlang:demonitor(MRef, [flush]),
            Reply
    end.

%% Inlined.
%% File errors with reason einval or badarg are raised as badarg.
einval({error, {file_error, _, einval}}, A) ->
    erlang:error(badarg, A);
einval({error, {file_error, _, badarg}}, A) ->
    erlang:error(badarg, A);
einval(Reply, _A) ->
    Reply.

%% Inlined.
%% Raises badarg with arguments A for a 'badarg' reply.
badarg(badarg, A) ->
    erlang:error(badarg, A);
badarg(Reply, _A) ->
    Reply.

%% Inlined.
%% Maps a 'badarg' reply to 'undefined'.
undefined(badarg) ->
    undefined;
undefined(Reply) ->
    Reply.

%% Inlined.
%% Unwraps {ok, Reply}, raises badarg for a 'badarg' reply, and exits
%% with any other reply as the reason.
badarg_exit(badarg, A) ->
    erlang:error(badarg, A);
badarg_exit({ok, Reply}, _A) ->
    Reply;
badarg_exit(Reply, _A) ->
    exit(Reply).
%%%-----------------------------------------------------------------
%%% Server functions
%%%-----------------------------------------------------------------

%% Entry point of the table process (spawned by istart_link/1).
%% Exits are trapped so that 'EXIT' messages from the parent, the
%% server and fixing processes can be handled in the loop.
init(Parent, Server) ->
    process_flag(trap_exit, true),
    open_file_loop(#head{parent = Parent, server = Server}).

open_file_loop(Head) ->
    open_file_loop(Head, 0).

%% Main loop. N is a counter maintained by apply_op/4: update
%% operations return N + 1, while auto_save, sync and
%% delete_all_objects reset it to 0.
%% When update_mode is an error tuple no request is prioritized.
open_file_loop(Head, N) when element(1, Head#head.update_mode) =:= error ->
    open_file_loop2(Head, N);
open_file_loop(Head, N) ->
    receive
        %% When the table is fixed it can be assumed that at least one
        %% traversal is in progress. To speed the traversal up three
        %% things have been done:
        %% - prioritize match_init, bchunk, next, and match_delete_init;
        %% - do not peek the message queue for updates;
        %% - wait 1 ms after each update.
        %% next is normally followed by lookup, but since lookup is also
        %% used when not traversing the table, it is not prioritized.
        ?DETS_CALL(From, {match_init, _State} = Op) ->
            do_apply_op(Op, From, Head, N);
        ?DETS_CALL(From, {bchunk, _State} = Op) ->
            do_apply_op(Op, From, Head, N);
        ?DETS_CALL(From, {next, _Key} = Op) ->
            do_apply_op(Op, From, Head, N);
        ?DETS_CALL(From, {match_delete_init, _MP, _Spec} = Op) ->
            do_apply_op(Op, From, Head, N);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.parent ->
            %% Parent orders shutdown.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.server ->
            %% The server is gone.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, _Reason} ->
            %% A process fixing the table exits.
            H2 = remove_fix(Head, Pid, close),
            open_file_loop(H2, N);
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Head#head.parent,
                                  ?MODULE, [], Head)
    after 0 ->
            %% Nothing prioritized in the queue; take any message.
            open_file_loop2(Head, N)
    end.

%% Blocking receive that accepts every request. Messages that are
%% not recognized are logged and dropped.
open_file_loop2(Head, N) ->
    receive
        ?DETS_CALL(From, Op) ->
            do_apply_op(Op, From, Head, N);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.parent ->
            %% Parent orders shutdown.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.server ->
            %% The server is gone.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, _Reason} ->
            %% A process fixing the table exits.
            H2 = remove_fix(Head, Pid, close),
            open_file_loop(H2, N);
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Head#head.parent,
                                  ?MODULE, [], Head);
        Message ->
            error_logger:format("** dets: unexpected message"
                                "(ignored): ~w~n", [Message]),
            open_file_loop(Head, N)
    end.

%% Evaluates one request and re-enters the loop with the head and
%% counter that apply_op/4 returns. {{more, From1, Op1, N1}, NewHead}
%% means that another request has already been taken from the message
%% queue and is to be evaluated next.
%% Any exception except exit:normal is logged and answered with
%% {error, {dets_bug, Name, Op, Bad}}; the loop then continues with
%% the head as it was before the request.
do_apply_op(Op, From, Head, N) ->
    try apply_op(Op, From, Head, N) of
        ok ->
            open_file_loop(Head, N);
        {N2, H2} when is_record(H2, head), is_integer(N2) ->
            open_file_loop(H2, N2);
        H2 when is_record(H2, head) ->
            open_file_loop(H2, N);
        {{more,From1,Op1,N1}, NewHead} ->
            do_apply_op(Op1, From1, NewHead, N1)
    catch
        exit:normal ->
            exit(normal);
        _:Bad ->
            Name = Head#head.name,
            case dets_utils:debug_mode() of
                true ->
                    %% If stream_op/5 found more requests, this is not
                    %% the last operation.
                    error_logger:format
                      ("** dets: Bug was found when accessing table ~w,~n"
                       "** dets: operation was ~p and reply was ~w.~n"
                       "** dets: Stacktrace: ~w~n",
                       [Name, Op, Bad, erlang:get_stacktrace()]);
                false ->
                    error_logger:format
                      ("** dets: Bug was found when accessing table ~w~n",
                       [Name])
            end,
            if
                From =/= self() ->
                    From ! {self(), {error, {dets_bug, Name, Op, Bad}}};
                true -> % auto_save | may_grow | {delayed_write, _}
                    ok
            end,
            open_file_loop(Head, N)
    end.

%% Evaluates the request Op sent by From.
%% Returns 'ok' (head and counter unchanged), a new head, or
%% {NewN, NewHead}; results of stream_op/5 and delayed_write/2 are
%% returned as they are.
%% The clause order matters: the clauses above the update_mode error
%% check are evaluated whatever the state of the table, and the
%% write clauses guarded by 'update_mode =:= dirty' are reached from
%% the last clauses, which first mark the table dirty and then
%% re-apply the request.
apply_op(Op, From, Head, N) ->
    case Op of
        {add_user, Tab, OpenArgs}->
            #open_args{file = Fname, type = Type, keypos = Keypos,
                       ram_file = Ram, access = Access,
                       version = Version} = OpenArgs,
            VersionOK = (Version =:= default) or
                (Head#head.version =:= Version),
            %% min_no_slots and max_no_slots are not tested
            Res = if
                      Tab =:= Head#head.name,
                      Head#head.keypos =:= Keypos,
                      Head#head.type =:= Type,
                      Head#head.ram_file =:= Ram,
                      Head#head.access =:= Access,
                      VersionOK,
                      Fname =:= Head#head.filename ->
                          ok;
                      true ->
                          err({error, incompatible_arguments})
                  end,
            From ! {self(), Res},
            ok;
        auto_save ->
            case Head#head.update_mode of
                saved ->
                    Head;
                {error, _Reason} ->
                    Head;
                _Dirty when N =:= 0 -> % dirty or new_dirty
                    %% The updates seem to have declined
                    dets_utils:vformat("** dets: Auto save of ~p\n",
                                       [Head#head.name]),
                    {NewHead, _Res} = perform_save(Head, true),
                    erlang:garbage_collect(),
                    {0, NewHead};
                dirty ->
                    %% Reset counter and try later
                    start_auto_save_timer(Head),
                    {0, Head}
            end;
        close ->
            From ! {self(), fclose(Head)},
            _NewHead = unlink_fixing_procs(Head),
            ?PROFILE(ep:done()),
            exit(normal);
        {close, Pid} ->
            %% Used from dets_server when Pid has closed the table,
            %% but the table is still opened by some process.
            NewHead = remove_fix(Head, Pid, close),
            From ! {self(), status(NewHead)},
            NewHead;
        {corrupt, Reason} ->
            {H2, Error} = dets_utils:corrupt_reason(Head, Reason),
            From ! {self(), Error},
            H2;
        {delayed_write, WrTime} ->
            delayed_write(Head, WrTime);
        info ->
            {H2, Res} = finfo(Head),
            From ! {self(), Res},
            H2;
        {info, Tag} ->
            {H2, Res} = finfo(Head, Tag),
            From ! {self(), Res},
            H2;
        {is_compatible_bchunk_format, Term} ->
            Res = test_bchunk_format(Head, Term),
            From ! {self(), Res},
            ok;
        {internal_open, Ref, Args} ->
            ?PROFILE(ep:do()),
            case do_open_file(Args, Head#head.parent, Head#head.server,Ref) of
                {ok, H2} ->
                    From ! {self(), ok},
                    H2;
                Error ->
                    %% The file could not be opened; the table process
                    %% terminates after having replied.
                    From ! {self(), Error},
                    exit(normal)
            end;
        may_grow when Head#head.update_mode =/= saved ->
            if
                Head#head.update_mode =:= dirty ->
                    %% Won't grow more if the table is full.
                    {H2, _Res} =
                        (Head#head.mod):may_grow(Head, 0, many_times),
                    {N + 1, H2};
                true ->
                    ok
            end;
        {set_verbose, What} ->
            set_verbose(What),
            From ! {self(), ok},
            ok;
        {where, Object} ->
            {H2, Res} = where_is_object(Head, Object),
            From ! {self(), Res},
            H2;
        _Message when element(1, Head#head.update_mode) =:= error ->
            From ! {self(), status(Head)},
            ok;
        %% The following messages assume that the status of the table is OK.
        {bchunk_init, Tab} ->
            {H2, Res} = do_bchunk_init(Head, Tab),
            From ! {self(), Res},
            H2;
        {bchunk, State} ->
            {H2, Res} = do_bchunk(Head, State),
            From ! {self(), Res},
            H2;
        delete_all_objects ->
            {H2, Res} = fdelete_all_objects(Head),
            From ! {self(), Res},
            erlang:garbage_collect(),
            {0, H2};
        {delete_key, Keys} when Head#head.update_mode =:= dirty ->
            if
                Head#head.version =:= 8 ->
                    {H2, Res} = fdelete_key(Head, Keys),
                    From ! {self(), Res},
                    {N + 1, H2};
                true ->
                    stream_op(Op, From, [], Head, N)
            end;
        {delete_object, Objs} when Head#head.update_mode =:= dirty ->
            case check_objects(Objs, Head#head.keypos) of
                true when Head#head.version =:= 8 ->
                    {H2, Res} = fdelete_object(Head, Objs),
                    From ! {self(), Res},
                    {N + 1, H2};
                true ->
                    stream_op(Op, From, [], Head, N);
                false ->
                    From ! {self(), badarg},
                    ok
            end;
        first ->
            {H2, Res} = ffirst(Head),
            From ! {self(), Res},
            H2;
        {initialize, InitFun, Format, MinNoSlots} ->
            {H2, Res} = finit(Head, InitFun, Format, MinNoSlots),
            From ! {self(), Res},
            erlang:garbage_collect(),
            H2;
        {insert, Objs} when Head#head.update_mode =:= dirty ->
            case check_objects(Objs, Head#head.keypos) of
                true when Head#head.version =:= 8 ->
                    {H2, Res} = finsert(Head, Objs),
                    From ! {self(), Res},
                    {N + 1, H2};
                true ->
                    stream_op(Op, From, [], Head, N);
                false ->
                    From ! {self(), badarg},
                    ok
            end;
        {insert_new, Objs} when Head#head.update_mode =:= dirty ->
            {H2, Res} = finsert_new(Head, Objs),
            From ! {self(), Res},
            {N + 1, H2};
        {lookup_keys, Keys} when Head#head.version =:= 8 ->
            {H2, Res} = flookup_keys(Head, Keys),
            From ! {self(), Res},
            H2;
        {lookup_keys, _Keys} ->
            stream_op(Op, From, [], Head, N);
        {match_init, State} ->
            {H2, Res} = fmatch_init(Head, State),
            From ! {self(), Res},
            H2;
        {match, MP, Spec, NObjs} ->
            {H2, Res} = fmatch(Head, MP, Spec, NObjs),
            From ! {self(), Res},
            H2;
        {member, Key} when Head#head.version =:= 8 ->
            {H2, Res} = fmember(Head, Key),
            From ! {self(), Res},
            H2;
        {member, _Key} = Op ->
            stream_op(Op, From, [], Head, N);
        {next, Key} ->
            {H2, Res} = fnext(Head, Key),
            From ! {self(), Res},
            H2;
        {match_delete, State} when Head#head.update_mode =:= dirty ->
            {H2, Res} = fmatch_delete(Head, State),
            From ! {self(), Res},
            {N + 1, H2};
        {match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty ->
            {H2, Res} = fmatch_delete_init(Head, MP, Spec),
            From ! {self(), Res},
            {N + 1, H2};
        {safe_fixtable, Bool} ->
            NewHead = do_safe_fixtable(Head, From, Bool),
            From ! {self(), ok},
            NewHead;
        {slot, Slot} ->
            {H2, Res} = fslot(Head, Slot),
            From ! {self(), Res},
            H2;
        sync ->
            {NewHead, Res} = perform_save(Head, true),
            From ! {self(), Res},
            erlang:garbage_collect(),
            {0, NewHead};
        {update_counter, Key, Incr} when Head#head.update_mode =:= dirty ->
            {NewHead, Res} = do_update_counter(Head, Key, Incr),
            From ! {self(), Res},
            {N + 1, NewHead};
        %% Only requests that were not handled above get this far.
        WriteOp when Head#head.update_mode =:= new_dirty ->
            H2 = Head#head{update_mode = dirty},
            apply_op(WriteOp, From, H2, 0);
        WriteOp when Head#head.access =:= read_write,
                     Head#head.update_mode =:= saved ->
            %% First update since the table was saved: mark it dirty
            %% and start the auto save timer before evaluating the
            %% request.
            case catch (Head#head.mod):mark_dirty(Head) of
                ok ->
                    start_auto_save_timer(Head),
                    H2 = Head#head{update_mode = dirty},
                    apply_op(WriteOp, From, H2, 0);
                {NewHead, Error} when is_record(NewHead, head) ->
                    From ! {self(), Error},
                    NewHead
            end;
        WriteOp when is_tuple(WriteOp), Head#head.access =:= read ->
            Reason = {access_mode, Head#head.filename},
            From ! {self(), err({error, Reason})},
            ok
    end.

%% Arranges for an auto_save request to be sent to the table process
%% itself after the auto save interval, unless the interval is
%% 'infinity'.
start_auto_save_timer(Head) when Head#head.auto_save =:= infinity ->
    ok;
start_auto_save_timer(Head) ->
    Millis = Head#head.auto_save,
    erlang:send_after(Millis, self(), ?DETS_CALL(self(), auto_save)).

%% Version 9: Peek the message queue and try to evaluate several
%% lookup requests in parallel. Evaluate delete_object, delete and
%% insert as well.
stream_op(Op, Pid, Pids, Head, N) ->
    #head{fixed = Fxd, update_mode = M} = Head,
    stream_op(Head, Pids, [], N, Pid, Op, Fxd, M).
%% Collects further requests for the current batch. When the table is
%% not fixed (Fxd =:= false) the mailbox is polled with a zero timeout
%% for one more ?DETS_CALL request, which is handed to stream_op/8; if
%% nothing is queued the batch is finished. For any other value of Fxd
%% no polling is done and the batch is finished right away.
stream_loop(Head, Pids, C, N, false = Fxd, M) ->
    receive
        ?DETS_CALL(From, Message) ->
            stream_op(Head, Pids, C, N, From, Message, Fxd, M)
    after 0 ->
        stream_end(Head, Pids, C, N, no_more)
    end;
stream_loop(Head, Pids, C, N, _Fxd, _M) ->
    stream_end(Head, Pids, C, N, no_more).

%% Adds one request to the batch C (most recent request first).
%% lookup_keys and member are accepted in any update mode; a member
%% request is stored as a lookup whose requester is wrapped in a list so
%% that lookup_reply/2 can tell the two apart. insert, delete_key and
%% delete_object are accepted only when the update mode is 'dirty', and
%% the requester is then pushed onto Pids (the processes that are to
%% receive the common write reply). Any other request ends the batch and
%% is passed on to stream_end/5 as the next operation.
stream_op(Head, Pids, C, N, Pid, {lookup_keys,Keys}, Fxd, M) ->
    NC = [{{lookup,Pid},Keys} | C],
    stream_loop(Head, Pids, NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {insert, _Objects} = Op, Fxd, dirty = M) ->
    NC = [Op | C],
    stream_loop(Head, [Pid | Pids], NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {delete_key, _Keys} = Op, Fxd, dirty = M) ->
    NC = [Op | C],
    stream_loop(Head, [Pid | Pids], NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {delete_object, _Os} = Op, Fxd, dirty = M) ->
    NC = [Op | C],
    stream_loop(Head, [Pid | Pids], NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {member, Key}, Fxd, M) ->
    NC = [{{lookup,[Pid]},[Key]} | C],
    stream_loop(Head, Pids, NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, Op, _Fxd, _M) ->
    stream_end(Head, Pids, C, N, {Pid,Op}).

%% Applies the batch C to the cache (in arrival order, hence the
%% reverse) and replies to the requesters. Next is 'no_more' or the
%% {Pid, Op} request that ended the batch.
%% - {Head1, Found, PwriteList}: lookup results in Found are replied to
%%   first, then PwriteList is written by stream_end1/6.
%% - a caught bare head record: 'ok' is sent to the processes in Pids0.
%% - a caught {Head1, Error}: Error is sent to the processes in Pids0
%%   and to every process that did a lookup or member in this batch.
%% - anything else caught is thrown again.
stream_end(Head, Pids0, C, N, Next) ->
    case catch update_cache(Head, lists:reverse(C)) of
        {Head1, [], PwriteList} ->
            stream_end1(Pids0, Next, N, C, Head1, PwriteList);
        {Head1, Found, PwriteList} ->
            %% Possibly an optimization: reply to lookup requests
            %% first, then write stuff. This makes it possible for
            %% clients to continue while the disk is accessed.
            %% (Replies to lookup requests are sent earlier than
            %%  replies to delete and insert requests even if the
            %%  latter requests were made before the lookup requests,
            %%  which can be confusing.)
            lookup_replies(Found),
            stream_end1(Pids0, Next, N, C, Head1, PwriteList);
        Head1 when is_record(Head1, head) ->
            stream_end2(Pids0, Pids0, Next, N, C, Head1, ok);
        {Head1, Error} when is_record(Head1, head) ->
            %% Dig out the processes that did lookup or member.
            Fun = fun({{lookup,[Pid]},_Keys}, L) -> [Pid | L];
                     ({{lookup,Pid},_Keys}, L) -> [Pid | L];
                     (_, L) -> L
                  end,
            LPs0 = lists:foldl(Fun, [], C),
            LPs = lists:usort(lists:flatten(LPs0)),
            stream_end2(Pids0 ++ LPs, Pids0, Next, N, C, Head1, Error);
        DetsError ->
            throw(DetsError)
    end.

%% Writes PwriteList (if not empty) with dets_utils:pwrite/2 and uses
%% the outcome as the reply to the processes in Pids; with nothing to
%% write the reply is 'ok'.
stream_end1(Pids, Next, N, C, Head, []) ->
    stream_end2(Pids, Pids, Next, N, C, Head, ok);
stream_end1(Pids, Next, N, C, Head, PwriteList) ->
    {Head1, PR} = (catch dets_utils:pwrite(Head, PwriteList)),
    stream_end2(Pids, Pids, Next, N, C, Head1, PR).

%% Sends Reply to each process in the first list, adding one to N per
%% reply. When the list is exhausted: with no pending operation
%% penalty/3 is called and {N, Head} is returned; with a pending
%% operation {{more,From,Op,N},Head} is returned so that the caller can
%% carry on with it.
stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) ->
    Pid ! {self(), Reply},
    stream_end2(Pids, Ps, Next, N+1, C, Head, Reply);
stream_end2([], Ps, no_more, N, C, Head, _Reply) ->
    penalty(Head, Ps, C),
    {N, Head};
stream_end2([], _Ps, {From, Op}, N, _C, Head, _Reply) ->
    {{more,From,Op,N},Head}.

%% Sleeps for one millisecond after a batch on a fixed table, except
%% when the batch consisted of a single lookup (or member) request, or
%% when the only process fixing the table is also the only process
%% replied to. No sleep when the table is not fixed.
penalty(H, _Ps, _C) when H#head.fixed =:= false ->
    ok;
penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) ->
    ok;
penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) ->
    ok;
penalty(_H, _Ps, _C) ->
    timer:sleep(1).

%% Replies to lookup and member requests. A single {P, Objects} pair is
%% replied to directly. Several pairs are first passed through
%% dets_utils:family/1 (presumably grouping the object lists per
%% requester -- confirm in dets_utils) and each requester gets its
%% object lists appended into one reply.
lookup_replies([{P,O}]) ->
    lookup_reply(P, O);
lookup_replies(Q) ->
    [{P,O} | L] = dets_utils:family(Q),
    lookup_replies(P, lists:append(O), L).

lookup_replies(P, O, []) ->
    lookup_reply(P, O);
lookup_replies(P, O, [{P2,O2} | L]) ->
    lookup_reply(P, O),
    lookup_replies(P2, lists:append(O2), L).

%% If a list of Pid then op was {member, Key}. Inlined.
%% A member request is answered with a boolean (true when at least one
%% object was found), a lookup request with the objects themselves.
lookup_reply([P], O) ->
    P ! {self(), O =/= []};
lookup_reply(P, O) ->
    P ! {self(), O}.

%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
%% Resumes the table process loop with the state Head.
system_continue(_Parent, _, Head) ->
    open_file_loop(Head).

%% Stops the table via do_stop/1, then exits with Reason.
system_terminate(Reason, _Parent, _, Head) ->
    _NewHead = do_stop(Head),
    exit(Reason).

%%-----------------------------------------------------------------
%% Code for upgrade.
%%-----------------------------------------------------------------
%% The state is kept unchanged across a code change.
system_code_change(State, _Module, _OldVsn, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

%% Returns the constants of the format module that corresponds to the
%% version found in the file header FH (dets_v8 for versions up to and
%% including 8, dets_v9 for version 9).
%% Throws {error, {not_a_dets_file, FileName}} for any other version.
constants(FH, FileName) ->
    Version = FH#fileheader.version,
    if
        Version =< 8 ->
            dets_v8:constants();
        Version =:= 9 ->
            dets_v9:constants();
        true ->
            throw({error, {not_a_dets_file, FileName}})
    end.

%% -> {ok, Fd, fileheader()} | throw(Error)
%% Opens FileName (for a RAM file the file is first read into a binary
%% which is then opened), reads the format version and lets the
%% corresponding format module read the rest of the header.
read_file_header(FileName, Access, RamFile) ->
    BF = if
             RamFile ->
                 case file:read_file(FileName) of
                     {ok, B} -> B;
                     Err -> dets_utils:file_error(FileName, Err)
                 end;
             true ->
                 FileName
         end,
    {ok, Fd} = dets_utils:open(BF, open_args(Access, RamFile)),
    {ok, <<Version:32>>} =
        dets_utils:pread_close(Fd, FileName, ?FILE_FORMAT_VERSION_POS, 4),
    if
        Version =< 8 ->
            dets_v8:read_file_header(Fd, FileName);
        Version =:= 9 ->
            dets_v9:read_file_header(Fd, FileName);
        true ->
            %% Unknown format version. Fd is known to this function
            %% only, so it has to be closed here or it is leaked.
            _ = file:close(Fd),
            throw({error, {not_a_dets_file, FileName}})
    end.

%% -> Res, the result of the final save.
%% Saves the table without forcing a sync. Unless the table is a RAM
%% file, the disk map is stopped and the file descriptor is closed.
fclose(Head) ->
    {Head1, Res} = perform_save(Head, false),
    case Head1#head.ram_file of
        true ->
            ignore;
        false ->
            dets_utils:stop_disk_map(),
            file:close(Head1#head.fptr)
    end,
    Res.

%% -> {NewHead, Res}
%% When the update mode is dirty or new_dirty: the cache is written,
%% the format module saves its data, ensure_written/2 is called with
%% DoSync, and the update mode is set to 'saved'. A {Head, Error} tuple
%% caught on the way is returned as is. In any other update mode
%% nothing is written and the reply is status(Head).
perform_save(Head, DoSync) when Head#head.update_mode =:= dirty;
                                Head#head.update_mode =:= new_dirty ->
    case catch begin
                   {Head1, []} = write_cache(Head),
                   {Head2, ok} = (Head1#head.mod):do_perform_save(Head1),
                   ok = ensure_written(Head2, DoSync),
                   {Head2#head{update_mode = saved}, ok}
               end of
        {NewHead, _} = Reply when is_record(NewHead, head) ->
            Reply
    end;
perform_save(Head, _DoSync) ->
    {Head, status(Head)}.
%% Makes sure that the contents of the table reach the file.
%% RAM file: the whole contents are read and written to the file named
%% in the head, with dets_utils:write_file/2 when DoSync is true and
%% with file:write_file/2 otherwise (a failure of the latter is handed
%% to dets_utils:corrupt_file/2).
%% Disk file: synced when DoSync is true, nothing is done otherwise.
ensure_written(#head{ram_file = true} = Head, DoSync) ->
    {ok, Size} = dets_utils:position(Head, eof),
    {ok, Contents} = dets_utils:pread(Head, 0, Size, 0),
    if
        not DoSync ->
            case file:write_file(Head#head.filename, Contents) of
                ok ->
                    ok;
                WriteError ->
                    dets_utils:corrupt_file(Head, WriteError)
            end;
        DoSync ->
            dets_utils:write_file(Head, Contents)
    end;
ensure_written(#head{ram_file = false} = Head, true) ->
    dets_utils:sync(Head);
ensure_written(#head{ram_file = false}, false) ->
    ok.

%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error}
%% Starts a bchunk traversal: the cache is flushed and a continuation
%% covering everything allocated in the file is returned together with
%% the table parameters as a binary. Tables whose format module has no
%% table parameters give {error, old_version}.
do_bchunk_init(Head, Tab) ->
    case catch write_cache(Head) of
        {Flushed, []} ->
            case (Flushed#head.mod):table_parameters(Flushed) of
                undefined ->
                    {Flushed, {error, old_version}};
                Parameters ->
                    Allocated = dets_utils:all_allocated(Flushed),
                    ParametersBin = term_to_binary(Parameters),
                    Cont = #dets_cont{what = bchunk,
                                      no_objs = default,
                                      bin = <<>>,
                                      alloc = Allocated,
                                      tab = Tab,
                                      proc = self()},
                    {Flushed, {Cont, [ParametersBin]}}
            end;
        {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
            HeadError
    end.

%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error}
%% Reads the next chunk of a bchunk traversal. A continuation created
%% by another process is rejected with badarg; a finished continuation
%% gives '$end_of_table'.
do_bchunk(Head, #dets_cont{proc = Owner}) when Owner =/= self() ->
    {Head, badarg};
do_bchunk(Head, #dets_cont{bin = eof}) ->
    {Head, '$end_of_table'};
do_bchunk(Head, Cont) ->
    case dets_v9:read_bchunks(Head, Cont#dets_cont.alloc) of
        {error, Reason} ->
            dets_utils:corrupt_reason(Head, Reason);
        {finished, Chunks} ->
            {Head, {Cont#dets_cont{bin = eof}, Chunks}};
        {Chunks, Remaining} ->
            {Head, {Cont#dets_cont{alloc = Remaining}, Chunks}}
    end.

%% -> {NewHead, Result}
%% Empties the table unless it is fixed, in which case the result is
%% 'fixed'. The auto save timer is started for the new head.
fdelete_all_objects(#head{fixed = false} = Head) ->
    case catch do_delete_all_objects(Head) of
        {ok, Emptied} ->
            start_auto_save_timer(Emptied),
            {Emptied, ok};
        {error, Reason} ->
            dets_utils:corrupt_reason(Head, Reason)
    end;
fdelete_all_objects(Head) ->
    {Head, fixed}.
%% Truncates the file of the table and initiates it anew with the
%% settings found in Head. Returns what initiate_file of the format
%% module returns.
do_delete_all_objects(Head) ->
    #head{mod = Mod, fptr = Fd, filename = FileName, name = Name,
          type = Type, keypos = KeyPos, ram_file = RamFile,
          auto_save = AutoSave, cache = Cache,
          min_no_slots = MinNoSlots, max_no_slots = MaxNoSlots} = Head,
    CacheSize = dets_utils:cache_size(Cache),
    ok = dets_utils:truncate(Fd, FileName, bof),
    Mod:initiate_file(Fd, Name, FileName, Type, KeyPos, MinNoSlots,
                      MaxNoSlots, RamFile, CacheSize, AutoSave, true).

%% -> {NewHead, Reply}, Reply = ok | Error.
%% Deletes all objects with any of the given keys.
fdelete_key(Head, Keys) ->
    do_delete(Head, Keys, delete_key).

%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
%% Deletes the given objects.
fdelete_object(Head, Objects) ->
    do_delete(Head, Objects, delete_object).

%% -> {NewHead, {ok, Key | '$end_of_table'}} | {NewHead, {error, Reason}}
%% Finds the first key of the table. The unique reference tells a
%% normal return value of ffirst1/1 from a caught {Head, Reason}.
ffirst(H) ->
    Tag = make_ref(),
    case catch {Tag, ffirst1(H)} of
        {Tag, {NewH, Key}} ->
            {NewH, {ok, Key}};
        {ErrH, Reason} when is_record(ErrH, head) ->
            {ErrH, {error, Reason}}
    end.

%% Checks the fixing of the table, flushes the cache and starts the
%% search for a key in slot 0.
ffirst1(H) ->
    check_safe_fixtable(H),
    {Flushed, []} = write_cache(H),
    ffirst(Flushed, 0).

%% Walks the slots from Slot and upwards until a non-empty slot is
%% found, returning the key of the first object of that slot, or
%% '$end_of_table' when the slots are exhausted.
ffirst(H, Slot) ->
    case (H#head.mod):slot_objs(H, Slot) of
        '$end_of_table' ->
            {H, '$end_of_table'};
        [] ->
            ffirst(H, Slot + 1);
        [First | _] ->
            {H, element(H#head.keypos, First)}
    end.

%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
%% Inserts the objects by way of the cache.
finsert(Head, Objects) ->
    case catch update_cache(Head, Objects, insert) of
        {Inserted, []} ->
            {Inserted, ok};
        {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
            HeadError
    end.

%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
%% Inserts the objects only if none of their keys is present in the
%% table: the reply is 'true' when the objects were inserted and
%% 'false' when some key was found. Objects from which no key can be
%% extracted give badarg.
finsert_new(Head, Objects) ->
    KeyPos = Head#head.keypos,
    case catch [element(KeyPos, Object) || Object <- Objects] of
        Keys when is_list(Keys) ->
            case catch update_cache(Head, Keys, {lookup, nopid}) of
                {Head1, Found} when is_list(Found) ->
                    IsEmpty = fun({_Pid, Objs}) -> Objs =:= [] end,
                    case lists:all(IsEmpty, Found) of
                        true ->
                            case catch update_cache(Head1, Objects, insert) of
                                {Inserted, []} ->
                                    {Inserted, true};
                                {ErrHead, Error} when is_record(ErrHead, head) ->
                                    {ErrHead, Error}
                            end;
                        false = Reply ->
                            {Head1, Reply}
                    end;
                {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
                    HeadError
            end;
        _ ->
            {Head, badarg}
    end.
%% -> NewHead
%% Adds (true) or removes (false) one fix of the table on behalf of
%% Pid. Head#head.fixed is 'false' or {TimeStamp, [{Pid, Counter}]}.
%% The first fix of the table records the time and the free lists; the
%% first fix of a process links the table process to it.
do_safe_fixtable(Head, Pid, true) ->
    case Head#head.fixed of
        false ->
            link(Pid),
            Fixed = {erlang:now(), [{Pid, 1}]},
            Ftab = dets_utils:get_freelists(Head),
            Head#head{fixed = Fixed, freelists = {Ftab, Ftab}};
        {TimeStamp, Counters} ->
            case lists:keyfind(Pid, 1, Counters) of
                {Pid, Counter} ->
                    %% Pid fixes the table already; count one more fix.
                    NewCounters = lists:keyreplace(Pid, 1, Counters,
                                                   {Pid, Counter+1}),
                    Head#head{fixed = {TimeStamp, NewCounters}};
                false ->
                    link(Pid),
                    Fixed = {TimeStamp, [{Pid, 1} | Counters]},
                    Head#head{fixed = Fixed}
            end
    end;
do_safe_fixtable(Head, Pid, false) ->
    remove_fix(Head, Pid, false).

%% -> NewHead
%% Removes one fix held by Pid, or all of them when How =:= close.
%% When the last fix of Pid goes away Pid is unlinked, and when no
%% process fixes the table any longer the table is marked as not fixed,
%% the free lists are fetched anew and check_growth/1 is called.
%% Nothing happens if the table is not fixed or not fixed by Pid.
remove_fix(Head, Pid, How) ->
    case Head#head.fixed of
        false ->
            Head;
        {TimeStamp, Counters} ->
            case lists:keyfind(Pid, 1, Counters) of
                %% How =:= close when Pid closes the table.
                {Pid, Counter} when Counter =:= 1; How =:= close ->
                    unlink(Pid),
                    case lists:keydelete(Pid, 1, Counters) of
                        [] ->
                            check_growth(Head),
                            erlang:garbage_collect(),
                            Head#head{fixed = false,
                                      freelists =
                                          dets_utils:get_freelists(Head)};
                        NewCounters ->
                            Head#head{fixed = {TimeStamp, NewCounters}}
                    end;
                {Pid, Counter} ->
                    NewCounters = lists:keyreplace(Pid, 1, Counters,
                                                   {Pid, Counter-1}),
                    Head#head{fixed = {TimeStamp, NewCounters}};
                false ->
                    Head
            end
    end.

%% -> the result of fclose/1
%% Unlinks the processes fixing the table and closes the table. Note
%% that fclose/1 is given the original head.
do_stop(Head) ->
    unlink_fixing_procs(Head),
    fclose(Head).

%% -> NewHead
%% Unlinks every process that fixes the table and marks the table as
%% not fixed.
unlink_fixing_procs(Head) ->
    case Head#head.fixed of
        false ->
            Head;
        {_, Counters} ->
            %% Only the side effect is wanted, so no result list is built.
            lists:foreach(fun({Pid, _Counter}) -> unlink(Pid) end, Counters),
            Head#head{fixed = false,
                      freelists = dets_utils:get_freelists(Head)}
    end.

%% Schedules a may_grow request to the table process itself, 200 ms
%% from now, when no_things/1 exceeds Head#head.next. Tables opened
%% with read access are never grown.
check_growth(#head{access = read}) ->
    ok;
check_growth(Head) ->
    NoThings = no_things(Head),
    if
        NoThings > Head#head.next ->
            erlang:send_after(200, self(),
                              ?DETS_CALL(self(), may_grow)); % Catch up.
        true ->
            ok
    end.
%% -> {NewHead, InfoList} | {NewHead, Error}
%% Collects the information returned by dets:info/1. The cache is
%% flushed first so that the number of objects and the file size are
%% up to date.
finfo(H) ->
    case catch write_cache(H) of
        {Flushed, []} ->
            Items = (catch [{type, Flushed#head.type},
                            {keypos, Flushed#head.keypos},
                            {size, Flushed#head.no_objects},
                            {file_size,
                             file_size(Flushed#head.fptr,
                                       Flushed#head.filename)},
                            {filename, Flushed#head.filename}]),
            {Flushed, Items};
        {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
            HeadError
    end.

%% -> {NewHead, Value} | {NewHead, Error}
%% Returns the information item Tag; unknown tags give 'undefined'.
%%
%% Items read directly from the head:
finfo(H, access) ->
    {H, H#head.access};
finfo(H, auto_save) ->
    {H, H#head.auto_save};
finfo(H, filename) ->
    {H, H#head.filename};
finfo(H, hash) ->
    {H, H#head.hash_bif};
finfo(H, keypos) ->
    {H, H#head.keypos};
finfo(H, ram_file) ->
    {H, H#head.ram_file};
finfo(H, safe_fixed) ->
    {H, H#head.fixed};
finfo(H, type) ->
    {H, H#head.type};
finfo(H, version) ->
    {H, H#head.version};
%% Items computed from the head or the process:
finfo(H, delayed_write) -> % undocumented
    {H, dets_utils:cache_size(H#head.cache)};
finfo(H, fixed) ->
    %% true if fixtable/2 has been called
    {H, not (H#head.fixed =:= false)};
finfo(H, no_slots) ->
    {H, (H#head.mod):no_slots(H)};
finfo(H, pid) ->
    {H, self()};
%% Items that require the cache to be flushed first:
finfo(H, bchunk_format) ->
    case catch write_cache(H) of
        {Flushed, []} ->
            case (Flushed#head.mod):table_parameters(Flushed) of
                undefined = Undef ->
                    {Flushed, Undef};
                Parameters ->
                    {Flushed, term_to_binary(Parameters)}
            end;
        {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
            HeadError
    end;
finfo(H, file_size) ->
    case catch write_cache(H) of
        {Flushed, []} ->
            {Flushed, catch file_size(H#head.fptr, H#head.filename)};
        {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
            HeadError
    end;
finfo(H, no_keys) ->
    case catch write_cache(H) of
        {Flushed, []} ->
            {Flushed, Flushed#head.no_keys};
        {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
            HeadError
    end;
finfo(H, size) ->
    case catch write_cache(H) of
        {Flushed, []} ->
            {Flushed, Flushed#head.no_objects};
        {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
            HeadError
    end;
%% Aliases:
finfo(H, memory) ->
    finfo(H, file_size);
finfo(H, no_objects) ->
    finfo(H, size);
finfo(H, _) ->
    {H, undefined}.

%% Returns the position of the end of the file, that is, its size.
file_size(Fd, FileName) ->
    {ok, Size} = dets_utils:position(Fd, FileName, eof),
    Size.
%% -> boolean()
%% Tells whether Term, a bchunk format as returned by
%% info(Tab, bchunk_format), is compatible with this table. Always
%% false for 'undefined' and for version 8 tables.
test_bchunk_format(_Head, undefined) ->
    false;
test_bchunk_format(Head, _Term) when Head#head.version =:= 8 ->
    false;
test_bchunk_format(Head, Term) ->
    dets_v9:try_bchunk_header(Term, Head) =/= not_ok.

%% -> {ok, Head} | {error, Reason} | {'EXIT', Reason}
%% Opens the table file. [Fname, Verbose] opens an existing file
%% (open_file/1), [Tab, OpenArgs, Verb] opens and possibly creates a
%% file (open_file/2). Anything but {ok, Head}, an error tuple or an
%% exit is reported as a bug.
do_open_file([Fname, Verbose], Parent, Server, Ref) ->
    case catch fopen2(Fname, Ref) of
        {error, _Reason} = Error ->
            err(Error);
        {ok, Head} ->
            maybe_put(verbose, Verbose),
            {ok, Head#head{parent = Parent, server = Server}};
        {'EXIT', _Reason} = Error ->
            Error;
        Bad ->
            error_logger:format
              ("** dets: Bug was found in open_file/1, reply was ~w.~n",
               [Bad]),
            {error, {dets_bug, Fname, Bad}}
    end;
do_open_file([Tab, OpenArgs, Verb], Parent, Server, Ref) ->
    case catch fopen3(Tab, OpenArgs) of
        {error, {tooshort, _}} = TooShort ->
            %% A file too short to hold a header is deleted and the
            %% open is tried again. Retry only if the file is gone
            %% afterwards: if it could not be deleted the next attempt
            %% would fail in exactly the same way, forever.
            case file:delete(OpenArgs#open_args.file) of
                {error, Reason} when Reason =/= enoent ->
                    err(TooShort);
                _ ->
                    do_open_file([Tab, OpenArgs, Verb], Parent, Server, Ref)
            end;
        {error, _Reason} = Error ->
            err(Error);
        {ok, Head} ->
            maybe_put(verbose, Verb),
            {ok, Head#head{parent = Parent, server = Server}};
        {'EXIT', _Reason} = Error ->
            Error;
        Bad ->
            error_logger:format
              ("** dets: Bug was found in open_file/2, arguments were~n"
               "** dets: ~w and reply was ~w.~n", [OpenArgs, Bad]),
            {error, {dets_bug, Tab, {open_file, OpenArgs}, Bad}}
    end.

%% Stores V under K in the process dictionary unless V is 'undefined'.
maybe_put(_, undefined) ->
    ignore;
maybe_put(K, V) ->
    put(K, V).

%% -> {Head, Result}, Result = ok | Error | {thrown, Error} | badarg
%% Initializes the table with the objects supplied by InitFun. When the
%% table is opened for reading only, or is fixed, InitFun is told to
%% close and an error is returned.
finit(Head, InitFun, _Format, _NoSlots) when Head#head.access =:= read ->
    _ = (catch InitFun(close)),
    {Head, {error, {access_mode, Head#head.filename}}};
finit(Head, InitFun, _Format, _NoSlots) when Head#head.fixed =/= false ->
    _ = (catch InitFun(close)),
    {Head, {error, {fixed_table, Head#head.name}}};
finit(Head, InitFun, Format, NoSlots) ->
    case catch do_finit(Head, InitFun, Format, NoSlots) of
        {ok, NewHead} ->
            check_growth(NewHead),
            start_auto_save_timer(NewHead),
            {NewHead, ok};
        badarg ->
            {Head, badarg};
        Error ->
            dets_utils:corrupt(Head, Error)
    end.
%% -> {ok, NewHead} | throw(badarg) | throw(Error)
%% Initializes the table from the input function Init.
%% Format =:= term: the objects are sorted and written the same way as
%% when a file is repaired. Asking for more slots than the maximum is a
%% badarg. The file is truncated and initiated anew, except when the
%% table has just been created (update mode new_dirty) and the number
%% of slots is not changed.
%% Format =:= bchunk: the file is truncated and the format module reads
%% the chunks; the update mode of the resulting head is set to dirty.
do_finit(Head, Init, Format, NoSlots) ->
    #head{fptr = Fd, type = Type, keypos = Kp, auto_save = Auto,
          cache = Cache, filename = Fname, ram_file = Ram,
          min_no_slots = MinSlots0, max_no_slots = MaxSlots,
          name = Tab, update_mode = UpdateMode, mod = HMod} = Head,
    CacheSz = dets_utils:cache_size(Cache),
    {How, Head1} =
        case Format of
            term when is_integer(NoSlots), NoSlots > MaxSlots ->
                throw(badarg);
            term ->
                MinSlots = choose_no_slots(NoSlots, MinSlots0),
                if
                    UpdateMode =:= new_dirty, MinSlots =:= MinSlots0 ->
                        %% Newly created file with the requested number
                        %% of slots: use it as it is.
                        {general_init, Head};
                    true ->
                        ok = dets_utils:truncate(Fd, Fname, bof),
                        {ok, H} = HMod:initiate_file(Fd, Tab, Fname, Type, Kp,
                                                     MinSlots, MaxSlots, Ram,
                                                     CacheSz, Auto, false),
                        {general_init, H}
                end;
            bchunk ->
                ok = dets_utils:truncate(Fd, Fname, bof),
                {bchunk_init, Head}
        end,
    case How of
        bchunk_init ->
            case HMod:bchunk_init(Head1, Init) of
                {ok, NewHead} ->
                    {ok, NewHead#head{update_mode = dirty}};
                Error ->
                    Error
            end;
        general_init ->
            %% Cntrs is deleted by do_sort/6.
            Cntrs = ets:new(dets_init, []),
            Input = HMod:bulk_input(Head1, Init, Cntrs),
            SlotNumbers = {Head1#head.min_no_slots, bulk_init, MaxSlots},
            {Reply, SizeData} =
                do_sort(Head1, SlotNumbers, Input, Cntrs, Fname, not_used),
            Bulk = true,
            case Reply of
                {ok, NoDups, H1} ->
                    fsck_copy(SizeData, H1, Bulk, NoDups);
                Else ->
                    close_files(Bulk, SizeData, Head1),
                    Else
            end
    end.

%% -> {NewHead, [LookedUpObject]} | {NewHead, Error}
%% Looks up the objects of all Keys by way of the cache. The lookup
%% results are {Pid, Objects} pairs; the object lists are concatenated.
flookup_keys(Head, Keys) ->
    case catch update_cache(Head, Keys, {lookup, nopid}) of
        {NewHead, [{_NoPid,Objs}]} ->
            {NewHead, Objs};
        {NewHead, L} when is_list(L) ->
            {NewHead, lists:flatmap(fun({_Pid,OL}) -> OL end, L)};
        {NewHead, _} = HeadError when is_record(NewHead, head) ->
            HeadError
    end.

%% -> {NewHead, Result}
%% Scans the next part of the table as described by the continuation C.
%% Result is '$end_of_table' when the continuation is exhausted,
%% otherwise {cont, {Terms, NewContinuation}}; a scan error marks the
%% table as corrupt.
fmatch_init(Head, #dets_cont{bin = eof}) ->
    {Head, '$end_of_table'};
fmatch_init(Head, C) ->
    case scan(Head, C) of
        {scan_error, Reason} ->
            dets_utils:corrupt_reason(Head, Reason);
        {Ts, NC} ->
            {Head, {cont, {Ts, NC}}}
    end.
%% -> {NewHead, Result}
%% Starts a match or select. If every pattern of the match specification
%% has a constant key the objects are looked up and matched at once
%% ({done, MatchingObjects}); otherwise the cache is flushed and a
%% continuation for scanning the whole table is returned ({cont, Cont}).
fmatch(Head, MP, Spec, N) ->
    case find_all_keys(Spec, Head#head.keypos, []) of
        [] ->
            %% Complete match
            case catch write_cache(Head) of
                {Flushed, []} ->
                    Cont = init_scan(Flushed, N),
                    {Flushed, {cont, Cont#dets_cont{match_program = MP}}};
                {ErrHead, _} = HeadError when is_record(ErrHead, head) ->
                    HeadError
            end;
        ConstantKeys ->
            case flookup_keys(Head, lists:usort(ConstantKeys)) of
                {NewHead, Objects} when is_list(Objects) ->
                    {NewHead, {done, ets:match_spec_run(Objects, MP)}};
                {NewHead, Error} ->
                    {NewHead, Error}
            end
    end.

%% Collects the keys of the patterns of a match specification. The
%% result is [] as soon as some key contains a variable, or some element
%% is not a {TuplePattern, Guards, Body} triple. Tuple patterns that are
%% too short to have a key are skipped.
find_all_keys([], _KeyPos, Keys) ->
    Keys;
find_all_keys([{Pattern, _, _} | Rest], KeyPos, Keys) when is_tuple(Pattern) ->
    if
        tuple_size(Pattern) >= KeyPos ->
            Key = element(KeyPos, Pattern),
            case contains_variable(Key) of
                true ->
                    [];
                false ->
                    find_all_keys(Rest, KeyPos, [Key | Keys])
            end;
        true ->
            find_all_keys(Rest, KeyPos, Keys)
    end;
find_all_keys(_, _, _) ->
    [].

%% Tells whether a term contains a match variable: '_' or an atom made
%% up of '$' followed by something list_to_integer/1 accepts. Tuples and
%% lists are searched recursively.
contains_variable('_') ->
    true;
contains_variable(Atom) when is_atom(Atom) ->
    case atom_to_list(Atom) of
        "$" ++ Rest ->
            is_integer((catch list_to_integer(Rest)));
        _ ->
            false
    end;
contains_variable(Tuple) when is_tuple(Tuple) ->
    contains_variable(tuple_to_list(Tuple));
contains_variable([]) ->
    false;
contains_variable([First | Rest]) ->
    contains_variable(First) orelse contains_variable(Rest);
contains_variable(_) ->
    false.

%% -> {NewHead, Res}
%% Starts a match_delete or select_delete. With constant keys only, the
%% objects are looked up and deleted at once; otherwise a scan of the
%% whole table is prepared.
fmatch_delete_init(Head, MP, Spec) ->
    KeyPos = Head#head.keypos,
    Result = (catch case find_all_keys(Spec, KeyPos, []) of
                        [] ->
                            do_fmatch_delete_var_keys(Head, MP, Spec);
                        ConstantKeys ->
                            Keys = lists:usort(ConstantKeys),
                            do_fmatch_constant_keys(Head, Keys, MP)
                    end),
    case Result of
        {NewHead, _} = Reply when is_record(NewHead, head) ->
            Reply
    end.

%% A note: If deleted objects reside in a bucket with other objects
%% that are not deleted, the bucket is moved. If the address of the
%% moved bucket is greater than original bucket address the kept
%% objects will be read once again later on.
%% -> {NewHead, Res}
%% Scans the next part of the table and deletes the scanned objects
%% that match the match program of the continuation. Res is {done, 0}
%% when the scan finds nothing, otherwise the result of
%% do_fmatch_delete/3. Objects that cannot be decoded mark the table as
%% corrupt.
fmatch_delete(Head, C) ->
    case scan(Head, C) of
        {scan_error, Reason} ->
            dets_utils:corrupt_reason(Head, Reason);
        {[], _} ->
            {Head, {done, 0}};
        {Bins, NextCont} ->
            case catch filter_binary_terms(Bins, C#dets_cont.match_program, []) of
                {'EXIT', _} ->
                    BadObject = dets_utils:bad_object(fmatch_delete, Bins),
                    dets_utils:corrupt_reason(Head, BadObject);
                Matching ->
                    do_fmatch_delete(Head, Matching, NextCont)
            end
    end.

%% Deletion where some key is not constant. Deleting everything from a
%% table that is not fixed is done by emptying the file; all other
%% cases are handled by scanning the table.
do_fmatch_delete_var_keys(#head{fixed = false} = Head, _MP,
                          ?PATTERN_TO_TRUE_MATCH_SPEC('_')) ->
    %% Handle the case where the file is emptied efficiently.
    %% Empty the cache just to get the number of objects right.
    {Flushed, []} = write_cache(Head),
    NoObjects = Flushed#head.no_objects,
    case fdelete_all_objects(Flushed) of
        {Emptied, ok} ->
            {Emptied, {done, NoObjects}};
        Other ->
            Other
    end;
do_fmatch_delete_var_keys(Head, MP, _Spec) ->
    {Flushed, []} = write_cache(Head),
    Cont = init_scan(Flushed, default),
    {Flushed, {cont, Cont#dets_cont{match_program = MP}, 0}}.

%% Deletion where all keys are constant: the objects with the keys are
%% looked up, and those matching the match program are deleted.
do_fmatch_constant_keys(Head, Keys, MP) ->
    case flookup_keys(Head, Keys) of
        {NewHead, Objects} when is_list(Objects) ->
            do_fmatch_delete(NewHead, filter_terms(Objects, MP, []), fixed);
        Other ->
            Other
    end.

%% Decodes the binaries one by one and puts the terms for which the
%% match program returns 'true' in front of L, last match first.
filter_binary_terms(Bins, MP, L) ->
    Keep = fun(Bin, Acc) ->
                   Term = binary_to_term(Bin),
                   case ets:match_spec_run([Term], MP) of
                       [true] -> [Term | Acc];
                       _ -> Acc
                   end
           end,
    lists:foldl(Keep, L, Bins).

%% Puts the terms for which the match program returns 'true' in front
%% of L, last match first.
filter_terms(Terms, MP, L) ->
    Keep = fun(Term, Acc) ->
                   case ets:match_spec_run([Term], MP) of
                       [true] -> [Term | Acc];
                       _ -> Acc
                   end
           end,
    lists:foldl(Keep, L, Terms).

%% Deletes Terms and reports how many they were. What is 'fixed' when
%% nothing more is to be done ({done, N}), otherwise the continuation
%% to go on with ({cont, What, N}).
do_fmatch_delete(Head, Terms, What) ->
    NoDeleted = length(Terms),
    case do_delete(Head, Terms, delete_object) of
        {NewHead, ok} when What =:= fixed ->
            {NewHead, {done, NoDeleted}};
        {NewHead, ok} ->
            {NewHead, {cont, What, NoDeleted}};
        Other ->
            Other
    end.
%% -> {NewHead, ok} | {NewHead, Error}
%% Deletes Things (keys or objects, as told by What) by way of the
%% cache.
do_delete(Head, Things, What) ->
    case catch update_cache(Head, Things, What) of
        {NewHead, []} ->
            {NewHead, ok};
        {NewHead, _} = HeadError when is_record(NewHead, head) ->
            HeadError
    end.

%% -> {NewHead, boolean()} | {NewHead, Error}
%% Tells whether some object with the key Key is stored in the table.
fmember(Head, Key) ->
    case catch begin
                   {Head2, [{_NoPid,Objs}]} =
                       update_cache(Head, [Key], {lookup, nopid}),
                   {Head2, Objs =/= []}
               end of
        {NewHead, _} = Reply when is_record(NewHead, head) ->
            Reply
    end.

%% -> {NewHead, {ok, NextKey | '$end_of_table'}} | {NewHead, Error}
%% Finds the key following Key. The search starts in the slot Key
%% hashes to. The unique reference tells a normal return value of
%% fnext/3 from a caught {Head, Error}.
fnext(Head, Key) ->
    Slot = (Head#head.mod):db_hash(Key, Head),
    Ref = make_ref(),
    case catch {Ref, fnext(Head, Key, Slot)} of
        {Ref, {H, R}} ->
            {H, {ok, R}};
        {NewHead, _} = HeadError when is_record(NewHead, head) ->
            HeadError
    end.

%% Flushes the cache and searches the objects of slot Slot.
fnext(H, Key, Slot) ->
    {NH, []} = write_cache(H),
    case (H#head.mod):slot_objs(NH, Slot) of
        '$end_of_table' ->
            {NH, '$end_of_table'};
        L ->
            fnext_search(NH, Key, Slot, L)
    end.

%% Returns the key of the first object after the objects with key K in
%% L, the objects of slot Slot. If there is no such object the search
%% goes on in the following slots.
fnext_search(H, K, Slot, L) ->
    Kp = H#head.keypos,
    case beyond_key(K, Kp, L) of
        [] ->
            fnext_slot(H, K, Slot+1);
        L2 ->
            {H, element(H#head.keypos, hd(L2))}
    end.

%% We've got to continue to search for the next key in the next slot
fnext_slot(H, K, Slot) ->
    case (H#head.mod):slot_objs(H, Slot) of
        '$end_of_table' ->
            {H, '$end_of_table'};
        [] ->
            fnext_slot(H, K, Slot+1);
        L ->
            {H, element(H#head.keypos, hd(L))}
    end.

%% Skips the objects up to and including the run of objects whose key
%% compares equal to K (dets_utils:cmp/2 returns 0) and returns the
%% rest; [] if K is not found or nothing follows.
beyond_key(_K, _Kp, []) ->
    [];
beyond_key(K, Kp, [H|T]) ->
    case dets_utils:cmp(element(Kp, H), K) of
        0 ->
            beyond_key2(K, Kp, T);
        _ ->
            beyond_key(K, Kp, T)
    end.

%% Skips the remaining objects with key K.
beyond_key2(_K, _Kp, []) ->
    [];
beyond_key2(K, Kp, [H|T]=L) ->
    case dets_utils:cmp(element(Kp, H), K) of
        0 ->
            beyond_key2(K, Kp, T);
        _ ->
            L
    end.

%% Open an already existing file, no arguments
%% -> {ok, head()} | throw(Error)
%% The file is opened for reading and writing, not as a RAM file and
%% with the default cache. A file that was not properly closed is
%% repaired with default settings and then opened again.
fopen2(Fname, Tab) ->
    case file:read_file_info(Fname) of
        {ok, _} ->
            Acc = read_write,
            Ram = false,
            %% Fd is not always closed upon error, but exit is soon called.
            {ok, Fd, FH} = read_file_header(Fname, Acc, Ram),
            Mod = FH#fileheader.mod,
            case Mod:check_file_header(FH, Fd) of
                {error, not_closed} ->
                    io:format(user,"dets: file ~p not properly closed, "
                              "repairing ...~n", [Fname]),
                    Version = default,
                    case fsck(Fd, Tab, Fname, FH, default, default, Version) of
                        ok ->
                            fopen2(Fname, Tab);
                        Error ->
                            throw(Error)
                    end;
                {ok, Head, ExtraInfo} ->
                    open_final(Head, Fname, Acc, Ram, ?DEFAULT_CACHE,
                               Tab, ExtraInfo, false);
                {error, Reason} ->
                    throw({error, {Reason, Fname}})
            end;
        Error ->
            dets_utils:file_error(Fname, Error)
    end.

%% Open and possibly create and initialize a file
%% -> {ok, head()} | throw(Error)
%% A file that cannot be examined is created, unless the table is
%% opened for reading only, in which case the file error is reported.
fopen3(Tab, OpenArgs) ->
    FileName = OpenArgs#open_args.file,
    case file:read_file_info(FileName) of
        {ok, _} ->
            fopen_existing_file(Tab, OpenArgs);
        Error when OpenArgs#open_args.access =:= read ->
            dets_utils:file_error(FileName, Error);
        _Error ->
            fopen_init_file(Tab, OpenArgs)
    end.

%% -> {ok, head()} | throw(Error)
%% Opens an existing file according to OpenArgs. The file header is
%% checked first, and the outcome together with the repair option and
%% the access mode decides what to do (Do): compact the file, repair
%% it, or open it as it is. The order of the clauses is significant.
%% Before Do is carried out the type and the key position of the file
%% are checked against the options, and before a file is opened as it
%% is the version is checked as well.
fopen_existing_file(Tab, OpenArgs) ->
    #open_args{file = Fname, type = Type, keypos = Kp, repair = Rep,
               min_no_slots = MinSlots, max_no_slots = MaxSlots,
               ram_file = Ram, delayed_write = CacheSz, auto_save =
               Auto, access = Acc, version = Version, debug = Debug} =
        OpenArgs,
    %% Fd is not always closed upon error, but exit is soon called.
    {ok, Fd, FH} = read_file_header(Fname, Acc, Ram),
    %% V9, MinF and MaxF are true when the options do not ask for
    %% anything else than what the file already has.
    V9 = (Version =:= 9) or (Version =:= default),
    MinF = (MinSlots =:= default) or (MinSlots =:= FH#fileheader.min_no_slots),
    MaxF = (MaxSlots =:= default) or (MaxSlots =:= FH#fileheader.max_no_slots),
    Do = case (FH#fileheader.mod):check_file_header(FH, Fd) of
             {ok, Head, true} when Rep =:= force, Acc =:= read_write,
                                   FH#fileheader.version =:= 9,
                                   FH#fileheader.no_colls =/= undefined,
                                   MinF, MaxF, V9 ->
                 {compact, Head};
             {ok, _Head, _Extra} when Rep =:= force, Acc =:= read ->
                 throw({error, {access_mode, Fname}});
             {ok, Head, need_compacting} when Acc =:= read ->
                 {final, Head, true}; % Version 8 only.
             {ok, _Head, need_compacting} when Rep =:= true ->
                 %% The file needs to be compacted due to a very big
                 %% and fragmented free_list. Version 8 only.
                 M = " is now compacted ...",
                 {repair, M};
             {ok, _Head, _Extra} when Rep =:= force ->
                 M = ", repair forced.",
                 {repair, M};
             {ok, Head, ExtraInfo} ->
                 {final, Head, ExtraInfo};
             {error, not_closed} when Rep =:= force, Acc =:= read_write ->
                 M = ", repair forced.",
                 {repair, M};
             {error, not_closed} when Rep =:= true, Acc =:= read_write ->
                 M = " not properly closed, repairing ...",
                 {repair, M};
             {error, not_closed} when Rep =:= false ->
                 throw({error, {needs_repair, Fname}});
             {error, version_bump} when Rep =:= true, Acc =:= read_write ->
                 %% Version 8 only
                 M = " old version, upgrading ...",
                 {repair, M};
             {error, Reason} ->
                 throw({error, {Reason, Fname}})
         end,
    case Do of
        _ when FH#fileheader.type =/= Type ->
            throw({error, {type_mismatch, Fname}});
        _ when FH#fileheader.keypos =/= Kp ->
            throw({error, {keypos_mismatch, Fname}});
        {compact, SourceHead} ->
            io:format(user, "dets: file ~p is now compacted ...~n", [Fname]),
            {ok, NewSourceHead} = open_final(SourceHead, Fname, read, false,
                                             ?DEFAULT_CACHE, Tab, true, Debug),
            case catch compact(NewSourceHead) of
                ok ->
                    erlang:garbage_collect(),
                    fopen3(Tab, OpenArgs#open_args{repair = false});
                _Err ->
                    %% Compaction failed; fall back on a repair, using a
                    %% fresh file descriptor.
                    _ = file:close(Fd),
                    dets_utils:stop_disk_map(),
                    io:format(user, "dets: compaction of file ~p failed, "
                              "now repairing ...~n", [Fname]),
                    {ok, Fd2, _FH} = read_file_header(Fname, Acc, Ram),
                    do_repair(Fd2, Tab, Fname, FH, MinSlots, MaxSlots,
                              Version, OpenArgs)
            end;
        {repair, Mess} ->
            io:format(user, "dets: file ~p~s~n", [Fname, Mess]),
            do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots,
                      Version, OpenArgs);
        _ when FH#fileheader.version =/= Version, Version =/= default ->
            throw({error, {version_mismatch, Fname}});
        {final, H, EI} ->
            H1 = H#head{auto_save = Auto},
            open_final(H1, Fname, Acc, Ram, CacheSz, Tab, EI, Debug)
    end.
%% -> {ok, head()} | throw(Error)
%% Repairs the file and, if that went well, opens it again with the
%% repair option turned off.
do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, Version, OpenArgs) ->
    case fsck(Fd, Tab, Fname, FH, MinSlots, MaxSlots, Version) of
        ok ->
            %% No need to update 'version'.
            erlang:garbage_collect(),
            NoRepairArgs = OpenArgs#open_args{repair = false},
            fopen3(Tab, NoRepairArgs);
        RepairError ->
            throw(RepairError)
    end.

%% -> {ok, head()} | throw(Error)
%% Completes the opening of an existing file: the head is filled in
%% with the access mode, the file name, the table name and a new cache;
%% the disk map, the segment pointers and the free lists are set up;
%% and the table is checked for growth.
open_final(Head, Fname, Acc, Ram, CacheSz, Tab, ExtraInfo, Debug) ->
    Cache = dets_utils:new_cache(CacheSz),
    Opened = Head#head{access = Acc, ram_file = Ram, filename = Fname,
                       name = Tab, cache = Cache},
    #head{version = Version, fptr = Fd, next = Next} = Opened,
    init_disk_map(Version, Tab, Debug),
    Mod = Head#head.mod,
    Mod:cache_segps(Fd, Fname, Next),
    FreeLists = Mod:init_freelist(Opened, ExtraInfo),
    check_growth(Opened),
    {ok, Opened#head{freelists = FreeLists}}.

%% -> {ok, head()} | throw(Error)
%% Creates and initiates a new file. The file format version is taken
%% from the options; the default is 9, or 8 if the environment variable
%% DETS_USE_FILE_FORMAT is set to "8". If the file cannot be initiated
%% it is closed and, unless it is a RAM file, deleted.
fopen_init_file(Tab, OpenArgs) ->
    #open_args{file = Fname, type = Type, keypos = Kp,
               min_no_slots = MinSlotsArg, max_no_slots = MaxSlotsArg,
               ram_file = Ram, delayed_write = CacheSz, auto_save = Auto,
               version = UseVersion, debug = Debug} = OpenArgs,
    MinSlots = choose_no_slots(MinSlotsArg, ?DEFAULT_MIN_NO_SLOTS),
    MaxSlots = choose_no_slots(MaxSlotsArg, ?DEFAULT_MAX_NO_SLOTS),
    FileSpec = case Ram of
                   true -> [];
                   _ -> Fname
               end,
    {ok, Fd} = dets_utils:open(FileSpec, open_args(read_write, Ram)),
    Version = case UseVersion of
                  default ->
                      case os:getenv("DETS_USE_FILE_FORMAT") of
                          "8" -> 8;
                          _ -> 9
                      end;
                  _ ->
                      UseVersion
              end,
    Mod = version2module(Version),
    %% No need to truncate an empty file.
    init_disk_map(Version, Tab, Debug),
    case catch Mod:initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots,
                                 Ram, CacheSz, Auto, true) of
        {error, Reason} when Ram ->
            file:close(Fd),
            throw({error, Reason});
        {error, Reason} ->
            file:close(Fd),
            file:delete(Fname),
            throw({error, Reason});
        {ok, Head} ->
            start_auto_save_timer(Head),
            %% init_table does not need to truncate and write header
            {ok, Head#head{update_mode = new_dirty}}
    end.

%% Debug.
%% Initializes the debug disk map for version 9 tables, but only when
%% Debug is true or dets_utils:debug_mode() returns true. For every
%% other version nothing is done.
%% -> ok | the value returned by dets_utils:init_disk_map/1
init_disk_map(9, Name, Debug) ->
    case Debug orelse dets_utils:debug_mode() of
        true -> dets_utils:init_disk_map(Name);
        false -> ok
    end;
init_disk_map(_Version, _Name, _Debug) ->
    ok.

%% Builds the list of modes used when opening the file: 'write' is
%% included for read_write access only, 'ram' or 'raw' is chosen by
%% RamFile, and 'binary' and 'read' are always included.
open_args(Access, RamFile) ->
    A1 = case Access of
             read -> [];
             read_write -> [write]
         end,
    A2 = case RamFile of
             true -> [ram];
             false -> [raw]
         end,
    A1 ++ A2 ++ [binary, read].

%% Maps a file format version to the module implementing it.
version2module(V) when V =< 8 -> dets_v8;
version2module(9) -> dets_v9.

%% Maps an implementation module to a file format version.
%% 'not_used' is mapped to version 9.
module2version(dets_v8) -> 8;
module2version(dets_v9) -> 9;
module2version(not_used) -> 9.

%% -> ok | throw(Error)
%% For version 9 tables only.
%% Copies the table into the temporary file returned by tempfile/1,
%% closes the source file and renames the temporary file to Fname.
%% The temporary file is deleted if any step fails.
compact(SourceHead) ->
    #head{name = Tab, filename = Fname, fptr = SFd, type = Type, keypos = Kp,
          ram_file = Ram, auto_save = Auto} = SourceHead,
    Tmp = tempfile(Fname),
    TblParms = dets_v9:table_parameters(SourceHead),
    {ok, Fd} = dets_utils:open(Tmp, open_args(read_write, false)),
    CacheSz = ?DEFAULT_CACHE,
    %% It is normally not possible to have two open tables in the same
    %% process since the process dictionary is used for caching
    %% segment pointers, but here it works anyway--when reading a file
    %% serially the pointers do not need to be used.
    Head = case catch dets_v9:prep_table_copy(Fd, Tab, Tmp, Type, Kp, Ram,
                                              CacheSz, Auto, TblParms) of
               {ok, H} ->
                   H;
               Error ->
                   file:close(Fd),
                   file:delete(Tmp),
                   throw(Error)
           end,
    case dets_v9:compact_init(SourceHead, Head, TblParms) of
        {ok, NewHead} ->
            %% R is 'ok' only if the new file could be closed and
            %% renamed to Fname.
            R = case fclose(NewHead) of
                    ok ->
                        ok = file:close(SFd),
                        %% Save (rename) Fname first?
                        dets_utils:rename(Tmp, Fname);
                    E ->
                        E
                end,
            if
                R =:= ok -> ok;
                true ->
                    file:delete(Tmp),
                    throw(R)
            end;
        Err ->
            file:close(Fd),
            file:delete(Tmp),
            throw(Err)
    end.

%% -> ok | Error
%% Closes Fd.
%% Repairs the file Fname, making at most two attempts with fsck_try/6.
fsck(Fd, Tab, Fname, FH, MinSlotsArg, MaxSlotsArg, Version) ->
    %% MinSlots and MaxSlots are the option values.
    #fileheader{min_no_slots = MinSlotsFile,
                max_no_slots = MaxSlotsFile} = FH,
    EstNoSlots0 = file_no_things(FH),
    %% An option value of 'default' means that the value stored in the
    %% file header is used.
    MinSlots = choose_no_slots(MinSlotsArg, MinSlotsFile),
    MaxSlots = choose_no_slots(MaxSlotsArg, MaxSlotsFile),
    %% The estimate is kept within the interval [MinSlots, MaxSlots].
    EstNoSlots = erlang:min(MaxSlots, erlang:max(MinSlots, EstNoSlots0)),
    SlotNumbers = {MinSlots, EstNoSlots, MaxSlots},
    %% When repairing: We first try and sort on slots using MinSlots.
    %% If the number of objects (keys) turns out to be significantly
    %% different from NoSlots, we try again with the correct number of
    %% objects (keys).
    case fsck_try(Fd, Tab, FH, Fname, SlotNumbers, Version) of
        {try_again, BetterNoSlots} ->
            BetterSlotNumbers = {MinSlots, BetterNoSlots, MaxSlots},
            case fsck_try(Fd, Tab, FH, Fname, BetterSlotNumbers, Version) of
                {try_again, _} ->
                    %% fsck_try/6 leaves Fd open when it returns
                    %% {try_again, _}, so it is closed here.
                    file:close(Fd),
                    {error, {cannot_repair, Fname}};
                Else ->
                    Else
            end;
        Else ->
            Else
    end.

%% Returns the second argument if the first one is 'default',
%% otherwise the first argument.
choose_no_slots(default, NoSlots) -> NoSlots;
choose_no_slots(NoSlots, _) -> NoSlots.

%% -> ok | {try_again, integer()} | Error
%% Closes Fd unless {try_again, _} is returned.

%% Initiating a table using a fun and repairing (or converting) a
%% file are completely different things, but nevertheless the same
%% method is used in both cases...

%% A new table is opened in a temporary file by fopen3/2 and filled
%% by fsck_try_est/5. On success the temporary file is renamed to Fname.
fsck_try(Fd, Tab, FH, Fname, SlotNumbers, Version) ->
    Tmp = tempfile(Fname),
    #fileheader{type = Type, keypos = KeyPos} = FH,
    {_MinSlots, EstNoSlots, MaxSlots} = SlotNumbers,
    %% The estimated number of slots is used as the minimum number of
    %% slots of the new table.
    OpenArgs = #open_args{file = Tmp, type = Type, keypos = KeyPos,
                          repair = false, min_no_slots = EstNoSlots,
                          max_no_slots = MaxSlots,
                          ram_file = false, delayed_write = ?DEFAULT_CACHE,
                          auto_save = infinity, access = read_write,
                          version = Version, debug = false},
    case catch fopen3(Tab, OpenArgs) of
        {ok, Head} ->
            case fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) of
                {ok, NewHead} ->
                    R = case fclose(NewHead) of
                            ok ->
                                %% Save (rename) Fname first?
                                dets_utils:rename(Tmp, Fname);
                            Error ->
                                Error
                        end,
                    if
                        R =:= ok -> ok;
                        true ->
                            file:delete(Tmp),
                            R
                    end;
                TryAgainOrError ->
                    file:delete(Tmp),
                    TryAgainOrError
            end;
        Error ->
            file:close(Fd),
            Error
    end.

%% Returns the name of the temporary file, Fname with ".TMP" appended,
%% after trying to delete any existing file with that name. If the
%% delete fails with 'eacces' a second attempt is made after five
%% seconds. The outcome of the delete is ignored.
tempfile(Fname) ->
    Tmp = lists:concat([Fname, ".TMP"]),
    case file:delete(Tmp) of
        {error, eacces} -> % 'dets_process_died' happened anyway... (W-nd-ws)
            timer:sleep(5000),
            file:delete(Tmp);
        _ ->
            ok
    end,
    Tmp.

%% -> {ok, NewHead} | {try_again, integer()} | Error
%% Fd is left open when {try_again, _} is returned, otherwise it is
%% closed.
fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) ->
    %% Mod is the module to use for reading input when repairing.
    Mod = FH#fileheader.mod,
    %% The ETS table created here is deleted by do_sort/6.
    Cntrs = ets:new(dets_repair, []),
    Input = Mod:fsck_input(Head, Fd, Cntrs, FH),
    {Reply, SizeData} = do_sort(Head, SlotNumbers, Input, Cntrs, Fname, Mod),
    Bulk = false,
    case Reply of
        {ok, NoDups, H1} ->
            file:close(Fd),
            fsck_copy(SizeData, H1, Bulk, NoDups);
        {try_again, _} = Return ->
            close_files(Bulk, SizeData, Head),
            Return;
        Else ->
            file:close(Fd),
            close_files(Bulk, SizeData, Head),
            Else
    end.

%% Sorts Input with file_sorter:sort/3, using the directory of Fname
%% for temporary files.
%% -> {Reply, SizeData}
%% Reply is the value returned (or the exception caught) from
%% file_sorter:sort/3. SizeData is the contents of Cntrs sorted in
%% descending order on the first element of the tuples.
do_sort(Head, SlotNumbers, Input, Cntrs, Fname, Mod) ->
    OldV = module2version(Mod),
    %% output_objs/4 replaces {LogSize,NoObjects} in Cntrs by
    %% {LogSize,Position,Data,NoObjects | NoCollections}.
    %% Data = {FileName,FileDescriptor} | [object()]
    %% For small tables Data may be a list of objects which is more
    %% efficient since no temporary files are created.
    Output = (Head#head.mod):output_objs(OldV, Head, SlotNumbers, Cntrs),
    TmpDir = filename:dirname(Fname),
    Reply = (catch file_sorter:sort(Input, Output,
                                    [{format, binary},{tmpdir, TmpDir}])),
    L = ets:tab2list(Cntrs),
    ets:delete(Cntrs),
    {Reply, lists:reverse(lists:keysort(1, L))}.
%% Writes the data described by SizeData to the file of Head.
%% Each element of SizeData is {LogSize, Position, Data, NoObjects}.
%% When Data of the first element is a list, the data of all elements
%% is written with a single call to dets_utils:pwrite/3; in that case
%% NoDups must be 0. Otherwise Data is {FileName, FileDescriptor} and
%% the temporary files are copied one by one by fsck_copy1/4.
%% -> {ok, NewHead} | Error
fsck_copy([{_LogSz, Pos, Bins, _NoObjects} | SizeData], Head, _Bulk, NoDups)
  when is_list(Bins) ->
    true = NoDups =:= 0,
    PWs = [{Pos,Bins} | lists:map(fun({_, P, B, _}) -> {P, B} end, SizeData)],
    #head{fptr = Fd, filename = FileName} = Head,
    dets_utils:pwrite(Fd, FileName, PWs),
    {ok, Head#head{update_mode = dirty}};
fsck_copy(SizeData, Head, Bulk, NoDups) ->
    %% Exceptions raised while copying are returned as values.
    catch fsck_copy1(SizeData, Head, Bulk, NoDups).

%% Copies the temporary files, one per size, into the file of Head at
%% the positions given in the size data. Every temporary file is
%% closed before it is copied and deleted after the copy attempt.
%% On failure the remaining temporary files are closed and deleted by
%% close_files/3.
%% -> {ok, NewHead} | {error, Reason} | throw(Error)
fsck_copy1([SzData | L], Head, Bulk, NoDups) ->
    Out = Head#head.fptr,
    {LogSz, Pos, {FileName, Fd}, NoObjects} = SzData,
    Size = if NoObjects =:= 0 -> 0; true -> ?POW(LogSz-1) end,
    ExpectedSize = Size * NoObjects,
    close_tmp(Fd),
    case file:position(Out, Pos) of
        {ok, Pos} ->
            ok;
        PError ->
            %% Clean up in the same way as when the copy fails: remove
            %% this temporary file and close and delete the remaining
            %% ones before the error is thrown.
            file:delete(FileName),
            close_files(Bulk, L, Head),
            dets_utils:file_error(FileName, PError)
    end,
    %% Out is now positioned at Pos; no need to seek a second time.
    CR = file:copy({FileName, [raw,binary]}, Out),
    file:delete(FileName),
    case CR of
        {ok, Copied} when Copied =:= ExpectedSize;
                          NoObjects =:= 0 -> % the segments
            fsck_copy1(L, Head, Bulk, NoDups);
        {ok, Copied} when Bulk, Head#head.version =:= 8 ->
            %% Fewer bytes than expected were copied; the missing part
            %% is counted as duplicates and the space is freed.
            NoZeros = ExpectedSize - Copied,
            Dups = NoZeros div Size,
            Addr = Pos+Copied,
            %% NOTE(review): NoDups, not Dups, is passed as the number
            %% of objects to free -- confirm that this is intended
            %% when duplicates occur in more than one size.
            NewHead = free_n_objects(Head, Addr, Size-1, NoDups),
            NewNoDups = NoDups - Dups,
            fsck_copy1(L, NewHead, Bulk, NewNoDups);
        {ok, _Copied} -> % should never happen
            close_files(Bulk, L, Head),
            Reason = if Bulk -> initialization_failed;
                        true -> repair_failed end,
            {error, {Reason, Head#head.filename}};
        FError ->
            close_files(Bulk, L, Head),
            dets_utils:file_error(FileName, FError)
    end;
fsck_copy1([], Head, _Bulk, NoDups) when NoDups =/= 0 ->
    %% All sizes have been handled but some duplicates remain
    %% unaccounted for.
    {error, {initialization_failed, Head#head.filename}};
fsck_copy1([], Head, _Bulk, _NoDups) ->
    {ok, Head#head{update_mode = dirty}}.

%% Frees N consecutive areas starting at Addr by calling
%% dets_utils:free/3 with Size for each of them. The address is
%% advanced Size + 1 between the calls.
%% -> NewHead
free_n_objects(Head, _Addr, _Size, 0) ->
    Head;
free_n_objects(Head, Addr, Size, N) ->
    {NewHead, _} = dets_utils:free(Head, Addr, Size),
    NewAddr = Addr + Size + 1,
    free_n_objects(NewHead, NewAddr, Size, N-1).
%% Closes and deletes the temporary files mentioned in SizeData. When
%% the first argument is 'false' the file of Head is closed as well.
%% -> ok
close_files(false, SizeData, Head) ->
    file:close(Head#head.fptr),
    close_files(true, SizeData, Head);
close_files(true, SizeData, _Head) ->
    %% Elements that do not carry a {FileName, Fd} pair are skipped.
    _ = [begin
             close_tmp(TmpFd),
             file:delete(TmpName)
         end || {_LogSz, _Pos, {TmpName, TmpFd}, _NoObjs} <- SizeData],
    ok.

%% Closes the descriptor of a temporary file.
close_tmp(Fd) ->
    file:close(Fd).

%% Writes the cache to the file and fetches the objects of slot Slot
%% from the format module. A thrown {Head, Error} is returned as is.
%% -> {NewHead, Objects} | {NewHead, Error}
fslot(H, Slot) ->
    Attempt = (catch begin
                         {Flushed, []} = write_cache(H),
                         Mod = Flushed#head.mod,
                         {Flushed, Mod:slot_objs(Flushed, Slot)}
                     end),
    case Attempt of
        {NewHead, _Objects} when is_record(NewHead, head) ->
            Attempt
    end.

%% Updates the counter of the object with key Key. Only tables of
%% type 'set' are handled, and exactly one object has to be found;
%% otherwise 'badarg' is returned together with the head.
%% -> {NewHead, NewValue} | {NewHead, badarg} | {NewHead, Error}
do_update_counter(Head, _Key, _Incr) when Head#head.type =/= set ->
    {Head, badarg};
do_update_counter(Head, Key, Incr) ->
    case flookup_keys(Head, [Key]) of
        {H1, []} ->
            {H1, badarg};
        {H1, [Object]} ->
            Kp = H1#head.keypos,
            Updated = (catch try_update_tuple(Object, Kp, Incr)),
            case Updated of
                {'EXIT', _} ->
                    {H1, badarg};
                {NewValue, NewObject} ->
                    case finsert(H1, [NewObject]) of
                        {H2, ok} ->
                            {H2, NewValue};
                        Other ->
                            Other
                    end
            end;
        HeadError ->
            HeadError
    end.

%% The increment is either {Pos, Incr}, naming the position to update,
%% or some other term, in which case the position following the key
%% position is updated.
try_update_tuple(O, Kp, Incr) ->
    {Pos, Delta} = case Incr of
                       {P, D} -> {P, D};
                       _ -> {Kp + 1, Incr}
                   end,
    try_update_tuple2(O, Pos, Delta).

%% -> {NewValue, NewTuple}
try_update_tuple2(O, Pos, Incr) ->
    Old = element(Pos, O),
    Sum = Old + Incr,
    {Sum, setelement(Pos, O, Sum)}.

%% Stores 'yes' under the key 'verbose' in the process dictionary if
%% the argument is 'true', otherwise the key is erased.
set_verbose(Flag) ->
    case Flag of
        true -> put(verbose, yes);
        _ -> erase(verbose)
    end.

%% Writes the cache to the file and asks the format module where
%% Object is found. 'badarg' is returned if Object does not pass
%% check_objects/2.
where_is_object(Head, Object) ->
    case check_objects([Object], Head#head.keypos) of
        false ->
            {Head, badarg};
        true ->
            Flushed = (catch write_cache(Head)),
            case Flushed of
                {NewHead, []} ->
                    Mod = Head#head.mod,
                    {NewHead, Mod:find_object(NewHead, Object)};
                {NewHead, _} when is_record(NewHead, head) ->
                    Flushed
            end
    end.

%% Returns 'true' if the first argument is a proper list where every
%% element is a tuple with at least Kp elements, otherwise 'false'.
check_objects([], _Kp) ->
    true;
check_objects([T | Ts], Kp) when tuple_size(T) >= Kp ->
    check_objects(Ts, Kp);
check_objects(_NotOk, _Kp) ->
    false.

%% The number of keys of the head, or the number of objects if the
%% number of keys is 'undefined'.
no_things(Head) ->
    case Head#head.no_keys of
        undefined -> Head#head.no_objects;
        NoKeys -> NoKeys
    end.

%% As no_things/1, but for a file header.
file_no_things(FH) ->
    case FH#fileheader.no_keys of
        undefined -> FH#fileheader.no_objects;
        NoKeys -> NoKeys
    end.
%%% The write cache is a list of {Key, [Item]} where Item is one of
%%% {Seq, delete_key}, {Seq, {lookup,Pid}}, {Seq, {delete_object,object()}},
%%% or {Seq, {insert,object()}}. Seq is a number that increases
%%% monotonically for each item put in the cache. The purpose is to
%%% make sure that items are sorted correctly. Sequences of delete and
%%% insert operations are inserted in the cache without doing any file
%%% operations. When the cache is considered full, a lookup operation
%%% is requested, or after some delay, the contents of the cache are
%%% written to the file, and the cache emptied.
%%%
%%% Data is not allowed to linger more than 'delay' milliseconds in
%%% the write cache. A delayed_write message is received when some
%%% datum has become too old. If 'wrtime' is equal to 'undefined',
%%% then the cache is empty and no such delayed_write message has been
%%% scheduled. Otherwise there is a delayed_write message scheduled,
%%% and the value of 'wrtime' is the time when the cache was last
%%% written, or when it was first updated after the cache was last
%%% written.

%% Puts the operation What on KeysOrObjects in the cache and writes
%% to the file whatever update_cache/2 asks for.
update_cache(Head, KeysOrObjects, What) ->
    {CachedHead, Found, ToWrite} = update_cache(Head, [{What, KeysOrObjects}]),
    {WrittenHead, ok} = dets_utils:pwrite(CachedHead, ToWrite),
    {WrittenHead, Found}.

%% -> {NewHead, [object()], pwrite_list()} | throw({Head, Error})
update_cache(Head, ToAdd) ->
    Cache = Head#head.cache,
    #cache{cache = OldItems, csize = OldSize, inserts = OldIns,
           tsize = Threshold, wrtime = WrTime, delay = Delay} = Cache,
    NewSize = OldSize + erlang:external_size(ToAdd),
    %% The size is used as a sequence number here; it increases
    %% monotonically.
    {Items, Ins, Lookup, Found} =
        cache_binary(Head, ToAdd, OldItems, OldSize, OldIns, false, []),
    Updated = Cache#cache{cache = Items, csize = NewSize, inserts = Ins},
    Head1 = Head#head{cache = Updated},
    case Lookup orelse NewSize >= Threshold of
        true ->
            %% The cache is considered full, or some lookup.
            {NewHead, LU, PwriteList} = (Head#head.mod):write_cache(Head1),
            {NewHead, Found ++ LU, PwriteList};
        false when Items =:= [] ->
            {Head1, Found, []};
        false when WrTime =:= undefined ->
            %% Empty cache. Schedule a delayed write.
            Now = now(),
            Me = self(),
            erlang:send_after(Delay, Me, ?DETS_CALL(Me, {delayed_write, Now})),
            {Head1#head{cache = Updated#cache{wrtime = Now}}, Found, []};
        false when OldSize =:= 0 ->
            %% Empty cache that has been written after the
            %% currently scheduled delayed write.
            {Head1#head{cache = Updated#cache{wrtime = now()}}, Found, []};
        false ->
            %% Cache is not empty, delayed write has been scheduled.
            {Head1, Found, []}
    end.

%% Puts the operations of the list into the cache C. Lookups that can
%% be answered from the cache are collected in F; when some lookup
%% cannot be answered that way the lookup flag is set to 'true'.
%% -> {NewCache, NoInserts, LookupFlag, Found}
cache_binary(_Head, [], C, _Seq, Ins, Lu, F) ->
    {C, Ins, Lu, F};
cache_binary(Head, [{delete_object = Q, Os} | L], C, Seq, Ins, Lu, F) ->
    cache_obj_op(Head, L, C, Seq, Ins, Lu, F, Os, Head#head.keypos, Q);
cache_binary(Head, [{insert = Q, Os} | L], C, Seq, Ins, Lu, F) ->
    NewIns = Ins + length(Os),
    cache_obj_op(Head, L, C, Seq, NewIns, Lu, F, Os, Head#head.keypos, Q);
cache_binary(Head, [{delete_key = Q, Ks} | L], C, Seq, Ins, Lu, F) ->
    cache_key_op(Head, L, C, Seq, Ins, Lu, F, Ks, Q);
cache_binary(Head, [{Q, Ks} | L], [] = C, Seq, Ins, _Lu, F) -> % lookup
    %% Nothing is cached, so the lookup has to go to the file.
    cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q);
cache_binary(Head, [{Q, Ks} | L], C, Seq, Ins, Lu, F) -> % lookup
    case dets_utils:cache_lookup(Head#head.type, Ks, C, []) of
        false ->
            cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q);
        Found ->
            {lookup, Pid} = Q,
            cache_binary(Head, L, C, Seq, Ins, Lu, [{Pid, Found} | F])
    end.

%% Puts one item {K, {Seq, Q}} per key in front of the cache, giving
%% the items consecutive sequence numbers, and then continues with
%% the remaining operations.
cache_key_op(Head, L, C, Seq, Ins, Lu, F, Ks, Q) ->
    Push = fun(K, {Items, N}) -> {[{K, {N, Q}} | Items], N + 1} end,
    {NewC, NextSeq} = lists:foldl(Push, {C, Seq}, Ks),
    cache_binary(Head, L, NewC, NextSeq, Ins, Lu, F).
%% Puts one item {Key, {Seq, {Q, Object}}} per object in front of the
%% cache, where Key is the element at position Kp of the object. The
%% items get consecutive sequence numbers. When all objects have been
%% added cache_binary/7 takes care of the remaining operations.
cache_obj_op(Head, L, C, Seq, Ins, Lu, F, Os, Kp, Q) ->
    Push = fun(O, {Items, N}) ->
                   {[{element(Kp, O), {N, {Q, O}}} | Items], N + 1}
           end,
    {NewC, NextSeq} = lists:foldl(Push, {C, Seq}, Os),
    cache_binary(Head, L, NewC, NextSeq, Ins, Lu, F).

%% Called after some delay.
%% -> NewHead
delayed_write(Head, WrTime) ->
    Cache = Head#head.cache,
    case Cache#cache.wrtime of
        WrTime ->
            %% The cache was not emptied during the last delay.
            Flushed = (catch write_cache(Head)),
            case Flushed of
                {Head2, []} ->
                    Cache2 = Head2#head.cache,
                    Head2#head{cache = Cache2#cache{wrtime = undefined}};
                {NewHead, _Error} -> % Head.update_mode has been updated
                    NewHead
            end;
        _LastWrTime when Cache#cache.csize =:= 0 ->
            %% The cache was emptied during the delay and nothing has
            %% been written since then; no further delayed write is
            %% needed.
            Head#head{cache = Cache#cache{wrtime = undefined}};
        LastWrTime ->
            %% The cache was emptied during the delay, but something
            %% has been written since then. Schedule a new delayed
            %% write after the time, in milliseconds, between the two
            %% time stamps.
            {Mega1, Sec1, Micro1} = WrTime,
            {Mega2, Sec2, Micro2} = LastWrTime,
            Scheduled = (Mega1 * 1000000 + Sec1) * 1000000 + Micro1,
            Latest = (Mega2 * 1000000 + Sec2) * 1000000 + Micro2,
            When = round((Latest - Scheduled) / 1000),
            Me = self(),
            erlang:send_after(When, Me,
                              ?DETS_CALL(Me, {delayed_write, LastWrTime})),
            Head
    end.

%% Lets the format module empty the cache and writes the resulting
%% pwrite list to the file.
%% -> {NewHead, [LookedUpObject]} | throw({NewHead, Error})
write_cache(Head) ->
    Mod = Head#head.mod,
    {EmptiedHead, Found, ToWrite} = Mod:write_cache(Head),
    {WrittenHead, ok} = dets_utils:pwrite(EmptiedHead, ToWrite),
    {WrittenHead, Found}.

%% -> ok | Error
%% The update modes 'saved', 'dirty' and 'new_dirty' give 'ok'; any
%% other update mode is returned as is.
status(Head) ->
    Mode = Head#head.update_mode,
    case lists:member(Mode, [saved, dirty, new_dirty]) of
        true -> ok;
        false -> Mode
    end.

%%% Scan the file from start to end by reading chunks.

%% Creates the continuation used when scanning. The scan starts at
%% the first allocated area found from the base of the file.
%% -> dets_cont()
init_scan(Head, NoObjs) ->
    check_safe_fixtable(Head),
    Start = Head#head.base,
    Allocated = dets_utils:find_next_allocated(dets_utils:get_freelists(Head),
                                               Start, Start),
    {From, To} = Allocated,
    #dets_cont{no_objs = NoObjs, bin = <<>>, alloc = {From, To, <<>>}}.
%% Logs a message if the table is not fixed, provided that 'verbose'
%% is set to 'yes' in the process dictionary or that
%% dets_utils:debug_mode() returns 'true'.
check_safe_fixtable(Head) ->
    NotFixed = Head#head.fixed =:= false,
    Noisy = NotFixed andalso
        (get(verbose) =:= yes orelse dets_utils:debug_mode()),
    case Noisy of
        true ->
            error_logger:format
              ("** dets: traversal of ~p needs safe_fixtable~n",
               [Head#head.name]);
        false ->
            ok
    end.

%% -> {[RTerm], dets_cont()} | {scan_error, Reason}
%% RTerm = {Pos, Next, Size, Status, Term}
%% A continuation whose 'alloc' field is the empty binary gives no
%% more terms.
scan(_Head, #dets_cont{alloc = <<>>} = Cont) ->
    {[], Cont};
scan(Head, Cont) -> % when is_record(Cont, dets_cont)
    #dets_cont{no_objs = NoObjs, alloc = {From, To, Rest}, bin = Bin} = Cont,
    %% The counter starts at 0 for 'default' and at -NoObjs-1 when a
    %% number of objects has been requested.
    Count = case NoObjs of
                default ->
                    0;
                N when is_integer(N) ->
                    -N - 1
            end,
    scan(Bin, Head, From, To, Rest, [], Count, {Cont, Head#head.type}).

%% Lets the format module scan the binary Bin. Depending on the reply
%% more data is read, or the terms found are returned together with
%% an updated continuation.
scan(Bin, H, From, To, L, Ts, R, {Cont0, Type} = State) ->
    Mod = H#head.mod,
    case Mod:scan_objs(H, Bin, From, To, L, Ts, R, Type) of
        bad_object ->
            {scan_error, dets_utils:bad_object(scan, {From, To, Bin})};
        {more, From1, To1, L1, Ts1, R1, MinSize} ->
            scan_read(H, From1, To1, MinSize, L1, Ts1, R1, State);
        {stop, <<>>, From1, To1, <<>>, Ts1} ->
            %% Everything read has been consumed. If no allocated
            %% area follows, the end of the file has been reached.
            FreeLists = dets_utils:get_freelists(H),
            NextCont =
                case dets_utils:find_next_allocated(FreeLists, From1,
                                                    H#head.base) of
                    none ->
                        Cont0#dets_cont{bin = eof, alloc = <<>>};
                    _ ->
                        Cont0#dets_cont{bin = <<>>,
                                        alloc = {From1, To1, <<>>}}
                end,
            {Ts1, NextCont};
        {stop, Left, From1, To1, L1, Ts1} ->
            {Ts1, Cont0#dets_cont{bin = Left, alloc = {From1, To1, L1}}}
    end.

%% Reads the next allocated part of the file and goes on scanning,
%% unless the counter R has reached ?CHUNK_SIZE, in which case the
%% terms found so far are returned.
scan_read(_H, From, To, _Min, L0, Ts, R, {Cont, _Type})
  when R >= ?CHUNK_SIZE ->
    %% We may have read (much) more than CHUNK_SIZE, if there are holes.
    {Ts, Cont#dets_cont{bin = <<>>, alloc = {From, To, L0}}};
scan_read(H, From, _To, Min, _L, Ts, R, State) ->
    %% At least ?CHUNK_SIZE bytes are asked for.
    ReadSize = erlang:max(Min, ?CHUNK_SIZE),
    FreeLists = dets_utils:get_freelists(H),
    case dets_utils:find_allocated(FreeLists, From, ReadSize, H#head.base) of
        <<>> ->
            {Cont, _} = State,
            {Ts, Cont#dets_cont{bin = eof, alloc = <<>>}};
        <<From1:32, To1:32, Rest/binary>> ->
            case dets_utils:pread_n(H#head.fptr, From1, ReadSize) of
                eof ->
                    {scan_error, premature_eof};
                NewBin ->
                    scan(NewBin, H, From1, To1, Rest, Ts, R, State)
            end
    end.
%% Returns Error. If 'verbose' is set to 'yes' in the process
%% dictionary the error is logged first.
err(Error) ->
    Verbose = get(verbose),
    case Verbose of
        undefined ->
            Error;
        yes ->
            error_logger:format("** dets: failed with ~w~n", [Error]),
            Error
    end.

%%%%%%%%%%%%%%%%%  DEBUG functions %%%%%%%%%%%%%%%%

%% Reads the header of the file, closes the file and returns what the
%% format module's file_info/1 returns for the header. If the header
%% cannot be read, the result of the read attempt is returned.
file_info(FileName) ->
    Opened = (catch read_file_header(FileName, read, false)),
    case Opened of
        {ok, Fd, FH} ->
            file:close(Fd),
            Mod = FH#fileheader.mod,
            Mod:file_info(FH);
        _ ->
            Opened
    end.

%% Reads a field from the file using dets_utils:read_4/2.
get_head_field(Fd, Field) ->
    dets_utils:read_4(Fd, Field).

%% Dump the contents of a DAT file to the tty
%% internal debug function which ignores the closed properly thingie
%% and just tries anyway

view(FileName) ->
    case catch read_file_header(FileName, read, false) of
        {ok, Fd, FH} ->
            Mod = FH#fileheader.mod,
            case Mod:check_file_header(FH, Fd) of
                {ok, H0, ExtraInfo} ->
                    FreeLists = Mod:init_freelist(H0, ExtraInfo),
                    {_Bump, Base} = constants(FH, FileName),
                    Head = H0#head{freelists = FreeLists, base = Base},
                    v_free_list(Head),
                    Mod:v_segments(Head),
                    file:close(Fd);
                BadHeader ->
                    file:close(Fd),
                    BadHeader
            end;
        NotOpened ->
            NotOpened
    end.

%% Prints the free lists of Head, as returned by dets_utils:all_free/1.
v_free_list(Head) ->
    AllFree = dets_utils:all_free(Head),
    io:format("FREE LIST ...... \n", []),
    io:format("~p~n", [AllFree]),
    io:format("END OF FREE LIST \n", []).