Iuckyriver opened a new issue, #8971:
URL: https://github.com/apache/seatunnel/issues/8971

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
    使用 MySQL CDC 同步一张表 A 的数据到三张分表,通过三个 transform 和三个 sink,增量阶段一直在读数据,一条数据也没有写入
   
   ### SeaTunnel Version
   
   2.3.8
   
   ### SeaTunnel Config
   
   ```conf
   env {
     job.mode = "STREAMING"
     checkpoint.interval = 600000
     checkpoint.timeout = 3600000
     job.name = "log_split_initial_2_three"
   }
   
   source {
     MySQL-CDC {
       base-url = 
"jdbc:mysql://xxx:3306/xxx?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true&yearIsDateType=false&zeroDateTimeBehavior=CONVERT_TO_NULL&tinyInt1isBit=false"
       username = "xxx"
       password = "xxx"
       table-names = ["xxx.sales_trade_log"]
       result_table_name="sales_trade_log_xxx"
       startup.mode = "INITIAL",
       parallelism = 2
       server-time-zone = "Asia/Shanghai"
       sample-sharding.threshold = 2048
       connection.pool.size = 3
       snapshot.split.size = 1024
       snapshot.fetch.size = 1024
     }
   }
   
   transform {
     sql {
       source_table_name = "sales_trade_log_xxx"
       result_table_name = "sales_trade_log_202502_2"
       query = "SELECT * FROM sales_trade_log WHERE created 
>PARSEDATETIME('2025-02-01 00:00:00','yyyy-MM-dd HH:mm:ss') and type = 2"
     }
   
     sql {
       source_table_name = "sales_trade_log_xxx"
       result_table_name = "sales_trade_log_202502_45"
       query = "SELECT * FROM sales_trade_log WHERE created 
>PARSEDATETIME('2025-02-01 00:00:00','yyyy-MM-dd HH:mm:ss') and type = 45"
     }
   
     sql {
       source_table_name = "sales_trade_log_xxx"
       result_table_name = "sales_trade_log_202502_105"
       query = "SELECT * FROM sales_trade_log WHERE created 
>PARSEDATETIME('2025-02-01 00:00:00','yyyy-MM-dd HH:mm:ss') and type = 105"
     }
     
   }
   
   sink {
       jdbc {
           url = 
"jdbc:mysql://xxx:3306/xx?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true&yearIsDateType=false&zeroDateTimeBehavior=CONVERT_TO_NULL&tinyInt1isBit=false"
           driver = "com.mysql.cj.jdbc.Driver"
           user = "xxx"
           password = "xxx"
           database="xx"
           generate_sink_sql = true
           source_table_name = "sales_trade_log_202502_2"
           table = "sales_trade_log_202502_2"
           primary_keys = ["rec_id"]
           schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
           data_save_mode="APPEND_DATA"
           batch_size = 1024
       }
   
       jdbc {
           url = 
"jdbc:mysql://xxx:3306/xx?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true&yearIsDateType=false&zeroDateTimeBehavior=CONVERT_TO_NULL&tinyInt1isBit=false"
           driver = "com.mysql.cj.jdbc.Driver"
           user = "xxx"
           password = "xxx"
           database="xx"
           
           generate_sink_sql = true
           source_table_name = "sales_trade_log_202502_45"
           table = "sales_trade_log_202502_45"
           primary_keys = ["rec_id"]
           schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
           data_save_mode="APPEND_DATA"
           batch_size = 1024
       }
       
       jdbc {
           url = 
"jdbc:mysql://xxx:3306/xx?useSSL=false&useUnicode=true&characterEncoding=UTF-8&allowPublicKeyRetrieval=true&yearIsDateType=false&zeroDateTimeBehavior=CONVERT_TO_NULL&tinyInt1isBit=false"
           driver = "com.mysql.cj.jdbc.Driver"
           user = "xxx"
           password = "xxx"
           database="xx"
           
           generate_sink_sql = true
           source_table_name = "sales_trade_log_202502_105"
           table = "sales_trade_log_202502_105"
           primary_keys = ["rec_id"]
           schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
           data_save_mode="APPEND_DATA"
           batch_size = 1024
       }
   }
   ```
   
   ### Running Command
   
   ```shell
   seatunnel.sh
   ```
   
   ### Error Exception
   
   ```log
   只有读取qps, 无写入qps
   ```
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to