ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r437788584



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java
##########
@@ -95,19 +99,65 @@ public static String getTaskProducerClientId(final String threadClientId, final
         return result;
     }
 
-    public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions,
-                                                                             final Admin adminClient) {
-        final Map<TopicPartition, ListOffsetsResultInfo> endOffsets;
+    /**
+     * @throws StreamsException if the consumer throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, Long> fetchCommittedOffsets(final Set<TopicPartition> partitions,
+                                                                  final Consumer<byte[], byte[]> consumer) {
+        if (partitions.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        final Map<TopicPartition, Long> committedOffsets;
         try {
-            final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future =  adminClient.listOffsets(
-                partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
-                                                                                        .all();
-            endOffsets = future.get();
+            // those which do not have a committed offset would default to 0
+            committedOffsets = consumer.committed(partitions).entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
+        } catch (final TimeoutException e) {
+            LOG.warn("The committed offsets request timed out, try increasing the consumer client's default.api.timeout.ms", e);
+            throw e;
+        } catch (final KafkaException e) {
+            LOG.warn("The committed offsets request failed.", e);
+            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
+        }
+
+        return committedOffsets;
+    }
 
+    public static KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> fetchEndOffsetsFuture(final Collection<TopicPartition> partitions,
+                                                                                                final Admin adminClient) {
+        return adminClient.listOffsets(
+            partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest())))
+            .all();
+    }
+
+    /**
+     * A helper method that wraps the {@code Future#get} call and rethrows any thrown exception as a StreamsException
+     * @throws StreamsException if the admin client request throws an exception
+     * @throws org.apache.kafka.common.errors.TimeoutException if the request times out
+     */
+    public static Map<TopicPartition, ListOffsetsResultInfo> getEndOffsets(final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture) {
+        try {
+            return endOffsetsFuture.get();
+        } catch (final TimeoutException e) {
+            LOG.warn("The listOffsets request timed out, try increasing the admin client's default.api.timeout.ms", e);
+            throw e;

Review comment:
       Good catch. Do you think it should still be thrown/treated separately, though? See also my comment in StreamsPartitionAssignor below.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

