wuchong commented on a change in pull request #15485: URL: https://github.com/apache/flink/pull/15485#discussion_r608401319
########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java ########## @@ -122,24 +126,13 @@ public static boolean isProctimeAttribute(LogicalType logicalType) { } public static boolean canBeTimeAttributeType(LogicalType logicalType) { - if (isProctimeAttribute(logicalType) - && logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { - return true; - } - if (isRowtimeAttribute(logicalType) - && (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE - || logicalType.getTypeRoot() - == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) { + if (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE + || logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) { Review comment: Can be simplify to ```java LogicalTypeRoot typeRoot = logicalType.getTypeRoot(); return typeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE || typeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE; ``` ########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java ########## @@ -197,11 +198,23 @@ private void validateDuplicateColumns(List<Schema.UnresolvedColumn> columns) { } validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType()); + if (!watermarkExpression + .getOutputDataType() + .getLogicalType() + .getTypeRoot() + .equals(validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) { Review comment: nit: can use `==` for enums. 
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/triggers/EventTimeTriggers.java ########## @@ -94,11 +95,13 @@ public void open(TriggerContext ctx) throws Exception { @Override public boolean onElement(Object element, long timestamp, W window) throws Exception { - if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { + if (toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()) + <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately return true; } else { - ctx.registerEventTimeTimer(window.maxTimestamp()); + ctx.registerEventTimeTimer( + toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone())); Review comment: We can have a private method to reduce the duplicate method call. ```java private long triggerTime(W window) { return toEpochMillsForTimer(window.maxTimestamp(), ctx.getShiftTimeZone()); } ``` ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala ########## @@ -329,33 +338,45 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val head = inputTypes.head.getFieldList.map(_.getType) - val isValid = inputTypes.forall { t => + inputTypes.forall { t => Review comment: We don't care the return value, can be `foreach` here. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.slicing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.InternalTimerService; + +import java.time.ZoneId; + +/** + * Interface for working with window time and timers which considers timezone for window splitting. + * + * @param <W> Type of the window namespace to which timers are scoped. + */ +@Internal +public interface WindowTimerService<W> { + + /** + * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift + * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC + * which means never shift when assigning windows. + */ + ZoneId getShiftTimeZone(); + + /** Returns the current processing time. */ + long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + long currentWatermark(); + + /** Returns the current {@link InternalTimerService}. */ + InternalTimerService<W> getInternalTimerService(); + + /** + * Registers a window timer to be fired when processing time passes the window. The window you + * pass here will be provided when the timer fires. + */ + void registerProcessingTimeWindowTimer(W window, long windowEnd); + + /** Deletes the timer for the given key and window. */ + void deleteProcessingTimeWindowTimer(W window, long windowEnd); Review comment: We can introduce this when needed. 
########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java ########## @@ -65,7 +66,9 @@ public static long toUtcTimestampMills(long epochMills, ZoneId shiftTimeZone) { * @return the epoch mills. */ public static long toEpochMillsForTimer(long utcTimestampMills, ZoneId shiftTimeZone) { - if (shiftTimeZone.equals(UTC_ZONE_ID)) { + // Long.MAX_VALUE is a flag of max watermark, directly return it + if (UTC_ZONE_ID.equals(shiftTimeZone.equals(shiftTimeZone)) Review comment: Should be `UTC_ZONE_ID.equals(shiftTimeZone)`?? ########## File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java ########## @@ -109,11 +109,15 @@ public static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily famil } public static boolean isTimeAttribute(LogicalType logicalType) { - return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) != TimestampKind.REGULAR; + return (hasRoot(logicalType, LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) Review comment: I thought about this again, this may allow TIMESTAMP + PROCTIME which is illegal. I think we can just call `return isRowtimeAttribute(logicalType) || isProctimeAttribute(logicalType);` instead. ########## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java ########## @@ -0,0 +1,74 @@ +package org.apache.flink.table.runtime.util; + +import org.junit.Test; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.TimeZone; + +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import static org.junit.Assert.assertEquals; + +/** Test for {@link org.apache.flink.table.runtime.util.TimeWindowUtil}. 
*/ +public class TimeWindowUtilTest { + + private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId(); + + @Test + public void testShiftedTimeZone() { + ZoneId zoneId = ZoneId.of("Asia/Shanghai"); + assertEquals(-28799000L, toEpochMillsForTimer(utcMills("1970-01-01T00:00:01"), zoneId)); + assertEquals(-1L, toEpochMillsForTimer(utcMills("1970-01-01T07:59:59.999"), zoneId)); + assertEquals(1000L, toEpochMillsForTimer(utcMills("1970-01-01T08:00:01"), zoneId)); + assertEquals(1L, toEpochMillsForTimer(utcMills("1970-01-01T08:00:00.001"), zoneId)); + } + + @Test + public void testDaylightSaving() { + ZoneId zoneId = ZoneId.of("America/Los_Angeles"); + /* + * The DaylightTime in Los_Angele start at time 2021-03-14 02:00:00 + * <pre> + * 2021-03-14 00:00:00 -> epoch1 = 1615708800000L; + * 2021-03-14 01:00:00 -> epoch2 = 1615712400000L; + * 2021-03-14 03:00:00 -> epoch3 = 1615716000000L; skip one hour (2021-03-14 02:00:00) + * 2021-03-14 04:00:00 -> epoch4 = 1615719600000L; + */ + assertEquals(1615708800000L, toEpochMillsForTimer(utcMills("2021-03-14T00:00:00"), zoneId)); + assertEquals(1615712400000L, toEpochMillsForTimer(utcMills("2021-03-14T01:00:00"), zoneId)); + assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T02:00:00"), zoneId)); + assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T03:00:00"), zoneId)); + assertEquals(1615717800000L, toEpochMillsForTimer(utcMills("2021-03-14T02:30:00"), zoneId)); + assertEquals(1615719599000L, toEpochMillsForTimer(utcMills("2021-03-14T02:59:59"), zoneId)); Review comment: These two looks not correct. IIUC, all timers of `2021-03-14T02:00:00` until `2021-03-14T02:59:59` should be `1615716000000L`, because at `2021-03-14T03:00:00` (1615716000000L), we have passed `02:59:59`, we don't need to wait `03:59:59`. 
########## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java ########## @@ -0,0 +1,74 @@ +package org.apache.flink.table.runtime.util; + +import org.junit.Test; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.TimeZone; + +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import static org.junit.Assert.assertEquals; + +/** Test for {@link org.apache.flink.table.runtime.util.TimeWindowUtil}. */ +public class TimeWindowUtilTest { + + private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId(); + + @Test + public void testShiftedTimeZone() { + ZoneId zoneId = ZoneId.of("Asia/Shanghai"); + assertEquals(-28799000L, toEpochMillsForTimer(utcMills("1970-01-01T00:00:01"), zoneId)); + assertEquals(-1L, toEpochMillsForTimer(utcMills("1970-01-01T07:59:59.999"), zoneId)); + assertEquals(1000L, toEpochMillsForTimer(utcMills("1970-01-01T08:00:01"), zoneId)); + assertEquals(1L, toEpochMillsForTimer(utcMills("1970-01-01T08:00:00.001"), zoneId)); + } + + @Test + public void testDaylightSaving() { + ZoneId zoneId = ZoneId.of("America/Los_Angeles"); + /* + * The DaylightTime in Los_Angele start at time 2021-03-14 02:00:00 + * <pre> + * 2021-03-14 00:00:00 -> epoch1 = 1615708800000L; + * 2021-03-14 01:00:00 -> epoch2 = 1615712400000L; + * 2021-03-14 03:00:00 -> epoch3 = 1615716000000L; skip one hour (2021-03-14 02:00:00) + * 2021-03-14 04:00:00 -> epoch4 = 1615719600000L; + */ + assertEquals(1615708800000L, toEpochMillsForTimer(utcMills("2021-03-14T00:00:00"), zoneId)); + assertEquals(1615712400000L, toEpochMillsForTimer(utcMills("2021-03-14T01:00:00"), zoneId)); + assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T02:00:00"), zoneId)); + 
assertEquals(1615716000000L, toEpochMillsForTimer(utcMills("2021-03-14T03:00:00"), zoneId)); + assertEquals(1615717800000L, toEpochMillsForTimer(utcMills("2021-03-14T02:30:00"), zoneId)); + assertEquals(1615719599000L, toEpochMillsForTimer(utcMills("2021-03-14T02:59:59"), zoneId)); + assertEquals(1615717800000L, toEpochMillsForTimer(utcMills("2021-03-14T03:30:00"), zoneId)); + + /* + * The DaylightTime in Los_Angele start at time 2021-11-07 01:00:00 + * <pre> + * 2021-11-07 00:00:00 -> epoch0 = 1636268400000L; 2021-11-07 00:00:00 + * 2021-11-07 01:00:00 -> epoch1 = 1636272000000L; the first local timestamp 2021-11-07 01:00:00 + * 2021-11-07 01:00:00 -> epoch2 = 1636275600000L; back to local timestamp 2021-11-07 01:00:00 + * 2021-11-07 02:00:00 -> epoch3 = 1636279200000L; 2021-11-07 02:00:00 + */ + assertEquals(1636268400000L, toEpochMillsForTimer(utcMills("2021-11-07T00:00:00"), zoneId)); + assertEquals(1636275600000L, toEpochMillsForTimer(utcMills("2021-11-07T01:00:00"), zoneId)); + assertEquals(1636279200000L, toEpochMillsForTimer(utcMills("2021-11-07T02:00:00"), zoneId)); + assertEquals(1636268401000L, toEpochMillsForTimer(utcMills("2021-11-07T00:00:01"), zoneId)); + assertEquals(1636279199000L, toEpochMillsForTimer(utcMills("2021-11-07T01:59:59"), zoneId)); + assertEquals(1636279201000L, toEpochMillsForTimer(utcMills("2021-11-07T02:00:01"), zoneId)); + } + + @Test + public void testMaxWaterMark() { Review comment: nit: `testMaxWatermark` ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/LocalSlicingWindowAggOperator.java ########## @@ -125,12 +140,14 @@ public void processElement(StreamRecord<RowData> element) throws Exception { @Override public void processWatermark(Watermark mark) throws Exception { - if (mark.getTimestamp() > currentWatermark) { - currentWatermark = mark.getTimestamp(); - if (currentWatermark >= nextTriggerWatermark) { - // we only need to call 
advanceProgress() when currentWatermark may trigger window - windowBuffer.advanceProgress(currentWatermark); - nextTriggerWatermark = getNextTriggerWatermark(currentWatermark, windowInterval); + long timestamp = toUtcTimestampMills(mark.getTimestamp(), shiftTimezone); + if (timestamp > currentShiftWatermark) { + currentShiftWatermark = timestamp; + if (currentShiftWatermark >= nextTriggerWatermark) { Review comment: It would be better not to shift the watermark, but to shift only the timers. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java ########## @@ -103,7 +93,7 @@ public void open(Context<Long> context) throws Exception { new ValueStateDescriptor<>("window-aggs", accSerializer)); this.windowState = new WindowValueState<>((InternalValueState<RowData, Long, RowData>) state); - this.clockService = ClockService.of(ctx.getTimerService()); + this.clockService = ClockService.of(ctx.getTimerService().getInternalTimerService()); Review comment: We can construct the ClockService from `WindowTimerService`, so that we don't need to expose `getInternalTimerService`.
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala ########## @@ -329,33 +338,45 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val head = inputTypes.head.getFieldList.map(_.getType) - val isValid = inputTypes.forall { t => + inputTypes.forall { t => val fieldTypes = t.getFieldList.map(_.getType) fieldTypes.zip(head).forall { case (l, r) => - // check if time indicators match - if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { - val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime - val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime - leftTime == rightTime - } - // one side is not an indicator - else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) { - false - } - // uninteresting types - else { - true - } + validateUnionPair(l, r) + } + } + + setOp.copy(setOp.getTraitSet, inputs, setOp.all) + } + + private def validateUnionPair(l: RelDataType, r: RelDataType): Boolean = { + val exceptionMsg = + s"Union fields with time attributes requires same types, but the types are %s and %s." + // check if time indicators match + val isValid = if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { + val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime + val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime + if (leftTime && leftTime) { Review comment: ```suggestion if (leftTime && rightTime) { ``` ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/slicing/WindowTimerService.java ########## @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.slicing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.operators.InternalTimerService; + +import java.time.ZoneId; + +/** + * Interface for working with window time and timers which considers timezone for window splitting. + * + * @param <W> Type of the window namespace to which timers are scoped. + */ +@Internal +public interface WindowTimerService<W> { + + /** + * The shift timezone of the window, if the proctime or rowtime type is TIMESTAMP_LTZ, the shift + * timezone is the timezone user configured in TableConfig, other cases the timezone is UTC + * which means never shift when assigning windows. + */ + ZoneId getShiftTimeZone(); + + /** Returns the current processing time. */ + long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + long currentWatermark(); + + /** Returns the current {@link InternalTimerService}. */ + InternalTimerService<W> getInternalTimerService(); + + /** + * Registers a window timer to be fired when processing time passes the window. The window you + * pass here will be provided when the timer fires. + */ + void registerProcessingTimeWindowTimer(W window, long windowEnd); Review comment: We don't need `windowEnd` parameter, it should be inferred from `window` if the implementation knows the generic type `W`. 
########## File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/TimeWindowUtilTest.java ########## @@ -0,0 +1,74 @@ +package org.apache.flink.table.runtime.util; + +import org.junit.Test; + +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.TimeZone; + +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMills; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toUtcTimestampMills; +import static org.junit.Assert.assertEquals; + +/** Test for {@link org.apache.flink.table.runtime.util.TimeWindowUtil}. */ +public class TimeWindowUtilTest { Review comment: Please add tests for `toEpochMills`. ########## File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java ########## @@ -146,8 +135,9 @@ public boolean processElement(RowData key, RowData element) throws Exception { @Override public void advanceProgress(long progress) throws Exception { - if (progress > currentProgress) { - currentProgress = progress; + long timestamp = toUtcTimestampMills(progress, timerService.getShiftTimeZone()); Review comment: Would be better not shift watermark/processing-time, but shift timers only. 
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala ########## @@ -329,33 +338,45 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val head = inputTypes.head.getFieldList.map(_.getType) - val isValid = inputTypes.forall { t => + inputTypes.forall { t => val fieldTypes = t.getFieldList.map(_.getType) fieldTypes.zip(head).forall { case (l, r) => - // check if time indicators match - if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { - val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime - val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime - leftTime == rightTime - } - // one side is not an indicator - else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) { - false - } - // uninteresting types - else { - true - } + validateUnionPair(l, r) + } + } + + setOp.copy(setOp.getTraitSet, inputs, setOp.all) + } + + private def validateUnionPair(l: RelDataType, r: RelDataType): Boolean = { + val exceptionMsg = + s"Union fields with time attributes requires same types, but the types are %s and %s." + // check if time indicators match + val isValid = if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { + val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime + val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime Review comment: It would be better to name it `leftIsEventTime`.
########## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java ########## @@ -197,11 +198,23 @@ private void validateDuplicateColumns(List<Schema.UnresolvedColumn> columns) { } validateWatermarkExpression(watermarkExpression.getOutputDataType().getLogicalType()); + if (!watermarkExpression + .getOutputDataType() + .getLogicalType() + .getTypeRoot() + .equals(validatedTimeColumn.getDataType().getLogicalType().getTypeRoot())) { + throw new ValidationException( + String.format( + "The watermark output type %s is different with input time filed type %s.", Review comment: ```suggestion "The watermark output type %s is different from input time filed type %s.", ``` ########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala ########## @@ -329,33 +338,45 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val head = inputTypes.head.getFieldList.map(_.getType) - val isValid = inputTypes.forall { t => + inputTypes.forall { t => val fieldTypes = t.getFieldList.map(_.getType) fieldTypes.zip(head).forall { case (l, r) => Review comment: ditto. 
########## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala ########## @@ -329,33 +338,45 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle { val head = inputTypes.head.getFieldList.map(_.getType) - val isValid = inputTypes.forall { t => + inputTypes.forall { t => val fieldTypes = t.getFieldList.map(_.getType) fieldTypes.zip(head).forall { case (l, r) => - // check if time indicators match - if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { - val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime - val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime - leftTime == rightTime - } - // one side is not an indicator - else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) { - false - } - // uninteresting types - else { - true - } + validateUnionPair(l, r) + } + } + + setOp.copy(setOp.getTraitSet, inputs, setOp.all) + } + + private def validateUnionPair(l: RelDataType, r: RelDataType): Boolean = { + val exceptionMsg = + s"Union fields with time attributes requires same types, but the types are %s and %s." 
+ // check if time indicators match + val isValid = if (isTimeIndicatorType(l) && isTimeIndicatorType(r)) { + val leftTime = l.asInstanceOf[TimeIndicatorRelDataType].isEventTime + val rightTime = r.asInstanceOf[TimeIndicatorRelDataType].isEventTime + if (leftTime && leftTime) { + //rowtime must have same type + isTimestampLtzIndicatorType(l) == isTimestampLtzIndicatorType(r) + } else { + leftTime == rightTime } } + // one side is not an indicator + else if (isTimeIndicatorType(l) || isTimeIndicatorType(r)) { + false + } + // uninteresting types + else { + true + } if (!isValid) { throw new ValidationException( - "Union fields with time attributes have different types.") + String.format(exceptionMsg, l.toString, r.toString)) } - - setOp.copy(setOp.getTraitSet, inputs, setOp.all) + isValid Review comment: We have already thrown an exception above, so there is no need to return `isValid`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org