[ https://issues.apache.org/jira/browse/FLINK-25559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zongwen Li updated FLINK-25559:
-------------------------------
    Description: 
{code:sql}
-- The SQL omits some selected fields
INSERT INTO kd_product_info
SELECT
  ps.product AS productId,
  ps.productsaleid AS productSaleId,
  CAST(p.complex AS INT) AS complex,
  p.createtime AS createTime,
  p.updatetime AS updateTime,
  p.ean AS ean,
  ts.availablequantity AS totalAvailableStock,
  IF(
    ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
    ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
    0
  ) AS sharedStock,
  rps.purchase AS purchase,
  v.`name` AS vendorName
FROM product_sale ps
JOIN product p ON ps.product = p.id
LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory = mc.id
LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale AND pse359.meta = 359
LEFT JOIN product_image_url piu ON ps.product = piu.product
{code}
All table sources use the upsert-kafka connector with the json format.

Each table holds between 5 million and 10 million records; the job parallelism is 24.
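
For reference, one of the source tables in a setup like this might be declared roughly as follows. This is only a sketch: the column types, the primary key, the topic name, and the broker address are assumptions and are not taken from the actual job. Only the connector, the json format, and the parallelism of 24 come from the description above.

{code:sql}
-- Hypothetical DDL for one source table; types, key, topic, and broker are assumed.
CREATE TABLE product_sale (
  productsaleid BIGINT,
  product       BIGINT,
  merchant_id   BIGINT,
  vendor        BIGINT,
  PRIMARY KEY (productsaleid) NOT ENFORCED  -- upsert-kafka requires a primary key
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'product_sale',                             -- assumed topic name
  'properties.bootstrap.servers' = 'localhost:9092',    -- assumed broker address
  'key.format' = 'json',
  'value.format' = 'json'
);

-- Job-wide default parallelism as used in the tests (SQL client syntax).
SET 'parallelism.default' = '24';
{code}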

We tested sinking to both Kudu and Elasticsearch (ES).

We also tested multiple Flink versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, and 1.14.2.

After many tests, we found that the more tables are LEFT JOINed, or the higher the operator parallelism, the more easily data is lost.

In addition, a job on 1.14.0 with a parallelism of 1 lost some data after running for half a month, and the data was correct after rerunning it. We suspect it is the same problem.


> SQL JOIN causes data loss
> -------------------------
>
>                 Key: FLINK-25559
>                 URL: https://issues.apache.org/jira/browse/FLINK-25559
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.0, 1.13.2, 1.13.3, 1.13.5, 1.14.2
>            Reporter: Zongwen Li
>            Priority: Major
>
> {code:sql}
> -- The SQL omits some selected fields
> INSERT INTO kd_product_info
> SELECT
>   ps.product AS productId,
>   ps.productsaleid AS productSaleId,
>   CAST(p.complex AS INT) AS complex,
>   p.createtime AS createTime,
>   p.updatetime AS updateTime,
>   p.ean AS ean,
>   ts.availablequantity AS totalAvailableStock,
>   IF(
>     ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
>     ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
>     0
>   ) AS sharedStock,
>   rps.purchase AS purchase,
>   v.`name` AS vendorName
> FROM product_sale ps
> JOIN product p ON ps.product = p.id
> LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
> LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
> LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
> LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory = mc.id
> LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
> LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
> LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale AND pse359.meta = 359
> LEFT JOIN product_image_url piu ON ps.product = piu.product
> {code}
> All table sources use the upsert-kafka connector with the json format.
> Each table holds between 5 million and 10 million records; the job parallelism is 24.
> We tested sinking to both Kudu and Elasticsearch (ES).
> We also tested multiple Flink versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, and 1.14.2.
> After many tests, we found that the more tables are LEFT JOINed, or the higher the operator parallelism, the more easily data is lost.
> In addition, a job on 1.14.0 with a parallelism of 1 lost some data after running for half a month, and the data was correct after rerunning it. We suspect it is the same problem.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
