aboutsummaryrefslogtreecommitdiffstats
path: root/lib/stdlib/src/dets_v9.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/stdlib/src/dets_v9.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/stdlib/src/dets_v9.erl')
-rw-r--r--lib/stdlib/src/dets_v9.erl2761
1 files changed, 2761 insertions, 0 deletions
diff --git a/lib/stdlib/src/dets_v9.erl b/lib/stdlib/src/dets_v9.erl
new file mode 100644
index 0000000000..53238e962f
--- /dev/null
+++ b/lib/stdlib/src/dets_v9.erl
@@ -0,0 +1,2761 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2001-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(dets_v9).
+
+%% Dets files, implementation part. This module handles version 9.
+%% To be called from dets.erl only.
+
+-export([constants/0, mark_dirty/1, read_file_header/2,
+ check_file_header/2, do_perform_save/1, initiate_file/11,
+ prep_table_copy/9, init_freelist/2, fsck_input/4,
+ bulk_input/3, output_objs/4, bchunk_init/2,
+ try_bchunk_header/2, compact_init/3, read_bchunks/2,
+ write_cache/1, may_grow/3, find_object/2, slot_objs/2,
+ scan_objs/8, db_hash/2, no_slots/1, table_parameters/1]).
+
+-export([file_info/1, v_segments/1]).
+
+-export([cache_segps/3]).
+
+-compile({inline, [{max_objsize,1},{maxobjsize,1}]}).
+-compile({inline, [{write_segment_file,6}]}).
+-compile({inline, [{sz2pos,1},{adjsz,1}]}).
+-compile({inline, [{skip_bytes,6},{make_object,4}]}).
+-compile({inline, [{segp_cache,2},{get_segp,1},{get_arrpart,1}]}).
+-compile({inline, [{h,2}]}).
+
+-include("dets.hrl").
+
+%% The layout of the file is :
+%%
+%% bytes decsription
+%% ---------------------- File header
+%% 4 FreelistsPointer
+%% 4 Cookie
+%% 4 ClosedProperly (pos=8)
+%% 4 Type (pos=12)
+%% 4 Version (pos=16)
+%% 4 M
+%% 4 Next
+%% 4 KeyPos
+%% 4 NoObjects
+%% 4 NoKeys
+%% 4 MinNoSlots
+%% 4 MaxNoSlots
+%% 4 HashMethod
+%% 4 N
+%% ---
+%% 256 Version 9(a): Reserved for future versions. Initially zeros.
+%% Version 9(b) has instead:
+%% 112 28 counters for the buddy system sizes 2^4 to 2^31.
+%% 144 Reserved for future versions. Initially zeros.
+%% Version 9(c) has instead:
+%% 112 28 counters for the buddy system sizes (as for 9(b)).
+%% 16 MD5-sum for the 44 plus 112 bytes before the MD5-sum.
+%% (FreelistsPointer, Cookie and ClosedProperly are not digested.)
+%% 128 Reserved for future versions. Initially zeros.
+%% ---
+%% ------------------ end of file header
+%% 4*256 SegmentArray Pointers.
+%% ------------------ This is BASE.
+%% 4*512 SegmentArray Part 1
+%% ... More SegmentArray Parts
+%% 8*256 First segment
+%% ??? Objects (free and alive)
+%% 4*512 Further SegmentArray Part.
+%% ??? Objects (free and alive)
+%% 8*256 Further segment.
+%% ??? Objects (free and alive)
+%% ... more objects, segment array parts, and segments ...
+%% -----------------------------
+%% ??? Free lists
+%% -----------------------------
+%% 4 File size, in bytes.
+
+%% Before we can find an object we must find the slot where the
+%% object resides. Each slot is a (possibly empty) list (or chain) of
+%% objects that hash to the same slot. If the value stored in the
+%% slot is zero, the slot chain is empty. If the slot value is
+%% non-zero, the value points to a position in the file where the
+%% collection of objects resides. Each collection has the following
+%% layout:
+%%
+%% bytes decsription
+%% --------------------
+%% 4 Size of the area allocated for the collection (8+Sz)
+%% 4 Status (FREE or ACTIVE). These two are the Object Header.
+%% Sz A binary containing the objects per key, sorted on key.
+%%
+%% When repairing or converting a file, the status field is used.
+%%
+%% The binary containing the objects per key of a table of type 'set'
+%% has the following layout:
+%%
+%% bytes decsription
+%% --------------------
+%% 4 Size of the object of the first key (4+OSz1)
+%% OSz1 The object of the first key
+%% ...
+%% 4 Size of the object of the ith key (4+OSzi)
+%% OSzi The object of the ith key
+%%
+%% The binary containing the objects per key of a table of type 'bag'
+%% or 'duplicate_bag' has the following layout:
+%%
+%% bytes decsription
+%% ----------------------
+%% 4 Size of the objects of the first key (4 + OSz1_1+...+OSz1_j+...)
+%% 4 Size of the first object of the first key (4+OSz1_1)
+%% OSz1_1 The first object of the first key
+%% ...
+%% 4 Size of the jth object of the first key (4+OSz1_j)
+%% OSz1_j The jth object of the first key
+%% ...
+%% 4 Size of the objects of the ith key (4 + OSzi_1+...+OSzi_k+...)
+%% 4 Size of the first object of the ith key (4+OSzi_1)
+%% OSzi_1 The first object of the ith key
+%% ...
+%% 4 Size of the kth object of the ith key (4+OSzi_k)
+%% OSzi_k The kth object of the ith key
+%% ...
+%%
+%% The objects of a key are placed in time order, that is, the older
+%% objects come first. If a new object is inserted, it is inserted
+%% last.
+%%
+%%
+%%
+%%|---------------|
+%%| head |
+%%| |
+%%| |
+%%|_______________|
+%%| |--|
+%%|___part ptr 1__| |
+%%| | | segarr part 1
+%%|___part ptr 2__| V______________|
+%%| | | p1 |
+%%| | |______________|--|
+%%| .... | | p2 | |
+%% (256) |______________| |
+%% | | |
+%% | .... | | segment 1
+%% | (512) | V __slot 0 ____|
+%% | size |
+%% | pointer |--|
+%% |___slot 1 ____| |
+%% | | |
+%% | .... | | objects in slot 0
+%% (256) V segment 1
+%% |___________|
+%% | size |
+%% |___________|
+%% | status |
+%% |___________|
+%% | |
+%% | object |
+%% | collec. |
+%% |___________|
+
+%%%
+%%% File header
+%%%
+
+-define(RESERVED, 128). % Reserved for future use.
+
+-define(COLL_CNTRS, (28*4)). % Counters for the buddy system.
+-define(MD5SZ, 16).
+
+-define(HEADSZ,
+ 56+?COLL_CNTRS+?MD5SZ). % The size of the file header, in bytes,
+ % not including the reserved part.
+-define(HEADEND, (?HEADSZ+?RESERVED)).
+ % End of header and reserved area.
+-define(SEGSZ, 512). % Size of a segment, in words. SZOBJP*SEGSZP.
+-define(SEGSZP, 256). % Size of a segment, in number of pointers.
+-define(SEGSZP_LOG2, 8).
+-define(SEGOBJSZ, (4 * ?SZOBJP)).
+-define(SEGPARTSZ, 512). % Size of segment array part, in words.
+-define(SEGPARTSZ_LOG2, 9).
+-define(SEGARRSZ, 256). % Maximal number of segment array parts..
+-define(SEGARRADDR(PartN), (?HEADEND + (4 * (PartN)))).
+-define(SEGPARTADDR(P,SegN), ((P) + (4 * ?REM2(SegN, ?SEGPARTSZ)))).
+-define(BASE, ?SEGARRADDR(?SEGARRSZ)).
+-define(MAXSLOTS, (?SEGARRSZ * ?SEGPARTSZ * ?SEGSZP)).
+
+-define(SLOT2SEG(S), ((S) bsr ?SEGSZP_LOG2)).
+-define(SEG2SEGARRPART(S), ((S) bsr ?SEGPARTSZ_LOG2)).
+
+-define(PHASH, 0).
+-define(PHASH2, 1).
+
+%% BIG is used for hashing. BIG must be greater than the maximum
+%% number of slots, currently 32 M (MAXSLOTS).
+-define(BIG, 16#3ffffff). % 64 M
+
+%% Hard coded positions into the file header:
+-define(FREELIST_POS, 0).
+-define(CLOSED_PROPERLY_POS, 8).
+-define(D_POS, 20).
+
+%%% Dets file versions up to 8 are handled in dets_v8. This module
+%%% handles version 9, introduced in R8.
+%%%
+%%% Version 9(a) tables have 256 reserved bytes in the file header,
+%%% all initialized to zero.
+%%% Version 9(b) tables use the first 112 of these bytes for storing
+%%% number of objects for each size of the buddy system. An empty 9(b)
+%%% table cannot be distinguished from an empty 9(a) table.
+%%% 9(c) has an MD5-sum for the file header.
+
+-define(FILE_FORMAT_VERSION, 9).
+
+-define(NOT_PROPERLY_CLOSED,0).
+-define(CLOSED_PROPERLY,1).
+
+%% Size of object pointer, in words. SEGSZ = SZOBJP * SEGSZP.
+-define(SZOBJP, 2).
+
+-define(OHDSZ, 8). % The size of the object header, in bytes.
+-define(STATUS_POS, 4). % Position of the status field.
+
+-define(OHDSZ_v8, 12). % The size of the version 8 object header.
+
+%% The size of each object is a multiple of 16.
+%% BUMP is used when repairing files.
+-define(BUMP, 16).
+
+%%% '$hash' is the value of HASH_PARMS in R8, '$hash2' is the value in R9.
+%%%
+%%% The fields of the ?HASH_PARMS records are the same, but having
+%%% different tags makes bchunk_init on R8 nodes reject data from R9
+%%% nodes, and vice versa. This is overkill, and due to an oversight.
+%%% What should have been done in R8 was to check the hash method, not
+%%% only the type of the table and the key position. R8 nodes cannot
+%%% handle the phash2 method.
+-define(HASH_PARMS, '$hash2').
+
+-define(BCHUNK_FORMAT_VERSION, 1).
+
+-record(?HASH_PARMS, {
+ file_format_version,
+ bchunk_format_version,
+ file, type, keypos, hash_method,
+ n,m,next,
+ min,max,
+ no_objects,no_keys,
+ no_colls % [{LogSz,NoColls}], NoColls >= 0
+ }).
+
+-define(ACTUAL_SEG_SIZE, (?SEGSZ*4)).
+
+-define(MAXBUD, 32).
+
+%%-define(DEBUGF(X,Y), io:format(X, Y)).
+-define(DEBUGF(X,Y), void).
+
+%% {Bump}
+constants() ->
+ {?BUMP, ?BASE}.
+
+%% -> ok | throw({NewHead,Error})
+mark_dirty(Head) ->
+ Dirty = [{?CLOSED_PROPERLY_POS, <<?NOT_PROPERLY_CLOSED:32>>}],
+ dets_utils:pwrite(Head, Dirty),
+ dets_utils:sync(Head),
+ dets_utils:position(Head, Head#head.freelists_p),
+ dets_utils:truncate(Head, cur).
+
+%% -> {ok, head()} | throw(Error) | throw(badarg)
+prep_table_copy(Fd, Tab, Fname, Type, Kp, Ram, CacheSz, Auto, Parms) ->
+ case Parms of
+ #?HASH_PARMS{file_format_version = ?FILE_FORMAT_VERSION,
+ bchunk_format_version = ?BCHUNK_FORMAT_VERSION,
+ n = N, m = M, next = Next,
+ min = Min, max = Max,
+ hash_method = HashMethodCode,
+ no_objects = NoObjects, no_keys = NoKeys,
+ no_colls = _NoColls}
+ when is_integer(N), is_integer(M), is_integer(Next),
+ is_integer(Min), is_integer(Max),
+ is_integer(NoObjects), is_integer(NoKeys),
+ NoObjects >= NoKeys ->
+ HashMethod = code_to_hash_method(HashMethodCode),
+ case hash_invars(N, M, Next, Min, Max) of
+ false ->
+ throw(badarg);
+ true ->
+ init_file(Fd, Tab, Fname, Type, Kp, Min, Max, Ram,
+ CacheSz, Auto, false, M, N, Next, HashMethod,
+ NoObjects, NoKeys)
+ end;
+ _ ->
+ throw(badarg)
+ end.
+
+%% -> {ok, head()} | throw(Error)
+%% The File header and the SegmentArray Pointers are written here.
+%% SegmentArray Parts are also written, but the segments are are not
+%% initialized on file unless DoInitSegments is 'true'. (When
+%% initializing a file by calling init_table, some time is saved by
+%% not writing the segments twice.)
+initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots0, MaxSlots0,
+ Ram, CacheSz, Auto, DoInitSegments) ->
+ MaxSlots1 = erlang:min(MaxSlots0, ?MAXSLOTS),
+ MinSlots1 = erlang:min(MinSlots0, MaxSlots1),
+ MinSlots = slots2(MinSlots1),
+ MaxSlots = slots2(MaxSlots1),
+ M = Next = MinSlots,
+ N = 0,
+ init_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, Ram, CacheSz,
+ Auto, DoInitSegments, M, N, Next, phash2, 0, 0).
+
+init_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, Ram, CacheSz,
+ Auto, DoInitSegments, M, N, Next, HashMethod, NoObjects, NoKeys) ->
+ Ftab = dets_utils:init_alloc(?BASE),
+
+ Head0 = #head{
+ m = M,
+ m2 = M * 2,
+ next = Next,
+ fptr = Fd,
+ no_objects = NoObjects,
+ no_keys = NoKeys,
+ maxobjsize = 0,
+ n = N,
+ type = Type,
+ update_mode = dirty,
+ freelists = Ftab,
+ no_collections = orddict:new(),
+ auto_save = Auto,
+ hash_bif = HashMethod,
+ has_md5 = true,
+ keypos = Kp,
+ min_no_slots = MinSlots,
+ max_no_slots = MaxSlots,
+
+ ram_file = Ram,
+ filename = Fname,
+ name = Tab,
+ cache = dets_utils:new_cache(CacheSz),
+ version = ?FILE_FORMAT_VERSION,
+ bump = ?BUMP,
+ base = ?BASE,
+ mod = ?MODULE
+ },
+
+ FreeListsPointer = 0,
+ NoColls = <<0:?COLL_CNTRS/unit:8>>, %% Buddy system counters.
+ FileHeader = file_header(Head0, FreeListsPointer,
+ ?NOT_PROPERLY_CLOSED, NoColls),
+ W0 = {0, [FileHeader |
+ <<0:(4*?SEGARRSZ)/unit:8>>]}, %% SegmentArray Pointers
+
+ %% Remove cached pointers to segment array parts and segments:
+ lists:foreach(fun({I1,I2}) when is_integer(I1), is_integer(I2) -> ok;
+ ({K,V}) -> put(K, V)
+ end, erase()),
+
+ %% Initialize array parts.
+ %% All parts before segments, for the sake of repair and initialization.
+ Zero = seg_zero(),
+ {Head1, Ws1} = init_parts(Head0, 0, no_parts(Next), Zero, []),
+ NoSegs = no_segs(Next),
+
+ {Head, WsI, WsP} = init_segments(Head1, 0, NoSegs, Zero, [], []),
+ Ws2 = if
+ DoInitSegments -> WsP ++ WsI;
+ true -> WsP
+ end,
+ dets_utils:pwrite(Fd, Fname, [W0 | lists:append(Ws1) ++ Ws2]),
+ true = hash_invars(Head),
+ {ok, Head}.
+
+%% Returns a power of two not less than 256.
+slots2(NoSlots) when NoSlots >= 256 ->
+ ?POW(dets_utils:log2(NoSlots)).
+
+init_parts(Head, PartNo, NoParts, Zero, Ws) when PartNo < NoParts ->
+ PartPos = ?SEGARRADDR(PartNo),
+ {NewHead, W, _Part} = alloc_part(Head, Zero, PartPos),
+ init_parts(NewHead, PartNo+1, NoParts, Zero, [W | Ws]);
+init_parts(Head, _PartNo, _NoParts, _Zero, Ws) ->
+ {Head, Ws}.
+
+%% -> {Head, SegInitList, OtherList};
+%% SegPtrList = SegInitList = pwrite_list().
+init_segments(Head, SegNo, NoSegs, SegZero, WsP, WsI) when SegNo < NoSegs ->
+ {NewHead, WI, Ws} = allocate_segment(Head, SegZero, SegNo),
+ init_segments(NewHead, SegNo+1, NoSegs, SegZero, Ws ++ WsP, [WI | WsI]);
+init_segments(Head, _SegNo, _NoSegs, _SegZero, WsP, WsI) ->
+ {Head, WsI, WsP}.
+
+%% -> {NewHead, SegInit, [SegPtr | PartStuff]}
+allocate_segment(Head, SegZero, SegNo) ->
+ PartPos = ?SEGARRADDR(SegNo div ?SEGPARTSZ),
+ case get_arrpart(PartPos) of
+ undefined ->
+ %% may throw error:
+ {Head1, [InitArrPart, ArrPartPointer], Part} =
+ alloc_part(Head, SegZero, PartPos),
+ {NewHead, InitSegment, [SegPointer]} =
+ alloc_seg(Head1, SegZero, SegNo, Part),
+ {NewHead, InitSegment, [InitArrPart, SegPointer, ArrPartPointer]};
+ Part ->
+ alloc_seg(Head, SegZero, SegNo, Part)
+ end.
+
+alloc_part(Head, PartZero, PartPos) ->
+ %% may throw error:
+ {NewHead, Part, _} = dets_utils:alloc(Head, adjsz(4 * ?SEGPARTSZ)),
+ arrpart_cache(PartPos, Part),
+ InitArrPart = {Part, PartZero}, % same size as segment
+ ArrPartPointer = {PartPos, <<Part:32>>},
+ {NewHead, [InitArrPart, ArrPartPointer], Part}.
+
+alloc_seg(Head, SegZero, SegNo, Part) ->
+ %% may throw error:
+ {NewHead, Segment, _} = dets_utils:alloc(Head, adjsz(4 * ?SEGSZ)),
+ InitSegment = {Segment, SegZero},
+ Pos = ?SEGPARTADDR(Part, SegNo),
+ segp_cache(Pos, Segment),
+ dets_utils:disk_map_segment(Segment, SegZero),
+ SegPointer = {Pos, <<Segment:32>>},
+ {NewHead, InitSegment, [SegPointer]}.
+
+%% Read free lists (using a Buddy System) from file.
+init_freelist(Head, true) ->
+ Pos = Head#head.freelists_p,
+ free_lists_from_file(Head, Pos).
+
+%% -> {ok, Fd, fileheader()} | throw(Error)
+read_file_header(Fd, FileName) ->
+ {ok, Bin} = dets_utils:pread_close(Fd, FileName, 0, ?HEADSZ),
+ <<FreeList:32, Cookie:32, CP:32, Type2:32,
+ Version:32, M:32, Next:32, Kp:32,
+ NoObjects:32, NoKeys:32, MinNoSlots:32, MaxNoSlots:32,
+ HashMethod:32, N:32, NoCollsB:?COLL_CNTRS/binary,
+ MD5:?MD5SZ/binary>> = Bin,
+ <<_:12/binary,MD5DigestedPart:(?HEADSZ-?MD5SZ-12)/binary,_/binary>> = Bin,
+ {ok, EOF} = dets_utils:position_close(Fd, FileName, eof),
+ {ok, <<FileSize:32>>} = dets_utils:pread_close(Fd, FileName, EOF-4, 4),
+ {CL, <<>>} = lists:foldl(fun(LSz, {Acc,<<NN:32,R/binary>>}) ->
+ if
+ NN =:= 0 -> {Acc, R};
+ true -> {[{LSz,NN} | Acc], R}
+ end
+ end, {[], NoCollsB}, lists:seq(4, ?MAXBUD-1)),
+ NoColls =
+ if
+ CL =:= [], NoObjects > 0 -> % Version 9(a)
+ undefined;
+ true ->
+ lists:reverse(CL)
+ end,
+
+ FH = #fileheader{freelist = FreeList,
+ cookie = Cookie,
+ closed_properly = CP,
+ type = dets_utils:code_to_type(Type2),
+ version = Version,
+ m = M,
+ next = Next,
+ keypos = Kp,
+ no_objects = NoObjects,
+ no_keys = NoKeys,
+ min_no_slots = MinNoSlots,
+ max_no_slots = MaxNoSlots,
+ no_colls = NoColls,
+ hash_method = HashMethod,
+ read_md5 = MD5,
+ has_md5 = <<0:?MD5SZ/unit:8>> =/= MD5,
+ md5 = erlang:md5(MD5DigestedPart),
+ trailer = FileSize,
+ eof = EOF,
+ n = N,
+ mod = ?MODULE},
+ {ok, Fd, FH}.
+
+%% -> {ok, head(), ExtraInfo} | {error, Reason} (Reason lacking file name)
+%% ExtraInfo = true
+check_file_header(FH, Fd) ->
+ HashBif = code_to_hash_method(FH#fileheader.hash_method),
+ Test =
+ if
+ FH#fileheader.cookie =/= ?MAGIC ->
+ {error, not_a_dets_file};
+ FH#fileheader.type =:= badtype ->
+ {error, invalid_type_code};
+ FH#fileheader.version =/= ?FILE_FORMAT_VERSION ->
+ {error, bad_version};
+ FH#fileheader.has_md5,
+ FH#fileheader.read_md5 =/= FH#fileheader.md5 ->
+ {error, not_a_dets_file}; % harsh but fair
+ FH#fileheader.trailer =/= FH#fileheader.eof ->
+ {error, not_closed};
+ HashBif =:= undefined ->
+ {error, bad_hash_bif};
+ FH#fileheader.closed_properly =:= ?CLOSED_PROPERLY ->
+ {ok, true};
+ FH#fileheader.closed_properly =:= ?NOT_PROPERLY_CLOSED ->
+ {error, not_closed};
+ true ->
+ {error, not_a_dets_file}
+ end,
+ case Test of
+ {ok, ExtraInfo} ->
+ MaxObjSize = max_objsize(FH#fileheader.no_colls),
+ H = #head{
+ m = FH#fileheader.m,
+ m2 = FH#fileheader.m * 2,
+ next = FH#fileheader.next,
+ fptr = Fd,
+ no_objects = FH#fileheader.no_objects,
+ no_keys = FH#fileheader.no_keys,
+ maxobjsize = MaxObjSize,
+ n = FH#fileheader.n,
+ type = FH#fileheader.type,
+ update_mode = saved,
+ auto_save = infinity, % not saved on file
+ fixed = false, % not saved on file
+ freelists_p = FH#fileheader.freelist,
+ hash_bif = HashBif,
+ has_md5 = FH#fileheader.has_md5,
+ keypos = FH#fileheader.keypos,
+ min_no_slots = FH#fileheader.min_no_slots,
+ max_no_slots = FH#fileheader.max_no_slots,
+ no_collections = FH#fileheader.no_colls,
+ version = ?FILE_FORMAT_VERSION,
+ mod = ?MODULE,
+ bump = ?BUMP,
+ base = ?BASE},
+ {ok, H, ExtraInfo};
+ Error ->
+ Error
+ end.
+
+%% Inlined.
+max_objsize(NoColls = undefined) ->
+ NoColls;
+max_objsize(NoColls) ->
+ max_objsize(NoColls, 0).
+
+max_objsize([], Max) ->
+ Max;
+max_objsize([{_,0} | L], Max) ->
+ max_objsize(L, Max);
+max_objsize([{I,_} | L], _Max) ->
+ max_objsize(L, I).
+
+cache_segps(Fd, FileName, M) ->
+ NoParts = no_parts(M),
+ ArrStart = ?SEGARRADDR(0),
+ {ok, Bin} = dets_utils:pread_close(Fd, FileName, ArrStart, 4 * NoParts),
+ cache_arrparts(Bin, ?HEADEND, Fd, FileName).
+
+cache_arrparts(<<ArrPartPos:32, B/binary>>, Pos, Fd, FileName) ->
+ arrpart_cache(Pos, ArrPartPos),
+ {ok, ArrPartBin} = dets_utils:pread_close(Fd, FileName,
+ ArrPartPos,
+ ?SEGPARTSZ*4),
+ cache_segps1(Fd, ArrPartBin, ArrPartPos),
+ cache_arrparts(B, Pos+4, Fd, FileName);
+cache_arrparts(<<>>, _Pos, _Fd, _FileName) ->
+ ok.
+
+cache_segps1(_Fd, <<0:32,_/binary>>, _P) ->
+ ok;
+cache_segps1(Fd, <<S:32,B/binary>>, P) ->
+ dets_utils:disk_map_segment_p(Fd, S),
+ segp_cache(P, S),
+ cache_segps1(Fd, B, P+4);
+cache_segps1(_Fd, <<>>, _P) ->
+ ok.
+
+no_parts(NoSlots) ->
+ ((NoSlots - 1) div (?SEGSZP * ?SEGPARTSZ)) + 1.
+
+no_segs(NoSlots) ->
+ ((NoSlots - 1) div ?SEGSZP) + 1.
+
+%%%
+%%% Repair, conversion and initialization of a dets file.
+%%%
+
+%%% bulk_input/3. Initialization, the general case (any stream of objects).
+%%% output_objs/4. Initialization (general case) and repair.
+%%% bchunk_init/2. Initialization using bchunk.
+
+bulk_input(Head, InitFun, _Cntrs) ->
+ bulk_input(Head, InitFun, make_ref(), 0).
+
+bulk_input(Head, InitFun, Ref, Seq) ->
+ fun(close) ->
+ _ = (catch InitFun(close));
+ (read) ->
+ case catch {Ref, InitFun(read)} of
+ {Ref, end_of_input} ->
+ end_of_input;
+ {Ref, {L0, NewInitFun}} when is_list(L0),
+ is_function(NewInitFun) ->
+ Kp = Head#head.keypos,
+ case catch bulk_objects(L0, Head, Kp, Seq, []) of
+ {'EXIT', _Error} ->
+ _ = (catch NewInitFun(close)),
+ {error, invalid_objects_list};
+ {L, NSeq} ->
+ {L, bulk_input(Head, NewInitFun, Ref, NSeq)}
+ end;
+ {Ref, Value} ->
+ {error, {init_fun, Value}};
+ Error ->
+ throw({thrown, Error})
+ end
+ end.
+
+bulk_objects([T | Ts], Head, Kp, Seq, L) ->
+ BT = term_to_binary(T),
+ Key = element(Kp, T),
+ bulk_objects(Ts, Head, Kp, Seq+1, [make_object(Head, Key, Seq, BT) | L]);
+bulk_objects([], _Head, Kp, Seq, L) when is_integer(Kp), is_integer(Seq) ->
+ {L, Seq}.
+
+-define(FSCK_SEGMENT, 1).
+-define(FSCK_SEGMENT2, 10000).
+
+-define(VEMPTY, {}).
+-define(VSET(I, V, E), setelement(I, V, E)).
+-define(VGET(I, V), element(I, V)).
+-define(VEXT(S, V, T),
+ list_to_tuple(tuple_to_list(V) ++ lists:duplicate(S-tuple_size(V), T))).
+
+%% Number of bytes that will be handled before the cache is written to
+%% file. Used when compacting or writing chunks.
+-define(CACHE_SIZE, (60*?CHUNK_SIZE)).
+
+%% {LogSize,NoObjects} in Cntrs is replaced by
+%% {LogSize,Position,{FileName,FileDescriptor},NoCollections}.
+%% There is also an object {no, NoObjects, NoKeys}.
+-define(COUNTERS, no).
+-define(OBJ_COUNTER, 2).
+-define(KEY_COUNTER, 3).
+
+output_objs(OldV, Head, SlotNums, Cntrs) when OldV =< 9 ->
+ fun(close) ->
+ %% Make sure that the segments are initialized in case
+ %% init_table has been called.
+ Cache = ?VEMPTY,
+ Acc = [], % This is the only way Acc can be empty.
+ true = ets:insert(Cntrs, {?FSCK_SEGMENT,0,[],0}),
+ true = ets:insert(Cntrs, {?COUNTERS, 0, 0}),
+ Fun = output_objs2(foo, Acc, OldV, Head, Cache, Cntrs,
+ SlotNums, bar),
+ Fun(close);
+ ([]) ->
+ output_objs(OldV, Head, SlotNums, Cntrs);
+ (L) ->
+ %% Information about number of objects per size is not
+ %% relevant for version 9. It is the number of collections
+ %% that matters.
+ true = ets:delete_all_objects(Cntrs),
+ true = ets:insert(Cntrs, {?COUNTERS, 0, 0}),
+ Es = bin2term(L, OldV, Head#head.keypos),
+ %% The cache is a tuple indexed by the (log) size. An element
+ %% is [BinaryObject].
+ Cache = ?VEMPTY,
+ {NE, NAcc, NCache} = output_slots(Es, Head, Cache, Cntrs, 0, 0),
+ output_objs2(NE, NAcc, OldV, Head, NCache, Cntrs, SlotNums, 1)
+ end.
+
+output_objs2(E, Acc, OldV, Head, Cache, SizeT, SlotNums, 0) ->
+ NCache = write_all_sizes(Cache, SizeT, Head, more),
+ %% Number of handled file_sorter chunks before writing:
+ Max = erlang:max(1, erlang:min(tuple_size(NCache), 10)),
+ output_objs2(E, Acc, OldV, Head, NCache, SizeT, SlotNums, Max);
+output_objs2(E, Acc, OldV, Head, Cache, SizeT, SlotNums, ChunkI) ->
+ fun(close) ->
+ {_, [], Cache1} =
+ if
+ Acc =:= [] -> {foo, [], Cache};
+ true -> output_slot(Acc, Head, Cache, [], SizeT, 0, 0)
+ end,
+ _NCache = write_all_sizes(Cache1, SizeT, Head, no_more),
+ SegSz = ?ACTUAL_SEG_SIZE,
+ {_, SegEnd, _} = dets_utils:alloc(Head, adjsz(SegSz)),
+ [{?COUNTERS,NoObjects,NoKeys}] = ets:lookup(SizeT, ?COUNTERS),
+ Head1 = Head#head{no_objects = NoObjects, no_keys = NoKeys},
+ true = ets:delete(SizeT, ?COUNTERS),
+ {NewHead, NL, _MaxSz, _End} = allocate_all_objects(Head1, SizeT),
+ %% It is not known until all objects have been collected
+ %% how many object collections there are per size. Now
+ %% that is known and the absolute positions of the object
+ %% collections can be calculated.
+ segment_file(SizeT, NewHead, NL, SegEnd),
+ {MinSlots, EstNoSlots, MaxSlots} = SlotNums,
+ if
+ EstNoSlots =:= bulk_init ->
+ {ok, 0, NewHead};
+ true ->
+ EstNoSegs = no_segs(EstNoSlots),
+ MinNoSegs = no_segs(MinSlots),
+ MaxNoSegs = no_segs(MaxSlots),
+ NoSegs = no_segs(NoKeys),
+ Diff = abs(NoSegs - EstNoSegs),
+ if
+ Diff > 5, NoSegs =< MaxNoSegs, NoSegs >= MinNoSegs ->
+ {try_again, NoKeys};
+ true ->
+ {ok, 0, NewHead}
+ end
+ end;
+ (L) ->
+ Es = bin2term(L, OldV, Head#head.keypos),
+ {NE, NAcc, NCache} =
+ output_slots(E, Es, Acc, Head, Cache, SizeT, 0, 0),
+ output_objs2(NE, NAcc, OldV, Head, NCache, SizeT, SlotNums,
+ ChunkI-1)
+ end.
+
+%%% Compaction.
+
+compact_init(ReadHead, WriteHead, TableParameters) ->
+ SizeT = ets:new(dets_compact, []),
+ #head{no_keys = NoKeys, no_objects = NoObjects} = ReadHead,
+
+ NoObjsPerSize = TableParameters#?HASH_PARMS.no_colls,
+ {NewWriteHead, Bases, SegAddr, SegEnd} =
+ prepare_file_init(NoObjects, NoKeys, NoObjsPerSize, SizeT, WriteHead),
+
+ Input = compact_input(ReadHead, NewWriteHead, SizeT, tuple_size(Bases)),
+ Output = fast_output(NewWriteHead, SizeT, Bases, SegAddr, SegEnd),
+ TmpDir = filename:dirname(NewWriteHead#head.filename),
+ Reply = (catch file_sorter:sort(Input, Output,
+ [{format, binary},{tmpdir, TmpDir},
+ {header, 1}])), % compact_objs/9: 13 bytes
+ ets:delete(SizeT),
+ Reply.
+
+compact_input(Head, WHead, SizeT, NoSizes) ->
+ L = dets_utils:all_allocated_as_list(Head),
+ Cache = ?VEXT(NoSizes, ?VEMPTY, [0 | []]),
+ compact_input(Head, WHead, SizeT, Cache, L).
+
+compact_input(Head, WHead, SizeT, Cache, L) ->
+ fun(close) ->
+ ok;
+ (read) ->
+ compact_read(Head, WHead, SizeT, Cache, L, 0, [], 0)
+ end.
+
+compact_read(_Head, WHead, SizeT, Cache, [], _Min, [], _ASz) ->
+ _ = fast_write_all_sizes(Cache, SizeT, WHead),
+ end_of_input;
+compact_read(Head, WHead, SizeT, Cache, L, Min, SegBs, ASz)
+ when ASz + Min >= ?CACHE_SIZE, ASz > 0 ->
+ NCache = fast_write_all_sizes(Cache, SizeT, WHead),
+ {SegBs, compact_input(Head, WHead, SizeT, NCache, L)};
+compact_read(Head, WHead, SizeT, Cache, [[From | To] | L], Min, SegBs, ASz) ->
+ Max = erlang:max(?CHUNK_SIZE*3, Min),
+ case check_pread_arg(Max, Head) of
+ true ->
+ case dets_utils:pread_n(Head#head.fptr, From, Max) of
+ eof ->
+ %% Should never happen since compaction will not
+ %% be tried unless the file trailer is valid.
+ not_ok; % try a proper repair
+ Bin1 when byte_size(Bin1) < Min ->
+ %% The last object may not be padded.
+ Pad = Min - byte_size(Bin1),
+ NewBin = <<Bin1/binary, 0:Pad/unit:8>>,
+ compact_objs(Head, WHead, SizeT, NewBin, L,
+ From, To, SegBs, Cache, ASz);
+ NewBin ->
+ compact_objs(Head, WHead, SizeT, NewBin, L,
+ From, To, SegBs, Cache, ASz)
+ end;
+ false ->
+ not_ok % try a proper repair
+ end.
+
+compact_objs(Head, WHead, SizeT, Bin, L, From, To, SegBs, Cache, ASz)
+ when From =:= To ->
+ case L of
+ [] ->
+ {SegBs, compact_input(Head, WHead, SizeT, Cache, L)};
+ [[From1 | To1] | L1] ->
+ Skip1 = From1 - From,
+ case Bin of
+ <<_:Skip1/binary,NewBin/binary>> ->
+ compact_objs(Head, WHead, SizeT, NewBin, L1, From1, To1,
+ SegBs, Cache, ASz);
+ _ when byte_size(Bin) < Skip1 ->
+ compact_read(Head, WHead, SizeT, Cache, L, 0, SegBs, ASz)
+ end
+ end;
+compact_objs(Head, WHead, SizeT, <<Size:32, St:32, _Sz:32, KO/binary>> = Bin,
+ L, From, To, SegBs, Cache, ASz) when St =:= ?ACTIVE ->
+ LSize = sz2pos(Size),
+ Size2 = ?POW(LSize-1),
+ if
+ byte_size(Bin) >= Size2 ->
+ NASz = ASz + Size2,
+ <<SlotObjs:Size2/binary, NewBin/binary>> = Bin,
+ Term = if
+ Head#head.type =:= set ->
+ binary_to_term(KO);
+ true ->
+ <<_KSz:32,B2/binary>> = KO,
+ binary_to_term(B2)
+ end,
+ Key = element(Head#head.keypos, Term),
+ Slot = db_hash(Key, Head),
+ From1 = From + Size2,
+ [Addr | AL] = ?VGET(LSize, Cache),
+ NCache = ?VSET(LSize, Cache, [Addr + Size2 | [SlotObjs | AL]]),
+ NSegBs = [<<Slot:32,Size:32,Addr:32,LSize:8>> | SegBs],
+ compact_objs(Head, WHead, SizeT, NewBin, L, From1,
+ To, NSegBs, NCache, NASz);
+ true ->
+ compact_read(Head, WHead, SizeT, Cache, [[From|To] | L],
+ Size2, SegBs, ASz)
+ end;
+compact_objs(Head, WHead, SizeT, <<_:32, _St:32, _:32, _/binary>> = Bin,
+ L, From, To, SegBs, Cache, ASz)
+ when byte_size(Bin) >= ?ACTUAL_SEG_SIZE -> % , _St =/= ?ACTIVE
+ <<_:?ACTUAL_SEG_SIZE/binary, NewBin/binary>> = Bin,
+ compact_objs(Head, WHead, SizeT, NewBin, L, From + ?ACTUAL_SEG_SIZE,
+ To, SegBs, Cache, ASz);
+compact_objs(Head, WHead, SizeT, <<_:32, _St:32, _:32, _/binary>> = Bin,
+ L, From, To, SegBs, Cache, ASz)
+ when byte_size(Bin) < ?ACTUAL_SEG_SIZE -> % , _St =/= ?ACTIVE
+ compact_read(Head, WHead, SizeT, Cache, [[From|To] | L],
+ ?ACTUAL_SEG_SIZE, SegBs, ASz);
+compact_objs(Head, WHead, SizeT, _Bin, L, From, To, SegBs, Cache, ASz) ->
+ compact_read(Head, WHead, SizeT, Cache, [[From|To] | L], 0, SegBs, ASz).
+
+%%% End compaction.
+
+%%% Bchunk.
+
+read_bchunks(Head, L) ->
+ read_bchunks(Head, L, 0, [], 0).
+
+read_bchunks(_Head, L, Min, Bs, ASz) when ASz + Min >= 4*?CHUNK_SIZE,
+ Bs =/= [] ->
+ {lists:reverse(Bs), L};
+read_bchunks(Head, {From, To, L}, Min, Bs, ASz) ->
+ Max = erlang:max(?CHUNK_SIZE*2, Min),
+ case check_pread_arg(Max, Head) of
+ true ->
+ case dets_utils:pread_n(Head#head.fptr, From, Max) of
+ eof ->
+ %% Should never happen.
+ {error, premature_eof};
+ NewBin when byte_size(NewBin) >= Min ->
+ bchunks(Head, L, NewBin, Bs, ASz, From, To);
+ Bin1 when To - From =:= Min, L =:= <<>> ->
+ %% when byte_size(Bin1) < Min.
+ %% The last object may not be padded.
+ Pad = Min - byte_size(Bin1),
+ NewBin = <<Bin1/binary, 0:Pad/unit:8>>,
+ bchunks(Head, L, NewBin, Bs, ASz, From, To);
+ _ ->
+ {error, premature_eof}
+ end;
+ false ->
+ {error, dets_utils:bad_object(bad_object, {read_bchunks, Max})}
+ end.
+
+bchunks(Head, L, Bin, Bs, ASz, From, To) when From =:= To ->
+ if
+ L =:= <<>> ->
+ {finished, lists:reverse(Bs)};
+ true ->
+ <<From1:32, To1:32, L1/binary>> = L,
+ Skip1 = From1 - From,
+ case Bin of
+ <<_:Skip1/binary,NewBin/binary>> ->
+ bchunks(Head, L1, NewBin, Bs, ASz, From1, To1);
+ _ when byte_size(Bin) < Skip1 ->
+ read_bchunks(Head, {From1,To1,L1}, 0, Bs, ASz)
+ end
+ end;
+bchunks(Head, L, <<Size:32, St:32, _Sz:32, KO/binary>> = Bin, Bs, ASz,
+ From, To) when St =:= ?ACTIVE; St =:= ?FREE ->
+ LSize = sz2pos(Size),
+ Size2 = ?POW(LSize-1),
+ if
+ byte_size(Bin) >= Size2 ->
+ <<B0:Size2/binary, NewBin/binary>> = Bin,
+ %% LSize and Slot are used in make_slots/6. The reason to
+ %% calculate Slot here is to reduce the CPU load in
+ %% make_slots/6.
+ Term = if
+ Head#head.type =:= set ->
+ binary_to_term(KO);
+ true ->
+ <<_KSz:32,B2/binary>> = KO,
+ binary_to_term(B2)
+ end,
+ Key = element(Head#head.keypos, Term),
+ Slot = db_hash(Key, Head),
+ B = {LSize,Slot,B0},
+ bchunks(Head, L, NewBin, [B | Bs], ASz + Size2, From+Size2, To);
+ true ->
+ read_bchunks(Head, {From, To, L}, Size2, Bs, ASz)
+ end;
+bchunks(Head, L, <<_:32, _St:32, _:32, _/binary>> = Bin, Bs, ASz, From, To)
+ when byte_size(Bin) >= ?ACTUAL_SEG_SIZE ->
+ <<_:?ACTUAL_SEG_SIZE/binary, NewBin/binary>> = Bin,
+ bchunks(Head, L, NewBin, Bs, ASz, From + ?ACTUAL_SEG_SIZE, To);
+bchunks(Head, L, <<_:32, _St:32, _:32, _/binary>> = Bin, Bs, ASz, From, To)
+ when byte_size(Bin) < ?ACTUAL_SEG_SIZE ->
+ read_bchunks(Head, {From, To, L}, ?ACTUAL_SEG_SIZE, Bs, ASz);
+bchunks(Head, L, _Bin, Bs, ASz, From, To) ->
+ read_bchunks(Head, {From, To, L}, 0, Bs, ASz).
+
+%%% End bchunk.
+
+%% -> {ok, NewHead} | throw(Error) | Error
+bchunk_init(Head, InitFun) ->
+ Ref = make_ref(),
+ %% The non-empty list of data begins with the table parameters.
+ case catch {Ref, InitFun(read)} of
+ {Ref, end_of_input} ->
+ {error, {init_fun, end_of_input}};
+ {Ref, {[], NInitFun}} when is_function(NInitFun) ->
+ bchunk_init(Head, NInitFun);
+ {Ref, {[ParmsBin | L], NInitFun}}
+ when is_list(L), is_function(NInitFun) ->
+ #head{fptr = Fd, type = Type, keypos = Kp,
+ auto_save = Auto, cache = Cache,
+ filename = Fname, ram_file = Ram,
+ name = Tab} = Head,
+ case try_bchunk_header(ParmsBin, Head) of
+ {ok, Parms} ->
+ #?HASH_PARMS{no_objects = NoObjects,
+ no_keys = NoKeys,
+ no_colls = NoObjsPerSize} = Parms,
+ CacheSz = dets_utils:cache_size(Cache),
+ {ok, Head1} =
+ prep_table_copy(Fd, Tab, Fname, Type,
+ Kp, Ram, CacheSz,
+ Auto, Parms),
+ SizeT = ets:new(dets_init, []),
+ {NewHead, Bases, SegAddr, SegEnd} =
+ prepare_file_init(NoObjects, NoKeys,
+ NoObjsPerSize, SizeT, Head1),
+ ECache = ?VEXT(tuple_size(Bases), ?VEMPTY, [0 | []]),
+ Input =
+ fun(close) ->
+ _ = (catch NInitFun(close));
+ (read) ->
+ do_make_slots(L, ECache, SizeT, NewHead, Ref,
+ 0, NInitFun)
+ end,
+ Output = fast_output(NewHead, SizeT, Bases, SegAddr,SegEnd),
+ TmpDir = filename:dirname(Head#head.filename),
+ Reply = (catch file_sorter:sort(Input, Output,
+ [{format, binary},
+ {tmpdir, TmpDir},
+ {header, 1}])),
+ ets:delete(SizeT),
+ Reply;
+ not_ok ->
+ {error, {init_fun, ParmsBin}}
+ end;
+ {Ref, Value} ->
+ {error, {init_fun, Value}};
+ Error ->
+ {thrown, Error}
+ end.
+
+try_bchunk_header(ParmsBin, Head) ->
+ #head{type = Type, keypos = Kp, hash_bif = HashBif} = Head,
+ HashMethod = hash_method_to_code(HashBif),
+ case catch binary_to_term(ParmsBin) of
+ Parms when is_record(Parms, ?HASH_PARMS),
+ Parms#?HASH_PARMS.type =:= Type,
+ Parms#?HASH_PARMS.keypos =:= Kp,
+ Parms#?HASH_PARMS.hash_method =:= HashMethod,
+ Parms#?HASH_PARMS.bchunk_format_version =:=
+ ?BCHUNK_FORMAT_VERSION ->
+ {ok, Parms};
+ _ ->
+ not_ok
+ end.
+
+bchunk_input(InitFun, SizeT, Head, Ref, Cache, ASz) ->
+ fun(close) ->
+ _ = (catch InitFun(close));
+ (read) ->
+ case catch {Ref, InitFun(read)} of
+ {Ref, end_of_input} ->
+ _ = fast_write_all_sizes(Cache, SizeT, Head),
+ end_of_input;
+ {Ref, {L, NInitFun}} when is_list(L), is_function(NInitFun) ->
+ do_make_slots(L, Cache, SizeT, Head, Ref, ASz,
+ NInitFun);
+ {Ref, Value} ->
+ {error, {init_fun, Value}};
+ Error ->
+ throw({thrown, Error})
+ end
+ end.
+
+do_make_slots(L, Cache, SizeT, Head, Ref, ASz, InitFun) ->
+ case catch make_slots(L, Cache, [], ASz) of
+ {'EXIT', _} ->
+ _ = (catch InitFun(close)),
+ {error, invalid_objects_list};
+ {Cache1, SegBs, NASz} when NASz > ?CACHE_SIZE ->
+ NCache = fast_write_all_sizes(Cache1, SizeT, Head),
+ F = bchunk_input(InitFun, SizeT, Head, Ref, NCache, 0),
+ {SegBs, F};
+ {NCache, SegBs, NASz} ->
+ F = bchunk_input(InitFun, SizeT, Head, Ref, NCache, NASz),
+ {SegBs, F}
+ end.
+
+make_slots([{LSize,Slot,<<Size:32, St:32, Sz:32, KO/binary>> = Bin0} | Bins],
+ Cache, SegBs, ASz) ->
+ Bin = if
+ St =:= ?ACTIVE ->
+ Bin0;
+ St =:= ?FREE ->
+ <<Size:32,?ACTIVE:32,Sz:32,KO/binary>>
+ end,
+ BSz = byte_size(Bin0),
+ true = (BSz =:= ?POW(LSize-1)),
+ NASz = ASz + BSz,
+ [Addr | L] = ?VGET(LSize, Cache),
+ NSegBs = [<<Slot:32,Size:32,Addr:32,LSize:8>> | SegBs],
+ NCache = ?VSET(LSize, Cache, [Addr + BSz | [Bin | L]]),
+ make_slots(Bins, NCache, NSegBs, NASz);
+make_slots([], Cache, SegBs, ASz) ->
+ {Cache, SegBs, ASz}.
+
+fast_output(Head, SizeT, Bases, SegAddr, SegEnd) ->
+ fun(close) ->
+ fast_output_end(Head, SizeT);
+ (L) ->
+ case file:position(Head#head.fptr, SegAddr) of
+ {ok, SegAddr} ->
+ NewSegAddr = write_segment_file(L, Bases, Head, [],
+ SegAddr, SegAddr),
+ fast_output2(Head, SizeT, Bases, NewSegAddr,
+ SegAddr, SegEnd);
+ Error ->
+ catch dets_utils:file_error(Error, Head#head.filename)
+ end
+ end.
+
+fast_output2(Head, SizeT, Bases, SegAddr, SS, SegEnd) ->
+ fun(close) ->
+ FinalZ = SegEnd - SegAddr,
+ dets_utils:write(Head, dets_utils:make_zeros(FinalZ)),
+ fast_output_end(Head, SizeT);
+ (L) ->
+ NewSegAddr = write_segment_file(L, Bases, Head, [], SegAddr, SS),
+ fast_output2(Head, SizeT, Bases, NewSegAddr, SS, SegEnd)
+ end.
+
+fast_output_end(Head, SizeT) ->
+ case ets:foldl(fun({_Sz,_Pos,Cnt,NoC}, Acc) -> (Cnt =:= NoC) and Acc end,
+ true, SizeT) of
+ true -> {ok, Head};
+ false -> {error, invalid_objects_list}
+ end.
+
+%% Inlined.
+write_segment_file([<<Slot:32,BSize:32,AddrToBe:32,LSize:8>> | Bins],
+ Bases, Head, Ws, SegAddr, SS) ->
+ %% Should call slot_position/1, but since all segments are
+ %% allocated in a sequence, the position of a slot can be
+ %% calculated faster.
+ Pos = SS + ?SZOBJP*4 * Slot, % Same as Pos = slot_position(Slot).
+ write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos,
+ BSize, AddrToBe, LSize);
+write_segment_file([], _Bases, Head, Ws, SegAddr, _SS) ->
+ dets_utils:write(Head, Ws),
+ SegAddr.
+
+write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos, BSize,
+ AddrToBe, LSize) when Pos =:= SegAddr ->
+ Addr = AddrToBe + element(LSize, Bases),
+ NWs = [Ws | <<BSize:32,Addr:32>>],
+ write_segment_file(Bins, Bases, Head, NWs, SegAddr + ?SZOBJP*4, SS);
+write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos, BSize,
+ AddrToBe, LSize) when Pos - SegAddr < 100 ->
+ Addr = AddrToBe + element(LSize, Bases),
+ NoZeros = Pos - SegAddr,
+ NWs = [Ws | <<0:NoZeros/unit:8,BSize:32,Addr:32>>],
+ NSegAddr = SegAddr + NoZeros + ?SZOBJP*4,
+ write_segment_file(Bins, Bases, Head, NWs, NSegAddr, SS);
+write_segment_file(Bins, Bases, Head, Ws, SegAddr, SS, Pos, BSize,
+ AddrToBe, LSize) ->
+ Addr = AddrToBe + element(LSize, Bases),
+ NoZeros = Pos - SegAddr,
+ NWs = [Ws, dets_utils:make_zeros(NoZeros) | <<BSize:32,Addr:32>>],
+ NSegAddr = SegAddr + NoZeros + ?SZOBJP*4,
+ write_segment_file(Bins, Bases, Head, NWs, NSegAddr, SS).
+
+fast_write_all_sizes(Cache, SizeT, Head) ->
+ CacheL = lists:reverse(tuple_to_list(Cache)),
+ fast_write_sizes(CacheL, tuple_size(Cache), SizeT, Head, [], []).
+
+fast_write_sizes([], _Sz, _SizeT, Head, NCL, PwriteList) ->
+ #head{filename = FileName, fptr = Fd} = Head,
+ ok = dets_utils:pwrite(Fd, FileName, PwriteList),
+ list_to_tuple(NCL);
+fast_write_sizes([[_Addr] = C | CL], Sz, SizeT, Head, NCL, PwriteList) ->
+ fast_write_sizes(CL, Sz-1, SizeT, Head, [C | NCL], PwriteList);
+fast_write_sizes([[Addr | C] | CL], Sz, SizeT, Head, NCL, PwriteList) ->
+ case ets:lookup(SizeT, Sz) of
+ [] ->
+ throw({error, invalid_objects_list});
+ [{Sz,Position,_ObjCounter,_NoCollections}] ->
+ %% Update ObjCounter:
+ NoColls = length(C),
+ _ = ets:update_counter(SizeT, Sz, {3, NoColls}),
+ Pos = Position + Addr - NoColls*?POW(Sz-1),
+ fast_write_sizes(CL, Sz-1, SizeT, Head, [[Addr] | NCL],
+ [{Pos,lists:reverse(C)} | PwriteList])
+ end.
+
+prepare_file_init(NoObjects, NoKeys, NoObjsPerSize, SizeT, Head) ->
+ SegSz = ?ACTUAL_SEG_SIZE,
+ {_, SegEnd, _} = dets_utils:alloc(Head, adjsz(SegSz)),
+ Head1 = Head#head{no_objects = NoObjects, no_keys = NoKeys},
+ true = ets:insert(SizeT, {?FSCK_SEGMENT,0,[],0}),
+ lists:foreach(fun({LogSz,NoColls}) ->
+ true = ets:insert(SizeT, {LogSz+1,0,0,NoColls})
+ end, NoObjsPerSize),
+ {NewHead, NL0, MaxSz, EndOfFile} = allocate_all_objects(Head1, SizeT),
+ [{?FSCK_SEGMENT,SegAddr,[],0} | NL] = NL0,
+ true = ets:delete_all_objects(SizeT),
+ lists:foreach(fun(X) -> true = ets:insert(SizeT, X) end, NL),
+ Bases = lists:foldl(fun({LSz,P,_D,_N}, A) -> setelement(LSz,A,P) end,
+ erlang:make_tuple(MaxSz, 0), NL),
+ Est = lists:foldl(fun({LSz,_,_,N}, A) -> A + ?POW(LSz-1)*N end, 0, NL),
+ ok = write_bytes(NewHead, EndOfFile, Est),
+ {NewHead, Bases, SegAddr, SegEnd}.
+
+%% Writes "zeros" to the file. This ensures that the file blocks are
+%% allocated more or less contiguously, which reduces the seek times
+%% to a minimum when the file is later read serially from beginning to
+%% end (as is done when calling select and the like). A well-formed
+%% file will be created also if nothing is written (as is the case for
+%% small files, for efficiency).
+write_bytes(_Head, _EndOfFile, Est) when Est < ?CACHE_SIZE ->
+ ok;
+write_bytes(Head, EndOfFile, _Est) ->
+ Fd = Head#head.fptr,
+ {ok, Start} = file:position(Fd, eof),
+ BytesToWrite = EndOfFile - Start,
+ SizeInKB = 64,
+ Bin = list_to_binary(lists:duplicate(SizeInKB * 4, lists:seq(0, 255))),
+ write_loop(Head, BytesToWrite, Bin).
+
+write_loop(Head, BytesToWrite, Bin) when BytesToWrite >= byte_size(Bin) ->
+ case file:write(Head#head.fptr, Bin) of
+ ok -> write_loop(Head, BytesToWrite - byte_size(Bin), Bin);
+ Error -> dets_utils:file_error(Error, Head#head.filename)
+ end;
+write_loop(_Head, 0, _Bin) ->
+ ok;
+write_loop(Head, BytesToWrite, Bin) ->
+ <<SmallBin:BytesToWrite/binary,_/binary>> = Bin,
+ write_loop(Head, BytesToWrite, SmallBin).
+
+%% By allocating bigger objects before smaller ones, holes in the
+%% buddy system memory map are avoided. Unfortunately, the segments
+%% are always allocated first, so if there are objects bigger than a
+%% segment, there is a hole to handle. (Haven't considered placing the
+%% segments among other objects of the same size.)
+allocate_all_objects(Head, SizeT) ->
+ DTL = lists:reverse(lists:keysort(1, ets:tab2list(SizeT))),
+ MaxSz = element(1, hd(DTL)),
+ SegSize = ?ACTUAL_SEG_SIZE,
+ {Head1, HSz, HN, HA} = alloc_hole(MaxSz, Head, SegSize),
+ {Head2, NL} = allocate_all(Head1, DTL, []),
+ %% Find the position that will be the end of the file by allocating
+ %% a minimal object.
+ {_Head, EndOfFile, _} = dets_utils:alloc(Head2, ?BUMP),
+ Head3 = free_hole(Head2, HSz, HN, HA),
+ NewHead = Head3#head{maxobjsize = max_objsize(Head3#head.no_collections)},
+ {NewHead, NL, MaxSz, EndOfFile}.
+
+alloc_hole(LSize, Head, SegSz) when ?POW(LSize-1) > SegSz ->
+ Size = ?POW(LSize-1),
+ {_, SegAddr, _} = dets_utils:alloc(Head, adjsz(SegSz)),
+ {_, Addr, _} = dets_utils:alloc(Head, adjsz(Size)),
+ N = (Addr - SegAddr) div SegSz,
+ Head1 = dets_utils:alloc_many(Head, SegSz, N, SegAddr),
+ {Head1, SegSz, N, SegAddr};
+alloc_hole(_MaxSz, Head, _SegSz) ->
+ {Head, 0, 0, 0}.
+
+free_hole(Head, _Size, 0, _Addr) ->
+ Head;
+free_hole(Head, Size, N, Addr) ->
+ {Head1, _} = dets_utils:free(Head, Addr, adjsz(Size)),
+ free_hole(Head1, Size, N-1, Addr+Size).
+
+%% One (temporary) file for each buddy size, write all objects of that
+%% size to the file.
+allocate_all(Head, [{?FSCK_SEGMENT,_,Data,_}], L) ->
+ %% And one file for the segments...
+ %% Note that space for the array parts and the segments has
+ %% already been allocated, but the segments have not been
+ %% initialized on disk.
+ NoParts = no_parts(Head#head.next),
+ %% All parts first, ensured by init_segments/6.
+ Addr = ?BASE + NoParts * 4 * ?SEGPARTSZ,
+ {Head, [{?FSCK_SEGMENT,Addr,Data,0} | L]};
+allocate_all(Head, [{LSize,_,Data,NoCollections} | DTL], L) ->
+ Size = ?POW(LSize-1),
+ {_Head, Addr, _} = dets_utils:alloc(Head, adjsz(Size)),
+ Head1 = dets_utils:alloc_many(Head, Size, NoCollections, Addr),
+ NoColls = Head1#head.no_collections,
+ NewNoColls = orddict:update_counter(LSize-1, NoCollections, NoColls),
+ NewHead = Head1#head{no_collections = NewNoColls},
+ E = {LSize,Addr,Data,NoCollections},
+ allocate_all(NewHead, DTL, [E | L]).
+
+bin2term(Bin, 9, Kp) ->
+ bin2term1(Bin, Kp, []);
+bin2term(Bin, 8, Kp) ->
+ bin2term_v8(Bin, Kp, []).
+
+bin2term1([<<Slot:32, Seq:32, BinTerm/binary>> | BTs], Kp, L) ->
+ Term = binary_to_term(BinTerm),
+ Key = element(Kp, Term),
+ bin2term1(BTs, Kp, [{Slot, Key, Seq, Term, BinTerm} | L]);
+bin2term1([], _Kp, L) ->
+ lists:reverse(L).
+
+bin2term_v8([<<Slot:32, BinTerm/binary>> | BTs], Kp, L) ->
+ Term = binary_to_term(BinTerm),
+ Key = element(Kp, Term),
+ bin2term_v8(BTs, Kp, [{Slot, Key, foo, Term, BinTerm} | L]);
+bin2term_v8([], _Kp, L) ->
+ lists:reverse(L).
+
+write_all_sizes({}=Cache, _SizeT, _Head, _More) ->
+ Cache;
+write_all_sizes(Cache, SizeT, Head, More) ->
+ CacheL = lists:reverse(tuple_to_list(Cache)),
+ Sz = length(CacheL),
+ NCL = case ets:info(SizeT, size) of
+ 1 when More =:= no_more -> % COUNTERS only...
+ all_sizes(CacheL, Sz, SizeT);
+ _ ->
+ write_sizes(CacheL, Sz, SizeT, Head)
+ end,
+ list_to_tuple(NCL).
+
+all_sizes([]=CL, _Sz, _SizeT) ->
+ CL;
+all_sizes([[]=C | CL], Sz, SizeT) ->
+ [C | all_sizes(CL, Sz-1, SizeT)];
+all_sizes([C0 | CL], Sz, SizeT) ->
+ C = lists:reverse(C0),
+ NoCollections = length(C),
+ true = ets:insert(SizeT, {Sz,0,C,NoCollections}),
+ [[] | all_sizes(CL, Sz-1, SizeT)].
+
+write_sizes([]=CL, _Sz, _SizeT, _Head) ->
+ CL;
+write_sizes([[]=C | CL], Sz, SizeT, Head) ->
+ [C | write_sizes(CL, Sz-1, SizeT, Head)];
+write_sizes([C | CL], Sz, SizeT, Head) ->
+ {FileName, Fd} =
+ case ets:lookup(SizeT, Sz) of
+ [] ->
+ temp_file(Head, SizeT, Sz);
+ [{_,_,{FN,F},_}] ->
+ {FN, F}
+ end,
+ NoCollections = length(C),
+ _ = ets:update_counter(SizeT, Sz, {4,NoCollections}),
+ case file:write(Fd, lists:reverse(C)) of
+ ok ->
+ [[] | write_sizes(CL, Sz-1, SizeT, Head)];
+ Error ->
+ dets_utils:file_error(FileName, Error)
+ end.
+
+output_slots([E | Es], Head, Cache, SizeT, NoKeys, NoObjs) ->
+ output_slots(E, Es, [E], Head, Cache, SizeT, NoKeys, NoObjs);
+output_slots([], _Head, Cache, SizeT, NoKeys, NoObjs) ->
+ _ = ets:update_counter(SizeT, ?COUNTERS, {?OBJ_COUNTER,NoObjs}),
+ _ = ets:update_counter(SizeT, ?COUNTERS, {?KEY_COUNTER,NoKeys}),
+ {not_a_tuple, [], Cache}.
+
+output_slots(E, [E1 | Es], Acc, Head, Cache, SizeT, NoKeys, NoObjs)
+ when element(1, E) =:= element(1, E1) ->
+ output_slots(E1, Es, [E1 | Acc], Head, Cache, SizeT, NoKeys, NoObjs);
+output_slots(E, [], Acc, _Head, Cache, SizeT, NoKeys, NoObjs) ->
+ _ = ets:update_counter(SizeT, ?COUNTERS, {?OBJ_COUNTER,NoObjs}),
+ _ = ets:update_counter(SizeT, ?COUNTERS, {?KEY_COUNTER,NoKeys}),
+ {E, Acc, Cache};
+output_slots(_E, L, Acc, Head, Cache, SizeT, NoKeys, NoObjs) ->
+ output_slot(Acc, Head, Cache, L, SizeT, NoKeys, NoObjs).
+
+output_slot(Es, Head, Cache, L, SizeT, NoKeys, NoObjs) ->
+ Slot = element(1, hd(Es)),
+ %% Plain lists:sort/1 will do.
+ {Bins, Size, No, KNo} = prep_slot(lists:sort(Es), Head),
+ NNoKeys = NoKeys + KNo,
+ NNoObjs = NoObjs + No,
+
+ %% First the object collection.
+ BSize = Size + ?OHDSZ,
+ LSize = sz2pos(BSize),
+ Size2 = ?POW(LSize-1),
+ Pad = <<0:(Size2-BSize)/unit:8>>,
+ BinObject = [<<BSize:32, ?ACTIVE:32>>, Bins | Pad],
+ Cache1 =
+ if
+ LSize > tuple_size(Cache) ->
+ C1 = ?VEXT(LSize, Cache, []),
+ ?VSET(LSize, C1, [BinObject]);
+ true ->
+ CL = ?VGET(LSize, Cache),
+ ?VSET(LSize, Cache, [BinObject | CL])
+ end,
+
+ %% Then the pointer to the object collection.
+ %% Cannot yet determine the absolute pointers; segment_file/4 does that.
+ PBin = <<Slot:32,BSize:32,LSize:8>>,
+ PL = ?VGET(?FSCK_SEGMENT, Cache1),
+ NCache = ?VSET(?FSCK_SEGMENT, Cache1, [PBin | PL]),
+ output_slots(L, Head, NCache, SizeT, NNoKeys, NNoObjs).
+
+prep_slot(L, Head) when Head#head.type =/= set ->
+ prep_slot(L, Head, []);
+prep_slot([{_Slot,Key,_Seq,_T,BT} | L], _Head) ->
+ prep_set_slot(L, Key, BT, 0, 0, 0, []).
+
+prep_slot([{_Slot, Key, Seq, T, _BT} | L], Head, W) ->
+ prep_slot(L, Head, [{Key, {Seq, {insert,T}}} | W]);
+prep_slot([], Head, W) ->
+ WLs = dets_utils:family(W),
+ {[], Bins, Size, No, KNo, _} =
+ eval_slot(WLs, [], Head#head.type, [], [], 0, 0, 0, false),
+ {Bins, Size, No, KNo}.
+
+%% Optimization, prep_slot/3 would work for set tables as well.
+prep_set_slot([{_,K,_Seq,_T1,BT1} | L], K, _BT, Sz, NoKeys, NoObjs, Ws) ->
+ prep_set_slot(L, K, BT1, Sz, NoKeys, NoObjs, Ws);
+prep_set_slot([{_,K1,_Seq,_T1,BT1} | L], _K, BT, Sz, NoKeys, NoObjs, Ws) ->
+ BSize = byte_size(BT) + 4,
+ NWs = [Ws,<<BSize:32>>|BT],
+ prep_set_slot(L, K1, BT1, Sz+BSize, NoKeys+1, NoObjs+1, NWs);
+prep_set_slot([], _K, BT, Sz, NoKeys, NoObjs, Ws) ->
+ BSize = byte_size(BT) + 4,
+ {[Ws, <<BSize:32>> | BT], Sz + BSize, NoKeys+1, NoObjs+1}.
+
+segment_file(SizeT, Head, FileData, SegEnd) ->
+ I = 2,
+ true = ets:delete_all_objects(SizeT),
+ lists:foreach(fun(X) -> true = ets:insert(SizeT, X) end, FileData),
+ [{?FSCK_SEGMENT,SegAddr,Data,0} | FileData1] = FileData,
+ NewData =
+ case Data of
+ {InFile,In0} ->
+ {OutFile, Out} = temp_file(Head, SizeT, I),
+ file:close(In0),
+ {ok, In} = dets_utils:open(InFile, [raw,binary,read]),
+ {ok, 0} = dets_utils:position(In, InFile, bof),
+ seg_file(SegAddr, SegAddr, In, InFile, Out, OutFile, SizeT,
+ SegEnd),
+ file:close(In),
+ file:delete(InFile),
+ {OutFile,Out};
+ Objects ->
+ {LastAddr, B} = seg_file(Objects, SegAddr, SegAddr, SizeT, []),
+ dets_utils:disk_map_segment(SegAddr, B),
+ FinalZ = SegEnd - LastAddr,
+ [B | dets_utils:make_zeros(FinalZ)]
+ end,
+ %% Restore the positions.
+ true = ets:delete_all_objects(SizeT),
+ %% To get the segments copied first by dets:fsck_copy/4, use a big
+ %% number here, FSCK_SEGMENT2.
+ lists:foreach(fun(X) -> true = ets:insert(SizeT, X) end,
+ [{?FSCK_SEGMENT2,SegAddr,NewData,0} | FileData1]),
+ ok.
+
+seg_file(Addr, SS, In, InFile, Out, OutFile, SizeT, SegEnd) ->
+ case dets_utils:read_n(In, 4500) of
+ eof ->
+ FinalZ = SegEnd - Addr,
+ dets_utils:fwrite(Out, OutFile, dets_utils:make_zeros(FinalZ));
+ Bin ->
+ {NewAddr, L} = seg_file(Bin, Addr, SS, SizeT, []),
+ dets_utils:disk_map_segment(Addr, L),
+ ok = dets_utils:fwrite(Out, OutFile, L),
+ seg_file(NewAddr, SS, In, InFile, Out, OutFile, SizeT, SegEnd)
+ end.
+
+seg_file(<<Slot:32,BSize:32,LSize:8,T/binary>>, Addr, SS, SizeT, L) ->
+ seg_file_item(T, Addr, SS, SizeT, L, Slot, BSize, LSize);
+seg_file([<<Slot:32,BSize:32,LSize:8>> | T], Addr, SS, SizeT, L) ->
+ seg_file_item(T, Addr, SS, SizeT, L, Slot, BSize, LSize);
+seg_file([], Addr, _SS, _SizeT, L) ->
+ {Addr, lists:reverse(L)};
+seg_file(<<>>, Addr, _SS, _SizeT, L) ->
+ {Addr, lists:reverse(L)}.
+
+seg_file_item(T, Addr, SS, SizeT, L, Slot, BSize, LSize) ->
+ %% Should call slot_position/1, but since all segments are
+ %% allocated in a sequence, the position of a slot can be
+ %% calculated faster.
+ SlotPos = SS + ?SZOBJP*4 * Slot, % SlotPos = slot_position(Slot)
+ NoZeros = SlotPos - Addr,
+ PSize = NoZeros+?SZOBJP*4,
+ Inc = ?POW(LSize-1),
+ CollP = ets:update_counter(SizeT, LSize, Inc) - Inc,
+ PointerBin = if
+ NoZeros =:= 0 ->
+ <<BSize:32, CollP:32>>;
+ NoZeros > 100 ->
+ [dets_utils:make_zeros(NoZeros) |
+ <<BSize:32, CollP:32>>];
+ true ->
+ <<0:NoZeros/unit:8, BSize:32, CollP:32>>
+ end,
+ seg_file(T, Addr + PSize, SS, SizeT, [PointerBin | L]).
+
+temp_file(Head, SizeT, N) ->
+ TmpName = lists:concat([Head#head.filename, '.', N]),
+ {ok, Fd} = dets_utils:open(TmpName, [raw, binary, write]),
+ %% The file table is consulted when cleaning up.
+ true = ets:insert(SizeT, {N,0,{TmpName,Fd},0}),
+ {TmpName, Fd}.
+
+%% Does not close Fd.
+fsck_input(Head, Fd, Cntrs, FileHeader) ->
+ MaxSz0 = case FileHeader#fileheader.has_md5 of
+ true when is_integer(FileHeader#fileheader.no_colls) ->
+ ?POW(max_objsize(FileHeader#fileheader.no_colls));
+ _ ->
+ %% The file is not compressed, so the bucket size
+ %% cannot exceed the filesize, for all buckets.
+ case file:position(Fd, eof) of
+ {ok, Pos} ->
+ Pos;
+ _ ->
+ 1 bsl 32
+ end
+ end,
+ MaxSz = erlang:max(MaxSz0, ?CHUNK_SIZE),
+ State0 = fsck_read(?BASE, Fd, [], 0),
+ fsck_input(Head, State0, Fd, MaxSz, Cntrs).
+
+fsck_input(Head, State, Fd, MaxSz, Cntrs) ->
+ fun(close) ->
+ ok;
+ (read) ->
+ case State of
+ done ->
+ end_of_input;
+ {done, L, _Seq} ->
+ R = count_input(Head, Cntrs, L),
+ {R, fsck_input(Head, done, Fd, MaxSz, Cntrs)};
+ {cont, L, Bin, Pos, Seq} ->
+ R = count_input(Head, Cntrs, L),
+ FR = fsck_objs(Bin, Head#head.keypos, Head, [], Seq),
+ NewState = fsck_read(FR, Pos, Fd, MaxSz, Head),
+ {R, fsck_input(Head, NewState, Fd, MaxSz, Cntrs)}
+ end
+ end.
+
+%% The ets table Cntrs is used for counting objects per size.
+count_input(Head, Cntrs, L) when Head#head.version =:= 8 ->
+ count_input1(Cntrs, L, []);
+count_input(_Head, _Cntrs, L) ->
+ lists:reverse(L).
+
+count_input1(Cntrs, [[LogSz | B] | Ts], L) ->
+ case catch ets:update_counter(Cntrs, LogSz, 1) of
+ N when is_integer(N) -> ok;
+ _Badarg -> true = ets:insert(Cntrs, {LogSz, 1})
+ end,
+ count_input1(Cntrs, Ts, [B | L]);
+count_input1(_Cntrs, [], L) ->
+ L.
+
+fsck_read(Pos, F, L, Seq) ->
+ case file:position(F, Pos) of
+ {ok, _} ->
+ read_more_bytes([], 0, Pos, F, L, Seq);
+ _Error ->
+ {done, L, Seq}
+ end.
+
+fsck_read({more, Bin, Sz, L, Seq}, Pos, F, MaxSz, Head) when Sz > MaxSz ->
+ FR = skip_bytes(Bin, ?BUMP, Head#head.keypos, Head, L, Seq),
+ fsck_read(FR, Pos, F, MaxSz, Head);
+fsck_read({more, Bin, Sz, L, Seq}, Pos, F, _MaxSz, _Head) ->
+ read_more_bytes(Bin, Sz, Pos, F, L, Seq);
+fsck_read({new, Skip, L, Seq}, Pos, F, _MaxSz, _Head) ->
+ NewPos = Pos + Skip,
+ fsck_read(NewPos, F, L, Seq).
+
+read_more_bytes(B, Min, Pos, F, L, Seq) ->
+ Max = if
+ Min < ?CHUNK_SIZE -> ?CHUNK_SIZE;
+ true -> Min
+ end,
+ case dets_utils:read_n(F, Max) of
+ eof ->
+ {done, L, Seq};
+ Bin ->
+ NewPos = Pos + byte_size(Bin),
+ {cont, L, list_to_binary([B, Bin]), NewPos, Seq}
+ end.
+
+fsck_objs(Bin = <<Sz:32, Status:32, Tail/binary>>, Kp, Head, L, Seq) ->
+ if
+ Status =:= ?ACTIVE ->
+ Sz1 = Sz-?OHDSZ,
+ case Tail of
+ <<BinTerm:Sz1/binary, Tail2/binary>> ->
+ case catch bin2keybins(BinTerm, Head) of
+ {'EXIT', _Reason} ->
+ %% The whole collection of objects is skipped.
+ skip_bytes(Bin, ?BUMP, Kp, Head, L, Seq);
+ BOs ->
+ {NL, NSeq} = make_objects(BOs, Seq, Kp, Head, L),
+ Skip = ?POW(sz2pos(Sz)-1) - Sz,
+ skip_bytes(Tail2, Skip, Kp, Head, NL, NSeq)
+ end;
+ _ when byte_size(Tail) < Sz1 ->
+ {more, Bin, Sz, L, Seq}
+ end;
+ true ->
+ skip_bytes(Bin, ?BUMP, Kp, Head, L, Seq)
+ end;
+fsck_objs(Bin, _Kp, _Head, L, Seq) ->
+ {more, Bin, 0, L, Seq}.
+
+make_objects([{K,BT}|Os], Seq, Kp, Head, L) when Head#head.version =:= 8 ->
+ LogSz = dets_v8:sz2pos(byte_size(BT)+?OHDSZ_v8),
+ Slot = dets_v8:db_hash(K, Head),
+ Obj = [LogSz | <<Slot:32, LogSz:8, BT/binary>>],
+ make_objects(Os, Seq, Kp, Head, [Obj | L]);
+make_objects([{K,BT} | Os], Seq, Kp, Head, L) ->
+ Obj = make_object(Head, K, Seq, BT),
+ make_objects(Os, Seq+1, Kp, Head, [Obj | L]);
+make_objects([], Seq, _Kp, _Head, L) ->
+ {L, Seq}.
+
+%% Inlined.
+make_object(Head, Key, Seq, BT) ->
+ Slot = db_hash(Key, Head),
+ <<Slot:32, Seq:32, BT/binary>>.
+
+%% Inlined.
+skip_bytes(Bin, Skip, Kp, Head, L, Seq) ->
+ case Bin of
+ <<_:Skip/binary, Tail/binary>> ->
+ fsck_objs(Tail, Kp, Head, L, Seq);
+ _ when byte_size(Bin) < Skip ->
+ {new, Skip - byte_size(Bin), L, Seq}
+ end.
+
+%%%
+%%% End of repair, conversion and initialization of a dets file.
+%%%
+
+%% -> {NewHead, ok} | throw({Head, Error})
+do_perform_save(H) ->
+ {ok, FreeListsPointer} = dets_utils:position(H, eof),
+ H1 = H#head{freelists_p = FreeListsPointer},
+ {FLW, FLSize} = free_lists_to_file(H1),
+ FileSize = FreeListsPointer + FLSize + 4,
+ ok = dets_utils:write(H1, [FLW | <<FileSize:32>>]),
+ FileHeader = file_header(H1, FreeListsPointer, ?CLOSED_PROPERLY),
+ case dets_utils:debug_mode() of
+ true ->
+ TmpHead = H1#head{freelists = init_freelist(H1, true),
+ fixed = false},
+ case
+ catch dets_utils:all_allocated_as_list(TmpHead)
+ =:= dets_utils:all_allocated_as_list(H1)
+ of
+ true ->
+ dets_utils:pwrite(H1, [{0, FileHeader}]);
+ _ ->
+ dets_utils:corrupt_reason(H1, {failed_to_save_free_lists,
+ FreeListsPointer,
+ TmpHead#head.freelists,
+ H1#head.freelists})
+ end;
+ false ->
+ dets_utils:pwrite(H1, [{0, FileHeader}])
+ end.
+
+file_header(Head, FreeListsPointer, ClosedProperly) ->
+ NoColls = case Head#head.no_collections of
+ undefined -> [];
+ NC -> NC
+ end,
+ L = orddict:merge(fun(_K, V1, V2) -> V1 + V2 end,
+ NoColls,
+ lists:map(fun(X) -> {X,0} end, lists:seq(4,?MAXBUD-1))),
+ CW = lists:map(fun({_LSz,N}) -> <<N:32>> end, L),
+ file_header(Head, FreeListsPointer, ClosedProperly, CW).
+
+file_header(Head, FreeListsPointer, ClosedProperly, NoColls) ->
+ Cookie = ?MAGIC,
+ TypeCode = dets_utils:type_to_code(Head#head.type),
+ Version = ?FILE_FORMAT_VERSION,
+ HashMethod = hash_method_to_code(Head#head.hash_bif),
+ H1 = <<FreeListsPointer:32, Cookie:32, ClosedProperly:32>>,
+ H2 = <<TypeCode:32,
+ Version:32,
+ (Head#head.m):32,
+ (Head#head.next):32,
+ (Head#head.keypos):32,
+ (Head#head.no_objects):32,
+ (Head#head.no_keys):32,
+ (Head#head.min_no_slots):32,
+ (Head#head.max_no_slots):32,
+ HashMethod:32,
+ (Head#head.n):32>>,
+ DigH = [H2 | NoColls],
+ MD5 = case Head#head.has_md5 of
+ true -> erlang:md5(DigH);
+ false -> <<0:?MD5SZ/unit:8>>
+ end,
+ [H1, DigH, MD5 | <<0:?RESERVED/unit:8>>].
+
+%% Going through some trouble to avoid creating one single binary for
+%% the free lists. If the free lists are huge, binary_to_term and
+%% term_to_binary could otherwise stop the emulator for quite some time.
+
+-define(MAXFREEOBJ, 4096).
+-define(ENDFREE, 12345).
+
+free_lists_to_file(H) ->
+ FL = dets_utils:get_freelists(H),
+ free_list_to_file(FL, H, 1, tuple_size(FL), [], 0).
+
+free_list_to_file(_Ftab, _H, Pos, Sz, Ws, WsSz) when Pos > Sz ->
+ {[Ws | <<(4+?OHDSZ):32, ?FREE:32, ?ENDFREE:32>>], WsSz+4+?OHDSZ};
+free_list_to_file(Ftab, H, Pos, Sz, Ws, WsSz) ->
+ Max = (?MAXFREEOBJ - 4 - ?OHDSZ) div 4,
+ F = fun(N, L, W, S) when N =:= 0 -> {N, L, W, S};
+ (N, L, W, S) ->
+ {L1, N1, More} =
+ if
+ N > Max ->
+ {lists:sublist(L, Max), Max,
+ {N-Max, lists:nthtail(Max, L)}};
+ true ->
+ {L, N, no_more}
+ end,
+ Size = N1*4 + 4 + ?OHDSZ,
+ Header = <<Size:32, ?FREE:32, Pos:32>>,
+ NW = [W, Header | L1],
+ case More of
+ no_more ->
+ {0, [], NW, S+Size};
+ {NN, NL} ->
+ ok = dets_utils:write(H, NW),
+ {NN, NL, [], S+Size}
+ end
+ end,
+ {NWs,NWsSz} = dets_utils:tree_to_bin(element(Pos, Ftab), F, Max, Ws, WsSz),
+ free_list_to_file(Ftab, H, Pos+1, Sz, NWs, NWsSz).
+
+free_lists_from_file(H, Pos) ->
+ dets_utils:position(H#head.fptr, H#head.filename, Pos),
+ FL = dets_utils:empty_free_lists(),
+ case catch bin_to_tree([], H, start, FL, -1, []) of
+ {'EXIT', _} ->
+ throw({error, {bad_freelists, H#head.filename}});
+ Reply ->
+ Reply
+ end.
+
+bin_to_tree(Bin, H, LastPos, Ftab, A0, L) ->
+ case Bin of
+ <<_Size:32,?FREE:32,?ENDFREE:32,_/binary>> when L =:= [] ->
+ Ftab;
+ <<_Size:32,?FREE:32,?ENDFREE:32,_/binary>> ->
+ setelement(LastPos, Ftab, dets_utils:list_to_tree(L));
+ <<Size:32,?FREE:32,Pos:32,T/binary>>
+ when byte_size(T) >= Size-4-?OHDSZ ->
+ {NFtab, L1, A1} =
+ if
+ Pos =/= LastPos, LastPos =/= start ->
+ Tree = dets_utils:list_to_tree(L),
+ {setelement(LastPos, Ftab, Tree), [], -1};
+ true ->
+ {Ftab, L, A0}
+ end,
+ {NL, B2, A2} = bin_to_tree1(T, Size-?OHDSZ-4, A1, L1),
+ bin_to_tree(B2, H, Pos, NFtab, A2, NL);
+ _ ->
+ Bin2 = dets_utils:read_n(H#head.fptr, ?MAXFREEOBJ),
+ bin_to_tree(list_to_binary([Bin | Bin2]), H, LastPos, Ftab, A0, L)
+ end.
+
+bin_to_tree1(<<A1:32,A2:32,A3:32,A4:32,T/binary>>, Size, A, L)
+ when Size >= 16, A < A1, A1 < A2, A2 < A3, A3 < A4 ->
+ bin_to_tree1(T, Size-16, A4, [A4, A3, A2, A1 | L]);
+bin_to_tree1(<<A1:32,T/binary>>, Size, A, L) when Size >= 4, A < A1 ->
+ bin_to_tree1(T, Size - 4, A1, [A1 | L]);
+bin_to_tree1(B, 0, A, L) ->
+ {L, B, A}.
+
+%% -> [term()] | throw({Head, Error})
+slot_objs(H, Slot) when Slot >= H#head.next ->
+ '$end_of_table';
+slot_objs(H, Slot) ->
+ {ok, _Pointer, Objects} = slot_objects(H, Slot),
+ Objects.
+
+%% Inlined.
+h(I, phash2) -> erlang:phash2(I); % -> [0..2^27-1]
+h(I, phash) -> erlang:phash(I, ?BIG) - 1.
+
+db_hash(Key, Head) when Head#head.hash_bif =:= phash2 ->
+ H = erlang:phash2(Key),
+ Hash = ?REM2(H, Head#head.m),
+ if
+ Hash < Head#head.n ->
+ ?REM2(H, Head#head.m2); % H rem (2 * m)
+ true ->
+ Hash
+ end;
+db_hash(Key, Head) ->
+ H = h(Key, Head#head.hash_bif),
+ Hash = H rem Head#head.m,
+ if
+ Hash < Head#head.n ->
+ H rem (Head#head.m2); % H rem (2 * m)
+ true ->
+ Hash
+ end.
+
+hash_method_to_code(phash2) -> ?PHASH2;
+hash_method_to_code(phash) -> ?PHASH.
+
+code_to_hash_method(?PHASH2) -> phash2;
+code_to_hash_method(?PHASH) -> phash;
+code_to_hash_method(_) -> undefined.
+
+no_slots(Head) ->
+ {Head#head.min_no_slots, Head#head.next, Head#head.max_no_slots}.
+
+table_parameters(Head) ->
+ case Head#head.no_collections of
+ undefined ->
+ undefined; % Version 9(a)
+ CL ->
+ NoColls0 = lists:foldl(fun({_,0}, A) -> A;
+ (E, A) -> [E | A]
+ end, [], CL),
+ NoColls = lists:reverse(NoColls0),
+ #?HASH_PARMS{file_format_version = Head#head.version,
+ bchunk_format_version = ?BCHUNK_FORMAT_VERSION,
+ file = filename:basename(Head#head.filename),
+ type = Head#head.type,
+ keypos = Head#head.keypos,
+ hash_method = hash_method_to_code(Head#head.hash_bif),
+ n = Head#head.n, m = Head#head.m,
+ next = Head#head.next,
+ min = Head#head.min_no_slots,
+ max = Head#head.max_no_slots,
+ no_objects = Head#head.no_objects,
+ no_keys = Head#head.no_keys, no_colls = NoColls}
+ end.
+
+%% Allow quite a lot when reading object collections.
+-define(MAXCOLL, (10 * ?CHUNK_SIZE)).
+
+%% Re-hashing a segment, starting with SlotStart.
+%%
+%% On the average, half of the keys of the slot are put in a new slot.
+%% If the old slot is i, then the new slot is i+m. The new slots
+%% reside in a newly allocated segment.
+%%
+%% -> {NewHead, ok} | throw({Head, Error})
+re_hash(Head, SlotStart) ->
+ FromSlotPos = slot_position(SlotStart),
+ ToSlotPos = slot_position(SlotStart + Head#head.m),
+ RSpec = [{FromSlotPos, 4 * ?SEGSZ}],
+ {ok, [FromBin]} = dets_utils:pread(RSpec, Head),
+ split_bins(FromBin, Head, FromSlotPos, ToSlotPos, [], [], 0).
+
+split_bins(<<>>, Head, _Pos1, _Pos2, _ToRead, _L, 0) ->
+ {Head, ok};
+split_bins(<<>>, Head, Pos1, Pos2, ToRead, L, _SoFar) ->
+ re_hash_write(Head, ToRead, L, Pos1, Pos2);
+split_bins(FB, Head, Pos1, Pos2, ToRead, L, SoFar) ->
+ <<Sz1:32, P1:32, FT/binary>> = FB,
+ <<B1:?OHDSZ/binary, _/binary>> = FB,
+ NSoFar = SoFar + Sz1,
+ NPos1 = Pos1 + ?SZOBJP*4,
+ NPos2 = Pos2 + ?SZOBJP*4,
+ if
+ NSoFar > ?MAXCOLL, ToRead =/= [] ->
+ {NewHead, ok} = re_hash_write(Head, ToRead, L, Pos1, Pos2),
+ split_bins(FB, NewHead, Pos1, Pos2, [], [], 0);
+ Sz1 =:= 0 ->
+ E = {skip,B1},
+ split_bins(FT, Head, NPos1, NPos2, ToRead, [E | L], NSoFar);
+ true ->
+ E = {Sz1,P1,B1,Pos1,Pos2},
+ NewToRead = [{P1,Sz1} | ToRead],
+ split_bins(FT, Head, NPos1, NPos2, NewToRead, [E | L], NSoFar)
+ end.
+
+re_hash_write(Head, ToRead, L, Pos1, Pos2) ->
+ check_pread2_arg(ToRead, Head),
+ {ok, Bins} = dets_utils:pread(ToRead, Head),
+ Z = <<0:32, 0:32>>,
+ {Head1, BinFS, BinTS, WsB} = re_hash_slots(Bins, L, Head, Z, [],[],[]),
+ WPos1 = Pos1 - ?SZOBJP*4*length(L),
+ WPos2 = Pos2 - ?SZOBJP*4*length(L),
+ ToWrite = [{WPos1,BinFS}, {WPos2, BinTS} | WsB],
+ dets_utils:pwrite(Head1, ToWrite).
+
+re_hash_slots(Bins, [{skip,B1} | L], Head, Z, BinFS, BinTS, WsB) ->
+ re_hash_slots(Bins, L, Head, Z, [B1 | BinFS], [Z | BinTS], WsB);
+re_hash_slots([FB | Bins], [E | L], Head, Z, BinFS, BinTS, WsB) ->
+ {Sz1,P1,B1,Pos1,Pos2} = E,
+ KeyObjs = case catch per_key(Head, FB) of
+ {'EXIT', _Error} ->
+ Bad = dets_utils:bad_object(re_hash_slots, {FB, E}),
+ throw(dets_utils:corrupt_reason(Head, Bad));
+ Else ->
+ Else
+ end,
+ case re_hash_split(KeyObjs, Head, [], 0, [], 0) of
+ {_KL, _KSz, [], 0} ->
+ Sz1 = _KSz + ?OHDSZ,
+ re_hash_slots(Bins, L, Head, Z, [B1 | BinFS], [Z | BinTS], WsB);
+ {[], 0, _ML, _MSz} -> %% Optimization.
+ Sz1 = _MSz + ?OHDSZ,
+ re_hash_slots(Bins, L, Head, Z, [Z | BinFS], [B1 | BinTS], WsB);
+ {KL, KSz, ML, MSz} when KL =/= [], KSz > 0, ML =/= [], MSz > 0 ->
+ {Head1, FS1, Ws1} =
+ updated(Head, P1, Sz1, KSz, Pos1, KL, true, foo, bar),
+ {NewHead, [{Pos2,Bin2}], Ws2} =
+ updated(Head1, 0, 0, MSz, Pos2, ML, true, foo, bar),
+ NewBinFS = case FS1 of
+ [{Pos1,Bin1}] -> [Bin1 | BinFS];
+ [] -> [B1 | BinFS] % cannot happen
+ end,
+ NewBinTS = [Bin2 | BinTS],
+ NewWsB = Ws2 ++ Ws1 ++ WsB,
+ re_hash_slots(Bins, L, NewHead, Z, NewBinFS, NewBinTS, NewWsB)
+ end;
+re_hash_slots([], [], Head, _Z, BinFS, BinTS, WsB) ->
+ {Head, BinFS, BinTS, lists:reverse(WsB)}.
+
+re_hash_split([E | KeyObjs], Head, KL, KSz, ML, MSz) ->
+ {Key,Sz,Bin,_Item,_Objs} = E,
+ New = h(Key, Head#head.hash_bif) rem Head#head.m2, % h(key) rem (m * 2)
+ if
+ New >= Head#head.m ->
+ re_hash_split(KeyObjs, Head, KL, KSz, [Bin | ML], MSz + Sz);
+ true ->
+ re_hash_split(KeyObjs, Head, [Bin | KL], KSz + Sz, ML, MSz)
+ end;
+re_hash_split([], _Head, KL, KSz, ML, MSz) ->
+ {lists:reverse(KL), KSz, lists:reverse(ML), MSz}.
+
+%% -> {NewHead, [LookedUpObject], pwrite_list()} | throw({NewHead, Error})
+write_cache(Head) ->
+ C = Head#head.cache,
+ case dets_utils:is_empty_cache(C) of
+ true -> {Head, [], []};
+ false ->
+ {NewC, MaxInserts, PerKey} = dets_utils:reset_cache(C),
+ %% MaxNoInsertedKeys is an upper limit on the number of new keys.
+ MaxNoInsertedKeys = erlang:min(MaxInserts, length(PerKey)),
+ Head1 = Head#head{cache = NewC},
+ case may_grow(Head1, MaxNoInsertedKeys, once) of
+ {Head2, ok} ->
+ eval_work_list(Head2, PerKey);
+ HeadError ->
+ throw(HeadError)
+ end
+ end.
+
+%% -> {NewHead, ok} | {NewHead, Error}
+may_grow(Head, _N, _How) when Head#head.fixed =/= false ->
+ {Head, ok};
+may_grow(#head{access = read}=Head, _N, _How) ->
+ {Head, ok};
+may_grow(Head, _N, _How) when Head#head.next >= Head#head.max_no_slots ->
+ {Head, ok};
+may_grow(Head, N, How) ->
+ Extra = erlang:min(2*?SEGSZP, Head#head.no_keys + N - Head#head.next),
+ case catch may_grow1(Head, Extra, How) of
+ {error, _Reason} = Error -> % alloc may throw error
+ dets_utils:corrupt(Head, Error);
+ {NewHead, Reply} when is_record(Head, head) ->
+ {NewHead, Reply}
+ end.
+
+may_grow1(Head, Extra, many_times) when Extra > ?SEGSZP ->
+ Reply = grow(Head, 1, undefined),
+ self() ! ?DETS_CALL(self(), may_grow),
+ Reply;
+may_grow1(Head, Extra, _How) ->
+ grow(Head, Extra, undefined).
+
+%% -> {Head, ok} | throw({Head, Error})
+grow(Head, Extra, _SegZero) when Extra =< 0 ->
+ {Head, ok};
+grow(Head, Extra, undefined) ->
+ grow(Head, Extra, seg_zero());
+grow(Head, _Extra, _SegZero) when Head#head.next >= Head#head.max_no_slots ->
+ {Head, ok};
+grow(Head, Extra, SegZero) ->
+ #head{n = N, next = Next, m = M} = Head,
+ SegNum = Next div ?SEGSZP,
+ {Head0, W, Ws1} = allocate_segment(Head, SegZero, SegNum),
+ %% re_hash/2 will overwrite the segment, but initialize it anyway...
+ {Head1, ok} = dets_utils:pwrite(Head0, [W | Ws1]),
+ %% If re_hash fails, segp_cache has been called, but it does not matter.
+ {Head2, ok} = re_hash(Head1, N),
+ NewHead =
+ if
+ N + ?SEGSZP =:= M ->
+ Head2#head{n = 0, next = Next + ?SEGSZP, m = 2 * M, m2 = 4 * M};
+ true ->
+ Head2#head{n = N + ?SEGSZP, next = Next + ?SEGSZP}
+ end,
+ true = hash_invars(NewHead),
+ grow(NewHead, Extra - ?SEGSZP, SegZero).
+
+hash_invars(H) ->
+ hash_invars(H#head.n, H#head.m, H#head.next, H#head.min_no_slots,
+ H#head.max_no_slots).
+
+-define(M8(X), (((X) band (?SEGSZP - 1)) =:= 0)).
+hash_invars(N, M, Next, Min, Max) ->
+ ?M8(N) and ?M8(M) and ?M8(Next) and ?M8(Min) and ?M8(Max)
+ and (0 =< N) and (N =< M) and (N =< 2*Next) and (M =< Next)
+ and (Next =< 2*M) and (0 =< Min) and (Min =< Next) and (Next =< Max)
+ and (Min =< M).
+
+seg_zero() ->
+ <<0:(4*?SEGSZ)/unit:8>>.
+
+find_object(Head, Object) ->
+ Key = element(Head#head.keypos, Object),
+ Slot = db_hash(Key, Head),
+ find_object(Head, Object, Slot).
+
+find_object(H, _Obj, Slot) when Slot >= H#head.next ->
+ false;
+find_object(H, Obj, Slot) ->
+ case catch slot_objects(H, Slot) of
+ {ok, Pointer, Objects} ->
+ case lists:member(Obj, Objects) of
+ true -> {ok, Pointer};
+ false -> false
+ end;
+ _ -> false
+ end.
+
+%% -> {ok, BucketP, Objects} | throw({Head, Error})
+slot_objects(Head, Slot) ->
+ SlotPos = slot_position(Slot),
+ MaxSize = maxobjsize(Head),
+ case dets_utils:ipread(Head, SlotPos, MaxSize) of
+ {ok, {BucketSz, Pointer, <<BucketSz:32, _St:32, KeysObjs/binary>>}} ->
+ case catch bin2objs(KeysObjs, Head#head.type, []) of
+ {'EXIT', _Error} ->
+ Bad = dets_utils:bad_object(slot_objects,
+ {SlotPos, KeysObjs}),
+ throw(dets_utils:corrupt_reason(Head, Bad));
+ Objs when is_list(Objs) ->
+ {ok, Pointer, lists:reverse(Objs)}
+ end;
+ [] ->
+ {ok, 0, []};
+ BadRead -> % eof or bad badly formed binary
+ Bad = dets_utils:bad_object(slot_objects, {SlotPos, BadRead}),
+ throw(dets_utils:corrupt_reason(Head, Bad))
+ end.
+
+%%%
+%%% Cache routines depending on the dets file format.
+%%%
+
+%% -> {Head, [LookedUpObject], pwrite_list()} | throw({Head, Error})
+eval_work_list(Head, [{Key,[{_Seq,{lookup,Pid}}]}]) ->
+ SlotPos = slot_position(db_hash(Key, Head)),
+ MaxSize = maxobjsize(Head),
+ Objs = case dets_utils:ipread(Head, SlotPos, MaxSize) of
+ {ok, {_BucketSz, _Pointer, Bin}} ->
+ case catch per_key(Head, Bin) of
+ {'EXIT', _Error} ->
+ Bad = dets_utils:bad_object(eval_work_list,
+ {SlotPos, Bin}),
+ throw(dets_utils:corrupt_reason(Head, Bad));
+ KeyObjs when is_list(KeyObjs) ->
+ case dets_utils:mkeysearch(Key, 1, KeyObjs) of
+ false ->
+ [];
+ {value, {Key,_KS,_KB,O,Os}} ->
+ case catch binobjs2terms(Os) of
+ {'EXIT', _Error} ->
+ Bad = dets_utils:bad_object
+ (eval_work_list,
+ {SlotPos, Bin, KeyObjs}),
+ throw(dets_utils:corrupt_reason
+ (Head, Bad));
+ Terms when is_list(Terms) ->
+ get_objects([O | Terms])
+ end
+ end
+ end;
+ [] ->
+ [];
+ BadRead -> % eof or bad badly formed binary
+ Bad = dets_utils:bad_object(eval_work_list,
+ {SlotPos, BadRead}),
+ throw(dets_utils:corrupt_reason(Head, Bad))
+ end,
+ {Head, [{Pid,Objs}], []};
+eval_work_list(Head, PerKey) ->
+ SWLs = tag_with_slot(PerKey, Head, []),
+ P1 = dets_utils:family(SWLs),
+ {PerSlot, SlotPositions} = remove_slot_tag(P1, [], []),
+ {ok, Bins} = dets_utils:pread(SlotPositions, Head),
+ read_buckets(PerSlot, SlotPositions, Bins, Head, [], [], [], [], 0, 0, 0).
+
+tag_with_slot([{K,_} = WL | WLs], Head, L) ->
+ tag_with_slot(WLs, Head, [{db_hash(K, Head), WL} | L]);
+tag_with_slot([], _Head, L) ->
+ L.
+
+remove_slot_tag([{S,SWLs} | SSWLs], Ls, SPs) ->
+ remove_slot_tag(SSWLs, [SWLs | Ls], [{slot_position(S), ?SEGOBJSZ} | SPs]);
+remove_slot_tag([], Ls, SPs) ->
+ {Ls, SPs}.
+
+read_buckets([WLs | SPs], [{P1,_8} | Ss], [<<_Zero:32,P2:32>> | Bs], Head,
+ PWLs, ToRead, LU, Ws, NoObjs, NoKeys, SoFar) when P2 =:= 0 ->
+ {NewHead, NLU, NWs, No, KNo} =
+ eval_bucket_keys(WLs, P1, 0, 0, [], Head, Ws, LU),
+ NewNoObjs = No + NoObjs,
+ NewNoKeys = KNo + NoKeys,
+ read_buckets(SPs, Ss, Bs, NewHead, PWLs, ToRead, NLU, NWs,
+ NewNoObjs, NewNoKeys, SoFar);
+read_buckets([WorkLists| SPs], [{P1,_8} | Ss], [<<Size:32,P2:32>> | Bs], Head,
+ PWLs, ToRead, LU, Ws, NoObjs, NoKeys, SoFar)
+ when SoFar + Size < ?MAXCOLL; ToRead =:= [] ->
+ NewToRead = [{P2, Size} | ToRead],
+ NewPWLs = [{P2,P1,WorkLists} | PWLs],
+ NewSoFar = SoFar + Size,
+ read_buckets(SPs, Ss, Bs, Head, NewPWLs, NewToRead, LU, Ws,
+ NoObjs, NoKeys, NewSoFar);
+read_buckets(SPs, Ss, Bs, Head, PWLs0, ToRead0, LU, Ws, NoObjs, NoKeys, SoFar)
+ when SoFar > 0 ->
+ %% It pays off to sort the positions. The seek times are reduced,
+ %% at least if the file blocks are reasonably contiguous, as is
+ %% often the case.
+ PWLs = lists:keysort(1, PWLs0),
+ ToRead = lists:keysort(1, ToRead0),
+ check_pread2_arg(ToRead, Head),
+ {ok, Bins} = dets_utils:pread(ToRead, Head),
+ case catch eval_buckets(Bins, PWLs, Head, LU, Ws, 0, 0) of
+ {ok, NewHead, NLU, [], 0, 0} ->
+ read_buckets(SPs, Ss, Bs, NewHead, [], [], NLU, [],
+ NoObjs, NoKeys, 0);
+ {ok, Head1, NLU, NWs, No, KNo} ->
+ NewNoObjs = NoObjs + No,
+ NewNoKeys = NoKeys + KNo,
+ %% It does not seem to reduce seek times to sort positions
+ %% when writing (maybe because it takes several calls to
+ %% write_cache/1 to fill the file system's buffer cache).
+ {NewHead, ok} = dets_utils:pwrite(Head1, lists:reverse(NWs)),
+ read_buckets(SPs, Ss, Bs, NewHead, [], [], NLU, [],
+ NewNoObjs, NewNoKeys, 0);
+ Error ->
+ Bad = dets_utils:bad_object(read_buckets, {Bins, Error}),
+ throw(dets_utils:corrupt_reason(Head, Bad))
+ end;
+read_buckets([], [], [], Head, [], [], LU, Ws, NoObjs, NoKeys, 0) ->
+ {NewHead, NWs} = update_no_keys(Head, Ws, NoObjs, NoKeys),
+ {NewHead, LU, lists:reverse(NWs)}.
+
+eval_buckets([Bin | Bins], [SP | SPs], Head, LU, Ws, NoObjs, NoKeys) ->
+ {Pos, P1, WLs} = SP,
+ KeyObjs = per_key(Head, Bin),
+ {NewHead, NLU, NWs, No, KNo} =
+ eval_bucket_keys(WLs, P1, Pos, byte_size(Bin), KeyObjs, Head,Ws,LU),
+ eval_buckets(Bins, SPs, NewHead, NLU, NWs, NoObjs + No, NoKeys + KNo);
+eval_buckets([], [], Head, LU, Ws, NoObjs, NoKeys) ->
+ {ok, Head, LU, Ws, NoObjs, NoKeys}.
+
+eval_bucket_keys(WLs, SlotPos, Pos, OldSize, KeyObjs, Head, Ws, LU) ->
+ {NLU, Bins, BSize, No, KNo, Ch} =
+ eval_slot(WLs, KeyObjs, Head#head.type, LU, [], 0, 0, 0, false),
+ {NewHead, W1, W2} =
+ updated(Head, Pos, OldSize, BSize, SlotPos, Bins, Ch, No, KNo),
+ {NewHead, NLU, W2++W1++Ws, No, KNo}.
+
+updated(Head, Pos, OldSize, BSize, SlotPos, Bins, Ch, DeltaNoOs, DeltaNoKs) ->
+ BinsSize = BSize + ?OHDSZ,
+ if
+ Pos =:= 0, BSize =:= 0 ->
+ {Head, [], []};
+ Pos =:= 0, BSize > 0 ->
+ {Head1, NewPos, FPos} = dets_utils:alloc(Head, adjsz(BinsSize)),
+ NewHead = one_bucket_added(Head1, FPos-1),
+ W1 = {NewPos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
+ W2 = {SlotPos, <<BinsSize:32, NewPos:32>>},
+ {NewHead, [W2], [W1]};
+ Pos =/= 0, BSize =:= 0 ->
+ {Head1, FPos} = dets_utils:free(Head, Pos, adjsz(OldSize)),
+ NewHead = one_bucket_removed(Head1, FPos-1),
+ W1 = {Pos+?STATUS_POS, <<?FREE:32>>},
+ W2 = {SlotPos, <<0:32, 0:32>>},
+ {NewHead, [W2], [W1]};
+ Pos =/= 0, BSize > 0, Ch =:= false ->
+ {Head, [], []};
+ Pos =/= 0, BSize > 0 ->
+ %% Doubtful. The scan function has to be careful since
+ %% partly scanned objects may be overwritten.
+ Overwrite0 = if
+ OldSize =:= BinsSize -> same;
+ true -> sz2pos(OldSize) =:= sz2pos(BinsSize)
+ end,
+ Overwrite = if
+ Head#head.fixed =/= false ->
+ %% Make sure that if the table is
+ %% fixed, nothing is overwritten,
+ %% unless the number of objects and
+ %% the number of keys remain the same.
+ %% This is used by bchunk, which
+ %% assumes that it traverses exactly
+ %% the same number of objects and keys
+ %% (and collections) as were present
+ %% when chunking started (the table
+ %% must have been fixed).
+ (Overwrite0 =/= false) and
+ (DeltaNoOs =:= 0) and (DeltaNoKs =:= 0);
+ true ->
+ Overwrite0
+ end,
+ if
+ Overwrite =:= same ->
+ W1 = {Pos+?OHDSZ, Bins},
+ {Head, [], [W1]};
+ Overwrite ->
+ W1 = {Pos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
+ %% Pos is already there, but return {SlotPos, <8 bytes>}.
+ W2 = {SlotPos, <<BinsSize:32, Pos:32>>},
+ {Head, [W2], [W1]};
+ true ->
+ {Head1, FPosF} = dets_utils:free(Head, Pos, adjsz(OldSize)),
+ {Head2, NewPos, FPosA} =
+ dets_utils:alloc(Head1, adjsz(BinsSize)),
+ Head3 = one_bucket_added(Head2, FPosA-1),
+ NewHead = one_bucket_removed(Head3, FPosF-1),
+ W0 = {NewPos, [<<BinsSize:32, ?ACTIVE:32>> | Bins]},
+ W2 = {SlotPos, <<BinsSize:32, NewPos:32>>},
+ W1 = if
+ Pos =/= NewPos ->
+ %% W0 first.
+ [W0, {Pos+?STATUS_POS, <<?FREE:32>>}];
+ true ->
+ [W0]
+ end,
+ {NewHead, [W2], W1}
+ end
+ end.
+
+one_bucket_added(H, _Log2) when H#head.no_collections =:= undefined ->
+ H;
+one_bucket_added(H, Log2) when H#head.maxobjsize >= Log2 ->
+ NewNoColls = orddict:update_counter(Log2, 1, H#head.no_collections),
+ H#head{no_collections = NewNoColls};
+one_bucket_added(H, Log2) ->
+ NewNoColls = orddict:update_counter(Log2, 1, H#head.no_collections),
+ H#head{no_collections = NewNoColls, maxobjsize = Log2}.
+
+one_bucket_removed(H, _FPos) when H#head.no_collections =:= undefined ->
+ H;
+one_bucket_removed(H, Log2) when H#head.maxobjsize > Log2 ->
+ NewNoColls = orddict:update_counter(Log2, -1, H#head.no_collections),
+ H#head{no_collections = NewNoColls};
+one_bucket_removed(H, Log2) when H#head.maxobjsize =:= Log2 ->
+ NewNoColls = orddict:update_counter(Log2, -1, H#head.no_collections),
+ MaxObjSize = max_objsize(NewNoColls),
+ H#head{no_collections = NewNoColls, maxobjsize = MaxObjSize}.
+
+eval_slot([{Key,Commands} | WLs] = WLs0, [{K,KS,KB,O,Os} | KOs1]=KOs,
+ Type, LU, Ws, No, KNo,BSz, Ch) ->
+ case dets_utils:cmp(K, Key) of
+ 0 ->
+ Old = [O | binobjs2terms(Os)],
+ {NLU, NWs, Sz, No1, KNo1, NCh} =
+ eval_key(Key, Commands, Old, Type, KB, KS, LU, Ws, Ch),
+ eval_slot(WLs, KOs1, Type, NLU, NWs, No1 + No,
+ KNo1 + KNo, Sz + BSz, NCh);
+ -1 ->
+ eval_slot(WLs0, KOs1, Type, LU, [Ws | KB], No,
+ KNo, KS + BSz, Ch);
+ 1 ->
+ {NLU, NWs, Sz, No1, KNo1, NCh} =
+ eval_key(Key, Commands, [], Type, [], 0, LU, Ws, Ch),
+ eval_slot(WLs, KOs, Type, NLU, NWs, No1 + No,
+ KNo1 + KNo, Sz + BSz, NCh)
+ end;
+eval_slot([{Key,Commands} | WLs], [], Type, LU, Ws, No, KNo,BSz, Ch) ->
+ {NLU, NWs, Sz, No1, KNo1, NCh} =
+ eval_key(Key, Commands, [], Type, [], 0, LU, Ws, Ch),
+ eval_slot(WLs, [], Type, NLU, NWs, No1 + No, KNo1 + KNo, Sz + BSz, NCh);
+eval_slot([], [{_Key,Size,KeyBin,_,_} | KOs], Type, LU, Ws, No, KNo,BSz, Ch) ->
+ eval_slot([], KOs, Type, LU, [Ws | KeyBin], No, KNo, Size + BSz, Ch);
+eval_slot([], [], _Type, LU, Ws, No, KNo, BSz, Ch) ->
+ {LU, Ws, BSz, No, KNo, Ch}.
+
+eval_key(_K, [{_Seq,{lookup,Pid}}], [], _Type, _KeyBin, _KeySz, LU, Ws, Ch) ->
+ NLU = [{Pid, []} | LU],
+ {NLU, Ws, 0, 0, 0, Ch};
+eval_key(_K, [{_Seq,{lookup,Pid}}], Old0, _Type, KeyBin, KeySz, LU, Ws, Ch) ->
+ Old = lists:keysort(2, Old0), % sort on sequence number
+ Objs = get_objects(Old),
+ NLU = [{Pid, Objs} | LU],
+ {NLU, [Ws | KeyBin], KeySz, 0, 0, Ch};
+eval_key(K, Comms, Orig, Type, KeyBin, KeySz, LU, Ws, Ch) ->
+ Old = dets_utils:msort(Orig),
+ case eval_key1(Comms, [], Old, Type, K, LU, Ws, 0, Orig) of
+ {ok, NLU} when Old =:= [] ->
+ {NLU, Ws, 0, 0, 0, Ch};
+ {ok, NLU} ->
+ {NLU, [Ws | KeyBin], KeySz, 0, 0, Ch};
+ {NLU, NWs, NSz, No} when Old =:= [], NSz > 0 ->
+ {NLU, NWs, NSz, No, 1, true};
+ {NLU, NWs, NSz, No} when Old =/= [], NSz =:= 0 ->
+ {NLU, NWs, NSz, No, -1, true};
+ {NLU, NWs, NSz, No} ->
+ {NLU, NWs, NSz, No, 0, true}
+ end.
+
+%% First find 'delete_key' and 'lookup' commands, and handle the 'set' type.
+eval_key1([{_Seq,{insert,Term}} | L], Cs, [{Term,_,_}] = Old, Type=set, K,
+ LU, Ws, No, Orig) ->
+ eval_key1(L, Cs, Old, Type, K, LU, Ws, No, Orig);
+eval_key1([{Seq,{insert,Term}} | L], Cs, Old, Type=set, K, LU, Ws, No, Orig)
+ ->
+ NNo = No + 1 - length(Old),
+ eval_key1(L, Cs, [{Term,Seq,insert}], Type, K, LU, Ws, NNo, Orig);
+eval_key1([{_Seq,{lookup,Pid}} | L], Cs, Old, Type, Key, LU, Ws, No, Orig) ->
+ {ok, New0, NewNo} = eval_comms(Cs, Old, Type, No),
+ New = lists:keysort(2, New0), % sort on sequence number
+ Objs = get_objects(New),
+ NLU = [{Pid, Objs} | LU],
+ if
+ L =:= [] ->
+ eval_end(New, NLU, Type, Ws, NewNo, Orig);
+ true ->
+ NewOld = dets_utils:msort(New),
+ eval_key1(L, [], NewOld, Type, Key, NLU, Ws, NewNo, Orig)
+ end;
+eval_key1([{_Seq,delete_key} | L], _Cs, Old, Type, K, LU, Ws, No, Orig) ->
+ NewNo = No - length(Old),
+ eval_key1(L, [], [], Type, K, LU, Ws, NewNo, Orig);
+eval_key1([{_Seq,{delete_object,Term}} | L], Cs, [{Term,_,_}], Type=set, K,
+ LU, Ws, No, Orig) ->
+ eval_key1(L, Cs, [], Type, K, LU, Ws, No-1, Orig);
+eval_key1([{_Seq,{delete_object,_T}}| L], Cs, Old1, Type=set, K, LU,
+ Ws, No, Orig) ->
+ eval_key1(L, Cs, Old1, Type, K, LU, Ws, No, Orig);
+eval_key1([{Seq,{Comm,Term}} | L], Cs, Old, Type, K, LU, Ws, No, Orig)
+ when Type =/= set ->
+ eval_key1(L, [{Term,Seq,Comm} | Cs], Old, Type, K, LU, Ws, No, Orig);
+eval_key1([], Cs, Old, Type=set, _Key, LU, Ws, No, Orig) ->
+ [] = Cs,
+ eval_end(Old, LU, Type, Ws, No, Orig);
+eval_key1([], Cs, Old, Type, _Key, LU, Ws, No, Orig) ->
+ {ok, New, NewNo} = eval_comms(Cs, Old, Type, No),
+ eval_end(New, LU, Type, Ws, NewNo, Orig).
+
+eval_comms([], L, _Type=set, No) ->
+ {ok, L, No};
+eval_comms(Cs, Old, Type, No) ->
+ Commands = dets_utils:msort(Cs),
+ case Type of
+ bag -> eval_bag(Commands, Old, [], No);
+ duplicate_bag -> eval_dupbag(Commands, Old, [], No)
+ end.
+
+eval_end(New0, LU, Type, Ws, NewNo, Orig) ->
+ New = lists:keysort(2, New0), % sort on sequence number
+ NoChange = if
+ length(New) =/= length(Orig) -> false;
+ true ->
+ same_terms(Orig, New)
+ end,
+ if
+ NoChange ->
+ %% The key's objects have not changed.
+ {ok, LU};
+ New =:= [] ->
+ {LU, Ws, 0, NewNo};
+ true ->
+ {Ws1, Sz} = make_bins(New, [], 0),
+ if
+ Type =:= set ->
+ {LU, [Ws | Ws1], Sz, NewNo};
+ true ->
+ NSz = Sz + 4,
+ {LU, [Ws, <<NSz:32>> | Ws1], NSz, NewNo}
+ end
+ end.
+
+same_terms([E1 | L1], [E2 | L2]) when element(1, E1) =:= element(1, E2) ->
+ same_terms(L1, L2);
+same_terms([], []) ->
+ true;
+same_terms(_L1, _L2) ->
+ false.
+
+make_bins([{_Term,_Seq,B} | L], W, Sz) when is_binary(B) ->
+ make_bins(L, [W | B], Sz + byte_size(B));
+make_bins([{Term,_Seq,insert} | L], W, Sz) ->
+ B = term_to_binary(Term),
+ BSize = byte_size(B) + 4,
+ make_bins(L, [W, [<<BSize:32>> | B]], Sz + BSize);
+make_bins([], W, Sz) ->
+ {W, Sz}.
+
+get_objects([{T,_S,_BT} | L]) ->
+ [T | get_objects(L)];
+get_objects([]) ->
+ [].
+
+eval_bag([{Term1,_S1,Op}=N | L]=L0, [{Term2,_,_}=O | Old]=Old0, New, No) ->
+ case {Op, dets_utils:cmp(Term1, Term2)} of
+ {delete_object, -1} ->
+ eval_bag(L, Old0, New, No);
+ {insert, -1} ->
+ bag_object(L, Old0, New, No, [N], Term1);
+ {delete_object, 0} ->
+ bag_object(L, Old, New, No-1, [], Term1);
+ {insert, 0} ->
+ bag_object(L, Old, New, No-1, [N], Term1);
+ {_, 1} ->
+ eval_bag(L0, Old, [O | New], No)
+ end;
+eval_bag([{_Term1,_Seq1,delete_object} | L], []=Old, New, No) ->
+ eval_bag(L, Old, New, No);
+eval_bag([{Term,_Seq1,insert} = N | L], []=Old, New, No) ->
+ bag_object(L, Old, New, No, [N], Term);
+eval_bag([]=L, [O | Old], New, No) ->
+ eval_bag(L, Old, [O | New], No);
+eval_bag([], [], New, No) ->
+ {ok, New, No}.
+
+bag_object([{Term,_,insert} = N | L], Old, New, No, _N, Term) ->
+ bag_object(L, Old, New, No, [N], Term);
+bag_object([{Term,_,delete_object} | L], Old, New, No, _N, Term) ->
+ bag_object(L, Old, New, No, [], Term);
+bag_object(L, Old, New, No, [], _Term) ->
+ eval_bag(L, Old, New, No);
+bag_object(L, Old, New, No, [N], _Term) ->
+ eval_bag(L, Old, [N | New], No+1).
+
+eval_dupbag([{Term1,_S1,Op}=N | L]=L0, [{Term2,_,_}=O | Old]=Old0, New, No) ->
+ case {Op, dets_utils:cmp(Term1, Term2)} of
+ {delete_object, -1} ->
+ eval_dupbag(L, Old0, New, No);
+ {insert, -1} ->
+ dup_object(L, Old0, New, No+1, Term1, [N]);
+ {_, 0} ->
+ old_dup_object(L0, Old, New, No, Term1, [O]);
+ {_, 1} ->
+ eval_dupbag(L0, Old, [O | New], No)
+ end;
+eval_dupbag([{_Term1,_Seq1,delete_object} | L], []=Old, New, No) ->
+ eval_dupbag(L, Old, New, No);
+eval_dupbag([{Term,_Seq1,insert} = N | L], []=Old, New, No) ->
+ dup_object(L, Old, New, No+1, Term, [N]);
+eval_dupbag([]=L, [O | Old], New, No) ->
+ eval_dupbag(L, Old, [O | New], No);
+eval_dupbag([], [], New, No) ->
+ {ok, New, No}.
+
+old_dup_object(L, [{Term,_,_} = Obj | Old], New, No, Term, N) ->
+ old_dup_object(L, Old, New, No, Term, [Obj | N]);
+old_dup_object(L, Old, New, No, Term, N) ->
+ dup_object(L, Old, New, No, Term, N).
+
+dup_object([{Term,_,insert} = Obj | L], Old, New, No, Term, Q) ->
+ dup_object(L, Old, New, No+1, Term, [Obj | Q]);
+dup_object([{Term,_Seq,delete_object} | L], Old, New, No, Term, Q) ->
+ %% All objects are deleted.
+ NewNo = No - length(Q),
+ dup_object(L, Old, New, NewNo, Term, []);
+dup_object(L, Old, New, No, _Term, Q) ->
+ eval_dupbag(L, Old, Q ++ New, No).
+
+%% Update no_keys on the file too, if the number of segments that
+%% dets:fsck/6 uses for estimate has changed.
+update_no_keys(Head, Ws, 0, 0) -> {Head, Ws};
+update_no_keys(Head, Ws, DeltaObjects, DeltaKeys) ->
+ NoKeys = Head#head.no_keys,
+ NewNoKeys = NoKeys + DeltaKeys,
+ NewNoObject = Head#head.no_objects + DeltaObjects,
+ NewHead = Head#head{no_objects = NewNoObject, no_keys = NewNoKeys},
+ NWs =
+ if
+ NewNoKeys > NewHead#head.max_no_slots ->
+ Ws;
+ NoKeys div ?SEGSZP =:= NewNoKeys div ?SEGSZP ->
+ Ws;
+ true ->
+ [{0, file_header(NewHead, 0, ?NOT_PROPERLY_CLOSED)} | Ws]
+ end,
+ {NewHead, NWs}.
+
+slot_position(S) ->
+ SegNo = ?SLOT2SEG(S), % S div ?SEGSZP
+ PartPos = ?SEGARRADDR(?SEG2SEGARRPART(SegNo)), % SegNo div ?SEGPARTSZ
+ Part = get_arrpart(PartPos),
+ Pos = ?SEGPARTADDR(Part, SegNo),
+ get_segp(Pos) + (?SEGOBJSZ * ?REM2(S, ?SEGSZP)).
+
+check_pread2_arg([{_Pos,Sz}], Head) when Sz > ?MAXCOLL ->
+ case check_pread_arg(Sz, Head) of
+ true ->
+ ok;
+ false ->
+ Bad = dets_utils:bad_object(check_pread2_arg, Sz),
+ throw(dets_utils:corrupt_reason(Head, Bad))
+ end;
+check_pread2_arg(_ToRead, _Head) ->
+ ok.
+
+check_pread_arg(Sz, Head) when Sz > ?MAXCOLL ->
+ maxobjsize(Head) >= Sz;
+check_pread_arg(_Sz, _Head) ->
+ true.
+
+%% Inlined.
+segp_cache(Pos, Segment) ->
+ put(Pos, Segment).
+
+%% Inlined.
+get_segp(Pos) ->
+ get(Pos).
+
+arrpart_cache(Pos, ArrPart) ->
+ put(Pos, ArrPart).
+
+%% Inlined.
+get_arrpart(Pos) ->
+ get(Pos).
+
+sz2pos(N) ->
+ 1 + dets_utils:log2(N).
+
+%% Inlined. Compensates for the bug in dets_utils:sz2pos/1.
+adjsz(N) ->
+ N-1.
+
+%% Inlined.
+maxobjsize(Head) when Head#head.maxobjsize =:= undefined ->
+ ?POW(32);
+maxobjsize(Head) ->
+ ?POW(Head#head.maxobjsize).
+
+scan_objs(Head, Bin, From, To, L, Ts, R, Type) ->
+ case catch scan_skip(Bin, From, To, L, Ts, R, Type, 0) of
+ {'EXIT', _Reason} ->
+ bad_object;
+ Reply = {more, _From1, _To, _L, _Ts, _R, Size} when Size > ?MAXCOLL ->
+ case check_pread_arg(Size, Head) of
+ true -> Reply;
+ false -> bad_object
+ end;
+ Reply ->
+ Reply
+ end.
+
+scan_skip(Bin, From, To, L, Ts, R, Type, Skip) ->
+ From1 = From + Skip,
+ case Bin of
+ _ when From1 >= To ->
+ if
+ From1 > To; L =:= <<>> ->
+ {more, From1, To, L, Ts, R, 0};
+ true ->
+ <<From2:32, To1:32, L1/binary>> = L,
+ Skip1 = From2 - From,
+ scan_skip(Bin, From, To1, L1, Ts, R, Type, Skip1)
+ end;
+ <<_:Skip/binary, _Size:32, St:32, _Sz:32, KO/binary>>
+ when St =/= ?ACTIVE, St =/= ?FREE ->
+ %% Neither ?ACTIVE nor ?FREE is a multiple of ?BUMP and
+ %% thus cannot be found in segments or segment array
+ %% parts.
+ scan_skip(KO, From1+12, To, L, Ts, R, Type, ?ACTUAL_SEG_SIZE-12);
+ <<_:Skip/binary, Size:32, _St:32, Sz:32, KO/binary>>
+ when Size-12 =< byte_size(KO) ->
+ %% St = ?FREE means that the object was deleted after
+ %% scanning started
+ bin2bins(KO, From1+12, To, L, Ts, R, Type, Size, Sz);
+ <<_:Skip/binary, Size:32, _St:32, _Sz:32, _KO/binary>> ->
+ {more, From1, To, L, Ts, R, Size};
+ _ when Skip >= 0 ->
+ {more, From1, To, L, Ts, R, 0}
+ end.
+
+%% Appends objects in reversed order. All objects of the slot are
+%% extracted. Note that binary_to_term/1 ignores garbage at the end.
+bin2bins(Bin, From, To, L, Ts, R, Type=set, Size, ObjSz0) ->
+ ObjsSz1 = Size - ObjSz0,
+ if
+ ObjsSz1 =:= ?OHDSZ ->
+ slot_end(Bin, From, To, L, [Bin | Ts], R, Type, Size, 1);
+ true ->
+ ObjSz = ObjSz0-4,
+ <<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
+ bins_set(T, From, To, L, [Bin | Ts], R, Type, Size, 2,
+ NObjSz, ObjsSz1-NObjSz, Bin)
+ end;
+bin2bins(<<ObjSz:32, Bin/binary>> = KO, From, To, L, Ts, R, Type, Size, Sz) ->
+ bins_bag(Bin, From, To, L, Ts, R, Type, Size, 1,
+ Sz-ObjSz-4, ObjSz-4, Size-Sz, KO).
+
+bins_set(Bin, From, To, L, Ts, R, Type, Size, NoObjs, _ObjSz0, ?OHDSZ, KO) ->
+ slot_end(KO, From, To, L, [Bin | Ts], R, Type, Size, NoObjs);
+bins_set(Bin, From, To, L, Ts, R, Type, Size, NoObjs, ObjSz0, ObjsSz, KO) ->
+ ObjSz = ObjSz0 - 4,
+ <<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
+ bins_set(T, From, To, L, [Bin | Ts], R, Type, Size, NoObjs + 1,
+ NObjSz, ObjsSz-NObjSz, KO).
+
+bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, Sz, ObjSz, ObjsSz, KO)
+ when Sz > 0 ->
+ <<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
+ bins_bag(T, From, To, L, [Bin | Ts], R, Type, Size, NoObjs + 1,
+ Sz-NObjSz, NObjSz-4, ObjsSz, KO);
+bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, _Z, _ObjSz, ?OHDSZ, KO) ->
+ slot_end(KO, From, To, L, [Bin | Ts], R, Type, Size, NoObjs);
+bins_bag(Bin, From, To, L, Ts, R, Type, Size, NoObjs, _Z, ObjSz, ObjsSz, KO) ->
+ <<_:ObjSz/binary, Sz:32, NObjSz:32, T/binary>> = Bin,
+ bins_bag(T, From, To, L, [Bin | Ts], R, Type, Size, NoObjs + 1,
+ Sz-NObjSz-4, NObjSz-4, ObjsSz-Sz, KO).
+
+slot_end(KO, From, To, L, Ts, R, Type, Size, NoObjs) ->
+ Skip = ?POW(dets_utils:log2(Size)) - 12, % expensive...
+ if
+ R >= 0 ->
+ scan_skip(KO, From, To, L, Ts, R+Size, Type, Skip);
+ true ->
+ %% Should check this at the end of every key.
+ case R + NoObjs of
+ R1 when R1 >= -1 ->
+ From1 = From + Skip,
+ Bin1 = case KO of
+ <<_:Skip/binary, B/binary>> -> B;
+ _ -> <<>>
+ end,
+ {stop, Bin1, From1, To, L, Ts};
+ R1 ->
+ scan_skip(KO, From, To, L, Ts, R1, Type, Skip)
+ end
+ end.
+
+%%%%%%%%%%%%%%%%% DEBUG functions %%%%%%%%%%%%%%%%
+
+file_info(FH) ->
+ #fileheader{closed_properly = CP, keypos = Kp,
+ m = M, next = Next, n = N, version = Version,
+ type = Type, no_objects = NoObjects, no_keys = NoKeys}
+ = FH,
+ if
+ CP =:= 0 ->
+ {error, not_closed};
+ FH#fileheader.cookie =/= ?MAGIC ->
+ {error, not_a_dets_file};
+ FH#fileheader.version =/= ?FILE_FORMAT_VERSION ->
+ {error, bad_version};
+ true ->
+ {ok, [{closed_properly,CP},{keypos,Kp},{m, M},{n,N},
+ {next,Next},{no_objects,NoObjects},{no_keys,NoKeys},
+ {type,Type},{version,Version}]}
+ end.
+
+v_segments(#head{}=H) ->
+ v_parts(H, 0, 0).
+
+v_parts(_H, ?SEGARRSZ, _SegNo) ->
+ done;
+v_parts(H, PartNo, SegNo) ->
+ Fd = H#head.fptr,
+ PartPos = dets_utils:read_4(Fd, ?SEGARRADDR(PartNo)),
+ if
+ PartPos =:= 0 ->
+ done;
+ true ->
+ PartBin = dets_utils:pread_n(Fd, PartPos, ?SEGPARTSZ*4),
+ v_segments(H, PartBin, PartNo+1, SegNo)
+ end.
+
+v_segments(H, <<>>, PartNo, SegNo) ->
+ v_parts(H, PartNo, SegNo);
+v_segments(_H, <<0:32,_/binary>>, _PartNo, _SegNo) ->
+ done;
+v_segments(H, <<Seg:32,T/binary>>, PartNo, SegNo) ->
+ io:format("<~w>SEGMENT ~w~n", [Seg, SegNo]),
+ v_segment(H, SegNo, Seg, 0),
+ v_segments(H, T, PartNo, SegNo+1).
+
+v_segment(_H, _, _SegPos, ?SEGSZP) ->
+ done;
+v_segment(H, SegNo, SegPos, SegSlot) ->
+ Slot = SegSlot + (SegNo * ?SEGSZP),
+ BucketP = SegPos + (4 * ?SZOBJP * SegSlot),
+ case catch read_bucket(H, BucketP, H#head.type) of
+ {'EXIT', Reason} ->
+ dets_utils:vformat("** dets: Corrupt or truncated dets file~n",
+ []),
+ io:format("~nERROR ~p~n", [Reason]);
+ [] -> %% don't print empty buckets
+ true;
+ {Size, CollP, Objects} ->
+ io:format(" <~w>~w: <~w:~p>~w~n",
+ [BucketP, Slot, CollP, Size, Objects])
+ end,
+ v_segment(H, SegNo, SegPos, SegSlot+1).
+
+%% -> [] | {Pointer, [object()]} | throw(EXIT)
+read_bucket(Head, Position, Type) ->
+ MaxSize = maxobjsize(Head),
+ case dets_utils:ipread(Head, Position, MaxSize) of
+ {ok, {Size, Pointer, <<Size:32, _Status:32, KeysObjs/binary>>}} ->
+ Objs = bin2objs(KeysObjs, Type, []),
+ {Size, Pointer, lists:reverse(Objs)};
+ [] ->
+ []
+ end.
+
+-define(SEQSTART, -(1 bsl 26)).
+
+%% -> [{Key,SizeOfWholeKey,WholeKeyBin,FirstObject,OtherObjects}] |throw(EXIT)
+%% FirstObject = {Term, Seq, Binary}
+%% Seq < 0 (and ascending).
+per_key(Head, <<BinSize:32, ?ACTIVE:32, Bin/binary>> = B) ->
+ true = (byte_size(B) =:= BinSize),
+ if
+ Head#head.type =:= set ->
+ per_set_key(Bin, Head#head.keypos, []);
+ true ->
+ per_bag_key(Bin, Head#head.keypos, [])
+ end.
+
+per_set_key(<<Size:32, T/binary>> = B, KeyPos, L) ->
+ <<KeyBin:Size/binary, R/binary>> = B,
+ Term = binary_to_term(T),
+ Key = element(KeyPos, Term),
+ Item = {Term, ?SEQSTART, KeyBin},
+ per_set_key(R, KeyPos, [{Key,Size,KeyBin,Item,[]} | L]);
+per_set_key(<<>>, KeyPos, L) when is_integer(KeyPos) ->
+ lists:reverse(L).
+
+per_bag_key(<<Size:32, ObjSz:32, T/binary>> = B, KeyPos, L) ->
+ <<KeyBin:Size/binary, R/binary>> = B,
+ ObjSz1 = ObjSz - 4,
+ Size1 = Size - ObjSz - 4,
+ <<_:ObjSz1/binary, KeyObjs:Size1/binary, _/binary>> = T,
+ <<_Size:32, Bin:ObjSz/binary, _/binary>> = B,
+ Term = binary_to_term(T),
+ Key = element(KeyPos, Term),
+ Item = {Term, ?SEQSTART, Bin},
+ per_bag_key(R, KeyPos, [{Key,Size,KeyBin,Item,KeyObjs} | L]);
+per_bag_key(<<>>, KeyPos, L) when is_integer(KeyPos) ->
+ lists:reverse(L).
+
+
+binobjs2terms(<<ObjSz:32, T/binary>> = B) ->
+ binobjs2terms(B, T, ObjSz, byte_size(B)-ObjSz, ?SEQSTART+1, []);
+binobjs2terms([] = B) ->
+ B;
+binobjs2terms(<<>>) ->
+ [].
+
+binobjs2terms(Bin, Obj, _ObjSz, _Size=0, N, L) ->
+ lists:reverse(L, [{binary_to_term(Obj), N, Bin}]);
+binobjs2terms(Bin, Bin1, ObjSz, Size, N, L) ->
+ <<B:ObjSz/binary, T/binary>> = Bin,
+ <<NObjSz:32, T1/binary>> = T,
+ Item = {binary_to_term(Bin1), N, B},
+ binobjs2terms(T, T1, NObjSz, Size-NObjSz, N+1, [Item | L]).
+
+
+%% Appends objects in reversed order.
+bin2objs(KeysObjs, set, Ts) ->
+ <<ObjSz:32, T/binary>> = KeysObjs,
+ bin2objs(T, ObjSz-4, byte_size(KeysObjs)-ObjSz, Ts);
+bin2objs(KeysObjs, _Type, Ts) ->
+ bin2objs2(KeysObjs, Ts).
+
+bin2objs2(<<Size:32, ObjSz:32, T/binary>>, Ts) ->
+ bin2objs(T, ObjSz-4, Size-ObjSz-4, Ts);
+bin2objs2(<<>>, Ts) ->
+ Ts.
+
+bin2objs(Bin, ObjSz, _Size=0, Ts) ->
+ <<_:ObjSz/binary, T/binary>> = Bin,
+ bin2objs2(T, [binary_to_term(Bin) | Ts]);
+bin2objs(Bin, ObjSz, Size, Ts) ->
+ <<_:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
+ bin2objs(T, NObjSz-4, Size-NObjSz, [binary_to_term(Bin) | Ts]).
+
+
+bin2keybins(KeysObjs, Head) when Head#head.type =:= set ->
+ <<ObjSz:32, T/binary>> = KeysObjs,
+ bin2keybins(T, Head#head.keypos, ObjSz-4, byte_size(KeysObjs)-ObjSz,[]);
+bin2keybins(KeysObjs, Head) ->
+ bin2keybins2(KeysObjs, Head#head.keypos, []).
+
+bin2keybins2(<<Size:32, ObjSz:32, T/binary>>, Kp, L) ->
+ bin2keybins(T, Kp, ObjSz-4, Size-ObjSz-4, L);
+bin2keybins2(<<>>, Kp, L) when is_integer(Kp) ->
+ lists:reverse(L).
+
+bin2keybins(Bin, Kp, ObjSz, _Size=0, L) ->
+ <<Obj:ObjSz/binary, T/binary>> = Bin,
+ Term = binary_to_term(Obj),
+ bin2keybins2(T, Kp, [{element(Kp, Term),Obj} | L]);
+bin2keybins(Bin, Kp, ObjSz, Size, L) ->
+ <<Obj:ObjSz/binary, NObjSz:32, T/binary>> = Bin,
+ Term = binary_to_term(Obj),
+ bin2keybins(T, Kp, NObjSz-4, Size-NObjSz, [{element(Kp,Term),Obj} | L]).