rkhachatryan commented on code in PR #27345:
URL: https://github.com/apache/flink/pull/27345#discussion_r2674244139


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/join/interval/RowTimeIntervalJoinTest.java:
##########
@@ -413,6 +422,93 @@ void testRowTimeFullOuterJoin() throws Exception {
         testHarness.close();
     }
 
+    @Test
+    public void testInterruptibleTimers() throws Exception {
+        final boolean[] isMailProcessed = new boolean[1]; // e.g. checkpoint 
mail
+
+        int allowedLateness = 1;
+        int leftUpperBound = 1;
+        RowTimeIntervalJoin joinProcessFunc =
+                new RowTimeIntervalJoin(
+                        FlinkJoinType.FULL,
+                        -1,
+                        leftUpperBound,
+                        allowedLateness,
+                        0,
+                        rowType,
+                        rowType,
+                        joinFunction,
+                        0,
+                        0);
+
+        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, 
RowData> testHarness =
+                createTestHarness(joinProcessFunc);
+        testHarness
+                .getEnvironment()
+                .getJobConfiguration()
+                .set(ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, true)
+                .set(CHECKPOINTING_INTERVAL, Duration.ofMillis(10)) // just 
enable checkpointing
+                .set(ENABLE_UNALIGNED, true)
+                .set(CHECKPOINTING_CONSISTENCY_MODE, 
CheckpointingMode.EXACTLY_ONCE);
+
+        final int operatorMailPriority = 123;
+        testHarness
+                .getOperator()
+                .setMailboxExecutor(
+                        new MailboxExecutorImpl(
+                                testHarness.getTaskMailbox(),
+                                operatorMailPriority,
+                                StreamTaskActionExecutor.IMMEDIATE));
+
+        testHarness.open();
+
+        testHarness.processElement1(insertRecord(5L, "k1"));
+        testHarness.processElement2(insertRecord(6L, "k2"));
+        testHarness.processElement1(insertRecord(7L, "k3"));
+        testHarness.processElement2(insertRecord(8L, "k4"));
+
+        testHarness
+                .getTaskMailbox()
+                .put(new Mail(() -> isMailProcessed[0] = true, 
operatorMailPriority, "%s", "test"));
+
+        final int timersBeforeWatermark = testHarness.numEventTimeTimers();
+        assertThat(timersBeforeWatermark).isPositive();
+
+        final int endTime = 99; // should trigger emission of all elements
+        testHarness.processWatermark1(new Watermark(endTime));
+        testHarness.processWatermark2(new Watermark(endTime));
+
+        final int timersAfterWatermark = testHarness.numEventTimeTimers();
+        assertThat(timersAfterWatermark)
+                .as("On watermark, some timers should be processed, some 
should be postponed")
+                .isLessThan(timersBeforeWatermark)
+                .isNotZero();
+
+        testHarness.getTaskMailbox().take(operatorMailPriority).run();
+        assertThat(isMailProcessed[0]).as("The mail should be 
processed").isTrue();
+        assertThat(testHarness.numEventTimeTimers())
+                .as("The number of timers shouldn't change after the 1st mail")
+                .isEqualTo(timersAfterWatermark);
+
+        // process the remaining timers
+        for (Mail mail : testHarness.getTaskMailbox().drain()) {
+            mail.run();
+        }
+        assertThat(testHarness.numEventTimeTimers())
+                .as("Eventually, all timers should be processed")
+                .isZero();
+
+        final List<Object> expectedOutput = new ArrayList<>();
+        expectedOutput.add(insertRecord(5L, "k1", null, null));
+        expectedOutput.add(insertRecord(null, null, 6L, "k2"));
+        expectedOutput.add(insertRecord(7L, "k3", null, null));
+        expectedOutput.add(insertRecord(null, null, 8L, "k4"));
+        expectedOutput.add(new Watermark(endTime - allowedLateness - 
leftUpperBound));

Review Comment:
   This verifies the watermark (WM) delay.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java:
##########
@@ -160,17 +165,27 @@ public abstract class AbstractStreamOperator<OUT>
     protected transient RecordAttributes lastRecordAttributes1;
     protected transient RecordAttributes lastRecordAttributes2;
 
-    public AbstractStreamOperator() {}
+    public AbstractStreamOperator() {
+        this(null);
+    }
 
     public AbstractStreamOperator(StreamOperatorParameters<OUT> parameters) {
+        this(parameters, WatermarkConsumerSupplier.defaultSupplier());
+    }
+
+    public AbstractStreamOperator(
+            StreamOperatorParameters<OUT> parameters,
+            WatermarkConsumerSupplier<OUT> watermarkConsumerSupplier) {
         if (parameters != null) {
             setup(
                     parameters.getContainingTask(),
                     parameters.getStreamConfig(),
                     parameters.getOutput());
             this.processingTimeService =
                     
Preconditions.checkNotNull(parameters.getProcessingTimeService());
+            this.mailboxExecutor = parameters.getMailboxExecutor();

Review Comment:
   This is not necessary, but it makes `AbstractStreamOperator` more consistent 
with `AbstractStreamOperatorV2`.



##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java:
##########
@@ -170,6 +170,7 @@ public AbstractStreamOperator(StreamOperatorParameters<OUT> 
parameters) {
                     parameters.getOutput());
             this.processingTimeService =
                     
Preconditions.checkNotNull(parameters.getProcessingTimeService());
+            this.mailboxExecutor = parameters.getMailboxExecutor();

Review Comment:
   This is not necessary, but it makes `AbstractStreamOperator` more consistent 
with `AbstractStreamOperatorV2`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to