[ https://issues.apache.org/jira/browse/FLINK-6216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15949926#comment-15949926 ]
ASF GitHub Bot commented on FLINK-6216:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3646#discussion_r109041002

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---
    @@ -0,0 +1,117 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.nodes.datastream
    +
    +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    +import org.apache.calcite.rel.`type`.RelDataType
    +import org.apache.calcite.rel.core.AggregateCall
    +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
    +import org.apache.flink.streaming.api.datastream.DataStream
    +import org.apache.flink.table.api.StreamTableEnvironment
    +import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.runtime.aggregate._
    +import org.apache.flink.table.plan.nodes.CommonAggregate
    +import org.apache.flink.types.Row
    +import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
    +
    +/**
    +  *
    +  * Flink RelNode for data stream group (without window & early-firing) aggregate
    +  *
    +  * @param cluster         Cluster of the RelNode, represent for an environment of related
    +  *                        relational expressions during the optimization of a query.
    +  * @param traitSet        Trait set of the RelNode
    +  * @param inputNode       The input RelNode of aggregation
    +  * @param namedAggregates List of calls to aggregate functions and their output field names
    +  * @param rowRelDataType  The type of the rows of the RelNode
    +  * @param inputType       The type of the rows of aggregation input RelNode
    +  * @param groupings       The position (in the input Row) of the grouping keys
    +  */
    +class DataStreamGroupAggregate(
    +    cluster: RelOptCluster,
    +    traitSet: RelTraitSet,
    +    inputNode: RelNode,
    +    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
    +    rowRelDataType: RelDataType,
    +    inputType: RelDataType,
    +    groupings: Array[Int])
    +  extends SingleRel(cluster, traitSet, inputNode)
    +    with CommonAggregate
    +    with DataStreamRel {
    +
    +  override def deriveRowType() = rowRelDataType
    +
    +  def getGrouping() = groupings
    +
    +  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
    +    new DataStreamGroupAggregate(
    +      cluster,
    +      traitSet,
    +      inputs.get(0),
    +      namedAggregates,
    +      getRowType,
    +      inputType,
    +      groupings)
    +  }
    +
    +  override def toString: String = {
    +    s"Aggregate(${
    +      if (!groupings.isEmpty) {
    +        s"groupBy: (${groupingToString(inputType, groupings)}), "
    +      } else {
    +        ""
    +      }
    +    }select:(${aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil)}))"
    +  }
    +
    +  override def explainTerms(pw: RelWriter): RelWriter = {
    +    super.explainTerms(pw)
    +      .itemIf("groupBy", groupingToString(inputType, groupings), !groupings.isEmpty)
    +      .item("select", aggregationToString(inputType, groupings, getRowType, namedAggregates, Nil))
    +  }
    +
    +  override def translateToPlan(tableEnv: StreamTableEnvironment): DataStream[Row] = {
    +
    +    val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
    +
    +    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
    +
    +    val aggString = aggregationToString(
    +      inputType,
    +      groupings,
    +      getRowType,
    +      namedAggregates,
    +      Nil)
    +
    +    val keyedAggOpName = s"groupBy: (${groupingToString(inputType, groupings)}), " +
    +      s"select: ($aggString)"
    +
    +    val processFunction = AggregateUtil.createGroupAggregateFunction(
    +      namedAggregates,
    +      inputType,
    +      groupings)
    +
    +    inputDS
    +      .keyBy(groupings: _*)
    --- End diff --

    I think we can easily support the case of non-grouped aggregates as well. We use `.keyBy(new NullByteKeySelector[Row])` to send all data to a single instance of the process function. This is also done in `DataStreamOverAggregate` to implement the non-partitioned OVER windows.


> DataStream unbounded groupby aggregate with early firing
> --------------------------------------------------------
>
>                 Key: FLINK-6216
>                 URL: https://issues.apache.org/jira/browse/FLINK-6216
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Shaoxuan Wang
>
> Groupby aggregate results in a replace table. For infinite groupby aggregate,
> we need a mechanism to define when the data should be emitted (early-fired).
> This task aims to implement the initial version of the unbounded groupby
> aggregate, where we update and emit the aggregate value for each arriving record.
> In the future, we will implement the mechanism and interface that let users
> define the frequency/period of early-firing the unbounded groupby aggregation
> results.
> The limited space of the backend state is one of the major obstacles to supporting
> unbounded groupby aggregates in practice. For this reason, we suggest two
> common (and very useful) use cases for this unbounded groupby aggregate:
> 1. The range of the grouping key is limited. In this case, a newly arriving record
> will either be inserted into the state as a new record or replace the existing
> record in the backend state. The data in the backend state will not be evicted if
> the resource is properly provisioned by the user, so that we can guarantee the
> correctness of the aggregation results.
> 2. When the grouping key is unlimited, we will not be able to ensure 100%
> correctness of the "unbounded groupby aggregate". In this case, we will rely on
> the TTL mechanism of the RocksDB backend state to evict old data so that
> we can guarantee correct results within a certain time range.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
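
The per-record update-and-emit behaviour from the issue description, and the constant-key idea from the review comment, can be illustrated with a minimal plain-Scala sketch. It deliberately uses no Flink APIs and is not the PR's implementation: `Event`, `runningSum`, `SingleKey`, and `GroupAggSketch` are hypothetical names chosen for illustration, and a mutable map stands in for the keyed backend state. The only assumption about `NullByteKeySelector` is the one stated in the comment above, namely that it sends every record to the same key.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an input row: (user, amount). Not a Flink type.
final case class Event(user: String, amount: Long)

object GroupAggSketch {
  // Constant key for the non-grouped case (assumption: this mirrors the idea
  // of mapping every record to one key, so a single instance sees all data).
  val SingleKey: Byte = 0

  /** Updates the running sum for the record's key and emits (key, sum) per record. */
  def runningSum[K](events: Seq[Event])(keyOf: Event => K): Seq[(K, Long)] = {
    // Stands in for keyed backend state; grows with the number of distinct keys.
    val state = mutable.Map.empty[K, Long]
    events.map { e =>
      val key = keyOf(e)
      val updated = state.getOrElse(key, 0L) + e.amount
      state.update(key, updated)
      // Early-fired result: it replaces the previously emitted value for this key.
      (key, updated)
    }
  }

  val events: Seq[Event] = Seq(Event("a", 1L), Event("b", 10L), Event("a", 2L))

  // Grouped aggregate: one running sum per user.
  val grouped: Seq[(String, Long)] = runningSum(events)(_.user)

  // Non-grouped aggregate: constant key, so one running sum over all records.
  val global: Seq[(Byte, Long)] = runningSum(events)(_ => SingleKey)
}
```

In this sketch the state map is never pruned, which corresponds to use case 1 (a limited key range). For use case 2, entries would have to be evicted over time, which the issue proposes to delegate to the state backend's TTL mechanism.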