ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1619985405


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode<Void, Void, ?, ?> 
child) {
     public void init(final InternalProcessorContext<Void, Void> context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, 
this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, 
this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key 
serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
   Can't say what it's original purpose was but StreamsException has definitely 
morphed into a catch-all for exceptions throughout Streams. It's definitely not 
exclusive to the state of a task though (that would be 
ProcessorStateException). The nice thing about StreamsException is you can add 
other useful metadata such as the taskId where the error originated, so I 
always prefer to just throw the StreamsException. We also know for a fact that 
StreamsException will be caught and handled properly as it gets bubbled up. So 
I'd go for merging this into a single `catch RuntimeException` block, then wrap 
it in a StreamsException. 
   
   And don't forget to add the task id too! 😄  (you can get it from the 
processor context)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to