This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 7ba11cecd [Feature][Connector-V2][Jdbc] Add oceanbase dialect factory (#4989) 7ba11cecd is described below commit 7ba11cecdf7ac70c65ff5df501bbce8b499967c9 Author: He Wang <wanghe...@qq.com> AuthorDate: Tue Jul 11 17:34:02 2023 +0800 [Feature][Connector-V2][Jdbc] Add oceanbase dialect factory (#4989) --------- Co-authored-by: silenceland <silencelan...@163.com> Co-authored-by: changhuyan <877018...@qq.com> --- docs/en/connector-v2/sink/Jdbc.md | 6 + docs/en/connector-v2/sink/OceanBase.md | 186 +++++++++++++++ docs/en/connector-v2/source/Jdbc.md | 6 + docs/en/connector-v2/source/OceanBase.md | 168 ++++++++++++++ .../jdbc/config/JdbcConnectionConfig.java | 13 ++ .../seatunnel/jdbc/config/JdbcOptions.java | 6 + .../seatunnel/jdbc/config/JdbcSourceConfig.java | 2 + .../jdbc/internal/dialect/JdbcDialectFactory.java | 10 + .../jdbc/internal/dialect/JdbcDialectLoader.java | 5 +- .../dialect/oceanbase/OceanBaseDialectFactory.java | 49 ++++ .../connectors/seatunnel/jdbc/sink/JdbcSink.java | 5 +- .../seatunnel/jdbc/sink/JdbcSinkFactory.java | 9 +- .../seatunnel/jdbc/source/JdbcSource.java | 4 +- .../seatunnel/jdbc/source/JdbcSourceFactory.java | 9 +- .../seatunnel/jdbc/JdbcOceanBaseITBase.java | 147 ++++++++++++ .../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java | 256 +++++++++++++++++++++ .../seatunnel/jdbc/JdbcOceanBaseOracleIT.java | 161 +++++++++++++ .../jdbc_oceanbase_mysql_source_and_sink.conf | 55 +++++ .../jdbc_oceanbase_oracle_source_and_sink.conf | 53 +++++ .../e2e/connector/pulsar/PulsarBatchIT.java | 2 + 20 files changed, 1144 insertions(+), 8 deletions(-) diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index d472d9a33..f128f6b4b 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -33,6 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. 
| user | String | No | - | | password | String | No | - | | query | String | No | - | +| compatible_mode | String | No | - | | database | String | No | - | | table | String | No | - | | primary_keys | Array | No | - | @@ -69,6 +70,10 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes Use this sql write upstream input datas to database. e.g `INSERT ...` +### compatible_mode [string] + +The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. + ### database [string] Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database. @@ -168,6 +173,7 @@ there are some reference value for params above. | Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | | Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://<account_name>.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | | Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | ## Example diff --git a/docs/en/connector-v2/sink/OceanBase.md b/docs/en/connector-v2/sink/OceanBase.md new file mode 100644 index 000000000..ec87ce3d3 --- /dev/null +++ b/docs/en/connector-v2/sink/OceanBase.md @@ -0,0 +1,186 @@ +# OceanBase + +> JDBC OceanBase Sink Connector + +## Support Those Engines + +> Spark<br/> +> Flink<br/> +> SeaTunnel Zeta<br/> + +## Key Features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] 
[cdc](../../concept/connector-v2-features.md) + +## Description + +Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once semantics. + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------| +| OceanBase | All OceanBase server versions. | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2883/test | [Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/> +> For example: cp oceanbase-client-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +### Mysql Mode + +| Mysql Data type | SeaTunnel Data type | +|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| BIT(1)<br/>INT UNSIGNED | BOOLEAN | +| TINYINT<br/>TINYINT UNSIGNED<br/>SMALLINT<br/>SMALLINT UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR | INT | +| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT | BIGINT | +| BIGINT UNSIGNED | DECIMAL(20,0) | +| DECIMAL(x,y)(Get the designated column's specified column size.<38) | DECIMAL(x,y) | +| DECIMAL(x,y)(Get the designated column's specified column size.>38) | DECIMAL(38,18) | +| DECIMAL UNSIGNED | DECIMAL((Get the designated column's specified column size)+1,<br/>(Gets the designated column's number of digits to right of the decimal point.))) | +| FLOAT<br/>FLOAT UNSIGNED | FLOAT | +| DOUBLE<br/>DOUBLE UNSIGNED | DOUBLE | 
+| CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>JSON | STRING | +| DATE | DATE | +| TIME | TIME | +| DATETIME<br/>TIMESTAMP | TIMESTAMP | +| TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>BINARY<br/>VARBINARY<br/>BIT(n) | BYTES | +| GEOMETRY<br/>UNKNOWN | Not supported yet | + +### Oracle Mode + +| Oracle Data type | SeaTunnel Data type | +|-----------------------------------------------------------|---------------------| +| Number(p), p <= 9 | INT | +| Number(p), p <= 18 | BIGINT | +| Number(p), p > 18 | DECIMAL(38,18) | +| REAL<br/> BINARY_FLOAT | FLOAT | +| BINARY_DOUBLE | DOUBLE | +| CHAR<br/>NCHAR<br/>NVARCHAR2<br/>NCLOB<br/>CLOB<br/>ROWID | STRING | +| DATE | DATE | +| TIMESTAMP<br/>TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP | +| BLOB<br/>RAW<br/>LONG RAW<br/>BFILE | BYTES | +| UNKNOWN | Not supported yet | + +## Sink Options + +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.oceanbase.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| compatible_mode | String | Yes | - | The compatible mode of OceanBase, can be 'mysql' or 'oracle'. 
| +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.<br/>This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.<br/>This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be flushed into the database | +| batch_interval_ms | Int | No | 1000 | For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). 
Note that setting the timeout may affect<br/>exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | + +### Tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to JDBC Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having two fields, name (string type) and age (int type). The final target table, test_table, will also contain 16 rows of data. Before running this job, you need to create database test and table test_table in your OceanBase database. And if you have not yet installed and deployed SeaTunne [...] 
+ +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is an example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + jdbc { + url = "jdbc:oceanbase://localhost:2883/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + query = "insert into test_table(name,age) values(?,?)" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} +``` + +### Generate Sink SQL + +> This example does not require you to write complex SQL statements; you can configure the database name and table name to automatically generate the insert statements for you + +``` +sink { + jdbc { + url = "jdbc:oceanbase://localhost:2883/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + # Automatically generate sql statements based on database table names + generate_sink_sql = true + database = test + table = test_table + } +} +``` + +### CDC(Change Data Capture) Event + +> CDC change data is also supported. In this case, you need to configure database, table and primary_keys. 
+ +``` +sink { + jdbc { + url = "jdbc:oceanbase://localhost:3306/test" + driver = "com.oceanbase.jdbc.Driver" + user = "root" + password = "123456" + compatible_mode = "mysql" + generate_sink_sql = true + # You need to configure both database and table + database = test + table = sink_table + primary_keys = ["id","name"] + } +} +``` + diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 528114754..a324316e5 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -35,6 +35,7 @@ supports query SQL and can achieve projection effect. | user | String | No | - | | password | String | No | - | | query | String | Yes | - | +| compatible_mode | String | No | - | | connection_check_timeout_sec | Int | No | 30 | | partition_column | String | No | - | | partition_upper_bound | Long | No | - | @@ -63,6 +64,10 @@ The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/tes Query statement +### compatible_mode [string] + +The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'. + ### connection_check_timeout_sec [int] The time in seconds to wait for the database operation used to validate the connection to complete. @@ -120,6 +125,7 @@ there are some reference value for params above. 
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://<account_name>.snowflakecomputing.com | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | | Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | | Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | ## Example diff --git a/docs/en/connector-v2/source/OceanBase.md b/docs/en/connector-v2/source/OceanBase.md new file mode 100644 index 000000000..9625ef4fb --- /dev/null +++ b/docs/en/connector-v2/source/OceanBase.md @@ -0,0 +1,168 @@ +# OceanBase + +> JDBC OceanBase Source Connector + +## Support Those Engines + +> Spark<br/> +> Flink<br/> +> SeaTunnel Zeta<br/> + +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [x] [support user-defined split](../../concept/connector-v2-features.md) + +## Description + +Read external data source data through JDBC. + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------| +| OceanBase | All OceanBase server versions. 
| com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2883/test | [Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/> +> For example: cp oceanbase-client-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +### Mysql Mode + +| Mysql Data type | SeaTunnel Data type | +|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------| +| BIT(1)<br/>INT UNSIGNED | BOOLEAN | +| TINYINT<br/>TINYINT UNSIGNED<br/>SMALLINT<br/>SMALLINT UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR | INT | +| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT | BIGINT | +| BIGINT UNSIGNED | DECIMAL(20,0) | +| DECIMAL(x,y)(Get the designated column's specified column size.<38) | DECIMAL(x,y) | +| DECIMAL(x,y)(Get the designated column's specified column size.>38) | DECIMAL(38,18) | +| DECIMAL UNSIGNED | DECIMAL((Get the designated column's specified column size)+1,<br/>(Gets the designated column's number of digits to right of the decimal point.))) | +| FLOAT<br/>FLOAT UNSIGNED | FLOAT | +| DOUBLE<br/>DOUBLE UNSIGNED | DOUBLE | +| CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>JSON | STRING | +| DATE | DATE | +| TIME | TIME | +| DATETIME<br/>TIMESTAMP | TIMESTAMP | +| TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>BINARY<br/>VARBINARY<br/>BIT(n) | BYTES | +| GEOMETRY<br/>UNKNOWN | Not supported yet | + +### Oracle Mode + +| Oracle Data type | SeaTunnel Data type | +|-----------------------------------------------------------|---------------------| +| Number(p), p <= 9 | INT | +| Number(p), p <= 18 | BIGINT | +| 
Number(p), p > 18 | DECIMAL(38,18) | +| REAL<br/> BINARY_FLOAT | FLOAT | +| BINARY_DOUBLE | DOUBLE | +| CHAR<br/>NCHAR<br/>NVARCHAR2<br/>NCLOB<br/>CLOB<br/>ROWID | STRING | +| DATE | DATE | +| TIMESTAMP<br/>TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP | +| BLOB<br/>RAW<br/>LONG RAW<br/>BFILE | BYTES | +| UNKNOWN | Not supported yet | + +## Source Options + +| Name | Type | Required | Default | Description | +|------------------------------|--------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.oceanbase.jdbc.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| compatible_mode | String | Yes | - | The compatible mode of OceanBase, can be 'mysql' or 'oracle'. | +| query | String | Yes | - | Query statement | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete | +| partition_column | String | No | - | The column name for parallelism's partition, only support numeric type column and string type column. | +| partition_lower_bound | Long | No | - | The partition_column min value for scan, if not set SeaTunnel will query database get min value. | +| partition_upper_bound | Long | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | +| partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. Default value is job parallelism. 
| +| fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure <br/> the row fetch size used in the query to improve performance by <br/> reducing the number database hits required to satisfy the selection criteria.<br/> Zero means use jdbc default value. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +### Tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +``` +env { + execution.parallelism = 2 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} +} +``` + +### Parallel: + +> Read your query table in parallel with the shard field you configured and the shard data. 
You can do this if you want to read the whole table + +``` +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + # Parallel sharding reads fields + partition_column = "id" + # Number of fragments + partition_num = 10 + } +} +``` + +### Parallel Boundary: + +> It is more efficient to read your data source according to the upper and lower boundaries you configured + +``` +source { + Jdbc { + driver = "com.oceanbase.jdbc.Driver" + url = "jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = "root" + password = "" + compatible_mode = "mysql" + query = "select * from source" + partition_column = "id" + partition_num = 10 + # Read start boundary + partition_lower_bound = 1 + # Read end boundary + partition_upper_bound = 500 + } +} +``` + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java index afceddc59..6e2147c03 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java @@ -27,6 +27,7 @@ public class JdbcConnectionConfig implements Serializable { public String url; public String driverName; + public String compatibleMode; public int connectionCheckTimeoutSeconds = JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue(); public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue(); @@ -48,6 +49,7 @@ public class JdbcConnectionConfig implements 
Serializable { public static JdbcConnectionConfig of(ReadonlyConfig config) { JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder(); builder.url(config.get(JdbcOptions.URL)); + builder.compatibleMode(config.get(JdbcOptions.COMPATIBLE_MODE)); builder.driverName(config.get(JdbcOptions.DRIVER)); builder.autoCommit(config.get(JdbcOptions.AUTO_COMMIT)); builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES)); @@ -74,6 +76,10 @@ public class JdbcConnectionConfig implements Serializable { return driverName; } + public String getCompatibleMode() { + return compatibleMode; + } + public boolean isAutoCommit() { return autoCommit; } @@ -121,6 +127,7 @@ public class JdbcConnectionConfig implements Serializable { public static final class Builder { private String url; private String driverName; + private String compatibleMode; private int connectionCheckTimeoutSeconds = JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue(); private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue(); @@ -146,6 +153,11 @@ public class JdbcConnectionConfig implements Serializable { return this; } + public Builder compatibleMode(String compatibleMode) { + this.compatibleMode = compatibleMode; + return this; + } + public Builder connectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) { this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds; return this; @@ -206,6 +218,7 @@ public class JdbcConnectionConfig implements Serializable { jdbcConnectionConfig.batchSize = this.batchSize; jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs; jdbcConnectionConfig.driverName = this.driverName; + jdbcConnectionConfig.compatibleMode = this.compatibleMode; jdbcConnectionConfig.maxRetries = this.maxRetries; jdbcConnectionConfig.password = this.password; jdbcConnectionConfig.connectionCheckTimeoutSeconds = this.connectionCheckTimeoutSeconds; diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java index 87b2a7b46..24ae0580f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java @@ -36,6 +36,12 @@ public interface JdbcOptions { .intType() .defaultValue(30) .withDescription("connection check time second"); + Option<String> COMPATIBLE_MODE = + Options.key("compatible_mode") + .stringType() + .noDefaultValue() + .withDescription( + "The compatible mode of database, required when the database supports multiple compatible modes. For example, when using OceanBase database, you need to set it to 'mysql' or 'oracle'."); Option<Integer> MAX_RETRIES = Options.key("max_retries").intType().defaultValue(0).withDescription("max_retired"); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java index 4c6221549..00130b32a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java @@ -33,6 +33,7 @@ public class JdbcSourceConfig implements Serializable { private JdbcConnectionConfig jdbcConnectionConfig; public String query; + public String compatibleMode; private String partitionColumn; private BigDecimal partitionUpperBound; private BigDecimal partitionLowerBound; @@ -44,6 +45,7 @@ public 
class JdbcSourceConfig implements Serializable { builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config)); builder.query(config.get(JdbcOptions.QUERY)); builder.fetchSize(config.get(JdbcOptions.FETCH_SIZE)); + config.getOptional(JdbcOptions.COMPATIBLE_MODE).ifPresent(builder::compatibleMode); config.getOptional(JdbcOptions.PARTITION_COLUMN).ifPresent(builder::partitionColumn); config.getOptional(JdbcOptions.PARTITION_UPPER_BOUND) .ifPresent(builder::partitionUpperBound); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java index 5e5ae1b55..3d66de659 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java @@ -37,4 +37,14 @@ public interface JdbcDialectFactory { /** @return Creates a new instance of the {@link JdbcDialect}. */ JdbcDialect create(); + + /** + * Create a {@link JdbcDialect} instance based on the driver type and compatible mode. 
+ * + * @param compatibleMode The compatible mode + * @return a new instance of {@link JdbcDialect} + */ + default JdbcDialect create(String compatibleMode) { + return create(); + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java index 076a6734b..b49df35ff 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java @@ -40,11 +40,12 @@ public final class JdbcDialectLoader { * Loads the unique JDBC Dialect that can handle the given database url. * * @param url A database URL. + * @param compatibleMode The compatible mode. * @throws IllegalStateException if the loader cannot find exactly one dialect that can * unambiguously process the given database URL. * @return The loaded dialect. 
*/ - public static JdbcDialect load(String url) { + public static JdbcDialect load(String url, String compatibleMode) { ClassLoader cl = Thread.currentThread().getContextClassLoader(); List<JdbcDialectFactory> foundFactories = discoverFactories(cl); @@ -89,7 +90,7 @@ public final class JdbcDialectLoader { .collect(Collectors.joining("\n")))); } - return matchingFactories.get(0).create(); + return matchingFactories.get(0).create(compatibleMode); } private static List<JdbcDialectFactory> discoverFactories(ClassLoader classLoader) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java new file mode 100644 index 000000000..66df84205 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect; + +import com.google.auto.service.AutoService; + +import javax.annotation.Nonnull; + +@AutoService(JdbcDialectFactory.class) +public class OceanBaseDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:oceanbase:"); + } + + @Override + public JdbcDialect create() { + throw new UnsupportedOperationException( + "Can't create JdbcDialect without compatible mode for OceanBase"); + } + + @Override + public JdbcDialect create(@Nonnull String compatibleMode) { + if ("oracle".equalsIgnoreCase(compatibleMode)) { + return new OracleDialect(); + } + return new MysqlDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 4221172b1..4666eae1e 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -104,7 +104,10 @@ public class JdbcSink public void prepare(Config pluginConfig) throws PrepareFailException { this.config = ReadonlyConfig.fromConfig(pluginConfig); this.jdbcSinkConfig = JdbcSinkConfig.of(config); - this.dialect = JdbcDialectLoader.load(jdbcSinkConfig.getJdbcConnectionConfig().getUrl()); + this.dialect = + 
JdbcDialectLoader.load( + jdbcSinkConfig.getJdbcConnectionConfig().getUrl(), + jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode()); this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java index ae2e49b1e..a9bb1c155 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java @@ -41,6 +41,7 @@ import java.util.Optional; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.AUTO_COMMIT; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_INTERVAL_MS; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DATABASE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER; @@ -82,7 +83,10 @@ public class JdbcSinkFactory implements TableSinkFactory { } final ReadonlyConfig options = config; JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config); - JdbcDialect dialect = JdbcDialectLoader.load(sinkConfig.getJdbcConnectionConfig().getUrl()); + JdbcDialect dialect = + JdbcDialectLoader.load( + sinkConfig.getJdbcConnectionConfig().getUrl(), + sinkConfig.getJdbcConnectionConfig().getCompatibleMode()); return () -> new JdbcSink( options, @@ -106,7 +110,8 @@ public class JdbcSinkFactory 
implements TableSinkFactory { GENERATE_SINK_SQL, AUTO_COMMIT, SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST, - PRIMARY_KEYS) + PRIMARY_KEYS, + COMPATIBLE_MODE) .conditional( IS_EXACTLY_ONCE, true, diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java index 39deac1ef..732892b21 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java @@ -99,7 +99,9 @@ public class JdbcSource new SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig()); this.query = jdbcSourceConfig.getQuery(); this.jdbcDialect = - JdbcDialectLoader.load(jdbcSourceConfig.getJdbcConnectionConfig().getUrl()); + JdbcDialectLoader.load( + jdbcSourceConfig.getJdbcConnectionConfig().getUrl(), + jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode()); try (Connection connection = jdbcConnectionProvider.getOrEstablishConnection()) { this.typeInfo = initTableField(connection); this.partitionParameter = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java index 8f9605182..43aa1c03d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java @@ -54,6 +54,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER; import static org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.FETCH_SIZE; @@ -83,7 +84,10 @@ public class JdbcSourceFactory implements TableSourceFactory { JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig()); final String querySql = config.getQuery(); - JdbcDialect dialect = JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl()); + JdbcDialect dialect = + JdbcDialectLoader.load( + config.getJdbcConnectionConfig().getUrl(), + config.getJdbcConnectionConfig().getCompatibleMode()); TableSchema tableSchema = catalogTable.getTableSchema(); SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType(); Optional<PartitionParameter> partitionParameter = @@ -228,7 +232,8 @@ public class JdbcSourceFactory implements TableSourceFactory { PARTITION_COLUMN, PARTITION_UPPER_BOUND, PARTITION_LOWER_BOUND, - PARTITION_NUM) + PARTITION_NUM, + COMPATIBLE_MODE) .build(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java new file mode 100644 index 000000000..b8202e697 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Assertions; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT { + + private static final String OCEANBASE_DATABASE = "seatunnel"; + private static final String OCEANBASE_SOURCE = "source"; + private static final String OCEANBASE_SINK = "sink"; + + private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://" + HOST + ":%s"; + private static final String OCEANBASE_DRIVER_CLASS = "com.oceanbase.jdbc.Driver"; + + abstract String imageName(); + + abstract String host(); + + abstract int port(); + + abstract String username(); + + abstract String password(); + + abstract List<String> configFile(); + + abstract String createSqlTemplate(); + + abstract String[] getFieldNames(); + + @Override + 
JdbcCase getJdbcCase() { + Map<String, String> containerEnv = new HashMap<>(); + String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, port()); + Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData(); + String[] fieldNames = testDataSet.getKey(); + + String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE, fieldNames); + + return JdbcCase.builder() + .dockerImage(imageName()) + .networkAliases(host()) + .containerEnv(containerEnv) + .driverClass(OCEANBASE_DRIVER_CLASS) + .host(HOST) + .port(port()) + .localPort(port()) + .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE) + .jdbcUrl(jdbcUrl) + .userName(username()) + .password(password()) + .database(OCEANBASE_DATABASE) + .sourceTable(OCEANBASE_SOURCE) + .sinkTable(OCEANBASE_SINK) + .createSql(createSqlTemplate()) + .configFile(configFile()) + .insertSql(insertSql) + .testData(testDataSet) + .build(); + } + + @Override + void compareResult() { + String sourceSql = + String.format( + "select * from %s.%s order by 1", OCEANBASE_DATABASE, OCEANBASE_SOURCE); + String sinkSql = + String.format("select * from %s.%s order by 1", OCEANBASE_DATABASE, OCEANBASE_SINK); + try { + Statement sourceStatement = connection.createStatement(); + Statement sinkStatement = connection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + Assertions.assertEquals( + sourceResultSet.getMetaData().getColumnCount(), + sinkResultSet.getMetaData().getColumnCount()); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : getFieldNames()) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); + InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); + String sourceValue = + IOUtils.toString(sourceAsciiStream, 
StandardCharsets.UTF_8); + String sinkValue = + IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + } + } + } + sourceResultSet.last(); + sinkResultSet.last(); + } catch (Exception e) { + throw new RuntimeException("Compare result error", e); + } + } + + @Override + String driverUrl() { + return "https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar"; + } + + @Override + protected void createSchemaIfNeeded() { + String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE; + try { + connection.prepareStatement(sql).executeUpdate(); + } catch (Exception e) { + throw new SeaTunnelRuntimeException( + JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql " + sql, e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java new file mode 100644 index 000000000..548fecaee --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Disabled; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.Lists; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; + +@Disabled("Disabled due to insufficient hardware resources in the CI environment") +public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase { + + @Override + String imageName() { + return "oceanbase/oceanbase-ce:4.0.0.0"; + } + + @Override + String host() { + return "e2e_oceanbase_mysql"; + } + + @Override + int port() { + return 2881; + } + + @Override + String username() { + return "root"; + } + + @Override + String password() { + return ""; + } + + @Override + List<String> configFile() { + return Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf"); + } + + @Override + String createSqlTemplate() { + return "CREATE TABLE IF NOT EXISTS %s\n" + + "(\n" + + " `c_bit_1` bit(1) DEFAULT NULL,\n" + + " `c_bit_8` bit(8) DEFAULT NULL,\n" + + " `c_bit_16` bit(16) DEFAULT NULL,\n" + + " `c_bit_32` bit(32) DEFAULT NULL,\n" + + " `c_bit_64` 
bit(64) DEFAULT NULL,\n" + + " `c_boolean` tinyint(1) DEFAULT NULL,\n" + + " `c_tinyint` tinyint(4) DEFAULT NULL,\n" + + " `c_tinyint_unsigned` tinyint(3) unsigned DEFAULT NULL,\n" + + " `c_smallint` smallint(6) DEFAULT NULL,\n" + + " `c_smallint_unsigned` smallint(5) unsigned DEFAULT NULL,\n" + + " `c_mediumint` mediumint(9) DEFAULT NULL,\n" + + " `c_mediumint_unsigned` mediumint(8) unsigned DEFAULT NULL,\n" + + " `c_int` int(11) DEFAULT NULL,\n" + + " `c_integer` int(11) DEFAULT NULL,\n" + + " `c_bigint` bigint(20) DEFAULT NULL,\n" + + " `c_bigint_unsigned` bigint(20) unsigned DEFAULT NULL,\n" + + " `c_decimal` decimal(20, 0) DEFAULT NULL,\n" + + " `c_decimal_unsigned` decimal(38, 18) DEFAULT NULL,\n" + + " `c_float` float DEFAULT NULL,\n" + + " `c_float_unsigned` float unsigned DEFAULT NULL,\n" + + " `c_double` double DEFAULT NULL,\n" + + " `c_double_unsigned` double unsigned DEFAULT NULL,\n" + + " `c_char` char(1) DEFAULT NULL,\n" + + " `c_tinytext` tinytext,\n" + + " `c_mediumtext` mediumtext,\n" + + " `c_text` text,\n" + + " `c_varchar` varchar(255) DEFAULT NULL,\n" + + " `c_json` json DEFAULT NULL,\n" + + " `c_longtext` longtext,\n" + + " `c_date` date DEFAULT NULL,\n" + + " `c_datetime` datetime DEFAULT NULL,\n" + + " `c_timestamp` timestamp NULL DEFAULT NULL,\n" + + " `c_tinyblob` tinyblob,\n" + + " `c_mediumblob` mediumblob,\n" + + " `c_blob` blob,\n" + + " `c_longblob` longblob,\n" + + " `c_varbinary` varbinary(255) DEFAULT NULL,\n" + + " `c_binary` binary(1) DEFAULT NULL,\n" + + " `c_year` year(4) DEFAULT NULL,\n" + + " `c_int_unsigned` int(10) unsigned DEFAULT NULL,\n" + + " `c_integer_unsigned` int(10) unsigned DEFAULT NULL,\n" + + " `c_bigint_30` BIGINT(40) unsigned DEFAULT NULL,\n" + + " `c_decimal_unsigned_30` DECIMAL(30) unsigned DEFAULT NULL,\n" + + " `c_decimal_30` DECIMAL(30) DEFAULT NULL\n" + + ");"; + } + + @Override + String[] getFieldNames() { + return new String[] { + "c_bit_1", + "c_bit_8", + "c_bit_16", + "c_bit_32", + "c_bit_64", + 
"c_boolean", + "c_tinyint", + "c_tinyint_unsigned", + "c_smallint", + "c_smallint_unsigned", + "c_mediumint", + "c_mediumint_unsigned", + "c_int", + "c_integer", + "c_year", + "c_int_unsigned", + "c_integer_unsigned", + "c_bigint", + "c_bigint_unsigned", + "c_decimal", + "c_decimal_unsigned", + "c_float", + "c_float_unsigned", + "c_double", + "c_double_unsigned", + "c_char", + "c_tinytext", + "c_mediumtext", + "c_text", + "c_varchar", + "c_json", + "c_longtext", + "c_date", + "c_datetime", + "c_timestamp", + "c_tinyblob", + "c_mediumblob", + "c_blob", + "c_longblob", + "c_varbinary", + "c_binary", + "c_bigint_30", + "c_decimal_unsigned_30", + "c_decimal_30", + }; + } + + @Override + Pair<String[], List<SeaTunnelRow>> initTestData() { + String[] fieldNames = getFieldNames(); + + List<SeaTunnelRow> rows = new ArrayList<>(); + BigDecimal bigintValue = new BigDecimal("2844674407371055000"); + BigDecimal decimalValue = new BigDecimal("999999999999999999999999999899"); + for (int i = 0; i < 100; i++) { + byte byteArr = Integer.valueOf(i).byteValue(); + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + i % 2 == 0 ? (byte) 1 : (byte) 0, + new byte[] {byteArr}, + new byte[] {byteArr, byteArr}, + new byte[] {byteArr, byteArr, byteArr, byteArr}, + new byte[] { + byteArr, byteArr, byteArr, byteArr, byteArr, byteArr, byteArr, + byteArr + }, + i % 2 == 0 ? 
Boolean.TRUE : Boolean.FALSE, + i, + i, + i, + i, + i, + i, + i, + i, + i, + Long.parseLong("1"), + Long.parseLong("1"), + Long.parseLong("1"), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 18), + BigDecimal.valueOf(i, 18), + Float.parseFloat("1.1"), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + Double.parseDouble("1.1"), + "f", + String.format("f1_%s", i), + String.format("f1_%s", i), + String.format("f1_%s", i), + String.format("f1_%s", i), + String.format("{\"aa\":\"bb_%s\"}", i), + String.format("f1_%s", i), + Date.valueOf(LocalDate.now()), + Timestamp.valueOf(LocalDateTime.now()), + new Timestamp(System.currentTimeMillis()), + "test".getBytes(), + "test".getBytes(), + "test".getBytes(), + "test".getBytes(), + "test".getBytes(), + "f".getBytes(), + bigintValue.add(BigDecimal.valueOf(i)), + decimalValue.add(BigDecimal.valueOf(i)), + decimalValue.add(BigDecimal.valueOf(i)), + }); + rows.add(row); + } + + return Pair.of(fieldNames, rows); + } + + @Override + GenericContainer<?> initContainer() { + GenericContainer<?> container = + new GenericContainer<>(imageName()) + .withNetwork(NETWORK) + .withNetworkAliases(host()) + .waitingFor(Wait.forLogMessage(".*boot success!.*", 1)) + .withStartupTimeout(Duration.ofMinutes(5)) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName()))); + + container.setPortBindings(Lists.newArrayList(String.format("%s:%s", port(), port()))); + + return container; + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java new file mode 100644 index 000000000..4c3cca5dd --- /dev/null +++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Disabled; +import org.testcontainers.containers.GenericContainer; + +import com.google.common.collect.Lists; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.given; + +@Disabled("Oracle mode of OceanBase Enterprise Edition does not provide docker environment") +public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase { + + @Override + String imageName() { + return null; + } + + @Override + String host() { + return "e2e_oceanbase_oracle"; + } + + @Override + int port() { + return 2883; + } + + @Override + String username() { + return "root"; + } + + 
@Override + String password() { + return ""; + } + + @Override + List<String> configFile() { + return Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf"); + } + + @Override + GenericContainer<?> initContainer() { + throw new UnsupportedOperationException(); + } + + @Override + public void startUp() { + jdbcCase = getJdbcCase(); + + given().ignoreExceptions() + .await() + .atMost(360, TimeUnit.SECONDS) + .untilAsserted(() -> this.initializeJdbcConnection(jdbcCase.getJdbcUrl())); + + createSchemaIfNeeded(); + createNeededTables(); + insertTestData(); + } + + @Override + public String quoteIdentifier(String field) { + return "\"" + field + "\""; + } + + @Override + String createSqlTemplate() { + return "create table %s\n" + + "(\n" + + " VARCHAR_10_COL varchar2(10),\n" + + " CHAR_10_COL char(10),\n" + + " CLOB_COL clob,\n" + + " NUMBER_3_SF_2_DP number(3, 2),\n" + + " INTEGER_COL integer,\n" + + " FLOAT_COL float(10),\n" + + " REAL_COL real,\n" + + " BINARY_FLOAT_COL binary_float,\n" + + " BINARY_DOUBLE_COL binary_double,\n" + + " DATE_COL date,\n" + + " TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3),\n" + + " TIMESTAMP_WITH_LOCAL_TZ timestamp with local time zone\n" + + ")"; + } + + @Override + String[] getFieldNames() { + return new String[] { + "VARCHAR_10_COL", + "CHAR_10_COL", + "CLOB_COL", + "NUMBER_3_SF_2_DP", + "INTEGER_COL", + "FLOAT_COL", + "REAL_COL", + "BINARY_FLOAT_COL", + "BINARY_DOUBLE_COL", + "DATE_COL", + "TIMESTAMP_WITH_3_FRAC_SEC_COL", + "TIMESTAMP_WITH_LOCAL_TZ" + }; + } + + @Override + Pair<String[], List<SeaTunnelRow>> initTestData() { + String[] fieldNames = getFieldNames(); + + List<SeaTunnelRow> rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + String.format("f%s", i), + String.format("f%s", i), + String.format("f%s", i), + BigDecimal.valueOf(1.1), + i, + Float.parseFloat("2.2"), + Float.parseFloat("2.2"), + Float.parseFloat("22.2"), + Double.parseDouble("2.2"), + 
Date.valueOf(LocalDate.now()), + Timestamp.valueOf(LocalDateTime.now()), + Timestamp.valueOf(LocalDateTime.now()) + }); + rows.add(row); + } + + return Pair.of(fieldNames, rows); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf new file mode 100644 index 000000000..098d3ffae --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.oceanbase.jdbc.Driver + url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC" + user = root + password = "" + query = "SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_binary, c_yea [...] 
+ compatible_mode = "mysql" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +sink { + Jdbc { + driver = com.oceanbase.jdbc.Driver + url = "jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC" + user = root + password = "" + query = "insert into sink(c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob, c_varbinary, c_bin [...] + compatible_mode = "mysql" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf new file mode 100644 index 000000000..bf2b1ccf0 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf @@ -0,0 +1,53 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + jdbc{ + # This is a example source plugin **only for test and demonstrate the feature source plugin** + url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel" + driver = com.oceanbase.jdbc.Driver + user = "root" + password = "" + query = "SELECT VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ FROM source" + compatible_mode = "oracle" + } +} + +transform { +} + +sink { + jdbc{ + url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel" + driver = com.oceanbase.jdbc.Driver + user = "root" + password = "" + query = "INSERT INTO sink (VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ) VALUES(?,?,?,?,?,?,?,?,?,?,?,?)" + compatible_mode = "oracle" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java index b1ea69efa..092f37f9b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java @@ -56,6 +56,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -113,6 +114,7 @@ public class PulsarBatchIT extends TestSuiteBase implements TestResource { new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME)) .withNetwork(NETWORK) .withNetworkAliases(PULSAR_HOST) + .withStartupTimeout(Duration.ofMinutes(3)) .withLogConsumer( new Slf4jLogConsumer( DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));