This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7203786e06 Reduce array copying for non-equi join condition evaluation (#16152)
7203786e06 is described below

commit 7203786e062f908a66263bba0cad5bca8480c569
Author: Song Fu <[email protected]>
AuthorDate: Thu Jun 19 10:02:25 2025 -0700

    Reduce array copying for non-equi join condition evaluation (#16152)
---
 .../query/runtime/operator/BaseJoinOperator.java   | 109 ++++++++++++++++++++-
 .../query/runtime/operator/HashJoinOperator.java   |  13 +--
 .../runtime/operator/NonEquiJoinOperator.java      |   7 +-
 .../runtime/operator/operands/FilterOperand.java   |  16 +--
 .../runtime/operator/operands/FunctionOperand.java |   2 +-
 .../runtime/operator/operands/LiteralOperand.java  |   3 +-
 .../operator/operands/ReferenceOperand.java        |   5 +-
 .../operator/operands/TransformOperand.java        |   8 +-
 8 files changed, 139 insertions(+), 24 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
index 011c58c077..52d46fa986 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseJoinOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator;
 
+import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -271,7 +272,11 @@ public abstract class BaseJoinOperator extends 
MultiStageOperator {
     return resultRow;
   }
 
-  protected boolean matchNonEquiConditions(Object[] row) {
+  protected List<Object> joinRowView(@Nullable Object[] leftRow, @Nullable 
Object[] rightRow) {
+    return JoinedRowView.of(leftRow, rightRow, _resultColumnSize, 
_leftColumnSize);
+  }
+
+  protected boolean matchNonEquiConditions(List<Object> row) {
     if (_nonEquiEvaluators.isEmpty()) {
       return true;
     }
@@ -376,4 +381,106 @@ public abstract class BaseJoinOperator extends 
MultiStageOperator {
       return _type;
     }
   }
+
+  /**
+   * This util class is a view over the left and right row joined together
+   * currently this is used for filtering and input of projection. So if the 
joined
+   * tuple doesn't pass the predicate, the join result is not materialized 
into Object[].
+   *
+   * It is debatable whether we always want to use this instead of copying the 
tuple
+   */
+  private abstract static class JoinedRowView extends AbstractList<Object> 
implements List<Object> {
+    protected final int _leftSize;
+    protected final int _size;
+
+    protected JoinedRowView(int resultColumnSize, int leftSize) {
+      _leftSize = leftSize;
+      _size = resultColumnSize;
+    }
+
+    private static final class BothNotNullView extends JoinedRowView {
+      private final Object[] _leftRow;
+      private final Object[] _rightRow;
+
+      private BothNotNullView(Object[] leftRow, Object[] rightRow, int 
resultColumnSize, int leftSize) {
+        super(resultColumnSize, leftSize);
+        _leftRow = leftRow;
+        _rightRow = rightRow;
+      }
+
+      @Override
+      public Object get(int i) {
+        return i < _leftSize ? _leftRow[i] : _rightRow[i - _leftSize];
+      }
+
+      @Override
+      public Object[] toArray() {
+        Object[] resultRow = new Object[_size];
+        System.arraycopy(_leftRow, 0, resultRow, 0, _leftSize);
+        System.arraycopy(_rightRow, 0, resultRow, _leftSize, _rightRow.length);
+        return resultRow;
+      }
+    }
+
+    private static final class RightNotNullView extends JoinedRowView {
+      private final Object[] _rightRow;
+
+      public RightNotNullView(Object[] rightRow, int resultColumnSize, int 
leftSize) {
+        super(resultColumnSize, leftSize);
+        _rightRow = rightRow;
+      }
+
+      @Override
+      public Object get(int i) {
+        return i < _leftSize ? null : _rightRow[i - _leftSize];
+      }
+
+      @Override
+      public Object[] toArray() {
+        Object[] resultRow = new Object[_size];
+        System.arraycopy(_rightRow, 0, resultRow, _leftSize, _rightRow.length);
+        return resultRow;
+      }
+    }
+
+    private static final class LeftNotNullView extends JoinedRowView {
+      private final Object[] _leftRow;
+
+      public LeftNotNullView(Object[] leftRow, int resultColumnSize, int 
leftSize) {
+        super(resultColumnSize, leftSize);
+        _leftRow = leftRow;
+      }
+
+      @Override
+      public Object get(int i) {
+        return i < _leftSize ? _leftRow[i] : null;
+      }
+
+      @Override
+      public Object[] toArray() {
+        Object[] resultRow = new Object[_size];
+        System.arraycopy(_leftRow, 0, resultRow, 0, _leftSize);
+        return resultRow;
+      }
+    }
+
+    public static JoinedRowView of(@Nullable Object[] leftRow, @Nullable 
Object[] rightRow, int resultColumnSize,
+        int leftSize) {
+      if (leftRow == null && rightRow == null) {
+        throw new IllegalStateException("both left and right side of join are 
null");
+      }
+      if (leftRow == null) {
+        return new RightNotNullView(rightRow, resultColumnSize, leftSize);
+      }
+      if (rightRow == null) {
+        return new LeftNotNullView(leftRow, resultColumnSize, leftSize);
+      }
+      return new BothNotNullView(leftRow, rightRow, resultColumnSize, 
leftSize);
+    }
+
+    @Override
+    public int size() {
+      return _size;
+    }
+  }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index 9ad75c420f..ebacafa718 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -143,12 +143,13 @@ public class HashJoinOperator extends BaseJoinOperator {
       if (rightRow == null) {
         handleUnmatchedLeftRow(leftRow, rows);
       } else {
-        Object[] resultRow = joinRow(leftRow, rightRow);
-        if (matchNonEquiConditions(resultRow)) {
+        List<Object> resultRowView = joinRowView(leftRow, rightRow);
+        if (matchNonEquiConditions(resultRowView)) {
           if (isMaxRowsLimitReached(rows.size())) {
             break;
           }
-          rows.add(resultRow);
+          // defer copying of the content until row matches
+          rows.add(resultRowView.toArray());
           if (_matchedRightRows != null) {
             _matchedRightRows.put(key, BIT_SET_PLACEHOLDER);
           }
@@ -176,13 +177,13 @@ public class HashJoinOperator extends BaseJoinOperator {
         boolean hasMatchForLeftRow = false;
         int numRightRows = rightRows.size();
         for (int i = 0; i < numRightRows; i++) {
-          Object[] resultRow = joinRow(leftRow, rightRows.get(i));
-          if (matchNonEquiConditions(resultRow)) {
+          List<Object> resultRowView = joinRowView(leftRow, rightRows.get(i));
+          if (matchNonEquiConditions(resultRowView)) {
             if (isMaxRowsLimitReached(rows.size())) {
               maxRowsLimitReached = true;
               break;
             }
-            rows.add(resultRow);
+            rows.add(resultRowView.toArray());
             hasMatchForLeftRow = true;
             if (_matchedRightRows != null) {
               _matchedRightRows.computeIfAbsent(key, k -> new 
BitSet(numRightRows)).set(i);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
index 625b92fa11..638d4d9880 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
@@ -84,14 +84,13 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
       boolean maxRowsLimitReached = false;
       for (int i = 0; i < numRightRows; i++) {
         Object[] rightRow = _rightTable.get(i);
-        // TODO: Optimize this to avoid unnecessary object copy.
-        Object[] resultRow = joinRow(leftRow, rightRow);
-        if (matchNonEquiConditions(resultRow)) {
+        List<Object> joinRowView = joinRowView(leftRow, rightRow);
+        if (matchNonEquiConditions(joinRowView)) {
           if (isMaxRowsLimitReached(rows.size())) {
             maxRowsLimitReached = true;
             break;
           }
-          rows.add(resultRow);
+          rows.add(joinRowView.toArray());
           hasMatchForLeftRow = true;
           if (_matchedRightRows != null) {
             _matchedRightRows.set(i);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
index 46fc0845de..2d4b438e75 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FilterOperand.java
@@ -41,7 +41,7 @@ public abstract class FilterOperand implements 
TransformOperand {
 
   @Nullable
   @Override
-  public abstract Integer apply(Object[] row);
+  public abstract Integer apply(List<Object> row);
 
   public static class And extends FilterOperand {
     List<TransformOperand> _childOperands;
@@ -55,7 +55,7 @@ public abstract class FilterOperand implements 
TransformOperand {
 
     @Nullable
     @Override
-    public Integer apply(Object[] row) {
+    public Integer apply(List<Object> row) {
       boolean hasNull = false;
       for (TransformOperand child : _childOperands) {
         Object result = child.apply(row);
@@ -81,7 +81,7 @@ public abstract class FilterOperand implements 
TransformOperand {
 
     @Nullable
     @Override
-    public Integer apply(Object[] row) {
+    public Integer apply(List<Object> row) {
       boolean hasNull = false;
       for (TransformOperand child : _childOperands) {
         Object result = child.apply(row);
@@ -104,7 +104,7 @@ public abstract class FilterOperand implements 
TransformOperand {
 
     @Nullable
     @Override
-    public Integer apply(Object[] row) {
+    public Integer apply(List<Object> row) {
       Object result = _childOperand.apply(row);
       return result != null ? 1 - (int) result : null;
     }
@@ -124,7 +124,7 @@ public abstract class FilterOperand implements 
TransformOperand {
 
     @Nullable
     @Override
-    public Integer apply(Object[] row) {
+    public Integer apply(List<Object> row) {
       Object firstResult = _childOperands.get(0).apply(row);
       if (firstResult == null) {
         return null;
@@ -150,7 +150,7 @@ public abstract class FilterOperand implements 
TransformOperand {
     }
 
     @Override
-    public Integer apply(Object[] row) {
+    public Integer apply(List<Object> row) {
       Object result = _childOperand.apply(row);
       return result != null ? (Integer) result : 0;
     }
@@ -164,7 +164,7 @@ public abstract class FilterOperand implements 
TransformOperand {
     }
 
     @Override
-    public Integer apply(Object[] row) {
+    public Integer apply(List<Object> row) {
       Object result = _childOperand.apply(row);
       return result != null ? 1 - (int) result : 1;
     }
@@ -216,7 +216,7 @@ public abstract class FilterOperand implements 
TransformOperand {
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Nullable
     @Override
-    public Integer apply(Object[] row) {
+    public Integer apply(List<Object> row) {
       Comparable v1 = (Comparable) _lhs.apply(row);
       if (v1 == null) {
         return null;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
index 99feb415b3..5e58c0a3a6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/FunctionOperand.java
@@ -100,7 +100,7 @@ public class FunctionOperand implements TransformOperand {
 
   @Nullable
   @Override
-  public Object apply(Object[] row) {
+  public Object apply(List<Object> row) {
     for (int i = 0; i < _operands.size(); i++) {
       TransformOperand operand = _operands.get(i);
       Object value = operand.apply(row);
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/LiteralOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/LiteralOperand.java
index 16cc4f64a0..00a7f2470b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/LiteralOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/LiteralOperand.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator.operands;
 
+import java.util.List;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.query.planner.logical.RexExpression;
 
@@ -37,7 +38,7 @@ public class LiteralOperand implements TransformOperand {
   }
 
   @Override
-  public Object apply(Object[] row) {
+  public Object apply(List<Object> row) {
     return _value;
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/ReferenceOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/ReferenceOperand.java
index e0e5ba047b..a183af92cf 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/ReferenceOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/ReferenceOperand.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.runtime.operator.operands;
 
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -39,7 +40,7 @@ public class ReferenceOperand implements TransformOperand {
 
   @Nullable
   @Override
-  public Object apply(Object[] row) {
-    return row[_index];
+  public Object apply(List<Object> row) {
+    return row.get(_index);
   }
 }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
index e835e5b07a..76697116bd 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/operands/TransformOperand.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.query.runtime.operator.operands;
 
+import java.util.Arrays;
+import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 
@@ -26,6 +28,10 @@ public interface TransformOperand {
 
   ColumnDataType getResultType();
 
+  default Object apply(Object[] row) {
+    return apply(Arrays.asList(row));
+  }
+
   @Nullable
-  Object apply(Object[] row);
+  Object apply(List<Object> row);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to