Hi Gyula,

Well, I tried your example in a local mini-cluster, and it seems the source can take checkpoints, but the checkpoint gets blocked in the downstream AsyncWaitOperator. IIUC, the unaligned checkpoint barrier has to wait until the current `processElement` call finishes. In your example, the element queue of `AsyncWaitOperator` fills up and `processElement` blocks in `addToWorkQueue`. Although it calls `mailboxExecutor.yield();`, the checkpoint barrier still goes unprocessed, because the barrier's priority is -1, lower than the priority that `yield()` handles. I verified this with step-by-step debugging.
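To make the priority rule concrete, here is a simplified, hypothetical model of a priority mailbox. It is not Flink's actual `TaskMailboxImpl`: the class `PriorityMailboxSketch`, its `tryYield` method and the priority values (0 for the operator, -1 for the barrier) are assumptions for illustration. It mimics the rule described above, where `yield()` only takes mails whose priority is at least the yielding operator's priority, so a barrier tagged -1 is always skipped. Unlike this sketch, the real `yield()` blocks when no eligible mail exists, which is why `processElement` stays stuck.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical, simplified model of a priority mailbox; not Flink's TaskMailboxImpl. */
public class PriorityMailboxSketch {

    /** A unit of work tagged with a priority; eligible if priority >= the yield priority. */
    static final class Mail {
        final String name;
        final int priority;

        Mail(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    private final List<Mail> queue = new ArrayList<>();
    private final List<String> executed = new ArrayList<>();

    void put(Mail mail) {
        queue.add(mail);
    }

    /**
     * Models one yield step: removes and records the first queued mail whose
     * priority is at least minPriority, returning true. Returns false if no
     * mail is eligible (the real yield() would block at this point instead).
     */
    boolean tryYield(int minPriority) {
        Iterator<Mail> it = queue.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                executed.add(mail.name);
                return true;
            }
        }
        return false;
    }

    List<String> executed() {
        return executed;
    }

    int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        // Assumed priorities: the operator yields at 0, the barrier mail has -1.
        final int operatorPriority = 0;
        PriorityMailboxSketch mailbox = new PriorityMailboxSketch();

        // Case 1: only the barrier is queued (no async result ever completes).
        mailbox.put(new Mail("checkpoint-barrier", -1));
        System.out.println(mailbox.tryYield(operatorPriority)); // false: barrier skipped

        // Case 2: one async result completes; assumed to be enqueued at the operator's priority.
        mailbox.put(new Mail("async-result", 0));
        while (mailbox.tryYield(operatorPriority)) {
            // drain every eligible mail; the barrier is never picked at priority 0
        }
        System.out.println(mailbox.executed()); // [async-result]
        System.out.println(mailbox.pending());  // 1 (the barrier is still queued)
    }
}
```

In the first case nothing eligible is queued, which corresponds to your repro where no future ever completes. In the second case the completed result runs and frees a slot in the element queue, while the barrier stays pending until `processElement` returns and the mailbox loop gets back to it.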
And once one element finishes its async I/O, the checkpoint barrier can be processed afterwards. For example:

```
env.getCheckpointConfig().enableUnalignedCheckpoints();
env.getCheckpointConfig().setCheckpointInterval(10000); // 10s interval
env.getConfig().setParallelism(1);
AsyncDataStream.orderedWait(
        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
        new AsyncFunction<Long, Long>() {
            boolean first = true;

            @Override
            public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
                if (first) {
                    Executors.newSingleThreadExecutor().execute(() -> {
                        try {
                            Thread.sleep(20000); // complete after 20s, only for the first element
                        } catch (Throwable e) {}
                        LOG.info("Complete one");
                        resultFuture.complete(Collections.singleton(1L));
                    });
                    first = false;
                }
            }
        },
        24,
        TimeUnit.HOURS,
        1)
        .print();
```

Checkpoint 1 completes normally once the "Complete one" log line is printed. I'm afraid users currently have no way to work around this; we might optimize it later.

Best,
Zakelly

On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Hey all!
>
> I encountered a strange and unexpected behaviour when trying to use
> unaligned checkpoints with AsyncIO.
>
> If the async operation queue is full and backpressures the pipeline
> completely, then unaligned checkpoints cannot be completed. To me this
> sounds counterintuitive because one of the benefits of the AsyncIO would be
> that we can simply checkpoint the queue and not have to wait for the
> completion.
>
> To repro you can simply run:
>
> AsyncDataStream.orderedWait(
>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>         new AsyncFunction<Long, Long>() {
>             @Override
>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>         },
>         24,
>         TimeUnit.HOURS,
>         1)
>         .print();
>
> This pipeline will completely backpressure the source and checkpoints do
> not progress even though they are unaligned. It seems that even the source
> cannot take a checkpoint, which is surprising to me because this is using
> the new source interface.
>
> Does anyone know why this happens and if there may be a solution?
>
> Thanks
> Gyula