This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new bd4c3dbecf [INLONG-10703][Manager] Manager Add Oceanbase Support 
(#10701)
bd4c3dbecf is described below

commit bd4c3dbecfa75601ad53d68fb767b6afd639d3c5
Author: xxsc0529 <93303124+xxsc0...@users.noreply.github.com>
AuthorDate: Wed Jul 31 20:02:03 2024 +0800

    [INLONG-10703][Manager] Manager Add Oceanbase Support (#10701)
---
 .../inlong/manager/common/consts/DataNodeType.java |   1 +
 .../inlong/manager/common/consts/SinkType.java     |   3 +
 .../inlong/manager/common/consts/SourceType.java   |   1 +
 .../strategy/OceanBaseFieldTypeStrategy.java       |  40 +++
 .../resources/oceanbase-field-type-mapping.yaml    | 228 +++++++++++++++++
 .../pojo/node/oceanbase/OceanBaseDataNodeDTO.java  |  85 +++++++
 .../pojo/node/oceanbase/OceanBaseDataNodeInfo.java |  56 ++++
 .../node/oceanbase/OceanBaseDataNodeRequest.java   |  46 ++++
 .../pojo/sink/oceanbase/OceanBaseColumnInfo.java   |  37 +++
 .../manager/pojo/sink/oceanbase/OceanBaseSink.java |  72 ++++++
 .../pojo/sink/oceanbase/OceanBaseSinkDTO.java      | 224 ++++++++++++++++
 .../pojo/sink/oceanbase/OceanBaseSinkRequest.java  |  60 +++++
 .../pojo/sink/oceanbase/OceanBaseTableInfo.java    |  43 ++++
 .../pojo/sort/node/provider/OceanBaseProvider.java |  79 ++++++
 .../source/oceanbase/OceanBaseBinlogSource.java    | 117 +++++++++
 .../source/oceanbase/OceanBaseBinlogSourceDTO.java | 146 +++++++++++
 .../oceanbase/OceanBaseBinlogSourceRequest.java    | 123 +++++++++
 .../node/oceanbase/OceanBaseDataNodeOperator.java  | 127 ++++++++++
 .../sink/oceanbase/OceanBaseJdbcUtils.java         | 282 +++++++++++++++++++++
 .../sink/oceanbase/OceanBaseResourceOperator.java  | 143 +++++++++++
 .../sink/oceanbase/OceanBaseSqlBuilder.java        | 231 +++++++++++++++++
 .../sink/oceanbase/OceanBaseSinkOperator.java      | 107 ++++++++
 .../source/oceanbase/OceanBaseSourceOperator.java  | 122 +++++++++
 23 files changed, 2373 insertions(+)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index fda7ee7262..0f1952c938 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -37,6 +37,7 @@ public class DataNodeType {
     public static final String SQLSERVER = "SQLSERVER";
     public static final String MONGODB = "MONGODB";
     public static final String DORIS = "DORIS";
+    public static final String OCEANBASE = "OCEANBASE";
 
     /**
      * Tencent cloud log service
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
index 554651ac8c..5d069e33df 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SinkType.java
@@ -71,6 +71,9 @@ public class SinkType extends StreamType {
     @SupportSortType(sortType = SortType.SORT_FLINK)
     public static final String TUBEMQ = "TUBEMQ";
 
+    @SupportSortType(sortType = SortType.SORT_FLINK)
+    public static final String OCEANBASE = "OCEANBASE";
+
     /**
      * Tencent cloud log service
      * Details: <a href="https://www.tencentcloud.com/products/cls";>CLS</a>
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index f40593e421..cb6f1a7f6b 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -36,6 +36,7 @@ public class SourceType extends StreamType {
     public static final String MONGODB = "MONGODB";
     public static final String REDIS = "REDIS";
     public static final String MQTT = "MQTT";
+    public static final String OCEANBASE = "OCEANBASE";
 
     public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new 
HashMap<String, TaskTypeEnum>() {
 
diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java
new file mode 100644
index 0000000000..8482899075
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/fieldtype/strategy/OceanBaseFieldTypeStrategy.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.fieldtype.strategy;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.fieldtype.FieldTypeMappingReader;
+
+import org.springframework.stereotype.Service;
+
+/**
+ * The oceanbase field type mapping strategy
+ */
+@Service
+public class OceanBaseFieldTypeStrategy extends DefaultFieldTypeStrategy {
+
+    public OceanBaseFieldTypeStrategy() {
+        this.reader = new FieldTypeMappingReader(DataNodeType.OCEANBASE);
+    }
+
+    @Override
+    public Boolean accept(String type) {
+        return DataNodeType.OCEANBASE.equals(type);
+    }
+
+}
diff --git 
a/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml
 
b/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml
new file mode 100644
index 0000000000..0d2a0599cf
--- /dev/null
+++ 
b/inlong-manager/manager-common/src/main/resources/oceanbase-field-type-mapping.yaml
@@ -0,0 +1,228 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source.type.to.target.type.converter:
+
+  - source.type: TINYINT
+    target.type: TINYINT
+
+  - source.type: SMALLINT
+    target.type: SMALLINT
+
+  - source.type: TINYINT UNSIGNED
+    target.type: SMALLINT
+
+  - source.type: TINYINT UNSIGNED ZEROFILL
+    target.type: SMALLINT
+
+  - source.type: INT
+    target.type: INT
+
+  - source.type: INTEGER
+    target.type: INT
+
+  - source.type: YEAR
+    target.type: INT
+
+  - source.type: SHORT
+    target.type: SHORT
+
+  - source.type: MEDIUMINT
+    target.type: INT
+
+  - source.type: SMALLINT UNSIGNED
+    target.type: INT
+
+  - source.type: SMALLINT UNSIGNED ZEROFILL
+    target.type: INT
+
+  - source.type: BIGINT
+    target.type: LONG
+
+  - source.type: INT UNSIGNED
+    target.type: LONG
+
+  - source.type: MEDIUMINT UNSIGNED
+    target.type: LONG
+
+  - source.type: MEDIUMINT UNSIGNED ZEROFILL
+    target.type: LONG
+
+  - source.type: INT UNSIGNED ZEROFILL
+    target.type: LONG
+
+  - source.type: BIGINT UNSIGNED
+    target.type: DECIMAL
+
+  - source.type: BIGINT UNSIGNED ZEROFILL
+    target.type: DECIMAL
+
+  - source.type: SERIAL
+    target.type: DECIMAL
+
+  - source.type: FLOAT
+    target.type: FLOAT
+
+  - source.type: FLOAT UNSIGNED
+    target.type: FLOAT
+
+  - source.type: FLOAT UNSIGNED ZEROFILL
+    target.type: FLOAT
+
+  - source.type: DOUBLE
+    target.type: DOUBLE
+
+  - source.type: DOUBLE UNSIGNED
+    target.type: DOUBLE
+
+  - source.type: DOUBLE UNSIGNED ZEROFILL
+    target.type: DOUBLE
+
+  - source.type: DOUBLE PRECISION
+    target.type: DOUBLE
+
+  - source.type: DOUBLE PRECISION UNSIGNED
+    target.type: DOUBLE
+
+  - source.type: ZEROFILL
+    target.type: DOUBLE
+
+  - source.type: REAL
+    target.type: DOUBLE
+
+  - source.type: REAL UNSIGNED
+    target.type: DOUBLE
+
+  - source.type: REAL UNSIGNED ZEROFILL
+    target.type: DOUBLE
+
+  - source.type: NUMERIC
+    target.type: DECIMAL
+
+  - source.type: NUMERIC UNSIGNED
+    target.type: DECIMAL
+
+  - source.type: NUMERIC UNSIGNED ZEROFILL
+    target.type: DECIMAL
+
+  - source.type: DECIMAL
+    target.type: DECIMAL
+
+  - source.type: DECIMAL UNSIGNED
+    target.type: DECIMAL
+
+  - source.type: DECIMAL UNSIGNED ZEROFILL
+    target.type: DECIMAL
+
+  - source.type: FIXED
+    target.type: DECIMAL
+
+  - source.type: FIXED UNSIGNED
+    target.type: DECIMAL
+
+  - source.type: FIXED UNSIGNED ZEROFILL
+    target.type: DECIMAL
+
+  - source.type: BOOLEAN
+    target.type: BOOLEAN
+
+  - source.type: DATE
+    target.type: DATE
+
+  - source.type: TIME
+    target.type: TIME
+
+  - source.type: DATETIME
+    target.type: TIMESTAMP
+
+  - source.type: TIMESTAMP
+    target.type: TIMESTAMP
+
+  - source.type: CHAR
+    target.type: STRING
+
+  - source.type: JSON
+    target.type: STRING
+
+  - source.type: BIT
+    target.type: STRING
+
+  - source.type: VARCHAR
+    target.type: STRING
+
+  - source.type: TEXT
+    target.type: STRING
+
+  - source.type: BLOB
+    target.type: STRING
+
+  - source.type: TINYBLOB
+    target.type: STRING
+
+  - source.type: TINYTEXT
+    target.type: STRING
+
+  - source.type: MEDIUMBLOB
+    target.type: STRING
+
+  - source.type: MEDIUMTEXT
+    target.type: STRING
+
+  - source.type: LONGBLOB
+    target.type: STRING
+
+  - source.type: LONGTEXT
+    target.type: STRING
+
+  - source.type: VARBINARY
+    target.type: STRING
+
+  - source.type: GEOMETRY
+    target.type: STRING
+
+  - source.type: POINT
+    target.type: STRING
+
+  - source.type: LINESTRING
+    target.type: STRING
+
+  - source.type: POLYGON
+    target.type: STRING
+
+  - source.type: MULTIPOINT
+    target.type: STRING
+
+  - source.type: MULTILINESTRING
+    target.type: STRING
+
+  - source.type: MULTIPOLYGON
+    target.type: STRING
+
+  - source.type: GEOMETRYCOLLECTION
+    target.type: STRING
+
+  - source.type: ENUM
+    target.type: STRING
+
+  - source.type: STRING
+    target.type: STRING
+
+  - source.type: BINARY
+    target.type: BINARY
+
+  - source.type: BYTE
+    target.type: BYTE
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java
new file mode 100644
index 0000000000..2b2f9ff214
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeDTO.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.oceanbase;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * OceanBase data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("OceanBase data node info")
+public class OceanBaseDataNodeDTO {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OceanBaseDataNodeDTO.class);
+    private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://";
+
+    @ApiModelProperty("URL of backup DB server")
+    private String backupUrl;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static OceanBaseDataNodeDTO getFromRequest(OceanBaseDataNodeRequest 
request, String extParams) {
+        OceanBaseDataNodeDTO dto = StringUtils.isNotBlank(extParams)
+                ? OceanBaseDataNodeDTO.getFromJson(extParams)
+                : new OceanBaseDataNodeDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static OceanBaseDataNodeDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, 
OceanBaseDataNodeDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
+                    String.format("Failed to parse extParams for OceanBase 
node: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Convert ip:post to jdbcurl.
+     */
+    public static String convertToJdbcurl(String url) {
+        String jdbcUrl = url;
+        if (StringUtils.isNotBlank(jdbcUrl) && 
!jdbcUrl.startsWith(OCEANBASE_JDBC_PREFIX)) {
+            jdbcUrl = OCEANBASE_JDBC_PREFIX + jdbcUrl;
+        }
+        return jdbcUrl;
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java
new file mode 100644
index 0000000000..2751ab4a5a
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.oceanbase;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * OceanBase data node info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.OCEANBASE)
+@ApiModel("Hive data node info")
+public class OceanBaseDataNodeInfo extends DataNodeInfo {
+
+    @ApiModelProperty("URL of backup DB servere")
+    private String backupUrl;
+
+    public OceanBaseDataNodeInfo() {
+        this.setType(DataNodeType.OCEANBASE);
+    }
+
+    @Override
+    public OceanBaseDataNodeRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, 
OceanBaseDataNodeRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java
new file mode 100644
index 0000000000..9eacb13483
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/oceanbase/OceanBaseDataNodeRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.oceanbase;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import javax.validation.constraints.Pattern;
+
+/**
+ * OceanBase data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.OCEANBASE)
+@ApiModel("OceanBase data node request")
+public class OceanBaseDataNodeRequest extends DataNodeRequest {
+
+    @ApiModelProperty("URL of backup DB server")
+    @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url")
+    private String backupUrl;
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java
new file mode 100644
index 0000000000..c5e848737d
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseColumnInfo.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * OceanBase column info.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class OceanBaseColumnInfo {
+
+    private String name;
+
+    private String type;
+
+    private String comment;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java
new file mode 100644
index 0000000000..f78d1c84ba
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSink.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * OceanBase sink info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "OceanBase sink info")
+@JsonTypeDefine(value = SinkType.OCEANBASE)
+public class OceanBaseSink extends StreamSink {
+
+    @ApiModelProperty("OceanBase JDBC URL, such as jdbc:mysql://host:port")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    public OceanBaseSink() {
+        this.setSinkType(SinkType.MYSQL);
+    }
+
+    @Override
+    public SinkRequest genSinkRequest() {
+        return CommonBeanUtils.copyProperties(this, OceanBaseSinkRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java
new file mode 100644
index 0000000000..b358cc9f4e
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkDTO.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.sink.BaseStreamSink;
+
+import com.google.common.base.Strings;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * OceanBase sink info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class OceanBaseSinkDTO extends BaseStreamSink {
+
+    // The protocol of using mysql in sink
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OceanBaseSinkDTO.class);
+    private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://";
+    private static final String OCEANBASE_JDBC_PREFIX_CDC = "jdbc:mysql://";
+
+    @ApiModelProperty("OceanBase JDBC URL, such as jdbc:oceanbase://host:port")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+
+    @ApiModelProperty("Properties for OceanBase")
+    private Map<String, Object> properties;
+
+    /**
+     * Get the dto instance from the request
+     *
+     * @param request OceanBaseSinkRequest
+     * @return {@link OceanBaseSinkDTO}
+     * @apiNote The config here will be saved to the database, so filter 
sensitive params before saving.
+     */
+    public static OceanBaseSinkDTO getFromRequest(OceanBaseSinkRequest 
request, String extParams) {
+        OceanBaseSinkDTO dto =
+                StringUtils.isNotBlank(extParams) ? 
OceanBaseSinkDTO.getFromJson(extParams) : new OceanBaseSinkDTO();
+        CommonBeanUtils.copyProperties(request, dto, true);
+        return dto;
+    }
+
+    /**
+     * Get OceanBase sink info from JSON string
+     *
+     * @param extParams string ext params
+     * @return {@link OceanBaseSinkDTO}
+     */
+    public static OceanBaseSinkDTO getFromJson(@NotNull String extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, OceanBaseSinkDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    String.format("parse extParams of OceanBase SinkDTO 
failure: %s", e.getMessage()));
+        }
+    }
+
+    /**
+     * Get OceanBase table info
+     *
+     * @param OceanBaseSink OceanBase sink dto,{@link OceanBaseSinkDTO}
+     * @param columnList OceanBase column info list,{@link OceanBaseColumnInfo}
+     * @return {@link OceanBaseTableInfo}
+     */
+    public static OceanBaseTableInfo getTableInfo(OceanBaseSinkDTO 
OceanBaseSink,
+            List<OceanBaseColumnInfo> columnList) {
+        OceanBaseTableInfo tableInfo = new OceanBaseTableInfo();
+        tableInfo.setDbName(OceanBaseSink.getDatabaseName());
+        tableInfo.setTableName(OceanBaseSink.getTableName());
+        tableInfo.setPrimaryKey(OceanBaseSink.getPrimaryKey());
+        tableInfo.setColumns(columnList);
+        return tableInfo;
+    }
+
+    /**
+     * Get DbName from jdbcUrl
+     *
+     * @param jdbcUrl OceanBase JDBC url, such as 
jdbc:oceanbase://host:port/database
+     * @return database name
+     */
+    private static String getDbNameFromUrl(String jdbcUrl) {
+        String database = null;
+
+        if (Strings.isNullOrEmpty(jdbcUrl)) {
+            throw new IllegalArgumentException("Invalid JDBC url.");
+        }
+
+        jdbcUrl = jdbcUrl.toLowerCase();
+        if (jdbcUrl.startsWith("jdbc:impala")) {
+            jdbcUrl = jdbcUrl.replace(":impala", "");
+        }
+
+        int pos1;
+        if (!jdbcUrl.startsWith("jdbc:")
+                || (pos1 = jdbcUrl.indexOf(':', 5)) == -1) {
+            throw new IllegalArgumentException("Invalid JDBC url.");
+        }
+
+        String connUri = jdbcUrl.substring(pos1 + 1);
+        if (connUri.startsWith("//")) {
+            int pos = connUri.indexOf('/', 2);
+            if (pos != -1) {
+                database = connUri.substring(pos + 1);
+            }
+        } else {
+            database = connUri;
+        }
+
+        if (Strings.isNullOrEmpty(database)) {
+            throw new IllegalArgumentException("Invalid JDBC URL: " + jdbcUrl);
+        }
+
+        if (database.contains(InlongConstants.QUESTION_MARK)) {
+            database = database.substring(0, 
database.indexOf(InlongConstants.QUESTION_MARK));
+        }
+        if (database.contains(InlongConstants.SEMICOLON)) {
+            database = database.substring(0, 
database.indexOf(InlongConstants.SEMICOLON));
+        }
+        return database;
+    }
+
+    public static String setDbNameToUrl(String jdbcUrl, String databaseName) {
+        if (StringUtils.isBlank(jdbcUrl)) {
+            return jdbcUrl;
+        }
+        String pattern = 
"jdbc:oceanbase://(?<host>[a-zA-Z0-9-//.]+):(?<port>[0-9]+)?(?<ext>)";
+        Pattern namePattern = Pattern.compile(pattern);
+        Matcher dataMatcher = namePattern.matcher(jdbcUrl);
+        StringBuilder resultUrl;
+        if (dataMatcher.find()) {
+            String host = dataMatcher.group("host");
+            String port = dataMatcher.group("port");
+            resultUrl = new StringBuilder().append(OCEANBASE_JDBC_PREFIX)
+                    .append(host)
+                    .append(InlongConstants.COLON)
+                    .append(port)
+                    .append(InlongConstants.SLASH)
+                    .append(databaseName);
+        } else {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    "OceanBase JDBC URL was invalid, it should like 
jdbc:mysql://host:port");
+        }
+        if (jdbcUrl.contains(InlongConstants.QUESTION_MARK)) {
+            
resultUrl.append(jdbcUrl.substring(jdbcUrl.indexOf(InlongConstants.QUESTION_MARK)));
+        }
+        return resultUrl.toString();
+    }
+    public static String setDbNameToUrlWithCdc(String jdbcUrl, String 
databaseName) {
+        if (StringUtils.isBlank(jdbcUrl)) {
+            return jdbcUrl;
+        }
+        String pattern = 
"jdbc:oceanbase://(?<host>[a-zA-Z0-9-//.]+):(?<port>[0-9]+)?(?<ext>)";
+        Pattern namePattern = Pattern.compile(pattern);
+        Matcher dataMatcher = namePattern.matcher(jdbcUrl);
+        StringBuilder resultUrl;
+        if (dataMatcher.find()) {
+            String host = dataMatcher.group("host");
+            String port = dataMatcher.group("port");
+            resultUrl = new StringBuilder().append(OCEANBASE_JDBC_PREFIX_CDC)
+                    .append(host)
+                    .append(InlongConstants.COLON)
+                    .append(port)
+                    .append(InlongConstants.SLASH)
+                    .append(databaseName);
+        } else {
+            throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                    "OceanBase JDBC URL was invalid, it should like 
jdbc:mysql://host:port");
+        }
+        if (jdbcUrl.contains(InlongConstants.QUESTION_MARK)) {
+            
resultUrl.append(jdbcUrl.substring(jdbcUrl.indexOf(InlongConstants.QUESTION_MARK)));
+        }
+        return resultUrl.toString();
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java
new file mode 100644
index 0000000000..f7cc6f1efe
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseSinkRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import javax.validation.constraints.Pattern;
+
+/**
+ * OceanBase sink request.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "OceanBase sink request")
+@JsonTypeDefine(value = SinkType.OCEANBASE)
+public class OceanBaseSinkRequest extends SinkRequest {
+
+    @ApiModelProperty("OceanBase JDBC URL, such as jdbc:mysql://host:port")
+    @Pattern(regexp = "^((?!\\s).)*$", message = "not supports blank in url")
+    private String jdbcUrl;
+
+    @ApiModelProperty("Username for JDBC URL")
+    private String username;
+
+    @ApiModelProperty("User password")
+    private String password;
+
+    @ApiModelProperty("Target database name")
+    private String databaseName;
+
+    @ApiModelProperty("Target table name")
+    private String tableName;
+
+    @ApiModelProperty("Primary key")
+    private String primaryKey;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java
new file mode 100644
index 0000000000..1f1f6a3cbd
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/oceanbase/OceanBaseTableInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.oceanbase;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ * OceanBase table info.
+ */
+@Data
+public class OceanBaseTableInfo {
+
+    private String dbName;
+
+    private String tableName;
+
+    private String comment;
+
+    private String primaryKey;
+
+    private String engine;
+
+    private String charset;
+
+    private List<OceanBaseColumnInfo> columns;
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java
new file mode 100644
index 0000000000..da117670e0
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/OceanBaseProvider.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sort.node.provider;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
+import 
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeStrategyFactory;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSink;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO;
+import org.apache.inlong.manager.pojo.sort.node.base.LoadNodeProvider;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.pojo.stream.StreamNode;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.node.load.OceanBaseLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+import com.google.common.collect.Lists;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Provider for creating OceanBase load nodes.
+ */
+@Service
+public class OceanBaseProvider implements LoadNodeProvider {
+
+    @Autowired
+    private FieldTypeStrategyFactory fieldTypeStrategyFactory;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.OCEANBASE.equals(sinkType);
+    }
+
+    @Override
+    public LoadNode createLoadNode(StreamNode nodeInfo, Map<String, 
StreamField> constantFieldMap) {
+        OceanBaseSink oceanBaseSink = (OceanBaseSink) nodeInfo;
+        Map<String, String> properties = 
parseProperties(oceanBaseSink.getProperties());
+        FieldTypeMappingStrategy fieldTypeMappingStrategy =
+                fieldTypeStrategyFactory.getInstance(SinkType.MYSQL);
+        List<FieldInfo> fieldInfos = 
parseSinkFieldInfos(oceanBaseSink.getSinkFieldList(), 
oceanBaseSink.getSinkName(),
+                fieldTypeMappingStrategy);
+        List<FieldRelation> fieldRelations = 
parseSinkFields(oceanBaseSink.getSinkFieldList(), constantFieldMap);
+
+        return new OceanBaseLoadNode(
+                oceanBaseSink.getSinkName(),
+                oceanBaseSink.getSinkName(),
+                fieldInfos,
+                fieldRelations,
+                Lists.newArrayList(),
+                null,
+                null,
+                properties,
+                
OceanBaseSinkDTO.setDbNameToUrlWithCdc(oceanBaseSink.getJdbcUrl(), 
oceanBaseSink.getDatabaseName()),
+                oceanBaseSink.getUsername(),
+                oceanBaseSink.getPassword(),
+                oceanBaseSink.getTableName(),
+                oceanBaseSink.getPrimaryKey());
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java
new file mode 100644
index 0000000000..585a5ba1f2
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSource.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+
+/**
+ * OceanBase binlog source info
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "OceanBase binlog source info")
+@JsonTypeDefine(value = SourceType.OCEANBASE)
+public class OceanBaseBinlogSource extends StreamSource {
+
+    @ApiModelProperty("Username of the OceanBase server")
+    private String username;
+
+    @ApiModelProperty("Password of the OceanBase server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the OceanBase server")
+    private String hostname;
+
+    @ApiModelProperty("Port of the OceanBase server")
+    private Integer port;
+
+    @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single 
node")
+    @Builder.Default
+    private Integer serverId = 0;
+
+    @ApiModelProperty("Whether include schema, default is 'false'")
+    private String includeSchema;
+
+    @ApiModelProperty(value = "List of DBs to be collected, seperated by ',', 
supporting regular expressions")
+    private String databaseWhiteList;
+
+    @ApiModelProperty(value = "List of tables to be collected, seperated by 
',',supporting regular expressions")
+    private String tableWhiteList;
+
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("The interval for recording an offset")
+    private String intervalMs;
+
+    @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, 
schema_only, schema_only_recovery")
+    private String snapshotMode;
+
+    @ApiModelProperty("The file path to store offset info")
+    private String offsetFilename;
+
+    @ApiModelProperty("The file path to store history info")
+    private String historyFilename;
+
+    @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+    private String monitoredDdl;
+
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    @Builder.Default
+    private String timestampFormatStandard = "SQL";
+
+    @ApiModelProperty("Need transfer total database")
+    private boolean allMigration;
+
+    @ApiModelProperty("Only incremental")
+    private boolean onlyIncremental;
+
+    @ApiModelProperty("Primary key must be shared by all tables")
+    private String primaryKey;
+
+    @ApiModelProperty("Directly read binlog from the specified offset 
filename")
+    private String specificOffsetFile;
+
+    @ApiModelProperty("Directly read binlog from the specified offset 
position")
+    private Integer specificOffsetPos;
+
+    public OceanBaseBinlogSource() {
+        this.setSourceType(SourceType.OCEANBASE);
+    }
+
+    @Override
+    public SourceRequest genSourceRequest() {
+        return CommonBeanUtils.copyProperties(this, 
OceanBaseBinlogSourceRequest::new);
+    }
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java
new file mode 100644
index 0000000000..7db34fd144
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceDTO.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.oceanbase;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.validation.constraints.NotNull;
+
+import java.util.Map;
+
+/**
+ * Binlog source info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class OceanBaseBinlogSourceDTO {
+
+    @ApiModelProperty("Username of the OceanBase server")
+    private String username;
+
+    @ApiModelProperty("Password of the OceanBase server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the OceanBase server")
+    private String hostname;
+
+    @ApiModelProperty("Port of the OceanBase server")
+    private Integer port;
+
+    @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single 
node")
+    @Builder.Default
+    private Integer serverId = 0;
+
+    @ApiModelProperty("Whether include schema, default is 'false'")
+    private String includeSchema;
+
+    @ApiModelProperty(value = "List of DBs to be collected, supporting regular 
expressions, "
+            + "seperated by ',', for example: db1,test_db*", notes = "DBs not 
in this list are excluded. If not set, all DBs are monitored")
+    private String databaseWhiteList;
+
+    @ApiModelProperty(value = "List of tables to be collected, supporting 
regular expressions, "
+            + "seperated by ',', for example: tb1,user*", notes = "Tables not 
in this list are excluded. By default, all tables are monitored")
+    private String tableWhiteList;
+
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("The interval for recording an offset")
+    private String intervalMs;
+
+    /**
+     * <code>initial</code>: Default mode, do a snapshot when no offset is 
found.
+     * <p/>
+     * <code>when_needed</code>: Similar to initial, do a snapshot when the 
binlog position
+     * has been purged on the DB server.
+     * <p/>
+     * <code>never</code>: Do not snapshot.
+     * <p/>
+     * <code>schema_only</code>: All tables' column name will be taken, but 
the table data will not be exported,
+     * and it will only be consumed from the end of the binlog at the task is 
started.
+     * So it is very suitable for not caring about historical data, but only 
about recent changes. the
+     * <p/>
+     * <code>schema_only_recovery</code>: When <code>schema_only</code> mode 
fails, use this mode to recover, which is
+     * generally not used.
+     */
+    @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, 
schema_only, schema_only_recovery")
+    private String snapshotMode;
+
+    @ApiModelProperty("The file path to store offset info")
+    private String offsetFilename;
+
+    @ApiModelProperty("The file path to store history info")
+    private String historyFilename;
+
+    @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+    private String monitoredDdl;
+
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    @Builder.Default
+    private String timestampFormatStandard = "SQL";
+
+    @ApiModelProperty("Whether to migrate all databases")
+    private boolean allMigration;
+
+    @ApiModelProperty("Only incremental")
+    private boolean onlyIncremental;
+
+    @ApiModelProperty("Primary key must be shared by all tables")
+    private String primaryKey;
+
+    @ApiModelProperty("Directly read binlog from the specified offset 
filename")
+    private String specificOffsetFile;
+
+    @ApiModelProperty("Directly read binlog from the specified offset 
position")
+    private Integer specificOffsetPos;
+
+    @ApiModelProperty("Properties for OceanBase")
+    private Map<String, Object> properties;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static OceanBaseBinlogSourceDTO 
getFromRequest(OceanBaseBinlogSourceRequest request, String extParams) {
+        OceanBaseBinlogSourceDTO dto = StringUtils.isNotBlank(extParams)
+                ? OceanBaseBinlogSourceDTO.getFromJson(extParams)
+                : new OceanBaseBinlogSourceDTO();
+        return CommonBeanUtils.copyProperties(request, dto, true);
+    }
+
+    public static OceanBaseBinlogSourceDTO getFromJson(@NotNull String 
extParams) {
+        try {
+            return JsonUtils.parseObject(extParams, 
OceanBaseBinlogSourceDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("parse extParams of OceanBaseBinlogSource 
failure: %s", e.getMessage()));
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java
new file mode 100644
index 0000000000..3be8006e88
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/oceanbase/OceanBaseBinlogSourceRequest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.source.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.DataFormat;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+/**
+ * OceanBase binlog source request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "OceanBase binlog source request")
+@JsonTypeDefine(value = SourceType.OCEANBASE)
+public class OceanBaseBinlogSourceRequest extends SourceRequest {
+
+    @ApiModelProperty("Username of the DB server")
+    private String username;
+
+    @ApiModelProperty("Password of the DB server")
+    private String password;
+
+    @ApiModelProperty("Hostname of the DB server")
+    private String hostname;
+
+    @ApiModelProperty("Port of the DB server")
+    private Integer port = 3306;
+
+    @ApiModelProperty("Id of physical node of OceanBase Cluster, 0 if single 
node")
+    private Integer serverId = 0;
+
+    @ApiModelProperty("Whether include schema, default is 'false'")
+    private String includeSchema;
+
+    @ApiModelProperty(value = "List of DBs to be collected, supporting regular 
expressions, "
+            + "separate them with ',', for example: db1,test_db*", notes = 
"DBs not in this list are excluded. If not set, all DBs are monitored")
+    private String databaseWhiteList;
+
+    @ApiModelProperty(value = "List of tables to be collected, supporting 
regular expressions, "
+            + "separate them with ',', for example: tb1,user*", notes = 
"Tables not in this list are excluded. By default, all tables are monitored")
+    private String tableWhiteList;
+
+    @ApiModelProperty("Database time zone, Default is UTC")
+    private String serverTimezone;
+
+    @ApiModelProperty("The interval for recording an offset")
+    private String intervalMs = "500";
+
+    /**
+     * <code>initial</code>: Default mode, do a snapshot when no offset is 
found.
+     * <p/>
+     * <code>when_needed</code>: Similar to initial, do a snapshot when the 
binlog position
+     * has been purged on the DB server.
+     * <p/>
+     * <code>never</code>: Do not snapshot.
+     * <p/>
+     * <code>schema_only</code>: All tables' column name will be taken, but 
the table data will not be exported,
+     * and it will only be consumed from the end of the binlog at the task is 
started.
+     * So it is very suitable for not caring about historical data, but only 
about recent changes. the
+     * <p/>
+     * <code>schema_only_recovery</code>: When <code>schema_only</code> mode 
fails, use this mode to recover, which is
+     * generally not used.
+     */
+    @ApiModelProperty("Snapshot mode, supports: initial, when_needed, never, 
schema_only, schema_only_recovery")
+    private String snapshotMode = "initial";
+
+    @ApiModelProperty("The file path to store offset info")
+    private String offsetFilename;
+
+    @ApiModelProperty("The file path to store history info")
+    private String historyFilename;
+
+    @ApiModelProperty("Whether to monitor the DDL, default is 'false'")
+    private String monitoredDdl;
+
+    @ApiModelProperty("Timestamp standard for binlog: SQL, ISO_8601")
+    private String timestampFormatStandard = "SQL";
+
+    @ApiModelProperty("Need transfer total database")
+    private boolean allMigration = false;
+
+    @ApiModelProperty("Only incremental")
+    private boolean onlyIncremental;
+
+    @ApiModelProperty("Primary key must be shared by all tables")
+    private String primaryKey;
+
+    @ApiModelProperty("Directly read binlog from the specified offset 
filename")
+    private String specificOffsetFile;
+
+    @ApiModelProperty("Directly read binlog from the specified offset 
position")
+    private Integer specificOffsetPos;
+
+    public OceanBaseBinlogSourceRequest() {
+        this.setSourceType(SourceType.OCEANBASE);
+        this.setSerializationType(DataFormat.DEBEZIUM_JSON.getName());
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java
new file mode 100644
index 0000000000..84917e624e
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/oceanbase/OceanBaseDataNodeOperator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.oceanbase;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import 
org.apache.inlong.manager.service.resource.sink.oceanbase.OceanBaseJdbcUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.sql.Connection;
+import java.util.Objects;
+
+/**
+ * OceanBase data node operator
+ */
+@Service
+public class OceanBaseDataNodeOperator extends AbstractDataNodeOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OceanBaseDataNodeOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String dataNodeType) {
+        return getDataNodeType().equals(dataNodeType);
+    }
+
+    @Override
+    public String getDataNodeType() {
+        return DataNodeType.OCEANBASE;
+    }
+
+    @Override
+    public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+        }
+
+        OceanBaseDataNodeInfo dataNodeInfo = new OceanBaseDataNodeInfo();
+        CommonBeanUtils.copyProperties(entity, dataNodeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            OceanBaseDataNodeDTO dto = 
OceanBaseDataNodeDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, dataNodeInfo);
+        }
+        return dataNodeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(DataNodeRequest request, DataNodeEntity 
targetEntity) {
+        OceanBaseDataNodeRequest dataNodeRequest = (OceanBaseDataNodeRequest) 
request;
+        CommonBeanUtils.copyProperties(dataNodeRequest, targetEntity, true);
+        try {
+            OceanBaseDataNodeDTO dto =
+                    OceanBaseDataNodeDTO.getFromRequest(dataNodeRequest, 
targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("Failed to build extParams for OceanBase 
node: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    public Boolean testConnection(DataNodeRequest request) {
+        String jdbcUrl = 
OceanBaseDataNodeDTO.convertToJdbcurl(request.getUrl());
+        String username = request.getUsername();
+        String password = request.getToken();
+        Preconditions.expectNotBlank(jdbcUrl, ErrorCodeEnum.INVALID_PARAMETER, 
"connection jdbcUrl cannot be empty");
+        try (Connection ignored = OceanBaseJdbcUtils.getConnection(jdbcUrl, 
username, password)) {
+            LOGGER.info("OceanBase connection not null - connection success 
for jdbcUrl={}, username={}, password={}",
+                    jdbcUrl, username, password);
+            return true;
+        } catch (Exception e) {
+            String errMsg =
+                    String.format("OceanBase connection failed for jdbcUrl=%s, 
username=%s, password=%s", jdbcUrl,
+                            username, password);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
+    @Override
+    public void updateRelatedStreamSource(DataNodeRequest request, 
DataNodeEntity dataNodeEntity, String operator) {
+        OceanBaseDataNodeRequest nodeRequest = (OceanBaseDataNodeRequest) 
request;
+        OceanBaseDataNodeInfo nodeInfo = (OceanBaseDataNodeInfo) 
this.getFromEntity(dataNodeEntity);
+        boolean changed = !Objects.equals(nodeRequest.getUrl(), 
nodeInfo.getUrl())
+                || !Objects.equals(nodeRequest.getBackupUrl(), 
nodeInfo.getBackupUrl())
+                || !Objects.equals(nodeRequest.getUsername(), 
nodeInfo.getUsername())
+                || !Objects.equals(nodeRequest.getToken(), 
nodeInfo.getToken());
+        if (changed) {
+            retryStreamSourceByDataNodeNameAndType(dataNodeEntity.getName(), 
SourceType.OCEANBASE, operator);
+        }
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java
new file mode 100644
index 0000000000..35e67a5a57
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseJdbcUtils.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.oceanbase;
+
+import org.apache.inlong.manager.common.util.UrlVerificationUtils;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Utils for OceanBase JDBC.
+ */
+public class OceanBaseJdbcUtils {
+
+    private static final String OCEANBASE_JDBC_PREFIX = "jdbc:oceanbase://";
+    private static final String OCEANBASE_DRIVER_CLASS = 
"com.oceanbase.jdbc.Driver";
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OceanBaseJdbcUtils.class);
+
+    /**
+     * Get OceanBase connection from the url and user.
+     *
+     * @param url jdbc url, such as jdbc:oceanbase://host:port/database
+     * @param user Username for JDBC URL
+     * @param password User password
+     * @return {@link Connection}
+     * @throws Exception on get connection error
+     */
+    public static Connection getConnection(String url, String user, String 
password) throws Exception {
+        if (StringUtils.isNotBlank(url) && StringUtils.isNotBlank(url) && 
StringUtils.isNotBlank(password)) {
+            UrlVerificationUtils.extractHostAndValidatePortFromJdbcUrl(url, 
OCEANBASE_JDBC_PREFIX);
+            return establishDatabaseConnection(url, user, password);
+        }
+        return null;
+    }
+
+    /**
+     * Establishes a database connection using the provided URL, username, and 
password.
+     *
+     * @param url      The JDBC URL
+     * @param user     The username
+     * @param password The user's password
+     * @return A {@link Connection} object representing the database connection
+     * @throws Exception If an error occurs while obtaining the connection
+     */
+    private static Connection establishDatabaseConnection(String url, String 
user, String password) throws Exception {
+        Connection conn;
+        try {
+            Class.forName(OCEANBASE_DRIVER_CLASS);
+            conn = DriverManager.getConnection(url, user, password);
+        } catch (Exception e) {
+            String errorMsg =
+                    "Failed to get OceanBase connection, please check 
OceanBase JDBC URL, username, or password!";
+            LOGGER.error(errorMsg, e);
+            throw new Exception(errorMsg + " Other error message: " + 
e.getMessage());
+        }
+        if (conn == null) {
+            throw new Exception("get OceanBase connection failed, please 
contact administrator");
+        }
+        LOGGER.info("get OceanBase connection success for url={}", url);
+        return conn;
+    }
+
+    /**
+     * Execute SQL command on OceanBase.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param sql SQL to be executed
+     * @throws Exception on execute SQL error
+     */
+    public static void executeSql(final Connection conn, final String sql) 
throws Exception {
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(sql);
+            LOGGER.info("execute sql [{}] success", sql);
+        }
+    }
+
+    /**
+     * Execute batch query SQL on OceanBase.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param sqls SQL to be executed
+     * @throws Exception on get execute SQL batch error
+     */
+    public static void executeSqlBatch(final Connection conn, final 
List<String> sqls) throws Exception {
+        conn.setAutoCommit(false);
+        try (Statement stmt = conn.createStatement()) {
+            for (String entry : sqls) {
+                stmt.execute(entry);
+            }
+            conn.commit();
+            LOGGER.info("execute sql [{}] success", sqls);
+        } finally {
+            conn.setAutoCommit(true);
+        }
+    }
+
+    /**
+     * Create OceanBase database
+     *
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @throws Exception on create database error
+     */
+    public static void createDb(final Connection conn, final String dbName) 
throws Exception {
+        if (!checkDbExist(conn, dbName)) {
+            final String createDbSql = 
OceanBaseSqlBuilder.buildCreateDbSql(dbName);
+            executeSql(conn, createDbSql);
+            LOGGER.info("execute sql [{}] success", createDbSql);
+        } else {
+            LOGGER.info("The database [{}] are exists", dbName);
+        }
+    }
+
+    /**
+     * Check database from the OceanBase information_schema.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @return true if table exist, otherwise false
+     * @throws Exception on check database exist error
+     */
+    public static boolean checkDbExist(final Connection conn, final String 
dbName) throws Exception {
+        final String checkDbSql = OceanBaseSqlBuilder.getCheckDatabase(dbName);
+        try (Statement stmt = conn.createStatement();
+                ResultSet resultSet = stmt.executeQuery(checkDbSql)) {
+            if (Objects.nonNull(resultSet)) {
+                if (resultSet.next()) {
+                    LOGGER.info("check db exist for db={}, result=true", 
dbName);
+                    return true;
+                }
+            }
+        }
+        LOGGER.info("check db exist for db={}, result=false", dbName);
+        return false;
+    }
+
+    /**
+     * Create OceanBase table by OceanBaseTableInfo
+     *
+     * @param conn JDBC {@link Connection}
+     * @param tableInfo table info  {@link OceanBaseTableInfo}
+     * @throws Exception on create table error
+     */
+    public static void createTable(final Connection conn, final 
OceanBaseTableInfo tableInfo) throws Exception {
+        if (checkTablesExist(conn, tableInfo.getDbName(), 
tableInfo.getTableName())) {
+            LOGGER.info("The table [{}] are exists", tableInfo.getTableName());
+        } else {
+            final String createTableSql = 
OceanBaseSqlBuilder.buildCreateTableSql(tableInfo);
+            executeSql(conn, createTableSql);
+            LOGGER.info("execute sql [{}] success", createTableSql);
+        }
+    }
+
+    /**
+     * Check tables from the OceanBase information_schema.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
+     * @return true if table exist, otherwise false
+     * @throws Exception on check table exist error
+     */
+    public static boolean checkTablesExist(final Connection conn, final String 
dbName, final String tableName)
+            throws Exception {
+        boolean result = false;
+        final String checkTableSql = OceanBaseSqlBuilder.getCheckTable(dbName, 
tableName);
+        try (Statement stmt = conn.createStatement();
+                ResultSet resultSet = stmt.executeQuery(checkTableSql)) {
+            if (Objects.nonNull(resultSet)) {
+                if (resultSet.next()) {
+                    result = true;
+                }
+            }
+        }
+        LOGGER.info("check table exist for db={} table={}, result={}", dbName, 
tableName, result);
+        return result;
+    }
+
+    /**
+     * Check whether the column exists in the OceanBase table.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
+     * @param column table column name
+     * @return true if column exist in the table, otherwise false
+     * @throws Exception on check column exist error
+     */
+    public static boolean checkColumnExist(final Connection conn, final String 
dbName, final String tableName,
+            final String column) throws Exception {
+        boolean result = false;
+        final String checkTableSql = 
OceanBaseSqlBuilder.getCheckColumn(dbName, tableName, column);
+        try (Statement stmt = conn.createStatement();
+                ResultSet resultSet = stmt.executeQuery(checkTableSql)) {
+            if (Objects.nonNull(resultSet)) {
+                if (resultSet.next()) {
+                    result = true;
+                }
+            }
+        }
+        LOGGER.info("check column exist for db={} table={}, result={} 
column={}", dbName, tableName, result, column);
+        return result;
+    }
+
+    /**
+     * Query all OceanBase table columns by the given tableName.
+     *
+     * @param conn JDBC {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
+     * @return {@link List}
+     * @throws Exception on get columns error
+     */
+    public static List<OceanBaseColumnInfo> getColumns(final Connection conn, 
final String dbName,
+            final String tableName)
+            throws Exception {
+        final String querySql = OceanBaseSqlBuilder.buildDescTableSql(dbName, 
tableName);
+        final List<OceanBaseColumnInfo> columnList = new ArrayList<>();
+
+        try (Statement stmt = conn.createStatement();
+                ResultSet rs = stmt.executeQuery(querySql)) {
+            if (Objects.nonNull(rs)) {
+                while (rs.next()) {
+                    OceanBaseColumnInfo columnInfo = new 
OceanBaseColumnInfo(rs.getString(1),
+                            rs.getString(2), rs.getString(3));
+                    columnList.add(columnInfo);
+                }
+            }
+        }
+        return columnList;
+    }
+
+    /**
+     * Add columns for OceanBase table.
+     *
+     * @param conn JDBC Connection  {@link Connection}
+     * @param dbName database name
+     * @param tableName table name
+     * @param columns columns to be added
+     * @throws Exception on add columns error
+     */
+    public static void addColumns(final Connection conn, final String dbName, 
final String tableName,
+            final List<OceanBaseColumnInfo> columns) throws Exception {
+        final List<OceanBaseColumnInfo> columnInfos = Lists.newArrayList();
+
+        for (OceanBaseColumnInfo columnInfo : columns) {
+            if (!checkColumnExist(conn, dbName, tableName, 
columnInfo.getName())) {
+                columnInfos.add(columnInfo);
+            }
+        }
+        final List<String> addColumnSql = 
OceanBaseSqlBuilder.buildAddColumnsSql(dbName, tableName, columnInfos);
+        executeSqlBatch(conn, addColumnSql);
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java
new file mode 100644
index 0000000000..1da92bf3a8
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseResourceOperator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
+import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * OceanBase's resource operator.
+ */
+@Service
+public class OceanBaseResourceOperator implements SinkResourceOperator {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OceanBaseResourceOperator.class);
+
+    @Autowired
+    private StreamSinkService sinkService;
+
+    @Autowired
+    private StreamSinkFieldEntityMapper fieldEntityMapper;
+
+    @Autowired
+    private DataNodeOperateHelper dataNodeHelper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.OCEANBASE.equals(sinkType);
+    }
+
+    @Override
+    public void createSinkResource(SinkInfo sinkInfo) {
+        LOG.info("begin to create OceanBase resources sinkId={}", 
sinkInfo.getId());
+        if 
(SinkStatus.CONFIG_SUCCESSFUL.getCode().equals(sinkInfo.getStatus())) {
+            LOG.warn("OceanBase resource [" + sinkInfo.getId() + "] already 
success, skip to create");
+            return;
+        } else if 
(InlongConstants.DISABLE_CREATE_RESOURCE.equals(sinkInfo.getEnableCreateResource()))
 {
+            LOG.warn("create resource was disabled, skip to create for [" + 
sinkInfo.getId() + "]");
+            return;
+        }
+        this.createTable(sinkInfo);
+    }
+
+    /**
+     * Create OceanBase table by SinkInfo.
+     *
+     * @param sinkInfo {@link SinkInfo}
+     */
+    private void createTable(SinkInfo sinkInfo) {
+        LOG.info("begin to create OceanBase table for sinkId={}", 
sinkInfo.getId());
+        List<StreamSinkFieldEntity> fieldList = 
fieldEntityMapper.selectBySinkId(sinkInfo.getId());
+        if (CollectionUtils.isEmpty(fieldList)) {
+            LOG.warn("no OceanBase fields found, skip to create table for 
sinkId={}", sinkInfo.getId());
+        }
+        // set columns
+        List<OceanBaseColumnInfo> columnList = new ArrayList<>();
+        for (StreamSinkFieldEntity field : fieldList) {
+            OceanBaseColumnInfo columnInfo = new 
OceanBaseColumnInfo(field.getFieldName(), field.getFieldType(),
+                    field.getFieldComment());
+            columnList.add(columnInfo);
+        }
+
+        OceanBaseSinkDTO sinkDTO = this.getOceanBaseInfo(sinkInfo);
+        OceanBaseTableInfo tableInfo = OceanBaseSinkDTO.getTableInfo(sinkDTO, 
columnList);
+        try (Connection conn = 
OceanBaseJdbcUtils.getConnection(sinkDTO.getJdbcUrl(), sinkDTO.getUsername(),
+                sinkDTO.getPassword())) {
+            // 1. create database if not exists
+            OceanBaseJdbcUtils.createDb(conn, tableInfo.getDbName());
+            // 2. table not exists, create it
+            OceanBaseJdbcUtils.createTable(conn, tableInfo);
+            // 3. table exists, add columns - skip the exists columns
+            OceanBaseJdbcUtils.addColumns(conn, tableInfo.getDbName(), 
tableInfo.getTableName(), columnList);
+
+            // 4. update the sink status to success
+            String info = "success to create OceanBase resource";
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_SUCCESSFUL.getCode(), info);
+            LOG.info(info + " for sinkInfo={}", sinkInfo);
+        } catch (Throwable e) {
+            String errMsg = "create OceanBase table failed: " + e.getMessage();
+            LOG.error(errMsg, e);
+            sinkService.updateStatus(sinkInfo.getId(), 
SinkStatus.CONFIG_FAILED.getCode(), errMsg);
+            throw new WorkflowException(errMsg);
+        }
+        LOG.info("success create OceanBase table for data sink [" + 
sinkInfo.getId() + "]");
+    }
+
+    private OceanBaseSinkDTO getOceanBaseInfo(SinkInfo sinkInfo) {
+        OceanBaseSinkDTO OceanBaseInfo = 
OceanBaseSinkDTO.getFromJson(sinkInfo.getExtParams());
+
+        if (StringUtils.isBlank(OceanBaseInfo.getJdbcUrl())) {
+            String dataNodeName = sinkInfo.getDataNodeName();
+            Preconditions.expectNotBlank(dataNodeName, 
ErrorCodeEnum.INVALID_PARAMETER,
+                    "OceanBase jdbc url not specified and data node is empty");
+            DataNodeInfo dataNodeInfo = 
dataNodeHelper.getDataNodeInfo(dataNodeName, sinkInfo.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, OceanBaseInfo);
+            
OceanBaseInfo.setJdbcUrl(OceanBaseDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl()));
+            OceanBaseInfo.setPassword(dataNodeInfo.getToken());
+        }
+        return OceanBaseInfo;
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java
new file mode 100644
index 0000000000..5cbef1557f
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/oceanbase/OceanBaseSqlBuilder.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.sink.oceanbase;
+
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseTableInfo;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Builder the SQL string for OceanBase
+ */
+public class OceanBaseSqlBuilder {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OceanBaseSqlBuilder.class);
+
+    /**
+     * Build SQL to check whether the database exists.
+     *
+     * @param dbName OceanBase database name
+     * @return the check database SQL string
+     */
+    public static String getCheckDatabase(String dbName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("SELECT schema_name ")
+                .append(" FROM information_schema.schemata ")
+                .append("WHERE schema_name = '")
+                .append(dbName)
+                .append("';");
+        LOGGER.info("check database sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build SQL to check whether the table exists.
+     *
+     * @param dbName OceanBase database name
+     * @param tableName OceanBase table name
+     * @return the check table SQL string
+     */
+    public static String getCheckTable(String dbName, String tableName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("select table_schema,table_name ")
+                .append(" from information_schema.tables where table_schema = 
'")
+                .append(dbName)
+                .append("' and table_name = '")
+                .append(tableName)
+                .append("' ;");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build SQL to check whether the column exists.
+     *
+     * @param dbName OceanBase database name
+     * @param tableName OceanBase table name
+     * @param columnName OceanBase column name
+     * @return the check column SQL string
+     */
+    public static String getCheckColumn(String dbName, String tableName, 
String columnName) {
+        final StringBuilder sqlBuilder = new StringBuilder()
+                .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ")
+                .append(" from  information_schema.COLUMNS where 
table_schema='")
+                .append(dbName)
+                .append("' and table_name = '")
+                .append(tableName)
+                .append("' and column_name = '")
+                .append(columnName)
+                .append("';");
+        LOGGER.info("check table sql: {}", sqlBuilder);
+        return sqlBuilder.toString();
+    }
+
+    /**
+     * Build create database SQL.
+     *
+     * @param dbName OceanBase database name
+     * @return the create database SQL string
+     */
+    public static String buildCreateDbSql(String dbName) {
+        final String sql = "CREATE DATABASE " + dbName;
+        LOGGER.info("create db sql: {}", sql);
+        return sql;
+    }
+
+    /**
+     * Build create table SQL by OceanBaseTableInfo.
+     *
+     * @param table OceanBase table info {@link OceanBaseTableInfo}
+     * @return the create table SQL String
+     */
+    public static String buildCreateTableSql(OceanBaseTableInfo table) {
+        final StringBuilder sql = new StringBuilder()
+                .append("CREATE TABLE ").append(table.getDbName())
+                .append(".")
+                .append(table.getTableName())
+                .append(buildCreateColumnsSql(table));
+
+        if (StringUtils.isEmpty(table.getEngine())) {
+            sql.append(" ENGINE=InnoDB ");
+        } else {
+            sql.append(" ENGINE=")
+                    .append(table.getEngine())
+                    .append(" ");
+        }
+
+        if (!StringUtils.isEmpty(table.getCharset())) {
+            sql.append(" DEFAULT CHARSET=")
+                    .append(table.getCharset())
+                    .append(" ");
+        }
+
+        LOGGER.info("create table sql: {}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build add columns SQL.
+     *
+     * @param tableName OceanBase table name
+     * @param columnList OceanBase column list {@link List}
+     * @return add column SQL string list
+     */
+    public static List<String> buildAddColumnsSql(String dbName, String 
tableName,
+            List<OceanBaseColumnInfo> columnList) {
+        final List<String> columnInfoList = getColumnsInfo(columnList);
+        final List<String> resultList = new ArrayList<>();
+        final StringBuilder sqlBuilder = new StringBuilder();
+        columnInfoList.forEach(columnInfo -> {
+            sqlBuilder.append("ALTER TABLE ")
+                    .append(dbName)
+                    .append(".")
+                    .append(tableName)
+                    .append(" ADD COLUMN ")
+                    .append(columnInfo)
+                    .append(";");
+            resultList.add(sqlBuilder.toString());
+            sqlBuilder.delete(0, sqlBuilder.length());
+        });
+        return resultList;
+    }
+
+    /**
+     * Build create column SQL.
+     *
+     * @param table OceanBase table info {@link OceanBaseTableInfo}
+     * @return create column SQL string
+     */
+    private static String buildCreateColumnsSql(OceanBaseTableInfo table) {
+        final List<String> columnList = getColumnsInfo(table.getColumns());
+        final StringBuilder sql = new StringBuilder()
+                .append(" (")
+                .append(StringUtils.join(columnList, ","));
+        if (!StringUtils.isEmpty(table.getPrimaryKey())) {
+            sql.append(", PRIMARY KEY (")
+                    .append(table.getPrimaryKey())
+                    .append(")");
+        }
+        sql.append(") ");
+        LOGGER.info("create columns sql={}", sql);
+        return sql.toString();
+    }
+
+    /**
+     * Build column info by OceanBaseColumnInfo list.
+     *
+     * @param columns OceanBase column info {@link OceanBaseColumnInfo} list
+     * @return the SQL list
+     */
+    private static List<String> getColumnsInfo(List<OceanBaseColumnInfo> 
columns) {
+        final List<String> columnList = new ArrayList<>();
+        final StringBuilder columnBuilder = new StringBuilder();
+        columns.forEach(columnInfo -> {
+            columnBuilder.append("`")
+                    .append(columnInfo.getName())
+                    .append("`")
+                    .append(" ")
+                    .append(columnInfo.getType());
+            if (!StringUtils.isEmpty(columnInfo.getComment())) {
+                columnBuilder.append(" COMMENT '")
+                        .append(columnInfo.getComment())
+                        .append("'");
+            }
+            columnBuilder.append(" ");
+            columnList.add(columnBuilder.toString());
+            columnBuilder.delete(0, columnBuilder.length());
+        });
+        return columnList;
+    }
+
+    /**
+     * Build query table SQL.
+     *
+     * @param dbName OceanBase database name
+     * @param tableName OceanBase table name
+     * @return desc table SQL string
+     */
+    public static String buildDescTableSql(String dbName, String tableName) {
+        final StringBuilder sql = new StringBuilder()
+                .append("SELECT COLUMN_NAME,COLUMN_TYPE,COLUMN_COMMENT ")
+                .append(" from  information_schema.COLUMNS where 
table_schema='")
+                .append(dbName)
+                .append("' and table_name = '")
+                .append(tableName)
+                .append("';");
+        LOGGER.info("desc table sql={}", sql);
+        return sql.toString();
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java
new file mode 100644
index 0000000000..debe791d82
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/oceanbase/OceanBaseSinkOperator.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.sink.oceanbase;
+
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeDTO;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSink;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkDTO;
+import org.apache.inlong.manager.pojo.sink.oceanbase.OceanBaseSinkRequest;
+import org.apache.inlong.manager.service.sink.AbstractSinkOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * OceanBase sink operator
+ */
+@Service
+public class OceanBaseSinkOperator extends AbstractSinkOperator {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(OceanBaseSinkOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sinkType) {
+        return SinkType.OCEANBASE.equals(sinkType);
+    }
+
+    @Override
+    protected String getSinkType() {
+        return SinkType.OCEANBASE;
+    }
+
+    @Override
+    protected void setTargetEntity(SinkRequest request, StreamSinkEntity 
targetEntity) {
+        if (!this.getSinkType().equals(request.getSinkType())) {
+            throw new BusinessException(ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT,
+                    ErrorCodeEnum.SINK_TYPE_NOT_SUPPORT.getMessage() + ": " + 
getSinkType());
+        }
+        OceanBaseSinkRequest sinkRequest = (OceanBaseSinkRequest) request;
+        try {
+            OceanBaseSinkDTO dto = 
OceanBaseSinkDTO.getFromRequest(sinkRequest, targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SINK_SAVE_FAILED,
+                    String.format("serialize extParams of OceanBase SinkDTO 
failure: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    public StreamSink getFromEntity(StreamSinkEntity entity) {
+        OceanBaseSink sink = new OceanBaseSink();
+        if (entity == null) {
+            return sink;
+        }
+
+        OceanBaseSinkDTO dto = 
OceanBaseSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new BusinessException(ErrorCodeEnum.SINK_INFO_INCORRECT,
+                        "OceanBase jdbc url not specified and data node is 
blank");
+            }
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(
+                    entity.getDataNodeName(), entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            
dto.setJdbcUrl(OceanBaseDataNodeDTO.convertToJdbcurl(dataNodeInfo.getUrl()));
+            dto.setPassword(dataNodeInfo.getToken());
+        }
+        CommonBeanUtils.copyProperties(entity, sink, true);
+        CommonBeanUtils.copyProperties(dto, sink, true);
+        List<SinkField> sinkFields = super.getSinkFields(entity.getId());
+        sink.setSinkFieldList(sinkFields);
+        return sink;
+    }
+
+}
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java
new file mode 100644
index 0000000000..ae55c41262
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/oceanbase/OceanBaseSourceOperator.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.source.oceanbase;
+
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SourceType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
+import org.apache.inlong.manager.pojo.node.oceanbase.OceanBaseDataNodeInfo;
+import org.apache.inlong.manager.pojo.source.SourceRequest;
+import org.apache.inlong.manager.pojo.source.StreamSource;
+import org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSource;
+import 
org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSourceDTO;
+import 
org.apache.inlong.manager.pojo.source.oceanbase.OceanBaseBinlogSourceRequest;
+import org.apache.inlong.manager.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.source.AbstractSourceOperator;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Binlog source operator
+ */
+@Service
+public class OceanBaseSourceOperator extends AbstractSourceOperator {
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String sourceType) {
+        return SourceType.OCEANBASE.equals(sourceType);
+    }
+
+    @Override
+    protected String getSourceType() {
+        return SourceType.OCEANBASE;
+    }
+
+    @Override
+    public String getExtParams(StreamSourceEntity sourceEntity) {
+        OceanBaseBinlogSourceDTO OceanBaseBinlogSourceDTO = 
JsonUtils.parseObject(sourceEntity.getExtParams(),
+                OceanBaseBinlogSourceDTO.class);
+        if (Objects.nonNull(OceanBaseBinlogSourceDTO) && 
StringUtils.isBlank(OceanBaseBinlogSourceDTO.getHostname())) {
+            OceanBaseDataNodeInfo dataNodeInfo = (OceanBaseDataNodeInfo) 
dataNodeService.get(
+                    sourceEntity.getDataNodeName(), DataNodeType.OCEANBASE);
+            CommonBeanUtils.copyProperties(dataNodeInfo, 
OceanBaseBinlogSourceDTO, true);
+            OceanBaseBinlogSourceDTO.setPassword(dataNodeInfo.getToken());
+            
OceanBaseBinlogSourceDTO.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
+            
OceanBaseBinlogSourceDTO.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
+            return JsonUtils.toJsonString(OceanBaseBinlogSourceDTO);
+        }
+        return sourceEntity.getExtParams();
+    }
+
+    @Override
+    protected void setTargetEntity(SourceRequest request, StreamSourceEntity 
targetEntity) {
+        OceanBaseBinlogSourceRequest sourceRequest = 
(OceanBaseBinlogSourceRequest) request;
+        CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
+        try {
+            OceanBaseBinlogSourceDTO dto =
+                    OceanBaseBinlogSourceDTO.getFromRequest(sourceRequest, 
targetEntity.getExtParams());
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                    String.format("serialize extParams of OceanBaseBinlog 
SourceDTO failure: %s", e.getMessage()));
+        }
+    }
+
+    @Override
+    public StreamSource getFromEntity(StreamSourceEntity entity) {
+        OceanBaseBinlogSource source = new OceanBaseBinlogSource();
+        if (entity == null) {
+            return source;
+        }
+
+        OceanBaseBinlogSourceDTO dto = 
OceanBaseBinlogSourceDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getHostname())) {
+            if (StringUtils.isBlank(entity.getDataNodeName())) {
+                throw new 
BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT,
+                        "OceanBase url and data node is blank");
+            }
+            OceanBaseDataNodeInfo dataNodeInfo = (OceanBaseDataNodeInfo) 
dataNodeService.get(
+                    entity.getDataNodeName(), DataNodeType.OCEANBASE);
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setPassword(dataNodeInfo.getToken());
+            
dto.setHostname(dataNodeInfo.getUrl().split(InlongConstants.COLON)[0]);
+            
dto.setPort(Integer.valueOf(dataNodeInfo.getUrl().split(InlongConstants.COLON)[1]));
+        }
+        CommonBeanUtils.copyProperties(entity, source, true);
+        CommonBeanUtils.copyProperties(dto, source, true);
+
+        List<StreamField> sourceFields = super.getSourceFields(entity.getId());
+        source.setFieldList(sourceFields);
+        return source;
+    }
+
+}

Reply via email to