Hi Mason,

Sorry for the late reply, but I was OoO.

I think you could confirm it with more custom metrics: counting how many windows have been registered/fired and plotting that over time.
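For example, a delegating trigger along the lines of what you already described would do. This is only an untested sketch, and "windowFires" is just a name I made up for the counter:

```java
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

/** Wraps any trigger and counts how often it returns FIRE / FIRE_AND_PURGE. */
public class CountingTrigger<T, W extends Window> extends Trigger<T, W> {

    private final Trigger<? super T, W> inner;
    private transient Counter fires; // registered lazily on the operator's metric group

    public CountingTrigger(Trigger<? super T, W> inner) {
        this.inner = inner;
    }

    private TriggerResult count(TriggerResult result, TriggerContext ctx) {
        if (fires == null) {
            fires = ctx.getMetricGroup().counter("windowFires");
        }
        if (result.isFire()) {
            fires.inc();
        }
        return result;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
        return count(inner.onElement(element, timestamp, window, ctx), ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return count(inner.onEventTime(time, window, ctx), ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return count(inner.onProcessingTime(time, window, ctx), ctx);
    }

    @Override
    public boolean canMerge() {
        return inner.canMerge();
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        inner.onMerge(window, ctx);
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        inner.clear(window, ctx);
    }
}
```

With your session windows that would be roughly `.window(EventTimeSessionWindows.withGap(Time.seconds(300))).trigger(new CountingTrigger<>(EventTimeTrigger.create()))`, since EventTimeTrigger is the default trigger there anyway. Plotting the counter's rate next to the checkpoint start delay should show whether the huge spikes of fired windows line up with the stuck checkpoints.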
I think it would be more helpful in this case to check how long a task has been blocked being "busy" processing, for example, timers. FLINK-25414 covers only being blocked on hard/soft backpressure. Unfortunately, at the moment I don't know how to implement such a metric without affecting performance on the critical path, so I don't see this happening soon :(

Best,
Piotrek

On Tue, Jan 4, 2022 at 18:02 Mason Chen <mason.c...@apple.com> wrote:

> Hi Piotrek,
>
>> In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10 minutes to process and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?
>
> It seems plausible—there are potentially many keys (and many windows). Is there a way to confirm with metrics? We can add a window fire counter to the window operator that only gets incremented at the end of window evaluation, in order to see the huge jumps in window fires. I can see this benefiting other users who troubleshoot the problem of a large number of windows firing.
>
> Best,
> Mason
>
> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>
> Hi Mason,
>
>> and it has to finish processing this output before checkpoint can begin—is this right?
>
> Yes. The checkpoint will only be executed once all triggered windows have been fully processed.
>
> But from what you have posted, it looks like all of that delay is coming from hundreds of thousands of windows firing all at the same time. Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s * 60 s/min * 70 min = 151 200 triggers fired at once (or in a very short interval). In other words, something (presumably a watermark) has fired more than 151 200 windows at once, which is taking ~1h 10 minutes to process, and during this time the checkpoint can not make any progress. Is this number of triggered windows plausible in your scenario?
>
> Best,
> Piotrek
>
> On Thu, Dec 23, 2021 at 12:12 Mason Chen <mason.c...@apple.com> wrote:
>
>> Hi Piotr,
>>
>> Thanks for the thorough response and the PR—will review later.
>>
>> Clarifications:
>> 1. The flat map you refer to produces at most 1 record.
>> 2. The session window operator's *window process function* emits at least 1 record.
>> 3. The 25 ms sleep is at the beginning of the window process function.
>>
>> Your explanation about how records being bigger than the buffer size can cause blockage makes sense to me. However, my average record size is around 770 bytes coming out of the source and 960 bytes coming out of the window. Also, we don't override the default `taskmanager.memory.segment-size`. My Flink job memory config is as follows:
>>
>> ```
>> taskmanager.memory.jvm-metaspace.size: 512 mb
>> taskmanager.memory.jvm-overhead.max: 2Gb
>> taskmanager.memory.jvm-overhead.min: 512Mb
>> taskmanager.memory.managed.fraction: '0.4'
>> taskmanager.memory.network.fraction: '0.2'
>> taskmanager.memory.network.max: 2Gb
>> taskmanager.memory.network.min: 200Mb
>> taskmanager.memory.process.size: 16Gb
>> taskmanager.numberOfTaskSlots: '4'
>> ```
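>> If I am reading the memory model right, that config should translate into roughly the following network memory (back-of-the-envelope, assuming the default 0.1 JVM overhead fraction and the default 32 KB segment size):
>>
>> ```
>> JVM overhead        = clamp(0.1 * 16 GB, 512 MB, 2 GB)             = 1.6 GB
>> total Flink memory  = 16 GB - 512 MB metaspace - 1.6 GB overhead   ≈ 13.9 GB
>> network memory      = clamp(0.2 * 13.9 GB, 200 MB, 2 GB)           = 2 GB
>> network buffers     = 2 GB / 32 KB                                 = 65,536 per TM
>> ```
>>
>> So, at least on paper, buffers are not scarce, and a single ~1 KB record is far below the 32 KB segment size.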
>>> Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?
>>
>> To distinguish task blockage vs graceful backpressure, I have checked the operator throughput metrics and have confirmed that during window *task* buffer blockage, the window *operator* DOES emit records. Tasks look like they aren't doing anything, but the window is emitting records.
>>
>> <throughput_metrics.png>
>>
>> Furthermore, I created a custom trigger to wrap a metric counter for FIRED counts, to get an estimate of how many windows are fired at the same time. I ran a separate job with the same configs—the results look as follows:
>>
>> <trigger_metrics.png>
>>
>> On average, when the buffers are blocked, there are 36 FIREs per second. Since each of these fires invokes the window process function, 25 ms * 36 = 900 ms means we sleep almost a full second cumulatively, per second—which is pretty severe. Combined with the fact that the window process function can emit many records, the task takes even longer to checkpoint, since the flatmap/kafka sink is chained with the window operator—and it has to finish processing this output before checkpoint can begin—*is this right?* In addition, when the number of window fires per second drops, checkpointing is able to continue and succeed.
>>
>> So, I think that the surge of window firing combined with the sleep is the source of the issue, which makes sense. I'm not sure how to confirm whether the buffer sizes being insufficient for the window output is also interplaying with this issue.
>>
>> Best,
>> Mason
>>
>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>> Hi Mason,
>>
>> One more question. Are you sure your job is making any progress? Are records being processed? Hasn't your job simply deadlocked on something?
>>
>> Best,
>> Piotrek
>>
>> On Wed, Dec 22, 2021 at 10:02 Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> Thanks for getting back to us. This is indeed weird.
>>>
>>>>> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>>>>
>>>> This is true for aligned checkpoints too, right?
>>>
>>> In a sense. For aligned checkpoints there is a stronger limitation: the task has to process all of the buffered records on the input before it's able to do an aligned checkpoint. For unaligned checkpoints the task has to finish fully processing only the currently processed record.
>>>
>>>> 1. Why is there high start delay at the source? Isn't this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
>>>
>>> Kind of. You do not have to acquire the checkpoint lock, as FLIP-27 sources are working in the task thread. But the task thread can not process records and do a checkpoint at the same time. A FLIP-27 source will not pick up the next record from the input if there is backpressure (which allows a checkpoint to be triggered while the task is back pressured), but this back pressure detection mechanism (or rather the mechanism that prevents blocking waits of the task thread when there is back pressure) is not perfect. A couple of the largest limitations are:
>>> a) If a single record doesn't fit in a single network buffer (for example, the network buffer default size is 32KB and your record size can reach 33KB), back pressure detection will allow the next record to be processed since there will be some buffer available, but the produced record won't fit into this single buffer and will have to blockingly wait for another buffer to be recycled (increasing start delay and/or alignment time).
>>> b) If you have a flat map style operator/function in the chain that multiplies the number of records, you can hit exactly the same problem. For example, the network buffer is 32KB, the record size is 330B, but you have a flat map that suddenly produces 100 records (each 330B). 330B * 100 = 33KB, so again you might end up with the task being blocked, as a single buffer wouldn't be enough to serialize all of those 100 records.
>>> c) The same as b), but caused by a timer/watermark triggering the WindowOperator to produce lots of records.
>>>
>>>> 2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
>>>
>>> All of those points actually apply to every task, not only the FLIP-27 source task, and maybe they could explain why the window/flat map task has been blocked for ~2.5h.
>>>
>>> Re 1. + 2. If your Window/Flat Map task can block for 6 hours, and your record size sometimes exceeds the network buffer size, this can cause the source task to be blocked for those 6 hours. The source task will simply be stuck waiting for a buffer to be recycled, and this will only happen once a downstream task processes one more buffer.
>>>
>>>> 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function but we don't know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?
>>>
>>> In the WebUI the task nodes are colored depending on the busy/backpressured time. You can clearly see that the source is fully backpressured all the time, while the window is constantly busy. I presume your function that is inducing the 25ms-per-record sleep is chained with the window. That confirms for me that the window task is the bottleneck. However, unfortunately there is no easy way to tell how severe this back pressure is and for how long those tasks are blocked. In other words, a task that is busy but processes each record in 1ms and a task that is blocked processing a single record for 6h will both show the same 100% Busy metric. The same goes for being blocked on back pressure (a task that is back pressured in many short bursts and a task that is back pressured for a whole hour straight will both show a 100% back pressure metric). Moreover, there is currently no way to distinguish whether a task is back pressured in a graceful way, without blocking the task thread, or whether it is indeed blocking the task thread (due to a), b) or c)). I have created a ticket to add some metrics to help with that [1], but it won't help you right now.
>>>
>>> I. You could do some estimations on paper to see if anything that I have written above can theoretically happen. You should know the size of the windows/record sizes/what your Flat Map functions are doing (it seems like you have two of those chained after the WindowOperator?). From the looks of it, the 25ms sleep per record, WindowOperator + Flat Map, and the huge state of the window operators might suggest that it's possible.
>>> II. As those tasks are blocked for hours, triggering a checkpoint and collecting some stack traces can help you understand what the tasks are actually doing. But for that you would need to understand how to differentiate a blocked task, so...
>>> *III. ... maybe actually the most efficient way for us to help you would be if you could minimize/simplify your job, replace the Kafka source with an artificial source that generates records, but in such a way that would still reproduce this behavior, and share your code with us?* Even a trivial generator like the sketch below would do.
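>>> Something along these lines would already be enough (a rough, untested sketch; the record shape and key space are made up, and it uses the legacy SourceFunction API rather than FLIP-27, so treat it only as a starting point; if you want to stay closer to your real setup, a FLIP-27 NumberSequenceSource mapped to your record type would also work):
>>>
>>> ```java
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>
>>> /** Crude stand-in for the Kafka source: emits synthetic (key, timestamp) records forever. */
>>> public class SyntheticSource implements SourceFunction<Tuple2<String, Long>> {
>>>
>>>     private volatile boolean running = true;
>>>
>>>     @Override
>>>     public void run(SourceContext<Tuple2<String, Long>> ctx) throws Exception {
>>>         long seq = 0;
>>>         while (running) {
>>>             long now = System.currentTimeMillis();
>>>             // emit under the checkpoint lock, as the legacy SourceFunction contract requires
>>>             synchronized (ctx.getCheckpointLock()) {
>>>                 ctx.collectWithTimestamp(Tuple2.of("key-" + (seq % 100_000), now), now);
>>>             }
>>>             // (plus watermarks, either via ctx.emitWatermark or a WatermarkStrategy downstream,
>>>             //  so the session windows actually fire)
>>>             seq++;
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         running = false;
>>>     }
>>> }
>>> ```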
>>> Best,
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-25414
>>>
>>> On Tue, Dec 21, 2021 at 20:10 Mason Chen <mason.c...@apple.com> wrote:
>>>
>>>> Hi Piotr,
>>>>
>>>> These observations correspond to the 0ms alignment timeout setting.
>>>>
>>>> The checkpoints are timing out because the checkpoint acknowledgement is timing out. Now, we increased the timeout to 3 hours in our checkpoints and we still face errors due to checkpoint acknowledgement—the rest of the checkpoint config is still the same.
>>>>
>>>> This is our job graph:
>>>>
>>>> <job_graph.png>
>>>>
>>>> To give more details about the window, we use the default event time trigger with a gap of 300 seconds and 180 seconds of allowed lateness. The window only implements the process function, in which it emits 1 element.
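>>>> In code, the windowed part is essentially this (simplified sketch; `Event`, `Result` and `SessionFunction` stand in for our real classes):
>>>>
>>>> ```java
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
>>>> import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>
>>>> public class SessionPipeline {
>>>>     /** `events` is the output of the [kafka source, map, filter] task. */
>>>>     public static SingleOutputStreamOperator<Result> sessionize(DataStream<Event> events) {
>>>>         return events
>>>>                 .keyBy(Event::getKey)
>>>>                 .window(EventTimeSessionWindows.withGap(Time.seconds(300))) // default event-time trigger
>>>>                 .allowedLateness(Time.seconds(180))
>>>>                 .process(new SessionFunction()); // emits 1 element per fired window
>>>>     }
>>>> }
>>>> ```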
>>>> Here are the screenshots of the failed checkpoints. Failures typically come in groups like this. On average, checkpoints complete in 2m 49s.
>>>>
>>>> <failed_checkpoint_summary.png>
>>>>
>>>> To show a few of the failed checkpoints in more detail:
>>>>
>>>> For checkpoint 337, the source finishes its checkpoint within a normal latency and the window checkpoint times out due to high start delay.
>>>>
>>>> <checkpoint_337.png>
>>>>
>>>> For checkpoint 338, we see very high start delay at the source, which blocks the window operator from completing its checkpoint. I sorted the subtasks by end-to-end duration to give an idea of the worst start delay. Start delay even shows values beyond our checkpoint timeout (e.g. 4, 5, 6 hours).
>>>>
>>>> <checkpoint_338.png>
>>>>
>>>>> One of the unaligned checkpoints limitations is that Flink can not snapshot a state of an operator in the middle of processing a record.
>>>>
>>>> This is true for aligned checkpoints too, right?
>>>>
>>>> So my questions are:
>>>>
>>>> 1. Why is there high start delay at the source? Isn't this what FLIP 27 sources are designed to overcome since the need to acquire the checkpoint lock is irrelevant? Is it a bug?
>>>> 2. When the source operator finished for checkpoint 337, why is start delay high for the window? Barriers should have been forwarded downstream quite quickly unless the window operator is blocking for a few hours...
>>>> 3. If the window is the bottleneck, what are the various ways to confirm this? We have metrics to measure the process function, but we don't know how many windows are getting fired at the same time to give the overall latency for the operator. Are there metrics or logs to see how many windows are getting fired or how long the window operator is blocking the window input buffers from processing?
>>>>
>>>> Thanks,
>>>> Mason
>>>>
>>>> On Dec 20, 2021, at 3:01 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>>
>>>> Hi Mason,
>>>>
>>>> Have you observed those checkpoint timeouts (30 minutes) with the alignment timeout already set to 0ms, or while you were still running with the 1s alignment timeout?
>>>>
>>>> If the latter, it might be because unaligned checkpoints are failing to kick in in the first place. Setting the timeout to 0ms should solve the problem.
>>>>
>>>> If the former, have you checked why the checkpoints are timing out? What part of the checkpointing process is taking a long time? For example, can you post a screenshot from the WebUI of the checkpoint stats for each task? The only explanation I could think of is this sleep time that you added. 25ms per record is really a lot. I mean really a lot. 30 minutes / 25 ms/record = 72 000 records. One of the unaligned checkpoints limitations is that Flink can not snapshot the state of an operator in the middle of processing a record. In your particular case, Flink will not be able to snapshot the state of the session window operator in the middle of the windows being fired. If your window operator is firing a lot of windows at the same time, or a single window is producing 72k records (which would be an unusual but not unimaginable amount), this could block checkpointing of the window operator for 30 minutes due to this 25ms sleep down the stream.
>>>>
>>>> Piotrek
>>>>
>>>> On Fri, Dec 17, 2021 at 19:19 Mason Chen <mason.c...@apple.com> wrote:
>>>>
>>>>> Hi Piotr,
>>>>>
>>>>> Thanks for the link to the JIRA ticket. We actually don't see much state size overhead between checkpoints in aligned vs unaligned, so we will go with your recommendation of using unaligned checkpoints with 0s alignment timeout.
>>>>>
>>>>> For context, we are testing unaligned checkpoints with our application with these tasks: [kafka source, map, filter] -> keyby -> [session window] -> [various kafka sinks]. The first task has parallelism 40 and the rest of the tasks have parallelism 240. This is the FLIP 27 Kafka source.
>>>>>
>>>>> We added an artificial sleep (25 ms per invocation of the process function) in the session window task to simulate backpressure; however, we still see checkpoints failing because task acknowledgement doesn't complete within our checkpoint timeout (30 minutes).
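>>>>> Concretely, the sleep sits at the top of the window function, roughly like this (sketch; `Event`, `Result` and `Result.from` are placeholders for our real types):
>>>>>
>>>>> ```java
>>>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>> import org.apache.flink.util.Collector;
>>>>>
>>>>> /** Our session window function, with the artificial 25 ms sleep added for the test. */
>>>>> public class SessionFunction extends ProcessWindowFunction<Event, Result, String, TimeWindow> {
>>>>>
>>>>>     @Override
>>>>>     public void process(String key, Context ctx, Iterable<Event> elements, Collector<Result> out)
>>>>>             throws Exception {
>>>>>         Thread.sleep(25);                        // simulated backpressure: 25 ms per invocation
>>>>>         out.collect(Result.from(key, elements)); // the function emits one element per fired window
>>>>>     }
>>>>> }
>>>>> ```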
>>>>> I am able to correlate the input buffers of the *window* and the output buffers of the *source* being at 100% usage with the checkpoint failures. When they are not full (input can drop to as low as 60% usage and output to as low as 55% usage), the checkpoints succeed in less than 2 ms. In all cases, it is the session window task or the source task failing to 100% acknowledge the barriers within the timeout. I do see the *source* task acknowledgement taking long in some of the failures (e.g. 20 minutes, 30 minutes, 50 minutes, 1 hour, 2 hours), and the source is idle and not busy at this time.
>>>>>
>>>>> All other input buffers are at low usage (mostly 0). For the output buffers, usage is around 50% for the window--everything else is near 0% all the time except the source mentioned before (which makes sense, since the rest are just sinks).
>>>>>
>>>>> We are also running a parallel Flink job with the same configuration, except with unaligned checkpoints disabled. Here we observe the same behavior, except now some of the checkpoints are failing due to the source task not acknowledging everything within the timeout—however, most failures are still due to session window acknowledgement.
>>>>>
>>>>> All the data seems to point to an issue with the source? Now, I don't know how to explain this behavior, since unaligned checkpoints should overtake records in the buffers (once a barrier is seen at the input buffer, it is forwarded immediately downstream to the output buffer).
>>>>>
>>>>> Just to confirm, this is our checkpoint configuration:
>>>>>
>>>>> ```
>>>>> Option                              Value
>>>>> Checkpointing Mode                  Exactly Once
>>>>> Checkpoint Storage                  FileSystemCheckpointStorage
>>>>> State Backend                       EmbeddedRocksDBStateBackend
>>>>> Interval                            5m 0s
>>>>> Timeout                             30m 0s
>>>>> Minimum Pause Between Checkpoints   2m 0s
>>>>> Maximum Concurrent Checkpoints      1
>>>>> Unaligned Checkpoints               Enabled
>>>>> Persist Checkpoints Externally      Enabled (retain on cancellation)
>>>>> Tolerable Failed Checkpoints        10
>>>>> ```
>>>>>
>>>>> Are there other metrics I should look at—why else would tasks fail acknowledgement in unaligned mode? Is it something about the implementation details of the window function that I am not considering? My main hunch is something to do with the source.
>>>>>
>>>>> Best,
>>>>> Mason
>>>>>
>>>>> On Dec 16, 2021, at 12:25 AM, Piotr Nowojski <pnowoj...@apache.org> wrote:
>>>>>
>>>>> Hi Mason,
>>>>>
>>>>> In Flink 1.14 we have also changed the timeout behavior from checking against the alignment duration to simply checking how old the checkpoint barrier is (so it would also account for the start delay) [1]. It was done in order to solve problems like the one you are describing. Unfortunately, we can not backport this change to 1.13.x as it's a breaking change.
>>>>>
>>>>> Anyway, from our experience I would recommend going all in with the unaligned checkpoints, so setting the timeout back to the default value of 0ms. With timeouts you are gaining very little (a tiny bit smaller state size if there is no backpressure - tiny bit because without backpressure, even with the timeout set to 0ms, the amount of captured in-flight data is basically insignificant), while in practice you slow down the checkpoint barriers' propagation time by quite a lot.
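>>>>> In code that boils down to something like this (sketch only; I am going from memory on the 1.13 names, in particular the alignment timeout setter and its YAML key, so double-check them against your version):
>>>>>
>>>>> ```java
>>>>> import java.time.Duration;
>>>>> import org.apache.flink.streaming.api.CheckpointingMode;
>>>>> import org.apache.flink.streaming.api.environment.CheckpointConfig;
>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>
>>>>> public class CheckpointSettings {
>>>>>     public static void apply(StreamExecutionEnvironment env) {
>>>>>         env.enableCheckpointing(5 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE); // 5 min interval
>>>>>         CheckpointConfig conf = env.getCheckpointConfig();
>>>>>         conf.setCheckpointTimeout(30 * 60 * 1000L); // 30 min, as in your setup
>>>>>         conf.enableUnalignedCheckpoints();
>>>>>         // Back to the default: 0ms alignment timeout, i.e. barriers always overtake in-flight data.
>>>>>         // (I believe the YAML equivalent in 1.13 is `execution.checkpointing.alignment-timeout: 0 ms`.)
>>>>>         conf.setAlignmentTimeout(Duration.ZERO);
>>>>>     }
>>>>> }
>>>>> ```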
>>>>> Best,
>>>>> Piotrek
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23041
>>>>>
>>>>> On Tue, Dec 14, 2021 at 22:04 Mason Chen <mas.chen6...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm using Flink 1.13 and my job is experiencing high start delay, more so than high alignment time (our FLIP 27 Kafka source is heavily backpressured). Since our alignment timeout is set to 1s, the unaligned checkpoint never triggers, because the alignment delay is always below the threshold.
>>>>>>
>>>>>> It seems there is only a configuration for the alignment timeout, but should there also be one for a start delay timeout?
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>>>>>>
>>>>>> I'm interested to know the reasoning why there isn't a timeout for start delay as well--was it because it was deemed too complex for the user to configure two parameters for unaligned checkpoints?
>>>>>>
>>>>>> I'm aware of buffer debloating in 1.14 that could help, but I'm trying to see how far unaligned checkpointing can take me.
>>>>>>
>>>>>> Best,
>>>>>> Mason