This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ae40b5c9f [INLONG-6938][Manager] Supports to create a starRocks database or table (#6939) ae40b5c9f is described below commit ae40b5c9f2200c900b728764429fa7df1613735a Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Mon Dec 19 16:29:41 2022 +0800 [INLONG-6938][Manager] Supports to create a starRocks database or table (#6939) --- .../pojo/sink/starrocks/StarRocksColumnInfo.java | 67 +++++++ .../manager/pojo/sink/starrocks/StarRocksSink.java | 9 + .../pojo/sink/starrocks/StarRocksSinkDTO.java | 39 +++- .../pojo/sink/starrocks/StarRocksSinkRequest.java | 10 + .../pojo/sink/starrocks/StarRocksTableInfo.java | 46 +++++ .../sink/starrocks/StarRocksJdbcUtils.java | 217 +++++++++++++++++++++ .../sink/starrocks/StarRocksResourceOperator.java | 134 +++++++++++++ .../sink/starrocks/StarRocksSqlBuilder.java | 217 +++++++++++++++++++++ .../sink/starrocks/StarRocksSinkOperator.java | 73 ++++++- 9 files changed, 807 insertions(+), 5 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java new file mode 100644 index 000000000..ba0a3731f --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksColumnInfo.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.starrocks; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.SinkField; + +import javax.validation.constraints.NotNull; + +/** + * StarRocks column info. + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@JsonTypeDefine(value = SinkType.STARROCKS) +public class StarRocksColumnInfo extends SinkField { + + private Boolean isSortKey = false; + + private Boolean isDistributed = false; + + /** + * Get the dto instance from the request + */ + public static StarRocksColumnInfo getFromRequest(SinkField sinkField) { + return CommonBeanUtils.copyProperties(sinkField, StarRocksColumnInfo::new, true); + } + + /** + * Get the extra param from the Json + */ + public static StarRocksColumnInfo getFromJson(@NotNull String extParams) { + if (StringUtils.isEmpty(extParams)) { + return new StarRocksColumnInfo(); + } + try { + return JsonUtils.parseObject(extParams, StarRocksColumnInfo.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); + } + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java index 64820cbf3..1afaf389f 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSink.java @@ -75,6 +75,15 @@ public class StarRocksSink extends StreamSink { @ApiModelProperty("The multiple table-pattern of sink") private String tablePattern; + @ApiModelProperty("The table engine, like: OLAP, MYSQL, ELASTICSEARCH, etc, default is OLAP") + private String tableEngine; + + @ApiModelProperty("The table replication num") + private Integer replicationNum; + + @ApiModelProperty("The table barrel size") + private Integer barrelSize; + public StarRocksSink() { this.setSinkType(SinkType.STARROCKS); } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java index 36496bbb8..9b627c400 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkDTO.java @@ -18,9 +18,6 @@ package org.apache.inlong.manager.pojo.sink.starrocks; import io.swagger.annotations.ApiModelProperty; -import java.nio.charset.StandardCharsets; -import java.util.Map; -import javax.validation.constraints.NotNull; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -31,6 +28,11 @@ import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.AESUtils; import org.apache.inlong.manager.common.util.JsonUtils; +import javax.validation.constraints.NotNull; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + /** * Sink info of StarRocks */ @@ -73,6 +75,15 @@ public class StarRocksSinkDTO { @ApiModelProperty("The multiple table-pattern of sink") private String tablePattern; + @ApiModelProperty("The table engine, like: OLAP, MYSQL, ELASTICSEARCH, etc, default is OLAP") + private String tableEngine = "OLAP"; + + @ApiModelProperty("The table replication num") + private Integer replicationNum = 3; + + @ApiModelProperty("The table barrel size") + private Integer barrelSize = 8; + @ApiModelProperty("Password encrypt version") private Integer encryptVersion; @@ -100,6 +111,9 @@ public class StarRocksSinkDTO { .sinkMultipleFormat(request.getSinkMultipleFormat()) .databasePattern(request.getDatabasePattern()) .tablePattern(request.getTablePattern()) + .tableEngine(request.getTableEngine()) + .replicationNum(request.getReplicationNum()) + .barrelSize(request.getBarrelSize()) .encryptVersion(encryptVersion) .properties(request.getProperties()) .build(); @@ -113,6 +127,25 @@ public class StarRocksSinkDTO { } } + /** + * Get StarRocks table info + * + * @param sinkDTO StarRocks sink dto,{@link StarRocksSinkDTO} + * @param columnList StarRocks column info list,{@link StarRocksColumnInfo} + * @return {@link StarRocksTableInfo} + */ + public static StarRocksTableInfo getTableInfo(StarRocksSinkDTO sinkDTO, List<StarRocksColumnInfo> columnList) { + StarRocksTableInfo tableInfo = new StarRocksTableInfo(); + tableInfo.setDbName(sinkDTO.getDatabaseName()); + tableInfo.setTableName(sinkDTO.getTableName()); + tableInfo.setColumns(columnList); + tableInfo.setPrimaryKey(sinkDTO.getPrimaryKey()); + tableInfo.setTableEngine(sinkDTO.getTableEngine()); + tableInfo.setReplicationNum(sinkDTO.getReplicationNum()); + tableInfo.setBarrelSize(sinkDTO.getBarrelSize()); + return tableInfo; + } + private StarRocksSinkDTO decryptPassword() throws Exception { if (StringUtils.isNotEmpty(this.password)) { byte[] passwordBytes = AESUtils.decryptAsString(this.password, this.encryptVersion); diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java index 1956021ef..8c4bb74cb 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksSinkRequest.java @@ -68,4 +68,14 @@ public class StarRocksSinkRequest extends SinkRequest { @ApiModelProperty("The multiple table-pattern of sink") private String tablePattern; + + @ApiModelProperty("The table engine, like: OLAP, MYSQL, ELASTICSEARCH, etc, default is OLAP") + private String tableEngine = "OLAP"; + + @ApiModelProperty("The table replication num") + private Integer replicationNum = 3; + + @ApiModelProperty("The table barrel size") + private Integer barrelSize = 8; + } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java new file mode 100644 index 000000000..318eec070 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/starrocks/StarRocksTableInfo.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.starrocks; + +import lombok.Data; + +import java.util.List; + +/** + * StarRocks table info. + */ +@Data +public class StarRocksTableInfo { + + private String dbName; + + private String tableName; + + private String comment; + + private String primaryKey; + + private String tableEngine; + + private Integer replicationNum; + + private Integer barrelSize; + + private List<StarRocksColumnInfo> columns; + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java new file mode 100644 index 000000000..b69809a41 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksJdbcUtils.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.starrocks; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.hive.jdbc.HiveDatabaseMetaData; +import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo; +import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +public class StarRocksJdbcUtils { + + private static final String STAR_ROCKS_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + private static final String METADATA_TYPE = "TABLE"; + private static final String COLUMN_LABEL = "TABLE_NAME"; + private static final String STAR_ROCKS_JDBC_PREFIX = "jdbc:mysql"; + + private static final Logger LOGGER = LoggerFactory.getLogger(StarRocksJdbcUtils.class); + + /** + * Get starRocks connection from starRocks url and user + */ + public static Connection getConnection(String url, String user, String password) throws Exception { + if (StringUtils.isBlank(url) || !url.startsWith(STAR_ROCKS_JDBC_PREFIX)) { + throw new Exception("starRocks server url should start with " + STAR_ROCKS_JDBC_PREFIX); + } + Connection conn; + try { + Class.forName(STAR_ROCKS_DRIVER_CLASS); + conn = DriverManager.getConnection(url, user, password); + LOGGER.info("get star rocks connection success, url={}", url); + return conn; + } catch (Exception e) { + String errMsg = "get star rocks connection error, please check starRocks jdbc url, username or password"; + LOGGER.error(errMsg, e); + throw new Exception(errMsg + ", error: " + e.getMessage()); + } + } + + /** + * Execute sql on the specified starRocks Server + * + * @param sql need to execute + * @param url url of starRocks server + * @param user user of starRocks server + * @param password password of starRocks server + * @throws Exception when executing error + */ + public static void executeSql(String sql, String url, String user, String password) throws Exception { + try (Connection conn = getConnection(url, user, password); + Statement stmt = conn.createStatement()) { + stmt.execute(sql); + LOGGER.info("execute sql [{}] success", sql); + } + } + + /** + * Create StarRocks database + */ + public static void createDb(String url, String username, String password, String dbName) throws Exception { + String createDbSql = StarRocksSqlBuilder.buildCreateDbSql(dbName); + executeSql(createDbSql, url, username, password); + } + + /** + * Create StarRocks table + */ + public static void createTable(String url, String username, String password, StarRocksTableInfo tableInfo) + throws Exception { + if (checkTablesExist(url, username, password, tableInfo.getDbName(), tableInfo.getTableName())) { + LOGGER.info("The table [{}] are exists", tableInfo.getTableName()); + } else { + String createTableSql = StarRocksSqlBuilder.buildCreateTableSql(tableInfo); + StarRocksJdbcUtils.executeSql(createTableSql, url, username, password); + LOGGER.info("execute sql [{}] success", createTableSql); + } + } + + /** + * Get Hive tables from the Hive metadata + */ + public static List<String> getTables(String url, String user, String password, String dbName) throws Exception { + try (Connection conn = getConnection(url, user, password)) { + HiveDatabaseMetaData metaData = (HiveDatabaseMetaData) conn.getMetaData(); + ResultSet rs = metaData.getTables(dbName, dbName, null, new String[]{METADATA_TYPE}); + List<String> tables = new ArrayList<>(); + while (rs.next()) { + String tableName = rs.getString(COLUMN_LABEL); + tables.add(tableName); + } + rs.close(); + + return tables; + } + } + + /** + * Add columns for StarRocks table + */ + public static void addColumns(String url, String user, String password, String dbName, String tableName, + List<StarRocksColumnInfo> columnList) throws Exception { + final List<StarRocksColumnInfo> columnInfos = Lists.newArrayList(); + + for (StarRocksColumnInfo columnInfo : columnList) { + if (!checkColumnExist(url, user, password, dbName, tableName, columnInfo.getFieldName())) { + columnInfos.add(columnInfo); + } + } + List<String> addColumnSql = StarRocksSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos); + try (Connection conn = getConnection(url, user, password)) { + executeSqlBatch(conn, addColumnSql); + } + } + + /** + * Check tables from the StarRocks information_schema. + * + * @param url jdbcUrl + * @param user username + * @param password password + * @param dbName database name + * @param tableName table name + * @return true if table exist, otherwise false + * @throws Exception on check table exist error + */ + public static boolean checkTablesExist(String url, String user, String password, String dbName, + String tableName) throws Exception { + boolean result = false; + final String checkTableSql = StarRocksSqlBuilder.getCheckTable(dbName, tableName); + try (Connection conn = getConnection(url, user, password); + Statement stmt = conn.createStatement()) { + ResultSet resultSet = stmt.executeQuery(checkTableSql); + if (Objects.nonNull(resultSet)) { + if (resultSet.next()) { + result = true; + } + } + } + LOGGER.info("check table exist for db={} table={}, result={}", dbName, tableName, result); + return result; + } + + /** + * Check whether the column exists in the StarRocks table. + * + * @param url jdbcUrl + * @param user username + * @param password password + * @param dbName database name + * @param tableName table name + * @param column table column name + * @return true if column exist in the table, otherwise false + * @throws Exception on check column exist error + */ + public static boolean checkColumnExist(String url, String user, String password, String dbName, + final String tableName, final String column) throws Exception { + boolean result = false; + final String checkTableSql = StarRocksSqlBuilder.getCheckColumn(dbName, tableName, column); + try (Connection conn = getConnection(url, user, password); + Statement stmt = conn.createStatement(); + ResultSet resultSet = stmt.executeQuery(checkTableSql)) { + if (Objects.nonNull(resultSet)) { + if (resultSet.next()) { + result = true; + } + } + } + LOGGER.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column); + return result; + } + + /** + * Execute batch query SQL on StarRocks. + * + * @param conn JDBC {@link Connection} + * @param sqls SQL to be executed + * @throws Exception on get execute SQL batch error + */ + public static void executeSqlBatch(final Connection conn, final List<String> sqls) throws Exception { + conn.setAutoCommit(false); + try (Statement stmt = conn.createStatement()) { + for (String entry : sqls) { + stmt.execute(entry); + } + conn.commit(); + LOGGER.info("execute sql [{}] success", sqls); + } finally { + conn.setAutoCommit(true); + } + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java new file mode 100644 index 000000000..b657481b7 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksResourceOperator.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.starrocks; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.SinkStatus; +import org.apache.inlong.manager.common.exceptions.WorkflowException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; +import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.sink.SinkInfo; +import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo; +import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO; +import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo; +import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; +import org.apache.inlong.manager.service.resource.sink.mysql.MySQLResourceOperator; +import org.apache.inlong.manager.service.sink.StreamSinkService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; + +/** + * StarRocks resource operator. + */ +@Service +public class StarRocksResourceOperator implements SinkResourceOperator { + + private static final Logger LOG = LoggerFactory.getLogger(MySQLResourceOperator.class); + + @Autowired + private StreamSinkService sinkService; + + @Autowired + private StreamSinkFieldEntityMapper fieldEntityMapper; + + @Override + public Boolean accept(String sinkType) { + return SinkType.STARROCKS.equals(sinkType); + } + + @Override + public void createSinkResource(SinkInfo sinkInfo) { + LOG.info("begin to create sink resources sinkId={}", sinkInfo.getId()); + if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) { + LOG.warn("sink resource [" + sinkInfo.getId() + "] already success, skip to create"); + return; + } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) { + LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]"); + return; + } + this.createTable(sinkInfo); + } + + /** + * Create starRocks table by SinkInfo. + * + * @param sinkInfo {@link SinkInfo} + */ + private void createTable(SinkInfo sinkInfo) { + LOG.info("begin to create starRocks table for sinkId={}", sinkInfo.getId()); + List<StreamSinkFieldEntity> fieldList = fieldEntityMapper.selectBySinkId(sinkInfo.getId()); + if (CollectionUtils.isEmpty(fieldList)) { + LOG.warn("no starRocks fields found, skip to create table for sinkId={}", sinkInfo.getId()); + } + // get columns + List<StarRocksColumnInfo> columnList = getStarRocksColumnInfoFromSink(fieldList); + + StarRocksSinkDTO sinkDTO = StarRocksSinkDTO.getFromJson(sinkInfo.getExtParams()); + StarRocksTableInfo tableInfo = StarRocksSinkDTO.getTableInfo(sinkDTO, columnList); + String url = sinkDTO.getJdbcUrl(); + String username = sinkDTO.getUsername(); + String password = sinkDTO.getPassword(); + String dbName = sinkDTO.getDatabaseName(); + String tableName = sinkDTO.getTableName(); + try { + // 1. create database if not exists + StarRocksJdbcUtils.createDb(url, username, password, dbName); + String dbUrl = url + "/" + dbName; + // 2. table not exists, create it + StarRocksJdbcUtils.createTable(dbUrl, username, password, tableInfo); + // 3. table exists, add columns - skip the exists columns + StarRocksJdbcUtils.addColumns(dbUrl, username, password, dbName, tableName, columnList); + // 4. update the sink status to success + String info = "success to create StarRocks resource"; + sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); + LOG.info(info + " for sinkInfo={}", sinkInfo); + } catch (Throwable e) { + String errMsg = "create StarRocks table failed: " + e.getMessage(); + LOG.error(errMsg, e); + sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg); + throw new WorkflowException(errMsg); + } + LOG.info("success create StarRocks table for data sink [" + sinkInfo.getId() + "]"); + } + + public List<StarRocksColumnInfo> getStarRocksColumnInfoFromSink(List<StreamSinkFieldEntity> sinkList) { + List<StarRocksColumnInfo> columnInfoList = new ArrayList<>(); + for (StreamSinkFieldEntity fieldEntity : sinkList) { + if (StringUtils.isNotBlank(fieldEntity.getExtParams())) { + StarRocksColumnInfo starRocksColumnInfo = StarRocksColumnInfo.getFromJson( + fieldEntity.getExtParams()); + CommonBeanUtils.copyProperties(fieldEntity, starRocksColumnInfo, true); + columnInfoList.add(starRocksColumnInfo); + } else { + StarRocksColumnInfo starRocksColumnInfo = new StarRocksColumnInfo(); + CommonBeanUtils.copyProperties(fieldEntity, starRocksColumnInfo, true); + columnInfoList.add(starRocksColumnInfo); + } + } + return columnInfoList; + } +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java new file mode 100644 index 000000000..ad5271be1 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/starrocks/StarRocksSqlBuilder.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.starrocks; + +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo; +import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksTableInfo; +import org.apache.inlong.manager.service.resource.sink.hive.SqlBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Builder for SQL string + */ +public class StarRocksSqlBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(SqlBuilder.class); + + /** + * Build create database SQL + */ + public static String buildCreateDbSql(String dbName) { + // Support _ beginning with underscore + String sql = "CREATE DATABASE IF NOT EXISTS `" + dbName + "`"; + LOGGER.info("create db sql: {}", sql); + return sql; + } + + /** + * Build create table SQL + */ + public static String buildCreateTableSql(StarRocksTableInfo table) { + StringBuilder sql = new StringBuilder(); + sql.append("CREATE TABLE ").append(table.getTableName()); + // Construct columns and partition columns + sql.append(getColumnsAndComments(table)); + if (!StringUtils.isEmpty(table.getPrimaryKey())) { + sql.append(", PRIMARY KEY (") + .append(table.getPrimaryKey()) + .append(")"); + } + if (!Objects.isNull(table.getReplicationNum())) { + sql.append(" PROPERTIES ( \"replication_num\" = \"") + .append(table.getReplicationNum()) + .append("\")"); + } + + LOGGER.info("create table sql: {}", sql); + return sql.toString(); + } + + /** + * Build query table SQL + */ + public static String buildDescTableSql(String dbName, String tableName) { + StringBuilder sql = new StringBuilder(); + String fullTableName = "`" + dbName + "." + tableName + "`"; + sql.append("DESC ").append(fullTableName); + + LOGGER.info("desc table sql={}", sql); + return sql.toString(); + } + + /** + * Build add columns SQL. + * + * @param tableName StarRocks table name + * @param columnList StarRocks column list {@link List} + * @return add column SQL string list + */ + public static List<String> buildAddColumnsSql(String dbName, String tableName, + List<StarRocksColumnInfo> columnList) { + final List<String> columnInfoList = getColumnsInfo(columnList); + final List<String> resultList = new ArrayList<>(); + final StringBuilder sqlBuilder = new StringBuilder(); + columnInfoList.forEach(columnInfo -> { + sqlBuilder.append("ALTER TABLE ") + .append(dbName) + .append(".") + .append(tableName) + .append(" ADD COLUMN ") + .append(columnInfo) + .append(";"); + resultList.add(sqlBuilder.toString()); + sqlBuilder.delete(0, sqlBuilder.length()); + }); + return resultList; + } + + /** + * Build column info by StarRocksColumnInfo list. + * + * @param columns StarRocks column info {@link StarRocksColumnInfo} list + * @return the SQL list + */ + private static List<String> getColumnsInfo(List<StarRocksColumnInfo> columns) { + final List<String> columnList = new ArrayList<>(); + final StringBuilder columnBuilder = new StringBuilder(); + columns.forEach(columnInfo -> { + columnBuilder.append("`") + .append(columnInfo.getFieldName()) + .append("`") + .append(" ") + .append(columnInfo.getFieldType()); + if (!StringUtils.isEmpty(columnInfo.getFieldComment())) { + columnBuilder.append(" COMMENT '") + .append(columnInfo.getFieldComment()) + .append("'"); + } + columnBuilder.append(" "); + columnList.add(columnBuilder.toString()); + columnBuilder.delete(0, columnBuilder.length()); + }); + return columnList; + } + + /** + * Get columns and table comment string for create table SQL. + * + * For example: col_name data_type [COMMENT col_comment], col_name data_type [COMMENT col_comment].... + */ + private static String getColumnsAndComments(StarRocksTableInfo tableInfo) { + List<StarRocksColumnInfo> columns = tableInfo.getColumns(); + List<String> columnList = new ArrayList<>(); + List<String> sortKeyList = new ArrayList<>(); + List<String> distributeList = new ArrayList<>(); + for (StarRocksColumnInfo columnInfo : columns) { + // Construct columns and partition columns + StringBuilder columnStr = new StringBuilder().append("`").append(columnInfo.getFieldName()).append("` ") + .append(columnInfo.getFieldType()); + if (StringUtils.isNotEmpty(columnInfo.getFieldComment())) { + columnStr.append(" COMMENT ").append("'").append(columnInfo.getFieldComment()).append("'"); + } + + if (columnInfo.getIsDistributed()) { + distributeList.add("`" + columnInfo.getFieldName() + "` "); + } + if (columnInfo.getIsSortKey()) { + sortKeyList.add("`" + columnInfo.getFieldName() + "` "); + } + columnList.add(columnStr.toString()); + } + StringBuilder result = new StringBuilder().append(" (").append(StringUtils.join(columnList, ",")).append(") "); + // set partitions + if (sortKeyList.size() > 0) { + result.append("DUPLICATE KEY (").append(StringUtils.join(sortKeyList, ",")).append(") "); + } + if (distributeList.size() <= 0 && columns.size() > 0) { + distributeList.add("`" + columns.get(0).getFieldName() + "` "); + } + result.append("DISTRIBUTED BY HASH (").append(StringUtils.join(distributeList, ",")).append(") ") + .append("BUCKETS ") + .append(tableInfo.getBarrelSize()); + return result.toString(); + } + + /** + * Build SQL to check whether the table exists. + * + * @param dbName StarRocks database name + * @param tableName StarRocks table name + * @return the check table SQL string + */ + public static String getCheckTable(String dbName, String tableName) { + final StringBuilder sqlBuilder = new StringBuilder() + .append("select table_schema,table_name ") + .append(" from information_schema.tables where table_schema = '") + .append(dbName) + .append("' and table_name = '") + .append(tableName) + .append("' ;"); + LOGGER.info("check table sql: {}", sqlBuilder); + return sqlBuilder.toString(); + } + + /** + * Build SQL to check whether the column exists. + * + * @param dbName StarRocks database name + * @param tableName StarRocks table name + * @param columnName StarRocks column name + * @return the check column SQL string + */ + public static String getCheckColumn(String dbName, String tableName, String columnName) { + final StringBuilder sqlBuilder = new StringBuilder() + .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ") + .append(" from information_schema.COLUMNS where table_schema='") + .append(dbName) + .append("' and table_name = '") + .append(tableName) + .append("' and column_name = '") + .append(columnName) + .append("';"); + LOGGER.info("check table sql: {}", sqlBuilder); + return sqlBuilder.toString(); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java index 5d8785620..e9ff58717 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/starrocks/StarRocksSinkOperator.java @@ -18,17 +18,20 @@ package org.apache.inlong.manager.service.sink.starrocks; import com.fasterxml.jackson.databind.ObjectMapper; -import java.util.List; -import javax.validation.constraints.NotNull; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.exceptions.BusinessException; import org.apache.inlong.manager.common.util.CommonBeanUtils; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksColumnInfo; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSink; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkDTO; import org.apache.inlong.manager.pojo.sink.starrocks.StarRocksSinkRequest; @@ -38,6 +41,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import javax.validation.constraints.NotNull; +import java.util.ArrayList; +import java.util.List; + /** * StarRocks sink operator, such as save or update StarRocks field, etc. */ @@ -90,4 +97,66 @@ public class StarRocksSinkOperator extends AbstractSinkOperator { return sink; } + @Override + public void saveFieldOpt(SinkRequest request) { + List<SinkField> fieldList = request.getSinkFieldList(); + LOGGER.info("begin to save es sink fields={}", fieldList); + if (CollectionUtils.isEmpty(fieldList)) { + return; + } + + int size = fieldList.size(); + List<StreamSinkFieldEntity> entityList = new ArrayList<>(size); + String groupId = request.getInlongGroupId(); + String streamId = request.getInlongStreamId(); + String sinkType = request.getSinkType(); + Integer sinkId = request.getId(); + for (SinkField fieldInfo : fieldList) { + this.checkFieldInfo(fieldInfo); + StreamSinkFieldEntity fieldEntity = CommonBeanUtils.copyProperties(fieldInfo, StreamSinkFieldEntity::new); + if (StringUtils.isEmpty(fieldEntity.getFieldComment())) { + fieldEntity.setFieldComment(fieldEntity.getFieldName()); + } + try { + StarRocksColumnInfo dto = StarRocksColumnInfo.getFromRequest(fieldInfo); + fieldEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + LOGGER.error("parsing json string to sink field info failed", e); + throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED.getMessage()); + } + fieldEntity.setInlongGroupId(groupId); + fieldEntity.setInlongStreamId(streamId); + fieldEntity.setSinkType(sinkType); + fieldEntity.setSinkId(sinkId); + fieldEntity.setIsDeleted(InlongConstants.UN_DELETED); + entityList.add(fieldEntity); + } + + sinkFieldMapper.insertAll(entityList); + LOGGER.info("success to save starRock sink fields"); + } + + @Override + public List<SinkField> getSinkFields(Integer sinkId) { + List<StreamSinkFieldEntity> sinkFieldEntities = sinkFieldMapper.selectBySinkId(sinkId); + List<SinkField> fieldList = new ArrayList<>(); + if (CollectionUtils.isEmpty(sinkFieldEntities)) { + return fieldList; + } + sinkFieldEntities.forEach(field -> { + SinkField sinkField = new SinkField(); + if (StringUtils.isNotBlank(field.getExtParams())) { + StarRocksColumnInfo starRocksColumnInfo = StarRocksColumnInfo.getFromJson( + field.getExtParams()); + CommonBeanUtils.copyProperties(field, starRocksColumnInfo, true); + fieldList.add(starRocksColumnInfo); + } else { + CommonBeanUtils.copyProperties(field, sinkField, true); + fieldList.add(sinkField); + } + + }); + return fieldList; + } + }