nikoshet opened a new issue, #8343: URL: https://github.com/apache/hudi/issues/8343
**Describe the problem you faced**

Hello, I am experimenting with an AWS DMS -> Hudi architecture using `DeltaStreamer` with Parquet, and I want to partition the files into folders by year, month and day, where the partition field is a column named _created_at_ of type _TIMESTAMP_.

When I spawn the Spark job with a command that partitions only by year, it works as expected, and the folders inside the S3 bucket are partitioned correctly. The command is:

```bash
spark-submit \
  --jars local:///opt/spark/work-dir/hudi-spark3.3-bundle_2.12-0.13.0.jar,local:///opt/spark/work-dir/hudi-aws-bundle-0.13.0.jar,local:///opt/spark/work-dir/aws-java-sdk-bundle-1.12.398.jar,local:///opt/spark/work-dir/hadoop-aws-3.3.4.jar \
  --master k8s://http://localhost:8001 --deploy-mode cluster \
  --conf spark.kubernetes.container.image=stathisq/spark-hudi:3.3.1-0.13.0-slim \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.executor.podTemplateFile=$(pwd)/pod-templates/podTemplateExecutor.yaml \
  --conf spark.kubernetes.driver.podTemplateFile=$(pwd)/pod-templates/podTemplateDriver.yaml \
  --conf spark.kubernetes.file.upload.path=s3a://cdc-spike/spark \
  --conf spark.ui.port=4040 \
  --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
  --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" local:///opt/spark/work-dir/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
  --table-type COPY_ON_WRITE --op BULK_INSERT \
  --target-base-path s3a://cdc-spike/hudi/postgres/employee \
  --target-table employee \
  --min-sync-interval-seconds 60 \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
  --hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
  --source-ordering-field _dms_ingestion_timestamp \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=created_at:TIMESTAMP \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy" \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type="SCALAR" \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit="microseconds"
```

Also, when I start a Spark SQL client and load the Hudi table, it is read without any errors.

However, I then tried to run a command that uses `year,month,day` partitioning, changing this line:

```bash
--hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy/MM/dd" \
```

<details>
<summary> i.e:</summary>
<br>

```bash
spark-submit \
  --jars local:///opt/spark/work-dir/hudi-spark3.3-bundle_2.12-0.13.0.jar,local:///opt/spark/work-dir/hudi-aws-bundle-0.13.0.jar,local:///opt/spark/work-dir/aws-java-sdk-bundle-1.12.398.jar,local:///opt/spark/work-dir/hadoop-aws-3.3.4.jar \
  --master k8s://http://localhost:8001 --deploy-mode cluster \
  --conf spark.kubernetes.container.image=stathisq/spark-hudi:3.3.1-0.13.0-slim \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
  --conf spark.kubernetes.namespace=spark \
  --conf spark.kubernetes.executor.podTemplateFile=$(pwd)/pod-templates/podTemplateExecutor.yaml \
  --conf spark.kubernetes.driver.podTemplateFile=$(pwd)/pod-templates/podTemplateDriver.yaml \
  --conf spark.kubernetes.file.upload.path=s3a://cdc-spike/spark \
  --conf spark.ui.port=4040 \
  --conf spark.driver.extraJavaOptions="-Divy.cache.dir=/tmp -Divy.home=/tmp" \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
  --class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" local:///opt/spark/work-dir/hudi-utilities-slim-bundle_2.12-0.13.0.jar \
  --table-type COPY_ON_WRITE --op BULK_INSERT \
  --target-base-path s3a://cdc-spike/hudi/postgres/employee \
  --target-table employee \
  --min-sync-interval-seconds 60 \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
  --hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
  --source-ordering-field _dms_ingestion_timestamp \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=created_at:TIMESTAMP \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat="yyyy/MM/dd" \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type="SCALAR" \
  --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.scalar.time.unit="microseconds"
```

</br>
</details>

The folders are split correctly on the S3 bucket, as shown below:

```bash
aws s3 ls s3://cdc-spike/hudi/postgres/employee --recursive
2023-03-31 18:22:27      0 hudi/postgres/employee/.hoodie/.aux/.bootstrap/.fileids/
2023-03-31 18:22:26      0 hudi/postgres/employee/.hoodie/.aux/.bootstrap/.partitions/
2023-03-31 18:22:22      0 hudi/postgres/employee/.hoodie/.schema/
2023-03-31 18:23:51      0 hudi/postgres/employee/.hoodie/.temp/
2023-03-31 18:23:49   1721 hudi/postgres/employee/.hoodie/20230331152242137.commit
2023-03-31 18:22:45      0 hudi/postgres/employee/.hoodie/20230331152242137.commit.requested
2023-03-31 18:23:10      0 hudi/postgres/employee/.hoodie/20230331152242137.inflight
2023-03-31 18:22:23      0 hudi/postgres/employee/.hoodie/archived/
2023-03-31 18:23:07    697 hudi/postgres/employee/.hoodie/hoodie.properties
2023-03-31 18:22:55      0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.aux/.bootstrap/.fileids/
2023-03-31 18:22:54      0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.aux/.bootstrap/.partitions/
2023-03-31 18:22:50      0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.schema/
2023-03-31 18:23:45      0 hudi/postgres/employee/.hoodie/metadata/.hoodie/.temp/
2023-03-31 18:23:04   5615 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit
2023-03-31 18:23:03    121 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit.inflight
2023-03-31 18:23:00      0 hudi/postgres/employee/.hoodie/metadata/.hoodie/00000000000000.deltacommit.requested
2023-03-31 18:23:42   6712 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit
2023-03-31 18:23:34   1502 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit.inflight
2023-03-31 18:23:30      0 hudi/postgres/employee/.hoodie/metadata/.hoodie/20230331152242137.deltacommit.requested
2023-03-31 18:22:51      0 hudi/postgres/employee/.hoodie/metadata/.hoodie/archived/
2023-03-31 18:22:56    672 hudi/postgres/employee/.hoodie/metadata/.hoodie/hoodie.properties
2023-03-31 18:22:58    124 hudi/postgres/employee/.hoodie/metadata/files/.files-0000_00000000000000.log.1_0-0-0
2023-03-31 18:23:40  11050 hudi/postgres/employee/.hoodie/metadata/files/.files-0000_00000000000000.log.2_0-10-10
2023-03-31 18:23:36     93 hudi/postgres/employee/.hoodie/metadata/files/.hoodie_partition_metadata
2023-03-31 18:23:15     96 hudi/postgres/employee/created_at=2023/03/31/.hoodie_partition_metadata
2023-03-31 18:23:19 436276 hudi/postgres/employee/created_at=2023/03/31/30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet
```

However, with the Spark SQL client I get the following error:

```bash
java.lang.ClassCastException: class org.apache.spark.unsafe.types.UTF8String cannot be cast to class java.lang.Long (org.apache.spark.unsafe.types.UTF8String is in unnamed module of loader 'app'; java.lang.Long is in module java.base of loader 'bootstrap')
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getLong$(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:195)
    at org.apache.spark.sql.execution.vectorized.ColumnVectorUtils.populate(ColumnVectorUtils.java:100)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:269)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:280)
    at org.apache.spark.sql.execution.datasources.parquet.Spark32PlusHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark32PlusHoodieParquetFileFormat.scala:309)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:209)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:270)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
    at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:554)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
```

The same error occurs when using spark shell and loading the table using DataFrames.
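
To make the difference between the two runs concrete, here is a minimal Python sketch (not Hudi code) of how the key generator settings above appear to turn a microsecond `created_at` value into a partition path. The helper `partition_path` is hypothetical, UTC is assumed as the output timezone, and only the `yyyy`, `MM` and `dd` pattern letters are handled.

```python
from datetime import datetime, timezone


def partition_path(field, epoch_micros, java_pattern="yyyy/MM/dd", hive_style=True):
    """Hypothetical helper: format a microsecond epoch value as a partition path.

    Assumptions (not taken from Hudi's source): UTC output, and a Java-style
    date pattern that uses only the letters yyyy, MM and dd.
    """
    # Translate the small subset of Java pattern letters to strftime codes.
    strftime_pattern = (
        java_pattern.replace("yyyy", "%Y").replace("MM", "%m").replace("dd", "%d")
    )
    # Scalar timestamp with time unit "microseconds" -> seconds since the epoch.
    seconds = epoch_micros // 1_000_000
    value = datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(strftime_pattern)
    # With hive-style partitioning the value is prefixed with "<field>=".
    return f"{field}={value}" if hive_style else value


# created_at of the sample rows in this issue: 2023-03-31 14:13:40.973882+00:00
created_at = datetime(2023, 3, 31, 14, 13, 40, 973882, tzinfo=timezone.utc)
epoch_micros = int(created_at.timestamp()) * 1_000_000 + created_at.microsecond

print(partition_path("created_at", epoch_micros, "yyyy"))        # created_at=2023
print(partition_path("created_at", epoch_micros, "yyyy/MM/dd"))  # created_at=2023/03/31
```

The second value contains `/`, so the single `created_at=` prefix is followed by two extra directory levels (`03/31`), which matches the S3 listing above.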

A sample of the data in the Hudi Parquet file (it seems to be OK):

<details>
<summary> drop down</summary>
<br>

```bash
+---------------------+-----------------------+--------------------+------------------------+------------------------------------------------------------------------+----+----------------------------+----+------------+--------+----------------------------------+
| _hoodie_commit_time | _hoodie_commit_seqno  | _hoodie_record_key | _hoodie_partition_path | _hoodie_file_name                                                      | Op | _dms_ingestion_timestamp   | id | name       | salary | created_at                       |
|---------------------+-----------------------+--------------------+------------------------+------------------------------------------------------------------------+----+----------------------------+----+------------+--------+----------------------------------|
| 20230331152242137   | 20230331152242137_0_0 | 1                  | created_at=2023/03/31  | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I  | 2023-03-31 13:06:13.492681 | 1  | Employee 1 | 2000   | 2023-03-31 14:13:40.973882+00:00 |
| 20230331152242137   | 20230331152242137_0_1 | 2                  | created_at=2023/03/31  | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I  | 2023-03-31 13:06:13.492721 | 2  | Employee 2 | 5000   | 2023-03-31 14:13:40.973882+00:00 |
| 20230331152242137   | 20230331152242137_0_2 | 3                  | created_at=2023/03/31  | 30e66d5f-d5da-4206-9961-2b1c9f2967d1-0_0-3-3_20230331152242137.parquet | I  | 2023-03-31 13:06:13.492727 | 3  | Employee 3 | 1000   | 2023-03-31 14:13:40.973882+00:00 |
```

</br>
</details>

**To Reproduce**

Steps to reproduce the behavior:

1. Create a DMS instance that reads from an RDS and writes to S3
2. Start a Hudi DeltaStreamer Job for a table with a _created_at_ column of type TIMESTAMP
3. Start a Spark SQL client and load the Hudi table
4. Run a `select` statement on the table

**Expected behavior**

I expected the Spark SQL client to show the table without any errors, since the data are partitioned correctly on S3.

**Environment Description**

* Hudi version : `0.13.0`
* Spark version : `3.3.1`
* Hive version : -
* Hadoop version : -
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : yes, _Kubernetes_
