Hi Mason,

Since you are using RocksDB, you could enable this metric [1] 
state-backend-rocksdb-metrics-estimate-num-keys which gives (afaik) good 
indication of the number of active windows.
I’ve never seen (despite the warning) negative effect on the runtime.

Hope this help …

Thias




[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-metrics-estimate-num-keys

From: Mason Chen <mason.c...@apple.com>
Sent: Dienstag, 11. Januar 2022 19:20
To: Piotr Nowojski <pnowoj...@apache.org>
Cc: Mason Chen <mas.chen6...@gmail.com>; user <user@flink.apache.org>
Subject: Re: unaligned checkpoint for job with large start delay

Hi Piotrek,

No worries—I hope you had a good break.

Counting how many windows have been registered/fired and plotting that over 
time.
It’s straightforward to count windows that are fired (the trigger exposes the 
run time context and we can collect the information in that code path). 
However, it’s not so clear how to count the windows that have been registered 
since the window assigner does not expose the run time context—is this even the 
right place to count? It’s not necessarily the case that an assignment results 
in a new window registered. Am I missing anything else relevant from the user 
facing interface perspective?

 Unfortunately at the moment I don't know how to implement such a metric 
without affecting performance on the critical path, so I don't see this 
happening soon :(
Perhaps, it can be an opt in feature? I do it see it being really useful since 
most users aren’t really familiar with windows and these metrics can help 
easily identify the common problem of too many windows firing.

The additional metrics certainly help in diagnosing some of the symptoms of the 
root problem.

Best,
Mason


On Jan 10, 2022, at 1:00 AM, Piotr Nowojski 
<pnowoj...@apache.org<mailto:pnowoj...@apache.org>> wrote:

Hi Mason,

Sorry for a late reply, but I was OoO.

I think you could confirm it with more custom metrics. Counting how many 
windows have been registered/fired and plotting that over time.

I think it would be more helpful in this case to check how long a task has been 
blocked being "busy" processing for example timers. FLINK-25414 shows only 
blocked on being hard/soft backpressure. Unfortunately at the moment I don't 
know how to implement such a metric without affecting performance on the 
critical path, so I don't see this happening soon :(

Best,
Piotrek

wt., 4 sty 2022 o 18:02 Mason Chen 
<mason.c...@apple.com<mailto:mason.c...@apple.com>> napisał(a):
Hi Piotrek,


In other words, something (presumably a watermark) has fired more than 151 200 
windows at once, which is taking ~1h 10minutes to process and during this time 
the checkpoint can not make any progress. Is this number of triggered windows 
plausible in your scenario?

It seems plausible—there are potentially many keys (and many windows). Is there 
a way to confirm with metrics? We can add a window fire counter to the window 
operator that only gets incremented at the end of windows evaluation, in order 
to see the huge jumps in window fires. I can this benefiting other users who 
troubleshoot the problem of large number of window firing.

Best,
Mason


On Dec 29, 2021, at 2:56 AM, Piotr Nowojski 
<pnowoj...@apache.org<mailto:pnowoj...@apache.org>> wrote:

Hi Mason,

> and it has to finish processing this output before checkpoint can begin—is 
> this right?

Yes. Checkpoint will be only executed once all triggered windows will be fully 
processed.

But from what you have posted it looks like all of that delay is coming from 
hundreds of thousands of windows firing all at the same time. Between 20:30 and 
~21:40 there must have been a bit more than 36 triggers/s * 60s/min * 70min = 
151 200triggers fired at once (or in a very short interval). In other words, 
something (presumably a watermark) has fired more than 151 200 windows at once, 
which is taking ~1h 10minutes to process and during this time the checkpoint 
can not make any progress. Is this number of triggered windows plausible in 
your scenario?

Best,
Piotrek


czw., 23 gru 2021 o 12:12 Mason Chen 
<mason.c...@apple.com<mailto:mason.c...@apple.com>> napisał(a):
Hi Piotr,

Thanks for the thorough response and the PR—will review later.

Clarifications:
1. The flat map you refer to produces at most 1 record.
2. The session window operator’s window process function emits at least 1 
record.
3. The 25 ms sleep is at the beginning of the window process function.

Your explanation about how records being bigger than the buffer size can cause 
blockage makes sense to me. However, my average record size is around 770 bytes 
coming out of the source and 960 bytes coming out of the window. Also, we don’t 
override the default `taskmanager.memory.segment-size`. My Flink job memory 
config is as follows:

```
taskmanager.memory.jvm-metaspace.size: 512 mb
taskmanager.memory.jvm-overhead.max: 2Gb
taskmanager.memory.jvm-overhead.min: 512Mb
taskmanager.memory.managed.fraction: '0.4'
taskmanager.memory.network.fraction: '0.2'
taskmanager.memory.network.max: 2Gb
taskmanager.memory.network.min: 200Mb
taskmanager.memory.process.size: 16Gb
taskmanager.numberOfTaskSlots: '4'
```

 Are you sure your job is making any progress? Are records being processed? 
Hasn't your job simply deadlocked on something?

To distinguish task blockage vs graceful backpressure, I have checked the 
operator throughput metrics and have confirmed that during window task buffer 
blockage, the window operator DOES emit records. Tasks look like they aren’t 
doing anything but the window is emitting records.

<throughput_metrics.png>


Furthermore, I created a custom trigger to wrap a metric counter for FIRED 
counts to get a estimation of how many windows are fired at the same time. I 
ran a separate job with the same configs—the results look as follows:
<trigger_metrics.png>

On average, when the buffers are blocked, there are 36 FIREs per second. Since 
each of these fires invokes the window process function, 25 ms * 36 = 900 ms 
means we sleep almost a second cumulatively, per second—which is pretty severe. 
Combined with the fact that the window process function can emit many records, 
the task takes even longer to checkpoint since the flatmap/kafka sink is 
chained with the window operator—and it has to finish processing this output 
before checkpoint can begin—is this right? In addition, when the window fires 
per second reduces, checkpoint is able to continue and succeed.

So, I think that the surge of window firing combined with the sleep is the 
source of the issue, which makes sense. I’m not sure how to confirm whether or 
not the points about buffer sizes being insufficient for the window output is 
also interplaying with this issue.

Best,
Mason



On Dec 22, 2021, at 6:17 AM, Piotr Nowojski 
<pnowoj...@apache.org<mailto:pnowoj...@apache.org>> wrote:

Hi Mason,

One more question. Are you sure your job is making any progress? Are records 
being processed? Hasn't your job simply deadlocked on something?

Best,
Piotrek

śr., 22 gru 2021 o 10:02 Piotr Nowojski 
<pnowoj...@apache.org<mailto:pnowoj...@apache.org>> napisał(a):
Hi,

Thanks for getting back to us. This is indeed weird.

>> One of the unaligned checkpoints limitations is that Flink can not snapshot 
>> a state of an operator in the middle of processing a record.
>
>This is true for aligned checkpoints too, right?

In a sense. For aligned checkpoints there is a stronger limitation, that the 
task has to process all of the buffered records on the input before it's able 
to do an aligned checkpoint. For unaligned checkpoints the task has to finish 
fully processing only the currently processed record.

> 1. Why is there high start delay at the source? Isn’t this what FLIP 27 
> sources are designed to overcome since the need to acquire the checkpoint 
> lock is irrelevant? Is it a bug?

Kind of. You do not have to acquire checkpoint lock, as FLIP-27 sources are 
working in the task thread. But the task thread can not process records and do 
a checkpoint at the same time. FLIP-27 source will not pick up a next record 
from the input if there is a backpressure (that allows checkpoint to be 
triggered while task is back pressured), but this back pressure detection 
mechanism (or rather mechanism that prevents blocking waits of the task thread 
when there is a back pressure) is not perfect. A couple of the largests 
limitations are:
a) If your single record doesn't fit in a single network buffer, for example 
network buffer default size is 32KB and your record size can reach 33KB, back 
pressure detection will allow to process next record since there will be some 
buffer available, but the produced record won't fit into this single buffer and 
will have to blockingly wait for another buffer to be recycled (increasing 
start delay and/or alignment time).
b) If you have a flat map style operator/function in the chain, that multiplies 
the number of records you can hit exactly the same problem. For example, the 
network buffer is 32KB, record size is 330B, but you have a flat map that 
suddenly produces 100 records (each 330B). 330B * 100 = 33KB so again you might 
end up with the task being blocked as a single buffer wouldn't be enough to 
serialize all of those 100 records.
c) The same as b), but caused by a timer/watermark triggering WindowOperator to 
produce lots of records.

> 2. When the source operator finished for checkpoint 337, why is start delay 
> high for the window? Barriers should have been forwarded downstream quite 
> quickly unless the window operator is blocking for a few hours...

All of those points apply actually to every task, not only FLIP-27 source task 
and maybe they could explain why the window/flat map task has been blocked for 
~2.5h.

Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your record 
size is sometimes exceeding network buffer size, this can cause the source task 
to be blocked for those 6 hours. Source task will be simply stuck waiting for a 
buffer to be recycled, and this will only happen once a downstream task will 
process one more buffer.

> 3. If the window is the bottleneck, what are the various ways to confirm 
> this? We have metrics to measure the process function but we don’t know how 
> many windows are getting fired at the same time to give the overall latency 
> for the operator. Are there metrics or logs to see how many windows are 
> getting fired or how long the window operator is blocking the window input 
> buffers from processing?

In the webUI the task nodes are colored depending on the busy/backpressured 
time. You can clearly see that the source is fully backpressured all the time, 
while the window is constantly busy. I presume your function that is inducing 
25ms per record sleep time is chained with the window. That confirms for me 
that the window task is the bottleneck. However unfortunately there is no easy 
way to tell how severe this back pressure and for how long those tasks are 
blocked. In other words, a task that is busy processing records for 1ms every 
1ns and a Task that is blocked busy processing a single record for 6h will both 
have the same 100% Busy metric. Same goes for blocked on the back pressure 
(both task back pressured for 1ms every 1ns and task back pressured 1h every 
1ns will have 100% back pressure metric). Moreover there is currently no way to 
distinguish if a task is back pressured in a graceful way, without blocking the 
task thread, or if it is indeed blocking the task thread (due to a), b) or c)). 
I have created a ticket to add some metrics to help with that [1], but it won't 
help you right now.

I. You could do some estimations on paper if anything that I have written above 
can theoretically happen. You should know the size of the windows/record 
sizes/what your Flat Map functions are doing (it seems like you have two of 
those chained after the WindowOperator?). From the looks of it, 25ms sleep per 
record, WindowOperator + Flat Map, huge state of window operators might suggest 
that it's possible.
II.  As those tasks are blocked for hours, triggering a checkpoint and 
collecting some stack traces can help you understand what the tasks are 
actually doing. But for that you would need to understand how to differentiate 
a blocked task, so...
III. ... maybe actually the most efficient way for us to help you would be if 
you could minimize/simplify your job, replace Kafka source with an artificial 
source that would be generating records, but in such a way that would still 
reproduce this behavior and share your code with us?

Best, Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-25414


wt., 21 gru 2021 o 20:10 Mason Chen 
<mason.c...@apple.com<mailto:mason.c...@apple.com>> napisał(a):
Hi Piotr,

These observations correspond to the 0ms alignment timeout setting.

The checkpoints are timeouting because the checkpoint acknowledgement is timing 
out. Now, we increased the timeout to 3 hours in our checkpoints and we still 
face errors due to checkpoint acknowledgement—the rest of the checkpoint config 
is still the same.

This is our job graph:
<job_graph.png>
To give more details about the window, we use the default event time trigger 
with a gap of 300 seconds and 180 allowed lateness. The window only implements 
the process function in which it emits 1 element.

Here are the screenshots of the failed checkpoints. Failures typically come in 
groups like this. On average, checkpoints complete in 2m 49s.

<failed_checkpoint_summary.png>

To show a few of the failed checkpoints in more detail:

For checkpoint 337, the source finishes checkpoint within a normal latency and 
the window checkpoint times out due to high start delay.
<checkpoint_337.png>

For checkpoint 338, we see very high start delay at the source and blocks the 
window operator from completing its checkpoint. I sorted by end to end duration 
for the subtasks to give an idea of the worst start delay. Start delay even 
show values beyond our checkpoint timeout (e.g. 4, 5, 6 hours).
<checkpoint_338.png>



One of the unaligned checkpoints limitations is that Flink can not snapshot a 
state of an operator in the middle of processing a record.
This is true for aligned checkpoints too, right?

So my questions are:

1. Why is there high start delay at the source? Isn’t this what FLIP 27 sources 
are designed to overcome since the need to acquire the checkpoint lock is 
irrelevant? Is it a bug?
2. When the source operator finished for checkpoint 337, why is start delay 
high for the window? Barriers should have been forwarded downstream quite 
quickly unless the window operator is blocking for a few hours...
3. If the window is the bottleneck, what are the various ways to confirm this? 
We have metrics to measure the process function but we don’t know how many 
windows are getting fired at the same time to give the overall latency for the 
operator. Are there metrics or logs to see how many windows are getting fired 
or how long the window operator is blocking the window input buffers from 
processing?

Thanks,
Mason



On Dec 20, 2021, at 3:01 AM, Piotr Nowojski 
<pnowoj...@apache.org<mailto:pnowoj...@apache.org>> wrote:

Hi Mason,

Those checkpoint timeouts (30 minutes) have you already observed with the 
alignment timeout set to 0ms? Or as you were previously running it with 1s 
alignment timeout?

If the latter, it might be because unaligned checkpoints are failing to kick in 
in the first place. Setting the timeout to 0ms should solve the problem.

If the former, have you checked why the checkpoints are timeouting? What part 
of the checkpointing process is taking a long time? For example can you post a 
screenshot from the WebUI of checkpoint stats for each task? The only 
explanation I could think of is this sleep time that you added. 25ms per record 
is really a lot. I mean really a lot. 30 minutes / 25 ms/record = 72 000 
records. One of the unaligned checkpoints limitations is that Flink can not 
snapshot a state of an operator in the middle of processing a record. In your 
particular case, Flink will not be able to snapshot the state of the session 
window operator in the middle of the windows being fired. If your window 
operator is firing a lot of windows at the same time, or a single window is 
producing 72k of records (which would be an unusual but not unimaginable 
amount), this could block checkpointing of the window operator for 30 minutes 
due to this 25ms sleep down the stream.

Piotrek

pt., 17 gru 2021 o 19:19 Mason Chen 
<mason.c...@apple.com<mailto:mason.c...@apple.com>> napisał(a):
Hi Piotr,

Thanks for the link to the JIRA ticket, we actually don’t see much state size 
overhead between checkpoints in aligned vs unaligned, so we will go with your 
recommendation of using unaligned checkpoints with 0s alignment timeout.

For context, we are testing unaligned checkpoints with our application with 
these tasks: [kafka source, map, filter] -> keyby -> [session window] -> 
[various kafka sinks]. The first task has parallelism 40 and the rest of the 
tasks have parallelism 240. This is the FLIP 27 Kafka source.

We added an artificial sleep (25 ms per invocation of in process function) the 
session window task to simulate backpressure; however, we still see checkpoints 
failing due to task acknowledgement doesn’t complete within our checkpoint 
timeout (30 minutes).

I am able to correlate that the input buffers from window and output buffers 
from source being 100% usage corresponds to the checkpoint failures. When they 
are not full (input can drop to as low as 60% usage and output can drop to as 
low as 55% usage), the checkpoints succeed within less than 2 ms. In all cases, 
it is the session window task or source task failing to 100% acknowledge the 
barriers within timeout. I do see the source task acknowledgement taking long 
in some of the failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 
hours) and source is idle and not busy at this time.

All other input buffers are low usage (mostly 0). For output buffer, the usage 
is around 50% for window--everything else is near 0% all the time except the 
source mentioned before (makes sense since rest are just sinks).

We are also running a parallel Flink job with the same configurations, except 
with unaligned checkpoints disabled. Here we see observe the same behavior 
except now some of the checkpoints are failing due to the source task not 
acknowledging everything within timeout—however, most failures are still due to 
session window acknowledgement.

All the data seems to points an issue with the source? Now, I don’t know how to 
explain this behavior since unaligned checkpoints should overtake records in 
the buffers (once seen at the input buffer, forward immediately downstream to 
output buffer).

Just to confirm, this is our checkpoint configuration:
```
Option

Value

Checkpointing Mode

Exactly Once

Checkpoint Storage

FileSystemCheckpointStorage

State Backend

EmbeddedRocksDBStateBackend

Interval

5m 0s

Timeout

30m 0s

Minimum Pause Between Checkpoints

2m 0s

Maximum Concurrent Checkpoints

1

Unaligned Checkpoints

Enabled

Persist Checkpoints Externally

Enabled (retain on cancellation)

Tolerable Failed Checkpoints

10

```

Are there other metrics should I look at—why else should tasks fail 
acknowledgement in unaligned mode? Is it something about the implementation 
details of window function that I am not considering? My main hunch is 
something to do with the source.

Best,
Mason


On Dec 16, 2021, at 12:25 AM, Piotr Nowojski 
<pnowoj...@apache.org<mailto:pnowoj...@apache.org>> wrote:

Hi Mason,

In Flink 1.14 we have also changed the timeout behavior from checking against 
the alignment duration, to simply checking how old is the checkpoint barrier 
(so it would also account for the start delay) [1]. It was done in order to 
solve problems as you are describing. Unfortunately we can not backport this 
change to 1.13.x as it's a breaking change.

Anyway, from our experience I would recommend going all in with the unaligned 
checkpoints, so setting the timeout back to the default value of 0ms. With 
timeouts you are gaining very little (a tiny bit smaller state size if there is 
no backpressure - tiny bit because without backpressure, even with timeout set 
to 0ms, the amount of captured inflight data is basically insignificant), while 
in practise you slow down the checkpoint barriers propagation time by quite a 
lot.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-23041

wt., 14 gru 2021 o 22:04 Mason Chen 
<mas.chen6...@gmail.com<mailto:mas.chen6...@gmail.com>> napisał(a):
Hi all,

I'm using Flink 1.13 and my job is experiencing high start delay, more so than 
high alignment time. (our flip 27 kafka source is heavily backpressured). Since 
our alignment timeout is set to 1s, the unaligned checkpoint never triggers 
since alignment delay is always below the threshold.

It's seems there is only a configuration for alignment timeout but should there 
also be one for start delay timeout: 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout

I'm interested to know the reasoning why there isn't a timeout for start delay 
as well--was it because it was deemed too complex for the user to configure two 
parameters for unaligned checkpoints?

I'm aware of buffer debloating in 1.14 that could help but I'm trying to see 
how far unaligned checkpointing can take me.

Best,
Mason





Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und beinhaltet 
unter Umständen vertrauliche Mitteilungen. Da die Vertraulichkeit von 
e-Mail-Nachrichten nicht gewährleistet werden kann, übernehmen wir keine 
Haftung für die Gewährung der Vertraulichkeit und Unversehrtheit dieser 
Mitteilung. Bei irrtümlicher Zustellung bitten wir Sie um Benachrichtigung per 
e-Mail und um Löschung dieser Nachricht sowie eventueller Anhänge. Jegliche 
unberechtigte Verwendung oder Verbreitung dieser Informationen ist streng 
verboten.

This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.

Reply via email to