Hello everyone,

I have another problem, unrelated to the previous one, so I'm creating a
separate thread.
We have a streams application that reads from a topic, reads from and writes
to 3 different stores, and writes the output to another topic, all with the
exactly-once processing guarantee enabled.
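For context, the relevant part of the configuration looks roughly like this
(simplified sketch; the broker address is a placeholder, not our actual value):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Simplified sketch of the relevant part of our Streams configuration;
// bootstrap servers value is a placeholder.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-orders-pipeline");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
// exactly-once processing guarantee, as mentioned above
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);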

Due to a bug in the logic of the producer that sends messages to the topic our
stream reads from, it kept resending the same data over and over with
increasing timestamps.
For example, it would send data with timestamps starting at today at 12:00 up
to 15:00, then start again from 12:00, and so on, creating a cycle in the
timestamps.

In our streams configuration we're using a custom timestamp extractor that
reads the timestamp from the message's JSON payload.
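Roughly, it looks like this (illustrative sketch; the real field name and
serde are different):

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Illustrative sketch: extracts the event time from a "timestamp" field in
// the JSON payload, assuming the value serde deserializes to a Jackson JsonNode.
public class JsonTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        Object value = record.value();
        if (value instanceof JsonNode) {
            JsonNode ts = ((JsonNode) value).get("timestamp");
            if (ts != null) {
                return ts.asLong();
            }
        }
        // fall back to the record's own timestamp if the field is missing
        return record.timestamp();
    }
}

and it's registered via default.timestamp.extractor, e.g.:

props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);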

Since this buggy producer was sending data in batches every few minutes, our
stream was dying every few minutes with this error:

[kafka-producer-network-thread |
my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1-0_0-producer]
ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
task [0_0] Error sending record to topic
my-app-orders-pipeline-order-unique-emitter-changelog due to This exception
is raised by the broker if it could not locate the producer metadata
associated with the producerId in question. This could happen if, for
instance, the producer's records were deleted because their retention time
had elapsed. Once the last records of the producerId are removed, the
producer's metadata is removed from the broker, and future appends by the
producer will return this exception.; No more records will be sent and no
more offsets will be recorded for this task. Enable TRACE logging to view
failed record key and value.
org.apache.kafka.common.errors.UnknownProducerIdException: This exception
is raised by the broker if it could not locate the producer metadata
associated with the producerId in question. This could happen if, for
instance, the producer's records were deleted because their retention time
had elapsed. Once the last records of the producerId are removed, the
producer's metadata is removed from the broker, and future appends by the
producer will return this exception.
[my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1]
ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
stream-thread
[my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1]
Failed to commit stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending
since an error caught with a previous record (key 59 value [B@16e88816
timestamp 1568779124999) to topic
my-app-orders-pipeline-order-unique-emitter-changelog due to
org.apache.kafka.common.errors.UnknownProducerIdException: This exception
is raised by the broker if it could not locate the producer metadata
associated with the producerId in question. This could happen if, for
instance, the producer's records were deleted because their retention time
had elapsed. Once the last records of the producerId are removed, the
producer's metadata is removed from the broker, and future appends by the
producer will return this exception.
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:138)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:201)
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1318)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:720)
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:706)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:663)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:585)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:73)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:789)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:331)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
at java.lang.Thread.run(Thread.java:748)

Sometimes the error referred to the first store's changelog topic in our
stream, sometimes to another one.
I've found https://issues.apache.org/jira/browse/KAFKA-6817 but it doesn't
seem to be related because:
 - the timestamps used are from today, so well within the 4-week retention set
for the changelog topic, which also uses compact as its cleanup policy, not
delete (I checked the topic config roughly as in the sketch after this list)
 - the stream correctly processed data for that key over and over, so I don't
think the producer ID could have expired multiple times within a few minutes
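For reference, this is roughly how the changelog topic config can be checked
with the AdminClient (the broker address is a placeholder; the topic name is
the one from the logs above):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

// Sketch: describe the changelog topic to confirm cleanup.policy and
// retention.ms; bootstrap server is a placeholder.
public class DescribeChangelogConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC,
                "my-app-orders-pipeline-order-unique-emitter-changelog");
            Config config = admin.describeConfigs(Collections.singleton(topic))
                .all().get().get(topic);
            System.out.println(config.get("cleanup.policy"));
            System.out.println(config.get("retention.ms"));
        }
    }
}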

Any help is much appreciated.

--
Alessandro Tagliapietra
