%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2001-2011. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(dets_v8).

%% Dets files, implementation part. This module handles versions up to
%% and including 8(c). To be called from dets.erl only.

-export([mark_dirty/1, read_file_header/2,
         check_file_header/2, do_perform_save/1, initiate_file/11,
         init_freelist/2, fsck_input/4,
         bulk_input/3, output_objs/4, write_cache/1, may_grow/3,
         find_object/2, re_hash/2, slot_objs/2, scan_objs/8,
         db_hash/2, no_slots/1, table_parameters/1]).

-export([file_info/1, v_segments/1]).

-export([cache_segps/3]).

%% For backward compatibility.
-export([sz2pos/1]).

-compile({inline, [{sz2pos,1},{scan_skip,7}]}).
-compile({inline, [{skip_bytes,5}, {get_segp,1}]}).
-compile({inline, [{wl_lookup,5}]}).
-compile({inline, [{actual_seg_size,0}]}).

-include("dets.hrl").

%%  The layout of the file is :
%%
%%   bytes   decsription
%%  ---------------------- File header
%%    4      FreelistsPointer
%%    4      Cookie
%%    4      ClosedProperly (pos=8)
%%    4      Type (pos=12)
%%    4      Version (pos=16)
%%    4      M
%%    4      Next
%%    4      KeyPos
%%    4      NoObjects
%%    4      N
%%  ------------------ end of file header
%%    4*8192 SegmentArray
%%  ------------------
%%    4*256  First segment
%%  ----------------------------- This is BASE.
%%    ???    Objects (free and alive)
%%    4*256  Second segment (2 kB now, due to a bug)
%%    ???    Objects (free and alive)
%%    ... more objects and segments ...
%%  -----------------------------
%%    ???    Free lists
%%  -----------------------------
%%    4      File size, in bytes. 

%%  The first slot (0) in the segment array always points to the
%%  pre-allocated first segment.
%%  Before we can find an object we must find the slot where the
%%  object resides. Each slot is a (possibly empty) list (or chain) of
%%  objects that hash to the same slot. If the value stored in the
%%  slot is zero, the slot chain is empty. If the slot value is
%%  non-zero, the value points to a position in the file where the
%%  chain starts. Each object in a chain has the following layout:
%%
%%   bytes  decsription
%%  --------------------
%%    4     Pointer to the next object of the chain.
%%    4     Size of the object in bytes (Sz).
%%    4     Status  (FREE or ACTIVE)
%%    Sz    Binary representing the object
%%
%%  The status field is used while repairing a file (but not next or size).
%%
%%|---------------|
%%|      head     |
%%|       	  |
%%|               |
%%|_______________|
%%|		  |------|
%%|___seg ptr1____|      |
%%|		  |      |
%%|__ seg ptr 2___|      |
%%|               |      |    segment 1
%%|	....	  |      V _____________
%%			 |		|
%%			 |		|
%%			 |___slot 0 ____|
%%                       |              |
%%                       |___slot 1 ____|-----|
%%			 |		|     |
%%			 |   .....	|     |  1:st obj in slot 1
%%					      V  segment 1
%%						|-----------|
%%						|  next     |
%%						|___________|
%%						|  size     |
%%						|___________|
%%						|  status   |
%%						|___________|
%%						|	    |
%%						|           |
%%						|   obj     |
%%						|           |

%%%
%%% File header
%%%

-define(HEADSZ, 40).          % The size of the file header, in bytes.
-define(SEGSZ, 256).          % Size of a segment, in words.
-define(SEGSZ_LOG2, 8).
-define(SEGARRSZ, 8192).      % Maximal number of segments.
-define(SEGADDR(SegN), (?HEADSZ + (4 * (SegN)))).
-define(BASE, ?SEGADDR((?SEGSZ + ?SEGARRSZ))).
-define(MAXOBJS, (?SEGSZ * ?SEGARRSZ)). % 2 M objects

-define(SLOT2SEG(S), ((S) bsr ?SEGSZ_LOG2)).

%% BIG is used for hashing. BIG must be greater than the maximum
%% number of slots, currently MAXOBJS.
-define(BIG, 16#ffffff).

%% Hard coded positions into the file header:
-define(FREELIST_POS, 0).
-define(CLOSED_PROPERLY_POS, 8).
-define(D_POS, 20).
-define(NO_OBJECTS_POS, (?D_POS + 12)).

%% The version of a dets file is indicated by the ClosedProperly
%% field. Version 6 was used in the R1A release, and version 7 in the
%% R1B release up to and including the R3B01 release. Both version 6
%% and version 7 indicate properly closed files by the value
%% CLOSED_PROPERLY.
%%
%% The current version, 8, has three sub-versions:
%%
%% - 8(a), indicated by the value CLOSED_PROPERLY (same as in versions 6 
%%         and 7), introduced in R3B02;
%% - 8(b), indicated by the value CLOSED_PROPERLY2(_NEED_COMPACTING),
%%         introduced in R5A and used up to and including R6A;
%% - 8(c), indicated by the value CLOSED_PROPERLY_NEW_HASH(_NEED_COMPACTING),
%%         in use since R6B.
%%
%% The difference between the 8(a) and the 8(b) versions is the format
%% used for free lists saved on dets files.
%% The 8(c) version uses a different hashing algorithm, erlang:phash
%% (former versions use erlang:hash).
%% Version 8(b) files are only converted to version 8(c) if repair is
%% done, so we need compatibility with 8(b) for a _long_ time.
%%
%% There are known bugs due to the fact that keys and objects are
%% sometimes compared (==) and sometimes matched (=:=). The version
%% used by default (9, see dets_v9.erl) does not have this problem.

-define(NOT_PROPERLY_CLOSED,0).
-define(CLOSED_PROPERLY,1).
-define(CLOSED_PROPERLY2,2).
-define(CLOSED_PROPERLY2_NEED_COMPACTING,3).
-define(CLOSED_PROPERLY_NEW_HASH,4).
-define(CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING,5).

-define(FILE_FORMAT_VERSION, 8).
-define(CAN_BUMP_BY_REPAIR, [6, 7]).
-define(CAN_CONVERT_FREELIST, [8]).

%%%
%%% Object header (next, size, status).
%%%

-define(OHDSZ, 12).         % The size of the object header, in bytes.
-define(STATUS_POS, 8).     % Position of the status field.

%% The size of each object is a multiple of 16.
%% BUMP is used when repairing files.
-define(BUMP, 16).

-define(ReadAhead, 512).

%%-define(DEBUGF(X,Y), io:format(X, Y)).
-define(DEBUGF(X,Y), void).

%% -> ok | throw({NewHead,Error})
mark_dirty(Head) ->
    Dirty = [{?CLOSED_PROPERLY_POS, <<?NOT_PROPERLY_CLOSED:32>>}],
    dets_utils:pwrite(Head, Dirty),
    dets_utils:sync(Head),
    dets_utils:position(Head, Head#head.freelists_p),
    dets_utils:truncate(Head, cur).

%% -> {ok, head()} | throw(Error)
initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, 
		Ram, CacheSz, Auto, _DoInitSegments) ->
    Freelist = 0,
    Cookie = ?MAGIC,
    ClosedProperly = ?NOT_PROPERLY_CLOSED, % immediately overwritten
    Version = ?FILE_FORMAT_VERSION,
    Factor = est_no_segments(MinSlots),
    N = 0,
    M = Next = ?SEGSZ * Factor,
    NoObjects = 0,
    dets_utils:pwrite(Fd, Fname, 0, 
                      <<Freelist:32,
                      Cookie:32,
                      ClosedProperly:32,
                      (dets_utils:type_to_code(Type)):32,
                      Version:32,
                      M:32,
                      Next:32,
                      Kp:32,
                      NoObjects:32,
                      N:32,
		      0:(?SEGARRSZ*4)/unit:8, % Initialize SegmentArray
		      0:(?SEGSZ*4)/unit:8>>), % Initialize first segment
    %% We must set the first slot of the segment pointer array to
    %% point to the first segment
    Pos = ?SEGADDR(0),
    SegP = (?HEADSZ + (4 * ?SEGARRSZ)),
    dets_utils:pwrite(Fd, Fname, Pos, <<SegP:32>>),
    segp_cache(Pos, SegP),

    Ftab = dets_utils:init_alloc(?BASE),
    H0 = #head{freelists=Ftab, fptr = Fd, base = ?BASE},
    {H1, Ws} = init_more_segments(H0, 1, Factor, undefined, []),

    %% This is not optimal but simple: always initiate the segments.
    dets_utils:pwrite(Fd, Fname, Ws),

    %% Return a new nice head structure
    Head = #head{
      m  = M,
      m2 = M * 2,
      next = Next,
      fptr = Fd,
      no_objects = NoObjects,
      n = N,
      type = Type,
      update_mode = dirty,
      freelists = H1#head.freelists,
      auto_save = Auto,
      hash_bif = phash,
      keypos = Kp,
      min_no_slots = Factor * ?SEGSZ,
      max_no_slots = no_segs(MaxSlots) * ?SEGSZ,
      
      ram_file = Ram, 
      filename = Fname, 
      name = Tab,
      cache = dets_utils:new_cache(CacheSz),
      version = Version,
      bump = ?BUMP,
      base = ?BASE,
      mod = ?MODULE
     },
    {ok, Head}.

est_no_segments(MinSlots) when 1 + ?SLOT2SEG(MinSlots) > ?SEGARRSZ ->
    ?SEGARRSZ;
est_no_segments(MinSlots) ->
    1 + ?SLOT2SEG(MinSlots).

init_more_segments(Head, SegNo, Factor, undefined, Ws) when SegNo < Factor ->
    init_more_segments(Head, SegNo, Factor, seg_zero(), Ws);
init_more_segments(Head, SegNo, Factor, SegZero, Ws) when SegNo < Factor ->
    {NewHead, W} = allocate_segment(Head, SegZero, SegNo),
    init_more_segments(NewHead, SegNo+1, Factor, SegZero, W++Ws);
init_more_segments(Head, _SegNo, _Factor, _SegZero, Ws) ->
    {Head, Ws}.

allocate_segment(Head, SegZero, SegNo) ->
    %% may throw error:
    {NewHead, Segment, _} = dets_utils:alloc(Head, 4 * ?SEGSZ),
    InitSegment = {Segment, SegZero},
    Pos = ?SEGADDR(SegNo),
    segp_cache(Pos, Segment),
    SegPointer = {Pos, <<Segment:32>>},
    {NewHead, [InitSegment, SegPointer]}.

%% Read free lists (using a Buddy System) from file. 
init_freelist(Head, {convert_freelist,_Version}) ->
    %% This function converts the saved freelist of the form
    %% [{Slot1,Addr1},{Addr1,Addr2},...,{AddrN,0},{Slot2,Addr},...]
    %% i.e each slot is a linked list which ends with a 0.
    %% This is stored in a bplus_tree per Slot.
    %% Each Slot is a position in a tuple.

    Ftab = dets_utils:empty_free_lists(),
    Pos = Head#head.freelists_p,
    case catch prterm(Head, Pos, ?OHDSZ) of
	{0, _Sz, Term}  ->
	    FreeList1 = lists:reverse(Term),
            FreeList = dets_utils:init_slots_from_old_file(FreeList1, Ftab),
            Head#head{freelists = FreeList, base = ?BASE};
	_ ->
	    throw({error, {bad_freelists, Head#head.filename}})
    end;
init_freelist(Head, _) ->
    %% bplus_tree stored as is
    Pos = Head#head.freelists_p,
    case catch prterm(Head, Pos, ?OHDSZ) of
	{0, _Sz, Term}  ->
            Head#head{freelists = Term, base = ?BASE};
	_ ->
	    throw({error, {bad_freelists, Head#head.filename}})
    end.

%% -> {ok, Fd, fileheader()} | throw(Error)
read_file_header(Fd, FileName) ->
    {ok, Bin} = dets_utils:pread_close(Fd, FileName, 0, ?HEADSZ),
    [Freelist, Cookie, CP, Type2, Version, M, Next, Kp, NoObjects, N] = 
	bin2ints(Bin),
    {ok, EOF} = dets_utils:position_close(Fd, FileName, eof),
    {ok, <<FileSize:32>>} = dets_utils:pread_close(Fd, FileName, EOF-4, 4),
    FH = #fileheader{freelist = Freelist,
                     fl_base = ?BASE,
		     cookie = Cookie,
		     closed_properly = CP,
		     type = dets_utils:code_to_type(Type2),
		     version = Version,
		     m = M,
		     next = Next,
		     keypos = Kp,
		     no_objects = NoObjects,
		     min_no_slots = ?DEFAULT_MIN_NO_SLOTS,
		     max_no_slots = ?DEFAULT_MAX_NO_SLOTS,
		     trailer = FileSize,
		     eof = EOF,
		     n = N,
		     mod = ?MODULE},
    {ok, Fd, FH}.

%% -> {ok, head(), ExtraInfo} | {error, Reason} (Reason lacking file name)
%% ExtraInfo = {convert_freelist, Version} | true | need_compacting 
check_file_header(FH, Fd) ->
    Test = 
	if
	    FH#fileheader.cookie =/= ?MAGIC ->
		{error, not_a_dets_file};
	    FH#fileheader.type =:= badtype ->
		{error, invalid_type_code};
	    FH#fileheader.version =/= ?FILE_FORMAT_VERSION -> 
		case lists:member(FH#fileheader.version,
                                  ?CAN_BUMP_BY_REPAIR) of
		    true ->
			{error, version_bump};
		    false ->
			{error, bad_version}
		end;
	    FH#fileheader.trailer =/= FH#fileheader.eof ->
		{error, not_closed};
	    FH#fileheader.closed_properly =:= ?CLOSED_PROPERLY ->
		case lists:member(FH#fileheader.version,
				  ?CAN_CONVERT_FREELIST) of
		    true ->
			{ok, {convert_freelist, FH#fileheader.version}, hash};
		    false ->
			{error, not_closed} % should not happen
		end;
	    FH#fileheader.closed_properly =:= ?CLOSED_PROPERLY2 ->
		{ok, true, hash};
	    FH#fileheader.closed_properly =:= 
	          ?CLOSED_PROPERLY2_NEED_COMPACTING  ->
		{ok, need_compacting, hash};
	    FH#fileheader.closed_properly =:= ?CLOSED_PROPERLY_NEW_HASH ->
		{ok, true, phash};
	    FH#fileheader.closed_properly =:= 
	         ?CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING  ->
		{ok, need_compacting, phash};
	    FH#fileheader.closed_properly =:= ?NOT_PROPERLY_CLOSED ->
		{error, not_closed};
	    FH#fileheader.closed_properly > 
	         ?CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING ->
		{error, not_closed};
	    true ->
		{error, not_a_dets_file}
	end,
    case Test of
	{ok, ExtraInfo, HashAlg} ->
	    H = #head{
	      m = FH#fileheader.m,
	      m2 = FH#fileheader.m * 2,
	      next = FH#fileheader.next,
	      fptr = Fd,
	      no_objects= FH#fileheader.no_objects,
	      n = FH#fileheader.n,
	      type = FH#fileheader.type,
	      update_mode = saved,
	      auto_save = infinity,             % not saved on file
	      fixed = false,			% not saved on file
	      freelists_p = FH#fileheader.freelist,
	      hash_bif = HashAlg,
	      keypos = FH#fileheader.keypos,
	      min_no_slots = FH#fileheader.min_no_slots,
	      max_no_slots = FH#fileheader.max_no_slots,
	      version = ?FILE_FORMAT_VERSION,
	      mod = ?MODULE,
	      bump = ?BUMP,
	      base = FH#fileheader.fl_base},
	    {ok, H, ExtraInfo};
	Error ->
	    Error
    end.

cache_segps(Fd, FileName, M) ->
    NSegs = no_segs(M),
    {ok, Bin} = dets_utils:pread_close(Fd, FileName, ?HEADSZ, 4 * NSegs),
    Fun = fun(S, P) -> segp_cache(P, S), P+4 end,
    lists:foldl(Fun, ?HEADSZ, bin2ints(Bin)).

no_segs(NoSlots) ->
    ?SLOT2SEG(NoSlots - 1) + 1.

bin2ints(<<Int:32, B/binary>>) ->
    [Int | bin2ints(B)];
bin2ints(<<>>) ->
    [].

%%%
%%% Repair, conversion and initialization of a dets file.
%%%

bulk_input(Head, InitFun, Cntrs) ->
    bulk_input(Head, InitFun, Cntrs, make_ref()).

bulk_input(Head, InitFun, Cntrs, Ref) ->
    fun(close) ->
	    ok;
       (read) ->
	    case catch {Ref, InitFun(read)} of
		{Ref, end_of_input} ->
		    end_of_input;
		{Ref, {L0, NewInitFun}} when is_list(L0), 
                                             is_function(NewInitFun) ->
		    Kp = Head#head.keypos,
		    case catch bulk_objects(L0, Head, Cntrs, Kp, []) of
			{'EXIT', _Error} ->
			    _ = (catch NewInitFun(close)),
			    {error, invalid_objects_list};
			L ->
			    {L, bulk_input(Head, NewInitFun, Cntrs, Ref)}
		    end;
		{Ref, Value} ->
		    {error, {init_fun, Value}};
		Error ->
		    throw({thrown, Error})
	    end
    end.

bulk_objects([T | Ts], Head, Cntrs, Kp, L) ->
    BT = term_to_binary(T),
    Sz = byte_size(BT),
    LogSz = sz2pos(Sz+?OHDSZ),
    count_object(Cntrs, LogSz),
    Key = element(Kp, T),
    bulk_objects(Ts, Head, Cntrs, Kp, [make_object(Head, Key, LogSz, BT) | L]);
bulk_objects([], _Head, _Cntrs, _Kp, L) ->
    L.

-define(FSCK_SEGMENT, 10000).

-define(DCT(D, CT), [D | CT]).

-define(VNEW(N, E), erlang:make_tuple(N, E)).
-define(VSET(I, V, E), setelement(I, V, E)).
-define(VGET(I, V), element(I, V)).

%% OldVersion not used, assuming later versions have been converted already.
output_objs(OldVersion, Head, SlotNumbers, Cntrs) ->
    fun(close) ->
	    {ok, 0, Head};
       ([]) ->
	    output_objs(OldVersion, Head, SlotNumbers, Cntrs);
       (L) ->
	    %% Descending sizes.
	    Count = lists:sort(ets:tab2list(Cntrs)),
	    RCount = lists:reverse(Count),
	    NoObjects = lists:foldl(fun({_Sz,No}, A) -> A + No end, 0, Count),
	    {_, MinSlots, _} = SlotNumbers,
	    if
		%% Using number of objects for bags and duplicate bags
		%% is not ideal; number of (unique) keys should be
		%% used instead. The effect is that there will be more
		%% segments than "necessary".
		MinSlots =/= bulk_init,
		abs(?SLOT2SEG(NoObjects) - ?SLOT2SEG(MinSlots)) > 5,
		(NoObjects < ?MAXOBJS) ->
		    {try_again, NoObjects};
		true ->
                    Head1 = Head#head{no_objects = NoObjects},
		    SegSz = actual_seg_size(),
		    {_, End, _} = dets_utils:alloc(Head, SegSz-1),
		    %% Now {LogSize,NoObjects} in Cntrs is replaced by
		    %% {LogSize,Position,{FileName,FileDescriptor},NoObjects}.
		    {Head2, CT} = allocate_all_objects(Head1, RCount, Cntrs),
		    [E | Es] = bin2term(L, []),
		    {NE, Acc, DCT1} = 
			output_slots(E, Es, [E], Head2, ?DCT(0, CT)),
		    NDCT = write_all_sizes(DCT1, Cntrs),
		    Max = ets:info(Cntrs, size),
                    output_objs2(NE, Acc, Head2, Cntrs, NDCT, End, Max,Max)
	    end
    end.

output_objs2(E, Acc, Head, Cntrs, DCT, End, 0, MaxNoChunks) ->
    NDCT = write_all_sizes(DCT, Cntrs),
    output_objs2(E, Acc, Head, Cntrs, NDCT, End, MaxNoChunks, MaxNoChunks);
output_objs2(E, Acc, Head, Cntrs, DCT, End, ChunkI, MaxNoChunks) ->
    fun(close) ->
	    DCT1 = output_slot(Acc, Head, DCT),
	    NDCT = write_all_sizes(DCT1, Cntrs),
	    ?DCT(NoDups, CT) = NDCT,
	    [SegAddr | []] = ?VGET(tuple_size(CT), CT),
            FinalZ = End - SegAddr,
            [{?FSCK_SEGMENT, _, {FileName, Fd}, _}] = 
		ets:lookup(Cntrs, ?FSCK_SEGMENT),
	    ok = dets_utils:fwrite(Fd, FileName, 
				   dets_utils:make_zeros(FinalZ)),
            NewHead = Head#head{no_objects = Head#head.no_objects - NoDups},
	    {ok, NoDups, NewHead};
       (L) ->
	    Es = bin2term(L, []),
	    {NE, NAcc, NDCT} = output_slots(E, Es, Acc, Head, DCT),
	    output_objs2(NE, NAcc, Head, Cntrs, NDCT, End, 
			 ChunkI-1, MaxNoChunks)
    end.

%% By allocating bigger objects before smaller ones, holes in the
%% buddy system memory map are avoided. Unfortunately, the segments
%% are always allocated first, so if there are objects bigger than a
%% segment, there is a hole to handle. (Haven't considered placing the
%% segments among other objects of the same size.)
allocate_all_objects(Head, Count, Cntrs) ->
    SegSize = actual_seg_size(),
    {Head1, HSz, HN, HA} = alloc_hole(Count, Head, SegSize),
    {Max, _} = hd(Count),
    CT = ?VNEW(Max+1, not_used),
    {Head2, NCT} = allocate_all(Head1, Count, Cntrs, CT),
    Head3 = free_hole(Head2, HSz, HN, HA),
    {Head3, NCT}.

alloc_hole([{LSize,_} | _], Head, SegSz) when ?POW(LSize-1) > SegSz ->
    {_, SegAddr, _} = dets_utils:alloc(Head, SegSz-1),
    Size = ?POW(LSize-1)-1,
    {_, Addr, _} = dets_utils:alloc(Head, Size),
    N = (Addr - SegAddr) div SegSz,
    Head1 = dets_utils:alloc_many(Head, SegSz, N, SegAddr),
    {Head1, SegSz-1, N, SegAddr};
alloc_hole(_Count, Head, _SegSz) ->
    {Head, 0, 0, 0}.

free_hole(Head, _Size, 0, _Addr) ->
    Head;
free_hole(Head, Size, N, Addr) ->
    {Head1, _} = dets_utils:free(Head, Addr, Size),
    free_hole(Head1, Size, N-1, Addr+Size+1).

%% One (temporary) file for each buddy size, write all objects of that
%% size to the file.
allocate_all(Head, [{LSize,NoObjects} | Count], Cntrs, CT) ->
    Size = ?POW(LSize-1)-1,
    {_Head, Addr, _} = dets_utils:alloc(Head, Size),
    NewHead = dets_utils:alloc_many(Head, Size+1, NoObjects, Addr),
    {FileName, Fd} = temp_file(Head, LSize),
    true = ets:insert(Cntrs, {LSize, Addr, {FileName, Fd}, NoObjects}),
    NCT = ?VSET(LSize, CT, [Addr | []]),
    allocate_all(NewHead, Count, Cntrs, NCT);
allocate_all(Head, [], Cntrs, CT) ->
    %% Note that space for the segments has been allocated already.
    %% And one file for the segments...
    {FileName, Fd} = temp_file(Head, ?FSCK_SEGMENT),
    Addr = ?SEGADDR(?SEGARRSZ),
    true = ets:insert(Cntrs, {?FSCK_SEGMENT, Addr, {FileName, Fd}, 0}),
    NCT = ?VSET(tuple_size(CT), CT, [Addr | []]),
    {Head, NCT}.

temp_file(Head, N) ->
    TmpName = lists:concat([Head#head.filename, '.', N]),
    {ok, Fd} = dets_utils:open(TmpName, [raw, binary, write]),
    {TmpName, Fd}.

bin2term([<<Slot:32, LogSize:8, BinTerm/binary>> | BTs], L) ->
    bin2term(BTs, [{Slot, LogSize, BinTerm} | L]);
bin2term([], L) ->
    lists:reverse(L).

write_all_sizes(?DCT(D, CT), Cntrs) ->
    ?DCT(D, write_sizes(1, tuple_size(CT), CT, Cntrs)).

write_sizes(Sz, Sz, CT, Cntrs) ->
    write_size(Sz, ?FSCK_SEGMENT, CT, Cntrs);
write_sizes(Sz, MaxSz, CT, Cntrs) ->
    NCT = write_size(Sz, Sz, CT, Cntrs),
    write_sizes(Sz+1, MaxSz, NCT, Cntrs).

write_size(Sz, I, CT, Cntrs) ->
    case ?VGET(Sz, CT) of
	not_used ->
	    CT;
	[Addr | L] ->
	    {FileName, Fd} = ets:lookup_element(Cntrs, I, 3),
	    case file:write(Fd, lists:reverse(L)) of
		ok ->
		    ?VSET(Sz, CT, [Addr | []]);
		Error ->
		    dets_utils:file_error(FileName, Error)
	    end
    end.

output_slots(E, [E1 | Es], Acc, Head, DCT) 
                       when element(1, E) =:= element(1, E1) ->
    output_slots(E1, Es, [E1 | Acc], Head, DCT);
output_slots(_E, [E | L], Acc, Head, DCT) ->
    NDCT = output_slot(Acc, Head, DCT),
    output_slots(E, L, [E], Head, NDCT);
output_slots(E, [], Acc, _Head, DCT) ->
    {E, Acc, DCT}.

output_slot([E], _Head, ?DCT(D, CT)) ->
    ?DCT(D, output_slot([{foo, E}], 0, foo, CT));
output_slot(Es0, Head, ?DCT(D, CT)) ->
    Kp = Head#head.keypos,
    Fun = fun({_Slot, _LSize, BinTerm} = E) -> 
		  Key = element(Kp, binary_to_term(BinTerm)),
		  {Key, E}
	  end,
    Es = lists:map(Fun, Es0),
    NEs = case Head#head.type of
	      set ->
		  [{Key0,_} = E | L0] = lists:sort(Es),
		  choose_one(lists:sort(L0), Key0, [E]);
	      bag -> 
		  lists:usort(Es);
	      duplicate_bag -> 
		  lists:sort(Es)
	  end,
    Dups = D + length(Es) - length(NEs),
    ?DCT(Dups, output_slot(NEs, 0, foo, CT)).

choose_one([{Key,_} | Es], Key, L) ->
    choose_one(Es, Key, L);
choose_one([{Key,_} = E | Es], _Key, L) ->
    choose_one(Es, Key, [E | L]);
choose_one([], _Key, L) ->
    L.

output_slot([E | Es], Next, _Slot, CT) ->
    {_Key, {Slot, LSize, BinTerm}} = E,
    Size = byte_size(BinTerm),
    Size2 = ?POW(LSize-1),
    Pad = <<0:(Size2-Size-?OHDSZ)/unit:8>>,
    BinObject = [<<Next:32, Size:32, ?ACTIVE:32>>, BinTerm | Pad],
    [Addr | L] = ?VGET(LSize, CT),
    NCT = ?VSET(LSize, CT, [Addr+Size2 | [BinObject | L]]),
    output_slot(Es, Addr, Slot, NCT);
output_slot([], Next, Slot, CT) ->
    I = tuple_size(CT),
    [Addr | L] = ?VGET(I, CT),
    {Pos, _} = slot_position(Slot),
    NoZeros = Pos - Addr,
    BinObject = if 
		    NoZeros > 100 ->
			[dets_utils:make_zeros(NoZeros) | <<Next:32>>];
		    true ->
			<<0:NoZeros/unit:8,Next:32>>
   	        end,
    Size = NoZeros+4,
    ?VSET(I, CT, [Addr+Size | [BinObject | L]]).

%% Does not close Fd.
fsck_input(Head, Fd, Cntrs, _FileHeader) ->
    %% The file is not compressed, so the object size cannot exceed
    %% the filesize, for all objects.
    MaxSz = case file:position(Fd, eof) of
                {ok, Pos} ->
                    Pos;
                _ ->
                    (1 bsl 32) - 1
            end,
    State0 = fsck_read(?BASE, Fd, []),
    fsck_input1(Head, State0, Fd, MaxSz, Cntrs).

fsck_input1(Head, State, Fd, MaxSz, Cntrs) ->
    fun(close) ->
	    ok;
       (read) ->
	    case State of
		done ->
		    end_of_input;
		{done, L} ->
		    R = count_input(Cntrs, L, []),
		    {R, fsck_input1(Head, done, Fd, MaxSz, Cntrs)};
		{cont, L, Bin, Pos} ->
		    R = count_input(Cntrs, L, []),
                    FR = fsck_objs(Bin, Head#head.keypos, Head, []),
		    NewState = fsck_read(FR, Pos, Fd, MaxSz, Head),
		    {R, fsck_input1(Head, NewState, Fd, MaxSz, Cntrs)}
	    end
    end.

%% The ets table Cntrs is used for counting objects per size.
count_input(Cntrs, [[LogSz | B] | Ts], L) ->
    count_object(Cntrs, LogSz),
    count_input(Cntrs, Ts, [B | L]);
count_input(_Cntrs, [], L) ->
    L.

count_object(Cntrs, LogSz) ->
    case catch ets:update_counter(Cntrs, LogSz, 1) of
	N when is_integer(N) -> ok;
	_Badarg -> true = ets:insert(Cntrs, {LogSz, 1})
    end.

fsck_read(Pos, F, L) ->
    case file:position(F, Pos) of
	{ok, _} ->
	    read_more_bytes(<<>>, 0, Pos, F, L);
	_Error ->
	    {done, L}
    end.

fsck_read({more, Bin, Sz, L}, Pos, F, MaxSz, Head) when Sz > MaxSz ->
    FR = skip_bytes(Bin, ?BUMP, Head#head.keypos, Head, L),
    fsck_read(FR, Pos, F, MaxSz, Head);
fsck_read({more, Bin, Sz, L}, Pos, F, _MaxSz, _Head) ->
    read_more_bytes(Bin, Sz, Pos, F, L);
fsck_read({new, Skip, L}, Pos, F, _MaxSz, _Head) ->
    NewPos = Pos + Skip,
    fsck_read(NewPos, F, L).

read_more_bytes(B, Min, Pos, F, L) ->
    Max = if 
	      Min < ?CHUNK_SIZE -> ?CHUNK_SIZE; 
	      true -> Min 
	  end,
    case dets_utils:read_n(F, Max) of
	eof ->
	    {done, L};
	Bin ->
	    NewPos = Pos + byte_size(Bin),
	    {cont, L, list_to_binary([B, Bin]), NewPos}
    end.

fsck_objs(Bin = <<_N:32, Sz:32, Status:32, Tail/binary>>, Kp, Head, L) ->
    if 
	Status =:= ?ACTIVE ->
	    case Tail of
		<<BinTerm:Sz/binary, Tail2/binary>> ->
		    case catch element(Kp, binary_to_term(BinTerm)) of
			{'EXIT', _} ->
			    skip_bytes(Bin, ?BUMP, Kp, Head, L);
			Key ->
			    LogSz = sz2pos(Sz+?OHDSZ),
			    Obj = make_object(Head, Key, LogSz, BinTerm),
			    NL = [[LogSz | Obj] | L],
			    Skip = ?POW(LogSz-1) - Sz - ?OHDSZ,
			    skip_bytes(Tail2, Skip, Kp, Head, NL)
		    end;
		_ ->
                    {more, Bin, Sz, L}
	    end;
	true -> 
	    skip_bytes(Bin, ?BUMP, Kp, Head, L)
    end;
fsck_objs(Bin, _Kp, _Head, L) ->
    {more, Bin, 0, L}.
    
%% Version 8 has to know about version 9.
make_object(Head, Key, _LogSz, BT) when Head#head.version =:= 9 ->
    Slot = dets_v9:db_hash(Key, Head),
    <<Slot:32, BT/binary>>;
make_object(Head, Key, LogSz, BT) ->
    Slot = db_hash(Key, Head),
    <<Slot:32, LogSz:8, BT/binary>>.

%% Inlined.
skip_bytes(Bin, Skip, Kp, Head, L) ->
    case Bin of
	<<_:Skip/binary, Tail/binary>> ->
	    fsck_objs(Tail, Kp, Head, L);
	_ ->
            {new, Skip - byte_size(Bin), L}
    end.

%% -> {NewHead, ok} | throw({Head, Error})
do_perform_save(H) ->
    FL = dets_utils:get_freelists(H),
    B = term_to_binary(FL),
    Size = byte_size(B),
    ?DEBUGF("size of freelist = ~p~n", [Size]),
    ?DEBUGF("head.m = ~p~n", [H#head.m]),
    ?DEBUGF("head.no_objects = ~p~n", [H#head.no_objects]),

    {ok, Pos} = dets_utils:position(H, eof),
    H1 = H#head{freelists_p = Pos},
    W1 = {?FREELIST_POS, <<Pos:32>>},
    W2 = {Pos, [<<0:32, Size:32, ?FREE:32>>, B]},
    
    W3 = {?D_POS, <<(H1#head.m):32, 
	            (H1#head.next):32, 
	            (H1#head.keypos):32,
	            (H1#head.no_objects):32,
		    (H1#head.n):32>>},
    {ClosedProperly, ClosedProperlyNeedCompacitng} = 
	case H1#head.hash_bif of
	    hash ->
		{?CLOSED_PROPERLY2, ?CLOSED_PROPERLY2_NEED_COMPACTING};
	    phash ->
		{?CLOSED_PROPERLY_NEW_HASH, 
		 ?CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING}
	end,
    W4 = 
	if 
	    Size > 1000, Size > H1#head.no_objects ->
		{?CLOSED_PROPERLY_POS, 
		 <<ClosedProperlyNeedCompacitng:32>>};
	    true ->
		{?CLOSED_PROPERLY_POS, <<ClosedProperly:32>>}
	end,
    W5 = {?FILE_FORMAT_VERSION_POS, <<?FILE_FORMAT_VERSION:32>>},
    {H2, ok} = dets_utils:pwrite(H1, [W1,W2,W3,W4,W5]),
    {ok, Pos2} = dets_utils:position(H2, eof),
    ?DEBUGF("Writing file size ~p, eof at ~p~n", [Pos2+4, Pos2]),
    dets_utils:pwrite(H2, [{Pos2, <<(Pos2 + 4):32>>}]).

%% -> [term()] | throw({Head, Error})
slot_objs(H, Slot) when Slot >= H#head.next ->
    '$end_of_table';
slot_objs(H, Slot) ->
    {_Pos, Chain} = chain(H, Slot),
    collect_chain(H, Chain).

collect_chain(_H, 0) -> [];
collect_chain(H, Pos) ->
    {Next, _Sz, Term} = prterm(H, Pos, ?ReadAhead),
    [Term | collect_chain(H, Next)].

db_hash(Key, Head) ->
    H = h(Key, Head#head.hash_bif),
    Hash = H rem Head#head.m,
    if
	Hash < Head#head.n ->
	    H rem (Head#head.m2); % H rem (2 * m)
	true ->
	    Hash
    end.

h(I, phash) -> erlang:phash(I, ?BIG) - 1;
h(I, HF) -> erlang:HF(I, ?BIG) - 1. %% stupid BIF has 1 counts.

no_slots(_Head) ->
    undefined.

table_parameters(_Head) ->
    undefined.

%% Re-hashing a segment, starting with SlotStart.
%%
%% On the average, half of the objects of the chain are put into a new
%% chain. If the slot of the old chain is i, then the slot of the new
%% chain is i+m.
%% Note that the insertion of objects into the new chain is simplified
%% by the fact that the chains are not sorted on key, which means that
%% each moved object can be inserted first in the new chain.
%% (It is also a fact that the objects with the same key are not sorted.)
%%
%% -> {ok, Writes} | throw({Head, Error})
re_hash(Head, SlotStart) ->
    {SlotPos, _4} = slot_position(SlotStart),
    {ok, Bin} = dets_utils:pread(Head, SlotPos, 4*?SEGSZ, 0),
    {Read, Cs} = split_bin(SlotPos, Bin, [], []),
    re_hash_read(Head, [], Read, Cs).

split_bin(Pos, <<P:32, B/binary>>, R, Cs) ->
    if
	P =:= 0 ->
	    split_bin(Pos+4, B, R, Cs);
	true ->
	    split_bin(Pos+4, B, [{P,?ReadAhead} | R], [[Pos] | Cs])
    end;
split_bin(_Pos, <<>>, R, Cs) ->
    {R, Cs}.

re_hash_read(Head, Cs, R, RCs) ->
    {ok, Bins} = dets_utils:pread(R, Head),
    re_hash_read(Head, R, RCs, Bins, Cs, [], []).

re_hash_read(Head, [{Pos, Size} | Ps], [C | Cs], 
	     [<<Next:32, Sz:32, _Status:32, Bin0/binary>> | Bins], 
	     DoneCs, R, RCs) ->
    case byte_size(Bin0) of
	BinSz when BinSz >= Sz ->
	    case catch binary_to_term(Bin0) of
		{'EXIT', _Error} ->
		    throw(dets_utils:corrupt_reason(Head, bad_object));
		Term ->
		    Key = element(Head#head.keypos, Term),
		    New = h(Key, Head#head.hash_bif) rem Head#head.m2,
		    NC = case New >= Head#head.m of
			     true -> [{Pos,New} | C];
			     false -> [Pos | C]
			 end,
		    if
			Next =:= 0 ->
			    NDoneCs = [NC | DoneCs], 
			    re_hash_read(Head, Ps, Cs, Bins, NDoneCs, R, RCs);
			true ->
			    NR = [{Next,?ReadAhead} | R],
			    NRCs = [NC | RCs],
			    re_hash_read(Head, Ps, Cs, Bins, DoneCs, NR, NRCs)
		    end
	    end;
	BinSz when Size =:= BinSz+?OHDSZ ->
	    NR = [{Pos, Sz+?OHDSZ} | R],
	    re_hash_read(Head, Ps, Cs, Bins, DoneCs, NR, [C | RCs]);
	_BinSz ->
	    throw({Head, {error, {premature_eof, Head#head.filename}}})
    end;
re_hash_read(Head, [], [], [], Cs, [], []) ->
    re_hash_traverse_chains(Cs, Head, [], [], []);
re_hash_read(Head, [], [], [], Cs, R, RCs) ->
    re_hash_read(Head, Cs, R, RCs).

re_hash_traverse_chains([C | Cs], Head, Rs, Ns, Ws) ->
    case re_hash_find_new(C, Rs, start, start) of
	false ->
	    re_hash_traverse_chains(Cs, Head, Rs, Ns, Ws);
	{NRs, FirstNew, LastNew} -> 
	    LastInNew = case C of
			    [{_,_} | _] -> true;
			    _ -> false
			end,
	    N = {FirstNew, LastNew, LastInNew},
	    NWs = re_hash_link(C, start, start, start, Ws),
	    re_hash_traverse_chains(Cs, Head, NRs, [N | Ns], NWs)
    end;
re_hash_traverse_chains([], Head, Rs, Ns, Ws) ->
    {ok, Bins} = dets_utils:pread(Rs, Head),
    {ok, insert_new(Rs, Bins, Ns, Ws)}.

re_hash_find_new([{Pos,NewSlot} | C], R, start, start) ->
    {SPos, _4} = slot_position(NewSlot),
    re_hash_find_new(C, [{SPos,4} | R], Pos, Pos);
re_hash_find_new([{Pos,_SPos} | C], R, _FirstNew, LastNew) ->
    re_hash_find_new(C, R, Pos, LastNew);
re_hash_find_new([_Pos | C], R, FirstNew, LastNew) ->
    re_hash_find_new(C, R, FirstNew, LastNew);
re_hash_find_new([], _R, start, start) ->
    false;
re_hash_find_new([], R, FirstNew, LastNew) ->
    {R, FirstNew, LastNew}.

re_hash_link([{Pos,_SPos} | C], LastOld, start, _LastInNew, Ws) ->
    re_hash_link(C, LastOld, Pos, true, Ws);
re_hash_link([{Pos,_SPos} | C], LastOld, LastNew, false, Ws) ->
    re_hash_link(C, LastOld, Pos, true, [{Pos,<<LastNew:32>>} | Ws]);
re_hash_link([{Pos,_SPos} | C], LastOld, _LastNew, LastInNew, Ws) ->
    re_hash_link(C, LastOld, Pos, LastInNew, Ws);
re_hash_link([Pos | C], start, LastNew, true, Ws) ->
    re_hash_link(C, Pos, LastNew, false, [{Pos,<<0:32>>} | Ws]);
re_hash_link([Pos | C], LastOld, LastNew, true, Ws) ->
    re_hash_link(C, Pos, LastNew, false, [{Pos,<<LastOld:32>>} | Ws]);
re_hash_link([Pos | C], _LastOld, LastNew, LastInNew, Ws) ->
    re_hash_link(C, Pos, LastNew, LastInNew, Ws);
re_hash_link([], _LastOld, _LastNew, _LastInNew, Ws) ->
    Ws.

insert_new([{NewSlotPos,_4} | Rs], [<<P:32>> = PB | Bins], [N | Ns], Ws) ->
    {FirstNew, LastNew, LastInNew} = N,
    Ws1 = case P of
	      0 when LastInNew ->
		  Ws;
	      0 ->
		  [{LastNew, <<0:32>>} | Ws];
	      _ ->
		  [{LastNew, PB} | Ws]
	  end,
    NWs = [{NewSlotPos, <<FirstNew:32>>} | Ws1],
    insert_new(Rs, Bins, Ns, NWs);
insert_new([], [], [], Ws) ->
    Ws.

%% When writing the cache, a 'work list' is first created:
%%   WorkList = [{Key, {Delete,Lookup,[Inserted]}}]
%%   Delete = keep | delete
%%   Lookup = skip | lookup
%%   Inserted = {object(), No}
%%   No = integer()
%% If No =< 0 then there will be -No instances of object() on the file
%% when the cache has been written. If No > 0 then No instances of
%% object() will be added to the file.
%% If Delete has the value 'delete', then all objects with the key Key
%% have been deleted. (This could be viewed as a shorthand for {Object,0}
%% for each object Object on the file not mentioned in some Inserted.)
%% If Lookup has the value 'lookup', all objects with the key Key will
%% be returned.
%%

%% -> {NewHead, [LookedUpObject], pwrite_list()} | throw({NewHead, Error})
write_cache(Head) ->
    #head{cache = C, type = Type} = Head,
    case dets_utils:is_empty_cache(C) of
	true -> {Head, [], []};
	false ->
	    {NewC, _MaxInserts, PerKey} = dets_utils:reset_cache(C),
	    %% NoInsertedKeys is an upper limit on the number of new keys.
	    {WL, NoInsertedKeys} = make_wl(PerKey, Type),
	    Head1 = Head#head{cache = NewC},
	    case may_grow(Head1, NoInsertedKeys, once) of
		{Head2, ok} ->
		    eval_work_list(Head2, WL);
		HeadError ->
		    throw(HeadError)
	    end
    end.

make_wl(PerKey, Type) ->
    make_wl(PerKey, Type, [], 0).

make_wl([{Key,L} | PerKey], Type, WL, Ins) ->
    [Cs | I] = wl(L, Type),
    make_wl(PerKey, Type, [{Key,Cs} | WL], Ins+I);
make_wl([], _Type, WL, Ins) ->
    {WL, Ins}.

wl(L, Type) ->
    wl(L, Type, keep, skip, 0, []).

wl([{_Seq, delete_key} | Cs], Type, _Del, Lookup, _I, _Objs) ->
    wl(Cs, Type, delete, Lookup, 0, []);
wl([{_Seq, {delete_object, Object}} | Cs], Type, Del, Lookup, I, Objs) ->
    NObjs = lists:keydelete(Object, 1, Objs),
    wl(Cs, Type, Del, Lookup, I, [{Object,0} | NObjs]);
wl([{_Seq, {insert, Object}} | Cs], Type, _Del, Lookup, _I, _Objs) 
                    when Type =:= set ->
    wl(Cs, Type, delete, Lookup, 1, [{Object,-1}]);
wl([{_Seq, {insert, Object}} | Cs], Type, Del, Lookup, _I, Objs) ->
    NObjs = 
	case lists:keyfind(Object, 1, Objs) of
	    {_, 0} ->
		lists:keyreplace(Object, 1, Objs, {Object,-1});
	    {_, _C} when Type =:= bag -> % C =:= 1; C =:= -1
		Objs;
	    {_, C} when C < 0 -> % when Type =:= duplicate_bag
		lists:keyreplace(Object, 1, Objs, {Object,C-1});
	    {_, C} -> % when C > 0, Type =:= duplicate_bag
		lists:keyreplace(Object, 1, Objs, {Object,C+1});
	    false when Del =:= delete ->
		[{Object, -1} | Objs];
	    false ->
		[{Object, 1} | Objs]
	end,
    wl(Cs, Type, Del, Lookup, 1, NObjs);
wl([{_Seq, {lookup,_Pid}=Lookup} | Cs], Type, Del, _Lookup, I, Objs) ->
    wl(Cs, Type, Del, Lookup, I, Objs);
wl([], _Type, Del, Lookup, I, Objs) ->
    [{Del, Lookup, Objs} | I].

%% -> {NewHead, ok} | {NewHead, Error}
may_grow(Head, 0, once) ->
    {Head, ok};
may_grow(Head, _N, _How) when Head#head.fixed =/= false ->
    {Head, ok};
may_grow(#head{access = read}=Head, _N, _How) ->
    {Head, ok};
may_grow(Head, _N, _How) when Head#head.next >= ?MAXOBJS ->
    {Head, ok};
may_grow(Head, N, How) ->
    Extra = erlang:min(2*?SEGSZ, Head#head.no_objects + N - Head#head.next),
    case catch may_grow1(Head, Extra, How) of
	{error, Reason} -> % alloc may throw error
	    {Head, {error, Reason}};
	Reply ->
	    Reply
    end.

may_grow1(Head, Extra, many_times) when Extra > ?SEGSZ ->
    Reply = grow(Head, 1, undefined),
    self() ! ?DETS_CALL(self(), may_grow),
    Reply;
may_grow1(Head, Extra, _How) ->    
    grow(Head, Extra, undefined).

%% -> {Head, ok} | throw({Head, Error})
grow(Head, Extra, _SegZero) when Extra =< 0 ->
    {Head, ok};
grow(Head, Extra, undefined) ->
    grow(Head, Extra, seg_zero());
grow(Head, Extra, SegZero) ->
    #head{n = N, next = Next, m = M} = Head,
    SegNum = ?SLOT2SEG(Next),
    {Head0, Ws1} = allocate_segment(Head, SegZero, SegNum),
    {Head1, ok} = dets_utils:pwrite(Head0, Ws1),
    %% If re_hash fails, segp_cache has been called, but it does not matter.
    {ok, Ws2} = re_hash(Head1, N),
    {Head2, ok} = dets_utils:pwrite(Head1, Ws2),
    NewHead =
	if 
	    N + ?SEGSZ =:= M ->
		Head2#head{n = 0, next = Next + ?SEGSZ, m = 2 * M, m2 = 4 * M};
	    true ->
		Head2#head{n = N + ?SEGSZ, next = Next + ?SEGSZ}
	end,
    grow(NewHead, Extra - ?SEGSZ, SegZero).

seg_zero() ->
    <<0:(4*?SEGSZ)/unit:8>>.

find_object(Head, Object) ->
    Key = element(Head#head.keypos, Object),
    Slot = db_hash(Key, Head),
    find_object(Head, Object, Slot).    

find_object(H, _Obj, Slot) when Slot >= H#head.next ->
    false;
find_object(H, Obj, Slot) ->
    {_Pos, Chain} = chain(H, Slot),
    case catch find_obj(H, Obj, Chain) of
	{ok, Pos} ->
	    {ok, Pos};
	_Else ->
	    false
    end.

find_obj(H, Obj, Pos) when Pos > 0 ->
    {Next, _Sz, Term} = prterm(H, Pos, ?ReadAhead),
    if 
	Term == Obj ->
	    {ok, Pos};
	true ->
	    find_obj(H, Obj, Next)
    end.

%% Given, a slot, return the {Pos, Chain} in the file where the
%% objects hashed to this slot reside. Pos is the position in the
%% file where the chain pointer is written and Chain is the position
%% in the file where the first object resides.
chain(Head, Slot) ->
    Pos = ?SEGADDR(?SLOT2SEG(Slot)),
    Segment = get_segp(Pos),
    FinalPos = Segment + (4 * ?REM2(Slot, ?SEGSZ)),
    {ok, <<Chain:32>>} = dets_utils:pread(Head, FinalPos, 4, 0),
    {FinalPos, Chain}.

%%%
%%% Cache routines depending on the dets file format.
%%%

%% -> {Head, [LookedUpObject], pwrite_list()} | throw({Head, Error})
eval_work_list(Head, WorkLists) ->
    SWLs = tag_with_slot(WorkLists, Head, []),
    P1 = dets_utils:family(SWLs), 
    {PerSlot, SlotPositions} = remove_slot_tag(P1, [], []),
    {ok, Bins} = dets_utils:pread(SlotPositions, Head),
    first_object(PerSlot, SlotPositions, Bins, Head, [], [], [], []).

tag_with_slot([{K,_} = WL | WLs], Head, L) ->
    tag_with_slot(WLs, Head, [{db_hash(K, Head), WL} | L]);
tag_with_slot([], _Head, L) ->
    L.

remove_slot_tag([{S,SWLs} | SSWLs], Ls, SPs) ->
    remove_slot_tag(SSWLs, [SWLs | Ls], [slot_position(S) | SPs]);
remove_slot_tag([], Ls, SPs) ->
    {Ls, SPs}.

%% The initial chain pointers and the first object in each chain are
%% read "in parallel", that is, with one call to file:pread/2 (two
%% calls altogether). The following chain objects are read one by
%% one. This is a compromise: if the chains are long and threads are
%% active, it would be faster to keep a state for each chain and read
%% the objects of the chains in parallel, but the overhead would be
%% quite substantial.

first_object([WorkLists | SPs], [{P1,_4} | Ss], [<<P2:32>> | Bs], Head,
	      ObjsToRead, ToRead, Ls, LU) when P2 =:= 0 ->
    L0 = [{old,P1}],
    {L, NLU} = eval_slot(Head, ?ReadAhead, P2, WorkLists, L0, LU),
    first_object(SPs, Ss, Bs, Head, ObjsToRead, ToRead, [L | Ls], NLU);
first_object([WorkLists | SPs], [{P1,_4} | Ss], [<<P2:32>> | Bs], Head, 
	      ObjsToRead, ToRead, Ls, LU) ->
    E = {P1,P2,WorkLists},
    first_object(SPs, Ss, Bs, Head, 
		 [E | ObjsToRead], [{P2, ?ReadAhead} | ToRead], Ls, LU);
first_object([], [], [], Head, ObjsToRead, ToRead, Ls, LU) ->
    {ok, Bins} = dets_utils:pread(ToRead, Head),
    case catch eval_first(Bins, ObjsToRead, Head, Ls, LU) of
	{ok, NLs, NLU} -> 
	    case create_writes(NLs, Head, [], 0) of
		{Head1, [], 0} ->
		    {Head1, NLU, []};
		{Head1, Ws, No} ->
		    {NewHead, Ws2} = update_no_objects(Head1, Ws, No),
		    {NewHead, NLU, Ws2}
	    end;
	_Error -> 
	    throw(dets_utils:corrupt_reason(Head, bad_object))
    end.

%% Update no_objects on the file too, if the number of segments that
%% dets:fsck/6 use for estimate has changed.
update_no_objects(Head, Ws, 0) -> {Head, Ws};
update_no_objects(Head, Ws, Delta) ->
    No = Head#head.no_objects,
    NewNo = No + Delta,
    NWs = 
	if 
	    NewNo > ?MAXOBJS ->
		Ws;
	    ?SLOT2SEG(No) =:= ?SLOT2SEG(NewNo) ->
		Ws;
	    true ->
		[{?NO_OBJECTS_POS, <<NewNo:32>>} | Ws]
	end,
    {Head#head{no_objects = NewNo}, NWs}.

eval_first([<<Next:32, Sz:32, _Status:32, Bin/binary>> | Bins], 
	   [SP | SPs], Head, Ls, LU) ->
    {P1, P2, WLs} = SP,
    L0 = [{old,P1}],
    case byte_size(Bin) of
	BinSz when BinSz >= Sz ->
	    Term = binary_to_term(Bin),
	    Key = element(Head#head.keypos, Term),
	    {L, NLU} = find_key(Head, P2, Next, Sz, Term, Key, WLs, L0, LU),
	    eval_first(Bins, SPs, Head, [L | Ls], NLU);
	_BinSz ->
	    {L, NLU} = eval_slot(Head, Sz+?OHDSZ, P2, WLs, L0, LU),
	    eval_first(Bins, SPs, Head, [L | Ls], NLU)
    end;
eval_first([], [], _Head, Ls, LU) ->
    {ok, Ls, LU}.

eval_slot(_Head, _TrySize, _Pos=0, [], L, LU) ->
    {L, LU};
eval_slot(Head, _TrySize, Pos=0, [WL | WLs], L, LU) ->
    {_Key, {_Delete, LookUp, Objects}} = WL,
    {NL, NLU} = end_of_key(Objects, LookUp, L, []),
    eval_slot(Head, ?ReadAhead, Pos, WLs, NL, NLU++LU);
eval_slot(Head, TrySize, Pos, WLs, L, LU) ->
    {NextPos, Size, Term} = prterm(Head, Pos, TrySize),
    Key = element(Head#head.keypos, Term),
    find_key(Head, Pos, NextPos, Size, Term, Key, WLs, L, LU).

find_key(Head, Pos, NextPos, Size, Term, Key, WLs, L, LU) ->
    case lists:keyfind(Key, 1, WLs) of
	{_, {Delete, LookUp, Objects}} = WL ->
	    NWLs = lists:delete(WL, WLs),
	    {NewObjects, NL, LUK} = eval_object(Size, Term, Delete, LookUp, 
						Objects, Head, Pos, L, []),
	    eval_key(Key, Delete, LookUp, NewObjects, Head, NextPos, 
		     NWLs, NL, LU, LUK);
	false ->
	    L0 = [{old,Pos} | L],
	    eval_slot(Head, ?ReadAhead, NextPos, WLs, L0, LU)
    end.

eval_key(_Key, _Delete, Lookup, _Objects, Head, Pos, WLs, L, LU, LUK) 
                            when Head#head.type =:= set ->
    NLU = case Lookup of
	      {lookup, Pid} -> [{Pid,LUK} | LU];
	      skip -> LU
	  end,
    eval_slot(Head, ?ReadAhead, Pos, WLs, L, NLU);
eval_key(_Key, _Delete, LookUp, Objects, Head, Pos, WLs, L, LU, LUK) 
                                                          when Pos =:= 0 ->
    {NL, NLU} = end_of_key(Objects, LookUp, L, LUK),
    eval_slot(Head, ?ReadAhead, Pos, WLs, NL, NLU++LU);
eval_key(Key, Delete, LookUp, Objects, Head, Pos, WLs, L, LU, LUK) ->
    {NextPos, Size, Term} = prterm(Head, Pos, ?ReadAhead),
    case element(Head#head.keypos, Term) of
	Key ->
	    {NewObjects, NL, LUK1} = 
		eval_object(Size, Term, Delete, LookUp,Objects,Head,Pos,L,LUK),
	    eval_key(Key, Delete, LookUp, NewObjects, Head, NextPos, WLs, 
		     NL, LU, LUK1);
	Key2 ->
	    {L1, NLU} = end_of_key(Objects, LookUp, L, LUK),
	    find_key(Head, Pos, NextPos, Size, Term, Key2, WLs, L1, NLU++LU)
    end.

%% All objects in Objects have the key Key.
eval_object(Size, Term, Delete, LookUp, Objects, Head, Pos, L, LU) ->
    Type = Head#head.type,
    case lists:keyfind(Term, 1, Objects) of
	{_Object, N} when N =:= 0 ->
	    L1 = [{delete,Pos,Size} | L],
	    {Objects, L1, LU};
	{_Object, N} when N < 0, Type =:= set ->
	    L1 = [{old,Pos} | L],
	    wl_lookup(LookUp, Objects, Term, L1, LU);
	{Object, _N} when Type =:= bag -> % when N =:= 1; N =:= -1
	    L1 = [{old,Pos} | L],
	    Objects1 = lists:keydelete(Object, 1, Objects),
	    wl_lookup(LookUp, Objects1, Term, L1, LU);
	{Object, N} when N < 0, Type =:= duplicate_bag ->
	    L1 = [{old,Pos} | L],
	    Objects1 = lists:keyreplace(Object, 1, Objects, {Object,N+1}),
	    wl_lookup(LookUp, Objects1, Term, L1, LU);
	{_Object, N} when N > 0, Type =:= duplicate_bag ->
	    L1 = [{old,Pos} | L],
	    wl_lookup(LookUp, Objects, Term, L1, LU);
	false when Type =:= set, Delete =:= delete ->
	    case lists:keyfind(-1, 2, Objects) of
		false -> % no inserted object, perhaps deleted objects
		    L1 = [{delete,Pos,Size} | L],
		    {[], L1, LU};
		{Term2, -1} ->
		    Bin2 = term_to_binary(Term2),
		    NSize = byte_size(Bin2),
		    Overwrite = 
			if
			    NSize =:= Size ->
				true;
			    true ->
				SizePos = sz2pos(Size+?OHDSZ),
				NSizePos = sz2pos(NSize+?OHDSZ),
				SizePos =:= NSizePos
			end,
		    E = if 
			    Overwrite ->
				{overwrite,Bin2,Pos};
			    true ->
				{replace,Bin2,Pos,Size}
			end,
		    wl_lookup(LookUp, [], Term2, [E | L], LU)
	    end;
	false when Delete =:= delete ->
	    L1 = [{delete,Pos,Size} | L],
	    {Objects, L1, LU};
	false ->
	    L1 = [{old,Pos} | L],
	    wl_lookup(LookUp, Objects, Term, L1, LU)
    end.

%% Inlined.
wl_lookup({lookup,_}, Objects, Term, L, LU) ->
    {Objects, L, [Term | LU]};
wl_lookup(skip, Objects, _Term, L, LU) ->
    {Objects, L, LU}.

end_of_key([{Object,N0} | Objs], LookUp, L, LU) when N0 =/= 0 ->
    N = abs(N0),
    NL = [{insert,N,term_to_binary(Object)} | L],
    NLU = case LookUp of 
	      {lookup, _} ->
		  lists:duplicate(N, Object) ++ LU;
	      skip ->
		  LU
	  end,
    end_of_key(Objs, LookUp, NL, NLU);
end_of_key([_ | Objects], LookUp, L, LU) ->
    end_of_key(Objects, LookUp, L, LU);
end_of_key([], {lookup,Pid}, L, LU) ->
    {L, [{Pid,LU}]};
end_of_key([], skip, L, LU) ->
    {L, LU}.

create_writes([L | Ls], H, Ws, No) ->
    {NH, NWs, NNo} = create_writes(L, H, Ws, No, 0, true),
    create_writes(Ls, NH, NWs, NNo);
create_writes([], H, Ws, No) ->
    {H, lists:reverse(Ws), No}.

create_writes([{old,Pos} | L], H, Ws, No, _Next, true) ->
    create_writes(L, H, Ws, No, Pos, true);
create_writes([{old,Pos} | L], H, Ws, No, Next, false) ->
    W = {Pos, <<Next:32>>},
    create_writes(L, H, [W | Ws], No, Pos, true);
create_writes([{insert,N,Bin} | L], H, Ws, No, Next, _NextIsOld) ->
    {NH, NWs, Pos} = create_inserts(N, H, Ws, Next, byte_size(Bin), Bin),
    create_writes(L, NH, NWs, No+N, Pos, false);
create_writes([{overwrite,Bin,Pos} | L], H, Ws, No, Next, _) ->
    Size = byte_size(Bin),
    W = {Pos, [<<Next:32, Size:32, ?ACTIVE:32>>, Bin]},
    create_writes(L, H, [W | Ws], No, Pos, true);
create_writes([{replace,Bin,Pos,OSize} | L], H, Ws, No, Next, _) ->
    Size = byte_size(Bin),
    {H1, _} = dets_utils:free(H, Pos, OSize+?OHDSZ),
    {NH, NewPos, _} = dets_utils:alloc(H1, ?OHDSZ + Size),
    W1 = {NewPos, [<<Next:32, Size:32, ?ACTIVE:32>>, Bin]},
    NWs = if 
	      Pos =:= NewPos -> 
		  [W1 | Ws];
	      true -> 
		  W2 = {Pos+?STATUS_POS, <<?FREE:32>>},
		  [W1,W2 | Ws]
	  end,
    create_writes(L, NH, NWs, No, NewPos, false);
create_writes([{delete,Pos,Size} | L], H, Ws, No, Next, _) ->
    {NH, _} = dets_utils:free(H, Pos, Size+?OHDSZ),
    NWs = [{Pos+?STATUS_POS,<<?FREE:32>>} | Ws],
    create_writes(L, NH, NWs, No-1, Next, false);
create_writes([], H, Ws, No, _Next, _NextIsOld) ->
    {H, Ws, No}.

create_inserts(0, H, Ws, Next, _Size, _Bin) ->
    {H, Ws, Next};
create_inserts(N, H, Ws, Next, Size, Bin) ->
    {NH, Pos, _} = dets_utils:alloc(H, ?OHDSZ + Size),
    W = {Pos, [<<Next:32, Size:32, ?ACTIVE:32>>, Bin]},
    create_inserts(N-1, NH, [W | Ws], Pos, Size, Bin).

slot_position(S) ->
    Pos = ?SEGADDR(?SLOT2SEG(S)),
    Segment = get_segp(Pos),
    FinalPos = Segment + (4 * ?REM2(S, ?SEGSZ)),
    {FinalPos, 4}.

%% Twice the size of a segment due to the bug in sz2pos/1. Inlined.
actual_seg_size() ->
    ?POW(sz2pos(?SEGSZ*4)-1).

segp_cache(Pos, Segment) ->
    put(Pos, Segment).

%% Inlined.
get_segp(Pos) ->
    get(Pos).

%% Bug: If Sz0 is equal to 2**k for some k, then 2**(k+1) bytes are
%% allocated (wasting 2**k bytes).
sz2pos(N) ->
    1 + dets_utils:log2(N+1).

scan_objs(_Head, Bin, From, To, L, Ts, R, _Type) ->
    scan_objs(Bin, From, To, L, Ts, R).

scan_objs(Bin, From, To, L, Ts, -1) ->
    {stop, Bin, From, To, L, Ts};
scan_objs(B = <<_N:32, Sz:32, St:32, T/binary>>, From, To, L, Ts, R) ->
    if 
	St =:= ?ACTIVE;
	St =:= ?FREE -> % deleted after scanning started
	    case T of
		<<BinTerm:Sz/binary, T2/binary>> ->
		    NTs = [BinTerm | Ts],
		    OSz = Sz + ?OHDSZ,
		    Skip = ?POW(sz2pos(OSz)-1) - OSz,
		    F2 = From + OSz,
		    NR = if 
			     R < 0 ->
				 R + 1;
			     true ->
				 R + OSz + Skip
			 end,
		    scan_skip(T2, F2, To, Skip, L, NTs, NR);
		_ ->
                    {more, From, To, L, Ts, R, Sz+?OHDSZ}
	    end;
	true -> % a segment
	    scan_skip(B, From, To, actual_seg_size(), L, Ts, R)
    end;
scan_objs(_B, From, To, L, Ts, R) ->
    {more, From, To, L, Ts, R, 0}.

scan_skip(Bin, From, To, Skip, L, Ts, R) when From + Skip < To ->
    SkipPos = From + Skip,
    case Bin of
	<<_:Skip/binary, Tail/binary>> ->
	    scan_objs(Tail, SkipPos, To, L, Ts, R);
	_ ->
            {more, SkipPos, To, L, Ts, R, 0}
    end;
scan_skip(Bin, From, To, Skip, L, Ts, R) when From + Skip =:= To ->
    scan_next_allocated(Bin, From, To, L, Ts, R);
scan_skip(_Bin, From, _To, Skip, L, Ts, R) -> % when From + Skip > _To 
    From1 = From + Skip,
    {more, From1, From1, L, Ts, R, 0}.

scan_next_allocated(_Bin, _From, To, <<>>=L, Ts, R) ->
    {more, To, To, L, Ts, R, 0};
scan_next_allocated(Bin, From0, _To, <<From:32, To:32, L/binary>>, Ts, R) ->
    Skip = From - From0,
    scan_skip(Bin, From0, To, Skip, L, Ts, R).

%% Read term from file at position Pos
prterm(Head, Pos, ReadAhead) ->
    Res = dets_utils:pread(Head, Pos, ?OHDSZ, ReadAhead),
    ?DEBUGF("file:pread(~p, ~p, ?) -> ~p~n", [Head#head.filename, Pos, Res]),
    {ok, <<Next:32, Sz:32, _Status:32, Bin0/binary>>} = Res,
    ?DEBUGF("{Next, Sz} = ~p~n", [{Next, Sz}]),
    Bin = case byte_size(Bin0) of
	      Actual when Actual >= Sz ->
		  Bin0;
	      _ ->
		  {ok, Bin1} = dets_utils:pread(Head, Pos +  ?OHDSZ, Sz, 0),
		  Bin1
	  end,
    Term = binary_to_term(Bin),
    {Next, Sz, Term}.

%%%%%%%%%%%%%%%%%  DEBUG functions %%%%%%%%%%%%%%%%

file_info(FH) ->
    #fileheader{closed_properly = CP, keypos = Kp,
                m = M, next = Next, n = N, version = Version,
                type = Type, no_objects = NoObjects} 
        = FH,
    if
        CP =:= 0 ->
            {error, not_closed};
        FH#fileheader.cookie =/= ?MAGIC ->
            {error, not_a_dets_file};
        FH#fileheader.version =/= ?FILE_FORMAT_VERSION ->
            {error, bad_version};
        true ->
            {ok, [{closed_properly,CP},{keypos,Kp},{m, M},
                  {n,N},{next,Next},{no_objects,NoObjects},
                  {type,Type},{version,Version}]}
    end.

v_segments(H) ->
    v_segments(H, 0).

v_segments(_H, ?SEGARRSZ) ->
    done;
v_segments(H, SegNo) ->
    Seg = dets_utils:read_4(H#head.fptr, ?SEGADDR(SegNo)),
    if
	Seg =:= 0 ->
	    done;
	true ->
	    io:format("SEGMENT ~w ", [SegNo]),
	    io:format("At position ~w~n", [Seg]),
	    v_segment(H, SegNo, Seg, 0),
	    v_segments(H, SegNo+1)
    end.

v_segment(_H, _, _SegPos, ?SEGSZ) ->
    done;
v_segment(H, SegNo, SegPos, SegSlot) ->
    Slot = SegSlot + (SegNo * ?SEGSZ),
    Chain = dets_utils:read_4(H#head.fptr, SegPos + (4 * SegSlot)),
    if 
	Chain =:= 0 ->  %% don't print empty chains
	    true;
	true ->
	    io:format("   <~p>~p: [",[SegPos + (4 * SegSlot), Slot]),
	    print_chain(H, Chain)
    end,
    v_segment(H, SegNo, SegPos, SegSlot+1).

print_chain(_H, 0) ->
    io:format("] \n", []);
print_chain(H, Pos) ->
    {ok, _} = file:position(H#head.fptr, Pos),
    case rterm(H#head.fptr) of
	{ok, 0, _Sz, Term} ->
	    io:format("<~p>~p] \n",[Pos, Term]);
	{ok, Next, _Sz, Term} ->
	    io:format("<~p>~p, ", [Pos, Term]),
	    print_chain(H, Next);
	Other ->
	    io:format("~nERROR ~p~n", [Other])
    end.

%% Can't be used at the bucket level!!!!
%% Only when we go down a chain
rterm(F) ->
    case catch rterm2(F) of
	{'EXIT', Reason} -> %% truncated DAT file 
	    dets_utils:vformat("** dets: Corrupt or truncated dets file~n", 
                               []), 
	    {error, Reason};
	Other -> 
	    Other
    end.

rterm2(F) ->
    {ok, <<Next:32, Sz:32, _:32>>} = file:read(F, ?OHDSZ),
    {ok, Bin} = file:read(F, Sz),
    Term = binary_to_term(Bin),
    {ok, Next, Sz, Term}.