Iuckyriver opened a new issue, #8971: URL: https://github.com/apache/seatunnel/issues/8971
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

I am using MySQL CDC to sync the data of one table (A) into three split tables, through three transforms and three sinks. During the incremental phase the job keeps reading data, but not a single row is written.

### SeaTunnel Version

2.3.8

### SeaTunnel Config

```conf
env {
  job.mode = "STREAMING"
  checkpoint.interval = 600000
  checkpoint.timeout = 3600000
  job.name = "log_split_initial_2_three"
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://xxx:3306/xxx?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true&yearIsDateType=false&zeroDateTimeBehavior=CONVERT_TO_NULL&tinyInt1isBit=false"
    username = "xxx"
    password = "xxx"
    table-names = ["xxx.sales_trade_log"]
    result_table_name="sales_trade_log_xxx"
    startup.mode = "INITIAL",
    parallelism = 2
    server-time-zone = "Asia/Shanghai"
    sample-sharding.threshold = 2048
    connection.pool.size = 3
    snapshot.split.size = 1024
    snapshot.fetch.size = 1024
  }
}

transform {
  sql {
    source_table_name = "sales_trade_log_xxx"
    result_table_name = "sales_trade_log_202502_2"
    query = "SELECT * FROM sales_trade_log WHERE created >PARSEDATETIME('2025-02-01 00:00:00','yyyy-MM-dd HH:mm:ss') and type = 2"
  }
  sql {
    source_table_name = "sales_trade_log_xxx"
    result_table_name = "sales_trade_log_202502_45"
    query = "SELECT * FROM sales_trade_log WHERE created >PARSEDATETIME('2025-02-01 00:00:00','yyyy-MM-dd HH:mm:ss') and type = 45"
  }
  sql {
    source_table_name = "sales_trade_log_xxx"
    result_table_name = "sales_trade_log_202502_105"
    query = "SELECT * FROM sales_trade_log WHERE created >PARSEDATETIME('2025-02-01 00:00:00','yyyy-MM-dd HH:mm:ss') and type = 105"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://xxx:3306/xx?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true&yearIsDateType=false&zeroDateTimeBehavior=CONVERT_TO_NULL&tinyInt1isBit=false"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    database="xx"
    generate_sink_sql = true
    source_table_name = "sales_trade_log_202502_2"
    table = "sales_trade_log_202502_2"
    primary_keys = ["rec_id"]
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode="APPEND_DATA"
    batch_size = 1024
  }
  jdbc {
    url = "jdbc:mysql://xxx:3306/xx?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true&yearIsDateType=false&zeroDateTimeBehavior=CONVERT_TO_NULL&tinyInt1isBit=false"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    database="xx"
    generate_sink_sql = true
    source_table_name = "sales_trade_log_202502_45"
    table = "sales_trade_log_202502_45"
    primary_keys = ["rec_id"]
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode="APPEND_DATA"
    batch_size = 1024
  }
  jdbc {
    url = "jdbc:mysql://xxx:3306/xx?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true&yearIsDateType=false&zeroDateTimeBehavior=CONVERT_TO_NULL&tinyInt1isBit=false"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    database="xx"
    generate_sink_sql = true
    source_table_name = "sales_trade_log_202502_105"
    table = "sales_trade_log_202502_105"
    primary_keys = ["rec_id"]
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode="APPEND_DATA"
    batch_size = 1024
  }
}
```

### Running Command

```shell
seatunnel.sh
```

### Error Exception

```log
Only read QPS is reported; there is no write QPS
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)