fsk119 commented on code in PR #26641:
URL: https://github.com/apache/flink/pull/26641#discussion_r2135228993


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java:
##########
@@ -256,21 +292,21 @@ private Transformation<RowData> createAsyncModelPredict(
                                 config, classLoader, null, inputRowType),
                         InternalSerializers.create(modelOutputType),
                         false,
-                        asyncLookupOptions.asyncBufferCapacity);
-        if (asyncLookupOptions.asyncOutputMode == AsyncDataStream.OutputMode.UNORDERED) {
+                        asyncOptions.asyncBufferCapacity);
+        if (asyncOptions.asyncOutputMode == AsyncDataStream.OutputMode.UNORDERED) {
             // The input stream is insert-only.
             return ExecNodeUtil.createOneInputTransformation(
                     inputTransformation,
                     createTransformationMeta(ML_PREDICT_TRANSFORMATION, config),
                     new AsyncWaitOperatorFactory<>(
                             asyncFunc,
-                            asyncLookupOptions.asyncTimeout,
-                            asyncLookupOptions.asyncBufferCapacity,
-                            asyncLookupOptions.asyncOutputMode),
+                            asyncOptions.asyncTimeout,
+                            asyncOptions.asyncBufferCapacity,
+                            asyncOptions.asyncOutputMode),
                     InternalTypeInfo.of(getOutputType()),
                     inputTransformation.getParallelism(),
                     false);
-        } else if (asyncLookupOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED) {
+        } else if (asyncOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED) {

Review Comment:
   This is a good question! Let me share some points here.
   
   When materialization is enabled, the lookup operator stores the lookup
results in its state. When an update-before or delete message arrives, the
lookup operator searches its state for the stored result. If the state contains
it, the operator emits the message with the content from the state, which keeps
the output of the lookup join operator deterministic.
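
   As a rough illustration of the materialization described above, here is a
minimal plain-Java sketch. It is not Flink's operator: the class name, the
string-based rows, and the fallback to a fresh lookup when the state is empty
are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of a lookup operator that materializes results.
class MaterializedLookupSketch {
    // Stands in for keyed state: lookup key -> result stored at insert time.
    private final Map<String, String> state = new HashMap<>();
    // Stands in for the external lookup source (dimension table).
    private final Function<String, String> lookup;

    MaterializedLookupSketch(Function<String, String> lookup) {
        this.lookup = lookup;
    }

    // +I: query the source and remember what was emitted.
    String onInsert(String key) {
        String result = lookup.apply(key);
        state.put(key, result);
        return "+I[" + key + ", " + result + "]";
    }

    // -D: reuse the stored result so the retraction matches the earlier insert.
    // Assumption: if nothing is stored, fall back to a fresh lookup.
    String onDelete(String key) {
        String stored = state.remove(key);
        String result = stored != null ? stored : lookup.apply(key);
        return "-D[" + key + ", " + result + "]";
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("k", "v1");
        MaterializedLookupSketch op = new MaterializedLookupSketch(source::get);
        System.out.println(op.onInsert("k"));
        source.put("k", "v2"); // the lookup source changes in between
        System.out.println(op.onDelete("k"));
    }
}
```

   In this sketch the delete still carries `v1` even though the source has
moved on to `v2`, so the retraction cancels exactly what was emitted before.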
   
   > Why shuffle is required for cdc mode
   
   The planner requires the lookup join operator to use keyed state, so that
all messages with the same lookup keys are routed to the same subtask.
Currently, Flink requires a shuffle before a keyed stream.
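
   To make the routing argument concrete, here is a small plain-Java sketch of
hash partitioning by lookup key. The class and method names are hypothetical,
and the plain modulo is only a stand-in: Flink's actual assignment goes through
key groups rather than a direct hash-modulo.

```java
// Hypothetical sketch: route each record to a subtask by hashing its lookup key.
class KeyedShuffleSketch {
    // Same key and same parallelism always give the same subtask index,
    // which is what lets one subtask own the keyed state for that key.
    static int subtaskFor(String lookupKey, int parallelism) {
        return Math.floorMod(lookupKey.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // A +I and a later -D with the same lookup key land on the same subtask.
        System.out.println(subtaskFor("user-42", parallelism));
        System.out.println(subtaskFor("user-42", parallelism));
    }
}
```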
   
   > And upsertMaterialize seems to be always false for lookup join. 
   
   In some cases, the planner will omit upsertMaterialize. You can take a look
at StreamNonDeterministicUpdatePlanVisitor#visitLookupJoin.
   
   First of all, users should set
'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE';
   Then, the user's query should not use the pk of the upstream operator as
lookup keys or ...
   
   >  async is disabled when upsertMaterialize for lookup join
   
   Because the current AsyncWaitOperator is not friendly to CDC streams. For
example, a +I message and a -D message may arrive at the operator at almost the
same time and both enter the input queue. The async lookup join function then
processes these messages concurrently. If the lookup source changes frequently,
the outputs for the +I message and the -D message can differ.
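
   The race can be sketched in plain Java with two pending lookups. This does
not use AsyncWaitOperator; the class name is hypothetical, and the completion
order is forced by hand so that the outcome is reproducible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a +I and a -D for the same key being looked up
// independently while the lookup source changes in between.
class AsyncCdcRaceSketch {
    static List<String> run() {
        // Stands in for the external lookup source.
        AtomicReference<String> source = new AtomicReference<>("v1");

        // Both messages sit in the queue at once, each with its own pending lookup.
        CompletableFuture<Void> insertTrigger = new CompletableFuture<>();
        CompletableFuture<Void> deleteTrigger = new CompletableFuture<>();
        CompletableFuture<String> insertResult =
                insertTrigger.thenApply(v -> "+I[k, " + source.get() + "]");
        CompletableFuture<String> deleteResult =
                deleteTrigger.thenApply(v -> "-D[k, " + source.get() + "]");

        insertTrigger.complete(null); // the lookup for +I reads v1
        source.set("v2");             // the lookup source changes
        deleteTrigger.complete(null); // the lookup for -D reads v2

        List<String> output = new ArrayList<>();
        output.add(insertResult.join());
        output.add(deleteResult.join());
        return output;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

   Here the -D carries `v2` while the earlier +I carried `v1`, so a downstream
operator cannot match the retraction to the row it is supposed to cancel.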
   
   I hope this explanation answers your questions.


