kirktrue commented on code in PR #17353:
URL: https://github.com/apache/kafka/pull/17353#discussion_r1792629381


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1072,12 +1073,10 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio
             }
 
             try {
-                return applicationEventHandler.addAndGet(listOffsetsEvent)
-                    .entrySet()
-                    .stream()
-                    .collect(Collectors.toMap(
-                        Map.Entry::getKey,
-                        entry -> entry.getValue().buildOffsetAndTimestamp()));
+                Map<TopicPartition, OffsetAndTimestampInternal> offsets = applicationEventHandler.addAndGet(listOffsetsEvent);
+                Map<TopicPartition, OffsetAndTimestamp> results = new HashMap<>(offsets.size());
+                offsets.forEach((k, v) -> results.put(k, v != null ? v.buildOffsetAndTimestamp() : null));

Review Comment:
   > If it's necessary to align the behavior with `ClassicKafkaConsumer`, the other methods like `beginningOffsets` and `endOffsets` might also encounter a similar issue with the `null` value.
   
   Yes, I noticed that, but I thought those didn't need to change...
   
   Interestingly, [`OffsetFetcher.beginningOrEndOffset()`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L194-L210) (used by `ClassicKafkaConsumer`) does _not_ pre-populate the returned hash table with `null`s. As a result, `beginningOffsets()` and `endOffsets()` for the `ClassicKafkaConsumer` don't have to worry about potential `NullPointerException`s, so I'd concluded that `AsyncKafkaConsumer`'s corresponding methods didn't have the same potential for `NullPointerException`s either.
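   
   For reference, the `NullPointerException` risk here comes from `Collectors.toMap`, which stores each entry via `Map.merge` and therefore rejects `null` values. A minimal standalone sketch of the failure, using plain `java.util` types rather than the consumer's actual classes:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   public class ToMapNullDemo {
       public static void main(String[] args) {
           // Simulates a result map that was pre-populated with a null value
           // for a partition that had no matching offset.
           Map<String, Long> offsets = new HashMap<>();
           offsets.put("topic-0", 42L);
           offsets.put("topic-1", null);
   
           // Throws NullPointerException: Collectors.toMap accumulates each
           // entry via Map.merge, which disallows null values.
           Map<String, Long> copied = offsets.entrySet().stream()
               .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
       }
   }
   ```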
   
   _However_, I just double-checked, and it turns out that `AsyncKafkaConsumer` uses much of the same underlying code for `beginningOffsets()` and `endOffsets()` as it does for `offsetsForTimes()`. So the new consumer also pre-populates the returned hash tables for `beginningOffsets()` and `endOffsets()` with `null`, which is inconsistent with the old consumer.
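   
   To make the inconsistency concrete (the map shapes below are hypothetical stand-ins, just to illustrate): `get()` looks identical in both consumers, but `containsKey()`, `size()`, and iteration all differ.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class ResultShapeDemo {
       public static void main(String[] args) {
           // ClassicKafkaConsumer-style result: a partition with no result
           // is simply absent from the map.
           Map<String, Long> classic = new HashMap<>();
   
           // AsyncKafkaConsumer-style result: the partition is present,
           // but mapped to null.
           Map<String, Long> async = new HashMap<>();
           async.put("topic-0", null);
   
           System.out.println(classic.get("topic-0"));         // null
           System.out.println(async.get("topic-0"));           // null
           System.out.println(classic.containsKey("topic-0")); // false
           System.out.println(async.containsKey("topic-0"));   // true
           System.out.println(classic.size() + " vs " + async.size()); // 0 vs 1
       }
   }
   ```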
   
   I'll add some unit tests to trigger these cases and make sure `AsyncKafkaConsumer` is consistent with `ClassicKafkaConsumer`, warts and all.
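   
   A rough sketch of the shape those tests could take, reduced to the plain `java.util` level (the `copyPreservingNulls` helper is a hypothetical stand-in for the real code path, not an actual consumer method):
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertNull;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   
   import java.util.HashMap;
   import java.util.Map;
   import org.junit.jupiter.api.Test;
   
   class NullTolerantCopyTest {
   
       // Hypothetical stand-in for the null-tolerant copy in the diff above.
       static <K, V> Map<K, V> copyPreservingNulls(Map<K, V> source) {
           Map<K, V> results = new HashMap<>(source.size());
           source.forEach(results::put);
           return results;
       }
   
       @Test
       void copyKeepsExplicitNullEntries() {
           Map<String, Long> offsets = new HashMap<>();
           offsets.put("topic-0", 42L);
           offsets.put("topic-1", null); // partition with no matching offset
   
           Map<String, Long> copied = copyPreservingNulls(offsets);
   
           assertEquals(2, copied.size());
           // The key is present but maps to null, rather than throwing.
           assertTrue(copied.containsKey("topic-1"));
           assertNull(copied.get("topic-1"));
       }
   }
   ```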


