Hi Gyula,

Thank you for the feedback! With your permission, I plan to integrate the implementation into the flink-kubernetes-operator-autoscaler module to test it in my environment, and then hopefully contribute the changes back to the community with a pull request to the GitHub repository in the coming months.
Best,
Yang

On Tue, 7 Nov 2023 at 19:08, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Sounds like a lot of work for very little gain to me. If you really feel
> that there is some room for improvement with the current implementation,
> it may be simpler to fix that.
>
> Gyula
>
> On Tue, 7 Nov 2023 at 01:20, Yang LI <yang.hunter...@gmail.com> wrote:
>
>> Thanks for the information!
>>
>> I haven't tested Kubernetes's built-in rollback mechanism yet. I feel
>> like I could create another independent operator that detects Flink
>> application JVM memory pressure and triggers a rollback.
>>
>> Another solution I would like to discuss is also an independent
>> operator. This operator would do the following:
>>
>> - Retrieve the state size metrics for Flink applications from
>>   Prometheus.
>> - Gather current and recommended parallelism metrics from the Flink
>>   operator, also reported in Prometheus.
>> - If a downscale is advised, calculate whether the new cluster
>>   configuration, considering state size and JVM max heap size, can
>>   support the entire state; if not, halt the operation.
>> - If feasible, have this operator manage the rescaling process
>>   similarly to the Flink operator, either by making API requests or by
>>   applying a kubectl patch to the FlinkDeployment CRD.
>>
>> By doing this we could achieve something similar to a plugin system. Of
>> course, in this case I would disable scaling in the Flink operator. Do
>> you think it could work?
>>
>> Best,
>> Yang
>>
>> On Mon, 6 Nov 2023 at 23:43, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hey!
>>>
>>> Bit of a tricky problem, as it's not really possible to know that the
>>> job will be able to start with lower parallelism in some cases. Custom
>>> plugins may work but that would be an extremely complex solution at
>>> this point.
>>> The Kubernetes operator has a built-in rollback mechanism that can
>>> help with rolling back these broken scale operations; have you tried
>>> that? Furthermore, we are planning to introduce some heap/GC related
>>> metrics soon (probably after the next release, for 1.8.0) that may
>>> help us catch these issues.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Mon, Nov 6, 2023 at 9:27 AM Yang LI <yang.hunter...@gmail.com> wrote:
>>>
>>>> Dear Flink Community,
>>>>
>>>> I am currently working on implementing auto-scaling for my Flink
>>>> application using the Flink operator's autoscaler. During testing, I
>>>> encountered a "java.lang.OutOfMemoryError: Java heap space" exception
>>>> when the autoscaler attempted to scale down. This issue arises when
>>>> the incoming record rate decreases while the state size has not yet
>>>> reduced correspondingly. Despite numerous tests, managing this issue
>>>> has been difficult due to the lack of a parameter that allows for
>>>> specifying a cooldown period (essential for processing and reducing
>>>> state size) prior to the actual scale-down. Moreover, determining an
>>>> optimal duration for this cooldown period is also not
>>>> straightforward. I believe that enhancing the autoscaler with a focus
>>>> on memory checks, or more broadly on stability conditions, could
>>>> significantly address this issue. Here are some potential solutions
>>>> that, in my opinion, could improve the situation:
>>>>
>>>> 1. Integrate heap memory-related metrics into the metric collection,
>>>>    coupled with a memory safety margin check within the autoscaler's
>>>>    algorithm.
>>>>
>>>> 2. Introduce a plugin system and a pre-rescaling step in the Flink
>>>>    operator's autoscaler, which would allow users to implement custom
>>>>    plugins. These plugins could host listeners that activate during
>>>>    the pre-hook step, adding an additional check before the algorithm
>>>>    executes.
>>>> So we could keep blocking the scale-down until custom checks have
>>>> passed, ensuring it is safe to proceed.
>>>>
>>>> 3. Implement a parameter that establishes a stability threshold for
>>>>    heap memory usage percentage or JVM old-generation GC (duration or
>>>>    count). In the event that the threshold is exceeded, the system
>>>>    would revert to the last stable scale in the scaling history. The
>>>>    stabilization interval would then start, providing the Flink
>>>>    cluster with additional time to process and reduce the state size.
>>>>
>>>> Let me know what you think about it! Thanks!
>>>>
>>>> Best,
>>>>
>>>> Yang LI
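For what it's worth, the downscale feasibility check proposed in the thread (halt the operation when the downscaled cluster's heap could not hold the current state) could be sketched roughly as below. Every name here is hypothetical, not part of the Flink operator API, and the sizing model (state must fit in aggregate heap times a safety margin) is an assumption for illustration only:

```java
// Hypothetical sketch of the feasibility check discussed above.
// Not the Flink operator API; names and the safety margin are assumptions.
public class DownscaleGuard {

    /**
     * Returns true if the proposed (lower) parallelism still leaves enough
     * aggregate JVM heap to hold the current state, with a safety margin.
     */
    static boolean canDownscale(long totalStateSizeBytes,
                                int proposedParallelism,
                                int slotsPerTaskManager,
                                long heapMaxBytesPerTaskManager,
                                double safetyMargin) {
        // TaskManagers needed for the proposed parallelism.
        int taskManagers =
                (int) Math.ceil((double) proposedParallelism / slotsPerTaskManager);
        // Usable heap across the downscaled cluster, discounted by the margin.
        long usableHeapBytes =
                (long) (taskManagers * heapMaxBytesPerTaskManager * safetyMargin);
        return totalStateSizeBytes <= usableHeapBytes;
    }
}
```

In the independent-operator design from the thread, `totalStateSizeBytes` would come from the state size metric in Prometheus and `proposedParallelism` from the autoscaler's recommended-parallelism metric; the operator would only apply the kubectl patch to the FlinkDeployment CRD when the check passes.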