Hi Prateek,

I agree; the reader should ideally expose the context so that metrics about
deserialization can be recorded. One option is to defer deserialization to
another operator, say a RichMapFunction, which has access to the
RuntimeContext.
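For illustration, a rough sketch of that pattern (I'm using a
RichFlatMapFunction here so that records which fail to decode can simply be
dropped; MyEvent and MyDeserializer are placeholders for your record type and
decoding logic, and it assumes the source is configured to forward the raw
byte[] payload downstream):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

public class DeserializingMapper extends RichFlatMapFunction<byte[], MyEvent> {

    private transient Counter decodingFailures;

    @Override
    public void open(Configuration parameters) throws Exception {
        // The RuntimeContext exposes the operator's metric group, so the
        // same counter works no matter which source produced the bytes.
        decodingFailures = getRuntimeContext()
                .getMetricGroup()
                .counter("decodingFailures");
    }

    @Override
    public void flatMap(byte[] raw, Collector<MyEvent> out) {
        try {
            // Placeholder decode step for your own deserialization logic.
            out.collect(MyDeserializer.deserialize(raw));
        } catch (Exception e) {
            // Count the failure instead of emitting anything downstream.
            decodingFailures.inc();
        }
    }
}

Since the counter lives in the mapper rather than in a source-specific
deserialization schema, the same operator can sit behind a KafkaSource or a
FileSource that emits raw records.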
Best,
Mason

On Mon, Oct 9, 2023 at 12:42 PM Prateek Kohli <prateekkohli2...@gmail.com> wrote:

> Hi,
>
> I need to get the difference between the records which are collected by the
> source and the records which are emitted.
> For example, if deserialization fails while reading records from Kafka, I
> want to expose the difference between the records collected from the Kafka
> broker and the records emitted by the Kafka operator after deserialization
> as a metric.
>
> But I think Flink does not provide any such metric.
>
> In the Kafka source I can use a workaround to get this metric:
>
> I can override the open method of KafkaRecordDeserializationSchema and
> register a metric that reports decoding failures:
>
> @Override
> public void open(DeserializationSchema.InitializationContext context)
>         throws Exception
> {
>     context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>()
>     {
>         @Override
>         public Integer getValue()
>         {
>             return decodingFailures;
>         }
>     });
> }
>
> and at the time of deserialization I can increment that counter as below:
>
> @Override
> public void deserialize(ConsumerRecord<byte[], byte[]> record,
>         Collector<T> out)
> {
>     try
>     {
>         // deserialize
>     }
>     catch (IOException | MMException e)
>     {
>         logger.error(String.format(
>                 "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
>                 partition, topic, offset, e.toString()));
>
>         decodingFailures++;
>     }
> }
>
> *But there is no such way to implement this in FileSource, as
> SimpleStreamFormat/Reader does not provide access to the Context in any way.*
>
> *Is there any way I can get this metric in both the File & Kafka collectors,
> or any generic way to get this that is agnostic to which collector is being
> used?*
>
> Regards,
> Prateek Kohli