Sounds correct to me; it's not a POJO so it is treated as a generic type, which go through Kryo.

If you want to be doubly-sure that your serializer is in fact used, add a log statement to the read/write methods.

On 09/03/2022 08:10, guoliubi...@foxmail.com wrote:

Hi,

I have an entity class built by Google Flatbuf, to raise the performance, I have tried written a serializer class.

public class TransactionSerializer extends Serializer<Transaction> {

    @Override

    public void write(Kryo kryo, Output output, Transaction transaction) {

        ByteBuffer byteBuffer = transaction.getByteBuffer();

        byte[] generated = new byte[byteBuffer.remaining()];

byteBuffer.get(generated);

output.writeInt(generated.length, true);

output.writeBytes(generated);

    }

    @Override

    public Transaction read(Kryo kryo, Input input, Class<Transaction> aClass) {

        int size = input.readInt(true);

        byte[] barr = new byte[size];

input.readBytes(barr);

        ByteBuffer buf = ByteBuffer.wrap(barr);

        return Transaction.getRootAsTransaction(buf);

    }

}

And register it to the runtime env before calling env.execute.

env.registerTypeWithKryoSerializer(Transaction.class, TransactionSerializer.class);

env.getConfig().addDefaultKryoSerializer(Transaction.class, TransactionSerializer.class);

After that, I executed my job, however, I can see the log like this.

2021-11-18 10:35:07,624 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - class org.ff.dto.flatbuff.Transaction does not contain a setter for field bb_pos

2021-11-18 10:35:07,624 INFO org.apache.flink.api.java.typeutils.TypeExtractor [] - Class class org.ff.dto.flatbuff.Transaction cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

It looks like the serializer is not working at all. So what’s the problem about this? I register the serializer in a wrong way? Or do I need to move the class to somewhere to make the flink classloader recognize it?

Thanks in advance.

Reply via email to