Flink CDC Issue Import created FLINK-34859: ----------------------------------------------
Summary: [Bug] Oracle cdc in table api does no support server-time-zone option Key: FLINK-34859 URL: https://issues.apache.org/jira/browse/FLINK-34859 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: Flink CDC Issue Import ### Search before asking - [X] I searched in the [issues|https://github.com/ververica/flink-cdc-connectors/issues) and found nothing similar. ### Flink version 1.17.1 ### Flink CDC version 3.0.0 ### Database and its version Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production With the Partitioning, OLAP, Data Mining and Real Application Testing options ### Minimal reproduce step ## Create a cdc source in table api with `server-time-zone` option specified. ```java StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setParallelism(1); Schema schema = Schema.newBuilder() .column("NAME", DataTypes.STRING()) .column("ADDR", DataTypes.STRING()) .build(); String factoryIdentifier = new OracleTableSourceFactory().factoryIdentifier(); TableDescriptor tableDescriptor = TableDescriptor.forConnector(factoryIdentifier) .schema(schema) // .format(DebeziumJsonFormatFactory.IDENTIFIER) .option(OracleSourceOptions.HOSTNAME, "my-oracle-host") .option(OracleSourceOptions.PORT, 1521) .option(OracleSourceOptions.USERNAME, "my-oracle-username") .option(OracleSourceOptions.PASSWORD, "my-oracle-password") .option(OracleSourceOptions.DATABASE_NAME, "my-oracle-database") .option(OracleSourceOptions.SCHEMA_NAME, "my-oracle-schema") .option(OracleSourceOptions.TABLE_NAME, "TEST") .option(OracleSourceOptions.SCAN_STARTUP_MODE, "initial") .option(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, false) .option(OracleSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, 10) .option(OracleSourceOptions.SERVER_TIME_ZONE, "Asia/Shanghai") .option("debezium.include.schema.changes", "false") .option("debezium.database.history.store.only.captured.tables.ddl", "true") .build(); StreamTableEnvironmentImpl tEnv = (StreamTableEnvironmentImpl) StreamTableEnvironmentImpl.create(env, EnvironmentSettings.newInstance().inStreamingMode().build()); Table table = tEnv.from(tableDescriptor); tEnv.toChangelogStream(table).print(); env.execute(); ``` ## Exceptions are: ```text Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a source for reading table '*anonymous_oracle-cdc$1*'. Table options are: 'connector'='oracle-cdc' 'database-name'='my-oracle-database' 'debezium.database.history.store.only.captured.tables.ddl'='true' 'debezium.include.schema.changes'='false' 'hostname'='my-oracle-host' 'password'='******' 'port'='1521' 'scan.incremental.snapshot.enabled'='false' 'scan.snapshot.fetch.size'='10' 'scan.startup.mode'='initial' 'schema-name'='my-oracle-schema' 'server-time-zone'='Asia/Shanghai' 'table-name'='TEST' 'username'='my-oracle-username' at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175) at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:357) at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158) at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155) at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135) at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) at org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86) at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261) at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:289) at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224) at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219) at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263) at org.codebase.flink.cdc.FlinkOracleCdcTest.main(FlinkOracleCdcTest.java:70) Caused by: org.apache.flink.table.api.ValidationException: Unsupported options found for 'oracle-cdc'. Unsupported options: server-time-zone Supported options: chunk-key.even-distribution.factor.lower-bound chunk-key.even-distribution.factor.upper-bound chunk-meta.group.size connect.max-retries connect.timeout connection.pool.size connector database-name debezium.database.history.store.only.captured.tables.ddl debezium.include.schema.changes hostname password port property-version scan.incremental.close-idle-reader.enabled scan.incremental.snapshot.backfill.skip scan.incremental.snapshot.chunk.key-column scan.incremental.snapshot.chunk.size scan.incremental.snapshot.enabled scan.snapshot.fetch.size scan.startup.mode schema-name split-key.even-distribution.factor.lower-bound split-key.even-distribution.factor.upper-bound table-name url username at org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:632) at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:931) at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:955) at com.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory.createDynamicTableSource(OracleTableSourceFactory.java:70) at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:164] ... 28 more ``` ### What did you expect to see? The program should run and print data ### What did you see instead? Exception ### Anything else? ![image|https://github.com/ververica/flink-cdc-connectors/assets/23203149/b3bd94c8-6388-4c4d-b614-0084dd262a5c] ![image|https://github.com/ververica/flink-cdc-connectors/assets/23203149/df2dc0b9-c45e-450a-ab41-c6998e7d49ef] As we can see in the screenshots, `OracleTableSourceFactory` does not support `server-time-zone` option as well as `MySqlTableSourceFactory` support this option. By the way, we can specify this option in the `OracleSourceBuilder` class. ### Are you willing to submit a PR? - [X] I'm willing to submit a PR! ---------------- Imported from GitHub ---------------- Url: https://github.com/apache/flink-cdc/issues/2977 Created by: [LiuBodong|https://github.com/LiuBodong] Labels: bug, Created at: Tue Jan 09 09:24:10 CST 2024 State: open -- This message was sent by Atlassian Jira (v8.20.10#820010)