Hi Nimalan,

Thank you for the tremendous effort of narrowing this down and creating
a minimal example!

I have only used the Beam Java SDK so far, and have seen checkpoint size grow
in only 2 cases:
1 - resources leaking in user code in a DoFn
2 - data overload not being handled well by connectors (rare)

I usually used Java memory dumps to investigate both. While waiting for
replies from Python SDK users, you can try running your OOM case with a heap
dump <https://www.baeldung.com/java-heap-dump-capture> to see what gets
accumulated in the Java heap inside your Flink app.
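
For example, one way to capture a dump from a running Flink task manager JVM
(adjust the PID and file path for your setup):

    jmap -dump:live,format=b,file=/tmp/taskmanager.hprof <taskmanager-pid>

Adding -XX:+HeapDumpOnOutOfMemoryError to the task manager's JVM options also
makes it write a dump automatically when it runs out of heap.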

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Tue, 25 Jul 2023 at 00:54, Nimalan Mahendran <nima...@liminalinsights.com>
wrote:

> I wanted to update that I pared down my setup to reproduce this bug and
> filed an issue: https://github.com/apache/beam/issues/27648. It includes
> steps to reproduce the issue. I would be very interested to hear whether
> others are experiencing the same issue, or whether they are running
> unbounded SDFs on Flink without seeing it.
>
> I'm also really hoping to get some traction on this issue as it is quite a
> serious bug for our system that we have built on top of Beam/Flink ☹️
>
> On Fri, Jul 21, 2023 at 10:28 AM Nimalan Mahendran <
> nima...@liminalinsights.com> wrote:
>
>> An interesting update - I found PeriodicImpulse, which is a Python-based
>> unbounded SDF. I created the following minimal pipeline:
>>
>>     with beam.Pipeline(options=runner_options) as pipeline:
>>         traceable_measurements = pipeline | PeriodicImpulse(fire_interval=5)
>>         traceable_measurements | beam.Map(print)
>>
>> And I am still seeing growing checkpoint size in Flink. I'll keep running
>> the pipeline to confirm that this trend continues, but this is pretty
>> concerning:
>> 2023-07-21 17:24:22,401 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960262390 for job a5e3323d73491f3e6c409c79f160c555.
>> 2023-07-21 17:24:22,487 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 1 for job a5e3323d73491f3e6c409c79f160c555 (500341 bytes, checkpointDuration=94 ms, finalizationTime=3 ms).
>> 2023-07-21 17:25:22,389 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960322388 for job a5e3323d73491f3e6c409c79f160c555.
>> 2023-07-21 17:25:22,431 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 2 for job a5e3323d73491f3e6c409c79f160c555 (895320 bytes, checkpointDuration=42 ms, finalizationTime=1 ms).
>>
>> Once again, any pointers would be appreciated.
>>
>> On Fri, Jul 21, 2023 at 9:04 AM Nimalan Mahendran <
>> nima...@liminalinsights.com> wrote:
>>
>>> Hello again,
>>>
>>> I tried to follow a process of elimination and simplify my code as much
>>> as possible, see
>>> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_beam-py.
>>> This splittable DoFn is pared down to only doing the polling. It no longer
>>> uses the Redis client or any libraries internal to us. I have also
>>> included a simplified pipeline, see
>>> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_pipeline-py.
>>> It now simply reads from the dummy Redis reader splittable DoFn and writes
>>> using the dummy writer DoFn. I also did away with dataclasses and simply
>>> represent restrictions as ints.
>>>
>>> I was surprised to see that I'm still experiencing checkpoint growth. I
>>> have to say that I am at quite a loss. I also looked at an old mailing list
>>> thread (https://www.mail-archive.com/user@beam.apache.org/msg05513.html)
>>> and tried windowing/triggering the PCollection coming out of the dummy
>>> Redis reader splittable DoFn, but still saw unbounded checkpoint size
>>> growth.
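>>>
>>> For reference, the windowing/triggering I tried was along these lines (not
>>> my exact code; the window size and trigger are just example values):
>>>
>>>     import apache_beam as beam
>>>     from apache_beam.transforms import trigger, window
>>>
>>>     def window_for_checkpointing(measurements):
>>>         # measurements: the PCollection coming out of the dummy Redis
>>>         # reader splittable DoFn.
>>>         return measurements | beam.WindowInto(
>>>             window.FixedWindows(60),
>>>             trigger=trigger.AfterProcessingTime(10),
>>>             accumulation_mode=trigger.AccumulationMode.DISCARDING)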
>>>
>>> I thought it might be worth flagging that my Redis reader splittable
>>> DoFn works such that an element never finishes processing, i.e. the
>>> unboundedness is at the element level. I went with this design to mirror
>>> how Redis streams are structured, i.e. the message ids in Redis streams
>>> start at 0-0, go up to ">" (which represents the part of the stream that
>>> has not been consumed yet) and finish at "$", which is kind of like
>>> Long.MAX_VALUE (i.e. you are never meant to get there).
>>>
>>> Another thing that may be worth flagging is that when I am polling an
>>> empty stream, I am essentially returning the same consumed and residual
>>> restriction on each poll. The input restriction to
>>> try_split(fraction_of_remainder=0) is [>, $), i.e. all unread messages
>>> available on the stream. The output consumed partition is [>, >), i.e. no
>>> messages were read. The output residual partition is [>, $) again, i.e. all
>>> unread messages available on the stream.
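>>>
>>> To illustrate (a simplified stand-in, not the actual tracker code from the
>>> gist; restrictions here are just (start, stop) pairs of stream ids):
>>>
>>>     def split_for_empty_poll(restriction):
>>>         # What my try_split(fraction_of_remainder=0) effectively returns
>>>         # when a poll finds nothing to read.
>>>         start, stop = restriction
>>>         primary = (start, start)   # [">", ">"): nothing was consumed
>>>         residual = (start, stop)   # [">", "$"): everything still unread
>>>         return primary, residual
>>>
>>>     print(split_for_empty_poll((">", "$")))
>>>     # (('>', '>'), ('>', '$'))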
>>>
>>> I've also been looking through the apache/beam github repo to try to get
>>> a better sense of what might be going on. I noticed that the residual
>>> restrictions I return are being added to
>>> execution_context.delayed_applications, which is then returned via a
>>> protobuf message in the residual_roots field, but I cannot tell where/how
>>> execution_context.delayed_applications is actually getting cleared. Maybe
>>> that's a red herring. I also tried to look at the KafkaIO connector (which
>>> is in Java, of course) to see if it would be informative about any issues with
>>> how I am using the various Beam abstractions, but that led nowhere for me.
>>>
>>> It would be really helpful to hear from people on this mailing list
>>> regarding:
>>>
>>>    1. Any potential causes for the checkpoint size growth. The only
>>>    possible cause I can think of so far is that perhaps the SDF is
>>>    implicitly storing some state that is not getting cleared out.
>>>    2. A complete working example of an unbounded SDF written using the
>>>    Python SDK that uses tracker.defer_remainder, so I can at least start
>>>    from something that works (i.e. does not have unbounded checkpoint
>>>    size growth). A rough sketch of the shape I mean is below this list.
>>>    3. Whether what I am flagging above might just be red herrings?
>>>    4. Anything else that might help me make progress.
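>>>
>>> To make (2) concrete, here is the rough shape of what I am looking for: a
>>> minimal polling SDF over an offset-based restriction that calls
>>> tracker.defer_remainder() whenever a poll comes back empty. This is only an
>>> untested sketch of the pattern, not code from my pipeline;
>>> DummyPollingRestrictionProvider, DummyPollingSDF and poll_source are
>>> made-up names for illustration:
>>>
>>>     import apache_beam as beam
>>>     from apache_beam.io.restriction_trackers import (
>>>         OffsetRange, OffsetRestrictionTracker)
>>>     from apache_beam.transforms.core import RestrictionProvider
>>>     from apache_beam.utils.timestamp import Duration
>>>
>>>     _MAX_OFFSET = 2**63 - 1  # effectively "never finishes"
>>>
>>>     class DummyPollingRestrictionProvider(RestrictionProvider):
>>>         def initial_restriction(self, element):
>>>             # One unbounded restriction per element: offsets [0, "infinity").
>>>             return OffsetRange(0, _MAX_OFFSET)
>>>
>>>         def create_tracker(self, restriction):
>>>             return OffsetRestrictionTracker(restriction)
>>>
>>>         def restriction_size(self, element, restriction):
>>>             # Simplified; a real connector would estimate the backlog here.
>>>             return restriction.size()
>>>
>>>     def poll_source(offset):
>>>         # Hypothetical stand-in for "read the next item from the stream";
>>>         # returns None when nothing is available yet.
>>>         return None
>>>
>>>     class DummyPollingSDF(beam.DoFn):
>>>         @beam.DoFn.unbounded_per_element()
>>>         def process(
>>>                 self,
>>>                 element,
>>>                 tracker=beam.DoFn.RestrictionParam(
>>>                     DummyPollingRestrictionProvider())):
>>>             offset = tracker.current_restriction().start
>>>             while True:
>>>                 item = poll_source(offset)
>>>                 if item is None:
>>>                     # Nothing to read: give the residual back to the runner
>>>                     # and ask to be resumed in ~5 seconds.
>>>                     tracker.defer_remainder(Duration.of(5))
>>>                     return
>>>                 if not tracker.try_claim(offset):
>>>                     return
>>>                 yield item
>>>                 offset += 1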
>>>
>>> Thanks again,
>>>
>>> On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran <
>>> nima...@liminalinsights.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am running a pipeline built in the Python SDK that reads from a Redis
>>>> stream <https://redis.io/docs/data-types/streams/> via an SDF, in the
>>>> following environment (a rough sketch of the pipeline options is below the
>>>> list):
>>>>
>>>>    - Python 3.11
>>>>    - Apache Beam 2.48.0
>>>>    - Flink 1.16
>>>>    - Checkpoint interval: 60s
>>>>    - state.backend (Flink): hashmap
>>>>    - state_backend (Beam): filesystem
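>>>>
>>>> Roughly, the Beam-side settings above are passed as pipeline options along
>>>> these lines (illustrative only; flink_master, environment_type and the
>>>> other flags depend on the deployment):
>>>>
>>>>     from apache_beam.options.pipeline_options import PipelineOptions
>>>>
>>>>     runner_options = PipelineOptions([
>>>>         "--runner=FlinkRunner",
>>>>         "--flink_master=localhost:8081",   # deployment-specific
>>>>         "--environment_type=LOOPBACK",     # deployment-specific
>>>>         "--streaming",
>>>>         "--checkpointing_interval=60000",  # 60 s, in milliseconds
>>>>         "--state_backend=filesystem",      # the Beam-side state backend
>>>>     ])
>>>>     # state.backend: hashmap is set on the Flink side (flink-conf.yaml),
>>>>     # not via pipeline options.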
>>>>
>>>> The issue that I am observing is that the checkpoint size keeps
>>>> growing, even when there are no items to read on the Redis stream. Since
>>>> there is nothing to read, the Redis stream SDF simply repeats the following
>>>> steps as part of DoFn.process, i.e. the user-initiated checkpoint pattern
>>>> described in the Apache Beam programming guide
>>>> <https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint>
>>>> for polling for new items with some delay when the last poll returns
>>>> no items:
>>>>
>>>>    1. Make the call to the Redis client to read items from the Redis
>>>>    stream.
>>>>    2. Receive no items from the Redis stream, and hence,
>>>>    3. Call tracker.defer_remainder(Duration.of(5)) and return, to defer
>>>>    execution for 5 seconds (see the snippet right after this list). That
>>>>    code is located here
>>>>    <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>.
>>>>    4. Go back to step 1.
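>>>>
>>>> In code, the empty-poll branch boils down to something like this (simplified
>>>> from the gist; items is whatever the poll returned):
>>>>
>>>>     if not items:                                # step 2: nothing was read
>>>>         tracker.defer_remainder(Duration.of(5))  # step 3: resume in ~5 s
>>>>         return                                   # step 1 runs again on resume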
>>>>
>>>> This checkpoint size growth happens regardless of whether I'm using
>>>> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows
>>>> large enough to cause the task manager to crash, due to exhausting Java
>>>> heap space. The rate of checkpoint size growth is proportional to the
>>>> number of tracker.defer_remainder() calls I have done, i.e. increasing
>>>> parallelism and/or decreasing the timeout used in tracker.defer_remainder
>>>> will increase the rate of checkpoint growth.
>>>>
>>>> I took a look at the heap-based checkpoint files that I observed were
>>>> getting larger with each checkpoint (just using the less command) and
>>>> noticed that many copies of the residual restriction were present, which
>>>> seemed like a red flag. The residual restriction here is the one that
>>>> results from calling tracker.defer_remainder(), which results in a
>>>> tracker.try_split(0.0).
>>>>
>>>> I've included the SDF code and jobmanager logs showing growing
>>>> checkpoint size here:
>>>> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've
>>>> included the restriction provider/tracker and other pieces for
>>>> completeness, but the SDF is towards the bottom.
>>>>
>>>> Any help would be appreciated! 🙏🏾
>>>>
>>>> Thanks,
>>>> --
>>>> Nimalan Mahendran
>>>> ML Engineer at Liminal Insights
>>>>
>>>
