kingkongpoon opened a new issue #2345:
URL: https://github.com/apache/hudi/issues/2345


   When I use hudi-0.6.0, I find that the option PRECOMBINE_FIELD_OPT_KEY is 
useless ?
   
   I want to use a rt table to update my data by it's timestamp (ts) 
   
   ### Test Data   filename   a.csv
   1,2,3,a,b
   4,5,6,c,d
   7,8,9,e,f
   
   hdfs dfs -put a.csv /hudi/
   
   ### Spark code
   first write into hdfs 
   
       val spark = SparkSession.builder()
         .master("local[2]")
         .appName("Hudi")
         .config("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer")
         .getOrCreate()
   
       val input = spark.read.format("csv").load("hdfs://node1:9000/hudi/a.csv")
             .withColumnRenamed("_c0", "uuid")
             .withColumnRenamed("_c1", "partitionpath")
             .withColumnRenamed("_c2", "ts")
   
       val basePath = "hdfs://node1:9000/hudi/test"
   
        input.write.format("org.apache.hudi")
         .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) 
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid") 
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"partitionpath")
         .option("hoodie.table.name", "test") 
         .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
         .option(HoodieIndexConfig.INDEX_TYPE_PROP, 
HoodieIndex.IndexType.GLOBAL_BLOOM.name())
         .mode(SaveMode.Overwrite)
         .save(basePath)
   
       spark.read.format("hudi").load(basePath+"/*").show()
   
       
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
       
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|
   _hoodie_file_name|uuid|partitionpath| ts|_c3|_c4|
       
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
       |     20201218134136|  20201218134136_1_1|                 4|            
         5|825dbdc3-1ea6-4d8...|   4|            5|  6|  c|  d|
       |     20201218134136|  20201218134136_2_3|                 7|            
         8|77be93a5-3ee5-43a...|   7|            8|  9|  e|  f|
       |     20201218134136|  20201218134136_0_2|                 1|            
         2|8d8a6498-116f-4f7...|   1|            2|  3|  a|  b|
       
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
   
   
   second write(update) my data, 2.csv is a local data.
   
   1,2,10,bbb,bbb
   4,5,1,bbb,bbb
   7,8,10,bbb,bbb
   
       val input = spark.read.format("csv").load("file:///root/2.csv")
         .withColumnRenamed("_c0", "uuid")
         .withColumnRenamed("_c1", "partitionpath")
         .withColumnRenamed("_c2", "ts")
   
       input.write.format("org.apache.hudi")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, 
"partitionpath")
         .option("hoodie.table.name", "test")
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
         .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
         .option(HoodieIndexConfig.INDEX_TYPE_PROP, 
HoodieIndex.IndexType.GLOBAL_BLOOM.name()) 
         .mode(SaveMode.Append)
         .save(basePath)
   
   when it successful run, I query by spark
   
       
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
       
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|
   _hoodie_file_name|uuid|partitionpath| ts|_c3|_c4|
       
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
       |     20201218134505|  20201218134505_1_2|                 4|            
         5|825dbdc3-1ea6-4d8...|   4|            5|  1|bbb|bbb|
       |     20201218134505|  20201218134505_2_3|                 7|            
         8|77be93a5-3ee5-43a...|   7|            8| 10|bbb|bbb|
       |     20201218134505|  20201218134505_0_1|                 1|            
         2|8d8a6498-116f-4f7...|   1|            2| 10|bbb|bbb|
       
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
   
   if the ts effect,the data which uuid=4 still be ts=6 ,and _c3=c ,  _c4=d,but 
now all uuid's _c3,_c4,ts columns are updated
    I can find parquet file and avro file in hdfs.
   
       Field used in preCombining before actual write. When two records have 
the same key value, we will pick the one with the
       largest value for the precombine field, determined by 
Object.compareTo(..)
       val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
       val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = "ts"


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to