This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5834e0f7a [INLONG-7049][Manager] Support complex data types in Hudi Sink (#7050) 5834e0f7a is described below commit 5834e0f7abeb96c92126a1122f28ced4db287ef0 Author: feat <featzh...@outlook.com> AuthorDate: Mon Dec 26 20:46:19 2022 +0800 [INLONG-7049][Manager] Support complex data types in Hudi Sink (#7050) --- .../inlong/manager/pojo/sink/hudi/HudiType.java | 34 +++++++------ .../resource/sink/hudi/HudiCatalogClient.java | 19 ++++---- .../resource/sink/hudi/HudiTypeConverter.java | 55 ++++++++++++++++++++++ 3 files changed, 84 insertions(+), 24 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java index 672b07f8f..392ca7794 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/hudi/HudiType.java @@ -24,26 +24,30 @@ import lombok.Getter; */ public enum HudiType { - BOOLEAN("boolean"), - INT("int"), - LONG("long"), - FLOAT("float"), - DOUBLE("double"), - DECIMAL("decimal"), - DATE("date"), - TIME("time"), - TIMESTAMP("timestamp"), - TIMESTAMPTZ("timestamptz"), - STRING("string"), - UUID("uuid"), - FIXED("fixed"), - BINARY("binary"); + BOOLEAN("boolean", "boolean"), + INT("int", "int"), + LONG("long", "bigint"), + FLOAT("float", "float"), + DOUBLE("double", "double"), + DATE("date", "date"), + TIME("time", "time(0)"), + TIMESTAMP("timestamp", "timestamp(3)"), + TIMESTAMPT_Z("timestamptz", "timestamp(6)"), + STRING("string", "varchar(" + Integer.MAX_VALUE + ")"), + BINARY("binary", "tinyint"), + UUID("uuid", "uuid"), + FIXED("fixed", null), + DECIMAL("decimal", null); @Getter private final String type; - HudiType(String type) { + @Getter + private final String hiveType; + + HudiType(String 
type, String hiveType) { this.type = type; + this.hiveType = hiveType; } /** diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java index 124dcaedb..8055114ad 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiCatalogClient.java @@ -18,7 +18,6 @@ package org.apache.inlong.manager.service.resource.sink.hudi; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -191,14 +190,16 @@ public class HudiCatalogClient { String location = this.warehouse + "/" + dbName + ".db" + "/" + tableName; properties.put("path", location); - List<FieldSchema> cols = new ArrayList<>(); - for (HudiColumnInfo column : tableInfo.getColumns()) { - FieldSchema fieldSchema = new FieldSchema(); - fieldSchema.setName(column.getName()); - fieldSchema.setType(column.getType()); - fieldSchema.setComment(column.getDesc()); - cols.add(fieldSchema); - } + List<FieldSchema> cols = tableInfo.getColumns() + .stream() + .map(column -> { + FieldSchema fieldSchema = new FieldSchema(); + fieldSchema.setName(column.getName()); + fieldSchema.setType(HudiTypeConverter.convert(column)); + fieldSchema.setComment(column.getDesc()); + return fieldSchema; + }) + .collect(Collectors.toList()); // Build storage of hudi table StorageDescriptor sd = new StorageDescriptor(); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java new file mode 100644 index 
000000000..ebe24a6e7 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hudi/HudiTypeConverter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.hudi; + +import org.apache.inlong.manager.pojo.sink.hudi.HudiColumnInfo; +import org.apache.inlong.manager.pojo.sink.hudi.HudiType; + +import java.util.Optional; + +/** + * Converter between Java type and Hive type. This converter reflects the old behavior + * that includes: + * <ul> + * <li>Use old java.sql.* time classes for time data types. + * <li>Only support millisecond precision for timestamps or day-time intervals. + * <li>Let variable precision and scale for decimal types pass through the planner. + * </ul> + * @see org.apache.flink.table.types.utils.TypeInfoDataTypeConverter + */ +public class HudiTypeConverter { + + /** + * Converts the field type of a column to a Hive field type. 
+ */ + public static String convert(HudiColumnInfo column) { + return Optional.ofNullable(column) + .map(col -> HudiType.forType(col.getType())) + .map(hudiType -> { + if (HudiType.DECIMAL == hudiType) { + return String.format("decimal(%d, %d)", column.getPrecision(), column.getScale()); + } else if (HudiType.FIXED == hudiType) { + return String.format("fixed(%d)", column.getLength()); + } else { + return hudiType.getHiveType(); + } + }) + .orElseThrow(() -> new RuntimeException("Can not properly convert type of column: " + column)); + } + +}