%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2004-2009. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(qlc).
%%% Purpose: Main API module qlc. Functions for evaluation.
%%% Other files:
%%% qlc_pt. Implements the parse transform.
%% External exports
-export([parse_transform/2, transform_from_evaluator/2]).
-export([q/1, q/2]).
-export([eval/1, e/1, eval/2, e/2, fold/3, fold/4]).
-export([cursor/1, cursor/2,
next_answers/1, next_answers/2,
delete_cursor/1]).
-export([append/1, append/2]).
-export([sort/1, sort/2, keysort/2, keysort/3]).
-export([table/2]).
-export([info/1, info/2]).
-export([string_to_handle/1, string_to_handle/2, string_to_handle/3]).
-export([format_error/1]).
%% Exported to qlc_pt.erl only:
-export([template_state/0, aux_name/3, name_suffix/2, vars/1,
var_ufold/2, var_fold/3, all_selections/1]).
%% When cache=list lists bigger than ?MAX_LIST_SIZE bytes are put on
%% file. Also used when merge join finds big equivalence classes.
-define(MAX_LIST_SIZE, 512*1024).
-record(qlc_append, % qlc:append/1,2
{hl
}).
-record(qlc_table, % qlc:table/2
{trav_fun, % traverse fun
trav_MS, % bool(); true iff traverse fun takes a match spec
pre_fun,
post_fun,
info_fun,
format_fun,
lookup_fun,
parent_fun,
key_equality, % '==' | '=:=' | undefined (--R12B-5)
lu_vals, % undefined | {Position,Values}; values to be looked up
ms = no_match_spec
% match specification; [T || P <- Tab, Fs]
}).
-record(qlc_sort, % qlc:sort/1,2 and qlc:keysort/2,3
{h,
keypos, % sort | {keysort, KeyPos}
unique,
compressed, % [] | [compressed]
order,
fs_opts, % file_sorter options
tmpdir_usage = allowed, % allowed | not_allowed
% | warning_msg | error_msg | info_msg
tmpdir
}).
%% Also in qlc_pt.erl.
-record(qlc_lc, % qlc:q/1,2
{lc,
opt % #qlc_opt
}).
-record(qlc_list, % a prepared list
{l,
ms = no_match_spec
}).
-record(qlc_join, % a prepared join
{kind, % {merge, KeyEquality} |
% {lookup, KeyEquality, LookupFun}
opt, % #qlc_opt from q/2.
h1, q1, c1, % to be traversed by "lookup join"
h2, q2, c2 % to be looked up by "lookup join"
}).
%%% A query cursor is a tuple {qlc_cursor, Cursor} where Cursor is a pair
%%% {CursorPid, OwnerPid}.
-record(qlc_cursor, {c}).
-record(qlc_opt,
{unique = false, % bool()
cache = false, % bool() | list (true~ets, false~no)
max_lookup = -1, % int() >= 0 | -1 (represents infinity)
join = any, % any | nested_loop | merge | lookup
tmpdir = "", % global tmpdir
lookup = any, % any | bool()
max_list = ?MAX_LIST_SIZE, % int() >= 0
tmpdir_usage = allowed % allowed | not_allowed
% | warning_msg | error_msg | info_msg
}).
-record(setup, {parent}).
-define(THROWN_ERROR, {?MODULE, throw_error, _}).
%%% A query handle is a tuple {qlc_handle, Handle} where Handle is one
%%% of #qlc_append, #qlc_table, #qlc_sort, and #qlc_lc.
-record(qlc_handle, {h}).
get_handle(#qlc_handle{h = #qlc_lc{opt = {qlc_opt, U, C, M}}=H}) ->
%% R11B-0.
H#qlc_lc{opt = #qlc_opt{unique = U, cache = C, max_lookup = M}};
get_handle(#qlc_handle{h = H}) ->
H;
get_handle(L) when is_list(L) ->
L;
get_handle(_) ->
badarg.
%%%
%%% Exported functions
%%%
append(QHs) ->
Hs = [case get_handle(QH) of
badarg -> erlang:error(badarg, [QHs]);
H -> H
end || QH <- QHs],
#qlc_handle{h = #qlc_append{hl = Hs}}.
append(QH1, QH2) ->
Hs = [case get_handle(QH) of
badarg -> erlang:error(badarg, [QH1, QH2]);
H -> H
end || QH <- [QH1, QH2]],
#qlc_handle{h = #qlc_append{hl = Hs}}.
cursor(QH) ->
cursor(QH, []).
cursor(QH, Options) ->
case {options(Options, [unique_all, cache_all, tmpdir,
spawn_options, max_list_size,
tmpdir_usage]),
get_handle(QH)} of
{B1, B2} when B1 =:= badarg; B2 =:= badarg ->
erlang:error(badarg, [QH, Options]);
{[GUnique, GCache, TmpDir, SpawnOptions0, MaxList, TmpUsage], H} ->
SpawnOptions = spawn_options(SpawnOptions0),
case cursor_process(H, GUnique, GCache, TmpDir,
SpawnOptions, MaxList, TmpUsage) of
Pid when is_pid(Pid) ->
#qlc_cursor{c = {Pid, self()}};
Error ->
Error
end
end.
delete_cursor(#qlc_cursor{c = {_, Owner}}=C) when Owner =/= self() ->
erlang:error(not_cursor_owner, [C]);
delete_cursor(#qlc_cursor{c = {Pid, _}}) ->
stop_cursor(Pid);
delete_cursor(T) ->
erlang:error(badarg, [T]).
e(QH) ->
eval(QH, []).
e(QH, Options) ->
eval(QH, Options).
eval(QH) ->
eval(QH, []).
eval(QH, Options) ->
case {options(Options, [unique_all, cache_all, tmpdir, max_list_size,
tmpdir_usage]),
get_handle(QH)} of
{B1, B2} when B1 =:= badarg; B2 =:= badarg ->
erlang:error(badarg, [QH, Options]);
{[GUnique, GCache, TmpDir, MaxList, TmpUsage], Handle} ->
try
Prep = prepare_qlc(Handle, [], GUnique, GCache,
TmpDir, MaxList, TmpUsage),
case setup_qlc(Prep, #setup{parent = self()}) of
{L, Post, _LocalPost} when is_list(L) ->
post_funs(Post),
L;
{Objs, Post, _LocalPost} when is_function(Objs) ->
try
collect(Objs)
after
post_funs(Post)
end
end
catch Term ->
case erlang:get_stacktrace() of
[?THROWN_ERROR | _] ->
Term;
Stacktrace ->
erlang:raise(throw, Term, Stacktrace)
end
end
end.
fold(Fun, Acc0, QH) ->
fold(Fun, Acc0, QH, []).
fold(Fun, Acc0, QH, Options) ->
case {options(Options, [unique_all, cache_all, tmpdir, max_list_size,
tmpdir_usage]),
get_handle(QH)} of
{B1, B2} when B1 =:= badarg; B2 =:= badarg ->
erlang:error(badarg, [Fun, Acc0, QH, Options]);
{[GUnique, GCache, TmpDir, MaxList, TmpUsage], Handle} ->
try
Prep = prepare_qlc(Handle, not_a_list, GUnique, GCache,
TmpDir, MaxList, TmpUsage),
case setup_qlc(Prep, #setup{parent = self()}) of
{Objs, Post, _LocalPost} when is_function(Objs);
is_list(Objs) ->
try
fold_loop(Fun, Objs, Acc0)
after
post_funs(Post)
end
end
catch Term ->
case erlang:get_stacktrace() of
[?THROWN_ERROR | _] ->
Term;
Stacktrace ->
erlang:raise(throw, Term, Stacktrace)
end
end
end.
format_error(not_a_query_list_comprehension) ->
io_lib:format("argument is not a query list comprehension", []);
format_error({used_generator_variable, V}) ->
io_lib:format("generated variable ~w must not be used in list expression",
[V]);
format_error(binary_generator) ->
io_lib:format("cannot handle binary generators", []);
format_error(too_complex_join) ->
io_lib:format("cannot handle join of three or more generators efficiently",
[]);
format_error(too_many_joins) ->
io_lib:format("cannot handle more than one join efficiently", []);
format_error(nomatch_pattern) ->
io_lib:format("pattern cannot possibly match", []);
format_error(nomatch_filter) ->
io_lib:format("filter evaluates to 'false'", []);
format_error({Line, Mod, Reason}) when is_integer(Line) ->
io_lib:format("~p: ~s~n",
[Line, lists:flatten(Mod:format_error(Reason))]);
%% file_sorter errors
format_error({bad_object, FileName}) ->
io_lib:format("the temporary file \"~s\" holding answers is corrupt",
[FileName]);
format_error(bad_object) ->
io_lib:format("the keys could not be extracted from some term", []);
format_error({file_error, FileName, Reason}) ->
io_lib:format("\"~s\": ~p~n",[FileName, file:format_error(Reason)]);
format_error({premature_eof, FileName}) ->
io_lib:format("\"~s\": end-of-file was encountered inside some binary term",
[FileName]);
format_error({tmpdir_usage, Why}) ->
io_lib:format("temporary file was needed for ~w~n", [Why]);
format_error({error, Module, Reason}) ->
Module:format_error(Reason);
format_error(E) ->
io_lib:format("~p~n", [E]).
info(QH) ->
info(QH, []).
info(QH, Options) ->
case {options(Options, [unique_all, cache_all, flat, format, n_elements,
depth, tmpdir, max_list_size, tmpdir_usage]),
get_handle(QH)} of
{B1, B2} when B1 =:= badarg; B2 =:= badarg ->
erlang:error(badarg, [QH, Options]);
{[GUnique, GCache, Flat, Format, NElements,
Depth, TmpDir, MaxList, TmpUsage],
H} ->
try
Prep = prepare_qlc(H, [], GUnique, GCache,
TmpDir, MaxList, TmpUsage),
Info = le_info(Prep, {NElements,Depth}),
AbstractCode = abstract(Info, Flat, NElements, Depth),
case Format of
abstract_code ->
abstract_code(AbstractCode);
string ->
Hook = fun({special, _Line, String}, _I, _P, _F) ->
String
end,
lists:flatten(erl_pp:expr(AbstractCode, 0, Hook));
debug -> % Not documented. Intended for testing only.
Info
end
catch Term ->
case erlang:get_stacktrace() of
[?THROWN_ERROR | _] ->
Term;
Stacktrace ->
erlang:raise(throw, Term, Stacktrace)
end
end
end.
keysort(KeyPos, QH) ->
keysort(KeyPos, QH, []).
keysort(KeyPos, QH, Options) ->
case {is_keypos(KeyPos),
options(Options, [tmpdir, order, unique, compressed,
size, no_files]),
get_handle(QH)} of
{true, [TmpDir, Order, Unique,Compressed | _], H} when H =/= badarg ->
#qlc_handle{h = #qlc_sort{h = H, keypos = {keysort,KeyPos},
unique = Unique,
compressed = Compressed,
order = Order,
fs_opts = listify(Options),
tmpdir = TmpDir}};
_ ->
erlang:error(badarg, [KeyPos, QH, Options])
end.
-define(DEFAULT_NUM_OF_ANSWERS, 10).
next_answers(C) ->
next_answers(C, ?DEFAULT_NUM_OF_ANSWERS).
next_answers(#qlc_cursor{c = {_, Owner}}=C,
NumOfAnswers) when Owner =/= self() ->
erlang:error(not_cursor_owner, [C, NumOfAnswers]);
next_answers(#qlc_cursor{c = {Pid, _}}=C, NumOfAnswers) ->
N = case NumOfAnswers of
all_remaining -> -1;
_ when is_integer(NumOfAnswers), NumOfAnswers > 0 -> NumOfAnswers;
_ -> erlang:error(badarg, [C, NumOfAnswers])
end,
next_loop(Pid, [], N);
next_answers(T1, T2) ->
erlang:error(badarg, [T1, T2]).
parse_transform(Forms, Options) ->
qlc_pt:parse_transform(Forms, Options).
%% The funcspecs qlc:q/1 and qlc:q/2 are known by erl_eval.erl and
%% erl_lint.erl.
q(QLC_lc) ->
q(QLC_lc, []).
q(#qlc_lc{}=QLC_lc, Options) ->
case options(Options, [unique, cache, max_lookup, join, lookup]) of
[Unique, Cache, Max, Join, Lookup] ->
Opt = #qlc_opt{unique = Unique, cache = Cache,
max_lookup = Max, join = Join, lookup = Lookup},
#qlc_handle{h = QLC_lc#qlc_lc{opt = Opt}};
_ ->
erlang:error(badarg, [QLC_lc, Options])
end;
q(T1, T2) ->
erlang:error(badarg, [T1, T2]).
sort(QH) ->
sort(QH, []).
sort(QH, Options) ->
case {options(Options, [tmpdir, order, unique, compressed,
size, no_files]), get_handle(QH)} of
{B1, B2} when B1 =:= badarg; B2 =:= badarg ->
erlang:error(badarg, [QH, Options]);
{[TD, Order, Unique, Compressed | _], H} ->
#qlc_handle{h = #qlc_sort{h = H, keypos = sort, unique = Unique,
compressed = Compressed, order = Order,
fs_opts = listify(Options),
tmpdir = TD}}
end.
%% Note that the generated code is evaluated by (the slow) erl_eval.
string_to_handle(Str) ->
string_to_handle(Str, []).
string_to_handle(Str, Options) ->
string_to_handle(Str, Options, []).
string_to_handle(Str, Options, Bindings) when is_list(Str),
is_list(Bindings) ->
case options(Options, [unique, cache, max_lookup, join, lookup]) of
badarg ->
erlang:error(badarg, [Str, Options, Bindings]);
[Unique, Cache, MaxLookup, Join, Lookup] ->
case erl_scan:string(Str) of
{ok, Tokens, _} ->
case erl_parse:parse_exprs(Tokens) of
{ok, [Expr]} ->
case qlc_pt:transform_expression(Expr, Bindings) of
{ok, {call, _, _QlcQ, Handle}} ->
{value, QLC_lc, _} =
erl_eval:exprs(Handle, Bindings),
O = #qlc_opt{unique = Unique,
cache = Cache,
max_lookup = MaxLookup,
join = Join,
lookup = Lookup},
#qlc_handle{h = QLC_lc#qlc_lc{opt = O}};
{not_ok, [{error, Error} | _]} ->
error(Error)
end;
{ok, _ExprList} ->
erlang:error(badarg, [Str, Options, Bindings]);
{error, ErrorInfo} ->
error(ErrorInfo)
end;
{error, ErrorInfo, _EndLine} ->
error(ErrorInfo)
end
end;
string_to_handle(T1, T2, T3) ->
erlang:error(badarg, [T1, T2, T3]).
table(TraverseFun, Options) when is_function(TraverseFun) ->
case {is_function(TraverseFun, 0),
IsFun1 = is_function(TraverseFun, 1)} of
{false, false} ->
erlang:error(badarg, [TraverseFun, Options]);
_ ->
case options(Options, [pre_fun, post_fun, info_fun, format_fun,
lookup_fun, parent_fun, key_equality]) of
[PreFun, PostFun, InfoFun, FormatFun, LookupFun, ParentFun,
KeyEquality] ->
T = #qlc_table{trav_fun = TraverseFun, pre_fun = PreFun,
post_fun = PostFun, info_fun = InfoFun,
parent_fun = ParentFun,
trav_MS = IsFun1,
format_fun = FormatFun,
lookup_fun = LookupFun,
key_equality = KeyEquality},
#qlc_handle{h = T};
badarg ->
erlang:error(badarg, [TraverseFun, Options])
end
end;
table(T1, T2) ->
erlang:error(badarg, [T1, T2]).
transform_from_evaluator(LC, Bs0) ->
qlc_pt:transform_from_evaluator(LC, Bs0).
-define(TEMPLATE_STATE, 1).
template_state() ->
?TEMPLATE_STATE.
aux_name(Name, N, AllNames) ->
{VN, _} = aux_name1(Name, N, AllNames),
VN.
name_suffix(A, Suff) ->
list_to_atom(lists:concat([A, Suff])).
vars(E) ->
var_ufold(fun({var,_L,V}) -> V end, E).
var_ufold(F, E) ->
ordsets:from_list(var_fold(F, [], E)).
all_selections([]) ->
[[]];
all_selections([{I,Cs} | ICs]) ->
[[{I,C} | L] || C <- Cs, L <- all_selections(ICs)].
%%%
%%% Local functions
%%%
aux_name1(Name, N, AllNames) ->
SN = name_suffix(Name, N),
case sets:is_element(SN, AllNames) of
true -> aux_name1(Name, N + 1, AllNames);
false -> {SN, N}
end.
var_fold(F, A, {var,_,V}=Var) when V =/= '_' ->
[F(Var) | A];
var_fold(F, A, T) when is_tuple(T) ->
var_fold(F, A, tuple_to_list(T));
var_fold(F, A, [E | Es]) ->
var_fold(F, var_fold(F, A, E), Es);
var_fold(_F, A, _T) ->
A.
options(Options, Keys) when is_list(Options) ->
options(Options, Keys, []);
options(Option, Keys) ->
options([Option], Keys, []).
options(Options0, [Key | Keys], L) when is_list(Options0) ->
Options = case lists:member(Key, Options0) of
true ->
[atom_option(Key) | lists:delete(Key, Options0)];
false ->
Options0
end,
V = case lists:keysearch(Key, 1, Options) of
{value, {format_fun, U=undefined}} ->
{ok, U};
{value, {info_fun, U=undefined}} ->
{ok, U};
{value, {lookup_fun, U=undefined}} ->
{ok, U};
{value, {parent_fun, U=undefined}} ->
{ok, U};
{value, {post_fun, U=undefined}} ->
{ok, U};
{value, {pre_fun, U=undefined}} ->
{ok, U};
{value, {info_fun, Fun}} when is_function(Fun),
is_function(Fun, 1) ->
{ok, Fun};
{value, {pre_fun, Fun}} when is_function(Fun),
is_function(Fun, 1) ->
{ok, Fun};
{value, {post_fun, Fun}} when is_function(Fun),
is_function(Fun, 0) ->
{ok, Fun};
{value, {lookup_fun, Fun}} when is_function(Fun),
is_function(Fun, 2) ->
{ok, Fun};
{value, {max_lookup, Max}} when is_integer(Max), Max >= 0 ->
{ok, Max};
{value, {max_lookup, infinity}} ->
{ok, -1};
{value, {format_fun, Fun}} when is_function(Fun),
is_function(Fun, 1) ->
{ok, Fun};
{value, {parent_fun, Fun}} when is_function(Fun),
is_function(Fun, 0) ->
{ok, Fun};
{value, {key_equality, KE='=='}}->
{ok, KE};
{value, {key_equality, KE='=:='}}->
{ok, KE};
{value, {join, J=any}} ->
{ok, J};
{value, {join, J=nested_loop}} ->
{ok, J};
{value, {join, J=merge}} ->
{ok, J};
{value, {join, J=lookup}} ->
{ok, J};
{value, {lookup, LookUp}} when LookUp;
not LookUp;
LookUp =:= any ->
{ok, LookUp};
{value, {max_list_size, Max}} when is_integer(Max), Max >= 0 ->
{ok, Max};
{value, {tmpdir_usage, TmpUsage}} when TmpUsage =:= allowed;
TmpUsage =:= not_allowed;
TmpUsage =:= info_msg;
TmpUsage =:= warning_msg;
TmpUsage =:= error_msg ->
{ok, TmpUsage};
{value, {unique, Unique}} when Unique; not Unique ->
{ok, Unique};
{value, {cache, Cache}} when Cache; not Cache; Cache =:= list ->
{ok, Cache};
{value, {cache, ets}} ->
{ok, true};
{value, {cache, no}} ->
{ok, false};
{value, {unique_all, UniqueAll}} when UniqueAll; not UniqueAll ->
{ok, UniqueAll};
{value, {cache_all, CacheAll}} when CacheAll;
not CacheAll;
CacheAll =:= list ->
{ok, CacheAll};
{value, {cache_all, ets}} ->
{ok, true};
{value, {cache_all, no}} ->
{ok, false};
{value, {spawn_options, default}} ->
{ok, default};
{value, {spawn_options, SpawnOptions}} ->
case is_proper_list(SpawnOptions) of
true ->
{ok, SpawnOptions};
false ->
badarg
end;
{value, {flat, Flat}} when Flat; not Flat ->
{ok, Flat};
{value, {format, Format}} when Format =:= string;
Format =:= abstract_code;
Format =:= debug ->
{ok, Format};
{value, {n_elements, NElements}} when NElements =:= infinity;
is_integer(NElements),
NElements > 0 ->
{ok, NElements};
{value, {depth, Depth}} when Depth =:= infinity;
is_integer(Depth), Depth >= 0 ->
{ok, Depth};
{value, {order, Order}} when is_function(Order),
is_function(Order, 2);
(Order =:= ascending);
(Order =:= descending) ->
{ok, Order};
{value, {compressed, Comp}} when Comp ->
{ok, [compressed]};
{value, {compressed, Comp}} when not Comp ->
{ok, []};
{value, {tmpdir, T}} ->
{ok, T};
{value, {size, Size}} when is_integer(Size), Size > 0 ->
{ok, Size};
{value, {no_files, NoFiles}} when is_integer(NoFiles),
NoFiles > 1 ->
{ok, NoFiles};
{value, {Key, _}} ->
badarg;
false ->
Default = default_option(Key),
{ok, Default}
end,
case V of
badarg ->
badarg;
{ok, Value} ->
NewOptions = lists:keydelete(Key, 1, Options),
options(NewOptions, Keys, [Value | L])
end;
options([], [], L) ->
lists:reverse(L);
options(_Options, _, _L) ->
badarg.
default_option(pre_fun) -> undefined;
default_option(post_fun) -> undefined;
default_option(info_fun) -> undefined;
default_option(format_fun) -> undefined;
default_option(lookup_fun) -> undefined;
default_option(max_lookup) -> -1;
default_option(join) -> any;
default_option(lookup) -> any;
default_option(parent_fun) -> undefined;
default_option(key_equality) -> '=:=';
default_option(spawn_options) -> default;
default_option(flat) -> true;
default_option(format) -> string;
default_option(n_elements) -> infinity;
default_option(depth) -> infinity;
default_option(max_list_size) -> ?MAX_LIST_SIZE;
default_option(tmpdir_usage) -> allowed;
default_option(cache) -> false;
default_option(cache_all) -> false;
default_option(unique) -> false;
default_option(unique_all) -> false;
default_option(order) -> ascending; % default values from file_sorter.erl
default_option(compressed) -> [];
default_option(tmpdir) -> "";
default_option(size) -> 524288;
default_option(no_files) -> 16.
atom_option(cache) -> {cache, true};
atom_option(unique) -> {unique, true};
atom_option(cache_all) -> {cache_all, true};
atom_option(unique_all) -> {unique_all, true};
atom_option(lookup) -> {lookup, true};
atom_option(flat) -> {flat, true};
atom_option(Key) -> Key.
is_proper_list([_ | L]) ->
is_proper_list(L);
is_proper_list(L) ->
L =:= [].
spawn_options(default) ->
[link];
spawn_options(SpawnOptions) ->
lists:delete(monitor,
case lists:member(link, SpawnOptions) of
true ->
SpawnOptions;
false ->
[link | SpawnOptions]
end).
is_keypos(Keypos) when is_integer(Keypos), Keypos > 0 ->
true;
is_keypos([]) ->
false;
is_keypos(L) ->
is_keyposs(L).
is_keyposs([Kp | Kps]) when is_integer(Kp), Kp > 0 ->
is_keyposs(Kps);
is_keyposs(Kps) ->
Kps =:= [].
listify(L) when is_list(L) ->
L;
listify(T) ->
[T].
%% Optimizations to be carried out.
-record(optz,
{unique = false, % bool()
cache = false, % bool() | list
join_option = any, % constraint set by the 'join' option
fast_join = no, % no | #qlc_join. 'no' means nested loop.
opt % #qlc_opt
}).
%% Prepared #qlc_lc.
-record(qlc,
{lcf, % fun() -> Val
codef,
qdata, % with evaluated list expressions
init_value,
optz % #optz
}).
%% Prepared simple #qlc_lc.
-record(simple_qlc,
{p, % atom(), pattern variable
le,
line,
init_value,
optz % #optz
}).
-record(prepared,
{qh, % #qlc_append | #qlc_table | #qlc | #simple_qlc |
% #qlc_sort | list()
sorted = no, % yes | no | ascending | descending
sort_info = [], %
sort_info2 = [], % 'sort_info' updated with pattern info; qh is LE
lu_skip_quals = [], % qualifiers to skip due to lookup
join = {[],[]}, % {Lookup, Merge}
n_objs = undefined, % for join (not used yet)
is_unique_objects = false, % bool()
is_cached = false % bool() (true means 'ets' or 'list')
}).
%%% Cursor process functions.
cursor_process(H, GUnique, GCache, TmpDir, SpawnOptions, MaxList, TmpUsage) ->
Parent = self(),
Setup = #setup{parent = Parent},
CF = fun() ->
%% Unless exit/2 is trapped no cleanup can be done.
%% The user is assumed not to set the flag to false.
process_flag(trap_exit, true),
MonRef = erlang:monitor(process, Parent),
{Objs, Post, _LocalPost} =
try
Prep = prepare_qlc(H, not_a_list, GUnique, GCache,
TmpDir, MaxList, TmpUsage),
setup_qlc(Prep, Setup)
catch Class:Reason ->
Parent ! {self(), {caught, Class, Reason,
erlang:get_stacktrace()}},
exit(normal)
end,
Parent ! {self(), ok},
wait_for_request(Parent, MonRef, Post),
reply(Parent, MonRef, Post, Objs)
end,
Pid = spawn_opt(CF, SpawnOptions),
parent_fun(Pid, Parent).
%% Expect calls from tables calling the parent_fun and finally an 'ok'.
parent_fun(Pid, Parent) ->
receive
{Pid, ok} -> Pid;
{TPid, {parent_fun, Fun}} ->
V = try
{value, Fun()}
catch Class:Reason ->
{parent_fun_caught, Class, Reason, erlang:get_stacktrace()}
end,
TPid ! {Parent, V},
parent_fun(Pid, Parent);
{Pid, {caught, throw, Error, [?THROWN_ERROR | _]}} ->
Error;
{Pid, {caught, Class, Reason, Stacktrace}} ->
erlang:raise(Class, Reason, Stacktrace)
end.
reply(Parent, MonRef, Post, []) ->
no_more(Parent, MonRef, Post);
reply(Parent, MonRef, Post, [Answer | Cont]) ->
Parent ! {self(), {answer, Answer}},
wait_for_request(Parent, MonRef, Post),
reply(Parent, MonRef, Post, Cont);
reply(Parent, MonRef, Post, Cont) ->
Reply = try
if
is_function(Cont) ->
Cont();
true ->
throw_error(Cont)
end
catch
Class:Reason ->
post_funs(Post),
Message = {caught, Class, Reason, erlang:get_stacktrace()},
Parent ! {self(), Message},
exit(normal)
end,
reply(Parent, MonRef, Post, Reply).
no_more(Parent, MonRef, Post) ->
Parent ! {self(), no_more},
wait_for_request(Parent, MonRef, Post),
no_more(Parent, MonRef, Post).
wait_for_request(Parent, MonRef, Post) ->
receive
{Parent, stop} ->
post_funs(Post),
exit(normal);
{Parent, more} ->
ok;
{'EXIT', Parent, _Reason} ->
post_funs(Post),
exit(normal);
{'DOWN', MonRef, process, Parent, _Info} ->
post_funs(Post),
exit(normal);
{'EXIT', Pid, _Reason} when Pid =:= self() ->
%% Trapped signal. The cursor ignores it...
wait_for_request(Parent, MonRef, Post);
Other ->
error_logger:error_msg(
"The qlc cursor ~w received an unexpected message:\n~p\n",
[self(), Other]),
wait_for_request(Parent, MonRef, Post)
end.
%%% End of cursor process functions.
abstract_code({special, Line, String}) ->
{string, Line, String};
abstract_code(Tuple) when is_tuple(Tuple) ->
list_to_tuple(abstract_code(tuple_to_list(Tuple)));
abstract_code([H | T]) ->
[abstract_code(H) | abstract_code(T)];
abstract_code(Term) ->
Term.
%% Also in qlc_pt.erl.
-define(Q, q).
-define(QLC_Q(L1, L2, L3, L4, LC, Os),
{call,L1,{remote,L2,{atom,L3,?MODULE},{atom,L4,?Q}},[LC | Os]}).
abstract(Info, false=_Flat, NElements, Depth) ->
abstract(Info, NElements, Depth);
abstract(Info, true=_Flat, NElements, Depth) ->
Abstract = abstract(Info, NElements, Depth),
Vars = abstract_vars(Abstract),
{_, Body0, Expr} = flatten_abstr(Abstract, 1, Vars, []),
case Body0 of
[] ->
Expr;
[{match,_,Expr,Q}] ->
Q;
[{match,_,Expr,Q} | Body] ->
{block, 0, lists:reverse(Body, [Q])};
_ ->
{block, 0, lists:reverse(Body0, [Expr])}
end.
abstract({qlc, E0, Qs0, Opt}, NElements, Depth) ->
Qs = lists:map(fun({generate, P, LE}) ->
{generate, 1, binary_to_term(P),
abstract(LE, NElements, Depth)};
(F) ->
binary_to_term(F)
end, Qs0),
E = binary_to_term(E0),
Os = case Opt of
[] -> [];
_ -> [abstract_term(Opt, 1)]
end,
?QLC_Q(1, 1, 1, 1, {lc,1,E,Qs}, Os);
abstract({table, {M, F, As0}}, _NElements, _Depth)
when is_atom(M), is_atom(F), is_list(As0) ->
As = [abstract_term(A, 1) || A <- As0],
{call, 1, {remote, 1, {atom, 1, M}, {atom, 1, F}}, As};
abstract({table, TableDesc}, _NElements, _Depth) ->
case io_lib:deep_char_list(TableDesc) of
true ->
{ok, Tokens, _} = erl_scan:string(lists:flatten(TableDesc++".")),
{ok, [Expr]} = erl_parse:parse_exprs(Tokens),
Expr;
false -> % abstract expression
TableDesc
end;
abstract({append, Infos}, NElements, Depth) ->
As = lists:foldr(fun(Info, As0) ->
{cons,1,abstract(Info, NElements, Depth),As0}
end, {nil, 1}, Infos),
{call, 1, {remote, 1, {atom, 1, ?MODULE}, {atom, 1, append}}, [As]};
abstract({sort, Info, SortOptions}, NElements, Depth) ->
{call, 1, {remote, 1, {atom, 1, ?MODULE}, {atom, 1, sort}},
[abstract(Info, NElements, Depth), abstract_term(SortOptions, 1)]};
abstract({keysort, Info, Kp, SortOptions}, NElements, Depth) ->
{call, 1, {remote, 1, {atom, 1, ?MODULE}, {atom, 1, keysort}},
[abstract_term(Kp, 1), abstract(Info, NElements, Depth),
abstract_term(SortOptions, 1)]};
abstract({list,L,MS}, NElements, Depth) ->
{call, 1, {remote, 1, {atom, 1, ets}, {atom, 1, match_spec_run}},
[abstract(L, NElements, Depth),
{call, 1, {remote, 1, {atom, 1, ets}, {atom, 1, match_spec_compile}},
[abstract_term(depth(MS, Depth), 1)]}]};
abstract({list, L}, NElements, Depth) when NElements =:= infinity;
NElements >= length(L) ->
abstract_term(depth(L, Depth), 1);
abstract({list, L}, NElements, Depth) ->
abstract_term(depth(lists:sublist(L, NElements), Depth) ++ '...', 1).
depth(List, infinity) ->
List;
depth(List, Depth) ->
[depth1(E, Depth) || E <- List].
depth_fun(infinity = _Depth) ->
fun(E) -> E end;
depth_fun(Depth) ->
fun(E) -> depth1(E, Depth) end.
depth1([]=L, _D) ->
L;
depth1(_Term, 0) ->
'...';
depth1(Tuple, D) when is_tuple(Tuple) ->
depth_tuple(Tuple, tuple_size(Tuple), 1, D - 1, []);
depth1(List, D) when is_list(List) ->
if
D =:= 1 ->
['...'];
true ->
depth_list(List, D - 1)
end;
depth1(Binary, D) when byte_size(Binary) > D - 1 ->
D1 = D - 1,
<<Bin:D1/bytes,_/bytes>> = Binary,
<<Bin/bytes,"...">>;
depth1(T, _Depth) ->
T.
depth_list([]=L, _D) ->
L;
depth_list(_L, 0) ->
'...';
depth_list([E | Es], D) ->
[depth1(E, D) | depth_list(Es, D - 1)].
depth_tuple(_Tuple, Sz, I, _D, L) when I > Sz ->
list_to_tuple(lists:reverse(L));
depth_tuple(_L, _Sz, _I, 0, L) ->
list_to_tuple(lists:reverse(L, ['...']));
depth_tuple(Tuple, Sz, I, D, L) ->
E = depth1(element(I, Tuple), D),
depth_tuple(Tuple, Sz, I + 1, D - 1, [E | L]).
abstract_term(Term) ->
abstract_term(Term, 0).
abstract_term(Term, Line) ->
abstr_term(Term, Line).
abstr_term(Tuple, Line) when is_tuple(Tuple) ->
{tuple,Line,[abstr_term(E, Line) || E <- tuple_to_list(Tuple)]};
abstr_term([_ | _]=L, Line) ->
case io_lib:char_list(L) of
true ->
erl_parse:abstract(L, Line);
false ->
abstr_list(L, Line)
end;
abstr_term(Fun, Line) when is_function(Fun) ->
case erl_eval:fun_data(Fun) of
{fun_data, _Bs, Cs} ->
{'fun', Line, {clauses, Cs}};
false ->
{name, Name} = erlang:fun_info(Fun, name),
{arity, Arity} = erlang:fun_info(Fun, arity),
case erlang:fun_info(Fun, type) of
{type, external} ->
{module, Module} = erlang:fun_info(Fun, module),
{'fun', Line, {function,Module,Name,Arity}};
{type, local} ->
{'fun', Line, {function,Name,Arity}}
end
end;
abstr_term(PPR, Line) when is_pid(PPR); is_port(PPR); is_reference(PPR) ->
{special, Line, lists:flatten(io_lib:write(PPR))};
abstr_term(Simple, Line) ->
erl_parse:abstract(Simple, Line).
abstr_list([H | T], Line) ->
{cons, Line, abstr_term(H, Line), abstr_list(T, Line)};
abstr_list(T, Line) ->
abstr_term(T, Line).
%% Since generator pattern variables cannot be used in list
%% expressions, it is OK to flatten out QLCs using temporary
%% variables.
flatten_abstr(?QLC_Q(L1, L2, L3, L4, LC0, Os), VN0, Vars, Body0) ->
{lc,L,E,Qs0} = LC0,
F = fun({generate,Ln,P,LE0}, {VN1,Body1}) ->
{VN2,Body2,LE} = flatten_abstr(LE0, VN1, Vars, Body1),
{{generate,Ln,P,LE}, {VN2,Body2}};
(Fil, VN_Body) ->
{Fil, VN_Body}
end,
{Qs, {VN3,Body}} = lists:mapfoldl(F, {VN0,Body0}, Qs0),
LC = {lc,L,E,Qs},
{V, VN} = aux_name1('V', VN3, Vars),
Var = {var, L1, V},
QLC = ?QLC_Q(L1, L2, L3, L4, LC, Os),
{VN + 1, [{match, L1, Var, QLC} | Body], Var};
flatten_abstr(T0, VN0, Vars, Body0) when is_tuple(T0) ->
{VN, Body, L} = flatten_abstr(tuple_to_list(T0), VN0, Vars, Body0),
{VN, Body, list_to_tuple(L)};
flatten_abstr([E0 | Es0], VN0, Vars, Body0) ->
{VN1, Body1, E} = flatten_abstr(E0, VN0, Vars, Body0),
{VN, Body, Es} = flatten_abstr(Es0, VN1, Vars, Body1),
{VN, Body, [E | Es]};
flatten_abstr(E, VN, _Vars, Body) ->
{VN, Body, E}.
abstract_vars(Abstract) ->
sets:from_list(ordsets:to_list(vars(Abstract))).
collect([]=L) ->
L;
collect([Answer | Cont]) ->
[Answer | collect(Cont)];
collect(Cont) ->
case Cont() of
Answers when is_list(Answers) ->
collect(Answers);
Term ->
throw_error(Term)
end.
fold_loop(Fun, [Obj | Cont], Acc) ->
fold_loop(Fun, Cont, Fun(Obj, Acc));
fold_loop(_Fun, [], Acc) ->
Acc;
fold_loop(Fun, Cont, Acc) ->
case Cont() of
Objects when is_list(Objects) ->
fold_loop(Fun, Objects, Acc);
Term ->
Term
end.
next_loop(Pid, L, N) when N =/= 0 ->
case monitor_request(Pid, more) of
no_more ->
lists:reverse(L);
{answer, Answer} ->
next_loop(Pid, [Answer | L], N - 1);
{caught, throw, Error, [?THROWN_ERROR | _]} ->
Error;
{caught, Class, Reason, Stacktrace} ->
_ = (catch erlang:error(foo)),
erlang:raise(Class, Reason, Stacktrace ++ erlang:get_stacktrace());
error ->
erlang:error({qlc_cursor_pid_no_longer_exists, Pid})
end;
next_loop(_Pid, L, _N) ->
lists:reverse(L).
stop_cursor(Pid) ->
erlang:monitor(process, Pid),
unlink(Pid),
receive
{'EXIT',Pid,_Reason} -> % Simply ignore the error.
receive
{'DOWN',_,process,Pid,_} -> ok
end
after 0 ->
Pid ! {self(),stop},
receive
{'DOWN',_,process,Pid,_} -> ok
end
end.
monitor_request(Pid, Req) ->
Ref = erlang:monitor(process, Pid),
Pid ! {self(), Req},
receive
{'DOWN', Ref, process, Pid, _Info} ->
receive
{'EXIT', Pid, _Reason} -> ok
after 1 -> ok end,
error;
{'EXIT', Pid, _Reason} ->
receive
{'DOWN', _, process, Pid, _} -> error
end;
{Pid, Reply} ->
erlang:demonitor(Ref, [flush]),
Reply
end.
%% Marker for skipped filter or unused generator.
-define(SKIP, (-1)).
%% Qual = {gen, LE} | fil
-define(qual_data(QNum, GoToIndex, State, Qual),
{QNum, GoToIndex, State, Qual}).
-record(join, % generated by qlc_pt
{op, q1, q2, wh1, wh2, cs_fun}). % op is unused
%% le_info/1 returns an intermediate information format only used for
%% testing purposes. Changes will happen without notice.
%%
%% QueryDesc = {qlc, TemplateDesc, [QualDesc], [QueryOpt]}
%% | {table, TableDesc}
%% | {append, [QueryDesc]}
%% | {sort, QueryDesc, [SortOption]}
%% | {keysort, KeyPos, QueryDesc, [SortOption]}
%% | {list, list()}
%% | {list, QueryDesc, MatchExpression}
%% TableDesc = {Mod, Fun, Args}
%% | AbstractExpression
%% | character_list()
%% Mod = module()
%% Fun = atom()
%% Args = [term()]
%% QualDesc = FilterDesc
%% | {generate, PatternDesc, QueryDesc}
%% QueryOpt = {cache, bool()} | cache
%% | {unique, bool()} | unique
%% FilterDesc = PatternDesc = TemplateDesc = binary()
le_info(#prepared{qh = #simple_qlc{le = LE, p = P, line = L, optz = Optz}},
InfOpt) ->
QVar = term_to_binary({var, L, P}),
{qlc, QVar, [{generate, QVar, le_info(LE, InfOpt)}], opt_info(Optz)};
le_info(#prepared{qh = #qlc{codef = CodeF, qdata = Qdata, optz = Optz}},
InfOpt) ->
Code = CodeF(),
TemplateState = template_state(),
E = element(TemplateState, Code),
QualInfo0 = qual_info(Qdata, Code, InfOpt),
QualInfo1 = case Optz#optz.fast_join of
#qlc_join{} = Join ->
join_info(Join, QualInfo0, Qdata, Code);
no ->
QualInfo0
end,
QualInfo = [I || I <- QualInfo1, I =/= skip],
{qlc, E, QualInfo, opt_info(Optz)};
le_info(#prepared{qh = #qlc_table{format_fun = FormatFun, trav_MS = TravMS,
ms = MS, lu_vals = LuVals}}, InfOpt) ->
{NElements, Depth} = InfOpt,
%% The 'depth' option applies to match specifications as well.
%% This is for limiting imported variables (parameters).
DepthFun = depth_fun(Depth),
case LuVals of
_ when FormatFun =:= undefined ->
{table, {'$MOD', '$FUN', []}};
{Pos, Vals} ->
Formated = try FormatFun({lookup, Pos, Vals, NElements, DepthFun})
catch _:_ -> FormatFun({lookup, Pos, Vals})
end,
if
MS =:= no_match_spec ->
{table, Formated};
true ->
{list, {table, Formated}, depth(MS, Depth)}
end;
_ when TravMS, is_list(MS) ->
{table, FormatFun({match_spec, depth(MS, Depth)})};
_ when MS =:= no_match_spec ->
try {table, FormatFun({all, NElements, DepthFun})}
catch _:_ -> {table, FormatFun(all)}
end
end;
le_info(#prepared{qh = #qlc_append{hl = HL}}, InfOpt) ->
{append, [le_info(H, InfOpt) || H <- HL]};
le_info(#prepared{qh = #qlc_sort{h = H, keypos = sort,
fs_opts = SortOptions0, tmpdir = TmpDir}},
InfOpt) ->
SortOptions = sort_options_global_tmp(SortOptions0, TmpDir),
{sort, le_info(H, InfOpt), SortOptions};
le_info(#prepared{qh = #qlc_sort{h = H, keypos = {keysort, Kp},
fs_opts = SortOptions0, tmpdir = TmpDir}},
InfOpt) ->
SortOptions = sort_options_global_tmp(SortOptions0, TmpDir),
{keysort, le_info(H, InfOpt), Kp, SortOptions};
le_info(#prepared{qh = #qlc_list{l = L, ms = no_match_spec}}, _InfOpt) ->
{list, L};
le_info(#prepared{qh = #qlc_list{l = L, ms = MS}},_InfOpt) when is_list(L) ->
{list, {list, L}, MS};
le_info(#prepared{qh = #qlc_list{l = L, ms = MS}}, InfOpt) ->
{list, le_info(L, InfOpt), MS}.
qual_info([?qual_data(_QNum, _GoI, ?SKIP, fil) | Qdata], Code, InfOpt) ->
%% see skip_lookup_filters()
[skip | qual_info(Qdata, Code, InfOpt)];
qual_info([?qual_data(QNum, _GoI, _SI, fil) | Qdata], Code, InfOpt) ->
[element(QNum + 1, Code) | qual_info(Qdata, Code, InfOpt)];
qual_info([?qual_data(_QNum, _GoI, _SI, {gen,#join{}}) | Qdata],
Code, InfOpt) ->
[skip | qual_info(Qdata, Code, InfOpt)];
qual_info([?qual_data(QNum, _GoI, _SI, {gen,LE}) | Qdata], Code, InfOpt) ->
[{generate,element(QNum + 1, Code),le_info(LE, InfOpt)} |
qual_info(Qdata, Code, InfOpt)];
qual_info([], _Code, _InfOpt) ->
[].
join_info(Join, QInfo, Qdata, Code) ->
#qlc_join{kind = Kind, q1 = QNum1a, c1 = C1, q2 = QNum2a, c2 = C2,
opt = Opt} = Join,
{?qual_data(JQNum,_,_,_), Rev, QNum1, QNum2, _WH1, _WH2, CsFun} =
find_join_data(Qdata, QNum1a, QNum2a),
{Cs1_0, Cs2_0, Compat} = CsFun(),
[Cs1, Cs2] = case Compat of
[] -> % --R12B-5
[[{C,[{V,'=:='} || V <- Vs]} || {C,Vs} <- CVs] ||
CVs <- [Cs1_0, Cs2_0]];
_ -> % 'v1', R13A --
%% Only compared constants (==).
[Cs1_0, Cs2_0]
end,
L = 0,
G1_0 = {var,L,'G1'}, G2_0 = {var,L,'G2'},
JP = element(JQNum + 1, Code),
%% Create code for wh1 and wh2 in #join{}:
{{I1,G1}, {I2,G2}, QInfoL} =
case Kind of
{merge, _} ->
{JG1,QInfo1} = join_merge_info(QNum1, QInfo, Code, G1_0, Cs1),
{JG2,QInfo2} = join_merge_info(QNum2, QInfo, Code, G2_0, Cs2),
{JG1, JG2, QInfo1 ++ QInfo2};
_ when Rev ->
{JG2,QInfo2} = join_merge_info(QNum2, QInfo, Code, G2_0, Cs2),
{J1, QInfo1} = join_lookup_info(QNum1, QInfo, G1_0),
{{J1,G1_0}, JG2, QInfo2 ++ [QInfo1]};
_ ->
{JG1,QInfo1} = join_merge_info(QNum1, QInfo, Code, G1_0, Cs1),
{J2, QInfo2} = join_lookup_info(QNum2, QInfo, G2_0),
{JG1, {J2,G2_0}, QInfo1 ++ [QInfo2]}
end,
{JOptVal, JOp} = kind2op(Kind),
JOpt = [{join, JOptVal}] ++ opt_info(join_unique_cache(Opt)),
JFil = term_to_binary({op,L,JOp,
{call,L,{atom,L,element},[{integer,L,C1},G1]},
{call,L,{atom,L,element},[{integer,L,C2},G2]}}),
P = term_to_binary({cons, L, G1, G2}),
JInfo = {generate, JP, {qlc, P, QInfoL ++ [JFil], JOpt}},
{Before, [I1 | After]} = lists:split(QNum1 - 1, QInfo),
Before ++ [JInfo] ++ lists:delete(I2, After).
kind2op({merge, _KE}) -> {merge, '=='};
kind2op({lookup, KE, _LU_fun}) -> {lookup, KE}.
%% qlc:q(P0 || P0 = Pattern <- H1, ConstFilters),
%% where "P0" is a fresh variable and ConstFilters are filters that
%% test constant values of pattern columns.
join_merge_info(QNum, QInfo, Code, G, ExtraConstants) ->
{generate, _, LEInfo}=I = lists:nth(QNum, QInfo),
P = binary_to_term(element(QNum + 1, Code)),
case {P, ExtraConstants} of
{{var, _, _}, []} ->
%% No need to introduce a QLC.
TP = term_to_binary(G),
I2 = {generate, TP, LEInfo},
{{I,G}, [I2]};
_ ->
{EPV, M} =
case P of
{var, _, _} ->
%% No need to introduce a pattern variable.
{P, P};
_ ->
{PV, _} = aux_name1('P', 0, abstract_vars(P)),
L = 0,
V = {var, L, PV},
{V, {match, L, V, P}}
end,
DQP = term_to_binary(EPV),
LEI = {generate, term_to_binary(M), LEInfo},
TP = term_to_binary(G),
CFs = [begin
Call = {call,0,{atom,0,element},[{integer,0,Col},EPV]},
F = list2op([{op,0,Op,abstract_term(Con),Call}
|| {Con,Op} <- ConstOps], 'or'),
term_to_binary(F)
end ||
{Col,ConstOps} <- ExtraConstants],
{{I,G}, [{generate, TP, {qlc, DQP, [LEI | CFs], []}}]}
end.
list2op([E], _Op) ->
E;
list2op([E | Es], Op) ->
{op,0,Op,E,list2op(Es, Op)}.
join_lookup_info(QNum, QInfo, G) ->
{generate, _, LEInfo}=I = lists:nth(QNum, QInfo),
TP = term_to_binary(G),
{I, {generate, TP, LEInfo}}.
opt_info(#optz{unique = Unique, cache = Cache0, join_option = JoinOption}) ->
%% No 'nested_loop' options are added here, even if there are
%% nested loops to carry out, unless a 'nested_loop' was given as
%% option. The reason is that the qlc module does not know about
%% all instances of nested loops.
Cache = if
Cache0 -> ets;
true -> Cache0
end,
[{T,V} || {T,V} <- [{cache,Cache},{unique,Unique}],
V =/= default_option(T)] ++
[{T,V} || {T,V} <- [{join,JoinOption}], V =:= nested_loop].
prepare_qlc(H, InitialValue, GUnique, GCache, TmpDir, MaxList, TmpUsage) ->
GOpt = #qlc_opt{unique = GUnique, cache = GCache,
tmpdir = TmpDir, max_list = MaxList,
tmpdir_usage = TmpUsage},
case opt_le(prep_le(H, GOpt), 1) of
#prepared{qh = #qlc{} = QLC}=Prep ->
Prep#prepared{qh = QLC#qlc{init_value = InitialValue}};
#prepared{qh = #simple_qlc{}=SimpleQLC}=Prep ->
Prep#prepared{qh = SimpleQLC#simple_qlc{init_value = InitialValue}};
Prep ->
Prep
end.
%%% The options given to append, q and table (unique and cache) as well
%%% as the type of expression (list, table, append, qlc...) are
%%% analyzed by prep_le. The results are is_unique_objects and
%%% is_cached. It is checked that the evaluation (in the Erlang sense)
%%% of list expressions yields qlc handles.
prep_le(#qlc_lc{lc = LC_fun, opt = #qlc_opt{} = Opt0}=H, GOpt) ->
#qlc_opt{unique = GUnique, cache = GCache,
tmpdir = TmpDir, max_list = MaxList,
tmpdir_usage = TmpUsage} = GOpt,
Unique = Opt0#qlc_opt.unique or GUnique,
Cache = if
not GCache -> Opt0#qlc_opt.cache;
true -> GCache
end,
Opt = Opt0#qlc_opt{unique = Unique, cache = Cache,
tmpdir = TmpDir, max_list = MaxList,
tmpdir_usage = TmpUsage},
prep_qlc_lc(LC_fun(), Opt, GOpt, H);
prep_le(#qlc_table{info_fun = IF}=T, GOpt) ->
{SortInfo, Sorted} = table_sort_info(T),
IsUnique = grd(IF, is_unique_objects),
Prep = #prepared{qh = T, sort_info = SortInfo, sorted = Sorted,
is_unique_objects = IsUnique},
Opt = if
IsUnique or not GOpt#qlc_opt.unique,
T#qlc_table.ms =:= no_match_spec ->
GOpt#qlc_opt{cache = false};
true ->
GOpt
end,
may_create_simple(Opt, Prep);
prep_le(#qlc_append{hl = HL}, GOpt) ->
case lists:flatmap(fun(#prepared{qh = #qlc_list{l = []}}) -> [];
(#prepared{qh = #qlc_append{hl = HL1}}) -> HL1;
(H) -> [H] end,
[prep_le(H, GOpt) || H <- HL]) of
[]=Nil ->
short_list(Nil);
[Prep] ->
Prep;
PrepL ->
Cache = lists:all(fun(#prepared{is_cached = IsC}) -> IsC =/= false
end, PrepL),
%% The handles in hl are replaced by prepared handles:
Prep = #prepared{qh = #qlc_append{hl = PrepL}, is_cached = Cache},
may_create_simple(GOpt, Prep)
end;
prep_le(#qlc_sort{h = H0}=Q0, GOpt) ->
%% The handle h is replaced by a prepared handle:
Q = Q0#qlc_sort{h = prep_le(H0, GOpt)},
prep_sort(Q, GOpt);
prep_le([_, _ | _]=L, GOpt) ->
Prep = #prepared{qh = #qlc_list{l = L}, is_cached = true},
Opt = if
not GOpt#qlc_opt.unique ->
GOpt#qlc_opt{cache = false};
true -> GOpt
end,
may_create_simple(Opt, Prep);
prep_le(L, _GOpt) when is_list(L) ->
short_list(L);
prep_le(T, _GOpt) ->
erlang:error({unsupported_qlc_handle, #qlc_handle{h = T}}).
eval_le(LE_fun, GOpt) ->
case LE_fun() of
{error, ?MODULE, _} = Error ->
throw_error(Error);
R ->
case get_handle(R) of
badarg ->
erlang:error(badarg, [R]);
H ->
prep_le(H, GOpt)
end
end.
prep_qlc_lc({simple_v1, PVar, LE_fun, L}, Opt, GOpt, _H) ->
check_lookup_option(Opt, false),
prep_simple_qlc(PVar, L, eval_le(LE_fun, GOpt), Opt);
prep_qlc_lc({qlc_v1, QFun, CodeF, Qdata0, QOpt}, Opt, GOpt, _H) ->
F = fun(?qual_data(_QNum, _GoI, _SI, fil)=QualData, ModGens) ->
{QualData, ModGens};
(?qual_data(_QNum, _GoI, _SI, {gen, #join{}})=QualData, ModGens) ->
{QualData, ModGens};
(?qual_data(QNum, GoI, SI, {gen, LE_fun}), ModGens0) ->
Prep1 = eval_le(LE_fun, GOpt),
{Prep, ModGens} =
prep_generator(QNum, Prep1, QOpt, Opt, ModGens0),
{?qual_data(QNum, GoI, SI, {gen, Prep}), ModGens}
end,
{Qdata, ModGens} = lists:mapfoldl(F, [], Qdata0),
SomeLookUp = lists:keymember(true, 2, ModGens) =/= false,
check_lookup_option(Opt, SomeLookUp),
case ModGens of
[{_QNum, _LookUp, all, OnePrep}] ->
check_join_option(Opt),
OnePrep;
_ ->
Prep0 = prep_qlc(QFun, CodeF, Qdata, QOpt, Opt),
LU_SkipQuals =
lists:flatmap(fun({QNum,_LookUp,Fs,_Prep}) -> [{QNum,Fs}]
end, ModGens),
Prep1 = Prep0#prepared{lu_skip_quals = LU_SkipQuals},
prep_join(Prep1, QOpt, Opt)
end;
prep_qlc_lc(_, _Opt, _GOpt, H) ->
erlang:error({unsupported_qlc_handle, #qlc_handle{h = H}}).
prep_generator(QNum, Prep0, QOpt, Opt, ModGens) ->
PosFun = fun(KeyEquality) -> pos_fun(KeyEquality, QOpt, QNum) end,
MSFs = case match_specs(QOpt, QNum) of
undefined ->
{no_match_spec, []};
{_, _}=MSFs0 ->
MSFs0
end,
#prepared{qh = LE} = Prep0,
case prep_gen(LE, Prep0, PosFun, MSFs, Opt) of
{replace, Fs, LookUp, Prep} ->
{Prep, [{QNum,LookUp,Fs,Prep} | ModGens]};
{skip, SkipFils, LookUp, Prep} ->
{Prep, [{QNum,LookUp,SkipFils,Prep} | ModGens]};
{no, _Fs, _LookUp, Prep} ->
{Prep, ModGens}
end.
pos_fun(undefined, QOpt, QNum) ->
{'=:=', constants(QOpt, QNum)}; %% --R12B-5
pos_fun('=:=', QOpt, QNum) ->
{'=:=', constants(QOpt, QNum)};
pos_fun('==', QOpt, QNum) ->
try {'==', equal_constants(QOpt, QNum)} % R13A--
catch _:_ -> {'=:=', constants(QOpt, QNum)}
end.
prep_gen(#qlc_table{lu_vals = LuV0, ms = MS0, trav_MS = TravMS,
info_fun = IF, lookup_fun = LU_fun,
key_equality = KeyEquality}=LE0,
Prep0, PosFun0, {MS, Fs}, Opt) ->
PosFun = PosFun0(KeyEquality),
{LuV, {STag,SkipFils}} = find_const_positions(IF, LU_fun, PosFun, Opt),
LU = LuV =/= false,
if
LuV0 =/= undefined; MS0 =/= no_match_spec ->
{no, [], false, Prep0};
MS =/= no_match_spec, LU ->
MS1 = if
Fs =:= SkipFils; STag =:= Fs ->
%% The guard of the match specification
%% is covered by the lookup.
case MS of
[{'$1',_Guard,['$1']}] -> % no transformation
no_match_spec;
[{Head,_Guard,Body}] ->
[{Head,[],Body}] % true guard
end;
true ->
MS
end,
Prep = Prep0#prepared{qh = LE0#qlc_table{lu_vals = LuV,ms = MS1}},
{replace, Fs, LU, Prep};
LU ->
Prep = Prep0#prepared{qh = LE0#qlc_table{lu_vals = LuV}},
{skip, SkipFils, LU, Prep};
TravMS, MS =/= no_match_spec ->
Prep = Prep0#prepared{qh = LE0#qlc_table{ms = MS},
is_unique_objects = false},
{replace, Fs, false, may_create_simple(Opt, Prep)};
true ->
{no, [], false, Prep0}
end;
prep_gen(#qlc_list{l = []}, Prep0, _PosFun, {_MS, Fs}, _Opt) ->
%% unique and cached
{replace, Fs, false, Prep0};
prep_gen(#qlc_list{ms = no_match_spec}=LE0, Prep0, _PosFun, {MS, Fs}, Opt)
when MS =/= no_match_spec ->
Prep = Prep0#prepared{qh = LE0#qlc_list{ms = MS},
is_cached = false},
{replace, Fs, false, may_create_simple(Opt, Prep)};
prep_gen(#qlc_list{}, Prep0, _PosFun, {MS, Fs}, Opt)
when MS =/= no_match_spec ->
ListMS = #qlc_list{l = Prep0, ms = MS},
LE = #prepared{qh = ListMS, is_cached = false},
{replace, Fs, false, may_create_simple(Opt, LE)};
prep_gen(_LE0, Prep0, _PosFun, _MSFs, _Opt) ->
{no, [], false, Prep0}.
-define(SIMPLE_QVAR, 'SQV').
may_create_simple(#qlc_opt{unique = Unique, cache = Cache} = Opt,
#prepared{is_cached = IsCached,
is_unique_objects = IsUnique} = Prep) ->
if
Unique and not IsUnique;
(Cache =/= false) and not IsCached ->
prep_simple_qlc(?SIMPLE_QVAR, 1, Prep, Opt);
true ->
Prep
end.
prep_simple_qlc(PVar, Line, LE, Opt) ->
check_join_option(Opt),
#prepared{is_cached = IsCached,
sort_info = SortInfo, sorted = Sorted,
is_unique_objects = IsUnique} = LE,
#qlc_opt{unique = Unique, cache = Cache} = Opt,
Cachez = if
Unique -> Cache;
not IsCached -> Cache;
true -> false
end,
Optz = #optz{unique = Unique and not IsUnique,
cache = Cachez, opt = Opt},
QLC = #simple_qlc{p = PVar, le = LE, line = Line,
init_value = not_a_list, optz = Optz},
%% LE#prepared.join is not copied
#prepared{qh = QLC, is_unique_objects = IsUnique or Unique,
sort_info = SortInfo, sorted = Sorted,
is_cached = IsCached or (Cachez =/= false)}.
prep_sort(#qlc_sort{h = #prepared{sorted = yes}=Prep}, _GOpt) ->
Prep;
prep_sort(#qlc_sort{h = #prepared{is_unique_objects = IsUniqueObjs}}=Q,
GOpt) ->
S1 = sort_unique(IsUniqueObjs, Q),
S2 = sort_tmpdir(S1, GOpt),
S = S2#qlc_sort{tmpdir_usage = GOpt#qlc_opt.tmpdir_usage},
{SortInfo, Sorted} = sort_sort_info(S),
#prepared{qh = S, is_cached = true, sort_info = SortInfo,
sorted = Sorted,
is_unique_objects = S#qlc_sort.unique or IsUniqueObjs}.
prep_qlc(QFun, CodeF, Qdata0, QOpt, Opt) ->
#qlc_opt{unique = Unique, cache = Cache, join = Join} = Opt,
Optz = #optz{unique = Unique, cache = Cache,
join_option = Join, opt = Opt},
{Qdata, SortInfo} = qlc_sort_info(Qdata0, QOpt),
QLC = #qlc{lcf = QFun, codef = CodeF, qdata = Qdata,
init_value = not_a_list, optz = Optz},
#prepared{qh = QLC, sort_info = SortInfo,
is_unique_objects = Unique,
is_cached = Cache =/= false}.
%% 'sorted', 'sorted_info', and 'sorted_info2' are used to avoid
%% sorting on a key when there is no need to sort on the key. 'sorted'
%% is set by qlc:sort() only; its purpose is to assure that if columns
%% 1 to i are constant, then column i+1 is key-sorted (always true if
%% the tuples are sorted). Note: the implementation is (too?) simple.
%% For instance, each column is annotated with 'ascending' or
%% 'descending' (not yet). More exact would be, as examples, 'always
%% ascending' and 'ascending if all preceding columns are constant'.
%%
%% The 'size' of the template is not used (size_of_qualifier(QOpt, 0)).
qlc_sort_info(Qdata0, QOpt) ->
F = fun(?qual_data(_QNum, _GoI, _SI, fil)=Qd, Info) ->
{Qd, Info};
(?qual_data(_QNum, _GoI, _SI, {gen, #join{}})=Qd, Info) ->
{Qd, Info};
(?qual_data(QNum, GoI, SI, {gen, PrepLE0}), Info) ->
PrepLE = sort_info(PrepLE0, QNum, QOpt),
Qd = ?qual_data(QNum, GoI, SI, {gen, PrepLE}),
I = [{{Column,Order}, [{traverse,QNum,C}]} ||
{{C,Order},What} <- PrepLE#prepared.sort_info2,
What =:= [], % Something else later...
Column <- equal_template_columns(QOpt, {QNum,C})],
{Qd, [I | Info]}
end,
{Qdata, SortInfoL} = lists:mapfoldl(F, [], Qdata0),
SortInfo0 = [{{Pos,Ord}, [template]} ||
Pos <- constant_columns(QOpt, 0),
Ord <- orders(yes)]
++ lists:append(SortInfoL),
SortInfo = family_union(SortInfo0),
{Qdata, SortInfo}.
sort_info(#prepared{sort_info = SI, sorted = S} = Prep, QNum, QOpt) ->
SI1 = [{{C,Ord},[]} ||
S =/= no,
is_integer(Sz = size_of_qualifier(QOpt, QNum)),
Sz > 0, % the size of the pattern
(NConstCols = size_of_constant_prefix(QOpt, QNum)) < Sz,
C <- [NConstCols+1],
Ord <- orders(S)]
++ [{{Pos,Ord},[]} || Pos <- constant_columns(QOpt, QNum),
Ord <- orders(yes)]
++ [{PosOrd,[]} || {PosOrd,_} <- SI],
SI2 = lists:usort(SI1),
Prep#prepared{sort_info2 = SI2}.
%orders(descending=O) ->
% [O];
orders(ascending=O) ->
[O];
orders(yes) ->
[ascending
% ,descending
].
sort_unique(true, #qlc_sort{fs_opts = SortOptions, keypos = sort}=Sort) ->
Sort#qlc_sort{unique = false,
fs_opts =
lists:keydelete(unique, 1,
lists:delete(unique, SortOptions))};
sort_unique(_, Sort) ->
Sort.
sort_tmpdir(S, #qlc_opt{tmpdir = ""}) ->
S;
sort_tmpdir(S, Opt) ->
S#qlc_sort{tmpdir = Opt#qlc_opt.tmpdir}.
short_list(L) ->
%% length(L) < 2: all elements are known be equal
#prepared{qh = #qlc_list{l = L}, sorted = yes, is_unique_objects = true,
is_cached = true}.
find_const_positions(IF, LU_fun, {KeyEquality, PosFun},
#qlc_opt{max_lookup = Max, lookup = Lookup})
when is_function(LU_fun), is_function(PosFun), is_function(IF),
Lookup =/= false ->
case call(IF, keypos, undefined, []) of
undefined ->
Indices = call(IF, indices, undefined, []),
find_const_position_idx(Indices, KeyEquality, PosFun, Max, []);
KeyPos ->
case pos_vals(KeyPos, KeyEquality, PosFun(KeyPos), Max) of
false ->
find_const_position_idx(IF(indices), KeyEquality,
PosFun, Max, []);
PosValuesSkip ->
PosValuesSkip
end
end;
find_const_positions(_IF, _LU_fun, _KE_PosFun, _Opt0) ->
{false, {some,[]}}.
find_const_position_idx([I | Is], KeyEquality, PosFun, Max, L0) ->
case pos_vals(I, KeyEquality, PosFun(I), Max) of
false ->
find_const_position_idx(Is, KeyEquality, PosFun, Max, L0);
{{_Pos, Values}, _SkipFils}=PosValuesFils ->
L = [{length(Values), PosValuesFils} | L0],
find_const_position_idx(Is, KeyEquality, PosFun, Max, L)
end;
find_const_position_idx(_, _KeyEquality, _PosFun, _Max, []) ->
{false, {some,[]}};
find_const_position_idx(_, _KeyEquality, _PosFun, _Max, L) ->
[{_,PVF} | _] = lists:sort(L),
PVF.
pos_vals(Pos, '==', {usort_needed, Values, SkipFils}, Max) ->
pos_vals_max(Pos, lists:usort(Values), SkipFils, Max);
pos_vals(Pos, '=:=', {usort_needed, Values, SkipFils}, Max) ->
pos_vals_max(Pos, lists:sort(nub(Values)), SkipFils, Max);
pos_vals(Pos, _KeyEquality, {values, Values, SkipFils}, Max) ->
pos_vals_max(Pos, Values, SkipFils, Max);
pos_vals(_Pos, _KeyEquality, _T, _Max) ->
false.
nub([]) ->
[];
nub([E | L]) ->
case lists:member(E, Es=nub(L)) of
true ->
Es;
false ->
[E | Es]
end.
%% length(Values) >= 1
pos_vals_max(Pos, Values, Skip, Max) when Max =:= -1; Max >= length(Values) ->
{{Pos, Values}, Skip};
pos_vals_max(_Pos, _Value, _Skip, _Max) ->
false.
prep_join(Prep, QOpt, Opt) ->
case join_opt(QOpt) of
undefined ->
check_join_option(Opt),
Prep;
EqualMatch ->
{Ix, M} = case EqualMatch of
{NEqual, NMatch} ->
pref_join(NEqual, NMatch, Prep, QOpt, Opt);
EM ->
pref_join(EM, EM, Prep, QOpt, Opt)
end,
SI = family_union(Prep#prepared.sort_info ++ M),
Prep#prepared{join = {Ix, M}, sort_info = SI}
end.
%% The parse transform ensures that only two tables are involved.
pref_join(Equal, Match, Prep, QOpt, #qlc_opt{join = JoinOpt}) ->
JQs = [{KeyEquality, QCs} ||
{KeyEquality, QCsL} <- [{'==',Equal}, {'=:=',Match}],
QCs <- QCsL],
IxL = [pref_lookup_join(KE, QCs, Prep, QOpt) ||
JoinOpt =:= any orelse JoinOpt =:= lookup,
{KE, QCs} <- JQs],
ML = [pref_merge_join(KE, QCs, Prep, QOpt) ||
JoinOpt =:= any orelse JoinOpt =:= merge,
{KE, QCs} <- JQs],
{lists:usort(lists:append(IxL)), lists:usort(lists:append(ML))}.
pref_lookup_join(KeyEquality, {[{Q1,C1},{Q2,C2}],Skip}, Prep, QOpt)
when is_integer(C1), is_integer(C2) ->
#prepared{qh = #qlc{qdata = QData}} = Prep,
Is1 = lookup_qual_data(QData, Q1, KeyEquality),
Lu2 = [pref_lookup_join2(Q2, C2, Q1, C1, Skip, QOpt, KeyEquality) ||
IC1 <- Is1, IC1 =:= C1],
Is2 = lookup_qual_data(QData, Q2, KeyEquality),
Lu1 = [pref_lookup_join2(Q1, C1, Q2, C2, Skip, QOpt, KeyEquality) ||
IC2 <- Is2, IC2 =:= C2],
family(Lu1 ++ Lu2);
pref_lookup_join(KE, [{_,Cs1},{_,Cs2}]=L, Prep, QOpt) when is_list(Cs1),
is_list(Cs2) ->
%% --R12B-5
lists:append([pref_lookup_join(KE, QC,Prep,QOpt) ||
QC <- selections_no_skip(L)]).
lookup_qual_data(QData, QNum, KeyEquality) ->
case lists:keysearch(QNum, 1, QData) of
{value, ?qual_data(QNum, _, _, {gen, PrepLE})} ->
join_indices(PrepLE, KeyEquality)
end.
%% If the table has a match specification (ms =/= no_match_spec) that
%% has _not_ been derived from a filter but from a query handle then
%% the lookup join cannot be done. This particular case has not been
%% excluded here but is taken care of in opt_join().
join_indices(#prepared{qh = #qlc_table{info_fun = IF,
lookup_fun = LU_fun,
key_equality = KeyEquality,
lu_vals = undefined}},
KE) when is_function(LU_fun),
KE =:= KeyEquality orelse
KE =:= '=:=' andalso
KeyEquality =:= undefined -> % --R12B-5
KpL = case call(IF, keypos, undefined, []) of
undefined -> [];
Kp -> [Kp]
end,
case call(IF, indices, undefined, []) of
undefined -> KpL;
Is0 -> lists:usort(KpL ++ Is0)
end;
join_indices(_Prep, _KeyEquality) ->
[].
pref_lookup_join2(Q1, C1, Q2, C2, Skip, QOpt, KeyEquality) ->
TemplCols = compared_template_columns(QOpt, {Q1,C1}, KeyEquality),
{{Q1,C1,Q2,C2},{lookup_join,TemplCols,KeyEquality,Skip}}.
pref_merge_join(KE, {[{Q1,C1},{Q2,C2}],Skip}, Prep, QOpt)
when is_integer(C1), is_integer(C2) ->
#prepared{qh = #qlc{qdata = QData}} = Prep,
Sort1 = merge_qual_data(QData, Q1),
Sort2 = merge_qual_data(QData, Q2),
Merge = pref_merge(KE, Q1, C1, Q2, C2, Skip, Sort1, Sort2, QOpt),
family_union(Merge);
pref_merge_join(KE, [{_,Cs1},{_,Cs2}]=L, Prep, QOpt) when is_list(Cs1),
is_list(Cs2) ->
%% --R12B-5
lists:append([pref_merge_join(KE, QC, Prep, QOpt) ||
QC <- selections_no_skip(L)]).
selections_no_skip(L) ->
[{C,{some,[]}} || C <- all_selections(L)].
merge_qual_data(QData, QNum) ->
case lists:keysearch(QNum, 1, QData) of
{value, ?qual_data(QNum, _, _, {gen, PrepLE})} ->
#prepared{sort_info2 = SortInfo} = PrepLE,
SortInfo
end.
pref_merge(KE, Q1, C1, Q2, C2, Skip, Sort1, Sort2, QOpt) ->
Col1 = {Q1,C1},
Col2 = {Q2,C2},
DoSort = [QC || {{_QNum,Col}=QC,SortL} <- [{Col1,Sort1}, {Col2,Sort2}],
lists:keymember({Col, ascending}, 1, SortL) =:= false],
J = [{{Q1,C1,Q2,C2}, {merge_join,DoSort,KE,Skip}}],
%% true = (QOpt(template))(Col1, '==') =:= (QOpt(template))(Col2, '==')
[{{Column, ascending}, J} ||
Column <- equal_template_columns(QOpt, Col1)] ++ [{other, J}].
table_sort_info(#qlc_table{info_fun = IF}) ->
case call(IF, is_sorted_key, undefined, []) of
undefined ->
{[], no};
false ->
{[], no};
true ->
case call(IF, keypos, undefined, []) of
undefined -> % strange
{[], no};
KeyPos ->
{[{{KeyPos,ascending},[]}], no}
end
end.
sort_sort_info(#qlc_sort{keypos = sort, order = Ord0}) ->
{[], sort_order(Ord0)};
sort_sort_info(#qlc_sort{keypos = {keysort,Kp0}, order = Ord0}) ->
Kp = case Kp0 of
[Pos | _] -> Pos;
_ -> Kp0
end,
{[{{Kp,sort_order(Ord0)},[]}], no}.
sort_order(F) when is_function(F) ->
no;
sort_order(Order) ->
Order.
check_join_option(#qlc_opt{join = any}) ->
ok;
check_join_option(#qlc_opt{join = Join}) ->
erlang:error(no_join_to_carry_out, [{join,Join}]).
check_lookup_option(#qlc_opt{lookup = true}, false) ->
erlang:error(no_lookup_to_carry_out, [{lookup,true}]);
check_lookup_option(_QOpt, _LuV) ->
ok.
compared_template_columns(QOpt, QNumColumn, KeyEquality) ->
(QOpt(template))(QNumColumn, KeyEquality).
equal_template_columns(QOpt, QNumColumn) ->
(QOpt(template))(QNumColumn, '==').
%eq_template_columns(QOpt, QNumColumn) ->
% (QOpt(template))(QNumColumn, '=:=').
size_of_constant_prefix(QOpt, QNum) ->
(QOpt(n_leading_constant_columns))(QNum).
constants(QOpt, QNum) ->
(QOpt(constants))(QNum).
equal_constants(QOpt, QNum) ->
(QOpt(equal_constants))(QNum).
join_opt(QOpt) ->
QOpt(join).
match_specs(QOpt, QNum) ->
(QOpt(match_specs))(QNum).
constant_columns(QOpt, QNum) ->
(QOpt(constant_columns))(QNum).
size_of_qualifier(QOpt, QNum) ->
(QOpt(size))(QNum).
%% Two optimizations are carried out:
%% 1. The first generator is never cached if the QLC itself is cached.
%% Since the answers do not need to be cached, the top-most QLC is
%% never cached either. Simple QLCs not holding any options are
%% removed. Simple QLCs are coalesced when possible.
%% 2. Merge join and lookup join is done if possible.
opt_le(#prepared{qh = #simple_qlc{le = LE0, optz = Optz0}=QLC}=Prep0,
GenNum) ->
case LE0 of
#prepared{qh = #simple_qlc{p = LE_Pvar, le = LE2, optz = Optz2}} ->
%% Coalesce two simple QLCs.
Cachez = case Optz2#optz.cache of
false -> Optz0#optz.cache;
Cache2 -> Cache2
end,
Optz = Optz0#optz{cache = Cachez,
unique = Optz0#optz.unique or Optz2#optz.unique},
PVar = if
LE_Pvar =:= ?SIMPLE_QVAR -> QLC#simple_qlc.p;
true -> LE_Pvar
end,
Prep = Prep0#prepared{qh = QLC#simple_qlc{p = PVar, le = LE2,
optz = Optz}},
opt_le(Prep, GenNum);
_ ->
Optz1 = no_cache_of_first_generator(Optz0, GenNum),
case {opt_le(LE0, 1), Optz1} of
{LE, #optz{unique = false, cache = false}} ->
LE;
{LE, _} ->
Prep0#prepared{qh = QLC#simple_qlc{le = LE, optz = Optz1}}
end
end;
opt_le(#prepared{qh = #qlc{}, lu_skip_quals = LU_SkipQuals0}=Prep0, GenNum) ->
#prepared{qh = #qlc{qdata = Qdata0, optz = Optz0}=QLC} = Prep0,
#optz{join_option = JoinOption, opt = Opt} = Optz0,
JoinOption = Optz0#optz.join_option,
{LU_QNum, Join, JoinSkipFs, DoSort} =
opt_join(Prep0#prepared.join, JoinOption, Qdata0, Opt, LU_SkipQuals0),
{LU_Skip, LU_SkipQuals} =
lists:partition(fun({QNum,_Fs}) -> QNum =:= LU_QNum end,
LU_SkipQuals0),
LU_SkipFs = lists:flatmap(fun({_QNum,Fs}) -> Fs end, LU_SkipQuals),
%% If LU_QNum has a match spec it must be applied _after_ the
%% lookup join (the filter must not be skipped!).
Qdata1 = if
LU_Skip =:= [] -> Qdata0;
true -> activate_join_lookup_filter(LU_QNum, Qdata0)
end,
Qdata2 = skip_lookup_filters(Qdata1, LU_SkipFs ++ JoinSkipFs),
F = fun(?qual_data(QNum, GoI, SI, {gen, #prepared{}=PrepLE}), GenNum1) ->
NewPrepLE = maybe_sort(PrepLE, QNum, DoSort, Opt),
{?qual_data(QNum, GoI, SI, {gen, opt_le(NewPrepLE, GenNum1)}),
GenNum1 + 1};
(Qd, GenNum1) ->
{Qd, GenNum1}
end,
{Qdata, _} = lists:mapfoldl(F, 1, Qdata2),
Optz1 = no_cache_of_first_generator(Optz0, GenNum),
Optz = Optz1#optz{fast_join = Join},
Prep0#prepared{qh = QLC#qlc{qdata = Qdata, optz = Optz}};
opt_le(#prepared{qh = #qlc_append{hl = HL}}=Prep, GenNum) ->
Hs = [opt_le(H, GenNum) || H <- HL],
Prep#prepared{qh = #qlc_append{hl = Hs}};
opt_le(#prepared{qh = #qlc_sort{h = H}=Sort}=Prep, GenNum) ->
Prep#prepared{qh = Sort#qlc_sort{h = opt_le(H, GenNum)}};
opt_le(Prep, _GenNum) ->
Prep.
no_cache_of_first_generator(Optz, GenNum) when GenNum > 1 ->
Optz;
no_cache_of_first_generator(Optz, 1) ->
Optz#optz{cache = false}.
maybe_sort(LE, QNum, DoSort, Opt) ->
case lists:keysearch(QNum, 1, DoSort) of
{value, {QNum, Col}} ->
#qlc_opt{tmpdir = TmpDir, tmpdir_usage = TmpUsage} = Opt,
SortOpts = [{tmpdir,Dir} || Dir <- [TmpDir], Dir =/= ""],
Sort = #qlc_sort{h = LE, keypos = {keysort, Col}, unique = false,
compressed = [], order = ascending,
fs_opts = SortOpts, tmpdir_usage = TmpUsage,
tmpdir = TmpDir},
#prepared{qh = Sort, sorted = no, join = no};
false ->
LE
end.
skip_lookup_filters(Qdata, []) ->
Qdata;
skip_lookup_filters(Qdata0, LU_SkipFs) ->
[case lists:member(QNum, LU_SkipFs) of
true ->
?qual_data(QNum, GoI, ?SKIP, fil);
false ->
Qd
end || ?qual_data(QNum, GoI, _, _)=Qd <- Qdata0].
%% If the qualifier used for lookup by the join (QNum) has a match
%% specification it must be applied _after_ the lookup join (the
%% filter must not be skipped!).
activate_join_lookup_filter(QNum, Qdata) ->
{value, {_,GoI2,SI2,{gen,Prep2}}} = lists:keysearch(QNum, 1, Qdata),
Table2 = Prep2#prepared.qh,
NPrep2 = Prep2#prepared{qh = Table2#qlc_table{ms = no_match_spec}},
%% Table2#qlc_table.ms has been reset; the filter will be run.
lists:keyreplace(QNum, 1, Qdata, ?qual_data(QNum,GoI2,SI2,{gen,NPrep2})).
opt_join(Join, JoinOption, Qdata, Opt, LU_SkipQuals) ->
%% prep_qlc_lc() assures that no unwanted join is carried out
{Ix0, M0} = Join,
Ix1 = opt_join_lu(Ix0, Qdata, LU_SkipQuals),
Ix = lists:reverse(lists:keysort(2, Ix1)), % prefer to skip
case Ix of
[{{Q1,C1,Q2,C2},Skip,KE,LU_fun} | _] ->
J = #qlc_join{kind = {lookup, KE, LU_fun}, q1 = Q1,
c1 = C1, q2 = Q2, c2 = C2, opt = Opt},
{Q2, J, Skip, []};
[] ->
M = opt_join_merge(M0),
case M of
[{{Q1,C1,Q2,C2},{merge_join,DoSort,KE,Skip}}|_] ->
J = #qlc_join{kind = {merge, KE}, opt = Opt,
q1 = Q1, c1 = C1, q2 = Q2, c2 = C2},
{not_a_qnum, J, Skip, DoSort};
[] when JoinOption =:= nested_loop ->
{not_a_qnum, no, [], []};
_ when JoinOption =/= any ->
erlang:error(cannot_carry_out_join, [JoinOption]);
_ ->
{not_a_qnum, no, [], []}
end
end.
opt_join_lu([{{_Q1,_C1,Q2,_C2}=J,[{lookup_join,_KEols,JKE,Skip0} | _]} | LJ],
Qdata, LU_SkipQuals) ->
{value, {Q2,_,_,{gen,Prep2}}} = lists:keysearch(Q2, 1, Qdata),
#qlc_table{ms = MS, key_equality = KE,
lookup_fun = LU_fun} = Prep2#prepared.qh,
%% If there is no filter to skip (the match spec was derived
%% from a query handle) then the lookup join cannot be done.
case
MS =/= no_match_spec andalso
lists:keymember(Q2, 1, LU_SkipQuals) =:= false
of
true ->
opt_join_lu(LJ, Qdata, LU_SkipQuals);
false ->
%% The join is preferred before evaluating the match spec
%% (if there is one).
Skip = skip_if_possible(JKE, KE, Skip0),
[{J,Skip,KE,LU_fun} | opt_join_lu(LJ, Qdata, LU_SkipQuals)]
end;
opt_join_lu([], _Qdata, _LU_SkipQuals) ->
[].
opt_join_merge(M) ->
%% Prefer not to sort arguments. Prefer to skip join filter.
L = [{-length(DoSort),length(Skip),
{QCs,{merge_join,DoSort,KE,Skip}}} ||
{_KpOrder_or_other,MJ} <- M,
{QCs,{merge_join,DoSort,KE,Skip0}} <- MJ,
Skip <- [skip_if_possible(KE, '==', Skip0)]],
lists:reverse([J || {_,_,J} <- lists:sort(L)]).
%% Cannot skip the join filter the join operator is '=:=' and the join
%% is performed using '=='. Note: the tag 'some'/'all' is not used.
skip_if_possible('=:=', '==', _) ->
[];
skip_if_possible(_, _, {_SkipTag, Skip}) ->
Skip.
%% -> {Objects, Post, LocalPost} | throw()
%% Post is a list of funs (closures) to run afterwards.
%% LocalPost should be run when all objects have been found (optimization).
%% LocalPost will always be a subset of Post.
%% List expressions are evaluated, resulting in lists of objects kept in
%% RAM or on disk.
%% An error term is thrown as soon as cleanup according Post has been
%% done. (This is opposed to errors during evaluation; such errors are
%% returned as terms.)
setup_qlc(Prep, Setup) ->
Post0 = [],
setup_le(Prep, Post0, Setup).
setup_le(#prepared{qh = #simple_qlc{le = LE, optz = Optz}}, Post0, Setup) ->
{Objs, Post, LocalPost} = setup_le(LE, Post0, Setup),
unique_cache(Objs, Post, LocalPost, Optz);
setup_le(#prepared{qh = #qlc{lcf = QFun, qdata = Qdata, init_value = V,
optz = Optz}}, Post0, Setup) ->
{GoTo, FirstState, Post, LocalPost} =
setup_quals(Qdata, Post0, Setup, Optz),
Objs = fun() -> QFun(FirstState, V, GoTo) end,
unique_cache(Objs, Post, LocalPost, Optz);
setup_le(#prepared{qh = #qlc_table{post_fun = PostFun}=Table}, Post, Setup) ->
H = table_handle(Table, Post, Setup),
%% The pre fun has been called from table_handle():
{H, [PostFun | Post], []};
setup_le(#prepared{qh = #qlc_append{hl = PrepL}}, Post0, Setup) ->
F = fun(Prep, {Post1, LPost1}) ->
{Objs, Post2, LPost2} = setup_le(Prep, Post1, Setup),
{Objs, {Post2, LPost1++LPost2}}
end,
{ObjsL, {Post, LocalPost}} = lists:mapfoldl(F, {Post0,[]}, PrepL),
{fun() -> append_loop(ObjsL, 0) end, Post, LocalPost};
setup_le(#prepared{qh = #qlc_sort{h = Prep, keypos = Kp,
unique = Unique, compressed = Compressed,
order = Order, fs_opts = SortOptions0,
tmpdir_usage = TmpUsage,tmpdir = TmpDir}},
Post0, Setup) ->
SortOptions = sort_options_global_tmp(SortOptions0, TmpDir),
LF = fun(Objs) ->
sort_list(Objs, Order, Unique, Kp, SortOptions, Post0)
end,
case setup_le(Prep, Post0, Setup) of
{L, Post, LocalPost} when is_list(L) ->
{LF(L), Post, LocalPost};
{Objs, Post, LocalPost} ->
FF = fun(Objs1) ->
file_sort_handle(Objs1, Kp, SortOptions, TmpDir,
Compressed, Post, LocalPost)
end,
sort_handle(Objs, LF, FF, SortOptions, Post, LocalPost,
{TmpUsage, sorting})
end;
setup_le(#prepared{qh = #qlc_list{l = L, ms = MS}}, Post, _Setup)
when (no_match_spec =:= MS); L =:= [] ->
{L, Post, []};
setup_le(#prepared{qh = #qlc_list{l = L, ms = MS}}, Post, _Setup)
when is_list(L) ->
{ets:match_spec_run(L, ets:match_spec_compile(MS)), Post, []};
setup_le(#prepared{qh = #qlc_list{l = H0, ms = MS}}, Post0, Setup) ->
{Objs0, Post, LocalPost} = setup_le(H0, Post0, Setup),
Objs = ets:match_spec_run(Objs0, ets:match_spec_compile(MS)),
{Objs, Post, LocalPost}.
%% The goto table (a tuple) is created at runtime. It is accessed by
%% the generated code in order to find next clause to execute. For
%% generators there is also a fun; calling the fun runs the list
%% expression of the generator. There are two elements for a filter:
%% the first one is the state to go when the filter is false; the
%% other the state when the filter is true. There are three elements
%% for a generator G: the first one is the state of the generator
%% before G (or the stop state if there is no generator); the second
%% one is the state of the qualifier following the generator (or the
%% template if there is no next generator); the third one is the list
%% expression fun.
%% There are also join generators which are "activated" when it is
%% possbible to do a join.
setup_quals(Qdata, Post0, Setup, Optz) ->
{GoTo0, Post1, LocalPost0} =
setup_quals(0, Qdata, [], Post0, [], Setup),
GoTo1 = lists:keysort(1, GoTo0),
FirstState0 = next_state(Qdata),
{GoTo2, FirstState, Post, LocalPost1} =
case Optz#optz.fast_join of
#qlc_join{kind = {merge,_KE}, c1 = C1, c2 = C2, opt = Opt} = MJ ->
MF = fun(_Rev, {H1, WH1}, {H2, WH2}) ->
fun() ->
merge_join(WH1(H1), C1, WH2(H2), C2, Opt)
end
end,
setup_join(MJ, Qdata, GoTo1, FirstState0, MF, Post1);
#qlc_join{kind = {lookup,_KE,LuF}, c1 = C1, c2 = C2} = LJ ->
LF = fun(Rev, {H1, WH1}, {H2, WH2}) ->
{H, W} = if
Rev -> {H2, WH2};
true -> {H1, WH1}
end,
fun() ->
lookup_join(W(H), C1, LuF, C2, Rev)
end
end,
setup_join(LJ, Qdata, GoTo1, FirstState0, LF, Post1);
no ->
{flat_goto(GoTo1), FirstState0, Post1, []}
end,
GoTo = list_to_tuple(GoTo2),
{GoTo, FirstState, Post, LocalPost0 ++ LocalPost1}.
setup_quals(GenLoopS, [?qual_data(_QNum,GoI,?SKIP,fil) | Qdata],
Gs, P, LP, Setup) ->
%% ?SKIP causes runtime error. See also skip_lookup_filters().
setup_quals(GenLoopS, Qdata, [{GoI,[?SKIP,?SKIP]} | Gs], P, LP, Setup);
setup_quals(GenLoopS, [?qual_data(_QNum,GoI,_SI,fil) | Qdata],
Gs, P, LP, Setup) ->
setup_quals(GenLoopS, Qdata, [{GoI,[GenLoopS,next_state(Qdata)]} | Gs],
P, LP, Setup);
setup_quals(GenLoopS, [?qual_data(_QNum,GoI,_SI, {gen,#join{}}) | Qdata],
Gs, P, LP, Setup) ->
setup_quals(GenLoopS, Qdata, [{GoI,[?SKIP,?SKIP,?SKIP]} | Gs],P,LP,Setup);
setup_quals(GenLoopS, [?qual_data(_QNum,GoI,SI,{gen,LE}) | Qdata],
Gs, P, LP, Setup) ->
{V, NP, LP1} = setup_le(LE, P, Setup),
setup_quals(SI + 1, Qdata, [{GoI, [GenLoopS,next_state(Qdata),V]} | Gs],
NP, LP ++ LP1, Setup);
setup_quals(GenLoopS, [], Gs, P, LP, _Setup) ->
{[{1,[GenLoopS]} | Gs], P, LP}.
%% Finds the qualifier in Qdata that performs the join between Q1 and
%% Q2, and sets it up using the handles already set up for Q1 and Q2.
%% Removes Q1 and Q2 from GoTo0 and updates the join qualifier in GoTo0.
%% Note: the parse transform has given each generator three slots
%% in the GoTo table. The position of these slots within the GoTo table
%% is fixed (at runtime).
%% (Assumes there is only one join-generator in Qdata.)
setup_join(J, Qdata, GoTo0, FirstState0, JoinFun, Post0) ->
#qlc_join{q1 = QNum1a, q2 = QNum2a, opt = Opt} = J,
{?qual_data(_QN,JGoI,JSI,_), Rev, QNum1, QNum2, WH1, WH2, _CsFun} =
find_join_data(Qdata, QNum1a, QNum2a),
[{GoI1,SI1}] = [{GoI,SI} ||
?qual_data(QNum,GoI,SI,_) <- Qdata, QNum =:= QNum1],
[{GoI2,SI2}] = [{GoI,SI} ||
?qual_data(QNum,GoI,SI,_) <- Qdata, QNum =:= QNum2],
[H1] = [H || {GoI,[_Back,_Forth,H]} <- GoTo0, GoI =:= GoI1],
[{BackH2,H2}] =
[{Back,H} || {GoI,[Back,_Forth,H]} <- GoTo0, GoI =:= GoI2],
H0 = JoinFun(Rev, {H1,WH1}, {H2,WH2}),
%% The qlc expression options apply to the introduced qlc expr as well.
{H, Post, LocalPost} =
unique_cache(H0, Post0, [], join_unique_cache(Opt)),
[JBack] = [Back || {GoI,[Back,_,_]} <- GoTo0, GoI =:= GoI1],
JForth = next_after(Qdata, SI1, QNum2),
GoTo1 = lists:map(fun({GoI,_}) when GoI =:= JGoI ->
{JGoI, [JBack, JForth, H]};
({GoI,_}) when GoI =:= GoI1; GoI =:= GoI2 ->
{GoI, [?SKIP,?SKIP,?SKIP]}; % not necessary
(Go) ->
Go
end, GoTo0),
GoTo = lists:map(fun(S) when S =:= SI1 ->
JSI;
(S) when S =:= SI2 ->
next_after(Qdata, S, QNum2);
(S) when S =:= SI1+1 ->
JSI+1;
(S) when S =:= SI2+1, SI1 + 1 =:= BackH2 ->
JSI+1;
(S) when S =:= SI2+1 ->
BackH2;
(S) -> S
end, flat_goto(GoTo1)),
FirstState = if
SI1 =:= FirstState0 -> JSI;
true -> FirstState0
end,
{GoTo, FirstState, Post, LocalPost}.
join_unique_cache(#qlc_opt{cache = Cache, unique = Unique}=Opt) ->
#optz{cache = Cache, unique = Unique, opt = Opt}.
flat_goto(GoTo) ->
lists:flatmap(fun({_,L}) -> L end, GoTo).
next_after([?qual_data(_, _, S, _) | Qdata], S, QNum2) ->
case Qdata of
[?qual_data(QNum2, _, _, _) | Qdata1] ->
next_state(Qdata1);
_ ->
next_state(Qdata)
end;
next_after([_ | Qdata], S, QNum2) ->
next_after(Qdata, S, QNum2).
next_state([?qual_data(_,_,_,{gen,#join{}}) | Qdata]) ->
next_state(Qdata);
next_state([?qual_data(_,_,?SKIP,fil) | Qdata]) ->
%% see skip_lookup_filters()
next_state(Qdata);
next_state([?qual_data(_,_,S,_) | _]) ->
S;
next_state([]) ->
template_state().
find_join_data(Qdata, QNum1, QNum2) ->
[QRev] = [{Q,Rev,QN1,QN2,H1,H2,CsF} ||
?qual_data(_QN,_GoI,_SI,
{gen,#join{q1 = QN1,q2 = QN2,
wh1 = H1, wh2 = H2,
cs_fun = CsF}})= Q <- Qdata,
if
QN1 =:= QNum1, QN2 =:= QNum2 ->
not (Rev = false);
QN1 =:= QNum2, QN2 =:= QNum1 ->
Rev = true;
true ->
Rev = false
end],
QRev.
table_handle(#qlc_table{trav_fun = TraverseFun, trav_MS = TravMS,
pre_fun = PreFun, lookup_fun = LuF,
parent_fun = ParentFun, lu_vals = LuVals, ms = MS},
Post, Setup) ->
#setup{parent = Parent} = Setup,
ParentValue =
if
ParentFun =:= undefined ->
undefined;
Parent =:= self() ->
try
ParentFun()
catch Class:Reason ->
post_funs(Post),
erlang:raise(Class, Reason, erlang:get_stacktrace())
end;
true ->
case monitor_request(Parent, {parent_fun, ParentFun}) of
error -> % parent has died
post_funs(Post),
exit(normal);
{value, Value} ->
Value;
{parent_fun_caught, Class, Reason, Stacktrace} ->
%% No use augmenting Stacktrace here.
post_funs(Post),
erlang:raise(Class, Reason, Stacktrace)
end
end,
StopFun =
if
Parent =:= self() ->
undefined;
true ->
Cursor = #qlc_cursor{c = {self(), Parent}},
fun() -> delete_cursor(Cursor) end
end,
PreFunArgs = [{parent_value, ParentValue}, {stop_fun, StopFun}],
_ = call(PreFun, PreFunArgs, ok, Post),
case LuVals of
{Pos, Vals} when MS =:= no_match_spec ->
LuF(Pos, Vals);
{Pos, Vals} ->
case LuF(Pos, Vals) of
[] ->
[];
Objs when is_list(Objs) ->
ets:match_spec_run(Objs,
ets:match_spec_compile(MS));
Error ->
post_funs(Post),
throw_error(Error)
end;
_ when not TravMS ->
MS = no_match_spec, % assertion
TraverseFun;
_ when MS =:= no_match_spec ->
fun() -> TraverseFun([{'$1',[],['$1']}]) end;
_ ->
fun() -> TraverseFun(MS) end
end.
-define(CHUNK_SIZE, 64*1024).
open_file(FileName, Extra, Post) ->
case file:open(FileName, [read, raw, binary | Extra]) of
{ok, Fd} ->
{fun() ->
case file:position(Fd, bof) of
{ok, 0} ->
TF = fun([], _) ->
[];
(Ts, C) when is_list(Ts) ->
lists:reverse(Ts, C)
end,
file_loop_read(<<>>, ?CHUNK_SIZE, {Fd,FileName}, TF);
Error ->
file_error(FileName, Error)
end
end, Fd};
Error ->
post_funs(Post),
throw_file_error(FileName, Error)
end.
file_loop(Bin0, Fd_FName, Ts0, TF) ->
case
try file_loop2(Bin0, Ts0)
catch _:_ ->
{_Fd, FileName} = Fd_FName,
error({bad_object, FileName})
end
of
{terms, <<Size:4/unit:8, B/bytes>>=Bin, []} ->
file_loop_read(Bin, Size - byte_size(B) + 4, Fd_FName, TF);
{terms, <<Size:4/unit:8, _/bytes>>=Bin, Ts} ->
C = fun() -> file_loop_read(Bin, Size+4, Fd_FName, TF) end,
TF(Ts, C);
{terms, B, Ts} ->
C = fun() -> file_loop_read(B, ?CHUNK_SIZE, Fd_FName, TF) end,
TF(Ts, C);
Error ->
Error
end.
file_loop2(<<Size:4/unit:8, B:Size/bytes, Bin/bytes>>, Ts) ->
file_loop2(Bin, [binary_to_term(B) | Ts]);
file_loop2(Bin, Ts) ->
{terms, Bin, Ts}.
%% After power failures (and only then) files with corrupted Size
%% fields have been observed in a disk_log file. If file:read/2 is
%% asked to read a huge amount of data the emulator may crash. Nothing
%% has been done here to prevent such crashes (by inspecting
%% BytesToRead in some way) since temporary files will never be read
%% after a power failure.
file_loop_read(B, MinBytesToRead, {Fd, FileName}=Fd_FName, TF) ->
BytesToRead = erlang:max(?CHUNK_SIZE, MinBytesToRead),
case file:read(Fd, BytesToRead) of
{ok, Bin} when byte_size(B) =:= 0 ->
file_loop(Bin, Fd_FName, [], TF);
{ok, Bin} ->
case B of
<<Size:4/unit:8, Tl/bytes>>
when byte_size(Bin) + byte_size(Tl) >= Size ->
{B1, B2} = split_binary(Bin, Size - byte_size(Tl)),
Foo = fun([T], Fun) -> [T | Fun] end,
%% TF should be applied exactly once.
case
file_loop(list_to_binary([B, B1]), Fd_FName, [], Foo)
of
[T | Fun] ->
true = is_function(Fun),
file_loop(B2, Fd_FName, [T], TF);
Error ->
Error
end;
_ ->
file_loop(list_to_binary([B, Bin]), Fd_FName, [], TF)
end;
eof when byte_size(B) =:= 0 ->
TF([], foo);
eof ->
error({bad_object, FileName});
Error ->
file_error(FileName, Error)
end.
sort_cursor_input(H, NoObjects) ->
fun(close) ->
ok;
(read) ->
sort_cursor_input_read(H, NoObjects)
end.
sort_cursor_list_output(TmpDir, Z, Unique) ->
fun(close) ->
{terms, []};
({value, NoObjects}) ->
fun(BTerms) when Unique; length(BTerms) =:= NoObjects ->
fun(close) ->
{terms, BTerms};
(BTerms1) ->
sort_cursor_file(BTerms ++ BTerms1, TmpDir, Z)
end;
(BTerms) ->
sort_cursor_file(BTerms, TmpDir, Z)
end
end.
sort_cursor_file(BTerms, TmpDir, Z) ->
FName = tmp_filename(TmpDir),
case file:open(FName, [write, raw, binary | Z]) of
{ok, Fd} ->
WFun = write_terms(FName, Fd),
WFun(BTerms);
Error ->
throw_file_error(FName, Error)
end.
sort_options_global_tmp(S, "") ->
S;
sort_options_global_tmp(S, TmpDir) ->
[{tmpdir,TmpDir} | lists:keydelete(tmpdir, 1, S)].
tmp_filename(TmpDirOpt) ->
U = "_",
Node = node(),
Pid = os:getpid(),
{MSecs,Secs,MySecs} = erlang:now(),
F = lists:concat([?MODULE,U,Node,U,Pid,U,MSecs,U,Secs,U,MySecs]),
TmpDir = case TmpDirOpt of
"" ->
{ok, CurDir} = file:get_cwd(),
CurDir;
TDir ->
TDir
end,
filename:join(filename:absname(TmpDir), F).
write_terms(FileName, Fd) ->
fun(close) ->
_ = file:close(Fd),
{file, FileName};
(BTerms) ->
case file:write(Fd, size_bin(BTerms, [])) of
ok ->
write_terms(FileName, Fd);
Error ->
_ = file:close(Fd),
throw_file_error(FileName, Error)
end
end.
size_bin([], L) ->
L;
size_bin([BinTerm | BinTerms], L) ->
size_bin(BinTerms, [L, <<(byte_size(BinTerm)):4/unit:8>> | BinTerm]).
sort_cursor_input_read([], NoObjects) ->
{end_of_input, NoObjects};
sort_cursor_input_read([Object | Cont], NoObjects) ->
{[term_to_binary(Object)], sort_cursor_input(Cont, NoObjects + 1)};
sort_cursor_input_read(F, NoObjects) ->
case F() of
Objects when is_list(Objects) ->
sort_cursor_input_read(Objects, NoObjects);
Term ->
throw_error(Term)
end.
unique_cache(L, Post, LocalPost, Optz) when is_list(L) ->
case Optz#optz.unique of
true ->
{unique_sort_list(L), Post, LocalPost};
false ->
%% If Optz#optz.cache then an ETS table could be used.
{L, Post, LocalPost}
end;
unique_cache(H, Post, LocalPost, #optz{unique = false, cache = false}) ->
{H, Post, LocalPost};
unique_cache(H, Post, LocalPost, #optz{unique = true, cache = false}) ->
E = ets:new(qlc, [set, private]),
{fun() -> no_dups(H, E) end, [del_table(E) | Post], LocalPost};
unique_cache(H, Post, LocalPost, #optz{unique = false, cache = true}) ->
E = ets:new(qlc, [set, private]),
{L, P} = unique_cache_post(E),
{fun() -> cache(H, E, LocalPost) end, [P | Post], [L]};
unique_cache(H, Post, LocalPost, #optz{unique = true, cache = true}) ->
UT = ets:new(qlc, [bag, private]),
MT = ets:new(qlc, [set, private]),
{L1, P1} = unique_cache_post(UT),
{L2, P2} = unique_cache_post(MT),
{fun() -> ucache(H, UT, MT, LocalPost) end, [P1, P2 | Post], [L1, L2]};
unique_cache(H, Post, LocalPost, #optz{unique = false, cache = list}=Optz) ->
Ref = make_ref(),
F = del_lcache(Ref),
#qlc_opt{tmpdir = TmpDir, max_list = MaxList, tmpdir_usage = TmpUsage} =
Optz#optz.opt,
{fun() -> lcache(H, Ref, LocalPost, TmpDir, MaxList, TmpUsage) end,
[F | Post], [F]};
unique_cache(H, Post0, LocalPost0, #optz{unique = true, cache = list}=Optz) ->
#qlc_opt{tmpdir = TmpDir, max_list = MaxList, tmpdir_usage = TmpUsage} =
Optz#optz.opt,
Size = if
MaxList >= 1 bsl 31 -> (1 bsl 31) - 1;
MaxList =:= 0 -> 1;
true -> MaxList
end,
SortOptions = [{size, Size}, {tmpdir, TmpDir}],
USortOptions = [{unique, true} | SortOptions],
TmpUsageM = {TmpUsage, caching},
LF1 = fun(Objs) -> lists:ukeysort(1, Objs) end,
FF1 = fun(Objs) ->
file_sort_handle(Objs, {keysort, 1}, USortOptions,
TmpDir, [], Post0, LocalPost0)
end,
{UH, Post1, LocalPost1} = sort_handle(tag_objects(H, 1), LF1, FF1,
USortOptions, Post0, LocalPost0,
TmpUsageM),
LF2 = fun(Objs) -> lists:keysort(2, Objs) end,
FF2 = fun(Objs) ->
file_sort_handle(Objs, {keysort, 2}, SortOptions, TmpDir,
[], Post1, LocalPost1)
end,
{SH, Post, LocalPost} =
sort_handle(UH, LF2, FF2, SortOptions, Post1, LocalPost1, TmpUsageM),
if
is_list(SH) ->
%% Remove the tag once and for all.
{untag_objects2(SH), Post, LocalPost};
true ->
%% Every traversal untags the objects...
{fun() -> untag_objects(SH) end, Post, LocalPost}
end.
unique_cache_post(E) ->
{empty_table(E), del_table(E)}.
unique_sort_list(L) ->
E = ets:new(qlc, [set, private]),
unique_list(L, E).
unique_list([], E) ->
true = ets:delete(E),
[];
unique_list([Object | Objects], E) ->
case ets:member(E, Object) of
false ->
true = ets:insert(E, {Object}),
[Object | unique_list(Objects, E)];
true ->
unique_list(Objects, E)
end.
sort_list(L, CFun, true, sort, _SortOptions, _Post) when is_function(CFun) ->
lists:usort(CFun, L);
sort_list(L, CFun, false, sort, _SortOptions, _Post) when is_function(CFun) ->
lists:sort(CFun, L);
sort_list(L, ascending, true, sort, _SortOptions, _Post) ->
lists:usort(L);
sort_list(L, descending, true, sort, _SortOptions, _Post) ->
lists:reverse(lists:usort(L));
sort_list(L, ascending, false, sort, _SortOptions, _Post) ->
lists:sort(L);
sort_list(L, descending, false, sort, _SortOptions, _Post) ->
lists:reverse(lists:sort(L));
sort_list(L, Order, Unique, {keysort, Kp}, _SortOptions, _Post)
when is_integer(Kp), is_atom(Order) ->
case {Order, Unique} of
{ascending, true} ->
lists:ukeysort(Kp, L);
{ascending, false} ->
lists:keysort(Kp, L);
{descending, true} ->
lists:reverse(lists:ukeysort(Kp, L));
{descending, false} ->
lists:reverse(lists:keysort(Kp, L))
end;
sort_list(L, _Order, _Unique, Sort, SortOptions, Post) ->
In = fun(_) -> {L, fun(_) -> end_of_input end} end,
Out = sort_list_output([]),
TSortOptions = [{format,term} | SortOptions],
do_sort(In, Out, Sort, TSortOptions, Post).
sort_list_output(L) ->
fun(close) ->
lists:append(lists:reverse(L));
(Terms) when is_list(Terms) ->
sort_list_output([Terms | L])
end.
%% Don't use the file_sorter unless it is known that objects will be
%% put on a temporary file (optimization).
sort_handle(H, ListFun, FileFun, SortOptions, Post, LocalPost, TmpUsageM) ->
Size = case lists:keysearch(size, 1, SortOptions) of
{value, {size, Size0}} -> Size0;
false -> default_option(size)
end,
sort_cache(H, [], Size, {ListFun, FileFun, Post, LocalPost, TmpUsageM}).
sort_cache([], CL, _Sz, {LF, _FF, Post, LocalPost, _TmpUsageM}) ->
{LF(lists:reverse(CL)), Post, LocalPost};
sort_cache(Objs, CL, Sz, C) when Sz < 0 ->
sort_cache2(Objs, CL, false, C);
sort_cache([Object | Cont], CL, Sz0, C) ->
Sz = decr_list_size(Sz0, Object),
sort_cache(Cont, [Object | CL], Sz, C);
sort_cache(F, CL, Sz, C) ->
case F() of
Objects when is_list(Objects) ->
sort_cache(Objects, CL, Sz, C);
Term ->
{_LF, _FF, Post, _LocalPost, _TmpUsageM} = C,
post_funs(Post),
throw_error(Term)
end.
sort_cache2([], CL, _X, {LF, _FF, Post, LocalPost, _TmpUsageM}) ->
{LF(lists:reverse(CL)), Post, LocalPost};
sort_cache2([Object | Cont], CL, _, C) ->
sort_cache2(Cont, [Object | CL], true, C);
sort_cache2(F, CL, false, C) ->
%% Find one extra object to be sure that temporary file(s) will be
%% used when calling the file_sorter. This works even if
%% duplicates are removed.
case F() of
Objects when is_list(Objects) ->
sort_cache2(Objects, CL, true, C);
Term ->
{_LF, _FF, Post, _LocalPost, _TmpUsageM} = C,
post_funs(Post),
throw_error(Term)
end;
sort_cache2(_Cont, _CL, true, {_LF,_FF,Post,_LocalPost, {not_allowed,M}}) ->
post_funs(Post),
throw_reason({tmpdir_usage, M});
sort_cache2(Cont, CL, true, {_LF, FF, _Post, _LocalPost, {TmpUsage, M}}) ->
maybe_error_logger(TmpUsage, M),
FF(lists:reverse(CL, Cont)).
file_sort_handle(H, Kp, SortOptions, TmpDir, Compressed, Post, LocalPost) ->
In = sort_cursor_input(H, 0),
Unique = lists:member(unique, SortOptions)
orelse
lists:keymember(unique, 1, SortOptions),
Out = sort_cursor_list_output(TmpDir, Compressed, Unique),
Reply = do_sort(In, Out, Kp, SortOptions, Post),
case Reply of
{file, FileName} ->
{F, Fd} = open_file(FileName, Compressed, Post),
P = fun() -> _ = file:close(Fd),
_ = file:delete(FileName)
end,
{F, [P | Post], LocalPost};
{terms, BTerms} ->
try
{[binary_to_term(B) || B <- BTerms], Post, LocalPost}
catch Class:Reason ->
post_funs(Post),
erlang:raise(Class, Reason, erlang:get_stacktrace())
end
end.
do_sort(In, Out, Sort, SortOptions, Post) ->
try
case do_sort(In, Out, Sort, SortOptions) of
{error, Reason} -> throw_reason(Reason);
Reply -> Reply
end
catch Class:Term ->
post_funs(Post),
erlang:raise(Class, Term, erlang:get_stacktrace())
end.
do_sort(In, Out, sort, SortOptions) ->
file_sorter:sort(In, Out, SortOptions);
do_sort(In, Out, {keysort, KeyPos}, SortOptions) ->
file_sorter:keysort(KeyPos, In, Out, SortOptions).
del_table(Ets) ->
fun() -> true = ets:delete(Ets) end.
empty_table(Ets) ->
fun() -> true = ets:delete_all_objects(Ets) end.
append_loop([[_ | _]=L], _N) ->
L;
append_loop([F], _N) ->
F();
append_loop([L | Hs], N) ->
append_loop(L, N, Hs).
append_loop([], N, Hs) ->
append_loop(Hs, N);
append_loop([Object | Cont], N, Hs) ->
[Object | append_loop(Cont, N + 1, Hs)];
append_loop(F, 0, Hs) ->
case F() of
[] ->
append_loop(Hs, 0);
[Object | Cont] ->
[Object | append_loop(Cont, 1, Hs)];
Term ->
Term
end;
append_loop(F, _N, Hs) -> % when _N > 0
fun() -> append_loop(F, 0, Hs) end.
no_dups([]=Cont, UTab) ->
true = ets:delete_all_objects(UTab),
Cont;
no_dups([Object | Cont], UTab) ->
case ets:member(UTab, Object) of
false ->
true = ets:insert(UTab, {Object}),
%% A fun is created here, even if Cont is a list; objects
%% will not be copied to the ETS table unless requested.
[Object | fun() -> no_dups(Cont, UTab) end];
true ->
no_dups(Cont, UTab)
end;
no_dups(F, UTab) ->
case F() of
Objects when is_list(Objects) ->
no_dups(Objects, UTab);
Term ->
Term
end.
%% When all objects have been returned from a cached QLC, the
%% generators of the expression will never be called again, and so the
%% tables used by the generators (LocalPost) can be emptied.
cache(H, MTab, LocalPost) ->
case ets:member(MTab, 0) of
false ->
true = ets:insert(MTab, {0}),
cache(H, MTab, 1, LocalPost);
true ->
cache_recall(MTab, 1)
end.
cache([]=Cont, _MTab, _SeqNo, LocalPost) ->
local_post(LocalPost),
Cont;
cache([Object | Cont], MTab, SeqNo, LocalPost) ->
true = ets:insert(MTab, {SeqNo, Object}),
%% A fun is created here, even if Cont is a list; objects
%% will not be copied to the ETS table unless requested.
[Object | fun() -> cache(Cont, MTab, SeqNo + 1, LocalPost) end];
cache(F, MTab, SeqNo, LocalPost) ->
case F() of
Objects when is_list(Objects) ->
cache(Objects, MTab, SeqNo, LocalPost);
Term ->
Term
end.
cache_recall(MTab, SeqNo) ->
case ets:lookup(MTab, SeqNo) of
[]=Cont ->
Cont;
[{SeqNo, Object}] ->
[Object | fun() -> cache_recall(MTab, SeqNo + 1) end]
end.
ucache(H, UTab, MTab, LocalPost) ->
case ets:member(MTab, 0) of
false ->
true = ets:insert(MTab, {0}),
ucache(H, UTab, MTab, 1, LocalPost);
true ->
ucache_recall(UTab, MTab, 1)
end.
ucache([]=Cont, _UTab, _MTab, _SeqNo, LocalPost) ->
local_post(LocalPost),
Cont;
ucache([Object | Cont], UTab, MTab, SeqNo, LocalPost) ->
%% Always using 28 bits hash value...
Hash = erlang:phash2(Object),
case ets:lookup(UTab, Hash) of
[] ->
ucache3(Object, Cont, Hash, UTab, MTab, SeqNo, LocalPost);
HashSeqObjects ->
case lists:keymember(Object, 3, HashSeqObjects) of
true ->
ucache(Cont, UTab, MTab, SeqNo, LocalPost);
false ->
ucache3(Object, Cont, Hash, UTab, MTab, SeqNo, LocalPost)
end
end;
ucache(F, UTab, MTab, SeqNo, LocalPost) ->
case F() of
Objects when is_list(Objects) ->
ucache(Objects, UTab, MTab, SeqNo, LocalPost);
Term ->
Term
end.
ucache3(Object, Cont, Hash, UTab, MTab, SeqNo, LocalPost) ->
true = ets:insert(UTab, {Hash, SeqNo, Object}),
true = ets:insert(MTab, {SeqNo, Hash}),
%% A fun is created here, even if Cont is a list; objects
%% will not be copied to the ETS table unless requested.
[Object | fun() -> ucache(Cont, UTab, MTab, SeqNo+1, LocalPost) end].
ucache_recall(UTab, MTab, SeqNo) ->
case ets:lookup(MTab, SeqNo) of
[]=Cont ->
Cont;
[{SeqNo, Hash}] ->
Object = case ets:lookup(UTab, Hash) of
[{Hash, SeqNo, Object0}] -> Object0;
HashSeqObjects ->
{value, {Hash, SeqNo, Object0}} =
lists:keysearch(SeqNo, 2, HashSeqObjects),
Object0
end,
[Object | fun() -> ucache_recall(UTab, MTab, SeqNo + 1) end]
end.
-define(LCACHE_FILE(Ref), {Ref, '$_qlc_cache_tmpfiles_'}).
lcache(H, Ref, LocalPost, TmpDir, MaxList, TmpUsage) ->
Key = ?LCACHE_FILE(Ref),
case get(Key) of
undefined ->
lcache1(H, {Key, LocalPost, TmpDir, MaxList, TmpUsage},
MaxList, []);
{file, _Fd, _TmpFile, F} ->
F();
L when is_list(L) ->
L
end.
lcache1([]=Cont, {Key, LocalPost, _TmpDir, _MaxList, _TmpUsage}, _Sz, Acc) ->
local_post(LocalPost),
case get(Key) of
undefined ->
put(Key, lists:reverse(Acc)),
Cont;
{file, Fd, TmpFile, _F} ->
case lcache_write(Fd, TmpFile, Acc) of
ok ->
Cont;
Error ->
Error
end
end;
lcache1(H, State, Sz, Acc) when Sz < 0 ->
{Key, LocalPost, TmpDir, MaxList, TmpUsage} = State,
GetFile =
case get(Key) of
{file, Fd0, TmpFile, _F} ->
{TmpFile, Fd0};
undefined when TmpUsage =:= not_allowed ->
error({tmpdir_usage, caching});
undefined ->
maybe_error_logger(TmpUsage, caching),
FName = tmp_filename(TmpDir),
{F, Fd0} = open_file(FName, [write], LocalPost),
put(Key, {file, Fd0, FName, F}),
{FName, Fd0}
end,
case GetFile of
{FileName, Fd} ->
case lcache_write(Fd, FileName, Acc) of
ok ->
lcache1(H, State, MaxList, []);
Error ->
Error
end;
Error ->
Error
end;
lcache1([Object | Cont], State, Sz0, Acc) ->
Sz = decr_list_size(Sz0, Object),
[Object | lcache2(Cont, State, Sz, [Object | Acc])];
lcache1(F, State, Sz, Acc) ->
case F() of
Objects when is_list(Objects) ->
lcache1(Objects, State, Sz, Acc);
Term ->
Term
end.
lcache2([Object | Cont], State, Sz0, Acc) when Sz0 >= 0 ->
Sz = decr_list_size(Sz0, Object),
[Object | lcache2(Cont, State, Sz, [Object | Acc])];
lcache2(Cont, State, Sz, Acc) ->
fun() -> lcache1(Cont, State, Sz, Acc) end.
lcache_write(Fd, FileName, L) ->
write_binary_terms(t2b(L, []), Fd, FileName).
t2b([], Bs) ->
Bs;
t2b([T | Ts], Bs) ->
t2b(Ts, [term_to_binary(T) | Bs]).
del_lcache(Ref) ->
fun() ->
Key = ?LCACHE_FILE(Ref),
case get(Key) of
undefined ->
ok;
{file, Fd, TmpFile, _F} ->
_ = file:close(Fd),
_ = file:delete(TmpFile),
erase(Key);
_L ->
erase(Key)
end
end.
tag_objects([Object | Cont], T) ->
[{Object, T} | tag_objects2(Cont, T + 1)];
tag_objects([]=Cont, _T) ->
Cont;
tag_objects(F, T) ->
case F() of
Objects when is_list(Objects) ->
tag_objects(Objects, T);
Term ->
Term
end.
tag_objects2([Object | Cont], T) ->
[{Object, T} | tag_objects2(Cont, T + 1)];
tag_objects2(Objects, T) ->
fun() -> tag_objects(Objects, T) end.
untag_objects([]=Objs) ->
Objs;
untag_objects([{Object, _N} | Cont]) ->
[Object | untag_objects2(Cont)];
untag_objects(F) ->
case F() of
Objects when is_list(Objects) ->
untag_objects(Objects);
Term -> % Cannot happen
Term
end.
untag_objects2([{Object, _N} | Cont]) ->
[Object | untag_objects2(Cont)];
untag_objects2([]=Cont) ->
Cont;
untag_objects2(Objects) ->
fun() -> untag_objects(Objects) end.
%%% Merge join.
%%% Temporary files are used when many objects have the same key.
-define(JWRAP(E1, E2), [E1 | E2]).
-record(m, {id, tmpdir, max_list, tmp_usage}).
merge_join([]=Cont, _C1, _T2, _C2, _Opt) ->
Cont;
merge_join([E1 | L1], C1, L2, C2, Opt) ->
#qlc_opt{tmpdir = TmpDir, max_list = MaxList,
tmpdir_usage = TmpUsage} = Opt,
M = #m{id = merge_join_id(), tmpdir = TmpDir, max_list = MaxList,
tmp_usage = TmpUsage},
merge_join2(E1, element(C1, E1), L1, C1, L2, C2, M);
merge_join(F1, C1, L2, C2, Opt) ->
case F1() of
L1 when is_list(L1) ->
merge_join(L1, C1, L2, C2, Opt);
T1 ->
T1
end.
merge_join1(_E2, _K2, []=Cont, _C1, _L2, _C2, M) ->
end_merge_join(Cont, M);
merge_join1(E2, K2, [E1 | L1], C1, L2, C2, M) ->
K1 = element(C1, E1),
if
K1 == K2 ->
same_keys2(E1, K1, L1, C1, L2, C2, E2, M);
K1 > K2 ->
merge_join2(E1, K1, L1, C1, L2, C2, M);
true -> % K1 < K2
merge_join1(E2, K2, L1, C1, L2, C2, M)
end;
merge_join1(E2, K2, F1, C1, L2, C2, M) ->
case F1() of
L1 when is_list(L1) ->
merge_join1(E2, K2, L1, C1, L2, C2, M);
T1 ->
T1
end.
merge_join2(_E1, _K1, _L1, _C1, []=Cont, _C2, M) ->
end_merge_join(Cont, M);
merge_join2(E1, K1, L1, C1, [E2 | L2], C2, M) ->
K2 = element(C2, E2),
if
K1 == K2 ->
same_keys2(E1, K1, L1, C1, L2, C2, E2, M);
K1 > K2 ->
merge_join2(E1, K1, L1, C1, L2, C2, M);
true -> % K1 < K2
merge_join1(E2, K2, L1, C1, L2, C2, M)
end;
merge_join2(E1, K1, L1, C1, F2, C2, M) ->
case F2() of
L2 when is_list(L2) ->
merge_join2(E1, K1, L1, C1, L2, C2, M);
T2 ->
T2
end.
%% element(C2, E2_0) == K1
same_keys2(E1, K1, L1, C1, [], _C2, E2_0, M) ->
Cont = fun(_L1b) -> end_merge_join([], M) end,
loop_same_keys(E1, K1, L1, C1, [E2_0], Cont, M);
same_keys2(E1, K1, L1, C1, [E2 | L2]=L2_0, C2, E2_0, M) ->
K2 = element(C2, E2),
if
K1 == K2 ->
same_keys1(E1, K1, L1, C1, E2, C2, E2_0, L2, M);
K1 < K2 ->
[?JWRAP(E1, E2_0) |
fun() -> same_loop1(L1, K1, C1, E2_0, L2_0, C2, M) end]
end;
same_keys2(E1, K1, L1, C1, F2, C2, E2_0, M) ->
case F2() of
L2 when is_list(L2) ->
same_keys2(E1, K1, L1, C1, L2, C2, E2_0, M);
T2 ->
Cont = fun(_L1b) -> T2 end,
loop_same_keys(E1, K1, L1, C1, [E2_0], Cont, M)
end.
same_loop1([], _K1_0, _C1, _E2_0, _L2, _C2, M) ->
end_merge_join([], M);
same_loop1([E1 | L1], K1_0, C1, E2_0, L2, C2, M) ->
K1 = element(C1, E1),
if
K1 == K1_0 ->
[?JWRAP(E1, E2_0) |
fun() -> same_loop1(L1, K1_0, C1, E2_0, L2, C2, M) end];
K1_0 < K1 ->
merge_join2(E1, K1, L1, C1, L2, C2, M)
end;
same_loop1(F1, K1_0, C1, E2_0, L2, C2, M) ->
case F1() of
L1 when is_list(L1) ->
same_loop1(L1, K1_0, C1, E2_0, L2, C2, M);
T1 ->
T1
end.
%% element(C2, E2_0) == K1, element(C2, E2) == K1_0
same_keys1(E1_0, K1_0, []=L1, C1, E2, C2, E2_0, L2, M) ->
[?JWRAP(E1_0, E2_0), ?JWRAP(E1_0, E2) |
fun() -> same_keys(K1_0, E1_0, L1, C1, L2, C2, M) end];
same_keys1(E1_0, K1_0, [E1 | _]=L1, C1, E2, C2, E2_0, L2, M) ->
K1 = element(C1, E1),
if
K1_0 == K1 ->
E2s = [E2, E2_0],
Sz0 = decr_list_size(M#m.max_list, E2s),
same_keys_cache(E1_0, K1_0, L1, C1, L2, C2, E2s, Sz0, M);
K1_0 < K1 ->
[?JWRAP(E1_0, E2_0), ?JWRAP(E1_0, E2) |
fun() -> same_keys(K1_0, E1_0, L1, C1, L2, C2, M) end]
end;
same_keys1(E1_0, K1_0, F1, C1, E2, C2, E2_0, L2, M) ->
case F1() of
L1 when is_list(L1) ->
same_keys1(E1_0, K1_0, L1, C1, E2, C2, E2_0, L2, M);
T1 ->
Cont = fun() -> T1 end,
loop_same(E1_0, [E2, E2_0], Cont)
end.
%% There is no such element E in L1 such that element(C1, E) == K1.
same_keys(_K1, _E1, _L1, _C1, []=Cont, _C2, M) ->
end_merge_join(Cont, M);
same_keys(K1, E1, L1, C1, [E2 | L2], C2, M) ->
K2 = element(C2, E2),
if
K1 == K2 ->
[?JWRAP(E1, E2) |
fun() -> same_keys(K1, E1, L1, C1, L2, C2, M) end];
K1 < K2 ->
merge_join1(E2, K2, L1, C1, L2, C2, M)
end;
same_keys(K1, E1, L1, C1, F2, C2, M) ->
case F2() of
L2 when is_list(L2) ->
same_keys(K1, E1, L1, C1, L2, C2, M);
T2 ->
T2
end.
%% There are at least two elements in [E1 | L1] that are to be combined
%% with the elements in E2s (length(E2s) > 1). This loop covers the case
%% when all elements in E2 with key K1 can be kept in RAM.
same_keys_cache(E1, K1, L1, C1, [], _C2, E2s, _Sz, M) ->
Cont = fun(_L1b) -> end_merge_join([], M) end,
loop_same_keys(E1, K1, L1, C1, E2s, Cont, M);
same_keys_cache(E1, K1, L1, C1, L2, C2, E2s, Sz0, M) when Sz0 < 0 ->
case init_merge_join(M) of
ok ->
Sz = M#m.max_list,
C = fun() ->
same_keys_file(E1, K1, L1, C1, L2, C2, [], Sz, M)
end,
write_same_keys(E1, E2s, M, C);
Error ->
Error
end;
same_keys_cache(E1, K1, L1, C1, [E2 | L2], C2, E2s, Sz0, M) ->
K2 = element(C2, E2),
if
K1 == K2 ->
Sz = decr_list_size(Sz0, E2),
same_keys_cache(E1, K1, L1, C1, L2, C2, [E2 | E2s], Sz, M);
K1 < K2 ->
Cont = fun(L1b) -> merge_join1(E2, K2, L1b, C1, L2, C2, M) end,
loop_same_keys(E1, K1, L1, C1, E2s, Cont, M)
end;
same_keys_cache(E1, K1, L1, C1, F2, C2, E2s, Sz, M) ->
case F2() of
L2 when is_list(L2) ->
same_keys_cache(E1, K1, L1, C1, L2, C2, E2s, Sz, M);
T2 ->
Cont = fun(_L1b) -> T2 end,
loop_same_keys(E1, K1, L1, C1, E2s, Cont, M)
end.
%% E2s holds all elements E2 in L2 such that element(E2, C2) == K1.
loop_same_keys(E1, _K1, [], _C1, E2s, _Cont, M) ->
end_merge_join(loop_same(E1, E2s, []), M);
loop_same_keys(E1, K1, L1, C1, E2s, Cont, M) ->
loop_same(E1, E2s, fun() -> loop_keys(K1, L1, C1, E2s, Cont, M) end).
loop_same(_E1, [], L) ->
L;
loop_same(E1, [E2 | E2s], L) ->
loop_same(E1, E2s, [?JWRAP(E1, E2) | L]).
loop_keys(K, [E1 | L1]=L1_0, C1, E2s, Cont, M) ->
K1 = element(C1, E1),
if
K1 == K ->
loop_same_keys(E1, K1, L1, C1, E2s, Cont, M);
K1 > K ->
Cont(L1_0)
end;
loop_keys(_K, []=L1, _C1, _Es2, Cont, _M) ->
Cont(L1);
loop_keys(K, F1, C1, E2s, Cont, M) ->
case F1() of
L1 when is_list(L1) ->
loop_keys(K, L1, C1, E2s, Cont, M);
T1 ->
T1
end.
%% This is for the case when a temporary file has to be used.
same_keys_file(E1, K1, L1, C1, [], _C2, E2s, _Sz, M) ->
Cont = fun(_L1b) -> end_merge_join([], M) end,
same_keys_file_write(E1, K1, L1, C1, E2s, M, Cont);
same_keys_file(E1, K1, L1, C1, L2, C2, E2s, Sz0, M) when Sz0 < 0 ->
Sz = M#m.max_list,
C = fun() -> same_keys_file(E1, K1, L1, C1, L2, C2, [], Sz, M) end,
write_same_keys(E1, E2s, M, C);
same_keys_file(E1, K1, L1, C1, [E2 | L2], C2, E2s, Sz0, M) ->
K2 = element(C2, E2),
if
K1 == K2 ->
Sz = decr_list_size(Sz0, E2),
same_keys_file(E1, K1, L1, C1, L2, C2, [E2 | E2s], Sz, M);
K1 < K2 ->
Cont = fun(L1b) ->
%% The temporary file could be truncated here.
merge_join1(E2, K2, L1b, C1, L2, C2, M)
end,
same_keys_file_write(E1, K1, L1, C1, E2s, M, Cont)
end;
same_keys_file(E1, K1, L1, C1, F2, C2, E2s, Sz, M) ->
case F2() of
L2 when is_list(L2) ->
same_keys_file(E1, K1, L1, C1, L2, C2, E2s, Sz, M);
T2 ->
Cont = fun(_L1b) -> T2 end,
same_keys_file_write(E1, K1, L1, C1, E2s, M, Cont)
end.
same_keys_file_write(E1, K1, L1, C1, E2s, M, Cont) ->
C = fun() -> loop_keys_file(K1, L1, C1, Cont, M) end,
write_same_keys(E1, E2s, M, C).
write_same_keys(_E1, [], _M, Cont) ->
Cont();
write_same_keys(E1, Es2, M, Cont) ->
write_same_keys(E1, Es2, M, [], Cont).
%% Avoids one (the first) traversal of the temporary file.
write_same_keys(_E1, [], M, E2s, Objs) ->
case write_merge_join(M, E2s) of
ok -> Objs;
Error -> Error
end;
write_same_keys(E1, [E2 | E2s0], M, E2s, Objs) ->
BE2 = term_to_binary(E2),
write_same_keys(E1, E2s0, M, [BE2 | E2s], [?JWRAP(E1, E2) | Objs]).
loop_keys_file(K, [E1 | L1]=L1_0, C1, Cont, M) ->
K1 = element(C1, E1),
if
K1 == K ->
C = fun() -> loop_keys_file(K1, L1, C1, Cont, M) end,
read_merge_join(M, E1, C);
K1 > K ->
Cont(L1_0)
end;
loop_keys_file(_K, []=L1, _C1, Cont, _M) ->
Cont(L1);
loop_keys_file(K, F1, C1, Cont, M) ->
case F1() of
L1 when is_list(L1) ->
loop_keys_file(K, L1, C1, Cont, M);
T1 ->
T1
end.
end_merge_join(Reply, M) ->
end_merge_join(M),
Reply.
%% Normally post_funs() cleans up temporary files by calling funs in
%% Post. It seems impossible to do that with the temporary file(s)
%% used when many objects have the same key--such a file is created
%% after the setup when Post is prepared. There seems to be no real
%% alternative to using the process dictionary, at least as things
%% have been implemented so far. Probably all of Post could have been
%% put in the process dictionary...
-define(MERGE_JOIN_FILE, '$_qlc_merge_join_tmpfiles_').
init_merge_join(#m{id = MergeId, tmpdir = TmpDir, tmp_usage = TmpUsage}) ->
case tmp_merge_file(MergeId) of
{Fd, FileName} ->
case file:position(Fd, bof) of
{ok, 0} ->
case file:truncate(Fd) of
ok ->
ok;
Error ->
file_error(FileName, Error)
end;
Error ->
file_error(FileName, Error)
end;
none when TmpUsage =:= not_allowed ->
error({tmpdir_usage, joining});
none ->
maybe_error_logger(TmpUsage, joining),
FName = tmp_filename(TmpDir),
case file:open(FName, [raw, binary, read, write]) of
{ok, Fd} ->
TmpFiles = get(?MERGE_JOIN_FILE),
put(?MERGE_JOIN_FILE, [{MergeId, Fd, FName} | TmpFiles]),
ok;
Error ->
file_error(FName, Error)
end
end.
write_merge_join(#m{id = MergeId}, BTerms) ->
{Fd, FileName} = tmp_merge_file(MergeId),
write_binary_terms(BTerms, Fd, FileName).
read_merge_join(#m{id = MergeId}, E1, Cont) ->
{Fd, FileName} = tmp_merge_file(MergeId),
case file:position(Fd, bof) of
{ok, 0} ->
Fun = fun([], _) ->
Cont();
(Ts, C) when is_list(Ts) ->
join_read_terms(E1, Ts, C)
end,
file_loop_read(<<>>, ?CHUNK_SIZE, {Fd, FileName}, Fun);
Error ->
file_error(FileName, Error)
end.
join_read_terms(_E1, [], Objs) ->
Objs;
join_read_terms(E1, [E2 | E2s], Objs) ->
join_read_terms(E1, E2s, [?JWRAP(E1, E2) | Objs]).
end_merge_join(#m{id = MergeId}) ->
case tmp_merge_file(MergeId) of
none ->
ok;
{Fd, FileName} ->
_ = file:close(Fd),
_ = file:delete(FileName),
put(?MERGE_JOIN_FILE,
lists:keydelete(MergeId, 1, get(?MERGE_JOIN_FILE)))
end.
end_all_merge_joins() ->
lists:foreach(
fun(Id) -> end_merge_join(#m{id = Id}) end,
[Id || {Id, _Fd, _FileName} <- lists:flatten([get(?MERGE_JOIN_FILE)])]),
erase(?MERGE_JOIN_FILE).
merge_join_id() ->
case get(?MERGE_JOIN_FILE) of
undefined ->
put(?MERGE_JOIN_FILE, []);
_ ->
ok
end,
make_ref().
tmp_merge_file(MergeId) ->
TmpFiles = get(?MERGE_JOIN_FILE),
case lists:keysearch(MergeId, 1, TmpFiles) of
{value, {MergeId, Fd, FileName}} ->
{Fd, FileName};
false ->
none
end.
decr_list_size(Sz0, E) when is_integer(Sz0) ->
Sz0 - erlang:external_size(E).
%%% End of merge join.
lookup_join([E1 | L1], C1, LuF, C2, Rev) ->
K1 = element(C1, E1),
case LuF(C2, [K1]) of
[] ->
lookup_join(L1, C1, LuF, C2, Rev);
[E2] when Rev ->
[?JWRAP(E2, E1) | fun() -> lookup_join(L1, C1, LuF, C2, Rev) end];
[E2] ->
[?JWRAP(E1, E2) | fun() -> lookup_join(L1, C1, LuF, C2, Rev) end];
E2s when is_list(E2s), Rev ->
[?JWRAP(E2, E1) || E2 <- E2s] ++
fun() -> lookup_join(L1, C1, LuF, C2, Rev) end;
E2s when is_list(E2s) ->
[?JWRAP(E1, E2) || E2 <- E2s] ++
fun() -> lookup_join(L1, C1, LuF, C2, Rev) end;
Term ->
Term
end;
lookup_join([]=Cont, _C1, _LuF, _C2, _Rev) ->
Cont;
lookup_join(F1, C1, LuF, C2, Rev) ->
case F1() of
L1 when is_list(L1) ->
lookup_join(L1, C1, LuF, C2, Rev);
T1 ->
T1
end.
maybe_error_logger(allowed, _) ->
ok;
maybe_error_logger(Name, Why) ->
[_, _, {?MODULE,maybe_error_logger,_} | Stacktrace] = expand_stacktrace(),
Trimmer = fun(M, _F, _A) -> M =:= erl_eval end,
Formater = fun(Term, I) -> io_lib:print(Term, I, 80, -1) end,
X = lib:format_stacktrace(1, Stacktrace, Trimmer, Formater),
error_logger:Name("qlc: temporary file was needed for ~w\n~s\n",
[Why, lists:flatten(X)]).
expand_stacktrace() ->
D = erlang:system_flag(backtrace_depth, 8),
try
%% Compensate for a bug in erlang:system_flag/2:
expand_stacktrace(erlang:max(1, D))
after
erlang:system_flag(backtrace_depth, D)
end.
expand_stacktrace(D) ->
_ = erlang:system_flag(backtrace_depth, D),
{'EXIT', {foo, Stacktrace}} = (catch erlang:error(foo)),
L = lists:takewhile(fun({M,_,_}) -> M =/= ?MODULE
end, lists:reverse(Stacktrace)),
if
length(L) < 3 andalso length(Stacktrace) =:= D ->
expand_stacktrace(D + 5);
true ->
Stacktrace
end.
write_binary_terms(BTerms, Fd, FileName) ->
case file:write(Fd, size_bin(BTerms, [])) of
ok ->
ok;
Error ->
file_error(FileName, Error)
end.
post_funs(L) ->
end_all_merge_joins(),
local_post(L).
local_post(L) ->
lists:foreach(fun(undefined) -> ok;
(F) -> catch (F)()
end, L).
call(undefined, _Arg, Default, _Post) ->
Default;
call(Fun, Arg, _Default, Post) ->
try
Fun(Arg)
catch Class:Reason ->
post_funs(Post),
erlang:raise(Class, Reason, erlang:get_stacktrace())
end.
grd(undefined, _Arg) ->
false;
grd(Fun, Arg) ->
case Fun(Arg) of
true ->
true;
_ ->
false
end.
family(L) ->
sofs:to_external(sofs:relation_to_family(sofs:relation(L))).
family_union(L) ->
R = sofs:relation(L,[{atom,[atom]}]),
sofs:to_external(sofs:family_union(sofs:relation_to_family(R))).
file_error(File, {error, Reason}) ->
error({file_error, File, Reason}).
-spec throw_file_error(string(), {'error',atom()}) -> no_return().
throw_file_error(File, {error, Reason}) ->
throw_reason({file_error, File, Reason}).
-spec throw_reason(term()) -> no_return().
throw_reason(Reason) ->
throw_error(error(Reason)).
-spec throw_error(term()) -> no_return().
throw_error(Error) ->
throw(Error).
error(Reason) ->
{error, ?MODULE, Reason}.