Seweryn Habdank-Wojewodzki created KAFKA-5779:
-------------------------------------------------

             Summary: Single message may exploit application based on KStream
                 Key: KAFKA-5779
                 URL: https://issues.apache.org/jira/browse/KAFKA-5779
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.11.0.0, 0.10.2.1
            Reporter: Seweryn Habdank-Wojewodzki
            Priority: Critical


The context: in Kafka Streams I am *defining* a simple KStream processing pipeline:

{code}
stringInput // line 54 of the SingleTopicStreamer class
    .filter( streamFilter::passOrFilterMessages )
    .map( normalizer )
    .to( outTopicName );
{code}

For some reason I received a bad message (I am still investigating what the problem is), and my service crashed with a FATAL error:

{code}
2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8, offset = 15, CreateTime = -1, serialized key size = -1, serialized value size = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+.
Use a different TimestampExtractor to process this data.; [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63), org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61), org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46), org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85), org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117), org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464), org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650), org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556), org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)] in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
{code}

The cause suggested in the message (an old producer) does not apply here: we are using Kafka 0.10.2.1 and 0.11.0.0, and the topics were created with these versions of Kafka. The sending application is the .NET client from Confluent.

This exception is problematic because it appears to be thrown in the scope of stream initialization, but in fact it happened during processing. Wrapping the stringInput statement in try {} catch {} therefore does not help: the stream was defined correctly, and a single message sent later brought down the whole application.

In my opinion KStream should be robust enough to catch such exceptions and protect the application from dying because of a single corrupted message, especially when the only problem is that no timestamp is embedded. In that case one can patch the message with the current timestamp without loss of overall performance. I would expect Kafka Streams to handle this.
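
As an illustration of the timestamp patching described above, here is a minimal sketch of the fallback rule on its own. The class name FallbackTimestamp and its resolve method are hypothetical and not part of Kafka; the sketch deliberately has no Kafka dependency so it can run stand-alone, and it assumes that "invalid" means a negative timestamp, as in the log message.

```java
import java.util.function.LongSupplier;

/**
 * Stand-alone sketch of the "patch with current time" rule.
 * FallbackTimestamp is a hypothetical name; it has no Kafka dependency.
 * In a Streams application the same check would sit inside the
 * extract(...) method of a custom TimestampExtractor.
 */
public class FallbackTimestamp {

    // Injected clock so the rule can be exercised deterministically.
    private final LongSupplier clock;

    public FallbackTimestamp(LongSupplier clock) {
        this.clock = clock;
    }

    /**
     * Returns the embedded record timestamp when it is valid (>= 0),
     * otherwise the current clock reading.
     */
    public long resolve(long embeddedTimestamp) {
        return embeddedTimestamp >= 0 ? embeddedTimestamp : clock.getAsLong();
    }

    public static void main(String[] args) {
        FallbackTimestamp fallback = new FallbackTimestamp(System::currentTimeMillis);
        // CreateTime = -1, as in the failing record: replaced by wall-clock time.
        System.out.println(fallback.resolve(-1L));
        // A valid (non-negative) embedded timestamp is passed through unchanged.
        System.out.println(fallback.resolve(1503414460619L));
    }
}
```

Kafka Streams 0.10.2 and later also ship ready-made extractors with a similar intent in org.apache.kafka.streams.processor, such as LogAndSkipOnInvalidTimestamp, UsePreviousTimeOnInvalidTimestamp and WallclockTimestampExtractor. They are selected through the timestamp extractor setting in StreamsConfig; the config key differs between 0.10.x and 0.11, so it is worth checking the documentation for the version in use.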

I will continue to investigate what is wrong with the message, but this is quite hard for me, as the failure happens inside Kafka Streams in combination with the .NET producer. I have already verified that the problem does not occur when I read these same messages with an old-fashioned Kafka Consumer :-).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)