[ https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sivabalan narayanan updated HUDI-2947:
--------------------------------------
    Fix Version/s: 0.10.1

> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode
> ------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2947
>                 URL: https://issues.apache.org/jira/browse/HUDI-2947
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ethan Guo
>            Assignee: sivabalan narayanan
>            Priority: Critical
>              Labels: pull-request-available, sev:critical
>             Fix For: 0.11.0, 0.10.1
>
> *Problem:*
> When deltastreamer is started with a given checkpoint, e.g., `--checkpoint 0`, in continuous mode, the job may later pick up the wrong checkpoint. The wrong checkpoint (for the 20211206203551080 commit) appears after the replacecommit and clean: it is reset to "0" instead of "5" after 20211206202728233.commit. More details below.
>
> The bug is due to the check here:
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
> {code:java}
> if (cfg.checkpoint != null
>     && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
>         || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
>   resumeCheckpointStr = Option.of(cfg.checkpoint);
> }
> {code}
> In this case of resuming after a clustering commit, "cfg.checkpoint != null" and "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))" are both true, since "--checkpoint 0" is configured and the last commit is a replacecommit without checkpoint keys. This resets the resume checkpoint string to the configured checkpoint and skips the timeline walk-back logic below, which is wrong.
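The faulty condition can be reproduced in isolation. The class, method, and map-based commit metadata below are illustrative stand-ins, not Hudi's actual DeltaSync API: on an ordinary commit that carries the reset key, the check correctly keeps the stored checkpoint, but on a replacecommit, which stores no checkpoint keys, it falls back to cfg.checkpoint and skips the walk-back.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the buggy resume-checkpoint decision in
// DeltaSync. Names and signatures here are stand-ins for illustration only.
public class CheckpointGuard {
    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
    static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

    // Mirrors the condition quoted above: if cfg.checkpoint is set and the last
    // commit's reset key is missing or different, re-apply cfg.checkpoint.
    static Optional<String> buggyResolve(String cfgCheckpoint, Map<String, String> lastCommitMeta) {
        String reset = lastCommitMeta.get(CHECKPOINT_RESET_KEY);
        if (cfgCheckpoint != null && (reset == null || reset.isEmpty() || !cfgCheckpoint.equals(reset))) {
            return Optional.of(cfgCheckpoint);
        }
        // Otherwise resume from the checkpoint stored in the last commit.
        return Optional.ofNullable(lastCommitMeta.get(CHECKPOINT_KEY));
    }

    public static void main(String[] args) {
        // Ordinary commit: carries both the checkpoint and the reset key,
        // so the equality check short-circuits and the stored value wins.
        Map<String, String> normalCommit = new HashMap<>();
        normalCommit.put(CHECKPOINT_KEY, "4");
        normalCommit.put(CHECKPOINT_RESET_KEY, "0");
        System.out.println(buggyResolve("0", normalCommit)); // Optional[4]

        // Replacecommit: no checkpoint keys at all, so the condition is true
        // and the job resets to the CLI value "0", skipping the walk-back.
        Map<String, String> replaceCommit = new HashMap<>();
        System.out.println(buggyResolve("0", replaceCommit)); // Optional[0]
    }
}
```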
>
> Timeline:
> {code:java}
> 189069 Dec 6 12:19 20211206201238649.commit
> 0 Dec 6 12:12 20211206201238649.commit.requested
> 0 Dec 6 12:12 20211206201238649.inflight
> 189069 Dec 6 12:27 20211206201959151.commit
> 0 Dec 6 12:20 20211206201959151.commit.requested
> 0 Dec 6 12:20 20211206201959151.inflight
> 189069 Dec 6 12:34 20211206202728233.commit
> 0 Dec 6 12:27 20211206202728233.commit.requested
> 0 Dec 6 12:27 20211206202728233.inflight
> 36662 Dec 6 12:35 20211206203449899.replacecommit
> 0 Dec 6 12:35 20211206203449899.replacecommit.inflight
> 34656 Dec 6 12:35 20211206203449899.replacecommit.requested
> 28013 Dec 6 12:35 20211206203503574.clean
> 19024 Dec 6 12:35 20211206203503574.clean.inflight
> 19024 Dec 6 12:35 20211206203503574.clean.requested
> 189069 Dec 6 12:43 20211206203551080.commit
> 0 Dec 6 12:35 20211206203551080.commit.requested
> 0 Dec 6 12:35 20211206203551080.inflight
> 189069 Dec 6 12:50 20211206204311612.commit
> 0 Dec 6 12:43 20211206204311612.commit.requested
> 0 Dec 6 12:43 20211206204311612.inflight
> 0 Dec 6 12:50 20211206205044595.commit.requested
> 0 Dec 6 12:50 20211206205044595.inflight
> 128 Dec 6 12:56 archived
> 483 Dec 6 11:52 hoodie.properties
> {code}
>
> Checkpoints in commits:
> {code:java}
> grep "deltastreamer.checkpoint.key" *
> 20211206201238649.commit: "deltastreamer.checkpoint.key" : "2"
> 20211206201959151.commit: "deltastreamer.checkpoint.key" : "3"
> 20211206202728233.commit: "deltastreamer.checkpoint.key" : "4"
> 20211206203551080.commit: "deltastreamer.checkpoint.key" : "1"
> 20211206204311612.commit: "deltastreamer.checkpoint.key" : "2"
> {code}
>
> *Steps to reproduce:*
> Run HoodieDeltaStreamer in continuous mode, providing both "--checkpoint 0" and "--continuous", with inline clustering and sync clean enabled (some configs are masked).
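For context, the walk-back that the buggy condition skips can be sketched as follows. The method is an illustrative stand-in, not Hudi's real implementation, and the instants and metadata are hard-coded from the timeline above: scanning newest-to-oldest past the replacecommit and clean (which record no checkpoint key), the correct resume point is "4" from 20211206202728233.commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of a timeline walk-back: resume from the most recent
// completed instant that recorded a checkpoint key in its metadata.
public class WalkBack {
    static Optional<String> walkBack(List<Map.Entry<String, Map<String, String>>> timelineNewestFirst) {
        for (Map.Entry<String, Map<String, String>> instant : timelineNewestFirst) {
            String cp = instant.getValue().get("deltastreamer.checkpoint.key");
            if (cp != null) {
                return Optional.of(cp);
            }
            // replacecommit / clean instants carry no checkpoint key: keep walking.
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Newest-to-oldest slice of the timeline shown above.
        List<Map.Entry<String, Map<String, String>>> timeline = new ArrayList<>();
        timeline.add(Map.entry("20211206203503574.clean", Map.of()));
        timeline.add(Map.entry("20211206203449899.replacecommit", Map.of()));
        timeline.add(Map.entry("20211206202728233.commit",
            Map.of("deltastreamer.checkpoint.key", "4")));
        System.out.println(walkBack(timeline)); // Optional[4]
    }
}
```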
>
> {code:java}
> spark-submit \
>   --master yarn \
>   --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
>   --conf spark.speculation=true \
>   --conf spark.speculation.multiplier=1.0 \
>   --conf spark.speculation.quantile=0.5 \
>   --packages org.apache.spark:spark-avro_2.12:3.2.0 \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
>   --props file:/home/hadoop/ethan/test.properties \
>   --source-class ... \
>   --source-ordering-field ts \
>   --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
>   --target-table test_table \
>   --table-type COPY_ON_WRITE \
>   --op BULK_INSERT \
>   --checkpoint 0 \
>   --continuous
> {code}
>
> test.properties:
> {code:java}
> hoodie.cleaner.commits.retained=4
> hoodie.keep.min.commits=5
> hoodie.keep.max.commits=7
> hoodie.clean.async=true
> hoodie.clustering.inline=true
> hoodie.clustering.async.max.commits=3
> hoodie.compact.inline.max.delta.commits=3
> hoodie.insert.shuffle.parallelism=10
> hoodie.upsert.shuffle.parallelism=10
> hoodie.bulk_insert.shuffle.parallelism=10
> hoodie.delete.shuffle.parallelism=10
> hoodie.bulkinsert.shuffle.parallelism=10
> hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=partition
> # turn off any small file handling, for ease of testing
> hoodie.parquet.small.file.limit=1
> benchmark.input.source.path=...
> {code}

--
This message was sent by Atlassian Jira
(v8.20.1#820001)