%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2011. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(disk_log_1).
%% Efficient file based log - implementation part
-export([int_open/4, ext_open/4, logl/1, close/3, truncate/3, chunk/5,
sync/2, write_cache/2]).
-export([mf_int_open/7, mf_int_log/3, mf_int_close/2, mf_int_inc/2,
mf_ext_inc/2, mf_int_chunk/4, mf_int_chunk_step/3,
mf_sync/1, mf_write_cache/1]).
-export([mf_ext_open/7, mf_ext_log/3, mf_ext_close/2]).
-export([print_index_file/1]).
-export([read_index_file/1]).
-export([read_size_file/1, read_size_file_version/1]).
-export([chunk_read_only/5]).
-export([mf_int_chunk_read_only/4]).
-export([change_size_wrap/3]).
-export([get_wrap_size/1]).
-export([is_head/1]).
-export([position/3, truncate_at/3, fwrite/4, fclose/2]).
-compile({inline,[{scan_f2,7}]}).
-import(lists, [concat/1, reverse/1, sum/1]).
-include("disk_log.hrl").
%%% At the head of a LOG file we have [?LOGMAGIC, ?OPENED | ?CLOSED].
%%% Otherwise it's not a LOG file. Following that, the head, come the
%%% logged items.
%%%
%%% There are four formats of wrap log files (so far). Only the size
%%% file and the index file differ between versions between the first
%%% three version. The fourth version 2(a), has some protection
%%% against damaged item sizes.
%%% Version 0: no "siz" file
%%% Version 1: "siz" file, 4 byte sizes
%%% Version 2: 8 byte sizes (support for large files)
%%% Version 2(a): Change of the format of logged items:
%%% if the size of a term binary is greater than or equal to
%%% ?MIN_MD5_TERM, a logged item looks like
%%% <<Size:32, ?BIGMAGICHEAD:32, MD5:128, Term/binary>>,
%%% otherwise <<Size:32, ?BIGMAGICHEAD:32, Term/binary>>.
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
%% -> {ok, NoBytes, NewFdC} | {Error, NewFdC}
log(FdC, FileName, X) ->
{Bs, Size} = logl(X, [], 0),
case fwrite(FdC, FileName, Bs, Size) of
{ok, NewFdC} ->
{ok, Size, NewFdC};
Error ->
Error
end.
-spec logl([binary()]) -> {iolist(), non_neg_integer()}.
logl(X) ->
logl(X, [], 0).
logl([X | T], Bs, Size) ->
Sz = byte_size(X),
BSz = <<Sz:?SIZESZ/unit:8>>,
NBs = case Sz < ?MIN_MD5_TERM of
true ->
[Bs, BSz, ?BIGMAGICHEAD | X];
false ->
MD5 = erlang:md5(BSz),
[Bs, BSz, ?BIGMAGICHEAD, MD5 | X]
end,
logl(T, NBs, Size + ?HEADERSZ + Sz);
logl([], Bs, Size) ->
{Bs, Size}.
%% -> {ok, NewFdC} | {Error, NewFdC}
write_cache(#cache{fd = Fd, c = C}, FName) ->
erase(write_cache_timer_is_running),
write_cache(Fd, FName, C).
%% -> {Reply, NewFdC}; Reply = ok | Error
sync(FdC, FName) ->
fsync(FdC, FName).
%% -> {Reply, NewFdC}; Reply = ok | Error
truncate(FdC, FileName, Head) ->
Reply = truncate_at(FdC, FileName, ?HEADSZ),
case Reply of
{ok, _} when Head =:= none ->
Reply;
{ok, FdC1} ->
{ok, B} = Head,
case log(FdC1, FileName, [B]) of
{ok, _NoBytes, NewFdC} ->
{ok, NewFdC};
Reply2 ->
Reply2
end;
_ ->
Reply
end.
%% -> {NewFdC, Reply}, Reply = {Cont, Binaries} | {error, Reason} | eof
chunk(FdC, FileName, Pos, B, N) when is_binary(B) ->
true = byte_size(B) >= ?HEADERSZ,
do_handle_chunk(FdC, FileName, Pos, B, N);
chunk(FdC, FileName, Pos, NoBytes, N) ->
MaxNoBytes = case NoBytes of
[] -> ?MAX_CHUNK_SIZE;
_ -> erlang:max(NoBytes, ?MAX_CHUNK_SIZE)
end,
case read_chunk(FdC, FileName, Pos, MaxNoBytes) of
{NewFdC, {ok, Bin}} when byte_size(Bin) < ?HEADERSZ ->
{NewFdC, {error, {corrupt_log_file, FileName}}};
{NewFdC, {ok, Bin}} when NoBytes =:= []; byte_size(Bin) >= NoBytes ->
NewPos = Pos + byte_size(Bin),
do_handle_chunk(NewFdC, FileName, NewPos, Bin, N);
{NewFdC, {ok, _Bin}} ->
{NewFdC, {error, {corrupt_log_file, FileName}}};
{NewFdC, eof} when is_integer(NoBytes) -> % "cannot happen"
{NewFdC, {error, {corrupt_log_file, FileName}}};
Other -> % eof or error
Other
end.
do_handle_chunk(FdC, FileName, Pos, B, N) ->
case handle_chunk(B, Pos, N, []) of
corrupt ->
{FdC, {error, {corrupt_log_file, FileName}}};
{C, []} ->
chunk(FdC, FileName, C#continuation.pos, C#continuation.b, N);
C_Ack ->
{FdC, C_Ack}
end.
handle_chunk(B, Pos, 0, Ack) when byte_size(B) >= ?HEADERSZ ->
{#continuation{pos = Pos, b = B}, Ack};
handle_chunk(B= <<Size:?SIZESZ/unit:8, ?BIGMAGICINT:?MAGICSZ/unit:8,
Tail/binary>>, Pos, N, Ack) when Size < ?MIN_MD5_TERM ->
case Tail of
<<BinTerm:Size/binary, Tail2/binary>> ->
%% The client calls binary_to_term/1.
handle_chunk(Tail2, Pos, N-1, [BinTerm | Ack]);
_ ->
BytesToRead = Size + ?HEADERSZ,
{#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack}
end;
handle_chunk(B= <<Size:?SIZESZ/unit:8, ?BIGMAGICINT:?MAGICSZ/unit:8,
Tail/binary>>, Pos, _N, Ack) -> % when Size >= ?MIN_MD5_TERM
MD5 = erlang:md5(<<Size:?SIZESZ/unit:8>>),
case Tail of
%% The requested object is always bigger than a chunk.
<<MD5:16/binary, Bin:Size/binary>> ->
{#continuation{pos = Pos, b = []}, [Bin | Ack]};
<<MD5:16/binary, _/binary>> ->
BytesToRead = Size + ?HEADERSZ + 16,
{#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack};
_ when byte_size(Tail) >= 16 ->
corrupt;
_ ->
{#continuation{pos = Pos - byte_size(B), b = []}, Ack}
end;
handle_chunk(B= <<Size:?SIZESZ/unit:8, ?MAGICINT:?MAGICSZ/unit:8, Tail/binary>>,
Pos, N, Ack) ->
%% Version 2, before 2(a).
case Tail of
<<BinTerm:Size/binary, Tail2/binary>> ->
handle_chunk(Tail2, Pos, N-1, [BinTerm | Ack]);
_ ->
%% We read the whole thing into one binary, even if Size is huge.
BytesToRead = Size + ?HEADERSZ,
{#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack}
end;
handle_chunk(B, _Pos, _N, _Ack) when byte_size(B) >= ?HEADERSZ ->
corrupt;
handle_chunk(B, Pos, _N, Ack) ->
{#continuation{pos = Pos-byte_size(B), b = []}, Ack}.
read_chunk(FdC, FileName, Pos, MaxBytes) ->
{FdC1, R} = pread(FdC, FileName, Pos + ?HEADSZ, MaxBytes),
case position(FdC1, FileName, eof) of
{ok, NewFdC, _Pos} ->
{NewFdC, R};
{Error, NewFdC} ->
{NewFdC, Error}
end.
%% Used by wrap_log_reader.
%% -> {NewFdC, Reply},
%% Reply = {Cont, Binaries, Bad} (Bad >= 0) | {error, Reason} | eof
chunk_read_only(FdC = #cache{}, FileName, Pos, B, N) ->
do_chunk_read_only(FdC, FileName, Pos, B, N);
chunk_read_only(Fd, FileName, Pos, B, N) ->
%% wrap_log_reader calling...
FdC = #cache{fd = Fd},
{_NFdC, Reply} = do_chunk_read_only(FdC, FileName, Pos, B, N),
Reply.
do_chunk_read_only(FdC, FileName, Pos, B, N) when is_binary(B) ->
true = byte_size(B) >= ?HEADERSZ,
do_handle_chunk_ro(FdC, FileName, Pos, B, N);
do_chunk_read_only(FdC, FileName, Pos, NoBytes, N) ->
MaxNoBytes = case NoBytes of
[] -> ?MAX_CHUNK_SIZE;
_ -> erlang:max(NoBytes, ?MAX_CHUNK_SIZE)
end,
case read_chunk_ro(FdC, FileName, Pos, MaxNoBytes) of
{NewFdC, {ok, Bin}} when byte_size(Bin) < ?HEADERSZ ->
NewCont = #continuation{pos = Pos+byte_size(Bin), b = []},
{NewFdC, {NewCont, [], byte_size(Bin)}};
{NewFdC, {ok, Bin}} when NoBytes =:= []; byte_size(Bin) >= NoBytes ->
NewPos = Pos + byte_size(Bin),
do_handle_chunk_ro(NewFdC, FileName, NewPos, Bin, N);
{NewFdC, {ok, Bin}} ->
NewCont = #continuation{pos = Pos+byte_size(Bin), b = []},
{NewFdC, {NewCont, [], byte_size(Bin)-?HEADERSZ}};
{NewFdC, eof} when is_integer(NoBytes) -> % "cannot happen"
{NewFdC, eof}; % what else?
Other ->
Other
end.
do_handle_chunk_ro(FdC, FileName, Pos, B, N) ->
case handle_chunk_ro(B, Pos, N, [], 0) of
{C, [], 0} ->
#continuation{pos = NewPos, b = NoBytes} = C,
do_chunk_read_only(FdC, FileName, NewPos, NoBytes, N);
C_Ack_Bad ->
{FdC, C_Ack_Bad}
end.
handle_chunk_ro(B, Pos, 0, Ack, Bad) when byte_size(B) >= ?HEADERSZ ->
{#continuation{pos = Pos, b = B}, Ack, Bad};
handle_chunk_ro(B= <<Size:?SIZESZ/unit:8, ?BIGMAGICINT:?MAGICSZ/unit:8,
Tail/binary>>, Pos, N, Ack, Bad) when Size < ?MIN_MD5_TERM ->
case Tail of
<<BinTerm:Size/binary, Tail2/binary>> ->
handle_chunk_ro(Tail2, Pos, N-1, [BinTerm | Ack], Bad);
_ ->
BytesToRead = Size + ?HEADERSZ,
{#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack, Bad}
end;
handle_chunk_ro(B= <<Size:?SIZESZ/unit:8, ?BIGMAGICINT:?MAGICSZ/unit:8,
Tail/binary>>, Pos, N, Ack, Bad) -> % when Size>=?MIN_MD5_TERM
MD5 = erlang:md5(<<Size:?SIZESZ/unit:8>>),
case Tail of
<<MD5:16/binary, Bin:Size/binary>> ->
%% The requested object is always bigger than a chunk.
{#continuation{pos = Pos, b = []}, [Bin | Ack], Bad};
<<MD5:16/binary, _/binary>> ->
BytesToRead = Size + ?HEADERSZ + 16,
{#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack, Bad};
<<_BadMD5:16/binary, _:1/unit:8, Tail2/binary>> ->
handle_chunk_ro(Tail2, Pos, N-1, Ack, Bad+1);
_ ->
{#continuation{pos = Pos - byte_size(B), b = []}, Ack, Bad}
end;
handle_chunk_ro(B= <<Size:?SIZESZ/unit:8, ?MAGICINT:?MAGICSZ/unit:8,
Tail/binary>>, Pos, N, Ack, Bad) ->
%% Version 2, before 2(a).
case Tail of
<<BinTerm:Size/binary, Tail2/binary>> ->
handle_chunk_ro(Tail2, Pos, N-1, [BinTerm | Ack], Bad);
_ ->
%% We read the whole thing into one binary, even if Size is huge.
BytesToRead = Size + ?HEADERSZ,
{#continuation{pos = Pos - byte_size(B), b = BytesToRead}, Ack, Bad}
end;
handle_chunk_ro(B, Pos, N, Ack, Bad) when byte_size(B) >= ?HEADERSZ ->
<<_:1/unit:8, B2/binary>> = B,
handle_chunk_ro(B2, Pos, N-1, Ack, Bad+1);
handle_chunk_ro(B, Pos, _N, Ack, Bad) ->
{#continuation{pos = Pos-byte_size(B), b = []}, Ack, Bad}.
read_chunk_ro(FdC, FileName, Pos, MaxBytes) ->
pread(FdC, FileName, Pos + ?HEADSZ, MaxBytes).
%% -> ok | throw(Error)
close(#cache{fd = Fd, c = []}, _FileName, read_only) ->
file:close(Fd);
close(#cache{fd = Fd, c = C}, FileName, read_write) ->
{Reply, _NewFdC} = write_cache(Fd, FileName, C),
mark(Fd, FileName, ?CLOSED),
file:close(Fd),
if Reply =:= ok -> ok; true -> throw(Reply) end.
%% Open an internal file. Head is ignored if Mode is read_only.
%% int_open(FileName, Repair, Mode, Head) ->
%% {ok, {Alloc, FdC, HeadSize, FileSize}}
%% | {repaired, FdC, Terms, BadBytes, FileSize}
%% | throw(Error)
%% Alloc = new | existed
%% HeadSize = {NumberOfItemsWritten, NumberOfBytesWritten}
%% (HeadSize is equal {0, 0} if Alloc =:= existed, or no header written.)
int_open(FName, truncate, read_write, Head) ->
new_int_file(FName, Head);
int_open(FName, Repair, read_write, Head) ->
case open_read(FName) of
{ok, Fd} -> %% File exists
case file:read(Fd, ?HEADSZ) of
{ok, FileHead} ->
case is_head(FileHead) of
yes ->
file:close(Fd),
case open_update(FName) of
{ok, Fd2} ->
mark(Fd2, FName, ?OPENED),
FdC1 = #cache{fd = Fd2},
{FdC, P} = position_close(FdC1, FName,eof),
{ok, {existed, FdC, {0, 0}, P}};
Error ->
file_error(FName, Error)
end;
yes_not_closed when Repair ->
repair(Fd, FName);
yes_not_closed when not Repair ->
file:close(Fd),
throw({error, {need_repair, FName}});
no ->
file:close(Fd),
throw({error, {not_a_log_file, FName}})
end;
eof ->
file:close(Fd),
throw({error, {not_a_log_file, FName}});
Error ->
file_error_close(Fd, FName, Error)
end;
_Other ->
new_int_file(FName, Head)
end;
int_open(FName, _Repair, read_only, _Head) ->
case open_read(FName) of
{ok, Fd} -> %% File exists
case file:read(Fd, ?HEADSZ) of
{ok, Head} ->
case is_head(Head) of
yes ->
{ok, P} = position_close2(Fd, FName, eof),
FdC = #cache{fd = Fd},
{ok, {existed, FdC, {0, 0}, P}};
yes_not_closed ->
{ok, P} = position_close2(Fd, FName, eof),
FdC = #cache{fd = Fd},
{ok, {existed, FdC, {0, 0}, P}};
no ->
file:close(Fd),
throw({error, {not_a_log_file, FName}})
end;
eof ->
file:close(Fd),
throw({error, {not_a_log_file, FName}});
Error ->
file_error_close(Fd, FName, Error)
end;
Error ->
file_error(FName, Error)
end.
new_int_file(FName, Head) ->
case open_update(FName) of
{ok, Fd} ->
ok = truncate_at_close2(Fd, FName, bof),
fwrite_close2(Fd, FName, [?LOGMAGIC, ?OPENED]),
{FdC1, Nh, HeadSz} = int_log_head(Fd, Head),
{FdC, FileSize} = position_close(FdC1, FName, cur),
{ok, {new, FdC, {Nh, ?HEADERSZ + HeadSz}, FileSize}};
Error ->
file_error(FName, Error)
end.
%% -> {FdC, NoItemsWritten, NoBytesWritten} | throw(Error)
int_log_head(Fd, Head) ->
case lh(Head, internal) of
{ok, BinHead} ->
{Bs, Size} = logl([BinHead]),
{ok, FdC} = fwrite_header(Fd, Bs, Size),
{FdC, 1, Size};
none ->
{#cache{fd = Fd}, 0, 0};
Error ->
file:close(Fd),
throw(Error)
end.
%% Open an external file.
%% -> {ok, {Alloc, FdC, HeadSize}, FileSize} | throw(Error)
ext_open(FName, truncate, read_write, Head) ->
new_ext_file(FName, Head);
ext_open(FName, _Repair, read_write, Head) ->
case file:read_file_info(FName) of
{ok, _FileInfo} ->
case open_update(FName) of
{ok, Fd} ->
{ok, P} = position_close2(Fd, FName, eof),
FdC = #cache{fd = Fd},
{ok, {existed, FdC, {0, 0}, P}};
Error ->
file_error(FName, Error)
end;
_Other ->
new_ext_file(FName, Head)
end;
ext_open(FName, _Repair, read_only, _Head) ->
case open_read(FName) of
{ok, Fd} ->
{ok, P} = position_close2(Fd, FName, eof),
FdC = #cache{fd = Fd},
{ok, {existed, FdC, {0, 0}, P}};
Error ->
file_error(FName, Error)
end.
new_ext_file(FName, Head) ->
case open_truncate(FName) of
{ok, Fd} ->
{FdC1, HeadSize} = ext_log_head(Fd, Head),
{FdC, FileSize} = position_close(FdC1, FName, cur),
{ok, {new, FdC, HeadSize, FileSize}};
Error ->
file_error(FName, Error)
end.
%% -> {FdC, {NoItemsWritten, NoBytesWritten}} | throw(Error)
ext_log_head(Fd, Head) ->
case lh(Head, external) of
{ok, BinHead} ->
Size = byte_size(BinHead),
{ok, FdC} = fwrite_header(Fd, BinHead, Size),
{FdC, {1, Size}};
none ->
{#cache{fd = Fd}, {0, 0}};
Error ->
file:close(Fd),
throw(Error)
end.
%% -> _Any | throw()
mark(Fd, FileName, What) ->
position_close2(Fd, FileName, 4),
fwrite_close2(Fd, FileName, What).
%% -> {ok, Bin} | Error
lh({ok, Bin}, _Format) ->
{ok, Bin};
lh({M, F, A}, Format) when is_list(A) ->
case catch apply(M, F, A) of
{ok, Head} when Format =:= internal ->
{ok, term_to_binary(Head)};
{ok, Bin} when is_binary(Bin) ->
{ok, Bin};
{ok, Bytes} ->
case catch list_to_binary(Bytes) of
{'EXIT', _} ->
{error, {invalid_header, {{M,F,A}, {ok, Bytes}}}};
Bin ->
{ok, Bin}
end;
{'EXIT', Error} ->
{error, {invalid_header, {{M,F,A}, Error}}};
Error ->
{error, {invalid_header, {{M,F,A}, Error}}}
end;
lh({M, F, A}, _Format) -> % cannot happen
{error, {invalid_header, {M, F, A}}};
lh(none, _Format) ->
none;
lh(H, _F) -> % cannot happen
{error, {invalid_header, H}}.
repair(In, File) ->
FSz = file_size(File),
error_logger:info_msg("disk_log: repairing ~p ...\n", [File]),
Tmp = add_ext(File, "TMP"),
{ok, {_Alloc, Out, {0, _}, _FileSize}} = new_int_file(Tmp, none),
scan_f_read(<<>>, In, Out, File, FSz, Tmp, ?MAX_CHUNK_SIZE, 0, 0).
scan_f_read(B, In, Out, File, FSz, Tmp, MaxBytes, No, Bad) ->
case file:read(In, MaxBytes) of
eof ->
done_scan(In, Out, Tmp, File, No, Bad+byte_size(B));
{ok, Bin} ->
NewBin = list_to_binary([B, Bin]),
{NB, NMax, Ack, NNo, NBad} =
scan_f(NewBin, FSz, [], No, Bad),
case log(Out, Tmp, lists:reverse(Ack)) of
{ok, _Size, NewOut} ->
scan_f_read(NB, In, NewOut, File, FSz, Tmp, NMax,NNo,NBad);
{{error, {file_error, _Filename, Error}}, NewOut} ->
repair_err(In, NewOut, Tmp, File, {error, Error})
end;
Error ->
repair_err(In, Out, Tmp, File, Error)
end.
scan_f(B = <<Size:?SIZESZ/unit:8, ?BIGMAGICINT:?MAGICSZ/unit:8, Tail/binary>>,
FSz, Ack, No, Bad) when Size < ?MIN_MD5_TERM ->
scan_f2(B, FSz, Ack, No, Bad, Size, Tail);
scan_f(B = <<Size:?SIZESZ/unit:8, ?BIGMAGICINT:?MAGICSZ/unit:8, Tail/binary>>,
FSz, Ack, No, Bad) -> % when Size >= ?MIN_MD5_TERM
MD5 = erlang:md5(<<Size:?SIZESZ/unit:8>>),
case Tail of
<<MD5:16/binary, BinTerm:Size/binary, Tail2/binary>> ->
case catch binary_to_term(BinTerm) of
{'EXIT', _} ->
scan_f(Tail2, FSz, Ack, No, Bad+Size);
_Term ->
scan_f(Tail2, FSz, [BinTerm | Ack], No+1, Bad)
end;
<<MD5:16/binary, _/binary>> ->
{B, Size-byte_size(Tail)+16, Ack, No, Bad};
_ when byte_size(Tail) < 16 ->
{B, Size-byte_size(Tail)+16, Ack, No, Bad};
_ ->
<<_:8, B2/binary>> = B,
scan_f(B2, FSz, Ack, No, Bad+1)
end;
scan_f(B = <<Size:?SIZESZ/unit:8, ?MAGICINT:?MAGICSZ/unit:8, Tail/binary>>,
FSz, Ack, No, Bad) when Size =< FSz ->
%% Since the file is not compressed, the item size cannot exceed
%% the file size.
scan_f2(B, FSz, Ack, No, Bad, Size, Tail);
scan_f(B = <<_:?HEADERSZ/unit:8, _/binary>>, FSz, Ack, No, Bad) ->
<<_:8, B2/binary>> = B,
scan_f(B2, FSz, Ack, No, Bad + 1);
scan_f(B, _FSz, Ack, No, Bad) ->
{B, ?MAX_CHUNK_SIZE, Ack, No, Bad}.
scan_f2(B, FSz, Ack, No, Bad, Size, Tail) ->
case Tail of
<<BinTerm:Size/binary, Tail2/binary>> ->
case catch binary_to_term(BinTerm) of
{'EXIT', _} ->
<<_:8, B2/binary>> = B,
scan_f(B2, FSz, Ack, No, Bad+1);
_Term ->
scan_f(Tail2, FSz, [BinTerm | Ack], No+1, Bad)
end;
_ ->
{B, Size-byte_size(Tail), Ack, No, Bad}
end.
done_scan(In, Out, OutName, FName, RecoveredTerms, BadChars) ->
file:close(In),
case catch fclose(Out, OutName) of
ok ->
case file:rename(OutName, FName) of
ok ->
case open_update(FName) of
{ok, New} ->
{ok, P} = position_close2(New, FName, eof),
FdC = #cache{fd = New},
{repaired, FdC, RecoveredTerms, BadChars, P};
Error ->
file_error(FName, Error)
end;
Error ->
file:delete(OutName),
file_error(FName, Error)
end;
Error ->
file:delete(OutName),
throw(Error)
end.
-spec repair_err(file:io_device(), #cache{}, file:filename(),
file:filename(), {'error', file:posix()}) -> no_return().
repair_err(In, Out, OutName, ErrFileName, Error) ->
file:close(In),
catch fclose(Out, OutName),
%% OutName is often the culprit, try to remove it anyway...
file:delete(OutName),
file_error(ErrFileName, Error).
%% Used by wrap_log_reader.
-spec is_head(binary()) -> 'yes' | 'yes_not_closed' | 'no'.
is_head(<<M:4/binary, S:4/binary>>) when ?LOGMAGIC =:= M, ?CLOSED =:= S ->
yes;
is_head(<<M:4/binary, S:4/binary>>) when ?LOGMAGIC =:= M, ?OPENED =:= S ->
yes_not_closed;
is_head(Bin) when is_binary(Bin) ->
no.
%%-----------------------------------------------------------------
%% Func: mf_int_open/7, mf_ext_open/7
%% Args: FName = file:filename()
%% MaxB = integer()
%% MaxF = integer()
%% Repair = truncate | true | false
%% Mode = read_write | read_only
%% Head = none | {ok, Bin} | {M, F, A}
%% Version = integer()
%% Purpose: An ADT for wrapping logs. mf_int_ writes binaries (mf_ext_
%% writes bytes)
%% to files called FName.1, FName.2, ..., FName.MaxF.
%% Writes MaxB bytes on each file.
%% Creates a file called Name.idx in the Dir. This
%% file contains the last written FileName as one byte, and
%% follwing that, the sizes of each file (size 0 number of items).
%% On startup, this file is read, and the next available
%% filename is used as first log file.
%% Reports can be browsed with Report Browser Tool (rb), or
%% read with disk_log.
%%-----------------------------------------------------------------
-spec mf_int_open(FName :: file:filename(),
MaxB :: integer(),
MaxF :: integer(),
Repair :: dlog_repair(),
Mode :: dlog_mode(),
Head :: dlog_head(),
Version :: integer())
-> {'ok', #handle{}, integer()}
| {'repaired', #handle{},
non_neg_integer(), non_neg_integer(), non_neg_integer()}.
%% | throw(FileError)
mf_int_open(FName, MaxB, MaxF, Repair, Mode, Head, Version) ->
{First, Sz, TotSz, NFiles} = read_index_file(Repair, FName, MaxF),
write_size_file(Mode, FName, MaxB, MaxF, Version),
NewMaxF = if
NFiles > MaxF ->
{MaxF, NFiles};
true ->
MaxF
end,
case int_file_open(FName, First, 0, 0, Head, Repair, Mode) of
{ok, FdC, FileName, Lost, {NoItems, NoBytes}, FSz} ->
% firstPos = NoBytes is not always correct when the file
% existed, but it will have to do since we don't know
% where the header ends.
CurCnt = Sz + NoItems - Lost,
{ok, #handle{filename = FName, maxB = MaxB,
maxF = NewMaxF, curF = First, cur_fdc = FdC,
cur_name = FileName, cur_cnt = CurCnt,
acc_cnt = -Sz, curB = FSz,
firstPos = NoBytes, noFull = 0, accFull = 0},
TotSz + CurCnt};
{repaired, FdC, FileName, Rec, Bad, FSz} ->
{repaired,
#handle{filename = FName, maxB = MaxB, cur_name = FileName,
maxF = NewMaxF, curF = First, cur_fdc = FdC,
cur_cnt = Rec, acc_cnt = -Rec, curB = FSz,
firstPos = 0, noFull = 0, accFull = 0},
Rec, Bad, TotSz + Rec}
end.
%% -> {ok, handle(), Lost} | {error, Error, handle()}
mf_int_inc(Handle, Head) ->
#handle{filename = FName, cur_cnt = CurCnt, acc_cnt = AccCnt,
cur_name = FileName, curF = CurF, maxF = MaxF,
cur_fdc = CurFdC, noFull = NoFull} = Handle,
case catch wrap_int_log(FName, CurF, MaxF, CurCnt, Head) of
{NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost} ->
Handle1 = Handle#handle{cur_fdc = NewFdC, curF = NewF,
cur_name = NewFileName,
cur_cnt = Nh, acc_cnt = AccCnt + CurCnt,
maxF = NewMaxF, firstPos = FirstPos,
curB = FirstPos, noFull = NoFull + 1},
case catch close(CurFdC, FileName, read_write) of
ok ->
{ok, Handle1, Lost};
Error -> % Error in the last file, new file opened.
{error, Error, Handle1}
end;
Error ->
{error, Error, Handle}
end.
%% -> {ok, handle(), Logged, Lost, NoWraps} | {ok, handle(), Logged}
%% | {error, Error, handle(), Logged, Lost}
%% The returned handle is not always valid - something may
%% have been written before things went wrong.
mf_int_log(Handle, Bins, Head) ->
mf_int_log(Handle, Bins, Head, 0, []).
mf_int_log(Handle, [], _Head, No, []) ->
{ok, Handle, No};
mf_int_log(Handle, [], _Head, No, Wraps0) ->
Wraps = reverse(Wraps0),
{ok, Handle, No, sum(Wraps), Wraps};
mf_int_log(Handle, Bins, Head, No0, Wraps) ->
#handle{curB = CurB, maxB = MaxB, cur_name = FileName, cur_fdc = CurFdC,
firstPos = FirstPos0, cur_cnt = CurCnt} = Handle,
{FirstBins, LastBins, NoBytes, N} =
int_split_bins(CurB, MaxB, FirstPos0, Bins),
case FirstBins of
[] ->
#handle{filename = FName, curF = CurF, maxF = MaxF,
acc_cnt = AccCnt, noFull = NoFull} = Handle,
case catch wrap_int_log(FName, CurF, MaxF, CurCnt, Head) of
{NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost} ->
Handle1 = Handle#handle{cur_fdc = NewFdC, curF = NewF,
cur_cnt = Nh,
cur_name = NewFileName,
acc_cnt = AccCnt + CurCnt,
maxF = NewMaxF,
curB = FirstPos,
firstPos = FirstPos,
noFull = NoFull + 1},
case catch close(CurFdC, FileName, read_write) of
ok ->
mf_int_log(Handle1, Bins, Head, No0 + Nh,
[Lost | Wraps]);
Error ->
Lost1 = Lost + sum(Wraps),
{error, Error, Handle1, No0 + Nh, Lost1}
end;
Error ->
{error, Error, Handle, No0, sum(Wraps)}
end;
_ ->
case fwrite(CurFdC, FileName, FirstBins, NoBytes) of
{ok, NewCurFdC} ->
Handle1 = Handle#handle{cur_fdc = NewCurFdC,
curB = CurB + NoBytes,
cur_cnt = CurCnt + N},
mf_int_log(Handle1, LastBins, Head, No0 + N, Wraps);
{Error, NewCurFdC} ->
Handle1 = Handle#handle{cur_fdc = NewCurFdC},
{error, Error, Handle1, No0, sum(Wraps)}
end
end.
wrap_int_log(FName, CurF, MaxF, CurCnt, Head) ->
{NewF, NewMaxF} = inc_wrap(FName, CurF, MaxF),
{ok, NewFdC, NewFileName, Lost, {Nh, FirstPos}, _FileSize} =
int_file_open(FName, NewF, CurF, CurCnt, Head),
{NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost}.
%% -> {NewHandle, Reply}, Reply = {Cont, Binaries} | {error, Reason} | eof
mf_int_chunk(Handle, 0, Bin, N) ->
FirstF = find_first_file(Handle),
mf_int_chunk(Handle, {FirstF, 0}, Bin, N);
mf_int_chunk(#handle{curF = FileNo, cur_fdc = FdC, cur_name = FileName}
= Handle, {FileNo, Pos}, Bin, N) ->
{NewFdC, Reply} = chunk(FdC, FileName, Pos, Bin, N),
{Handle#handle{cur_fdc = NewFdC}, conv(Reply, FileNo)};
mf_int_chunk(Handle, {FileNo, Pos}, Bin, N) ->
FName = add_ext(Handle#handle.filename, FileNo),
NFileNo = inc(FileNo, Handle#handle.maxF),
case catch int_open(FName, true, read_only, any) of
{error, _Reason} ->
error_logger:info_msg("disk_log: chunk error. File ~p missing.\n\n",
[FName]),
mf_int_chunk(Handle, {NFileNo, 0}, [], N);
{ok, {_Alloc, FdC, _HeadSize, _FileSize}} ->
case chunk(FdC, FName, Pos, Bin, N) of
{NewFdC, eof} ->
file:close(NewFdC#cache.fd),
mf_int_chunk(Handle, {NFileNo, 0}, [], N);
{NewFdC, Other} ->
file:close(NewFdC#cache.fd),
{Handle, conv(Other, FileNo)}
end
end.
%% -> {NewHandle, Reply},
%% Reply = {Cont, Binaries, Bad} (Bad >= 0) | {error, Reason} | eof
mf_int_chunk_read_only(Handle, 0, Bin, N) ->
FirstF = find_first_file(Handle),
mf_int_chunk_read_only(Handle, {FirstF, 0}, Bin, N);
mf_int_chunk_read_only(#handle{curF = FileNo, cur_fdc = FdC, cur_name=FileName}
= Handle, {FileNo, Pos}, Bin, N) ->
{NewFdC, Reply} = do_chunk_read_only(FdC, FileName, Pos, Bin, N),
{Handle#handle{cur_fdc = NewFdC}, conv(Reply, FileNo)};
mf_int_chunk_read_only(Handle, {FileNo, Pos}, Bin, N) ->
FName = add_ext(Handle#handle.filename, FileNo),
NFileNo = inc(FileNo, Handle#handle.maxF),
case catch int_open(FName, true, read_only, any) of
{error, _Reason} ->
error_logger:info_msg("disk_log: chunk error. File ~p missing.\n\n",
[FName]),
mf_int_chunk_read_only(Handle, {NFileNo, 0}, [], N);
{ok, {_Alloc, FdC, _HeadSize, _FileSize}} ->
case do_chunk_read_only(FdC, FName, Pos, Bin, N) of
{NewFdC, eof} ->
file:close(NewFdC#cache.fd),
mf_int_chunk_read_only(Handle, {NFileNo,0}, [], N);
{NewFdC, Other} ->
file:close(NewFdC#cache.fd),
{Handle, conv(Other, FileNo)}
end
end.
%% -> {ok, Cont} | Error
mf_int_chunk_step(Handle, 0, Step) ->
FirstF = find_first_file(Handle),
mf_int_chunk_step(Handle, {FirstF, 0}, Step);
mf_int_chunk_step(Handle, {FileNo, _Pos}, Step) ->
NFileNo = inc(FileNo, Handle#handle.maxF, Step),
FileName = add_ext(Handle#handle.filename, NFileNo),
case file:read_file_info(FileName) of
{ok, _FileInfo} ->
{ok, #continuation{pos = {NFileNo, 0}, b = []}};
_Error ->
{error, end_of_log}
end.
%% -> {Reply, handle()}; Reply = ok | Error
mf_write_cache(#handle{filename = FName, cur_fdc = FdC} = Handle) ->
erase(write_cache_timer_is_running),
#cache{fd = Fd, c = C} = FdC,
{Reply, NewFdC} = write_cache(Fd, FName, C),
{Reply, Handle#handle{cur_fdc = NewFdC}}.
%% -> {Reply, handle()}; Reply = ok | Error
mf_sync(#handle{filename = FName, cur_fdc = FdC} = Handle) ->
{Reply, NewFdC} = fsync(FdC, FName),
{Reply, Handle#handle{cur_fdc = NewFdC}}.
%% -> ok | throw(FileError)
mf_int_close(#handle{filename = FName, curF = CurF, cur_name = FileName,
cur_fdc = CurFdC, cur_cnt = CurCnt}, Mode) ->
close(CurFdC, FileName, Mode),
write_index_file(Mode, FName, CurF, CurF, CurCnt),
ok.
%% -> {ok, handle(), Cnt} | throw(FileError)
mf_ext_open(FName, MaxB, MaxF, Repair, Mode, Head, Version) ->
{First, Sz, TotSz, NFiles} = read_index_file(Repair, FName, MaxF),
write_size_file(Mode, FName, MaxB, MaxF, Version),
NewMaxF = if
NFiles > MaxF ->
{MaxF, NFiles};
true ->
MaxF
end,
{ok, FdC, FileName, Lost, {NoItems, NoBytes}, CurB} =
ext_file_open(FName, First, 0, 0, Head, Repair, Mode),
CurCnt = Sz + NoItems - Lost,
{ok, #handle{filename = FName, maxB = MaxB, cur_name = FileName,
maxF = NewMaxF, cur_cnt = CurCnt, acc_cnt = -Sz,
curF = First, cur_fdc = FdC, firstPos = NoBytes,
curB = CurB, noFull = 0, accFull = 0},
TotSz + CurCnt}.
%% -> {ok, handle(), Lost}
%% | {error, Error, handle()}
%% | throw(FatalError)
%% Fatal errors should always terminate the log.
mf_ext_inc(Handle, Head) ->
#handle{filename = FName, cur_cnt = CurCnt, cur_name = FileName,
acc_cnt = AccCnt, curF = CurF, maxF = MaxF, cur_fdc = CurFdC,
noFull = NoFull} = Handle,
case catch wrap_ext_log(FName, CurF, MaxF, CurCnt, Head) of
{NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost} ->
Handle1 = Handle#handle{cur_fdc = NewFdC, curF = NewF,
cur_name = NewFileName,
cur_cnt = Nh, acc_cnt = AccCnt + CurCnt,
maxF = NewMaxF, firstPos = FirstPos,
curB = FirstPos, noFull = NoFull + 1},
case catch fclose(CurFdC, FileName) of
ok ->
{ok, Handle1, Lost};
Error -> % Error in the last file, new file opened.
{error, Error, Handle1}
end;
Error ->
{error, Error, Handle}
end.
%% -> {ok, handle(), Logged, Lost, NoWraps} | {ok, handle(), Logged}
%% | {error, Error, handle(), Logged, Lost}
%% The returned handle is not always valid -
%% something may have been written before things went wrong.
mf_ext_log(Handle, Bins, Head) ->
mf_ext_log(Handle, Bins, Head, 0, []).
mf_ext_log(Handle, [], _Head, No, []) ->
{ok, Handle, No};
mf_ext_log(Handle, [], _Head, No, Wraps0) ->
Wraps = reverse(Wraps0),
{ok, Handle, No, sum(Wraps), Wraps};
mf_ext_log(Handle, Bins, Head, No0, Wraps) ->
#handle{curB = CurB, maxB = MaxB, cur_name = FileName, cur_fdc = CurFdC,
firstPos = FirstPos0, cur_cnt = CurCnt} = Handle,
{FirstBins, LastBins, NoBytes, N} =
ext_split_bins(CurB, MaxB, FirstPos0, Bins),
case FirstBins of
[] ->
#handle{filename = FName, curF = CurF, maxF = MaxF,
acc_cnt = AccCnt, noFull = NoFull} = Handle,
case catch wrap_ext_log(FName, CurF, MaxF, CurCnt, Head) of
{NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost} ->
Handle1 = Handle#handle{cur_fdc = NewFdC, curF = NewF,
cur_cnt = Nh,
cur_name = NewFileName,
acc_cnt = AccCnt + CurCnt,
maxF = NewMaxF,
curB = FirstPos,
firstPos = FirstPos,
noFull = NoFull + 1},
case catch fclose(CurFdC, FileName) of
ok ->
mf_ext_log(Handle1, Bins, Head, No0 + Nh,
[Lost | Wraps]);
Error ->
Lost1 = Lost + sum(Wraps),
{error, Error, Handle1, No0 + Nh, Lost1}
end;
Error ->
{error, Error, Handle, No0, sum(Wraps)}
end;
_ ->
case fwrite(CurFdC, FileName, FirstBins, NoBytes) of
{ok, NewCurFdC} ->
Handle1 = Handle#handle{cur_fdc = NewCurFdC,
curB = CurB + NoBytes,
cur_cnt = CurCnt + N},
mf_ext_log(Handle1, LastBins, Head, No0 + N, Wraps);
{Error, NewCurFdC} ->
Handle1 = Handle#handle{cur_fdc = NewCurFdC},
{error, Error, Handle1, No0, sum(Wraps)}
end
end.
wrap_ext_log(FName, CurF, MaxF, CurCnt, Head) ->
{NewF, NewMaxF} = inc_wrap(FName, CurF, MaxF),
{ok, NewFdC, NewFileName, Lost, {Nh, FirstPos}, _FileSize} =
ext_file_open(FName, NewF, CurF, CurCnt, Head),
{NewF, NewMaxF, NewFdC, NewFileName, Nh, FirstPos, Lost}.
%% -> ok | throw(FileError)
mf_ext_close(#handle{filename = FName, curF = CurF,
cur_fdc = CurFdC, cur_cnt = CurCnt}, Mode) ->
Res = (catch fclose(CurFdC, FName)),
write_index_file(Mode, FName, CurF, CurF, CurCnt),
Res.
%% -> {ok, handle()} | throw(FileError)
change_size_wrap(Handle, {NewMaxB, NewMaxF}, Version) ->
FName = Handle#handle.filename,
{_MaxB, MaxF} = get_wrap_size(Handle),
write_size_file(read_write, FName, NewMaxB, NewMaxF, Version),
if
NewMaxF > MaxF ->
remove_files(FName, MaxF + 1, NewMaxF),
{ok, Handle#handle{maxB = NewMaxB, maxF = NewMaxF}};
NewMaxF < MaxF ->
{ok, Handle#handle{maxB = NewMaxB, maxF = {NewMaxF, MaxF}}};
true ->
{ok, Handle#handle{maxB = NewMaxB, maxF = NewMaxF}}
end.
%%-----------------------------------------------------------------
%% Misc functions
%%-----------------------------------------------------------------
%% -> {ok, FdC, FileName, Lost, HeadSize, FileSize} | throw(Error)
int_file_open(FName, NewFile, OldFile, OldCnt, Head) ->
Repair = truncate, Mode = read_write,
int_file_open(FName, NewFile, OldFile, OldCnt, Head, Repair, Mode).
%% -> {ok, FdC, FileName, Lost, HeadSize, FileSize}
%% | {repaired, FdC, FileName, Rec, Bad, FileSize}
%% | throw(Error)
int_file_open(FName, NewFile, OldFile, OldCnt, Head, Repair, Mode) ->
N = add_ext(FName, NewFile),
case int_open(N, Repair, Mode, Head) of
{ok, {_Alloc, FdC, HeadSize, FileSize}} ->
Lost = write_index_file(Mode, FName, NewFile, OldFile, OldCnt),
{ok, FdC, N, Lost, HeadSize, FileSize};
{repaired, FdC, Recovered, BadBytes, FileSize} ->
write_index_file(Mode, FName, NewFile, OldFile, OldCnt),
{repaired, FdC, N, Recovered, BadBytes, FileSize}
end.
%% -> {ok, FdC, FileName, Lost, HeadSize, FileSize} | throw(Error)
ext_file_open(FName, NewFile, OldFile, OldCnt, Head) ->
Repair = truncate, Mode = read_write,
ext_file_open(FName, NewFile, OldFile, OldCnt, Head, Repair, Mode).
ext_file_open(FName, NewFile, OldFile, OldCnt, Head, Repair, Mode) ->
FileName = add_ext(FName, NewFile),
{ok, {_Alloc, FdC, HeadSize, FileSize}} =
ext_open(FileName, Repair, Mode, Head),
Lost = write_index_file(Mode, FName, NewFile, OldFile, OldCnt),
{ok, FdC, FileName, Lost, HeadSize, FileSize}.
%%-----------------------------------------------------------------
%% The old file format for index file (CurFileNo > 0), Version 0:
%%
%% CurFileNo SizeFile1 SizeFile2 ... SizeFileN
%% 1 byte 4 bytes 4 bytes 4 bytes
%%
%% The new file format for index file (NewFormat = 0), version 1:
%%
%% NewFormat CurFileNo SizeFile1 SizeFile2 ... SizeFileN
%% 1 byte 4 bytes 4 bytes 4 bytes
%%
%% The current file format for index file (sizes in bytes), version 2:
%%
%% 0 (1) 0 (4) FileFormatVersion (1) CurFileNo (4) SizeFile1 (8) ...
%%
%% (SizeFileI refers to number of items on the log file.)
%%-----------------------------------------------------------------
-define(index_file_name(F), add_ext(F, "idx")).
read_index_file(truncate, FName, MaxF) ->
remove_files(FName, 2, MaxF),
file:delete(?index_file_name(FName)),
{1, 0, 0, 0};
read_index_file(_, FName, _MaxF) ->
read_index_file(FName).
%% Used by wrap_log_reader.
%% -> {CurFileNo, CurFileSz, TotSz, NoFiles} | throw(FileError)
%% where TotSz does not include CurFileSz.
read_index_file(FName) ->
FileName = ?index_file_name(FName),
case open_read(FileName) of
{ok, Fd} ->
R = case file:read(Fd, ?MAX_CHUNK_SIZE) of
{ok, <<0, 0:32, Version, CurF:32, Tail/binary>>}
when Version =:= ?VERSION,
0 < CurF, CurF < ?MAX_FILES ->
parse_index(CurF, Version, 1, Tail, Fd, 0, 0, 0);
{ok, <<0, CurF:32, Tail/binary>>}
when 0 < CurF, CurF < ?MAX_FILES ->
parse_index(CurF, 1, 1, Tail, Fd, 0, 0, 0);
{ok, <<CurF, Tail/binary>>} when 0 < CurF ->
parse_index(CurF, 1, 1, Tail, Fd, 0, 0, 0);
_ErrorOrEof ->
{1, 0, 0, 0}
end,
file:close(Fd),
R;
_Error ->
{1, 0, 0, 0}
end.
parse_index(CurF, V, CurF, <<CurSz:64, Tail/binary>>, Fd, _, TotSz, NFiles)
when V =:= ?VERSION ->
parse_index(CurF, V, CurF+1, Tail, Fd, CurSz, TotSz, NFiles+1);
parse_index(CurF, V, N, <<Sz:64, Tail/binary>>, Fd, CurSz, TotSz, NFiles)
when V =:= ?VERSION ->
parse_index(CurF, V, N+1, Tail, Fd, CurSz, TotSz + Sz, NFiles+1);
parse_index(CurF, V, CurF, <<CurSz:32, Tail/binary>>, Fd, _, TotSz, NFiles)
when V < ?VERSION ->
parse_index(CurF, V, CurF+1, Tail, Fd, CurSz, TotSz, NFiles+1);
parse_index(CurF, V, N, <<Sz:32, Tail/binary>>, Fd, CurSz, TotSz, NFiles)
when V < ?VERSION ->
parse_index(CurF, V, N+1, Tail, Fd, CurSz, TotSz + Sz, NFiles+1);
parse_index(CurF, V, N, B, Fd, CurSz, TotSz, NFiles) ->
case file:read(Fd, ?MAX_CHUNK_SIZE) of
eof when 0 =:= byte_size(B) ->
{CurF, CurSz, TotSz, NFiles};
{ok, Bin} ->
NewB = list_to_binary([B, Bin]),
parse_index(CurF, V, N, NewB, Fd, CurSz, TotSz, NFiles);
_ErrorOrEof ->
{1, 0, 0, 0}
end.
%% Returns: Number of lost items (if an old file was truncated)
%% -> integer() | throw(FileError)
write_index_file(read_only, _FName, _NewFile, _OldFile, _OldCnt) ->
0;
write_index_file(read_write, FName, NewFile, OldFile, OldCnt) ->
FileName = ?index_file_name(FName),
case open_update(FileName) of
{ok, Fd} ->
{Offset, SzSz} =
case file:read(Fd, 6) of
eof ->
Bin = <<0, 0:32, ?VERSION, NewFile:32>>,
fwrite_close2(Fd, FileName, Bin),
{10, 8};
{ok, <<0, 0:32, _Version>>} ->
pwrite_close2(Fd, FileName, 6, <<NewFile:32>>),
{10, 8};
{ok, <<0, _/binary>>} ->
pwrite_close2(Fd, FileName, 1, <<NewFile:32>>),
{5, 4};
{ok, <<_,_/binary>>} ->
%% Very old format, convert to the latest format!
case file:read_file(FileName) of
{ok, <<_CurF, Tail/binary>>} ->
position_close2(Fd, FileName, bof),
Bin = <<0, 0:32, ?VERSION, NewFile:32>>,
NewTail = to_8_bytes(Tail, [], FileName, Fd),
fwrite_close2(Fd, FileName, [Bin | NewTail]),
{10, 8};
Error ->
file_error_close(Fd, FileName, Error)
end;
Error ->
file_error_close(Fd, FileName, Error)
end,
NewPos = Offset + (NewFile - 1)*SzSz,
OldCntBin = <<OldCnt:SzSz/unit:8>>,
if
OldFile > 0 ->
R = file:pread(Fd, NewPos, SzSz),
OldPos = Offset + (OldFile - 1)*SzSz,
pwrite_close2(Fd, FileName, OldPos, OldCntBin),
file:close(Fd),
case R of
{ok, <<Lost:SzSz/unit:8>>} -> Lost;
{ok, _} ->
throw({error, {invalid_index_file, FileName}});
eof -> 0;
Error2 -> file_error(FileName, Error2)
end;
true ->
pwrite_close2(Fd, FileName, NewPos, OldCntBin),
file:close(Fd),
0
end;
E ->
file_error(FileName, E)
end.
to_8_bytes(<<N:32,T/binary>>, NT, FileName, Fd) ->
to_8_bytes(T, [NT | <<N:64>>], FileName, Fd);
to_8_bytes(B, NT, _FileName, _Fd) when byte_size(B) =:= 0 ->
NT;
to_8_bytes(_B, _NT, FileName, Fd) ->
file:close(Fd),
throw({error, {invalid_index_file, FileName}}).
%% -> ok | throw(FileError)
index_file_trunc(FName, N) ->
FileName = ?index_file_name(FName),
case open_update(FileName) of
{ok, Fd} ->
case file:read(Fd, 6) of
eof ->
file:close(Fd),
ok;
{ok, <<0, 0:32, Version>>} when Version =:= ?VERSION ->
truncate_index_file(Fd, FileName, 10, 8, N);
{ok, <<0, _/binary>>} ->
truncate_index_file(Fd, FileName, 5, 4, N);
{ok, <<_, _/binary>>} -> % cannot happen
truncate_index_file(Fd, FileName, 1, 4, N);
Error ->
file_error_close(Fd, FileName, Error)
end;
Error ->
file_error(FileName, Error)
end.
truncate_index_file(Fd, FileName, Offset, N, SzSz) ->
Pos = Offset + N*SzSz,
case Pos > file_size(FileName) of
true ->
file:close(Fd);
false ->
truncate_at_close2(Fd, FileName, {bof, Pos}),
file:close(Fd)
end,
ok.
print_index_file(File) ->
io:format("-- Index begin --~n"),
case file:read_file(File) of
{ok, <<0, 0:32, Version, CurF:32, Tail/binary>>}
when Version =:= ?VERSION, 0 < CurF, CurF < ?MAX_FILES ->
io:format("cur file: ~w~n", [CurF]),
loop_index(1, Version, Tail);
{ok, <<0, CurF:32, Tail/binary>>} when 0 < CurF, CurF < ?MAX_FILES ->
io:format("cur file: ~w~n", [CurF]),
loop_index(1, 1, Tail);
{ok, <<CurF, Tail/binary>>} when 0 < CurF ->
io:format("cur file: ~w~n", [CurF]),
loop_index(1, 1, Tail);
_Else ->
ok
end,
io:format("-- end --~n").
loop_index(N, V, <<Sz:64, Tail/binary>>) when V =:= ?VERSION ->
io:format(" ~p items: ~w~n", [N, Sz]),
loop_index(N+1, V, Tail);
loop_index(N, V, <<Sz:32, Tail/binary>>) when V < ?VERSION ->
io:format(" ~p items: ~w~n", [N, Sz]),
loop_index(N+1, V, Tail);
loop_index(_, _, _) ->
ok.
-define(size_file_name(F), add_ext(F, "siz")).
%% Version 0: no size file
%% Version 1: <<MaxSize:32, MaxFiles:32>>
%% Version 2: <<Version:8, MaxSize:64, MaxFiles:32>>
%% -> ok | throw(FileError)
write_size_file(read_only, _FName, _NewSize, _NewMaxFiles, _Version) ->
ok;
write_size_file(read_write, FName, NewSize, NewMaxFiles, Version) ->
FileName = ?size_file_name(FName),
Bin = if
Version =:= ?VERSION ->
<<Version, NewSize:64, NewMaxFiles:32>>;
true ->
<<NewSize:32, NewMaxFiles:32>>
end,
case file:write_file(FileName, Bin) of
ok ->
ok;
E ->
file_error(FileName, E)
end.
%% -> {NoBytes, NoFiles}.
read_size_file(FName) ->
{Size,_Version} = read_size_file_version(FName),
Size.
%% -> {{NoBytes, NoFiles}, Version}, Version = integer() | undefined
read_size_file_version(FName) ->
case file:read_file(?size_file_name(FName)) of
{ok, <<Version, Size:64, MaxFiles:32>>} when Version =:= ?VERSION ->
{{Size, MaxFiles}, Version};
{ok, <<Size:32, MaxFiles:32>>} ->
{{Size, MaxFiles}, 1};
_ ->
%% The oldest version too...
{{0, 0}, ?VERSION}
end.
conv({More, Terms}, FileNo) when is_record(More, continuation) ->
Cont = More#continuation{pos = {FileNo, More#continuation.pos}},
{Cont, Terms};
conv({More, Terms, Bad}, FileNo) when is_record(More, continuation) ->
Cont = More#continuation{pos = {FileNo, More#continuation.pos}},
{Cont, Terms, Bad};
conv(Other, _) ->
Other.
find_first_file(#handle{filename = FName, curF = CurF, maxF = MaxF}) ->
fff(FName, inc(CurF, MaxF), CurF, MaxF).
fff(_FName, CurF, CurF, _MaxF) -> CurF;
fff(FName, MaybeFirstF, CurF, MaxF) ->
N = add_ext(FName, MaybeFirstF),
case file:read_file_info(N) of
{ok, _} -> MaybeFirstF;
_ -> fff(FName, inc(MaybeFirstF, MaxF), CurF, MaxF)
end.
%% -> {iolist(), LastBins, NoBytes, NoTerms}
ext_split_bins(CurB, MaxB, FirstPos, Bins) ->
MaxBs = MaxB - CurB, IsFirst = CurB =:= FirstPos,
ext_split_bins(MaxBs, IsFirst, [], Bins, 0, 0).
ext_split_bins(MaxBs, IsFirst, First, [X | Last], Bs, N) ->
NBs = Bs + byte_size(X),
if
NBs =< MaxBs ->
ext_split_bins(MaxBs, IsFirst, [First | X], Last, NBs, N+1);
IsFirst, First =:= [] ->
% To avoid infinite loop - we allow the file to be
% too big if it's just one item on the file.
{[X], Last, NBs, N+1};
true ->
{First, [X | Last], Bs, N}
end;
ext_split_bins(_, _, First, [], Bs, N) ->
{First, [], Bs, N}.
%% -> {iolist(), LastBins, NoBytes, NoTerms}
int_split_bins(CurB, MaxB, FirstPos, Bins) ->
MaxBs = MaxB - CurB, IsFirst = CurB =:= FirstPos,
int_split_bins(MaxBs, IsFirst, [], Bins, 0, 0).
int_split_bins(MaxBs, IsFirst, First, [X | Last], Bs, N) ->
Sz = byte_size(X),
NBs = Bs + Sz + ?HEADERSZ,
BSz = <<Sz:?SIZESZ/unit:8>>,
XB = case Sz < ?MIN_MD5_TERM of
true ->
[BSz, ?BIGMAGICHEAD | X];
false ->
MD5 = erlang:md5(BSz),
[BSz, ?BIGMAGICHEAD, MD5 | X]
end,
if
NBs =< MaxBs ->
int_split_bins(MaxBs, IsFirst, [First | XB], Last, NBs, N+1);
IsFirst, First =:= [] ->
% To avoid infinite loop - we allow the file to be
% too big if it's just one item on the file.
{[XB], Last, NBs, N+1};
true ->
{First, [X | Last], Bs, N}
end;
int_split_bins(_, _, First, [], Bs, N) ->
{First, [], Bs, N}.
%% -> {NewCurrentFileNo, MaxFilesToBe} | throw(FileError)
inc_wrap(FName, CurF, MaxF) ->
case MaxF of
%% Number of max files has changed
{NewMaxF, OldMaxF} ->
if
CurF >= NewMaxF ->
%% We are at or above the new number of files
remove_files(FName, CurF + 1, OldMaxF),
if
CurF > NewMaxF ->
%% The change was done while the current file was
%% greater than the new number of files.
%% The index file is not trunctated here, since
%% writing the index file while opening the file
%% with index 1 will write the value for the file
%% with extension CurF as well. Next time the
%% limit is reached, the index file will be
%% truncated.
{1, {NewMaxF, CurF}};
true ->
%% The change was done while the current file was
%% less than the new number of files.
%% Remove the files from the index file too
index_file_trunc(FName, NewMaxF),
{1, NewMaxF}
end;
true ->
%% We haven't reached the new limit yet
NewFt = inc(CurF, NewMaxF),
{NewFt, MaxF}
end;
MaxF ->
%% Normal case.
NewFt = inc(CurF, MaxF),
{NewFt, MaxF}
end.
inc(N, {_NewMax, OldMax}) -> inc(N, OldMax, 1);
inc(N, Max) -> inc(N, Max, 1).
inc(N, Max, Step) ->
Nx = (N + Step) rem Max,
if
Nx > 0 -> Nx;
true -> Nx + Max
end.
file_size(Fname) ->
{ok, Fi} = file:read_file_info(Fname),
Fi#file_info.size.
%% -> ok | throw(FileError)
%% Tries to remove each file with name FName.I, N<=I<=Max.
remove_files(FName, N, Max) ->
remove_files(FName, N, Max, ok).
remove_files(_FName, N, Max, ok) when N > Max ->
ok;
remove_files(_FName, N, Max, {FileName, Error}) when N > Max ->
file_error(FileName, Error);
remove_files(FName, N, Max, Reply) ->
FileName = add_ext(FName, N),
NewReply = case file:delete(FileName) of
ok -> Reply;
{error, enoent} -> Reply;
Error -> {FileName, Error}
end,
remove_files(FName, N + 1, Max, NewReply).
%% -> {MaxBytes, MaxFiles}
get_wrap_size(#handle{maxB = MaxB, maxF = MaxF}) ->
case MaxF of
{NewMaxF,_} -> {MaxB, NewMaxF};
MaxF -> {MaxB, MaxF}
end.
add_ext(Name, Ext) ->
concat([Name, ".", Ext]).
open_read(FileName) ->
file:open(FileName, [raw, binary, read]).
open_update(FileName) ->
file:open(FileName, [raw, binary, read, write]).
open_truncate(FileName) ->
file:open(FileName, [raw, binary, write]).
%%% Functions that access files, and throw on error.
-define(MAX, 16384). % bytes
-define(TIMEOUT, 2000). % ms
%% -> {Reply, cache()}; Reply = ok | Error
fwrite(#cache{c = []} = FdC, _FN, B, Size) ->
case get(write_cache_timer_is_running) of
true ->
ok;
_ ->
put(write_cache_timer_is_running, true),
erlang:send_after(?TIMEOUT, self(), {self(), write_cache})
end,
{ok, FdC#cache{sz = Size, c = B}};
fwrite(#cache{sz = Sz, c = C} = FdC, _FN, B, Size) when Sz < ?MAX ->
{ok, FdC#cache{sz = Sz+Size, c = [C | B]}};
fwrite(#cache{fd = Fd, c = C}, FileName, B, _Size) ->
write_cache(Fd, FileName, [C | B]).
fwrite_header(Fd, B, Size) ->
{ok, #cache{fd = Fd, sz = Size, c = B}}.
%% -> {NewFdC, Reply}; Reply = ok | Error
pread(#cache{fd = Fd, c = C}, FileName, Position, MaxBytes) ->
Reply = write_cache(Fd, FileName, C),
case Reply of
{ok, NewFdC} ->
case file:pread(Fd, Position, MaxBytes) of
{error, Error} ->
{NewFdC, catch file_error(FileName, {error, Error})};
R ->
{NewFdC, R}
end;
{Error, NewFdC} ->
{NewFdC, Error}
end.
%% -> {ok, cache(), Pos} | {Error, cache()}
position(#cache{fd = Fd, c = C}, FileName, Pos) ->
Reply = write_cache(Fd, FileName, C),
case Reply of
{ok, NewFdC} ->
case position2(Fd, FileName, Pos) of
{ok, Loc} ->
{ok, NewFdC, Loc};
Error ->
{Error, NewFdC}
end;
_Error ->
Reply
end.
position_close(#cache{fd = Fd, c = C}, FileName, Pos) ->
NewFdC = write_cache_close(Fd, FileName, C),
{ok, Loc} = position_close2(Fd, FileName, Pos),
{NewFdC, Loc}.
fsync(#cache{fd = Fd, c = C}, FileName) ->
Reply = write_cache(Fd, FileName, C),
case Reply of
{ok, NewFdC} ->
case file:sync(Fd) of
ok ->
Reply;
Error ->
{catch file_error(FileName, Error), NewFdC}
end;
_Error ->
Reply
end.
%% -> {Reply, NewFdC}; Reply = ok | Error
truncate_at(FdC, FileName, Pos) ->
case position(FdC, FileName, Pos) of
{ok, NewFdC, _Pos} ->
case file:truncate(NewFdC#cache.fd) of
ok ->
{ok, NewFdC};
Error ->
{catch file_error(FileName, Error), NewFdC}
end;
Reply ->
Reply
end.
fwrite_close2(Fd, FileName, B) ->
case file:write(Fd, B) of
ok -> ok;
Error -> file_error_close(Fd, FileName, Error)
end.
pwrite_close2(Fd, FileName, Position, B) ->
case file:pwrite(Fd, Position, B) of
ok -> ok;
{error,Error} -> file_error(FileName, {error, Error})
end.
position2(Fd, FileName, Pos) ->
case file:position(Fd, Pos) of
{error, Error} -> catch file_error(FileName, {error, Error});
OK -> OK
end.
position_close2(Fd, FileName, Pos) ->
case file:position(Fd, Pos) of
{error, Error} -> file_error_close(Fd, FileName, {error, Error});
OK -> OK
end.
truncate_at_close2(Fd, FileName, Pos) ->
position_close2(Fd, FileName, Pos),
case file:truncate(Fd) of
ok -> ok;
Error -> file_error_close(Fd, FileName, Error)
end.
fclose(#cache{fd = Fd, c = C}, FileName) ->
%% The cache is empty if the file was opened in read_only mode.
write_cache_close(Fd, FileName, C),
file:close(Fd).
%% -> {Reply, #cache{}}; Reply = ok | Error
write_cache(Fd, _FileName, []) ->
{ok, #cache{fd = Fd}};
write_cache(Fd, FileName, C) ->
case file:write(Fd, C) of
ok -> {ok, #cache{fd = Fd}};
Error -> {catch file_error(FileName, Error), #cache{fd = Fd}}
end.
-spec write_cache_close(file:fd(), file:filename(), iodata()) -> #cache{}. % | throw(Error)
write_cache_close(Fd, _FileName, []) ->
#cache{fd = Fd};
write_cache_close(Fd, FileName, C) ->
case file:write(Fd, C) of
ok -> #cache{fd = Fd};
Error -> file_error_close(Fd, FileName, Error)
end.
-spec file_error(file:filename(), {'error', file:posix()}) -> no_return().
file_error(FileName, {error, Error}) ->
throw({error, {file_error, FileName, Error}}).
-spec file_error_close(file:fd(), file:filename(), {'error', file:posix()}) -> no_return().
file_error_close(Fd, FileName, {error, Error}) ->
file:close(Fd),
throw({error, {file_error, FileName, Error}}).