xuyangzhong commented on code in PR #26616: URL: https://github.com/apache/flink/pull/26616#discussion_r2125897133
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java: ########## @@ -168,4 +169,22 @@ protected Transformation<RowData> createSyncLookupJoinWithState( boolean lookupKeyContainsPrimaryKey) { return inputTransformation; } + + @Override + protected Transformation<RowData> createKeyOrderedAsyncLookupJoin( + Transformation<RowData> inputTransformation, + RelOptTable temporalTable, + ExecNodeConfig config, + ClassLoader classLoader, + Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys, + AsyncTableFunction<Object> asyncLookupFunction, + RelBuilder relBuilder, + RowType inputRowType, + RowType tableSourceRowType, + RowType resultRowType, + boolean isLeftOuterJoin, + LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) { + throw new UnsupportedOperationException( Review Comment: This exception message is quite confusing because the changelog mode in batch is always insert-only. How about throwing an IllegalStateException with the following message instead: "Batch mode should not use key-ordered async lookup joins. This is a bug. Please file an issue."? ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala: ########## @@ -848,6 +859,17 @@ class LookupJoinTest extends TableTestBase with Serializable { util.verifyExecPlan(sql) } + @Test + def testJoinAsyncTableKeyOrdered(): Unit = { + util.tableEnv.getConfig + .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_KEY_ORDERED, Boolean.box(true)) + val sql = + "SELECT /*+ LOOKUP('table'='D', 'async'='true', 'output-mode'='allow_unordered') */ * " + Review Comment: Could you please add a unit test that verifies the plan for an append-only source? 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java: ########## @@ -294,4 +355,47 @@ protected Transformation<RowData> createSyncLookupJoinWithState( transform.setStateKeyType(keySelector.getProducedType()); return transform; } + + private RowDataKeySelector getKeySelector( + boolean singleParallelism, int[] keys, ClassLoader classLoader, RowType inputRowType) { + RowDataKeySelector keySelector; + if (singleParallelism) { + keySelector = EmptyRowDataKeySelector.INSTANCE; + } else { + // make it a deterministic asc order + Arrays.sort(keys); + keySelector = + KeySelectorUtil.getRowDataSelector( + classLoader, keys, InternalTypeInfo.of(inputRowType)); + } + return keySelector; + } + + private Transformation<RowData> createPartitionTransformation( + RowDataKeySelector keySelector, + Transformation<RowData> inputTransformation, + boolean singleParallelism, + ExecNodeConfig config) { + + final KeyGroupStreamPartitioner<RowData, RowData> partitioner = + new KeyGroupStreamPartitioner<>( + keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM); + Transformation<RowData> partitionedTransform = + new PartitionTransformation<>(inputTransformation, partitioner); + + createTransformationMeta(PARTITIONER_TRANSFORMATION, "Partitioner", "Partitioner", config) Review Comment: Why do we import this field from `CommonExecSink`? Can we define a separate constant with the same name in `StreamExecLookupJoin` instead? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java: ########## @@ -294,4 +355,47 @@ protected Transformation<RowData> createSyncLookupJoinWithState( transform.setStateKeyType(keySelector.getProducedType()); return transform; } + + private RowDataKeySelector getKeySelector( Review Comment: Could you please also reuse this helper for the same logic in `createSyncLookupJoinWithState`? 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java: ########## @@ -211,6 +214,64 @@ public Transformation<RowData> translateToPlanInternal( planner, config, upsertMaterialize, lookupKeyContainsPrimaryKey); } + @Override + protected Transformation<RowData> createKeyOrderedAsyncLookupJoin( + Transformation<RowData> inputTransformation, + RelOptTable temporalTable, + ExecNodeConfig config, + ClassLoader classLoader, + Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys, + AsyncTableFunction<Object> asyncLookupFunction, + RelBuilder relBuilder, + RowType inputRowType, + RowType tableSourceRowType, + RowType resultRowType, + boolean isLeftOuterJoin, + LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) { + int[] shuffleKeys = inputUpsertKey; + // normally upsertKeys could not be null. If the job is restored from exec plan then Review Comment: Actually, if the upsert key cannot be derived from the input, it may also be null. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java: ########## @@ -294,4 +355,47 @@ protected Transformation<RowData> createSyncLookupJoinWithState( transform.setStateKeyType(keySelector.getProducedType()); return transform; } + + private RowDataKeySelector getKeySelector( + boolean singleParallelism, int[] keys, ClassLoader classLoader, RowType inputRowType) { + RowDataKeySelector keySelector; + if (singleParallelism) { + keySelector = EmptyRowDataKeySelector.INSTANCE; Review Comment: This logic already exists in `KeySelectorUtil.getRowDataSelector`. 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java: ########## @@ -481,7 +516,14 @@ private StreamOperatorFactory<RowData> createAsyncLookupJoin( isLeftOuterJoin, asyncLookupOptions.asyncBufferCapacity); } - + if (asyncLookupOptions.keyOrdered) { + checkNotNull(keySelector); + return new TableKeyedAsyncWaitOperatorFactory<>( + asyncFunc, + keySelector, + asyncLookupOptions.asyncTimeout, + asyncLookupOptions.asyncBufferCapacity); + } Review Comment: Can we double-check that `keySelector` is always null in this branch? ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AsyncLookupOptionsTest.java: ########## @@ -136,5 +140,35 @@ void testAsyncLookupOptions() { userConf, ChangelogMode.all()); assertThat(asyncLookupOptions.asyncOutputMode).isSameAs(AsyncDataStream.OutputMode.ORDERED); + assertFalse(asyncLookupOptions.keyOrdered); + + TableConfig config = TableConfig.getDefault(); + config.set(TABLE_EXEC_ASYNC_LOOKUP_KEY_ORDERED, true); + asyncLookupOptions = + LookupJoinUtil.getMergedAsyncOptions( + LookupJoinHintTestUtil.completeLookupHint, config, ChangelogMode.all()); + assertTrue(asyncLookupOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED); Review Comment: nit: use `assertThat...isEqualTo` to resolve the warnings. The same applies to the other assertions. 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java: ########## @@ -168,4 +169,22 @@ protected Transformation<RowData> createSyncLookupJoinWithState( boolean lookupKeyContainsPrimaryKey) { return inputTransformation; } + + @Override + protected Transformation<RowData> createKeyOrderedAsyncLookupJoin( + Transformation<RowData> inputTransformation, + RelOptTable temporalTable, + ExecNodeConfig config, + ClassLoader classLoader, + Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys, + AsyncTableFunction<Object> asyncLookupFunction, + RelBuilder relBuilder, + RowType inputRowType, + RowType tableSourceRowType, + RowType resultRowType, + boolean isLeftOuterJoin, + LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) { + throw new UnsupportedOperationException( Review Comment: Moreover, please add a test for batch. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java: ########## @@ -294,4 +355,47 @@ protected Transformation<RowData> createSyncLookupJoinWithState( transform.setStateKeyType(keySelector.getProducedType()); return transform; } + + private RowDataKeySelector getKeySelector( + boolean singleParallelism, int[] keys, ClassLoader classLoader, RowType inputRowType) { + RowDataKeySelector keySelector; + if (singleParallelism) { + keySelector = EmptyRowDataKeySelector.INSTANCE; + } else { + // make it a deterministic asc order + Arrays.sort(keys); + keySelector = + KeySelectorUtil.getRowDataSelector( + classLoader, keys, InternalTypeInfo.of(inputRowType)); + } + return keySelector; + } + + private Transformation<RowData> createPartitionTransformation( + RowDataKeySelector keySelector, + Transformation<RowData> inputTransformation, + boolean singleParallelism, Review Comment: nit: Is this field always false? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org