[ 
https://issues.apache.org/jira/browse/FLINK-25559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17470316#comment-17470316
 ] 

Zongwen Li commented on FLINK-25559:
------------------------------------

[~lincoln.86xy] 
{code:java}
WITH (
'connector' = 'upsert-kafka',
'topic' = 'XXX',
'sink.parallelism' = '%s',
'properties.bootstrap.servers' = 's',
'key.format' = 'json',
'value.format' = 'json'
)
{code}
No state TTL is set.

The join key fields are of the same type on both sides.
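
For reference, a minimal sketch of what one of the source table definitions could look like with these options. The column names come from the query in the issue, but the column types and the choice of primary key are assumptions for illustration, not the actual schema. The topic and server values are the same placeholders as above.
{code:sql}
-- Hypothetical DDL: column types and the primary key are assumed.
CREATE TABLE product_sale (
  productsaleid BIGINT,
  product BIGINT,
  merchant_id BIGINT,
  vendor BIGINT,
  -- upsert-kafka requires a primary key; it is used as the Kafka message key
  PRIMARY KEY (productsaleid) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'XXX',
  'properties.bootstrap.servers' = 's',
  'key.format' = 'json',
  'value.format' = 'json'
);
-- State TTL is left at its default (table.exec.state.ttl = 0 ms, state never expires).
{code}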

> SQL JOIN causes data loss
> -------------------------
>
>                 Key: FLINK-25559
>                 URL: https://issues.apache.org/jira/browse/FLINK-25559
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.14.0, 1.13.2, 1.13.3, 1.13.5, 1.14.2
>            Reporter: Zongwen Li
>            Priority: Major
>
> {code:java}
> -- Some selected fields are omitted from this SQL
> INSERT INTO kd_product_info
> SELECT
>  ps.product AS productId,
>  ps.productsaleid AS productSaleId,
>  CAST(p.complex AS INT) AS complex,
>  p.createtime AS createTime,
>  p.updatetime AS updateTime,
>  p.ean AS ean,
>  ts.availablequantity AS totalAvailableStock,
> IF
>  (
>   ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity > 0,
>   ts.availablequantity - ts.lockoccupy - ts.lock_available_quantity,
>   0
>  ) AS sharedStock
>  ,rps.purchase AS purchase
>  ,v.`name` AS vendorName
>  FROM
>  product_sale ps
>  JOIN product p ON ps.product = p.id
>  LEFT JOIN rate_product_sale rps ON ps.productsaleid = rps.id
>  LEFT JOIN pss_total_stock ts ON ps.productsaleid = ts.productsale
>  LEFT JOIN vendor v ON ps.merchant_id = v.merchant_id AND ps.vendor = v.vendor
>  LEFT JOIN mccategory mc ON ps.merchant_id = mc.merchant_id AND p.mccategory = mc.id
>  LEFT JOIN new_mccategory nmc ON p.mccategory = nmc.mc
>  LEFT JOIN product_sale_grade_plus psgp ON ps.productsaleid = psgp.productsale
>  LEFT JOIN product_sale_extend pse359 ON ps.productsaleid = pse359.product_sale AND pse359.meta = 359
>  LEFT JOIN product_image_url piu ON ps.product = piu.product {code}
> All source tables use the upsert-kafka connector with the JSON format.
> Each table holds between 5 million and 10 million rows; parallelism is 24.
> We tested sinking to both Kudu and Elasticsearch.
> We also tested multiple versions: 1.13.2, 1.13.3, 1.13.5, 1.14.0, and 1.14.2.
> After many tests, we found that the more LEFT JOIN tables there are, or the 
> higher the operator parallelism, the more likely data is to be lost.
> In addition, a job on 1.14.0 running with a parallelism of 1 lost some data 
> after about half a month, and the data was correct after rerunning it. 
> We suspect this is the same problem.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
