This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1e162bb73a HUDI-4687 add show_invalid_parquet procedure (#6480)
1e162bb73a is described below
commit 1e162bb73af9f39024e6e5d098958b9c0d926e6a
Author: shaoxiong.zhan <[email protected]>
AuthorDate: Wed Aug 24 19:28:26 2022 +0800
HUDI-4687 add show_invalid_parquet procedure (#6480)
Co-authored-by: zhanshaoxiong <shaoxiong0001@@gmail.com>
---
.../hudi/command/procedures/HoodieProcedures.scala | 1 +
.../procedures/ShowInvalidParquetProcedure.scala | 83 ++++++++++++++++++++++
.../TestShowInvalidParquetProcedure.scala | 71 ++++++++++++++++++
3 files changed, 155 insertions(+)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b245b54f61..49c88e5cd6 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -82,6 +82,7 @@ object HoodieProcedures {
mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME,
RepairOverwriteHoodiePropsProcedure.builder)
mapBuilder.put(RunCleanProcedure.NAME, RunCleanProcedure.builder)
mapBuilder.put(ValidateHoodieSyncProcedure.NAME,
ValidateHoodieSyncProcedure.builder)
+ mapBuilder.put(ShowInvalidParquetProcedure.NAME,
ShowInvalidParquetProcedure.builder)
mapBuilder.build
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
new file mode 100644
index 0000000000..11d170bbed
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import
org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+
+/**
+ * Spark SQL procedure that scans a Hudi table location and reports every
+ * `.parquet` data file whose footer cannot be read because the file is not a
+ * Parquet file.
+ *
+ * Input: one required string argument, `path` (the table base path).
+ * Output: one row per invalid file with a single string column `path`.
+ */
+class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
+  // The only accepted argument: the required string `path` at position 0.
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+  )
+
+  // Result schema: a single nullable string column holding the file path.
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("path", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  /**
+   * Lists the partitions under `path`, lists the data files of each partition
+   * in a Spark job, and tries to read the footer of every file ending in
+   * `.parquet`.
+   *
+   * A file is reported only when reading its footer throws an exception whose
+   * message contains "is not a Parquet file". Any other exception is ignored
+   * and the file is not reported.
+   *
+   * @param args procedure arguments; must contain `path`
+   * @return one [[Row]] per invalid parquet file, empty when there is nothing to scan
+   */
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val partitionPaths: java.util.List[String] =
+      FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
+
+    if (partitionPaths.isEmpty) {
+      // Spark rejects parallelize(..., 0) ("Positive number of partitions
+      // required"), so short-circuit when no partition was found.
+      Seq.empty[Row]
+    } else {
+      // Only local vals are referenced inside the closures below so that the
+      // procedure instance itself is not pulled into the serialized task.
+      val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
+      val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
+      javaRdd.rdd
+        .flatMap { part =>
+          val fs = FSUtils.getFs(new Path(srcPath), serHadoopConf.get())
+          FSUtils.getAllDataFilesInPartition(fs, FSUtils.getPartitionPath(srcPath, part)).toList
+        }
+        .filter { status =>
+          val filePath = status.getPath
+          filePath.toString.endsWith(".parquet") && {
+            try {
+              ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData
+              false
+            } catch {
+              case e: Exception =>
+                // getMessage may be null; treat a message-less failure as
+                // "not reported" instead of failing the task with an NPE.
+                Option(e.getMessage).exists(_.contains("is not a Parquet file"))
+            }
+          }
+        }
+        .map(status => Row(status.getPath.toString))
+        .collect()
+        .toSeq
+    }
+  }
+
+  override def build = new ShowInvalidParquetProcedure()
+}
+
+/** Companion holding the registration name and builder supplier of the procedure. */
+object ShowInvalidParquetProcedure {
+  /** Name under which the procedure is invoked from SQL. */
+  val NAME = "show_invalid_parquet"
+
+  // Named supplier that hands out a fresh procedure instance on every get().
+  private final class BuilderSupplier extends Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new ShowInvalidParquetProcedure()
+  }
+
+  /** Returns a new supplier producing builders for this procedure. */
+  def builder: Supplier[ProcedureBuilder] = new BuilderSupplier
+}
+
+
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
new file mode 100644
index 0000000000..4d0c9c7b34
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+
+/**
+ * Verifies that `show_invalid_parquet` requires its `path` argument and that it
+ * reports exactly the corrupt `.parquet` files planted in the table partitions.
+ */
+class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
+  test("Test Call show_invalid_parquet Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$basePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_invalid_parquet(limit => 10)""")(
+        s"Argument: path is required")
+
+      // Plant one single-byte (hence non-parquet) file in each existing partition.
+      val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+      val invalidFiles = Seq("ts=1000/1.parquet", "ts=1500/2.parquet")
+      invalidFiles.foreach { relativePath =>
+        val out = fs.create(new Path(basePath, relativePath))
+        // Close the stream even if the write fails so the temp dir can be cleaned up.
+        try out.write(1) finally out.close()
+      }
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call show_invalid_parquet(path => '$basePath')""").collect()
+      assertResult(2) {
+        result.length
+      }
+
+      // The count alone does not prove the right files were flagged: also
+      // check that every planted file appears among the reported paths.
+      val reportedPaths = result.map(_.getString(0))
+      invalidFiles.foreach { relativePath =>
+        assert(reportedPaths.exists(_.endsWith(relativePath)),
+          s"expected $relativePath to be reported, got: ${reportedPaths.mkString(", ")}")
+      }
+    }
+  }
+}