Hi Robin,

I noticed that I answered privately, so let me forward that to the user list.
Please come back to the ML if you have more questions.

Best,

Arvid

On Thu, Jan 9, 2020 at 5:47 PM Robin Cassan <robin.cas...@contentsquare.com> wrote:

> Hi Arvid, thanks a lot for this quick response!
> We had wrongly assumed that our data wasn't skewed because the state size
> was the same across partitions, but you are right that we may still have
> huge data for a few given keys. We will compute a few stats on our
> partitions to get more visibility into our data skew. We also haven't
> monitored the checkpoint barriers and backpressure; more visibility on
> these could also help us understand our problem.
> It's also great to know about the upcoming unaligned checkpoint support;
> we're eager to see it live!
>
> Thanks again for your help, this will give us a few things to investigate!
>
> On Thu, Jan 9, 2020 at 11:08 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Robin,
>>
>> Such behavior can usually be seen if you have skew in your data. Usually,
>> a keyBy reshuffles the data and very many records share the same key (or
>> the hash collides). Hence, this task is heavily backpressuring and the
>> checkpoint barrier takes a long time to arrive at the operator and the
>> respective downstream channels.
>>
>> State size may or may not correlate with the number of records per
>> subtask. Often the state size correlates with the number of different
>> keys, but you could still have one key that is really huge.
>>
>> Furthermore, it may be that keys are fairly well distributed, but the
>> record size depends on the key. Then processing records with a specific
>> key may take much longer than processing records of other keys.
>>
>> If the aforementioned skew does not apply to you and you cannot observe
>> any backpressure, could you please share your topology and use case?
>>
>> Btw, we are currently working on unaligned checkpoints, where you can
>> trade checkpointing time for state size (in-flight data is stored and the
>> checkpoint barrier overtakes it) [1]. It's targeted for Flink 1.11, though.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>
>> On Thu, Jan 9, 2020 at 8:13 AM Robin Cassan <robin.cas...@contentsquare.com> wrote:
>>
>>> Hi all!
>>> I am having trouble explaining why my checkpoints take so much time,
>>> even though most partitions finish their checkpoints quite quickly. We
>>> are running a 96-partition job that consumes from and produces to Kafka
>>> and checkpoints to Amazon S3. As you can see on the screenshot below,
>>> the State Size is pretty well balanced and the Checkpoint Durations
>>> (Async and Sync) are always kept under 13 minutes. However, the
>>> End-to-End Duration of subtask 4 is 1h17m, which keeps the checkpoint
>>> stuck at 99% for a very long time.
>>> We have observed that, for the last few checkpoints, subtask 4 was
>>> always causing this slowness.
>>> Have you ever observed such behavior? What could be the reason for a
>>> huge end-to-end time on a single subtask?
>>>
>>> Thank you, and don't hesitate to ask if you need more information!
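
PS: for the per-key stats discussed above, here is a minimal sketch of what
such a counting job could look like. The inline source and the key values are
purely illustrative; the real job would read from Kafka and use its own key
extractor.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeySkewStats {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in source; the real job would read from Kafka and extract its own key.
        env.fromElements("user-1", "user-1", "user-2", "user-1", "user-3")
                // Emit (key, 1) pairs so counts can be aggregated per key.
                .map(key -> Tuple2.of(key, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0)
                // Running count per key: a handful of keys with disproportionately
                // large counts is the kind of skew that stalls the checkpoint barrier.
                .sum(1)
                .print();

        env.execute("key-skew-stats");
    }
}

Sorting the printed counts (or sinking them to a metrics backend) quickly
shows whether a handful of keys dominates the traffic.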
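
And once FLIP-76 lands, enabling the feature is a one-liner on the checkpoint
config. This sketch assumes the API as it eventually shipped in Flink 1.11;
the checkpoint interval and the placeholder pipeline are illustrative only.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UnalignedCheckpointsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 minutes (interval chosen only for illustration).
        env.enableCheckpointing(10 * 60 * 1000L);

        // With unaligned checkpoints the barrier overtakes in-flight records,
        // which are persisted as part of the checkpoint: shorter end-to-end
        // checkpoint times under backpressure, at the cost of larger checkpoints.
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        // Placeholder pipeline so the job has something to run.
        env.fromElements(1, 2, 3).print();

        env.execute("unaligned-checkpoints-example");
    }
}

The trade-off is exactly the one described above: in-flight records become
part of the checkpoint state, so checkpoints get larger in exchange for
barriers that no longer wait behind backpressured data.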