Hi,
I need to measure the difference between the records collected by a source
and the records it emits.
For example, if deserialization fails while reading records from Kafka, I
want to expose, as a metric, the difference between the records collected
from the Kafka broker and the records emitted by the Kafka source operator
after deserialization.
But I don't think Flink provides any such metric.
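To make the intended metric concrete, here is a plain-Java sketch of the bookkeeping only. It uses no Flink APIs; the class and method names are hypothetical, and it assumes each collected record yields at most one emitted record (a real deserializer can emit several per input via its Collector).

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical tracker for "records collected minus records emitted". */
public class RecordDropTracker {
    // AtomicLong so a metrics-reporting thread could read while the task thread writes
    private final AtomicLong collected = new AtomicLong();
    private final AtomicLong emitted = new AtomicLong();

    /** Call once for every raw record read from the external system. */
    public void onCollected() { collected.incrementAndGet(); }

    /** Call once for every record successfully emitted downstream. */
    public void onEmitted() { emitted.incrementAndGet(); }

    /**
     * The metric of interest: records read but never emitted.
     * The two reads are not one atomic snapshot, so under concurrency the
     * value can be transiently off by in-flight records.
     */
    public long dropped() { return collected.get() - emitted.get(); }

    public static void main(String[] args) {
        RecordDropTracker tracker = new RecordDropTracker();
        String[] raw = {"1", "2", "x", "4"};
        for (String r : raw) {
            tracker.onCollected();
            try {
                Integer.parseInt(r);  // stand-in for real deserialization
                tracker.onEmitted();
            } catch (NumberFormatException e) {
                // decoding failed: the record was collected but not emitted
            }
        }
        System.out.println(tracker.dropped());  // prints 1
    }
}
```

Tracking both counters, rather than only failures, would also catch records dropped for reasons other than a decoding exception.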
For the Kafka source I have a workaround: I can override the open() method
of KafkaRecordDeserializationSchema and register a metric there that reports
decoding failures:
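// Written by the task thread, read by the metric reporter thread
private volatile int decodingFailures = 0;

@Override
public void open(DeserializationSchema.InitializationContext context)
        throws Exception {
    // Register a gauge (org.apache.flink.metrics.Gauge) reporting the
    // running count of records that failed to decode
    context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return decodingFailures;
        }
    });
}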
Then, during deserialization, I increment that counter:
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record,
        Collector<T> out) {
    try {
        // deserialize record.value() and emit the result via out.collect(...)
    } catch (IOException | MMException e) {
        logger.error(String.format(
                "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
                record.partition(), record.topic(), record.offset(), e));
        decodingFailures++;
    }
}
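The same catch-and-count logic does not depend on Kafka itself. Below is a stdlib-only sketch that factors it into a wrapper around any decode function. `FailureCountingDecoder` and `Decoder` are hypothetical names, `java.util.function.Consumer` stands in for Flink's `Collector`, and registering the count as a Flink metric is not shown.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical source-agnostic wrapper: decodes raw bytes, counts and skips failures. */
public class FailureCountingDecoder<T> {

    /** A decode step that may throw, e.g. the body of deserialize(). */
    @FunctionalInterface
    public interface Decoder<R> {
        R decode(byte[] raw) throws Exception;
    }

    private final Decoder<T> decoder;
    // AtomicLong so a metrics thread could read it while the task thread writes
    private final AtomicLong decodingFailures = new AtomicLong();

    public FailureCountingDecoder(Decoder<T> decoder) {
        this.decoder = decoder;
    }

    /** Decodes one record; on success passes it to out, on failure counts and drops it. */
    public void decodeInto(byte[] raw, Consumer<T> out) {
        T value;
        try {
            value = decoder.decode(raw);
        } catch (Exception e) {
            decodingFailures.incrementAndGet();
            return;  // skip the bad record
        }
        // Emit outside the try so downstream exceptions are not counted as decode failures
        out.accept(value);
    }

    public long getDecodingFailures() {
        return decodingFailures.get();
    }

    public static void main(String[] args) {
        FailureCountingDecoder<Integer> d = new FailureCountingDecoder<>(
                raw -> Integer.parseInt(new String(raw, StandardCharsets.UTF_8)));
        List<Integer> emitted = new ArrayList<>();
        for (String s : new String[] {"10", "oops", "30"}) {
            d.decodeInto(s.getBytes(StandardCharsets.UTF_8), emitted::add);
        }
        System.out.println(emitted + " failures=" + d.getDecodingFailures());  // prints [10, 30] failures=1
    }
}
```

Keeping `out.accept` outside the `try` is a deliberate choice: only exceptions from decoding itself increment the failure count.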
*But there is no way to do the same in FileSource, because
SimpleStreamFormat and its Reader do not provide access to a context (and
therefore to a metric group) in any way.*
*Is there any way to get this metric for both the File and Kafka sources, or
a generic way that works regardless of which source is being used?*
Regards,
Prateek Kohli