I noticed that the `Checkpointed Data Size` is always equal to the `Full
Checkpoint Data Size`, so I think the job is taking full checkpoints instead
of incremental ones. Could you check that?
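
As a rough way to apply that comparison to a whole checkpoint history, here is
a minimal sketch. It assumes you have copied the two columns from the
checkpoint history into a list of records. The field names
`checkpointed_size` and `full_size` and the 0.9 threshold are illustrative
assumptions, not Flink API names.

```python
def looks_incremental(history, ratio_threshold=0.9):
    """Guess whether checkpoints are incremental from their reported sizes.

    history: list of dicts, oldest first, with the hypothetical keys
    'checkpointed_size' and 'full_size' (both in bytes).
    """
    # The first checkpoint has no earlier checkpoint to build on, so skip it.
    later = history[1:]
    if not later:
        return False
    # Count checkpoints that uploaded clearly less than the full state size.
    smaller = sum(
        1
        for c in later
        if c["full_size"] > 0
        and c["checkpointed_size"] < ratio_threshold * c["full_size"]
    )
    # Call it incremental only if that holds for more than half of them.
    return smaller > len(later) / 2
```

If the two sizes are equal for every checkpoint, as in your screenshot, this
returns False.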

> I have already enabled incremental checkpointing; the screenshot is
> attached. Can you please elaborate on what makes you think it is not
> enabled? It might hint at the issue. The problem is that the checkpoint
> size does not go down and keeps increasing, while the savepoint size shows
> the correct behavior of going up and down with the throughput peaks.
> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>> I think it is normal for the data size of a savepoint to be smaller than
>> the full data size of a checkpoint. With the RocksDB state backend,
>> checkpointed data is stored in an LSM-structured store in which the same
>> key can have multiple versioned records, whereas a savepoint traverses all
>> keys and stores only one record per key.
>> But I noticed that you did not enable incremental checkpoints, which
>> results in each checkpoint saving the full data. You can refer to [1] for
>> more details and turn it on, which will reduce the checkpoint data size.
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
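
The size difference described above can be illustrated with a toy model. This
is only a sketch of the idea, not how RocksDB or Flink are implemented: the
keys and values are made up, and real LSM stores also compact old versions
over time.

```python
# Toy model: an LSM store appends a new record on every update, while a
# savepoint keeps only the latest value per key.
updates = [("zone-1", 1), ("zone-2", 5), ("zone-1", 2), ("zone-1", 3)]

lsm_records = list(updates)     # every version stays until compaction
savepoint_view = dict(updates)  # later values overwrite earlier ones per key

print(len(lsm_records))     # 4 records held by the LSM-style store
print(len(savepoint_view))  # 2 records, one per distinct key
```
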
>>> Subject: Checkpoint size smaller than Savepoint size
>>> To: <user@flink.apache.org>
>>> Hello,
>>> According to Flink's documentation, Checkpoints are designed to be
>>> lightweight. However, in my Flink pipeline, I have observed that the
>>> savepoint sizes are smaller than the checkpoints. Is this expected
>>> behavior? What are the possible scenarios that can lead to this situation?
>>> Additionally, I have noticed that the checkpoint size in my datastream
>>> pipeline continues to grow while the savepoint size behaves as expected.
>>> Could this be attributed to the usage of Common Table Expressions (CTEs) in
>>> Flink SQL?
>>> Flink version: 1.16.1
>>> Incremental checkpointing is enabled.
>>> StateBackend: RocksDB
>>> Time Characteristic: Ingestion
>>> SQL:
>>> select
>>>   *
>>> from
>>>   (
>>>     With Actuals as (
>>>       SELECT
>>>         clientOrderId,
>>>         Cast(
>>>           ValueFromKeyCacheUDF(
>>>             concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>           ) as INT
>>>         ) as zoneId,
>>>         cityId,
>>>         case
>>>           when status = 'ASSIGNED' then 1
>>>           else 0
>>>         end as acceptance_flag,
>>>         unicast.proctime
>>>       FROM
>>>         order
>>>         INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>>>         AND order.proctime BETWEEN unicast.proctime - interval '70' minute
>>>         AND unicast.proctime + interval '10' minute
>>>         and unicast.status in ('ASSIGNED', 'REJECTED')
>>>     ),
>>>     zone_agg as (
>>>       select
>>>         zoneId,
>>>         (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>>         avg(cityId) as cityId,
>>>         COUNT(*) as `unicast_count`,
>>>         proctime() as proctime
>>>       from
>>>         Actuals
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         zoneId
>>>     ),
>>>     city_agg as(
>>>       select
>>>         cityId,
>>>         sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>>>         proctime() as proctime
>>>       from
>>>         Actuals
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         cityId
>>>     ),
>>>     final as (
>>>       select
>>>         zone_agg.zoneId,
>>>         zone_agg.cityId,
>>>         avg(zone_agg.unicast_count) as unicast_count,
>>>         avg(zone_agg.zone_quotient) as zone_quotient,
>>>         avg(city_agg.city_quotient) as city_quotient
>>>       from
>>>         city_agg
>>>         INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>>>         AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60' minute
>>>         AND zone_agg.proctime
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         zone_agg.zoneId,
>>>         zone_agg.cityId
>>>     ),
>>>     new_final as (
>>>       select
>>>         'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
>>>         zone_quotient,
>>>         city_quotient,
>>>         case
>>>           when unicast_count > 5 then zone_quotient
>>>           else city_quotient
>>>         end as `value`
>>>       from
>>>         final
>>>     )
>>>     select
>>>       key,
>>>       case
>>>         when new_final.`value` > 1 then 1
>>>         else new_final.`value`
>>>       end as `value`,
>>>       zone_quotient,
>>>       city_quotient
>>>     from
>>>       new_final
>>>   )
