This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 068e5720dba Improve extensibility of MSQ Dart engine via extensions
(#19127)
068e5720dba is described below
commit 068e5720dba1c5e1c8528008250e6686d2062767
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Mar 17 15:15:32 2026 +0530
Improve extensibility of MSQ Dart engine via extensions (#19127)
This is a collection of minor refactors that help extensions use and
override certain Dart capabilities.
Changes:
- Add utility method `ServletResourceUtils.createAsyncTimeoutListener`
- Track idle duration of a Dart worker
- Fix visibility of some methods and classes
- Add equals and hashCode for `ChangeRequestHistory.Counter`
- Add `SegmentListerResourceTest`
- Add test method to `ServletResourceUtilsTest`
---
.../worker/http/TaskManagementResource.java | 34 +-----
.../msq/dart/worker/DartWorkerClientImpl.java | 4 +-
.../dart/worker/DartWorkerContextFactoryImpl.java | 7 +-
.../druid/msq/dart/worker/DartWorkerRunner.java | 26 ++++-
.../msq/dart/worker/http/GetWorkersResponse.java | 18 ++-
.../org/apache/druid/msq/rpc/WorkerResource.java | 35 ++----
.../msq/dart/worker/DartWorkerRunnerTest.java | 2 +-
.../dart/worker/http/GetWorkersResponseTest.java | 3 +-
.../discovery/DruidNodeDiscoveryProvider.java | 36 ++++--
.../messages/server/MessageRelayResource.java | 37 ++----
.../server/coordination/ChangeRequestHistory.java | 25 ++++-
.../druid/server/http/SegmentListerResource.java | 64 ++---------
.../druid/server/http/ServletResourceUtils.java | 39 +++++++
.../server/http/SegmentListerResourceTest.java | 124 +++++++++++++++++++++
.../server/http/ServletResourceUtilsTest.java | 29 +++++
.../druid/server/mocks/MockAsyncContext.java | 8 +-
.../testing/embedded/EmbeddedClusterApis.java | 21 +---
.../testing/embedded/EmbeddedServiceClient.java | 31 ++++++
18 files changed, 359 insertions(+), 184 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java
index dd9316a7a7d..5cb22cbc9e2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java
@@ -36,11 +36,10 @@ import org.apache.druid.indexing.worker.WorkerTaskManager;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
+import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.server.http.security.StateResourceFilter;
import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
@@ -129,32 +128,11 @@ public class TaskManagementResource
final AsyncContext asyncContext = req.startAsync();
asyncContext.addListener(
- new AsyncListener()
- {
- @Override
- public void onComplete(AsyncEvent event)
- {
- }
-
- @Override
- public void onTimeout(AsyncEvent event)
- {
-
- // HTTP 204 NO_CONTENT is sent to the client.
- future.cancel(true);
- event.getAsyncContext().complete();
- }
-
- @Override
- public void onError(AsyncEvent event)
- {
- }
-
- @Override
- public void onStartAsync(AsyncEvent event)
- {
- }
- }
+ ServletResourceUtils.createAsyncTimeoutListener(event -> {
+ // HTTP 204 NO_CONTENT is sent to the client.
+ future.cancel(true);
+ event.getAsyncContext().complete();
+ })
);
Futures.addCallback(
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java
index 3dd25e7ac14..e1de222e1e0 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java
@@ -176,12 +176,12 @@ public class DartWorkerClientImpl extends
BaseWorkerClientImpl implements DartWo
/**
* Service client that adds the {@link
DartWorkerResource#HEADER_CONTROLLER_HOST} header.
*/
- private static class ControllerDecoratedClient implements ServiceClient
+ public static class ControllerDecoratedClient implements ServiceClient
{
private final ServiceClient delegate;
private final String controllerHost;
- ControllerDecoratedClient(final ServiceClient delegate, final String
controllerHost)
+ public ControllerDecoratedClient(final ServiceClient delegate, final
String controllerHost)
{
this.delegate = delegate;
this.controllerHost = controllerHost;
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
index 6f473166379..07117c80f19 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
@@ -120,7 +120,7 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
jsonMapper,
policyEnforcer,
injector,
- new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper,
null),
+ createWorkerClient(queryId),
processingConfig,
segmentWrangler,
groupingEngine,
@@ -135,4 +135,9 @@ public class DartWorkerContextFactoryImpl implements
DartWorkerContextFactory
emitter
);
}
+
+ protected DartWorkerClient createWorkerClient(String queryId)
+ {
+ return new DartWorkerClientImpl(queryId, serviceClientFactory,
smileMapper, null);
+ }
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
index fecc4ec7b2f..205d19ac6b1 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
@@ -30,6 +30,7 @@ import org.apache.druid.error.DruidException;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -72,6 +73,11 @@ public class DartWorkerRunner
@GuardedBy("this")
private final Set<String> activeControllerHosts = new HashSet<>();
+ /**
+ * Used to track the time since this runner has been idle.
+ */
+ private final Stopwatch sinceLastWorkerFinished =
Stopwatch.createUnstarted();
+
/**
* Query ID -> Worker instance.
*/
@@ -147,6 +153,7 @@ public class DartWorkerRunner
holder,
e -> log.warn(e, "Failed to close worker[%s]",
holder.worker.id())
);
+ resetWorkerFinishTime();
this.notifyAll();
}
},
@@ -196,7 +203,9 @@ public class DartWorkerRunner
{
final List<DartWorkerInfo> infos = new ArrayList<>();
+ final long idleDurationMillis;
synchronized (this) {
+ idleDurationMillis = workerMap.isEmpty() ?
sinceLastWorkerFinished.millisElapsed() : 0L;
for (final Map.Entry<String, WorkerHolder> entry : workerMap.entrySet())
{
final String queryId = entry.getKey();
final WorkerHolder workerHolder = entry.getValue();
@@ -211,7 +220,22 @@ public class DartWorkerRunner
}
}
- return new GetWorkersResponse(infos);
+ return new GetWorkersResponse(infos, idleDurationMillis);
+ }
+
+ /**
+ * Sets the finish time of the last worker to the current time. Used to track
+ * the duration for which this runner has been idle.
+ */
+ public void resetWorkerFinishTime()
+ {
+ synchronized (this) {
+ if (sinceLastWorkerFinished.isRunning()) {
+ sinceLastWorkerFinished.restart();
+ } else {
+ sinceLastWorkerFinished.start();
+ }
+ }
}
@LifecycleStart
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponse.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponse.java
index 0fa28a4ef17..58e5969185b 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponse.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponse.java
@@ -31,10 +31,15 @@ import java.util.Objects;
public class GetWorkersResponse
{
private final List<DartWorkerInfo> workers;
+ private final long idleDurationMillis;
- public GetWorkersResponse(@JsonProperty("workers") final
List<DartWorkerInfo> workers)
+ public GetWorkersResponse(
+ @JsonProperty("workers") final List<DartWorkerInfo> workers,
+ @JsonProperty("idleDurationMillis") final long idleDurationMillis
+ )
{
this.workers = workers;
+ this.idleDurationMillis = idleDurationMillis;
}
@JsonProperty
@@ -43,6 +48,12 @@ public class GetWorkersResponse
return workers;
}
+ @JsonProperty
+ public long getIdleDurationMillis()
+ {
+ return idleDurationMillis;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -53,12 +64,13 @@ public class GetWorkersResponse
return false;
}
GetWorkersResponse that = (GetWorkersResponse) o;
- return Objects.equals(workers, that.workers);
+ return Objects.equals(workers, that.workers)
+ && idleDurationMillis == that.idleDurationMillis;
}
@Override
public int hashCode()
{
- return Objects.hashCode(workers);
+ return Objects.hash(workers, idleDurationMillis);
}
}
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/WorkerResource.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/WorkerResource.java
index 0cc784cf36d..06452007d51 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/WorkerResource.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/WorkerResource.java
@@ -34,13 +34,12 @@ import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
import org.apache.druid.msq.statistics.serde.ClusterByStatisticsSnapshotSerde;
+import org.apache.druid.server.http.ServletResourceUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
@@ -109,33 +108,13 @@ public class WorkerResource
asyncContext.setTimeout(GET_CHANNEL_DATA_TIMEOUT);
asyncContext.addListener(
- new AsyncListener()
- {
- @Override
- public void onComplete(AsyncEvent event)
- {
- }
-
- @Override
- public void onTimeout(AsyncEvent event)
- {
- if (responseResolved.compareAndSet(false, true)) {
- HttpServletResponse response = (HttpServletResponse)
asyncContext.getResponse();
- response.setStatus(HttpServletResponse.SC_OK);
- event.getAsyncContext().complete();
- }
- }
-
- @Override
- public void onError(AsyncEvent event)
- {
- }
-
- @Override
- public void onStartAsync(AsyncEvent event)
- {
+ ServletResourceUtils.createAsyncTimeoutListener(event -> {
+ if (responseResolved.compareAndSet(false, true)) {
+ HttpServletResponse response = (HttpServletResponse)
asyncContext.getResponse();
+ response.setStatus(HttpServletResponse.SC_OK);
+ event.getAsyncContext().complete();
}
- }
+ })
);
// Save these items, since "req" becomes inaccessible in future exception
handlers.
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java
index 9ef3027c471..7bf3c53ca96 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java
@@ -154,7 +154,7 @@ public class DartWorkerRunnerTest
public void test_getWorkersResponse_empty()
{
final GetWorkersResponse workersResponse =
workerRunner.getWorkersResponse();
- Assertions.assertEquals(new GetWorkersResponse(Collections.emptyList()),
workersResponse);
+ Assertions.assertEquals(new GetWorkersResponse(Collections.emptyList(),
0L), workersResponse);
}
@Test
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponseTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponseTest.java
index f516077a575..95444e9a064 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponseTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponseTest.java
@@ -43,7 +43,8 @@ public class GetWorkersResponseTest
"localhost:8101",
DateTimes.of("2000")
)
- )
+ ),
+ 0L
);
final GetWorkersResponse response2 =
jsonMapper.readValue(jsonMapper.writeValueAsBytes(response),
GetWorkersResponse.class);
diff --git
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index 44ce7ff9824..2139650b003 100644
---
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -22,6 +22,7 @@ package org.apache.druid.discovery;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.DruidNode;
@@ -50,8 +51,8 @@ public abstract class DruidNodeDiscoveryProvider
ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER)
);
- private final ConcurrentHashMap<String, ServiceDruidNodeDiscovery>
serviceDiscoveryMap =
- new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size());
+ private final ConcurrentHashMap<ServiceAndRoles, ServiceDruidNodeDiscovery>
serviceDiscoveryMap =
+ new ConcurrentHashMap<>(10);
public abstract BooleanSupplier getForNode(DruidNode node, NodeRole
nodeRole);
@@ -63,15 +64,26 @@ public abstract class DruidNodeDiscoveryProvider
*/
public DruidNodeDiscovery getForService(String serviceName)
{
- return serviceDiscoveryMap.computeIfAbsent(
+ return getForServiceAndRoles(
serviceName,
- service -> {
+ DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName)
+ );
+ }
- Set<NodeRole> nodeRolesToWatch =
DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(service);
- if (nodeRolesToWatch == null) {
- throw new IAE("Unknown service [%s].", service);
+ /**
+ * Get DruidNodeDiscovery instance to discover nodes of any of the given roles
+ * that announce the given service in their metadata.
+ */
+ public DruidNodeDiscovery getForServiceAndRoles(String serviceName,
Set<NodeRole> nodeRolesToWatch)
+ {
+ return serviceDiscoveryMap.computeIfAbsent(
+ new ServiceAndRoles(serviceName, nodeRolesToWatch),
+ serviceAndRoles -> {
+ if (nodeRolesToWatch == null || nodeRolesToWatch.isEmpty()) {
+ throw InvalidInput.exception("No node role specified to watch for
service[%s].", serviceName);
}
- ServiceDruidNodeDiscovery serviceDiscovery = new
ServiceDruidNodeDiscovery(service, nodeRolesToWatch.size());
+ ServiceDruidNodeDiscovery serviceDiscovery =
+ new ServiceDruidNodeDiscovery(serviceName,
nodeRolesToWatch.size());
DruidNodeDiscovery.Listener filteringGatheringUpstreamListener =
serviceDiscovery.filteringUpstreamListener();
for (NodeRole nodeRole : nodeRolesToWatch) {
@@ -82,6 +94,14 @@ public abstract class DruidNodeDiscoveryProvider
);
}
+ /**
+ * Record containing serviceName and nodeRoles, used as a key in {@link
#serviceDiscoveryMap}.
+ */
+ private record ServiceAndRoles(String serviceName, Set<NodeRole> nodeRoles)
+ {
+
+ }
+
private static class ServiceDruidNodeDiscovery implements DruidNodeDiscovery
{
private static final Logger log = new
Logger(ServiceDruidNodeDiscovery.class);
diff --git
a/server/src/main/java/org/apache/druid/messages/server/MessageRelayResource.java
b/server/src/main/java/org/apache/druid/messages/server/MessageRelayResource.java
index 550fa64c89c..c5ec4f1b333 100644
---
a/server/src/main/java/org/apache/druid/messages/server/MessageRelayResource.java
+++
b/server/src/main/java/org/apache/druid/messages/server/MessageRelayResource.java
@@ -30,10 +30,9 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.messages.MessageBatch;
import org.apache.druid.messages.client.MessageListener;
import org.apache.druid.messages.client.MessageRelayClient;
+import org.apache.druid.server.http.ServletResourceUtils;
import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.GET;
@@ -113,34 +112,14 @@ public class MessageRelayResource<MessageType>
final AsyncContext asyncContext = req.startAsync();
asyncContext.setTimeout(GET_MESSAGES_TIMEOUT);
asyncContext.addListener(
- new AsyncListener()
- {
- @Override
- public void onComplete(AsyncEvent event)
- {
- }
-
- @Override
- public void onTimeout(AsyncEvent event)
- {
- if (didRespond.compareAndSet(false, true)) {
- HttpServletResponse response = (HttpServletResponse)
asyncContext.getResponse();
- response.setStatus(HttpServletResponse.SC_NO_CONTENT);
- event.getAsyncContext().complete();
- batchFuture.cancel(true);
- }
- }
-
- @Override
- public void onError(AsyncEvent event)
- {
- }
-
- @Override
- public void onStartAsync(AsyncEvent event)
- {
+ ServletResourceUtils.createAsyncTimeoutListener(event -> {
+ if (didRespond.compareAndSet(false, true)) {
+ HttpServletResponse response = (HttpServletResponse)
asyncContext.getResponse();
+ response.setStatus(HttpServletResponse.SC_NO_CONTENT);
+ event.getAsyncContext().complete();
+ batchFuture.cancel(true);
}
- }
+ })
);
// Save these items, since "req" becomes inaccessible in future exception
handlers.
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
index 7a4017e333c..0bd9ec1cc83 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
@@ -35,6 +35,7 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ExecutorService;
/**
@@ -141,7 +142,7 @@ public class ChangeRequestHistory<T>
Counter lastCounter = getLastCounter();
if (counter.counter == lastCounter.counter) {
- if (!counter.matches(lastCounter)) {
+ if (!counter.equals(lastCounter)) {
ChangeRequestsSnapshot<T> reset = ChangeRequestsSnapshot.fail(
StringUtils.format("counter[%s] failed to match with [%s]",
counter, lastCounter)
);
@@ -170,7 +171,7 @@ public class ChangeRequestHistory<T>
if (counter.counter == lastCounter.counter) {
// We don't want to trigger a counter reset if the client counter
matches the last counter! Return an empty list
// of changes instead.
- if (counter.matches(lastCounter)) {
+ if (counter.equals(lastCounter)) {
return ChangeRequestsSnapshot.success(counter, new ArrayList<>());
} else {
return ChangeRequestsSnapshot.fail(
@@ -199,7 +200,7 @@ public class ChangeRequestHistory<T>
int changeStartIndex = (int) (counter.counter + changes.size() -
lastCounter.counter);
Counter counterToMatch = counter.counter == 0 ? Counter.ZERO :
changes.get(changeStartIndex - 1).counter;
- if (!counterToMatch.matches(counter)) {
+ if (!counterToMatch.equals(counter)) {
return ChangeRequestsSnapshot.fail(
StringUtils.format(
"counter[%s] failed to match with [%s]",
@@ -296,9 +297,23 @@ public class ChangeRequestHistory<T>
return new Counter(counter + 1);
}
- public boolean matches(Counter other)
+ @Override
+ public boolean equals(Object object)
+ {
+ if (this == object) {
+ return true;
+ }
+ if (object == null || getClass() != object.getClass()) {
+ return false;
+ }
+ Counter other = (Counter) object;
+ return counter == other.counter && hash == other.hash;
+ }
+
+ @Override
+ public int hashCode()
{
- return this.counter == other.counter && this.hash == other.hash;
+ return Objects.hash(counter, hash);
}
@Override
diff --git
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index 25e82382822..9a402fddf3e 100644
---
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -43,8 +43,6 @@ import
org.apache.druid.server.http.security.StateResourceFilter;
import javax.annotation.Nullable;
import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
@@ -146,32 +144,11 @@ public class SegmentListerResource
final AsyncContext asyncContext = req.startAsync();
asyncContext.addListener(
- new AsyncListener()
- {
- @Override
- public void onComplete(AsyncEvent event)
- {
- }
-
- @Override
- public void onTimeout(AsyncEvent event)
- {
-
- // HTTP 204 NO_CONTENT is sent to the client.
- future.cancel(true);
- event.getAsyncContext().complete();
- }
-
- @Override
- public void onError(AsyncEvent event)
- {
- }
-
- @Override
- public void onStartAsync(AsyncEvent event)
- {
- }
- }
+ ServletResourceUtils.createAsyncTimeoutListener(event -> {
+ // HTTP 204 NO_CONTENT is sent to the client.
+ future.cancel(true);
+ event.getAsyncContext().complete();
+ })
);
Futures.addCallback(
@@ -260,32 +237,11 @@ public class SegmentListerResource
final AsyncContext asyncContext = req.startAsync();
asyncContext.addListener(
- new AsyncListener()
- {
- @Override
- public void onComplete(AsyncEvent event)
- {
- }
-
- @Override
- public void onTimeout(AsyncEvent event)
- {
-
- // HTTP 204 NO_CONTENT is sent to the client.
- future.cancel(true);
- event.getAsyncContext().complete();
- }
-
- @Override
- public void onError(AsyncEvent event)
- {
- }
-
- @Override
- public void onStartAsync(AsyncEvent event)
- {
- }
- }
+ ServletResourceUtils.createAsyncTimeoutListener(event -> {
+ // HTTP 204 NO_CONTENT is sent to the client.
+ future.cancel(true);
+ event.getAsyncContext().complete();
+ })
);
Futures.addCallback(
diff --git
a/server/src/main/java/org/apache/druid/server/http/ServletResourceUtils.java
b/server/src/main/java/org/apache/druid/server/http/ServletResourceUtils.java
index 91c59304bfb..bce178029b4 100644
---
a/server/src/main/java/org/apache/druid/server/http/ServletResourceUtils.java
+++
b/server/src/main/java/org/apache/druid/server/http/ServletResourceUtils.java
@@ -30,9 +30,12 @@ import org.apache.druid.rpc.HttpResponseException;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.util.Map;
+import java.util.function.Consumer;
import java.util.function.Supplier;
public class ServletResourceUtils
@@ -112,4 +115,40 @@ public class ServletResourceUtils
throw new RuntimeException(e);
}
+
+ /**
+ * Creates an {@link AsyncListener} which performs the given action on the
event
+ * in case of a {@link AsyncListener#onTimeout}. The other actions performed
+ * by the listener viz. {@link AsyncListener#onStartAsync}, {@link
AsyncListener#onComplete}
+ * and {@link AsyncListener#onError} are noop.
+ */
+ public static AsyncListener createAsyncTimeoutListener(Consumer<AsyncEvent>
onTimeoutHandler)
+ {
+ return new AsyncListener()
+ {
+ @Override
+ public void onComplete(AsyncEvent asyncEvent)
+ {
+ // do nothing
+ }
+
+ @Override
+ public void onTimeout(AsyncEvent asyncEvent)
+ {
+ onTimeoutHandler.accept(asyncEvent);
+ }
+
+ @Override
+ public void onError(AsyncEvent asyncEvent)
+ {
+ // do nothing
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent asyncEvent)
+ {
+ // do nothing
+ }
+ };
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/server/http/SegmentListerResourceTest.java
b/server/src/test/java/org/apache/druid/server/http/SegmentListerResourceTest.java
new file mode 100644
index 00000000000..86decda3597
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/http/SegmentListerResourceTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
+import org.apache.druid.server.coordination.ChangeRequestHistory;
+import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
+import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.mocks.MockAsyncContext;
+import org.apache.druid.server.mocks.MockHttpServletRequest;
+import org.apache.druid.server.mocks.MockHttpServletResponse;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SegmentListerResourceTest
+{
+ private static final List<DataSegment> SEGMENTS = CreateDataSegments
+ .ofDatasource(TestDataSource.WIKI)
+ .withNumPartitions(10)
+ .eachOfSizeInMb(100);
+
+ private SegmentListerResource segmentListerResource;
+ private BatchDataSegmentAnnouncer segmentAnnouncer;
+
+ @Before
+ public void setup()
+ {
+ final SegmentLoadDropHandler loadDropHandler =
Mockito.mock(SegmentLoadDropHandler.class);
+ this.segmentAnnouncer = Mockito.mock(BatchDataSegmentAnnouncer.class);
+ this.segmentListerResource = new SegmentListerResource(
+ TestHelper.JSON_MAPPER,
+ TestHelper.makeSmileMapper(),
+ segmentAnnouncer,
+ loadDropHandler
+ );
+ }
+
+ @Test
+ public void test_getSegments_returnsAllSegments_onFirstRequest() throws
Exception
+ {
+ Mockito.when(
+ segmentAnnouncer.getSegmentChangesSince(new
ChangeRequestHistory.Counter(0, 0))
+ ).thenReturn(
+ Futures.immediateFuture(
+ ChangeRequestsSnapshot.success(
+ new ChangeRequestHistory.Counter(0, 0),
+
SEGMENTS.stream().map(SegmentChangeRequestLoad::new).collect(Collectors.toList())
+ )
+ )
+ );
+
+ final MockHttpServletResponse response = new MockHttpServletResponse();
+ final HttpServletRequest request = createMockRequest(response);
+
+ segmentListerResource.getSegments(0, 0, 10L, request);
+
+ Assertions.assertEquals(response.getStatus(), 200);
+
+ final ChangeRequestsSnapshot<DataSegmentChangeRequest> responsePayload =
getResponsePayload(response);
+ Assertions.assertEquals(
+ responsePayload.getCounter(),
+ new ChangeRequestHistory.Counter(0, 0)
+ );
+ Assertions.assertNotNull(responsePayload.getRequests());
+ Assertions.assertEquals(
+ SEGMENTS.size(),
+ responsePayload.getRequests().size()
+ );
+ }
+
+ private ChangeRequestsSnapshot<DataSegmentChangeRequest> getResponsePayload(
+ MockHttpServletResponse response
+ ) throws IOException
+ {
+ return TestHelper.JSON_MAPPER.readValue(
+ response.baos.toByteArray(),
+ new TypeReference<>() {}
+ );
+ }
+
+ private HttpServletRequest createMockRequest(MockHttpServletResponse
response)
+ {
+ final MockHttpServletRequest request = new MockHttpServletRequest();
+
+ final MockAsyncContext asyncContext = new MockAsyncContext();
+ asyncContext.request = request;
+ asyncContext.response = response;
+
+ request.asyncContextSupplier = () -> asyncContext;
+ return request;
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/http/ServletResourceUtilsTest.java
b/server/src/test/java/org/apache/druid/server/http/ServletResourceUtilsTest.java
index ddbc3d4376f..966b9aa3487 100644
---
a/server/src/test/java/org/apache/druid/server/http/ServletResourceUtilsTest.java
+++
b/server/src/test/java/org/apache/druid/server/http/ServletResourceUtilsTest.java
@@ -32,9 +32,13 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
import javax.ws.rs.core.Response;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicInteger;
public class ServletResourceUtilsTest
{
@@ -102,4 +106,29 @@ public class ServletResourceUtilsTest
() -> ServletResourceUtils.getDefaultValueIfCauseIs404ElseThrow(new
ISE(""), () -> "abc")
);
}
+
+ @Test
+ public void test_createAsyncTimeoutListener() throws Exception
+ {
+ final AtomicInteger timeoutCount = new AtomicInteger(0);
+ final AsyncListener listener =
ServletResourceUtils.createAsyncTimeoutListener(
+ event -> timeoutCount.incrementAndGet()
+ );
+
+ // Verify that onTimeout updates the count
+ final AsyncEvent event = Mockito.mock(AsyncEvent.class);
+ listener.onTimeout(event);
+ Assert.assertEquals(1, timeoutCount.get());
+ listener.onTimeout(event);
+ Assert.assertEquals(2, timeoutCount.get());
+
+ // Verify that other actions on the listener are noop
+ timeoutCount.set(0);
+ listener.onStartAsync(event);
+ listener.onComplete(event);
+ listener.onError(event);
+
+ Assert.assertEquals(0, timeoutCount.get());
+ Mockito.verifyNoInteractions(event);
+ }
}
diff --git
a/server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java
b/server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java
index 4bdc56747d5..babbc909214 100644
--- a/server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java
+++ b/server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java
@@ -39,6 +39,7 @@ public class MockAsyncContext implements AsyncContext
public ServletResponse response;
private final AtomicBoolean completed = new AtomicBoolean();
+ private long timeout;
@Override
public ServletRequest getRequest()
@@ -104,7 +105,8 @@ public class MockAsyncContext implements AsyncContext
@Override
public void addListener(AsyncListener listener)
{
- throw new UnsupportedOperationException();
+ // Do not throw an UnsupportedOperationException since some tests need to add listeners
+ // Add an implementation here if listeners need to be invoked in tests
}
@Override
@@ -126,12 +128,12 @@ public class MockAsyncContext implements AsyncContext
@Override
public void setTimeout(long timeout)
{
- throw new UnsupportedOperationException();
+ this.timeout = timeout;
}
@Override
public long getTimeout()
{
- throw new UnsupportedOperationException();
+ return timeout;
}
}
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index edbef9207bb..257533aecbd 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -41,12 +41,10 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.metadata.Metric;
import org.apache.druid.server.metrics.LatchableEmitter;
-import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
@@ -148,24 +146,7 @@ public class EmbeddedClusterApis implements EmbeddedResource
*/
public String runSql(String sql, Object... args)
{
- try {
- return onAnyBroker(
- b -> b.submitSqlQuery(
- new ClientSqlQuery(
- StringUtils.format(sql, args),
- ResultFormat.CSV.name(),
- false,
- false,
- false,
- null,
- null
- )
- )
- ).trim();
- }
- catch (Exception e) {
- throw new RuntimeException(e);
- }
+ return client.runSql(sql, args);
}
/**
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
index 90e2eb90048..2621751b0b7 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
@@ -33,10 +33,12 @@ import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.annotations.EscalatedGlobal;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.query.http.ClientSqlQuery;
import org.apache.druid.rpc.FixedServiceLocator;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
@@ -48,6 +50,7 @@ import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.guice.ServiceClientModule;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.server.security.Escalator;
+import org.apache.druid.sql.http.ResultFormat;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
@@ -223,6 +226,34 @@ public class EmbeddedServiceClient
);
}
+ /**
+ * Submits the given SQL query to any of the brokers (using {@code BrokerClient})
+ * of the cluster.
+ *
+ * @return The result of the SQL as a single CSV string.
+ */
+ public String runSql(String sql, Object... args)
+ {
+ try {
+ return onAnyBroker(
+ b -> b.submitSqlQuery(
+ new ClientSqlQuery(
+ StringUtils.format(sql, args),
+ ResultFormat.CSV.name(),
+ false,
+ false,
+ false,
+ null,
+ null
+ )
+ )
+ ).trim();
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Nullable
private <T> T makeRequest(
Function<ObjectMapper, RequestBuilder> request,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]