This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2f5e487445 [HUDI-5492] spark call command 'show_compaction' doesn't return the completed compaction (#7593)
2f5e487445 is described below
commit 2f5e487445a19e2fa5b2c26143efeb4fe9aeda63
Author: wangkang <[email protected]>
AuthorDate: Thu Jan 5 09:43:45 2023 +0800
[HUDI-5492] spark call command 'show_compaction' doesn't return the completed compaction (#7593)
Co-authored-by: kandy01.wang <[email protected]>
---
.../procedures/ShowCompactionProcedure.scala | 2 +-
.../spark/sql/hudi/TestCompactionTable.scala | 6 ++--
.../hudi/procedure/TestCompactionProcedure.scala | 38 +++++++++++++++++++---
3 files changed, 38 insertions(+), 8 deletions(-)
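Context for the one-line fix (editor's note, based on Hudi's merge-on-read timeline semantics): a compaction appears on the active timeline as a 'compaction' action only while it is requested or in flight; once it completes, the instant is rewritten as a 'commit' action. Filtering the timeline on COMPACTION_ACTION alone therefore hides every completed compaction, which is why 'show_compaction' came back empty after a successful run. The fix below also matches COMMIT_ACTION; a standalone sketch of the behavior follows the first hunk.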
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
index 7a7bb2cf9d..1076b9fc44 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
@@ -65,7 +65,7 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S
     assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
       s"Cannot show compaction on a Non Merge On Read table.")
     val compactionInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
-      .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
+      .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION || p.getAction == HoodieTimeline.COMMIT_ACTION)
       .toSeq
       .sortBy(f => f.getTimestamp)
       .reverse
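A minimal, self-contained sketch of the filtering above, using a toy instant type instead of Hudi's HoodieInstant (the timestamps are invented for illustration; the action string values match Hudi's HoodieTimeline constants):

// Toy stand-in for HoodieInstant; the real code filters metaClient.getActiveTimeline.
case class Instant(timestamp: String, action: String)

val COMPACTION_ACTION = "compaction" // requested / in-flight compaction
val COMMIT_ACTION = "commit"         // a completed compaction on a MOR timeline

val timeline = Seq(
  Instant("20230105094345", COMPACTION_ACTION), // scheduled but not yet run
  Instant("20230105093000", COMMIT_ACTION),     // compaction that already completed
  Instant("20230105092000", "deltacommit")      // ordinary upsert, still filtered out
)

// Before the fix, only the first instant matched, so completed runs were dropped.
val shown = timeline
  .filter(p => p.action == COMPACTION_ACTION || p.action == COMMIT_ACTION)
  .sortBy(_.timestamp)
  .reverse
// shown: the pending and the completed compaction, newest first.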
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index 780d1aad1a..a51912b5fe 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -58,7 +58,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
       Seq(3, "a3", 10.0, 1000),
       Seq(4, "a4", 10.0, 1000)
     )
-    assertResult(1)(spark.sql(s"show compaction on $tableName").collect().length)
+    assertResult(2)(spark.sql(s"show compaction on $tableName").collect().length)
     spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
     checkAnswer(s"select id, name, price, ts from $tableName order by id")(
       Seq(1, "a1", 11.0, 1000),
@@ -66,7 +66,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
       Seq(3, "a3", 10.0, 1000),
       Seq(4, "a4", 10.0, 1000)
     )
-    assertResult(0)(spark.sql(s"show compaction on $tableName").collect().length)
+    assertResult(2)(spark.sql(s"show compaction on $tableName").collect().length)
     })
   }
@@ -119,7 +119,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
       Seq(2, "a2", 12.0, 1000),
       Seq(3, "a3", 10.0, 1000)
     )
-    assertResult(0)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
+    assertResult(2)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
     checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")(
       s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: "
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index e9d9d550d3..236d87970d 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -88,8 +88,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
     val resultC = spark.sql(s"call show_compaction('$tableName')")
       .collect()
       .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
-    assertResult(1)(resultC.length)
-    assertResult(resultA)(resultC)
+    assertResult(2)(resultC.length)
     checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")(
       Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name())
@@ -100,7 +99,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
       Seq(3, "a3", 10.0, 1000),
       Seq(4, "a4", 10.0, 1000)
     )
-    assertResult(0)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length)
+    assertResult(2)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length)
   }
 }
@@ -168,11 +167,42 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
       Seq(2, "a2", 12.0, 1000),
       Seq(3, "a3", 10.0, 1000)
     )
-    assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
+    assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
     checkException(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}', timestamp => 12345L)")(
       s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: "
     )
   }
 }
+ test("Test show_compaction Procedure by Path") {
+ withTempDir { tmp =>
+ val tableName1 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName1 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.compact.inline ='true',
+ | hoodie.compact.inline.max.delta.commits ='2'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableName1'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
+
+ spark.sql(s"insert into $tableName1 values(1, 'a2', 10, 1000)")
+
+ spark.sql(s"insert into $tableName1 values(1, 'a3', 10, 1000)")
+
+ spark.sql(s"insert into $tableName1 values(1, 'a4', 10, 1000)")
+
+    assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}/$tableName1')").collect().length)
+ }
+ }
}
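For reference, the expected counts above follow from the tests' inline-compaction setup: with hoodie.compact.inline = 'true' and hoodie.compact.inline.max.delta.commits = '2', the four single-row inserts in the new test produce two compactions, and with this fix those completed instants stay visible, hence assertResult(2). The call forms exercised by the tests, shown with an invented table name and path:

show compaction on hudi_mor_tbl
call show_compaction(table => 'hudi_mor_tbl')
call show_compaction(path => '/tmp/hudi/hudi_mor_tbl')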