Hi Piotr,

Thanks for the link to the JIRA ticket, we actually don’t see much state size 
overhead between checkpoints in aligned vs unaligned, so we will go with your 
recommendation of using unaligned checkpoints with 0s alignment timeout.

For context, we are testing unaligned checkpoints with our application with 
these tasks: [kafka source, map, filter] -> keyby -> [session window] -> 
[various kafka sinks]. The first task has parallelism 40 and the rest of the 
tasks have parallelism 240. This is the FLIP 27 Kafka source.

We added an artificial sleep (25 ms per invocation of in process function) the 
session window task to simulate backpressure; however, we still see checkpoints 
failing due to task acknowledgement doesn’t complete within our checkpoint 
timeout (30 minutes).

I am able to correlate that the input buffers from window and output buffers 
from source being 100% usage corresponds to the checkpoint failures. When they 
are not full (input can drop to as low as 60% usage and output can drop to as 
low as 55% usage), the checkpoints succeed within less than 2 ms. In all cases, 
it is the session window task or source task failing to 100% acknowledge the 
barriers within timeout. I do see the source task acknowledgement taking long 
in some of the failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 
hours) and source is idle and not busy at this time.

All other input buffers are low usage (mostly 0). For output buffer, the usage 
is around 50% for window--everything else is near 0% all the time except the 
source mentioned before (makes sense since rest are just sinks).

We are also running a parallel Flink job with the same configurations, except 
with unaligned checkpoints disabled. Here we see observe the same behavior 
except now some of the checkpoints are failing due to the source task not 
acknowledging everything within timeout—however, most failures are still due to 
session window acknowledgement.

All the data seems to points an issue with the source? Now, I don’t know how to 
explain this behavior since unaligned checkpoints should overtake records in 
the buffers (once seen at the input buffer, forward immediately downstream to 
output buffer).

Just to confirm, this is our checkpoint configuration:
```
Option
Value
Checkpointing Mode      Exactly Once
Checkpoint Storage      FileSystemCheckpointStorage
State Backend   EmbeddedRocksDBStateBackend
Interval        5m 0s
Timeout 30m 0s
Minimum Pause Between Checkpoints       2m 0s
Maximum Concurrent Checkpoints  1
Unaligned Checkpoints   Enabled
Persist Checkpoints Externally  Enabled (retain on cancellation)
Tolerable Failed Checkpoints    10
```

Are there other metrics should I look at—why else should tasks fail 
acknowledgement in unaligned mode? Is it something about the implementation 
details of window function that I am not considering? My main hunch is 
something to do with the source.

Best,
Mason

> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
> 
> Hi Mason,
> 
> In Flink 1.14 we have also changed the timeout behavior from checking against 
> the alignment duration, to simply checking how old is the checkpoint barrier 
> (so it would also account for the start delay) [1]. It was done in order to 
> solve problems as you are describing. Unfortunately we can not backport this 
> change to 1.13.x as it's a breaking change.
> 
> Anyway, from our experience I would recommend going all in with the unaligned 
> checkpoints, so setting the timeout back to the default value of 0ms. With 
> timeouts you are gaining very little (a tiny bit smaller state size if there 
> is no backpressure - tiny bit because without backpressure, even with timeout 
> set to 0ms, the amount of captured inflight data is basically insignificant), 
> while in practise you slow down the checkpoint barriers propagation time by 
> quite a lot.
> 
> Best,
> Piotrek
> 
> [1] https://issues.apache.org/jira/browse/FLINK-23041 
> <https://issues.apache.org/jira/browse/FLINK-23041>
> wt., 14 gru 2021 o 22:04 Mason Chen <mas.chen6...@gmail.com 
> <mailto:mas.chen6...@gmail.com>> napisał(a):
> Hi all,
> 
> I'm using Flink 1.13 and my job is experiencing high start delay, more so 
> than high alignment time. (our flip 27 kafka source is heavily 
> backpressured). Since our alignment timeout is set to 1s, the unaligned 
> checkpoint never triggers since alignment delay is always below the threshold.
> 
> It's seems there is only a configuration for alignment timeout but should 
> there also be one for start delay timeout: 
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>  
> <https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout>
> 
> I'm interested to know the reasoning why there isn't a timeout for start 
> delay as well--was it because it was deemed too complex for the user to 
> configure two parameters for unaligned checkpoints?
> 
> I'm aware of buffer debloating in 1.14 that could help but I'm trying to see 
> how far unaligned checkpointing can take me.
> 
> Best,
> Mason

Reply via email to