1u0 commented on a change in pull request #8523: [FLINK-12481][runtime] Invoke 
timer callback in task thread (via mailbox)
URL: https://github.com/apache/flink/pull/8523#discussion_r287741194
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -1358,4 +1358,19 @@ public void actionsUnavailable() throws InterruptedException {
                        mailbox.putMail(actionUnavailableLetter);
                }
        }
+
+       private class TimerInvocationContext implements SystemProcessingTimeService.ScheduledCallbackExecutionContext {
+               @Override
+               public void invoke(ProcessingTimeCallback callback, long timestamp) throws InterruptedException {
+                       mailbox.putMail(() -> {
+                               synchronized (getCheckpointLock()) {
 
 Review comment:
   Hi @Aitozi, thank you for the review!
   
   Personally, I'm not sure whether this lock can be dropped at the current 
stage (I'm still trying to understand the runtime and which things happen in 
parallel in different tasks).
   
   I've preserved it to keep the old semantics from when the callback was 
invoked in the timer service's thread pool (just to be "safe"). My reasoning 
was that some implementations of `onProcessingTime` may lack proper 
synchronization of their own (hence the original reason why the timer service 
took the lock in the first place) **and**, at the current stage, there are 
still some additional threads that may access the task's state and perform 
operations on it (in particular, the methods related to checkpoint handling).
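
To make the concern concrete, here is a minimal sketch of the pattern, not Flink's actual code. The class `MailboxLockSketch`, its queue-based mailbox, and the methods `invokeTimer`, `triggerCheckpoint`, and `drainMailbox` are all hypothetical stand-ins. The timer side only enqueues a letter, and the letter takes the same lock that another code path (standing in for checkpoint handling) uses when it touches the shared state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a task with a mailbox; names are illustrative only.
class MailboxLockSketch {
    private final Object checkpointLock = new Object();
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    private final List<String> state = new ArrayList<>(); // guarded by checkpointLock

    // Timer side: only enqueues the letter, the callback body does not run here.
    void invokeTimer(long timestamp) {
        mailbox.add(() -> {
            // The lock is kept because other threads may still touch the state.
            synchronized (checkpointLock) {
                state.add("timer@" + timestamp);
            }
        });
    }

    // Stand-in for a code path that may run outside the task thread.
    void triggerCheckpoint(long id) {
        synchronized (checkpointLock) {
            state.add("checkpoint#" + id);
        }
    }

    // Task side: runs all queued letters and returns how many were executed.
    int drainMailbox() {
        int executed = 0;
        Runnable letter;
        while ((letter = mailbox.poll()) != null) {
            letter.run();
            executed++;
        }
        return executed;
    }

    List<String> snapshot() {
        synchronized (checkpointLock) {
            return new ArrayList<>(state);
        }
    }

    // Enqueue a timer, run a "checkpoint", then let the task drain its mailbox.
    static List<String> demo() {
        MailboxLockSketch task = new MailboxLockSketch();
        task.invokeTimer(42L);
        task.triggerCheckpoint(1L);
        task.drainMailbox();
        return task.snapshot();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If every access to the state were guaranteed to happen in the task thread, the `synchronized` block inside the letter would be redundant. The sketch keeps it only because `triggerCheckpoint` is assumed to be callable from elsewhere.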
   
   Although `SourceStreamTask` already takes this lock around the invocation 
in its own mailbox loop, the base `StreamTask` doesn't (the base class may 
invoke some letters before reaching the default action).
   
   Stefan also thinks that this lock is not needed anymore. I can try to read 
the code again more thoroughly, to check whether `onProcessingTime` either
    * is guaranteed to be executed under `SourceStreamTask.defaultAction`;
    * uses locking internally, as an implementation detail;
    * doesn't need locking at all.
   
   Regarding timer cancellation, my thought is that it isn't guaranteed anyway: 
the code that uses those timers may cancel a timer, but it never had any 
guarantee that the timer hasn't already fired and the callback hasn't already 
been invoked.
   So, supporting cancellation of `ProcessingTimeCallback`s that are already in 
the mailbox would be a slight optimization (it would extend the cancellation 
window to cover the time the callback waits in the mailbox). But not having it 
should be no worse than the situation before, imo.
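
For illustration only, the optional optimization could look roughly like the sketch below. `CancellableLetter` is a hypothetical wrapper and is not part of this PR or of Flink. A letter can be cancelled while it is still waiting in the mailbox, and a cancel that arrives after the letter has run simply reports failure, which matches the "no guarantee" behaviour that callers already have to tolerate.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a letter that stays cancellable while it is queued.
class CancellableLetterSketch {

    static final class CancellableLetter implements Runnable {
        // Flips to true exactly once: either on cancel() or on run().
        private final AtomicBoolean done = new AtomicBoolean(false);
        private final Runnable callback;

        CancellableLetter(Runnable callback) {
            this.callback = callback;
        }

        // Returns true only if the callback had not run (or been cancelled) yet.
        boolean cancel() {
            return done.compareAndSet(false, true);
        }

        @Override
        public void run() {
            if (done.compareAndSet(false, true)) {
                callback.run();
            }
        }
    }

    // Queues two letters, cancels the first while queued, and counts invocations.
    static int demo() {
        Queue<Runnable> mailbox = new ArrayDeque<>();
        int[] fired = {0};
        CancellableLetter first = new CancellableLetter(() -> fired[0]++);
        CancellableLetter second = new CancellableLetter(() -> fired[0]++);
        mailbox.add(first);
        mailbox.add(second);

        first.cancel(); // still in the mailbox, so this cancel wins

        Runnable letter;
        while ((letter = mailbox.poll()) != null) {
            letter.run();
        }

        second.cancel(); // too late: the callback has already been invoked
        return fired[0];
    }

    public static void main(String[] args) {
        System.out.println("callbacks invoked: " + demo());
    }
}
```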
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
