PatrickRen commented on a change in pull request #17276: URL: https://github.com/apache/flink/pull/17276#discussion_r708100492
##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -353,17 +353,19 @@ private void acquireAndSetStoppingOffsets(
             Set<TopicPartition> partitionsStoppingAtCommitted) {
         Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
-        consumer.committed(partitionsStoppingAtCommitted)
-                .forEach(
-                        (tp, offsetAndMetadata) -> {
-                            Preconditions.checkNotNull(
-                                    offsetAndMetadata,
-                                    String.format(
-                                            "Partition %s should stop at committed offset. "
-                                                    + "But there is no committed offset of this partition for group %s",
-                                            tp, groupId));
-                            stoppingOffsets.put(tp, offsetAndMetadata.offset());
-                        });
+        if (!partitionsStoppingAtCommitted.isEmpty()) {

Review comment:
Thanks for the review, @becketqin! `consumer#committed` checks for `group.id` in the properties, but users can skip providing `group.id` entirely when no partition stops at its committed offset, so the call is now made only when `partitionsStoppingAtCommitted` is non-empty.
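To illustrate the reasoning in the review comment, here is a minimal, dependency-free sketch of the guard. It is not the code from the PR: `StoppingOffsetsSketch`, `CommittedOffsetLookup`, `lookupFor`, and `resolveStoppingOffsets` are hypothetical names, partitions are modelled as plain strings rather than `TopicPartition`, and the `IllegalStateException` only stands in for whatever the real consumer raises when `group.id` is missing.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class StoppingOffsetsSketch {

    /** Hypothetical stand-in for the committed-offset lookup; not the Kafka API. */
    interface CommittedOffsetLookup {
        Map<String, Long> committed(Set<String> partitions);
    }

    /**
     * Builds a stand-in lookup that rejects every call when no group id is
     * configured, mimicking the group.id check described in the review comment.
     */
    static CommittedOffsetLookup lookupFor(String groupId, Map<String, Long> committedOffsets) {
        return partitions -> {
            if (groupId == null) {
                // Assumption: the real consumer fails in some comparable way.
                throw new IllegalStateException("group.id is required to fetch committed offsets");
            }
            Map<String, Long> result = new HashMap<>();
            for (String tp : partitions) {
                // A partition without a committed offset maps to null.
                result.put(tp, committedOffsets.get(tp));
            }
            return result;
        };
    }

    /** Queries committed offsets only when at least one partition needs them. */
    static Map<String, Long> resolveStoppingOffsets(
            CommittedOffsetLookup lookup, String groupId, Set<String> partitionsStoppingAtCommitted) {
        Map<String, Long> stoppingOffsets = new HashMap<>();
        if (!partitionsStoppingAtCommitted.isEmpty()) {
            lookup.committed(partitionsStoppingAtCommitted)
                    .forEach(
                            (tp, offset) -> {
                                Objects.requireNonNull(
                                        offset,
                                        String.format(
                                                "Partition %s should stop at committed offset. "
                                                        + "But there is no committed offset of this partition for group %s",
                                                tp, groupId));
                                stoppingOffsets.put(tp, offset);
                            });
        }
        return stoppingOffsets;
    }

    public static void main(String[] args) {
        // No group id and no partition stopping at a committed offset: the lookup is never called.
        CommittedOffsetLookup noGroup = lookupFor(null, new HashMap<>());
        System.out.println(resolveStoppingOffsets(noGroup, null, Collections.emptySet()));

        // With a group id and a committed offset, the offset is picked up as the stopping offset.
        CommittedOffsetLookup withGroup =
                lookupFor("my-group", Collections.singletonMap("topic-0", 42L));
        System.out.println(
                resolveStoppingOffsets(withGroup, "my-group", Collections.singleton("topic-0")));
    }
}
```

With the emptiness check in place, a job configured without `group.id` never reaches the lookup unless some partition actually stops at its committed offset. In that case the failure is still surfaced rather than hidden.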