%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2010. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(dets).

%% Disk based linear hashing lookup dictionary.

%% Public.
-export([all/0,
	 bchunk/2,
         close/1,
         delete/2,
         delete_all_objects/1,
         delete_object/2,
         first/1,
         foldl/3,
         foldr/3,
         from_ets/2,
         info/1,
         info/2,
         init_table/2,
	 init_table/3,
         insert/2,
         insert_new/2,
         is_compatible_bchunk_format/2,
	 is_dets_file/1,
         lookup/2,
         match/1,
         match/2,
         match/3,
         match_delete/2,
         match_object/1,
         match_object/2,
         match_object/3,
         member/2,
         next/2,
         open_file/1,
         open_file/2,
         pid2name/1,
         repair_continuation/2,
         safe_fixtable/2,
         select/1,
         select/2,
         select/3,
         select_delete/2,
         slot/2,
         sync/1,
         table/1,
         table/2,
         to_ets/2,
         traverse/2,
         update_counter/3]).

%% Server export.
-export([start/0, stop/0]).

%% Internal exports.
-export([istart_link/1, init/2, internal_open/3, add_user/3, 
         internal_close/1, remove_user/2,
	 system_continue/3, system_terminate/4, system_code_change/4]).

%% Debug.
-export([file_info/1,
	 fsck/1,
         fsck/2,
	 get_head_field/2,
	 view/1,
	 where/2,
	 verbose/0,
	 verbose/1
	]).

%% Not documented, or not ready for publication.
-export([lookup_keys/2]).

-export_type([tab_name/0]).

-compile({inline, [{einval,2},{badarg,2},{undefined,1},
                   {badarg_exit,2},{lookup_reply,2}]}).

-include_lib("kernel/include/file.hrl").

-include("dets.hrl").

-type object()   :: tuple().
-type pattern()  :: atom() | tuple().
-type tab_name() :: atom() | reference().

%%% This is the implementation of the mnesia file storage. Each (non
%%% ram-copy) table is maintained in a corresponding .DAT file. The
%%% dat file is organized as a segmented linear hashlist. The head of
%%% the file with the split indicator, size etc is held in ram by the
%%% server at all times.
%%%
%%% The parts specific for formats up to and including 8(c) are
%%% implemented in dets_v8.erl, parts specific for format 9 are
%%% implemented in dets_v9.erl.

%%  The method of hashing is the so called linear hashing algorithm
%%  with segments. 
%%
%%  Linear hashing:
%%
%%         - n indicates next bucket to split (initially zero); 
%%         - m is the size of the hash table 
%%         - initially next = m and n = 0
%%  
%%         - to insert: 
%%                - hash = key mod m 
%%                - if hash < n then hash = key mod 2m 
%%                - when the number of objects exceeds the initial size
%%                  of the hash table, each insertion of an object
%%                  causes bucket n to be split:
%%                      - add a new bucket to the end of the table 
%%                      - redistribute the contents of bucket n 
%%                        using hash = key mod 2m 
%%                      - increment n 
%%                      - if n = m then m = 2m, n = 0 
%%         - to search: 
%%                hash = key mod m 
%%                if hash < n then hash = key mod 2m 
%%                do linear scan of the bucket 
%%  

%%% If a file error occurs on a working dets file, update_mode is set
%%% to the error tuple. When in 'error' mode, the free lists are not
%%% written, and a repair is forced next time the file is opened.

-record(dets_cont, {
         what,        % object | bindings | select | bchunk
	 no_objs,     % requested number of objects: default | integer() > 0
	 bin,         % small chunk not consumed, or 'eof' at end-of-file
	 alloc,       % the part of the file not yet scanned, mostly a binary
	 tab,
         proc,        % the pid of the Dets process
         match_program % true | compiled_match_spec() | undefined
	 }).

-record(open_args, {
          file,
          type,
          keypos,
          repair,
          min_no_slots,
	  max_no_slots,
          ram_file,
          delayed_write,
          auto_save,
          access,
          version,
          debug
         }).

-define(PATTERN_TO_OBJECT_MATCH_SPEC(Pat), [{Pat,[],['$_']}]).
-define(PATTERN_TO_BINDINGS_MATCH_SPEC(Pat), [{Pat,[],['$$']}]).
-define(PATTERN_TO_TRUE_MATCH_SPEC(Pat), [{Pat,[],[true]}]).

%%-define(DEBUGM(X, Y), io:format(X, Y)).
-define(DEBUGM(X, Y), true).

%%-define(DEBUGF(X,Y), io:format(X, Y)).
-define(DEBUGF(X,Y), void).

%%-define(PROFILE(C), C).
-define(PROFILE(C), void).

%%% Some further debug code was added in R12B-1 (stdlib-1.15.1):
%%% - there is a new open_file() option 'debug';
%%% - there is a new OS environment variable 'DETS_DEBUG';
%%% - verbose(true) implies that info messages are written onto
%%%   the error log whenever an unsafe traversal is started.
%%% The 'debug' mode (set by the open_file() option 'debug' or
%%% by os:putenv("DETS_DEBUG", "true")) implies that the results of
%%% calling pwrite() and pread() are tested to some extent. It also
%%% means a considerable overhead when it comes to RAM usage. The
%%% operation of Dets is also slowed down a bit. Note that in debug
%%% mode terms will be output on the error logger.

%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------

add_user(Pid, Tab, Args) ->
    req(Pid, {add_user, Tab, Args}).

-spec all() -> [tab_name()].

all() ->
    dets_server:all().

-type cont() :: #dets_cont{}.
-spec bchunk(tab_name(), 'start' | cont()) ->
    {cont(), binary() | tuple()} | '$end_of_table' | {'error', term()}.

bchunk(Tab, start) ->
    badarg(treq(Tab, {bchunk_init, Tab}), [Tab, start]);
bchunk(Tab, #dets_cont{what = bchunk, tab = Tab} = State) ->
    badarg(treq(Tab, {bchunk, State}), [Tab, State]);
bchunk(Tab, Term) ->
    erlang:error(badarg, [Tab, Term]).

-spec close(tab_name()) -> 'ok' | {'error', term()}.

close(Tab) ->  
    case dets_server:close(Tab) of
        badarg -> % Should not happen.
	    {error, not_owner}; % Backwards compatibility...
        Reply ->
            Reply
    end.

-spec delete(tab_name(), term()) -> 'ok' | {'error', term()}.

delete(Tab, Key) ->
    badarg(treq(Tab, {delete_key, [Key]}), [Tab, Key]).

-spec delete_all_objects(tab_name()) -> 'ok' | {'error', term()}.

delete_all_objects(Tab) ->
    case treq(Tab, delete_all_objects) of
	badarg ->
	    erlang:error(badarg, [Tab]);
	fixed ->
	    match_delete(Tab, '_');
	Reply ->
	    Reply
    end.

-spec delete_object(tab_name(), object()) -> 'ok' | {'error', term()}.

delete_object(Tab, O) ->
    badarg(treq(Tab, {delete_object, [O]}), [Tab, O]).

%% Given a filename, fsck it. Debug.
fsck(Fname) ->
    fsck(Fname, default).

fsck(Fname, Version) ->
    catch begin
      {ok, Fd, FH} = read_file_header(Fname, read, false),
      ?DEBUGF("FileHeader: ~p~n", [FH]),	    
      case (FH#fileheader.mod):check_file_header(FH, Fd) of
          {error, not_closed} ->
              fsck(Fd, make_ref(), Fname, FH, default, default, Version);
          {ok, _Head, _Extra} ->
              fsck(Fd, make_ref(), Fname, FH, default, default, Version);
          Error ->
              Error
      end
    end.

-spec first(tab_name()) -> term() | '$end_of_table'.

first(Tab) ->
    badarg_exit(treq(Tab, first), [Tab]).

-spec foldr(fun((object(), Acc) -> Acc), Acc, tab_name()) -> Acc | {'error', term()}.

foldr(Fun, Acc, Tab) ->
    foldl(Fun, Acc, Tab).

-spec foldl(fun((object(), Acc) -> Acc), Acc, tab_name()) -> Acc | {'error', term()}.

foldl(Fun, Acc, Tab) ->
    Ref = make_ref(),
    do_traverse(Fun, Acc, Tab, Ref).

-spec from_ets(tab_name(), ets:tab()) -> 'ok' | {'error', term()}.

from_ets(DTab, ETab) ->
    ets:safe_fixtable(ETab, true),
    Spec = ?PATTERN_TO_OBJECT_MATCH_SPEC('_'),
    LC = ets:select(ETab, Spec, 100),
    InitFun = from_ets_fun(LC, ETab),
    Reply = treq(DTab, {initialize, InitFun, term, default}),
    ets:safe_fixtable(ETab, false),
    case Reply of 
        {thrown, Thrown} -> throw(Thrown);
        Else -> badarg(Else, [DTab, ETab])
    end.

from_ets_fun(LC, ETab) ->
    fun(close) ->
            ok;
       (read) when LC =:= '$end_of_table' ->
            end_of_input;
       (read) ->
            {L, C} = LC,
            {L, from_ets_fun(ets:select(C), ETab)}
    end.

info(Tab) ->
    case catch dets_server:get_pid(Tab) of
	{'EXIT', _Reason} ->
	    undefined;
	Pid ->
	    undefined(req(Pid, info))
    end.

info(Tab, owner) ->
    case catch dets_server:get_pid(Tab) of
	Pid when is_pid(Pid) ->
	    Pid;
	_ ->
	    undefined
    end;
info(Tab, users) -> % undocumented
    case dets_server:users(Tab) of
	[] ->
	    undefined;
	Users ->
	    Users
    end;
info(Tab, Tag) ->
    case catch dets_server:get_pid(Tab) of
	{'EXIT', _Reason} ->
	    undefined;
	Pid ->
	    undefined(req(Pid, {info, Tag}))
    end.

init_table(Tab, InitFun) ->
    init_table(Tab, InitFun, []).

init_table(Tab, InitFun, Options) when is_function(InitFun) ->
    case options(Options, [format, min_no_slots]) of
	{badarg,_} -> 
	    erlang:error(badarg, [Tab, InitFun, Options]);
	[Format, MinNoSlots] ->
	    case treq(Tab, {initialize, InitFun, Format, MinNoSlots}) of
		{thrown, Thrown} -> throw(Thrown);
		Else -> badarg(Else, [Tab, InitFun, Options])
	    end
    end;
init_table(Tab, InitFun, Options) ->
    erlang:error(badarg, [Tab, InitFun, Options]).

insert(Tab, Objs) when is_list(Objs) ->
    badarg(treq(Tab, {insert, Objs}), [Tab, Objs]);
insert(Tab, Obj) ->
    badarg(treq(Tab, {insert, [Obj]}), [Tab, Obj]).

insert_new(Tab, Objs) when is_list(Objs) ->
    badarg(treq(Tab, {insert_new, Objs}), [Tab, Objs]);
insert_new(Tab, Obj) ->
    badarg(treq(Tab, {insert_new, [Obj]}), [Tab, Obj]).

internal_close(Pid) ->
    req(Pid, close).

internal_open(Pid, Ref, Args) ->
    req(Pid, {internal_open, Ref, Args}).

is_compatible_bchunk_format(Tab, Term) ->
    badarg(treq(Tab, {is_compatible_bchunk_format, Term}), [Tab, Term]).

is_dets_file(FileName) ->
    case catch read_file_header(FileName, read, false) of
	{ok, Fd, FH} ->
	    file:close(Fd),
	    FH#fileheader.cookie =:= ?MAGIC;
	{error, {tooshort, _}} ->
	    false;
	{error, {not_a_dets_file, _}} ->
	    false;
	Other ->
	    Other
    end.

lookup(Tab, Key) ->
    badarg(treq(Tab, {lookup_keys, [Key]}), [Tab, Key]).

%% Not public.
lookup_keys(Tab, Keys) ->
    case catch lists:usort(Keys) of
	UKeys when is_list(UKeys), UKeys =/= [] ->
	    badarg(treq(Tab, {lookup_keys, UKeys}), [Tab, Keys]);
	_Else ->
	    erlang:error(badarg, [Tab, Keys])
    end.

match(Tab, Pat) ->
    badarg(safe_match(Tab, Pat, bindings), [Tab, Pat]).

match(Tab, Pat, N) ->
    badarg(init_chunk_match(Tab, Pat, bindings, N), [Tab, Pat, N]).
    
match(State) when State#dets_cont.what =:= bindings ->
    badarg(chunk_match(State), [State]);
match(Term) ->
    erlang:error(badarg, [Term]).

-spec match_delete(tab_name(), pattern()) ->
	non_neg_integer() | 'ok' | {'error', term()}.

match_delete(Tab, Pat) ->
    badarg(match_delete(Tab, Pat, delete), [Tab, Pat]).

match_delete(Tab, Pat, What) ->
    safe_fixtable(Tab, true),    
    case compile_match_spec(What, Pat) of
	{Spec, MP} ->
	    Proc = dets_server:get_pid(Tab),
	    R = req(Proc, {match_delete_init, MP, Spec}),
	    do_match_delete(Tab, Proc, R, What, 0);
	badarg ->
	    badarg
    end.

do_match_delete(Tab, _Proc, {done, N1}, select, N) ->
    safe_fixtable(Tab, false),
    N + N1;
do_match_delete(Tab, _Proc, {done, _N1}, _What, _N) ->
    safe_fixtable(Tab, false),
    ok;
do_match_delete(Tab, Proc, {cont, State, N1}, What, N) ->
    do_match_delete(Tab, Proc, req(Proc, {match_delete, State}), What, N+N1);
do_match_delete(Tab, _Proc, Error, _What, _N) ->
    safe_fixtable(Tab, false),
    Error.

match_object(Tab, Pat) ->
    badarg(safe_match(Tab, Pat, object), [Tab, Pat]).

match_object(Tab, Pat, N) ->
    badarg(init_chunk_match(Tab, Pat, object, N), [Tab, Pat, N]).
    
match_object(State) when State#dets_cont.what =:= object ->
    badarg(chunk_match(State), [State]);
match_object(Term) ->
    erlang:error(badarg, [Term]).

member(Tab, Key) ->
    badarg(treq(Tab, {member, Key}), [Tab, Key]).    

next(Tab, Key) ->
    badarg_exit(treq(Tab, {next, Key}), [Tab, Key]).

%% Assuming that a file already exists, open it with the
%% parameters as already specified in the file itself.
%% Return a ref leading to the file.
open_file(File) ->
    case dets_server:open_file(to_list(File)) of
        badarg -> % Should not happen.
            erlang:error(dets_process_died, [File]);
        Reply -> 
            einval(Reply, [File])
    end.

open_file(Tab, Args) when is_list(Args) ->
    case catch defaults(Tab, Args) of
        OpenArgs when is_record(OpenArgs, open_args) ->
            case dets_server:open_file(Tab, OpenArgs) of
                badarg -> % Should not happen.
                    erlang:error(dets_process_died, [Tab, Args]);
                Reply -> 
                    einval(Reply, [Tab, Args])
            end;
	_ ->
	    erlang:error(badarg, [Tab, Args])
    end;
open_file(Tab, Arg) ->
    open_file(Tab, [Arg]).

pid2name(Pid) ->
    dets_server:pid2name(Pid).

remove_user(Pid, From) ->
    req(Pid, {close, From}).

repair_continuation(#dets_cont{match_program = B}=Cont, MS) 
    when is_binary(B) ->
    case ets:is_compiled_ms(B) of
	true ->
	    Cont;
	false ->
            Cont#dets_cont{match_program = ets:match_spec_compile(MS)}
    end;
repair_continuation(#dets_cont{}=Cont, _MS) ->
    Cont;
repair_continuation(T, MS) ->
    erlang:error(badarg, [T, MS]).

safe_fixtable(Tab, Bool) when Bool; not Bool ->
    badarg(treq(Tab, {safe_fixtable, Bool}), [Tab, Bool]);
safe_fixtable(Tab, Term) ->
    erlang:error(badarg, [Tab, Term]).

select(Tab, Pat) ->
    badarg(safe_match(Tab, Pat, select), [Tab, Pat]).

select(Tab, Pat, N) ->
    badarg(init_chunk_match(Tab, Pat, select, N), [Tab, Pat, N]).
    
select(State) when State#dets_cont.what =:= select ->
    badarg(chunk_match(State), [State]);
select(Term) ->
    erlang:error(badarg, [Term]).

select_delete(Tab, Pat) ->
    badarg(match_delete(Tab, Pat, select), [Tab, Pat]).

slot(Tab, Slot) when is_integer(Slot), Slot >= 0 ->
    badarg(treq(Tab, {slot, Slot}), [Tab, Slot]);
slot(Tab, Term) ->
    erlang:error(badarg, [Tab, Term]).

start() ->
    dets_server:start().

stop() ->
    dets_server:stop().

istart_link(Server) ->
    {ok, proc_lib:spawn_link(dets, init, [self(), Server])}.

sync(Tab) ->
    badarg(treq(Tab, sync), [Tab]).

table(Tab) ->
    table(Tab, []).

table(Tab, Opts) ->
    case options(Opts, [traverse, n_objects]) of
        {badarg,_} ->
            erlang:error(badarg, [Tab, Opts]);
        [Traverse, NObjs] ->
            TF = case Traverse of
                     first_next -> 
                         fun() -> qlc_next(Tab, first(Tab)) end;
                     select -> 
                         fun(MS) -> qlc_select(select(Tab, MS, NObjs)) end;
                     {select, MS} ->
                         fun() -> qlc_select(select(Tab, MS, NObjs)) end
                 end,
            PreFun = fun(_) -> safe_fixtable(Tab, true) end,
            PostFun = fun() -> safe_fixtable(Tab, false) end,
            InfoFun = fun(Tag) -> table_info(Tab, Tag) end,
            %% lookup_keys is not public, but convenient
            LookupFun = 
                case Traverse of
                    {select, _MS} -> 
                        undefined;
                    _ -> 
                        fun(_KeyPos, [K]) -> lookup(Tab, K);
                           (_KeyPos, Ks) -> lookup_keys(Tab, Ks) 
                        end
                end,
            FormatFun = 
                fun({all, _NElements, _ElementFun}) ->
                        As = [Tab | [Opts || _ <- [[]], Opts =/= []]],
                        {?MODULE, table, As};
                   ({match_spec, MS}) ->
                        {?MODULE, table, [Tab, [{traverse, {select, MS}} | 
                                                listify(Opts)]]};
                   ({lookup, _KeyPos, [Value], _NElements, ElementFun}) ->
                        io_lib:format("~w:lookup(~w, ~w)", 
                                      [?MODULE, Tab, ElementFun(Value)]);
                   ({lookup, _KeyPos, Values, _NElements, ElementFun}) ->
                        Vals = [ElementFun(V) || V <- Values],
                        io_lib:format("lists:flatmap(fun(V) -> "
                                      "~w:lookup(~w, V) end, ~w)", 
                                      [?MODULE, Tab, Vals])
                end,
            qlc:table(TF, [{pre_fun, PreFun}, {post_fun, PostFun}, 
                           {info_fun, InfoFun}, {format_fun, FormatFun},
                           {key_equality, '=:='},
                           {lookup_fun, LookupFun}])
    end.
         
qlc_next(_Tab, '$end_of_table') ->
    [];
qlc_next(Tab, Key) ->
    case lookup(Tab, Key) of
        Objects when is_list(Objects) ->
            Objects ++ fun() -> qlc_next(Tab, next(Tab, Key)) end;
        Error ->
            %% Do what first and next do.
            exit(Error)
    end.

qlc_select('$end_of_table') -> 
    [];
qlc_select({Objects, Cont}) when is_list(Objects) -> 
    Objects ++ fun() -> qlc_select(select(Cont)) end;
qlc_select(Error) ->
    Error.

table_info(Tab, num_of_objects) ->
    info(Tab, size);
table_info(Tab, keypos) ->
    info(Tab, keypos);
table_info(Tab, is_unique_objects) ->
    info(Tab, type) =/= duplicate_bag;
table_info(_Tab, _) ->
    undefined.

%% End of table/2.

to_ets(DTab, ETab) ->
    case ets:info(ETab, protection) of
	undefined ->
	    erlang:error(badarg, [DTab, ETab]);
        _ ->
	    Fun = fun(X, T) -> true = ets:insert(T, X), T end,
	    foldl(Fun, ETab, DTab)
    end.

traverse(Tab, Fun) ->
    Ref = make_ref(),
    TFun = 
	fun(O, Acc) ->
		case Fun(O) of
		    continue  ->
			Acc;
		    {continue, Val} ->
			[Val | Acc];
		    {done, Value} ->
			throw({Ref, [Value | Acc]});
		    Other ->
			throw({Ref, Other})
		end
	end,
    do_traverse(TFun, [], Tab, Ref).

update_counter(Tab, Key, C) ->
    badarg(treq(Tab, {update_counter, Key, C}), [Tab, Key, C]).

verbose() ->           
    verbose(true).

verbose(What) ->
    ok = dets_server:verbose(What),
    All = dets_server:all(),
    Fun = fun(Tab) -> treq(Tab, {set_verbose, What}) end,
    lists:foreach(Fun, All),
    All.

%% Where in the (open) table is Object located? 
%% The address of the first matching object is returned.
%% Format 9 returns the address of the object collection.
%% -> {ok, Address} | false
where(Tab, Object) ->
    badarg(treq(Tab, {where, Object}), [Tab, Object]).

do_traverse(Fun, Acc, Tab, Ref) ->
    safe_fixtable(Tab, true),    
    Proc = dets_server:get_pid(Tab),
    try
        do_trav(Proc, Acc, Fun)
    catch {Ref, Result} ->
        Result
    after 
        safe_fixtable(Tab, false)
    end.

do_trav(Proc, Acc, Fun) ->
    {Spec, MP} = compile_match_spec(object, '_'),
    %% MP not used
    case req(Proc, {match, MP, Spec, default}) of
	{cont, State} ->
	    do_trav(State, Proc, Acc, Fun);
	Error ->
	    Error
    end.
    
do_trav(#dets_cont{bin = eof}, _Proc, Acc, _Fun) ->
    Acc;
do_trav(State, Proc, Acc, Fun) ->
    case req(Proc, {match_init, State}) of
	{cont, {Bins, NewState}} ->
	    do_trav_bins(NewState, Proc, Acc, Fun, lists:reverse(Bins));
	Error ->
	    Error
    end.

do_trav_bins(State, Proc, Acc, Fun, []) ->
    do_trav(State, Proc, Acc, Fun);
do_trav_bins(State, Proc, Acc, Fun, [Bin | Bins]) ->
    %% Unpack one binary at a time, using the client's heap.
    case catch binary_to_term(Bin) of 
	{'EXIT', _} ->
	    req(Proc, {corrupt, dets_utils:bad_object(do_trav_bins, Bin)});
	Term ->
	    NewAcc = Fun(Term, Acc),
	    do_trav_bins(State, Proc, NewAcc, Fun, Bins)
    end.

safe_match(Tab, Pat, What) ->
    safe_fixtable(Tab, true),
    R = do_safe_match(init_chunk_match(Tab, Pat, What, default), []),
    safe_fixtable(Tab, false),
    R.
    
do_safe_match({error, Error}, _L) ->
    {error, Error};
do_safe_match({L, C}, LL) ->
    do_safe_match(chunk_match(C), L++LL);
do_safe_match('$end_of_table', L) ->
    L;
do_safe_match(badarg, _L) ->
    badarg.

%% What = object | bindings | select
init_chunk_match(Tab, Pat, What, N) when is_integer(N), N >= 0; 
                                         N =:= default ->
    case compile_match_spec(What, Pat) of
	{Spec, MP} ->
            Proc = dets_server:get_pid(Tab),
	    case req(Proc, {match, MP, Spec, N}) of
		{done, L} ->
		    {L, #dets_cont{tab = Tab, proc = Proc, what = What,
                                   bin = eof}};
		{cont, State} ->
		    chunk_match(State#dets_cont{what = What, tab = Tab,
                                                proc = Proc});
		Error ->
		    Error
	    end;
	badarg ->
	    badarg
    end;
init_chunk_match(_Tab, _Pat, _What, _) ->
    badarg.

chunk_match(#dets_cont{proc = Proc}=State) ->
    case req(Proc, {match_init, State}) of
        '$end_of_table'=Reply ->
            Reply;
        {cont, {Bins, NewState}} ->
            MP = NewState#dets_cont.match_program,
            case catch do_foldl_bins(Bins, MP) of
                {'EXIT', _} ->
                    case ets:is_compiled_ms(MP) of
                        true ->
                            Bad = dets_utils:bad_object(chunk_match, Bins),
                            req(Proc, {corrupt, Bad});
                        false ->
                            badarg
                    end;
                [] ->
                    chunk_match(NewState);
                Terms ->
                    {Terms, NewState}
            end;
        Error ->
            Error
    end.

do_foldl_bins(Bins, true) ->
    foldl_bins(Bins, []);
do_foldl_bins(Bins, MP) ->
    foldl_bins(Bins, MP, []).

foldl_bins([], Terms) ->
    %% Preserve time order (version 9). 
    Terms;
foldl_bins([Bin | Bins], Terms) ->    
    foldl_bins(Bins, [binary_to_term(Bin) | Terms]).

foldl_bins([], _MP, Terms) ->
    %% Preserve time order (version 9).
    Terms;
foldl_bins([Bin | Bins], MP, Terms) ->
    Term = binary_to_term(Bin),
    case ets:match_spec_run([Term], MP) of
	[] ->
	    foldl_bins(Bins, MP, Terms);
	[Result] ->
	    foldl_bins(Bins, MP, [Result | Terms])
    end.

%% -> {Spec, binary()} | badarg
compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC('_') = Spec) ->
    {Spec, true};
compile_match_spec(select, Spec) ->
    case catch ets:match_spec_compile(Spec) of
	X when is_binary(X) ->
	    {Spec, X};
	_ ->
	    badarg
    end;
compile_match_spec(object, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_OBJECT_MATCH_SPEC(Pat));
compile_match_spec(bindings, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_BINDINGS_MATCH_SPEC(Pat));
compile_match_spec(delete, Pat) ->
    compile_match_spec(select, ?PATTERN_TO_TRUE_MATCH_SPEC(Pat)).

%% Process the args list as provided to open_file/2.
defaults(Tab, Args) ->
    Defaults0 = #open_args{file = to_list(Tab),
                           type = set,
                           keypos = 1,
                           repair = true, 
                           min_no_slots = default,
                           max_no_slots = default,
                           ram_file = false,
                           delayed_write = ?DEFAULT_CACHE,
                           auto_save = timer:minutes(?DEFAULT_AUTOSAVE),
                           access = read_write,
                           version = default,
                           debug = false},
    Fun = fun repl/2,
    Defaults = lists:foldl(Fun, Defaults0, Args),
    case Defaults#open_args.version of
        8 ->
            Defaults#open_args{max_no_slots = default};
        _ ->
            is_comp_min_max(Defaults)
    end.

to_list(T) when is_atom(T) -> atom_to_list(T);
to_list(T) -> T.

repl({access, A}, Defs) ->
    mem(A, [read, read_write]),
    Defs#open_args{access = A};
repl({auto_save, Int}, Defs) when is_integer(Int), Int >= 0 ->
    Defs#open_args{auto_save = Int};
repl({auto_save, infinity}, Defs) ->
    Defs#open_args{auto_save =infinity};
repl({cache_size, Int}, Defs) when is_integer(Int), Int >= 0 ->
    %% Recognized, but ignored.
    Defs;
repl({cache_size, infinity}, Defs) ->
    Defs;
repl({delayed_write, default}, Defs) ->
    Defs#open_args{delayed_write = ?DEFAULT_CACHE};
repl({delayed_write, {Delay,Size} = C}, Defs) 
          when is_integer(Delay), Delay >= 0, is_integer(Size), Size >= 0 ->
    Defs#open_args{delayed_write = C};
repl({estimated_no_objects, I}, Defs)  ->
    repl({min_no_slots, I}, Defs);
repl({file, File}, Defs) ->
    Defs#open_args{file = to_list(File)};
repl({keypos, P}, Defs) when is_integer(P), P > 0 ->
    Defs#open_args{keypos =P};
repl({max_no_slots, I}, Defs)  ->
    %% Version 9 only.
    MaxSlots = is_max_no_slots(I),
    Defs#open_args{max_no_slots = MaxSlots};
repl({min_no_slots, I}, Defs)  ->
    MinSlots = is_min_no_slots(I),
    Defs#open_args{min_no_slots = MinSlots};
repl({ram_file, Bool}, Defs) ->
    mem(Bool, [true, false]),
    Defs#open_args{ram_file = Bool};
repl({repair, T}, Defs) ->
    mem(T, [true, false, force]),
    Defs#open_args{repair = T};
repl({type, T}, Defs) ->
    mem(T, [set, bag, duplicate_bag]),
    Defs#open_args{type =T};
repl({version, Version}, Defs) ->
    V = is_version(Version),
    Defs#open_args{version = V};
repl({debug, Bool}, Defs) ->
    %% Not documented.
    mem(Bool, [true, false]),
    Defs#open_args{debug = Bool};
repl({_, _}, _) ->
    exit(badarg).

is_min_no_slots(default) -> default;
is_min_no_slots(I) when is_integer(I), I >= ?DEFAULT_MIN_NO_SLOTS -> I;
is_min_no_slots(I) when is_integer(I), I >= 0 -> ?DEFAULT_MIN_NO_SLOTS.

is_max_no_slots(default) -> default;
is_max_no_slots(I) when is_integer(I), I > 0, I < 1 bsl 31 -> I.

is_comp_min_max(Defs) ->
    #open_args{max_no_slots = Max, min_no_slots = Min, version = V} = Defs,
    case V of
	_ when Min =:= default -> Defs;
	_ when Max =:= default -> Defs;
	_ -> true = Min =< Max, Defs
    end.

is_version(default) -> default;
is_version(8) -> 8;
is_version(9) -> 9.

mem(X, L) ->
    case lists:member(X, L) of
	true -> true;
	false -> exit(badarg)
    end.

options(Options, Keys) when is_list(Options) ->
    options(Options, Keys, []);
options(Option, Keys) ->
    options([Option], Keys, []).

options(Options, [Key | Keys], L) when is_list(Options) ->
    V = case lists:keysearch(Key, 1, Options) of
	    {value, {format, Format}} when Format =:= term; 
                                           Format =:= bchunk ->
		{ok, Format};
	    {value, {min_no_slots, I}} ->
		case catch is_min_no_slots(I) of
		    {'EXIT', _} -> badarg;
		    MinNoSlots -> {ok, MinNoSlots}
		end;
            {value, {n_objects, default}} ->
                {ok, default_option(Key)};
            {value, {n_objects, NObjs}} when is_integer(NObjs),
                                             NObjs >= 1 ->
                {ok, NObjs};
            {value, {traverse, select}} ->
                {ok, select};
            {value, {traverse, {select, MS}}} ->
                {ok, {select, MS}};
            {value, {traverse, first_next}} ->
                {ok, first_next};
	    {value, {Key, _}} ->
		badarg;
	    false ->
		Default = default_option(Key),
		{ok, Default}
	end,
    case V of
	badarg ->
	    {badarg, Key};
	{ok, Value} ->
	    NewOptions = lists:keydelete(Key, 1, Options),
	    options(NewOptions, Keys, [Value | L])
    end;
options([], [], L) ->
    lists:reverse(L);
options(Options, _, _L) ->
    {badarg,Options}.

default_option(format) -> term;
default_option(min_no_slots) -> default;
default_option(traverse) -> select;
default_option(n_objects) -> default.

listify(L) when is_list(L) ->
    L;
listify(T) ->
    [T].

treq(Tab, R) ->
    case catch dets_server:get_pid(Tab) of
	Pid when is_pid(Pid) ->
	    req(Pid, R);
	_ ->
	    badarg
    end.

req(Proc, R) ->
    Ref = erlang:monitor(process, Proc),
    Proc ! ?DETS_CALL(self(), R),
    receive 
	{'DOWN', Ref, process, Proc, _Info} ->
            badarg;
	{Proc, Reply} ->
	    erlang:demonitor(Ref),
	    receive 
		{'DOWN', Ref, process, Proc, _Reason} ->
                    Reply
	    after 0 ->
                    Reply
	    end
    end.

%% Inlined.
einval({error, {file_error, _, einval}}, A) ->
    erlang:error(badarg, A);
einval({error, {file_error, _, badarg}}, A) ->
    erlang:error(badarg, A);
einval(Reply, _A) ->
    Reply.

%% Inlined.
badarg(badarg, A) ->
    erlang:error(badarg, A);
badarg(Reply, _A) ->
    Reply.

%% Inlined.
undefined(badarg) ->
    undefined;
undefined(Reply) ->
    Reply.

%% Inlined.
badarg_exit(badarg, A) ->
    erlang:error(badarg, A);
badarg_exit({ok, Reply}, _A) ->
    Reply;
badarg_exit(Reply, _A) ->
    exit(Reply).

%%%-----------------------------------------------------------------
%%% Server functions
%%%-----------------------------------------------------------------

init(Parent, Server) ->
    process_flag(trap_exit, true),
    open_file_loop(#head{parent = Parent, server = Server}).

open_file_loop(Head) ->
    open_file_loop(Head, 0).

open_file_loop(Head, N) when element(1, Head#head.update_mode) =:= error ->
    open_file_loop2(Head, N);
open_file_loop(Head, N) ->
    receive 
        %% When the table is fixed it can be assumed that at least one
        %% traversal is in progress. To speed the traversal up three
        %% things have been done:
        %% - prioritize match_init, bchunk, next, and match_delete_init;
        %% - do not peek the message queue for updates;
        %% - wait 1 ms after each update.
        %% next is normally followed by lookup, but since lookup is also
        %% used when not traversing the table, it is not prioritized.
        ?DETS_CALL(From, {match_init, _State} = Op) ->
            do_apply_op(Op, From, Head, N);
        ?DETS_CALL(From, {bchunk, _State} = Op) ->
            do_apply_op(Op, From, Head, N);
        ?DETS_CALL(From, {next, _Key} = Op) ->
            do_apply_op(Op, From, Head, N);            
        ?DETS_CALL(From, {match_delete_init, _MP, _Spec} = Op) ->
            do_apply_op(Op, From, Head, N);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.parent ->
            %% Parent orders shutdown.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.server ->
            %% The server is gone.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, _Reason} ->
            %% A process fixing the table exits.
            H2 = remove_fix(Head, Pid, close),
            open_file_loop(H2, N);
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Head#head.parent, 
                                  ?MODULE, [], Head)
    after 0 ->
            open_file_loop2(Head, N)
    end.

open_file_loop2(Head, N) ->
    receive
        ?DETS_CALL(From, Op) ->
            do_apply_op(Op, From, Head, N);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.parent ->
            %% Parent orders shutdown.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, Reason} when Pid =:= Head#head.server ->
            %% The server is gone.
            _NewHead = do_stop(Head),
            exit(Reason);
        {'EXIT', Pid, _Reason} ->
            %% A process fixing the table exits.
            H2 = remove_fix(Head, Pid, close),
            open_file_loop(H2, N);
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Head#head.parent, 
                                  ?MODULE, [], Head);
        Message ->
            error_logger:format("** dets: unexpected message"
                                "(ignored): ~w~n", [Message]),
            open_file_loop(Head, N)
    end.

do_apply_op(Op, From, Head, N) ->
    try apply_op(Op, From, Head, N) of
        ok -> 
            open_file_loop(Head, N);
        {N2, H2} when is_record(H2, head), is_integer(N2) ->
            open_file_loop(H2, N2);
        H2 when is_record(H2, head) ->
            open_file_loop(H2, N);
        {{more,From1,Op1,N1}, NewHead} ->
            do_apply_op(Op1, From1, NewHead, N1)
    catch 
        exit:normal -> 
            exit(normal);
        _:Bad -> 
            Name = Head#head.name,
            case dets_utils:debug_mode() of
                true ->
                    %% If stream_op/5 found more requests, this is not
                    %% the last operation.
                    error_logger:format
                      ("** dets: Bug was found when accessing table ~w,~n"
                       "** dets: operation was ~p and reply was ~w.~n"
                       "** dets: Stacktrace: ~w~n",
                       [Name, Op, Bad, erlang:get_stacktrace()]);
                false ->
                    error_logger:format
                      ("** dets: Bug was found when accessing table ~w~n",
                       [Name])
            end,
            if
                From =/= self() ->
                    From ! {self(), {error, {dets_bug, Name, Op, Bad}}};
                true -> % auto_save | may_grow | {delayed_write, _}
                    ok
            end,
            open_file_loop(Head, N)
    end.

apply_op(Op, From, Head, N) ->
    case Op of
	{add_user, Tab, OpenArgs}->
            #open_args{file = Fname, type = Type, keypos = Keypos, 
                       ram_file = Ram, access = Access, 
		       version = Version} = OpenArgs,
            VersionOK = (Version =:= default) or 
                        (Head#head.version =:= Version),
	    %% min_no_slots and max_no_slots are not tested
	    Res = if
		      Tab =:= Head#head.name,
		      Head#head.keypos =:= Keypos,
		      Head#head.type =:= Type,
		      Head#head.ram_file =:= Ram,
		      Head#head.access =:= Access,
		      VersionOK,
		      Fname =:= Head#head.filename ->
			  ok;
		      true ->
			  err({error, incompatible_arguments})
		  end,
	    From ! {self(), Res},
	    ok;
	auto_save ->
	    case Head#head.update_mode of
		saved ->
		    Head;
		{error, _Reason} ->
		    Head;
		_Dirty when N =:= 0 -> % dirty or new_dirty
		    %% The updates seems to have declined
		    dets_utils:vformat("** dets: Auto save of ~p\n", 
                                       [Head#head.name]), 
		    {NewHead, _Res} = perform_save(Head, true),
		    erlang:garbage_collect(),
		    {0, NewHead};
		dirty -> 
		    %% Reset counter and try later
		    start_auto_save_timer(Head),
		    {0, Head}
	    end;
	close  ->
	    From ! {self(), fclose(Head)},
	    _NewHead = unlink_fixing_procs(Head),
	    ?PROFILE(ep:done()),
	    exit(normal);
	{close, Pid} -> 
	    %% Used from dets_server when Pid has closed the table,
	    %% but the table is still opened by some process.
	    NewHead = remove_fix(Head, Pid, close),
	    From ! {self(), status(NewHead)},
	    NewHead;
	{corrupt, Reason} ->
	    {H2, Error} = dets_utils:corrupt_reason(Head, Reason),
	    From ! {self(), Error},
	    H2;
	{delayed_write, WrTime} ->
	    delayed_write(Head, WrTime);
	info ->
	    {H2, Res} = finfo(Head),
	    From ! {self(), Res},
	    H2;
	{info, Tag} ->
	    {H2, Res} = finfo(Head, Tag),
	    From ! {self(), Res},
	    H2;
        {is_compatible_bchunk_format, Term} ->
            Res = test_bchunk_format(Head, Term),
            From ! {self(), Res},
            ok;
	{internal_open, Ref, Args} ->
	    ?PROFILE(ep:do()),
	    case do_open_file(Args, Head#head.parent, Head#head.server,Ref) of
		{ok, H2} -> 
		    From ! {self(), ok},
		    H2;
		Error -> 
		    From ! {self(), Error},
		    exit(normal)
	    end;
	may_grow when Head#head.update_mode =/= saved ->
	    if
		Head#head.update_mode =:= dirty ->
		    %% Won't grow more if the table is full.
		    {H2, _Res} = 
			(Head#head.mod):may_grow(Head, 0, many_times),
		    {N + 1, H2};
		true -> 
		    ok
	    end;
	{set_verbose, What} ->
	    set_verbose(What), 
	    From ! {self(), ok},
	    ok;
	{where, Object} ->
	    {H2, Res} = where_is_object(Head, Object),
	    From ! {self(), Res},
	    H2;
	_Message when element(1, Head#head.update_mode) =:= error ->
	    From ! {self(), status(Head)},
	    ok;
	%% The following messages assume that the status of the table is OK.
	{bchunk_init, Tab} ->
	    {H2, Res} = do_bchunk_init(Head, Tab),
	    From ! {self(), Res},
	    H2;
	{bchunk, State} ->
	    {H2, Res} = do_bchunk(Head, State),
	    From ! {self(), Res},
	    H2;
	delete_all_objects ->
	    {H2, Res} = fdelete_all_objects(Head),
	    From ! {self(), Res},
	    erlang:garbage_collect(),
	    {0, H2};
	{delete_key, Keys} when Head#head.update_mode =:= dirty ->
	    if
		Head#head.version =:= 8 ->
		    {H2, Res} = fdelete_key(Head, Keys),
		    From ! {self(), Res},
		    {N + 1, H2};
		true ->
		    stream_op(Op, From, [], Head, N)
	    end;
	{delete_object, Objs} when Head#head.update_mode =:= dirty ->
	    case check_objects(Objs, Head#head.keypos) of
		true when Head#head.version =:= 8 ->
		    {H2, Res} = fdelete_object(Head, Objs),
		    From ! {self(), Res},
		    {N + 1, H2};
		true ->
		    stream_op(Op, From, [], Head, N);
		false ->
		    From ! {self(), badarg},
		    ok
	    end;
	first ->
	    {H2, Res} = ffirst(Head),
	    From ! {self(), Res},
	    H2;
        {initialize, InitFun, Format, MinNoSlots} ->
            {H2, Res} = finit(Head, InitFun, Format, MinNoSlots),
            From ! {self(), Res},
	    erlang:garbage_collect(),
            H2;
	{insert, Objs} when Head#head.update_mode =:= dirty ->
	    case check_objects(Objs, Head#head.keypos) of
		true when Head#head.version =:= 8 ->
		    {H2, Res} = finsert(Head, Objs),
		    From ! {self(), Res},
		    {N + 1, H2};
		true ->
		    stream_op(Op, From, [], Head, N);
		false ->
		    From ! {self(), badarg},
		    ok
	    end;
	{insert_new, Objs} when Head#head.update_mode =:= dirty ->
            {H2, Res} = finsert_new(Head, Objs),
            From ! {self(), Res},
            {N + 1, H2};
	{lookup_keys, Keys}  when Head#head.version =:= 8 ->
	    {H2, Res} = flookup_keys(Head, Keys),
	    From ! {self(), Res},
	    H2;
	{lookup_keys, _Keys} ->
	    stream_op(Op, From, [], Head, N);
	{match_init, State} ->
	    {H2, Res} = fmatch_init(Head, State),
	    From ! {self(), Res},
	    H2;
	{match, MP, Spec, NObjs} ->
	    {H2, Res} = fmatch(Head, MP, Spec, NObjs),
	    From ! {self(), Res},
	    H2;
	{member, Key} when Head#head.version =:= 8 ->
	    {H2, Res} = fmember(Head, Key),
	    From ! {self(), Res},
	    H2;
	{member, _Key} = Op ->
	    stream_op(Op, From, [], Head, N);
	{next, Key} ->
	    {H2, Res} = fnext(Head, Key),
	    From ! {self(), Res},
	    H2;
	{match_delete, State} when Head#head.update_mode =:= dirty ->
	    {H2, Res} = fmatch_delete(Head, State),
	    From ! {self(), Res},
	    {N + 1, H2};
	{match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty ->
	    {H2, Res} = fmatch_delete_init(Head, MP, Spec),
	    From ! {self(), Res},
	    {N + 1, H2};
	{safe_fixtable, Bool} ->
	    NewHead = do_safe_fixtable(Head, From, Bool),
	    From ! {self(), ok},
	    NewHead;
	{slot, Slot} ->
	    {H2, Res} = fslot(Head, Slot),
	    From ! {self(), Res},
	    H2;
	sync ->
	    {NewHead, Res} = perform_save(Head, true),
	    From ! {self(), Res},
	    erlang:garbage_collect(),
	    {0, NewHead};
	{update_counter, Key, Incr} when Head#head.update_mode =:= dirty ->
	    {NewHead, Res} = do_update_counter(Head, Key, Incr),
	    From ! {self(), Res},
	    {N + 1, NewHead};
	WriteOp when Head#head.update_mode =:= new_dirty ->
	    H2 = Head#head{update_mode = dirty},
	    apply_op(WriteOp, From, H2, 0);
	WriteOp when Head#head.access =:= read_write,
		     Head#head.update_mode =:= saved ->
	    case catch (Head#head.mod):mark_dirty(Head) of
		ok ->
		    start_auto_save_timer(Head),
		    H2 = Head#head{update_mode = dirty},
		    apply_op(WriteOp, From, H2, 0);
		{NewHead, Error} when is_record(NewHead, head) ->
		    From ! {self(), Error},
		    NewHead
	    end;
	WriteOp when is_tuple(WriteOp), Head#head.access =:= read ->
	    Reason = {access_mode, Head#head.filename},
	    From ! {self(), err({error, Reason})},
	    ok
    end.

start_auto_save_timer(Head) when Head#head.auto_save =:= infinity ->
    ok;
start_auto_save_timer(Head) ->
    Millis = Head#head.auto_save,
    erlang:send_after(Millis, self(), ?DETS_CALL(self(), auto_save)).

%% Version 9: Peek the message queue and try to evaluate several
%% lookup requests in parallel. Evalute delete_object, delete and
%% insert as well.
stream_op(Op, Pid, Pids, Head, N) ->
    #head{fixed = Fxd, update_mode = M} = Head, 
    stream_op(Head, Pids, [], N, Pid, Op, Fxd, M).

stream_loop(Head, Pids, C, N, false = Fxd, M) ->
    receive
	?DETS_CALL(From, Message) ->
	    stream_op(Head, Pids, C, N, From, Message, Fxd, M)
    after 0 ->
	    stream_end(Head, Pids, C, N, no_more)
    end;
stream_loop(Head, Pids, C, N, _Fxd, _M) ->
    stream_end(Head, Pids, C, N, no_more).

stream_op(Head, Pids, C, N, Pid, {lookup_keys,Keys}, Fxd, M) ->
    NC = [{{lookup,Pid},Keys} | C],
    stream_loop(Head, Pids, NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {insert, _Objects} = Op, Fxd, dirty = M) ->
    NC = [Op | C],
    stream_loop(Head, [Pid | Pids], NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {delete_key, _Keys} = Op, Fxd, dirty = M) ->
    NC = [Op | C],
    stream_loop(Head, [Pid | Pids], NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {delete_object, _Os} = Op, Fxd, dirty = M) ->
    NC = [Op | C],
    stream_loop(Head, [Pid | Pids], NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, {member, Key}, Fxd, M) ->
    NC = [{{lookup,[Pid]},[Key]} | C],
    stream_loop(Head, Pids, NC, N, Fxd, M);
stream_op(Head, Pids, C, N, Pid, Op, _Fxd, _M) ->
    stream_end(Head, Pids, C, N, {Pid,Op}).

stream_end(Head, Pids0, C, N, Next) ->
    case catch update_cache(Head, lists:reverse(C)) of
	{Head1, [], PwriteList} ->
	    stream_end1(Pids0, Next, N, C, Head1, PwriteList);
	{Head1, Found, PwriteList} ->
	    %% Possibly an optimization: reply to lookup requests
	    %% first, then write stuff. This makes it possible for
	    %% clients to continue while the disk is accessed.
	    %% (Replies to lookup requests are sent earlier than
	    %%  replies to delete and insert requests even if the
	    %%  latter requests were made before the lookup requests,
	    %%  which can be confusing.)
	    lookup_replies(Found),
	    stream_end1(Pids0, Next, N, C, Head1, PwriteList);
	Head1 when is_record(Head1, head) ->
	    stream_end2(Pids0, Pids0, Next, N, C, Head1, ok);	    
	{Head1, Error} when is_record(Head1, head) ->
	    %% Dig out the processes that did lookup or member.
	    Fun = fun({{lookup,[Pid]},_Keys}, L) -> [Pid | L];
		     ({{lookup,Pid},_Keys}, L) -> [Pid | L];
		     (_, L) -> L
		  end,
	    LPs0 = lists:foldl(Fun, [], C),
	    LPs = lists:usort(lists:flatten(LPs0)),
	    stream_end2(Pids0 ++ LPs, Pids0, Next, N, C, Head1, Error);
        DetsError ->
            throw(DetsError)
    end.

stream_end1(Pids, Next, N, C, Head, []) ->
    stream_end2(Pids, Pids, Next, N, C, Head, ok);
stream_end1(Pids, Next, N, C, Head, PwriteList) ->
    {Head1, PR} = (catch dets_utils:pwrite(Head, PwriteList)),
    stream_end2(Pids, Pids, Next, N, C, Head1, PR).

stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) ->
    Pid ! {self(), Reply},
    stream_end2(Pids, Ps, Next, N+1, C, Head, Reply);
stream_end2([], Ps, no_more, N, C, Head, _Reply) ->
    penalty(Head, Ps, C),
    {N, Head};
stream_end2([], _Ps, {From, Op}, N, _C, Head, _Reply) ->
    {{more,From,Op,N},Head}.

penalty(H, _Ps, _C) when H#head.fixed =:= false ->
    ok;
penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) ->
    ok;
penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) ->
    ok;
penalty(_H, _Ps, _C) ->
    timer:sleep(1).

lookup_replies([{P,O}]) ->
    lookup_reply(P, O);
lookup_replies(Q) ->
    [{P,O} | L] = dets_utils:family(Q),
    lookup_replies(P, lists:append(O), L).

lookup_replies(P, O, []) ->
    lookup_reply(P, O);
lookup_replies(P, O, [{P2,O2} | L]) ->
    lookup_reply(P, O),
    lookup_replies(P2, lists:append(O2), L).

%% If a list of Pid then op was {member, Key}. Inlined.
lookup_reply([P], O) ->
    P ! {self(), O =/= []};
lookup_reply(P, O) ->
    P ! {self(), O}.

%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
system_continue(_Parent, _, Head) ->
    open_file_loop(Head).

system_terminate(Reason, _Parent, _, Head) ->
    _NewHead = do_stop(Head),
    exit(Reason).

%%-----------------------------------------------------------------
%% Code for upgrade.
%%-----------------------------------------------------------------
system_code_change(State, _Module, _OldVsn, _Extra) ->
    {ok, State}.


%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

constants(FH, FileName) ->
    Version = FH#fileheader.version,
    if 
        Version =< 8 ->
            dets_v8:constants();
        Version =:= 9 ->
            dets_v9:constants();
        true ->
            throw({error, {not_a_dets_file, FileName}})
    end.

%% -> {ok, Fd, fileheader()} | throw(Error)
read_file_header(FileName, Access, RamFile) ->
    BF = if
	     RamFile ->
		 case file:read_file(FileName) of
		     {ok, B} -> B;
		     Err -> dets_utils:file_error(FileName, Err)
		 end;
	     true ->
		 FileName
	 end,
    {ok, Fd} = dets_utils:open(BF, open_args(Access, RamFile)),
    {ok, <<Version:32>>} = 
        dets_utils:pread_close(Fd, FileName, ?FILE_FORMAT_VERSION_POS, 4),
    if 
        Version =< 8 ->
            dets_v8:read_file_header(Fd, FileName);
        Version =:= 9 ->
            dets_v9:read_file_header(Fd, FileName);
        true ->
            throw({error, {not_a_dets_file, FileName}})
    end.

fclose(Head) ->
    {Head1, Res} = perform_save(Head, false),
    case Head1#head.ram_file of
	true -> 
	    ignore;
	false -> 
            dets_utils:stop_disk_map(),
	    file:close(Head1#head.fptr)
    end,
    Res.

%% -> {NewHead, Res}
perform_save(Head, DoSync) when Head#head.update_mode =:= dirty;
				Head#head.update_mode =:= new_dirty ->
    case catch begin
                   {Head1, []} = write_cache(Head),
                   {Head2, ok} = (Head1#head.mod):do_perform_save(Head1),
                   ok = ensure_written(Head2, DoSync),
                   {Head2#head{update_mode = saved}, ok}
               end of
        {NewHead, _} = Reply when is_record(NewHead, head) ->
            Reply
    end;
perform_save(Head, _DoSync) ->
    {Head, status(Head)}.

ensure_written(Head, DoSync) when Head#head.ram_file ->
    {ok, EOF} = dets_utils:position(Head, eof),
    {ok, Bin} = dets_utils:pread(Head, 0, EOF, 0),
    if
	DoSync ->
	    dets_utils:write_file(Head, Bin);
	not DoSync ->
            case file:write_file(Head#head.filename, Bin) of
                ok -> 
                    ok;
                Error -> 
                    dets_utils:corrupt_file(Head, Error)
            end
    end;
ensure_written(Head, true) when not Head#head.ram_file ->
    dets_utils:sync(Head);
ensure_written(Head, false) when not Head#head.ram_file ->
    ok.

%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error}
do_bchunk_init(Head, Tab) ->
    case catch write_cache(Head) of
	{H2, []} ->
	    case (H2#head.mod):table_parameters(H2) of
		undefined ->
		    {H2, {error, old_version}};
		Parms ->
                    L = dets_utils:all_allocated(H2),
                    C0 = #dets_cont{no_objs = default, bin = <<>>, alloc = L},
		    BinParms = term_to_binary(Parms),
		    {H2, {C0#dets_cont{tab = Tab, proc = self(),what = bchunk},
                          [BinParms]}}
	    end;
	{NewHead, _} = HeadError when is_record(NewHead, head) ->
	    HeadError
    end.

%% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error}
do_bchunk(Head, #dets_cont{proc = Proc}) when Proc =/= self() ->
    {Head, badarg};
do_bchunk(Head, #dets_cont{bin = eof}) ->
    {Head, '$end_of_table'};
do_bchunk(Head, State) ->
    case dets_v9:read_bchunks(Head, State#dets_cont.alloc) of
	{error, Reason} ->
	    dets_utils:corrupt_reason(Head, Reason);
	{finished, Bins} ->
	    {Head, {State#dets_cont{bin = eof}, Bins}};
	{Bins, NewL} ->
	    {Head, {State#dets_cont{alloc = NewL}, Bins}}
    end.

%% -> {NewHead, Result}
fdelete_all_objects(Head) when Head#head.fixed =:= false ->
    case catch do_delete_all_objects(Head) of
	{ok, NewHead} ->
	    start_auto_save_timer(NewHead),
	    {NewHead, ok};
	{error, Reason} ->
	    dets_utils:corrupt_reason(Head, Reason)
    end;
fdelete_all_objects(Head) ->
    {Head, fixed}.

do_delete_all_objects(Head) ->
    #head{fptr = Fd, name = Tab, filename = Fname, type = Type, keypos = Kp, 
	  ram_file = Ram, auto_save = Auto, min_no_slots = MinSlots,
	  max_no_slots = MaxSlots, cache = Cache} = Head, 
    CacheSz = dets_utils:cache_size(Cache),
    ok = dets_utils:truncate(Fd, Fname, bof),
    (Head#head.mod):initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots,
				  Ram, CacheSz, Auto, true).

%% -> {NewHead, Reply}, Reply = ok | Error.
fdelete_key(Head, Keys) ->
    do_delete(Head, Keys, delete_key).

%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
fdelete_object(Head, Objects) ->
    do_delete(Head, Objects, delete_object).

ffirst(H) ->
    Ref = make_ref(),
    case catch {Ref, ffirst1(H)} of
	{Ref, {NH, R}} -> 
	    {NH, {ok, R}};
	{NH, R} when is_record(NH, head) -> 
	    {NH, {error, R}}
    end.

ffirst1(H) ->
    check_safe_fixtable(H),
    {NH, []} = write_cache(H),
    ffirst(NH, 0).

ffirst(H, Slot) ->
    case (H#head.mod):slot_objs(H, Slot) of
	'$end_of_table' -> {H, '$end_of_table'};
	[] -> ffirst(H, Slot+1);
	[X|_] -> {H, element(H#head.keypos, X)}
    end.

%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
finsert(Head, Objects) ->
    case catch update_cache(Head, Objects, insert) of
	{NewHead, []} ->
	    {NewHead, ok};
	{NewHead, _} = HeadError when is_record(NewHead, head) ->
	    HeadError
    end.

%% -> {NewHead, Reply}, Reply = ok | badarg | Error.
finsert_new(Head, Objects) ->
    KeyPos = Head#head.keypos,
    case catch lists:map(fun(Obj) -> element(KeyPos, Obj) end, Objects) of
        Keys when is_list(Keys) ->
            case catch update_cache(Head, Keys, {lookup, nopid}) of
                {Head1, PidObjs} when is_list(PidObjs) ->
                    case lists:all(fun({_P,OL}) -> OL =:= [] end, PidObjs) of
                        true ->
                            case catch update_cache(Head1, Objects, insert) of
                                {NewHead, []} ->
                                    {NewHead, true};
                                {NewHead, Error} when is_record(NewHead, head) ->
                                    {NewHead, Error}
                            end;
                        false=Reply ->
                            {Head1, Reply}
                    end;
                {NewHead, _} = HeadError when is_record(NewHead, head) ->
                    HeadError
            end;
        _ ->
            {Head, badarg}
    end.

do_safe_fixtable(Head, Pid, true) ->
    case Head#head.fixed of 
	false -> 
	    link(Pid),
	    Fixed = {erlang:now(), [{Pid, 1}]},
	    Ftab = dets_utils:get_freelists(Head),
	    Head#head{fixed = Fixed, freelists = {Ftab, Ftab}};
	{TimeStamp, Counters} ->
	    case lists:keysearch(Pid, 1, Counters) of
		{value, {Pid, Counter}} -> % when Counter > 1
		    NewCounters = lists:keyreplace(Pid, 1, Counters, 
						   {Pid, Counter+1}),
		    Head#head{fixed = {TimeStamp, NewCounters}};
		false ->
		    link(Pid),
		    Fixed = {TimeStamp, [{Pid, 1} | Counters]},
		    Head#head{fixed = Fixed}
	    end
    end;
do_safe_fixtable(Head, Pid, false) ->
    remove_fix(Head, Pid, false).

remove_fix(Head, Pid, How) ->
    case Head#head.fixed of 
	false -> 
	    Head;
	{TimeStamp, Counters} ->
	    case lists:keysearch(Pid, 1, Counters) of
		%% How =:= close when Pid closes the table.
		{value, {Pid, Counter}} when Counter =:= 1; How =:= close ->
		    unlink(Pid),
		    case lists:keydelete(Pid, 1, Counters) of
			[] -> 
			    check_growth(Head),
			    erlang:garbage_collect(),
			    Head#head{fixed = false, 
				  freelists = dets_utils:get_freelists(Head)};
			NewCounters ->
			    Head#head{fixed = {TimeStamp, NewCounters}}
		    end;
		{value, {Pid, Counter}} ->
		    NewCounters = lists:keyreplace(Pid, 1, Counters, 
						   {Pid, Counter-1}),
		    Head#head{fixed = {TimeStamp, NewCounters}};
		false ->
		    Head
	    end
    end.

do_stop(Head) ->
    unlink_fixing_procs(Head),
    fclose(Head).

unlink_fixing_procs(Head) ->
    case Head#head.fixed of
	false ->
	    Head;
	{_, Counters} ->
	    lists:map(fun({Pid, _Counter}) -> unlink(Pid) end, Counters),
	    Head#head{fixed = false, 
		      freelists = dets_utils:get_freelists(Head)}
    end.

check_growth(#head{access = read}) ->
    ok;
check_growth(Head) ->
    NoThings = no_things(Head),
    if
	NoThings > Head#head.next ->
	    erlang:send_after(200, self(), 
			      ?DETS_CALL(self(), may_grow)); % Catch up.
	true ->
	    ok
    end.

finfo(H) -> 
    case catch write_cache(H) of
	{H2, []} ->
	    Info = (catch [{type, H2#head.type}, 
			   {keypos, H2#head.keypos}, 
			   {size, H2#head.no_objects},
			   {file_size, 
			    file_size(H2#head.fptr, H2#head.filename)},
			   {filename, H2#head.filename}]),
	    {H2, Info};
	{H2, _} = HeadError when is_record(H2, head) ->
	    HeadError
    end.

finfo(H, access) -> {H, H#head.access};
finfo(H, auto_save) -> {H, H#head.auto_save};
finfo(H, bchunk_format) -> 
    case catch write_cache(H) of
        {H2, []} ->
            case (H2#head.mod):table_parameters(H2) of
                undefined = Undef ->
                    {H2, Undef};
                Parms ->
                    {H2, term_to_binary(Parms)}
            end;
        {H2, _} = HeadError when is_record(H2, head) ->
            HeadError
    end;
finfo(H, delayed_write) -> % undocumented
    {H, dets_utils:cache_size(H#head.cache)};
finfo(H, filename) -> {H, H#head.filename};
finfo(H, file_size) ->
    case catch write_cache(H) of
	{H2, []} ->
	    {H2, catch file_size(H#head.fptr, H#head.filename)};
	{H2, _} = HeadError when is_record(H2, head) ->
	    HeadError
    end;
finfo(H, fixed) -> 
    %% true if fixtable/2 has been called
    {H, not (H#head.fixed =:= false)}; 
finfo(H, hash) -> {H, H#head.hash_bif};
finfo(H, keypos) -> {H, H#head.keypos};
finfo(H, memory) -> finfo(H, file_size);
finfo(H, no_objects) -> finfo(H, size);
finfo(H, no_keys) ->
    case catch write_cache(H) of
	{H2, []} ->
	    {H2, H2#head.no_keys};
	{H2, _} = HeadError when is_record(H2, head) ->
	    HeadError
    end;
finfo(H, no_slots) -> {H, (H#head.mod):no_slots(H)};
finfo(H, pid) -> {H, self()};
finfo(H, ram_file) -> {H, H#head.ram_file};
finfo(H, safe_fixed) -> {H, H#head.fixed};
finfo(H, size) -> 
    case catch write_cache(H) of
	{H2, []} ->
	    {H2, H2#head.no_objects};
	{H2, _} = HeadError when is_record(H2, head) ->
	    HeadError
    end;
finfo(H, type) -> {H, H#head.type};
finfo(H, version) -> {H, H#head.version};
finfo(H, _) -> {H, undefined}.

file_size(Fd, FileName) -> 
    {ok, Pos} = dets_utils:position(Fd, FileName, eof),
    Pos.

test_bchunk_format(_Head, undefined) ->
    false;
test_bchunk_format(Head, _Term) when Head#head.version =:= 8 ->
    false;
test_bchunk_format(Head, Term) ->
    dets_v9:try_bchunk_header(Term, Head) =/= not_ok.

do_open_file([Fname, Verbose], Parent, Server, Ref) ->
    case catch fopen2(Fname, Ref) of
	{error, _Reason} = Error ->
	    err(Error);
	{ok, Head} ->
	    maybe_put(verbose, Verbose),
	    {ok, Head#head{parent = Parent, server = Server}};
	{'EXIT', _Reason} = Error ->
	    Error;
	Bad ->
	    error_logger:format
	      ("** dets: Bug was found in open_file/1, reply was ~w.~n", 
	       [Bad]),
	    {error, {dets_bug, Fname, Bad}}
    end;
do_open_file([Tab, OpenArgs, Verb], Parent, Server, Ref) ->
    case catch fopen3(Tab, OpenArgs) of
	{error, {tooshort, _}} ->
	    file:delete(OpenArgs#open_args.file),
	    do_open_file([Tab, OpenArgs, Verb], Parent, Server, Ref);
	{error, _Reason} = Error ->
	    err(Error);
	{ok, Head} ->
	    maybe_put(verbose, Verb),
	    {ok, Head#head{parent = Parent, server = Server}};
	{'EXIT', _Reason} = Error ->
	    Error;
	Bad ->
	    error_logger:format
	      ("** dets: Bug was found in open_file/2, arguments were~n"
	       "** dets: ~w and reply was ~w.~n", 
	       [OpenArgs, Bad]),
	    {error, {dets_bug, Tab, {open_file, OpenArgs}, Bad}}
    end.

maybe_put(_, undefined) ->
    ignore;
maybe_put(K, V) ->
    put(K, V).

%% -> {Head, Result}, Result = ok | Error | {thrown, Error} | badarg
finit(Head, InitFun, _Format, _NoSlots) when Head#head.access =:= read ->
    _ = (catch InitFun(close)),
    {Head, {error, {access_mode, Head#head.filename}}};
finit(Head, InitFun, _Format, _NoSlots) when Head#head.fixed =/= false ->
    _ = (catch InitFun(close)),
    {Head, {error, {fixed_table, Head#head.name}}};
finit(Head, InitFun, Format, NoSlots) ->
    case catch do_finit(Head, InitFun, Format, NoSlots) of
	{ok, NewHead} ->
	    check_growth(NewHead),
	    start_auto_save_timer(NewHead),
	    {NewHead, ok};
	badarg ->
	    {Head, badarg};
	Error ->
	    dets_utils:corrupt(Head, Error)
    end.

%% -> {ok, NewHead} | throw(badarg) | throw(Error)
do_finit(Head, Init, Format, NoSlots) ->
    #head{fptr = Fd, type = Type, keypos = Kp, auto_save = Auto,
          cache = Cache, filename = Fname, ram_file = Ram,
	  min_no_slots = MinSlots0, max_no_slots = MaxSlots,
          name = Tab, update_mode = UpdateMode, mod = HMod} = Head,
    CacheSz = dets_utils:cache_size(Cache),
    {How, Head1} =
	case Format of
	    term when is_integer(NoSlots), NoSlots > MaxSlots ->
		throw(badarg);
	    term ->
		MinSlots = choose_no_slots(NoSlots, MinSlots0),
		if 
		    UpdateMode =:= new_dirty, MinSlots =:= MinSlots0 ->
			{general_init, Head};
		    true ->
			ok = dets_utils:truncate(Fd, Fname, bof),
			{ok, H} = HMod:initiate_file(Fd, Tab, Fname, Type, Kp,
						     MinSlots, MaxSlots, Ram,
						     CacheSz, Auto, false),
			{general_init, H}
		end;
	    bchunk ->
		ok = dets_utils:truncate(Fd, Fname, bof),
		{bchunk_init, Head}
	end,
    case How of
	bchunk_init -> 
	    case HMod:bchunk_init(Head1, Init) of
		{ok, NewHead} ->
		    {ok, NewHead#head{update_mode = dirty}};
		Error ->
		    Error
	    end;
	general_init ->
	    Cntrs = ets:new(dets_init, []),
	    Input = HMod:bulk_input(Head1, Init, Cntrs),
	    SlotNumbers = {Head1#head.min_no_slots, bulk_init, MaxSlots},
	    {Reply, SizeData} = 
		do_sort(Head1, SlotNumbers, Input, Cntrs, Fname, not_used),
	    Bulk = true,
	    case Reply of 
		{ok, NoDups, H1} ->
		    fsck_copy(SizeData, H1, Bulk, NoDups);
		Else ->
		    close_files(Bulk, SizeData, Head1),
		    Else
	    end
    end.

%% -> {NewHead, [LookedUpObject]} | {NewHead, Error}
flookup_keys(Head, Keys) ->
    case catch update_cache(Head, Keys, {lookup, nopid}) of
	{NewHead, [{_NoPid,Objs}]} -> 
	    {NewHead, Objs};
	{NewHead, L} when is_list(L) ->
	    {NewHead, lists:flatmap(fun({_Pid,OL}) -> OL end, L)};
	{NewHead, _} = HeadError when is_record(NewHead, head) ->
	    HeadError
    end.

%% -> {NewHead, Result}
fmatch_init(Head, #dets_cont{bin = eof}) ->
    {Head, '$end_of_table'};
fmatch_init(Head, C) ->
    case scan(Head, C) of
	{scan_error, Reason} ->
	    dets_utils:corrupt_reason(Head, Reason);
	{Ts, NC} ->
	    {Head, {cont, {Ts, NC}}}
    end.

%% -> {NewHead, Result}
fmatch(Head, MP, Spec, N) ->
    KeyPos = Head#head.keypos,
    case find_all_keys(Spec, KeyPos, []) of
	[] -> 
	    %% Complete match
	    case catch write_cache(Head) of
		{NewHead, []} ->
		    C0 = init_scan(NewHead, N),
		    {NewHead, {cont, C0#dets_cont{match_program = MP}}};
		{NewHead, _} = HeadError when is_record(NewHead, head) ->
		    HeadError
	    end;
	List ->
	    Keys = lists:usort(List),
	    {NewHead, Reply} = flookup_keys(Head, Keys),
	    case Reply of
		Objs when is_list(Objs) ->
		    MatchingObjs = ets:match_spec_run(Objs, MP),
		    {NewHead, {done, MatchingObjs}};
		Error ->
		    {NewHead, Error}
	    end
    end.

find_all_keys([], _, Ks) ->
    Ks;
find_all_keys([{H,_,_} | T], KeyPos, Ks) when is_tuple(H) ->
    case tuple_size(H) of
	Enough when Enough >= KeyPos ->
	    Key = element(KeyPos, H),
	    case contains_variable(Key) of
		true -> 
		    [];
		false ->
		    find_all_keys(T, KeyPos, [Key | Ks])
	    end;
	_ ->
	    find_all_keys(T, KeyPos, Ks)
    end;
find_all_keys(_, _, _) ->
    [].

contains_variable('_') ->
    true;
contains_variable(A) when is_atom(A) ->
    case atom_to_list(A) of
	[$$ | T] ->
	    case (catch list_to_integer(T)) of
		{'EXIT', _} ->
		    false;
		_ ->
		    true
	    end;
	_ ->
	    false
    end;
contains_variable(T) when is_tuple(T) ->
    contains_variable(tuple_to_list(T));
contains_variable([]) ->
    false;
contains_variable([H|T]) ->
    case contains_variable(H) of
	true ->
	    true;
	false ->
	    contains_variable(T)
    end;
contains_variable(_) ->
    false.

%% -> {NewHead, Res}
fmatch_delete_init(Head, MP, Spec)  ->
    KeyPos = Head#head.keypos,
    case catch 
        case find_all_keys(Spec, KeyPos, []) of
            [] -> 
                do_fmatch_delete_var_keys(Head, MP, Spec);
            List ->
                Keys = lists:usort(List),
                do_fmatch_constant_keys(Head, Keys, MP)
        end of
        {NewHead, _} = Reply when is_record(NewHead, head) ->
            Reply            
    end.

%% A note: If deleted objects reside in a bucket with other objects
%% that are not deleted, the bucket is moved. If the address of the
%% moved bucket is greater than original bucket address the kept
%% objects will be read once again later on. 
%% -> {NewHead, Res}
fmatch_delete(Head, C) ->
    case scan(Head, C) of
	{scan_error, Reason} ->
	    dets_utils:corrupt_reason(Head, Reason);
	{[], _} ->
	    {Head, {done, 0}};
	{RTs, NC} ->
	    MP = C#dets_cont.match_program,
	    case catch filter_binary_terms(RTs, MP, []) of
		{'EXIT', _} ->
                    Bad = dets_utils:bad_object(fmatch_delete, RTs),
		    dets_utils:corrupt_reason(Head, Bad);
		Terms -> 
		    do_fmatch_delete(Head, Terms, NC)
	    end
    end.

do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_')) 
            when Head#head.fixed =:= false ->
    %% Handle the case where the file is emptied efficiently.
    %% Empty the cache just to get the number of objects right.
    {Head1, []} = write_cache(Head),
    N = Head1#head.no_objects,
    case fdelete_all_objects(Head1) of
	{NewHead, ok} ->
	    {NewHead, {done, N}};
	Reply ->
	    Reply
    end;
do_fmatch_delete_var_keys(Head, MP, _Spec) ->
    {NewHead, []} = write_cache(Head),
    C0 = init_scan(NewHead, default),
    {NewHead, {cont, C0#dets_cont{match_program = MP}, 0}}.

do_fmatch_constant_keys(Head, Keys, MP) ->
    case flookup_keys(Head, Keys) of
	{NewHead, ReadTerms} when is_list(ReadTerms) ->
	    Terms = filter_terms(ReadTerms, MP, []),
	    do_fmatch_delete(NewHead, Terms, fixed);
	Reply ->
	    Reply
    end.

filter_binary_terms([Bin | Bins], MP, L) ->
    Term = binary_to_term(Bin),
    case ets:match_spec_run([Term], MP) of
	[true] -> 
	    filter_binary_terms(Bins, MP, [Term | L]);
	_ ->
	    filter_binary_terms(Bins, MP, L)
    end;
filter_binary_terms([], _MP, L) ->
    L.

filter_terms([Term | Terms], MP, L) ->
    case ets:match_spec_run([Term], MP) of
	[true] -> 
	    filter_terms(Terms, MP, [Term | L]);
	_ ->
	    filter_terms(Terms, MP, L)
    end;
filter_terms([], _MP, L) ->
    L.

do_fmatch_delete(Head, Terms, What) ->
    N = length(Terms),
    case do_delete(Head, Terms, delete_object) of
	{NewHead, ok} when What =:= fixed ->
	    {NewHead, {done, N}};
	{NewHead, ok} ->
	    {NewHead, {cont, What, N}};
	Reply ->
	    Reply
    end.

do_delete(Head, Things, What) ->
    case catch update_cache(Head, Things, What) of
	{NewHead, []} ->
	    {NewHead, ok};
	{NewHead, _} = HeadError when is_record(NewHead, head) ->
	    HeadError
    end.

fmember(Head, Key) ->
    case catch begin
                   {Head2, [{_NoPid,Objs}]} = 
                       update_cache(Head, [Key], {lookup, nopid}),
                   {Head2, Objs =/= []}
               end of
        {NewHead, _} = Reply when is_record(NewHead, head) ->
            Reply
    end.

fnext(Head, Key) ->
    Slot = (Head#head.mod):db_hash(Key, Head),
    Ref = make_ref(),
    case catch {Ref, fnext(Head, Key, Slot)} of
	{Ref, {H, R}} -> 
	    {H, {ok, R}};
	{NewHead, _} = HeadError when is_record(NewHead, head) ->
	    HeadError
    end.

fnext(H, Key, Slot) ->
    {NH, []} = write_cache(H),
    case (H#head.mod):slot_objs(NH, Slot) of
	'$end_of_table' -> {NH, '$end_of_table'};
	L -> fnext_search(NH, Key, Slot, L)
    end.

fnext_search(H, K, Slot, L) ->
    Kp = H#head.keypos,
    case beyond_key(K, Kp, L) of
	[] -> fnext_slot(H, K, Slot+1);
	L2 -> {H, element(H#head.keypos, hd(L2))}
    end.

%% We've got to continue to search for the next key in the next slot
fnext_slot(H, K, Slot) ->
    case (H#head.mod):slot_objs(H, Slot) of
	'$end_of_table' -> {H, '$end_of_table'};
	[] -> fnext_slot(H, K, Slot+1);
	L -> {H, element(H#head.keypos, hd(L))}
    end.

beyond_key(_K, _Kp, []) -> [];
beyond_key(K, Kp, [H|T]) ->
    case dets_utils:cmp(element(Kp, H), K) of
        0 -> beyond_key2(K, Kp, T);
        _ -> beyond_key(K, Kp, T)
    end.

beyond_key2(_K, _Kp, []) -> [];
beyond_key2(K, Kp, [H|T]=L) ->
    case dets_utils:cmp(element(Kp, H), K) of
        0 -> beyond_key2(K, Kp, T);
        _ -> L
    end.

%% Open an already existing file, no arguments
%% -> {ok, head()} | throw(Error)
fopen2(Fname, Tab) ->
    case file:read_file_info(Fname) of
	{ok, _} ->
	    Acc = read_write,
	    Ram = false, 
	    %% Fd is not always closed upon error, but exit is soon called.
	    {ok, Fd, FH} = read_file_header(Fname, Acc, Ram),
            Mod = FH#fileheader.mod,
	    case Mod:check_file_header(FH, Fd) of
		{error, not_closed} ->
		    io:format(user,"dets: file ~p not properly closed, "
			      "repairing ...~n", [Fname]),
                    Version = default,
                    case fsck(Fd, Tab, Fname, FH, default, default, Version) of
                        ok ->
                            fopen2(Fname, Tab);
                        Error ->
                            throw(Error)
                    end;
		{ok, Head, ExtraInfo} ->
		    open_final(Head, Fname, Acc, Ram, ?DEFAULT_CACHE, 
			       Tab, ExtraInfo, false);
		{error, Reason} ->
		    throw({error, {Reason, Fname}})
	    end;
	Error ->
	    dets_utils:file_error(Fname, Error)
    end.

%% Open and possibly create and initialize a file
%% -> {ok, head()} | throw(Error)
fopen3(Tab, OpenArgs) ->
    FileName = OpenArgs#open_args.file,
    case file:read_file_info(FileName) of
	{ok, _} ->
	    fopen_existing_file(Tab, OpenArgs);
	Error when OpenArgs#open_args.access =:= read ->
	    dets_utils:file_error(FileName, Error);
	_Error ->
	    fopen_init_file(Tab, OpenArgs)
    end.

fopen_existing_file(Tab, OpenArgs) ->
    #open_args{file = Fname, type = Type, keypos = Kp, repair = Rep,
               min_no_slots = MinSlots, max_no_slots = MaxSlots,
               ram_file = Ram, delayed_write = CacheSz, auto_save =
               Auto, access = Acc, version = Version, debug = Debug} =
        OpenArgs,
    %% Fd is not always closed upon error, but exit is soon called.
    {ok, Fd, FH} = read_file_header(Fname, Acc, Ram),
    V9 = (Version =:= 9) or (Version =:= default),
    MinF = (MinSlots =:= default) or (MinSlots =:= FH#fileheader.min_no_slots),
    MaxF = (MaxSlots =:= default) or (MaxSlots =:= FH#fileheader.max_no_slots),
    Do = case (FH#fileheader.mod):check_file_header(FH, Fd) of
	     {ok, Head, true} when Rep =:= force, Acc =:= read_write,
				   FH#fileheader.version =:= 9,
				   FH#fileheader.no_colls =/= undefined,
				   MinF, MaxF, V9 ->
		 {compact, Head};
             {ok, _Head, _Extra} when Rep =:= force, Acc =:= read ->
                 throw({error, {access_mode, Fname}});
	     {ok, Head, need_compacting} when Acc =:= read ->
                 {final, Head, true}; % Version 8 only.
	     {ok, _Head, need_compacting} when Rep =:= true ->
		 %% The file needs to be compacted due to a very big
		 %% and fragmented free_list. Version 8 only.
		 M = " is now compacted ...",
		 {repair, M};
	     {ok, _Head, _Extra} when Rep =:= force ->
		 M = ", repair forced.",
		 {repair, M};
	     {ok, Head, ExtraInfo} ->
		 {final, Head, ExtraInfo};
	     {error, not_closed} when Rep =:= force, Acc =:= read_write ->
		 M = ", repair forced.",
		 {repair, M};
	     {error, not_closed} when Rep =:= true, Acc =:= read_write ->
		 M = " not properly closed, repairing ...",
		 {repair, M};
	     {error, not_closed} when Rep =:= false ->
		 throw({error, {needs_repair, Fname}});
	     {error, version_bump} when Rep =:= true, Acc =:= read_write ->
                 %% Version 8 only
		 M = " old version, upgrading ...",
		 {repair, M};
	     {error, Reason} ->
		 throw({error, {Reason, Fname}})
	 end,
    case Do of
	_ when FH#fileheader.type =/= Type ->
	    throw({error, {type_mismatch, Fname}});
	_ when FH#fileheader.keypos =/= Kp ->
	    throw({error, {keypos_mismatch, Fname}});
	{compact, SourceHead} ->
	    io:format(user, "dets: file ~p is now compacted ...~n", [Fname]),
	    {ok, NewSourceHead} = open_final(SourceHead, Fname, read, false,
					     ?DEFAULT_CACHE, Tab, true,
                                             Debug),
	    case catch compact(NewSourceHead) of
		ok ->
		    erlang:garbage_collect(),
		    fopen3(Tab, OpenArgs#open_args{repair = false});
		_Err ->
                    _ = file:close(Fd),
                    dets_utils:stop_disk_map(),
		    io:format(user, "dets: compaction of file ~p failed, "
			      "now repairing ...~n", [Fname]),
                    {ok, Fd2, _FH} = read_file_header(Fname, Acc, Ram),
                    do_repair(Fd2, Tab, Fname, FH, MinSlots, MaxSlots, 
			      Version, OpenArgs)
	    end;
	{repair, Mess} ->
	    io:format(user, "dets: file ~p~s~n", [Fname, Mess]),
            do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, 
		      Version, OpenArgs);
	_ when FH#fileheader.version =/= Version, Version =/= default ->
	    throw({error, {version_mismatch, Fname}});
	{final, H, EI} ->
	    H1 = H#head{auto_save = Auto},
	    open_final(H1, Fname, Acc, Ram, CacheSz, Tab, EI, Debug)
    end.

do_repair(Fd, Tab, Fname, FH, MinSlots, MaxSlots, Version, OpenArgs) ->
    case fsck(Fd, Tab, Fname, FH, MinSlots, MaxSlots, Version) of
	ok ->
	    %% No need to update 'version'.
	    erlang:garbage_collect(),
	    fopen3(Tab, OpenArgs#open_args{repair = false});
	Error ->
	    throw(Error)
    end.

%% -> {ok, head()} | throw(Error)
open_final(Head, Fname, Acc, Ram, CacheSz, Tab, ExtraInfo, Debug) ->
    Head1 = Head#head{access = Acc,
		      ram_file = Ram,
		      filename = Fname,
		      name = Tab,
		      cache = dets_utils:new_cache(CacheSz)},
    init_disk_map(Head1#head.version, Tab, Debug),
    Mod = Head#head.mod,
    Mod:cache_segps(Head1#head.fptr, Fname, Head1#head.next),
    Ftab = Mod:init_freelist(Head1, ExtraInfo),
    check_growth(Head1),
    NewHead = Head1#head{freelists = Ftab},
    {ok, NewHead}.

%% -> {ok, head()} | throw(Error)
fopen_init_file(Tab, OpenArgs) ->
    #open_args{file = Fname, type = Type, keypos = Kp, 
               min_no_slots = MinSlotsArg, max_no_slots = MaxSlotsArg, 
	       ram_file = Ram, delayed_write = CacheSz, auto_save = Auto, 
               version = UseVersion, debug = Debug} = OpenArgs,
    MinSlots = choose_no_slots(MinSlotsArg, ?DEFAULT_MIN_NO_SLOTS),
    MaxSlots = choose_no_slots(MaxSlotsArg, ?DEFAULT_MAX_NO_SLOTS),
    FileSpec = if
		   Ram -> [];
		   true -> Fname
	       end,
    {ok, Fd} = dets_utils:open(FileSpec, open_args(read_write, Ram)),
    Version = if
                  UseVersion =:= default ->
                      case os:getenv("DETS_USE_FILE_FORMAT") of
                          "8" -> 8;
                          _ -> 9
                      end;
                  true ->
                      UseVersion
              end,
    Mod = version2module(Version),
    %% No need to truncate an empty file.
    init_disk_map(Version, Tab, Debug),
    case catch Mod:initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots,
				 Ram, CacheSz, Auto, true) of
	{error, Reason} when Ram ->
	    file:close(Fd),
	    throw({error, Reason});
	{error, Reason} ->
	    file:close(Fd),
	    file:delete(Fname),
	    throw({error, Reason});
	{ok, Head} ->
	    start_auto_save_timer(Head),
	    %% init_table does not need to truncate and write header
	    {ok, Head#head{update_mode = new_dirty}}
    end.

%% Debug.
init_disk_map(9, Name, Debug) ->
    case Debug orelse dets_utils:debug_mode() of
        true -> 
            dets_utils:init_disk_map(Name);
        false ->
            ok
    end;
init_disk_map(_Version, _Name, _Debug) ->
    ok.

open_args(Access, RamFile) ->
    A1 = case Access of
	     read -> [];
	     read_write -> [write]
	 end,
    A2 = case RamFile of
	     true -> [ram];
	     false -> [raw]
	   end,
    A1 ++ A2 ++ [binary, read].

version2module(V) when V =< 8 -> dets_v8;
version2module(9) -> dets_v9.

module2version(dets_v8) -> 8;
module2version(dets_v9) -> 9;
module2version(not_used) -> 9.

%% -> ok | throw(Error) 
%% For version 9 tables only.
compact(SourceHead) ->
    #head{name = Tab, filename = Fname, fptr = SFd, type = Type, keypos = Kp,
	  ram_file = Ram, auto_save = Auto} = SourceHead,
    Tmp = tempfile(Fname),
    TblParms = dets_v9:table_parameters(SourceHead),
    {ok, Fd} = dets_utils:open(Tmp, open_args(read_write, false)),
    CacheSz = ?DEFAULT_CACHE,
    %% It is normally not possible to have two open tables in the same
    %% process since the process dictionary is used for caching
    %% segment pointers, but here is works anyway--when reading a file
    %% serially the pointers to not need to be used.
    Head = case catch dets_v9:prep_table_copy(Fd, Tab, Tmp, Type, Kp, Ram, 
					      CacheSz, Auto, TblParms) of
	       {ok, H} ->
		   H;
	       Error ->
		   file:close(Fd),
                   file:delete(Tmp),
		   throw(Error)
	   end,

    case dets_v9:compact_init(SourceHead, Head, TblParms) of
	{ok, NewHead} ->
	    R = case fclose(NewHead) of
		    ok ->
                        ok = file:close(SFd),
			%% Save (rename) Fname first?
			dets_utils:rename(Tmp, Fname);
		    E ->
			E
		end,
	    if 
		R =:= ok -> ok;
		true ->
		    file:delete(Tmp),
		    throw(R)
	    end;
	Err ->
	    file:close(Fd),
            file:delete(Tmp),
	    throw(Err)
    end.
	    
%% -> ok | Error
%% Closes Fd.
fsck(Fd, Tab, Fname, FH, MinSlotsArg, MaxSlotsArg, Version) ->
    %% MinSlots and MaxSlots are the option values.
    #fileheader{min_no_slots = MinSlotsFile, 
                max_no_slots = MaxSlotsFile} = FH,
    EstNoSlots0 = file_no_things(FH),
    MinSlots = choose_no_slots(MinSlotsArg, MinSlotsFile),
    MaxSlots = choose_no_slots(MaxSlotsArg, MaxSlotsFile),
    EstNoSlots = erlang:min(MaxSlots, erlang:max(MinSlots, EstNoSlots0)),
    SlotNumbers = {MinSlots, EstNoSlots, MaxSlots},
    %% When repairing: We first try and sort on slots using MinSlots.
    %% If the number of objects (keys) turns out to be significantly
    %% different from NoSlots, we try again with the correct number of
    %% objects (keys).
    case fsck_try(Fd, Tab, FH, Fname, SlotNumbers, Version) of
        {try_again, BetterNoSlots} ->
	    BetterSlotNumbers = {MinSlots, BetterNoSlots, MaxSlots},
            case fsck_try(Fd, Tab, FH, Fname, BetterSlotNumbers, Version) of
                {try_again, _} ->
                    file:close(Fd),
                    {error, {cannot_repair, Fname}};
                Else ->
                    Else
            end;
        Else ->
            Else
    end.

choose_no_slots(default, NoSlots) -> NoSlots;
choose_no_slots(NoSlots, _) -> NoSlots.

%% -> ok | {try_again, integer()} | Error
%% Closes Fd unless {try_again, _} is returned.
%% Initiating a table using a fun and repairing (or converting) a
%% file are completely different things, but nevertheless the same
%% method is used in both cases...
fsck_try(Fd, Tab, FH, Fname, SlotNumbers, Version) ->
    Tmp = tempfile(Fname),
    #fileheader{type = Type, keypos = KeyPos} = FH,
    {_MinSlots, EstNoSlots, MaxSlots} = SlotNumbers,
    OpenArgs = #open_args{file = Tmp, type = Type, keypos = KeyPos, 
                          repair = false, min_no_slots = EstNoSlots,
			  max_no_slots = MaxSlots,
                          ram_file = false, delayed_write = ?DEFAULT_CACHE,
                          auto_save = infinity, access = read_write,
                          version = Version, debug = false},
    case catch fopen3(Tab, OpenArgs) of
	{ok, Head} ->
            case fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) of
                {ok, NewHead} ->
                    R = case fclose(NewHead) of
                            ok ->
				%% Save (rename) Fname first?
				dets_utils:rename(Tmp, Fname);
                            Error ->
                                Error
                        end,
                    if 
			R =:= ok -> ok;
			true ->
			    file:delete(Tmp),
			    R
		    end;
		TryAgainOrError ->
                    file:delete(Tmp),
                    TryAgainOrError
            end;
	Error -> 
	    file:close(Fd),
	    Error
    end.

tempfile(Fname) ->
    Tmp = lists:concat([Fname, ".TMP"]),
    case file:delete(Tmp) of
        {error, eacces} -> % 'dets_process_died' happened anyway... (W-nd-ws)
            timer:sleep(5000),
            file:delete(Tmp);
        _ ->
            ok
    end,
    Tmp.

%% -> {ok, NewHead} | {try_again, integer()} | Error
fsck_try_est(Head, Fd, Fname, SlotNumbers, FH) ->
    %% Mod is the module to use for reading input when repairing.
    Mod = FH#fileheader.mod,
    Cntrs = ets:new(dets_repair, []),
    Input = Mod:fsck_input(Head, Fd, Cntrs, FH),
    {Reply, SizeData} = do_sort(Head, SlotNumbers, Input, Cntrs, Fname, Mod),
    Bulk = false,
    case Reply of 
        {ok, NoDups, H1} ->
            file:close(Fd),
            fsck_copy(SizeData, H1, Bulk, NoDups);
        {try_again, _} = Return ->
            close_files(Bulk, SizeData, Head),
            Return;
        Else ->
            file:close(Fd),
            close_files(Bulk, SizeData, Head),
	    Else
    end.

do_sort(Head, SlotNumbers, Input, Cntrs, Fname, Mod) ->
    OldV = module2version(Mod),
    %% output_objs/4 replaces {LogSize,NoObjects} in Cntrs by
    %% {LogSize,Position,Data,NoObjects | NoCollections}.
    %% Data = {FileName,FileDescriptor} | [object()]
    %% For small tables Data may be a list of objects which is more
    %% efficient since no temporary files are created.
    Output = (Head#head.mod):output_objs(OldV, Head, SlotNumbers, Cntrs),
    TmpDir = filename:dirname(Fname),
    Reply = (catch file_sorter:sort(Input, Output, 
				    [{format, binary},{tmpdir, TmpDir}])),
    L = ets:tab2list(Cntrs),
    ets:delete(Cntrs),
    {Reply, lists:reverse(lists:keysort(1, L))}.

fsck_copy([{_LogSz, Pos, Bins, _NoObjects} | SizeData], Head, _Bulk, NoDups)
   when is_list(Bins) ->
    true = NoDups =:= 0,
    PWs = [{Pos,Bins} | lists:map(fun({_, P, B, _}) -> {P, B} end, SizeData)],
    #head{fptr = Fd, filename = FileName} = Head,
    dets_utils:pwrite(Fd, FileName, PWs),
    {ok, Head#head{update_mode = dirty}};
fsck_copy(SizeData, Head, Bulk, NoDups) ->
    catch fsck_copy1(SizeData, Head, Bulk, NoDups).

fsck_copy1([SzData | L], Head, Bulk, NoDups) ->
    Out = Head#head.fptr,
    {LogSz, Pos, {FileName, Fd}, NoObjects} = SzData,
    Size = if NoObjects =:= 0 -> 0; true -> ?POW(LogSz-1) end,
    ExpectedSize = Size * NoObjects,
    close_tmp(Fd),
    case file:position(Out, Pos) of
	{ok, Pos} -> ok;
	PError -> dets_utils:file_error(FileName, PError)
    end,
    {ok, Pos} = file:position(Out, Pos),
    CR = file:copy({FileName, [raw,binary]}, Out),
    file:delete(FileName),
    case CR of 
	{ok, Copied} when Copied =:= ExpectedSize;
			  NoObjects =:= 0 -> % the segments
	    fsck_copy1(L, Head, Bulk, NoDups);
	{ok, Copied} when Bulk, Head#head.version =:= 8 ->
	    NoZeros = ExpectedSize - Copied,
	    Dups = NoZeros div Size,
	    Addr = Pos+Copied,
	    NewHead = free_n_objects(Head, Addr, Size-1, NoDups),
	    NewNoDups = NoDups - Dups,
	    fsck_copy1(L, NewHead, Bulk, NewNoDups);
	{ok, _Copied} -> % should never happen
	    close_files(Bulk, L, Head),
	    Reason = if Bulk -> initialization_failed; 
			true -> repair_failed end,
            {error, {Reason, Head#head.filename}};
	FError ->
	    close_files(Bulk, L, Head),
	    dets_utils:file_error(FileName, FError)
    end;
fsck_copy1([], Head, _Bulk, NoDups) when NoDups =/= 0 ->
    {error, {initialization_failed, Head#head.filename}};
fsck_copy1([], Head, _Bulk, _NoDups) ->
    {ok, Head#head{update_mode = dirty}}. 

free_n_objects(Head, _Addr, _Size, 0) ->
    Head;
free_n_objects(Head, Addr, Size, N) ->
    {NewHead, _} = dets_utils:free(Head, Addr, Size),
    NewAddr = Addr + Size + 1,
    free_n_objects(NewHead, NewAddr, Size, N-1).

close_files(false, SizeData, Head) ->
    file:close(Head#head.fptr),
    close_files(true, SizeData, Head);
close_files(true, SizeData, _Head) ->
    Fun = fun({_Size, _Pos, {FileName, Fd}, _No}) ->
		  close_tmp(Fd),
		  file:delete(FileName);
	     (_) ->
		  ok
	  end,
    lists:foreach(Fun, SizeData).

close_tmp(Fd) ->
    file:close(Fd).

fslot(H, Slot) ->
    case catch begin
                   {NH, []} = write_cache(H),
                   Objs = (NH#head.mod):slot_objs(NH, Slot),
                   {NH, Objs}
               end of
        {NewHead, _Objects} = Reply when is_record(NewHead, head) ->
            Reply
    end.

do_update_counter(Head, _Key, _Incr) when Head#head.type =/= set ->
    {Head, badarg};
do_update_counter(Head, Key, Incr) ->
    case flookup_keys(Head, [Key]) of
	{H1, [O]} ->
	    Kp = H1#head.keypos,
	    case catch try_update_tuple(O, Kp, Incr) of
		{'EXIT', _} ->
		    {H1, badarg};
		{New, Term} ->
		    case finsert(H1, [Term]) of
			{H2, ok} -> 
			    {H2, New};
			Reply -> 
			    Reply
		    end
	    end;
	{H1, []} ->
	    {H1, badarg};
	HeadError ->
	    HeadError
    end.
    
try_update_tuple(O, _Kp, {Pos, Incr}) ->
    try_update_tuple2(O, Pos, Incr);
try_update_tuple(O, Kp, Incr) ->
    try_update_tuple2(O, Kp+1, Incr).

try_update_tuple2(O, Pos, Incr) ->
    New = element(Pos, O) + Incr,
    {New, setelement(Pos, O, New)}.

set_verbose(true) ->
    put(verbose, yes);
set_verbose(_) ->
    erase(verbose).

where_is_object(Head, Object) ->
    Keypos = Head#head.keypos,
    case check_objects([Object], Keypos) of
	true ->
	    case catch write_cache(Head) of
		{NewHead, []} ->
		    {NewHead, (Head#head.mod):find_object(NewHead, Object)};
		{NewHead, _} = HeadError when is_record(NewHead, head) ->
		    HeadError
	    end;
	false ->
	    {Head, badarg}
    end.

check_objects([T | Ts], Kp) when tuple_size(T) >= Kp ->
    check_objects(Ts, Kp);
check_objects(L, _Kp) ->
    L =:= [].

no_things(Head) when Head#head.no_keys =:= undefined ->
    Head#head.no_objects;
no_things(Head) ->
    Head#head.no_keys.

file_no_things(FH) when FH#fileheader.no_keys =:= undefined ->
    FH#fileheader.no_objects;
file_no_things(FH) ->
    FH#fileheader.no_keys.

%%% The write cache is list of {Key, [Item]} where Item is one of
%%% {Seq, delete_key}, {Seq, {lookup,Pid}}, {Seq, {delete_object,object()}}, 
%%% or {Seq, {insert,object()}}. Seq is a number that increases
%%% monotonically for each item put in the cache. The purpose is to
%%% make sure that items are sorted correctly. Sequences of delete and
%%% insert operations are inserted in the cache without doing any file
%%% operations. When the cache is considered full, a lookup operation
%%% is requested, or after some delay, the contents of the cache are
%%% written to the file, and the cache emptied.
%%%
%%% Data is not allowed to linger more than 'delay' milliseconds in
%%% the write cache. A delayed_write message is received when some
%%% datum has become too old. If 'wrtime' is equal to 'undefined',
%%% then the cache is empty and no such delayed_write message has been
%%% scheduled. Otherwise there is a delayed_write message scheduled,
%%% and the value of 'wrtime' is the time when the cache was last
%%% written, or when it was first updated after the cache was last
%%% written.

update_cache(Head, KeysOrObjects, What) ->
    {Head1, LU, PwriteList} = update_cache(Head, [{What,KeysOrObjects}]),
    {NewHead, ok} = dets_utils:pwrite(Head1, PwriteList),
    {NewHead, LU}.

%% -> {NewHead, [object()], pwrite_list()} | throw({Head, Error})
update_cache(Head, ToAdd) ->
    Cache = Head#head.cache,
    #cache{cache = C, csize = Size0, inserts = Ins} = Cache,
    NewSize = Size0 + erlang:external_size(ToAdd),
    %% The size is used as a sequence number here; it increases monotonically.
    {NewC, NewIns, Lookup, Found} = 
	cache_binary(Head, ToAdd, C, Size0, Ins, false, []),
    NewCache = Cache#cache{cache = NewC, csize = NewSize, inserts = NewIns},
    Head1 = Head#head{cache = NewCache},
    if 
	Lookup; NewSize >= Cache#cache.tsize ->
	    %% The cache is considered full, or some lookup.
	    {NewHead, LU, PwriteList} = (Head#head.mod):write_cache(Head1),
	    {NewHead, Found ++ LU, PwriteList};
	NewC =:= [] ->
	    {Head1, Found, []};
	Cache#cache.wrtime =:= undefined ->
	    %% Empty cache. Schedule a delayed write.
	    Now = now(), Me = self(),
	    Call = ?DETS_CALL(Me, {delayed_write, Now}),
	    erlang:send_after(Cache#cache.delay, Me, Call),
	    {Head1#head{cache = NewCache#cache{wrtime = Now}}, Found, []};
	Size0 =:= 0 ->
	    %% Empty cache that has been written after the
	    %% currently scheduled delayed write.
	    {Head1#head{cache = NewCache#cache{wrtime = now()}}, Found, []};
	true ->
	    %% Cache is not empty, delayed write has been scheduled.
	    {Head1, Found, []}
    end.

cache_binary(Head, [{Q,Os} | L], C, Seq, Ins, Lu,F) when Q =:= delete_object ->
    cache_obj_op(Head, L, C, Seq, Ins, Lu, F, Os, Head#head.keypos, Q);
cache_binary(Head, [{Q,Os} | L], C, Seq, Ins, Lu, F) when Q =:= insert ->
    NewIns = Ins + length(Os),
    cache_obj_op(Head, L, C, Seq, NewIns, Lu, F, Os, Head#head.keypos, Q);
cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, Lu, F) when Q =:= delete_key ->
    cache_key_op(Head, L, C, Seq, Ins, Lu, F, Ks, Q);
cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, _Lu, F) when C =:= [] -> % lookup
    cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q);
cache_binary(Head, [{Q,Ks} | L], C, Seq, Ins, Lu, F) -> % lookup
    case dets_utils:cache_lookup(Head#head.type, Ks, C, []) of
	false ->
	    cache_key_op(Head, L, C, Seq, Ins, true, F, Ks, Q);
	Found ->
	    {lookup,Pid} = Q,
	    cache_binary(Head, L, C, Seq, Ins, Lu, [{Pid,Found} | F])
    end;
cache_binary(_Head, [], C, _Seq, Ins, Lu, F) ->
    {C, Ins, Lu, F}.

cache_key_op(Head, L, C, Seq, Ins, Lu, F, [K | Ks], Q) ->
    E = {K, {Seq, Q}},
    cache_key_op(Head, L, [E | C], Seq+1, Ins, Lu, F, Ks, Q);
cache_key_op(Head, L, C, Seq, Ins, Lu, F, [], _Q) ->
    cache_binary(Head, L, C, Seq, Ins, Lu, F).

cache_obj_op(Head, L, C, Seq, Ins, Lu, F, [O | Os], Kp, Q) ->
    E = {element(Kp, O), {Seq, {Q, O}}},
    cache_obj_op(Head, L, [E | C], Seq+1, Ins, Lu, F, Os, Kp, Q);
cache_obj_op(Head, L, C, Seq, Ins, Lu, F, [], _Kp, _Q) ->
    cache_binary(Head, L, C, Seq, Ins, Lu, F).

%% Called after some delay.
%% -> NewHead
delayed_write(Head, WrTime) ->
    Cache = Head#head.cache,
    LastWrTime = Cache#cache.wrtime,
    if
	LastWrTime =:= WrTime ->
	    %% The cache was not emptied during the last delay.
	    case catch write_cache(Head) of
		{Head2, []} ->
		    NewCache = (Head2#head.cache)#cache{wrtime = undefined},
		    Head2#head{cache = NewCache};
		{NewHead, _Error} -> % Head.update_mode has been updated
		    NewHead
	    end;
	true ->
	    %% The cache was emptied during the delay.
	    %% Has anything been written since then?
	    if 
		Cache#cache.csize =:= 0 ->
		    %% No, further delayed write not needed.
		    NewCache = Cache#cache{wrtime = undefined},
		    Head#head{cache = NewCache};
		true ->
		    %% Yes, schedule a new delayed write.
		    {MS1,S1,M1} = WrTime,
		    {MS2,S2,M2} = LastWrTime,
		    WrT = M1+1000000*(S1+1000000*MS1),
		    LastWrT = M2+1000000*(S2+1000000*MS2),
		    When = round((LastWrT - WrT)/1000), Me = self(),
		    Call = ?DETS_CALL(Me, {delayed_write, LastWrTime}),
		    erlang:send_after(When, Me, Call),
		    Head
	    end
    end.

%% -> {NewHead, [LookedUpObject]} | throw({NewHead, Error})
write_cache(Head) ->
    {Head1, LU, PwriteList} = (Head#head.mod):write_cache(Head),
    {NewHead, ok} = dets_utils:pwrite(Head1, PwriteList),
    {NewHead, LU}.

status(Head) ->
    case Head#head.update_mode of
	saved -> ok;
	dirty -> ok;
	new_dirty -> ok;
	Error -> Error
    end.

%%% Scan the file from start to end by reading chunks.

%% -> dets_cont()
init_scan(Head, NoObjs) ->
    check_safe_fixtable(Head),
    FreeLists = dets_utils:get_freelists(Head),
    Base = Head#head.base,
    {From, To} = dets_utils:find_next_allocated(FreeLists, Base, Base),
    #dets_cont{no_objs = NoObjs, bin = <<>>, alloc = {From, To, <<>>}}.

check_safe_fixtable(Head) ->
    case (Head#head.fixed =:= false) andalso 
         ((get(verbose) =:= yes) orelse dets_utils:debug_mode()) of
        true ->
            error_logger:format
              ("** dets: traversal of ~p needs safe_fixtable~n",
               [Head#head.name]);
        false ->
            ok
    end.

%% -> {[RTerm], dets_cont()} | {scan_error, Reason}
%% RTerm = {Pos, Next, Size, Status, Term}
scan(_Head, #dets_cont{alloc = <<>>}=C) ->
    {[], C};
scan(Head, C) -> % when is_record(C, dets_cont)
    #dets_cont{no_objs = No, alloc = L0, bin = Bin} = C,
    {From, To, L} = L0,
    R = case No of
	    default ->
		0;
	    _ when is_integer(No) ->
		-No-1
	end,
    scan(Bin, Head, From, To, L, [], R, {C, Head#head.type}).

scan(Bin, H, From, To, L, Ts, R, {C0, Type} = C) ->
    case (H#head.mod):scan_objs(H, Bin, From, To, L, Ts, R, Type) of
        {more, NFrom, NTo, NL, NTs, NR, Sz} ->
            scan_read(H, NFrom, NTo, Sz, NL, NTs, NR, C);
        {stop, <<>>=B, NFrom, NTo, <<>>=NL, NTs} ->
            Ftab = dets_utils:get_freelists(H),
            case dets_utils:find_next_allocated(Ftab, NFrom, H#head.base) of
                none ->
                    {NTs, C0#dets_cont{bin = eof, alloc = B}};
                _ -> 
                    {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}}
            end;
        {stop, B, NFrom, NTo, NL, NTs} ->
            {NTs, C0#dets_cont{bin = B, alloc = {NFrom, NTo, NL}}};
        bad_object ->
            {scan_error, dets_utils:bad_object(scan, {From, To, Bin})}
    end.

scan_read(_H, From, To, _Min, L0, Ts, 
	  R, {C, _Type}) when R >= ?CHUNK_SIZE ->
    %% We may have read (much) more than CHUNK_SIZE, if there are holes.
    L = {From, To, L0},
    {Ts, C#dets_cont{bin = <<>>, alloc = L}};
scan_read(H, From, _To, Min, _L, Ts, R, C) ->
    Max = if 
	      Min < ?CHUNK_SIZE -> ?CHUNK_SIZE; 
	      true -> Min 
	  end,
    FreeLists = dets_utils:get_freelists(H),
    case dets_utils:find_allocated(FreeLists, From, Max, H#head.base) of
        <<>>=Bin0 ->
            {Cont, _} = C,
            {Ts, Cont#dets_cont{bin = eof, alloc = Bin0}};
        <<From1:32,To1:32,L1/binary>> ->
            case dets_utils:pread_n(H#head.fptr, From1, Max) of
                eof ->
                    {scan_error, premature_eof};
                NewBin ->
                    scan(NewBin, H, From1, To1, L1, Ts, R, C)
            end
    end.

err(Error) ->
    case get(verbose) of
	yes -> 
	    error_logger:format("** dets: failed with ~w~n", [Error]),
	    Error;
	undefined  ->
	    Error
    end.

%%%%%%%%%%%%%%%%%  DEBUG functions %%%%%%%%%%%%%%%%

file_info(FileName) ->
    case catch read_file_header(FileName, read, false) of
	{ok, Fd, FH} ->
	    file:close(Fd),
            (FH#fileheader.mod):file_info(FH);
	Other ->
	    Other
    end.

get_head_field(Fd, Field) ->
    dets_utils:read_4(Fd, Field).

%% Dump the contents of a DAT file to the tty
%% internal debug function which ignores the closed properly thingie
%% and just tries anyway

view(FileName) ->
    case catch read_file_header(FileName, read, false) of
        {ok, Fd, FH} ->
	    Mod = FH#fileheader.mod,
	    case Mod:check_file_header(FH, Fd) of
		{ok, H0, ExtraInfo} ->
		    Ftab = Mod:init_freelist(H0, ExtraInfo),
		    {_Bump, Base} = constants(FH, FileName),
		    H = H0#head{freelists=Ftab, base = Base},
		    v_free_list(H),
		    Mod:v_segments(H),
		    file:close(Fd);
		X ->
		    file:close(Fd),
		    X
	    end;
	X -> 
	    X
    end.

v_free_list(Head) ->
    io:format("FREE LIST ...... \n",[]),
    io:format("~p~n", [dets_utils:all_free(Head)]),
    io:format("END OF FREE LIST \n",[]).