[ https://issues.apache.org/jira/browse/KAFKA-17850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bruno Cadonna resolved KAFKA-17850.
-----------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

> Stop leaking internal exception to StreamsUncaughtExceptionHandler
> ------------------------------------------------------------------
>
>                 Key: KAFKA-17850
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17850
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.9.0
>            Reporter: Bruno Cadonna
>            Assignee: Sebastien Viale
>            Priority: Major
>             Fix For: 4.0.0
>
>
> In a log file of a Streams application I found the following log line:
> {code:java}
> [2024-10-08 16:45:03,846] ERROR [i-0ec21c43346d0de25-StreamThread-2] stream-client [i-0ec21c43346d0de25] Replacing thread in the streams uncaught exception handler (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.internals.FailedProcessingException
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:237)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>     at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:180)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
>     at org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
>     at org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$5(MeteredWindowStore.java:163)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:113)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$init$0(CachingWindowStore.java:86)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:159)
>     at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
>     at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:148)
>     at org.apache.kafka.streams.state.internals.CachingWindowStore.flushCache(CachingWindowStore.java:414)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:79)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:537)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flush(StreamTask.java:423)
>     at org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:447)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:147)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1925)
>     at org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1892)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1382)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1034)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:712)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:672)
> Caused by: org.apache.kafka.streams.StreamsSoakTest$ThrowableValueProcessorSupplier$ThreadReplaceableException
>     at org.apache.kafka.streams.StreamsSoakTest$ThrowableValueProcessorSupplier$KaboomableValueProcessor.process(StreamsSoakTest.java:618)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:180)
>     ... 28 more
> {code}
> That indicates that the internal exception {{FailedProcessingException}} is
> passed to the Streams-specific uncaught exception handler. Streams should not
> pass internal exceptions to a public API that is supposed to be implemented
> by the user since that means the user needs to be aware of an internal
> exception.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)