Hey all!

I encountered a strange and unexpected behaviour when trying to use
unaligned checkpoints with AsyncIO.

If the async operation queue is full and backpressures the pipeline
completely, then unaligned checkpoints cannot be completed. To me this
sounds counterintuitive because one of the benefits of the AsyncIO would be
that we can simply checkpoint the queue and not have to wait for the
completion.

To repro you can simply run:

AsyncDataStream.orderedWait(
    env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
    new AsyncFunction<Long, Long>() {
        @Override
        public void asyncInvoke(Long aLong, ResultFuture<Long>
resultFuture) {}
    },
    24,
    TimeUnit.HOURS,
    1)
    .print();

This pipeline will completely backpressure the source and checkpoints do
not progress even though they are unaligned. Already the source cannot take
a checkpoint it seems which for me is surprising because this is using the
new source interface.

Does anyone know why this happens and if there may be a solution?

Thanks
Gyula

Reply via email to