Yu Yang created FLINK-18017:
-------------------------------
Summary: Improve Kafka connector to handle record deserialization
exceptions and report related metrics
Key: FLINK-18017
URL: https://issues.apache.org/jira/browse/FLINK-18017
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.9.1
Reporter: Yu Yang
Corrupted messages can get into the message pipeline for various reasons. When
a Flink deserializer fails to deserialize a message and throws an exception
because the message is corrupted, the Flink application is blocked until we
update the deserializer to handle the exception.
Currently, messages are deserialized as follows in
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java:
{code:java}
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
	final T value = deserializer.deserialize(record);

	if (deserializer.isEndOfStream(value)) {
		// end of stream signaled
		running = false;
		break;
	}

	// emit the actual record. this also updates offset state atomically
	// and deals with timestamps and watermark generation
	emitRecord(value, partition, record.offset(), record);
}
{code}
The Flink Kafka connector needs to catch exceptions thrown during
deserialization and expose related metrics.
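A minimal sketch of one possible shape for the fix is below. The counter name
{{numCorruptedRecords}}, the {{corruptedRecordsCounter}} field, and the
skip-on-failure policy are illustrative assumptions, not existing connector
behavior; {{metricGroup}} stands for the consumer's {{MetricGroup}}.
{code:java}
// Sketch only: counter name and skip-on-failure policy are assumptions.
// The counter would be registered once during fetcher setup, e.g.:
//   Counter corruptedRecordsCounter = metricGroup.counter("numCorruptedRecords");
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
	final T value;
	try {
		value = deserializer.deserialize(record);
	} catch (Exception e) {
		// count the corrupted record and skip it instead of failing the job
		corruptedRecordsCounter.inc();
		continue;
	}

	if (deserializer.isEndOfStream(value)) {
		// end of stream signaled
		running = false;
		break;
	}

	// emit the actual record. this also updates offset state atomically
	// and deals with timestamps and watermark generation
	emitRecord(value, partition, record.offset(), record);
}
{code}
Whether to skip, fail, or route the raw bytes somewhere for inspection is a
policy decision; the sketch simply counts and skips.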