diff options
Diffstat (limited to 'lib/kernel/src/disk_log.erl')
-rw-r--r-- | lib/kernel/src/disk_log.erl | 252 |
1 files changed, 191 insertions, 61 deletions
diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 7f1b5f9ec6..9b8d2db437 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% Copyright Ericsson AB 1997-2011. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -70,9 +70,10 @@ %%% Contract type specifications %%%---------------------------------------------------------------------- +-opaque continuation() :: #continuation{}. + -type bytes() :: binary() | [byte()]. --type log() :: term(). % XXX: refine -type file_error() :: term(). % XXX: refine -type invalid_header() :: term(). % XXX: refine @@ -87,27 +88,30 @@ -type open_error_rsn() :: 'no_such_log' | {'badarg', term()} - | {'size_mismatch', dlog_size(), dlog_size()} - | {'arg_mismatch', dlog_optattr(), term(), term()} - | {'name_already_open', log()} - | {'open_read_write', log()} - | {'open_read_only', log()} - | {'need_repair', log()} - | {'not_a_log_file', string()} - | {'invalid_index_file', string()} + | {'size_mismatch', CurrentSize :: dlog_size(), + NewSize :: dlog_size()} + | {'arg_mismatch', OptionName :: dlog_optattr(), + CurrentValue :: term(), Value :: term()} + | {'name_already_open', Log :: log()} + | {'open_read_write', Log :: log()} + | {'open_read_only', Log :: log()} + | {'need_repair', Log :: log()} + | {'not_a_log_file', FileName :: file:filename()} + | {'invalid_index_file', FileName :: file:filename()} | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()} - | {'node_already_open', log()}. + | {'node_already_open', Log :: log()}. -type dist_error_rsn() :: 'nodedown' | open_error_rsn(). --type ret() :: {'ok', log()} - | {'repaired', log(), {'recovered', non_neg_integer()}, - {'badbytes', non_neg_integer()}}. +-type ret() :: {'ok', Log :: log()} + | {'repaired', Log :: log(), + {'recovered', Rec :: non_neg_integer()}, + {'badbytes', Bad :: non_neg_integer()}}. -type open_ret() :: ret() | {'error', open_error_rsn()}. -type dist_open_ret() :: {[{node(), ret()}], [{node(), {'error', dist_error_rsn()}}]}. --type all_open_ret() :: open_ret() | dist_open_ret(). --spec open(Args :: dlog_options()) -> all_open_ret(). +-spec open(ArgL) -> open_ret() | dist_open_ret() when + ArgL :: dlog_options(). open(A) -> disk_log_server:open(check_arg(A, #arg{options = A})). @@ -116,40 +120,57 @@ open(A) -> | {'full', log()} | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()}. --spec log(Log :: log(), Term :: term()) -> 'ok' | {'error', log_error_rsn()}. +-spec log(Log, Term) -> ok | {error, Reason :: log_error_rsn()} when + Log :: log(), + Term :: term(). log(Log, Term) -> req(Log, {log, term_to_binary(Term)}). --spec blog(Log :: log(), Bytes :: bytes()) -> 'ok' | {'error', log_error_rsn()}. +-spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when + Log :: log(), + Bytes :: bytes(). blog(Log, Bytes) -> req(Log, {blog, check_bytes(Bytes)}). --spec log_terms(Log :: log(), Terms :: [term()]) -> 'ok' | {'error', term()}. +-spec log_terms(Log, TermList) -> ok | {error, Resaon :: log_error_rsn()} when + Log :: log(), + TermList :: [term()]. log_terms(Log, Terms) -> Bs = terms2bins(Terms), req(Log, {log, Bs}). --spec blog_terms(Log :: log(), Bytes :: [bytes()]) -> 'ok' | {'error', term()}. +-spec blog_terms(Log, BytesList) -> + ok | {error, Reason :: log_error_rsn()} when + Log :: log(), + BytesList :: [bytes()]. blog_terms(Log, Bytess) -> Bs = check_bytes_list(Bytess, Bytess), req(Log, {blog, Bs}). -type notify_ret() :: 'ok' | {'error', 'no_such_log'}. --spec alog(Log :: log(), Term :: term()) -> notify_ret(). +-spec alog(Log, Term) -> notify_ret() when + Log :: log(), + Term :: term(). alog(Log, Term) -> notify(Log, {alog, term_to_binary(Term)}). --spec alog_terms(Log :: log(), Terms :: [term()]) -> notify_ret(). +-spec alog_terms(Log, TermList) -> notify_ret() when + Log :: log(), + TermList :: [term()]. alog_terms(Log, Terms) -> Bs = terms2bins(Terms), notify(Log, {alog, Bs}). --spec balog(Log :: log(), Bytes :: bytes()) -> notify_ret(). +-spec balog(Log, Bytes) -> notify_ret() when + Log :: log(), + Bytes :: bytes(). balog(Log, Bytes) -> notify(Log, {balog, check_bytes(Bytes)}). --spec balog_terms(Log :: log(), Bytes :: [bytes()]) -> notify_ret(). +-spec balog_terms(Log, ByteList) -> notify_ret() when + Log :: log(), + ByteList :: [bytes()]. balog_terms(Log, Bytess) -> Bs = check_bytes_list(Bytess, Bytess), notify(Log, {balog, Bs}). @@ -157,18 +178,22 @@ balog_terms(Log, Bytess) -> -type close_error_rsn() ::'no_such_log' | 'nonode' | {'file_error', file:filename(), file_error()}. --spec close(Log :: log()) -> 'ok' | {'error', close_error_rsn()}. +-spec close(Log) -> 'ok' | {'error', close_error_rsn()} when + Log :: log(). close(Log) -> req(Log, close). -type lclose_error_rsn() :: 'no_such_log' | {'file_error', file:filename(), file_error()}. --spec lclose(Log :: log()) -> 'ok' | {'error', lclose_error_rsn()}. +-spec lclose(Log) -> 'ok' | {'error', lclose_error_rsn()} when + Log :: log(). lclose(Log) -> lclose(Log, node()). --spec lclose(Log :: log(), Node :: node()) -> 'ok' | {'error', lclose_error_rsn()}. +-spec lclose(Log, Node) -> 'ok' | {'error', lclose_error_rsn()} when + Log :: log(), + Node :: node(). lclose(Log, Node) -> lreq(Log, close, Node). @@ -178,29 +203,49 @@ lclose(Log, Node) -> | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()}. --spec truncate(Log :: log()) -> 'ok' | {'error', trunc_error_rsn()}. +-spec truncate(Log) -> 'ok' | {'error', trunc_error_rsn()} when + Log :: log(). truncate(Log) -> req(Log, {truncate, none, truncate, 1}). --spec truncate(Log :: log(), Head :: term()) -> 'ok' | {'error', trunc_error_rsn()}. +-spec truncate(Log, Head) -> 'ok' | {'error', trunc_error_rsn()} when + Log :: log(), + Head :: term(). truncate(Log, Head) -> req(Log, {truncate, {ok, term_to_binary(Head)}, truncate, 2}). --spec btruncate(Log :: log(), Head :: bytes()) -> 'ok' | {'error', trunc_error_rsn()}. +-spec btruncate(Log, BHead) -> 'ok' | {'error', trunc_error_rsn()} when + Log :: log(), + BHead :: bytes(). btruncate(Log, Head) -> req(Log, {truncate, {ok, check_bytes(Head)}, btruncate, 2}). --spec reopen(Log :: log(), Filename :: file:filename()) -> 'ok' | {'error', term()}. +-type reopen_error_rsn() :: no_such_log + | nonode + | {read_only_mode, log()} + | {blocked_log, log()} + | {same_file_name, log()} | + {invalid_index_file, file:filename()} + | {invalid_header, invalid_header()} + | {'file_error', file:filename(), file_error()}. + +-spec reopen(Log, File) -> 'ok' | {'error', reopen_error_rsn()} when + Log :: log(), + File :: file:filename(). reopen(Log, NewFile) -> req(Log, {reopen, NewFile, none, reopen, 2}). --spec reopen(Log :: log(), Filename :: file:filename(), Head :: term()) -> - 'ok' | {'error', term()}. +-spec reopen(Log, File, Head) -> 'ok' | {'error', reopen_error_rsn()} when + Log :: log(), + File :: file:filename(), + Head :: term(). reopen(Log, NewFile, NewHead) -> req(Log, {reopen, NewFile, {ok, term_to_binary(NewHead)}, reopen, 3}). --spec breopen(Log :: log(), Filename :: file:filename(), Head :: bytes()) -> - 'ok' | {'error', term()}. +-spec breopen(Log, File, BHead) -> 'ok' | {'error', reopen_error_rsn()} when + Log :: log(), + File :: file:filename(), + BHead :: bytes(). breopen(Log, NewFile, NewHead) -> req(Log, {reopen, NewFile, {ok, check_bytes(NewHead)}, breopen, 3}). @@ -210,21 +255,36 @@ breopen(Log, NewFile, NewHead) -> | {'invalid_header', invalid_header()} | {'file_error', file:filename(), file_error()}. --spec inc_wrap_file(Log :: log()) -> 'ok' | {'error', inc_wrap_error_rsn()}. +-spec inc_wrap_file(Log) -> 'ok' | {'error', inc_wrap_error_rsn()} when + Log :: log(). inc_wrap_file(Log) -> req(Log, inc_wrap_file). --spec change_size(Log :: log(), Size :: dlog_size()) -> 'ok' | {'error', term()}. +-spec change_size(Log, Size) -> 'ok' | {'error', Reason} when + Log :: log(), + Size :: dlog_size(), + Reason :: no_such_log | nonode | {read_only_mode, Log} + | {blocked_log, Log} + | {new_size_too_small, CurrentSize :: pos_integer()} + | {badarg, size} + | {file_error, file:filename(), file_error()}. change_size(Log, NewSize) -> req(Log, {change_size, NewSize}). --spec change_notify(Log :: log(), Pid :: pid(), Notify :: boolean()) -> - 'ok' | {'error', term()}. +-spec change_notify(Log, Owner, Notify) -> 'ok' | {'error', Reason} when + Log :: log(), + Owner :: pid(), + Notify :: boolean(), + Reason :: no_such_log | nonode | {blocked_log, Log} + | {badarg, notify} | {not_owner, Owner}. change_notify(Log, Pid, NewNotify) -> req(Log, {change_notify, Pid, NewNotify}). --spec change_header(Log :: log(), Head :: {atom(), term()}) -> - 'ok' | {'error', term()}. +-spec change_header(Log, Header) -> 'ok' | {'error', Reason} when + Log :: log(), + Header :: {head, dlog_head_opt()} | {head_func, mfa()}, + Reason :: no_such_log | nonode | {read_only_mode, Log} + | {blocked_log, Log} | {badarg, head}. change_header(Log, NewHead) -> req(Log, {change_header, NewHead}). @@ -232,17 +292,21 @@ change_header(Log, NewHead) -> | {'blocked_log', log()} | {'file_error', file:filename(), file_error()}. --spec sync(Log :: log()) -> 'ok' | {'error', sync_error_rsn()}. +-spec sync(Log) -> 'ok' | {'error', sync_error_rsn()} when + Log :: log(). sync(Log) -> req(Log, sync). -type block_error_rsn() :: 'no_such_log' | 'nonode' | {'blocked_log', log()}. --spec block(Log :: log()) -> 'ok' | {'error', block_error_rsn()}. +-spec block(Log) -> 'ok' | {'error', block_error_rsn()} when + Log :: log(). block(Log) -> block(Log, true). --spec block(Log :: log(), QueueLogRecords :: boolean()) -> 'ok' | {'error', term()}. +-spec block(Log, QueueLogRecords) -> 'ok' | {'error', block_error_rsn()} when + Log :: log(), + QueueLogRecords :: boolean(). block(Log, QueueLogRecords) -> req(Log, {block, QueueLogRecords}). @@ -250,19 +314,46 @@ block(Log, QueueLogRecords) -> | {'not_blocked', log()} | {'not_blocked_by_pid', log()}. --spec unblock(Log :: log()) -> 'ok' | {'error', unblock_error_rsn()}. +-spec unblock(Log) -> 'ok' | {'error', unblock_error_rsn()} when + Log :: log(). unblock(Log) -> req(Log, unblock). --spec format_error(Error :: term()) -> string(). +-spec format_error(Error) -> io_lib:chars() when + Error :: term(). format_error(Error) -> do_format_error(Error). --spec info(Log :: log()) -> [{atom(), any()}] | {'error', term()}. +-type dlog_info() :: {name, Log :: log()} + | {file, File :: file:filename()} + | {type, Type :: dlog_type()} + | {format, Format :: dlog_format()} + | {size, Size :: dlog_size()} + | {mode, Mode :: dlog_mode()} + | {owners, [{pid(), Notify :: boolean()}]} + | {users, Users :: non_neg_integer()} + | {status, Status :: + ok | {blocked, QueueLogRecords :: boolean()}} + | {node, Node :: node()} + | {distributed, Dist :: local | [node()]} + | {head, Head :: none | {head, term()} | mfa()} + | {no_written_items, NoWrittenItems ::non_neg_integer()} + | {full, Full :: boolean} + | {no_current_bytes, non_neg_integer()} + | {no_current_items, non_neg_integer()} + | {no_items, non_neg_integer()} + | {current_file, pos_integer()} + | {no_overflows, {SinceLogWasOpened :: non_neg_integer(), + SinceLastInfo :: non_neg_integer()}}. +-spec info(Log) -> InfoList | {'error', no_such_log} when + Log :: log(), + InfoList :: [dlog_info()]. info(Log) -> sreq(Log, info). --spec pid2name(Pid :: pid()) -> {'ok', log()} | 'undefined'. +-spec pid2name(Pid) -> {'ok', Log} | 'undefined' when + Pid :: pid(), + Log :: log(). pid2name(Pid) -> disk_log_server:start(), case ets:lookup(?DISK_LOG_PID_TABLE, Pid) of @@ -274,13 +365,31 @@ pid2name(Pid) -> %% It retuns a {Cont2, ObjList} | eof | {error, Reason} %% The initial continuation is the atom 'start' --spec chunk(Log :: log(), Cont :: any()) -> - {'error', term()} | 'eof' | {any(), [any()]} | {any(), [any()], integer()}. +-type chunk_error_rsn() :: no_such_log + | {format_external, log()} + | {blocked_log, log()} + | {badarg, continuation} + | {not_internal_wrap, log()} + | {corrupt_log_file, FileName :: file:filename()} + | {file_error, file:filename(), file_error()}. + +-type chunk_ret() :: {Continuation2 :: continuation(), Terms :: [term()]} + | {Continuation2 :: continuation(), + Terms :: [term()], + Badbytes :: non_neg_integer()} + | eof + | {error, Reason :: chunk_error_rsn()}. + +-spec chunk(Log, Continuation) -> chunk_ret() when + Log :: log(), + Continuation :: start | continuation(). chunk(Log, Cont) -> chunk(Log, Cont, infinity). --spec chunk(Log :: log(), Cont :: any(), N :: pos_integer() | 'infinity') -> - {'error', term()} | 'eof' | {any(), [any()]} | {any(), [any()], integer()}. +-spec chunk(Log, Continuation, N) -> chunk_ret() when + Log :: log(), + Continuation :: start | continuation(), + N :: pos_integer() | infinity. chunk(Log, Cont, infinity) -> %% There cannot be more than ?MAX_CHUNK_SIZE terms in a chunk. ichunk(Log, Cont, ?MAX_CHUNK_SIZE); @@ -346,13 +455,24 @@ ichunk_bad_end([B | Bs], Mode, Log, C, Bad, A) -> ichunk_bad_end(Bs, Mode, Log, C, Bad, [T | A]) end. --spec bchunk(Log :: log(), Cont :: any()) -> - {'error', any()} | 'eof' | {any(), [binary()]} | {any(), [binary()], integer()}. +-type bchunk_ret() :: {Continuation2 :: continuation(), + Binaries :: [binary()]} + | {Continuation2 :: continuation(), + Binaries :: [binary()], + Badbytes :: non_neg_integer()} + | eof + | {error, Reason :: chunk_error_rsn()}. + +-spec bchunk(Log, Continuation) -> bchunk_ret() when + Log :: log(), + Continuation :: start | continuation(). bchunk(Log, Cont) -> bchunk(Log, Cont, infinity). --spec bchunk(Log :: log(), Cont :: any(), N :: 'infinity' | pos_integer()) -> - {'error', any()} | 'eof' | {any(), [binary()]} | {any(), [binary()], integer()}. +-spec bchunk(Log, Continuation, N) -> bchunk_ret() when + Log :: log(), + Continuation :: start | continuation(), + N :: pos_integer() | infinity. bchunk(Log, Cont, infinity) -> %% There cannot be more than ?MAX_CHUNK_SIZE terms in a chunk. bichunk(Log, Cont, ?MAX_CHUNK_SIZE); @@ -375,8 +495,14 @@ bichunk_end({C = #continuation{}, R, Bad}) -> bichunk_end(R) -> R. --spec chunk_step(Log :: log(), Cont :: any(), N :: integer()) -> - {'ok', any()} | {'error', term()}. +-spec chunk_step(Log, Continuation, Step) -> + {'ok', any()} | {'error', Reason} when + Log :: log(), + Continuation :: start | continuation(), + Step :: integer(), + Reason :: no_such_log | end_of_log | {format_external, Log} + | {blocked_log, Log} | {badarg, continuation} + | {file_error, file:filename(), file_error()}. chunk_step(Log, Cont, N) when is_integer(N) -> ichunk_step(Log, Cont, N). @@ -387,14 +513,18 @@ ichunk_step(_Log, More, N) when is_record(More, continuation) -> ichunk_step(_Log, _, _) -> {error, {badarg, continuation}}. --spec chunk_info(More :: any()) -> - [{'node', node()},...] | {'error', {'no_continuation', any()}}. +-spec chunk_info(Continuation) -> InfoList | {error, Reason} when + Continuation :: continuation(), + InfoList :: [{node, Node :: node()}, ...], + Reason :: {no_continuation, Continuation}. chunk_info(More = #continuation{}) -> [{node, node(More#continuation.pid)}]; chunk_info(BadCont) -> {error, {no_continuation, BadCont}}. --spec accessible_logs() -> {[_], [_]}. +-spec accessible_logs() -> {[LocalLog], [DistributedLog]} when + LocalLog :: log(), + DistributedLog :: log(). accessible_logs() -> disk_log_server:accessible_logs(). |