Hi Prateek,

I agree: ideally, the reader would expose the context so that you can record
metrics about deserialization. One option is to defer deserialization to a
downstream operator, such as a RichMapFunction, which has access to the
RuntimeContext.
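Here is a minimal sketch of that idea. It assumes the source is changed to emit raw payloads (byte[] records from Kafka, or lines read by FileSource) and that decoding moves downstream. CountingDecoder, its Decoder interface and the onFailure callback are hypothetical names. The code is deliberately Flink-free so the counting logic can run on its own.

In a job, you would likely wrap it in a RichFlatMapFunction rather than a RichMapFunction, because a flatMap can simply drop failed records instead of emitting null. The wiring would be:

- create a Counter in open() via getRuntimeContext().getMetricGroup().counter("decodingFailures");
- pass counter::inc as onFailure.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, Flink-free core of a "deserialize downstream" operator.
// The source emits raw payloads; this class decodes them and counts how many
// arrived and how many failed, so received - emitted == failed for any source.
class CountingDecoder<T> {

    // The user's own decoding logic; may throw on malformed input.
    @FunctionalInterface
    interface Decoder<R> {
        R decode(byte[] payload) throws Exception;
    }

    private final Decoder<T> decoder;
    // Invoked once per failure; in a Flink operator this could be counter::inc.
    private final Runnable onFailure;
    private long received;
    private long failed;

    CountingDecoder(Decoder<T> decoder, Runnable onFailure) {
        this.decoder = decoder;
        this.onFailure = onFailure;
    }

    // Returns the decoded record, or Optional.empty() if decoding failed.
    // An exception or a null result both count as a decoding failure.
    Optional<T> process(byte[] payload) {
        received++;
        T value;
        try {
            value = decoder.decode(payload);
        } catch (Exception e) {
            value = null;
        }
        if (value == null) {
            failed++;
            onFailure.run();
            return Optional.empty();
        }
        return Optional.of(value);
    }

    long received() { return received; }

    long failed() { return failed; }

    public static void main(String[] args) {
        // Toy decoder for illustration: each payload must be a UTF-8 decimal integer.
        CountingDecoder<Integer> decoder = new CountingDecoder<>(
                p -> Integer.parseInt(new String(p, StandardCharsets.UTF_8)),
                () -> { /* e.g. failureCounter.inc() in Flink */ });
        List<Integer> emitted = new ArrayList<>();
        for (String s : new String[] {"1", "oops", "3"}) {
            decoder.process(s.getBytes(StandardCharsets.UTF_8)).ifPresent(emitted::add);
        }
        System.out.println(emitted + " received=" + decoder.received()
                + " failed=" + decoder.failed());
    }
}
```

Running main prints [1, 3] received=3 failed=1. Because the counting happens after the source, the same metric works whether the records come from KafkaSource or FileSource.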

Best,
Mason

On Mon, Oct 9, 2023 at 12:42 PM Prateek Kohli <prateekkohli2...@gmail.com>
wrote:

> Hi,
>
> I need to measure the difference between the records collected by the
> source and the records it emits.
> For example, if deserialization fails while reading records from Kafka, I
> want to expose, as a metric, the difference between the records collected
> from the Kafka broker and the records emitted by the Kafka source operator
> after deserialization.
>
> However, I don't think Flink provides any such metric.
>
> For the Kafka source, I have a workaround to get this metric.
>
> I can override the open method of KafkaRecordDeserializationSchema and
> register a metric there that reports decoding failures:
>
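>     @Override
>     public void open(DeserializationSchema.InitializationContext context) throws Exception
>     {
>         // Expose decodingFailures (an int field of this class, incremented
>         // in deserialize()) as a gauge on the source's metric group.
>         context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>()
>         {
>             @Override
>             public Integer getValue()
>             {
>                 return decodingFailures;
>             }
>         });
>     }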
>
> I can then increment that counter whenever deserialization fails:
>
>     @Override
>     public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
>     {
>         try
>         {
>             // deserialize record.value() and emit the result via out.collect(...)
>         }
>         catch (IOException | MMException e)
>         {
>             logger.error(String.format(
>                 "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
>                 record.partition(), record.topic(), record.offset(), e.toString()));
>
>             decodingFailures++;
>         }
>     }
>
> *However, there is no way to implement this with FileSource, because
> SimpleStreamFormat and its Reader do not provide any access to a context.*
>
> *Is there a way to get this metric for both the File and Kafka sources, or
> a generic way that works regardless of which source is used?*
>
> Regards,
> Prateek Kohli
>
