[ 
https://issues.apache.org/jira/browse/KAFKA-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109610#comment-17109610
 ] 

Boyang Chen commented on KAFKA-10010:
-------------------------------------

The gain for keeping the standby task open is not much when the thread is 
already under corruption or task migrated. Adding a de-register logic would 
certainly increase the complexity, which I'm not sure at the moment whether it 
is the correct move, because we need to handle that logic properly with another 
set of try-catch.

> Should close standby task for safety during HandleLostAll
> ---------------------------------------------------------
>
>                 Key: KAFKA-10010
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10010
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> The current lost all logic doesn't close standby task, which could 
> potentially lead to a tricky condition like below:
> 1. The standby task was initializing as `CREATED` state, and task corrupted 
> exception was thrown from registerStateStores
> 2. The task corrupted exception was caught, and do a non-affected task commit
> 3. The task commit failed due to task migrated exception
> 4. The handleLostAll didn't close the standby task, leaving it as CREATED 
> state
> 5. Next rebalance complete, the same task was assigned back as standby task.
> 6. Illegal Argument exception caught :
> {code:java}
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) [2020-05-16 
> 18:56:18,050] ERROR 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> Encountered the following exception during processing and the thread is going 
> to shut down:  (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-16T11:56:18-07:00] 
> (streams-soak-trunk-eos-beta_soak_i-065b27929d3e7014a_streamslog) 
> java.lang.IllegalArgumentException: stream-thread 
> [stream-soak-test-ce5e4abb-24f5-48cd-9608-21c59b6b42a3-StreamThread-1] 
> standby-task [1_2] Store KSTREAM-AGGREGATE-STATE-STORE-0000000007 has already 
> been registered.
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStore(ProcessorStateManager.java:269)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:112)
>         at 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore.init(AbstractRocksDBSegmentedBytesStore.java:191)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.init(ChangeLoggingWindowBytesStore.java:54)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:74)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$init$0(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>         at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:85)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:82)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:89)
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:358)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:664)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:550)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:509)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to