PatrickRen commented on a change in pull request #17276:
URL: https://github.com/apache/flink/pull/17276#discussion_r708100492



##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -353,17 +353,19 @@ private void acquireAndSetStoppingOffsets(
             Set<TopicPartition> partitionsStoppingAtCommitted) {
         Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
-        consumer.committed(partitionsStoppingAtCommitted)
-                .forEach(
-                        (tp, offsetAndMetadata) -> {
-                            Preconditions.checkNotNull(
-                                    offsetAndMetadata,
-                                    String.format(
-                                            "Partition %s should stop at committed offset. "
-                                                    + "But there is no committed offset of this partition for group %s",
-                                            tp, groupId));
-                            stoppingOffsets.put(tp, offsetAndMetadata.offset());
-                        });
+        if (!partitionsStoppingAtCommitted.isEmpty()) {

Review comment:
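Thanks for the review, @becketqin! `consumer#committed` checks for `group.id`
in the consumer properties, but users can skip providing `group.id` entirely
when no partition stops at its committed offset. The `isEmpty()` check keeps us
from calling `consumer#committed` in that case.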
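
To make the reasoning concrete, here is a minimal, dependency-free sketch of the guard. It is not the code in this PR: `StoppingOffsetsSketch`, `committedWithoutGroupId`, and `resolveCommittedStoppingOffsets` are hypothetical names, partitions are plain strings instead of `TopicPartition`, and the `IllegalStateException` only stands in for whatever the real consumer does when `group.id` is missing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class StoppingOffsetsSketch {

    /**
     * Hypothetical stand-in for a consumer configured without group.id:
     * any committed-offset lookup fails, even for an empty partition set.
     */
    public static Map<String, Long> committedWithoutGroupId(Set<String> partitions) {
        throw new IllegalStateException("group.id is required to look up committed offsets");
    }

    /**
     * Resolves stopping offsets for partitions that stop at their committed offset.
     * The lookup is only invoked when at least one such partition exists.
     */
    public static Map<String, Long> resolveCommittedStoppingOffsets(
            Set<String> partitionsStoppingAtCommitted,
            Function<Set<String>, Map<String, Long>> committedLookup) {
        Map<String, Long> stoppingOffsets = new HashMap<>();
        if (!partitionsStoppingAtCommitted.isEmpty()) {
            committedLookup
                    .apply(partitionsStoppingAtCommitted)
                    .forEach(
                            (tp, offset) -> {
                                if (offset == null) {
                                    // Mirrors the null check in the original hunk.
                                    throw new NullPointerException(
                                            "Partition " + tp + " has no committed offset");
                                }
                                stoppingOffsets.put(tp, offset);
                            });
        }
        return stoppingOffsets;
    }

    public static void main(String[] args) {
        // No partition stops at a committed offset, so the failing lookup is never called.
        Map<String, Long> none =
                resolveCommittedStoppingOffsets(
                        Collections.emptySet(), StoppingOffsetsSketch::committedWithoutGroupId);
        System.out.println(none.isEmpty());

        // With a partition to resolve, the lookup runs and its offset is recorded.
        Map<String, Long> one =
                resolveCommittedStoppingOffsets(
                        Collections.singleton("topic-0"),
                        ps -> Collections.singletonMap("topic-0", 42L));
        System.out.println(one.get("topic-0"));
    }
}
```

Without the `isEmpty()` check, the first call in `main` would hit the failing lookup even though there is nothing to resolve, which is the situation a user without `group.id` would run into.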



