diff options
Diffstat (limited to 'lib/stdlib/src/qlc.erl')
-rw-r--r-- | lib/stdlib/src/qlc.erl | 3540 |
1 files changed, 3540 insertions, 0 deletions
diff --git a/lib/stdlib/src/qlc.erl b/lib/stdlib/src/qlc.erl new file mode 100644 index 0000000000..ef142e1c8a --- /dev/null +++ b/lib/stdlib/src/qlc.erl @@ -0,0 +1,3540 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2004-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(qlc). + +%%% Purpose: Main API module qlc. Functions for evaluation. +%%% Other files: +%%% qlc_pt. Implements the parse transform. + +%% External exports + +-export([parse_transform/2, transform_from_evaluator/2]). + +-export([q/1, q/2]). + +-export([eval/1, e/1, eval/2, e/2, fold/3, fold/4]). +-export([cursor/1, cursor/2, + next_answers/1, next_answers/2, + delete_cursor/1]). +-export([append/1, append/2]). + +-export([sort/1, sort/2, keysort/2, keysort/3]). + +-export([table/2]). + +-export([info/1, info/2]). + +-export([string_to_handle/1, string_to_handle/2, string_to_handle/3]). + +-export([format_error/1]). + +%% Exported to qlc_pt.erl only: +-export([template_state/0, aux_name/3, name_suffix/2, vars/1, + var_ufold/2, var_fold/3, all_selections/1]). + +%% When cache=list lists bigger than ?MAX_LIST_SIZE bytes are put on +%% file. Also used when merge join finds big equivalence classes. +-define(MAX_LIST_SIZE, 512*1024). + +-record(qlc_append, % qlc:append/1,2 + {hl + }). + +-record(qlc_table, % qlc:table/2 + {trav_fun, % traverse fun + trav_MS, % bool(); true iff traverse fun takes a match spec + pre_fun, + post_fun, + info_fun, + format_fun, + lookup_fun, + parent_fun, + key_equality, % '==' | '=:=' | undefined (--R12B-5) + lu_vals, % undefined | {Position,Values}; values to be looked up + ms = no_match_spec + % match specification; [T || P <- Tab, Fs] + }). + +-record(qlc_sort, % qlc:sort/1,2 and qlc:keysort/2,3 + {h, + keypos, % sort | {keysort, KeyPos} + unique, + compressed, % [] | [compressed] + order, + fs_opts, % file_sorter options + tmpdir_usage = allowed, % allowed | not_allowed + % | warning_msg | error_msg | info_msg + tmpdir + }). + +%% Also in qlc_pt.erl. +-record(qlc_lc, % qlc:q/1,2 + {lc, + opt % #qlc_opt + }). + +-record(qlc_list, % a prepared list + {l, + ms = no_match_spec + }). + +-record(qlc_join, % a prepared join + {kind, % {merge, KeyEquality} | + % {lookup, KeyEquality, LookupFun} + opt, % #qlc_opt from q/2. + h1, q1, c1, % to be traversed by "lookup join" + h2, q2, c2 % to be looked up by "lookup join" + }). + +%%% A query cursor is a tuple {qlc_cursor, Cursor} where Cursor is a pair +%%% {CursorPid, OwnerPid}. + +-record(qlc_cursor, {c}). + +-record(qlc_opt, + {unique = false, % bool() + cache = false, % bool() | list (true~ets, false~no) + max_lookup = -1, % int() >= 0 | -1 (represents infinity) + join = any, % any | nested_loop | merge | lookup + tmpdir = "", % global tmpdir + lookup = any, % any | bool() + max_list = ?MAX_LIST_SIZE, % int() >= 0 + tmpdir_usage = allowed % allowed | not_allowed + % | warning_msg | error_msg | info_msg + }). + +-record(setup, {parent}). + +-define(THROWN_ERROR, {?MODULE, throw_error, _}). + +%%% A query handle is a tuple {qlc_handle, Handle} where Handle is one +%%% of #qlc_append, #qlc_table, #qlc_sort, and #qlc_lc. + +-record(qlc_handle, {h}). + +get_handle(#qlc_handle{h = #qlc_lc{opt = {qlc_opt, U, C, M}}=H}) -> + %% R11B-0. + H#qlc_lc{opt = #qlc_opt{unique = U, cache = C, max_lookup = M}}; +get_handle(#qlc_handle{h = H}) -> + H; +get_handle(L) when is_list(L) -> + L; +get_handle(_) -> + badarg. + +%%% +%%% Exported functions +%%% + +append(QHs) -> + Hs = [case get_handle(QH) of + badarg -> erlang:error(badarg, [QHs]); + H -> H + end || QH <- QHs], + #qlc_handle{h = #qlc_append{hl = Hs}}. + +append(QH1, QH2) -> + Hs = [case get_handle(QH) of + badarg -> erlang:error(badarg, [QH1, QH2]); + H -> H + end || QH <- [QH1, QH2]], + #qlc_handle{h = #qlc_append{hl = Hs}}. + +cursor(QH) -> + cursor(QH, []). + +cursor(QH, Options) -> + case {options(Options, [unique_all, cache_all, tmpdir, + spawn_options, max_list_size, + tmpdir_usage]), + get_handle(QH)} of + {B1, B2} when B1 =:= badarg; B2 =:= badarg -> + erlang:error(badarg, [QH, Options]); + {[GUnique, GCache, TmpDir, SpawnOptions0, MaxList, TmpUsage], H} -> + SpawnOptions = spawn_options(SpawnOptions0), + case cursor_process(H, GUnique, GCache, TmpDir, + SpawnOptions, MaxList, TmpUsage) of + Pid when is_pid(Pid) -> + #qlc_cursor{c = {Pid, self()}}; + Error -> + Error + end + end. + +delete_cursor(#qlc_cursor{c = {_, Owner}}=C) when Owner =/= self() -> + erlang:error(not_cursor_owner, [C]); +delete_cursor(#qlc_cursor{c = {Pid, _}}) -> + stop_cursor(Pid); +delete_cursor(T) -> + erlang:error(badarg, [T]). + +e(QH) -> + eval(QH, []). + +e(QH, Options) -> + eval(QH, Options). + +eval(QH) -> + eval(QH, []). + +eval(QH, Options) -> + case {options(Options, [unique_all, cache_all, tmpdir, max_list_size, + tmpdir_usage]), + get_handle(QH)} of + {B1, B2} when B1 =:= badarg; B2 =:= badarg -> + erlang:error(badarg, [QH, Options]); + {[GUnique, GCache, TmpDir, MaxList, TmpUsage], Handle} -> + try + Prep = prepare_qlc(Handle, [], GUnique, GCache, + TmpDir, MaxList, TmpUsage), + case setup_qlc(Prep, #setup{parent = self()}) of + {L, Post, _LocalPost} when is_list(L) -> + post_funs(Post), + L; + {Objs, Post, _LocalPost} when is_function(Objs) -> + try + collect(Objs) + after + post_funs(Post) + end + end + catch Term -> + case erlang:get_stacktrace() of + [?THROWN_ERROR | _] -> + Term; + Stacktrace -> + erlang:raise(throw, Term, Stacktrace) + end + end + end. + +fold(Fun, Acc0, QH) -> + fold(Fun, Acc0, QH, []). + +fold(Fun, Acc0, QH, Options) -> + case {options(Options, [unique_all, cache_all, tmpdir, max_list_size, + tmpdir_usage]), + get_handle(QH)} of + {B1, B2} when B1 =:= badarg; B2 =:= badarg -> + erlang:error(badarg, [Fun, Acc0, QH, Options]); + {[GUnique, GCache, TmpDir, MaxList, TmpUsage], Handle} -> + try + Prep = prepare_qlc(Handle, not_a_list, GUnique, GCache, + TmpDir, MaxList, TmpUsage), + case setup_qlc(Prep, #setup{parent = self()}) of + {Objs, Post, _LocalPost} when is_function(Objs); + is_list(Objs) -> + try + fold_loop(Fun, Objs, Acc0) + after + post_funs(Post) + end + end + catch Term -> + case erlang:get_stacktrace() of + [?THROWN_ERROR | _] -> + Term; + Stacktrace -> + erlang:raise(throw, Term, Stacktrace) + end + end + end. + +format_error(not_a_query_list_comprehension) -> + io_lib:format("argument is not a query list comprehension", []); +format_error({used_generator_variable, V}) -> + io_lib:format("generated variable ~w must not be used in list expression", + [V]); +format_error(binary_generator) -> + io_lib:format("cannot handle binary generators", []); +format_error(too_complex_join) -> + io_lib:format("cannot handle join of three or more generators efficiently", + []); +format_error(too_many_joins) -> + io_lib:format("cannot handle more than one join efficiently", []); +format_error(nomatch_pattern) -> + io_lib:format("pattern cannot possibly match", []); +format_error(nomatch_filter) -> + io_lib:format("filter evaluates to 'false'", []); +format_error({Line, Mod, Reason}) when is_integer(Line) -> + io_lib:format("~p: ~s~n", + [Line, lists:flatten(Mod:format_error(Reason))]); +%% file_sorter errors +format_error({bad_object, FileName}) -> + io_lib:format("the temporary file \"~s\" holding answers is corrupt", + [FileName]); +format_error(bad_object) -> + io_lib:format("the keys could not be extracted from some term", []); +format_error({file_error, FileName, Reason}) -> + io_lib:format("\"~s\": ~p~n",[FileName, file:format_error(Reason)]); +format_error({premature_eof, FileName}) -> + io_lib:format("\"~s\": end-of-file was encountered inside some binary term", + [FileName]); +format_error({tmpdir_usage, Why}) -> + io_lib:format("temporary file was needed for ~w~n", [Why]); +format_error({error, Module, Reason}) -> + Module:format_error(Reason); +format_error(E) -> + io_lib:format("~p~n", [E]). + +info(QH) -> + info(QH, []). + +info(QH, Options) -> + case {options(Options, [unique_all, cache_all, flat, format, n_elements, + depth, tmpdir, max_list_size, tmpdir_usage]), + get_handle(QH)} of + {B1, B2} when B1 =:= badarg; B2 =:= badarg -> + erlang:error(badarg, [QH, Options]); + {[GUnique, GCache, Flat, Format, NElements, + Depth, TmpDir, MaxList, TmpUsage], + H} -> + try + Prep = prepare_qlc(H, [], GUnique, GCache, + TmpDir, MaxList, TmpUsage), + Info = le_info(Prep, {NElements,Depth}), + AbstractCode = abstract(Info, Flat, NElements, Depth), + case Format of + abstract_code -> + abstract_code(AbstractCode); + string -> + Hook = fun({special, _Line, String}, _I, _P, _F) -> + String + end, + lists:flatten(erl_pp:expr(AbstractCode, 0, Hook)); + debug -> % Not documented. Intended for testing only. + Info + end + catch Term -> + case erlang:get_stacktrace() of + [?THROWN_ERROR | _] -> + Term; + Stacktrace -> + erlang:raise(throw, Term, Stacktrace) + end + end + end. + +keysort(KeyPos, QH) -> + keysort(KeyPos, QH, []). + +keysort(KeyPos, QH, Options) -> + case {is_keypos(KeyPos), + options(Options, [tmpdir, order, unique, compressed, + size, no_files]), + get_handle(QH)} of + {true, [TmpDir, Order, Unique,Compressed | _], H} when H =/= badarg -> + #qlc_handle{h = #qlc_sort{h = H, keypos = {keysort,KeyPos}, + unique = Unique, + compressed = Compressed, + order = Order, + fs_opts = listify(Options), + tmpdir = TmpDir}}; + _ -> + erlang:error(badarg, [KeyPos, QH, Options]) + end. + +-define(DEFAULT_NUM_OF_ANSWERS, 10). + +next_answers(C) -> + next_answers(C, ?DEFAULT_NUM_OF_ANSWERS). + +next_answers(#qlc_cursor{c = {_, Owner}}=C, + NumOfAnswers) when Owner =/= self() -> + erlang:error(not_cursor_owner, [C, NumOfAnswers]); +next_answers(#qlc_cursor{c = {Pid, _}}=C, NumOfAnswers) -> + N = case NumOfAnswers of + all_remaining -> -1; + _ when is_integer(NumOfAnswers), NumOfAnswers > 0 -> NumOfAnswers; + _ -> erlang:error(badarg, [C, NumOfAnswers]) + end, + next_loop(Pid, [], N); +next_answers(T1, T2) -> + erlang:error(badarg, [T1, T2]). + +parse_transform(Forms, Options) -> + qlc_pt:parse_transform(Forms, Options). + +%% The funcspecs qlc:q/1 and qlc:q/2 are known by erl_eval.erl and +%% erl_lint.erl. +q(QLC_lc) -> + q(QLC_lc, []). + +q(#qlc_lc{}=QLC_lc, Options) -> + case options(Options, [unique, cache, max_lookup, join, lookup]) of + [Unique, Cache, Max, Join, Lookup] -> + Opt = #qlc_opt{unique = Unique, cache = Cache, + max_lookup = Max, join = Join, lookup = Lookup}, + #qlc_handle{h = QLC_lc#qlc_lc{opt = Opt}}; + _ -> + erlang:error(badarg, [QLC_lc, Options]) + end; +q(T1, T2) -> + erlang:error(badarg, [T1, T2]). + +sort(QH) -> + sort(QH, []). + +sort(QH, Options) -> + case {options(Options, [tmpdir, order, unique, compressed, + size, no_files]), get_handle(QH)} of + {B1, B2} when B1 =:= badarg; B2 =:= badarg -> + erlang:error(badarg, [QH, Options]); + {[TD, Order, Unique, Compressed | _], H} -> + #qlc_handle{h = #qlc_sort{h = H, keypos = sort, unique = Unique, + compressed = Compressed, order = Order, + fs_opts = listify(Options), + tmpdir = TD}} + end. + +%% Note that the generated code is evaluated by (the slow) erl_eval. +string_to_handle(Str) -> + string_to_handle(Str, []). + +string_to_handle(Str, Options) -> + string_to_handle(Str, Options, []). + +string_to_handle(Str, Options, Bindings) when is_list(Str), + is_list(Bindings) -> + case options(Options, [unique, cache, max_lookup, join, lookup]) of + badarg -> + erlang:error(badarg, [Str, Options, Bindings]); + [Unique, Cache, MaxLookup, Join, Lookup] -> + case erl_scan:string(Str) of + {ok, Tokens, _} -> + case erl_parse:parse_exprs(Tokens) of + {ok, [Expr]} -> + case qlc_pt:transform_expression(Expr, Bindings) of + {ok, {call, _, _QlcQ, Handle}} -> + {value, QLC_lc, _} = + erl_eval:exprs(Handle, Bindings), + O = #qlc_opt{unique = Unique, + cache = Cache, + max_lookup = MaxLookup, + join = Join, + lookup = Lookup}, + #qlc_handle{h = QLC_lc#qlc_lc{opt = O}}; + {not_ok, [{error, Error} | _]} -> + error(Error) + end; + {ok, _ExprList} -> + erlang:error(badarg, [Str, Options, Bindings]); + {error, ErrorInfo} -> + error(ErrorInfo) + end; + {error, ErrorInfo, _EndLine} -> + error(ErrorInfo) + end + end; +string_to_handle(T1, T2, T3) -> + erlang:error(badarg, [T1, T2, T3]). + +table(TraverseFun, Options) when is_function(TraverseFun) -> + case {is_function(TraverseFun, 0), + IsFun1 = is_function(TraverseFun, 1)} of + {false, false} -> + erlang:error(badarg, [TraverseFun, Options]); + _ -> + case options(Options, [pre_fun, post_fun, info_fun, format_fun, + lookup_fun, parent_fun, key_equality]) of + [PreFun, PostFun, InfoFun, FormatFun, LookupFun, ParentFun, + KeyEquality] -> + T = #qlc_table{trav_fun = TraverseFun, pre_fun = PreFun, + post_fun = PostFun, info_fun = InfoFun, + parent_fun = ParentFun, + trav_MS = IsFun1, + format_fun = FormatFun, + lookup_fun = LookupFun, + key_equality = KeyEquality}, + #qlc_handle{h = T}; + badarg -> + erlang:error(badarg, [TraverseFun, Options]) + end + end; +table(T1, T2) -> + erlang:error(badarg, [T1, T2]). + +transform_from_evaluator(LC, Bs0) -> + qlc_pt:transform_from_evaluator(LC, Bs0). + +-define(TEMPLATE_STATE, 1). + +template_state() -> + ?TEMPLATE_STATE. + +aux_name(Name, N, AllNames) -> + {VN, _} = aux_name1(Name, N, AllNames), + VN. + +name_suffix(A, Suff) -> + list_to_atom(lists:concat([A, Suff])). + +vars(E) -> + var_ufold(fun({var,_L,V}) -> V end, E). + +var_ufold(F, E) -> + ordsets:from_list(var_fold(F, [], E)). + +all_selections([]) -> + [[]]; +all_selections([{I,Cs} | ICs]) -> + [[{I,C} | L] || C <- Cs, L <- all_selections(ICs)]. + +%%% +%%% Local functions +%%% + +aux_name1(Name, N, AllNames) -> + SN = name_suffix(Name, N), + case sets:is_element(SN, AllNames) of + true -> aux_name1(Name, N + 1, AllNames); + false -> {SN, N} + end. + +var_fold(F, A, {var,_,V}=Var) when V =/= '_' -> + [F(Var) | A]; +var_fold(F, A, T) when is_tuple(T) -> + var_fold(F, A, tuple_to_list(T)); +var_fold(F, A, [E | Es]) -> + var_fold(F, var_fold(F, A, E), Es); +var_fold(_F, A, _T) -> + A. + +options(Options, Keys) when is_list(Options) -> + options(Options, Keys, []); +options(Option, Keys) -> + options([Option], Keys, []). + +options(Options0, [Key | Keys], L) when is_list(Options0) -> + Options = case lists:member(Key, Options0) of + true -> + [atom_option(Key) | lists:delete(Key, Options0)]; + false -> + Options0 + end, + V = case lists:keysearch(Key, 1, Options) of + {value, {format_fun, U=undefined}} -> + {ok, U}; + {value, {info_fun, U=undefined}} -> + {ok, U}; + {value, {lookup_fun, U=undefined}} -> + {ok, U}; + {value, {parent_fun, U=undefined}} -> + {ok, U}; + {value, {post_fun, U=undefined}} -> + {ok, U}; + {value, {pre_fun, U=undefined}} -> + {ok, U}; + {value, {info_fun, Fun}} when is_function(Fun), + is_function(Fun, 1) -> + {ok, Fun}; + {value, {pre_fun, Fun}} when is_function(Fun), + is_function(Fun, 1) -> + {ok, Fun}; + {value, {post_fun, Fun}} when is_function(Fun), + is_function(Fun, 0) -> + {ok, Fun}; + {value, {lookup_fun, Fun}} when is_function(Fun), + is_function(Fun, 2) -> + {ok, Fun}; + {value, {max_lookup, Max}} when is_integer(Max), Max >= 0 -> + {ok, Max}; + {value, {max_lookup, infinity}} -> + {ok, -1}; + {value, {format_fun, Fun}} when is_function(Fun), + is_function(Fun, 1) -> + {ok, Fun}; + {value, {parent_fun, Fun}} when is_function(Fun), + is_function(Fun, 0) -> + {ok, Fun}; + {value, {key_equality, KE='=='}}-> + {ok, KE}; + {value, {key_equality, KE='=:='}}-> + {ok, KE}; + {value, {join, J=any}} -> + {ok, J}; + {value, {join, J=nested_loop}} -> + {ok, J}; + {value, {join, J=merge}} -> + {ok, J}; + {value, {join, J=lookup}} -> + {ok, J}; + {value, {lookup, LookUp}} when LookUp; + not LookUp; + LookUp =:= any -> + {ok, LookUp}; + {value, {max_list_size, Max}} when is_integer(Max), Max >= 0 -> + {ok, Max}; + {value, {tmpdir_usage, TmpUsage}} when TmpUsage =:= allowed; + TmpUsage =:= not_allowed; + TmpUsage =:= info_msg; + TmpUsage =:= warning_msg; + TmpUsage =:= error_msg -> + {ok, TmpUsage}; + {value, {unique, Unique}} when Unique; not Unique -> + {ok, Unique}; + {value, {cache, Cache}} when Cache; not Cache; Cache =:= list -> + {ok, Cache}; + {value, {cache, ets}} -> + {ok, true}; + {value, {cache, no}} -> + {ok, false}; + {value, {unique_all, UniqueAll}} when UniqueAll; not UniqueAll -> + {ok, UniqueAll}; + {value, {cache_all, CacheAll}} when CacheAll; + not CacheAll; + CacheAll =:= list -> + {ok, CacheAll}; + {value, {cache_all, ets}} -> + {ok, true}; + {value, {cache_all, no}} -> + {ok, false}; + {value, {spawn_options, default}} -> + {ok, default}; + {value, {spawn_options, SpawnOptions}} -> + case is_proper_list(SpawnOptions) of + true -> + {ok, SpawnOptions}; + false -> + badarg + end; + {value, {flat, Flat}} when Flat; not Flat -> + {ok, Flat}; + {value, {format, Format}} when Format =:= string; + Format =:= abstract_code; + Format =:= debug -> + {ok, Format}; + {value, {n_elements, NElements}} when NElements =:= infinity; + is_integer(NElements), + NElements > 0 -> + {ok, NElements}; + {value, {depth, Depth}} when Depth =:= infinity; + is_integer(Depth), Depth >= 0 -> + {ok, Depth}; + {value, {order, Order}} when is_function(Order), + is_function(Order, 2); + (Order =:= ascending); + (Order =:= descending) -> + {ok, Order}; + {value, {compressed, Comp}} when Comp -> + {ok, [compressed]}; + {value, {compressed, Comp}} when not Comp -> + {ok, []}; + {value, {tmpdir, T}} -> + {ok, T}; + {value, {size, Size}} when is_integer(Size), Size > 0 -> + {ok, Size}; + {value, {no_files, NoFiles}} when is_integer(NoFiles), + NoFiles > 1 -> + {ok, NoFiles}; + {value, {Key, _}} -> + badarg; + false -> + Default = default_option(Key), + {ok, Default} + end, + case V of + badarg -> + badarg; + {ok, Value} -> + NewOptions = lists:keydelete(Key, 1, Options), + options(NewOptions, Keys, [Value | L]) + end; +options([], [], L) -> + lists:reverse(L); +options(_Options, _, _L) -> + badarg. + +default_option(pre_fun) -> undefined; +default_option(post_fun) -> undefined; +default_option(info_fun) -> undefined; +default_option(format_fun) -> undefined; +default_option(lookup_fun) -> undefined; +default_option(max_lookup) -> -1; +default_option(join) -> any; +default_option(lookup) -> any; +default_option(parent_fun) -> undefined; +default_option(key_equality) -> '=:='; +default_option(spawn_options) -> default; +default_option(flat) -> true; +default_option(format) -> string; +default_option(n_elements) -> infinity; +default_option(depth) -> infinity; +default_option(max_list_size) -> ?MAX_LIST_SIZE; +default_option(tmpdir_usage) -> allowed; +default_option(cache) -> false; +default_option(cache_all) -> false; +default_option(unique) -> false; +default_option(unique_all) -> false; +default_option(order) -> ascending; % default values from file_sorter.erl +default_option(compressed) -> []; +default_option(tmpdir) -> ""; +default_option(size) -> 524288; +default_option(no_files) -> 16. + +atom_option(cache) -> {cache, true}; +atom_option(unique) -> {unique, true}; +atom_option(cache_all) -> {cache_all, true}; +atom_option(unique_all) -> {unique_all, true}; +atom_option(lookup) -> {lookup, true}; +atom_option(flat) -> {flat, true}; +atom_option(Key) -> Key. + +is_proper_list([_ | L]) -> + is_proper_list(L); +is_proper_list(L) -> + L =:= []. + +spawn_options(default) -> + [link]; +spawn_options(SpawnOptions) -> + lists:delete(monitor, + case lists:member(link, SpawnOptions) of + true -> + SpawnOptions; + false -> + [link | SpawnOptions] + end). + +is_keypos(Keypos) when is_integer(Keypos), Keypos > 0 -> + true; +is_keypos([]) -> + false; +is_keypos(L) -> + is_keyposs(L). + +is_keyposs([Kp | Kps]) when is_integer(Kp), Kp > 0 -> + is_keyposs(Kps); +is_keyposs(Kps) -> + Kps =:= []. + +listify(L) when is_list(L) -> + L; +listify(T) -> + [T]. + +%% Optimizations to be carried out. +-record(optz, + {unique = false, % bool() + cache = false, % bool() | list + join_option = any, % constraint set by the 'join' option + fast_join = no, % no | #qlc_join. 'no' means nested loop. + opt % #qlc_opt + }). + +%% Prepared #qlc_lc. +-record(qlc, + {lcf, % fun() -> Val + codef, + qdata, % with evaluated list expressions + init_value, + optz % #optz + }). + +%% Prepared simple #qlc_lc. +-record(simple_qlc, + {p, % atom(), pattern variable + le, + line, + init_value, + optz % #optz + }). + +-record(prepared, + {qh, % #qlc_append | #qlc_table | #qlc | #simple_qlc | + % #qlc_sort | list() + sorted = no, % yes | no | ascending | descending + sort_info = [], % + sort_info2 = [], % 'sort_info' updated with pattern info; qh is LE + lu_skip_quals = [], % qualifiers to skip due to lookup + join = {[],[]}, % {Lookup, Merge} + n_objs = undefined, % for join (not used yet) + is_unique_objects = false, % bool() + is_cached = false % bool() (true means 'ets' or 'list') + }). + +%%% Cursor process functions. + +cursor_process(H, GUnique, GCache, TmpDir, SpawnOptions, MaxList, TmpUsage) -> + Parent = self(), + Setup = #setup{parent = Parent}, + CF = fun() -> + %% Unless exit/2 is trapped no cleanup can be done. + %% The user is assumed not to set the flag to false. + process_flag(trap_exit, true), + MonRef = erlang:monitor(process, Parent), + {Objs, Post, _LocalPost} = + try + Prep = prepare_qlc(H, not_a_list, GUnique, GCache, + TmpDir, MaxList, TmpUsage), + setup_qlc(Prep, Setup) + catch Class:Reason -> + Parent ! {self(), {caught, Class, Reason, + erlang:get_stacktrace()}}, + exit(normal) + end, + Parent ! {self(), ok}, + wait_for_request(Parent, MonRef, Post), + reply(Parent, MonRef, Post, Objs) + end, + Pid = spawn_opt(CF, SpawnOptions), + parent_fun(Pid, Parent). + +%% Expect calls from tables calling the parent_fun and finally an 'ok'. +parent_fun(Pid, Parent) -> + receive + {Pid, ok} -> Pid; + {TPid, {parent_fun, Fun}} -> + V = try + {value, Fun()} + catch Class:Reason -> + {parent_fun_caught, Class, Reason, erlang:get_stacktrace()} + end, + TPid ! {Parent, V}, + parent_fun(Pid, Parent); + {Pid, {caught, throw, Error, [?THROWN_ERROR | _]}} -> + Error; + {Pid, {caught, Class, Reason, Stacktrace}} -> + erlang:raise(Class, Reason, Stacktrace) + end. + +reply(Parent, MonRef, Post, []) -> + no_more(Parent, MonRef, Post); +reply(Parent, MonRef, Post, [Answer | Cont]) -> + Parent ! {self(), {answer, Answer}}, + wait_for_request(Parent, MonRef, Post), + reply(Parent, MonRef, Post, Cont); +reply(Parent, MonRef, Post, Cont) -> + Reply = try + if + is_function(Cont) -> + Cont(); + true -> + throw_error(Cont) + end + catch + Class:Reason -> + post_funs(Post), + Message = {caught, Class, Reason, erlang:get_stacktrace()}, + Parent ! {self(), Message}, + exit(normal) + end, + reply(Parent, MonRef, Post, Reply). + +no_more(Parent, MonRef, Post) -> + Parent ! {self(), no_more}, + wait_for_request(Parent, MonRef, Post), + no_more(Parent, MonRef, Post). + +wait_for_request(Parent, MonRef, Post) -> + receive + {Parent, stop} -> + post_funs(Post), + exit(normal); + {Parent, more} -> + ok; + {'EXIT', Parent, _Reason} -> + post_funs(Post), + exit(normal); + {'DOWN', MonRef, process, Parent, _Info} -> + post_funs(Post), + exit(normal); + {'EXIT', Pid, _Reason} when Pid =:= self() -> + %% Trapped signal. The cursor ignores it... + wait_for_request(Parent, MonRef, Post); + Other -> + error_logger:error_msg( + "The qlc cursor ~w received an unexpected message:\n~p\n", + [self(), Other]), + wait_for_request(Parent, MonRef, Post) + end. + +%%% End of cursor process functions. + +abstract_code({special, Line, String}) -> + {string, Line, String}; +abstract_code(Tuple) when is_tuple(Tuple) -> + list_to_tuple(abstract_code(tuple_to_list(Tuple))); +abstract_code([H | T]) -> + [abstract_code(H) | abstract_code(T)]; +abstract_code(Term) -> + Term. + +%% Also in qlc_pt.erl. +-define(Q, q). +-define(QLC_Q(L1, L2, L3, L4, LC, Os), + {call,L1,{remote,L2,{atom,L3,?MODULE},{atom,L4,?Q}},[LC | Os]}). + +abstract(Info, false=_Flat, NElements, Depth) -> + abstract(Info, NElements, Depth); +abstract(Info, true=_Flat, NElements, Depth) -> + Abstract = abstract(Info, NElements, Depth), + Vars = abstract_vars(Abstract), + {_, Body0, Expr} = flatten_abstr(Abstract, 1, Vars, []), + case Body0 of + [] -> + Expr; + [{match,_,Expr,Q}] -> + Q; + [{match,_,Expr,Q} | Body] -> + {block, 0, lists:reverse(Body, [Q])}; + _ -> + {block, 0, lists:reverse(Body0, [Expr])} + end. + +abstract({qlc, E0, Qs0, Opt}, NElements, Depth) -> + Qs = lists:map(fun({generate, P, LE}) -> + {generate, 1, binary_to_term(P), + abstract(LE, NElements, Depth)}; + (F) -> + binary_to_term(F) + end, Qs0), + E = binary_to_term(E0), + Os = case Opt of + [] -> []; + _ -> [abstract_term(Opt, 1)] + end, + ?QLC_Q(1, 1, 1, 1, {lc,1,E,Qs}, Os); +abstract({table, {M, F, As0}}, _NElements, _Depth) + when is_atom(M), is_atom(F), is_list(As0) -> + As = [abstract_term(A, 1) || A <- As0], + {call, 1, {remote, 1, {atom, 1, M}, {atom, 1, F}}, As}; +abstract({table, TableDesc}, _NElements, _Depth) -> + case io_lib:deep_char_list(TableDesc) of + true -> + {ok, Tokens, _} = erl_scan:string(lists:flatten(TableDesc++".")), + {ok, [Expr]} = erl_parse:parse_exprs(Tokens), + Expr; + false -> % abstract expression + TableDesc + end; +abstract({append, Infos}, NElements, Depth) -> + As = lists:foldr(fun(Info, As0) -> + {cons,1,abstract(Info, NElements, Depth),As0} + end, {nil, 1}, Infos), + {call, 1, {remote, 1, {atom, 1, ?MODULE}, {atom, 1, append}}, [As]}; +abstract({sort, Info, SortOptions}, NElements, Depth) -> + {call, 1, {remote, 1, {atom, 1, ?MODULE}, {atom, 1, sort}}, + [abstract(Info, NElements, Depth), abstract_term(SortOptions, 1)]}; +abstract({keysort, Info, Kp, SortOptions}, NElements, Depth) -> + {call, 1, {remote, 1, {atom, 1, ?MODULE}, {atom, 1, keysort}}, + [abstract_term(Kp, 1), abstract(Info, NElements, Depth), + abstract_term(SortOptions, 1)]}; +abstract({list,L,MS}, NElements, Depth) -> + {call, 1, {remote, 1, {atom, 1, ets}, {atom, 1, match_spec_run}}, + [abstract(L, NElements, Depth), + {call, 1, {remote, 1, {atom, 1, ets}, {atom, 1, match_spec_compile}}, + [abstract_term(depth(MS, Depth), 1)]}]}; +abstract({list, L}, NElements, Depth) when NElements =:= infinity; + NElements >= length(L) -> + abstract_term(depth(L, Depth), 1); +abstract({list, L}, NElements, Depth) -> + abstract_term(depth(lists:sublist(L, NElements), Depth) ++ '...', 1). + +depth(List, infinity) -> + List; +depth(List, Depth) -> + [depth1(E, Depth) || E <- List]. + +depth_fun(infinity = _Depth) -> + fun(E) -> E end; +depth_fun(Depth) -> + fun(E) -> depth1(E, Depth) end. + +depth1([]=L, _D) -> + L; +depth1(_Term, 0) -> + '...'; +depth1(Tuple, D) when is_tuple(Tuple) -> + depth_tuple(Tuple, tuple_size(Tuple), 1, D - 1, []); +depth1(List, D) when is_list(List) -> + if + D =:= 1 -> + ['...']; + true -> + depth_list(List, D - 1) + end; +depth1(Binary, D) when byte_size(Binary) > D - 1 -> + D1 = D - 1, + <<Bin:D1/bytes,_/bytes>> = Binary, + <<Bin/bytes,"...">>; +depth1(T, _Depth) -> + T. + +depth_list([]=L, _D) -> + L; +depth_list(_L, 0) -> + '...'; +depth_list([E | Es], D) -> + [depth1(E, D) | depth_list(Es, D - 1)]. + +depth_tuple(_Tuple, Sz, I, _D, L) when I > Sz -> + list_to_tuple(lists:reverse(L)); +depth_tuple(_L, _Sz, _I, 0, L) -> + list_to_tuple(lists:reverse(L, ['...'])); +depth_tuple(Tuple, Sz, I, D, L) -> + E = depth1(element(I, Tuple), D), + depth_tuple(Tuple, Sz, I + 1, D - 1, [E | L]). + +abstract_term(Term) -> + abstract_term(Term, 0). + +abstract_term(Term, Line) -> + abstr_term(Term, Line). + +abstr_term(Tuple, Line) when is_tuple(Tuple) -> + {tuple,Line,[abstr_term(E, Line) || E <- tuple_to_list(Tuple)]}; +abstr_term([_ | _]=L, Line) -> + case io_lib:char_list(L) of + true -> + erl_parse:abstract(L, Line); + false -> + abstr_list(L, Line) + end; +abstr_term(Fun, Line) when is_function(Fun) -> + case erl_eval:fun_data(Fun) of + {fun_data, _Bs, Cs} -> + {'fun', Line, {clauses, Cs}}; + false -> + {name, Name} = erlang:fun_info(Fun, name), + {arity, Arity} = erlang:fun_info(Fun, arity), + case erlang:fun_info(Fun, type) of + {type, external} -> + {module, Module} = erlang:fun_info(Fun, module), + {'fun', Line, {function,Module,Name,Arity}}; + {type, local} -> + {'fun', Line, {function,Name,Arity}} + end + end; +abstr_term(PPR, Line) when is_pid(PPR); is_port(PPR); is_reference(PPR) -> + {special, Line, lists:flatten(io_lib:write(PPR))}; +abstr_term(Simple, Line) -> + erl_parse:abstract(Simple, Line). + +abstr_list([H | T], Line) -> + {cons, Line, abstr_term(H, Line), abstr_list(T, Line)}; +abstr_list(T, Line) -> + abstr_term(T, Line). + +%% Since generator pattern variables cannot be used in list +%% expressions, it is OK to flatten out QLCs using temporary +%% variables. +flatten_abstr(?QLC_Q(L1, L2, L3, L4, LC0, Os), VN0, Vars, Body0) -> + {lc,L,E,Qs0} = LC0, + F = fun({generate,Ln,P,LE0}, {VN1,Body1}) -> + {VN2,Body2,LE} = flatten_abstr(LE0, VN1, Vars, Body1), + {{generate,Ln,P,LE}, {VN2,Body2}}; + (Fil, VN_Body) -> + {Fil, VN_Body} + end, + {Qs, {VN3,Body}} = lists:mapfoldl(F, {VN0,Body0}, Qs0), + LC = {lc,L,E,Qs}, + {V, VN} = aux_name1('V', VN3, Vars), + Var = {var, L1, V}, + QLC = ?QLC_Q(L1, L2, L3, L4, LC, Os), + {VN + 1, [{match, L1, Var, QLC} | Body], Var}; +flatten_abstr(T0, VN0, Vars, Body0) when is_tuple(T0) -> + {VN, Body, L} = flatten_abstr(tuple_to_list(T0), VN0, Vars, Body0), + {VN, Body, list_to_tuple(L)}; +flatten_abstr([E0 | Es0], VN0, Vars, Body0) -> + {VN1, Body1, E} = flatten_abstr(E0, VN0, Vars, Body0), + {VN, Body, Es} = flatten_abstr(Es0, VN1, Vars, Body1), + {VN, Body, [E | Es]}; +flatten_abstr(E, VN, _Vars, Body) -> + {VN, Body, E}. + +abstract_vars(Abstract) -> + sets:from_list(ordsets:to_list(vars(Abstract))). + +collect([]=L) -> + L; +collect([Answer | Cont]) -> + [Answer | collect(Cont)]; +collect(Cont) -> + case Cont() of + Answers when is_list(Answers) -> + collect(Answers); + Term -> + throw_error(Term) + end. + +fold_loop(Fun, [Obj | Cont], Acc) -> + fold_loop(Fun, Cont, Fun(Obj, Acc)); +fold_loop(_Fun, [], Acc) -> + Acc; +fold_loop(Fun, Cont, Acc) -> + case Cont() of + Objects when is_list(Objects) -> + fold_loop(Fun, Objects, Acc); + Term -> + Term + end. + +next_loop(Pid, L, N) when N =/= 0 -> + case monitor_request(Pid, more) of + no_more -> + lists:reverse(L); + {answer, Answer} -> + next_loop(Pid, [Answer | L], N - 1); + {caught, throw, Error, [?THROWN_ERROR | _]} -> + Error; + {caught, Class, Reason, Stacktrace} -> + _ = (catch erlang:error(foo)), + erlang:raise(Class, Reason, Stacktrace ++ erlang:get_stacktrace()); + error -> + erlang:error({qlc_cursor_pid_no_longer_exists, Pid}) + end; +next_loop(_Pid, L, _N) -> + lists:reverse(L). + +stop_cursor(Pid) -> + erlang:monitor(process, Pid), + unlink(Pid), + receive + {'EXIT',Pid,_Reason} -> % Simply ignore the error. + receive + {'DOWN',_,process,Pid,_} -> ok + end + after 0 -> + Pid ! {self(),stop}, + receive + {'DOWN',_,process,Pid,_} -> ok + end + end. + +monitor_request(Pid, Req) -> + Ref = erlang:monitor(process, Pid), + Pid ! {self(), Req}, + receive + {'DOWN', Ref, process, Pid, _Info} -> + receive + {'EXIT', Pid, _Reason} -> ok + after 1 -> ok end, + error; + {'EXIT', Pid, _Reason} -> + receive + {'DOWN', _, process, Pid, _} -> error + end; + {Pid, Reply} -> + erlang:demonitor(Ref, [flush]), + Reply + end. + +%% Marker for skipped filter or unused generator. +-define(SKIP, (-1)). + +%% Qual = {gen, LE} | fil +-define(qual_data(QNum, GoToIndex, State, Qual), + {QNum, GoToIndex, State, Qual}). + +-record(join, % generated by qlc_pt + {op, q1, q2, wh1, wh2, cs_fun}). % op is unused + +%% le_info/1 returns an intermediate information format only used for +%% testing purposes. Changes will happen without notice. +%% +%% QueryDesc = {qlc, TemplateDesc, [QualDesc], [QueryOpt]} +%% | {table, TableDesc} +%% | {append, [QueryDesc]} +%% | {sort, QueryDesc, [SortOption]} +%% | {keysort, KeyPos, QueryDesc, [SortOption]} +%% | {list, list()} +%% | {list, QueryDesc, MatchExpression} +%% TableDesc = {Mod, Fun, Args} +%% | AbstractExpression +%% | character_list() +%% Mod = module() +%% Fun = atom() +%% Args = [term()] +%% QualDesc = FilterDesc +%% | {generate, PatternDesc, QueryDesc} +%% QueryOpt = {cache, bool()} | cache +%% | {unique, bool()} | unique +%% FilterDesc = PatternDesc = TemplateDesc = binary() + +le_info(#prepared{qh = #simple_qlc{le = LE, p = P, line = L, optz = Optz}}, + InfOpt) -> + QVar = term_to_binary({var, L, P}), + {qlc, QVar, [{generate, QVar, le_info(LE, InfOpt)}], opt_info(Optz)}; +le_info(#prepared{qh = #qlc{codef = CodeF, qdata = Qdata, optz = Optz}}, + InfOpt) -> + Code = CodeF(), + TemplateState = template_state(), + E = element(TemplateState, Code), + QualInfo0 = qual_info(Qdata, Code, InfOpt), + QualInfo1 = case Optz#optz.fast_join of + #qlc_join{} = Join -> + join_info(Join, QualInfo0, Qdata, Code); + no -> + QualInfo0 + end, + QualInfo = [I || I <- QualInfo1, I =/= skip], + {qlc, E, QualInfo, opt_info(Optz)}; +le_info(#prepared{qh = #qlc_table{format_fun = FormatFun, trav_MS = TravMS, + ms = MS, lu_vals = LuVals}}, InfOpt) -> + {NElements, Depth} = InfOpt, + %% The 'depth' option applies to match specifications as well. + %% This is for limiting imported variables (parameters). + DepthFun = depth_fun(Depth), + case LuVals of + _ when FormatFun =:= undefined -> + {table, {'$MOD', '$FUN', []}}; + {Pos, Vals} -> + Formated = try FormatFun({lookup, Pos, Vals, NElements, DepthFun}) + catch _:_ -> FormatFun({lookup, Pos, Vals}) + end, + if + MS =:= no_match_spec -> + {table, Formated}; + true -> + {list, {table, Formated}, depth(MS, Depth)} + end; + _ when TravMS, is_list(MS) -> + {table, FormatFun({match_spec, depth(MS, Depth)})}; + _ when MS =:= no_match_spec -> + try {table, FormatFun({all, NElements, DepthFun})} + catch _:_ -> {table, FormatFun(all)} + end + end; +le_info(#prepared{qh = #qlc_append{hl = HL}}, InfOpt) -> + {append, [le_info(H, InfOpt) || H <- HL]}; +le_info(#prepared{qh = #qlc_sort{h = H, keypos = sort, + fs_opts = SortOptions0, tmpdir = TmpDir}}, + InfOpt) -> + SortOptions = sort_options_global_tmp(SortOptions0, TmpDir), + {sort, le_info(H, InfOpt), SortOptions}; +le_info(#prepared{qh = #qlc_sort{h = H, keypos = {keysort, Kp}, + fs_opts = SortOptions0, tmpdir = TmpDir}}, + InfOpt) -> + SortOptions = sort_options_global_tmp(SortOptions0, TmpDir), + {keysort, le_info(H, InfOpt), Kp, SortOptions}; +le_info(#prepared{qh = #qlc_list{l = L, ms = no_match_spec}}, _InfOpt) -> + {list, L}; +le_info(#prepared{qh = #qlc_list{l = L, ms = MS}},_InfOpt) when is_list(L) -> + {list, {list, L}, MS}; +le_info(#prepared{qh = #qlc_list{l = L, ms = MS}}, InfOpt) -> + {list, le_info(L, InfOpt), MS}. + +qual_info([?qual_data(_QNum, _GoI, ?SKIP, fil) | Qdata], Code, InfOpt) -> + %% see skip_lookup_filters() + [skip | qual_info(Qdata, Code, InfOpt)]; +qual_info([?qual_data(QNum, _GoI, _SI, fil) | Qdata], Code, InfOpt) -> + [element(QNum + 1, Code) | qual_info(Qdata, Code, InfOpt)]; +qual_info([?qual_data(_QNum, _GoI, _SI, {gen,#join{}}) | Qdata], + Code, InfOpt) -> + [skip | qual_info(Qdata, Code, InfOpt)]; +qual_info([?qual_data(QNum, _GoI, _SI, {gen,LE}) | Qdata], Code, InfOpt) -> + [{generate,element(QNum + 1, Code),le_info(LE, InfOpt)} | + qual_info(Qdata, Code, InfOpt)]; +qual_info([], _Code, _InfOpt) -> + []. + +join_info(Join, QInfo, Qdata, Code) -> + #qlc_join{kind = Kind, q1 = QNum1a, c1 = C1, q2 = QNum2a, c2 = C2, + opt = Opt} = Join, + {?qual_data(JQNum,_,_,_), Rev, QNum1, QNum2, _WH1, _WH2, CsFun} = + find_join_data(Qdata, QNum1a, QNum2a), + {Cs1_0, Cs2_0, Compat} = CsFun(), + [Cs1, Cs2] = case Compat of + [] -> % --R12B-5 + [[{C,[{V,'=:='} || V <- Vs]} || {C,Vs} <- CVs] || + CVs <- [Cs1_0, Cs2_0]]; + _ -> % 'v1', R13A -- + %% Only compared constants (==). + [Cs1_0, Cs2_0] + end, + L = 0, + G1_0 = {var,L,'G1'}, G2_0 = {var,L,'G2'}, + JP = element(JQNum + 1, Code), + %% Create code for wh1 and wh2 in #join{}: + {{I1,G1}, {I2,G2}, QInfoL} = + case Kind of + {merge, _} -> + {JG1,QInfo1} = join_merge_info(QNum1, QInfo, Code, G1_0, Cs1), + {JG2,QInfo2} = join_merge_info(QNum2, QInfo, Code, G2_0, Cs2), + {JG1, JG2, QInfo1 ++ QInfo2}; + _ when Rev -> + {JG2,QInfo2} = join_merge_info(QNum2, QInfo, Code, G2_0, Cs2), + {J1, QInfo1} = join_lookup_info(QNum1, QInfo, G1_0), + {{J1,G1_0}, JG2, QInfo2 ++ [QInfo1]}; + _ -> + {JG1,QInfo1} = join_merge_info(QNum1, QInfo, Code, G1_0, Cs1), + {J2, QInfo2} = join_lookup_info(QNum2, QInfo, G2_0), + {JG1, {J2,G2_0}, QInfo1 ++ [QInfo2]} + end, + {JOptVal, JOp} = kind2op(Kind), + JOpt = [{join, JOptVal}] ++ opt_info(join_unique_cache(Opt)), + JFil = term_to_binary({op,L,JOp, + {call,L,{atom,L,element},[{integer,L,C1},G1]}, + {call,L,{atom,L,element},[{integer,L,C2},G2]}}), + P = term_to_binary({cons, L, G1, G2}), + JInfo = {generate, JP, {qlc, P, QInfoL ++ [JFil], JOpt}}, + {Before, [I1 | After]} = lists:split(QNum1 - 1, QInfo), + Before ++ [JInfo] ++ lists:delete(I2, After). + +kind2op({merge, _KE}) -> {merge, '=='}; +kind2op({lookup, KE, _LU_fun}) -> {lookup, KE}. + +%% qlc:q(P0 || P0 = Pattern <- H1, ConstFilters), +%% where "P0" is a fresh variable and ConstFilters are filters that +%% test constant values of pattern columns. +join_merge_info(QNum, QInfo, Code, G, ExtraConstants) -> + {generate, _, LEInfo}=I = lists:nth(QNum, QInfo), + P = binary_to_term(element(QNum + 1, Code)), + case {P, ExtraConstants} of + {{var, _, _}, []} -> + %% No need to introduce a QLC. + TP = term_to_binary(G), + I2 = {generate, TP, LEInfo}, + {{I,G}, [I2]}; + _ -> + {EPV, M} = + case P of + {var, _, _} -> + %% No need to introduce a pattern variable. + {P, P}; + _ -> + {PV, _} = aux_name1('P', 0, abstract_vars(P)), + L = 0, + V = {var, L, PV}, + {V, {match, L, V, P}} + end, + DQP = term_to_binary(EPV), + LEI = {generate, term_to_binary(M), LEInfo}, + TP = term_to_binary(G), + CFs = [begin + Call = {call,0,{atom,0,element},[{integer,0,Col},EPV]}, + F = list2op([{op,0,Op,abstract_term(Con),Call} + || {Con,Op} <- ConstOps], 'or'), + term_to_binary(F) + end || + {Col,ConstOps} <- ExtraConstants], + {{I,G}, [{generate, TP, {qlc, DQP, [LEI | CFs], []}}]} + end. + +list2op([E], _Op) -> + E; +list2op([E | Es], Op) -> + {op,0,Op,E,list2op(Es, Op)}. + +join_lookup_info(QNum, QInfo, G) -> + {generate, _, LEInfo}=I = lists:nth(QNum, QInfo), + TP = term_to_binary(G), + {I, {generate, TP, LEInfo}}. + +opt_info(#optz{unique = Unique, cache = Cache0, join_option = JoinOption}) -> + %% No 'nested_loop' options are added here, even if there are + %% nested loops to carry out, unless a 'nested_loop' was given as + %% option. The reason is that the qlc module does not know about + %% all instances of nested loops. + Cache = if + Cache0 -> ets; + true -> Cache0 + end, + [{T,V} || {T,V} <- [{cache,Cache},{unique,Unique}], + V =/= default_option(T)] ++ + [{T,V} || {T,V} <- [{join,JoinOption}], V =:= nested_loop]. + +prepare_qlc(H, InitialValue, GUnique, GCache, TmpDir, MaxList, TmpUsage) -> + GOpt = #qlc_opt{unique = GUnique, cache = GCache, + tmpdir = TmpDir, max_list = MaxList, + tmpdir_usage = TmpUsage}, + case opt_le(prep_le(H, GOpt), 1) of + #prepared{qh = #qlc{} = QLC}=Prep -> + Prep#prepared{qh = QLC#qlc{init_value = InitialValue}}; + #prepared{qh = #simple_qlc{}=SimpleQLC}=Prep -> + Prep#prepared{qh = SimpleQLC#simple_qlc{init_value = InitialValue}}; + Prep -> + Prep + end. + +%%% The options given to append, q and table (unique and cache) as well +%%% as the type of expression (list, table, append, qlc...) are +%%% analyzed by prep_le. The results are is_unique_objects and +%%% is_cached. It is checked that the evaluation (in the Erlang sense) +%%% of list expressions yields qlc handles. + +prep_le(#qlc_lc{lc = LC_fun, opt = #qlc_opt{} = Opt0}=H, GOpt) -> + #qlc_opt{unique = GUnique, cache = GCache, + tmpdir = TmpDir, max_list = MaxList, + tmpdir_usage = TmpUsage} = GOpt, + Unique = Opt0#qlc_opt.unique or GUnique, + Cache = if + not GCache -> Opt0#qlc_opt.cache; + true -> GCache + end, + Opt = Opt0#qlc_opt{unique = Unique, cache = Cache, + tmpdir = TmpDir, max_list = MaxList, + tmpdir_usage = TmpUsage}, + prep_qlc_lc(LC_fun(), Opt, GOpt, H); +prep_le(#qlc_table{info_fun = IF}=T, GOpt) -> + {SortInfo, Sorted} = table_sort_info(T), + IsUnique = grd(IF, is_unique_objects), + Prep = #prepared{qh = T, sort_info = SortInfo, sorted = Sorted, + is_unique_objects = IsUnique}, + Opt = if + IsUnique or not GOpt#qlc_opt.unique, + T#qlc_table.ms =:= no_match_spec -> + GOpt#qlc_opt{cache = false}; + true -> + GOpt + end, + may_create_simple(Opt, Prep); +prep_le(#qlc_append{hl = HL}, GOpt) -> + case lists:flatmap(fun(#prepared{qh = #qlc_list{l = []}}) -> []; + (#prepared{qh = #qlc_append{hl = HL1}}) -> HL1; + (H) -> [H] end, + [prep_le(H, GOpt) || H <- HL]) of + []=Nil -> + short_list(Nil); + [Prep] -> + Prep; + PrepL -> + Cache = lists:all(fun(#prepared{is_cached = IsC}) -> IsC =/= false + end, PrepL), + %% The handles in hl are replaced by prepared handles: + Prep = #prepared{qh = #qlc_append{hl = PrepL}, is_cached = Cache}, + may_create_simple(GOpt, Prep) + end; +prep_le(#qlc_sort{h = H0}=Q0, GOpt) -> + %% The handle h is replaced by a prepared handle: + Q = Q0#qlc_sort{h = prep_le(H0, GOpt)}, + prep_sort(Q, GOpt); +prep_le([_, _ | _]=L, GOpt) -> + Prep = #prepared{qh = #qlc_list{l = L}, is_cached = true}, + Opt = if + not GOpt#qlc_opt.unique -> + GOpt#qlc_opt{cache = false}; + true -> GOpt + end, + may_create_simple(Opt, Prep); +prep_le(L, _GOpt) when is_list(L) -> + short_list(L); +prep_le(T, _GOpt) -> + erlang:error({unsupported_qlc_handle, #qlc_handle{h = T}}). + +eval_le(LE_fun, GOpt) -> + case LE_fun() of + {error, ?MODULE, _} = Error -> + throw_error(Error); + R -> + case get_handle(R) of + badarg -> + erlang:error(badarg, [R]); + H -> + prep_le(H, GOpt) + end + end. + +prep_qlc_lc({simple_v1, PVar, LE_fun, L}, Opt, GOpt, _H) -> + check_lookup_option(Opt, false), + prep_simple_qlc(PVar, L, eval_le(LE_fun, GOpt), Opt); +prep_qlc_lc({qlc_v1, QFun, CodeF, Qdata0, QOpt}, Opt, GOpt, _H) -> + F = fun(?qual_data(_QNum, _GoI, _SI, fil)=QualData, ModGens) -> + {QualData, ModGens}; + (?qual_data(_QNum, _GoI, _SI, {gen, #join{}})=QualData, ModGens) -> + {QualData, ModGens}; + (?qual_data(QNum, GoI, SI, {gen, LE_fun}), ModGens0) -> + Prep1 = eval_le(LE_fun, GOpt), + {Prep, ModGens} = + prep_generator(QNum, Prep1, QOpt, Opt, ModGens0), + {?qual_data(QNum, GoI, SI, {gen, Prep}), ModGens} + end, + {Qdata, ModGens} = lists:mapfoldl(F, [], Qdata0), + SomeLookUp = lists:keymember(true, 2, ModGens) =/= false, + check_lookup_option(Opt, SomeLookUp), + case ModGens of + [{_QNum, _LookUp, all, OnePrep}] -> + check_join_option(Opt), + OnePrep; + _ -> + Prep0 = prep_qlc(QFun, CodeF, Qdata, QOpt, Opt), + LU_SkipQuals = + lists:flatmap(fun({QNum,_LookUp,Fs,_Prep}) -> [{QNum,Fs}] + end, ModGens), + Prep1 = Prep0#prepared{lu_skip_quals = LU_SkipQuals}, + prep_join(Prep1, QOpt, Opt) + end; +prep_qlc_lc(_, _Opt, _GOpt, H) -> + erlang:error({unsupported_qlc_handle, #qlc_handle{h = H}}). + +prep_generator(QNum, Prep0, QOpt, Opt, ModGens) -> + PosFun = fun(KeyEquality) -> pos_fun(KeyEquality, QOpt, QNum) end, + MSFs = case match_specs(QOpt, QNum) of + undefined -> + {no_match_spec, []}; + {_, _}=MSFs0 -> + MSFs0 + end, + #prepared{qh = LE} = Prep0, + case prep_gen(LE, Prep0, PosFun, MSFs, Opt) of + {replace, Fs, LookUp, Prep} -> + {Prep, [{QNum,LookUp,Fs,Prep} | ModGens]}; + {skip, SkipFils, LookUp, Prep} -> + {Prep, [{QNum,LookUp,SkipFils,Prep} | ModGens]}; + {no, _Fs, _LookUp, Prep} -> + {Prep, ModGens} + end. + +pos_fun(undefined, QOpt, QNum) -> + {'=:=', constants(QOpt, QNum)}; %% --R12B-5 +pos_fun('=:=', QOpt, QNum) -> + {'=:=', constants(QOpt, QNum)}; +pos_fun('==', QOpt, QNum) -> + try {'==', equal_constants(QOpt, QNum)} % R13A-- + catch _:_ -> {'=:=', constants(QOpt, QNum)} + end. + +prep_gen(#qlc_table{lu_vals = LuV0, ms = MS0, trav_MS = TravMS, + info_fun = IF, lookup_fun = LU_fun, + key_equality = KeyEquality}=LE0, + Prep0, PosFun0, {MS, Fs}, Opt) -> + PosFun = PosFun0(KeyEquality), + {LuV, {STag,SkipFils}} = find_const_positions(IF, LU_fun, PosFun, Opt), + LU = LuV =/= false, + if + LuV0 =/= undefined; MS0 =/= no_match_spec -> + {no, [], false, Prep0}; + MS =/= no_match_spec, LU -> + MS1 = if + Fs =:= SkipFils; STag =:= Fs -> + %% The guard of the match specification + %% is covered by the lookup. + case MS of + [{'$1',_Guard,['$1']}] -> % no transformation + no_match_spec; + [{Head,_Guard,Body}] -> + [{Head,[],Body}] % true guard + end; + true -> + MS + end, + Prep = Prep0#prepared{qh = LE0#qlc_table{lu_vals = LuV,ms = MS1}}, + {replace, Fs, LU, Prep}; + LU -> + Prep = Prep0#prepared{qh = LE0#qlc_table{lu_vals = LuV}}, + {skip, SkipFils, LU, Prep}; + TravMS, MS =/= no_match_spec -> + Prep = Prep0#prepared{qh = LE0#qlc_table{ms = MS}, + is_unique_objects = false}, + {replace, Fs, false, may_create_simple(Opt, Prep)}; + true -> + {no, [], false, Prep0} + end; +prep_gen(#qlc_list{l = []}, Prep0, _PosFun, {_MS, Fs}, _Opt) -> + %% unique and cached + {replace, Fs, false, Prep0}; +prep_gen(#qlc_list{ms = no_match_spec}=LE0, Prep0, _PosFun, {MS, Fs}, Opt) + when MS =/= no_match_spec -> + Prep = Prep0#prepared{qh = LE0#qlc_list{ms = MS}, + is_cached = false}, + {replace, Fs, false, may_create_simple(Opt, Prep)}; +prep_gen(#qlc_list{}, Prep0, _PosFun, {MS, Fs}, Opt) + when MS =/= no_match_spec -> + ListMS = #qlc_list{l = Prep0, ms = MS}, + LE = #prepared{qh = ListMS, is_cached = false}, + {replace, Fs, false, may_create_simple(Opt, LE)}; +prep_gen(_LE0, Prep0, _PosFun, _MSFs, _Opt) -> + {no, [], false, Prep0}. + +-define(SIMPLE_QVAR, 'SQV'). + +may_create_simple(#qlc_opt{unique = Unique, cache = Cache} = Opt, + #prepared{is_cached = IsCached, + is_unique_objects = IsUnique} = Prep) -> + if + Unique and not IsUnique; + (Cache =/= false) and not IsCached -> + prep_simple_qlc(?SIMPLE_QVAR, 1, Prep, Opt); + true -> + Prep + end. + +prep_simple_qlc(PVar, Line, LE, Opt) -> + check_join_option(Opt), + #prepared{is_cached = IsCached, + sort_info = SortInfo, sorted = Sorted, + is_unique_objects = IsUnique} = LE, + #qlc_opt{unique = Unique, cache = Cache} = Opt, + Cachez = if + Unique -> Cache; + not IsCached -> Cache; + true -> false + end, + Optz = #optz{unique = Unique and not IsUnique, + cache = Cachez, opt = Opt}, + QLC = #simple_qlc{p = PVar, le = LE, line = Line, + init_value = not_a_list, optz = Optz}, + %% LE#prepared.join is not copied + #prepared{qh = QLC, is_unique_objects = IsUnique or Unique, + sort_info = SortInfo, sorted = Sorted, + is_cached = IsCached or (Cachez =/= false)}. + +prep_sort(#qlc_sort{h = #prepared{sorted = yes}=Prep}, _GOpt) -> + Prep; +prep_sort(#qlc_sort{h = #prepared{is_unique_objects = IsUniqueObjs}}=Q, + GOpt) -> + S1 = sort_unique(IsUniqueObjs, Q), + S2 = sort_tmpdir(S1, GOpt), + S = S2#qlc_sort{tmpdir_usage = GOpt#qlc_opt.tmpdir_usage}, + {SortInfo, Sorted} = sort_sort_info(S), + #prepared{qh = S, is_cached = true, sort_info = SortInfo, + sorted = Sorted, + is_unique_objects = S#qlc_sort.unique or IsUniqueObjs}. + +prep_qlc(QFun, CodeF, Qdata0, QOpt, Opt) -> + #qlc_opt{unique = Unique, cache = Cache, join = Join} = Opt, + Optz = #optz{unique = Unique, cache = Cache, + join_option = Join, opt = Opt}, + {Qdata, SortInfo} = qlc_sort_info(Qdata0, QOpt), + QLC = #qlc{lcf = QFun, codef = CodeF, qdata = Qdata, + init_value = not_a_list, optz = Optz}, + #prepared{qh = QLC, sort_info = SortInfo, + is_unique_objects = Unique, + is_cached = Cache =/= false}. + +%% 'sorted', 'sorted_info', and 'sorted_info2' are used to avoid +%% sorting on a key when there is no need to sort on the key. 'sorted' +%% is set by qlc:sort() only; its purpose is to assure that if columns +%% 1 to i are constant, then column i+1 is key-sorted (always true if +%% the tuples are sorted). Note: the implementation is (too?) simple. +%% For instance, each column is annotated with 'ascending' or +%% 'descending' (not yet). More exact would be, as examples, 'always +%% ascending' and 'ascending if all preceding columns are constant'. +%% +%% The 'size' of the template is not used (size_of_qualifier(QOpt, 0)). + +qlc_sort_info(Qdata0, QOpt) -> + F = fun(?qual_data(_QNum, _GoI, _SI, fil)=Qd, Info) -> + {Qd, Info}; + (?qual_data(_QNum, _GoI, _SI, {gen, #join{}})=Qd, Info) -> + {Qd, Info}; + (?qual_data(QNum, GoI, SI, {gen, PrepLE0}), Info) -> + PrepLE = sort_info(PrepLE0, QNum, QOpt), + Qd = ?qual_data(QNum, GoI, SI, {gen, PrepLE}), + I = [{{Column,Order}, [{traverse,QNum,C}]} || + {{C,Order},What} <- PrepLE#prepared.sort_info2, + What =:= [], % Something else later... + Column <- equal_template_columns(QOpt, {QNum,C})], + {Qd, [I | Info]} + end, + {Qdata, SortInfoL} = lists:mapfoldl(F, [], Qdata0), + SortInfo0 = [{{Pos,Ord}, [template]} || + Pos <- constant_columns(QOpt, 0), + Ord <- orders(yes)] + ++ lists:append(SortInfoL), + SortInfo = family_union(SortInfo0), + {Qdata, SortInfo}. + +sort_info(#prepared{sort_info = SI, sorted = S} = Prep, QNum, QOpt) -> + SI1 = [{{C,Ord},[]} || + S =/= no, + is_integer(Sz = size_of_qualifier(QOpt, QNum)), + Sz > 0, % the size of the pattern + (NConstCols = size_of_constant_prefix(QOpt, QNum)) < Sz, + C <- [NConstCols+1], + Ord <- orders(S)] + ++ [{{Pos,Ord},[]} || Pos <- constant_columns(QOpt, QNum), + Ord <- orders(yes)] + ++ [{PosOrd,[]} || {PosOrd,_} <- SI], + SI2 = lists:usort(SI1), + Prep#prepared{sort_info2 = SI2}. + +%orders(descending=O) -> +% [O]; +orders(ascending=O) -> + [O]; +orders(yes) -> + [ascending +% ,descending + ]. + +sort_unique(true, #qlc_sort{fs_opts = SortOptions, keypos = sort}=Sort) -> + Sort#qlc_sort{unique = false, + fs_opts = + lists:keydelete(unique, 1, + lists:delete(unique, SortOptions))}; +sort_unique(_, Sort) -> + Sort. + +sort_tmpdir(S, #qlc_opt{tmpdir = ""}) -> + S; +sort_tmpdir(S, Opt) -> + S#qlc_sort{tmpdir = Opt#qlc_opt.tmpdir}. + +short_list(L) -> + %% length(L) < 2: all elements are known be equal + #prepared{qh = #qlc_list{l = L}, sorted = yes, is_unique_objects = true, + is_cached = true}. + +find_const_positions(IF, LU_fun, {KeyEquality, PosFun}, + #qlc_opt{max_lookup = Max, lookup = Lookup}) + when is_function(LU_fun), is_function(PosFun), is_function(IF), + Lookup =/= false -> + case call(IF, keypos, undefined, []) of + undefined -> + Indices = call(IF, indices, undefined, []), + find_const_position_idx(Indices, KeyEquality, PosFun, Max, []); + KeyPos -> + case pos_vals(KeyPos, KeyEquality, PosFun(KeyPos), Max) of + false -> + find_const_position_idx(IF(indices), KeyEquality, + PosFun, Max, []); + PosValuesSkip -> + PosValuesSkip + end + end; +find_const_positions(_IF, _LU_fun, _KE_PosFun, _Opt0) -> + {false, {some,[]}}. + +find_const_position_idx([I | Is], KeyEquality, PosFun, Max, L0) -> + case pos_vals(I, KeyEquality, PosFun(I), Max) of + false -> + find_const_position_idx(Is, KeyEquality, PosFun, Max, L0); + {{_Pos, Values}, _SkipFils}=PosValuesFils -> + L = [{length(Values), PosValuesFils} | L0], + find_const_position_idx(Is, KeyEquality, PosFun, Max, L) + end; +find_const_position_idx(_, _KeyEquality, _PosFun, _Max, []) -> + {false, {some,[]}}; +find_const_position_idx(_, _KeyEquality, _PosFun, _Max, L) -> + [{_,PVF} | _] = lists:sort(L), + PVF. + +pos_vals(Pos, '==', {usort_needed, Values, SkipFils}, Max) -> + pos_vals_max(Pos, lists:usort(Values), SkipFils, Max); +pos_vals(Pos, '=:=', {usort_needed, Values, SkipFils}, Max) -> + pos_vals_max(Pos, lists:sort(nub(Values)), SkipFils, Max); +pos_vals(Pos, _KeyEquality, {values, Values, SkipFils}, Max) -> + pos_vals_max(Pos, Values, SkipFils, Max); +pos_vals(_Pos, _KeyEquality, _T, _Max) -> + false. + +nub([]) -> + []; +nub([E | L]) -> + case lists:member(E, Es=nub(L)) of + true -> + Es; + false -> + [E | Es] + end. + +%% length(Values) >= 1 +pos_vals_max(Pos, Values, Skip, Max) when Max =:= -1; Max >= length(Values) -> + {{Pos, Values}, Skip}; +pos_vals_max(_Pos, _Value, _Skip, _Max) -> + false. + +prep_join(Prep, QOpt, Opt) -> + case join_opt(QOpt) of + undefined -> + check_join_option(Opt), + Prep; + EqualMatch -> + {Ix, M} = case EqualMatch of + {NEqual, NMatch} -> + pref_join(NEqual, NMatch, Prep, QOpt, Opt); + EM -> + pref_join(EM, EM, Prep, QOpt, Opt) + end, + SI = family_union(Prep#prepared.sort_info ++ M), + Prep#prepared{join = {Ix, M}, sort_info = SI} + end. + +%% The parse transform ensures that only two tables are involved. +pref_join(Equal, Match, Prep, QOpt, #qlc_opt{join = JoinOpt}) -> + JQs = [{KeyEquality, QCs} || + {KeyEquality, QCsL} <- [{'==',Equal}, {'=:=',Match}], + QCs <- QCsL], + IxL = [pref_lookup_join(KE, QCs, Prep, QOpt) || + JoinOpt =:= any orelse JoinOpt =:= lookup, + {KE, QCs} <- JQs], + ML = [pref_merge_join(KE, QCs, Prep, QOpt) || + JoinOpt =:= any orelse JoinOpt =:= merge, + {KE, QCs} <- JQs], + {lists:usort(lists:append(IxL)), lists:usort(lists:append(ML))}. + +pref_lookup_join(KeyEquality, {[{Q1,C1},{Q2,C2}],Skip}, Prep, QOpt) + when is_integer(C1), is_integer(C2) -> + #prepared{qh = #qlc{qdata = QData}} = Prep, + Is1 = lookup_qual_data(QData, Q1, KeyEquality), + Lu2 = [pref_lookup_join2(Q2, C2, Q1, C1, Skip, QOpt, KeyEquality) || + IC1 <- Is1, IC1 =:= C1], + Is2 = lookup_qual_data(QData, Q2, KeyEquality), + Lu1 = [pref_lookup_join2(Q1, C1, Q2, C2, Skip, QOpt, KeyEquality) || + IC2 <- Is2, IC2 =:= C2], + family(Lu1 ++ Lu2); +pref_lookup_join(KE, [{_,Cs1},{_,Cs2}]=L, Prep, QOpt) when is_list(Cs1), + is_list(Cs2) -> + %% --R12B-5 + lists:append([pref_lookup_join(KE, QC,Prep,QOpt) || + QC <- selections_no_skip(L)]). + +lookup_qual_data(QData, QNum, KeyEquality) -> + case lists:keysearch(QNum, 1, QData) of + {value, ?qual_data(QNum, _, _, {gen, PrepLE})} -> + join_indices(PrepLE, KeyEquality) + end. + +%% If the table has a match specification (ms =/= no_match_spec) that +%% has _not_ been derived from a filter but from a query handle then +%% the lookup join cannot be done. This particular case has not been +%% excluded here but is taken care of in opt_join(). +join_indices(#prepared{qh = #qlc_table{info_fun = IF, + lookup_fun = LU_fun, + key_equality = KeyEquality, + lu_vals = undefined}}, + KE) when is_function(LU_fun), + KE =:= KeyEquality orelse + KE =:= '=:=' andalso + KeyEquality =:= undefined -> % --R12B-5 + KpL = case call(IF, keypos, undefined, []) of + undefined -> []; + Kp -> [Kp] + end, + case call(IF, indices, undefined, []) of + undefined -> KpL; + Is0 -> lists:usort(KpL ++ Is0) + end; +join_indices(_Prep, _KeyEquality) -> + []. + +pref_lookup_join2(Q1, C1, Q2, C2, Skip, QOpt, KeyEquality) -> + TemplCols = compared_template_columns(QOpt, {Q1,C1}, KeyEquality), + {{Q1,C1,Q2,C2},{lookup_join,TemplCols,KeyEquality,Skip}}. + +pref_merge_join(KE, {[{Q1,C1},{Q2,C2}],Skip}, Prep, QOpt) + when is_integer(C1), is_integer(C2) -> + #prepared{qh = #qlc{qdata = QData}} = Prep, + Sort1 = merge_qual_data(QData, Q1), + Sort2 = merge_qual_data(QData, Q2), + Merge = pref_merge(KE, Q1, C1, Q2, C2, Skip, Sort1, Sort2, QOpt), + family_union(Merge); +pref_merge_join(KE, [{_,Cs1},{_,Cs2}]=L, Prep, QOpt) when is_list(Cs1), + is_list(Cs2) -> + %% --R12B-5 + lists:append([pref_merge_join(KE, QC, Prep, QOpt) || + QC <- selections_no_skip(L)]). + +selections_no_skip(L) -> + [{C,{some,[]}} || C <- all_selections(L)]. + +merge_qual_data(QData, QNum) -> + case lists:keysearch(QNum, 1, QData) of + {value, ?qual_data(QNum, _, _, {gen, PrepLE})} -> + #prepared{sort_info2 = SortInfo} = PrepLE, + SortInfo + end. + +pref_merge(KE, Q1, C1, Q2, C2, Skip, Sort1, Sort2, QOpt) -> + Col1 = {Q1,C1}, + Col2 = {Q2,C2}, + DoSort = [QC || {{_QNum,Col}=QC,SortL} <- [{Col1,Sort1}, {Col2,Sort2}], + lists:keymember({Col, ascending}, 1, SortL) =:= false], + J = [{{Q1,C1,Q2,C2}, {merge_join,DoSort,KE,Skip}}], + %% true = (QOpt(template))(Col1, '==') =:= (QOpt(template))(Col2, '==') + [{{Column, ascending}, J} || + Column <- equal_template_columns(QOpt, Col1)] ++ [{other, J}]. + +table_sort_info(#qlc_table{info_fun = IF}) -> + case call(IF, is_sorted_key, undefined, []) of + undefined -> + {[], no}; + false -> + {[], no}; + true -> + case call(IF, keypos, undefined, []) of + undefined -> % strange + {[], no}; + KeyPos -> + {[{{KeyPos,ascending},[]}], no} + end + end. + +sort_sort_info(#qlc_sort{keypos = sort, order = Ord0}) -> + {[], sort_order(Ord0)}; +sort_sort_info(#qlc_sort{keypos = {keysort,Kp0}, order = Ord0}) -> + Kp = case Kp0 of + [Pos | _] -> Pos; + _ -> Kp0 + end, + {[{{Kp,sort_order(Ord0)},[]}], no}. + +sort_order(F) when is_function(F) -> + no; +sort_order(Order) -> + Order. + +check_join_option(#qlc_opt{join = any}) -> + ok; +check_join_option(#qlc_opt{join = Join}) -> + erlang:error(no_join_to_carry_out, [{join,Join}]). + +check_lookup_option(#qlc_opt{lookup = true}, false) -> + erlang:error(no_lookup_to_carry_out, [{lookup,true}]); +check_lookup_option(_QOpt, _LuV) -> + ok. + +compared_template_columns(QOpt, QNumColumn, KeyEquality) -> + (QOpt(template))(QNumColumn, KeyEquality). + +equal_template_columns(QOpt, QNumColumn) -> + (QOpt(template))(QNumColumn, '=='). + +%eq_template_columns(QOpt, QNumColumn) -> +% (QOpt(template))(QNumColumn, '=:='). + +size_of_constant_prefix(QOpt, QNum) -> + (QOpt(n_leading_constant_columns))(QNum). + +constants(QOpt, QNum) -> + (QOpt(constants))(QNum). + +equal_constants(QOpt, QNum) -> + (QOpt(equal_constants))(QNum). + +join_opt(QOpt) -> + QOpt(join). + +match_specs(QOpt, QNum) -> + (QOpt(match_specs))(QNum). + +constant_columns(QOpt, QNum) -> + (QOpt(constant_columns))(QNum). + +size_of_qualifier(QOpt, QNum) -> + (QOpt(size))(QNum). + +%% Two optimizations are carried out: +%% 1. The first generator is never cached if the QLC itself is cached. +%% Since the answers do not need to be cached, the top-most QLC is +%% never cached either. Simple QLCs not holding any options are +%% removed. Simple QLCs are coalesced when possible. +%% 2. Merge join and lookup join is done if possible. + +opt_le(#prepared{qh = #simple_qlc{le = LE0, optz = Optz0}=QLC}=Prep0, + GenNum) -> + case LE0 of + #prepared{qh = #simple_qlc{p = LE_Pvar, le = LE2, optz = Optz2}} -> + %% Coalesce two simple QLCs. + Cachez = case Optz2#optz.cache of + false -> Optz0#optz.cache; + Cache2 -> Cache2 + end, + Optz = Optz0#optz{cache = Cachez, + unique = Optz0#optz.unique or Optz2#optz.unique}, + PVar = if + LE_Pvar =:= ?SIMPLE_QVAR -> QLC#simple_qlc.p; + true -> LE_Pvar + end, + Prep = Prep0#prepared{qh = QLC#simple_qlc{p = PVar, le = LE2, + optz = Optz}}, + opt_le(Prep, GenNum); + _ -> + Optz1 = no_cache_of_first_generator(Optz0, GenNum), + case {opt_le(LE0, 1), Optz1} of + {LE, #optz{unique = false, cache = false}} -> + LE; + {LE, _} -> + Prep0#prepared{qh = QLC#simple_qlc{le = LE, optz = Optz1}} + end + end; +opt_le(#prepared{qh = #qlc{}, lu_skip_quals = LU_SkipQuals0}=Prep0, GenNum) -> + #prepared{qh = #qlc{qdata = Qdata0, optz = Optz0}=QLC} = Prep0, + #optz{join_option = JoinOption, opt = Opt} = Optz0, + JoinOption = Optz0#optz.join_option, + {LU_QNum, Join, JoinSkipFs, DoSort} = + opt_join(Prep0#prepared.join, JoinOption, Qdata0, Opt, LU_SkipQuals0), + {LU_Skip, LU_SkipQuals} = + lists:partition(fun({QNum,_Fs}) -> QNum =:= LU_QNum end, + LU_SkipQuals0), + LU_SkipFs = lists:flatmap(fun({_QNum,Fs}) -> Fs end, LU_SkipQuals), + %% If LU_QNum has a match spec it must be applied _after_ the + %% lookup join (the filter must not be skipped!). + Qdata1 = if + LU_Skip =:= [] -> Qdata0; + true -> activate_join_lookup_filter(LU_QNum, Qdata0) + end, + Qdata2 = skip_lookup_filters(Qdata1, LU_SkipFs ++ JoinSkipFs), + F = fun(?qual_data(QNum, GoI, SI, {gen, #prepared{}=PrepLE}), GenNum1) -> + NewPrepLE = maybe_sort(PrepLE, QNum, DoSort, Opt), + {?qual_data(QNum, GoI, SI, {gen, opt_le(NewPrepLE, GenNum1)}), + GenNum1 + 1}; + (Qd, GenNum1) -> + {Qd, GenNum1} + end, + {Qdata, _} = lists:mapfoldl(F, 1, Qdata2), + Optz1 = no_cache_of_first_generator(Optz0, GenNum), + Optz = Optz1#optz{fast_join = Join}, + Prep0#prepared{qh = QLC#qlc{qdata = Qdata, optz = Optz}}; +opt_le(#prepared{qh = #qlc_append{hl = HL}}=Prep, GenNum) -> + Hs = [opt_le(H, GenNum) || H <- HL], + Prep#prepared{qh = #qlc_append{hl = Hs}}; +opt_le(#prepared{qh = #qlc_sort{h = H}=Sort}=Prep, GenNum) -> + Prep#prepared{qh = Sort#qlc_sort{h = opt_le(H, GenNum)}}; +opt_le(Prep, _GenNum) -> + Prep. + +no_cache_of_first_generator(Optz, GenNum) when GenNum > 1 -> + Optz; +no_cache_of_first_generator(Optz, 1) -> + Optz#optz{cache = false}. + +maybe_sort(LE, QNum, DoSort, Opt) -> + case lists:keysearch(QNum, 1, DoSort) of + {value, {QNum, Col}} -> + #qlc_opt{tmpdir = TmpDir, tmpdir_usage = TmpUsage} = Opt, + SortOpts = [{tmpdir,Dir} || Dir <- [TmpDir], Dir =/= ""], + Sort = #qlc_sort{h = LE, keypos = {keysort, Col}, unique = false, + compressed = [], order = ascending, + fs_opts = SortOpts, tmpdir_usage = TmpUsage, + tmpdir = TmpDir}, + #prepared{qh = Sort, sorted = no, join = no}; + false -> + LE + end. + +skip_lookup_filters(Qdata, []) -> + Qdata; +skip_lookup_filters(Qdata0, LU_SkipFs) -> + [case lists:member(QNum, LU_SkipFs) of + true -> + ?qual_data(QNum, GoI, ?SKIP, fil); + false -> + Qd + end || ?qual_data(QNum, GoI, _, _)=Qd <- Qdata0]. + +%% If the qualifier used for lookup by the join (QNum) has a match +%% specification it must be applied _after_ the lookup join (the +%% filter must not be skipped!). +activate_join_lookup_filter(QNum, Qdata) -> + {value, {_,GoI2,SI2,{gen,Prep2}}} = lists:keysearch(QNum, 1, Qdata), + Table2 = Prep2#prepared.qh, + NPrep2 = Prep2#prepared{qh = Table2#qlc_table{ms = no_match_spec}}, + %% Table2#qlc_table.ms has been reset; the filter will be run. + lists:keyreplace(QNum, 1, Qdata, ?qual_data(QNum,GoI2,SI2,{gen,NPrep2})). + +opt_join(Join, JoinOption, Qdata, Opt, LU_SkipQuals) -> + %% prep_qlc_lc() assures that no unwanted join is carried out + {Ix0, M0} = Join, + Ix1 = opt_join_lu(Ix0, Qdata, LU_SkipQuals), + Ix = lists:reverse(lists:keysort(2, Ix1)), % prefer to skip + case Ix of + [{{Q1,C1,Q2,C2},Skip,KE,LU_fun} | _] -> + J = #qlc_join{kind = {lookup, KE, LU_fun}, q1 = Q1, + c1 = C1, q2 = Q2, c2 = C2, opt = Opt}, + {Q2, J, Skip, []}; + [] -> + M = opt_join_merge(M0), + case M of + [{{Q1,C1,Q2,C2},{merge_join,DoSort,KE,Skip}}|_] -> + J = #qlc_join{kind = {merge, KE}, opt = Opt, + q1 = Q1, c1 = C1, q2 = Q2, c2 = C2}, + {not_a_qnum, J, Skip, DoSort}; + [] when JoinOption =:= nested_loop -> + {not_a_qnum, no, [], []}; + _ when JoinOption =/= any -> + erlang:error(cannot_carry_out_join, [JoinOption]); + _ -> + {not_a_qnum, no, [], []} + end + end. + +opt_join_lu([{{_Q1,_C1,Q2,_C2}=J,[{lookup_join,_KEols,JKE,Skip0} | _]} | LJ], + Qdata, LU_SkipQuals) -> + {value, {Q2,_,_,{gen,Prep2}}} = lists:keysearch(Q2, 1, Qdata), + #qlc_table{ms = MS, key_equality = KE, + lookup_fun = LU_fun} = Prep2#prepared.qh, + %% If there is no filter to skip (the match spec was derived + %% from a query handle) then the lookup join cannot be done. + case + MS =/= no_match_spec andalso + lists:keymember(Q2, 1, LU_SkipQuals) =:= false + of + true -> + opt_join_lu(LJ, Qdata, LU_SkipQuals); + false -> + %% The join is preferred before evaluating the match spec + %% (if there is one). + Skip = skip_if_possible(JKE, KE, Skip0), + [{J,Skip,KE,LU_fun} | opt_join_lu(LJ, Qdata, LU_SkipQuals)] + end; +opt_join_lu([], _Qdata, _LU_SkipQuals) -> + []. + +opt_join_merge(M) -> + %% Prefer not to sort arguments. Prefer to skip join filter. + L = [{-length(DoSort),length(Skip), + {QCs,{merge_join,DoSort,KE,Skip}}} || + {_KpOrder_or_other,MJ} <- M, + {QCs,{merge_join,DoSort,KE,Skip0}} <- MJ, + Skip <- [skip_if_possible(KE, '==', Skip0)]], + lists:reverse([J || {_,_,J} <- lists:sort(L)]). + +%% Cannot skip the join filter the join operator is '=:=' and the join +%% is performed using '=='. Note: the tag 'some'/'all' is not used. +skip_if_possible('=:=', '==', _) -> + []; +skip_if_possible(_, _, {_SkipTag, Skip}) -> + Skip. + +%% -> {Objects, Post, LocalPost} | throw() +%% Post is a list of funs (closures) to run afterwards. +%% LocalPost should be run when all objects have been found (optimization). +%% LocalPost will always be a subset of Post. +%% List expressions are evaluated, resulting in lists of objects kept in +%% RAM or on disk. +%% An error term is thrown as soon as cleanup according Post has been +%% done. (This is opposed to errors during evaluation; such errors are +%% returned as terms.) +setup_qlc(Prep, Setup) -> + Post0 = [], + setup_le(Prep, Post0, Setup). + +setup_le(#prepared{qh = #simple_qlc{le = LE, optz = Optz}}, Post0, Setup) -> + {Objs, Post, LocalPost} = setup_le(LE, Post0, Setup), + unique_cache(Objs, Post, LocalPost, Optz); +setup_le(#prepared{qh = #qlc{lcf = QFun, qdata = Qdata, init_value = V, + optz = Optz}}, Post0, Setup) -> + {GoTo, FirstState, Post, LocalPost} = + setup_quals(Qdata, Post0, Setup, Optz), + Objs = fun() -> QFun(FirstState, V, GoTo) end, + unique_cache(Objs, Post, LocalPost, Optz); +setup_le(#prepared{qh = #qlc_table{post_fun = PostFun}=Table}, Post, Setup) -> + H = table_handle(Table, Post, Setup), + %% The pre fun has been called from table_handle(): + {H, [PostFun | Post], []}; +setup_le(#prepared{qh = #qlc_append{hl = PrepL}}, Post0, Setup) -> + F = fun(Prep, {Post1, LPost1}) -> + {Objs, Post2, LPost2} = setup_le(Prep, Post1, Setup), + {Objs, {Post2, LPost1++LPost2}} + end, + {ObjsL, {Post, LocalPost}} = lists:mapfoldl(F, {Post0,[]}, PrepL), + {fun() -> append_loop(ObjsL, 0) end, Post, LocalPost}; +setup_le(#prepared{qh = #qlc_sort{h = Prep, keypos = Kp, + unique = Unique, compressed = Compressed, + order = Order, fs_opts = SortOptions0, + tmpdir_usage = TmpUsage,tmpdir = TmpDir}}, + Post0, Setup) -> + SortOptions = sort_options_global_tmp(SortOptions0, TmpDir), + LF = fun(Objs) -> + sort_list(Objs, Order, Unique, Kp, SortOptions, Post0) + end, + case setup_le(Prep, Post0, Setup) of + {L, Post, LocalPost} when is_list(L) -> + {LF(L), Post, LocalPost}; + {Objs, Post, LocalPost} -> + FF = fun(Objs1) -> + file_sort_handle(Objs1, Kp, SortOptions, TmpDir, + Compressed, Post, LocalPost) + end, + sort_handle(Objs, LF, FF, SortOptions, Post, LocalPost, + {TmpUsage, sorting}) + end; +setup_le(#prepared{qh = #qlc_list{l = L, ms = MS}}, Post, _Setup) + when (no_match_spec =:= MS); L =:= [] -> + {L, Post, []}; +setup_le(#prepared{qh = #qlc_list{l = L, ms = MS}}, Post, _Setup) + when is_list(L) -> + {ets:match_spec_run(L, ets:match_spec_compile(MS)), Post, []}; +setup_le(#prepared{qh = #qlc_list{l = H0, ms = MS}}, Post0, Setup) -> + {Objs0, Post, LocalPost} = setup_le(H0, Post0, Setup), + Objs = ets:match_spec_run(Objs0, ets:match_spec_compile(MS)), + {Objs, Post, LocalPost}. + +%% The goto table (a tuple) is created at runtime. It is accessed by +%% the generated code in order to find next clause to execute. For +%% generators there is also a fun; calling the fun runs the list +%% expression of the generator. There are two elements for a filter: +%% the first one is the state to go when the filter is false; the +%% other the state when the filter is true. There are three elements +%% for a generator G: the first one is the state of the generator +%% before G (or the stop state if there is no generator); the second +%% one is the state of the qualifier following the generator (or the +%% template if there is no next generator); the third one is the list +%% expression fun. +%% There are also join generators which are "activated" when it is +%% possbible to do a join. + +setup_quals(Qdata, Post0, Setup, Optz) -> + {GoTo0, Post1, LocalPost0} = + setup_quals(0, Qdata, [], Post0, [], Setup), + GoTo1 = lists:keysort(1, GoTo0), + FirstState0 = next_state(Qdata), + {GoTo2, FirstState, Post, LocalPost1} = + case Optz#optz.fast_join of + #qlc_join{kind = {merge,_KE}, c1 = C1, c2 = C2, opt = Opt} = MJ -> + MF = fun(_Rev, {H1, WH1}, {H2, WH2}) -> + fun() -> + merge_join(WH1(H1), C1, WH2(H2), C2, Opt) + end + end, + setup_join(MJ, Qdata, GoTo1, FirstState0, MF, Post1); + #qlc_join{kind = {lookup,_KE,LuF}, c1 = C1, c2 = C2} = LJ -> + LF = fun(Rev, {H1, WH1}, {H2, WH2}) -> + {H, W} = if + Rev -> {H2, WH2}; + true -> {H1, WH1} + end, + fun() -> + lookup_join(W(H), C1, LuF, C2, Rev) + end + end, + setup_join(LJ, Qdata, GoTo1, FirstState0, LF, Post1); + no -> + {flat_goto(GoTo1), FirstState0, Post1, []} + end, + GoTo = list_to_tuple(GoTo2), + {GoTo, FirstState, Post, LocalPost0 ++ LocalPost1}. + +setup_quals(GenLoopS, [?qual_data(_QNum,GoI,?SKIP,fil) | Qdata], + Gs, P, LP, Setup) -> + %% ?SKIP causes runtime error. See also skip_lookup_filters(). + setup_quals(GenLoopS, Qdata, [{GoI,[?SKIP,?SKIP]} | Gs], P, LP, Setup); +setup_quals(GenLoopS, [?qual_data(_QNum,GoI,_SI,fil) | Qdata], + Gs, P, LP, Setup) -> + setup_quals(GenLoopS, Qdata, [{GoI,[GenLoopS,next_state(Qdata)]} | Gs], + P, LP, Setup); +setup_quals(GenLoopS, [?qual_data(_QNum,GoI,_SI, {gen,#join{}}) | Qdata], + Gs, P, LP, Setup) -> + setup_quals(GenLoopS, Qdata, [{GoI,[?SKIP,?SKIP,?SKIP]} | Gs],P,LP,Setup); +setup_quals(GenLoopS, [?qual_data(_QNum,GoI,SI,{gen,LE}) | Qdata], + Gs, P, LP, Setup) -> + {V, NP, LP1} = setup_le(LE, P, Setup), + setup_quals(SI + 1, Qdata, [{GoI, [GenLoopS,next_state(Qdata),V]} | Gs], + NP, LP ++ LP1, Setup); +setup_quals(GenLoopS, [], Gs, P, LP, _Setup) -> + {[{1,[GenLoopS]} | Gs], P, LP}. + +%% Finds the qualifier in Qdata that performs the join between Q1 and +%% Q2, and sets it up using the handles already set up for Q1 and Q2. +%% Removes Q1 and Q2 from GoTo0 and updates the join qualifier in GoTo0. +%% Note: the parse transform has given each generator three slots +%% in the GoTo table. The position of these slots within the GoTo table +%% is fixed (at runtime). +%% (Assumes there is only one join-generator in Qdata.) +setup_join(J, Qdata, GoTo0, FirstState0, JoinFun, Post0) -> + #qlc_join{q1 = QNum1a, q2 = QNum2a, opt = Opt} = J, + {?qual_data(_QN,JGoI,JSI,_), Rev, QNum1, QNum2, WH1, WH2, _CsFun} = + find_join_data(Qdata, QNum1a, QNum2a), + [{GoI1,SI1}] = [{GoI,SI} || + ?qual_data(QNum,GoI,SI,_) <- Qdata, QNum =:= QNum1], + [{GoI2,SI2}] = [{GoI,SI} || + ?qual_data(QNum,GoI,SI,_) <- Qdata, QNum =:= QNum2], + + [H1] = [H || {GoI,[_Back,_Forth,H]} <- GoTo0, GoI =:= GoI1], + [{BackH2,H2}] = + [{Back,H} || {GoI,[Back,_Forth,H]} <- GoTo0, GoI =:= GoI2], + H0 = JoinFun(Rev, {H1,WH1}, {H2,WH2}), + %% The qlc expression options apply to the introduced qlc expr as well. + {H, Post, LocalPost} = + unique_cache(H0, Post0, [], join_unique_cache(Opt)), + [JBack] = [Back || {GoI,[Back,_,_]} <- GoTo0, GoI =:= GoI1], + JForth = next_after(Qdata, SI1, QNum2), + GoTo1 = lists:map(fun({GoI,_}) when GoI =:= JGoI -> + {JGoI, [JBack, JForth, H]}; + ({GoI,_}) when GoI =:= GoI1; GoI =:= GoI2 -> + {GoI, [?SKIP,?SKIP,?SKIP]}; % not necessary + (Go) -> + Go + end, GoTo0), + GoTo = lists:map(fun(S) when S =:= SI1 -> + JSI; + (S) when S =:= SI2 -> + next_after(Qdata, S, QNum2); + (S) when S =:= SI1+1 -> + JSI+1; + (S) when S =:= SI2+1, SI1 + 1 =:= BackH2 -> + JSI+1; + (S) when S =:= SI2+1 -> + BackH2; + (S) -> S + end, flat_goto(GoTo1)), + FirstState = if + SI1 =:= FirstState0 -> JSI; + true -> FirstState0 + end, + {GoTo, FirstState, Post, LocalPost}. + +join_unique_cache(#qlc_opt{cache = Cache, unique = Unique}=Opt) -> + #optz{cache = Cache, unique = Unique, opt = Opt}. + +flat_goto(GoTo) -> + lists:flatmap(fun({_,L}) -> L end, GoTo). + +next_after([?qual_data(_, _, S, _) | Qdata], S, QNum2) -> + case Qdata of + [?qual_data(QNum2, _, _, _) | Qdata1] -> + next_state(Qdata1); + _ -> + next_state(Qdata) + end; +next_after([_ | Qdata], S, QNum2) -> + next_after(Qdata, S, QNum2). + +next_state([?qual_data(_,_,_,{gen,#join{}}) | Qdata]) -> + next_state(Qdata); +next_state([?qual_data(_,_,?SKIP,fil) | Qdata]) -> + %% see skip_lookup_filters() + next_state(Qdata); +next_state([?qual_data(_,_,S,_) | _]) -> + S; +next_state([]) -> + template_state(). + +find_join_data(Qdata, QNum1, QNum2) -> + [QRev] = [{Q,Rev,QN1,QN2,H1,H2,CsF} || + ?qual_data(_QN,_GoI,_SI, + {gen,#join{q1 = QN1,q2 = QN2, + wh1 = H1, wh2 = H2, + cs_fun = CsF}})= Q <- Qdata, + if + QN1 =:= QNum1, QN2 =:= QNum2 -> + not (Rev = false); + QN1 =:= QNum2, QN2 =:= QNum1 -> + Rev = true; + true -> + Rev = false + end], + QRev. + +table_handle(#qlc_table{trav_fun = TraverseFun, trav_MS = TravMS, + pre_fun = PreFun, lookup_fun = LuF, + parent_fun = ParentFun, lu_vals = LuVals, ms = MS}, + Post, Setup) -> + #setup{parent = Parent} = Setup, + ParentValue = + if + ParentFun =:= undefined -> + undefined; + Parent =:= self() -> + try + ParentFun() + catch Class:Reason -> + post_funs(Post), + erlang:raise(Class, Reason, erlang:get_stacktrace()) + end; + true -> + case monitor_request(Parent, {parent_fun, ParentFun}) of + error -> % parent has died + post_funs(Post), + exit(normal); + {value, Value} -> + Value; + {parent_fun_caught, Class, Reason, Stacktrace} -> + %% No use augmenting Stacktrace here. + post_funs(Post), + erlang:raise(Class, Reason, Stacktrace) + end + end, + StopFun = + if + Parent =:= self() -> + undefined; + true -> + Cursor = #qlc_cursor{c = {self(), Parent}}, + fun() -> delete_cursor(Cursor) end + end, + PreFunArgs = [{parent_value, ParentValue}, {stop_fun, StopFun}], + _ = call(PreFun, PreFunArgs, ok, Post), + case LuVals of + {Pos, Vals} when MS =:= no_match_spec -> + LuF(Pos, Vals); + {Pos, Vals} -> + case LuF(Pos, Vals) of + [] -> + []; + Objs when is_list(Objs) -> + ets:match_spec_run(Objs, + ets:match_spec_compile(MS)); + Error -> + post_funs(Post), + throw_error(Error) + end; + _ when not TravMS -> + MS = no_match_spec, % assertion + TraverseFun; + _ when MS =:= no_match_spec -> + fun() -> TraverseFun([{'$1',[],['$1']}]) end; + _ -> + fun() -> TraverseFun(MS) end + end. + +-define(CHUNK_SIZE, 64*1024). + +open_file(FileName, Extra, Post) -> + case file:open(FileName, [read, raw, binary | Extra]) of + {ok, Fd} -> + {fun() -> + case file:position(Fd, bof) of + {ok, 0} -> + TF = fun([], _) -> + []; + (Ts, C) when is_list(Ts) -> + lists:reverse(Ts, C) + end, + file_loop_read(<<>>, ?CHUNK_SIZE, {Fd,FileName}, TF); + Error -> + file_error(FileName, Error) + end + end, Fd}; + Error -> + post_funs(Post), + throw_file_error(FileName, Error) + end. + +file_loop(Bin0, Fd_FName, Ts0, TF) -> + case + try file_loop2(Bin0, Ts0) + catch _:_ -> + {_Fd, FileName} = Fd_FName, + error({bad_object, FileName}) + end + of + {terms, <<Size:4/unit:8, B/bytes>>=Bin, []} -> + file_loop_read(Bin, Size - byte_size(B) + 4, Fd_FName, TF); + {terms, <<Size:4/unit:8, _/bytes>>=Bin, Ts} -> + C = fun() -> file_loop_read(Bin, Size+4, Fd_FName, TF) end, + TF(Ts, C); + {terms, B, Ts} -> + C = fun() -> file_loop_read(B, ?CHUNK_SIZE, Fd_FName, TF) end, + TF(Ts, C); + Error -> + Error + end. + +file_loop2(<<Size:4/unit:8, B:Size/bytes, Bin/bytes>>, Ts) -> + file_loop2(Bin, [binary_to_term(B) | Ts]); +file_loop2(Bin, Ts) -> + {terms, Bin, Ts}. + +%% After power failures (and only then) files with corrupted Size +%% fields have been observed in a disk_log file. If file:read/2 is +%% asked to read a huge amount of data the emulator may crash. Nothing +%% has been done here to prevent such crashes (by inspecting +%% BytesToRead in some way) since temporary files will never be read +%% after a power failure. +file_loop_read(B, MinBytesToRead, {Fd, FileName}=Fd_FName, TF) -> + BytesToRead = erlang:max(?CHUNK_SIZE, MinBytesToRead), + case file:read(Fd, BytesToRead) of + {ok, Bin} when byte_size(B) =:= 0 -> + file_loop(Bin, Fd_FName, [], TF); + {ok, Bin} -> + case B of + <<Size:4/unit:8, Tl/bytes>> + when byte_size(Bin) + byte_size(Tl) >= Size -> + {B1, B2} = split_binary(Bin, Size - byte_size(Tl)), + Foo = fun([T], Fun) -> [T | Fun] end, + %% TF should be applied exactly once. + case + file_loop(list_to_binary([B, B1]), Fd_FName, [], Foo) + of + [T | Fun] -> + true = is_function(Fun), + file_loop(B2, Fd_FName, [T], TF); + Error -> + Error + end; + _ -> + file_loop(list_to_binary([B, Bin]), Fd_FName, [], TF) + end; + eof when byte_size(B) =:= 0 -> + TF([], foo); + eof -> + error({bad_object, FileName}); + Error -> + file_error(FileName, Error) + end. + +sort_cursor_input(H, NoObjects) -> + fun(close) -> + ok; + (read) -> + sort_cursor_input_read(H, NoObjects) + end. + +sort_cursor_list_output(TmpDir, Z, Unique) -> + fun(close) -> + {terms, []}; + ({value, NoObjects}) -> + fun(BTerms) when Unique; length(BTerms) =:= NoObjects -> + fun(close) -> + {terms, BTerms}; + (BTerms1) -> + sort_cursor_file(BTerms ++ BTerms1, TmpDir, Z) + end; + (BTerms) -> + sort_cursor_file(BTerms, TmpDir, Z) + end + end. + +sort_cursor_file(BTerms, TmpDir, Z) -> + FName = tmp_filename(TmpDir), + case file:open(FName, [write, raw, binary | Z]) of + {ok, Fd} -> + WFun = write_terms(FName, Fd), + WFun(BTerms); + Error -> + throw_file_error(FName, Error) + end. + +sort_options_global_tmp(S, "") -> + S; +sort_options_global_tmp(S, TmpDir) -> + [{tmpdir,TmpDir} | lists:keydelete(tmpdir, 1, S)]. + +tmp_filename(TmpDirOpt) -> + U = "_", + Node = node(), + Pid = os:getpid(), + {MSecs,Secs,MySecs} = erlang:now(), + F = lists:concat([?MODULE,U,Node,U,Pid,U,MSecs,U,Secs,U,MySecs]), + TmpDir = case TmpDirOpt of + "" -> + {ok, CurDir} = file:get_cwd(), + CurDir; + TDir -> + TDir + end, + filename:join(filename:absname(TmpDir), F). + +write_terms(FileName, Fd) -> + fun(close) -> + _ = file:close(Fd), + {file, FileName}; + (BTerms) -> + case file:write(Fd, size_bin(BTerms, [])) of + ok -> + write_terms(FileName, Fd); + Error -> + _ = file:close(Fd), + throw_file_error(FileName, Error) + end + end. + +size_bin([], L) -> + L; +size_bin([BinTerm | BinTerms], L) -> + size_bin(BinTerms, [L, <<(byte_size(BinTerm)):4/unit:8>> | BinTerm]). + +sort_cursor_input_read([], NoObjects) -> + {end_of_input, NoObjects}; +sort_cursor_input_read([Object | Cont], NoObjects) -> + {[term_to_binary(Object)], sort_cursor_input(Cont, NoObjects + 1)}; +sort_cursor_input_read(F, NoObjects) -> + case F() of + Objects when is_list(Objects) -> + sort_cursor_input_read(Objects, NoObjects); + Term -> + throw_error(Term) + end. + +unique_cache(L, Post, LocalPost, Optz) when is_list(L) -> + case Optz#optz.unique of + true -> + {unique_sort_list(L), Post, LocalPost}; + false -> + %% If Optz#optz.cache then an ETS table could be used. + {L, Post, LocalPost} + end; +unique_cache(H, Post, LocalPost, #optz{unique = false, cache = false}) -> + {H, Post, LocalPost}; +unique_cache(H, Post, LocalPost, #optz{unique = true, cache = false}) -> + E = ets:new(qlc, [set, private]), + {fun() -> no_dups(H, E) end, [del_table(E) | Post], LocalPost}; +unique_cache(H, Post, LocalPost, #optz{unique = false, cache = true}) -> + E = ets:new(qlc, [set, private]), + {L, P} = unique_cache_post(E), + {fun() -> cache(H, E, LocalPost) end, [P | Post], [L]}; +unique_cache(H, Post, LocalPost, #optz{unique = true, cache = true}) -> + UT = ets:new(qlc, [bag, private]), + MT = ets:new(qlc, [set, private]), + {L1, P1} = unique_cache_post(UT), + {L2, P2} = unique_cache_post(MT), + {fun() -> ucache(H, UT, MT, LocalPost) end, [P1, P2 | Post], [L1, L2]}; +unique_cache(H, Post, LocalPost, #optz{unique = false, cache = list}=Optz) -> + Ref = make_ref(), + F = del_lcache(Ref), + #qlc_opt{tmpdir = TmpDir, max_list = MaxList, tmpdir_usage = TmpUsage} = + Optz#optz.opt, + {fun() -> lcache(H, Ref, LocalPost, TmpDir, MaxList, TmpUsage) end, + [F | Post], [F]}; +unique_cache(H, Post0, LocalPost0, #optz{unique = true, cache = list}=Optz) -> + #qlc_opt{tmpdir = TmpDir, max_list = MaxList, tmpdir_usage = TmpUsage} = + Optz#optz.opt, + Size = if + MaxList >= 1 bsl 31 -> (1 bsl 31) - 1; + MaxList =:= 0 -> 1; + true -> MaxList + end, + SortOptions = [{size, Size}, {tmpdir, TmpDir}], + USortOptions = [{unique, true} | SortOptions], + TmpUsageM = {TmpUsage, caching}, + LF1 = fun(Objs) -> lists:ukeysort(1, Objs) end, + FF1 = fun(Objs) -> + file_sort_handle(Objs, {keysort, 1}, USortOptions, + TmpDir, [], Post0, LocalPost0) + end, + {UH, Post1, LocalPost1} = sort_handle(tag_objects(H, 1), LF1, FF1, + USortOptions, Post0, LocalPost0, + TmpUsageM), + LF2 = fun(Objs) -> lists:keysort(2, Objs) end, + FF2 = fun(Objs) -> + file_sort_handle(Objs, {keysort, 2}, SortOptions, TmpDir, + [], Post1, LocalPost1) + end, + {SH, Post, LocalPost} = + sort_handle(UH, LF2, FF2, SortOptions, Post1, LocalPost1, TmpUsageM), + if + is_list(SH) -> + %% Remove the tag once and for all. + {untag_objects2(SH), Post, LocalPost}; + true -> + %% Every traversal untags the objects... + {fun() -> untag_objects(SH) end, Post, LocalPost} + end. + +unique_cache_post(E) -> + {empty_table(E), del_table(E)}. + +unique_sort_list(L) -> + E = ets:new(qlc, [set, private]), + unique_list(L, E). + +unique_list([], E) -> + true = ets:delete(E), + []; +unique_list([Object | Objects], E) -> + case ets:member(E, Object) of + false -> + true = ets:insert(E, {Object}), + [Object | unique_list(Objects, E)]; + true -> + unique_list(Objects, E) + end. + +sort_list(L, CFun, true, sort, _SortOptions, _Post) when is_function(CFun) -> + lists:usort(CFun, L); +sort_list(L, CFun, false, sort, _SortOptions, _Post) when is_function(CFun) -> + lists:sort(CFun, L); +sort_list(L, ascending, true, sort, _SortOptions, _Post) -> + lists:usort(L); +sort_list(L, descending, true, sort, _SortOptions, _Post) -> + lists:reverse(lists:usort(L)); +sort_list(L, ascending, false, sort, _SortOptions, _Post) -> + lists:sort(L); +sort_list(L, descending, false, sort, _SortOptions, _Post) -> + lists:reverse(lists:sort(L)); +sort_list(L, Order, Unique, {keysort, Kp}, _SortOptions, _Post) + when is_integer(Kp), is_atom(Order) -> + case {Order, Unique} of + {ascending, true} -> + lists:ukeysort(Kp, L); + {ascending, false} -> + lists:keysort(Kp, L); + {descending, true} -> + lists:reverse(lists:ukeysort(Kp, L)); + {descending, false} -> + lists:reverse(lists:keysort(Kp, L)) + end; +sort_list(L, _Order, _Unique, Sort, SortOptions, Post) -> + In = fun(_) -> {L, fun(_) -> end_of_input end} end, + Out = sort_list_output([]), + TSortOptions = [{format,term} | SortOptions], + do_sort(In, Out, Sort, TSortOptions, Post). + +sort_list_output(L) -> + fun(close) -> + lists:append(lists:reverse(L)); + (Terms) when is_list(Terms) -> + sort_list_output([Terms | L]) + end. + +%% Don't use the file_sorter unless it is known that objects will be +%% put on a temporary file (optimization). +sort_handle(H, ListFun, FileFun, SortOptions, Post, LocalPost, TmpUsageM) -> + Size = case lists:keysearch(size, 1, SortOptions) of + {value, {size, Size0}} -> Size0; + false -> default_option(size) + end, + sort_cache(H, [], Size, {ListFun, FileFun, Post, LocalPost, TmpUsageM}). + +sort_cache([], CL, _Sz, {LF, _FF, Post, LocalPost, _TmpUsageM}) -> + {LF(lists:reverse(CL)), Post, LocalPost}; +sort_cache(Objs, CL, Sz, C) when Sz < 0 -> + sort_cache2(Objs, CL, false, C); +sort_cache([Object | Cont], CL, Sz0, C) -> + Sz = decr_list_size(Sz0, Object), + sort_cache(Cont, [Object | CL], Sz, C); +sort_cache(F, CL, Sz, C) -> + case F() of + Objects when is_list(Objects) -> + sort_cache(Objects, CL, Sz, C); + Term -> + {_LF, _FF, Post, _LocalPost, _TmpUsageM} = C, + post_funs(Post), + throw_error(Term) + end. + +sort_cache2([], CL, _X, {LF, _FF, Post, LocalPost, _TmpUsageM}) -> + {LF(lists:reverse(CL)), Post, LocalPost}; +sort_cache2([Object | Cont], CL, _, C) -> + sort_cache2(Cont, [Object | CL], true, C); +sort_cache2(F, CL, false, C) -> + %% Find one extra object to be sure that temporary file(s) will be + %% used when calling the file_sorter. This works even if + %% duplicates are removed. + case F() of + Objects when is_list(Objects) -> + sort_cache2(Objects, CL, true, C); + Term -> + {_LF, _FF, Post, _LocalPost, _TmpUsageM} = C, + post_funs(Post), + throw_error(Term) + end; +sort_cache2(_Cont, _CL, true, {_LF,_FF,Post,_LocalPost, {not_allowed,M}}) -> + post_funs(Post), + throw_reason({tmpdir_usage, M}); +sort_cache2(Cont, CL, true, {_LF, FF, _Post, _LocalPost, {TmpUsage, M}}) -> + maybe_error_logger(TmpUsage, M), + FF(lists:reverse(CL, Cont)). + +file_sort_handle(H, Kp, SortOptions, TmpDir, Compressed, Post, LocalPost) -> + In = sort_cursor_input(H, 0), + Unique = lists:member(unique, SortOptions) + orelse + lists:keymember(unique, 1, SortOptions), + Out = sort_cursor_list_output(TmpDir, Compressed, Unique), + Reply = do_sort(In, Out, Kp, SortOptions, Post), + case Reply of + {file, FileName} -> + {F, Fd} = open_file(FileName, Compressed, Post), + P = fun() -> _ = file:close(Fd), + _ = file:delete(FileName) + end, + {F, [P | Post], LocalPost}; + {terms, BTerms} -> + try + {[binary_to_term(B) || B <- BTerms], Post, LocalPost} + catch Class:Reason -> + post_funs(Post), + erlang:raise(Class, Reason, erlang:get_stacktrace()) + end + end. + +do_sort(In, Out, Sort, SortOptions, Post) -> + try + case do_sort(In, Out, Sort, SortOptions) of + {error, Reason} -> throw_reason(Reason); + Reply -> Reply + end + catch Class:Term -> + post_funs(Post), + erlang:raise(Class, Term, erlang:get_stacktrace()) + end. + +do_sort(In, Out, sort, SortOptions) -> + file_sorter:sort(In, Out, SortOptions); +do_sort(In, Out, {keysort, KeyPos}, SortOptions) -> + file_sorter:keysort(KeyPos, In, Out, SortOptions). + +del_table(Ets) -> + fun() -> true = ets:delete(Ets) end. + +empty_table(Ets) -> + fun() -> true = ets:delete_all_objects(Ets) end. + +append_loop([[_ | _]=L], _N) -> + L; +append_loop([F], _N) -> + F(); +append_loop([L | Hs], N) -> + append_loop(L, N, Hs). + +append_loop([], N, Hs) -> + append_loop(Hs, N); +append_loop([Object | Cont], N, Hs) -> + [Object | append_loop(Cont, N + 1, Hs)]; +append_loop(F, 0, Hs) -> + case F() of + [] -> + append_loop(Hs, 0); + [Object | Cont] -> + [Object | append_loop(Cont, 1, Hs)]; + Term -> + Term + end; +append_loop(F, _N, Hs) -> % when _N > 0 + fun() -> append_loop(F, 0, Hs) end. + +no_dups([]=Cont, UTab) -> + true = ets:delete_all_objects(UTab), + Cont; +no_dups([Object | Cont], UTab) -> + case ets:member(UTab, Object) of + false -> + true = ets:insert(UTab, {Object}), + %% A fun is created here, even if Cont is a list; objects + %% will not be copied to the ETS table unless requested. + [Object | fun() -> no_dups(Cont, UTab) end]; + true -> + no_dups(Cont, UTab) + end; +no_dups(F, UTab) -> + case F() of + Objects when is_list(Objects) -> + no_dups(Objects, UTab); + Term -> + Term + end. + +%% When all objects have been returned from a cached QLC, the +%% generators of the expression will never be called again, and so the +%% tables used by the generators (LocalPost) can be emptied. + +cache(H, MTab, LocalPost) -> + case ets:member(MTab, 0) of + false -> + true = ets:insert(MTab, {0}), + cache(H, MTab, 1, LocalPost); + true -> + cache_recall(MTab, 1) + end. + +cache([]=Cont, _MTab, _SeqNo, LocalPost) -> + local_post(LocalPost), + Cont; +cache([Object | Cont], MTab, SeqNo, LocalPost) -> + true = ets:insert(MTab, {SeqNo, Object}), + %% A fun is created here, even if Cont is a list; objects + %% will not be copied to the ETS table unless requested. + [Object | fun() -> cache(Cont, MTab, SeqNo + 1, LocalPost) end]; +cache(F, MTab, SeqNo, LocalPost) -> + case F() of + Objects when is_list(Objects) -> + cache(Objects, MTab, SeqNo, LocalPost); + Term -> + Term + end. + +cache_recall(MTab, SeqNo) -> + case ets:lookup(MTab, SeqNo) of + []=Cont -> + Cont; + [{SeqNo, Object}] -> + [Object | fun() -> cache_recall(MTab, SeqNo + 1) end] + end. + +ucache(H, UTab, MTab, LocalPost) -> + case ets:member(MTab, 0) of + false -> + true = ets:insert(MTab, {0}), + ucache(H, UTab, MTab, 1, LocalPost); + true -> + ucache_recall(UTab, MTab, 1) + end. + +ucache([]=Cont, _UTab, _MTab, _SeqNo, LocalPost) -> + local_post(LocalPost), + Cont; +ucache([Object | Cont], UTab, MTab, SeqNo, LocalPost) -> + %% Always using 28 bits hash value... + Hash = erlang:phash2(Object), + case ets:lookup(UTab, Hash) of + [] -> + ucache3(Object, Cont, Hash, UTab, MTab, SeqNo, LocalPost); + HashSeqObjects -> + case lists:keymember(Object, 3, HashSeqObjects) of + true -> + ucache(Cont, UTab, MTab, SeqNo, LocalPost); + false -> + ucache3(Object, Cont, Hash, UTab, MTab, SeqNo, LocalPost) + end + end; +ucache(F, UTab, MTab, SeqNo, LocalPost) -> + case F() of + Objects when is_list(Objects) -> + ucache(Objects, UTab, MTab, SeqNo, LocalPost); + Term -> + Term + end. + +ucache3(Object, Cont, Hash, UTab, MTab, SeqNo, LocalPost) -> + true = ets:insert(UTab, {Hash, SeqNo, Object}), + true = ets:insert(MTab, {SeqNo, Hash}), + %% A fun is created here, even if Cont is a list; objects + %% will not be copied to the ETS table unless requested. + [Object | fun() -> ucache(Cont, UTab, MTab, SeqNo+1, LocalPost) end]. + +ucache_recall(UTab, MTab, SeqNo) -> + case ets:lookup(MTab, SeqNo) of + []=Cont -> + Cont; + [{SeqNo, Hash}] -> + Object = case ets:lookup(UTab, Hash) of + [{Hash, SeqNo, Object0}] -> Object0; + HashSeqObjects -> + {value, {Hash, SeqNo, Object0}} = + lists:keysearch(SeqNo, 2, HashSeqObjects), + Object0 + end, + [Object | fun() -> ucache_recall(UTab, MTab, SeqNo + 1) end] + end. + +-define(LCACHE_FILE(Ref), {Ref, '$_qlc_cache_tmpfiles_'}). + +lcache(H, Ref, LocalPost, TmpDir, MaxList, TmpUsage) -> + Key = ?LCACHE_FILE(Ref), + case get(Key) of + undefined -> + lcache1(H, {Key, LocalPost, TmpDir, MaxList, TmpUsage}, + MaxList, []); + {file, _Fd, _TmpFile, F} -> + F(); + L when is_list(L) -> + L + end. + +lcache1([]=Cont, {Key, LocalPost, _TmpDir, _MaxList, _TmpUsage}, _Sz, Acc) -> + local_post(LocalPost), + case get(Key) of + undefined -> + put(Key, lists:reverse(Acc)), + Cont; + {file, Fd, TmpFile, _F} -> + case lcache_write(Fd, TmpFile, Acc) of + ok -> + Cont; + Error -> + Error + end + end; +lcache1(H, State, Sz, Acc) when Sz < 0 -> + {Key, LocalPost, TmpDir, MaxList, TmpUsage} = State, + GetFile = + case get(Key) of + {file, Fd0, TmpFile, _F} -> + {TmpFile, Fd0}; + undefined when TmpUsage =:= not_allowed -> + error({tmpdir_usage, caching}); + undefined -> + maybe_error_logger(TmpUsage, caching), + FName = tmp_filename(TmpDir), + {F, Fd0} = open_file(FName, [write], LocalPost), + put(Key, {file, Fd0, FName, F}), + {FName, Fd0} + end, + case GetFile of + {FileName, Fd} -> + case lcache_write(Fd, FileName, Acc) of + ok -> + lcache1(H, State, MaxList, []); + Error -> + Error + end; + Error -> + Error + end; +lcache1([Object | Cont], State, Sz0, Acc) -> + Sz = decr_list_size(Sz0, Object), + [Object | lcache2(Cont, State, Sz, [Object | Acc])]; +lcache1(F, State, Sz, Acc) -> + case F() of + Objects when is_list(Objects) -> + lcache1(Objects, State, Sz, Acc); + Term -> + Term + end. + +lcache2([Object | Cont], State, Sz0, Acc) when Sz0 >= 0 -> + Sz = decr_list_size(Sz0, Object), + [Object | lcache2(Cont, State, Sz, [Object | Acc])]; +lcache2(Cont, State, Sz, Acc) -> + fun() -> lcache1(Cont, State, Sz, Acc) end. + +lcache_write(Fd, FileName, L) -> + write_binary_terms(t2b(L, []), Fd, FileName). + +t2b([], Bs) -> + Bs; +t2b([T | Ts], Bs) -> + t2b(Ts, [term_to_binary(T) | Bs]). + +del_lcache(Ref) -> + fun() -> + Key = ?LCACHE_FILE(Ref), + case get(Key) of + undefined -> + ok; + {file, Fd, TmpFile, _F} -> + _ = file:close(Fd), + _ = file:delete(TmpFile), + erase(Key); + _L -> + erase(Key) + end + end. + +tag_objects([Object | Cont], T) -> + [{Object, T} | tag_objects2(Cont, T + 1)]; +tag_objects([]=Cont, _T) -> + Cont; +tag_objects(F, T) -> + case F() of + Objects when is_list(Objects) -> + tag_objects(Objects, T); + Term -> + Term + end. + +tag_objects2([Object | Cont], T) -> + [{Object, T} | tag_objects2(Cont, T + 1)]; +tag_objects2(Objects, T) -> + fun() -> tag_objects(Objects, T) end. + +untag_objects([]=Objs) -> + Objs; +untag_objects([{Object, _N} | Cont]) -> + [Object | untag_objects2(Cont)]; +untag_objects(F) -> + case F() of + Objects when is_list(Objects) -> + untag_objects(Objects); + Term -> % Cannot happen + Term + end. + +untag_objects2([{Object, _N} | Cont]) -> + [Object | untag_objects2(Cont)]; +untag_objects2([]=Cont) -> + Cont; +untag_objects2(Objects) -> + fun() -> untag_objects(Objects) end. + +%%% Merge join. +%%% Temporary files are used when many objects have the same key. + +-define(JWRAP(E1, E2), [E1 | E2]). + +-record(m, {id, tmpdir, max_list, tmp_usage}). + +merge_join([]=Cont, _C1, _T2, _C2, _Opt) -> + Cont; +merge_join([E1 | L1], C1, L2, C2, Opt) -> + #qlc_opt{tmpdir = TmpDir, max_list = MaxList, + tmpdir_usage = TmpUsage} = Opt, + M = #m{id = merge_join_id(), tmpdir = TmpDir, max_list = MaxList, + tmp_usage = TmpUsage}, + merge_join2(E1, element(C1, E1), L1, C1, L2, C2, M); +merge_join(F1, C1, L2, C2, Opt) -> + case F1() of + L1 when is_list(L1) -> + merge_join(L1, C1, L2, C2, Opt); + T1 -> + T1 + end. + +merge_join1(_E2, _K2, []=Cont, _C1, _L2, _C2, M) -> + end_merge_join(Cont, M); +merge_join1(E2, K2, [E1 | L1], C1, L2, C2, M) -> + K1 = element(C1, E1), + if + K1 == K2 -> + same_keys2(E1, K1, L1, C1, L2, C2, E2, M); + K1 > K2 -> + merge_join2(E1, K1, L1, C1, L2, C2, M); + true -> % K1 < K2 + merge_join1(E2, K2, L1, C1, L2, C2, M) + end; +merge_join1(E2, K2, F1, C1, L2, C2, M) -> + case F1() of + L1 when is_list(L1) -> + merge_join1(E2, K2, L1, C1, L2, C2, M); + T1 -> + T1 + end. + +merge_join2(_E1, _K1, _L1, _C1, []=Cont, _C2, M) -> + end_merge_join(Cont, M); +merge_join2(E1, K1, L1, C1, [E2 | L2], C2, M) -> + K2 = element(C2, E2), + if + K1 == K2 -> + same_keys2(E1, K1, L1, C1, L2, C2, E2, M); + K1 > K2 -> + merge_join2(E1, K1, L1, C1, L2, C2, M); + true -> % K1 < K2 + merge_join1(E2, K2, L1, C1, L2, C2, M) + end; +merge_join2(E1, K1, L1, C1, F2, C2, M) -> + case F2() of + L2 when is_list(L2) -> + merge_join2(E1, K1, L1, C1, L2, C2, M); + T2 -> + T2 + end. + +%% element(C2, E2_0) == K1 +same_keys2(E1, K1, L1, C1, [], _C2, E2_0, M) -> + Cont = fun(_L1b) -> end_merge_join([], M) end, + loop_same_keys(E1, K1, L1, C1, [E2_0], Cont, M); +same_keys2(E1, K1, L1, C1, [E2 | L2]=L2_0, C2, E2_0, M) -> + K2 = element(C2, E2), + if + K1 == K2 -> + same_keys1(E1, K1, L1, C1, E2, C2, E2_0, L2, M); + K1 < K2 -> + [?JWRAP(E1, E2_0) | + fun() -> same_loop1(L1, K1, C1, E2_0, L2_0, C2, M) end] + end; +same_keys2(E1, K1, L1, C1, F2, C2, E2_0, M) -> + case F2() of + L2 when is_list(L2) -> + same_keys2(E1, K1, L1, C1, L2, C2, E2_0, M); + T2 -> + Cont = fun(_L1b) -> T2 end, + loop_same_keys(E1, K1, L1, C1, [E2_0], Cont, M) + end. + +same_loop1([], _K1_0, _C1, _E2_0, _L2, _C2, M) -> + end_merge_join([], M); +same_loop1([E1 | L1], K1_0, C1, E2_0, L2, C2, M) -> + K1 = element(C1, E1), + if + K1 == K1_0 -> + [?JWRAP(E1, E2_0) | + fun() -> same_loop1(L1, K1_0, C1, E2_0, L2, C2, M) end]; + K1_0 < K1 -> + merge_join2(E1, K1, L1, C1, L2, C2, M) + end; +same_loop1(F1, K1_0, C1, E2_0, L2, C2, M) -> + case F1() of + L1 when is_list(L1) -> + same_loop1(L1, K1_0, C1, E2_0, L2, C2, M); + T1 -> + T1 + end. + +%% element(C2, E2_0) == K1, element(C2, E2) == K1_0 +same_keys1(E1_0, K1_0, []=L1, C1, E2, C2, E2_0, L2, M) -> + [?JWRAP(E1_0, E2_0), ?JWRAP(E1_0, E2) | + fun() -> same_keys(K1_0, E1_0, L1, C1, L2, C2, M) end]; +same_keys1(E1_0, K1_0, [E1 | _]=L1, C1, E2, C2, E2_0, L2, M) -> + K1 = element(C1, E1), + if + K1_0 == K1 -> + E2s = [E2, E2_0], + Sz0 = decr_list_size(M#m.max_list, E2s), + same_keys_cache(E1_0, K1_0, L1, C1, L2, C2, E2s, Sz0, M); + K1_0 < K1 -> + [?JWRAP(E1_0, E2_0), ?JWRAP(E1_0, E2) | + fun() -> same_keys(K1_0, E1_0, L1, C1, L2, C2, M) end] + end; +same_keys1(E1_0, K1_0, F1, C1, E2, C2, E2_0, L2, M) -> + case F1() of + L1 when is_list(L1) -> + same_keys1(E1_0, K1_0, L1, C1, E2, C2, E2_0, L2, M); + T1 -> + Cont = fun() -> T1 end, + loop_same(E1_0, [E2, E2_0], Cont) + end. + +%% There is no such element E in L1 such that element(C1, E) == K1. +same_keys(_K1, _E1, _L1, _C1, []=Cont, _C2, M) -> + end_merge_join(Cont, M); +same_keys(K1, E1, L1, C1, [E2 | L2], C2, M) -> + K2 = element(C2, E2), + if + K1 == K2 -> + [?JWRAP(E1, E2) | + fun() -> same_keys(K1, E1, L1, C1, L2, C2, M) end]; + K1 < K2 -> + merge_join1(E2, K2, L1, C1, L2, C2, M) + end; +same_keys(K1, E1, L1, C1, F2, C2, M) -> + case F2() of + L2 when is_list(L2) -> + same_keys(K1, E1, L1, C1, L2, C2, M); + T2 -> + T2 + end. + +%% There are at least two elements in [E1 | L1] that are to be combined +%% with the elements in E2s (length(E2s) > 1). This loop covers the case +%% when all elements in E2 with key K1 can be kept in RAM. +same_keys_cache(E1, K1, L1, C1, [], _C2, E2s, _Sz, M) -> + Cont = fun(_L1b) -> end_merge_join([], M) end, + loop_same_keys(E1, K1, L1, C1, E2s, Cont, M); +same_keys_cache(E1, K1, L1, C1, L2, C2, E2s, Sz0, M) when Sz0 < 0 -> + case init_merge_join(M) of + ok -> + Sz = M#m.max_list, + C = fun() -> + same_keys_file(E1, K1, L1, C1, L2, C2, [], Sz, M) + end, + write_same_keys(E1, E2s, M, C); + Error -> + Error + end; +same_keys_cache(E1, K1, L1, C1, [E2 | L2], C2, E2s, Sz0, M) -> + K2 = element(C2, E2), + if + K1 == K2 -> + Sz = decr_list_size(Sz0, E2), + same_keys_cache(E1, K1, L1, C1, L2, C2, [E2 | E2s], Sz, M); + K1 < K2 -> + Cont = fun(L1b) -> merge_join1(E2, K2, L1b, C1, L2, C2, M) end, + loop_same_keys(E1, K1, L1, C1, E2s, Cont, M) + end; +same_keys_cache(E1, K1, L1, C1, F2, C2, E2s, Sz, M) -> + case F2() of + L2 when is_list(L2) -> + same_keys_cache(E1, K1, L1, C1, L2, C2, E2s, Sz, M); + T2 -> + Cont = fun(_L1b) -> T2 end, + loop_same_keys(E1, K1, L1, C1, E2s, Cont, M) + end. + +%% E2s holds all elements E2 in L2 such that element(E2, C2) == K1. +loop_same_keys(E1, _K1, [], _C1, E2s, _Cont, M) -> + end_merge_join(loop_same(E1, E2s, []), M); +loop_same_keys(E1, K1, L1, C1, E2s, Cont, M) -> + loop_same(E1, E2s, fun() -> loop_keys(K1, L1, C1, E2s, Cont, M) end). + +loop_same(_E1, [], L) -> + L; +loop_same(E1, [E2 | E2s], L) -> + loop_same(E1, E2s, [?JWRAP(E1, E2) | L]). + +loop_keys(K, [E1 | L1]=L1_0, C1, E2s, Cont, M) -> + K1 = element(C1, E1), + if + K1 == K -> + loop_same_keys(E1, K1, L1, C1, E2s, Cont, M); + K1 > K -> + Cont(L1_0) + end; +loop_keys(_K, []=L1, _C1, _Es2, Cont, _M) -> + Cont(L1); +loop_keys(K, F1, C1, E2s, Cont, M) -> + case F1() of + L1 when is_list(L1) -> + loop_keys(K, L1, C1, E2s, Cont, M); + T1 -> + T1 + end. + +%% This is for the case when a temporary file has to be used. +same_keys_file(E1, K1, L1, C1, [], _C2, E2s, _Sz, M) -> + Cont = fun(_L1b) -> end_merge_join([], M) end, + same_keys_file_write(E1, K1, L1, C1, E2s, M, Cont); +same_keys_file(E1, K1, L1, C1, L2, C2, E2s, Sz0, M) when Sz0 < 0 -> + Sz = M#m.max_list, + C = fun() -> same_keys_file(E1, K1, L1, C1, L2, C2, [], Sz, M) end, + write_same_keys(E1, E2s, M, C); +same_keys_file(E1, K1, L1, C1, [E2 | L2], C2, E2s, Sz0, M) -> + K2 = element(C2, E2), + if + K1 == K2 -> + Sz = decr_list_size(Sz0, E2), + same_keys_file(E1, K1, L1, C1, L2, C2, [E2 | E2s], Sz, M); + K1 < K2 -> + Cont = fun(L1b) -> + %% The temporary file could be truncated here. + merge_join1(E2, K2, L1b, C1, L2, C2, M) + end, + same_keys_file_write(E1, K1, L1, C1, E2s, M, Cont) + end; +same_keys_file(E1, K1, L1, C1, F2, C2, E2s, Sz, M) -> + case F2() of + L2 when is_list(L2) -> + same_keys_file(E1, K1, L1, C1, L2, C2, E2s, Sz, M); + T2 -> + Cont = fun(_L1b) -> T2 end, + same_keys_file_write(E1, K1, L1, C1, E2s, M, Cont) + end. + +same_keys_file_write(E1, K1, L1, C1, E2s, M, Cont) -> + C = fun() -> loop_keys_file(K1, L1, C1, Cont, M) end, + write_same_keys(E1, E2s, M, C). + +write_same_keys(_E1, [], _M, Cont) -> + Cont(); +write_same_keys(E1, Es2, M, Cont) -> + write_same_keys(E1, Es2, M, [], Cont). + +%% Avoids one (the first) traversal of the temporary file. +write_same_keys(_E1, [], M, E2s, Objs) -> + case write_merge_join(M, E2s) of + ok -> Objs; + Error -> Error + end; +write_same_keys(E1, [E2 | E2s0], M, E2s, Objs) -> + BE2 = term_to_binary(E2), + write_same_keys(E1, E2s0, M, [BE2 | E2s], [?JWRAP(E1, E2) | Objs]). + +loop_keys_file(K, [E1 | L1]=L1_0, C1, Cont, M) -> + K1 = element(C1, E1), + if + K1 == K -> + C = fun() -> loop_keys_file(K1, L1, C1, Cont, M) end, + read_merge_join(M, E1, C); + K1 > K -> + Cont(L1_0) + end; +loop_keys_file(_K, []=L1, _C1, Cont, _M) -> + Cont(L1); +loop_keys_file(K, F1, C1, Cont, M) -> + case F1() of + L1 when is_list(L1) -> + loop_keys_file(K, L1, C1, Cont, M); + T1 -> + T1 + end. + +end_merge_join(Reply, M) -> + end_merge_join(M), + Reply. + +%% Normally post_funs() cleans up temporary files by calling funs in +%% Post. It seems impossible to do that with the temporary file(s) +%% used when many objects have the same key--such a file is created +%% after the setup when Post is prepared. There seems to be no real +%% alternative to using the process dictionary, at least as things +%% have been implemented so far. Probably all of Post could have been +%% put in the process dictionary... + +-define(MERGE_JOIN_FILE, '$_qlc_merge_join_tmpfiles_'). + +init_merge_join(#m{id = MergeId, tmpdir = TmpDir, tmp_usage = TmpUsage}) -> + case tmp_merge_file(MergeId) of + {Fd, FileName} -> + case file:position(Fd, bof) of + {ok, 0} -> + case file:truncate(Fd) of + ok -> + ok; + Error -> + file_error(FileName, Error) + end; + Error -> + file_error(FileName, Error) + end; + none when TmpUsage =:= not_allowed -> + error({tmpdir_usage, joining}); + none -> + maybe_error_logger(TmpUsage, joining), + FName = tmp_filename(TmpDir), + case file:open(FName, [raw, binary, read, write]) of + {ok, Fd} -> + TmpFiles = get(?MERGE_JOIN_FILE), + put(?MERGE_JOIN_FILE, [{MergeId, Fd, FName} | TmpFiles]), + ok; + Error -> + file_error(FName, Error) + end + end. + +write_merge_join(#m{id = MergeId}, BTerms) -> + {Fd, FileName} = tmp_merge_file(MergeId), + write_binary_terms(BTerms, Fd, FileName). + +read_merge_join(#m{id = MergeId}, E1, Cont) -> + {Fd, FileName} = tmp_merge_file(MergeId), + case file:position(Fd, bof) of + {ok, 0} -> + Fun = fun([], _) -> + Cont(); + (Ts, C) when is_list(Ts) -> + join_read_terms(E1, Ts, C) + end, + file_loop_read(<<>>, ?CHUNK_SIZE, {Fd, FileName}, Fun); + Error -> + file_error(FileName, Error) + end. + +join_read_terms(_E1, [], Objs) -> + Objs; +join_read_terms(E1, [E2 | E2s], Objs) -> + join_read_terms(E1, E2s, [?JWRAP(E1, E2) | Objs]). + +end_merge_join(#m{id = MergeId}) -> + case tmp_merge_file(MergeId) of + none -> + ok; + {Fd, FileName} -> + _ = file:close(Fd), + _ = file:delete(FileName), + put(?MERGE_JOIN_FILE, + lists:keydelete(MergeId, 1, get(?MERGE_JOIN_FILE))) + end. + +end_all_merge_joins() -> + lists:foreach( + fun(Id) -> end_merge_join(#m{id = Id}) end, + [Id || {Id, _Fd, _FileName} <- lists:flatten([get(?MERGE_JOIN_FILE)])]), + erase(?MERGE_JOIN_FILE). + +merge_join_id() -> + case get(?MERGE_JOIN_FILE) of + undefined -> + put(?MERGE_JOIN_FILE, []); + _ -> + ok + end, + make_ref(). + +tmp_merge_file(MergeId) -> + TmpFiles = get(?MERGE_JOIN_FILE), + case lists:keysearch(MergeId, 1, TmpFiles) of + {value, {MergeId, Fd, FileName}} -> + {Fd, FileName}; + false -> + none + end. + +decr_list_size(Sz0, E) when is_integer(Sz0) -> + Sz0 - erlang:external_size(E). + +%%% End of merge join. + +lookup_join([E1 | L1], C1, LuF, C2, Rev) -> + K1 = element(C1, E1), + case LuF(C2, [K1]) of + [] -> + lookup_join(L1, C1, LuF, C2, Rev); + [E2] when Rev -> + [?JWRAP(E2, E1) | fun() -> lookup_join(L1, C1, LuF, C2, Rev) end]; + [E2] -> + [?JWRAP(E1, E2) | fun() -> lookup_join(L1, C1, LuF, C2, Rev) end]; + E2s when is_list(E2s), Rev -> + [?JWRAP(E2, E1) || E2 <- E2s] ++ + fun() -> lookup_join(L1, C1, LuF, C2, Rev) end; + E2s when is_list(E2s) -> + [?JWRAP(E1, E2) || E2 <- E2s] ++ + fun() -> lookup_join(L1, C1, LuF, C2, Rev) end; + Term -> + Term + end; +lookup_join([]=Cont, _C1, _LuF, _C2, _Rev) -> + Cont; +lookup_join(F1, C1, LuF, C2, Rev) -> + case F1() of + L1 when is_list(L1) -> + lookup_join(L1, C1, LuF, C2, Rev); + T1 -> + T1 + end. + +maybe_error_logger(allowed, _) -> + ok; +maybe_error_logger(Name, Why) -> + [_, _, {?MODULE,maybe_error_logger,_} | Stacktrace] = expand_stacktrace(), + Trimmer = fun(M, _F, _A) -> M =:= erl_eval end, + Formater = fun(Term, I) -> io_lib:print(Term, I, 80, -1) end, + X = lib:format_stacktrace(1, Stacktrace, Trimmer, Formater), + error_logger:Name("qlc: temporary file was needed for ~w\n~s\n", + [Why, lists:flatten(X)]). + +expand_stacktrace() -> + D = erlang:system_flag(backtrace_depth, 8), + try + %% Compensate for a bug in erlang:system_flag/2: + expand_stacktrace(erlang:max(1, D)) + after + erlang:system_flag(backtrace_depth, D) + end. + +expand_stacktrace(D) -> + _ = erlang:system_flag(backtrace_depth, D), + {'EXIT', {foo, Stacktrace}} = (catch erlang:error(foo)), + L = lists:takewhile(fun({M,_,_}) -> M =/= ?MODULE + end, lists:reverse(Stacktrace)), + if + length(L) < 3 andalso length(Stacktrace) =:= D -> + expand_stacktrace(D + 5); + true -> + Stacktrace + end. + +write_binary_terms(BTerms, Fd, FileName) -> + case file:write(Fd, size_bin(BTerms, [])) of + ok -> + ok; + Error -> + file_error(FileName, Error) + end. + +post_funs(L) -> + end_all_merge_joins(), + local_post(L). + +local_post(L) -> + lists:foreach(fun(undefined) -> ok; + (F) -> catch (F)() + end, L). + +call(undefined, _Arg, Default, _Post) -> + Default; +call(Fun, Arg, _Default, Post) -> + try + Fun(Arg) + catch Class:Reason -> + post_funs(Post), + erlang:raise(Class, Reason, erlang:get_stacktrace()) + end. + +grd(undefined, _Arg) -> + false; +grd(Fun, Arg) -> + case Fun(Arg) of + true -> + true; + _ -> + false + end. + +family(L) -> + sofs:to_external(sofs:relation_to_family(sofs:relation(L))). + +family_union(L) -> + R = sofs:relation(L,[{atom,[atom]}]), + sofs:to_external(sofs:family_union(sofs:relation_to_family(R))). + +file_error(File, {error, Reason}) -> + error({file_error, File, Reason}). + +-spec throw_file_error(string(), {'error',atom()}) -> no_return(). + +throw_file_error(File, {error, Reason}) -> + throw_reason({file_error, File, Reason}). + +-spec throw_reason(term()) -> no_return(). + +throw_reason(Reason) -> + throw_error(error(Reason)). + +-spec throw_error(term()) -> no_return(). + +throw_error(Error) -> + throw(Error). + +error(Reason) -> + {error, ?MODULE, Reason}. |