Hi Yinhua,

Flink needs to know how to serialize and deserialize a type `T`. If you are using a type variable here, Flink can not derive the type information.

You need to override org.apache.flink.table.functions.AggregateFunction#getResultType and return type information that matches.

Regards,
Timo



Am 04.01.19 um 10:28 schrieb yinhua.dai:
Hi Chesnay,

Maybe you misunderstand my question.
I have below code:
public class MyMaxAggregation<T> extends AggregateFunction<T,
MyMaxAggregation.MyAccumulator> {
   @Override
   public MyAccumulator createAccumulator() {
     return new MyAccumulator();
   }

   @Override
   public T getValue(MyAccumulator accumulator) {
     return null;
   }

   static class MyAccumulator {
     double maxValue;
   }

}

But tableEnv.registerFunction("MYMAX", new MyMaxAggregation<Integer>());
will throw exception as below:
Exception in thread "main"
org.apache.flink.api.common.functions.InvalidTypesException: Type of
TypeVariable 'T' in 'class com.tr.apt.test.MyMaxAggregation' could not be
determined. This is most likely a type erasure problem. The type extraction
currently supports types with generic variables only in cases where all
variables in the return type can be deduced from the input type(s).
Otherwise the type has to be specified explicitly using type information.
        at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
        at
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
        at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
        at
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:762)
        at
org.apache.flink.table.api.java.StreamTableEnvironment.registerFunction(StreamTableEnvironment.scala:482)
        at com.tr.apt.test.StreamingJob.main(StreamingJob.java:52)




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply via email to