xuyangzhong commented on code in PR #26430:
URL: https://github.com/apache/flink/pull/26430#discussion_r2081141084

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java:
##########
@@ -429,4 +461,30 @@ private File createSourceSinkTables() throws IOException {
     private String getPreparedToCompareCompiledPlan(final String planAsString) {
         return TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(planAsString));
     }
+
+    @Test
+    void testWatermarkPushDownWithTimeStampChanged() throws Exception {

Review Comment:
   nit: `Timestamp`, not `TimeStamp`.

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java:
##########
@@ -429,4 +461,30 @@ private File createSourceSinkTables() throws IOException {
     private String getPreparedToCompareCompiledPlan(final String planAsString) {
         return TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(planAsString));
     }
+
+    @Test
+    void testWatermarkPushDownWithTimeStampChanged() throws Exception {

Review Comment:
   How about moving this test to `ScanReuseTest`, as it is actually unrelated to `CompilePlan`?

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java:
##########
@@ -203,7 +204,13 @@ private static WatermarkPushDownSpec adjustWatermarkIndex(
             String name = newFieldNames.get(i);
             LogicalType type = newSourceType.getTypeAt(i);
             if (name.equals(rowtimeColumn)) {
-                type = new TimestampType(type.isNullable(), TimestampKind.ROWTIME, 3);
+                if (type instanceof LocalZonedTimestampType) {
+                    type =
+                            new LocalZonedTimestampType(
+                                    type.isNullable(), TimestampKind.ROWTIME, 3);

Review Comment:
   I wonder what actually happens if the precision is not 3.

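To make the precision question concrete, here is a minimal, self-contained sketch of one possible alternative: carry over the precision of the incoming type instead of hard-coding 3. The classes below (`RowtimePrecisionSketch`, its nested `LogicalType`, `TimestampType`, `LocalZonedTimestampType`, `BigIntType`, and the helper `markAsRowtime`) are simplified, hypothetical stand-ins written for illustration, not Flink's real classes, and throwing on any other type is an assumption of the sketch rather than what the PR does. It does not settle whether a precision other than 3 can actually reach this code path.

```java
// Hypothetical, simplified stand-ins for Flink's logical types; NOT the real classes.
final class RowtimePrecisionSketch {

    enum TimestampKind { REGULAR, ROWTIME }

    abstract static class LogicalType {
        private final boolean nullable;

        LogicalType(boolean nullable) {
            this.nullable = nullable;
        }

        boolean isNullable() {
            return nullable;
        }
    }

    static final class TimestampType extends LogicalType {
        final TimestampKind kind;
        final int precision;

        TimestampType(boolean nullable, TimestampKind kind, int precision) {
            super(nullable);
            this.kind = kind;
            this.precision = precision;
        }
    }

    static final class LocalZonedTimestampType extends LogicalType {
        final TimestampKind kind;
        final int precision;

        LocalZonedTimestampType(boolean nullable, TimestampKind kind, int precision) {
            super(nullable);
            this.kind = kind;
            this.precision = precision;
        }
    }

    // A non-timestamp type, used only to exercise the failure branch.
    static final class BigIntType extends LogicalType {
        BigIntType(boolean nullable) {
            super(nullable);
        }
    }

    // Marks a timestamp type as ROWTIME while keeping its nullability and precision.
    // Assumption of this sketch: any other type is rejected instead of being converted.
    static LogicalType markAsRowtime(LogicalType type) {
        if (type instanceof LocalZonedTimestampType) {
            int precision = ((LocalZonedTimestampType) type).precision;
            return new LocalZonedTimestampType(
                    type.isNullable(), TimestampKind.ROWTIME, precision);
        }
        if (type instanceof TimestampType) {
            int precision = ((TimestampType) type).precision;
            return new TimestampType(type.isNullable(), TimestampKind.ROWTIME, precision);
        }
        throw new IllegalArgumentException(
                "Unsupported rowtime column type: " + type.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        LogicalType in = new LocalZonedTimestampType(true, TimestampKind.REGULAR, 6);
        LocalZonedTimestampType out = (LocalZonedTimestampType) markAsRowtime(in);
        System.out.println(out.kind + " precision=" + out.precision);
    }
}
```

With the hard-coded 3 from the diff, a precision-6 input would come back as precision 3; in the sketch the precision is preserved, which is only desirable if downstream code can handle it.
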
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java:
##########
@@ -203,7 +204,13 @@ private static WatermarkPushDownSpec adjustWatermarkIndex(
             String name = newFieldNames.get(i);
             LogicalType type = newSourceType.getTypeAt(i);
             if (name.equals(rowtimeColumn)) {
-                type = new TimestampType(type.isNullable(), TimestampKind.ROWTIME, 3);
+                if (type instanceof LocalZonedTimestampType) {

Review Comment:
   I believe that when we encounter a type other than `LocalZonedTimestampType` or `TimestampType`, it would be better to raise an error proactively rather than silently convert it to `TimestampType`. This is because we currently do not support timestamp(3) with time zone during parsing. (See more at https://github.com/apache/flink/blob/d61575e80f9050840be2cfede3d8c7ba09c74a0d/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj#L5979)

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java:
##########
@@ -55,6 +56,18 @@ class CompiledPlanITCase extends JsonPlanTestBase {
     private static final String[] COLUMNS_DEFINITION =
             new String[] {"a bigint", "b int", "c varchar"};
+    private static final String[] COLUMNS_DEFINITION_WITH_TIMESTAMP =
+            new String[] {"a bigint", "ts timestamp_ltz(3)"};
+    private static final List<String> DATA_WITH_TIMESTAMP =

Review Comment:
   Remove this unused variable.