This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c0744d432 [INLONG-7525][Manager] Support to save additional info for the ClickHouse field (#7526) c0744d432 is described below commit c0744d432fe38a05877087b2095310d017f934e4 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Tue Mar 7 13:25:14 2023 +0800 [INLONG-7525][Manager] Support to save additional info for the ClickHouse field (#7526) --- .../pojo/node/ck/ClickHouseDataNodeDTO.java | 14 ++++ .../manager/pojo/sink/ck/ClickHouseColumnInfo.java | 37 ----------- .../ck/ClickHouseFieldInfo.java} | 45 ++++++++----- .../manager/pojo/sink/ck/ClickHouseSink.java | 6 ++ .../manager/pojo/sink/ck/ClickHouseSinkDTO.java | 14 +++- .../pojo/sink/ck/ClickHouseSinkRequest.java | 8 ++- .../manager/pojo/sink/ck/ClickHouseTableInfo.java | 4 +- .../node/ck/ClickHouseDataNodeOperator.java | 2 +- .../resource/sink/ck/ClickHouseJdbcUtils.java | 34 +++++----- .../sink/ck/ClickHouseResourceOperator.java | 37 +++++++---- .../resource/sink/ck/ClickHouseSqlBuilder.java | 38 +++++++---- .../service/sink/ck/ClickHouseSinkOperator.java | 75 ++++++++++++++++++++-- 12 files changed, 212 insertions(+), 102 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java index c308b01df..2e1795639 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.pojo.node.ck; import io.swagger.annotations.ApiModel; import lombok.Builder; import lombok.Data; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.JsonUtils; @@ -34,6 +35,8 @@ import javax.validation.constraints.NotNull; @ApiModel("ClickHouse data node info") public class ClickHouseDataNodeDTO { + private static final String CLICKHOUSE_JDBC_PREFIX = "jdbc:clickhouse://"; + /** * Get the dto instance from the request */ @@ -53,4 +56,15 @@ public class ClickHouseDataNodeDTO { } } + /** + * Convert ip:post to jdbcUrl. + */ + public static String convertToJdbcUrl(String url) { + String jdbcUrl = url; + if (StringUtils.isNotBlank(jdbcUrl) && !jdbcUrl.startsWith(CLICKHOUSE_JDBC_PREFIX)) { + jdbcUrl = CLICKHOUSE_JDBC_PREFIX + jdbcUrl; + } + return jdbcUrl; + } + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseColumnInfo.java deleted file mode 100644 index 55fd83509..000000000 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseColumnInfo.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.manager.pojo.sink.ck; - -import lombok.Data; - -/** - * ClickHouse column info. - */ -@Data -public class ClickHouseColumnInfo { - - private String name; - private String type; - private String desc; - private String defaultType; - private String defaultExpr; - - private String compressionCode; - - private String ttlExpr; -} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseFieldInfo.java similarity index 50% copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseFieldInfo.java index c308b01df..76bca6da3 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/ck/ClickHouseDataNodeDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseFieldInfo.java @@ -15,42 +15,57 @@ * limitations under the License. */ -package org.apache.inlong.manager.pojo.node.ck; +package org.apache.inlong.manager.pojo.sink.ck; -import io.swagger.annotations.ApiModel; -import lombok.Builder; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.SinkField; import javax.validation.constraints.NotNull; /** - * ClickHouse data node info + * ClickHouse field info. */ @Data -@Builder -@ApiModel("ClickHouse data node info") -public class ClickHouseDataNodeDTO { +@NoArgsConstructor +@AllArgsConstructor +@JsonTypeDefine(value = SinkType.CLICKHOUSE) +public class ClickHouseFieldInfo extends SinkField { + + private String defaultType; + private String defaultExpr; + + private String compressionCode; + + private String ttlExpr; /** * Get the dto instance from the request */ - public static ClickHouseDataNodeDTO getFromRequest(ClickHouseDataNodeRequest request) throws Exception { - return ClickHouseDataNodeDTO.builder().build(); + public static ClickHouseFieldInfo getFromRequest(SinkField sinkField) { + return CommonBeanUtils.copyProperties(sinkField, ClickHouseFieldInfo::new, true); } /** - * Get the dto instance from the JSON string. + * Get the extra param from the Json */ - public static ClickHouseDataNodeDTO getFromJson(@NotNull String extParams) { + public static ClickHouseFieldInfo getFromJson(@NotNull String extParams) { + if (StringUtils.isEmpty(extParams)) { + return new ClickHouseFieldInfo(); + } try { - return JsonUtils.parseObject(extParams, ClickHouseDataNodeDTO.class); + return JsonUtils.parseObject(extParams, ClickHouseFieldInfo.class); } catch (Exception e) { - throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT, - String.format("Failed to parse extParams for ClickHouse node: %s", e.getMessage())); + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, + String.format("Failed to parse extParams for ClickHouse fieldInfo: %s", e.getMessage())); } } - } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java index 5114f41cc..e36a907e4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSink.java @@ -87,6 +87,12 @@ public class ClickHouseSink extends StreamSink { @ApiModelProperty("Table order information") private String orderBy; + @ApiModelProperty(value = "Message time-to-live duration") + private Integer ttl; + + @ApiModelProperty(value = "The unit of message's time-to-live duration") + private String ttlUnit; + @ApiModelProperty("Table primary key") private String primaryKey; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java index 1f10ffe47..161060c65 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkDTO.java @@ -88,6 +88,12 @@ public class ClickHouseSinkDTO { @ApiModelProperty("Table order information") private String orderBy; + @ApiModelProperty(value = "Message time-to-live duration") + private Integer ttl; + + @ApiModelProperty(value = "The unit of message's time-to-live duration") + private String ttlUnit; + @ApiModelProperty("Table primary key") private String primaryKey; @@ -122,6 +128,8 @@ public class ClickHouseSinkDTO { .keyFieldNames(request.getKeyFieldNames()) .engine(request.getEngine()) .partitionBy(request.getPartitionBy()) + .ttl(request.getTtl()) + .ttlUnit(request.getTtlUnit()) .primaryKey(request.getPrimaryKey()) .orderBy(request.getOrderBy()) .encryptVersion(encryptVersion) @@ -140,7 +148,7 @@ public class ClickHouseSinkDTO { } public static ClickHouseTableInfo getClickHouseTableInfo(ClickHouseSinkDTO ckInfo, - List<ClickHouseColumnInfo> columnList) { + List<ClickHouseFieldInfo> fieldInfoList) { ClickHouseTableInfo tableInfo = new ClickHouseTableInfo(); tableInfo.setDbName(ckInfo.getDbName()); tableInfo.setTableName(ckInfo.getTableName()); @@ -148,7 +156,9 @@ public class ClickHouseSinkDTO { tableInfo.setOrderBy(ckInfo.getOrderBy()); tableInfo.setPartitionBy(ckInfo.getPartitionBy()); tableInfo.setPrimaryKey(ckInfo.getPrimaryKey()); - tableInfo.setColumns(columnList); + tableInfo.setTtl(ckInfo.getTtl()); + tableInfo.setTtlUnit(ckInfo.getTtlUnit()); + tableInfo.setFieldInfoList(fieldInfoList); return tableInfo; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java index 2bd7f253c..6f4b113e3 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseSinkRequest.java @@ -23,8 +23,8 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.inlong.manager.common.consts.SinkType; -import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; /** * ClickHouse sink request. @@ -81,6 +81,12 @@ public class ClickHouseSinkRequest extends SinkRequest { @ApiModelProperty("Table order information") private String orderBy; + @ApiModelProperty(value = "Message time-to-live duration") + private Integer ttl; + + @ApiModelProperty(value = "The unit of message's time-to-live duration") + private String ttlUnit; + @ApiModelProperty("Table primary key") private String primaryKey; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java index 7cb6fda01..db7a06ac1 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/ck/ClickHouseTableInfo.java @@ -36,6 +36,8 @@ public class ClickHouseTableInfo { private String partitionBy; private String orderBy; private String primaryKey; + private Integer ttl; + private String ttlUnit; - private List<ClickHouseColumnInfo> columns; + private List<ClickHouseFieldInfo> fieldInfoList; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java index d31c591ba..a6f3553f3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/ck/ClickHouseDataNodeOperator.java @@ -88,7 +88,7 @@ public class ClickHouseDataNodeOperator extends AbstractDataNodeOperator { @Override public Boolean testConnection(DataNodeRequest request) { - String url = request.getUrl(); + String url = ClickHouseDataNodeDTO.convertToJdbcUrl(request.getUrl()); String username = request.getUsername(); String password = request.getToken(); Preconditions.expectNotBlank(url, ErrorCodeEnum.INVALID_PARAMETER, "connection url cannot be empty"); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java index e8c8f1098..2b8f41dfe 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseJdbcUtils.java @@ -18,7 +18,7 @@ package org.apache.inlong.manager.service.resource.sink.ck; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo; +import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo; import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +30,7 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.ArrayList; import java.util.List; +import java.util.Objects; /** * Utils for ClickHouse JDBC. @@ -128,28 +129,31 @@ public class ClickHouseJdbcUtils { } /** - * Query ClickHouse columns + * Query ClickHouse field */ - public static List<ClickHouseColumnInfo> getColumns(String url, String user, String password, String dbName, + public static List<ClickHouseFieldInfo> getFields(String url, String user, String password, String dbName, String tableName) throws Exception { String querySql = ClickHouseSqlBuilder.buildDescTableSql(dbName, tableName); try (Connection conn = getConnection(url, user, password); Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery(querySql)) { - List<ClickHouseColumnInfo> columnList = new ArrayList<>(); + List<ClickHouseFieldInfo> fieldList = new ArrayList<>(); while (rs.next()) { - ClickHouseColumnInfo columnInfo = new ClickHouseColumnInfo(); - columnInfo.setName(rs.getString(1)); - columnInfo.setType(rs.getString(2)); - columnInfo.setDefaultType(rs.getString(3)); - columnInfo.setDefaultExpr(rs.getString(4)); - columnInfo.setDesc(rs.getString(5)); - columnInfo.setCompressionCode(rs.getString(6)); - columnInfo.setTtlExpr(rs.getString(7)); - columnList.add(columnInfo); + ClickHouseFieldInfo fieldInfo = new ClickHouseFieldInfo(); + if (Objects.equals(rs.getString(1), "inlong_ttl_date_time")) { + continue; + } + fieldInfo.setFieldName(rs.getString(1)); + fieldInfo.setFieldType(rs.getString(2)); + fieldInfo.setDefaultType(rs.getString(3)); + fieldInfo.setDefaultExpr(rs.getString(4)); + fieldInfo.setFieldComment(rs.getString(5)); + fieldInfo.setCompressionCode(rs.getString(6)); + fieldInfo.setTtlExpr(rs.getString(7)); + fieldList.add(fieldInfo); } - return columnList; + return fieldList; } } @@ -157,7 +161,7 @@ public class ClickHouseJdbcUtils { * Add columns for ClickHouse table */ public static void addColumns(String url, String user, String password, String dbName, String tableName, - List<ClickHouseColumnInfo> columnList) throws Exception { + List<ClickHouseFieldInfo> columnList) throws Exception { List<String> addColumnSql = ClickHouseSqlBuilder.buildAddColumnsSql(dbName, tableName, columnList); ClickHouseJdbcUtils.executeSqlBatch(addColumnSql, url, user, password); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java index e4abd3890..70e4c09a6 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java @@ -28,9 +28,10 @@ import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO; import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkInfo; -import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo; +import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo; import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO; import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo; import org.apache.inlong.manager.service.node.DataNodeOperateHelper; @@ -94,7 +95,7 @@ public class ClickHouseResourceOperator implements SinkResourceOperator { ClickHouseDataNodeInfo dataNodeInfo = (ClickHouseDataNodeInfo) dataNodeHelper.getDataNodeInfo( dataNodeName, sinkInfo.getSinkType()); CommonBeanUtils.copyProperties(dataNodeInfo, ckInfo); - ckInfo.setJdbcUrl(dataNodeInfo.getUrl()); + ckInfo.setJdbcUrl(ClickHouseDataNodeDTO.convertToJdbcUrl(dataNodeInfo.getUrl())); ckInfo.setPassword(dataNodeInfo.getToken()); } return ckInfo; @@ -109,18 +110,11 @@ public class ClickHouseResourceOperator implements SinkResourceOperator { } // set columns - List<ClickHouseColumnInfo> columnList = new ArrayList<>(); - for (StreamSinkFieldEntity field : fieldList) { - ClickHouseColumnInfo columnInfo = new ClickHouseColumnInfo(); - columnInfo.setName(field.getFieldName()); - columnInfo.setType(field.getFieldType()); - columnInfo.setDesc(field.getFieldComment()); - columnList.add(columnInfo); - } + List<ClickHouseFieldInfo> fieldInfoList = getSClickHouseColumnInfoFromSink(fieldList); try { ClickHouseSinkDTO ckInfo = getClickHouseInfo(sinkInfo); - ClickHouseTableInfo tableInfo = ClickHouseSinkDTO.getClickHouseTableInfo(ckInfo, columnList); + ClickHouseTableInfo tableInfo = ClickHouseSinkDTO.getClickHouseTableInfo(ckInfo, fieldInfoList); String url = ckInfo.getJdbcUrl(); String user = ckInfo.getUsername(); String password = ckInfo.getPassword(); @@ -140,9 +134,9 @@ public class ClickHouseResourceOperator implements SinkResourceOperator { ClickHouseJdbcUtils.createTable(url, user, password, tableInfo); } else { // 4. table exists, add columns - skip the exists columns - List<ClickHouseColumnInfo> existColumns = ClickHouseJdbcUtils.getColumns(url, + List<ClickHouseFieldInfo> existColumns = ClickHouseJdbcUtils.getFields(url, user, password, dbName, tableName); - List<ClickHouseColumnInfo> needAddColumns = tableInfo.getColumns().stream() + List<ClickHouseFieldInfo> needAddColumns = tableInfo.getFieldInfoList().stream() .skip(existColumns.size()).collect(toList()); if (CollectionUtils.isNotEmpty(needAddColumns)) { ClickHouseJdbcUtils.addColumns(url, user, password, dbName, tableName, needAddColumns); @@ -163,4 +157,21 @@ public class ClickHouseResourceOperator implements SinkResourceOperator { LOGGER.info("success create ClickHouse table for sink id [" + sinkInfo.getId() + "]"); } + public List<ClickHouseFieldInfo> getSClickHouseColumnInfoFromSink(List<StreamSinkFieldEntity> sinkList) { + List<ClickHouseFieldInfo> columnInfoList = new ArrayList<>(); + for (StreamSinkFieldEntity fieldEntity : sinkList) { + if (StringUtils.isNotBlank(fieldEntity.getExtParams())) { + ClickHouseFieldInfo clickHouseFieldInfo = ClickHouseFieldInfo.getFromJson( + fieldEntity.getExtParams()); + CommonBeanUtils.copyProperties(fieldEntity, clickHouseFieldInfo, true); + columnInfoList.add(clickHouseFieldInfo); + } else { + ClickHouseFieldInfo clickHouseFieldInfo = new ClickHouseFieldInfo(); + CommonBeanUtils.copyProperties(fieldEntity, clickHouseFieldInfo, true); + columnInfoList.add(clickHouseFieldInfo); + } + } + return columnInfoList; + } + } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java index 6e9b363d8..c192972e5 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseSqlBuilder.java @@ -18,7 +18,7 @@ package org.apache.inlong.manager.service.resource.sink.ck; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo; +import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo; import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,8 +54,16 @@ public class ClickHouseSqlBuilder { String dbTableName = table.getDbName() + "." + table.getTableName(); sql.append("CREATE TABLE ").append(dbTableName); + // add ttl columns + if (table.getTtl() != null && StringUtils.isNotBlank(table.getTtlUnit())) { + ClickHouseFieldInfo clickHouseFieldInfo = new ClickHouseFieldInfo(); + clickHouseFieldInfo.setFieldName("inlong_ttl_date_time"); + clickHouseFieldInfo.setFieldType("DateTime"); + clickHouseFieldInfo.setFieldComment("inlong ttl date time"); + table.getFieldInfoList().add(clickHouseFieldInfo); + } // Construct columns and partition columns - sql.append(buildCreateColumnsSql(table.getColumns())); + sql.append(buildCreateColumnsSql(table.getFieldInfoList())); if (StringUtils.isNotEmpty(table.getEngine())) { sql.append(" ENGINE = ").append(table.getEngine()); } else { @@ -64,8 +72,12 @@ public class ClickHouseSqlBuilder { if (StringUtils.isNotEmpty(table.getOrderBy())) { sql.append(" ORDER BY ").append(table.getOrderBy()); } else if (StringUtils.isEmpty(table.getEngine())) { - sql.append(" ORDER BY ").append(table.getColumns() - .get(FIRST_COLUMN_INDEX).getName()); + sql.append(" ORDER BY ").append(table.getFieldInfoList() + .get(FIRST_COLUMN_INDEX).getFieldName()); + } + if (table.getTtl() != null && StringUtils.isNotBlank(table.getTtlUnit())) { + sql.append(" TTL ").append("inlong_ttl_date_time").append(" + INTERVAL ").append(table.getTtl()).append(" ") + .append(table.getTtlUnit()); } if (StringUtils.isNotEmpty(table.getPartitionBy())) { sql.append(" PARTITION BY ").append(table.getPartitionBy()); @@ -85,7 +97,7 @@ public class ClickHouseSqlBuilder { * Build add column SQL */ public static List<String> buildAddColumnsSql(String dbName, String tableName, - List<ClickHouseColumnInfo> columnList) { + List<ClickHouseFieldInfo> columnList) { String dbTableName = dbName + "." + tableName; List<String> columnInfoList = getColumnsInfo(columnList); List<String> resultList = new ArrayList<>(); @@ -100,7 +112,7 @@ public class ClickHouseSqlBuilder { /** * Build create column SQL */ - private static String buildCreateColumnsSql(List<ClickHouseColumnInfo> columns) { + private static String buildCreateColumnsSql(List<ClickHouseFieldInfo> columns) { List<String> columnList = getColumnsInfo(columns); StringBuilder result = new StringBuilder().append(" (") .append(StringUtils.join(columnList, ",")).append(") "); @@ -110,24 +122,24 @@ public class ClickHouseSqlBuilder { /** * Build column info */ - private static List<String> getColumnsInfo(List<ClickHouseColumnInfo> columns) { + private static List<String> getColumnsInfo(List<ClickHouseFieldInfo> columns) { List<String> columnList = new ArrayList<>(); - for (ClickHouseColumnInfo columnInfo : columns) { + for (ClickHouseFieldInfo columnInfo : columns) { // Construct columns and partition columns - StringBuilder columnStr = new StringBuilder().append(columnInfo.getName()) - .append(" ").append(columnInfo.getType()); + StringBuilder columnStr = new StringBuilder().append(columnInfo.getFieldName()) + .append(" ").append(columnInfo.getFieldType()); if (StringUtils.isNotEmpty(columnInfo.getDefaultType())) { columnStr.append(" ").append(columnInfo.getDefaultType()) .append(" ").append(columnInfo.getDefaultExpr()); } if (StringUtils.isNotEmpty(columnInfo.getCompressionCode())) { - columnStr.append(" CODEC(").append(columnInfo.getDesc()).append(")"); + columnStr.append(" CODEC(").append(columnInfo.getCompressionCode()).append(")"); } if (StringUtils.isNotEmpty(columnInfo.getTtlExpr())) { columnStr.append(" TTL ").append(columnInfo.getTtlExpr()); } - if (StringUtils.isNotEmpty(columnInfo.getDesc())) { - columnStr.append(" COMMENT '").append(columnInfo.getDesc()).append("'"); + if (StringUtils.isNotEmpty(columnInfo.getFieldComment())) { + columnStr.append(" COMMENT '").append(columnInfo.getFieldComment()).append("'"); } columnList.add(columnStr.toString()); } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java index 6a7e80410..14d8b3474 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/ck/ClickHouseSinkOperator.java @@ -18,19 +18,24 @@ package org.apache.inlong.manager.service.sink.ck; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; +import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeDTO; import org.apache.inlong.manager.pojo.node.ck.ClickHouseDataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.ck.ClickHouseFieldInfo; import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink; import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO; import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkRequest; -import org.apache.inlong.manager.common.util.CommonBeanUtils; -import org.apache.inlong.manager.dao.entity.StreamSinkEntity; import org.apache.inlong.manager.service.sink.AbstractSinkOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,6 +43,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import javax.validation.constraints.NotNull; +import java.util.ArrayList; import java.util.List; /** @@ -93,7 +99,7 @@ public class ClickHouseSinkOperator extends AbstractSinkOperator { ClickHouseDataNodeInfo dataNodeInfo = (ClickHouseDataNodeInfo) dataNodeHelper.getDataNodeInfo( entity.getDataNodeName(), entity.getSinkType()); CommonBeanUtils.copyProperties(dataNodeInfo, dto, true); - dto.setJdbcUrl(dataNodeInfo.getUrl()); + dto.setJdbcUrl(ClickHouseDataNodeDTO.convertToJdbcUrl(dataNodeInfo.getUrl())); dto.setPassword(dataNodeInfo.getToken()); } CommonBeanUtils.copyProperties(entity, sink, true); @@ -103,4 +109,65 @@ public class ClickHouseSinkOperator extends AbstractSinkOperator { return sink; } + @Override + public void saveFieldOpt(SinkRequest request) { + List<SinkField> fieldList = request.getSinkFieldList(); + LOGGER.debug("begin to save es sink fields={}", fieldList); + if (CollectionUtils.isEmpty(fieldList)) { + return; + } + int size = fieldList.size(); + List<StreamSinkFieldEntity> entityList = new ArrayList<>(size); + String groupId = request.getInlongGroupId(); + String streamId = request.getInlongStreamId(); + String sinkType = request.getSinkType(); + Integer sinkId = request.getId(); + for (SinkField fieldInfo : fieldList) { + this.checkFieldInfo(fieldInfo); + StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new); + if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { + fieldEntity.setFieldComment(fieldEntity.getFieldName()); + } + try { + ClickHouseFieldInfo dto = ClickHouseFieldInfo.getFromRequest(fieldInfo); + fieldEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, + String.format("serialize extParams of ClickHouse FieldInfo failure: %s", e.getMessage())); + } + fieldEntity.setInlongGroupId(groupId); + fieldEntity.setInlongStreamId(streamId); + fieldEntity.setSinkType(sinkType); + fieldEntity.setSinkId(sinkId); + fieldEntity.setIsDeleted(InlongConstants.UN_DELETED); + entityList.add(fieldEntity); + } + + sinkFieldMapper.insertAll(entityList); + LOGGER.debug("success to save es sink fields"); + } + + @Override + public List<SinkField> getSinkFields(Integer sinkId) { + List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId); + List<SinkField> fieldList = new ArrayList<>(); + if (CollectionUtils.isEmpty(sinkFieldEntities)) { + return fieldList; + } + sinkFieldEntities.forEach(field -> { + SinkField sinkField = new SinkField(); + if (StringUtils.isNotBlank(field.getExtParams())) { + ClickHouseFieldInfo clickHouseFieldInfo = ClickHouseFieldInfo.getFromJson( + field.getExtParams()); + CommonBeanUtils.copyProperties(field, clickHouseFieldInfo, true); + fieldList.add(clickHouseFieldInfo); + } else { + CommonBeanUtils.copyProperties(field, sinkField, true); + fieldList.add(sinkField); + } + + }); + return fieldList; + } + }