During a rebalance triggered by the kafka-coordinator-heartbeat-thread losing 
its connection to the group coordinator, we noticed that a stream thread shuts 
down when it catches a ProducerFencedException while flushing the state store.
This also leaves the stream state on that node stuck in ‘REBALANCING’, even 
though the partitions have been rebalanced to other threads across nodes.
During the rebalance there seems to be a race condition between flushState on 
one node and ProducerId creation on another node for the same partition. If 
flushState is the slower of the two, it encounters a ProducerFencedException.

It would be nice if Kafka Streams could handle this exception gracefully and 
not shut down the thread, so that we don’t end up with an uneven number of 
threads across nodes. 
Can you guys please suggest any workarounds for this situation?
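
To make the question concrete, here is a minimal sketch of how an application 
could at least detect this case. In the traces below the 
ProducerFencedException is nested several causes deep, so the helper walks the 
cause chain. The class and method names (CauseChain, hasCause) are 
hypothetical, and matching by class name is only an assumption made to keep 
the sketch free of Kafka dependencies. It does not prevent the thread from 
shutting down.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams.
final class CauseChain {
    private CauseChain() {}

    // Returns true if t, or any exception in its cause chain, has the given
    // fully qualified class name.
    static boolean hasCause(Throwable t, String className) {
        // Identity-based set guards against a cyclic cause chain.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
            if (cur.getClass().getName().equals(className)) {
                return true;
            }
        }
        return false;
    }
}
```

A handler registered through KafkaStreams#setUncaughtExceptionHandler could 
call CauseChain.hasCause(e, 
"org.apache.kafka.common.errors.ProducerFencedException") and then decide what 
to do, for example closing and recreating the KafkaStreams instance. Whether 
that is a safe reaction here is exactly what we are unsure about.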

Thanks
Aravind


[2018-09-26T15:39:54,662Z]  [ERROR]  [kafka-producer-network-thread | 
upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer]
  [o.a.k.c.producer.internals.Sender]  [Producer 
clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer,
 transactionalId=upsert-merger-stream-oa43-1-0_55] Aborting producer batches 
due to fatal error
[2018-09-26T15:39:54,665Z]  [ERROR]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.i.ProcessorStateManager]  task [0_55] Failed to flush state store 
upsert-store:
org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort sending 
since an error caught with a previous record (key 
de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp 1537976392104) 
to topic upsert-merger-stream-oa43-1-upsert-store-changelog due to Cannot 
perform send because at least one previous transactional or idempotent request 
has failed with errors.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:197)
        at 
org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:105)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
        at 
org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:243)
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:195)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:339)
        at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:312)
        at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:422)
        at 
org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:182)
        at 
org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:147)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242)
        at 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because 
at least one previous transactional or idempotent request has failed with 
errors.
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:279)
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:264)
        at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:828)
        at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:153)
        ... 34 common frames omitted
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
[2018-09-26T15:39:54,665Z]  [ERROR]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.i.AssignedStreamsTasks]  stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 Suspending stream task 0_55 failed due to the following error:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_55] Failed to 
flush state store upsert-store
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:246)
        at 
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:195)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:339)
        at 
org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:312)
        at 
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:440)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:422)
        at 
org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:182)
        at 
org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:147)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242)
        at 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort 
sending since an error caught with a previous record (key 
de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp 1537976392104) 
to topic upsert-merger-stream-oa43-1-upsert-store-changelog due to Cannot 
perform send because at least one previous transactional or idempotent request 
has failed with errors.
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:197)
        at 
org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
        at 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:29)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:105)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142)
        at 
org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100)
        at 
org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
        at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
        at 
org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267)
        at 
org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
        at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:243)
        ... 21 common frames omitted
Caused by: org.apache.kafka.common.KafkaException: Cannot perform send because 
at least one previous transactional or idempotent request has failed with 
errors.
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.failIfNotReadyForSend(TransactionManager.java:279)
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartitionToTransaction(TransactionManager.java:264)
        at 
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:828)
        at 
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784)
        at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:153)
        ... 34 common frames omitted
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
[2018-09-26T15:39:54,666Z]  [WARN ]  [kafka-producer-network-thread | 
upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_55-producer]
  [o.a.k.s.p.i.RecordCollectorImpl]  task [0_55] Error sending record (key 
5a07e803-9323-457c-8e1d-29c0b0bc0fa9 value [0, 0, 1, 102, 22, -119, 88, -32, 0, 
0, 0, -95, 97, 112, 112, 100, 121, 110, 97, 109, 105, 99, 115, 95, 101, 101, 
101, 49, 100, 52, 102, 56, 45, 54, 55, 97, 50, 45, 52, 57, 56, 101, 45, 97, 55, 
50, 53, 45, 52, 55, 101, 50, 57, 56, 48, 51, 56, 50, 50, 101, -62, -79, 98, 
105, 122, 95, 116, 120, 110, 95, 118, 49, -62, -79, 57, 54, 53, 49, 48, 100, 
50, 52, -62, -79, 48, -62, -79, 115, 101, 103, 109, 101, 110, 116, 115, -62, 
-79, 114, 101, 113, 117, 101, 115, 116, 71, 85, 73, 68, -62, -79, 53, 97, 48, 
55, 101, 56, 48, 51, 45, 57, 51, 50, 51, 45, 52, 53, 55, 99, 45, 56, 101, 49, 
100, 45, 50, 57, 99, 48, 98, 48, 98, 99, 48, 102, 97, 57, -62, -79, 102, 97, 
108, 115, 101, -62, -79, 116, 114, 117, 101, -62, -79, 102, 97, 108, 115, 101, 
-62, -79, 102, 97, 108, 115, 101, 123, 10, 34, 101, 118, 101, 110, 116, 84, 
105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45, 48, 57, 
45, 50, 54, 84, 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43, 48, 48, 48, 
48, 34, 44, 10, 34, 115, 101, 103, 109, 101, 110, 116, 115, 34, 58, 91, 123, 
10, 34, 115, 101, 103, 109, 101, 110, 116, 84, 105, 109, 101, 115, 116, 97, 
109, 112, 34, 58, 34, 50, 48, 49, 56, 45, 48, 57, 45, 50, 54, 84, 49, 53, 58, 
51, 57, 58, 52, 52, 46, 49, 50, 50, 43, 48, 48, 48, 48, 34, 44, 10, 34, 117, 
115, 101, 114, 68, 97, 116, 97, 34, 58, 123, 10, 34, 65, 99, 99, 111, 117, 110, 
116, 78, 97, 109, 101, 34, 58, 34, 112, 114, 105, 118, 97, 108, 105, 97, 118, 
101, 110, 116, 97, 100, 105, 114, 101, 99, 116, 97, 115, 97, 45, 97, 48, 113, 
51, 52, 48, 48, 48, 48, 48, 100, 106, 115, 118, 52, 101, 97, 100, 34, 44, 34, 
69, 115, 73, 110, 100, 101, 120, 67, 108, 117, 115, 116, 101, 114, 34, 58, 34, 
112, 114, 100, 52, 52, 45, 49, 50, 34, 10, 125, 44, 10, 34, 116, 105, 101, 114, 
34, 58, 34, 97, 112, 105, 95, 112, 114, 100, 52, 53, 50, 34, 44, 34, 116, 105, 
101, 114, 73, 100, 34, 58, 34, 51, 57, 57, 52, 57, 50, 55, 49, 34, 44, 34, 110, 
111, 100, 101, 34, 58, 34, 97, 112, 105, 118, 49, 45, 48, 49, 52, 45, 112, 114, 
100, 52, 53, 50, 34, 44, 34, 110, 111, 100, 101, 73, 100, 34, 58, 34, 49, 57, 
52, 54, 57, 54, 53, 55, 48, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 69, 
120, 112, 101, 114, 105, 101, 110, 99, 101, 34, 58, 34, 78, 79, 82, 77, 65, 76, 
34, 44, 34, 101, 110, 116, 114, 121, 80, 111, 105, 110, 116, 34, 58, 116, 114, 
117, 101, 44, 34, 117, 110, 105, 113, 117, 101, 83, 101, 103, 109, 101, 110, 
116, 73, 100, 34, 58, 53, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 
111, 110, 84, 105, 109, 101, 34, 58, 49, 49, 10, 125, 93, 44, 10, 34, 97, 112, 
112, 108, 105, 99, 97, 116, 105, 111, 110, 34, 58, 34, 112, 114, 100, 52, 52, 
45, 97, 110, 97, 108, 121, 116, 105, 99, 115, 34, 44, 34, 97, 112, 112, 108, 
105, 99, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 53, 49, 48, 54, 34, 44, 
34, 114, 101, 113, 117, 101, 115, 116, 71, 85, 73, 68, 34, 58, 34, 53, 97, 48, 
55, 101, 56, 48, 51, 45, 57, 51, 50, 51, 45, 52, 53, 55, 99, 45, 56, 101, 49, 
100, 45, 50, 57, 99, 48, 98, 48, 98, 99, 48, 102, 97, 57, 34, 44, 34, 116, 114, 
97, 110, 115, 97, 99, 116, 105, 111, 110, 78, 97, 109, 101, 34, 58, 34, 97, 
112, 105, 95, 118, 51, 46, 92, 47, 118, 51, 92, 47, 101, 118, 101, 110, 116, 
115, 92, 47, 66, 114, 111, 119, 115, 101, 114, 82, 101, 99, 111, 114, 100, 92, 
47, 101, 118, 101, 110, 116, 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 
105, 111, 110, 73, 100, 34, 58, 56, 48, 50, 54, 50, 56, 10, 125] timestamp 
1537976391822) to topic upsert-merger-stream-oa43-1-upsert-store-changelog due 
to Producer attempted an operation with an old epoch. Either there is a newer 
producer with the same transactionalId, or the producer's transaction has been 
expired by the broker.; No more records will be sent and no more offsets will 
be recorded for this task.

[2018-09-26T15:39:54,784Z]  [ERROR]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.i.AssignedStreamsTasks]  stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 After suspending failed, closing the same stream task 0_55 failed again due to 
the following error:
org.apache.kafka.common.KafkaException: Cannot execute transactional method 
because we are in an error state
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
        at 
org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
        at 
org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:493)
        at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:553)
        at 
org.apache.kafka.streams.processor.internals.AssignedTasks.suspendTasks(AssignedTasks.java:192)
        at 
org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:147)
        at 
org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242)
        at 
org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:831)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:788)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
        at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.


[2018-09-26T15:39:54,801Z]  [ERROR]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.internals.StreamThread]  stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 Error caught during partition revocation, will abort the current process and 
re-throw at the end of rebalance: stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 failed to suspend stream tasks
[2018-09-26T15:39:54,801Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.internals.StreamThread]  stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 partition revocation took 156 ms.
        suspended active tasks: [0_55]
        suspended standby tasks: [0_50]
[2018-09-26T15:39:54,801Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.c.c.i.AbstractCoordinator]  [Consumer 
clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-consumer,
 groupId=upsert-merger-stream-oa43-1] (Re-)joining group

[2018-09-26T15:39:56,277Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.c.c.i.AbstractCoordinator]  [Consumer 
clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-consumer,
 groupId=upsert-merger-stream-oa43-1] Successfully joined group with generation 
113

[2018-09-26T15:39:56,278Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.c.c.i.ConsumerCoordinator]  [Consumer 
clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-consumer,
 groupId=upsert-merger-stream-oa43-1] Setting newly assigned partitions 
[oa43-1-event-upsert-48]
[2018-09-26T15:39:56,278Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-15]
  [o.a.k.c.c.i.ConsumerCoordinator]  [Consumer 
clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-15-consumer,
 groupId=upsert-merger-stream-oa43-1] Setting newly assigned partitions 
[oa43-1-event-upsert-65]
[2018-09-26T15:39:56,278Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.internals.StreamThread]  stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED

[2018-09-26T15:39:56,495Z]  [INFO ]  [kafka-producer-network-thread | 
upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_48-producer]
  [o.a.k.c.p.i.TransactionManager]  [Producer 
clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16-0_48-producer,
 transactionalId=upsert-merger-stream-oa43-1-0_48] ProducerId set to 13 with 
epoch 81
[2018-09-26T15:39:56,495Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.internals.StreamThread]  stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 partition assignment took 217 ms.
        current active tasks: [0_48]
        current standby tasks: [0_49]
        previous active tasks: [0_55]

[2018-09-26T15:39:56,496Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.internals.StreamThread]  stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN
[2018-09-26T15:39:56,496Z]  [INFO ]  
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
  [o.a.k.s.p.internals.StreamThread]  stream-thread 
[upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16]
 Shutting down




