Hi all,

I would like to start a discussion about introducing a Node Health
Management and Quarantine framework at the ResourceManager level in
Flink.

*Background*

Currently, Flink provides a blocklist mechanism (FLIP-224
<https://cwiki.apache.org/confluence/display/FLINK/FLIP-224%3A+Blocklist+Mechanism>),
which was primarily designed for speculative execution in batch jobs.
While this works well for mitigating slow nodes in specific scenarios,
it does not provide a general-purpose node-level health management
mechanism.

In production environments, we have observed the following recurring issues:

1. TaskManagers that repeatedly fail due to hardware or JVM-level
   instability.
2. Nodes experiencing transient performance degradation (e.g., GC
   stalls, I/O issues).
3. Heartbeat-flapping nodes that repeatedly rejoin and disrupt
   scheduling stability.
4. The lack of a centralized mechanism to temporarily isolate
   problematic nodes from scheduling decisions.

At present, these scenarios are typically handled externally (e.g.,
Kubernetes taints or YARN node controls) or indirectly through repeated
task failures. However, Flink itself does not provide a runtime-level
node quarantine capability.

*Problem Statement*

There is currently no cluster-level, scheduling-aware mechanism in Flink to:

- Mark a TaskManager as unhealthy.
- Temporarily exclude it from slot allocation.
- Automatically restore it after a configurable expiration period.
- Provide visibility via metrics or REST API.

This makes node-level fault governance reactive rather than proactive.

*Proposal (High-Level)*

The goal of this discussion is not to introduce a simple "blacklist",
but to explore a lightweight and extensible Node Health Management
layer integrated at the ResourceManager level.

Key design principles:

1. ResourceManager-level integration (slot filtering before allocation).
2. No modification to existing scheduling algorithms.
3. Fully backward compatible and disabled by default.
4. Policy-driven and pluggable (manual + automatic triggers).
5. Minimal surface area for initial phase.
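
To make principle 4 a bit more concrete, here is a rough Java sketch of
what a pluggable policy abstraction could look like. All names
(NodeEvent, NodeHealthPolicy, ManualQuarantinePolicy, CompositePolicy)
are hypothetical and not existing Flink APIs. Taking the longest
requested quarantine when several policies fire is just one possible
choice.

```java
import java.time.Duration;
import java.util.List;
import java.util.Optional;

// Hypothetical event fed to health policies; not an existing Flink type.
record NodeEvent(String taskManagerId, String type) {}

// Hypothetical pluggable policy: inspects an event and may request a quarantine.
interface NodeHealthPolicy {
    Optional<Duration> evaluate(NodeEvent event);
}

// Manual trigger: an operator request (e.g. from a REST call) always quarantines.
final class ManualQuarantinePolicy implements NodeHealthPolicy {
    private final Duration duration;

    ManualQuarantinePolicy(Duration duration) {
        this.duration = duration;
    }

    @Override
    public Optional<Duration> evaluate(NodeEvent event) {
        return "MANUAL_QUARANTINE".equals(event.type())
                ? Optional.of(duration)
                : Optional.empty();
    }
}

// Asks every configured policy and keeps the longest requested quarantine, if any.
final class CompositePolicy implements NodeHealthPolicy {
    private final List<NodeHealthPolicy> policies;

    CompositePolicy(List<NodeHealthPolicy> policies) {
        this.policies = List.copyOf(policies);
    }

    @Override
    public Optional<Duration> evaluate(NodeEvent event) {
        return policies.stream()
                .map(p -> p.evaluate(event))
                .flatMap(Optional::stream)
                .max(Duration::compareTo);
    }
}

final class PolicyDemo {
    public static void main(String[] args) {
        NodeHealthPolicy policy = new CompositePolicy(List.of(
                new ManualQuarantinePolicy(Duration.ofMinutes(10)),
                new ManualQuarantinePolicy(Duration.ofMinutes(30))));
        System.out.println(policy.evaluate(new NodeEvent("tm-1", "MANUAL_QUARANTINE")));
    }
}
```

Automatic triggers from the "future extensions" list would then just be
additional NodeHealthPolicy implementations.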

*Possible Architecture Sketch*

Introduce a NodeHealthManager component within the ResourceManager:

    ResourceManager
        └── NodeHealthManager
                └── Slot filtering hook in SlotManager
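
A minimal sketch of the slot filtering hook, assuming a simplified
FreeSlot type and a hypothetical SlotFilter interface (Flink's actual
SlotManager internals look different). The idea is that candidate slots
are filtered before the existing allocation strategy runs, and that the
default filter lets every slot through, so the feature is a no-op when
disabled.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical, simplified view of a free slot; not Flink's real slot type.
record FreeSlot(String slotId, String taskManagerId) {}

// Hypothetical hook the SlotManager would consult before matching requests to slots.
interface SlotFilter extends Predicate<FreeSlot> {
    // Feature disabled (the default): every slot stays eligible.
    SlotFilter ALLOW_ALL = slot -> true;

    // Feature enabled: drop slots hosted on quarantined TaskManagers.
    static SlotFilter excluding(Set<String> quarantinedTaskManagers) {
        return slot -> !quarantinedTaskManagers.contains(slot.taskManagerId());
    }
}

final class SlotSelection {
    // Filtering happens before the existing allocation strategy sees the
    // candidates, so the strategy itself does not need to change.
    static List<FreeSlot> eligible(List<FreeSlot> freeSlots, SlotFilter filter) {
        return freeSlots.stream().filter(filter).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FreeSlot> slots = List.of(new FreeSlot("s1", "tm-1"), new FreeSlot("s2", "tm-2"));
        System.out.println(eligible(slots, SlotFilter.ALLOW_ALL));
        System.out.println(eligible(slots, SlotFilter.excluding(Set.of("tm-1"))));
    }
}
```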

*Phase 1 (Minimal Scope)*:
- Manual quarantine via REST API.
- Slot filtering based on node state.
- Configurable expiration time.
- Basic metrics exposure.
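
For the manual quarantine, expiration, and metrics parts of Phase 1, the
bookkeeping could be as simple as a map from TaskManager ID to
expiration time. The sketch below is hypothetical (QuarantineRegistry is
not an existing Flink class); it assumes a single configurable timeout,
an injectable time source for testing, and lazy removal of expired
entries so that nodes are restored automatically.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical registry that a manual-quarantine REST handler could write to.
final class QuarantineRegistry {
    private final Map<String, Instant> expiresAt = new ConcurrentHashMap<>();
    private final Supplier<Instant> now; // injectable time source
    private final Duration timeout;      // the configurable expiration period

    QuarantineRegistry(Supplier<Instant> now, Duration timeout) {
        this.now = now;
        this.timeout = timeout;
    }

    // A repeated request simply extends the quarantine from the current time.
    void quarantine(String taskManagerId) {
        expiresAt.put(taskManagerId, now.get().plus(timeout));
    }

    // Hypothetical early release, e.g. by an operator.
    void release(String taskManagerId) {
        expiresAt.remove(taskManagerId);
    }

    // Expired entries are dropped on read, which restores the node automatically.
    boolean isQuarantined(String taskManagerId) {
        Instant deadline = expiresAt.get(taskManagerId);
        if (deadline == null) {
            return false;
        }
        if (!now.get().isBefore(deadline)) {
            expiresAt.remove(taskManagerId, deadline); // keep a newer re-quarantine
            return false;
        }
        return true;
    }

    // Basic metric: number of currently quarantined nodes (could back a gauge).
    int quarantinedCount() {
        Instant current = now.get();
        expiresAt.values().removeIf(deadline -> !current.isBefore(deadline));
        return expiresAt.size();
    }
}

final class QuarantineRegistryDemo {
    public static void main(String[] args) {
        AtomicReference<Instant> time = new AtomicReference<>(Instant.parse("2024-01-01T00:00:00Z"));
        QuarantineRegistry registry = new QuarantineRegistry(time::get, Duration.ofMinutes(10));
        registry.quarantine("tm-1");
        System.out.println(registry.isQuarantined("tm-1"));
        time.set(time.get().plus(Duration.ofMinutes(10)));
        System.out.println(registry.isQuarantined("tm-1"));
    }
}
```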

Future extensions (not in initial phase):
- Failure-count-based triggering.
- Heartbeat-based health scoring.
- Performance-based triggers.
- Integration with AdaptiveScheduler.
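
As an illustration of failure-count-based triggering, one option is a
per-TaskManager sliding window: quarantine once a node reports at least
N failures within a time window. This is a hypothetical sketch; the
class name, threshold, and window are assumptions, and it assumes
single-threaded access (e.g. from the ResourceManager's main thread) and
per-node timestamps arriving in non-decreasing order.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical trigger: fires when `threshold` failures fall within `window`.
final class FailureCountTrigger {
    private final int threshold;
    private final Duration window;
    private final Map<String, Deque<Instant>> failures = new HashMap<>();

    FailureCountTrigger(int threshold, Duration window) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
        this.window = window;
    }

    // Records a failure and returns true if the node should now be quarantined.
    boolean recordFailure(String taskManagerId, Instant at) {
        Deque<Instant> recent = failures.computeIfAbsent(taskManagerId, id -> new ArrayDeque<>());
        recent.addLast(at);
        // Evict failures that are older than the sliding window.
        Instant cutoff = at.minus(window);
        while (!recent.isEmpty() && !recent.peekFirst().isAfter(cutoff)) {
            recent.removeFirst();
        }
        if (recent.size() >= threshold) {
            recent.clear(); // start counting afresh after triggering
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        FailureCountTrigger trigger = new FailureCountTrigger(3, Duration.ofMinutes(5));
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        for (int i = 0; i < 3; i++) {
            System.out.println(trigger.recordFailure("tm-1", t0.plusSeconds(60L * i)));
        }
    }
}
```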

*Discussion Points*

1. Should node-level health management be part of the Flink runtime,
   or remain entirely external (e.g., a Kubernetes/YARN responsibility)?
2. Is ResourceManager the correct integration point?
3. Would a pluggable health policy abstraction be acceptable?
4. Are there existing efforts or related FLIPs (e.g., FLIP-224) that
   this should align with?

*Backward Compatibility*

The feature would:

- Be disabled by default.
- Be transparent to existing deployments.
- Leave job semantics unchanged.

I would appreciate feedback on whether this direction aligns with the
community’s design philosophy, and whether drafting a formal FLIP
would be appropriate.

Looking forward to your thoughts.

Best regards,
Feat Zhang

FLIP-XXX: Node Health Management and Quarantine Framework
<https://drive.google.com/open?id=1-kRk01swcBaw6saBNJqoCVxD4vWfXiez-FaW_Oknz5c>