[ https://issues.apache.org/jira/browse/HUDI-3242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17479240#comment-17479240 ]
Harsha Teja Kanna edited comment on HUDI-3242 at 1/20/22, 10:23 AM:
--------------------------------------------------------------------

[~shivnarayan] What I found from further debugging is that once --checkpoint 0 has been passed to Deltastreamer, it is not picked up again on later runs if it is the same as the reset checkpoint already recorded in the last commit metadata.

[https://github.com/apache/hudi/blob/release-0.10.1/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L471]

I added log statements in a PR against the master branch, and this is what I got:

22/01/20 04:04:28 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://datalake-hudi/v1/journals
22/01/20 04:04:28 INFO HoodieTableConfig: Loading table properties from s3a://datalake-hudi/v1/journals/.hoodie/hoodie.properties
22/01/20 04:04:28 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://datalake-hudi/v1/journals
22/01/20 04:04:29 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20220120085344674__replacecommit__COMPLETED]}
22/01/20 04:04:29 INFO DFSPathSelector: Using path selector org.apache.hudi.utilities.sources.helpers.DFSPathSelector
22/01/20 04:04:29 INFO HoodieDeltaStreamer: Delta Streamer running only single round
22/01/20 04:04:29 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from s3a://datalake-hudi/v1/journals
22/01/20 04:04:29 INFO HoodieTableConfig: Loading table properties from s3a://datalake-hudi/v1/journals/.hoodie/hoodie.properties
22/01/20 04:04:29 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1, baseFileFormat=PARQUET) from s3a://datalake-hudi/v1/journals
22/01/20 04:04:30 INFO HoodieActiveTimeline: Loaded instants upto : Option{val=[20220120085344674__replacecommit__COMPLETED]}
22/01/20 04:04:30 INFO DeltaSync: *Checkpoint reset from metadata: 0*
22/01/20 04:04:30 INFO DeltaSync: *Checkpoint from config: 0*
22/01/20 04:04:30 INFO DeltaSync: *Checkpoint to resume from : Option{val=1642668697000}*
22/01/20 04:04:30 INFO DFSPathSelector: Root path => s3a://datalake-hudi/v1/journals/year=2022/month=01/day=19 source limit => 9223372036854775807
22/01/20 04:04:37 INFO DeltaSync: No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(Option{val=1642668697000}). New Checkpoint=(1642668697000)
22/01/20 04:04:37 INFO DeltaSync: Shutting down embedded timeline server
22/01/20 04:04:37 INFO HoodieDeltaStreamer: Shut down delta streamer
22/01/20 04:04:37 INFO SparkUI: Stopped Spark web UI at http://192.168.86.5:4040
22/01/20 04:04:37 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/01/20 04:04:37 INFO MemoryStore: MemoryStore cleared
22/01/20 04:04:37 INFO BlockManager: BlockManager stopped
22/01/20 04:04:38 INFO BlockManagerMaster: BlockManagerMaster stopped
22/01/20 04:04:38 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/01/20 04:04:38 INFO SparkContext: Successfully stopped SparkContext
22/01/20 04:04:38 INFO ShutdownHookManager: Shutdown hook called
22/01/20 04:04:38 INFO ShutdownHookManager: Deleting directory /private/var/folders/61/3vd56bjx3cj0hpdq_139d5hm0000gp/T/spark-acf0e21c-c48c-440c-86f8-72ff20bef349
22/01/20 04:04:38 INFO ShutdownHookManager: Deleting directory /private/var/folders/61/3vd56bjx3cj0hpdq_139d5hm0000gp/T/spark-b53eb674-0c67-4b68-8974-7ff706408686
22/01/20 04:04:38 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
22/01/20 04:04:38 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
22/01/20 04:04:38 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.
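For context, here is a minimal, self-contained sketch of how I read the resolution logic at the linked line. The constant names and method shapes are approximations for illustration, not a copy of the real DeltaSync code:

{code:java}
import java.util.Map;
import java.util.Optional;

// Simplified sketch of DeltaSync's checkpoint resolution (approximation).
public class CheckpointResolutionSketch {
  static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
  static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

  // cfgCheckpoint is the --checkpoint value; commitMetadata is the extra
  // metadata of the last completed commit on the timeline.
  static Optional<String> resolve(String cfgCheckpoint, Map<String, String> commitMetadata) {
    String resetKey = commitMetadata.get(CHECKPOINT_RESET_KEY);
    if (cfgCheckpoint != null && (resetKey == null || !cfgCheckpoint.equals(resetKey))) {
      // --checkpoint only wins when it differs from the reset value recorded earlier.
      return Optional.of(cfgCheckpoint);
    } else if (commitMetadata.get(CHECKPOINT_KEY) != null) {
      // Otherwise fall back to the checkpoint stored by the previous run,
      // which is why passing --checkpoint 0 a second time is silently ignored.
      return Optional.of(commitMetadata.get(CHECKPOINT_KEY));
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, String> lastCommit = Map.of(
        CHECKPOINT_KEY, "1642668697000",  // checkpoint written by the previous run
        CHECKPOINT_RESET_KEY, "0");       // --checkpoint 0 was already applied once
    // Prints Optional[1642668697000]: the configured 0 is not picked up again.
    System.out.println(resolve("0", lastCommit));
  }
}
{code}

That matches the three bolded log lines above: the reset key in metadata and the config value are both 0, so the stored checkpoint 1642668697000 wins.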
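To see what the latest commit actually recorded, reading the commit metadata directly should work. This is an untested sketch against the 0.10 API (method and key names from memory, table path from this setup), so treat the exact signatures as assumptions:

{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;

public class InspectCheckpoint {
  public static void main(String[] args) throws Exception {
    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
        .setConf(new Configuration())
        .setBasePath("s3a://datalake-hudi/v1/journals")
        .build();
    // Last completed instant on the active timeline (a replacecommit here).
    HoodieInstant latest = metaClient.getActiveTimeline()
        .filterCompletedInstants().lastInstant().get();
    HoodieCommitMetadata meta = HoodieCommitMetadata.fromBytes(
        metaClient.getActiveTimeline().getInstantDetails(latest).get(),
        HoodieCommitMetadata.class);
    // Expected output given the logs: reset key "0" and checkpoint "1642668697000".
    System.out.println(meta.getMetadata("deltastreamer.checkpoint.reset_key"));
    System.out.println(meta.getMetadata("deltastreamer.checkpoint.key"));
  }
}
{code}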
> Checkpoint 0 is ignored - Partial parquet file discovery after the first commit
> ------------------------------------------------------------------------------
>
>                 Key: HUDI-3242
>                 URL: https://issues.apache.org/jira/browse/HUDI-3242
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: spark, writer-core
>    Affects Versions: 0.10.1
>         Environment: AWS
> EMR 6.4.0
> Spark 3.1.2
> Hudi - 0.10.1-rc
>            Reporter: Harsha Teja Kanna
>            Assignee: sivabalan narayanan
>            Priority: Critical
>              Labels: hudi-on-call, sev:critical, user-support-issues
>         Attachments: Screen Shot 2022-01-13 at 2.40.55 AM.png, Screen Shot 2022-01-13 at 2.55.35 AM.png
>
>   Original Estimate: 3h
>  Remaining Estimate: 3h
>
> Hi, I am testing release branch 0.10.1 as I needed a few bug fixes from it. However, for a certain table I see only partial discovery of files after the initial commit.
> But if the second partition is given as input for the first commit, all of its files are discovered.
> First partition: 2021/01 has 744 files and all of them are discovered.
> Second partition: 2021/02 has 762 files but only 72 are discovered.
> Checkpoint is set to 0. No errors in the logs.
> {code:java}
> spark-submit \
> --master yarn \
> --deploy-mode cluster \
> --driver-cores 30 \
> --driver-memory 32g \
> --executor-cores 5 \
> --executor-memory 32g \
> --num-executors 120 \
> --jars s3://bucket/apps/datalake/jars/unused-1.0.0.jar,s3://bucket/apps/datalake/jars/spark-avro_2.12-3.1.2.jar \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> s3://bucket/apps/datalake/jars/hudi-0.10.0/hudi-utilities-bundle_2.12-0.10.0.jar \
> --table-type COPY_ON_WRITE \
> --source-ordering-field timestamp \
> --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
> --target-base-path s3a://datalake-hudi/datastream/v1/sessions_by_date \
> --target-table sessions_by_date \
> --transformer-class org.apache.hudi.utilities.transform.SqlQueryBasedTransformer \
> --op INSERT \
> --checkpoint 0 \
> --hoodie-conf hoodie.clean.automatic=true \
> --hoodie-conf hoodie.cleaner.commits.retained=1 \
> --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
> --hoodie-conf hoodie.clustering.inline=false \
> --hoodie-conf hoodie.clustering.inline.max.commits=1 \
> --hoodie-conf hoodie.clustering.plan.strategy.class=org.apache.hudi.client.clustering.plan.strategy.SparkSizeBasedClusteringPlanStrategy \
> --hoodie-conf hoodie.clustering.plan.strategy.max.num.groups=1000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=250000000 \
> --hoodie-conf hoodie.clustering.plan.strategy.sort.columns=sid,id \
> --hoodie-conf hoodie.clustering.plan.strategy.target.file.max.bytes=268435456 \
> --hoodie-conf hoodie.clustering.preserve.commit.metadata=true \
> --hoodie-conf hoodie.datasource.hive_sync.database=datalake-hudi \
> --hoodie-conf hoodie.datasource.hive_sync.enable=false \
> --hoodie-conf hoodie.datasource.hive_sync.ignore_exceptions=true \
> --hoodie-conf hoodie.datasource.hive_sync.mode=hms \
> --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.HiveStylePartitionValueExtractor \
> --hoodie-conf hoodie.datasource.hive_sync.table=sessions_by_date \
> --hoodie-conf hoodie.datasource.hive_sync.use_jdbc=false \
> --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
> --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
> --hoodie-conf hoodie.datasource.write.operation=insert \
> --hoodie-conf hoodie.datasource.write.partitionpath.field=date:TIMESTAMP \
> --hoodie-conf hoodie.datasource.write.precombine.field=timestamp \
> --hoodie-conf hoodie.datasource.write.recordkey.field=id,qid,aid \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.timezone=GMT \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.timezone=GMT \
> --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
> --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://datalake-hudi/history/datastream/v1/sessions/2021/02 \
> --hoodie-conf hoodie.deltastreamer.source.input.selector=org.apache.hudi.utilities.sources.helpers.DFSPathSelector \
> --hoodie-conf "\"hoodie.deltastreamer.transformer.sql=SELECT id, qid, aid, to_timestamp(timestamp) as timestamp, sid, date_format(to_timestamp(timestamp), 'yyyy/MM/dd') AS date FROM <SRC> a \"" \
> --hoodie-conf hoodie.file.listing.parallelism=256 \
> --hoodie-conf hoodie.finalize.write.parallelism=256 \
> --hoodie-conf hoodie.generate.consistent.timestamp.logical.for.key.generator=true \
> --hoodie-conf hoodie.insert.shuffle.parallelism=1000 \
> --hoodie-conf hoodie.metadata.enable=true \
> --hoodie-conf hoodie.metadata.metrics.enable=true \
> --hoodie-conf hoodie.metrics.cloudwatch.metric.prefix=emr.datalake-service.prd.insert.sessions_by_date \
> --hoodie-conf hoodie.metrics.on=true \
> --hoodie-conf hoodie.metrics.reporter.type=CLOUDWATCH \
> --hoodie-conf hoodie.parquet.block.size=268435456 \
> --hoodie-conf hoodie.parquet.compression.codec=snappy \
> --hoodie-conf hoodie.parquet.max.file.size=268435456 \
> --hoodie-conf hoodie.parquet.small.file.limit=250000000
> {code}
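Worth noting for anyone reproducing this: with ParquetDFSSource, the checkpoint handed to DFSPathSelector is, as far as I can tell, a last-modified timestamp in epoch millis, and only files modified after it are read. A toy model of that filtering follows; the FileEntry class and the sample mod times are made up for illustration:

{code:java}
import java.util.List;
import java.util.stream.Collectors;

// Toy model of DFSPathSelector's filtering; FileEntry is a made-up
// stand-in for Hadoop's FileStatus (path + modification time in epoch millis).
public class PathSelectorSketch {
  static class FileEntry {
    final String path;
    final long modTime;
    FileEntry(String path, long modTime) { this.path = path; this.modTime = modTime; }
    @Override public String toString() { return path; }
  }

  // Only files strictly newer than the checkpoint are selected.
  static List<FileEntry> selectNewFiles(List<FileEntry> listing, long checkpointMs) {
    return listing.stream()
        .filter(f -> f.modTime > checkpointMs)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<FileEntry> listing = List.of(
        new FileEntry("2021/02/part-00000.parquet", 1642668000000L),   // older than checkpoint
        new FileEntry("2021/02/part-00001.parquet", 1642669000000L));  // newer than checkpoint
    // With the stale checkpoint 1642668697000 (instead of the intended reset to 0),
    // only the newer file survives the filter.
    System.out.println(selectNewFiles(listing, 1642668697000L));
  }
}
{code}

If the reset to 0 never takes effect, as in the logs above, files whose modification times fall at or before the stale checkpoint are skipped, which would explain 762 files shrinking to 72.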