Hi Mason,

Those checkpoint timeouts (30 minutes) have you already observed with the
alignment timeout set to 0ms? Or as you were previously running it with 1s
alignment timeout?

If the latter, it might be because unaligned checkpoints are failing to
kick in in the first place. Setting the timeout to 0ms should solve the
problem.

If the former, have you checked why the checkpoints are timeouting? What
part of the checkpointing process is taking a long time? For example can
you post a screenshot from the WebUI of checkpoint stats for each task? The
only explanation I could think of is this sleep time that you added. 25ms
per record is really a lot. I mean really a lot. 30 minutes / 25 ms/record
= 72 000 records. One of the unaligned checkpoints limitations is that
Flink can not snapshot a state of an operator in the middle of processing a
record. In your particular case, Flink will not be able to snapshot the
state of the session window operator in the middle of the windows being
fired. If your window operator is firing a lot of windows at the same time,
or a single window is producing 72k of records (which would be an
unusual but not unimaginable amount), this could block checkpointing of the
window operator for 30 minutes due to this 25ms sleep down the stream.

Piotrek

pt., 17 gru 2021 o 19:19 Mason Chen <mason.c...@apple.com> napisał(a):

> Hi Piotr,
>
> Thanks for the link to the JIRA ticket, we actually don’t see much state
> size overhead between checkpoints in aligned vs unaligned, so we will go
> with your recommendation of using unaligned checkpoints with 0s alignment
> timeout.
>
> For context, we are testing unaligned checkpoints with our application
> with these tasks: [kafka source, map, filter] -> keyby -> [session window]
> -> [various kafka sinks]. The first task has parallelism 40 and the rest of
> the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>
> We added an artificial sleep (25 ms per invocation of in process function)
> the session window task to simulate backpressure; however, we still see
> checkpoints failing due to task acknowledgement doesn’t complete within our
> checkpoint timeout (30 minutes).
>
> I am able to correlate that the input buffers from *window* and output
> buffers from *source* being 100% usage corresponds to the checkpoint
> failures. When they are not full (input can drop to as low as 60% usage and
> output can drop to as low as 55% usage), the checkpoints succeed within
> less than 2 ms. In all cases, it is the session window task or source task
> failing to 100% acknowledge the barriers within timeout. I do see the
> *source* task acknowledgement taking long in some of the failures (e.g.
> 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) and source is idle and
> not busy at this time.
>
> All other input buffers are low usage (mostly 0). For output buffer, the
> usage is around 50% for window--everything else is near 0% all the time
> except the source mentioned before (makes sense since rest are just sinks).
>
> We are also running a parallel Flink job with the same configurations,
> except with unaligned checkpoints disabled. Here we see observe the same
> behavior except now some of the checkpoints are failing due to the source
> task not acknowledging everything within timeout—however, most failures are
> still due to session window acknowledgement.
>
> All the data seems to points an issue with the source? Now, I don’t know
> how to explain this behavior since unaligned checkpoints should overtake
> records in the buffers (once seen at the input buffer, forward immediately
> downstream to output buffer).
>
> Just to confirm, this is our checkpoint configuration:
> ```
> Option
> Value
> Checkpointing Mode Exactly Once
> Checkpoint Storage FileSystemCheckpointStorage
> State Backend EmbeddedRocksDBStateBackend
> Interval 5m 0s
> Timeout 30m 0s
> Minimum Pause Between Checkpoints 2m 0s
> Maximum Concurrent Checkpoints 1
> Unaligned Checkpoints Enabled
> Persist Checkpoints Externally Enabled (retain on cancellation)
> Tolerable Failed Checkpoints 10
> ```
>
> Are there other metrics should I look at—why else should tasks fail
> acknowledgement in unaligned mode? Is it something about the implementation
> details of window function that I am not considering? My main hunch is
> something to do with the source.
>
> Best,
> Mason
>
> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> Hi Mason,
>
> In Flink 1.14 we have also changed the timeout behavior from checking
> against the alignment duration, to simply checking how old is the
> checkpoint barrier (so it would also account for the start delay) [1]. It
> was done in order to solve problems as you are describing. Unfortunately we
> can not backport this change to 1.13.x as it's a breaking change.
>
> Anyway, from our experience I would recommend going all in with the
> unaligned checkpoints, so setting the timeout back to the default value of
> 0ms. With timeouts you are gaining very little (a tiny bit smaller state
> size if there is no backpressure - tiny bit because without backpressure,
> even with timeout set to 0ms, the amount of captured inflight data is
> basically insignificant), while in practise you slow down the checkpoint
> barriers propagation time by quite a lot.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-23041
>
> wt., 14 gru 2021 o 22:04 Mason Chen <mas.chen6...@gmail.com> napisał(a):
>
>> Hi all,
>>
>> I'm using Flink 1.13 and my job is experiencing high start delay, more so
>> than high alignment time. (our flip 27 kafka source is heavily
>> backpressured). Since our alignment timeout is set to 1s, the unaligned
>> checkpoint never triggers since alignment delay is always below the
>> threshold.
>>
>> It's seems there is only a configuration for alignment timeout but should
>> there also be one for start delay timeout:
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>>
>> I'm interested to know the reasoning why there isn't a timeout for start
>> delay as well--was it because it was deemed too complex for the user to
>> configure two parameters for unaligned checkpoints?
>>
>> I'm aware of buffer debloating in 1.14 that could help but I'm trying to
>> see how far unaligned checkpointing can take me.
>>
>> Best,
>> Mason
>>
>
>

Reply via email to