[ https://issues.apache.org/jira/browse/KAFKA-17099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruno Cadonna updated KAFKA-17099:
----------------------------------
    Fix Version/s: 4.0.0

> Improve the process exception logs with the exact processor node name in which processing exceptions occur
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-17099
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17099
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Loïc Greffier
>            Assignee: Loïc Greffier
>            Priority: Minor
>             Fix For: 4.0.0
>
>
> h2. Current Behaviour
> When an exception occurs in a processor node, the task executor does not log the actual processor node in which the exception occurred.
>
> For example, consider the following topology:
>
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [PERSON_TOPIC])
>       --> KSTREAM-PEEK-0000000001
>     Processor: KSTREAM-PEEK-0000000001 (stores: [])
>       --> KSTREAM-MAP-0000000002
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-MAP-0000000002 (stores: [])
>       --> KSTREAM-SINK-0000000003
>       <-- KSTREAM-PEEK-0000000001
>     Sink: KSTREAM-SINK-0000000003 (topic: PERSON_MAP_TOPIC)
>       <-- KSTREAM-MAP-0000000002
>
> When an exception is thrown in the processor KSTREAM-MAP-0000000002, the task executor logs the following:
>
> 2024-07-08T22:17:19.926+02:00 INFO 10552 --- [-StreamThread-1] i.g.l.s.map.app.KafkaStreamsTopology : Received key = 0, value = {"id": 0, "firstName": "Ethan", "lastName": "Moore", "nationality": "CH", "birthDate": "2011-02-21T15:45:12Z"}
> 2024-07-08T22:17:30.082+02:00 ERROR 10552 --- [-StreamThread-1] o.a.k.s.p.internals.TaskExecutor : stream-thread [streams-map-StreamThread-1] Failed to process stream task 0_0 due to the following error:
>
> org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=PERSON_TOPIC, partition=0, offset=0, stacktrace=java.lang.RuntimeException: Something bad happened...
>     at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33)
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215)
>     at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>     at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
>
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:767) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1765) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:807) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579) ~[kafka-streams-3.6.1.jar:na]
> Caused by: java.lang.RuntimeException: Something bad happened...
>     at io.github.loicgreffier.streams.map.app.KafkaStreamsTopology.lambda$topology$1(KafkaStreamsTopology.java:33) ~[classes/:na]
>     at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:46) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:215) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:42) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:154) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:810) ~[kafka-streams-3.6.1.jar:na]
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:741) ~[kafka-streams-3.6.1.jar:na]
>     ... 6 common frames omitted
>
> The first line of the stack trace (processor=KSTREAM-SOURCE-0000000000) suggests that the exception was caught in the source node KSTREAM-SOURCE-0000000000, whereas it actually occurred in KSTREAM-MAP-0000000002.
> h2. Expected Behaviour
> The stack trace should report the exact processor node in which the exception occurred (e.g., KSTREAM-MAP-0000000002).
> h2. Current Limitation
> Processing exceptions are currently caught in [StreamTask#process|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L802], where the exact processor node in which the exception occurred cannot be determined.
> h2. Improvement Proposal
> With the changes brought by KAFKA-16448, processing exceptions will be caught at the processor node level and wrapped into an internal exception named *FailedProcessingException* before being thrown to the stream task.
>
> This change should make it possible to identify the exact processor node in which a processing exception occurs and to propagate its name up to the stream task, where it will be used to build the [StreamsException|https://github.com/apache/kafka/blob/25230b538841a5e7256b1b51725361dd59435901/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L803] that appears in the logs.
>
> This improvement comes from the PR [https://github.com/apache/kafka/pull/16093] and the following discussion: [https://github.com/apache/kafka/pull/16093#issuecomment-2200265168].
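The difference between catching at the task level and wrapping at the node level can be illustrated with a minimal, self-contained Java sketch that does not depend on Kafka Streams. It assumes a simplified model: the hypothetical `Node` class forwards to its children from inside its own `process()` call (mimicking nested forwarding), the hypothetical `FailedProcessingException` below is only an analogue of the internal exception named in the proposal (not Kafka's class), and `runTask` stands in for the top-level catch in `StreamTask#process`. The node names reuse the topology from the issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class NodeNameSketch {

    // Hypothetical analogue of FailedProcessingException: NOT Kafka's class,
    // just a carrier for the name of the node whose logic failed.
    static final class FailedProcessingException extends RuntimeException {
        final String failedNodeName;

        FailedProcessingException(String failedNodeName, RuntimeException cause) {
            super(cause);
            this.failedNodeName = failedNodeName;
        }
    }

    // Hypothetical processor node: applies its logic, then forwards the result to its
    // children from inside its own process() call, so downstream failures unwind through it.
    static final class Node {
        final String name;
        final UnaryOperator<String> logic;
        final boolean wrapExceptions;
        final List<Node> children = new ArrayList<>();

        Node(String name, UnaryOperator<String> logic, boolean wrapExceptions) {
            this.name = name;
            this.logic = logic;
            this.wrapExceptions = wrapExceptions;
        }

        Node to(Node child) {
            children.add(child);
            return child;
        }

        void process(String value) {
            try {
                String result = logic.apply(value);
                for (Node child : children) {
                    child.process(result);
                }
            } catch (FailedProcessingException alreadyWrapped) {
                // A downstream node already recorded its own name: keep the innermost one.
                throw alreadyWrapped;
            } catch (RuntimeException e) {
                if (!wrapExceptions) {
                    throw e; // old behaviour: the exception carries no node name
                }
                throw new FailedProcessingException(name, e);
            }
        }
    }

    // Hypothetical task: catches everything at the top level, like StreamTask#process.
    static String runTask(Node source, String value) {
        try {
            source.process(value);
            return "processed";
        } catch (FailedProcessingException e) {
            return "Exception caught in process. processor=" + e.failedNodeName
                    + ", cause=" + e.getCause().getMessage();
        } catch (RuntimeException e) {
            // Without node-level wrapping, the task only knows the node it invoked.
            return "Exception caught in process. processor=" + source.name
                    + ", cause=" + e.getMessage();
        }
    }

    // Mirrors the topology from the issue: source -> peek -> map (throws) -> sink.
    static Node buildTopology(boolean wrapExceptions) {
        Node source = new Node("KSTREAM-SOURCE-0000000000", v -> v, wrapExceptions);
        Node peek = source.to(new Node("KSTREAM-PEEK-0000000001", v -> v, wrapExceptions));
        Node map = peek.to(new Node("KSTREAM-MAP-0000000002", v -> {
            throw new RuntimeException("Something bad happened...");
        }, wrapExceptions));
        map.to(new Node("KSTREAM-SINK-0000000003", v -> v, wrapExceptions));
        return source;
    }

    public static void main(String[] args) {
        System.out.println("before: " + runTask(buildTopology(false), "person-0"));
        System.out.println("after:  " + runTask(buildTopology(true), "person-0"));
    }
}
```

Running `main` reports `processor=KSTREAM-SOURCE-0000000000` for the unwrapped topology (the symptom described above) and `processor=KSTREAM-MAP-0000000002` once each node wraps its own failure. Rethrowing an already-wrapped exception unchanged is what keeps the innermost (actually failing) node's name, rather than the name of each ancestor the exception unwinds through.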
-- This message was sent by Atlassian Jira (v8.20.10#820010)