snuyanzin commented on code in PR #26250:
URL: https://github.com/apache/flink/pull/26250#discussion_r1980914361


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -275,6 +294,149 @@ private List<Field> deriveFunctionOutputFields(DataType 
functionDataType) {
                     .mapToObj(pos -> DataTypes.FIELD(fieldNames.get(pos), 
fieldTypes.get(pos)))
                     .collect(Collectors.toList());
         }
+
+        private List<Field> deriveRowtimeField(CallContext callContext) {
+            if (this.functionKind != FunctionKind.PROCESS_TABLE) {
+                return List.of();
+            }
+            final List<DataType> args = callContext.getArgumentDataTypes();
+
+            // Check if on_time is defined and non-empty
+            final int onTimePos = args.size() - 2;
+            final Set<String> onTimeFields =
+                    callContext
+                            .getArgumentValue(onTimePos, ColumnList.class)
+                            .map(ColumnList::getNames)
+                            .map(Set::copyOf)
+                            .orElse(Set.of());
+
+            final Set<String> usedOnTimeFields = new HashSet<>();
+
+            final List<LogicalType> onTimeColumns =
+                    IntStream.range(0, staticArgs.size())
+                            .mapToObj(
+                                    pos -> {
+                                        final StaticArgument staticArg = 
staticArgs.get(pos);
+                                        if 
(!staticArg.is(StaticArgumentTrait.TABLE)) {
+                                            return null;
+                                        }
+                                        final RowType rowType =
+                                                (RowType) 
args.get(pos).getLogicalType();
+                                        final int onTimeColumn =
+                                                findUniqueOnTimeColumn(
+                                                        staticArg.getName(), 
rowType, onTimeFields);
+                                        if (onTimeColumn >= 0) {
+                                            usedOnTimeFields.add(
+                                                    
rowType.getFieldNames().get(onTimeColumn));
+                                            return 
rowType.getTypeAt(onTimeColumn);
+                                        }
+                                        if 
(staticArg.is(StaticArgumentTrait.REQUIRE_ON_TIME)) {
+                                            throw new ValidationException(
+                                                    String.format(
+                                                            "Table argument 
'%s' requires a time attribute. "
+                                                                    + "Please 
provide one using the implicit `on_time` argument. "
+                                                                    + "For 
example: myFunction(..., on_time => DESCRIPTOR(`my_timestamp`)",
+                                                            
staticArg.getName()));
+                                        }
+                                        return null;
+                                    })
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList());
+
+            final Set<String> unusedOnTimeFields = new HashSet<>(onTimeFields);
+            unusedOnTimeFields.removeAll(usedOnTimeFields);
+            if (!unusedOnTimeFields.isEmpty()) {
+                throw new ValidationException(
+                        "Invalid time attribute declaration. "
+                                + "Each column in the `on_time` argument must 
reference at least one "
+                                + "column in one of the table arguments. 
Unknown references: "
+                                + unusedOnTimeFields);
+            }
+
+            if (onTimeColumns.isEmpty()) {
+                return List.of();
+            }
+
+            // Don't allow mixtures of time attribute roots
+            if 
(onTimeColumns.stream().map(LogicalType::getTypeRoot).distinct().count() > 1) {
+                throw new ValidationException(
+                        "Invalid time attribute declaration. "
+                                + "All columns in the `on_time` argument must 
reference the same data type kind.");

Review Comment:
   Should we be more verbose here and include the encountered types in the exception message?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/SystemTypeInference.java:
##########
@@ -275,6 +294,149 @@ private List<Field> deriveFunctionOutputFields(DataType 
functionDataType) {
                     .mapToObj(pos -> DataTypes.FIELD(fieldNames.get(pos), 
fieldTypes.get(pos)))
                     .collect(Collectors.toList());
         }
+
+        private List<Field> deriveRowtimeField(CallContext callContext) {
+            if (this.functionKind != FunctionKind.PROCESS_TABLE) {
+                return List.of();
+            }
+            final List<DataType> args = callContext.getArgumentDataTypes();
+
+            // Check if on_time is defined and non-empty
+            final int onTimePos = args.size() - 2;
+            final Set<String> onTimeFields =
+                    callContext
+                            .getArgumentValue(onTimePos, ColumnList.class)
+                            .map(ColumnList::getNames)
+                            .map(Set::copyOf)
+                            .orElse(Set.of());
+
+            final Set<String> usedOnTimeFields = new HashSet<>();
+
+            final List<LogicalType> onTimeColumns =
+                    IntStream.range(0, staticArgs.size())
+                            .mapToObj(
+                                    pos -> {
+                                        final StaticArgument staticArg = 
staticArgs.get(pos);
+                                        if 
(!staticArg.is(StaticArgumentTrait.TABLE)) {
+                                            return null;
+                                        }
+                                        final RowType rowType =
+                                                (RowType) 
args.get(pos).getLogicalType();
+                                        final int onTimeColumn =
+                                                findUniqueOnTimeColumn(
+                                                        staticArg.getName(), 
rowType, onTimeFields);
+                                        if (onTimeColumn >= 0) {
+                                            usedOnTimeFields.add(
+                                                    
rowType.getFieldNames().get(onTimeColumn));
+                                            return 
rowType.getTypeAt(onTimeColumn);
+                                        }
+                                        if 
(staticArg.is(StaticArgumentTrait.REQUIRE_ON_TIME)) {
+                                            throw new ValidationException(
+                                                    String.format(
+                                                            "Table argument 
'%s' requires a time attribute. "
+                                                                    + "Please 
provide one using the implicit `on_time` argument. "
+                                                                    + "For 
example: myFunction(..., on_time => DESCRIPTOR(`my_timestamp`)",
+                                                            
staticArg.getName()));
+                                        }
+                                        return null;
+                                    })
+                            .filter(Objects::nonNull)
+                            .collect(Collectors.toList());
+
+            final Set<String> unusedOnTimeFields = new HashSet<>(onTimeFields);
+            unusedOnTimeFields.removeAll(usedOnTimeFields);
+            if (!unusedOnTimeFields.isEmpty()) {
+                throw new ValidationException(
+                        "Invalid time attribute declaration. "
+                                + "Each column in the `on_time` argument must 
reference at least one "
+                                + "column in one of the table arguments. 
Unknown references: "
+                                + unusedOnTimeFields);
+            }
+
+            if (onTimeColumns.isEmpty()) {
+                return List.of();
+            }
+
+            // Don't allow mixtures of time attribute roots
+            if 
(onTimeColumns.stream().map(LogicalType::getTypeRoot).distinct().count() > 1) {
+                throw new ValidationException(
+                        "Invalid time attribute declaration. "
+                                + "All columns in the `on_time` argument must 
reference the same data type kind.");

Review Comment:
   I wonder whether we should be more verbose here and include the encountered
types in the exception message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to