[ 
https://issues.apache.org/jira/browse/KAFKA-17850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-17850.
-----------------------------------
    Fix Version/s: 4.0.0
       Resolution: Fixed

> Stop leaking internal exception to StreamsUncaughtExceptionHandler
> ------------------------------------------------------------------
>
>                 Key: KAFKA-17850
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17850
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.9.0
>            Reporter: Bruno Cadonna
>            Assignee: Sebastien Viale
>            Priority: Major
>             Fix For: 4.0.0
>
>
> In a log file of a Streams application I found the following log line:
> {code:java}
> [2024-10-08 16:45:03,846] ERROR [i-0ec21c43346d0de25-StreamThread-2] 
> stream-client [i-0ec21c43346d0de25] Replacing thread in the streams uncaught 
> exception handler (org.apache.kafka.streams.KafkaStreams)
> org.apache.kafka.streams.errors.internals.FailedProcessingException
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:237)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:216)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:42)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:180)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
>       at 
> org.apache.kafka.streams.kstream.internals.TimestampedCacheFlushListener.apply(TimestampedCacheFlushListener.java:45)
>       at 
> org.apache.kafka.streams.state.internals.MeteredWindowStore.lambda$setFlushListener$5(MeteredWindowStore.java:163)
>       at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.putAndMaybeForward(CachingWindowStore.java:113)
>       at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.lambda$init$0(CachingWindowStore.java:86)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:159)
>       at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:117)
>       at 
> org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:148)
>       at 
> org.apache.kafka.streams.state.internals.CachingWindowStore.flushCache(CachingWindowStore.java:414)
>       at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.flushCache(WrappedStateStore.java:79)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flushCache(ProcessorStateManager.java:537)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.flush(StreamTask.java:423)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.prepareCommit(StreamTask.java:447)
>       at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.commitTasksAndMaybeUpdateCommittableOffsets(TaskExecutor.java:147)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.commitTasksAndMaybeUpdateCommittableOffsets(TaskManager.java:1925)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.commit(TaskManager.java:1892)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1382)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1034)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:712)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:672)
> Caused by: 
> org.apache.kafka.streams.StreamsSoakTest$ThrowableValueProcessorSupplier$ThreadReplaceableException
>       at 
> org.apache.kafka.streams.StreamsSoakTest$ThrowableValueProcessorSupplier$KaboomableValueProcessor.process(StreamsSoakTest.java:618)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:180)
>       ... 28 more
> {code} 
> This indicates that the internal exception {{FailedProcessingException}} is 
> passed to the Streams-specific uncaught exception handler. Streams should not 
> pass internal exceptions to a public API that users are expected to implement, 
> because doing so forces users to know about an internal exception. 
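
For illustration only, the idea of not leaking an internal wrapper can be sketched as unwrapping the cause before the exception reaches user code. This is a minimal sketch and not the actual Kafka Streams fix: the classes InternalWrapperException and UserProcessingException and the method unwrap are hypothetical stand-ins (for FailedProcessingException and a user-thrown exception), so the example runs without any Kafka dependency.

```java
public class UnwrapInternalException {

    // Hypothetical stand-in for an internal wrapper such as FailedProcessingException.
    static class InternalWrapperException extends RuntimeException {
        InternalWrapperException(Throwable cause) {
            super(cause);
        }
    }

    // Hypothetical stand-in for an exception thrown by user processing code.
    static class UserProcessingException extends RuntimeException {
        UserProcessingException(String message) {
            super(message);
        }
    }

    // Strips internal wrappers so that a user-facing handler only sees the
    // underlying cause. A wrapper without a cause is returned unchanged.
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while (current instanceof InternalWrapperException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable userError = new UserProcessingException("kaboom");
        Throwable wrapped = new InternalWrapperException(userError);

        // The handler would receive the user's exception, not the wrapper.
        Throwable seenByHandler = unwrap(wrapped);
        System.out.println(seenByHandler.getClass().getSimpleName());
    }
}
```

With this approach, a user-implemented handler can branch on its own exception types without having to import or know about anything in an internals package.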



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
