becketqin commented on a change in pull request #17276:
URL: https://github.com/apache/flink/pull/17276#discussion_r708088425



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaPartitionSplitReader.java
##########
@@ -353,17 +353,19 @@ private void acquireAndSetStoppingOffsets(
             Set<TopicPartition> partitionsStoppingAtCommitted) {
         Map<TopicPartition, Long> endOffset = 
consumer.endOffsets(partitionsStoppingAtLatest);
         stoppingOffsets.putAll(endOffset);
-        consumer.committed(partitionsStoppingAtCommitted)
-                .forEach(
-                        (tp, offsetAndMetadata) -> {
-                            Preconditions.checkNotNull(
-                                    offsetAndMetadata,
-                                    String.format(
-                                            "Partition %s should stop at 
committed offset. "
-                                                    + "But there is no 
committed offset of this partition for group %s",
-                                            tp, groupId));
-                            stoppingOffsets.put(tp, 
offsetAndMetadata.offset());
-                        });
+        if (!partitionsStoppingAtCommitted.isEmpty()) {

Review comment:
       Why is this needed?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
##########
@@ -481,5 +489,24 @@ private void sanityCheck() {
                 "No subscribe mode is specified, "
                         + "should be one of topics, topic pattern and 
partition set.");
         checkNotNull(deserializationSchema, "Deserialization schema is 
required but not provided.");
+        // Check consumer group ID
+        checkState(
+                props.containsKey(ConsumerConfig.GROUP_ID_CONFIG) || 
!offsetCommitEnabledManually(),
+                String.format(
+                        "Property %s is required when offset commit is 
enabled",
+                        ConsumerConfig.GROUP_ID_CONFIG));
+    }

Review comment:
       Ideally, we should probably also throw exception if the committedOffsets 
initializer is used when group id was not specified. It is a little tricky 
because `ReaderHandledOffsetsInitializerader` is package private. 
   
   One possible solution is to have an `OffsetInitializerValidator` interface. 
If an `OffsetInitializer` needs to be validated, it can implement that 
interface and the `KafkaSourceBuilder` will then validate the properties 
against that `OffsetInitializer`. At the beginning this interface could be 
private. We may consider exposing it to end users later if that turns out to be 
necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to