%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 1998-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% %% Read wrap files with internal format -module(wrap_log_reader). %%-define(debug, true). -ifdef(debug). -define(FORMAT(P, A), io:format(P, A)). -else. -define(FORMAT(P, A), ok). -endif. -export([open/1, open/2, chunk/1, chunk/2, close/1]). -export_type([continuation/0]). -include("disk_log.hrl"). -record(wrap_reader, {fd :: file:fd(), cont :: dlog_cont(), % disk_log's continuation record file :: file:filename(), % file name without extension file_no :: non_neg_integer(), % current file number mod_time :: file:date_time(), % modification time of current file first_no :: non_neg_integer() | 'one' % first read file number }). -opaque continuation() :: #wrap_reader{}. %% %% Exported functions %% %% A special case to be handled when appropriate: if current file %% number is one greater than number of files then the max file number %% is not yet reached, we are on the first 'round' of filling the wrap %% files. -type open_ret() :: {'ok', Continuation :: continuation()} | {'error', Reason :: tuple()}. -spec open(Filename) -> open_ret() when Filename :: string() | atom(). open(File) when is_atom(File) -> open(atom_to_list(File)); open(File) when is_list(File) -> case read_index_file(File) of %% The special case described above. {ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}} when CurFileNo =:= NoOfFiles + 1 -> FileNo = 1, ?FORMAT("open from ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n", [FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]), open_int(File, FileNo, FileNo); {ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}} -> FileNo = case (CurFileNo + 1) rem NoOfFiles of 0 -> NoOfFiles; No -> No end, ?FORMAT("open from ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n", [FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]), open_int(File, FileNo, FileNo); Error -> Error end. -spec open(Filename, N) -> open_ret() when Filename :: string() | atom(), N :: integer(). open(File, FileNo) when is_atom(File), is_integer(FileNo) -> open(atom_to_list(File), FileNo); open(File, FileNo) when is_list(File), is_integer(FileNo) -> case read_index_file(File) of {ok, {_CurFileNo, _CurFileSz, _TotSz, NoOfFiles}} when NoOfFiles >= FileNo -> ?FORMAT("open file ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n", [FileNo, _CurFileNo, _CurFileSz, _TotSz, NoOfFiles]), open_int(File, FileNo, one); %% The special case described above. {ok, {CurFileNo, _CurFileSz, _TotSz, NoOfFiles}} when CurFileNo =:= FileNo, CurFileNo =:= NoOfFiles +1 -> ?FORMAT("open file ~p Cur = ~p, Sz = ~p, Tot = ~p, NoFiles = ~p~n", [FileNo, CurFileNo, _CurFileSz, _TotSz, NoOfFiles]), open_int(File, FileNo, one); {ok, {_CurFileNo, _CurFileSz, _TotSz, _NoOfFiles}} -> {error, {file_not_found, add_ext(File, FileNo)}}; Error -> Error end. -spec close(Continuation) -> 'ok' | {'error', Reason} when Continuation :: continuation(), Reason :: file:posix(). close(#wrap_reader{fd = FD}) -> file:close(FD). -type chunk_ret() :: {Continuation2, Terms :: [term()]} | {Continuation2, Terms :: [term()], Badbytes :: non_neg_integer()} | {Continuation2, 'eof'} | {'error', Reason :: term()}. -spec chunk(Continuation) -> chunk_ret() when Continuation :: continuation(). chunk(WR = #wrap_reader{}) -> chunk(WR, ?MAX_CHUNK_SIZE, 0). -spec chunk(Continuation, N) -> chunk_ret() when Continuation :: continuation(), N :: infinity | pos_integer(). chunk(WR = #wrap_reader{}, infinity) -> chunk(WR, ?MAX_CHUNK_SIZE, 0); chunk(WR = #wrap_reader{}, N) when is_integer(N), N > 0 -> chunk(WR, N, 0). %% %% Local functions %% open_int(File, FileNo, FirstFileNo) -> FName = add_ext(File, FileNo), case file:open(FName, [raw, binary, read]) of {ok, Fd} -> %% File exists case file:read(Fd, ?HEADSZ) of {ok, Head} -> case disk_log_1:is_head(Head) of no -> _ = file:close(Fd), {error, {not_a_log_file, FName}}; _ -> % yes or yes_not_closed case last_mod_time(FName) of {ok, ModTime} -> WR = #wrap_reader{fd = Fd, cont = start, file = File, file_no = FileNo, mod_time = ModTime, first_no = FirstFileNo}, {ok, WR}; {error, E} -> _ = file:close(Fd), {error, {file_error, FName, E}} end end; _Other -> _ = file:close(Fd), {error, {not_a_log_file, FName}} end; _Other -> {error, {not_a_log_file, FName}} end. chunk(WR, N, Bad) -> #wrap_reader{fd = Fd, cont = Continue, file = File, file_no = CurFileNo, first_no = FirstFileNo} = WR, case read_a_chunk(Fd, N, Continue, add_ext(File, CurFileNo)) of eof -> case FirstFileNo of one -> {WR, eof}; _Else -> chunk_at_eof(WR, N, Bad) end; {ContOut, [], BadBytes} -> ?FORMAT("chunk: empty chunk read, ~p bad bytes~n", [BadBytes]), chunk(WR#wrap_reader{cont = ContOut}, N, Bad + BadBytes); {ContOut, Chunk, BadBytes} when Bad + BadBytes =:= 0 -> {WR#wrap_reader{cont = ContOut}, Chunk}; {ContOut, Chunk, BadBytes} -> ?FORMAT("chunk: total of ~p bad bytes~n", [BadBytes]), {WR#wrap_reader{cont = ContOut}, Chunk, Bad + BadBytes}; Error -> Error end. read_a_chunk(Fd, N, start, FileName) -> read_a_chunk(Fd, FileName, 0, [], N); read_a_chunk(Fd, N, More, FileName) -> Pos = More#continuation.pos, B = More#continuation.b, read_a_chunk(Fd, FileName, Pos, B, N). read_a_chunk(Fd, FileName, Pos, B, N) -> R = disk_log_1:chunk_read_only(Fd, FileName, Pos, B, N), %% Create terms from the binaries returned from chunk_read_only/5. %% 'foo' will do here since Log is not used in read-only mode. Log = foo, case disk_log:ichunk_end(R, Log) of {C = #continuation{}, S} -> {C, S, 0}; Else -> Else end. chunk_at_eof(WR, N, Bad) -> #wrap_reader{file = File, file_no = CurFileNo, first_no = FirstFileNo} = WR, case read_index_file(File) of {ok, IndexFile} -> {_, _, _, NoOfFiles} = IndexFile, NewFileNo = case (CurFileNo + 1) rem NoOfFiles of %% The special case described above. _ when CurFileNo > NoOfFiles -> 1; 0 when NoOfFiles > 1 -> NoOfFiles; No when CurFileNo =:= NoOfFiles -> FileName = add_ext(File, CurFileNo+1), case file:read_file_info(FileName) of {ok, _} -> CurFileNo + 1; _ -> No end; No -> No end, ?FORMAT("chunk: at eof, index file: ~p, FirstFileNo: ~p, " "CurFileNo: ~p, NoOfFiles: ~p, NewFileNo: ~p~n", [IndexFile, FirstFileNo, CurFileNo, NoOfFiles, NewFileNo]), case {FirstFileNo, NewFileNo} of {_, 0} -> {WR, eof}; {_, FirstFileNo} -> {WR, eof}; _ -> read_next_file(WR, N, NewFileNo, Bad) end; Error -> Error end. %% Read the index file for the File %% -> {ok, {CurFileNo, CurFileSz, TotSz, NoOfFiles}} | {error, Reason} read_index_file(File) -> case catch disk_log_1:read_index_file(File) of {1, 0, 0, 0} -> {error, {index_file_not_found, File}}; {error, _Reason} -> {error, {index_file_not_found, File}}; FileData -> {ok, FileData} end. %% When reading all the index files, this function closes the previous %% index file and opens the next one. read_next_file(WR, N, NewFileNo, Bad) -> #wrap_reader{file = File, file_no = CurFileNo, mod_time = ModTime, first_no = FirstFileNo} = WR, %% If current file were closed here, then WR would be in a strange %% state should an error occur below. case last_mod_time(add_ext(File, NewFileNo)) of {ok, NewModTime} -> OldMT = calendar:datetime_to_gregorian_seconds(ModTime), NewMT = calendar:datetime_to_gregorian_seconds(NewModTime), Diff = NewMT - OldMT, ?FORMAT("next: now = ~p~n last mtime = ~p~n" " mtime = ~p~n diff = ~p~n", [calendar:local_time(), ModTime, NewModTime, Diff]), if Diff < 0 -> %% The file to be read is older than the one just finished. {error, {is_wrapped, add_ext(File, CurFileNo)}}; true -> case open_int(File, NewFileNo, FirstFileNo) of {ok, NWR} -> _ = close(WR), %% Now we can safely close the old file. chunk(NWR, N, Bad); Error -> Error end end; {error, EN} -> {error, {file_error, add_ext(File, NewFileNo), EN}} end. %% Get the last modification time of a file last_mod_time(File) -> case file:read_file_info(File) of {ok, FileInfo} -> {ok, FileInfo#file_info.mtime}; E -> {error, E} end. add_ext(File, Ext) -> lists:concat([File, ".", Ext]).