This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1469469d6e [HUDI-5111] Improve integration test coverage (#7092)
1469469d6e is described below
commit 1469469d6ecaeaa7d960676c305383e7e31fcd43
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed Nov 9 09:56:14 2022 -0800
[HUDI-5111] Improve integration test coverage (#7092)
Co-authored-by: Raymond Xu <[email protected]>
---
.../deltastreamer-immutable-dataset.yaml | 14 ++++++-
...treamer-long-running-multi-partitions-hive.yaml | 6 +--
...mer-long-running-multi-partitions-metadata.yaml | 18 +++++++--
...eltastreamer-long-running-multi-partitions.yaml | 18 +++++++--
.../deltastreamer-medium-clustering.yaml | 19 ++++++++--
...ltastreamer-medium-full-dataset-validation.yaml | 18 +++++++--
.../test-suite/deltastreamer-non-partitioned.yaml | 14 ++++++-
.../detlastreamer-long-running-example.yaml | 18 +++++++--
.../demo/config/test-suite/simple-clustering.yaml | 16 +++++++-
.../config/test-suite/simple-deltastreamer.yaml | 12 ++++++
.../config/test-suite/spark-immutable-dataset.yaml | 14 ++++++-
.../spark-long-running-non-partitioned.yaml | 12 ++++++
.../demo/config/test-suite/spark-long-running.yaml | 16 +++++++-
.../config/test-suite/spark-medium-clustering.yaml | 16 +++++++-
docker/demo/config/test-suite/spark-simple.yaml | 14 ++++++-
.../hudi/integ/testsuite/HoodieTestSuiteJob.java | 7 ++++
.../integ/testsuite/dag/nodes/PrestoQueryNode.java | 44 +++++++++++++---------
.../testsuite/dag/nodes/ValidateDatasetNode.java | 13 ++++---
.../testsuite/dag/nodes/SparkInsertNode.scala | 1 +
19 files changed, 238 insertions(+), 52 deletions(-)
diff --git a/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml b/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml
index 4903e3650c..a19617ef13 100644
--- a/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-immutable-dataset.yaml
@@ -45,9 +45,21 @@ dag_content:
delete_input_data: false
type: ValidateDatasetNode
deps: first_insert
+ first_presto_query:
+ config:
+ execute_itr_count: 5
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 30000
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
execute_itr_count: 5
delete_input_data: true
type: ValidateAsyncOperations
- deps: second_validate
\ No newline at end of file
+ deps: first_presto_query
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
index 8b82415982..6e94b05a69 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 50
+dag_rounds: 20
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
@@ -71,7 +71,7 @@ dag_content:
deps: first_delete
second_validate:
config:
- validate_once_every_itr : 5
+ execute_itr_count: 20
validate_hive: true
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
@@ -79,7 +79,7 @@ dag_content:
deps: second_hive_sync
last_validate:
config:
- execute_itr_count: 50
+ execute_itr_count: 20
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
deps: second_validate
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
index 031664cd15..9ba6993e1d 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-metadata.yaml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 30
+dag_rounds: 20
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
@@ -65,9 +65,21 @@ dag_content:
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 20
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 7600
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
- execute_itr_count: 30
+ execute_itr_count: 20
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
- deps: second_validate
+ deps: first_presto_query
diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
index c23775b2ce..9ba6993e1d 100644
--- a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 50
+dag_rounds: 20
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
@@ -65,9 +65,21 @@ dag_content:
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 20
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 7600
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
- execute_itr_count: 50
+ execute_itr_count: 20
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
- deps: second_validate
+ deps: first_presto_query
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
index 2fc68596d8..b020792032 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-clustering.yaml
@@ -17,7 +17,7 @@
# to be used with test-aggressive-clean-archival.properties
dag_name: deltastreamer-medium-clustering.yaml
-dag_rounds: 20
+dag_rounds: 15
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
@@ -62,14 +62,27 @@ dag_content:
deps: first_upsert
second_validate:
config:
+ validate_once_every_itr: 3
validate_hive: false
delete_input_data: true
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 15
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 3600
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
- execute_itr_count: 20
+ execute_itr_count: 15
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
- deps: second_validate
+ deps: first_presto_query
diff --git a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
index db7edb8f8f..5636402991 100644
--- a/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-medium-full-dataset-validation.yaml
@@ -17,7 +17,7 @@
# to be used with test-aggressive-clean-archival.properties
dag_name: deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 20
+dag_rounds: 15
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
@@ -68,9 +68,21 @@ dag_content:
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 15
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 3600
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
- execute_itr_count: 20
+ execute_itr_count: 15
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
- deps: second_validate
+ deps: first_presto_query
diff --git a/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml b/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml
index a8be72e108..8d42eea877 100644
--- a/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml
+++ b/docker/demo/config/test-suite/deltastreamer-non-partitioned.yaml
@@ -56,8 +56,20 @@ dag_content:
delete_input_data: true
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 6
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 11000
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
execute_itr_count: 6
type: ValidateAsyncOperations
- deps: second_validate
+ deps: first_presto_query
diff --git a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
index 102807ec43..4fefcc497d 100644
--- a/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
+++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: detlastreamer-long-running-example.yaml
-dag_rounds: 50
+dag_rounds: 20
dag_intermittent_delay_mins: 1
dag_content:
first_insert:
@@ -65,9 +65,21 @@ dag_content:
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 20
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 3600
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
- execute_itr_count: 50
+ execute_itr_count: 20
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
- deps: second_validate
+ deps: first_presto_query
diff --git a/docker/demo/config/test-suite/simple-clustering.yaml b/docker/demo/config/test-suite/simple-clustering.yaml
index 4ede6394cf..96f741ecc5 100644
--- a/docker/demo/config/test-suite/simple-clustering.yaml
+++ b/docker/demo/config/test-suite/simple-clustering.yaml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: simple-clustering.yaml
-dag_rounds: 30
+dag_rounds: 15
dag_intermittent_delay_mins: 0
dag_content:
first_insert:
@@ -54,7 +54,7 @@ dag_content:
deps: first_delete
first_cluster:
config:
- execute_itr_count: 25
+ execute_itr_count: 10
type: ClusteringNode
deps: first_validate
second_validate:
@@ -62,3 +62,15 @@ dag_content:
validate_hive: false
type: ValidateDatasetNode
deps: first_cluster
+ first_presto_query:
+ config:
+ validate_once_every_itr: 5
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 8300
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
diff --git a/docker/demo/config/test-suite/simple-deltastreamer.yaml b/docker/demo/config/test-suite/simple-deltastreamer.yaml
index 11b7f17d34..1215b337c8 100644
--- a/docker/demo/config/test-suite/simple-deltastreamer.yaml
+++ b/docker/demo/config/test-suite/simple-deltastreamer.yaml
@@ -68,3 +68,15 @@ dag_content:
delete_input_data: true
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ validate_once_every_itr: 3
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 9600
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
diff --git a/docker/demo/config/test-suite/spark-immutable-dataset.yaml b/docker/demo/config/test-suite/spark-immutable-dataset.yaml
index d6cbf1b244..b609f3dc08 100644
--- a/docker/demo/config/test-suite/spark-immutable-dataset.yaml
+++ b/docker/demo/config/test-suite/spark-immutable-dataset.yaml
@@ -45,9 +45,21 @@ dag_content:
delete_input_data: false
type: ValidateDatasetNode
deps: first_insert
+ first_presto_query:
+ config:
+ execute_itr_count: 5
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 48000
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
execute_itr_count: 5
delete_input_data: true
type: ValidateAsyncOperations
- deps: second_validate
\ No newline at end of file
+ deps: first_presto_query
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
index 947bbdab86..693d7bf227 100644
--- a/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
+++ b/docker/demo/config/test-suite/spark-long-running-non-partitioned.yaml
@@ -48,6 +48,18 @@ dag_content:
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 6
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 6000
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
execute_itr_count: 6
diff --git a/docker/demo/config/test-suite/spark-long-running.yaml b/docker/demo/config/test-suite/spark-long-running.yaml
index 2ffef55781..52aeb92a7f 100644
--- a/docker/demo/config/test-suite/spark-long-running.yaml
+++ b/docker/demo/config/test-suite/spark-long-running.yaml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-spark-deltastreamer-long-running-multi-partitions.yaml
-dag_rounds: 30
+dag_rounds: 20
dag_intermittent_delay_mins: 0
dag_content:
first_insert:
@@ -49,9 +49,21 @@ dag_content:
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 30
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 189000
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
execute_itr_count: 30
max_wait_time_for_deltastreamer_catch_up_ms: 600000
type: ValidateAsyncOperations
- deps: second_validate
+ deps: first_presto_query
diff --git a/docker/demo/config/test-suite/spark-medium-clustering.yaml b/docker/demo/config/test-suite/spark-medium-clustering.yaml
index 09537a23d5..3045f7c4b9 100644
--- a/docker/demo/config/test-suite/spark-medium-clustering.yaml
+++ b/docker/demo/config/test-suite/spark-medium-clustering.yaml
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: spark-medium-clustering.yaml
-dag_rounds: 20
+dag_rounds: 15
dag_intermittent_delay_mins: 0
dag_content:
first_insert:
@@ -52,8 +52,20 @@ dag_content:
delete_input_data: true
type: ValidateDatasetNode
deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 20
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 146000
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
last_validate:
config:
execute_itr_count: 20
type: ValidateAsyncOperations
- deps: second_validate
+ deps: first_presto_query
diff --git a/docker/demo/config/test-suite/spark-simple.yaml b/docker/demo/config/test-suite/spark-simple.yaml
index 192adcf377..ebd1cd2d4d 100644
--- a/docker/demo/config/test-suite/spark-simple.yaml
+++ b/docker/demo/config/test-suite/spark-simple.yaml
@@ -51,4 +51,16 @@ dag_content:
validate_hive: false
delete_input_data: false
type: ValidateDatasetNode
- deps: first_delete
\ No newline at end of file
+ deps: first_delete
+ first_presto_query:
+ config:
+ execute_itr_count: 1
+ presto_props:
+ prop1: "SET SESSION hive.parquet_use_column_names = true"
+ presto_queries:
+ query1: "select count(*) from testdb.table1"
+ result1: 120
+ query2: "select count(*) from testdb.table1 group by _row_key having count(*) > 1"
+ result2: 0
+ type: PrestoQueryNode
+ deps: second_validate
\ No newline at end of file
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
index e7e28d3dec..b911f116d3 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java
@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -319,6 +320,9 @@ public class HoodieTestSuiteJob {
@Parameter(names = {"--test-continuous-mode"}, description = "Tests continuous mode in deltastreamer.")
public Boolean testContinousMode = false;
+ @Parameter(names = {"--enable-presto-validation"}, description = "Enables presto validation")
+ public Boolean enablePrestoValidation = false;
+
@Parameter(names = {"--presto-jdbc-url"}, description = "Presto JDBC URL in the format jdbc:presto://<host>:<port>/<catalog>/<schema> "
+ "e.g. URL to connect to Presto running on localhost port 8080 with the catalog `hive` and the schema `sales`: "
+ "jdbc:presto://localhost:8080/hive/sales")
@@ -343,5 +347,8 @@ public class HoodieTestSuiteJob {
@Parameter(names = {"--index-type"}, description = "Index type to use for writes")
public String indexType = "SIMPLE";
+
+ @Parameter(names = {"--enable-metadata-on-read"}, description = "Enable's metadata for queries")
+ public Boolean enableMetadataOnRead = HoodieMetadataConfig.ENABLE.defaultValue();
}
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
index 94f0a51a4d..45f087717c 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/PrestoQueryNode.java
@@ -35,25 +35,33 @@ public class PrestoQueryNode extends BaseQueryNode {
@Override
public void execute(ExecutionContext context, int curItrCount) throws Exception {
- log.info("Executing presto query node {}", this.getName());
- String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl;
- if (StringUtils.isNullOrEmpty(url)) {
- throw new IllegalArgumentException("Presto JDBC connection url not provided. Please set --presto-jdbc-url.");
+ if (!context.getHoodieTestSuiteWriter().getCfg().enablePrestoValidation) {
+ return;
}
- String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername;
- String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword;
- try {
- Class.forName("com.facebook.presto.jdbc.PrestoDriver");
- } catch (ClassNotFoundException e) {
- throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
- }
- try (Connection connection = DriverManager.getConnection(url, user, pass)) {
- Statement stmt = connection.createStatement();
- setSessionProperties(this.config.getPrestoProperties(), stmt);
- executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
- stmt.close();
- } catch (Exception e) {
- throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
+ int validateOnceEveryItr = config.validateOnceEveryIteration();
+ int itrCountToExecute = config.getIterationCountToExecute();
+ if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount)
+ || (itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
+ log.info("Executing presto query node {}", this.getName());
+ String url = context.getHoodieTestSuiteWriter().getCfg().prestoJdbcUrl;
+ if (StringUtils.isNullOrEmpty(url)) {
+ throw new IllegalArgumentException("Presto JDBC connection url not provided. Please set --presto-jdbc-url.");
+ }
+ String user = context.getHoodieTestSuiteWriter().getCfg().prestoUsername;
+ String pass = context.getHoodieTestSuiteWriter().getCfg().prestoPassword;
+ try {
+ Class.forName("com.facebook.presto.jdbc.PrestoDriver");
+ } catch (ClassNotFoundException e) {
+ throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
+ }
+ try (Connection connection = DriverManager.getConnection(url, user, pass)) {
+ Statement stmt = connection.createStatement();
+ setSessionProperties(this.config.getPrestoProperties(), stmt);
+ executeAndValidateQueries(this.config.getPrestoQueries(), stmt);
+ stmt.close();
+ } catch (Exception e) {
+ throw new HoodieValidationException("Presto query validation failed due to " + e.getMessage(), e);
+ }
}
}
}
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
index 358abb36f9..bd50616d14 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java
@@ -18,11 +18,12 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
+import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.DataSourceWriteOptions;
-import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -35,15 +36,15 @@ import org.slf4j.LoggerFactory;
*/
public class ValidateDatasetNode extends BaseValidateDatasetNode {
- private static Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ValidateDatasetNode.class);
- public ValidateDatasetNode(Config config) {
+ public ValidateDatasetNode(DeltaConfig.Config config) {
super(config);
}
@Override
public Logger getLogger() {
- return log;
+ return LOG;
}
@Override
@@ -51,7 +52,7 @@ public class ValidateDatasetNode extends BaseValidateDatasetNode {
StructType inputSchema) {
String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + (partitionPathField.isEmpty() ? "/" : "/*/*/*");
- Dataset<Row> hudiDf = session.read().option(HoodieMetadataConfig.ENABLE.key(), String.valueOf(config.isEnableMetadataValidate()))
+ Dataset<Row> hudiDf = session.read().option(HoodieMetadataConfig.ENABLE.key(), String.valueOf(context.getHoodieTestSuiteWriter().getCfg().enableMetadataOnRead))
.format("hudi").load(hudiPath);
return hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index 38751bda5a..6c1d39e2f6 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -72,6 +72,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
.option(HoodieIndexConfig.INDEX_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.indexType)
.option(DataSourceWriteOptions.OPERATION.key, getOperation())
+ .option(HoodieIndexConfig.INDEX_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.indexType)
.option(HoodieWriteConfig.TBL_NAME.key,
context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.mode(SaveMode.Append)
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)