Thanks for the suggestion, Yun!

On Sun, May 31, 2020 at 11:15 PM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Yu,
>
> I think when the serializer returns *null*, the following operator should
> still receive a null record. One possible approach is for the following
> operator to count the number of null records received and use a metric to
> publish the value to a monitoring system such as Prometheus, and the
> monitoring system should be able to configure alert conditions.
>
> If *null* causes problems, a special indicator object instance such as
> NULL_TBASE may be created, and the operator should be able to count the
> number of NULL_TBASE instances received.
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Yu Yang <yuyan...@gmail.com>
> *Send Date:*Mon Jun 1 06:37:35 2020
> *Recipients:*user <user@flink.apache.org>
> *Subject:*best practice for handling corrupted records / exceptions in
> custom DefaultKryoSerializer?
>
>> Hi all,
>>
>> To deal with corrupted messages that can leak into the data source once
>> in a while, we implemented a custom DefaultKryoSerializer class, shown
>> below, that catches exceptions. The custom serializer returns null in its
>> read(...) method when it encounters an exception while reading. With this
>> implementation, the serializer may silently drop records. One concern is
>> that it may drop too many records before we notice and take action. What
>> is the best practice to handle this?
>>
>> The serializer processes one record at a time. Will reading a corrupted
>> record make the serializer fail to process the next valid record?
>>
>> public class CustomTBaseSerializer extends TBaseSerializer {
>>     private static final Logger LOG =
>>         LoggerFactory.getLogger(CustomTBaseSerializer.class);
>>
>>     @Override
>>     public void write(Kryo kryo, Output output, TBase tBase) {
>>         try {
>>             super.write(kryo, output, tBase);
>>         } catch (Throwable t) {
>>             LOG.error("Failed to write due to unexpected Throwable", t);
>>         }
>>     }
>>
>>     @Override
>>     public TBase read(Kryo kryo, Input input, Class<TBase> tBaseClass) {
>>         try {
>>             return super.read(kryo, input, tBaseClass);
>>         } catch (Throwable t) {
>>             LOG.error("Failed to read from input due to unexpected Throwable", t);
>>             return null;
>>         }
>>     }
>> }
>>
>> Thank you!
>>
>> Regards,
>> -Yu
>>
>
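
For reference, here is a rough sketch of the sentinel-plus-counter idea from the quoted mail, written as plain Java with no Flink or Kryo dependency so it can be run on its own. All names in it (SentinelCountingSketch, CORRUPTED, tolerant, CorruptRecordFilter) are made up for illustration; CORRUPTED only stands in for the NULL_TBASE instance suggested above, and the toy decoder stands in for the real serializer. In an actual Flink job the count would more likely be exposed as a metric (for example a Counter obtained from getRuntimeContext().getMetricGroup() in a rich function) instead of an AtomicLong, so treat this as an outline under those assumptions and not as a finished implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class SentinelCountingSketch {

    /** Hypothetical marker standing in for NULL_TBASE; compared by identity. */
    static final Object CORRUPTED = new Object();

    /** Wraps a decoder so that a failure yields the sentinel instead of null. */
    static Function<byte[], Object> tolerant(Function<byte[], Object> decoder) {
        return bytes -> {
            try {
                return decoder.apply(bytes);
            } catch (RuntimeException e) {
                // The real serializer would also log the failure here.
                return CORRUPTED;
            }
        };
    }

    /** Downstream step: counts sentinels and lets only valid records through. */
    static final class CorruptRecordFilter {
        // Stand-in for a metric counter that a monitoring system could alert on.
        private final AtomicLong corrupted = new AtomicLong();

        boolean accept(Object record) {
            if (record == CORRUPTED) {
                corrupted.incrementAndGet();
                return false;
            }
            return true;
        }

        long corruptedCount() {
            return corrupted.get();
        }
    }

    public static void main(String[] args) {
        // Toy decoder (an assumption for the demo): empty payloads are "corrupted".
        Function<byte[], Object> decoder = tolerant(bytes -> {
            if (bytes.length == 0) {
                throw new IllegalArgumentException("empty payload");
            }
            return new String(bytes, StandardCharsets.UTF_8);
        });

        List<byte[]> input = Arrays.asList(
                "a".getBytes(StandardCharsets.UTF_8),
                new byte[0],
                "b".getBytes(StandardCharsets.UTF_8));

        CorruptRecordFilter filter = new CorruptRecordFilter();
        List<Object> valid = new ArrayList<>();
        for (byte[] payload : input) {
            Object record = decoder.apply(payload);
            if (filter.accept(record)) {
                valid.add(record);
            }
        }
        System.out.println("valid=" + valid.size()
                + " corrupted=" + filter.corruptedCount());
    }
}
```

Using a dedicated sentinel instead of null keeps "corrupted" distinguishable from a legitimately absent value, and each record is decoded independently in this sketch, so one bad payload does not affect the next one.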