yihua commented on code in PR #10987:
URL: https://github.com/apache/hudi/pull/10987#discussion_r1558846954


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -331,24 +331,35 @@ private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String t
 
   /**
    * Fetch checkpoint offsets for each partition.
-   * @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
+   *
+   * @param consumer          instance of {@link KafkaConsumer} to fetch offsets from.
    * @param lastCheckpointStr last checkpoint string.
-   * @param topicPartitions set of topic partitions.
+   * @param topicPartitions   set of topic partitions.
    * @return a map of Topic partitions to offsets.
    */
   private Map<TopicPartition, Long> fetchValidOffsets(KafkaConsumer consumer,
-                                                        Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
+                                                      Option<String> lastCheckpointStr, Set<TopicPartition> topicPartitions) {
     Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
     Map<TopicPartition, Long> checkpointOffsets = CheckpointUtils.strToOffsets(lastCheckpointStr.get());
-    boolean isCheckpointOutOfBounds = checkpointOffsets.entrySet().stream()
-        .anyMatch(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()));
+    List<TopicPartition> outOfBoundPartitionList = checkpointOffsets.entrySet().stream()
+        .filter(offset -> offset.getValue() < earliestOffsets.get(offset.getKey()))
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toList());
+    boolean isCheckpointOutOfBounds = !outOfBoundPartitionList.isEmpty();
+
     if (isCheckpointOutOfBounds) {
+      String outOfBoundOffsets = outOfBoundPartitionList.stream()
+          .map(p -> p.toString() + ":{checkpoint=" + checkpointOffsets.get(p)
+              + ",earliestOffset=" + earliestOffsets.get(p) + "}")
+          .collect(Collectors.joining(","));
+      String message = "Some data may have been lost because they are not available in Kafka any more;"
+          + " either the data was aged out by Kafka or the topic may have been deleted before all the data in the topic was processed. "
+          + "Kafka partitions that have out-of-bound checkpoints: " + outOfBoundOffsets + " .";
+

Review Comment:
   I think we can keep the message verbose, as it is intended for the user to read. I adjusted the test to also check the new prefix.
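For reference, here is a minimal, self-contained sketch of the logic in the hunk above: collect the partitions whose checkpointed offset is below the earliest offset still available, then build the verbose user-facing message. It assumes plain `String` keys such as `"topic-0"` in place of Kafka's `TopicPartition` so it runs without a Kafka dependency; the class name `OutOfBoundCheckSketch` and its method names are hypothetical, and the null guard for partitions missing from the earliest-offset map is an added assumption that is not in the diff.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-alone sketch; String keys stand in for TopicPartition.
public class OutOfBoundCheckSketch {

  // Returns partitions whose checkpoint offset is older than the earliest retained offset.
  public static List<String> outOfBoundPartitions(Map<String, Long> checkpointOffsets,
                                                  Map<String, Long> earliestOffsets) {
    return checkpointOffsets.entrySet().stream()
        // Assumption (not in the diff): skip partitions with no known earliest offset
        // instead of unboxing a null Long.
        .filter(e -> earliestOffsets.get(e.getKey()) != null
            && e.getValue() < earliestOffsets.get(e.getKey()))
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  // Builds a message with the same wording and per-partition detail format as the diff.
  public static String buildMessage(List<String> partitions,
                                    Map<String, Long> checkpointOffsets,
                                    Map<String, Long> earliestOffsets) {
    String details = partitions.stream()
        .map(p -> p + ":{checkpoint=" + checkpointOffsets.get(p)
            + ",earliestOffset=" + earliestOffsets.get(p) + "}")
        .collect(Collectors.joining(","));
    return "Some data may have been lost because they are not available in Kafka any more;"
        + " either the data was aged out by Kafka or the topic may have been deleted"
        + " before all the data in the topic was processed. "
        + "Kafka partitions that have out-of-bound checkpoints: " + details + " .";
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps insertion order so the output is deterministic.
    Map<String, Long> checkpoint = new LinkedHashMap<>();
    checkpoint.put("topic-0", 5L);
    checkpoint.put("topic-1", 100L);
    Map<String, Long> earliest = new LinkedHashMap<>();
    earliest.put("topic-0", 10L);
    earliest.put("topic-1", 50L);

    List<String> outOfBound = outOfBoundPartitions(checkpoint, earliest);
    System.out.println(outOfBound); // prints [topic-0]
    if (!outOfBound.isEmpty()) {
      System.out.println(buildMessage(outOfBound, checkpoint, earliest));
    }
  }
}
```

Collecting the offending partitions into a list, rather than short-circuiting with `anyMatch`, is what makes it possible to report each partition's checkpoint and earliest offset in the message; a test can then assert on the message prefix with `startsWith`.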


