Can this be achieved with EnumerableBatchNestedLoopJoin? You would need to
make the JdbcFilterRule and its relation handle correlation variables [2] so
they can push the filters down into the JDBC and Elasticsearch RPCs. The
EnumerableBatchNestedLoopJoinRule [1] pushes a filter relation on top of the
right-hand scan. If JdbcFilterRule translates correlation variables
into conditions in the JDBC query string, I think you can make it all
work.
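
To make that last point concrete, here is a minimal sketch in plain Java (not
Calcite's API) of what turning one batch of correlation variables into a JDBC
condition could look like. The class and method names are made up for
illustration, and it assumes a single join key column; each `?` would be bound
to one correlated key value before the query is sent.

```java
import java.util.Collections;

public class CorrelatedFilterSketch {
  /**
   * Builds a parameterized probe query that restricts `column` to one batch
   * of join keys. Hypothetical helper, not part of Calcite.
   */
  static String buildProbeSql(String table, String column, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    // One placeholder per correlated key in the batch.
    String placeholders = String.join(", ", Collections.nCopies(batchSize, "?"));
    return "SELECT * FROM " + table + " WHERE " + column + " IN (" + placeholders + ")";
  }

  public static void main(String[] args) {
    System.out.println(buildProbeSql("\"user_tab\"", "\"email\"", 3));
  }
}
```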

I spent this past week doing something similar, though not precisely this.
I wrote our own rule and relation that call correlateBatchJoin [3]
directly.
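
Roughly, the batch-probe pattern looks like the sketch below. This is plain
Java and does not call correlateBatchJoin or any Calcite class;
BatchProbeJoinSketch, batchJoin and the probe function are hypothetical names,
and rows are reduced to their join key to keep it short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BatchProbeJoinSketch {
  /**
   * Joins left keys to right keys, calling `probe` once per batch of left
   * keys instead of once per row. `probe` stands in for a remote query that
   * returns only the right-side keys matching the batch.
   */
  static List<String[]> batchJoin(List<String> leftKeys, int batchSize,
      Function<List<String>, List<String>> probe) {
    List<String[]> out = new ArrayList<>();
    for (int i = 0; i < leftKeys.size(); i += batchSize) {
      List<String> batch =
          leftKeys.subList(i, Math.min(i + batchSize, leftKeys.size()));
      // One round trip per batch; the right side is already filtered.
      for (String right : probe.apply(batch)) {
        for (String left : batch) {
          if (left.equals(right)) {
            out.add(new String[] {left, right});
          }
        }
      }
    }
    return out;
  }
}
```

With a batch size of 2 and three left keys, the probe runs twice rather than
three times, and the right side never has to be scanned in full.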

[1]
https://github.com/apache/calcite/blob/b80bb1cbceb11ed31b73e419916b5cc98610503e/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableBatchNestedLoopJoinRule.java#L124-L126
[2]
https://github.com/apache/calcite/blob/b80bb1cbceb11ed31b73e419916b5cc98610503e/core/src/main/java/org/apache/calcite/rex/RexCorrelVariable.java
[3]
https://github.com/apache/calcite/blob/b80bb1cbceb11ed31b73e419916b5cc98610503e/linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java#L1492

On Sat, Mar 7, 2020 at 7:11 AM Stamatis Zampetakis <[email protected]>
wrote:

> Hi Yang,
>
> Another term that is used for the optimization that you mention is
> "selective join pushdown" which essentially relies on Bloom/Cuckoo and
> other probabilistic filters. You can check [1] for more details about this
> kind of technique.
>
> In the example that you outlined between JDBC and Elastic maybe you could
> achieve the same result with a slightly different approach by using a
> correlated join. If the scan + filter on Elastic does not bring back many
> results then you could use these results to probe the JDBC datasource. For
> more details check the discussion in [2]; I think it refers to the same
> problem.
>
> Best,
> Stamatis
>
> [1] http://www.vldb.org/pvldb/vol12/p502-lang.pdf
> [2]
>
> https://lists.apache.org/thread.html/d9f95683e66009872a53e7e617295158b98746b550d2bf68230b3096%40%3Cdev.calcite.apache.org%3E
>
> On Sat, Mar 7, 2020 at 4:16 AM Yang Liu <[email protected]> wrote:
>
> > Thanks all!
> >
> > @Julian is the “split processing into phases” you are referring to like
> > this?
> >
> > with t1 as (select * from es_table where xxx limit xxx)
> > select * from t2 join t1 on (t2.key = t1.key) where t2.key in (select key
> > from t1)
> >
> > which means the SQL writer needs to adapt to this specific form of SQL for
> > better performance? And Calcite will cache t1, right?
> >
> > Or maybe I can implement a RelRunner or EnumerableHashJoin myself with
> > a specific rule: the query result of the right table can be used as a
> > filter for the left table?
> >
> > Thanks!
> >
> >
> > On Sat, Mar 7, 2020 at 1:48 AM Julian Hyde <[email protected]> wrote:
> >
> > > Runtime optimization is always necessary, because you just don’t have
> > > the stats until you run the query. The best DB algorithms are adaptive,
> > > and therefore hard to write. The adaptations require a lot of tricky
> > > support from the runtime - e.g. propagating bloom filters against the
> > > flow of data.
> > >
> > > Calcite can still help a little.
> > >
> > > One runtime optimization is where you split processing into phases.
> > > Only optimize the first part of your query. Build temp tables, analyze
> > > them, and use those stats to optimize the second part of your query.
> > >
> > > Another technique is to gather stats as you run the query today, so
> > > that when you run it tomorrow Calcite can do a better job.
> > >
> > > Julian
> > >
> > >
> > > > On Mar 6, 2020, at 5:52 AM, Danny Chan <[email protected]> wrote:
> > > >
> > > > Sorry to say that the Calcite runtime does not support this. "Dynamic
> > > > partition pruning", or "runtime filter" as it is called in Impala,
> > > > builds a bloom filter on the join keys of the build-side table and
> > > > pushes it down to the probe-side table source, so in some cases it
> > > > can reduce the amount of data.
> > > >
> > > > On Fri, Mar 6, 2020 at 6:54 PM Yang Liu <[email protected]> wrote:
> > > >
> > > >> I discussed this with one of our user groups; in Spark 3.0, this is
> > > >> called "dynamic partition pruning".
> > > >>
> > > >> On Fri, Mar 6, 2020 at 6:12 PM Yang Liu <[email protected]> wrote:
> > > >>
> > > >>> Hi,
> > > >>>
> > > >>> I am wondering if Calcite will support "lazy optimization"
> > > >>> (execution-time optimization / runtime optimization).
> > > >>>
> > > >>> For example, we want to do an inner join between an Elasticsearch
> > > >>> table and a MySQL table, like this:
> > > >>>
> > > >>> WITH logic_table_2 AS
> > > >>>  (SELECT _MAP['status'] AS "status",
> > > >>>          _MAP['user'] AS "user"
> > > >>>   FROM "es"."insight-by-sql-v3"
> > > >>>   LIMIT 12345)
> > > >>> SELECT *
> > > >>> FROM "insight_user"."user_tab" AS t1
> > > >>> JOIN logic_table_2 AS t2 ON t1."email" = t2."user"
> > > >>> WHERE t2."status" = 'fail'
> > > >>> LIMIT 10
> > > >>>
> > > >>> t2 is an ES table and t1 is a MySQL table, and it may generate an
> > > >>> execution plan like this:
> > > >>>
> > > >>> EnumerableProject(id=[$2], name=[$3], email=[$4], desc=[$5],
> > > >>> is_super=[$6], create_time=[$7], has_all_access=[$8], status=[$0],
> > > >>> user=[$1])
> > > >>>  EnumerableLimit(fetch=[10])
> > > >>>    EnumerableHashJoin(condition=[=($1, $4)], joinType=[inner])
> > > >>>      ElasticsearchToEnumerableConverter
> > > >>>        ElasticsearchProject(status=[ITEM($0, 'status')], user=[ITEM($0, 'user')])
> > > >>>          ElasticsearchFilter(condition=[=(ITEM($0, 'status'), 'fail')])
> > > >>>            ElasticsearchSort(fetch=[12345])
> > > >>>              ElasticsearchTableScan(table=[[es, insight-by-sql-v3]])
> > > >>>      JdbcToEnumerableConverter
> > > >>>        JdbcTableScan(table=[[insight_user, user_tab]])
> > > >>>
> > > >>> Since the ES query has a filter here, at execution time Calcite will
> > > >>> run the ES query first and get the build table, then do the
> > > >>> JdbcTableScan to get the probe table, and finally do the HashJoin.
> > > >>>
> > > >>> But since this is an INNER JOIN, there is an implicit filter on the
> > > >>> later JdbcTableScan:
> > > >>> ``` t1.email in (select user from t2 where t2.status='fail') ```.
> > > >>> If this implicit filter were applied, the dataset we handle might
> > > >>> become extremely small (saving memory) and the query would run much
> > > >>> faster, since the full JdbcTableScan is always time-consuming. But
> > > >>> since Calcite does its optimization in the planner phase, this
> > > >>> dynamic/lazy optimization seems to be missed ...
> > > >>>
> > > >>> To summarize, serial execution with a "lazy optimization" may be
> > > >>> faster and use less memory than parallel execution with an optimized
> > > >>> execution plan, since the former can reduce the dataset we handle.
> > > >>>
> > > >>> Any ideas?
> > > >>>
> > > >>>
> > > >>>
> > > >>
> > >
> > >
> >
>
