Chuckame opened a new pull request, #19816:
URL: https://github.com/apache/kafka/pull/19816

   Currently, building the topology with custom graph nodes fails with a NPE 
because the GraphNode#buildPriority field is null. We can set this value to an 
empiric one (like taking the one from the previous node), but the best is to 
reuse the same method as all the other nodes.
   
   ---
   
   Problematic code:
   
https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L310
   
   NPE here:
   
https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L327
   
   Because of `buildPriority` being null, only set here currently:
   
https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java#L269
   
   --- 
   
   Impact in our codebase:
   ```diff
   internal class ReplayableKTable<K, V>(
       private val replayedTable: KTableImpl<K, *, V>,
   ) : AbstractStream<K, V>(replayedTable as AbstractStream<K, V>) {
       fun replayFromInternalTopic(commandTopicPartitions: Int): KTable<K, V> {
           val replayedTableName = name
           val replayCommandTopic = "$replayedTableName-replay-command"
   
           
builder.internalTopologyBuilder().addInternalTopic(replayCommandTopic, 
InternalTopicProperties(commandTopicPartitions))
           val replayTopicSourceNode = <code to make the replay command graph 
node>
   
           val replayNode = ReplayableTableGraphNode(
               "$replayedTableName-replay",
               replayTopicSourceNode.nodeName(),
               replayedTable,
               replayedTable.valueGetterSupplier(),
               name,
               subTopologySourceNodes
           )
   
   -        replayNode.setBuildPriority(replayTopicSourceNode.buildPriority())
   -        replayTopicSourceNode.addChild(replayNode)
   -        graphNode.addChild(replayNode)
   +        builder.addGraphNode(listOf(replayTopicSourceNode, graphNode), 
replayNode)
   
           return KTableImpl<K, Any, V>(
               name,
               keySerde,
               valueSerde,
               subTopologySourceNodes + replayTopicSourceNode.nodeName(),
               replayedTable.queryableStoreName(),
               replayNode.processor,
               replayNode,
               builder
           )
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to