Maybe you need to use a lookup table[1] for the small table side. In that case, the probe side (the big table side) will not be shuffled.
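A minimal sketch of a processing-time lookup join follows. It assumes the small table is served by a lookup-capable connector. JDBC is used here because upsert-kafka is not one, and the partial-cache options come from the Flink 1.17 JDBC connector docs. All table names, topics, URLs and cache sizes are hypothetical placeholders.

```sql
-- Big (probe) side: needs a processing-time attribute for the lookup join.
-- Topic and bootstrap servers are hypothetical placeholders.
CREATE TABLE bigtable (
  cgi STRING,
  field1 STRING,
  eventTimestamp TIMESTAMP_LTZ(3),
  proc_time AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'big-topic',                              -- hypothetical
  'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- Small side, served by a lookup-capable connector (JDBC here),
-- with a partial cache kept inside each join task.
CREATE TABLE smalltable_dim (
  cgi STRING,
  technology STRING,
  PRIMARY KEY (cgi) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/mydb',    -- hypothetical
  'table-name' = 'smalltable',                        -- hypothetical
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10 min'
);

-- Lookup join: each bigtable row looks up smalltable_dim (through the cache)
-- as of its processing time, without shuffling bigtable on cgi.
SELECT b.eventTimestamp, b.field1, s.technology
FROM bigtable AS b
JOIN smalltable_dim FOR SYSTEM_TIME AS OF b.proc_time AS s
  ON b.cgi = s.cgi;
```

The cache trades freshness for fewer external lookups. Rows cached within the expiry window will not reflect later updates to the small table.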
However, lookup join only supports processing time, and the changelog on the small table side will not be captured...

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join

--
Best!
Xuyang

On 2024-11-05 10:41:38, "Xuyang" <xyzhong...@163.com> wrote:

Hi.

As far as I know, Upsert-Kafka does not implement the lookup table interface. Therefore, what you're describing resembles a temporal join[1]. Like regular stream joins in streaming mode, it currently cannot replace the hash shuffle edge with a broadcast. By the way, this issue[2] was discussed previously but was ultimately not resolved.

Let's return to your use case. IIUC, the cost of shuffling a small table should not be particularly high.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#temporal-joins
[2] https://lists.apache.org/thread/mnb872m4s1yww6dl5f680dz33synd9j8

--
Best!
Xuyang

On 2024-11-04 22:35:06, "Guillermo Ortiz Fernández" <guillermo.ortiz.f...@gmail.com> wrote:

We are trying to migrate a Kafka Streams application to Flink SQL. The Kafka Streams app uses GlobalKTables to avoid shuffles for the lookup tables. Is there an equivalent option in Flink?

On Mon, 4 Nov 2024 at 11:27, Guillermo Ortiz Fernández (<guillermo.ortiz.f...@gmail.com>) wrote:

The small table uses upsert-kafka, which doesn't support lookup joins. Do you know of another option? Thanks.

On Mon, 4 Nov 2024 at 11:02, Xuyang (<xyzhong...@163.com>) wrote:

Additionally, would a lookup table with a cache[1][2] meet your needs? If so, you might need to use or implement a dimension table connector that supports caching.

[1] https://issues.apache.org/jira/browse/FLINK-28415
[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/jdbc/#lookup-cache

--
Best!
Xuyang

On 2024-11-04 17:54:36, "Xuyang" <xyzhong...@163.com> wrote:

Hi,

The BROADCAST[1] join hint currently applies only to batch mode.
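A minimal sketch of a query where the hint can take effect follows. It assumes both tables are bounded sources, which batch mode requires. The column names follow the question in this thread. The `FOR SYSTEM_TIME AS OF` clause is dropped because join hints target regular joins.

```sql
-- Switch the SQL Client session to batch execution; the BROADCAST hint
-- is only honored in batch mode.
SET 'execution.runtime-mode' = 'batch';

-- A regular equi-join: the hint asks the planner to broadcast smalltable
-- to every join task instead of hash-shuffling both inputs on cgi.
-- Assumes bigtable and smalltable are bounded sources.
EXPLAIN PLAN FOR
SELECT /*+ BROADCAST(smalltable) */
  bigtable.eventTimestamp, bigtable.field1, smalltable.technology
FROM bigtable
JOIN smalltable ON bigtable.cgi = smalltable.cgi;
```

In the resulting plan, the exchange that feeds smalltable into the join should be a broadcast rather than a hash partitioning on cgi. The exact plan text depends on the Flink version.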
[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/hints/#broadcast

--
Best!
Xuyang

At 2024-11-04 17:06:59, "Guillermo Ortiz Fernández" <guillermo.ortiz.f...@gmail.com> wrote:

Hi,

I'm running a simple query that joins two tables, where one table is much larger than the other (the second table is very small). I believe it would be optimal to broadcast the second table for the join.

All my tests are being run locally, with very little data in either table. When I apply the hint to perform the broadcast and check the execution plan, I see that the join is done using a hash shuffle whether or not the hint is present. Does the hint not enforce the broadcast? Could it be because I'm running it locally, or because the tables contain very few records? I'm executing all tests with Flink SQL in the SQL Client.

EXPLAIN PLAN FOR
SELECT /*+ BROADCAST(smalltable) */
  bigtable.eventTimestamp, bigtable.field1, ....
  smalltable.technology
FROM bigtable
JOIN smalltable FOR SYSTEM_TIME AS OF EventTimestampLtz
ON bigtable.cgi = smalltable.cgi;
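The query above is an event-time temporal join. For that kind of join, the small table must be a versioned table, meaning it has a primary key plus an event-time attribute with a watermark. The following is a minimal sketch of such a DDL on upsert-kafka. It assumes the record value carries a timestamp field, here called `update_time`, which is a hypothetical name, as are the topic and server names.

```sql
-- Versioned table on upsert-kafka: the PRIMARY KEY is the version key, and
-- the watermark on update_time makes the table usable in FOR SYSTEM_TIME AS OF joins.
CREATE TABLE smalltable (
  cgi STRING,
  technology STRING,
  update_time TIMESTAMP_LTZ(3),                       -- hypothetical field in the record value
  WATERMARK FOR update_time AS update_time,
  PRIMARY KEY (cgi) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'small-topic',                            -- hypothetical
  'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical
  'key.format' = 'json',
  'value.format' = 'json'
);
```

This DDL only makes the event-time temporal join valid. As noted in the replies, the join is still planned with a hash shuffle on cgi, so both inputs are partitioned by the join key rather than broadcast.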