This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 068e5720dba Improve extensibility of MSQ Dart engine via extensions 
(#19127)
068e5720dba is described below

commit 068e5720dba1c5e1c8528008250e6686d2062767
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Mar 17 15:15:32 2026 +0530

    Improve extensibility of MSQ Dart engine via extensions (#19127)
    
    This is a collection of minor refactors that help extensions use and 
override certain Dart capabilities.
    
    Changes:
    - Add utility method `ServletResourceUtils.createAsyncTimeoutListener`
    - Track idle duration of a Dart worker
    - Fix visibility of some methods and classes
    - Add equals and hashCode for `ChangeRequestHistory.Counter`
    - Add `SegmentListerResourceTest`
    - Add test method to `ServletResourceUtilsTest`
---
 .../worker/http/TaskManagementResource.java        |  34 +-----
 .../msq/dart/worker/DartWorkerClientImpl.java      |   4 +-
 .../dart/worker/DartWorkerContextFactoryImpl.java  |   7 +-
 .../druid/msq/dart/worker/DartWorkerRunner.java    |  26 ++++-
 .../msq/dart/worker/http/GetWorkersResponse.java   |  18 ++-
 .../org/apache/druid/msq/rpc/WorkerResource.java   |  35 ++----
 .../msq/dart/worker/DartWorkerRunnerTest.java      |   2 +-
 .../dart/worker/http/GetWorkersResponseTest.java   |   3 +-
 .../discovery/DruidNodeDiscoveryProvider.java      |  36 ++++--
 .../messages/server/MessageRelayResource.java      |  37 ++----
 .../server/coordination/ChangeRequestHistory.java  |  25 ++++-
 .../druid/server/http/SegmentListerResource.java   |  64 ++---------
 .../druid/server/http/ServletResourceUtils.java    |  39 +++++++
 .../server/http/SegmentListerResourceTest.java     | 124 +++++++++++++++++++++
 .../server/http/ServletResourceUtilsTest.java      |  29 +++++
 .../druid/server/mocks/MockAsyncContext.java       |   8 +-
 .../testing/embedded/EmbeddedClusterApis.java      |  21 +---
 .../testing/embedded/EmbeddedServiceClient.java    |  31 ++++++
 18 files changed, 359 insertions(+), 184 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java
index dd9316a7a7d..5cb22cbc9e2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java
@@ -36,11 +36,10 @@ import org.apache.druid.indexing.worker.WorkerTaskManager;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordination.ChangeRequestHistory;
 import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
+import org.apache.druid.server.http.ServletResourceUtils;
 import org.apache.druid.server.http.security.StateResourceFilter;
 
 import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
@@ -129,32 +128,11 @@ public class TaskManagementResource
     final AsyncContext asyncContext = req.startAsync();
 
     asyncContext.addListener(
-        new AsyncListener()
-        {
-          @Override
-          public void onComplete(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onTimeout(AsyncEvent event)
-          {
-
-            // HTTP 204 NO_CONTENT is sent to the client.
-            future.cancel(true);
-            event.getAsyncContext().complete();
-          }
-
-          @Override
-          public void onError(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onStartAsync(AsyncEvent event)
-          {
-          }
-        }
+        ServletResourceUtils.createAsyncTimeoutListener(event -> {
+          // HTTP 204 NO_CONTENT is sent to the client.
+          future.cancel(true);
+          event.getAsyncContext().complete();
+        })
     );
 
     Futures.addCallback(
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java
index 3dd25e7ac14..e1de222e1e0 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerClientImpl.java
@@ -176,12 +176,12 @@ public class DartWorkerClientImpl extends 
BaseWorkerClientImpl implements DartWo
   /**
    * Service client that adds the {@link 
DartWorkerResource#HEADER_CONTROLLER_HOST} header.
    */
-  private static class ControllerDecoratedClient implements ServiceClient
+  public static class ControllerDecoratedClient implements ServiceClient
   {
     private final ServiceClient delegate;
     private final String controllerHost;
 
-    ControllerDecoratedClient(final ServiceClient delegate, final String 
controllerHost)
+    public ControllerDecoratedClient(final ServiceClient delegate, final 
String controllerHost)
     {
       this.delegate = delegate;
       this.controllerHost = controllerHost;
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
index 6f473166379..07117c80f19 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerContextFactoryImpl.java
@@ -120,7 +120,7 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
         jsonMapper,
         policyEnforcer,
         injector,
-        new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, 
null),
+        createWorkerClient(queryId),
         processingConfig,
         segmentWrangler,
         groupingEngine,
@@ -135,4 +135,9 @@ public class DartWorkerContextFactoryImpl implements 
DartWorkerContextFactory
         emitter
     );
   }
+
+  protected DartWorkerClient createWorkerClient(String queryId)
+  {
+    return new DartWorkerClientImpl(queryId, serviceClientFactory, 
smileMapper, null);
+  }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
index fecc4ec7b2f..205d19ac6b1 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
@@ -30,6 +30,7 @@ import org.apache.druid.error.DruidException;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -72,6 +73,11 @@ public class DartWorkerRunner
   @GuardedBy("this")
   private final Set<String> activeControllerHosts = new HashSet<>();
 
+  /**
+   * Used to track the time since this runner has been idle.
+   */
+  private final Stopwatch sinceLastWorkerFinished = 
Stopwatch.createUnstarted();
+
   /**
    * Query ID -> Worker instance.
    */
@@ -147,6 +153,7 @@ public class DartWorkerRunner
                   holder,
                   e -> log.warn(e, "Failed to close worker[%s]", 
holder.worker.id())
               );
+              resetWorkerFinishTime();
               this.notifyAll();
             }
           },
@@ -196,7 +203,9 @@ public class DartWorkerRunner
   {
     final List<DartWorkerInfo> infos = new ArrayList<>();
 
+    final long idleDurationMillis;
     synchronized (this) {
+      idleDurationMillis = workerMap.isEmpty() ? 
sinceLastWorkerFinished.millisElapsed() : 0L;
       for (final Map.Entry<String, WorkerHolder> entry : workerMap.entrySet()) 
{
         final String queryId = entry.getKey();
         final WorkerHolder workerHolder = entry.getValue();
@@ -211,7 +220,22 @@ public class DartWorkerRunner
       }
     }
 
-    return new GetWorkersResponse(infos);
+    return new GetWorkersResponse(infos, idleDurationMillis);
+  }
+
+  /**
+   * Sets the finish time of the last worker to the current time. Used to track
+   * the duration for which this runner has been idle.
+   */
+  public void resetWorkerFinishTime()
+  {
+    synchronized (this) {
+      if (sinceLastWorkerFinished.isRunning()) {
+        sinceLastWorkerFinished.restart();
+      } else {
+        sinceLastWorkerFinished.start();
+      }
+    }
   }
 
   @LifecycleStart
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponse.java
 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponse.java
index 0fa28a4ef17..58e5969185b 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponse.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponse.java
@@ -31,10 +31,15 @@ import java.util.Objects;
 public class GetWorkersResponse
 {
   private final List<DartWorkerInfo> workers;
+  private final long idleDurationMillis;
 
-  public GetWorkersResponse(@JsonProperty("workers") final 
List<DartWorkerInfo> workers)
+  public GetWorkersResponse(
+      @JsonProperty("workers") final List<DartWorkerInfo> workers,
+      @JsonProperty("idleDurationMillis") final long idleDurationMillis
+  )
   {
     this.workers = workers;
+    this.idleDurationMillis = idleDurationMillis;
   }
 
   @JsonProperty
@@ -43,6 +48,12 @@ public class GetWorkersResponse
     return workers;
   }
 
+  @JsonProperty
+  public long getIdleDurationMillis()
+  {
+    return idleDurationMillis;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -53,12 +64,13 @@ public class GetWorkersResponse
       return false;
     }
     GetWorkersResponse that = (GetWorkersResponse) o;
-    return Objects.equals(workers, that.workers);
+    return Objects.equals(workers, that.workers)
+           && idleDurationMillis == that.idleDurationMillis;
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hashCode(workers);
+    return Objects.hash(workers, idleDurationMillis);
   }
 }
diff --git 
a/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/WorkerResource.java 
b/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/WorkerResource.java
index 0cc784cf36d..06452007d51 100644
--- 
a/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/WorkerResource.java
+++ 
b/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/WorkerResource.java
@@ -34,13 +34,12 @@ import org.apache.druid.msq.kernel.StageId;
 import org.apache.druid.msq.kernel.WorkOrder;
 import org.apache.druid.msq.statistics.ClusterByStatisticsSnapshot;
 import org.apache.druid.msq.statistics.serde.ClusterByStatisticsSnapshotSerde;
+import org.apache.druid.server.http.ServletResourceUtils;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.utils.CloseableUtils;
 
 import javax.annotation.Nullable;
 import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
@@ -109,33 +108,13 @@ public class WorkerResource
 
     asyncContext.setTimeout(GET_CHANNEL_DATA_TIMEOUT);
     asyncContext.addListener(
-        new AsyncListener()
-        {
-          @Override
-          public void onComplete(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onTimeout(AsyncEvent event)
-          {
-            if (responseResolved.compareAndSet(false, true)) {
-              HttpServletResponse response = (HttpServletResponse) 
asyncContext.getResponse();
-              response.setStatus(HttpServletResponse.SC_OK);
-              event.getAsyncContext().complete();
-            }
-          }
-
-          @Override
-          public void onError(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onStartAsync(AsyncEvent event)
-          {
+        ServletResourceUtils.createAsyncTimeoutListener(event -> {
+          if (responseResolved.compareAndSet(false, true)) {
+            HttpServletResponse response = (HttpServletResponse) 
asyncContext.getResponse();
+            response.setStatus(HttpServletResponse.SC_OK);
+            event.getAsyncContext().complete();
           }
-        }
+        })
     );
 
     // Save these items, since "req" becomes inaccessible in future exception 
handlers.
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java
index 9ef3027c471..7bf3c53ca96 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/DartWorkerRunnerTest.java
@@ -154,7 +154,7 @@ public class DartWorkerRunnerTest
   public void test_getWorkersResponse_empty()
   {
     final GetWorkersResponse workersResponse = 
workerRunner.getWorkersResponse();
-    Assertions.assertEquals(new GetWorkersResponse(Collections.emptyList()), 
workersResponse);
+    Assertions.assertEquals(new GetWorkersResponse(Collections.emptyList(), 
0L), workersResponse);
   }
 
   @Test
diff --git 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponseTest.java
 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponseTest.java
index f516077a575..95444e9a064 100644
--- 
a/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponseTest.java
+++ 
b/multi-stage-query/src/test/java/org/apache/druid/msq/dart/worker/http/GetWorkersResponseTest.java
@@ -43,7 +43,8 @@ public class GetWorkersResponseTest
                 "localhost:8101",
                 DateTimes.of("2000")
             )
-        )
+        ),
+        0L
     );
     final GetWorkersResponse response2 =
         jsonMapper.readValue(jsonMapper.writeValueAsBytes(response), 
GetWorkersResponse.class);
diff --git 
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
 
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
index 44ce7ff9824..2139650b003 100644
--- 
a/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
+++ 
b/server/src/main/java/org/apache/druid/discovery/DruidNodeDiscoveryProvider.java
@@ -22,6 +22,7 @@ package org.apache.druid.discovery;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.error.InvalidInput;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.DruidNode;
@@ -50,8 +51,8 @@ public abstract class DruidNodeDiscoveryProvider
       ImmutableSet.of(NodeRole.MIDDLE_MANAGER, NodeRole.INDEXER)
   );
 
-  private final ConcurrentHashMap<String, ServiceDruidNodeDiscovery> 
serviceDiscoveryMap =
-      new ConcurrentHashMap<>(SERVICE_TO_NODE_TYPES.size());
+  private final ConcurrentHashMap<ServiceAndRoles, ServiceDruidNodeDiscovery> 
serviceDiscoveryMap =
+      new ConcurrentHashMap<>(10);
 
   public abstract BooleanSupplier getForNode(DruidNode node, NodeRole 
nodeRole);
 
@@ -63,15 +64,26 @@ public abstract class DruidNodeDiscoveryProvider
    */
   public DruidNodeDiscovery getForService(String serviceName)
   {
-    return serviceDiscoveryMap.computeIfAbsent(
+    return getForServiceAndRoles(
         serviceName,
-        service -> {
+        DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(serviceName)
+    );
+  }
 
-          Set<NodeRole> nodeRolesToWatch = 
DruidNodeDiscoveryProvider.SERVICE_TO_NODE_TYPES.get(service);
-          if (nodeRolesToWatch == null) {
-            throw new IAE("Unknown service [%s].", service);
+  /**
+   * Get DruidNodeDiscovery instance to discover nodes of a specific role that
+   * announce the given service in their metadata.
+   */
+  public DruidNodeDiscovery getForServiceAndRoles(String serviceName, 
Set<NodeRole> nodeRolesToWatch)
+  {
+    return serviceDiscoveryMap.computeIfAbsent(
+        new ServiceAndRoles(serviceName, nodeRolesToWatch),
+        serviceAndRoles -> {
+          if (nodeRolesToWatch == null || nodeRolesToWatch.isEmpty()) {
+            throw InvalidInput.exception("No node role specified to watch for 
service[%s].", serviceName);
           }
-          ServiceDruidNodeDiscovery serviceDiscovery = new 
ServiceDruidNodeDiscovery(service, nodeRolesToWatch.size());
+          ServiceDruidNodeDiscovery serviceDiscovery =
+              new ServiceDruidNodeDiscovery(serviceName, 
nodeRolesToWatch.size());
           DruidNodeDiscovery.Listener filteringGatheringUpstreamListener =
               serviceDiscovery.filteringUpstreamListener();
           for (NodeRole nodeRole : nodeRolesToWatch) {
@@ -82,6 +94,14 @@ public abstract class DruidNodeDiscoveryProvider
     );
   }
 
+  /**
+   * Record containing serviceName and nodeRoles, used as a key in {@link 
#serviceDiscoveryMap}.
+   */
+  private record ServiceAndRoles(String serviceName, Set<NodeRole> nodeRoles)
+  {
+
+  }
+
   private static class ServiceDruidNodeDiscovery implements DruidNodeDiscovery
   {
     private static final Logger log = new 
Logger(ServiceDruidNodeDiscovery.class);
diff --git 
a/server/src/main/java/org/apache/druid/messages/server/MessageRelayResource.java
 
b/server/src/main/java/org/apache/druid/messages/server/MessageRelayResource.java
index 550fa64c89c..c5ec4f1b333 100644
--- 
a/server/src/main/java/org/apache/druid/messages/server/MessageRelayResource.java
+++ 
b/server/src/main/java/org/apache/druid/messages/server/MessageRelayResource.java
@@ -30,10 +30,9 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.messages.MessageBatch;
 import org.apache.druid.messages.client.MessageListener;
 import org.apache.druid.messages.client.MessageRelayClient;
+import org.apache.druid.server.http.ServletResourceUtils;
 
 import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.GET;
@@ -113,34 +112,14 @@ public class MessageRelayResource<MessageType>
     final AsyncContext asyncContext = req.startAsync();
     asyncContext.setTimeout(GET_MESSAGES_TIMEOUT);
     asyncContext.addListener(
-        new AsyncListener()
-        {
-          @Override
-          public void onComplete(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onTimeout(AsyncEvent event)
-          {
-            if (didRespond.compareAndSet(false, true)) {
-              HttpServletResponse response = (HttpServletResponse) 
asyncContext.getResponse();
-              response.setStatus(HttpServletResponse.SC_NO_CONTENT);
-              event.getAsyncContext().complete();
-              batchFuture.cancel(true);
-            }
-          }
-
-          @Override
-          public void onError(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onStartAsync(AsyncEvent event)
-          {
+        ServletResourceUtils.createAsyncTimeoutListener(event -> {
+          if (didRespond.compareAndSet(false, true)) {
+            HttpServletResponse response = (HttpServletResponse) 
asyncContext.getResponse();
+            response.setStatus(HttpServletResponse.SC_NO_CONTENT);
+            event.getAsyncContext().complete();
+            batchFuture.cancel(true);
           }
-        }
+        })
     );
 
     // Save these items, since "req" becomes inaccessible in future exception 
handlers.
diff --git 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
index 7a4017e333c..0bd9ec1cc83 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java
@@ -35,6 +35,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -141,7 +142,7 @@ public class ChangeRequestHistory<T>
     Counter lastCounter = getLastCounter();
 
     if (counter.counter == lastCounter.counter) {
-      if (!counter.matches(lastCounter)) {
+      if (!counter.equals(lastCounter)) {
         ChangeRequestsSnapshot<T> reset = ChangeRequestsSnapshot.fail(
             StringUtils.format("counter[%s] failed to match with [%s]", 
counter, lastCounter)
         );
@@ -170,7 +171,7 @@ public class ChangeRequestHistory<T>
     if (counter.counter == lastCounter.counter) {
       // We don't want to trigger a counter reset if the client counter 
matches the last counter! Return an empty list
       // of changes instead.
-      if (counter.matches(lastCounter)) {
+      if (counter.equals(lastCounter)) {
         return ChangeRequestsSnapshot.success(counter, new ArrayList<>());
       } else {
         return ChangeRequestsSnapshot.fail(
@@ -199,7 +200,7 @@ public class ChangeRequestHistory<T>
       int changeStartIndex = (int) (counter.counter + changes.size() - 
lastCounter.counter);
 
       Counter counterToMatch = counter.counter == 0 ? Counter.ZERO : 
changes.get(changeStartIndex - 1).counter;
-      if (!counterToMatch.matches(counter)) {
+      if (!counterToMatch.equals(counter)) {
         return ChangeRequestsSnapshot.fail(
             StringUtils.format(
                 "counter[%s] failed to match with [%s]",
@@ -296,9 +297,23 @@ public class ChangeRequestHistory<T>
       return new Counter(counter + 1);
     }
 
-    public boolean matches(Counter other)
+    @Override
+    public boolean equals(Object object)
+    {
+      if (this == object) {
+        return true;
+      }
+      if (object == null || getClass() != object.getClass()) {
+        return false;
+      }
+      Counter other = (Counter) object;
+      return counter == other.counter && hash == other.hash;
+    }
+
+    @Override
+    public int hashCode()
     {
-      return this.counter == other.counter && this.hash == other.hash;
+      return Objects.hash(counter, hash);
     }
 
     @Override
diff --git 
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java 
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
index 25e82382822..9a402fddf3e 100644
--- 
a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
+++ 
b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java
@@ -43,8 +43,6 @@ import 
org.apache.druid.server.http.security.StateResourceFilter;
 
 import javax.annotation.Nullable;
 import javax.servlet.AsyncContext;
-import javax.servlet.AsyncEvent;
-import javax.servlet.AsyncListener;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.Consumes;
@@ -146,32 +144,11 @@ public class SegmentListerResource
     final AsyncContext asyncContext = req.startAsync();
 
     asyncContext.addListener(
-        new AsyncListener()
-        {
-          @Override
-          public void onComplete(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onTimeout(AsyncEvent event)
-          {
-
-            // HTTP 204 NO_CONTENT is sent to the client.
-            future.cancel(true);
-            event.getAsyncContext().complete();
-          }
-
-          @Override
-          public void onError(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onStartAsync(AsyncEvent event)
-          {
-          }
-        }
+        ServletResourceUtils.createAsyncTimeoutListener(event -> {
+          // HTTP 204 NO_CONTENT is sent to the client.
+          future.cancel(true);
+          event.getAsyncContext().complete();
+        })
     );
 
     Futures.addCallback(
@@ -260,32 +237,11 @@ public class SegmentListerResource
     final AsyncContext asyncContext = req.startAsync();
 
     asyncContext.addListener(
-        new AsyncListener()
-        {
-          @Override
-          public void onComplete(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onTimeout(AsyncEvent event)
-          {
-
-            // HTTP 204 NO_CONTENT is sent to the client.
-            future.cancel(true);
-            event.getAsyncContext().complete();
-          }
-
-          @Override
-          public void onError(AsyncEvent event)
-          {
-          }
-
-          @Override
-          public void onStartAsync(AsyncEvent event)
-          {
-          }
-        }
+        ServletResourceUtils.createAsyncTimeoutListener(event -> {
+          // HTTP 204 NO_CONTENT is sent to the client.
+          future.cancel(true);
+          event.getAsyncContext().complete();
+        })
     );
 
     Futures.addCallback(
diff --git 
a/server/src/main/java/org/apache/druid/server/http/ServletResourceUtils.java 
b/server/src/main/java/org/apache/druid/server/http/ServletResourceUtils.java
index 91c59304bfb..bce178029b4 100644
--- 
a/server/src/main/java/org/apache/druid/server/http/ServletResourceUtils.java
+++ 
b/server/src/main/java/org/apache/druid/server/http/ServletResourceUtils.java
@@ -30,9 +30,12 @@ import org.apache.druid.rpc.HttpResponseException;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nullable;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 public class ServletResourceUtils
@@ -112,4 +115,40 @@ public class ServletResourceUtils
 
     throw new RuntimeException(e);
   }
+
+  /**
+   * Creates an {@link AsyncListener} which performs the given action on the 
event
+   * in case of a {@link AsyncListener#onTimeout}. The other actions performed
+   * by the listener viz. {@link AsyncListener#onStartAsync}, {@link 
AsyncListener#onComplete}
+   * and {@link AsyncListener#onError} are noop.
+   */
+  public static AsyncListener createAsyncTimeoutListener(Consumer<AsyncEvent> 
onTimeoutHandler)
+  {
+    return new AsyncListener()
+    {
+      @Override
+      public void onComplete(AsyncEvent asyncEvent)
+      {
+        // do nothing
+      }
+
+      @Override
+      public void onTimeout(AsyncEvent asyncEvent)
+      {
+        onTimeoutHandler.accept(asyncEvent);
+      }
+
+      @Override
+      public void onError(AsyncEvent asyncEvent)
+      {
+        // do nothing
+      }
+
+      @Override
+      public void onStartAsync(AsyncEvent asyncEvent)
+      {
+        // do nothing
+      }
+    };
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/http/SegmentListerResourceTest.java
 
b/server/src/test/java/org/apache/druid/server/http/SegmentListerResourceTest.java
new file mode 100644
index 00000000000..86decda3597
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/http/SegmentListerResourceTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.http;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.util.concurrent.Futures;
+import org.apache.druid.segment.TestDataSource;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer;
+import org.apache.druid.server.coordination.ChangeRequestHistory;
+import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
+import org.apache.druid.server.coordination.DataSegmentChangeRequest;
+import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
+import org.apache.druid.server.coordination.SegmentLoadDropHandler;
+import org.apache.druid.server.coordinator.CreateDataSegments;
+import org.apache.druid.server.mocks.MockAsyncContext;
+import org.apache.druid.server.mocks.MockHttpServletRequest;
+import org.apache.druid.server.mocks.MockHttpServletResponse;
+import org.apache.druid.timeline.DataSegment;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+import org.mockito.Mockito;
+
+import javax.servlet.http.HttpServletRequest;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class SegmentListerResourceTest
+{
+  private static final List<DataSegment> SEGMENTS = CreateDataSegments
+      .ofDatasource(TestDataSource.WIKI)
+      .withNumPartitions(10)
+      .eachOfSizeInMb(100);
+
+  private SegmentListerResource segmentListerResource;
+  private BatchDataSegmentAnnouncer segmentAnnouncer;
+
+  @Before
+  public void setup()
+  {
+    final SegmentLoadDropHandler loadDropHandler = 
Mockito.mock(SegmentLoadDropHandler.class);
+    this.segmentAnnouncer = Mockito.mock(BatchDataSegmentAnnouncer.class);
+    this.segmentListerResource = new SegmentListerResource(
+        TestHelper.JSON_MAPPER,
+        TestHelper.makeSmileMapper(),
+        segmentAnnouncer,
+        loadDropHandler
+    );
+  }
+
+  @Test
+  public void test_getSegments_returnsAllSegments_onFirstRequest() throws 
Exception
+  {
+    Mockito.when(
+        segmentAnnouncer.getSegmentChangesSince(new 
ChangeRequestHistory.Counter(0, 0))
+    ).thenReturn(
+        Futures.immediateFuture(
+            ChangeRequestsSnapshot.success(
+                new ChangeRequestHistory.Counter(0, 0),
+                
SEGMENTS.stream().map(SegmentChangeRequestLoad::new).collect(Collectors.toList())
+            )
+        )
+    );
+
+    final MockHttpServletResponse response = new MockHttpServletResponse();
+    final HttpServletRequest request = createMockRequest(response);
+
+    segmentListerResource.getSegments(0, 0, 10L, request);
+
+    Assertions.assertEquals(response.getStatus(), 200);
+
+    final ChangeRequestsSnapshot<DataSegmentChangeRequest> responsePayload = 
getResponsePayload(response);
+    Assertions.assertEquals(
+        responsePayload.getCounter(),
+        new ChangeRequestHistory.Counter(0, 0)
+    );
+    Assertions.assertNotNull(responsePayload.getRequests());
+    Assertions.assertEquals(
+        SEGMENTS.size(),
+        responsePayload.getRequests().size()
+    );
+  }
+
+  private ChangeRequestsSnapshot<DataSegmentChangeRequest> getResponsePayload(
+      MockHttpServletResponse response
+  ) throws IOException
+  {
+    return TestHelper.JSON_MAPPER.readValue(
+        response.baos.toByteArray(),
+        new TypeReference<>() {}
+    );
+  }
+
+  private HttpServletRequest createMockRequest(MockHttpServletResponse 
response)
+  {
+    final MockHttpServletRequest request = new MockHttpServletRequest();
+
+    final MockAsyncContext asyncContext = new MockAsyncContext();
+    asyncContext.request = request;
+    asyncContext.response = response;
+
+    request.asyncContextSupplier = () -> asyncContext;
+    return request;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/http/ServletResourceUtilsTest.java
 
b/server/src/test/java/org/apache/druid/server/http/ServletResourceUtilsTest.java
index ddbc3d4376f..966b9aa3487 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/ServletResourceUtilsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/ServletResourceUtilsTest.java
@@ -32,9 +32,13 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.jboss.netty.handler.codec.http.HttpVersion;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
 import javax.ws.rs.core.Response;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class ServletResourceUtilsTest
 {
@@ -102,4 +106,29 @@ public class ServletResourceUtilsTest
         () -> ServletResourceUtils.getDefaultValueIfCauseIs404ElseThrow(new 
ISE(""), () -> "abc")
     );
   }
+
+  @Test
+  public void test_createAsyncTimeoutListener() throws Exception
+  {
+    final AtomicInteger timeoutCount = new AtomicInteger(0);
+    final AsyncListener listener = 
ServletResourceUtils.createAsyncTimeoutListener(
+        event -> timeoutCount.incrementAndGet()
+    );
+
+    // Verify that onTimeout updates the count
+    final AsyncEvent event = Mockito.mock(AsyncEvent.class);
+    listener.onTimeout(event);
+    Assert.assertEquals(1, timeoutCount.get());
+    listener.onTimeout(event);
+    Assert.assertEquals(2, timeoutCount.get());
+
+    // Verify that other actions on the listener are noop
+    timeoutCount.set(0);
+    listener.onStartAsync(event);
+    listener.onComplete(event);
+    listener.onError(event);
+
+    Assert.assertEquals(0, timeoutCount.get());
+    Mockito.verifyNoInteractions(event);
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java 
b/server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java
index 4bdc56747d5..babbc909214 100644
--- a/server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java
+++ b/server/src/test/java/org/apache/druid/server/mocks/MockAsyncContext.java
@@ -39,6 +39,7 @@ public class MockAsyncContext implements AsyncContext
   public ServletResponse response;
 
   private final AtomicBoolean completed = new AtomicBoolean();
+  private long timeout;
 
   @Override
   public ServletRequest getRequest()
@@ -104,7 +105,8 @@ public class MockAsyncContext implements AsyncContext
   @Override
   public void addListener(AsyncListener listener)
   {
-    throw new UnsupportedOperationException();
+    // Do not throw an UnsupportedOperationException since some tests need to 
add listeners
+    // Add an implementation here if listeners need to be invoked in tests
   }
 
   @Override
@@ -126,12 +128,12 @@ public class MockAsyncContext implements AsyncContext
   @Override
   public void setTimeout(long timeout)
   {
-    throw new UnsupportedOperationException();
+    this.timeout = timeout;
   }
 
   @Override
   public long getTimeout()
   {
-    throw new UnsupportedOperationException();
+    return timeout;
   }
 }
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index edbef9207bb..257533aecbd 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -41,12 +41,10 @@ import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.query.http.ClientSqlQuery;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.metadata.Metric;
 import org.apache.druid.server.metrics.LatchableEmitter;
-import org.apache.druid.sql.http.ResultFormat;
 import org.apache.druid.timeline.DataSegment;
 import org.joda.time.Interval;
 import org.joda.time.chrono.ISOChronology;
@@ -148,24 +146,7 @@ public class EmbeddedClusterApis implements 
EmbeddedResource
    */
   public String runSql(String sql, Object... args)
   {
-    try {
-      return onAnyBroker(
-          b -> b.submitSqlQuery(
-              new ClientSqlQuery(
-                  StringUtils.format(sql, args),
-                  ResultFormat.CSV.name(),
-                  false,
-                  false,
-                  false,
-                  null,
-                  null
-              )
-          )
-      ).trim();
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
+    return client.runSql(sql, args);
   }
 
   /**
diff --git 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
index 90e2eb90048..2621751b0b7 100644
--- 
a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
+++ 
b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedServiceClient.java
@@ -33,10 +33,12 @@ import org.apache.druid.common.guava.FutureUtils;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.annotations.EscalatedGlobal;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
 import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
+import org.apache.druid.query.http.ClientSqlQuery;
 import org.apache.druid.rpc.FixedServiceLocator;
 import org.apache.druid.rpc.RequestBuilder;
 import org.apache.druid.rpc.ServiceClient;
@@ -48,6 +50,7 @@ import org.apache.druid.rpc.StandardRetryPolicy;
 import org.apache.druid.rpc.guice.ServiceClientModule;
 import org.apache.druid.rpc.indexing.OverlordClient;
 import org.apache.druid.server.security.Escalator;
+import org.apache.druid.sql.http.ResultFormat;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nullable;
@@ -223,6 +226,34 @@ public class EmbeddedServiceClient
     );
   }
 
+  /**
+   * Submits the given SQL query to any of the brokers (using {@code 
BrokerClient})
+   * of the cluster.
+   *
+   * @return The result of the SQL as a single CSV string.
+   */
+  public String runSql(String sql, Object... args)
+  {
+    try {
+      return onAnyBroker(
+          b -> b.submitSqlQuery(
+              new ClientSqlQuery(
+                  StringUtils.format(sql, args),
+                  ResultFormat.CSV.name(),
+                  false,
+                  false,
+                  false,
+                  null,
+                  null
+              )
+          )
+      ).trim();
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @Nullable
   private <T> T makeRequest(
       Function<ObjectMapper, RequestBuilder> request,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to