This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new bd0f08ee04c3 [MINOR] Minor improvements to DFSTestSuitePathSelector 
for easier debugging (#18244)
bd0f08ee04c3 is described below

commit bd0f08ee04c3be5a0898288d9e3c07c72716b097
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Mar 13 07:49:13 2026 +0530

    [MINOR] Minor improvements to DFSTestSuitePathSelector for easier debugging 
(#18244)
    
    
    
    ---------
    
    Co-authored-by: Lokesh Jain <[email protected]>
---
 .../test-suite/deltastreamer-async-clustering.yaml | 49 ++++++++++++++++++++
 .../test-suite/deltastreamer-async-compaction.yaml | 53 ++++++++++++++++++++++
 docker/setup_demo.sh                               |  6 +--
 hudi-integ-test/README.md                          |  4 +-
 .../helpers/DFSTestSuitePathSelector.java          | 25 +++++-----
 5 files changed, 122 insertions(+), 15 deletions(-)

diff --git a/docker/demo/config/test-suite/deltastreamer-async-clustering.yaml 
b/docker/demo/config/test-suite/deltastreamer-async-clustering.yaml
new file mode 100644
index 000000000000..3239932995f7
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-async-clustering.yaml
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: deltastreamer-async-clustering.yaml
+dag_rounds: 6
+dag_intermittent_delay_mins: 1
+dag_content:
+  first_bulk_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 5
+      repeat_count: 5
+      num_records_insert: 1000
+    type: BulkInsertNode
+    deps: none
+  second_bulk_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 5
+      repeat_count: 5
+      num_records_insert: 1000
+    deps: first_bulk_insert
+    type: BulkInsertNode
+  third_bulk_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 5
+      repeat_count: 5
+      num_records_insert: 300
+    deps: second_bulk_insert
+    type: BulkInsertNode
+  first_validate:
+    config:
+      validate_hive: false
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
+    type: ValidateDatasetNode
+    deps: third_bulk_insert
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/deltastreamer-async-compaction.yaml 
b/docker/demo/config/test-suite/deltastreamer-async-compaction.yaml
new file mode 100644
index 000000000000..7ea769b148f9
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-async-compaction.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: deltastreamer-async-compaction.yaml
+dag_rounds: 6
+dag_intermittent_delay_mins: 1
+dag_content:
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 5
+      repeat_count: 5
+      num_records_insert: 10000
+    type: BulkInsertNode
+    deps: none
+  first_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 5
+      num_records_insert: 300
+      repeat_count: 5
+      num_records_upsert: 2000
+      num_partitions_upsert: 5
+    type: UpsertNode
+    deps: first_insert
+  second_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 5
+      num_records_insert: 300
+      repeat_count: 5
+      num_records_upsert: 2000
+      num_partitions_upsert: 5
+    type: UpsertNode
+    deps: first_upsert
+  first_validate:
+    config:
+      validate_hive: false
+      max_wait_time_for_deltastreamer_catch_up_ms: 600000
+    type: ValidateDatasetNode
+    deps: second_upsert
\ No newline at end of file
diff --git a/docker/setup_demo.sh b/docker/setup_demo.sh
index d183086d26c7..79ab01f6deba 100755
--- a/docker/setup_demo.sh
+++ b/docker/setup_demo.sh
@@ -24,13 +24,13 @@ if [ "$HUDI_DEMO_ENV" = "--mac-aarch64" ]; then
   COMPOSE_FILE_NAME="docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml"
 fi
 # restart cluster
-HUDI_WS=${WS_ROOT} docker compose down -f 
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME}
+HUDI_WS=${WS_ROOT} docker compose -f 
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} down
 if [ "$HUDI_DEMO_ENV" != "dev" ]; then
   echo "Pulling docker demo images ..."
-  HUDI_WS=${WS_ROOT} docker compose -f 
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} pull
+  HUDI_WS=${WS_ROOT} docker compose -f 
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME}  pull
 fi
 sleep 5
-HUDI_WS=${WS_ROOT} docker compose up -f 
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME}  -d
+HUDI_WS=${WS_ROOT} docker compose -f 
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up  -d
 sleep 15
 
 docker exec -it adhoc-1 /bin/bash 
/var/hoodie/ws/docker/demo/setup_demo_container.sh
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index c64a1b12f4eb..410dc6fab712 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -698,7 +698,9 @@ Here is the full command:
 --min-sync-interval-seconds 20
 ```
 
-We can use any yaml and properties file w/ above spark-submit command to test 
deltastreamer w/ async table services. 
+We can use any yaml and properties file w/ above spark-submit command to test 
deltastreamer w/ async table services.
+For running async clustering with deltastreamer, the following property can be 
set: hoodie.clustering.async.enabled=true.
+Similarly, for running async compaction, set hoodie.parquet.small.file.limit=0 
so that log files are generated.
 
 ## Automated tests for N no of yamls in Local Docker environment
 
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index 70026aa5f7fb..1ea145664742 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -58,15 +59,14 @@ public class DFSTestSuitePathSelector extends 
DFSPathSelector {
   public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
       Option<String> lastCheckpointStr, long sourceLimit) {
 
-    Integer lastBatchId;
-    Integer nextBatchId;
+    Long lastBatchId;
+    Long nextBatchId;
     try {
-      if (lastCheckpointStr.isPresent()) {
-        lastBatchId = Integer.parseInt(lastCheckpointStr.get());
+      if (lastCheckpointStr.isPresent() && (lastBatchId = 
Long.parseLong(lastCheckpointStr.get())) != Long.MIN_VALUE) {
         nextBatchId = lastBatchId + 1;
       } else {
-        lastBatchId = 0;
-        nextBatchId = 1;
+        lastBatchId = 0L;
+        nextBatchId = 1L;
       }
 
       // obtain all eligible files for the batch
@@ -76,20 +76,22 @@ public class DFSTestSuitePathSelector extends 
DFSPathSelector {
       // Say input data is as follow input/1, input/2, input/5 since 3,4 was 
rolled back and 5 is new generated data
       // checkpoint from the latest commit metadata will be 2 since 3,4 has 
been rolled back. We need to set the
       // next batch id correctly as 5 instead of 3
+      final long fLastBatchId = lastBatchId;
       Option<String> correctBatchIdDueToRollback = 
Option.fromJavaOptional(Arrays.stream(fileStatuses)
           .map(f -> 
f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1])
-          .filter(bid1 -> Integer.parseInt(bid1) > lastBatchId)
-          .min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1), 
Integer.parseInt(bid2))));
-      if (correctBatchIdDueToRollback.isPresent() && 
Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) {
-        nextBatchId = Integer.parseInt(correctBatchIdDueToRollback.get());
+          .filter(bid1 -> Long.parseLong(bid1) > fLastBatchId)
+          .min(Comparator.comparingLong(Long::parseLong)));
+      if (correctBatchIdDueToRollback.isPresent() && 
Long.parseLong(correctBatchIdDueToRollback.get()) > nextBatchId) {
+        nextBatchId = Long.parseLong(correctBatchIdDueToRollback.get());
       }
       log.info("Using DFSTestSuitePathSelector, checkpoint: " + 
lastCheckpointStr + " sourceLimit: " + sourceLimit
           + " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
+      log.info("Using DFSTestSuitePathSelector, all input files: " + 
Arrays.toString(fileStatuses));
       for (FileStatus fileStatus : fileStatuses) {
         if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
             .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
           continue;
-        } else if (Integer.parseInt(fileStatus.getPath().getName()) > 
lastBatchId && Integer.parseInt(fileStatus.getPath()
+        } else if (Long.parseLong(fileStatus.getPath().getName()) > 
lastBatchId && Long.parseLong(fileStatus.getPath()
             .getName()) <= nextBatchId) {
           RemoteIterator<LocatedFileStatus> files = 
fs.listFiles(fileStatus.getPath(), true);
           while (files.hasNext()) {
@@ -98,6 +100,7 @@ public class DFSTestSuitePathSelector extends 
DFSPathSelector {
         }
       }
 
+      log.info("Using DFSTestSuitePathSelector, eligible files: " + 
eligibleFiles);
       // no data to readAvro
       if (eligibleFiles.size() == 0) {
         return new ImmutablePair<>(Option.empty(),

Reply via email to