From 3097a53cc6bc3728f72c3873713f42d9c591ba0c Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Tue, 15 Nov 2016 10:40:30 +0100 Subject: Minor documentation cleanup --- lib/kernel/doc/src/disk_log.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/kernel/doc/src/disk_log.xml b/lib/kernel/doc/src/disk_log.xml index 0b6ee1e6a5..3c9bc7f6e8 100644 --- a/lib/kernel/doc/src/disk_log.xml +++ b/lib/kernel/doc/src/disk_log.xml @@ -43,7 +43,7 @@ halt logs

Appends items to a single file, which size can - be limited by the disk log module.

+ be limited by the disk_log module.

wrap logs

Uses a sequence of wrap log files of limited size. As a wrap log file is filled up, further items are logged on to the next @@ -62,8 +62,8 @@ An item logged to an internally formatted log must not occupy more than 4 GB of disk space (the size must fit in 4 bytes).

external format -

Leaves it up to the user to read the logged deep byte lists. - The disk log module cannot repair externally formatted logs.

+

Leaves it up to the user to read and interpret the logged data. + The disk_log module cannot repair externally formatted logs.

For each open disk log, one process handles requests -- cgit v1.2.3 From a59be645027272e96acdddf5cf9710aeda1e11de Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Tue, 15 Nov 2016 11:53:07 +0100 Subject: Rename internal function for clarity --- lib/kernel/src/disk_log.erl | 43 ++++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 9b44021872..515caf98a7 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -133,7 +133,7 @@ log(Log, Term) -> Log :: log(), Bytes :: bytes(). blog(Log, Bytes) -> - req(Log, {blog, check_bytes(Bytes)}). + req(Log, {blog, ensure_binary(Bytes)}). -spec log_terms(Log, TermList) -> ok | {error, Resaon :: log_error_rsn()} when Log :: log(), @@ -147,7 +147,7 @@ log_terms(Log, Terms) -> Log :: log(), BytesList :: [bytes()]. blog_terms(Log, Bytess) -> - Bs = check_bytes_list(Bytess, Bytess), + Bs = ensure_binary_list(Bytess), req(Log, {blog, Bs}). -type notify_ret() :: 'ok' | {'error', 'no_such_log'}. @@ -169,13 +169,13 @@ alog_terms(Log, Terms) -> Log :: log(), Bytes :: bytes(). balog(Log, Bytes) -> - notify(Log, {balog, check_bytes(Bytes)}). + notify(Log, {balog, ensure_binary(Bytes)}). -spec balog_terms(Log, ByteList) -> notify_ret() when Log :: log(), ByteList :: [bytes()]. balog_terms(Log, Bytess) -> - Bs = check_bytes_list(Bytess, Bytess), + Bs = ensure_binary_list(Bytess), notify(Log, {balog, Bs}). -type close_error_rsn() ::'no_such_log' | 'nonode' @@ -221,7 +221,7 @@ truncate(Log, Head) -> Log :: log(), BHead :: bytes(). btruncate(Log, Head) -> - req(Log, {truncate, {ok, check_bytes(Head)}, btruncate, 2}). + req(Log, {truncate, {ok, ensure_binary(Head)}, btruncate, 2}). -type reopen_error_rsn() :: no_such_log | nonode @@ -250,7 +250,7 @@ reopen(Log, NewFile, NewHead) -> File :: file:filename(), BHead :: bytes(). breopen(Log, NewFile, NewHead) -> - req(Log, {reopen, NewFile, {ok, check_bytes(NewHead)}, breopen, 3}). + req(Log, {reopen, NewFile, {ok, ensure_binary(NewHead)}, breopen, 3}). -type inc_wrap_error_rsn() :: 'no_such_log' | 'nonode' | {'read_only_mode', log()} @@ -1326,7 +1326,7 @@ do_open(A) -> end. mk_head({head, Term}, internal) -> {ok, term_to_binary(Term)}; -mk_head({head, Bytes}, external) -> {ok, check_bytes(Bytes)}; +mk_head({head, Bytes}, external) -> {ok, ensure_binary(Bytes)}; mk_head(H, _) -> H. terms2bins([T | Ts]) -> @@ -1334,23 +1334,24 @@ terms2bins([T | Ts]) -> terms2bins([]) -> []. -check_bytes_list([B | Bs], Bs0) when is_binary(B) -> - check_bytes_list(Bs, Bs0); -check_bytes_list([], Bs0) -> +ensure_binary_list(Bs) -> + ensure_binary_list(Bs, Bs). + +ensure_binary_list([B | Bs], Bs0) when is_binary(B) -> + ensure_binary_list(Bs, Bs0); +ensure_binary_list([], Bs0) -> Bs0; -check_bytes_list(_, Bs0) -> - check_bytes_list(Bs0). - -check_bytes_list([B | Bs]) when is_binary(B) -> - [B | check_bytes_list(Bs)]; -check_bytes_list([B | Bs]) -> - [list_to_binary(B) | check_bytes_list(Bs)]; -check_bytes_list([]) -> +ensure_binary_list(_, Bs0) -> + make_binary_list(Bs0). + +make_binary_list([B | Bs]) -> + [ensure_binary(B) | make_binary_list(Bs)]; +make_binary_list([]) -> []. -check_bytes(Binary) when is_binary(Binary) -> +ensure_binary(Binary) when is_binary(Binary) -> Binary; -check_bytes(Bytes) -> +ensure_binary(Bytes) -> list_to_binary(Bytes). %%----------------------------------------------------------------- @@ -1388,7 +1389,7 @@ check_head({head_func, {M, F, A}}, _Format) when is_atom(M), is_list(A) -> {ok, {M, F, A}}; check_head({head, Head}, external) -> - case catch check_bytes(Head) of + case catch ensure_binary(Head) of {'EXIT', _} -> {error, {badarg, head}}; _ -> -- cgit v1.2.3 From a46e95598f6c587277671980115a4c61ff08c9cd Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Tue, 15 Nov 2016 14:21:29 +0100 Subject: Eliminate some code duplication --- lib/kernel/src/disk_log.erl | 63 +++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 515caf98a7..312d07515b 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -686,7 +686,7 @@ handle({From, write_cache}, S) when From =:= self() -> Error -> loop(S#state{cache_error = Error}) end; -handle({From, {log, B}}, S) -> +handle({From, {log, B}}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); @@ -699,9 +699,9 @@ handle({From, {log, B}}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {log, B}} | S#state.queue]}) - end; -handle({From, {blog, B}}, S) -> + enqueue(Message, S) + end; +handle({From, {blog, B}}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); @@ -712,9 +712,9 @@ handle({From, {blog, B}}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {blog, B}} | S#state.queue]}) + enqueue(Message, S) end; -handle({alog, B}, S) -> +handle({alog, B}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> notify_owners({read_only,B}), @@ -728,9 +728,9 @@ handle({alog, B}, S) -> notify_owners({blocked_log, B}), loop(S); _ -> - loop(S#state{queue = [{alog, B} | S#state.queue]}) + enqueue(Message, S) end; -handle({balog, B}, S) -> +handle({balog, B}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> notify_owners({read_only,B}), @@ -741,9 +741,9 @@ handle({balog, B}, S) -> notify_owners({blocked_log, B}), loop(S); _ -> - loop(S#state{queue = [{balog, B} | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {block, QueueLogRecs}}, S) -> +handle({From, {block, QueueLogRecs}}=Message, S) -> case get(log) of L when L#log.status =:= ok -> do_block(From, QueueLogRecs, L), @@ -753,8 +753,7 @@ handle({From, {block, QueueLogRecs}}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {block, QueueLogRecs}} | - S#state.queue]}) + enqueue(Message, S) end; handle({From, unblock}, S) -> case get(log) of @@ -766,7 +765,7 @@ handle({From, unblock}, S) -> L -> reply(From, {error, {not_blocked_by_pid, L#log.name}}, S) end; -handle({From, sync}, S) -> +handle({From, sync}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); @@ -777,9 +776,9 @@ handle({From, sync}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, sync} | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {truncate, Head, F, A}}, S) -> +handle({From, {truncate, Head, F, A}}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); @@ -801,10 +800,9 @@ handle({From, {truncate, Head, F, A}}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {truncate, Head, F, A}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {chunk, Pos, B, N}}, S) -> +handle({From, {chunk, Pos, B, N}}=Message, S) -> case get(log) of L when L#log.status =:= ok, S#state.cache_error =/= ok -> loop(cache_error(S, [From])); @@ -817,9 +815,9 @@ handle({From, {chunk, Pos, B, N}}, S) -> L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); _L -> - loop(S#state{queue = [{From, {chunk, Pos, B, N}} | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {chunk_step, Pos, N}}, S) -> +handle({From, {chunk_step, Pos, N}}=Message, S) -> case get(log) of L when L#log.status =:= ok, S#state.cache_error =/= ok -> loop(cache_error(S, [From])); @@ -832,10 +830,9 @@ handle({From, {chunk_step, Pos, N}}, S) -> L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {chunk_step, Pos, N}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {change_notify, Pid, NewNotify}}, S) -> +handle({From, {change_notify, Pid, NewNotify}}=Message, S) -> case get(log) of L when L#log.status =:= ok -> case do_change_notify(L, Pid, NewNotify) of @@ -850,10 +847,9 @@ handle({From, {change_notify, Pid, NewNotify}}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {change_notify, Pid, NewNotify}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {change_header, NewHead}}, S) -> +handle({From, {change_header, NewHead}}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); @@ -870,10 +866,9 @@ handle({From, {change_header, NewHead}}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {change_header, NewHead}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, {change_size, NewSize}}, S) -> +handle({From, {change_size, NewSize}}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); @@ -899,10 +894,9 @@ handle({From, {change_size, NewSize}}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, {change_size, NewSize}} - | S#state.queue]}) + enqueue(Message, S) end; -handle({From, inc_wrap_file}, S) -> +handle({From, inc_wrap_file}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); @@ -925,7 +919,7 @@ handle({From, inc_wrap_file}, S) -> L when L#log.blocked_by =:= From -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> - loop(S#state{queue = [{From, inc_wrap_file} | S#state.queue]}) + enqueue(Message, S) end; handle({From, {reopen, NewFile, Head, F, A}}, S) -> case get(log) of @@ -1034,6 +1028,9 @@ handle({system, From, Req}, S) -> handle(_, S) -> loop(S). +enqueue(Message, S) -> + loop(S#state{queue = [Message | S#state.queue]}). + sync_loop(From, S) -> log_loop(S, [], [], From, 0). -- cgit v1.2.3 From b653dc18fa91e68021b28bef37e942f7fa7f3809 Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Tue, 15 Nov 2016 15:56:24 +0100 Subject: Only read log format once in collect loop --- lib/kernel/src/disk_log.erl | 54 +++++++++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 312d07515b..d06429d629 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -67,7 +67,7 @@ %%-define(PROFILE(C), C). -define(PROFILE(C), void). --compile({inline,[{log_loop,5},{log_end_sync,2},{replies,2},{rflat,1}]}). +-compile({inline,[{log_loop,6},{log_end_sync,2},{replies,2},{rflat,1}]}). %%%---------------------------------------------------------------------- %%% Contract type specifications @@ -691,7 +691,7 @@ handle({From, {log, B}}=Message, S) -> L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok, L#log.format =:= internal -> - log_loop(S, From, [B], [], iolist_size(B)); + log_loop(S, From, [B], []); L when L#log.status =:= ok, L#log.format =:= external -> reply(From, {error, {format_external, L#log.name}}, S); L when L#log.status =:= {blocked, false} -> @@ -706,7 +706,7 @@ handle({From, {blog, B}}=Message, S) -> L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok -> - log_loop(S, From, [B], [], iolist_size(B)); + log_loop(S, From, [B], []); L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> @@ -720,7 +720,7 @@ handle({alog, B}=Message, S) -> notify_owners({read_only,B}), loop(S); L when L#log.status =:= ok, L#log.format =:= internal -> - log_loop(S, [], [B], [], iolist_size(B)); + log_loop(S, [], [B], []); L when L#log.status =:= ok -> notify_owners({format_external, B}), loop(S); @@ -736,7 +736,7 @@ handle({balog, B}=Message, S) -> notify_owners({read_only,B}), loop(S); L when L#log.status =:= ok -> - log_loop(S, [], [B], [], iolist_size(B)); + log_loop(S, [], [B], []); L when L#log.status =:= {blocked, false} -> notify_owners({blocked_log, B}), loop(S); @@ -770,7 +770,7 @@ handle({From, sync}=Message, S) -> L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok -> - sync_loop([From], S); + log_loop(S, [], [], [From]); L when L#log.status =:= {blocked, false} -> reply(From, {error, {blocked_log, L#log.name}}, S); L when L#log.blocked_by =:= From -> @@ -1031,41 +1031,43 @@ handle(_, S) -> enqueue(Message, S) -> loop(S#state{queue = [Message | S#state.queue]}). -sync_loop(From, S) -> - log_loop(S, [], [], From, 0). +%% Collect further log and sync requests already in the mailbox or queued -define(MAX_LOOK_AHEAD, 64*1024). +log_loop(S, Pids, Bins, Sync) -> + log_loop(S, Pids, Bins, Sync, iolist_size(Bins), (get(log))#log.format). + %% Inlined. -log_loop(#state{cache_error = CE}=S, Pids, _Bins, _Sync, _Sz) when CE =/= ok -> +log_loop(#state{cache_error = CE}=S, Pids, _Bins, _Sync, _Sz, _F) when CE =/= ok -> loop(cache_error(S, Pids)); -log_loop(#state{}=S, Pids, Bins, Sync, Sz) when Sz > ?MAX_LOOK_AHEAD -> +log_loop(#state{}=S, Pids, Bins, Sync, Sz, _F) when Sz > ?MAX_LOOK_AHEAD -> loop(log_end(S, Pids, Bins, Sync)); -log_loop(#state{messages = []}=S, Pids, Bins, Sync, Sz) -> - receive +log_loop(#state{messages = []}=S, Pids, Bins, Sync, Sz, F) -> + receive Message -> - log_loop(Message, Pids, Bins, Sync, Sz, S, get(log)) + log_loop(Message, Pids, Bins, Sync, Sz, F, S) after 0 -> loop(log_end(S, Pids, Bins, Sync)) end; -log_loop(#state{messages = [M | Ms]}=S, Pids, Bins, Sync, Sz) -> +log_loop(#state{messages = [M | Ms]}=S, Pids, Bins, Sync, Sz, F) -> S1 = S#state{messages = Ms}, - log_loop(M, Pids, Bins, Sync, Sz, S1, get(log)). + log_loop(M, Pids, Bins, Sync, Sz, F, S1). %% Items logged after the last sync request found are sync:ed as well. -log_loop({alog,B}, Pids, Bins, Sync, Sz, S, #log{format = internal}) -> +log_loop({alog,B}, Pids, Bins, Sync, Sz, internal=F, S) -> %% {alog, _} allowed for the internal format only. - log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B)); -log_loop({balog, B}, Pids, Bins, Sync, Sz, S, _L) -> - log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B)); -log_loop({From, {log, B}}, Pids, Bins, Sync, Sz, S, #log{format = internal}) -> + log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B), F); +log_loop({balog, B}, Pids, Bins, Sync, Sz, F, S) -> + log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B), F); +log_loop({From, {log, B}}, Pids, Bins, Sync, Sz, internal=F, S) -> %% {log, _} allowed for the internal format only. - log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B)); -log_loop({From, {blog, B}}, Pids, Bins, Sync, Sz, S, _L) -> - log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B)); -log_loop({From, sync}, Pids, Bins, Sync, Sz, S, _L) -> - log_loop(S, Pids, Bins, [From | Sync], Sz); -log_loop(Message, Pids, Bins, Sync, _Sz, S, _L) -> + log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B), F); +log_loop({From, {blog, B}}, Pids, Bins, Sync, Sz, F, S) -> + log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B), F); +log_loop({From, sync}, Pids, Bins, Sync, Sz, F, S) -> + log_loop(S, Pids, Bins, [From | Sync], Sz, F); +log_loop(Message, Pids, Bins, Sync, _Sz, _F, S) -> NS = log_end(S, Pids, Bins, Sync), handle(Message, NS). -- cgit v1.2.3 From 6abc37b32ac6965ab780cd6fda5667585caaa711 Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Tue, 15 Nov 2016 16:53:38 +0100 Subject: Use iolist_size instead of local function --- lib/kernel/src/disk_log.erl | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index d06429d629..8dd9975b23 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -1695,7 +1695,7 @@ do_unblock(L, S) -> do_log(L, B) when L#log.type =:= halt -> #log{format = Format, extra = Halt} = L, #halt{curB = CurSize, size = Sz} = Halt, - {Bs, BSize} = bsize(B, Format), + {Bs, BSize} = logl(B, Format), case get(is_full) of true -> {error, {error, {full, L#log.name}}, 0}; @@ -1731,17 +1731,14 @@ do_log(L, B) when L#log.format_type =:= wrap_ext -> {error, Error, Logged - Lost} end. -bsize(B, external) -> - {B, xsz(B, 0)}; -bsize(B, internal) -> +logl(B, external) -> + {B, iolist_size(B)}; +logl(B, internal) -> disk_log_1:logl(B). -xsz([B|T], Sz) -> xsz(T, byte_size(B) + Sz); -xsz([], Sz) -> Sz. - halt_write_full(L, [Bin | Bins], Format, N) -> B = [Bin], - {Bs, BSize} = bsize(B, Format), + {Bs, BSize} = logl(B, Format), Halt = L#log.extra, #halt{curB = CurSize, size = Sz} = Halt, if -- cgit v1.2.3 From 08d22e7658542279711753f3363299105bd669bb Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Tue, 15 Nov 2016 17:21:49 +0100 Subject: Pass through known size instead of recomputing --- lib/kernel/src/disk_log.erl | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 8dd9975b23..1b28d05691 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -1042,13 +1042,13 @@ log_loop(S, Pids, Bins, Sync) -> log_loop(#state{cache_error = CE}=S, Pids, _Bins, _Sync, _Sz, _F) when CE =/= ok -> loop(cache_error(S, Pids)); log_loop(#state{}=S, Pids, Bins, Sync, Sz, _F) when Sz > ?MAX_LOOK_AHEAD -> - loop(log_end(S, Pids, Bins, Sync)); + loop(log_end(S, Pids, Bins, Sync, Sz)); log_loop(#state{messages = []}=S, Pids, Bins, Sync, Sz, F) -> receive Message -> log_loop(Message, Pids, Bins, Sync, Sz, F, S) after 0 -> - loop(log_end(S, Pids, Bins, Sync)) + loop(log_end(S, Pids, Bins, Sync, Sz)) end; log_loop(#state{messages = [M | Ms]}=S, Pids, Bins, Sync, Sz, F) -> S1 = S#state{messages = Ms}, @@ -1067,14 +1067,14 @@ log_loop({From, {blog, B}}, Pids, Bins, Sync, Sz, F, S) -> log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B), F); log_loop({From, sync}, Pids, Bins, Sync, Sz, F, S) -> log_loop(S, Pids, Bins, [From | Sync], Sz, F); -log_loop(Message, Pids, Bins, Sync, _Sz, _F, S) -> - NS = log_end(S, Pids, Bins, Sync), +log_loop(Message, Pids, Bins, Sync, Sz, _F, S) -> + NS = log_end(S, Pids, Bins, Sync, Sz), handle(Message, NS). -log_end(S, [], [], Sync) -> +log_end(S, [], [], Sync, _Sz) -> log_end_sync(S, Sync); -log_end(S, Pids, Bins, Sync) -> - case do_log(get(log), rflat(Bins)) of +log_end(S, Pids, Bins, Sync, Sz) -> + case do_log(get(log), rflat(Bins), Sz) of N when is_integer(N) -> ok = replies(Pids, ok), S1 = (state_ok(S))#state{cnt = S#state.cnt+N}, @@ -1692,10 +1692,13 @@ do_unblock(L, S) -> -spec do_log(#log{}, [binary()]) -> integer() | {'error', _, integer()}. -do_log(L, B) when L#log.type =:= halt -> +do_log(L, B) -> + do_log(L, B, iolist_size(B)). + +do_log(L, B, BSz) when L#log.type =:= halt -> #log{format = Format, extra = Halt} = L, #halt{curB = CurSize, size = Sz} = Halt, - {Bs, BSize} = logl(B, Format), + {Bs, BSize} = logl(B, Format, BSz), case get(is_full) of true -> {error, {error, {full, L#log.name}}, 0}; @@ -1704,7 +1707,7 @@ do_log(L, B) when L#log.type =:= halt -> undefined -> halt_write_full(L, B, Format, 0) end; -do_log(L, B) when L#log.format_type =:= wrap_int -> +do_log(L, B, _BSz) when L#log.format_type =:= wrap_int -> case disk_log_1:mf_int_log(L#log.extra, B, L#log.head) of {ok, Handle, Logged, Lost, Wraps} -> notify_owners_wrap(Wraps), @@ -1717,7 +1720,7 @@ do_log(L, B) when L#log.format_type =:= wrap_int -> put(log, L#log{extra = Handle}), {error, Error, Logged - Lost} end; -do_log(L, B) when L#log.format_type =:= wrap_ext -> +do_log(L, B, _BSz) when L#log.format_type =:= wrap_ext -> case disk_log_1:mf_ext_log(L#log.extra, B, L#log.head) of {ok, Handle, Logged, Lost, Wraps} -> notify_owners_wrap(Wraps), @@ -1731,14 +1734,16 @@ do_log(L, B) when L#log.format_type =:= wrap_ext -> {error, Error, Logged - Lost} end. -logl(B, external) -> +logl(B, external, undefined) -> {B, iolist_size(B)}; -logl(B, internal) -> +logl(B, external, Sz) -> + {B, Sz}; +logl(B, internal, _Sz) -> disk_log_1:logl(B). halt_write_full(L, [Bin | Bins], Format, N) -> B = [Bin], - {Bs, BSize} = logl(B, Format), + {Bs, BSize} = logl(B, Format, undefined), Halt = L#log.extra, #halt{curB = CurSize, size = Sz} = Halt, if -- cgit v1.2.3 From 9bb7aee366b04f6e8b6ea8491c0d7ebbdb304bb6 Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Tue, 15 Nov 2016 18:49:39 +0100 Subject: Clarify that the type for disk log data is iodata() --- lib/kernel/doc/src/disk_log.xml | 13 +++---------- lib/kernel/src/disk_log.erl | 18 +++++++----------- lib/kernel/src/disk_log.hrl | 3 +-- 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/lib/kernel/doc/src/disk_log.xml b/lib/kernel/doc/src/disk_log.xml index 3c9bc7f6e8..aebeacee28 100644 --- a/lib/kernel/doc/src/disk_log.xml +++ b/lib/kernel/doc/src/disk_log.xml @@ -109,8 +109,7 @@ These functions log one or more Erlang terms. By prefixing each of the functions with a b (for "binary"), we get the corresponding blog() functions for the external format. - These functions log one or more deep lists of bytes or, alternatively, - binaries of deep lists of bytes. + These functions log one or more chunks of bytes. For example, to log the string "hello" in ASCII format, you can use disk_log:blog(Log, "hello"), or disk_log:blog(Log, list_to_binary("hello")). The two @@ -218,9 +217,6 @@ - - - @@ -233,9 +229,6 @@ chunk/2,3, bchunk/2,3, or chunk_step/3.

- - - @@ -953,7 +946,7 @@ written first on the log file. If the log is a wrap log, the item Head is written first in each new file. Head is to be a term if the format is - internal, otherwise a deep list of bytes (or a binary). + internal, otherwise a sequence of bytes. Defaults to none, which means that no header is written first on the file.

@@ -965,7 +958,7 @@ The call M:F(A) is assumed to return {ok, Head}. The item Head is written first in each file. Head is to be a term if the format is - internal, otherwise a deep list of bytes (or a binary). + internal, otherwise a sequence of bytes.

{mode, Mode} diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 1b28d05691..bc31f1fbbd 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -75,8 +75,6 @@ -opaque continuation() :: #continuation{}. --type bytes() :: binary() | [byte()]. - -type file_error() :: term(). % XXX: refine -type invalid_header() :: term(). % XXX: refine @@ -131,7 +129,7 @@ log(Log, Term) -> -spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when Log :: log(), - Bytes :: bytes(). + Bytes :: iodata(). blog(Log, Bytes) -> req(Log, {blog, ensure_binary(Bytes)}). @@ -145,7 +143,7 @@ log_terms(Log, Terms) -> -spec blog_terms(Log, BytesList) -> ok | {error, Reason :: log_error_rsn()} when Log :: log(), - BytesList :: [bytes()]. + BytesList :: [iodata()]. blog_terms(Log, Bytess) -> Bs = ensure_binary_list(Bytess), req(Log, {blog, Bs}). @@ -167,13 +165,13 @@ alog_terms(Log, Terms) -> -spec balog(Log, Bytes) -> notify_ret() when Log :: log(), - Bytes :: bytes(). + Bytes :: iodata(). balog(Log, Bytes) -> notify(Log, {balog, ensure_binary(Bytes)}). -spec balog_terms(Log, ByteList) -> notify_ret() when Log :: log(), - ByteList :: [bytes()]. + ByteList :: [iodata()]. balog_terms(Log, Bytess) -> Bs = ensure_binary_list(Bytess), notify(Log, {balog, Bs}). @@ -219,7 +217,7 @@ truncate(Log, Head) -> -spec btruncate(Log, BHead) -> 'ok' | {'error', trunc_error_rsn()} when Log :: log(), - BHead :: bytes(). + BHead :: iodata(). btruncate(Log, Head) -> req(Log, {truncate, {ok, ensure_binary(Head)}, btruncate, 2}). @@ -248,7 +246,7 @@ reopen(Log, NewFile, NewHead) -> -spec breopen(Log, File, BHead) -> 'ok' | {'error', reopen_error_rsn()} when Log :: log(), File :: file:filename(), - BHead :: bytes(). + BHead :: iodata(). breopen(Log, NewFile, NewHead) -> req(Log, {reopen, NewFile, {ok, ensure_binary(NewHead)}, breopen, 3}). @@ -1348,10 +1346,8 @@ make_binary_list([B | Bs]) -> make_binary_list([]) -> []. -ensure_binary(Binary) when is_binary(Binary) -> - Binary; ensure_binary(Bytes) -> - list_to_binary(Bytes). + iolist_to_binary(Bytes). %%----------------------------------------------------------------- %% Change size of the logs in runtime. diff --git a/lib/kernel/src/disk_log.hrl b/lib/kernel/src/disk_log.hrl index 3262d979ee..3cf8a3b3a2 100644 --- a/lib/kernel/src/disk_log.hrl +++ b/lib/kernel/src/disk_log.hrl @@ -54,11 +54,10 @@ %% Types -- alphabetically %%------------------------------------------------------------------------ --type dlog_byte() :: [dlog_byte()] | byte(). -type dlog_format() :: 'external' | 'internal'. -type dlog_format_type() :: 'halt_ext' | 'halt_int' | 'wrap_ext' | 'wrap_int'. -type dlog_head() :: 'none' | {'ok', binary()} | mfa(). --type dlog_head_opt() :: none | term() | binary() | [dlog_byte()]. +-type dlog_head_opt() :: none | term() | iodata(). -type log() :: term(). % XXX: refine -type dlog_mode() :: 'read_only' | 'read_write'. -type dlog_name() :: atom() | string(). -- cgit v1.2.3 From 3900d246682a46cec3b4dcf4c1b13259038ab358 Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Wed, 16 Nov 2016 15:11:45 +0100 Subject: Simplify for rflat --- lib/kernel/src/disk_log.erl | 11 ++++------- lib/kernel/test/disk_log_SUITE.erl | 10 +++++----- 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index bc31f1fbbd..2a7afb4c53 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -125,13 +125,13 @@ open(A) -> Log :: log(), Term :: term(). log(Log, Term) -> - req(Log, {log, term_to_binary(Term)}). + req(Log, {log, [term_to_binary(Term)]}). -spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when Log :: log(), Bytes :: iodata(). blog(Log, Bytes) -> - req(Log, {blog, ensure_binary(Bytes)}). + req(Log, {blog, [ensure_binary(Bytes)]}). -spec log_terms(Log, TermList) -> ok | {error, Resaon :: log_error_rsn()} when Log :: log(), @@ -154,7 +154,7 @@ blog_terms(Log, Bytess) -> Log :: log(), Term :: term(). alog(Log, Term) -> - notify(Log, {alog, term_to_binary(Term)}). + notify(Log, {alog, [term_to_binary(Term)]}). -spec alog_terms(Log, TermList) -> notify_ret() when Log :: log(), @@ -167,7 +167,7 @@ alog_terms(Log, Terms) -> Log :: log(), Bytes :: iodata(). balog(Log, Bytes) -> - notify(Log, {balog, ensure_binary(Bytes)}). + notify(Log, {balog, [ensure_binary(Bytes)]}). -spec balog_terms(Log, ByteList) -> notify_ret() when Log :: log(), @@ -1093,12 +1093,9 @@ log_end_sync(S, Sync) -> state_err(S, Res). %% Inlined. -rflat([B]=L) when is_binary(B) -> L; rflat([B]) -> B; rflat(B) -> rflat(B, []). -rflat([B | Bs], L) when is_binary(B) -> - rflat(Bs, [B | L]); rflat([B | Bs], L) -> rflat(Bs, B ++ L); rflat([], L) -> L. diff --git a/lib/kernel/test/disk_log_SUITE.erl b/lib/kernel/test/disk_log_SUITE.erl index f7ad9c0c04..a25b315d9d 100644 --- a/lib/kernel/test/disk_log_SUITE.erl +++ b/lib/kernel/test/disk_log_SUITE.erl @@ -421,7 +421,7 @@ halt_ro_alog(Conf) when is_list(Conf) -> halt_ro_alog_wait_notify(Log, T) -> Term = term_to_binary(T), receive - {disk_log, _, Log,{read_only, Term}} -> + {disk_log, _, Log,{read_only, [Term]}} -> ok; Other -> Other @@ -449,7 +449,7 @@ halt_ro_balog(Conf) when is_list(Conf) -> halt_ro_balog_wait_notify(Log, T) -> Term = list_to_binary(T), receive - {disk_log, _, Log,{read_only, Term}} -> + {disk_log, _, Log,{read_only, [Term]}} -> ok; Other -> Other @@ -1385,15 +1385,15 @@ blocked_notif(Conf) when is_list(Conf) -> "The requested operation" ++ _ = format_error(Error1), ok = disk_log:blog(n, B), ok = disk_log:alog(n, B), - rec(1, {disk_log, node(), n, {format_external, term_to_binary(B)}}), + rec(1, {disk_log, node(), n, {format_external, [term_to_binary(B)]}}), ok = disk_log:alog_terms(n, [B,B,B,B]), rec(1, {disk_log, node(), n, {format_external, lists:map(fun term_to_binary/1, [B,B,B,B])}}), ok = disk_log:block(n, false), ok = disk_log:alog(n, B), - rec(1, {disk_log, node(), n, {blocked_log, term_to_binary(B)}}), + rec(1, {disk_log, node(), n, {blocked_log, [term_to_binary(B)]}}), ok = disk_log:balog(n, B), - rec(1, {disk_log, node(), n, {blocked_log, list_to_binary(B)}}), + rec(1, {disk_log, node(), n, {blocked_log, [list_to_binary(B)]}}), ok = disk_log:balog_terms(n, [B,B,B,B]), disk_log:close(n), rec(1, {disk_log, node(), n, {blocked_log, -- cgit v1.2.3 From 3aadf13da204f955fe7cb6932c75bf71a856650b Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Wed, 16 Nov 2016 21:53:34 +0100 Subject: Eliminate more code duplication --- lib/kernel/src/disk_log.erl | 62 +++++++++++++-------------------------------- 1 file changed, 18 insertions(+), 44 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 2a7afb4c53..ad3c5ae227 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -125,20 +125,20 @@ open(A) -> Log :: log(), Term :: term(). log(Log, Term) -> - req(Log, {log, [term_to_binary(Term)]}). + req(Log, {log, internal, [term_to_binary(Term)]}). -spec blog(Log, Bytes) -> ok | {error, Reason :: log_error_rsn()} when Log :: log(), Bytes :: iodata(). blog(Log, Bytes) -> - req(Log, {blog, [ensure_binary(Bytes)]}). + req(Log, {log, external, [ensure_binary(Bytes)]}). -spec log_terms(Log, TermList) -> ok | {error, Resaon :: log_error_rsn()} when Log :: log(), TermList :: [term()]. log_terms(Log, Terms) -> Bs = terms2bins(Terms), - req(Log, {log, Bs}). + req(Log, {log, internal, Bs}). -spec blog_terms(Log, BytesList) -> ok | {error, Reason :: log_error_rsn()} when @@ -146,7 +146,7 @@ log_terms(Log, Terms) -> BytesList :: [iodata()]. blog_terms(Log, Bytess) -> Bs = ensure_binary_list(Bytess), - req(Log, {blog, Bs}). + req(Log, {log, external, Bs}). -type notify_ret() :: 'ok' | {'error', 'no_such_log'}. @@ -154,27 +154,27 @@ blog_terms(Log, Bytess) -> Log :: log(), Term :: term(). alog(Log, Term) -> - notify(Log, {alog, [term_to_binary(Term)]}). + notify(Log, {alog, internal, [term_to_binary(Term)]}). -spec alog_terms(Log, TermList) -> notify_ret() when Log :: log(), TermList :: [term()]. alog_terms(Log, Terms) -> Bs = terms2bins(Terms), - notify(Log, {alog, Bs}). + notify(Log, {alog, internal, Bs}). -spec balog(Log, Bytes) -> notify_ret() when Log :: log(), Bytes :: iodata(). balog(Log, Bytes) -> - notify(Log, {balog, [ensure_binary(Bytes)]}). + notify(Log, {alog, external, [ensure_binary(Bytes)]}). -spec balog_terms(Log, ByteList) -> notify_ret() when Log :: log(), ByteList :: [iodata()]. balog_terms(Log, Bytess) -> Bs = ensure_binary_list(Bytess), - notify(Log, {balog, Bs}). + notify(Log, {alog, external, Bs}). -type close_error_rsn() ::'no_such_log' | 'nonode' | {'file_error', file:filename(), file_error()}. @@ -684,25 +684,12 @@ handle({From, write_cache}, S) when From =:= self() -> Error -> loop(S#state{cache_error = Error}) end; -handle({From, {log, B}}=Message, S) -> +handle({From, {log, Format, B}}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok, L#log.format =:= internal -> - log_loop(S, From, [B], []); - L when L#log.status =:= ok, L#log.format =:= external -> + L when L#log.status =:= ok, L#log.format =:= external, Format =:= internal -> reply(From, {error, {format_external, L#log.name}}, S); - L when L#log.status =:= {blocked, false} -> - reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> - reply(From, {error, {blocked_log, L#log.name}}, S); - _ -> - enqueue(Message, S) - end; -handle({From, {blog, B}}=Message, S) -> - case get(log) of - L when L#log.mode =:= read_only -> - reply(From, {error, {read_only_mode, L#log.name}}, S); L when L#log.status =:= ok -> log_loop(S, From, [B], []); L when L#log.status =:= {blocked, false} -> @@ -712,27 +699,14 @@ handle({From, {blog, B}}=Message, S) -> _ -> enqueue(Message, S) end; -handle({alog, B}=Message, S) -> +handle({alog, Format, B}=Message, S) -> case get(log) of L when L#log.mode =:= read_only -> notify_owners({read_only,B}), loop(S); - L when L#log.status =:= ok, L#log.format =:= internal -> - log_loop(S, [], [B], []); - L when L#log.status =:= ok -> + L when L#log.status =:= ok, L#log.format =:= external, Format =:= internal -> notify_owners({format_external, B}), loop(S); - L when L#log.status =:= {blocked, false} -> - notify_owners({blocked_log, B}), - loop(S); - _ -> - enqueue(Message, S) - end; -handle({balog, B}=Message, S) -> - case get(log) of - L when L#log.mode =:= read_only -> - notify_owners({read_only,B}), - loop(S); L when L#log.status =:= ok -> log_loop(S, [], [B], []); L when L#log.status =:= {blocked, false} -> @@ -1053,15 +1027,15 @@ log_loop(#state{messages = [M | Ms]}=S, Pids, Bins, Sync, Sz, F) -> log_loop(M, Pids, Bins, Sync, Sz, F, S1). %% Items logged after the last sync request found are sync:ed as well. -log_loop({alog,B}, Pids, Bins, Sync, Sz, internal=F, S) -> - %% {alog, _} allowed for the internal format only. +log_loop({alog, internal, B}, Pids, Bins, Sync, Sz, internal=F, S) -> + %% alog of terms allowed for the internal format only log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B), F); -log_loop({balog, B}, Pids, Bins, Sync, Sz, F, S) -> +log_loop({alog, binary, B}, Pids, Bins, Sync, Sz, F, S) -> log_loop(S, Pids, [B | Bins], Sync, Sz+iolist_size(B), F); -log_loop({From, {log, B}}, Pids, Bins, Sync, Sz, internal=F, S) -> - %% {log, _} allowed for the internal format only. +log_loop({From, {log, internal, B}}, Pids, Bins, Sync, Sz, internal=F, S) -> + %% log of terms allowed for the internal format only log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B), F); -log_loop({From, {blog, B}}, Pids, Bins, Sync, Sz, F, S) -> +log_loop({From, {log, binary, B}}, Pids, Bins, Sync, Sz, F, S) -> log_loop(S, [From | Pids], [B | Bins], Sync, Sz+iolist_size(B), F); log_loop({From, sync}, Pids, Bins, Sync, Sz, F, S) -> log_loop(S, Pids, Bins, [From | Sync], Sz, F); -- cgit v1.2.3 From 5a8a1a963edd6d6dbb35bf615a4bad7d77041443 Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Wed, 16 Nov 2016 23:07:29 +0100 Subject: Use pattern matching for records where suitable --- lib/kernel/src/disk_log.erl | 166 ++++++++++++++++++++++---------------------- 1 file changed, 82 insertions(+), 84 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index ad3c5ae227..69c65c9c43 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -668,13 +668,12 @@ init(Parent, Server) -> process_flag(trap_exit, true), loop(#state{parent = Parent, server = Server}). -loop(State) when State#state.messages =:= [] -> +loop(#state{messages = []}=State) -> receive Message -> handle(Message, State) end; -loop(State) -> - [M | Ms] = State#state.messages, +loop(#state{messages = [M | Ms]}=State) -> handle(M, State#state{messages = Ms}). handle({From, write_cache}, S) when From =:= self() -> @@ -686,30 +685,30 @@ handle({From, write_cache}, S) when From =:= self() -> end; handle({From, {log, Format, B}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok, L#log.format =:= external, Format =:= internal -> + #log{status = ok, format=external}=L when Format =:= internal -> reply(From, {error, {format_external, L#log.name}}, S); - L when L#log.status =:= ok -> + #log{status = ok} -> log_loop(S, From, [B], []); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({alog, Format, B}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only} -> notify_owners({read_only,B}), loop(S); - L when L#log.status =:= ok, L#log.format =:= external, Format =:= internal -> + #log{status = ok, format = external} when Format =:= internal -> notify_owners({format_external, B}), loop(S); - L when L#log.status =:= ok -> + #log{status = ok} -> log_loop(S, [], [B], []); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}} -> notify_owners({blocked_log, B}), loop(S); _ -> @@ -717,21 +716,21 @@ handle({alog, Format, B}=Message, S) -> end; handle({From, {block, QueueLogRecs}}=Message, S) -> case get(log) of - L when L#log.status =:= ok -> + #log{status = ok}=L -> do_block(From, QueueLogRecs, L), reply(From, ok, S); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({From, unblock}, S) -> case get(log) of - L when L#log.status =:= ok -> + #log{status = ok}=L -> reply(From, {error, {not_blocked, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> S2 = do_unblock(L, S), reply(From, ok, S2); L -> @@ -739,24 +738,24 @@ handle({From, unblock}, S) -> end; handle({From, sync}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok -> + #log{status = ok} -> log_loop(S, [], [], [From]); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({From, {truncate, Head, F, A}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok -> + #log{status = ok}=L -> H = merge_head(Head, L#log.head), case catch do_trunc(L, H) of ok -> @@ -767,46 +766,46 @@ handle({From, {truncate, Head, F, A}}=Message, S) -> Error -> do_exit(S, From, Error, ?failure(Error, F, A)) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({From, {chunk, Pos, B, N}}=Message, S) -> case get(log) of - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok -> + #log{status = ok}=L -> R = do_chunk(L, Pos, B, N), reply(From, R, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> R = do_chunk(L, Pos, B, N), reply(From, R, S); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _L -> enqueue(Message, S) end; handle({From, {chunk_step, Pos, N}}=Message, S) -> case get(log) of - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok -> + #log{status = ok}=L -> R = do_chunk_step(L, Pos, N), reply(From, R, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> R = do_chunk_step(L, Pos, N), reply(From, R, S); - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({From, {change_notify, Pid, NewNotify}}=Message, S) -> case get(log) of - L when L#log.status =:= ok -> + #log{status = ok}=L -> case do_change_notify(L, Pid, NewNotify) of {ok, L1} -> put(log, L1), @@ -814,37 +813,37 @@ handle({From, {change_notify, Pid, NewNotify}}=Message, S) -> Error -> reply(From, Error, S) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({From, {change_header, NewHead}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok -> - case check_head(NewHead, L#log.format) of + #log{status = ok, format = Format}=L -> + case check_head(NewHead, Format) of {ok, Head} -> - put(log, L#log{head = mk_head(Head, L#log.format)}), + put(log, L#log{head = mk_head(Head, Format)}), reply(From, ok, S); Error -> reply(From, Error, S) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({From, {change_size, NewSize}}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok -> + #log{status = ok}=L -> case check_size(L#log.type, NewSize) of ok -> case catch do_change_size(L, NewSize) of % does the put @@ -861,22 +860,22 @@ handle({From, {change_size, NewSize}}=Message, S) -> not_ok -> reply(From, {error, {badarg, size}}, S) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({From, inc_wrap_file}=Message, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.type =:= halt -> + #log{type = halt}=L -> reply(From, {error, {halt_log, L#log.name}}, S); - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok -> + #log{status = ok}=L -> case catch do_inc_wrap_file(L) of {ok, L2, Lost} -> put(log, L2), @@ -886,20 +885,22 @@ handle({From, inc_wrap_file}=Message, S) -> put(log, L2), reply(From, Error, state_err(S, Error)) end; - L when L#log.status =:= {blocked, false} -> + #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); - L when L#log.blocked_by =:= From -> + #log{blocked_by = From}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); _ -> enqueue(Message, S) end; handle({From, {reopen, NewFile, Head, F, A}}, S) -> case get(log) of - L when L#log.mode =:= read_only -> + #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - L when L#log.status =:= ok, S#state.cache_error =/= ok -> + #log{status = ok} when S#state.cache_error =/= ok -> loop(cache_error(S, [From])); - L when L#log.status =:= ok, L#log.filename =/= NewFile -> + #log{status = ok, filename = NewFile}=L -> + reply(From, {error, {same_file_name, L#log.name}}, S); + #log{status = ok}=L -> case catch close_disk_log2(L) of closed -> File = L#log.filename, @@ -932,8 +933,6 @@ handle({From, {reopen, NewFile, Head, F, A}}, S) -> Error -> do_exit(S, From, Error, ?failure(Error, F, A)) end; - L when L#log.status =:= ok -> - reply(From, {error, {same_file_name, L#log.name}}, S); L -> reply(From, {error, {blocked_log, L#log.name}}, S) end; @@ -971,11 +970,11 @@ handle({From, close}, S) -> end; handle({From, info}, S) -> reply(From, do_info(get(log), S#state.cnt), S); -handle({'EXIT', From, Reason}, S) when From =:= S#state.parent -> +handle({'EXIT', From, Reason}, #state{parent=From}=S) -> %% Parent orders shutdown. _ = do_stop(S), exit(Reason); -handle({'EXIT', From, Reason}, S) when From =:= S#state.server -> +handle({'EXIT', From, Reason}, #state{server=From}=S) -> %% The server is gone. _ = do_stop(S), exit(Reason); @@ -1000,8 +999,8 @@ handle({system, From, Req}, S) -> handle(_, S) -> loop(S). -enqueue(Message, S) -> - loop(S#state{queue = [Message | S#state.queue]}). +enqueue(Message, #state{queue = Queue}=S) -> + loop(S#state{queue = [Message | Queue]}). %% Collect further log and sync requests already in the mailbox or queued @@ -1045,17 +1044,17 @@ log_loop(Message, Pids, Bins, Sync, Sz, _F, S) -> log_end(S, [], [], Sync, _Sz) -> log_end_sync(S, Sync); -log_end(S, Pids, Bins, Sync, Sz) -> +log_end(#state{cnt = Cnt}=S, Pids, Bins, Sync, Sz) -> case do_log(get(log), rflat(Bins), Sz) of N when is_integer(N) -> ok = replies(Pids, ok), - S1 = (state_ok(S))#state{cnt = S#state.cnt+N}, + S1 = (state_ok(S))#state{cnt = Cnt + N}, log_end_sync(S1, Sync); {error, {error, {full, _Name}}, N} when Pids =:= [] -> - log_end_sync(state_ok(S#state{cnt = S#state.cnt + N}), Sync); + log_end_sync(state_ok(S#state{cnt = Cnt + N}), Sync); {error, Error, N} -> ok = replies(Pids, Error), - state_err(S#state{cnt = S#state.cnt + N}, Error) + state_err(S#state{cnt = Cnt + N}, Error) end. %% Inlined. @@ -1106,17 +1105,17 @@ close_owner(Pid, L, S) -> S2 = do_unblock(Pid, get(log), S), unlink(Pid), do_close2(L1, S2). - + %% -> {stop, S} | {continue, S} -close_user(Pid, L, S) when L#log.users > 0 -> - L1 = L#log{users = L#log.users - 1}, +close_user(Pid, #log{users=Users}=L, S) when Users > 0 -> + L1 = L#log{users = Users - 1}, put(log, L1), S2 = do_unblock(Pid, get(log), S), do_close2(L1, S2); close_user(_Pid, _L, S) -> {continue, S}. -do_close2(L, S) when L#log.users =:= 0, L#log.owners =:= [] -> +do_close2(#log{users = 0, owners = []}, S) -> {stop, S}; do_close2(_L, S) -> {continue, S}. @@ -1195,14 +1194,14 @@ add_pid(Pid, Notify, L) when is_pid(Pid) -> add_pid(_NotAPid, _Notify, L) -> {ok, L#log{users = L#log.users + 1}}. -unblock_pid(L) when L#log.blocked_by =:= none -> +unblock_pid(#log{blocked_by = none}) -> ok; -unblock_pid(L) -> - case is_owner(L#log.blocked_by, L) of +unblock_pid(#log{blocked_by = Pid}=L) -> + case is_owner(Pid, L) of {true, _Notify} -> ok; false -> - unlink(L#log.blocked_by) + unlink(Pid) end. %% -> true | false @@ -1324,7 +1323,7 @@ ensure_binary(Bytes) -> %% Change size of the logs in runtime. %%----------------------------------------------------------------- %% -> ok | {big, CurSize} | throw(Error) -do_change_size(L, NewSize) when L#log.type =:= halt -> +do_change_size(#log{type = halt}=L, NewSize) -> Halt = L#log.extra, CurB = Halt#halt.curB, NewLog = L#log{extra = Halt#halt{size = NewSize}}, @@ -1340,7 +1339,7 @@ do_change_size(L, NewSize) when L#log.type =:= halt -> true -> {big, CurB} end; -do_change_size(L, NewSize) when L#log.type =:= wrap -> +do_change_size(#log{type = wrap}=L, NewSize) -> #log{extra = Extra, version = Version} = L, {ok, Handle} = disk_log_1:change_size_wrap(Extra, NewSize, Version), erase(is_full), @@ -1641,7 +1640,7 @@ do_block(Pid, QueueLogRecs, L) -> link(Pid) end. -do_unblock(Pid, L, S) when L#log.blocked_by =:= Pid -> +do_unblock(Pid, #log{blocked_by = Pid}=L, S) -> do_unblock(L, S); do_unblock(_Pid, _L, S) -> S. @@ -1662,7 +1661,7 @@ do_unblock(L, S) -> do_log(L, B) -> do_log(L, B, iolist_size(B)). -do_log(L, B, BSz) when L#log.type =:= halt -> +do_log(#log{type = halt}=L, B, BSz) -> #log{format = Format, extra = Halt} = L, #halt{curB = CurSize, size = Sz} = Halt, {Bs, BSize} = logl(B, Format, BSz), @@ -1674,7 +1673,7 @@ do_log(L, B, BSz) when L#log.type =:= halt -> undefined -> halt_write_full(L, B, Format, 0) end; -do_log(L, B, _BSz) when L#log.format_type =:= wrap_int -> +do_log(#log{format_type = wrap_int}=L, B, _BSz) -> case disk_log_1:mf_int_log(L#log.extra, B, L#log.head) of {ok, Handle, Logged, Lost, Wraps} -> notify_owners_wrap(Wraps), @@ -1687,7 +1686,7 @@ do_log(L, B, _BSz) when L#log.format_type =:= wrap_int -> put(log, L#log{extra = Handle}), {error, Error, Logged - Lost} end; -do_log(L, B, _BSz) when L#log.format_type =:= wrap_ext -> +do_log(#log{format_type = wrap_ext}=L, B, _BSz) -> case disk_log_1:mf_ext_log(L#log.extra, B, L#log.head) of {ok, Handle, Logged, Lost, Wraps} -> notify_owners_wrap(Wraps), @@ -1762,7 +1761,7 @@ do_sync(#log{type = wrap, extra = Handle} = Log) -> Reply. %% -> ok | Error | throw(Error) -do_trunc(L, Head) when L#log.type =:= halt -> +do_trunc(#log{type = halt}=L, Head) -> #log{filename = FName, extra = Halt} = L, FdC = Halt#halt.fdc, {Reply1, FdC2} = @@ -1791,7 +1790,7 @@ do_trunc(L, Head) when L#log.type =:= halt -> end, put(log, L#log{extra = NewHalt}), Reply; -do_trunc(L, Head) when L#log.type =:= wrap -> +do_trunc(#log{type = wrap}=L, Head) -> Handle = L#log.extra, OldHead = L#log.head, {MaxB, MaxF} = disk_log_1:get_wrap_size(Handle), @@ -1985,8 +1984,7 @@ notify_owners(Note) -> (_) -> ok end, L#log.owners). -cache_error(S, Pids) -> - Error = S#state.cache_error, +cache_error(#state{cache_error=Error}=S, Pids) -> ok = replies(Pids, Error), state_err(S#state{cache_error = ok}, Error). -- cgit v1.2.3 From eda5c4859f561fae7d3a3b74f19dd8596c212966 Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Tue, 1 Nov 2016 15:36:57 +0100 Subject: Improve caching in disk_log Avoid starting timers for flushing when the written data is empty or larger than the max cache size. Previously, a single huge write to an empty cache would be put in the cache until the next write or the timer event. Also increase the cache size from 16K to 64K. --- lib/kernel/src/disk_log.hrl | 1 + lib/kernel/src/disk_log_1.erl | 32 ++++++++++++++++++++++---------- lib/kernel/test/disk_log_SUITE.erl | 4 ++-- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/lib/kernel/src/disk_log.hrl b/lib/kernel/src/disk_log.hrl index 3cf8a3b3a2..593dbb31ab 100644 --- a/lib/kernel/src/disk_log.hrl +++ b/lib/kernel/src/disk_log.hrl @@ -39,6 +39,7 @@ -define(MAX_FILES, 65000). -define(MAX_BYTES, ((1 bsl 64) - 1)). -define(MAX_CHUNK_SIZE, 65536). +-define(MAX_FWRITE_CACHE, 65536). %% Object defines -define(LOGMAGIC, <<1,2,3,4>>). diff --git a/lib/kernel/src/disk_log_1.erl b/lib/kernel/src/disk_log_1.erl index 2e61363aa6..d83c30f35f 100644 --- a/lib/kernel/src/disk_log_1.erl +++ b/lib/kernel/src/disk_log_1.erl @@ -1416,24 +1416,36 @@ open_truncate(FileName) -> %%% Functions that access files, and throw on error. --define(MAX, 16384). % bytes -define(TIMEOUT, 2000). % ms %% -> {Reply, cache()}; Reply = ok | Error -fwrite(#cache{c = []} = FdC, _FN, B, Size) -> +fwrite(FdC, _FN, _B, 0) -> + {ok, FdC}; % avoid starting a timer for empty writes +fwrite(#cache{fd = Fd, c = C, sz = Sz} = FdC, FileName, B, Size) -> + Sz1 = Sz + Size, + C1 = cache_append(C, B), + if Sz1 > ?MAX_FWRITE_CACHE -> + write_cache(Fd, FileName, C1); + true -> + maybe_start_timer(C), + {ok, FdC#cache{sz = Sz1, c = C1}} + end. + +cache_append([], B) -> B; +cache_append(C, B) -> [C | B]. + +%% if the cache was empty, start timer (unless it's already running) +maybe_start_timer([]) -> case get(write_cache_timer_is_running) of - true -> + true -> ok; - _ -> + _ -> put(write_cache_timer_is_running, true), erlang:send_after(?TIMEOUT, self(), {self(), write_cache}), ok - end, - {ok, FdC#cache{sz = Size, c = B}}; -fwrite(#cache{sz = Sz, c = C} = FdC, _FN, B, Size) when Sz < ?MAX -> - {ok, FdC#cache{sz = Sz+Size, c = [C | B]}}; -fwrite(#cache{fd = Fd, c = C}, FileName, B, _Size) -> - write_cache(Fd, FileName, [C | B]). + end; +maybe_start_timer(_C) -> + ok. fwrite_header(Fd, B, Size) -> {ok, #cache{fd = Fd, sz = Size, c = B}}. diff --git a/lib/kernel/test/disk_log_SUITE.erl b/lib/kernel/test/disk_log_SUITE.erl index a25b315d9d..23fe975ef7 100644 --- a/lib/kernel/test/disk_log_SUITE.erl +++ b/lib/kernel/test/disk_log_SUITE.erl @@ -4666,7 +4666,7 @@ other_groups(Conf) when is_list(Conf) -> ok. --define(MAX, 16384). % MAX in disk_log_1.erl +-define(MAX, ?MAX_FWRITE_CACHE). % as in disk_log_1.erl %% Evil cases such as closed file descriptor port. evil(Conf) when is_list(Conf) -> Dir = ?privdir(Conf), @@ -4690,7 +4690,7 @@ evil(Conf) when is_list(Conf) -> {size,?MAX+50},{format,external}]), [Fd] = erlang:ports() -- Ports0, {B,_} = x_mk_bytes(30), - ok = disk_log:blog(Log, <<0:(?MAX+1)/unit:8>>), + ok = disk_log:blog(Log, <<0:(?MAX-1)/unit:8>>), exit(Fd, kill), {error, {file_error,_,einval}} = disk_log:blog_terms(Log, [B,B]), ok= disk_log:close(Log), -- cgit v1.2.3 From 93ac8d2b4b2937b9b7651b5e043c31929f3b2c7c Mon Sep 17 00:00:00 2001 From: Richard Carlsson Date: Fri, 25 Nov 2016 21:26:19 +0100 Subject: Pass log format through from handle() --- lib/kernel/src/disk_log.erl | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/kernel/src/disk_log.erl b/lib/kernel/src/disk_log.erl index 69c65c9c43..2ade7fd77a 100644 --- a/lib/kernel/src/disk_log.erl +++ b/lib/kernel/src/disk_log.erl @@ -689,8 +689,8 @@ handle({From, {log, Format, B}}=Message, S) -> reply(From, {error, {read_only_mode, L#log.name}}, S); #log{status = ok, format=external}=L when Format =:= internal -> reply(From, {error, {format_external, L#log.name}}, S); - #log{status = ok} -> - log_loop(S, From, [B], []); + #log{status = ok, format=LogFormat} -> + log_loop(S, From, [B], [], iolist_size(B), LogFormat); #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); #log{blocked_by = From}=L -> @@ -706,8 +706,8 @@ handle({alog, Format, B}=Message, S) -> #log{status = ok, format = external} when Format =:= internal -> notify_owners({format_external, B}), loop(S); - #log{status = ok} -> - log_loop(S, [], [B], []); + #log{status = ok, format=LogFormat} -> + log_loop(S, [], [B], [], iolist_size(B), LogFormat); #log{status = {blocked, false}} -> notify_owners({blocked_log, B}), loop(S); @@ -740,8 +740,8 @@ handle({From, sync}=Message, S) -> case get(log) of #log{mode = read_only}=L -> reply(From, {error, {read_only_mode, L#log.name}}, S); - #log{status = ok} -> - log_loop(S, [], [], [From]); + #log{status = ok, format=LogFormat} -> + log_loop(S, [], [], [From], 0, LogFormat); #log{status = {blocked, false}}=L -> reply(From, {error, {blocked_log, L#log.name}}, S); #log{blocked_by = From}=L -> @@ -1006,9 +1006,6 @@ enqueue(Message, #state{queue = Queue}=S) -> -define(MAX_LOOK_AHEAD, 64*1024). -log_loop(S, Pids, Bins, Sync) -> - log_loop(S, Pids, Bins, Sync, iolist_size(Bins), (get(log))#log.format). - %% Inlined. log_loop(#state{cache_error = CE}=S, Pids, _Bins, _Sync, _Sz, _F) when CE =/= ok -> loop(cache_error(S, Pids)); -- cgit v1.2.3