Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/4355#discussion_r135648347 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -307,6 +312,92 @@ object UserDefinedFunctionUtils { // ---------------------------------------------------------------------------------------------- /** + * Remove StateView fields from accumulator type information. + * + * @param index index of aggregate function + * @param aggFun aggregate function + * @param accType accumulator type information, only support pojo type + * @param isStateBackedDataViews is data views use state backend + * @return mapping of accumulator type information and data view config which contains id, + * field name and state descriptor + */ + def removeStateViewFieldsFromAccTypeInfo( + index: Int, + aggFun: AggregateFunction[_, _], + accType: TypeInformation[_], + isStateBackedDataViews: Boolean) + : (TypeInformation[_], Option[Seq[DataViewSpec[_]]]) = { + + var hasDataView = false + val acc = aggFun.createAccumulator() + accType match { + case pojoType: PojoTypeInfo[_] if pojoType.getArity > 0 => + val arity = pojoType.getArity + val newPojoFields = new util.ArrayList[PojoField]() + val accumulatorSpecs = new mutable.ArrayBuffer[DataViewSpec[_]] + for (i <- 0 until arity) { + val pojoField = pojoType.getPojoFieldAt(i) + val field = pojoField.getField + val fieldName = field.getName + field.setAccessible(true) + + pojoField.getTypeInformation match { + case map: MapViewTypeInfo[Any, Any] => + val mapView = field.get(acc).asInstanceOf[MapView[_, _]] + if (mapView != null) { + val keyTypeInfo = mapView.keyTypeInfo + val valueTypeInfo = mapView.valueTypeInfo + val newTypeInfo = if (keyTypeInfo != null && valueTypeInfo != null) { + hasDataView = true + new MapViewTypeInfo(keyTypeInfo, valueTypeInfo) + } else { + map + } + + var spec = MapViewSpec( + "agg" + index + "$" + fieldName, // 
generate unique name to be used as state name + field, + newTypeInfo) + + accumulatorSpecs += spec + if (!isStateBackedDataViews) { // add data view field which not use state backend + newPojoFields.add(new PojoField(field, newTypeInfo)) + } + } + + case list: ListViewTypeInfo[Any] => + val listView = field.get(acc).asInstanceOf[ListView[_]] + if (listView != null) { + val elementTypeInfo = listView.elementTypeInfo + val newTypeInfo = if (elementTypeInfo != null) { + hasDataView = true + new ListViewTypeInfo(elementTypeInfo) + } else { + list + } + + var spec = ListViewSpec( + "agg" + index + "$" + fieldName, // generate unique name to be used as state name + field, + newTypeInfo) + + accumulatorSpecs += spec + if (!isStateBackedDataViews) { // add data view field which not use state backend + newPojoFields.add(new PojoField(field, newTypeInfo)) + } + } + + case _ => newPojoFields.add(pojoField) + } + } + (new PojoTypeInfo(accType.getTypeClass, newPojoFields), Some(accumulatorSpecs)) + + case _ if !hasDataView => (accType, None) + case _ => throw new TableException("MapView and ListView only support in PoJo class") --- End diff -- This case will never be reached. `hasDataView` is only set to `true` in the `case pojoType: PojoTypeInfo[_]` case. Hence, it will always be false when we come to this point.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---