[ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051960#comment-16051960 ]
ASF GitHub Bot commented on FLINK-6888:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122447580

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/BatchTableEnvironment.scala ---
    @@ -192,10 +193,14 @@ class BatchTableEnvironment(
           name: String,
           f: AggregateFunction[T, ACC])
         : Unit = {
    -    implicit val typeInfo: TypeInformation[T] = TypeExtractor
    -      .createTypeInfo(f, classOf[AggregateFunction[T, ACC]], f.getClass, 0)
    +    implicit val typeInfo: TypeInformation[T] = UserDefinedFunctionUtils
    +      .getResultTypeOfAggregateFunction(f)
           .asInstanceOf[TypeInformation[T]]
    +    implicit val accTypeInfo: TypeInformation[ACC] = UserDefinedFunctionUtils
    --- End diff --

    Correct TypeInformation extraction for Scala classes works only in the Scala TableEnvironments, not in the Java environments. Hence, UDAGGs that use Scala classes should not be used in Java queries, because those classes fall back to GenericType there. The safe and portable approach is to use Java classes for accumulators.

> Can not determine TypeInformation of ACC type of AggregateFunction when ACC
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or
> tuple class, the TypeInformation falls back to {{GenericType}}, which
> results in poor performance during state (de)serialization.
> I suggest extracting the ACC TypeInformation when
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> case class Accumulator(sum: Long, count: Long)
>
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   // Overloaded accumulate method
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
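The reviewer's advice to use Java-style classes for accumulators can be illustrated with a minimal sketch that does not depend on Flink. `looksLikePojo` is a hypothetical, simplified approximation of the POJO criteria that Java type extraction checks: a public class, a public no-arg constructor, and fields that are public or have getter/setter pairs. It is not Flink's actual `TypeExtractor` logic. `PojoAccumulator` is likewise a hypothetical Java-style replacement for the issue's `Accumulator` case class.

```scala
import java.lang.reflect.Modifier
import scala.beans.BeanProperty

// The accumulator from the issue: immutable, and its only constructor takes (Long, Long).
case class Accumulator(sum: Long, count: Long)

// Hypothetical Java-style accumulator: public no-arg constructor, mutable fields,
// and @BeanProperty-generated getSum/setSum and getCount/setCount methods.
class PojoAccumulator {
  @BeanProperty var sum: Long = 0L
  @BeanProperty var count: Long = 0L
}

object PojoCheck {
  // Simplified, hypothetical approximation of POJO rules (NOT Flink's TypeExtractor):
  // public class, public no-arg constructor, and every non-static field either
  // public or reachable through a getX/setX pair.
  def looksLikePojo(clazz: Class[_]): Boolean = {
    val publicClass = Modifier.isPublic(clazz.getModifiers)
    val noArgCtor = clazz.getConstructors.exists(_.getParameterCount == 0)
    val methodNames = clazz.getMethods.map(_.getName).toSet
    val fieldsOk = clazz.getDeclaredFields
      .filterNot(f => Modifier.isStatic(f.getModifiers))
      .forall { f =>
        val cap = f.getName.capitalize
        Modifier.isPublic(f.getModifiers) ||
          (methodNames.contains("get" + cap) && methodNames.contains("set" + cap))
      }
    publicClass && noArgCtor && fieldsOk
  }
}

object PojoCheckDemo extends App {
  println(PojoCheck.looksLikePojo(classOf[Accumulator]))     // prints false
  println(PojoCheck.looksLikePojo(classOf[PojoAccumulator])) // prints true
}
```

The case class fails this approximation because it has no no-arg constructor and no setters. With Flink on the classpath, the authoritative check is to print `TypeInformation.of(classOf[...])` for each candidate accumulator class rather than rely on this sketch.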