fsk119 commented on code in PR #26641: URL: https://github.com/apache/flink/pull/26641#discussion_r2135228993
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java:
##########
@@ -256,21 +292,21 @@ private Transformation<RowData> createAsyncModelPredict(
                         config, classLoader, null, inputRowType),
                 InternalSerializers.create(modelOutputType),
                 false,
-                asyncLookupOptions.asyncBufferCapacity);
-        if (asyncLookupOptions.asyncOutputMode == AsyncDataStream.OutputMode.UNORDERED) {
+                asyncOptions.asyncBufferCapacity);
+        if (asyncOptions.asyncOutputMode == AsyncDataStream.OutputMode.UNORDERED) {
             // The input stream is insert-only.
             return ExecNodeUtil.createOneInputTransformation(
                     inputTransformation,
                     createTransformationMeta(ML_PREDICT_TRANSFORMATION, config),
                     new AsyncWaitOperatorFactory<>(
                             asyncFunc,
-                            asyncLookupOptions.asyncTimeout,
-                            asyncLookupOptions.asyncBufferCapacity,
-                            asyncLookupOptions.asyncOutputMode),
+                            asyncOptions.asyncTimeout,
+                            asyncOptions.asyncBufferCapacity,
+                            asyncOptions.asyncOutputMode),
                     InternalTypeInfo.of(getOutputType()),
                     inputTransformation.getParallelism(),
                     false);
-        } else if (asyncLookupOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED) {
+        } else if (asyncOptions.asyncOutputMode == AsyncDataStream.OutputMode.ORDERED) {

Review Comment:
This is a good question! Let me share some points here.

When materialization is used, the lookup operator stores the lookup results in its state. When an update-before or delete message arrives, the lookup operator searches its state for the earlier result. If the state contains the result, the operator emits the message with the content from the state, which keeps the output of the lookup join operator deterministic.

> Why shuffle is required for cdc mode

The planner requires the lookup join operator to use keyed state, so that all messages with the same lookup keys are located at the same subtask. Currently, Flink requires a shuffle before a keyed stream.

> And upsertMaterialize seems to be always false for lookup join.
In some cases, the planner will omit upsertMaterialize. You can take a look at StreamNonDeterministicUpdatePlanVisitor#visitLookupJoin. First of all, users should set 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE'. Then the user's query should not use the pk of the upstream operator as lookup keys or ...

> async is disabled when upsertMaterialize for lookup join

Because the current AsyncWaitOperator is not friendly to CDC streams. For example, a +I message and a -D message may arrive at the operator at almost the same time and both enter the input queue. The async lookup join function then processes these messages at the same time. If the lookup source changes frequently, the outputs for the +I message and the -D message may differ.

Hope my explanation answers your questions.
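
To make the materialization point concrete, here is a minimal sketch in plain Java with no Flink dependency. It is not Flink's implementation: the class `MaterializedLookupSketch`, the maps `lookupTable` and `materialized`, and both `process...` methods are hypothetical names that only model the behaviour described above, where a -D message either queries the lookup source again or reuses the result stored for the earlier +I message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Flink-free model of a lookup join on a changelog input. */
public class MaterializedLookupSketch {

    /** Stand-in for the external lookup source; its content may change between lookups. */
    static final Map<String, String> lookupTable = new HashMap<>();

    /** Stand-in for keyed state: the lookup result emitted for each key. */
    static final Map<String, String> materialized = new HashMap<>();

    /** Without materialization, every message (including -D) queries the source again. */
    static String processWithoutState(String kind, String key) {
        return kind + "(" + key + ", " + lookupTable.get(key) + ")";
    }

    /** With materialization, -D reuses the stored result and retracts what +I emitted. */
    static String processWithState(String kind, String key) {
        String value;
        if (kind.equals("+I")) {
            value = lookupTable.get(key);
            materialized.put(key, value);
        } else {
            // Retraction: prefer the stored result, fall back to a fresh lookup if absent.
            value = materialized.containsKey(key)
                    ? materialized.remove(key)
                    : lookupTable.get(key);
        }
        return kind + "(" + key + ", " + value + ")";
    }

    /** Replays +I, then a change in the lookup source, then -D for the same key. */
    static List<String> run(boolean useState) {
        lookupTable.clear();
        materialized.clear();
        List<String> out = new ArrayList<>();
        lookupTable.put("k1", "v1");
        out.add(useState ? processWithState("+I", "k1") : processWithoutState("+I", "k1"));
        lookupTable.put("k1", "v2"); // the lookup source changes before the delete arrives
        out.add(useState ? processWithState("-D", "k1") : processWithoutState("-D", "k1"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [+I(k1, v1), -D(k1, v2)]
        System.out.println(run(true));  // [+I(k1, v1), -D(k1, v1)]
    }
}
```

In the first run the -D message carries a value that was never inserted downstream, which is the non-determinism the materialized state is meant to avoid. The sketch is single-threaded, so it does not model the concurrent processing inside AsyncWaitOperator; it only shows why a fresh lookup for a retraction can disagree with the original insert.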