Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135648347
  
    --- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
    @@ -307,6 +312,92 @@ object UserDefinedFunctionUtils {
       // 
----------------------------------------------------------------------------------------------
     
       /**
    +    * Remove StateView fields from accumulator type information.
    +    *
    +    * @param index index of aggregate function
    +    * @param aggFun aggregate function
    +    * @param accType accumulator type information, only support pojo type
    +    * @param isStateBackedDataViews is data views use state backend
    +    * @return mapping of accumulator type information and data view config 
which contains id,
    +    *         field name and state descriptor
    +    */
    +  def removeStateViewFieldsFromAccTypeInfo(
    +    index: Int,
    +    aggFun: AggregateFunction[_, _],
    +    accType: TypeInformation[_],
    +    isStateBackedDataViews: Boolean)
    +  : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = {
    +
    +    var hasDataView = false
    +    val acc = aggFun.createAccumulator()
    +    accType match {
    +      case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 =>
    +        val arity = pojoType.getArity
    +        val newPojoFields = new util.ArrayList[PojoField]()
    +        val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]]
    +        for (i <- 0 until arity) {
    +          val pojoField = pojoType.getPojoFieldAt(i)
    +          val field = pojoField.getField
    +          val fieldName = field.getName
    +          field.setAccessible(true)
    +
    +          pojoField.getTypeInformation match {
    +            case map: MapViewTypeInfo[Any, Any] =>
    +              val mapView = field.get(acc).asInstanceOf[MapView[_, _]]
    +              if (mapView != null) {
    +                val keyTypeInfo = mapView.keyTypeInfo
    +                val valueTypeInfo = mapView.valueTypeInfo
    +                val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo 
!= null) {
    +                  hasDataView = true
    +                  new MapViewTypeInfo(keyTypeInfo, valueTypeInfo)
    +                } else {
    +                  map
    +                }
    +
    +                var spec = MapViewSpec(
    +                  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
    +                  field,
    +                  newTypeInfo)
    +
    +                accumulatorSpecs += spec
    +                if (!isStateBackedDataViews) { // add data view field 
which not use state backend
    +                  newPojoFields.add(new PojoField(field, newTypeInfo))
    +                }
    +              }
    +
    +            case list: ListViewTypeInfo[Any] =>
    +              val listView = field.get(acc).asInstanceOf[ListView[_]]
    +              if (listView != null) {
    +                val elementTypeInfo = listView.elementTypeInfo
    +                val newTypeInfo = if (elementTypeInfo != null) {
    +                  hasDataView = true
    +                  new ListViewTypeInfo(elementTypeInfo)
    +                } else {
    +                  list
    +                }
    +
    +                var spec = ListViewSpec(
    +                  "agg" + index + "$" + fieldName, // generate unique name 
to be used as state name
    +                  field,
    +                  newTypeInfo)
    +
    +                accumulatorSpecs += spec
    +                if (!isStateBackedDataViews) { // add data view field 
which not use state backend
    +                  newPojoFields.add(new PojoField(field, newTypeInfo))
    +                }
    +              }
    +
    +            case _ => newPojoFields.add(pojoField)
    +          }
    +        }
    +        (new PojoTypeInfo(accType.getTypeClass, newPojoFields), 
Some(accumulatorSpecs))
    +
    +      case _ if !hasDataView => (accType, None)
    +      case _ => throw new TableException("MapView and ListView only 
support in PoJo class")
    --- End diff --
    
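    This last case (the one that throws the `TableException`) will never be 
reached. `hasDataView` is only set to `true` inside the 
`case pojoType: PojoTypeInfo[_]` branch. Hence, it will always be `false` when 
we come to this point, so the preceding `case _ if !hasDataView` guard always 
matches first.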


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to