Hi Nimalan,

Thank you for the tremendous effort of narrowing this down and creating the minimal example!

I have only used the Beam Java SDK so far, and have seen checkpoint size grow in only two cases:

1 - some resources leaking in user code in a DoFn (see the sketch below)
2 - data overload not being handled well by connectors (rare)

I usually used Java memory dumps to investigate both. While awaiting replies from Python SDK users, you can try running your OOM case with a memory dump <https://www.baeldung.com/java-heap-dump-capture> to see what gets accumulated in the Java heap inside your Flink app.
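In Python SDK terms, case 1 is usually a client or connection created per element (or per bundle) instead of once per DoFn instance. A rough illustration of the non-leaky shape - the HTTP connection below is just a made-up stand-in for whatever resource your DoFn holds, not anything from your gist:

    import http.client

    import apache_beam as beam


    class FetchStatusDoFn(beam.DoFn):
        # Hypothetical example DoFn: one connection per DoFn instance,
        # reused across elements.

        def setup(self):
            # Runs once when the worker creates this DoFn instance.
            self.conn = http.client.HTTPSConnection("example.com", timeout=10)

        def process(self, path):
            # Reuse the connection instead of opening a new one per element,
            # which is the usual source of leaked sockets/threads/buffers.
            self.conn.request("HEAD", path)
            response = self.conn.getresponse()
            response.read()  # drain so the connection can be reused
            yield (path, response.status)

        def teardown(self):
            # Runs when the DoFn instance is discarded; without this,
            # the connection leaks.
            self.conn.close()
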
Best Regards,
Pavel Solomin
Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin>

On Tue, 25 Jul 2023 at 00:54, Nimalan Mahendran <nima...@liminalinsights.com> wrote:

> I wanted to update that I pared down my setup to reproduce this bug and filed an issue: https://github.com/apache/beam/issues/27648. It includes steps to reproduce the issue. I would be very interested to hear whether others are experiencing the same issue, or if they're also running unbounded SDFs on Flink and not seeing this issue.
>
> I'm also really hoping to get some traction on this issue, as it is quite a serious bug for our system that we have built on top of Beam/Flink ☹️
>
> On Fri, Jul 21, 2023 at 10:28 AM Nimalan Mahendran <nima...@liminalinsights.com> wrote:
>
>> An interesting update - I found PeriodicImpulse, which is a Python-based unbounded SDF. I created the following minimal pipeline:
>>
>> with beam.Pipeline(options=runner_options) as pipeline:
>>     traceable_measurements = pipeline | PeriodicImpulse(fire_interval=5)
>>     traceable_measurements | beam.Map(print)
>>
>> And I am still seeing growing checkpoint size in Flink. I'll keep running the pipeline to confirm that the trend continues, but this is pretty concerning:
>>
>> 2023-07-21 17:24:22,401 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960262390 for job a5e3323d73491f3e6c409c79f160c555.
>> 2023-07-21 17:24:22,487 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job a5e3323d73491f3e6c409c79f160c555 (500341 bytes, checkpointDuration=94 ms, finalizationTime=3 ms).
>> 2023-07-21 17:25:22,389 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960322388 for job a5e3323d73491f3e6c409c79f160c555.
>> 2023-07-21 17:25:22,431 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 2 for job a5e3323d73491f3e6c409c79f160c555 (895320 bytes, checkpointDuration=42 ms, finalizationTime=1 ms).
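>>
>> For anyone who wants to try this themselves, here is a self-contained sketch of the same repro. The runner options below are illustrative rather than my exact setup - point --job_endpoint at your own Flink job server:
>>
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.transforms.periodicsequence import PeriodicImpulse
>>
>> # Illustrative options only - adjust for your own Flink / portable runner setup.
>> runner_options = PipelineOptions([
>>     "--runner=PortableRunner",
>>     "--job_endpoint=localhost:8099",
>>     "--environment_type=LOOPBACK",
>>     "--streaming",
>> ])
>>
>> with beam.Pipeline(options=runner_options) as pipeline:
>>     # Emit one element every 5 seconds, forever - a stock Python unbounded SDF.
>>     traceable_measurements = pipeline | PeriodicImpulse(fire_interval=5)
>>     traceable_measurements | beam.Map(print)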
>> Once again, any pointers would be appreciated.
>>
>> On Fri, Jul 21, 2023 at 9:04 AM Nimalan Mahendran <nima...@liminalinsights.com> wrote:
>>
>>> Hello again,
>>>
>>> I tried to follow a process of elimination and simplify my code as much as possible; see https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_beam-py. This splittable DoFn is pared down to only doing the polling. It no longer uses the Redis client or any libraries internal to us. I have also included a simplified pipeline; see https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_pipeline-py. It now simply reads from the dummy Redis reader splittable DoFn and writes using the dummy writer DoFn. I also did away with dataclasses and simply represent restrictions as ints.
>>>
>>> I was surprised to see that I am still experiencing checkpoint growth, and I have to say that I am at quite a loss. I also looked at an old mailing list thread (https://www.mail-archive.com/user@beam.apache.org/msg05513.html) and tried windowing/triggering the PCollection coming out of the dummy Redis reader splittable DoFn, but still saw unbounded checkpoint size growth.
>>>
>>> I thought it might be worth flagging that my Redis reader splittable DoFn works such that an element never finishes processing, i.e. the unboundedness is at the element level. I went with this design to mirror how Redis streams are structured: the message ids in a Redis stream start at 0-0, go up to ">" (which represents the part of the stream that has not been consumed yet) and finish at "$", which is kind of like Long.MAX_VALUE (i.e. you are never meant to get there).
>>>
>>> Another thing that may be worth flagging is that when I am polling an empty stream, I am essentially returning the same consumed and residual restriction on each poll. The input restriction to try_split(fraction_of_remainder=0) is [>, $), i.e. all unread messages available on the stream. The output consumed restriction is [>, >), i.e. no messages were read. The output residual restriction is [>, $) again, i.e. all unread messages available on the stream.
>>>
>>> I've also been looking through the apache/beam GitHub repo to try to get a better sense of what might be going on. I noticed that the residual restrictions I return are being added to execution_context.delayed_applications, which is then returned via a protobuf message in the residual_roots field, but I cannot tell where/how execution_context.delayed_applications is actually getting cleared. Maybe that's a red herring. I also tried to look at the KafkaIO connector (which is in Java, of course) to see if it would be informative about any issues with how I am using the various Beam abstractions, but that led nowhere for me.
>>>
>>> It would be really helpful to hear from people on this mailing list regarding:
>>>
>>> 1. Any potential causes for the checkpoint size growth. The only possible cause I can think of so far is that perhaps the SDF is implicitly storing some state that is not getting cleared out.
>>> 2. A complete working example of an unbounded SDF written using the Python SDK that uses tracker.defer_remainder, so I can at least start from something that works (i.e. does not have unbounded checkpoint size growth) - something shaped roughly like the sketch right after this list.
>>> 3. Whether what I am flagging above might just be red herrings?
>>> 4. Anything else that might help me make progress.
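>>>
>>> To make point 2 concrete, here is roughly the shape I have in mind: an untested sketch that uses the SDK's stock OffsetRange/OffsetRestrictionTracker instead of my custom Redis restriction tracker, with a stub poll_redis() standing in for the real client call (it always returns nothing, mirroring the empty-stream case), and with watermark estimation omitted for brevity:
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
>>> from apache_beam.transforms.core import RestrictionProvider
>>> from apache_beam.utils.timestamp import Duration
>>>
>>>
>>> def poll_redis(position):
>>>     # Stand-in for the real XREAD call; always empty, so the DoFn just
>>>     # defers every 5 seconds, like the empty-stream scenario above.
>>>     return []
>>>
>>>
>>> class PollRestrictionProvider(RestrictionProvider):
>>>     def initial_restriction(self, element):
>>>         # Effectively unbounded upper bound, analogous to "$" in a Redis stream.
>>>         return OffsetRange(0, 2**63 - 1)
>>>
>>>     def create_tracker(self, restriction):
>>>         return OffsetRestrictionTracker(restriction)
>>>
>>>     def restriction_size(self, element, restriction):
>>>         return restriction.stop - restriction.start
>>>
>>>
>>> class PollingReadDoFn(beam.DoFn):
>>>     POLL_INTERVAL_SECONDS = 5
>>>
>>>     @beam.DoFn.unbounded_per_element()
>>>     def process(
>>>         self,
>>>         element,
>>>         tracker=beam.DoFn.RestrictionParam(PollRestrictionProvider())):
>>>         position = tracker.current_restriction().start
>>>         while True:
>>>             records = poll_redis(position)  # list of (offset, record) pairs
>>>             if not records:
>>>                 # Nothing to read: checkpoint and ask the runner to resume in 5 s.
>>>                 tracker.defer_remainder(Duration.of(self.POLL_INTERVAL_SECONDS))
>>>                 return
>>>             for offset, record in records:
>>>                 if not tracker.try_claim(offset):
>>>                     return
>>>                 position = offset + 1
>>>                 yield record
>>>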
>>> Thanks again,
>>>
>>> On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran <nima...@liminalinsights.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am running a pipeline built with the Python SDK that reads from a Redis stream <https://redis.io/docs/data-types/streams/> via an SDF, in the following environment:
>>>>
>>>> - Python 3.11
>>>> - Apache Beam 2.48.0
>>>> - Flink 1.16
>>>> - Checkpoint interval: 60s
>>>> - state.backend (Flink): hashmap
>>>> - state_backend (Beam): filesystem
>>>>
>>>> The issue that I am observing is that the checkpoint size keeps growing, even when there are no items to read on the Redis stream. Since there are no items to read, the Redis stream SDF simply does the following steps repeatedly, as part of DoFn.process, i.e. the user-initiated checkpoint pattern described in the Apache Beam programming guide <https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint> for polling for new items with some delay when the last poll returned no items:
>>>>
>>>> 1. Make the call to the Redis client to read items from the Redis stream.
>>>> 2. Receive no items from the Redis stream, and hence,
>>>> 3. Call tracker.defer_remainder(Duration.of(5)) and return, to defer execution for 5 seconds. That code is located here <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>.
>>>> 4. Go back to step 1.
>>>>
>>>> This checkpoint size growth happens regardless of whether I am using heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows large enough to crash the task manager by exhausting Java heap space. The rate of checkpoint size growth is proportional to the number of tracker.defer_remainder() calls, i.e. increasing parallelism and/or decreasing the timeout used in tracker.defer_remainder will increase the rate of checkpoint growth.
>>>>
>>>> I took a look at the heap-based checkpoint files that were getting larger with each checkpoint (just using the less command) and noticed that many copies of the residual restriction were present, which seemed like a red flag. The residual restriction here is the one that results from calling tracker.defer_remainder(), which internally performs a tracker.try_split(0.0).
>>>>
>>>> I've included the SDF code and jobmanager logs showing the growing checkpoint size here: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've included the restriction provider/tracker and other pieces for completeness, but the SDF is towards the bottom.
>>>>
>>>> Any help would be appreciated! 🙏🏾
>>>>
>>>> Thanks,
>>>> --
>>>> Nimalan Mahendran
>>>> ML Engineer at Liminal Insights