raminqaf commented on code in PR #28025:
URL: https://github.com/apache/flink/pull/28025#discussion_r3196026243


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -273,23 +274,54 @@ private static Optional<List<DataType>> validateErrorHandling(
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
+    /**
+     * Resolves the op column name from the {@code op} descriptor argument, falling back to {@link
+     * #DEFAULT_OP_COLUMN_NAME} when the argument is omitted or empty.
+     */
+    public static String resolveOpColumnName(final CallContext callContext) {
         return callContext
                 .getArgumentValue(ARG_OP, ColumnList.class)
                 .filter(cl -> !cl.getNames().isEmpty())
                 .map(cl -> cl.getNames().get(0))
                 .orElse(DEFAULT_OP_COLUMN_NAME);
     }
 
-    private static List<Field> buildOutputFields(
+    /**
+     * Computes the indices of input columns that pass through to the function's output. Excludes
+     * the op column (becomes RowKind) and partition key columns (which the framework prepends

Review Comment:
   Done



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java:
##########
@@ -158,21 +159,21 @@ private static Optional<List<DataType>> validateOpColumn(
         final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get();
         final String opColumnName = resolveOpColumnName(callContext);
         final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
-        final Optional<Field> opField =
-                inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst();
-        if (opField.isEmpty()) {
+        final Integer opIndex = buildFieldNameToIndex(inputFields).get(opColumnName);
+        if (opIndex == null) {
             return callContext.fail(
                     throwOnFailure,
                     String.format(
                             "The op column '%s' does not exist in the input schema.",
                             opColumnName));
         }
-        if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
+        final Field opField = inputFields.get(opIndex);
+        if (!opField.getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to