Hi Neha, I think it is normal for the data size of a savepoint to be smaller than the full data of a checkpoint. Flink uses rocksdb to store checkpointed data, which is an LSM structured storage where the same key will have multiple version records, while savepoint will traverse all keys and store only one record per key.
But I noticed that you did not enable incremental checkpoint, which resulted in each checkpoint saving full data. You can refer to [1] for more detail and turn it on, which will reduce the data size of the checkpoint. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints Best, Shammon FY On Sun, Jul 16, 2023 at 2:30 PM Neha . <neh...@swiggy.in> wrote: > Hello Shammon FY, > > It is a production issue for me. Can you please take a look if anything > can be done? > > ---------- Forwarded message --------- > From: Neha . <neh...@swiggy.in> > Date: Fri, Jul 14, 2023 at 4:06 PM > Subject: Checkpoint size smaller than Savepoint size > To: <user@flink.apache.org> > > > Hello, > > According to Flink's documentation, Checkpoints are designed to be > lightweight. However, in my Flink pipeline, I have observed that the > savepoint sizes are smaller than the checkpoints. Is this expected > behavior? What are the possible scenarios that can lead to this situation? > > Additionally, I have noticed that the checkpoint size in my datastream > pipeline continues to grow while the savepoint size behaves as expected. > Could this be attributed to the usage of Common Table Expressions (CTEs) in > Flink SQL? > > Flink version: 1.16.1 > Incremental checkpointing is enabled. > StateBackend: RocksDB > Time Characteristic: Ingestion > > SQL: > > SELECT > * > from > ( > With Actuals as ( > SELECT > clientOrderId, > Cast( > ValueFromKeyCacheUDF( > concat('R_', CAST(order_df.restaurant_id AS VARCHAR)) > ) as INT > ) as zoneId, > cityId, > case > when status = 'ASSIGNED' then 1 > else 0 > end as acceptance_flag, > unicast.proctime > FROM > order > INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id > AND order.proctime BETWEEN unicast.proctime - interval '70' minute > AND unicast.proctime + interval '10' minute > and unicast.status in ('ASSIGNED', 'REJECTED') > ), > zone_agg as ( > select > zoneId, > (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`, > avg(cityId) as cityId, > COUNT(*) as `unicast_count`, > proctime() as proctime > from > Actuals > group by > HOP( > proctime(), > interval '5' minute, > interval '30' minute > ), > zoneId > ), > city_agg as( > select > cityId, > sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`, > proctime() as proctime > from > Actuals > group by > HOP( > proctime(), > interval '5' minute, > interval '30' minute > ), > cityId > ), > final as ( > select > zone_agg.zoneId, > zone_agg.cityId, > avg(zone_agg.unicast_count) as unicast_count, > avg(zone_agg.zone_quotient) as zone_quotient, > avg(city_agg.city_quotient) as city_quotient > from > city_agg > INNER join zone_agg on zone_agg.cityId = city_agg.cityId > AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60' > minute > AND zone_agg.proctime > group by > HOP( > proctime(), > interval '5' minute, > interval '30' minute > ), > zone_agg.zoneId, > zone_agg.cityId > ), > new_final as ( > select > 'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key, > zone_quotient, > city_quotient, > case > when unicast_count > 5 then zone_quotient > else city_quotient > end as `value` > from > final > ) > select > key, > case > when new_final.`value` > 1 then 1 > else new_final.`value` > end as `value`, > zone_quotient, > city_quotient > from > new_final > ) > > > > > ------------------------------ > IMPORTANT NOTICE: The contents of this email and any attachments are > confidential in nature and intended solely for the addressee, and are > subject to the terms and conditions of disclosure as further described > here: https://www.scd.swiggy.in/nda. If you are not the intended > recipient or you do not agree to the terms and conditions of disclosure, > please delete this email immediately, and notify the sender by return > email. In the event that you continue to access the information herein or > act upon it in any manner, the terms and conditions shall be deemed > accepted by you.