[ https://issues.apache.org/jira/browse/FLINK-6216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949929#comment-15949929 ]
ASF GitHub Bot commented on FLINK-6216: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3646#discussion_r109039910 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala --- @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.api.StreamTableEnvironment +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.plan.nodes.CommonAggregate +import org.apache.flink.types.Row +import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair + +/** + * + * Flink RelNode for data stream group (without window & early-firing) aggregate + * + * @param cluster Cluster of the RelNode, represent for an environment of related + * relational expressions during the optimization of a query. + * @param traitSet Trait set of the RelNode + * @param inputNode The input RelNode of aggregation + * @param namedAggregates List of calls to aggregate functions and their output field names + * @param rowRelDataType The type of the rows of the RelNode + * @param inputType The type of the rows of aggregation input RelNode + * @param groupings The position (in the input Row) of the grouping keys + */ +class DataStreamGroupAggregate( + cluster: RelOptCluster, + traitSet: RelTraitSet, + inputNode: RelNode, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + rowRelDataType: RelDataType, + inputType: RelDataType, + groupings: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) + with CommonAggregate + with DataStreamRel { + + override def deriveRowType() = rowRelDataType + + def getGrouping() = groupings --- End diff -- not used? > DataStream unbounded groupby aggregate with early firing > -------------------------------------------------------- > > Key: FLINK-6216 > URL: https://issues.apache.org/jira/browse/FLINK-6216 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Shaoxuan Wang > Assignee: Shaoxuan Wang > > Groupby aggregate results in a replace table. For infinite groupby aggregate, > we need a mechanism to define when the data should be emitted (early-fired). > This task is aimed to implement the initial version of unbounded groupby > aggregate, where we update and emit aggregate value per each arrived record. > In the future, we will implement the mechanism and interface to let user > define the frequency/period of early-firing the unbounded groupby aggregation > results. > The limit space of backend state is one of major obstacles for supporting > unbounded groupby aggregate in practical. Due to this reason, we suggest two > common (and very useful) use-cases of this unbounded groupby aggregate: > 1. The range of grouping key is limit. In this case, a new arrival record > will either insert to state as new record or replace the existing record in > the backend state. The data in the backend state will not be evicted if the > resource is properly provisioned by the user, such that we can provision the > correctness on aggregation results. > 2. When the grouping key is unlimited, we will not be able ensure the 100% > correctness of "unbounded groupby aggregate". In this case, we will reply on > the TTL mechanism of the RocksDB backend state to evicted old data such that > we can provision the correct results in a certain time range. -- This message was sent by Atlassian JIRA (v6.3.15#6346)