featzhang created FLINK-39176:
---------------------------------

             Summary: Introduce Node Health Management & Quarantine Framework 
for ResourceManager
                 Key: FLINK-39176
                 URL: https://issues.apache.org/jira/browse/FLINK-39176
             Project: Flink
          Issue Type: New Feature
          Components: Runtime / Configuration, Runtime / Coordination
            Reporter: featzhang


What is the purpose of the change

Currently, Apache Flink lacks a comprehensive, cluster-level node health 
management mechanism. While FLIP-224 introduces a blocklist for speculative 
execution in batch jobs, it has several limitations:

* Not applicable to streaming jobs
* Not a general-purpose faulty node governance mechanism  
* No complete Web UI integration
* No full recovery support after RM/JM failover
* Not a cluster-level resource scheduling quarantine mechanism

This improvement introduces a Node Health Management & Quarantine Framework at 
the ResourceManager level to provide manual node quarantine capabilities with 
automatic slot filtering.

Brief change log

* Introduce NodeHealthManager abstraction with pluggable implementations
* Integrate with SlotManager for filtering quarantined nodes during slot 
allocation
* Add REST API endpoints for manual node quarantine management 
{code}
POST   /cluster/nodes/{nodeId}/quarantine
GET    /cluster/nodes/quarantine
DELETE /cluster/nodes/{nodeId}/quarantine
{code}
* Add configuration options (cluster.node-health.enabled, 
cluster.node-health.default-duration, cluster.node-health.max-entries)
* Add cleanup scheduler for expired quarantine entries
* Add comprehensive unit and integration tests
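
The cleanup of expired quarantine entries mentioned in the change log could look roughly like the sketch below. This is not the actual patch: the class name QuarantineCleanup, its methods, and the choice of a ScheduledExecutorService are assumptions made for illustration. The removal logic takes the current time as a parameter so that it can be tested without waiting on a real clock.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not from the patch): tracks quarantine expiry per node ID. */
final class QuarantineCleanup {
    private final Map<String, Instant> expiryByNode = new ConcurrentHashMap<>();

    void put(String nodeId, Instant expiry) {
        expiryByNode.put(nodeId, expiry);
    }

    int size() {
        return expiryByNode.size();
    }

    /** Removes every entry whose expiry is at or before {@code now}; returns the number removed. */
    int removeExpired(Instant now) {
        int removed = 0;
        for (Map.Entry<String, Instant> entry : expiryByNode.entrySet()) {
            boolean expired = !entry.getValue().isAfter(now);
            // remove(key, value) only succeeds if the entry was not re-quarantined meanwhile.
            if (expired && expiryByNode.remove(entry.getKey(), entry.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    /** Runs removeExpired periodically on the given executor (interval is an assumed parameter). */
    ScheduledFuture<?> schedule(ScheduledExecutorService executor, Duration interval) {
        long millis = interval.toMillis();
        return executor.scheduleWithFixedDelay(
                () -> removeExpired(Instant.now()), millis, millis, TimeUnit.MILLISECONDS);
    }
}
```

Using the conditional remove(key, value) keeps a node quarantined if it is quarantined again with a later expiry while the cleanup pass is running.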

Verifying this change

This change can be verified by:
* Unit tests for NodeHealthManager implementations
* Integration test NodeQuarantineSlotFilteringITCase verifying end-to-end slot 
filtering
* Manual testing of REST API endpoints
* Configuration validation tests

Does this pull request potentially affect one of the following parts:

* Dependencies (does it add or upgrade a dependency): No
* The public API, i.e., is any changed class annotated with @Public(Evolving): 
No
* The serializers: No
* The runtime per-record code paths: No
* Anything that affects deployment or recovery: Yes (ResourceManager level 
changes)
* The S3 file system connector: No

Documentation

* Does this pull request introduce a new feature? Yes
* If yes, how is the feature documented? JavaDocs, Configuration docs

Implementation Details

Architecture:
{code}
NodeHealthManager (Interface)
├── DefaultNodeHealthManager (ConcurrentHashMap-based)
└── NoOpNodeHealthManager (Disabled state)

Integration Point:
SlotManager.allocateSlots() → filter quarantined nodes
{code}
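
To make the architecture above more concrete, here is a minimal sketch of how the interface, its two implementations, and the slot filtering step might fit together. Only the three type names come from this issue; the method names, the use of a node ID string, the explicit `now` parameter, and the SlotFilter helper are assumptions for illustration and are not taken from the actual patch.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Assumed shape of the interface; the method names are hypothetical. */
interface NodeHealthManager {
    void quarantine(String nodeId, Duration duration, Instant now);

    void release(String nodeId);

    boolean isQuarantined(String nodeId, Instant now);
}

/** ConcurrentHashMap-based variant: maps a node ID to the time its quarantine ends. */
final class DefaultNodeHealthManager implements NodeHealthManager {
    private final Map<String, Instant> expiryByNode = new ConcurrentHashMap<>();

    @Override
    public void quarantine(String nodeId, Duration duration, Instant now) {
        expiryByNode.put(nodeId, now.plus(duration));
    }

    @Override
    public void release(String nodeId) {
        expiryByNode.remove(nodeId);
    }

    @Override
    public boolean isQuarantined(String nodeId, Instant now) {
        Instant expiry = expiryByNode.get(nodeId);
        return expiry != null && now.isBefore(expiry);
    }
}

/** Disabled state: nothing is ever quarantined, so filtering leaves candidates unchanged. */
final class NoOpNodeHealthManager implements NodeHealthManager {
    @Override
    public void quarantine(String nodeId, Duration duration, Instant now) {}

    @Override
    public void release(String nodeId) {}

    @Override
    public boolean isQuarantined(String nodeId, Instant now) {
        return false;
    }
}

/** Hypothetical stand-in for the filtering done at the slot allocation integration point. */
final class SlotFilter {
    private SlotFilter() {}

    static List<String> eligibleNodes(
            List<String> candidates, NodeHealthManager health, Instant now) {
        return candidates.stream()
                .filter(nodeId -> !health.isQuarantined(nodeId, now))
                .collect(Collectors.toList());
    }
}
```

With the no-op implementation selected while the feature is disabled, the filter returns every candidate node, which is one way to keep the disabled path cheap.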

Key Classes:
* org.apache.flink.runtime.resourcemanager.health.NodeHealthManager
* org.apache.flink.runtime.resourcemanager.health.NodeHealthStatus
* org.apache.flink.runtime.rest.handler.cluster.NodeQuarantineHandler

Configuration Options:
{code}
cluster.node-health.enabled: false (default)
cluster.node-health.default-duration: 10min
cluster.node-health.max-entries: 1000
{code}
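
As a rough illustration of how these three options could be read and validated, the sketch below parses them from a plain key/value map. It deliberately does not use Flink's ConfigOption API. The class NodeHealthOptions is hypothetical, the values listed above are assumed to be the defaults, and the duration parser only accepts the "<n>min" form shown in the listing.

```java
import java.time.Duration;
import java.util.Map;

/** Hypothetical holder for the three options; not part of the actual patch. */
final class NodeHealthOptions {
    final boolean enabled;
    final Duration defaultDuration;
    final int maxEntries;

    private NodeHealthOptions(boolean enabled, Duration defaultDuration, int maxEntries) {
        this.enabled = enabled;
        this.defaultDuration = defaultDuration;
        this.maxEntries = maxEntries;
    }

    /** Reads the options from a key/value map, falling back to the assumed defaults. */
    static NodeHealthOptions fromMap(Map<String, String> conf) {
        boolean enabled =
                Boolean.parseBoolean(conf.getOrDefault("cluster.node-health.enabled", "false"));
        Duration duration =
                parseMinutes(conf.getOrDefault("cluster.node-health.default-duration", "10min"));
        int maxEntries =
                Integer.parseInt(conf.getOrDefault("cluster.node-health.max-entries", "1000"));
        if (maxEntries <= 0) {
            throw new IllegalArgumentException("max-entries must be positive: " + maxEntries);
        }
        return new NodeHealthOptions(enabled, duration, maxEntries);
    }

    /** Simplified parser: accepts only values of the form "<n>min". */
    static Duration parseMinutes(String value) {
        String trimmed = value.trim();
        if (!trimmed.endsWith("min")) {
            throw new IllegalArgumentException("expected <n>min, got: " + value);
        }
        String number = trimmed.substring(0, trimmed.length() - "min".length()).trim();
        return Duration.ofMinutes(Long.parseLong(number));
    }
}
```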

REST API Endpoints:
{code}
POST   /cluster/nodes/{nodeId}/quarantine
GET    /cluster/nodes/quarantine
DELETE /cluster/nodes/{nodeId}/quarantine
{code}
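
For manual testing, requests against these endpoints could be built with the JDK HTTP client as sketched below. The paths are the ones listed above. The base address http://localhost:8081, the helper class QuarantineRequests, and the empty POST body are assumptions, since this issue does not specify the request or response schema. The node ID is assumed to be URL-safe.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper that only builds the requests; sending them is left to the caller. */
final class QuarantineRequests {
    // Assumed REST address of the cluster; adjust to the actual deployment.
    private static final String BASE = "http://localhost:8081";

    private QuarantineRequests() {}

    /** POST /cluster/nodes/{nodeId}/quarantine (body schema not specified, so none is sent). */
    static HttpRequest quarantine(String nodeId) {
        return HttpRequest.newBuilder(nodeUri(nodeId))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    /** GET /cluster/nodes/quarantine */
    static HttpRequest list() {
        return HttpRequest.newBuilder(URI.create(BASE + "/cluster/nodes/quarantine"))
                .GET()
                .build();
    }

    /** DELETE /cluster/nodes/{nodeId}/quarantine */
    static HttpRequest release(String nodeId) {
        return HttpRequest.newBuilder(nodeUri(nodeId)).DELETE().build();
    }

    private static URI nodeUri(String nodeId) {
        return URI.create(BASE + "/cluster/nodes/" + nodeId + "/quarantine");
    }
}
```

A request built this way can be sent with java.net.http.HttpClient, for example HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).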

Acceptance Criteria:
* NodeHealthManager interface and implementations
* Integration with SlotManagerImpl for slot filtering
* REST API handlers with proper request/response bodies
* Configuration options with backward compatibility (disabled by default)
* Unit tests achieving >90% code coverage
* Integration test verifying end-to-end functionality
* Documentation updates for new configuration options
* No performance impact when feature is disabled

Related Issues:
* FLIP-224: Blocklist Mechanism (introduced to support FLIP-168, Speculative 
Execution for Batch Job)
* Future work: Automatic failure detection and health scoring



