[ 
https://issues.apache.org/jira/browse/FLINK-6888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051962#comment-16051962
 ] 

ASF GitHub Bot commented on FLINK-6888:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4105#discussion_r122448432
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---
    @@ -358,20 +358,27 @@ abstract class TableEnvironment(val config: TableConfig) {
         * Registers an [[AggregateFunction]] under a unique name. Replaces already existing
         * user-defined functions under this name.
         */
    -  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC](
    +  private[flink] def registerAggregateFunctionInternal[T: TypeInformation, ACC: TypeInformation](
           name: String, function: AggregateFunction[T, ACC]): Unit = {
         // check if class not Scala object
         checkNotSingleton(function.getClass)
         // check if class could be instantiated
         checkForInstantiation(function.getClass)
     
    -    val typeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val resultTypeInfo: TypeInformation[_] = implicitly[TypeInformation[T]]
    +    val accTypeInfo: TypeInformation[_] = implicitly[TypeInformation[ACC]]
     
         // register in Table API
         functionCatalog.registerFunction(name, function.getClass)
     
         // register in SQL API
    -    val sqlFunctions = createAggregateSqlFunction(name, function, typeInfo, typeFactory)
    +    val sqlFunctions = createAggregateSqlFunction(
    --- End diff --
    
    To be honest, I don't think it is a nice approach to add the acc type to 
the internal representation of UDAGGs. It is not related to the logic of the 
function or required for the optimization but rather an internal runtime aspect.
    
    An alternative could be to wrap the functions registered from Scala in a 
class that extends the `AggregateFunction` interface and holds the original agg 
function and the acc type information (exposed via the `getAccumulatorType()` 
method). We would need to unwrap it before we translate it. It's not a super 
nice solution either, but would probably require fewer changes.
    
    What do you think?
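
    To make the wrapper idea a bit more concrete, here is a rough sketch. It is
    not the actual Flink code: `TypeInfo` and the small `AggregateFunction` base
    class are stand-ins so the snippet compiles on its own, and
    `AggFunctionWithAccType`, `WrapperSketch`, `wrap` and `unwrap` are
    hypothetical names. The stand-in `getAccumulatorType` returns an `Option`
    purely for illustration.

```scala
// Stand-ins so the sketch compiles without Flink; these are NOT the real Flink classes.
final case class TypeInfo[A](description: String)

abstract class AggregateFunction[T, ACC] {
  def createAccumulator(): ACC
  def getValue(acc: ACC): T
  // None means "no explicit accumulator type, fall back to reflective extraction".
  def getAccumulatorType: Option[TypeInfo[ACC]] = None
}

// Hypothetical wrapper: holds the user's function plus the accumulator type
// that was captured on the Scala side at registration time.
final class AggFunctionWithAccType[T, ACC](
    val wrapped: AggregateFunction[T, ACC],
    accType: TypeInfo[ACC])
  extends AggregateFunction[T, ACC] {

  override def createAccumulator(): ACC = wrapped.createAccumulator()
  override def getValue(acc: ACC): T = wrapped.getValue(acc)
  override def getAccumulatorType: Option[TypeInfo[ACC]] = Some(accType)
}

object WrapperSketch {
  // Registration side: the context bound picks up the implicit accumulator type.
  def wrap[T, ACC: TypeInfo](f: AggregateFunction[T, ACC]): AggregateFunction[T, ACC] =
    new AggFunctionWithAccType(f, implicitly[TypeInfo[ACC]])

  // Translation side: recover the original function, since user-defined methods
  // such as accumulate() exist only on the wrapped instance, not on the wrapper.
  def unwrap[T, ACC](f: AggregateFunction[T, ACC]): AggregateFunction[T, ACC] = f match {
    case w: AggFunctionWithAccType[T, ACC] => w.wrapped
    case other => other
  }
}

// The example from the issue, adapted to the stand-in base class.
final case class Accumulator(sum: Long, count: Long)

class MyAgg extends AggregateFunction[Long, Accumulator] {
  override def createAccumulator(): Accumulator = Accumulator(0, 0)
  override def getValue(acc: Accumulator): Long =
    if (acc.count == 0) 0L else acc.sum / acc.count
}
```

    With an implicit `TypeInfo[Accumulator]` in scope, `WrapperSketch.wrap(new MyAgg)`
    returns a function whose `getAccumulatorType` is defined, and
    `WrapperSketch.unwrap` hands back the original `MyAgg` instance. The
    registered function type stays unchanged, which is why this route would
    probably touch less code than adding the acc type to the internal
    representation.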


> Can not determine TypeInformation of ACC type of AggregateFunction when ACC 
> is a Scala case/tuple class
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-6888
>                 URL: https://issues.apache.org/jira/browse/FLINK-6888
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>             Fix For: 1.4.0
>
>
> Currently the {{ACC}} TypeInformation of 
> {{org.apache.flink.table.functions.AggregateFunction[T, ACC]}} is extracted 
> using {{TypeInformation.of(Class)}}. When {{ACC}} is a Scala case class or 
> tuple class, the TypeInformation falls back to {{GenericType}}, which 
> results in poor performance during state de/serialization. 
> I suggest extracting the ACC TypeInformation when 
> {{TableEnvironment.registerFunction()}} is called.
> Here is an example:
> {code}
> case class Accumulator(sum: Long, count: Long)
> class MyAgg extends AggregateFunction[Long, Accumulator] {
>   //Overloaded accumulate method
>   def accumulate(acc: Accumulator, value: Long): Unit = {
>   }
>   override def createAccumulator(): Accumulator = Accumulator(0, 0)
>   override def getValue(accumulator: Accumulator): Long = 1
> }
> {code}
> The {{Accumulator}} will be recognized as {{GenericType<Accumulator>}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
