Salva created FLINK-29480:
-----------------------------

             Summary: Skip invalid messages when writing
                 Key: FLINK-29480
                 URL: https://issues.apache.org/jira/browse/FLINK-29480
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
            Reporter: Salva


As reported in [1], it seems that it's not possible to skip messages when writing. More specifically, if there is an error while serializing a message, there is no option to skip it, and the Flink job enters a crash loop. In particular, the `write` method of the `KafkaWriter` [3] looks like this:
{code:java}
@Override
public void write(IN element, Context context) throws IOException {
  final ProducerRecord<byte[], byte[]> record =
      recordSerializer.serialize(element, ...);
  currentProducer.send(record, deliveryCallback); // line 200: NPE if record is null
  numRecordsSendCounter.inc();
} {code}
So, if you make your `serialize` method return `null`, this is what you get at runtime:
{code:java}
java.lang.NullPointerException
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:906)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:885)
    at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:200)
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:158)
{code}
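
For context, a serialization schema that maps failures to `null` might look like the following sketch (the class name, topic, and validity check are hypothetical, just to illustrate the pattern):
{code:java}
import java.nio.charset.StandardCharsets;

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical schema: the "events" topic and the emptiness check are made
// up for illustration; any serialization failure is mapped to null.
public class SkipOnFailureSerializationSchema
        implements KafkaRecordSerializationSchema<String> {

    private static final String TOPIC = "events"; // hypothetical topic

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, KafkaSinkContext context, Long timestamp) {
        if (element == null || element.isEmpty()) {
            // Invalid message: return null, hoping the writer skips it
            // (today this crashes the job with the NPE above).
            return null;
        }
        return new ProducerRecord<>(TOPIC, element.getBytes(StandardCharsets.UTF_8));
    }
} {code}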
 

What I propose is to modify the `KafkaWriter` like this:
{code:java}
@Override
public void write(IN element, Context context) throws IOException {
  final ProducerRecord<byte[], byte[]> record =
      recordSerializer.serialize(element, ...);
  if (record != null) { // skip null records
    currentProducer.send(record, deliveryCallback);
    numRecordsSendCounter.inc();
  }
} {code}
This would at least give users a chance to skip invalid messages and move on to the next one.

Obviously, one could prepend the sink with a flatMap operator that filters out invalid messages (see the sketch after this list), but:
 # It looks weird that one has to prepend an operator for "making sure" that the serializer will not fail. Wouldn't it be simpler to just let the serializer fail, and thus avoid a pre-check that basically repeats the same logic? I might be missing something, and maybe there is an intrinsic reason why it's assumed that the serializer should not fail in the first place...
 # It's such a simple change (apparently).
 # It brings consistency/symmetry with the reading case [5, 6].
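
For reference, the pre-filtering operator mentioned above could be a plain `flatMap` along these lines (a sketch; the class name and validity check are made up and would have to mirror whatever the serializer considers invalid):
{code:java}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical pre-filter that duplicates the serializer's validity check
// in front of the sink, e.g.:
//   stream.flatMap(new DropInvalidMessages()).sinkTo(kafkaSink);
public class DropInvalidMessages implements FlatMapFunction<String, String> {

    @Override
    public void flatMap(String value, Collector<String> out) {
        if (value != null && !value.isEmpty()) { // same check the serializer does
            out.collect(value);
        }
    }
} {code}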

To expand on point 3, consider `KafkaDeserializationSchema`:
{code:java}
// To be overridden by the user
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;

default void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
    throws Exception {
  T deserialized = deserialize(message);
  if (deserialized != null) { // <-- skip invalid messages
    out.collect(deserialized);
  }
} {code}
one can simply return `null` in the overridden `deserialize` method in order to skip any message that fails to be deserialized. Similarly, if one uses the `KafkaRecordDeserializationSchema` interface instead:
{code:java}
void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out)
    throws IOException; {code}
then it's possible not to invoke `out.collect(...)` on invalid records in order to skip them. To me, it looks strange that the same flexibility is not given in the writing case.
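
For illustration, a `KafkaRecordDeserializationSchema` that skips corrupted records in this way might look like the following sketch (class name and decoding logic are hypothetical):
{code:java}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical reader-side schema: records that fail to deserialize are
// skipped by simply not calling out.collect(...).
public class SkipCorruptedRecordsSchema
        implements KafkaRecordDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out)
            throws IOException {
        try {
            out.collect(new String(record.value(), StandardCharsets.UTF_8));
        } catch (RuntimeException e) {
            // Corrupted record: emit nothing, i.e. skip the message.
        }
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
} {code}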

 

*References*

[1] [https://lists.apache.org/thread/ykmy4llovrrrzlvz0ng3x5yosskjg70h]

[2] [https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#port-kafkasink-to-new-unified-sink-api-flip-143]

[3] [KafkaWriter.java|https://github.com/apache/flink/blob/f0fe85a50920da2b7d7da815db0a924940522e28/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L196]

[5] [https://stackoverflow.com/questions/55538736/how-to-skip-corrupted-messages-in-flink]

[6] [https://lists.apache.org/thread/pllv5dqq27xkvj6p3lj91vcz409pw38d]

 


