Hi Yu,

I think when the serializer returns null, the following operator should still 
receive a record of null. A possible thought is that the following operator may 
couting the number of null records received and use a metric to publish the 
value to a monitor system, and the monitor system promethus, and the monitor 
system should be able to configure alert conditions.

If null has problems, a special indicating object instance may be created like 
NULL_TBASE, and the operator should be able to count the number of NULL_TBASE 
received.

Best,
 Yun



 ------------------Original Mail ------------------
Sender:Yu Yang <yuyan...@gmail.com>
Send Date:Mon Jun 1 06:37:35 2020
Recipients:user <user@flink.apache.org>
Subject:best practice for handling corrupted records / exceptions in custom 
DefaultKryoSerializer?

Hi all, 

To deal with corrupted messages that can leak into the data source once in a 
while, we implement a custom DefaultKryoSerializer class as below that catches 
exceptions. The custom serializer returns null in read(...) method when it 
encounters exception in reading. With this implementation, the serializer may 
silently drop records.  One concern is that it may drop too many records before 
we notice and take actions. What is the best practice to handle this?  

The serializer processes one record at a time. Will reading a corrupted record 
make the serialize fail to process the next valid record?

public class CustomTBaseSerializer extends TBaseSerializer {
 private static final Logger LOG = 
LoggerFactory.getLogger(CustomTBaseSerializer.class);
 @Override
     public void write(Kryo kryo, Output output, TBase tBase) {
         try {
             super.write(kryo, output, tBase);
        } catch (Throwable t) {
             LOG.error("Failed to write due to unexpected Throwable", t);
        }
    }

     @Override
     public TBase read(Kryo kryo, Input input, Class<TBase> tBaseClass) {
         try {
             return super.read(kryo, input, tBaseClass);
        } catch (Throwable t) {
             LOG.error("Failed to read from input due to unexpected Throwable", 
t);
             return null;
        }
     }
  }

Thank you!

Regards, 
-Yu

Reply via email to