Maybe related to https://issues.apache.org/jira/browse/KAFKA-7190
It's fixed in the upcoming 2.4 release.

-Matthias

On 9/25/19 3:08 PM, Alessandro Tagliapietra wrote:
> Hello everyone,
>
> I have another problem, unrelated to the previous one, so I'm creating
> another thread.
> We have a stream application that reads from a topic, reads from and
> writes to three different stores, and writes the output to another topic,
> all with the exactly-once processing guarantee enabled.
>
> Due to a bug in the logic of the producer that sends messages to the
> topic our stream reads from, it kept sending the same data over and over
> with increasing timestamps.
> For example, it would send data with timestamps from today at 12:00 up to
> 15:00, then start again at 12:00, over and over, creating a cycle in the
> timestamps.
>
> In our stream configuration we're using a timestamp extractor that reads
> the timestamp from the message JSON.
>
> Since this buggy producer was sending data in batches every few minutes,
> our stream was dying every few minutes with this error:
>
> [kafka-producer-network-thread |
> my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1-0_0-producer]
> ERROR org.apache.kafka.streams.processor.internals.RecordCollectorImpl -
> task [0_0] Error sending record to topic
> my-app-orders-pipeline-order-unique-emitter-changelog due to This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.; No more records will be sent and no
> more offsets will be recorded for this task. Enable TRACE logging to view
> failed record key and value.
> org.apache.kafka.common.errors.UnknownProducerIdException: This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.
> [my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1]
> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
> stream-thread
> [my-app-orders-pipeline-d7426da3-93be-4ac0-84f9-a6850d776d8b-StreamThread-1]
> Failed to commit stream task 0_0 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending
> since an error caught with a previous record (key 59 value [B@16e88816
> timestamp 1568779124999) to topic
> my-app-orders-pipeline-order-unique-emitter-changelog due to
> org.apache.kafka.common.errors.UnknownProducerIdException: This exception
> is raised by the broker if it could not locate the producer metadata
> associated with the producerId in question. This could happen if, for
> instance, the producer's records were deleted because their retention time
> had elapsed. Once the last records of the producerId are removed, the
> producer's metadata is removed from the broker, and future appends by the
> producer will return this exception.
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:138)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:50)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:201)
>     at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1318)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
>     at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
>     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:720)
>     at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:706)
>     at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:663)
>     at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:585)
>     at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:73)
>     at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:789)
>     at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>     at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:561)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:553)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:331)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
>     at java.lang.Thread.run(Thread.java:748)
>
> Sometimes the error referred to the changelog of the first store in our
> stream, sometimes to another one.
> I found https://issues.apache.org/jira/browse/KAFKA-6817, but it doesn't
> seem to be related because:
> - the timestamps used are from today, so well within the four-week
>   retention set for the changelog topic (which also uses the compact
>   cleanup policy, not delete)
> - the stream correctly processed data for that key over and over, so I
>   don't think the producer ID expired multiple times within a few minutes
>
> Any help is very appreciated.
>
> --
> Alessandro Tagliapietra
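For reference, the setup described in the message above (a custom timestamp extractor that reads the event time from the message JSON, with exactly-once processing enabled) can be expressed roughly as the following minimal sketch. The class name, the "timestamp" field name, and the raw byte-array value format are assumptions for illustration, not the poster's actual code:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.processor.TimestampExtractor;

    import java.util.Properties;

    // Reads the event time from a "timestamp" field in the JSON payload.
    public class JsonTimestampExtractor implements TimestampExtractor {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        @Override
        public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
            try {
                // assumes the record value arrives as raw JSON bytes
                final JsonNode json = MAPPER.readTree((byte[]) record.value());
                return json.get("timestamp").asLong();
            } catch (final Exception e) {
                // fall back to the record's own timestamp if the payload cannot be parsed
                return record.timestamp();
            }
        }

        // The Streams settings relevant to the message above; application id,
        // bootstrap servers, etc. are omitted.
        static Properties streamsConfig() {
            final Properties props = new Properties();
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
            return props;
        }
    }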