An interesting update: I found PeriodicImpulse, a transform that is implemented in Python as an unbounded SDF. I created the following minimal pipeline:
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse

# runner_options: the same Flink runner PipelineOptions used for the earlier runs
with beam.Pipeline(options=runner_options) as pipeline:
    traceable_measurements = pipeline | PeriodicImpulse(fire_interval=5)
    traceable_measurements | beam.Map(print)

Even with this pipeline, I am still seeing growing checkpoint size in Flink. I'll keep the pipeline running to confirm that the trend continues, but this is pretty concerning:

2023-07-21 17:24:22,401 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960262390 for job a5e3323d73491f3e6c409c79f160c555.
2023-07-21 17:24:22,487 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 1 for job a5e3323d73491f3e6c409c79f160c555 (500341 bytes, checkpointDuration=94 ms, finalizationTime=3 ms).
2023-07-21 17:25:22,389 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960322388 for job a5e3323d73491f3e6c409c79f160c555.
2023-07-21 17:25:22,431 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 2 for job a5e3323d73491f3e6c409c79f160c555 (895320 bytes, checkpointDuration=42 ms, finalizationTime=1 ms).

Once again, any pointers would be appreciated.

On Fri, Jul 21, 2023 at 9:04 AM Nimalan Mahendran <nima...@liminalinsights.com> wrote:

> Hello again,
>
> I tried to follow a process of elimination and simplify my code as much as possible; see https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_beam-py. This splittable DoFn is pared down to only doing the polling. It no longer uses the Redis client or any libraries internal to us. I have also included a simplified pipeline; see https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_pipeline-py. It now simply reads from the dummy Redis reader splittable DoFn and writes using the dummy writer DoFn. I also did away with dataclasses and simply represent restrictions as ints.
>
> I was surprised to see that I'm still experiencing checkpoint growth. I have to say that I am at quite a loss. I also looked at an old mailing list thread (https://www.mail-archive.com/user@beam.apache.org/msg05513.html) and tried windowing/triggering the PCollection coming out of the dummy Redis reader splittable DoFn, but still saw unbounded checkpoint size growth.
>
> I thought it might be worth flagging that my Redis reader splittable DoFn works such that an element never finishes processing, i.e. the unboundedness is at the element level. I went with this design to mirror how Redis streams are structured: message ids in a Redis stream start at 0-0, go up to ">" (which represents the part of the stream that has not been consumed yet) and finish at "$", which is kind of like Long.MAX_VALUE (i.e. you are never meant to get there).
>
> Another thing that may be worth flagging is that when I am polling an empty stream, I am essentially returning the same consumed and residual restriction on each poll. The input restriction to try_split(fraction_of_remainder=0) is [>, $), i.e. all unread messages available on the stream. The output consumed (primary) restriction is [>, >), i.e. no messages were read. The output residual restriction is [>, $) again, i.e. all unread messages still available on the stream.
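>
> If I am reading Beam's built-in OffsetRestrictionTracker correctly, this is also what the stock offset-based tracker does on a checkpoint split when nothing has been claimed yet: try_split(0) hands back an empty primary and the whole remaining range as the residual. A minimal illustration (using the stock tracker, not my Redis tracker):
>
> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
>
> # A fresh tracker over [0, 100) with nothing claimed yet, analogous to a
> # poll that found no messages.
> tracker = OffsetRestrictionTracker(OffsetRange(0, 100))
> split = tracker.try_split(0)
> if split:
>     primary, residual = split
>     # Expected: primary covers [0, 0) (empty, nothing consumed) and
>     # residual covers [0, 100) (everything still to be read).
>     print(primary.start, primary.stop, residual.start, residual.stop)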
>
> I've also been looking through the apache/beam github repo to try to get a better sense of what might be going on. I noticed that the residual restrictions I return are being added to execution_context.delayed_applications, which is then returned via a protobuf message in the residual_roots field, but I cannot tell where/how execution_context.delayed_applications actually gets cleared. Maybe that's a red herring. I also tried to look at the KafkaIO connector (which is in Java, of course) to see if it would be informative about any issues with how I am using the various Beam abstractions, but that led nowhere for me.
>
> It would be really helpful to hear from people on this mailing list regarding:
>
> 1. Any potential causes for the checkpoint size growth. The only possible cause I can think of so far is that the SDF is implicitly storing some state that is not getting cleared out.
> 2. A complete working example of an unbounded SDF written using the Python SDK that uses tracker.defer_remainder, so I can at least start from something that works (i.e. does not have unbounded checkpoint size growth).
> 3. Whether the things I am flagging above might just be red herrings.
> 4. Anything else that might help me make progress.
>
> Thanks again,
>
> On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran <nima...@liminalinsights.com> wrote:
>
>> Hello,
>>
>> I am running a pipeline built with the Python SDK that reads from a Redis stream <https://redis.io/docs/data-types/streams/> via an SDF, in the following environment:
>>
>> - Python 3.11
>> - Apache Beam 2.48.0
>> - Flink 1.16
>> - Checkpoint interval: 60s
>> - state.backend (Flink): hashmap
>> - state_backend (Beam): filesystem
>>
>> The issue I am observing is that the checkpoint size keeps growing, even when there are no items to read on the Redis stream. Since there are no items to read, the Redis stream SDF simply repeats the following steps as part of DoFn.process, following the user-initiated checkpoint pattern described in the Apache Beam programming guide <https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint> for polling for new items with some delay when the last poll returned no items:
>>
>> 1. Make the call to the Redis client to read items from the Redis stream.
>> 2. Receive no items from the Redis stream, and hence,
>> 3. Call tracker.defer_remainder(Duration.of(5)) and then return, to defer execution for 5 seconds. That code is located here <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>.
>> 4. Go back to step 1.
>>
>> (A stripped-down sketch of this loop is included further down in this message.)
>>
>> This checkpoint size growth happens regardless of whether I'm using heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows large enough to crash the task manager by exhausting Java heap space. The rate of checkpoint size growth is proportional to the number of tracker.defer_remainder() calls I have made, i.e. increasing parallelism and/or decreasing the timeout used in tracker.defer_remainder increases the rate of checkpoint growth.
>>
>> I took a look at the heap-based checkpoint files that were getting larger with each checkpoint (just using the less command) and noticed that many copies of the residual restriction were present, which seemed like a red flag. The residual restriction here is the one that results from calling tracker.defer_remainder(), which results in a tracker.try_split(0.0).
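>>
>> For reference, here is the stripped-down sketch of the shape of that loop, following the programming-guide pattern. This is not my actual reader: fake_poll stands in for the Redis client call, an offset range stands in for the Redis stream ids, and names like PollRestrictionProvider and PollingReadFn are made up for the sketch. Watermark estimation is omitted for brevity.
>>
>> import apache_beam as beam
>> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
>> from apache_beam.transforms.core import RestrictionProvider
>> from apache_beam.utils.timestamp import Duration
>>
>>
>> def fake_poll(position):
>>     # Stand-in for the Redis client read; always empty, like an idle stream.
>>     return []
>>
>>
>> class PollRestrictionProvider(RestrictionProvider):
>>     def initial_restriction(self, element):
>>         # Unbounded at the element level: a huge range plays the role of
>>         # reading all the way up to "$" on the Redis stream.
>>         return OffsetRange(0, 2**63 - 1)
>>
>>     def create_tracker(self, restriction):
>>         return OffsetRestrictionTracker(restriction)
>>
>>     def restriction_size(self, element, restriction):
>>         return restriction.size()
>>
>>
>> class PollingReadFn(beam.DoFn):
>>     @beam.DoFn.unbounded_per_element()
>>     def process(self,
>>                 element,
>>                 tracker=beam.DoFn.RestrictionParam(PollRestrictionProvider())):
>>         position = tracker.current_restriction().start
>>         while True:
>>             records = fake_poll(position)
>>             if not records:
>>                 # Nothing to read: checkpoint (which results in try_split(0))
>>                 # and ask the runner to resume this restriction in 5 seconds.
>>                 tracker.defer_remainder(Duration.of(5))
>>                 return
>>             for offset, record in records:
>>                 if not tracker.try_claim(offset):
>>                     return
>>                 position = offset + 1
>>                 yield record
>>
>> In the pipeline this DoFn would be applied to a single seed element, e.g. pipeline | beam.Impulse() | beam.ParDo(PollingReadFn()).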
>>
>> I've included the SDF code and jobmanager logs showing the growing checkpoint size here: https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've included the restriction provider/tracker and other pieces for completeness, but the SDF is towards the bottom.
>>
>> Any help would be appreciated! 🙏🏾
>>
>> Thanks,
>> --
>> Nimalan Mahendran
>> ML Engineer at Liminal Insights
>>