Hi Neha,

I noticed that the `Checkpointed Data Size` is always equal to the `Full
Checkpoint Data Size`, so I think the job is taking full checkpoints instead of
incremental ones. Could you check that?
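
To make the check concrete, here is a rough sketch of the comparison I have in
mind. It assumes you copy the two values for a few completed checkpoints from
the checkpoint history in the web UI by hand; the function name, the tolerance
and the sample numbers are all made up for illustration. With incremental
checkpoints the first checkpoint is normally as large as the full state, so the
sketch only asks whether any later checkpoint persisted clearly less than the
full size.

```python
from typing import List, Tuple


def looks_incremental(samples: List[Tuple[int, int]], tolerance: float = 0.01) -> bool:
    """Return True if at least one checkpoint persisted clearly less than the full state.

    Each sample is (checkpointed_bytes, full_bytes) for one completed checkpoint.
    `tolerance` (an arbitrary choice here) ignores differences below 1%.
    """
    for checkpointed, full in samples:
        if full > 0 and checkpointed < full * (1 - tolerance):
            return True
    return False


# Hypothetical numbers, not taken from the job in this thread.
full_only = [(500, 500), (620, 620), (750, 750)]    # both columns always equal
incremental = [(500, 500), (40, 620), (55, 750)]    # later checkpoints are small

print(looks_incremental(full_only))    # False
print(looks_incremental(incremental))  # True
```

If the result is False for your job, it would be worth confirming that
`state.backend.incremental: true` is actually in effect for the running job and
is not overridden in code.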

Best,
Shammon FY

On Mon, Jul 17, 2023 at 10:25 AM Neha . <neh...@swiggy.in> wrote:

> Hello Shammon,
>
> Thank you for your assistance.
> I have already enabled incremental checkpointing; I am attaching a
> screenshot. Can you please elaborate on what makes you think it is not
> enabled? It might hint at the issue. The problem is that the checkpoint size
> does not go down and keeps increasing, while the savepoint size behaves
> correctly, going up and down with the throughput peaks.
>
> [image: Screenshot 2023-07-17 at 7.49.19 AM.png]
>
>
> On Mon, Jul 17, 2023 at 6:28 AM Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi Neha,
>>
>> I think it is normal for the data size of a savepoint to be smaller than
>> the full data size of a checkpoint. Flink uses RocksDB to store checkpointed
>> data. RocksDB is an LSM-structured store in which the same key can have
>> multiple versioned records, while a savepoint traverses all keys and stores
>> only one record per key.
>>
>> But I noticed that you did not enable incremental checkpoints, which
>> results in each checkpoint saving the full data. You can refer to [1] for more
>> detail and turn it on, which will reduce the data size of each checkpoint.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/state_backends/#incremental-checkpoints
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Jul 16, 2023 at 2:30 PM Neha . <neh...@swiggy.in> wrote:
>>
>>> Hello Shammon FY,
>>>
>>> It is a production issue for me. Can you please take a look if anything
>>> can be done?
>>>
>>> ---------- Forwarded message ---------
>>> From: Neha . <neh...@swiggy.in>
>>> Date: Fri, Jul 14, 2023 at 4:06 PM
>>> Subject: Checkpoint size smaller than Savepoint size
>>> To: <user@flink.apache.org>
>>>
>>>
>>> Hello,
>>>
>>> According to Flink's documentation, Checkpoints are designed to be
>>> lightweight. However, in my Flink pipeline, I have observed that the
>>> savepoint sizes are smaller than the checkpoints. Is this expected
>>> behavior? What are the possible scenarios that can lead to this situation?
>>>
>>> Additionally, I have noticed that the checkpoint size in my datastream
>>> pipeline continues to grow while the savepoint size behaves as expected.
>>> Could this be attributed to the usage of Common Table Expressions (CTEs) in
>>> Flink SQL?
>>>
>>> Flink version: 1.16.1
>>> Incremental checkpointing is enabled.
>>> StateBackend: RocksDB
>>> Time Characteristic: Ingestion
>>>
>>> SQL:
>>>
>>> SELECT
>>>   *
>>> from
>>>   (
>>>     With Actuals as (
>>>       SELECT
>>>         clientOrderId,
>>>         Cast(
>>>           ValueFromKeyCacheUDF(
>>>             concat('R_', CAST(order_df.restaurant_id AS VARCHAR))
>>>           ) as INT
>>>         ) as zoneId,
>>>         cityId,
>>>         case
>>>           when status = 'ASSIGNED' then 1
>>>           else 0
>>>         end as acceptance_flag,
>>>         unicast.proctime
>>>       FROM
>>>         order
>>>         INNER JOIN unicast_df ON unicast.clientOrderId = order.order_id
>>>         AND order.proctime BETWEEN unicast.proctime - interval '70' minute
>>>         AND unicast.proctime + interval '10' minute
>>>         and unicast.status in ('ASSIGNED', 'REJECTED')
>>>     ),
>>>     zone_agg as (
>>>       select
>>>         zoneId,
>>>         (sum(acceptance_flag) * 1.0) / count(*) as `zone_quotient`,
>>>         avg(cityId) as cityId,
>>>         COUNT(*) as `unicast_count`,
>>>         proctime() as proctime
>>>       from
>>>         Actuals
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         zoneId
>>>     ),
>>>     city_agg as(
>>>       select
>>>         cityId,
>>>         sum(acceptance_flag) * 1.0 / count(*) as `city_quotient`,
>>>         proctime() as proctime
>>>       from
>>>         Actuals
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         cityId
>>>     ),
>>>     final as (
>>>       select
>>>         zone_agg.zoneId,
>>>         zone_agg.cityId,
>>>         avg(zone_agg.unicast_count) as unicast_count,
>>>         avg(zone_agg.zone_quotient) as zone_quotient,
>>>         avg(city_agg.city_quotient) as city_quotient
>>>       from
>>>         city_agg
>>>         INNER join zone_agg on zone_agg.cityId = city_agg.cityId
>>>         AND city_agg.proctime BETWEEN zone_agg.proctime - interval '60' minute
>>>         AND zone_agg.proctime
>>>       group by
>>>         HOP(
>>>           proctime(),
>>>           interval '5' minute,
>>>           interval '30' minute
>>>         ),
>>>         zone_agg.zoneId,
>>>         zone_agg.cityId
>>>     ),
>>>     new_final as (
>>>       select
>>>         'zoneid_de_acceptance_rate#' || cast(zoneId as varchar) as key,
>>>         zone_quotient,
>>>         city_quotient,
>>>         case
>>>           when unicast_count > 5 then zone_quotient
>>>           else city_quotient
>>>         end as `value`
>>>       from
>>>         final
>>>     )
>>>     select
>>>       key,
>>>       case
>>>         when new_final.`value` > 1 then 1
>>>         else new_final.`value`
>>>       end as `value`,
>>>       zone_quotient,
>>>       city_quotient
>>>     from
>>>       new_final
>>>   )
>>>
>>>
>>>
>>>
>>> ------------------------------
>>> IMPORTANT NOTICE:  The contents of this email and any attachments are
>>> confidential in nature and intended solely for the addressee, and are
>>> subject to the terms and conditions of disclosure as further described
>>> here: https://www.scd.swiggy.in/nda. If you are not the intended
>>> recipient or you do not agree to the terms and conditions of disclosure,
>>> please delete this email immediately, and notify the sender by return
>>> email. In the event that you continue to access the information herein or
>>> act upon it in any manner, the terms and conditions shall be deemed
>>> accepted by you.
>>
>>
>
