This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e1dbc749d10 Improved ScheduledExecutors behavior and metrics. (#19168)
e1dbc749d10 is described below
commit e1dbc749d10a130b68166f65266e64616700de1e
Author: Gian Merlino <[email protected]>
AuthorDate: Tue Mar 17 21:43:46 2026 -0700
Improved ScheduledExecutors behavior and metrics. (#19168)
Main changes:
1) Fix scheduleAtFixedRate to schedule the next run after the prior
run completes, rather than before it starts. The previous logic could
lead to the callable running concurrently with itself when the
scheduled executor is multi-threaded.
2) Align scheduleAtFixedRate and scheduleWithFixedDelay so both of them
re-schedule when the callable throws an exception. Previously
scheduleAtFixedRate did re-schedule but scheduleWithFixedDelay did not.
3) Add DelayMetricEmittingScheduledExecutorService, which emits a metric
for scheduling delay. This helps operators tell whether a particular
scheduled executor is so overloaded that it cannot run tasks on time.
4) Add scheduling delay metrics: overlord/duty/wait/millis,
coordinator/duty/wait/millis, and ingest/reporting/wait/millis.
---
.../overlord/duty/OverlordDutyExecutor.java | 12 +-
.../supervisor/SeekableStreamSupervisor.java | 44 ++--
.../overlord/duty/OverlordDutyExecutorTest.java | 5 +-
...treamSupervisorScaleDuringTaskRolloverTest.java | 6 +-
.../SeekableStreamSupervisorStateTest.java | 32 ++-
.../java/org/apache/druid/msq/exec/WorkerImpl.java | 6 +-
.../druid/msq/util/DecoratedExecutorService.java | 162 ------------
.../frame/processor/FrameProcessorExecutor.java | 7 +-
.../concurrent/DecoratedExecutorService.java | 175 +++++++++++++
.../DecoratedScheduledExecutorService.java | 92 +++++++
...elayMetricEmittingScheduledExecutorService.java | 125 +++++++++
.../util/common/concurrent/ScheduledExecutors.java | 102 +++++---
...MetricEmittingScheduledExecutorServiceTest.java | 282 +++++++++++++++++++++
.../common/concurrent/ScheduledExecutorsTest.java | 187 +++++++++++++-
.../druid/server/coordinator/DruidCoordinator.java | 7 +-
15 files changed, 995 insertions(+), 249 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
index 689d0876894..91fb734269c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutor.java
@@ -25,8 +25,10 @@ import
org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.joda.time.Duration;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
@@ -38,6 +40,7 @@ public class OverlordDutyExecutor
private static final Logger log = new Logger(OverlordDutyExecutor.class);
private final ScheduledExecutorFactory execFactory;
+ private final ServiceEmitter emitter;
private final Set<OverlordDuty> duties;
private volatile ScheduledExecutorService exec;
@@ -47,10 +50,12 @@ public class OverlordDutyExecutor
@Inject
public OverlordDutyExecutor(
ScheduledExecutorFactory scheduledExecutorFactory,
+ ServiceEmitter emitter,
Set<OverlordDuty> duties
)
{
this.execFactory = scheduledExecutorFactory;
+ this.emitter = emitter;
this.duties = duties;
}
@@ -128,7 +133,12 @@ public class OverlordDutyExecutor
{
if (exec == null) {
final int numThreads = 1;
- exec = execFactory.create(numThreads, "Overlord-Duty-Exec--%d");
+ exec = ScheduledExecutors.emittingDelayMetric(
+ execFactory.create(numThreads, "Overlord-Duty-Exec--%d"),
+ emitter,
+ "overlord/duty/wait/millis",
+ Map.of()
+ );
log.info("Initialized duty executor with [%d] threads", numThreads);
}
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index d257c1e4e8c..58d366d2306 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -1009,7 +1009,17 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.supervisorId = spec.getId();
this.exec =
Execs.singleThreaded(StringUtils.encodeForFormat(supervisorTag));
this.scheduledExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorTag) +
"-Scheduler-%d");
- this.reportingExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorTag) +
"-Reporting-%d");
+ final Map<String, Object> reportingDimensions = new HashMap<>();
+ reportingDimensions.put(DruidMetrics.SUPERVISOR_ID, supervisorId);
+ reportingDimensions.put(DruidMetrics.DATASOURCE, dataSource);
+ reportingDimensions.put(DruidMetrics.STREAM, ioConfig.getStream());
+ reportingDimensions.put(DruidMetrics.TAGS,
spec.getContextValue(DruidMetrics.TAGS));
+ this.reportingExec = ScheduledExecutors.emittingDelayMetric(
+
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorTag) +
"-Reporting-%d"),
+ spec.getEmitter(),
+ "ingest/reporting/wait/millis",
+ reportingDimensions
+ );
this.stateManager = new SeekableStreamSupervisorStateManager(
spec.getSupervisorStateManagerConfig(),
@@ -4742,26 +4752,26 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
SeekableStreamSupervisorTuningConfig tuningConfig = spec.getTuningConfig();
// Lag is collected with fixed delay instead of fixed rate as lag
collection can involve calling external
// services and with fixed delay, a cooling buffer is guaranteed between
successive calls
- reportingExec.scheduleWithFixedDelay(
- this::updateCurrentAndLatestOffsets,
- ioConfig.getStartDelay().getMillis() +
INITIAL_GET_OFFSET_DELAY_MILLIS, // wait for tasks to start up
- Math.max(
+ ScheduledExecutors.scheduleWithFixedDelay(
+ reportingExec,
+ Duration.millis(ioConfig.getStartDelay().getMillis() +
INITIAL_GET_OFFSET_DELAY_MILLIS),
+ Duration.millis(Math.max(
tuningConfig.getOffsetFetchPeriod().getMillis(),
MINIMUM_GET_OFFSET_PERIOD_MILLIS
- ),
- TimeUnit.MILLISECONDS
+ )),
+ this::updateCurrentAndLatestOffsets
);
- reportingExec.scheduleAtFixedRate(
- this::emitLag,
- ioConfig.getStartDelay().getMillis() +
INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
- spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
- TimeUnit.MILLISECONDS
+ ScheduledExecutors.scheduleAtFixedRate(
+ reportingExec,
+ Duration.millis(ioConfig.getStartDelay().getMillis() +
INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS),
+
Duration.millis(spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis()),
+ this::emitLag
);
- reportingExec.scheduleAtFixedRate(
- this::emitNoticesQueueSize,
- ioConfig.getStartDelay().getMillis() +
INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS, // wait for tasks to start up
- spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
- TimeUnit.MILLISECONDS
+ ScheduledExecutors.scheduleAtFixedRate(
+ reportingExec,
+ Duration.millis(ioConfig.getStartDelay().getMillis() +
INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS),
+
Duration.millis(spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis()),
+ this::emitNoticesQueueSize
);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
index bc6be505c62..452ea66c5d9 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/duty/OverlordDutyExecutorTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.duty;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
@@ -48,7 +49,7 @@ public class OverlordDutyExecutorTest
.thenReturn(executorService);
OverlordDutyExecutor dutyExecutor =
- new OverlordDutyExecutor(executorFactory, ImmutableSet.of(testDuty1,
testDuty2));
+ new OverlordDutyExecutor(executorFactory, new StubServiceEmitter(),
ImmutableSet.of(testDuty1, testDuty2));
// Invoke start multiple times
dutyExecutor.start();
@@ -81,7 +82,7 @@ public class OverlordDutyExecutorTest
ScheduledExecutorFactory executorFactory =
Mockito.mock(ScheduledExecutorFactory.class);
OverlordDutyExecutor dutyExecutor =
- new OverlordDutyExecutor(executorFactory,
Collections.singleton(testDuty));
+ new OverlordDutyExecutor(executorFactory, new StubServiceEmitter(),
Collections.singleton(testDuty));
dutyExecutor.start();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
index 01882f1e821..53f96706b67 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java
@@ -29,8 +29,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.util.List;
-
public class SeekableStreamSupervisorScaleDuringTaskRolloverTest extends
SeekableStreamSupervisorTestBase
{
private static final int DEFAULT_TASK_COUNT = 10;
@@ -105,9 +103,6 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
EasyMock.expect(spec.createAutoscaler(EasyMock.anyObject()))
.andReturn(createMockAutoScaler(targetTaskCount))
.anyTimes();
- EasyMock.expect(spec.getContextValue(EasyMock.eq(DruidMetrics.TAGS)))
- .andReturn(List.of("tag1", "tag2"))
- .anyTimes();
EasyMock.replay(spec);
TestSeekableStreamSupervisor supervisor = new
TestSeekableStreamSupervisor(100);
@@ -167,6 +162,7 @@ public class
SeekableStreamSupervisorScaleDuringTaskRolloverTest extends Seekabl
EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
+
EasyMock.expect(spec.getContextValue(EasyMock.eq(DruidMetrics.TAGS))).andReturn(null).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index 3f534de8068..d59e2711ce8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -77,6 +77,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.EmittingLogger;
@@ -712,6 +713,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
}).anyTimes();
EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
+
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskQueue.getActiveTasksForDatasource(DATASOURCE)).andReturn(Map.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
@@ -2563,7 +2565,9 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
latch.await();
supervisor.emitLag();
- Assert.assertEquals(0, emitter.getNumEmittedEvents());
+ Assert.assertEquals(0, emitter.getMetricEvents("ingest/test/lag").size());
+ Assert.assertEquals(0,
emitter.getMetricEvents("ingest/test/maxLag").size());
+ Assert.assertEquals(0,
emitter.getMetricEvents("ingest/test/avgLag").size());
}
private void validateSupervisorStateAfterResetOffsets(
@@ -2591,9 +2595,11 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(2);
+ // ScheduledExecutors.scheduleWithFixedDelay and scheduleAtFixedRate use
exec.schedule internally,
+ // so we expect 3 schedule calls (1 for delay-based offset fetching, 2 for
fixed-rate lag/queue reporting)
ScheduledExecutorService executorService =
EasyMock.createMock(ScheduledExecutorService.class);
-
EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(),
EasyMock.eq(86415000L), EasyMock.eq(300000L),
EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once();
- EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(),
EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()),
EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2);
+
EasyMock.expect(executorService.schedule(EasyMock.anyObject(Runnable.class),
EasyMock.anyLong(),
EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(3);
+ EasyMock.expect(executorService.isShutdown()).andReturn(false).anyTimes();
EasyMock.replay(executorService, spec);
final BaseTestSeekableStreamSupervisor supervisor = new
BaseTestSeekableStreamSupervisor()
@@ -3378,17 +3384,17 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
- reportingExec.scheduleAtFixedRate(
- this::emitLag,
- ioConfig.getStartDelay().getMillis(),
- spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
- TimeUnit.MILLISECONDS
+ ScheduledExecutors.scheduleAtFixedRate(
+ reportingExec,
+ Duration.millis(ioConfig.getStartDelay().getMillis()),
+
Duration.millis(spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis()),
+ this::emitLag
);
- reportingExec.scheduleAtFixedRate(
- this::emitNoticesQueueSize,
- ioConfig.getStartDelay().getMillis(),
- spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
- TimeUnit.MILLISECONDS
+ ScheduledExecutors.scheduleAtFixedRate(
+ reportingExec,
+ Duration.millis(ioConfig.getStartDelay().getMillis()),
+
Duration.millis(spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis()),
+ this::emitNoticesQueueSize
);
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index c639b1d7003..0b3bfdc235a 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.SettableFuture;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -43,6 +42,7 @@ import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.DecoratedExecutorService;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.counters.CounterSnapshotsTree;
@@ -73,7 +73,6 @@ import
org.apache.druid.msq.shuffle.input.WorkerOrLocalInputChannelFactory;
import org.apache.druid.msq.shuffle.output.StageOutputHolder;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.PartialKeyStatisticsInformation;
-import org.apache.druid.msq.util.DecoratedExecutorService;
import org.apache.druid.msq.util.MSQMetricUtils;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.BaseQuery;
@@ -101,6 +100,7 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -884,7 +884,7 @@ public class WorkerImpl implements Worker
* In production, the underlying {@link QueryProcessingPool} pool is set up
by
* {@link org.apache.druid.guice.DruidProcessingModule}.
*/
- private ListeningExecutorService makeProcessingPool()
+ private ExecutorService makeProcessingPool()
{
final QueryProcessingPool queryProcessingPool =
context.injector().getInstance(QueryProcessingPool.class);
final int priority = 0;
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/DecoratedExecutorService.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/util/DecoratedExecutorService.java
deleted file mode 100644
index 68095f6dab6..00000000000
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/util/DecoratedExecutorService.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.msq.util;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * A {@link ListeningExecutorService} where all tasks are automatically
decorated before being submitted to a
- * delegate executor service.
- */
-public class DecoratedExecutorService implements ListeningExecutorService
-{
- private final ListeningExecutorService exec;
- private final Decorator decorator;
-
- public DecoratedExecutorService(
- final ListeningExecutorService exec,
- final Decorator decorator
- )
- {
- this.exec = exec;
- this.decorator = decorator;
- }
-
- @Override
- public <T> ListenableFuture<T> submit(Callable<T> task)
- {
- return exec.submit(decorator.decorateCallable(task));
- }
-
- @Override
- public ListenableFuture<?> submit(Runnable task)
- {
- return exec.submit(decorator.decorateRunnable(task));
- }
-
- @Override
- public <T> ListenableFuture<T> submit(Runnable task, T result)
- {
- return exec.submit(decorator.decorateRunnable(task), result);
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
tasks) throws InterruptedException
- {
- final List<Callable<T>> decoratedTasks = new ArrayList<>();
-
- for (final Callable<T> task : tasks) {
- decoratedTasks.add(decorator.decorateCallable(task));
- }
-
- return exec.invokeAll(decoratedTasks);
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
tasks, long timeout, TimeUnit unit)
- throws InterruptedException
- {
- final List<Callable<T>> decoratedTasks = new ArrayList<>();
-
- for (final Callable<T> task : tasks) {
- decoratedTasks.add(decorator.decorateCallable(task));
- }
-
- return exec.invokeAll(decoratedTasks, timeout, unit);
- }
-
- @Override
- public void shutdown()
- {
- exec.shutdown();
- }
-
- @Override
- public List<Runnable> shutdownNow()
- {
- return exec.shutdownNow();
- }
-
- @Override
- public boolean isShutdown()
- {
- return exec.isShutdown();
- }
-
- @Override
- public boolean isTerminated()
- {
- return exec.isTerminated();
- }
-
- @Override
- public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException
- {
- return exec.awaitTermination(timeout, unit);
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws
InterruptedException, ExecutionException
- {
- final List<Callable<T>> decoratedTasks = new ArrayList<>();
-
- for (final Callable<T> task : tasks) {
- decoratedTasks.add(decorator.decorateCallable(task));
- }
-
- return exec.invokeAny(decoratedTasks);
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long
timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException
- {
- final List<Callable<T>> decoratedTasks = new ArrayList<>();
-
- for (final Callable<T> task : tasks) {
- decoratedTasks.add(decorator.decorateCallable(task));
- }
-
- return exec.invokeAny(decoratedTasks, timeout, unit);
- }
-
- @Override
- public void execute(Runnable command)
- {
- exec.execute(decorator.decorateRunnable(command));
- }
-
- public interface Decorator
- {
- <T> Callable<T> decorateCallable(Callable<T> callable);
-
- Runnable decorateRunnable(Runnable runnable);
- }
-}
diff --git
a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
index 3ed565212f6..f3d378fe576 100644
---
a/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
+++
b/processing/src/main/java/org/apache/druid/frame/processor/FrameProcessorExecutor.java
@@ -27,7 +27,6 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -68,7 +67,7 @@ public class FrameProcessorExecutor
{
private static final Logger log = new Logger(FrameProcessorExecutor.class);
- private final ListeningExecutorService exec;
+ private final ExecutorService exec;
private final Object lock = new Object();
@@ -97,7 +96,7 @@ public class FrameProcessorExecutor
@GuardedBy("lock")
private final Map<FrameProcessor<?>, Thread> runningProcessors = new
IdentityHashMap<>();
- public FrameProcessorExecutor(final ListeningExecutorService exec)
+ public FrameProcessorExecutor(final ExecutorService exec)
{
this.exec = exec;
}
@@ -513,7 +512,7 @@ public class FrameProcessorExecutor
/**
* Returns the underlying executor service used by this executor.
*/
- ListeningExecutorService getExecutorService()
+ ExecutorService getExecutorService()
{
return exec;
}
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DecoratedExecutorService.java
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DecoratedExecutorService.java
new file mode 100644
index 00000000000..18489d75bac
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DecoratedExecutorService.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.concurrent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An {@link ExecutorService} where all tasks are automatically decorated
before being submitted to a
+ * delegate executor service.
+ */
+public class DecoratedExecutorService implements ExecutorService
+{
+ protected final ExecutorService exec;
+ protected final Decorator decorator;
+
+ public DecoratedExecutorService(
+ final ExecutorService exec,
+ final Decorator decorator
+ )
+ {
+ this.exec = exec;
+ this.decorator = decorator;
+ }
+
+ @Override
+ public <T> Future<T> submit(final Callable<T> task)
+ {
+ return exec.submit(decorator.decorateCallable(task));
+ }
+
+ @Override
+ public Future<?> submit(final Runnable task)
+ {
+ return exec.submit(decorator.decorateRunnable(task));
+ }
+
+ @Override
+ public <T> Future<T> submit(final Runnable task, final T result)
+ {
+ return exec.submit(decorator.decorateRunnable(task), result);
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>>
tasks) throws InterruptedException
+ {
+ return exec.invokeAll(decorateAll(tasks));
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ final Collection<? extends Callable<T>> tasks,
+ final long timeout,
+ final TimeUnit unit
+ ) throws InterruptedException
+ {
+ return exec.invokeAll(decorateAll(tasks), timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(final Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException
+ {
+ return exec.invokeAny(decorateAll(tasks));
+ }
+
+ @Override
+ public <T> T invokeAny(
+ final Collection<? extends Callable<T>> tasks,
+ final long timeout,
+ final TimeUnit unit
+ ) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ return exec.invokeAny(decorateAll(tasks), timeout, unit);
+ }
+
+ @Override
+ public void execute(final Runnable command)
+ {
+ exec.execute(decorator.decorateRunnable(command));
+ }
+
+ @Override
+ public void shutdown()
+ {
+ exec.shutdown();
+ }
+
+ @Override
+ public List<Runnable> shutdownNow()
+ {
+ return exec.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return exec.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return exec.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(final long timeout, final TimeUnit unit)
throws InterruptedException
+ {
+ return exec.awaitTermination(timeout, unit);
+ }
+
+ private <T> List<Callable<T>> decorateAll(final Collection<? extends
Callable<T>> tasks)
+ {
+ final List<Callable<T>> decorated = new ArrayList<>(tasks.size());
+ for (final Callable<T> task : tasks) {
+ decorated.add(decorator.decorateCallable(task));
+ }
+ return decorated;
+ }
+
+ /**
+ * Decorates tasks before they are submitted to an executor.
+ */
+ public interface Decorator
+ {
+ <T> Callable<T> decorateCallable(Callable<T> callable);
+
+ Runnable decorateRunnable(Runnable runnable);
+
+ /**
+ * Decorates a callable that will be executed after an intended delay.
Used by
+ * {@link DecoratedScheduledExecutorService} for {@link
java.util.concurrent.ScheduledExecutorService#schedule}.
+ * Defaults to {@link #decorateCallable}.
+ */
+ default <T> Callable<T> decorateScheduledCallable(Callable<T> callable,
long delay, TimeUnit unit)
+ {
+ return decorateCallable(callable);
+ }
+
+ /**
+ * Decorates a runnable that will be executed after an intended delay.
Used by
+ * {@link DecoratedScheduledExecutorService} for {@link
java.util.concurrent.ScheduledExecutorService#schedule}.
+ * Defaults to {@link #decorateRunnable}.
+ */
+ default Runnable decorateScheduledRunnable(Runnable runnable, long delay,
TimeUnit unit)
+ {
+ return decorateRunnable(runnable);
+ }
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DecoratedScheduledExecutorService.java
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DecoratedScheduledExecutorService.java
new file mode 100644
index 00000000000..3a3b6d1ca26
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DecoratedScheduledExecutorService.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.concurrent;
+
+import org.apache.druid.error.NotYetImplemented;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link ScheduledExecutorService} wrapper in which every task passes through a {@link Decorator} before it reaches
+ * the wrapped executor. The plain {@link java.util.concurrent.ExecutorService} methods are inherited from
+ * {@link DecoratedExecutorService}. The one-shot {@code schedule} overloads use the decorator's scheduled variants,
+ * so the decorator learns the intended delay.
+ * <p>
+ * {@link #scheduleAtFixedRate} and {@link #scheduleWithFixedDelay} are not supported and always throw. Use the
+ * helpers in {@link ScheduledExecutors}, which re-submit each run through {@code schedule}.
+ */
+public class DecoratedScheduledExecutorService extends DecoratedExecutorService
+    implements ScheduledExecutorService
+{
+  // Same object as the delegate given to super(), kept with its ScheduledExecutorService type for schedule(...).
+  private final ScheduledExecutorService scheduledExec;
+
+  public DecoratedScheduledExecutorService(
+      final ScheduledExecutorService delegate,
+      final Decorator decorator
+  )
+  {
+    super(delegate, decorator);
+    this.scheduledExec = delegate;
+  }
+
+  @Override
+  public ScheduledFuture<?> schedule(final Runnable command, final long delay, final TimeUnit unit)
+  {
+    final Runnable wrapped = decorator.decorateScheduledRunnable(command, delay, unit);
+    return scheduledExec.schedule(wrapped, delay, unit);
+  }
+
+  @Override
+  public <V> ScheduledFuture<V> schedule(final Callable<V> callable, final long delay, final TimeUnit unit)
+  {
+    final Callable<V> wrapped = decorator.decorateScheduledCallable(callable, delay, unit);
+    return scheduledExec.schedule(wrapped, delay, unit);
+  }
+
+  /**
+   * Unsupported; always throws. Use {@link ScheduledExecutors#scheduleAtFixedRate} instead.
+   */
+  @Override
+  public ScheduledFuture<?> scheduleAtFixedRate(
+      final Runnable command,
+      final long initialDelay,
+      final long period,
+      final TimeUnit unit
+  )
+  {
+    final String className = getClass().getName();
+    throw NotYetImplemented.ex(
+        null,
+        "Class[%s] does not implement scheduleAtFixedRate, use ScheduledExecutors.scheduleAtFixedRate",
+        className
+    );
+  }
+
+  /**
+   * Unsupported; always throws. Use {@link ScheduledExecutors#scheduleWithFixedDelay} instead.
+   */
+  @Override
+  public ScheduledFuture<?> scheduleWithFixedDelay(
+      final Runnable command,
+      final long initialDelay,
+      final long delay,
+      final TimeUnit unit
+  )
+  {
+    final String className = getClass().getName();
+    throw NotYetImplemented.ex(
+        null,
+        "Class[%s] does not implement scheduleWithFixedDelay, use ScheduledExecutors.scheduleWithFixedDelay",
+        className
+    );
+  }
+}
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DelayMetricEmittingScheduledExecutorService.java
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DelayMetricEmittingScheduledExecutorService.java
new file mode 100644
index 00000000000..b61580e7ee9
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/DelayMetricEmittingScheduledExecutorService.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.concurrent;
+
+import org.apache.druid.java.util.common.Stopwatch;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link DecoratedScheduledExecutorService} that emits a scheduling-delay metric each time a task starts running.
+ * The metric value is:
+ * <ul>
+ *   <li>the time from decoration (just before submission) to the start of execution,</li>
+ *   <li>minus the intended delay,</li>
+ *   <li>in milliseconds, floored at zero.</li>
+ * </ul>
+ * Tasks submitted without a delay are measured against an intended delay of zero. Negative delays passed to
+ * {@code schedule} are also treated as zero, matching {@link ScheduledExecutorService#schedule}, which runs such
+ * tasks immediately.
+ * <p>
+ * The metric is emitted on the executing thread immediately before the task body runs.
+ * <p>
+ * Use {@link ScheduledExecutors#emittingDelayMetric} for convenience.
+ */
+public class DelayMetricEmittingScheduledExecutorService extends DecoratedScheduledExecutorService
+{
+  /**
+   * @param delegate         executor that actually runs the tasks
+   * @param emitter          emitter that receives the scheduling-delay metric
+   * @param metricName       name of the emitted metric
+   * @param metricDimensions dimensions attached to every emitted metric; entries with null values are skipped
+   */
+  public DelayMetricEmittingScheduledExecutorService(
+      final ScheduledExecutorService delegate,
+      final ServiceEmitter emitter,
+      final String metricName,
+      final Map<String, Object> metricDimensions
+  )
+  {
+    super(delegate, new SchedulingDelayDecorator(emitter, metricName, metricDimensions));
+  }
+
+  /**
+   * Wraps each task so that, when it starts, it reports how late it started relative to its intended delay.
+   * The stopwatch starts at decoration time, before the task is handed to the delegate executor.
+   */
+  private static class SchedulingDelayDecorator implements Decorator
+  {
+    private final ServiceEmitter emitter;
+    private final String metricName;
+    private final Map<String, Object> metricDimensions;
+
+    private SchedulingDelayDecorator(
+        final ServiceEmitter emitter,
+        final String metricName,
+        final Map<String, Object> metricDimensions
+    )
+    {
+      this.emitter = emitter;
+      this.metricName = metricName;
+      this.metricDimensions = metricDimensions;
+    }
+
+    @Override
+    public <T> Callable<T> decorateCallable(final Callable<T> callable)
+    {
+      // No delay was requested, so all time spent waiting counts as scheduling delay.
+      return decorateScheduledCallable(callable, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Runnable decorateRunnable(final Runnable runnable)
+    {
+      // No delay was requested, so all time spent waiting counts as scheduling delay.
+      return decorateScheduledRunnable(runnable, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public <T> Callable<T> decorateScheduledCallable(
+        final Callable<T> callable,
+        final long delay,
+        final TimeUnit unit
+    )
+    {
+      final Stopwatch stopwatch = Stopwatch.createStarted();
+      final long intendedDelayMillis = toIntendedDelayMillis(delay, unit);
+      return () -> {
+        emitSchedulingDelay(stopwatch.millisElapsed(), intendedDelayMillis);
+        return callable.call();
+      };
+    }
+
+    @Override
+    public Runnable decorateScheduledRunnable(final Runnable runnable, final long delay, final TimeUnit unit)
+    {
+      final Stopwatch stopwatch = Stopwatch.createStarted();
+      final long intendedDelayMillis = toIntendedDelayMillis(delay, unit);
+      return () -> {
+        emitSchedulingDelay(stopwatch.millisElapsed(), intendedDelayMillis);
+        runnable.run();
+      };
+    }
+
+    /**
+     * Converts a requested delay to milliseconds, treating negative delays as zero. A
+     * {@link ScheduledExecutorService} runs negative-delay tasks immediately, so subtracting a negative value
+     * would overstate the lag.
+     */
+    private static long toIntendedDelayMillis(final long delay, final TimeUnit unit)
+    {
+      return Math.max(0, unit.toMillis(delay));
+    }
+
+    /**
+     * Emits {@code max(0, actualDelayMillis - intendedDelayMillis)} under {@code metricName}. Every non-null entry
+     * of {@code metricDimensions} is attached as a dimension.
+     */
+    private void emitSchedulingDelay(final long actualDelayMillis, final long intendedDelayMillis)
+    {
+      final long delayMillis = Math.max(0, actualDelayMillis - intendedDelayMillis);
+
+      final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+      for (final Map.Entry<String, Object> entry : metricDimensions.entrySet()) {
+        builder.setDimensionIfNotNull(entry.getKey(), entry.getValue());
+      }
+
+      emitter.emit(builder.setMetric(metricName, delayMillis));
+    }
+  }
+}
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
index bd2fea04d46..271ef85d130 100644
---
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
+++
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
@@ -19,10 +19,13 @@
package org.apache.druid.java.util.common.concurrent;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.joda.time.Duration;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -48,26 +51,17 @@ public class ScheduledExecutors
exec,
initialDelay,
delay,
- new Callable<>()
- {
- @Override
- public Signal call()
- {
- runnable.run(); // (Exceptions are handled for us)
- if (exec.isShutdown()) {
- log.warn("ScheduledExecutorService is ShutDown. Return
'Signal.STOP' and stopped rescheduling %s (delay %s)", this, delay);
- return Signal.STOP;
- } else {
- return Signal.REPEAT;
- }
- }
+ () -> {
+ runnable.run(); // (Exceptions are handled for us)
+ return Signal.REPEAT;
}
);
}
/**
* Run callable repeatedly with the given delay between calls, until it
- * returns Signal.STOP. Exceptions are caught and logged as errors.
+ * returns Signal.STOP or the executor is shut down. Exceptions are caught
+ * and logged as errors, and do not prevent subsequent executions.
*/
public static void scheduleWithFixedDelay(
final ScheduledExecutorService exec,
@@ -83,17 +77,21 @@ public class ScheduledExecutors
@Override
public void run()
{
+ Signal signal = Signal.REPEAT;
+
try {
log.trace("Running %s (delay %s)", callable, delay);
- if (callable.call() == Signal.REPEAT) {
- log.trace("Rescheduling %s (delay %s)", callable, delay);
- exec.schedule(this, delay.getMillis(), TimeUnit.MILLISECONDS);
- } else {
- log.debug("Stopped rescheduling %s (delay %s)", callable,
delay);
- }
+ signal = callable.call();
}
catch (Throwable e) {
- log.error(e, "Uncaught exception.");
+ log.warn(e, "Uncaught exception. Rescheduling.");
+ }
+
+ if (signal == Signal.REPEAT && !exec.isShutdown()) {
+ log.trace("Rescheduling %s (delay %s)", callable, delay);
+ exec.schedule(this, delay.getMillis(), TimeUnit.MILLISECONDS);
+ } else {
+ log.debug("Stopped rescheduling %s (delay %s)", callable, delay);
}
}
},
@@ -108,11 +106,14 @@ public class ScheduledExecutors
* <p>
* This differs from {@link #scheduleWithFixedDelay} in that the period is
measured from the start of each
* execution rather than from the completion. If an execution takes longer
than the period, the next execution
- * will begin immediately after the current one starts.
+ * will begin immediately after the current one completes.
* <p>
* This also differs from {@link
ScheduledExecutorService#scheduleAtFixedRate} in that it prevents task pileup:
* only one future execution is scheduled at a time rather than scheduling
all future executions upfront.
* This prevents a backlog of pending tasks from building up if the executor
is delayed or tasks run slowly.
+ * <p>
+ * Exceptions thrown by the task are caught and logged as errors, and do not
prevent subsequent executions.
+ * Scheduling also stops if the executor is shut down.
*
* @param exec the ScheduledExecutorService to use for scheduling
* @param initialDelay the duration to wait before the first execution
@@ -126,15 +127,15 @@ public class ScheduledExecutors
final Runnable runnable
)
{
- scheduleAtFixedRate(exec, initialDelay, period, new Callable<Signal>()
- {
- @Override
- public Signal call()
- {
- runnable.run();
- return Signal.REPEAT;
- }
- });
+ scheduleAtFixedRate(
+ exec,
+ initialDelay,
+ period,
+ () -> {
+ runnable.run(); // (Exceptions are handled for us)
+ return Signal.REPEAT;
+ }
+ );
}
public static void scheduleAtFixedRate(ScheduledExecutorService exec,
Duration rate, Callable<Signal> callable)
@@ -153,21 +154,26 @@ public class ScheduledExecutors
exec.schedule(
new Runnable()
{
- private volatile Signal prevSignal = null;
-
@Override
public void run()
{
- if (prevSignal == null || prevSignal == Signal.REPEAT) {
- exec.schedule(this, rate.getMillis(), TimeUnit.MILLISECONDS);
- }
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ Signal signal = Signal.REPEAT;
try {
log.trace("Running %s (period %s)", callable, rate);
- prevSignal = callable.call();
+ signal = callable.call();
}
catch (Throwable e) {
- log.error(e, "Uncaught exception.");
+ log.warn(e, "Uncaught exception. Rescheduling.");
+ }
+
+ if (signal == Signal.REPEAT && !exec.isShutdown()) {
+ final long nextDelay = Math.max(0, rate.getMillis() -
stopwatch.millisElapsed());
+ log.trace("Rescheduling %s (delay %s)", callable, nextDelay);
+ exec.schedule(this, nextDelay, TimeUnit.MILLISECONDS);
+ } else {
+ log.debug("Stopped rescheduling %s (rate %s)", callable, rate);
}
}
},
@@ -181,6 +187,26 @@ public class ScheduledExecutors
REPEAT, STOP
}
+  /**
+   * Wraps {@code exec} so that it emits a scheduling-lag metric every time one of its tasks starts. The lag is the
+   * actual wait minus the intended delay, in milliseconds, floored at zero. It is reported for:
+   * <ul>
+   *   <li>tasks scheduled through {@link ScheduledExecutorService#schedule};</li>
+   *   <li>tasks submitted through the plain {@link java.util.concurrent.ExecutorService} methods, which have an
+   *   intended delay of zero.</li>
+   * </ul>
+   * <p>
+   * The returned executor rejects {@link ScheduledExecutorService#scheduleAtFixedRate} and
+   * {@link ScheduledExecutorService#scheduleWithFixedDelay}; use the periodic helpers in this class instead.
+   *
+   * @param exec             the executor to wrap
+   * @param emitter          the emitter to emit metrics to
+   * @param metricName       the name of the metric to emit
+   * @param metricDimensions dimensions to include with the metric
+   *
+   * @return a {@link DelayMetricEmittingScheduledExecutorService} wrapping {@code exec}
+   */
+  public static ScheduledExecutorService emittingDelayMetric(
+      final ScheduledExecutorService exec,
+      final ServiceEmitter emitter,
+      final String metricName,
+      final Map<String, Object> metricDimensions
+  )
+  {
+    return new DelayMetricEmittingScheduledExecutorService(
+        exec,
+        emitter,
+        metricName,
+        metricDimensions
+    );
+  }
+
public static ScheduledExecutorFactory createFactory(final Lifecycle
lifecycle)
{
return (corePoolSize, nameFormat) ->
ExecutorServices.manageLifecycle(lifecycle, fixed(corePoolSize, nameFormat));
diff --git
a/processing/src/test/java/org/apache/druid/java/util/common/concurrent/DelayMetricEmittingScheduledExecutorServiceTest.java
b/processing/src/test/java/org/apache/druid/java/util/common/concurrent/DelayMetricEmittingScheduledExecutorServiceTest.java
new file mode 100644
index 00000000000..0695f1f7e72
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/java/util/common/concurrent/DelayMetricEmittingScheduledExecutorServiceTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.concurrent;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.joda.time.Duration;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Tests for {@link DelayMetricEmittingScheduledExecutorService}, created via
+ * {@link ScheduledExecutors#emittingDelayMetric}. Each test shuts its executor down in a {@code finally} block, so a
+ * failed assertion does not leak the executor's thread.
+ */
+public class DelayMetricEmittingScheduledExecutorServiceTest
+{
+  private static final String METRIC_NAME = "schedule/lag";
+
+  @Test
+  public void testScheduleRunnableEmitsLagMetric() throws Exception
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter();
+    final ScheduledExecutorService exec = createExec(emitter, ImmutableMap.of("component", "testComponent"));
+
+    try {
+      final CountDownLatch latch = new CountDownLatch(1);
+      exec.schedule(latch::countDown, 10, TimeUnit.MILLISECONDS);
+      Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Task should complete");
+
+      final ServiceMetricEvent event = assertSingleNonNegativeEvent(emitter);
+      Assertions.assertEquals("testComponent", event.getUserDims().get("component"));
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testScheduleCallableEmitsLagMetric() throws Exception
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter();
+    final ScheduledExecutorService exec = createExec(emitter, ImmutableMap.of("component", "testComponent"));
+
+    try {
+      final String result = exec.schedule(() -> "hello", 10, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
+      Assertions.assertEquals("hello", result);
+      assertSingleNonNegativeEvent(emitter);
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testLagIsSmallWhenExecutorIsNotOverloaded() throws Exception
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter();
+    final ScheduledExecutorService exec = createExec(emitter, ImmutableMap.of());
+
+    try {
+      // Schedule with 200ms delay; the reported lag should be small (not 200ms)
+      final CountDownLatch latch = new CountDownLatch(1);
+      exec.schedule(latch::countDown, 200, TimeUnit.MILLISECONDS);
+      Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Task should complete");
+
+      final long lag = assertSingleNonNegativeEvent(emitter).getValue().longValue();
+      Assertions.assertTrue(lag < 100, "Lag should be less than 100ms, was: " + lag);
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testEmitsOnRepeatedScheduleFromScheduledExecutors() throws Exception
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter();
+    final ScheduledExecutorService exec = createExec(emitter, ImmutableMap.of("scheduler", "test"));
+
+    try {
+      final CountDownLatch latch = new CountDownLatch(1);
+      final AtomicInteger count = new AtomicInteger(0);
+
+      // Use ScheduledExecutors.scheduleAtFixedRate which calls exec.schedule internally
+      ScheduledExecutors.scheduleAtFixedRate(
+          exec,
+          Duration.millis(0),
+          Duration.millis(100),
+          () -> {
+            if (count.incrementAndGet() >= 3) {
+              latch.countDown();
+              return ScheduledExecutors.Signal.STOP;
+            }
+            return ScheduledExecutors.Signal.REPEAT;
+          }
+      );
+
+      Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Should complete");
+
+      // 3 executions = 3 schedule calls that ran = 3 metrics
+      final List<ServiceMetricEvent> events = emitter.getMetricEvents(METRIC_NAME);
+      Assertions.assertEquals(3, events.size(), "Should emit one metric per schedule call");
+      for (final ServiceMetricEvent event : events) {
+        Assertions.assertEquals("test", event.getUserDims().get("scheduler"));
+      }
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testExecuteEmitsLagMetric() throws Exception
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter();
+    final ScheduledExecutorService exec = createExec(emitter, ImmutableMap.of("component", "testComponent"));
+
+    try {
+      final CountDownLatch latch = new CountDownLatch(1);
+      exec.execute(latch::countDown);
+      Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Task should complete");
+
+      final ServiceMetricEvent event = assertSingleNonNegativeEvent(emitter);
+      Assertions.assertEquals("testComponent", event.getUserDims().get("component"));
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testSubmitRunnableEmitsLagMetric() throws Exception
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter();
+    final ScheduledExecutorService exec = createExec(emitter, ImmutableMap.of());
+
+    try {
+      final CountDownLatch latch = new CountDownLatch(1);
+      final Future<?> future = exec.submit(latch::countDown);
+
+      Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS), "Task should complete");
+      future.get(5, TimeUnit.SECONDS);
+      assertSingleNonNegativeEvent(emitter);
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testSubmitRunnableWithResultEmitsLagMetric() throws Exception
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter();
+    final ScheduledExecutorService exec = createExec(emitter, ImmutableMap.of());
+
+    try {
+      final Future<String> future = exec.submit(() -> {}, "result");
+      Assertions.assertEquals("result", future.get(5, TimeUnit.SECONDS));
+      assertSingleNonNegativeEvent(emitter);
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testSubmitCallableEmitsLagMetric() throws Exception
+  {
+    final StubServiceEmitter emitter = new StubServiceEmitter();
+    final ScheduledExecutorService exec = createExec(emitter, ImmutableMap.of());
+
+    try {
+      final Future<String> future = exec.submit(() -> "hello");
+      Assertions.assertEquals("hello", future.get(5, TimeUnit.SECONDS));
+      assertSingleNonNegativeEvent(emitter);
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testScheduleAtFixedRateThrows()
+  {
+    final ScheduledExecutorService exec = createExec(new StubServiceEmitter(), ImmutableMap.of());
+
+    try {
+      Assertions.assertThrows(
+          DruidException.class,
+          () -> exec.scheduleAtFixedRate(() -> {}, 0, 100, TimeUnit.MILLISECONDS)
+      );
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testScheduleWithFixedDelayThrows()
+  {
+    final ScheduledExecutorService exec = createExec(new StubServiceEmitter(), ImmutableMap.of());
+
+    try {
+      Assertions.assertThrows(
+          DruidException.class,
+          () -> exec.scheduleWithFixedDelay(() -> {}, 0, 100, TimeUnit.MILLISECONDS)
+      );
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  /**
+   * Creates a single-threaded executor that reports {@link #METRIC_NAME} to {@code emitter} with {@code dimensions}.
+   */
+  private static ScheduledExecutorService createExec(
+      final StubServiceEmitter emitter,
+      final ImmutableMap<String, Object> dimensions
+  )
+  {
+    return ScheduledExecutors.emittingDelayMetric(
+        Execs.scheduledSingleThreaded("test-%d"),
+        emitter,
+        METRIC_NAME,
+        dimensions
+    );
+  }
+
+  /**
+   * Asserts that exactly one {@link #METRIC_NAME} event was emitted and that its value is non-negative.
+   * Returns that event.
+   */
+  private static ServiceMetricEvent assertSingleNonNegativeEvent(final StubServiceEmitter emitter)
+  {
+    final List<ServiceMetricEvent> events = emitter.getMetricEvents(METRIC_NAME);
+    Assertions.assertEquals(1, events.size(), "Should emit exactly one metric");
+
+    final ServiceMetricEvent event = events.get(0);
+    Assertions.assertTrue(event.getValue().longValue() >= 0, "Lag should be >= 0");
+    return event;
+  }
+}
diff --git
a/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
b/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
index 81dc7ccd8c1..b256dc8268e 100644
---
a/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
+++
b/processing/src/test/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutorsTest.java
@@ -38,7 +38,7 @@ public class ScheduledExecutorsTest
Duration initialDelay = Duration.millis(100);
Duration delay = Duration.millis(500);
int taskDuration = 100; // ms
- ScheduledExecutorService exec =
Execs.scheduledSingleThreaded("BasicAuthenticatorCacheManager-Exec--%d");
+ ScheduledExecutorService exec =
Execs.scheduledSingleThreaded("testFixedDelay--%d");
List<Long> taskStartTimes = new ArrayList<>();
AtomicInteger executionCount = new AtomicInteger(0);
@@ -97,6 +97,69 @@ public class ScheduledExecutorsTest
}
}
+  @Test
+  public void testScheduleWithFixedDelayContinuesAfterException() throws Exception
+  {
+    // Run 1 throws, run 2 repeats, run 3 stops: the exception must not end the schedule.
+    final ScheduledExecutorService executor = Execs.scheduledSingleThreaded("testFixedDelayException-%d");
+    final AtomicInteger runs = new AtomicInteger(0);
+    final CountDownLatch done = new CountDownLatch(1);
+
+    ScheduledExecutors.scheduleWithFixedDelay(
+        executor,
+        Duration.millis(0),
+        Duration.millis(100),
+        () -> {
+          final int run = runs.incrementAndGet();
+          if (run == 1) {
+            throw new RuntimeException("test exception");
+          } else if (run < 3) {
+            return ScheduledExecutors.Signal.REPEAT;
+          } else {
+            done.countDown();
+            return ScheduledExecutors.Signal.STOP;
+          }
+        }
+    );
+
+    final boolean completed = done.await(5, TimeUnit.SECONDS);
+    executor.shutdown();
+
+    Assert.assertTrue("Should continue executing after exception", completed);
+    Assert.assertEquals("Should have exactly 3 executions", 3, runs.get());
+  }
+
+  @Test
+  public void testScheduleWithFixedDelayStopsOnSignalStop() throws Exception
+  {
+    final Duration initialDelay = Duration.millis(0);
+    final Duration delay = Duration.millis(100);
+    final ScheduledExecutorService exec = Execs.scheduledSingleThreaded("testFixedDelayStop-%d");
+
+    final AtomicInteger executionCount = new AtomicInteger(0);
+    final CountDownLatch firstRun = new CountDownLatch(1);
+
+    try {
+      ScheduledExecutors.scheduleWithFixedDelay(
+          exec,
+          initialDelay,
+          delay,
+          () -> {
+            executionCount.incrementAndGet();
+            firstRun.countDown();
+            return ScheduledExecutors.Signal.STOP;
+          }
+      );
+
+      // Wait for the first run explicitly, so a slow executor cannot make the count spuriously zero.
+      Assert.assertTrue("First execution should happen", firstRun.await(5, TimeUnit.SECONDS));
+
+      // Then let several delay periods pass; erroneous rescheduling would show up as extra executions.
+      Thread.sleep(delay.getMillis() * 5);
+
+      Assert.assertEquals("Should execute exactly once before stopping", 1, executionCount.get());
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
@Test
public void testScheduleAtFixedRateWithLongRunningTask() throws Exception
{
@@ -153,10 +216,10 @@ public class ScheduledExecutorsTest
// Should be at least 800ms (first task duration), but not much more since
it schedules immediately
Assert.assertTrue(
"Second task should start after first task completes (~800ms), was: "
+ timeBetweenFirstAndSecond,
- timeBetweenFirstAndSecond >= 750 && timeBetweenFirstAndSecond <= 900
+ timeBetweenFirstAndSecond >= 750 && timeBetweenFirstAndSecond <= 950
);
- // Verify subsequent tasks maintain the period
+ // Verify subsequent tasks maintain the period (no catch-up burst)
long timeBetweenSecondAndThird = executionStartTimes.get(2) -
executionStartTimes.get(1);
// Should be approximately 500ms (the period), since tasks now finish
quickly
Assert.assertTrue(
@@ -164,4 +227,122 @@ public class ScheduledExecutorsTest
timeBetweenSecondAndThird >= 450 && timeBetweenSecondAndThird <= 600
);
}
+
+  @Test
+  public void testScheduleAtFixedRateNoConcurrentExecutionWithMultipleThreads() throws Exception
+  {
+    final Duration initialDelay = Duration.millis(0);
+    final Duration period = Duration.millis(100);
+    // Use 4 threads to verify that even with available threads, only one execution runs at a time
+    final ScheduledExecutorService exec = ScheduledExecutors.fixed(4, "testNoConcurrency-%d");
+
+    final AtomicInteger concurrentCount = new AtomicInteger(0);
+    final AtomicInteger maxConcurrentCount = new AtomicInteger(0);
+    final AtomicInteger executionCount = new AtomicInteger(0);
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    try {
+      ScheduledExecutors.scheduleAtFixedRate(
+          exec,
+          initialDelay,
+          period,
+          () -> {
+            final int concurrent = concurrentCount.incrementAndGet();
+            try {
+              // Track the maximum number of concurrent executions observed.
+              maxConcurrentCount.accumulateAndGet(concurrent, Math::max);
+
+              // Task takes 3x the period; this would cause overlap if scheduling were broken.
+              Thread.sleep(period.getMillis() * 3);
+            }
+            catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            }
+            finally {
+              // Keep the in-flight counter balanced even if the sleep is interrupted.
+              concurrentCount.decrementAndGet();
+            }
+
+            if (executionCount.incrementAndGet() >= 4) {
+              latch.countDown();
+            }
+          }
+      );
+
+      Assert.assertTrue("Should complete within timeout", latch.await(10, TimeUnit.SECONDS));
+      Assert.assertEquals(
+          "Should never have more than 1 concurrent execution",
+          1,
+          maxConcurrentCount.get()
+      );
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testScheduleAtFixedRateStopsOnSignalStop() throws Exception
+  {
+    final Duration initialDelay = Duration.millis(0);
+    final Duration period = Duration.millis(100);
+    final ScheduledExecutorService exec = ScheduledExecutors.fixed(4, "testStop-%d");
+
+    final AtomicInteger executionCount = new AtomicInteger(0);
+    final CountDownLatch firstRun = new CountDownLatch(1);
+
+    try {
+      ScheduledExecutors.scheduleAtFixedRate(
+          exec,
+          initialDelay,
+          period,
+          () -> {
+            executionCount.incrementAndGet();
+            firstRun.countDown();
+            return ScheduledExecutors.Signal.STOP;
+          }
+      );
+
+      // Wait for the first run explicitly, so a slow executor cannot make the count spuriously zero.
+      Assert.assertTrue("First execution should happen", firstRun.await(5, TimeUnit.SECONDS));
+
+      // Wait enough time for multiple executions to have occurred if stopping were broken
+      Thread.sleep(period.getMillis() * 5);
+
+      Assert.assertEquals("Should execute exactly once before stopping", 1, executionCount.get());
+    }
+    finally {
+      exec.shutdown();
+    }
+  }
+
+  @Test
+  public void testScheduleAtFixedRateContinuesAfterException() throws Exception
+  {
+    // Run 1 throws, run 2 repeats, run 3 stops: the exception must not end the schedule.
+    final ScheduledExecutorService executor = Execs.scheduledSingleThreaded("testException-%d");
+    final AtomicInteger runs = new AtomicInteger(0);
+    final CountDownLatch done = new CountDownLatch(1);
+
+    ScheduledExecutors.scheduleAtFixedRate(
+        executor,
+        Duration.millis(0),
+        Duration.millis(100),
+        () -> {
+          final int run = runs.incrementAndGet();
+          if (run == 1) {
+            throw new RuntimeException("test exception");
+          } else if (run < 3) {
+            return ScheduledExecutors.Signal.REPEAT;
+          } else {
+            done.countDown();
+            return ScheduledExecutors.Signal.STOP;
+          }
+        }
+    );
+
+    final boolean completed = done.await(5, TimeUnit.SECONDS);
+    executor.shutdown();
+
+    Assert.assertTrue("Should continue executing after exception", completed);
+    Assert.assertEquals("Should have exactly 3 executions", 3, runs.get());
+  }
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index c9cb719f56e..72f50a87a57 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -711,7 +711,12 @@ public class DruidCoordinator
{
this.startingLeaderCounter = startingLeaderCounter;
this.dutyGroup = new CoordinatorDutyGroup(alias, duties, period, this);
- this.executor = executorFactory.create(1, "Coordinator-Exec-" + alias +
"-%d");
+ this.executor = ScheduledExecutors.emittingDelayMetric(
+ executorFactory.create(1, "Coordinator-Exec-" + alias + "-%d"),
+ emitter,
+ "coordinator/duty/wait/millis",
+ Map.of(Dimension.DUTY_GROUP.reportedName(), alias)
+ );
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]