%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2001-2011. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(dets_v9).
%% Dets files, implementation part. This module handles version 9.
%% To be called from dets.erl only.
-export([mark_dirty/1, read_file_header/2,
check_file_header/2, do_perform_save/1, initiate_file/11,
prep_table_copy/9, init_freelist/2, fsck_input/4,
bulk_input/3, output_objs/4, bchunk_init/2,
try_bchunk_header/2, compact_init/3, read_bchunks/2,
write_cache/1, may_grow/3, find_object/2, slot_objs/2,
scan_objs/8, db_hash/2, no_slots/1, table_parameters/1]).
-export([file_info/1, v_segments/1]).
-export([cache_segps/3]).
-compile({inline, [{max_objsize,1},{maxobjsize,1}]}).
-compile({inline, [{write_segment_file,6}]}).
-compile({inline, [{sz2pos,1},{adjsz,1}]}).
-compile({inline, [{skip_bytes,6},{make_object,4}]}).
-compile({inline, [{segp_cache,2},{get_segp,1},{get_arrpart,1}]}).
-compile({inline, [{h,2}]}).
-include("dets.hrl").
%% The layout of the file is :
%%
%% bytes decsription
%% ---------------------- File header
%% 4 FreelistsPointer
%% 4 Cookie
%% 4 ClosedProperly (pos=8)
%% 4 Type (pos=12)
%% 4 Version (pos=16)
%% 4 M
%% 4 Next
%% 4 KeyPos
%% 4 NoObjects
%% 4 NoKeys
%% 4 MinNoSlots
%% 4 MaxNoSlots
%% 4 HashMethod
%% 4 N
%% ---
%% 256 Version 9(a): Reserved for future versions. Initially zeros.
%% Version 9(b) has instead:
%% 112 28 counters for the buddy system sizes 2^4 to 2^31.
%% 144 Reserved for future versions. Initially zeros.
%% Version 9(c) has instead:
%% 112 28 counters for the buddy system sizes (as for 9(b)).
%% 16 MD5-sum for the 44 plus 112 bytes before the MD5-sum.
%% (FreelistsPointer, Cookie and ClosedProperly are not digested.)
%% 128 Reserved for future versions. Initially zeros.
%% Version 9(d), introduced in R15A, has instead:
%% 112 28 counters for the buddy system sizes (as for 9(b)).
%% 16 MD5-sum for the 44 plus 112 bytes before the MD5-sum.
%% (FreelistsPointer, Cookie and ClosedProperly are not digested.)
%% 4 Base of the buddy system.
%% 0 (zero) if the base is equal to ?BASE. Compatible with R14B.
%% File size at the end of the file is RealFileSize - Base.
%% The reason for modifying file size is that when a file created
%% by R15 is read by R14 a repair takes place immediately, which
%% is acceptable when downgrading.
%% 124 Reserved for future versions. Initially zeros.
%% ---
%% ------------------ end of file header
%% 4*256 SegmentArray Pointers.
%% ------------------ This is BASE.
%% 4*512 SegmentArray Part 1
%% ... More SegmentArray Parts
%% 8*256 First segment
%% ??? Objects (free and alive)
%% 4*512 Further SegmentArray Part.
%% ??? Objects (free and alive)
%% 8*256 Further segment.
%% ??? Objects (free and alive)
%% ... more objects, segment array parts, and segments ...
%% -----------------------------
%% ??? Free lists
%% -----------------------------
%% 4 File size, in bytes. See 9(d) obove.
%% Before we can find an object we must find the slot where the
%% object resides. Each slot is a (possibly empty) list (or chain) of
%% objects that hash to the same slot. If the value stored in the
%% slot is zero, the slot chain is empty. If the slot value is
%% non-zero, the value points to a position in the file where the
%% collection of objects resides. Each collection has the following
%% layout:
%%
%% bytes decsription
%% --------------------
%% 4 Size of the area allocated for the collection (8+Sz)
%% 4 Status (FREE or ACTIVE). These two are the Object Header.
%% Sz A binary containing the objects per key, sorted on key.
%%
%% When repairing or converting a file, the status field is used.
%%
%% The binary containing the objects per key of a table of type 'set'
%% has the following layout:
%%
%% bytes decsription
%% --------------------
%% 4 Size of the object of the first key (4+OSz1)
%% OSz1 The object of the first key
%% ...
%% 4 Size of the object of the ith key (4+OSzi)
%% OSzi The object of the ith key
%%
%% The binary containing the objects per key of a table of type 'bag'
%% or 'duplicate_bag' has the following layout:
%%
%% bytes decsription
%% ----------------------
%% 4 Size of the objects of the first key (4 + OSz1_1+...+OSz1_j+...)
%% 4 Size of the first object of the first key (4+OSz1_1)
%% OSz1_1 The first object of the first key
%% ...
%% 4 Size of the jth object of the first key (4+OSz1_j)
%% OSz1_j The jth object of the first key
%% ...
%% 4 Size of the objects of the ith key (4 + OSzi_1+...+OSzi_k+...)
%% 4 Size of the first object of the ith key (4+OSzi_1)
%% OSzi_1 The first object of the ith key
%% ...
%% 4 Size of the kth object of the ith key (4+OSzi_k)
%% OSzi_k The kth object of the ith key
%% ...
%%
%% The objects of a key are placed in time order, that is, the older
%% objects come first. If a new object is inserted, it is inserted
%% last.
%%
%%
%%
%%|---------------|
%%| head |
%%| |
%%| |
%%|_______________|
%%| |--|
%%|___part ptr 1__| |
%%| | | segarr part 1
%%|___part ptr 2__| V______________|
%%| | | p1 |
%%| | |______________|--|
%%| .... | | p2 | |
%% (256) |______________| |
%% | | |
%% | .... | | segment 1
%% | (512) | V __slot 0 ____|
%% | size |
%% | pointer |--|
%% |___slot 1 ____| |
%% | | |
%% | .... | | objects in slot 0
%% (256) V segment 1
%% |___________|
%% | size |
%% |___________|
%% | status |
%% |___________|
%% | |
%% | object |
%% | collec. |
%% |___________|
%%%
%%% File header
%%%
-define(RESERVED, 124). % Reserved for future use.
-define(COLL_CNTRS, (28*4)). % Counters for the buddy system.
-define(MD5SZ, 16).
-define(FL_BASE, 4).
-define(HEADSZ, 56+?COLL_CNTRS % The size of the file header, in bytes,
+?MD5SZ+?FL_BASE). % not including the reserved part.
-define(HEADEND, (?HEADSZ+?RESERVED)).
% End of header and reserved area.
-define(SEGSZ, 512). % Size of a segment, in words. SZOBJP*SEGSZP.
-define(SEGSZP, 256). % Size of a segment, in number of pointers.
-define(SEGSZP_LOG2, 8).
-define(SEGOBJSZ, (4 * ?SZOBJP)).
-define(SEGPARTSZ, 512). % Size of segment array part, in words.
-define(SEGPARTSZ_LOG2, 9).
-define(SEGARRSZ, 256). % Maximal number of segment array parts..
-define(SEGARRADDR(PartN), (?HEADEND + (4 * (PartN)))).
-define(SEGPARTADDR(P,SegN), ((P) + (4 * ?REM2(SegN, ?SEGPARTSZ)))).
-define(BASE, ?SEGARRADDR(?SEGARRSZ)).
-define(MAXSLOTS, (?SEGARRSZ * ?SEGPARTSZ * ?SEGSZP)).
-define(SLOT2SEG(S), ((S) bsr ?SEGSZP_LOG2)).
-define(SEG2SEGARRPART(S), ((S) bsr ?SEGPARTSZ_LOG2)).
-define(PHASH, 0).
-define(PHASH2, 1).
%% BIG is used for hashing. BIG must be greater than the maximum
%% number of slots, currently 32 M (MAXSLOTS).
-define(BIG, 16#3ffffff). % 64 M
%% Hard coded positions into the file header:
-define(FREELIST_POS, 0).
-define(CLOSED_PROPERLY_POS, 8).
-define(D_POS, 20).
%%% Dets file versions up to 8 are handled in dets_v8. This module
%%% handles version 9, introduced in R8.
%%%
%%% Version 9(a) tables have 256 reserved bytes in the file header,
%%% all initialized to zero.
%%% Version 9(b) tables use the first 112 of these bytes for storing
%%% number of objects for each size of the buddy system. An empty 9(b)
%%% table cannot be distinguished from an empty 9(a) table.
%%% 9(c) has an MD5-sum for the file header.
-define(FILE_FORMAT_VERSION, 9).
-define(NOT_PROPERLY_CLOSED,0).
-define(CLOSED_PROPERLY,1).
%% Size of object pointer, in words. SEGSZ = SZOBJP * SEGSZP.
-define(SZOBJP, 2).
-define(OHDSZ, 8). % The size of the object header, in bytes.
-define(STATUS_POS, 4). % Position of the status field.
-define(OHDSZ_v8, 12). % The size of the version 8 object header.
%% The size of each object is a multiple of 16.
%% BUMP is used when repairing files.
-define(BUMP, 16).
%%% '$hash' is the value of HASH_PARMS in R8, '$hash2' is the value in R9.
%%%
%%% The fields of the ?HASH_PARMS records are the same, but having
%%% different tags makes bchunk_init on R8 nodes reject data from R9
%%% nodes, and vice versa. This is overkill, and due to an oversight.
%%% What should have been done in R8 was to check the hash method, not
%%% only the type of the table and the key position. R8 nodes cannot
%%% handle the phash2 method.
-define(HASH_PARMS, '$hash2').
-define(BCHUNK_FORMAT_VERSION, 1).
-record(?HASH_PARMS, {
file_format_version,
bchunk_format_version,
file, type, keypos, hash_method,
n,m,next,
min,max,
no_objects,no_keys,
no_colls % [{LogSz,NoColls}], NoColls >= 0
}).
-define(ACTUAL_SEG_SIZE, (?SEGSZ*4)).
-define(MAXBUD, 32).
%%-define(DEBUGF(X,Y), io:format(X, Y)).
-define(DEBUGF(X,Y), void).
%% -> ok | throw({NewHead,Error})
mark_dirty(Head) ->
Dirty = [{?CLOSED_PROPERLY_POS, <<?NOT_PROPERLY_CLOSED:32>>}],
dets_utils:pwrite(Head, Dirty),
dets_utils:sync(Head),
dets_utils:position(Head, Head#head.freelists_p),
dets_utils:truncate(Head, cur).
%% -> {ok, head()} | throw(Error) | throw(badarg)
prep_table_copy(Fd, Tab, Fname, Type, Kp, Ram, CacheSz, Auto, Parms) ->
case Parms of
#?HASH_PARMS{file_format_version = ?FILE_FORMAT_VERSION,
bchunk_format_version = ?BCHUNK_FORMAT_VERSION,
n = N, m = M, next = Next,
min = Min, max = Max,
hash_method = HashMethodCode,
no_objects = NoObjects, no_keys = NoKeys,
no_colls = _NoColls}
when is_integer(N), is_integer(M), is_integer(Next),
is_integer(Min), is_integer(Max),
is_integer(NoObjects), is_integer(NoKeys),
NoObjects >= NoKeys ->
HashMethod = code_to_hash_method(HashMethodCode),
case hash_invars(N, M, Next, Min, Max) of
false ->
throw(badarg);
true ->
init_file(Fd, Tab, Fname, Type, Kp, Min, Max, Ram,
CacheSz, Auto, false, M, N, Next, HashMethod,
NoObjects, NoKeys)
end;
_ ->
throw(badarg)
end.
%% -> {ok, head()} | throw(Error)
%% The File header and the SegmentArray Pointers are written here.
%% SegmentArray Parts are also written, but the segments are are not
%% initialized on file unless DoInitSegments is 'true'. (When
%% initializing a file by calling init_table, some time is saved by
%% not writing the segments twice.)
initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots0, MaxSlots0,
Ram, CacheSz, Auto, DoInitSegments) ->
MaxSlots1 = erlang:min(MaxSlots0, ?MAXSLOTS),
MinSlots1 = erlang:min(MinSlots0, MaxSlots1),
MinSlots = slots2(MinSlots1),
MaxSlots = slots2(MaxSlots1),
M = Next = MinSlots,
N = 0,
init_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, Ram, CacheSz,
Auto, DoInitSegments, M, N, Next, phash2, 0, 0).
init_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, Ram, CacheSz,
Auto, DoInitSegments, M, N, Next, HashMethod, NoObjects, NoKeys) ->
Ftab = dets_utils:init_alloc(?BASE),
Head0 = #head{
m = M,
m2 = M * 2,
next = Next,
fptr = Fd,
no_objects = NoObjects,
no_keys = NoKeys,
maxobjsize = 0,
n = N,
type = Type,
update_mode = dirty,
freelists = Ftab,
no_collections = orddict:new(),
auto_save = Auto,
hash_bif = HashMethod,
has_md5 = true,
keypos = Kp,
min_no_slots = MinSlots,
max_no_slots = MaxSlots,
ram_file = Ram,
filename = Fname,
name = Tab,
cache = dets_utils:new_cache(CacheSz),
version = ?FILE_FORMAT_VERSION,
bump = ?BUMP,
base = ?BASE, % to be overwritten
mod = ?MODULE
},
FreeListsPointer = 0,
NoColls = <<0:?COLL_CNTRS/unit:8>>, %% Buddy system counters.
FileHeader = file_header(Head0, FreeListsPointer,
?NOT_PROPERLY_CLOSED, NoColls),
W0 = {0, [FileHeader |
<<0:(4*?SEGARRSZ)/unit:8>>]}, %% SegmentArray Pointers
%% Remove cached pointers to segment array parts and segments:
lists:foreach(fun({I1,I2}) when is_integer(I1), is_integer(I2) -> ok;
({K,V}) -> put(K, V)
end, erase()),
%% Initialize array parts.
%% All parts before segments, for the sake of repair and initialization.
Zero = seg_zero(),
{Head1, Ws1} = init_parts(Head0, 0, no_parts(Next), Zero, []),
NoSegs = no_segs(Next),
{Head2, WsI, WsP} = init_segments(Head1, 0, NoSegs, Zero, [], []),
Ws2 = if
DoInitSegments -> WsP ++ WsI;
true -> WsP
end,
dets_utils:pwrite(Fd, Fname, [W0 | lists:append(Ws1) ++ Ws2]),
true = hash_invars(Head2),
%% The allocations that have been made so far (parts, segments)
%% are permanent; the table will never shrink. Therefore the base
%% of the Buddy system can be set to the first free object.
%% This is used in allocate_all(), see below.
{_, Where, _} = dets_utils:alloc(Head2, ?BUMP),
NewFtab = dets_utils:init_alloc(Where),
Head = Head2#head{freelists = NewFtab, base = Where},
{ok, Head}.
%% Returns a power of two not less than 256.
slots2(NoSlots) when NoSlots >= 256 ->
?POW(dets_utils:log2(NoSlots)).
init_parts(Head, PartNo, NoParts, Zero, Ws) when PartNo < NoParts ->
PartPos = ?SEGARRADDR(PartNo),
{NewHead, W, _Part} = alloc_part(Head, Zero, PartPos),
init_parts(NewHead, PartNo+1, NoParts, Zero, [W | Ws]);
init_parts(Head, _PartNo, _NoParts, _Zero, Ws) ->
{Head, Ws}.
%% -> {Head, SegInitList, OtherList};
%% SegPtrList = SegInitList = pwrite_list().
init_segments(Head, SegNo, NoSegs, SegZero, WsP, WsI) when SegNo < NoSegs ->
{NewHead, WI, Ws} = allocate_segment(Head, SegZero, SegNo),
init_segments(NewHead, SegNo+1, NoSegs, SegZero, Ws ++ WsP, [WI | WsI]);
init_segments(Head, _SegNo, _NoSegs, _SegZero, WsP, WsI) ->
{Head, WsI, WsP}.
%% -> {NewHead, SegInit, [SegPtr | PartStuff]}
allocate_segment(Head, SegZero, SegNo) ->
PartPos = ?SEGARRADDR(SegNo div ?SEGPARTSZ),
case get_arrpart(PartPos) of
undefined ->
%% may throw error:
{Head1, [InitArrPart, ArrPartPointer], Part} =
alloc_part(Head, SegZero, PartPos),
{NewHead, InitSegment, [SegPointer]} =
alloc_seg(Head1, SegZero, SegNo, Part),
{NewHead, InitSegment, [InitArrPart, SegPointer, ArrPartPointer]};
Part ->
alloc_seg(Head, SegZero, SegNo, Part)
end.
alloc_part(Head, PartZero, PartPos) ->
%% may throw error:
{NewHead, Part, _} = dets_utils:alloc(Head, adjsz(4 * ?SEGPARTSZ)),
arrpart_cache(PartPos, Part),
InitArrPart = {Part, PartZero}, % same size as segment
ArrPartPointer = {PartPos, <<Part:32>>},
{NewHead, [InitArrPart, ArrPartPointer], Part}.
alloc_seg(Head, SegZero, SegNo, Part) ->
%% may throw error:
{NewHead, Segment, _} = dets_utils:alloc(Head, adjsz(4 * ?SEGSZ)),
InitSegment = {Segment, SegZero},
Pos = ?SEGPARTADDR(Part, SegNo),
segp_cache(Pos, Segment),
dets_utils:disk_map_segment(Segment, SegZero),
SegPointer = {Pos, <<Segment:32>>},
{NewHead, InitSegment, [SegPointer]}.
%% Read free lists (using a Buddy System) from file.
init_freelist(Head, true) ->
Pos = Head#head.freelists_p,
free_lists_from_file(Head, Pos).
%% -> {ok, Fd, fileheader()} | throw(Error)
read_file_header(Fd, FileName) ->
{ok, Bin} = dets_utils:pread_close(Fd, FileName, 0, ?HEADSZ),
<<FreeList:32, Cookie:32, CP:32, Type2:32,
Version:32, M:32, Next:32, Kp:32,
NoObjects:32, NoKeys:32, MinNoSlots:32, MaxNoSlots:32,
HashMethod:32, N:32, NoCollsB:?COLL_CNTRS/binary,
MD5:?MD5SZ/binary, FlBase:32>> = Bin,
<<_:12/binary,MD5DigestedPart:(?HEADSZ-?MD5SZ-?FL_BASE-12)/binary,
_/binary>> = Bin,
{ok, EOF} = dets_utils:position_close(Fd, FileName, eof),
{ok, <<FileSize:32>>} = dets_utils:pread_close(Fd, FileName, EOF-4, 4),
{CL, <<>>} = lists:foldl(fun(LSz, {Acc,<<NN:32,R/binary>>}) ->
if
NN =:= 0 -> {Acc, R};
true -> {[{LSz,NN} | Acc], R}
end
end, {[], NoCollsB}, lists:seq(4, ?MAXBUD-1)),
NoColls =
if
CL =:= [], NoObjects > 0 -> % Version 9(a)
undefined;
true ->
lists:reverse(CL)
end,
Base = case FlBase of
0 -> ?BASE;
_ -> FlBase
end,
FH = #fileheader{freelist = FreeList,
fl_base = Base,
cookie = Cookie,
closed_properly = CP,
type = dets_utils:code_to_type(Type2),
version = Version,
m = M,
next = Next,
keypos = Kp,
no_objects = NoObjects,
no_keys = NoKeys,
min_no_slots = MinNoSlots,
max_no_slots = MaxNoSlots,
no_colls = NoColls,
hash_method = HashMethod,
read_md5 = MD5,
has_md5 = <<0:?MD5SZ/unit:8>> =/= MD5,
md5 = erlang:md5(MD5DigestedPart),
trailer = FileSize + FlBase,
eof = EOF,
n = N,
mod = ?MODULE},
{ok, Fd, FH}.
%% -> {ok, head(), ExtraInfo} | {error, Reason} (Reason lacking file name)
%% ExtraInfo = true
check_file_header(FH, Fd) ->
HashBif = code_to_hash_method(FH#fileheader.hash_method),
Test =
if
FH#fileheader.cookie =/= ?MAGIC ->
{error, not_a_dets_file};
FH#fileheader.type =:= badtype ->
{error, invalid_type_code};
FH#fileheader.version =/= ?FILE_FORMAT_VERSION ->
{error, bad_version};
FH#fileheader.has_md5,
FH#fileheader.read_md5 =/= FH#fileheader.md5 ->
{error, not_a_dets_file}; % harsh but fair
FH#fileheader.trailer =/= FH#fileheader.eof ->
{error, not_closed};
HashBif =:= undefined ->
{error, bad_hash_bif};
FH#fileheader.closed_properly =:= ?CLOSED_PROPERLY ->
{ok, true};
FH#fileheader.closed_properly =:= ?NOT_PROPERLY_CLOSED ->
{error, not_closed};
true ->
{error, not_a_dets_file}
end,
case Test of
{ok, ExtraInfo} ->
MaxObjSize = max_objsize(FH#fileheader.no_colls),
H = #head{
m = FH#fileheader.m,
m2 = FH#fileheader.m * 2,
next = FH#fileheader.next,
fptr = Fd,
no_objects = FH#fileheader.no_objects,
no_keys = FH#fileheader.no_keys,
maxobjsize = MaxObjSize,
n = FH#fileheader.n,
type = FH#fileheader.type,
update_mode = saved,
auto_save = infinity, % not saved on file
fixed = false, % not saved on file
freelists_p = FH#fileheader.freelist,
hash_bif = HashBif,
has_md5 = FH#fileheader.has_md5,
keypos = FH#fileheader.keypos,
min_no_slots = FH#fileheader.min_no_slots,
max_no_slots = FH#fileheader.max_no_slots,
no_collections = FH#fileheader.no_colls,
version = ?FILE_FORMAT_VERSION,
mod = ?MODULE,
bump = ?BUMP,
base = FH#fileheader.fl_base},
{ok, H, ExtraInfo};
Error ->
Error
end.
%% Inlined.
max_objsize(NoColls = undefined) ->
NoColls;
max_objsize(NoColls) ->
max_objsize(NoColls, 0).
max_objsize([], Max) ->
Max;
max_objsize([{_,0} | L], Max) ->
max_objsize(L, Max);
max_objsize([{I,_} | L], _Max) ->
max_objsize(L, I).
cache_segps(Fd, FileName, M) ->
NoParts = no_parts(M),
ArrStart = ?SEGARRADDR(0),
{ok, Bin} = dets_utils:pread_close(Fd, FileName, ArrStart, 4 * NoParts),
cache_arrparts(Bin, ?HEADEND, Fd, FileName).
cache_arrparts(<<ArrPartPos:32, B/binary>>, Pos, Fd, FileName) ->
arrpart_cache(Pos, ArrPartPos),
{ok, ArrPartBin} = dets_utils:pread_close(Fd, FileName,
ArrPartPos,
?SEGPARTSZ*4),
cache_segps1(Fd, ArrPartBin, ArrPartPos),
cache_arrparts(B, Pos+4, Fd, FileName);
cache_arrparts(<<>>, _Pos, _Fd, _FileName) ->
ok.
cache_segps1(_Fd, <<0:32,_/binary>>, _P) ->
ok;
cache_segps1(Fd, <<S:32,B/binary>>, P) ->
dets_utils:disk_map_segment_p(Fd, S),
segp_cache(P, S),
cache_segps1(Fd, B, P+4);
cache_segps1(_Fd, <<>>, _P) ->
ok.
no_parts(NoSlots) ->
((NoSlots - 1) div (?SEGSZP * ?SEGPARTSZ)) + 1.
no_segs(NoSlots) ->
((NoSlots - 1) div ?SEGSZP) + 1.
%%%
%%% Repair, conversion and initialization of a dets file.
%%%
%%% bulk_input/3. Initialization, the general case (any stream of objects).
%%% output_objs/4. Initialization (general case) and repair.
%%% bchunk_init/2. Initialization using bchunk.
bulk_input(Head, InitFun, _Cntrs) ->
bulk_input(Head, InitFun, make_ref(), 0).
bulk_input(Head, InitFun, Ref, Seq) ->
fun(close) ->
_ = (catch InitFun(close));
(read) ->
case catch {Ref, InitFun(read)} of
{Ref, end_of_input} ->
end_of_input;
{Ref, {L0, NewInitFun}} when is_list(L0),
is_function(NewInitFun) ->
Kp = Head#head.keypos,
case catch bulk_objects(L0, Head, Kp, Seq, []) of
{'EXIT', _Error} ->
_ = (catch NewInitFun(close)),
{error, invalid_objects_list};
{L, NSeq} ->
{L, bulk_input(Head, NewInitFun, Ref, NSeq)}
end;
{Ref, Value} ->
{error, {init_fun, Value}};
Error ->
throw({thrown, Error})
end
end.
bulk_objects([T | Ts], Head, Kp, Seq, L) ->
BT = term_to_binary(T),
Key = element(Kp, T),
bulk_objects(Ts, Head, Kp, Seq+1, [make_object(Head, Key, Seq, BT) | L]);
bulk_objects([], _Head, Kp, Seq, L) when is_integer(Kp), is_integer(Seq) ->
{L, Seq}.
-define(FSCK_SEGMENT, 1).
-define(FSCK_SEGMENT2, 10000).
-define(VEMPTY, {}).
-define(VSET(I, V, E), setelement(I, V, E)).
-define(VGET(I, V), element(I, V)).
-define(VEXT(S, V, T),
list_to_tuple(tuple_to_list(V) ++ lists:duplicate(S-tuple_size(V), T))).
%% Number of bytes that will be handled before the cache is written to
%% file. Used when compacting or writing chunks.
-define(CACHE_SIZE, (60*?CHUNK_SIZE)).
%% {LogSize,NoObjects} in Cntrs is replaced by
%% {LogSize,Position,{FileName,FileDescriptor},NoCollections}.
%% There is also an object {no, NoObjects, NoKeys}.
-define(COUNTERS, no).
-define(OBJ_COUNTER, 2).
-define(KEY_COUNTER, 3).
output_objs(OldV, Head, SlotNums, Cntrs) when OldV =< 9 ->
fun(close) ->
%% Make sure that the segments are initialized in case
%% init_table has been called.
Cache = ?VEMPTY,
Acc = [], % This is the only way Acc can be empty.
true = ets:insert(Cntrs, {?FSCK_SEGMENT,0,[],0}),
true = ets:insert(Cntrs, {?COUNTERS, 0, 0}),
Fun = output_objs2(foo, Acc, OldV, Head, Cache, Cntrs,
SlotNums, bar),
Fun(close);
([]) ->
output_objs(OldV, Head, SlotNums, Cntrs);
(L) ->
%% Information about number of objects per size is not
%% relevant for version 9. It is the number of collections
%% that matters.
true = ets:delete_all_objects(Cntrs),
true = ets:insert(Cntrs, {?COUNTERS, 0, 0}),
Es = bin2term(L, OldV, Head#head.keypos),
%% The cache is a tuple indexed by the (log) size. An element
%% is [BinaryObject].
Cache = ?VEMPTY,
{NE, NAcc, NCache} = output_slots(Es, Head, Cache, Cntrs, 0, 0),
output_objs2(NE, NAcc, OldV, Head, NCache, Cntrs, SlotNums, 1)
end.
output_objs2(E, Acc, OldV, Head, Cache, SizeT, SlotNums, 0) ->
NCache = write_all_sizes(Cache, SizeT, Head, more),
%% Number of handled file_sorter chunks before writing:
Max = erlang:max(1, erlang:min(tuple_size(NCache), 10)),
output_objs2(E, Acc, OldV, Head, NCache, SizeT, SlotNums, Max);
output_objs2(E, Acc, OldV, Head, Cache, SizeT, SlotNums, ChunkI) ->
fun(close) ->
{_, [], Cache1} =
if
Acc =:= [] -> {foo, [], Cache};
true -> output_slot(Acc, Head, Cache, [], SizeT, 0, 0)
end,
_NCache = write_all_sizes(Cache1, SizeT, Head, no_more),
SegSz = ?ACTUAL_SEG_SIZE,
{_, SegEnd, _} = dets_utils:alloc(Head, adjsz(SegSz)),
[{?COUNTERS,NoObjects,NoKeys}] = ets:lookup(SizeT, ?COUNTERS),
Head1 = Head#head{no_objects = NoObjects, no_keys = NoKeys},
true = ets:delete(SizeT, ?COUNTERS),
{NewHead, NL, _MaxSz, _End} = allocate_all_objects(Head1, SizeT),
%% It is not known until all objects have been collected
%% how many object collections there are per size. Now
%% that is known and the absolute positions of the object
%% collections can be calculated.
segment_file(SizeT, NewHead, NL, SegEnd),
{MinSlots, EstNoSlots, MaxSlots} = SlotNums,
if
EstNoSlots =:= bulk_init ->
{ok, 0, NewHead};
true ->
EstNoSegs = no_segs(EstNoSlots),
MinNoSegs = no_segs(MinSlots),
MaxNoSegs = no_segs(MaxSlots),
NoSegs = no_segs(NoKeys),
Diff = abs(NoSegs - EstNoSegs),
if
Diff > 5, NoSegs =< MaxNoSegs, NoSegs >= MinNoSegs ->
{try_again, NoKeys};
true ->
{ok, 0, NewHead}
end
end;
(L) ->
Es = bin2term(L, OldV, Head#head.keypos),
{NE, NAcc, NCache} =
output_slots(E, Es, Acc, Head, Cache, SizeT, 0, 0),
output_objs2(NE, NAcc, OldV, Head, NCache, SizeT, SlotNums,
ChunkI-1)
end.
%%% Compaction.
compact_init(ReadHead, WriteHead, TableParameters) ->
SizeT = ets:new(dets_compact, []),
#head{no_keys = NoKeys, no_objects = NoObjects} = ReadHead,
NoObjsPerSize = TableParameters#?HASH_PARMS.no_colls,
{NewWriteHead, Bases, SegAddr, SegEnd} =
prepare_file_init(NoObjects, NoKeys, NoObjsPerSize, SizeT, WriteHead),
Input = compact_input(ReadHead, NewWriteHead, SizeT, tuple_size(Bases)),
Output = fast_output(NewWriteHead, SizeT, Bases, SegAddr, SegEnd),
TmpDir = filename:dirname(NewWriteHead#head.filename),
Reply = (catch file_sorter:sort(Input, Output,
[{format, binary},{tmpdir, TmpDir},
{header, 1}])), % compact_objs/9: 13 bytes
ets:delete(SizeT),
Reply.
compact_input(Head, WHead, SizeT, NoSizes) ->
L = dets_utils:all_allocated_as_list(Head),
Cache = ?VEXT(NoSizes, ?VEMPTY, [0 | []]),
compact_input(Head, WHead, SizeT, Cache, L).
compact_input(Head, WHead, SizeT, Cache, L) ->
fun(close) ->
ok;
(read) ->
compact_read(Head, WHead, SizeT, Cache, L, 0, [], 0)
end.
compact_read(_Head, WHead, SizeT, Cache, [], _Min, [], _ASz) ->
_ = fast_write_all_sizes(Cache, SizeT, WHead),
end_of_input;
compact_read(Head, WHead, SizeT, Cache, L, Min, SegBs, ASz)
when ASz + Min >= ?CACHE_SIZE, ASz > 0 ->
NCache = fast_write_all_sizes(Cache, SizeT, WHead),
{SegBs, compact_input(Head, WHead, SizeT, NCache, L)};
compact_read(Head, WHead, SizeT, Cache, [[From | To] | L], Min, SegBs, ASz) ->
Max = erlang:max(?CHUNK_SIZE*3, Min),
case check_pread_arg(Max, Head) of
true ->
case dets_utils:pread_n(Head#head.fptr, From, Max) of
eof ->
%% Should never happen since compaction will not
%% be tried unless the file trailer is valid.
not_ok; % try a proper repair
Bin1 when byte_size(Bin1) < Min ->
%% The last object may not be padded.
Pad = Min - byte_size(Bin1),
NewBin = <<Bin1/binary, 0:Pad/unit:8>>,
compact_objs(Head, WHead, SizeT, NewBin, L,
From, To, SegBs, Cache, ASz);
NewBin ->
compact_objs(Head, WHead, SizeT, NewBin, L,
From, To, SegBs, Cache, ASz)
end;
false ->
not_ok % try a proper repair
end.
compact_objs(Head, WHead, SizeT, Bin, L, From, To, SegBs, Cache, ASz)
when From =:= To ->
case L of
[] ->
{SegBs, compact_input(Head, WHead, SizeT, Cache, L)};
[[From1 | To1] | L1] ->
Skip1 = From1 - From,
case Bin of
<<_:Skip1/binary,NewBin/binary>> ->
compact_objs(Head, WHead, SizeT, NewBin, L1, From1, To1,
SegBs, Cache, ASz);
_ when byte_size(Bin) < Skip1 ->
compact_read(Head, WHead, SizeT, Cache, L, 0, SegBs, ASz)
end
end;
compact_objs(Head, WHead, SizeT, <<Size:32, St:32, _Sz:32, KO/binary>> = Bin,
L, From, To, SegBs, Cache, ASz) when St =:= ?ACTIVE ->
LSize = sz2pos(Size),
Size2 = ?POW(LSize-1),
if
byte_size(Bin) >= Size2 ->
NASz = ASz + Size2,
<<SlotObjs:Size2/binary, NewBin/binary>> = Bin,
Term = if
Head#head.type =:= set ->
binary_to_term(KO);
true ->
<<_KSz:32,B2/binary>> = KO,
binary_to_term(B2)
end,
Key = element(Head#head.keypos, Term),
Slot = db_hash(Key, Head),
From1 = From + Size2,
[Addr | AL] = ?VGET(LSize, Cache),
NCache = ?VSET(LSize, Cache, [Addr + Size2 | [SlotObjs | AL]]),
NSegBs = [<<Slot:32,Size:32,Addr:32,LSize:8>> | SegBs],
compact_objs(Head, WHead, SizeT, NewBin, L, From1,
To, NSegBs, NCache, NASz);
true ->
compact_read(Head, WHead, SizeT, Cache, [[From|To] | L],
Size2, SegBs, ASz)
end;
compact_objs(Head, WHead, SizeT, <<_:32, _St:32, _:32, _/binary>> = Bin,
L, From, To, SegBs, Cache, ASz)
when byte_size(Bin) >= ?ACTUAL_SEG_SIZE -> % , _St =/= ?ACTIVE
<<_:?ACTUAL_SEG_SIZE/binary, NewBin/binary>> = Bin,
compact_objs(Head, WHead, SizeT, NewBin, L, From + ?ACTUAL_SEG_SIZE,
To, SegBs, Cache, ASz);
compact_objs(Head, WHead, SizeT, <<_:32, _St:32, _:32, _/binary>> = Bin,
L, From, To, SegBs, Cache, ASz)
when byte_size(Bin) < ?ACTUAL_SEG_SIZE -> % , _St =/= ?ACTIVE
compact_read(Head, WHead, SizeT, Cache, [[From|To] | L],
?ACTUAL_SEG_SIZE, SegBs, ASz);
compact_objs(Head, WHead, SizeT, _Bin, L, From, To, SegBs, Cache, ASz) ->
compact_read(Head, WHead, SizeT, Cache, [[From|To] | L], 0, SegBs, ASz).
%%% End compaction.
%%% Bchunk.
read_bchunks(Head, L) ->
read_bchunks(Head, L, 0, [], 0).
read_bchunks(_Head, L, Min, Bs, ASz) when ASz + Min >= 4*?CHUNK_SIZE,
Bs =/= [] ->
{lists:reverse(Bs), L};
read_bchunks(Head, {From, To, L}, Min, Bs, ASz) ->
Max = erlang:max(?CHUNK_SIZE*2, Min),
case check_pread_arg(Max, Head) of
true ->
case dets_utils:pread_n(Head#head.fptr, From, Max) of
eof ->
%% Should never happen.
{error, premature_eof};
NewBin when byte_size(NewBin) >= Min ->
bchunks(Head, L, NewBin, Bs, ASz, From, To);
Bin1 when To - From =:= Min, L =:= <<>> ->
%% when byte_size(Bin1) < Min.
%% The last object may not be padded.
Pad = Min - byte_size(Bin1),
NewBin = <<Bin1/binary, 0:Pad/unit:8>>,
bchunks(Head, L, NewBin, Bs, ASz, From, To);
_ ->
{error, premature_eof}
end;
false ->
{error, dets_utils:bad_object(bad_object, {read_bchunks, Max})}
end.
bchunks(Head, L, Bin, Bs, ASz, From, To) when From =:= To ->
if
L =:= <<>> ->
{finished, lists:reverse(Bs)};
true ->
<<From1:32, To1:32, L1/binary>> = L,
Skip1 = From1 - From,
case Bin of
<<_:Skip1/binary,NewBin/binary>> ->
bchunks(Head, L1, NewBin, Bs, ASz, From1, To1);
_ when byte_size(Bin) < Skip1 ->
read_bchunks(Head, {From1,To1,L1}, 0, Bs, ASz)
end
end;
bchunks(Head, L, <<Size:32, St:32, _Sz:32, KO/binary>> = Bin, Bs, ASz,
From, To) when St =:= ?ACTIVE; St =:= ?FREE ->
LSize = sz2pos(Size),
Size2 = ?POW(LSize-1),
if
byte_size(Bin) >= Size2 ->
<<B0:Size2/binary, NewBin/binary>> = Bin,
%% LSize and Slot are used in make_slots/6. The reason to
%% calculate Slot here is to reduce the CPU load in
%% make_slots/6.
Term = if
Head#head.type =:= set ->
binary_to_term(KO);
true ->
<<_KSz:32,B2/binary>> = KO,
binary_to_term(B2)
end,
Key = element(Head#head.keypos, Term),
Slot = db_hash(Key, Head),
B = {LSize,Slot,B0},
bchunks(Head, L, NewBin, [B | Bs], ASz + Size2, From+Size2, To);
true ->
read_bchunks(Head, {From, To, L}, Size2, Bs, ASz)
end;
bchunks(Head, L, <<_:32, _St:32, _:32, _/binary>> = Bin, Bs, ASz, From, To)
when byte_size(Bin) >= ?ACTUAL_SEG_SIZE ->
<<_:?ACTUAL_SEG_SIZE/binary, NewBin/binary>> = Bin,
bchunks(Head, L, NewBin, Bs, ASz, From + ?ACTUAL_SEG_SIZE, To);
bchunks(Head, L, <<_:32, _St:32, _:32, _/binary>> = Bin, Bs, ASz, From, To)
when byte_size(Bin) < ?ACTUAL_SEG_SIZE ->
read_bchunks(Head, {From, To, L}, ?ACTUAL_SEG_SIZE, Bs, ASz);
bchunks(Head, L, _Bin, Bs, ASz, From, To) ->
read_bchunks(Head, {From, To, L}, 0, Bs, ASz).
%%% End bchunk.
%% -> {ok, NewHead} | throw(Error) | Error
bchunk_init(Head, InitFun) ->
Ref = make_ref(),
%% The non-empty list of data begins with the table parameters.
case catch {Ref, InitFun(read)} of
{Ref, end_of_input} ->
{error, {init_fun, end_of_input}};
{Ref, {[], NInitFun}} when is_function(NInitFun) ->
bchunk_init(Head, NInitFun);
{Ref, {[ParmsBin | L], NInitFun}}
when is_list(L), is_function(NInitFun) ->
#head{fptr = Fd, type = Type, keypos = Kp,
auto_save = Auto, cache = Cache,
filename = Fname, ram_file = Ram,
name = Tab} = Head,
case try_bchunk_header(ParmsBin, Head) of
{ok, Parms} ->
#?HASH_PARMS{no_objects = NoObjects,
no_keys = NoKeys,
no_colls = NoObjsPerSize} = Parms,
CacheSz = dets_utils:cache_size(Cache),
{ok, Head1} =
prep_table_copy(Fd, Tab, Fname, Type,
Kp, Ram, CacheSz,
Auto, Parms),
SizeT = ets:new(dets_init, []),
{NewHead, Bases, SegAddr, SegEnd} =
prepare_file_init(NoObjects, NoKeys,
NoObjsPerSize, SizeT, Head1),
ECache = ?VEXT(tuple_size(Bases), ?VEMPTY, [0 | []]),
Input =
fun(close) ->
_ = (catch NInitFun(close));
(read) ->
do_make_slots(L, ECache, SizeT, NewHead, Ref,
0, NInitFun)
end,
Output = fast_output(NewHead, SizeT, Bases, SegAddr,SegEnd),
TmpDir = filename:dirname(Head#head.filename),
Reply = (catch file_sorter:sort(Input, Output,
[{format, binary},
{tmpdir, TmpDir},
{header, 1}])),
ets:delete(SizeT),
Reply;
not_ok ->
{error, {init_fun, ParmsBin}}
end;
{Ref, Value} ->
{error, {init_fun, Value}};
Error ->
{thrown, Error}
end.
try_bchunk_header(ParmsBin, Head) ->
#head{type = Type, keypos = Kp, hash_bif = HashBif} = Head,
HashMethod = hash_method_to_code(HashBif),
case catch binary_to_term(ParmsBin) of
Parms when is_record(Parms, ?HASH_PARMS),
Parms#?HASH_PARMS.type =:= Type,
Parms#?HASH_PARMS.keypos =:= Kp,
Parms#?HASH_PARMS.hash_method =:= HashMethod,
Parms#?HASH_PARMS.bchunk_format_version =:=
?BCHUNK_FORMAT_VERSION ->
{ok, Parms};
_ ->
not_ok
end.
bchunk_input(InitFun, SizeT, Head, Ref, Cache, ASz) ->
fun(close) ->
_ = (catch InitFun(close));
(read) ->
case catch {Ref, InitFun(read)} of
{Ref, end_of_input} ->
_ = fast_write_all_sizes(Cache, SizeT, Head),
end_of_input;
{Ref, {L, NInitFun}} when is_list(L), is_function(NInitFun) ->
do_make_slots(L, Cache, SizeT, Head, Ref, ASz,
NInitFun);
{Ref, Value} ->
{error, {init_fun, Value}};
Error ->
throw({thrown, Error})
end
end.
do_make_slots(L, Cache, SizeT, Head, Ref, ASz, InitFun) ->
case catch make_slots(L, Cache, [], ASz) of
{'EXIT', _} ->
_ = (catch InitFun(close)),
{error, invalid_objects_list};
{Cache1, SegBs, NASz} when NASz > ?CACHE_SIZE ->
NCache = fast_write_all_sizes(Cache1, SizeT, Head),
F = bchunk_input(InitFun, SizeT, Head, Ref, NCache, 0),
{SegBs, F};
{NCache, SegBs, NASz} ->
F = bchunk_input(InitFun, SizeT, Head, Ref, NCache, NASz),
{SegBs, F}
end.
make_slots([{LSize,Slot,<<Size:32, St:32, Sz:32, KO/binary>> = Bin0} | Bins],
Cache, SegBs, ASz) ->
Bin = if
St =:= ?ACTIVE ->
Bin0;
St =:= ?FREE ->
<<Size:32,?ACTIVE:32,Sz:32,KO/binary>>
end,
BSz = byte_size(Bin0),
true = (BSz =:= ?POW(LSize-1)),
NASz = ASz + BSz,
[Addr | L] = ?VGET(LSize, Cache),
NSegBs = [<<Slot:32,Size:32,Addr:32,LSize:8>> | SegBs],
NCache = ?VSET(LSize, Cache, [Addr + BSz | [Bin | L]]),
make_slots(Bins, NCache, NSegBs, NASz);
make_slots([], Cache, SegBs, ASz) ->
{Cache, SegBs, ASz}.
fast_output(Head, SizeT, Bases, SegAddr, SegEnd) ->
fun(close) ->
fast_output_end(Head, SizeT);
(L) ->
case file:position(Head#head.fptr, SegAddr) of
{ok, SegAddr} ->
NewSegAddr = write_segment_file(L, Bases, Head, [],
SegAddr, SegAddr),
fast_output2(Head, SizeT, Bases, NewSegAddr,
SegAddr, SegEnd);
Error ->
catch dets_utils:file_error(Error, Head#head.filename)
end
end.
fast_output2(Head, SizeT, Bases, SegAddr, SS, SegEnd) ->
fun(close) ->
FinalZ = SegEnd - SegAddr,
dets_utils:write(Head, dets_utils:make_zeros(FinalZ)),
fast_output_end(Head, SizeT);
(L) ->
NewSegAddr = write_segment_file(L, Bases, Head, [], SegAddr, SS),
fast_output2(Head, SizeT, Bases, NewSegAddr, SS, SegEnd)
end.
fast_output_end(Head, SizeT) ->
case ets:foldl(fun({_Sz,_Pos,Cnt,NoC}, Acc) -> (Cnt =:= NoC) and Acc end,
true, SizeT) of
true -> {ok, Head};
false -> {error, invalid_objects_list}
end.
%% Inlined.
write_segment_file([<<Slot:32,BSize:32,AddrToBe:32,LSize:8>> | Bins],
Bases, Head, Ws, SegAddr, SS) ->
%% Should call slot_position/1, but since all segments are
%% allocated in a sequence, the position of a slot can be
%% calculated faster.
Pos = SS + ?SZOBJP*4 * Slot, % Same as Pos = slot_position(Slot).
write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos,
BSize, AddrToBe, LSize);
write_segment_file([], _Bases, Head, Ws, SegAddr, _SS) ->
dets_utils:write(Head, Ws),
SegAddr.
write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos, BSize,
AddrToBe, LSize) when Pos =:= SegAddr ->
Addr = AddrToBe + element(LSize, Bases),
NWs = [Ws | <<BSize:32,Addr:32>>],
write_segment_file(Bins, Bases, Head, NWs, SegAddr + ?SZOBJP*4, SS);
write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos, BSize,
AddrToBe, LSize) when Pos - SegAddr < 100 ->
Addr = AddrToBe + element(LSize, Bases),
NoZeros = Pos - SegAddr,
NWs = [Ws | <<0:NoZeros/unit:8,BSize:32,Addr:32>>],
NSegAddr = SegAddr + NoZeros + ?SZOBJP*4,
write_segment_file(Bins, Bases, Head, NWs, NSegAddr, SS);
write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos, BSize,
AddrToBe, LSize) ->
Addr = AddrToBe + element(LSize, Bases),
NoZeros = Pos - SegAddr,
NWs = [Ws, dets_utils:make_zeros(NoZeros) | <<BSize:32,Addr:32>>],
NSegAddr = SegAddr + NoZeros + ?SZOBJP*4,
write_segment_file(Bins, Bases, Head, NWs, NSegAddr, SS).
fast_write_all_sizes(Cache, SizeT, Head) ->
CacheL = lists:reverse(tuple_to_list(Cache)),
fast_write_sizes(CacheL, tuple_size(Cache), SizeT, Head, [], []).
fast_write_sizes([], _Sz, _SizeT, Head, NCL, PwriteList) ->
#head{filename = FileName, fptr = Fd} = Head,
ok = dets_utils:pwrite(Fd, FileName, PwriteList),
list_to_tuple(NCL);
fast_write_sizes([[_Addr] = C | CL], Sz, SizeT, Head, NCL, PwriteList) ->
fast_write_sizes(CL, Sz-1, SizeT, Head, [C | NCL], PwriteList);
fast_write_sizes([[Addr | C] | CL], Sz, SizeT, Head, NCL, PwriteList) ->
case ets:lookup(SizeT, Sz) of
[] ->
throw({error, invalid_objects_list});
[{Sz,Position,_ObjCounter,_NoCollections}] ->
%% Update ObjCounter:
NoColls = length(C),
_ = ets:update_counter(SizeT, Sz, {3, NoColls}),
Pos = Position + Addr - NoColls*?POW(Sz-1),
fast_write_sizes(CL, Sz-1, SizeT, Head, [[Addr] | NCL],
[{Pos,lists:reverse(C)} | PwriteList])
end.
prepare_file_init(NoObjects, NoKeys, NoObjsPerSize, SizeT, Head) ->
SegSz = ?ACTUAL_SEG_SIZE,
{_, SegEnd, _} = dets_utils:alloc(Head, adjsz(SegSz)),
Head1 = Head#head{no_objects = NoObjects, no_keys = NoKeys},
true = ets:insert(SizeT, {?FSCK_SEGMENT,0,[],0}),
lists:foreach(fun({LogSz,NoColls}) ->
true = ets:insert(SizeT, {LogSz+1,0,0,NoColls})
end, NoObjsPerSize),
{NewHead, NL0, MaxSz, EndOfFile} = allocate_all_objects(Head1, SizeT),
[{?FSCK_SEGMENT,SegAddr,[],0} | NL] = NL0,
true = ets:delete_all_objects(SizeT),
lists:foreach(fun(X) -> true = ets:insert(SizeT, X) end, NL),
Bases = lists:foldl(fun({LSz,P,_D,_N}, A) -> setelement(LSz,A,P) end,
erlang:make_tuple(MaxSz, 0), NL),
Est = lists:foldl(fun({LSz,_,_,N}, A) -> A + ?POW(LSz-1)*N end, 0, NL),
ok = write_bytes(NewHead, EndOfFile, Est),
{NewHead, Bases, SegAddr, SegEnd}.
%% Writes "zeros" to the file. This ensures that the file blocks are
%% allocated more or less contiguously, which reduces the seek times
%% to a minimum when the file is later read serially from beginning to
%% end (as is done when calling select and the like). A well-formed
%% file will be created also if nothing is written (as is the case for
%% small files, for efficiency).
write_bytes(_Head, _EndOfFile, Est) when Est < ?CACHE_SIZE ->
ok;
write_bytes(Head, EndOfFile, _Est) ->
Fd = Head#head.fptr,
{ok, Start} = file:position(Fd, eof),
BytesToWrite = EndOfFile - Start,
SizeInKB = 64,
Bin = list_to_binary(lists:duplicate(SizeInKB * 4, lists:seq(0, 255))),
write_loop(Head, BytesToWrite, Bin).
write_loop(Head, BytesToWrite, Bin) when BytesToWrite >= byte_size(Bin) ->
case file:write(Head#head.fptr, Bin) of
ok -> write_loop(Head, BytesToWrite - byte_size(Bin), Bin);
Error -> dets_utils:file_error(Error, Head#head.filename)
end;
write_loop(_Head, 0, _Bin) ->
ok;
write_loop(Head, BytesToWrite, Bin) ->
<<SmallBin:BytesToWrite/binary,_/binary>> = Bin,
write_loop(Head, BytesToWrite, SmallBin).
%% By allocating bigger objects before smaller ones, holes in the
%% buddy system memory map are avoided.
allocate_all_objects(Head, SizeT) ->
DTL = lists:reverse(lists:keysort(1, ets:tab2list(SizeT))),
MaxSz = element(1, hd(DTL)),
{Head1, NL} = allocate_all(Head, DTL, []),
%% Find the position that will be the end of the file by allocating
%% a minimal object.
{_Head, EndOfFile, _} = dets_utils:alloc(Head1, ?BUMP),
NewHead = Head1#head{maxobjsize = max_objsize(Head1#head.no_collections)},
{NewHead, NL, MaxSz, EndOfFile}.
%% One (temporary) file for each buddy size, write all objects of that
%% size to the file.
%%
%% Before R15 a "hole" was needed before the first bucket if the size
%% of the biggest bucket was greater than the size of a segment. The
%% hole proved to be a problem with almost full tables with huge
%% buckets. Since R15 the hole is no longer needed due to the fact
%% that the base of the Buddy system is flexible.
allocate_all(Head, [{?FSCK_SEGMENT,_,Data,_}], L) ->
%% And one file for the segments...
%% Note that space for the array parts and the segments has
%% already been allocated, but the segments have not been
%% initialized on disk.
NoParts = no_parts(Head#head.next),
%% All parts first, ensured by init_segments/6.
Addr = ?BASE + NoParts * 4 * ?SEGPARTSZ,
{Head, [{?FSCK_SEGMENT,Addr,Data,0} | L]};
allocate_all(Head, [{LSize,_,Data,NoCollections} | DTL], L) ->
Size = ?POW(LSize-1),
{_Head, Addr, _} = dets_utils:alloc(Head, adjsz(Size)),
Head1 = dets_utils:alloc_many(Head, Size, NoCollections, Addr),
NoColls = Head1#head.no_collections,
NewNoColls = orddict:update_counter(LSize-1, NoCollections, NoColls),
NewHead = Head1#head{no_collections = NewNoColls},
E = {LSize,Addr,Data,NoCollections},
allocate_all(NewHead, DTL, [E | L]).
bin2term(Bin, 9, Kp) ->
bin2term1(Bin, Kp, []);
bin2term(Bin, 8, Kp) ->
bin2term_v8(Bin, Kp, []).
bin2term1([<<Slot:32, Seq:32, BinTerm/binary>> | BTs], Kp, L) ->
Term = binary_to_term(BinTerm),
Key = element(Kp, Term),
bin2term1(BTs, Kp, [{Slot, Key, Seq, Term, BinTerm} | L]);
bin2term1([], _Kp, L) ->
lists:reverse(L).
bin2term_v8([<<Slot:32, BinTerm/binary>> | BTs], Kp, L) ->
Term = binary_to_term(BinTerm),
Key = element(Kp, Term),
bin2term_v8(BTs, Kp, [{Slot, Key, foo, Term, BinTerm} | L]);
bin2term_v8([], _Kp, L) ->
lists:reverse(L).
write_all_sizes({}=Cache, _SizeT, _Head, _More) ->
Cache;
write_all_sizes(Cache, SizeT, Head, More) ->
CacheL = lists:reverse(tuple_to_list(Cache)),
Sz = length(CacheL),
NCL = case ets:info(SizeT, size) of
1 when More =:= no_more -> % COUNTERS only...
all_sizes(CacheL, Sz, SizeT);
_ ->
write_sizes(CacheL, Sz, SizeT, Head)
end,
list_to_tuple(NCL).
all_sizes([]=CL, _Sz, _SizeT) ->
CL;
all_sizes([[]=C | CL], Sz, SizeT) ->
[C | all_sizes(CL, Sz-1, SizeT)];
all_sizes([C0 | CL], Sz, SizeT) ->
C = lists:reverse(C0),
NoCollections = length(C),
true = ets:insert(SizeT, {Sz,0,C,NoCollections}),
[[] | all_sizes(CL, Sz-1, SizeT)].
write_sizes([]=CL, _Sz, _SizeT, _Head) ->
CL;
write_sizes([[]=C | CL], Sz, SizeT, Head) ->
[C | write_sizes(CL, Sz-1, SizeT, Head)];
write_sizes([C | CL], Sz, SizeT, Head) ->
{FileName, Fd} =
case ets:lookup(SizeT, Sz) of
[] ->
temp_file(Head, SizeT, Sz);
[{_,_,{FN,F},_}] ->
{FN, F}
end,
NoCollections = length(C),
_ = ets:update_counter(SizeT, Sz, {4,NoCollections}),
case file:write(Fd, lists:reverse(C)) of
ok ->
[[] | write_sizes(CL, Sz-1, SizeT, Head)];
Error ->
dets_utils:file_error(FileName, Error)
end.
output_slots([E | Es], Head, Cache, SizeT, NoKeys, NoObjs) ->
output_slots(E, Es, [E], Head, Cache, SizeT, NoKeys, NoObjs);
output_slots([], _Head, Cache, SizeT, NoKeys, NoObjs) ->
_ = ets:update_counter(SizeT, ?COUNTERS, {?OBJ_COUNTER,NoObjs}),
_ = ets:update_counter(SizeT, ?COUNTERS, {?KEY_COUNTER,NoKeys}),
{not_a_tuple, [], Cache}.
output_slots(E, [E1 | Es], Acc, Head, Cache, SizeT, NoKeys, NoObjs)
when element(1, E) =:= element(1, E1) ->
output_slots(E1, Es, [E1 | Acc], Head, Cache, SizeT, NoKeys, NoObjs);
output_slots(E, [], Acc, _Head, Cache, SizeT, NoKeys, NoObjs) ->
_ = ets:update_counter(SizeT, ?COUNTERS, {?OBJ_COUNTER,NoObjs}),
_ = ets:update_counter(SizeT, ?COUNTERS, {?KEY_COUNTER,NoKeys}),
{E, Acc, Cache};
output_slots(_E, L, Acc, Head, Cache, SizeT, NoKeys, NoObjs) ->
output_slot(Acc, Head, Cache, L, SizeT, NoKeys, NoObjs).
output_slot(Es, Head, Cache, L, SizeT, NoKeys, NoObjs) ->
Slot = element(1, hd(Es)),
%% Plain lists:sort/1 will do.
{Bins, Size, No, KNo} = prep_slot(lists:sort(Es), Head),
NNoKeys = NoKeys + KNo,
NNoObjs = NoObjs + No,
%% First the object collection.
BSize = Size + ?OHDSZ,
LSize = sz2pos(BSize),
Size2 = ?POW(LSize-1),
Pad = <<0:(Size2-BSize)/unit:8>>,
BinObject = [<<BSize:32, ?ACTIVE:32>>, Bins | Pad],
Cache1 =
if
LSize > tuple_size(Cache) ->
C1 = ?VEXT(LSize, Cache, []),
?VSET(LSize, C1, [BinObject]);
true ->
CL = ?VGET(LSize, Cache),
?VSET(LSize, Cache, [BinObject | CL])
end,
%% Then the pointer to the object collection.
%% Cannot yet determine the absolute pointers; segment_file/4 does that.
PBin = <<Slot:32,BSize:32,LSize:8>>,
PL = ?VGET(?FSCK_SEGMENT, Cache1),
NCache = ?VSET(?FSCK_SEGMENT, Cache1, [PBin | PL]),
output_slots(L, Head, NCache, SizeT, NNoKeys, NNoObjs).
prep_slot(L, Head) when Head#head.type =/= set ->
prep_slot(L, Head, []);
prep_slot([{_Slot,Key,_Seq,_T,BT} | L], _Head) ->
prep_set_slot(L, Key, BT, 0, 0, 0, []).
prep_slot([{_Slot, Key, Seq, T, _BT} | L], Head, W) ->
prep_slot(L, Head, [{Key, {Seq, {insert,T}}} | W]);
prep_slot([], Head, W) ->
WLs = dets_utils:family(W),
{[], Bins, Size, No, KNo, _} =
eval_slot(WLs, [], Head#head.type, [], [], 0, 0, 0, false),
{Bins, Size, No, KNo}.
%% Optimization, prep_slot/3 would work for set tables as well.
prep_set_slot([{_,K,_Seq,_T1,BT1} | L], K, _BT, Sz, NoKeys, NoObjs, Ws) ->
prep_set_slot(L, K, BT1, Sz, NoKeys, NoObjs, Ws);
prep_set_slot([{_,K1,_Seq,_T1,BT1} | L], _K, BT, Sz, NoKeys, NoObjs, Ws) ->
BSize = byte_size(BT) + 4,
NWs = [Ws,<<BSize:32>>|BT],
prep_set_slot(L, K1, BT1, Sz+BSize, NoKeys+1, NoObjs+1, NWs);
prep_set_slot([], _K, BT, Sz, NoKeys, NoObjs, Ws) ->
BSize = byte_size(BT) + 4,
{[Ws, <<BSize:32>> | BT], Sz + BSize, NoKeys+1, NoObjs+1}.
segment_file(SizeT, Head, FileData, SegEnd) ->
I = 2,
true = ets:delete_all_objects(SizeT),
lists:foreach(fun(X) -> true = ets:insert(SizeT, X) end, FileData),
[{?FSCK_SEGMENT,SegAddr,Data,0} | FileData1] = FileData,
NewData =
case Data of
{InFile,In0} ->
{OutFile, Out} = temp_file(Head, SizeT, I),
file:close(In0),
{ok, In} = dets_utils:open(InFile, [raw,binary,read]),
{ok, 0} = dets_utils:position(In, InFile, bof),
seg_file(SegAddr, SegAddr, In, InFile, Out, OutFile, SizeT,
SegEnd),
file:close(In),
file:delete(InFile),
{OutFile,Out};
Objects ->
{LastAddr, B} = seg_file(Objects, SegAddr, SegAddr, SizeT, []),
dets_utils:disk_map_segment(SegAddr, B),
FinalZ = SegEnd - LastAddr,
[B | dets_utils:make_zeros(FinalZ)]
end,
%% Restore the positions.
true = ets:delete_all_objects(SizeT),
%% To get the segments copied first by dets:fsck_copy/4, use a big
%% number here, FSCK_SEGMENT2.
lists:foreach(fun(X) -> true = ets:insert(SizeT, X) end,
[{?FSCK_SEGMENT2,SegAddr,NewData,0} | FileData1]),
ok.
seg_file(Addr, SS, In, InFile, Out, OutFile, SizeT, SegEnd) ->
case dets_utils:read_n(In, 4500) of
eof ->
FinalZ = SegEnd - Addr,
dets_utils:fwrite(Out, OutFile, dets_utils:make_zeros(FinalZ));
Bin ->
{NewAddr, L} = seg_file(Bin, Addr, SS, SizeT, []),
dets_utils:disk_map_segment(Addr, L),
ok = dets_utils:fwrite(Out, OutFile, L),
seg_file(NewAddr, SS, In, InFile, Out, OutFile, SizeT, SegEnd)
end.
seg_file(<<Slot:32,BSize:32,LSize:8,T/binary>>, Addr, SS, SizeT, L) ->
seg_file_item(T, Addr, SS, SizeT, L, Slot, BSize, LSize);
seg_file([<<Slot:32,BSize:32,LSize:8>> | T], Addr, SS, SizeT, L) ->
seg_file_item(T, Addr, SS, SizeT, L, Slot, BSize, LSize);
seg_file([], Addr, _SS, _SizeT, L) ->
{Addr, lists:reverse(L)};
seg_file(<<>>, Addr, _SS, _SizeT, L) ->
{Addr, lists:reverse(L)}.
seg_file_item(T, Addr, SS, SizeT, L, Slot, BSize, LSize) ->
%% Should call slot_position/1, but since all segments are
%% allocated in a sequence, the position of a slot can be
%% calculated faster.
SlotPos = SS + ?SZOBJP*4 * Slot, % SlotPos = slot_position(Slot)
NoZeros = SlotPos - Addr,
PSize = NoZeros+?SZOBJP*4,
Inc = ?POW(LSize-1),
CollP = ets:update_counter(SizeT, LSize, Inc) - Inc,
PointerBin = if
NoZeros =:= 0 ->
<<BSize:32, CollP:32>>;
NoZeros > 100 ->
[dets_utils:make_zeros(NoZeros) |
<<BSize:32, CollP:32>>];
true ->
<<0:NoZeros/unit:8, BSize:32, CollP:32>>
end,
seg_file(T, Addr + PSize, SS, SizeT, [PointerBin | L]).
temp_file(Head, SizeT, N) ->
TmpName = lists:concat([Head#head.filename, '.', N]),
{ok, Fd} = dets_utils:open(TmpName, [raw, binary, write]),
%% The file table is consulted when cleaning up.
true = ets:insert(SizeT, {N,0,{TmpName,Fd},0}),
{TmpName, Fd}.
%% Does not close Fd.
fsck_input(Head, Fd, Cntrs, FileHeader) ->
MaxSz0 = case FileHeader#fileheader.has_md5 of
true when is_integer(FileHeader#fileheader.no_colls) ->
?POW(max_objsize(FileHeader#fileheader.no_colls));
_ ->
%% The file is not compressed, so the bucket size
%% cannot exceed the filesize, for all buckets.
case file:position(Fd, eof) of
{ok, Pos} ->
Pos;
_ ->
1 bsl 32
end
end,
MaxSz = erlang:max(MaxSz0, ?CHUNK_SIZE),
State0 = fsck_read(?BASE, Fd, [], 0),
fsck_input(Head, State0, Fd, MaxSz, Cntrs).
fsck_input(Head, State, Fd, MaxSz, Cntrs) ->
fun(close) ->
ok;
(read) ->
case State of
done ->
end_of_input;
{done, L, _Seq} ->
R = count_input(Head, Cntrs, L),
{R, fsck_input(Head, done, Fd, MaxSz, Cntrs)};
{cont, L, Bin, Pos, Seq} ->
R = count_input(Head, Cntrs, L),
FR = fsck_objs(Bin, Head#head.keypos, Head, [], Seq),
NewState = fsck_read(FR, Pos, Fd, MaxSz, Head),
{R, fsck_input(Head, NewState, Fd, MaxSz, Cntrs)}
end
end.
%% The ets table Cntrs is used for counting objects per size.
count_input(Head, Cntrs, L) when Head#head.version =:= 8 ->
count_input1(Cntrs, L, []);
count_input(_Head, _Cntrs, L) ->
lists:reverse(L).
count_input1(Cntrs, [[LogSz | B] | Ts], L) ->
case catch ets:update_counter(Cntrs, LogSz, 1) of
N when is_integer(N) -> ok;
_Badarg -> true = ets:insert(Cntrs, {LogSz, 1})
end,
count_input1(Cntrs, Ts, [B | L]);
count_input1(_Cntrs, [], L) ->
L.
fsck_read(Pos, F, L, Seq) ->
case file:position(F, Pos) of
{ok, _} ->
read_more_bytes([], 0, Pos, F, L, Seq);
_Error ->
{done, L, Seq}
end.
fsck_read({more, Bin, Sz, L, Seq}, Pos, F, MaxSz, Head) when Sz > MaxSz ->
FR = skip_bytes(Bin, ?BUMP, Head#head.keypos, Head, L, Seq),
fsck_read(FR, Pos, F, MaxSz, Head);
fsck_read({more, Bin, Sz, L, Seq}, Pos, F, _MaxSz, _Head) ->
read_more_bytes(Bin, Sz, Pos, F, L, Seq);
fsck_read({new, Skip, L, Seq}, Pos, F, _MaxSz, _Head) ->
NewPos = Pos + Skip,
fsck_read(NewPos, F, L, Seq).
read_more_bytes(B, Min, Pos, F, L, Seq) ->
Max = if
Min < ?CHUNK_SIZE -> ?CHUNK_SIZE;
true -> Min
end,
case dets_utils:read_n(F, Max) of
eof ->
{done, L, Seq};
Bin ->
NewPos = Pos + byte_size(Bin),
{cont, L, list_to_binary([B, Bin]), NewPos, Seq}
end.
fsck_objs(Bin = <<Sz:32, Status:32, Tail/binary>>, Kp, Head, L, Seq) ->
if
Status =:= ?ACTIVE ->
Sz1 = Sz-?OHDSZ,
case Tail of
<<BinTerm:Sz1/binary, Tail2/binary>> ->
case catch bin2keybins(BinTerm, Head) of
{'EXIT', _Reason} ->
%% The whole collection of objects is skipped.
skip_bytes(Bin, ?BUMP, Kp, Head, L, Seq);
BOs ->
{NL, NSeq} = make_objects(BOs, Seq, Kp, Head, L),
Skip = ?POW(sz2pos(Sz)-1) - Sz,
skip_bytes(Tail2, Skip, Kp, Head, NL, NSeq)
end;
_ when byte_size(Tail) < Sz1 ->
{more, Bin, Sz, L, Seq}
end;
true ->
skip_bytes(Bin, ?BUMP, Kp, Head, L, Seq)
end;
fsck_objs(Bin, _Kp, _Head, L, Seq) ->
{more, Bin, 0, L, Seq}.
make_objects([{K,BT}|Os], Seq, Kp, Head, L) when Head#head.version =:= 8 ->
LogSz = dets_v8:sz2pos(byte_size(BT)+?OHDSZ_v8),
Slot = dets_v8:db_hash(K, Head),
Obj = [LogSz | <<Slot:32, LogSz:8, BT/binary>>],
make_objects(Os, Seq, Kp, Head, [Obj | L]);
make_objects([{K,BT} | Os], Seq, Kp, Head, L) ->
Obj = make_object(Head, K, Seq, BT),
make_objects(Os, Seq+1, Kp, Head, [Obj | L]);
make_objects([], Seq, _Kp, _Head, L) ->
{L, Seq}.
%% Inlined.
make_object(Head, Key, Seq, BT) ->
Slot = db_hash(Key, Head),
<<Slot:32, Seq:32, BT/binary>>.
%% Inlined.
skip_bytes(Bin, Skip, Kp, Head, L, Seq) ->
case Bin of
<<_:Skip/binary, Tail/binary>> ->
fsck_objs(Tail, Kp, Head, L, Seq);
_ when byte_size(Bin) < Skip ->
{new, Skip - byte_size(Bin), L, Seq}
end.
%%%
%%% End of repair, conversion and initialization of a dets file.
%%%
%% -> {NewHead, ok} | throw({Head, Error})
do_perform_save(H) ->
{ok, FreeListsPointer} = dets_utils:position(H, eof),
H1 = H#head{freelists_p = FreeListsPointer},
{FLW, FLSize} = free_lists_to_file(H1),
FileSize = FreeListsPointer + FLSize + 4,
AdjustedFileSize = case H#head.base of
?BASE -> FileSize;
Base -> FileSize - Base
end,
ok = dets_utils:write(H1, [FLW | <<AdjustedFileSize:32>>]),
FileHeader = file_header(H1, FreeListsPointer, ?CLOSED_PROPERLY),
case dets_utils:debug_mode() of
true ->
TmpHead0 = init_freelist(H1#head{fixed = false}, true),
TmpHead = TmpHead0#head{base = H1#head.base},
case
catch dets_utils:all_allocated_as_list(TmpHead)
=:= dets_utils:all_allocated_as_list(H1)
of
true ->
dets_utils:pwrite(H1, [{0, FileHeader}]);
_ ->
throw(
dets_utils:corrupt_reason(H1, {failed_to_save_free_lists,
FreeListsPointer,
TmpHead#head.freelists,
H1#head.freelists}))
end;
false ->
dets_utils:pwrite(H1, [{0, FileHeader}])
end.
file_header(Head, FreeListsPointer, ClosedProperly) ->
NoColls = case Head#head.no_collections of
undefined -> [];
NC -> NC
end,
L = orddict:merge(fun(_K, V1, V2) -> V1 + V2 end,
NoColls,
lists:map(fun(X) -> {X,0} end, lists:seq(4,?MAXBUD-1))),
CW = lists:map(fun({_LSz,N}) -> <<N:32>> end, L),
file_header(Head, FreeListsPointer, ClosedProperly, CW).
file_header(Head, FreeListsPointer, ClosedProperly, NoColls) ->
Cookie = ?MAGIC,
TypeCode = dets_utils:type_to_code(Head#head.type),
Version = ?FILE_FORMAT_VERSION,
HashMethod = hash_method_to_code(Head#head.hash_bif),
H1 = <<FreeListsPointer:32, Cookie:32, ClosedProperly:32>>,
H2 = <<TypeCode:32,
Version:32,
(Head#head.m):32,
(Head#head.next):32,
(Head#head.keypos):32,
(Head#head.no_objects):32,
(Head#head.no_keys):32,
(Head#head.min_no_slots):32,
(Head#head.max_no_slots):32,
HashMethod:32,
(Head#head.n):32>>,
DigH = [H2 | NoColls],
MD5 = case Head#head.has_md5 of
true -> erlang:md5(DigH);
false -> <<0:?MD5SZ/unit:8>>
end,
Base = case Head#head.base of
?BASE -> <<0:32>>;
FlBase -> <<FlBase:32>>
end,
[H1, DigH, MD5, Base | <<0:?RESERVED/unit:8>>].
%% Going through some trouble to avoid creating one single binary for
%% the free lists. If the free lists are huge, binary_to_term and
%% term_to_binary could otherwise stop the emulator for quite some time.
-define(MAXFREEOBJ, 4096).
-define(ENDFREE, 12345).
free_lists_to_file(H) ->
FL = dets_utils:get_freelists(H),
free_list_to_file(FL, H, 1, tuple_size(FL), [], 0).
free_list_to_file(_Ftab, _H, Pos, Sz, Ws, WsSz) when Pos > Sz ->
{[Ws | <<(4+?OHDSZ):32, ?FREE:32, ?ENDFREE:32>>], WsSz+4+?OHDSZ};
free_list_to_file(Ftab, H, Pos, Sz, Ws, WsSz) ->
Max = (?MAXFREEOBJ - 4 - ?OHDSZ) div 4,
F = fun(N, L, W, S) when N =:= 0 -> {N, L, W, S};
(N, L, W, S) ->
{L1, N1, More} =
if
N > Max ->
{lists:sublist(L, Max), Max,
{N-Max, lists:nthtail(Max, L)}};
true ->
{L, N, no_more}
end,
Size = N1*4 + 4 + ?OHDSZ,
Header = <<Size:32, ?FREE:32, Pos:32>>,
NW = [W, Header | L1],
case More of
no_more ->
{0, [], NW, S+Size};
{NN, NL} ->
ok = dets_utils:write(H, NW),
{NN, NL, [], S+Size}
end
end,
{NWs,NWsSz} = dets_utils:tree_to_bin(element(Pos, Ftab), F, Max, Ws, WsSz),
free_list_to_file(Ftab, H, Pos+1, Sz, NWs, NWsSz).
free_lists_from_file(H, Pos) ->
dets_utils:position(H#head.fptr, H#head.filename, Pos),
FL = dets_utils:empty_free_lists(),
case catch bin_to_tree([], H, start, FL, -1, []) of
{'EXIT', _} ->
throw({error, {bad_freelists, H#head.filename}});
Ftab ->
H#head{freelists = Ftab, base = ?BASE}
end.
bin_to_tree(Bin, H, LastPos, Ftab, A0, L) ->
case Bin of
<<_Size:32,?FREE:32,?ENDFREE:32,_/binary>> when L =:= [] ->
Ftab;
<<_Size:32,?FREE:32,?ENDFREE:32,_/binary>> ->
setelement(LastPos, Ftab, dets_utils:list_to_tree(L));
<<Size:32,?FREE:32,Pos:32,T/binary>>
when byte_size(T) >= Size-4-?OHDSZ ->
{NFtab, L1, A1} =
if
Pos =/= LastPos, LastPos =/= start ->
Tree = dets_utils:list_to_tree(L),
{setelement(LastPos, Ftab, Tree), [], -1};
true ->
{Ftab, L, A0}
end,
{NL, B2, A2} = bin_to_tree1(T, Size-?OHDSZ-4, A1, L1),
bin_to_tree(B2, H, Pos, NFtab, A2, NL);
_ ->
Bin2 = dets_utils:read_n(H#head.fptr, ?MAXFREEOBJ),
bin_to_tree(list_to_binary([Bin | Bin2]), H, LastPos, Ftab, A0, L)
end.
bin_to_tree1(<<A1:32,A2:32,A3:32,A4:32,T/binary>>, Size, A, L)
when Size >= 16, A < A1, A1 < A2, A2 < A3, A3 < A4 ->
bin_to_tree1(T, Size-16, A4, [A4, A3, A2, A1 | L]);
bin_to_tree1(<<A1:32,T/binary>>, Size, A, L) when Size >= 4, A < A1 ->
bin_to_tree1(T, Size - 4, A1, [A1 | L]);
bin_to_tree1(B, 0, A, L) ->
{L, B, A}.
%% -> [term()] | throw({Head, Error})
slot_objs(H, Slot) when Slot >= H#head.next ->
'$end_of_table';
slot_objs(H, Slot) ->
{ok, _Pointer, Objects} = slot_objects(H, Slot),
Objects.
%% Inlined.
h(I, phash2) -> erlang:phash2(I); % -> [0..2^27-1]
h(I, phash) -> erlang:phash(I, ?BIG) - 1.
db_hash(Key, Head) when Head#head.hash_bif =:= phash2 ->
H = erlang:phash2(Key),
Hash = ?REM2(H, Head#head.m),
if
Hash < Head#head.n ->
?REM2(H, Head#head.m2); % H rem (2 * m)
true ->
Hash
end;
db_hash(Key, Head) ->
H = h(Key, Head#head.hash_bif),
Hash = H rem Head#head.m,
if
Hash < Head#head.n ->
H rem (Head#head.m2); % H rem (2 * m)
true ->
Hash
end.
hash_method_to_code(phash2) -> ?PHASH2;
hash_method_to_code(phash) -> ?PHASH.
code_to_hash_method(?PHASH2) -> phash2;
code_to_hash_method(?PHASH) -> phash;
code_to_hash_method(_) -> undefined.
no_slots(Head) ->
{Head#head.min_no_slots, Head#head.next, Head#head.max_no_slots}.
table_parameters(Head) ->
case Head#head.no_collections of
undefined ->
undefined; % Version 9(a)
CL ->
NoColls0 = lists:foldl(fun({_,0}, A) -> A;
(E, A) -> [E | A]
end, [], CL),
NoColls = lists:reverse(NoColls0),
#?HASH_PARMS{file_format_version = Head#head.version,
bchunk_format_version = ?BCHUNK_FORMAT_VERSION,
file = filename:basename(Head#head.filename),
type = Head#head.type,
keypos = Head#head.keypos,
hash_method = hash_method_to_code(Head#head.hash_bif),
n = Head#head.n, m = Head#head.m,
next = Head#head.next,
min = Head#head.min_no_slots,
max = Head#head.max_no_slots,
no_objects = Head#head.no_objects,
no_keys = Head#head.no_keys, no_colls = NoColls}
end.
%% Allow quite a lot when reading object collections.
-define(MAXCOLL, (10 * ?CHUNK_SIZE)).
%% Re-hashing a segment, starting with SlotStart.
%%
%% On the average, half of the keys of the slot are put in a new slot.
%% If the old slot is i, then the new slot is i+m. The new slots
%% reside in a newly allocated segment.
%%
%% -> {NewHead, ok} | throw({Head, Error})
re_hash(Head, SlotStart) ->
FromSlotPos = slot_position(SlotStart),
ToSlotPos = slot_position(SlotStart + Head#head.m),
RSpec = [{FromSlotPos, 4 * ?SEGSZ}],
{ok, [FromBin]} = dets_utils:pread(RSpec, Head),
split_bins(FromBin, Head, FromSlotPos, ToSlotPos, [], [], 0).
split_bins(<<>>, Head, _Pos1, _Pos2, _ToRead, _L, 0) ->
{Head, ok};
split_bins(<<>>, Head, Pos1, Pos2, ToRead, L, _SoFar) ->
re_hash_write(Head, ToRead, L, Pos1, Pos2);
split_bins(FB, Head, Pos1, Pos2, ToRead, L, SoFar) ->
<<Sz1:32, P1:32, FT/binary>> = FB,
<<B1:?OHDSZ/binary, _/binary>> = FB,
NSoFar = SoFar + Sz1,
NPos1 = Pos1 + ?SZOBJP*4,
NPos2 = Pos2 + ?SZOBJP*4,
if
NSoFar > ?MAXCOLL, ToRead =/= [] ->
{NewHead, ok} = re_hash_write(Head, ToRead, L, Pos1, Pos2),
split_bins(FB, NewHead, Pos1, Pos2, [], [], 0);
Sz1 =:= 0 ->
E = {skip,B1},
split_bins(FT, Head, NPos1, NPos2, ToRead, [E | L], NSoFar);
true ->
E = {Sz1,P1,B1,Pos1,Pos2},
NewToRead = [{P1,Sz1} | ToRead],
split_bins(FT, Head, NPos1, NPos2, NewToRead, [E | L], NSoFar)
end.
re_hash_write(Head, ToRead, L, Pos1, Pos2) ->
check_pread2_arg(ToRead, Head),
{ok, Bins} = dets_utils:pread(ToRead, Head),
Z = <<0:32, 0:32>>,
{Head1, BinFS, BinTS, WsB} = re_hash_slots(Bins, L, Head, Z, [],[],[]),
WPos1 = Pos1 - ?SZOBJP*4*length(L),
WPos2 = Pos2 - ?SZOBJP*4*length(L),
ToWrite = [{WPos1,BinFS}, {WPos2, BinTS} | WsB],
dets_utils:pwrite(Head1, ToWrite).
re_hash_slots(Bins, [{skip,B1} | L], Head, Z, BinFS, BinTS, WsB) ->
re_hash_slots(Bins, L, Head, Z, [B1 | BinFS], [Z | BinTS], WsB);
re_hash_slots([FB | Bins], [E | L], Head, Z, BinFS, BinTS, WsB) ->
{Sz1,P1,B1,Pos1,Pos2} = E,
KeyObjs = case catch per_key(Head, FB) of
{'EXIT', _Error} ->
Bad = dets_utils:bad_object(re_hash_slots, {FB, E}),
throw(dets_utils:corrupt_reason(Head, Bad));
Else ->
Else
end,
case re_hash_split(KeyObjs, Head, [], 0, [], 0) of
{_KL, _KSz, [], 0} ->
Sz1 = _KSz + ?OHDSZ,
re_hash_slots(Bins, L, Head, Z, [B1 | BinFS], [Z | BinTS], WsB);
{[], 0, _ML, _MSz} -> %% Optimization.
Sz1 = _MSz + ?OHDSZ,
re_hash_slots(Bins, L, Head, Z, [Z | BinFS], [B1 | BinTS], WsB);
{KL, KSz, ML, MSz} when KL =/= [], KSz > 0, ML =/= [], MSz > 0 ->
{Head1, FS1, Ws1} =
updated(Head, P1, Sz1, KSz, Pos1, KL, true, foo, bar),
{NewHead, [{Pos2,Bin2}], Ws2} =
updated(Head1, 0, 0, MSz, Pos2, ML, true, foo, bar),
NewBinFS = case FS1 of
[{Pos1,Bin1}] -> [Bin1 | BinFS];
[] -> [B1 | BinFS] % cannot happen
end,
NewBinTS = [Bin2 | BinTS],
NewWsB = Ws2 ++ Ws1 ++ WsB,
re_hash_slots(Bins, L, NewHead, Z, NewBinFS, NewBinTS, NewWsB)
end;
re_hash_slots([], [], Head, _Z, BinFS, BinTS, WsB) ->
{Head, BinFS, BinTS, lists:reverse(WsB)}.
re_hash_split([E | KeyObjs], Head, KL, KSz, ML, MSz) ->
{Key,Sz,Bin,_Item,_Objs} = E,
New = h(Key, Head#head.hash_bif) rem Head#head.m2, % h(key) rem (m * 2)
if
New >= Head#head.m ->
re_hash_split(KeyObjs, Head, KL, KSz, [Bin | ML], MSz + Sz);
true ->
re_hash_split(KeyObjs, Head, [Bin | KL], KSz + Sz, ML, MSz)
end;
re_hash_split([], _Head, KL, KSz, ML, MSz) ->
{lists:reverse(KL), KSz, lists:reverse(ML), MSz}.
%% -> {NewHead, [LookedUpObject], pwrite_list()} | throw({NewHead, Error})
write_cache(Head) ->
C = Head#head.cache,
case dets_utils:is_empty_cache(C) of
true -> {Head, [], []};
false ->
{NewC, MaxInserts, PerKey} = dets_utils:reset_cache(C),
%% MaxNoInsertedKeys is an upper limit on the number of new keys.
MaxNoInsertedKeys = erlang:min(MaxInserts, length(PerKey)),
Head1 = Head#head{cache = NewC},
case may_grow(Head1, MaxNoInsertedKeys, once) of
{Head2, ok} ->
eval_work_list(Head2, PerKey);
HeadError ->
throw(HeadError)
end
end.
%% -> {NewHead, ok} | {NewHead, Error}
may_grow(Head, 0, once) ->
%% Do not re-hash if there is a chance that the file is not dirty.
{Head, ok};
may_grow(Head, _N, _How) when Head#head.fixed =/= false ->
{Head, ok};
may_grow(#head{access = read}=Head, _N, _How) ->
{Head, ok};
may_grow(Head, _N, _How) when Head#head.next >= Head#head.max_no_slots ->
{Head, ok};
may_grow(Head, N, How) ->
Extra = erlang:min(2*?SEGSZP, Head#head.no_keys + N - Head#head.next),
case catch may_grow1(Head, Extra, How) of
{error, _Reason} = Error -> % alloc may throw error
dets_utils:corrupt(Head, Error);
{NewHead, Reply} when is_record(Head, head) ->
{NewHead, Reply}
end.
may_grow1(Head, Extra, many_times) when Extra > ?SEGSZP ->
Reply = grow(Head, 1, undefined),
self() ! ?DETS_CALL(self(), may_grow),
Reply;
may_grow1(Head, Extra, _How) ->
grow(Head, Extra, undefined).
%% -> {Head, ok} | throw({Head, Error})
grow(Head, Extra, _SegZero) when Extra =< 0 ->
{Head, ok};
grow(Head, Extra, undefined) ->
grow(Head, Extra, seg_zero());
grow(Head, _Extra, _SegZero) when Head#head.next >= Head#head.max_no_slots ->
{Head, ok};
grow(Head, Extra, SegZero) ->
#head{n = N, next = Next, m = M} = Head,
SegNum = Next div ?SEGSZP,
{Head0, W, Ws1} = allocate_segment(Head, SegZero, SegNum),
%% re_hash/2 will overwrite the segment, but initialize it anyway...
{Head1, ok} = dets_utils:pwrite(Head0, [W | Ws1]),
%% If re_hash fails, segp_cache has been called, but it does not matter.
{Head2, ok} = re_hash(Head1, N),
NewHead =
if
N + ?SEGSZP =:= M ->
Head2#head{n = 0, next = Next + ?SEGSZP, m = 2 * M, m2 = 4 * M};
true ->
Head2#head{n = N + ?SEGSZP, next = Next + ?SEGSZP}
end,
true = hash_invars(NewHead),
grow(NewHead, Extra - ?SEGSZP, SegZero).
hash_invars(H) ->
hash_invars(H#head.n, H#head.m, H#head.next, H#head.min_no_slots,
H#head.max_no_slots).
-define(M8(X), (((X) band (?SEGSZP - 1)) =:= 0)).
hash_invars(N, M, Next, Min, Max) ->
?M8(N) and ?M8(M) and ?M8(Next) and ?M8(Min) and ?M8(Max)
and (0 =< N) and (N =< M) and (N =< 2*Next) and (M =< Next)
and (Next =< 2*M) and (0 =< Min) and (Min =< Next) and (Next =< Max)
and (Min =< M).
seg_zero() ->
<<0:(4*?SEGSZ)/unit:8>>.
find_object(Head, Object) ->
Key = element(Head#head.keypos, Object),
Slot = db_hash(Key, Head),
find_object(Head, Object, Slot).
find_object(H, _Obj, Slot) when Slot >= H#head.next ->
false;
find_object(H, Obj, Slot) ->
case catch slot_objects(H, Slot) of
{ok, Pointer, Objects} ->
case lists:member(Obj, Objects) of
true -> {ok, Pointer};
false -> false
end;
_ -> false
end.
%% -> {ok, BucketP, Objects} | throw({Head, Error})
slot_objects(Head, Slot) ->
SlotPos = slot_position(Slot),
MaxSize = maxobjsize(Head),
case dets_utils:ipread(Head, SlotPos, MaxSize) of
{ok, {BucketSz, Pointer, <<BucketSz:32, _St:32, KeysObjs/binary>>}} ->
case catch bin2objs(KeysObjs, Head#head.type, []) of
{'EXIT', _Error} ->
Bad = dets_utils:bad_object(slot_objects,
{SlotPos, KeysObjs}),
throw(dets_utils:corrupt_reason(Head, Bad));
Objs when is_list(Objs) ->
{ok, Pointer, lists:reverse(Objs)}
end;
[] ->
{ok, 0, []};
BadRead -> % eof or bad badly formed binary
Bad = dets_utils:bad_object(slot_objects, {SlotPos, BadRead}),
throw(dets_utils:corrupt_reason(Head, Bad))
end.
%%%
%%% Cache routines depending on the dets file format.
%%%
%% -> {Head, [LookedUpObject], pwrite_list()} | throw({Head, Error})
eval_work_list(Head, [{Key,[{_Seq,{lookup,Pid}}]}]) ->
SlotPos = slot_position(db_hash(Key, Head)),
MaxSize = maxobjsize(Head),
Objs = case dets_utils:ipread(Head, SlotPos, MaxSize) of
{ok, {_BucketSz, _Pointer, Bin}} ->
case catch per_key(Head, Bin) of
{'EXIT', _Error} ->
Bad = dets_utils:bad_object(eval_work_list,
{SlotPos, Bin}),
throw(dets_utils:corrupt_reason(Head, Bad));
KeyObjs when is_list(KeyObjs) ->
case dets_utils:mkeysearch(Key, 1, KeyObjs) of
false ->
[];
{value, {Key,_KS,_KB,O,Os}} ->
case catch binobjs2terms(Os) of
{'EXIT', _Error} ->
Bad = dets_utils:bad_object
(eval_work_list,
{SlotPos, Bin, KeyObjs}),
throw(dets_utils:corrupt_reason
(Head, Bad));
Terms when is_list(Terms) ->
get_objects([O | Terms])
end
end
end;
[] ->
[];
BadRead -> % eof or bad badly formed binary
Bad = dets_utils:bad_object(eval_work_list,
{SlotPos, BadRead}),
throw(dets_utils:corrupt_reason(Head, Bad))
end,
{Head, [{Pid,Objs}], []};
eval_work_list(Head, PerKey) ->
SWLs = tag_with_slot(PerKey, Head, []),
P1 = dets_utils:family(SWLs),
{PerSlot, SlotPositions} = remove_slot_tag(P1, [], []),
{ok, Bins} = dets_utils:pread(SlotPositions, Head),
read_buckets(PerSlot, SlotPositions, Bins, Head, [], [], [], [], 0, 0, 0).
tag_with_slot([{K,_} = WL | WLs], Head, L) ->
tag_with_slot(WLs, Head, [{db_hash(K, Head), WL} | L]);
tag_with_slot([], _Head, L) ->
L.
remove_slot_tag([{S,SWLs} | SSWLs], Ls, SPs) ->
remove_slot_tag(SSWLs, [SWLs | Ls], [{slot_position(S), ?SEGOBJSZ} | SPs]);
remove_slot_tag([], Ls, SPs) ->
{Ls, SPs}.
read_buckets([WLs | SPs], [{P1,_8} | Ss], [<<_Zero:32,P2:32>> | Bs], Head,
PWLs, ToRead, LU, Ws, NoObjs, NoKeys, SoFar) when P2 =:= 0 ->
{NewHead, NLU, NWs, No, KNo} =
eval_bucket_keys(WLs, P1, 0, 0, [], Head, Ws, LU),
NewNoObjs = No + NoObjs,
NewNoKeys = KNo + NoKeys,
read_buckets(SPs, Ss, Bs, NewHead, PWLs, ToRead, NLU, NWs,
NewNoObjs, NewNoKeys, SoFar);
read_buckets([WorkLists| SPs], [{P1,_8} | Ss], [<<Size:32,P2:32>> | Bs], Head,
PWLs, ToRead, LU, Ws, NoObjs, NoKeys, SoFar)
when SoFar + Size < ?MAXCOLL; ToRead =:= [] ->
NewToRead = [{P2, Size} | ToRead],
NewPWLs = [{P2,P1,WorkLists} | PWLs],
NewSoFar = SoFar + Size,
read_buckets(SPs, Ss, Bs, Head, NewPWLs, NewToRead, LU, Ws,
NoObjs, NoKeys, NewSoFar);
read_buckets(SPs, Ss, Bs, Head, PWLs0, ToRead0, LU, Ws, NoObjs, NoKeys, SoFar)
when SoFar > 0 ->
%% It pays off to sort the positions. The seek times are reduced,
%% at least if the file blocks are reasonably contiguous, as is
%% often the case.
PWLs = lists:keysort(1, PWLs0),
ToRead = lists:keysort(1, ToRead0),
check_pread2_arg(ToRead, Head),
{ok, Bins} = dets_utils:pread(ToRead, Head),
case catch eval_buckets(Bins, PWLs, Head, LU, Ws, 0, 0) of
{ok, NewHead, NLU, [], 0, 0} ->
read_buckets(SPs, Ss, Bs, NewHead, [], [], NLU, [],
NoObjs, NoKeys, 0);
{ok, Head1, NLU, NWs, No, KNo} ->
NewNoObjs = NoObjs + No,
NewNoKeys = NoKeys + KNo,
%% It does not seem to reduce seek times to sort positions
%% when writing (maybe because it takes several calls to
%% write_cache/1 to fill the file system's buffer cache).
{NewHead, ok} = dets_utils:pwrite(Head1, lists:reverse(NWs)),
read_buckets(SPs, Ss, Bs, NewHead, [], [], NLU, [],
NewNoObjs, NewNoKeys, 0);
Error ->
Bad = dets_utils:bad_object(read_buckets, {Bins, Error}),
throw(dets_utils:corrupt_reason(Head, Bad))
end;
read_buckets([], [], [], Head, [], [], LU, Ws, NoObjs, NoKeys, 0) ->
{NewHead, NWs} = update_no_keys(Head, Ws, NoObjs, NoKeys),
{NewHead, LU, lists:reverse(NWs)}.
eval_buckets([Bin | Bins], [SP | SPs], Head, LU, Ws, NoObjs, NoKeys) ->
{Pos, P1, WLs} = SP,
KeyObjs = per_key(Head, Bin),
{NewHead, NLU, NWs, No, KNo} =
eval_bucket_keys(WLs, P1, Pos, byte_size(Bin), KeyObjs, Head,Ws,LU),
eval_buckets(Bins, SPs, NewHead, NLU, NWs, NoObjs + No, NoKeys + KNo);
eval_buckets([], [], Head, LU, Ws, NoObjs, NoKeys) ->
{ok, Head, LU, Ws, NoObjs, NoKeys}.
eval_bucket_keys(WLs, SlotPos, Pos, OldSize, KeyObjs, Head, Ws, LU) ->
{NLU, Bins, BSize, No, KNo, Ch} =
eval_slot(WLs, KeyObjs, Head#head.type, LU, [], 0, 0, 0, false),
{NewHead, W1, W2} =
updated(Head, Pos, OldSize, BSize, SlotPos, Bins, Ch, No, KNo),
{NewHead, NLU, W2++W1++Ws, No, KNo}.
updated(Head, Pos, OldSize, BSize, SlotPos, Bins, Ch, DeltaNoOs, DeltaNoKs) ->
BinsSize = BSize + ?OHDSZ,
if
Pos =:= 0, BSize =:= 0 ->
{Head, [], []};
Pos =:= 0, BSize > 0 ->
{Head1, NewPos, FPos} = dets_utils:alloc(Head, adjsz(BinsSize)),
NewHead = one_bucket_added(Head1, FPos-1),
W1 = {NewPos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
W2 = {SlotPos, <<BinsSize:32, NewPos:32>>},
{NewHead, [W2], [W1]};
Pos =/= 0, BSize =:= 0 ->
{Head1, FPos} = dets_utils:free(Head, Pos, adjsz(OldSize)),
NewHead = one_bucket_removed(Head1, FPos-1),
W1 = {Pos+?STATUS_POS, <<?FREE:32>>},
W2 = {SlotPos, <<0:32, 0:32>>},
{NewHead, [W2], [W1]};
Pos =/= 0, BSize > 0, Ch =:= false ->
{Head, [], []};
Pos =/= 0, BSize > 0 ->
%% Doubtful. The scan function has to be careful since
%% partly scanned objects may be overwritten.
Overwrite0 = if
OldSize =:= BinsSize -> same;
true -> sz2pos(OldSize) =:= sz2pos(BinsSize)
end,
Overwrite = if
Head#head.fixed =/= false ->
%% Make sure that if the table is
%% fixed, nothing is overwritten,
%% unless the number of objects and
%% the number of keys remain the same.
%% This is used by bchunk, which
%% assumes that it traverses exactly
%% the same number of objects and keys
%% (and collections) as were present
%% when chunking started (the table
%% must have been fixed).
(Overwrite0 =/= false) and
(DeltaNoOs =:= 0) and (DeltaNoKs =:= 0);
true ->
Overwrite0
end,
if
Overwrite =:= same ->
W1 = {Pos+?OHDSZ, Bins},
{Head, [], [W1]};
Overwrite ->
W1 = {Pos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
%% Pos is already there, but return {SlotPos, <8 bytes>}.
W2 = {SlotPos, <<BinsSize:32, Pos:32>>},
{Head, [W2], [W1]};
true ->
{Head1, FPosF} = dets_utils:free(Head, Pos, adjsz(OldSize)),
{Head2, NewPos, FPosA} =
dets_utils:alloc(Head1, adjsz(BinsSize)),
Head3 = one_bucket_added(Head2, FPosA-1),
NewHead = one_bucket_removed(Head3, FPosF-1),
W0 = {NewPos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
W2 = {SlotPos, <<BinsSize:32, NewPos:32>>},
W1 = if
Pos =/= NewPos ->
%% W0 first.
[W0, {Pos+?STATUS_POS, <<?FREE:32>>}];
true ->
[W0]
end,
{NewHead, [W2], W1}
end
end.
one_bucket_added(H, _Log2) when H#head.no_collections =:= undefined ->
H;
one_bucket_added(H, Log2) when H#head.maxobjsize >= Log2 ->
NewNoColls = orddict:update_counter(Log2, 1, H#head.no_collections),
H#head{no_collections = NewNoColls};
one_bucket_added(H, Log2) ->
NewNoColls = orddict:update_counter(Log2, 1, H#head.no_collections),
H#head{no_collections = NewNoColls, maxobjsize = Log2}.
one_bucket_removed(H, _FPos) when H#head.no_collections =:= undefined ->
H;
one_bucket_removed(H, Log2) when H#head.maxobjsize > Log2 ->
NewNoColls = orddict:update_counter(Log2, -1, H#head.no_collections),
H#head{no_collections = NewNoColls};
one_bucket_removed(H, Log2) when H#head.maxobjsize =:= Log2 ->
NewNoColls = orddict:update_counter(Log2, -1, H#head.no_collections),
MaxObjSize = max_objsize(NewNoColls),
H#head{no_collections = NewNoColls, maxobjsize = MaxObjSize}.
eval_slot([{Key,Commands} | WLs] = WLs0, [{K,KS,KB,O,Os} | KOs1]=KOs,
Type, LU, Ws, No, KNo,BSz, Ch) ->
case dets_utils:cmp(K, Key) of
0 ->
Old = [O | binobjs2terms(Os)],
{NLU, NWs, Sz, No1, KNo1, NCh} =
eval_key(Key, Commands, Old, Type, KB, KS, LU, Ws, Ch),
eval_slot(WLs, KOs1, Type, NLU, NWs, No1 + No,
KNo1 + KNo, Sz + BSz, NCh);
-1 ->
eval_slot(WLs0, KOs1, Type, LU, [Ws | KB], No,
KNo, KS + BSz, Ch);
1 ->
{NLU, NWs, Sz, No1, KNo1, NCh} =
eval_key(Key, Commands, [], Type, [], 0, LU, Ws, Ch),
eval_slot(WLs, KOs, Type, NLU, NWs, No1 + No,
KNo1 + KNo, Sz + BSz, NCh)
end;
eval_slot([{Key,Commands} | WLs], [], Type, LU, Ws, No, KNo,BSz, Ch) ->
{NLU, NWs, Sz, No1, KNo1, NCh} =
eval_key(Key, Commands, [], Type, [], 0, LU, Ws, Ch),
eval_slot(WLs, [], Type, NLU, NWs, No1 + No, KNo1 + KNo, Sz + BSz, NCh);
eval_slot([], [{_Key,Size,KeyBin,_,_} | KOs], Type, LU, Ws, No, KNo,BSz, Ch) ->
eval_slot([], KOs, Type, LU, [Ws | KeyBin], No, KNo, Size + BSz, Ch);
eval_slot([], [], _Type, LU, Ws, No, KNo, BSz, Ch) ->
{LU, Ws, BSz, No, KNo, Ch}.
eval_key(_K, [{_Seq,{lookup,Pid}}], [], _Type, _KeyBin, _KeySz, LU, Ws, Ch) ->
NLU = [{Pid, []} | LU],
{NLU, Ws, 0, 0, 0, Ch};
eval_key(_K, [{_Seq,{lookup,Pid}}], Old0, _Type, KeyBin, KeySz, LU, Ws, Ch) ->
Old = lists:keysort(2, Old0), % sort on sequence number
Objs = get_objects(Old),
NLU = [{Pid, Objs} | LU],
{NLU, [Ws | KeyBin], KeySz, 0, 0, Ch};
eval_key(K, Comms, Orig, Type, KeyBin, KeySz, LU, Ws, Ch) ->
Old = dets_utils:msort(Orig),
case eval_key1(Comms, [], Old, Type, K, LU, Ws, 0, Orig) of
{ok, NLU} when Old =:= [] ->
{NLU, Ws, 0, 0, 0, Ch};
{ok, NLU} ->
{NLU, [Ws | KeyBin], KeySz, 0, 0, Ch};
{NLU, NWs, NSz, No} when Old =:= [], NSz > 0 ->
{NLU, NWs, NSz, No, 1, true};
{NLU, NWs, NSz, No} when Old =/= [], NSz =:= 0 ->
{NLU, NWs, NSz, No, -1, true};
{NLU, NWs, NSz, No} ->
{NLU, NWs, NSz, No, 0, true}
end.
%% First find 'delete_key' and 'lookup' commands, and handle the 'set' type.
eval_key1([{_Seq,{insert,Term}} | L], Cs, [{Term,_,_}] = Old, Type=set, K,
LU, Ws, No, Orig) ->
eval_key1(L, Cs, Old, Type, K, LU, Ws, No, Orig);
eval_key1([{Seq,{insert,Term}} | L], Cs, Old, Type=set, K, LU, Ws, No, Orig)
->
NNo = No + 1 - length(Old),
eval_key1(L, Cs, [{Term,Seq,insert}], Type, K, LU, Ws, NNo, Orig);
eval_key1([{_Seq,{lookup,Pid}} | L], Cs, Old, Type, Key, LU, Ws, No, Orig) ->
{ok, New0, NewNo} = eval_comms(Cs, Old, Type, No),
New = lists:keysort(2, New0), % sort on sequence number
Objs = get_objects(New),
NLU = [{Pid, Objs} | LU],
if
L =:= [] ->
eval_end(New, NLU, Type, Ws, NewNo, Orig);
true ->
NewOld = dets_utils:msort(New),
eval_key1(L, [], NewOld, Type, Key, NLU, Ws, NewNo, Orig)
end;
eval_key1([{_Seq,delete_key} | L], _Cs, Old, Type, K, LU, Ws, No, Orig) ->
NewNo = No - length(Old),
eval_key1(L, [], [], Type, K, LU, Ws, NewNo, Orig);
eval_key1([{_Seq,{delete_object,Term}} | L], Cs, [{Term,_,_}], Type=set, K,
LU, Ws, No, Orig) ->
eval_key1(L, Cs, [], Type, K, LU, Ws, No-1, Orig);
eval_key1([{_Seq,{delete_object,_T}}| L], Cs, Old1, Type=set, K, LU,
Ws, No, Orig) ->
eval_key1(L, Cs, Old1, Type, K, LU, Ws, No, Orig);
eval_key1([{Seq,{Comm,Term}} | L], Cs, Old, Type, K, LU, Ws, No, Orig)
when Type =/= set ->
eval_key1(L, [{Term,Seq,Comm} | Cs], Old, Type, K, LU, Ws, No, Orig);
eval_key1([], Cs, Old, Type=set, _Key, LU, Ws, No, Orig) ->
[] = Cs,
eval_end(Old, LU, Type, Ws, No, Orig);
eval_key1([], Cs, Old, Type, _Key, LU, Ws, No, Orig) ->
{ok, New, NewNo} = eval_comms(Cs, Old, Type, No),
eval_end(New, LU, Type, Ws, NewNo, Orig).
eval_comms([], L, _Type=set, No) ->
{ok, L, No};
eval_comms(Cs, Old, Type, No) ->
Commands = dets_utils:msort(Cs),
case Type of
bag -> eval_bag(Commands, Old, [], No);
duplicate_bag -> eval_dupbag(Commands, Old, [], No)
end.
eval_end(New0, LU, Type, Ws, NewNo, Orig) ->
New = lists:keysort(2, New0), % sort on sequence number
NoChange = if
length(New) =/= length(Orig) -> false;
true ->
same_terms(Orig, New)
end,
if
NoChange ->
%% The key's objects have not changed.
{ok, LU};
New =:= [] ->
{LU, Ws, 0, NewNo};
true ->
{Ws1, Sz} = make_bins(New, [], 0),
if
Type =:= set ->
{LU, [Ws | Ws1], Sz, NewNo};
true ->
NSz = Sz + 4,
{LU, [Ws, <<NSz:32>> | Ws1], NSz, NewNo}
end
end.
same_terms([E1 | L1], [E2 | L2]) when element(1, E1) =:= element(1, E2) ->
same_terms(L1, L2);
same_terms([], []) ->
true;
same_terms(_L1, _L2) ->
false.
make_bins([{_Term,_Seq,B} | L], W, Sz) when is_binary(B) ->
make_bins(L, [W | B], Sz + byte_size(B));
make_bins([{Term,_Seq,insert} | L], W, Sz) ->
B = term_to_binary(Term),
BSize = byte_size(B) + 4,
make_bins(L, [W, [<<BSize:32>> | B]], Sz + BSize);
make_bins([], W, Sz) ->
{W, Sz}.
get_objects([{T,_S,_BT} | L]) ->
[T | get_objects(L)];
get_objects([]) ->
[].
eval_bag([{Term1,_S1,Op}=N | L]=L0, [{Term2,_,_}=O | Old]=Old0, New, No) ->
case {Op, dets_utils:cmp(Term1, Term2)} of
{delete_object, -1} ->
eval_bag(L, Old0, New, No);
{insert, -1} ->
bag_object(L, Old0, New, No, [N], Term1);
{delete_object, 0} ->
bag_object(L, Old, New, No-1, [], Term1);
{insert, 0} ->
bag_object(L, Old, New, No-1, [N], Term1);
{_, 1} ->
eval_bag(L0, Old, [O | New], No)
end;
eval_bag([{_Term1,_Seq1,delete_object} | L], []=Old, New, No) ->
eval_bag(L, Old, New, No);
eval_bag([{Term,_Seq1,insert} = N | L], []=Old, New, No) ->
bag_object(L, Old, New, No, [N], Term);
eval_bag([]=L, [O | Old], New, No) ->
eval_bag(L, Old, [O | New], No);
eval_bag([], [], New, No) ->
{ok, New, No}.
bag_object([{Term,_,insert} = N | L], Old, New, No, _N, Term) ->
bag_object(L, Old, New, No, [N], Term);
bag_object([{Term,_,delete_object} | L], Old, New, No, _N, Term) ->
bag_object(L, Old, New, No, [], Term);
bag_object(L, Old, New, No, [], _Term) ->
eval_bag(L, Old, New, No);
bag_object(L, Old, New, No, [N], _Term) ->
eval_bag(L, Old, [N | New], No+1).
eval_dupbag([{Term1,_S1,Op}=N | L]=L0, [{Term2,_,_}=O | Old]=Old0, New, No) ->
case {Op, dets_utils:cmp(Term1, Term2)} of
{delete_object, -1} ->
eval_dupbag(L, Old0, New, No);
{insert, -1} ->
dup_object(L, Old0, New, No+1, Term1, [N]);
{_, 0} ->
old_dup_object(L0, Old, New, No, Term1, [O]);
{_, 1} ->
eval_dupbag(L0, Old, [O | New], No)
end;
eval_dupbag([{_Term1,_Seq1,delete_object} | L], []=Old, New, No) ->
eval_dupbag(L, Old, New, No);
eval_dupbag([{Term,_Seq1,insert} = N | L], []=Old, New, No) ->
dup_object(L, Old, New, No+1, Term, [N]);
eval_dupbag([]=L, [O | Old], New, No) ->
eval_dupbag(L, Old, [O | New], No);
eval_dupbag([], [], New, No) ->
{ok, New, No}.
old_dup_object(L, [{Term,_,_} = Obj | Old], New, No, Term, N) ->
old_dup_object(L, Old, New, No, Term, [Obj | N]);
old_dup_object(L, Old, New, No, Term, N) ->
dup_object(L, Old, New, No, Term, N).
dup_object([{Term,_,insert} = Obj | L], Old, New, No, Term, Q) ->
dup_object(L, Old, New, No+1, Term, [Obj | Q]);
dup_object([{Term,_Seq,delete_object} | L], Old, New, No, Term, Q) ->
%% All objects are deleted.
NewNo = No - length(Q),
dup_object(L, Old, New, NewNo, Term, []);
dup_object(L, Old, New, No, _Term, Q) ->
eval_dupbag(L, Old, Q ++ New, No).
%% Update no_keys on the file too, if the number of segments that
%% dets:fsck/6 uses for estimate has changed.
update_no_keys(Head, Ws, 0, 0) -> {Head, Ws};
update_no_keys(Head, Ws, DeltaObjects, DeltaKeys) ->
NoKeys = Head#head.no_keys,
NewNoKeys = NoKeys + DeltaKeys,
NewNoObject = Head#head.no_objects + DeltaObjects,
NewHead = Head#head{no_objects = NewNoObject, no_keys = NewNoKeys},
NWs =
if
NewNoKeys > NewHead#head.max_no_slots ->
Ws;
NoKeys div ?SEGSZP =:= NewNoKeys div ?SEGSZP ->
Ws;
true ->
[{0, file_header(NewHead, 0, ?NOT_PROPERLY_CLOSED)} | Ws]
end,
{NewHead, NWs}.
slot_position(S) ->
SegNo = ?SLOT2SEG(S), % S div ?SEGSZP
PartPos = ?SEGARRADDR(?SEG2SEGARRPART(SegNo)), % SegNo div ?SEGPARTSZ
Part = get_arrpart(PartPos),
Pos = ?SEGPARTADDR(Part, SegNo),
get_segp(Pos) + (?SEGOBJSZ * ?REM2(S, ?SEGSZP)).
check_pread2_arg([{_Pos,Sz}], Head) when Sz > ?MAXCOLL ->
case check_pread_arg(Sz, Head) of
true ->
ok;
false ->
Bad = dets_utils:bad_object(check_pread2_arg, Sz),
throw(dets_utils:corrupt_reason(Head, Bad))
end;
check_pread2_arg(_ToRead, _Head) ->
ok.
check_pread_arg(Sz, Head) when Sz > ?MAXCOLL ->
maxobjsize(Head) >= Sz;
check_pread_arg(_Sz, _Head) ->
true.
%% Inlined.
segp_cache(Pos, Segment) ->
put(Pos, Segment).
%% Inlined.
get_segp(Pos) ->
get(Pos).
arrpart_cache(Pos, ArrPart) ->
put(Pos, ArrPart).
%% Inlined.
get_arrpart(Pos) ->
get(Pos).
sz2pos(N) ->
1 + dets_utils:log2(N).
%% Inlined. Compensates for the bug in dets_utils:sz2pos/1.
adjsz(N) ->
N-1.
%% Inlined.
maxobjsize(Head) when Head#head.maxobjsize =:= undefined ->
?POW(32);
maxobjsize(Head) ->
?POW(Head#head.maxobjsize).
scan_objs(Head, Bin, From, To, L, Ts, R, Type) ->
case catch scan_skip(Bin, From, To, L, Ts, R, Type, 0) of
{'EXIT', _Reason} ->
bad_object;
Reply = {more, _From1, _To, _L, _Ts, _R, Size} when Size > ?MAXCOLL ->
case check_pread_arg(Size, Head) of
true -> Reply;
false -> bad_object
end;
Reply ->
Reply
end.
scan_skip(Bin, From, To, L, Ts, R, Type, Skip) ->
From1 = From + Skip,
case Bin of
_ when From1 >= To ->
if
From1 > To; L =:= <<>> ->
{more, From1, To, L, Ts, R, 0};
true ->
<<From2:32, To1:32, L1/binary>> = L,
Skip1 = From2 - From,
scan_skip(Bin, From, To1, L1, Ts, R, Type, Skip1)
end;
<<_:Skip/binary, _Size:32, St:32, _Sz:32, KO/binary>>
when St =/= ?ACTIVE, St =/= ?FREE ->
%% Neither ?ACTIVE nor ?FREE is a multiple of ?BUMP and
%% thus cannot be found in segments or segment array
%% parts.
scan_skip(KO, From1+12, To, L, Ts, R, Type, ?ACTUAL_SEG_SIZE-12);
<<_:Skip/binary, Size:32, _St:32, Sz:32, KO/binary>>
when Size-12 =< byte_size(KO) ->
%% St = ?FREE means that the object was deleted after
%% scanning started
bin2bins(KO, From1+12, To, L, Ts, R, Type, Size, Sz);
<<_:Skip/binary, Size:32, _St:32, _Sz:32, _KO/binary>> ->
{more, From1, To, L, Ts, R, Size};
_ when Skip >= 0 ->
{more, From1, To, L, Ts, R, 0}
end.
%% Appends objects in reversed order. All objects of the slot are
%% extracted. Note that binary_to_term/1 ignores garbage at the end.
bin2bins(Bin, From, To, L, Ts, R, Type=set, Size, ObjSz0) ->
ObjsSz1 = Size - ObjSz0,
if
ObjsSz1 =:= ?OHDSZ ->
slot_end(Bin, From, To, L, [Bin | Ts], R, Type, Size, 1);
true ->
ObjSz = ObjSz0-4,
<<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
bins_set(T, From, To, L, [Bin | Ts], R, Type, Size, 2,
NObjSz, ObjsSz1-NObjSz, Bin)
end;
bin2bins(<<ObjSz:32, Bin/binary>> = KO, From, To, L, Ts, R, Type, Size, Sz) ->
bins_bag(Bin, From, To, L, Ts, R, Type, Size, 1,
Sz-ObjSz-4, ObjSz-4, Size-Sz, KO).
bins_set(Bin, From, To, L, Ts, R, Type, Size, NoObjs, _ObjSz0, ?OHDSZ, KO) ->
slot_end(KO, From, To, L, [Bin | Ts], R, Type, Size, NoObjs);
bins_set(Bin, From, To, L, Ts, R, Type, Size, NoObjs, ObjSz0, ObjsSz, KO) ->
ObjSz = ObjSz0 - 4,
<<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
bins_set(T, From, To, L, [Bin | Ts], R, Type, Size, NoObjs + 1,
NObjSz, ObjsSz-NObjSz, KO).
bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, Sz, ObjSz, ObjsSz, KO)
when Sz > 0 ->
<<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
bins_bag(T, From, To, L, [Bin | Ts], R, Type, Size, NoObjs + 1,
Sz-NObjSz, NObjSz-4, ObjsSz, KO);
bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, _Z, _ObjSz, ?OHDSZ, KO) ->
slot_end(KO, From, To, L, [Bin | Ts], R, Type, Size, NoObjs);
bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, _Z, ObjSz, ObjsSz, KO) ->
<<_:ObjSz/binary, Sz:32, NObjSz:32, T/binary>> = Bin,
bins_bag(T, From, To, L, [Bin | Ts], R, Type, Size, NoObjs + 1,
Sz-NObjSz-4, NObjSz-4, ObjsSz-Sz, KO).
slot_end(KO, From, To, L, Ts, R, Type, Size, NoObjs) ->
Skip = ?POW(dets_utils:log2(Size)) - 12, % expensive...
if
R >= 0 ->
scan_skip(KO, From, To, L, Ts, R+Size, Type, Skip);
true ->
%% Should check this at the end of every key.
case R + NoObjs of
R1 when R1 >= -1 ->
From1 = From + Skip,
Bin1 = case KO of
<<_:Skip/binary, B/binary>> -> B;
_ -> <<>>
end,
{stop, Bin1, From1, To, L, Ts};
R1 ->
scan_skip(KO, From, To, L, Ts, R1, Type, Skip)
end
end.
%%%%%%%%%%%%%%%%% DEBUG functions %%%%%%%%%%%%%%%%
file_info(FH) ->
#fileheader{closed_properly = CP, keypos = Kp,
m = M, next = Next, n = N, version = Version,
type = Type, no_objects = NoObjects, no_keys = NoKeys}
= FH,
if
CP =:= 0 ->
{error, not_closed};
FH#fileheader.cookie =/= ?MAGIC ->
{error, not_a_dets_file};
FH#fileheader.version =/= ?FILE_FORMAT_VERSION ->
{error, bad_version};
true ->
{ok, [{closed_properly,CP},{keypos,Kp},{m, M},{n,N},
{next,Next},{no_objects,NoObjects},{no_keys,NoKeys},
{type,Type},{version,Version}]}
end.
v_segments(#head{}=H) ->
v_parts(H, 0, 0).
v_parts(_H, ?SEGARRSZ, _SegNo) ->
done;
v_parts(H, PartNo, SegNo) ->
Fd = H#head.fptr,
PartPos = dets_utils:read_4(Fd, ?SEGARRADDR(PartNo)),
if
PartPos =:= 0 ->
done;
true ->
PartBin = dets_utils:pread_n(Fd, PartPos, ?SEGPARTSZ*4),
v_segments(H, PartBin, PartNo+1, SegNo)
end.
v_segments(H, <<>>, PartNo, SegNo) ->
v_parts(H, PartNo, SegNo);
v_segments(_H, <<0:32,_/binary>>, _PartNo, _SegNo) ->
done;
v_segments(H, <<Seg:32,T/binary>>, PartNo, SegNo) ->
io:format("<~w>SEGMENT ~w~n", [Seg, SegNo]),
v_segment(H, SegNo, Seg, 0),
v_segments(H, T, PartNo, SegNo+1).
v_segment(_H, _, _SegPos, ?SEGSZP) ->
done;
v_segment(H, SegNo, SegPos, SegSlot) ->
Slot = SegSlot + (SegNo * ?SEGSZP),
BucketP = SegPos + (4 * ?SZOBJP * SegSlot),
case catch read_bucket(H, BucketP, H#head.type) of
{'EXIT', Reason} ->
dets_utils:vformat("** dets: Corrupt or truncated dets file~n",
[]),
io:format("~nERROR ~tp~n", [Reason]);
[] -> %% don't print empty buckets
true;
{Size, CollP, Objects} ->
io:format(" <~w>~w: <~w:~p>~w~n",
[BucketP, Slot, CollP, Size, Objects])
end,
v_segment(H, SegNo, SegPos, SegSlot+1).
%% -> [] | {Pointer, [object()]} | throw(EXIT)
read_bucket(Head, Position, Type) ->
MaxSize = maxobjsize(Head),
case dets_utils:ipread(Head, Position, MaxSize) of
{ok, {Size, Pointer, <<Size:32, _Status:32, KeysObjs/binary>>}} ->
Objs = bin2objs(KeysObjs, Type, []),
{Size, Pointer, lists:reverse(Objs)};
[] ->
[]
end.
-define(SEQSTART, -(1 bsl 26)).
%% -> [{Key,SizeOfWholeKey,WholeKeyBin,FirstObject,OtherObjects}] |throw(EXIT)
%% FirstObject = {Term, Seq, Binary}
%% Seq < 0 (and ascending).
per_key(Head, <<BinSize:32, ?ACTIVE:32, Bin/binary>> = B) ->
true = (byte_size(B) =:= BinSize),
if
Head#head.type =:= set ->
per_set_key(Bin, Head#head.keypos, []);
true ->
per_bag_key(Bin, Head#head.keypos, [])
end.
per_set_key(<<Size:32, T/binary>> = B, KeyPos, L) ->
<<KeyBin:Size/binary, R/binary>> = B,
Term = binary_to_term(T),
Key = element(KeyPos, Term),
Item = {Term, ?SEQSTART, KeyBin},
per_set_key(R, KeyPos, [{Key,Size,KeyBin,Item,[]} | L]);
per_set_key(<<>>, KeyPos, L) when is_integer(KeyPos) ->
lists:reverse(L).
per_bag_key(<<Size:32, ObjSz:32, T/binary>> = B, KeyPos, L) ->
<<KeyBin:Size/binary, R/binary>> = B,
ObjSz1 = ObjSz - 4,
Size1 = Size - ObjSz - 4,
<<_:ObjSz1/binary, KeyObjs:Size1/binary, _/binary>> = T,
<<_Size:32, Bin:ObjSz/binary, _/binary>> = B,
Term = binary_to_term(T),
Key = element(KeyPos, Term),
Item = {Term, ?SEQSTART, Bin},
per_bag_key(R, KeyPos, [{Key,Size,KeyBin,Item,KeyObjs} | L]);
per_bag_key(<<>>, KeyPos, L) when is_integer(KeyPos) ->
lists:reverse(L).
binobjs2terms(<<ObjSz:32, T/binary>> = B) ->
binobjs2terms(B, T, ObjSz, byte_size(B)-ObjSz, ?SEQSTART+1, []);
binobjs2terms([] = B) ->
B;
binobjs2terms(<<>>) ->
[].
binobjs2terms(Bin, Obj, _ObjSz, _Size=0, N, L) ->
lists:reverse(L, [{binary_to_term(Obj), N, Bin}]);
binobjs2terms(Bin, Bin1, ObjSz, Size, N, L) ->
<<B:ObjSz/binary, T/binary>> = Bin,
<<NObjSz:32, T1/binary>> = T,
Item = {binary_to_term(Bin1), N, B},
binobjs2terms(T, T1, NObjSz, Size-NObjSz, N+1, [Item | L]).
%% Appends objects in reversed order.
bin2objs(KeysObjs, set, Ts) ->
<<ObjSz:32, T/binary>> = KeysObjs,
bin2objs(T, ObjSz-4, byte_size(KeysObjs)-ObjSz, Ts);
bin2objs(KeysObjs, _Type, Ts) ->
bin2objs2(KeysObjs, Ts).
bin2objs2(<<Size:32, ObjSz:32, T/binary>>, Ts) ->
bin2objs(T, ObjSz-4, Size-ObjSz-4, Ts);
bin2objs2(<<>>, Ts) ->
Ts.
bin2objs(Bin, ObjSz, _Size=0, Ts) ->
<<_:ObjSz/binary, T/binary>> = Bin,
bin2objs2(T, [binary_to_term(Bin) | Ts]);
bin2objs(Bin, ObjSz, Size, Ts) ->
<<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
bin2objs(T, NObjSz-4, Size-NObjSz, [binary_to_term(Bin) | Ts]).
bin2keybins(KeysObjs, Head) when Head#head.type =:= set ->
<<ObjSz:32, T/binary>> = KeysObjs,
bin2keybins(T, Head#head.keypos, ObjSz-4, byte_size(KeysObjs)-ObjSz,[]);
bin2keybins(KeysObjs, Head) ->
bin2keybins2(KeysObjs, Head#head.keypos, []).
bin2keybins2(<<Size:32, ObjSz:32, T/binary>>, Kp, L) ->
bin2keybins(T, Kp, ObjSz-4, Size-ObjSz-4, L);
bin2keybins2(<<>>, Kp, L) when is_integer(Kp) ->
lists:reverse(L).
bin2keybins(Bin, Kp, ObjSz, _Size=0, L) ->
<<Obj:ObjSz/binary, T/binary>> = Bin,
Term = binary_to_term(Obj),
bin2keybins2(T, Kp, [{element(Kp, Term),Obj} | L]);
bin2keybins(Bin, Kp, ObjSz, Size, L) ->
<<Obj:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
Term = binary_to_term(Obj),
bin2keybins(T, Kp, NObjSz-4, Size-NObjSz, [{element(Kp,Term),Obj} | L]).