Hi all,

I would like to start a discussion about introducing a Node Health Management and Quarantine framework at the ResourceManager level in Flink.

*Background*

Currently, Flink provides a blocklist mechanism (FLIP-224 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism>), which was primarily designed for speculative execution in batch jobs. While this works well for mitigating slow nodes in specific scenarios, it does not provide a general-purpose node-level health management mechanism.

In production environments, we have observed the following recurring issues:

1. TaskManagers that repeatedly fail due to hardware or JVM-level instability.
2. Nodes experiencing transient performance degradation (e.g., GC stalls, IO issues).
3. Heartbeat-flapping nodes that repeatedly rejoin and disrupt scheduling stability.
4. The lack of a centralized mechanism to temporarily isolate problematic nodes from scheduling decisions.

At present, these scenarios are typically handled externally (e.g., Kubernetes taints, YARN node controls), or indirectly through repeated task failures. However, Flink itself does not provide a runtime-level node quarantine capability.

*Problem Statement*

There is currently no cluster-level, scheduling-aware mechanism in Flink to:

- Mark a TaskManager as unhealthy.
- Temporarily exclude it from slot allocation.
- Automatically restore it after a configurable expiration period.
- Provide visibility via metrics or REST API.

This makes node-level fault governance reactive rather than proactive.

*Proposal (High-Level)*

The goal of this discussion is not to introduce a simple "blacklist", but to explore a lightweight and extensible Node Health Management layer integrated at the ResourceManager level.

Key design principles:

1. ResourceManager-level integration (Slot filtering before allocation).
2. No modification to existing scheduling algorithms.
3. Fully backward compatible and disabled by default.
4. Policy-driven and pluggable (manual + automatic triggers).
5. Minimal surface area for initial phase.

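To make the semantics above more concrete (mark a TaskManager as unhealthy, exclude it from slot allocation, restore it automatically after an expiration period), here is a rough Java sketch. It is only an illustration: `QuarantineRegistry` and all of its methods are hypothetical names invented for this mail and do not exist in Flink, TaskManagers are identified by plain strings rather than Flink's own ID types, and the current time is passed in explicitly to keep the example easy to test.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch only; none of these names are existing Flink APIs. */
final class QuarantineRegistry {

    // TaskManager id -> instant at which its quarantine expires.
    private final Map<String, Instant> quarantinedUntil = new ConcurrentHashMap<>();

    /** Marks a TaskManager as unhealthy for the given time-to-live. */
    void quarantine(String taskManagerId, Instant now, Duration ttl) {
        quarantinedUntil.put(taskManagerId, now.plus(ttl));
    }

    /** Lifts a quarantine before it expires (e.g. a manual release). */
    void release(String taskManagerId) {
        quarantinedUntil.remove(taskManagerId);
    }

    /** Returns true while the quarantine is active; expired entries are dropped lazily. */
    boolean isQuarantined(String taskManagerId, Instant now) {
        Instant until = quarantinedUntil.get(taskManagerId);
        if (until == null) {
            return false;
        }
        if (!now.isBefore(until)) {
            // Expired: restore the node automatically by forgetting the entry.
            quarantinedUntil.remove(taskManagerId, until);
            return false;
        }
        return true;
    }

    /** Filters candidate TaskManagers before allocation, preserving their order. */
    List<String> filterAllocatable(List<String> candidates, Instant now) {
        List<String> allocatable = new ArrayList<>();
        for (String taskManagerId : candidates) {
            if (!isQuarantined(taskManagerId, now)) {
                allocatable.add(taskManagerId);
            }
        }
        return allocatable;
    }
}
```

In this sketch the filter only narrows the set of candidates and leaves the choice among the remaining ones untouched, which is how I read principle 2. Whether expiry should be lazy (as here) or driven by a timer is an open design question.
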
*Possible Architecture Sketch*

Introduce a NodeHealthManager component within ResourceManager:

ResourceManager
└── NodeHealthManager
    └── Slot filtering hook in SlotManager

*Phase 1 (Minimal Scope)*:

- Manual quarantine via REST API.
- Slot filtering based on node state.
- Configurable expiration time.
- Basic metrics exposure.

Future extensions (not in initial phase):

- Failure-count-based triggering.
- Heartbeat-based health scoring.
- Performance-based triggers.
- Integration with AdaptiveScheduler.

*Discussion Points*

1. Should node-level health management be part of the Flink runtime, or remain entirely external (e.g., K8s/YARN responsibility)?
2. Is ResourceManager the correct integration point?
3. Would a pluggable health policy abstraction be acceptable?
4. Are there existing efforts or related FLIPs that this proposal should be aligned with?

*Backward Compatibility*

The feature would:

- Be disabled by default.
- Be transparent to existing deployments.
- Make no changes to job semantics.

I would appreciate feedback on whether this direction aligns with the community’s design philosophy, and whether drafting a formal FLIP would be appropriate.

Looking forward to your thoughts.

Best regards,
Feat Zhang

FLIP-XXX: Node Health Management and Quarantine Framework <https://drive.google.com/open?id=1-kRk01swcBaw6saBNJqoCVxD4vWfXiez-FaW_Oknz5c>
