This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c0744d432 [INLONG-7525][Manager] Support to save additional info for 
the ClickHouse field (#7526)
c0744d432 is described below

commit c0744d432fe38a05877087b2095310d017f934e4
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Mar 7 13:25:14 2023 +0800

    [INLONG-7525][Manager] Support to save additional info for the ClickHouse 
field (#7526)
---
 .../pojo/node/ck/ClickHouseDataNodeDTO.java        | 14 ++++
 .../manager/pojo/sink/ck/ClickHouseColumnInfo.java | 37 -----------
 .../ck/ClickHouseFieldInfo.java}                   | 45 ++++++++-----
 .../manager/pojo/sink/ck/ClickHouseSink.java       |  6 ++
 .../manager/pojo/sink/ck/ClickHouseSinkDTO.java    | 14 +++-
 .../pojo/sink/ck/ClickHouseSinkRequest.java        |  8 ++-
 .../manager/pojo/sink/ck/ClickHouseTableInfo.java  |  4 +-
 .../node/ck/ClickHouseDataNodeOperator.java        |  2 +-
 .../resource/sink/ck/ClickHouseJdbcUtils.java      | 34 +++++-----
 .../sink/ck/ClickHouseResourceOperator.java        | 37 +++++++----
 .../resource/sink/ck/ClickHouseSqlBuilder.java     | 38 +++++++----
 .../service/sink/ck/ClickHouseSinkOperator.java    | 75 ++++++++++++++++++++--
 12 files changed, 212 insertions(+), 102 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
index c308b01df..2e1795639 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.pojo.node.ck;
 import io.swagger.annotations.ApiModel;
 import lombok.Builder;
 import lombok.Data;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.JsonUtils;
@@ -34,6 +35,8 @@ import javax.validation.constraints.NotNull;
 @ApiModel("ClickHouse data node info")
 public class ClickHouseDataNodeDTO {
 
+    private static final String CLICKHOUSE_JDBC_PREFIX = "jdbc:clickhouse://";
+
     /**
      * Get the dto instance from the request
      */
@@ -53,4 +56,15 @@ public class ClickHouseDataNodeDTO {
         }
     }
 
+    /**
+     * Convert ip:port to jdbcUrl.
+     */
+    public static String convertToJdbcUrl(String url) {
+        String jdbcUrl = url;
+        if (StringUtils.isNotBlank(jdbcUrl) && 
!jdbcUrl.startsWith(CLICKHOUSE_JDBC_PREFIX)) {
+            jdbcUrl = CLICKHOUSE_JDBC_PREFIX + jdbcUrl;
+        }
+        return jdbcUrl;
+    }
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseColumnInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseColumnInfo.java
deleted file mode 100644
index 55fd83509..000000000
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseColumnInfo.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.sink.ck;
-
-import lombok.Data;
-
-/**
- * ClickHouse column info.
- */
-@Data
-public class ClickHouseColumnInfo {
-
-    private String name;
-    private String type;
-    private String desc;
-    private String defaultType;
-    private String defaultExpr;
-
-    private String compressionCode;
-
-    private String ttlExpr;
-}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseFieldInfo.java
similarity index 50%
copy from 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
copy to 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseFieldInfo.java
index c308b01df..76bca6da3 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseFieldInfo.java
@@ -15,42 +15,57 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.node.ck;
+package org.apache.inlong.manager.pojo.sink.ck;
 
-import io.swagger.annotations.ApiModel;
-import lombok.Builder;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
 
 import javax.validation.constraints.NotNull;
 
 /**
- * ClickHouse data node info
+ * ClickHouse field info.
  */
 @Data
-@Builder
-@ApiModel("ClickHouse data node info")
-public class ClickHouseDataNodeDTO {
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonTypeDefine(value = SinkType.CLICKHOUSE)
+public class ClickHouseFieldInfo extends SinkField {
+
+    private String defaultType;
+    private String defaultExpr;
+
+    private String compressionCode;
+
+    private String ttlExpr;
 
     /**
      * Get the dto instance from the request
      */
-    public static ClickHouseDataNodeDTO 
getFromRequest(ClickHouseDataNodeRequest request) throws Exception {
-        return ClickHouseDataNodeDTO.builder().build();
+    public static ClickHouseFieldInfo getFromRequest(SinkField sinkField) {
+        return CommonBeanUtils.copyProperties(sinkField, 
ClickHouseFieldInfo::new, true);
     }
 
     /**
-     * Get the dto instance from the JSON string.
+     * Get the extra param from the Json
      */
-    public static ClickHouseDataNodeDTO getFromJson(@NotNull String extParams) 
{
+    public static ClickHouseFieldInfo getFromJson(@NotNull String extParams) {
+        if (StringUtils.isEmpty(extParams)) {
+            return new ClickHouseFieldInfo();
+        }
         try {
-            return JsonUtils.parseObject(extParams, 
ClickHouseDataNodeDTO.class);
+            return JsonUtils.parseObject(extParams, ClickHouseFieldInfo.class);
         } catch (Exception e) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT,
-                    String.format("Failed to parse extParams for ClickHouse 
node: %s", e.getMessage()));
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("Failed to parse extParams for ClickHouse 
fieldInfo: %s", e.getMessage()));
         }
     }
-
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java
index 5114f41cc..e36a907e4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java
@@ -87,6 +87,12 @@ public class ClickHouseSink extends StreamSink {
     @ApiModelProperty("Table order information")
     private String orderBy;
 
+    @ApiModelProperty(value = "Message time-to-live duration")
+    private Integer ttl;
+
+    @ApiModelProperty(value = "The unit of message's time-to-live duration")
+    private String ttlUnit;
+
     @ApiModelProperty("Table primary key")
     private String primaryKey;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
index 1f10ffe47..161060c65 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java
@@ -88,6 +88,12 @@ public class ClickHouseSinkDTO {
     @ApiModelProperty("Table order information")
     private String orderBy;
 
+    @ApiModelProperty(value = "Message time-to-live duration")
+    private Integer ttl;
+
+    @ApiModelProperty(value = "The unit of message's time-to-live duration")
+    private String ttlUnit;
+
     @ApiModelProperty("Table primary key")
     private String primaryKey;
 
@@ -122,6 +128,8 @@ public class ClickHouseSinkDTO {
                 .keyFieldNames(request.getKeyFieldNames())
                 .engine(request.getEngine())
                 .partitionBy(request.getPartitionBy())
+                .ttl(request.getTtl())
+                .ttlUnit(request.getTtlUnit())
                 .primaryKey(request.getPrimaryKey())
                 .orderBy(request.getOrderBy())
                 .encryptVersion(encryptVersion)
@@ -140,7 +148,7 @@ public class ClickHouseSinkDTO {
     }
 
     public static ClickHouseTableInfo getClickHouseTableInfo(ClickHouseSinkDTO 
ckInfo,
-            List<ClickHouseColumnInfo> columnList) {
+            List<ClickHouseFieldInfo> fieldInfoList) {
         ClickHouseTableInfo tableInfo = new ClickHouseTableInfo();
         tableInfo.setDbName(ckInfo.getDbName());
         tableInfo.setTableName(ckInfo.getTableName());
@@ -148,7 +156,9 @@ public class ClickHouseSinkDTO {
         tableInfo.setOrderBy(ckInfo.getOrderBy());
         tableInfo.setPartitionBy(ckInfo.getPartitionBy());
         tableInfo.setPrimaryKey(ckInfo.getPrimaryKey());
-        tableInfo.setColumns(columnList);
+        tableInfo.setTtl(ckInfo.getTtl());
+        tableInfo.setTtlUnit(ckInfo.getTtlUnit());
+        tableInfo.setFieldInfoList(fieldInfoList);
 
         return tableInfo;
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
index 2bd7f253c..6f4b113e3 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java
@@ -23,8 +23,8 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import org.apache.inlong.manager.common.consts.SinkType;
-import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
 
 /**
  * ClickHouse sink request.
@@ -81,6 +81,12 @@ public class ClickHouseSinkRequest extends SinkRequest {
     @ApiModelProperty("Table order information")
     private String orderBy;
 
+    @ApiModelProperty(value = "Message time-to-live duration")
+    private Integer ttl;
+
+    @ApiModelProperty(value = "The unit of message's time-to-live duration")
+    private String ttlUnit;
+
     @ApiModelProperty("Table primary key")
     private String primaryKey;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java
index 7cb6fda01..db7a06ac1 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java
@@ -36,6 +36,8 @@ public class ClickHouseTableInfo {
     private String partitionBy;
     private String orderBy;
     private String primaryKey;
+    private Integer ttl;
+    private String ttlUnit;
 
-    private List<ClickHouseColumnInfo> columns;
+    private List<ClickHouseFieldInfo> fieldInfoList;
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
index d31c591ba..a6f3553f3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java
@@ -88,7 +88,7 @@ public class ClickHouseDataNodeOperator extends 
AbstractDataNodeOperator {
 
     @Override
     public Boolean testConnection(DataNodeRequest request) {
-        String url = request.getUrl();
+        String url = ClickHouseDataNodeDTO.convertToJdbcUrl(request.getUrl());
         String username = request.getUsername();
         String password = request.getToken();
         Preconditions.expectNotBlank(url, ErrorCodeEnum.INVALID_PARAMETER, 
"connection url cannot be empty");
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
index e8c8f1098..2b8f41dfe 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.resource.sink.ck;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,6 +30,7 @@ import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * Utils for ClickHouse JDBC.
@@ -128,28 +129,31 @@ public class ClickHouseJdbcUtils {
     }
 
     /**
-     * Query ClickHouse columns
+     * Query ClickHouse fields
      */
-    public static List<ClickHouseColumnInfo> getColumns(String url, String 
user, String password, String dbName,
+    public static List<ClickHouseFieldInfo> getFields(String url, String user, 
String password, String dbName,
             String tableName) throws Exception {
 
         String querySql = ClickHouseSqlBuilder.buildDescTableSql(dbName, 
tableName);
         try (Connection conn = getConnection(url, user, password);
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(querySql)) {
-            List<ClickHouseColumnInfo> columnList = new ArrayList<>();
+            List<ClickHouseFieldInfo> fieldList = new ArrayList<>();
             while (rs.next()) {
-                ClickHouseColumnInfo columnInfo = new ClickHouseColumnInfo();
-                columnInfo.setName(rs.getString(1));
-                columnInfo.setType(rs.getString(2));
-                columnInfo.setDefaultType(rs.getString(3));
-                columnInfo.setDefaultExpr(rs.getString(4));
-                columnInfo.setDesc(rs.getString(5));
-                columnInfo.setCompressionCode(rs.getString(6));
-                columnInfo.setTtlExpr(rs.getString(7));
-                columnList.add(columnInfo);
+                ClickHouseFieldInfo fieldInfo = new ClickHouseFieldInfo();
+                if (Objects.equals(rs.getString(1), "inlong_ttl_date_time")) {
+                    continue;
+                }
+                fieldInfo.setFieldName(rs.getString(1));
+                fieldInfo.setFieldType(rs.getString(2));
+                fieldInfo.setDefaultType(rs.getString(3));
+                fieldInfo.setDefaultExpr(rs.getString(4));
+                fieldInfo.setFieldComment(rs.getString(5));
+                fieldInfo.setCompressionCode(rs.getString(6));
+                fieldInfo.setTtlExpr(rs.getString(7));
+                fieldList.add(fieldInfo);
             }
-            return columnList;
+            return fieldList;
         }
     }
 
@@ -157,7 +161,7 @@ public class ClickHouseJdbcUtils {
      * Add columns for ClickHouse table
      */
     public static void addColumns(String url, String user, String password, 
String dbName, String tableName,
-            List<ClickHouseColumnInfo> columnList) throws Exception {
+            List<ClickHouseFieldInfo> columnList) throws Exception {
         List<String> addColumnSql = 
ClickHouseSqlBuilder.buildAddColumnsSql(dbName, tableName, columnList);
         ClickHouseJdbcUtils.executeSqlBatch(addColumnSql, url, user, password);
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
index e4abd3890..70e4c09a6 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
@@ -28,9 +28,10 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkInfo;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
 import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
@@ -94,7 +95,7 @@ public class ClickHouseResourceOperator implements 
SinkResourceOperator {
             ClickHouseDataNodeInfo dataNodeInfo = (ClickHouseDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
                     dataNodeName, sinkInfo.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, ckInfo);
-            ckInfo.setJdbcUrl(dataNodeInfo.getUrl());
+            
ckInfo.setJdbcUrl(ClickHouseDataNodeDTO.convertToJdbcUrl(dataNodeInfo.getUrl()));
             ckInfo.setPassword(dataNodeInfo.getToken());
         }
         return ckInfo;
@@ -109,18 +110,11 @@ public class ClickHouseResourceOperator implements 
SinkResourceOperator {
         }
 
         // set columns
-        List<ClickHouseColumnInfo> columnList = new ArrayList<>();
-        for (StreamSinkFieldEntity field : fieldList) {
-            ClickHouseColumnInfo columnInfo = new ClickHouseColumnInfo();
-            columnInfo.setName(field.getFieldName());
-            columnInfo.setType(field.getFieldType());
-            columnInfo.setDesc(field.getFieldComment());
-            columnList.add(columnInfo);
-        }
+        List<ClickHouseFieldInfo> fieldInfoList = 
getSClickHouseColumnInfoFromSink(fieldList);
 
         try {
             ClickHouseSinkDTO ckInfo = getClickHouseInfo(sinkInfo);
-            ClickHouseTableInfo tableInfo = 
ClickHouseSinkDTO.getClickHouseTableInfo(ckInfo, columnList);
+            ClickHouseTableInfo tableInfo = 
ClickHouseSinkDTO.getClickHouseTableInfo(ckInfo, fieldInfoList);
             String url = ckInfo.getJdbcUrl();
             String user = ckInfo.getUsername();
             String password = ckInfo.getPassword();
@@ -140,9 +134,9 @@ public class ClickHouseResourceOperator implements 
SinkResourceOperator {
                 ClickHouseJdbcUtils.createTable(url, user, password, 
tableInfo);
             } else {
                 // 4. table exists, add columns - skip the exists columns
-                List<ClickHouseColumnInfo> existColumns = 
ClickHouseJdbcUtils.getColumns(url,
+                List<ClickHouseFieldInfo> existColumns = 
ClickHouseJdbcUtils.getFields(url,
                         user, password, dbName, tableName);
-                List<ClickHouseColumnInfo> needAddColumns = 
tableInfo.getColumns().stream()
+                List<ClickHouseFieldInfo> needAddColumns = 
tableInfo.getFieldInfoList().stream()
                         .skip(existColumns.size()).collect(toList());
                 if (CollectionUtils.isNotEmpty(needAddColumns)) {
                     ClickHouseJdbcUtils.addColumns(url, user, password, 
dbName, tableName, needAddColumns);
@@ -163,4 +157,21 @@ public class ClickHouseResourceOperator implements 
SinkResourceOperator {
         LOGGER.info("success create ClickHouse table for sink id [" + 
sinkInfo.getId() + "]");
     }
 
+    public List<ClickHouseFieldInfo> 
getSClickHouseColumnInfoFromSink(List<StreamSinkFieldEntity> sinkList) {
+        List<ClickHouseFieldInfo> columnInfoList = new ArrayList<>();
+        for (StreamSinkFieldEntity fieldEntity : sinkList) {
+            if (StringUtils.isNotBlank(fieldEntity.getExtParams())) {
+                ClickHouseFieldInfo clickHouseFieldInfo = 
ClickHouseFieldInfo.getFromJson(
+                        fieldEntity.getExtParams());
+                CommonBeanUtils.copyProperties(fieldEntity, 
clickHouseFieldInfo, true);
+                columnInfoList.add(clickHouseFieldInfo);
+            } else {
+                ClickHouseFieldInfo clickHouseFieldInfo = new 
ClickHouseFieldInfo();
+                CommonBeanUtils.copyProperties(fieldEntity, 
clickHouseFieldInfo, true);
+                columnInfoList.add(clickHouseFieldInfo);
+            }
+        }
+        return columnInfoList;
+    }
+
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java
index 6e9b363d8..c192972e5 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.resource.sink.ck;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,8 +54,16 @@ public class ClickHouseSqlBuilder {
         String dbTableName = table.getDbName() + "." + table.getTableName();
         sql.append("CREATE TABLE ").append(dbTableName);
 
+        // add ttl columns
+        if (table.getTtl() != null && 
StringUtils.isNotBlank(table.getTtlUnit())) {
+            ClickHouseFieldInfo clickHouseFieldInfo = new 
ClickHouseFieldInfo();
+            clickHouseFieldInfo.setFieldName("inlong_ttl_date_time");
+            clickHouseFieldInfo.setFieldType("DateTime");
+            clickHouseFieldInfo.setFieldComment("inlong ttl date time");
+            table.getFieldInfoList().add(clickHouseFieldInfo);
+        }
         // Construct columns and partition columns
-        sql.append(buildCreateColumnsSql(table.getColumns()));
+        sql.append(buildCreateColumnsSql(table.getFieldInfoList()));
         if (StringUtils.isNotEmpty(table.getEngine())) {
             sql.append(" ENGINE = ").append(table.getEngine());
         } else {
@@ -64,8 +72,12 @@ public class ClickHouseSqlBuilder {
         if (StringUtils.isNotEmpty(table.getOrderBy())) {
             sql.append(" ORDER BY ").append(table.getOrderBy());
         } else if (StringUtils.isEmpty(table.getEngine())) {
-            sql.append(" ORDER BY ").append(table.getColumns()
-                    .get(FIRST_COLUMN_INDEX).getName());
+            sql.append(" ORDER BY ").append(table.getFieldInfoList()
+                    .get(FIRST_COLUMN_INDEX).getFieldName());
+        }
+        if (table.getTtl() != null && 
StringUtils.isNotBlank(table.getTtlUnit())) {
+            sql.append(" TTL ").append("inlong_ttl_date_time").append(" + 
INTERVAL ").append(table.getTtl()).append(" ")
+                    .append(table.getTtlUnit());
         }
         if (StringUtils.isNotEmpty(table.getPartitionBy())) {
             sql.append(" PARTITION BY ").append(table.getPartitionBy());
@@ -85,7 +97,7 @@ public class ClickHouseSqlBuilder {
      * Build add column SQL
      */
     public static List<String> buildAddColumnsSql(String dbName, String 
tableName,
-            List<ClickHouseColumnInfo> columnList) {
+            List<ClickHouseFieldInfo> columnList) {
         String dbTableName = dbName + "." + tableName;
         List<String> columnInfoList = getColumnsInfo(columnList);
         List<String> resultList = new ArrayList<>();
@@ -100,7 +112,7 @@ public class ClickHouseSqlBuilder {
     /**
      * Build create column SQL
      */
-    private static String buildCreateColumnsSql(List<ClickHouseColumnInfo> 
columns) {
+    private static String buildCreateColumnsSql(List<ClickHouseFieldInfo> 
columns) {
         List<String> columnList = getColumnsInfo(columns);
         StringBuilder result = new StringBuilder().append(" (")
                 .append(StringUtils.join(columnList, ",")).append(") ");
@@ -110,24 +122,24 @@ public class ClickHouseSqlBuilder {
     /**
      * Build column info
      */
-    private static List<String> getColumnsInfo(List<ClickHouseColumnInfo> 
columns) {
+    private static List<String> getColumnsInfo(List<ClickHouseFieldInfo> 
columns) {
         List<String> columnList = new ArrayList<>();
-        for (ClickHouseColumnInfo columnInfo : columns) {
+        for (ClickHouseFieldInfo columnInfo : columns) {
             // Construct columns and partition columns
-            StringBuilder columnStr = new 
StringBuilder().append(columnInfo.getName())
-                    .append(" ").append(columnInfo.getType());
+            StringBuilder columnStr = new 
StringBuilder().append(columnInfo.getFieldName())
+                    .append(" ").append(columnInfo.getFieldType());
             if (StringUtils.isNotEmpty(columnInfo.getDefaultType())) {
                 columnStr.append(" ").append(columnInfo.getDefaultType())
                         .append(" ").append(columnInfo.getDefaultExpr());
             }
             if (StringUtils.isNotEmpty(columnInfo.getCompressionCode())) {
-                columnStr.append(" 
CODEC(").append(columnInfo.getDesc()).append(")");
+                columnStr.append(" 
CODEC(").append(columnInfo.getCompressionCode()).append(")");
             }
             if (StringUtils.isNotEmpty(columnInfo.getTtlExpr())) {
                 columnStr.append(" TTL ").append(columnInfo.getTtlExpr());
             }
-            if (StringUtils.isNotEmpty(columnInfo.getDesc())) {
-                columnStr.append(" COMMENT 
'").append(columnInfo.getDesc()).append("'");
+            if (StringUtils.isNotEmpty(columnInfo.getFieldComment())) {
+                columnStr.append(" COMMENT 
'").append(columnInfo.getFieldComment()).append("'");
             }
             columnList.add(columnStr.toString());
         }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
index 6a7e80410..14d8b3474 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java
@@ -18,19 +18,24 @@
 package org.apache.inlong.manager.service.sink.ck;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO;
 import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
 import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkRequest;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +43,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -93,7 +99,7 @@ public class ClickHouseSinkOperator extends 
AbstractSinkOperator {
             ClickHouseDataNodeInfo dataNodeInfo = (ClickHouseDataNodeInfo) 
dataNodeHelper.getDataNodeInfo(
                     entity.getDataNodeName(), entity.getSinkType());
             CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
-            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            
dto.setJdbcUrl(ClickHouseDataNodeDTO.convertToJdbcUrl(dataNodeInfo.getUrl()));
             dto.setPassword(dataNodeInfo.getToken());
         }
         CommonBeanUtils.copyProperties(entity, sink, true);
@@ -103,4 +109,65 @@ public class ClickHouseSinkOperator extends 
AbstractSinkOperator {
         return sink;
     }
 
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getSinkFieldList();
+        LOGGER.debug("begin to save es sink fields={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            this.checkFieldInfo(fieldInfo);
+            StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            try {
+                ClickHouseFieldInfo dto = 
ClickHouseFieldInfo.getFromRequest(fieldInfo);
+                fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            } catch (Exception e) {
+                throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                        String.format("serialize extParams of ClickHouse 
FieldInfo failure: %s", e.getMessage()));
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.debug("success to save es sink fields");
+    }
+
+    @Override
+    public List<SinkField> getSinkFields(Integer sinkId) {
+        List<StreamSinkFieldEntity> sinkFieldEntities = 
sinkFieldMapper.selectBySinkId(sinkId);
+        List<SinkField> fieldList = new ArrayList<>();
+        if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+            return fieldList;
+        }
+        sinkFieldEntities.forEach(field -> {
+            SinkField sinkField = new SinkField();
+            if (StringUtils.isNotBlank(field.getExtParams())) {
+                ClickHouseFieldInfo clickHouseFieldInfo = 
ClickHouseFieldInfo.getFromJson(
+                        field.getExtParams());
+                CommonBeanUtils.copyProperties(field, clickHouseFieldInfo, 
true);
+                fieldList.add(clickHouseFieldInfo);
+            } else {
+                CommonBeanUtils.copyProperties(field, sinkField, true);
+                fieldList.add(sinkField);
+            }
+
+        });
+        return fieldList;
+    }
+
 }

Reply via email to