I wanted to update that I pared down my setup to reproduce this bug and
filed an issue: https://github.com/apache/beam/issues/27648. It includes
steps to reproduce the issue. I would be very interested to hear whether
others are experiencing the same issue, or if they're also running
unbounded SDFs on Flink and not seeing this issue.

I'm also really hoping to get some traction on this issue as it is quite a
serious bug for our system that we have built on top of Beam/Flink ☹️

On Fri, Jul 21, 2023 at 10:28 AM Nimalan Mahendran <
nima...@liminalinsights.com> wrote:

> An interesting update - I found PeriodicImpulse, which is a Python-based
> unbounded SDF. I created the following minimal pipeline:
>
>     with beam.Pipeline(options=runner_options) as pipeline:
>         traceable_measurements = pipeline |
> PeriodicImpulse(fire_interval=5)
>         traceable_measurements | beam.Map(print)
>
> And I am still seeing growing checkpoint size in Flink. I'll keep running
> the pipeline to ensure that this trend continues, but this is pretty
> concerning:
> 2023-07-21 17:24:22,401 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960262390 for job
> a5e3323d73491f3e6c409c79f160c555.
> 2023-07-21 17:24:22,487 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Completed checkpoint 1 for job a5e3323d73491f3e6c409c79f160c555 (500341
> bytes, checkpointDuration=94 ms, finalizationTime=3 ms).
> 2023-07-21 17:25:22,389 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint',
> sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960322388 for job
> a5e3323d73491f3e6c409c79f160c555.
> 2023-07-21 17:25:22,431 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Completed checkpoint 2 for job a5e3323d73491f3e6c409c79f160c555 (895320
> bytes, checkpointDuration=42 ms, finalizationTime=1 ms).
>
> Once again, any pointers would be appreciated.
>
> On Fri, Jul 21, 2023 at 9:04 AM Nimalan Mahendran <
> nima...@liminalinsights.com> wrote:
>
>> Hello again,
>>
>> I tried to follow a process of elimination and simplify my code as much
>> as possible, see
>> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_beam-py.
>> This splittable DoFn is pared down to only doing the polling. It no longer
>> uses the Redis client or uses any libraries internal to us. I have also
>> included a simplified pipeline, see
>> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_pipeline-py.
>> It now simply reads from the dummy Redis reader splittable DoFn and writes
>> using the dummer writer DoFn. I also did away with dataclasses and simply
>> represent restrictions as ints.
>>
>> I was surprised to see that I'm still experiencing checkpoint growth. I
>> have to say that I am at quite a loss. I also looked at an old mailing list
>> thread (https://www.mail-archive.com/user@beam.apache.org/msg05513.html)
>> and tried windowing/triggering the PCollection coming out of the dummy
>> Redis reader splittable DoFn, but still saw unbounded checkpoint size
>> growth.
>>
>> I thought it might be worth flagging that My Redis reader splittable DoFn
>> works such that an element never finishes processing, i.e. the
>> unboundedness is at the element level. I went with this design to mirror
>> how Redis streams are structured, i.e. the message ids in Redis streams
>> start at 0-0, go up to ">" (which represents the part of the stream that
>> has not been consumed yet) and finish at "$", which is kind of like
>> Long.MAX_VALUE (i.e. you are never meant to get there).
>>
>> Another thing that may be worth flagging is that when I am polling an
>> empty stream, I am essentially returning the same consumed and residual
>> restriction on each poll. The input restriction to
>> try_split(fraction_of_remainder=0) is [>, $), i.e. all unread messages
>> available on the stream. The output consumed partition is [>, >), i.e. no
>> messages were read. The output residual partition is [>, $) again, i.e. all
>> unread messages available on the stream.
>>
>> I've also been looking through the apache/beam github repo to try to get
>> a better sense of what might be going on. I noticed that the residual
>> restrictions I return are being added to
>> execution_context.delayed_applications, which is then returned via a
>> protobuf message in the residual_roots field, but I cannot tell where/how
>> execution_context.delayed_applications is actually getting cleared. Maybe
>> that's a red herring. I also tried to look at the KafkaIO connector (which
>> is in Java, of course) to see it would be informative about any issues with
>> how I am using the various Beam abstractions, but that led nowhere for me.
>>
>> It would be really helpful to hear from people on this mailing list
>> regarding:
>>
>>    1. Any potential causes for the checkpoint size growth. The only
>>    possible cause I can think of so far is that perhaps the SDF is implicitly
>>    storing some state that is not getting cleared out.
>>    2. A complete working example of an unbounded SDF written using the
>>    Python SDK that uses tracker.defer_remainder, so I can at least start from
>>    something that works (i.e. does not have unbounded checkpoint size 
>> growth).
>>    3. Whether what I am flagging above might just be red herrings?
>>    4. Anything else that might help me make progress.
>>
>> Thanks again,
>>
>> On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran <
>> nima...@liminalinsights.com> wrote:
>>
>>> Hello,
>>>
>>> I am running a pipeline built in the Python SDK that reads from a Redis
>>> stream <https://redis.io/docs/data-types/streams/> via an SDF, in the
>>> following environment:
>>>
>>>    - Python 3.11
>>>    - Apache Beam 2.48.0
>>>    - Flink 1.16
>>>    - Checkpoint interval: 60s
>>>    - state.backend (Flink): hashmap
>>>    - state_backend (Beam): filesystem
>>>
>>> The issue that I am observing is that the checkpoint size keeps growing,
>>> even when there are no items to read on the Redis stream. Since there are
>>> no items to read on the Redis stream, the Redis stream SDF is simply doing
>>> the following steps repeatedly, as part of DoFn.process, i.e. the pattern
>>> described in the user-initiated checkpoint pattern in the Apache Beam
>>> programming guide
>>> <https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint>
>>> to handle polling for new items with some delay, if the last poll returned
>>> no items:
>>>
>>>    1. Make the call to the Redis client to read items from the Redis
>>>    stream
>>>    2. Receive no items from the Redis stream, and hence,
>>>    3. Call tracker.defer_remainder(Duration.of(5)) and return-ing to
>>>    defer execution for 5 seconds. That code is located here
>>>    
>>> <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>
>>>    .
>>>    4. Go back to step 1.
>>>
>>> This checkpoint size growth happens regardless of whether I'm using
>>> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows
>>> large enough to cause the task manager to crash, due to exhausting Java
>>> heap space. The rate of checkpoint size growth is proportional to the
>>> number of tracker.defer_remainder() calls I have done, i.e. increasing
>>> parallelism and/or decreasing the timeout used in tracker.defer_remainder
>>> will increase the rate of checkpoint growth.
>>>
>>> I took a look at the heap-based checkpoint files that I observed were
>>> getting larger with each checkpoint (just using the less command) and
>>> noticed that many copies of the residual restriction were present, which
>>> seemed like a red flag. The residual restriction here is the one that
>>> results from calling tracker.defer_remainder(), which results in a
>>> tracker.try_split(0.0).
>>>
>>> I've included the SDF code and jobmanager logs showing growing
>>> checkpoint size here:
>>> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've
>>> included the restriction provider/tracker and other pieces for
>>> completeness, but the SDF is towards the bottom.
>>>
>>> Any help would be appreciated! 🙏🏾
>>>
>>> Thanks,
>>> --
>>> Nimalan Mahendran
>>> ML Engineer at Liminal Insights
>>>
>>

Reply via email to