Hi,

Thomas:
There are no Jira tickets yet (or maybe there is something very old somewhere).
First we want to discuss it, then present a FLIP, and finally create tickets :)

> if I understand correctly, then the proposal is to not block any
> input channel at all, but only log data from the backpressured channel (and
> make it part of the snapshot) until the barrier arrives

I would guess that it would be better to block the reads, unless we can already 
process the records from the blocked channel…

Paris:

Thanks for the explanation, Paris. I’m starting to understand this better, and I
like the idea of snapshotting the state of an operator before receiving all of
the checkpoint barriers - this would allow more things to happen at the same
time instead of sequentially. As Zhijiang has pointed out, there are some things
not considered in your proposal, such as barriers overtaking the output
buffers, but maybe those could be incorporated together.

Another thing is that from the wiki description I understood that the
checkpoint is not initiated by a checkpoint barrier, but by an
independent call/message from the Observer. I haven’t played with this idea a
lot, but I had some discussion with Nico and it seems that it might work:

1. JobManager sends an RPC “start checkpoint” to all tasks
2. Task (with two input channels l1 and l2), upon receiving the RPC from 1., takes a
snapshot of its state and:
  a) broadcasts a checkpoint barrier downstream to all channels (let’s ignore
for a moment the potential for this barrier to overtake the buffered output data)
  b) for any input channel from which it hasn’t yet received a checkpoint barrier,
the data is added to the checkpoint
  c) once a channel (for example l1) receives a checkpoint barrier, the Task
blocks reads from that channel (?)
  d) after all remaining channels (l2) receive checkpoint barriers, the Task
first has to process the buffered data; after that it can unblock the reads from
the channels

Checkpoint barriers do not cascade/flow through different tasks here.
A checkpoint barrier emitted from Task1 reaches only the immediate downstream
Tasks. Thanks to this setup, the total checkpointing time is not the sum of
the checkpointing times of all Tasks one by one, but more or less the maximum
over them, i.e., the time of the slowest Task. Right?
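
To make steps 1-2d above concrete, here is a minimal Python sketch of a single
Task. It is only a toy model under my own assumptions, not Flink code: the
class and method names are hypothetical, the operator state is just a running
sum, and I assume that records arriving on a channel before its barrier are
logged into the checkpoint and held back (not processed) until the last barrier
has arrived. A barrier arriving before the RPC is not handled.

```python
class Task:
    """Toy model of steps 2a-2d. All names are hypothetical, not Flink APIs."""

    def __init__(self, channels):
        self.channels = set(channels)
        self.state = 0                  # toy operator state: a running sum
        self.checkpoint = None          # filled in by start_checkpoint()
        self.pending_barriers = set()   # channels whose barrier has not arrived yet
        self.buffered = []              # records logged but not yet processed
        self.downstream = []            # everything emitted to downstream tasks

    def start_checkpoint(self):
        """Step 2: triggered by the "start checkpoint" RPC, not by a barrier."""
        self.checkpoint = {"state": self.state, "in_flight": []}
        self.downstream.append("BARRIER")             # 2a: barrier goes downstream
        self.pending_barriers = set(self.channels)

    def can_read(self, channel):
        """2c: during a checkpoint, a channel that delivered its barrier is blocked."""
        return not self.pending_barriers or channel in self.pending_barriers

    def on_record(self, channel, value):
        if not self.can_read(channel):
            raise RuntimeError(f"channel {channel} is blocked")
        if self.pending_barriers:
            # 2b: pre-barrier data becomes part of the checkpoint and is held back
            self.checkpoint["in_flight"].append((channel, value))
            self.buffered.append(value)
        else:
            self.state += value                       # normal processing

    def on_barrier(self, channel):
        self.pending_barriers.discard(channel)
        if not self.pending_barriers:
            # 2d: process the buffered data first; reads are unblocked afterwards
            for value in self.buffered:
                self.state += value
            self.buffered.clear()


task = Task(["l1", "l2"])
task.on_record("l1", 1)       # processed before the checkpoint starts
task.start_checkpoint()       # RPC from the JobManager
task.on_record("l2", 10)      # logged into the checkpoint, not yet processed
task.on_barrier("l1")         # l1 is blocked from now on
task.on_record("l2", 20)      # still before l2's barrier, so logged as well
task.on_barrier("l2")         # last barrier: buffered data is processed

# Recovery check: snapshot state plus replayed in-flight records equals live state.
recovered = task.checkpoint["state"] + sum(v for _, v in task.checkpoint["in_flight"])
```

In this model the snapshot is taken as soon as the RPC arrives, so the Task
does not wait for barriers from further upstream before capturing its state.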

A couple of intriguing open questions:
 3. checkpoint barriers overtaking the output buffers
 4. can we keep processing some data (in order to not waste CPU cycles) after
we have taken the snapshot of the Task? I think we could.

Piotrek

> On 14 Aug 2019, at 06:00, Thomas Weise <t...@apache.org> wrote:
> 
> Great discussion! I'm excited that this is already under consideration! Are
> there any JIRAs or other traces of discussion to follow?
> 
> Paris, if I understand correctly, then the proposal is to not block any
> input channel at all, but only log data from the backpressured channel (and
> make it part of the snapshot) until the barrier arrives? This is
> intriguing. But there is probably also a benefit to not continuing to read
> I1, since that could speed up retrieval from I2. Also, if the user code is
> the cause of backpressure, this would avoid pumping more data into the
> process function.
> 
> Thanks,
> Thomas
> 
> 
> On Tue, Aug 13, 2019 at 8:02 AM zhijiang <wangzhijiang...@aliyun.com.invalid>
> wrote:
> 
>> Hi Paris,
>> 
>> Thanks for the detailed sharing. I think it is very similar to the
>> overtaking approach we proposed before.
>> 
>> There are some small differences:
>> The overtaking approach might need to snapshot all the input/output queues.
>> Chandy-Lamport seems to only need to snapshot (n-1) input channels after the
>> first barrier arrives, which might reduce the state size a bit. But normally
>> there should be fewer buffers in the first input channel with a barrier.
>> The output barrier still follows the regular data stream in Chandy-Lamport,
>> the same way as in current Flink. For the overtaking approach, we need to
>> make an extra effort to transport the barrier ahead of the output queue on
>> the upstream side, and to change barrier alignment to be based on receiving
>> instead of the current reading on the downstream side.
>> With backpressure caused by data skew, the first barrier, in an almost empty
>> input channel, should arrive much earlier than the one in the last, heavily
>> loaded input channel, so Chandy-Lamport could benefit well. But when all
>> input channels are evenly and heavily loaded, even the first barrier might
>> still take a long time to arrive, and then the overtaking approach could
>> still help to speed up the checkpoint.
>> Anyway, your suggestion is helpful to me, especially
>> considering some implementation details.
>> 
>> Best,
>> Zhijiang
>> ------------------------------------------------------------------
>> From:Paris Carbone <seniorcarb...@gmail.com>
>> Send Time: 13 Aug 2019 (Tuesday) 14:03
>> To:dev <dev@flink.apache.org>
>> Cc:zhijiang <wangzhijiang...@aliyun.com>
>> Subject:Re: Checkpointing under backpressure
>> 
>> Yes! It’s quite similar, I think. Though mind that the devil is in the
>> details, i.e., the temporal order in which actions are taken.
>> 
>> To clarify, let us say you have a task T with two input channels I1 and I2.
>> The Chandy Lamport execution flow is the following:
>> 
>> 1) T receives barrier from  I1 and...
>> 2)  ...the following three actions happen atomically
>> I )  T snapshots its state T*
>> II)  T forwards marker to its outputs
>> III) T starts logging all events of I2 (only) into a buffer M*
>> - Also notice here that T does NOT block I1 as it does in aligned
>> snapshots -
>> 3) Eventually T receives barrier from I2 and stops recording events. Its
>> asynchronously captured snapshot is now complete: {T*,M*}.
>> Upon recovery all messages of M* should be replayed in FIFO order.
>> 
>> With this approach alignment does not create a deadlock situation since
>> anyway 2.II happens asynchronously and messages can be logged as well
>> asynchronously during the process of the snapshot. If there is
>> back-pressure in a pipeline the cause is most probably not this algorithm.
>> 
>> Back to your observation, the answer is: yes and no. In your network model,
>> I can see the logic of “logging” and “committing” a final snapshot being
>> provided by the channel implementation. However, do mind that the first
>> barrier always needs to go “all the way” to initiate the Chandy Lamport
>> algorithm logic.
>> 
>> The above flow has been proven using temporal logic in my PhD thesis, in
>> case you are interested in the proof.
>> I hope this helps clarify things a little. Let me know if there is any
>> confusing point to disambiguate. I would be more than happy to help if I
>> can.
>> 
>> Paris
>> 
>>> On 13 Aug 2019, at 13:28, Piotr Nowojski <pi...@ververica.com> wrote:
>>> 
>>> Thanks for the input. Regarding the Chandy-Lamport snapshots, don’t you
>> still have to wait for the “checkpoint barrier” to arrive in order to know
>> when you have received all possible messages from the upstream
>> tasks/operators? So instead of processing the “in flight” messages (as
>> Flink is doing currently), you are sending them to an “observer”?
>>> 
>>> In that case, that sounds similar to “checkpoint barriers overtaking
>> in flight records” (aka unaligned checkpoints). Just for us, the observer
>> is a snapshot state.
>>> 
>>> Piotrek
>>> 
>>>> On 13 Aug 2019, at 13:14, Paris Carbone <seniorcarb...@gmail.com>
>> wrote:
>>>> 
>>>> Interesting problem! Thanks for bringing it up Thomas.
>>>> 
>>>> Ignore/Correct me if I am wrong, but I believe Chandy-Lamport snapshots
>> [1] would help solve this problem more elegantly without sacrificing
>> correctness.
>>>> - They do not need alignment, only (async) logging for in-flight
>> records between the time the first barrier is processed until the last
>> barrier arrives in a task.
>>>> - They work fine for failure recovery as long as logged records are
>> replayed on startup.
>>>> 
>>>> Flink’s “aligned” savepoints would probably still be necessary for
>> transactional sink commits + any sort of reconfiguration (e.g., rescaling,
>> updating the logic of operators to evolve an application etc.).
>>>> 
>>>> I don’t completely understand the “overtaking” approach but if you have
>> a concrete definition I would be happy to check it out and help if I can!
>>>> Mind that Chandy-Lamport essentially does this by logging things in
>> pending channels in a task snapshot before the barrier arrives.
>>>> 
>>>> -Paris
>>>> 
>>>> [1] https://en.wikipedia.org/wiki/Chandy%E2%80%93Lamport_algorithm
>>>> 
>>>>> On 13 Aug 2019, at 10:27, Piotr Nowojski <pi...@ververica.com> wrote:
>>>>> 
>>>>> Hi Thomas,
>>>>> 
>>>>> As Zhijiang has responded, we are now in the process of discussing how
>> to address this issue, and one of the solutions that we are discussing is
>> exactly what you are proposing: checkpoint barriers overtaking the in
>> flight data and making the in flight data part of the checkpoint.
>>>>> 
>>>>> If everything works well, we will be able to present the results of our
>> discussions on the dev mailing list soon.
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>>> On 12 Aug 2019, at 23:23, zhijiang <wangzhijiang...@aliyun.com.INVALID>
>> wrote:
>>>>>> 
>>>>>> Hi Thomas,
>>>>>> 
>>>>>> Thanks for raising this concern. Barrier alignment takes a long
>> time under backpressure, which can cause several problems:
>>>>>> 1. Checkpoint timeouts, as you mentioned.
>>>>>> 2. The recovery cost after a failover is high, because much data needs
>> to be replayed.
>>>>>> 3. The delay for commit-based sinks is high in exactly-once mode.
>>>>>> 
>>>>>> With credit-based flow control since release-1.5, the amount of
>> in-flight buffers before barrier alignment is reduced, so we get
>> some
>>>>>> benefit in terms of checkpoint speed.
>>>>>> 
>>>>>> In release-1.8, I guess we do not in practice suspend the channels which
>> have already received the barrier. But we did do a similar thing
>>>>>> before to speed up barrier alignment. I am not quite sure whether
>> release-1.8 covers this feature. There were some relevant discussions under
>> Jira [1].
>>>>>> 
>>>>>> For release-1.10, the community is now discussing the feature of
>> unaligned checkpoints, which is mainly meant to resolve the above concerns. The
>> basic idea
>>>>>> is to make the barrier overtake the output/input buffer queues to speed up
>> alignment, and to snapshot the input/output buffers as part of the checkpoint
>> state. The
>>>>>> details have not been confirmed yet and are still under discussion. We hope
>> we can make some improvements for release-1.10.
>>>>>> 
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-8523
>>>>>> 
>>>>>> Best,
>>>>>> Zhijiang
>>>>>> ------------------------------------------------------------------
>>>>>> From:Thomas Weise <t...@apache.org>
>>>>>> Send Time: 12 Aug 2019 (Monday) 21:38
>>>>>> To:dev <dev@flink.apache.org>
>>>>>> Subject:Checkpointing under backpressure
>>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> One of the major operational difficulties we observe with Flink is
>>>>>> checkpoint timeouts under backpressure. I'm looking for both
>> confirmation
>>>>>> of my understanding of the current behavior as well as pointers for
>> future
>>>>>> improvement work:
>>>>>> 
>>>>>> Prior to introduction of credit based flow control in the network
>> stack [1]
>>>>>> [2], checkpoint barriers would back up with the data for all logical
>>>>>> channels due to TCP backpressure. Since Flink 1.5, the buffers are
>>>>>> controlled per channel, and checkpoint barriers are only held back for
>>>>>> channels that have backpressure, while others can continue processing
>>>>>> normally. However, checkpoint barriers still cannot "overtake data",
>>>>>> therefore checkpoint alignment remains affected for the channel with
>>>>>> backpressure, with the potential for slow checkpointing and timeouts.
>>>>>> Albeit the delay of barriers would be capped by the maximum in-transit
>>>>>> buffers per channel, resulting in an improvement compared to previous
>>>>>> versions of Flink. Also, the backpressure based checkpoint alignment
>> can
>>>>>> help the barrier advance faster on the receiver side (by suspending
>>>>>> channels that have already delivered the barrier). Is that accurate
>> as of
>>>>>> Flink 1.8?
>>>>>> 
>>>>>> What appears to be missing to completely unblock checkpointing is a
>>>>>> mechanism for checkpoints to overtake the data. That would help in
>>>>>> situations where the processing itself is the bottleneck and
>> prioritization
>>>>>> in the network stack alone cannot address the barrier delay. Was
>> there any
>>>>>> related discussion? One possible solution would be to drain incoming
>> data
>>>>>> till the barrier and make it part of the checkpoint instead of
>> processing
>>>>>> it. This is somewhat related to asynchronous processing, but I'm
>> thinking
>>>>>> more of a solution that is automated in the Flink runtime for the
>>>>>> backpressure scenario only.
>>>>>> 
>>>>>> Thanks,
>>>>>> Thomas
>>>>>> 
>>>>>> [1] https://flink.apache.org/2019/06/05/flink-network-stack.html
>>>>>> [2]
>>>>>> 
>> https://docs.google.com/document/d/1chTOuOqe0sBsjldA_r-wXYeSIhU2zRGpUaTaik7QZ84/edit#heading=h.pjh6mv7m2hjn
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
>> 
