This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new bd0f08ee04c3 [MINOR] Minor improvements to DFSTestSuitePathSelector
for easier debugging (#18244)
bd0f08ee04c3 is described below
commit bd0f08ee04c3be5a0898288d9e3c07c72716b097
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Mar 13 07:49:13 2026 +0530
[MINOR] Minor improvements to DFSTestSuitePathSelector for easier debugging
(#18244)
---------
Co-authored-by: Lokesh Jain <[email protected]>
---
.../test-suite/deltastreamer-async-clustering.yaml | 49 ++++++++++++++++++++
.../test-suite/deltastreamer-async-compaction.yaml | 53 ++++++++++++++++++++++
docker/setup_demo.sh | 6 +--
hudi-integ-test/README.md | 4 +-
.../helpers/DFSTestSuitePathSelector.java | 25 +++++-----
5 files changed, 122 insertions(+), 15 deletions(-)
diff --git a/docker/demo/config/test-suite/deltastreamer-async-clustering.yaml
b/docker/demo/config/test-suite/deltastreamer-async-clustering.yaml
new file mode 100644
index 000000000000..3239932995f7
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-async-clustering.yaml
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: deltastreamer-async-clustering.yaml
+dag_rounds: 6
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_bulk_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ repeat_count: 5
+ num_records_insert: 1000
+ type: BulkInsertNode
+ deps: none
+ second_bulk_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ repeat_count: 5
+ num_records_insert: 1000
+ deps: first_bulk_insert
+ type: BulkInsertNode
+ third_bulk_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ repeat_count: 5
+ num_records_insert: 300
+ deps: second_bulk_insert
+ type: BulkInsertNode
+ first_validate:
+ config:
+ validate_hive: false
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
+ type: ValidateDatasetNode
+ deps: third_bulk_insert
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/deltastreamer-async-compaction.yaml
b/docker/demo/config/test-suite/deltastreamer-async-compaction.yaml
new file mode 100644
index 000000000000..7ea769b148f9
--- /dev/null
+++ b/docker/demo/config/test-suite/deltastreamer-async-compaction.yaml
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: deltastreamer-async-compaction.yaml
+dag_rounds: 6
+dag_intermittent_delay_mins: 1
+dag_content:
+ first_insert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ repeat_count: 5
+ num_records_insert: 10000
+ type: BulkInsertNode
+ deps: none
+ first_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ num_records_insert: 300
+ repeat_count: 5
+ num_records_upsert: 2000
+ num_partitions_upsert: 5
+ type: UpsertNode
+ deps: first_insert
+ second_upsert:
+ config:
+ record_size: 1000
+ num_partitions_insert: 5
+ num_records_insert: 300
+ repeat_count: 5
+ num_records_upsert: 2000
+ num_partitions_upsert: 5
+ type: UpsertNode
+ deps: first_upsert
+ first_validate:
+ config:
+ validate_hive: false
+ max_wait_time_for_deltastreamer_catch_up_ms: 600000
+ type: ValidateDatasetNode
+ deps: second_upsert
\ No newline at end of file
diff --git a/docker/setup_demo.sh b/docker/setup_demo.sh
index d183086d26c7..79ab01f6deba 100755
--- a/docker/setup_demo.sh
+++ b/docker/setup_demo.sh
@@ -24,13 +24,13 @@ if [ "$HUDI_DEMO_ENV" = "--mac-aarch64" ]; then
COMPOSE_FILE_NAME="docker-compose_hadoop284_hive233_spark244_mac_aarch64.yml"
fi
# restart cluster
-HUDI_WS=${WS_ROOT} docker compose down -f
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME}
+HUDI_WS=${WS_ROOT} docker compose -f
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} down
if [ "$HUDI_DEMO_ENV" != "dev" ]; then
echo "Pulling docker demo images ..."
- HUDI_WS=${WS_ROOT} docker compose -f
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} pull
+ HUDI_WS=${WS_ROOT} docker compose -f
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} pull
fi
sleep 5
-HUDI_WS=${WS_ROOT} docker compose up -f
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} -d
+HUDI_WS=${WS_ROOT} docker compose -f
${SCRIPT_PATH}/compose/${COMPOSE_FILE_NAME} up -d
sleep 15
docker exec -it adhoc-1 /bin/bash
/var/hoodie/ws/docker/demo/setup_demo_container.sh
diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md
index c64a1b12f4eb..410dc6fab712 100644
--- a/hudi-integ-test/README.md
+++ b/hudi-integ-test/README.md
@@ -698,7 +698,9 @@ Here is the full command:
--min-sync-interval-seconds 20
```
-We can use any yaml and properties file w/ above spark-submit command to test
deltastreamer w/ async table services.
+We can use any yaml and properties file w/ above spark-submit command to test
deltastreamer w/ async table services.
+For running async clustering with deltastreamer, the following property can be
set: hoodie.clustering.async.enabled=true.
+Similarly, for running async compaction, set hoodie.parquet.small.file.limit=0
so that log files are generated.
## Automated tests for N no of yamls in Local Docker environment
diff --git
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
index 70026aa5f7fb..1ea145664742 100644
---
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
+++
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@@ -58,15 +59,14 @@ public class DFSTestSuitePathSelector extends
DFSPathSelector {
public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(
Option<String> lastCheckpointStr, long sourceLimit) {
- Integer lastBatchId;
- Integer nextBatchId;
+ Long lastBatchId;
+ Long nextBatchId;
try {
- if (lastCheckpointStr.isPresent()) {
- lastBatchId = Integer.parseInt(lastCheckpointStr.get());
+ if (lastCheckpointStr.isPresent() && (lastBatchId =
Long.parseLong(lastCheckpointStr.get())) != Long.MIN_VALUE) {
nextBatchId = lastBatchId + 1;
} else {
- lastBatchId = 0;
- nextBatchId = 1;
+ lastBatchId = 0L;
+ nextBatchId = 1L;
}
// obtain all eligible files for the batch
@@ -76,20 +76,22 @@ public class DFSTestSuitePathSelector extends
DFSPathSelector {
// Say input data is as follow input/1, input/2, input/5 since 3,4 was
rolled back and 5 is new generated data
// checkpoint from the latest commit metadata will be 2 since 3,4 has
been rolled back. We need to set the
// next batch id correctly as 5 instead of 3
+ final long fLastBatchId = lastBatchId;
Option<String> correctBatchIdDueToRollback =
Option.fromJavaOptional(Arrays.stream(fileStatuses)
.map(f ->
f.getPath().toString().split("/")[f.getPath().toString().split("/").length - 1])
- .filter(bid1 -> Integer.parseInt(bid1) > lastBatchId)
- .min((bid1, bid2) -> Integer.min(Integer.parseInt(bid1),
Integer.parseInt(bid2))));
- if (correctBatchIdDueToRollback.isPresent() &&
Integer.parseInt(correctBatchIdDueToRollback.get()) > nextBatchId) {
- nextBatchId = Integer.parseInt(correctBatchIdDueToRollback.get());
+ .filter(bid1 -> Long.parseLong(bid1) > fLastBatchId)
+ .min(Comparator.comparingLong(Long::parseLong)));
+ if (correctBatchIdDueToRollback.isPresent() &&
Long.parseLong(correctBatchIdDueToRollback.get()) > nextBatchId) {
+ nextBatchId = Long.parseLong(correctBatchIdDueToRollback.get());
}
log.info("Using DFSTestSuitePathSelector, checkpoint: " +
lastCheckpointStr + " sourceLimit: " + sourceLimit
+ " lastBatchId: " + lastBatchId + " nextBatchId: " + nextBatchId);
+ log.info("Using DFSTestSuitePathSelector, all input files: " +
Arrays.toString(fileStatuses));
for (FileStatus fileStatus : fileStatuses) {
if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
.anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
continue;
- } else if (Integer.parseInt(fileStatus.getPath().getName()) >
lastBatchId && Integer.parseInt(fileStatus.getPath()
+ } else if (Long.parseLong(fileStatus.getPath().getName()) >
lastBatchId && Long.parseLong(fileStatus.getPath()
.getName()) <= nextBatchId) {
RemoteIterator<LocatedFileStatus> files =
fs.listFiles(fileStatus.getPath(), true);
while (files.hasNext()) {
@@ -98,6 +100,7 @@ public class DFSTestSuitePathSelector extends
DFSPathSelector {
}
}
+ log.info("Using DFSTestSuitePathSelector, eligible files: " +
eligibleFiles);
// no data to readAvro
if (eligibleFiles.size() == 0) {
return new ImmutablePair<>(Option.empty(),