From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- lib/stdlib/src/dets_v8.erl | 1591 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1591 insertions(+) create mode 100644 lib/stdlib/src/dets_v8.erl (limited to 'lib/stdlib/src/dets_v8.erl') diff --git a/lib/stdlib/src/dets_v8.erl b/lib/stdlib/src/dets_v8.erl new file mode 100644 index 0000000000..b24df02882 --- /dev/null +++ b/lib/stdlib/src/dets_v8.erl @@ -0,0 +1,1591 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(dets_v8). + +%% Dets files, implementation part. This module handles versions up to +%% and including 8(c). To be called from dets.erl only. + +-export([constants/0, mark_dirty/1, read_file_header/2, + check_file_header/2, do_perform_save/1, initiate_file/11, + init_freelist/2, fsck_input/4, + bulk_input/3, output_objs/4, write_cache/1, may_grow/3, + find_object/2, re_hash/2, slot_objs/2, scan_objs/8, + db_hash/2, no_slots/1, table_parameters/1]). + +-export([file_info/1, v_segments/1]). + +-export([cache_segps/3]). + +%% For backward compatibility. +-export([sz2pos/1]). + +-compile({inline, [{sz2pos,1},{scan_skip,7}]}). +-compile({inline, [{skip_bytes,5}, {get_segp,1}]}). +-compile({inline, [{wl_lookup,5}]}). +-compile({inline, [{actual_seg_size,0}]}). + +-include("dets.hrl"). + +%% The layout of the file is : +%% +%% bytes decsription +%% ---------------------- File header +%% 4 FreelistsPointer +%% 4 Cookie +%% 4 ClosedProperly (pos=8) +%% 4 Type (pos=12) +%% 4 Version (pos=16) +%% 4 M +%% 4 Next +%% 4 KeyPos +%% 4 NoObjects +%% 4 N +%% ------------------ end of file header +%% 4*8192 SegmentArray +%% ------------------ +%% 4*256 First segment +%% ----------------------------- This is BASE. +%% ??? Objects (free and alive) +%% 4*256 Second segment (2 kB now, due to a bug) +%% ??? Objects (free and alive) +%% ... more objects and segments ... +%% ----------------------------- +%% ??? Free lists +%% ----------------------------- +%% 4 File size, in bytes. + +%% The first slot (0) in the segment array always points to the +%% pre-allocated first segment. +%% Before we can find an object we must find the slot where the +%% object resides. Each slot is a (possibly empty) list (or chain) of +%% objects that hash to the same slot. If the value stored in the +%% slot is zero, the slot chain is empty. If the slot value is +%% non-zero, the value points to a position in the file where the +%% chain starts. Each object in a chain has the following layout: +%% +%% bytes decsription +%% -------------------- +%% 4 Pointer to the next object of the chain. +%% 4 Size of the object in bytes (Sz). +%% 4 Status (FREE or ACTIVE) +%% Sz Binary representing the object +%% +%% The status field is used while repairing a file (but not next or size). +%% +%%|---------------| +%%| head | +%%| | +%%| | +%%|_______________| +%%| |------| +%%|___seg ptr1____| | +%%| | | +%%|__ seg ptr 2___| | +%%| | | segment 1 +%%| .... | V _____________ +%% | | +%% | | +%% |___slot 0 ____| +%% | | +%% |___slot 1 ____|-----| +%% | | | +%% | ..... | | 1:st obj in slot 1 +%% V segment 1 +%% |-----------| +%% | next | +%% |___________| +%% | size | +%% |___________| +%% | status | +%% |___________| +%% | | +%% | | +%% | obj | +%% | | + +%%% +%%% File header +%%% + +-define(HEADSZ, 40). % The size of the file header, in bytes. +-define(SEGSZ, 256). % Size of a segment, in words. +-define(SEGSZ_LOG2, 8). +-define(SEGARRSZ, 8192). % Maximal number of segments. +-define(SEGADDR(SegN), (?HEADSZ + (4 * (SegN)))). +-define(BASE, ?SEGADDR((?SEGSZ + ?SEGARRSZ))). +-define(MAXOBJS, (?SEGSZ * ?SEGARRSZ)). % 2 M objects + +-define(SLOT2SEG(S), ((S) bsr ?SEGSZ_LOG2)). + +%% BIG is used for hashing. BIG must be greater than the maximum +%% number of slots, currently MAXOBJS. +-define(BIG, 16#ffffff). + +%% Hard coded positions into the file header: +-define(FREELIST_POS, 0). +-define(CLOSED_PROPERLY_POS, 8). +-define(D_POS, 20). +-define(NO_OBJECTS_POS, (?D_POS + 12)). + +%% The version of a dets file is indicated by the ClosedProperly +%% field. Version 6 was used in the R1A release, and version 7 in the +%% R1B release up to and including the R3B01 release. Both version 6 +%% and version 7 indicate properly closed files by the value +%% CLOSED_PROPERLY. +%% +%% The current version, 8, has three sub-versions: +%% +%% - 8(a), indicated by the value CLOSED_PROPERLY (same as in versions 6 +%% and 7), introduced in R3B02; +%% - 8(b), indicated by the value CLOSED_PROPERLY2(_NEED_COMPACTING), +%% introduced in R5A and used up to and including R6A; +%% - 8(c), indicated by the value CLOSED_PROPERLY_NEW_HASH(_NEED_COMPACTING), +%% in use since R6B. +%% +%% The difference between the 8(a) and the 8(b) versions is the format +%% used for free lists saved on dets files. +%% The 8(c) version uses a different hashing algorithm, erlang:phash +%% (former versions use erlang:hash). +%% Version 8(b) files are only converted to version 8(c) if repair is +%% done, so we need compatability with 8(b) for a _long_ time. +%% +%% There are known bugs due to the fact that keys and objects are +%% sometimes compared (==) and sometimes matched (=:=). The version +%% used by default (9, see dets_v9.erl) does not have this problem. + +-define(NOT_PROPERLY_CLOSED,0). +-define(CLOSED_PROPERLY,1). +-define(CLOSED_PROPERLY2,2). +-define(CLOSED_PROPERLY2_NEED_COMPACTING,3). +-define(CLOSED_PROPERLY_NEW_HASH,4). +-define(CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING,5). + +-define(FILE_FORMAT_VERSION, 8). +-define(CAN_BUMP_BY_REPAIR, [6, 7]). +-define(CAN_CONVERT_FREELIST, [8]). + +%%% +%%% Object header (next, size, status). +%%% + +-define(OHDSZ, 12). % The size of the object header, in bytes. +-define(STATUS_POS, 8). % Position of the status field. + +%% The size of each object is a multiple of 16. +%% BUMP is used when repairing files. +-define(BUMP, 16). + +-define(ReadAhead, 512). + +%%-define(DEBUGF(X,Y), io:format(X, Y)). +-define(DEBUGF(X,Y), void). + +%% {Bump} +constants() -> + {?BUMP, ?BASE}. + +%% -> ok | throw({NewHead,Error}) +mark_dirty(Head) -> + Dirty = [{?CLOSED_PROPERLY_POS, <>}], + dets_utils:pwrite(Head, Dirty), + dets_utils:sync(Head), + dets_utils:position(Head, Head#head.freelists_p), + dets_utils:truncate(Head, cur). + +%% -> {ok, head()} | throw(Error) +initiate_file(Fd, Tab, Fname, Type, Kp, MinSlots, MaxSlots, + Ram, CacheSz, Auto, _DoInitSegments) -> + Freelist = 0, + Cookie = ?MAGIC, + ClosedProperly = ?NOT_PROPERLY_CLOSED, % immediately overwritten + Version = ?FILE_FORMAT_VERSION, + Factor = est_no_segments(MinSlots), + N = 0, + M = Next = ?SEGSZ * Factor, + NoObjects = 0, + dets_utils:pwrite(Fd, Fname, 0, + <>), % Initialize first segment + %% We must set the first slot of the segment pointer array to + %% point to the first segment + Pos = ?SEGADDR(0), + SegP = (?HEADSZ + (4 * ?SEGARRSZ)), + dets_utils:pwrite(Fd, Fname, Pos, <>), + segp_cache(Pos, SegP), + + Ftab = dets_utils:init_alloc(?BASE), + H0 = #head{freelists=Ftab, fptr = Fd, base = ?BASE}, + {H1, Ws} = init_more_segments(H0, 1, Factor, undefined, []), + + %% This is not optimal but simple: always initiate the segments. + dets_utils:pwrite(Fd, Fname, Ws), + + %% Return a new nice head structure + Head = #head{ + m = M, + m2 = M * 2, + next = Next, + fptr = Fd, + no_objects = NoObjects, + n = N, + type = Type, + update_mode = dirty, + freelists = H1#head.freelists, + auto_save = Auto, + hash_bif = phash, + keypos = Kp, + min_no_slots = Factor * ?SEGSZ, + max_no_slots = no_segs(MaxSlots) * ?SEGSZ, + + ram_file = Ram, + filename = Fname, + name = Tab, + cache = dets_utils:new_cache(CacheSz), + version = Version, + bump = ?BUMP, + base = ?BASE, + mod = ?MODULE + }, + {ok, Head}. + +est_no_segments(MinSlots) when 1 + ?SLOT2SEG(MinSlots) > ?SEGARRSZ -> + ?SEGARRSZ; +est_no_segments(MinSlots) -> + 1 + ?SLOT2SEG(MinSlots). + +init_more_segments(Head, SegNo, Factor, undefined, Ws) when SegNo < Factor -> + init_more_segments(Head, SegNo, Factor, seg_zero(), Ws); +init_more_segments(Head, SegNo, Factor, SegZero, Ws) when SegNo < Factor -> + {NewHead, W} = allocate_segment(Head, SegZero, SegNo), + init_more_segments(NewHead, SegNo+1, Factor, SegZero, W++Ws); +init_more_segments(Head, _SegNo, _Factor, _SegZero, Ws) -> + {Head, Ws}. + +allocate_segment(Head, SegZero, SegNo) -> + %% may throw error: + {NewHead, Segment, _} = dets_utils:alloc(Head, 4 * ?SEGSZ), + InitSegment = {Segment, SegZero}, + Pos = ?SEGADDR(SegNo), + segp_cache(Pos, Segment), + SegPointer = {Pos, <>}, + {NewHead, [InitSegment, SegPointer]}. + +%% Read free lists (using a Buddy System) from file. +init_freelist(Head, {convert_freelist,_Version}) -> + %% This function converts the saved freelist of the form + %% [{Slot1,Addr1},{Addr1,Addr2},...,{AddrN,0},{Slot2,Addr},...] + %% i.e each slot is a linked list which ends with a 0. + %% This is stored in a bplus_tree per Slot. + %% Each Slot is a position in a tuple. + + Ftab = dets_utils:empty_free_lists(), + Pos = Head#head.freelists_p, + case catch prterm(Head, Pos, ?OHDSZ) of + {0, _Sz, Term} -> + FreeList = lists:reverse(Term), + dets_utils:init_slots_from_old_file(FreeList, Ftab); + _ -> + throw({error, {bad_freelists, Head#head.filename}}) + end; +init_freelist(Head, _) -> + %% bplus_tree stored as is + Pos = Head#head.freelists_p, + case catch prterm(Head, Pos, ?OHDSZ) of + {0, _Sz, Term} -> + Term; + _ -> + throw({error, {bad_freelists, Head#head.filename}}) + end. + +%% -> {ok, Fd, fileheader()} | throw(Error) +read_file_header(Fd, FileName) -> + {ok, Bin} = dets_utils:pread_close(Fd, FileName, 0, ?HEADSZ), + [Freelist, Cookie, CP, Type2, Version, M, Next, Kp, NoObjects, N] = + bin2ints(Bin), + {ok, EOF} = dets_utils:position_close(Fd, FileName, eof), + {ok, <>} = dets_utils:pread_close(Fd, FileName, EOF-4, 4), + FH = #fileheader{freelist = Freelist, + cookie = Cookie, + closed_properly = CP, + type = dets_utils:code_to_type(Type2), + version = Version, + m = M, + next = Next, + keypos = Kp, + no_objects = NoObjects, + min_no_slots = ?DEFAULT_MIN_NO_SLOTS, + max_no_slots = ?DEFAULT_MAX_NO_SLOTS, + trailer = FileSize, + eof = EOF, + n = N, + mod = ?MODULE}, + {ok, Fd, FH}. + +%% -> {ok, head(), ExtraInfo} | {error, Reason} (Reason lacking file name) +%% ExtraInfo = {convert_freelist, Version} | true | need_compacting +check_file_header(FH, Fd) -> + Test = + if + FH#fileheader.cookie =/= ?MAGIC -> + {error, not_a_dets_file}; + FH#fileheader.type =:= badtype -> + {error, invalid_type_code}; + FH#fileheader.version =/= ?FILE_FORMAT_VERSION -> + case lists:member(FH#fileheader.version, + ?CAN_BUMP_BY_REPAIR) of + true -> + {error, version_bump}; + false -> + {error, bad_version} + end; + FH#fileheader.trailer =/= FH#fileheader.eof -> + {error, not_closed}; + FH#fileheader.closed_properly =:= ?CLOSED_PROPERLY -> + case lists:member(FH#fileheader.version, + ?CAN_CONVERT_FREELIST) of + true -> + {ok, {convert_freelist, FH#fileheader.version}, hash}; + false -> + {error, not_closed} % should not happen + end; + FH#fileheader.closed_properly =:= ?CLOSED_PROPERLY2 -> + {ok, true, hash}; + FH#fileheader.closed_properly =:= + ?CLOSED_PROPERLY2_NEED_COMPACTING -> + {ok, need_compacting, hash}; + FH#fileheader.closed_properly =:= ?CLOSED_PROPERLY_NEW_HASH -> + {ok, true, phash}; + FH#fileheader.closed_properly =:= + ?CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING -> + {ok, need_compacting, phash}; + FH#fileheader.closed_properly =:= ?NOT_PROPERLY_CLOSED -> + {error, not_closed}; + FH#fileheader.closed_properly > + ?CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING -> + {error, not_closed}; + true -> + {error, not_a_dets_file} + end, + case Test of + {ok, ExtraInfo, HashAlg} -> + H = #head{ + m = FH#fileheader.m, + m2 = FH#fileheader.m * 2, + next = FH#fileheader.next, + fptr = Fd, + no_objects= FH#fileheader.no_objects, + n = FH#fileheader.n, + type = FH#fileheader.type, + update_mode = saved, + auto_save = infinity, % not saved on file + fixed = false, % not saved on file + freelists_p = FH#fileheader.freelist, + hash_bif = HashAlg, + keypos = FH#fileheader.keypos, + min_no_slots = FH#fileheader.min_no_slots, + max_no_slots = FH#fileheader.max_no_slots, + version = ?FILE_FORMAT_VERSION, + mod = ?MODULE, + bump = ?BUMP, + base = ?BASE}, + {ok, H, ExtraInfo}; + Error -> + Error + end. + +cache_segps(Fd, FileName, M) -> + NSegs = no_segs(M), + {ok, Bin} = dets_utils:pread_close(Fd, FileName, ?HEADSZ, 4 * NSegs), + Fun = fun(S, P) -> segp_cache(P, S), P+4 end, + lists:foldl(Fun, ?HEADSZ, bin2ints(Bin)). + +no_segs(NoSlots) -> + ?SLOT2SEG(NoSlots - 1) + 1. + +bin2ints(<>) -> + [Int | bin2ints(B)]; +bin2ints(<<>>) -> + []. + +%%% +%%% Repair, conversion and initialization of a dets file. +%%% + +bulk_input(Head, InitFun, Cntrs) -> + bulk_input(Head, InitFun, Cntrs, make_ref()). + +bulk_input(Head, InitFun, Cntrs, Ref) -> + fun(close) -> + ok; + (read) -> + case catch {Ref, InitFun(read)} of + {Ref, end_of_input} -> + end_of_input; + {Ref, {L0, NewInitFun}} when is_list(L0), + is_function(NewInitFun) -> + Kp = Head#head.keypos, + case catch bulk_objects(L0, Head, Cntrs, Kp, []) of + {'EXIT', _Error} -> + _ = (catch NewInitFun(close)), + {error, invalid_objects_list}; + L -> + {L, bulk_input(Head, NewInitFun, Cntrs, Ref)} + end; + {Ref, Value} -> + {error, {init_fun, Value}}; + Error -> + throw({thrown, Error}) + end + end. + +bulk_objects([T | Ts], Head, Cntrs, Kp, L) -> + BT = term_to_binary(T), + Sz = byte_size(BT), + LogSz = sz2pos(Sz+?OHDSZ), + count_object(Cntrs, LogSz), + Key = element(Kp, T), + bulk_objects(Ts, Head, Cntrs, Kp, [make_object(Head, Key, LogSz, BT) | L]); +bulk_objects([], _Head, _Cntrs, _Kp, L) -> + L. + +-define(FSCK_SEGMENT, 10000). + +-define(DCT(D, CT), [D | CT]). + +-define(VNEW(N, E), erlang:make_tuple(N, E)). +-define(VSET(I, V, E), setelement(I, V, E)). +-define(VGET(I, V), element(I, V)). + +%% OldVersion not used, assuming later versions have been converted already. +output_objs(OldVersion, Head, SlotNumbers, Cntrs) -> + fun(close) -> + {ok, 0, Head}; + ([]) -> + output_objs(OldVersion, Head, SlotNumbers, Cntrs); + (L) -> + %% Descending sizes. + Count = lists:sort(ets:tab2list(Cntrs)), + RCount = lists:reverse(Count), + NoObjects = lists:foldl(fun({_Sz,No}, A) -> A + No end, 0, Count), + {_, MinSlots, _} = SlotNumbers, + if + %% Using number of objects for bags and duplicate bags + %% is not ideal; number of (unique) keys should be + %% used instead. The effect is that there will be more + %% segments than "necessary". + MinSlots =/= bulk_init, + abs(?SLOT2SEG(NoObjects) - ?SLOT2SEG(MinSlots)) > 5, + (NoObjects < ?MAXOBJS) -> + {try_again, NoObjects}; + true -> + Head1 = Head#head{no_objects = NoObjects}, + SegSz = actual_seg_size(), + {_, End, _} = dets_utils:alloc(Head, SegSz-1), + %% Now {LogSize,NoObjects} in Cntrs is replaced by + %% {LogSize,Position,{FileName,FileDescriptor},NoObjects}. + {Head2, CT} = allocate_all_objects(Head1, RCount, Cntrs), + [E | Es] = bin2term(L, []), + {NE, Acc, DCT1} = + output_slots(E, Es, [E], Head2, ?DCT(0, CT)), + NDCT = write_all_sizes(DCT1, Cntrs), + Max = ets:info(Cntrs, size), + output_objs2(NE, Acc, Head2, Cntrs, NDCT, End, Max,Max) + end + end. + +output_objs2(E, Acc, Head, Cntrs, DCT, End, 0, MaxNoChunks) -> + NDCT = write_all_sizes(DCT, Cntrs), + output_objs2(E, Acc, Head, Cntrs, NDCT, End, MaxNoChunks, MaxNoChunks); +output_objs2(E, Acc, Head, Cntrs, DCT, End, ChunkI, MaxNoChunks) -> + fun(close) -> + DCT1 = output_slot(Acc, Head, DCT), + NDCT = write_all_sizes(DCT1, Cntrs), + ?DCT(NoDups, CT) = NDCT, + [SegAddr | []] = ?VGET(tuple_size(CT), CT), + FinalZ = End - SegAddr, + [{?FSCK_SEGMENT, _, {FileName, Fd}, _}] = + ets:lookup(Cntrs, ?FSCK_SEGMENT), + ok = dets_utils:fwrite(Fd, FileName, + dets_utils:make_zeros(FinalZ)), + NewHead = Head#head{no_objects = Head#head.no_objects - NoDups}, + {ok, NoDups, NewHead}; + (L) -> + Es = bin2term(L, []), + {NE, NAcc, NDCT} = output_slots(E, Es, Acc, Head, DCT), + output_objs2(NE, NAcc, Head, Cntrs, NDCT, End, + ChunkI-1, MaxNoChunks) + end. + +%% By allocating bigger objects before smaller ones, holes in the +%% buddy system memory map are avoided. Unfortunately, the segments +%% are always allocated first, so if there are objects bigger than a +%% segment, there is a hole to handle. (Haven't considered placing the +%% segments among other objects of the same size.) +allocate_all_objects(Head, Count, Cntrs) -> + SegSize = actual_seg_size(), + {Head1, HSz, HN, HA} = alloc_hole(Count, Head, SegSize), + {Max, _} = hd(Count), + CT = ?VNEW(Max+1, not_used), + {Head2, NCT} = allocate_all(Head1, Count, Cntrs, CT), + Head3 = free_hole(Head2, HSz, HN, HA), + {Head3, NCT}. + +alloc_hole([{LSize,_} | _], Head, SegSz) when ?POW(LSize-1) > SegSz -> + {_, SegAddr, _} = dets_utils:alloc(Head, SegSz-1), + Size = ?POW(LSize-1)-1, + {_, Addr, _} = dets_utils:alloc(Head, Size), + N = (Addr - SegAddr) div SegSz, + Head1 = dets_utils:alloc_many(Head, SegSz, N, SegAddr), + {Head1, SegSz-1, N, SegAddr}; +alloc_hole(_Count, Head, _SegSz) -> + {Head, 0, 0, 0}. + +free_hole(Head, _Size, 0, _Addr) -> + Head; +free_hole(Head, Size, N, Addr) -> + {Head1, _} = dets_utils:free(Head, Addr, Size), + free_hole(Head1, Size, N-1, Addr+Size+1). + +%% One (temporary) file for each buddy size, write all objects of that +%% size to the file. +allocate_all(Head, [{LSize,NoObjects} | Count], Cntrs, CT) -> + Size = ?POW(LSize-1)-1, + {_Head, Addr, _} = dets_utils:alloc(Head, Size), + NewHead = dets_utils:alloc_many(Head, Size+1, NoObjects, Addr), + {FileName, Fd} = temp_file(Head, LSize), + true = ets:insert(Cntrs, {LSize, Addr, {FileName, Fd}, NoObjects}), + NCT = ?VSET(LSize, CT, [Addr | []]), + allocate_all(NewHead, Count, Cntrs, NCT); +allocate_all(Head, [], Cntrs, CT) -> + %% Note that space for the segments has been allocated already. + %% And one file for the segments... + {FileName, Fd} = temp_file(Head, ?FSCK_SEGMENT), + Addr = ?SEGADDR(?SEGARRSZ), + true = ets:insert(Cntrs, {?FSCK_SEGMENT, Addr, {FileName, Fd}, 0}), + NCT = ?VSET(tuple_size(CT), CT, [Addr | []]), + {Head, NCT}. + +temp_file(Head, N) -> + TmpName = lists:concat([Head#head.filename, '.', N]), + {ok, Fd} = dets_utils:open(TmpName, [raw, binary, write]), + {TmpName, Fd}. + +bin2term([<> | BTs], L) -> + bin2term(BTs, [{Slot, LogSize, BinTerm} | L]); +bin2term([], L) -> + lists:reverse(L). + +write_all_sizes(?DCT(D, CT), Cntrs) -> + ?DCT(D, write_sizes(1, tuple_size(CT), CT, Cntrs)). + +write_sizes(Sz, Sz, CT, Cntrs) -> + write_size(Sz, ?FSCK_SEGMENT, CT, Cntrs); +write_sizes(Sz, MaxSz, CT, Cntrs) -> + NCT = write_size(Sz, Sz, CT, Cntrs), + write_sizes(Sz+1, MaxSz, NCT, Cntrs). + +write_size(Sz, I, CT, Cntrs) -> + case ?VGET(Sz, CT) of + not_used -> + CT; + [Addr | L] -> + {FileName, Fd} = ets:lookup_element(Cntrs, I, 3), + case file:write(Fd, lists:reverse(L)) of + ok -> + ?VSET(Sz, CT, [Addr | []]); + Error -> + dets_utils:file_error(FileName, Error) + end + end. + +output_slots(E, [E1 | Es], Acc, Head, DCT) + when element(1, E) =:= element(1, E1) -> + output_slots(E1, Es, [E1 | Acc], Head, DCT); +output_slots(_E, [E | L], Acc, Head, DCT) -> + NDCT = output_slot(Acc, Head, DCT), + output_slots(E, L, [E], Head, NDCT); +output_slots(E, [], Acc, _Head, DCT) -> + {E, Acc, DCT}. + +output_slot([E], _Head, ?DCT(D, CT)) -> + ?DCT(D, output_slot([{foo, E}], 0, foo, CT)); +output_slot(Es0, Head, ?DCT(D, CT)) -> + Kp = Head#head.keypos, + Fun = fun({_Slot, _LSize, BinTerm} = E) -> + Key = element(Kp, binary_to_term(BinTerm)), + {Key, E} + end, + Es = lists:map(Fun, Es0), + NEs = case Head#head.type of + set -> + [{Key0,_} = E | L0] = lists:sort(Es), + choose_one(lists:sort(L0), Key0, [E]); + bag -> + lists:usort(Es); + duplicate_bag -> + lists:sort(Es) + end, + Dups = D + length(Es) - length(NEs), + ?DCT(Dups, output_slot(NEs, 0, foo, CT)). + +choose_one([{Key,_} | Es], Key, L) -> + choose_one(Es, Key, L); +choose_one([{Key,_} = E | Es], _Key, L) -> + choose_one(Es, Key, [E | L]); +choose_one([], _Key, L) -> + L. + +output_slot([E | Es], Next, _Slot, CT) -> + {_Key, {Slot, LSize, BinTerm}} = E, + Size = byte_size(BinTerm), + Size2 = ?POW(LSize-1), + Pad = <<0:(Size2-Size-?OHDSZ)/unit:8>>, + BinObject = [<>, BinTerm | Pad], + [Addr | L] = ?VGET(LSize, CT), + NCT = ?VSET(LSize, CT, [Addr+Size2 | [BinObject | L]]), + output_slot(Es, Addr, Slot, NCT); +output_slot([], Next, Slot, CT) -> + I = tuple_size(CT), + [Addr | L] = ?VGET(I, CT), + {Pos, _} = slot_position(Slot), + NoZeros = Pos - Addr, + BinObject = if + NoZeros > 100 -> + [dets_utils:make_zeros(NoZeros) | <>]; + true -> + <<0:NoZeros/unit:8,Next:32>> + end, + Size = NoZeros+4, + ?VSET(I, CT, [Addr+Size | [BinObject | L]]). + +%% Does not close Fd. +fsck_input(Head, Fd, Cntrs, _FileHeader) -> + %% The file is not compressed, so the object size cannot exceed + %% the filesize, for all objects. + MaxSz = case file:position(Fd, eof) of + {ok, Pos} -> + Pos; + _ -> + (1 bsl 32) - 1 + end, + State0 = fsck_read(?BASE, Fd, []), + fsck_input1(Head, State0, Fd, MaxSz, Cntrs). + +fsck_input1(Head, State, Fd, MaxSz, Cntrs) -> + fun(close) -> + ok; + (read) -> + case State of + done -> + end_of_input; + {done, L} -> + R = count_input(Cntrs, L, []), + {R, fsck_input1(Head, done, Fd, MaxSz, Cntrs)}; + {cont, L, Bin, Pos} -> + R = count_input(Cntrs, L, []), + FR = fsck_objs(Bin, Head#head.keypos, Head, []), + NewState = fsck_read(FR, Pos, Fd, MaxSz, Head), + {R, fsck_input1(Head, NewState, Fd, MaxSz, Cntrs)} + end + end. + +%% The ets table Cntrs is used for counting objects per size. +count_input(Cntrs, [[LogSz | B] | Ts], L) -> + count_object(Cntrs, LogSz), + count_input(Cntrs, Ts, [B | L]); +count_input(_Cntrs, [], L) -> + L. + +count_object(Cntrs, LogSz) -> + case catch ets:update_counter(Cntrs, LogSz, 1) of + N when is_integer(N) -> ok; + _Badarg -> true = ets:insert(Cntrs, {LogSz, 1}) + end. + +fsck_read(Pos, F, L) -> + case file:position(F, Pos) of + {ok, _} -> + read_more_bytes(<<>>, 0, Pos, F, L); + _Error -> + {done, L} + end. + +fsck_read({more, Bin, Sz, L}, Pos, F, MaxSz, Head) when Sz > MaxSz -> + FR = skip_bytes(Bin, ?BUMP, Head#head.keypos, Head, L), + fsck_read(FR, Pos, F, MaxSz, Head); +fsck_read({more, Bin, Sz, L}, Pos, F, _MaxSz, _Head) -> + read_more_bytes(Bin, Sz, Pos, F, L); +fsck_read({new, Skip, L}, Pos, F, _MaxSz, _Head) -> + NewPos = Pos + Skip, + fsck_read(NewPos, F, L). + +read_more_bytes(B, Min, Pos, F, L) -> + Max = if + Min < ?CHUNK_SIZE -> ?CHUNK_SIZE; + true -> Min + end, + case dets_utils:read_n(F, Max) of + eof -> + {done, L}; + Bin -> + NewPos = Pos + byte_size(Bin), + {cont, L, list_to_binary([B, Bin]), NewPos} + end. + +fsck_objs(Bin = <<_N:32, Sz:32, Status:32, Tail/binary>>, Kp, Head, L) -> + if + Status =:= ?ACTIVE -> + case Tail of + <> -> + case catch element(Kp, binary_to_term(BinTerm)) of + {'EXIT', _} -> + skip_bytes(Bin, ?BUMP, Kp, Head, L); + Key -> + LogSz = sz2pos(Sz+?OHDSZ), + Obj = make_object(Head, Key, LogSz, BinTerm), + NL = [[LogSz | Obj] | L], + Skip = ?POW(LogSz-1) - Sz - ?OHDSZ, + skip_bytes(Tail2, Skip, Kp, Head, NL) + end; + _ -> + {more, Bin, Sz, L} + end; + true -> + skip_bytes(Bin, ?BUMP, Kp, Head, L) + end; +fsck_objs(Bin, _Kp, _Head, L) -> + {more, Bin, 0, L}. + +%% Version 8 has to know about version 9. +make_object(Head, Key, _LogSz, BT) when Head#head.version =:= 9 -> + Slot = dets_v9:db_hash(Key, Head), + <>; +make_object(Head, Key, LogSz, BT) -> + Slot = db_hash(Key, Head), + <>. + +%% Inlined. +skip_bytes(Bin, Skip, Kp, Head, L) -> + case Bin of + <<_:Skip/binary, Tail/binary>> -> + fsck_objs(Tail, Kp, Head, L); + _ -> + {new, Skip - byte_size(Bin), L} + end. + +%% -> {NewHead, ok} | throw({Head, Error}) +do_perform_save(H) -> + FL = dets_utils:get_freelists(H), + B = term_to_binary(FL), + Size = byte_size(B), + ?DEBUGF("size of freelist = ~p~n", [Size]), + ?DEBUGF("head.m = ~p~n", [H#head.m]), + ?DEBUGF("head.no_objects = ~p~n", [H#head.no_objects]), + + {ok, Pos} = dets_utils:position(H, eof), + H1 = H#head{freelists_p = Pos}, + W1 = {?FREELIST_POS, <>}, + W2 = {Pos, [<<0:32, Size:32, ?FREE:32>>, B]}, + + W3 = {?D_POS, <<(H1#head.m):32, + (H1#head.next):32, + (H1#head.keypos):32, + (H1#head.no_objects):32, + (H1#head.n):32>>}, + {ClosedProperly, ClosedProperlyNeedCompacitng} = + case H1#head.hash_bif of + hash -> + {?CLOSED_PROPERLY2, ?CLOSED_PROPERLY2_NEED_COMPACTING}; + phash -> + {?CLOSED_PROPERLY_NEW_HASH, + ?CLOSED_PROPERLY_NEW_HASH_NEED_COMPACTING} + end, + W4 = + if + Size > 1000, Size > H1#head.no_objects -> + {?CLOSED_PROPERLY_POS, + <>}; + true -> + {?CLOSED_PROPERLY_POS, <>} + end, + W5 = {?FILE_FORMAT_VERSION_POS, <>}, + {H2, ok} = dets_utils:pwrite(H1, [W1,W2,W3,W4,W5]), + {ok, Pos2} = dets_utils:position(H2, eof), + ?DEBUGF("Writing file size ~p, eof at ~p~n", [Pos2+4, Pos2]), + dets_utils:pwrite(H2, [{Pos2, <<(Pos2 + 4):32>>}]). + +%% -> [term()] | throw({Head, Error}) +slot_objs(H, Slot) when Slot >= H#head.next -> + '$end_of_table'; +slot_objs(H, Slot) -> + {_Pos, Chain} = chain(H, Slot), + collect_chain(H, Chain). + +collect_chain(_H, 0) -> []; +collect_chain(H, Pos) -> + {Next, _Sz, Term} = prterm(H, Pos, ?ReadAhead), + [Term | collect_chain(H, Next)]. + +db_hash(Key, Head) -> + H = h(Key, Head#head.hash_bif), + Hash = H rem Head#head.m, + if + Hash < Head#head.n -> + H rem (Head#head.m2); % H rem (2 * m) + true -> + Hash + end. + +h(I, phash) -> erlang:phash(I, ?BIG) - 1; +h(I, HF) -> erlang:HF(I, ?BIG) - 1. %% stupid BIF has 1 counts. + +no_slots(_Head) -> + undefined. + +table_parameters(_Head) -> + undefined. + +%% Re-hashing a segment, starting with SlotStart. +%% +%% On the average, half of the objects of the chain are put into a new +%% chain. If the slot of the old chain is i, then the slot of the new +%% chain is i+m. +%% Note that the insertion of objects into the new chain is simplified +%% by the fact that the chains are not sorted on key, which means that +%% each moved object can be inserted first in the new chain. +%% (It is also a fact that the objects with the same key are not sorted.) +%% +%% -> {ok, Writes} | throw({Head, Error}) +re_hash(Head, SlotStart) -> + {SlotPos, _4} = slot_position(SlotStart), + {ok, Bin} = dets_utils:pread(Head, SlotPos, 4*?SEGSZ, 0), + {Read, Cs} = split_bin(SlotPos, Bin, [], []), + re_hash_read(Head, [], Read, Cs). + +split_bin(Pos, <>, R, Cs) -> + if + P =:= 0 -> + split_bin(Pos+4, B, R, Cs); + true -> + split_bin(Pos+4, B, [{P,?ReadAhead} | R], [[Pos] | Cs]) + end; +split_bin(_Pos, <<>>, R, Cs) -> + {R, Cs}. + +re_hash_read(Head, Cs, R, RCs) -> + {ok, Bins} = dets_utils:pread(R, Head), + re_hash_read(Head, R, RCs, Bins, Cs, [], []). + +re_hash_read(Head, [{Pos, Size} | Ps], [C | Cs], + [<> | Bins], + DoneCs, R, RCs) -> + case byte_size(Bin0) of + BinSz when BinSz >= Sz -> + case catch binary_to_term(Bin0) of + {'EXIT', _Error} -> + throw(dets_utils:corrupt_reason(Head, bad_object)); + Term -> + Key = element(Head#head.keypos, Term), + New = h(Key, Head#head.hash_bif) rem Head#head.m2, + NC = case New >= Head#head.m of + true -> [{Pos,New} | C]; + false -> [Pos | C] + end, + if + Next =:= 0 -> + NDoneCs = [NC | DoneCs], + re_hash_read(Head, Ps, Cs, Bins, NDoneCs, R, RCs); + true -> + NR = [{Next,?ReadAhead} | R], + NRCs = [NC | RCs], + re_hash_read(Head, Ps, Cs, Bins, DoneCs, NR, NRCs) + end + end; + BinSz when Size =:= BinSz+?OHDSZ -> + NR = [{Pos, Sz+?OHDSZ} | R], + re_hash_read(Head, Ps, Cs, Bins, DoneCs, NR, [C | RCs]); + _BinSz -> + throw({Head, {error, {premature_eof, Head#head.filename}}}) + end; +re_hash_read(Head, [], [], [], Cs, [], []) -> + re_hash_traverse_chains(Cs, Head, [], [], []); +re_hash_read(Head, [], [], [], Cs, R, RCs) -> + re_hash_read(Head, Cs, R, RCs). + +re_hash_traverse_chains([C | Cs], Head, Rs, Ns, Ws) -> + case re_hash_find_new(C, Rs, start, start) of + false -> + re_hash_traverse_chains(Cs, Head, Rs, Ns, Ws); + {NRs, FirstNew, LastNew} -> + LastInNew = case C of + [{_,_} | _] -> true; + _ -> false + end, + N = {FirstNew, LastNew, LastInNew}, + NWs = re_hash_link(C, start, start, start, Ws), + re_hash_traverse_chains(Cs, Head, NRs, [N | Ns], NWs) + end; +re_hash_traverse_chains([], Head, Rs, Ns, Ws) -> + {ok, Bins} = dets_utils:pread(Rs, Head), + {ok, insert_new(Rs, Bins, Ns, Ws)}. + +re_hash_find_new([{Pos,NewSlot} | C], R, start, start) -> + {SPos, _4} = slot_position(NewSlot), + re_hash_find_new(C, [{SPos,4} | R], Pos, Pos); +re_hash_find_new([{Pos,_SPos} | C], R, _FirstNew, LastNew) -> + re_hash_find_new(C, R, Pos, LastNew); +re_hash_find_new([_Pos | C], R, FirstNew, LastNew) -> + re_hash_find_new(C, R, FirstNew, LastNew); +re_hash_find_new([], _R, start, start) -> + false; +re_hash_find_new([], R, FirstNew, LastNew) -> + {R, FirstNew, LastNew}. + +re_hash_link([{Pos,_SPos} | C], LastOld, start, _LastInNew, Ws) -> + re_hash_link(C, LastOld, Pos, true, Ws); +re_hash_link([{Pos,_SPos} | C], LastOld, LastNew, false, Ws) -> + re_hash_link(C, LastOld, Pos, true, [{Pos,<>} | Ws]); +re_hash_link([{Pos,_SPos} | C], LastOld, _LastNew, LastInNew, Ws) -> + re_hash_link(C, LastOld, Pos, LastInNew, Ws); +re_hash_link([Pos | C], start, LastNew, true, Ws) -> + re_hash_link(C, Pos, LastNew, false, [{Pos,<<0:32>>} | Ws]); +re_hash_link([Pos | C], LastOld, LastNew, true, Ws) -> + re_hash_link(C, Pos, LastNew, false, [{Pos,<>} | Ws]); +re_hash_link([Pos | C], _LastOld, LastNew, LastInNew, Ws) -> + re_hash_link(C, Pos, LastNew, LastInNew, Ws); +re_hash_link([], _LastOld, _LastNew, _LastInNew, Ws) -> + Ws. + +insert_new([{NewSlotPos,_4} | Rs], [<> = PB | Bins], [N | Ns], Ws) -> + {FirstNew, LastNew, LastInNew} = N, + Ws1 = case P of + 0 when LastInNew -> + Ws; + 0 -> + [{LastNew, <<0:32>>} | Ws]; + _ -> + [{LastNew, PB} | Ws] + end, + NWs = [{NewSlotPos, <>} | Ws1], + insert_new(Rs, Bins, Ns, NWs); +insert_new([], [], [], Ws) -> + Ws. + +%% When writing the cache, a 'work list' is first created: +%% WorkList = [{Key, {Delete,Lookup,[Inserted]}}] +%% Delete = keep | delete +%% Lookup = skip | lookup +%% Inserted = {object(), No} +%% No = integer() +%% If No =< 0 then there will be -No instances of object() on the file +%% when the cache has been written. If No > 0 then No instances of +%% object() will be added to the file. +%% If Delete has the value 'delete', then all objects with the key Key +%% have been deleted. (This could be viewed as a shorthand for {Object,0} +%% for each object Object on the file not mentioned in some Inserted.) +%% If Lookup has the value 'lookup', all objects with the key Key will +%% be returned. +%% + +%% -> {NewHead, [LookedUpObject], pwrite_list()} | throw({NewHead, Error}) +write_cache(Head) -> + #head{cache = C, type = Type} = Head, + case dets_utils:is_empty_cache(C) of + true -> {Head, [], []}; + false -> + {NewC, _MaxInserts, PerKey} = dets_utils:reset_cache(C), + %% NoInsertedKeys is an upper limit on the number of new keys. + {WL, NoInsertedKeys} = make_wl(PerKey, Type), + Head1 = Head#head{cache = NewC}, + case may_grow(Head1, NoInsertedKeys, once) of + {Head2, ok} -> + eval_work_list(Head2, WL); + HeadError -> + throw(HeadError) + end + end. + +make_wl(PerKey, Type) -> + make_wl(PerKey, Type, [], 0). + +make_wl([{Key,L} | PerKey], Type, WL, Ins) -> + [Cs | I] = wl(L, Type), + make_wl(PerKey, Type, [{Key,Cs} | WL], Ins+I); +make_wl([], _Type, WL, Ins) -> + {WL, Ins}. + +wl(L, Type) -> + wl(L, Type, keep, skip, 0, []). + +wl([{_Seq, delete_key} | Cs], Type, _Del, Lookup, _I, _Objs) -> + wl(Cs, Type, delete, Lookup, 0, []); +wl([{_Seq, {delete_object, Object}} | Cs], Type, Del, Lookup, I, Objs) -> + NObjs = lists:keydelete(Object, 1, Objs), + wl(Cs, Type, Del, Lookup, I, [{Object,0} | NObjs]); +wl([{_Seq, {insert, Object}} | Cs], Type, _Del, Lookup, _I, _Objs) + when Type =:= set -> + wl(Cs, Type, delete, Lookup, 1, [{Object,-1}]); +wl([{_Seq, {insert, Object}} | Cs], Type, Del, Lookup, _I, Objs) -> + NObjs = + case lists:keysearch(Object, 1, Objs) of + {value, {_, 0}} -> + lists:keyreplace(Object, 1, Objs, {Object,-1}); + {value, {_, _C}} when Type =:= bag -> % C =:= 1; C =:= -1 + Objs; + {value, {_, C}} when C < 0 -> % when Type =:= duplicate_bag + lists:keyreplace(Object, 1, Objs, {Object,C-1}); + {value, {_, C}} -> % when C > 0, Type =:= duplicate_bag + lists:keyreplace(Object, 1, Objs, {Object,C+1}); + false when Del =:= delete -> + [{Object, -1} | Objs]; + false -> + [{Object, 1} | Objs] + end, + wl(Cs, Type, Del, Lookup, 1, NObjs); +wl([{_Seq, {lookup,_Pid}=Lookup} | Cs], Type, Del, _Lookup, I, Objs) -> + wl(Cs, Type, Del, Lookup, I, Objs); +wl([], _Type, Del, Lookup, I, Objs) -> + [{Del, Lookup, Objs} | I]. + +%% -> {NewHead, ok} | {NewHead, Error} +may_grow(Head, _N, _How) when Head#head.fixed =/= false -> + {Head, ok}; +may_grow(#head{access = read}=Head, _N, _How) -> + {Head, ok}; +may_grow(Head, _N, _How) when Head#head.next >= ?MAXOBJS -> + {Head, ok}; +may_grow(Head, N, How) -> + Extra = erlang:min(2*?SEGSZ, Head#head.no_objects + N - Head#head.next), + case catch may_grow1(Head, Extra, How) of + {error, Reason} -> % alloc may throw error + {Head, {error, Reason}}; + Reply -> + Reply + end. + +may_grow1(Head, Extra, many_times) when Extra > ?SEGSZ -> + Reply = grow(Head, 1, undefined), + self() ! ?DETS_CALL(self(), may_grow), + Reply; +may_grow1(Head, Extra, _How) -> + grow(Head, Extra, undefined). + +%% -> {Head, ok} | throw({Head, Error}) +grow(Head, Extra, _SegZero) when Extra =< 0 -> + {Head, ok}; +grow(Head, Extra, undefined) -> + grow(Head, Extra, seg_zero()); +grow(Head, Extra, SegZero) -> + #head{n = N, next = Next, m = M} = Head, + SegNum = ?SLOT2SEG(Next), + {Head0, Ws1} = allocate_segment(Head, SegZero, SegNum), + {Head1, ok} = dets_utils:pwrite(Head0, Ws1), + %% If re_hash fails, segp_cache has been called, but it does not matter. + {ok, Ws2} = re_hash(Head1, N), + {Head2, ok} = dets_utils:pwrite(Head1, Ws2), + NewHead = + if + N + ?SEGSZ =:= M -> + Head2#head{n = 0, next = Next + ?SEGSZ, m = 2 * M, m2 = 4 * M}; + true -> + Head2#head{n = N + ?SEGSZ, next = Next + ?SEGSZ} + end, + grow(NewHead, Extra - ?SEGSZ, SegZero). + +seg_zero() -> + <<0:(4*?SEGSZ)/unit:8>>. + +find_object(Head, Object) -> + Key = element(Head#head.keypos, Object), + Slot = db_hash(Key, Head), + find_object(Head, Object, Slot). + +find_object(H, _Obj, Slot) when Slot >= H#head.next -> + false; +find_object(H, Obj, Slot) -> + {_Pos, Chain} = chain(H, Slot), + case catch find_obj(H, Obj, Chain) of + {ok, Pos} -> + {ok, Pos}; + _Else -> + false + end. + +find_obj(H, Obj, Pos) when Pos > 0 -> + {Next, _Sz, Term} = prterm(H, Pos, ?ReadAhead), + if + Term == Obj -> + {ok, Pos}; + true -> + find_obj(H, Obj, Next) + end. + +%% Given, a slot, return the {Pos, Chain} in the file where the +%% objects hashed to this slot reside. Pos is the position in the +%% file where the chain pointer is written and Chain is the position +%% in the file where the first object resides. +chain(Head, Slot) -> + Pos = ?SEGADDR(?SLOT2SEG(Slot)), + Segment = get_segp(Pos), + FinalPos = Segment + (4 * ?REM2(Slot, ?SEGSZ)), + {ok, <>} = dets_utils:pread(Head, FinalPos, 4, 0), + {FinalPos, Chain}. + +%%% +%%% Cache routines depending on the dets file format. +%%% + +%% -> {Head, [LookedUpObject], pwrite_list()} | throw({Head, Error}) +eval_work_list(Head, WorkLists) -> + SWLs = tag_with_slot(WorkLists, Head, []), + P1 = dets_utils:family(SWLs), + {PerSlot, SlotPositions} = remove_slot_tag(P1, [], []), + {ok, Bins} = dets_utils:pread(SlotPositions, Head), + first_object(PerSlot, SlotPositions, Bins, Head, [], [], [], []). + +tag_with_slot([{K,_} = WL | WLs], Head, L) -> + tag_with_slot(WLs, Head, [{db_hash(K, Head), WL} | L]); +tag_with_slot([], _Head, L) -> + L. + +remove_slot_tag([{S,SWLs} | SSWLs], Ls, SPs) -> + remove_slot_tag(SSWLs, [SWLs | Ls], [slot_position(S) | SPs]); +remove_slot_tag([], Ls, SPs) -> + {Ls, SPs}. + +%% The initial chain pointers and the first object in each chain are +%% read "in parallel", that is, with one call to file:pread/2 (two +%% calls altogether). The following chain objects are read one by +%% one. This is a compromise: if the chains are long and threads are +%% active, it would be faster to keep a state for each chain and read +%% the objects of the chains in parallel, but the overhead would be +%% quite substantial. + +first_object([WorkLists | SPs], [{P1,_4} | Ss], [<> | Bs], Head, + ObjsToRead, ToRead, Ls, LU) when P2 =:= 0 -> + L0 = [{old,P1}], + {L, NLU} = eval_slot(Head, ?ReadAhead, P2, WorkLists, L0, LU), + first_object(SPs, Ss, Bs, Head, ObjsToRead, ToRead, [L | Ls], NLU); +first_object([WorkLists | SPs], [{P1,_4} | Ss], [<> | Bs], Head, + ObjsToRead, ToRead, Ls, LU) -> + E = {P1,P2,WorkLists}, + first_object(SPs, Ss, Bs, Head, + [E | ObjsToRead], [{P2, ?ReadAhead} | ToRead], Ls, LU); +first_object([], [], [], Head, ObjsToRead, ToRead, Ls, LU) -> + {ok, Bins} = dets_utils:pread(ToRead, Head), + case catch eval_first(Bins, ObjsToRead, Head, Ls, LU) of + {ok, NLs, NLU} -> + case create_writes(NLs, Head, [], 0) of + {Head1, [], 0} -> + {Head1, NLU, []}; + {Head1, Ws, No} -> + {NewHead, Ws2} = update_no_objects(Head1, Ws, No), + {NewHead, NLU, Ws2} + end; + _Error -> + throw(dets_utils:corrupt_reason(Head, bad_object)) + end. + +%% Update no_objects on the file too, if the number of segments that +%% dets:fsck/6 use for estimate has changed. +update_no_objects(Head, Ws, 0) -> {Head, Ws}; +update_no_objects(Head, Ws, Delta) -> + No = Head#head.no_objects, + NewNo = No + Delta, + NWs = + if + NewNo > ?MAXOBJS -> + Ws; + ?SLOT2SEG(No) =:= ?SLOT2SEG(NewNo) -> + Ws; + true -> + [{?NO_OBJECTS_POS, <>} | Ws] + end, + {Head#head{no_objects = NewNo}, NWs}. + +eval_first([<> | Bins], + [SP | SPs], Head, Ls, LU) -> + {P1, P2, WLs} = SP, + L0 = [{old,P1}], + case byte_size(Bin) of + BinSz when BinSz >= Sz -> + Term = binary_to_term(Bin), + Key = element(Head#head.keypos, Term), + {L, NLU} = find_key(Head, P2, Next, Sz, Term, Key, WLs, L0, LU), + eval_first(Bins, SPs, Head, [L | Ls], NLU); + _BinSz -> + {L, NLU} = eval_slot(Head, Sz+?OHDSZ, P2, WLs, L0, LU), + eval_first(Bins, SPs, Head, [L | Ls], NLU) + end; +eval_first([], [], _Head, Ls, LU) -> + {ok, Ls, LU}. + +eval_slot(_Head, _TrySize, _Pos=0, [], L, LU) -> + {L, LU}; +eval_slot(Head, _TrySize, Pos=0, [WL | WLs], L, LU) -> + {_Key, {_Delete, LookUp, Objects}} = WL, + {NL, NLU} = end_of_key(Objects, LookUp, L, []), + eval_slot(Head, ?ReadAhead, Pos, WLs, NL, NLU++LU); +eval_slot(Head, TrySize, Pos, WLs, L, LU) -> + {NextPos, Size, Term} = prterm(Head, Pos, TrySize), + Key = element(Head#head.keypos, Term), + find_key(Head, Pos, NextPos, Size, Term, Key, WLs, L, LU). + +find_key(Head, Pos, NextPos, Size, Term, Key, WLs, L, LU) -> + case lists:keysearch(Key, 1, WLs) of + {value, {_, {Delete, LookUp, Objects}} = WL} -> + NWLs = lists:delete(WL, WLs), + {NewObjects, NL, LUK} = eval_object(Size, Term, Delete, LookUp, + Objects, Head, Pos, L, []), + eval_key(Key, Delete, LookUp, NewObjects, Head, NextPos, + NWLs, NL, LU, LUK); + false -> + L0 = [{old,Pos} | L], + eval_slot(Head, ?ReadAhead, NextPos, WLs, L0, LU) + end. + +eval_key(_Key, _Delete, Lookup, _Objects, Head, Pos, WLs, L, LU, LUK) + when Head#head.type =:= set -> + NLU = case Lookup of + {lookup, Pid} -> [{Pid,LUK} | LU]; + skip -> LU + end, + eval_slot(Head, ?ReadAhead, Pos, WLs, L, NLU); +eval_key(_Key, _Delete, LookUp, Objects, Head, Pos, WLs, L, LU, LUK) + when Pos =:= 0 -> + {NL, NLU} = end_of_key(Objects, LookUp, L, LUK), + eval_slot(Head, ?ReadAhead, Pos, WLs, NL, NLU++LU); +eval_key(Key, Delete, LookUp, Objects, Head, Pos, WLs, L, LU, LUK) -> + {NextPos, Size, Term} = prterm(Head, Pos, ?ReadAhead), + case element(Head#head.keypos, Term) of + Key -> + {NewObjects, NL, LUK1} = + eval_object(Size, Term, Delete, LookUp,Objects,Head,Pos,L,LUK), + eval_key(Key, Delete, LookUp, NewObjects, Head, NextPos, WLs, + NL, LU, LUK1); + Key2 -> + {L1, NLU} = end_of_key(Objects, LookUp, L, LUK), + find_key(Head, Pos, NextPos, Size, Term, Key2, WLs, L1, NLU++LU) + end. + +%% All objects in Objects have the key Key. +eval_object(Size, Term, Delete, LookUp, Objects, Head, Pos, L, LU) -> + Type = Head#head.type, + case lists:keysearch(Term, 1, Objects) of + {value, {_Object, N}} when N =:= 0 -> + L1 = [{delete,Pos,Size} | L], + {Objects, L1, LU}; + {value, {_Object, N}} when N < 0, Type =:= set -> + L1 = [{old,Pos} | L], + wl_lookup(LookUp, Objects, Term, L1, LU); + {value, {Object, _N}} when Type =:= bag -> % when N =:= 1; N =:= -1 + L1 = [{old,Pos} | L], + Objects1 = lists:keydelete(Object, 1, Objects), + wl_lookup(LookUp, Objects1, Term, L1, LU); + {value, {Object, N}} when N < 0, Type =:= duplicate_bag -> + L1 = [{old,Pos} | L], + Objects1 = lists:keyreplace(Object, 1, Objects, {Object,N+1}), + wl_lookup(LookUp, Objects1, Term, L1, LU); + {value, {_Object, N}} when N > 0, Type =:= duplicate_bag -> + L1 = [{old,Pos} | L], + wl_lookup(LookUp, Objects, Term, L1, LU); + false when Type =:= set, Delete =:= delete -> + case lists:keysearch(-1, 2, Objects) of + false -> % no inserted object, perhaps deleted objects + L1 = [{delete,Pos,Size} | L], + {[], L1, LU}; + {value, {Term2,-1}} -> + Bin2 = term_to_binary(Term2), + NSize = byte_size(Bin2), + Overwrite = + if + NSize =:= Size -> + true; + true -> + SizePos = sz2pos(Size+?OHDSZ), + NSizePos = sz2pos(NSize+?OHDSZ), + SizePos =:= NSizePos + end, + E = if + Overwrite -> + {overwrite,Bin2,Pos}; + true -> + {replace,Bin2,Pos,Size} + end, + wl_lookup(LookUp, [], Term2, [E | L], LU) + end; + false when Delete =:= delete -> + L1 = [{delete,Pos,Size} | L], + {Objects, L1, LU}; + false -> + L1 = [{old,Pos} | L], + wl_lookup(LookUp, Objects, Term, L1, LU) + end. + +%% Inlined. +wl_lookup({lookup,_}, Objects, Term, L, LU) -> + {Objects, L, [Term | LU]}; +wl_lookup(skip, Objects, _Term, L, LU) -> + {Objects, L, LU}. + +end_of_key([{Object,N0} | Objs], LookUp, L, LU) when N0 =/= 0 -> + N = abs(N0), + NL = [{insert,N,term_to_binary(Object)} | L], + NLU = case LookUp of + {lookup, _} -> + lists:duplicate(N, Object) ++ LU; + skip -> + LU + end, + end_of_key(Objs, LookUp, NL, NLU); +end_of_key([_ | Objects], LookUp, L, LU) -> + end_of_key(Objects, LookUp, L, LU); +end_of_key([], {lookup,Pid}, L, LU) -> + {L, [{Pid,LU}]}; +end_of_key([], skip, L, LU) -> + {L, LU}. + +create_writes([L | Ls], H, Ws, No) -> + {NH, NWs, NNo} = create_writes(L, H, Ws, No, 0, true), + create_writes(Ls, NH, NWs, NNo); +create_writes([], H, Ws, No) -> + {H, lists:reverse(Ws), No}. + +create_writes([{old,Pos} | L], H, Ws, No, _Next, true) -> + create_writes(L, H, Ws, No, Pos, true); +create_writes([{old,Pos} | L], H, Ws, No, Next, false) -> + W = {Pos, <>}, + create_writes(L, H, [W | Ws], No, Pos, true); +create_writes([{insert,N,Bin} | L], H, Ws, No, Next, _NextIsOld) -> + {NH, NWs, Pos} = create_inserts(N, H, Ws, Next, byte_size(Bin), Bin), + create_writes(L, NH, NWs, No+N, Pos, false); +create_writes([{overwrite,Bin,Pos} | L], H, Ws, No, Next, _) -> + Size = byte_size(Bin), + W = {Pos, [<>, Bin]}, + create_writes(L, H, [W | Ws], No, Pos, true); +create_writes([{replace,Bin,Pos,OSize} | L], H, Ws, No, Next, _) -> + Size = byte_size(Bin), + {H1, _} = dets_utils:free(H, Pos, OSize+?OHDSZ), + {NH, NewPos, _} = dets_utils:alloc(H1, ?OHDSZ + Size), + W1 = {NewPos, [<>, Bin]}, + NWs = if + Pos =:= NewPos -> + [W1 | Ws]; + true -> + W2 = {Pos+?STATUS_POS, <>}, + [W1,W2 | Ws] + end, + create_writes(L, NH, NWs, No, NewPos, false); +create_writes([{delete,Pos,Size} | L], H, Ws, No, Next, _) -> + {NH, _} = dets_utils:free(H, Pos, Size+?OHDSZ), + NWs = [{Pos+?STATUS_POS,<>} | Ws], + create_writes(L, NH, NWs, No-1, Next, false); +create_writes([], H, Ws, No, _Next, _NextIsOld) -> + {H, Ws, No}. + +create_inserts(0, H, Ws, Next, _Size, _Bin) -> + {H, Ws, Next}; +create_inserts(N, H, Ws, Next, Size, Bin) -> + {NH, Pos, _} = dets_utils:alloc(H, ?OHDSZ + Size), + W = {Pos, [<>, Bin]}, + create_inserts(N-1, NH, [W | Ws], Pos, Size, Bin). + +slot_position(S) -> + Pos = ?SEGADDR(?SLOT2SEG(S)), + Segment = get_segp(Pos), + FinalPos = Segment + (4 * ?REM2(S, ?SEGSZ)), + {FinalPos, 4}. + +%% Twice the size of a segment due to the bug in sz2pos/1. Inlined. +actual_seg_size() -> + ?POW(sz2pos(?SEGSZ*4)-1). + +segp_cache(Pos, Segment) -> + put(Pos, Segment). + +%% Inlined. +get_segp(Pos) -> + get(Pos). + +%% Bug: If Sz0 is equal to 2**k for some k, then 2**(k+1) bytes are +%% allocated (wasting 2**k bytes). +sz2pos(N) -> + 1 + dets_utils:log2(N+1). + +scan_objs(_Head, Bin, From, To, L, Ts, R, _Type) -> + scan_objs(Bin, From, To, L, Ts, R). + +scan_objs(Bin, From, To, L, Ts, -1) -> + {stop, Bin, From, To, L, Ts}; +scan_objs(B = <<_N:32, Sz:32, St:32, T/binary>>, From, To, L, Ts, R) -> + if + St =:= ?ACTIVE; + St =:= ?FREE -> % deleted after scanning started + case T of + <> -> + NTs = [BinTerm | Ts], + OSz = Sz + ?OHDSZ, + Skip = ?POW(sz2pos(OSz)-1) - OSz, + F2 = From + OSz, + NR = if + R < 0 -> + R + 1; + true -> + R + OSz + Skip + end, + scan_skip(T2, F2, To, Skip, L, NTs, NR); + _ -> + {more, From, To, L, Ts, R, Sz+?OHDSZ} + end; + true -> % a segment + scan_skip(B, From, To, actual_seg_size(), L, Ts, R) + end; +scan_objs(_B, From, To, L, Ts, R) -> + {more, From, To, L, Ts, R, 0}. + +scan_skip(Bin, From, To, Skip, L, Ts, R) when From + Skip < To -> + SkipPos = From + Skip, + case Bin of + <<_:Skip/binary, Tail/binary>> -> + scan_objs(Tail, SkipPos, To, L, Ts, R); + _ -> + {more, SkipPos, To, L, Ts, R, 0} + end; +scan_skip(Bin, From, To, Skip, L, Ts, R) when From + Skip =:= To -> + scan_next_allocated(Bin, From, To, L, Ts, R); +scan_skip(_Bin, From, _To, Skip, L, Ts, R) -> % when From + Skip > _To + From1 = From + Skip, + {more, From1, From1, L, Ts, R, 0}. + +scan_next_allocated(_Bin, _From, To, <<>>=L, Ts, R) -> + {more, To, To, L, Ts, R, 0}; +scan_next_allocated(Bin, From0, _To, <>, Ts, R) -> + Skip = From - From0, + scan_skip(Bin, From0, To, Skip, L, Ts, R). + +%% Read term from file at position Pos +prterm(Head, Pos, ReadAhead) -> + Res = dets_utils:pread(Head, Pos, ?OHDSZ, ReadAhead), + ?DEBUGF("file:pread(~p, ~p, ?) -> ~p~n", [Head#head.filename, Pos, Res]), + {ok, <>} = Res, + ?DEBUGF("{Next, Sz} = ~p~n", [{Next, Sz}]), + Bin = case byte_size(Bin0) of + Actual when Actual >= Sz -> + Bin0; + _ -> + {ok, Bin1} = dets_utils:pread(Head, Pos + ?OHDSZ, Sz, 0), + Bin1 + end, + Term = binary_to_term(Bin), + {Next, Sz, Term}. + +%%%%%%%%%%%%%%%%% DEBUG functions %%%%%%%%%%%%%%%% + +file_info(FH) -> + #fileheader{closed_properly = CP, keypos = Kp, + m = M, next = Next, n = N, version = Version, + type = Type, no_objects = NoObjects} + = FH, + if + CP =:= 0 -> + {error, not_closed}; + FH#fileheader.cookie =/= ?MAGIC -> + {error, not_a_dets_file}; + FH#fileheader.version =/= ?FILE_FORMAT_VERSION -> + {error, bad_version}; + true -> + {ok, [{closed_properly,CP},{keypos,Kp},{m, M}, + {n,N},{next,Next},{no_objects,NoObjects}, + {type,Type},{version,Version}]} + end. + +v_segments(H) -> + v_segments(H, 0). + +v_segments(_H, ?SEGARRSZ) -> + done; +v_segments(H, SegNo) -> + Seg = dets_utils:read_4(H#head.fptr, ?SEGADDR(SegNo)), + if + Seg =:= 0 -> + done; + true -> + io:format("SEGMENT ~w ", [SegNo]), + io:format("At position ~w~n", [Seg]), + v_segment(H, SegNo, Seg, 0), + v_segments(H, SegNo+1) + end. + +v_segment(_H, _, _SegPos, ?SEGSZ) -> + done; +v_segment(H, SegNo, SegPos, SegSlot) -> + Slot = SegSlot + (SegNo * ?SEGSZ), + Chain = dets_utils:read_4(H#head.fptr, SegPos + (4 * SegSlot)), + if + Chain =:= 0 -> %% don't print empty chains + true; + true -> + io:format(" <~p>~p: [",[SegPos + (4 * SegSlot), Slot]), + print_chain(H, Chain) + end, + v_segment(H, SegNo, SegPos, SegSlot+1). + +print_chain(_H, 0) -> + io:format("] \n", []); +print_chain(H, Pos) -> + {ok, _} = file:position(H#head.fptr, Pos), + case rterm(H#head.fptr) of + {ok, 0, _Sz, Term} -> + io:format("<~p>~p] \n",[Pos, Term]); + {ok, Next, _Sz, Term} -> + io:format("<~p>~p, ", [Pos, Term]), + print_chain(H, Next); + Other -> + io:format("~nERROR ~p~n", [Other]) + end. + +%% Can't be used at the bucket level!!!! +%% Only when we go down a chain +rterm(F) -> + case catch rterm2(F) of + {'EXIT', Reason} -> %% truncated DAT file + dets_utils:vformat("** dets: Corrupt or truncated dets file~n", + []), + {error, Reason}; + Other -> + Other + end. + +rterm2(F) -> + {ok, <>} = file:read(F, ?OHDSZ), + {ok, Bin} = file:read(F, Sz), + Term = binary_to_term(Bin), + {ok, Next, Sz, Term}. + + -- cgit v1.2.3