This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1469469d6e [HUDI-5111] Improve integration test coverage (#7092)
1469469d6e is described below

commit 1469469d6ecaeaa7d960676c305383e7e31fcd43
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Nov 9 09:56:14 2022 -0800

    [HUDI-5111] Improve integration test coverage (#7092)
    
    
    
    Co-authored-by: Raymond Xu <[email protected]>
---
 .../deltastreamer-immutable-dataset.yaml           | 14 ++++++-
 ...treamer-long-running-multi-partitions-hive.yaml |  6 +--
 ...mer-long-running-multi-partitions-metadata.yaml | 18 +++++++--
 ...eltastreamer-long-running-multi-partitions.yaml | 18 +++++++--
 .../deltastreamer-medium-clustering.yaml           | 19 ++++++++--
 ...ltastreamer-medium-full-dataset-validation.yaml | 18 +++++++--
 .../test-suite/deltastreamer-non-partitioned.yaml  | 14 ++++++-
 .../detlastreamer-long-running-example.yaml        | 18 +++++++--
 .../demo/config/test-suite/simple-clustering.yaml  | 16 +++++++-
 .../config/test-suite/simple-deltastreamer.yaml    | 12 ++++++
 .../config/test-suite/spark-immutable-dataset.yaml | 14 ++++++-
 .../spark-long-running-non-partitioned.yaml        | 12 ++++++
 .../demo/config/test-suite/spark-long-running.yaml | 16 +++++++-
 .../config/test-suite/spark-medium-clustering.yaml | 16 +++++++-
 docker/demo/config/test-suite/spark-simple.yaml    | 14 ++++++-
 .../hudi/integ/testsuite/HoodieTestSuiteJob.java   |  7 ++++
 .../integ/testsuite/dag/nodes/PrestoQueryNode.java | 44 +++++++++++++---------
 .../testsuite/dag/nodes/ValidateDatasetNode.java   | 13 ++++---
 .../testsuite/dag/nodes/SparkInsertNode.scala      |  1 +
 19 files changed, 238 insertions(+), 52 deletions(-)

diff --git a/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml 
b/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml
index 4903e3650c..a19617ef13 100644
--- a/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml
@@ -45,9 +45,21 @@ dag_content:
       delete_input_data: false
     type: ValidateDatasetNode
     deps: first_insert
+  first_presto_query:
+    config:
+      execute_itr_count: 5
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 30000
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
       execute_itr_count: 5
       delete_input_data: true
     type: ValidateAsyncOperations
-    deps: second_validate
\ No newline at end of file
+    deps: first_presto_query
\ No newline at end of file
diff --git 
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
 
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 8b82415982..6e94b05a69 100644
--- 
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
+++ 
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 50
+dag_rounds: 20
 dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
@@ -71,7 +71,7 @@ dag_content:
     deps: first_delete
   second_validate:
     config:
-      validate_once_every_itr : 5
+      execute_itr_count: 20
       validate_hive: true
       delete_input_data: true
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
@@ -79,7 +79,7 @@ dag_content:
     deps: second_hive_sync
   last_validate:
     config:
-      execute_itr_count: 50
+      execute_itr_count: 20
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
     deps: second_validate
diff --git 
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
 
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
index 031664cd15..9ba6993e1d 100644
--- 
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
+++ 
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 30
+dag_rounds: 20
 dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
@@ -65,9 +65,21 @@ dag_content:
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 20
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 7600
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
-      execute_itr_count: 30
+      execute_itr_count: 20
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
-    deps: second_validate
+    deps: first_presto_query
diff --git 
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
 
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
index c23775b2ce..9ba6993e1d 100644
--- 
a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
+++ 
b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 50
+dag_rounds: 20
 dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
@@ -65,9 +65,21 @@ dag_content:
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 20
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 7600
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
-      execute_itr_count: 50
+      execute_itr_count: 20
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
-    deps: second_validate
+    deps: first_presto_query
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml 
b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
index 2fc68596d8..b020792032 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
@@ -17,7 +17,7 @@
 # to be used with test-aggressive-clean-archival.properties
 
 dag_name: deltastreamer-medium-clustering.yaml
-dag_rounds: 20
+dag_rounds: 15
 dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
@@ -62,14 +62,27 @@ dag_content:
     deps: first_upsert
   second_validate:
     config:
+      validate_once_every_itr: 3
       validate_hive: false
       delete_input_data: true
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 15
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 3600
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
-      execute_itr_count: 20
+      execute_itr_count: 15
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
-    deps: second_validate
+    deps: first_presto_query
diff --git 
a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
 
b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
index db7edb8f8f..5636402991 100644
--- 
a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
+++ 
b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -17,7 +17,7 @@
 # to be used with test-aggressive-clean-archival.properties
 
 dag_name: deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 20
+dag_rounds: 15
 dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
@@ -68,9 +68,21 @@ dag_content:
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 15
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 3600
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
-      execute_itr_count: 20
+      execute_itr_count: 15
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
-    deps: second_validate
+    deps: first_presto_query
diff --git a/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml 
b/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml
index a8be72e108..8d42eea877 100644
--- a/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml
@@ -56,8 +56,20 @@ dag_content:
       delete_input_data: true
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 6
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 11000
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
       execute_itr_count: 6
     type: ValidateAsyncOperations
-    deps: second_validate
+    deps: first_presto_query
diff --git 
a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml 
b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 102807ec43..4fefcc497d 100644
--- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: detlastreamer-long-running-example.yaml
-dag_rounds: 50
+dag_rounds: 20
 dag_intermittent_delay_mins: 1
 dag_content:
   first_insert:
@@ -65,9 +65,21 @@ dag_content:
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 20
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 3600
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
-      execute_itr_count: 50
+      execute_itr_count: 20
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
-    deps: second_validate
+    deps: first_presto_query
diff --git a/docker/demo/config/test-suite/simple-clustering.yaml 
b/docker/demo/config/test-suite/simple-clustering.yaml
index 4ede6394cf..96f741ecc5 100644
--- a/docker/demo/config/test-suite/simple-clustering.yaml
+++ b/docker/demo/config/test-suite/simple-clustering.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: simple-clustering.yaml
-dag_rounds: 30
+dag_rounds: 15
 dag_intermittent_delay_mins: 0
 dag_content:
   first_insert:
@@ -54,7 +54,7 @@ dag_content:
     deps: first_delete
   first_cluster:
     config:
-      execute_itr_count: 25
+      execute_itr_count: 10
     type: ClusteringNode
     deps: first_validate
   second_validate:
@@ -62,3 +62,15 @@ dag_content:
       validate_hive: false
     type: ValidateDatasetNode
     deps: first_cluster
+  first_presto_query:
+    config:
+      validate_once_every_itr: 5
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 8300
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
diff --git a/docker/demo/config/test-suite/simple-deltastreamer.yaml 
b/docker/demo/config/test-suite/simple-deltastreamer.yaml
index 11b7f17d34..1215b337c8 100644
--- a/docker/demo/config/test-suite/simple-deltastreamer.yaml
+++ b/docker/demo/config/test-suite/simple-deltastreamer.yaml
@@ -68,3 +68,15 @@ dag_content:
       delete_input_data: true
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      validate_once_every_itr: 3
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 9600
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
diff --git a/docker/demo/config/test-suite/spark-immutable-dataset.yaml 
b/docker/demo/config/test-suite/spark-immutable-dataset.yaml
index d6cbf1b244..b609f3dc08 100644
--- a/docker/demo/config/test-suite/spark-immutable-dataset.yaml
+++ b/docker/demo/config/test-suite/spark-immutable-dataset.yaml
@@ -45,9 +45,21 @@ dag_content:
       delete_input_data: false
     type: ValidateDatasetNode
     deps: first_insert
+  first_presto_query:
+    config:
+      execute_itr_count: 5
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 48000
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
       execute_itr_count: 5
       delete_input_data: true
     type: ValidateAsyncOperations
-    deps: second_validate
\ No newline at end of file
+    deps: first_presto_query
\ No newline at end of file
diff --git 
a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml 
b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
index 947bbdab86..693d7bf227 100644
--- a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
+++ b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
@@ -48,6 +48,18 @@ dag_content:
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 6
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 6000
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
       execute_itr_count: 6
diff --git a/docker/demo/config/test-suite/spark-long-running.yaml 
b/docker/demo/config/test-suite/spark-long-running.yaml
index 2ffef55781..52aeb92a7f 100644
--- a/docker/demo/config/test-suite/spark-long-running.yaml
+++ b/docker/demo/config/test-suite/spark-long-running.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: cow-spark-deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 30
+dag_rounds: 20
 dag_intermittent_delay_mins: 0
 dag_content:
   first_insert:
@@ -49,9 +49,21 @@ dag_content:
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 30
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 189000
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
       execute_itr_count: 30
       max_wait_time_for_deltastreamer_catch_up_ms: 600000
     type: ValidateAsyncOperations
-    deps: second_validate
+    deps: first_presto_query
diff --git a/docker/demo/config/test-suite/spark-medium-clustering.yaml 
b/docker/demo/config/test-suite/spark-medium-clustering.yaml
index 09537a23d5..3045f7c4b9 100644
--- a/docker/demo/config/test-suite/spark-medium-clustering.yaml
+++ b/docker/demo/config/test-suite/spark-medium-clustering.yaml
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 dag_name: spark-medium-clustering.yaml
-dag_rounds: 20
+dag_rounds: 15
 dag_intermittent_delay_mins: 0
 dag_content:
   first_insert:
@@ -52,8 +52,20 @@ dag_content:
       delete_input_data: true
     type: ValidateDatasetNode
     deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 20
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 146000
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
   last_validate:
     config:
       execute_itr_count: 20
     type: ValidateAsyncOperations
-    deps: second_validate
+    deps: first_presto_query
diff --git a/docker/demo/config/test-suite/spark-simple.yaml 
b/docker/demo/config/test-suite/spark-simple.yaml
index 192adcf377..ebd1cd2d4d 100644
--- a/docker/demo/config/test-suite/spark-simple.yaml
+++ b/docker/demo/config/test-suite/spark-simple.yaml
@@ -51,4 +51,16 @@ dag_content:
       validate_hive: false
       delete_input_data: false
     type: ValidateDatasetNode
-    deps: first_delete
\ No newline at end of file
+    deps: first_delete
+  first_presto_query:
+    config:
+      execute_itr_count: 1
+      presto_props:
+        prop1: "SET SESSION hive.parquet_use_column_names = true"
+      presto_queries:
+        query1: "select count(*) from testdb.table1"
+        result1: 120
+        query2: "select count(*) from testdb.table1 group by _row_key having 
count(*) > 1"
+        result2: 0
+    type: PrestoQueryNode
+    deps: second_validate
\ No newline at end of file
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index e7e28d3dec..b911f116d3 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -319,6 +320,9 @@ public class HoodieTestSuiteJob {
     @Parameter(names = {"--test-continuous-mode"}, description = "Tests 
continuous mode in deltastreamer.")
     public Boolean testContinousMode = false;
 
+    @Parameter(names = {"--enable-presto-validation"}, description = "Enables 
presto validation")
+    public Boolean enablePrestoValidation = false;
+
     @Parameter(names = {"--presto-jdbc-url"}, description = "Presto JDBC URL 
in the format jdbc:presto://<host>:<port>/<catalog>/<schema>  "
         + "e.g. URL to connect to Presto running on localhost port 8080 with 
the catalog `hive` and the schema `sales`: "
         + "jdbc:presto://localhost:8080/hive/sales")
@@ -343,5 +347,8 @@ public class HoodieTestSuiteJob {
 
     @Parameter(names = {"--index-type"}, description = "Index type to use for 
writes")
     public String indexType = "SIMPLE";
+
+    @Parameter(names = {"--enable-metadata-on-read"}, description = "Enables 
metadata for queries")
+    public Boolean enableMetadataOnRead = 
HoodieMetadataConfig.ENABLE.defaultValue();
   }
 }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
index 94f0a51a4d..45f087717c 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
@@ -35,25 +35,33 @@ public class PrestoQueryNode extends BaseQueryNode {
 
   @Override
   public void execute(ExecutionContext context, int curItrCount) throws 
Exception {
-    log.info("Executing presto query node {}", this.getName());
-    String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl;
-    if (StringUtils.isNullOrEmpty(url)) {
-      throw new IllegalArgumentException("Presto JDBC connection url not 
provided. Please set --presto-jdbc-url.");
+    if (!context.getHoodieTestSuiteWriter().getCfg().enablePrestoValidation) {
+      return;
     }
-    String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername;
-    String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword;
-    try {
-      Class.forName("com.facebook.presto.jdbc.PrestoDriver");
-    } catch (ClassNotFoundException e) {
-      throw new HoodieValidationException("Presto query validation failed due 
to " + e.getMessage(), e);
-    }
-    try (Connection connection = DriverManager.getConnection(url, user, pass)) 
{
-      Statement stmt = connection.createStatement();
-      setSessionProperties(this.config.getPrestoProperties(), stmt);
-      executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
-      stmt.close();
-    } catch (Exception e) {
-      throw new HoodieValidationException("Presto query validation failed due 
to " + e.getMessage(), e);
+    int validateOnceEveryItr = config.validateOnceEveryIteration();
+    int itrCountToExecute = config.getIterationCountToExecute();
+    if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount)
+        || (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) 
== 0))) {
+      log.info("Executing presto query node {}", this.getName());
+      String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl;
+      if (StringUtils.isNullOrEmpty(url)) {
+        throw new IllegalArgumentException("Presto JDBC connection url not 
provided. Please set --presto-jdbc-url.");
+      }
+      String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername;
+      String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword;
+      try {
+        Class.forName("com.facebook.presto.jdbc.PrestoDriver");
+      } catch (ClassNotFoundException e) {
+        throw new HoodieValidationException("Presto query validation failed 
due to " + e.getMessage(), e);
+      }
+      try (Connection connection = DriverManager.getConnection(url, user, 
pass)) {
+        Statement stmt = connection.createStatement();
+        setSessionProperties(this.config.getPrestoProperties(), stmt);
+        executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
+        stmt.close();
+      } catch (Exception e) {
+        throw new HoodieValidationException("Presto query validation failed 
due to " + e.getMessage(), e);
+      }
     }
   }
 }
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
index 358abb36f9..bd50616d14 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
@@ -18,11 +18,12 @@
 
 package org.apache.hudi.integ.testsuite.dag.nodes;
 
+import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -35,15 +36,15 @@ import org.slf4j.LoggerFactory;
  */
 public class ValidateDatasetNode extends BaseValidateDatasetNode {
 
-  private static Logger log = 
LoggerFactory.getLogger(ValidateDatasetNode.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(ValidateDatasetNode.class);
 
-  public ValidateDatasetNode(Config config) {
+  public ValidateDatasetNode(DeltaConfig.Config config) {
     super(config);
   }
 
   @Override
   public Logger getLogger() {
-    return log;
+    return LOG;
   }
 
   @Override
@@ -51,7 +52,7 @@ public class ValidateDatasetNode extends 
BaseValidateDatasetNode {
                                            StructType inputSchema) {
     String partitionPathField = 
context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
     String hudiPath = 
context.getHoodieTestSuiteWriter().getCfg().targetBasePath + 
(partitionPathField.isEmpty() ? "/" : "/*/*/*");
-    Dataset<Row> hudiDf = 
session.read().option(HoodieMetadataConfig.ENABLE.key(), 
String.valueOf(config.isEnableMetadataValidate()))
+    Dataset<Row> hudiDf = 
session.read().option(HoodieMetadataConfig.ENABLE.key(), 
String.valueOf(context.getHoodieTestSuiteWriter().getCfg().enableMetadataOnRead))
         .format("hudi").load(hudiPath);
     return 
hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
             
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
diff --git 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index 38751bda5a..6c1d39e2f6 100644
--- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ 
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -72,6 +72,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends 
DagNode[RDD[WriteStatus]] {
       .option(DataSourceWriteOptions.TABLE_TYPE.key, 
context.getHoodieTestSuiteWriter.getCfg.tableType)
       .option(HoodieIndexConfig.INDEX_TYPE.key, 
context.getHoodieTestSuiteWriter.getCfg.indexType)
       .option(DataSourceWriteOptions.OPERATION.key, getOperation())
+      .option(HoodieIndexConfig.INDEX_TYPE.key, 
context.getHoodieTestSuiteWriter.getCfg.indexType)
       .option(HoodieWriteConfig.TBL_NAME.key, 
context.getHoodieTestSuiteWriter.getCfg.targetTableName)
       .mode(SaveMode.Append)
       .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)

Reply via email to