This is an automated email from the ASF dual-hosted git repository.
clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 60e34876ef3 feat: unnest and join makeCursorHolderAsync (#19600)
60e34876ef3 is described below
commit 60e34876ef37f6879344c1ca881176c61d4bd99e
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Jun 24 13:11:37 2026 -0700
feat: unnest and join makeCursorHolderAsync (#19600)
---
.../columnar/ColumnarFrameCursorFactory.java | 3 +-
.../frame/segment/row/RowFrameCursorFactory.java | 3 +-
.../org/apache/druid/segment/CursorFactory.java | 17 +-
.../druid/segment/QueryableIndexCursorFactory.java | 2 +-
.../druid/segment/ResidentCursorFactory.java | 39 +++
.../druid/segment/RowBasedCursorFactory.java | 2 +-
.../apache/druid/segment/UnnestCursorFactory.java | 79 ++++-
.../incremental/IncrementalIndexCursorFactory.java | 4 +-
.../segment/join/HashJoinSegmentCursorFactory.java | 338 ++++++++++++++-------
.../loading/TombstoneSegmentizerFactory.java | 3 +-
.../druid/segment/DeferredCursorFactory.java | 80 +++++
.../druid/segment/UnnestCursorFactoryTest.java | 56 ++++
.../join/HashJoinSegmentCursorFactoryTest.java | 75 +++++
13 files changed, 560 insertions(+), 141 deletions(-)
diff --git
a/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
b/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
index 6dba57245fc..fd9ffc15e2f 100644
---
a/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/frame/segment/columnar/ColumnarFrameCursorFactory.java
@@ -40,6 +40,7 @@ import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.QueryableIndexColumnSelectorFactory;
+import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.VirtualColumns;
@@ -64,7 +65,7 @@ import java.util.List;
*
* @see RowFrameCursorFactory the row-based version
*/
-public class ColumnarFrameCursorFactory implements CursorFactory
+public class ColumnarFrameCursorFactory implements ResidentCursorFactory
{
private final Frame frame;
private final RowSignature signature;
diff --git
a/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
b/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
index 93523893c8c..4f72640e95c 100644
---
a/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/frame/segment/row/RowFrameCursorFactory.java
@@ -33,6 +33,7 @@ import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
+import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.SimpleAscendingOffset;
import org.apache.druid.segment.SimpleSettableOffset;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -49,7 +50,7 @@ import java.util.List;
*
* @see ColumnarFrameCursorFactory the columnar version
*/
-public class RowFrameCursorFactory implements CursorFactory
+public class RowFrameCursorFactory implements ResidentCursorFactory
{
private final Frame frame;
private final FrameReader frameReader;
diff --git
a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
index 4a79490adc3..486625bb7f3 100644
--- a/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/CursorFactory.java
@@ -19,6 +19,7 @@
package org.apache.druid.segment;
+import org.apache.druid.error.DruidException;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.RowSignature;
@@ -35,14 +36,22 @@ public interface CursorFactory extends ColumnInspector
/**
* Asynchronous variant of {@link #makeCursorHolder(CursorBuildSpec)} for
cursor factories that may need to do I/O
* (e.g. download column data from deep storage) before they can serve a
cursor. Callers running on threads that
- * must not block should use this.
+ * must not block should use this rather than {@link #makeCursorHolder}.
* <p>
- * The default implementation completes synchronously by delegating to
{@link #makeCursorHolder(CursorBuildSpec)},
- * which keeps every existing implementation async-correct without changes.
+ * There is intentionally no working default: this method must be explicitly
implemented to participate in
+ * async-aware engines (MSQ). A factory whose source is always
fully-resident and never needs to block while waiting
+ * on some other thread to perform work can implement {@link
ResidentCursorFactory} instead of {@link CursorFactory}
+ * directly, which provides a default implementation of this method that
wraps
+ * {@link #makeCursorHolder(CursorBuildSpec)}.
*/
default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
{
- return AsyncCursorHolder.completed(makeCursorHolder(spec));
+ throw DruidException.defensive(
+ "makeCursorHolderAsync is not implemented by [%s]. Override it (or
implement ResidentCursorFactory): return "
+ + "AsyncCursorHolder.completed(makeCursorHolder(spec)) if the source
is always fully resident, or build/await "
+ + "the cursor holder asynchronously if it supports load on demand.",
+ getClass().getName()
+ );
}
/**
diff --git
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
index 1d752cb6686..1803554dc56 100644
---
a/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/QueryableIndexCursorFactory.java
@@ -58,7 +58,7 @@ import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
-public class QueryableIndexCursorFactory implements CursorFactory
+public class QueryableIndexCursorFactory implements ResidentCursorFactory
{
private final QueryableIndex index;
private final TimeBoundaryInspector timeBoundaryInspector;
diff --git
a/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java
new file mode 100644
index 00000000000..99cf4461709
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/segment/ResidentCursorFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+/**
+ * A {@link CursorFactory} whose {@link #makeCursorHolder} never blocks on
I/O, i.e. a fully-resident / in-memory
+ * source with no on-demand loading. Implementing this interface, rather than
{@link CursorFactory} directly, declares
+ * that intent and supplies the trivial {@link #makeCursorHolderAsync}
implementation: the holder is built synchronously
+ * and returned already completed.
+ * <p>
+ * Factories backed by, or wrapping, an on-demand source must implement {@link
CursorFactory} directly and provide a
+ * real {@link CursorFactory#makeCursorHolderAsync} that builds/awaits the
holder asynchronously so they never block the
+ * calling thread.
+ */
+public interface ResidentCursorFactory extends CursorFactory
+{
+ @Override
+ default AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ return AsyncCursorHolder.completed(makeCursorHolder(spec));
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
index 13f2da53e82..3594c3a92f8 100644
---
a/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/RowBasedCursorFactory.java
@@ -32,7 +32,7 @@ import org.apache.druid.utils.CloseableUtils;
import javax.annotation.Nullable;
import java.util.List;
-public class RowBasedCursorFactory<RowType> implements CursorFactory
+public class RowBasedCursorFactory<RowType> implements ResidentCursorFactory
{
private final Sequence<RowType> rowSequence;
private final RowAdapter<RowType> rowAdapter;
diff --git
a/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
index 8359b8664ab..b39264ac807 100644
--- a/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/UnnestCursorFactory.java
@@ -76,9 +76,59 @@ public class UnnestCursorFactory implements CursorFactory
@Override
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+ {
+ final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
+ final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec,
unnestColumn, filterSplit.getBaseTableFilter());
+
+ final Closer closer = Closer.create();
+ // base holder is built lazily on first asCursor()/getOrdering() and
registered for close at that point
+ final Supplier<CursorHolder> baseHolderSupplier = Suppliers.memoize(
+ () ->
closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
+ );
+ return unnestCursorHolder(spec, filterSplit, closer, baseHolderSupplier);
+ }
+
+ @Override
+ public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ final UnnestFilterSplit filterSplit = computeFilterSplit(spec);
+ final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(spec,
unnestColumn, filterSplit.getBaseTableFilter());
+
+ // Build the base-table holder asynchronously (a partial base segment
downloads its required columns here), then
+ // wrap the ready holder in the unnest holder. Closing the returned holder
before it's ready cancels the base load.
+ final AsyncCursorHolder baseAsync =
baseCursorFactory.makeCursorHolderAsync(unnestBuildSpec);
+ final AsyncCursorHolder asyncHolder = new
AsyncCursorHolder(baseAsync::close);
+ baseAsync.addReadyCallback(() -> {
+ final CursorHolder unnestHolder;
+ try {
+ // release() transfers ownership of the base holder to us (and
surfaces a base-load failure as its cause); from
+ // here the unnest holder owns closing it. Construction below can't
throw, so the catch only fires on a base
+ // failure or a cancel race (baseAsync already closed), neither of
which leaves a base holder to leak.
+ final CursorHolder baseHolder = baseAsync.release();
+ final Closer closer = Closer.create();
+ closer.register(baseHolder);
+ unnestHolder = unnestCursorHolder(spec, filterSplit, closer,
Suppliers.ofInstance(baseHolder));
+ }
+ catch (Throwable t) {
+ asyncHolder.setException(t);
+ return;
+ }
+ if (!asyncHolder.set(unnestHolder)) {
+ // awaiter closed the wrapper while we were producing the holder;
close it so the base holder doesn't leak
+ unnestHolder.close();
+ }
+ });
+ return asyncHolder;
+ }
+
+ /**
+ * Split the spec's filters into base-table and post-unnest filters (see
+ * {@link #computeBaseAndPostUnnestFilters}). Cheap and metadata-only;
shared by the sync and async holder paths.
+ */
+ private UnnestFilterSplit computeFilterSplit(CursorBuildSpec spec)
{
final String input = getUnnestInputIfDirectAccess(unnestColumn);
- final UnnestFilterSplit filterSplit = computeBaseAndPostUnnestFilters(
+ return computeBaseAndPostUnnestFilters(
spec.getFilter(),
filter != null ? filter.toFilter() : null,
spec.getVirtualColumns(),
@@ -86,23 +136,26 @@ public class UnnestCursorFactory implements CursorFactory
input,
input == null ? null :
spec.getVirtualColumns().getColumnCapabilitiesWithFallback(baseCursorFactory,
input)
);
- final CursorBuildSpec unnestBuildSpec = transformCursorBuildSpec(
- spec,
- unnestColumn,
- filterSplit.getBaseTableFilter()
- );
+ }
+ /**
+ * Build the unnest {@link CursorHolder} on top of a base-table holder.
{@code baseHolderSupplier} provides the base
+ * holder: the sync path builds it lazily on first use and registers it with
{@code closer}, while the async path
+ * supplies an already-materialized holder pre-registered with {@code
closer}.
+ */
+ private CursorHolder unnestCursorHolder(
+ CursorBuildSpec spec,
+ UnnestFilterSplit filterSplit,
+ Closer closer,
+ Supplier<CursorHolder> baseHolderSupplier
+ )
+ {
return new CursorHolder()
{
- final Closer closer = Closer.create();
- final Supplier<CursorHolder> cursorHolderSupplier = Suppliers.memoize(
- () ->
closer.register(baseCursorFactory.makeCursorHolder(unnestBuildSpec))
- );
-
@Override
public Cursor asCursor()
{
- final Cursor cursor = cursorHolderSupplier.get().asCursor();
+ final Cursor cursor = baseHolderSupplier.get().asCursor();
if (cursor == null) {
return null;
}
@@ -135,7 +188,7 @@ public class UnnestCursorFactory implements CursorFactory
@Override
public List<OrderBy> getOrdering()
{
- return computeOrdering(cursorHolderSupplier.get().getOrdering());
+ return computeOrdering(baseHolderSupplier.get().getOrdering());
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
index 744a856c845..07cebffa2c9 100644
---
a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactory.java
@@ -30,9 +30,9 @@ import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ConcatenatingCursor;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
-import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.EmptyCursorHolder;
+import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnType;
@@ -50,7 +50,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-public class IncrementalIndexCursorFactory implements CursorFactory
+public class IncrementalIndexCursorFactory implements ResidentCursorFactory
{
private static final ColumnCapabilities.CoercionLogic COERCE_LOGIC =
new ColumnCapabilities.CoercionLogic()
diff --git
a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
index b5354de02ad..a7030fae546 100644
---
a/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactory.java
@@ -27,6 +27,7 @@ import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.filter.Filter;
+import org.apache.druid.segment.AsyncCursorHolder;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
@@ -77,12 +78,74 @@ public class HashJoinSegmentCursorFactory implements
CursorFactory
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
{
final Filter combinedFilter = baseFilterAnd(spec.getFilter());
+ final Set<String> physicalColumns = computeBasePhysicalColumns(spec,
combinedFilter);
- // for physical column tracking, we start by copying base spec physical
columns
+ if (clauses.isEmpty()) {
+ // if there are no clauses, we can just use the base cursor directly if
we apply the combined filter
+ return baseCursorFactory.makeCursorHolder(noClausesBaseSpec(spec,
combinedFilter, physicalColumns));
+ }
+
+ // else we need to wipe out the grouping, aggregations, and ordering
+ final Closer joinablesCloser = Closer.create();
+ final JoinCursorPlan plan = planJoinCursor(spec, combinedFilter,
physicalColumns);
+ final CursorHolder baseCursorHolder = joinablesCloser.register(
+ baseCursorFactory.makeCursorHolder(plan.baseBuildSpec)
+ );
+ return joinCursorHolder(plan, joinablesCloser, baseCursorHolder);
+ }
+
+ @Override
+ public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ final Filter combinedFilter = baseFilterAnd(spec.getFilter());
+ final Set<String> physicalColumns = computeBasePhysicalColumns(spec,
combinedFilter);
+
+ if (clauses.isEmpty()) {
+ return baseCursorFactory.makeCursorHolderAsync(noClausesBaseSpec(spec,
combinedFilter, physicalColumns));
+ }
+
+ final Closer joinablesCloser = Closer.create();
+ // Join filter analysis + base-spec computation are CPU-only; do them
synchronously to learn the base spec.
+ final JoinCursorPlan plan = planJoinCursor(spec, combinedFilter,
physicalColumns);
+
+ // Build the left/base holder asynchronously (a partial base segment
downloads its required columns here); the
+ // join's build-side joinables are already-resident in-memory tables, so
the base is the only async piece. Once it's
+ // ready, wrap it with the join cursors. Closing the returned holder
before it's ready cancels the base load.
+ final AsyncCursorHolder baseAsync =
baseCursorFactory.makeCursorHolderAsync(plan.baseBuildSpec);
+ final AsyncCursorHolder asyncHolder = new
AsyncCursorHolder(baseAsync::close);
+ baseAsync.addReadyCallback(() -> {
+ final CursorHolder joinHolder;
+ try {
+ // release() transfers ownership of the base holder to us (and
surfaces a base-load failure as its cause); the
+ // join holder now owns closing it via joinablesCloser. The wrap below
can't throw, so the catch only fires on
+ // a base failure or a cancel race (baseAsync already closed); in both
cases joinablesCloser is still empty.
+ final CursorHolder baseHolder = baseAsync.release();
+ joinablesCloser.register(baseHolder);
+ joinHolder = joinCursorHolder(plan, joinablesCloser, baseHolder);
+ }
+ catch (Throwable t) {
+ asyncHolder.setException(t);
+ return;
+ }
+ if (!asyncHolder.set(joinHolder)) {
+ // awaiter closed the wrapper while we were producing the holder;
close it so the base holder doesn't leak
+ joinHolder.close();
+ }
+ });
+ return asyncHolder;
+ }
+
+ /**
+ * Physical columns to pass to the base cursor: a copy of the spec's
physical columns plus any columns required by
+ * the combined filter (null when the spec didn't declare physical columns,
meaning "all"). Shared by the no-clauses
+ * and join paths.
+ */
+ @Nullable
+ private static Set<String> computeBasePhysicalColumns(CursorBuildSpec spec,
@Nullable Filter combinedFilter)
+ {
final Set<String> physicalColumns = spec.getPhysicalColumns() != null
? new
HashSet<>(spec.getPhysicalColumns())
: null;
-
if (physicalColumns != null && combinedFilter != null) {
for (String column : combinedFilter.getRequiredColumns()) {
if (!spec.getVirtualColumns().exists(column)) {
@@ -90,129 +153,135 @@ public class HashJoinSegmentCursorFactory implements
CursorFactory
}
}
}
+ return physicalColumns;
+ }
- if (clauses.isEmpty()) {
- // if there are no clauses, we can just use the base cursor directly if
we apply the combined filter
- final CursorBuildSpec newSpec = CursorBuildSpec.builder(spec)
- .setFilter(combinedFilter)
-
.setPhysicalColumns(physicalColumns)
- .build();
- return baseCursorFactory.makeCursorHolder(newSpec);
- }
-
- // else we need to wipe out the grouping, aggregations, and ordering
-
- return new CursorHolder()
- {
- final Closer joinablesCloser = Closer.create();
-
- /**
- * Typically the same as {@link
HashJoinSegmentCursorFactory#joinFilterPreAnalysis}, but may differ when
- * an unnest datasource is layered on top of a join datasource.
- */
- final JoinFilterPreAnalysis actualPreAnalysis;
-
- /**
- * Result of {@link JoinFilterAnalyzer#splitFilter} on {@link
#actualPreAnalysis} and
- * {@link HashJoinSegmentCursorFactory#baseFilter}.
- */
- final JoinFilterSplit joinFilterSplit;
-
- /**
- * Cursor holder for {@link
HashJoinSegmentCursorFactory#baseCursorFactory}.
- */
- final CursorHolder baseCursorHolder;
-
- {
- // Filter pre-analysis key implied by the call to "makeCursorHolder".
We need to sanity-check that it matches
- // the actual pre-analysis that was done. Note: we could now infer a
rewrite config from the "makeCursorHolder"
- // call (it requires access to the query context which we now have
access to since the move away from
- // CursorFactory) but this code hasn't been updated to sanity-check
it, so currently we are still skipping it
- // by re-using the one present in the cached key.
- final JoinFilterPreAnalysisKey keyIn =
- new JoinFilterPreAnalysisKey(
- joinFilterPreAnalysis.getKey().getRewriteConfig(),
- clauses,
- spec.getVirtualColumns(),
- combinedFilter
- );
-
- final JoinFilterPreAnalysisKey keyCached =
joinFilterPreAnalysis.getKey();
- if (keyIn.equals(keyCached)) {
- // Common case: key used during filter pre-analysis (keyCached)
matches key implied by makeCursorHolder call
- // (keyIn).
- actualPreAnalysis = joinFilterPreAnalysis;
- } else {
- // Less common case: key differs. Re-analyze the filter. This case
can happen when an unnest datasource is
- // layered on top of a join datasource.
- actualPreAnalysis =
JoinFilterAnalyzer.computeJoinFilterPreAnalysis(keyIn);
- }
+ /**
+ * Base cursor spec for the no-clauses case: the original spec with the
combined filter and base physical columns.
+ */
+ private static CursorBuildSpec noClausesBaseSpec(
+ CursorBuildSpec spec,
+ @Nullable Filter combinedFilter,
+ @Nullable Set<String> physicalColumns
+ )
+ {
+ return CursorBuildSpec.builder(spec)
+ .setFilter(combinedFilter)
+ .setPhysicalColumns(physicalColumns)
+ .build();
+ }
- joinFilterSplit = JoinFilterAnalyzer.splitFilter(
- actualPreAnalysis,
- baseFilter
+ /**
+ * Run the (CPU-only) join filter pre-analysis and compute the base-table
cursor build spec. Returns the analysis
+ * results needed at cursor-construction time ({@link JoinCursorPlan}).
{@code physicalColumns} is mutated in place
+ * (it accumulates base-filter, pre-join virtual-column, and
clause-condition columns, minus the join prefixes).
+ */
+ private JoinCursorPlan planJoinCursor(
+ CursorBuildSpec spec,
+ @Nullable Filter combinedFilter,
+ @Nullable Set<String> physicalColumns
+ )
+ {
+ // Filter pre-analysis key implied by the call to "makeCursorHolder". We
need to sanity-check that it matches
+ // the actual pre-analysis that was done. Note: we could now infer a
rewrite config from the "makeCursorHolder"
+ // call (it requires access to the query context which we now have access
to) but this code hasn't been updated to
+ // sanity-check it, so currently we are still skipping it by re-using the
one present in the cached key.
+ final JoinFilterPreAnalysisKey keyIn =
+ new JoinFilterPreAnalysisKey(
+ joinFilterPreAnalysis.getKey().getRewriteConfig(),
+ clauses,
+ spec.getVirtualColumns(),
+ combinedFilter
);
- // start with a full scan clipped to interval
- final CursorBuildSpec.CursorBuildSpecBuilder cursorBuildSpecBuilder =
- CursorBuildSpec.builder()
- .setInterval(spec.getInterval())
- .setQueryContext(spec.getQueryContext())
- .setQueryMetrics(spec.getQueryMetrics());
-
- // retain time ordering if preferred
- Order timeOrder = Cursors.getTimeOrdering(spec.getPreferredOrdering());
- if (timeOrder == Order.DESCENDING) {
-
cursorBuildSpecBuilder.setPreferredOrdering(Cursors.descendingTimeOrder());
- } else if (timeOrder == Order.ASCENDING) {
-
cursorBuildSpecBuilder.setPreferredOrdering(Cursors.ascendingTimeOrder());
- }
+ final JoinFilterPreAnalysisKey keyCached = joinFilterPreAnalysis.getKey();
+ final JoinFilterPreAnalysis actualPreAnalysis;
+ if (keyIn.equals(keyCached)) {
+ // Common case: key used during filter pre-analysis (keyCached) matches
key implied by makeCursorHolder call
+ // (keyIn).
+ actualPreAnalysis = joinFilterPreAnalysis;
+ } else {
+ // Less common case: key differs. Re-analyze the filter. This case can
happen when an unnest datasource is
+ // layered on top of a join datasource.
+ actualPreAnalysis =
JoinFilterAnalyzer.computeJoinFilterPreAnalysis(keyIn);
+ }
- // add pushdown filters if present
- if (joinFilterSplit.getBaseTableFilter().isPresent()) {
-
cursorBuildSpecBuilder.setFilter(joinFilterSplit.getBaseTableFilter().get());
- }
- final VirtualColumns preJoinVirtualColumns =
VirtualColumns.fromIterable(
- Iterables.concat(
- Sets.difference(
-
ImmutableSet.copyOf(spec.getVirtualColumns().getVirtualColumns()),
- joinFilterPreAnalysis.getPostJoinVirtualColumns()
- ),
- joinFilterSplit.getPushDownVirtualColumns()
- )
- );
- cursorBuildSpecBuilder.setVirtualColumns(preJoinVirtualColumns);
-
- // add all base table physical columns if they were originally set
- if (physicalColumns != null) {
- if (joinFilterSplit.getBaseTableFilter().isPresent()) {
- for (String column :
joinFilterSplit.getBaseTableFilter().get().getRequiredColumns()) {
- if (!spec.getVirtualColumns().exists(column) &&
!preJoinVirtualColumns.exists(column)) {
- physicalColumns.add(column);
- }
- }
- }
- for (VirtualColumn virtualColumn :
preJoinVirtualColumns.getVirtualColumns()) {
- for (String column : virtualColumn.requiredColumns()) {
- if (!spec.getVirtualColumns().exists(column) &&
!preJoinVirtualColumns.exists(column)) {
- physicalColumns.add(column);
- }
- }
- }
- final Set<String> prefixes = new HashSet<>();
- for (JoinableClause clause : clauses) {
- prefixes.add(clause.getPrefix());
- physicalColumns.addAll(clause.getCondition().getRequiredColumns());
+ final JoinFilterSplit joinFilterSplit =
JoinFilterAnalyzer.splitFilter(actualPreAnalysis, baseFilter);
+
+ // start with a full scan clipped to interval
+ final CursorBuildSpec.CursorBuildSpecBuilder cursorBuildSpecBuilder =
+ CursorBuildSpec.builder()
+ .setInterval(spec.getInterval())
+ .setQueryContext(spec.getQueryContext())
+ .setQueryMetrics(spec.getQueryMetrics());
+
+ // retain time ordering if preferred
+ Order timeOrder = Cursors.getTimeOrdering(spec.getPreferredOrdering());
+ if (timeOrder == Order.DESCENDING) {
+
cursorBuildSpecBuilder.setPreferredOrdering(Cursors.descendingTimeOrder());
+ } else if (timeOrder == Order.ASCENDING) {
+
cursorBuildSpecBuilder.setPreferredOrdering(Cursors.ascendingTimeOrder());
+ }
+
+ // add pushdown filters if present
+ if (joinFilterSplit.getBaseTableFilter().isPresent()) {
+
cursorBuildSpecBuilder.setFilter(joinFilterSplit.getBaseTableFilter().get());
+ }
+ final VirtualColumns preJoinVirtualColumns = VirtualColumns.fromIterable(
+ Iterables.concat(
+ Sets.difference(
+
ImmutableSet.copyOf(spec.getVirtualColumns().getVirtualColumns()),
+ joinFilterPreAnalysis.getPostJoinVirtualColumns()
+ ),
+ joinFilterSplit.getPushDownVirtualColumns()
+ )
+ );
+ cursorBuildSpecBuilder.setVirtualColumns(preJoinVirtualColumns);
+
+ // add all base table physical columns if they were originally set
+ if (physicalColumns != null) {
+ if (joinFilterSplit.getBaseTableFilter().isPresent()) {
+ for (String column :
joinFilterSplit.getBaseTableFilter().get().getRequiredColumns()) {
+ if (!spec.getVirtualColumns().exists(column) &&
!preJoinVirtualColumns.exists(column)) {
+ physicalColumns.add(column);
}
- for (String prefix : prefixes) {
- physicalColumns.removeIf(x -> JoinPrefixUtils.isPrefixedBy(x,
prefix));
+ }
+ }
+ for (VirtualColumn virtualColumn :
preJoinVirtualColumns.getVirtualColumns()) {
+ for (String column : virtualColumn.requiredColumns()) {
+ if (!spec.getVirtualColumns().exists(column) &&
!preJoinVirtualColumns.exists(column)) {
+ physicalColumns.add(column);
}
- cursorBuildSpecBuilder.setPhysicalColumns(physicalColumns);
}
-
- baseCursorHolder =
joinablesCloser.register(baseCursorFactory.makeCursorHolder(cursorBuildSpecBuilder.build()));
}
+ final Set<String> prefixes = new HashSet<>();
+ for (JoinableClause clause : clauses) {
+ prefixes.add(clause.getPrefix());
+ physicalColumns.addAll(clause.getCondition().getRequiredColumns());
+ }
+ for (String prefix : prefixes) {
+ physicalColumns.removeIf(x -> JoinPrefixUtils.isPrefixedBy(x, prefix));
+ }
+ cursorBuildSpecBuilder.setPhysicalColumns(physicalColumns);
+ }
+
+ return new JoinCursorPlan(actualPreAnalysis, joinFilterSplit,
cursorBuildSpecBuilder.build());
+ }
+ /**
+ * Build the join {@link CursorHolder} on top of a base-table holder. {@code
baseCursorHolder} must already be
+ * registered with {@code joinablesCloser} (the sync path registers it when
built; the async path registers the
+ * released holder); closing the returned holder closes {@code
joinablesCloser} (the base holder + per-cursor join
+ * matchers created in {@link CursorHolder#asCursor}).
+ */
+ private CursorHolder joinCursorHolder(
+ JoinCursorPlan plan,
+ Closer joinablesCloser,
+ CursorHolder baseCursorHolder
+ )
+ {
+ return new CursorHolder()
+ {
@Override
public Cursor asCursor()
{
@@ -230,8 +299,8 @@ public class HashJoinSegmentCursorFactory implements
CursorFactory
return PostJoinCursor.wrap(
retVal,
-
VirtualColumns.fromIterable(actualPreAnalysis.getPostJoinVirtualColumns()),
- joinFilterSplit.getJoinTableFilter().orElse(null)
+
VirtualColumns.fromIterable(plan.actualPreAnalysis.getPostJoinVirtualColumns()),
+ plan.joinFilterSplit.getJoinTableFilter().orElse(null)
);
}
@@ -320,4 +389,39 @@ public class HashJoinSegmentCursorFactory implements
CursorFactory
return limit == baseOrdering.size() ? baseOrdering :
baseOrdering.subList(0, limit);
}
+
+ /**
+ * Outputs of {@link #planJoinCursor}: the (possibly re-computed) join
filter pre-analysis and split needed when the
+ * join cursor is actually constructed, plus the base-table cursor build
spec used to open the left-side holder.
+ */
+ private static final class JoinCursorPlan
+ {
+ /**
+ * Typically the same as {@link
HashJoinSegmentCursorFactory#joinFilterPreAnalysis}, but may differ when an
unnest
+ * datasource is layered on top of a join datasource.
+ */
+ private final JoinFilterPreAnalysis actualPreAnalysis;
+
+ /**
+ * Result of {@link JoinFilterAnalyzer#splitFilter} on {@link
#actualPreAnalysis} and
+ * {@link HashJoinSegmentCursorFactory#baseFilter}.
+ */
+ private final JoinFilterSplit joinFilterSplit;
+
+ /**
+ * Build spec for the left-side {@link
HashJoinSegmentCursorFactory#baseCursorFactory} holder.
+ */
+ private final CursorBuildSpec baseBuildSpec;
+
+ private JoinCursorPlan(
+ JoinFilterPreAnalysis actualPreAnalysis,
+ JoinFilterSplit joinFilterSplit,
+ CursorBuildSpec baseBuildSpec
+ )
+ {
+ this.actualPreAnalysis = actualPreAnalysis;
+ this.joinFilterSplit = joinFilterSplit;
+ this.baseBuildSpec = baseBuildSpec;
+ }
+ }
}
diff --git
a/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
b/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
index 94654f58c85..546bd55f037 100644
---
a/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
+++
b/processing/src/main/java/org/apache/druid/segment/loading/TombstoneSegmentizerFactory.java
@@ -27,6 +27,7 @@ import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
import org.apache.druid.segment.NoopQueryableIndex;
import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.ResidentCursorFactory;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.segment.column.ColumnCapabilities;
@@ -88,7 +89,7 @@ public class TombstoneSegmentizerFactory implements SegmentizerFactory
public <T> T as(@Nonnull Class<T> clazz)
{
if (CursorFactory.class.equals(clazz)) {
- return (T) new CursorFactory()
+ return (T) new ResidentCursorFactory()
{
@Override
public CursorHolder makeCursorHolder(CursorBuildSpec spec)
diff --git a/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java b/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java
new file mode 100644
index 00000000000..e9e9305e6f3
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/DeferredCursorFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment;
+
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test {@link CursorFactory} that produces its async cursor holder on demand: {@link #makeCursorHolderAsync} hands back
+ * an unfinished {@link AsyncCursorHolder}, and {@link #complete()} fills it (by building the delegate's holder for the
+ * last requested spec) so a test can drive the await transition deterministically. {@link #canceled} flips when the
+ * returned holder is closed before it is completed. Used to exercise the {@code makeCursorHolderAsync} override of
+ * wrapping cursor factories (e.g. unnest, join) against a base that loads asynchronously.
+ *
+ * Only the most recent {@link #makeCursorHolderAsync} call is tracked: requesting a new holder replaces the previously
+ * tracked holder and spec. The tracking fields are plain (non-volatile, unsynchronized) fields.
+ */
+public class DeferredCursorFactory implements CursorFactory
+{
+ // factory that actually builds cursor holders, for both the synchronous path and complete()
+ private final CursorFactory delegate;
+ // set to true by the callback handed to every AsyncCursorHolder created in makeCursorHolderAsync
+ public final AtomicBoolean canceled = new AtomicBoolean(false);
+ // holder returned by the latest makeCursorHolderAsync call; null until the first call
+ private AsyncCursorHolder pending;
+ // spec passed to the latest makeCursorHolderAsync call; null until the first call
+ private CursorBuildSpec pendingSpec;
+
+ public DeferredCursorFactory(CursorFactory delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public CursorHolder makeCursorHolder(CursorBuildSpec spec)
+ {
+ // the synchronous path is not deferred: it goes straight to the delegate
+ return delegate.makeCursorHolder(spec);
+ }
+
+ @Override
+ public AsyncCursorHolder makeCursorHolderAsync(CursorBuildSpec spec)
+ {
+ // remember the spec and hand back a holder that is left unfilled here; complete() fills it later. The callback
+ // given to the holder records cancellation in the canceled flag.
+ pendingSpec = spec;
+ pending = new AsyncCursorHolder(() -> canceled.set(true));
+ return pending;
+ }
+
+ /**
+ * Completes the most recent {@link #makeCursorHolderAsync} holder by building the delegate's holder for the spec it
+ * was called with. Must be called after {@link #makeCursorHolderAsync}: before the first such call there is no
+ * pending holder (the field is still null) and this method fails.
+ */
+ public void complete()
+ {
+ pending.set(delegate.makeCursorHolder(pendingSpec));
+ }
+
+ @Override
+ public RowSignature getRowSignature()
+ {
+ return delegate.getRowSignature();
+ }
+
+ @Override
+ public ColumnCapabilities getColumnCapabilities(String column)
+ {
+ return delegate.getColumnCapabilities(column);
+ }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
index 8ede1ac203c..11a24346939 100644
--- a/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/UnnestCursorFactoryTest.java
@@ -256,6 +256,62 @@ public class UnnestCursorFactoryTest extends InitializedNullHandlingTest
}
}
+  @Test
+  public void test_unnest_factory_async_awaits_base_then_produces_unnest_cursor()
+  {
+    // base produces its holder asynchronously (mimics a partial segment downloading its columns). The unnest async
+    // holder must stay not-ready until the base completes, then yield the same unnested rows as the sync path.
+    final DeferredCursorFactory base = new DeferredCursorFactory(INCREMENTAL_INDEX_CURSOR_FACTORY);
+    final UnnestCursorFactory unnest = new UnnestCursorFactory(
+        base,
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + INPUT_COLUMN_NAME + "\"", null, ExprMacroTable.nil()),
+        null
+    );
+
+    final AsyncCursorHolder asyncHolder = unnest.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+    try {
+      Assert.assertFalse("unnest holder must wait for the base to complete", asyncHolder.isReady());
+
+      base.complete();
+      Assert.assertTrue("unnest holder becomes ready once the base completes", asyncHolder.isReady());
+
+      try (final CursorHolder cursorHolder = asyncHolder.release()) {
+        final Cursor cursor = cursorHolder.asCursor();
+        final DimensionSelector dimSelector =
+            cursor.getColumnSelectorFactory().makeDimensionSelector(DefaultDimensionSpec.of(OUTPUT_COLUMN_NAME));
+        final List<Object> rows = new ArrayList<>();
+        while (!cursor.isDone()) {
+          rows.add(dimSelector.getObject());
+          cursor.advance();
+        }
+        Assert.assertEquals(
+            Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "10", "11", "12", "13", "14", "15", "8", "9"),
+            rows
+        );
+      }
+    }
+    finally {
+      // no-op after release(); cancels the base load if an assertion above bailed before release()
+      asyncHolder.close();
+    }
+  }
+
+  @Test
+  public void test_unnest_factory_async_close_before_ready_cancels_base()
+  {
+    // base.complete() is never called here, so the unnest holder is closed while it is still waiting on the base;
+    // the test expects that close to reach the base holder, which DeferredCursorFactory records in its canceled flag
+    final DeferredCursorFactory base = new DeferredCursorFactory(INCREMENTAL_INDEX_CURSOR_FACTORY);
+    final UnnestCursorFactory unnest = new UnnestCursorFactory(
+        base,
+        new ExpressionVirtualColumn(OUTPUT_COLUMN_NAME, "\"" + INPUT_COLUMN_NAME + "\"", null, ExprMacroTable.nil()),
+        null
+    );
+
+    final AsyncCursorHolder asyncHolder = unnest.makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+    Assert.assertFalse(asyncHolder.isReady());
+    asyncHolder.close();
+    Assert.assertTrue("closing the unnest holder before it's ready cancels the base load", base.canceled.get());
+  }
+
@Test
public void test_unnest_factory_basic_array_column()
{
diff --git a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java
index e1b4ffdf541..2c803dc40e7 100644
--- a/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/join/HashJoinSegmentCursorFactoryTest.java
@@ -30,8 +30,12 @@ import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.OrDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
+import org.apache.druid.segment.AsyncCursorHolder;
+import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.CursorHolder;
+import org.apache.druid.segment.DeferredCursorFactory;
import org.apache.druid.segment.ReferenceCountedSegmentProvider;
import org.apache.druid.segment.TopNOptimizationInspector;
import org.apache.druid.segment.VirtualColumns;
@@ -215,6 +219,77 @@ public class HashJoinSegmentCursorFactoryTest extends BaseHashJoinSegmentCursorF
);
}
+  @Test
+  public void test_makeCursorAsync_factToCountryLeft_awaitsBaseThenJoins()
+  {
+    final List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
+    final JoinFilterPreAnalysis joinFilterPreAnalysis =
+        makeDefaultConfigPreAnalysis(null, joinableClauses, VirtualColumns.EMPTY);
+
+    // joined row count from the synchronous path, to compare the async result against without re-listing every row
+    final int expectedRows;
+    try (CursorHolder syncHolder = new HashJoinSegmentCursorFactory(
+        factSegment.as(CursorFactory.class),
+        null,
+        joinableClauses,
+        joinFilterPreAnalysis
+    ).makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
+      expectedRows = countRows(syncHolder);
+    }
+    Assert.assertTrue(expectedRows > 0);
+
+    // async path: the left/base holder loads on demand (mimics a partial base segment); build-side joinable is resident
+    final DeferredCursorFactory base = new DeferredCursorFactory(factSegment.as(CursorFactory.class));
+    final AsyncCursorHolder asyncHolder = new HashJoinSegmentCursorFactory(
+        base,
+        null,
+        joinableClauses,
+        joinFilterPreAnalysis
+    ).makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+    try {
+      Assert.assertFalse("join holder must wait for the base to complete", asyncHolder.isReady());
+      base.complete();
+      Assert.assertTrue("join holder becomes ready once the base completes", asyncHolder.isReady());
+      try (CursorHolder holder = asyncHolder.release()) {
+        Assert.assertEquals(expectedRows, countRows(holder));
+      }
+    }
+    finally {
+      asyncHolder.close();
+    }
+  }
+
+  @Test
+  public void test_makeCursorAsync_factToCountry_closeBeforeReadyCancelsBase()
+  {
+    final List<JoinableClause> joinableClauses = ImmutableList.of(factToCountryOnIsoCode(JoinType.LEFT));
+    final JoinFilterPreAnalysis joinFilterPreAnalysis =
+        makeDefaultConfigPreAnalysis(null, joinableClauses, VirtualColumns.EMPTY);
+
+    // base.complete() is never called, so the join holder is closed while it is still waiting on the base holder
+    final DeferredCursorFactory base = new DeferredCursorFactory(factSegment.as(CursorFactory.class));
+    final AsyncCursorHolder asyncHolder = new HashJoinSegmentCursorFactory(
+        base,
+        null,
+        joinableClauses,
+        joinFilterPreAnalysis
+    ).makeCursorHolderAsync(CursorBuildSpec.FULL_SCAN);
+
+    Assert.assertFalse(asyncHolder.isReady());
+    asyncHolder.close();
+    Assert.assertTrue("closing the join holder before it's ready cancels the base load", base.canceled.get());
+  }
+
+ /**
+ * Counts the rows of the cursor obtained from {@code holder.asCursor()} by advancing it until it reports done.
+ * The holder is not closed here; callers in this class wrap it in try-with-resources.
+ */
+ private static int countRows(CursorHolder holder)
+ {
+ final Cursor cursor = holder.asCursor();
+ int count = 0;
+ while (!cursor.isDone()) {
+ cursor.advance();
+ count++;
+ }
+ return count;
+ }
+
@Test
public void test_makeCursor_factToCountryLeftUsingLookup()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]