From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- lib/kernel/src/disk_log_1.erl | 1551 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1551 insertions(+) create mode 100644 lib/kernel/src/disk_log_1.erl (limited to 'lib/kernel/src/disk_log_1.erl') diff --git a/lib/kernel/src/disk_log_1.erl b/lib/kernel/src/disk_log_1.erl new file mode 100644 index 0000000000..7103417149 --- /dev/null +++ b/lib/kernel/src/disk_log_1.erl @@ -0,0 +1,1551 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(disk_log_1). + +%% Efficient file based log - implementation part + +-export([int_open/4, ext_open/4, logl/1, close/3, truncate/3, chunk/5, + sync/2, write_cache/2]). +-export([mf_int_open/7, mf_int_log/3, mf_int_close/2, mf_int_inc/2, + mf_ext_inc/2, mf_int_chunk/4, mf_int_chunk_step/3, + mf_sync/1, mf_write_cache/1]). +-export([mf_ext_open/7, mf_ext_log/3, mf_ext_close/2]). + +-export([print_index_file/1]). +-export([read_index_file/1]). +-export([read_size_file/1, read_size_file_version/1]). +-export([chunk_read_only/5]). +-export([mf_int_chunk_read_only/4]). +-export([change_size_wrap/3]). +-export([get_wrap_size/1]). +-export([is_head/1]). +-export([position/3, truncate_at/3, fwrite/4, fclose/2]). + +-compile({inline,[{scan_f2,7}]}). + +-import(lists, [concat/1, reverse/1, sum/1]). + +-include("disk_log.hrl"). + +%%% At the head of a LOG file we have [?LOGMAGIC, ?OPENED | ?CLOSED]. +%%% Otherwise it's not a LOG file. Following that, the head, come the +%%% logged items. +%%% +%%% There are four formats of wrap log files (so far). Only the size +%%% file and the index file differ between versions between the first +%%% three version. The fourth version 2(a), has some protection +%%% against damaged item sizes. +%%% Version 0: no "siz" file +%%% Version 1: "siz" file, 4 byte sizes +%%% Version 2: 8 byte sizes (support for large files) +%%% Version 2(a): Change of the format of logged items: +%%% if the size of a term binary is greater than or equal to +%%% ?MIN_MD5_TERM, a logged item looks like +%%% <>, +%%% otherwise <>. + +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- + +%% -> {ok, NoBytes, NewFdC} | {Error, NewFdC} +log(FdC, FileName, X) -> + {Bs, Size} = logl(X, [], 0), + case fwrite(FdC, FileName, Bs, Size) of + {ok, NewFdC} -> + {ok, Size, NewFdC}; + Error -> + Error + end. + +-spec logl([binary()]) -> {iolist(), non_neg_integer()}. +logl(X) -> + logl(X, [], 0). + +logl([X | T], Bs, Size) -> + Sz = byte_size(X), + BSz = <>, + NBs = case Sz < ?MIN_MD5_TERM of + true -> + [Bs, BSz, ?BIGMAGICHEAD | X]; + false -> + MD5 = erlang:md5(BSz), + [Bs, BSz, ?BIGMAGICHEAD, MD5 | X] + end, + logl(T, NBs, Size + ?HEADERSZ + Sz); +logl([], Bs, Size) -> + {Bs, Size}. + +%% -> {ok, NewFdC} | {Error, NewFdC} +write_cache(#cache{fd = Fd, c = C}, FName) -> + erase(write_cache_timer_is_running), + write_cache(Fd, FName, C). + +%% -> {Reply, NewFdC}; Reply = ok | Error +sync(FdC, FName) -> + fsync(FdC, FName). + +%% -> {Reply, NewFdC}; Reply = ok | Error +truncate(FdC, FileName, Head) -> + Reply = truncate_at(FdC, FileName, ?HEADSZ), + case Reply of + {ok, _} when Head =:= none -> + Reply; + {ok, FdC1} -> + {ok, B} = Head, + case log(FdC1, FileName, [B]) of + {ok, _NoBytes, NewFdC} -> + {ok, NewFdC}; + Reply2 -> + Reply2 + end; + _ -> + Reply + end. + +%% -> {NewFdC, Reply}, Reply = {Cont, Binaries} | {error, Reason} | eof +chunk(FdC, FileName, Pos, B, N) when is_binary(B) -> + true = byte_size(B) >= ?HEADERSZ, + do_handle_chunk(FdC, FileName, Pos, B, N); +chunk(FdC, FileName, Pos, NoBytes, N) -> + MaxNoBytes = case NoBytes of + [] -> ?MAX_CHUNK_SIZE; + _ -> erlang:max(NoBytes, ?MAX_CHUNK_SIZE) + end, + case read_chunk(FdC, FileName, Pos, MaxNoBytes) of + {NewFdC, {ok, Bin}} when byte_size(Bin) < ?HEADERSZ -> + {NewFdC, {error, {corrupt_log_file, FileName}}}; + {NewFdC, {ok, Bin}} when NoBytes =:= []; byte_size(Bin) >= NoBytes -> + NewPos = Pos + byte_size(Bin), + do_handle_chunk(NewFdC, FileName, NewPos, Bin, N); + {NewFdC, {ok, _Bin}} -> + {NewFdC, {error, {corrupt_log_file, FileName}}}; + {NewFdC, eof} when is_integer(NoBytes) -> % "cannot happen" + {NewFdC, {error, {corrupt_log_file, FileName}}}; + Other -> % eof or error + Other + end. + +do_handle_chunk(FdC, FileName, Pos, B, N) -> + case handle_chunk(B, Pos, N, []) of + corrupt -> + {FdC, {error, {corrupt_log_file, FileName}}}; + {C, []} -> + chunk(FdC, FileName, C#continuation.pos, C#continuation.b, N); + C_Ack -> + {FdC, C_Ack} + end. + +handle_chunk(B, Pos, 0, Ack) when byte_size(B) >= ?HEADERSZ -> + {#continuation{pos = Pos, b = B}, Ack}; +handle_chunk(B= <>, Pos, N, Ack) when Size < ?MIN_MD5_TERM -> + case Tail of + <> -> + %% The client calls binary_to_term/1. + handle_chunk(Tail2, Pos, N-1, [BinTerm | Ack]); + _ -> + BytesToRead = Size + ?HEADERSZ, + {#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack} + end; +handle_chunk(B= <>, Pos, _N, Ack) -> % when Size >= ?MIN_MD5_TERM + MD5 = erlang:md5(<>), + case Tail of + %% The requested object is always bigger than a chunk. + <> -> + {#continuation{pos = Pos, b = []}, [Bin | Ack]}; + <> -> + BytesToRead = Size + ?HEADERSZ + 16, + {#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack}; + _ when byte_size(Tail) >= 16 -> + corrupt; + _ -> + {#continuation{pos = Pos - byte_size(B), b = []}, Ack} + end; +handle_chunk(B= <>, + Pos, N, Ack) -> + %% Version 2, before 2(a). + case Tail of + <> -> + handle_chunk(Tail2, Pos, N-1, [BinTerm | Ack]); + _ -> + %% We read the whole thing into one binary, even if Size is huge. + BytesToRead = Size + ?HEADERSZ, + {#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack} + end; +handle_chunk(B, _Pos, _N, _Ack) when byte_size(B) >= ?HEADERSZ -> + corrupt; +handle_chunk(B, Pos, _N, Ack) -> + {#continuation{pos = Pos-byte_size(B), b = []}, Ack}. + +read_chunk(FdC, FileName, Pos, MaxBytes) -> + {FdC1, R} = pread(FdC, FileName, Pos + ?HEADSZ, MaxBytes), + case position(FdC1, FileName, eof) of + {ok, NewFdC, _Pos} -> + {NewFdC, R}; + {Error, NewFdC} -> + {NewFdC, Error} + end. + +%% Used by wrap_log_reader. +%% -> {NewFdC, Reply}, +%% Reply = {Cont, Binaries, Bad} (Bad >= 0) | {error, Reason} | eof +chunk_read_only(FdC = #cache{}, FileName, Pos, B, N) -> + do_chunk_read_only(FdC, FileName, Pos, B, N); +chunk_read_only(Fd, FileName, Pos, B, N) -> + %% wrap_log_reader calling... + FdC = #cache{fd = Fd}, + {_NFdC, Reply} = do_chunk_read_only(FdC, FileName, Pos, B, N), + Reply. + +do_chunk_read_only(FdC, FileName, Pos, B, N) when is_binary(B) -> + true = byte_size(B) >= ?HEADERSZ, + do_handle_chunk_ro(FdC, FileName, Pos, B, N); +do_chunk_read_only(FdC, FileName, Pos, NoBytes, N) -> + MaxNoBytes = case NoBytes of + [] -> ?MAX_CHUNK_SIZE; + _ -> erlang:max(NoBytes, ?MAX_CHUNK_SIZE) + end, + case read_chunk_ro(FdC, FileName, Pos, MaxNoBytes) of + {NewFdC, {ok, Bin}} when byte_size(Bin) < ?HEADERSZ -> + NewCont = #continuation{pos = Pos+byte_size(Bin), b = []}, + {NewFdC, {NewCont, [], byte_size(Bin)}}; + {NewFdC, {ok, Bin}} when NoBytes =:= []; byte_size(Bin) >= NoBytes -> + NewPos = Pos + byte_size(Bin), + do_handle_chunk_ro(NewFdC, FileName, NewPos, Bin, N); + {NewFdC, {ok, Bin}} -> + NewCont = #continuation{pos = Pos+byte_size(Bin), b = []}, + {NewFdC, {NewCont, [], byte_size(Bin)-?HEADERSZ}}; + {NewFdC, eof} when is_integer(NoBytes) -> % "cannot happen" + {NewFdC, eof}; % what else? + Other -> + Other + end. + +do_handle_chunk_ro(FdC, FileName, Pos, B, N) -> + case handle_chunk_ro(B, Pos, N, [], 0) of + {C, [], 0} -> + #continuation{pos = NewPos, b = NoBytes} = C, + do_chunk_read_only(FdC, FileName, NewPos, NoBytes, N); + C_Ack_Bad -> + {FdC, C_Ack_Bad} + end. + +handle_chunk_ro(B, Pos, 0, Ack, Bad) when byte_size(B) >= ?HEADERSZ -> + {#continuation{pos = Pos, b = B}, Ack, Bad}; +handle_chunk_ro(B= <>, Pos, N, Ack, Bad) when Size < ?MIN_MD5_TERM -> + case Tail of + <> -> + handle_chunk_ro(Tail2, Pos, N-1, [BinTerm | Ack], Bad); + _ -> + BytesToRead = Size + ?HEADERSZ, + {#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack, Bad} + end; +handle_chunk_ro(B= <>, Pos, N, Ack, Bad) -> % when Size>=?MIN_MD5_TERM + MD5 = erlang:md5(<>), + case Tail of + <> -> + %% The requested object is always bigger than a chunk. + {#continuation{pos = Pos, b = []}, [Bin | Ack], Bad}; + <> -> + BytesToRead = Size + ?HEADERSZ + 16, + {#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack, Bad}; + <<_BadMD5:16/binary, _:1/unit:8, Tail2/binary>> -> + handle_chunk_ro(Tail2, Pos, N-1, Ack, Bad+1); + _ -> + {#continuation{pos = Pos - byte_size(B), b = []}, Ack, Bad} + end; +handle_chunk_ro(B= <>, Pos, N, Ack, Bad) -> + %% Version 2, before 2(a). + case Tail of + <> -> + handle_chunk_ro(Tail2, Pos, N-1, [BinTerm | Ack], Bad); + _ -> + %% We read the whole thing into one binary, even if Size is huge. + BytesToRead = Size + ?HEADERSZ, + {#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack, Bad} + end; +handle_chunk_ro(B, Pos, N, Ack, Bad) when byte_size(B) >= ?HEADERSZ -> + <<_:1/unit:8, B2/binary>> = B, + handle_chunk_ro(B2, Pos, N-1, Ack, Bad+1); +handle_chunk_ro(B, Pos, _N, Ack, Bad) -> + {#continuation{pos = Pos-byte_size(B), b = []}, Ack, Bad}. + +read_chunk_ro(FdC, FileName, Pos, MaxBytes) -> + pread(FdC, FileName, Pos + ?HEADSZ, MaxBytes). + +%% -> ok | throw(Error) +close(#cache{fd = Fd, c = []}, _FileName, read_only) -> + file:close(Fd); +close(#cache{fd = Fd, c = C}, FileName, read_write) -> + {Reply, _NewFdC} = write_cache(Fd, FileName, C), + mark(Fd, FileName, ?CLOSED), + file:close(Fd), + if Reply =:= ok -> ok; true -> throw(Reply) end. + +%% Open an internal file. Head is ignored if Mode is read_only. +%% int_open(FileName, Repair, Mode, Head) -> +%% {ok, {Alloc, FdC, HeadSize, FileSize}} +%% | {repaired, FdC, Terms, BadBytes, FileSize} +%% | throw(Error) +%% Alloc = new | existed +%% HeadSize = {NumberOfItemsWritten, NumberOfBytesWritten} +%% (HeadSize is equal {0, 0} if Alloc =:= existed, or no header written.) +int_open(FName, truncate, read_write, Head) -> + new_int_file(FName, Head); +int_open(FName, Repair, read_write, Head) -> + case open_read(FName) of + {ok, Fd} -> %% File exists + case file:read(Fd, ?HEADSZ) of + {ok, FileHead} -> + case is_head(FileHead) of + yes -> + file:close(Fd), + case open_update(FName) of + {ok, Fd2} -> + mark(Fd2, FName, ?OPENED), + FdC1 = #cache{fd = Fd2}, + {FdC, P} = position_close(FdC1, FName,eof), + {ok, {existed, FdC, {0, 0}, P}}; + Error -> + file_error(FName, Error) + end; + yes_not_closed when Repair -> + repair(Fd, FName); + yes_not_closed when not Repair -> + file:close(Fd), + throw({error, {need_repair, FName}}); + no -> + file:close(Fd), + throw({error, {not_a_log_file, FName}}) + end; + eof -> + file:close(Fd), + throw({error, {not_a_log_file, FName}}); + Error -> + file_error_close(Fd, FName, Error) + end; + _Other -> + new_int_file(FName, Head) + end; +int_open(FName, _Repair, read_only, _Head) -> + case open_read(FName) of + {ok, Fd} -> %% File exists + case file:read(Fd, ?HEADSZ) of + {ok, Head} -> + case is_head(Head) of + yes -> + {ok, P} = position_close2(Fd, FName, eof), + FdC = #cache{fd = Fd}, + {ok, {existed, FdC, {0, 0}, P}}; + yes_not_closed -> + {ok, P} = position_close2(Fd, FName, eof), + FdC = #cache{fd = Fd}, + {ok, {existed, FdC, {0, 0}, P}}; + no -> + file:close(Fd), + throw({error, {not_a_log_file, FName}}) + end; + eof -> + file:close(Fd), + throw({error, {not_a_log_file, FName}}); + Error -> + file_error_close(Fd, FName, Error) + end; + Error -> + file_error(FName, Error) + end. + +new_int_file(FName, Head) -> + case open_update(FName) of + {ok, Fd} -> + ok = truncate_at_close2(Fd, FName, bof), + fwrite_close2(Fd, FName, [?LOGMAGIC, ?OPENED]), + {FdC1, Nh, HeadSz} = int_log_head(Fd, Head), + {FdC, FileSize} = position_close(FdC1, FName, cur), + {ok, {new, FdC, {Nh, ?HEADERSZ + HeadSz}, FileSize}}; + Error -> + file_error(FName, Error) + end. + +%% -> {FdC, NoItemsWritten, NoBytesWritten} | throw(Error) +int_log_head(Fd, Head) -> + case lh(Head, internal) of + {ok, BinHead} -> + {Bs, Size} = logl([BinHead]), + {ok, FdC} = fwrite_header(Fd, Bs, Size), + {FdC, 1, Size}; + none -> + {#cache{fd = Fd}, 0, 0}; + Error -> + file:close(Fd), + throw(Error) + end. + +%% Open an external file. +%% -> {ok, {Alloc, FdC, HeadSize}, FileSize} | throw(Error) +ext_open(FName, truncate, read_write, Head) -> + new_ext_file(FName, Head); +ext_open(FName, _Repair, read_write, Head) -> + case file:read_file_info(FName) of + {ok, _FileInfo} -> + case open_update(FName) of + {ok, Fd} -> + {ok, P} = position_close2(Fd, FName, eof), + FdC = #cache{fd = Fd}, + {ok, {existed, FdC, {0, 0}, P}}; + Error -> + file_error(FName, Error) + end; + _Other -> + new_ext_file(FName, Head) + end; +ext_open(FName, _Repair, read_only, _Head) -> + case open_read(FName) of + {ok, Fd} -> + {ok, P} = position_close2(Fd, FName, eof), + FdC = #cache{fd = Fd}, + {ok, {existed, FdC, {0, 0}, P}}; + Error -> + file_error(FName, Error) + end. + +new_ext_file(FName, Head) -> + case open_truncate(FName) of + {ok, Fd} -> + {FdC1, HeadSize} = ext_log_head(Fd, Head), + {FdC, FileSize} = position_close(FdC1, FName, cur), + {ok, {new, FdC, HeadSize, FileSize}}; + Error -> + file_error(FName, Error) + end. + +%% -> {FdC, {NoItemsWritten, NoBytesWritten}} | throw(Error) +ext_log_head(Fd, Head) -> + case lh(Head, external) of + {ok, BinHead} -> + Size = byte_size(BinHead), + {ok, FdC} = fwrite_header(Fd, BinHead, Size), + {FdC, {1, Size}}; + none -> + {#cache{fd = Fd}, {0, 0}}; + Error -> + file:close(Fd), + throw(Error) + end. + +%% -> _Any | throw() +mark(Fd, FileName, What) -> + position_close2(Fd, FileName, 4), + fwrite_close2(Fd, FileName, What). + +%% -> {ok, Bin} | Error +lh({ok, Bin}, _Format) -> + {ok, Bin}; +lh({M, F, A}, Format) when is_list(A) -> + case catch apply(M, F, A) of + {ok, Head} when Format =:= internal -> + {ok, term_to_binary(Head)}; + {ok, Bin} when is_binary(Bin) -> + {ok, Bin}; + {ok, Bytes} -> + case catch list_to_binary(Bytes) of + {'EXIT', _} -> + {error, {invalid_header, {{M,F,A}, {ok, Bytes}}}}; + Bin -> + {ok, Bin} + end; + {'EXIT', Error} -> + {error, {invalid_header, {{M,F,A}, Error}}}; + Error -> + {error, {invalid_header, {{M,F,A}, Error}}} + end; +lh({M, F, A}, _Format) -> % cannot happen + {error, {invalid_header, {M, F, A}}}; +lh(none, _Format) -> + none; +lh(H, _F) -> % cannot happen + {error, {invalid_header, H}}. + +repair(In, File) -> + FSz = file_size(File), + error_logger:info_msg("disk_log: repairing ~p ...\n", [File]), + Tmp = add_ext(File, "TMP"), + {ok, {_Alloc, Out, {0, _}, _FileSize}} = new_int_file(Tmp, none), + scan_f_read(<<>>, In, Out, File, FSz, Tmp, ?MAX_CHUNK_SIZE, 0, 0). + +scan_f_read(B, In, Out, File, FSz, Tmp, MaxBytes, No, Bad) -> + case file:read(In, MaxBytes) of + eof -> + done_scan(In, Out, Tmp, File, No, Bad+byte_size(B)); + {ok, Bin} -> + NewBin = list_to_binary([B, Bin]), + {NB, NMax, Ack, NNo, NBad} = + scan_f(NewBin, FSz, [], No, Bad), + case log(Out, Tmp, lists:reverse(Ack)) of + {ok, _Size, NewOut} -> + scan_f_read(NB, In, NewOut, File, FSz, Tmp, NMax,NNo,NBad); + {{error, {file_error, _Filename, Error}}, NewOut} -> + repair_err(In, NewOut, Tmp, File, {error, Error}) + end; + Error -> + repair_err(In, Out, Tmp, File, Error) + end. + +scan_f(B = <>, + FSz, Ack, No, Bad) when Size < ?MIN_MD5_TERM -> + scan_f2(B, FSz, Ack, No, Bad, Size, Tail); +scan_f(B = <>, + FSz, Ack, No, Bad) -> % when Size >= ?MIN_MD5_TERM + MD5 = erlang:md5(<>), + case Tail of + <> -> + case catch binary_to_term(BinTerm) of + {'EXIT', _} -> + scan_f(Tail2, FSz, Ack, No, Bad+Size); + _Term -> + scan_f(Tail2, FSz, [BinTerm | Ack], No+1, Bad) + end; + <> -> + {B, Size-byte_size(Tail)+16, Ack, No, Bad}; + _ when byte_size(Tail) < 16 -> + {B, Size-byte_size(Tail)+16, Ack, No, Bad}; + _ -> + <<_:8, B2/binary>> = B, + scan_f(B2, FSz, Ack, No, Bad+1) + end; +scan_f(B = <>, + FSz, Ack, No, Bad) when Size =< FSz -> + %% Since the file is not compressed, the item size cannot exceed + %% the file size. + scan_f2(B, FSz, Ack, No, Bad, Size, Tail); +scan_f(B = <<_:?HEADERSZ/unit:8, _/binary>>, FSz, Ack, No, Bad) -> + <<_:8, B2/binary>> = B, + scan_f(B2, FSz, Ack, No, Bad + 1); +scan_f(B, _FSz, Ack, No, Bad) -> + {B, ?MAX_CHUNK_SIZE, Ack, No, Bad}. + +scan_f2(B, FSz, Ack, No, Bad, Size, Tail) -> + case Tail of + <> -> + case catch binary_to_term(BinTerm) of + {'EXIT', _} -> + <<_:8, B2/binary>> = B, + scan_f(B2, FSz, Ack, No, Bad+1); + _Term -> + scan_f(Tail2, FSz, [BinTerm | Ack], No+1, Bad) + end; + _ -> + {B, Size-byte_size(Tail), Ack, No, Bad} + end. + +done_scan(In, Out, OutName, FName, RecoveredTerms, BadChars) -> + file:close(In), + case catch fclose(Out, OutName) of + ok -> + case file:rename(OutName, FName) of + ok -> + case open_update(FName) of + {ok, New} -> + {ok, P} = position_close2(New, FName, eof), + FdC = #cache{fd = New}, + {repaired, FdC, RecoveredTerms, BadChars, P}; + Error -> + file_error(FName, Error) + end; + Error -> + file:delete(OutName), + file_error(FName, Error) + end; + Error -> + file:delete(OutName), + throw(Error) + end. + +repair_err(In, Out, OutName, ErrFileName, Error) -> + file:close(In), + catch fclose(Out, OutName), + % OutName is often the culprit, try to remove it anyway... + file:delete(OutName), + file_error(ErrFileName, Error). + +%% Used by wrap_log_reader. +-spec is_head(binary()) -> 'yes' | 'yes_not_closed' | 'no'. +is_head(<>) when ?LOGMAGIC =:= M, ?CLOSED =:= S -> + yes; +is_head(<>) when ?LOGMAGIC =:= M, ?OPENED =:= S -> + yes_not_closed; +is_head(Bin) when is_binary(Bin) -> + no. + +%%----------------------------------------------------------------- +%% Func: mf_int_open/7, mf_ext_open/7 +%% Args: FName = file:filename() +%% MaxB = integer() +%% MaxF = integer() +%% Repair = truncate | true | false +%% Mode = read_write | read_only +%% Head = none | {ok, Bin} | {M, F, A} +%% Version = integer() +%% Purpose: An ADT for wrapping logs. mf_int_ writes binaries (mf_ext_ +%% writes bytes) +%% to files called FName.1, FName.2, ..., FName.MaxF. +%% Writes MaxB bytes on each file. +%% Creates a file called Name.idx in the Dir. This +%% file contains the last written FileName as one byte, and +%% follwing that, the sizes of each file (size 0 number of items). +%% On startup, this file is read, and the next available +%% filename is used as first log file. +%% Reports can be browsed with Report Browser Tool (rb), or +%% read with disk_log. +%%----------------------------------------------------------------- +-spec mf_int_open(FName :: file:filename(), + MaxB :: integer(), + MaxF :: integer(), + Repair :: dlog_repair(), + Mode :: dlog_mode(), + Head :: dlog_head(), + Version :: integer()) + -> {'ok', #handle{}, integer()} + | {'repaired', #handle{}, + non_neg_integer(), non_neg_integer(), non_neg_integer()}. +%% | throw(FileError) +mf_int_open(FName, MaxB, MaxF, Repair, Mode, Head, Version) -> + {First, Sz, TotSz, NFiles} = read_index_file(Repair, FName, MaxF), + write_size_file(Mode, FName, MaxB, MaxF, Version), + NewMaxF = if + NFiles > MaxF -> + {MaxF, NFiles}; + true -> + MaxF + end, + case int_file_open(FName, First, 0, 0, Head, Repair, Mode) of + {ok, FdC, FileName, Lost, {NoItems, NoBytes}, FSz} -> + % firstPos = NoBytes is not always correct when the file + % existed, but it will have to do since we don't know + % where the header ends. + CurCnt = Sz + NoItems - Lost, + {ok, #handle{filename = FName, maxB = MaxB, + maxF = NewMaxF, curF = First, cur_fdc = FdC, + cur_name = FileName, cur_cnt = CurCnt, + acc_cnt = -Sz, curB = FSz, + firstPos = NoBytes, noFull = 0, accFull = 0}, + TotSz + CurCnt}; + {repaired, FdC, FileName, Rec, Bad, FSz} -> + {repaired, + #handle{filename = FName, maxB = MaxB, cur_name = FileName, + maxF = NewMaxF, curF = First, cur_fdc = FdC, + cur_cnt = Rec, acc_cnt = -Rec, curB = FSz, + firstPos = 0, noFull = 0, accFull = 0}, + Rec, Bad, TotSz + Rec} + end. + +%% -> {ok, handle(), Lost} | {error, Error, handle()} +mf_int_inc(Handle, Head) -> + #handle{filename = FName, cur_cnt = CurCnt, acc_cnt = AccCnt, + cur_name = FileName, curF = CurF, maxF = MaxF, + cur_fdc = CurFdC, noFull = NoFull} = Handle, + case catch wrap_int_log(FName, CurF, MaxF, CurCnt, Head) of + {NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost} -> + Handle1 = Handle#handle{cur_fdc = NewFdC, curF = NewF, + cur_name = NewFileName, + cur_cnt = Nh, acc_cnt = AccCnt + CurCnt, + maxF = NewMaxF, firstPos = FirstPos, + curB = FirstPos, noFull = NoFull + 1}, + case catch close(CurFdC, FileName, read_write) of + ok -> + {ok, Handle1, Lost}; + Error -> % Error in the last file, new file opened. + {error, Error, Handle1} + end; + Error -> + {error, Error, Handle} + end. + +%% -> {ok, handle(), Logged, Lost, NoWraps} | {ok, handle(), Logged} +%% | {error, Error, handle(), Logged, Lost} +%% The returned handle is not always valid - something may +%% have been written before things went wrong. +mf_int_log(Handle, Bins, Head) -> + mf_int_log(Handle, Bins, Head, 0, []). + +mf_int_log(Handle, [], _Head, No, []) -> + {ok, Handle, No}; +mf_int_log(Handle, [], _Head, No, Wraps0) -> + Wraps = reverse(Wraps0), + {ok, Handle, No, sum(Wraps), Wraps}; +mf_int_log(Handle, Bins, Head, No0, Wraps) -> + #handle{curB = CurB, maxB = MaxB, cur_name = FileName, cur_fdc = CurFdC, + firstPos = FirstPos0, cur_cnt = CurCnt} = Handle, + {FirstBins, LastBins, NoBytes, N} = + int_split_bins(CurB, MaxB, FirstPos0, Bins), + case FirstBins of + [] -> + #handle{filename = FName, curF = CurF, maxF = MaxF, + acc_cnt = AccCnt, noFull = NoFull} = Handle, + case catch wrap_int_log(FName, CurF, MaxF, CurCnt, Head) of + {NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost} -> + Handle1 = Handle#handle{cur_fdc = NewFdC, curF = NewF, + cur_cnt = Nh, + cur_name = NewFileName, + acc_cnt = AccCnt + CurCnt, + maxF = NewMaxF, + curB = FirstPos, + firstPos = FirstPos, + noFull = NoFull + 1}, + case catch close(CurFdC, FileName, read_write) of + ok -> + mf_int_log(Handle1, Bins, Head, No0 + Nh, + [Lost | Wraps]); + Error -> + Lost1 = Lost + sum(Wraps), + {error, Error, Handle1, No0 + Nh, Lost1} + end; + Error -> + {error, Error, Handle, No0, sum(Wraps)} + end; + _ -> + case fwrite(CurFdC, FileName, FirstBins, NoBytes) of + {ok, NewCurFdC} -> + Handle1 = Handle#handle{cur_fdc = NewCurFdC, + curB = CurB + NoBytes, + cur_cnt = CurCnt + N}, + mf_int_log(Handle1, LastBins, Head, No0 + N, Wraps); + {Error, NewCurFdC} -> + Handle1 = Handle#handle{cur_fdc = NewCurFdC}, + {error, Error, Handle1, No0, sum(Wraps)} + end + end. + +wrap_int_log(FName, CurF, MaxF, CurCnt, Head) -> + {NewF, NewMaxF} = inc_wrap(FName, CurF, MaxF), + {ok, NewFdC, NewFileName, Lost, {Nh, FirstPos}, _FileSize} = + int_file_open(FName, NewF, CurF, CurCnt, Head), + {NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost}. + +%% -> {NewHandle, Reply}, Reply = {Cont, Binaries} | {error, Reason} | eof +mf_int_chunk(Handle, 0, Bin, N) -> + FirstF = find_first_file(Handle), + mf_int_chunk(Handle, {FirstF, 0}, Bin, N); +mf_int_chunk(#handle{curF = FileNo, cur_fdc = FdC, cur_name = FileName} + = Handle, {FileNo, Pos}, Bin, N) -> + {NewFdC, Reply} = chunk(FdC, FileName, Pos, Bin, N), + {Handle#handle{cur_fdc = NewFdC}, conv(Reply, FileNo)}; +mf_int_chunk(Handle, {FileNo, Pos}, Bin, N) -> + FName = add_ext(Handle#handle.filename, FileNo), + NFileNo = inc(FileNo, Handle#handle.maxF), + case catch int_open(FName, true, read_only, any) of + {error, _Reason} -> + error_logger:info_msg("disk_log: chunk error. File ~p missing.\n\n", + [FName]), + mf_int_chunk(Handle, {NFileNo, 0}, [], N); + {ok, {_Alloc, FdC, _HeadSize, _FileSize}} -> + case chunk(FdC, FName, Pos, Bin, N) of + {NewFdC, eof} -> + file:close(NewFdC#cache.fd), + mf_int_chunk(Handle, {NFileNo, 0}, [], N); + {NewFdC, Other} -> + file:close(NewFdC#cache.fd), + {Handle, conv(Other, FileNo)} + end + end. + +%% -> {NewHandle, Reply}, +%% Reply = {Cont, Binaries, Bad} (Bad >= 0) | {error, Reason} | eof +mf_int_chunk_read_only(Handle, 0, Bin, N) -> + FirstF = find_first_file(Handle), + mf_int_chunk_read_only(Handle, {FirstF, 0}, Bin, N); +mf_int_chunk_read_only(#handle{curF = FileNo, cur_fdc = FdC, cur_name=FileName} + = Handle, {FileNo, Pos}, Bin, N) -> + {NewFdC, Reply} = do_chunk_read_only(FdC, FileName, Pos, Bin, N), + {Handle#handle{cur_fdc = NewFdC}, conv(Reply, FileNo)}; +mf_int_chunk_read_only(Handle, {FileNo, Pos}, Bin, N) -> + FName = add_ext(Handle#handle.filename, FileNo), + NFileNo = inc(FileNo, Handle#handle.maxF), + case catch int_open(FName, true, read_only, any) of + {error, _Reason} -> + error_logger:info_msg("disk_log: chunk error. File ~p missing.\n\n", + [FName]), + mf_int_chunk_read_only(Handle, {NFileNo, 0}, [], N); + {ok, {_Alloc, FdC, _HeadSize, _FileSize}} -> + case do_chunk_read_only(FdC, FName, Pos, Bin, N) of + {NewFdC, eof} -> + file:close(NewFdC#cache.fd), + mf_int_chunk_read_only(Handle, {NFileNo,0}, [], N); + {NewFdC, Other} -> + file:close(NewFdC#cache.fd), + {Handle, conv(Other, FileNo)} + end + end. + +%% -> {ok, Cont} | Error +mf_int_chunk_step(Handle, 0, Step) -> + FirstF = find_first_file(Handle), + mf_int_chunk_step(Handle, {FirstF, 0}, Step); +mf_int_chunk_step(Handle, {FileNo, _Pos}, Step) -> + NFileNo = inc(FileNo, Handle#handle.maxF, Step), + FileName = add_ext(Handle#handle.filename, NFileNo), + case file:read_file_info(FileName) of + {ok, _FileInfo} -> + {ok, #continuation{pos = {NFileNo, 0}, b = []}}; + _Error -> + {error, end_of_log} + end. + +%% -> {Reply, handle()}; Reply = ok | Error +mf_write_cache(#handle{filename = FName, cur_fdc = FdC} = Handle) -> + erase(write_cache_timer_is_running), + #cache{fd = Fd, c = C} = FdC, + {Reply, NewFdC} = write_cache(Fd, FName, C), + {Reply, Handle#handle{cur_fdc = NewFdC}}. + +%% -> {Reply, handle()}; Reply = ok | Error +mf_sync(#handle{filename = FName, cur_fdc = FdC} = Handle) -> + {Reply, NewFdC} = fsync(FdC, FName), + {Reply, Handle#handle{cur_fdc = NewFdC}}. + +%% -> ok | throw(FileError) +mf_int_close(#handle{filename = FName, curF = CurF, cur_name = FileName, + cur_fdc = CurFdC, cur_cnt = CurCnt}, Mode) -> + close(CurFdC, FileName, Mode), + write_index_file(Mode, FName, CurF, CurF, CurCnt), + ok. + +%% -> {ok, handle(), Cnt} | throw(FileError) +mf_ext_open(FName, MaxB, MaxF, Repair, Mode, Head, Version) -> + {First, Sz, TotSz, NFiles} = read_index_file(Repair, FName, MaxF), + write_size_file(Mode, FName, MaxB, MaxF, Version), + NewMaxF = if + NFiles > MaxF -> + {MaxF, NFiles}; + true -> + MaxF + end, + {ok, FdC, FileName, Lost, {NoItems, NoBytes}, CurB} = + ext_file_open(FName, First, 0, 0, Head, Repair, Mode), + CurCnt = Sz + NoItems - Lost, + {ok, #handle{filename = FName, maxB = MaxB, cur_name = FileName, + maxF = NewMaxF, cur_cnt = CurCnt, acc_cnt = -Sz, + curF = First, cur_fdc = FdC, firstPos = NoBytes, + curB = CurB, noFull = 0, accFull = 0}, + TotSz + CurCnt}. + +%% -> {ok, handle(), Lost} +%% | {error, Error, handle()} +%% | throw(FatalError) +%% Fatal errors should always terminate the log. +mf_ext_inc(Handle, Head) -> + #handle{filename = FName, cur_cnt = CurCnt, cur_name = FileName, + acc_cnt = AccCnt, curF = CurF, maxF = MaxF, cur_fdc = CurFdC, + noFull = NoFull} = Handle, + case catch wrap_ext_log(FName, CurF, MaxF, CurCnt, Head) of + {NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost} -> + Handle1 = Handle#handle{cur_fdc = NewFdC, curF = NewF, + cur_name = NewFileName, + cur_cnt = Nh, acc_cnt = AccCnt + CurCnt, + maxF = NewMaxF, firstPos = FirstPos, + curB = FirstPos, noFull = NoFull + 1}, + case catch fclose(CurFdC, FileName) of + ok -> + {ok, Handle1, Lost}; + Error -> % Error in the last file, new file opened. + {error, Error, Handle1} + end; + Error -> + {error, Error, Handle} + end. + +%% -> {ok, handle(), Logged, Lost, NoWraps} | {ok, handle(), Logged} +%% | {error, Error, handle(), Logged, Lost} + +%% The returned handle is not always valid - +%% something may have been written before things went wrong. +mf_ext_log(Handle, Bins, Head) -> + mf_ext_log(Handle, Bins, Head, 0, []). + +mf_ext_log(Handle, [], _Head, No, []) -> + {ok, Handle, No}; +mf_ext_log(Handle, [], _Head, No, Wraps0) -> + Wraps = reverse(Wraps0), + {ok, Handle, No, sum(Wraps), Wraps}; +mf_ext_log(Handle, Bins, Head, No0, Wraps) -> + #handle{curB = CurB, maxB = MaxB, cur_name = FileName, cur_fdc = CurFdC, + firstPos = FirstPos0, cur_cnt = CurCnt} = Handle, + {FirstBins, LastBins, NoBytes, N} = + ext_split_bins(CurB, MaxB, FirstPos0, Bins), + case FirstBins of + [] -> + #handle{filename = FName, curF = CurF, maxF = MaxF, + acc_cnt = AccCnt, noFull = NoFull} = Handle, + case catch wrap_ext_log(FName, CurF, MaxF, CurCnt, Head) of + {NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost} -> + Handle1 = Handle#handle{cur_fdc = NewFdC, curF = NewF, + cur_cnt = Nh, + cur_name = NewFileName, + acc_cnt = AccCnt + CurCnt, + maxF = NewMaxF, + curB = FirstPos, + firstPos = FirstPos, + noFull = NoFull + 1}, + case catch fclose(CurFdC, FileName) of + ok -> + mf_ext_log(Handle1, Bins, Head, No0 + Nh, + [Lost | Wraps]); + Error -> + Lost1 = Lost + sum(Wraps), + {error, Error, Handle1, No0 + Nh, Lost1} + end; + Error -> + {error, Error, Handle, No0, sum(Wraps)} + end; + _ -> + case fwrite(CurFdC, FileName, FirstBins, NoBytes) of + {ok, NewCurFdC} -> + Handle1 = Handle#handle{cur_fdc = NewCurFdC, + curB = CurB + NoBytes, + cur_cnt = CurCnt + N}, + mf_ext_log(Handle1, LastBins, Head, No0 + N, Wraps); + {Error, NewCurFdC} -> + Handle1 = Handle#handle{cur_fdc = NewCurFdC}, + {error, Error, Handle1, No0, sum(Wraps)} + end + end. + +wrap_ext_log(FName, CurF, MaxF, CurCnt, Head) -> + {NewF, NewMaxF} = inc_wrap(FName, CurF, MaxF), + {ok, NewFdC, NewFileName, Lost, {Nh, FirstPos}, _FileSize} = + ext_file_open(FName, NewF, CurF, CurCnt, Head), + {NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost}. + +%% -> ok | throw(FileError) +mf_ext_close(#handle{filename = FName, curF = CurF, + cur_fdc = CurFdC, cur_cnt = CurCnt}, Mode) -> + Res = (catch fclose(CurFdC, FName)), + write_index_file(Mode, FName, CurF, CurF, CurCnt), + Res. + +%% -> {ok, handle()} | throw(FileError) +change_size_wrap(Handle, {NewMaxB, NewMaxF}, Version) -> + FName = Handle#handle.filename, + {_MaxB, MaxF} = get_wrap_size(Handle), + write_size_file(read_write, FName, NewMaxB, NewMaxF, Version), + if + NewMaxF > MaxF -> + remove_files(FName, MaxF + 1, NewMaxF), + {ok, Handle#handle{maxB = NewMaxB, maxF = NewMaxF}}; + NewMaxF < MaxF -> + {ok, Handle#handle{maxB = NewMaxB, maxF = {NewMaxF, MaxF}}}; + true -> + {ok, Handle#handle{maxB = NewMaxB, maxF = NewMaxF}} + end. + +%%----------------------------------------------------------------- +%% Misc functions +%%----------------------------------------------------------------- +%% -> {ok, FdC, FileName, Lost, HeadSize, FileSize} | throw(Error) +int_file_open(FName, NewFile, OldFile, OldCnt, Head) -> + Repair = truncate, Mode = read_write, + int_file_open(FName, NewFile, OldFile, OldCnt, Head, Repair, Mode). + +%% -> {ok, FdC, FileName, Lost, HeadSize, FileSize} +%% | {repaired, FdC, FileName, Rec, Bad, FileSize} +%% | throw(Error) +int_file_open(FName, NewFile, OldFile, OldCnt, Head, Repair, Mode) -> + N = add_ext(FName, NewFile), + case int_open(N, Repair, Mode, Head) of + {ok, {_Alloc, FdC, HeadSize, FileSize}} -> + Lost = write_index_file(Mode, FName, NewFile, OldFile, OldCnt), + {ok, FdC, N, Lost, HeadSize, FileSize}; + {repaired, FdC, Recovered, BadBytes, FileSize} -> + write_index_file(Mode, FName, NewFile, OldFile, OldCnt), + {repaired, FdC, N, Recovered, BadBytes, FileSize} + end. + +%% -> {ok, FdC, FileName, Lost, HeadSize, FileSize} | throw(Error) +ext_file_open(FName, NewFile, OldFile, OldCnt, Head) -> + Repair = truncate, Mode = read_write, + ext_file_open(FName, NewFile, OldFile, OldCnt, Head, Repair, Mode). + +ext_file_open(FName, NewFile, OldFile, OldCnt, Head, Repair, Mode) -> + FileName = add_ext(FName, NewFile), + {ok, {_Alloc, FdC, HeadSize, FileSize}} = + ext_open(FileName, Repair, Mode, Head), + Lost = write_index_file(Mode, FName, NewFile, OldFile, OldCnt), + {ok, FdC, FileName, Lost, HeadSize, FileSize}. + +%%----------------------------------------------------------------- +%% The old file format for index file (CurFileNo > 0), Version 0: +%% +%% CurFileNo SizeFile1 SizeFile2 ... SizeFileN +%% 1 byte 4 bytes 4 bytes 4 bytes +%% +%% The new file format for index file (NewFormat = 0), version 1: +%% +%% NewFormat CurFileNo SizeFile1 SizeFile2 ... SizeFileN +%% 1 byte 4 bytes 4 bytes 4 bytes +%% +%% The current file format for index file (sizes in bytes), version 2: +%% +%% 0 (1) 0 (4) FileFormatVersion (1) CurFileNo (4) SizeFile1 (8) ... +%% +%% (SizeFileI refers to number of items on the log file.) +%%----------------------------------------------------------------- + +-define(index_file_name(F), add_ext(F, "idx")). + +read_index_file(truncate, FName, MaxF) -> + remove_files(FName, 2, MaxF), + file:delete(?index_file_name(FName)), + {1, 0, 0, 0}; +read_index_file(_, FName, _MaxF) -> + read_index_file(FName). + +%% Used by wrap_log_reader. +%% -> {CurFileNo, CurFileSz, TotSz, NoFiles} | throw(FileError) +%% where TotSz does not include CurFileSz. + +read_index_file(FName) -> + FileName = ?index_file_name(FName), + case open_read(FileName) of + {ok, Fd} -> + R = case file:read(Fd, ?MAX_CHUNK_SIZE) of + {ok, <<0, 0:32, Version, CurF:32, Tail/binary>>} + when Version =:= ?VERSION, + 0 < CurF, CurF < ?MAX_FILES -> + parse_index(CurF, Version, 1, Tail, Fd, 0, 0, 0); + {ok, <<0, CurF:32, Tail/binary>>} + when 0 < CurF, CurF < ?MAX_FILES -> + parse_index(CurF, 1, 1, Tail, Fd, 0, 0, 0); + {ok, <>} when 0 < CurF -> + parse_index(CurF, 1, 1, Tail, Fd, 0, 0, 0); + _ErrorOrEof -> + {1, 0, 0, 0} + end, + file:close(Fd), + R; + _Error -> + {1, 0, 0, 0} + end. + +parse_index(CurF, V, CurF, <>, Fd, _, TotSz, NFiles) + when V =:= ?VERSION -> + parse_index(CurF, V, CurF+1, Tail, Fd, CurSz, TotSz, NFiles+1); +parse_index(CurF, V, N, <>, Fd, CurSz, TotSz, NFiles) + when V =:= ?VERSION -> + parse_index(CurF, V, N+1, Tail, Fd, CurSz, TotSz + Sz, NFiles+1); +parse_index(CurF, V, CurF, <>, Fd, _, TotSz, NFiles) + when V < ?VERSION -> + parse_index(CurF, V, CurF+1, Tail, Fd, CurSz, TotSz, NFiles+1); +parse_index(CurF, V, N, <>, Fd, CurSz, TotSz, NFiles) + when V < ?VERSION -> + parse_index(CurF, V, N+1, Tail, Fd, CurSz, TotSz + Sz, NFiles+1); +parse_index(CurF, V, N, B, Fd, CurSz, TotSz, NFiles) -> + case file:read(Fd, ?MAX_CHUNK_SIZE) of + eof when 0 =:= byte_size(B) -> + {CurF, CurSz, TotSz, NFiles}; + {ok, Bin} -> + NewB = list_to_binary([B, Bin]), + parse_index(CurF, V, N, NewB, Fd, CurSz, TotSz, NFiles); + _ErrorOrEof -> + {1, 0, 0, 0} + end. + +%% Returns: Number of lost items (if an old file was truncated) +%% -> integer() | throw(FileError) +write_index_file(read_only, _FName, _NewFile, _OldFile, _OldCnt) -> + 0; +write_index_file(read_write, FName, NewFile, OldFile, OldCnt) -> + FileName = ?index_file_name(FName), + case open_update(FileName) of + {ok, Fd} -> + {Offset, SzSz} = + case file:read(Fd, 6) of + eof -> + Bin = <<0, 0:32, ?VERSION, NewFile:32>>, + fwrite_close2(Fd, FileName, Bin), + {10, 8}; + {ok, <<0, 0:32, _Version>>} -> + pwrite_close2(Fd, FileName, 6, <>), + {10, 8}; + {ok, <<0, _/binary>>} -> + pwrite_close2(Fd, FileName, 1, <>), + {5, 4}; + {ok, <<_,_/binary>>} -> + %% Very old format, convert to the latest format! + case file:read_file(FileName) of + {ok, <<_CurF, Tail/binary>>} -> + position_close2(Fd, FileName, bof), + Bin = <<0, 0:32, ?VERSION, NewFile:32>>, + NewTail = to_8_bytes(Tail, [], FileName, Fd), + fwrite_close2(Fd, FileName, [Bin | NewTail]), + {10, 8}; + Error -> + file_error_close(Fd, FileName, Error) + end; + Error -> + file_error_close(Fd, FileName, Error) + end, + + NewPos = Offset + (NewFile - 1)*SzSz, + OldCntBin = <>, + if + OldFile > 0 -> + R = file:pread(Fd, NewPos, SzSz), + OldPos = Offset + (OldFile - 1)*SzSz, + pwrite_close2(Fd, FileName, OldPos, OldCntBin), + file:close(Fd), + case R of + {ok, <>} -> Lost; + {ok, _} -> + throw({error, {invalid_index_file, FileName}}); + eof -> 0; + Error2 -> file_error(FileName, Error2) + end; + true -> + pwrite_close2(Fd, FileName, NewPos, OldCntBin), + file:close(Fd), + 0 + end; + E -> + file_error(FileName, E) + end. + +to_8_bytes(<>, NT, FileName, Fd) -> + to_8_bytes(T, [NT | <>], FileName, Fd); +to_8_bytes(B, NT, _FileName, _Fd) when byte_size(B) =:= 0 -> + NT; +to_8_bytes(_B, _NT, FileName, Fd) -> + file:close(Fd), + throw({error, {invalid_index_file, FileName}}). + +%% -> ok | throw(FileError) +index_file_trunc(FName, N) -> + FileName = ?index_file_name(FName), + case open_update(FileName) of + {ok, Fd} -> + case file:read(Fd, 6) of + eof -> + file:close(Fd), + ok; + {ok, <<0, 0:32, Version>>} when Version =:= ?VERSION -> + truncate_index_file(Fd, FileName, 10, 8, N); + {ok, <<0, _/binary>>} -> + truncate_index_file(Fd, FileName, 5, 4, N); + {ok, <<_, _/binary>>} -> % cannot happen + truncate_index_file(Fd, FileName, 1, 4, N); + Error -> + file_error_close(Fd, FileName, Error) + end; + Error -> + file_error(FileName, Error) + end. + +truncate_index_file(Fd, FileName, Offset, N, SzSz) -> + Pos = Offset + N*SzSz, + case Pos > file_size(FileName) of + true -> + file:close(Fd); + false -> + truncate_at_close2(Fd, FileName, {bof, Pos}), + file:close(Fd) + end, + ok. + +print_index_file(File) -> + io:format("-- Index begin --~n"), + case file:read_file(File) of + {ok, <<0, 0:32, Version, CurF:32, Tail/binary>>} + when Version =:= ?VERSION, 0 < CurF, CurF < ?MAX_FILES -> + io:format("cur file: ~w~n", [CurF]), + loop_index(1, Version, Tail); + {ok, <<0, CurF:32, Tail/binary>>} when 0 < CurF, CurF < ?MAX_FILES -> + io:format("cur file: ~w~n", [CurF]), + loop_index(1, 1, Tail); + {ok, <>} when 0 < CurF -> + io:format("cur file: ~w~n", [CurF]), + loop_index(1, 1, Tail); + _Else -> + ok + end, + io:format("-- end --~n"). + +loop_index(N, V, <>) when V =:= ?VERSION -> + io:format(" ~p items: ~w~n", [N, Sz]), + loop_index(N+1, V, Tail); +loop_index(N, V, <>) when V < ?VERSION -> + io:format(" ~p items: ~w~n", [N, Sz]), + loop_index(N+1, V, Tail); +loop_index(_, _, _) -> + ok. + +-define(size_file_name(F), add_ext(F, "siz")). + +%% Version 0: no size file +%% Version 1: <> +%% Version 2: <> + +%% -> ok | throw(FileError) +write_size_file(read_only, _FName, _NewSize, _NewMaxFiles, _Version) -> + ok; +write_size_file(read_write, FName, NewSize, NewMaxFiles, Version) -> + FileName = ?size_file_name(FName), + Bin = if + Version =:= ?VERSION -> + <>; + true -> + <> + end, + case file:write_file(FileName, Bin) of + ok -> + ok; + E -> + file_error(FileName, E) + end. + +%% -> {NoBytes, NoFiles}. +read_size_file(FName) -> + {Size,_Version} = read_size_file_version(FName), + Size. + +%% -> {{NoBytes, NoFiles}, Version}, Version = integer() | undefined +read_size_file_version(FName) -> + case file:read_file(?size_file_name(FName)) of + {ok, <>} when Version =:= ?VERSION -> + {{Size, MaxFiles}, Version}; + {ok, <>} -> + {{Size, MaxFiles}, 1}; + _ -> + %% The oldest version too... + {{0, 0}, ?VERSION} + end. + +conv({More, Terms}, FileNo) when is_record(More, continuation) -> + Cont = More#continuation{pos = {FileNo, More#continuation.pos}}, + {Cont, Terms}; +conv({More, Terms, Bad}, FileNo) when is_record(More, continuation) -> + Cont = More#continuation{pos = {FileNo, More#continuation.pos}}, + {Cont, Terms, Bad}; +conv(Other, _) -> + Other. + +find_first_file(#handle{filename = FName, curF = CurF, maxF = MaxF}) -> + fff(FName, inc(CurF, MaxF), CurF, MaxF). + +fff(_FName, CurF, CurF, _MaxF) -> CurF; +fff(FName, MaybeFirstF, CurF, MaxF) -> + N = add_ext(FName, MaybeFirstF), + case file:read_file_info(N) of + {ok, _} -> MaybeFirstF; + _ -> fff(FName, inc(MaybeFirstF, MaxF), CurF, MaxF) + end. + +%% -> {iolist(), LastBins, NoBytes, NoTerms} +ext_split_bins(CurB, MaxB, FirstPos, Bins) -> + MaxBs = MaxB - CurB, IsFirst = CurB =:= FirstPos, + ext_split_bins(MaxBs, IsFirst, [], Bins, 0, 0). + +ext_split_bins(MaxBs, IsFirst, First, [X | Last], Bs, N) -> + NBs = Bs + byte_size(X), + if + NBs =< MaxBs -> + ext_split_bins(MaxBs, IsFirst, [First | X], Last, NBs, N+1); + IsFirst, First =:= [] -> + % To avoid infinite loop - we allow the file to be + % too big if it's just one item on the file. + {[X], Last, NBs, N+1}; + true -> + {First, [X | Last], Bs, N} + end; +ext_split_bins(_, _, First, [], Bs, N) -> + {First, [], Bs, N}. + +%% -> {iolist(), LastBins, NoBytes, NoTerms} +int_split_bins(CurB, MaxB, FirstPos, Bins) -> + MaxBs = MaxB - CurB, IsFirst = CurB =:= FirstPos, + int_split_bins(MaxBs, IsFirst, [], Bins, 0, 0). + +int_split_bins(MaxBs, IsFirst, First, [X | Last], Bs, N) -> + Sz = byte_size(X), + NBs = Bs + Sz + ?HEADERSZ, + BSz = <>, + XB = case Sz < ?MIN_MD5_TERM of + true -> + [BSz, ?BIGMAGICHEAD | X]; + false -> + MD5 = erlang:md5(BSz), + [BSz, ?BIGMAGICHEAD, MD5 | X] + end, + if + NBs =< MaxBs -> + int_split_bins(MaxBs, IsFirst, [First | XB], Last, NBs, N+1); + IsFirst, First =:= [] -> + % To avoid infinite loop - we allow the file to be + % too big if it's just one item on the file. + {[XB], Last, NBs, N+1}; + true -> + {First, [X | Last], Bs, N} + end; +int_split_bins(_, _, First, [], Bs, N) -> + {First, [], Bs, N}. + +%% -> {NewCurrentFileNo, MaxFilesToBe} | throw(FileError) +inc_wrap(FName, CurF, MaxF) -> + case MaxF of + %% Number of max files has changed + {NewMaxF, OldMaxF} -> + if + CurF >= NewMaxF -> + %% We are at or above the new number of files + remove_files(FName, CurF + 1, OldMaxF), + if + CurF > NewMaxF -> + %% The change was done while the current file was + %% greater than the new number of files. + %% The index file is not trunctated here, since + %% writing the index file while opening the file + %% with index 1 will write the value for the file + %% with extension CurF as well. Next time the + %% limit is reached, the index file will be + %% truncated. + {1, {NewMaxF, CurF}}; + true -> + %% The change was done while the current file was + %% less than the new number of files. + %% Remove the files from the index file too + index_file_trunc(FName, NewMaxF), + {1, NewMaxF} + end; + true -> + %% We haven't reached the new limit yet + NewFt = inc(CurF, NewMaxF), + {NewFt, MaxF} + end; + MaxF -> + %% Normal case. + NewFt = inc(CurF, MaxF), + {NewFt, MaxF} + end. + +inc(N, {_NewMax, OldMax}) -> inc(N, OldMax, 1); +inc(N, Max) -> inc(N, Max, 1). + +inc(N, Max, Step) -> + Nx = (N + Step) rem Max, + if + Nx > 0 -> Nx; + true -> Nx + Max + end. + + +file_size(Fname) -> + {ok, Fi} = file:read_file_info(Fname), + Fi#file_info.size. + +%% -> ok | throw(FileError) +%% Tries to remove each file with name FName.I, N<=I<=Max. +remove_files(FName, N, Max) -> + remove_files(FName, N, Max, ok). + +remove_files(_FName, N, Max, ok) when N > Max -> + ok; +remove_files(_FName, N, Max, {FileName, Error}) when N > Max -> + file_error(FileName, Error); +remove_files(FName, N, Max, Reply) -> + FileName = add_ext(FName, N), + NewReply = case file:delete(FileName) of + ok -> Reply; + {error, enoent} -> Reply; + Error -> {FileName, Error} + end, + remove_files(FName, N + 1, Max, NewReply). + +%% -> {MaxBytes, MaxFiles} +get_wrap_size(#handle{maxB = MaxB, maxF = MaxF}) -> + case MaxF of + {NewMaxF,_} -> {MaxB, NewMaxF}; + MaxF -> {MaxB, MaxF} + end. + +add_ext(Name, Ext) -> + concat([Name, ".", Ext]). + +open_read(FileName) -> + file:open(FileName, [raw, binary, read]). + +open_update(FileName) -> + file:open(FileName, [raw, binary, read, write]). + +open_truncate(FileName) -> + file:open(FileName, [raw, binary, write]). + +%%% Functions that access files, and throw on error. + +-define(MAX, 16384). % bytes +-define(TIMEOUT, 2000). % ms + +%% -> {Reply, cache()}; Reply = ok | Error +fwrite(#cache{c = []} = FdC, _FN, B, Size) -> + case get(write_cache_timer_is_running) of + true -> + ok; + _ -> + put(write_cache_timer_is_running, true), + erlang:send_after(?TIMEOUT, self(), {self(), write_cache}) + end, + {ok, FdC#cache{sz = Size, c = B}}; +fwrite(#cache{sz = Sz, c = C} = FdC, _FN, B, Size) when Sz < ?MAX -> + {ok, FdC#cache{sz = Sz+Size, c = [C | B]}}; +fwrite(#cache{fd = Fd, c = C}, FileName, B, _Size) -> + write_cache(Fd, FileName, [C | B]). + +fwrite_header(Fd, B, Size) -> + {ok, #cache{fd = Fd, sz = Size, c = B}}. + +%% -> {NewFdC, Reply}; Reply = ok | Error +pread(#cache{fd = Fd, c = C}, FileName, Position, MaxBytes) -> + Reply = write_cache(Fd, FileName, C), + case Reply of + {ok, NewFdC} -> + case file:pread(Fd, Position, MaxBytes) of + {error, Error} -> + {NewFdC, catch file_error(FileName, {error, Error})}; + R -> + {NewFdC, R} + end; + {Error, NewFdC} -> + {NewFdC, Error} + end. + +%% -> {ok, cache(), Pos} | {Error, cache()} +position(#cache{fd = Fd, c = C}, FileName, Pos) -> + Reply = write_cache(Fd, FileName, C), + case Reply of + {ok, NewFdC} -> + case position2(Fd, FileName, Pos) of + {ok, Loc} -> + {ok, NewFdC, Loc}; + Error -> + {Error, NewFdC} + end; + _Error -> + Reply + end. + +position_close(#cache{fd = Fd, c = C}, FileName, Pos) -> + NewFdC = write_cache_close(Fd, FileName, C), + {ok, Loc} = position_close2(Fd, FileName, Pos), + {NewFdC, Loc}. + +fsync(#cache{fd = Fd, c = C}, FileName) -> + Reply = write_cache(Fd, FileName, C), + case Reply of + {ok, NewFdC} -> + case file:sync(Fd) of + ok -> + Reply; + Error -> + {catch file_error(FileName, Error), NewFdC} + end; + _Error -> + Reply + end. + +%% -> {Reply, NewFdC}; Reply = ok | Error +truncate_at(FdC, FileName, Pos) -> + case position(FdC, FileName, Pos) of + {ok, NewFdC, _Pos} -> + case file:truncate(NewFdC#cache.fd) of + ok -> + {ok, NewFdC}; + Error -> + {catch file_error(FileName, Error), NewFdC} + end; + Reply -> + Reply + end. + +fwrite_close2(Fd, FileName, B) -> + case file:write(Fd, B) of + ok -> ok; + Error -> file_error_close(Fd, FileName, Error) + end. + +pwrite_close2(Fd, FileName, Position, B) -> + case file:pwrite(Fd, Position, B) of + ok -> ok; + Error -> file_error(FileName, {error, Error}) + end. + +position2(Fd, FileName, Pos) -> + case file:position(Fd, Pos) of + {error, Error} -> catch file_error(FileName, {error, Error}); + OK -> OK + end. + +position_close2(Fd, FileName, Pos) -> + case file:position(Fd, Pos) of + {error, Error} -> file_error_close(Fd, FileName, {error, Error}); + OK -> OK + end. + +truncate_at_close2(Fd, FileName, Pos) -> + position_close2(Fd, FileName, Pos), + case file:truncate(Fd) of + ok -> ok; + Error -> file_error_close(Fd, FileName, Error) + end. + +fclose(#cache{fd = Fd, c = C}, FileName) -> + %% The cache is empty if the file was opened in read_only mode. + write_cache_close(Fd, FileName, C), + file:close(Fd). + +%% -> {Reply, #cache{}}; Reply = ok | Error +write_cache(Fd, _FileName, []) -> + {ok, #cache{fd = Fd}}; +write_cache(Fd, FileName, C) -> + case file:write(Fd, C) of + ok -> {ok, #cache{fd = Fd}}; + Error -> {catch file_error(FileName, Error), #cache{fd = Fd}} + end. + +-spec write_cache_close(fd(), file:filename(), iodata()) -> #cache{}. % | throw(Error) + +write_cache_close(Fd, _FileName, []) -> + #cache{fd = Fd}; +write_cache_close(Fd, FileName, C) -> + case file:write(Fd, C) of + ok -> #cache{fd = Fd}; + Error -> file_error_close(Fd, FileName, Error) + end. + +-spec file_error(file:filename(), {'error', atom()}) -> no_return(). + +file_error(FileName, {error, Error}) -> + throw({error, {file_error, FileName, Error}}). + +-spec file_error_close(fd(), file:filename(), {'error', atom()}) -> no_return(). + +file_error_close(Fd, FileName, {error, Error}) -> + file:close(Fd), + throw({error, {file_error, FileName, Error}}). -- cgit v1.2.3