[ https://issues.apache.org/jira/browse/HUDI-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Well Tang updated HUDI-3184:
----------------------------
    Status: In Progress  (was: Open)

> hudi-flink support timestamp-micros
> -----------------------------------
>
>                 Key: HUDI-3184
>                 URL: https://issues.apache.org/jira/browse/HUDI-3184
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Flink Integration
>            Reporter: Well Tang
>            Assignee: Well Tang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 0.11.0
>
>         Attachments: 1.png, 2.png, 3.png
>
>   Original Estimate: 5h
>  Remaining Estimate: 5h
>
> *Problem overview*:
> Steps to reproduce the behavior:
> ① Write data into the hoodie table with the Spark engine (the dataset contains a timestamp-typed column).
> ② Read the hoodie table written in step ① with the Flink engine.
> *Actual behavior*
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Avro does not support TIMESTAMP type with precision: 6, it only supports precision less than 3.
>   at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:221) ~...
>   at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:263) ~...
>   at org.apache.hudi.util.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:169) ~...
>   at org.apache.hudi.table.HoodieTableFactory.inferAvroSchema(HoodieTableFactory.java:239) ~...
>   at org.apache.hudi.table.HoodieTableFactory.setupConfOptions(HoodieTableFactory.java:155) ~...
>   at org.apache.hudi.table.HoodieTableFactory.createDynamicTableSource(HoodieTableFactory.java:65) ~...
> {code}
> *Environment Description*
> Hudi version : 0.11.0-SNAPSHOT
> Spark version : 3.1.2
> Flink version : 1.13.1
> Hive version : None
> Hadoop version : 2.9.2
> Storage (HDFS/S3/GCS..) : HDFS
> Running on Docker? (yes/no) : None
> *Additional context*
> We are using Hudi as a data lake to deliver projects to customers. We hit the following application scenario: data is written to the hoodie table through the Spark engine and then read from the hoodie table through the Flink engine.
> Note that the above exception is raised whenever the dataset written by Spark contains a timestamp column.
> To simplify the description, we reduce the problem to the following steps:
> 【step-1】Mock data:
> {code:java}
> /home/deploy/spark-3.1.2-bin-hadoop2.7/bin/spark-shell \
>   --driver-class-path /home/workflow/apache-hive-2.3.8-bin/conf/ \
>   --master spark://2-120:7077 \
>   --executor-memory 4g \
>   --driver-memory 4g \
>   --num-executors 4 \
>   --total-executor-cores 4 \
>   --name test \
>   --jars /home/deploy/spark-3.1.2-bin-hadoop2.7/jars/hudi-spark3-bundle_2.12-0.11.0-SNAPSHOT.jar,/home/deploy/spark-3.1.2-bin-hadoop2.7/jars/spark-avro_2.12-3.1.2.jar \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.sql.legacy.parquet.datetimeRebaseModeInRead=CORRECTED \
>   --conf spark.sql.hive.convertMetastoreParquet=false
> {code}
> {code:java}
> val df = spark.sql("select 1 as id, 'A' as name, current_timestamp as dt")
> df.write.format("hudi").
>   option("hoodie.datasource.write.recordkey.field", "id").
>   option("hoodie.datasource.write.precombine.field", "id").
>   option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator").
>   option("hoodie.upsert.shuffle.parallelism", "2").
>   option("hoodie.table.name", "timestamp_table").
>   mode("append").
>   save("/hudi/suite/data_type_timestamp_table")
> spark.read.format("hudi").load("/hudi/suite/data_type_timestamp_table").show(false)
> {code}
> 【step-2】Consume the data through Flink:
> {code:java}
> bin/sql-client.sh embedded -j lib/hudi-flink-bundle_2.12-0.11.0-SNAPSHOT.jar
> {code}
> {code:java}
> create table data_type_timestamp_table (
>   `id` INT,
>   `name` STRING,
>   `dt` TIMESTAMP(6)
> ) with (
>   'connector' = 'hudi',
>   'hoodie.table.name' = 'data_type_timestamp_table',
>   'read.streaming.enabled' = 'true',
>   'hoodie.datasource.write.recordkey.field' = 'id',
>   'path' = '/hudi/suite/data_type_timestamp_table',
>   'read.streaming.check-interval' = '10',
>   'table.type' = 'COPY_ON_WRITE',
>   'write.precombine.field' = 'id'
> );
> select * from data_type_timestamp_table;
> {code}
> As shown below:
> !1.png!
> If we change TIMESTAMP(6) to TIMESTAMP(3), the result is as follows:
> !2.png!
> The data can now be read, but the displayed timestamps are incorrect!
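> To make the incorrect display concrete, here is a small standalone Java snippet (our own illustration, not Hudi code; the class name is made up) showing what happens when the epoch-microseconds long that Spark stored is interpreted as epoch-milliseconds, which is effectively what the TIMESTAMP(3) read above does:
> {code:java}
> import org.apache.flink.table.data.TimestampData;
>
> public class MicrosReadAsMillisDemo {
>   public static void main(String[] args) {
>     // An epoch-microseconds value as written by Spark (early 2022).
>     long storedMicros = 1_641_346_200_123_456L;
>
>     // Interpreting the raw long as milliseconds lands roughly 50,000
>     // years in the future, hence the garbled display:
>     System.out.println(TimestampData.fromEpochMillis(storedMicros));
>
>     // Correct interpretation: split micros into millis + nanos-of-milli.
>     long millis = Math.floorDiv(storedMicros, 1000L);
>     int nanosOfMilli = (int) (Math.floorMod(storedMicros, 1000L) * 1000);
>     System.out.println(TimestampData.fromEpochMillis(millis, nanosOfMilli));
>   }
> }
> {code}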
> save("/hudi/suite/data_type_timestamp_table") > spark.read.format("hudi").load("/hudi/suite/data_type_timestamp_table").show(false) > {code} > 【step-2】Consumption data through flink: > {code:java} > bin/sql-client.sh embedded -j lib/hudi-flink-bundle_2.12-0.11.0-SNAPSHOT.jar > {code} > {code:java} > create table data_type_timestamp_table ( > `id` INT, > `name` STRING, > `dt` TIMESTAMP(6) > ) with ( > 'connector' = 'hudi', > 'hoodie.table.name' = 'data_type_timestamp_table', > 'read.streaming.enabled' = 'true', > 'hoodie.datasource.write.recordkey.field' = 'id', > 'path' = '/hudi/suite/data_type_timestamp_table', > 'read.streaming.check-interval' = '10', > 'table.type' = 'COPY_ON_WRITE', > 'write.precombine.field' = 'id' > ); > select * from data_type_timestamp_table; {code} > As shown below: > !1.png! > If we changge timestamp (6) to timestamp (3),the result is as follows: > !2.png! > The data can be found here, but the display is incorrect! > After checking It is found in the Hoodie directory that the spark write > timestamp type is timestamp micros: > !3.png! > However, the timestamp type of hook reading and writing Hoodie data is > timestamp-millis!Therefore, it is problematic for us to read and write > timestamp types through Spark and Flink computing engines. We hope that > hudi-flink module needs to support timestamp micros and cannot lose time > accuracy. -- This message was sent by Atlassian Jira (v8.20.1#820001)