Hi Piotr,

Thanks for the link to the JIRA ticket. We actually don’t see much difference in checkpoint state size between aligned and unaligned checkpoints, so we will go with your recommendation of using unaligned checkpoints with a 0s alignment timeout.
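For a rough sense of how much in-flight data unaligned checkpoints could add to this job's checkpoints, the Java sketch below computes an input-side upper bound for the keyBy edge (40 source subtasks feeding 240 window subtasks). It assumes Flink's default network buffer settings (32 KiB memory segments, 2 exclusive buffers per channel, 8 floating buffers per gate) instead of reading the job's real configuration. The class and method names are hypothetical, and output buffers and the sink edges are ignored.

```java
// Rough, input-side-only upper bound on the in-flight data an unaligned
// checkpoint could persist for the keyBy edge (source -> session window).
// ASSUMPTION: Flink's default network buffer settings are in effect; these
// constants are NOT read from the job's actual configuration.
final class InFlightEstimate {
    static final long SEGMENT_SIZE_BYTES = 32 * 1024;   // assumed taskmanager.memory.segment-size (32kb)
    static final int BUFFERS_PER_CHANNEL = 2;           // assumed taskmanager.network.memory.buffers-per-channel
    static final int FLOATING_BUFFERS_PER_GATE = 8;     // assumed taskmanager.network.memory.floating-buffers-per-gate

    // keyBy is all-to-all, so each downstream subtask has one input channel
    // per upstream subtask, plus a shared pool of floating buffers.
    static long buffersPerSubtask(int upstreamParallelism) {
        return (long) upstreamParallelism * BUFFERS_PER_CHANNEL + FLOATING_BUFFERS_PER_GATE;
    }

    // Bytes held if every input buffer of every downstream subtask were full,
    // i.e. the worst case under sustained backpressure.
    static long maxInputBytes(int upstreamParallelism, int downstreamParallelism) {
        return buffersPerSubtask(upstreamParallelism) * SEGMENT_SIZE_BYTES * downstreamParallelism;
    }

    public static void main(String[] args) {
        long bytes = maxInputBytes(40, 240);
        System.out.println("buffers per window subtask: " + buffersPerSubtask(40));
        System.out.printf("upper bound: %d bytes (%.0f MiB)%n", bytes, bytes / (1024.0 * 1024.0));
    }
}
```

At the assumed defaults this prints 88 buffers per window subtask and an upper bound of 692060160 bytes (660 MiB). That figure is only reached when every input buffer is full at checkpoint time; without backpressure most buffers are empty, which is consistent with Piotr's remark that the captured in-flight data is then basically insignificant.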
For context, we are testing unaligned checkpoints in our application, which has these tasks: [kafka source, map, filter] -> keyby -> [session window] -> [various kafka sinks]. The first task has parallelism 40 and the rest have parallelism 240. The source is the FLIP-27 Kafka source. We added an artificial sleep (25 ms per invocation of the process function) to the session window task to simulate backpressure; however, we still see checkpoints failing because task acknowledgements do not complete within our checkpoint timeout (30 minutes).

I can correlate the checkpoint failures with the window task's input buffers and the source task's output buffers being at 100% usage. When they are not full (input usage can drop to as low as 60% and output usage to as low as 55%), the checkpoints succeed in less than 2 ms. In all cases, it is the session window task or the source task that fails to acknowledge all barriers within the timeout. In some of the failures the source task's acknowledgement takes a long time (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours), even though the source is idle and not busy at that time. All other input buffers have low usage (mostly 0%). For output buffers, usage is around 50% for the window task and near 0% all the time for everything else, except for the source mentioned above (which makes sense, since the rest are just sinks).

We are also running a parallel Flink job with the same configuration, except with unaligned checkpoints disabled. There we observe the same behavior, except that some of the checkpoints now fail because the source task does not acknowledge everything within the timeout; most failures, however, are still due to the session window's acknowledgement. Does all of this point to an issue with the source? I don't know how to explain this behavior, since unaligned checkpoint barriers should overtake records in the buffers (once a barrier is seen at the input buffer, it is forwarded immediately downstream to the output buffer).
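The overtaking behavior described above can be pictured with a toy Java model of a single input channel. It is not Flink code, and the class and method names are hypothetical. It splits a barrier's latency at one buffer into two parts: the time it takes to reach the buffer, and the time it would spend behind queued data. Only the second part is removed by overtaking. For simplicity it assumes each queued element costs one 25 ms process-function invocation, and the backlog size and arrival delay are made-up numbers.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of a single input channel; NOT Flink code. It only shows which
// part of a barrier's latency unaligned checkpoints can remove.
final class BarrierOvertakeModel {
    // Aligned: the barrier is processed only after every element queued ahead
    // of it, each assumed to cost perElementMs (e.g. the artificial 25 ms sleep).
    static long alignedLatencyMs(long arrivalDelayMs, int elementsAhead, long perElementMs) {
        return arrivalDelayMs + elementsAhead * perElementMs;
    }

    // Unaligned: the barrier overtakes the queued elements, so only the time it
    // took to reach this buffer remains. Overtaking cannot shorten that part.
    static long unalignedLatencyMs(long arrivalDelayMs) {
        return arrivalDelayMs;
    }

    // The overtaken elements are copied into the checkpoint as channel state;
    // they stay queued and are still processed normally afterwards.
    static List<String> overtakenElements(Deque<String> channel) {
        return new ArrayList<>(channel);
    }

    public static void main(String[] args) {
        Deque<String> channel = new ArrayDeque<>();
        for (int i = 0; i < 1000; i++) {
            channel.addLast("element-" + i);     // hypothetical backlog
        }
        long perElementMs = 25;                  // the artificial sleep per invocation
        long arrivalDelayMs = 20 * 60 * 1000;    // hypothetical: barrier reaches the buffer 20 min late

        System.out.println("aligned:   " + alignedLatencyMs(arrivalDelayMs, channel.size(), perElementMs) + " ms");
        System.out.println("unaligned: " + unalignedLatencyMs(arrivalDelayMs) + " ms, "
                + overtakenElements(channel).size() + " elements persisted");
    }
}
```

With these hypothetical numbers the model prints 1225000 ms for the aligned case and 1200000 ms for the unaligned case with 1000 elements persisted: overtaking removes the 25 s spent behind queued data, but not the 20 minutes before the barrier reached the buffer.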
Just to confirm, this is our checkpoint configuration:

```
Option                              Value
Checkpointing Mode                  Exactly Once
Checkpoint Storage                  FileSystemCheckpointStorage
State Backend                       EmbeddedRocksDBStateBackend
Interval                            5m 0s
Timeout                             30m 0s
Minimum Pause Between Checkpoints   2m 0s
Maximum Concurrent Checkpoints      1
Unaligned Checkpoints               Enabled
Persist Checkpoints Externally      Enabled (retain on cancellation)
Tolerable Failed Checkpoints        10
```

Are there other metrics I should look at? Why else would tasks fail to acknowledge in unaligned mode? Is it something about the implementation details of the window function that I am not considering? My main hunch is that it has something to do with the source.

Best,
Mason

> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> Hi Mason,
>
> In Flink 1.14 we have also changed the timeout behavior from checking against
> the alignment duration, to simply checking how old is the checkpoint barrier
> (so it would also account for the start delay) [1]. It was done in order to
> solve problems as you are describing. Unfortunately we can not backport this
> change to 1.13.x as it's a breaking change.
>
> Anyway, from our experience I would recommend going all in with the unaligned
> checkpoints, so setting the timeout back to the default value of 0ms. With
> timeouts you are gaining very little (a tiny bit smaller state size if there
> is no backpressure - tiny bit because without backpressure, even with timeout
> set to 0ms, the amount of captured inflight data is basically insignificant),
> while in practise you slow down the checkpoint barriers propagation time by
> quite a lot.
>
> Best,
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-23041
>
> On Tue, 14 Dec 2021 at 22:04, Mason Chen <mas.chen6...@gmail.com> wrote:
>
> Hi all,
>
> I'm using Flink 1.13 and my job is experiencing high start delay, more so
> than high alignment time (our FLIP-27 Kafka source is heavily
> backpressured). Since our alignment timeout is set to 1s, the unaligned
> checkpoint never triggers since alignment delay is always below the threshold.
>
> It seems there is only a configuration for alignment timeout, but should
> there also be one for start delay timeout? See:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>
> I'm interested to know the reasoning why there isn't a timeout for start
> delay as well. Was it because it was deemed too complex for the user to
> configure two parameters for unaligned checkpoints?
>
> I'm aware of buffer debloating in 1.14 that could help, but I'm trying to see
> how far unaligned checkpointing can take me.
>
> Best,
> Mason
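Piotr's explanation of the timeout change can be captured in a simplified Java model. It is not Flink's implementation, and the class and method names are hypothetical. It shows why a 1s alignment timeout never fires under the 1.13 behavior described in the thread when the delay happens before the first barrier arrives, why a barrier-age check (the 1.14 behavior from FLINK-23041) would fire, and why a 0 ms timeout sidesteps the question. The start delay and alignment values are made up for illustration.

```java
// Simplified model of the alignment-timeout check as described in this thread;
// NOT Flink's implementation. All times are in milliseconds. ">=" is used so
// that a 0 ms timeout means "unaligned from the start".
final class AlignmentTimeoutModel {
    // Flink 1.13 as described: only the alignment duration is compared with the
    // timeout; startDelayMs is deliberately ignored.
    static boolean switchesToUnaligned113(long startDelayMs, long alignmentMs, long timeoutMs) {
        return alignmentMs >= timeoutMs;
    }

    // Flink 1.14 (FLINK-23041) as described: the barrier's age is compared,
    // which includes the start delay.
    static boolean switchesToUnaligned114(long startDelayMs, long alignmentMs, long timeoutMs) {
        return startDelayMs + alignmentMs >= timeoutMs;
    }

    public static void main(String[] args) {
        long startDelayMs = 10 * 60 * 1000; // hypothetical: 10 min start delay
        long alignmentMs = 800;             // hypothetical: alignment stays under 1s
        for (long timeoutMs : new long[] {1000, 0}) {
            System.out.printf("timeout=%dms 1.13=%b 1.14=%b%n", timeoutMs,
                    switchesToUnaligned113(startDelayMs, alignmentMs, timeoutMs),
                    switchesToUnaligned114(startDelayMs, alignmentMs, timeoutMs));
        }
    }
}
```

This prints `timeout=1000ms 1.13=false 1.14=true` followed by `timeout=0ms 1.13=true 1.14=true`: with a 0 ms timeout every checkpoint is unaligned regardless of version, which matches the recommendation to go all in on unaligned checkpoints.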