Hi,

I need the difference between the number of records collected by a source
and the number of records it emits.
For example, if deserialization fails while reading records from Kafka, I
want to expose, as a metric, the difference between the records collected
from the Kafka broker and the records emitted by the Kafka source operator
after deserialization.
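
To make the number I am after concrete, here is a minimal sketch in plain
Java with no Flink types. CountingDecoder and droppedRecords are
hypothetical names used only for illustration. It assumes the decoder
signals a bad record by throwing a RuntimeException.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical helper, not a Flink class: wraps any decoder and counts
// records received versus records successfully emitted.
class CountingDecoder<T> {
    private final Function<byte[], T> decoder; // assumed to throw on bad input
    private final AtomicLong received = new AtomicLong();
    private final AtomicLong emitted = new AtomicLong();

    CountingDecoder(Function<byte[], T> decoder) {
        this.decoder = decoder;
    }

    // Decodes one raw record; appends to out only when decoding succeeds.
    void decode(byte[] raw, List<T> out) {
        received.incrementAndGet();
        try {
            out.add(decoder.apply(raw));
            emitted.incrementAndGet();
        } catch (RuntimeException e) {
            // Dropped record: counted as received but not as emitted.
        }
    }

    // The value I would like to expose as a metric: received minus emitted.
    long droppedRecords() {
        return received.get() - emitted.get();
    }
}
```

Nothing in this sketch depends on the source type. What is missing is a
place to register droppedRecords() as a metric.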

But I think Flink does not provide any such metric.

For the Kafka source I have a workaround: I can override the open method
of KafkaRecordDeserializationSchema and register a metric there that
reports decoding failures:

    @Override
    public void open(DeserializationSchema.InitializationContext context)
            throws Exception {
        // Expose the current decoding failure count as a gauge.
        context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>()
        {
            @Override
            public Integer getValue()
            {
                return decodingFailures;
            }
        });
    }

and then increment that counter during deserialization, as below:

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record,
            Collector<T> out)
    {
        try
        {
            //deserialize
        }
        catch (IOException | MMException e)
        {
            // Log where the bad record came from, then count the failure.
            logger.error(String.format(
                "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
                record.partition(), record.topic(), record.offset(), e.toString()));

            decodingFailures++;
        }
    }

*However, I cannot do the same in FileSource, because SimpleStreamFormat
and its Reader do not provide access to a Context in any way.*

*Is there a way to get this metric for both the File and Kafka sources, or
a generic way that does not depend on which source is being used?*

Regards,
Prateek Kohli
