[ https://issues.apache.org/jira/browse/KAFKA-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-10247.
-------------------------------------
    Fix Version/s: 2.6.0
       Resolution: Fixed

> Streams may attempt to process after closing a task
> ---------------------------------------------------
>
>                 Key: KAFKA-10247
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10247
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 2.6.0
>
> Observed in a system test. A corrupted task was detected, and Streams properly closed it as dirty:
> {code:java}
> [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it is likely that the consumer's position has fallen out of the topic partition offset range because the topic was truncated or compacted on the broker, marking the corresponding tasks as corrupted and re-initializing it later. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,345] WARN stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. Will close the task as dirty and re-create and bootstrap from scratch.
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to be re-initialized
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position FetchPosition{offset=1, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
> 	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
> 	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
> 	... 3 more
> [2020-07-08 17:08:09,346] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-07-08 17:08:09,346] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing its state manager and all the registered state stores: {sum-STATE-STORE-0000000050=StateStoreMetadata (sum-STATE-STORE-0000000050 : SmokeTest-sum-STATE-STORE-0000000050-changelog-1 @ null, cntStoreName=StateStoreMetadata (cntStoreName : SmokeTest-cntStoreName-changelog-1 @ 0} (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> [2020-07-08 17:08:09,346] INFO [Consumer clientId=SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2-restore-consumer, groupId=null] Subscribed to partition(s): SmokeTest-minStoreName-changelog-1, SmokeTest-minStoreName-changelog-2, SmokeTest-sum-STATE-STORE-0000000050-changelog-0, SmokeTest-minStoreName-changelog-3, SmokeTest-sum-STATE-STORE-0000000050-changelog-2, SmokeTest-maxStoreName-changelog-1, SmokeTest-cntStoreName-changelog-0, SmokeTest-maxStoreName-changelog-2, SmokeTest-cntStoreName-changelog-2, SmokeTest-maxStoreName-changelog-3, SmokeTest-cntByCnt-changelog-4 (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-07-08 17:08:09,348] DEBUG stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Released state dir lock for task 2_1 (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2020-07-08 17:08:09,348] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closing record collector dirty (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> [2020-07-08 17:08:09,348] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] Closed dirty
> (org.apache.kafka.streams.processor.internals.StreamTask){code}
> However, there were already records buffered for it, so later on in the same processing loop, Streams tried to process that task, resulting in an InvalidStateStoreException:
> {code:java}
> [2020-07-08 17:08:09,352] ERROR stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Failed to process stream task 2_1 due to the following error: (org.apache.kafka.streams.processor.internals.TaskManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName is currently closed.
> 	at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
> 	at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
> 	at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
> 	at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,352] ERROR stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName is currently closed.
> 	at org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
> 	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
> 	at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
> 	at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
> 	at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
> 	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
> 	at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
> 	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,352] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-07-08 17:08:09,352] INFO stream-thread [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Shutting down (org.apache.kafka.streams.processor.internals.StreamThread){code}
> This caused the entire thread to shut down.
>
> Instead, we should not attempt to process tasks that are not running.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
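[Editor's note] The proposed fix, guarding the processing loop so it skips tasks that are no longer running, can be sketched as below. This is a simplified, hypothetical stand-in and not the actual Kafka Streams patch: the `Task`, `State`, and `process` names here are reductions of the real internals, introduced only to illustrate the guard.

```java
import java.util.List;

// Sketch of the guard described in the ticket: the processing loop skips
// tasks that are not RUNNING (e.g. just closed dirty after corruption)
// instead of handing them records and hitting InvalidStateStoreException.
// All names here are simplified stand-ins for Kafka Streams internals.
public class ProcessGuard {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    static final class Task {
        final String id;
        final State state;

        Task(String id, State state) {
            this.id = id;
            this.state = state;
        }

        // Only RUNNING tasks may be handed buffered records to process.
        boolean isProcessable() {
            return state == State.RUNNING;
        }
    }

    // Returns how many tasks were actually processed; non-running tasks
    // are skipped rather than triggering an exception that would shut
    // down the whole stream thread.
    static int process(List<Task> tasks) {
        int processed = 0;
        for (Task task : tasks) {
            if (!task.isProcessable()) {
                continue; // e.g. task 2_1, closed dirty earlier in this loop
            }
            processed++; // stand-in for task.process(record)
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(
                new Task("2_1", State.CLOSED),   // corrupted, closed dirty
                new Task("2_2", State.RUNNING));
        System.out.println(process(tasks)); // prints 1: only 2_2 is processed
    }
}
```

Under this shape, a task closed dirty mid-loop simply falls out of the processable set, and the thread stays in RUNNING instead of transitioning to PENDING_SHUTDOWN.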