ad1happy2go commented on issue #10932:
URL: https://github.com/apache/hudi/issues/10932#issuecomment-2024899836
@pravin1406 I tried a lot of combinations as you suggested, but it looks like
I am missing something. I couldn't reproduce this issue with Hudi 0.14.1.
Could you review the code below and let me know what I should change to
reproduce the issue you described?
```
fake = Faker()

# Build a single record; the partition column ("Country") is rewritten in the
# later writes to exercise GLOBAL_SIMPLE index partition-path updates.
data = [{"transactionId": fake.uuid4(), "EventTime": "2", "storeNbr": "1",
         "FullName": fake.name(), "Address": fake.address(),
         "CompanyName": fake.company(), "JobTitle": fake.job(),
         "EmailAddress": fake.email(), "PhoneNumber": fake.phone_number(),
         "RandomText": fake.sentence(), "City": fake.city(),
         "State": "NYC", "Country": "US"} for _ in range(1)]
pandas_df = pd.DataFrame(data)
df = spark.createDataFrame(pandas_df)

# Shared Hudi write options. The original paste repeated this chain five
# times with broken line-continuation backslashes (a SyntaxError as pasted)
# and the copies had drifted (some dropped hoodie.upsert.shuffle.parallelism);
# a single dict keeps every write identical.
HUDI_OPTIONS = {
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.spark.sql.insert.into.operation": "upsert",
    "hoodie.datasource.write.precombine.field": "EventTime",
    "hoodie.datasource.write.recordkey.field": "transactionId",
    "hoodie.datasource.write.partitionpath.field": "Country",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.payload.class": "org.apache.hudi.common.model.DefaultHoodieRecordPayload",
    "hoodie.enable.data.skipping": "true",
    "hoodie.datasource.write.reconcile.schema": "true",
    "hoodie.datasource.hive_sync.support_timestamp": "true",
    "hoodie.upsert.shuffle.parallelism": "200",
    "hoodie.index.type": "GLOBAL_SIMPLE",
    "hoodie.simple.index.update.partition.path": "true",
    "hoodie.table.name": tablename,
    "hoodie.datasource.write.hive_style_partitioning": "true",
}


def write_and_check(frame, mode):
    """Write `frame` to the Hudi table at PATH with the shared options,
    show the table, and assert exactly one record remains (the single
    record key should always dedupe to one row)."""
    frame.write.mode(mode).format("hudi").options(**HUDI_OPTIONS).save(PATH)
    spark.read.format("hudi").load(PATH).show()
    assert spark.read.format("hudi").load(PATH).count() == 1


# Bootstrap the table (Country=US).
write_and_check(df, "overwrite")
# Upsert the same record key with a non-partition column change.
write_and_check(df.withColumn("City", lit("SFO")), "append")
# Move the record to a new partition (Country=MA).
write_and_check(df.withColumn("Country", lit("MA")), "append")
# Move the record back to the original partition (Country=US).
write_and_check(df.withColumn("Country", lit("US")), "append")
# Move the record to another new partition (Country=PA).
write_and_check(df.withColumn("Country", lit("PA")), "append")
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]