[ 
https://issues.apache.org/jira/browse/KAFKA-10247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10247.
-------------------------------------
    Fix Version/s: 2.6.0
       Resolution: Fixed

> Streams may attempt to process after closing a task
> ---------------------------------------------------
>
>                 Key: KAFKA-10247
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10247
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 2.6.0
>
>
> Observed in a system test. A corrupted task was detected, and Streams 
> properly closed it as dirty:
> {code:java}
> [2020-07-08 17:08:09,345] WARN stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered 
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records 
> from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it 
> is likely that the consumer's position has fallen out of the topic partition 
> offset range because the topic was truncated or compacted on the broker, 
> marking the corresponding tasks as corrupted and re-initializing it later. 
> (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position 
> FetchPosition{offset=1, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: 
> null)], epoch=0}} is out of range for partition 
> SmokeTest-cntStoreName-changelog-1
>    at 
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
>    at 
> org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
>    at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
>    at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
>    at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>    at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>    at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,345] WARN stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the 
> states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. 
> Will close the task as dirty and re-create and bootstrap from scratch. 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
> {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to 
> be re-initialized
>    at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch 
> position FetchPosition{offset=1, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: 
> null)], epoch=0}} is out of range for partition 
> SmokeTest-cntStoreName-changelog-1
>    at 
> org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
>    at 
> org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
>    at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
>    at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
>    at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>    at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
>    at 
> org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
>    ... 3 more
> [2020-07-08 17:08:09,346] INFO stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
> Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
> [2020-07-08 17:08:09,346] DEBUG stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
> Closing its state manager and all the registered state stores: 
> {sum-STATE-STORE-0000000050=StateStoreMetadata (sum-STATE-STORE-0000000050 : 
> SmokeTest-sum-STATE-STORE-0000000050-changelog-1 @ null, 
> cntStoreName=StateStoreMetadata (cntStoreName : 
> SmokeTest-cntStoreName-changelog-1 @ 0} 
> (org.apache.kafka.streams.processor.internals.ProcessorStateManager)
> [2020-07-08 17:08:09,346] INFO [Consumer 
> clientId=SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2-restore-consumer,
>  groupId=null] Subscribed to partition(s): 
> SmokeTest-minStoreName-changelog-1, SmokeTest-minStoreName-changelog-2, 
> SmokeTest-sum-STATE-STORE-0000000050-changelog-0, 
> SmokeTest-minStoreName-changelog-3, 
> SmokeTest-sum-STATE-STORE-0000000050-changelog-2, 
> SmokeTest-maxStoreName-changelog-1, SmokeTest-cntStoreName-changelog-0, 
> SmokeTest-maxStoreName-changelog-2, SmokeTest-cntStoreName-changelog-2, 
> SmokeTest-maxStoreName-changelog-3, SmokeTest-cntByCnt-changelog-4 
> (org.apache.kafka.clients.consumer.KafkaConsumer)
> [2020-07-08 17:08:09,348] DEBUG stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Released 
> state dir lock for task 2_1 
> (org.apache.kafka.streams.processor.internals.StateDirectory)
> [2020-07-08 17:08:09,348] INFO stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
> Closing record collector dirty 
> (org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
> [2020-07-08 17:08:09,348] INFO stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
> Closed dirty (org.apache.kafka.streams.processor.internals.StreamTask){code}
> However, there were already records buffered for it, so later in the same 
> processing loop, Streams attempted to process that task, resulting in an 
> InvalidStateStoreException:
> {code:java}
> [2020-07-08 17:08:09,352] ERROR stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Failed to 
> process stream task 2_1 due to the following error: 
> (org.apache.kafka.streams.processor.internals.TaskManager)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> cntStoreName is currently closed.
>    at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
>    at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
>    at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
>    at 
> org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
>    at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
>    at 
> org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
>    at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>    at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>    at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
>    at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
>    at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>    at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
>    at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,352] ERROR stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered 
> the following exception during processing and the thread is going to shut 
> down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.InvalidStateStoreException: Store 
> cntStoreName is currently closed.
>    at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
>    at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
>    at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
>    at 
> org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
>    at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
>    at 
> org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
>    at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>    at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>    at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
>    at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
>    at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
>    at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>    at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
>    at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
>    at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
> [2020-07-08 17:08:09,352] INFO stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] State 
> transition from RUNNING to PENDING_SHUTDOWN 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-07-08 17:08:09,352] INFO stream-thread 
> [SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Shutting down 
> (org.apache.kafka.streams.processor.internals.StreamThread){code}
> This caused the entire thread to shut down.
>  
> Instead, we should not attempt to process tasks that are not running.
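The guard described above can be sketched as a state check before handing records to a task. This is an illustrative sketch only, not the actual Streams internals: the `Task` class, `State` enum, and `processAll` loop below are hypothetical stand-ins for the real `TaskManager` processing loop.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessGuardSketch {
    enum State { CREATED, RUNNING, SUSPENDED, CLOSED }

    // Hypothetical stand-in for a stream task with buffered records.
    static class Task {
        final String id;
        State state;
        Task(String id, State state) { this.id = id; this.state = state; }

        boolean isProcessable() { return state == State.RUNNING; }

        void process() {
            // Without the guard, processing a closed task would hit a
            // closed state store, as in the reported stack trace.
            if (state == State.CLOSED) {
                throw new IllegalStateException("Task " + id + " is closed");
            }
        }
    }

    // Only hand buffered records to tasks that are still RUNNING; a task
    // closed dirty earlier in the same loop (e.g. after a
    // TaskCorruptedException) is skipped instead of processed.
    static int processAll(List<Task> tasks) {
        int processed = 0;
        for (Task task : tasks) {
            if (!task.isProcessable()) {
                continue; // skip corrupted/closed tasks; they are re-created later
            }
            task.process();
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("1_0", State.RUNNING));
        tasks.add(new Task("2_1", State.CLOSED)); // closed dirty earlier in the loop
        System.out.println(processAll(tasks)); // prints 1: the closed task is skipped
    }
}
```

With the guard in place, the buffered records for task 2_1 are never dispatched, so the thread survives the loop and the corrupted task can be re-initialized from scratch as intended.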



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
