lihaosky commented on code in PR #26641:
URL: https://github.com/apache/flink/pull/26641#discussion_r2136342446


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java:
##########
@@ -256,21 +292,21 @@ private Transformation<RowData> createAsyncModelPredict(
                                 config, classLoader, null, inputRowType),
                         InternalSerializers.create(modelOutputType),
                         false,
-                        asyncLookupOptions.asyncBufferCapacity);
-        if (asyncLookupOptions.asyncOutputMode == 
AsyncDataStream.OutputMode.UNORDERED) {
+                        asyncOptions.asyncBufferCapacity);
+        if (asyncOptions.asyncOutputMode == 
AsyncDataStream.OutputMode.UNORDERED) {
             // The input stream is insert-only.
             return ExecNodeUtil.createOneInputTransformation(
                     inputTransformation,
                     createTransformationMeta(ML_PREDICT_TRANSFORMATION, 
config),
                     new AsyncWaitOperatorFactory<>(
                             asyncFunc,
-                            asyncLookupOptions.asyncTimeout,
-                            asyncLookupOptions.asyncBufferCapacity,
-                            asyncLookupOptions.asyncOutputMode),
+                            asyncOptions.asyncTimeout,
+                            asyncOptions.asyncBufferCapacity,
+                            asyncOptions.asyncOutputMode),
                     InternalTypeInfo.of(getOutputType()),
                     inputTransformation.getParallelism(),
                     false);
-        } else if (asyncLookupOptions.asyncOutputMode == 
AsyncDataStream.OutputMode.ORDERED) {
+        } else if (asyncOptions.asyncOutputMode == 
AsyncDataStream.OutputMode.ORDERED) {

Review Comment:
   I think we should only support insert mode for ml_predict then 
(StreamNonDeterministicUpdatePlanVisitor should reject upsert mode for 
mlpredict plan) since ml_predict function itself isn't deterministic. 
Non-deterministic function can result in error according to 
https://docs.confluent.io/cloud/current/flink/concepts/determinism.html. We can 
support cdc mode later by introducing configs user can use to tell us their 
model is deterministic. Created 
https://issues.apache.org/jira/browse/FLINK-37928 and 
https://issues.apache.org/jira/browse/FLINK-37929



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to