twalthr commented on a change in pull request #15280:
URL: https://github.com/apache/flink/pull/15280#discussion_r605167449



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
##########
@@ -248,11 +252,11 @@ private void validateTimeColumn(String columnName, 
List<Column> columns) {
     }
 
     private void validateWatermarkExpression(LogicalType watermarkType) {
-        if (!hasRoot(watermarkType, 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+        if (!hasFamily(watermarkType, LogicalTypeFamily.TIMESTAMP)
                 || getPrecision(watermarkType) != 3) {
             throw new ValidationException(
                     "Invalid data type of expression for watermark definition. 
"
-                            + "The field must be of type TIMESTAMP(3) WITHOUT 
TIME ZONE.");
+                            + "The field must be of type TIMESTAMP(3) WITHOUT 
TIME ZONE or TIMESTAMP_LTZ(3) WITHOUT TIME ZONE.");

Review comment:
       Remove the last `WITHOUT TIME ZONE`: `TIMESTAMP_LTZ(3)` is a timestamp 
with local time zone, so the suffix is contradictory.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
##########
@@ -270,13 +274,29 @@ private Column adjustRowtimeAttribute(List<WatermarkSpec> 
watermarkSpecs, Column
         final boolean hasWatermarkSpec =
                 watermarkSpecs.stream().anyMatch(s -> 
s.getRowtimeAttribute().equals(name));
         if (hasWatermarkSpec && isStreamingMode) {
-            final TimestampType originalType = (TimestampType) 
dataType.getLogicalType();
-            final LogicalType rowtimeType =
-                    new TimestampType(
-                            originalType.isNullable(),
-                            TimestampKind.ROWTIME,
-                            originalType.getPrecision());
-            return column.copy(replaceLogicalType(dataType, rowtimeType));
+            switch (dataType.getLogicalType().getTypeRoot()) {
+                case TIMESTAMP_WITHOUT_TIME_ZONE:
+                    final TimestampType originalType = (TimestampType) 
dataType.getLogicalType();
+                    final LogicalType rowtimeType =
+                            new TimestampType(
+                                    originalType.isNullable(),
+                                    TimestampKind.ROWTIME,
+                                    originalType.getPrecision());
+                    return column.copy(replaceLogicalType(dataType, 
rowtimeType));
+                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                    final LocalZonedTimestampType timestampLtzType =
+                            (LocalZonedTimestampType) 
dataType.getLogicalType();
+                    final LogicalType rowtimeLtzType =
+                            new LocalZonedTimestampType(
+                                    timestampLtzType.isNullable(),
+                                    TimestampKind.ROWTIME,
+                                    timestampLtzType.getPrecision());
+                    return column.copy(replaceLogicalType(dataType, 
rowtimeLtzType));
+                default:
+                    throw new ValidationException(
+                            "Invalid data type of expression for rowtime 
definition. "
+                                    + "The field must be of type TIMESTAMP(3) 
WITHOUT TIME ZONE or TIMESTAMP_LTZ(3) WITHOUT TIME ZONE.");

Review comment:
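       Remove the last `WITHOUT TIME ZONE` here as well: `TIMESTAMP_LTZ(3)` is 
a timestamp with local time zone, so the suffix is contradictory.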

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
##########
@@ -248,11 +252,11 @@ private void validateTimeColumn(String columnName, 
List<Column> columns) {
     }
 
     private void validateWatermarkExpression(LogicalType watermarkType) {
-        if (!hasRoot(watermarkType, 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+        if (!hasFamily(watermarkType, LogicalTypeFamily.TIMESTAMP)

Review comment:
       With this you also allow timestamps with time zone. We should be more 
precise here and accept only `TIMESTAMP` and `TIMESTAMP_LTZ`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to