This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7ba11cecd [Feature][Connector-V2][Jdbc] Add oceanbase dialect factory 
(#4989)
7ba11cecd is described below

commit 7ba11cecdf7ac70c65ff5df501bbce8b499967c9
Author: He Wang <wanghe...@qq.com>
AuthorDate: Tue Jul 11 17:34:02 2023 +0800

    [Feature][Connector-V2][Jdbc] Add oceanbase dialect factory (#4989)
    
    
    ---------
    
    Co-authored-by: silenceland <silencelan...@163.com>
    Co-authored-by: changhuyan <877018...@qq.com>
---
 docs/en/connector-v2/sink/Jdbc.md                  |   6 +
 docs/en/connector-v2/sink/OceanBase.md             | 186 +++++++++++++++
 docs/en/connector-v2/source/Jdbc.md                |   6 +
 docs/en/connector-v2/source/OceanBase.md           | 168 ++++++++++++++
 .../jdbc/config/JdbcConnectionConfig.java          |  13 ++
 .../seatunnel/jdbc/config/JdbcOptions.java         |   6 +
 .../seatunnel/jdbc/config/JdbcSourceConfig.java    |   2 +
 .../jdbc/internal/dialect/JdbcDialectFactory.java  |  10 +
 .../jdbc/internal/dialect/JdbcDialectLoader.java   |   5 +-
 .../dialect/oceanbase/OceanBaseDialectFactory.java |  49 ++++
 .../connectors/seatunnel/jdbc/sink/JdbcSink.java   |   5 +-
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |   9 +-
 .../seatunnel/jdbc/source/JdbcSource.java          |   4 +-
 .../seatunnel/jdbc/source/JdbcSourceFactory.java   |   9 +-
 .../seatunnel/jdbc/JdbcOceanBaseITBase.java        | 147 ++++++++++++
 .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java       | 256 +++++++++++++++++++++
 .../seatunnel/jdbc/JdbcOceanBaseOracleIT.java      | 161 +++++++++++++
 .../jdbc_oceanbase_mysql_source_and_sink.conf      |  55 +++++
 .../jdbc_oceanbase_oracle_source_and_sink.conf     |  53 +++++
 .../e2e/connector/pulsar/PulsarBatchIT.java        |   2 +
 20 files changed, 1144 insertions(+), 8 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index d472d9a33..f128f6b4b 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -33,6 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` 
to enable it.
 | user                                      | String  | No       | -           
  |
 | password                                  | String  | No       | -           
  |
 | query                                     | String  | No       | -           
  |
+| compatible_mode                           | String  | No       | -           
  |
 | database                                  | String  | No       | -           
  |
 | table                                     | String  | No       | -           
  |
 | primary_keys                              | Array   | No       | -           
  |
@@ -69,6 +70,10 @@ The URL of the JDBC connection. Refer to a case: 
jdbc:postgresql://localhost/tes
 
 Use this sql write upstream input datas to database. e.g `INSERT ...`
 
+### compatible_mode [string]
+
+The compatible mode of database, required when the database supports multiple 
compatible modes. For example, when using OceanBase database, you need to set 
it to 'mysql' or 'oracle'.
+
 ### database [string]
 
 Use this `database` and `table-name` auto-generate sql and receive upstream 
input datas write to database.
@@ -168,6 +173,7 @@ there are some reference value for params above.
 | Redshift   | com.amazon.redshift.jdbc42.Driver            | 
jdbc:redshift://localhost:5439/testdb                              | 
com.amazon.redshift.xa.RedshiftXADataSource        | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                            |
 | Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc:snowflake://<account_name>.snowflakecomputing.com             | /          
                                        | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                            |
 | Vertica    | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
             |
+| OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
              |
 
 ## Example
 
diff --git a/docs/en/connector-v2/sink/OceanBase.md 
b/docs/en/connector-v2/sink/OceanBase.md
new file mode 100644
index 000000000..ec87ce3d3
--- /dev/null
+++ b/docs/en/connector-v2/sink/OceanBase.md
@@ -0,0 +1,186 @@
+# OceanBase
+
+> JDBC OceanBase Sink Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## Description
+
+Write data through jdbc. Support Batch mode and Streaming mode, support 
concurrent writing, support exactly-once semantics.
+
+## Supported DataSource Info
+
+| Datasource |       Supported versions       |          Driver           |    
             Url                  |                                     Maven   
                                  |
+|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------|
+| OceanBase  | All OceanBase server versions. | com.oceanbase.jdbc.Driver | 
jdbc:oceanbase://localhost:2883/test | 
[Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) |
+
+## Database Dependency
+
+> Please download the support list corresponding to 'Maven' and copy it to the 
'$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp oceanbase-client-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+### Mysql Mode
+
+|                                                          Mysql Data type     
                                                     |                          
                                       SeaTunnel Data type                      
                                           |
+|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| BIT(1)<br/>INT UNSIGNED                                                      
                                                     | BOOLEAN                  
                                                                                
                                           |
+| TINYINT<br/>TINYINT UNSIGNED<br/>SMALLINT<br/>SMALLINT 
UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR | 
INT                                                                             
                                                                    |
+| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT                                 
                                                     | BIGINT                   
                                                                                
                                           |
+| BIGINT UNSIGNED                                                              
                                                     | DECIMAL(20,0)            
                                                                                
                                           |
+| DECIMAL(x,y)(Get the designated column's specified column size.<38)          
                                                     | DECIMAL(x,y)             
                                                                                
                                           |
+| DECIMAL(x,y)(Get the designated column's specified column size.>38)          
                                                     | DECIMAL(38,18)           
                                                                                
                                           |
+| DECIMAL UNSIGNED                                                             
                                                     | DECIMAL((Get the 
designated column's specified column size)+1,<br/>(Gets the designated column's 
number of digits to right of the decimal point.))) |
+| FLOAT<br/>FLOAT UNSIGNED                                                     
                                                     | FLOAT                    
                                                                                
                                           |
+| DOUBLE<br/>DOUBLE UNSIGNED                                                   
                                                     | DOUBLE                   
                                                                                
                                           |
+| CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>JSON  
                                                     | STRING                   
                                                                                
                                           |
+| DATE                                                                         
                                                     | DATE                     
                                                                                
                                           |
+| TIME                                                                         
                                                     | TIME                     
                                                                                
                                           |
+| DATETIME<br/>TIMESTAMP                                                       
                                                     | TIMESTAMP                
                                                                                
                                           |
+| 
TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>BINARY<br/>VARBINAR<br/>BIT(n)
                                                  | BYTES                       
                                                                                
                                        |
+| GEOMETRY<br/>UNKNOWN                                                         
                                                     | Not supported yet        
                                                                                
                                           |
+
+### Oracle Mode
+
+|                     Oracle Data type                      | SeaTunnel Data 
type |
+|-----------------------------------------------------------|---------------------|
+| Number(p), p <= 9                                         | INT              
   |
+| Number(p), p <= 18                                        | BIGINT           
   |
+| Number(p), p > 18                                         | DECIMAL(38,18)   
   |
+| REAL<br/> BINARY_FLOAT                                    | FLOAT            
   |
+| BINARY_DOUBLE                                             | DOUBLE           
   |
+| CHAR<br/>NCHAR<br/>NVARCHAR2<br/>NCLOB<br/>CLOB<br/>ROWID | STRING           
   |
+| DATE                                                      | DATE             
   |
+| TIMESTAMP<br/>TIMESTAMP WITH LOCAL TIME ZONE              | TIMESTAMP        
   |
+| BLOB<br/>RAW<br/>LONG RAW<br/>BFILE                       | BYTES            
   |
+| UNKNOWN                                                   | Not supported 
yet   |
+
+## Sink Options
+
+|                   Name                    |  Type   | Required | Default |   
                                                                                
                              Description                                       
                                                                           |
+|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                                       | String  | Yes      | -       | 
The URL of the JDBC connection. Refer to a case: 
jdbc:oceanbase://localhost:2883/test                                            
                                                                                
                            |
+| driver                                    | String  | Yes      | -       | 
The jdbc class name used to connect to the remote data source, should be 
`com.oceanbase.jdbc.Driver`.                                                    
                                                                                
    |
+| user                                      | String  | No       | -       | 
Connection instance user name                                                   
                                                                                
                                                                             |
+| password                                  | String  | No       | -       | 
Connection instance password                                                    
                                                                                
                                                                             |
+| query                                     | String  | No       | -       | 
Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` 
have the higher priority                                                        
                                                                               |
+| compatible_mode                           | String  | Yes      | -       | 
The compatible mode of OceanBase, can be 'mysql' or 'oracle'.                   
                                                                                
                                                                             |
+| database                                  | String  | No       | -       | 
Use this `database` and `table-name` auto-generate sql and receive upstream 
input datas write to database.<br/>This option is mutually exclusive with 
`query` and has a higher priority.                                              
       |
+| table                                     | String  | No       | -       | 
Use database and this table-name auto-generate sql and receive upstream input 
datas write to database.<br/>This option is mutually exclusive with `query` and 
has a higher priority.                                                         |
+| primary_keys                              | Array   | No       | -       | 
This option is used to support operations such as `insert`, `delete`, and 
`update` when automatically generate sql.                                       
                                                                                
   |
+| support_upsert_by_query_primary_key_exist | Boolean | No       | false   | 
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax. **Note**: that this method has low 
performance |
+| connection_check_timeout_sec              | Int     | No       | 30      | 
The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
 |
+| max_retries                               | Int     | No       | 0       | 
The number of retries to submit failed (executeBatch)                           
                                                                                
                                                                             |
+| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be 
flushed into the database                                                       
    |
+| batch_interval_ms                         | Int     | No       | 1000    | 
For batch writing, when the number of buffers reaches the number of 
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed 
into the database                                                               
          |
+| generate_sink_sql                         | Boolean | No       | false   | 
Generate sql statements based on the database table you want to write to        
                                                                                
                                                                             |
+| max_commit_attempts                       | Int     | No       | 3       | 
The number of retries for transaction commit failures                           
                                                                                
                                                                             |
+| transaction_timeout_sec                   | Int     | No       | -1      | 
The timeout after the transaction is opened, the default is -1 (never timeout). 
Note that setting the timeout may affect<br/>exactly-once semantics             
                                                                             |
+| auto_commit                               | Boolean | No       | true    | 
Automatic transaction commit is enabled by default                              
                                                                                
                                                                             |
+| common-options                            |         | no       | -       | 
Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details                                         
                                                                                
                 |
+
+### Tips
+
+> If partition_column is not set, it will run in single concurrency, and if 
partition_column is set, it will be executed  in parallel according to the 
concurrency of tasks.
+
+## Task Example
+
+### Simple:
+
+> This example defines a SeaTunnel synchronization task that automatically 
generates data through FakeSource and sends it to JDBC Sink. FakeSource 
generates a total of 16 rows of data (row.num=16), with each row having two 
fields, name (string type) and age (int type). The final target table is 
test_table will also be 16 rows of data in the table. Before run this job, you 
need create database test and table test_table in your mysql. And if you have 
not yet installed and deployed SeaTunne [...]
+
+```
+# Defining the runtime environment
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  FakeSource {
+    parallelism = 1
+    result_table_name = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/category/transform-v2
+}
+
+sink {
+    jdbc {
+        url = "jdbc:oceanbase://localhost:2883/test"
+        driver = "com.oceanbase.jdbc.Driver"
+        user = "root"
+        password = "123456"
+        compatible_mode = "mysql"
+        query = "insert into test_table(name,age) values(?,?)"
+    }
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
+```
+
+### Generate Sink SQL
+
+> This example  not need to write complex sql statements, you can configure 
the database name table name to automatically generate add statements for you
+
+```
+sink {
+    jdbc {
+        url = "jdbc:oceanbase://localhost:2883/test"
+        driver = "com.oceanbase.jdbc.Driver"
+        user = "root"
+        password = "123456"
+        compatible_mode = "mysql"
+        # Automatically generate sql statements based on database table names
+        generate_sink_sql = true
+        database = test
+        table = test_table
+    }
+}
+```
+
+### CDC(Change Data Capture) Event
+
+> CDC change data is also supported by us In this case, you need config 
database, table and primary_keys.
+
+```
+sink {
+    jdbc {
+        url = "jdbc:oceanbase://localhost:3306/test"
+        driver = "com.oceanbase.jdbc.Driver"
+        user = "root"
+        password = "123456"
+        compatible_mode = "mysql"
+        generate_sink_sql = true
+        # You need to configure both database and table
+        database = test
+        table = sink_table
+        primary_keys = ["id","name"]
+    }
+}
+```
+
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 528114754..a324316e5 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -35,6 +35,7 @@ supports query SQL and can achieve projection effect.
 | user                         | String | No       | -               |
 | password                     | String | No       | -               |
 | query                        | String | Yes      | -               |
+| compatible_mode              | String | No       | -               |
 | connection_check_timeout_sec | Int    | No       | 30              |
 | partition_column             | String | No       | -               |
 | partition_upper_bound        | Long   | No       | -               |
@@ -63,6 +64,10 @@ The URL of the JDBC connection. Refer to a case: 
jdbc:postgresql://localhost/tes
 
 Query statement
 
+### compatible_mode [string]
+
+The compatible mode of database, required when the database supports multiple 
compatible modes. For example, when using OceanBase database, you need to set 
it to 'mysql' or 'oracle'.
+
 ### connection_check_timeout_sec [int]
 
 The time in seconds to wait for the database operation used to validate the 
connection to complete.
@@ -120,6 +125,7 @@ there are some reference value for params above.
 | Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver           | 
jdbc:snowflake://<account_name>.snowflakecomputing.com                 | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                            |
 | Redshift   | com.amazon.redshift.jdbc42.Driver                   | 
jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000         | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                            |
 | Vertica    | com.vertica.jdbc.Driver                             | 
jdbc:vertica://localhost:5433                                          | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
             |
+| OceanBase  | com.oceanbase.jdbc.Driver                           | 
jdbc:oceanbase://localhost:2881                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
              |
 
 ## Example
 
diff --git a/docs/en/connector-v2/source/OceanBase.md 
b/docs/en/connector-v2/source/OceanBase.md
new file mode 100644
index 000000000..9625ef4fb
--- /dev/null
+++ b/docs/en/connector-v2/source/OceanBase.md
@@ -0,0 +1,168 @@
+# OceanBase
+
+> JDBC OceanBase Source Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Description
+
+Read external data source data through JDBC.
+
+## Supported DataSource Info
+
+| Datasource |       Supported versions       |          Driver           |    
             Url                  |                                     Maven   
                                  |
+|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------|
+| OceanBase  | All OceanBase server versions. | com.oceanbase.jdbc.Driver | 
jdbc:oceanbase://localhost:2883/test | 
[Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) |
+
+## Database Dependency
+
+> Please download the support list corresponding to 'Maven' and copy it to the 
'$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp oceanbase-client-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+### Mysql Mode
+
+|                                                          Mysql Data type     
                                                     |                          
                                       SeaTunnel Data type                      
                                           |
+|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| BIT(1)<br/>INT UNSIGNED                                                      
                                                     | BOOLEAN                  
                                                                                
                                           |
+| TINYINT<br/>TINYINT UNSIGNED<br/>SMALLINT<br/>SMALLINT 
UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR | 
INT                                                                             
                                                                    |
+| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT                                 
                                                     | BIGINT                   
                                                                                
                                           |
+| BIGINT UNSIGNED                                                              
                                                     | DECIMAL(20,0)            
                                                                                
                                           |
+| DECIMAL(x,y)(Get the designated column's specified column size.<38)          
                                                     | DECIMAL(x,y)             
                                                                                
                                           |
+| DECIMAL(x,y)(Get the designated column's specified column size.>38)          
                                                     | DECIMAL(38,18)           
                                                                                
                                           |
+| DECIMAL UNSIGNED                                                             
                                                     | DECIMAL((Get the 
designated column's specified column size)+1,<br/>(Gets the designated column's 
number of digits to right of the decimal point.))) |
+| FLOAT<br/>FLOAT UNSIGNED                                                     
                                                     | FLOAT                    
                                                                                
                                           |
+| DOUBLE<br/>DOUBLE UNSIGNED                                                   
                                                     | DOUBLE                   
                                                                                
                                           |
+| CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>JSON  
                                                     | STRING                   
                                                                                
                                           |
+| DATE                                                                         
                                                     | DATE                     
                                                                                
                                           |
+| TIME                                                                         
                                                     | TIME                     
                                                                                
                                           |
+| DATETIME<br/>TIMESTAMP                                                       
                                                     | TIMESTAMP                
                                                                                
                                           |
+| 
TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>BINARY<br/>VARBINAR<br/>BIT(n)
                                                  | BYTES                       
                                                                                
                                        |
+| GEOMETRY<br/>UNKNOWN                                                         
                                                     | Not supported yet        
                                                                                
                                           |
+
+### Oracle Mode
+
+|                     Oracle Data type                      | SeaTunnel Data 
type |
+|-----------------------------------------------------------|---------------------|
+| Number(p), p <= 9                                         | INT              
   |
+| Number(p), p <= 18                                        | BIGINT           
   |
+| Number(p), p > 18                                         | DECIMAL(38,18)   
   |
+| REAL<br/> BINARY_FLOAT                                    | FLOAT            
   |
+| BINARY_DOUBLE                                             | DOUBLE           
   |
+| CHAR<br/>NCHAR<br/>NVARCHAR2<br/>NCLOB<br/>CLOB<br/>ROWID | STRING           
   |
+| DATE                                                      | DATE             
   |
+| TIMESTAMP<br/>TIMESTAMP WITH LOCAL TIME ZONE              | TIMESTAMP        
   |
+| BLOB<br/>RAW<br/>LONG RAW<br/>BFILE                       | BYTES            
   |
+| UNKNOWN                                                   | Not supported 
yet   |
+
+## Source Options
+
+|             Name             |  Type  | Required |     Default     |         
                                                                                
                                     Description                                
                                                                                
              |
+|------------------------------|--------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                          | String | Yes      | -               | The URL 
of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test   
                                                                                
                                                                                
              |
+| driver                       | String | Yes      | -               | The 
jdbc class name used to connect to the remote data source, should be 
`com.oceanbase.jdbc.Driver`.                                                    
                                                                                
                             |
+| user                         | String | No       | -               | 
Connection instance user name                                                   
                                                                                
                                                                                
                      |
+| password                     | String | No       | -               | 
Connection instance password                                                    
                                                                                
                                                                                
                      |
+| compatible_mode              | String | Yes      | -               | The 
compatible mode of OceanBase, can be 'mysql' or 'oracle'.                       
                                                                                
                                                                                
                  |
+| query                        | String | Yes      | -               | Query 
statement                                                                       
                                                                                
                                                                                
                |
+| connection_check_timeout_sec | Int    | No       | 30              | The 
time in seconds to wait for the database operation used to validate the 
connection to complete                                                          
                                                                                
                          |
+| partition_column             | String | No       | -               | The 
column name for parallelism's partition, only support numeric type column and 
string type column.                                                             
                                                                                
                    |
+| partition_lower_bound        | Long   | No       | -               | The 
partition_column min value for scan, if not set SeaTunnel will query database 
get min value.                                                                  
                                                                                
                    |
+| partition_upper_bound        | Long   | No       | -               | The 
partition_column max value for scan, if not set SeaTunnel will query database 
get max value.                                                                  
                                                                                
                    |
+| partition_num                | Int    | No       | job parallelism | The 
number of partition count, only support positive integer. Default value is job 
parallelism.                                                                    
                                                                                
                   |
+| fetch_size                   | Int    | No       | 0               | For 
queries that return a large number of objects, you can configure <br/> the row 
fetch size used in the query to improve performance by <br/> reducing the 
number database hits required to satisfy the selection criteria.<br/> Zero 
means use jdbc default value. |
+| common-options               |        | No       | -               | Source 
plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                      |
+
+### Tips
+
+> If partition_column is not set, it will run in single concurrency, and if 
partition_column is set, it will be executed  in parallel according to the 
concurrency of tasks.
+
+## Task Example
+
+### Simple:
+
+```
+env {
+  execution.parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    driver = "com.oceanbase.jdbc.Driver"
+    url = 
"jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+    user = "root"
+    password = ""
+    compatible_mode = "mysql"
+    query = "select * from source"
+  }
+}
+
+transform {
+    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+    Console {}
+}
+```
+
+### Parallel:
+
+> Read your query table in parallel with the shard field you configured and 
the shard data. You can do this if you want to read the whole table
+
+```
+source {
+  Jdbc {
+    driver = "com.oceanbase.jdbc.Driver"
+    url = 
"jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+    user = "root"
+    password = ""
+    compatible_mode = "mysql"
+    query = "select * from source"
+    # Parallel sharding reads fields
+    partition_column = "id"
+    # Number of fragments
+    partition_num = 10
+  }
+}
+```
+
+### Parallel Boundary:
+
+> It is more efficient to read your data source according to the upper and 
lower boundaries you configured
+
+```
+source {
+  Jdbc {
+    driver = "com.oceanbase.jdbc.Driver"
+    url = 
"jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+    user = "root"
+    password = ""
+    compatible_mode = "mysql"
+    query = "select * from source"
+    partition_column = "id"
+    partition_num = 10
+    # Read start boundary
+    partition_lower_bound = 1
+    # Read end boundary
+    partition_upper_bound = 500
+  }
+}
+```
+
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index afceddc59..6e2147c03 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -27,6 +27,7 @@ public class JdbcConnectionConfig implements Serializable {
 
     public String url;
     public String driverName;
+    public String compatibleMode;
     public int connectionCheckTimeoutSeconds =
             JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
     public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
@@ -48,6 +49,7 @@ public class JdbcConnectionConfig implements Serializable {
     public static JdbcConnectionConfig of(ReadonlyConfig config) {
         JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder();
         builder.url(config.get(JdbcOptions.URL));
+        builder.compatibleMode(config.get(JdbcOptions.COMPATIBLE_MODE));
         builder.driverName(config.get(JdbcOptions.DRIVER));
         builder.autoCommit(config.get(JdbcOptions.AUTO_COMMIT));
         builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES));
@@ -74,6 +76,10 @@ public class JdbcConnectionConfig implements Serializable {
         return driverName;
     }
 
+    public String getCompatibleMode() {
+        return compatibleMode;
+    }
+
     public boolean isAutoCommit() {
         return autoCommit;
     }
@@ -121,6 +127,7 @@ public class JdbcConnectionConfig implements Serializable {
     public static final class Builder {
         private String url;
         private String driverName;
+        private String compatibleMode;
         private int connectionCheckTimeoutSeconds =
                 JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
         private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
@@ -146,6 +153,11 @@ public class JdbcConnectionConfig implements Serializable {
             return this;
         }
 
+        public Builder compatibleMode(String compatibleMode) {
+            this.compatibleMode = compatibleMode;
+            return this;
+        }
+
         public Builder connectionCheckTimeoutSeconds(int 
connectionCheckTimeoutSeconds) {
             this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
             return this;
@@ -206,6 +218,7 @@ public class JdbcConnectionConfig implements Serializable {
             jdbcConnectionConfig.batchSize = this.batchSize;
             jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs;
             jdbcConnectionConfig.driverName = this.driverName;
+            jdbcConnectionConfig.compatibleMode = this.compatibleMode;
             jdbcConnectionConfig.maxRetries = this.maxRetries;
             jdbcConnectionConfig.password = this.password;
             jdbcConnectionConfig.connectionCheckTimeoutSeconds = 
this.connectionCheckTimeoutSeconds;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 87b2a7b46..24ae0580f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -36,6 +36,12 @@ public interface JdbcOptions {
                     .intType()
                     .defaultValue(30)
                     .withDescription("connection check time second");
+    Option<String> COMPATIBLE_MODE =
+            Options.key("compatible_mode")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The compatible mode of database, required when 
the database supports multiple compatible modes. For example, when using 
OceanBase database, you need to set it to 'mysql' or 'oracle'.");
 
     Option<Integer> MAX_RETRIES =
             
Options.key("max_retries").intType().defaultValue(0).withDescription("max_retired");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
index 4c6221549..00130b32a 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
@@ -33,6 +33,7 @@ public class JdbcSourceConfig implements Serializable {
 
     private JdbcConnectionConfig jdbcConnectionConfig;
     public String query;
+    public String compatibleMode;
     private String partitionColumn;
     private BigDecimal partitionUpperBound;
     private BigDecimal partitionLowerBound;
@@ -44,6 +45,7 @@ public class JdbcSourceConfig implements Serializable {
         builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
         builder.query(config.get(JdbcOptions.QUERY));
         builder.fetchSize(config.get(JdbcOptions.FETCH_SIZE));
+        
config.getOptional(JdbcOptions.COMPATIBLE_MODE).ifPresent(builder::compatibleMode);
         
config.getOptional(JdbcOptions.PARTITION_COLUMN).ifPresent(builder::partitionColumn);
         config.getOptional(JdbcOptions.PARTITION_UPPER_BOUND)
                 .ifPresent(builder::partitionUpperBound);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
index 5e5ae1b55..3d66de659 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
@@ -37,4 +37,14 @@ public interface JdbcDialectFactory {
 
     /** @return Creates a new instance of the {@link JdbcDialect}. */
     JdbcDialect create();
+
+    /**
+     * Create a {@link JdbcDialect} instance based on the driver type and 
compatible mode.
+     *
+     * @param compatibleMode The compatible mode
+     * @return a new instance of {@link JdbcDialect}
+     */
+    default JdbcDialect create(String compatibleMode) {
+        return create();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
index 076a6734b..b49df35ff 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -40,11 +40,12 @@ public final class JdbcDialectLoader {
      * Loads the unique JDBC Dialect that can handle the given database url.
      *
      * @param url A database URL.
+     * @param compatibleMode The compatible mode.
      * @throws IllegalStateException if the loader cannot find exactly one 
dialect that can
      *     unambiguously process the given database URL.
      * @return The loaded dialect.
      */
-    public static JdbcDialect load(String url) {
+    public static JdbcDialect load(String url, String compatibleMode) {
         ClassLoader cl = Thread.currentThread().getContextClassLoader();
         List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
 
@@ -89,7 +90,7 @@ public final class JdbcDialectLoader {
                                     .collect(Collectors.joining("\n"))));
         }
 
-        return matchingFactories.get(0).create();
+        return matchingFactories.get(0).create(compatibleMode);
     }
 
     private static List<JdbcDialectFactory> discoverFactories(ClassLoader 
classLoader) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
new file mode 100644
index 000000000..66df84205
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+@AutoService(JdbcDialectFactory.class)
+public class OceanBaseDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:oceanbase:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        throw new UnsupportedOperationException(
+                "Can't create JdbcDialect without compatible mode for 
OceanBase");
+    }
+
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode) {
+        if ("oracle".equalsIgnoreCase(compatibleMode)) {
+            return new OracleDialect();
+        }
+        return new MysqlDialect();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 4221172b1..4666eae1e 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -104,7 +104,10 @@ public class JdbcSink
     public void prepare(Config pluginConfig) throws PrepareFailException {
         this.config = ReadonlyConfig.fromConfig(pluginConfig);
         this.jdbcSinkConfig = JdbcSinkConfig.of(config);
-        this.dialect = 
JdbcDialectLoader.load(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
+        this.dialect =
+                JdbcDialectLoader.load(
+                        jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
+                        
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode());
         this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index ae2e49b1e..a9bb1c155 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -41,6 +41,7 @@ import java.util.Optional;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.AUTO_COMMIT;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_INTERVAL_MS;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_SIZE;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER;
@@ -82,7 +83,10 @@ public class JdbcSinkFactory implements TableSinkFactory {
         }
         final ReadonlyConfig options = config;
         JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
-        JdbcDialect dialect = 
JdbcDialectLoader.load(sinkConfig.getJdbcConnectionConfig().getUrl());
+        JdbcDialect dialect =
+                JdbcDialectLoader.load(
+                        sinkConfig.getJdbcConnectionConfig().getUrl(),
+                        
sinkConfig.getJdbcConnectionConfig().getCompatibleMode());
         return () ->
                 new JdbcSink(
                         options,
@@ -106,7 +110,8 @@ public class JdbcSinkFactory implements TableSinkFactory {
                         GENERATE_SINK_SQL,
                         AUTO_COMMIT,
                         SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST,
-                        PRIMARY_KEYS)
+                        PRIMARY_KEYS,
+                        COMPATIBLE_MODE)
                 .conditional(
                         IS_EXACTLY_ONCE,
                         true,
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 39deac1ef..732892b21 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -99,7 +99,9 @@ public class JdbcSource
                 new 
SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
         this.query = jdbcSourceConfig.getQuery();
         this.jdbcDialect =
-                
JdbcDialectLoader.load(jdbcSourceConfig.getJdbcConnectionConfig().getUrl());
+                JdbcDialectLoader.load(
+                        jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
+                        
jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode());
         try (Connection connection = 
jdbcConnectionProvider.getOrEstablishConnection()) {
             this.typeInfo = initTableField(connection);
             this.partitionParameter =
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index 8f9605182..43aa1c03d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -54,6 +54,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.FETCH_SIZE;
@@ -83,7 +84,10 @@ public class JdbcSourceFactory implements TableSourceFactory 
{
         JdbcConnectionProvider connectionProvider =
                 new 
SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
         final String querySql = config.getQuery();
-        JdbcDialect dialect = 
JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl());
+        JdbcDialect dialect =
+                JdbcDialectLoader.load(
+                        config.getJdbcConnectionConfig().getUrl(),
+                        config.getJdbcConnectionConfig().getCompatibleMode());
         TableSchema tableSchema = catalogTable.getTableSchema();
         SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
         Optional<PartitionParameter> partitionParameter =
@@ -228,7 +232,8 @@ public class JdbcSourceFactory implements 
TableSourceFactory {
                         PARTITION_COLUMN,
                         PARTITION_UPPER_BOUND,
                         PARTITION_LOWER_BOUND,
-                        PARTITION_NUM)
+                        PARTITION_NUM,
+                        COMPATIBLE_MODE)
                 .build();
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
new file mode 100644
index 000000000..b8202e697
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT {
+
+    private static final String OCEANBASE_DATABASE = "seatunnel";
+    private static final String OCEANBASE_SOURCE = "source";
+    private static final String OCEANBASE_SINK = "sink";
+
+    private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" 
+ HOST + ":%s";
+    private static final String OCEANBASE_DRIVER_CLASS = 
"com.oceanbase.jdbc.Driver";
+
+    abstract String imageName();
+
+    abstract String host();
+
+    abstract int port();
+
+    abstract String username();
+
+    abstract String password();
+
+    abstract List<String> configFile();
+
+    abstract String createSqlTemplate();
+
+    abstract String[] getFieldNames();
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, port());
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE, 
fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(imageName())
+                .networkAliases(host())
+                .containerEnv(containerEnv)
+                .driverClass(OCEANBASE_DRIVER_CLASS)
+                .host(HOST)
+                .port(port())
+                .localPort(port())
+                .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
+                .jdbcUrl(jdbcUrl)
+                .userName(username())
+                .password(password())
+                .database(OCEANBASE_DATABASE)
+                .sourceTable(OCEANBASE_SOURCE)
+                .sinkTable(OCEANBASE_SINK)
+                .createSql(createSqlTemplate())
+                .configFile(configFile())
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .build();
+    }
+
+    @Override
+    void compareResult() {
+        String sourceSql =
+                String.format(
+                        "select * from %s.%s order by 1", OCEANBASE_DATABASE, 
OCEANBASE_SOURCE);
+        String sinkSql =
+                String.format("select * from %s.%s order by 1", 
OCEANBASE_DATABASE, OCEANBASE_SINK);
+        try {
+            Statement sourceStatement = connection.createStatement();
+            Statement sinkStatement = connection.createStatement();
+            ResultSet sourceResultSet = 
sourceStatement.executeQuery(sourceSql);
+            ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+            Assertions.assertEquals(
+                    sourceResultSet.getMetaData().getColumnCount(),
+                    sinkResultSet.getMetaData().getColumnCount());
+            while (sourceResultSet.next()) {
+                if (sinkResultSet.next()) {
+                    for (String column : getFieldNames()) {
+                        Object source = sourceResultSet.getObject(column);
+                        Object sink = sinkResultSet.getObject(column);
+                        if (!Objects.deepEquals(source, sink)) {
+                            InputStream sourceAsciiStream = 
sourceResultSet.getBinaryStream(column);
+                            InputStream sinkAsciiStream = 
sinkResultSet.getBinaryStream(column);
+                            String sourceValue =
+                                    IOUtils.toString(sourceAsciiStream, 
StandardCharsets.UTF_8);
+                            String sinkValue =
+                                    IOUtils.toString(sinkAsciiStream, 
StandardCharsets.UTF_8);
+                            Assertions.assertEquals(sourceValue, sinkValue);
+                        }
+                    }
+                }
+            }
+            sourceResultSet.last();
+            sinkResultSet.last();
+        } catch (Exception e) {
+            throw new RuntimeException("Compare result error", e);
+        }
+    }
+
+    @Override
+    String driverUrl() {
+        return 
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar";;
+    }
+
+    @Override
+    protected void createSchemaIfNeeded() {
+        String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
+        try {
+            connection.prepareStatement(sql).executeUpdate();
+        } catch (Exception e) {
+            throw new SeaTunnelRuntimeException(
+                    JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql 
" + sql, e);
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
new file mode 100644
index 000000000..548fecaee
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Disabled;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+@Disabled("Disabled due to insufficient hardware resources in the CI 
environment")
+public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase {
+
+    @Override
+    String imageName() {
+        return "oceanbase/oceanbase-ce:4.0.0.0";
+    }
+
+    @Override
+    String host() {
+        return "e2e_oceanbase_mysql";
+    }
+
+    @Override
+    int port() {
+        return 2881;
+    }
+
+    @Override
+    String username() {
+        return "root";
+    }
+
+    @Override
+    String password() {
+        return "";
+    }
+
+    @Override
+    List<String> configFile() {
+        return 
Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");
+    }
+
+    @Override
+    String createSqlTemplate() {
+        return "CREATE TABLE IF NOT EXISTS %s\n"
+                + "(\n"
+                + "    `c_bit_1`                bit(1)                DEFAULT 
NULL,\n"
+                + "    `c_bit_8`                bit(8)                DEFAULT 
NULL,\n"
+                + "    `c_bit_16`               bit(16)               DEFAULT 
NULL,\n"
+                + "    `c_bit_32`               bit(32)               DEFAULT 
NULL,\n"
+                + "    `c_bit_64`               bit(64)               DEFAULT 
NULL,\n"
+                + "    `c_boolean`              tinyint(1)            DEFAULT 
NULL,\n"
+                + "    `c_tinyint`              tinyint(4)            DEFAULT 
NULL,\n"
+                + "    `c_tinyint_unsigned`     tinyint(3) unsigned   DEFAULT 
NULL,\n"
+                + "    `c_smallint`             smallint(6)           DEFAULT 
NULL,\n"
+                + "    `c_smallint_unsigned`    smallint(5) unsigned  DEFAULT 
NULL,\n"
+                + "    `c_mediumint`            mediumint(9)          DEFAULT 
NULL,\n"
+                + "    `c_mediumint_unsigned`   mediumint(8) unsigned DEFAULT 
NULL,\n"
+                + "    `c_int`                  int(11)               DEFAULT 
NULL,\n"
+                + "    `c_integer`              int(11)               DEFAULT 
NULL,\n"
+                + "    `c_bigint`               bigint(20)            DEFAULT 
NULL,\n"
+                + "    `c_bigint_unsigned`      bigint(20) unsigned   DEFAULT 
NULL,\n"
+                + "    `c_decimal`              decimal(20, 0)        DEFAULT 
NULL,\n"
+                + "    `c_decimal_unsigned`     decimal(38, 18)       DEFAULT 
NULL,\n"
+                + "    `c_float`                float                 DEFAULT 
NULL,\n"
+                + "    `c_float_unsigned`       float unsigned        DEFAULT 
NULL,\n"
+                + "    `c_double`               double                DEFAULT 
NULL,\n"
+                + "    `c_double_unsigned`      double unsigned       DEFAULT 
NULL,\n"
+                + "    `c_char`                 char(1)               DEFAULT 
NULL,\n"
+                + "    `c_tinytext`             tinytext,\n"
+                + "    `c_mediumtext`           mediumtext,\n"
+                + "    `c_text`                 text,\n"
+                + "    `c_varchar`              varchar(255)          DEFAULT 
NULL,\n"
+                + "    `c_json`                 json                  DEFAULT 
NULL,\n"
+                + "    `c_longtext`             longtext,\n"
+                + "    `c_date`                 date                  DEFAULT 
NULL,\n"
+                + "    `c_datetime`             datetime              DEFAULT 
NULL,\n"
+                + "    `c_timestamp`            timestamp NULL        DEFAULT 
NULL,\n"
+                + "    `c_tinyblob`             tinyblob,\n"
+                + "    `c_mediumblob`           mediumblob,\n"
+                + "    `c_blob`                 blob,\n"
+                + "    `c_longblob`             longblob,\n"
+                + "    `c_varbinary`            varbinary(255)        DEFAULT 
NULL,\n"
+                + "    `c_binary`               binary(1)             DEFAULT 
NULL,\n"
+                + "    `c_year`                 year(4)               DEFAULT 
NULL,\n"
+                + "    `c_int_unsigned`         int(10) unsigned      DEFAULT 
NULL,\n"
+                + "    `c_integer_unsigned`     int(10) unsigned      DEFAULT 
NULL,\n"
+                + "    `c_bigint_30`            BIGINT(40)  unsigned  DEFAULT 
NULL,\n"
+                + "    `c_decimal_unsigned_30`  DECIMAL(30) unsigned  DEFAULT 
NULL,\n"
+                + "    `c_decimal_30`           DECIMAL(30)           DEFAULT 
NULL\n"
+                + ");";
+    }
+
+    @Override
+    String[] getFieldNames() {
+        return new String[] {
+            "c_bit_1",
+            "c_bit_8",
+            "c_bit_16",
+            "c_bit_32",
+            "c_bit_64",
+            "c_boolean",
+            "c_tinyint",
+            "c_tinyint_unsigned",
+            "c_smallint",
+            "c_smallint_unsigned",
+            "c_mediumint",
+            "c_mediumint_unsigned",
+            "c_int",
+            "c_integer",
+            "c_year",
+            "c_int_unsigned",
+            "c_integer_unsigned",
+            "c_bigint",
+            "c_bigint_unsigned",
+            "c_decimal",
+            "c_decimal_unsigned",
+            "c_float",
+            "c_float_unsigned",
+            "c_double",
+            "c_double_unsigned",
+            "c_char",
+            "c_tinytext",
+            "c_mediumtext",
+            "c_text",
+            "c_varchar",
+            "c_json",
+            "c_longtext",
+            "c_date",
+            "c_datetime",
+            "c_timestamp",
+            "c_tinyblob",
+            "c_mediumblob",
+            "c_blob",
+            "c_longblob",
+            "c_varbinary",
+            "c_binary",
+            "c_bigint_30",
+            "c_decimal_unsigned_30",
+            "c_decimal_30",
+        };
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames = getFieldNames();
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        BigDecimal bigintValue = new BigDecimal("2844674407371055000");
+        BigDecimal decimalValue = new 
BigDecimal("999999999999999999999999999899");
+        for (int i = 0; i < 100; i++) {
+            byte byteArr = Integer.valueOf(i).byteValue();
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i % 2 == 0 ? (byte) 1 : (byte) 0,
+                                new byte[] {byteArr},
+                                new byte[] {byteArr, byteArr},
+                                new byte[] {byteArr, byteArr, byteArr, 
byteArr},
+                                new byte[] {
+                                    byteArr, byteArr, byteArr, byteArr, 
byteArr, byteArr, byteArr,
+                                    byteArr
+                                },
+                                i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                i,
+                                Long.parseLong("1"),
+                                Long.parseLong("1"),
+                                Long.parseLong("1"),
+                                BigDecimal.valueOf(i, 0),
+                                BigDecimal.valueOf(i, 18),
+                                BigDecimal.valueOf(i, 18),
+                                Float.parseFloat("1.1"),
+                                Float.parseFloat("1.1"),
+                                Double.parseDouble("1.1"),
+                                Double.parseDouble("1.1"),
+                                "f",
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("f1_%s", i),
+                                String.format("{\"aa\":\"bb_%s\"}", i),
+                                String.format("f1_%s", i),
+                                Date.valueOf(LocalDate.now()),
+                                Timestamp.valueOf(LocalDateTime.now()),
+                                new Timestamp(System.currentTimeMillis()),
+                                "test".getBytes(),
+                                "test".getBytes(),
+                                "test".getBytes(),
+                                "test".getBytes(),
+                                "test".getBytes(),
+                                "f".getBytes(),
+                                bigintValue.add(BigDecimal.valueOf(i)),
+                                decimalValue.add(BigDecimal.valueOf(i)),
+                                decimalValue.add(BigDecimal.valueOf(i)),
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    @Override
+    GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(imageName())
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(host())
+                        .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
+                        .withStartupTimeout(Duration.ofMinutes(5))
+                        .withLogConsumer(
+                                new 
Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName())));
+
+        container.setPortBindings(Lists.newArrayList(String.format("%s:%s", 
port(), port())));
+
+        return container;
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
new file mode 100644
index 000000000..4c3cca5dd
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Disabled;
+import org.testcontainers.containers.GenericContainer;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@Disabled("Oracle mode of OceanBase Enterprise Edition does not provide docker 
environment")
+public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
+
+    @Override
+    String imageName() {
+        return null;
+    }
+
+    @Override
+    String host() {
+        return "e2e_oceanbase_oracle";
+    }
+
+    @Override
+    int port() {
+        return 2883;
+    }
+
+    @Override
+    String username() {
+        return "root";
+    }
+
+    @Override
+    String password() {
+        return "";
+    }
+
+    @Override
+    List<String> configFile() {
+        return 
Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf");
+    }
+
+    @Override
+    GenericContainer<?> initContainer() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void startUp() {
+        jdbcCase = getJdbcCase();
+
+        given().ignoreExceptions()
+                .await()
+                .atMost(360, TimeUnit.SECONDS)
+                .untilAsserted(() -> 
this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+
+        createSchemaIfNeeded();
+        createNeededTables();
+        insertTestData();
+    }
+
+    @Override
+    public String quoteIdentifier(String field) {
+        return "\"" + field + "\"";
+    }
+
+    @Override
+    String createSqlTemplate() {
+        return "create table %s\n"
+                + "(\n"
+                + "    VARCHAR_10_COL                varchar2(10),\n"
+                + "    CHAR_10_COL                   char(10),\n"
+                + "    CLOB_COL                      clob,\n"
+                + "    NUMBER_3_SF_2_DP              number(3, 2),\n"
+                + "    INTEGER_COL                   integer,\n"
+                + "    FLOAT_COL                     float(10),\n"
+                + "    REAL_COL                      real,\n"
+                + "    BINARY_FLOAT_COL              binary_float,\n"
+                + "    BINARY_DOUBLE_COL             binary_double,\n"
+                + "    DATE_COL                      date,\n"
+                + "    TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3),\n"
+                + "    TIMESTAMP_WITH_LOCAL_TZ       timestamp with local time 
zone\n"
+                + ")";
+    }
+
+    @Override
+    String[] getFieldNames() {
+        return new String[] {
+            "VARCHAR_10_COL",
+            "CHAR_10_COL",
+            "CLOB_COL",
+            "NUMBER_3_SF_2_DP",
+            "INTEGER_COL",
+            "FLOAT_COL",
+            "REAL_COL",
+            "BINARY_FLOAT_COL",
+            "BINARY_DOUBLE_COL",
+            "DATE_COL",
+            "TIMESTAMP_WITH_3_FRAC_SEC_COL",
+            "TIMESTAMP_WITH_LOCAL_TZ"
+        };
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames = getFieldNames();
+
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                String.format("f%s", i),
+                                String.format("f%s", i),
+                                String.format("f%s", i),
+                                BigDecimal.valueOf(1.1),
+                                i,
+                                Float.parseFloat("2.2"),
+                                Float.parseFloat("2.2"),
+                                Float.parseFloat("22.2"),
+                                Double.parseDouble("2.2"),
+                                Date.valueOf(LocalDate.now()),
+                                Timestamp.valueOf(LocalDateTime.now()),
+                                Timestamp.valueOf(LocalDateTime.now())
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
new file mode 100644
index 000000000..098d3ffae
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+  Jdbc {
+    driver = com.oceanbase.jdbc.Driver
+    url = 
"jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC"
+    user = root
+    password = ""
+    query = "SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, 
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, 
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, 
c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, 
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, 
c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, 
c_varbinary, c_binary, c_yea [...]
+    compatible_mode = "mysql"
+  }
+
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to 
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+sink {
+  Jdbc {
+    driver = com.oceanbase.jdbc.Driver
+    url = 
"jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC"
+    user = root
+    password = ""
+    query = "insert into sink(c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, 
c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, 
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, 
c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, 
c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, 
c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, 
c_mediumblob, c_blob, c_longblob, c_varbinary, c_bin [...]
+    compatible_mode = "mysql"
+  }
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
new file mode 100644
index 000000000..bf2b1ccf0
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+  #execution.checkpoint.interval = 10000
+  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+  jdbc{
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+    url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel"
+    driver = com.oceanbase.jdbc.Driver
+    user = "root"
+    password = ""
+    query = "SELECT 
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ
 FROM source"
+    compatible_mode = "oracle"
+  }
+}
+
+transform {
+}
+
+sink {
+  jdbc{
+    url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel"
+    driver = com.oceanbase.jdbc.Driver
+    user = "root"
+    password = ""
+    query = "INSERT INTO sink 
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ)
 VALUES(?,?,?,?,?,?,?,?,?,?,?,?)"
+    compatible_mode = "oracle"
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
index b1ea69efa..092f37f9b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
@@ -56,6 +56,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -113,6 +114,7 @@ public class PulsarBatchIT extends TestSuiteBase implements 
TestResource {
                 new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
                         .withNetwork(NETWORK)
                         .withNetworkAliases(PULSAR_HOST)
+                        .withStartupTimeout(Duration.ofMinutes(3))
                         .withLogConsumer(
                                 new Slf4jLogConsumer(
                                         
DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));


Reply via email to