This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 60e34876ef3 feat: unnest and join makeCursorHolderAsync (#19600)
60e34876ef3 is described below

commit 60e34876ef37f6879344c1ca881176c61d4bd99e
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jun 24 13:11:37 2026 -0700

    feat: unnest and join makeCursorHolderAsync (#19600)
---
 .../columnar/ColumnarFrameCursorFactory.java       |   3 +-
 .../frame/segment/row/RowFrameCursorFactory.java   |   3 +-
 .../org/apache/druid/segment/CursorFactory.java    |  17 +-
 .../druid/segment/QueryableIndexCursorFactory.java |   2 +-
 .../druid/segment/ResidentCursorFactory.java       |  39 +++
 .../druid/segment/RowBasedCursorFactory.java       |   2 +-
 .../apache/druid/segment/UnnestCursorFactory.java  |  79 ++++-
 .../incremental/IncrementalIndexCursorFactory.java |   4 +-
 .../segment/join/HashJoinSegmentCursorFactory.java | 338 ++++++++++++++-------
 .../loading/TombstoneSegmentizerFactory.java       |   3 +-
 .../druid/segment/DeferredCursorFactory.java       |  80 +++++
 .../druid/segment/UnnestCursorFactoryTest.java     |  56 ++++
 .../join/HashJoinSegmentCursorFactoryTest.java     |  75 +++++
 13 files changed, 560 insertions(+), 141 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java b/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
index 6dba57245fc..fd9ffc15e2f 100644
--- a/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
@@ -40,6 +40,7 @@ import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorFactory;
 import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.QueryableIndexColumnSelectorFactory;
+import org.apache.druid.segment.ResidentCursorFactory;
 import org.apache.druid.segment.SimpleAscendingOffset;
 import org.apache.druid.segment.SimpleSettableOffset;
 import org.apache.druid.segment.VirtualColumns;
@@ -64,7 +65,7 @@ import java.util.List;
  *
  * @see RowFrameCursorFactory the row-based version
  */
-public class ColumnarFrameCursorFactory implements CursorFactory
+public class ColumnarFrameCursorFactory implements ResidentCursorFactory
 {
   private final Frame frame;
   private final RowSignature signature;
diff --git a/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java b/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
index 93523893c8c..4f72640e95c 100644
--- a/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
@@ -33,6 +33,7 @@ import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorFactory;
 import org.apache.druid.segment.CursorHolder;
+import org.apache.druid.segment.ResidentCursorFactory;
 import org.apache.druid.segment.SimpleAscendingOffset;
 import org.apache.druid.segment.SimpleSettableOffset;
 import org.apache.druid.segment.column.ColumnCapabilities;
@@ -49,7 +50,7 @@ import java.util.List;
  *
  * @see ColumnarFrameCursorFactory the columnar version
  */
-public class RowFrameCursorFactory implements CursorFactory
+public class RowFrameCursorFactory implements ResidentCursorFactory
 {
   private final Frame frame;
   private final FrameReader frameReader;
diff --git a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
index 4a79490adc3..486625bb7f3 100644
--- a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.segment;
 
+import org.apache.druid.error.DruidException;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.RowSignature;
 
@@ -35,14 +36,22 @@ public interface CursorFactory extends ColumnInspector
   /**
    * Asynchronous variant of {@link #makeCursorHolder(CursorBuildSpec)} for cursor factories that may need to do I/O
    * (e.g. download column data from deep storage) before they can serve a cursor. Callers running on threads that
-   * must not block should use this.
+   * must not block use this rather than {@link #makeCursorHolder}.
    * <p>
-   * The default implementation completes synchronously by delegating to {@link #makeCursorHolder(CursorBuildSpec)},
-   * which keeps every existing implementation async-correct without changes.
+   * There is intentionally no working default: this method must be explicitly implemented to participate in
+   * async-aware engines (MSQ). A factory whose source is always fully-resident and never needs to block while waiting
+   * on some other thread to perform work can implement {@link ResidentCursorFactory} instead of {@link CursorFactory}
+   * directly, which provides a default implementation of this method that wraps
+   * {@link #makeCursorHolder(CursorBuildSpec)}.
    */
   default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
   {
-    return AsyncCursorHolder.completed(makeCursorHolder(spec));
+    throw DruidException.defensive(
+        "makeCursorHolderAsync is not implemented by [%s]. Override it (or implement ResidentCursorFactory): return "
+        + "AsyncCursorHolder.completed(makeCursorHolder(spec)) if the source is always fully resident, or build/await "
+        + "the cursor holder asynchronously if it supports load on demand.",
+        getClass().getName()
+    );
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
index 1d752cb6686..1803554dc56 100644
--- a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
@@ -58,7 +58,7 @@ import java.util.Arrays;
 import java.util.LinkedHashSet;
 import java.util.List;
 
-public class QueryableIndexCursorFactory implements CursorFactory
+public class QueryableIndexCursorFactory implements ResidentCursorFactory
 {
   private final QueryableIndex index;
   private final TimeBoundaryInspector timeBoundaryInspector;
diff --git a/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java
new file mode 100644
index 00000000000..99cf4461709
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+/**
+ * A {@link CursorFactory} whose {@link #makeCursorHolder} never blocks on I/O, i.e. a fully-resident / in-memory
+ * source with no on-demand loading. Implementing this interface, rather than {@link CursorFactory} directly, declares
+ * that intent and supplies the trivial {@link #makeCursorHolderAsync} implementation: the holder is built synchronously
+ * and returned already completed.
+ * <p>
+ * Factories backed by, or wrapping, an on-demand source must implement {@link CursorFactory} directly and provide a
+ * real {@link CursorFactory#makeCursorHolderAsync} that builds/awaits the holder asynchronously so they never block the
+ * calling thread.
+ */
+public interface ResidentCursorFactory extends CursorFactory
+{
+  @Override
+  default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    return AsyncCursorHolder.completed(makeCursorHolder(spec));
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
index 13f2da53e82..3594c3a92f8 100644
--- a/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
@@ -32,7 +32,7 @@ import org.apache.druid.utils.CloseableUtils;
 import javax.annotation.Nullable;
 import java.util.List;
 
-public class RowBasedCursorFactory<RowType> implements CursorFactory
+public class RowBasedCursorFactory<RowType> implements ResidentCursorFactory
 {
   private final Sequence<RowType> rowSequence;
   private final RowAdapter<RowType> rowAdapter;
diff --git a/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
index 8359b8664ab..b39264ac807 100644
--- a/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
@@ -76,9 +76,59 @@ public class UnnestCursorFactory implements CursorFactory
 
   @Override
   public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+  {
+    final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
+    final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter());
+
+    final Closer closer = Closer.create();
+    // base holder is built lazily on first asCursor()/getOrdering() and registered for close at that point
+    final Supplier<CursorHolder> baseHolderSupplier = Suppliers.memoize(
+        () -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
+    );
+    return unnestCursorHolder(spec, filterSplit, closer, baseHolderSupplier);
+  }
+
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
+    final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec, unnestColumn, filterSplit.getBaseTableFilter());
+
+    // Build the base-table holder asynchronously (a partial base segment downloads its required columns here), then
+    // wrap the ready holder in the unnest holder. Closing the returned holder before it's ready cancels the base load.
+    final AsyncCursorHolder baseAsync = baseCursorFactory.makeCursorHolderAsync(unnestBuildSpec);
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(baseAsync::close);
+    baseAsync.addReadyCallback(() -> {
+      final CursorHolder unnestHolder;
+      try {
+        // release() transfers ownership of the base holder to us (and surfaces a base-load failure as its cause); from
+        // here the unnest holder owns closing it. Construction below can't throw, so the catch only fires on a base
+        // failure or a cancel race (baseAsync already closed), neither of which leaves a base holder to leak.
+        final CursorHolder baseHolder = baseAsync.release();
+        final Closer closer = Closer.create();
+        closer.register(baseHolder);
+        unnestHolder = unnestCursorHolder(spec, filterSplit, closer, Suppliers.ofInstance(baseHolder));
+      }
+      catch (Throwable t) {
+        asyncHolder.setException(t);
+        return;
+      }
+      if (!asyncHolder.set(unnestHolder)) {
+        // awaiter closed the wrapper while we were producing the holder; close it so the base holder doesn't leak
+        unnestHolder.close();
+      }
+    });
+    return asyncHolder;
+  }
+
+  /**
+   * Split the spec's filters into base-table and post-unnest filters (see
+   * {@link #computeBaseAndPostUnnestFilters}). Cheap and metadata-only; shared by the sync and async holder paths.
+   */
+  private UnnestFilterSplit computeFilterSplit(CursorBuildSpec spec)
   {
     final String input = getUnnestInputIfDirectAccess(unnestColumn);
-    final UnnestFilterSplit filterSplit = computeBaseAndPostUnnestFilters(
+    return computeBaseAndPostUnnestFilters(
         spec.getFilter(),
         filter != null ? filter.toFilter() : null,
         spec.getVirtualColumns(),
@@ -86,23 +136,26 @@ public class UnnestCursorFactory implements CursorFactory
         input,
         input == null ? null : spec.getVirtualColumns().getColumnCapabilitiesWithFallback(baseCursorFactory, input)
     );
-    final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(
-        spec,
-        unnestColumn,
-        filterSplit.getBaseTableFilter()
-    );
+  }
 
+  /**
+   * Build the unnest {@link CursorHolder} on top of a base-table holder. {@code baseHolderSupplier} provides the base
+   * holder: the sync path builds it lazily on first use and registers it with {@code closer}, while the async path
+   * supplies an already-materialized holder pre-registered with {@code closer}.
+   */
+  private CursorHolder unnestCursorHolder(
+      CursorBuildSpec spec,
+      UnnestFilterSplit filterSplit,
+      Closer closer,
+      Supplier<CursorHolder> baseHolderSupplier
+  )
+  {
     return new CursorHolder()
     {
-      final Closer closer = Closer.create();
-      final Supplier<CursorHolder> cursorHolderSupplier = Suppliers.memoize(
-          () -> closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
-      );
-
       @Override
       public Cursor asCursor()
       {
-        final Cursor cursor = cursorHolderSupplier.get().asCursor();
+        final Cursor cursor = baseHolderSupplier.get().asCursor();
         if (cursor == null) {
           return null;
         }
@@ -135,7 +188,7 @@ public class UnnestCursorFactory implements CursorFactory
       @Override
       public List<OrderBy> getOrdering()
       {
-        return computeOrdering(cursorHolderSupplier.get().getOrdering());
+        return computeOrdering(baseHolderSupplier.get().getOrdering());
       }
 
       @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
index 744a856c845..07cebffa2c9 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
@@ -30,9 +30,9 @@ import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ConcatenatingCursor;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.CursorBuildSpec;
-import org.apache.druid.segment.CursorFactory;
 import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.EmptyCursorHolder;
+import org.apache.druid.segment.ResidentCursorFactory;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ColumnType;
@@ -50,7 +50,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-public class IncrementalIndexCursorFactory implements CursorFactory
+public class IncrementalIndexCursorFactory implements ResidentCursorFactory
 {
   private static final ColumnCapabilities.CoercionLogic COERCE_LOGIC =
       new ColumnCapabilities.CoercionLogic()
diff --git a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
index b5354de02ad..a7030fae546 100644
--- a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.query.Order;
 import org.apache.druid.query.OrderBy;
 import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.AsyncCursorHolder;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorFactory;
@@ -77,12 +78,74 @@ public class HashJoinSegmentCursorFactory implements CursorFactory
   public CursorHolder makeCursorHolder(CursorBuildSpec spec)
   {
     final Filter combinedFilter = baseFilterAnd(spec.getFilter());
+    final Set<String> physicalColumns = computeBasePhysicalColumns(spec, combinedFilter);
 
-    // for physical column tracking, we start by copying base spec physical columns
+    if (clauses.isEmpty()) {
+      // if there are no clauses, we can just use the base cursor directly if we apply the combined filter
+      return baseCursorFactory.makeCursorHolder(noClausesBaseSpec(spec, combinedFilter, physicalColumns));
+    }
+
+    // else we need to wipe out the grouping, aggregations, and ordering
+    final Closer joinablesCloser = Closer.create();
+    final JoinCursorPlan plan = planJoinCursor(spec, combinedFilter, physicalColumns);
+    final CursorHolder baseCursorHolder = joinablesCloser.register(
+        baseCursorFactory.makeCursorHolder(plan.baseBuildSpec)
+    );
+    return joinCursorHolder(plan, joinablesCloser, baseCursorHolder);
+  }
+
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    final Filter combinedFilter = baseFilterAnd(spec.getFilter());
+    final Set<String> physicalColumns = computeBasePhysicalColumns(spec, combinedFilter);
+
+    if (clauses.isEmpty()) {
+      return baseCursorFactory.makeCursorHolderAsync(noClausesBaseSpec(spec, combinedFilter, physicalColumns));
+    }
+
+    final Closer joinablesCloser = Closer.create();
+    // Join filter analysis + base-spec computation are CPU-only; do them synchronously to learn the base spec.
+    final JoinCursorPlan plan = planJoinCursor(spec, combinedFilter, physicalColumns);
+
+    // Build the left/base holder asynchronously (a partial base segment downloads its required columns here); the
+    // join's build-side joinables are already-resident in-memory tables, so the base is the only async piece. Once it's
+    // ready, wrap it with the join cursors. Closing the returned holder before it's ready cancels the base load.
+    final AsyncCursorHolder baseAsync = baseCursorFactory.makeCursorHolderAsync(plan.baseBuildSpec);
+    final AsyncCursorHolder asyncHolder = new AsyncCursorHolder(baseAsync::close);
+    baseAsync.addReadyCallback(() -> {
+      final CursorHolder joinHolder;
+      try {
+        // release() transfers ownership of the base holder to us (and surfaces a base-load failure as its cause); the
+        // join holder now owns closing it via joinablesCloser. The wrap below can't throw, so the catch only fires on
+        // a base failure or a cancel race (baseAsync already closed), in both cases joinablesCloser is still empty.
+        final CursorHolder baseHolder = baseAsync.release();
+        joinablesCloser.register(baseHolder);
+        joinHolder = joinCursorHolder(plan, joinablesCloser, baseHolder);
+      }
+      catch (Throwable t) {
+        asyncHolder.setException(t);
+        return;
+      }
+      if (!asyncHolder.set(joinHolder)) {
+        // awaiter closed the wrapper while we were producing the holder; close it so the base holder doesn't leak
+        joinHolder.close();
+      }
+    });
+    return asyncHolder;
+  }
+
+  /**
+   * Physical columns to pass to the base cursor: a copy of the spec's physical columns plus any columns required by
+   * the combined filter (null when the spec didn't declare physical columns, meaning "all"). Shared by the no-clauses
+   * and join paths.
+   */
+  @Nullable
+  private static Set<String> computeBasePhysicalColumns(CursorBuildSpec spec, @Nullable Filter combinedFilter)
+  {
     final Set<String> physicalColumns = spec.getPhysicalColumns() != null
                                         ? new HashSet<>(spec.getPhysicalColumns())
                                         : null;
-
     if (physicalColumns != null && combinedFilter != null) {
       for (String column : combinedFilter.getRequiredColumns()) {
         if (!spec.getVirtualColumns().exists(column)) {
@@ -90,129 +153,135 @@ public class HashJoinSegmentCursorFactory implements CursorFactory
         }
       }
     }
+    return physicalColumns;
+  }
 
-    if (clauses.isEmpty()) {
-      // if there are no clauses, we can just use the base cursor directly if we apply the combined filter
-      final CursorBuildSpec newSpec = CursorBuildSpec.builder(spec)
-                                                     .setFilter(combinedFilter)
-                                                     .setPhysicalColumns(physicalColumns)
-                                                     .build();
-      return baseCursorFactory.makeCursorHolder(newSpec);
-    }
-
-    // else we need to wipe out the grouping, aggregations, and ordering
-
-    return new CursorHolder()
-    {
-      final Closer joinablesCloser = Closer.create();
-
-      /**
-       * Typically the same as {@link HashJoinSegmentCursorFactory#joinFilterPreAnalysis}, but may differ when
-       * an unnest datasource is layered on top of a join datasource.
-       */
-      final JoinFilterPreAnalysis actualPreAnalysis;
-
-      /**
-       * Result of {@link JoinFilterAnalyzer#splitFilter} on {@link #actualPreAnalysis} and
-       * {@link HashJoinSegmentCursorFactory#baseFilter}.
-       */
-      final JoinFilterSplit joinFilterSplit;
-
-      /**
-       * Cursor holder for {@link HashJoinSegmentCursorFactory#baseCursorFactory}.
-       */
-      final CursorHolder baseCursorHolder;
-
-      {
-        // Filter pre-analysis key implied by the call to "makeCursorHolder". We need to sanity-check that it matches
-        // the actual pre-analysis that was done. Note: we could now infer a rewrite config from the "makeCursorHolder"
-        // call (it requires access to the query context which we now have access to since the move away from
-        // CursorFactory) but this code hasn't been updated to sanity-check it, so currently we are still skipping it
-        // by re-using the one present in the cached key.
-        final JoinFilterPreAnalysisKey keyIn =
-            new JoinFilterPreAnalysisKey(
-                joinFilterPreAnalysis.getKey().getRewriteConfig(),
-                clauses,
-                spec.getVirtualColumns(),
-                combinedFilter
-            );
-
-        final JoinFilterPreAnalysisKey keyCached = joinFilterPreAnalysis.getKey();
-        if (keyIn.equals(keyCached)) {
-          // Common case: key used during filter pre-analysis (keyCached) matches key implied by makeCursorHolder call
-          // (keyIn).
-          actualPreAnalysis = joinFilterPreAnalysis;
-        } else {
-          // Less common case: key differs. Re-analyze the filter. This case can happen when an unnest datasource is
-          // layered on top of a join datasource.
-          actualPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(keyIn);
-        }
+  /**
+   * Base cursor spec for the no-clauses case: the original spec with the combined filter and base physical columns.
+   */
+  private static CursorBuildSpec noClausesBaseSpec(
+      CursorBuildSpec spec,
+      @Nullable Filter combinedFilter,
+      @Nullable Set<String> physicalColumns
+  )
+  {
+    return CursorBuildSpec.builder(spec)
+                          .setFilter(combinedFilter)
+                          .setPhysicalColumns(physicalColumns)
+                          .build();
+  }
 
-        joinFilterSplit = JoinFilterAnalyzer.splitFilter(
-            actualPreAnalysis,
-            baseFilter
+  /**
+   * Run the (CPU-only) join filter pre-analysis and compute the base-table cursor build spec. Returns the analysis
+   * results needed at cursor-construction time ({@link JoinCursorPlan}). {@code physicalColumns} is mutated in place
+   * (it accumulates base-filter, pre-join virtual-column, and clause-condition columns, minus the join prefixes).
+   */
+  private JoinCursorPlan planJoinCursor(
+      CursorBuildSpec spec,
+      @Nullable Filter combinedFilter,
+      @Nullable Set<String> physicalColumns
+  )
+  {
+    // Filter pre-analysis key implied by the call to "makeCursorHolder". We need to sanity-check that it matches
+    // the actual pre-analysis that was done. Note: we could now infer a rewrite config from the "makeCursorHolder"
+    // call (it requires access to the query context which we now have access to) but this code hasn't been updated to
+    // sanity-check it, so currently we are still skipping it by re-using the one present in the cached key.
+    final JoinFilterPreAnalysisKey keyIn =
+        new JoinFilterPreAnalysisKey(
+            joinFilterPreAnalysis.getKey().getRewriteConfig(),
+            clauses,
+            spec.getVirtualColumns(),
+            combinedFilter
         );
 
-        // start with a full scan clipped to interval
-        final CursorBuildSpec.CursorBuildSpecBuilder cursorBuildSpecBuilder =
-            CursorBuildSpec.builder()
-                           .setInterval(spec.getInterval())
-                           .setQueryContext(spec.getQueryContext())
-                           .setQueryMetrics(spec.getQueryMetrics());
-
-        // retain time ordering if preferred
-        Order timeOrder = Cursors.getTimeOrdering(spec.getPreferredOrdering());
-        if (timeOrder == Order.DESCENDING) {
-          cursorBuildSpecBuilder.setPreferredOrdering(Cursors.descendingTimeOrder());
-        } else if (timeOrder == Order.ASCENDING) {
-          cursorBuildSpecBuilder.setPreferredOrdering(Cursors.ascendingTimeOrder());
-        }
+    final JoinFilterPreAnalysisKey keyCached = joinFilterPreAnalysis.getKey();
+    final JoinFilterPreAnalysis actualPreAnalysis;
+    if (keyIn.equals(keyCached)) {
+      // Common case: key used during filter pre-analysis (keyCached) matches key implied by makeCursorHolder call
+      // (keyIn).
+      actualPreAnalysis = joinFilterPreAnalysis;
+    } else {
+      // Less common case: key differs. Re-analyze the filter. This case can happen when an unnest datasource is
+      // layered on top of a join datasource.
+      actualPreAnalysis = JoinFilterAnalyzer.computeJoinFilterPreAnalysis(keyIn);
+    }
 
-        // add pushdown filters if present
-        if (joinFilterSplit.getBaseTableFilter().isPresent()) {
-          cursorBuildSpecBuilder.setFilter(joinFilterSplit.getBaseTableFilter().get());
-        }
-        final VirtualColumns preJoinVirtualColumns = VirtualColumns.fromIterable(
-            Iterables.concat(
-                Sets.difference(
-                    ImmutableSet.copyOf(spec.getVirtualColumns().getVirtualColumns()),
-                    joinFilterPreAnalysis.getPostJoinVirtualColumns()
-                ),
-                joinFilterSplit.getPushDownVirtualColumns()
-            )
-        );
-        cursorBuildSpecBuilder.setVirtualColumns(preJoinVirtualColumns);
-
-        // add all base table physical columns if they were originally set
-        if (physicalColumns != null) {
-          if (joinFilterSplit.getBaseTableFilter().isPresent()) {
-            for (String column : joinFilterSplit.getBaseTableFilter().get().getRequiredColumns()) {
-              if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) {
-                physicalColumns.add(column);
-              }
-            }
-          }
-          for (VirtualColumn virtualColumn : preJoinVirtualColumns.getVirtualColumns()) {
-            for (String column : virtualColumn.requiredColumns()) {
-              if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) {
-                physicalColumns.add(column);
-              }
-            }
-          }
-          final Set<String> prefixes = new HashSet<>();
-          for (JoinableClause clause : clauses) {
-            prefixes.add(clause.getPrefix());
-            physicalColumns.addAll(clause.getCondition().getRequiredColumns());
+    final JoinFilterSplit joinFilterSplit = JoinFilterAnalyzer.splitFilter(actualPreAnalysis, baseFilter);
+
+    // start with a full scan clipped to interval
+    final CursorBuildSpec.CursorBuildSpecBuilder cursorBuildSpecBuilder =
+        CursorBuildSpec.builder()
+                       .setInterval(spec.getInterval())
+                       .setQueryContext(spec.getQueryContext())
+                       .setQueryMetrics(spec.getQueryMetrics());
+
+    // retain time ordering if preferred
+    Order timeOrder = Cursors.getTimeOrdering(spec.getPreferredOrdering());
+    if (timeOrder == Order.DESCENDING) {
+      cursorBuildSpecBuilder.setPreferredOrdering(Cursors.descendingTimeOrder());
+    } else if (timeOrder == Order.ASCENDING) {
+      cursorBuildSpecBuilder.setPreferredOrdering(Cursors.ascendingTimeOrder());
+    }
+
+    // add pushdown filters if present
+    if (joinFilterSplit.getBaseTableFilter().isPresent()) {
+      cursorBuildSpecBuilder.setFilter(joinFilterSplit.getBaseTableFilter().get());
+    }
+    final VirtualColumns preJoinVirtualColumns = VirtualColumns.fromIterable(
+        Iterables.concat(
+            Sets.difference(
+                ImmutableSet.copyOf(spec.getVirtualColumns().getVirtualColumns()),
+                joinFilterPreAnalysis.getPostJoinVirtualColumns()
+            ),
+            joinFilterSplit.getPushDownVirtualColumns()
+        )
+    );
+    cursorBuildSpecBuilder.setVirtualColumns(preJoinVirtualColumns);
+
+    // add all base table physical columns if they were originally set
+    if (physicalColumns != null) {
+      if (joinFilterSplit.getBaseTableFilter().isPresent()) {
+        for (String column : joinFilterSplit.getBaseTableFilter().get().getRequiredColumns()) {
+          if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) {
+            physicalColumns.add(column);
           }
-          for (String prefix : prefixes) {
-            physicalColumns.removeIf(x -> JoinPrefixUtils.isPrefixedBy(x, prefix));
+        }
+      }
+      for (VirtualColumn virtualColumn : preJoinVirtualColumns.getVirtualColumns()) {
+        for (String column : virtualColumn.requiredColumns()) {
+          if (!spec.getVirtualColumns().exists(column) && !preJoinVirtualColumns.exists(column)) {
+            physicalColumns.add(column);
           }
-          cursorBuildSpecBuilder.setPhysicalColumns(physicalColumns);
         }
-
-        baseCursorHolder = joinablesCloser.register(baseCursorFactory.makeCursorHolder(cursorBuildSpecBuilder.build()));
       }
+      final Set<String> prefixes = new HashSet<>();
+      for (JoinableClause clause : clauses) {
+        prefixes.add(clause.getPrefix());
+        physicalColumns.addAll(clause.getCondition().getRequiredColumns());
+      }
+      for (String prefix : prefixes) {
+        physicalColumns.removeIf(x -> JoinPrefixUtils.isPrefixedBy(x, prefix));
+      }
+      cursorBuildSpecBuilder.setPhysicalColumns(physicalColumns);
+    }
+
+    return new JoinCursorPlan(actualPreAnalysis, joinFilterSplit, cursorBuildSpecBuilder.build());
+  }
 
+  /**
+   * Build the join {@link CursorHolder} on top of a base-table holder. {@code baseCursorHolder} must already be
+   * registered with {@code joinablesCloser} (the sync path registers it when built; the async path registers the
+   * released holder); closing the returned holder closes {@code joinablesCloser} (the base holder + per-cursor join
+   * matchers created in {@link CursorHolder#asCursor}).
+   */
+  private CursorHolder joinCursorHolder(
+      JoinCursorPlan plan,
+      Closer joinablesCloser,
+      CursorHolder baseCursorHolder
+  )
+  {
+    return new CursorHolder()
+    {
       @Override
       public Cursor asCursor()
       {
@@ -230,8 +299,8 @@ public class HashJoinSegmentCursorFactory implements 
CursorFactory
 
         return PostJoinCursor.wrap(
             retVal,
-            
VirtualColumns.fromIterable(actualPreAnalysis.getPostJoinVirtualColumns()),
-            joinFilterSplit.getJoinTableFilter().orElse(null)
+            
VirtualColumns.fromIterable(plan.actualPreAnalysis.getPostJoinVirtualColumns()),
+            plan.joinFilterSplit.getJoinTableFilter().orElse(null)
         );
       }
 
@@ -320,4 +389,39 @@ public class HashJoinSegmentCursorFactory implements 
CursorFactory
 
     return limit == baseOrdering.size() ? baseOrdering : 
baseOrdering.subList(0, limit);
   }
+
+  /**
+   * Outputs of {@link #planJoinCursor}: the (possibly re-computed) join filter pre-analysis and split needed when the
+   * join cursor is actually constructed, plus the base-table cursor build spec used to open the left-side holder.
+   */
+  private static final class JoinCursorPlan
+  {
+    /**
+     * Typically the same as {@link HashJoinSegmentCursorFactory#joinFilterPreAnalysis}, but may differ when an unnest
+     * datasource is layered on top of a join datasource.
+     */
+    private final JoinFilterPreAnalysis actualPreAnalysis;
+
+    /**
+     * Result of {@link JoinFilterAnalyzer#splitFilter} on {@link #actualPreAnalysis} and
+     * {@link HashJoinSegmentCursorFactory#baseFilter}.
+     */
+    private final JoinFilterSplit joinFilterSplit;
+
+    /**
+     * Build spec for the left-side {@link HashJoinSegmentCursorFactory#baseCursorFactory} holder.
+     */
+    private final CursorBuildSpec baseBuildSpec;
+
+    private JoinCursorPlan(
+        JoinFilterPreAnalysis actualPreAnalysis,
+        JoinFilterSplit joinFilterSplit,
+        CursorBuildSpec baseBuildSpec
+    )
+    {
+      this.actualPreAnalysis = actualPreAnalysis;
+      this.joinFilterSplit = joinFilterSplit;
+      this.baseBuildSpec = baseBuildSpec;
+    }
+  }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
 
b/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
index 94654f58c85..546bd55f037 100644
--- 
a/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
@@ -27,6 +27,7 @@ import org.apache.druid.segment.CursorFactory;
 import org.apache.druid.segment.CursorHolder;
 import org.apache.druid.segment.NoopQueryableIndex;
 import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.ResidentCursorFactory;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.SegmentLazyLoadFailCallback;
 import org.apache.druid.segment.column.ColumnCapabilities;
@@ -88,7 +89,7 @@ public class TombstoneSegmentizerFactory implements 
SegmentizerFactory
       public <T> T as(@Nonnull Class<T> clazz)
       {
         if (CursorFactory.class.equals(clazz)) {
-          return (T) new CursorFactory()
+          return (T) new ResidentCursorFactory()
           {
             @Override
             public CursorHolder makeCursorHolder(CursorBuildSpec spec)
diff --git 
a/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java 
b/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java
new file mode 100644
index 00000000000..e9e9305e6f3
--- /dev/null
+++ 
b/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test {@link CursorFactory} that produces its async cursor holder on demand: {@link #makeCursorHolderAsync} hands back
+ * an unfinished {@link AsyncCursorHolder}, and {@link #complete()} fills it (by building the delegate's holder for the
+ * last requested spec) so a test can drive the await transition deterministically. {@link #canceled} flips when the
+ * returned holder is closed before it is completed. Used to exercise the {@code makeCursorHolderAsync} override of
+ * wrapping cursor factories (e.g. unnest, join) against a base that loads asynchronously.
+ */
+public class DeferredCursorFactory implements CursorFactory
+{
+  private final CursorFactory delegate;
+  public final AtomicBoolean canceled = new AtomicBoolean(false);
+  private AsyncCursorHolder pending;
+  private CursorBuildSpec pendingSpec;
+
+  public DeferredCursorFactory(CursorFactory delegate)
+  {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+  {
+    return delegate.makeCursorHolder(spec);
+  }
+
+  @Override
+  public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+  {
+    pendingSpec = spec;
+    pending = new AsyncCursorHolder(() -> canceled.set(true));
+    return pending;
+  }
+
+  /**
+   * Completes the most recent {@link #makeCursorHolderAsync} holder by building the delegate's holder for the spec it
+   * was called with.
+   */
+  public void complete()
+  {
+    pending.set(delegate.makeCursorHolder(pendingSpec));
+  }
+
+  @Override
+  public RowSignature getRowSignature()
+  {
+    return delegate.getRowSignature();
+  }
+
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String column)
+  {
+    return delegate.getColumnCapabilities(column);
+  }
+}
diff --git 
a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
index 8ede1ac203c..11a24346939 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
@@ -256,6 +256,62 @@ public class UnnestCursorFactoryTest extends 
InitializedNullHandlingTest
     }
   }
 
+  @Test
+  public void 
test_unnest_factory_async_awaits_base_then_produces_unnest_cursor()
+  {
+    // base produces its holder asynchronously (mimics a partial segment downloading its columns). The unnest async
+    // holder must stay not-ready until the base completes, then yield the same unnested rows as the sync path.
+    final DeferredCursorFactory base = new 
DeferredCursorFactory(INCREMENTAL_INDEX_CURSOR_FACTORY);
+    final UnnestCursorFactory unnest = new UnnestCursorFactory(
+        base,
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + 
INPUT_COLUMN_NAME + "\"", null, ExprMacroTable.nil()),
+        null
+    );
+
+    final AsyncCursorHolder asyncHolder = 
unnest.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+    try {
+      Assert.assertFalse("unnest holder must wait for the base to complete", 
asyncHolder.isReady());
+
+      base.complete();
+      Assert.assertTrue("unnest holder becomes ready once the base completes", 
asyncHolder.isReady());
+
+      try (final CursorHolder cursorHolder = asyncHolder.release()) {
+        final Cursor cursor = cursorHolder.asCursor();
+        final DimensionSelector dimSelector =
+            
cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+        final List<Object> rows = new ArrayList<>();
+        while (!cursor.isDone()) {
+          rows.add(dimSelector.getObject());
+          cursor.advance();
+        }
+        Assert.assertEquals(
+            Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "10", "11", 
"12", "13", "14", "15", "8", "9"),
+            rows
+        );
+      }
+    }
+    finally {
+      // no-op after release(); cancels the base load if an assertion above bailed before release()
+      asyncHolder.close();
+    }
+  }
+
+  @Test
+  public void test_unnest_factory_async_close_before_ready_cancels_base()
+  {
+    final DeferredCursorFactory base = new 
DeferredCursorFactory(INCREMENTAL_INDEX_CURSOR_FACTORY);
+    final UnnestCursorFactory unnest = new UnnestCursorFactory(
+        base,
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + 
INPUT_COLUMN_NAME + "\"", null, ExprMacroTable.nil()),
+        null
+    );
+
+    final AsyncCursorHolder asyncHolder = 
unnest.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+    Assert.assertFalse(asyncHolder.isReady());
+    asyncHolder.close();
+    Assert.assertTrue("closing the unnest holder before it's ready cancels the 
base load", base.canceled.get());
+  }
+
   @Test
   public void test_unnest_factory_basic_array_column()
   {
diff --git 
a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java
 
b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java
index e1b4ffdf541..2c803dc40e7 100644
--- 
a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java
+++ 
b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java
@@ -30,8 +30,12 @@ import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.query.filter.OrDimFilter;
 import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.AsyncCursorHolder;
+import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.CursorBuildSpec;
 import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.CursorHolder;
+import org.apache.druid.segment.DeferredCursorFactory;
 import org.apache.druid.segment.ReferenceCountedSegmentProvider;
 import org.apache.druid.segment.TopNOptimizationInspector;
 import org.apache.druid.segment.VirtualColumns;
@@ -215,6 +219,77 @@ public class HashJoinSegmentCursorFactoryTest extends 
BaseHashJoinSegmentCursorF
     );
   }
 
+  @Test
+  public void test_makeCursorAsync_factToCountryLeft_awaitsBaseThenJoins()
+  {
+    final List<JoinableClause> joinableClauses = 
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
+    final JoinFilterPreAnalysis joinFilterPreAnalysis =
+        makeDefaultConfigPreAnalysis(null, joinableClauses, 
VirtualColumns.EMPTY);
+
+    // joined row count from the synchronous path, to compare the async result against without re-listing every row
+    final int expectedRows;
+    try (CursorHolder syncHolder = new HashJoinSegmentCursorFactory(
+        factSegment.as(CursorFactory.class),
+        null,
+        joinableClauses,
+        joinFilterPreAnalysis
+    ).makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+      expectedRows = countRows(syncHolder);
+    }
+    Assert.assertTrue(expectedRows > 0);
+
+    // async path: the left/base holder loads on demand (mimics a partial base segment); build-side joinable is resident
+    final DeferredCursorFactory base = new 
DeferredCursorFactory(factSegment.as(CursorFactory.class));
+    final AsyncCursorHolder asyncHolder = new HashJoinSegmentCursorFactory(
+        base,
+        null,
+        joinableClauses,
+        joinFilterPreAnalysis
+    ).makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+    try {
+      Assert.assertFalse("join holder must wait for the base to complete", 
asyncHolder.isReady());
+      base.complete();
+      Assert.assertTrue("join holder becomes ready once the base completes", 
asyncHolder.isReady());
+      try (CursorHolder holder = asyncHolder.release()) {
+        Assert.assertEquals(expectedRows, countRows(holder));
+      }
+    }
+    finally {
+      asyncHolder.close();
+    }
+  }
+
+  @Test
+  public void test_makeCursorAsync_factToCountry_closeBeforeReadyCancelsBase()
+  {
+    final List<JoinableClause> joinableClauses = 
ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
+    final JoinFilterPreAnalysis joinFilterPreAnalysis =
+        makeDefaultConfigPreAnalysis(null, joinableClauses, 
VirtualColumns.EMPTY);
+
+    final DeferredCursorFactory base = new 
DeferredCursorFactory(factSegment.as(CursorFactory.class));
+    final AsyncCursorHolder asyncHolder = new HashJoinSegmentCursorFactory(
+        base,
+        null,
+        joinableClauses,
+        joinFilterPreAnalysis
+    ).makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+
+    Assert.assertFalse(asyncHolder.isReady());
+    asyncHolder.close();
+    Assert.assertTrue("closing the join holder before it's ready cancels the 
base load", base.canceled.get());
+  }
+
+  private static int countRows(CursorHolder holder)
+  {
+    final Cursor cursor = holder.asCursor();
+    int count = 0;
+    while (!cursor.isDone()) {
+      cursor.advance();
+      count++;
+    }
+    return count;
+  }
+
   @Test
   public void test_makeCursor_factToCountryLeftUsingLookup()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to