fsk119 commented on code in PR #26630: URL: https://github.com/apache/flink/pull/26630#discussion_r2128005528
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalMLPredictTableFunction.java: ########## @@ -74,4 +116,107 @@ public RelWriter explainTerms(RelWriter pw) { .item("invocation", scan.getCall()) .item("rowType", getRowType()); } + + private MLPredictSpec buildMLPredictSpec() { + RexTableArgCall tableCall = extractOperand(operand -> operand instanceof RexTableArgCall); + RexCall descriptorCall = + extractOperand( + operand -> + operand instanceof RexCall + && ((RexCall) operand).getOperator() + instanceof SqlDescriptorOperator); + Map<String, Integer> column2Index = new HashMap<>(); + List<String> fieldNames = tableCall.getType().getFieldNames(); + for (int i = 0; i < fieldNames.size(); i++) { + column2Index.put(fieldNames.get(i), i); + } + List<LookupJoinUtil.LookupKey> features = + descriptorCall.getOperands().stream() + .map( + operand -> { + if (operand instanceof RexLiteral) { + RexLiteral literal = (RexLiteral) operand; + String fieldName = RexLiteral.stringValue(literal); + Integer index = column2Index.get(fieldName); + if (index == null) { + throw new TableException( + String.format( + "Field %s is not found in input schema: %s.", + fieldName, tableCall.getType())); + } + return new LookupJoinUtil.FieldRefLookupKey(index); + } else { + throw new TableException( + String.format( + "Unknown operand for descriptor operator: %s.", + operand)); + } + }) + .collect(Collectors.toList()); + return new MLPredictSpec(features, Collections.emptyMap()); + } + + private ModelSpec buildModelSpec(RexModelCall modelCall) { + ModelSpec modelSpec = new ModelSpec(modelCall.getContextResolvedModel()); + modelSpec.setModelProvider(modelCall.getModelProvider()); + return modelSpec; + } + + private LookupJoinUtil.AsyncLookupOptions buildAsyncOptions(RexModelCall modelCall) { + boolean isAsyncEnabled = isAsyncMLPredict(modelCall.getModelProvider()); Review Comment: Yes. I have opened a ticket for this. 
If we finish all of these features in this PR, it will add a lot of code. https://issues.apache.org/jira/browse/FLINK-37903 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org