Hi,

I recently switched my Kafka Streams 1.0.0 app to use exactly_once
semantics and since them my cluster has been stuck in rebalancing.  Is
there an explanation as to what is going on, or how I can resolve it?

I saw a similar issue discussed on the mailing list, but I don't know if a
ticket was created or there was a resolution.

http://mail-archives.apache.org/mod_mbox/kafka-users/201711.mbox/%3CCAKkfnUY0C311Yq%3Drt8kyna4cyucV8HbgWpiYj%3DfnYMt9%2BAb8Mw%40mail.gmail.com%3E

This is the exception I'm seeing:
2018-02-08 17:09:20,763 ERR [kafka-producer-network-thread |
dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4-0_414-producer]
Sender [Producer
clientId=dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4-0_414-producer,
transactionalId=dp-app-devel-dbress-0_414] Aborting producer batches due to
fatal error
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted
an operation with an old epoch. Either there is a newer producer with the
same transactionalId, or the producer's transaction has been expired by the
broker.
2018-02-08 17:09:20,764 ERR
[dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4]
ProcessorStateManager task [0_414] Failed to flush state store
summarykey-to-summary:
org.apache.kafka.common.KafkaException: Cannot perform send because at
least one previous transactional or idempotent request has failed with
errors.
at
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:278)
at
org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:263)
at
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:804)
at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
at
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
at
org.apache.kafka.streams.state.internals.StoreChangeFlushingLogger.flush(StoreChangeFlushingLogger.java:92)
at
org.apache.kafka.streams.state.internals.InMemoryKeyValueFlushingLoggedStore.flush(InMemoryKeyValueFlushingLoggedStore.java:139)
at
org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:126)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
at
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
at
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
at
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
at
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
at
org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
at
org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
at
org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
at
org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
at
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)

Reply via email to