fsk119 commented on code in PR #26644:
URL: https://github.com/apache/flink/pull/26644#discussion_r2139479428


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLPredictTableFunction.java:
##########
@@ -134,100 +123,13 @@ public SqlOperandCountRange getOperandCountRange() {
 
         @Override
         public boolean isOptional(int i) {
-            return i > getOperandCountRange().getMin() && i <= getOperandCountRange().getMax();
+            return i >= getOperandCountRange().getMin() && i <= getOperandCountRange().getMax();

Review Comment:
   `i > getOperandCountRange().getMin() && i <= getOperandCountRange().getMax()`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLEvaluateTableFunction.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql.ml;
+
+import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlMLEvaluateTableFunction implements an operator for model evaluation.
+ *
+ * <p>It allows six parameters:
+ *
+ * <ol>
+ *   <li>a table name
+ *   <li>a model name
+ *   <li>a descriptor to provide label column names from the input table
+ *   <li>a descriptor to provide feature column names from the input table
+ *   <li>an optional task override string
+ *   <li>an optional config map
+ * </ol>
+ *
+ * <p>The function returns a MAP type containing evaluation metrics and results.
+ */
+public class SqlMLEvaluateTableFunction extends SqlMLTableFunction {
+
+    public static final String PARAM_LABEL = "LABEL";
+    public static final String PARAM_FEATURE = "FEATURE";
+    public static final String PARAM_TASK = "TASK";
+
+    public static final List<String> SUPPORTED_TASKS =

Review Comment:
   I think this is part of the public API. It would be better to add an enum class to illustrate the supported tasks.
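   
   For illustration, a rough sketch of what such an enum could look like (the name `MLTask` and the `fromString` helper are only illustrative, not part of this PR):
   ```
   /** Sketch of an enum that could replace the SUPPORTED_TASKS string list. */
   public enum MLTask {
       REGRESSION,
       CLUSTERING,
       CLASSIFICATION,
       EMBEDDING,
       TEXT_GENERATION;

       /** Case-insensitive lookup; empty for unsupported task strings. */
       public static java.util.Optional<MLTask> fromString(String value) {
           for (MLTask task : values()) {
               if (task.name().equalsIgnoreCase(value)) {
                   return java.util.Optional.of(task);
               }
           }
           return java.util.Optional.empty();
       }
   }
   ```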



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLEvaluateTableFunction.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql.ml;
+
+import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlMLEvaluateTableFunction implements an operator for model evaluation.
+ *
+ * <p>It allows six parameters:
+ *
+ * <ol>
+ *   <li>a table name
+ *   <li>a model name
+ *   <li>a descriptor to provide label column names from the input table
+ *   <li>a descriptor to provide feature column names from the input table
+ *   <li>an optional task override string
+ *   <li>an optional config map
+ * </ol>
+ *
+ * <p>The function returns a MAP type containing evaluation metrics and results.
+ */
+public class SqlMLEvaluateTableFunction extends SqlMLTableFunction {
+
+    public static final String PARAM_LABEL = "LABEL";
+    public static final String PARAM_FEATURE = "FEATURE";

Review Comment:
   In ML_PREDICT, we use `ARGS` to represent the predict function's input, but here we use `FEATURE`. It would be better to use the same name in both places.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLEvaluateTableFunction.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql.ml;
+
+import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlMLEvaluateTableFunction implements an operator for model evaluation.
+ *
+ * <p>It allows six parameters:
+ *
+ * <ol>
+ *   <li>a table name
+ *   <li>a model name
+ *   <li>a descriptor to provide label column names from the input table
+ *   <li>a descriptor to provide feature column names from the input table
+ *   <li>an optional task override string
+ *   <li>an optional config map
+ * </ol>
+ *
+ * <p>The function returns a MAP type containing evaluation metrics and results.
+ */
+public class SqlMLEvaluateTableFunction extends SqlMLTableFunction {
+
+    public static final String PARAM_LABEL = "LABEL";
+    public static final String PARAM_FEATURE = "FEATURE";
+    public static final String PARAM_TASK = "TASK";
+
+    public static final List<String> SUPPORTED_TASKS =
+            List.of("regression", "clustering", "classification", "embedding", "text_generation");
+
+    public SqlMLEvaluateTableFunction() {
+        super("ML_EVALUATE", new EvaluateOperandMetadata());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Overrides because the first parameter of ML table-value function is an explicit TABLE
+     * parameter, which is not scalar.
+     */
+    @Override
+    public boolean argumentMustBeScalar(int ordinal) {
+        return ordinal != 0;
+    }
+
+    @Override
+    protected RelDataType inferRowType(SqlOperatorBinding opBinding) {
+        final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+        final RelDataType inputRowType = opBinding.getOperandType(0);
+
+        // Create a MAP type for evaluation results
+        RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+        RelDataType valueType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+        RelDataType mapType = typeFactory.createMapType(keyType, valueType);
+
+        return typeFactory
+                .builder()
+                .kind(inputRowType.getStructKind())
+                .add("result", mapType)
+                .build();
+    }
+
+    private static class EvaluateOperandMetadata implements SqlOperandMetadata {
+        private static final List<String> PARAM_NAMES =
+                List.of(
+                        PARAM_INPUT,
+                        PARAM_MODEL,
+                        PARAM_LABEL,
+                        PARAM_FEATURE,
+                        PARAM_TASK,
+                        PARAM_CONFIG);
+        private static final List<String> MANDATORY_PARAM_NAMES =
+                List.of(PARAM_INPUT, PARAM_MODEL, PARAM_LABEL, PARAM_FEATURE);
+
+        EvaluateOperandMetadata() {}
+
+        @Override
+        public List<RelDataType> paramTypes(RelDataTypeFactory typeFactory) {
+            return Collections.nCopies(
+                    PARAM_NAMES.size(), typeFactory.createSqlType(SqlTypeName.ANY));
+        }
+
+        @Override
+        public List<String> paramNames() {
+            return PARAM_NAMES;
+        }
+
+        @Override
+        public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+            if (!SqlValidatorUtils.checkTableAndDescriptorOperands(callBinding, 2, 3)) {
+                return SqlValidatorUtils.throwValidationSignatureErrorOrReturnFalse(
+                        callBinding, throwOnFailure);
+            }
+
+            if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                    checkModelSignature(callBinding, 3, 2), throwOnFailure)) {
+                return false;
+            }
+
+            if (callBinding.getOperandCount() == PARAM_NAMES.size()) {
+                // Check task and config parameters
+                if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkTask(callBinding.operand(4)), throwOnFailure)) {
+                    return false;
+                }
+
+                if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkConfig(callBinding.operand(5)), throwOnFailure)) {
+                    return false;
+                }
+            }
+
+            if (callBinding.getOperandCount() == MANDATORY_PARAM_NAMES.size() + 1) {
+                // Last param can be config or task
+                return SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkTaskOrConfig(callBinding.operand(4)), throwOnFailure);
+            }
+            return true;
+        }
+
+        @Override
+        public SqlOperandCountRange getOperandCountRange() {
+            return SqlOperandCountRanges.between(MANDATORY_PARAM_NAMES.size(), PARAM_NAMES.size());
+        }
+
+        @Override
+        public boolean isOptional(int i) {
+            return i > getOperandCountRange().getMin() && i <= getOperandCountRange().getMax();
+        }
+
+        @Override
+        public String getAllowedSignatures(SqlOperator op, String opName) {
+            return opName
+                    + "(TABLE table_name, MODEL model_name, DESCRIPTOR(label_column), DESCRIPTOR(feature_columns), [task], [MAP[]])";
+        }
+
+        private static Optional<RuntimeException> checkTask(SqlNode node) {
+            // Check if the task is a valid string
+            if (!(node instanceof SqlCharStringLiteral)) {
+                return Optional.of(
+                        new RuntimeException(
+                                "Expected a valid task string, but got: " + node + "."));
+            }
+
+            String task = ((SqlCharStringLiteral) node).getValueAs(NlsString.class).getValue();
+            if (SUPPORTED_TASKS.stream()
+                    .noneMatch(supportedTask -> supportedTask.equalsIgnoreCase(task))) {
+                return Optional.of(
+                        new RuntimeException(

Review Comment:
   Use `ValidationException` instead of `RuntimeException`.
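   
   For example, roughly (keeping the `Optional<RuntimeException>` signature, since `ValidationException` extends `RuntimeException`; this is a sketch, not the final implementation):
   ```
   private static Optional<RuntimeException> checkTask(SqlNode node) {
       // Report invalid task operands as ValidationException instead of a bare RuntimeException.
       if (!(node instanceof SqlCharStringLiteral)) {
           return Optional.of(
                   new ValidationException("Expected a valid task string, but got: " + node + "."));
       }
       String task = ((SqlCharStringLiteral) node).getValueAs(NlsString.class).getValue();
       if (SUPPORTED_TASKS.stream().noneMatch(supported -> supported.equalsIgnoreCase(task))) {
           return Optional.of(
                   new ValidationException(
                           "Unsupported task: " + task
                                   + ". Supported tasks are: "
                                   + String.join(", ", SUPPORTED_TASKS) + "."));
       }
       return Optional.empty();
   }
   ```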



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLTableFunction.java:
##########
@@ -103,4 +115,132 @@ public SqlReturnTypeInference getRowTypeInference() {
     }
 
     protected abstract RelDataType inferRowType(SqlOperatorBinding opBinding);
+
+    protected static Optional<RuntimeException> checkModelSignature(
+            SqlCallBinding callBinding, int inputDescriptorIndex, int outputDescriptorIndex) {
+        SqlValidator validator = callBinding.getValidator();
+
+        // Check second operand is SqlModelCall
+        if (!(callBinding.operand(1) instanceof SqlModelCall)) {
+            return Optional.of(
+                    new ValidationException("Second operand must be a model identifier."));
+        }
+
+        // Get input descriptor columns
+        SqlCall descriptorCall = (SqlCall) callBinding.operand(inputDescriptorIndex);
+        List<SqlNode> descriptCols = descriptorCall.getOperandList();
+
+        // Get model input size
+        SqlModelCall modelCall = (SqlModelCall) callBinding.operand(1);
+        RelDataType modelInputType = modelCall.getInputType(validator);
+
+        // Check sizes match
+        if (descriptCols.size() != modelInputType.getFieldCount()) {
+            return Optional.of(
+                    new ValidationException(
+                            String.format(
+                                    "Number of input descriptor columns (%d) does not match model input size (%d).",
+                                    descriptCols.size(), modelInputType.getFieldCount())));
+        }
+
+        // Check input types match
+        final RelDataType tableType = validator.getValidatedNodeType(callBinding.operand(0));
+        final SqlNameMatcher matcher = validator.getCatalogReader().nameMatcher();
+        for (int i = 0; i < descriptCols.size(); i++) {
+            Tuple3<Boolean, LogicalType, LogicalType> result =
+                    checkModelDescriptorType(
+                            tableType,
+                            modelInputType.getFieldList().get(i).getType(),
+                            descriptCols.get(i),
+                            matcher);
+            if (!result.f0) {
+                return Optional.of(
+                        new ValidationException(
+                                String.format(
+                                        "Input descriptor column type %s cannot be assigned to model input type %s at position %d.",
+                                        result.f1, result.f2, i)));
+            }
+        }
+
+        // Check output
+        if (outputDescriptorIndex > 0) {
+            descriptorCall = (SqlCall) callBinding.operand(outputDescriptorIndex);
+            descriptCols = descriptorCall.getOperandList();
+            if (descriptCols.size() != 1) {
+                return Optional.of(
+                        new ValidationException(
+                                "Label descriptor must have exactly one column for evaluation."));
+            }
+            RelDataType modelOutputType = modelCall.getOutputType(validator);
+            if (modelOutputType.getFieldCount() != 1) {
+                return Optional.of(
+                        new ValidationException(
+                                "Model output must have exactly one field for evaluation."));
+            }
+
+            Tuple3<Boolean, LogicalType, LogicalType> result =
+                    checkModelDescriptorType(
+                            tableType,
+                            modelOutputType.getFieldList().get(0).getType(),
+                            descriptCols.get(0),
+                            matcher);
+            if (!result.f0) {
+                return Optional.of(
+                        new ValidationException(
+                                String.format(
+                                        "Label descriptor column type %s cannot be assigned to model output type %s for evaluation.",
+                                        result.f1, result.f2)));
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private static Tuple3<Boolean, LogicalType, LogicalType> checkModelDescriptorType(
+            RelDataType tableType,
+            RelDataType modelType,
+            SqlNode descriptorNode,
+            SqlNameMatcher matcher) {
+        SqlIdentifier columnName = (SqlIdentifier) descriptorNode;
+        String descriptColName =
+                columnName.isSimple() ? columnName.getSimple() : Util.last(columnName.names);
+        int index = matcher.indexOf(tableType.getFieldNames(), descriptColName);
+        RelDataType sourceType = tableType.getFieldList().get(index).getType();
+
+        LogicalType sourceLogicalType = toLogicalType(sourceType);
+        LogicalType targetLogicalType = toLogicalType(modelType);
+
+        return Tuple3.of(
+                LogicalTypeCasts.supportsImplicitCast(sourceLogicalType, targetLogicalType),
+                sourceLogicalType,
+                targetLogicalType);
+    }
+
+    protected static Optional<RuntimeException> checkConfig(SqlNode configNode) {

Review Comment:
   I have already merged the PR about runtime config usage. It would be better to reuse the latest check logic from that change.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecMLPredictTableFunction.java:
##########
@@ -81,8 +81,8 @@
         name = "stream-exec-ml-predict-table-function",
         version = 1,
        producedTransformations = StreamExecMLPredictTableFunction.ML_PREDICT_TRANSFORMATION,
-        minPlanVersion = FlinkVersion.V2_1,

Review Comment:
   Do we need to modify this?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLEvaluateTableFunction.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql.ml;
+
+import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlMLEvaluateTableFunction implements an operator for model evaluation.
+ *
+ * <p>It allows six parameters:
+ *
+ * <ol>
+ *   <li>a table name
+ *   <li>a model name
+ *   <li>a descriptor to provide label column names from the input table
+ *   <li>a descriptor to provide feature column names from the input table
+ *   <li>an optional task override string
+ *   <li>an optional config map
+ * </ol>
+ *
+ * <p>The function returns a MAP type containing evaluation metrics and results.
+ */
+public class SqlMLEvaluateTableFunction extends SqlMLTableFunction {
+
+    public static final String PARAM_LABEL = "LABEL";
+    public static final String PARAM_FEATURE = "FEATURE";
+    public static final String PARAM_TASK = "TASK";
+
+    public static final List<String> SUPPORTED_TASKS =
+            List.of("regression", "clustering", "classification", "embedding", "text_generation");
+
+    public SqlMLEvaluateTableFunction() {
+        super("ML_EVALUATE", new EvaluateOperandMetadata());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Overrides because the first parameter of ML table-value function is an explicit TABLE
+     * parameter, which is not scalar.
+     */
+    @Override
+    public boolean argumentMustBeScalar(int ordinal) {
+        return ordinal != 0;
+    }
+
+    @Override
+    protected RelDataType inferRowType(SqlOperatorBinding opBinding) {
+        final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+        final RelDataType inputRowType = opBinding.getOperandType(0);
+
+        // Create a MAP type for evaluation results
+        RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+        RelDataType valueType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+        RelDataType mapType = typeFactory.createMapType(keyType, valueType);
+
+        return typeFactory
+                .builder()
+                .kind(inputRowType.getStructKind())
+                .add("result", mapType)
+                .build();
+    }
+
+    private static class EvaluateOperandMetadata implements SqlOperandMetadata {
+        private static final List<String> PARAM_NAMES =
+                List.of(
+                        PARAM_INPUT,
+                        PARAM_MODEL,
+                        PARAM_LABEL,
+                        PARAM_FEATURE,
+                        PARAM_TASK,
+                        PARAM_CONFIG);
+        private static final List<String> MANDATORY_PARAM_NAMES =
+                List.of(PARAM_INPUT, PARAM_MODEL, PARAM_LABEL, PARAM_FEATURE);
+
+        EvaluateOperandMetadata() {}
+
+        @Override
+        public List<RelDataType> paramTypes(RelDataTypeFactory typeFactory) {
+            return Collections.nCopies(
+                    PARAM_NAMES.size(), typeFactory.createSqlType(SqlTypeName.ANY));
+        }
+
+        @Override
+        public List<String> paramNames() {
+            return PARAM_NAMES;
+        }
+
+        @Override
+        public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+            if (!SqlValidatorUtils.checkTableAndDescriptorOperands(callBinding, 2, 3)) {
+                return SqlValidatorUtils.throwValidationSignatureErrorOrReturnFalse(
+                        callBinding, throwOnFailure);
+            }
+
+            if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                    checkModelSignature(callBinding, 3, 2), throwOnFailure)) {
+                return false;
+            }
+
+            if (callBinding.getOperandCount() == PARAM_NAMES.size()) {
+                // Check task and config parameters
+                if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkTask(callBinding.operand(4)), throwOnFailure)) {
+                    return false;
+                }
+
+                if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkConfig(callBinding.operand(5)), throwOnFailure)) {
+                    return false;
+                }
+            }
+
+            if (callBinding.getOperandCount() == MANDATORY_PARAM_NAMES.size() + 1) {
+                // Last param can be config or task
+                return SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkTaskOrConfig(callBinding.operand(4)), throwOnFailure);

Review Comment:
   It's better to use `MANDATORY_PARAM_NAMES.size() + 1` here, so that the condition doesn't use a hard-coded number.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLEvaluateTableFunction.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql.ml;
+
+import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlMLEvaluateTableFunction implements an operator for model evaluation.
+ *
+ * <p>It allows six parameters:
+ *
+ * <ol>
+ *   <li>a table name
+ *   <li>a model name
+ *   <li>a descriptor to provide label column names from the input table
+ *   <li>a descriptor to provide feature column names from the input table
+ *   <li>an optional task override string
+ *   <li>an optional config map
+ * </ol>
+ *
+ * <p>The function returns a MAP type containing evaluation metrics and results.
+ */
+public class SqlMLEvaluateTableFunction extends SqlMLTableFunction {
+
+    public static final String PARAM_LABEL = "LABEL";
+    public static final String PARAM_FEATURE = "FEATURE";
+    public static final String PARAM_TASK = "TASK";
+
+    public static final List<String> SUPPORTED_TASKS =
+            List.of("regression", "clustering", "classification", "embedding", "text_generation");
+
+    public SqlMLEvaluateTableFunction() {
+        super("ML_EVALUATE", new EvaluateOperandMetadata());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Overrides because the first parameter of ML table-value function is an explicit TABLE
+     * parameter, which is not scalar.
+     */
+    @Override
+    public boolean argumentMustBeScalar(int ordinal) {
+        return ordinal != 0;
+    }
+
+    @Override
+    protected RelDataType inferRowType(SqlOperatorBinding opBinding) {
+        final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+        final RelDataType inputRowType = opBinding.getOperandType(0);
+
+        // Create a MAP type for evaluation results
+        RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+        RelDataType valueType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+        RelDataType mapType = typeFactory.createMapType(keyType, valueType);
+
+        return typeFactory
+                .builder()
+                .kind(inputRowType.getStructKind())
+                .add("result", mapType)
+                .build();
+    }
+
+    private static class EvaluateOperandMetadata implements SqlOperandMetadata {
+        private static final List<String> PARAM_NAMES =
+                List.of(
+                        PARAM_INPUT,
+                        PARAM_MODEL,
+                        PARAM_LABEL,
+                        PARAM_FEATURE,
+                        PARAM_TASK,
+                        PARAM_CONFIG);
+        private static final List<String> MANDATORY_PARAM_NAMES =

Review Comment:
   I think `task` should be required here, because we don't require users to create a model with an option named `'task'`.
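   
   If task becomes mandatory, the parameter list might look roughly like this (just a sketch of one possible resolution, not the PR author's final decision):
   ```
   // Sketch: TASK moves into the mandatory list, leaving only CONFIG optional.
   private static final List<String> MANDATORY_PARAM_NAMES =
           List.of(PARAM_INPUT, PARAM_MODEL, PARAM_LABEL, PARAM_FEATURE, PARAM_TASK);
   ```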



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLEvaluateTableFunction.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql.ml;
+
+import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlMLEvaluateTableFunction implements an operator for model evaluation.
+ *
+ * <p>It allows six parameters:
+ *
+ * <ol>
+ *   <li>a table name
+ *   <li>a model name
+ *   <li>a descriptor to provide label column names from the input table
+ *   <li>a descriptor to provide feature column names from the input table
+ *   <li>an optional task override string
+ *   <li>an optional config map
+ * </ol>
+ *
+ * <p>The function returns a MAP type containing evaluation metrics and results.
+ */
+public class SqlMLEvaluateTableFunction extends SqlMLTableFunction {
+
+    public static final String PARAM_LABEL = "LABEL";
+    public static final String PARAM_FEATURE = "FEATURE";
+    public static final String PARAM_TASK = "TASK";
+
+    public static final List<String> SUPPORTED_TASKS =
+            List.of("regression", "clustering", "classification", "embedding", "text_generation");
+
+    public SqlMLEvaluateTableFunction() {
+        super("ML_EVALUATE", new EvaluateOperandMetadata());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Overrides because the first parameter of ML table-value function is an explicit TABLE
+     * parameter, which is not scalar.
+     */
+    @Override
+    public boolean argumentMustBeScalar(int ordinal) {
+        return ordinal != 0;
+    }
+
+    @Override
+    protected RelDataType inferRowType(SqlOperatorBinding opBinding) {
+        final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+        final RelDataType inputRowType = opBinding.getOperandType(0);
+
+        // Create a MAP type for evaluation results
+        RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+        RelDataType valueType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+        RelDataType mapType = typeFactory.createMapType(keyType, valueType);
+
+        return typeFactory
+                .builder()
+                .kind(inputRowType.getStructKind())
+                .add("result", mapType)
+                .build();
+    }
+
+    private static class EvaluateOperandMetadata implements SqlOperandMetadata {
+        private static final List<String> PARAM_NAMES =
+                List.of(
+                        PARAM_INPUT,
+                        PARAM_MODEL,
+                        PARAM_LABEL,
+                        PARAM_FEATURE,
+                        PARAM_TASK,
+                        PARAM_CONFIG);
+        private static final List<String> MANDATORY_PARAM_NAMES =
+                List.of(PARAM_INPUT, PARAM_MODEL, PARAM_LABEL, PARAM_FEATURE);
+
+        EvaluateOperandMetadata() {}
+
+        @Override
+        public List<RelDataType> paramTypes(RelDataTypeFactory typeFactory) {
+            return Collections.nCopies(
+                    PARAM_NAMES.size(), typeFactory.createSqlType(SqlTypeName.ANY));
+        }
+
+        @Override
+        public List<String> paramNames() {
+            return PARAM_NAMES;
+        }
+
+        @Override
+        public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+            if (!SqlValidatorUtils.checkTableAndDescriptorOperands(callBinding, 2, 3)) {
+                return SqlValidatorUtils.throwValidationSignatureErrorOrReturnFalse(
+                        callBinding, throwOnFailure);
+            }
+
+            if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                    checkModelSignature(callBinding, 3, 2), throwOnFailure)) {
+                return false;
+            }
+
+            if (callBinding.getOperandCount() == PARAM_NAMES.size()) {
+                // Check task and config parameters
+                if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkTask(callBinding.operand(4)), throwOnFailure)) {
+                    return false;
+                }
+
+                if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkConfig(callBinding.operand(5)), throwOnFailure)) {
+                    return false;
+                }
+            }
+
+            if (callBinding.getOperandCount() == MANDATORY_PARAM_NAMES.size() + 1) {
+                // Last param can be config or task

Review Comment:
   I am confused by the comment. Considering the current function signature,
   ```
   return opName
                       + "(TABLE table_name, MODEL model_name, DESCRIPTOR(label_column), DESCRIPTOR(feature_columns), [task], [MAP[]])";
   ```
   the fifth parameter is always the task. If it is not, shouldn't we throw an exception to notify users?
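   
   One possible reading of this (a sketch only, not necessarily the intended resolution): when exactly five operands are present, validate the fifth strictly as a task and let `checkTask` raise the error for anything else:
   ```
   if (callBinding.getOperandCount() == MANDATORY_PARAM_NAMES.size() + 1) {
       // Sketch: treat the fifth parameter strictly as the task; checkTask reports
       // an error if it is not a supported task string.
       return SqlValidatorUtils.throwExceptionOrReturnFalse(
               checkTask(callBinding.operand(MANDATORY_PARAM_NAMES.size())), throwOnFailure);
   }
   ```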



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/ml/SqlMLEvaluateTableFunction.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql.ml;
+
+import org.apache.flink.table.planner.functions.utils.SqlValidatorUtils;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperandCountRange;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorBinding;
+import org.apache.calcite.sql.type.SqlOperandCountRanges;
+import org.apache.calcite.sql.type.SqlOperandMetadata;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * SqlMLEvaluateTableFunction implements an operator for model evaluation.
+ *
+ * <p>It allows six parameters:
+ *
+ * <ol>
+ *   <li>a table name
+ *   <li>a model name
+ *   <li>a descriptor to provide label column names from the input table
+ *   <li>a descriptor to provide feature column names from the input table
+ *   <li>an optional task override string
+ *   <li>an optional config map
+ * </ol>
+ *
+ * <p>The function returns a MAP type containing evaluation metrics and results.
+ */
+public class SqlMLEvaluateTableFunction extends SqlMLTableFunction {
+
+    public static final String PARAM_LABEL = "LABEL";
+    public static final String PARAM_FEATURE = "FEATURE";
+    public static final String PARAM_TASK = "TASK";
+
+    public static final List<String> SUPPORTED_TASKS =
+            List.of("regression", "clustering", "classification", "embedding", "text_generation");
+
+    public SqlMLEvaluateTableFunction() {
+        super("ML_EVALUATE", new EvaluateOperandMetadata());
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Overrides because the first parameter of ML table-value function is an explicit TABLE
+     * parameter, which is not scalar.
+     */
+    @Override
+    public boolean argumentMustBeScalar(int ordinal) {
+        return ordinal != 0;
+    }
+
+    @Override
+    protected RelDataType inferRowType(SqlOperatorBinding opBinding) {
+        final RelDataTypeFactory typeFactory = opBinding.getTypeFactory();
+        final RelDataType inputRowType = opBinding.getOperandType(0);
+
+        // Create a MAP type for evaluation results
+        RelDataType keyType = typeFactory.createSqlType(SqlTypeName.VARCHAR);
+        RelDataType valueType = typeFactory.createSqlType(SqlTypeName.DOUBLE);
+        RelDataType mapType = typeFactory.createMapType(keyType, valueType);
+
+        return typeFactory
+                .builder()
+                .kind(inputRowType.getStructKind())
+                .add("result", mapType)
+                .build();
+    }
+
+    private static class EvaluateOperandMetadata implements SqlOperandMetadata {
+        private static final List<String> PARAM_NAMES =
+                List.of(
+                        PARAM_INPUT,
+                        PARAM_MODEL,
+                        PARAM_LABEL,
+                        PARAM_FEATURE,
+                        PARAM_TASK,
+                        PARAM_CONFIG);
+        private static final List<String> MANDATORY_PARAM_NAMES =
+                List.of(PARAM_INPUT, PARAM_MODEL, PARAM_LABEL, PARAM_FEATURE);
+
+        EvaluateOperandMetadata() {}
+
+        @Override
+        public List<RelDataType> paramTypes(RelDataTypeFactory typeFactory) {
+            return Collections.nCopies(
+                    PARAM_NAMES.size(), typeFactory.createSqlType(SqlTypeName.ANY));
+        }
+
+        @Override
+        public List<String> paramNames() {
+            return PARAM_NAMES;
+        }
+
+        @Override
+        public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+            if (!SqlValidatorUtils.checkTableAndDescriptorOperands(callBinding, 2, 3)) {
+                return SqlValidatorUtils.throwValidationSignatureErrorOrReturnFalse(
+                        callBinding, throwOnFailure);
+            }
+
+            if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                    checkModelSignature(callBinding, 3, 2), throwOnFailure)) {
+                return false;
+            }
+
+            if (callBinding.getOperandCount() == PARAM_NAMES.size()) {
+                // Check task and config parameters
+                if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkTask(callBinding.operand(4)), throwOnFailure)) {
+                    return false;
+                }
+
+                if (!SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkConfig(callBinding.operand(5)), throwOnFailure)) {
+                    return false;
+                }
+            }
+
+            if (callBinding.getOperandCount() == MANDATORY_PARAM_NAMES.size() + 1) {
+                // Last param can be config or task
+                return SqlValidatorUtils.throwExceptionOrReturnFalse(
+                        checkTaskOrConfig(callBinding.operand(4)), throwOnFailure);
+            }
+            return true;
+        }
+
+        @Override
+        public SqlOperandCountRange getOperandCountRange() {
+            return SqlOperandCountRanges.between(MANDATORY_PARAM_NAMES.size(), PARAM_NAMES.size());
+        }
+
+        @Override
+        public boolean isOptional(int i) {
+            return i > getOperandCountRange().getMin() && i <= getOperandCountRange().getMax();
+        }
+
+        @Override
+        public String getAllowedSignatures(SqlOperator op, String opName) {
+            return opName
+                    + "(TABLE table_name, MODEL model_name, DESCRIPTOR(label_column), DESCRIPTOR(feature_columns), [task], [MAP[]])";
+        }
+
+        private static Optional<RuntimeException> checkTask(SqlNode node) {
+            // Check if the task is a valid string
+            if (!(node instanceof SqlCharStringLiteral)) {
+                return Optional.of(
+                        new RuntimeException(
+                                "Expected a valid task string, but got: " + node + "."));
+            }
+
+            String task = ((SqlCharStringLiteral) node).getValueAs(NlsString.class).getValue();
+            if (SUPPORTED_TASKS.stream()
+                    .noneMatch(supportedTask -> supportedTask.equalsIgnoreCase(task))) {
+                return Optional.of(
+                        new RuntimeException(
+                                "Unsupported task: "
+                                        + task
+                                        + ". Supported tasks are: "
+                                        + String.join(", ", SUPPORTED_TASKS)
+                                        + "."));
+            }
+            return Optional.empty();
+        }
+
+        private static Optional<RuntimeException> checkTaskOrConfig(SqlNode node) {
+            if (node.getKind().equals(SqlKind.MAP_VALUE_CONSTRUCTOR)) {
+                // Check if the map is valid
+                return checkConfig(node);
+            } else if (node instanceof SqlCharStringLiteral) {
+                return checkTask(node);
+            } else {
+                return Optional.of(
+                        new RuntimeException(

Review Comment:
   Use `ValidationException` instead of `RuntimeException` here as well.


