[ https://issues.apache.org/jira/browse/FLINK-8689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16454272#comment-16454272 ]
ASF GitHub Bot commented on FLINK-8689: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r184405329 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + val tEnv = TableEnvironment.getTableEnvironment(env) + env.setParallelism(1) + StreamITCase.clear + + val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) + tEnv.registerTable("MyTable", t) + + val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + " PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") + assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStateBackend(getStateBackend) + val tEnv = 
TableEnvironment.getTableEnvironment(env) + env.setParallelism(1) + StreamITCase.clear + + val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) + tEnv.registerTable("MyTable", t) + + val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + " PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") + assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { + // use out-of-order data to test distinct accumulator remove + val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setStateBackend(getStateBackend) + env.setParallelism(1) --- End diff -- Actually, I was wrong on this one. 
Late elements are not deterministically handled if p > 1. Will change it back. > Add runtime support of distinct filter using MapView > ----------------------------------------------------- > > Key: FLINK-8689 > URL: https://issues.apache.org/jira/browse/FLINK-8689 > Project: Flink > Issue Type: Sub-task > Reporter: Rong Rong > Assignee: Rong Rong > Priority: Major > > This ticket should cover distinct aggregate function support to codegen for > *AggregateCall*, where the *isDistinct* field is set to true. > This can be verified using the following SQL, which is not currently > producing correct results. > {code:java} > SELECT > a, > SUM(b) OVER (PARTITION BY a ORDER BY proctime ROWS BETWEEN 5 PRECEDING AND > CURRENT ROW) > FROM > MyTable{code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)