becketqin commented on a change in pull request #17276:
URL: https://github.com/apache/flink/pull/17276#discussion_r708088425

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -353,17 +353,19 @@ private void acquireAndSetStoppingOffsets(
             Set<TopicPartition> partitionsStoppingAtCommitted) {
         Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
-        consumer.committed(partitionsStoppingAtCommitted)
-                .forEach(
-                        (tp, offsetAndMetadata) -> {
-                            Preconditions.checkNotNull(
-                                    offsetAndMetadata,
-                                    String.format(
-                                            "Partition %s should stop at committed offset. "
-                                                    + "But there is no committed offset of this partition for group %s",
-                                            tp, groupId));
-                            stoppingOffsets.put(tp, offsetAndMetadata.offset());
-                        });
+        if (!partitionsStoppingAtCommitted.isEmpty()) {

Review comment:
Why is this needed?

##########
File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -481,5 +489,24 @@ private void sanityCheck() {
                 "No subscribe mode is specified, "
                         + "should be one of topics, topic pattern and partition set.");
         checkNotNull(deserializationSchema, "Deserialization schema is required but not provided.");
+        // Check consumer group ID
+        checkState(
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || !offsetCommitEnabledManually(),
+                String.format(
+                        "Property %s is required when offset commit is enabled",
+                        ConsumerConfig.GROUP_ID_CONFIG));
+    }

Review comment:
Ideally, we should probably also throw an exception if the committedOffsets initializer is used without a group id being specified. This is a little tricky because `ReaderHandledOffsetsInitializer` is package private. One possible solution is to have an `OffsetInitializerValidator` interface. If an `OffsetInitializer` needs to be validated, it can implement that interface, and the `KafkaSourceBuilder` will then validate the properties against that `OffsetInitializer`.
At the beginning this interface could be private. We may consider exposing it to end users later if that turns out to be necessary.
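
To make the suggestion concrete, here is a rough, self-contained sketch of the idea. It is only an illustration, not the actual connector code. The `OffsetInitializer` interface below is an empty stand-in, `CommittedOffsetsInitializer` and `EarliestOffsetsInitializer` are hypothetical example classes, and the `validate(Properties)` signature and the `validateOffsetInitializer` helper are assumptions about what the builder's `sanityCheck()` might call. The literal `"group.id"` is the value of `ConsumerConfig.GROUP_ID_CONFIG`.

```java
import java.util.Properties;

public class OffsetInitializerValidatorSketch {

    /** Empty stand-in for the connector's offsets initializer interface. */
    interface OffsetInitializer {}

    /** Hypothetical opt-in interface; the method signature is an assumption. */
    interface OffsetInitializerValidator {
        /** Throws IllegalStateException if the properties cannot support this initializer. */
        void validate(Properties kafkaSourceProperties);
    }

    /** Hypothetical initializer that reads committed offsets and so needs a group id. */
    static final class CommittedOffsetsInitializer
            implements OffsetInitializer, OffsetInitializerValidator {
        @Override
        public void validate(Properties kafkaSourceProperties) {
            // "group.id" is the value of ConsumerConfig.GROUP_ID_CONFIG in the Kafka client.
            if (!kafkaSourceProperties.containsKey("group.id")) {
                throw new IllegalStateException(
                        "Property group.id is required when committed offsets are used "
                                + "by the offsets initializer");
            }
        }
    }

    /** Hypothetical initializer with no property requirements; it does not opt in. */
    static final class EarliestOffsetsInitializer implements OffsetInitializer {}

    /** What the builder's sanity check could do: validate only initializers that opt in. */
    static void validateOffsetInitializer(OffsetInitializer initializer, Properties props) {
        if (initializer instanceof OffsetInitializerValidator) {
            ((OffsetInitializerValidator) initializer).validate(props);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();

        // No group id, but this initializer does not require one.
        validateOffsetInitializer(new EarliestOffsetsInitializer(), props);

        // No group id and committed offsets requested: rejected at build time.
        try {
            validateOffsetInitializer(new CommittedOffsetsInitializer(), props);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        // With a group id the same initializer passes validation.
        props.setProperty("group.id", "my-group");
        validateOffsetInitializer(new CommittedOffsetsInitializer(), props);
    }
}
```

With this shape the builder never needs to reference the package-private initializer class directly. It only checks whether the configured initializer implements the validator interface, which is also what would make it straightforward to expose the interface to users later.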