827086469 commented on issue #8463: URL: https://github.com/apache/seatunnel/issues/8463#issuecomment-2785466137
> # 1. CDC 同步配置文件 > ## 1.1. CDC 单表(Oracle -> MySQL) > env { > # stream_ora2mysql.conf > parallelism = 2 > job.mode = "STREAMING" > checkpoint.interval = 5000 > } > > source { > # CDC 同步时,建议为每个表独立定义一个 Oracle-CDC 连接,以保证稳定性 > Oracle-CDC { > driver = "oracle.jdbc.driver.OracleDriver" > base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 的连接串 > username = "c##logminer" # 公共用户 Common User > password = "logminer" > database-names = ["PDB1"] # 监控的 PDB(大写) > schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写) > table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔 > "PDB1.PROVINCE_LNJY.MY_OBJECTS" > ] > table-names-config = [ # 可以为特定表,显式指定主键列 > { > table = "PDB1.PROVINCE_LNJY.MY_OBJECTS" > primaryKeys = ["OBJECT_ID"] > } > ] > result_table_name = "ORA_MY_OBJECTS" > use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true > skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。 > source.reader.close.timeout = 120000 > connection.pool.size = 1 > schema-changes.enabled = true # 启用 schema evolution 功能 > exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复 > debezium { > database.pdb.name = "PDB1" # 需指定 PDB 名称 > } > } > } > > transform { > Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间 > source_table_name = "ORA_MY_OBJECTS" > result_table_name = "MYSQL_MY_OBJECTS" > query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_MY_OBJECTS;" > } > } > > sink { > jdbc { > url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" > driver = "com.mysql.cj.jdbc.Driver" > user = "root" > password = "sysz2024" > database = "lnjy_frontdb" > table = "${table_name}" # 目标端表名与源端一致 > generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句 > primary_keys = ["OBJECT_ID"] # generate_sink_sql 为 true 时,需指定主键 > > source_table_name = "MYSQL_MY_OBJECTS" > schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式 > data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据; > field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写 > > is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name > xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" > } > } > ## 1.2. CDC 多表(Oracle -> MySQL) > env { > # stream_ora2mysql_multi.conf > parallelism = 4 > job.mode = "STREAMING" > checkpoint.interval = 5000 > } > > source { > # CDC 数据同步,建议为每个表独立设置一个 Oracle-CDC,以确保稳定性 > Oracle-CDC { > driver = "oracle.jdbc.driver.OracleDriver" # 固定为 oracle.jdbc.driver.OracleDriver > base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 连接串 > username = "c##logminer" # 源库需要创建,公共用户 Common User > password = "logminer" > database-names = ["PDB1"] # 监控的 PDB(大写) > schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写) > table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔 > "PDB1.PROVINCE_LNJY.T_MY_TABLES" > ] > table-names-config = [ > { > table = "PDB1.PROVINCE_LNJY.T_MY_TABLES" > primaryKeys = ["ID"] # 为表显式指定主键字段 > } > ] > use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true > skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。 > result_table_name = "ORA_MY_TABLES" # 将本阶段处理的数据,注册为数据集(或称临时表) > source.reader.close.timeout = 120000 > connection.pool.size = 1 > schema-changes.enabled = true # 默认为 true。是否启用 schema evolution 功能,即自动推断 DDL 脚本 > exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复 > debezium { > database.pdb.name = "PDB1" # 需指定 PDB 名称 > } > } > > Oracle-CDC { > driver = "oracle.jdbc.driver.OracleDriver" # 固定为 oracle.jdbc.driver.OracleDriver > base-url = "jdbc:oracle:thin:@//10.0.6.53:1521/pdb1" # 需要为 PDB 连接串 > username = "c##logminer" # 源库需要创建,公共用户 Common User > password = "logminer" > database-names = ["PDB1"] # 监控的 PDB(大写) > schema-names = ["PROVINCE_LNJY"] # 监控的 schema 列表(大写) > table-names = [ # 监控的 table 列表,格式:<PDB>.<SCHEMA>.<TNAME>,多个表用逗号分隔 > "PDB1.PROVINCE_LNJY.T_ALL_TABLES" > ] > table-names-config = [ > { > table = "PDB1.PROVINCE_LNJY.T_ALL_TABLES" > primaryKeys = ["ID"] # 为表显式指定主键字段 > } > ] > use_select_count = false # 全量阶段,是否用 select count(*) 获取数据量;当 count(*) 比 analyze table 速度快时,可设置为 true > skip_analyze = false # 全量阶段,是否跳过 analyze table。如果表数据变更不频繁,可以设为true。 > result_table_name = "ORA_ALL_TABLES" # 将本阶段处理的数据,注册为数据集(或称临时表) > source.reader.close.timeout = 120000 > connection.pool.size = 1 > schema-changes.enabled = true # 默认为 true。是否启用 schema evolution 功能,即自动推断 DDL 脚本 > exactly_once = true # 默认为 false。是否启用数据的精确一次性处理。启用 extractly one 语义,可确保数据不会重复 > debezium { > database.pdb.name = "PDB1" # 需指定 PDB 名称 > } > } > } > > transform { > Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间 > source_table_name = "ORA_MY_TABLES" > result_table_name = "TRANS_MY_TABLES" > query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_MY_TABLES;" > } > Sql { # 为目标表增加时间戳字段,以记录数据行的变更时间 > source_table_name = "ORA_ALL_TABLES" > result_table_name = "TRANS_ALL_TABLES" > query = "SELECT *, NOW() AS CREATE_TIME, NOW() AS UPDATE_TIME FROM ORA_ALL_TABLES;" > } > } > > sink { > jdbc { > url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" > driver = "com.mysql.cj.jdbc.Driver" > user = "root" > password = "sysz2024" > database = "lnjy_frontdb" > table = "${table_name}" # 目标端表名与源端一致 > generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句 > primary_keys = ["ID"] # generate_sink_sql 为 true 时,需指定主键 > > source_table_name = "TRANS_MY_TABLES" > schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式 > data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据; > field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写 > > is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name > xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" > } > > jdbc { > url = "jdbc:mysql://10.0.6.53:3306/lnjy_frontdb?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" > driver = "com.mysql.cj.jdbc.Driver" > user = "root" > password = "sysz2024" > database = "lnjy_frontdb" > table = "${table_name}" # 目标端表名与源端一致 > generate_sink_sql = true # 默认 false,是否自动生成目标端库表的 sql 语句 > primary_keys = ["ID"] # generate_sink_sql 为 true 时,需指定主键 > > source_table_name = "TRANS_ALL_TABLES" > schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" # 任务启动之前,目的表的处理方式 > data_save_mode = "APPEND_DATA" # 任务启动之前,若目标端已存在数据,则保留数据库结构与数据; > field_ide = "ORIGINAL" # 字段是否需要大小写转换:ORIGINAL 不转换,UPPERCASE 转大写,LOWERCASE 转小写 > > is_exactly_once = true # 默认 false。是否启用数据的精确一次性处理,设置为true时,需要设置 xa_data_source_class_name > xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource" > } > } > # 2. 踩到的坑 > ## 2.1. ORA-00942: table or view does not exist > 1. **问题现象** > > 批量同步阶段结束,在进入增量同步时,报错 `ORA-00942: table or view does not exist`,日志中还包含 `UPDATE LOG_MINING_FLUSH` 表的 SQL 语句。经过排查,发现 `c##logminer` 在 PDB 中包含 `LOG_MINING_FLUSH` 表;但在 CDB 中,该表不存在。 > > 2. **解决方案** > > 在 CDB 的 `c##logminer` 用户下,手动建表 `LOG_MINING_FLUSH`: > > SQL> CREATE TABLE LOG_MINING_FLUSH(LAST_SCN NUMBER(19,0)); > SQL> INSERT INTO LOG_MINING_FLUSH VALUES (0); > SQL> commit; > 3. **疑问** > > 猜测 `LOG_MINING_FLUSH` 表是用于记录 Redo 相关偏移量的。但为何在 PDB 中自动建该表,却未在 CDB 中自动建表?与 CDC 作业配置的 `base-url` 是否有关系? > > ## 2.2. ORA-65040: operation not allowed from within a pluggable database > 1. **问题描述** > > `base-url` 设置为 pdb 名称时,在批量同步阶段可以正常读取数据。但是,进入增量同步阶段时,报错 `ORA-65040: operation not allowed from within a pluggable database`。 > > 2. **问题解决** > > 为 `source` 增加如下配置: > > debezium { > > database.name = "CDB$ROOT" # 需指定 CDB 名称 > database.pdb.name = "PDB1" # 需指定 PDB 名称 > } > 增量同步可正常工作。 > > 3. **疑问** > > 同步进程根据 `debezium` 的配置,执行 `CDB` 与 `PDB` 的切换操作。然后,执行 `SYS.DBMS_LOGMNR` 中的存储过程? 你这里用PDB是正常的吗,我用PDB就会报Unable to create a source for identifier 'Oracle-CDC', Can not find catalog table with factoryId,用CDB任务是能正常启动的,但是不会同步数据 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org