During a rebalance triggered by kafka-coordinator-heartbeat-thread losing connection to ‘Group coordinator’, we noticed that a stream thread is shutting down when it catches a ProducerFencedExcpetion while flushing the state store. This also causes the stream-state on that node to be stuck in ‘REBALANCING’ state, even though the partitions have been rebalanced to other threads across nodes. During rebalance there seems to be a race condition between flushState on one node vs ProducerId creation on other node for the same partition. If the flushState is slower than the other it encounters ProducerFencedException.
It would be nice if Kafka streams can handle this exception gracefully and not shutdown the thread, so that we don’t end up with uneven number of threads across nodes. Can you guys please suggest any work arounds for this situation? Thanks Aravind [2018-09-26T15:39:54,662Z] [ERROR] [kafka-producer-network-thread | upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer] [o.a.k.c.producer.internals.Sender] [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer, transactionalId=upsert-merger-stream-oa43-1-0_55] Aborting producer batches due to fatal error [2018-09-26T15:39:54,665Z] [ERROR] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.i.ProcessorStateManager] task [0_55] Failed to flush state store upsert-store: org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort sending since an error caught with a previous record (key de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp 1537976392104) to topic upsert-merger-stream-oa43-1-upsert-store-changelog due to Cannot perform send because at least one previous transactional or idempotent request has failed with errors. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:197) at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:105) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38) at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123) at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267) at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:243) at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:195) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:339) at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:312) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:422) at org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:182) at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:147) at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242) at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719) Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors. at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:279) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:264) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:828) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:153) ... 34 common frames omitted Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. [2018-09-26T15:39:54,665Z] [ERROR] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.i.AssignedStreamsTasks] stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] Suspending stream task 0_55 failed due to the following error: org.apache.kafka.streams.errors.ProcessorStateException: task [0_55] Failed to flush state store upsert-store at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:246) at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:195) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:339) at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:312) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:422) at org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:182) at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:147) at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242) at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719) Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort sending since an error caught with a previous record (key de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp 1537976392104) to topic upsert-merger-stream-oa43-1-upsert-store-changelog due to Cannot perform send because at least one previous transactional or idempotent request has failed with errors. at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:197) at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69) at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:105) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38) at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123) at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267) at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149) at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:243) ... 21 common frames omitted Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because at least one previous transactional or idempotent request has failed with errors. at org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:279) at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:264) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:828) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:153) ... 34 common frames omitted Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. [2018-09-26T15:39:54,666Z] [WARN ] [kafka-producer-network-thread | upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer] [o.a.k.s.p.i.RecordCollectorImpl] task [0_55] Error sending record (key 5a07e803-9323-457c-8e1d-29c0b0bc0fa9 value [0, 0, 1, 102, 22, -119, 88, -32, 0, 0, 0, -95, 97, 112, 112, 100, 121, 110, 97, 109, 105, 99, 115, 95, 101, 101, 101, 49, 100, 52, 102, 56, 45, 54, 55, 97, 50, 45, 52, 57, 56, 101, 45, 97, 55, 50, 53, 45, 52, 55, 101, 50, 57, 56, 48, 51, 56, 50, 50, 101, -62, -79, 98, 105, 122, 95, 116, 120, 110, 95, 118, 49, -62, -79, 57, 54, 53, 49, 48, 100, 50, 52, -62, -79, 48, -62, -79, 115, 101, 103, 109, 101, 110, 116, 115, -62, -79, 114, 101, 113, 117, 101, 115, 116, 71, 85, 73, 68, -62, -79, 53, 97, 48, 55, 101, 56, 48, 51, 45, 57, 51, 50, 51, 45, 52, 53, 55, 99, 45, 56, 101, 49, 100, 45, 50, 57, 99, 48, 98, 48, 98, 99, 48, 102, 97, 57, -62, -79, 102, 97, 108, 115, 101, -62, -79, 116, 114, 117, 101, -62, -79, 102, 97, 108, 115, 101, -62, -79, 102, 97, 108, 115, 101, 123, 10, 34, 101, 118, 101, 110, 116, 84, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45, 48, 57, 45, 50, 54, 84, 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43, 48, 48, 48, 48, 34, 44, 10, 34, 115, 101, 103, 109, 101, 110, 116, 115, 34, 58, 91, 123, 10, 34, 115, 101, 103, 109, 101, 110, 116, 84, 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45, 48, 57, 45, 50, 54, 84, 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43, 48, 48, 48, 48, 34, 44, 10, 34, 117, 115, 101, 114, 68, 97, 116, 97, 34, 58, 123, 10, 34, 65, 99, 99, 111, 117, 110, 116, 78, 97, 109, 101, 34, 58, 34, 112, 114, 105, 118, 97, 108, 105, 97, 118, 101, 110, 116, 97, 100, 105, 114, 101, 99, 116, 97, 115, 97, 45, 97, 48, 113, 51, 52, 48, 48, 48, 48, 48, 100, 106, 115, 118, 52, 101, 97, 100, 34, 44, 34, 69, 115, 73, 110, 100, 101, 120, 67, 108, 117, 115, 116, 101, 114, 34, 58, 34, 112, 114, 100, 52, 52, 45, 49, 50, 34, 10, 125, 44, 10, 34, 116, 105, 101, 114, 34, 58, 34, 97, 112, 105, 95, 112, 114, 100, 52, 53, 50, 34, 44, 34, 116, 105, 101, 114, 73, 100, 34, 58, 34, 51, 57, 57, 52, 57, 50, 55, 49, 34, 44, 34, 110, 111, 100, 101, 34, 58, 34, 97, 112, 105, 118, 49, 45, 48, 49, 52, 45, 112, 114, 100, 52, 53, 50, 34, 44, 34, 110, 111, 100, 101, 73, 100, 34, 58, 34, 49, 57, 52, 54, 57, 54, 53, 55, 48, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 69, 120, 112, 101, 114, 105, 101, 110, 99, 101, 34, 58, 34, 78, 79, 82, 77, 65, 76, 34, 44, 34, 101, 110, 116, 114, 121, 80, 111, 105, 110, 116, 34, 58, 116, 114, 117, 101, 44, 34, 117, 110, 105, 113, 117, 101, 83, 101, 103, 109, 101, 110, 116, 73, 100, 34, 58, 53, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 84, 105, 109, 101, 34, 58, 49, 49, 10, 125, 93, 44, 10, 34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 34, 58, 34, 112, 114, 100, 52, 52, 45, 97, 110, 97, 108, 121, 116, 105, 99, 115, 34, 44, 34, 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 53, 49, 48, 54, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 71, 85, 73, 68, 34, 58, 34, 53, 97, 48, 55, 101, 56, 48, 51, 45, 57, 51, 50, 51, 45, 52, 53, 55, 99, 45, 56, 101, 49, 100, 45, 50, 57, 99, 48, 98, 48, 98, 99, 48, 102, 97, 57, 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 78, 97, 109, 101, 34, 58, 34, 97, 112, 105, 95, 118, 51, 46, 92, 47, 118, 51, 92, 47, 101, 118, 101, 110, 116, 115, 92, 47, 66, 114, 111, 119, 115, 101, 114, 82, 101, 99, 111, 114, 100, 92, 47, 101, 118, 101, 110, 116, 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 73, 100, 34, 58, 56, 48, 50, 54, 50, 56, 10, 125] timestamp 1537976391822) to topic upsert-merger-stream-oa43-1-upsert-store-changelog due to Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.; No more records will be sent and no more offsets will be recorded for this task. [2018-09-26T15:39:54,784Z] [ERROR] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.i.AssignedStreamsTasks] stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] After suspending failed, closing the same stream task 0_55 failed again due to the following error: org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229) at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660) at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:493) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:553) at org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:192) at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:147) at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242) at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719) Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker. [2018-09-26T15:39:54,801Z] [ERROR] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.internals.StreamThread] stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] Error caught during partition revocation, will abort the current process and re-throw at the end of rebalance: stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] failed to suspend stream tasks [2018-09-26T15:39:54,801Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.internals.StreamThread] stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] partition revocation took 156 ms. suspended active tasks: [0_55] suspended standby tasks: [0_50] [2018-09-26T15:39:54,801Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-consumer, groupId=upsert-merger-stream-oa43-1] (Re-)joining group [2018-09-26T15:39:56,277Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.c.c.i.AbstractCoordinator] [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-consumer, groupId=upsert-merger-stream-oa43-1] Successfully joined group with generation 113 [2018-09-26T15:39:56,278Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-consumer, groupId=upsert-merger-stream-oa43-1] Setting newly assigned partitions [oa43-1-event-upsert-48] [2018-09-26T15:39:56,278Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-15] [o.a.k.c.c.i.ConsumerCoordinator] [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-15-consumer, groupId=upsert-merger-stream-oa43-1] Setting newly assigned partitions [oa43-1-event-upsert-65] [2018-09-26T15:39:56,278Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.internals.StreamThread] stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED [2018-09-26T15:39:56,495Z] [INFO ] [kafka-producer-network-thread | upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_48-producer] [o.a.k.c.p.i.TransactionManager] [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_48-producer, transactionalId=upsert-merger-stream-oa43-1-0_48] ProducerId set to 13 with epoch 81 [2018-09-26T15:39:56,495Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.internals.StreamThread] stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] partition assignment took 217 ms. current active tasks: [0_48] current standby tasks: [0_49] previous active tasks: [0_55] [2018-09-26T15:39:56,496Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.internals.StreamThread] stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN [2018-09-26T15:39:56,496Z] [INFO ] [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.internals.StreamThread] stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] Shutting down