After enabling exactly-once processing on a Kafka Streams application, the following error appears in the logs:
ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key 222222 value some-value timestamp 1519200902670) to topic exactly-once-test-topic-v2 due to This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException

We've reproduced the issue with a minimal test case that moves messages from a source topic to another topic without any transformation (a sketch of the test app is included in the P.S. below). The source topic contains millions of messages produced over several months. The KafkaStreams object is created with the following StreamsConfig:

- StreamsConfig.PROCESSING_GUARANTEE_CONFIG = "exactly_once"
- StreamsConfig.APPLICATION_ID_CONFIG = "Some app id"
- StreamsConfig.NUM_STREAM_THREADS_CONFIG = 1
- ProducerConfig.BATCH_SIZE_CONFIG = 102400

The app processes some messages before the exception occurs. After restarting the app, processing continues for a while before the exception occurs again.

Context information:

- We're running a 5-node Kafka 1.1.0 cluster with 5 ZooKeeper nodes.
- There are multiple instances of the app running.

1. Has anyone seen this problem before, or can anyone give us hints about what might be causing this behaviour?
2. How should problems like this be handled?

Sincerely,
Odin
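
P.S. For reference, here is a minimal sketch of the test app, assuming Kafka Streams 1.1.0 with the default byte-array serdes; the bootstrap servers and application id are placeholders for our real values:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceCopyApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholders for our real application id and brokers.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "some-app-id");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
        // The settings from the list above.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); // "exactly_once"
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 102400); // forwarded to the internal producer

        // Copy every record from the source topic to the destination topic
        // without any transformation; the default serdes pass the bytes through.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("exactly-once-test-topic").to("exactly-once-test-topic-v2");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close cleanly on shutdown so the transactional producer is closed too.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}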