Flink CDC Issue Import created FLINK-34859:
----------------------------------------------

             Summary: [Bug] Oracle cdc in table api does no support 
server-time-zone option
                 Key: FLINK-34859
                 URL: https://issues.apache.org/jira/browse/FLINK-34859
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
            Reporter: Flink CDC Issue Import


### Search before asking

- [X] I searched in the 
[issues|https://github.com/ververica/flink-cdc-connectors/issues) and found 
nothing similar.


### Flink version

1.17.1

### Flink CDC version

3.0.0

### Database and its version

Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production  
With the Partitioning, OLAP, Data Mining and Real Application Testing options

### Minimal reproduce step

## Create a cdc source in table api with `server-time-zone` option specified.

```java
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        Schema schema = Schema.newBuilder()
                .column("NAME", DataTypes.STRING())
                .column("ADDR", DataTypes.STRING())
                .build();

        String factoryIdentifier = new 
OracleTableSourceFactory().factoryIdentifier();
        TableDescriptor tableDescriptor = 
TableDescriptor.forConnector(factoryIdentifier)
                .schema(schema)
                // .format(DebeziumJsonFormatFactory.IDENTIFIER)
                .option(OracleSourceOptions.HOSTNAME, "my-oracle-host")
                .option(OracleSourceOptions.PORT, 1521)
                .option(OracleSourceOptions.USERNAME, "my-oracle-username")
                .option(OracleSourceOptions.PASSWORD, "my-oracle-password")
                .option(OracleSourceOptions.DATABASE_NAME, "my-oracle-database")
                .option(OracleSourceOptions.SCHEMA_NAME, "my-oracle-schema")
                .option(OracleSourceOptions.TABLE_NAME, "TEST")
                .option(OracleSourceOptions.SCAN_STARTUP_MODE, "initial")
                .option(OracleSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED, 
false)
                .option(OracleSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE, 10)
                .option(OracleSourceOptions.SERVER_TIME_ZONE, "Asia/Shanghai")
                .option("debezium.include.schema.changes", "false")
                
.option("debezium.database.history.store.only.captured.tables.ddl", "true")
                .build();

        StreamTableEnvironmentImpl tEnv = (StreamTableEnvironmentImpl) 
StreamTableEnvironmentImpl.create(env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
        Table table = tEnv.from(tableDescriptor);
        tEnv.toChangelogStream(table).print();
        env.execute();
```
## Exceptions are:
```text
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to create a source for reading table '*anonymous_oracle-cdc$1*'.

Table options are:

'connector'='oracle-cdc'
'database-name'='my-oracle-database'
'debezium.database.history.store.only.captured.tables.ddl'='true'
'debezium.include.schema.changes'='false'
'hostname'='my-oracle-host'
'password'='******'
'port'='1521'
'scan.incremental.snapshot.enabled'='false'
'scan.snapshot.fetch.size'='10'
'scan.startup.mode'='initial'
'schema-name'='my-oracle-schema'
'server-time-zone'='Asia/Shanghai'
'table-name'='TEST'
'username'='my-oracle-username'
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:167)
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:192)
        at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:175)
        at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:115)
        at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:357)
        at 
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:158)
        at 
org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
        at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:155)
        at 
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:135)
        at 
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
        at 
org.apache.flink.table.operations.SourceQueryOperation.accept(SourceQueryOperation.java:86)
        at 
org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.java:261)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:289)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
        at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
        at 
org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
        at 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toChangelogStream(StreamTableEnvironmentImpl.java:263)
        at 
org.codebase.flink.cdc.FlinkOracleCdcTest.main(FlinkOracleCdcTest.java:70)
Caused by: org.apache.flink.table.api.ValidationException: Unsupported options 
found for 'oracle-cdc'.

Unsupported options:

server-time-zone

Supported options:

chunk-key.even-distribution.factor.lower-bound
chunk-key.even-distribution.factor.upper-bound
chunk-meta.group.size
connect.max-retries
connect.timeout
connection.pool.size
connector
database-name
debezium.database.history.store.only.captured.tables.ddl
debezium.include.schema.changes
hostname
password
port
property-version
scan.incremental.close-idle-reader.enabled
scan.incremental.snapshot.backfill.skip
scan.incremental.snapshot.chunk.key-column
scan.incremental.snapshot.chunk.size
scan.incremental.snapshot.enabled
scan.snapshot.fetch.size
scan.startup.mode
schema-name
split-key.even-distribution.factor.lower-bound
split-key.even-distribution.factor.upper-bound
table-name
url
username
        at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:632)
        at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:931)
        at 
org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validateExcept(FactoryUtil.java:955)
        at 
com.ververica.cdc.connectors.oracle.table.OracleTableSourceFactory.createDynamicTableSource(OracleTableSourceFactory.java:70)
        at 
org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(FactoryUtil.java:164]
        ... 28 more
```

### What did you expect to see?

The program should run and print data

### What did you see instead?

Exception

### Anything else?

![image|https://github.com/ververica/flink-cdc-connectors/assets/23203149/b3bd94c8-6388-4c4d-b614-0084dd262a5c]
![image|https://github.com/ververica/flink-cdc-connectors/assets/23203149/df2dc0b9-c45e-450a-ab41-c6998e7d49ef]

As we can see in the screenshots, `OracleTableSourceFactory` does not support 
`server-time-zone` option as well as `MySqlTableSourceFactory` support this 
option. By the way, we can specify this option in the `OracleSourceBuilder` 
class.


### Are you willing to submit a PR?

- [X] I'm willing to submit a PR!

---------------- Imported from GitHub ----------------
Url: https://github.com/apache/flink-cdc/issues/2977
Created by: [LiuBodong|https://github.com/LiuBodong]
Labels: bug, 
Created at: Tue Jan 09 09:24:10 CST 2024
State: open




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to