xuyangzhong commented on code in PR #26430:
URL: https://github.com/apache/flink/pull/26430#discussion_r2081141084


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java:
##########
@@ -429,4 +461,30 @@ private File createSourceSinkTables() throws IOException {
     private String getPreparedToCompareCompiledPlan(final String planAsString) {
         return TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(planAsString));
     }
+
+    @Test
+    void testWatermarkPushDownWithTimeStampChanged() throws Exception {

Review Comment:
   nit: `Timestamp` rather than `TimeStamp` in the test name.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java:
##########
@@ -429,4 +461,30 @@ private File createSourceSinkTables() throws IOException {
     private String getPreparedToCompareCompiledPlan(final String planAsString) {
         return TableTestUtil.replaceExecNodeId(TableTestUtil.replaceFlinkVersion(planAsString));
     }
+
+    @Test
+    void testWatermarkPushDownWithTimeStampChanged() throws Exception {

Review Comment:
   How about moving this test to `ScanReuseTest`? It is actually unrelated to
   `CompiledPlan`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java:
##########
@@ -203,7 +204,13 @@ private static WatermarkPushDownSpec adjustWatermarkIndex(
                 String name = newFieldNames.get(i);
                 LogicalType type = newSourceType.getTypeAt(i);
                 if (name.equals(rowtimeColumn)) {
-                    type = new TimestampType(type.isNullable(), TimestampKind.ROWTIME, 3);
+                    if (type instanceof LocalZonedTimestampType) {
+                        type =
+                                new LocalZonedTimestampType(
+                                        type.isNullable(), TimestampKind.ROWTIME, 3);

Review Comment:
   I wonder what happens if the precision is not actually 3, since it is
   hardcoded here.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/ScanReuserUtils.java:
##########
@@ -203,7 +204,13 @@ private static WatermarkPushDownSpec adjustWatermarkIndex(
                 String name = newFieldNames.get(i);
                 LogicalType type = newSourceType.getTypeAt(i);
                 if (name.equals(rowtimeColumn)) {
-                    type = new TimestampType(type.isNullable(), TimestampKind.ROWTIME, 3);
+                    if (type instanceof LocalZonedTimestampType) {

Review Comment:
   I believe that when we encounter a type other than `LocalZonedTimestampType`
   or `TimestampType` here, it would be better to raise an error proactively
   than to silently convert it to `TimestampType`. Currently the parser simply
   does not support `timestamp(3) with time zone`. (See more at
   https://github.com/apache/flink/blob/d61575e80f9050840be2cfede3d8c7ba09c74a0d/flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj#L5979)
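
   To illustrate the suggestion, here is a minimal, self-contained sketch of
   the fail-fast branching. The classes below are simplified stand-ins written
   for this example, not Flink's real `LogicalType` hierarchy. The helper name
   `toRowtime` and the choice of `IllegalStateException` are also assumptions;
   the actual change would probably use whichever exception the planner
   normally throws.

```java
// Stand-in types for illustration only; these are NOT Flink's real classes.
final class RowtimeGuardSketch {
    enum Kind { REGULAR, ROWTIME }

    abstract static class LogicalType {
        private final boolean nullable;

        LogicalType(boolean nullable) {
            this.nullable = nullable;
        }

        boolean isNullable() {
            return nullable;
        }
    }

    static final class TimestampType extends LogicalType {
        final Kind kind;
        final int precision;

        TimestampType(boolean nullable, Kind kind, int precision) {
            super(nullable);
            this.kind = kind;
            this.precision = precision;
        }
    }

    static final class LocalZonedTimestampType extends LogicalType {
        final Kind kind;
        final int precision;

        LocalZonedTimestampType(boolean nullable, Kind kind, int precision) {
            super(nullable);
            this.kind = kind;
            this.precision = precision;
        }
    }

    // An arbitrary non-timestamp type, used to exercise the error path.
    static final class VarCharType extends LogicalType {
        VarCharType(boolean nullable) {
            super(nullable);
        }
    }

    /** Marks a rowtime column's type as ROWTIME and rejects unexpected types. */
    static LogicalType toRowtime(LogicalType type) {
        if (type instanceof LocalZonedTimestampType) {
            return new LocalZonedTimestampType(type.isNullable(), Kind.ROWTIME, 3);
        }
        if (type instanceof TimestampType) {
            return new TimestampType(type.isNullable(), Kind.ROWTIME, 3);
        }
        // Fail fast instead of silently falling back to TimestampType.
        throw new IllegalStateException(
                "Unsupported rowtime column type: " + type.getClass().getSimpleName());
    }
}
```

   With this shape, an unexpected column type surfaces as an explicit error
   when the watermark spec is adjusted, rather than as a mismatched type later
   on.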
 



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/CompiledPlanITCase.java:
##########
@@ -55,6 +56,18 @@ class CompiledPlanITCase extends JsonPlanTestBase {
     private static final String[] COLUMNS_DEFINITION =
             new String[] {"a bigint", "b int", "c varchar"};
 
+    private static final String[] COLUMNS_DEFINITION_WITH_TIMESTAMP =
+            new String[] {"a bigint", "ts timestamp_ltz(3)"};
+    private static final List<String> DATA_WITH_TIMESTAMP =

Review Comment:
   Remove this unused variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

