vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r436208312

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
##########
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
     }
 
     private Map<TopicPartition, Long> committedOffsetForChangelogs(final Set<TopicPartition> partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-

Review comment:
   Ah, now I see it.



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -95,19 +99,65 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
-                                                                             final Admin adminClient) {
-        final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
+    /**
+     * @throws StreamsException if the consumer throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, Long> fetchCommittedOffsets(final Set<TopicPartition> partitions,
+                                                                  final Consumer<byte[], byte[]> consumer) {
+        if (partitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        final Map<TopicPartition, Long> committedOffsets;
         try {
-            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future = adminClient.listOffsets(
-                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
-                .all();
-            endOffsets = future.get();
+            // those which do not have a committed offset would default to 0
+            committedOffsets = consumer.committed(partitions).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
+        } catch (final TimeoutException e) {
+            LOG.warn("The committed offsets request timed out, try increasing the consumer client's default.api.timeout.ms", e);
+            throw e;
+        } catch (final KafkaException e) {
+            LOG.warn("The committed offsets request failed.", e);
+            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
+        }
+
+        return committedOffsets;
+    }
+    public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> fetchEndOffsetsFuture(final Collection<TopicPartition> partitions,
+                                                                                                final Admin adminClient) {
+        return adminClient.listOffsets(
+            partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
+            .all();
+    }
+
+    /**
+     * A helper method that wraps the {@code Future#get} call and rethrows any thrown exception as a StreamsException
+     * @throws StreamsException if the admin client request throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, ListOffsetsResultInfo> getEndOffsets(final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture) {
+        try {
+            return endOffsetsFuture.get();
+        } catch (final TimeoutException e) {
+            LOG.warn("The listOffsets request timed out, try increasing the admin client's default.api.timeout.ms", e);
+            throw e;

Review comment:
   In retrospect, I'm not sure this is possible. The javadoc for Future#get indicates that any exception would be wrapped in an ExecutionException.
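
   To make the concern concrete, here is a minimal sketch of how that timeout would actually surface. It assumes a hypothetical standalone wrapper class (`EndOffsetsSketch`) around the same `getEndOffsets` shape as in the diff above; it is not the PR's code. `KafkaFuture#get` wraps the client-side `org.apache.kafka.common.errors.TimeoutException` in an `ExecutionException`, so it has to be unwrapped from the cause rather than caught directly:

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.streams.errors.StreamsException;

// Hypothetical wrapper class, for illustration only.
public final class EndOffsetsSketch {

    public static Map<TopicPartition, ListOffsetsResultInfo> getEndOffsets(
        final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture) {
        try {
            return endOffsetsFuture.get();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag and surface the failure to the caller.
            Thread.currentThread().interrupt();
            throw new StreamsException("Interrupted while waiting for end offsets", e);
        } catch (final ExecutionException e) {
            // A timed-out listOffsets request arrives here as the cause of the
            // ExecutionException, not as a directly catchable TimeoutException.
            if (e.getCause() instanceof TimeoutException) {
                throw (TimeoutException) e.getCause();
            }
            throw new StreamsException("Failed to retrieve end offsets", e.getCause());
        }
    }
}
```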
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org