Hi all,

When running the batch WordCount example, I configured the job execution mode
as BATCH_FORCED and the failover strategy as region, and manually injected
errors to make the execution fail in different phases. In some cases, the job
recovered from the failover and succeeded; in other cases, the job retried
several times and then failed.

Example:
- If the failure occurred before the task read any data, e.g., a failure
  before invokable.invoke() in Task.java, failover succeeded.
- If the failure occurred after the task had read data, failover did not
  work.

Problem diagnosis:
When running the example described above, each ExecutionVertex forms its own
restart region, and the ResultPartitionType between executions is BLOCKING.
Thus, SpillableSubpartition and SpillableSubpartitionView are used to
write/read shuffle data. Data blocks are represented as BufferConsumers stored
in a list called buffers, and when a task requests input data from
SpillableSubpartitionView, the BufferConsumers are REMOVED from buffers. As a
result, when a failure occurs after data has been read, some BufferConsumers
have already been released, so even though the task is retried, its input data
is incomplete.
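
To make the consume-on-read problem concrete, here is a minimal,
self-contained Java model. It is only a sketch under simplifying assumptions,
not Flink code: ConsumingSubpartition, ConsumingView, runAttempt and
retryAfterPartialRead are hypothetical stand-ins for SpillableSubpartition,
SpillableSubpartitionView and a task attempt, and plain strings stand in for
BufferConsumers. Every view polls from the same shared list, so a restarted
attempt only sees what the failed attempt left behind.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the behavior described above; these are
// NOT Flink's actual SpillableSubpartition / SpillableSubpartitionView classes.
final class ConsumeOnReadDemo {

    /** Stand-in for a blocking subpartition that stores finished buffers in a list. */
    static final class ConsumingSubpartition {
        private final Deque<String> buffers = new ArrayDeque<>();

        void add(String buffer) {
            buffers.add(buffer);
        }

        /** Every view reads from the same shared list, so reading removes data. */
        ConsumingView createView() {
            return new ConsumingView(buffers);
        }
    }

    static final class ConsumingView {
        private final Deque<String> buffers;

        ConsumingView(Deque<String> buffers) {
            this.buffers = buffers;
        }

        /** Returns the next buffer, or null when none is left; the buffer is removed. */
        String pollNext() {
            return buffers.poll();
        }
    }

    /** Reads up to maxBuffers buffers through a fresh view (one execution attempt). */
    static List<String> runAttempt(ConsumingSubpartition partition, int maxBuffers) {
        ConsumingView view = partition.createView();
        List<String> read = new ArrayList<>();
        String next;
        while (read.size() < maxBuffers && (next = view.pollNext()) != null) {
            read.add(next);
        }
        return read;
    }

    /** Attempt 1 reads two buffers and then "fails"; attempt 2 reads whatever is left. */
    static List<String> retryAfterPartialRead() {
        ConsumingSubpartition partition = new ConsumingSubpartition();
        for (int i = 0; i < 4; i++) {
            partition.add("buffer-" + i);
        }
        runAttempt(partition, 2);                        // failure injected after this read
        return runAttempt(partition, Integer.MAX_VALUE); // restarted attempt
    }

    public static void main(String[] args) {
        // The restarted attempt sees only part of the input: prints [buffer-2, buffer-3]
        System.out.println(retryAfterPartialRead());
    }
}
```

A failure injected before the first read (the case where failover worked)
corresponds to calling runAttempt with maxBuffers = 0 first, which leaves the
list intact for the retry.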

Fix Proposal:
- BufferConsumers should not be removed from buffers until the consuming
  ExecutionVertex reaches a terminal state.
- SpillableSubpartition should not be released until the consuming
  ExecutionVertex reaches a terminal state.
- SpillableSubpartition could create multiple SpillableSubpartitionViews,
  each corresponding to an ExecutionAttempt.
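
The three proposed changes can be sketched in a small self-contained Java
model. This is an illustration under simplifying assumptions, not a proposed
patch: RetainingSubpartition, RetainingView, onConsumerTerminal and the integer
attempt number are hypothetical names, and plain strings stand in for
BufferConsumers. Reading advances a per-view index instead of removing
buffers, one view is created per attempt, and the data is dropped only when
the consumer is reported terminal.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the proposed behavior; the names are illustrative, NOT Flink APIs.
final class RetainOnReadDemo {

    /** Stand-in for a subpartition that keeps its buffers until the consumer is terminal. */
    static final class RetainingSubpartition {
        private final List<String> buffers = new ArrayList<>();
        // One view per execution attempt, keyed by a hypothetical attempt number.
        private final Map<Integer, RetainingView> viewsByAttempt = new HashMap<>();
        private boolean released;

        void add(String buffer) {
            buffers.add(buffer);
        }

        /** Creates a fresh view for an attempt; each view keeps its own read position. */
        RetainingView createView(int attemptNumber) {
            if (released) {
                throw new IllegalStateException("subpartition already released");
            }
            RetainingView view = new RetainingView(Collections.unmodifiableList(buffers));
            viewsByAttempt.put(attemptNumber, view);
            return view;
        }

        /** To be called only once the consuming ExecutionVertex is in a terminal state. */
        void onConsumerTerminal() {
            buffers.clear();
            viewsByAttempt.clear();
            released = true;
        }

        boolean isReleased() {
            return released;
        }

        int retainedBuffers() {
            return buffers.size();
        }
    }

    static final class RetainingView {
        private final List<String> buffers;
        private int readIndex;

        RetainingView(List<String> buffers) {
            this.buffers = buffers;
        }

        /** Returns the next buffer, or null at the end; the view never removes buffers. */
        String pollNext() {
            return readIndex < buffers.size() ? buffers.get(readIndex++) : null;
        }
    }

    /** Reads up to maxBuffers buffers through the view created for this attempt. */
    static List<String> runAttempt(RetainingSubpartition partition, int attempt, int maxBuffers) {
        RetainingView view = partition.createView(attempt);
        List<String> read = new ArrayList<>();
        String next;
        while (read.size() < maxBuffers && (next = view.pollNext()) != null) {
            read.add(next);
        }
        return read;
    }

    /** Attempt 0 reads two buffers and "fails"; attempt 1 starts over from the beginning. */
    static List<String> retryAfterPartialRead() {
        RetainingSubpartition partition = new RetainingSubpartition();
        for (int i = 0; i < 4; i++) {
            partition.add("buffer-" + i);
        }
        runAttempt(partition, 0, 2);                 // failure injected after this read
        List<String> result = runAttempt(partition, 1, Integer.MAX_VALUE);
        partition.onConsumerTerminal();              // consumer finished: now release
        return result;
    }

    public static void main(String[] args) {
        // prints [buffer-0, buffer-1, buffer-2, buffer-3]
        System.out.println(retryAfterPartialRead());
    }
}
```

The trade-off in this sketch is that buffers stay resident until the consumer
is terminal rather than being freed as soon as they are read, so the release
signal has to be reliable to avoid holding data indefinitely.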

Best,
Bo
