Hi, I recently switched my Kafka Streams 1.0.0 app to use exactly_once semantics, and since then my cluster has been stuck rebalancing. Is there an explanation of what is going on, or how I can resolve it?
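For context, this is roughly how exactly-once was switched on (a minimal sketch, not my full configuration; the application id is taken from the client id visible in the logs below, and the bootstrap servers value is a placeholder):

```java
import java.util.Properties;

// Minimal Kafka Streams configuration sketch. Only the
// processing.guarantee line differs from the previous at_least_once
// setup; the other values here are placeholders for illustration.
public class EosConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "dp-app-devel-dbress");   // from the client id in the logs
        props.put("bootstrap.servers", "broker:9092");        // placeholder
        props.put("processing.guarantee", "exactly_once");    // the setting that was switched on
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("processing.guarantee"));
    }
}
```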
I saw a similar issue discussed on the mailing list, but I don't know whether a ticket was created or a resolution was reached: http://mail-archives.apache.org/mod_mbox/kafka-users/201711.mbox/%3CCAKkfnUY0C311Yq%3Drt8kyna4cyucV8HbgWpiYj%3DfnYMt9%2BAb8Mw%40mail.gmail.com%3E

This is the exception I'm seeing:

2018-02-08 17:09:20,763 ERR [kafka-producer-network-thread | dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4-0_414-producer] Sender [Producer clientId=dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4-0_414-producer, transactionalId=dp-app-devel-dbress-0_414] Aborting producer batches due to fatal error
org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
2018-02-08 17:09:20,764 ERR [dp-app-devel-dbress-a92a93de-c1b1-4655-b487-0e9f3f3f3409-StreamThread-4] ProcessorStateManager task [0_414] Failed to flush state store summarykey-to-summary:
org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors.
    at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:278)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:263)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:804)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
    at org.apache.kafka.streams.state.internals.StoreChangeFlushingLogger.flush(StoreChangeFlushingLogger.java:92)
    at org.apache.kafka.streams.state.internals.InMemoryKeyValueFlushingLoggedStore.flush(InMemoryKeyValueFlushingLoggedStore.java:139)
    at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:268)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:126)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)