Hello everyone, I have another problem, unrelated to the previous one, so I'm creating a new thread. We have a streams application that reads from a topic, reads from and writes to three different state stores, and writes its output to another topic, all with the exactly-once processing guarantee enabled.
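For context, here is a minimal sketch of roughly how the application is wired, including the custom timestamp extractor mentioned below. The class names, input/output topic names, serdes, the JSON field name and the regex parsing are placeholders rather than our real code; only the application id, the exactly-once setting and the store behind the order-unique-emitter changelog reflect our setup:

import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.Stores;

public class OrdersPipelineSketch {

    // Placeholder extractor: pulls the event time from a "timestamp" field in the JSON value.
    public static class JsonTimestampExtractor implements TimestampExtractor {
        private static final Pattern TS_FIELD = Pattern.compile("\"timestamp\"\\s*:\\s*(\\d+)");

        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
            // Naive regex parsing for illustration only; the real app deserializes the JSON properly.
            final Matcher m = TS_FIELD.matcher(String.valueOf(record.value()));
            return m.find() ? Long.parseLong(m.group(1)) : partitionTime;
        }
    }

    public static Properties config() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-orders-pipeline");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        // Exactly-once processing guarantee
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // Event time comes from the message payload, not from the record timestamp
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
        return props;
    }

    public static Topology topology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // One of the three stores; its changelog is the
        // my-app-orders-pipeline-order-unique-emitter-changelog topic from the error below.
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("order-unique-emitter"),
                Serdes.Long(), Serdes.ByteArray()));
        // The real pipeline runs transformers over the input that read/write the three
        // stores; that processing is omitted here and the stream is just forwarded.
        builder.<Long, byte[]>stream("orders-input").to("orders-output");
        return builder.build();
    }
}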
Due to a bug in the logic of the producer that sends messages to the topic our stream reads from, the same data was being sent over and over with increasing timestamps: for example, data with timestamps going from today at 12:00 up to 15:00, then starting again at 12:00, and so on, creating a cycle in the timestamps. In our stream configuration we use a timestamp extractor that reads the timestamp from the message JSON. Since this buggy producer was sending data in batches every few minutes, our stream was dying every few minutes with this error:

[kafka-producer-network-thread | my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1-0_0-producer] ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl - task [0_0] Error sending record to topic my-app-orders-pipeline-order-unique-emitter-changelog due to This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.; No more records will be sent and no more offsets will be recorded for this task. Enable TRACE logging to view failed record key and value.
org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.

[my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1] ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks - stream-thread [my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1] Failed to commit stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key 59 value [B@16e88816 timestamp 1568779124999) to topic my-app-orders-pipeline-order-unique-emitter-changelog due to org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:138)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:201)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1318)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:720)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:706)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:663)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:585)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:73)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:789)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:331)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at java.lang.Thread.run(Thread.java:748)

Sometimes the error referred to the changelog of the first store in our stream, sometimes to another one. I've found this issue https://issues.apache.org/jira/browse/KAFKA-6817 but it doesn't seem to be related because:
- the timestamps used are from today, so well within the 4-week retention set for the changelog topic (which also uses compact as its cleanup policy, not delete)
- the stream kept processing data for that key correctly over and over, so I don't think the producer ID expired multiple times within a few minutes

Any help is very appreciated

-- Alessandro Tagliapietra