[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230183#comment-17230183 ]
Roman Khachatryan commented on FLINK-20097:
-------------------------------------------

OK, the tests fail with increased checkpointing frequency plus a few other updated settings. I don't know whether the root cause is what I described above.

Changes:
{code:java}
long minCheckpoints = 100;
env.enableCheckpointing(10);
env.getCheckpointConfig().setAlignmentTimeout(0);
env.getCheckpointConfig().setCheckpointTimeout(10); // minimum allowed
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

// NUM_FAILURES can be lower because of the high TolerableCheckpointFailureNumber
// collector.checkThat(result.<Integer>getAccumulatorResult(NUM_FAILURES), equalTo(EXPECTED_FAILURES));

// in the sink:
if (random.nextInt(100) == 42) {
    Thread.sleep(7);
}
{code}

Failures:
shouldPerformUnalignedCheckpointOnParallelRemoteChannel: NUM_DUPLICATES > 0 (2 out of 70 runs)

I've also observed recovery problems (corrupted stream) similar to FLINK-19681, likely caused by corrupted InputChannel state.

> Race conditions in InputChannel.ChannelStatePersister (confirm)
> ---------------------------------------------------------------
>
>                 Key: FLINK-20097
>                 URL: https://issues.apache.org/jira/browse/FLINK-20097
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Network
>    Affects Versions: 1.12.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
> In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier()
> always update pendingCheckpointBarrierId, potentially overwriting a newer id
> (or the BARRIER_RECEIVED value) with an older one.
>
> For stopPersisting(), consider this case:
> # Two consecutive UC barriers arrive at the same channel (the 1st becoming
> stale at some point)
> # In RemoteInputChannel.onBuffer, the netty thread updates
> pendingCheckpointBarrierId to BARRIER_RECEIVED
> # The task thread processes the 1st barrier and triggers a checkpoint
> # The task thread processes the 2nd barrier and aborts the 1st checkpoint,
> calling stopPersisting() from the UC controller and setting
> pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
> # The task thread starts the 2nd checkpoint and calls startPersisting(),
> setting pendingCheckpointBarrierId to 2
> # Now new buffers have a chance to be included in the 2nd checkpoint (though
> they belong to the next one)
>
> For checkForBarrier(), consider an input gate with two channels A and B and
> two barriers 1 and 2:
> # Channel A receives both barriers; channel B receives nothing yet
> # The task thread processes both barriers on A, eventually triggering the
> 2nd checkpoint
> # Channel A's state is now BARRIER_RECEIVED; channel B's is pending (with
> id=2)
> # Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
> # No buffers in B between barriers 1 and 2 will be included in the
> checkpoint
> # Channel B receives the 2nd barrier, which will eventually conclude the
> checkpoint
>
> One solution is to act only if the passed checkpointId >=
> pendingCheckpointBarrierId. For that, a separate field will be needed to hold
> the status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe, so
> adding a field shouldn't be a problem.
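A minimal Java sketch of the proposed guard, modelling a single channel with a hypothetical, simplified class BarrierStatusTracker (this is not Flink's actual ChannelStatePersister). Buffers, the ChannelStateWriter and event parsing are left out, barriers are reduced to their checkpoint ids, and, like the original class, it does no synchronization of its own. The status lives in its own field, and each method acts only on an id that is not older than the last one seen; startPersisting() uses a strict > so that a channel that already received a barrier is not flipped back to PENDING for the same id.

{code:java}
import java.util.Optional;

// Hypothetical, simplified model of the guard proposed in FLINK-20097.
// NOT Flink's ChannelStatePersister: buffers, ChannelStateWriter and event
// parsing are omitted, and barriers are reduced to their checkpoint ids.
// Like the original class, it does no synchronization of its own.
class BarrierStatusTracker {

    // Status kept in its own field instead of being encoded in the id.
    enum Status { PENDING, RECEIVED, COMPLETED }

    private Status status = Status.COMPLETED;
    private long lastSeenId = -1L;

    // Task thread: a checkpoint started; persist in-flight buffers until its barrier arrives.
    void startPersisting(long checkpointId) {
        // Strictly newer ids only, so a channel that already received this
        // barrier is not flipped back to PENDING.
        if (checkpointId > lastSeenId) {
            status = Status.PENDING;
            lastSeenId = checkpointId;
        }
    }

    // Task thread: a checkpoint completed or was aborted.
    void stopPersisting(long checkpointId) {
        // Aborting an older checkpoint must not overwrite the state of a newer one.
        if (checkpointId >= lastSeenId) {
            status = Status.COMPLETED;
            lastSeenId = checkpointId;
        }
    }

    // Netty thread: a barrier with the given id arrived on this channel.
    Optional<Long> checkForBarrier(long barrierId) {
        if (barrierId >= lastSeenId) {
            status = Status.RECEIVED;
            lastSeenId = barrierId;
            return Optional.of(barrierId);
        }
        return Optional.empty(); // stale barrier: state left untouched
    }

    // True if a data buffer arriving now belongs to the pending checkpoint.
    boolean shouldPersist() {
        return status == Status.PENDING;
    }

    Status status() {
        return status;
    }

    public static void main(String[] args) {
        // Scenario 1: barriers 1 and 2 both reach the channel before the task reacts.
        BarrierStatusTracker ch = new BarrierStatusTracker();
        ch.checkForBarrier(1);
        ch.checkForBarrier(2);
        ch.startPersisting(1); // task triggers checkpoint 1
        ch.stopPersisting(1);  // task aborts checkpoint 1 after seeing barrier 2
        ch.startPersisting(2); // task starts checkpoint 2
        // prints "scenario 1, persist later buffers: false"
        System.out.println("scenario 1, persist later buffers: " + ch.shouldPersist());

        // Scenario 2: checkpoint 2 has started, so channel B is pending with id 2.
        BarrierStatusTracker b = new BarrierStatusTracker();
        b.startPersisting(2);
        boolean staleAccepted = b.checkForBarrier(1).isPresent();
        // prints "scenario 2, stale barrier accepted: false, status: PENDING"
        System.out.println("scenario 2, stale barrier accepted: " + staleAccepted
                + ", status: " + b.status());
    }
}
{code}

Replaying the two scenarios above: in the first, stopPersisting(1) and startPersisting(2) no longer overwrite the RECEIVED status set by barrier 2, so buffers that belong to the next checkpoint are not persisted; in the second, the stale barrier 1 is ignored and channel B keeps persisting until barrier 2 arrives.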