lhotari commented on code in PR #24596: URL: https://github.com/apache/pulsar/pull/24596#discussion_r2259876043
########## pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadSafeScheduledExecutorService.java: ########## @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.Delayed; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableScheduledFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SingleThreadSafeScheduledExecutorService extends ScheduledThreadPoolExecutor Review Comment: > I think the new class can benefit clearer, simpler and more efficient. I don't think that implementing a complete ScheduledExecutorService is necessary at all. The implementation looks very complex, which isn't needed to address the problem. > BTW, we can hardly know whether the Runnble is convectively executing. Oh yes, now I see the problem. With a single threaded executor, there wouldn't be a way to skip scheduling the task if the previous task is overdue. For a multi-threaded task executor, this type of wrapper would be useful: ```java public static Runnable preventConcurrentExecution(Runnable runnable, String taskName) { return new NonConcurrentRunnable(runnable, taskName); } private static final class NonConcurrentRunnable implements Runnable { private final Runnable runnable; private final String taskName; private final AtomicLong taskStarted = new AtomicLong(0); private NonConcurrentRunnable(Runnable runnable, String taskName) { this.runnable = runnable; this.taskName = taskName; } @Override public void run() { if (taskStarted.compareAndSet(0L, System.nanoTime())) { try { runnable.run(); } catch (Throwable t) { LOGGER.error("Unexpected throwable caught when executing task '{}'", taskName, t); } finally { taskStarted.set(0L); } } else { long elapsedTimeNanos = System.nanoTime() - taskStarted.get(); LOGGER.warn( "Runnable for task '{}' is already running, skipping execution. " + "Elapsed time since current task started: {} ms", taskName, TimeUnit.NANOSECONDS.toMillis(elapsedTimeNanos)); } } } ``` In this case, since we are executing in a single dedicated thread, I think that we shouldn't implement the ScheduledExecutorService at all and instead make it specialized for the use case where a single task is executed with an ordinary java.lang.Thread (well it makes sense to use Netty's io.netty.util.concurrent.FastThreadLocalThread thread implementation) and with logic which calculates the Thread.sleep between task executions so that it executes at fixed rate, preventing the backlogging of the single task that it can execute. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
