xuyangzhong commented on code in PR #26616:
URL: https://github.com/apache/flink/pull/26616#discussion_r2125897133


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java:
##########
@@ -168,4 +169,22 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
             boolean lookupKeyContainsPrimaryKey) {
         return inputTransformation;
     }
+
+    @Override
+    protected Transformation<RowData> createKeyOrderedAsyncLookupJoin(
+            Transformation<RowData> inputTransformation,
+            RelOptTable temporalTable,
+            ExecNodeConfig config,
+            ClassLoader classLoader,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            AsyncTableFunction<Object> asyncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin,
+            LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) {
+        throw new UnsupportedOperationException(

Review Comment:
   This exception message is quite confusing because the changelog mode under 
batch is always insert-only. How about using the following message for the 
IllegalStateException: "Batch mode should not use key-ordered async lookup 
joins. This is a bug. Please file an issue."?
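
A minimal sketch of what the suggested change might look like. The class `BatchKeyOrderedGuard` and its parameterless method are hypothetical stand-ins for the real override in `BatchExecLookupJoin`, which takes many Flink-specific arguments; only the exception type and the message text come from the suggestion above.

```java
public class BatchKeyOrderedGuard {

    // Message text copied from the review suggestion.
    public static final String MESSAGE =
            "Batch mode should not use key-ordered async lookup joins. "
                    + "This is a bug. Please file an issue.";

    // Hypothetical stand-in for createKeyOrderedAsyncLookupJoin(...):
    // reaching this method in batch mode indicates a planner bug, so it
    // fails fast with IllegalStateException instead of
    // UnsupportedOperationException.
    public static Object createKeyOrderedAsyncLookupJoin() {
        throw new IllegalStateException(MESSAGE);
    }

    public static void main(String[] args) {
        try {
            createKeyOrderedAsyncLookupJoin();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The `IllegalStateException` signals an internal invariant violation rather than a feature the user could legitimately have requested.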



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala:
##########
@@ -848,6 +859,17 @@ class LookupJoinTest extends TableTestBase with Serializable {
     util.verifyExecPlan(sql)
   }
 
+  @Test
+  def testJoinAsyncTableKeyOrdered(): Unit = {
+    util.tableEnv.getConfig
+      .set(ExecutionConfigOptions.TABLE_EXEC_ASYNC_LOOKUP_KEY_ORDERED, Boolean.box(true))
+    val sql =
+      "SELECT /*+ LOOKUP('table'='D', 'async'='true', 'output-mode'='allow_unordered') */ * " +

Review Comment:
   Could you please add a unit test that covers the plan for an append-only source?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java:
##########
@@ -294,4 +355,47 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
         transform.setStateKeyType(keySelector.getProducedType());
         return transform;
     }
+
+    private RowDataKeySelector getKeySelector(
+            boolean singleParallelism, int[] keys, ClassLoader classLoader, RowType inputRowType) {
+        RowDataKeySelector keySelector;
+        if (singleParallelism) {
+            keySelector = EmptyRowDataKeySelector.INSTANCE;
+        } else {
+            // make it a deterministic asc order
+            Arrays.sort(keys);
+            keySelector =
+                    KeySelectorUtil.getRowDataSelector(
+                            classLoader, keys, InternalTypeInfo.of(inputRowType));
+        }
+        return keySelector;
+    }
+
+    private Transformation<RowData> createPartitionTransformation(
+            RowDataKeySelector keySelector,
+            Transformation<RowData> inputTransformation,
+            boolean singleParallelism,
+            ExecNodeConfig config) {
+
+        final KeyGroupStreamPartitioner<RowData, RowData> partitioner =
+                new KeyGroupStreamPartitioner<>(
+                        keySelector, KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
+        Transformation<RowData> partitionedTransform =
+                new PartitionTransformation<>(inputTransformation, partitioner);
+
+        createTransformationMeta(PARTITIONER_TRANSFORMATION, "Partitioner", "Partitioner", config)

Review Comment:
   Why do we import this field from `CommonExecSink`? Can we define an
   equivalent one in `StreamExecLookupJoin` instead?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java:
##########
@@ -294,4 +355,47 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
         transform.setStateKeyType(keySelector.getProducedType());
         return transform;
     }
+
+    private RowDataKeySelector getKeySelector(

Review Comment:
   Could you please extract the logic this shares with `createSyncLookupJoinWithState`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java:
##########
@@ -211,6 +214,64 @@ public Transformation<RowData> translateToPlanInternal(
                 planner, config, upsertMaterialize, lookupKeyContainsPrimaryKey);
     }
 
+    @Override
+    protected Transformation<RowData> createKeyOrderedAsyncLookupJoin(
+            Transformation<RowData> inputTransformation,
+            RelOptTable temporalTable,
+            ExecNodeConfig config,
+            ClassLoader classLoader,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            AsyncTableFunction<Object> asyncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin,
+            LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) {
+        int[] shuffleKeys = inputUpsertKey;
+        // normally upsertKeys could not be null. If the job is restored from exec plan then

Review Comment:
   Actually, the upsert key may also be null if it cannot be derived from the
   input.
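
One way to make the null case explicit is sketched below. The `ShuffleKeys` class and its `resolve` method are hypothetical, and falling back to all input columns when no upsert key is available is an assumption for illustration, not necessarily what this PR does. The sketch also sorts a copy of the key array, since `Arrays.sort` sorts in place and would otherwise reorder the caller's array.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

public class ShuffleKeys {

    /**
     * Hypothetical helper: picks the column indices to shuffle by.
     *
     * @param inputUpsertKey upsert key derived from the input, may be null
     * @param fieldCount number of fields in the input row
     */
    public static int[] resolve(int[] inputUpsertKey, int fieldCount) {
        if (inputUpsertKey == null || inputUpsertKey.length == 0) {
            // Assumption: with no upsert key, use every column as the key.
            return IntStream.range(0, fieldCount).toArray();
        }
        // Sort a copy so the order is deterministic and the caller's
        // array is left untouched.
        int[] keys = inputUpsertKey.clone();
        Arrays.sort(keys);
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(resolve(new int[] {3, 1, 2}, 5)));
        System.out.println(Arrays.toString(resolve(null, 3)));
    }
}
```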



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java:
##########
@@ -294,4 +355,47 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
         transform.setStateKeyType(keySelector.getProducedType());
         return transform;
     }
+
+    private RowDataKeySelector getKeySelector(
+            boolean singleParallelism, int[] keys, ClassLoader classLoader, RowType inputRowType) {
+        RowDataKeySelector keySelector;
+        if (singleParallelism) {
+            keySelector = EmptyRowDataKeySelector.INSTANCE;

Review Comment:
   This logic already exists in `KeySelectorUtil.getRowDataSelector`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -481,7 +516,14 @@ private StreamOperatorFactory<RowData> createAsyncLookupJoin(
                             isLeftOuterJoin,
                             asyncLookupOptions.asyncBufferCapacity);
         }
-
+        if (asyncLookupOptions.keyOrdered) {
+            checkNotNull(keySelector);
+            return new TableKeyedAsyncWaitOperatorFactory<>(
+                    asyncFunc,
+                    keySelector,
+                    asyncLookupOptions.asyncTimeout,
+                    asyncLookupOptions.asyncBufferCapacity);
+        }

Review Comment:
   Can we double-check whether the `keySelector` is always null in this branch?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/AsyncLookupOptionsTest.java:
##########
@@ -136,5 +140,35 @@ void testAsyncLookupOptions() {
                         userConf,
                         ChangelogMode.all());
         assertThat(asyncLookupOptions.asyncOutputMode).isSameAs(AsyncDataStream.OutputMode.ORDERED);
+        assertFalse(asyncLookupOptions.keyOrdered);
+
+        TableConfig config = TableConfig.getDefault();
+        config.set(TABLE_EXEC_ASYNC_LOOKUP_KEY_ORDERED, true);
+        asyncLookupOptions =
+                LookupJoinUtil.getMergedAsyncOptions(
+                        LookupJoinHintTestUtil.completeLookupHint, config, ChangelogMode.all());
+        assertTrue(asyncLookupOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED);

Review Comment:
   nit: use `assertThat(...).isEqualTo(...)` to resolve the warnings.
   The same applies to the other assertions.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecLookupJoin.java:
##########
@@ -168,4 +169,22 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
             boolean lookupKeyContainsPrimaryKey) {
         return inputTransformation;
     }
+
+    @Override
+    protected Transformation<RowData> createKeyOrderedAsyncLookupJoin(
+            Transformation<RowData> inputTransformation,
+            RelOptTable temporalTable,
+            ExecNodeConfig config,
+            ClassLoader classLoader,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            AsyncTableFunction<Object> asyncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin,
+            LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) {
+        throw new UnsupportedOperationException(

Review Comment:
   Moreover, please add a test for batch.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java:
##########
@@ -294,4 +355,47 @@ protected Transformation<RowData> createSyncLookupJoinWithState(
         transform.setStateKeyType(keySelector.getProducedType());
         return transform;
     }
+
+    private RowDataKeySelector getKeySelector(
+            boolean singleParallelism, int[] keys, ClassLoader classLoader, RowType inputRowType) {
+        RowDataKeySelector keySelector;
+        if (singleParallelism) {
+            keySelector = EmptyRowDataKeySelector.INSTANCE;
+        } else {
+            // make it a deterministic asc order
+            Arrays.sort(keys);
+            keySelector =
+                    KeySelectorUtil.getRowDataSelector(
+                            classLoader, keys, InternalTypeInfo.of(inputRowType));
+        }
+        return keySelector;
+    }
+
+    private Transformation<RowData> createPartitionTransformation(
+            RowDataKeySelector keySelector,
+            Transformation<RowData> inputTransformation,
+            boolean singleParallelism,

Review Comment:
   nit: Is this parameter always false?


