chenyz1984 commented on issue #8702:
URL: https://github.com/apache/seatunnel/issues/8702#issuecomment-2658038725

   作业配置:
   
   ```hocon
   # Job-wide settings: a batch job running with parallelism 8.
   env {
     parallelism = 8
     job.mode = "BATCH"
     # The checkpoint interval is not set here (the line below is commented out).
     # checkpoint.interval = 300000
   }
   # Two JDBC sources, both pointing at the same Oracle URL and at table LNJY.AC12.
   # NOTE(review): BY_DAY_BEGIN / BY_DAY_END are referenced but not defined in this
   # config; presumably they are supplied when the job is submitted — confirm.
   source{
     # Source 1: reads the rows whose ZAE036 lies in [BY_DAY_BEGIN, BY_DAY_END)
     # (both parsed as 'yyyyMMdd' and truncated to the day) or is NULL, and
     # publishes them as "AC12_SRC".
     Jdbc {
       url = "jdbc:oracle:thin:@//10.65.178.2:51511/pdb2"
       driver = "oracle.jdbc.OracleDriver"
       user = "shearddata"
       # NOTE(review): plain-text credential in a publicly posted config — consider rotating it.
       password = "SYldjy_2025"
       connection_check_timeout_sec = 100
       # NOTE(review): both table_path and query are set — confirm how the connector combines them.
       table_path = "LNJY.AC12",
       # The triple-quoted string is closed and reopened around each ${...} reference,
       # so the references sit outside the quotes (presumably so that they are
       # substituted rather than taken literally — confirm).
       query = """
   SELECT * FROM LNJY.AC12 
    WHERE (
          ZAE036 >= TRUNC(TO_DATE('"""${BY_DAY_BEGIN}"""', 'yyyyMMdd'),'DD') 
      AND ZAE036 <  TRUNC(TO_DATE('"""${BY_DAY_END}"""', 'yyyyMMdd'),'DD')
          ) 
       OR ZAE036 IS NULL
       """
       partition_column = "AAC120"  
       split.size = 20000
       plugin_output = "AC12_SRC"
     }
     # Source 2: produces one aggregate row and publishes it as "AC12_CNT".
     #   SRC_TOTAL_COUNT - number of rows in LNJY.AC12
     #   READ_COUNT      - number of rows matching the same ZAE036 predicate as source 1
     Jdbc {
       url = "jdbc:oracle:thin:@//10.65.178.2:51511/pdb2"
       driver = "oracle.jdbc.OracleDriver"
       user = "shearddata"
       # NOTE(review): plain-text credential in a publicly posted config — consider rotating it.
       password = "SYldjy_2025"
       connection_check_timeout_sec = 100
       # NOTE(review): table_path is set alongside an aggregate query whose columns
       # differ from the table's — confirm how the connector combines the two.
       table_path = "LNJY.AC12",
       # Same close-and-reopen quoting around ${...} as in the first source. Two lines
       # inside the string are broken before 'yyyyMMdd' (looks like mail line-wrapping).
       query = """
   SELECT COUNT(1) AS SRC_TOTAL_COUNT,
          SUM(
             CASE WHEN (
                       ZAE036 >= TRUNC(TO_DATE('"""${BY_DAY_BEGIN}"""', 
'yyyyMMdd'),'DD') 
                   AND ZAE036 <  TRUNC(TO_DATE('"""${BY_DAY_END}"""', 
'yyyyMMdd'),'DD')
                     ) 
                    OR ZAE036 IS NULL
                  THEN 1
                  ELSE 0 END
                 ) AS READ_COUNT
     FROM LNJY.AC12
       """
       # NOTE(review): split.size is set but, unlike the first source, there is no
       # partition_column — confirm whether it has any effect here.
       split.size = 20000
       plugin_output = "AC12_CNT"
     }
   }
   
   # Two SQL transforms, one per source output.
   transform {
     # Adds a PTIME column (NOW()) to every row of "AC12_SRC"; the result is "AC12".
     Sql {
       plugin_input = "AC12_SRC"
       plugin_output = "AC12"
       query = "SELECT *, NOW() AS PTIME FROM AC12_SRC"
     }
     # Builds the report row from "AC12_CNT"; the result is "AC12_RPT".
     #   TABLE_CLASS / TABLE_NAME              - fixed string labels
     #   SYNC_DATE / BEGIN_TIME                - CURRENT_DATE() / NOW()
     #   SRC_TOTAL_COUNT / READ_COUNT          - passed through from the count source
     #   DEST_TOTAL_COUNT, WRITE_COUNT, END_TIME - typed NULL placeholders
     Sql {
       plugin_input = "AC12_CNT"
       plugin_output = "AC12_RPT"
       query = """
   SELECT '人社_就业' AS TABLE_CLASS,
          'AC12' AS TABLE_NAME,
          CURRENT_DATE() AS SYNC_DATE,
          SRC_TOTAL_COUNT,
          READ_COUNT,
          NOW() AS BEGIN_TIME,
          CAST(NULL AS INT) AS DEST_TOTAL_COUNT, 
          CAST(NULL AS INT) AS WRITE_COUNT,
          CAST(NULL AS TIMESTAMP) AS END_TIME 
     FROM AC12_CNT
       """
     }  
   }
   # Two JDBC sinks, both writing to the same MySQL URL and database "rensheju".
   sink {
     # Sink 1: writes the "AC12" stream into table AC12, with AAC120 as primary key.
     jdbc {
       # NOTE(review): the URL value sits on the following line (looks like mail
       # line-wrapping) — confirm the parser accepts this layout.
       url = 
"jdbc:mysql://10.65.178.2:3306/rensheju?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
       driver = "com.mysql.cj.jdbc.Driver"
       user = "rensheju"
       # NOTE(review): plain-text credential in a publicly posted config — consider rotating it.
       password = "Sysz2024"
       connection_check_timeout_sec = 60
       max_retries = 3
       transaction_timeout_sec = 30
       generate_sink_sql = true
       database = "rensheju"
       table = "AC12"
       plugin_input = "AC12"
       schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
       data_save_mode = "APPEND_DATA"
       batch_size = 20000
       enable_upsert = true        # Defaults to true. When there are only insert operations, setting this option to false can improve performance
       primary_keys = ["AAC120"]
     }
     # Sink 2: writes the "AC12_RPT" report row into table st_sync_rpt, with
     # (TABLE_CLASS, TABLE_NAME, SYNC_DATE) as the composite primary key.
     jdbc {
       # NOTE(review): the URL value sits on the following line (looks like mail
       # line-wrapping) — confirm the parser accepts this layout.
       url = 
"jdbc:mysql://10.65.178.2:3306/rensheju?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
       driver = "com.mysql.cj.jdbc.Driver"
       user = "rensheju"
       # NOTE(review): plain-text credential in a publicly posted config — consider rotating it.
       password = "Sysz2024"
       connection_check_timeout_sec = 60
       max_retries = 3
       transaction_timeout_sec = 30
       generate_sink_sql = true
       plugin_input = "AC12_RPT"
       database = "rensheju"
       table = "st_sync_rpt"
       schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
       data_save_mode = "APPEND_DATA"
       batch_size = 20000
       enable_upsert = true        # Defaults to true. When there are only insert operations, setting this option to false can improve performance
       primary_keys = ["TABLE_CLASS","TABLE_NAME","SYNC_DATE"]
     }  
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to