jpugliesi opened a new issue #2002:
URL: https://github.com/apache/hudi/issues/2002


   I am seeing inconsistent commit history when querying a Hudi table 
incrementally (with `begin_time = 0` ) vs. using the CLI's `commits show`.
   
   [Following up on the Slack thread here with @bhasudha 
](https://apache-hudi.slack.com/archives/C4D716NPQ/p1597336199348200) 
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   ```
   # load data
   dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
   inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
   df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
   df.show(vertical=True, truncate=False, n=3)
   
   # we'll save the non-hive hudi table here
   hudi_base_path = f"s3://{bucket}/tmp/hudi/trips_cow"
   
   # https://hudi.apache.org/docs/configurations.html
   hudi_insert_options = {
     'hoodie.table.name': "trips_cow",
     'hoodie.datasource.write.recordkey.field': 'uuid',
     'hoodie.datasource.write.partitionpath.field': '', # note the empty partition path field here (the upsert options below use 'partitionpath')
     'hoodie.datasource.write.precombine.field': 'ts', # defines how to compare rows with the same record key - the greatest value wins (via .compareTo)
     'hoodie.datasource.write.operation': 'bulk_insert', # note we're bulk inserting the initial dataset
     'hoodie.consistency.check.enabled': 'true', # enable consistency checking for S3
     'hoodie.upsert.shuffle.parallelism': 2, 
     'hoodie.insert.shuffle.parallelism': 2
   }
   
   print("Initial insert")
   df.write.format("hudi"). \
     options(**hudi_insert_options). \
     mode("overwrite"). \
     save(hudi_base_path)
   
   spark. \
     read. \
     format("hudi"). \
     options(**{
         'hoodie.datasource.query.type': 'incremental',
         'hoodie.datasource.read.begin.instanttime': "0",
     }). \
     load(hudi_base_path). \
     createOrReplaceTempView("hudi_trips_snapshot")
   
   query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime")
   
   commits = list(map(lambda row: row[0], query.collect()))
   print("Commits after initial insert:", commits)
   
   ###################################################
   # Upsert new data - this should create a new commit
   ###################################################
   updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
   updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   
   # https://hudi.apache.org/docs/configurations.html
   hudi_upsert_options = {
     # hive sync config
     'hoodie.table.name': "trips_cow",
     # normal hudi write config
     'hoodie.datasource.write.recordkey.field': 'uuid',
     'hoodie.datasource.write.partitionpath.field': 'partitionpath',
     'hoodie.datasource.write.precombine.field': 'ts',
     'hoodie.datasource.write.operation': 'upsert', # (specifies upsert)
     'hoodie.consistency.check.enabled': 'true',
     'hoodie.upsert.shuffle.parallelism': 2, 
     'hoodie.insert.shuffle.parallelism': 2
   }
   
   print("First Upsert")
   updates_df.write.format("hudi"). \
     options(**hudi_upsert_options). \
     mode("append"). \
     save(hudi_base_path)
   
   # query for all commits since the first commit
   spark. \
     read. \
     format("hudi"). \
     options(**{
         'hoodie.datasource.query.type': 'incremental',
         'hoodie.datasource.read.begin.instanttime': "0",
     }). \
     load(hudi_base_path). \
     createOrReplaceTempView("hudi_trips_snapshot")
   
   query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime")
   commits = list(map(lambda row: row[0], query.collect()))
   print("Commits after first upsert:", commits)
   
   #### Upsert 2
   # Write updates again - we expect there to be 3 commits after this
   updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
   updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   
   print("Second Upsert")
   updates_df.write.format("hudi"). \
     options(**hudi_upsert_options). \
     mode("append"). \
     save(hudi_base_path)
   
   # query for all commits since the first commit
   spark. \
     read. \
     format("hudi"). \
     options(**{
         'hoodie.datasource.query.type': 'incremental',
         'hoodie.datasource.read.begin.instanttime': "0",
     }). \
     load(hudi_base_path). \
     createOrReplaceTempView("hudi_trips_snapshot")
   
   query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime")
   commits = list(map(lambda row: row[0], query.collect()))
   print("Commits after second upsert:", commits)
   
   # Write updates again - we expect there to be 4 commits after this
   updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
   updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
   
   print("Third Upsert")
   updates_df.write.format("hudi"). \
     options(**hudi_upsert_options). \
     mode("append"). \
     save(hudi_base_path)
   
   # query for all commits since the first commit
   spark. \
     read. \
     format("hudi"). \
     options(**{
         'hoodie.datasource.query.type': 'incremental',
         'hoodie.datasource.read.begin.instanttime': "0",
     }). \
     load(hudi_base_path). \
     createOrReplaceTempView("hudi_trips_snapshot")
   
   query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime")
   commits = list(map(lambda row: row[0], query.collect()))
   print("Commits after third upsert:", commits)
   ```
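   
   The growth invariant the script above is probing - each incremental read with `begin.instanttime = 0` should return every commit seen by the previous read, plus any new ones - can be stated as a small check (plain Python, independent of Spark/Hudi; names are illustrative):
   
   ```python
   def check_incremental_history(histories):
       """Given successive commit-time lists from incremental reads starting
       at instant 0, return (read_index, dropped_commits) for the first read
       that lost a commit seen earlier, or None if the history only grew."""
       for i in range(1, len(histories)):
           dropped = set(histories[i - 1]) - set(histories[i])
           if dropped:
               return i, sorted(dropped)
       return None
   ```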
   
   Results:
   ```
   Initial insert
   Commits after initial insert: ['20200820185825']
   First Upsert
   Commits after first upsert: ['20200820185825', '20200820185832']
   Second Upsert
   Commits after second upsert: ['20200820185825', '20200820185832', '20200820185841']
   Third Upsert
   Commits after third upsert: ['20200820185825', '20200820185841', '20200820185848']
   ```
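   
   Comparing the last two result lists directly shows which commit vanished (timestamps copied from the output above):
   
   ```python
   after_second_upsert = ['20200820185825', '20200820185832', '20200820185841']
   after_third_upsert = ['20200820185825', '20200820185841', '20200820185848']
   
   # The commit written by the first upsert is missing from the third read:
   dropped = sorted(set(after_second_upsert) - set(after_third_upsert))
   print(dropped)  # ['20200820185832']
   ```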
   
   **Expected behavior**
   Notice that only 3 commits are returned after the third upsert, and the commit from the first upsert (`20200820185832`) has dropped out of the result - I expect all 4 commits to be returned in the incremental view.
   
   Additionally, the Hudi CLI shows 4 commits, as expected:
   
![image](https://user-images.githubusercontent.com/2141170/90813765-e328b600-e2dc-11ea-869a-570cd27cf853.png)
   
   
   **Environment Description**
   
   * Hudi version : 0.5.3
   
   * Spark version : 2.4.5
   
   * Hive version : 2.3.7
   
   * Hadoop version : 2.8.5
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no

