[ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16083456#comment-16083456 ]
ASF GitHub Bot commented on FLINK-6888:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r126865636

    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java ---
    @@ -59,7 +59,7 @@ public boolean requiresOver() {
     	/**
     	 * Accumulator for WeightedAvg.
     	 */
    -	public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
    --- End diff --

    TypeExtractor recognizes `WeightedAvgAccum` as a `TupleType`, so the `sum` and `count` fields are ignored during de/serialization. I therefore changed it to a POJO class. This change does not strictly belong in this PR, but I fixed it when I found it.


> Can not determine TypeInformation of ACC type of AggregateFunction when ACC
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or
> tuple class, the TypeInformation falls back to {{GenericType}}, which
> results in poor performance during state de/serialization.
> I suggest extracting the ACC TypeInformation when
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> case class Accumulator(sum: Long, count: Long)
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   //Overloaded accumulate method
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
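
For reference, the review comment above describes replacing a `Tuple2` subclass with a POJO accumulator. A minimal sketch of such a POJO might look like the following. It is not the code from the pull request: the field types (`long sum`, `int count`), the wrapper class `WeightedAvgAccumSketch`, and the `accumulate`/`getValue` helpers are assumptions for illustration. The sketch deliberately has no Flink dependency, so it only shows the class shape Flink generally expects of a POJO (public class, public no-argument constructor, public fields or getters/setters) and does not exercise `TypeExtractor` itself.

```java
// Hypothetical sketch: a POJO-style accumulator instead of a Tuple2 subclass.
public class WeightedAvgAccumSketch {

    /** Accumulator for WeightedAvg, written as a plain POJO (field types are assumed). */
    public static class WeightedAvgAccum {
        public long sum = 0;   // running sum of value * weight
        public int count = 0;  // running sum of weights

        // A public no-argument constructor is part of the usual POJO requirements.
        public WeightedAvgAccum() {}
    }

    /** Hypothetical helper: adds one weighted value to the accumulator. */
    public static void accumulate(WeightedAvgAccum acc, long value, int weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    /** Hypothetical helper: returns the weighted average (integer division), or 0 if empty. */
    public static long getValue(WeightedAvgAccum acc) {
        return acc.count == 0 ? 0L : acc.sum / acc.count;
    }

    public static void main(String[] args) {
        WeightedAvgAccum acc = new WeightedAvgAccum();
        accumulate(acc, 10L, 1);
        accumulate(acc, 20L, 3);
        System.out.println(getValue(acc));
    }
}
```

Because `sum` and `count` are ordinary public fields of a class that no longer extends `Tuple2`, they are the only state the accumulator carries, which is the property the comment relies on when it says the fields were previously ignored.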