[ https://issues.apache.org/jira/browse/FLINK-9201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
yuemeng updated FLINK-9201: --------------------------- Summary: same merge window will be fired twice if watermark already passed the merge window (was: same merge window will be fired twice if watermark already passed the new merged window) > same merge window will be fired twice if watermark already passed the merge > window > ---------------------------------------------------------------------------------- > > Key: FLINK-9201 > URL: https://issues.apache.org/jira/browse/FLINK-9201 > Project: Flink > Issue Type: Bug > Components: Core > Affects Versions: 1.3.3 > Reporter: yuemeng > Assignee: yuemeng > Priority: Blocker > > Sum with a session window. Suppose the session gap is 3 seconds and the allowed lateness is 60 seconds. > * w1, TimeWindow[1,9], has elements 1, 2, 3, 6 and will be fired when the watermark reaches 9. > * A late element (w2, TimeWindow[7,10]) arrives, but the watermark is already at 11. > * w1 and w2 are merged into a new window w3, TimeWindow[1,10]. The call to triggerContext.onMerge(mergedWindows) registers a new timer for w3. w3 is fired the first time by the call to triggerContext.onElement(element), because the watermark has already passed w3. w3 is then fired a second time, because its timer is < the current watermark. > That means w3 is fired twice because the watermark has already passed the new merged window w3.
> Examples > {code:java} > @Test > @SuppressWarnings("unchecked") > public void testSessionWindowsFiredTwice() throws Exception { > closeCalled.set(0); > final int sessionSize = 3; > TypeInformation<Tuple2<String, Integer>> inputType = > TypeInfoParser.parse("Tuple2<String, Integer>"); > ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new > ListStateDescriptor<>("window-contents", > inputType.createSerializer(new ExecutionConfig())); > WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, > Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new > WindowOperator<>( > EventTimeSessionWindows.withGap(Time.seconds(sessionSize)), > new TimeWindow.Serializer(), > new TupleKeySelector(), > BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), > stateDesc, > new InternalIterableWindowFunction<>(new SessionWindowFunction()), > EventTimeTrigger.create(), > 60000, > null /* late data output tag */); > OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, > Long, Long>> testHarness = > createTestHarness(operator); > ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); > testHarness.open(); > // add elements out-of-order > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 0)); > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), > 1000)); > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 3), > 2500)); > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 10)); > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 2), > 1000)); > testHarness.processWatermark(new Watermark(5500)); > expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-6", 0L, 5500L), > 5499)); > expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 4000L), > 3999)); > expectedOutput.add(new Watermark(5500)); > // do a snapshot, close and restore again > OperatorSubtaskState snapshot = 
testHarness.snapshot(0L, 0L); > TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", > expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); > testHarness.close(); > testHarness = createTestHarness(operator); > testHarness.setup(); > testHarness.initializeState(snapshot); > testHarness.open(); > expectedOutput.clear(); > //suppose the watermark has already reached 10000 > testHarness.processWatermark(new Watermark(10000)); > //a late element with timestamp 4500 arrives; the new session window [0, > 7500] is still a valid window because its maxTimestamp < cleanup time > //and it is fired immediately > testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 6), > 4500)); > expectedOutput.add(new Watermark(10000)); > expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), > 7499)); > //when a new watermark arrives, the same TimeWindow[0, 7500] is fired > again because a new timer was registered by the call to triggerContext.onMerge > testHarness.processWatermark(new Watermark(11000)); > expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-12", 0L, 7500L), > 7499)); > expectedOutput.add(new Watermark(11000)); > TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", > expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator()); > testHarness.close(); > } > {code} > > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)