This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ae40b5c9f [INLONG-6938][Manager] Supports to create a starRocks 
database or table (#6939)
ae40b5c9f is described below

commit ae40b5c9f2200c900b728764429fa7df1613735a
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Mon Dec 19 16:29:41 2022 +0800

    [INLONG-6938][Manager] Supports to create a starRocks database or table 
(#6939)
---
 .../pojo/sink/starrocks/StarRocksColumnInfo.java   |  67 +++++++
 .../manager/pojo/sink/starrocks/StarRocksSink.java |   9 +
 .../pojo/sink/starrocks/StarRocksSinkDTO.java      |  39 +++-
 .../pojo/sink/starrocks/StarRocksSinkRequest.java  |  10 +
 .../pojo/sink/starrocks/StarRocksTableInfo.java    |  46 +++++
 .../sink/starrocks/StarRocksJdbcUtils.java         | 217 +++++++++++++++++++++
 .../sink/starrocks/StarRocksResourceOperator.java  | 134 +++++++++++++
 .../sink/starrocks/StarRocksSqlBuilder.java        | 217 +++++++++++++++++++++
 .../sink/starrocks/StarRocksSinkOperator.java      |  73 ++++++-
 9 files changed, 807 insertions(+), 5 deletions(-)

diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java
new file mode 100644
index 000000000..ba0a3731f
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.starrocks;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * StarRocks column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonTypeDefine(value = SinkType.STARROCKS)
+public class StarRocksColumnInfo extends SinkField {
+
+    private Boolean isSortKey = false;
+
+    private Boolean isDistributed = false;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static StarRocksColumnInfo getFromRequest(SinkField sinkField) {
+        return CommonBeanUtils.copyProperties(sinkField, 
StarRocksColumnInfo::new, true);
+    }
+
+    /**
+     * Get the extra param from the Json
+     */
+    public static StarRocksColumnInfo getFromJson(@NotNull String extParams) {
+        if (StringUtils.isEmpty(extParams)) {
+            return new StarRocksColumnInfo();
+        }
+        try {
+            return JsonUtils.parseObject(extParams, StarRocksColumnInfo.class);
+        } catch (Exception e) {
+            throw new 
BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java
index 64820cbf3..1afaf389f 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java
@@ -75,6 +75,15 @@ public class StarRocksSink extends StreamSink {
     @ApiModelProperty("The multiple table-pattern of sink")
     private String tablePattern;
 
+    @ApiModelProperty("The table engine,  like: OLAP, MYSQL, ELASTICSEARCH, 
etc, default is OLAP")
+    private String tableEngine;
+
+    @ApiModelProperty("The table replication num")
+    private Integer replicationNum;
+
+    @ApiModelProperty("The table barrel size")
+    private Integer barrelSize;
+
     public StarRocksSink() {
         this.setSinkType(SinkType.STARROCKS);
     }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
index 36496bbb8..9b627c400 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java
@@ -18,9 +18,6 @@
 package org.apache.inlong.manager.pojo.sink.starrocks;
 
 import io.swagger.annotations.ApiModelProperty;
-import java.nio.charset.StandardCharsets;
-import java.util.Map;
-import javax.validation.constraints.NotNull;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -31,6 +28,11 @@ import 
org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.AESUtils;
 import org.apache.inlong.manager.common.util.JsonUtils;
 
+import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
 /**
  * Sink info of StarRocks
  */
@@ -73,6 +75,15 @@ public class StarRocksSinkDTO {
     @ApiModelProperty("The multiple table-pattern of sink")
     private String tablePattern;
 
+    @ApiModelProperty("The table engine,  like: OLAP, MYSQL, ELASTICSEARCH, 
etc, default is OLAP")
+    private String tableEngine = "OLAP";
+
+    @ApiModelProperty("The table replication num")
+    private Integer replicationNum = 3;
+
+    @ApiModelProperty("The table barrel size")
+    private Integer barrelSize = 8;
+
     @ApiModelProperty("Password encrypt version")
     private Integer encryptVersion;
 
@@ -100,6 +111,9 @@ public class StarRocksSinkDTO {
                 .sinkMultipleFormat(request.getSinkMultipleFormat())
                 .databasePattern(request.getDatabasePattern())
                 .tablePattern(request.getTablePattern())
+                .tableEngine(request.getTableEngine())
+                .replicationNum(request.getReplicationNum())
+                .barrelSize(request.getBarrelSize())
                 .encryptVersion(encryptVersion)
                 .properties(request.getProperties())
                 .build();
@@ -113,6 +127,25 @@ public class StarRocksSinkDTO {
         }
     }
 
+    /**
+     * Get StarRocks table info
+     *
+     * @param sinkDTO StarRocks sink dto,{@link StarRocksSinkDTO}
+     * @param columnList StarRocks column info list,{@link StarRocksColumnInfo}
+     * @return {@link StarRocksTableInfo}
+     */
+    public static StarRocksTableInfo getTableInfo(StarRocksSinkDTO sinkDTO, 
List<StarRocksColumnInfo> columnList) {
+        StarRocksTableInfo tableInfo = new StarRocksTableInfo();
+        tableInfo.setDbName(sinkDTO.getDatabaseName());
+        tableInfo.setTableName(sinkDTO.getTableName());
+        tableInfo.setColumns(columnList);
+        tableInfo.setPrimaryKey(sinkDTO.getPrimaryKey());
+        tableInfo.setTableEngine(sinkDTO.getTableEngine());
+        tableInfo.setReplicationNum(sinkDTO.getReplicationNum());
+        tableInfo.setBarrelSize(sinkDTO.getBarrelSize());
+        return tableInfo;
+    }
+
     private StarRocksSinkDTO decryptPassword() throws Exception {
         if (StringUtils.isNotEmpty(this.password)) {
             byte[] passwordBytes = AESUtils.decryptAsString(this.password, 
this.encryptVersion);
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java
index 1956021ef..8c4bb74cb 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java
@@ -68,4 +68,14 @@ public class StarRocksSinkRequest extends SinkRequest {
 
     @ApiModelProperty("The multiple table-pattern of sink")
     private String tablePattern;
+
+    @ApiModelProperty("The table engine,  like: OLAP, MYSQL, ELASTICSEARCH, 
etc, default is OLAP")
+    private String tableEngine = "OLAP";
+
+    @ApiModelProperty("The table replication num")
+    private Integer replicationNum = 3;
+
+    @ApiModelProperty("The table barrel size")
+    private Integer barrelSize = 8;
+
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java
new file mode 100644
index 000000000..318eec070
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.starrocks;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * StarRocks table info.
+ */
+@Data
+public class StarRocksTableInfo {
+
+    private String dbName;
+
+    private String tableName;
+
+    private String comment;
+
+    private String primaryKey;
+
+    private String tableEngine;
+
+    private Integer replicationNum;
+
+    private Integer barrelSize;
+
+    private List<StarRocksColumnInfo> columns;
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
new file mode 100644
index 000000000..b69809a41
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.starrocks;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hive.jdbc.HiveDatabaseMetaData;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+public class StarRocksJdbcUtils {
+
+    private static final String STAR_ROCKS_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    private static final String METADATA_TYPE = "TABLE";
+    private static final String COLUMN_LABEL = "TABLE_NAME";
+    private static final String STAR_ROCKS_JDBC_PREFIX = "jdbc:mysql";
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(StarRocksJdbcUtils.class);
+
+    /**
+     * Get starRocks connection from starRocks url and user
+     */
+    public static Connection getConnection(String url, String user, String 
password) throws Exception {
+        if (StringUtils.isBlank(url) || 
!url.startsWith(STAR_ROCKS_JDBC_PREFIX)) {
+            throw new Exception("starRocks server url should start with " + 
STAR_ROCKS_JDBC_PREFIX);
+        }
+        Connection conn;
+        try {
+            Class.forName(STAR_ROCKS_DRIVER_CLASS);
+            conn = DriverManager.getConnection(url, user, password);
+            LOGGER.info("get star rocks connection success, url={}", url);
+            return conn;
+        } catch (Exception e) {
+            String errMsg = "get star rocks connection error, please check 
starRocks jdbc url, username or password";
+            LOGGER.error(errMsg, e);
+            throw new Exception(errMsg + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Execute sql on the specified starRocks Server
+     *
+     * @param sql need to execute
+     * @param url url of starRocks server
+     * @param user user of starRocks server
+     * @param password password of starRocks server
+     * @throws Exception when executing error
+     */
+    public static void executeSql(String sql, String url, String user, String 
password) throws Exception {
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement()) {
+            stmt.execute(sql);
+            LOGGER.info("execute sql [{}] success", sql);
+        }
+    }
+
+    /**
+     * Create StarRocks database
+     */
+    public static void createDb(String url, String username, String password, 
String dbName) throws Exception {
+        String createDbSql = StarRocksSqlBuilder.buildCreateDbSql(dbName);
+        executeSql(createDbSql, url, username, password);
+    }
+
+    /**
+     * Create StarRocks table
+     */
+    public static void createTable(String url, String username, String 
password, StarRocksTableInfo tableInfo)
+            throws Exception {
+        if (checkTablesExist(url, username, password, tableInfo.getDbName(), 
tableInfo.getTableName())) {
+            LOGGER.info("The table [{}] are exists", tableInfo.getTableName());
+        } else {
+            String createTableSql = 
StarRocksSqlBuilder.buildCreateTableSql(tableInfo);
+            StarRocksJdbcUtils.executeSql(createTableSql, url, username, 
password);
+            LOGGER.info("execute sql [{}] success", createTableSql);
+        }
+    }
+
+    /**
+     * Get Hive tables from the Hive metadata
+     */
+    public static List<String> getTables(String url, String user, String 
password, String dbName) throws Exception {
+        try (Connection conn = getConnection(url, user, password)) {
+            HiveDatabaseMetaData metaData = (HiveDatabaseMetaData) 
conn.getMetaData();
+            ResultSet rs = metaData.getTables(dbName, dbName, null, new 
String[]{METADATA_TYPE});
+            List<String> tables = new ArrayList<>();
+            while (rs.next()) {
+                String tableName = rs.getString(COLUMN_LABEL);
+                tables.add(tableName);
+            }
+            rs.close();
+
+            return tables;
+        }
+    }
+
+    /**
+     * Add columns for StarRocks table
+     */
+    public static void addColumns(String url, String user, String password, 
String dbName, String tableName,
+            List<StarRocksColumnInfo> columnList) throws Exception {
+        final List<StarRocksColumnInfo> columnInfos = Lists.newArrayList();
+
+        for (StarRocksColumnInfo columnInfo : columnList) {
+            if (!checkColumnExist(url, user, password, dbName, tableName, 
columnInfo.getFieldName())) {
+                columnInfos.add(columnInfo);
+            }
+        }
+        List<String> addColumnSql = 
StarRocksSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos);
+        try (Connection conn = getConnection(url, user, password)) {
+            executeSqlBatch(conn, addColumnSql);
+        }
+    }
+
+    /**
+     * Check tables from the StarRocks information_schema.
+     *
+     * @param url jdbcUrl
+     * @param user username
+     * @param password password
+     * @param dbName database name
+     * @param tableName table name
+     * @return true if table exist, otherwise false
+     * @throws Exception on check table exist error
+     */
+    public static boolean checkTablesExist(String url, String user, String 
password, String dbName,
+            String tableName) throws Exception {
+        boolean result = false;
+        final String checkTableSql = StarRocksSqlBuilder.getCheckTable(dbName, 
tableName);
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement()) {
+            ResultSet resultSet = stmt.executeQuery(checkTableSql);
+            if (Objects.nonNull(resultSet)) {
+                if (resultSet.next()) {
+                    result = true;
+                }
+            }
+        }
+        LOGGER.info("check table exist for db={} table={}, result={}", dbName, 
tableName, result);
+        return result;
+    }
+
+    /**
+     * Check whether the column exists in the StarRocks table.
+     *
+     * @param url jdbcUrl
+     * @param user username
+     * @param password password
+     * @param dbName database name
+     * @param tableName table name
+     * @param column table column name
+     * @return true if column exist in the table, otherwise false
+     * @throws Exception on check column exist error
+     */
+    public static boolean checkColumnExist(String url, String user, String 
password, String dbName,
+            final String tableName, final String column) throws Exception {
+        boolean result = false;
+        final String checkTableSql = 
StarRocksSqlBuilder.getCheckColumn(dbName, tableName, column);
+        try (Connection conn = getConnection(url, user, password);
+                Statement stmt = conn.createStatement();
+                ResultSet resultSet = stmt.executeQuery(checkTableSql)) {
+            if (Objects.nonNull(resultSet)) {
+                if (resultSet.next()) {
+                    result = true;
+                }
+            }
+        }
+        LOGGER.info("check column exist for db={} table={}, result={} 
column={}", dbName, tableName, result, column);
+        return result;
+    }
+
+    /**
+     * Execute batch query SQL on StarRocks.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param sqls SQL to be executed
+     * @throws Exception on get execute SQL batch error
+     */
+    public static void executeSqlBatch(final Connection conn, final 
List<String> sqls) throws Exception {
+        conn.setAutoCommit(false);
+        try (Statement stmt = conn.createStatement()) {
+            for (String entry : sqls) {
+                stmt.execute(entry);
+            }
+            conn.commit();
+            LOGGER.info("execute sql [{}] success", sqls);
+        } finally {
+            conn.setAutoCommit(true);
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
new file mode 100644
index 000000000..b657481b7
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.starrocks;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import 
org.apache.inlong.manager.service.resource.sink.mysql.MySQLResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * StarRocks resource operator.
+ */
+@Service
+public class StarRocksResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySQLResourceOperator.class);
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private StreamSinkFieldEntityMapper fieldEntityMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.STARROCKS.equals(sinkType);
+    }
+
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId());
+        if 
(SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("sink resource [" + sinkInfo.getId() + "] already 
success, skip to create");
+            return;
+        } else if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
 {
+            LOG.warn("create resource was disabled, skip to create for [" + 
sinkInfo.getId() + "]");
+            return;
+        }
+        this.createTable(sinkInfo);
+    }
+
+    /**
+     * Create starRocks table by SinkInfo.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     */
+    private void createTable(SinkInfo sinkInfo) {
+        LOG.info("begin to create starRocks table for sinkId={}", 
sinkInfo.getId());
+        List<StreamSinkFieldEntity> fieldList = 
fieldEntityMapper.selectBySinkId(sinkInfo.getId());
+        if (CollectionUtils.isEmpty(fieldList)) {
+            LOG.warn("no starRocks fields found, skip to create table for 
sinkId={}", sinkInfo.getId());
+        }
+        // get columns
+        List<StarRocksColumnInfo> columnList = 
getStarRocksColumnInfoFromSink(fieldList);
+
+        StarRocksSinkDTO sinkDTO = 
StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams());
+        StarRocksTableInfo tableInfo = StarRocksSinkDTO.getTableInfo(sinkDTO, 
columnList);
+        String url = sinkDTO.getJdbcUrl();
+        String username = sinkDTO.getUsername();
+        String password = sinkDTO.getPassword();
+        String dbName = sinkDTO.getDatabaseName();
+        String tableName = sinkDTO.getTableName();
+        try {
+            // 1. create database if not exists
+            StarRocksJdbcUtils.createDb(url, username, password, dbName);
+            String dbUrl = url + "/" + dbName;
+            // 2. table not exists, create it
+            StarRocksJdbcUtils.createTable(dbUrl, username, password, 
tableInfo);
+            // 3. table exists, add columns - skip the exists columns
+            StarRocksJdbcUtils.addColumns(dbUrl, username, password, dbName, 
tableName, columnList);
+            // 4. update the sink status to success
+            String info = "success to create StarRocks resource";
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOG.info(info + " for sinkInfo={}", sinkInfo);
+        } catch (Throwable e) {
+            String errMsg = "create StarRocks table failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        }
+        LOG.info("success create StarRocks table for data sink [" + 
sinkInfo.getId() + "]");
+    }
+
+    public List<StarRocksColumnInfo> 
getStarRocksColumnInfoFromSink(List<StreamSinkFieldEntity> sinkList) {
+        List<StarRocksColumnInfo> columnInfoList = new ArrayList<>();
+        for (StreamSinkFieldEntity fieldEntity : sinkList) {
+            if (StringUtils.isNotBlank(fieldEntity.getExtParams())) {
+                StarRocksColumnInfo starRocksColumnInfo = 
StarRocksColumnInfo.getFromJson(
+                        fieldEntity.getExtParams());
+                CommonBeanUtils.copyProperties(fieldEntity, 
starRocksColumnInfo, true);
+                columnInfoList.add(starRocksColumnInfo);
+            } else {
+                StarRocksColumnInfo starRocksColumnInfo = new 
StarRocksColumnInfo();
+                CommonBeanUtils.copyProperties(fieldEntity, 
starRocksColumnInfo, true);
+                columnInfoList.add(starRocksColumnInfo);
+            }
+        }
+        return columnInfoList;
+    }
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java
new file mode 100644
index 000000000..ad5271be1
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.starrocks;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo;
+import org.apache.inlong.manager.service.resource.sink.hive.SqlBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Builder for SQL string
+ */
+public class StarRocksSqlBuilder {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SqlBuilder.class);
+
+    /**
+     * Build create database SQL
+     */
+    public static String buildCreateDbSql(String dbName) {
+        // Support _ beginning with underscore
+        String sql = "CREATE DATABASE IF NOT EXISTS `" + dbName + "`";
+        LOGGER.info("create db sql: {}", sql);
+        return sql;
+    }
+
+    /**
+     * Build create table SQL
+     */
+    public static String buildCreateTableSql(StarRocksTableInfo table) {
+        StringBuilder sql = new StringBuilder();
+        sql.append("CREATE TABLE ").append(table.getTableName());
+        // Construct columns and partition columns
+        sql.append(getColumnsAndComments(table));
+        if (!StringUtils.isEmpty(table.getPrimaryKey())) {
+            sql.append(", PRIMARY KEY (")
+                    .append(table.getPrimaryKey())
+                    .append(")");
+        }
+        if (!Objects.isNull(table.getReplicationNum())) {
+            sql.append(" PROPERTIES ( \"replication_num\" = \"")
+                    .append(table.getReplicationNum())
+                    .append("\")");
+        }
+
+        LOGGER.info("create table sql: {}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build query table SQL
+     */
+    public static String buildDescTableSql(String dbName, String tableName) {
+        StringBuilder sql = new StringBuilder();
+        String fullTableName = "`" + dbName + "." + tableName + "`";
+        sql.append("DESC ").append(fullTableName);
+
+        LOGGER.info("desc table sql={}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build add columns SQL.
+     *
+     * @param tableName StarRocks table name
+     * @param columnList StarRocks column list {@link List}
+     * @return add column SQL string list
+     */
+    public static List<String> buildAddColumnsSql(String dbName, String 
tableName,
+            List<StarRocksColumnInfo> columnList) {
+        final List<String> columnInfoList = getColumnsInfo(columnList);
+        final List<String> resultList = new ArrayList<>();
+        final StringBuilder sqlBuilder = new StringBuilder();
+        columnInfoList.forEach(columnInfo -> {
+            sqlBuilder.append("ALTER TABLE ")
+                    .append(dbName)
+                    .append(".")
+                    .append(tableName)
+                    .append(" ADD COLUMN ")
+                    .append(columnInfo)
+                    .append(";");
+            resultList.add(sqlBuilder.toString());
+            sqlBuilder.delete(0, sqlBuilder.length());
+        });
+        return resultList;
+    }
+
+    /**
+     * Build column info by StarRocksColumnInfo list.
+     *
+     * @param columns StarRocks column info {@link StarRocksColumnInfo} list
+     * @return the SQL list
+     */
+    private static List<String> getColumnsInfo(List<StarRocksColumnInfo> 
columns) {
+        final List<String> columnList = new ArrayList<>();
+        final StringBuilder columnBuilder = new StringBuilder();
+        columns.forEach(columnInfo -> {
+            columnBuilder.append("`")
+                    .append(columnInfo.getFieldName())
+                    .append("`")
+                    .append(" ")
+                    .append(columnInfo.getFieldType());
+            if (!StringUtils.isEmpty(columnInfo.getFieldComment())) {
+                columnBuilder.append(" COMMENT '")
+                        .append(columnInfo.getFieldComment())
+                        .append("'");
+            }
+            columnBuilder.append(" ");
+            columnList.add(columnBuilder.toString());
+            columnBuilder.delete(0, columnBuilder.length());
+        });
+        return columnList;
+    }
+
+    /**
+     * Get columns and table comment string for create table SQL.
+     *
+     * For example: col_name data_type [COMMENT col_comment], col_name 
data_type [COMMENT col_comment]....
+     */
+    private static String getColumnsAndComments(StarRocksTableInfo tableInfo) {
+        List<StarRocksColumnInfo> columns = tableInfo.getColumns();
+        List<String> columnList = new ArrayList<>();
+        List<String> sortKeyList = new ArrayList<>();
+        List<String> distributeList = new ArrayList<>();
+        for (StarRocksColumnInfo columnInfo : columns) {
+            // Construct columns and partition columns
+            StringBuilder columnStr = new 
StringBuilder().append("`").append(columnInfo.getFieldName()).append("` ")
+                    .append(columnInfo.getFieldType());
+            if (StringUtils.isNotEmpty(columnInfo.getFieldComment())) {
+                columnStr.append(" COMMENT 
").append("'").append(columnInfo.getFieldComment()).append("'");
+            }
+
+            if (columnInfo.getIsDistributed()) {
+                distributeList.add("`" + columnInfo.getFieldName() + "` ");
+            }
+            if (columnInfo.getIsSortKey()) {
+                sortKeyList.add("`" + columnInfo.getFieldName() + "` ");
+            }
+            columnList.add(columnStr.toString());
+        }
+        StringBuilder result = new StringBuilder().append(" 
(").append(StringUtils.join(columnList, ",")).append(") ");
+        // set partitions
+        if (sortKeyList.size() > 0) {
+            result.append("DUPLICATE KEY 
(").append(StringUtils.join(sortKeyList, ",")).append(") ");
+        }
+        if (distributeList.size() <= 0 && columns.size() > 0) {
+            distributeList.add("`" + columns.get(0).getFieldName() + "` ");
+        }
+        result.append("DISTRIBUTED BY HASH 
(").append(StringUtils.join(distributeList, ",")).append(") ")
+                .append("BUCKETS ")
+                .append(tableInfo.getBarrelSize());
+        return result.toString();
+    }
+
+    /**
+     * Build SQL to check whether the table exists.
+     *
+     * @param dbName StarRocks database name
+     * @param tableName StarRocks table name
+     * @return the check table SQL string
+     */
+    public static String getCheckTable(String dbName, String tableName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("select table_schema,table_name ")
+                .append(" from information_schema.tables where table_schema = 
'")
+                .append(dbName)
+                .append("' and table_name = '")
+                .append(tableName)
+                .append("' ;");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build SQL to check whether the column exists.
+     *
+     * @param dbName StarRocks database name
+     * @param tableName StarRocks table name
+     * @param columnName StarRocks column name
+     * @return the check column SQL string
+     */
+    public static String getCheckColumn(String dbName, String tableName, 
String columnName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ")
+                .append(" from  information_schema.COLUMNS where 
table_schema='")
+                .append(dbName)
+                .append("' and table_name = '")
+                .append(tableName)
+                .append("' and column_name = '")
+                .append(columnName)
+                .append("';");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
index 5d8785620..e9ff58717 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java
@@ -18,17 +18,20 @@
 package org.apache.inlong.manager.service.sink.starrocks;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.List;
-import javax.validation.constraints.NotNull;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO;
 import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkRequest;
@@ -38,6 +41,10 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.validation.constraints.NotNull;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * StarRocks sink operator, such as save or update StarRocks field, etc.
  */
@@ -90,4 +97,66 @@ public class StarRocksSinkOperator extends 
AbstractSinkOperator {
         return sink;
     }
 
+    @Override
+    public void saveFieldOpt(SinkRequest request) {
+        List<SinkField> fieldList = request.getSinkFieldList();
+        LOGGER.info("begin to save es sink fields={}", fieldList);
+        if (CollectionUtils.isEmpty(fieldList)) {
+            return;
+        }
+
+        int size = fieldList.size();
+        List<StreamSinkFieldEntity> entityList = new ArrayList<>(size);
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
+        String sinkType = request.getSinkType();
+        Integer sinkId = request.getId();
+        for (SinkField fieldInfo : fieldList) {
+            this.checkFieldInfo(fieldInfo);
+            StreamSinkFieldEntity fieldEntity = 
CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new);
+            if (StringUtils.isEmpty(fieldEntity.getFieldComment())) {
+                fieldEntity.setFieldComment(fieldEntity.getFieldName());
+            }
+            try {
+                StarRocksColumnInfo dto = 
StarRocksColumnInfo.getFromRequest(fieldInfo);
+                fieldEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            } catch (Exception e) {
+                LOGGER.error("parsing json string to sink field info failed", 
e);
+                throw new 
BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage());
+            }
+            fieldEntity.setInlongGroupId(groupId);
+            fieldEntity.setInlongStreamId(streamId);
+            fieldEntity.setSinkType(sinkType);
+            fieldEntity.setSinkId(sinkId);
+            fieldEntity.setIsDeleted(InlongConstants.UN_DELETED);
+            entityList.add(fieldEntity);
+        }
+
+        sinkFieldMapper.insertAll(entityList);
+        LOGGER.info("success to save starRock sink fields");
+    }
+
+    @Override
+    public List<SinkField> getSinkFields(Integer sinkId) {
+        List<StreamSinkFieldEntity> sinkFieldEntities = 
sinkFieldMapper.selectBySinkId(sinkId);
+        List<SinkField> fieldList = new ArrayList<>();
+        if (CollectionUtils.isEmpty(sinkFieldEntities)) {
+            return fieldList;
+        }
+        sinkFieldEntities.forEach(field -> {
+            SinkField sinkField = new SinkField();
+            if (StringUtils.isNotBlank(field.getExtParams())) {
+                StarRocksColumnInfo starRocksColumnInfo = 
StarRocksColumnInfo.getFromJson(
+                        field.getExtParams());
+                CommonBeanUtils.copyProperties(field, starRocksColumnInfo, 
true);
+                fieldList.add(starRocksColumnInfo);
+            } else {
+                CommonBeanUtils.copyProperties(field, sinkField, true);
+                fieldList.add(sinkField);
+            }
+
+        });
+        return fieldList;
+    }
+
 }


Reply via email to