kingkongpoon opened a new issue #2345:
URL: https://github.com/apache/hudi/issues/2345
When I use hudi-0.6.0, I find that the option PRECOMBINE_FIELD_OPT_KEY
seems to have no effect.
I want to use an rt table and update my data by its timestamp column (ts).
### Test data (filename: a.csv)

```
1,2,3,a,b
4,5,6,c,d
7,8,9,e,f
```

```shell
hdfs dfs -put a.csv /hudi/
```
### Spark code

First, write into HDFS:
```scala
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieIndexConfig
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("Hudi")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

val input = spark.read.format("csv").load("hdfs://node1:9000/hudi/a.csv")
  .withColumnRenamed("_c0", "uuid")
  .withColumnRenamed("_c1", "partitionpath")
  .withColumnRenamed("_c2", "ts")

val basePath = "hdfs://node1:9000/hudi/test"

input.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option("hoodie.table.name", "test")
  .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
  .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  .mode(SaveMode.Overwrite)
  .save(basePath)

spark.read.format("hudi").load(basePath + "/*").show()
```
```
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid|partitionpath| ts|_c3|_c4|
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
|     20201218134136|  20201218134136_1_1|                 4|                     5|825dbdc3-1ea6-4d8...|   4|            5|  6|  c|  d|
|     20201218134136|  20201218134136_2_3|                 7|                     8|77be93a5-3ee5-43a...|   7|            8|  9|  e|  f|
|     20201218134136|  20201218134136_0_2|                 1|                     2|8d8a6498-116f-4f7...|   1|            2|  3|  a|  b|
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
```
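One aside that may matter here (my own observation, not something the Hudi docs say about this case): the CSV is read without a schema, so the renamed ts column is a string, and `Object.compareTo(..)` on strings is lexicographic rather than numeric:

```scala
// Plain Scala/Java, not Hudi code: string comparison is lexicographic,
// so a string-typed ts column does not compare the way numbers do.
assert("10".compareTo("6") < 0) // "10" sorts BEFORE "6" as a string
assert(10L.compareTo(6L) > 0)   // but 10 > 6 numerically
```

So even where the precombine field is honored, a string-typed ts may not pick the record one expects; casting ts to a numeric type would avoid that ambiguity.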
Second, write (update) my data; 2.csv is a local file:

```
1,2,10,bbb,bbb
4,5,1,bbb,bbb
7,8,10,bbb,bbb
```
```scala
val input = spark.read.format("csv").load("file:///root/2.csv")
  .withColumnRenamed("_c0", "uuid")
  .withColumnRenamed("_c1", "partitionpath")
  .withColumnRenamed("_c2", "ts")

input.write.format("org.apache.hudi")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option("hoodie.table.name", "test")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
  .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
  .mode(SaveMode.Append)
  .save(basePath)
```
When it runs successfully, I query with Spark:
```
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|uuid|partitionpath| ts|_c3|_c4|
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
|     20201218134505|  20201218134505_1_2|                 4|                     5|825dbdc3-1ea6-4d8...|   4|            5|  1|bbb|bbb|
|     20201218134505|  20201218134505_2_3|                 7|                     8|77be93a5-3ee5-43a...|   7|            8| 10|bbb|bbb|
|     20201218134505|  20201218134505_0_1|                 1|                     2|8d8a6498-116f-4f7...|   1|            2| 10|bbb|bbb|
+-------------------+--------------------+------------------+----------------------+--------------------+----+-------------+---+---+---+
```
If ts took effect, the row with uuid=4 should still have ts=6, _c3=c, and
_c4=d, because its incoming ts=1 is smaller than the stored ts=6. But now
the ts, _c3, and _c4 columns of every uuid have been updated.
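To make that expectation concrete, here is a plain-Scala sketch (Row and merge are my own illustrative names, not Hudi internals) of the behavior I expected when an update is merged against stored data:

```scala
// Sketch of the merge semantics I expected: for an existing key, keep
// whichever record (stored or incoming) has the larger ts value.
case class Row(uuid: String, ts: Long, c3: String, c4: String)

def merge(stored: Map[String, Row], incoming: Seq[Row]): Map[String, Row] =
  incoming.foldLeft(stored) { (acc, rec) =>
    acc.get(rec.uuid) match {
      case Some(old) if old.ts >= rec.ts => acc               // stored record wins
      case _                             => acc + (rec.uuid -> rec) // incoming wins
    }
  }

val stored   = Map("4" -> Row("4", 6, "c", "d"))
val incoming = Seq(Row("4", 1, "bbb", "bbb"))
assert(merge(stored, incoming)("4") == Row("4", 6, "c", "d")) // uuid=4 keeps ts=6
```

That is not what the query above shows: the incoming ts=1 record replaced the stored ts=6 one.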
I can find both parquet files and avro log files in HDFS.
The documentation for the option says:

> Field used in preCombining before actual write. When two records have
> the same key value, we will pick the one with the largest value for the
> precombine field, determined by Object.compareTo(..)

```scala
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val DEFAULT_PRECOMBINE_FIELD_OPT_VAL = "ts"
```
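As I read that documentation, the selection among records sharing a key would look something like this plain-Scala sketch (Rec and preCombine are my own illustrative names, not Hudi's API):

```scala
// Plain-Scala sketch of the documented precombine semantics: among
// records that share a record key, keep the one with the largest
// precombine-field value.
case class Rec(uuid: String, partitionpath: String, ts: Long, c3: String, c4: String)

def preCombine(batch: Seq[Rec]): Seq[Rec] =
  batch
    .groupBy(_.uuid)    // group duplicates by record key
    .values
    .map(_.maxBy(_.ts)) // keep the record with the largest ts per key
    .toSeq

val batch = Seq(Rec("4", "5", 6, "c", "d"), Rec("4", "5", 1, "x", "y"))
assert(preCombine(batch) == Seq(Rec("4", "5", 6, "c", "d"))) // ts=6 record survives
```

But in my test above the stored ts=6 record for uuid=4 was overwritten by an incoming ts=1 record, which does not match this reading.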
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]