Hi Mason,

Sorry for a late reply, but I was OoO.

I think you could confirm it with more custom metrics. Counting how many
windows have been registered/fired and plotting that over time.

I think it would be more helpful in this case to check how long a task has
been blocked being "busy" processing for example timers. FLINK-25414 shows
only blocked on being hard/soft backpressure. Unfortunately at the moment I
don't know how to implement such a metric without affecting performance on
the critical path, so I don't see this happening soon :(

Best,
Piotrek

wt., 4 sty 2022 o 18:02 Mason Chen <mason.c...@apple.com> napisał(a):

> Hi Piotrek,
>
> In other words, something (presumably a watermark) has fired more than 151
> 200 windows at once, which is taking ~1h 10minutes to process and during
> this time the checkpoint can not make any progress. Is this number of
> triggered windows plausible in your scenario?
>
>
> It seems plausible—there are potentially many keys (and many windows). Is
> there a way to confirm with metrics? We can add a window fire counter to
> the window operator that only gets incremented at the end of windows
> evaluation, in order to see the huge jumps in window fires. I can this
> benefiting other users who troubleshoot the problem of large number of
> window firing.
>
> Best,
> Mason
>
> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> Hi Mason,
>
> > and it has to finish processing this output before checkpoint can
> begin—is this right?
>
> Yes. Checkpoint will be only executed once all triggered windows will be
> fully processed.
>
> But from what you have posted it looks like all of that delay is
> coming from hundreds of thousands of windows firing all at the same time.
> Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s
> * 60s/min * 70min = 151 200triggers fired at once (or in a very short
> interval). In other words, something (presumably a watermark) has fired
> more than 151 200 windows at once, which is taking ~1h 10minutes to process
> and during this time the checkpoint can not make any progress. Is this
> number of triggered windows plausible in your scenario?
>
> Best,
> Piotrek
>
>
> czw., 23 gru 2021 o 12:12 Mason Chen <mason.c...@apple.com> napisał(a):
>
>> Hi Piotr,
>>
>> Thanks for the thorough response and the PR—will review later.
>>
>> Clarifications:
>> 1. The flat map you refer to produces at most 1 record.
>> 2. The session window operator’s *window process function* emits at
>> least 1 record.
>> 3. The 25 ms sleep is at the beginning of the window process function.
>>
>> Your explanation about how records being bigger than the buffer size can
>> cause blockage makes sense to me. However, my average record size is around 
>> 770
>> bytes coming out of the source and 960 bytes coming out of the window.
>> Also, we don’t override the default `taskmanager.memory.segment-size`. My
>> Flink job memory config is as follows:
>>
>> ```
>> taskmanager.memory.jvm-metaspace.size: 512 mb
>> taskmanager.memory.jvm-overhead.max: 2Gb
>> taskmanager.memory.jvm-overhead.min: 512Mb
>> taskmanager.memory.managed.fraction: '0.4'
>> taskmanager.memory.network.fraction: '0.2'
>> taskmanager.memory.network.max: 2Gb
>> taskmanager.memory.network.min: 200Mb
>> taskmanager.memory.process.size: 16Gb
>> taskmanager.numberOfTaskSlots: '4'
>> ```
>>
>>  Are you sure your job is making any progress? Are records being
>> processed? Hasn't your job simply deadlocked on something?
>>
>>
>> To distinguish task blockage vs graceful backpressure, I have checked the
>> operator throughput metrics and have confirmed that during window *task*
>> buffer blockage, the window *operator* DOES emit records. Tasks look
>> like they aren’t doing anything but the window is emitting records.
>>
>> <throughput_metrics.png>
>>
>>
>> Furthermore, I created a custom trigger to wrap a metric counter for
>> FIRED counts to get a estimation of how many windows are fired at the same
>> time. I ran a separate job with the same configs—the results look as
>> follows:
>> <trigger_metrics.png>
>>
>> On average, when the buffers are blocked, there are 36 FIREs per second.
>> Since each of these fires invokes the window process function, 25 ms * 36 =
>> 900 ms means we sleep almost a second cumulatively, per second—which is
>> pretty severe. Combined with the fact that the window process function can
>> emit many records, the task takes even longer to checkpoint since the
>> flatmap/kafka sink is chained with the window operator—and it has to finish
>> processing this output before checkpoint can begin—*is this right?* In
>> addition, when the window fires per second reduces, checkpoint is able to
>> continue and succeed.
>>
>> So, I think that the surge of window firing combined with the sleep is
>> the source of the issue, which makes sense. I’m not sure how to confirm
>> whether or not the points about buffer sizes being insufficient for the
>> window output is also interplaying with this issue.
>>
>> Best,
>> Mason
>>
>>
>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>> Hi Mason,
>>
>> One more question. Are you sure your job is making any progress? Are
>> records being processed? Hasn't your job simply deadlocked on something?
>>
>> Best,
>> Piotrek
>>
>> śr., 22 gru 2021 o 10:02 Piotr Nowojski <pnowoj...@apache.org>
>> napisał(a):
>>
>>> Hi,
>>>
>>> Thanks for getting back to us. This is indeed weird.
>>>
>>> >> One of the unaligned checkpoints limitations is that Flink can not
>>> snapshot a state of an operator in the middle of processing a record.
>>> >
>>> >This is true for aligned checkpoints too, right?
>>>
>>> In a sense. For aligned checkpoints there is a stronger limitation, that
>>> the task has to process all of the buffered records on the input before
>>> it's able to do an aligned checkpoint. For unaligned checkpoints the task
>>> has to finish fully processing only the currently processed record.
>>>
>>> > 1. Why is there high start delay at the source? Isn’t this what FLIP
>>> 27 sources are designed to overcome since the need to acquire the
>>> checkpoint lock is irrelevant? Is it a bug?
>>>
>>> Kind of. You do not have to acquire checkpoint lock, as FLIP-27 sources
>>> are working in the task thread. But the task thread can not process records
>>> and do a checkpoint at the same time. FLIP-27 source will not pick up a
>>> next record from the input if there is a backpressure (that allows
>>> checkpoint to be triggered while task is back pressured), but this back
>>> pressure detection mechanism (or rather mechanism that prevents blocking
>>> waits of the task thread when there is a back pressure) is not perfect. A
>>> couple of the largests limitations are:
>>> a) If your single record doesn't fit in a single network buffer, for
>>> example network buffer default size is 32KB and your record size can reach
>>> 33KB, back pressure detection will allow to process next record since there
>>> will be some buffer available, but the produced record won't fit into this
>>> single buffer and will have to blockingly wait for another buffer to be
>>> recycled (increasing start delay and/or alignment time).
>>> b) If you have a flat map style operator/function in the chain, that
>>> multiplies the number of records you can hit exactly the same problem. For
>>> example, the network buffer is 32KB, record size is 330B, but you have a
>>> flat map that suddenly produces 100 records (each 330B). 330B * 100 = 33KB
>>> so again you might end up with the task being blocked as a single buffer
>>> wouldn't be enough to serialize all of those 100 records.
>>> c) The same as b), but caused by a timer/watermark triggering
>>> WindowOperator to produce lots of records.
>>>
>>> > 2. When the source operator finished for checkpoint 337, why is start
>>> delay high for the window? Barriers should have been forwarded downstream
>>> quite quickly unless the window operator is blocking for a few hours...
>>>
>>> All of those points apply actually to every task, not only FLIP-27
>>> source task and maybe they could explain why the window/flat map task has
>>> been blocked for ~2.5h.
>>>
>>> Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your
>>> record size is sometimes exceeding network buffer size, this can cause the
>>> source task to be blocked for those 6 hours. Source task will be simply
>>> stuck waiting for a buffer to be recycled, and this will only happen once a
>>> downstream task will process one more buffer.
>>>
>>> > 3. If the window is the bottleneck, what are the various ways to
>>> confirm this? We have metrics to measure the process function but we don’t
>>> know how many windows are getting fired at the same time to give the
>>> overall latency for the operator. Are there metrics or logs to see how many
>>> windows are getting fired or how long the window operator is blocking the
>>> window input buffers from processing?
>>>
>>> In the webUI the task nodes are colored depending on the
>>> busy/backpressured time. You can clearly see that the source is fully
>>> backpressured all the time, while the window is constantly busy. I presume
>>> your function that is inducing 25ms per record sleep time is chained with
>>> the window. That confirms for me that the window task is the bottleneck.
>>> However unfortunately there is no easy way to tell how severe this back
>>> pressure and for how long those tasks are blocked. In other words, a task
>>> that is busy processing records for 1ms every 1ns and a Task that is
>>> blocked busy processing a single record for 6h will both have the same 100%
>>> Busy metric. Same goes for blocked on the back pressure (both task back
>>> pressured for 1ms every 1ns and task back pressured 1h every 1ns will have
>>> 100% back pressure metric). Moreover there is currently no way to
>>> distinguish if a task is back pressured in a graceful way, without blocking
>>> the task thread, or if it is indeed blocking the task thread (due to a), b)
>>> or c)). I have created a ticket to add some metrics to help with that [1],
>>> but it won't help you right now.
>>>
>>> I. You could do some estimations on paper if anything that I have
>>> written above can theoretically happen. You should know the size of the
>>> windows/record sizes/what your Flat Map functions are doing (it seems like
>>> you have two of those chained after the WindowOperator?). From the looks of
>>> it, 25ms sleep per record, WindowOperator + Flat Map, huge state of window
>>> operators might suggest that it's possible.
>>> II.  As those tasks are blocked for hours, triggering a checkpoint and
>>> collecting some stack traces can help you understand what the tasks are
>>> actually doing. But for that you would need to understand how to
>>> differentiate a blocked task, so...
>>> *III. ... maybe actually the most efficient way for us to help you would
>>> be if you could minimize/simplify your job, replace Kafka source with an
>>> artificial source that would be generating records, but in such a way that
>>> would still reproduce this behavior and share your code with us?*
>>>
>>> Best, Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-25414
>>>
>>>
>>> wt., 21 gru 2021 o 20:10 Mason Chen <mason.c...@apple.com> napisał(a):
>>>
>>>> Hi Piotr,
>>>>
>>>> These observations correspond to the 0ms alignment timeout setting.
>>>>
>>>> The checkpoints are timeouting because the checkpoint acknowledgement
>>>> is timing out. Now, we increased the timeout to 3 hours in our checkpoints
>>>> and we still face errors due to checkpoint acknowledgement—the rest of the
>>>> checkpoint config is still the same.
>>>>
>>>> This is our job graph:
>>>> <job_graph.png>
>>>> To give more details about the window, we use the default event time
>>>> trigger with a gap of 300 seconds and 180 allowed lateness. The window only
>>>> implements the process function in which it emits 1 element.
>>>>
>>>> Here are the screenshots of the failed checkpoints. Failures typically
>>>> come in groups like this. On average, checkpoints complete in 2m 49s.
>>>>
>>>> <failed_checkpoint_summary.png>
>>>>
>>>> To show a few of the failed checkpoints in more detail:
>>>>
>>>> For checkpoint 337, the source finishes checkpoint within a normal
>>>> latency and the window checkpoint times out due to high start delay.
>>>> <checkpoint_337.png>
>>>>
>>>> For checkpoint 338, we see very high start delay at the source and
>>>> blocks the window operator from completing its checkpoint. I sorted by end
>>>> to end duration for the subtasks to give an idea of the worst start delay.
>>>> Start delay even show values beyond our checkpoint timeout (e.g. 4, 5, 6
>>>> hours).
>>>> <checkpoint_338.png>
>>>>
>>>>
>>>> One of the unaligned checkpoints limitations is that Flink can not
>>>> snapshot a state of an operator in the middle of processing a record.
>>>>
>>>> This is true for aligned checkpoints too, right?
>>>>
>>>> So my questions are:
>>>>
>>>> 1. Why is there high start delay at the source? Isn’t this what FLIP 27
>>>> sources are designed to overcome since the need to acquire the checkpoint
>>>> lock is irrelevant? Is it a bug?
>>>> 2. When the source operator finished for checkpoint 337, why is start
>>>> delay high for the window? Barriers should have been forwarded downstream
>>>> quite quickly unless the window operator is blocking for a few hours...
>>>> 3. If the window is the bottleneck, what are the various ways to
>>>> confirm this? We have metrics to measure the process function but we don’t
>>>> know how many windows are getting fired at the same time to give the
>>>> overall latency for the operator. Are there metrics or logs to see how many
>>>> windows are getting fired or how long the window operator is blocking the
>>>> window input buffers from processing?
>>>>
>>>> Thanks,
>>>> Mason
>>>>
>>>>
>>>> On Dec 20, 2021, at 3:01 AM, Piotr Nowojski <pnowoj...@apache.org>
>>>> wrote:
>>>>
>>>> Hi Mason,
>>>>
>>>> Those checkpoint timeouts (30 minutes) have you already observed with
>>>> the alignment timeout set to 0ms? Or as you were previously running it with
>>>> 1s alignment timeout?
>>>>
>>>> If the latter, it might be because unaligned checkpoints are failing to
>>>> kick in in the first place. Setting the timeout to 0ms should solve the
>>>> problem.
>>>>
>>>> If the former, have you checked why the checkpoints are timeouting?
>>>> What part of the checkpointing process is taking a long time? For example
>>>> can you post a screenshot from the WebUI of checkpoint stats for each task?
>>>> The only explanation I could think of is this sleep time that you added.
>>>> 25ms per record is really a lot. I mean really a lot. 30 minutes / 25
>>>> ms/record = 72 000 records. One of the unaligned checkpoints limitations is
>>>> that Flink can not snapshot a state of an operator in the middle of
>>>> processing a record. In your particular case, Flink will not be able to
>>>> snapshot the state of the session window operator in the middle of the
>>>> windows being fired. If your window operator is firing a lot of windows at
>>>> the same time, or a single window is producing 72k of records (which would
>>>> be an unusual but not unimaginable amount), this could block checkpointing
>>>> of the window operator for 30 minutes due to this 25ms sleep down the
>>>> stream.
>>>>
>>>> Piotrek
>>>>
>>>> pt., 17 gru 2021 o 19:19 Mason Chen <mason.c...@apple.com> napisał(a):
>>>>
>>>>> Hi Piotr,
>>>>>
>>>>> Thanks for the link to the JIRA ticket, we actually don’t see much
>>>>> state size overhead between checkpoints in aligned vs unaligned, so we 
>>>>> will
>>>>> go with your recommendation of using unaligned checkpoints with 0s
>>>>> alignment timeout.
>>>>>
>>>>> For context, we are testing unaligned checkpoints with our application
>>>>> with these tasks: [kafka source, map, filter] -> keyby -> [session window]
>>>>> -> [various kafka sinks]. The first task has parallelism 40 and the rest 
>>>>> of
>>>>> the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>>>>>
>>>>> We added an artificial sleep (25 ms per invocation of in process
>>>>> function) the session window task to simulate backpressure; however, we
>>>>> still see checkpoints failing due to task acknowledgement doesn’t complete
>>>>> within our checkpoint timeout (30 minutes).
>>>>>
>>>>> I am able to correlate that the input buffers from *window* and
>>>>> output buffers from *source* being 100% usage corresponds to the
>>>>> checkpoint failures. When they are not full (input can drop to as low as
>>>>> 60% usage and output can drop to as low as 55% usage), the checkpoints
>>>>> succeed within less than 2 ms. In all cases, it is the session window task
>>>>> or source task failing to 100% acknowledge the barriers within timeout. I
>>>>> do see the *source* task acknowledgement taking long in some of the
>>>>> failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours) and
>>>>> source is idle and not busy at this time.
>>>>>
>>>>> All other input buffers are low usage (mostly 0). For output buffer,
>>>>> the usage is around 50% for window--everything else is near 0% all the 
>>>>> time
>>>>> except the source mentioned before (makes sense since rest are just 
>>>>> sinks).
>>>>>
>>>>> We are also running a parallel Flink job with the same configurations,
>>>>> except with unaligned checkpoints disabled. Here we see observe the same
>>>>> behavior except now some of the checkpoints are failing due to the source
>>>>> task not acknowledging everything within timeout—however, most failures 
>>>>> are
>>>>> still due to session window acknowledgement.
>>>>>
>>>>> All the data seems to points an issue with the source? Now, I don’t
>>>>> know how to explain this behavior since unaligned checkpoints should
>>>>> overtake records in the buffers (once seen at the input buffer, forward
>>>>> immediately downstream to output buffer).
>>>>>
>>>>> Just to confirm, this is our checkpoint configuration:
>>>>> ```
>>>>> Option
>>>>> Value
>>>>> Checkpointing Mode Exactly Once
>>>>> Checkpoint Storage FileSystemCheckpointStorage
>>>>> State Backend EmbeddedRocksDBStateBackend
>>>>> Interval 5m 0s
>>>>> Timeout 30m 0s
>>>>> Minimum Pause Between Checkpoints 2m 0s
>>>>> Maximum Concurrent Checkpoints 1
>>>>> Unaligned Checkpoints Enabled
>>>>> Persist Checkpoints Externally Enabled (retain on cancellation)
>>>>> Tolerable Failed Checkpoints 10
>>>>> ```
>>>>>
>>>>> Are there other metrics should I look at—why else should tasks fail
>>>>> acknowledgement in unaligned mode? Is it something about the 
>>>>> implementation
>>>>> details of window function that I am not considering? My main hunch is
>>>>> something to do with the source.
>>>>>
>>>>> Best,
>>>>> Mason
>>>>>
>>>>> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowoj...@apache.org>
>>>>> wrote:
>>>>>
>>>>> Hi Mason,
>>>>>
>>>>> In Flink 1.14 we have also changed the timeout behavior from checking
>>>>> against the alignment duration, to simply checking how old is the
>>>>> checkpoint barrier (so it would also account for the start delay) [1]. It
>>>>> was done in order to solve problems as you are describing. Unfortunately 
>>>>> we
>>>>> can not backport this change to 1.13.x as it's a breaking change.
>>>>>
>>>>> Anyway, from our experience I would recommend going all in with the
>>>>> unaligned checkpoints, so setting the timeout back to the default value of
>>>>> 0ms. With timeouts you are gaining very little (a tiny bit smaller state
>>>>> size if there is no backpressure - tiny bit because without backpressure,
>>>>> even with timeout set to 0ms, the amount of captured inflight data is
>>>>> basically insignificant), while in practise you slow down the checkpoint
>>>>> barriers propagation time by quite a lot.
>>>>>
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23041
>>>>>
>>>>> wt., 14 gru 2021 o 22:04 Mason Chen <mas.chen6...@gmail.com>
>>>>> napisał(a):
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm using Flink 1.13 and my job is experiencing high start delay,
>>>>>> more so than high alignment time. (our flip 27 kafka source is heavily
>>>>>> backpressured). Since our alignment timeout is set to 1s, the unaligned
>>>>>> checkpoint never triggers since alignment delay is always below the
>>>>>> threshold.
>>>>>>
>>>>>> It's seems there is only a configuration for alignment timeout but
>>>>>> should there also be one for start delay timeout:
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>>>>>>
>>>>>> I'm interested to know the reasoning why there isn't a timeout for
>>>>>> start delay as well--was it because it was deemed too complex for the 
>>>>>> user
>>>>>> to configure two parameters for unaligned checkpoints?
>>>>>>
>>>>>> I'm aware of buffer debloating in 1.14 that could help but I'm trying
>>>>>> to see how far unaligned checkpointing can take me.
>>>>>>
>>>>>> Best,
>>>>>> Mason
>>>>>>
>>>>>
>>>>>
>>>>
>>
>

Reply via email to