%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 1997-2009. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% -module(disk_log). %% Efficient file based log - process part -export([start/0, istart_link/1, log/2, log_terms/2, blog/2, blog_terms/2, alog/2, alog_terms/2, balog/2, balog_terms/2, close/1, lclose/1, lclose/2, sync/1, open/1, truncate/1, truncate/2, btruncate/2, reopen/2, reopen/3, breopen/3, inc_wrap_file/1, change_size/2, change_notify/3, change_header/2, chunk/2, chunk/3, bchunk/2, bchunk/3, chunk_step/3, chunk_info/1, block/1, block/2, unblock/1, info/1, format_error/1, accessible_logs/0]). %% Internal exports -export([init/2, internal_open/2, system_continue/3, system_terminate/4, system_code_change/4]). %% To be used by disk_log_h.erl (not (yet) in Erlang/OTP) only. -export([ll_open/1, ll_close/1, do_log/2, do_sync/1, do_info/2]). %% To be used by wrap_log_reader only. -export([ichunk_end/2]). %% To be used for debugging only: -export([pid2name/1]). -type dlog_state_error() :: 'ok' | {'error', term()}. -record(state, {queue = [], messages = [], parent, server, cnt = 0 :: non_neg_integer(), args, error_status = ok :: dlog_state_error(), cache_error = ok %% cache write error after timeout }). -include("disk_log.hrl"). -define(failure(Error, Function, Arg), {{failed, Error}, [{?MODULE, Function, Arg}]}). %%-define(PROFILE(C), C). -define(PROFILE(C), void). -compile({inline,[{log_loop,4},{log_end_sync,2},{replies,2},{rflat,1}]}). %%%---------------------------------------------------------------------- %%% Contract type specifications %%%---------------------------------------------------------------------- -type bytes() :: binary() | [byte()]. -type log() :: term(). % XXX: refine -type file_error() :: term(). % XXX: refine -type invalid_header() :: term(). % XXX: refine %%%---------------------------------------------------------------------- %%% API %%%---------------------------------------------------------------------- %%----------------------------------------------------------------- %% This module implements the API, and the processes for each log. %% There is one process per log. %%----------------------------------------------------------------- -type open_error_rsn() :: 'no_such_log' | {'badarg', term()} | {'size_mismatch', dlog_size(), dlog_size()} | {'arg_mismatch', dlog_optattr(), term(), term()} | {'name_already_open', log()} | {'open_read_write', log()} | {'open_read_only', log()} | {'need_repair', log()} | {'not_a_log_file', string()} | {'invalid_index_file', string()} | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()} | {'node_already_open', log()}. -type dist_error_rsn() :: 'nodedown' | open_error_rsn(). -type ret() :: {'ok', log()} | {'repaired', log(), {'recovered', non_neg_integer()}, {'badbytes', non_neg_integer()}}. -type open_ret() :: ret() | {'error', open_error_rsn()}. -type dist_open_ret() :: {[{node(), ret()}], [{node(), {'error', dist_error_rsn()}}]}. -type all_open_ret() :: open_ret() | dist_open_ret(). -spec open(Args :: dlog_options()) -> all_open_ret(). open(A) -> disk_log_server:open(check_arg(A, #arg{options = A})). -type log_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} | {'format_external', log()} | {'blocked_log', log()} | {'full', log()} | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()}. -spec log(Log :: log(), Term :: term()) -> 'ok' | {'error', log_error_rsn()}. log(Log, Term) -> req(Log, {log, term_to_binary(Term)}). -spec blog(Log :: log(), Bytes :: bytes()) -> 'ok' | {'error', log_error_rsn()}. blog(Log, Bytes) -> req(Log, {blog, check_bytes(Bytes)}). -spec log_terms(Log :: log(), Terms :: [term()]) -> 'ok' | {'error', term()}. log_terms(Log, Terms) -> Bs = terms2bins(Terms), req(Log, {log, Bs}). -spec blog_terms(Log :: log(), Bytes :: [bytes()]) -> 'ok' | {'error', term()}. blog_terms(Log, Bytess) -> Bs = check_bytes_list(Bytess, Bytess), req(Log, {blog, Bs}). -type notify_ret() :: 'ok' | {'error', 'no_such_log'}. -spec alog(Log :: log(), Term :: term()) -> notify_ret(). alog(Log, Term) -> notify(Log, {alog, term_to_binary(Term)}). -spec alog_terms(Log :: log(), Terms :: [term()]) -> notify_ret(). alog_terms(Log, Terms) -> Bs = terms2bins(Terms), notify(Log, {alog, Bs}). -spec balog(Log :: log(), Bytes :: bytes()) -> notify_ret(). balog(Log, Bytes) -> notify(Log, {balog, check_bytes(Bytes)}). -spec balog_terms(Log :: log(), Bytes :: [bytes()]) -> notify_ret(). balog_terms(Log, Bytess) -> Bs = check_bytes_list(Bytess, Bytess), notify(Log, {balog, Bs}). -type close_error_rsn() ::'no_such_log' | 'nonode' | {'file_error', file:filename(), file_error()}. -spec close(Log :: log()) -> 'ok' | {'error', close_error_rsn()}. close(Log) -> req(Log, close). -type lclose_error_rsn() :: 'no_such_log' | {'file_error', file:filename(), file_error()}. -spec lclose(Log :: log()) -> 'ok' | {'error', lclose_error_rsn()}. lclose(Log) -> lclose(Log, node()). -spec lclose(Log :: log(), Node :: node()) -> 'ok' | {'error', lclose_error_rsn()}. lclose(Log, Node) -> lreq(Log, close, Node). -type trunc_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} | {'blocked_log', log()} | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()}. -spec truncate(Log :: log()) -> 'ok' | {'error', trunc_error_rsn()}. truncate(Log) -> req(Log, {truncate, none, truncate, 1}). -spec truncate(Log :: log(), Head :: term()) -> 'ok' | {'error', trunc_error_rsn()}. truncate(Log, Head) -> req(Log, {truncate, {ok, term_to_binary(Head)}, truncate, 2}). -spec btruncate(Log :: log(), Head :: bytes()) -> 'ok' | {'error', trunc_error_rsn()}. btruncate(Log, Head) -> req(Log, {truncate, {ok, check_bytes(Head)}, btruncate, 2}). -spec reopen(Log :: log(), Filename :: file:filename()) -> 'ok' | {'error', term()}. reopen(Log, NewFile) -> req(Log, {reopen, NewFile, none, reopen, 2}). -spec reopen(Log :: log(), Filename :: file:filename(), Head :: term()) -> 'ok' | {'error', term()}. reopen(Log, NewFile, NewHead) -> req(Log, {reopen, NewFile, {ok, term_to_binary(NewHead)}, reopen, 3}). -spec breopen(Log :: log(), Filename :: file:filename(), Head :: bytes()) -> 'ok' | {'error', term()}. breopen(Log, NewFile, NewHead) -> req(Log, {reopen, NewFile, {ok, check_bytes(NewHead)}, breopen, 3}). -type inc_wrap_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} | {'blocked_log', log()} | {'halt_log', log()} | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()}. -spec inc_wrap_file(Log :: log()) -> 'ok' | {'error', inc_wrap_error_rsn()}. inc_wrap_file(Log) -> req(Log, inc_wrap_file). -spec change_size(Log :: log(), Size :: dlog_size()) -> 'ok' | {'error', term()}. change_size(Log, NewSize) -> req(Log, {change_size, NewSize}). -spec change_notify(Log :: log(), Pid :: pid(), Notify :: boolean()) -> 'ok' | {'error', term()}. change_notify(Log, Pid, NewNotify) -> req(Log, {change_notify, Pid, NewNotify}). -spec change_header(Log :: log(), Head :: {atom(), term()}) -> 'ok' | {'error', term()}. change_header(Log, NewHead) -> req(Log, {change_header, NewHead}). -type sync_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} | {'blocked_log', log()} | {'file_error', file:filename(), file_error()}. -spec sync(Log :: log()) -> 'ok' | {'error', sync_error_rsn()}. sync(Log) -> req(Log, sync). -type block_error_rsn() :: 'no_such_log' | 'nonode' | {'blocked_log', log()}. -spec block(Log :: log()) -> 'ok' | {'error', block_error_rsn()}. block(Log) -> block(Log, true). -spec block(Log :: log(), QueueLogRecords :: boolean()) -> 'ok' | {'error', term()}. block(Log, QueueLogRecords) -> req(Log, {block, QueueLogRecords}). -type unblock_error_rsn() :: 'no_such_log' | 'nonode' | {'not_blocked', log()} | {'not_blocked_by_pid', log()}. -spec unblock(Log :: log()) -> 'ok' | {'error', unblock_error_rsn()}. unblock(Log) -> req(Log, unblock). -spec format_error(Error :: term()) -> string(). format_error(Error) -> do_format_error(Error). -spec info(Log :: log()) -> [{atom(), any()}] | {'error', term()}. info(Log) -> sreq(Log, info). -spec pid2name(Pid :: pid()) -> {'ok', log()} | 'undefined'. pid2name(Pid) -> disk_log_server:start(), case ets:lookup(?DISK_LOG_PID_TABLE, Pid) of [] -> undefined; [{_Pid, Log}] -> {ok, Log} end. %% This function Takes 3 args, a Log, a Continuation and N. %% It retuns a {Cont2, ObjList} | eof | {error, Reason} %% The initial continuation is the atom 'start' -spec chunk(Log :: log(), Cont :: any()) -> {'error', term()} | 'eof' | {any(), [any()]} | {any(), [any()], integer()}. chunk(Log, Cont) -> chunk(Log, Cont, infinity). -spec chunk(Log :: log(), Cont :: any(), N :: pos_integer() | 'infinity') -> {'error', term()} | 'eof' | {any(), [any()]} | {any(), [any()], integer()}. chunk(Log, Cont, infinity) -> %% There cannot be more than ?MAX_CHUNK_SIZE terms in a chunk. ichunk(Log, Cont, ?MAX_CHUNK_SIZE); chunk(Log, Cont, N) when is_integer(N), N > 0 -> ichunk(Log, Cont, N). ichunk(Log, start, N) -> R = sreq(Log, {chunk, 0, [], N}), ichunk_end(R, Log); ichunk(Log, More, N) when is_record(More, continuation) -> R = req2(More#continuation.pid, {chunk, More#continuation.pos, More#continuation.b, N}), ichunk_end(R, Log); ichunk(_Log, _, _) -> {error, {badarg, continuation}}. ichunk_end({C, R}, Log) when is_record(C, continuation) -> ichunk_end(R, read_write, Log, C, 0); ichunk_end({C, R, Bad}, Log) when is_record(C, continuation) -> ichunk_end(R, read_only, Log, C, Bad); ichunk_end(R, _Log) -> R. %% Create the terms on the client's heap, not the server's. %% The list of binaries is reversed. ichunk_end(R, Mode, Log, C, Bad) -> case catch bins2terms(R, []) of {'EXIT', _} -> RR = lists:reverse(R), ichunk_bad_end(RR, Mode, Log, C, Bad, []); Ts when Bad > 0 -> {C, Ts, Bad}; Ts when Bad =:= 0 -> {C, Ts} end. bins2terms([], L) -> L; bins2terms([B | Bs], L) -> bins2terms(Bs, [binary_to_term(B) | L]). ichunk_bad_end([B | Bs], Mode, Log, C, Bad, A) -> case catch binary_to_term(B) of {'EXIT', _} when read_write =:= Mode -> InfoList = info(Log), {value, {file, FileName}} = lists:keysearch(file, 1, InfoList), File = case C#continuation.pos of Pos when is_integer(Pos) -> FileName; % halt log {FileNo, _} -> add_ext(FileName, FileNo) % wrap log end, {error, {corrupt_log_file, File}}; {'EXIT', _} when read_only =:= Mode -> Reread = lists:foldl(fun(Bin, Sz) -> Sz + byte_size(Bin) + ?HEADERSZ end, 0, Bs), NewPos = case C#continuation.pos of Pos when is_integer(Pos) -> Pos-Reread; {FileNo, Pos} -> {FileNo, Pos-Reread} end, NewBad = Bad + byte_size(B), {C#continuation{pos = NewPos, b = []}, lists:reverse(A), NewBad}; T -> ichunk_bad_end(Bs, Mode, Log, C, Bad, [T | A]) end. -spec bchunk(Log :: log(), Cont :: any()) -> {'error', any()} | 'eof' | {any(), [binary()]} | {any(), [binary()], integer()}. bchunk(Log, Cont) -> bchunk(Log, Cont, infinity). -spec bchunk(Log :: log(), Cont :: any(), N :: 'infinity' | pos_integer()) -> {'error', any()} | 'eof' | {any(), [binary()]} | {any(), [binary()], integer()}. bchunk(Log, Cont, infinity) -> %% There cannot be more than ?MAX_CHUNK_SIZE terms in a chunk. bichunk(Log, Cont, ?MAX_CHUNK_SIZE); bchunk(Log, Cont, N) when is_integer(N), N > 0 -> bichunk(Log, Cont, N). bichunk(Log, start, N) -> R = sreq(Log, {chunk, 0, [], N}), bichunk_end(R); bichunk(_Log, #continuation{pid = Pid, pos = Pos, b = B}, N) -> R = req2(Pid, {chunk, Pos, B, N}), bichunk_end(R); bichunk(_Log, _, _) -> {error, {badarg, continuation}}. bichunk_end({C = #continuation{}, R}) -> {C, lists:reverse(R)}; bichunk_end({C = #continuation{}, R, Bad}) -> {C, lists:reverse(R), Bad}; bichunk_end(R) -> R. -spec chunk_step(Log :: log(), Cont :: any(), N :: integer()) -> {'ok', any()} | {'error', term()}. chunk_step(Log, Cont, N) when is_integer(N) -> ichunk_step(Log, Cont, N). ichunk_step(Log, start, N) -> sreq(Log, {chunk_step, 0, N}); ichunk_step(_Log, More, N) when is_record(More, continuation) -> req2(More#continuation.pid, {chunk_step, More#continuation.pos, N}); ichunk_step(_Log, _, _) -> {error, {badarg, continuation}}. -spec chunk_info(More :: any()) -> [{'node', node()},...] | {'error', {'no_continuation', any()}}. chunk_info(More = #continuation{}) -> [{node, node(More#continuation.pid)}]; chunk_info(BadCont) -> {error, {no_continuation, BadCont}}. -spec accessible_logs() -> {[_], [_]}. accessible_logs() -> disk_log_server:accessible_logs(). istart_link(Server) -> {ok, proc_lib:spawn_link(disk_log, init, [self(), Server])}. %% Only for backwards compatibility, could probably be removed. -spec start() -> 'ok'. start() -> disk_log_server:start(). internal_open(Pid, A) -> req2(Pid, {internal_open, A}). %%% ll_open() and ll_close() are used by disk_log_h.erl, a module not %%% (yet) in Erlang/OTP. %% -spec ll_open(dlog_options()) -> {'ok', Res :: _, #log{}, Cnt :: _} | Error. ll_open(A) -> case check_arg(A, #arg{options = A}) of {ok, L} -> do_open(L); Error -> Error end. %% -> closed | throw(Error) ll_close(Log) -> close_disk_log2(Log). check_arg([], Res) -> Ret = case Res#arg.head of none -> {ok, Res}; _ -> case check_head(Res#arg.head, Res#arg.format) of {ok, Head} -> {ok, Res#arg{head = Head}}; Error -> Error end end, if %% check result Res#arg.name =:= 0 -> {error, {badarg, name}}; Res#arg.file =:= none -> case catch lists:concat([Res#arg.name, ".LOG"]) of {'EXIT',_} -> {error, {badarg, file}}; FName -> check_arg([], Res#arg{file = FName}) end; Res#arg.repair =:= truncate, Res#arg.mode =:= read_only -> {error, {badarg, repair_read_only}}; Res#arg.type =:= halt, is_tuple(Res#arg.size) -> {error, {badarg, size}}; Res#arg.type =:= wrap -> {OldSize, Version} = disk_log_1:read_size_file_version(Res#arg.file), check_wrap_arg(Ret, OldSize, Version); true -> Ret end; check_arg([{file, F} | Tail], Res) when is_list(F) -> check_arg(Tail, Res#arg{file = F}); check_arg([{file, F} | Tail], Res) when is_atom(F) -> check_arg(Tail, Res#arg{file = F}); check_arg([{linkto, Pid} |Tail], Res) when is_pid(Pid) -> check_arg(Tail, Res#arg{linkto = Pid}); check_arg([{linkto, none} |Tail], Res) -> check_arg(Tail, Res#arg{linkto = none}); check_arg([{name, Name}|Tail], Res) -> check_arg(Tail, Res#arg{name = Name}); check_arg([{repair, true}|Tail], Res) -> check_arg(Tail, Res#arg{repair = true}); check_arg([{repair, false}|Tail], Res) -> check_arg(Tail, Res#arg{repair = false}); check_arg([{repair, truncate}|Tail], Res) -> check_arg(Tail, Res#arg{repair = truncate}); check_arg([{size, Int}|Tail], Res) when is_integer(Int), Int > 0 -> check_arg(Tail, Res#arg{size = Int}); check_arg([{size, infinity}|Tail], Res) -> check_arg(Tail, Res#arg{size = infinity}); check_arg([{size, {MaxB,MaxF}}|Tail], Res) when is_integer(MaxB), is_integer(MaxF), MaxB > 0, MaxB =< ?MAX_BYTES, MaxF > 0, MaxF < ?MAX_FILES -> check_arg(Tail, Res#arg{size = {MaxB, MaxF}}); check_arg([{type, wrap}|Tail], Res) -> check_arg(Tail, Res#arg{type = wrap}); check_arg([{type, halt}|Tail], Res) -> check_arg(Tail, Res#arg{type = halt}); check_arg([{format, internal}|Tail], Res) -> check_arg(Tail, Res#arg{format = internal}); check_arg([{format, external}|Tail], Res) -> check_arg(Tail, Res#arg{format = external}); check_arg([{distributed, []}|Tail], Res) -> check_arg(Tail, Res#arg{distributed = false}); check_arg([{distributed, Nodes}|Tail], Res) when is_list(Nodes) -> check_arg(Tail, Res#arg{distributed = {true, Nodes}}); check_arg([{notify, true}|Tail], Res) -> check_arg(Tail, Res#arg{notify = true}); check_arg([{notify, false}|Tail], Res) -> check_arg(Tail, Res#arg{notify = false}); check_arg([{head_func, HeadFunc}|Tail], Res) -> check_arg(Tail, Res#arg{head = {head_func, HeadFunc}}); check_arg([{head, Term}|Tail], Res) -> check_arg(Tail, Res#arg{head = {head, Term}}); check_arg([{mode, read_only}|Tail], Res) -> check_arg(Tail, Res#arg{mode = read_only}); check_arg([{mode, read_write}|Tail], Res) -> check_arg(Tail, Res#arg{mode = read_write}); check_arg(Arg, _) -> {error, {badarg, Arg}}. check_wrap_arg({ok, Res}, {0,0}, _Version) when Res#arg.size =:= infinity -> {error, {badarg, size}}; check_wrap_arg({ok, Res}, OldSize, Version) when Res#arg.size =:= infinity -> NewRes = Res#arg{size = OldSize}, check_wrap_arg({ok, NewRes}, OldSize, Version); check_wrap_arg({ok, Res}, {0,0}, Version) -> {ok, Res#arg{version = Version}}; check_wrap_arg({ok, Res}, OldSize, Version) when OldSize =:= Res#arg.size -> {ok, Res#arg{version = Version}}; check_wrap_arg({ok, Res}, _OldSize, Version) when Res#arg.repair =:= truncate, is_tuple(Res#arg.size) -> {ok, Res#arg{version = Version}}; check_wrap_arg({ok, Res}, OldSize, _Version) when is_tuple(Res#arg.size) -> {error, {size_mismatch, OldSize, Res#arg.size}}; check_wrap_arg({ok, _Res}, _OldSize, _Version) -> {error, {badarg, size}}; check_wrap_arg(Ret, _OldSize, _Version) -> Ret. %%%----------------------------------------------------------------- %%% Server functions %%%----------------------------------------------------------------- init(Parent, Server) -> ?PROFILE(ep:do()), process_flag(trap_exit, true), loop(#state{parent = Parent, server = Server}). loop(State) when State#state.messages =:= [] -> receive Message -> handle(Message, State) end; loop(State) -> [M | Ms] = State#state.messages, handle(M, State#state{messages = Ms}). handle({From, write_cache}, S) when From =:= self() -> case catch do_write_cache(get(log)) of ok -> loop(S); Error -> loop(S#state{cache_error = Error}) end; handle({From, {log, B}}, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok, L#log.format =:= internal -> log_loop(S, From, [B], []); L when L#log.status =:= ok, L#log.format =:= external -> reply(From, {error, {format_external, L#log.name}}, S); L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, {log, B}} | S#state.queue]}) end; handle({From, {blog, B}}, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok -> log_loop(S, From, [B], []); L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, {blog, B}} | S#state.queue]}) end; handle({alog, B}, S) -> case get(log) of L when L#log.mode =:= read_only -> notify_owners({read_only,B}), loop(S); L when L#log.status =:= ok, L#log.format =:= internal -> log_loop(S, [], [B], []); L when L#log.status =:= ok -> notify_owners({format_external, B}), loop(S); L when L#log.status =:= {blocked, false} -> notify_owners({blocked_log, B}), loop(S); _ -> loop(S#state{queue = [{alog, B} | S#state.queue]}) end; handle({balog, B}, S) -> case get(log) of L when L#log.mode =:= read_only -> notify_owners({read_only,B}), loop(S); L when L#log.status =:= ok -> log_loop(S, [], [B], []); L when L#log.status =:= {blocked, false} -> notify_owners({blocked_log, B}), loop(S); _ -> loop(S#state{queue = [{balog, B} | S#state.queue]}) end; handle({From, {block, QueueLogRecs}}, S) -> case get(log) of L when L#log.status =:= ok -> do_block(From, QueueLogRecs, L), reply(From, ok, S); L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, {block, QueueLogRecs}} | S#state.queue]}) end; handle({From, unblock}, S) -> case get(log) of L when L#log.status =:= ok -> reply(From, {error, {not_blocked, L#log.name}}, S); L when L#log.blocked_by =:= From -> S2 = do_unblock(L, S), reply(From, ok, S2); L -> reply(From, {error, {not_blocked_by_pid, L#log.name}}, S) end; handle({From, sync}, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok -> sync_loop([From], S); L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, sync} | S#state.queue]}) end; handle({From, {truncate, Head, F, A}}, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok, S#state.cache_error =/= ok -> loop(cache_error(S, [From])); L when L#log.status =:= ok -> H = merge_head(Head, L#log.head), case catch do_trunc(L, H) of ok -> erase(is_full), notify_owners({truncated, S#state.cnt}), N = if Head =:= none -> 0; true -> 1 end, reply(From, ok, (state_ok(S))#state{cnt = N}); Error -> do_exit(S, From, Error, ?failure(Error, F, A)) end; L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, {truncate, Head, F, A}} | S#state.queue]}) end; handle({From, {chunk, Pos, B, N}}, S) -> case get(log) of L when L#log.status =:= ok, S#state.cache_error =/= ok -> loop(cache_error(S, [From])); L when L#log.status =:= ok -> R = do_chunk(L, Pos, B, N), reply(From, R, S); L when L#log.blocked_by =:= From -> R = do_chunk(L, Pos, B, N), reply(From, R, S); L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); _L -> loop(S#state{queue = [{From, {chunk, Pos, B, N}} | S#state.queue]}) end; handle({From, {chunk_step, Pos, N}}, S) -> case get(log) of L when L#log.status =:= ok, S#state.cache_error =/= ok -> loop(cache_error(S, [From])); L when L#log.status =:= ok -> R = do_chunk_step(L, Pos, N), reply(From, R, S); L when L#log.blocked_by =:= From -> R = do_chunk_step(L, Pos, N), reply(From, R, S); L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, {chunk_step, Pos, N}} | S#state.queue]}) end; handle({From, {change_notify, Pid, NewNotify}}, S) -> case get(log) of L when L#log.status =:= ok -> case do_change_notify(L, Pid, NewNotify) of {ok, L1} -> put(log, L1), reply(From, ok, S); Error -> reply(From, Error, S) end; L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, {change_notify, Pid, NewNotify}} | S#state.queue]}) end; handle({From, {change_header, NewHead}}, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok -> case check_head(NewHead, L#log.format) of {ok, Head} -> put(log, L#log{head = mk_head(Head, L#log.format)}), reply(From, ok, S); Error -> reply(From, Error, S) end; L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, {change_header, NewHead}} | S#state.queue]}) end; handle({From, {change_size, NewSize}}, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok -> case check_size(L#log.type, NewSize) of ok -> case catch do_change_size(L, NewSize) of % does the put ok -> reply(From, ok, S); {big, CurSize} -> reply(From, {error, {new_size_too_small, L#log.name, CurSize}}, S); Else -> reply(From, Else, state_err(S, Else)) end; not_ok -> reply(From, {error, {badarg, size}}, S) end; L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, {change_size, NewSize}} | S#state.queue]}) end; handle({From, inc_wrap_file}, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.type =:= halt -> reply(From, {error, {halt_log, L#log.name}}, S); L when L#log.status =:= ok, S#state.cache_error =/= ok -> loop(cache_error(S, [From])); L when L#log.status =:= ok -> case catch do_inc_wrap_file(L) of {ok, L2, Lost} -> put(log, L2), notify_owners({wrap, Lost}), reply(From, ok, S#state{cnt = S#state.cnt-Lost}); {error, Error, L2} -> put(log, L2), reply(From, Error, state_err(S, Error)) end; L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> loop(S#state{queue = [{From, inc_wrap_file} | S#state.queue]}) end; handle({From, {reopen, NewFile, Head, F, A}}, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok, S#state.cache_error =/= ok -> loop(cache_error(S, [From])); L when L#log.status =:= ok, L#log.filename =/= NewFile -> case catch close_disk_log2(L) of closed -> File = L#log.filename, case catch rename_file(File, NewFile, L#log.type) of ok -> H = merge_head(Head, L#log.head), case do_open((S#state.args)#arg{name = L#log.name, repair = truncate, head = H, file = File}) of {ok, Res, L2, Cnt} -> put(log, L2#log{owners = L#log.owners, head = L#log.head, users = L#log.users}), notify_owners({truncated, S#state.cnt}), erase(is_full), case Res of {error, _} -> do_exit(S, From, Res, ?failure(Res, F, A)); _ -> reply(From, ok, S#state{cnt = Cnt}) end; Res -> do_exit(S, From, Res, ?failure(Res, F, A)) end; Error -> do_exit(S, From, Error, ?failure(Error, reopen, 2)) end; Error -> do_exit(S, From, Error, ?failure(Error, F, A)) end; L when L#log.status =:= ok -> reply(From, {error, {same_file_name, L#log.name}}, S); L -> reply(From, {error, {blocked_log, L#log.name}}, S) end; handle({Server, {internal_open, A}}, S) -> case get(log) of undefined -> case do_open(A) of % does the put {ok, Res, L, Cnt} -> put(log, opening_pid(A#arg.linkto, A#arg.notify, L)), reply(Server, Res, S#state{args=A, cnt=Cnt}); Res -> do_fast_exit(S, Server, Res) end; L -> TestH = mk_head(A#arg.head, A#arg.format), case compare_arg(A#arg.options, S#state.args, TestH, L#log.head) of ok -> case add_pid(A#arg.linkto, A#arg.notify, L) of {ok, L1} -> put(log, L1), reply(Server, {ok, L#log.name}, S); Error -> reply(Server, Error, S) end; Error -> reply(Server, Error, S) end end; handle({From, close}, S) -> case do_close(From, S) of {stop, S1} -> do_exit(S1, From, ok, normal); {continue, S1} -> reply(From, ok, S1) end; handle({From, info}, S) -> reply(From, do_info(get(log), S#state.cnt), S); handle({'EXIT', From, Reason}, S) when From =:= S#state.parent -> %% Parent orders shutdown. _ = do_stop(S), exit(Reason); handle({'EXIT', From, Reason}, S) when From =:= S#state.server -> %% The server is gone. _ = do_stop(S), exit(Reason); handle({'EXIT', From, _Reason}, S) -> L = get(log), case is_owner(From, L) of {true, _Notify} -> case close_owner(From, L, S) of {stop, S1} -> _ = do_stop(S1), exit(normal); {continue, S1} -> loop(S1) end; false -> %% 'users' is not decremented. S1 = do_unblock(From, get(log), S), loop(S1) end; handle({system, From, Req}, S) -> sys:handle_system_msg(Req, From, S#state.parent, ?MODULE, [], S); handle(_, S) -> loop(S). sync_loop(From, S) -> log_loop(S, [], [], From). %% Inlined. log_loop(S, Pids, _Bins, _Sync) when S#state.cache_error =/= ok -> loop(cache_error(S, Pids)); log_loop(S, Pids, Bins, Sync) when S#state.messages =:= [] -> receive Message -> log_loop(Message, Pids, Bins, Sync, S, get(log)) after 0 -> loop(log_end(S, Pids, Bins, Sync)) end; log_loop(S, Pids, Bins, Sync) -> [M | Ms] = S#state.messages, S1 = S#state{messages = Ms}, log_loop(M, Pids, Bins, Sync, S1, get(log)). %% Items logged after the last sync request found are sync:ed as well. log_loop({alog,B}, Pids, Bins, Sync, S, L) when L#log.format =:= internal -> %% {alog, _} allowed for the internal format only. log_loop(S, Pids, [B | Bins], Sync); log_loop({balog, B}, Pids, Bins, Sync, S, _L) -> log_loop(S, Pids, [B | Bins], Sync); log_loop({From, {log, B}}, Pids, Bins, Sync, S, L) when L#log.format =:= internal -> %% {log, _} allowed for the internal format only. log_loop(S, [From | Pids], [B | Bins], Sync); log_loop({From, {blog, B}}, Pids, Bins, Sync, S, _L) -> log_loop(S, [From | Pids], [B | Bins], Sync); log_loop({From, sync}, Pids, Bins, Sync, S, _L) -> log_loop(S, Pids, Bins, [From | Sync]); log_loop(Message, Pids, Bins, Sync, S, _L) -> NS = log_end(S, Pids, Bins, Sync), handle(Message, NS). log_end(S, [], [], Sync) -> log_end_sync(S, Sync); log_end(S, Pids, Bins, Sync) -> case do_log(get(log), rflat(Bins)) of N when is_integer(N) -> replies(Pids, ok), S1 = (state_ok(S))#state{cnt = S#state.cnt+N}, log_end_sync(S1, Sync); {error, {error, {full, _Name}}, N} when Pids =:= [] -> log_end_sync(state_ok(S#state{cnt = S#state.cnt + N}), Sync); {error, Error, N} -> replies(Pids, Error), state_err(S#state{cnt = S#state.cnt + N}, Error) end. %% Inlined. log_end_sync(S, []) -> S; log_end_sync(S, Sync) -> Res = do_sync(get(log)), replies(Sync, Res), state_err(S, Res). %% Inlined. rflat([B]=L) when is_binary(B) -> L; rflat([B]) -> B; rflat(B) -> rflat(B, []). rflat([B | Bs], L) when is_binary(B) -> rflat(Bs, [B | L]); rflat([B | Bs], L) -> rflat(Bs, B ++ L); rflat([], L) -> L. %% -> {ok, Log} | {error, Error} do_change_notify(L, Pid, Notify) -> case is_owner(Pid, L) of {true, Notify} -> {ok, L}; {true, _OldNotify} when Notify =/= true, Notify =/= false -> {error, {badarg, notify}}; {true, _OldNotify} -> Owners = lists:keydelete(Pid, 1, L#log.owners), L1 = L#log{owners = [{Pid, Notify} | Owners]}, {ok, L1}; false -> {error, {not_owner, Pid}} end. %% -> {stop, S} | {continue, S} do_close(Pid, S) -> L = get(log), case is_owner(Pid, L) of {true, _Notify} -> close_owner(Pid, L, S); false -> close_user(Pid, L, S) end. %% -> {stop, S} | {continue, S} close_owner(Pid, L, S) -> L1 = L#log{owners = lists:keydelete(Pid, 1, L#log.owners)}, put(log, L1), S2 = do_unblock(Pid, get(log), S), unlink(Pid), do_close2(L1, S2). %% -> {stop, S} | {continue, S} close_user(Pid, L, S) when L#log.users > 0 -> L1 = L#log{users = L#log.users - 1}, put(log, L1), S2 = do_unblock(Pid, get(log), S), do_close2(L1, S2); close_user(_Pid, _L, S) -> {continue, S}. do_close2(L, S) when L#log.users =:= 0, L#log.owners =:= [] -> {stop, S}; do_close2(_L, S) -> {continue, S}. %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- system_continue(_Parent, _, State) -> loop(State). -spec system_terminate(_, _, _, #state{}) -> no_return(). system_terminate(Reason, _Parent, _, State) -> _ = do_stop(State), exit(Reason). %%----------------------------------------------------------------- %% Temporay code for upgrade. %%----------------------------------------------------------------- system_code_change(State, _Module, _OldVsn, _Extra) -> {ok, State}. %%%---------------------------------------------------------------------- %%% Internal functions %%%---------------------------------------------------------------------- -spec do_exit(#state{}, pid(), _, _) -> no_return(). do_exit(S, From, Message0, Reason) -> R = do_stop(S), Message = case S#state.cache_error of Err when Err =/= ok -> Err; _ when R =:= closed -> Message0; _ when Message0 =:= ok -> R; _ -> Message0 end, _ = disk_log_server:close(self()), replies(From, Message), ?PROFILE(ep:done()), exit(Reason). -spec do_fast_exit(#state{}, pid(), _) -> no_return(). do_fast_exit(S, Server, Message) -> _ = do_stop(S), Server ! {disk_log, self(), Message}, exit(normal). %% -> closed | Error do_stop(S) -> proc_q(S#state.queue ++ S#state.messages), close_disk_log(get(log)). proc_q([{From, _R}|Tail]) when is_pid(From) -> From ! {disk_log, self(), {error, disk_log_stopped}}, proc_q(Tail); proc_q([_|T]) -> %% async stuff proc_q(T); proc_q([]) -> ok. %% -> log() opening_pid(Pid, Notify, L) -> {ok, L1} = add_pid(Pid, Notify, L), L1. %% -> {ok, log()} | Error add_pid(Pid, Notify, L) when is_pid(Pid) -> case is_owner(Pid, L) of false -> link(Pid), {ok, L#log{owners = [{Pid, Notify} | L#log.owners]}}; {true, Notify} -> %% {error, {pid_already_connected, L#log.name}}; {ok, L}; {true, CurNotify} when Notify =/= CurNotify -> {error, {arg_mismatch, notify, CurNotify, Notify}} end; add_pid(_NotAPid, _Notify, L) -> {ok, L#log{users = L#log.users + 1}}. unblock_pid(L) when L#log.blocked_by =:= none -> ok; unblock_pid(L) -> case is_owner(L#log.blocked_by, L) of {true, _Notify} -> ok; false -> unlink(L#log.blocked_by) end. %% -> true | false is_owner(Pid, L) -> case lists:keysearch(Pid, 1, L#log.owners) of {value, {_Pid, Notify}} -> {true, Notify}; false -> false end. %% ok | throw(Error) rename_file(File, NewFile, halt) -> file:rename(File, NewFile); rename_file(File, NewFile, wrap) -> rename_file(wrap_file_extensions(File), File, NewFile, ok). rename_file([Ext|Exts], File, NewFile, Res) -> NRes = case file:rename(add_ext(File, Ext), add_ext(NewFile, Ext)) of ok -> Res; Else -> Else end, rename_file(Exts, File, NewFile, NRes); rename_file([], _File, _NewFiles, Res) -> Res. %% "Old" error messages have been kept, arg_mismatch has been added. %%-spec compare_arg(dlog_options(), #arg{}, compare_arg([], _A, none, _OrigHead) -> % no header option given ok; compare_arg([], _A, Head, OrigHead) when Head =/= OrigHead -> {error, {arg_mismatch, head, OrigHead, Head}}; compare_arg([], _A, _Head, _OrigHead) -> ok; compare_arg([{Attr, Val} | Tail], A, Head, OrigHead) -> case compare_arg(Attr, Val, A) of {not_ok, OrigVal} -> {error, {arg_mismatch, Attr, OrigVal, Val}}; ok -> compare_arg(Tail, A, Head, OrigHead); Error -> Error end. -spec compare_arg(atom(), _, #arg{}) -> 'ok' | {'not_ok', _} | {'error', {atom(), _}}. compare_arg(file, F, A) when F =/= A#arg.file -> {error, {name_already_open, A#arg.name}}; compare_arg(mode, read_only, A) when A#arg.mode =:= read_write -> {error, {open_read_write, A#arg.name}}; compare_arg(mode, read_write, A) when A#arg.mode =:= read_only -> {error, {open_read_only, A#arg.name}}; compare_arg(type, T, A) when T =/= A#arg.type -> {not_ok, A#arg.type}; compare_arg(format, F, A) when F =/= A#arg.format -> {not_ok, A#arg.format}; compare_arg(repair, R, A) when R =/= A#arg.repair -> %% not used, but check it anyway... {not_ok, A#arg.repair}; compare_arg(_Attr, _Val, _A) -> ok. %% -> {ok, Res, log(), Cnt} | Error do_open(A) -> L = #log{name = A#arg.name, filename = A#arg.file, size = A#arg.size, head = mk_head(A#arg.head, A#arg.format), mode = A#arg.mode, version = A#arg.version}, do_open2(L, A). mk_head({head, Term}, internal) -> {ok, term_to_binary(Term)}; mk_head({head, Bytes}, external) -> {ok, check_bytes(Bytes)}; mk_head(H, _) -> H. terms2bins([T | Ts]) -> [term_to_binary(T) | terms2bins(Ts)]; terms2bins([]) -> []. check_bytes_list([B | Bs], Bs0) when is_binary(B) -> check_bytes_list(Bs, Bs0); check_bytes_list([], Bs0) -> Bs0; check_bytes_list(_, Bs0) -> check_bytes_list(Bs0). check_bytes_list([B | Bs]) when is_binary(B) -> [B | check_bytes_list(Bs)]; check_bytes_list([B | Bs]) -> [list_to_binary(B) | check_bytes_list(Bs)]; check_bytes_list([]) -> []. check_bytes(Binary) when is_binary(Binary) -> Binary; check_bytes(Bytes) -> list_to_binary(Bytes). %%----------------------------------------------------------------- %% Change size of the logs in runtime. %%----------------------------------------------------------------- %% -> ok | {big, CurSize} | throw(Error) do_change_size(L, NewSize) when L#log.type =:= halt -> Halt = L#log.extra, CurB = Halt#halt.curB, NewLog = L#log{extra = Halt#halt{size = NewSize}}, if NewSize =:= infinity -> erase(is_full), put(log, NewLog), ok; CurB =< NewSize -> erase(is_full), put(log, NewLog), ok; true -> {big, CurB} end; do_change_size(L, NewSize) when L#log.type =:= wrap -> #log{extra = Extra, version = Version} = L, {ok, Handle} = disk_log_1:change_size_wrap(Extra, NewSize, Version), erase(is_full), put(log, L#log{extra = Handle}), ok. %% -> {ok, Head} | Error; Head = none | {head, H} | {M,F,A} check_head({head, none}, _Format) -> {ok, none}; check_head({head_func, {M, F, A}}, _Format) when is_atom(M), is_atom(F), is_list(A) -> {ok, {M, F, A}}; check_head({head, Head}, external) -> case catch check_bytes(Head) of {'EXIT', _} -> {error, {badarg, head}}; _ -> {ok, {head, Head}} end; check_head({head, Term}, internal) -> {ok, {head, Term}}; check_head(_Head, _Format) -> {error, {badarg, head}}. check_size(wrap, {NewMaxB,NewMaxF}) when is_integer(NewMaxB), is_integer(NewMaxF), NewMaxB > 0, NewMaxB =< ?MAX_BYTES, NewMaxF > 0, NewMaxF < ?MAX_FILES -> ok; check_size(halt, NewSize) when is_integer(NewSize), NewSize > 0 -> ok; check_size(halt, infinity) -> ok; check_size(_, _) -> not_ok. %%----------------------------------------------------------------- %% Increment a wrap log. %%----------------------------------------------------------------- %% -> {ok, log(), Lost} | {error, Error, log()} do_inc_wrap_file(L) -> #log{format = Format, extra = Handle} = L, case Format of internal -> case disk_log_1:mf_int_inc(Handle, L#log.head) of {ok, Handle2, Lost} -> {ok, L#log{extra = Handle2}, Lost}; {error, Error, Handle2} -> {error, Error, L#log{extra = Handle2}} end; external -> case disk_log_1:mf_ext_inc(Handle, L#log.head) of {ok, Handle2, Lost} -> {ok, L#log{extra = Handle2}, Lost}; {error, Error, Handle2} -> {error, Error, L#log{extra = Handle2}} end end. %%----------------------------------------------------------------- %% Open a log file. %%----------------------------------------------------------------- %% -> {ok, Reply, log(), Cnt} | Error %% Note: the header is always written, even if the log size is too small. do_open2(L, #arg{type = halt, format = internal, name = Name, file = FName, repair = Repair, size = Size, mode = Mode}) -> case catch disk_log_1:int_open(FName, Repair, Mode, L#log.head) of {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} -> Halt = #halt{fdc = FdC, curB = FileSize, size = Size}, {ok, {ok, Name}, L#log{format_type = halt_int, extra = Halt}, NoItems}; {repaired, FdC, Rec, Bad, FileSize} -> Halt = #halt{fdc = FdC, curB = FileSize, size = Size}, {ok, {repaired, Name, {recovered, Rec}, {badbytes, Bad}}, L#log{format_type = halt_int, extra = Halt}, Rec}; Error -> Error end; do_open2(L, #arg{type = wrap, format = internal, size = {MaxB, MaxF}, name = Name, repair = Repair, file = FName, mode = Mode, version = V}) -> case catch disk_log_1:mf_int_open(FName, MaxB, MaxF, Repair, Mode, L#log.head, V) of {ok, Handle, Cnt} -> {ok, {ok, Name}, L#log{type = wrap, format_type = wrap_int, extra = Handle}, Cnt}; {repaired, Handle, Rec, Bad, Cnt} -> {ok, {repaired, Name, {recovered, Rec}, {badbytes, Bad}}, L#log{type = wrap, format_type = wrap_int, extra = Handle}, Cnt}; Error -> Error end; do_open2(L, #arg{type = halt, format = external, file = FName, name = Name, size = Size, repair = Repair, mode = Mode}) -> case catch disk_log_1:ext_open(FName, Repair, Mode, L#log.head) of {ok, {_Alloc, FdC, {NoItems, _NoBytes}, FileSize}} -> Halt = #halt{fdc = FdC, curB = FileSize, size = Size}, {ok, {ok, Name}, L#log{format_type = halt_ext, format = external, extra = Halt}, NoItems}; Error -> Error end; do_open2(L, #arg{type = wrap, format = external, size = {MaxB, MaxF}, name = Name, file = FName, repair = Repair, mode = Mode, version = V}) -> case catch disk_log_1:mf_ext_open(FName, MaxB, MaxF, Repair, Mode, L#log.head, V) of {ok, Handle, Cnt} -> {ok, {ok, Name}, L#log{type = wrap, format_type = wrap_ext, extra = Handle, format = external}, Cnt}; Error -> Error end. %% -> closed | Error close_disk_log(undefined) -> closed; close_disk_log(L) -> unblock_pid(L), F = fun({Pid, _}) -> unlink(Pid) end, lists:foreach(F, L#log.owners), R = (catch close_disk_log2(L)), erase(log), R. -spec close_disk_log2(#log{}) -> 'closed'. % | throw(Error) close_disk_log2(L) -> case L of #log{format_type = halt_int, mode = Mode, extra = Halt} -> disk_log_1:close(Halt#halt.fdc, L#log.filename, Mode); #log{format_type = wrap_int, mode = Mode, extra = Handle} -> disk_log_1:mf_int_close(Handle, Mode); #log{format_type = halt_ext, extra = Halt} -> disk_log_1:fclose(Halt#halt.fdc, L#log.filename); #log{format_type = wrap_ext, mode = Mode, extra = Handle} -> disk_log_1:mf_ext_close(Handle, Mode) end, closed. do_format_error({error, Module, Error}) -> Module:format_error(Error); do_format_error({error, Reason}) -> do_format_error(Reason); do_format_error({Node, Error = {error, _Reason}}) -> lists:append(io_lib:format("~p: ", [Node]), do_format_error(Error)); do_format_error({badarg, Arg}) -> io_lib:format("The argument ~p is missing, not recognized or " "not wellformed~n", [Arg]); do_format_error({size_mismatch, OldSize, ArgSize}) -> io_lib:format("The given size ~p does not match the size ~p found on " "the disk log size file~n", [ArgSize, OldSize]); do_format_error({read_only_mode, Log}) -> io_lib:format("The disk log ~p has been opened read-only, but the " "requested operation needs read-write access~n", [Log]); do_format_error({format_external, Log}) -> io_lib:format("The requested operation can only be applied on internally " "formatted disk logs, but ~p is externally formatted~n", [Log]); do_format_error({blocked_log, Log}) -> io_lib:format("The blocked disk log ~p does not queue requests, or " "the log has been blocked by the calling process~n", [Log]); do_format_error({full, Log}) -> io_lib:format("The halt log ~p is full~n", [Log]); do_format_error({not_blocked, Log}) -> io_lib:format("The disk log ~p is not blocked~n", [Log]); do_format_error({not_owner, Pid}) -> io_lib:format("The pid ~p is not an owner of the disk log~n", [Pid]); do_format_error({not_blocked_by_pid, Log}) -> io_lib:format("The disk log ~p is blocked, but only the blocking pid " "can unblock a disk log~n", [Log]); do_format_error({new_size_too_small, Log, CurrentSize}) -> io_lib:format("The current size ~p of the halt log ~p is greater than the " "requested new size~n", [CurrentSize, Log]); do_format_error({halt_log, Log}) -> io_lib:format("The halt log ~p cannot be wrapped~n", [Log]); do_format_error({same_file_name, Log}) -> io_lib:format("Current and new file name of the disk log ~p " "are the same~n", [Log]); do_format_error({arg_mismatch, Option, FirstValue, ArgValue}) -> io_lib:format("The value ~p of the disk log option ~p does not match " "the current value ~p~n", [ArgValue, Option, FirstValue]); do_format_error({name_already_open, Log}) -> io_lib:format("The disk log ~p has already opened another file~n", [Log]); do_format_error({node_already_open, Log}) -> io_lib:format("The distribution option of the disk log ~p does not match " "already open log~n", [Log]); do_format_error({open_read_write, Log}) -> io_lib:format("The disk log ~p has already been opened read-write~n", [Log]); do_format_error({open_read_only, Log}) -> io_lib:format("The disk log ~p has already been opened read-only~n", [Log]); do_format_error({not_internal_wrap, Log}) -> io_lib:format("The requested operation cannot be applied since ~p is not " "an internally formatted disk log~n", [Log]); do_format_error(no_such_log) -> io_lib:format("There is no disk log with the given name~n", []); do_format_error(nonode) -> io_lib:format("There seems to be no node up that can handle " "the request~n", []); do_format_error(nodedown) -> io_lib:format("There seems to be no node up that can handle " "the request~n", []); do_format_error({corrupt_log_file, FileName}) -> io_lib:format("The disk log file \"~s\" contains corrupt data~n", [FileName]); do_format_error({need_repair, FileName}) -> io_lib:format("The disk log file \"~s\" has not been closed properly and " "needs repair~n", [FileName]); do_format_error({not_a_log_file, FileName}) -> io_lib:format("The file \"~s\" is not a wrap log file~n", [FileName]); do_format_error({invalid_header, InvalidHeader}) -> io_lib:format("The disk log header is not wellformed: ~p~n", [InvalidHeader]); do_format_error(end_of_log) -> io_lib:format("An attempt was made to step outside a not yet " "full wrap log~n", []); do_format_error({invalid_index_file, FileName}) -> io_lib:format("The wrap log index file \"~s\" cannot be used~n", [FileName]); do_format_error({no_continuation, BadCont}) -> io_lib:format("The term ~p is not a chunk continuation~n", [BadCont]); do_format_error({file_error, FileName, Reason}) -> io_lib:format("\"~s\": ~p~n", [FileName, file:format_error(Reason)]); do_format_error(E) -> io_lib:format("~p~n", [E]). do_info(L, Cnt) -> #log{name = Name, type = Type, mode = Mode, filename = File, extra = Extra, status = Status, owners = Owners, users = Users, format = Format, head = Head} = L, Size = case Type of wrap -> disk_log_1:get_wrap_size(Extra); halt -> Extra#halt.size end, Distribution = case disk_log_server:get_log_pids(Name) of {local, _Pid} -> local; {distributed, Pids} -> [node(P) || P <- Pids]; undefined -> % "cannot happen" [] end, RW = case Type of wrap when Mode =:= read_write -> #handle{curB = CurB, curF = CurF, cur_cnt = CurCnt, acc_cnt = AccCnt, noFull = NoFull, accFull = AccFull} = Extra, NewAccFull = AccFull + NoFull, NewExtra = Extra#handle{noFull = 0, accFull = NewAccFull}, put(log, L#log{extra = NewExtra}), [{no_current_bytes, CurB}, {no_current_items, CurCnt}, {no_items, Cnt}, {no_written_items, CurCnt + AccCnt}, {current_file, CurF}, {no_overflows, {NewAccFull, NoFull}} ]; halt when Mode =:= read_write -> IsFull = case get(is_full) of undefined -> false; _ -> true end, [{full, IsFull}, {no_written_items, Cnt} ]; _ when Mode =:= read_only -> [] end, HeadL = case Mode of read_write -> [{head, Head}]; read_only -> [] end, Common = [{name, Name}, {file, File}, {type, Type}, {format, Format}, {size, Size}, {items, Cnt}, % kept for "backward compatibility" (undocumented) {owners, Owners}, {users, Users}] ++ HeadL ++ [{mode, Mode}, {status, Status}, {node, node()}, {distributed, Distribution} ], Common ++ RW. do_block(Pid, QueueLogRecs, L) -> L2 = L#log{status = {blocked, QueueLogRecs}, blocked_by = Pid}, put(log, L2), case is_owner(Pid, L2) of {true, _Notify} -> ok; false -> link(Pid) end. do_unblock(Pid, L, S) when L#log.blocked_by =:= Pid -> do_unblock(L, S); do_unblock(_Pid, _L, S) -> S. do_unblock(L, S) -> unblock_pid(L), L2 = L#log{blocked_by = none, status = ok}, put(log, L2), %% Since the block request is synchronous, and the blocking %% process is the only process that can unblock, all requests in %% 'messages' will have been put in 'queue' before the unblock %% request is granted. [] = S#state.messages, % assertion S#state{queue = [], messages = lists:reverse(S#state.queue)}. -spec do_log(#log{}, [binary()]) -> integer() | {'error', _, integer()}. do_log(L, B) when L#log.type =:= halt -> #log{format = Format, extra = Halt} = L, #halt{curB = CurSize, size = Sz} = Halt, {Bs, BSize} = bsize(B, Format), case get(is_full) of true -> {error, {error, {full, L#log.name}}, 0}; undefined when Sz =:= infinity; CurSize + BSize =< Sz -> halt_write(Halt, L, B, Bs, BSize); undefined -> halt_write_full(L, B, Format, 0) end; do_log(L, B) when L#log.format_type =:= wrap_int -> case disk_log_1:mf_int_log(L#log.extra, B, L#log.head) of {ok, Handle, Logged, Lost, Wraps} -> notify_owners_wrap(Wraps), put(log, L#log{extra = Handle}), Logged - Lost; {ok, Handle, Logged} -> put(log, L#log{extra = Handle}), Logged; {error, Error, Handle, Logged, Lost} -> put(log, L#log{extra = Handle}), {error, Error, Logged - Lost} end; do_log(L, B) when L#log.format_type =:= wrap_ext -> case disk_log_1:mf_ext_log(L#log.extra, B, L#log.head) of {ok, Handle, Logged, Lost, Wraps} -> notify_owners_wrap(Wraps), put(log, L#log{extra = Handle}), Logged - Lost; {ok, Handle, Logged} -> put(log, L#log{extra = Handle}), Logged; {error, Error, Handle, Logged, Lost} -> put(log, L#log{extra = Handle}), {error, Error, Logged - Lost} end. bsize(B, external) -> {B, xsz(B, 0)}; bsize(B, internal) -> disk_log_1:logl(B). xsz([B|T], Sz) -> xsz(T, byte_size(B) + Sz); xsz([], Sz) -> Sz. halt_write_full(L, [Bin | Bins], Format, N) -> B = [Bin], {Bs, BSize} = bsize(B, Format), Halt = L#log.extra, #halt{curB = CurSize, size = Sz} = Halt, if CurSize + BSize =< Sz -> case halt_write(Halt, L, B, Bs, BSize) of N1 when is_integer(N1) -> halt_write_full(get(log), Bins, Format, N+N1); Error -> Error end; true -> halt_write_full(L, [], Format, N) end; halt_write_full(L, _Bs, _Format, N) -> put(is_full, true), notify_owners(full), {error, {error, {full, L#log.name}}, N}. halt_write(Halt, L, B, Bs, BSize) -> case disk_log_1:fwrite(Halt#halt.fdc, L#log.filename, Bs, BSize) of {ok, NewFdC} -> NCurB = Halt#halt.curB + BSize, NewHalt = Halt#halt{fdc = NewFdC, curB = NCurB}, put(log, L#log{extra = NewHalt}), length(B); {Error, NewFdC} -> put(log, L#log{extra = Halt#halt{fdc = NewFdC}}), {error, Error, 0} end. %% -> ok | Error do_write_cache(#log{filename = FName, type = halt, extra = Halt} = Log) -> {Reply, NewFdC} = disk_log_1:write_cache(Halt#halt.fdc, FName), put(log, Log#log{extra = Halt#halt{fdc = NewFdC}}), Reply; do_write_cache(#log{type = wrap, extra = Handle} = Log) -> {Reply, NewHandle} = disk_log_1:mf_write_cache(Handle), put(log, Log#log{extra = NewHandle}), Reply. %% -> ok | Error do_sync(#log{filename = FName, type = halt, extra = Halt} = Log) -> {Reply, NewFdC} = disk_log_1:sync(Halt#halt.fdc, FName), put(log, Log#log{extra = Halt#halt{fdc = NewFdC}}), Reply; do_sync(#log{type = wrap, extra = Handle} = Log) -> {Reply, NewHandle} = disk_log_1:mf_sync(Handle), put(log, Log#log{extra = NewHandle}), Reply. %% -> ok | Error | throw(Error) do_trunc(L, Head) when L#log.type =:= halt -> #log{filename = FName, extra = Halt} = L, FdC = Halt#halt.fdc, {Reply1, FdC2} = case L#log.format of internal -> disk_log_1:truncate(FdC, FName, Head); external -> case disk_log_1:truncate_at(FdC, FName, bof) of {ok, NFdC} when Head =:= none -> {ok, NFdC}; {ok, NFdC} -> {ok, H} = Head, disk_log_1:fwrite(NFdC, FName, H, byte_size(H)); R -> R end end, {Reply, NewHalt} = case disk_log_1:position(FdC2, FName, cur) of {ok, NewFdC, FileSize} when Reply1 =:= ok -> {ok, Halt#halt{fdc = NewFdC, curB = FileSize}}; {Reply2, NewFdC} -> {Reply2, Halt#halt{fdc = NewFdC}}; {ok, NewFdC, _} -> {Reply1, Halt#halt{fdc = NewFdC}} end, put(log, L#log{extra = NewHalt}), Reply; do_trunc(L, Head) when L#log.type =:= wrap -> Handle = L#log.extra, OldHead = L#log.head, {MaxB, MaxF} = disk_log_1:get_wrap_size(Handle), ok = do_change_size(L, {MaxB, 1}), NewLog = trunc_wrap((get(log))#log{head = Head}), %% Just to remove all files with suffix > 1: NewLog2 = trunc_wrap(NewLog), NewHandle = (NewLog2#log.extra)#handle{noFull = 0, accFull = 0}, do_change_size(NewLog2#log{extra = NewHandle, head = OldHead}, {MaxB, MaxF}). trunc_wrap(L) -> case do_inc_wrap_file(L) of {ok, L2, _Lost} -> L2; {error, Error, _L2} -> throw(Error) end. do_chunk(#log{format_type = halt_int, extra = Halt} = L, Pos, B, N) -> FdC = Halt#halt.fdc, {NewFdC, Reply} = case L#log.mode of read_only -> disk_log_1:chunk_read_only(FdC, L#log.filename, Pos, B, N); read_write -> disk_log_1:chunk(FdC, L#log.filename, Pos, B, N) end, put(log, L#log{extra = Halt#halt{fdc = NewFdC}}), Reply; do_chunk(#log{format_type = wrap_int, mode = read_only, extra = Handle} = Log, Pos, B, N) -> {NewHandle, Reply} = disk_log_1:mf_int_chunk_read_only(Handle, Pos, B, N), put(log, Log#log{extra = NewHandle}), Reply; do_chunk(#log{format_type = wrap_int, extra = Handle} = Log, Pos, B, N) -> {NewHandle, Reply} = disk_log_1:mf_int_chunk(Handle, Pos, B, N), put(log, Log#log{extra = NewHandle}), Reply; do_chunk(Log, _Pos, _B, _) -> {error, {format_external, Log#log.name}}. do_chunk_step(#log{format_type = wrap_int, extra = Handle}, Pos, N) -> disk_log_1:mf_int_chunk_step(Handle, Pos, N); do_chunk_step(Log, _Pos, _N) -> {error, {not_internal_wrap, Log#log.name}}. %% Inlined. replies(Pids, Reply) -> M = {disk_log, self(), Reply}, send_reply(Pids, M). send_reply(Pid, M) when is_pid(Pid) -> Pid ! M; send_reply([Pid | Pids], M) -> Pid ! M, send_reply(Pids, M); send_reply([], _M) -> ok. reply(To, Reply, S) -> To ! {disk_log, self(), Reply}, loop(S). req(Log, R) -> case disk_log_server:get_log_pids(Log) of {local, Pid} -> monitor_request(Pid, R); undefined -> {error, no_such_log}; {distributed, Pids} -> multi_req({self(), R}, Pids) end. multi_req(Msg, Pids) -> Refs = lists:map(fun(Pid) -> Ref = erlang:monitor(process, Pid), Pid ! Msg, {Pid, Ref} end, Pids), lists:foldl(fun({Pid, Ref}, Reply) -> receive {'DOWN', Ref, process, Pid, _Info} -> Reply; {disk_log, Pid, _Reply} -> erlang:demonitor(Ref), receive {'DOWN', Ref, process, Pid, _Reason} -> ok after 0 -> ok end end end, {error, nonode}, Refs). sreq(Log, R) -> case nearby_pid(Log, node()) of undefined -> {error, no_such_log}; Pid -> monitor_request(Pid, R) end. %% Local req - always talk to log on Node lreq(Log, R, Node) -> case nearby_pid(Log, Node) of Pid when is_pid(Pid), node(Pid) =:= Node -> monitor_request(Pid, R); _Else -> {error, no_such_log} end. nearby_pid(Log, Node) -> case disk_log_server:get_log_pids(Log) of undefined -> undefined; {local, Pid} -> Pid; {distributed, Pids} -> get_near_pid(Pids, Node) end. -spec get_near_pid([pid(),...], node()) -> pid(). get_near_pid([Pid | _], Node) when node(Pid) =:= Node -> Pid; get_near_pid([Pid], _ ) -> Pid; get_near_pid([_ | T], Node) -> get_near_pid(T, Node). monitor_request(Pid, Req) -> Ref = erlang:monitor(process, Pid), Pid ! {self(), Req}, receive {'DOWN', Ref, process, Pid, _Info} -> {error, no_such_log}; {disk_log, Pid, Reply} -> erlang:demonitor(Ref), receive {'DOWN', Ref, process, Pid, _Reason} -> Reply after 0 -> Reply end end. req2(Pid, R) -> monitor_request(Pid, R). merge_head(none, Head) -> Head; merge_head(Head, _) -> Head. %% -> List of extensions of existing files (no dot included) | throw(FileError) wrap_file_extensions(File) -> {_CurF, _CurFSz, _TotSz, NoOfFiles} = disk_log_1:read_index_file(File), Fs = if NoOfFiles >= 1 -> lists:seq(1, NoOfFiles); NoOfFiles =:= 0 -> [] end, Fun = fun(Ext) -> case file:read_file_info(add_ext(File, Ext)) of {ok, _} -> true; _Else -> false end end, lists:filter(Fun, ["idx", "siz" | Fs]). add_ext(File, Ext) -> lists:concat([File, ".", Ext]). notify(Log, R) -> case disk_log_server:get_log_pids(Log) of undefined -> {error, no_such_log}; {local, Pid} -> Pid ! R, ok; {distributed, Pids} -> lists:foreach(fun(Pid) -> Pid ! R end, Pids), ok end. notify_owners_wrap([]) -> ok; notify_owners_wrap([N | Wraps]) -> notify_owners({wrap, N}), notify_owners_wrap(Wraps). notify_owners(Note) -> L = get(log), Msg = {disk_log, node(), L#log.name, Note}, lists:foreach(fun({Pid, true}) -> Pid ! Msg; (_) -> ok end, L#log.owners). cache_error(S, Pids) -> Error = S#state.cache_error, replies(Pids, Error), state_err(S#state{cache_error = ok}, Error). state_ok(S) -> state_err(S, ok). -spec state_err(#state{}, dlog_state_error()) -> #state{}. state_err(S, Err) when S#state.error_status =:= Err -> S; state_err(S, Err) -> notify_owners({error_status, Err}), S#state{error_status = Err}.