ableegoldman commented on code in PR #15790: URL: https://github.com/apache/kafka/pull/15790#discussion_r1619985405
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode<Void, Void, ?, ?> child) {
     public void init(final InternalProcessorContext<Void, Void> context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
   I can't say what its original purpose was, but StreamsException has definitely morphed into a catch-all for exceptions throughout Streams. It's not exclusive to the state of a task, though (that would be ProcessorStateException).

   The nice thing about StreamsException is that you can attach other useful metadata, such as the taskId where the error originated, so I always prefer to just throw the StreamsException. We also know for a fact that StreamsException will be caught and handled properly as it gets bubbled up.

   So I'd go for merging this into a single `catch RuntimeException` block, then wrapping it in a StreamsException. And don't forget to add the task id too! 😄 (you can get it from the processor context)
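   A minimal sketch of the shape suggested above, written as a self-contained example rather than against the actual Kafka classes. `SketchStreamsException`, `prepareOrWrap`, and the string task id are hypothetical stand-ins introduced only for illustration; in the PR the exception would be the real StreamsException and the task id would come from the processor context.

```java
import java.util.Optional;
import java.util.function.Supplier;

public class SinkInitSketch {

    // Hypothetical stand-in for StreamsException, reduced to what this sketch
    // needs: a message, the original cause, and the id of the failing task.
    static class SketchStreamsException extends RuntimeException {
        private final String taskId;

        SketchStreamsException(final String message, final Throwable cause, final String taskId) {
            super(message, cause);
            this.taskId = taskId;
        }

        Optional<String> taskId() {
            return Optional.ofNullable(taskId);
        }
    }

    // Hypothetical helper: runs one serializer-preparation step and converts any
    // RuntimeException into a single wrapper type that keeps the cause and task id.
    static <T> T prepareOrWrap(final Supplier<T> prepare,
                               final String what,
                               final String nodeName,
                               final String taskId) {
        try {
            return prepare.get();
        } catch (final RuntimeException e) {
            throw new SketchStreamsException(
                String.format("Failed to initialize %s serdes for sink node %s", what, nodeName),
                e,
                taskId);
        }
    }

    public static void main(final String[] args) {
        try {
            // Simulate a misconfigured key serde (assumed failure, for illustration only).
            prepareOrWrap(() -> {
                throw new IllegalStateException("no default key serde configured");
            }, "key", "sink-1", "0_0");
        } catch (final SketchStreamsException e) {
            System.out.println(e.getMessage() + " (task " + e.taskId().orElse("unknown") + ")");
        }
    }
}
```

   Passing the caught exception as the cause keeps the original stack trace, which the `new ConfigException(String.format(...))` in the diff above would drop.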