Hey Caio,

Your analysis of the problem sounds right to me, but I don't have a good
solution for you :(

> I’ve validated that CPU profiles show clearAllState using a significant
> amount of CPU.


Did you use something like async-profiler here? Do you have a more detailed
breakdown of where that CPU time is going?
Once we know that, there might be an opportunity to perform such operations
asynchronously/lazily, or to fix something in the underlying platform (e.g.
RocksDB being slow, ...)


On Thu, Mar 21, 2024 at 12:05 AM Caio Camatta via user <
user@flink.apache.org> wrote:

> Hey Asimansu,
>
> The inputDataStream is a KeyedStream; I forgot to mention that.
>
> Caio
>
> On Wed, Mar 20, 2024 at 6:56 PM Asimansu Bera <asimansu.b...@gmail.com>
> wrote:
>
>> Hello Caio,
>>
>> Based on the pseudocode, there is no keyBy present. Hence, the window will
>> not be processed in parallel. Please check again and respond.
>>
>> val windowDataStream =
>>   inputDataStream
>>     .window(TumblingEventTimeWindows of 1 hour)
>>     .trigger(custom trigger)
>>     .aggregate(
>>        preAggregator = custom AggregateFunction,
>>        windowFunction = custom ProcessWindowFunction
>>     )
>>
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#keyed-vs-non-keyed-windows
>>
>> -A
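
To make the keyed vs. non-keyed distinction concrete, here is a minimal Scala
sketch. The Event type and key field are made up, since the real schema isn't
shown in this thread; the keyBy step is the only point of the example:

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
  import org.apache.flink.streaming.api.windowing.time.Time

  // Hypothetical event type, for illustration only.
  case class Event(key: String, value: Long)

  def example(input: DataStream[Event]): Unit = {
    // Keyed window: keyBy(...) before window(...); panes are spread across
    // the parallel subtasks of the window operator.
    val keyed = input
      .keyBy(_.key)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .reduce((a, b) => Event(a.key, a.value + b.value))

    // Non-keyed window: windowAll(...) runs the whole window with parallelism 1.
    val nonKeyed = input
      .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
      .reduce((a, b) => Event(a.key, a.value + b.value))
  }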
>>
>>
>> On Wed, Mar 20, 2024 at 8:55 AM Caio Camatta via user <
>> user@flink.apache.org> wrote:
>>
>>> We run a large-scale Flink 1.16 cluster that uses windowed aggregations
>>> and we’re seeing lag spikes on window closure. I’m curious if others have
>>> encountered similar issues before and if anyone has suggestions for how to
>>> tackle this problem (other than simply increasing parallelism).
>>>
>>> Context
>>>
>>> Lag definition
>>>
>>> We define end-to-end lag as the delta between the time when the event
>>> was persisted in Kafka and the time when Flink finishes processing the
>>> event.
>>>
>>> Window operator definition
>>>
>>> The important parts (in pseudocode):
>>>
>>> val windowDataStream =
>>>   inputDataStream
>>>     .window(TumblingEventTimeWindows of 1 hour)
>>>     .trigger(custom trigger)
>>>     .aggregate(
>>>        preAggregator = custom AggregateFunction,
>>>        windowFunction = custom ProcessWindowFunction
>>>     )
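
Filling in that pseudocode for readers less familiar with the API, an
aggregate(preAggregator, windowFunction) call usually looks roughly like the
sketch below. The types (Event, WindowResult) and the sum logic are made up,
not the actual job:

  import org.apache.flink.api.common.functions.AggregateFunction
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
  import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
  import org.apache.flink.streaming.api.windowing.time.Time
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  import org.apache.flink.util.Collector

  // Hypothetical types, for illustration only.
  case class Event(key: String, value: Long)
  case class WindowResult(key: String, windowEnd: Long, sum: Long)

  // Incremental pre-aggregation: only one accumulator is kept per pane,
  // not the individual events.
  class SumAggregate extends AggregateFunction[Event, Long, Long] {
    override def createAccumulator(): Long = 0L
    override def add(e: Event, acc: Long): Long = acc + e.value
    override def getResult(acc: Long): Long = acc
    override def merge(a: Long, b: Long): Long = a + b
  }

  // Receives the single pre-aggregated value per pane plus window metadata.
  class EmitResult extends ProcessWindowFunction[Long, WindowResult, String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[Long], out: Collector[WindowResult]): Unit =
      out.collect(WindowResult(key, context.window.getEnd, elements.head))
  }

  def build(input: DataStream[Event]): DataStream[WindowResult] =
    input
      .keyBy(_.key)  // the thread clarifies inputDataStream is already keyed; shown explicitly here
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      // .trigger(...)            // the custom trigger would be plugged in here
      .aggregate(new SumAggregate, new EmitResult)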
>>>
>>> The custom trigger emits a TriggerResult.CONTINUE in onEventTime, i.e.
>>> we don’t run any user-defined logic at the end of the window. (This trigger
>>> only fires while the window is active via custom logic in onElement.)
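
For reference, a trigger with that shape might look roughly like the sketch
below. It is modelled on Flink's built-in CountTrigger rather than the actual
trigger in the job, and the firing condition (every maxCount elements) is an
assumption:

  import org.apache.flink.api.common.functions.ReduceFunction
  import org.apache.flink.api.common.state.ReducingStateDescriptor
  import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
  import org.apache.flink.streaming.api.windowing.windows.TimeWindow

  // Fires mid-window every maxCount elements (an assumed condition) and
  // deliberately returns CONTINUE from onEventTime, so no user-defined logic
  // runs when the window's end timer fires (the WindowOperator still clears
  // its state at that point).
  class MidWindowCountTrigger[T](maxCount: Long) extends Trigger[T, TimeWindow] {

    private val countDesc = new ReducingStateDescriptor[java.lang.Long](
      "element-count",
      new ReduceFunction[java.lang.Long] {
        override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
      },
      classOf[java.lang.Long])

    override def onElement(element: T, timestamp: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
      val count = ctx.getPartitionedState(countDesc)
      count.add(1L)
      if (count.get() >= maxCount) {
        count.clear()
        TriggerResult.FIRE          // emit mid-window, keep the pane contents
      } else {
        TriggerResult.CONTINUE
      }
    }

    override def onEventTime(time: Long, window: TimeWindow,
                             ctx: Trigger.TriggerContext): TriggerResult =
      TriggerResult.CONTINUE        // no user logic at end of window

    override def onProcessingTime(time: Long, window: TimeWindow,
                                  ctx: Trigger.TriggerContext): TriggerResult =
      TriggerResult.CONTINUE

    override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
      ctx.getPartitionedState(countDesc).clear()
  }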
>>>
>>> Numbers
>>>
>>> Our Flink app processes ~3K events per second and I’ve calculated that
>>> there are around 200-300K panes to close per Task at the end of the 1-hour
>>> window. Our lag is fairly stable at a few hundred milliseconds during
>>> the window but spikes to 5-10 seconds when the window expires, which is a
>>> problem for us.
>>>
>>> The issue
>>>
>>> The magnitude of the lag spikes on window closure correlates with:
>>>
>>>    - the size of the window (a 1-hour window has bigger spikes than a
>>>      5-minute window),
>>>    - the cardinality of the keys in the event stream,
>>>    - the number of events being processed per second.
>>>
>>> In other words, the more panes to close, the bigger the lag spike. I'm
>>> fairly sure that the lag is coming entirely from the WindowOperator’s
>>> clearAllState and I’ve validated that CPU profiles show clearAllState
>>> using a significant amount of CPU.
>>>
>>> Does this theory sound plausible? What could we do to minimize the
>>> effects of window clean-up? It would be nice to do it incrementally or
>>> asynchronously but I'm not sure if Flink provides this functionality today.
>>> Thanks,
>>> Caio
>>>
>>
