author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch)
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/disk_log.erl
The R13B03 release. (tag: OTP_R13B03)
Diffstat (limited to 'lib/kernel/src/disk_log.erl')
-rw-r--r-- | lib/kernel/src/disk_log.erl | 1899
1 file changed, 1899 insertions, 0 deletions
diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl new file mode 100644 index 0000000000..7f1b5f9ec6 --- /dev/null +++ b/lib/kernel/src/disk_log.erl @@ -0,0 +1,1899 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(disk_log). + +%% Efficient file based log - process part + +-export([start/0, istart_link/1, + log/2, log_terms/2, blog/2, blog_terms/2, + alog/2, alog_terms/2, balog/2, balog_terms/2, + close/1, lclose/1, lclose/2, sync/1, open/1, + truncate/1, truncate/2, btruncate/2, + reopen/2, reopen/3, breopen/3, inc_wrap_file/1, change_size/2, + change_notify/3, change_header/2, + chunk/2, chunk/3, bchunk/2, bchunk/3, chunk_step/3, chunk_info/1, + block/1, block/2, unblock/1, info/1, format_error/1, + accessible_logs/0]). + +%% Internal exports +-export([init/2, internal_open/2, + system_continue/3, system_terminate/4, system_code_change/4]). + +%% To be used by disk_log_h.erl (not (yet) in Erlang/OTP) only. +-export([ll_open/1, ll_close/1, do_log/2, do_sync/1, do_info/2]). + +%% To be used by wrap_log_reader only. +-export([ichunk_end/2]). + +%% To be used for debugging only: +-export([pid2name/1]). + +-type dlog_state_error() :: 'ok' | {'error', term()}. + +-record(state, {queue = [], + messages = [], + parent, + server, + cnt = 0 :: non_neg_integer(), + args, + error_status = ok :: dlog_state_error(), + cache_error = ok %% cache write error after timeout + }). + +-include("disk_log.hrl"). + +-define(failure(Error, Function, Arg), + {{failed, Error}, [{?MODULE, Function, Arg}]}). + +%%-define(PROFILE(C), C). +-define(PROFILE(C), void). + +-compile({inline,[{log_loop,4},{log_end_sync,2},{replies,2},{rflat,1}]}). + +%%%---------------------------------------------------------------------- +%%% Contract type specifications +%%%---------------------------------------------------------------------- + +-type bytes() :: binary() | [byte()]. + +-type log() :: term(). % XXX: refine +-type file_error() :: term(). % XXX: refine +-type invalid_header() :: term(). % XXX: refine + +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- + +%%----------------------------------------------------------------- +%% This module implements the API, and the processes for each log. +%% There is one process per log. 
+%%----------------------------------------------------------------- + +-type open_error_rsn() :: 'no_such_log' + | {'badarg', term()} + | {'size_mismatch', dlog_size(), dlog_size()} + | {'arg_mismatch', dlog_optattr(), term(), term()} + | {'name_already_open', log()} + | {'open_read_write', log()} + | {'open_read_only', log()} + | {'need_repair', log()} + | {'not_a_log_file', string()} + | {'invalid_index_file', string()} + | {'invalid_header', invalid_header()} + | {'file_error', file:filename(), file_error()} + | {'node_already_open', log()}. +-type dist_error_rsn() :: 'nodedown' | open_error_rsn(). +-type ret() :: {'ok', log()} + | {'repaired', log(), {'recovered', non_neg_integer()}, + {'badbytes', non_neg_integer()}}. +-type open_ret() :: ret() | {'error', open_error_rsn()}. +-type dist_open_ret() :: {[{node(), ret()}], + [{node(), {'error', dist_error_rsn()}}]}. +-type all_open_ret() :: open_ret() | dist_open_ret(). + +-spec open(Args :: dlog_options()) -> all_open_ret(). +open(A) -> + disk_log_server:open(check_arg(A, #arg{options = A})). + +-type log_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} + | {'format_external', log()} | {'blocked_log', log()} + | {'full', log()} | {'invalid_header', invalid_header()} + | {'file_error', file:filename(), file_error()}. + +-spec log(Log :: log(), Term :: term()) -> 'ok' | {'error', log_error_rsn()}. +log(Log, Term) -> + req(Log, {log, term_to_binary(Term)}). + +-spec blog(Log :: log(), Bytes :: bytes()) -> 'ok' | {'error', log_error_rsn()}. +blog(Log, Bytes) -> + req(Log, {blog, check_bytes(Bytes)}). + +-spec log_terms(Log :: log(), Terms :: [term()]) -> 'ok' | {'error', term()}. +log_terms(Log, Terms) -> + Bs = terms2bins(Terms), + req(Log, {log, Bs}). + +-spec blog_terms(Log :: log(), Bytes :: [bytes()]) -> 'ok' | {'error', term()}. +blog_terms(Log, Bytess) -> + Bs = check_bytes_list(Bytess, Bytess), + req(Log, {blog, Bs}). + +-type notify_ret() :: 'ok' | {'error', 'no_such_log'}. + +-spec alog(Log :: log(), Term :: term()) -> notify_ret(). +alog(Log, Term) -> + notify(Log, {alog, term_to_binary(Term)}). + +-spec alog_terms(Log :: log(), Terms :: [term()]) -> notify_ret(). +alog_terms(Log, Terms) -> + Bs = terms2bins(Terms), + notify(Log, {alog, Bs}). + +-spec balog(Log :: log(), Bytes :: bytes()) -> notify_ret(). +balog(Log, Bytes) -> + notify(Log, {balog, check_bytes(Bytes)}). + +-spec balog_terms(Log :: log(), Bytes :: [bytes()]) -> notify_ret(). +balog_terms(Log, Bytess) -> + Bs = check_bytes_list(Bytess, Bytess), + notify(Log, {balog, Bs}). + +-type close_error_rsn() ::'no_such_log' | 'nonode' + | {'file_error', file:filename(), file_error()}. + +-spec close(Log :: log()) -> 'ok' | {'error', close_error_rsn()}. +close(Log) -> + req(Log, close). + +-type lclose_error_rsn() :: 'no_such_log' + | {'file_error', file:filename(), file_error()}. + +-spec lclose(Log :: log()) -> 'ok' | {'error', lclose_error_rsn()}. +lclose(Log) -> + lclose(Log, node()). + +-spec lclose(Log :: log(), Node :: node()) -> 'ok' | {'error', lclose_error_rsn()}. +lclose(Log, Node) -> + lreq(Log, close, Node). + +-type trunc_error_rsn() :: 'no_such_log' | 'nonode' + | {'read_only_mode', log()} + | {'blocked_log', log()} + | {'invalid_header', invalid_header()} + | {'file_error', file:filename(), file_error()}. + +-spec truncate(Log :: log()) -> 'ok' | {'error', trunc_error_rsn()}. +truncate(Log) -> + req(Log, {truncate, none, truncate, 1}). + +-spec truncate(Log :: log(), Head :: term()) -> 'ok' | {'error', trunc_error_rsn()}. 
+truncate(Log, Head) -> + req(Log, {truncate, {ok, term_to_binary(Head)}, truncate, 2}). + +-spec btruncate(Log :: log(), Head :: bytes()) -> 'ok' | {'error', trunc_error_rsn()}. +btruncate(Log, Head) -> + req(Log, {truncate, {ok, check_bytes(Head)}, btruncate, 2}). + +-spec reopen(Log :: log(), Filename :: file:filename()) -> 'ok' | {'error', term()}. +reopen(Log, NewFile) -> + req(Log, {reopen, NewFile, none, reopen, 2}). + +-spec reopen(Log :: log(), Filename :: file:filename(), Head :: term()) -> + 'ok' | {'error', term()}. +reopen(Log, NewFile, NewHead) -> + req(Log, {reopen, NewFile, {ok, term_to_binary(NewHead)}, reopen, 3}). + +-spec breopen(Log :: log(), Filename :: file:filename(), Head :: bytes()) -> + 'ok' | {'error', term()}. +breopen(Log, NewFile, NewHead) -> + req(Log, {reopen, NewFile, {ok, check_bytes(NewHead)}, breopen, 3}). + +-type inc_wrap_error_rsn() :: 'no_such_log' | 'nonode' + | {'read_only_mode', log()} + | {'blocked_log', log()} | {'halt_log', log()} + | {'invalid_header', invalid_header()} + | {'file_error', file:filename(), file_error()}. + +-spec inc_wrap_file(Log :: log()) -> 'ok' | {'error', inc_wrap_error_rsn()}. +inc_wrap_file(Log) -> + req(Log, inc_wrap_file). + +-spec change_size(Log :: log(), Size :: dlog_size()) -> 'ok' | {'error', term()}. +change_size(Log, NewSize) -> + req(Log, {change_size, NewSize}). + +-spec change_notify(Log :: log(), Pid :: pid(), Notify :: boolean()) -> + 'ok' | {'error', term()}. +change_notify(Log, Pid, NewNotify) -> + req(Log, {change_notify, Pid, NewNotify}). + +-spec change_header(Log :: log(), Head :: {atom(), term()}) -> + 'ok' | {'error', term()}. +change_header(Log, NewHead) -> + req(Log, {change_header, NewHead}). + +-type sync_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} + | {'blocked_log', log()} + | {'file_error', file:filename(), file_error()}. + +-spec sync(Log :: log()) -> 'ok' | {'error', sync_error_rsn()}. +sync(Log) -> + req(Log, sync). + +-type block_error_rsn() :: 'no_such_log' | 'nonode' | {'blocked_log', log()}. + +-spec block(Log :: log()) -> 'ok' | {'error', block_error_rsn()}. +block(Log) -> + block(Log, true). + +-spec block(Log :: log(), QueueLogRecords :: boolean()) -> 'ok' | {'error', term()}. +block(Log, QueueLogRecords) -> + req(Log, {block, QueueLogRecords}). + +-type unblock_error_rsn() :: 'no_such_log' | 'nonode' + | {'not_blocked', log()} + | {'not_blocked_by_pid', log()}. + +-spec unblock(Log :: log()) -> 'ok' | {'error', unblock_error_rsn()}. +unblock(Log) -> + req(Log, unblock). + +-spec format_error(Error :: term()) -> string(). +format_error(Error) -> + do_format_error(Error). + +-spec info(Log :: log()) -> [{atom(), any()}] | {'error', term()}. +info(Log) -> + sreq(Log, info). + +-spec pid2name(Pid :: pid()) -> {'ok', log()} | 'undefined'. +pid2name(Pid) -> + disk_log_server:start(), + case ets:lookup(?DISK_LOG_PID_TABLE, Pid) of + [] -> undefined; + [{_Pid, Log}] -> {ok, Log} + end. + +%% This function Takes 3 args, a Log, a Continuation and N. +%% It retuns a {Cont2, ObjList} | eof | {error, Reason} +%% The initial continuation is the atom 'start' + +-spec chunk(Log :: log(), Cont :: any()) -> + {'error', term()} | 'eof' | {any(), [any()]} | {any(), [any()], integer()}. +chunk(Log, Cont) -> + chunk(Log, Cont, infinity). + +-spec chunk(Log :: log(), Cont :: any(), N :: pos_integer() | 'infinity') -> + {'error', term()} | 'eof' | {any(), [any()]} | {any(), [any()], integer()}. 
+chunk(Log, Cont, infinity) -> + %% There cannot be more than ?MAX_CHUNK_SIZE terms in a chunk. + ichunk(Log, Cont, ?MAX_CHUNK_SIZE); +chunk(Log, Cont, N) when is_integer(N), N > 0 -> + ichunk(Log, Cont, N). + +ichunk(Log, start, N) -> + R = sreq(Log, {chunk, 0, [], N}), + ichunk_end(R, Log); +ichunk(Log, More, N) when is_record(More, continuation) -> + R = req2(More#continuation.pid, + {chunk, More#continuation.pos, More#continuation.b, N}), + ichunk_end(R, Log); +ichunk(_Log, _, _) -> + {error, {badarg, continuation}}. + +ichunk_end({C, R}, Log) when is_record(C, continuation) -> + ichunk_end(R, read_write, Log, C, 0); +ichunk_end({C, R, Bad}, Log) when is_record(C, continuation) -> + ichunk_end(R, read_only, Log, C, Bad); +ichunk_end(R, _Log) -> + R. + +%% Create the terms on the client's heap, not the server's. +%% The list of binaries is reversed. +ichunk_end(R, Mode, Log, C, Bad) -> + case catch bins2terms(R, []) of + {'EXIT', _} -> + RR = lists:reverse(R), + ichunk_bad_end(RR, Mode, Log, C, Bad, []); + Ts when Bad > 0 -> + {C, Ts, Bad}; + Ts when Bad =:= 0 -> + {C, Ts} + end. + +bins2terms([], L) -> + L; +bins2terms([B | Bs], L) -> + bins2terms(Bs, [binary_to_term(B) | L]). + +ichunk_bad_end([B | Bs], Mode, Log, C, Bad, A) -> + case catch binary_to_term(B) of + {'EXIT', _} when read_write =:= Mode -> + InfoList = info(Log), + {value, {file, FileName}} = lists:keysearch(file, 1, InfoList), + File = case C#continuation.pos of + Pos when is_integer(Pos) -> FileName; % halt log + {FileNo, _} -> add_ext(FileName, FileNo) % wrap log + end, + {error, {corrupt_log_file, File}}; + {'EXIT', _} when read_only =:= Mode -> + Reread = lists:foldl(fun(Bin, Sz) -> + Sz + byte_size(Bin) + ?HEADERSZ + end, 0, Bs), + NewPos = case C#continuation.pos of + Pos when is_integer(Pos) -> Pos-Reread; + {FileNo, Pos} -> {FileNo, Pos-Reread} + end, + NewBad = Bad + byte_size(B), + {C#continuation{pos = NewPos, b = []}, lists:reverse(A), NewBad}; + T -> + ichunk_bad_end(Bs, Mode, Log, C, Bad, [T | A]) + end. + +-spec bchunk(Log :: log(), Cont :: any()) -> + {'error', any()} | 'eof' | {any(), [binary()]} | {any(), [binary()], integer()}. +bchunk(Log, Cont) -> + bchunk(Log, Cont, infinity). + +-spec bchunk(Log :: log(), Cont :: any(), N :: 'infinity' | pos_integer()) -> + {'error', any()} | 'eof' | {any(), [binary()]} | {any(), [binary()], integer()}. +bchunk(Log, Cont, infinity) -> + %% There cannot be more than ?MAX_CHUNK_SIZE terms in a chunk. + bichunk(Log, Cont, ?MAX_CHUNK_SIZE); +bchunk(Log, Cont, N) when is_integer(N), N > 0 -> + bichunk(Log, Cont, N). + +bichunk(Log, start, N) -> + R = sreq(Log, {chunk, 0, [], N}), + bichunk_end(R); +bichunk(_Log, #continuation{pid = Pid, pos = Pos, b = B}, N) -> + R = req2(Pid, {chunk, Pos, B, N}), + bichunk_end(R); +bichunk(_Log, _, _) -> + {error, {badarg, continuation}}. + +bichunk_end({C = #continuation{}, R}) -> + {C, lists:reverse(R)}; +bichunk_end({C = #continuation{}, R, Bad}) -> + {C, lists:reverse(R), Bad}; +bichunk_end(R) -> + R. + +-spec chunk_step(Log :: log(), Cont :: any(), N :: integer()) -> + {'ok', any()} | {'error', term()}. +chunk_step(Log, Cont, N) when is_integer(N) -> + ichunk_step(Log, Cont, N). + +ichunk_step(Log, start, N) -> + sreq(Log, {chunk_step, 0, N}); +ichunk_step(_Log, More, N) when is_record(More, continuation) -> + req2(More#continuation.pid, {chunk_step, More#continuation.pos, N}); +ichunk_step(_Log, _, _) -> + {error, {badarg, continuation}}. + +-spec chunk_info(More :: any()) -> + [{'node', node()},...] 
| {'error', {'no_continuation', any()}}. +chunk_info(More = #continuation{}) -> + [{node, node(More#continuation.pid)}]; +chunk_info(BadCont) -> + {error, {no_continuation, BadCont}}. + +-spec accessible_logs() -> {[_], [_]}. +accessible_logs() -> + disk_log_server:accessible_logs(). + +istart_link(Server) -> + {ok, proc_lib:spawn_link(disk_log, init, [self(), Server])}. + +%% Only for backwards compatibility, could probably be removed. +-spec start() -> 'ok'. +start() -> + disk_log_server:start(). + +internal_open(Pid, A) -> + req2(Pid, {internal_open, A}). + +%%% ll_open() and ll_close() are used by disk_log_h.erl, a module not +%%% (yet) in Erlang/OTP. + +%% -spec ll_open(dlog_options()) -> {'ok', Res :: _, #log{}, Cnt :: _} | Error. +ll_open(A) -> + case check_arg(A, #arg{options = A}) of + {ok, L} -> do_open(L); + Error -> Error + end. + +%% -> closed | throw(Error) +ll_close(Log) -> + close_disk_log2(Log). + +check_arg([], Res) -> + Ret = case Res#arg.head of + none -> + {ok, Res}; + _ -> + case check_head(Res#arg.head, Res#arg.format) of + {ok, Head} -> + {ok, Res#arg{head = Head}}; + Error -> + Error + end + end, + + if %% check result + Res#arg.name =:= 0 -> + {error, {badarg, name}}; + Res#arg.file =:= none -> + case catch lists:concat([Res#arg.name, ".LOG"]) of + {'EXIT',_} -> {error, {badarg, file}}; + FName -> check_arg([], Res#arg{file = FName}) + end; + Res#arg.repair =:= truncate, Res#arg.mode =:= read_only -> + {error, {badarg, repair_read_only}}; + Res#arg.type =:= halt, is_tuple(Res#arg.size) -> + {error, {badarg, size}}; + Res#arg.type =:= wrap -> + {OldSize, Version} = + disk_log_1:read_size_file_version(Res#arg.file), + check_wrap_arg(Ret, OldSize, Version); + true -> + Ret + end; +check_arg([{file, F} | Tail], Res) when is_list(F) -> + check_arg(Tail, Res#arg{file = F}); +check_arg([{file, F} | Tail], Res) when is_atom(F) -> + check_arg(Tail, Res#arg{file = F}); +check_arg([{linkto, Pid} |Tail], Res) when is_pid(Pid) -> + check_arg(Tail, Res#arg{linkto = Pid}); +check_arg([{linkto, none} |Tail], Res) -> + check_arg(Tail, Res#arg{linkto = none}); +check_arg([{name, Name}|Tail], Res) -> + check_arg(Tail, Res#arg{name = Name}); +check_arg([{repair, true}|Tail], Res) -> + check_arg(Tail, Res#arg{repair = true}); +check_arg([{repair, false}|Tail], Res) -> + check_arg(Tail, Res#arg{repair = false}); +check_arg([{repair, truncate}|Tail], Res) -> + check_arg(Tail, Res#arg{repair = truncate}); +check_arg([{size, Int}|Tail], Res) when is_integer(Int), Int > 0 -> + check_arg(Tail, Res#arg{size = Int}); +check_arg([{size, infinity}|Tail], Res) -> + check_arg(Tail, Res#arg{size = infinity}); +check_arg([{size, {MaxB,MaxF}}|Tail], Res) when is_integer(MaxB), + is_integer(MaxF), + MaxB > 0, MaxB =< ?MAX_BYTES, + MaxF > 0, MaxF < ?MAX_FILES -> + check_arg(Tail, Res#arg{size = {MaxB, MaxF}}); +check_arg([{type, wrap}|Tail], Res) -> + check_arg(Tail, Res#arg{type = wrap}); +check_arg([{type, halt}|Tail], Res) -> + check_arg(Tail, Res#arg{type = halt}); +check_arg([{format, internal}|Tail], Res) -> + check_arg(Tail, Res#arg{format = internal}); +check_arg([{format, external}|Tail], Res) -> + check_arg(Tail, Res#arg{format = external}); +check_arg([{distributed, []}|Tail], Res) -> + check_arg(Tail, Res#arg{distributed = false}); +check_arg([{distributed, Nodes}|Tail], Res) when is_list(Nodes) -> + check_arg(Tail, Res#arg{distributed = {true, Nodes}}); +check_arg([{notify, true}|Tail], Res) -> + check_arg(Tail, Res#arg{notify = true}); +check_arg([{notify, false}|Tail], Res) -> + 
check_arg(Tail, Res#arg{notify = false}); +check_arg([{head_func, HeadFunc}|Tail], Res) -> + check_arg(Tail, Res#arg{head = {head_func, HeadFunc}}); +check_arg([{head, Term}|Tail], Res) -> + check_arg(Tail, Res#arg{head = {head, Term}}); +check_arg([{mode, read_only}|Tail], Res) -> + check_arg(Tail, Res#arg{mode = read_only}); +check_arg([{mode, read_write}|Tail], Res) -> + check_arg(Tail, Res#arg{mode = read_write}); +check_arg(Arg, _) -> + {error, {badarg, Arg}}. + +check_wrap_arg({ok, Res}, {0,0}, _Version) when Res#arg.size =:= infinity -> + {error, {badarg, size}}; +check_wrap_arg({ok, Res}, OldSize, Version) when Res#arg.size =:= infinity -> + NewRes = Res#arg{size = OldSize}, + check_wrap_arg({ok, NewRes}, OldSize, Version); +check_wrap_arg({ok, Res}, {0,0}, Version) -> + {ok, Res#arg{version = Version}}; +check_wrap_arg({ok, Res}, OldSize, Version) when OldSize =:= Res#arg.size -> + {ok, Res#arg{version = Version}}; +check_wrap_arg({ok, Res}, _OldSize, Version) when Res#arg.repair =:= truncate, + is_tuple(Res#arg.size) -> + {ok, Res#arg{version = Version}}; +check_wrap_arg({ok, Res}, OldSize, _Version) when is_tuple(Res#arg.size) -> + {error, {size_mismatch, OldSize, Res#arg.size}}; +check_wrap_arg({ok, _Res}, _OldSize, _Version) -> + {error, {badarg, size}}; +check_wrap_arg(Ret, _OldSize, _Version) -> + Ret. + +%%%----------------------------------------------------------------- +%%% Server functions +%%%----------------------------------------------------------------- +init(Parent, Server) -> + ?PROFILE(ep:do()), + process_flag(trap_exit, true), + loop(#state{parent = Parent, server = Server}). + +loop(State) when State#state.messages =:= [] -> + receive + Message -> + handle(Message, State) + end; +loop(State) -> + [M | Ms] = State#state.messages, + handle(M, State#state{messages = Ms}). 
+ +handle({From, write_cache}, S) when From =:= self() -> + case catch do_write_cache(get(log)) of + ok -> + loop(S); + Error -> + loop(S#state{cache_error = Error}) + end; +handle({From, {log, B}}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + reply(From, {error, {read_only_mode, L#log.name}}, S); + L when L#log.status =:= ok, L#log.format =:= internal -> + log_loop(S, From, [B], []); + L when L#log.status =:= ok, L#log.format =:= external -> + reply(From, {error, {format_external, L#log.name}}, S); + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, {log, B}} | S#state.queue]}) + end; +handle({From, {blog, B}}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + reply(From, {error, {read_only_mode, L#log.name}}, S); + L when L#log.status =:= ok -> + log_loop(S, From, [B], []); + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, {blog, B}} | S#state.queue]}) + end; +handle({alog, B}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + notify_owners({read_only,B}), + loop(S); + L when L#log.status =:= ok, L#log.format =:= internal -> + log_loop(S, [], [B], []); + L when L#log.status =:= ok -> + notify_owners({format_external, B}), + loop(S); + L when L#log.status =:= {blocked, false} -> + notify_owners({blocked_log, B}), + loop(S); + _ -> + loop(S#state{queue = [{alog, B} | S#state.queue]}) + end; +handle({balog, B}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + notify_owners({read_only,B}), + loop(S); + L when L#log.status =:= ok -> + log_loop(S, [], [B], []); + L when L#log.status =:= {blocked, false} -> + notify_owners({blocked_log, B}), + loop(S); + _ -> + loop(S#state{queue = [{balog, B} | S#state.queue]}) + end; +handle({From, {block, QueueLogRecs}}, S) -> + case get(log) of + L when L#log.status =:= ok -> + do_block(From, QueueLogRecs, L), + reply(From, ok, S); + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, {block, QueueLogRecs}} | + S#state.queue]}) + end; +handle({From, unblock}, S) -> + case get(log) of + L when L#log.status =:= ok -> + reply(From, {error, {not_blocked, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + S2 = do_unblock(L, S), + reply(From, ok, S2); + L -> + reply(From, {error, {not_blocked_by_pid, L#log.name}}, S) + end; +handle({From, sync}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + reply(From, {error, {read_only_mode, L#log.name}}, S); + L when L#log.status =:= ok -> + sync_loop([From], S); + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, sync} | S#state.queue]}) + end; +handle({From, {truncate, Head, F, A}}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + reply(From, {error, {read_only_mode, L#log.name}}, S); + L when L#log.status =:= ok, S#state.cache_error =/= ok -> + loop(cache_error(S, [From])); + L when L#log.status =:= ok -> + H = 
merge_head(Head, L#log.head), + case catch do_trunc(L, H) of + ok -> + erase(is_full), + notify_owners({truncated, S#state.cnt}), + N = if Head =:= none -> 0; true -> 1 end, + reply(From, ok, (state_ok(S))#state{cnt = N}); + Error -> + do_exit(S, From, Error, ?failure(Error, F, A)) + end; + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, {truncate, Head, F, A}} + | S#state.queue]}) + end; +handle({From, {chunk, Pos, B, N}}, S) -> + case get(log) of + L when L#log.status =:= ok, S#state.cache_error =/= ok -> + loop(cache_error(S, [From])); + L when L#log.status =:= ok -> + R = do_chunk(L, Pos, B, N), + reply(From, R, S); + L when L#log.blocked_by =:= From -> + R = do_chunk(L, Pos, B, N), + reply(From, R, S); + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _L -> + loop(S#state{queue = [{From, {chunk, Pos, B, N}} | S#state.queue]}) + end; +handle({From, {chunk_step, Pos, N}}, S) -> + case get(log) of + L when L#log.status =:= ok, S#state.cache_error =/= ok -> + loop(cache_error(S, [From])); + L when L#log.status =:= ok -> + R = do_chunk_step(L, Pos, N), + reply(From, R, S); + L when L#log.blocked_by =:= From -> + R = do_chunk_step(L, Pos, N), + reply(From, R, S); + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, {chunk_step, Pos, N}} + | S#state.queue]}) + end; +handle({From, {change_notify, Pid, NewNotify}}, S) -> + case get(log) of + L when L#log.status =:= ok -> + case do_change_notify(L, Pid, NewNotify) of + {ok, L1} -> + put(log, L1), + reply(From, ok, S); + Error -> + reply(From, Error, S) + end; + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, {change_notify, Pid, NewNotify}} + | S#state.queue]}) + end; +handle({From, {change_header, NewHead}}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + reply(From, {error, {read_only_mode, L#log.name}}, S); + L when L#log.status =:= ok -> + case check_head(NewHead, L#log.format) of + {ok, Head} -> + put(log, L#log{head = mk_head(Head, L#log.format)}), + reply(From, ok, S); + Error -> + reply(From, Error, S) + end; + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, {change_header, NewHead}} + | S#state.queue]}) + end; +handle({From, {change_size, NewSize}}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + reply(From, {error, {read_only_mode, L#log.name}}, S); + L when L#log.status =:= ok -> + case check_size(L#log.type, NewSize) of + ok -> + case catch do_change_size(L, NewSize) of % does the put + ok -> + reply(From, ok, S); + {big, CurSize} -> + reply(From, + {error, + {new_size_too_small, L#log.name, CurSize}}, + S); + Else -> + reply(From, Else, state_err(S, Else)) + end; + not_ok -> + reply(From, {error, {badarg, size}}, S) + end; + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + 
loop(S#state{queue = [{From, {change_size, NewSize}} + | S#state.queue]}) + end; +handle({From, inc_wrap_file}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + reply(From, {error, {read_only_mode, L#log.name}}, S); + L when L#log.type =:= halt -> + reply(From, {error, {halt_log, L#log.name}}, S); + L when L#log.status =:= ok, S#state.cache_error =/= ok -> + loop(cache_error(S, [From])); + L when L#log.status =:= ok -> + case catch do_inc_wrap_file(L) of + {ok, L2, Lost} -> + put(log, L2), + notify_owners({wrap, Lost}), + reply(From, ok, S#state{cnt = S#state.cnt-Lost}); + {error, Error, L2} -> + put(log, L2), + reply(From, Error, state_err(S, Error)) + end; + L when L#log.status =:= {blocked, false} -> + reply(From, {error, {blocked_log, L#log.name}}, S); + L when L#log.blocked_by =:= From -> + reply(From, {error, {blocked_log, L#log.name}}, S); + _ -> + loop(S#state{queue = [{From, inc_wrap_file} | S#state.queue]}) + end; +handle({From, {reopen, NewFile, Head, F, A}}, S) -> + case get(log) of + L when L#log.mode =:= read_only -> + reply(From, {error, {read_only_mode, L#log.name}}, S); + L when L#log.status =:= ok, S#state.cache_error =/= ok -> + loop(cache_error(S, [From])); + L when L#log.status =:= ok, L#log.filename =/= NewFile -> + case catch close_disk_log2(L) of + closed -> + File = L#log.filename, + case catch rename_file(File, NewFile, L#log.type) of + ok -> + H = merge_head(Head, L#log.head), + case do_open((S#state.args)#arg{name = L#log.name, + repair = truncate, + head = H, + file = File}) of + {ok, Res, L2, Cnt} -> + put(log, L2#log{owners = L#log.owners, + head = L#log.head, + users = L#log.users}), + notify_owners({truncated, S#state.cnt}), + erase(is_full), + case Res of + {error, _} -> + do_exit(S, From, Res, + ?failure(Res, F, A)); + _ -> + reply(From, ok, S#state{cnt = Cnt}) + end; + Res -> + do_exit(S, From, Res, ?failure(Res, F, A)) + end; + Error -> + do_exit(S, From, Error, ?failure(Error, reopen, 2)) + end; + Error -> + do_exit(S, From, Error, ?failure(Error, F, A)) + end; + L when L#log.status =:= ok -> + reply(From, {error, {same_file_name, L#log.name}}, S); + L -> + reply(From, {error, {blocked_log, L#log.name}}, S) + end; +handle({Server, {internal_open, A}}, S) -> + case get(log) of + undefined -> + case do_open(A) of % does the put + {ok, Res, L, Cnt} -> + put(log, opening_pid(A#arg.linkto, A#arg.notify, L)), + reply(Server, Res, S#state{args=A, cnt=Cnt}); + Res -> + do_fast_exit(S, Server, Res) + end; + L -> + TestH = mk_head(A#arg.head, A#arg.format), + case compare_arg(A#arg.options, S#state.args, TestH, L#log.head) of + ok -> + case add_pid(A#arg.linkto, A#arg.notify, L) of + {ok, L1} -> + put(log, L1), + reply(Server, {ok, L#log.name}, S); + Error -> + reply(Server, Error, S) + end; + Error -> + reply(Server, Error, S) + end + end; +handle({From, close}, S) -> + case do_close(From, S) of + {stop, S1} -> + do_exit(S1, From, ok, normal); + {continue, S1} -> + reply(From, ok, S1) + end; +handle({From, info}, S) -> + reply(From, do_info(get(log), S#state.cnt), S); +handle({'EXIT', From, Reason}, S) when From =:= S#state.parent -> + %% Parent orders shutdown. + _ = do_stop(S), + exit(Reason); +handle({'EXIT', From, Reason}, S) when From =:= S#state.server -> + %% The server is gone. 
+ _ = do_stop(S), + exit(Reason); +handle({'EXIT', From, _Reason}, S) -> + L = get(log), + case is_owner(From, L) of + {true, _Notify} -> + case close_owner(From, L, S) of + {stop, S1} -> + _ = do_stop(S1), + exit(normal); + {continue, S1} -> + loop(S1) + end; + false -> + %% 'users' is not decremented. + S1 = do_unblock(From, get(log), S), + loop(S1) + end; +handle({system, From, Req}, S) -> + sys:handle_system_msg(Req, From, S#state.parent, ?MODULE, [], S); +handle(_, S) -> + loop(S). + +sync_loop(From, S) -> + log_loop(S, [], [], From). + +%% Inlined. +log_loop(S, Pids, _Bins, _Sync) when S#state.cache_error =/= ok -> + loop(cache_error(S, Pids)); +log_loop(S, Pids, Bins, Sync) when S#state.messages =:= [] -> + receive + Message -> + log_loop(Message, Pids, Bins, Sync, S, get(log)) + after 0 -> + loop(log_end(S, Pids, Bins, Sync)) + end; +log_loop(S, Pids, Bins, Sync) -> + [M | Ms] = S#state.messages, + S1 = S#state{messages = Ms}, + log_loop(M, Pids, Bins, Sync, S1, get(log)). + +%% Items logged after the last sync request found are sync:ed as well. +log_loop({alog,B}, Pids, Bins, Sync, S, L) when L#log.format =:= internal -> + %% {alog, _} allowed for the internal format only. + log_loop(S, Pids, [B | Bins], Sync); +log_loop({balog, B}, Pids, Bins, Sync, S, _L) -> + log_loop(S, Pids, [B | Bins], Sync); +log_loop({From, {log, B}}, Pids, Bins, Sync, S, L) + when L#log.format =:= internal -> + %% {log, _} allowed for the internal format only. + log_loop(S, [From | Pids], [B | Bins], Sync); +log_loop({From, {blog, B}}, Pids, Bins, Sync, S, _L) -> + log_loop(S, [From | Pids], [B | Bins], Sync); +log_loop({From, sync}, Pids, Bins, Sync, S, _L) -> + log_loop(S, Pids, Bins, [From | Sync]); +log_loop(Message, Pids, Bins, Sync, S, _L) -> + NS = log_end(S, Pids, Bins, Sync), + handle(Message, NS). + +log_end(S, [], [], Sync) -> + log_end_sync(S, Sync); +log_end(S, Pids, Bins, Sync) -> + case do_log(get(log), rflat(Bins)) of + N when is_integer(N) -> + replies(Pids, ok), + S1 = (state_ok(S))#state{cnt = S#state.cnt+N}, + log_end_sync(S1, Sync); + {error, {error, {full, _Name}}, N} when Pids =:= [] -> + log_end_sync(state_ok(S#state{cnt = S#state.cnt + N}), Sync); + {error, Error, N} -> + replies(Pids, Error), + state_err(S#state{cnt = S#state.cnt + N}, Error) + end. + +%% Inlined. +log_end_sync(S, []) -> + S; +log_end_sync(S, Sync) -> + Res = do_sync(get(log)), + replies(Sync, Res), + state_err(S, Res). + +%% Inlined. +rflat([B]=L) when is_binary(B) -> L; +rflat([B]) -> B; +rflat(B) -> rflat(B, []). + +rflat([B | Bs], L) when is_binary(B) -> + rflat(Bs, [B | L]); +rflat([B | Bs], L) -> + rflat(Bs, B ++ L); +rflat([], L) -> L. + +%% -> {ok, Log} | {error, Error} +do_change_notify(L, Pid, Notify) -> + case is_owner(Pid, L) of + {true, Notify} -> + {ok, L}; + {true, _OldNotify} when Notify =/= true, Notify =/= false -> + {error, {badarg, notify}}; + {true, _OldNotify} -> + Owners = lists:keydelete(Pid, 1, L#log.owners), + L1 = L#log{owners = [{Pid, Notify} | Owners]}, + {ok, L1}; + false -> + {error, {not_owner, Pid}} + end. + +%% -> {stop, S} | {continue, S} +do_close(Pid, S) -> + L = get(log), + case is_owner(Pid, L) of + {true, _Notify} -> + close_owner(Pid, L, S); + false -> + close_user(Pid, L, S) + end. + +%% -> {stop, S} | {continue, S} +close_owner(Pid, L, S) -> + L1 = L#log{owners = lists:keydelete(Pid, 1, L#log.owners)}, + put(log, L1), + S2 = do_unblock(Pid, get(log), S), + unlink(Pid), + do_close2(L1, S2). 
+ +%% -> {stop, S} | {continue, S} +close_user(Pid, L, S) when L#log.users > 0 -> + L1 = L#log{users = L#log.users - 1}, + put(log, L1), + S2 = do_unblock(Pid, get(log), S), + do_close2(L1, S2); +close_user(_Pid, _L, S) -> + {continue, S}. + +do_close2(L, S) when L#log.users =:= 0, L#log.owners =:= [] -> + {stop, S}; +do_close2(_L, S) -> + {continue, S}. + +%%----------------------------------------------------------------- +%% Callback functions for system messages handling. +%%----------------------------------------------------------------- +system_continue(_Parent, _, State) -> + loop(State). + +-spec system_terminate(_, _, _, #state{}) -> no_return(). +system_terminate(Reason, _Parent, _, State) -> + _ = do_stop(State), + exit(Reason). + +%%----------------------------------------------------------------- +%% Temporay code for upgrade. +%%----------------------------------------------------------------- +system_code_change(State, _Module, _OldVsn, _Extra) -> + {ok, State}. + + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- +-spec do_exit(#state{}, pid(), _, _) -> no_return(). +do_exit(S, From, Message0, Reason) -> + R = do_stop(S), + Message = case S#state.cache_error of + Err when Err =/= ok -> Err; + _ when R =:= closed -> Message0; + _ when Message0 =:= ok -> R; + _ -> Message0 + end, + _ = disk_log_server:close(self()), + replies(From, Message), + ?PROFILE(ep:done()), + exit(Reason). + +-spec do_fast_exit(#state{}, pid(), _) -> no_return(). +do_fast_exit(S, Server, Message) -> + _ = do_stop(S), + Server ! {disk_log, self(), Message}, + exit(normal). + +%% -> closed | Error +do_stop(S) -> + proc_q(S#state.queue ++ S#state.messages), + close_disk_log(get(log)). + +proc_q([{From, _R}|Tail]) when is_pid(From) -> + From ! {disk_log, self(), {error, disk_log_stopped}}, + proc_q(Tail); +proc_q([_|T]) -> %% async stuff + proc_q(T); +proc_q([]) -> + ok. + +%% -> log() +opening_pid(Pid, Notify, L) -> + {ok, L1} = add_pid(Pid, Notify, L), + L1. + +%% -> {ok, log()} | Error +add_pid(Pid, Notify, L) when is_pid(Pid) -> + case is_owner(Pid, L) of + false -> + link(Pid), + {ok, L#log{owners = [{Pid, Notify} | L#log.owners]}}; + {true, Notify} -> +%% {error, {pid_already_connected, L#log.name}}; + {ok, L}; + {true, CurNotify} when Notify =/= CurNotify -> + {error, {arg_mismatch, notify, CurNotify, Notify}} + end; +add_pid(_NotAPid, _Notify, L) -> + {ok, L#log{users = L#log.users + 1}}. + +unblock_pid(L) when L#log.blocked_by =:= none -> + ok; +unblock_pid(L) -> + case is_owner(L#log.blocked_by, L) of + {true, _Notify} -> + ok; + false -> + unlink(L#log.blocked_by) + end. + +%% -> true | false +is_owner(Pid, L) -> + case lists:keysearch(Pid, 1, L#log.owners) of + {value, {_Pid, Notify}} -> + {true, Notify}; + false -> + false + end. + +%% ok | throw(Error) +rename_file(File, NewFile, halt) -> + file:rename(File, NewFile); +rename_file(File, NewFile, wrap) -> + rename_file(wrap_file_extensions(File), File, NewFile, ok). + +rename_file([Ext|Exts], File, NewFile, Res) -> + NRes = case file:rename(add_ext(File, Ext), add_ext(NewFile, Ext)) of + ok -> + Res; + Else -> + Else + end, + rename_file(Exts, File, NewFile, NRes); +rename_file([], _File, _NewFiles, Res) -> Res. + +%% "Old" error messages have been kept, arg_mismatch has been added. 
+%%-spec compare_arg(dlog_options(), #arg{}, +compare_arg([], _A, none, _OrigHead) -> + % no header option given + ok; +compare_arg([], _A, Head, OrigHead) when Head =/= OrigHead -> + {error, {arg_mismatch, head, OrigHead, Head}}; +compare_arg([], _A, _Head, _OrigHead) -> + ok; +compare_arg([{Attr, Val} | Tail], A, Head, OrigHead) -> + case compare_arg(Attr, Val, A) of + {not_ok, OrigVal} -> + {error, {arg_mismatch, Attr, OrigVal, Val}}; + ok -> + compare_arg(Tail, A, Head, OrigHead); + Error -> + Error + end. + +-spec compare_arg(atom(), _, #arg{}) -> + 'ok' | {'not_ok', _} | {'error', {atom(), _}}. +compare_arg(file, F, A) when F =/= A#arg.file -> + {error, {name_already_open, A#arg.name}}; +compare_arg(mode, read_only, A) when A#arg.mode =:= read_write -> + {error, {open_read_write, A#arg.name}}; +compare_arg(mode, read_write, A) when A#arg.mode =:= read_only -> + {error, {open_read_only, A#arg.name}}; +compare_arg(type, T, A) when T =/= A#arg.type -> + {not_ok, A#arg.type}; +compare_arg(format, F, A) when F =/= A#arg.format -> + {not_ok, A#arg.format}; +compare_arg(repair, R, A) when R =/= A#arg.repair -> + %% not used, but check it anyway... + {not_ok, A#arg.repair}; +compare_arg(_Attr, _Val, _A) -> + ok. + +%% -> {ok, Res, log(), Cnt} | Error +do_open(A) -> + L = #log{name = A#arg.name, + filename = A#arg.file, + size = A#arg.size, + head = mk_head(A#arg.head, A#arg.format), + mode = A#arg.mode, + version = A#arg.version}, + do_open2(L, A). + +mk_head({head, Term}, internal) -> {ok, term_to_binary(Term)}; +mk_head({head, Bytes}, external) -> {ok, check_bytes(Bytes)}; +mk_head(H, _) -> H. + +terms2bins([T | Ts]) -> + [term_to_binary(T) | terms2bins(Ts)]; +terms2bins([]) -> + []. + +check_bytes_list([B | Bs], Bs0) when is_binary(B) -> + check_bytes_list(Bs, Bs0); +check_bytes_list([], Bs0) -> + Bs0; +check_bytes_list(_, Bs0) -> + check_bytes_list(Bs0). + +check_bytes_list([B | Bs]) when is_binary(B) -> + [B | check_bytes_list(Bs)]; +check_bytes_list([B | Bs]) -> + [list_to_binary(B) | check_bytes_list(Bs)]; +check_bytes_list([]) -> + []. + +check_bytes(Binary) when is_binary(Binary) -> + Binary; +check_bytes(Bytes) -> + list_to_binary(Bytes). + +%%----------------------------------------------------------------- +%% Change size of the logs in runtime. +%%----------------------------------------------------------------- +%% -> ok | {big, CurSize} | throw(Error) +do_change_size(L, NewSize) when L#log.type =:= halt -> + Halt = L#log.extra, + CurB = Halt#halt.curB, + NewLog = L#log{extra = Halt#halt{size = NewSize}}, + if + NewSize =:= infinity -> + erase(is_full), + put(log, NewLog), + ok; + CurB =< NewSize -> + erase(is_full), + put(log, NewLog), + ok; + true -> + {big, CurB} + end; +do_change_size(L, NewSize) when L#log.type =:= wrap -> + #log{extra = Extra, version = Version} = L, + {ok, Handle} = disk_log_1:change_size_wrap(Extra, NewSize, Version), + erase(is_full), + put(log, L#log{extra = Handle}), + ok. + +%% -> {ok, Head} | Error; Head = none | {head, H} | {M,F,A} +check_head({head, none}, _Format) -> + {ok, none}; +check_head({head_func, {M, F, A}}, _Format) when is_atom(M), + is_atom(F), + is_list(A) -> + {ok, {M, F, A}}; +check_head({head, Head}, external) -> + case catch check_bytes(Head) of + {'EXIT', _} -> + {error, {badarg, head}}; + _ -> + {ok, {head, Head}} + end; +check_head({head, Term}, internal) -> + {ok, {head, Term}}; +check_head(_Head, _Format) -> + {error, {badarg, head}}. 
+ +check_size(wrap, {NewMaxB,NewMaxF}) when + is_integer(NewMaxB), is_integer(NewMaxF), + NewMaxB > 0, NewMaxB =< ?MAX_BYTES, NewMaxF > 0, NewMaxF < ?MAX_FILES -> + ok; +check_size(halt, NewSize) when is_integer(NewSize), NewSize > 0 -> + ok; +check_size(halt, infinity) -> + ok; +check_size(_, _) -> + not_ok. + +%%----------------------------------------------------------------- +%% Increment a wrap log. +%%----------------------------------------------------------------- +%% -> {ok, log(), Lost} | {error, Error, log()} +do_inc_wrap_file(L) -> + #log{format = Format, extra = Handle} = L, + case Format of + internal -> + case disk_log_1:mf_int_inc(Handle, L#log.head) of + {ok, Handle2, Lost} -> + {ok, L#log{extra = Handle2}, Lost}; + {error, Error, Handle2} -> + {error, Error, L#log{extra = Handle2}} + end; + external -> + case disk_log_1:mf_ext_inc(Handle, L#log.head) of + {ok, Handle2, Lost} -> + {ok, L#log{extra = Handle2}, Lost}; + {error, Error, Handle2} -> + {error, Error, L#log{extra = Handle2}} + end + end. + + +%%----------------------------------------------------------------- +%% Open a log file. +%%----------------------------------------------------------------- +%% -> {ok, Reply, log(), Cnt} | Error +%% Note: the header is always written, even if the log size is too small. +do_open2(L, #arg{type = halt, format = internal, name = Name, + file = FName, repair = Repair, size = Size, mode = Mode}) -> + case catch disk_log_1:int_open(FName, Repair, Mode, L#log.head) of + {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} -> + Halt = #halt{fdc = FdC, curB = FileSize, size = Size}, + {ok, {ok, Name}, L#log{format_type = halt_int, extra = Halt}, + NoItems}; + {repaired, FdC, Rec, Bad, FileSize} -> + Halt = #halt{fdc = FdC, curB = FileSize, size = Size}, + {ok, {repaired, Name, {recovered, Rec}, {badbytes, Bad}}, + L#log{format_type = halt_int, extra = Halt}, + Rec}; + Error -> + Error + end; +do_open2(L, #arg{type = wrap, format = internal, size = {MaxB, MaxF}, + name = Name, repair = Repair, file = FName, mode = Mode, + version = V}) -> + case catch + disk_log_1:mf_int_open(FName, MaxB, MaxF, Repair, Mode, L#log.head, V) of + {ok, Handle, Cnt} -> + {ok, {ok, Name}, L#log{type = wrap, + format_type = wrap_int, + extra = Handle}, Cnt}; + {repaired, Handle, Rec, Bad, Cnt} -> + {ok, {repaired, Name, {recovered, Rec}, {badbytes, Bad}}, + L#log{type = wrap, format_type = wrap_int, extra = Handle}, Cnt}; + Error -> + Error + end; +do_open2(L, #arg{type = halt, format = external, file = FName, name = Name, + size = Size, repair = Repair, mode = Mode}) -> + case catch disk_log_1:ext_open(FName, Repair, Mode, L#log.head) of + {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} -> + Halt = #halt{fdc = FdC, curB = FileSize, size = Size}, + {ok, {ok, Name}, + L#log{format_type = halt_ext, format = external, extra = Halt}, + NoItems}; + Error -> + Error + end; +do_open2(L, #arg{type = wrap, format = external, size = {MaxB, MaxF}, + name = Name, file = FName, repair = Repair, mode = Mode, + version = V}) -> + case catch + disk_log_1:mf_ext_open(FName, MaxB, MaxF, Repair, Mode, L#log.head, V) of + {ok, Handle, Cnt} -> + {ok, {ok, Name}, L#log{type = wrap, + format_type = wrap_ext, + extra = Handle, + format = external}, Cnt}; + Error -> + Error + end. + +%% -> closed | Error +close_disk_log(undefined) -> + closed; +close_disk_log(L) -> + unblock_pid(L), + F = fun({Pid, _}) -> + unlink(Pid) + end, + lists:foreach(F, L#log.owners), + R = (catch close_disk_log2(L)), + erase(log), + R. 
+ +-spec close_disk_log2(#log{}) -> 'closed'. % | throw(Error) + +close_disk_log2(L) -> + case L of + #log{format_type = halt_int, mode = Mode, extra = Halt} -> + disk_log_1:close(Halt#halt.fdc, L#log.filename, Mode); + #log{format_type = wrap_int, mode = Mode, extra = Handle} -> + disk_log_1:mf_int_close(Handle, Mode); + #log{format_type = halt_ext, extra = Halt} -> + disk_log_1:fclose(Halt#halt.fdc, L#log.filename); + #log{format_type = wrap_ext, mode = Mode, extra = Handle} -> + disk_log_1:mf_ext_close(Handle, Mode) + end, + closed. + +do_format_error({error, Module, Error}) -> + Module:format_error(Error); +do_format_error({error, Reason}) -> + do_format_error(Reason); +do_format_error({Node, Error = {error, _Reason}}) -> + lists:append(io_lib:format("~p: ", [Node]), do_format_error(Error)); +do_format_error({badarg, Arg}) -> + io_lib:format("The argument ~p is missing, not recognized or " + "not wellformed~n", [Arg]); +do_format_error({size_mismatch, OldSize, ArgSize}) -> + io_lib:format("The given size ~p does not match the size ~p found on " + "the disk log size file~n", [ArgSize, OldSize]); +do_format_error({read_only_mode, Log}) -> + io_lib:format("The disk log ~p has been opened read-only, but the " + "requested operation needs read-write access~n", [Log]); +do_format_error({format_external, Log}) -> + io_lib:format("The requested operation can only be applied on internally " + "formatted disk logs, but ~p is externally formatted~n", + [Log]); +do_format_error({blocked_log, Log}) -> + io_lib:format("The blocked disk log ~p does not queue requests, or " + "the log has been blocked by the calling process~n", [Log]); +do_format_error({full, Log}) -> + io_lib:format("The halt log ~p is full~n", [Log]); +do_format_error({not_blocked, Log}) -> + io_lib:format("The disk log ~p is not blocked~n", [Log]); +do_format_error({not_owner, Pid}) -> + io_lib:format("The pid ~p is not an owner of the disk log~n", [Pid]); +do_format_error({not_blocked_by_pid, Log}) -> + io_lib:format("The disk log ~p is blocked, but only the blocking pid " + "can unblock a disk log~n", [Log]); +do_format_error({new_size_too_small, Log, CurrentSize}) -> + io_lib:format("The current size ~p of the halt log ~p is greater than the " + "requested new size~n", [CurrentSize, Log]); +do_format_error({halt_log, Log}) -> + io_lib:format("The halt log ~p cannot be wrapped~n", [Log]); +do_format_error({same_file_name, Log}) -> + io_lib:format("Current and new file name of the disk log ~p " + "are the same~n", [Log]); +do_format_error({arg_mismatch, Option, FirstValue, ArgValue}) -> + io_lib:format("The value ~p of the disk log option ~p does not match " + "the current value ~p~n", [ArgValue, Option, FirstValue]); +do_format_error({name_already_open, Log}) -> + io_lib:format("The disk log ~p has already opened another file~n", [Log]); +do_format_error({node_already_open, Log}) -> + io_lib:format("The distribution option of the disk log ~p does not match " + "already open log~n", [Log]); +do_format_error({open_read_write, Log}) -> + io_lib:format("The disk log ~p has already been opened read-write~n", + [Log]); +do_format_error({open_read_only, Log}) -> + io_lib:format("The disk log ~p has already been opened read-only~n", + [Log]); +do_format_error({not_internal_wrap, Log}) -> + io_lib:format("The requested operation cannot be applied since ~p is not " + "an internally formatted disk log~n", [Log]); +do_format_error(no_such_log) -> + io_lib:format("There is no disk log with the given name~n", []); +do_format_error(nonode) -> + 
io_lib:format("There seems to be no node up that can handle " + "the request~n", []); +do_format_error(nodedown) -> + io_lib:format("There seems to be no node up that can handle " + "the request~n", []); +do_format_error({corrupt_log_file, FileName}) -> + io_lib:format("The disk log file \"~s\" contains corrupt data~n", + [FileName]); +do_format_error({need_repair, FileName}) -> + io_lib:format("The disk log file \"~s\" has not been closed properly and " + "needs repair~n", [FileName]); +do_format_error({not_a_log_file, FileName}) -> + io_lib:format("The file \"~s\" is not a wrap log file~n", [FileName]); +do_format_error({invalid_header, InvalidHeader}) -> + io_lib:format("The disk log header is not wellformed: ~p~n", + [InvalidHeader]); +do_format_error(end_of_log) -> + io_lib:format("An attempt was made to step outside a not yet " + "full wrap log~n", []); +do_format_error({invalid_index_file, FileName}) -> + io_lib:format("The wrap log index file \"~s\" cannot be used~n", + [FileName]); +do_format_error({no_continuation, BadCont}) -> + io_lib:format("The term ~p is not a chunk continuation~n", [BadCont]); +do_format_error({file_error, FileName, Reason}) -> + io_lib:format("\"~s\": ~p~n", [FileName, file:format_error(Reason)]); +do_format_error(E) -> + io_lib:format("~p~n", [E]). + +do_info(L, Cnt) -> + #log{name = Name, type = Type, mode = Mode, filename = File, + extra = Extra, status = Status, owners = Owners, users = Users, + format = Format, head = Head} = L, + Size = case Type of + wrap -> + disk_log_1:get_wrap_size(Extra); + halt -> + Extra#halt.size + end, + Distribution = + case disk_log_server:get_log_pids(Name) of + {local, _Pid} -> + local; + {distributed, Pids} -> + [node(P) || P <- Pids]; + undefined -> % "cannot happen" + [] + end, + RW = case Type of + wrap when Mode =:= read_write -> + #handle{curB = CurB, curF = CurF, + cur_cnt = CurCnt, acc_cnt = AccCnt, + noFull = NoFull, accFull = AccFull} = Extra, + NewAccFull = AccFull + NoFull, + NewExtra = Extra#handle{noFull = 0, accFull = NewAccFull}, + put(log, L#log{extra = NewExtra}), + [{no_current_bytes, CurB}, + {no_current_items, CurCnt}, + {no_items, Cnt}, + {no_written_items, CurCnt + AccCnt}, + {current_file, CurF}, + {no_overflows, {NewAccFull, NoFull}} + ]; + halt when Mode =:= read_write -> + IsFull = case get(is_full) of + undefined -> false; + _ -> true + end, + [{full, IsFull}, + {no_written_items, Cnt} + ]; + _ when Mode =:= read_only -> + [] + end, + HeadL = case Mode of + read_write -> + [{head, Head}]; + read_only -> + [] + end, + Common = [{name, Name}, + {file, File}, + {type, Type}, + {format, Format}, + {size, Size}, + {items, Cnt}, % kept for "backward compatibility" (undocumented) + {owners, Owners}, + {users, Users}] ++ + HeadL ++ + [{mode, Mode}, + {status, Status}, + {node, node()}, + {distributed, Distribution} + ], + Common ++ RW. + +do_block(Pid, QueueLogRecs, L) -> + L2 = L#log{status = {blocked, QueueLogRecs}, blocked_by = Pid}, + put(log, L2), + case is_owner(Pid, L2) of + {true, _Notify} -> + ok; + false -> + link(Pid) + end. + +do_unblock(Pid, L, S) when L#log.blocked_by =:= Pid -> + do_unblock(L, S); +do_unblock(_Pid, _L, S) -> + S. + +do_unblock(L, S) -> + unblock_pid(L), + L2 = L#log{blocked_by = none, status = ok}, + put(log, L2), + %% Since the block request is synchronous, and the blocking + %% process is the only process that can unblock, all requests in + %% 'messages' will have been put in 'queue' before the unblock + %% request is granted. 
+ [] = S#state.messages, % assertion + S#state{queue = [], messages = lists:reverse(S#state.queue)}. + +-spec do_log(#log{}, [binary()]) -> integer() | {'error', _, integer()}. + +do_log(L, B) when L#log.type =:= halt -> + #log{format = Format, extra = Halt} = L, + #halt{curB = CurSize, size = Sz} = Halt, + {Bs, BSize} = bsize(B, Format), + case get(is_full) of + true -> + {error, {error, {full, L#log.name}}, 0}; + undefined when Sz =:= infinity; CurSize + BSize =< Sz -> + halt_write(Halt, L, B, Bs, BSize); + undefined -> + halt_write_full(L, B, Format, 0) + end; +do_log(L, B) when L#log.format_type =:= wrap_int -> + case disk_log_1:mf_int_log(L#log.extra, B, L#log.head) of + {ok, Handle, Logged, Lost, Wraps} -> + notify_owners_wrap(Wraps), + put(log, L#log{extra = Handle}), + Logged - Lost; + {ok, Handle, Logged} -> + put(log, L#log{extra = Handle}), + Logged; + {error, Error, Handle, Logged, Lost} -> + put(log, L#log{extra = Handle}), + {error, Error, Logged - Lost} + end; +do_log(L, B) when L#log.format_type =:= wrap_ext -> + case disk_log_1:mf_ext_log(L#log.extra, B, L#log.head) of + {ok, Handle, Logged, Lost, Wraps} -> + notify_owners_wrap(Wraps), + put(log, L#log{extra = Handle}), + Logged - Lost; + {ok, Handle, Logged} -> + put(log, L#log{extra = Handle}), + Logged; + {error, Error, Handle, Logged, Lost} -> + put(log, L#log{extra = Handle}), + {error, Error, Logged - Lost} + end. + +bsize(B, external) -> + {B, xsz(B, 0)}; +bsize(B, internal) -> + disk_log_1:logl(B). + +xsz([B|T], Sz) -> xsz(T, byte_size(B) + Sz); +xsz([], Sz) -> Sz. + +halt_write_full(L, [Bin | Bins], Format, N) -> + B = [Bin], + {Bs, BSize} = bsize(B, Format), + Halt = L#log.extra, + #halt{curB = CurSize, size = Sz} = Halt, + if + CurSize + BSize =< Sz -> + case halt_write(Halt, L, B, Bs, BSize) of + N1 when is_integer(N1) -> + halt_write_full(get(log), Bins, Format, N+N1); + Error -> + Error + end; + true -> + halt_write_full(L, [], Format, N) + end; +halt_write_full(L, _Bs, _Format, N) -> + put(is_full, true), + notify_owners(full), + {error, {error, {full, L#log.name}}, N}. + +halt_write(Halt, L, B, Bs, BSize) -> + case disk_log_1:fwrite(Halt#halt.fdc, L#log.filename, Bs, BSize) of + {ok, NewFdC} -> + NCurB = Halt#halt.curB + BSize, + NewHalt = Halt#halt{fdc = NewFdC, curB = NCurB}, + put(log, L#log{extra = NewHalt}), + length(B); + {Error, NewFdC} -> + put(log, L#log{extra = Halt#halt{fdc = NewFdC}}), + {error, Error, 0} + end. + +%% -> ok | Error +do_write_cache(#log{filename = FName, type = halt, extra = Halt} = Log) -> + {Reply, NewFdC} = disk_log_1:write_cache(Halt#halt.fdc, FName), + put(log, Log#log{extra = Halt#halt{fdc = NewFdC}}), + Reply; +do_write_cache(#log{type = wrap, extra = Handle} = Log) -> + {Reply, NewHandle} = disk_log_1:mf_write_cache(Handle), + put(log, Log#log{extra = NewHandle}), + Reply. + +%% -> ok | Error +do_sync(#log{filename = FName, type = halt, extra = Halt} = Log) -> + {Reply, NewFdC} = disk_log_1:sync(Halt#halt.fdc, FName), + put(log, Log#log{extra = Halt#halt{fdc = NewFdC}}), + Reply; +do_sync(#log{type = wrap, extra = Handle} = Log) -> + {Reply, NewHandle} = disk_log_1:mf_sync(Handle), + put(log, Log#log{extra = NewHandle}), + Reply. 
+ +%% -> ok | Error | throw(Error) +do_trunc(L, Head) when L#log.type =:= halt -> + #log{filename = FName, extra = Halt} = L, + FdC = Halt#halt.fdc, + {Reply1, FdC2} = + case L#log.format of + internal -> + disk_log_1:truncate(FdC, FName, Head); + external -> + case disk_log_1:truncate_at(FdC, FName, bof) of + {ok, NFdC} when Head =:= none -> + {ok, NFdC}; + {ok, NFdC} -> + {ok, H} = Head, + disk_log_1:fwrite(NFdC, FName, H, byte_size(H)); + R -> + R + end + end, + {Reply, NewHalt} = + case disk_log_1:position(FdC2, FName, cur) of + {ok, NewFdC, FileSize} when Reply1 =:= ok -> + {ok, Halt#halt{fdc = NewFdC, curB = FileSize}}; + {Reply2, NewFdC} -> + {Reply2, Halt#halt{fdc = NewFdC}}; + {ok, NewFdC, _} -> + {Reply1, Halt#halt{fdc = NewFdC}} + end, + put(log, L#log{extra = NewHalt}), + Reply; +do_trunc(L, Head) when L#log.type =:= wrap -> + Handle = L#log.extra, + OldHead = L#log.head, + {MaxB, MaxF} = disk_log_1:get_wrap_size(Handle), + ok = do_change_size(L, {MaxB, 1}), + NewLog = trunc_wrap((get(log))#log{head = Head}), + %% Just to remove all files with suffix > 1: + NewLog2 = trunc_wrap(NewLog), + NewHandle = (NewLog2#log.extra)#handle{noFull = 0, accFull = 0}, + do_change_size(NewLog2#log{extra = NewHandle, head = OldHead}, + {MaxB, MaxF}). + +trunc_wrap(L) -> + case do_inc_wrap_file(L) of + {ok, L2, _Lost} -> + L2; + {error, Error, _L2} -> + throw(Error) + end. + +do_chunk(#log{format_type = halt_int, extra = Halt} = L, Pos, B, N) -> + FdC = Halt#halt.fdc, + {NewFdC, Reply} = + case L#log.mode of + read_only -> + disk_log_1:chunk_read_only(FdC, L#log.filename, Pos, B, N); + read_write -> + disk_log_1:chunk(FdC, L#log.filename, Pos, B, N) + end, + put(log, L#log{extra = Halt#halt{fdc = NewFdC}}), + Reply; +do_chunk(#log{format_type = wrap_int, mode = read_only, + extra = Handle} = Log, Pos, B, N) -> + {NewHandle, Reply} = disk_log_1:mf_int_chunk_read_only(Handle, Pos, B, N), + put(log, Log#log{extra = NewHandle}), + Reply; +do_chunk(#log{format_type = wrap_int, extra = Handle} = Log, Pos, B, N) -> + {NewHandle, Reply} = disk_log_1:mf_int_chunk(Handle, Pos, B, N), + put(log, Log#log{extra = NewHandle}), + Reply; +do_chunk(Log, _Pos, _B, _) -> + {error, {format_external, Log#log.name}}. + +do_chunk_step(#log{format_type = wrap_int, extra = Handle}, Pos, N) -> + disk_log_1:mf_int_chunk_step(Handle, Pos, N); +do_chunk_step(Log, _Pos, _N) -> + {error, {not_internal_wrap, Log#log.name}}. + +%% Inlined. +replies(Pids, Reply) -> + M = {disk_log, self(), Reply}, + send_reply(Pids, M). + +send_reply(Pid, M) when is_pid(Pid) -> + Pid ! M; +send_reply([Pid | Pids], M) -> + Pid ! M, + send_reply(Pids, M); +send_reply([], _M) -> + ok. + +reply(To, Reply, S) -> + To ! {disk_log, self(), Reply}, + loop(S). + +req(Log, R) -> + case disk_log_server:get_log_pids(Log) of + {local, Pid} -> + monitor_request(Pid, R); + undefined -> + {error, no_such_log}; + {distributed, Pids} -> + multi_req({self(), R}, Pids) + end. + +multi_req(Msg, Pids) -> + Refs = + lists:map(fun(Pid) -> + Ref = erlang:monitor(process, Pid), + Pid ! Msg, + {Pid, Ref} + end, Pids), + lists:foldl(fun({Pid, Ref}, Reply) -> + receive + {'DOWN', Ref, process, Pid, _Info} -> + Reply; + {disk_log, Pid, _Reply} -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, process, Pid, _Reason} -> + ok + after 0 -> + ok + end + end + end, {error, nonode}, Refs). + +sreq(Log, R) -> + case nearby_pid(Log, node()) of + undefined -> + {error, no_such_log}; + Pid -> + monitor_request(Pid, R) + end. 
+ +%% Local req - always talk to log on Node +lreq(Log, R, Node) -> + case nearby_pid(Log, Node) of + Pid when is_pid(Pid), node(Pid) =:= Node -> + monitor_request(Pid, R); + _Else -> + {error, no_such_log} + end. + +nearby_pid(Log, Node) -> + case disk_log_server:get_log_pids(Log) of + undefined -> + undefined; + {local, Pid} -> + Pid; + {distributed, Pids} -> + get_near_pid(Pids, Node) + end. + +-spec get_near_pid([pid(),...], node()) -> pid(). + +get_near_pid([Pid | _], Node) when node(Pid) =:= Node -> Pid; +get_near_pid([Pid], _ ) -> Pid; +get_near_pid([_ | T], Node) -> get_near_pid(T, Node). + +monitor_request(Pid, Req) -> + Ref = erlang:monitor(process, Pid), + Pid ! {self(), Req}, + receive + {'DOWN', Ref, process, Pid, _Info} -> + {error, no_such_log}; + {disk_log, Pid, Reply} -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, process, Pid, _Reason} -> + Reply + after 0 -> + Reply + end + end. + +req2(Pid, R) -> + monitor_request(Pid, R). + +merge_head(none, Head) -> + Head; +merge_head(Head, _) -> + Head. + +%% -> List of extensions of existing files (no dot included) | throw(FileError) +wrap_file_extensions(File) -> + {_CurF, _CurFSz, _TotSz, NoOfFiles} = + disk_log_1:read_index_file(File), + Fs = if + NoOfFiles >= 1 -> + lists:seq(1, NoOfFiles); + NoOfFiles =:= 0 -> + [] + end, + Fun = fun(Ext) -> + case file:read_file_info(add_ext(File, Ext)) of + {ok, _} -> + true; + _Else -> + false + end + end, + lists:filter(Fun, ["idx", "siz" | Fs]). + +add_ext(File, Ext) -> + lists:concat([File, ".", Ext]). + +notify(Log, R) -> + case disk_log_server:get_log_pids(Log) of + undefined -> + {error, no_such_log}; + {local, Pid} -> + Pid ! R, + ok; + {distributed, Pids} -> + lists:foreach(fun(Pid) -> Pid ! R end, Pids), + ok + end. + +notify_owners_wrap([]) -> + ok; +notify_owners_wrap([N | Wraps]) -> + notify_owners({wrap, N}), + notify_owners_wrap(Wraps). + +notify_owners(Note) -> + L = get(log), + Msg = {disk_log, node(), L#log.name, Note}, + lists:foreach(fun({Pid, true}) -> Pid ! Msg; + (_) -> ok + end, L#log.owners). + +cache_error(S, Pids) -> + Error = S#state.cache_error, + replies(Pids, Error), + state_err(S#state{cache_error = ok}, Error). + +state_ok(S) -> + state_err(S, ok). + +-spec state_err(#state{}, dlog_state_error()) -> #state{}. + +state_err(S, Err) when S#state.error_status =:= Err -> S; +state_err(S, Err) -> + notify_owners({error_status, Err}), + S#state{error_status = Err}. |
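For context on the API this file introduces, here is a minimal usage sketch. It is not part of the commit; it only exercises the exported functions visible above (open/1, log/2, log_terms/2, sync/1, chunk/2, close/1), and the log name, file path, and size limits are illustrative placeholders. The chunk continuation protocol follows the comment in the source: the initial continuation is the atom 'start' and iteration ends with 'eof'.

```erlang
-module(disk_log_example).
-export([run/0]).

%% Assumed usage sketch: open a wrap log, write a few terms,
%% read them back with the chunk/2 continuation, then close.
run() ->
    {ok, my_log} = disk_log:open([{name, my_log},
                                  {file, "/tmp/my_log"},
                                  {type, wrap},
                                  {size, {1024 * 1024, 4}}]), % 4 files, 1 MB each
    ok = disk_log:log(my_log, {event, started}),
    ok = disk_log:log_terms(my_log, [{sample, 1}, {sample, 2}]),
    ok = disk_log:sync(my_log),
    Terms = read_all(start, []),
    ok = disk_log:close(my_log),
    Terms.

%% Fold over the log using the continuation returned by chunk/2.
read_all(Cont, Acc) ->
    case disk_log:chunk(my_log, Cont) of
        eof             -> lists:reverse(Acc);
        {error, Reason} -> {error, Reason};
        {Cont2, Terms}  -> read_all(Cont2, lists:reverse(Terms, Acc))
    end.
```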