[ https://issues.apache.org/jira/browse/FLINK-13446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-13446: ----------------------------------- Labels: pull-request-available (was: ) > Row count sliding window outputs incorrectly in blink planner > ------------------------------------------------------------- > > Key: FLINK-13446 > URL: https://issues.apache.org/jira/browse/FLINK-13446 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.9.0 > Reporter: Hequn Cheng > Assignee: Hequn Cheng > Priority: Major > Labels: pull-request-available > Fix For: 1.9.0, 1.10.0 > > > For blink planner, the Row count sliding window outputs incorrectly. The > window assigner assigns less window than what expected. This means the window > outputs fewer data. The bug can be reproduced by the following test: > {code:java} > @Test > def testGroupWindowWithoutKeyInProjection(): Unit = { > val data = List( > (1L, 1, "Hi", 1, 1), > (2L, 2, "Hello", 2, 2), > (4L, 2, "Hello", 2, 2), > (8L, 3, "Hello world", 3, 3), > (16L, 3, "Hello world", 3, 3)) > val stream = failingDataSource(data) > val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, > 'proctime.proctime) > val weightAvgFun = new WeightedAvg > val countDistinct = new CountDistinct > val windowedTable = table > .window(Slide over 2.rows every 1.rows on 'proctime as 'w) > .groupBy('w, 'int2, 'int3, 'string) > .select(weightAvgFun('long, 'int), countDistinct('long)) > val sink = new TestingAppendSink > windowedTable.toAppendStream[Row].addSink(sink) > env.execute() > val expected = Seq("12,2", "8,1", "2,1", "3,2", "1,1") > assertEquals(expected.sorted, sink.getAppendResults.sorted) > } > {code} > The expected output is Seq("12,2", "8,1", "2,1", "3,2", "1,1") while the > actual output is Seq("12,2", "3,2") > To fix the problem, we can correct the assign logic in > CountSlidingWindowAssigner.assignWindows. -- This message was sent by Atlassian JIRA (v7.6.14#76016)