An interesting update - I found PeriodicImpulse, which is a Python-based
unbounded SDF. I created the following minimal pipeline:

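    import apache_beam as beam
    from apache_beam.transforms.periodicsequence import PeriodicImpulse
    # Fire an impulse every 5 seconds and print each element.
    with beam.Pipeline(options=runner_options) as pipeline:
        traceable_measurements = pipeline | PeriodicImpulse(fire_interval=5)
        traceable_measurements | beam.Map(print)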

Even with this pipeline, the checkpoint size is still growing in Flink: from
500341 bytes at checkpoint 1 to 895320 bytes at checkpoint 2. I'll keep
running the pipeline to confirm that the trend continues, but this is pretty
concerning:

    2023-07-21 17:24:22,401 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960262390 for job a5e3323d73491f3e6c409c79f160c555.
    2023-07-21 17:24:22,487 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 1 for job a5e3323d73491f3e6c409c79f160c555 (500341 bytes, checkpointDuration=94 ms, finalizationTime=3 ms).
    2023-07-21 17:25:22,389 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 2 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1689960322388 for job a5e3323d73491f3e6c409c79f160c555.
    2023-07-21 17:25:22,431 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 2 for job a5e3323d73491f3e6c409c79f160c555 (895320 bytes, checkpointDuration=42 ms, finalizationTime=1 ms).
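
For anyone who wants to track this across more checkpoints, here is a minimal sketch that pulls the sizes out of the "Completed checkpoint" entries and reports the growth. The helper name checkpoint_sizes and the regular expression are my own illustration, not anything provided by Flink or Beam, and two checkpoints alone do not establish a trend.

```python
import re

# The two "Completed checkpoint" entries from the log above, each joined
# back onto a single line.
LOG = (
    "2023-07-21 17:24:22,487 INFO  "
    "org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - "
    "Completed checkpoint 1 for job a5e3323d73491f3e6c409c79f160c555 "
    "(500341 bytes, checkpointDuration=94 ms, finalizationTime=3 ms).\n"
    "2023-07-21 17:25:22,431 INFO  "
    "org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - "
    "Completed checkpoint 2 for job a5e3323d73491f3e6c409c79f160c555 "
    "(895320 bytes, checkpointDuration=42 ms, finalizationTime=1 ms).\n"
)

# Hypothetical pattern: checkpoint number, job id, then the size in bytes.
COMPLETED = re.compile(r"Completed checkpoint (\d+) for job \S+ \((\d+) bytes")


def checkpoint_sizes(log_text):
    """Map checkpoint number -> size in bytes for every completed checkpoint."""
    return {int(n): int(size) for n, size in COMPLETED.findall(log_text)}


sizes = checkpoint_sizes(LOG)
growth = sizes[2] - sizes[1]
print(f"checkpoint sizes: {sizes}, growth between 1 and 2: {growth} bytes")
```

Feeding a longer jobmanager log through the same helper would show whether the size keeps climbing or levels off.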

Once again, any pointers would be appreciated.

On Fri, Jul 21, 2023 at 9:04 AM Nimalan Mahendran <
nima...@liminalinsights.com> wrote:

> Hello again,
>
> I tried to follow a process of elimination and simplify my code as much as
> possible, see
> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_beam-py.
> This splittable DoFn is pared down to only doing the polling. It no longer
> uses the Redis client or uses any libraries internal to us. I have also
> included a simplified pipeline, see
> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-simplified_pipeline-py.
> It now simply reads from the dummy Redis reader splittable DoFn and writes
> using the dummy writer DoFn. I also did away with dataclasses and simply
> represent restrictions as ints.
>
> I was surprised to see that I'm still experiencing checkpoint growth. I
> have to say that I am at quite a loss. I also looked at an old mailing list
> thread (https://www.mail-archive.com/user@beam.apache.org/msg05513.html)
> and tried windowing/triggering the PCollection coming out of the dummy
> Redis reader splittable DoFn, but still saw unbounded checkpoint size
> growth.
>
> I thought it might be worth flagging that my Redis reader splittable DoFn
> works such that an element never finishes processing, i.e. the
> unboundedness is at the element level. I went with this design to mirror
> how Redis streams are structured, i.e. the message ids in Redis streams
> start at 0-0, go up to ">" (which represents the part of the stream that
> has not been consumed yet) and finish at "$", which is kind of like
> Long.MAX_VALUE (i.e. you are never meant to get there).
>
> Another thing that may be worth flagging is that when I am polling an
> empty stream, I am essentially returning the same consumed and residual
> restriction on each poll. The input restriction to
> try_split(fraction_of_remainder=0) is [>, $), i.e. all unread messages
> available on the stream. The output consumed restriction is [>, >), i.e. no
> messages were read. The output residual restriction is [>, $) again, i.e. all
> unread messages available on the stream.
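
To make the split described above concrete, here is a toy model in plain Python. It is only a sketch: StreamRestriction and split_on_empty_poll are hypothetical names, the code does not use or reproduce Beam's restriction tracker, and it assumes nothing was claimed before the split.

```python
from typing import List, NamedTuple, Tuple


class StreamRestriction(NamedTuple):
    """Half-open range [start, end) of Redis stream ids (hypothetical type)."""
    start: str
    end: str


def split_on_empty_poll(
    restriction: StreamRestriction,
) -> Tuple[StreamRestriction, StreamRestriction]:
    """Mimic try_split(fraction_of_remainder=0) when no message was claimed.

    The consumed (primary) part is empty and the residual is the whole input.
    """
    consumed = StreamRestriction(restriction.start, restriction.start)
    residual = StreamRestriction(restriction.start, restriction.end)
    return consumed, residual


restriction = StreamRestriction(">", "$")
residuals: List[StreamRestriction] = []
for _ in range(3):  # three consecutive polls of an empty stream
    consumed, restriction = split_on_empty_poll(restriction)
    residuals.append(restriction)

print(consumed)   # the empty range [>, >)
print(residuals)  # the same [>, $) residual, once per poll
```

In this model every empty poll produces an identical residual, so if residuals were retained anywhere instead of being replaced, the result would be many copies of the same restriction.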
>
> I've also been looking through the apache/beam github repo to try to get a
> better sense of what might be going on. I noticed that the residual
> restrictions I return are being added to
> execution_context.delayed_applications, which is then returned via a
> protobuf message in the residual_roots field, but I cannot tell where/how
> execution_context.delayed_applications is actually getting cleared. Maybe
> that's a red herring. I also tried to look at the KafkaIO connector (which
> is in Java, of course) to see if it would be informative about any issues with
> how I am using the various Beam abstractions, but that led nowhere for me.
>
> It would be really helpful to hear from people on this mailing list
> regarding:
>
>    1. Any potential causes for the checkpoint size growth. The only
>    possible cause I can think of so far is that perhaps the SDF is implicitly
>    storing some state that is not getting cleared out.
>    2. A complete working example of an unbounded SDF written using the
>    Python SDK that uses tracker.defer_remainder, so I can at least start from
>    something that works (i.e. does not have unbounded checkpoint size growth).
>    3. Whether what I am flagging above might just be red herrings?
>    4. Anything else that might help me make progress.
>
> Thanks again,
>
> On Tue, Jul 18, 2023 at 5:43 PM Nimalan Mahendran <
> nima...@liminalinsights.com> wrote:
>
>> Hello,
>>
>> I am running a pipeline built in the Python SDK that reads from a Redis
>> stream <https://redis.io/docs/data-types/streams/> via an SDF, in the
>> following environment:
>>
>>    - Python 3.11
>>    - Apache Beam 2.48.0
>>    - Flink 1.16
>>    - Checkpoint interval: 60s
>>    - state.backend (Flink): hashmap
>>    - state_backend (Beam): filesystem
>>
>> The issue that I am observing is that the checkpoint size keeps growing,
>> even when there are no items to read on the Redis stream. Since there are
>> no items to read on the Redis stream, the Redis stream SDF is simply doing
>> the following steps repeatedly, as part of DoFn.process, i.e. the pattern
>> described in the user-initiated checkpoint pattern in the Apache Beam
>> programming guide
>> <https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint>
>> to handle polling for new items with some delay, if the last poll returned
>> no items:
>>
>>    1. Make the call to the Redis client to read items from the Redis
>>    stream
>>    2. Receive no items from the Redis stream, and hence,
>>    3. Call tracker.defer_remainder(Duration.of(5)) and return, to defer
>>    execution for 5 seconds. That code is located here
>>    <https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e#file-beam-py-L534-L541>.
>>    4. Go back to step 1.
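
The control flow of the steps above can be sketched without Beam as follows. This is not the code from the gist: FakeTracker is a hypothetical stand-in that only records calls, read_items stands in for the Redis client call, and the delay is a plain number of seconds rather than a Beam Duration.

```python
from typing import Callable, Iterator, List


class FakeTracker:
    """Hypothetical stand-in for a restriction tracker; it only records calls."""

    def __init__(self) -> None:
        self.deferrals: List[float] = []

    def try_claim(self, position: str) -> bool:
        return True  # always grants the claim in this sketch

    def defer_remainder(self, delay_seconds: float) -> None:
        self.deferrals.append(delay_seconds)


def process(
    read_items: Callable[[], List[str]],
    tracker: FakeTracker,
    poll_delay_seconds: float = 5,
) -> Iterator[str]:
    """Poll until a read comes back empty, then defer the rest and return."""
    while True:
        items = read_items()                            # step 1: poll
        if not items:                                   # step 2: nothing to read
            tracker.defer_remainder(poll_delay_seconds) # step 3: defer ...
            return                                      # ... and return
        for item in items:
            if not tracker.try_claim(item):
                return
            yield item


# One non-empty poll followed by an empty one.
batches = iter([["1-0", "2-0"], []])
tracker = FakeTracker()
emitted = list(process(lambda: next(batches), tracker))
print(emitted, tracker.deferrals)
```

Step 4 (going back to step 1) is left to the runner in the real pipeline, which resumes the deferred remainder after the delay; the sketch stops at the first deferral.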
>>
>> This checkpoint size growth happens regardless of whether I'm using
>> heap-based or RocksDB-based checkpoints. Eventually, the checkpoint grows
>> large enough to cause the task manager to crash, due to exhausting Java
>> heap space. The rate of checkpoint size growth is proportional to the
>> number of tracker.defer_remainder() calls I have done, i.e. increasing
>> parallelism and/or decreasing the timeout used in tracker.defer_remainder
>> will increase the rate of checkpoint growth.
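
As a rough back-of-the-envelope for the proportionality described above, the number of deferrals between two checkpoints can be estimated as below. The function name is hypothetical, and the estimate assumes every poll comes back empty and that the poll itself takes negligible time.

```python
def deferrals_per_checkpoint(
    checkpoint_interval_s: float,
    defer_delay_s: float,
    parallelism: int,
) -> float:
    """Estimate defer_remainder calls made between two consecutive checkpoints."""
    return parallelism * checkpoint_interval_s / defer_delay_s


# 60 s checkpoint interval and a 5 s deferral, as in the setup described above.
single = deferrals_per_checkpoint(60, 5, parallelism=1)
quad = deferrals_per_checkpoint(60, 5, parallelism=4)
faster = deferrals_per_checkpoint(60, 1, parallelism=1)
print(single, quad, faster)
```

Under these assumptions, raising the parallelism or shortening the delay scales the count linearly, which matches the direction of the observed growth rate.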
>>
>> I took a look at the heap-based checkpoint files that I observed were
>> getting larger with each checkpoint (just using the less command) and
>> noticed that many copies of the residual restriction were present, which
>> seemed like a red flag. The residual restriction here is the one that
>> results from calling tracker.defer_remainder(), which results in a
>> tracker.try_split(0.0).
>>
>> I've included the SDF code and jobmanager logs showing growing checkpoint
>> size here:
>> https://gist.github.com/nybbles/1feafda7321f7c6a9e16c1455ebf0f3e. I've
>> included the restriction provider/tracker and other pieces for
>> completeness, but the SDF is towards the bottom.
>>
>> Any help would be appreciated! 🙏🏾
>>
>> Thanks,
>> --
>> Nimalan Mahendran
>> ML Engineer at Liminal Insights
>>
>
