davidradl commented on code in PR #26616:
URL: https://github.com/apache/flink/pull/26616#discussion_r2115564263


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLookupJoin.java:
##########
@@ -211,6 +214,64 @@ public Transformation<RowData> translateToPlanInternal(
                 planner, config, upsertMaterialize, 
lookupKeyContainsPrimaryKey);
     }
 
+    @Override
+    protected Transformation<RowData> createKeyOrderedAsyncLookupJoin(
+            Transformation<RowData> inputTransformation,
+            RelOptTable temporalTable,
+            ExecNodeConfig config,
+            ClassLoader classLoader,
+            Map<Integer, LookupJoinUtil.LookupKey> allLookupKeys,
+            AsyncTableFunction<Object> asyncLookupFunction,
+            RelBuilder relBuilder,
+            RowType inputRowType,
+            RowType tableSourceRowType,
+            RowType resultRowType,
+            boolean isLeftOuterJoin,
+            LookupJoinUtil.AsyncLookupOptions asyncLookupOptions) {
+        int[] shuffleKeys = inputUpsertKey;
+        // normally upsertKeys could not be null. If the job is restored from 
exec plan then

Review Comment:
   This comment could be clearer - can we say something like:
   
    If the job is restored from exec plan then the upsert keys could be null, 
because .... in this case we ....
   In all other cases the upsert keys are not null. 
   
   nit we could write this as:
   
   ```
    int[] shuffleKeys = (inputUpsertKey == null || inputUpsertKey.length == 0)?
   IntStream.range(0, inputRowType.getFieldCount()).toArray():  inputUpsertKey;
   ```
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to