kirktrue commented on code in PR #17353: URL: https://github.com/apache/kafka/pull/17353#discussion_r1792629381
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ########## @@ -1072,12 +1073,10 @@ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitio } try { - return applicationEventHandler.addAndGet(listOffsetsEvent) - .entrySet() - .stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().buildOffsetAndTimestamp())); + Map<TopicPartition, OffsetAndTimestampInternal> offsets = applicationEventHandler.addAndGet(listOffsetsEvent); + Map<TopicPartition, OffsetAndTimestamp> results = new HashMap<>(offsets.size()); + offsets.forEach((k, v) -> results.put(k, v != null ? v.buildOffsetAndTimestamp() : null)); Review Comment: > If it's necessary to align the behavior with `ClassicKafkaConsumer`, the other methods like `beginningOffsets` and `endOffsets` might also encounter a similar issue with the `null` value. Yes, I noticed that, but I thought those didn't need to change... Interestingly, [`OffsetFetcher.beginningOrEndOffset()`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L194-L210) (used by `ClassicKafkaConsumer`) does _not_ pre-populate the returned hash table with `null`s. As a result, `beginningOffsets()` and `endOffsets()` for the `ClassicKafkaConsumer` doesn't have to worry about potential `NullPointerExceptions`. So I'd decided that the `AsyncKafkaConsumer`'s corresponding methods also didn't have the same potential for `NullPointerException`s. _However_, I just double-checked, and it turns out that `AsyncKafkaConsumer` is using a lot of the same underlying code for `beginningOffsets()` and `endOffsets()` as it does for `offsetsForTimes()`. So the new consumer is also pre-populating the returned hash tables for `beginningOffsets()` and `endOffsets()` with `null`, which is not consistent with the old consumer. I'll add some unit tests to trigger these cases and make sure `AsyncKafkaConsumer` is consistent with `ClassicKafkaConsumer`, warts and all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org