Hi all,
Thanks for all the feedback so far.
If there are no more suggestions, I would like to start a vote on Tuesday
next week (18 Jan).
Best,
Jing Zhang

On Wed, Jan 5, 2022 at 11:33 Jing Zhang <beyond1...@gmail.com> wrote:

> Hi Francesco,
> Thanks a lot for the feedback.
>
> > does it make sense for a lookup join to use hash distribution whenever
> > possible by default?
> I prefer to enable the hash lookup join only when the hint is present in
> the query, for the following reasons:
> 1. Plan compatibility
>     Adding a hash shuffle by default would change the plan after
> users upgrade their Flink version.
>     Besides, lookup join is a commonly used feature in Flink SQL.
> 2. Not all Flink jobs would benefit from this improvement.
>     It is a trade-off for lookup joins with dimension connectors that
> have a cache inside.
>     We hope to raise the cache hit ratio with Hash Lookup Join, but it
> also introduces an extra shuffle.
>     It is not always a positive optimization, especially for
> connectors that do not have a cache inside.
>
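To make the trade-off in point 2 concrete, here is a minimal Python sketch (not Flink code). It assumes a hypothetical per-task LRU cache, 4 parallel tasks, and a uniform stream of lookup keys; `LruCache`, `hit_ratio` and the two routing lambdas are illustrative names only. It compares round-robin routing with routing by lookup key and does not model the cost of the extra shuffle.

```python
import random
from collections import OrderedDict


class LruCache:
    """Tiny LRU cache standing in for a connector's lookup cache (assumption)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()
        self.hits = 0
        self.misses = 0

    def lookup(self, key):
        if key in self.entries:
            # Cache hit: mark the key as most recently used.
            self.entries.move_to_end(key)
            self.hits += 1
        else:
            # Cache miss: this is where the RPC to the external system would go.
            self.misses += 1
            self.entries[key] = True
            if len(self.entries) > self.capacity:
                self.entries.popitem(last=False)  # evict least recently used


def hit_ratio(keys, parallelism, capacity, route):
    """Send each key to the task chosen by `route` and return the overall hit ratio."""
    caches = [LruCache(capacity) for _ in range(parallelism)]
    for i, key in enumerate(keys):
        caches[route(i, key) % parallelism].lookup(key)
    return sum(c.hits for c in caches) / len(keys)


rng = random.Random(42)
keys = [rng.randrange(400) for _ in range(20000)]  # 400 distinct lookup keys

# Default behaviour in this sketch: records are spread without regard to the key.
round_robin = hit_ratio(keys, 4, 100, lambda i, key: i)
# Hash lookup join in this sketch: the same key always reaches the same task.
hashed = hit_ratio(keys, 4, 100, lambda i, key: key)

print(f"round robin: {round_robin:.2f}, hashed: {hashed:.2f}")
```

With these assumed numbers, each task's share of the keys fits in its cache under keyed routing, so every key misses at most once; under round-robin routing each task sees the whole key space and keeps evicting. For a connector without a cache, neither routing helps and only the shuffle cost remains.
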
> > Shouldn't the hint take the table alias as the "table name"? What if
> > you do two lookup joins in cascade within the same query with the same
> > table (once on a key, then on another one), where you use two different
> > aliases for the table?
> In theory, it would be better to support both table names and aliases.
> But in Calcite, the alias of a subquery or table is lost during
> the SQL conversion and SQL optimization phases.
> So here we only support table names.
>
> Best,
> Jing Zhang
>
>
> On Mon, Jan 3, 2022 at 18:38 Francesco Guardiani <france...@ververica.com> wrote:
>
>> Hi Jing,
>>
>> Thanks for the FLIP. I'm not very knowledgeable about the topic, but going
>> through both the FLIP and the discussion here, I wonder, does it make
>> sense for a lookup join to use hash distribution whenever possible by
>> default?
>>
>> The point you're explaining here:
>>
>> > Many Lookup table sources introduce cache in order
>> > to reduce the RPC call, such as JDBC, CSV, HBase connectors.
>> > For those connectors, we could raise cache hit ratio by routing the same
>> > lookup keys to the same task instance
>>
>> This seems like something we can infer automatically, rather than manually
>> asking the user to add this hint to the query. Note that I'm not talking
>> against the hint syntax, which might still make sense to be introduced, but I
>> feel like this optimization makes sense in the general case when using the
>> connectors you have quoted. Perhaps there is some downside I'm not aware of?
>>
>> Talking about the hints themselves, taking this example as reference:
>>
>> SELECT /*+ SHUFFLE_HASH('Orders', 'Customers') */ o.order_id, o.total,
>> c.country, c.zip
>> FROM Orders AS o
>> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>> ON o.customer_id = c.id;
>>
>> Shouldn't the hint take the table alias as the "table name"? What if you do
>> two lookup joins in cascade within the same query with the same table (once
>> on a key, then on another one), where you use two different aliases for the
>> table?
>>
>>
>> On Fri, Dec 31, 2021 at 9:56 AM Jing Zhang <beyond1...@gmail.com> wrote:
>>
>> > Hi Lincoln,
>> > Thanks for the feedback.
>> >
>> > > 1. For the hint name, +1 for WenLong's proposal.
>> >
>> > I've added 'SHUFFLE_HASH' to the other alternatives in the FLIP. Let's
>> > wait for more voices here.
>> >
>> > > Regarding the `SKEW` hint, agree with you that it can be used widely,
>> and
>> > I
>> > prefer to treat it as a metadata hint, a new category differs from a
>> join
>> > hint.
>> > For your example:
>> > ```
>> > SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */
>> o.order_id,
>> > o.total, c.country, c.zip
>> > FROM Orders AS o
>> > JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>> > ON o.customer_id = c.id;
>> > ```
>> > I would prefer another form:
>> > ```
>> > -- provide the skew info to let the engine choose the optimal plan
>> > SELECT /*+ SKEW('Orders') */ o.order_id, ...
>> >
>> > -- or introduce a new hint for the join case, e.g.,
>> > SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
>> > ```
>> >
>> > Maybe there is a misunderstanding here.
>> > I just used syntactic sugar here.
>> >
>> > SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
>> > ....
>> >
>> > is just shorthand for
>> >
>> > SELECT /*+ USE_HASH('Orders', 'Customers') */ /*+SKEW('Orders') */
>> > o.order_id,
>> > ....
>> >
>> > Although I list the 'USE_HASH' and 'SKEW' hints in one query hint clause, it
>> > does not mean they must appear together as a whole.
>> > Based on the Calcite syntax doc [1], you can list more than one hint in
>> > a /*+' hint [, hint ]* '*/ clause.
>> >
>> > Each hint has a different function.
>> > The 'USE_HASH' hint suggests that the optimizer use a hash partitioner for
>> > the Lookup Join of table 'Orders' and table 'Customers', while the 'SKEW'
>> > hint tells the optimizer the skew metadata about the table 'Orders'.
>> >
>> > Best,
>> > Jing Zhang
>> >
>> > [1] https://calcite.apache.org/docs/reference.html#sql-hints
>> >
>> >
>> >
>> >
>> > On Fri, Dec 31, 2021 at 16:39 Jing Zhang <beyond1...@gmail.com> wrote:
>> >
>> > > Hi Martijn,
>> > > Thanks for the feedback.
>> > >
>> > > Glad to hear that we reached a consensus on the first and second points.
>> > >
>> > > About whether to use `use_hash` as a term, I think your concern makes
>> > > sense.
>> > > Although the hash lookup join is similar to the hash join in Oracle in that
>> > > both require hash distribution on the input, there is a small difference
>> > > between them.
>> > > About this point, Lincoln and WenLong both prefer the term 'SHUFFLE_HASH',
>> > >
>> > > Best,
>> > > Jing Zhang
>> > >
>> > >
>> > > On Thu, Dec 30, 2021 at 11:21 Lincoln Lee <lincoln.8...@gmail.com> wrote:
>> > >
>> > >> Hi Jing,
>> > >>     Thanks for your explanation!
>> > >>
>> > >> 1. For the hint name, +1 for WenLong's proposal. I think the `SHUFFLE`
>> > >> keyword is important: in a classic distributed computing system,
>> > >> a hash join usually means there is a shuffle stage (including shuffle
>> > >> hash join and broadcast hash join). Users only need to pass the `build`
>> > >> side table (usually the smaller one) into the `SHUFFLE_HASH` join hint,
>> > >> which is more concise than `USE_HASH(left_table, right_table)`. Please
>> > >> correct me if my understanding is wrong.
>> > >> Regarding the `SKEW` hint, I agree with you that it can be used widely, and
>> > >> I prefer to treat it as a metadata hint, a new category different from a
>> > >> join hint.
>> > >> For your example:
>> > >> ```
>> > >> SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */
>> > o.order_id,
>> > >> o.total, c.country, c.zip
>> > >> FROM Orders AS o
>> > >> JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>> > >> ON o.customer_id = c.id;
>> > >> ```
>> > >> I would prefer another form:
>> > >> ```
>> > >> -- provide the skew info to let the engine choose the optimal plan
>> > >> SELECT /*+ SKEW('Orders') */ o.order_id, ...
>> > >>
>> > >> -- or introduce a new hint for the join case, e.g.,
>> > >> SELECT /*+ REPLICATED_SHUFFLE_HASH('Orders') */ o.order_id, ...
>> > >> ```
>> > >>
>> > >> 2. Agree with Martijn on adding the feature to 1.16; we need time to
>> > >> complete the change in Calcite and also the upgrade work.
>> > >>
>> > >> 3. I misunderstood the 'Other Alternatives' part as the 'Rejected' ones
>> > >> in the FLIP doc. My point is to avoid the hacky way as far as we can.
>> > >> There are potential issues with Calcite's hint propagation, e.g., join hints
>> > >> must propagate correctly into the proper join scope, including subqueries
>> > >> or views, which may contain various SQL operators, so we should check all
>> > >> kinds of operators for correct propagation. Hope this helps. Also cc
>> > >> @Shuo Cheng, who may offer more help.
>> > >>
>> > >>
>> > >> Best,
>> > >> Lincoln Lee
>> > >>
>> > >>
>> > >> On Wed, Dec 29, 2021 at 22:21 Martijn Visser <mart...@ververica.com> wrote:
>> > >>
>> > >> > Hi Jing,
>> > >> >
>> > >> > Thanks for explaining this in more detail and also to others
>> > >> > participating.
>> > >> >
>> > >> > > I think using query hints in this case is more natural for users,
>> > >> WDYT?
>> > >> >
>> > >> > Yes, I agree. As long as we properly explain in our documentation
>> that
>> > >> we
>> > >> > support both Query Hints and Table Hints, what's the difference
>> > between
>> > >> > them and how to use them, I think our users can understand this
>> > >> perfectly.
>> > >> >
>> > >> > > I admit upgrading from Calcite 1.26 to 1.30 would be a big
>> change.
>> > >> > However we could not always avoid upgrade for the following reason
>> > >> >
>> > >> > We have to upgrade Calcite. We actually considered putting that in
>> the
>> > >> > Flink 1.15 scope but ultimately had to drop it, but I definitely
>> think
>> > >> this
>> > >> > needs to be done for 1.16. It's not only because of new features
>> that
>> > >> are
>> > >> > depending on Calcite upgrades, but also because newer versions have
>> > >> > resolved bugs that also hurt our users. That's why we also already
>> > have
>> > >> > tickets for upgrading to Calcite 1.27 [1] and 1.28 [2].
>> > >> >
>> > >> > With regards to using `use_hash` as a term, I think the most important
>> > >> > part is that if we re-use a term that Oracle is using, the behaviour
>> > >> > and outcome should be the same as or comparable to those of (in this
>> > >> > case) Oracle. If their behaviour and outcome are not the same or
>> > >> > comparable, I would probably introduce our own term to avoid confusing
>> > >> > users.
>> > >> >
>> > >> > Best regards,
>> > >> >
>> > >> > Martijn
>> > >> >
>> > >> > [1] https://issues.apache.org/jira/browse/FLINK-20873
>> > >> > [2] https://issues.apache.org/jira/browse/FLINK-21239
>> > >> >
>> > >> > On Wed, 29 Dec 2021 at 14:18, Jing Zhang <beyond1...@gmail.com>
>> > wrote:
>> > >> >
>> > >> > > Hi Jian gang,
>> > >> > > Thanks for the feedback.
>> > >> > >
>> > >> > > > When it comes to hive, how do you load partial data instead of
>> the
>> > >> > >    whole data? Any change related with hive?
>> > >> > >
>> > >> > > This question is the same as the one Yuan raised before.
>> > >> > > I prefer to drive another FLIP on this topic for further discussion,
>> > >> > > because this point involves many API extensions.
>> > >> > > Here I would like to first share the implementation in our internal
>> > >> > > version; it may be very different from the final solution that gets
>> > >> > > merged into the community.
>> > >> > > The core idea is to push the partitioner information down to the lookup
>> > >> > > table source.
>> > >> > > The Hive connector also needs changes. When loading data into caches,
>> > >> > > each task stores only the records whose lookup keys are sent to the
>> > >> > > current task.
>> > >> > >
>> > >> > > > How to define the cache configuration? For example, the size
>> and
>> > the
>> > >> > ttl.
>> > >> > >
>> > >> > > I'm afraid there is no unified cache configuration or cache
>> > >> > > implementation across different connectors yet.
>> > >> > > You can find the cache size and TTL config for JDBC in doc [1] and for
>> > >> > > HBase in doc [2].
>> > >> > >
>> > >> > > >  Will this feature add another shuffle phase compared with the
>> > >> default
>> > >> > >    behavior? In what situations will user choose this feature?
>> > >> > >
>> > >> > > Yes, if the user specifies the hash hint in the query, the optimizer
>> > >> > > would prefer to choose Hash Lookup Join, which adds a hash shuffle.
>> > >> > > If the lookup table source has a cache inside (for example HBase/JDBC)
>> > >> > > and the benefit of a higher cache hit ratio outweighs the cost of the
>> > >> > > extra shuffle, the user could use Hash Lookup Join.
>> > >> > >
>> > >> > > >  For the keys, the default implementation will be ok. But I
>> wonder
>> > >> > > whether we can support more flexible strategies.
>> > >> > >
>> > >> > > This question is the same as the one Yuan raised before.
>> > >> > >
>> > >> > > I'm afraid there is no plan to support flexible strategies yet, because
>> > >> > > the feature involves many things, for example:
>> > >> > > 1. SQL syntax
>> > >> > > 2. user-defined partitioner API
>> > >> > > 3. RelDistribution type extension and Flink RelDistribution extension
>> > >> > > 4. FlinkExpandConversionRule
>> > >> > > 5. Exchange execNode extension
>> > >> > > 6. ....
>> > >> > > It needs careful design and more discussion. If this is a strong
>> > >> > > requirement, we would drive a separate discussion on this point.
>> > >> > > In this FLIP, I would first support hash shuffle. WDYT?
>> > >> > >
>> > >> > > Best,
>> > >> > > Jing Zhang
>> > >> > >
>> > >> > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/#connector-options
>> > >> > > [2] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/hbase/#connector-options
>> > >> > >
>> > >> > > On Wed, Dec 29, 2021 at 20:37 Jing Zhang <beyond1...@gmail.com> wrote:
>> > >> > >
>> > >> > > > Hi Wenlong,
>> > >> > > > Thanks for the feedback.
>> > >> > > > I've checked similar syntax in other systems; they are all different
>> > >> > > > from each other. There seems to be no consensus.
>> > >> > > > As mentioned in FLIP-204, Oracle uses a query hint, and the hint name
>> > >> > > > is 'use_hash' [1].
>> > >> > > > Spark also uses a query hint; its name is 'SHUFFLE_HASH' [2].
>> > >> > > > SQL Server uses the keyword 'HASH' instead of a query hint [3].
>> > >> > > > Note that the purposes of hash shuffle in [1][2][3] are a little
>> > >> > > > different from the purpose of FLIP-204; we only discuss syntax here.
>> > >> > > >
>> > >> > > > I've added this part to the FLIP, awaiting further discussion.
>> > >> > > >
>> > >> > > > Best,
>> > >> > > > Jing Zhang
>> > >> > > >
>> > >> > > > [1] https://docs.oracle.com/cd/B12037_01/server.101/b10752/hintsref.htm#5683
>> > >> > > > [2] https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html
>> > >> > > > [3] https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-join?view=sql-server-ver15
>> > >> > > >
>> > >> > > >
>> > >> > > > On Wed, Dec 29, 2021 at 17:18 wenlong.lwl <wenlong88....@gmail.com> wrote:
>> > >> > > >
>> > >> > > >> Hi, Jing, thanks for driving the discussion.
>> > >> > > >>
>> > >> > > >> Have you investigated the syntax of join hints?
>> > >> > > >> Why did you choose USE_HASH from Oracle instead of the Spark style
>> > >> > > >> SHUFFLE_HASH? They are quite different.
>> > >> > > >> People in the big data world may be more familiar with Spark/Hive; if
>> > >> > > >> we need to choose one, personally, I prefer the Spark style.
>> > >> > > >>
>> > >> > > >>
>> > >> > > >> Best,
>> > >> > > >> Wenlong
>> > >> > > >>
>> > >> > > >> On Wed, 29 Dec 2021 at 16:48, zst...@163.com <zst...@163.com>
>> > >> wrote:
>> > >> > > >>
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> > Hi Jing,
>> > >> > > >> > Thanks for your detail reply.
>> > >> > > >> > 1) In the last suggestion, hashing by primary key is not used for
>> > >> > > >> > raising the cache hit ratio, but for handling skew of the left source.
>> > >> > > >> > Now that you have the 'skew' hint and other discussion about it, I'm
>> > >> > > >> > looking forward to it.
>> > >> > > >> > 2) I mean supporting a user-defined partitioner function. We have a
>> > >> > > >> > case of joining a data lake source with a special way of partitioning,
>> > >> > > >> > and have implemented it, not elegantly, in our internal version. As you
>> > >> > > >> > said, it needs more design.
>> > >> > > >> > 3) I think the so-called 'HashPartitionedCache' is useful; otherwise
>> > >> > > >> > loading all data, as the Hive lookup table source does, is almost
>> > >> > > >> > infeasible with big data.
>> > >> > > >> >
>> > >> > > >> >
>> > >> > > >> > Best regards,
>> > >> > > >> > Yuan
>> > >> > > >> >
>> > >> > > >> > At 2021-12-29 14:52:11, "Jing Zhang" <beyond1...@gmail.com> wrote:
>> > >> > > >> > >Hi, Lincoln
>> > >> > > >> > >Thanks a lot for the feedback.
>> > >> > > >> > >
>> > >> > > >> > >>  Regarding the hint name ‘USE_HASH’, could we consider
>> more
>> > >> > > >> candidates?
>> > >> > > >> > >Things are a little different from RDBMS in the distributed
>> > >> world,
>> > >> > > and
>> > >> > > >> we
>> > >> > > >> > >also aim to solve the data skew problem, so all these
>> incoming
>> > >> > hints
>> > >> > > >> names
>> > >> > > >> > >should be considered together.
>> > >> > > >> > >
>> > >> > > >> > >About the skew problem, I would discuss it in a separate FLIP. I would
>> > >> > > >> > >like to share the hint proposal for skew here.
>> > >> > > >> > >I want to introduce a 'skew' hint, which is a query hint similar to the
>> > >> > > >> > >skew hint in Spark [1] and MaxCompute [2].
>> > >> > > >> > >In its simplest form, the 'skew' hint contains only the name of the
>> > >> > > >> > >skewed table.
>> > >> > > >> > >Besides, the skew hint could accept a table name and column names.
>> > >> > > >> > >In addition, the skew hint could accept a table name, column names and
>> > >> > > >> > >skew values.
>> > >> > > >> > >For example:
>> > >> > > >> > >
>> > >> > > >> > >SELECT /*+ USE_HASH('Orders', 'Customers'), SKEW('Orders') */ o.order_id,
>> > >> > > >> > >o.total, c.country, c.zip
>> > >> > > >> > >FROM Orders AS o
>> > >> > > >> > >JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
>> > >> > > >> > >ON o.customer_id = c.id;
>> > >> > > >> > >
>> > >> > > >> > >The 'skew' hint is not only for lookup join; it could also be used
>> > >> > > >> > >for other types of join later, for example, batch hash join or
>> > >> > > >> > >streaming regular join.
>> > >> > > >> > >Back to the question of a better name for hash lookup join: since the
>> > >> > > >> > >'skew' hint is a separate hint, 'use_hash' is still an alternative.
>> > >> > > >> > >WDYT?
>> > >> > > >> > >I don't have a good idea for a better hint name yet. I would like to
>> > >> > > >> > >hear more suggestions about hint names.
>> > >> > > >> > >
>> > >> > > >> > >>  As you mentioned in the flip, this solution depends on
>> > future
>> > >> > > >> changes
>> > >> > > >> > to
>> > >> > > >> > >calcite (and also upgrading calcite would be another
>> possible
>> > >> big
>> > >> > > >> change:
>> > >> > > >> > >at least calicite-1.30 vs 1.26, are we preparing to accept
>> > this
>> > >> big
>> > >> > > >> > >change?).
>> > >> > > >> > >
>> > >> > > >> > >Indeed, solution 1 depends on the Calcite upgrade.
>> > >> > > >> > >I admit that upgrading from Calcite 1.26 to 1.30 would be a big change.
>> > >> > > >> > >I still remember what we suffered during the last upgrade to Calcite 1.26.
>> > >> > > >> > >However, we cannot avoid upgrading forever, for the following reasons:
>> > >> > > >> > >1. Other features also depend on the Calcite upgrade, for example,
>> > >> > > >> > >Session Window and Count Window.
>> > >> > > >> > >2. If we keep avoiding the Calcite upgrade, the gap with the latest
>> > >> > > >> > >version will grow. If one day upgrading becomes unavoidable, the pain
>> > >> > > >> > >will be greater.
>> > >> > > >> > >
>> > >> > > >> > >WDYT?
>> > >> > > >> > >
>> > >> > > >> > >>  Is there another possible way to minimize the change in
>> > >> calcite?
>> > >> > > >> > >
>> > >> > > >> > >Did you check the 'Other Alternatives' part in FLIP-204? It gives
>> > >> > > >> > >another solution, which does not depend on the Calcite upgrade and does
>> > >> > > >> > >not need to worry about the hint being lost during propagation.
>> > >> > > >> > >This is also what we have done in the internal version.
>> > >> > > >> > >The core idea is propagating the 'use_hash' hint to the TableScan with
>> > >> > > >> > >matching table names. However, it is a little hacky.
>> > >> > > >> > >
>> > >> > > >> > >> As I know there're more limitations than `Correlate`.
>> > >> > > >> > >
>> > >> > > >> > >As mentioned before, in our internal version, I chose the 'Other
>> > >> > > >> > >Alternatives' approach from FLIP-204.
>> > >> > > >> > >Although I did a POC of solution 1 and listed all the changes I found
>> > >> > > >> > >in the FLIP, there may still be something I missed.
>> > >> > > >> > >I'm very happy to hear you point out that there are more limitations
>> > >> > > >> > >besides `Correlate`; would you please give more details on this part?
>> > >> > > >> > >
>> > >> > > >> > >Best,
>> > >> > > >> > >Jing Zhang
>> > >> > > >> > >
>> > >> > > >> > >[1] https://docs.databricks.com/delta/join-performance/skew-join.html
>> > >> > > >> > >[2] https://help.aliyun.com/apsara/enterprise/v_3_13_0_20201215/odps/enterprise-ascm-user-guide/hotspot-tilt.html?spm=a2c4g.14484438.10001.669
>> > >> > > >> > >
>> > >> > > >> > >On Wed, Dec 29, 2021 at 14:40 Jing Zhang <beyond1...@gmail.com> wrote:
>> > >> > > >> > >
>> > >> > > >> > >> Hi Yuan and Lincoln,
>> > >> > > >> > >> thanks a lot for the attention. I will answer the emails one by one.
>> > >> > > >> > >>
>> > >> > > >> > >> To Yuan
>> > >> > > >> > >> > How shall we deal with CDC data? If there is CDC data
>> in
>> > the
>> > >> > > >> pipeline,
>> > >> > > >> > >> IMHO, shuffle by join key will cause CDC data disorder.
>> Will
>> > >> it
>> > >> > be
>> > >> > > >> > better
>> > >> > > >> > >> to use primary key in this case?
>> > >> > > >> > >>
>> > >> > > >> > >> Good question.
>> > >> > > >> > >> The problem exists not only for CDC data sources, but also whenever
>> > >> > > >> > >> the input stream is not an insert-only stream (for example, the result
>> > >> > > >> > >> of an unbounded aggregate or a regular join).
>> > >> > > >> > >> I think hashing by primary key is not a good choice. It cannot raise
>> > >> > > >> > >> the cache hit ratio, because the cache key is the lookup key instead of
>> > >> > > >> > >> the primary key of the input.
>> > >> > > >> > >>
>> > >> > > >> > >> To avoid wrong results, hash lookup join requires that the input stream
>> > >> > > >> > >> is an insert-only stream or that its upsert keys contain the lookup keys.
>> > >> > > >> > >>
>> > >> > > >> > >> I've added this limitation to FLIP, thanks a lot for
>> > >> reminding.
>> > >> > > >> > >>
>> > >> > > >> > >> > If the shuffle keys can be customized  when users have
>> the
>> > >> > > >> knowledge
>> > >> > > >> > >> about distribution of data?
>> > >> > > >> > >>
>> > >> > > >> > >> I'm not sure I understand your question.
>> > >> > > >> > >>
>> > >> > > >> > >> Do you mean supporting a user-defined partitioner function on keys,
>> > >> > > >> > >> just like in the Flink DataStream API?
>> > >> > > >> > >> If yes, I'm afraid there is no plan to support this feature yet, because
>> > >> > > >> > >> it involves many things, for example:
>> > >> > > >> > >> 1. SQL syntax
>> > >> > > >> > >> 2. user-defined partitioner API
>> > >> > > >> > >> 3. RelDistribution type extension and Flink RelDistribution extension
>> > >> > > >> > >> 4. FlinkExpandConversionRule
>> > >> > > >> > >> 5. Exchange execNode extension
>> > >> > > >> > >> 6. ....
>> > >> > > >> > >> It needs careful design and more discussion. If this is a strong
>> > >> > > >> > >> requirement, we would drive a separate discussion on this point.
>> > >> > > >> > >> In this FLIP, I would first support hash shuffle. WDYT?
>> > >> > > >> > >>
>> > >> > > >> > >> Or do you mean supporting hashing by keys other than the lookup key?
>> > >> > > >> > >> If yes, would you please tell me a specific use case?
>> > >> > > >> > >> We need to fetch the record from the external storage of the dimension
>> > >> > > >> > >> table by lookup key, so those dimension table sources use lookup keys
>> > >> > > >> > >> as cache keys.
>> > >> > > >> > >> We can only increase the cache hit ratio by shuffling on lookup keys.
>> > >> > > >> > >> I need more use cases to understand this requirement.
>> > >> > > >> > >>
>> > >> > > >> > >> > Some connectors such as hive, caches all data in
>> > >> > LookupFunction.
>> > >> > > >> How
>> > >> > > >> > to
>> > >> > > >> > >> decrease the valid cache data size if data can be
>> shuffled?
>> > >> > > >> > >>
>> > >> > > >> > >> Very good idea.
>> > >> > > >> > >> There are two types of cache.
>> > >> > > >> > >> For key-value storage, such as Redis/HBase, the lookup table source
>> > >> > > >> > >> stores the visited lookup keys and their records into the cache lazily.
>> > >> > > >> > >> For other storage without keys, such as Hive, each task loads all data
>> > >> > > >> > >> into the cache eagerly in the initialization phase.
>> > >> > > >> > >> After introducing the hash partitioner, nothing needs to change for
>> > >> > > >> > >> key-value storage; for Hive, each task could load only part of the data
>> > >> > > >> > >> instead of loading all of it.
>> > >> > > >> > >>
>> > >> > > >> > >> We have implemented this optimization in our internal version.
>> > >> > > >> > >> The core idea is to push the partitioner information down to the lookup
>> > >> > > >> > >> table source. When loading data into caches, each task stores only
>> > >> > > >> > >> those records whose lookup keys are sent to the current task.
>> > >> > > >> > >> We call this 'HashPartitionedCache'.
>> > >> > > >> > >>
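A rough Python sketch of the 'HashPartitionedCache' idea described above, under stated assumptions: `owner_task` is a hypothetical stand-in for the hash partitioner (Flink's actual key-to-subtask assignment is not reproduced here), and `load_partitioned_cache` is an illustrative name, not a connector API. Each task eagerly loads only the dimension rows whose lookup key would be routed to it.

```python
import zlib


def owner_task(lookup_key, parallelism):
    """Hypothetical partitioner: a stable hash of the key, modulo the parallelism."""
    return zlib.crc32(repr(lookup_key).encode("utf-8")) % parallelism


def load_partitioned_cache(rows, key_of, task_index, parallelism):
    """Eagerly load only the rows whose lookup key is routed to this task."""
    cache = {}
    for row in rows:
        key = key_of(row)
        if owner_task(key, parallelism) == task_index:
            cache.setdefault(key, []).append(row)
    return cache


# A made-up dimension table with 1000 rows, keyed by "id".
dimension_rows = [{"id": i, "country": "C%d" % (i % 7)} for i in range(1000)]
parallelism = 4

# One cache per task; together they cover the table, but no task holds all of it.
caches = [
    load_partitioned_cache(dimension_rows, lambda r: r["id"], t, parallelism)
    for t in range(parallelism)
]
print([len(c) for c in caches])
```

The sketch only works if the shuffle in front of the lookup join uses the same partitioner as the cache loader; that coupling is why the partitioner information has to be pushed down to the lookup table source.
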
>> > >> > > >> > >> I have added this point to the Lookup Join requirements list in the
>> > >> > > >> > >> motivation of the FLIP, but I would not address it in this FLIP right
>> > >> > > >> > >> now.
>> > >> > > >> > >> If this is a strong requirement, we need to drive a separate discussion
>> > >> > > >> > >> on this topic, because it involves many API extensions.
>> > >> > > >> > >>
>> > >> > > >> > >> Best,
>> > >> > > >> > >> Jing Zhang
>> > >> > > >> > >>
>> > >> > > >> > >>
>> > >> > > >> > >> On Wed, Dec 29, 2021 at 10:01 Lincoln Lee <lincoln.8...@gmail.com> wrote:
>> > >> > > >> > >>
>> > >> > > >> > >>> Hi Jing,
>> > >> > > >> > >>>     Thanks for bringing up this discussion!  Agree that these join hints
>> > >> > > >> > >>> should benefit both bounded and unbounded cases, as Martijn mentioned.
>> > >> > > >> > >>> I also agree that implementing the query hint is the right way for a
>> > >> > > >> > >>> more general purpose, since dynamic table options have a limited scope.
>> > >> > > >> > >>>    Some points I'd like to share are:
>> > >> > > >> > >>> 1. Regarding the hint name ‘USE_HASH’, could we consider more candidates?
>> > >> > > >> > >>> Things are a little different from RDBMS in the distributed world, and
>> > >> > > >> > >>> we also aim to solve the data skew problem, so all these incoming hint
>> > >> > > >> > >>> names should be considered together.
>> > >> > > >> > >>> 2. As you mentioned in the FLIP, this solution depends on future changes
>> > >> > > >> > >>> to Calcite (and upgrading Calcite would be another possibly big change:
>> > >> > > >> > >>> at least calcite-1.30 vs 1.26, are we prepared to accept this big
>> > >> > > >> > >>> change?). Is there another possible way to minimize the change in
>> > >> > > >> > >>> Calcite?
>> > >> > > >> > >>> As far as I know, there are more limitations than `Correlate`.
>> > >> > > >> > >>>
>> > >> > > >> > >>> Best,
>> > >> > > >> > >>> Lincoln Lee
>> > >> > > >> > >>>
>> > >> > > >> > >>>
>> > >> > > >> > >>> On Tue, Dec 28, 2021 at 23:04 Jing Zhang <beyond1...@gmail.com> wrote:
>> > >> > > >> > >>>
>> > >> > > >> > >>> > Hi Martijn,
>> > >> > > >> > >>> > Thanks a lot for your attention.
>> > >> > > >> > >>> > I'm sorry I didn't explain the motivation clearly. I would like to
>> > >> > > >> > >>> > explain it in detail, and then respond to your questions.
>> > >> > > >> > >>> > A lookup join is typically used to enrich a table with data that is
>> > >> > > >> > >>> > queried from an external system. Many lookup table sources introduce a
>> > >> > > >> > >>> > cache in order to reduce RPC calls, such as the JDBC, CSV and HBase
>> > >> > > >> > >>> > connectors.
>> > >> > > >> > >>> > For those connectors, we could raise the cache hit ratio by routing the
>> > >> > > >> > >>> > same lookup keys to the same task instance. This is the purpose of
>> > >> > > >> > >>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> > >> > > >> > >>> > .
>> > >> > > >> > >>> > Other cases might benefit from hash distribution, such as batch hash
>> > >> > > >> > >>> > join, as you mentioned. It is a cool idea, but it is not the purpose of
>> > >> > > >> > >>> > this FLIP; we could discuss it in FLINK-20670
>> > >> > > >> > >>> > <https://issues.apache.org/jira/browse/FLINK-20670>.
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > > - When I was reading about this topic [1] I was
>> > >> wondering if
>> > >> > > >> this
>> > >> > > >> > >>> feature
>> > >> > > >> > >>> > would be more beneficial for bounded use cases and
>> not so
>> > >> much
>> > >> > > for
>> > >> > > >> > >>> > unbounded use cases. What do you think?
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > As mentioned before, the purpose of Hash Lookup Join is to increase the
>> > >> > > >> > >>> > cache hit ratio, which is different from Oracle Hash Join. However, we
>> > >> > > >> > >>> > could use a similar hint syntax.
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
>> > >> > > >> > >>> > notice that all of the hints there are located at the end of the SQL
>> > >> > > >> > >>> > statement. In the FLIP, the use_hash is defined directly after the
>> > >> > > >> > >>> > 'SELECT' keyword. Can we somehow make this consistent for the user? Or
>> > >> > > >> > >>> > should the user be able to specify hints anywhere in its SQL statement?
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > Calcite supports hints in two locations [3]:
>> > >> > > >> > >>> > Query Hint: right after the SELECT keyword;
>> > >> > > >> > >>> > Table Hint: right after the referenced table name.
>> > >> > > >> > >>> > Flink already supports dynamic table options based on Calcite's hint
>> > >> > > >> > >>> > framework, as mentioned in the doc [2].
>> > >> > > >> > >>> > Besides, query hints are also important: they give the optimizer a hint
>> > >> > > >> > >>> > for choosing a better plan. Almost all popular databases and big-data
>> > >> > > >> > >>> > engines support SQL query hints, such as Oracle, Hive, and Spark.
>> > >> > > >> > >>> > I think using query hints in this case is more natural for users, WDYT?
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > I have updated the motivation part in the FLIP.
>> > >> > > >> > >>> > Thanks for the feedback!
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
>> > >> > > >> > >>> > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>> > >> > > >> > >>> > [3] https://calcite.apache.org/docs/reference.html#sql-hints
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > Best,
>> > >> > > >> > >>> > Jing Zhang
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > On Tue, 28 Dec 2021 at 22:02, Martijn Visser <mart...@ververica.com> wrote:
>> > >> > > >> > >>> >
>> > >> > > >> > >>> > > Hi Jing,
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > > Thanks a lot for the explanation and the FLIP. I definitely learned
>> > >> > > >> > >>> > > something when reading more about `use_hash`. My interpretation would
>> > >> > > >> > >>> > > be that the primary benefit of a hash lookup join would be improved
>> > >> > > >> > >>> > > performance by allowing the user to explicitly optimise the planner.
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > > I have a couple of questions:
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > > - When I was reading about this topic [1] I was wondering if this
>> > >> > > >> > >>> > > feature would be more beneficial for bounded use cases and not so much
>> > >> > > >> > >>> > > for unbounded use cases. What do you think?
>> > >> > > >> > >>> > > - If I look at the current documentation for SQL Hints in Flink [2], I
>> > >> > > >> > >>> > > notice that all of the hints there are located at the end of the SQL
>> > >> > > >> > >>> > > statement. In the FLIP, the use_hash is defined directly after the
>> > >> > > >> > >>> > > 'SELECT' keyword. Can we somehow make this consistent for the user? Or
>> > >> > > >> > >>> > > should the user be able to specify hints anywhere in its SQL statement?
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > > Best regards,
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > > Martijn
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > > [1] https://logicalread.com/oracle-11g-hash-joins-mc02/#.V5Wm4_mnoUI
>> > >> > > >> > >>> > > [2] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > > On Tue, 28 Dec 2021 at 08:17, Jing Zhang <beyond1...@gmail.com> wrote:
>> > >> > > >> > >>> > >
>> > >> > > >> > >>> > > > Hi everyone,
>> > >> > > >> > >>> > > > Lookup join
>> > >> > > >> > >>> > > > <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join> [1]
>> > >> > > >> > >>> > > > is a commonly used feature in Flink SQL. We have received many
>> > >> > > >> > >>> > > > optimization requirements for lookup join. For example:
>> > >> > > >> > >>> > > > 1. Enforce a hash partitioner on the left side of the lookup join to
>> > >> > > >> > >>> > > > raise the cache hit ratio
>> > >> > > >> > >>> > > > 2. Solve the data skew problem after introducing hash lookup join
>> > >> > > >> > >>> > > > 3. Enable mini-batch optimization to reduce RPC calls
>> > >> > > >> > >>> > > >
>> > >> > > >> > >>> > > > Next, we will solve these problems one by one. First, we will focus on
>> > >> > > >> > >>> > > > point 1, and continue to discuss point 2 and point 3 later.
>> > >> > > >> > >>> > > >
>> > >> > > >> > >>> > > > There are many similar requirements about hash lookup join from the
>> > >> > > >> > >>> > > > user mailing list and JIRA, for example:
>> > >> > > >> > >>> > > > 1. FLINK-23687 <https://issues.apache.org/jira/browse/FLINK-23687> -
>> > >> > > >> > >>> > > > Introduce partitioned lookup join to enforce input of LookupJoin to
>> > >> > > >> > >>> > > > hash shuffle by lookup keys
>> > >> > > >> > >>> > > > 2. FLINK-25396 <https://issues.apache.org/jira/browse/FLINK-25396> -
>> > >> > > >> > >>> > > > lookupjoin source table for pre-partitioning
>> > >> > > >> > >>> > > > 3. FLINK-25262 <https://issues.apache.org/jira/browse/FLINK-25262> -
>> > >> > > >> > >>> > > > Support to send data to lookup table for KeyGroupStreamPartitioner way
>> > >> > > >> > >>> > > > for SQL.
>> > >> > > >> > >>> > > >
>> > >> > > >> > >>> > > > In this FLIP, I would like to start a discussion about Hash Lookup
>> > >> > > >> > >>> > > > Join. The core idea is to introduce a 'USE_HASH' hint in the query.
>> > >> > > >> > >>> > > > This syntax is directly user-facing and therefore requires careful
>> > >> > > >> > >>> > > > design.
>> > >> > > >> > >>> > > > There are two ways to propagate this hint to LookupJoin in the
>> > >> > > >> > >>> > > > optimizer. We need further discussion to make the final decision.
>> > >> > > >> > >>> > > > Anyway, the difference between the two solutions is only about the
>> > >> > > >> > >>> > > > internal implementation and has no impact on the user.
>> > >> > > >> > >>> > > >
>> > >> > > >> > >>> > > > For more detail on the proposal:
>> > >> > > >> > >>> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
>> > >> > > >> > >>> > > >
>> > >> > > >> > >>> > > >
>> > >> > > >> > >>> > > > Looking forward to your feedback, thanks.
>> > >> > > >> > >>> > > >
>> > >> > > >> > >>> > > > Best,
>> > >> > > >> > >>> > > > Jing Zhang
>> > >> > > >> > >>> > > >
>> > >> > > >> > >>> > > > [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join
>> > >> > > >> > >>> > > >
>>
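
For readers of the quoted thread: the cache argument above (routing the same lookup keys to the same task instance raises the cache hit ratio) can be illustrated with a toy simulation. This is only a sketch under stated assumptions, not Flink code. The names LruCache and hit_ratio, the key and cache sizes, and the round-robin model of "no shuffle" are all made up for illustration.

```python
import random
from collections import OrderedDict


class LruCache:
    """Tiny LRU cache standing in for a lookup source's per-task cache (hypothetical)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def lookup(self, key):
        """Return True on a cache hit; on a miss, load the key and evict the oldest entry."""
        if key in self.entries:
            self.entries.move_to_end(key)
            return True
        self.entries[key] = None  # pretend the row was fetched via an RPC call
        if len(self.entries) > self.capacity:
            self.entries.popitem(last=False)
        return False


def hit_ratio(keys, num_tasks, cache_size, route):
    """Send each key to the task chosen by route(index, key) and measure cache hits."""
    caches = [LruCache(cache_size) for _ in range(num_tasks)]
    hits = sum(caches[route(i, key)].lookup(key) for i, key in enumerate(keys))
    return hits / len(keys)


# Assumed workload: 20000 lookups over 400 distinct keys, 4 tasks, 100 cache slots each.
rng = random.Random(42)
keys = [rng.randrange(400) for _ in range(20000)]
num_tasks, cache_size = 4, 100

# Without a hash shuffle, a key may land on any task (modelled here as round-robin).
round_robin = hit_ratio(keys, num_tasks, cache_size, lambda i, key: i % num_tasks)
# With a hash shuffle, a given key always lands on the same task.
hashed = hit_ratio(keys, num_tasks, cache_size, lambda i, key: key % num_tasks)

print(f"round-robin: {round_robin:.2f}, hash: {hashed:.2f}")
```

In this toy setup every task sees only its own quarter of the key space when keys are hashed, so each cache can hold its whole working set. The sketch ignores the cost of the extra shuffle, which the thread points out is the other side of the trade-off.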
