[ https://issues.apache.org/jira/browse/FLINK-6216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949941#comment-15949941 ]
ASF GitHub Bot commented on FLINK-6216:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3646#discussion_r109034147

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -221,8 +221,8 @@ case class Aggregate(
       }

       override def validate(tableEnv: TableEnvironment): LogicalNode = {
    -    if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
    -      failValidation(s"Aggregate on stream tables is currently not supported.")
    +    if (tableEnv.isInstanceOf[StreamTableEnvironment] && groupingExpressions == Nil) {
    +      failValidation(s"Aggregate without any of groupby or over is not supported.")
    --- End diff --

    This case should be easy to support as well with a `NullByteKeySelector` as explained below.

> DataStream unbounded groupby aggregate with early firing
> ---------------------------------------------------------
>
>                 Key: FLINK-6216
>                 URL: https://issues.apache.org/jira/browse/FLINK-6216
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> A groupby aggregate produces a replace table. For an infinite groupby aggregate, we need a mechanism to define when the data should be emitted (early fired). This task aims to implement the initial version of the unbounded groupby aggregate, where we update and emit the aggregate value for each arriving record. In the future, we will implement the mechanism and interface that lets users define the frequency/period of early firing of the unbounded groupby aggregation results.
> The limited space of the backend state is one of the major obstacles to supporting unbounded groupby aggregates in practice. For this reason, we focus on two common (and very useful) use cases of this unbounded groupby aggregate:
> 1. The range of the grouping key is limited. In this case, a newly arriving record is either inserted into the state as a new record or replaces the existing record in the backend state. The data in the backend state will not be evicted if the resources are properly provisioned by the user, so we can guarantee the correctness of the aggregation results.
> 2. When the grouping key is unlimited, we will not be able to ensure 100% correctness of the unbounded groupby aggregate. In this case, we will rely on the TTL mechanism of the RocksDB state backend to evict old data, so that we can provide correct results within a certain time range.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
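
The review comment above suggests that the non-grouped ("global") aggregate case could be supported by keying every record to a constant with a `NullByteKeySelector`, and the issue description asks for updating and emitting the aggregate value for each arriving record. Below is a minimal, illustrative DataStream sketch of that idea, not the actual FLINK-6216 implementation: the constant-key function stands in for Flink's `NullByteKeySelector` (which maps every element to the byte 0), and the `Order` / `RunningSum` names are made up for the example.

    // Sketch only: a global (non-grouped) running aggregate over an unbounded
    // stream that updates keyed state and emits a new result for every record
    // ("early firing per record"). All names below are illustrative.
    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    case class Order(product: String, amount: Long)

    class RunningSum extends RichFlatMapFunction[Order, Long] {

      private var sumState: ValueState[java.lang.Long] = _

      override def open(parameters: Configuration): Unit = {
        sumState = getRuntimeContext.getState(
          new ValueStateDescriptor[java.lang.Long]("running-sum", classOf[java.lang.Long]))
      }

      override def flatMap(order: Order, out: Collector[Long]): Unit = {
        // Update the running sum in keyed state and emit the new value
        // immediately, i.e. one updated (replacing) result per input record.
        val current = if (sumState.value() == null) 0L else sumState.value().longValue()
        val updated = current + order.amount
        sumState.update(updated)
        out.collect(updated)
      }
    }

    object GlobalAggregateSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val orders: DataStream[Order] = env.fromElements(
          Order("a", 2), Order("b", 3), Order("a", 5))

        orders
          // Constant key: every record maps to the same key, which is what
          // Flink's NullByteKeySelector does, so keyed state can be used even
          // though the aggregate has no grouping key.
          .keyBy(_ => 0.toByte)
          .flatMap(new RunningSum)
          .print() // prints 2, 5, 10: an updated result per arriving record

        env.execute("global aggregate with per-record early firing")
      }
    }

The grouped case from the issue description looks the same except that the constant key is replaced by the actual grouping key, so state and results are maintained per group; emitting on every record then yields the "replace table" semantics described above.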