chenyz1984 commented on issue #8702: URL: https://github.com/apache/seatunnel/issues/8702#issuecomment-2658038725
作业配置: ```yml env { parallelism = 8 job.mode = "BATCH" # checkpoint.interval = 300000 } source{ Jdbc { url = "jdbc:oracle:thin:@//10.65.178.2:51511/pdb2" driver = "oracle.jdbc.OracleDriver" user = "shearddata" password = "SYldjy_2025" connection_check_timeout_sec = 100 table_path = "LNJY.AC12", query = """ SELECT * FROM LNJY.AC12 WHERE ( ZAE036 >= TRUNC(TO_DATE('"""${BY_DAY_BEGIN}"""', 'yyyyMMdd'),'DD') AND ZAE036 < TRUNC(TO_DATE('"""${BY_DAY_END}"""', 'yyyyMMdd'),'DD') ) OR ZAE036 IS NULL """ partition_column = "AAC120" split.size = 20000 plugin_output = "AC12_SRC" } Jdbc { url = "jdbc:oracle:thin:@//10.65.178.2:51511/pdb2" driver = "oracle.jdbc.OracleDriver" user = "shearddata" password = "SYldjy_2025" connection_check_timeout_sec = 100 table_path = "LNJY.AC12", query = """ SELECT COUNT(1) AS SRC_TOTAL_COUNT, SUM( CASE WHEN ( ZAE036 >= TRUNC(TO_DATE('"""${BY_DAY_BEGIN}"""', 'yyyyMMdd'),'DD') AND ZAE036 < TRUNC(TO_DATE('"""${BY_DAY_END}"""', 'yyyyMMdd'),'DD') ) OR ZAE036 IS NULL THEN 1 ELSE 0 END ) AS READ_COUNT FROM LNJY.AC12 """ split.size = 20000 plugin_output = "AC12_CNT" } } transform { Sql { plugin_input = "AC12_SRC" plugin_output = "AC12" query = "SELECT *, NOW() AS PTIME FROM AC12_SRC" } Sql { plugin_input = "AC12_CNT" plugin_output = "AC12_RPT" query = """ SELECT '人社_就业' AS TABLE_CLASS, 'AC12' AS TABLE_NAME, CURRENT_DATE() AS SYNC_DATE, SRC_TOTAL_COUNT, READ_COUNT, NOW() AS BEGIN_TIME, CAST(NULL AS INT) AS DEST_TOTAL_COUNT, CAST(NULL AS INT) AS WRITE_COUNT, CAST(NULL AS TIMESTAMP) AS END_TIME FROM AC12_CNT """ } } sink { jdbc { url = "jdbc:mysql://10.65.178.2:3306/rensheju?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "rensheju" password = "Sysz2024" connection_check_timeout_sec = 60 max_retries = 3 transaction_timeout_sec = 30 generate_sink_sql = true database = "rensheju" table = "AC12" plugin_input = "AC12" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" batch_size = 20000 enable_upsert = true # 默认 true。当只有 insert 操作时,将此选项设置为 false,可提高性能 primary_keys = ["AAC120"] } jdbc { url = "jdbc:mysql://10.65.178.2:3306/rensheju?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" driver = "com.mysql.cj.jdbc.Driver" user = "rensheju" password = "Sysz2024" connection_check_timeout_sec = 60 max_retries = 3 transaction_timeout_sec = 30 generate_sink_sql = true plugin_input = "AC12_RPT" database = "rensheju" table = "st_sync_rpt" schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" data_save_mode = "APPEND_DATA" batch_size = 20000 enable_upsert = true # 默认 true。当只有 insert 操作时,将此选项设置为 false,可提高性能 primary_keys = ["TABLE_CLASS","TABLE_NAME","SYNC_DATE"] } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org