[ https://issues.apache.org/jira/browse/FLINK-25559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17470323#comment-17470323 ]
lincoln lee commented on FLINK-25559:
-------------------------------------

[~Ashulin] Does the sink table's primary key differ from your last join key ({{LEFT JOIN product_image_url piu ON ps.product = piu.product}})? If so, this may be the case described in FLINK-20370; you can try 1.14.3.

> SQL JOIN causes data loss
> -------------------------
>
>                 Key: FLINK-25559
>                 URL: https://issues.apache.org/jira/browse/FLINK-25559
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.0, 1.13.2, 1.13.3, 1.13.5, 1.14.2
>            Reporter: Zongwen Li
>            Priority: Major
>         Attachments: image-2022-01-07-11-27-01-010.png
>
>
> {code:sql}
> -- the SQL omits some selected fields
> INSERT INTO kd_product_info
> SELECT
>   ps.product AS productId,
>   ps.productsaleid AS productSaleId,
>   CAST(p.complex AS INT) AS complex,
>   p.createtime AS createTime,
>   p.updatetime AS updateTime,
>   p.ean AS ean,
>   ts.availablequantity AS totalAvailableStock,
>   IF(
>     ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
>     ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
>     0
>   ) AS sharedStock,
>   rps.purchase AS purchase,
>   v.`name` AS vendorName
> FROM product_sale ps
> JOIN product p ON ps.product = p.id
> LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
> LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
> LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
> LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory = mc.id
> LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
> LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
> LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale AND pse359.meta = 359
> LEFT JOIN product_image_url piu ON ps.product = piu.product
> {code}
> All table sources are upsert-kafka, and I have made sure that the join columns have the same types on both sides:
>
> {code:sql}
> -- No computed columns
> -- Just plain physical columns
> PRIMARY KEY (xx) NOT ENFORCED
> ) WITH (
>   'connector' = 'upsert-kafka',
>   'topic' = 'XXX',
>   'group.id' = '%s',
>   'properties.bootstrap.servers' = '%s',
>   'key.format' = 'json',
>   'value.format' = 'json'
> )
> {code}
> Each table holds between 5 million and 10 million records. The parallelism is 24.
> No state TTL is set. Data loss is already noticeable within 30 minutes.
>
> The data flow:
> MySQL -> Flink CDC -> ODS (Upsert Kafka) -> the job -> sink
> I'm sure the ODS data in Kafka is correct.
> I have also tried using the flink-cdc source directly, but that didn't solve the problem.
>
> We tested sinking to Kudu, Kafka and ES.
> We also tested multiple versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0 and 1.14.2.
> The lost records appear out of order in Kafka, so we suspect a bug in the retraction stream:
> !image-2022-01-07-11-27-01-010.png!
>
> After many tests, we found that data is lost more easily as more tables are LEFT JOINed or as operator parallelism increases.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
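The FLINK-20370 hypothesis raised in the comment could be tested with an extra experiment. This is a sketch under stated assumptions and was not suggested in the thread. Flink 1.14 and later document the execution option {{table.exec.sink.upsert-materialize}}. Its values are NONE, AUTO and FORCE, and the default is AUTO. It controls whether the planner places a materialize operator in front of an upsert sink. That operator compensates for changelog disorder caused by shuffling, for example -U/+U records for the same sink key arriving from different parallel subtasks.

Forcing the operator on and checking whether rows still disappear is an assumed diagnostic, not a confirmed fix. The operator is also stateful, which matters here because no state TTL is set.

{code:sql}
-- Sketch, assuming the Flink SQL client on Flink 1.14 or later.
-- FORCE always adds the upsert materialize operator before the sink,
-- instead of leaving the decision to the planner (AUTO, the default).
SET 'table.exec.sink.upsert-materialize' = 'FORCE';

-- Then resubmit the unchanged INSERT INTO kd_product_info SELECT ... statement.
{code}

If rows stop disappearing with this setting, that would support the sink-key explanation. It would not by itself rule out other causes.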