Hi, I need to get the difference between the records collected by the source and the records it emits. For example, if deserialization fails while reading records from Kafka, I want to expose, as a metric, the difference between the records collected from the Kafka broker and the records emitted by the Kafka operator after deserialization.
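To make the metric concrete, here is a minimal plain-Java sketch of the bookkeeping I have in mind, independent of any Flink connector. The names RecordDecoder and CountingDecoder are hypothetical, not Flink APIs: every raw record handed to the wrapper counts as "collected", every successfully decoded record counts as "emitted", and the difference is the number of decoding failures.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical decoder interface: turns one raw input (e.g. a Kafka value or a file line) into a record.
interface RecordDecoder<I, O> {
    O decode(I raw) throws Exception;
}

// Hypothetical, source-agnostic wrapper that tracks collected vs. emitted records.
class CountingDecoder<I, O> {
    private final RecordDecoder<I, O> decoder;
    private final AtomicLong collected = new AtomicLong();
    private final AtomicLong emitted = new AtomicLong();

    CountingDecoder(RecordDecoder<I, O> decoder) {
        this.decoder = decoder;
    }

    // Decodes one raw record; on success passes it downstream, on failure only counts it.
    void process(I raw, Consumer<O> out) {
        collected.incrementAndGet();
        O value;
        try {
            value = decoder.decode(raw);
        } catch (Exception e) {
            return; // dropped record: counted as collected but never emitted
        }
        out.accept(value);
        emitted.incrementAndGet();
    }

    long collected() { return collected.get(); }

    long emitted() { return emitted.get(); }

    // The metric I am after: records collected minus records emitted.
    long difference() { return collected.get() - emitted.get(); }
}
```

For example, wrapping Integer::parseInt and feeding it "1", "x", "3" and "" gives collected = 4, emitted = 2 and difference = 2. Whether such a wrapper can be hooked into both sources (for instance by moving decoding out of the connector into a downstream rich function and registering these values as gauges via getRuntimeContext().getMetricGroup()) is an assumption I have not verified.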
But I don't think Flink provides such a metric out of the box. For the Kafka source I have a workaround: I can override the open method of KafkaRecordDeserializationSchema and register a metric that reports decoding failures:

    // Incremented every time a record fails to decode
    private int decodingFailures = 0;

    @Override
    public void open(DeserializationSchema.InitializationContext context) throws Exception {
        // Expose the current failure count as a gauge named "decodingFailures"
        context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                return decodingFailures;
            }
        });
    }

and then increment the counter whenever deserialization fails:

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) {
        try {
            // deserialize
        } catch (IOException | MMException e) {
            logger.error(String.format(
                    "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
                    record.partition(), record.topic(), record.offset(), e.toString()));
            decodingFailures++;
        }
    }

*However, there is no way to implement this in FileSource, because SimpleStreamFormat/Reader does not provide access to the context in any way.*

*Is there any way to get this metric for both the File and Kafka sources, or a generic way that is agnostic to which source is being used?*

Regards,
Prateek Kohli