[ https://issues.apache.org/jira/browse/FLINK-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16134776#comment-16134776 ]
ASF GitHub Bot commented on FLINK-7206: --------------------------------------- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r134137525 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1646,4 +1649,86 @@ abstract class CodeGenerator( fieldTerm } + + /** + * Adds a reusable class to the member area of the generated [[Function]]. + */ + def addReusableClass(clazz: Class[_], fieldTerm: String): Unit = { + val field = + s""" + |transient ${clazz.getCanonicalName} $fieldTerm = null; + |""".stripMargin + reusableMemberStatements.add(field) + } + + /** + * Adds a reusable [[DataViewConfig]] to the member area of the generated [[Function]]. + * + * @param indices indices of aggregate functions. + * @param ctxTerm field name of runtime context. + * @param accConfig data view config which contains id, field and StateDescriptors. + * @return statements to create [[MapView]] or [[ListView]]. 
+ */ + def addReusableDataViewConfig( + indices: Range, + ctxTerm: String, + accConfig: Option[DataViewConfig]) + : String = { + if (accConfig.isDefined && accConfig.get.isStateBackedDataViews) { + val initDataViews = new StringBuilder + val descMapping: Map[String, StateDescriptor[_, _]] = accConfig.get.accSpecs + .flatMap(specs => specs.map(s => (s.id, s.toStateDescriptor))) + .toMap[String, StateDescriptor[_, _]] + + for (i <- indices) yield { + for (spec <- accConfig.get.accSpecs(i)) yield { + val dataViewField = spec.field + val dataViewTypeTerm = dataViewField.getType.getCanonicalName + val desc = descMapping.getOrElse(spec.id, + throw new CodeGenException(s"Can not find ListView in accumulator by id: ${spec.id}")) + + val serializedData = AggregateUtil.serialize(desc) + val dataViewFieldTerm = s"acc${i}_${dataViewField.getName}_dataview" + val field = + s""" + |transient $dataViewTypeTerm $dataViewFieldTerm = null; + |""".stripMargin + reusableMemberStatements.add(field) + + val descFieldTerm = s"${dataViewFieldTerm}_desc" + val descClassQualifier = classOf[StateDescriptor[_, _]].getCanonicalName + val descDeserialize = + s""" + | $descClassQualifier $descFieldTerm = ($descClassQualifier) + | ${AggregateUtil.getClass.getName.stripSuffix("$")} + | .deserialize("$serializedData"); + """.stripMargin + + val init = if (dataViewField.getType == classOf[MapView[_, _]]) { + s""" + | $descDeserialize + | $dataViewFieldTerm = + | org.apache.flink.table.dataview.StateViewUtils.createMapView($descFieldTerm, --- End diff -- I think we do not need `StateViewUtils` here; we can create the MapView directly in the generated code, because we already have the RuntimeContext and the StateDescriptor. 
> Implementation of DataView to support state access for UDAGG > ------------------------------------------------------------ > > Key: FLINK-7206 > URL: https://issues.apache.org/jira/browse/FLINK-7206 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL > Reporter: Kaibo Zhou > Assignee: Kaibo Zhou > > Implementation of MapView and ListView to support state access for UDAGG. -- This message was sent by Atlassian JIRA (v6.4.14#64029)