    @@ -161,13 +182,119 @@ class AggregationCodeGenerator(
    +    /**
    +      * Builds the name of the term that refers to a DataView of an accumulator,
    +      * e.g. `acc1_map_dataview` for aggregate index 1 and field name `map`.
    +      *
    +      * @param aggIndex index of the aggregate function
    +      * @param fieldName name of the DataView field
    +      * @return the term used to access the [[MapView]] or [[ListView]]
    +      */
    +    def createDataViewTerm(aggIndex: Int, fieldName: String): String = {
    +      // name layout: acc<aggIndex>_<fieldName>_dataview
    +      val termParts = Seq(s"acc$aggIndex", fieldName, "dataview")
    +      termParts.mkString("_")
    +    }
    +    /**
    +      * Adds a reusable [[org.apache.flink.table.api.dataview.DataView]] to the open, cleanup,
    +      * close and member area of the generated function.
    +      *
    +      */
    +    def addReusableDataViews: Unit = {
    +      if (accConfig.isDefined) {
    +        val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get
    +          .flatMap(specs => => (, s.toStateDescriptor)))
    +          .toMap[String, StateDescriptor[_, _]]
    +        for (i <- aggs.indices) yield {
    +          for (spec <- accConfig.get(i)) yield {
    +            val dataViewField = spec.field
    +            val dataViewTypeTerm = dataViewField.getType.getCanonicalName
    +            val desc = descMapping.getOrElse(,
    +              throw new CodeGenException(s"Can not find DataView in 
accumulator by id: ${}"))
    +            // define the DataView variables
    +            val serializedData = AggregateUtil.serialize(desc)
    --- End diff --
    move `serialize` method to this class and rename to 

