snuyanzin commented on code in PR #26250: URL: https://github.com/apache/flink/pull/26250#discussion_r1980914361
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java: ########## @@ -275,6 +294,149 @@ private List<Field> deriveFunctionOutputFields(DataType functionDataType) { .mapToObj(pos -> DataTypes.FIELD(fieldNames.get(pos), fieldTypes.get(pos))) .collect(Collectors.toList()); } + + private List<Field> deriveRowtimeField(CallContext callContext) { + if (this.functionKind != FunctionKind.PROCESS_TABLE) { + return List.of(); + } + final List<DataType> args = callContext.getArgumentDataTypes(); + + // Check if on_time is defined and non-empty + final int onTimePos = args.size() - 2; + final Set<String> onTimeFields = + callContext + .getArgumentValue(onTimePos, ColumnList.class) + .map(ColumnList::getNames) + .map(Set::copyOf) + .orElse(Set.of()); + + final Set<String> usedOnTimeFields = new HashSet<>(); + + final List<LogicalType> onTimeColumns = + IntStream.range(0, staticArgs.size()) + .mapToObj( + pos -> { + final StaticArgument staticArg = staticArgs.get(pos); + if (!staticArg.is(StaticArgumentTrait.TABLE)) { + return null; + } + final RowType rowType = + (RowType) args.get(pos).getLogicalType(); + final int onTimeColumn = + findUniqueOnTimeColumn( + staticArg.getName(), rowType, onTimeFields); + if (onTimeColumn >= 0) { + usedOnTimeFields.add( + rowType.getFieldNames().get(onTimeColumn)); + return rowType.getTypeAt(onTimeColumn); + } + if (staticArg.is(StaticArgumentTrait.REQUIRE_ON_TIME)) { + throw new ValidationException( + String.format( + "Table argument '%s' requires a time attribute. " + + "Please provide one using the implicit `on_time` argument. 
" + + "For example: myFunction(..., on_time => DESCRIPTOR(`my_timestamp`)", + staticArg.getName())); + } + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + final Set<String> unusedOnTimeFields = new HashSet<>(onTimeFields); + unusedOnTimeFields.removeAll(usedOnTimeFields); + if (!unusedOnTimeFields.isEmpty()) { + throw new ValidationException( + "Invalid time attribute declaration. " + + "Each column in the `on_time` argument must reference at least one " + + "column in one of the table arguments. Unknown references: " + + unusedOnTimeFields); + } + + if (onTimeColumns.isEmpty()) { + return List.of(); + } + + // Don't allow mixtures of time attribute roots + if (onTimeColumns.stream().map(LogicalType::getTypeRoot).distinct().count() > 1) { + throw new ValidationException( + "Invalid time attribute declaration. " + + "All columns in the `on_time` argument must reference the same data type kind."); Review Comment: Should we be more verbose here and include the encountered types in the exception message? 
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java: ########## @@ -275,6 +294,149 @@ private List<Field> deriveFunctionOutputFields(DataType functionDataType) { .mapToObj(pos -> DataTypes.FIELD(fieldNames.get(pos), fieldTypes.get(pos))) .collect(Collectors.toList()); } + + private List<Field> deriveRowtimeField(CallContext callContext) { + if (this.functionKind != FunctionKind.PROCESS_TABLE) { + return List.of(); + } + final List<DataType> args = callContext.getArgumentDataTypes(); + + // Check if on_time is defined and non-empty + final int onTimePos = args.size() - 2; + final Set<String> onTimeFields = + callContext + .getArgumentValue(onTimePos, ColumnList.class) + .map(ColumnList::getNames) + .map(Set::copyOf) + .orElse(Set.of()); + + final Set<String> usedOnTimeFields = new HashSet<>(); + + final List<LogicalType> onTimeColumns = + IntStream.range(0, staticArgs.size()) + .mapToObj( + pos -> { + final StaticArgument staticArg = staticArgs.get(pos); + if (!staticArg.is(StaticArgumentTrait.TABLE)) { + return null; + } + final RowType rowType = + (RowType) args.get(pos).getLogicalType(); + final int onTimeColumn = + findUniqueOnTimeColumn( + staticArg.getName(), rowType, onTimeFields); + if (onTimeColumn >= 0) { + usedOnTimeFields.add( + rowType.getFieldNames().get(onTimeColumn)); + return rowType.getTypeAt(onTimeColumn); + } + if (staticArg.is(StaticArgumentTrait.REQUIRE_ON_TIME)) { + throw new ValidationException( + String.format( + "Table argument '%s' requires a time attribute. " + + "Please provide one using the implicit `on_time` argument. 
" + + "For example: myFunction(..., on_time => DESCRIPTOR(`my_timestamp`)", + staticArg.getName())); + } + return null; + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + + final Set<String> unusedOnTimeFields = new HashSet<>(onTimeFields); + unusedOnTimeFields.removeAll(usedOnTimeFields); + if (!unusedOnTimeFields.isEmpty()) { + throw new ValidationException( + "Invalid time attribute declaration. " + + "Each column in the `on_time` argument must reference at least one " + + "column in one of the table arguments. Unknown references: " + + unusedOnTimeFields); + } + + if (onTimeColumns.isEmpty()) { + return List.of(); + } + + // Don't allow mixtures of time attribute roots + if (onTimeColumns.stream().map(LogicalType::getTypeRoot).distinct().count() > 1) { + throw new ValidationException( + "Invalid time attribute declaration. " + + "All columns in the `on_time` argument must reference the same data type kind."); Review Comment: I wonder whether we should be more verbose here and include the encountered types in the exception message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org