jpugliesi opened a new issue #2002: URL: https://github.com/apache/hudi/issues/2002
I am seeing inconsistent commit history when querying a Hudi table incrementally (with `hoodie.datasource.read.begin.instanttime` set to `"0"`) versus using the CLI's `commits show`. This follows up on the [Slack thread](https://apache-hudi.slack.com/archives/C4D716NPQ/p1597336199348200) with @bhasudha.

**To Reproduce**

Steps to reproduce the behavior (run in a PySpark session with the Hudi bundle on the classpath, so `spark` and `sc` are defined, and with `bucket` set to an S3 bucket name):

```
# load data
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.show(vertical=True, truncate=False, n=3)

# we'll save the non-hive hudi table here
hudi_base_path = f"s3://{bucket}/tmp/hudi/trips_cow"

# https://hudi.apache.org/docs/configurations.html
hudi_insert_options = {
    'hoodie.table.name': "trips_cow",
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': '',  # note we now define the partition field
    'hoodie.datasource.write.precombine.field': 'ts',  # this defines how to compare rows with the same record key - "greatest" value wins (using .compareTo)
    'hoodie.datasource.write.operation': 'bulk_insert',  # note we're bulk inserting the initial dataset
    'hoodie.consistency.check.enabled': 'true',  # enable consistency checking for S3
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

print("Initial insert")
df.write.format("hudi"). \
    options(**hudi_insert_options). \
    mode("overwrite"). \
    save(hudi_base_path)

spark. \
    read. \
    format("hudi"). \
    options(**{
        'hoodie.datasource.query.type': 'incremental',
        'hoodie.datasource.read.begin.instanttime': "0",
    }). \
    load(hudi_base_path). \
    createOrReplaceTempView("hudi_trips_snapshot")

query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime")
commits = list(map(lambda row: row[0], query.collect()))
print("Commits after initial insert:", commits)

###################################################
# Upsert new data - this should create a new commit
###################################################
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

# https://hudi.apache.org/docs/configurations.html
hudi_upsert_options = {
    # hive sync config
    'hoodie.table.name': "trips_cow",
    # normal hudi write config
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.operation': 'upsert',  # (specifies upsert)
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}

print("First Upsert")
updates_df.write.format("hudi"). \
    options(**hudi_upsert_options). \
    mode("append"). \
    save(hudi_base_path)

# query for all commits since the first commit
spark. \
    read. \
    format("hudi"). \
    options(**{
        'hoodie.datasource.query.type': 'incremental',
        'hoodie.datasource.read.begin.instanttime': "0",
    }). \
    load(hudi_base_path). \
    createOrReplaceTempView("hudi_trips_snapshot")

query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime")
commits = list(map(lambda row: row[0], query.collect()))
print("Commits after first upsert:", commits)

#### Upsert 2
# Write updates again - we expect there to be 3 commits after this
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

print("Second Upsert")
updates_df.write.format("hudi"). \
    options(**hudi_upsert_options). \
    mode("append"). \
    save(hudi_base_path)

# query for all commits since the first commit
spark. \
    read. \
    format("hudi"). \
    options(**{
        'hoodie.datasource.query.type': 'incremental',
        'hoodie.datasource.read.begin.instanttime': "0",
    }). \
    load(hudi_base_path). \
    createOrReplaceTempView("hudi_trips_snapshot")

query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime")
commits = list(map(lambda row: row[0], query.collect()))
print("Commits after second upsert:", commits)

#### Upsert 3
# Write updates again - we expect there to be 4 commits after this
updates = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateUpdates(10))
updates_df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

print("Third Upsert")
updates_df.write.format("hudi"). \
    options(**hudi_upsert_options). \
    mode("append"). \
    save(hudi_base_path)

# query for all commits since the first commit
spark. \
    read. \
    format("hudi"). \
    options(**{
        'hoodie.datasource.query.type': 'incremental',
        'hoodie.datasource.read.begin.instanttime': "0",
    }). \
    load(hudi_base_path). \
    createOrReplaceTempView("hudi_trips_snapshot")

query = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime")
commits = list(map(lambda row: row[0], query.collect()))
print("Commits after third upsert:", commits)
```

Results:

```
Initial insert
Commits after initial insert: ['20200820185825']
First Upsert
Commits after first upsert: ['20200820185825', '20200820185832']
Second Upsert
Commits after second upsert: ['20200820185825', '20200820185832', '20200820185841']
Third Upsert
Commits after third upsert: ['20200820185825', '20200820185841', '20200820185848']
```

**Expected behavior**

Only 3 commits are returned after the third upsert, and the first upsert's commit (`20200820185832`) no longer appears. I expect the incremental query to return 4 commits. The Hudi CLI's `commits show` lists 4 commits, as expected.

**Environment Description**

* Hudi version : 0.5.3
* Spark version : 2.4.5
* Hive version : 2.3.7
* Hadoop version : 2.8.5
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no
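To compare the incremental query against the commit timeline directly, a minimal sketch like the one below could list completed commit instants from the table's `.hoodie/` directory and report which of them never show up as a `_hoodie_commit_time` value. It assumes that Hudi 0.5.x records each completed copy-on-write commit as a file named `<instant_time>.commit` (requested and inflight instants use other suffixes). The helper names `completed_commit_instants`, `commits_missing_from_query`, and `list_hoodie_dir` are hypothetical, not part of Hudi's API.

```python
import re
from typing import Iterable, List

# Assumption: a completed copy-on-write commit appears in <base_path>/.hoodie/
# as "<instant_time>.commit"; files such as "<instant>.commit.requested",
# "<instant>.inflight" or "hoodie.properties" are ignored by this pattern.
_COMPLETED_COMMIT = re.compile(r"^(\d+)\.commit$")


def completed_commit_instants(file_names: Iterable[str]) -> List[str]:
    """Return the sorted instant times of completed commits in a .hoodie/ listing."""
    instants = []
    for name in file_names:
        match = _COMPLETED_COMMIT.match(name)
        if match:
            instants.append(match.group(1))
    return sorted(instants)


def commits_missing_from_query(timeline: Iterable[str], observed: Iterable[str]) -> List[str]:
    """Return timeline instants that never appear among the queried commit times."""
    seen = set(observed)
    return [instant for instant in sorted(timeline) if instant not in seen]


def list_hoodie_dir(spark, base_path: str) -> List[str]:
    """List file names under <base_path>/.hoodie via the Hadoop FileSystem API.

    Goes through the JVM gateway, so it also works for s3:// paths as long as
    the cluster's Hadoop configuration can reach the bucket.
    """
    sc = spark.sparkContext
    path = sc._jvm.org.apache.hadoop.fs.Path(base_path + "/.hoodie")
    fs = path.getFileSystem(sc._jsc.hadoopConfiguration())
    return [status.getPath().getName() for status in fs.listStatus(path)]
```

In the reproduction script this could be called after the third upsert as `commits_missing_from_query(completed_commit_instants(list_hoodie_dir(spark, hudi_base_path)), commits)`. Assuming the four commits listed by the CLI are the four instant times seen across the results above, it would report `['20200820185832']`. Keeping the timeline parsing separate from the JVM listing means the comparison logic can be checked without a Spark session.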
