[ https://issues.apache.org/jira/browse/FLINK-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081140#comment-16081140 ]
ASF GitHub Bot commented on FLINK-7126:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126535807

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala ---
    @@ -38,6 +38,43 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
       private val queryConfig = new StreamQueryConfig()
       queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
    +
    +  @Test
    +  def testDistinct(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
    +      .select('b).distinct()
    +
    +    val results = t.toRetractStream[Row](queryConfig)
    +    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testDistinctAfterAggregate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
    +      .groupBy('a, 'e).select('e).distinct()
    --- End diff --

    I think it would be better if the first aggregation would produce updates, such as a `.groupBy('e).select('e, 'a.count()).distinct()`. The count aggregation will produce different count values which have to be retracted and accumulated.

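    The retraction concern in the comment above can be illustrated without Flink. What follows is a minimal sketch in plain Scala under a simplified retract-stream model, not the code from the pull request: `Msg`, `groupCount`, `distinct`, and `materialize` are hypothetical names introduced here, and the model assumes a well-formed stream in which every retraction matches an earlier add. It shows why a count aggregation exercises `distinct` harder than a plain projection: each new count retracts the previous row, so `distinct` must track how many upstream rows still carry a value before it may retract that value downstream.

```scala
import scala.collection.mutable

object RetractionSketch {
  // Simplified retract-stream message (hypothetical, not a Flink type):
  // isAdd = true accumulates a row, isAdd = false retracts it.
  final case class Msg[T](isAdd: Boolean, row: T)

  // Grouped count: every input key retracts the previous (key, count) row,
  // if there was one, and then emits the updated (key, count) row.
  def groupCount(keys: Seq[Int]): Seq[Msg[(Int, Long)]] = {
    val counts = mutable.Map.empty[Int, Long]
    keys.flatMap { k =>
      val prev = counts.getOrElse(k, 0L)
      counts(k) = prev + 1
      val retraction =
        if (prev > 0) Seq(Msg(false, (k, prev))) else Seq.empty
      retraction :+ Msg(true, (k, prev + 1))
    }
  }

  // Distinct over a retract stream: keep a reference count per row and
  // forward a message only when a row appears (0 -> 1) or vanishes (1 -> 0).
  def distinct[T](in: Seq[Msg[T]]): Seq[Msg[T]] = {
    val refs = mutable.Map.empty[T, Int].withDefaultValue(0)
    in.flatMap { m =>
      val before = refs(m.row)
      val after = if (m.isAdd) before + 1 else before - 1
      if (after == 0) refs.remove(m.row) else refs(m.row) = after
      if (m.isAdd && before == 0) Some(m)
      else if (!m.isAdd && after == 0) Some(m)
      else None
    }
  }

  // Apply all adds and retractions to obtain the final result set.
  def materialize[T](in: Seq[Msg[T]]): Set[T] =
    in.foldLeft(Set.empty[T]) { (acc, m) =>
      if (m.isAdd) acc + m.row else acc - m.row
    }

  def main(args: Array[String]): Unit = {
    // Key 1 arrives twice, so its count row (1, 1) is retracted and replaced by (1, 2).
    val counted = groupCount(Seq(1, 2, 1))
    // Project to the count column only, then de-duplicate.
    val countsOnly = counted.map(m => Msg(m.isAdd, m.row._2))
    val deduped = distinct(countsOnly)
    deduped.foreach(println)
    // Count 1 survives the retraction because key 2 still has count 1.
    println(materialize(deduped)) // Set(1, 2)
  }
}
```

    In this model the retraction of count `1` for key 1 is swallowed by `distinct`, because key 2 still contributes a row with count `1`. A test whose first aggregation never updates its output would not reach that code path.
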
> Support Distinct for Stream SQL and Table API
> ---------------------------------------------
>
>                 Key: FLINK-7126
>                 URL: https://issues.apache.org/jira/browse/FLINK-7126
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Jark Wu
>            Assignee: Jark Wu