I agree. I also created https://issues.apache.org/jira/browse/FLINK-34704 for tracking and further discussion.
Best,
Zakelly

On Fri, Mar 15, 2024 at 2:59 PM Gyula Fóra <gyula.f...@gmail.com> wrote:

> Posting this to dev as well...
>
> Thanks Zakelly,
> Sounds like a solution could be to add a new, different version of yield
> that would actually yield to the checkpoint barrier too. That way, operator
> implementations could decide whether any state modification may or may not
> have happened, and could optionally allow a checkpoint to be taken in the
> "middle of record processing".
>
> Gyula
>
> On Fri, Mar 15, 2024 at 3:49 AM Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Processing a checkpoint halfway through `processElement` is problematic.
>> The current element will not be included in the input in-flight data, and
>> we cannot assume that the user code has (or has not) already applied it to
>> the state. So the best way is to treat `processElement` as an 'atomic'
>> operation. I guess that's why the priority of the cp barrier is set low.
>> However, the AsyncWaitOperator is a special case: we know that the
>> element blocked at `addToWorkQueue` has not yet triggered the
>> userFunction. Thus I'd suggest putting the element in the queue when the cp
>> barrier comes, and taking a snapshot of the whole queue afterwards. That
>> would solve the problem, but this approach also involves some code
>> modifications to the mailbox executor.
>>
>>
>> Best,
>> Zakelly
>>
>> On Thu, Mar 14, 2024 at 9:15 PM Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Thank you for the detailed analysis, Zakelly.
>>>
>>> I think we should consider whether yield should process checkpoint
>>> barriers, because this puts quite a serious limitation on unaligned
>>> checkpoints in these cases.
>>> Do you know what the reason is behind the current priority setting? Is
>>> there a problem with processing the barrier here?
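The queue-snapshot idea suggested above can be illustrated with a toy model. This is a minimal sketch under stated assumptions, not Flink's `AsyncWaitOperator`: the classes `QueueSnapshotDemo` and `ToyAsyncOperator` and the methods `offer`, `snapshot` and `restore` are hypothetical, no async call is ever actually issued, and the "state" is just a list of input elements. It shows why an element that is still blocked waiting for a free queue slot (and so has not reached the user function) can be recorded in the snapshot next to the in-flight entries and simply re-submitted on restore.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Entry point declared first so the file also works with single-file source launch.
class QueueSnapshotDemo {
    public static void main(String[] args) {
        ToyAsyncOperator op = new ToyAsyncOperator(1); // capacity 1, as in the repro
        op.offer(1L);                    // admitted; its (pretend) async call never completes
        boolean admitted = op.offer(2L); // queue full: element 2 is blocked
        System.out.println("element 2 admitted: " + admitted);
        System.out.println("snapshot: " + op.snapshot());
    }
}

/** Hypothetical toy model of an async operator with a bounded work queue. */
class ToyAsyncOperator {
    private final int capacity;
    // Elements whose (pretend) async call has already been started.
    private final Deque<Long> inFlight = new ArrayDeque<>();
    // Element blocked waiting for a free slot; the user function has NOT seen it yet.
    private Long pending;

    ToyAsyncOperator(int capacity) {
        this.capacity = capacity;
    }

    /** Admits the element if there is room, otherwise parks it as pending. */
    boolean offer(long element) {
        if (pending != null) {
            // In the toy, processElement cannot be re-entered while it is blocked.
            throw new IllegalStateException("processElement is still blocked");
        }
        if (inFlight.size() < capacity) {
            inFlight.addLast(element); // a real operator would invoke the user function here
            return true;
        }
        pending = element;
        return false;
    }

    /** Snapshot taken when a barrier arrives: in-flight entries plus the pending element. */
    List<Long> snapshot() {
        List<Long> state = new ArrayList<>(inFlight);
        if (pending != null) {
            state.add(pending);
        }
        return state;
    }

    /** Rebuilds an operator by re-submitting snapshotted elements in order (at most capacity + 1). */
    static ToyAsyncOperator restore(int capacity, List<Long> state) {
        ToyAsyncOperator op = new ToyAsyncOperator(capacity);
        for (long element : state) {
            op.offer(element);
        }
        return op;
    }
}
```

Running `main` prints `element 2 admitted: false` followed by `snapshot: [1, 2]`. As the message above notes, the real change would also need mailbox executor modifications so that the barrier can actually be handled while `processElement` is blocked; the toy model skips that part entirely.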
>>>
>>> Gyula
>>>
>>> On Thu, Mar 14, 2024 at 1:22 PM Zakelly Lan <zakelly....@gmail.com>
>>> wrote:
>>>
>>>> Hi Gyula,
>>>>
>>>> Well, I tried your example in a local mini-cluster, and it seems the
>>>> source can take checkpoints, but the checkpoint gets blocked in the
>>>> following AsyncWaitOperator. IIUC, the unaligned checkpoint barrier has to
>>>> wait until the current `processElement` finishes its execution. In your
>>>> example, the element queue of `AsyncWaitOperator` ends up full and
>>>> `processElement` is blocked at `addToWorkQueue`. Even though it calls
>>>> `mailboxExecutor.yield();`, the checkpoint barrier is still left
>>>> unprocessed, since the priority of the barrier is -1, lower than the
>>>> priority `yield()` handles. I verified this using single-step debugging.
>>>>
>>>> If one element finishes its async I/O, the cp barrier can be processed
>>>> afterwards. For example:
>>>> ```
>>>> env.getCheckpointConfig().enableUnalignedCheckpoints();
>>>> env.getCheckpointConfig().setCheckpointInterval(10000); // 10s interval
>>>> env.getConfig().setParallelism(1);
>>>> AsyncDataStream.orderedWait(
>>>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>>         new AsyncFunction<Long, Long>() {
>>>>             boolean first = true;
>>>>
>>>>             @Override
>>>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {
>>>>                 if (first) {
>>>>                     Executors.newSingleThreadExecutor().execute(() -> {
>>>>                         try {
>>>>                             // Complete after 20s, only for the first element.
>>>>                             Thread.sleep(20000);
>>>>                         } catch (Throwable e) {}
>>>>                         LOG.info("Complete one");
>>>>                         resultFuture.complete(Collections.singleton(1L));
>>>>                     });
>>>>                     first = false;
>>>>                 }
>>>>             }
>>>>         },
>>>>         24,
>>>>         TimeUnit.HOURS,
>>>>         1)
>>>>     .print();
>>>> ```
>>>> Checkpoint 1 finishes normally after "Complete one" is logged.
>>>>
>>>> I guess users have no means to solve this problem themselves; we might
>>>> optimize this later.
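The priority argument above can be modelled in a few lines. This is a toy, single-threaded sketch, not Flink's `MailboxExecutor` or `TaskMailbox`: the names `PriorityYieldDemo`, `ToyMailbox`, `put` and `tryYield` are hypothetical, the barrier priority of -1 is taken from the message above, and the operator's yield priority of 0 is an assumption. `tryYield(p)` runs only the first queued mail whose priority is at least `p` and, unlike a real `yield()`, returns `false` instead of blocking when nothing qualifies.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class PriorityYieldDemo {
    static final int BARRIER_PRIORITY = -1; // as described in the thread
    static final int OPERATOR_PRIORITY = 0; // assumed priority of the operator's yield()

    /** Returns the mails processed, in order, around one blocked processElement call. */
    static List<String> run(boolean asyncResultArrives) {
        ToyMailbox mailbox = new ToyMailbox();
        boolean[] slotFree = {false}; // the capacity-1 work queue starts full
        mailbox.put("checkpoint-barrier", BARRIER_PRIORITY, () -> {});
        if (asyncResultArrives) {
            // Completing the in-flight element frees the single queue slot.
            mailbox.put("async-result", OPERATOR_PRIORITY, () -> slotFree[0] = true);
        }
        // processElement is blocked in addToWorkQueue and keeps yielding; the toy
        // gives up after a few attempts instead of blocking forever.
        for (int attempt = 0; attempt < 3 && !slotFree[0]; attempt++) {
            mailbox.tryYield(OPERATOR_PRIORITY);
        }
        if (slotFree[0]) {
            // processElement returned; the task's main loop accepts any priority.
            while (mailbox.tryYield(BARRIER_PRIORITY)) {
                // drain the remaining mails
            }
        }
        return mailbox.processed;
    }

    public static void main(String[] args) {
        System.out.println("no async result: " + run(false));
        System.out.println("one async result: " + run(true));
    }
}

/** Hypothetical priority-filtered mailbox; not Flink's API. */
class ToyMailbox {
    private static final class Mail {
        final String name;
        final int priority;
        final Runnable action;

        Mail(String name, int priority, Runnable action) {
            this.name = name;
            this.priority = priority;
            this.action = action;
        }
    }

    private final List<Mail> mails = new ArrayList<>();
    final List<String> processed = new ArrayList<>();

    void put(String name, int priority, Runnable action) {
        mails.add(new Mail(name, priority, action));
    }

    /** Runs the first mail with priority >= minPriority; returns false if none qualifies. */
    boolean tryYield(int minPriority) {
        for (Iterator<Mail> it = mails.iterator(); it.hasNext(); ) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                processed.add(mail.name);
                mail.action.run();
                return true;
            }
        }
        return false;
    }
}
```

Running `main` prints `no async result: []` and `one async result: [async-result, checkpoint-barrier]`. Without an async result the barrier is never taken, matching the stuck checkpoint in the repro; with one completed element it runs right after `processElement` returns, matching the "Complete one" observation.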
>>>>
>>>>
>>>> Best,
>>>> Zakelly
>>>>
>>>> On Thu, Mar 14, 2024 at 5:41 PM Gyula Fóra <gyula.f...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey all!
>>>>>
>>>>> I encountered a strange and unexpected behaviour when trying to use
>>>>> unaligned checkpoints with AsyncIO.
>>>>>
>>>>> If the async operation queue is full and backpressures the pipeline
>>>>> completely, then unaligned checkpoints cannot be completed. To me this
>>>>> sounds counterintuitive, because one of the benefits of AsyncIO should be
>>>>> that we can simply checkpoint the queue and not have to wait for the
>>>>> requests to complete.
>>>>>
>>>>> To reproduce, you can simply run:
>>>>>
>>>>> AsyncDataStream.orderedWait(
>>>>>         env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).shuffle(),
>>>>>         new AsyncFunction<Long, Long>() {
>>>>>             @Override
>>>>>             public void asyncInvoke(Long aLong, ResultFuture<Long> resultFuture) {}
>>>>>         },
>>>>>         24,
>>>>>         TimeUnit.HOURS,
>>>>>         1)
>>>>>     .print();
>>>>>
>>>>> This pipeline completely backpressures the source, and checkpoints do
>>>>> not progress even though they are unaligned. It seems that even the
>>>>> source cannot take a checkpoint, which surprises me because it uses the
>>>>> new source interface.
>>>>>
>>>>> Does anyone know why this happens and whether there may be a solution?
>>>>>
>>>>> Thanks
>>>>> Gyula
>>>>>
>>>>