ableegoldman commented on a change in pull request #8787: URL: https://github.com/apache/kafka/pull/8787#discussion_r437788584
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java ########## @@ -95,19 +99,65 @@ public static String getTaskProducerClientId(final String threadClientId, final return result; } - public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions, - final Admin adminClient) { - final Map<TopicPartition, ListOffsetsResultInfo> endOffsets; + /** + * @throws StreamsException if the consumer throws an exception + * @throws org.apache.kafka.common.errors.TimeoutException if the request times out + */ + public static Map<TopicPartition, Long> fetchCommittedOffsets(final Set<TopicPartition> partitions, + final Consumer<byte[], byte[]> consumer) { + if (partitions.isEmpty()) { + return Collections.emptyMap(); + } + + final Map<TopicPartition, Long> committedOffsets; try { - final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future = adminClient.listOffsets( - partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))) - .all(); - endOffsets = future.get(); + // those which do not have a committed offset would default to 0 + committedOffsets = consumer.committed(partitions).entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 
0L : e.getValue().offset())); + } catch (final TimeoutException e) { + LOG.warn("The committed offsets request timed out, try increasing the consumer client's default.api.timeout.ms", e); + throw e; + } catch (final KafkaException e) { + LOG.warn("The committed offsets request failed.", e); + throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e); + } + + return committedOffsets; + } + public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> fetchEndOffsetsFuture(final Collection<TopicPartition> partitions, + final Admin adminClient) { + return adminClient.listOffsets( + partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))) + .all(); + } + + /** + * A helper method that wraps the {@code Future#get} call and rethrows any thrown exception as a StreamsException + * @throws StreamsException if the admin client request throws an exception + * @throws org.apache.kafka.common.errors.TimeoutException if the request times out + */ + public static Map<TopicPartition, ListOffsetsResultInfo> getEndOffsets(final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture) { + try { + return endOffsetsFuture.get(); + } catch (final TimeoutException e) { + LOG.warn("The listOffsets request timed out, try increasing the admin client's default.api.timeout.ms", e); + throw e; Review comment: Good catch. Do you think it should still be thrown/treated separately, though? See also my comment in StreamsPartitionAssignor below. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org