[ https://issues.apache.org/jira/browse/FLINK-13446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16895928#comment-16895928 ]
Jark Wu commented on FLINK-13446:
---------------------------------

Hi [~twalthr],

currently, the row count sliding window in the blink planner behaves as follows. For example, with size=5 and slide=3, we get the following windows (say row_id starts from 1):
[1,2,3,4,5], [4,5,6,7,8], [7,8,9,10,11] ...

In the flink planner (or DataStream API), the assigned windows will be (say row_id starts from 1):
[1,2,3], [2,3,4,5,6], [5,6,7,8,9] ...

We have two options for this issue:
(1) keep the blink planner behavior and explain it in the docs.
(2) change the blink planner behavior to match the flink planner.

However, IMO, the flink planner behavior might confuse users as to why the first 5 elements are not in the same window. So I prefer option #1.

No matter which option we choose, considering that it is a corner use case, I don't think it's a blocker.

What do you think [~twalthr] [~hequn8128]?

> Row count sliding window outputs incorrectly in blink planner
> -------------------------------------------------------------
>
>                 Key: FLINK-13446
>                 URL: https://issues.apache.org/jira/browse/FLINK-13446
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.9.0
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.9.0, 1.10.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> For the blink planner, the row count sliding window outputs incorrectly. The
> window assigner assigns fewer windows than expected. This means the window
> outputs less data.
> The bug can be reproduced by the following test:
> {code:java}
> @Test
> def testGroupWindowWithoutKeyInProjection(): Unit = {
>   val data = List(
>     (1L, 1, "Hi", 1, 1),
>     (2L, 2, "Hello", 2, 2),
>     (4L, 2, "Hello", 2, 2),
>     (8L, 3, "Hello world", 3, 3),
>     (16L, 3, "Hello world", 3, 3))
>   val stream = failingDataSource(data)
>   val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3,
>     'proctime.proctime)
>   val weightAvgFun = new WeightedAvg
>   val countDistinct = new CountDistinct
>   val windowedTable = table
>     .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
>     .groupBy('w, 'int2, 'int3, 'string)
>     .select(weightAvgFun('long, 'int), countDistinct('long))
>   val sink = new TestingAppendSink
>   windowedTable.toAppendStream[Row].addSink(sink)
>   env.execute()
>   val expected = Seq("12,2", "8,1", "2,1", "3,2", "1,1")
>   assertEquals(expected.sorted, sink.getAppendResults.sorted)
> }
> {code}
> The expected output is Seq("12,2", "8,1", "2,1", "3,2", "1,1") while the
> actual output is Seq("12,2", "3,2")
> To fix the problem, we can correct the assign logic in
> CountSlidingWindowAssigner.assignWindows.
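
To make the two assignment schemes from the comment concrete, here is a minimal Scala sketch. It is not the actual CountSlidingWindowAssigner code. The object and method names (RowCountSlidingWindows, startAligned, endAligned) are hypothetical, and the formulas are only inferred from the size=5, slide=3 examples in the comment, assuming row_id starts from 1.

```scala
object RowCountSlidingWindows {

  // Blink planner behavior as described in the comment (assumed formula):
  // window k starts at row 1 + k * slide and always spans `size` rows.
  def startAligned(size: Int, slide: Int, numWindows: Int): List[List[Int]] =
    (0 until numWindows).map { k =>
      val first = 1 + k * slide
      (first until first + size).toList
    }.toList

  // Flink planner / DataStream behavior as described in the comment
  // (assumed formula): a window closes after every `slide` rows and
  // holds at most the last `size` rows seen so far.
  def endAligned(size: Int, slide: Int, numWindows: Int): List[List[Int]] =
    (1 to numWindows).map { k =>
      val last = k * slide
      val first = math.max(1, last - size + 1)
      (first to last).toList
    }.toList
}
```

Calling both methods with size=5, slide=3 and numWindows=3 reproduces the two window sequences listed in the comment, which makes it easy to see why the first window holds only three rows under the second scheme.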