Chirag Dewan created FLINK-33290:
------------------------------------

Summary: Custom counters to capture encoding/decoding failure in flink
Key: FLINK-33290
URL: https://issues.apache.org/jira/browse/FLINK-33290
Project: Flink
Issue Type: Improvement
Components: Runtime / Metrics
Reporter: Chirag Dewan
I need to expose the difference between the number of records a source collects and the number it emits, to capture deserialization failures. Likewise, I need the difference between the number of records a sink receives and the number it sends out, to capture serialization failures.

For example, if deserialization fails while reading records from Kafka, I want a metric showing the difference between the records fetched from the Kafka broker and the records the Kafka source operator emits after deserialization. As far as I can tell, Flink does not provide any such metric.

The Kafka source allows a workaround. I can override the open method of KafkaRecordDeserializationSchema and register a metric that reports decoding failures:

```java
// Number of records that could not be decoded; reported by the gauge below.
private int decodingFailures = 0;

@Override
public void open(DeserializationSchema.InitializationContext context) throws Exception {
    // Register a gauge on the source's metric group that reports the current failure count.
    context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>() {
        @Override
        public Integer getValue() {
            return decodingFailures;
        }
    });
}
```

Then, during deserialization, I increment that count whenever decoding fails:

```java
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) {
    try {
        // deserialize
    } catch (IOException | MMException e) {
        // Log where the bad record came from, then count it as a decoding failure.
        logger.error(String.format(
                "Error received while decoding, in partition [%d] for topic [%s] at offset [%d]: %s",
                record.partition(), record.topic(), record.offset(), e.toString()));
        decodingFailures++;
    }
}
```

*However, there is no way to implement this in FileSource, because SimpleStreamFormat/Reader does not provide access to the Context in any way.*

*Similarly, I did not find any way to expose serialization-related metrics in any of the sinks.*

Would it be possible to provide a way to implement custom counters for serialization/deserialization failures in all Flink connectors (sources and sinks)?
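The requested metric comes down to bookkeeping around each decode (or encode) step. A connector would count the records that enter the step and the records that leave it, and then report the difference. The following is a minimal, dependency-free Java sketch of that bookkeeping. It is not a Flink API: the class `CountingTransform` and its methods are hypothetical. In a real connector, the `AtomicLong` fields would presumably be replaced by counters registered on the operator's metric group. That is the hook `SimpleStreamFormat`/`Reader` in `FileSource` does not currently expose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Hypothetical sketch (no Flink dependency): wraps a decode or encode function and
 * counts records received, records emitted, and their difference (failures).
 */
final class CountingTransform<I, O> {
    private final Function<I, O> transform;
    private final AtomicLong received = new AtomicLong();
    private final AtomicLong emitted = new AtomicLong();

    CountingTransform(Function<I, O> transform) {
        this.transform = transform;
    }

    /** Applies the transform; a record whose transform throws is dropped, not emitted. */
    void process(I input, List<O> out) {
        received.incrementAndGet();
        try {
            out.add(transform.apply(input));
            emitted.incrementAndGet();
        } catch (RuntimeException e) {
            // Dropped record: it shows up as received - emitted. A real job would also
            // log record metadata here (topic/partition/offset, or file path).
        }
    }

    long received() { return received.get(); }

    long emitted() { return emitted.get(); }

    /** The "collected minus emitted" value the issue asks connectors to expose. */
    long failures() { return received.get() - emitted.get(); }

    public static void main(String[] args) {
        // Decoding strings to integers stands in for deserializing raw bytes.
        CountingTransform<String, Integer> decoder = new CountingTransform<>(Integer::parseInt);
        List<Integer> out = new ArrayList<>();
        for (String raw : new String[] {"1", "2", "not-a-number", "4"}) {
            decoder.process(raw, out);
        }
        // Prints: received=4 emitted=3 failures=1
        System.out.println("received=" + decoder.received()
                + " emitted=" + decoder.emitted()
                + " failures=" + decoder.failures());
    }
}
```

The Kafka workaround increments a failure count inside the catch block. This sketch instead counts both sides and derives the difference, which matches the "collected minus emitted" definition above. The same wrapper applies unchanged to the sink side, where the function would serialize values instead of decoding them.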