Eric Nascimento created FLINK-38582:
---------------------------------------

             Summary: DeltaGlobalCommitter stopped committing to delta log
                 Key: FLINK-38582
                 URL: https://issues.apache.org/jira/browse/FLINK-38582
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Task
    Affects Versions: 1.20.3
            Reporter: Eric Nascimento


Hello,

The _DeltaGlobalCommitter_ stopped committing new data to the table again, 
mirroring an issue encountered previously.

However, this occurrence is significantly harder to reproduce than the previous 
one, where any scale-up triggered the fault. We believe scaling up the job was 
again the trigger condition, but we have not managed to reproduce it.
h3. The Problem

The global committer expects the number of committables to match exactly the 
number of task managers, but this isn't true for our failing deployments. The 
mismatch is stored in persistent state and survives restarts, preventing 
recovery.

This suggests the failure is related to changing the number of task managers 
and storing an inconsistent number of committables in state. The conditions 
that cause this inconsistent state are unknown.

What we can say for sure is that our application is stuck at a specific 
checkpoint that can't get past the *hasGloballyReceivedAll* check, and our 
{{flink_taskmanager_job_task_operator_pendingCommittables}} metric is growing 
non-stop.

*Key Configurations*
{code:yaml}
execution.checkpointing.unaligned.enabled: "true"
execution.checkpointing.interval: "300000"
execution.checkpointing.timeout: "45000"
upgradeMode: last-state
jobmanager.scheduler: adaptive
jobmanager.adaptive-scheduler.resource-wait-timeout: 10m 
{code}
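For clarity, the two checkpointing values above are plain millisecond counts. A 
roughly equivalent programmatic setup (a sketch only, not taken from our job) 
would be:
{code:java}
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSettingsSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // execution.checkpointing.interval: "300000" -> checkpoint every 5 minutes
        env.enableCheckpointing(300_000L);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        // execution.checkpointing.timeout: "45000" -> checkpoints expire after 45 seconds
        checkpointConfig.setCheckpointTimeout(45_000L);
        // execution.checkpointing.unaligned.enabled: "true"
        checkpointConfig.enableUnalignedCheckpoints(true);
    }
}
{code}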
 
----

The issue stems from a condition check in 
*CheckpointCommittableManagerImpl*. Commits only proceed when:
 
{noformat}
subtasksCommittableManagers.size() == numberOfSubtasks
    && all subtask managers have received their data
{noformat}
 
For our affected deployments, these numbers don't match. The values restored 
from persisted state via *CheckpointSimpleVersionedSerializer* are 
_subtasksCommittableManagers = 1_ and _numberOfSubtasks = 2_.
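
To make the failure mode concrete, here is a minimal, self-contained sketch of 
the gate as we understand it (our own simplified model, not the actual 
*CheckpointCommittableManagerImpl* code):
{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified model of the commit gate, NOT the actual Flink class.
// Key: subtask id, value: whether that subtask's committable manager has received everything.
public class StuckGateSketch {

    static boolean hasGloballyReceivedAll(Map<Integer, Boolean> subtasksCommittableManagers,
                                          int numberOfSubtasks) {
        return subtasksCommittableManagers.size() == numberOfSubtasks
                && subtasksCommittableManagers.values().stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        // Mirrors what we see restored from state: only one manager, but numberOfSubtasks = 2.
        Map<Integer, Boolean> restored = new HashMap<>();
        restored.put(0, true);

        // Always prints false -> the global commit to the Delta log is never attempted,
        // and pendingCommittables keeps growing.
        System.out.println(hasGloballyReceivedAll(restored, 2));
    }
}
{code}
With only one subtask manager restored but _numberOfSubtasks = 2_, the size 
check can never pass, so the commit never runs regardless of how many new 
committables arrive.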
h3. Observed Behavior
 * GlobalCommitterOperator is instantiated, but commits never execute because 
the conditional check (*hasGloballyReceivedAll*) keeps failing
 * Parquet files are written successfully (local writes work)
 * Files are never committed to the Delta table (global commit fails)
 * Not all deployments were affected; only 2 out of 20 entered this state
 * When the issue occurred, the affected deployments were unstable:
 ** Frequent restarts due to resource issues

h3. What we've ruled out
 * Wrong Flink version/Docker image (verified 1.20.3)
 * Flink library conflicts in shadowJar (excluded)
 * Checkpoint failures as the root cause (some checkpoints expired before 
completion, but we see no evidence that this is the issue)

h3. How to reproduce

Sadly, we don't know how. We tried multiple things, but we were unable to get 
the application to enter this state again.

We are more than willing to try more, but we may need some advice on what we 
can do to reproduce this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
