Hi.

As far as I know, the Upsert-Kafka connector does not implement the lookup 
table interface. Therefore, what you're describing is a temporal join[1] 
rather than a lookup join. Like a regular stream join in stream processing, 
it currently cannot replace the hash shuffle edge with a broadcast. 

By the way, this issue[2] was previously discussed but ultimately wasn't 
resolved.

Let's return to your use case. IIUC, the cost of shuffling for small tables 
should not be particularly high.




[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins

[2] https://lists.apache.org/thread/mnb872m4s1yww6dl5f680dz33synd9j8




--

    Best!
    Xuyang




At 2024-11-04 22:35:06, "Guillermo Ortiz Fernández" 
<guillermo.ortiz.f...@gmail.com> wrote:

We are trying to migrate a Kafka Streams application to Flink SQL. The Kafka 
Streams app uses GlobalKTables (GKTables) to avoid shuffles for the lookup 
tables. Is there an equivalent option in Flink?


On Mon, 4 Nov 2024 at 11:27, Guillermo Ortiz Fernández 
(<guillermo.ortiz.f...@gmail.com>) wrote:

The small table uses upsert-kafka, which doesn't support lookup tables. Do you 
know of another possibility? Thanks.


On Mon, 4 Nov 2024 at 11:02, Xuyang (<xyzhong...@163.com>) wrote:


Additionally, does the lookup table with CACHE[1][2] meet your needs? If so, 
you might need to use or implement a dimension table connector with cache.
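
As a rough sketch of what that could look like, assuming the small table can 
also be served from a JDBC database: the table name smalltable_dim, the JDBC 
URL, the column types, and the proc_time column on bigtable are all 
hypothetical, and the cache sizes are placeholders. The lookup.* options are 
the ones described in the JDBC connector documentation[2].

```sql
-- Hypothetical dimension table backed by JDBC with a partial lookup cache.
-- URL, table name, and column types are assumptions for illustration only.
CREATE TABLE smalltable_dim (
    cgi STRING,
    technology STRING,
    PRIMARY KEY (cgi) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/mydb',
    'table-name' = 'smalltable',
    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '10000',
    'lookup.partial-cache.expire-after-write' = '10 min'
);

-- Lookup join: assumes bigtable declares `proc_time AS PROCTIME()`.
SELECT
    b.eventTimestamp,
    b.field1,
    d.technology
FROM bigtable AS b
JOIN smalltable_dim FOR SYSTEM_TIME AS OF b.proc_time AS d
    ON b.cgi = d.cgi;
```

With a lookup join, each parallel instance of the big-table operator queries 
(and caches) the dimension table itself, so the small side is not shuffled.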




[1] https://issues.apache.org/jira/browse/FLINK-28415

[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/jdbc/#lookup-cache




--

    Best!
    Xuyang




At 2024-11-04 17:54:36, "Xuyang" <xyzhong...@163.com> wrote:

Hi, 

   The BROADCAST[1] join hint currently applies only to batch mode.
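
For comparison, a minimal sketch of where the hint would apply, assuming both 
tables are backed by bounded sources so the query can run in batch mode. The 
table and column names are taken from your query; the elided columns are left 
out, and the temporal FOR SYSTEM_TIME clause is dropped so that this is a 
regular join.

```sql
-- Switch the SQL client session to batch execution.
SET 'execution.runtime-mode' = 'batch';

-- Regular (non-temporal) join with the broadcast hint on the small side.
EXPLAIN PLAN FOR
SELECT /*+ BROADCAST(smalltable) */
    bigtable.eventTimestamp,
    bigtable.field1,
    smalltable.technology
FROM bigtable
JOIN smalltable
    ON bigtable.cgi = smalltable.cgi;
```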




[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#broadcast




--

    Best!
    Xuyang




At 2024-11-04 17:06:59, "Guillermo Ortiz Fernández" 
<guillermo.ortiz.f...@gmail.com> wrote:

Hi,


I'm running a simple query that joins two tables, where one table is much 
larger than the other, with the second table being very small. I believe it 
would be optimal to use a broadcast on the second table for the join. All my 
tests are being done locally, with very little data in either table. When I 
apply the hint to perform the broadcast and check the execution plan, I see 
that, whether the hint is present or not, the join is done using a hash 
shuffle. Does the hint not enforce the broadcast? Could it be because I’m 
running it locally or because the tables contain very few records?
I'm executing all tests in Flink SQL through the sql-client.




EXPLAIN PLAN FOR
SELECT /*+ BROADCAST(smalltable) */
    bigtable.eventTimestamp,
    bigtable.field1,
    ....
    smalltable.technology
FROM bigtable
JOIN smalltable FOR SYSTEM_TIME AS OF EventTimestampLtz
    ON bigtable.cgi = smalltable.cgi;



