Thanks for the information! I haven't tested the Kubernetes operator's built-in rollback mechanism yet. I feel like I could create another independent operator which detects the Flink application's JVM memory usage and triggers a rollback.
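To make that idea a bit more concrete, below is a very rough sketch of the kind of watchdog I have in mind (Python, using the Prometheus HTTP API and the Kubernetes Python client). The Prometheus endpoint, namespace, deployment name, metric names, threshold, and the recorded "last known-good" parallelism overrides are all placeholders for illustration, nothing that exists today:

# Hypothetical watchdog (sketch only): poll TaskManager heap usage from
# Prometheus and, if usage stays above a threshold after a scale-down,
# merge-patch the FlinkDeployment back to the previously recorded
# "last known-good" vertex parallelism overrides.
import time

import requests
from kubernetes import client, config

PROMETHEUS_URL = "http://prometheus:9090"   # assumed endpoint
NAMESPACE = "flink"                         # assumed namespace
DEPLOYMENT_NAME = "my-flink-app"            # assumed FlinkDeployment name
HEAP_USAGE_THRESHOLD = 0.90                 # fraction of max heap
REQUIRED_BREACHES = 3                       # consecutive breaches before rolling back

# Metric names depend on the Prometheus reporter configuration; adjust as needed.
HEAP_USAGE_QUERY = (
    "max(flink_taskmanager_Status_JVM_Memory_Heap_Used"
    " / flink_taskmanager_Status_JVM_Memory_Heap_Max)"
)


def query_prometheus(query: str) -> float:
    """Run an instant query and return the first scalar result (0.0 if empty)."""
    resp = requests.get(
        f"{PROMETHEUS_URL}/api/v1/query", params={"query": query}, timeout=10
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    return float(result[0]["value"][1]) if result else 0.0


def rollback_parallelism(api: client.CustomObjectsApi, overrides: str) -> None:
    """Patch the FlinkDeployment back to the last known-good parallelism overrides."""
    body = {
        "spec": {
            "flinkConfiguration": {
                "pipeline.jobvertex-parallelism-overrides": overrides
            }
        }
    }
    api.patch_namespaced_custom_object(
        group="flink.apache.org",
        version="v1beta1",
        namespace=NAMESPACE,
        plural="flinkdeployments",
        name=DEPLOYMENT_NAME,
        body=body,
    )


def main() -> None:
    config.load_incluster_config()  # use config.load_kube_config() when running locally
    api = client.CustomObjectsApi()
    # Placeholder: in a real operator this would be recorded right before a scale-down.
    last_good_overrides = "<vertex-id>:4"
    breaches = 0
    while True:
        heap_usage = query_prometheus(HEAP_USAGE_QUERY)
        breaches = breaches + 1 if heap_usage > HEAP_USAGE_THRESHOLD else 0
        if breaches >= REQUIRED_BREACHES:
            rollback_parallelism(api, last_good_overrides)
            breaches = 0
        time.sleep(30)


if __name__ == "__main__":
    main()

The tricky parts are of course recording the last known-good overrides and not fighting with the Flink operator's own reconciliation, which is one reason I would disable the operator's scaling if we went this way.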
Another solution I would like to discuss is to implement an independent operator. This operator would do the following:

- Retrieve the state size metrics for Flink applications from Prometheus.
- Gather the current and recommended parallelism metrics from the Flink operator, which are also reported in Prometheus.
- If a downscale is advised, calculate whether the new cluster configuration, considering the state size and the maximum JVM heap size, can hold the entire state; if not, halt the operation (see the rough feasibility-check sketch at the bottom of this mail, below the quoted thread).
- If it is feasible, have this operator manage the rescaling process similarly to the Flink operator, either by making API requests or by applying a kubectl patch to the FlinkDeployment CRD.

By doing this we could achieve something similar to what a plugin system would give us. Of course, in this case I would disable scaling in the Flink operator. Do you think it could work?

Best,
Yang

On Mon, 6 Nov 2023 at 23:43, Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hey!
>
> Bit of a tricky problem, as it's not really possible to know that the job
> will be able to start with lower parallelism in some cases. Custom plugins
> may work, but that would be an extremely complex solution at this point.
>
> The Kubernetes operator has a built-in rollback mechanism that can help
> with rolling back these broken scale operations; have you tried that?
> Furthermore, we are planning to introduce some heap/GC-related metrics soon
> (probably after the next release, for 1.8.0) that may help us catch these
> issues.
>
> Cheers,
> Gyula
>
> On Mon, Nov 6, 2023 at 9:27 AM Yang LI <yang.hunter...@gmail.com> wrote:
>
>> Dear Flink Community,
>>
>> I am currently working on implementing auto-scaling for my Flink
>> application using the Flink operator's autoscaler. During testing, I
>> encountered a "java.lang.OutOfMemoryError: Java heap space" exception
>> when the autoscaler attempted to scale down. This issue arises when the
>> incoming record rate decreases while the state size has not yet reduced
>> correspondingly. Despite numerous tests, managing this issue has been
>> difficult due to the lack of a parameter that allows for specifying a
>> cooldown period (essential for processing and reducing state size) prior
>> to the actual scale-down. Moreover, determining an optimal duration for
>> this cooldown period is also not straightforward. I believe that
>> enhancing the autoscaler with a focus on memory checks, or more broadly
>> on stability conditions, could significantly address this issue. Here
>> are some potential solutions that, in my opinion, could improve the
>> situation:
>>
>> 1. Integrate heap memory-related metrics into the metric collection,
>> coupled with a memory safety margin check within the autoscaler's
>> algorithm.
>>
>> 2. Introduce a plugin system and a pre-rescaling step in the Flink
>> operator's autoscaler, which would allow users to implement custom
>> plugins. These plugins could host listeners that activate during the
>> pre-hook step, adding an additional check before the algorithm executes,
>> so we can keep blocking the scale-down until the custom checks pass and
>> it is safe to proceed.
>>
>> 3. Implement a parameter that establishes a stability threshold for
>> heap memory usage percentage or JVM old GC (duration or count). In the
>> event that the threshold is exceeded, the system would revert to the
>> last stable scale in the scaling history.
>> Then the stabilization interval would start to work, providing the Flink
>> cluster with additional time to process and reduce the state size.
>>
>> Let me know what you think about it! Thanks!
>>
>> Best,
>>
>> Yang LI
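PS: here is the rough feasibility-check sketch I referred to in the list above. The metric name, the per-TaskManager heap size, the slots-per-TaskManager value and the headroom factor are assumed values for illustration only; a real operator would read them from the FlinkDeployment spec and the Flink configuration:

# Sketch of the downscale feasibility check: block the scale-down if the
# projected TaskManager heap cannot hold the current state size.
import math

import requests

PROMETHEUS_URL = "http://prometheus:9090"   # assumed endpoint
TM_HEAP_BYTES = 4 * 1024 ** 3               # assumed max JVM heap per TaskManager
SLOTS_PER_TM = 2                            # assumed taskmanager.numberOfTaskSlots
HEADROOM = 0.3                              # keep 30% of the heap free for processing


def instant_query(query: str) -> float:
    """Run a Prometheus instant query and return the first scalar result (0.0 if empty)."""
    resp = requests.get(
        f"{PROMETHEUS_URL}/api/v1/query", params={"query": query}, timeout=10
    )
    resp.raise_for_status()
    result = resp.json()["data"]["result"]
    return float(result[0]["value"][1]) if result else 0.0


def can_downscale(recommended_parallelism: int) -> bool:
    """Return True if the downscaled cluster should still be able to hold the state."""
    # Last completed checkpoint size as a rough proxy for state size; the metric
    # name depends on the Prometheus reporter configuration.
    state_size = instant_query("max(flink_jobmanager_job_lastCheckpointSize)")
    # Number of TaskManagers the recommended parallelism would translate to.
    task_managers = math.ceil(recommended_parallelism / SLOTS_PER_TM)
    usable_heap = task_managers * TM_HEAP_BYTES * (1.0 - HEADROOM)
    return state_size <= usable_heap


if __name__ == "__main__":
    # Example: the autoscaler recommends going down to parallelism 4.
    if can_downscale(4):
        print("Downscale looks safe; apply it via the REST API or a kubectl patch.")
    else:
        print("Downscale blocked: projected heap cannot hold the current state size.")

If the check fails, the operator would simply skip applying the recommended parallelism and re-evaluate on the next loop, which is the "halt the operation" step in the list above.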