Eric Nascimento created FLINK-38582:
---------------------------------------
Summary: DeltaGlobalCommitter stopped commit to delta log
Key: FLINK-38582
URL: https://issues.apache.org/jira/browse/FLINK-38582
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.20.3
Reporter: Eric Nascimento
Hello,
The _DeltaGlobalCommitter_ stopped committing new data to the table again,
mirroring an issue encountered previously.
However, this time the behavior is much harder to reproduce than the last
instance, where any scale-up triggered the fault. We believe that scaling up
the job was again the trigger, but we have not managed to reproduce it.
h3. The Problem
The global committer expects the number of committables to match exactly the
number of task managers, but this isn't true for our failing deployments. The
mismatch is stored in persistent state and survives restarts, preventing
recovery.
This suggests the failure is related to changing the number of task managers
and storing an inconsistent number of committables in state. The conditions
that cause this inconsistent state are unknown.
What we can say for sure is that our application is stuck on a specific
checkpoint that can't get past the _hasGloballyReceivedAll_ check, and our
_flink_taskmanager_job_task_operator_pendingCommittables_ metric is growing
non-stop.
*Key Configurations*
{code:java}
execution.checkpointing.unaligned.enabled: "true"
execution.checkpointing.interval: "300000"
execution.checkpointing.timeout: "45000"
upgradeMode: last-state
jobmanager.scheduler: adaptive
jobmanager.adaptive-scheduler.resource-wait-timeout: 10m
{code}
The issue stems from a condition check in
{*}CheckpointCommittableManagerImpl{*}. Commits only proceed when:
{noformat}
subtasksCommittableManagers.size() == numberOfSubtasks && all managers have received their data
{noformat}
For our affected deployments, these numbers don't match. The values, which come
from persisted state via *CheckpointSimpleVersionedSerializer*, are
_subtasksCommittableManagers = 1_ and _numberOfSubtasks = 2_.
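To make the mismatch concrete, here is a minimal sketch of the gating condition as we understand it. It is a simplified model, not Flink's actual implementation: the class name CommitGateSketch and the map-based representation of the per-subtask managers are our own assumptions, and only the shape of the check is taken from the condition quoted above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical model of the commit gate. Not Flink's actual code. */
public class CommitGateSketch {

    /**
     * Mirrors the quoted condition: one manager per expected subtask, and every
     * manager must have received all of its committables.
     *
     * @param receivedAllBySubtask subtask id mapped to "this manager has all its data"
     * @param numberOfSubtasks     expected subtask count (restored from state in our case)
     */
    public static boolean hasGloballyReceivedAll(
            Map<Integer, Boolean> receivedAllBySubtask, int numberOfSubtasks) {
        return receivedAllBySubtask.size() == numberOfSubtasks
                && receivedAllBySubtask.values().stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        // Values we observed: one manager in state, two subtasks expected.
        Map<Integer, Boolean> restored = new LinkedHashMap<>();
        restored.put(0, true);
        System.out.println(hasGloballyReceivedAll(restored, 2)); // false

        // A consistent state with the same expected count would pass the gate.
        restored.put(1, true);
        System.out.println(hasGloballyReceivedAll(restored, 2)); // true
    }
}
```

In this model, a state restored with one manager and an expected count of two can never pass the gate, no matter how many times the job restarts, because no second manager ever arrives to satisfy the size check. That matches the stuck checkpoint we observe.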
h3. Observed Behavior
* GlobalCommitterOperator is instantiated but commits never execute due to
failing conditional checks ({*}hasGloballyReceivedAll{*})
* Parquet files are written successfully (local writes work)
* Files are never committed to the Delta table (global commit fails)
* Not all deployments had the issue; only 2 of 20 entered this state.
* When the issue occurred, deployments were unstable:
** Frequent restarts due to resource issues
h3. What we've ruled out
* Wrong Flink version/Docker image (verified 1.20.3)
* Flink library conflicts in shadowJar (excluded)
* Checkpoint failures as root cause (some expired before completion, but we
see no evidence that this is the cause).
h3. How to reproduce
Sadly, we don't know how. We tried multiple things, but we were unable to get
the application to enter this state again.
We are more than willing and available to try more, but we might need some
advice on what we can do to reproduce this.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)