Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r184039407 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -207,112 +241,164 @@ class AggregationCodeGenerator( } } + /** - * Create DataView Term, for example, acc1_map_dataview. - * - * @param aggIndex index of aggregate function - * @param fieldName field name of DataView - * @return term to access [[MapView]] or [[ListView]] + * Add all data view for all distinct filters defined by aggregation functions. */ - def createDataViewTerm(aggIndex: Int, fieldName: String): String = { - s"acc${aggIndex}_${fieldName}_dataview" + def addDistinctFilterDataViews(): Unit = { + val descMapping: Map[String, StateDescriptor[_, _]] = distinctAggs --- End diff -- can be moved into the `if` branch (like in `addAccumulatorDataViews`)
---