[ https://issues.apache.org/jira/browse/FLINK-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081140#comment-16081140 ]

ASF GitHub Bot commented on FLINK-7126:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4279#discussion_r126535807
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala ---
    @@ -38,6 +38,43 @@ class GroupAggregationsITCase extends StreamingWithStateTestBase {
       private val queryConfig = new StreamQueryConfig()
       queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
     
    +
    +  @Test
    +  def testDistinct(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
    +      .select('b).distinct()
    +
    +    val results = t.toRetractStream[Row](queryConfig)
    +    results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
    +    assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
    +  }
    +
    +  @Test
    +  def testDistinctAfterAggregate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
    +      .groupBy('a, 'e).select('e).distinct()
    --- End diff --
    
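    I think it would be better if the first aggregation produced updates, for
    example `.groupBy('e).select('e, 'a.count()).distinct()`. The count
    aggregation produces changing count values as records arrive, so previous
    results have to be retracted and the new ones accumulated. That way the test
    also covers how `distinct()` handles retractions from an upstream aggregation.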
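
    To illustrate the point about retractions, here is a minimal sketch in plain Scala. It does not use Flink. The object `RetractionSketch` and its helpers `groupCount`, `distinct`, and `materialize` are hypothetical names invented for this illustration, and the model (one retraction plus one accumulate per updated count, a reference count per row in distinct) is an assumption about the semantics, not the actual operator implementation.

```scala
// Simplified model of a retract stream; this is NOT Flink code.
// A message is (isAccumulate, row): true adds the row, false retracts it.
object RetractionSketch {
  type Row = (Long, Long) // (group key e, count of a)
  type Msg = (Boolean, Row)

  // Models groupBy('e).select('e, count of 'a): every input record for a key
  // retracts the previous count row (if any) and accumulates the new one.
  def groupCount(keys: Seq[Long]): Seq[Msg] = {
    val counts = scala.collection.mutable.Map.empty[Long, Long]
    keys.flatMap { k =>
      val old = counts.getOrElse(k, 0L)
      counts(k) = old + 1
      val retract = if (old > 0) Seq((false, (k, old))) else Seq.empty[Msg]
      retract :+ ((true, (k, old + 1)))
    }
  }

  // Models distinct() on a retract stream: keep a reference count per row,
  // emit an accumulate on 0 -> 1 and a retraction on 1 -> 0.
  def distinct(in: Seq[Msg]): Seq[Msg] = {
    val refs = scala.collection.mutable.Map.empty[Row, Int]
    in.flatMap { case (isAcc, row) =>
      val before = refs.getOrElse(row, 0)
      val after = if (isAcc) before + 1 else before - 1
      if (after == 0) refs.remove(row) else refs(row) = after
      if (before == 0 && after == 1) Seq((true, row))
      else if (before == 1 && after == 0) Seq((false, row))
      else Seq.empty[Msg]
    }
  }

  // Applies all messages in order and returns the rows that remain.
  def materialize(in: Seq[Msg]): Set[Row] =
    in.foldLeft(Set.empty[Row]) { case (acc, (isAcc, row)) =>
      if (isAcc) acc + row else acc - row
    }

  def main(args: Array[String]): Unit = {
    val out = distinct(groupCount(Seq(1L, 2L, 1L, 1L, 2L)))
    println(s"retractions: ${out.count(!_._1)}")
    println(s"final rows:  ${materialize(out)}")
  }
}
```

    In this model, a first aggregation that only forwards the key never changes a row after its first insert, so the downstream distinct sees no retractions. With a count, every repeated key forces a retraction of the old row, which is the path the suggested test would exercise.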


> Support Distinct for Stream SQL and Table API
> ---------------------------------------------
>
>                 Key: FLINK-7126
>                 URL: https://issues.apache.org/jira/browse/FLINK-7126
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.4.0
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
