Hi Forideal,

Luckily, these problems will be a thing of the past in Flink 1.12, when UDAFs are updated to the new type system [1]. Lists will be natively supported, and registering custom Kryo serializers will work consistently as well.

Until then, another workaround is to override getAccumulatorType() and define the PojoTypeInfo of ConcatString manually, e.g. replacing the GenericTypeInfo<List> with a proper org.apache.flink.api.java.typeutils.ListTypeInfo.
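
A minimal sketch of this workaround, assuming Flink 1.11 on the classpath. Types.POJO and Types.LIST are used here as shorthand; they produce a PojoTypeInfo and a ListTypeInfo respectively. Making ConcatString a static nested class, the accumulate() body, and the comma separator in getValue() are assumptions for illustration and are not taken from the original code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;

public class RedConcat extends AggregateFunction<String, RedConcat.ConcatString> {

    // Assumption: declared static (with an implicit public no-arg constructor)
    // so that Flink's POJO serializer can instantiate it.
    public static class ConcatString {
        public List<String> list = new ArrayList<>();

        public void add(String value) {
            if (list != null && list.size() < 100) {
                list.add(value);
            }
        }
    }

    @Override
    public ConcatString createAccumulator() {
        return new ConcatString();
    }

    // Looked up reflectively by the Table API; the body is an assumption.
    public void accumulate(ConcatString acc, String value) {
        if (value != null) {
            acc.add(value);
        }
    }

    // Assumption: the original getValue() is not shown; a comma join is used here.
    @Override
    public String getValue(ConcatString acc) {
        return String.join(",", acc.list);
    }

    // Describe the accumulator explicitly so that the "list" field is handled
    // by a ListTypeInfo instead of falling back to a Kryo-based GenericTypeInfo.
    @Override
    public TypeInformation<ConcatString> getAccumulatorType() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("list", Types.LIST(Types.STRING));
        return Types.POJO(ConcatString.class, fields);
    }
}
```

With this override, the accumulator state should be serialized field by field, so no Kryo class registration is needed for the list.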

I hope this helps.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-15803

On 15.08.20 08:07, forideal wrote:
Hi Robert Metzger,

        I am very happy to share my code,


     public class ConcatString {
         public List<String> list = new ArrayList<>();

         public void add(String toString) {
             if (list != null) {
                 if (list.size() < 100) {
                     list.add(toString);
                 }
             }
         }
     }

    > Are you registering your custom types in the ExecutionConfig? (If so, it increases the chances of this error to happen)

    Let me describe my scenario. We have built a SQL platform based on Flink and hope to support user-defined UDFs/UDAFs, so that users only need to submit SQL and do not have to write any other code. As for the serialization problem, it does exist.

    I currently work around this problem as follows.
    First: this.env.getConfig().registerTypeWithKryoSerializer(ArrayList.class, org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer.class);
    Second: ConcatString extends ArrayList:


    public class ConcatString extends ArrayList<String> {

        @Override
        public boolean add(String toString) {
            if (this.size() < 1000) {
                super.add(toString);
                return true;
            }
            return false;
        }

        public List<String> getList() {
            return this;
        }

    }


Best forideal



At 2020-08-14 21:46:45, "Robert Metzger" <[email protected]> wrote:

    Hi Forideal,

    When using RocksDB, we need to serialize the data (to store it on
    disk), whereas when using the memory backend, the data (in this
    case RedConcat.ConcatString instances) is on the heap, thus we won't
    run into this issue.

    Are you registering your custom types in the ExecutionConfig? (If
    so, it increases the chances of this error to happen)

    Could you share the code of RedConcat.ConcatString as well?

    I would not be surprised if this is a bug in Flink. Using a UDAF
    with custom types is probably not a very common use case.

    Best,
    Robert



    On Fri, Aug 14, 2020 at 12:39 PM forideal <[email protected]> wrote:

        Hi,

        I wrote a UDAF following this article:
        https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#aggregation-functions
        With in-memory state, the task runs normally. However, when I
        chose RocksDB as the state backend, I encountered this error.
        Thank you for helping me look into this problem.

        The following is the error content:
        com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 87
        Serialization trace:
        list (com.red.data.platform.RedConcat$ConcatString)
             at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
             at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
             at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
             at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
             at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
             at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
             at org.apache.flink.util.InstantiationUtil.deserializeFromByt

        public class RedConcat extends AggregateFunction<String, RedConcat.ConcatString> {

            public class ConcatString {
                public List<String> list = new ArrayList<>();

                public void add(String toString) {
                    if (list != null) {
                        if (list.size() < 100) {
                            list.add(toString);
                        }
                    }
                }
            }

            @Override
            public boolean isDeterministic() {
                return false;
            }

            @Override
            public ConcatString createAccumulator() {
                return new ConcatString();
            }

            @Override
            public void open(FunctionContext context)
                    throws Exception {
            }



        Best forideal




