[ https://issues.apache.org/jira/browse/FLINK-25559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17470352#comment-17470352 ]
Zongwen Li commented on FLINK-25559:
------------------------------------

[~lincoln.86xy] Yes, the primary key of the sink table is different from the last join key. I have also updated the SQL above with the primary keys and join keys of all tables.

> SQL JOIN causes data loss
> -------------------------
>
>                 Key: FLINK-25559
>                 URL: https://issues.apache.org/jira/browse/FLINK-25559
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.0, 1.13.2, 1.13.3, 1.13.5, 1.14.2
>            Reporter: Zongwen Li
>            Priority: Major
>         Attachments: image-2022-01-07-11-27-01-010.png
>
> {code:java}
> -- sink table; some physical fields omitted
> CREATE TABLE kd_product_info (
>   productSaleId BIGINT COMMENT 'product sale ID',
>   productId     BIGINT COMMENT 'product ID',
>   PRIMARY KEY (productSaleId) NOT ENFORCED
> );
>
> -- the SQL omits some selected fields
> INSERT INTO kd_product_info
> SELECT
>   ps.product AS productId,
>   ps.productsaleid AS productSaleId,
>   CAST(p.complex AS INT) AS complex,
>   p.createtime AS createTime,
>   p.updatetime AS updateTime,
>   p.ean AS ean,
>   ts.availablequantity AS totalAvailableStock,
>   IF (
>     ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
>     ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
>     0
>   ) AS sharedStock,
>   rps.purchase AS purchase,
>   v.`name` AS vendorName
> FROM product_sale ps
> JOIN product p ON ps.product = p.id
> LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
> LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
> LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
> LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory = mc.id
> LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
> LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
> LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale AND pse359.meta = 359
> LEFT JOIN product_image_url piu ON ps.product = piu.product
> {code}
> All table sources are upsert-kafka, and I have ensured that the joined columns are of the same type:
>
> {code:java}
> CREATE TABLE product_sale (
>   id            BIGINT COMMENT 'primary key',
>   productsaleid BIGINT COMMENT 'product sale ID',
>   product       BIGINT COMMENT 'product ID',
>   merchant_id   DECIMAL(20, 0) COMMENT 'merchant ID',
>   vendor        STRING COMMENT 'vendor',
>   PRIMARY KEY (productsaleid) NOT ENFORCED
> )
> -- no computed columns, just plain physical columns
> WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'XXX',
>   'group.id' = '%s',
>   'properties.bootstrap.servers' = '%s',
>   'key.format' = 'json',
>   'value.format' = 'json'
> );
>
> CREATE TABLE product (
>   id BIGINT,
>   mccategory STRING,
>   PRIMARY KEY (id) NOT ENFORCED
> );
>
> CREATE TABLE rate_product_sale (
>   id BIGINT COMMENT 'primary key',
>   PRIMARY KEY (id) NOT ENFORCED
> );
>
> CREATE TABLE pss_total_stock (
>   id          INT COMMENT 'ID',
>   productsale BIGINT COMMENT 'product sale ID',
>   PRIMARY KEY (id) NOT ENFORCED
> );
>
> CREATE TABLE vendor (
>   merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
>   vendor      STRING COMMENT 'vendor code',
>   PRIMARY KEY (merchant_id, vendor) NOT ENFORCED
> );
>
> CREATE TABLE mccategory (
>   id          STRING COMMENT 'mc ID',
>   merchant_id DECIMAL(20, 0) COMMENT 'merchant ID',
>   PRIMARY KEY (merchant_id, id) NOT ENFORCED
> );
>
> CREATE TABLE new_mccategory (
>   mc STRING,
>   PRIMARY KEY (mc) NOT ENFORCED
> );
>
> CREATE TABLE product_sale_grade_plus (
>   productsale BIGINT,
>   PRIMARY KEY (productsale) NOT ENFORCED
> );
>
> CREATE TABLE product_sale_extend (
>   id           BIGINT,
>   product_sale BIGINT,
>   meta         BIGINT,
>   PRIMARY KEY (id) NOT ENFORCED
> );
>
> CREATE TABLE product_image_url (
>   product BIGINT,
>   PRIMARY KEY (product) NOT ENFORCED
> );
> {code}
>
> Each table holds between 5 million and 10 million rows; parallelism: 24; no state TTL is set. In practice we can observe data loss within as little as 30 minutes.
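As a side note for readers hitting the same symptom: since the sink's primary key differs from the upsert key produced by the join chain, one avenue to explore is Flink's sink upsert materialization. This is a minimal configuration sketch, not a confirmed fix for this ticket, and it assumes a Flink version (1.13+) where the `table.exec.sink.upsert-materialize` option exists:

```sql
-- Hypothetical mitigation sketch, not a confirmed fix for this ticket.
-- FORCE inserts a SinkUpsertMaterializer before the sink, so changelog
-- records are reconciled on the sink's primary key rather than on the
-- upsert key inferred from the last join.
SET 'table.exec.sink.upsert-materialize' = 'FORCE';

-- The report already runs without TTL; keeping state unbounded rules
-- out state expiry as a cause of the missing rows.
SET 'table.exec.state.ttl' = '0 ms';
```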
>
> The data flow:
> MySQL -> Flink CDC -> ODS (upsert-kafka) -> the job -> sink
>
> I am sure the ODS data in Kafka is correct. I have also tried using the flink-cdc source directly; it did not solve the problem.
>
> We tested sinking to Kudu, Kafka, and ES, and tested multiple versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, 1.14.2.
>
> The lost data appears out of order in Kafka; we suspect a bug in the retraction stream:
> !image-2022-01-07-11-27-01-010.png!
>
> After many tests, we found that the more tables are involved in the left joins, or the higher the parallelism of the operator, the more easily data is lost.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
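The out-of-order retraction suspicion above can be illustrated with a simplified model (an editor's sketch, not Flink's actual sink code): if the retraction (-U) of an old row version overtakes the update (+U) that replaced it, a naive upsert sink deletes the key and the row is lost.

```python
# Simplified model of an upsert sink keyed by productSaleId applying a
# changelog of (+I insert, -U retract-update, +U update, -D delete)
# records. If -U for an old version arrives AFTER the +U that replaced
# it -- e.g. because the two records travelled through different
# parallel subtasks -- the key is removed and the final row is lost.

def apply_changelog(events):
    """Apply (op, key, value) changelog events to a dict-backed upsert sink."""
    state = {}
    for op, key, value in events:
        if op in ("+I", "+U"):
            state[key] = value
        elif op in ("-U", "-D"):
            # naive sink: any retraction removes the key outright
            state.pop(key, None)
    return state

# In-order changelog: the retraction of v1 precedes the update to v2.
in_order = [("+I", 1001, "v1"), ("-U", 1001, "v1"), ("+U", 1001, "v2")]

# Out-of-order changelog: same events, but -U overtakes +U.
out_of_order = [("+I", 1001, "v1"), ("+U", 1001, "v2"), ("-U", 1001, "v1")]

print(apply_changelog(in_order))      # key 1001 survives with "v2"
print(apply_changelog(out_of_order))  # key 1001 is gone: data loss
```

This also matches the observation that more left joins and higher parallelism make loss more likely: both increase the number of independent paths a record's retraction and update can take before reaching the sink.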