[ https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16055700#comment-16055700 ]

ASF GitHub Bot commented on FLINK-6888:
---------------------------------------

Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122964903
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -358,20 +358,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Registers an [[AggregateFunction]] under a unique name. Replaces already existing
         * user-defined functions under this name.
         */
    -  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC](
    +  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation](
           name: String, function: AggregateFunction[T, ACC]): Unit = {
         // check if class not Scala object
         checkNotSingleton(function.getClass)
         // check if class could be instantiated
         checkForInstantiation(function.getClass)
     
    -    val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
     
         // register in Table API
         functionCatalog.registerFunction(name, function.getClass)
     
         // register in SQL API
    -    val sqlFunctions = createAggregateSqlFunction(name, function, typeInfo, typeFactory)
    +    val sqlFunctions = createAggregateSqlFunction(
    --- End diff --
    
    @fhueske thanks for your explanation. I tried wrapping the UDAGG in an
    `AggregateFunctionWrapper`, but found that this is not straightforward,
    because I can't override the user-defined contract methods such as
    `accumulate`, `retract`, and `merge`. Moreover, code generation derives the
    accumulator type class from the return type of the `createAccumulator`
    method, but `createAccumulator` in `AggregateFunctionWrapper` can only
    return `Any`, which results in an error.
    
    In addition, I plan to support composite result types for UDAGGs. This also
    requires the result type to be known not only to Calcite for semantic
    validation but also to the code generator.
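
A minimal, self-contained sketch of the return-type problem described in the comment above. `Agg`, `CountAcc`, `CountAgg`, `AggWrapper`, and `accumulatorClass` are hypothetical stand-ins, not Flink classes; the sketch only assumes, as the comment states, that the accumulator class is taken from the declared return type of `createAccumulator`. A generic wrapper can only declare `Any` (erased to `java.lang.Object`), so that information is lost.

```scala
object WrapperSketch {
  // Hypothetical, simplified stand-in for AggregateFunction[T, ACC].
  // Contract methods such as accumulate/retract/merge are deliberately not
  // declared here: they are user-defined and looked up by name, so a wrapper
  // has no abstract method to override for them.
  abstract class Agg[T, ACC] {
    def createAccumulator(): ACC
    def getValue(acc: ACC): T
  }

  case class CountAcc(count: Long)

  class CountAgg extends Agg[Long, CountAcc] {
    override def createAccumulator(): CountAcc = CountAcc(0L)
    override def getValue(acc: CountAcc): Long = acc.count
  }

  // Hypothetical wrapper: it must accept any inner function, so the only
  // accumulator type it can declare is Any (erased to java.lang.Object).
  class AggWrapper(inner: Agg[_, _]) extends Agg[Any, Any] {
    private val delegate = inner.asInstanceOf[Agg[Any, Any]]
    override def createAccumulator(): Any = delegate.createAccumulator()
    override def getValue(acc: Any): Any = delegate.getValue(acc)
  }

  // Mimics a code generator that picks the accumulator class from the declared
  // return type of createAccumulator. Class.getMethod prefers the overload with
  // the most specific return type, so the compiler-generated bridge method
  // returning Object is skipped when a more specific override exists.
  def accumulatorClass(f: Agg[_, _]): Class[_] =
    f.getClass.getMethod("createAccumulator").getReturnType

  def main(args: Array[String]): Unit = {
    println(accumulatorClass(new CountAgg))                 // the CountAcc class
    println(accumulatorClass(new AggWrapper(new CountAgg))) // class java.lang.Object
  }
}
```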


> Can not determine TypeInformation of ACC type of AggregateFunction when ACC 
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or
> tuple class, the TypeInformation falls back to {{GenericType}}, which
> results in poor performance during state serialization and deserialization.
> I suggest extracting the ACC TypeInformation when
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> import org.apache.flink.table.functions.AggregateFunction
>
> case class Accumulator(sum: Long, count: Long)
>
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   // Overloaded accumulate method (left empty: only the accumulator type matters here)
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.
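
A minimal sketch of the suggested fix, using a hand-written type class instead of Flink's real `TypeInformation`; `TypeInfo`, `AggFunction`, `SumCount`, and `Registry` are hypothetical stand-ins. As in the diff, the accumulator type gets a context bound (`ACC: TypeInfo`), so its evidence is resolved at the registration call site, where the concrete type (here a tuple) is still known. Class-based extraction, analogous to `TypeInformation.of(Class)`, only sees the erased class. In Flink's Scala API this kind of evidence comes from the `createTypeInformation` macro rather than from hand-written instances.

```scala
import scala.collection.mutable

object AccTypeSketch {
  // Hypothetical stand-in for TypeInformation: it only records a readable name.
  trait TypeInfo[T] { def name: String }

  object TypeInfo {
    private def of[T](n: String): TypeInfo[T] = new TypeInfo[T] { val name: String = n }

    implicit val longInfo: TypeInfo[Long] = of[Long]("Long")

    // Built from the element instances, so the element types are preserved.
    implicit def tuple2Info[A, B](implicit a: TypeInfo[A], b: TypeInfo[B]): TypeInfo[(A, B)] =
      of[(A, B)](s"Tuple2<${a.name}, ${b.name}>")

    // Class-based extraction: type arguments are erased at runtime, so all
    // that remains is a generic type named after the raw class.
    def fromClass(c: Class[_]): String = s"GenericType<${c.getName}>"
  }

  // Hypothetical, simplified aggregate function contract.
  abstract class AggFunction[T, ACC] {
    def createAccumulator(): ACC
    def getValue(acc: ACC): T
  }

  // Accumulator is a (sum, count) tuple; the result is the integer average.
  class SumCount extends AggFunction[Long, (Long, Long)] {
    def createAccumulator(): (Long, Long) = (0L, 0L)
    def getValue(acc: (Long, Long)): Long = if (acc._2 == 0L) 0L else acc._1 / acc._2
  }

  object Registry {
    // function name -> (result type name, accumulator type name)
    val registered: mutable.Map[String, (String, String)] = mutable.Map.empty

    // Mirrors the diff: both T and ACC carry a context bound. The argument f
    // only fixes T and ACC through type inference.
    def registerAggregateFunction[T: TypeInfo, ACC: TypeInfo](
        name: String, f: AggFunction[T, ACC]): Unit =
      registered(name) = (implicitly[TypeInfo[T]].name, implicitly[TypeInfo[ACC]].name)
  }

  def main(args: Array[String]): Unit = {
    Registry.registerAggregateFunction("sumCount", new SumCount)
    println(Registry.registered("sumCount")._2)        // Tuple2<Long, Long>
    println(TypeInfo.fromClass(classOf[(Long, Long)])) // GenericType<scala.Tuple2>
  }
}
```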



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
