After enabling exactly-once processing on a Kafka Streams application, the 
following error appears in the logs:

ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer
due to the following error:

org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort
sending since an error caught with a previous record (key 222222 value
some-value timestamp 1519200902670) to topic exactly-once-test-topic-v2
due to This exception is raised by the broker if it could not locate
the producer metadata associated with the producerId in question. This
could happen if, for instance, the producer's records were deleted
because their retention time had elapsed. Once the last records of the
producerId are removed, the producer's metadata is removed from the
broker, and future appends by the producer will return this exception.
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
  at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
  at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
  at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
  at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
  at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
  at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596)
  at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
  at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
  at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
  at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
  at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
  at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException

We've reproduced the issue with a minimal test case that simply copies messages 
from a source topic to a destination topic without any transformation (a sketch 
of the test case follows the config list below). The source topic contains 
millions of messages produced over several months. The KafkaStreams object is 
created with the following StreamsConfig:

- StreamsConfig.PROCESSING_GUARANTEE_CONFIG = "exactly_once"
- StreamsConfig.APPLICATION_ID_CONFIG = "Some app id"
- StreamsConfig.NUM_STREAM_THREADS_CONFIG = 1
- ProducerConfig.BATCH_SIZE_CONFIG = 102400
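
For reference, here is a minimal sketch of what that test case looks like (not 
our exact code): the source topic name, bootstrap servers, application id, and 
String serdes are placeholder assumptions; the destination topic name is the 
one from the error above, StreamsConfig.EXACTLY_ONCE resolves to "exactly_once", 
and producerPrefix() applies the batch size to the embedded producer.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceCopyApp {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholders: application id and broker list are not our real values.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-copy-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
        // The config values listed above.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 102400);
        // Assumption: String keys and values for the sketch.
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Topology: copy records from the source topic to the destination topic,
        // no transformation.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("exactly-once-test-source-topic")
               .to("exactly-once-test-topic-v2");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the Streams instance cleanly on shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}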

The app processes some messages before the exception occurs. After a restart, 
processing continues for a while before the exception is thrown again.

Context information:

- We're running a 5-node Kafka 1.1.0 cluster with 5 ZooKeeper nodes.
- There are multiple instances of the app running.

1. Has anyone seen this problem before, or can anyone give us any hints about 
what might be causing this behaviour?

2. How should problems like this be handled?

Sincerely
Odin
