John Roesler created KAFKA-10247:
------------------------------------

             Summary: Streams may attempt to process after closing a task
                 Key: KAFKA-10247
                 URL: https://issues.apache.org/jira/browse/KAFKA-10247
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.6.0
            Reporter: John Roesler
            Assignee: John Roesler


Observed in a system test. A corrupted task was detected, and Streams properly 
closed it as dirty:
{code:java}
[2020-07-08 17:08:09,345] WARN stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered 
org.apache.kafka.clients.consumer.OffsetOutOfRangeException fetching records 
from restore consumer for partitions [SmokeTest-cntStoreName-changelog-1], it 
is likely that the consumer's position has fallen out of the topic partition 
offset range because the topic was truncated or compacted on the broker, 
marking the corresponding tasks as corrupted and re-initializing it later. 
(org.apache.kafka.streams.processor.internals.StoreChangelogReader)
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch position 
FetchPosition{offset=1, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], 
epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
   at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
[2020-07-08 17:08:09,345] WARN stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Detected the 
states of tasks {2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted. Will 
close the task as dirty and re-create and bootstrap from scratch. 
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs 
{2_1=[SmokeTest-cntStoreName-changelog-1]} are corrupted and hence needs to be 
re-initialized
   at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Fetch 
position FetchPosition{offset=1, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=Optional[ducker03:9092 (id: 1 rack: null)], 
epoch=0}} is out of range for partition SmokeTest-cntStoreName-changelog-1
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1344)
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1296)
   at 
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:611)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1280)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
   at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
   at 
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
   ... 3 more
[2020-07-08 17:08:09,346] INFO stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
Suspended running (org.apache.kafka.streams.processor.internals.StreamTask)
[2020-07-08 17:08:09,346] DEBUG stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
Closing its state manager and all the registered state stores: 
{sum-STATE-STORE-0000000050=StateStoreMetadata (sum-STATE-STORE-0000000050 : 
SmokeTest-sum-STATE-STORE-0000000050-changelog-1 @ null, 
cntStoreName=StateStoreMetadata (cntStoreName : 
SmokeTest-cntStoreName-changelog-1 @ 0} 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager)
[2020-07-08 17:08:09,346] INFO [Consumer 
clientId=SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2-restore-consumer,
 groupId=null] Subscribed to partition(s): SmokeTest-minStoreName-changelog-1, 
SmokeTest-minStoreName-changelog-2, 
SmokeTest-sum-STATE-STORE-0000000050-changelog-0, 
SmokeTest-minStoreName-changelog-3, 
SmokeTest-sum-STATE-STORE-0000000050-changelog-2, 
SmokeTest-maxStoreName-changelog-1, SmokeTest-cntStoreName-changelog-0, 
SmokeTest-maxStoreName-changelog-2, SmokeTest-cntStoreName-changelog-2, 
SmokeTest-maxStoreName-changelog-3, SmokeTest-cntByCnt-changelog-4 
(org.apache.kafka.clients.consumer.KafkaConsumer)
[2020-07-08 17:08:09,348] DEBUG stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Released state 
dir lock for task 2_1 
(org.apache.kafka.streams.processor.internals.StateDirectory)
[2020-07-08 17:08:09,348] INFO stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
Closing record collector dirty 
(org.apache.kafka.streams.processor.internals.RecordCollectorImpl)
[2020-07-08 17:08:09,348] INFO stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] task [2_1] 
Closed dirty (org.apache.kafka.streams.processor.internals.StreamTask){code}
However, records were already buffered for that task, so later in the same 
processing loop, Streams attempted to process it, resulting in an 
InvalidStateStoreException:
{code:java}
[2020-07-08 17:08:09,352] ERROR stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Failed to 
process stream task 2_1 due to the following error: 
(org.apache.kafka.streams.processor.internals.TaskManager)
org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName 
is currently closed.
   at 
org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
   at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
   at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
   at 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
   at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
   at 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
   at 
org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
   at 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
   at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
   at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
   at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
   at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
   at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
   at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
   at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
   at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
   at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
   at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
[2020-07-08 17:08:09,352] ERROR stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Encountered the 
following exception during processing and the thread is going to shut down:  
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.InvalidStateStoreException: Store cntStoreName 
is currently closed.
   at 
org.apache.kafka.streams.state.internals.WrappedStateStore.validateStoreOpen(WrappedStateStore.java:78)
   at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:202)
   at 
org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:40)
   at 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$getWithBinary$0(MeteredTimestampedKeyValueStore.java:63)
   at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:851)
   at 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.getWithBinary(MeteredTimestampedKeyValueStore.java:62)
   at 
org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:129)
   at 
org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
   at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
   at 
org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
   at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:236)
   at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
   at 
org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:168)
   at 
org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
   at 
org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:679)
   at 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
   at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:679)
   at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1003)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:685)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:548)
   at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:507)
[2020-07-08 17:08:09,352] INFO stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] State 
transition from RUNNING to PENDING_SHUTDOWN 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2020-07-08 17:08:09,352] INFO stream-thread 
[SmokeTest-66676ca8-d517-4e4b-bb5f-44203e24e569-StreamThread-2] Shutting down 
(org.apache.kafka.streams.processor.internals.StreamThread){code}
This caused the entire thread to shut down.

 

Instead, we should not attempt to process tasks that are not running.
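The fix direction can be illustrated with a minimal, hypothetical sketch (the names below are illustrative and are not the actual Streams internals): before draining a task's buffered records, the processing loop checks the task's state, so a task that was closed dirty earlier in the same loop is skipped instead of throwing when it touches a closed state store.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: State, Task, and processAll are hypothetical
// stand-ins for the Streams task-management internals.
public class ProcessGuardSketch {

    enum State { RUNNING, CLOSED }

    static class Task {
        final String id;
        State state = State.RUNNING;
        final List<String> buffered; // records buffered before the task was closed

        Task(String id, List<String> buffered) {
            this.id = id;
            this.buffered = buffered;
        }
    }

    // Process buffered records only for tasks that are still RUNNING;
    // returns the number of records actually processed.
    static int processAll(Map<String, Task> tasks) {
        int processed = 0;
        for (Task task : tasks.values()) {
            if (task.state != State.RUNNING) {
                // Task was closed dirty (e.g. after a TaskCorruptedException):
                // drop its buffered records rather than touching closed stores.
                continue;
            }
            processed += task.buffered.size();
        }
        return processed;
    }

    public static void main(String[] args) {
        Map<String, Task> tasks = new LinkedHashMap<>();
        tasks.put("1_0", new Task("1_0", List.of("a", "b")));
        Task corrupted = new Task("2_1", List.of("c"));
        corrupted.state = State.CLOSED; // closed dirty earlier in the same loop
        tasks.put("2_1", corrupted);
        System.out.println(processAll(tasks)); // prints 2: task 2_1 is skipped
    }
}
```

In the scenario from the logs above, task 2_1 would be in the CLOSED state when the loop reaches it, so its buffered record is never handed to the (closed) cntStoreName store.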


