Thanks for the information!

I haven't tested the Kubernetes operator's built-in rollback mechanism yet.
I feel like I could create another independent operator which monitors the
Flink application's JVM heap usage and triggers a rollback (a rough sketch
of the idea is below).
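
As a rough illustration of that idea, the detection part could look like the
sketch below. The heap usage ratio would come from the TaskManager heap
metrics (used vs. max) in Prometheus, and triggerRollback() is just a
placeholder for whatever rollback mechanism is used in the end:

public final class HeapWatchdog {

    private static final double HEAP_THRESHOLD = 0.9; // 90% heap usage
    private static final int MAX_BREACHES = 3;        // consecutive polls over threshold

    private int breaches = 0;

    /** Called once per polling interval with the current heap usage ratio. */
    void onHeapSample(double heapUsedRatio) {
        breaches = heapUsedRatio > HEAP_THRESHOLD ? breaches + 1 : 0;
        if (breaches >= MAX_BREACHES) {
            triggerRollback();
            breaches = 0;
        }
    }

    private void triggerRollback() {
        // Placeholder: revert the FlinkDeployment to its last stable spec.
        System.out.println("Sustained heap pressure, triggering rollback");
    }

    public static void main(String[] args) {
        HeapWatchdog watchdog = new HeapWatchdog();
        // Simulate three consecutive samples above the threshold.
        watchdog.onHeapSample(0.95);
        watchdog.onHeapSample(0.93);
        watchdog.onHeapSample(0.96);
    }
}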

Another solution I would like to discuss is also to implement an
independent operator. This operator would do the following:

   - Retrieve the state size metrics for Flink applications from Prometheus.
   - Gather current and recommended parallelism metrics from the Flink
   operator, also reported in Prometheus.
   - If a downscale is advised, calculate whether the new cluster
   configuration, considering state size and JVM max heap size, can hold the
   entire state; if not, halt the operation (a rough sketch of this check
   follows after the list).
   - If feasible, this operator would manage the rescaling process
   similarly to the Flink operator, either by making API requests or by
   applying a kubectl patch to the FlinkDeployment CRD.
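
To make the feasibility check more concrete, here is a minimal Java sketch of
the decision step from the third bullet. All the numbers and parameter names
are placeholders; in practice the inputs would come from the Prometheus
queries and operator metrics above, and the simple "state must fit in heap"
model really only applies to heap-based state backends:

public final class DownscaleGuard {

    /**
     * Decide whether a recommended downscale can still hold the current
     * state. Simple model: usable heap per TaskManager times the number of
     * TaskManagers at the recommended parallelism must cover the total state
     * size with a safety margin.
     */
    static boolean canDownscale(
            double totalStateSizeBytes,  // from the state size metrics in Prometheus
            int recommendedParallelism,  // from the autoscaler's recommended parallelism metric
            int slotsPerTaskManager,     // taskmanager.numberOfTaskSlots
            double heapMaxBytesPerTm,    // JVM max heap per TaskManager
            double safetyMargin) {       // e.g. 0.7 = use at most 70% of heap for state

        int taskManagers =
                (int) Math.ceil((double) recommendedParallelism / slotsPerTaskManager);
        double usableHeap = taskManagers * heapMaxBytesPerTm * safetyMargin;
        return usableHeap >= totalStateSizeBytes;
    }

    public static void main(String[] args) {
        // Hypothetical numbers purely for illustration: 40 GiB of state,
        // recommended parallelism 8, 2 slots per TM, 8 GiB heap per TM.
        boolean ok = canDownscale(
                40L * 1024 * 1024 * 1024, 8, 2, 8L * 1024 * 1024 * 1024, 0.7);
        System.out.println(ok ? "proceed with downscale" : "halt downscale");
    }
}

If the check fails, the operator would simply skip the patch and re-evaluate
on the next poll, so the job keeps running at its current parallelism until
the state has shrunk enough.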

By doing this we could achieve something similar to what we could do with a
plugin system. Of course, in this case I would disable scaling in the Flink
operator. Do you think it could work?

Best,
Yang

On Mon, 6 Nov 2023 at 23:43, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hey!
>
> Bit of a tricky problem, as it's not really possible to know that the job
> will be able to start with lower parallelism in some cases. Custom plugins
> may work but that would be an extremely complex solution at this point.
>
> The Kubernetes operator has a built-in rollback mechanism that can help
> with rolling back these broken scale operations. Have you tried that?
> Furthermore, we are planning to introduce some heap/GC related metrics soon
> (probably after the next release for 1.8.0) that may help us catch these
> issues.
>
> Cheers,
> Gyula
>
> On Mon, Nov 6, 2023 at 9:27 AM Yang LI <yang.hunter...@gmail.com> wrote:
>
>> Dear Flink Community,
>>
>> I am currently working on implementing auto-scaling for my Flink
>> application using the Flink operator's autoscaler. During testing, I
>> encountered a "java.lang.OutOfMemoryError: Java heap space" exception when
>> the autoscaler attempted to scale down. This issue arises when the incoming
>> record rate decreases while the state size has not yet reduced
>> correspondingly. Despite numerous tests, managing this issue has been
>> difficult due to the lack of a parameter that allows for specifying a
>> cooldown period (essential for processing and reducing state size) prior to
>> actual scaling down. Moreover, determining an optimal duration for this
>> cooldown period is also not straightforward. I believe that enhancing the
>> autoscaler with a focus on memory checks or more broadly on stability
>> conditions could significantly address this issue. Here are some potential
>> solutions that, in my opinion, could improve the situation:
>>
>>    1. Integrate heap memory-related metrics into the metric collection,
>>    coupled with a memory safety margin check within the autoscaler's 
>> algorithm.
>>
>>    2. Introduce a plugin system and a pre-rescaling step in the Flink
>>    operator's autoscaler, allowing users to implement custom plugins.
>>    These plugins could host listeners that run during the pre-hook step,
>>    adding an extra check before the scaling algorithm executes, so that
>>    scale-down is blocked until the custom checks pass and it is safe to
>>    proceed.
>>
>>    3. Implement a parameter that establishes a stability threshold for
>>    heap memory usage percentage or JVM old-generation GC (duration or
>>    count). If the threshold is exceeded, the system would revert to the
>>    last stable scale in the scaling history. The stabilization interval
>>    would then kick in, giving the Flink cluster additional time to process
>>    and reduce the state size.
>>
>>
>>
>> Let me know what you think about it! Thanks!
>>
>> Best,
>>
>> Yang LI
>>
>
