This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f41687c4277 [improve][broker]Call scheduleAtFixedRateNonConcurrently 
for scheduled tasks, instead of scheduleAtFixedRate (#24596)
f41687c4277 is described below

commit f41687c4277e0e7b0347862dc2fd983cb0f8cded
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 25 22:19:54 2025 +0800

    [improve][broker]Call scheduleAtFixedRateNonConcurrently for scheduled 
tasks, instead of scheduleAtFixedRate (#24596)
---
 .../pulsar/broker/service/BrokerService.java       |  82 +--
 ...ingleThreadNonConcurrentFixedRateScheduler.java | 401 ++++++++++++
 ...eThreadNonConcurrentFixedRateSchedulerTest.java | 684 +++++++++++++++++++++
 3 files changed, 1116 insertions(+), 51 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9bd107f3320..044f44a644a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -68,7 +68,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -85,7 +84,6 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -184,6 +182,7 @@ import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import 
org.apache.pulsar.common.util.SingleThreadNonConcurrentFixedRateScheduler;
 import org.apache.pulsar.common.util.netty.ChannelFutures;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.apache.pulsar.common.util.netty.NettyFutureUtil;
@@ -243,10 +242,10 @@ public class BrokerService implements Closeable {
     private final ConcurrentLinkedQueue<TopicLoadingContext> 
pendingTopicLoadingQueue;
 
     private AuthorizationService authorizationService;
-    private final ScheduledExecutorService statsUpdater;
+    private final SingleThreadNonConcurrentFixedRateScheduler statsUpdater;
 
     @Getter
-    private final ScheduledExecutorService backlogQuotaChecker;
+    private final SingleThreadNonConcurrentFixedRateScheduler 
backlogQuotaChecker;
 
     protected final AtomicReference<Semaphore> lookupRequestSemaphore;
     protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
@@ -274,11 +273,11 @@ public class BrokerService implements Closeable {
             .help("Counter of connections throttled because of per-connection 
limit")
             .register();
 
-    private final ScheduledExecutorService inactivityMonitor;
-    private final ScheduledExecutorService messageExpiryMonitor;
-    private final ScheduledExecutorService compactionMonitor;
-    private final ScheduledExecutorService consumedLedgersMonitor;
-    private ScheduledExecutorService deduplicationSnapshotMonitor;
+    private final SingleThreadNonConcurrentFixedRateScheduler 
inactivityMonitor;
+    private final SingleThreadNonConcurrentFixedRateScheduler 
messageExpiryMonitor;
+    private final SingleThreadNonConcurrentFixedRateScheduler 
compactionMonitor;
+    private final SingleThreadNonConcurrentFixedRateScheduler 
consumedLedgersMonitor;
+    private SingleThreadNonConcurrentFixedRateScheduler 
deduplicationSnapshotMonitor;
     protected final PublishRateLimiter brokerPublishRateLimiter;
     private final DispatchRateLimiterFactory dispatchRateLimiterFactory;
     protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null;
@@ -350,10 +349,7 @@ public class BrokerService implements Closeable {
                 pulsar.getConfiguration().getNumAcceptorThreads(), false, 
acceptorThreadFactory);
         this.workerGroup = eventLoopGroup;
 
-        this.statsUpdater = OrderedScheduler.newSchedulerBuilder()
-                .name("pulsar-stats-updater")
-                .numThreads(1)
-                .build();
+        this.statsUpdater = new 
SingleThreadNonConcurrentFixedRateScheduler("pulsar-stats-updater");
         this.authorizationService = new AuthorizationService(
                 pulsar.getConfiguration(), pulsar().getPulsarResources());
         this.entryFilterProvider = new 
EntryFilterProvider(pulsar.getConfiguration());
@@ -363,27 +359,13 @@ public class BrokerService implements Closeable {
             
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
         }
 
-        this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
-                .name("pulsar-inactivity-monitor")
-                .numThreads(1)
-                .build();
-        this.messageExpiryMonitor = OrderedScheduler.newSchedulerBuilder()
-                .name("pulsar-msg-expiry-monitor")
-                .numThreads(1)
-                .build();
-        this.compactionMonitor = OrderedScheduler.newSchedulerBuilder()
-                .name("pulsar-compaction-monitor")
-                .numThreads(1)
-                .build();
-        this.consumedLedgersMonitor = OrderedScheduler.newSchedulerBuilder()
-                .name("pulsar-consumed-ledgers-monitor")
-                .numThreads(1)
-                .build();
+        this.inactivityMonitor = new 
SingleThreadNonConcurrentFixedRateScheduler("pulsar-inactivity-monitor");
+        this.messageExpiryMonitor = new 
SingleThreadNonConcurrentFixedRateScheduler("pulsar-msg-expiry-monitor");
+        this.compactionMonitor = new 
SingleThreadNonConcurrentFixedRateScheduler("pulsar-compaction-monitor");
+        this.consumedLedgersMonitor =
+                new 
SingleThreadNonConcurrentFixedRateScheduler("pulsar-consumed-ledgers-monitor");
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
-        this.backlogQuotaChecker = OrderedScheduler.newSchedulerBuilder()
-                .name("pulsar-backlog-quota-checker")
-                .numThreads(1)
-                .build();
+        this.backlogQuotaChecker = new 
SingleThreadNonConcurrentFixedRateScheduler("pulsar-backlog-quota-checker");
         this.authenticationService = new 
AuthenticationService(pulsar.getConfiguration(),
                 pulsar.getOpenTelemetry().getOpenTelemetry());
         this.topicFactory = createPersistentTopicFactory();
@@ -646,7 +628,7 @@ public class BrokerService implements Closeable {
 
     protected void startClearInvalidateTopicNameCacheTask() {
         final int maxSecondsToClearTopicNameCache = 
pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
-        inactivityMonitor.scheduleAtFixedRate(
+        inactivityMonitor.scheduleAtFixedRateNonConcurrently(
             () -> 
TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
             maxSecondsToClearTopicNameCache,
             maxSecondsToClearTopicNameCache,
@@ -654,7 +636,7 @@ public class BrokerService implements Closeable {
     }
 
     protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int 
statsUpdateFrequencyInSecs) {
-        statsUpdater.scheduleAtFixedRate(this::updateRates,
+        statsUpdater.scheduleAtFixedRateNonConcurrently(this::updateRates,
             statsUpdateInitialDelayInSecs, statsUpdateFrequencyInSecs, 
TimeUnit.SECONDS);
 
         // Ensure the broker starts up with initial stats
@@ -665,7 +647,7 @@ public class BrokerService implements Closeable {
         ServiceConfiguration config = pulsar().getConfiguration();
         if (config.getHealthCheckMetricsUpdateTimeInSeconds() > 0) {
             int interval = config.getHealthCheckMetricsUpdateTimeInSeconds();
-            statsUpdater.scheduleAtFixedRate(this::checkHealth,
+            statsUpdater.scheduleAtFixedRateNonConcurrently(this::checkHealth,
                     interval, interval, TimeUnit.SECONDS);
         }
     }
@@ -687,11 +669,9 @@ public class BrokerService implements Closeable {
         // scheduled task runs.
         int interval = 
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
         if (interval > 0) {
-            this.deduplicationSnapshotMonitor = 
OrderedScheduler.newSchedulerBuilder()
-                    .name("deduplication-snapshot-monitor")
-                    .numThreads(1)
-                    .build();
-            deduplicationSnapshotMonitor.scheduleAtFixedRate(() -> 
forEachTopic(
+            this.deduplicationSnapshotMonitor =
+                    new 
SingleThreadNonConcurrentFixedRateScheduler("deduplication-snapshot-monitor");
+            deduplicationSnapshotMonitor.scheduleAtFixedRateNonConcurrently(() 
-> forEachTopic(
                     Topic::checkDeduplicationSnapshot)
                     , interval, interval, TimeUnit.SECONDS);
         }
@@ -700,14 +680,14 @@ public class BrokerService implements Closeable {
     protected void startInactivityMonitor() {
         if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) 
{
             int interval = 
pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
-            inactivityMonitor.scheduleAtFixedRate(() -> checkGC(), interval, 
interval,
+            inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> 
checkGC(), interval, interval,
                     TimeUnit.SECONDS);
         }
 
         // Deduplication info checker
         long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES
                 
.toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes())
 / 3;
-        
inactivityMonitor.scheduleAtFixedRate(this::checkMessageDeduplicationInfo,
+        
inactivityMonitor.scheduleAtFixedRateNonConcurrently(this::checkMessageDeduplicationInfo,
                 duplicationCheckerIntervalInSeconds,
                 duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
 
@@ -716,7 +696,7 @@ public class BrokerService implements Closeable {
             long subscriptionExpiryCheckIntervalInSeconds =
                     TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration()
                             .getSubscriptionExpiryCheckIntervalInMinutes());
-            
inactivityMonitor.scheduleAtFixedRate(this::checkInactiveSubscriptions,
+            
inactivityMonitor.scheduleAtFixedRateNonConcurrently(this::checkInactiveSubscriptions,
                     subscriptionExpiryCheckIntervalInSeconds,
                     subscriptionExpiryCheckIntervalInSeconds, 
TimeUnit.SECONDS);
         }
@@ -724,29 +704,29 @@ public class BrokerService implements Closeable {
         // check cluster migration
         int interval = 
pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds();
         if (interval > 0) {
-            inactivityMonitor.scheduleAtFixedRate(() -> 
checkClusterMigration(), interval, interval,
+            inactivityMonitor.scheduleAtFixedRateNonConcurrently(() -> 
checkClusterMigration(), interval, interval,
                     TimeUnit.SECONDS);
         }
     }
 
     protected void startMessageExpiryMonitor() {
         int interval = 
pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
-        messageExpiryMonitor.scheduleAtFixedRate(this::checkMessageExpiry, 
interval, interval,
+        
messageExpiryMonitor.scheduleAtFixedRateNonConcurrently(this::checkMessageExpiry,
 interval, interval,
                 TimeUnit.MINUTES);
     }
 
     protected void startCheckReplicationPolicies() {
         int interval = 
pulsar.getConfig().getReplicationPolicyCheckDurationSeconds();
         if (interval > 0) {
-            
messageExpiryMonitor.scheduleAtFixedRate(this::checkReplicationPolicies, 
interval, interval,
-                    TimeUnit.SECONDS);
+            
messageExpiryMonitor.scheduleAtFixedRateNonConcurrently(this::checkReplicationPolicies,
 interval,
+                    interval, TimeUnit.SECONDS);
         }
     }
 
     protected void startCompactionMonitor() {
         int interval = 
pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
         if (interval > 0) {
-            compactionMonitor.scheduleAtFixedRate(this::checkCompaction,
+            
compactionMonitor.scheduleAtFixedRateNonConcurrently(this::checkCompaction,
                     interval, interval, TimeUnit.SECONDS);
         }
     }
@@ -754,7 +734,7 @@ public class BrokerService implements Closeable {
     protected void startConsumedLedgersMonitor() {
         int interval = 
pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
         if (interval > 0) {
-            
consumedLedgersMonitor.scheduleAtFixedRate(this::checkConsumedLedgers,
+            
consumedLedgersMonitor.scheduleAtFixedRateNonConcurrently(this::checkConsumedLedgers,
                     interval, interval, TimeUnit.SECONDS);
         }
     }
@@ -763,7 +743,7 @@ public class BrokerService implements Closeable {
         if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
             final int interval = 
pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
             log.info("Scheduling a thread to check backlog quota after [{}] 
seconds in background", interval);
-            backlogQuotaChecker.scheduleAtFixedRate(this::monitorBacklogQuota, 
interval, interval,
+            
backlogQuotaChecker.scheduleAtFixedRateNonConcurrently(this::monitorBacklogQuota,
 interval, interval,
                     TimeUnit.SECONDS);
         } else {
             log.info("Backlog quota check monitoring is disabled");
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
new file mode 100644
index 00000000000..1ddb2101f3a
--- /dev/null
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SingleThreadNonConcurrentFixedRateScheduler extends 
ScheduledThreadPoolExecutor
+        implements ScheduledExecutorService {
+
+    // Since the class DelayedWorkQueue uses "getDelay" to compare objects 
that are different types, the sequence
+    // number can be the same between objects of different types.
+    private static final AtomicLong fixRateTaskSequencerGenerator = new 
AtomicLong();
+
+    private static final RejectedExecutionHandler 
defaultRejectedExecutionHandler = new AbortPolicy();
+
+    private volatile RejectedExecutionHandler rejectedExecutionHandler = 
defaultRejectedExecutionHandler;
+
+    public SingleThreadNonConcurrentFixedRateScheduler(String name) {
+        super(1, new DefaultThreadFactory(name));
+        super.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+        super.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        super.setRemoveOnCancelPolicy(false);
+        // Trigger worker thread creation.
+        submit(() -> {});
+    }
+
+    private static final class SafeRunnable implements Runnable {
+        private final Runnable task;
+
+        SafeRunnable(Runnable task) {
+            this.task = task;
+        }
+
+        @Override
+        public void run() {
+            try {
+                task.run();
+            } catch (Throwable t) {
+                log.warn("Unexpected throwable from task {}: {}", 
task.getClass(), t.getMessage(), t);
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+        return super.schedule(new SafeRunnable(command), delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay, long 
delay, TimeUnit unit) {
+        return super.scheduleWithFixedDelay(new SafeRunnable(command), 
initialDelay, delay, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay, long 
period, TimeUnit unit) {
+        return super.scheduleAtFixedRate(new SafeRunnable(command), 
initialDelay, period, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Future<?> submit(Runnable task) {
+        return super.submit(new SafeRunnable(task));
+    }
+
+    /***
+     * Different with {@link #scheduleAtFixedRate(Runnable, long, long, 
TimeUnit)}, If the execution time of the next
+     * period task > period: New tasks will trigger be dropped, instead, 
execute the next period task after the current
+     * time.
+     */
+    public ScheduledFuture<?> scheduleAtFixedRateNonConcurrently(Runnable 
command,
+                                                                 long 
initialDelay,
+                                                                 long period,
+                                                                 TimeUnit 
unit) {
+        if (command == null || unit == null) {
+            throw new NullPointerException();
+        }
+        if (period <= 0L) {
+            throw new IllegalArgumentException("period can not be null");
+        }
+        ScheduledFutureTask<Void> sft =
+                new ScheduledFutureTask<Void>(command,
+                        null,
+                        triggerTime(initialDelay, unit),
+                        unit.toNanos(period),
+                        fixRateTaskSequencerGenerator.getAndIncrement());
+        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
+        sft.outerTask = t;
+        delayedExecute(t);
+        return t;
+    }
+
+    /**
+     * Main execution method for delayed or periodic tasks.  If pool
+     * is shut down, rejects the task. Otherwise adds task to queue
+     * and starts a thread, if necessary, to run it.  (We cannot
+     * prestart the thread to run the task because the task (probably)
+     * shouldn't be run yet.)  If the pool is shut down while the task
+     * is being added, cancel and remove it if required by state and
+     * run-after-shutdown parameters.
+     *
+     * @param task the task
+     */
+    private void delayedExecute(RunnableScheduledFuture<?> task) {
+        if (isShutdown()) {
+            reject(task);
+        } else {
+            super.getQueue().add(task);
+            if (!canRunInCurrentRunState(task) && remove(task)) {
+                task.cancel(false);
+            }
+        }
+    }
+
+    /**
+     * Invokes the rejected execution handler for the given command.
+     * Package-protected for use by ScheduledThreadPoolExecutor.
+     */
+    private void reject(Runnable command) {
+        rejectedExecutionHandler.rejectedExecution(command, this);
+    }
+
+    /**
+     * Returns the nanoTime-based trigger time of a delayed action.
+     */
+    private long triggerTime(long delay, TimeUnit unit) {
+        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
+    }
+
+    /**
+     * Returns the nanoTime-based trigger time of a delayed action.
+     */
+    private long triggerTime(long delay) {
+        return System.nanoTime() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : 
overflowFree(delay));
+    }
+
+    /**
+     * Constrains the values of all delays in the queue to be within
+     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
+     * This may occur if a task is eligible to be dequeued, but has
+     * not yet been, while some other task is added with a delay of
+     * Long.MAX_VALUE.
+     */
+    private long overflowFree(long delay) {
+        Delayed head = (Delayed) super.getQueue().peek();
+        if (head != null) {
+            long headDelay = head.getDelay(NANOSECONDS);
+            if (headDelay < 0 && (delay - headDelay < 0)) {
+                delay = Long.MAX_VALUE + headDelay;
+            }
+        }
+        return delay;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean 
flag) {
+        throw new IllegalArgumentException("Not support to set 
continueExistingPeriodicTasksAfterShutdownPolicy, it is"
+                + " always false");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean 
flag) {
+        throw new IllegalArgumentException("Not support to set 
executeExistingDelayedTasksAfterShutdownPolicy, it is"
+                + " always false");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     * Since the method "remove(runnable)" of DelayedWorkQueue is expensive if 
the tasks in the queue is not the same
+     * type, we denied the remove policy.
+     */
+    @Override
+    public void setRemoveOnCancelPolicy(boolean value) {
+        throw new IllegalArgumentException("Not support to set 
removeOnCancelPolicy, it is always false");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean getRemoveOnCancelPolicy() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+        super.setRejectedExecutionHandler(handler);
+        this.rejectedExecutionHandler = handler;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public RejectedExecutionHandler getRejectedExecutionHandler() {
+        return rejectedExecutionHandler;
+    }
+
+    private class ScheduledFutureTask<V>
+            extends FutureTask<V> implements RunnableScheduledFuture<V> {
+
+        /** Sequence number to break ties FIFO. */
+        private final long sequenceNumber;
+
+        /** The nanoTime-based time when the task is enabled to execute. */
+        private volatile long time;
+
+        /**
+         * Period for repeating tasks, in nanoseconds.
+         * A positive value indicates fixed-rate execution.
+         * A negative value indicates fixed-delay execution.
+         * A value of 0 indicates a non-repeating (one-shot) task.
+         */
+        private final long period;
+
+        /** The actual task to be re-enqueued by reExecutePeriodic. */
+        RunnableScheduledFuture<V> outerTask = this;
+
+        /**
+         * Creates a periodic action with given nanoTime-based initial
+         * trigger time and period.
+         */
+        ScheduledFutureTask(Runnable r, V result, long triggerTime,
+                            long period, long sequenceNumber) {
+            super(r, result);
+            this.time = triggerTime;
+            this.period = period;
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof ScheduledFutureTask<?> that)) {
+                return false;
+            }
+            return sequenceNumber == that.sequenceNumber && time == that.time 
&& period == that.period
+                    && Objects.equals(outerTask, that.outerTask);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(sequenceNumber, time, period, outerTask);
+        }
+
+        public long getDelay(TimeUnit unit) {
+            return unit.convert(time - System.nanoTime(), NANOSECONDS);
+        }
+
+        @Override
+        public int compareTo(Delayed other) {
+            if (other == this) { // compare zero if same object
+                return 0;
+            }
+            if (other instanceof ScheduledFutureTask) {
+                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
+                long diff = time - x.time;
+                if (diff < 0) {
+                    return -1;
+                } else if (diff > 0) {
+                    return 1;
+                } else if (sequenceNumber < x.sequenceNumber) {
+                    return -1;
+                } else {
+                    return 1;
+                }
+            }
+            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
+            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+        }
+
+        /**
+         * Returns {@code true} if this is a periodic (not a one-shot) action.
+         */
+        public boolean isPeriodic() {
+            return period != 0;
+        }
+
+        /**
+         * Sets the next time to run for a periodic task.
+         */
+        private void setNextRunTime() {
+            // If the execution time of the next period task > period: New 
tasks will trigger be dropped, instead,
+            // execute the next period task after the current time.
+            if (System.nanoTime() > time + period) {
+                time += ((System.nanoTime() - time) / period + 1) * period;
+            } else {
+                time += period;
+            }
+        }
+
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            // The racy read of heapIndex below is benign:
+            // if heapIndex < 0, then OOTA guarantees that we have surely
+            // been removed; else we recheck under lock in remove()
+            return super.cancel(mayInterruptIfRunning);
+        }
+
+        /**
+         * Overrides FutureTask version so as to reset/requeue if periodic.
+         */
+        public void run() {
+            if (!canRunInCurrentRunState(this)) {
+                cancel(false);
+            } else if (!isPeriodic()) {
+                super.run();
+            } else if (super.runAndReset()) {
+                setNextRunTime();
+                reExecutePeriodic(outerTask);
+            }
+        }
+    }
+
+    /**
+     * Re-queues a periodic task unless current run state precludes it.
+     * Same idea as delayedExecute except drops task rather than rejecting.
+     *
+     * @param task the task
+     */
+    private void reExecutePeriodic(RunnableScheduledFuture<?> task) {
+        if (canRunInCurrentRunState(task)) {
+            super.getQueue().add(task);
+            return;
+        }
+        task.cancel(false);
+    }
+
+    /**
+     * Returns true if can run a task given current run state and
+     * run-after-shutdown parameters.
+     */
+    private boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
+        // Since the policies 
"continueExistingPeriodicTasksAfterShutdownPolicy" and
+        // "executeExistingDelayedTasksAfterShutdownPolicy" are always 
"false", the checking of "terminating" is not
+        // needed.
+        return !isShutdown();
+    }
+}
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
new file mode 100644
index 00000000000..78ceffb79c4
--- /dev/null
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SingleThreadNonConcurrentFixedRateSchedulerTest {
+
+    private SingleThreadNonConcurrentFixedRateScheduler executor;
+
+    @BeforeMethod
+    public void setUp() {
+        executor = new 
SingleThreadNonConcurrentFixedRateScheduler("test-executor");
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        if (executor != null && !executor.isShutdown()) {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                executor.shutdownNow();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Test
+    public void testConstructorCreatesWorkerThread() {
+        // The constructor should trigger worker thread creation
+        assertTrue(executor.getPoolSize() > 0);
+        assertFalse(executor.isShutdown());
+        assertFalse(executor.isTerminated());
+    }
+
+    @Test
+    public void testSubmitTask() throws Exception {
+        AtomicBoolean executed = new AtomicBoolean(false);
+
+        Future<?> future = executor.submit(() -> executed.set(true));
+
+        future.get(1, TimeUnit.SECONDS);
+        assertTrue(executed.get());
+        assertTrue(future.isDone());
+        assertFalse(future.isCancelled());
+    }
+
+    @Test
+    public void testSubmitTaskWithException() throws Exception {
+        AtomicBoolean taskExecuted = new AtomicBoolean(false);
+
+        // Submit a task that throws an exception
+        Future<?> future = executor.submit(() -> {
+            taskExecuted.set(true);
+            throw new RuntimeException("Test exception");
+        });
+
+        // The SafeRunnable wrapper catches the exception, so the task should 
complete
+        // but the future will still contain the exception
+        try {
+            future.get(1, TimeUnit.SECONDS);
+            // If we get here without exception, that's also fine - it means 
SafeRunnable worked
+        } catch (Exception e) {
+            // This is expected - the exception is still propagated through 
the Future
+            assertTrue(e.getCause() instanceof RuntimeException);
+            assertEquals("Test exception", e.getCause().getMessage());
+        }
+
+        assertTrue(future.isDone());
+        assertFalse(future.isCancelled());
+        assertTrue(taskExecuted.get(), "Task should have been executed despite 
the exception");
+    }
+
+    @Test
+    public void testScheduleTask() throws Exception {
+        AtomicBoolean executed = new AtomicBoolean(false);
+        long startTime = System.currentTimeMillis();
+
+        ScheduledFuture<?> future = executor.schedule(() -> 
executed.set(true), 100, TimeUnit.MILLISECONDS);
+
+        future.get(1, TimeUnit.SECONDS);
+        long endTime = System.currentTimeMillis();
+
+        assertTrue(executed.get());
+        assertTrue(future.isDone());
+        assertTrue(endTime - startTime >= 100); // Should have waited at least 
100ms
+    }
+
+    @Test
+    public void testScheduleAtFixedRate() throws Exception {
+        AtomicInteger executionCount = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(15);
+
+        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
+            executionCount.incrementAndGet();
+            latch.countDown();
+        }, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Wait for at least 15 executions
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+        future.cancel(false);
+
+        assertTrue(executionCount.get() >= 15);
+    }
+
+    @Test
+    public void testScheduleWithFixedDelay() throws Exception {
+        AtomicInteger executionCount = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(15);
+
+        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
+            executionCount.incrementAndGet();
+            latch.countDown();
+        }, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Wait for at least 3 executions
+        assertTrue(latch.await(15, TimeUnit.SECONDS));
+        future.cancel(false);
+
+        assertTrue(executionCount.get() >= 15);
+    }
+
+    @Test
+    public void testScheduleAtFixedRateNonConcurrently() throws Exception {
+        AtomicInteger executionCount = new AtomicInteger(0);
+        AtomicLong[] executionTimes = new AtomicLong[10];
+        for (int i = 0; i < executionTimes.length; i++) {
+            executionTimes[i] = new AtomicLong(0);
+        }
+
+        // Schedule a task that takes longer than the period to test outdated 
task dropping
+        ScheduledFuture<?> future = 
executor.scheduleAtFixedRateNonConcurrently(() -> {
+            int count = executionCount.getAndIncrement();
+            if (count < executionTimes.length) {
+                executionTimes[count].set(System.nanoTime());
+            }
+            try {
+                Thread.sleep(150); // Task takes 150ms
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }, 0, 50, TimeUnit.MILLISECONDS); // Period is 50ms (shorter than task 
execution time)
+
+        // Wait for several executions
+        Thread.sleep(800);
+        future.cancel(false);
+
+        // Should have executed multiple times
+        assertTrue(executionCount.get() >= 2);
+        // Should have dropped some executions due to the long task execution 
time.
+        // 800 / 150 = 5.33, so at least 5 executions. Use 7 to avoid 
flakiness.
+        assertTrue(executionCount.get() < 7);
+
+        // Verify that outdated tasks were dropped by checking execution 
intervals
+        // The actual intervals should be longer than the scheduled period due 
to task execution time
+        for (int i = 1; i < Math.min(executionCount.get(), 
executionTimes.length); i++) {
+            if (executionTimes[i].get() > 0 && executionTimes[i - 1].get() > 
0) {
+                long intervalNanos = executionTimes[i].get() - 
executionTimes[i - 1].get();
+                long intervalMs = intervalNanos / 1_000_000;
+                // Interval should be at least the task execution time 
(150ms), not the period (50ms)
+                assertTrue(intervalMs >= 140, "Interval was " + intervalMs + 
"ms, expected >= 140ms");
+            }
+        }
+    }
+
+    @Test
+    public void testScheduleAtFixedRateNonConcurrentlyWithNullCommand() {
+        try {
+            executor.scheduleAtFixedRateNonConcurrently(null, 0, 100, 
TimeUnit.MILLISECONDS);
+            fail("Should throw NullPointerException");
+        } catch (NullPointerException e) {
+            // Expected
+        }
+    }
+
+    @Test
+    public void testScheduleAtFixedRateNonConcurrentlyWithNullTimeUnit() {
+        try {
+            executor.scheduleAtFixedRateNonConcurrently(() -> {}, 0, 100, 
null);
+            fail("Should throw NullPointerException");
+        } catch (NullPointerException e) {
+            // Expected
+        }
+    }
+
+    @Test
+    public void testScheduleAtFixedRateNonConcurrentlyWithInvalidPeriod() {
+        try {
+            executor.scheduleAtFixedRateNonConcurrently(() -> {}, 0, 0, 
TimeUnit.MILLISECONDS);
+            fail("Should throw IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            // Expected
+            assertTrue(e.getMessage().contains("period can not be null"));
+        }
+
+        try {
+            executor.scheduleAtFixedRateNonConcurrently(() -> {}, 0, -1, 
TimeUnit.MILLISECONDS);
+            fail("Should throw IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            // Expected
+            assertTrue(e.getMessage().contains("period can not be null"));
+        }
+    }
+
+    @Test
+    public void testDropOutdatedTaskBehavior() throws Exception {
+        AtomicInteger executionCount = new AtomicInteger(0);
+        AtomicLong[] startTimes = new AtomicLong[5];
+        AtomicLong[] endTimes = new AtomicLong[5];
+        for (int i = 0; i < 5; i++) {
+            startTimes[i] = new AtomicLong(0);
+            endTimes[i] = new AtomicLong(0);
+        }
+
+        // Schedule a task that takes much longer than the period
+        ScheduledFuture<?> future = 
executor.scheduleAtFixedRateNonConcurrently(() -> {
+            int count = executionCount.getAndIncrement();
+            if (count < 5) {
+                startTimes[count].set(System.nanoTime());
+                try {
+                    Thread.sleep(200); // Task takes 200ms
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+                endTimes[count].set(System.nanoTime());
+            } else {
+            }
+        }, 0, 50, TimeUnit.MILLISECONDS); // Period is 50ms
+
+        // Wait for several executions
+        Thread.sleep(1500);
+        future.cancel(false);
+
+        // 1500 - 200 * 5 = 500ms left, so at most 10 more executions
+        // Should have executed multiple times
+        assertTrue(executionCount.get() >= 5);
+        assertTrue(executionCount.get() < 15);
+
+        // Verify that the next execution time is calculated correctly when 
tasks are outdated
+        // Each execution should start after the previous one ends, not at 
fixed intervals
+        for (int i = 1; i < Math.min(executionCount.get(), 5); i++) {
+            if (startTimes[i].get() > 0 && endTimes[i - 1].get() > 0) {
+                // Next task should start after previous task ends
+                assertTrue(startTimes[i].get() >= endTimes[i - 1].get(),
+                    "Task " + i + " started before task " + (i - 1) + " 
ended");
+            }
+        }
+    }
+
+    @Test
+    public void testPolicyGettersReturnCorrectValues() {
+        // These policies are always false and cannot be changed
+        
assertFalse(executor.getContinueExistingPeriodicTasksAfterShutdownPolicy());
+        
assertFalse(executor.getExecuteExistingDelayedTasksAfterShutdownPolicy());
+        assertFalse(executor.getRemoveOnCancelPolicy());
+    }
+
+    @Test
+    public void testPolicySettersThrowExceptions() {
+        try {
+            executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
+            fail("Should throw IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            
assertTrue(e.getMessage().contains("continueExistingPeriodicTasksAfterShutdownPolicy"));
+        }
+
+        try {
+            executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
+            fail("Should throw IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            
assertTrue(e.getMessage().contains("executeExistingDelayedTasksAfterShutdownPolicy"));
+        }
+
+        try {
+            executor.setRemoveOnCancelPolicy(true);
+            fail("Should throw IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            assertTrue(e.getMessage().contains("removeOnCancelPolicy"));
+        }
+    }
+
+    @Test
+    public void testRejectedExecutionHandler() {
+        AtomicBoolean handlerCalled = new AtomicBoolean(false);
+        AtomicReference<Runnable> rejectedTask = new AtomicReference<>();
+        RejectedExecutionHandler customHandler = (r, executor) -> {
+            handlerCalled.set(true);
+            rejectedTask.set(r);
+        };
+
+        executor.setRejectedExecutionHandler(customHandler);
+        assertEquals(executor.getRejectedExecutionHandler(), customHandler);
+
+        // Shutdown the executor and try to submit a task
+        executor.shutdown();
+        Runnable testTask = () -> {};
+        executor.submit(testTask);
+
+        // The custom handler should be called
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> 
handlerCalled.get());
+        assertNotNull(rejectedTask.get());
+    }
+
+    @Test
+    public void testDefaultRejectedExecutionHandler() {
+        // Test that the default handler is AbortPolicy
+        RejectedExecutionHandler defaultHandler = 
executor.getRejectedExecutionHandler();
+        assertNotNull(defaultHandler);
+        assertTrue(defaultHandler instanceof ThreadPoolExecutor.AbortPolicy);
+
+        // Shutdown and verify rejection behavior
+        executor.shutdown();
+        try {
+            executor.submit(() -> {});
+            fail("Should throw RejectedExecutionException with default 
handler");
+        } catch (RejectedExecutionException e) {
+            // Expected with AbortPolicy
+        }
+    }
+
+    @Test
+    public void testShutdownBehavior() throws Exception {
+        AtomicBoolean taskExecuted = new AtomicBoolean(false);
+
+        // Submit a task
+        executor.submit(() -> taskExecuted.set(true));
+
+        // Wait for task to complete
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> 
taskExecuted.get());
+
+        // Shutdown the executor
+        executor.shutdown();
+        assertTrue(executor.isShutdown());
+
+        // Wait for termination
+        assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
+        assertTrue(executor.isTerminated());
+    }
+
+    @Test
+    public void testTasksAfterShutdown() {
+        executor.shutdown();
+
+        try {
+            executor.submit(() -> {});
+            fail("Should throw RejectedExecutionException");
+        } catch (RejectedExecutionException e) {
+            // Expected
+        }
+    }
+
+    @Test
+    public void testConcurrentTaskExecution() throws Exception {
+        int taskCount = 10;
+        CountDownLatch latch = new CountDownLatch(taskCount);
+        AtomicInteger executionOrder = new AtomicInteger(0);
+        AtomicInteger[] results = new AtomicInteger[taskCount];
+
+        // Submit multiple tasks
+        for (int i = 0; i < taskCount; i++) {
+            final int taskId = i;
+            results[i] = new AtomicInteger(-1);
+            executor.submit(() -> {
+                results[taskId].set(executionOrder.getAndIncrement());
+                latch.countDown();
+            });
+        }
+
+        // Wait for all tasks to complete
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+        // Verify all tasks executed
+        for (int i = 0; i < taskCount; i++) {
+            assertTrue(results[i].get() >= 0, "Task " + i + " did not 
execute");
+        }
+    }
+
+    @Test
+    public void testTaskCancellation() throws Exception {
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch blockLatch = new CountDownLatch(1);
+
+        // Submit a long-running task
+        Future<?> future = executor.submit(() -> {
+            startLatch.countDown();
+            try {
+                blockLatch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        // Wait for task to start
+        assertTrue(startLatch.await(1, TimeUnit.SECONDS));
+
+        // Cancel the task
+        boolean cancelled = future.cancel(true);
+        assertTrue(cancelled);
+        assertTrue(future.isCancelled());
+        assertTrue(future.isDone());
+
+        // Release the blocking task
+        blockLatch.countDown();
+    }
+
+    @Test
+    public void testScheduledTaskCancellation() throws Exception {
+        AtomicBoolean executed = new AtomicBoolean(false);
+
+        // Schedule a task with a delay
+        ScheduledFuture<?> future = executor.schedule(() -> 
executed.set(true), 500, TimeUnit.MILLISECONDS);
+
+        // Cancel before execution
+        boolean cancelled = future.cancel(false);
+        assertTrue(cancelled);
+        assertTrue(future.isCancelled());
+        assertTrue(future.isDone());
+
+        // Wait a bit to ensure task doesn't execute
+        Thread.sleep(600);
+        assertFalse(executed.get());
+    }
+
+    @Test
+    public void testPeriodicTaskCancellation() throws Exception {
+        AtomicInteger executionCount = new AtomicInteger(0);
+
+        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
+            executionCount.incrementAndGet();
+        }, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Let it run a few times
+        Thread.sleep(200);
+        int countBeforeCancel = executionCount.get();
+        assertTrue(countBeforeCancel > 0);
+
+        // Cancel the task
+        boolean cancelled = future.cancel(false);
+        assertTrue(cancelled);
+        assertTrue(future.isCancelled());
+
+        // Wait and verify no more executions
+        Thread.sleep(200);
+        assertEquals(executionCount.get(), countBeforeCancel);
+    }
+
+    @Test
+    public void testExceptionHandlingInPeriodicTask() throws Exception {
+        AtomicInteger executionCount = new AtomicInteger(0);
+        AtomicInteger exceptionCount = new AtomicInteger(0);
+
+        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
+            int count = executionCount.incrementAndGet();
+            if (count % 2 == 0) {
+                exceptionCount.incrementAndGet();
+                throw new RuntimeException("Test exception " + count);
+            }
+        }, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Let it run for a while
+        Thread.sleep(300);
+        future.cancel(false);
+
+        // Verify that exceptions didn't stop the periodic execution
+        assertTrue(executionCount.get() > 2);
+        assertTrue(exceptionCount.get() > 0);
+        // Should have roughly half the executions throwing exceptions
+        assertTrue(Math.abs(executionCount.get() - 2 * exceptionCount.get()) 
<= 1);
+    }
+
+    @Test
+    public void testSchedulingWithDifferentDelays() throws Exception {
+        // Test scheduling with different delays to indirectly test trigger 
time calculation
+        AtomicLong executionTime1 = new AtomicLong(0);
+        AtomicLong executionTime2 = new AtomicLong(0);
+
+        long startTime = System.currentTimeMillis();
+
+        // Schedule two tasks with different delays
+        ScheduledFuture<?> future1 = executor.schedule(() -> {
+            executionTime1.set(System.currentTimeMillis());
+        }, 100, TimeUnit.MILLISECONDS);
+
+        ScheduledFuture<?> future2 = executor.schedule(() -> {
+            executionTime2.set(System.currentTimeMillis());
+        }, 200, TimeUnit.MILLISECONDS);
+
+        // Wait for both to complete
+        future1.get(1, TimeUnit.SECONDS);
+        future2.get(1, TimeUnit.SECONDS);
+
+        // Verify timing
+        assertTrue(executionTime1.get() >= startTime + 100);
+        assertTrue(executionTime2.get() >= startTime + 200);
+        assertTrue(executionTime2.get() > executionTime1.get());
+    }
+
+    @Test
+    public void testSingleThreadExecution() throws Exception {
+        int taskCount = 20;
+        CountDownLatch latch = new CountDownLatch(taskCount);
+        AtomicReference<String> threadName = new AtomicReference<>();
+        AtomicBoolean singleThread = new AtomicBoolean(true);
+
+        // Submit multiple tasks
+        for (int i = 0; i < taskCount; i++) {
+            executor.submit(() -> {
+                String currentThreadName = Thread.currentThread().getName();
+                if (threadName.get() == null) {
+                    threadName.set(currentThreadName);
+                } else if (!threadName.get().equals(currentThreadName)) {
+                    singleThread.set(false);
+                }
+                latch.countDown();
+            });
+        }
+
+        // Wait for all tasks to complete
+        assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+        // Verify all tasks ran on the same thread
+        assertTrue(singleThread.get(), "Tasks should run on a single thread");
+        assertNotNull(threadName.get());
+        assertTrue(threadName.get().contains("test-executor"));
+    }
+
+    @Test
+    public void testShutdownNow() throws Exception {
+        AtomicInteger completedTasks = new AtomicInteger(0);
+        CountDownLatch blockLatch = new CountDownLatch(1);
+
+        // Submit a blocking task
+        executor.submit(() -> {
+            try {
+                blockLatch.await();
+                completedTasks.incrementAndGet();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        // Submit additional tasks
+        for (int i = 0; i < 5; i++) {
+            executor.submit(() -> completedTasks.incrementAndGet());
+        }
+
+        // Shutdown immediately
+        executor.shutdownNow();
+        assertTrue(executor.isShutdown());
+
+        // Release the blocking task
+        blockLatch.countDown();
+
+        // Wait for termination
+        assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
+        assertTrue(executor.isTerminated());
+
+        // Some tasks may not have completed due to immediate shutdown
+        assertTrue(completedTasks.get() < 6);
+    }
+
+    @Test
+    public void testScheduledFutureTaskComparison() throws Exception {
+        // Test that scheduled tasks are ordered correctly by time and sequence
+        AtomicInteger executionOrder = new AtomicInteger(0);
+        AtomicInteger[] results = new AtomicInteger[3];
+        for (int i = 0; i < 3; i++) {
+            results[i] = new AtomicInteger(-1);
+        }
+
+        // Schedule tasks with different delays
+        ScheduledFuture<?> future3 = executor.schedule(() -> {
+            results[2].set(executionOrder.getAndIncrement());
+        }, 200, TimeUnit.MILLISECONDS);
+
+        ScheduledFuture<?> future1 = executor.schedule(() -> {
+            results[0].set(executionOrder.getAndIncrement());
+        }, 50, TimeUnit.MILLISECONDS);
+
+        ScheduledFuture<?> future2 = executor.schedule(() -> {
+            results[1].set(executionOrder.getAndIncrement());
+        }, 100, TimeUnit.MILLISECONDS);
+
+        // Wait for all to complete
+        future1.get(1, TimeUnit.SECONDS);
+        future2.get(1, TimeUnit.SECONDS);
+        future3.get(1, TimeUnit.SECONDS);
+
+        // Verify execution order matches delay order
+        assertEquals(results[0].get(), 0); // First to execute (50ms delay)
+        assertEquals(results[1].get(), 1); // Second to execute (100ms delay)
+        assertEquals(results[2].get(), 2); // Third to execute (200ms delay)
+    }
+
+    @Test
+    public void testPeriodicTaskWithShutdown() throws Exception {
+        AtomicInteger executionCount = new AtomicInteger(0);
+
+        // Start a periodic task
+        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
+            executionCount.incrementAndGet();
+        }, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Let it run a few times
+        Thread.sleep(200);
+        int countBeforeShutdown = executionCount.get();
+        assertTrue(countBeforeShutdown >= 2);
+
+        // Shutdown the executor
+        executor.shutdown();
+
+        // Wait a bit more
+        Thread.sleep(200);
+
+        // Task should stop executing after shutdown
+        int countAfterShutdown = executionCount.get();
+        // Allow for one more execution that might have been in progress
+        assertTrue(countAfterShutdown <= countBeforeShutdown + 1);
+
+        assertTrue(future.isCancelled() || future.isDone());
+    }
+
+    @Test
+    public void testSafeRunnableExceptionLogging() throws Exception {
+        // This test verifies that exceptions are caught and logged by 
SafeRunnable
+        AtomicBoolean taskExecuted = new AtomicBoolean(false);
+        AtomicBoolean exceptionThrown = new AtomicBoolean(false);
+
+        Future<?> future = executor.submit(() -> {
+            taskExecuted.set(true);
+            exceptionThrown.set(true);
+            throw new RuntimeException("Test exception for SafeRunnable");
+        });
+
+        // Wait for task completion
+        try {
+            future.get(1, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            // Exception might be propagated through Future
+        }
+
+        assertTrue(taskExecuted.get());
+        assertTrue(exceptionThrown.get());
+        assertTrue(future.isDone());
+    }
+}

Reply via email to