diff options
Diffstat (limited to 'lib/stdlib/src/dets.erl')
-rw-r--r-- | lib/stdlib/src/dets.erl | 111 |
1 files changed, 57 insertions, 54 deletions
diff --git a/lib/stdlib/src/dets.erl b/lib/stdlib/src/dets.erl index 4584b8184f..6c91f1efb7 100644 --- a/lib/stdlib/src/dets.erl +++ b/lib/stdlib/src/dets.erl @@ -147,6 +147,7 @@ bin, % small chunk not consumed, or 'eof' at end-of-file alloc, % the part of the file not yet scanned, mostly a binary tab, + proc, % the pid of the Dets process match_program % true | compiled_match_spec() | undefined }). @@ -208,8 +209,6 @@ all() -> bchunk(Tab, start) -> badarg(treq(Tab, {bchunk_init, Tab}), [Tab, start]); -bchunk(Tab, #dets_cont{bin = eof, tab = Tab}) -> - '$end_of_table'; bchunk(Tab, #dets_cont{what = bchunk, tab = Tab} = State) -> badarg(treq(Tab, {bchunk, State}), [Tab, State]); bchunk(Tab, Term) -> @@ -722,11 +721,14 @@ init_chunk_match(Tab, Pat, What, N) when is_integer(N), N >= 0; N =:= default -> case compile_match_spec(What, Pat) of {Spec, MP} -> - case req(dets_server:get_pid(Tab), {match, MP, Spec, N}) of + Proc = dets_server:get_pid(Tab), + case req(Proc, {match, MP, Spec, N}) of {done, L} -> - {L, #dets_cont{tab = Tab, what = What, bin = eof}}; + {L, #dets_cont{tab = Tab, proc = Proc, what = What, + bin = eof}}; {cont, State} -> - chunk_match(State#dets_cont{what = What, tab = Tab}); + chunk_match(State#dets_cont{what = What, tab = Tab, + proc = Proc}); Error -> Error end; @@ -736,34 +738,28 @@ init_chunk_match(Tab, Pat, What, N) when is_integer(N), N >= 0; init_chunk_match(_Tab, _Pat, _What, _) -> badarg. -chunk_match(State) -> - case catch dets_server:get_pid(State#dets_cont.tab) of - {'EXIT', _Reason} -> - badarg; - _Proc when State#dets_cont.bin =:= eof -> - '$end_of_table'; - Proc -> - case req(Proc, {match_init, State}) of - {cont, {Bins, NewState}} -> - MP = NewState#dets_cont.match_program, - case catch do_foldl_bins(Bins, MP) of - {'EXIT', _} -> - case ets:is_compiled_ms(MP) of - true -> - Bad = dets_utils:bad_object(chunk_match, - Bins), - req(Proc, {corrupt, Bad}); - false -> - badarg - end; - [] -> - chunk_match(NewState); - Terms -> - {Terms, NewState} - end; - Error -> - Error - end +chunk_match(#dets_cont{proc = Proc}=State) -> + case req(Proc, {match_init, State}) of + '$end_of_table'=Reply -> + Reply; + {cont, {Bins, NewState}} -> + MP = NewState#dets_cont.match_program, + case catch do_foldl_bins(Bins, MP) of + {'EXIT', _} -> + case ets:is_compiled_ms(MP) of + true -> + Bad = dets_utils:bad_object(chunk_match, Bins), + req(Proc, {corrupt, Bad}); + false -> + badarg + end; + [] -> + chunk_match(NewState); + Terms -> + {Terms, NewState} + end; + Error -> + Error end. do_foldl_bins(Bins, true) -> @@ -1094,7 +1090,9 @@ do_apply_op(Op, From, Head, N) -> {N2, H2} when is_record(H2, head), is_integer(N2) -> open_file_loop(H2, N2); H2 when is_record(H2, head) -> - open_file_loop(H2, N) + open_file_loop(H2, N); + {{more,From1,Op1,N1}, NewHead} -> + do_apply_op(Op1, From1, NewHead, N1) catch exit:normal -> exit(normal); @@ -1363,37 +1361,35 @@ start_auto_save_timer(Head) -> %% lookup requests in parallel. Evalute delete_object, delete and %% insert as well. stream_op(Op, Pid, Pids, Head, N) -> - stream_op(Head, Pids, [], N, Pid, Op, Head#head.fixed). + #head{fixed = Fxd, update_mode = M} = Head, + stream_op(Head, Pids, [], N, Pid, Op, Fxd, M). -stream_loop(Head, Pids, C, N, false = Fxd) -> +stream_loop(Head, Pids, C, N, false = Fxd, M) -> receive ?DETS_CALL(From, Message) -> - stream_op(Head, Pids, C, N, From, Message, Fxd) + stream_op(Head, Pids, C, N, From, Message, Fxd, M) after 0 -> stream_end(Head, Pids, C, N, no_more) end; -stream_loop(Head, Pids, C, N, _Fxd) -> +stream_loop(Head, Pids, C, N, _Fxd, _M) -> stream_end(Head, Pids, C, N, no_more). -stream_op(Head, Pids, C, N, Pid, {lookup_keys,Keys}, Fxd) -> +stream_op(Head, Pids, C, N, Pid, {lookup_keys,Keys}, Fxd, M) -> NC = [{{lookup,Pid},Keys} | C], - stream_loop(Head, Pids, NC, N, Fxd); -stream_op(Head, Pids, C, N, Pid, {insert, _Objects} = Op, Fxd) -> - NC = [Op | C], - stream_loop(Head, [Pid | Pids], NC, N, Fxd); -stream_op(Head, Pids, C, N, Pid, {insert_new, _Objects} = Op, Fxd) -> + stream_loop(Head, Pids, NC, N, Fxd, M); +stream_op(Head, Pids, C, N, Pid, {insert, _Objects} = Op, Fxd, dirty = M) -> NC = [Op | C], - stream_loop(Head, [Pid | Pids], NC, N, Fxd); -stream_op(Head, Pids, C, N, Pid, {delete_key, _Keys} = Op, Fxd) -> + stream_loop(Head, [Pid | Pids], NC, N, Fxd, M); +stream_op(Head, Pids, C, N, Pid, {delete_key, _Keys} = Op, Fxd, dirty = M) -> NC = [Op | C], - stream_loop(Head, [Pid | Pids], NC, N, Fxd); -stream_op(Head, Pids, C, N, Pid, {delete_object, _Objects} = Op, Fxd) -> + stream_loop(Head, [Pid | Pids], NC, N, Fxd, M); +stream_op(Head, Pids, C, N, Pid, {delete_object, _Os} = Op, Fxd, dirty = M) -> NC = [Op | C], - stream_loop(Head, [Pid | Pids], NC, N, Fxd); -stream_op(Head, Pids, C, N, Pid, {member, Key}, Fxd) -> + stream_loop(Head, [Pid | Pids], NC, N, Fxd, M); +stream_op(Head, Pids, C, N, Pid, {member, Key}, Fxd, M) -> NC = [{{lookup,[Pid]},[Key]} | C], - stream_loop(Head, Pids, NC, N, Fxd); -stream_op(Head, Pids, C, N, Pid, Op, _Fxd) -> + stream_loop(Head, Pids, NC, N, Fxd, M); +stream_op(Head, Pids, C, N, Pid, Op, _Fxd, _M) -> stream_end(Head, Pids, C, N, {Pid,Op}). stream_end(Head, Pids0, C, N, Next) -> @@ -1438,7 +1434,7 @@ stream_end2([], Ps, no_more, N, C, Head, _Reply) -> penalty(Head, Ps, C), {N, Head}; stream_end2([], _Ps, {From, Op}, N, _C, Head, _Reply) -> - apply_op(Op, From, Head, N). + {{more,From,Op,N},Head}. penalty(H, _Ps, _C) when H#head.fixed =:= false -> ok; @@ -1578,13 +1574,18 @@ do_bchunk_init(Head, Tab) -> L = dets_utils:all_allocated(H2), C0 = #dets_cont{no_objs = default, bin = <<>>, alloc = L}, BinParms = term_to_binary(Parms), - {H2, {C0#dets_cont{tab = Tab, what = bchunk}, [BinParms]}} + {H2, {C0#dets_cont{tab = Tab, proc = self(),what = bchunk}, + [BinParms]}} end; {NewHead, _} = HeadError when is_record(NewHead, head) -> HeadError end. %% -> {NewHead, {cont(), [binary()]}} | {NewHead, Error} +do_bchunk(Head, #dets_cont{proc = Proc}) when Proc =/= self() -> + {Head, badarg}; +do_bchunk(Head, #dets_cont{bin = eof}) -> + {Head, '$end_of_table'}; do_bchunk(Head, State) -> case dets_v9:read_bchunks(Head, State#dets_cont.alloc) of {error, Reason} -> @@ -1954,6 +1955,8 @@ flookup_keys(Head, Keys) -> end. %% -> {NewHead, Result} +fmatch_init(Head, #dets_cont{bin = eof}) -> + {Head, '$end_of_table'}; fmatch_init(Head, C) -> case scan(Head, C) of {scan_error, Reason} -> |