Thanks a lot for your response, Mason.

Is there any FLIP planned to expose the context in the reader in the future?

Regards,
Prateek Kohli

On Wed, Oct 11, 2023 at 6:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Prateek,
>
> I agree: the reader should ideally expose the context so that it can record
> metrics about deserialization. One option is to defer deserialization to
> another operator, say a RichMapFunction, which has access to the
> RuntimeContext.
>
> Best,
> Mason
>
> On Mon, Oct 9, 2023 at 12:42 PM Prateek Kohli <prateekkohli2...@gmail.com> wrote:
>
>> Hi,
>>
>> I need to measure the difference between the records read by the source
>> and the records it emits.
>> For example, if deserialization fails while reading records from Kafka, I
>> want to expose, as a metric, the difference between the records fetched
>> from the Kafka broker and the records emitted by the Kafka source operator
>> after deserialization.
>>
>> But I don't think Flink provides any such metric.
>>
>> For the KafkaSource, I have a workaround to get this metric.
>>
>> I can override the open method of KafkaRecordDeserializationSchema and
>> register a metric there that reports decoding failures:
>>
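>>     // Incremented in deserialize(); read by the metric reporter via the gauge
>>     private volatile int decodingFailures;
>>
>>     @Override
>>     public void open(DeserializationSchema.InitializationContext context)
>>             throws Exception {
>>         // Register a gauge reporting how many records failed to decode
>>         context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>()
>>         {
>>             @Override
>>             public Integer getValue()
>>             {
>>                 return decodingFailures;
>>             }
>>         });
>>     }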
>>
>> and then increment that counter in deserialize whenever decoding fails:
>>
>>     @Override
>>     public void deserialize(ConsumerRecord<byte[], byte[]> record,
>>             Collector<T> out)
>>     {
>>         try
>>         {
>>             // deserialize record.value() and emit the result via out.collect(...)
>>         }
>>         catch (IOException | MMException e)
>>         {
>>             // Log where the bad record came from instead of failing the job
>>             logger.error(String.format(
>>                 "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
>>                 record.partition(), record.topic(), record.offset(), e.toString()));
>>
>>             decodingFailures++;
>>         }
>>     }
>>
>> *But there is no way to implement this in FileSource, because
>> SimpleStreamFormat and its Reader do not provide access to any context.*
>>
>> *Is there any way to get this metric for both the FileSource and the
>> KafkaSource, or a generic way that works regardless of which source is
>> being used?*
>>
>> Regards,
>> Prateek Kohli
>>
>

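A rough sketch of Mason's suggestion, written as plain Java so it does not depend on any particular source. `CountingDecoder` and everything in it are hypothetical names, not Flink or Kafka APIs: the class wraps whatever decode step you already have, drops records that fail to decode instead of failing the job, and counts records read and records emitted, whose difference is the metric asked about in the original question.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper, not part of Flink: decodes raw records and tracks how many
 * were read versus emitted, independently of the source that produced them.
 */
final class CountingDecoder<IN, OUT> {

    /** One decode step that may fail; stands in for your real deserializer. */
    @FunctionalInterface
    interface Decoder<IN, OUT> {
        OUT decode(IN raw) throws Exception;
    }

    private final Decoder<IN, OUT> decoder;
    // Atomics, because a metric reporter may read these from another thread.
    private final AtomicLong recordsRead = new AtomicLong();
    private final AtomicLong recordsEmitted = new AtomicLong();

    CountingDecoder(Decoder<IN, OUT> decoder) {
        this.decoder = decoder;
    }

    /** Returns the decoded record, or empty if decoding threw or returned null. */
    Optional<OUT> tryDecode(IN raw) {
        recordsRead.incrementAndGet();
        try {
            // Optional.of rejects null, so a null result is counted as a failure too.
            Optional<OUT> result = Optional.of(decoder.decode(raw));
            recordsEmitted.incrementAndGet();
            return result;
        } catch (Exception e) {
            // Swallow the failure so one bad record does not fail the job;
            // log it here if you need the partition/offset or file details.
            return Optional.empty();
        }
    }

    long recordsRead() {
        return recordsRead.get();
    }

    long recordsEmitted() {
        return recordsEmitted.get();
    }

    /** Records read minus records emitted, i.e. records dropped by decoding. */
    long droppedRecords() {
        // Read "emitted" first so concurrent updates can never make this negative.
        long emitted = recordsEmitted.get();
        return recordsRead.get() - emitted;
    }
}
```

To use it in Flink (an untested assumption about the wiring), the source would pass raw records through undecoded, and a RichFlatMapFunction right after it would create the decoder in open(), register it with `getRuntimeContext().getMetricGroup().gauge("droppedRecords", (Gauge<Long>) decoder::droppedRecords)` (the metric name is hypothetical), and call `decoder.tryDecode(value).ifPresent(out::collect)` in flatMap(). A flat map is used here instead of a RichMapFunction because a map must emit exactly one record per input, whereas a record that fails to decode should emit nothing. Since the counting happens downstream of the source, the same metric works for FileSource and KafkaSource alike.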