Thank you for the detailed analysis, Zakelly. I think we should consider whether `yield()` should process checkpoint barriers, because the current behaviour puts quite a serious limitation on unaligned checkpoints in these cases. Do you know the reason behind the current priority setting? Is there a problem with processing the barrier here?
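
To make the question concrete, here is a toy model of the behaviour described in the quoted analysis below. It is only a sketch and not Flink's mailbox code: `ToyMailbox`, `tryYield` and the threshold of 0 are hypothetical names and values chosen for illustration. The only number taken from the thread is the barrier priority of -1.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model only (hypothetical, NOT Flink's implementation): a mailbox whose
// yield skips every mail below a minimum priority.
class ToyMailbox {
    /** A queued action tagged with a priority. */
    static final class Mail {
        final int priority;
        final String name;

        Mail(int priority, String name) {
            this.priority = priority;
            this.name = name;
        }
    }

    private final List<Mail> queue = new ArrayList<>();
    final List<String> processed = new ArrayList<>();

    void put(int priority, String name) {
        queue.add(new Mail(priority, name));
    }

    /** Runs the first queued mail with priority >= minPriority; returns false if there is none. */
    boolean tryYield(int minPriority) {
        for (Iterator<Mail> it = queue.iterator(); it.hasNext();) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                processed.add(mail.name);
                return true;
            }
        }
        return false;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        ToyMailbox mailbox = new ToyMailbox();
        mailbox.put(-1, "checkpoint-barrier"); // -1 as reported in the thread
        mailbox.put(0, "async-result");        // 0 is an assumed operator priority

        // Yield with the assumed operator threshold: the barrier is never picked up.
        while (mailbox.tryYield(0)) {
            // keep draining eligible mails
        }
        System.out.println("after yield(0): processed=" + mailbox.processed
                + ", pending=" + mailbox.pending());

        // Lowering the threshold to -1 would let the barrier through.
        mailbox.tryYield(-1);
        System.out.println("after yield(-1): processed=" + mailbox.processed
                + ", pending=" + mailbox.pending());
    }
}
```

In this model the barrier stays queued for as long as the operator yields with a threshold above -1, which is the situation I am asking about. Whether lowering the threshold is safe in the real mailbox is exactly the open question.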

Gyula

On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi Gyula,
>
> Well I tried your example in local mini-cluster, and it seems the source
> can take checkpoints but it will block in the following AsyncWaitOperator.
> IIUC, the unaligned checkpoint barrier should wait until the current
> `processElement` finishes its execution. In your example, the element queue
> of `AsyncWaitOperator` will end up full and `processElement` will be
> blocked at `addToWorkQueue`. Even though it will call
> `mailboxExecutor.yield();`, it still leaves the checkpoint barrier
> unprocessed since the priority of the barrier is -1, lower than the one
> `yield()` should handle. I verified this using single-step debugging.
>
> And if one element could finish its async io, the cp barrier can be
> processed afterwards. For example:
> ```
> env.getCheckpointConfig().enableUnalignedCheckpoints();
> env.getCheckpointConfig().setCheckpointInterval(10000); // 10s interval
> env.getConfig().setParallelism(1);
> AsyncDataStream.orderedWait(
>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>         new AsyncFunction<Long, Long>() {
>             boolean first = true;
>             @Override
>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>                 if (first) {
>                     Executors.newSingleThreadExecutor().execute(() -> {
>                         try {
>                             Thread.sleep(20000); // process after 20s, only for the first one.
>                         } catch (Throwable e) {}
>                         LOG.info("Complete one");
>                         resultFuture.complete(Collections.singleton(1L));
>                     });
>                     first = false;
>                 }
>             }
>         },
>         24,
>         TimeUnit.HOURS,
>         1)
>     .print();
> ```
> The checkpoint 1 can be normally finished after the "Complete one" log
> print.
>
> I guess the users have no means to solve this problem, we might optimize
> this later.
>
>
> Best,
> Zakelly
>
> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>
>> Hey all!
>>
>> I encountered a strange and unexpected behaviour when trying to use
>> unaligned checkpoints with AsyncIO.
>>
>> If the async operation queue is full and backpressures the pipeline
>> completely, then unaligned checkpoints cannot be completed. To me this
>> sounds counterintuitive because one of the benefits of the AsyncIO would be
>> that we can simply checkpoint the queue and not have to wait for the
>> completion.
>>
>> To repro you can simply run:
>>
>> AsyncDataStream.orderedWait(
>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>         new AsyncFunction<Long, Long>() {
>>             @Override
>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>>         },
>>         24,
>>         TimeUnit.HOURS,
>>         1)
>>     .print();
>>
>> This pipeline will completely backpressure the source and checkpoints do
>> not progress even though they are unaligned. Already the source cannot take
>> a checkpoint it seems which for me is surprising because this is using the
>> new source interface.
>>
>> Does anyone know why this happens and if there may be a solution?
>>
>> Thanks
>> Gyula
>>
>