[ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16066803#comment-16066803 ]
ASF GitHub Bot commented on FLINK-6888:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/4105#discussion_r124575144

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
@@ -286,13 +286,21 @@ object UserDefinedFunctionUtils {
   def createAggregateSqlFunction(
       name: String,
       aggFunction: AggregateFunction[_, _],
-      typeInfo: TypeInformation[_],
+      extractedResultTypeInfo: TypeInformation[_],
+      accTypeInfo: TypeInformation[_],
       typeFactory: FlinkTypeFactory)
     : SqlFunction = {
     //check if a qualified accumulate method exists before create Sql function
     checkAndExtractMethods(aggFunction, "accumulate")
-    val resultType: TypeInformation[_] = getResultTypeOfAggregateFunction(aggFunction, typeInfo)
-    AggSqlFunction(name, aggFunction, resultType, typeFactory, aggFunction.requiresOver)
+    val resultType = getResultTypeOfAggregateFunction(aggFunction, extractedResultTypeInfo)
--- End diff --

`getResultTypeOfAggregateFunction` is already called earlier, in `TableEnvironment`. Can we remove the call here? If not, we might also want to double-check the acc type.

> Can not determine TypeInformation of ACC type of AggregateFunction when ACC
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or
> tuple class, the TypeInformation falls back to {{GenericType}}, which
> results in poor performance during state (de)serialization.
>
> I suggest extracting the ACC TypeInformation when
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> import org.apache.flink.table.functions.AggregateFunction
>
> case class Accumulator(sum: Long, count: Long)
>
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>
>   // Overloaded accumulate method
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
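The fallback reported in FLINK-6888, and the suggestion to capture the ACC type when the function is registered, can be illustrated with a small standalone sketch. It assumes `flink-core` and `flink-scala` are on the classpath. `MiniAggregate`, `MyAggSketch`, `registerAggregate` and `AccTypeDemo` are hypothetical names for illustration only; they are not Flink's `AggregateFunction` or `TableEnvironment.registerFunction()` API, and this is not necessarily how the pull request implements the fix. The sketch contrasts class-based extraction via `TypeInformation.of(classOf[Accumulator])` with the Scala API's `createTypeInformation` macro, and shows a context bound materializing the ACC type at the call site.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.scala._ // brings the createTypeInformation macro into implicit scope

// Accumulator from the issue description.
case class Accumulator(sum: Long, count: Long)

// Hypothetical stand-in for an aggregate function with result type T and
// accumulator type ACC. This is NOT Flink's AggregateFunction class.
trait MiniAggregate[T, ACC] {
  def createAccumulator(): ACC
  def getValue(acc: ACC): T
}

// Hypothetical aggregate mirroring MyAgg from the issue.
class MyAggSketch extends MiniAggregate[Long, Accumulator] {
  override def createAccumulator(): Accumulator = Accumulator(0, 0)
  override def getValue(acc: Accumulator): Long = acc.sum
}

object AccTypeDemo {
  // Hypothetical registration helper. The context bounds make the compiler
  // resolve TypeInformation[T] and TypeInformation[ACC] where the helper is
  // called, while the concrete case class is still statically known.
  // `name` is unused in this sketch.
  def registerAggregate[T: TypeInformation, ACC: TypeInformation](
      name: String,
      f: MiniAggregate[T, ACC]): (TypeInformation[T], TypeInformation[ACC]) =
    (implicitly[TypeInformation[T]], implicitly[TypeInformation[ACC]])

  def main(args: Array[String]): Unit = {
    // Class-based extraction: a case class does not satisfy Flink's POJO
    // rules, so this falls back to a generic (Kryo-serialized) type.
    val fromClass: TypeInformation[Accumulator] = TypeInformation.of(classOf[Accumulator])

    // Macro-based extraction: a case-class type with typed fields.
    val fromMacro: TypeInformation[Accumulator] = createTypeInformation[Accumulator]

    // ACC type captured at "registration" time through the context bound.
    val (_, accType) = registerAggregate("myAgg", new MyAggSketch)

    println(s"TypeInformation.of:    $fromClass (generic: ${fromClass.isInstanceOf[GenericTypeInfo[_]]})")
    println(s"createTypeInformation: $fromMacro (generic: ${fromMacro.isInstanceOf[GenericTypeInfo[_]]})")
    println(s"registered ACC type:   $accType")
  }
}
```

The point of the context bound is that the implicit is resolved where `MyAggSketch` is passed in, so the compiler still knows `ACC = Accumulator`. Inside a method that only receives `AggregateFunction[_, _]`, as `createAggregateSqlFunction` does in the diff, that static type is no longer available, which is why the extracted `accTypeInfo` has to be passed in as a parameter.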