xiarixiaoyao opened a new pull request #4178:
URL: https://github.com/apache/hudi/pull/4178


   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contribute/how-to-contribute before opening a pull request.*
   
   Fixed a bug where clustering jobs could not run in parallel.

   Test code:
```scala
import java.sql.{Date, Timestamp}

import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieWriteConfig}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.{DataFrame, RowFactory, SaveMode, SparkSession}
import org.apache.spark.sql.types._

import scala.util.Random

object HbaseTest {

  val commonOpts = Map(
    "hoodie.insert.shuffle.parallelism" -> "50",
    "hoodie.upsert.shuffle.parallelism" -> "50",
    "hoodie.bulkinsert.shuffle.parallelism" -> "50",
    DataSourceWriteOptions.RECORDKEY_FIELD.key() -> "c1",
    DataSourceWriteOptions.PARTITIONPATH_FIELD.key() -> "p",
    DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> "c2",
    HoodieWriteConfig.TBL_NAME.key -> "cl"
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("Spark SQL data sources example")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .master("local[16]")
      .getOrCreate()

    val df = createComplexDataFrame(spark).withColumn("p", lit(1))
      .withColumn("keyid", col("c1")).withColumn("col3", col("c1"))

    val path = new Path("/tmp/default/clustering/cl")
    performClustering(df, path.toString)

    // clean up the test table (recursive delete)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.delete(path, true)

    spark.stop()
  }

  def createComplexDataFrame(spark: SparkSession): DataFrame = {
    val schema = new StructType()
      .add("c1", IntegerType)
      .add("c11", IntegerType)
      .add("c12", IntegerType)
      .add("c2", StringType)
      .add("c3", DecimalType(38, 18))
      .add("c4", TimestampType)
      .add("c5", ShortType)
      .add("c6", DateType)
      .add("c7", BinaryType)
      .add("c8", ByteType)

    val rdd = spark.sparkContext.parallelize(0 to 1000000, 8).map { item =>
      val c1 = Integer.valueOf(item)
      val c11 = Integer.valueOf(Random.nextInt(1000000))
      val c12 = Integer.valueOf(Random.nextInt(1000000))
      val c2 = s" ${item}sdc"
      // val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}.${item}")
      val c3 = new java.math.BigDecimal(s"${Random.nextInt(1000)}")
      val c4 = new Timestamp(System.currentTimeMillis())
      val c5 = java.lang.Short.valueOf(s"${16}")
      val c6 = Date.valueOf(s"${2020}-${item % 11 + 1}-${item % 28 + 1}")
      val c7 = Array(item).map(_.toByte)
      val c8 = java.lang.Byte.valueOf("9")

      RowFactory.create(c1, c11, c12, c2, c3, c4, c5, c6, c7, c8)
    }
    spark.createDataFrame(rdd, schema)
  }

  def performClustering(writeDf: DataFrame, basePath: String = "/tmp/default/clustering"): Unit = {
    writeDf.write.format("org.apache.hudi")
      .options(commonOpts)
      .option("hoodie.compact.inline", "false")
      .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE.key(), "COPY_ON_WRITE")
      // options for clustering: trigger inline clustering after a single commit;
      // the size caps below split the plan into several independent clustering groups
      .option("hoodie.parquet.small.file.limit", "0")
      .option("hoodie.clustering.inline", "true")
      .option("hoodie.clustering.inline.max.commits", "1")
      .option("hoodie.clustering.plan.strategy.small.file.limit", String.valueOf(2 * 1024 * 1024L))
      .option("hoodie.clustering.plan.strategy.max.bytes.per.group", String.valueOf(10 * 1024 * 1024L))
      .option("hoodie.clustering.plan.strategy.target.file.max.bytes", String.valueOf(4 * 1024 * 1024L))
      .option(HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS.key, "c11, c12")
      .mode(SaveMode.Overwrite)
      .save(basePath)
    writeDf.show()
    // keep the application alive so the Spark UI can be inspected
    Thread.sleep(200000)
  }
}
```
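
   The scenario the fix targets is clustering work running in parallel rather than serially. As a minimal sketch of driving several clustering jobs at once from this test (illustrative only, not part of the patch: it reuses `df` and `performClustering` from above, and the two scratch paths are made up):

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

// Illustrative only: kick off two inline-clustering writes concurrently,
// each against its own base path, and wait for both to complete.
val parallelJobs = Seq("/tmp/default/clustering/cl1", "/tmp/default/clustering/cl2")
  .map(p => Future(performClustering(df, p)))
Await.result(Future.sequence(parallelJobs), Duration.Inf)
```

   With the fix, the two writes (and the clustering groups inside each plan) should be able to proceed concurrently instead of one blocking the other.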
   
   Before patch:



   After patch:
   
   
   
   ## Verify this pull request
   
   
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
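
   For the local manual verification mentioned in the last bullet, a hedged sanity check (illustrative only; it assumes the `spark`, `fs`, and `path` values from the test code above and must run before the `fs.delete` cleanup in `main`):

```scala
// Illustrative only: the clustering action commits as a "replacecommit"
// on the Hudi timeline under <basePath>/.hoodie.
val meta = new Path(path, ".hoodie")
val clustered = fs.listStatus(meta).exists(_.getPath.getName.endsWith(".replacecommit"))
println(s"clustering committed: $clustered")

// The clustered table should still read back all 1000001 input rows.
val readBack = spark.read.format("org.apache.hudi").load(path.toString)
assert(readBack.count() == 1000001L)
```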
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
 - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
   

