Xintong Song created FLINK-19151:
------------------------------------
Summary: Flink does not normalize container resource with correct
configurations when Yarn FairScheduler is used
Key: FLINK-19151
URL: https://issues.apache.org/jira/browse/FLINK-19151
Project: Flink
Issue Type: Bug
Components: Deployment / YARN
Affects Versions: 1.11.2
Reporter: Xintong Song
h3. Problem
It is part of the Yarn protocol that requested container resources are
normalized before allocation. That means an allocated container may have
different (larger than or equal) resources compared to what was requested.
Currently, Flink matches the allocated containers to the original requests by
reading the Yarn configurations and calculating how the requested resources
should be normalized.
What has been overlooked is that Yarn's FairScheduler (and its subclass
SLSFairScheduler) overrides the normalization behavior. To be specific:
* By default, Yarn normalizes container resources to an integer multiple of
"yarn.scheduler.minimum-allocation-[mb|vcores]"
* FairScheduler normalizes container resources to an integer multiple of
"yarn.resource-types.[memory-mb|vcores].increment-allocation" (or the
deprecated keys "yarn.scheduler.increment-allocation-[mb|vcores]"), while
ensuring the resource is no less than
"yarn.scheduler.minimum-allocation-[mb|vcores]"
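The two normalization behaviors above can be sketched as plain arithmetic (a minimal illustration; the method names are made up and this is not actual Yarn code):

```java
public class NormalizationSketch {

    /** Default Yarn behavior: round up to an integer multiple of the minimum allocation. */
    static long normalizeDefault(long requested, long minAllocation) {
        return ((requested + minAllocation - 1) / minAllocation) * minAllocation;
    }

    /** FairScheduler behavior: round up to a multiple of the increment, but never below the minimum. */
    static long normalizeFair(long requested, long minAllocation, long increment) {
        long normalized = ((requested + increment - 1) / increment) * increment;
        return Math.max(normalized, minAllocation);
    }
}
```

For example, with a 1024 MB minimum and a 512 MB increment, a 1500 MB request becomes 2048 MB under the default scheduler but 1536 MB under FairScheduler, which is exactly the mismatch that breaks Flink's container matching.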
h3. Proposal for short term solution
To fix this problem, a quick and easy way is to also read the Yarn
configuration to learn which scheduler is used, and perform the normalization
calculations accordingly. This should be good enough to cover the behaviors of
all the schedulers that Yarn currently provides. The limitation is that Flink
will not be able to deal with custom Yarn schedulers which override the
normalization behavior.
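A rough sketch of this short-term fix, using a plain map in place of Yarn's Configuration object (the "yarn.resourcemanager.scheduler.class" and minimum/increment keys are real Yarn configuration keys; the surrounding structure is illustrative only):

```java
public class SchedulerAwareNormalizer {

    static final String SCHEDULER_KEY = "yarn.resourcemanager.scheduler.class";

    /** Normalize a memory request the same way the configured scheduler would. */
    static long normalizeMemory(java.util.Map<String, String> yarnConf, long requestedMb) {
        long minMb = Long.parseLong(
                yarnConf.getOrDefault("yarn.scheduler.minimum-allocation-mb", "1024"));
        String scheduler = yarnConf.getOrDefault(SCHEDULER_KEY, "");
        if (scheduler.contains("FairScheduler")) {
            // FairScheduler rounds up to the increment, but never below the minimum.
            long incMb = Long.parseLong(yarnConf.getOrDefault(
                    "yarn.scheduler.increment-allocation-mb", String.valueOf(minMb)));
            long normalized = ((requestedMb + incMb - 1) / incMb) * incMb;
            return Math.max(normalized, minMb);
        }
        // Default behavior: round up to a multiple of the minimum allocation.
        return ((requestedMb + minMb - 1) / minMb) * minMb;
    }
}
```

The same dispatch would apply to vcores with the corresponding keys.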
h3. Proposal for long term solution
For the long term, it would be good to use Yarn's
ContainerRequest#allocationRequestId to match the allocated containers with the
original requests, so that Flink no longer needs to understand how Yarn
normalizes container resources.
Yarn's ContainerRequest#allocationRequestId was introduced in Hadoop 2.9, while
at the moment Flink claims to be compatible with Hadoop 2.4+. Therefore, this
solution would not work right now.
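The id-based matching would look roughly like the following (a minimal sketch, assuming Hadoop >= 2.9; a String stands in for the real request/container types, and the class and method names are hypothetical):

```java
public class IdBasedMatcher {
    private long nextId = 0;
    private final java.util.Map<Long, String> pendingRequests = new java.util.HashMap<>();

    /** Register a request; the returned id would be set on ContainerRequest#allocationRequestId. */
    long addRequest(String requestedResourceProfile) {
        long id = nextId++;
        pendingRequests.put(id, requestedResourceProfile);
        return id;
    }

    /** Match an allocated container back to its original request purely by id. */
    String matchAllocated(long allocationRequestId) {
        return pendingRequests.remove(allocationRequestId);
    }
}
```

Because allocated containers carry the id of the request they satisfy, no normalization arithmetic is needed at all.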
Another idea is to support various Hadoop versions with different container
matching logics. We can abstract the container matching logic into a
dedicated component and provide different implementations for it. This will
allow Flink to take advantage of the new versions (e.g., work well with custom
schedulers), while staying compatible with the old versions, without those
advantages.
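The abstraction could be as simple as a strategy interface selected by Hadoop version (all names here are hypothetical; this is only an illustration of the shape, not a proposed API):

```java
public class MatcherSelection {

    /** Pluggable container-matching strategy. */
    interface ContainerMatcher {
        String name();
    }

    /** Matches by recomputing Yarn's normalization (works on Hadoop 2.4+). */
    static class ResourceMatcher implements ContainerMatcher {
        public String name() { return "resource-based"; }
    }

    /** Matches by allocationRequestId (requires Hadoop 2.9+). */
    static class IdMatcher implements ContainerMatcher {
        public String name() { return "id-based"; }
    }

    /** Pick the matching strategy based on the Hadoop major.minor version. */
    static ContainerMatcher forHadoopVersion(int major, int minor) {
        boolean supportsAllocationRequestId = major > 2 || (major == 2 && minor >= 9);
        return supportsAllocationRequestId ? new IdMatcher() : new ResourceMatcher();
    }
}
```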
Given that we need the resource-based matching anyway for the old Hadoop
versions, and the cost of maintaining two sets of matching logic, I tend to
consider this approach a back-up option to be worked on when we indeed see a
need for it.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)