fsk119 commented on code in PR #26630:
URL: https://github.com/apache/flink/pull/26630#discussion_r2128005528


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java:
##########
@@ -74,4 +116,107 @@ public RelWriter explainTerms(RelWriter pw) {
                 .item("invocation", scan.getCall())
                 .item("rowType", getRowType());
     }
+
+    private MLPredictSpec buildMLPredictSpec() {
+        RexTableArgCall tableCall = extractOperand(operand -> operand instanceof RexTableArgCall);
+        RexCall descriptorCall =
+                extractOperand(
+                        operand ->
+                                operand instanceof RexCall
+                                        && ((RexCall) operand).getOperator()
+                                                instanceof SqlDescriptorOperator);
+        Map<String, Integer> column2Index = new HashMap<>();
+        List<String> fieldNames = tableCall.getType().getFieldNames();
+        for (int i = 0; i < fieldNames.size(); i++) {
+            column2Index.put(fieldNames.get(i), i);
+        }
+        List<LookupJoinUtil.LookupKey> features =
+                descriptorCall.getOperands().stream()
+                        .map(
+                                operand -> {
+                                    if (operand instanceof RexLiteral) {
+                                        RexLiteral literal = (RexLiteral) operand;
+                                        String fieldName = RexLiteral.stringValue(literal);
+                                        Integer index = column2Index.get(fieldName);
+                                        if (index == null) {
+                                            throw new TableException(
+                                                    String.format(
+                                                            "Field %s is not found in input schema: %s.",
+                                                            fieldName, tableCall.getType()));
+                                        }
+                                        return new LookupJoinUtil.FieldRefLookupKey(index);
+                                    } else {
+                                        throw new TableException(
+                                                String.format(
+                                                        "Unknown operand for descriptor operator: %s.",
+                                                        operand));
+                                    }
+                                })
+                        .collect(Collectors.toList());
+        return new MLPredictSpec(features, Collections.emptyMap());
+    }
+
+    private ModelSpec buildModelSpec(RexModelCall modelCall) {
+        ModelSpec modelSpec = new ModelSpec(modelCall.getContextResolvedModel());
+        modelSpec.setModelProvider(modelCall.getModelProvider());
+        return modelSpec;
+    }
+
+    private LookupJoinUtil.AsyncLookupOptions buildAsyncOptions(RexModelCall modelCall) {
+        boolean isAsyncEnabled = isAsyncMLPredict(modelCall.getModelProvider());

Review Comment:
   Yes. I have opened a ticket for this. Implementing all of these features in 
this PR would add a lot of code.
   
   https://issues.apache.org/jira/browse/FLINK-37903


