[ https://issues.apache.org/jira/browse/KAFKA-9994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Roesler resolved KAFKA-9994.
---------------------------------
    Resolution: Fixed

> Catch TaskMigrated exception in task corruption code path
> ----------------------------------------------------------
>
>                 Key: KAFKA-9994
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9994
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>
> We have seen a case where the TaskMigrated exception gets thrown from
> taskManager.commit(). This should be prevented by proper catching.
> Looking at the stack trace, the TaskMigrated was thrown from preCommit() call
> inside corrupted task exception commit.
> {code:java}
> [2020-05-14T05:47:25-07:00] (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) [2020-05-14 12:47:25,635] ERROR [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] stream-thread [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] Encountered the following exception during processing and the thread is going to shut down: (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-05-14T05:47:25-07:00] (streams-soak-trunk-eos_soak_i-0b5b559dda7970618_streamslog) org.apache.kafka.streams.errors.TaskMigratedException: Producer got fenced trying to send a record [stream-thread [stream-soak-test-db8d1d42-5677-4a54-a0a1-89b7b0766493-StreamThread-1] task [1_1]]; it means all tasks belonging to this thread should be migrated.
>     at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:216)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:171)
>     at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedWindowBytesStore.log(ChangeLoggingTimestampedWindowBytesStore.java:36)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:112)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:34)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:111)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:91)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:124)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:296)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.flush(WrappedStateStore.java:84)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$flush$4(MeteredWindowStore.java:200)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:804)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:200)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:402)
>     at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:317)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:753)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:573)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:517)
> {code}
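
For readers skimming the archive, the "proper catching" requested in the description can be illustrated with a minimal sketch. It is not the actual patch and does not use the real Kafka Streams classes: the exception types, the `TaskManager` interface, and its methods (`process`, `commit`, `reviveCorruptedTasks`, `handleLostAll`) are hypothetical stand-ins. The only idea it shows is that a commit performed while handling a corrupted task can itself raise a TaskMigrated-style exception, so the migration handler has to enclose the corruption handler as well.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration; these are NOT the real Kafka Streams classes.
final class CorruptionHandlingSketch {

    /** Stand-in for org.apache.kafka.streams.errors.TaskMigratedException. */
    static class TaskMigratedException extends RuntimeException {
        TaskMigratedException(String message) { super(message); }
    }

    /** Stand-in for a "task corrupted" signal raised during processing. */
    static class TaskCorruptedException extends RuntimeException {
        TaskCorruptedException(String message) { super(message); }
    }

    /** Hypothetical, heavily simplified view of a task manager. */
    interface TaskManager {
        void process();
        void commit();
        void reviveCorruptedTasks();
        void handleLostAll();
    }

    /** Runs one loop iteration and returns the steps taken, in order. */
    static List<String> runOnce(TaskManager taskManager) {
        List<String> steps = new ArrayList<>();
        try {
            try {
                taskManager.process();
                steps.add("processed");
            } catch (TaskCorruptedException e) {
                steps.add("corrupted");
                // Committing flushes caches and sends changelog records, so a
                // fenced producer can surface here as TaskMigratedException.
                taskManager.commit();
                steps.add("committed");
                taskManager.reviveCorruptedTasks();
                steps.add("revived");
            }
        } catch (TaskMigratedException e) {
            // The outer catch also covers the commit made inside the
            // corruption handler, so the exception does not escape the loop.
            steps.add("migrated");
            taskManager.handleLostAll();
        }
        return steps;
    }

    public static void main(String[] args) {
        // Simulated scenario: processing reports corruption, then the
        // follow-up commit finds the producer fenced.
        TaskManager fenced = new TaskManager() {
            @Override public void process() { throw new TaskCorruptedException("task 1_1 corrupted"); }
            @Override public void commit() { throw new TaskMigratedException("producer got fenced"); }
            @Override public void reviveCorruptedTasks() { }
            @Override public void handleLostAll() { }
        };
        System.out.println(runOnce(fenced));
    }
}
```

In this simulated scenario the migration is handled inside `runOnce` instead of propagating to the caller, which in the reported case is what shut the stream thread down.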