Hi Neha,

I noticed that the `Checkpointed Data Size` is always equal to the `Full Checkpoint Data Size`. That suggests the job is taking full checkpoints rather than incremental ones, so please double-check how incremental checkpointing is configured for this job.
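The relationship between the two numbers can be sketched with a toy model. This is plain Python, not Flink code: each checkpoint is described by the RocksDB SST files that are live when it is taken (the file names and MB sizes are made up), the full size counts every referenced file, and the checkpointed size counts only what has to be uploaded for that checkpoint. With full checkpoints the two always match; with incremental checkpoints, later checkpoints usually upload only the newly flushed files.

```python
from typing import Dict, List, Set, Tuple

# Toy model (assumption, not Flink code): a checkpoint is represented by the
# SST files that are live when it is taken, as {file name: size in MB}.
Snapshot = Dict[str, int]


def checkpoint_sizes(snapshots: List[Snapshot], incremental: bool) -> List[Tuple[int, int]]:
    """Return (checkpointed_size, full_size) for each checkpoint in order.

    full_size counts every file the checkpoint refers to. checkpointed_size
    counts what is uploaded for this checkpoint: only files not uploaded
    before in incremental mode, every file again in full mode.
    """
    uploaded: Set[str] = set()
    result: List[Tuple[int, int]] = []
    for files in snapshots:
        full = sum(files.values())
        if incremental:
            # SST files are immutable, so a file uploaded earlier can be reused.
            new_files = [name for name in files if name not in uploaded]
            delta = sum(files[name] for name in new_files)
            uploaded.update(new_files)
        else:
            delta = full
        result.append((delta, full))
    return result


def looks_like_full_checkpoints(sizes: List[Tuple[int, int]]) -> bool:
    """True if every checkpoint after the first uploaded its full size."""
    later = sizes[1:]
    return bool(later) and all(delta == full for delta, full in later)


if __name__ == "__main__":
    # Hypothetical files: two 64 MB files, then one small new flush per checkpoint.
    snaps = [
        {"000010.sst": 64, "000011.sst": 64},
        {"000010.sst": 64, "000011.sst": 64, "000012.sst": 32},
        {"000010.sst": 64, "000011.sst": 64, "000012.sst": 32, "000013.sst": 16},
    ]
    full = checkpoint_sizes(snaps, incremental=False)
    inc = checkpoint_sizes(snaps, incremental=True)
    print(full)  # [(128, 128), (160, 160), (176, 176)]
    print(inc)  # [(128, 128), (32, 160), (16, 176)]
    print(looks_like_full_checkpoints(full), looks_like_full_checkpoints(inc))  # True False
```

A caveat on the heuristic: even with incremental checkpointing, the first checkpoint and any checkpoint taken after a large compaction can upload close to the full size, so it is the pattern over several checkpoints that matters.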
Best,
Shammon FY

On Mon, Jul 17, 2023 at 10:25 AM Neha . <neh...@swiggy.in> wrote:

> Hello Shammon,
>
> Thank you for your assistance.
> I have already enabled incremental checkpointing; I'm attaching the
> screenshot. Can you please elaborate on what makes you think it is not
> enabled? It might hint at the issue. The problem is that the checkpoint
> size does not go down and keeps increasing, while the savepoint size shows
> the expected behavior of going up and down with the throughput peaks.
>
> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>
>
> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Neha,
>>
>> I think it is normal for the data size of a savepoint to be smaller than
>> the full data of a checkpoint. Flink uses RocksDB to store checkpointed
>> data, which is LSM-structured storage where the same key can have
>> multiple versioned records, while a savepoint traverses all keys and
>> stores only one record per key.
>>
>> But I noticed that you did not enable incremental checkpointing, which
>> results in each checkpoint saving the full data. You can refer to [1] for
>> more details and turn it on, which will reduce the data size of the
>> checkpoint.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Jul 16, 2023 at 2:30 PM Neha . <neh...@swiggy.in> wrote:
>>
>>> Hello Shammon FY,
>>>
>>> It is a production issue for me. Can you please take a look and see if
>>> anything can be done?
>>>
>>> ---------- Forwarded message ---------
>>> From: Neha . <neh...@swiggy.in>
>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>> Subject: Checkpoint size smaller than Savepoint size
>>> To: <user@flink.apache.org>
>>>
>>>
>>> Hello,
>>>
>>> According to Flink's documentation, checkpoints are designed to be
>>> lightweight. However, in my Flink pipeline, I have observed that the
>>> savepoint sizes are smaller than the checkpoints. Is this expected
>>> behavior? What are the possible scenarios that can lead to this situation?
>>>
>>> Additionally, I have noticed that the checkpoint size in my DataStream
>>> pipeline continues to grow while the savepoint size behaves as expected.
>>> Could this be attributed to the use of Common Table Expressions (CTEs) in
>>> Flink SQL?
>>>
>>> Flink version: 1.16.1
>>> Incremental checkpointing is enabled.
>>> StateBackend: RocksDB
>>> Time Characteristic: Ingestion
>>>
>>> SQL:
>>>
>>> SELECT
>>>   *
>>> from
>>>   (
>>>     With Actuals as (
>>>       SELECT
>>>         clientOrderId,
>>>         Cast(
>>>           ValueFromKeyCacheUDF(
>>>             concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>           ) as INT
>>>         ) as zoneId,
>>>         cityId,
>>>         case
>>>           when status = 'ASSIGNED' then 1
>>>           else 0
>>>         end as acceptance_flag,
>>>         unicast.proctime
>>>       FROM
>>>         order
>>>         INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>>>         AND order.proctime BETWEEN unicast.proctime - interval '70' minute
>>>         AND unicast.proctime + interval '10' minute
>>>         and unicast.status in ('ASSIGNED', 'REJECTED')
>>>     ),
>>>     zone_agg as (
>>>       select
>>>         zoneId,
>>>         (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>>         avg(cityId) as cityId,
>>>         COUNT(*) as `unicast_count`,
>>>         proctime() as proctime
>>>       from
>>>         Actuals
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         zoneId
>>>     ),
>>>     city_agg as(
>>>       select
>>>         cityId,
>>>         sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>>>         proctime() as proctime
>>>       from
>>>         Actuals
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         cityId
>>>     ),
>>>     final as (
>>>       select
>>>         zone_agg.zoneId,
>>>         zone_agg.cityId,
>>>         avg(zone_agg.unicast_count) as unicast_count,
>>>         avg(zone_agg.zone_quotient) as zone_quotient,
>>>         avg(city_agg.city_quotient) as city_quotient
>>>       from
>>>         city_agg
>>>         INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>>>         AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60' minute
>>>         AND zone_agg.proctime
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         zone_agg.zoneId,
>>>         zone_agg.cityId
>>>     ),
>>>     new_final as (
>>>       select
>>>         'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
>>>         zone_quotient,
>>>         city_quotient,
>>>         case
>>>           when unicast_count > 5 then zone_quotient
>>>           else city_quotient
>>>         end as `value`
>>>       from
>>>         final
>>>     )
>>>     select
>>>       key,
>>>       case
>>>         when new_final.`value` > 1 then 1
>>>         else new_final.`value`
>>>       end as `value`,
>>>       zone_quotient,
>>>       city_quotient
>>>     from
>>>       new_final
>>>   )
>>>
>>> ------------------------------
>>> IMPORTANT NOTICE: The contents of this email and any attachments are
>>> confidential in nature and intended solely for the addressee, and are
>>> subject to the terms and conditions of disclosure as further described
>>> here: https://www.scd.swiggy.in/nda. If you are not the intended
>>> recipient or you do not agree to the terms and conditions of disclosure,
>>> please delete this email immediately, and notify the sender by return
>>> email. In the event that you continue to access the information herein or
>>> act upon it in any manner, the terms and conditions shall be deemed
>>> accepted by you.
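Shammon's earlier explanation of why a savepoint can be smaller than a checkpoint (an LSM store keeps several versions of a key in different files until compaction, whereas a savepoint traverses the keys and writes one record per key) can be illustrated with a toy model. This is plain Python, not RocksDB or Flink code; the class name `ToyLsmStore`, the zone-style keys, and counting records instead of bytes are all illustrative assumptions, and deletes, TTL, and compression are ignored.

```python
from typing import Dict, List


class ToyLsmStore:
    """Toy LSM store (assumption, not RocksDB): writes go to a memtable and
    flush() turns it into an immutable "SST file". Older versions of a key
    stay in older files until compact() merges them."""

    def __init__(self) -> None:
        self.memtable: Dict[str, int] = {}
        self.sst_files: List[Dict[str, int]] = []

    def put(self, key: str, value: int) -> None:
        # A newer write replaces an older one only while both are in memory.
        self.memtable[key] = value

    def flush(self) -> None:
        if self.memtable:
            self.sst_files.append(dict(self.memtable))
            self.memtable = {}

    def compact(self) -> None:
        merged: Dict[str, int] = {}
        for sst in self.sst_files:  # oldest first, so newer versions win
            merged.update(sst)
        self.sst_files = [merged] if merged else []

    def native_checkpoint_records(self) -> int:
        """Records copied by a file-based snapshot: every version in every file."""
        self.flush()
        return sum(len(sst) for sst in self.sst_files)

    def savepoint_records(self) -> int:
        """Records written by a key-by-key traversal: one per live key."""
        self.flush()
        latest: Dict[str, int] = {}
        for sst in self.sst_files:
            latest.update(sst)
        return len(latest)


if __name__ == "__main__":
    store = ToyLsmStore()
    for round_no in range(3):  # update the same three keys in three flush cycles
        for key in ("zone-1", "zone-2", "zone-3"):
            store.put(key, round_no)
        store.flush()
    print(store.native_checkpoint_records())  # 9 (3 keys x 3 versions)
    print(store.savepoint_records())  # 3
    store.compact()
    print(store.native_checkpoint_records())  # 3
```

In this model the file-based snapshot shrinks back to the savepoint size only after compaction has merged the old versions, which is one reason a checkpoint can stay larger than a savepoint taken at the same time.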