Hello again,

I tried to follow a process of elimination and simplify my code as much as
possible, see
https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_beam-py.
This splittable DoFn is pared down to only doing the polling. It no longer
uses the Redis client or any libraries internal to us. I have also
included a simplified pipeline, see
https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_pipeline-py.
It now simply reads from the dummy Redis reader splittable DoFn and writes
using the dummy writer DoFn. I also did away with dataclasses and simply
represent restrictions as ints.
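
For anyone who prefers not to click through, the simplified SDF is roughly
along the following lines. This is an illustrative sketch with made-up names,
not a verbatim copy of the gist, but the piece that matters is the same: the
tracker.defer_remainder call on an empty poll (step 3 of the polling pattern
in my original email below).

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider
from apache_beam.utils.timestamp import Duration


class PollRestrictionProvider(RestrictionProvider):
    """Treats the whole (unbounded) stream as a single int range."""

    def initial_restriction(self, element):
        # The huge upper bound plays the role of "$": never actually reached.
        return OffsetRange(0, 2**63 - 1)

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return restriction.size()


@beam.DoFn.unbounded_per_element()
class DummyRedisReaderSDF(beam.DoFn):
    def process(
        self,
        element,
        tracker=beam.DoFn.RestrictionParam(PollRestrictionProvider()),
    ):
        items = []  # the pared-down reader never has anything to read
        if not items:
            # Empty poll: hand the remainder back to the runner and ask to
            # be resumed in ~5 seconds (user-initiated checkpoint pattern).
            tracker.defer_remainder(Duration.of(5))
            return
        position = tracker.current_restriction().start
        for item in items:
            if not tracker.try_claim(position):
                return
            yield item
            position += 1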

I was surprised to see that I'm still experiencing checkpoint growth. I
have to say that I am at quite a loss. I also looked at an old mailing list
thread (https://www.mail-archive.com/user@beam.apache.org/msg05513.html)
and tried windowing/triggering the PCollection coming out of the dummy
Redis reader splittable DoFn, but still saw unbounded checkpoint size
growth.
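
For reference, the windowing/triggering I tried looked roughly like the
snippet below. The particular trigger and accumulation mode are just one of
the combinations I experimented with, and the names are placeholders rather
than the actual pipeline code.

import apache_beam as beam
from apache_beam.transforms import trigger, window


def window_reader_output(messages):
    # `messages` stands for the PCollection produced by the dummy Redis
    # reader splittable DoFn.
    return messages | "WindowReaderOutput" >> beam.WindowInto(
        window.GlobalWindows(),
        trigger=trigger.Repeatedly(trigger.AfterProcessingTime(10)),
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
    )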

I thought it might be worth flagging that my Redis reader splittable DoFn
works such that an element never finishes processing, i.e. the
unboundedness is at the element level. I went with this design to mirror
how Redis streams are structured, i.e. the message ids in Redis streams
start at 0-0, go up to ">" (which represents the part of the stream that
has not been consumed yet) and finish at "$", which is kind of like
Long.MAX_VALUE (i.e. you are never meant to get there).
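
For anyone not familiar with Redis streams, reading "the unread part" of a
stream with redis-py looks roughly like this (illustrative names only; the
simplified SDF above no longer touches the Redis client at all):

import redis

r = redis.Redis()
# ">" asks for entries that have not yet been delivered to this consumer
# (the consumer group is assumed to already exist). On an empty stream this
# returns nothing after the 5s block, which is the situation that leads to
# the defer_remainder call in the SDF.
entries = r.xreadgroup(
    groupname="my-group",
    consumername="my-consumer",
    streams={"my-stream": ">"},
    count=100,
    block=5000,
)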

Another thing that may be worth flagging is that when I am polling an empty
stream, I am essentially returning the same primary and residual restriction
on each poll. The input restriction to try_split(fraction_of_remainder=0) is
[>, $), i.e. all unread messages available on the stream. The output primary
(consumed) restriction is [>, >), i.e. no messages were read. The output
residual restriction is [>, $) again, i.e. all unread messages still
available on the stream.
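
In other words, the split that defer_remainder triggers on an empty poll is
effectively a no-op split. With restrictions represented as ints, it boils
down to something like this (illustrative only, not the exact gist code):

def try_split_at_zero(restriction, current_position):
    # With restrictions as (start, stop) int pairs and nothing claimed yet,
    # current_position == start, so splitting at fraction_of_remainder=0
    # yields an empty primary and a residual equal to the input restriction.
    start, stop = restriction
    primary = (start, current_position)   # [>, >)  -- nothing consumed
    residual = (current_position, stop)   # [>, $)  -- all unread messages
    return primary, residual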

I've also been looking through the apache/beam github repo to try to get a
better sense of what might be going on. I noticed that the residual
restrictions I return are being added to
execution_context.delayed_applications, which is then returned via a
protobuf message in the residual_roots field, but I cannot tell where/how
execution_context.delayed_applications is actually getting cleared. Maybe
that's a red herring. I also tried to look at the KafkaIO connector (which
is in Java, of course) to see if it would be informative about any issues with
how I am using the various Beam abstractions, but that led nowhere for me.

It would be really helpful to hear from people on this mailing list
regarding:

   1. Any potential causes for the checkpoint size growth. The only
   possible cause I can think of so far is that perhaps the SDF is implicitly
   storing some state that is not getting cleared out.
   2. A complete working example of an unbounded SDF written using the
   Python SDK that uses tracker.defer_remainder, so I can at least start from
   something that works (i.e. does not have unbounded checkpoint size growth).
   3. Whether the things I am flagging above are just red herrings.
   4. Anything else that might help me make progress.

Thanks again,

On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran <
nima...@liminalinsights.com> wrote:

> Hello,
>
> I am running a pipeline built in the Python SDK that reads from a Redis
> stream <https://redis.io/docs/data-types/streams/> via an SDF, in the
> following environment:
>
>    - Python 3.11
>    - Apache Beam 2.48.0
>    - Flink 1.16
>    - Checkpoint interval: 60s
>    - state.backend (Flink): hashmap
>    - state_backend (Beam): filesystem
>
> The issue that I am observing is that the checkpoint size keeps growing,
> even when there are no items to read on the Redis stream. Since there are
> no items to read on the Redis stream, the Redis stream SDF is simply doing
> the following steps repeatedly, as part of DoFn.process, i.e. the pattern
> described in the user-initiated checkpoint pattern in the Apache Beam
> programming guide
> <https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint>
> to handle polling for new items with some delay, if the last poll returned
> no items:
>
>    1. Make the call to the Redis client to read items from the Redis
>    stream
>    2. Receive no items from the Redis stream, and hence,
>    3. Call tracker.defer_remainder(Duration.of(5)) and then return, to
>    defer execution for 5 seconds. That code is located here
>    
> <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>
>    .
>    4. Go back to step 1.
>
> This checkpoint size growth happens regardless of whether I'm using
> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows
> large enough to cause the task manager to crash, due to exhausting Java
> heap space. The rate of checkpoint size growth is proportional to the
> number of tracker.defer_remainder() calls I have done, i.e. increasing
> parallelism and/or decreasing the timeout used in tracker.defer_remainder
> will increase the rate of checkpoint growth.
>
> I took a look at the heap-based checkpoint files that I observed were
> getting larger with each checkpoint (just using the less command) and
> noticed that many copies of the residual restriction were present, which
> seemed like a red flag. The residual restriction here is the one that
> results from calling tracker.defer_remainder(), which results in a
> tracker.try_split(0.0).
>
> I've included the SDF code and jobmanager logs showing growing checkpoint
> size here:
> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've
> included the restriction provider/tracker and other pieces for
> completeness, but the SDF is towards the bottom.
>
> Any help would be appreciated! 🙏🏾
>
> Thanks,
> --
> Nimalan Mahendran
> ML Engineer at Liminal Insights
>
