autophagy commented on code in PR #28326:
URL: https://github.com/apache/flink/pull/28326#discussion_r3382164630
##########
flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/ProcessTableFunctionTestHarness.java:
##########
@@ -711,25 +1197,67 @@ public ProcessTableFunctionTestHarness<OUT> build() throws Exception {
// Extract table arguments for output type derivation
// SystemTypeInference needs table semantics for pass-through column deduplication
- List<TableArgumentInfo> tableArgs = ArgumentInfo.filterTableArguments(arguments);
+ List<TableArgumentInfo> tableArgInfos = ArgumentInfo.filterTableArguments(arguments);
// Derive output schema using SystemTypeInference
DataType derivedOutputType =
deriveOutputTypeFromSystemInference(
- function, dataTypeFactory, systemTypeInference, tableArgs);
+ function,
+ dataTypeFactory,
+ systemTypeInference,
+ arguments,
+ tableArgInfos);
// Create output converter for PTF emissions
DataStructureConverter<Object, Object> harnessOutputConverter =
createPTFOutputConverter(derivedOutputType);
+ // Validate onTimeColumn configuration
+ if (onTimeColumnName != null) {
+ boolean foundInAnyTable =
+ tableArgInfos.stream()
+ .anyMatch(
+ t -> getFieldNames(t.dataType).contains(onTimeColumnName));
Review Comment:
We get all this for free from `SystemTypeInference`!