diff options
author | Hans Bolinder <[email protected]> | 2016-12-01 08:32:18 +0100 |
---|---|---|
committer | Hans Bolinder <[email protected]> | 2016-12-01 08:32:18 +0100 |
commit | ca29c37928ad36cf4270fb3edffc2cf9752f2ed9 (patch) | |
tree | 119892da605f6be989d12c907c3362efdb59d7ab /lib/kernel | |
parent | d4701119d50f16dce30ff0a736c7a926e1bbaa04 (diff) | |
parent | 93ac8d2b4b2937b9b7651b5e043c31929f3b2c7c (diff) | |
download | otp-ca29c37928ad36cf4270fb3edffc2cf9752f2ed9.tar.gz otp-ca29c37928ad36cf4270fb3edffc2cf9752f2ed9.tar.bz2 otp-ca29c37928ad36cf4270fb3edffc2cf9752f2ed9.zip |
Merge branch 'richcarl/kernel/disk_log_tweaks/OTP-14057/PR-1245'
* richcarl/kernel/disk_log_tweaks/OTP-14057/PR-1245:
Pass log format through from handle()
Improve caching in disk_log
Use pattern matching for records where suitable
Eliminate more code duplication
Simplify for rflat
Clarify that the type for disk log data is iodata()
Pass through known size instead of recomputing
Use iolist_size instead of local function
Only read log format once in collect loop
Eliminate some code duplication
Rename internal function for clarity
Minor documentation cleanup
Diffstat (limited to 'lib/kernel')
-rw-r--r-- | lib/kernel/doc/src/disk_log.xml | 19 | ||||
-rw-r--r-- | lib/kernel/src/disk_log.erl | 394 | ||||
-rw-r--r-- | lib/kernel/src/disk_log.hrl | 4 | ||||
-rw-r--r-- | lib/kernel/src/disk_log_1.erl | 32 | ||||
-rw-r--r-- | lib/kernel/test/disk_log_SUITE.erl | 14 |
5 files changed, 216 insertions, 247 deletions
diff --git a/lib/kernel/doc/src/disk_log.xml b/lib/kernel/doc/src/disk_log.xml index 0b6ee1e6a5..aebeacee28 100644 --- a/lib/kernel/doc/src/disk_log.xml +++ b/lib/kernel/doc/src/disk_log.xml @@ -43,7 +43,7 @@ <taglist> <tag>halt logs</tag> <item><p>Appends items to a single file, which size can - be limited by the disk log module.</p></item> + be limited by the <c>disk_log</c> module.</p></item> <tag>wrap logs</tag> <item><p>Uses a sequence of wrap log files of limited size. As a wrap log file is filled up, further items are logged on to the next @@ -62,8 +62,8 @@ An item logged to an internally formatted log must not occupy more than 4 GB of disk space (the size must fit in 4 bytes).</p></item> <tag>external format</tag> - <item><p>Leaves it up to the user to read the logged deep byte lists. - The disk log module cannot repair externally formatted logs.</p></item> + <item><p>Leaves it up to the user to read and interpret the logged data. + The <c>disk_log</c> module cannot repair externally formatted logs.</p></item> </taglist> <p>For each open disk log, one process handles requests @@ -109,8 +109,7 @@ These functions log one or more Erlang terms. By prefixing each of the functions with a <c>b</c> (for "binary"), we get the corresponding <c>blog()</c> functions for the external format. - These functions log one or more deep lists of bytes or, alternatively, - binaries of deep lists of bytes. + These functions log one or more chunks of bytes. For example, to log the string <c>"hello"</c> in ASCII format, you can use <c>disk_log:blog(Log, "hello")</c>, or <c>disk_log:blog(Log, list_to_binary("hello"))</c>. The two @@ -219,9 +218,6 @@ <name name="dlog_head_opt"/> </datatype> <datatype> - <name name="dlog_byte"/> - </datatype> - <datatype> <name name="dlog_mode"/> </datatype> <datatype> @@ -234,9 +230,6 @@ </desc> </datatype> <datatype> - <name name="bytes"/> - </datatype> - <datatype> <name name="invalid_header"/> </datatype> <datatype> @@ -953,7 +946,7 @@ written first on the log file. If the log is a wrap log, the item <c><anno>Head</anno></c> is written first in each new file. <c><anno>Head</anno></c> is to be a term if the format is - <c>internal</c>, otherwise a deep list of bytes (or a binary). + <c>internal</c>, otherwise a sequence of bytes. Defaults to <c>none</c>, which means that no header is written first on the file. </p> @@ -965,7 +958,7 @@ The call <c>M:F(A)</c> is assumed to return <c>{ok, Head}</c>. The item <c>Head</c> is written first in each file. <c>Head</c> is to be a term if the format is - <c>internal</c>, otherwise a deep list of bytes (or a binary). + <c>internal</c>, otherwise a sequence of bytes. </p> </item> <tag><c>{mode, <anno>Mode</anno>}</c></tag> diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 9b44021872..2ade7fd77a 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -67,7 +67,7 @@ %%-define(PROFILE(C), C). -define(PROFILE(C), void). --compile({inline,[{log_loop,5},{log_end_sync,2},{replies,2},{rflat,1}]}). +-compile({inline,[{log_loop,6},{log_end_sync,2},{replies,2},{rflat,1}]}). %%%---------------------------------------------------------------------- %%% Contract type specifications @@ -75,8 +75,6 @@ -opaque continuation() :: #continuation{}. --type bytes() :: binary() | [byte()]. - -type file_error() :: term(). % XXX: refine -type invalid_header() :: term(). % XXX: refine @@ -127,28 +125,28 @@ open(A) -> Log :: log(), Term :: term(). log(Log, Term) -> - req(Log, {log, term_to_binary(Term)}). + req(Log, {log, internal, [term_to_binary(Term)]}). -spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when Log :: log(), - Bytes :: bytes(). + Bytes :: iodata(). blog(Log, Bytes) -> - req(Log, {blog, check_bytes(Bytes)}). + req(Log, {log, external, [ensure_binary(Bytes)]}). -spec log_terms(Log, TermList) -> ok | {error, Resaon :: log_error_rsn()} when Log :: log(), TermList :: [term()]. log_terms(Log, Terms) -> Bs = terms2bins(Terms), - req(Log, {log, Bs}). + req(Log, {log, internal, Bs}). -spec blog_terms(Log, BytesList) -> ok | {error, Reason :: log_error_rsn()} when Log :: log(), - BytesList :: [bytes()]. + BytesList :: [iodata()]. blog_terms(Log, Bytess) -> - Bs = check_bytes_list(Bytess, Bytess), - req(Log, {blog, Bs}). + Bs = ensure_binary_list(Bytess), + req(Log, {log, external, Bs}). -type notify_ret() :: 'ok' | {'error', 'no_such_log'}. @@ -156,27 +154,27 @@ blog_terms(Log, Bytess) -> Log :: log(), Term :: term(). alog(Log, Term) -> - notify(Log, {alog, term_to_binary(Term)}). + notify(Log, {alog, internal, [term_to_binary(Term)]}). -spec alog_terms(Log, TermList) -> notify_ret() when Log :: log(), TermList :: [term()]. alog_terms(Log, Terms) -> Bs = terms2bins(Terms), - notify(Log, {alog, Bs}). + notify(Log, {alog, internal, Bs}). -spec balog(Log, Bytes) -> notify_ret() when Log :: log(), - Bytes :: bytes(). + Bytes :: iodata(). balog(Log, Bytes) -> - notify(Log, {balog, check_bytes(Bytes)}). + notify(Log, {alog, external, [ensure_binary(Bytes)]}). -spec balog_terms(Log, ByteList) -> notify_ret() when Log :: log(), - ByteList :: [bytes()]. + ByteList :: [iodata()]. balog_terms(Log, Bytess) -> - Bs = check_bytes_list(Bytess, Bytess), - notify(Log, {balog, Bs}). + Bs = ensure_binary_list(Bytess), + notify(Log, {alog, external, Bs}). -type close_error_rsn() ::'no_such_log' | 'nonode' | {'file_error', file:filename(), file_error()}. @@ -219,9 +217,9 @@ truncate(Log, Head) -> -spec btruncate(Log, BHead) -> 'ok' | {'error', trunc_error_rsn()} when Log :: log(), - BHead :: bytes(). + BHead :: iodata(). btruncate(Log, Head) -> - req(Log, {truncate, {ok, check_bytes(Head)}, btruncate, 2}). + req(Log, {truncate, {ok, ensure_binary(Head)}, btruncate, 2}). -type reopen_error_rsn() :: no_such_log | nonode @@ -248,9 +246,9 @@ reopen(Log, NewFile, NewHead) -> -spec breopen(Log, File, BHead) -> 'ok' | {'error', reopen_error_rsn()} when Log :: log(), File :: file:filename(), - BHead :: bytes(). + BHead :: iodata(). breopen(Log, NewFile, NewHead) -> - req(Log, {reopen, NewFile, {ok, check_bytes(NewHead)}, breopen, 3}). + req(Log, {reopen, NewFile, {ok, ensure_binary(NewHead)}, breopen, 3}). -type inc_wrap_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} @@ -670,13 +668,12 @@ init(Parent, Server) -> process_flag(trap_exit, true), loop(#state{parent = Parent, server = Server}). -loop(State) when State#state.messages =:= [] -> +loop(#state{messages = []}=State) -> receive Message -> handle(Message, State) end; -loop(State) -> - [M | Ms] = State#state.messages, +loop(#state{messages = [M | Ms]}=State) -> handle(M, State#state{messages = Ms}). handle({From, write_cache}, S) when From =:= self() -> @@ -686,106 +683,79 @@ handle({From, write_cache}, S) when From =:= self() -> Error -> loop(S#state{cache_error = Error}) end; -handle({From, {log, B}}, S) -> +handle({From, {log, Format, B}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok, L#log.format =:= internal -> - log_loop(S, From, [B], [], iolist_size(B)); - L when L#log.status =:= ok, L#log.format =:= external -> + #log{status = ok, format=external}=L when Format =:= internal -> reply(From, {error, {format_external, L#log.name}}, S); - L when L#log.status =:= {blocked, false} -> - reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> - reply(From, {error, {blocked_log, L#log.name}}, S); - _ -> - loop(S#state{queue = [{From, {log, B}} | S#state.queue]}) - end; -handle({From, {blog, B}}, S) -> - case get(log) of - L when L#log.mode =:= read_only -> - reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok -> - log_loop(S, From, [B], [], iolist_size(B)); - L when L#log.status =:= {blocked, false} -> + #log{status = ok, format=LogFormat} -> + log_loop(S, From, [B], [], iolist_size(B), LogFormat); + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {blog, B}} | S#state.queue]}) + enqueue(Message, S) end; -handle({alog, B}, S) -> +handle({alog, Format, B}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only} -> notify_owners({read_only,B}), loop(S); - L when L#log.status =:= ok, L#log.format =:= internal -> - log_loop(S, [], [B], [], iolist_size(B)); - L when L#log.status =:= ok -> + #log{status = ok, format = external} when Format =:= internal -> notify_owners({format_external, B}), loop(S); - L when L#log.status =:= {blocked, false} -> + #log{status = ok, format=LogFormat} -> + log_loop(S, [], [B], [], iolist_size(B), LogFormat); + #log{status = {blocked, false}} -> notify_owners({blocked_log, B}), loop(S); _ -> - loop(S#state{queue = [{alog, B} | S#state.queue]}) + enqueue(Message, S) end; -handle({balog, B}, S) -> +handle({From, {block, QueueLogRecs}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> - notify_owners({read_only,B}), - loop(S); - L when L#log.status =:= ok -> - log_loop(S, [], [B], [], iolist_size(B)); - L when L#log.status =:= {blocked, false} -> - notify_owners({blocked_log, B}), - loop(S); - _ -> - loop(S#state{queue = [{balog, B} | S#state.queue]}) - end; -handle({From, {block, QueueLogRecs}}, S) -> - case get(log) of - L when L#log.status =:= ok -> + #log{status = ok}=L -> do_block(From, QueueLogRecs, L), reply(From, ok, S); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {block, QueueLogRecs}} | - S#state.queue]}) + enqueue(Message, S) end; handle({From, unblock}, S) -> case get(log) of - L when L#log.status =:= ok -> + #log{status = ok}=L -> reply(From, {error, {not_blocked, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> S2 = do_unblock(L, S), reply(From, ok, S2); L -> reply(From, {error, {not_blocked_by_pid, L#log.name}}, S) end; -handle({From, sync}, S) -> +handle({From, sync}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok -> - sync_loop([From], S); - L when L#log.status =:= {blocked, false} -> + #log{status = ok, format=LogFormat} -> + log_loop(S, [], [], [From], 0, LogFormat); + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, sync} | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {truncate, Head, F, A}}, S) -> +handle({From, {truncate, Head, F, A}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok -> + #log{status = ok}=L -> H = merge_head(Head, L#log.head), case catch do_trunc(L, H) of ok -> @@ -796,48 +766,46 @@ handle({From, {truncate, Head, F, A}}, S) -> Error -> do_exit(S, From, Error, ?failure(Error, F, A)) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {truncate, Head, F, A}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {chunk, Pos, B, N}}, S) -> +handle({From, {chunk, Pos, B, N}}=Message, S) -> case get(log) of - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok -> + #log{status = ok}=L -> R = do_chunk(L, Pos, B, N), reply(From, R, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> R = do_chunk(L, Pos, B, N), reply(From, R, S); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _L -> - loop(S#state{queue = [{From, {chunk, Pos, B, N}} | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {chunk_step, Pos, N}}, S) -> +handle({From, {chunk_step, Pos, N}}=Message, S) -> case get(log) of - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok -> + #log{status = ok}=L -> R = do_chunk_step(L, Pos, N), reply(From, R, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> R = do_chunk_step(L, Pos, N), reply(From, R, S); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {chunk_step, Pos, N}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {change_notify, Pid, NewNotify}}, S) -> +handle({From, {change_notify, Pid, NewNotify}}=Message, S) -> case get(log) of - L when L#log.status =:= ok -> + #log{status = ok}=L -> case do_change_notify(L, Pid, NewNotify) of {ok, L1} -> put(log, L1), @@ -845,39 +813,37 @@ handle({From, {change_notify, Pid, NewNotify}}, S) -> Error -> reply(From, Error, S) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {change_notify, Pid, NewNotify}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {change_header, NewHead}}, S) -> +handle({From, {change_header, NewHead}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok -> - case check_head(NewHead, L#log.format) of + #log{status = ok, format = Format}=L -> + case check_head(NewHead, Format) of {ok, Head} -> - put(log, L#log{head = mk_head(Head, L#log.format)}), + put(log, L#log{head = mk_head(Head, Format)}), reply(From, ok, S); Error -> reply(From, Error, S) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {change_header, NewHead}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {change_size, NewSize}}, S) -> +handle({From, {change_size, NewSize}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok -> + #log{status = ok}=L -> case check_size(L#log.type, NewSize) of ok -> case catch do_change_size(L, NewSize) of % does the put @@ -894,23 +860,22 @@ handle({From, {change_size, NewSize}}, S) -> not_ok -> reply(From, {error, {badarg, size}}, S) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {change_size, NewSize}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, inc_wrap_file}, S) -> +handle({From, inc_wrap_file}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.type =:= halt -> + #log{type = halt}=L -> reply(From, {error, {halt_log, L#log.name}}, S); - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok -> + #log{status = ok}=L -> case catch do_inc_wrap_file(L) of {ok, L2, Lost} -> put(log, L2), @@ -920,20 +885,22 @@ handle({From, inc_wrap_file}, S) -> put(log, L2), reply(From, Error, state_err(S, Error)) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, inc_wrap_file} | S#state.queue]}) + enqueue(Message, S) end; handle({From, {reopen, NewFile, Head, F, A}}, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok, L#log.filename =/= NewFile -> + #log{status = ok, filename = NewFile}=L -> + reply(From, {error, {same_file_name, L#log.name}}, S); + #log{status = ok}=L -> case catch close_disk_log2(L) of closed -> File = L#log.filename, @@ -966,8 +933,6 @@ handle({From, {reopen, NewFile, Head, F, A}}, S) -> Error -> do_exit(S, From, Error, ?failure(Error, F, A)) end; - L when L#log.status =:= ok -> - reply(From, {error, {same_file_name, L#log.name}}, S); L -> reply(From, {error, {blocked_log, L#log.name}}, S) end; @@ -1005,11 +970,11 @@ handle({From, close}, S) -> end; handle({From, info}, S) -> reply(From, do_info(get(log), S#state.cnt), S); -handle({'EXIT', From, Reason}, S) when From =:= S#state.parent -> +handle({'EXIT', From, Reason}, #state{parent=From}=S) -> %% Parent orders shutdown. _ = do_stop(S), exit(Reason); -handle({'EXIT', From, Reason}, S) when From =:= S#state.server -> +handle({'EXIT', From, Reason}, #state{server=From}=S) -> %% The server is gone. _ = do_stop(S), exit(Reason); @@ -1034,57 +999,59 @@ handle({system, From, Req}, S) -> handle(_, S) -> loop(S). -sync_loop(From, S) -> - log_loop(S, [], [], From, 0). +enqueue(Message, #state{queue = Queue}=S) -> + loop(S#state{queue = [Message | Queue]}). + +%% Collect further log and sync requests already in the mailbox or queued -define(MAX_LOOK_AHEAD, 64*1024). %% Inlined. -log_loop(#state{cache_error = CE}=S, Pids, _Bins, _Sync, _Sz) when CE =/= ok -> +log_loop(#state{cache_error = CE}=S, Pids, _Bins, _Sync, _Sz, _F) when CE =/= ok -> loop(cache_error(S, Pids)); -log_loop(#state{}=S, Pids, Bins, Sync, Sz) when Sz > ?MAX_LOOK_AHEAD -> - loop(log_end(S, Pids, Bins, Sync)); -log_loop(#state{messages = []}=S, Pids, Bins, Sync, Sz) -> - receive +log_loop(#state{}=S, Pids, Bins, Sync, Sz, _F) when Sz > ?MAX_LOOK_AHEAD -> + loop(log_end(S, Pids, Bins, Sync, Sz)); +log_loop(#state{messages = []}=S, Pids, Bins, Sync, Sz, F) -> + receive Message -> - log_loop(Message, Pids, Bins, Sync, Sz, S, get(log)) + log_loop(Message, Pids, Bins, Sync, Sz, F, S) after 0 -> - loop(log_end(S, Pids, Bins, Sync)) + loop(log_end(S, Pids, Bins, Sync, Sz)) end; -log_loop(#state{messages = [M | Ms]}=S, Pids, Bins, Sync, Sz) -> +log_loop(#state{messages = [M | Ms]}=S, Pids, Bins, Sync, Sz, F) -> S1 = S#state{messages = Ms}, - log_loop(M, Pids, Bins, Sync, Sz, S1, get(log)). + log_loop(M, Pids, Bins, Sync, Sz, F, S1). %% Items logged after the last sync request found are sync:ed as well. -log_loop({alog,B}, Pids, Bins, Sync, Sz, S, #log{format = internal}) -> - %% {alog, _} allowed for the internal format only. - log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B)); -log_loop({balog, B}, Pids, Bins, Sync, Sz, S, _L) -> - log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B)); -log_loop({From, {log, B}}, Pids, Bins, Sync, Sz, S, #log{format = internal}) -> - %% {log, _} allowed for the internal format only. - log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B)); -log_loop({From, {blog, B}}, Pids, Bins, Sync, Sz, S, _L) -> - log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B)); -log_loop({From, sync}, Pids, Bins, Sync, Sz, S, _L) -> - log_loop(S, Pids, Bins, [From | Sync], Sz); -log_loop(Message, Pids, Bins, Sync, _Sz, S, _L) -> - NS = log_end(S, Pids, Bins, Sync), +log_loop({alog, internal, B}, Pids, Bins, Sync, Sz, internal=F, S) -> + %% alog of terms allowed for the internal format only + log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B), F); +log_loop({alog, binary, B}, Pids, Bins, Sync, Sz, F, S) -> + log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B), F); +log_loop({From, {log, internal, B}}, Pids, Bins, Sync, Sz, internal=F, S) -> + %% log of terms allowed for the internal format only + log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B), F); +log_loop({From, {log, binary, B}}, Pids, Bins, Sync, Sz, F, S) -> + log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B), F); +log_loop({From, sync}, Pids, Bins, Sync, Sz, F, S) -> + log_loop(S, Pids, Bins, [From | Sync], Sz, F); +log_loop(Message, Pids, Bins, Sync, Sz, _F, S) -> + NS = log_end(S, Pids, Bins, Sync, Sz), handle(Message, NS). -log_end(S, [], [], Sync) -> +log_end(S, [], [], Sync, _Sz) -> log_end_sync(S, Sync); -log_end(S, Pids, Bins, Sync) -> - case do_log(get(log), rflat(Bins)) of +log_end(#state{cnt = Cnt}=S, Pids, Bins, Sync, Sz) -> + case do_log(get(log), rflat(Bins), Sz) of N when is_integer(N) -> ok = replies(Pids, ok), - S1 = (state_ok(S))#state{cnt = S#state.cnt+N}, + S1 = (state_ok(S))#state{cnt = Cnt + N}, log_end_sync(S1, Sync); {error, {error, {full, _Name}}, N} when Pids =:= [] -> - log_end_sync(state_ok(S#state{cnt = S#state.cnt + N}), Sync); + log_end_sync(state_ok(S#state{cnt = Cnt + N}), Sync); {error, Error, N} -> ok = replies(Pids, Error), - state_err(S#state{cnt = S#state.cnt + N}, Error) + state_err(S#state{cnt = Cnt + N}, Error) end. %% Inlined. @@ -1096,12 +1063,9 @@ log_end_sync(S, Sync) -> state_err(S, Res). %% Inlined. -rflat([B]=L) when is_binary(B) -> L; rflat([B]) -> B; rflat(B) -> rflat(B, []). -rflat([B | Bs], L) when is_binary(B) -> - rflat(Bs, [B | L]); rflat([B | Bs], L) -> rflat(Bs, B ++ L); rflat([], L) -> L. @@ -1138,17 +1102,17 @@ close_owner(Pid, L, S) -> S2 = do_unblock(Pid, get(log), S), unlink(Pid), do_close2(L1, S2). - + %% -> {stop, S} | {continue, S} -close_user(Pid, L, S) when L#log.users > 0 -> - L1 = L#log{users = L#log.users - 1}, +close_user(Pid, #log{users=Users}=L, S) when Users > 0 -> + L1 = L#log{users = Users - 1}, put(log, L1), S2 = do_unblock(Pid, get(log), S), do_close2(L1, S2); close_user(_Pid, _L, S) -> {continue, S}. -do_close2(L, S) when L#log.users =:= 0, L#log.owners =:= [] -> +do_close2(#log{users = 0, owners = []}, S) -> {stop, S}; do_close2(_L, S) -> {continue, S}. @@ -1227,14 +1191,14 @@ add_pid(Pid, Notify, L) when is_pid(Pid) -> add_pid(_NotAPid, _Notify, L) -> {ok, L#log{users = L#log.users + 1}}. -unblock_pid(L) when L#log.blocked_by =:= none -> +unblock_pid(#log{blocked_by = none}) -> ok; -unblock_pid(L) -> - case is_owner(L#log.blocked_by, L) of +unblock_pid(#log{blocked_by = Pid}=L) -> + case is_owner(Pid, L) of {true, _Notify} -> ok; false -> - unlink(L#log.blocked_by) + unlink(Pid) end. %% -> true | false @@ -1326,7 +1290,7 @@ do_open(A) -> end. mk_head({head, Term}, internal) -> {ok, term_to_binary(Term)}; -mk_head({head, Bytes}, external) -> {ok, check_bytes(Bytes)}; +mk_head({head, Bytes}, external) -> {ok, ensure_binary(Bytes)}; mk_head(H, _) -> H. terms2bins([T | Ts]) -> @@ -1334,30 +1298,29 @@ terms2bins([T | Ts]) -> terms2bins([]) -> []. -check_bytes_list([B | Bs], Bs0) when is_binary(B) -> - check_bytes_list(Bs, Bs0); -check_bytes_list([], Bs0) -> +ensure_binary_list(Bs) -> + ensure_binary_list(Bs, Bs). + +ensure_binary_list([B | Bs], Bs0) when is_binary(B) -> + ensure_binary_list(Bs, Bs0); +ensure_binary_list([], Bs0) -> Bs0; -check_bytes_list(_, Bs0) -> - check_bytes_list(Bs0). - -check_bytes_list([B | Bs]) when is_binary(B) -> - [B | check_bytes_list(Bs)]; -check_bytes_list([B | Bs]) -> - [list_to_binary(B) | check_bytes_list(Bs)]; -check_bytes_list([]) -> +ensure_binary_list(_, Bs0) -> + make_binary_list(Bs0). + +make_binary_list([B | Bs]) -> + [ensure_binary(B) | make_binary_list(Bs)]; +make_binary_list([]) -> []. -check_bytes(Binary) when is_binary(Binary) -> - Binary; -check_bytes(Bytes) -> - list_to_binary(Bytes). +ensure_binary(Bytes) -> + iolist_to_binary(Bytes). %%----------------------------------------------------------------- %% Change size of the logs in runtime. %%----------------------------------------------------------------- %% -> ok | {big, CurSize} | throw(Error) -do_change_size(L, NewSize) when L#log.type =:= halt -> +do_change_size(#log{type = halt}=L, NewSize) -> Halt = L#log.extra, CurB = Halt#halt.curB, NewLog = L#log{extra = Halt#halt{size = NewSize}}, @@ -1373,7 +1336,7 @@ do_change_size(L, NewSize) when L#log.type =:= halt -> true -> {big, CurB} end; -do_change_size(L, NewSize) when L#log.type =:= wrap -> +do_change_size(#log{type = wrap}=L, NewSize) -> #log{extra = Extra, version = Version} = L, {ok, Handle} = disk_log_1:change_size_wrap(Extra, NewSize, Version), erase(is_full), @@ -1388,7 +1351,7 @@ check_head({head_func, {M, F, A}}, _Format) when is_atom(M), is_list(A) -> {ok, {M, F, A}}; check_head({head, Head}, external) -> - case catch check_bytes(Head) of + case catch ensure_binary(Head) of {'EXIT', _} -> {error, {badarg, head}}; _ -> @@ -1674,7 +1637,7 @@ do_block(Pid, QueueLogRecs, L) -> link(Pid) end. -do_unblock(Pid, L, S) when L#log.blocked_by =:= Pid -> +do_unblock(Pid, #log{blocked_by = Pid}=L, S) -> do_unblock(L, S); do_unblock(_Pid, _L, S) -> S. @@ -1692,10 +1655,13 @@ do_unblock(L, S) -> -spec do_log(#log{}, [binary()]) -> integer() | {'error', _, integer()}. -do_log(L, B) when L#log.type =:= halt -> +do_log(L, B) -> + do_log(L, B, iolist_size(B)). + +do_log(#log{type = halt}=L, B, BSz) -> #log{format = Format, extra = Halt} = L, #halt{curB = CurSize, size = Sz} = Halt, - {Bs, BSize} = bsize(B, Format), + {Bs, BSize} = logl(B, Format, BSz), case get(is_full) of true -> {error, {error, {full, L#log.name}}, 0}; @@ -1704,7 +1670,7 @@ do_log(L, B) when L#log.type =:= halt -> undefined -> halt_write_full(L, B, Format, 0) end; -do_log(L, B) when L#log.format_type =:= wrap_int -> +do_log(#log{format_type = wrap_int}=L, B, _BSz) -> case disk_log_1:mf_int_log(L#log.extra, B, L#log.head) of {ok, Handle, Logged, Lost, Wraps} -> notify_owners_wrap(Wraps), @@ -1717,7 +1683,7 @@ do_log(L, B) when L#log.format_type =:= wrap_int -> put(log, L#log{extra = Handle}), {error, Error, Logged - Lost} end; -do_log(L, B) when L#log.format_type =:= wrap_ext -> +do_log(#log{format_type = wrap_ext}=L, B, _BSz) -> case disk_log_1:mf_ext_log(L#log.extra, B, L#log.head) of {ok, Handle, Logged, Lost, Wraps} -> notify_owners_wrap(Wraps), @@ -1731,17 +1697,16 @@ do_log(L, B) when L#log.format_type =:= wrap_ext -> {error, Error, Logged - Lost} end. -bsize(B, external) -> - {B, xsz(B, 0)}; -bsize(B, internal) -> +logl(B, external, undefined) -> + {B, iolist_size(B)}; +logl(B, external, Sz) -> + {B, Sz}; +logl(B, internal, _Sz) -> disk_log_1:logl(B). -xsz([B|T], Sz) -> xsz(T, byte_size(B) + Sz); -xsz([], Sz) -> Sz. - halt_write_full(L, [Bin | Bins], Format, N) -> B = [Bin], - {Bs, BSize} = bsize(B, Format), + {Bs, BSize} = logl(B, Format, undefined), Halt = L#log.extra, #halt{curB = CurSize, size = Sz} = Halt, if @@ -1793,7 +1758,7 @@ do_sync(#log{type = wrap, extra = Handle} = Log) -> Reply. %% -> ok | Error | throw(Error) -do_trunc(L, Head) when L#log.type =:= halt -> +do_trunc(#log{type = halt}=L, Head) -> #log{filename = FName, extra = Halt} = L, FdC = Halt#halt.fdc, {Reply1, FdC2} = @@ -1822,7 +1787,7 @@ do_trunc(L, Head) when L#log.type =:= halt -> end, put(log, L#log{extra = NewHalt}), Reply; -do_trunc(L, Head) when L#log.type =:= wrap -> +do_trunc(#log{type = wrap}=L, Head) -> Handle = L#log.extra, OldHead = L#log.head, {MaxB, MaxF} = disk_log_1:get_wrap_size(Handle), @@ -2016,8 +1981,7 @@ notify_owners(Note) -> (_) -> ok end, L#log.owners). -cache_error(S, Pids) -> - Error = S#state.cache_error, +cache_error(#state{cache_error=Error}=S, Pids) -> ok = replies(Pids, Error), state_err(S#state{cache_error = ok}, Error). diff --git a/lib/kernel/src/disk_log.hrl b/lib/kernel/src/disk_log.hrl index 3262d979ee..593dbb31ab 100644 --- a/lib/kernel/src/disk_log.hrl +++ b/lib/kernel/src/disk_log.hrl @@ -39,6 +39,7 @@ -define(MAX_FILES, 65000). -define(MAX_BYTES, ((1 bsl 64) - 1)). -define(MAX_CHUNK_SIZE, 65536). +-define(MAX_FWRITE_CACHE, 65536). %% Object defines -define(LOGMAGIC, <<1,2,3,4>>). @@ -54,11 +55,10 @@ %% Types -- alphabetically %%------------------------------------------------------------------------ --type dlog_byte() :: [dlog_byte()] | byte(). -type dlog_format() :: 'external' | 'internal'. -type dlog_format_type() :: 'halt_ext' | 'halt_int' | 'wrap_ext' | 'wrap_int'. -type dlog_head() :: 'none' | {'ok', binary()} | mfa(). --type dlog_head_opt() :: none | term() | binary() | [dlog_byte()]. +-type dlog_head_opt() :: none | term() | iodata(). -type log() :: term(). % XXX: refine -type dlog_mode() :: 'read_only' | 'read_write'. -type dlog_name() :: atom() | string(). diff --git a/lib/kernel/src/disk_log_1.erl b/lib/kernel/src/disk_log_1.erl index 2e61363aa6..d83c30f35f 100644 --- a/lib/kernel/src/disk_log_1.erl +++ b/lib/kernel/src/disk_log_1.erl @@ -1416,24 +1416,36 @@ open_truncate(FileName) -> %%% Functions that access files, and throw on error. --define(MAX, 16384). % bytes -define(TIMEOUT, 2000). % ms %% -> {Reply, cache()}; Reply = ok | Error -fwrite(#cache{c = []} = FdC, _FN, B, Size) -> +fwrite(FdC, _FN, _B, 0) -> + {ok, FdC}; % avoid starting a timer for empty writes +fwrite(#cache{fd = Fd, c = C, sz = Sz} = FdC, FileName, B, Size) -> + Sz1 = Sz + Size, + C1 = cache_append(C, B), + if Sz1 > ?MAX_FWRITE_CACHE -> + write_cache(Fd, FileName, C1); + true -> + maybe_start_timer(C), + {ok, FdC#cache{sz = Sz1, c = C1}} + end. + +cache_append([], B) -> B; +cache_append(C, B) -> [C | B]. + +%% if the cache was empty, start timer (unless it's already running) +maybe_start_timer([]) -> case get(write_cache_timer_is_running) of - true -> + true -> ok; - _ -> + _ -> put(write_cache_timer_is_running, true), erlang:send_after(?TIMEOUT, self(), {self(), write_cache}), ok - end, - {ok, FdC#cache{sz = Size, c = B}}; -fwrite(#cache{sz = Sz, c = C} = FdC, _FN, B, Size) when Sz < ?MAX -> - {ok, FdC#cache{sz = Sz+Size, c = [C | B]}}; -fwrite(#cache{fd = Fd, c = C}, FileName, B, _Size) -> - write_cache(Fd, FileName, [C | B]). + end; +maybe_start_timer(_C) -> + ok. fwrite_header(Fd, B, Size) -> {ok, #cache{fd = Fd, sz = Size, c = B}}. diff --git a/lib/kernel/test/disk_log_SUITE.erl b/lib/kernel/test/disk_log_SUITE.erl index f7ad9c0c04..23fe975ef7 100644 --- a/lib/kernel/test/disk_log_SUITE.erl +++ b/lib/kernel/test/disk_log_SUITE.erl @@ -421,7 +421,7 @@ halt_ro_alog(Conf) when is_list(Conf) -> halt_ro_alog_wait_notify(Log, T) -> Term = term_to_binary(T), receive - {disk_log, _, Log,{read_only, Term}} -> + {disk_log, _, Log,{read_only, [Term]}} -> ok; Other -> Other @@ -449,7 +449,7 @@ halt_ro_balog(Conf) when is_list(Conf) -> halt_ro_balog_wait_notify(Log, T) -> Term = list_to_binary(T), receive - {disk_log, _, Log,{read_only, Term}} -> + {disk_log, _, Log,{read_only, [Term]}} -> ok; Other -> Other @@ -1385,15 +1385,15 @@ blocked_notif(Conf) when is_list(Conf) -> "The requested operation" ++ _ = format_error(Error1), ok = disk_log:blog(n, B), ok = disk_log:alog(n, B), - rec(1, {disk_log, node(), n, {format_external, term_to_binary(B)}}), + rec(1, {disk_log, node(), n, {format_external, [term_to_binary(B)]}}), ok = disk_log:alog_terms(n, [B,B,B,B]), rec(1, {disk_log, node(), n, {format_external, lists:map(fun term_to_binary/1, [B,B,B,B])}}), ok = disk_log:block(n, false), ok = disk_log:alog(n, B), - rec(1, {disk_log, node(), n, {blocked_log, term_to_binary(B)}}), + rec(1, {disk_log, node(), n, {blocked_log, [term_to_binary(B)]}}), ok = disk_log:balog(n, B), - rec(1, {disk_log, node(), n, {blocked_log, list_to_binary(B)}}), + rec(1, {disk_log, node(), n, {blocked_log, [list_to_binary(B)]}}), ok = disk_log:balog_terms(n, [B,B,B,B]), disk_log:close(n), rec(1, {disk_log, node(), n, {blocked_log, @@ -4666,7 +4666,7 @@ other_groups(Conf) when is_list(Conf) -> ok. --define(MAX, 16384). % MAX in disk_log_1.erl +-define(MAX, ?MAX_FWRITE_CACHE). % as in disk_log_1.erl %% Evil cases such as closed file descriptor port. evil(Conf) when is_list(Conf) -> Dir = ?privdir(Conf), @@ -4690,7 +4690,7 @@ evil(Conf) when is_list(Conf) -> {size,?MAX+50},{format,external}]), [Fd] = erlang:ports() -- Ports0, {B,_} = x_mk_bytes(30), - ok = disk_log:blog(Log, <<0:(?MAX+1)/unit:8>>), + ok = disk_log:blog(Log, <<0:(?MAX-1)/unit:8>>), exit(Fd, kill), {error, {file_error,_,einval}} = disk_log:blog_terms(Log, [B,B]), ok= disk_log:close(Log), |