Thanks a lot for your response, Mason. Is there any FLIP planned to expose the context in the Reader in the future?

Regards,
Prateek Kohli

On Wed, Oct 11, 2023 at 6:03 AM Mason Chen <mas.chen6...@gmail.com> wrote:

> Hi Prateek,
>
> I agree, the reader should ideally expose the context to record metrics
> about deserialization. One option is to defer deserialization to another
> operator, say a RichMapFunction that has access to the RuntimeContext.
>
> Best,
> Mason
>
> On Mon, Oct 9, 2023 at 12:42 PM Prateek Kohli <prateekkohli2...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I need to get the difference between the records collected by the
>> source and the records it emits.
>> For example, if deserialization fails while reading records from Kafka, I
>> want to expose, as a metric, the difference between the records collected
>> from the Kafka broker and the records emitted by the Kafka operator after
>> deserialization.
>>
>> But I think Flink does not provide any such metric.
>>
>> For the Kafka source I have a workaround to get this metric:
>>
>> I can override the open method of KafkaRecordDeserializationSchema
>> and register a metric that shows decoding failures:
>>
>> @Override
>> public void open(DeserializationSchema.InitializationContext context)
>>         throws Exception {
>>     context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
>>         @Override
>>         public Integer getValue() {
>>             return decodingFailures;
>>         }
>>     });
>> }
>>
>> and at deserialization time I can increment that count as below:
>>
>> @Override
>> public void deserialize(ConsumerRecord<byte[], byte[]> record,
>>         Collector<T> out) {
>>     try {
>>         // deserialize
>>     } catch (IOException | MMException e) {
>>         logger.error(String.format(
>>                 "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
>>                 record.partition(), record.topic(), record.offset(), e.toString()));
>>         decodingFailures++;
>>     }
>> }
>>
>> *But there is no way to implement this in FileSource, as
>> SimpleStreamFormat/Reader does not provide access to the context in any way.*
>>
>> *Is there any way I can get this metric with both the File and Kafka
>> sources, or any generic way that is agnostic to which source is used?*
>>
>> Regards,
>> Prateek Kohli
>>
>
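Mason's suggestion (let the source emit raw records and decode them in a downstream rich function) keeps the failure counting independent of the connector, so it works the same way for FileSource and KafkaSource. The plain-Java sketch below shows only the counting logic, under stated assumptions: `DecodeFailureCounter`, its nested `Decoder` interface, and `process` are hypothetical names, not Flink APIs. In a real job, `process` would presumably be called from the `flatMap` of a `RichFlatMapFunction` (a flat map rather than a map, so that undecodable records can be dropped), and the counts would be exposed through `getRuntimeContext().getMetricGroup()`, for example as a gauge over `failures()`.

```java
import java.util.function.Consumer;

/**
 * Hypothetical helper (not a Flink API): wraps a decoder and counts how many
 * raw records were received, how many were emitted, and how many were dropped
 * because decoding failed. Not thread-safe; assumes one instance per subtask.
 */
class DecodeFailureCounter<IN, OUT> {

    /** Hypothetical decoding contract; throws on malformed input. */
    @FunctionalInterface
    interface Decoder<I, O> {
        O decode(I raw) throws Exception;
    }

    private final Decoder<IN, OUT> decoder;
    private long received;
    private long emitted;

    DecodeFailureCounter(Decoder<IN, OUT> decoder) {
        this.decoder = decoder;
    }

    /** Decodes one raw record; forwards it on success, drops and counts it on failure. */
    void process(IN raw, Consumer<OUT> out) {
        received++;
        OUT value;
        try {
            value = decoder.decode(raw);
        } catch (Exception e) {
            // Dropped record: it is reflected in failures() = received - emitted.
            return;
        }
        out.accept(value);
        emitted++;
    }

    long received() { return received; }

    long emitted() { return emitted; }

    /** The "collected minus emitted" difference asked about in this thread. */
    long failures() { return received - emitted; }
}
```

For example, feeding the strings "1", "x", and "3" through a counter built with `Integer::parseInt` emits 1 and 3, and `failures()` reports 1. Because the difference is derived from two counts rather than tracked separately, it stays consistent with what the operator actually forwarded.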