Dear Flink Community,

I am currently working on auto-scaling for my Flink application using the Flink operator's autoscaler. During testing, I encountered a "java.lang.OutOfMemoryError: Java heap space" exception when the autoscaler attempted to scale down. The issue arises when the incoming record rate decreases while the state size has not yet shrunk correspondingly. Despite numerous tests, this has been difficult to manage because there is no parameter for specifying a cooldown period (essential for processing and reducing the state size) prior to the actual scale-down, and determining an optimal duration for such a cooldown is not straightforward either. I believe that enhancing the autoscaler with memory checks, or more broadly with stability conditions, could significantly address this issue. Here are some potential solutions that, in my opinion, could improve the situation:
1. Integrate heap-memory-related metrics into the metric collection, coupled with a memory safety margin check in the autoscaler's algorithm (a rough sketch follows below).

2. Introduce a plugin system and a pre-rescaling step in the Flink operator's autoscaler, allowing users to implement custom plugins. These plugins could host listeners that fire during the pre-hook step, adding an extra checkpoint before the algorithm executes, so that scaling down stays blocked until the custom checks pass and it is safe to proceed (see the interface sketch below).

3. Implement a parameter that establishes a stability threshold on the heap memory usage percentage or on JVM old-generation GC (duration or count). If the threshold were exceeded, the system would revert to the last stable scale in the scaling history; the stabilization interval would then kick in, giving the Flink cluster additional time to process and reduce the state size (a sketch of the revert logic follows below).
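To make idea 1 concrete, here is a minimal sketch of what such a memory safety margin check could look like. To be clear, everything in it is hypothetical: MemorySafetyGuard, isScaleDownSafe, and the ratio fed to the constructor (imagined as a new config option) are illustrations, not existing operator APIs.

    import java.util.Map;

    // Hypothetical guard; none of these names exist in the operator today.
    public class MemorySafetyGuard {

        // Maximum fraction of max heap a TaskManager may use before a
        // scale-down is considered unsafe (imagined as a new config option).
        private final double maxHeapUsageRatio;

        public MemorySafetyGuard(double maxHeapUsageRatio) {
            this.maxHeapUsageRatio = maxHeapUsageRatio;
        }

        // Returns true only if every TaskManager has enough heap headroom;
        // after a scale-down the remaining TMs inherit more state, so high
        // heap usage beforehand is a strong OOM signal.
        public boolean isScaleDownSafe(Map<String, Long> heapUsedPerTm,
                                       Map<String, Long> heapMaxPerTm) {
            return heapUsedPerTm.entrySet().stream()
                    .allMatch(e -> (double) e.getValue()
                            / heapMaxPerTm.get(e.getKey()) < maxHeapUsageRatio);
        }
    }

The autoscaler would consult this check right before emitting a scale-down decision and skip the decision (keeping the current parallelism) whenever it returns false.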
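For idea 2, the pre-rescaling step could be a small SPI that the autoscaler consults after computing new parallelisms but before applying them. Again, this interface does not exist today; ScalingPreHook and PreHookResult are made-up names for illustration (JobVertexID is the existing Flink class):

    import java.util.Map;
    import org.apache.flink.runtime.jobgraph.JobVertexID;

    // Hypothetical plugin SPI, loaded e.g. via ServiceLoader.
    public interface ScalingPreHook {

        enum PreHookResult {
            PROCEED, // apply the proposed scaling
            BLOCK    // veto it; re-evaluate on the next metric window
        }

        // Called once per evaluation, before any rescale is executed.
        PreHookResult beforeRescale(
                Map<JobVertexID, Integer> currentParallelisms,
                Map<JobVertexID, Integer> proposedParallelisms);
    }

A user plugin could then combine this with the memory check above: return BLOCK while heap usage is over the margin, so the scale-down keeps being deferred until the state size has actually gone down.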
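And for idea 3, a rough sketch of the revert logic, assuming an imagined GC-pressure threshold option and access to the autoscaler's scaling history (modeled here as a simple deque of parallelism maps; the real history structure would differ):

    import java.util.Deque;
    import java.util.Map;
    import java.util.Optional;
    import org.apache.flink.runtime.jobgraph.JobVertexID;

    // Hypothetical stability monitor; names and config are illustrative only.
    public class StabilityMonitor {

        // e.g. fraction of wall-clock time spent in old-generation GC
        private final double gcPressureThreshold;

        public StabilityMonitor(double gcPressureThreshold) {
            this.gcPressureThreshold = gcPressureThreshold;
        }

        // If GC pressure exceeds the threshold after a rescale, drop the
        // unstable decision and return the last stable parallelisms so the
        // autoscaler can roll back; the stabilization interval then gives
        // the job time to shrink its state.
        public Optional<Map<JobVertexID, Integer>> maybeRevert(
                double observedGcPressure,
                Deque<Map<JobVertexID, Integer>> scalingHistory) {
            if (observedGcPressure > gcPressureThreshold
                    && scalingHistory.size() > 1) {
                scalingHistory.pollLast();
                return Optional.of(scalingHistory.peekLast());
            }
            return Optional.empty();
        }
    }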
Let me know what you think about it! Thanks!

Best,
Yang LI