lihaosky commented on code in PR #26630: URL: https://github.com/apache/flink/pull/26630#discussion_r2127260784
########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java: ########## @@ -443,6 +443,36 @@ public class ExecutionConfigOptions { "The max number of async retry attempts to make before task " + "execution is failed."); + // ------------------------------------------------------------------------ + // Async ML_PREDICT Options + // ------------------------------------------------------------------------ + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY = + key("table.exec.async-ml-predict.buffer-capacity") + .intType() + .defaultValue(100) Review Comment: Will 100 concurrent http calls be too much? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ModelPredictRuntimeProviderContext.java: ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions; + +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.ResolvedCatalogModel; +import org.apache.flink.table.ml.ModelProvider; + +/** Context to provide the query information. */ +public class ModelPredictRuntimeProviderContext implements ModelProvider.Context { Review Comment: Move to a ml folder? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java: ########## @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.FlinkVersion; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.datastream.AsyncDataStream; +import org.apache.flink.streaming.api.functions.async.AsyncFunction; +import org.apache.flink.streaming.api.operators.ProcessOperator; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.catalog.DataTypeFactory; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.conversion.DataStructureConverter; +import org.apache.flink.table.data.conversion.DataStructureConverters; +import org.apache.flink.table.functions.AsyncPredictFunction; +import org.apache.flink.table.functions.PredictFunction; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.ml.AsyncPredictRuntimeProvider; +import org.apache.flink.table.ml.ModelProvider; +import org.apache.flink.table.ml.PredictRuntimeProvider; +import org.apache.flink.table.planner.calcite.FlinkContext; +import org.apache.flink.table.planner.codegen.CodeGeneratorContext; +import org.apache.flink.table.planner.codegen.FilterCodeGenerator; +import org.apache.flink.table.planner.codegen.LookupJoinCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; +import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator; +import org.apache.flink.table.planner.plan.nodes.exec.spec.MLPredictSpec; +import org.apache.flink.table.planner.plan.nodes.exec.spec.ModelSpec; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.plan.utils.LookupJoinUtil; +import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; +import org.apache.flink.table.runtime.collector.ListenableCollector; +import org.apache.flink.table.runtime.collector.TableFunctionResultFuture; +import org.apache.flink.table.runtime.functions.ModelPredictRuntimeProviderContext; +import org.apache.flink.table.runtime.generated.GeneratedCollector; +import org.apache.flink.table.runtime.generated.GeneratedFunction; +import org.apache.flink.table.runtime.generated.GeneratedResultFuture; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner; +import 
org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner; +import org.apache.flink.table.runtime.typeutils.InternalSerializers; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.stream.IntStream; + +/** Stream {@link ExecNode} for {@code ML_PREDICT}. */ +@ExecNodeMetadata( + name = "stream-exec-ml-predict-table-function", + version = 1, + producedTransformations = StreamExecMLPredictTableFunction.ML_PREDICT_TRANSFORMATION, + minPlanVersion = FlinkVersion.V2_1, + minStateVersion = FlinkVersion.V2_1) +public class StreamExecMLPredictTableFunction extends ExecNodeBase<RowData> + implements MultipleTransformationTranslator<RowData>, StreamExecNode<RowData> { + + public static final String PARTITIONER_TRANSFORMATION = "partitioner"; + + public static final String ML_PREDICT_TRANSFORMATION = "ml-predict-table-function"; + + private final MLPredictSpec mlPredictSpec; + private final ModelSpec modelSpec; + private final @Nullable LookupJoinUtil.AsyncLookupOptions asyncLookupOptions; + private final @Nullable int[] inputUpsertKeys; Review Comment: Why don't we provide serialization like other exec node? ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java: ########## @@ -443,6 +443,36 @@ public class ExecutionConfigOptions { "The max number of async retry attempts to make before task " + "execution is failed."); + // ------------------------------------------------------------------------ + // Async ML_PREDICT Options + // ------------------------------------------------------------------------ + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY = + key("table.exec.async-ml-predict.buffer-capacity") + .intType() + .defaultValue(100) + .withDescription( + "The max number of async i/o operation that the async ml predict can trigger."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_ML_PREDICT_TIMEOUT = + key("table.exec.async-ml-predict.timeout") + .durationType() + .defaultValue(Duration.ofMinutes(3)) + .withDescription( + "The async timeout for the asynchronous operation to complete."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE = + key("table.exec.async-ml-predict.output-mode") + .enumType(AsyncOutputMode.class) + .defaultValue(AsyncOutputMode.ALLOW_UNORDERED) Review Comment: Default to `ORDERED`. Ordered is the default behavior for sync execution which we don't want to change if switch to async? 
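For reference, a minimal sketch of how these options would be set from the user side, assuming the keys land as proposed in this hunk (the `ORDERED` override just reflects the suggestion above; the concrete values are placeholders, not recommendations):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AsyncMlPredictConfigSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Keys as proposed in this PR; defaults are 100 / 3 min / ALLOW_UNORDERED.
        tEnv.getConfig().set("table.exec.async-ml-predict.buffer-capacity", "10");
        tEnv.getConfig().set("table.exec.async-ml-predict.timeout", "30 s");
        // Forcing ordered emission keeps the result order identical to the sync path.
        tEnv.getConfig().set("table.exec.async-ml-predict.output-mode", "ORDERED");
    }
}
```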
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java: ########## @@ -74,4 +116,107 @@ public RelWriter explainTerms(RelWriter pw) { .item("invocation", scan.getCall()) .item("rowType", getRowType()); } + + private MLPredictSpec buildMLPredictSpec() { + RexTableArgCall tableCall = extractOperand(operand -> operand instanceof RexTableArgCall); + RexCall descriptorCall = + extractOperand( + operand -> + operand instanceof RexCall + && ((RexCall) operand).getOperator() + instanceof SqlDescriptorOperator); + Map<String, Integer> column2Index = new HashMap<>(); + List<String> fieldNames = tableCall.getType().getFieldNames(); + for (int i = 0; i < fieldNames.size(); i++) { + column2Index.put(fieldNames.get(i), i); + } + List<LookupJoinUtil.LookupKey> features = + descriptorCall.getOperands().stream() + .map( + operand -> { + if (operand instanceof RexLiteral) { + RexLiteral literal = (RexLiteral) operand; + String fieldName = RexLiteral.stringValue(literal); + Integer index = column2Index.get(fieldName); + if (index == null) { + throw new TableException( + String.format( + "Field %s is not found in input schema: %s.", + fieldName, tableCall.getType())); + } + return new LookupJoinUtil.FieldRefLookupKey(index); + } else { + throw new TableException( + String.format( + "Unknown operand for descriptor operator: %s.", + operand)); + } + }) + .collect(Collectors.toList()); + return new MLPredictSpec(features, Collections.emptyMap()); + } + + private ModelSpec buildModelSpec(RexModelCall modelCall) { + ModelSpec modelSpec = new ModelSpec(modelCall.getContextResolvedModel()); + modelSpec.setModelProvider(modelCall.getModelProvider()); + return modelSpec; + } + + private LookupJoinUtil.AsyncLookupOptions buildAsyncOptions(RexModelCall modelCall) { + boolean isAsyncEnabled = isAsyncMLPredict(modelCall.getModelProvider()); Review Comment: Sync or not is controlled by `async` option in the map in the FLIP. 
If `map['async', 'true']` appears, provider must implement `AsyncPredictRuntime` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java: ########## @@ -443,6 +443,36 @@ public class ExecutionConfigOptions { "The max number of async retry attempts to make before task " + "execution is failed."); + // ------------------------------------------------------------------------ + // Async ML_PREDICT Options + // ------------------------------------------------------------------------ + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_ML_PREDICT_BUFFER_CAPACITY = + key("table.exec.async-ml-predict.buffer-capacity") + .intType() + .defaultValue(100) + .withDescription( + "The max number of async i/o operation that the async ml predict can trigger."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Duration> TABLE_EXEC_ASYNC_ML_PREDICT_TIMEOUT = + key("table.exec.async-ml-predict.timeout") + .durationType() + .defaultValue(Duration.ofMinutes(3)) + .withDescription( + "The async timeout for the asynchronous operation to complete."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<AsyncOutputMode> TABLE_EXEC_ASYNC_ML_PREDICT_OUTPUT_MODE = + key("table.exec.async-ml-predict.output-mode") + .enumType(AsyncOutputMode.class) + .defaultValue(AsyncOutputMode.ALLOW_UNORDERED) + .withDescription( + "Output mode for asynchronous operations which will convert to {@see AsyncDataStream.OutputMode}, ALLOW_UNORDERED by default. " + + "That is to say the planner will attempt to use {@see AsyncDataStream.OutputMode.UNORDERED} when it does not " + + "affect the correctness of the result, otherwise ORDERED will be still used."); Review Comment: This is missing other configs like: `max-attempts`, `retry-strategy`, `retry-delay`? ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/table/AsyncMLPredictITCase.java: ########## @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.planner.runtime.stream.table; + +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesModelFactory; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow; +import static org.assertj.core.api.Assertions.assertThatList; + +/** ITCase for async ML_PREDICT. */ +@ExtendWith(ParameterizedTestExtension.class) +public class AsyncMLPredictITCase extends StreamingWithStateTestBase { + + private final Boolean objectReuse; + private final ExecutionConfigOptions.AsyncOutputMode asyncOutputMode; + + public AsyncMLPredictITCase( + StateBackendMode backend, + Boolean objectReuse, + ExecutionConfigOptions.AsyncOutputMode asyncOutputMode) { + super(backend); + + this.objectReuse = objectReuse; + this.asyncOutputMode = asyncOutputMode; + } + + private final List<Row> data = + Arrays.asList( + Row.of(1L, 12, "Julian"), + Row.of(2L, 15, "Hello"), + Row.of(3L, 15, "Fabian"), + Row.of(8L, 11, "Hello world"), + Row.of(9L, 12, "Hello world!")); + + private final List<Row> dataWithNull = + Arrays.asList( + Row.of(15L, null, "Hello"), + Row.of(3L, 15, "Fabian"), + Row.of(11L, null, "Hello world"), + Row.of(9L, 12, "Hello world!")); + + private final List<Row> cdcRowData = Review Comment: Should CDC be supported if ml_predict output is non-deterministic?
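To make the earlier comment on `StreamPhysicalMLPredictTableFunction` concrete (sync vs. async chosen by the `async` entry in the runtime config map rather than inferred from the provider), a rough sketch of the per-query override; the ML_PREDICT argument shape is taken from the FLIP and may differ in the final syntax, and the table, model, and column names are made up:

```java
// Hypothetical query shape, reusing the tEnv from the config sketch above.
// MAP['async', 'true'] is the per-query override the comment refers to; when it
// is present, the resolved ModelProvider is expected to supply an
// AsyncPredictRuntimeProvider.
tEnv.executeSql(
        "SELECT * FROM ML_PREDICT("
                + "  TABLE src, "
                + "  MODEL my_model, "
                + "  DESCRIPTOR(content), "
                + "  MAP['async', 'true'])");
```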
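And for the retry settings called out as missing in the `ExecutionConfigOptions` comment, purely illustrative keys showing what the analogous options could look like if they mirror the existing `table.exec.async-lookup.*` counterparts; none of these keys exist in this PR:

```java
// Hypothetical keys, mirroring table.exec.async-lookup.*; not part of this PR.
tEnv.getConfig().set("table.exec.async-ml-predict.retry-strategy", "fixed_delay");
tEnv.getConfig().set("table.exec.async-ml-predict.retry-delay", "1 s");
tEnv.getConfig().set("table.exec.async-ml-predict.max-attempts", "3");
```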