This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new bd4c3dbecf [INLONG-10703][Manager] Manager Add Oceanbase Support (#10701) bd4c3dbecf is described below commit bd4c3dbecfa75601ad53d68fb767b6afd639d3c5 Author: xxsc0529 <93303124+xxsc0...@users.noreply.github.com> AuthorDate: Wed Jul 31 20:02:03 2024 +0800 [INLONG-10703][Manager] Manager Add Oceanbase Support (#10701) --- .../inlong/manager/common/consts/DataNodeType.java | 1 + .../inlong/manager/common/consts/SinkType.java | 3 + .../inlong/manager/common/consts/SourceType.java | 1 + .../strategy/OceanBaseFieldTypeStrategy.java | 40 +++ .../resources/oceanbase-field-type-mapping.yaml | 228 +++++++++++++++++ .../pojo/node/oceanbase/OceanBaseDataNodeDTO.java | 85 +++++++ .../pojo/node/oceanbase/OceanBaseDataNodeInfo.java | 56 ++++ .../node/oceanbase/OceanBaseDataNodeRequest.java | 46 ++++ .../pojo/sink/oceanbase/OceanBaseColumnInfo.java | 37 +++ .../manager/pojo/sink/oceanbase/OceanBaseSink.java | 72 ++++++ .../pojo/sink/oceanbase/OceanBaseSinkDTO.java | 224 ++++++++++++++++ .../pojo/sink/oceanbase/OceanBaseSinkRequest.java | 60 +++++ .../pojo/sink/oceanbase/OceanBaseTableInfo.java | 43 ++++ .../pojo/sort/node/provider/OceanBaseProvider.java | 79 ++++++ .../source/oceanbase/OceanBaseBinlogSource.java | 117 +++++++++ .../source/oceanbase/OceanBaseBinlogSourceDTO.java | 146 +++++++++++ .../oceanbase/OceanBaseBinlogSourceRequest.java | 123 +++++++++ .../node/oceanbase/OceanBaseDataNodeOperator.java | 127 ++++++++++ .../sink/oceanbase/OceanBaseJdbcUtils.java | 282 +++++++++++++++++++++ .../sink/oceanbase/OceanBaseResourceOperator.java | 143 +++++++++++ .../sink/oceanbase/OceanBaseSqlBuilder.java | 231 +++++++++++++++++ .../sink/oceanbase/OceanBaseSinkOperator.java | 107 ++++++++ .../source/oceanbase/OceanBaseSourceOperator.java | 122 +++++++++ 23 files changed, 2373 insertions(+) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java index fda7ee7262..0f1952c938 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java @@ -37,6 +37,7 @@ public class DataNodeType { public static final String SQLSERVER = "SQLSERVER"; public static final String MONGODB = "MONGODB"; public static final String DORIS = "DORIS"; + public static final String OCEANBASE = "OCEANBASE"; /** * Tencent cloud log service diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java index 554651ac8c..5d069e33df 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java @@ -71,6 +71,9 @@ public class SinkType extends StreamType { @SupportSortType(sortType = SortType.SORT_FLINK) public static final String TUBEMQ = "TUBEMQ"; + @SupportSortType(sortType = SortType.SORT_FLINK) + public static final String OCEANBASE = "OCEANBASE"; + /** * Tencent cloud log service * Details: <a href="https://www.tencentcloud.com/products/cls";>CLS</a> diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java index f40593e421..cb6f1a7f6b 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java @@ -36,6 +36,7 @@ public class SourceType extends StreamType { public static final String MONGODB = "MONGODB"; public static final String REDIS = "REDIS"; public static final String MQTT = "MQTT"; + public static final String OCEANBASE = "OCEANBASE"; public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new HashMap<String, TaskTypeEnum>() { diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java new file mode 100644 index 0000000000..8482899075 --- /dev/null +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.common.fieldtype.strategy; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.fieldtype.FieldTypeMappingReader; + +import org.springframework.stereotype.Service; + +/** + * The oceanbase field type mapping strategy + */ +@Service +public class OceanBaseFieldTypeStrategy extends DefaultFieldTypeStrategy { + + public OceanBaseFieldTypeStrategy() { + this.reader = new FieldTypeMappingReader(DataNodeType.OCEANBASE); + } + + @Override + public Boolean accept(String type) { + return DataNodeType.OCEANBASE.equals(type); + } + +} diff --git a/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml b/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml new file mode 100644 index 0000000000..0d2a0599cf --- /dev/null +++ b/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml @@ -0,0 +1,228 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source.type.to.target.type.converter: + + - source.type: TINYINT + target.type: TINYINT + + - source.type: SMALLINT + target.type: SMALLINT + + - source.type: TINYINT UNSIGNED + target.type: SMALLINT + + - source.type: TINYINT UNSIGNED ZEROFILL + target.type: SMALLINT + + - source.type: INT + target.type: INT + + - source.type: INTEGER + target.type: INT + + - source.type: YEAR + target.type: INT + + - source.type: SHORT + target.type: SHORT + + - source.type: MEDIUMINT + target.type: INT + + - source.type: SMALLINT UNSIGNED + target.type: INT + + - source.type: SMALLINT UNSIGNED ZEROFILL + target.type: INT + + - source.type: BIGINT + target.type: LONG + + - source.type: INT UNSIGNED + target.type: LONG + + - source.type: MEDIUMINT UNSIGNED + target.type: LONG + + - source.type: MEDIUMINT UNSIGNED ZEROFILL + target.type: LONG + + - source.type: INT UNSIGNED ZEROFILL + target.type: LONG + + - source.type: BIGINT UNSIGNED + target.type: DECIMAL + + - source.type: BIGINT UNSIGNED ZEROFILL + target.type: DECIMAL + + - source.type: SERIAL + target.type: DECIMAL + + - source.type: FLOAT + target.type: FLOAT + + - source.type: FLOAT UNSIGNED + target.type: FLOAT + + - source.type: FLOAT UNSIGNED ZEROFILL + target.type: FLOAT + + - source.type: DOUBLE + target.type: DOUBLE + + - source.type: DOUBLE UNSIGNED + target.type: DOUBLE + + - source.type: DOUBLE UNSIGNED ZEROFILL + target.type: DOUBLE + + - source.type: DOUBLE PRECISION + target.type: DOUBLE + + - source.type: DOUBLE PRECISION UNSIGNED + target.type: DOUBLE + + - source.type: ZEROFILL + target.type: DOUBLE + + - source.type: REAL + target.type: DOUBLE + + - source.type: REAL UNSIGNED + target.type: DOUBLE + + - source.type: REAL UNSIGNED ZEROFILL + target.type: DOUBLE + + - source.type: NUMERIC + target.type: DECIMAL + + - source.type: NUMERIC UNSIGNED + target.type: DECIMAL + + - source.type: NUMERIC UNSIGNED ZEROFILL + target.type: DECIMAL + + - source.type: DECIMAL + target.type: DECIMAL + + - source.type: DECIMAL UNSIGNED + target.type: DECIMAL + + - source.type: DECIMAL UNSIGNED ZEROFILL + target.type: DECIMAL + + - source.type: FIXED + target.type: DECIMAL + + - source.type: FIXED UNSIGNED + target.type: DECIMAL + + - source.type: FIXED UNSIGNED ZEROFILL + target.type: DECIMAL + + - source.type: BOOLEAN + target.type: BOOLEAN + + - source.type: DATE + target.type: DATE + + - source.type: TIME + target.type: TIME + + - source.type: DATETIME + target.type: TIMESTAMP + + - source.type: TIMESTAMP + target.type: TIMESTAMP + + - source.type: CHAR + target.type: STRING + + - source.type: JSON + target.type: STRING + + - source.type: BIT + target.type: STRING + + - source.type: VARCHAR + target.type: STRING + + - source.type: TEXT + target.type: STRING + + - source.type: BLOB + target.type: STRING + + - source.type: TINYBLOB + target.type: STRING + + - source.type: TINYTEXT + target.type: STRING + + - source.type: MEDIUMBLOB + target.type: STRING + + - source.type: MEDIUMTEXT + target.type: STRING + + - source.type: LONGBLOB + target.type: STRING + + - source.type: LONGTEXT + target.type: STRING + + - source.type: VARBINARY + target.type: STRING + + - source.type: GEOMETRY + target.type: STRING + + - source.type: POINT + target.type: STRING + + - source.type: LINESTRING + target.type: STRING + + - source.type: POLYGON + target.type: STRING + + - source.type: MULTIPOINT + target.type: STRING + + - source.type: MULTILINESTRING + target.type: STRING + + - source.type: MULTIPOLYGON + target.type: STRING + + - source.type: GEOMETRYCOLLECTION + target.type: STRING + + - source.type: ENUM + target.type: STRING + + - source.type: STRING + target.type: STRING + + - source.type: BINARY + target.type: BINARY + + - source.type: BYTE + target.type: BYTE \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java new file mode 100644 index 0000000000..2b2f9ff214 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.oceanbase; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.constraints.NotNull; + +/** + * OceanBase data node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("OceanBase data node info") +public class OceanBaseDataNodeDTO { + + private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseDataNodeDTO.class); + private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://"; + + @ApiModelProperty("URL of backup DB server") + private String backupUrl; + + /** + * Get the dto instance from the request + */ + public static OceanBaseDataNodeDTO getFromRequest(OceanBaseDataNodeRequest request, String extParams) { + OceanBaseDataNodeDTO dto = StringUtils.isNotBlank(extParams) + ? OceanBaseDataNodeDTO.getFromJson(extParams) + : new OceanBaseDataNodeDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + /** + * Get the dto instance from the JSON string. + */ + public static OceanBaseDataNodeDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, OceanBaseDataNodeDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT, + String.format("Failed to parse extParams for OceanBase node: %s", e.getMessage())); + } + } + + /** + * Convert ip:post to jdbcurl. + */ + public static String convertToJdbcurl(String url) { + String jdbcUrl = url; + if (StringUtils.isNotBlank(jdbcUrl) && !jdbcUrl.startsWith(OCEANBASE_JDBC_PREFIX)) { + jdbcUrl = OCEANBASE_JDBC_PREFIX + jdbcUrl; + } + return jdbcUrl; + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java new file mode 100644 index 0000000000..2751ab4a5a --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.oceanbase; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * OceanBase data node info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.OCEANBASE) +@ApiModel("Hive data node info") +public class OceanBaseDataNodeInfo extends DataNodeInfo { + + @ApiModelProperty("URL of backup DB servere") + private String backupUrl; + + public OceanBaseDataNodeInfo() { + this.setType(DataNodeType.OCEANBASE); + } + + @Override + public OceanBaseDataNodeRequest genRequest() { + return CommonBeanUtils.copyProperties(this, OceanBaseDataNodeRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java new file mode 100644 index 0000000000..9eacb13483 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.oceanbase; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import javax.validation.constraints.Pattern; + +/** + * OceanBase data node request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@JsonTypeDefine(value = DataNodeType.OCEANBASE) +@ApiModel("OceanBase data node request") +public class OceanBaseDataNodeRequest extends DataNodeRequest { + + @ApiModelProperty("URL of backup DB server") + @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url") + private String backupUrl; + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java new file mode 100644 index 0000000000..c5e848737d --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.oceanbase; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * OceanBase column info. + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class OceanBaseColumnInfo { + + private String name; + + private String type; + + private String comment; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java new file mode 100644 index 0000000000..f78d1c84ba --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.oceanbase; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * OceanBase sink info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "OceanBase sink info") +@JsonTypeDefine(value = SinkType.OCEANBASE) +public class OceanBaseSink extends StreamSink { + + @ApiModelProperty("OceanBase JDBC URL, such as jdbc:mysql://host:port") + private String jdbcUrl; + + @ApiModelProperty("Username for JDBC URL") + private String username; + + @ApiModelProperty("User password") + private String password; + + @ApiModelProperty("Target database name") + private String databaseName; + + @ApiModelProperty("Target table name") + private String tableName; + + @ApiModelProperty("Primary key") + private String primaryKey; + + public OceanBaseSink() { + this.setSinkType(SinkType.MYSQL); + } + + @Override + public SinkRequest genSinkRequest() { + return CommonBeanUtils.copyProperties(this, OceanBaseSinkRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java new file mode 100644 index 0000000000..b358cc9f4e --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.oceanbase; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.sink.BaseStreamSink; + +import com.google.common.base.Strings; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.constraints.NotNull; + +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * OceanBase sink info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class OceanBaseSinkDTO extends BaseStreamSink { + + // The protocol of using mysql in sink + private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseSinkDTO.class); + private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://"; + private static final String OCEANBASE_JDBC_PREFIX_CDC = "jdbc:mysql://"; + + @ApiModelProperty("OceanBase JDBC URL, such as jdbc:oceanbase://host:port") + private String jdbcUrl; + + @ApiModelProperty("Username for JDBC URL") + private String username; + + @ApiModelProperty("User password") + private String password; + + @ApiModelProperty("Target database name") + private String databaseName; + + @ApiModelProperty("Target table name") + private String tableName; + + @ApiModelProperty("Primary key") + private String primaryKey; + + @ApiModelProperty("Properties for OceanBase") + private Map<String, Object> properties; + + /** + * Get the dto instance from the request + * + * @param request OceanBaseSinkRequest + * @return {@link OceanBaseSinkDTO} + * @apiNote The config here will be saved to the database, so filter sensitive params before saving. + */ + public static OceanBaseSinkDTO getFromRequest(OceanBaseSinkRequest request, String extParams) { + OceanBaseSinkDTO dto = + StringUtils.isNotBlank(extParams) ? OceanBaseSinkDTO.getFromJson(extParams) : new OceanBaseSinkDTO(); + CommonBeanUtils.copyProperties(request, dto, true); + return dto; + } + + /** + * Get OceanBase sink info from JSON string + * + * @param extParams string ext params + * @return {@link OceanBaseSinkDTO} + */ + public static OceanBaseSinkDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, OceanBaseSinkDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, + String.format("parse extParams of OceanBase SinkDTO failure: %s", e.getMessage())); + } + } + + /** + * Get OceanBase table info + * + * @param OceanBaseSink OceanBase sink dto,{@link OceanBaseSinkDTO} + * @param columnList OceanBase column info list,{@link OceanBaseColumnInfo} + * @return {@link OceanBaseTableInfo} + */ + public static OceanBaseTableInfo getTableInfo(OceanBaseSinkDTO OceanBaseSink, + List<OceanBaseColumnInfo> columnList) { + OceanBaseTableInfo tableInfo = new OceanBaseTableInfo(); + tableInfo.setDbName(OceanBaseSink.getDatabaseName()); + tableInfo.setTableName(OceanBaseSink.getTableName()); + tableInfo.setPrimaryKey(OceanBaseSink.getPrimaryKey()); + tableInfo.setColumns(columnList); + return tableInfo; + } + + /** + * Get DbName from jdbcUrl + * + * @param jdbcUrl OceanBase JDBC url, such as jdbc:oceanbase://host:port/database + * @return database name + */ + private static String getDbNameFromUrl(String jdbcUrl) { + String database = null; + + if (Strings.isNullOrEmpty(jdbcUrl)) { + throw new IllegalArgumentException("Invalid JDBC url."); + } + + jdbcUrl = jdbcUrl.toLowerCase(); + if (jdbcUrl.startsWith("jdbc:impala")) { + jdbcUrl = jdbcUrl.replace(":impala", ""); + } + + int pos1; + if (!jdbcUrl.startsWith("jdbc:") + || (pos1 = jdbcUrl.indexOf(':', 5)) == -1) { + throw new IllegalArgumentException("Invalid JDBC url."); + } + + String connUri = jdbcUrl.substring(pos1 + 1); + if (connUri.startsWith("//")) { + int pos = connUri.indexOf('/', 2); + if (pos != -1) { + database = connUri.substring(pos + 1); + } + } else { + database = connUri; + } + + if (Strings.isNullOrEmpty(database)) { + throw new IllegalArgumentException("Invalid JDBC URL: " + jdbcUrl); + } + + if (database.contains(InlongConstants.QUESTION_MARK)) { + database = database.substring(0, database.indexOf(InlongConstants.QUESTION_MARK)); + } + if (database.contains(InlongConstants.SEMICOLON)) { + database = database.substring(0, database.indexOf(InlongConstants.SEMICOLON)); + } + return database; + } + + public static String setDbNameToUrl(String jdbcUrl, String databaseName) { + if (StringUtils.isBlank(jdbcUrl)) { + return jdbcUrl; + } + String pattern = "jdbc:oceanbase://(?<host>[a-zA-Z0-9-//.]+):(?<port>[0-9]+)?(?<ext>)"; + Pattern namePattern = Pattern.compile(pattern); + Matcher dataMatcher = namePattern.matcher(jdbcUrl); + StringBuilder resultUrl; + if (dataMatcher.find()) { + String host = dataMatcher.group("host"); + String port = dataMatcher.group("port"); + resultUrl = new StringBuilder().append(OCEANBASE_JDBC_PREFIX) + .append(host) + .append(InlongConstants.COLON) + .append(port) + .append(InlongConstants.SLASH) + .append(databaseName); + } else { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, + "OceanBase JDBC URL was invalid, it should like jdbc:mysql://host:port"); + } + if (jdbcUrl.contains(InlongConstants.QUESTION_MARK)) { + resultUrl.append(jdbcUrl.substring(jdbcUrl.indexOf(InlongConstants.QUESTION_MARK))); + } + return resultUrl.toString(); + } + public static String setDbNameToUrlWithCdc(String jdbcUrl, String databaseName) { + if (StringUtils.isBlank(jdbcUrl)) { + return jdbcUrl; + } + String pattern = "jdbc:oceanbase://(?<host>[a-zA-Z0-9-//.]+):(?<port>[0-9]+)?(?<ext>)"; + Pattern namePattern = Pattern.compile(pattern); + Matcher dataMatcher = namePattern.matcher(jdbcUrl); + StringBuilder resultUrl; + if (dataMatcher.find()) { + String host = dataMatcher.group("host"); + String port = dataMatcher.group("port"); + resultUrl = new StringBuilder().append(OCEANBASE_JDBC_PREFIX_CDC) + .append(host) + .append(InlongConstants.COLON) + .append(port) + .append(InlongConstants.SLASH) + .append(databaseName); + } else { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, + "OceanBase JDBC URL was invalid, it should like jdbc:mysql://host:port"); + } + if (jdbcUrl.contains(InlongConstants.QUESTION_MARK)) { + resultUrl.append(jdbcUrl.substring(jdbcUrl.indexOf(InlongConstants.QUESTION_MARK))); + } + return resultUrl.toString(); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java new file mode 100644 index 0000000000..f7cc6f1efe --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.oceanbase; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.sink.SinkRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +import javax.validation.constraints.Pattern; + +/** + * OceanBase sink request. + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "OceanBase sink request") +@JsonTypeDefine(value = SinkType.OCEANBASE) +public class OceanBaseSinkRequest extends SinkRequest { + + @ApiModelProperty("OceanBase JDBC URL, such as jdbc:mysql://host:port") + @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url") + private String jdbcUrl; + + @ApiModelProperty("Username for JDBC URL") + private String username; + + @ApiModelProperty("User password") + private String password; + + @ApiModelProperty("Target database name") + private String databaseName; + + @ApiModelProperty("Target table name") + private String tableName; + + @ApiModelProperty("Primary key") + private String primaryKey; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java new file mode 100644 index 0000000000..1f1f6a3cbd --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.oceanbase; + +import lombok.Data; + +import java.util.List; + +/** + * OceanBase table info. + */ +@Data +public class OceanBaseTableInfo { + + private String dbName; + + private String tableName; + + private String comment; + + private String primaryKey; + + private String engine; + + private String charset; + + private List<OceanBaseColumnInfo> columns; +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java new file mode 100644 index 0000000000..da117670e0 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sort.node.provider; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy; +import org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeStrategyFactory; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSink; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO; +import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider; +import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.pojo.stream.StreamNode; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.node.LoadNode; +import org.apache.inlong.sort.protocol.node.load.OceanBaseLoadNode; +import org.apache.inlong.sort.protocol.transformation.FieldRelation; + +import com.google.common.collect.Lists; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Map; + +/** + * The Provider for creating OceanBase load nodes. + */ +@Service +public class OceanBaseProvider implements LoadNodeProvider { + + @Autowired + private FieldTypeStrategyFactory fieldTypeStrategyFactory; + + @Override + public Boolean accept(String sinkType) { + return SinkType.OCEANBASE.equals(sinkType); + } + + @Override + public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, StreamField> constantFieldMap) { + OceanBaseSink oceanBaseSink = (OceanBaseSink) nodeInfo; + Map<String, String> properties = parseProperties(oceanBaseSink.getProperties()); + FieldTypeMappingStrategy fieldTypeMappingStrategy = + fieldTypeStrategyFactory.getInstance(SinkType.MYSQL); + List<FieldInfo> fieldInfos = parseSinkFieldInfos(oceanBaseSink.getSinkFieldList(), oceanBaseSink.getSinkName(), + fieldTypeMappingStrategy); + List<FieldRelation> fieldRelations = parseSinkFields(oceanBaseSink.getSinkFieldList(), constantFieldMap); + + return new OceanBaseLoadNode( + oceanBaseSink.getSinkName(), + oceanBaseSink.getSinkName(), + fieldInfos, + fieldRelations, + Lists.newArrayList(), + null, + null, + properties, + OceanBaseSinkDTO.setDbNameToUrlWithCdc(oceanBaseSink.getJdbcUrl(), oceanBaseSink.getDatabaseName()), + oceanBaseSink.getUsername(), + oceanBaseSink.getPassword(), + oceanBaseSink.getTableName(), + oceanBaseSink.getPrimaryKey()); + } +} \ No newline at end of file diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java new file mode 100644 index 0000000000..585a5ba1f2 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.oceanbase; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.SourceRequest; +import org.apache.inlong.manager.pojo.source.StreamSource; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +/** + * OceanBase binlog source info + */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "OceanBase binlog source info") +@JsonTypeDefine(value = SourceType.OCEANBASE) +public class OceanBaseBinlogSource extends StreamSource { + + @ApiModelProperty("Username of the OceanBase server") + private String username; + + @ApiModelProperty("Password of the OceanBase server") + private String password; + + @ApiModelProperty("Hostname of the OceanBase server") + private String hostname; + + @ApiModelProperty("Port of the OceanBase server") + private Integer port; + + @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single node") + @Builder.Default + private Integer serverId = 0; + + @ApiModelProperty("Whether include schema, default is 'false'") + private String includeSchema; + + @ApiModelProperty(value = "List of DBs to be collected, seperated by ',', supporting regular expressions") + private String databaseWhiteList; + + @ApiModelProperty(value = "List of tables to be collected, seperated by ',',supporting regular expressions") + private String tableWhiteList; + + @ApiModelProperty("Database time zone, Default is UTC") + private String serverTimezone; + + @ApiModelProperty("The interval for recording an offset") + private String intervalMs; + + @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery") + private String snapshotMode; + + @ApiModelProperty("The file path to store offset info") + private String offsetFilename; + + @ApiModelProperty("The file path to store history info") + private String historyFilename; + + @ApiModelProperty("Whether to monitor the DDL, default is 'false'") + private String monitoredDdl; + + @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601") + @Builder.Default + private String timestampFormatStandard = "SQL"; + + @ApiModelProperty("Need transfer total database") + private boolean allMigration; + + @ApiModelProperty("Only incremental") + private boolean onlyIncremental; + + @ApiModelProperty("Primary key must be shared by all tables") + private String primaryKey; + + @ApiModelProperty("Directly read binlog from the specified offset filename") + private String specificOffsetFile; + + @ApiModelProperty("Directly read binlog from the specified offset position") + private Integer specificOffsetPos; + + public OceanBaseBinlogSource() { + this.setSourceType(SourceType.OCEANBASE); + } + + @Override + public SourceRequest genSourceRequest() { + return CommonBeanUtils.copyProperties(this, OceanBaseBinlogSourceRequest::new); + } +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java new file mode 100644 index 0000000000..7db34fd144 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.oceanbase; + +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; + +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; + +import javax.validation.constraints.NotNull; + +import java.util.Map; + +/** + * Binlog source info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class OceanBaseBinlogSourceDTO { + + @ApiModelProperty("Username of the OceanBase server") + private String username; + + @ApiModelProperty("Password of the OceanBase server") + private String password; + + @ApiModelProperty("Hostname of the OceanBase server") + private String hostname; + + @ApiModelProperty("Port of the OceanBase server") + private Integer port; + + @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single node") + @Builder.Default + private Integer serverId = 0; + + @ApiModelProperty("Whether include schema, default is 'false'") + private String includeSchema; + + @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, " + + "seperated by ',', for example: db1,test_db*", notes = "DBs not in this list are excluded. If not set, all DBs are monitored") + private String databaseWhiteList; + + @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions, " + + "seperated by ',', for example: tb1,user*", notes = "Tables not in this list are excluded. By default, all tables are monitored") + private String tableWhiteList; + + @ApiModelProperty("Database time zone, Default is UTC") + private String serverTimezone; + + @ApiModelProperty("The interval for recording an offset") + private String intervalMs; + + /** + * <code>initial</code>: Default mode, do a snapshot when no offset is found. + * <p/> + * <code>when_needed</code>: Similar to initial, do a snapshot when the binlog position + * has been purged on the DB server. + * <p/> + * <code>never</code>: Do not snapshot. + * <p/> + * <code>schema_only</code>: All tables' column name will be taken, but the table data will not be exported, + * and it will only be consumed from the end of the binlog at the task is started. + * So it is very suitable for not caring about historical data, but only about recent changes. the + * <p/> + * <code>schema_only_recovery</code>: When <code>schema_only</code> mode fails, use this mode to recover, which is + * generally not used. + */ + @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery") + private String snapshotMode; + + @ApiModelProperty("The file path to store offset info") + private String offsetFilename; + + @ApiModelProperty("The file path to store history info") + private String historyFilename; + + @ApiModelProperty("Whether to monitor the DDL, default is 'false'") + private String monitoredDdl; + + @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601") + @Builder.Default + private String timestampFormatStandard = "SQL"; + + @ApiModelProperty("Whether to migrate all databases") + private boolean allMigration; + + @ApiModelProperty("Only incremental") + private boolean onlyIncremental; + + @ApiModelProperty("Primary key must be shared by all tables") + private String primaryKey; + + @ApiModelProperty("Directly read binlog from the specified offset filename") + private String specificOffsetFile; + + @ApiModelProperty("Directly read binlog from the specified offset position") + private Integer specificOffsetPos; + + @ApiModelProperty("Properties for OceanBase") + private Map<String, Object> properties; + + /** + * Get the dto instance from the request + */ + public static OceanBaseBinlogSourceDTO getFromRequest(OceanBaseBinlogSourceRequest request, String extParams) { + OceanBaseBinlogSourceDTO dto = StringUtils.isNotBlank(extParams) + ? OceanBaseBinlogSourceDTO.getFromJson(extParams) + : new OceanBaseBinlogSourceDTO(); + return CommonBeanUtils.copyProperties(request, dto, true); + } + + public static OceanBaseBinlogSourceDTO getFromJson(@NotNull String extParams) { + try { + return JsonUtils.parseObject(extParams, OceanBaseBinlogSourceDTO.class); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("parse extParams of OceanBaseBinlogSource failure: %s", e.getMessage())); + } + } + +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java new file mode 100644 index 0000000000..3be8006e88 --- /dev/null +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.source.oceanbase; + +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.enums.DataFormat; +import org.apache.inlong.manager.common.util.JsonTypeDefine; +import org.apache.inlong.manager.pojo.source.SourceRequest; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; + +/** + * OceanBase binlog source request + */ +@Data +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "OceanBase binlog source request") +@JsonTypeDefine(value = SourceType.OCEANBASE) +public class OceanBaseBinlogSourceRequest extends SourceRequest { + + @ApiModelProperty("Username of the DB server") + private String username; + + @ApiModelProperty("Password of the DB server") + private String password; + + @ApiModelProperty("Hostname of the DB server") + private String hostname; + + @ApiModelProperty("Port of the DB server") + private Integer port = 3306; + + @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single node") + private Integer serverId = 0; + + @ApiModelProperty("Whether include schema, default is 'false'") + private String includeSchema; + + @ApiModelProperty(value = "List of DBs to be collected, supporting regular expressions, " + + "separate them with ',', for example: db1,test_db*", notes = "DBs not in this list are excluded. If not set, all DBs are monitored") + private String databaseWhiteList; + + @ApiModelProperty(value = "List of tables to be collected, supporting regular expressions, " + + "separate them with ',', for example: tb1,user*", notes = "Tables not in this list are excluded. By default, all tables are monitored") + private String tableWhiteList; + + @ApiModelProperty("Database time zone, Default is UTC") + private String serverTimezone; + + @ApiModelProperty("The interval for recording an offset") + private String intervalMs = "500"; + + /** + * <code>initial</code>: Default mode, do a snapshot when no offset is found. + * <p/> + * <code>when_needed</code>: Similar to initial, do a snapshot when the binlog position + * has been purged on the DB server. + * <p/> + * <code>never</code>: Do not snapshot. + * <p/> + * <code>schema_only</code>: All tables' column name will be taken, but the table data will not be exported, + * and it will only be consumed from the end of the binlog at the task is started. + * So it is very suitable for not caring about historical data, but only about recent changes. the + * <p/> + * <code>schema_only_recovery</code>: When <code>schema_only</code> mode fails, use this mode to recover, which is + * generally not used. + */ + @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, schema_only, schema_only_recovery") + private String snapshotMode = "initial"; + + @ApiModelProperty("The file path to store offset info") + private String offsetFilename; + + @ApiModelProperty("The file path to store history info") + private String historyFilename; + + @ApiModelProperty("Whether to monitor the DDL, default is 'false'") + private String monitoredDdl; + + @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601") + private String timestampFormatStandard = "SQL"; + + @ApiModelProperty("Need transfer total database") + private boolean allMigration = false; + + @ApiModelProperty("Only incremental") + private boolean onlyIncremental; + + @ApiModelProperty("Primary key must be shared by all tables") + private String primaryKey; + + @ApiModelProperty("Directly read binlog from the specified offset filename") + private String specificOffsetFile; + + @ApiModelProperty("Directly read binlog from the specified offset position") + private Integer specificOffsetPos; + + public OceanBaseBinlogSourceRequest() { + this.setSourceType(SourceType.OCEANBASE); + this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName()); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java new file mode 100644 index 0000000000..84917e624e --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.node.oceanbase; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.DataNodeEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.DataNodeRequest; +import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO; +import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeInfo; +import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeRequest; +import org.apache.inlong.manager.service.node.AbstractDataNodeOperator; +import org.apache.inlong.manager.service.resource.sink.oceanbase.OceanBaseJdbcUtils; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.sql.Connection; +import java.util.Objects; + +/** + * OceanBase data node operator + */ +@Service +public class OceanBaseDataNodeOperator extends AbstractDataNodeOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseDataNodeOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String dataNodeType) { + return getDataNodeType().equals(dataNodeType); + } + + @Override + public String getDataNodeType() { + return DataNodeType.OCEANBASE; + } + + @Override + public DataNodeInfo getFromEntity(DataNodeEntity entity) { + if (entity == null) { + throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND); + } + + OceanBaseDataNodeInfo dataNodeInfo = new OceanBaseDataNodeInfo(); + CommonBeanUtils.copyProperties(entity, dataNodeInfo); + if (StringUtils.isNotBlank(entity.getExtParams())) { + OceanBaseDataNodeDTO dto = OceanBaseDataNodeDTO.getFromJson(entity.getExtParams()); + CommonBeanUtils.copyProperties(dto, dataNodeInfo); + } + return dataNodeInfo; + } + + @Override + protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) { + OceanBaseDataNodeRequest dataNodeRequest = (OceanBaseDataNodeRequest) request; + CommonBeanUtils.copyProperties(dataNodeRequest, targetEntity, true); + try { + OceanBaseDataNodeDTO dto = + OceanBaseDataNodeDTO.getFromRequest(dataNodeRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("Failed to build extParams for OceanBase node: %s", e.getMessage())); + } + } + + @Override + public Boolean testConnection(DataNodeRequest request) { + String jdbcUrl = OceanBaseDataNodeDTO.convertToJdbcurl(request.getUrl()); + String username = request.getUsername(); + String password = request.getToken(); + Preconditions.expectNotBlank(jdbcUrl, ErrorCodeEnum.INVALID_PARAMETER, "connection jdbcUrl cannot be empty"); + try (Connection ignored = OceanBaseJdbcUtils.getConnection(jdbcUrl, username, password)) { + LOGGER.info("OceanBase connection not null - connection success for jdbcUrl={}, username={}, password={}", + jdbcUrl, username, password); + return true; + } catch (Exception e) { + String errMsg = + String.format("OceanBase connection failed for jdbcUrl=%s, username=%s, password=%s", jdbcUrl, + username, password); + LOGGER.error(errMsg, e); + throw new BusinessException(errMsg); + } + } + + @Override + public void updateRelatedStreamSource(DataNodeRequest request, DataNodeEntity dataNodeEntity, String operator) { + OceanBaseDataNodeRequest nodeRequest = (OceanBaseDataNodeRequest) request; + OceanBaseDataNodeInfo nodeInfo = (OceanBaseDataNodeInfo) this.getFromEntity(dataNodeEntity); + boolean changed = !Objects.equals(nodeRequest.getUrl(), nodeInfo.getUrl()) + || !Objects.equals(nodeRequest.getBackupUrl(), nodeInfo.getBackupUrl()) + || !Objects.equals(nodeRequest.getUsername(), nodeInfo.getUsername()) + || !Objects.equals(nodeRequest.getToken(), nodeInfo.getToken()); + if (changed) { + retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), SourceType.OCEANBASE, operator); + } + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java new file mode 100644 index 0000000000..35e67a5a57 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.oceanbase; + +import org.apache.inlong.manager.common.util.UrlVerificationUtils; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * Utils for OceanBase JDBC. + */ +public class OceanBaseJdbcUtils { + + private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://"; + private static final String OCEANBASE_DRIVER_CLASS = "com.oceanbase.jdbc.Driver"; + private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseJdbcUtils.class); + + /** + * Get OceanBase connection from the url and user. + * + * @param url jdbc url, such as jdbc:oceanbase://host:port/database + * @param user Username for JDBC URL + * @param password User password + * @return {@link Connection} + * @throws Exception on get connection error + */ + public static Connection getConnection(String url, String user, String password) throws Exception { + if (StringUtils.isNotBlank(url) && StringUtils.isNotBlank(url) && StringUtils.isNotBlank(password)) { + UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url, OCEANBASE_JDBC_PREFIX); + return establishDatabaseConnection(url, user, password); + } + return null; + } + + /** + * Establishes a database connection using the provided URL, username, and password. + * + * @param url The JDBC URL + * @param user The username + * @param password The user's password + * @return A {@link Connection} object representing the database connection + * @throws Exception If an error occurs while obtaining the connection + */ + private static Connection establishDatabaseConnection(String url, String user, String password) throws Exception { + Connection conn; + try { + Class.forName(OCEANBASE_DRIVER_CLASS); + conn = DriverManager.getConnection(url, user, password); + } catch (Exception e) { + String errorMsg = + "Failed to get OceanBase connection, please check OceanBase JDBC URL, username, or password!"; + LOGGER.error(errorMsg, e); + throw new Exception(errorMsg + " Other error message: " + e.getMessage()); + } + if (conn == null) { + throw new Exception("get OceanBase connection failed, please contact administrator"); + } + LOGGER.info("get OceanBase connection success for url={}", url); + return conn; + } + + /** + * Execute SQL command on OceanBase. + * + * @param conn JDBC {@link Connection} + * @param sql SQL to be executed + * @throws Exception on execute SQL error + */ + public static void executeSql(final Connection conn, final String sql) throws Exception { + try (Statement stmt = conn.createStatement()) { + stmt.execute(sql); + LOGGER.info("execute sql [{}] success", sql); + } + } + + /** + * Execute batch query SQL on OceanBase. + * + * @param conn JDBC {@link Connection} + * @param sqls SQL to be executed + * @throws Exception on get execute SQL batch error + */ + public static void executeSqlBatch(final Connection conn, final List<String> sqls) throws Exception { + conn.setAutoCommit(false); + try (Statement stmt = conn.createStatement()) { + for (String entry : sqls) { + stmt.execute(entry); + } + conn.commit(); + LOGGER.info("execute sql [{}] success", sqls); + } finally { + conn.setAutoCommit(true); + } + } + + /** + * Create OceanBase database + * + * @param conn JDBC {@link Connection} + * @param dbName database name + * @throws Exception on create database error + */ + public static void createDb(final Connection conn, final String dbName) throws Exception { + if (!checkDbExist(conn, dbName)) { + final String createDbSql = OceanBaseSqlBuilder.buildCreateDbSql(dbName); + executeSql(conn, createDbSql); + LOGGER.info("execute sql [{}] success", createDbSql); + } else { + LOGGER.info("The database [{}] are exists", dbName); + } + } + + /** + * Check database from the OceanBase information_schema. + * + * @param conn JDBC {@link Connection} + * @param dbName database name + * @return true if table exist, otherwise false + * @throws Exception on check database exist error + */ + public static boolean checkDbExist(final Connection conn, final String dbName) throws Exception { + final String checkDbSql = OceanBaseSqlBuilder.getCheckDatabase(dbName); + try (Statement stmt = conn.createStatement(); + ResultSet resultSet = stmt.executeQuery(checkDbSql)) { + if (Objects.nonNull(resultSet)) { + if (resultSet.next()) { + LOGGER.info("check db exist for db={}, result=true", dbName); + return true; + } + } + } + LOGGER.info("check db exist for db={}, result=false", dbName); + return false; + } + + /** + * Create OceanBase table by OceanBaseTableInfo + * + * @param conn JDBC {@link Connection} + * @param tableInfo table info {@link OceanBaseTableInfo} + * @throws Exception on create table error + */ + public static void createTable(final Connection conn, final OceanBaseTableInfo tableInfo) throws Exception { + if (checkTablesExist(conn, tableInfo.getDbName(), tableInfo.getTableName())) { + LOGGER.info("The table [{}] are exists", tableInfo.getTableName()); + } else { + final String createTableSql = OceanBaseSqlBuilder.buildCreateTableSql(tableInfo); + executeSql(conn, createTableSql); + LOGGER.info("execute sql [{}] success", createTableSql); + } + } + + /** + * Check tables from the OceanBase information_schema. + * + * @param conn JDBC {@link Connection} + * @param dbName database name + * @param tableName table name + * @return true if table exist, otherwise false + * @throws Exception on check table exist error + */ + public static boolean checkTablesExist(final Connection conn, final String dbName, final String tableName) + throws Exception { + boolean result = false; + final String checkTableSql = OceanBaseSqlBuilder.getCheckTable(dbName, tableName); + try (Statement stmt = conn.createStatement(); + ResultSet resultSet = stmt.executeQuery(checkTableSql)) { + if (Objects.nonNull(resultSet)) { + if (resultSet.next()) { + result = true; + } + } + } + LOGGER.info("check table exist for db={} table={}, result={}", dbName, tableName, result); + return result; + } + + /** + * Check whether the column exists in the OceanBase table. + * + * @param conn JDBC Connection {@link Connection} + * @param dbName database name + * @param tableName table name + * @param column table column name + * @return true if column exist in the table, otherwise false + * @throws Exception on check column exist error + */ + public static boolean checkColumnExist(final Connection conn, final String dbName, final String tableName, + final String column) throws Exception { + boolean result = false; + final String checkTableSql = OceanBaseSqlBuilder.getCheckColumn(dbName, tableName, column); + try (Statement stmt = conn.createStatement(); + ResultSet resultSet = stmt.executeQuery(checkTableSql)) { + if (Objects.nonNull(resultSet)) { + if (resultSet.next()) { + result = true; + } + } + } + LOGGER.info("check column exist for db={} table={}, result={} column={}", dbName, tableName, result, column); + return result; + } + + /** + * Query all OceanBase table columns by the given tableName. + * + * @param conn JDBC {@link Connection} + * @param dbName database name + * @param tableName table name + * @return {@link List} + * @throws Exception on get columns error + */ + public static List<OceanBaseColumnInfo> getColumns(final Connection conn, final String dbName, + final String tableName) + throws Exception { + final String querySql = OceanBaseSqlBuilder.buildDescTableSql(dbName, tableName); + final List<OceanBaseColumnInfo> columnList = new ArrayList<>(); + + try (Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(querySql)) { + if (Objects.nonNull(rs)) { + while (rs.next()) { + OceanBaseColumnInfo columnInfo = new OceanBaseColumnInfo(rs.getString(1), + rs.getString(2), rs.getString(3)); + columnList.add(columnInfo); + } + } + } + return columnList; + } + + /** + * Add columns for OceanBase table. + * + * @param conn JDBC Connection {@link Connection} + * @param dbName database name + * @param tableName table name + * @param columns columns to be added + * @throws Exception on add columns error + */ + public static void addColumns(final Connection conn, final String dbName, final String tableName, + final List<OceanBaseColumnInfo> columns) throws Exception { + final List<OceanBaseColumnInfo> columnInfos = Lists.newArrayList(); + + for (OceanBaseColumnInfo columnInfo : columns) { + if (!checkColumnExist(conn, dbName, tableName, columnInfo.getName())) { + columnInfos.add(columnInfo); + } + } + final List<String> addColumnSql = OceanBaseSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos); + executeSqlBatch(conn, addColumnSql); + } + +} \ No newline at end of file diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java new file mode 100644 index 0000000000..1da92bf3a8 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.oceanbase; + +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.enums.SinkStatus; +import org.apache.inlong.manager.common.exceptions.WorkflowException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity; +import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO; +import org.apache.inlong.manager.pojo.sink.SinkInfo; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo; +import org.apache.inlong.manager.service.node.DataNodeOperateHelper; +import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator; +import org.apache.inlong.manager.service.sink.StreamSinkService; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.sql.Connection; +import java.util.ArrayList; +import java.util.List; + +/** + * OceanBase's resource operator. + */ +@Service +public class OceanBaseResourceOperator implements SinkResourceOperator { + + private static final Logger LOG = LoggerFactory.getLogger(OceanBaseResourceOperator.class); + + @Autowired + private StreamSinkService sinkService; + + @Autowired + private StreamSinkFieldEntityMapper fieldEntityMapper; + + @Autowired + private DataNodeOperateHelper dataNodeHelper; + + @Override + public Boolean accept(String sinkType) { + return SinkType.OCEANBASE.equals(sinkType); + } + + @Override + public void createSinkResource(SinkInfo sinkInfo) { + LOG.info("begin to create OceanBase resources sinkId={}", sinkInfo.getId()); + if (SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) { + LOG.warn("OceanBase resource [" + sinkInfo.getId() + "] already success, skip to create"); + return; + } else if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource())) { + LOG.warn("create resource was disabled, skip to create for [" + sinkInfo.getId() + "]"); + return; + } + this.createTable(sinkInfo); + } + + /** + * Create OceanBase table by SinkInfo. + * + * @param sinkInfo {@link SinkInfo} + */ + private void createTable(SinkInfo sinkInfo) { + LOG.info("begin to create OceanBase table for sinkId={}", sinkInfo.getId()); + List<StreamSinkFieldEntity> fieldList = fieldEntityMapper.selectBySinkId(sinkInfo.getId()); + if (CollectionUtils.isEmpty(fieldList)) { + LOG.warn("no OceanBase fields found, skip to create table for sinkId={}", sinkInfo.getId()); + } + // set columns + List<OceanBaseColumnInfo> columnList = new ArrayList<>(); + for (StreamSinkFieldEntity field : fieldList) { + OceanBaseColumnInfo columnInfo = new OceanBaseColumnInfo(field.getFieldName(), field.getFieldType(), + field.getFieldComment()); + columnList.add(columnInfo); + } + + OceanBaseSinkDTO sinkDTO = this.getOceanBaseInfo(sinkInfo); + OceanBaseTableInfo tableInfo = OceanBaseSinkDTO.getTableInfo(sinkDTO, columnList); + try (Connection conn = OceanBaseJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(), + sinkDTO.getPassword())) { + // 1. create database if not exists + OceanBaseJdbcUtils.createDb(conn, tableInfo.getDbName()); + // 2. table not exists, create it + OceanBaseJdbcUtils.createTable(conn, tableInfo); + // 3. table exists, add columns - skip the exists columns + OceanBaseJdbcUtils.addColumns(conn, tableInfo.getDbName(), tableInfo.getTableName(), columnList); + + // 4. update the sink status to success + String info = "success to create OceanBase resource"; + sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_SUCCESSFUL.getCode(), info); + LOG.info(info + " for sinkInfo={}", sinkInfo); + } catch (Throwable e) { + String errMsg = "create OceanBase table failed: " + e.getMessage(); + LOG.error(errMsg, e); + sinkService.updateStatus(sinkInfo.getId(), SinkStatus.CONFIG_FAILED.getCode(), errMsg); + throw new WorkflowException(errMsg); + } + LOG.info("success create OceanBase table for data sink [" + sinkInfo.getId() + "]"); + } + + private OceanBaseSinkDTO getOceanBaseInfo(SinkInfo sinkInfo) { + OceanBaseSinkDTO OceanBaseInfo = OceanBaseSinkDTO.getFromJson(sinkInfo.getExtParams()); + + if (StringUtils.isBlank(OceanBaseInfo.getJdbcUrl())) { + String dataNodeName = sinkInfo.getDataNodeName(); + Preconditions.expectNotBlank(dataNodeName, ErrorCodeEnum.INVALID_PARAMETER, + "OceanBase jdbc url not specified and data node is empty"); + DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType()); + CommonBeanUtils.copyProperties(dataNodeInfo, OceanBaseInfo); + OceanBaseInfo.setJdbcUrl(OceanBaseDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl())); + OceanBaseInfo.setPassword(dataNodeInfo.getToken()); + } + return OceanBaseInfo; + } + +} \ No newline at end of file diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java new file mode 100644 index 0000000000..5cbef1557f --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.resource.sink.oceanbase; + +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * Builder the SQL string for OceanBase + */ +public class OceanBaseSqlBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseSqlBuilder.class); + + /** + * Build SQL to check whether the database exists. + * + * @param dbName OceanBase database name + * @return the check database SQL string + */ + public static String getCheckDatabase(String dbName) { + final StringBuilder sqlBuilder = new StringBuilder() + .append("SELECT schema_name ") + .append(" FROM information_schema.schemata ") + .append("WHERE schema_name = '") + .append(dbName) + .append("';"); + LOGGER.info("check database sql: {}", sqlBuilder); + return sqlBuilder.toString(); + } + + /** + * Build SQL to check whether the table exists. + * + * @param dbName OceanBase database name + * @param tableName OceanBase table name + * @return the check table SQL string + */ + public static String getCheckTable(String dbName, String tableName) { + final StringBuilder sqlBuilder = new StringBuilder() + .append("select table_schema,table_name ") + .append(" from information_schema.tables where table_schema = '") + .append(dbName) + .append("' and table_name = '") + .append(tableName) + .append("' ;"); + LOGGER.info("check table sql: {}", sqlBuilder); + return sqlBuilder.toString(); + } + + /** + * Build SQL to check whether the column exists. + * + * @param dbName OceanBase database name + * @param tableName OceanBase table name + * @param columnName OceanBase column name + * @return the check column SQL string + */ + public static String getCheckColumn(String dbName, String tableName, String columnName) { + final StringBuilder sqlBuilder = new StringBuilder() + .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ") + .append(" from information_schema.COLUMNS where table_schema='") + .append(dbName) + .append("' and table_name = '") + .append(tableName) + .append("' and column_name = '") + .append(columnName) + .append("';"); + LOGGER.info("check table sql: {}", sqlBuilder); + return sqlBuilder.toString(); + } + + /** + * Build create database SQL. + * + * @param dbName OceanBase database name + * @return the create database SQL string + */ + public static String buildCreateDbSql(String dbName) { + final String sql = "CREATE DATABASE " + dbName; + LOGGER.info("create db sql: {}", sql); + return sql; + } + + /** + * Build create table SQL by OceanBaseTableInfo. + * + * @param table OceanBase table info {@link OceanBaseTableInfo} + * @return the create table SQL String + */ + public static String buildCreateTableSql(OceanBaseTableInfo table) { + final StringBuilder sql = new StringBuilder() + .append("CREATE TABLE ").append(table.getDbName()) + .append(".") + .append(table.getTableName()) + .append(buildCreateColumnsSql(table)); + + if (StringUtils.isEmpty(table.getEngine())) { + sql.append(" ENGINE=InnoDB "); + } else { + sql.append(" ENGINE=") + .append(table.getEngine()) + .append(" "); + } + + if (!StringUtils.isEmpty(table.getCharset())) { + sql.append(" DEFAULT CHARSET=") + .append(table.getCharset()) + .append(" "); + } + + LOGGER.info("create table sql: {}", sql); + return sql.toString(); + } + + /** + * Build add columns SQL. + * + * @param tableName OceanBase table name + * @param columnList OceanBase column list {@link List} + * @return add column SQL string list + */ + public static List<String> buildAddColumnsSql(String dbName, String tableName, + List<OceanBaseColumnInfo> columnList) { + final List<String> columnInfoList = getColumnsInfo(columnList); + final List<String> resultList = new ArrayList<>(); + final StringBuilder sqlBuilder = new StringBuilder(); + columnInfoList.forEach(columnInfo -> { + sqlBuilder.append("ALTER TABLE ") + .append(dbName) + .append(".") + .append(tableName) + .append(" ADD COLUMN ") + .append(columnInfo) + .append(";"); + resultList.add(sqlBuilder.toString()); + sqlBuilder.delete(0, sqlBuilder.length()); + }); + return resultList; + } + + /** + * Build create column SQL. + * + * @param table OceanBase table info {@link OceanBaseTableInfo} + * @return create column SQL string + */ + private static String buildCreateColumnsSql(OceanBaseTableInfo table) { + final List<String> columnList = getColumnsInfo(table.getColumns()); + final StringBuilder sql = new StringBuilder() + .append(" (") + .append(StringUtils.join(columnList, ",")); + if (!StringUtils.isEmpty(table.getPrimaryKey())) { + sql.append(", PRIMARY KEY (") + .append(table.getPrimaryKey()) + .append(")"); + } + sql.append(") "); + LOGGER.info("create columns sql={}", sql); + return sql.toString(); + } + + /** + * Build column info by OceanBaseColumnInfo list. + * + * @param columns OceanBase column info {@link OceanBaseColumnInfo} list + * @return the SQL list + */ + private static List<String> getColumnsInfo(List<OceanBaseColumnInfo> columns) { + final List<String> columnList = new ArrayList<>(); + final StringBuilder columnBuilder = new StringBuilder(); + columns.forEach(columnInfo -> { + columnBuilder.append("`") + .append(columnInfo.getName()) + .append("`") + .append(" ") + .append(columnInfo.getType()); + if (!StringUtils.isEmpty(columnInfo.getComment())) { + columnBuilder.append(" COMMENT '") + .append(columnInfo.getComment()) + .append("'"); + } + columnBuilder.append(" "); + columnList.add(columnBuilder.toString()); + columnBuilder.delete(0, columnBuilder.length()); + }); + return columnList; + } + + /** + * Build query table SQL. + * + * @param dbName OceanBase database name + * @param tableName OceanBase table name + * @return desc table SQL string + */ + public static String buildDescTableSql(String dbName, String tableName) { + final StringBuilder sql = new StringBuilder() + .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ") + .append(" from information_schema.COLUMNS where table_schema='") + .append(dbName) + .append("' and table_name = '") + .append(tableName) + .append("';"); + LOGGER.info("desc table sql={}", sql); + return sql.toString(); + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java new file mode 100644 index 0000000000..debe791d82 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.sink.oceanbase; + +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.dao.entity.StreamSinkEntity; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; +import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO; +import org.apache.inlong.manager.pojo.sink.SinkField; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSink; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO; +import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkRequest; +import org.apache.inlong.manager.service.sink.AbstractSinkOperator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * OceanBase sink operator + */ +@Service +public class OceanBaseSinkOperator extends AbstractSinkOperator { + + private static final Logger LOGGER = LoggerFactory.getLogger(OceanBaseSinkOperator.class); + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String sinkType) { + return SinkType.OCEANBASE.equals(sinkType); + } + + @Override + protected String getSinkType() { + return SinkType.OCEANBASE; + } + + @Override + protected void setTargetEntity(SinkRequest request, StreamSinkEntity targetEntity) { + if (!this.getSinkType().equals(request.getSinkType())) { + throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT, + ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + getSinkType()); + } + OceanBaseSinkRequest sinkRequest = (OceanBaseSinkRequest) request; + try { + OceanBaseSinkDTO dto = OceanBaseSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED, + String.format("serialize extParams of OceanBase SinkDTO failure: %s", e.getMessage())); + } + } + + @Override + public StreamSink getFromEntity(StreamSinkEntity entity) { + OceanBaseSink sink = new OceanBaseSink(); + if (entity == null) { + return sink; + } + + OceanBaseSinkDTO dto = OceanBaseSinkDTO.getFromJson(entity.getExtParams()); + if (StringUtils.isBlank(dto.getJdbcUrl())) { + if (StringUtils.isBlank(entity.getDataNodeName())) { + throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT, + "OceanBase jdbc url not specified and data node is blank"); + } + DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo( + entity.getDataNodeName(), entity.getSinkType()); + CommonBeanUtils.copyProperties(dataNodeInfo, dto, true); + dto.setJdbcUrl(OceanBaseDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl())); + dto.setPassword(dataNodeInfo.getToken()); + } + CommonBeanUtils.copyProperties(entity, sink, true); + CommonBeanUtils.copyProperties(dto, sink, true); + List<SinkField> sinkFields = super.getSinkFields(entity.getId()); + sink.setSinkFieldList(sinkFields); + return sink; + } + +} diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java new file mode 100644 index 0000000000..ae55c41262 --- /dev/null +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.source.oceanbase; + +import org.apache.inlong.manager.common.consts.DataNodeType; +import org.apache.inlong.manager.common.consts.InlongConstants; +import org.apache.inlong.manager.common.consts.SourceType; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.dao.entity.StreamSourceEntity; +import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeInfo; +import org.apache.inlong.manager.pojo.source.SourceRequest; +import org.apache.inlong.manager.pojo.source.StreamSource; +import org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSource; +import org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSourceDTO; +import org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSourceRequest; +import org.apache.inlong.manager.pojo.stream.StreamField; +import org.apache.inlong.manager.service.source.AbstractSourceOperator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +import java.util.Objects; + +/** + * Binlog source operator + */ +@Service +public class OceanBaseSourceOperator extends AbstractSourceOperator { + + @Autowired + private ObjectMapper objectMapper; + + @Override + public Boolean accept(String sourceType) { + return SourceType.OCEANBASE.equals(sourceType); + } + + @Override + protected String getSourceType() { + return SourceType.OCEANBASE; + } + + @Override + public String getExtParams(StreamSourceEntity sourceEntity) { + OceanBaseBinlogSourceDTO OceanBaseBinlogSourceDTO = JsonUtils.parseObject(sourceEntity.getExtParams(), + OceanBaseBinlogSourceDTO.class); + if (Objects.nonNull(OceanBaseBinlogSourceDTO) && StringUtils.isBlank(OceanBaseBinlogSourceDTO.getHostname())) { + OceanBaseDataNodeInfo dataNodeInfo = (OceanBaseDataNodeInfo) dataNodeService.get( + sourceEntity.getDataNodeName(), DataNodeType.OCEANBASE); + CommonBeanUtils.copyProperties(dataNodeInfo, OceanBaseBinlogSourceDTO, true); + OceanBaseBinlogSourceDTO.setPassword(dataNodeInfo.getToken()); + OceanBaseBinlogSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]); + OceanBaseBinlogSourceDTO.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1])); + return JsonUtils.toJsonString(OceanBaseBinlogSourceDTO); + } + return sourceEntity.getExtParams(); + } + + @Override + protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) { + OceanBaseBinlogSourceRequest sourceRequest = (OceanBaseBinlogSourceRequest) request; + CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true); + try { + OceanBaseBinlogSourceDTO dto = + OceanBaseBinlogSourceDTO.getFromRequest(sourceRequest, targetEntity.getExtParams()); + targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); + } catch (Exception e) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + String.format("serialize extParams of OceanBaseBinlog SourceDTO failure: %s", e.getMessage())); + } + } + + @Override + public StreamSource getFromEntity(StreamSourceEntity entity) { + OceanBaseBinlogSource source = new OceanBaseBinlogSource(); + if (entity == null) { + return source; + } + + OceanBaseBinlogSourceDTO dto = OceanBaseBinlogSourceDTO.getFromJson(entity.getExtParams()); + if (StringUtils.isBlank(dto.getHostname())) { + if (StringUtils.isBlank(entity.getDataNodeName())) { + throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT, + "OceanBase url and data node is blank"); + } + OceanBaseDataNodeInfo dataNodeInfo = (OceanBaseDataNodeInfo) dataNodeService.get( + entity.getDataNodeName(), DataNodeType.OCEANBASE); + CommonBeanUtils.copyProperties(dataNodeInfo, dto, true); + dto.setPassword(dataNodeInfo.getToken()); + dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]); + dto.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1])); + } + CommonBeanUtils.copyProperties(entity, source, true); + CommonBeanUtils.copyProperties(dto, source, true); + + List<StreamField> sourceFields = super.getSourceFields(entity.getId()); + source.setFieldList(sourceFields); + return source; + } + +}