This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f41687c4277 [improve][broker]Call scheduleAtFixedRateNonConcurrently
for scheduled tasks, instead of scheduleAtFixedRate (#24596)
f41687c4277 is described below
commit f41687c4277e0e7b0347862dc2fd983cb0f8cded
Author: fengyubiao <[email protected]>
AuthorDate: Thu Sep 25 22:19:54 2025 +0800
[improve][broker]Call scheduleAtFixedRateNonConcurrently for scheduled
tasks, instead of scheduleAtFixedRate (#24596)
---
.../pulsar/broker/service/BrokerService.java | 82 +--
...ingleThreadNonConcurrentFixedRateScheduler.java | 401 ++++++++++++
...eThreadNonConcurrentFixedRateSchedulerTest.java | 684 +++++++++++++++++++++
3 files changed, 1116 insertions(+), 51 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 9bd107f3320..044f44a644a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -68,7 +68,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -85,7 +84,6 @@ import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.LedgerOffloader;
@@ -184,6 +182,7 @@ import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
+import
org.apache.pulsar.common.util.SingleThreadNonConcurrentFixedRateScheduler;
import org.apache.pulsar.common.util.netty.ChannelFutures;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.common.util.netty.NettyFutureUtil;
@@ -243,10 +242,10 @@ public class BrokerService implements Closeable {
private final ConcurrentLinkedQueue<TopicLoadingContext>
pendingTopicLoadingQueue;
private AuthorizationService authorizationService;
- private final ScheduledExecutorService statsUpdater;
+ private final SingleThreadNonConcurrentFixedRateScheduler statsUpdater;
@Getter
- private final ScheduledExecutorService backlogQuotaChecker;
+ private final SingleThreadNonConcurrentFixedRateScheduler
backlogQuotaChecker;
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;
@@ -274,11 +273,11 @@ public class BrokerService implements Closeable {
.help("Counter of connections throttled because of per-connection
limit")
.register();
- private final ScheduledExecutorService inactivityMonitor;
- private final ScheduledExecutorService messageExpiryMonitor;
- private final ScheduledExecutorService compactionMonitor;
- private final ScheduledExecutorService consumedLedgersMonitor;
- private ScheduledExecutorService deduplicationSnapshotMonitor;
+ private final SingleThreadNonConcurrentFixedRateScheduler
inactivityMonitor;
+ private final SingleThreadNonConcurrentFixedRateScheduler
messageExpiryMonitor;
+ private final SingleThreadNonConcurrentFixedRateScheduler
compactionMonitor;
+ private final SingleThreadNonConcurrentFixedRateScheduler
consumedLedgersMonitor;
+ private SingleThreadNonConcurrentFixedRateScheduler
deduplicationSnapshotMonitor;
protected final PublishRateLimiter brokerPublishRateLimiter;
private final DispatchRateLimiterFactory dispatchRateLimiterFactory;
protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null;
@@ -350,10 +349,7 @@ public class BrokerService implements Closeable {
pulsar.getConfiguration().getNumAcceptorThreads(), false,
acceptorThreadFactory);
this.workerGroup = eventLoopGroup;
- this.statsUpdater = OrderedScheduler.newSchedulerBuilder()
- .name("pulsar-stats-updater")
- .numThreads(1)
- .build();
+ this.statsUpdater = new
SingleThreadNonConcurrentFixedRateScheduler("pulsar-stats-updater");
this.authorizationService = new AuthorizationService(
pulsar.getConfiguration(), pulsar().getPulsarResources());
this.entryFilterProvider = new
EntryFilterProvider(pulsar.getConfiguration());
@@ -363,27 +359,13 @@ public class BrokerService implements Closeable {
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);
}
- this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
- .name("pulsar-inactivity-monitor")
- .numThreads(1)
- .build();
- this.messageExpiryMonitor = OrderedScheduler.newSchedulerBuilder()
- .name("pulsar-msg-expiry-monitor")
- .numThreads(1)
- .build();
- this.compactionMonitor = OrderedScheduler.newSchedulerBuilder()
- .name("pulsar-compaction-monitor")
- .numThreads(1)
- .build();
- this.consumedLedgersMonitor = OrderedScheduler.newSchedulerBuilder()
- .name("pulsar-consumed-ledgers-monitor")
- .numThreads(1)
- .build();
+ this.inactivityMonitor = new
SingleThreadNonConcurrentFixedRateScheduler("pulsar-inactivity-monitor");
+ this.messageExpiryMonitor = new
SingleThreadNonConcurrentFixedRateScheduler("pulsar-msg-expiry-monitor");
+ this.compactionMonitor = new
SingleThreadNonConcurrentFixedRateScheduler("pulsar-compaction-monitor");
+ this.consumedLedgersMonitor =
+ new
SingleThreadNonConcurrentFixedRateScheduler("pulsar-consumed-ledgers-monitor");
this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
- this.backlogQuotaChecker = OrderedScheduler.newSchedulerBuilder()
- .name("pulsar-backlog-quota-checker")
- .numThreads(1)
- .build();
+ this.backlogQuotaChecker = new
SingleThreadNonConcurrentFixedRateScheduler("pulsar-backlog-quota-checker");
this.authenticationService = new
AuthenticationService(pulsar.getConfiguration(),
pulsar.getOpenTelemetry().getOpenTelemetry());
this.topicFactory = createPersistentTopicFactory();
@@ -646,7 +628,7 @@ public class BrokerService implements Closeable {
protected void startClearInvalidateTopicNameCacheTask() {
final int maxSecondsToClearTopicNameCache =
pulsar.getConfiguration().getMaxSecondsToClearTopicNameCache();
- inactivityMonitor.scheduleAtFixedRate(
+ inactivityMonitor.scheduleAtFixedRateNonConcurrently(
() ->
TopicName.clearIfReachedMaxCapacity(pulsar.getConfiguration().getTopicNameCacheMaxCapacity()),
maxSecondsToClearTopicNameCache,
maxSecondsToClearTopicNameCache,
@@ -654,7 +636,7 @@ public class BrokerService implements Closeable {
}
protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int
statsUpdateFrequencyInSecs) {
- statsUpdater.scheduleAtFixedRate(this::updateRates,
+ statsUpdater.scheduleAtFixedRateNonConcurrently(this::updateRates,
statsUpdateInitialDelayInSecs, statsUpdateFrequencyInSecs,
TimeUnit.SECONDS);
// Ensure the broker starts up with initial stats
@@ -665,7 +647,7 @@ public class BrokerService implements Closeable {
ServiceConfiguration config = pulsar().getConfiguration();
if (config.getHealthCheckMetricsUpdateTimeInSeconds() > 0) {
int interval = config.getHealthCheckMetricsUpdateTimeInSeconds();
- statsUpdater.scheduleAtFixedRate(this::checkHealth,
+ statsUpdater.scheduleAtFixedRateNonConcurrently(this::checkHealth,
interval, interval, TimeUnit.SECONDS);
}
}
@@ -687,11 +669,9 @@ public class BrokerService implements Closeable {
// scheduled task runs.
int interval =
pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
if (interval > 0) {
- this.deduplicationSnapshotMonitor =
OrderedScheduler.newSchedulerBuilder()
- .name("deduplication-snapshot-monitor")
- .numThreads(1)
- .build();
- deduplicationSnapshotMonitor.scheduleAtFixedRate(() ->
forEachTopic(
+ this.deduplicationSnapshotMonitor =
+ new
SingleThreadNonConcurrentFixedRateScheduler("deduplication-snapshot-monitor");
+ deduplicationSnapshotMonitor.scheduleAtFixedRateNonConcurrently(()
-> forEachTopic(
Topic::checkDeduplicationSnapshot)
, interval, interval, TimeUnit.SECONDS);
}
@@ -700,14 +680,14 @@ public class BrokerService implements Closeable {
protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled())
{
int interval =
pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
- inactivityMonitor.scheduleAtFixedRate(() -> checkGC(), interval,
interval,
+ inactivityMonitor.scheduleAtFixedRateNonConcurrently(() ->
checkGC(), interval, interval,
TimeUnit.SECONDS);
}
// Deduplication info checker
long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES
.toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes())
/ 3;
-
inactivityMonitor.scheduleAtFixedRate(this::checkMessageDeduplicationInfo,
+
inactivityMonitor.scheduleAtFixedRateNonConcurrently(this::checkMessageDeduplicationInfo,
duplicationCheckerIntervalInSeconds,
duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS);
@@ -716,7 +696,7 @@ public class BrokerService implements Closeable {
long subscriptionExpiryCheckIntervalInSeconds =
TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration()
.getSubscriptionExpiryCheckIntervalInMinutes());
-
inactivityMonitor.scheduleAtFixedRate(this::checkInactiveSubscriptions,
+
inactivityMonitor.scheduleAtFixedRateNonConcurrently(this::checkInactiveSubscriptions,
subscriptionExpiryCheckIntervalInSeconds,
subscriptionExpiryCheckIntervalInSeconds,
TimeUnit.SECONDS);
}
@@ -724,29 +704,29 @@ public class BrokerService implements Closeable {
// check cluster migration
int interval =
pulsar().getConfiguration().getClusterMigrationCheckDurationSeconds();
if (interval > 0) {
- inactivityMonitor.scheduleAtFixedRate(() ->
checkClusterMigration(), interval, interval,
+ inactivityMonitor.scheduleAtFixedRateNonConcurrently(() ->
checkClusterMigration(), interval, interval,
TimeUnit.SECONDS);
}
}
protected void startMessageExpiryMonitor() {
int interval =
pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes();
- messageExpiryMonitor.scheduleAtFixedRate(this::checkMessageExpiry,
interval, interval,
+
messageExpiryMonitor.scheduleAtFixedRateNonConcurrently(this::checkMessageExpiry,
interval, interval,
TimeUnit.MINUTES);
}
protected void startCheckReplicationPolicies() {
int interval =
pulsar.getConfig().getReplicationPolicyCheckDurationSeconds();
if (interval > 0) {
-
messageExpiryMonitor.scheduleAtFixedRate(this::checkReplicationPolicies,
interval, interval,
- TimeUnit.SECONDS);
+
messageExpiryMonitor.scheduleAtFixedRateNonConcurrently(this::checkReplicationPolicies,
interval,
+ interval, TimeUnit.SECONDS);
}
}
protected void startCompactionMonitor() {
int interval =
pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
if (interval > 0) {
- compactionMonitor.scheduleAtFixedRate(this::checkCompaction,
+
compactionMonitor.scheduleAtFixedRateNonConcurrently(this::checkCompaction,
interval, interval, TimeUnit.SECONDS);
}
}
@@ -754,7 +734,7 @@ public class BrokerService implements Closeable {
protected void startConsumedLedgersMonitor() {
int interval =
pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
if (interval > 0) {
-
consumedLedgersMonitor.scheduleAtFixedRate(this::checkConsumedLedgers,
+
consumedLedgersMonitor.scheduleAtFixedRateNonConcurrently(this::checkConsumedLedgers,
interval, interval, TimeUnit.SECONDS);
}
}
@@ -763,7 +743,7 @@ public class BrokerService implements Closeable {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval =
pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
log.info("Scheduling a thread to check backlog quota after [{}]
seconds in background", interval);
- backlogQuotaChecker.scheduleAtFixedRate(this::monitorBacklogQuota,
interval, interval,
+
backlogQuotaChecker.scheduleAtFixedRateNonConcurrently(this::monitorBacklogQuota,
interval, interval,
TimeUnit.SECONDS);
} else {
log.info("Backlog quota check monitoring is disabled");
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
new file mode 100644
index 00000000000..1ddb2101f3a
--- /dev/null
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateScheduler.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import java.util.Objects;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.RunnableScheduledFuture;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link ScheduledThreadPoolExecutor} with a core pool size of one that adds
+ * {@link #scheduleAtFixedRateNonConcurrently(Runnable, long, long, TimeUnit)}: a fixed-rate schedule that skips
+ * the periods that elapsed while a slow execution was running instead of replaying them back to back.
+ *
+ * <p>Every task handed to this executor is wrapped so that a throwable raised by the task is logged rather than
+ * propagated; for periodic tasks this means one failing execution does not stop the following ones.
+ *
+ * <p>The shutdown policies and the remove-on-cancel policy are fixed to {@code false}; the corresponding setters
+ * throw {@link IllegalArgumentException}.
+ */
+@Slf4j
+public class SingleThreadNonConcurrentFixedRateScheduler extends ScheduledThreadPoolExecutor
+        implements ScheduledExecutorService {
+
+    // Tie-breaker for tasks created by scheduleAtFixedRateNonConcurrently. ScheduledFutureTask#compareTo only
+    // consults it when both tasks are of this class' ScheduledFutureTask type (other Delayed instances are compared
+    // by remaining delay), so the numbers only have to be unique among tasks created by this class.
+    private static final AtomicLong fixRateTaskSequencerGenerator = new AtomicLong();
+
+    private static final RejectedExecutionHandler defaultRejectedExecutionHandler = new AbortPolicy();
+
+    // Mirror of the handler installed on the super class, kept here so that the private reject(...) can use it.
+    private volatile RejectedExecutionHandler rejectedExecutionHandler = defaultRejectedExecutionHandler;
+
+    /**
+     * Creates the scheduler and eagerly starts its single worker thread.
+     *
+     * @param name the pool name passed to the {@link DefaultThreadFactory} that creates the worker thread
+     */
+    public SingleThreadNonConcurrentFixedRateScheduler(String name) {
+        super(1, new DefaultThreadFactory(name));
+        // The policies are set through "super" because the overrides in this class reject any change.
+        super.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+        super.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        super.setRemoveOnCancelPolicy(false);
+        // Trigger worker thread creation. delayedExecute(...) only enqueues and never starts a thread itself.
+        submit(() -> {});
+    }
+
+    /**
+     * Runs the wrapped task and logs, instead of propagating, anything it throws.
+     */
+    private static final class SafeRunnable implements Runnable {
+        private final Runnable task;
+
+        SafeRunnable(Runnable task) {
+            this.task = task;
+        }
+
+        @Override
+        public void run() {
+            try {
+                task.run();
+            } catch (Throwable t) {
+                log.warn("Unexpected throwable from task {}: {}", task.getClass(), t.getMessage(), t);
+            }
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        return super.schedule(new SafeRunnable(command), delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay, long delay, TimeUnit unit) {
+        return super.scheduleWithFixedDelay(new SafeRunnable(command), initialDelay, delay, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay, long period, TimeUnit unit) {
+        return super.scheduleAtFixedRate(new SafeRunnable(command), initialDelay, period, unit);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Future<?> submit(Runnable task) {
+        return super.submit(new SafeRunnable(task));
+    }
+
+    /**
+     * Schedules a periodic task like {@link #scheduleAtFixedRate(Runnable, long, long, TimeUnit)}, with one
+     * difference: when an execution takes longer than {@code period}, the periods that elapsed in the meantime are
+     * dropped and the next execution is scheduled for the first period boundary after the current time, instead of
+     * running the missed executions back to back.
+     *
+     * <p>As with the other scheduling methods of this class, a throwable raised by {@code command} is logged and
+     * does not cancel the following executions.
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay the first execution; negative values are treated as zero
+     * @param period the period between successive executions, must be positive
+     * @param unit the time unit of {@code initialDelay} and {@code period}
+     * @return a future representing the periodic task; cancel it to stop the schedule
+     * @throws NullPointerException if {@code command} or {@code unit} is null
+     * @throws IllegalArgumentException if {@code period} is less than or equal to zero
+     * @throws java.util.concurrent.RejectedExecutionException if the scheduler has been shut down and the default
+     *         rejection handler is in use
+     */
+    public ScheduledFuture<?> scheduleAtFixedRateNonConcurrently(Runnable command,
+                                                                 long initialDelay,
+                                                                 long period,
+                                                                 TimeUnit unit) {
+        Objects.requireNonNull(command, "command");
+        Objects.requireNonNull(unit, "unit");
+        if (period <= 0L) {
+            throw new IllegalArgumentException("period can not be null");
+        }
+        // Wrap the command like every other entry point of this class does: without the wrapper a single
+        // exception would make runAndReset() return false and silently end the periodic schedule.
+        ScheduledFutureTask<Void> sft =
+                new ScheduledFutureTask<Void>(new SafeRunnable(command),
+                        null,
+                        triggerTime(initialDelay, unit),
+                        unit.toNanos(period),
+                        fixRateTaskSequencerGenerator.getAndIncrement());
+        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
+        sft.outerTask = t;
+        delayedExecute(t);
+        return t;
+    }
+
+    /**
+     * Main execution method for the tasks created by this class. If the pool is shut down, rejects the task.
+     * Otherwise adds the task to the queue; no worker thread is started here because the constructor already
+     * triggered the creation of the single worker thread. If the pool is shut down while the task is being added,
+     * the task is removed from the queue and cancelled.
+     *
+     * @param task the task
+     */
+    private void delayedExecute(RunnableScheduledFuture<?> task) {
+        if (isShutdown()) {
+            reject(task);
+        } else {
+            super.getQueue().add(task);
+            if (!canRunInCurrentRunState(task) && remove(task)) {
+                task.cancel(false);
+            }
+        }
+    }
+
+    /**
+     * Invokes the rejected execution handler for the given command.
+     */
+    private void reject(Runnable command) {
+        rejectedExecutionHandler.rejectedExecution(command, this);
+    }
+
+    /**
+     * Returns the nanoTime-based trigger time of a delayed action; a negative delay is treated as zero.
+     */
+    private long triggerTime(long delay, TimeUnit unit) {
+        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
+    }
+
+    /**
+     * Returns the nanoTime-based trigger time of a delayed action given in nanoseconds.
+     */
+    private long triggerTime(long delay) {
+        return System.nanoTime() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
+    }
+
+    /**
+     * Constrains the values of all delays in the queue to be within
+     * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
+     * This may occur if a task is eligible to be dequeued, but has
+     * not yet been, while some other task is added with a delay of
+     * Long.MAX_VALUE.
+     */
+    private long overflowFree(long delay) {
+        Delayed head = (Delayed) super.getQueue().peek();
+        if (head != null) {
+            long headDelay = head.getDelay(NANOSECONDS);
+            if (headDelay < 0 && (delay - headDelay < 0)) {
+                delay = Long.MAX_VALUE + headDelay;
+            }
+        }
+        return delay;
+    }
+
+    /**
+     * Not supported: the policy is always {@code false}.
+     *
+     * @throws IllegalArgumentException always
+     */
+    @Override
+    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean flag) {
+        throw new IllegalArgumentException("Not support to set continueExistingPeriodicTasksAfterShutdownPolicy, it is"
+                + " always false");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
+        return false;
+    }
+
+    /**
+     * Not supported: the policy is always {@code false}.
+     *
+     * @throws IllegalArgumentException always
+     */
+    @Override
+    public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean flag) {
+        throw new IllegalArgumentException("Not support to set executeExistingDelayedTasksAfterShutdownPolicy, it is"
+                + " always false");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
+        return false;
+    }
+
+    /**
+     * Not supported: the policy is always {@code false}. Removing a task from the work queue is expensive when the
+     * queue holds tasks of different types, so removal on cancellation is not allowed.
+     *
+     * @throws IllegalArgumentException always
+     */
+    @Override
+    public void setRemoveOnCancelPolicy(boolean value) {
+        throw new IllegalArgumentException("Not support to set removeOnCancelPolicy, it is always false");
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public boolean getRemoveOnCancelPolicy() {
+        return false;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
+        super.setRejectedExecutionHandler(handler);
+        // Keep the local mirror in sync so that tasks rejected by delayedExecute(...) use the same handler.
+        this.rejectedExecutionHandler = handler;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public RejectedExecutionHandler getRejectedExecutionHandler() {
+        return rejectedExecutionHandler;
+    }
+
+    /**
+     * The periodic task created by
+     * {@link SingleThreadNonConcurrentFixedRateScheduler#scheduleAtFixedRateNonConcurrently}. It re-enqueues
+     * itself after every successful run and skips the periods that elapsed while it was running.
+     */
+    private class ScheduledFutureTask<V>
+            extends FutureTask<V> implements RunnableScheduledFuture<V> {
+
+        /** Sequence number to break ties FIFO; also the identity used by equals/hashCode. */
+        private final long sequenceNumber;
+
+        /** The nanoTime-based time when the task is enabled to execute. */
+        private volatile long time;
+
+        /**
+         * Period for repeating tasks, in nanoseconds.
+         * A value of 0 indicates a non-repeating (one-shot) task.
+         */
+        private final long period;
+
+        /** The actual task to be re-enqueued by reExecutePeriodic. */
+        RunnableScheduledFuture<V> outerTask = this;
+
+        /**
+         * Creates a periodic action with given nanoTime-based initial
+         * trigger time and period.
+         */
+        ScheduledFutureTask(Runnable r, V result, long triggerTime,
+                            long period, long sequenceNumber) {
+            super(r, result);
+            this.time = triggerTime;
+            this.period = period;
+            this.sequenceNumber = sequenceNumber;
+        }
+
+        /**
+         * Two tasks are equal when they carry the same sequence number. Each task receives its own number from
+         * the shared generator, so this is an identity comparison that does not depend on the mutable trigger time.
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ScheduledFutureTask<?> that)) {
+                return false;
+            }
+            return sequenceNumber == that.sequenceNumber;
+        }
+
+        /**
+         * Hashes the immutable sequence number only. {@code outerTask} must not take part in the hash: it refers
+         * to this very object by default, so hashing it would recurse until the stack overflows; {@code time}
+         * changes on every run and would make the hash unstable.
+         */
+        @Override
+        public int hashCode() {
+            return Long.hashCode(sequenceNumber);
+        }
+
+        @Override
+        public long getDelay(TimeUnit unit) {
+            return unit.convert(time - System.nanoTime(), NANOSECONDS);
+        }
+
+        @Override
+        public int compareTo(Delayed other) {
+            if (other == this) { // compare zero if same object
+                return 0;
+            }
+            if (other instanceof ScheduledFutureTask) {
+                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
+                long diff = time - x.time;
+                if (diff < 0) {
+                    return -1;
+                } else if (diff > 0) {
+                    return 1;
+                } else if (sequenceNumber < x.sequenceNumber) {
+                    return -1;
+                } else {
+                    return 1;
+                }
+            }
+            // Tasks of any other type are ordered by their remaining delay.
+            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
+            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+        }
+
+        /**
+         * Returns {@code true} if this is a periodic (not a one-shot) action.
+         */
+        @Override
+        public boolean isPeriodic() {
+            return period != 0;
+        }
+
+        /**
+         * Sets the next time to run for a periodic task. If more than one period has elapsed since the scheduled
+         * time of the run that just finished, the missed periods are dropped and the task is moved to the first
+         * period boundary after the current time; otherwise it advances by exactly one period.
+         */
+        private void setNextRunTime() {
+            // Read the clock once and compare differences rather than absolute values: nanoTime values may wrap
+            // around, so only "now - time" is safe to compare against the period.
+            long elapsed = System.nanoTime() - time;
+            if (elapsed > period) {
+                time += (elapsed / period + 1) * period;
+            } else {
+                time += period;
+            }
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            // The remove-on-cancel policy is always false for this scheduler, so a cancelled task is not removed
+            // from the queue here; cancellation is delegated to FutureTask.
+            return super.cancel(mayInterruptIfRunning);
+        }
+
+        /**
+         * Overrides FutureTask version so as to reset/requeue if periodic.
+         */
+        @Override
+        public void run() {
+            if (!canRunInCurrentRunState(this)) {
+                cancel(false);
+            } else if (!isPeriodic()) {
+                super.run();
+            } else if (super.runAndReset()) {
+                setNextRunTime();
+                reExecutePeriodic(outerTask);
+            }
+        }
+    }
+
+    /**
+     * Re-queues a periodic task unless current run state precludes it.
+     * Same idea as delayedExecute except drops task rather than rejecting.
+     *
+     * @param task the task
+     */
+    private void reExecutePeriodic(RunnableScheduledFuture<?> task) {
+        if (canRunInCurrentRunState(task)) {
+            super.getQueue().add(task);
+            return;
+        }
+        task.cancel(false);
+    }
+
+    /**
+     * Returns true if can run a task given current run state and
+     * run-after-shutdown parameters.
+     */
+    private boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
+        // Since the policies "continueExistingPeriodicTasksAfterShutdownPolicy" and
+        // "executeExistingDelayedTasksAfterShutdownPolicy" are always "false", the checking of "terminating" is not
+        // needed.
+        return !isShutdown();
+    }
+}
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
new file mode 100644
index 00000000000..78ceffb79c4
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/SingleThreadNonConcurrentFixedRateSchedulerTest.java
@@ -0,0 +1,684 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class SingleThreadNonConcurrentFixedRateSchedulerTest {
+
+ private SingleThreadNonConcurrentFixedRateScheduler executor;
+
+ @BeforeMethod
+ public void setUp() {
+ executor = new
SingleThreadNonConcurrentFixedRateScheduler("test-executor");
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ if (executor != null && !executor.isShutdown()) {
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Test
+ public void testConstructorCreatesWorkerThread() {
+ // The constructor should trigger worker thread creation
+ assertTrue(executor.getPoolSize() > 0);
+ assertFalse(executor.isShutdown());
+ assertFalse(executor.isTerminated());
+ }
+
+ @Test
+ public void testSubmitTask() throws Exception {
+ AtomicBoolean executed = new AtomicBoolean(false);
+
+ Future<?> future = executor.submit(() -> executed.set(true));
+
+ future.get(1, TimeUnit.SECONDS);
+ assertTrue(executed.get());
+ assertTrue(future.isDone());
+ assertFalse(future.isCancelled());
+ }
+
+ @Test
+ public void testSubmitTaskWithException() throws Exception {
+ AtomicBoolean taskExecuted = new AtomicBoolean(false);
+
+ // Submit a task that throws an exception
+ Future<?> future = executor.submit(() -> {
+ taskExecuted.set(true);
+ throw new RuntimeException("Test exception");
+ });
+
+ // The SafeRunnable wrapper catches the exception, so the task should
complete
+ // but the future will still contain the exception
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ // If we get here without exception, that's also fine - it means
SafeRunnable worked
+ } catch (Exception e) {
+ // This is expected - the exception is still propagated through
the Future
+ assertTrue(e.getCause() instanceof RuntimeException);
+ assertEquals("Test exception", e.getCause().getMessage());
+ }
+
+ assertTrue(future.isDone());
+ assertFalse(future.isCancelled());
+ assertTrue(taskExecuted.get(), "Task should have been executed despite
the exception");
+ }
+
+ @Test
+ public void testScheduleTask() throws Exception {
+ AtomicBoolean executed = new AtomicBoolean(false);
+ long startTime = System.currentTimeMillis();
+
+ ScheduledFuture<?> future = executor.schedule(() ->
executed.set(true), 100, TimeUnit.MILLISECONDS);
+
+ future.get(1, TimeUnit.SECONDS);
+ long endTime = System.currentTimeMillis();
+
+ assertTrue(executed.get());
+ assertTrue(future.isDone());
+ assertTrue(endTime - startTime >= 100); // Should have waited at least
100ms
+ }
+
+ @Test
+ public void testScheduleAtFixedRate() throws Exception {
+ AtomicInteger executionCount = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(15);
+
+ ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
+ executionCount.incrementAndGet();
+ latch.countDown();
+ }, 0, 50, TimeUnit.MILLISECONDS);
+
+ // Wait for at least 15 executions
+ assertTrue(latch.await(1, TimeUnit.SECONDS));
+ future.cancel(false);
+
+ assertTrue(executionCount.get() >= 15);
+ }
+
+ @Test
+ public void testScheduleWithFixedDelay() throws Exception {
+ AtomicInteger executionCount = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(15);
+
+ ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
+ executionCount.incrementAndGet();
+ latch.countDown();
+ }, 0, 50, TimeUnit.MILLISECONDS);
+
+ // Wait for at least 15 executions
+ assertTrue(latch.await(15, TimeUnit.SECONDS));
+ future.cancel(false);
+
+ assertTrue(executionCount.get() >= 15);
+ }
+
+ @Test
+ public void testScheduleAtFixedRateNonConcurrently() throws Exception {
+ AtomicInteger executionCount = new AtomicInteger(0);
+ AtomicLong[] executionTimes = new AtomicLong[10];
+ for (int i = 0; i < executionTimes.length; i++) {
+ executionTimes[i] = new AtomicLong(0);
+ }
+
+ // Schedule a task that takes longer than the period to test outdated
task dropping
+ ScheduledFuture<?> future =
executor.scheduleAtFixedRateNonConcurrently(() -> {
+ int count = executionCount.getAndIncrement();
+ if (count < executionTimes.length) {
+ executionTimes[count].set(System.nanoTime());
+ }
+ try {
+ Thread.sleep(150); // Task takes 150ms
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }, 0, 50, TimeUnit.MILLISECONDS); // Period is 50ms (shorter than task
execution time)
+
+ // Wait for several executions
+ Thread.sleep(800);
+ future.cancel(false);
+
+ // Should have executed multiple times
+ assertTrue(executionCount.get() >= 2);
+ // Should have dropped some executions due to the long task execution
time.
+ // 800 / 150 = 5.33, so at most 6 executions can start. Use 7 to avoid
flakiness.
+ assertTrue(executionCount.get() < 7);
+
+ // Verify that outdated tasks were dropped by checking execution
intervals
+ // The actual intervals should be longer than the scheduled period due
to task execution time
+ for (int i = 1; i < Math.min(executionCount.get(),
executionTimes.length); i++) {
+ if (executionTimes[i].get() > 0 && executionTimes[i - 1].get() >
0) {
+ long intervalNanos = executionTimes[i].get() -
executionTimes[i - 1].get();
+ long intervalMs = intervalNanos / 1_000_000;
+ // Interval should be at least the task execution time
(150ms), not the period (50ms)
+ assertTrue(intervalMs >= 140, "Interval was " + intervalMs +
"ms, expected >= 140ms");
+ }
+ }
+ }
+
+ @Test
+ public void testScheduleAtFixedRateNonConcurrentlyWithNullCommand() {
+ try {
+ executor.scheduleAtFixedRateNonConcurrently(null, 0, 100,
TimeUnit.MILLISECONDS);
+ fail("Should throw NullPointerException");
+ } catch (NullPointerException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testScheduleAtFixedRateNonConcurrentlyWithNullTimeUnit() {
+ try {
+ executor.scheduleAtFixedRateNonConcurrently(() -> {}, 0, 100,
null);
+ fail("Should throw NullPointerException");
+ } catch (NullPointerException e) {
+ // Expected
+ }
+ }
+
+    /**
+     * Expects an {@link IllegalArgumentException} for a zero period and for a negative period, and
+     * checks the exception message in both cases.
+     */
+    @Test
+    public void testScheduleAtFixedRateNonConcurrentlyWithInvalidPeriod() {
+        // Same expectation for both values: first zero, then a negative period.
+        for (int invalidPeriod : new int[] {0, -1}) {
+            try {
+                executor.scheduleAtFixedRateNonConcurrently(() -> {}, 0, invalidPeriod,
+                        TimeUnit.MILLISECONDS);
+                fail("Should throw IllegalArgumentException");
+            } catch (IllegalArgumentException rejected) {
+                // The message text is asserted exactly as the test expects the scheduler to emit it.
+                assertTrue(rejected.getMessage().contains("period can not be null"));
+            }
+        }
+    }
+
+    /**
+     * Schedules a task on a 50ms non-concurrent fixed-rate schedule whose first five runs each take
+     * 200ms, lets it run for about 1500ms, and checks the total run count and that none of the slow
+     * runs started before the previous one had finished.
+     */
+    @Test
+    public void testDropOutdatedTaskBehavior() throws Exception {
+        final int slowRuns = 5;
+        AtomicInteger runCounter = new AtomicInteger(0);
+        AtomicLong[] startStamps = new AtomicLong[slowRuns];
+        AtomicLong[] endStamps = new AtomicLong[slowRuns];
+        for (int slot = 0; slot < slowRuns; slot++) {
+            startStamps[slot] = new AtomicLong(0);
+            endStamps[slot] = new AtomicLong(0);
+        }
+
+        // Only the first five runs are slow (200ms, far longer than the period); later runs do nothing.
+        Runnable task = () -> {
+            int runIndex = runCounter.getAndIncrement();
+            if (runIndex >= slowRuns) {
+                return;
+            }
+            startStamps[runIndex].set(System.nanoTime());
+            try {
+                Thread.sleep(200);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+            endStamps[runIndex].set(System.nanoTime());
+        };
+        ScheduledFuture<?> handle =
+                executor.scheduleAtFixedRateNonConcurrently(task, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Give the schedule time for several runs, then stop it.
+        Thread.sleep(1500);
+        handle.cancel(false);
+
+        // 1500 - 200 * 5 = 500ms remain after the slow runs, so at most about 10 more runs follow.
+        // NOTE(review): 5 + 10 sits right at the upper bound below - confirm the margin is sufficient.
+        assertTrue(runCounter.get() >= 5);
+        assertTrue(runCounter.get() < 15);
+
+        // Each slow run must begin only after the previous one ended, not at fixed 50ms intervals.
+        // A stamp of 0 means the slot was never written, so such pairs are skipped.
+        for (int i = 1; i < Math.min(runCounter.get(), slowRuns); i++) {
+            long thisStart = startStamps[i].get();
+            long previousEnd = endStamps[i - 1].get();
+            if (thisStart <= 0 || previousEnd <= 0) {
+                continue;
+            }
+            assertTrue(thisStart >= previousEnd,
+                    "Task " + i + " started before task " + (i - 1) + " ended");
+        }
+    }
+
+ @Test
+ public void testPolicyGettersReturnCorrectValues() {
+ // These policies are always false and cannot be changed
+
assertFalse(executor.getContinueExistingPeriodicTasksAfterShutdownPolicy());
+
assertFalse(executor.getExecuteExistingDelayedTasksAfterShutdownPolicy());
+ assertFalse(executor.getRemoveOnCancelPolicy());
+ }
+
+ @Test
+ public void testPolicySettersThrowExceptions() {
+ try {
+ executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
+ fail("Should throw IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+
assertTrue(e.getMessage().contains("continueExistingPeriodicTasksAfterShutdownPolicy"));
+ }
+
+ try {
+ executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
+ fail("Should throw IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+
assertTrue(e.getMessage().contains("executeExistingDelayedTasksAfterShutdownPolicy"));
+ }
+
+ try {
+ executor.setRemoveOnCancelPolicy(true);
+ fail("Should throw IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("removeOnCancelPolicy"));
+ }
+ }
+
+    /**
+     * Installs a custom rejected-execution handler, shuts the executor down, submits a task and
+     * checks that the handler was invoked with a non-null task.
+     */
+    @Test
+    public void testRejectedExecutionHandler() {
+        AtomicBoolean handlerInvoked = new AtomicBoolean(false);
+        AtomicReference<Runnable> seenTask = new AtomicReference<>();
+        // Parameter names are chosen so they do not shadow the test's own executor field.
+        RejectedExecutionHandler recordingHandler = (task, pool) -> {
+            handlerInvoked.set(true);
+            seenTask.set(task);
+        };
+
+        executor.setRejectedExecutionHandler(recordingHandler);
+        assertEquals(executor.getRejectedExecutionHandler(), recordingHandler);
+
+        // A submission after shutdown is expected to be routed to the handler.
+        executor.shutdown();
+        Runnable noop = () -> {};
+        executor.submit(noop);
+
+        Awaitility.await().atMost(1, TimeUnit.SECONDS).until(handlerInvoked::get);
+        assertNotNull(seenTask.get());
+    }
+
+ @Test
+ public void testDefaultRejectedExecutionHandler() {
+ // Test that the default handler is AbortPolicy
+ RejectedExecutionHandler defaultHandler =
executor.getRejectedExecutionHandler();
+ assertNotNull(defaultHandler);
+ assertTrue(defaultHandler instanceof ThreadPoolExecutor.AbortPolicy);
+
+ // Shutdown and verify rejection behavior
+ executor.shutdown();
+ try {
+ executor.submit(() -> {});
+ fail("Should throw RejectedExecutionException with default
handler");
+ } catch (RejectedExecutionException e) {
+ // Expected with AbortPolicy
+ }
+ }
+
+ @Test
+ public void testShutdownBehavior() throws Exception {
+ AtomicBoolean taskExecuted = new AtomicBoolean(false);
+
+ // Submit a task
+ executor.submit(() -> taskExecuted.set(true));
+
+ // Wait for task to complete
+ Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() ->
taskExecuted.get());
+
+ // Shutdown the executor
+ executor.shutdown();
+ assertTrue(executor.isShutdown());
+
+ // Wait for termination
+ assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
+ assertTrue(executor.isTerminated());
+ }
+
+ @Test
+ public void testTasksAfterShutdown() {
+ executor.shutdown();
+
+ try {
+ executor.submit(() -> {});
+ fail("Should throw RejectedExecutionException");
+ } catch (RejectedExecutionException e) {
+ // Expected
+ }
+ }
+
+    /**
+     * Submits ten tasks, waits for all of them and checks that every task recorded an execution
+     * sequence number.
+     */
+    @Test
+    public void testConcurrentTaskExecution() throws Exception {
+        final int taskCount = 10;
+        CountDownLatch allDone = new CountDownLatch(taskCount);
+        AtomicInteger sequence = new AtomicInteger(0);
+
+        // -1 marks "not executed yet"; each task overwrites its own slot.
+        AtomicInteger[] slots = new AtomicInteger[taskCount];
+        for (int i = 0; i < taskCount; i++) {
+            slots[i] = new AtomicInteger(-1);
+        }
+
+        for (int i = 0; i < taskCount; i++) {
+            final AtomicInteger slot = slots[i];
+            executor.submit(() -> {
+                slot.set(sequence.getAndIncrement());
+                allDone.countDown();
+            });
+        }
+
+        boolean finishedInTime = allDone.await(5, TimeUnit.SECONDS);
+        assertTrue(finishedInTime);
+
+        for (int i = 0; i < taskCount; i++) {
+            assertTrue(slots[i].get() >= 0, "Task " + i + " did not execute");
+        }
+    }
+
+    /**
+     * Submits a task that blocks on a latch, cancels it with interruption once it has started, and
+     * checks that the returned future reports cancelled and done.
+     *
+     * <p>The blocking latch is released in a {@code finally} block so that a failed assertion cannot
+     * leave the submitted task parked on the latch for the rest of the test run.
+     */
+    @Test
+    public void testTaskCancellation() throws Exception {
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch blockLatch = new CountDownLatch(1);
+
+        // Submit a long-running task
+        Future<?> future = executor.submit(() -> {
+            startLatch.countDown();
+            try {
+                blockLatch.await();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        });
+
+        try {
+            // Wait for task to start
+            assertTrue(startLatch.await(1, TimeUnit.SECONDS));
+
+            // Cancel the task
+            boolean cancelled = future.cancel(true);
+            assertTrue(cancelled);
+            assertTrue(future.isCancelled());
+            assertTrue(future.isDone());
+        } finally {
+            // Always release the blocking task, including when an assertion above has failed.
+            blockLatch.countDown();
+        }
+    }
+
+    /**
+     * Schedules a task 500ms in the future, cancels it immediately and checks that it never runs.
+     */
+    @Test
+    public void testScheduledTaskCancellation() throws Exception {
+        AtomicBoolean ran = new AtomicBoolean(false);
+
+        ScheduledFuture<?> pending =
+                executor.schedule(() -> ran.set(true), 500, TimeUnit.MILLISECONDS);
+
+        // Cancel while the delay has not yet elapsed.
+        boolean cancelAccepted = pending.cancel(false);
+        assertTrue(cancelAccepted);
+        assertTrue(pending.isCancelled());
+        assertTrue(pending.isDone());
+
+        // Sleep past the original 500ms delay; the task must still not have run.
+        Thread.sleep(600);
+        assertFalse(ran.get());
+    }
+
+    /**
+     * Verifies that cancelling a fixed-rate task stops further executions.
+     *
+     * <p>The execution count is sampled only after {@code cancel} has returned. Sampling it before
+     * cancelling leaves a window in which one more scheduled run can happen between the sample and
+     * the cancellation, which would make a strict equality check fail spuriously.
+     */
+    @Test
+    public void testPeriodicTaskCancellation() throws Exception {
+        AtomicInteger executionCount = new AtomicInteger(0);
+
+        ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
+            executionCount.incrementAndGet();
+        }, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Let it run a few times
+        Thread.sleep(200);
+
+        // Cancel the task
+        boolean cancelled = future.cancel(false);
+        assertTrue(cancelled);
+        assertTrue(future.isCancelled());
+
+        // Take the baseline after cancellation so a tick racing with cancel() cannot skew it.
+        int countAfterCancel = executionCount.get();
+        assertTrue(countAfterCancel > 0);
+
+        // Wait and verify no more executions were started. cancel(false) does not interrupt a run
+        // that is already in progress, so allow for that single run to finish.
+        Thread.sleep(200);
+        int finalCount = executionCount.get();
+        assertTrue(finalCount <= countAfterCancel + 1,
+                "Task kept running after cancel: " + countAfterCancel + " -> " + finalCount);
+    }
+
+    /**
+     * Schedules a fixed-rate task that throws on every second run and checks that it keeps being
+     * executed, with about half of the runs having thrown.
+     */
+    @Test
+    public void testExceptionHandlingInPeriodicTask() throws Exception {
+        AtomicInteger attempts = new AtomicInteger(0);
+        AtomicInteger failures = new AtomicInteger(0);
+
+        // Every even-numbered run fails with a RuntimeException.
+        Runnable flakyTask = () -> {
+            int attempt = attempts.incrementAndGet();
+            boolean evenRun = attempt % 2 == 0;
+            if (evenRun) {
+                failures.incrementAndGet();
+                throw new RuntimeException("Test exception " + attempt);
+            }
+        };
+        ScheduledFuture<?> handle = executor.scheduleAtFixedRate(flakyTask, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Let it run for a while, then stop it.
+        Thread.sleep(300);
+        handle.cancel(false);
+
+        // The schedule must have survived the exceptions.
+        assertTrue(attempts.get() > 2);
+        assertTrue(failures.get() > 0);
+        // Failures happen on every second run, so the totals may differ from 2:1 by at most one.
+        assertTrue(Math.abs(attempts.get() - 2 * failures.get()) <= 1);
+    }
+
+    /**
+     * Schedules two one-shot tasks with delays of 100ms and 200ms and checks that neither ran
+     * earlier than its delay and that the second ran after the first.
+     */
+    @Test
+    public void testSchedulingWithDifferentDelays() throws Exception {
+        AtomicLong firstRanAt = new AtomicLong(0);
+        AtomicLong secondRanAt = new AtomicLong(0);
+
+        // Reference point taken before anything is scheduled.
+        long scheduledAt = System.currentTimeMillis();
+
+        ScheduledFuture<?> shortDelay = executor.schedule(() -> {
+            firstRanAt.set(System.currentTimeMillis());
+        }, 100, TimeUnit.MILLISECONDS);
+        ScheduledFuture<?> longDelay = executor.schedule(() -> {
+            secondRanAt.set(System.currentTimeMillis());
+        }, 200, TimeUnit.MILLISECONDS);
+
+        // Both tasks must finish within a second each.
+        shortDelay.get(1, TimeUnit.SECONDS);
+        longDelay.get(1, TimeUnit.SECONDS);
+
+        // Neither task may have run before its delay elapsed, and they must have run in delay order.
+        assertTrue(firstRanAt.get() >= scheduledAt + 100);
+        assertTrue(secondRanAt.get() >= scheduledAt + 200);
+        assertTrue(secondRanAt.get() > firstRanAt.get());
+    }
+
+    /**
+     * Submits twenty tasks and checks that they all observed the same thread name and that the name
+     * contains {@code test-executor}.
+     */
+    @Test
+    public void testSingleThreadExecution() throws Exception {
+        final int taskCount = 20;
+        CountDownLatch allDone = new CountDownLatch(taskCount);
+        AtomicReference<String> firstSeenThread = new AtomicReference<>();
+        AtomicBoolean sameThreadEverywhere = new AtomicBoolean(true);
+
+        // Each task compares its own thread name against the first one that was recorded.
+        Runnable probe = () -> {
+            String current = Thread.currentThread().getName();
+            String recorded = firstSeenThread.get();
+            if (recorded == null) {
+                firstSeenThread.set(current);
+            } else if (!recorded.equals(current)) {
+                sameThreadEverywhere.set(false);
+            }
+            allDone.countDown();
+        };
+        for (int submitted = 0; submitted < taskCount; submitted++) {
+            executor.submit(probe);
+        }
+
+        boolean finishedInTime = allDone.await(5, TimeUnit.SECONDS);
+        assertTrue(finishedInTime);
+
+        assertTrue(sameThreadEverywhere.get(), "Tasks should run on a single thread");
+        assertNotNull(firstSeenThread.get());
+        assertTrue(firstSeenThread.get().contains("test-executor"));
+    }
+
+ @Test
+ public void testShutdownNow() throws Exception {
+ AtomicInteger completedTasks = new AtomicInteger(0);
+ CountDownLatch blockLatch = new CountDownLatch(1);
+
+ // Submit a blocking task
+ executor.submit(() -> {
+ try {
+ blockLatch.await();
+ completedTasks.incrementAndGet();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+
+ // Submit additional tasks
+ for (int i = 0; i < 5; i++) {
+ executor.submit(() -> completedTasks.incrementAndGet());
+ }
+
+ // Shutdown immediately
+ executor.shutdownNow();
+ assertTrue(executor.isShutdown());
+
+ // Release the blocking task
+ blockLatch.countDown();
+
+ // Wait for termination
+ assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
+ assertTrue(executor.isTerminated());
+
+ // Some tasks may not have completed due to immediate shutdown
+ assertTrue(completedTasks.get() < 6);
+ }
+
+    /**
+     * Schedules three one-shot tasks out of delay order (200ms, 50ms, 100ms) and checks that they
+     * execute in order of increasing delay.
+     */
+    @Test
+    public void testScheduledFutureTaskComparison() throws Exception {
+        AtomicInteger sequence = new AtomicInteger(0);
+        AtomicInteger[] observedOrder = new AtomicInteger[3];
+        for (int i = 0; i < observedOrder.length; i++) {
+            observedOrder[i] = new AtomicInteger(-1);
+        }
+
+        // Submission order is slot 2, slot 0, slot 1; slot k holds the task with the k-th shortest delay.
+        int[] slotBySubmission = {2, 0, 1};
+        long[] delayMsBySubmission = {200, 50, 100};
+        ScheduledFuture<?>[] futureBySlot = new ScheduledFuture<?>[3];
+        for (int n = 0; n < slotBySubmission.length; n++) {
+            final int slot = slotBySubmission[n];
+            futureBySlot[slot] = executor.schedule(() -> {
+                observedOrder[slot].set(sequence.getAndIncrement());
+            }, delayMsBySubmission[n], TimeUnit.MILLISECONDS);
+        }
+
+        // Wait for the tasks in delay order, each for at most a second.
+        for (ScheduledFuture<?> pending : futureBySlot) {
+            pending.get(1, TimeUnit.SECONDS);
+        }
+
+        // The 50ms task ran first, the 100ms task second and the 200ms task third.
+        assertEquals(observedOrder[0].get(), 0);
+        assertEquals(observedOrder[1].get(), 1);
+        assertEquals(observedOrder[2].get(), 2);
+    }
+
+    /**
+     * Starts a fixed-rate task, shuts the executor down and checks that the task stops running
+     * (allowing for one in-flight run) and that its future ends up cancelled or done.
+     */
+    @Test
+    public void testPeriodicTaskWithShutdown() throws Exception {
+        AtomicInteger runs = new AtomicInteger(0);
+        Runnable tick = () -> {
+            runs.incrementAndGet();
+        };
+
+        ScheduledFuture<?> handle = executor.scheduleAtFixedRate(tick, 0, 50, TimeUnit.MILLISECONDS);
+
+        // Let the task run a few times first.
+        Thread.sleep(200);
+        int runsBeforeShutdown = runs.get();
+        assertTrue(runsBeforeShutdown >= 2);
+
+        executor.shutdown();
+
+        // Leave time for any further, unwanted runs to show up.
+        Thread.sleep(200);
+
+        // One extra run is tolerated, since it may have been in progress at shutdown.
+        int runsAfterShutdown = runs.get();
+        assertTrue(runsAfterShutdown <= runsBeforeShutdown + 1);
+
+        assertTrue(handle.isCancelled() || handle.isDone());
+    }
+
+    /**
+     * Submits a task that always throws and checks that the task body ran and that its future
+     * completes. Whatever {@code get} throws is deliberately ignored.
+     */
+    @Test
+    public void testSafeRunnableExceptionLogging() throws Exception {
+        AtomicBoolean bodyEntered = new AtomicBoolean(false);
+        AtomicBoolean aboutToThrow = new AtomicBoolean(false);
+
+        Future<?> outcome = executor.submit(() -> {
+            bodyEntered.set(true);
+            aboutToThrow.set(true);
+            throw new RuntimeException("Test exception for SafeRunnable");
+        });
+
+        // Wait up to a second for the task to finish.
+        try {
+            outcome.get(1, TimeUnit.SECONDS);
+        } catch (Exception ignored) {
+            // The failure may or may not surface through the Future; both outcomes are accepted.
+        }
+
+        assertTrue(bodyEntered.get());
+        assertTrue(aboutToThrow.get());
+        assertTrue(outcome.isDone());
+    }
+}