Chirag Dewan created FLINK-33290:
------------------------------------

             Summary: Custom counters to capture encoding/decoding failure in flink
                 Key: FLINK-33290
                 URL: https://issues.apache.org/jira/browse/FLINK-33290
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Metrics
            Reporter: Chirag Dewan


I need to measure the difference between the records collected by a source and 
the records it emits, in order to capture deserialization failures.

Similarly, I need the difference between the records received by a sink and the 
records it sends out, in order to capture serialization failures.
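A minimal sketch of the requested metric, assuming a conversion step can be modeled as a plain java.util.function.Function. CountingTransform is a hypothetical helper, not a Flink class: it counts the records received and the records emitted, so their difference equals the records dropped by failed conversions. The same wrapper could sit on either side: bytes to records for a source, or records to bytes for a sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Hypothetical wrapper (not a Flink API) that counts records entering and
 * leaving a conversion step. received - emitted = records dropped on failure.
 */
class CountingTransform<I, O> {
    private final Function<I, O> transform;
    private final AtomicLong received = new AtomicLong();
    private final AtomicLong emitted = new AtomicLong();

    CountingTransform(Function<I, O> transform) {
        this.transform = transform;
    }

    /** Applies the transform; a failing record is dropped but still counted as received. */
    void apply(I input, List<O> out) {
        received.incrementAndGet();
        try {
            out.add(transform.apply(input));
            emitted.incrementAndGet();
        } catch (RuntimeException e) {
            // Record is dropped; it is reflected in difference()
        }
    }

    long received() { return received.get(); }

    long emitted() { return emitted.get(); }

    /** Records received but never emitted, i.e. conversion failures. */
    long difference() { return received.get() - emitted.get(); }

    public static void main(String[] args) {
        // Deserialization direction: text -> Integer, with two malformed inputs
        CountingTransform<String, Integer> decoder = new CountingTransform<>(Integer::parseInt);
        List<Integer> decoded = new ArrayList<>();
        for (String raw : List.of("1", "2", "oops", "4", "x5")) {
            decoder.apply(raw, decoded);
        }
        // prints "[1, 2, 4] failures=2"
        System.out.println(decoded + " failures=" + decoder.difference());
    }
}
```

Because Function cannot throw checked exceptions, a deserializer that throws IOException would have to wrap it (for example in UncheckedIOException) before being passed in. Inside Flink, the two AtomicLong fields would presumably be replaced by Counters obtained from the operator's metric group.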


For example, if deserialization fails while reading records from Kafka, I want 
to expose as a metric the difference between the records fetched from the Kafka 
broker and the records emitted by the Kafka source operator after 
deserialization.

However, I don't think Flink provides any such metric.

For the Kafka source, I can work around this to get the metric:

I can override the open method of KafkaRecordDeserializationSchema and register 
a metric that reports decoding failures:

    @Override
    public void open(DeserializationSchema.InitializationContext context)
            throws Exception {
        // Register a gauge that reports the current number of decoding failures
        context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return decodingFailures;
            }
        });
    }

and then increment that counter whenever deserialization fails:

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) {
        try {
            // deserialize record.value() and emit the result via out.collect(...)
        } catch (IOException | MMException e) {
            logger.error(String.format(
                    "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
                    record.partition(), record.topic(), record.offset(), e.toString()));
            // Count the failure; the gauge registered in open() reports this value
            decodingFailures++;
        }
    }

*However, there is no way to implement this in FileSource, because 
SimpleStreamFormat and its Reader do not provide access to a context in any way.*
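One possible workaround, sketched here under assumptions: keep the file format trivial (emit each line as raw text, for example with TextLineInputFormat) and move decoding into a downstream operator, whose runtime context does expose a metric group (for instance via getRuntimeContext().getMetricGroup() in a RichFlatMapFunction). The stdlib-only sketch below models just that split; LineDecodingStage and parseEvent are hypothetical names, the "key=value" record format is invented for illustration, and the AtomicLong stands in for a Flink Counter.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: the reader only splits lines; decoding happens downstream. */
class LineDecodingStage {
    // Stand-in for a Flink Counter that a downstream operator would register in open()
    final AtomicLong decodingFailures = new AtomicLong();

    /** Stage 1: read raw lines without decoding, so no record is lost in the reader. */
    static List<String> readLines(BufferedReader reader) throws IOException {
        List<String> lines = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            lines.add(line);
        }
        return lines;
    }

    /** Hypothetical decoder: expects "key=value" with an integer value. */
    static int parseEvent(String line) {
        String[] parts = line.split("=", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("missing '=' in: " + line);
        }
        // NumberFormatException is a subclass of IllegalArgumentException
        return Integer.parseInt(parts[1].trim());
    }

    /** Stage 2: decode each line and count failures instead of dropping them silently. */
    List<Integer> decodeAll(List<String> lines) {
        List<Integer> out = new ArrayList<>();
        for (String line : lines) {
            try {
                out.add(parseEvent(line));
            } catch (IllegalArgumentException e) {
                decodingFailures.incrementAndGet();
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader reader = new BufferedReader(new StringReader("a=1\nbroken\nb=x\nc=3\n"));
        LineDecodingStage stage = new LineDecodingStage();
        List<Integer> values = stage.decodeAll(readLines(reader));
        // prints "[1, 3] decodingFailures=2"
        System.out.println(values + " decodingFailures=" + stage.decodingFailures.get());
    }
}
```

The trade-off is that undecoded lines travel through the pipeline as strings for one extra hop, but the failure count then lives in an operator that can report it.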

*Similarly, I could not find any way to expose serialization-related metrics in 
any of the sinks.*

Would it be possible to provide a way to implement custom counters for 
serialization/deserialization failures in all Flink connectors (sources and 
sinks)?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
