你好 主要流程 见附件
流程就是使用cdc 读取mysql。然后left join 维度表 ,最后写入到mysql
问题是:测试时 update 了 cdc 源表的一条数据,发现结果表中的数据有时候有、有时候没有。
使用 connector=print 发现有两条数据:一条是 delete,一条是 insert。这边怀疑是乱序,导致先 insert 后又被 delete 掉了。
把并行度设置为 1 的时候就是正常的。如果沟通不方便,欢迎加钉钉13269166963。
在 2020-10-26 12:11:59,"史 正超" <[email protected]> 写道:
>Hi, @air23, 你能提供下完整的sql吗?,我来复现下这个问题
>________________________________
>发件人: air23 <[email protected]>
>发送时间: 2020年10月23日 6:21
>收件人: [email protected] <[email protected]>
>主题: Flink mysqlCDC ,然后jdbc sink 到mysql 乱序问题
>
>你好,
>这边发现使用cdc读取mysql,然后写入mysql会有乱序问题。
>在上游mysql update一条数据,connector=print 时可以看到一条delete和一条insert的数据,
>但是jdbc写入mysql的时候,发现结果有时候是正常的,有时候这条数据会没有;当把并行度改成1的时候是正常的。
>这边怀疑是乱序了,先insert再delete了,所以导致结果表没有这条数据。请问flink sql或者flink cdc怎么保证有序?
package com.flink.large.screen.demand;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import zt.common.util.PropertyUtil;
import java.util.Properties;
public class FlinkGcWmsOrdersCdc2 {
/**
 * Job entry point: creates the stream/table environments, registers the CDC
 * source tables and the dimension tables, then runs the business SQL.
 *
 * @param args command-line arguments (not read by this method)
 * @throws Exception declared by the original signature; propagates any failure
 *                   raised while setting up or submitting the job
 */
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Original author's note: if the parallelism is set to 1 the results are
    // no longer out of order. Left disabled, as in the original.
    // env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // Create the CDC tables.
    createCdcTable(tableEnv);
    // Create the dimension tables.
    createDimTable(tableEnv);
    // Execute the business logic.
    executeBusinessSql(tableEnv);
}
/**
 * Fills a CDC table DDL template with the shared connection settings.
 *
 * <p>The template is passed to {@link String#format} and its placeholders are
 * bound, in order, to: connector, hostname, port, username, password and
 * database name.
 *
 * @param message format template containing the placeholders listed above
 * @return the template with the connection settings substituted
 */
private static String confCdcCommonMessage(String message) {
    return String.format(
            message,
            "mysql-cdc",            // connector
            "zongteng75",           // hostname
            "3306",                 // port
            "root",                 // username
            "****",                 // password
            "zongteng_streaming");  // database-name
}
/**
* å建cdc表
* @param tableEnvironment
*/
private static void createCdcTable(StreamTableEnvironment tableEnvironment)
{
// // print_table
// String create_table_print_table = "CREATE TABLE print_table (" +
// " f0 STRING" +
// ") with (" +
// " 'connector' = 'print' " +
// ")";
// tableEnvironment.executeSql(create_table_print_table);
// gc_wms_orders
String create_table_gc_wms_orders = confCdcCommonMessage("CREATE TABLE
gc_wms_orders (\n" +
"order_id INT,\n" +
"order_code STRING,\n" +
"reference_no STRING,\n" +
"customer_code STRING,\n" +
"platform STRING,\n" +
"order_platform_type STRING,\n" +
"create_type STRING,\n" +
"warehouse_id INT,\n" +
"is_oda INT,\n" +
"is_insurance INT,\n" +
"sm_code STRING,\n" +
"parcel_quantity INT,\n" +
"order_status INT,\n" +
"problem_status INT,\n" +
"underreview_status INT,\n" +
"upload_express_status INT,\n" +
"anew_express_status INT,\n" +
"intercept_status INT,\n" +
"sync_status INT,\n" +
"add_time TIMESTAMP,\n" +
"order_pick_type INT,\n" +
"sc_id INT,\n" +
"sync_wms_time TIMESTAMP,\n" +
"operator_note STRING,\n" +
"is_fba INT,\n" +
"outbound_time TIMESTAMP,\n" +
"is_more_box INT,\n" +
"o_timestamp STRING,\n" +
"payment_time TIMESTAMP,\n" +
"oms_date_create TIMESTAMP,\n" +
"pre_delivery_time TIMESTAMP,\n" +
"is_flow_volume INT,\n" +
"w_insert_dt STRING,\n" +
"data_flag STRING," +
"PRIMARY KEY (order_id) NOT ENFORCED\n" +
" ) WITH (\n" +
" 'connector' = '%s',\n" +
" 'hostname' = '%s',\n" +
" 'port' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'database-name' = '%s',\n" +
" 'table-name' = 'gc_wms_orders'\n" +
")");
// gc_wms_order_box
String create_table_gc_wms_order_box = confCdcCommonMessage("CREATE
TABLE gc_wms_order_box (\n" +
" order_code STRING,\n" +
" box_no STRING,\n" +
" tracking_number STRING,\n" +
" ob_add_time TIMESTAMP,\n" +
" w_insert_dt STRING,\n" +
" data_flag STRING,\n" +
" PRIMARY KEY (order_code, box_no) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'hostname' = '%s',\n" +
" 'port' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'database-name' = '%s',\n" +
" 'table-name' = 'gc_wms_order_box'\n" +
")");
// gc_wms_ship_order
String create_table_gc_wms_ship_order = confCdcCommonMessage("CREATE
TABLE gc_wms_ship_order (\n" +
" order_id INT,\n" +
" order_code STRING,\n" +
" tracking_number STRING,\n" +
" so_add_time TIMESTAMP,\n" +
" w_insert_dt STRING,\n" +
" data_flag STRING,\n" +
" PRIMARY KEY (order_code) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'hostname' = '%s',\n" +
" 'port' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'database-name' = '%s',\n" +
" 'table-name' = 'gc_wms_ship_order'\n" +
")");
// gc_wms_order_operation_time
String create_table_gc_wms_order_operation_time =
confCdcCommonMessage("CREATE TABLE gc_wms_order_operation_time (\n" +
" oot_id INT,\n" +
" order_id INT,\n" +
" process_time TIMESTAMP,\n" +
" pack_time TIMESTAMP,\n" +
" ship_time TIMESTAMP,\n" +
" import_time TIMESTAMP,\n" +
" w_insert_dt STRING,\n" +
" data_flag STRING,\n" +
" PRIMARY KEY (oot_id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'hostname' = '%s',\n" +
" 'port' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'database-name' = '%s',\n" +
" 'table-name' = 'gc_wms_order_operation_time'\n" +
")");
// gc_wms_order_physical_relation
String create_table_gc_wms_order_physical_relation =
confCdcCommonMessage("CREATE TABLE gc_wms_order_physical_relation (\n" +
" opr_id INT,\n" +
" wp_code STRING,\n" +
" order_code STRING,\n" +
" w_insert_dt STRING,\n" +
" data_flag STRING,\n" +
" PRIMARY KEY (opr_id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'hostname' = '%s',\n" +
" 'port' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'database-name' = '%s',\n" +
" 'table-name' = 'gc_wms_order_physical_relation'\n" +
")");
// gc_oms_orders
String create_table_gc_oms_orders = confCdcCommonMessage("CREATE TABLE
gc_oms_orders (\n" +
" order_id INT,\n" +
" platform STRING,\n" +
" customer_id INT,\n" +
" company_code STRING,\n" +
" order_type STRING,\n" +
" create_type STRING,\n" +
" order_status INT,\n" +
" sub_status INT,\n" +
" cancel_status INT,\n" +
" create_method INT,\n" +
" shipping_method STRING,\n" +
" shipping_method_platform STRING,\n" +
" warehouse_id STRING,\n" +
" warehouse_code STRING,\n" +
" shipping_method_no STRING,\n" +
" is_oda INT,\n" +
" oda_type INT,\n" +
" is_signature INT,\n" +
" is_insurance INT,\n" +
" insurance_value DECIMAL(10,3),\n" +
" order_weight FLOAT,\n" +
" order_desc STRING,\n" +
" date_create TIMESTAMP,\n" +
" date_release TIMESTAMP,\n" +
" date_pickup TIMESTAMP,\n" +
" date_warehouse_shipping TIMESTAMP,\n" +
" date_last_modify TIMESTAMP,\n" +
" refrence_no STRING,\n" +
" refrence_no_platform STRING,\n" +
" refrence_no_sys STRING,\n" +
" refrence_no_warehouse STRING,\n" +
" shipping_address_id INT,\n" +
" operator_id STRING,\n" +
" operator_note STRING,\n" +
" sync_status STRING,\n" +
" sync_time TIMESTAMP,\n" +
" date_create_platform TIMESTAMP,\n" +
" date_paid_platform TIMESTAMP,\n" +
" date_paid_int INT,\n" +
" amountpaid FLOAT,\n" +
" subtotal FLOAT,\n" +
" ship_fee FLOAT,\n" +
" platform_fee FLOAT,\n" +
" finalvaluefee FLOAT,\n" +
" delivery_fee DECIMAL(12,3),\n" +
" currency STRING,\n" +
" user_account STRING,\n" +
" buyer_id STRING,\n" +
" third_part_ship INT,\n" +
" is_merge INT,\n" +
" site STRING,\n" +
" abnormal_type INT,\n" +
" abnormal_reason STRING,\n" +
" is_one_piece INT,\n" +
" product_count INT,\n" +
" consignee_country STRING,\n" +
" buyer_name STRING,\n" +
" buyer_mail STRING,\n" +
" has_buyer_note INT,\n" +
" fulfillment_channel STRING,\n" +
" ship_service_level STRING,\n" +
" shipment_service_level_category STRING,\n" +
" leave_comment STRING,\n" +
" ebay_case_type STRING,\n" +
" order_refund STRING,\n" +
" process_again STRING,\n" +
" has_export INT,\n" +
" has_pickup INT,\n" +
" has_print_pickup_label INT,\n" +
" service_status INT,\n" +
" service_provider STRING,\n" +
" ot_id INT,\n" +
" sys_tips STRING,\n" +
" consignee_name STRING,\n" +
" consignee_company STRING,\n" +
" consignee_street1 STRING,\n" +
" consignee_street2 STRING,\n" +
" consignee_street3 STRING,\n" +
" consignee_district STRING,\n" +
" consignee_county STRING,\n" +
" consignee_city STRING,\n" +
" consignee_state STRING,\n" +
" consignee_country_code STRING,\n" +
" consignee_country_name STRING,\n" +
" consignee_phone STRING,\n" +
" consignee_email STRING,\n" +
" consignee_postal_code STRING,\n" +
" consignee_doorplate STRING,\n" +
" shared_sign INT,\n" +
" is_returns INT,\n" +
" is_shipping_method_not_allow_update INT,\n" +
" is_fba INT,\n" +
" consignee_is_residential INT,\n" +
" is_more_box INT,\n" +
" is_attachment INT,\n" +
" so_length DECIMAL(10,2),\n" +
" o_timestamp STRING,\n" +
" so_width DECIMAL(10,2),\n" +
" so_height DECIMAL(10,2),\n" +
" age_detection INT,\n" +
" payment_time TIMESTAMP,\n" +
" is_recommend INT,\n" +
" is_truck_service INT,\n" +
" new_order_type INT,\n" +
" design_batch_status INT,\n" +
" auto_verify_result INT,\n" +
" auto_verify_reason STRING,\n" +
" is_sync_tracking_number INT,\n" +
" order_sn STRING,\n" +
" datasource_num_id STRING,\n" +
" data_flag STRING,\n" +
" w_insert_dt STRING,\n" +
" PRIMARY KEY (order_id) NOT ENFORCED \n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'hostname' = '%s',\n" +
" 'port' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'database-name' = '%s',\n" +
" 'table-name' = 'gc_oms_orders'\n" +
")");
// cdc
tableEnvironment.executeSql(create_table_gc_wms_orders);
tableEnvironment.executeSql(create_table_gc_wms_order_box);
tableEnvironment.executeSql(create_table_gc_wms_ship_order);
tableEnvironment.executeSql(create_table_gc_wms_order_operation_time);
tableEnvironment.executeSql(create_table_gc_wms_order_physical_relation);
tableEnvironment.executeSql(create_table_gc_oms_orders);
}
/**
* æ§è¡ä¸å¡sql
*/
private static void executeBusinessSql(StreamTableEnvironment tEnv) {
String create_table_gc_wms_qudaorealfenxi_view =
"CREATE TABLE gc_wms_qudaorealfenxi_view (\n" +
" uuid STRING,\n" +
" datasource_num_id INT,\n" +
" customer_code STRING,\n" +
" order_code STRING,\n" +
" add_time TIMESTAMP,\n" +
" order_date_release TIMESTAMP,\n" +
" import_time TIMESTAMP,\n" +
" process_time TIMESTAMP,\n" +
" pack_time TIMESTAMP,\n" +
" ship_time TIMESTAMP,\n" +
" tracking_number STRING,\n" +
" warehouse_id INT,\n" +
" warehouse_code STRING,\n" +
" warehouse_desc STRING,\n" +
" timezone INT,\n" +
" sm_code STRING,\n" +
" sm_name STRING,\n" +
" sc_id INT,\n" +
" server_channel_name STRING,\n" +
" server_code STRING,\n" +
" server_name STRING,\n" +
" tms_order_code STRING,\n" +
" wp_code STRING,\n" +
" wp_name STRING,\n" +
" is_fba INT,\n" +
" sm_is_tracking_val STRING,\n" +
" PRIMARY KEY (uuid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' =
'jdbc:mysql://zongteng75:3306/zongteng_streaming?useUnicode=true&characterEncoding=utf8&serverTimezone=GMT%2B8',"
+
" 'table-name' = 'gc_wms_qudaorealfenxi_view',\n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = 'c4RKchNdOg#',\n" +
" 'sink.buffer-flush.interval' = '1s',\n" +
" 'sink.buffer-flush.max-rows' ='5'" +
")";
String create_table_gc_wms_qudaorealfenxi_view_print =
"CREATE TABLE gc_wms_qudaorealfenxi_view1 (\n" +
" uuid STRING,\n" +
" datasource_num_id INT,\n" +
" customer_code STRING,\n" +
" order_code STRING,\n" +
" add_time TIMESTAMP,\n" +
" order_date_release TIMESTAMP,\n" +
" import_time TIMESTAMP,\n" +
" process_time TIMESTAMP,\n" +
" pack_time TIMESTAMP,\n" +
" ship_time TIMESTAMP,\n" +
" tracking_number STRING,\n" +
" warehouse_id INT,\n" +
" warehouse_code STRING,\n" +
" warehouse_desc STRING,\n" +
" timezone INT,\n" +
" sm_code STRING,\n" +
" sm_name STRING,\n" +
" sc_id INT,\n" +
" server_channel_name STRING,\n" +
" server_code STRING,\n" +
" server_name STRING,\n" +
" tms_order_code STRING,\n" +
" wp_code STRING,\n" +
" wp_name STRING,\n" +
" is_fba INT,\n" +
" sm_is_tracking_val STRING,\n" +
" PRIMARY KEY (uuid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = 'print'\n" +
")";
// gc_wms_orders | order_id
// gc_wms_order_box | order_code box_no
// gc_wms_ship_order | order_code
// gc_wms_order_operation_time | oot_id
// gc_wms_order_physical_relation | opr_id
// gc_oms_orders | order_id
String sql = "INSERT INTO gc_wms_qudaorealfenxi_view" +
" SELECT " +
" CONCAT(" +
" CAST(gwo.order_id AS STRING), '-'\n" +
" ,IF (gwob.order_code IS NULL, 'gwob_code_1',
gwob.order_code), '-'\n" +
" ,IF (gwob.box_no IS NULL, 'gwob_no_1', gwob.box_no),
'-'\n" +
" ,IF (gwso.order_code IS NULL, 'gwso_code_1',
gwso.order_code), '-'\n" +
" ,IF (opt.oot_id IS NULL, 'opt_id_1', CAST( opt.oot_id AS
STRING)), '-'\n" +
" ,IF (op.order_code IS NULL, 'op_code_1', CAST (
op.order_code AS STRING)), '-'\n" +
" ,IF (oo.order_id IS NULL, 'oo_id_1', CAST( oo.order_id
AS STRING))\n" +
" )\n" +
" ,9004 AS datasource_num_id\n" +
" ,gwo.customer_code AS customer_code\n" +
" ,gwo.order_code AS order_code\n" +
" ,CAST (gwo.add_time AS TIMESTAMP) AS add_time" +
" ,oo.date_release AS order_date_release" +
" ,opt.import_time AS import_time" +
" ,opt.process_time AS process_time" +
" ,opt.pack_time AS pack_time" +
" ,opt.ship_time AS ship_time" +
" ,IF (gwob.tracking_number IS NOT NULL,
gwob.tracking_number, gwso.tracking_number) AS tracking_number" +
" ,gwo.warehouse_id AS warehouse_id" +
" ,ware.warehouse_code AS warehouse_code" +
" ,ware.warehouse_desc AS warehouse_desc" +
" ,(case\n" +
" when (tz.timezone_season_type = 'summer_time') then\n"
+
" (case\n" +
" when (gwo.add_time between
tz.timezone_season_start and tz.timezone_season_end) then
tz.timezone_summer_number else tz.timezone_winner_number\n" +
" end)\n" +
" when (tz.timezone_season_type = 'winner_time') then\n"
+
" (case\n" +
" when (gwo.add_time between
tz.timezone_season_start and tz.timezone_season_end) then
tz.timezone_winner_number else tz.timezone_summer_number\n" +
" end)\n" +
" end) AS timezone\n" +
" ,gwo.sm_code AS sm_code\n" +
" ,sm.sm_name AS sm_name\n" +
" ,gwo.sc_id AS sc_id\n" +
" ,sc.server_channel_name AS server_channel_name\n" +
" ,s.server_code AS server_code\n" +
" ,s.server_name AS server_name\n" +
" ,CONCAT(gwo.order_code, IF (gwob.box_no IS NULL, '',
concat('-', gwob.box_no))) AS tms_order_code\n" +
" ,op.wp_code AS wp_code\n" +
" ,wp.wp_name AS wp_name\n" +
" ,CAST (gwo.is_fba AS INT) AS is_fba\n" +
" ,sm.sm_is_tracking_val AS sm_is_tracking_val" +
" FROM " + "gc_wms_orders" + " AS gwo\n" +
" LEFT JOIN gc_wms_order_box AS gwob ON gwo.order_code =
gwob.order_code\n" +
" LEFT JOIN gc_wms_ship_order AS gwso ON gwo.order_code =
gwso.order_code\n" +
" LEFT JOIN gc_wms_warehouse AS ware ON CAST (gwo.warehouse_id
AS INT) = ware.warehouse_id\n" +
" LEFT JOIN dp_timezone AS tz ON CAST (gwo.warehouse_id AS INT)
= tz.warehouse_id AND YEAR(CAST(gwo.add_time AS TIMESTAMP)) =
tz.timezone_year\n" +
" LEFT JOIN (SELECT * FROM dim_shipping_method WHERE
datasource_num_id = 9004) AS sm ON gwo.sm_code = sm.sm_code\n" +
" LEFT JOIN (SELECT * FROM dim_server_channel WHERE
datasource_num_id = 9004) AS sc ON CAST (gwo.sc_id AS INT) =
sc.server_channel_id\n" +
" LEFT JOIN (SELECT * FROM dim_server WHERE datasource_num_id =
9004) AS s ON sc.server_channel_server_id = s.server_id\n" +
" LEFT JOIN gc_wms_order_operation_time opt ON CAST
(gwo.order_id AS INT) = opt.order_id\n" +
" LEFT JOIN ( SELECT " +
" gwo.order_code \n" +
" ,min(pr.wp_code) wp_code \n" +
" FROM " + "gc_wms_orders" + " AS gwo \n" +
" LEFT JOIN gc_wms_order_physical_relation AS pr ON
gwo.order_code = pr.order_code \n" +
" WHERE TIMESTAMPDIFF(DAY, gwo.add_time, CURRENT_DATE) <= 10
\n" +
" GROUP BY gwo.order_code" +
" ) AS op \n" +
" ON gwo.order_code = op.order_code\n" +
" LEFT JOIN gc_wms_warehouse_physical wp ON wp.wp_code =
op.wp_code\n" +
" LEFT JOIN gc_oms_orders oo ON oo.refrence_no_platform =
gwo.order_code\n" +
" WHERE 1 = 1\n" +
" AND gwo.order_status <> 0\n" +
" AND gwo.order_platform_type = 'sale'\n" +
" AND gwo.sm_code <> 'ZITI'\n" +
" AND gwo.warehouse_id not in (5,6)\n" +
" AND gwo.customer_code not in
('G403','G440','G441','G448','G452','G1330','215','000010','000016','G1234','192','193','245','G249','000028','000012','G387')\n"
+
" AND TIMESTAMPDIFF(DAY, TO_DATE(gwo.add_time), CURRENT_DATE)
<= 10 \n";
String print_sql = "INSERT INTO gc_wms_qudaorealfenxi_view1" +
" SELECT " +
" CONCAT(" +
" CAST(gwo.order_id AS STRING), '-'\n" +
" ,IF (gwob.order_code IS NULL, 'gwob_code_1',
gwob.order_code), '-'\n" +
" ,IF (gwob.box_no IS NULL, 'gwob_no_1', gwob.box_no),
'-'\n" +
" ,IF (gwso.order_code IS NULL, 'gwso_code_1',
gwso.order_code), '-'\n" +
" ,IF (opt.oot_id IS NULL, 'opt_id_1', CAST( opt.oot_id AS
STRING)), '-'\n" +
" ,IF (op.order_code IS NULL, 'op_code_1', CAST (
op.order_code AS STRING)), '-'\n" +
" ,IF (oo.order_id IS NULL, 'oo_id_1', CAST( oo.order_id
AS STRING))\n" +
" )\n" +
" ,9004 AS datasource_num_id\n" +
" ,gwo.customer_code AS customer_code\n" +
" ,gwo.order_code AS order_code\n" +
" ,CAST (gwo.add_time AS TIMESTAMP) AS add_time" +
" ,oo.date_release AS order_date_release" +
" ,opt.import_time AS import_time" +
" ,opt.process_time AS process_time" +
" ,opt.pack_time AS pack_time" +
" ,opt.ship_time AS ship_time" +
" ,IF (gwob.tracking_number IS NOT NULL,
gwob.tracking_number, gwso.tracking_number) AS tracking_number" +
" ,gwo.warehouse_id AS warehouse_id" +
" ,ware.warehouse_code AS warehouse_code" +
" ,ware.warehouse_desc AS warehouse_desc" +
" ,(case\n" +
" when (tz.timezone_season_type = 'summer_time') then\n"
+
" (case\n" +
" when (gwo.add_time between
tz.timezone_season_start and tz.timezone_season_end) then
tz.timezone_summer_number else tz.timezone_winner_number\n" +
" end)\n" +
" when (tz.timezone_season_type = 'winner_time') then\n"
+
" (case\n" +
" when (gwo.add_time between
tz.timezone_season_start and tz.timezone_season_end) then
tz.timezone_winner_number else tz.timezone_summer_number\n" +
" end)\n" +
" end) AS timezone\n" +
" ,gwo.sm_code AS sm_code\n" +
" ,sm.sm_name AS sm_name\n" +
" ,gwo.sc_id AS sc_id\n" +
" ,sc.server_channel_name AS server_channel_name\n" +
" ,s.server_code AS server_code\n" +
" ,s.server_name AS server_name\n" +
" ,CONCAT(gwo.order_code, IF (gwob.box_no IS NULL, '',
concat('-', gwob.box_no))) AS tms_order_code\n" +
" ,op.wp_code AS wp_code\n" +
" ,wp.wp_name AS wp_name\n" +
" ,CAST (gwo.is_fba AS INT) AS is_fba\n" +
" ,sm.sm_is_tracking_val AS sm_is_tracking_val" +
" FROM " + "gc_wms_orders" + " AS gwo\n" +
" LEFT JOIN gc_wms_order_box AS gwob ON gwo.order_code =
gwob.order_code\n" +
" LEFT JOIN gc_wms_ship_order AS gwso ON gwo.order_code =
gwso.order_code\n" +
" LEFT JOIN gc_wms_warehouse AS ware ON CAST (gwo.warehouse_id
AS INT) = ware.warehouse_id\n" +
" LEFT JOIN dp_timezone AS tz ON CAST (gwo.warehouse_id AS INT)
= tz.warehouse_id AND YEAR(CAST(gwo.add_time AS TIMESTAMP)) =
tz.timezone_year\n" +
" LEFT JOIN (SELECT * FROM dim_shipping_method WHERE
datasource_num_id = 9004) AS sm ON gwo.sm_code = sm.sm_code\n" +
" LEFT JOIN (SELECT * FROM dim_server_channel WHERE
datasource_num_id = 9004) AS sc ON CAST (gwo.sc_id AS INT) =
sc.server_channel_id\n" +
" LEFT JOIN (SELECT * FROM dim_server WHERE datasource_num_id =
9004) AS s ON sc.server_channel_server_id = s.server_id\n" +
" LEFT JOIN gc_wms_order_operation_time opt ON CAST
(gwo.order_id AS INT) = opt.order_id\n" +
" LEFT JOIN ( SELECT " +
" gwo.order_code \n" +
" ,min(pr.wp_code) wp_code \n" +
" FROM " + "gc_wms_orders" + " AS gwo \n" +
" LEFT JOIN gc_wms_order_physical_relation AS pr ON
gwo.order_code = pr.order_code \n" +
" WHERE TIMESTAMPDIFF(DAY, gwo.add_time, CURRENT_DATE) <= 10
\n" +
" GROUP BY gwo.order_code" +
" ) AS op \n" +
" ON gwo.order_code = op.order_code\n" +
" LEFT JOIN gc_wms_warehouse_physical wp ON wp.wp_code =
op.wp_code\n" +
" LEFT JOIN gc_oms_orders oo ON oo.refrence_no_platform =
gwo.order_code\n" +
" WHERE 1 = 1\n" +
" AND gwo.order_status <> 0\n" +
" AND gwo.order_platform_type = 'sale'\n" +
" AND gwo.sm_code <> 'ZITI'\n" +
" AND gwo.warehouse_id not in (5,6)\n" +
" AND gwo.customer_code not in
('G403','G440','G441','G448','G452','G1330','215','000010','000016','G1234','192','193','245','G249','000028','000012','G387')\n"
+
" AND TIMESTAMPDIFF(DAY, TO_DATE(gwo.add_time), CURRENT_DATE)
<= 10 \n";
tEnv.executeSql(create_table_gc_wms_qudaorealfenxi_view);
tEnv.executeSql(create_table_gc_wms_qudaorealfenxi_view_print); //
è°è¯
StatementSet stmtSet = tEnv.createStatementSet();
stmtSet.addInsertSql(sql);
stmtSet.addInsertSql(print_sql); // è°è¯
// æ§è¡ååæ·»å çææ INSERT è¯å¥
stmtSet.execute();
}
/**
* å建纬度表
*/
private static void createDimTable(StreamTableEnvironment tEnv) {
// gc_wms_warehouse
String create_table_gc_wms_warehouse = confJdbc("CREATE TABLE
gc_wms_warehouse (\n" +
" warehouse_id INT,\n" +
" warehouse_code STRING,\n" +
" warehouse_type INT,\n" +
" warehouse_status INT,\n" +
" warehouse_virtual INT,\n" +
" warehouse_transfer INT,\n" +
" country_id INT,\n" +
" country_code STRING,\n" +
" state STRING,\n" +
" city STRING,\n" +
" contacter STRING,\n" +
" company STRING,\n" +
" phone_no STRING,\n" +
" street_address1 STRING,\n" +
" street_address2 STRING,\n" +
" postcode STRING,\n" +
" warehouse_desc STRING,\n" +
" warehouse_add_time TIMESTAMP,\n" +
" warehouse_update_time TIMESTAMP,\n" +
" warehouse_proxy_code STRING,\n" +
" street_number STRING,\n" +
" timezone INT,\n" +
" value_added_tax INT,\n" +
" warehouse_en STRING,\n" +
" saving_time INT,\n" +
" arrival_days INT,\n" +
" currency_code STRING,\n" +
" `day` STRING,\n" +
" PRIMARY KEY (warehouse_id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'url' = '%s',\n" +
" 'table-name' = 'gc_wms_warehouse',\n" +
" 'driver' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'sink.buffer-flush.interval' = '1s',\n" +
" 'sink.buffer-flush.max-rows' = '5'\n" +
")");
String create_table_dp_timezone = confJdbc("CREATE TABLE dp_timezone
(\n" +
" warehouse_id INT,\n" +
" warehouse_code STRING,\n" +
" warehouse_name STRING,\n" +
" timezone_year INT,\n" +
" timezone_season_type STRING,\n" +
" timezone_season_start TIMESTAMP,\n" +
" timezone_season_end TIMESTAMP,\n" +
" timezone_summer_number INT,\n" +
" timezone_winner_number INT,\n" +
" timezone_summer_time_dif_val INT,\n" +
" timezone_winner_time_dif_val INT\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'url' = '%s',\n" +
" 'table-name' = 'dp_timezone',\n" +
" 'driver' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'sink.buffer-flush.interval' = '1s',\n" +
" 'sink.buffer-flush.max-rows' = '5'\n" +
")");
String create_table_dim_shipping_method = confJdbc("CREATE TABLE
dim_shipping_method (\n" +
" row_wid STRING,\n" +
" warehouse_key STRING,\n" +
" sc_key STRING,\n" +
" st_key STRING,\n" +
" sct_key STRING,\n" +
" sm_transfer_warehouse_key STRING,\n" +
" sm_id INT,\n" +
" sm_code STRING,\n" +
" sm_short_name STRING,\n" +
" sm_name_cn STRING,\n" +
" sm_name STRING,\n" +
" sm_pg_code STRING,\n" +
" sm_mp_fee DECIMAL(10,2),\n" +
" sm_reg_fee DECIMAL(10,2),\n" +
" sm_addons DECIMAL(10,2),\n" +
" sm_type INT,\n" +
" sm_type_val STRING,\n" +
" sm_baf DECIMAL(10,2),\n" +
" sm_discount DECIMAL(10,2),\n" +
" sm_delivery_time_min STRING,\n" +
" sm_delivery_time_max STRING,\n" +
" sm_delivery_time_avg STRING,\n" +
" sm_is_volume INT,\n" +
" sm_is_volume_val STRING,\n" +
" sm_is_charge INT,\n" +
" sm_is_charge_val STRING,\n" +
" sm_vol_rate DECIMAL(15,4),\n" +
" sm_status INT,\n" +
" sm_status_val STRING,\n" +
" sm_class_code STRING,\n" +
" sm_class_code_val STRING,\n" +
" sm_logo STRING,\n" +
" sm_return_recipient STRING,\n" +
" sm_return_address STRING,\n" +
" sm_discount_min DECIMAL(10,2),\n" +
" sm_mp_fee_min DECIMAL(10,2),\n" +
" sm_reg_fee_min DECIMAL(10,2),\n" +
" sm_limit_volume INT,\n" +
" sm_limit_weight DECIMAL(10,3),\n" +
" sm_sort INT,\n" +
" sm_is_tracking INT,\n" +
" sm_is_tracking_val STRING,\n" +
" sm_is_signature INT,\n" +
" sm_is_signature_val STRING,\n" +
" sm_is_insurance INT,\n" +
" sm_is_insurance_val STRING,\n" +
" sm_is_validate_remote INT,\n" +
" sm_is_validate_remote_val STRING,\n" +
" sm_warehouse_id INT,\n" +
" sm_calc_type INT,\n" +
" sm_fee_type INT,\n" +
" sm_sc_id INT,\n" +
" sm_st_id INT,\n" +
" sm_carrier_number STRING,\n" +
" sm_allow_letter INT,\n" +
" sm_allow_letter_val STRING,\n" +
" sm_sct_id INT,\n" +
" sm_update_time TIMESTAMP,\n" +
" sm_pre_generate_label INT,\n" +
" sm_pre_generate_label_val STRING,\n" +
" sm_code_type INT,\n" +
" sm_code_type_val STRING,\n" +
" sm_is_more_box INT,\n" +
" sm_is_more_box_val STRING,\n" +
" sm_return_type INT,\n" +
" sm_attribute STRING,\n" +
" sm_attribute_val STRING,\n" +
" sm_is_claim INT,\n" +
" sm_is_claim_val STRING,\n" +
" sm_transfer_warehouse_id INT,\n" +
" sm_age_detection INT,\n" +
" sm_age_detection_val STRING,\n" +
" sm_recommend_platform STRING,\n" +
" sm_volume_unit STRING,\n" +
" sm_volume_type INT,\n" +
" sm_volume_type_val STRING,\n" +
" sm_volume_value DECIMAL(10,2),\n" +
" sm_weight_unit STRING,\n" +
" sm_weight_type INT,\n" +
" sm_weight_type_val STRING,\n" +
" sm_weight_value DECIMAL(10,2),\n" +
" sm_claim_desc STRING,\n" +
" sm_sfp_label_type INT,\n" +
" sm_sfp_label_type_val STRING,\n" +
" sm_life_gate INT,\n" +
" sm_life_gate_val STRING,\n" +
" sm_truck INT,\n" +
" sm_truck_val STRING,\n" +
" datasource_num_id STRING,\n" +
" data_flag STRING,\n" +
" w_insert_dt TIMESTAMP,\n" +
" PRIMARY KEY (row_wid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'url' = '%s',\n" +
" 'table-name' = 'dim_shipping_method',\n" +
" 'driver' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'sink.buffer-flush.interval' = '1s',\n" +
" 'sink.buffer-flush.max-rows' = '5'\n" +
")");
// dim_server_channel
String create_table_dim_server_channel = confJdbc("CREATE TABLE
dim_server_channel (\n" +
" row_wid STRING,\n" +
" server_key STRING,\n" +
" server_channel_id INT,\n" +
" server_channel_server_id INT,\n" +
" server_channel_code STRING,\n" +
" server_channel_short_name STRING,\n" +
" server_channel_name STRING,\n" +
" server_channel_name_en STRING,\n" +
" server_channel_note STRING,\n" +
" server_channel_status INT,\n" +
" server_channel_status_val STRING,\n" +
" server_channel_is_weighing INT,\n" +
" server_channel_is_weighing_val STRING,\n" +
" server_channel_is_tracking INT,\n" +
" server_channel_is_tracking_val STRING,\n" +
" server_channel_is_volume INT,\n" +
" server_channel_is_volume_val STRING,\n" +
" server_channel_st_id INT,\n" +
" server_channel_st_auto_delivery INT,\n" +
" server_channel_st_split_start INT,\n" +
" server_channel_st_split_value INT,\n" +
" server_channel_st_split_type STRING,\n" +
" server_channel_add_time TIMESTAMP,\n" +
" server_channel_update_time TIMESTAMP,\n" +
" server_channel_asp_id INT,\n" +
" server_channel_asp_code STRING,\n" +
" server_channel_apm_id INT,\n" +
" server_channel_api_pk_type STRING,\n" +
" server_channel_wut_code STRING,\n" +
" server_channel_min_weight DECIMAL(11,3),\n" +
" server_channel_max_weight DECIMAL(11,3),\n" +
" server_channel_is_validate_address INT,\n" +
" server_channel_is_validate_address_val STRING,\n" +
" server_channel_is_pack_weighing INT,\n" +
" server_channel_is_pack_weighing_val STRING,\n" +
" server_channel_sign_type INT,\n" +
" server_channel_right_trim_length INT,\n" +
" server_channel_left_trim_length INT,\n" +
" server_channel_is_mark_weight INT,\n" +
" server_channel_is_mark_weight_val STRING,\n" +
" server_channel_push_tms_status INT,\n" +
" datasource_num_id STRING,\n" +
" data_flag STRING,\n" +
" w_insert_dt TIMESTAMP,\n" +
" PRIMARY KEY (row_wid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'url' = '%s',\n" +
" 'table-name' = 'dim_server_channel',\n" +
" 'driver' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'sink.buffer-flush.interval' = '1s',\n" +
" 'sink.buffer-flush.max-rows' = '5'\n" +
")");
// dim_server
String create_table_dim_server = confJdbc("CREATE TABLE dim_server (\n"
+
" row_wid STRING,\n" +
" server_id INT,\n" +
" server_code STRING,\n" +
" server_name STRING,\n" +
" server_contact_name STRING,\n" +
" server_contact_phone STRING,\n" +
" server_address STRING,\n" +
" server_settlement_type INT,\n" +
" server_settlement_type_val STRING,\n" +
" server_balance DECIMAL(10,3),\n" +
" server_currency_code STRING,\n" +
" server_tax DECIMAL(10,3),\n" +
" server_person_user INT,\n" +
" server_update_date TIMESTAMP,\n" +
" server_type INT,\n" +
" server_is_ignore_express INT,\n" +
" server_as_id INT,\n" +
" server_as_name STRING,\n" +
" server_add_date TIMESTAMP,\n" +
" server_status INT,\n" +
" server_status_val STRING,\n" +
" server_ap_id INT,\n" +
" server_carrier_name STRING,\n" +
" server_cancel_label_flag INT,\n" +
" server_is_cancel_label_flag_val STRING,\n" +
" datasource_num_id STRING,\n" +
" data_flag STRING,\n" +
" w_insert_dt TIMESTAMP,\n" +
" PRIMARY KEY (row_wid) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'url' = '%s',\n" +
" 'table-name' = 'dim_server',\n" +
" 'driver' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'sink.buffer-flush.interval' = '1s',\n" +
" 'sink.buffer-flush.max-rows' = '5'\n" +
")");
// gc_wms_warehouse_physical
String create_table_gc_wms_warehouse_physical = confJdbc("CREATE TABLE
gc_wms_warehouse_physical (\n" +
" wp_id BIGINT,\n" +
" warehouse_id INT,\n" +
" wp_name STRING,\n" +
" wp_code STRING,\n" +
" wp_status INT,\n" +
" wp_priority INT,\n" +
" wp_state STRING,\n" +
" wp_city STRING,\n" +
" wp_postcode STRING,\n" +
" wp_company STRING,\n" +
" wp_contacter STRING,\n" +
" wp_phone STRING,\n" +
" wp_street_address1 STRING,\n" +
" wp_street_address2 STRING,\n" +
" wp_street_number STRING,\n" +
" wp_add_time TIMESTAMP,\n" +
" wp_update_time TIMESTAMP,\n" +
" `day` STRING,\n" +
" PRIMARY KEY (wp_id) NOT ENFORCED\n" +
") WITH (\n" +
" 'connector' = '%s',\n" +
" 'url' = '%s',\n" +
" 'table-name' = 'gc_wms_warehouse_physical',\n" +
" 'driver' = '%s',\n" +
" 'username' = '%s',\n" +
" 'password' = '%s',\n" +
" 'sink.buffer-flush.interval' = '1s',\n" +
" 'sink.buffer-flush.max-rows' = '5'\n" +
")");
tEnv.executeSql(create_table_gc_wms_warehouse);
tEnv.executeSql(create_table_dp_timezone);
tEnv.executeSql(create_table_dim_shipping_method);
tEnv.executeSql(create_table_dim_server_channel);
tEnv.executeSql(create_table_dim_server);
tEnv.executeSql(create_table_gc_wms_warehouse_physical);
}
/**
 * Configures the JDBC properties of a table DDL template.
 *
 * <p>Loads {@code druid_jdbc/zongteng_streaming.properties} and passes the
 * template to {@link String#format}, binding its placeholders, in order, to:
 * the connector name {@code jdbc}, then the {@code url},
 * {@code driverClassName}, {@code username} and {@code password} properties.
 *
 * @param tableSql format template containing the placeholders listed above
 * @return the template with the JDBC settings substituted
 */
private static String confJdbc(String tableSql) {
    Properties jdbcProps =
            PropertyUtil.loadProperties("druid_jdbc/zongteng_streaming.properties");
    return String.format(
            tableSql,
            "jdbc",
            jdbcProps.getProperty("url"),
            jdbcProps.getProperty("driverClassName"),
            jdbcProps.getProperty("username"),
            jdbcProps.getProperty("password"));
}
}