Hi Robin,

I noticed that I answered privately, so let me forward that to the user
list.

Please come back to the ML if you have more questions.

Best,

Arvid

On Thu, Jan 9, 2020 at 5:47 PM Robin Cassan <robin.cas...@contentsquare.com>
wrote:

> Hi Arvid, thanks a lot for this quick response!
> We had wrongly assumed that our data wasn't skewed because the state size
> was the same across partitions, but you are right that we may still have a
> huge amount of data for a few keys. We will compute a few stats on our
> partitions to get more visibility into our data skew. We also haven't
> monitored the checkpoint barriers and backpressure; having more visibility
> into these could also help us understand our problem.
> It's also great to know about the future unaligned checkpoint support,
> eager to see it live!
>
> Thanks again for your help, this will give us a few things to investigate!
>
> On Thu, Jan 9, 2020 at 11:08 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Robin,
>>
>> such behavior can usually be seen if you have skew in your data. Typically,
>> a keyBy reshuffles the data and a large share of the records have the same
>> key (or their keys hash to the same subtask). Hence, the affected subtask
>> creates heavy backpressure and the checkpoint barriers take a long time to
>> arrive at the operator and the respective downstream channels.
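
To illustrate the routing effect, here is a minimal Python sketch (not Flink
code). It assumes a stand-in hash (`zlib.crc32`) in place of Flink's actual
key-group assignment, and the key names and record counts are made up; the
parallelism of 96 simply mirrors the job described further down in this thread.

```python
import zlib
from collections import Counter


def subtask_for_key(key: str, parallelism: int) -> int:
    # Stand-in for Flink's key-to-subtask routing; the real hash differs.
    return zlib.crc32(key.encode("utf-8")) % parallelism


def records_per_subtask(keys, parallelism):
    # Count how many records each subtask would receive after a keyBy.
    return Counter(subtask_for_key(k, parallelism) for k in keys)


# Hypothetical stream: 1000 distinct keys with one record each,
# plus a single hot key that accounts for 50,000 records.
keys = [f"user-{i}" for i in range(1000)] + ["hot-user"] * 50_000

counts = records_per_subtask(keys, parallelism=96)
busiest, load = counts.most_common(1)[0]
print(f"busiest subtask: {busiest}, records: {load}, "
      f"share: {load / len(keys):.1%}")
```

All records of the hot key go to one subtask regardless of the parallelism,
so adding more subtasks does not relieve it.
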
>>
>> State size may or may not correlate with the number of records per subtask.
>> Often the state size correlates with the number of distinct keys, but you
>> could still have one key that is really huge.
>>
>> Furthermore, it may be that keys are fairly well distributed, but the
>> record size depends on the key. Then processing records with a specific key
>> may take much longer than records of other keys.
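
A rough way to check for both kinds of skew on a sample of the input is to
collect the record count and the total payload size per key. The sketch below
is only an illustration in plain Python: `key_stats`, `top_by_bytes`, and the
sample data are hypothetical, and how you draw the sample from Kafka is left
out.

```python
from collections import defaultdict


def key_stats(records):
    """records: iterable of (key, payload_bytes).

    Returns {key: (record_count, total_payload_bytes)}.
    """
    stats = defaultdict(lambda: [0, 0])
    for key, payload in records:
        stats[key][0] += 1
        stats[key][1] += len(payload)
    return {k: (count, total) for k, (count, total) in stats.items()}


def top_by_bytes(stats, n=3):
    # Keys with the largest total payload first.
    return sorted(stats.items(), key=lambda kv: kv[1][1], reverse=True)[:n]


# Hypothetical sample: every key has 10 records, but the records of
# key "c" are 1000 times larger than those of keys "a" and "b".
sample = [(k, b"x" * 100) for k in "ab" for _ in range(10)]
sample += [("c", b"x" * 100_000) for _ in range(10)]

stats = key_stats(sample)
for key, (count, total) in top_by_bytes(stats):
    print(key, count, total)
```

Sorting by record count alone would not single out key "c" here, which is why
it helps to look at both numbers.
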
>>
>> If the aforementioned skew does not apply to you and you cannot observe
>> any backpressure, could you please share your topology and use case?
>>
>> Btw, we are currently working on unaligned checkpoints, where you can
>> trade checkpointing time for state size (in-flight data is stored and the
>> checkpoint barrier overtakes it). [1] It's targeted for Flink 1.11, though.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-76%3A+Unaligned+Checkpoints
>>
>> On Thu, Jan 9, 2020 at 8:13 AM Robin Cassan <
>> robin.cas...@contentsquare.com> wrote:
>>
>>> Hi all!
>>> I am having trouble explaining why my checkpoints take so much time,
>>> even though most partitions finish their checkpoints quite quickly. We are
>>> running a 96-partition job that consumes from and produces to Kafka and
>>> checkpoints to Amazon S3. As you can see on the screenshot below, the State
>>> Size is pretty well balanced and the Checkpoint Durations (Async and Sync)
>>> are always kept under 13 minutes. However, the End-to-End Duration of
>>> subtask 4 is 1h17m, which keeps the checkpoint stuck at 99% for a very long
>>> time.
>>> We have observed that, for the last few checkpoints, subtask 4 was
>>> always causing this slowness.
>>> Have you ever observed such behavior? What could be the reason for a
>>> huge end-to-end time on a single subtask?
>>>
>>> Thank you and don't hesitate to ask if you need more information
>>>
>>>
>>>
