This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5834e0f7a [INLONG-7049][Manager] Support complex data types in Hudi Sink (#7050)
5834e0f7a is described below

commit 5834e0f7abeb96c92126a1122f28ced4db287ef0
Author: feat <featzh...@outlook.com>
AuthorDate: Mon Dec 26 20:46:19 2022 +0800

    [INLONG-7049][Manager] Support complex data types in Hudi Sink (#7050)
---
 .../inlong/manager/pojo/sink/hudi/HudiType.java    | 34 +++++++------
 .../resource/sink/hudi/HudiCatalogClient.java      | 19 ++++----
 .../resource/sink/hudi/HudiTypeConverter.java      | 55 ++++++++++++++++++++++
 3 files changed, 84 insertions(+), 24 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
index 672b07f8f..392ca7794 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java
@@ -24,26 +24,30 @@ import lombok.Getter;
  */
 public enum HudiType {
 
-    BOOLEAN("boolean"),
-    INT("int"),
-    LONG("long"),
-    FLOAT("float"),
-    DOUBLE("double"),
-    DECIMAL("decimal"),
-    DATE("date"),
-    TIME("time"),
-    TIMESTAMP("timestamp"),
-    TIMESTAMPTZ("timestamptz"),
-    STRING("string"),
-    UUID("uuid"),
-    FIXED("fixed"),
-    BINARY("binary");
+    BOOLEAN("boolean", "boolean"),
+    INT("int", "int"),
+    LONG("long", "bigint"),
+    FLOAT("float", "float"),
+    DOUBLE("double", "double"),
+    DATE("date", "date"),
+    TIME("time", "time(0)"),
+    TIMESTAMP("timestamp", "timestamp(3)"),
+    TIMESTAMPT_Z("timestamptz", "timestamp(6)"),
+    STRING("string", "varchar(" + Integer.MAX_VALUE + ")"),
+    BINARY("binary", "tinyint"),
+    UUID("uuid", "uuid"),
+    FIXED("fixed", null),
+    DECIMAL("decimal", null);
 
     @Getter
     private final String type;
 
-    HudiType(String type) {
+    @Getter
+    private final String hiveType;
+
+    HudiType(String type, String hiveType) {
         this.type = type;
+        this.hiveType = hiveType;
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
index 124dcaedb..8055114ad 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.service.resource.sink.hudi;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -191,14 +190,16 @@ public class HudiCatalogClient {
         String location = this.warehouse + "/" + dbName + ".db" + "/" + 
tableName;
         properties.put("path", location);
 
-        List<FieldSchema> cols = new ArrayList<>();
-        for (HudiColumnInfo column : tableInfo.getColumns()) {
-            FieldSchema fieldSchema = new FieldSchema();
-            fieldSchema.setName(column.getName());
-            fieldSchema.setType(column.getType());
-            fieldSchema.setComment(column.getDesc());
-            cols.add(fieldSchema);
-        }
+        List<FieldSchema> cols = tableInfo.getColumns()
+                .stream()
+                .map(column -> {
+                    FieldSchema fieldSchema = new FieldSchema();
+                    fieldSchema.setName(column.getName());
+                    fieldSchema.setType(HudiTypeConverter.convert(column));
+                    fieldSchema.setComment(column.getDesc());
+                    return fieldSchema;
+                })
+                .collect(Collectors.toList());
 
         // Build storage of hudi table
         StorageDescriptor sd = new StorageDescriptor();
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java
new file mode 100644
index 000000000..ebe24a6e7
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.hudi;
+
+import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo;
+import org.apache.inlong.manager.pojo.sink.hudi.HudiType;
+
+import java.util.Optional;
+
+/**
+ * Converter between Java type and Hive type. This converter reflects the old behavior
+ * that includes:
+ * <ul>
+ * <li>Use old java.sql.* time classes for time data types.
+ * <li>Only support millisecond precision for timestamps or day-time intervals.
+ * <li>Let variable precision and scale for decimal types pass through the planner.
+ * </ul>
+ * {@see org.apache.flink.table.types.utils.TypeInfoDataTypeConverter}
+ */
+public class HudiTypeConverter {
+
+    /**
+     * Converter field type of column to Hive field type.
+     */
+    public static String convert(HudiColumnInfo column) {
+        return Optional.ofNullable(column)
+                .map(col -> HudiType.forType(col.getType()))
+                .map(hudiType -> {
+                    if (HudiType.DECIMAL == hudiType) {
+                        return String.format("decimal(%d, %d)", 
column.getPrecision(), column.getScale());
+                    } else if (HudiType.FIXED == hudiType) {
+                        return String.format("fixed(%d)", column.getLength());
+                    } else {
+                        return hudiType.getHiveType();
+                    }
+                })
+                .orElseThrow(() -> new RuntimeException("Can not properly 
convert type of column: " + column));
+    }
+
+}

Reply via email to