lhotari commented on code in PR #24596:
URL: https://github.com/apache/pulsar/pull/24596#discussion_r2259876043


##########
pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadSafeScheduledExecutorService.java:
##########
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SingleThreadSafeScheduledExecutorService extends ScheduledThreadPoolExecutor

Review Comment:
   > I think the new class can be clearer, simpler and more efficient.
   
   I don't think it is necessary to implement a complete
   ScheduledExecutorService at all. The implementation looks very complex, and
   that complexity isn't needed to address the problem.
   
   > BTW, we can hardly know whether the Runnable is concurrently executing.
   
   Oh yes, now I see the problem. With a single-threaded executor, there
   wouldn't be a way to skip scheduling the task if the previous task is overdue.
   
   For a multi-threaded task executor, this type of wrapper would be useful:
   
   ```java
       public static Runnable preventConcurrentExecution(Runnable runnable, String taskName) {
           return new NonConcurrentRunnable(runnable, taskName);
       }
   
       private static final class NonConcurrentRunnable implements Runnable {
           private final Runnable runnable;
           private final String taskName;
           // System.nanoTime() of the execution in progress, or 0 when idle
           private final AtomicLong taskStarted = new AtomicLong(0);
   
           private NonConcurrentRunnable(Runnable runnable, String taskName) {
               this.runnable = runnable;
               this.taskName = taskName;
           }
   
           @Override
           public void run() {
               if (taskStarted.compareAndSet(0L, System.nanoTime())) {
                   try {
                       runnable.run();
                   } catch (Throwable t) {
                       LOGGER.error("Unexpected throwable caught when executing task '{}'", taskName, t);
                   } finally {
                       taskStarted.set(0L);
                   }
               } else {
                   long elapsedTimeNanos = System.nanoTime() - taskStarted.get();
                   LOGGER.warn("Runnable for task '{}' is already running, skipping execution. "
                                   + "Elapsed time since current task started: {} ms",
                           taskName, TimeUnit.NANOSECONDS.toMillis(elapsedTimeNanos));
               }
           }
       }
   ```
   
   In this case, since we are executing in a single dedicated thread, I think
   that we shouldn't implement ScheduledExecutorService at all. Instead, the
   class could be specialized for the use case where a single task is executed
   on an ordinary java.lang.Thread (it would make sense to use Netty's
   io.netty.util.concurrent.FastThreadLocalThread implementation). The logic
   would calculate the Thread.sleep duration between task executions so that the
   task executes at a fixed rate, which prevents backlogging of the single task
   that it can execute.
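
To make the idea concrete, here is a rough sketch of what such a specialized runner might look like. It is only an illustration under assumptions: the class name `FixedRateSingleTaskRunner` and its constructor parameters are hypothetical and not taken from the PR, it uses a plain `java.lang.Thread` to stay dependency-free (Netty's `FastThreadLocalThread` could be swapped in), and it logs to `System.err` instead of the project logger. When an execution overruns its slot, this sketch runs the next execution immediately and rebases the schedule, so missed executions are dropped rather than queued.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: runs a single task at a fixed rate on one dedicated thread. */
final class FixedRateSingleTaskRunner implements AutoCloseable {
    private final Runnable task;
    private final long periodNanos;
    private final Thread thread;
    private volatile boolean running = true;

    FixedRateSingleTaskRunner(String threadName, Runnable task, long period, TimeUnit unit) {
        this.task = task;
        this.periodNanos = unit.toNanos(period);
        // Assumption: a plain daemon Thread; the thread is not started until start() is called.
        this.thread = new Thread(this::loop, threadName);
        this.thread.setDaemon(true);
    }

    void start() {
        thread.start();
    }

    private void loop() {
        long nextRun = System.nanoTime();
        while (running) {
            try {
                task.run();
            } catch (Throwable t) {
                // Keep the loop alive so that one failure does not stop later executions.
                System.err.println("Unexpected throwable in task: " + t);
            }
            nextRun += periodNanos;
            long delayNanos = nextRun - System.nanoTime();
            if (delayNanos > 0) {
                try {
                    TimeUnit.NANOSECONDS.sleep(delayNanos);
                } catch (InterruptedException e) {
                    // close() interrupts the thread; restore the flag and exit.
                    Thread.currentThread().interrupt();
                    return;
                }
            } else {
                // The task overran its slot: run again right away and rebase the
                // schedule, so missed executions are dropped instead of backlogged.
                nextRun = System.nanoTime();
            }
        }
    }

    @Override
    public void close() {
        running = false;
        thread.interrupt();
    }
}
```

Usage would be along the lines of `new FixedRateSingleTaskRunner("my-task", task, 1, TimeUnit.SECONDS)` followed by `start()`, with `close()` called on shutdown. Since the task only ever runs on this one thread, no extra guard against concurrent execution is needed.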


