Could you take a look at https://issues.apache.org/jira/browse/KAFKA-7284 and see if you are hitting this?
Guozhang On Fri, Sep 28, 2018 at 5:22 PM, Aravind Dongara <adong...@yahoo.com.invalid > wrote: > Hi Guozhang > > Thanks for your reply. > We are using Kafka 1.1.1 > > Thanks > Aravind > > > > On Sep 28, 2018, at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > > > Hello Aravind, > > > > Which version of Kafka are you currently using? What you described seems > to > > be fixed in the latest version already, so I want to check if you are > using > > an older version and if yes, what's the best way to work around it. > > > > > > Guozhang > > > > > > On Thu, Sep 27, 2018 at 12:54 PM, Aravind Dongara < > > adong...@yahoo.com.invalid> wrote: > > > >> > >> During a rebalance triggered by kafka-coordinator-heartbeat-thread > losing > >> connection to ‘Group coordinator’, we noticed that a stream thread is > >> shutting down when it catches a ProducerFencedExcpetion while flushing > the > >> state store. > >> This also causes the stream-state on that node to be stuck in > >> ‘REBALANCING’ state, even though the partitions have been rebalanced to > >> other threads across nodes. > >> During rebalance there seems to be a race condition between flushState > on > >> one node vs ProducerId creation on other node for the same partition. If > >> the flushState is slower than the other it encounters > >> ProducerFencedException. > >> > >> It would be nice if Kafka streams can handle this exception gracefully > and > >> not shutdown the thread, so that we don’t end up with uneven number of > >> threads across nodes. > >> Can you guys please suggest any work arounds for this situation? > >> > >> Thanks > >> Aravind > >> > >> > >> [2018-09-26T15:39:54,662Z] [ERROR] [kafka-producer-network-thread | > >> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > >> 0a4b8d6753a2-StreamThread-16-0_55-producer] > [o.a.k.c.producer.internals.Sender] > >> [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89- > >> a27a-0a4b8d6753a2-StreamThread-16-0_55-producer, > >> transactionalId=upsert-merger-stream-oa43-1-0_55] Aborting producer > >> batches due to fatal error > >> [2018-09-26T15:39:54,665Z] [ERROR] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.i. > ProcessorStateManager] > >> task [0_55] Failed to flush state store upsert-store: > >> org.apache.kafka.streams.errors.StreamsException: task [0_55] Abort > >> sending since an error caught with a previous record (key > >> de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp > >> 1537976392104) to topic upsert-merger-stream-oa43-1- > upsert-store-changelog > >> due to Cannot perform send because at least one previous transactional > or > >> idempotent request has failed with errors. > >> at org.apache.kafka.streams.processor.internals. > >> RecordCollectorImpl.send(RecordCollectorImpl.java:197) > >> at org.apache.kafka.streams.state.internals. > >> StoreChangeLogger.logChange(StoreChangeLogger.java:59) > >> at org.apache.kafka.streams.state.internals. > >> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor > >> e.java:69) > >> at org.apache.kafka.streams.state.internals. > >> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor > >> e.java:29) > >> at org.apache.kafka.streams.state.internals. > CachingKeyValueStore. > >> putAndMaybeForward(CachingKeyValueStore.java:105) > >> at org.apache.kafka.streams.state.internals. > >> CachingKeyValueStore.access$000(CachingKeyValueStore.java:38) > >> at org.apache.kafka.streams.state.internals. > >> CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83) > >> at org.apache.kafka.streams.state.internals.NamedCache. > >> flush(NamedCache.java:142) > >> at org.apache.kafka.streams.state.internals.NamedCache. > >> flush(NamedCache.java:100) > >> at org.apache.kafka.streams.state.internals.ThreadCache. > >> flush(ThreadCache.java:127) > >> at org.apache.kafka.streams.state.internals. > >> CachingKeyValueStore.flush(CachingKeyValueStore.java:123) > >> at org.apache.kafka.streams.state.internals. > >> InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267) > >> at org.apache.kafka.streams.state.internals. > >> MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149) > >> at org.apache.kafka.streams.processor.internals. > >> ProcessorStateManager.flush(ProcessorStateManager.java:243) > >> at org.apache.kafka.streams.processor.internals. > >> AbstractTask.flushState(AbstractTask.java:195) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.flushState(StreamTask.java:339) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask$1.run(StreamTask.java:312) > >> at org.apache.kafka.streams.processor.internals. > >> StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.commit(StreamTask.java:307) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.suspend(StreamTask.java:440) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.suspend(StreamTask.java:422) > >> at org.apache.kafka.streams.processor.internals. > >> AssignedTasks.suspendTasks(AssignedTasks.java:182) > >> at org.apache.kafka.streams.processor.internals. > >> AssignedTasks.suspend(AssignedTasks.java:147) > >> at org.apache.kafka.streams.processor.internals.TaskManager. > >> suspendTasksAndState(TaskManager.java:242) > >> at org.apache.kafka.streams.processor.internals.StreamThread$ > >> RebalanceListener.onPartitionsRevoked(StreamThread.java:291) > >> at org.apache.kafka.clients.consumer.internals. > >> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414) > >> at org.apache.kafka.clients.consumer.internals. > >> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359) > >> at org.apache.kafka.clients.consumer.internals. > >> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > >> at org.apache.kafka.clients.consumer.internals. > >> ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > >> at org.apache.kafka.clients.consumer.KafkaConsumer. > >> pollOnce(KafkaConsumer.java:1149) > >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll( > >> KafkaConsumer.java:1115) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.pollRequests(StreamThread.java:831) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.runOnce(StreamThread.java:788) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.runLoop(StreamThread.java:749) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:719) > >> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send > >> because at least one previous transactional or idempotent request has > >> failed with errors. > >> at org.apache.kafka.clients.producer.internals. > TransactionManager. > >> failIfNotReadyForSend(TransactionManager.java:279) > >> at org.apache.kafka.clients.producer.internals. > TransactionManager. > >> maybeAddPartitionToTransaction(TransactionManager.java:264) > >> at org.apache.kafka.clients.producer.KafkaProducer.doSend( > >> KafkaProducer.java:828) > >> at org.apache.kafka.clients.producer.KafkaProducer.send( > >> KafkaProducer.java:784) > >> at org.apache.kafka.streams.processor.internals. > >> RecordCollectorImpl.send(RecordCollectorImpl.java:153) > >> ... 34 common frames omitted > >> Caused by: org.apache.kafka.common.errors.ProducerFencedException: > >> Producer attempted an operation with an old epoch. Either there is a > newer > >> producer with the same transactionalId, or the producer's transaction > has > >> been expired by the broker. > >> [2018-09-26T15:39:54,665Z] [ERROR] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.i. > AssignedStreamsTasks] > >> stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> Suspending stream task 0_55 failed due to the following error: > >> org.apache.kafka.streams.errors.ProcessorStateException: task [0_55] > >> Failed to flush state store upsert-store > >> at org.apache.kafka.streams.processor.internals. > >> ProcessorStateManager.flush(ProcessorStateManager.java:246) > >> at org.apache.kafka.streams.processor.internals. > >> AbstractTask.flushState(AbstractTask.java:195) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.flushState(StreamTask.java:339) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask$1.run(StreamTask.java:312) > >> at org.apache.kafka.streams.processor.internals. > >> StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:211) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.commit(StreamTask.java:307) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.suspend(StreamTask.java:440) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.suspend(StreamTask.java:422) > >> at org.apache.kafka.streams.processor.internals. > >> AssignedTasks.suspendTasks(AssignedTasks.java:182) > >> at org.apache.kafka.streams.processor.internals. > >> AssignedTasks.suspend(AssignedTasks.java:147) > >> at org.apache.kafka.streams.processor.internals.TaskManager. > >> suspendTasksAndState(TaskManager.java:242) > >> at org.apache.kafka.streams.processor.internals.StreamThread$ > >> RebalanceListener.onPartitionsRevoked(StreamThread.java:291) > >> at org.apache.kafka.clients.consumer.internals. > >> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414) > >> at org.apache.kafka.clients.consumer.internals. > >> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359) > >> at org.apache.kafka.clients.consumer.internals. > >> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > >> at org.apache.kafka.clients.consumer.internals. > >> ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > >> at org.apache.kafka.clients.consumer.KafkaConsumer. > >> pollOnce(KafkaConsumer.java:1149) > >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll( > >> KafkaConsumer.java:1115) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.pollRequests(StreamThread.java:831) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.runOnce(StreamThread.java:788) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.runLoop(StreamThread.java:749) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:719) > >> Caused by: org.apache.kafka.streams.errors.StreamsException: task > [0_55] > >> Abort sending since an error caught with a previous record (key > >> de796abe-d174-40ee-99fb-36695447402e value [B@79f4a0fd timestamp > >> 1537976392104) to topic upsert-merger-stream-oa43-1- > upsert-store-changelog > >> due to Cannot perform send because at least one previous transactional > or > >> idempotent request has failed with errors. > >> at org.apache.kafka.streams.processor.internals. > >> RecordCollectorImpl.send(RecordCollectorImpl.java:197) > >> at org.apache.kafka.streams.state.internals. > >> StoreChangeLogger.logChange(StoreChangeLogger.java:59) > >> at org.apache.kafka.streams.state.internals. > >> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor > >> e.java:69) > >> at org.apache.kafka.streams.state.internals. > >> ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStor > >> e.java:29) > >> at org.apache.kafka.streams.state.internals. > CachingKeyValueStore. > >> putAndMaybeForward(CachingKeyValueStore.java:105) > >> at org.apache.kafka.streams.state.internals. > >> CachingKeyValueStore.access$000(CachingKeyValueStore.java:38) > >> at org.apache.kafka.streams.state.internals. > >> CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83) > >> at org.apache.kafka.streams.state.internals.NamedCache. > >> flush(NamedCache.java:142) > >> at org.apache.kafka.streams.state.internals.NamedCache. > >> flush(NamedCache.java:100) > >> at org.apache.kafka.streams.state.internals.ThreadCache. > >> flush(ThreadCache.java:127) > >> at org.apache.kafka.streams.state.internals. > >> CachingKeyValueStore.flush(CachingKeyValueStore.java:123) > >> at org.apache.kafka.streams.state.internals. > >> InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:267) > >> at org.apache.kafka.streams.state.internals. > >> MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149) > >> at org.apache.kafka.streams.processor.internals. > >> ProcessorStateManager.flush(ProcessorStateManager.java:243) > >> ... 21 common frames omitted > >> Caused by: org.apache.kafka.common.KafkaException: Cannot perform send > >> because at least one previous transactional or idempotent request has > >> failed with errors. > >> at org.apache.kafka.clients.producer.internals. > TransactionManager. > >> failIfNotReadyForSend(TransactionManager.java:279) > >> at org.apache.kafka.clients.producer.internals. > TransactionManager. > >> maybeAddPartitionToTransaction(TransactionManager.java:264) > >> at org.apache.kafka.clients.producer.KafkaProducer.doSend( > >> KafkaProducer.java:828) > >> at org.apache.kafka.clients.producer.KafkaProducer.send( > >> KafkaProducer.java:784) > >> at org.apache.kafka.streams.processor.internals. > >> RecordCollectorImpl.send(RecordCollectorImpl.java:153) > >> ... 34 common frames omitted > >> Caused by: org.apache.kafka.common.errors.ProducerFencedException: > >> Producer attempted an operation with an old epoch. Either there is a > newer > >> producer with the same transactionalId, or the producer's transaction > has > >> been expired by the broker. > >> [2018-09-26T15:39:54,666Z] [WARN ] [kafka-producer-network-thread | > >> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > >> 0a4b8d6753a2-StreamThread-16-0_55-producer] [o.a.k.s.p.i. > RecordCollectorImpl] > >> task [0_55] Error sending record (key 5a07e803-9323-457c-8e1d- > 29c0b0bc0fa9 > >> value [0, 0, 1, 102, 22, -119, 88, -32, 0, 0, 0, -95, 97, 112, 112, 100, > >> 121, 110, 97, 109, 105, 99, 115, 95, 101, 101, 101, 49, 100, 52, 102, > 56, > >> 45, 54, 55, 97, 50, 45, 52, 57, 56, 101, 45, 97, 55, 50, 53, 45, 52, 55, > >> 101, 50, 57, 56, 48, 51, 56, 50, 50, 101, -62, -79, 98, 105, 122, 95, > 116, > >> 120, 110, 95, 118, 49, -62, -79, 57, 54, 53, 49, 48, 100, 50, 52, -62, > -79, > >> 48, -62, -79, 115, 101, 103, 109, 101, 110, 116, 115, -62, -79, 114, > 101, > >> 113, 117, 101, 115, 116, 71, 85, 73, 68, -62, -79, 53, 97, 48, 55, 101, > 56, > >> 48, 51, 45, 57, 51, 50, 51, 45, 52, 53, 55, 99, 45, 56, 101, 49, 100, > 45, > >> 50, 57, 99, 48, 98, 48, 98, 99, 48, 102, 97, 57, -62, -79, 102, 97, 108, > >> 115, 101, -62, -79, 116, 114, 117, 101, -62, -79, 102, 97, 108, 115, > 101, > >> -62, -79, 102, 97, 108, 115, 101, 123, 10, 34, 101, 118, 101, 110, 116, > 84, > >> 105, 109, 101, 115, 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45, > 48, > >> 57, 45, 50, 54, 84, 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43, > 48, > >> 48, 48, 48, 34, 44, 10, 34, 115, 101, 103, 109, 101, 110, 116, 115, 34, > 58, > >> 91, 123, 10, 34, 115, 101, 103, 109, 101, 110, 116, 84, 105, 109, 101, > 115, > >> 116, 97, 109, 112, 34, 58, 34, 50, 48, 49, 56, 45, 48, 57, 45, 50, 54, > 84, > >> 49, 53, 58, 51, 57, 58, 52, 52, 46, 49, 50, 50, 43, 48, 48, 48, 48, 34, > 44, > >> 10, 34, 117, 115, 101, 114, 68, 97, 116, 97, 34, 58, 123, 10, 34, 65, > 99, > >> 99, 111, 117, 110, 116, 78, 97, 109, 101, 34, 58, 34, 112, 114, 105, > 118, > >> 97, 108, 105, 97, 118, 101, 110, 116, 97, 100, 105, 114, 101, 99, 116, > 97, > >> 115, 97, 45, 97, 48, 113, 51, 52, 48, 48, 48, 48, 48, 100, 106, 115, > 118, > >> 52, 101, 97, 100, 34, 44, 34, 69, 115, 73, 110, 100, 101, 120, 67, 108, > >> 117, 115, 116, 101, 114, 34, 58, 34, 112, 114, 100, 52, 52, 45, 49, 50, > 34, > >> 10, 125, 44, 10, 34, 116, 105, 101, 114, 34, 58, 34, 97, 112, 105, 95, > 112, > >> 114, 100, 52, 53, 50, 34, 44, 34, 116, 105, 101, 114, 73, 100, 34, 58, > 34, > >> 51, 57, 57, 52, 57, 50, 55, 49, 34, 44, 34, 110, 111, 100, 101, 34, 58, > 34, > >> 97, 112, 105, 118, 49, 45, 48, 49, 52, 45, 112, 114, 100, 52, 53, 50, > 34, > >> 44, 34, 110, 111, 100, 101, 73, 100, 34, 58, 34, 49, 57, 52, 54, 57, 54, > >> 53, 55, 48, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 69, 120, 112, > >> 101, 114, 105, 101, 110, 99, 101, 34, 58, 34, 78, 79, 82, 77, 65, 76, > 34, > >> 44, 34, 101, 110, 116, 114, 121, 80, 111, 105, 110, 116, 34, 58, 116, > 114, > >> 117, 101, 44, 34, 117, 110, 105, 113, 117, 101, 83, 101, 103, 109, 101, > >> 110, 116, 73, 100, 34, 58, 53, 44, 34, 116, 114, 97, 110, 115, 97, 99, > 116, > >> 105, 111, 110, 84, 105, 109, 101, 34, 58, 49, 49, 10, 125, 93, 44, 10, > 34, > >> 97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 34, 58, 34, 112, > 114, > >> 100, 52, 52, 45, 97, 110, 97, 108, 121, 116, 105, 99, 115, 34, 44, 34, > 97, > >> 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 73, 100, 34, 58, 34, 53, > >> 49, 48, 54, 34, 44, 34, 114, 101, 113, 117, 101, 115, 116, 71, 85, 73, > 68, > >> 34, 58, 34, 53, 97, 48, 55, 101, 56, 48, 51, 45, 57, 51, 50, 51, 45, 52, > >> 53, 55, 99, 45, 56, 101, 49, 100, 45, 50, 57, 99, 48, 98, 48, 98, 99, > 48, > >> 102, 97, 57, 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111, > >> 110, 78, 97, 109, 101, 34, 58, 34, 97, 112, 105, 95, 118, 51, 46, 92, > 47, > >> 118, 51, 92, 47, 101, 118, 101, 110, 116, 115, 92, 47, 66, 114, 111, > 119, > >> 115, 101, 114, 82, 101, 99, 111, 114, 100, 92, 47, 101, 118, 101, 110, > 116, > >> 34, 44, 34, 116, 114, 97, 110, 115, 97, 99, 116, 105, 111, 110, 73, 100, > >> 34, 58, 56, 48, 50, 54, 50, 56, 10, 125] timestamp 1537976391822) to > topic > >> upsert-merger-stream-oa43-1-upsert-store-changelog due to Producer > >> attempted an operation with an old epoch. Either there is a newer > producer > >> with the same transactionalId, or the producer's transaction has been > >> expired by the broker.; No more records will be sent and no more offsets > >> will be recorded for this task. > >> > >> > >> [2018-09-26T15:39:54,784Z] [ERROR] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.s.p.i. > AssignedStreamsTasks] > >> stream-thread [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> After suspending failed, closing the same stream task 0_55 failed again > due > >> to the following error: > >> org.apache.kafka.common.KafkaException: Cannot execute transactional > >> method because we are in an error state > >> at org.apache.kafka.clients.producer.internals. > TransactionManager. > >> maybeFailWithError(TransactionManager.java:784) > >> at org.apache.kafka.clients.producer.internals. > >> TransactionManager.beginAbort(TransactionManager.java:229) > >> at org.apache.kafka.clients.producer.KafkaProducer. > >> abortTransaction(KafkaProducer.java:660) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.closeSuspended(StreamTask.java:493) > >> at org.apache.kafka.streams.processor.internals. > >> StreamTask.close(StreamTask.java:553) > >> at org.apache.kafka.streams.processor.internals. > >> AssignedTasks.suspendTasks(AssignedTasks.java:192) > >> at org.apache.kafka.streams.processor.internals. > >> AssignedTasks.suspend(AssignedTasks.java:147) > >> at org.apache.kafka.streams.processor.internals.TaskManager. > >> suspendTasksAndState(TaskManager.java:242) > >> at org.apache.kafka.streams.processor.internals.StreamThread$ > >> RebalanceListener.onPartitionsRevoked(StreamThread.java:291) > >> at org.apache.kafka.clients.consumer.internals. > >> ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414) > >> at org.apache.kafka.clients.consumer.internals. > >> AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359) > >> at org.apache.kafka.clients.consumer.internals. > >> AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) > >> at org.apache.kafka.clients.consumer.internals. > >> ConsumerCoordinator.poll(ConsumerCoordinator.java:290) > >> at org.apache.kafka.clients.consumer.KafkaConsumer. > >> pollOnce(KafkaConsumer.java:1149) > >> at org.apache.kafka.clients.consumer.KafkaConsumer.poll( > >> KafkaConsumer.java:1115) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.pollRequests(StreamThread.java:831) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.runOnce(StreamThread.java:788) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.runLoop(StreamThread.java:749) > >> at org.apache.kafka.streams.processor.internals. > >> StreamThread.run(StreamThread.java:719) > >> Caused by: org.apache.kafka.common.errors.ProducerFencedException: > >> Producer attempted an operation with an old epoch. Either there is a > newer > >> producer with the same transactionalId, or the producer's transaction > has > >> been expired by the broker. > >> > >> > >> [2018-09-26T15:39:54,801Z] [ERROR] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] > >> [o.a.k.s.p.internals.StreamThread] stream-thread > >> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> Error caught during partition revocation, will abort the current process > >> and re-throw at the end of rebalance: stream-thread > >> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> failed to suspend stream tasks > >> [2018-09-26T15:39:54,801Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] > >> [o.a.k.s.p.internals.StreamThread] stream-thread > >> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> partition revocation took 156 ms. > >> suspended active tasks: [0_55] > >> suspended standby tasks: [0_50] > >> [2018-09-26T15:39:54,801Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.c.c.i. > AbstractCoordinator] > >> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89- > >> a27a-0a4b8d6753a2-StreamThread-16-consumer, > groupId=upsert-merger-stream-oa43-1] > >> (Re-)joining group > >> > >> [2018-09-26T15:39:56,277Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.c.c.i. > AbstractCoordinator] > >> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89- > >> a27a-0a4b8d6753a2-StreamThread-16-consumer, > groupId=upsert-merger-stream-oa43-1] > >> Successfully joined group with generation 113 > >> > >> [2018-09-26T15:39:56,278Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] [o.a.k.c.c.i. > ConsumerCoordinator] > >> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89- > >> a27a-0a4b8d6753a2-StreamThread-16-consumer, > groupId=upsert-merger-stream-oa43-1] > >> Setting newly assigned partitions [oa43-1-event-upsert-48] > >> [2018-09-26T15:39:56,278Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-15] [o.a.k.c.c.i. > ConsumerCoordinator] > >> [Consumer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89- > >> a27a-0a4b8d6753a2-StreamThread-15-consumer, > groupId=upsert-merger-stream-oa43-1] > >> Setting newly assigned partitions [oa43-1-event-upsert-65] > >> [2018-09-26T15:39:56,278Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] > >> [o.a.k.s.p.internals.StreamThread] stream-thread > >> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED > >> > >> [2018-09-26T15:39:56,495Z] [INFO ] [kafka-producer-network-thread | > >> upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > >> 0a4b8d6753a2-StreamThread-16-0_48-producer] [o.a.k.c.p.i. > TransactionManager] > >> [Producer clientId=upsert-merger-stream-oa43-1-53c3713a-443f-4b89- > >> a27a-0a4b8d6753a2-StreamThread-16-0_48-producer, > >> transactionalId=upsert-merger-stream-oa43-1-0_48] ProducerId set to 13 > >> with epoch 81 > >> [2018-09-26T15:39:56,495Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] > >> [o.a.k.s.p.internals.StreamThread] stream-thread > >> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> partition assignment took 217 ms. > >> current active tasks: [0_48] > >> current standby tasks: [0_49] > >> previous active tasks: [0_55] > >> > >> [2018-09-26T15:39:56,496Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] > >> [o.a.k.s.p.internals.StreamThread] stream-thread > >> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN > >> [2018-09-26T15:39:56,496Z] [INFO ] [upsert-merger-stream-oa43-1- > >> 53c3713a-443f-4b89-a27a-0a4b8d6753a2-StreamThread-16] > >> [o.a.k.s.p.internals.StreamThread] stream-thread > >> [upsert-merger-stream-oa43-1-53c3713a-443f-4b89-a27a- > 0a4b8d6753a2-StreamThread-16] > >> Shutting down > >> > >> > >> > >> > >> > >> > > > > > > -- > > -- Guozhang > > -- -- Guozhang