[ https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boyang Chen resolved KAFKA-10010.
---------------------------------
    Resolution: Fixed

> Should make state store registration idempotent
> -----------------------------------------------
>
>                 Key: KAFKA-10010
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10010
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> The current lost-all logic (handleLostAll) does not close standby tasks,
> which can lead to a tricky condition like the one below:
> 1. The standby task was initializing in the `CREATED` state, and a task
> corrupted exception was thrown from registerStateStores.
> 2. The task corrupted exception was caught, and a commit of the
> non-affected tasks was attempted.
> 3. That commit failed due to a task migrated exception.
> 4. handleLostAll did not close the standby task, leaving it in the
> `CREATED` state.
> 5. The next rebalance completed, and the same task was assigned back as a
> standby task.
> 6. Initializing the task again registered its state stores a second time,
> and an IllegalArgumentException was thrown:
> {code:java}
> [2020-05-16T11:56:18-07:00] (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 18:56:18,050] ERROR [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] stream-thread [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) java.lang.IllegalArgumentException: stream-thread [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-0000000007 has already been registered.
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>     at org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>     at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>     at org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>     at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
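
For readers unfamiliar with the term, here is a minimal sketch of what "idempotent registration" in the issue title could mean. It is not the change that resolved this issue and it is not Kafka's ProcessorStateManager: the class IdempotentStoreRegistry and its methods are hypothetical names made up for illustration. The assumption is that registering the same store instance a second time (as in steps 5 and 6 of the scenario) should be a no-op, while a different instance under the same name is still rejected.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified per-task store registry (illustration only,
// not part of Kafka Streams).
final class IdempotentStoreRegistry {
    private final Map<String, Object> stores = new LinkedHashMap<>();

    /**
     * Registers a store under the given name.
     *
     * @return true if the store was newly registered, false if this exact
     *         instance was already registered (the call is then a no-op)
     * @throws IllegalArgumentException if a different instance already
     *         holds the same name
     */
    boolean register(String name, Object store) {
        Objects.requireNonNull(name, "name");
        Objects.requireNonNull(store, "store");
        Object existing = stores.get(name);
        if (existing == store) {
            // Same instance registered again, e.g. a task that stayed in
            // CREATED and is initialized a second time: do nothing.
            return false;
        }
        if (existing != null) {
            throw new IllegalArgumentException(
                "Store " + name + " has already been registered with a different instance.");
        }
        stores.put(name, store);
        return true;
    }

    /** Number of distinct stores currently registered. */
    int size() {
        return stores.size();
    }

    public static void main(String[] args) {
        IdempotentStoreRegistry registry = new IdempotentStoreRegistry();
        Object store = new Object();
        System.out.println(registry.register("KSTREAM-AGGREGATE-STATE-STORE-0000000007", store));
        // Second call with the same instance does not throw.
        System.out.println(registry.register("KSTREAM-AGGREGATE-STATE-STORE-0000000007", store));
    }
}
```

Comparing by instance identity rather than by name alone is a design choice of this sketch: it keeps the guard against two different stores sharing a name while tolerating a repeated initialization of the same task.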