[ https://issues.apache.org/jira/browse/KAFKA-10616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17217813#comment-17217813 ]
Guozhang Wang commented on KAFKA-10616:
---------------------------------------

I think this is a long-lurking bug caused by https://github.com/apache/kafka/pull/9083. Generally speaking, before we close the topology we should always try to flush the state store manager (we do not actually need to flush the whole store, just the cache). I have this fix piggy-backed in my other PR https://github.com/apache/kafka/pull/8988, but that PR has been dragging on for a long time and I have to rebase it every 3-4 days, so I do not expect it to be merged soon. What I can do is extract that fix from the PR and merge it for 2.7 / 2.6.

> StreamThread killed by "IllegalStateException: The processor is already
> closed"
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-10616
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10616
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.7.0
>
>
> Application is hitting "java.lang.IllegalStateException: The processor is
> already closed". Over the course of about a day, this exception killed 21/100
> of the queries (StreamThreads).
> The (slightly trimmed) stacktrace:
>
> {code:java}
> java.lang.RuntimeException: Caught an exception while closing caching window store for store Aggregate-Aggregate-Materialize
> 	at org.apache.kafka.streams.state.internals.ExceptionUtils.throwSuppressed(ExceptionUtils.java:39)
> 	at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:432)
> 	at org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:527)
> 	at org.apache.kafka.streams.processor.internals.StreamTask.closeDirty(StreamTask.java:499)
> 	at org.apache.kafka.streams.processor.internals.TaskManager.handleLostAll(TaskManager.java:626)
> 	…
> Caused by: java.lang.IllegalStateException: The processor is already closed
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.throwIfClosed(ProcessorNode.java:172)
> 	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:178)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
> 	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
> 	at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
> 	at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:28)
> 	at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$1(MeteredWindowStore.java:110)
> 	at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:118)
> 	at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$initInternal$0(CachingWindowStore.java:93)
> 	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
> 	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:109)
> 	at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:116)
> 	at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$close$1(CachingWindowStore.java:427)
> 	at org.apache.kafka.streams.state.internals.ExceptionUtils.executeAll(ExceptionUtils.java:28)
> 	at org.apache.kafka.streams.state.internals.CachingWindowStore.close(CachingWindowStore.java:426)
> {code}
>
> I'm guessing we close the topology before closing the state stores, so
> records that get flushed during the caching store's close() will run into an
> already-closed processor. During a clean close we should always flush before
> closing anything (during prepareCommit()), but since this was a
> handleLostAll() we would just skip right to suspend() and close the topology.
> Presumably the right thing to do here is to flush the caches before closing
> anything during a dirty close.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
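The close-ordering problem diagnosed above can be illustrated with a small self-contained sketch. All names here (SketchProcessor, SketchCache, DirtyCloseSketch) are hypothetical stand-ins, not Kafka's actual classes: the cache's flush() forwards buffered records through a listener, the way CachingWindowStore's flush listener forwards via ProcessorContext#forward, and the downstream processor throws once it has been closed. Closing the processor before flushing reproduces the IllegalStateException; flushing the cache first, as the proposed fix does, avoids it.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a downstream ProcessorNode: rejects records once closed.
class SketchProcessor {
    private boolean closed = false;
    final List<String> received = new ArrayList<>();

    void process(String record) {
        if (closed) {
            throw new IllegalStateException("The processor is already closed");
        }
        received.add(record);
    }

    void close() { closed = true; }
}

// Stand-in for the caching layer: flush() forwards dirty entries downstream.
class SketchCache {
    private final List<String> dirty = new ArrayList<>();
    private final SketchProcessor downstream;

    SketchCache(SketchProcessor downstream) { this.downstream = downstream; }

    void put(String record) { dirty.add(record); }

    void flush() {
        for (String r : dirty) downstream.process(r);
        dirty.clear();
    }
}

public class DirtyCloseSketch {
    public static void main(String[] args) {
        // Buggy dirty-close order (the handleLostAll path): close the
        // topology first, then close the store, whose flush forwards
        // into the already-closed processor.
        SketchProcessor p1 = new SketchProcessor();
        SketchCache c1 = new SketchCache(p1);
        c1.put("record-1");
        p1.close();          // topology closed first
        try {
            c1.flush();      // store close flushes the cache -> throws
        } catch (IllegalStateException e) {
            System.out.println("buggy order: " + e.getMessage());
        }

        // Proposed order: flush the cache before closing anything.
        SketchProcessor p2 = new SketchProcessor();
        SketchCache c2 = new SketchCache(p2);
        c2.put("record-1");
        c2.flush();          // forwards while the processor is still open
        p2.close();
        System.out.println("fixed order forwarded: " + p2.received);
    }
}
```

This mirrors why only the dirty-close path is affected: a clean close already flushes in prepareCommit() before any processor is closed, so the flush-listener forwards always hit an open node.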