Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5555#discussion_r183866776 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -151,8 +157,15 @@ class AggregationCodeGenerator( } } - // initialize and create data views - addReusableDataViews() + // get distinct filter of acc fields for each aggregate functions + val distinctAccType = s"${classOf[DistinctAccumulator[_, _]].getName}" + val isDistinctAggs = distinctAggs.map(_.nonEmpty) --- End diff -- I think we don't need the type info of the accumulator to create the `MapViewTypeInfo`, but the type info of the input type of the aggregation function. The input types can be resolved from `physicalInputTypes` and `aggFields`.
---