%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1998-2013. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%% Read wrap files with internal format
-module(wrap_log_reader).
%%-define(debug, true).
-ifdef(debug).
-define(FORMAT(P, A), io:format(P, A)).
-else.
-define(FORMAT(P, A), ok).
-endif.
-export([open/1, open/2, chunk/1, chunk/2, close/1]).
-export_type([continuation/0]).
-include("disk_log.hrl").
-record(wrap_reader,
{fd :: file:fd(),
cont :: dlog_cont(), % disk_log's continuation record
file :: file:filename(), % file name without extension
file_no :: non_neg_integer(), % current file number
mod_time :: file:date_time(), % modification time of current file
first_no :: non_neg_integer() | 'one' % first read file number
}).
-opaque continuation() :: #wrap_reader{}.
%%
%% Exported functions
%%
%% A special case to be handled when appropriate: if current file
%% number is one greater than number of files then the max file number
%% is not yet reached, we are on the first 'round' of filling the wrap
%% files.
-type open_ret() :: {'ok', Continuation :: continuation()}
| {'error', Reason :: tuple()}.
-spec open(Filename) -> open_ret() when
Filename :: string() | atom().
open(File) when is_atom(File) ->
open(atom_to_list(File));
open(File) when is_list(File) ->
case read_index_file(File) of
%% The special case described above.
{ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}}
when CurFileNo =:= NoOfFiles + 1 ->
FileNo = 1,
?FORMAT("open from ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
[FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
open_int(File, FileNo, FileNo);
{ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}} ->
FileNo = case (CurFileNo + 1) rem NoOfFiles of
0 -> NoOfFiles;
No -> No
end,
?FORMAT("open from ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
[FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
open_int(File, FileNo, FileNo);
Error ->
Error
end.
-spec open(Filename, N) -> open_ret() when
Filename :: string() | atom(),
N :: integer().
open(File, FileNo) when is_atom(File), is_integer(FileNo) ->
open(atom_to_list(File), FileNo);
open(File, FileNo) when is_list(File), is_integer(FileNo) ->
case read_index_file(File) of
{ok, {_CurFileNo, _CurFileSz, _TotSz, NoOfFiles}}
when NoOfFiles >= FileNo ->
?FORMAT("open file ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
[FileNo, _CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
open_int(File, FileNo, one);
%% The special case described above.
{ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}}
when CurFileNo =:= FileNo, CurFileNo =:= NoOfFiles +1 ->
?FORMAT("open file ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n",
[FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]),
open_int(File, FileNo, one);
{ok, {_CurFileNo, _CurFileSz, _TotSz, _NoOfFiles}} ->
{error, {file_not_found, add_ext(File, FileNo)}};
Error ->
Error
end.
-spec close(Continuation) -> 'ok' | {'error', Reason} when
Continuation :: continuation(),
Reason :: file:posix().
close(#wrap_reader{fd = FD}) ->
file:close(FD).
-type chunk_ret() :: {Continuation2, Terms :: [term()]}
| {Continuation2,
Terms :: [term()],
Badbytes :: non_neg_integer()}
| {Continuation2, 'eof'}
| {'error', Reason :: term()}.
-spec chunk(Continuation) -> chunk_ret() when
Continuation :: continuation().
chunk(WR = #wrap_reader{}) ->
chunk(WR, ?MAX_CHUNK_SIZE, 0).
-spec chunk(Continuation, N) -> chunk_ret() when
Continuation :: continuation(),
N :: infinity | pos_integer().
chunk(WR = #wrap_reader{}, infinity) ->
chunk(WR, ?MAX_CHUNK_SIZE, 0);
chunk(WR = #wrap_reader{}, N) when is_integer(N), N > 0 ->
chunk(WR, N, 0).
%%
%% Local functions
%%
open_int(File, FileNo, FirstFileNo) ->
FName = add_ext(File, FileNo),
case file:open(FName, [raw, binary, read]) of
{ok, Fd} -> %% File exists
case file:read(Fd, ?HEADSZ) of
{ok, Head} ->
case disk_log_1:is_head(Head) of
no ->
_ = file:close(Fd),
{error, {not_a_log_file, FName}};
_ -> % yes or yes_not_closed
case last_mod_time(FName) of
{ok, ModTime} ->
WR = #wrap_reader{fd = Fd, cont = start,
file = File,
file_no = FileNo,
mod_time = ModTime,
first_no = FirstFileNo},
{ok, WR};
{error, E} ->
_ = file:close(Fd),
{error, {file_error, FName, E}}
end
end;
_Other ->
_ = file:close(Fd),
{error, {not_a_log_file, FName}}
end;
_Other ->
{error, {not_a_log_file, FName}}
end.
chunk(WR, N, Bad) ->
#wrap_reader{fd = Fd, cont = Continue, file = File, file_no = CurFileNo,
first_no = FirstFileNo} = WR,
case read_a_chunk(Fd, N, Continue, add_ext(File, CurFileNo)) of
eof ->
case FirstFileNo of
one ->
{WR, eof};
_Else ->
chunk_at_eof(WR, N, Bad)
end;
{ContOut, [], BadBytes} ->
?FORMAT("chunk: empty chunk read, ~p bad bytes~n", [BadBytes]),
chunk(WR#wrap_reader{cont = ContOut}, N, Bad + BadBytes);
{ContOut, Chunk, BadBytes} when Bad + BadBytes =:= 0 ->
{WR#wrap_reader{cont = ContOut}, Chunk};
{ContOut, Chunk, BadBytes} ->
?FORMAT("chunk: total of ~p bad bytes~n", [BadBytes]),
{WR#wrap_reader{cont = ContOut}, Chunk, Bad + BadBytes};
Error ->
Error
end.
read_a_chunk(Fd, N, start, FileName) ->
read_a_chunk(Fd, FileName, 0, [], N);
read_a_chunk(Fd, N, More, FileName) ->
Pos = More#continuation.pos,
B = More#continuation.b,
read_a_chunk(Fd, FileName, Pos, B, N).
read_a_chunk(Fd, FileName, Pos, B, N) ->
R = disk_log_1:chunk_read_only(Fd, FileName, Pos, B, N),
%% Create terms from the binaries returned from chunk_read_only/5.
%% 'foo' will do here since Log is not used in read-only mode.
Log = foo,
case disk_log:ichunk_end(R, Log) of
{C = #continuation{}, S} ->
{C, S, 0};
Else ->
Else
end.
chunk_at_eof(WR, N, Bad) ->
#wrap_reader{file = File, file_no = CurFileNo,
first_no = FirstFileNo} = WR,
case read_index_file(File) of
{ok, IndexFile} ->
{_, _, _, NoOfFiles} = IndexFile,
NewFileNo = case (CurFileNo + 1) rem NoOfFiles of
%% The special case described above.
_ when CurFileNo > NoOfFiles -> 1;
0 when NoOfFiles > 1 -> NoOfFiles;
No when CurFileNo =:= NoOfFiles ->
FileName = add_ext(File, CurFileNo+1),
case file:read_file_info(FileName) of
{ok, _} -> CurFileNo + 1;
_ -> No
end;
No -> No
end,
?FORMAT("chunk: at eof, index file: ~p, FirstFileNo: ~p, "
"CurFileNo: ~p, NoOfFiles: ~p, NewFileNo: ~p~n",
[IndexFile, FirstFileNo, CurFileNo,
NoOfFiles, NewFileNo]),
case {FirstFileNo, NewFileNo} of
{_, 0} -> {WR, eof};
{_, FirstFileNo} -> {WR, eof};
_ -> read_next_file(WR, N, NewFileNo, Bad)
end;
Error ->
Error
end.
%% Read the index file for the File
%% -> {ok, {CurFileNo, CurFileSz, TotSz, NoOfFiles}} | {error, Reason}
read_index_file(File) ->
case catch disk_log_1:read_index_file(File) of
{1, 0, 0, 0} ->
{error, {index_file_not_found, File}};
{error, _Reason} ->
{error, {index_file_not_found, File}};
FileData ->
{ok, FileData}
end.
%% When reading all the index files, this function closes the previous
%% index file and opens the next one.
read_next_file(WR, N, NewFileNo, Bad) ->
#wrap_reader{file = File, file_no = CurFileNo,
mod_time = ModTime, first_no = FirstFileNo} = WR,
%% If current file were closed here, then WR would be in a strange
%% state should an error occur below.
case last_mod_time(add_ext(File, NewFileNo)) of
{ok, NewModTime} ->
OldMT = calendar:datetime_to_gregorian_seconds(ModTime),
NewMT = calendar:datetime_to_gregorian_seconds(NewModTime),
Diff = NewMT - OldMT,
?FORMAT("next: now = ~p~n last mtime = ~p~n"
" mtime = ~p~n diff = ~p~n",
[calendar:local_time(), ModTime, NewModTime, Diff]),
if
Diff < 0 ->
%% The file to be read is older than the one just finished.
{error, {is_wrapped, add_ext(File, CurFileNo)}};
true ->
case open_int(File, NewFileNo, FirstFileNo) of
{ok, NWR} ->
_ = close(WR), %% Now we can safely close the old file.
chunk(NWR, N, Bad);
Error ->
Error
end
end;
{error, EN} ->
{error, {file_error, add_ext(File, NewFileNo), EN}}
end.
%% Get the last modification time of a file
last_mod_time(File) ->
case file:read_file_info(File) of
{ok, FileInfo} ->
{ok, FileInfo#file_info.mtime};
E ->
{error, E}
end.
add_ext(File, Ext) ->
lists:concat([File, ".", Ext]).