Hello again,

I tried to follow a process of elimination and simplify my code as much as possible, see https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_beam-py. This splittable DoFn is pared down to only doing the polling: it no longer uses the Redis client or any libraries internal to us. I have also included a simplified pipeline, see https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_pipeline-py. It now simply reads from the dummy Redis reader splittable DoFn and writes using the dummy writer DoFn. I also did away with dataclasses and now represent restrictions as plain ints.
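In case it saves anyone a click, the pared-down reader has roughly the following shape. This is a hand-simplified sketch rather than the gist code (the class names here are made up, and it uses the SDK's built-in OffsetRange/OffsetRestrictionTracker instead of the plain-int restrictions in the gist), but the poll / defer_remainder structure is the same:

    import apache_beam as beam
    from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
    from apache_beam.transforms.core import RestrictionProvider
    from apache_beam.utils.timestamp import Duration

    class PollingRestrictionProvider(RestrictionProvider):
        def initial_restriction(self, element):
            # Effectively unbounded: processing is never expected to reach the end.
            return OffsetRange(0, 2**63 - 1)

        def create_tracker(self, restriction):
            return OffsetRestrictionTracker(restriction)

        def restriction_size(self, element, restriction):
            return restriction.stop - restriction.start

    class DummyRedisReader(beam.DoFn):
        @beam.DoFn.unbounded_per_element()
        def process(
                self,
                element,
                tracker=beam.DoFn.RestrictionParam(PollingRestrictionProvider())):
            messages = []  # dummy poll: the "stream" is always empty
            if not messages:
                # Nothing to read: checkpoint and ask to be resumed in ~5s.
                tracker.defer_remainder(Duration.of(5))
                return
            # (The real reader would tracker.try_claim(...) message ids and
            # yield the messages here; watermark estimation is omitted to
            # keep the sketch short.)

and the simplified pipeline just wires that reader into the dummy writer, roughly:

    class DummyWriter(beam.DoFn):
        def process(self, element):
            print(element)  # stand-in for the real write

    with beam.Pipeline() as p:  # Flink runner options omitted here
        (p
         | beam.Impulse()
         | "Read" >> beam.ParDo(DummyRedisReader())
         | "Write" >> beam.ParDo(DummyWriter()))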
I was surprised to see that I'm still experiencing checkpoint growth. I have to say that I am at quite a loss. I also looked at an old mailing list thread (https://www.mail-archive.com/user@beam.apache.org/msg05513.html) and tried windowing/triggering the PCollection coming out of the dummy Redis reader splittable DoFn (a rough sketch of what I tried is in the P.S. at the bottom of this email), but still saw unbounded checkpoint size growth.

I thought it might be worth flagging that my Redis reader splittable DoFn works such that an element never finishes processing, i.e. the unboundedness is at the element level. I went with this design to mirror how Redis streams are structured: the message ids in a Redis stream start at 0-0, go up to ">" (which represents the part of the stream that has not been consumed yet) and finish at "$", which is kind of like Long.MAX_VALUE (i.e. you are never meant to get there).

Another thing that may be worth flagging is that when I am polling an empty stream, I am essentially returning the same consumed and residual restriction on each poll. The input restriction to try_split(fraction_of_remainder=0) is [>, $), i.e. all unread messages available on the stream. The output consumed restriction is [>, >), i.e. no messages were read. The output residual restriction is [>, $) again, i.e. all unread messages available on the stream.
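The same shape of split can be reproduced with the SDK's built-in offset tracker, which is how I've been thinking about it (an analogy only, since my actual tracker works on Redis message ids rather than integer offsets):

    from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker

    # An effectively-unbounded range standing in for [>, $): all unread messages.
    tracker = OffsetRestrictionTracker(OffsetRange(0, 2**63 - 1))

    # defer_remainder() on an empty poll boils down to try_split(0.0):
    primary, residual = tracker.try_split(0.0)

    assert primary == OffsetRange(0, 0)           # like [>, >): nothing consumed
    assert residual == OffsetRange(0, 2**63 - 1)  # like [>, $): everything unread

So every empty poll hands the runner back a residual that is identical to the restriction it started from.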
I've also been looking through the apache/beam GitHub repo to try to get a better sense of what might be going on. I noticed that the residual restrictions I return are being added to execution_context.delayed_applications, which is then returned via a protobuf message in the residual_roots field, but I cannot tell where/how execution_context.delayed_applications actually gets cleared. Maybe that's a red herring. I also tried to look at the KafkaIO connector (which is in Java, of course) to see whether it would be informative about any issues with how I am using the various Beam abstractions, but that led nowhere for me.

It would be really helpful to hear from people on this mailing list regarding:

1. Any potential causes for the checkpoint size growth. The only possible cause I can think of so far is that perhaps the SDF is implicitly storing some state that is not getting cleared out.
2. A complete working example of an unbounded SDF written using the Python SDK that uses tracker.defer_remainder, so I can at least start from something that works (i.e. does not have unbounded checkpoint size growth).
3. Whether the things I am flagging above might just be red herrings.
4. Anything else that might help me make progress.

Thanks again,

On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran <nima...@liminalinsights.com> wrote:

> Hello,
>
> I am running a pipeline built in the Python SDK that reads from a Redis
> stream <https://redis.io/docs/data-types/streams/> via an SDF, in the
> following environment:
>
> - Python 3.11
> - Apache Beam 2.48.0
> - Flink 1.16
> - Checkpoint interval: 60s
> - state.backend (Flink): hashmap
> - state_backend (Beam): filesystem
>
> The issue that I am observing is that the checkpoint size keeps growing,
> even when there are no items to read on the Redis stream. Since there are
> no items to read, the Redis stream SDF is simply doing the following steps
> repeatedly as part of DoFn.process, i.e. the user-initiated checkpoint
> pattern described in the Apache Beam programming guide
> <https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint>,
> to handle polling for new items with some delay if the last poll returned
> no items:
>
> 1. Make the call to the Redis client to read items from the Redis stream.
> 2. Receive no items from the Redis stream, and hence,
> 3. Call tracker.defer_remainder(Duration.of(5)) and return, to defer
> execution for 5 seconds. That code is located here
> <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>.
> 4. Go back to step 1.
>
> This checkpoint size growth happens regardless of whether I'm using
> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows
> large enough to cause the task manager to crash, due to exhausting Java
> heap space. The rate of checkpoint size growth is proportional to the
> number of tracker.defer_remainder() calls I have made, i.e. increasing
> parallelism and/or decreasing the timeout used in tracker.defer_remainder
> will increase the rate of checkpoint growth.
>
> I took a look at the heap-based checkpoint files that I observed were
> getting larger with each checkpoint (just using the less command) and
> noticed that many copies of the residual restriction were present, which
> seemed like a red flag. The residual restriction here is the one that
> results from calling tracker.defer_remainder(), which results in a
> tracker.try_split(0.0).
>
> I've included the SDF code and jobmanager logs showing the growing
> checkpoint size here:
> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've
> included the restriction provider/tracker and other pieces for
> completeness, but the SDF is towards the bottom.
>
> Any help would be appreciated! 🙏🏾
>
> Thanks,
> --
> Nimalan Mahendran
> ML Engineer at Liminal Insights
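P.S. For completeness, the windowing/triggering experiment mentioned above looked roughly like the following, reusing the stand-in names from the sketch near the top of this email. The 10s window and processing-time trigger delay are just values I tried, not something prescribed by the linked thread:

    import apache_beam as beam
    from apache_beam.transforms import trigger, window

    with beam.Pipeline() as p:  # Flink runner options omitted here
        (p
         | beam.Impulse()
         | "Read" >> beam.ParDo(DummyRedisReader())
         | "Window" >> beam.WindowInto(
             window.FixedWindows(10),
             trigger=trigger.AfterProcessingTime(10),
             accumulation_mode=trigger.AccumulationMode.DISCARDING)
         | "Write" >> beam.ParDo(DummyWriter()))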