1u0 commented on a change in pull request #8692: [FLINK-12804] Introduce 
mailbox-based ExecutorService
URL: https://github.com/apache/flink/pull/8692#discussion_r298128321
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
 ##########
 @@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks.mailbox;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * This class encapsulates the logic of the mailbox-based execution model. At 
the core of this model
+ * {@link #runMailboxLoop()} that continuously executes the provided {@link 
MailboxDefaultAction} in a loop. On each
+ * iteration, the method also checks if there are pending actions in the 
mailbox and executes such actions. This model
+ * ensures single-threaded execution between the default action (e.g. record 
processing) and mailbox actions (e.g.
+ * checkpoint trigger, timer firing, ...).
+ *
+ * <p>The {@link MailboxDefaultAction} interacts with this class through the 
{@link MailboxDefaultActionContext} to
+ * communicate control flow changes to the mailbox loop, e.g. that invocations 
of the default action are temporarily
+ * or permanently exhausted.
+ *
+ * <p>The design of {@link #runMailboxLoop()} is centered around the idea of 
keeping the expected hot path
+ * (default action, no mail) as fast as possible, with just a single volatile 
read per iteration in
+ * {@link Mailbox#hasMail}. This means that all checking of mail and other 
control flags (mailboxLoopRunning,
+ * suspendedDefaultAction) are always connected to #hasMail indicating true. 
This means that control flag changes in
+ * the mailbox thread can be done directly, but we must ensure that there is 
at least one action in the mailbox so that
+ * the change is picked up. For control flag changes by all other threads, 
that must happen through mailbox actions,
+ * this is automatically the case.
+ *
+ * <p>This class has a open-prepareClose-close lifecycle that is connected 
with and maps to the lifecycle of the
+ * encapsulated {@link Mailbox} (which is open-quiesce-close).
+ *
+ * <p>The method {@link #switchToLegacySourceCompatibilityMailboxLoop(Object)} 
exists to run the current sources
+ * (see {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask} with 
the mailbox model in a compatibility
+ * mode. Once we drop the old source interface for the new one (FLIP-27) this 
method can eventually go away.
+ * */
+public class MailboxProcessor {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(MailboxProcessor.class);
+
+       /** The mailbox data-structure that manages request for special 
actions, like timers, checkpoints, ... */
+       private final Mailbox mailbox;
+
+       /** Executor-style facade for client code to submit actions to the 
mailbox. */
+       private final TaskMailboxExecutorService taskMailboxExecutor;
+
+       /** Action that is repeatedly executed if no action request is in the 
mailbox. Typically record processing. */
+       private final MailboxDefaultAction mailboxDefaultAction;
+
+       /** Control flag to terminate the mailbox loop. Must only be accessed 
from mailbox thread. */
+       private boolean mailboxLoopRunning;
+
+       /**
+        * Remembers a currently active suspension of the default action. 
Serves as flag to indicate a suspended
+        * default action (suspended if not-null) and to reuse the object as 
return value in consecutive suspend attempts.
+        * Must only be accessed from mailbox thread.
+        */
+       private MailboxDefaultAction.SuspendedDefaultAction 
suspendedDefaultAction;
+
+       /** Special action that is used to terminate the mailbox loop. */
+       private final Runnable mailboxPoisonLetter;
+
+       public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction) {
+               this.mailboxDefaultAction = 
Preconditions.checkNotNull(mailboxDefaultAction);
+               this.mailbox = new MailboxImpl();
+               this.taskMailboxExecutor = new 
TaskMailboxExecutorServiceImpl(mailbox);
+               this.mailboxPoisonLetter = () -> mailboxLoopRunning = false;
+               this.mailboxLoopRunning = true;
+               this.suspendedDefaultAction = null;
+       }
+
+       /**
+        * Returns an executor service facade to submit actions to the mailbox.
+        */
+       public TaskMailboxExecutorService getTaskMailboxExecutor() {
+               return taskMailboxExecutor;
+       }
+
+       /**
+        * Lifecycle method to open the mailbox for action submission.
+        */
+       public void open() {
+               mailbox.open();
+       }
+
+       /**
+        * Lifecycle method to close the mailbox for action submission.
+        */
+       public void prepareClose() {
+               taskMailboxExecutor.shutdown();
+       }
+
+       /**
+        * Lifecycle method to close the mailbox for action 
submission/retrieval. This will cancel all instances of
+        * {@link java.util.concurrent.RunnableFuture} that are still contained 
in the mailbox.
+        */
+       public void close() {
+               
FutureUtils.cancelRunnableFutures(taskMailboxExecutor.shutdownNow());
+       }
+
+       /**
+        * Runs the mailbox processing loop. This is where the main work is 
done.
+        */
+       public void runMailboxLoop() throws Exception {
+
+               assert taskMailboxExecutor.isMailboxThread() :
+                       "StreamTask::run must be executed by declared mailbox 
thread!";
 
 Review comment:
   What do you think about using
`Preconditions.checkState(taskMailboxExecutor.isMailboxThread(), "...")`
   instead of `assert`s in this file? In the case of loops, this does not add
much overhead to the whole loop run.
   
   Also, the `"StreamTask::run must be executed by declared mailbox thread!"`
message is outdated after the refactoring: this check now lives in
`MailboxProcessor`, not in `StreamTask::run`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to