Copilot commented on code in PR #9598:
URL: https://github.com/apache/seatunnel/pull/9598#discussion_r2218407425


##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java:
##########
@@ -333,10 +333,23 @@ private Set<KafkaSourceSplit> getTopicInfo() throws 
ExecutionException, Interrup
         }
         log.info("Discovered topics: {}", topics);
         Collection<TopicPartition> partitions =
-                
adminClient.describeTopics(topics).all().get().values().stream()
+                
adminClient.describeTopics(topics).allTopicNames().get().values().stream()

Review Comment:
   The method call `allTopicNames()` appears incorrect. Based on the context 
and typical Kafka AdminClient API, this should likely be `all()` to get all 
topic descriptions.
   ```suggestion
                   
adminClient.describeTopics(topics).all().get().values().stream()
   ```



##########
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java:
##########
@@ -333,10 +333,23 @@ private Set<KafkaSourceSplit> getTopicInfo() throws 
ExecutionException, Interrup
         }
         log.info("Discovered topics: {}", topics);
         Collection<TopicPartition> partitions =
-                
adminClient.describeTopics(topics).all().get().values().stream()
+                
adminClient.describeTopics(topics).allTopicNames().get().values().stream()
                         .flatMap(
                                 t ->
                                         t.partitions().stream()
+                                                .filter(
+                                                        partitionInfo -> {
+                                                            if (isStreamingMode
+                                                                    && 
partitionInfo.leader()
+                                                                            == 
null) {
+                                                                log.warn(
+                                                                        
"Partition {} of topic {} has no leader.",

Review Comment:
   The null check logic is incorrect. The condition checks 
`partitionInfo.leader() == null` but should check if the leader ID is -1, as 
mentioned in the PR description. Leader being null and leader being -1 are 
different conditions.
   ```suggestion
                                                                               
.id() == -1) {
                                                                   log.warn(
                                                                           
"Partition {} of topic {} has no valid leader (leader ID is -1).",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to