[ https://issues.apache.org/jira/browse/FLINK-21996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17318585#comment-17318585 ]
Kezhu Wang edited comment on FLINK-21996 at 4/10/21, 8:11 PM:
--------------------------------------------------------------
Is there any design guarantee for non-source operator coordinators in checkpointing, [~sewen]? I did not see one in the docs, nor an explicit exclusion. For a non-source operator coordinator, it is possible that events sent before a checkpoint arrive after the checkpoint even when nothing fails: a relatively long latency between sending and arrival can make the checkpoint barrier from upstream arrive earlier than the events. I constructed a [test case|https://github.com/kezhuw/flink/commit/b3aeb30298ace48c8ea11be488281d6eba8930b1#diff-7007c5266035493493e3a1476d98442bf50a7f023b801b48c6b6dc15436c9499R657] as a demonstration.

was (Author: kezhuw):
Is there any design guarantee for no source operator coordinator in [~sewen] ? I did not see one from docs, nor exclusion either. For no source operator coordinator, it is possible that events sending before checkpoint arrives after checkpoint in case of all good. A relative long latency between sending and arriving could make checkpoint from upstream arrives earlier than events. I constructed a [test case|https://github.com/kezhuw/flink/commit/b3aeb30298ace48c8ea11be488281d6eba8930b1#diff-7007c5266035493493e3a1476d98442bf50a7f023b801b48c6b6dc15436c9499R657] for demonstration.

> Transient RPC failure without TaskManager failure can lead to split assignment loss
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-21996
>                 URL: https://issues.apache.org/jira/browse/FLINK-21996
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>             Fix For: 1.13.0
>
>
> NOTE: This bug has not actually been observed. It is based on reviews of the current implementation. I would expect it to be a pretty rare case, but at scale, even the rare cases happen often enough.
>
> h2. Problem
>
> Intermediate RPC messages from JM to TM can get dropped, even when the TM is not marked as failed. That can happen when the connection is recovered before the heartbeat times out. RPCs therefore generally retry or handle failures: for example, the Deploy-Task RPC retries, and the Trigger-Checkpoint RPC aborts the checkpoint on failure and triggers a new checkpoint.
>
> The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a Future with the acknowledgement. But if that future fails, we are in the situation where we do not know whether the event was delivered successfully or not (perhaps only the ack failed).
>
> This is especially tricky for split assignments and checkpoints. Consider this sequence of actions:
> 1. The Coordinator assigns a split. The ack has not yet been received.
> 2. The Coordinator takes a checkpoint. The split was sent before the checkpoint, so it is not included on the Coordinator.
> 3. The split assignment RPC response is "failed".
> 4. The checkpoint completes.
> Now we don't know whether the split was in the checkpoint on the Operator (TaskManager) or not, and with that we don't know whether we should add it back to the coordinator. We need to do something to make sure the split ends up either on the coordinator or on the Operator. Currently, the split is implicitly assumed to be on the Operator; if it isn't, then that split is lost.
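To make the four-step sequence above concrete, here is a minimal, self-contained Java sketch of the race. It does not use Flink's actual OperatorCoordinator interfaces; the RpcGateway interface, the assignSplit/snapshotState methods, and the string-based split state are hypothetical stand-ins, and the CompletableFuture models only the acknowledgement of the "Send OperatorEvent" RPC.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SplitAssignmentRaceSketch {

    /** Hypothetical stand-in for the RPC layer: the ack completes (or fails) asynchronously. */
    interface RpcGateway {
        CompletableFuture<Void> sendOperatorEvent(String splitId);
    }

    /** Hypothetical coordinator that tracks only the splits it has not yet handed out. */
    static class Coordinator {
        private final RpcGateway gateway;
        private final List<String> unassignedSplits = new ArrayList<>();

        Coordinator(RpcGateway gateway, List<String> splits) {
            this.gateway = gateway;
            this.unassignedSplits.addAll(splits);
        }

        /** Step 1: assign a split; at this point the ack is only a future. */
        CompletableFuture<Void> assignSplit() {
            String split = unassignedSplits.remove(0);
            return gateway.sendOperatorEvent(split);
        }

        /** Step 2: the coordinator checkpoint contains only the still-unassigned splits. */
        List<String> snapshotState() {
            return new ArrayList<>(unassignedSplits);
        }
    }

    public static void main(String[] args) {
        // The transport drops the event; the ack future fails only later.
        CompletableFuture<Void> ack = new CompletableFuture<>();
        Coordinator coordinator = new Coordinator(split -> ack, List.of("split-0"));

        CompletableFuture<Void> pendingAck = coordinator.assignSplit();    // (1) split sent
        List<String> checkpointedSplits = coordinator.snapshotState();     // (2) snapshot is empty
        ack.completeExceptionally(new RuntimeException("RPC failed"));     // (3) ack fails
        // (4) If the checkpoint now completes, neither the coordinator snapshot nor
        // (possibly) the operator snapshot contains "split-0": the split is lost.
        System.out.println("coordinator snapshot: " + checkpointedSplits);
        System.out.println("assignment ack failed: " + pendingAck.isCompletedExceptionally());
    }
}
{code}

The point of the sketch is only that the snapshot taken in step (2) no longer contains the split, while the failed ack in step (3) leaves it unknown whether the operator's snapshot does.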
> Now, it is worth pointing out that this is a pretty rare situation, because it means that the RPC with the split assignment fails while the one for the checkpoint succeeds, even though they happen in close proximity. The way the Akka-based RPC transport works (with retries, etc.), this can happen, but it isn't very likely. That is why we haven't seen this bug in practice so far, and haven't gotten a report for it yet.
>
> h2. Proposed solution
>
> The solution has two components:
> 1. Fall back to a consistent point: if the system doesn't know whether two parts are still consistent with each other (here, the coordinator and the Operator), fall back to a consistent point. That is the case when the Ack-Future for the "Send Operator Event" RPC fails or times out. Then we call the scheduler to trigger a failover of the target operator to the latest checkpoint and signal the same to the coordinator. That restores consistency. We can later optimize this (see below).
> 2. We cannot trigger checkpoints while we are "in limbo" concerning our knowledge about splits. Concretely, that means the Coordinator can only acknowledge the checkpoint once the Acks for pending Operator Event RPCs (Assign-Splits) have arrived. The checkpoint future is conditional on all pending RPC futures. If the RPC futures fail (or time out), then the checkpoint cannot complete (and the target operator will go through a failover anyway). In the common case, the RPC round trip time is milliseconds, which would be added to the checkpoint latency if the checkpoint happens to overlap with a split assignment (most won't).
>
> h2. Possible Future Improvements
>
> Step (1) above can be optimized by first going with retries, using sequence numbers to deduplicate the calls. That can help reduce the number of cases where a failover is needed. However, the number of situations where the RPC would need a retry and has a chance of succeeding (the TM is not down) should be very few to begin with, so whether this optimization is worth it remains to be seen.
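As an illustration of component (2) of the proposed solution, here is a minimal Java sketch of making the coordinator's checkpoint future conditional on all pending event acknowledgements. It is not Flink's actual implementation; the class and method names, the byte[] coordinator state, and the omission of timeouts and of the failover wiring from component (1) are simplifications assumed for the sketch.

{code:java}
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class GatedCheckpointSketch {

    /** Acks for operator events that were sent but not yet acknowledged. */
    private final Set<CompletableFuture<Void>> pendingAcks = ConcurrentHashMap.newKeySet();

    /** Called whenever an event (e.g. a split assignment) is sent. */
    public void trackEvent(CompletableFuture<Void> ack) {
        pendingAcks.add(ack);
        ack.whenComplete((ok, err) -> pendingAcks.remove(ack));
    }

    /**
     * The checkpoint is acknowledged only after all currently pending acks have
     * completed; a failed ack fails the checkpoint (and, per component (1), the
     * target operator would be failed over to the latest checkpoint).
     */
    public CompletableFuture<byte[]> checkpoint(byte[] coordinatorState) {
        CompletableFuture<Void> gate =
                CompletableFuture.allOf(pendingAcks.toArray(new CompletableFuture[0]));
        return gate.thenApply(ignored -> coordinatorState);
    }

    public static void main(String[] args) {
        GatedCheckpointSketch coordinator = new GatedCheckpointSketch();

        CompletableFuture<Void> ack = new CompletableFuture<>();
        coordinator.trackEvent(ack);                       // split assignment in flight

        CompletableFuture<byte[]> cp = coordinator.checkpoint(new byte[0]);
        System.out.println("checkpoint done before ack? " + cp.isDone());   // false

        ack.complete(null);                                // ack arrives, checkpoint may complete
        System.out.println("checkpoint done after ack?  " + cp.isDone());   // true
    }
}
{code}

With gating of this kind, the scenario from the Problem section cannot acknowledge the coordinator's part of the checkpoint while a split assignment ack is still outstanding, so a failed ack fails the checkpoint instead of silently losing the split.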