[ https://issues.apache.org/jira/browse/KAFKA-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16985429#comment-16985429 ]
John Roesler commented on KAFKA-9231:
-------------------------------------

In addition to the error I listed above (ProducerFencedException), I have also observed the following recoverable exceptions that lead to thread deaths:

UnknownProducerIdException:
{noformat}
org.apache.kafka.streams.errors.StreamsException: task [1_0] Abort sending since an error caught with a previous record (timestamp 1574960233670) to topic stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-0000000019-changelog due to org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:143)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:51)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:202)
    at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
    at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730)
    at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716)
    at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
    at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
    at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
    at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:562)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:554)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
{noformat}

and an internal assertion:
{noformat}
java.lang.IllegalStateException: RocksDB metrics recorder for store "KSTREAM-AGGREGATE-STATE-STORE-0000000049" of task 3_1 has already been added. This is a bug in Kafka Streams.
    at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecordingTrigger.addMetricsRecorder(RocksDBMetricsRecordingTrigger.java:30)
    at org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder.addStatistics(RocksDBMetricsRecorder.java:93)
    at org.apache.kafka.streams.state.internals.RocksDBStore.maybeSetUpMetricsRecorder(RocksDBStore.java:205)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:191)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:227)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:44)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:58)
    at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:203)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:201)
    at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:211)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:323)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:76)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:385)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:769)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
[2019-11-29 08:27:08,883] INFO [stream-soak-test-a0363f9f-ff9c-4dbd-b38a-8e898d77a22e-StreamThread-2] stream-thread [stream-soak-test-a0363f9f-ff9c-4dbd-b38a-8e898d77a22e-StreamThread-2] State transition from PARTITIONS_ASSIGNED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
{noformat}

This last one is an internal assertion. What happened there is that the RocksDBStore initializer registers the store with the metrics recorder before the store is fully created. The store creation failed, but the registration was never removed, leaving the store in a state where it could no longer be created at all.
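To make the ordering problem concrete, here is a minimal, self-contained sketch of that failure mode. The class and method names are illustrative and the logic is simplified; this is not the actual Streams code.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Sketch only: registering a metrics recorder *before* the store is fully
// opened makes the store impossible to re-create after a single failed open.
public class StoreInitOrderingSketch {

    private static final Map<String, Object> RECORDERS = new HashMap<>();

    static void addMetricsRecorder(final String storeName) {
        if (RECORDERS.containsKey(storeName)) {
            // Corresponds to the "has already been added" assertion in the stack trace above.
            throw new IllegalStateException(
                "RocksDB metrics recorder for store \"" + storeName + "\" has already been added.");
        }
        RECORDERS.put(storeName, new Object());
    }

    static void openStore(final String storeName, final boolean failOpen) {
        addMetricsRecorder(storeName);  // registered first...
        if (failOpen) {
            // ...but the registration is never rolled back if the open fails,
            // e.g. because a rebalance interrupts store initialization.
            throw new RuntimeException("simulated store open failure");
        }
    }

    public static void main(final String[] args) {
        try {
            openStore("KSTREAM-AGGREGATE-STATE-STORE-0000000049", true);
        } catch (final RuntimeException e) {
            System.out.println("first open failed: " + e.getMessage());
        }
        // The retry now dies with the IllegalStateException instead of succeeding.
        openStore("KSTREAM-AGGREGATE-STATE-STORE-0000000049", false);
    }
}
{code}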
My testing setup is to run a Streams application processing data at a modest rate (~20K records/sec) in a three-instance configuration, each of which has three StreamThreads. I'm introducing network partitions that separate each instance, one at a time, from the brokers, at an incidence of about one network partition per hour, each lasting up to 5 minutes. I've observed each of these exceptions kill Streams threads within a few occurrences of network partitioning. Note that Streams is configured with appropriate timeouts/retries to tolerate network interruptions lasting longer than 5 minutes, so any thread deaths are unexpected. All the thread deaths I've observed are the result of bugs in the Streams exception handling code.
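For reference, raising the client timeouts to ride out interruptions of that length looks roughly like the sketch below. The config keys are standard client/Streams settings, but the concrete values and the application id are assumptions for illustration, not the actual soak-test configuration.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

// Sketch of a Streams configuration intended to tolerate multi-minute network
// interruptions under EOS. All values below are illustrative assumptions.
public class SoakTestConfigSketch {

    public static Properties streamsConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-soak-test");   // assumed application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");      // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);                 // three threads per instance

        // Give the embedded clients more than five minutes of headroom so that
        // a network partition alone should never be fatal.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 600_000);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 600_000);
        props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), 600_000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 60_000);
        return props;
    }
}
{code}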
There are two main categories of exception:

1. ProducerFencedException and UnknownProducerIdException. While they reflect different root causes, both of these are expected with EOS enabled if the producer is silent for too long and ceases to be considered a valid member by the broker. Streams is supposed to handle this situation by rejoining the group (which includes discarding and re-creating its Producers); a sketch of that handling appears at the end of this comment. These errors were uncovered by repeated rebalances, ultimately caused by the injected network partitions.

2. IllegalStateException. A specific internal assertion revealed buggy store initialization logic. This was also uncovered by repeated rebalances, ultimately caused by the injected network partitions.

I have addressed all of these bugs in my PR: https://github.com/apache/kafka/pull/7748

I'm proposing to consider this a Blocker for the 2.4.0 release. It is both severe (the exceptions above have reliably caused my Streams cluster to die completely within about 12-24 hours) and a regression. To verify the latter claim, I ran the exact same workload in the exact same scenario using Kafka Streams 2.3. Note that I still observed threads die on that branch, but *only* due to the UnknownProducerIdException. So there is some overlap with what I'm seeing on 2.4, but aside from that one cause, the fact that Streams is losing threads at a high rate to both ProducerFencedExceptions and its own internal assertion (IllegalStateException) leads me to think that Streams would be less stable in production using EOS on 2.4 than it was on 2.3.

For completeness, note that I've run the same test on 2.4 and 2.3 _without_ EOS, and Streams is quite stable. Also note that the exceptions killing my applications seem to be directly caused by frequent rebalances and network interruptions; I do not expect users running EOS Streams under reliable and stable conditions to suffer these thread deaths.

I know that everyone is waiting for the much-delayed 2.4.0 release, so I'm not taking a hard stance on it, but from where I'm sitting, the situation seems to warrant a new RC once the fix is prepared. Also note that a new RC was just announced this morning, so assuming I can merge my PR on Monday, we would only be setting the release back a couple of extra days.
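Coming back to category 1 above: the intended handling is roughly the pattern sketched below. This is only an illustration of the control flow, not the actual Streams code; RejoinSignal and commitTransaction are hypothetical names used to show the idea of translating fencing errors into a "rejoin the group" signal rather than a fatal error.

{code:java}
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.streams.errors.StreamsException;

// Sketch only: fencing-related producer errors are recoverable at the thread
// level and should trigger a rejoin, not kill the thread.
public class FencedProducerHandlingSketch {

    /** Hypothetical signal that the thread's run loop catches in order to rejoin the group. */
    static class RejoinSignal extends RuntimeException {
        RejoinSignal(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    static void commitTransaction(final Runnable producerCommit) {
        try {
            producerCommit.run();
        } catch (final ProducerFencedException | UnknownProducerIdException e) {
            // Recoverable: the producer lost its identity, e.g. after a long
            // network partition. Discard it, rejoin the group, and re-create
            // producers and tasks, rather than treating this as fatal.
            throw new RejoinSignal("producer fenced or producer id expired; rejoining the group", e);
        } catch (final RuntimeException fatal) {
            // Anything else really is fatal for this thread.
            throw new StreamsException("unexpected error during commit", fatal);
        }
    }
}
{code}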

> Streams Threads may die from recoverable errors with EOS enabled
> ----------------------------------------------------------------
>
>                 Key: KAFKA-9231
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9231
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> While testing Streams in EOS mode under frequent and heavy network partitions, I've encountered the following error, leading to thread death:
> {noformat}
> [2019-11-26 04:54:02,650] ERROR [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Failed to rebalance.
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:739)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] failed to suspend stream tasks
>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:253)
>     at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:116)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
>     at org.apache.kafka.streams.processor.internals.StreamThread.enforceRebalance(StreamThread.java:716)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:710)
>     ... 1 more
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000007
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:279)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:175)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:581)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:535)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:660)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:628)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendRunningTasks(AssignedStreamsTasks.java:145)
>     at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendOrCloseTasks(AssignedStreamsTasks.java:128)
>     at org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:246)
>     ... 7 more
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
> [2019-11-26 04:54:02,650] INFO [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
> [2019-11-26 04:54:02,650] INFO [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread)
> [2019-11-26 04:54:02,650] INFO [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] [Consumer clientId=stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2019-11-26 04:54:02,653] INFO [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] stream-thread [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State transition from PENDING_SHUTDOWN to DEAD (org.apache.kafka.streams.processor.internals.StreamThread)
> {noformat}
> Elsewhere in the code, we catch ProducerFencedExceptions and trigger a rebalance instead of killing the thread. It seems like one possible avenue has slipped through the cracks.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)