[ https://issues.apache.org/jira/browse/KAFKA-6349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Damian Guy reassigned KAFKA-6349:
---------------------------------

    Assignee: Damian Guy

> ConcurrentModificationException during streams state restoration
> ----------------------------------------------------------------
>
>                 Key: KAFKA-6349
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6349
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Damian Guy
>            Assignee: Damian Guy
>            Priority: Blocker
>             Fix For: 1.1.0
>
>         Attachments: streams-error.log
>
>
> During application startup and state restoration, a {{ConcurrentModificationException}} was thrown from {{AbstractStateManager}}:
> {code}
> [2017-12-12 10:47:09,840] ERROR [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Encountered the following error during processing: (org.apache.kafka.streams.processor.internals.StreamThread)
> java.util.ConcurrentModificationException
>     at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
>     at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:752)
>     at java.util.LinkedHashMap$LinkedEntryIterator.next(LinkedHashMap.java:750)
>     at org.apache.kafka.streams.processor.internals.AbstractStateManager.reinitializeStateStoresForPartitions(AbstractStateManager.java:74)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.reinitializeStateStoresForPartitions(ProcessorStateManager.java:155)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.reinitializeStateStoresForPartitions(AbstractTask.java:229)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> {code}
> The root cause is an {{OffsetOutOfRangeException}} that causes the store to be re-initialized:
> {code}
> [2017-12-12 10:46:04,861] WARN [streams-saak-test-client-StreamThread-2] stream-thread [streams-saak-test-client-StreamThread-2] Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {streams-soak-test-app-KSTREAM-AGGREGATE-STATE-STORE-0000000048-changelog-9=0}
>     at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:883)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:528)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1173)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1106)
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:84)
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:325)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:789)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
> {code}
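
For readers unfamiliar with this failure mode: the first trace shows the exception coming out of a {{LinkedHashMap}} entry iterator inside {{reinitializeStateStoresForPartitions}}. A common way to get that is to structurally modify the map while a for-each loop over it is still running. The sketch below reproduces only that general pattern. It is not the Kafka Streams code and not the fix for this issue; the class name, the store names, and both methods are hypothetical, and the assumption that the map is modified from inside the loop is mine rather than something the logs confirm.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in; none of these names come from Kafka Streams.
final class RestoreIterationSketch {

    // Builds a small insertion-ordered map of store name -> changelog name.
    static Map<String, String> newStores() {
        Map<String, String> stores = new LinkedHashMap<>();
        stores.put("store-a", "changelog-0");
        stores.put("store-b", "changelog-1");
        stores.put("store-c", "changelog-2");
        return stores;
    }

    // Removes entries through the map while a for-each iterator is live.
    // Returns true if the fail-fast iterator threw.
    static boolean unsafeReinitialize(Map<String, String> stores) {
        try {
            for (Map.Entry<String, String> entry : stores.entrySet()) {
                // Structural change that the live iterator does not know about.
                stores.remove(entry.getKey());
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Removes entries through the iterator itself, which keeps it consistent.
    // Returns the number of entries removed.
    static int safeReinitialize(Map<String, String> stores) {
        int removed = 0;
        Iterator<Map.Entry<String, String>> it = stores.entrySet().iterator();
        while (it.hasNext()) {
            it.next();
            it.remove();
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        System.out.println("unsafe loop threw: " + unsafeReinitialize(newStores()));
        Map<String, String> stores = newStores();
        System.out.println("safe loop removed: " + safeReinitialize(stores));
        System.out.println("entries left: " + stores.size());
    }
}
```

Despite its name, {{ConcurrentModificationException}} does not require a second thread: a single thread that changes the map inside its own iteration triggers the same fail-fast check. Iterating over a copy of the key set is another option when the loop body has to call back into code that mutates the map.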