This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bf60b0b892 Fix null join key matching in multi-stage hash joins 
(#16038)
bf60b0b892 is described below

commit bf60b0b892b3555795cedb5897b905a748199403
Author: Soupam <[email protected]>
AuthorDate: Sat Jun 21 01:45:16 2025 +0530

    Fix null join key matching in multi-stage hash joins (#16038)
    
    * Fix JOIN to correctly handle null join keys
    
    Fix JOIN to correctly handle null join keys
    
    Addressed the code comments
    
    * Addressed the code comments and refactored the code
    
    * Linter fix and added test for h2 db
    
    * Delegated the test on H2
    
    * Fixed the unit test
    
    ---------
    
    Co-authored-by: Soupam <[email protected]>
---
 .../query/runtime/operator/HashJoinOperator.java   |  68 ++++-
 .../runtime/operator/join/ObjectLookupTable.java   |  10 +
 .../operator/join/PrimitiveLookupTable.java        |  37 +--
 .../runtime/operator/HashJoinOperatorTest.java     | 323 +++++++++++++++++++++
 .../src/test/resources/queries/LeftAntiJoins.json  |  75 +++++
 5 files changed, 479 insertions(+), 34 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ebacafa718..525c426ed6 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -25,7 +25,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.calcite.rel.core.JoinRelType;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.query.planner.partitioning.KeySelector;
 import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
 import org.apache.pinot.query.planner.plannode.JoinNode;
@@ -59,6 +61,9 @@ public class HashJoinOperator extends BaseJoinOperator {
   // TODO: Optimize this
   @Nullable
   private Map<Object, BitSet> _matchedRightRows;
+  // Store null key rows separately for RIGHT and FULL JOINs
+  @Nullable
+  private List<Object[]> _nullKeyRightRows;
 
   public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator 
leftInput, DataSchema leftSchema,
       MultiStageOperator rightInput, JoinNode node) {
@@ -69,6 +74,8 @@ public class HashJoinOperator extends BaseJoinOperator {
     _rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys());
     _rightTable = createLookupTable(leftKeys, leftSchema);
     _matchedRightRows = needUnmatchedRightRows() ? new HashMap<>() : null;
+    // Initialize _nullKeyRightRows for both RIGHT and FULL JOINs
+    _nullKeyRightRows = needUnmatchedRightRows() ? new ArrayList<>() : null;
   }
 
   private static LookupTable createLookupTable(List<Integer> joinKeys, 
DataSchema schema) {
@@ -98,10 +105,41 @@ public class HashJoinOperator extends BaseJoinOperator {
   protected void addRowsToRightTable(List<Object[]> rows) {
     assert _rightTable != null : "Right table should not be null when adding 
rows";
     for (Object[] row : rows) {
-      _rightTable.addRow(_rightKeySelector.getKey(row), row);
+      Object key = _rightKeySelector.getKey(row);
+      // Skip rows with null join keys - they should not participate in 
equi-joins per SQL standard
+      if (isNullKey(key)) {
+        // For RIGHT and FULL JOIN, we need to preserve null key rows for the 
final output
+        if (_nullKeyRightRows != null) {
+          _nullKeyRightRows.add(row);
+        }
+        continue;
+      }
+      _rightTable.addRow(key, row);
+    }
+  }
+
+  /**
+   * Check if a join key contains null values. In SQL standard, null keys 
should not match in equi-joins.
+   **/
+  private boolean isNullKey(Object key) {
+    if (key == null) {
+      return true;
     }
+    if (key instanceof Key) {
+      Object[] components = ((Key) key).getValues();
+      for (Object comp : components) {
+        if (comp == null) {
+          return true;
+        }
+      }
+      return false;
+    }
+    // For single keys (non-composite), key == null is already checked above
+    return false;
   }
 
+
+
   @Override
   protected void finishBuildingRightTable() {
     assert _rightTable != null : "Right table should not be null when 
finishing building";
@@ -112,6 +150,7 @@ public class HashJoinOperator extends BaseJoinOperator {
   protected void onEosProduced() {
     _rightTable = null;
     _matchedRightRows = null;
+    _nullKeyRightRows = null;
   }
 
   @Override
@@ -132,6 +171,17 @@ public class HashJoinOperator extends BaseJoinOperator {
     }
   }
 
+  private boolean handleNullKey(Object key, Object[] leftRow, List<Object[]> 
rows) {
+    if (isNullKey(key)) {
+      // Only LEFT and FULL joins emit the unmatched left row; for other join types (e.g. INNER) it is dropped
+      if (_joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL) {
+        handleUnmatchedLeftRow(leftRow, rows);
+      }
+      return true;
+    }
+    return false;
+  }
+
   private List<Object[]> buildJoinedDataBlockUniqueKeys(MseBlock.Data 
leftBlock) {
     assert _rightTable != null : "Right table should not be null when building 
joined rows";
     List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
@@ -139,6 +189,10 @@ public class HashJoinOperator extends BaseJoinOperator {
 
     for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
+      // Skip rows with null join keys - they should not participate in 
equi-joins per SQL standard
+      if (handleNullKey(key, leftRow, rows)) {
+        continue;
+      }
       Object[] rightRow = (Object[]) _rightTable.lookup(key);
       if (rightRow == null) {
         handleUnmatchedLeftRow(leftRow, rows);
@@ -169,6 +223,10 @@ public class HashJoinOperator extends BaseJoinOperator {
 
     for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
+      // Skip rows with null join keys - they should not participate in 
equi-joins per SQL standard
+      if (handleNullKey(key, leftRow, rows)) {
+        continue;
+      }
       List<Object[]> rightRows = (List<Object[]>) _rightTable.lookup(key);
       if (rightRows == null) {
         handleUnmatchedLeftRow(leftRow, rows);
@@ -218,7 +276,6 @@ public class HashJoinOperator extends BaseJoinOperator {
 
     for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
-      // SEMI-JOIN only checks existence of the key
       if (_rightTable.containsKey(key)) {
         rows.add(leftRow);
       }
@@ -234,7 +291,6 @@ public class HashJoinOperator extends BaseJoinOperator {
 
     for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
-      // ANTI-JOIN only checks non-existence of the key
       if (!_rightTable.containsKey(key)) {
         rows.add(leftRow);
       }
@@ -272,6 +328,12 @@ public class HashJoinOperator extends BaseJoinOperator {
         }
       }
     }
+    // Add unmatched null key rows from right side for RIGHT and FULL JOIN
+    if (_nullKeyRightRows != null) {
+      for (Object[] nullKeyRow : _nullKeyRightRows) {
+        rows.add(joinRow(null, nullKeyRow));
+      }
+    }
     return rows;
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
index 02b00dfd3f..d94ac1cb84 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
@@ -33,6 +33,10 @@ public class ObjectLookupTable extends LookupTable {
 
   @Override
   public void addRow(@Nullable Object key, Object[] row) {
+    if (key == null) {
+      // Ignore null keys for SQL semantics
+      return;
+    }
     _lookupTable.compute(key, (k, v) -> computeNewValue(row, v));
   }
 
@@ -47,12 +51,18 @@ public class ObjectLookupTable extends LookupTable {
 
   @Override
   public boolean containsKey(@Nullable Object key) {
+    if (key == null) {
+      return false;  // Null keys are not contained per SQL semantics
+    }
     return _lookupTable.containsKey(key);
   }
 
   @Nullable
   @Override
   public Object lookup(@Nullable Object key) {
+    if (key == null) {
+      return null;  // Null keys always return null
+    }
     return _lookupTable.get(key);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/PrimitiveLookupTable.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/PrimitiveLookupTable.java
index af5c01e063..43e96ea9b7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/PrimitiveLookupTable.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/PrimitiveLookupTable.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.query.runtime.operator.join;
 
-import com.google.common.collect.Sets;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -26,12 +25,10 @@ import javax.annotation.Nullable;
 
 public abstract class PrimitiveLookupTable extends LookupTable {
 
-  private Object _valueForNullKey;
-
   @Override
   public void addRow(@Nullable Object key, Object[] row) {
     if (key == null) {
-      _valueForNullKey = computeNewValue(row, _valueForNullKey);
+      // Do nothing: SQL semantics ignore null keys
       return;
     }
     addRowNotNullKey(key, row);
@@ -40,9 +37,6 @@ public abstract class PrimitiveLookupTable extends 
LookupTable {
   @Override
   public void finish() {
     if (!_keysUnique) {
-      if (_valueForNullKey != null) {
-        _valueForNullKey = convertValueToList(_valueForNullKey);
-      }
       finishNotNullKey();
     }
   }
@@ -54,7 +48,8 @@ public abstract class PrimitiveLookupTable extends 
LookupTable {
   @Override
   public boolean containsKey(@Nullable Object key) {
     if (key == null) {
-      return _valueForNullKey != null;
+      // SQL semantics: null never matches
+      return false;
     }
     return containsNotNullKey(key);
   }
@@ -65,7 +60,8 @@ public abstract class PrimitiveLookupTable extends 
LookupTable {
   @Override
   public Object lookup(@Nullable Object key) {
     if (key == null) {
-      return _valueForNullKey;
+      // SQL semantics: null lookup returns null
+      return null;
     }
     return lookupNotNullKey(key);
   }
@@ -75,28 +71,7 @@ public abstract class PrimitiveLookupTable extends 
LookupTable {
   @SuppressWarnings("rawtypes")
   @Override
   public Set<Map.Entry<Object, Object>> entrySet() {
-    Set<Map.Entry<Object, Object>> notNullSet = notNullKeyEntrySet();
-    if (_valueForNullKey != null) {
-      Set<Map.Entry<Object, Object>> nullEntry = Set.of(new Map.Entry<>() {
-        @Override
-        public Object getKey() {
-          return null;
-        }
-
-        @Override
-        public Object getValue() {
-          return _valueForNullKey;
-        }
-
-        @Override
-        public Object setValue(Object value) {
-          throw new UnsupportedOperationException();
-        }
-      });
-      return Sets.union(notNullSet, nullEntry);
-    } else {
-      return notNullSet;
-    }
+    return notNullKeyEntrySet();
   }
 
   protected abstract Set<Map.Entry<Object, Object>> notNullKeyEntrySet();
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index fa55c8f230..4c101b1734 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -42,6 +42,7 @@ import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.openMocks;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
 
 
 public class HashJoinOperatorTest {
@@ -442,6 +443,328 @@ public class HashJoinOperatorTest {
         "Max rows in join should be reached");
   }
 
+  @Test
+  public void shouldHandleHashJoinKeyCollisionLeftJoinWithNulls() {
+    // Test LEFT join with both hash collision AND null values
+    _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(1, "Aa")    // Hash collision string
+            .addRow(2, "BB")    // Hash collision string
+            .addRow(3, null)    // Null key
+            .addRow(4, "CC")    // Non-collision string
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(2, "Aa")    // Hash collision match
+            .addRow(2, "BB")    // Hash collision match
+            .addRow(3, null)    // Null key - should NOT match left null
+            .addRow(5, "DD")    // No match in left
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.INT, ColumnDataType.STRING});
+
+    HashJoinOperator operator = getOperator(resultSchema, JoinRelType.LEFT, 
List.of(1), List.of(1), List.of());
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    assertEquals(resultRows.size(), 4);
+    assertEquals(resultRows.get(0), new Object[]{1, "Aa", 2, "Aa"});     // 
Hash collision match
+    assertEquals(resultRows.get(1), new Object[]{2, "BB", 2, "BB"});     // 
Hash collision match
+    assertEquals(resultRows.get(2), new Object[]{3, null, null, null});  // 
Left null preserved, no match
+    assertEquals(resultRows.get(3), new Object[]{4, "CC", null, null});  // 
Left unmatched preserved
+  }
+
+  @Test
+  public void shouldHandleRightJoinWithNulls() {
+    _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(1, "Aa")
+            .addRow(2, null)
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(2, "Aa")
+            .addRow(3, null)
+            .addRow(4, "BB")
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.INT, ColumnDataType.STRING});
+
+    HashJoinOperator operator = getOperator(resultSchema, JoinRelType.RIGHT, 
List.of(1), List.of(1), List.of());
+
+    // First block: only non-null match
+    List<Object[]> resultRows1 = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+    assertEquals(1, resultRows1.size());
+    assertTrue(containsRow(resultRows1, new Object[]{1, "Aa", 2, "Aa"}));
+
+    // Second block: unmatched right rows
+    List<Object[]> resultRows2 = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+    assertEquals(2, resultRows2.size());
+    assertTrue(containsRow(resultRows2, new Object[]{null, null, 3, null}));
+    assertTrue(containsRow(resultRows2, new Object[]{null, null, 4, "BB"}));
+
+    // Third block should be EOS
+    assertTrue(operator.nextBlock().isSuccess());
+  }
+
+  @Test
+  public void shouldHandleFullJoinWithNulls() {
+    _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(1, "Aa")
+            .addRow(2, null)
+            .addRow(4, "CC")
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(2, "Aa")
+            .addRow(2, null)
+            .addRow(3, "BB")
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.INT, ColumnDataType.STRING});
+
+    HashJoinOperator operator = getOperator(resultSchema, JoinRelType.FULL, 
List.of(1), List.of(1), List.of());
+
+    // First block
+    List<Object[]> resultRows1 = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+    assertEquals(3, resultRows1.size());
+
+    assertTrue(containsRow(resultRows1, new Object[]{1, "Aa", 2, "Aa"}));   // 
Match
+    assertTrue(containsRow(resultRows1, new Object[]{2, null, null, null})); 
// Left null unmatched
+    assertTrue(containsRow(resultRows1, new Object[]{4, "CC", null, null})); 
// Left unmatched
+
+    // Second block
+    List<Object[]> resultRows2 = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+    assertEquals(2, resultRows2.size());
+
+    assertTrue(containsRow(resultRows2, new Object[]{null, null, 2, null})); 
// Right null unmatched
+    assertTrue(containsRow(resultRows2, new Object[]{null, null, 3, "BB"})); 
// Right unmatched
+  }
+
+  private boolean containsRow(List<Object[]> rows, Object[] expectedRow) {
+    for (Object[] row : rows) {
+      if (java.util.Arrays.equals(row, expectedRow)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+
+  @Test
+  public void shouldHandleSemiJoinWithNulls() {
+    // Test SEMI join - should not match null keys
+    _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(1, "Aa")
+            .addRow(2, null)    // Null key
+            .addRow(4, "CC")    // No match in right
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(2, "Aa")    // Match for left row 1
+            .addRow(3, null)    // Null - should NOT match left null
+            .addRow(5, "BB")    // No match in left
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING});
+
+    HashJoinOperator operator = getOperator(resultSchema, JoinRelType.SEMI, 
List.of(1), List.of(1), List.of());
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    assertEquals(resultRows.size(), 1);
+    assertEquals(resultRows.get(0), new Object[]{1, "Aa"}); // Only non-null 
match
+  }
+
+  @Test
+  public void shouldHandleAntiJoinWithNulls() {
+    // Test ANTI join - null keys should be preserved (not matched)
+    _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(1, "Aa")    // Has match in right
+            .addRow(2, null)    // Null key - no match
+            .addRow(4, "CC")    // No match in right
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+            .addRow(2, "Aa")    // Match for left row 1
+            .addRow(3, null)    // Null - should NOT match left null
+            .addRow(5, "BB")    // No match in left
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING});
+
+    HashJoinOperator operator = getOperator(resultSchema, JoinRelType.ANTI, 
List.of(1), List.of(1), List.of());
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    assertEquals(resultRows.size(), 2);
+    assertEquals(resultRows.get(0), new Object[]{2, null}); // Left null 
preserved (no match)
+    assertEquals(resultRows.get(1), new Object[]{4, "CC"}); // Left unmatched 
preserved
+  }
+
+  @Test
+  public void shouldHandleCompositeKeyWithNullValues() {
+    // Test composite key join (multi-column) with null values
+    // Regression guard: isNullKey must inspect the components of a composite Key, not check for Object[]
+
+    DataSchema compositeSchema = new DataSchema(
+            new String[]{"int_col", "string_col", "double_col"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE});
+
+    _leftInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+            .addRow(1, "Aa", 1.0)      // Normal row
+            .addRow(2, null, 2.0)      // Null in second key component
+            .addRow(3, "Cc", null)     // Null in third key component
+            .addRow(4, "Dd", 4.0)      // Normal row
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+            .addRow(1, "Aa", 1.0)      // Match for first left row
+            .addRow(2, null, 2.0)      // Should NOT match left null (SQL 
standard)
+            .addRow(3, "Cc", null)     // Should NOT match left null (SQL 
standard)
+            .addRow(5, "Ee", 5.0)      // No match in left
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1", "double_col1", "int_col2", 
"string_col2", "double_col2"},
+            new ColumnDataType[]{
+                    ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE,
+                    ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE
+            });
+
+    // Composite key join on columns 1 and 2 (string_col and double_col)
+    HashJoinOperator operator = getOperator(compositeSchema, resultSchema, 
JoinRelType.LEFT,
+            List.of(1, 2), List.of(1, 2), List.of(), PlanNode.NodeHint.EMPTY);
+
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    // Expected behavior per SQL standard:
+    // - Row 1: (1, "Aa", 1.0) should match (1, "Aa", 1.0)
+    // - Row 2: (2, null, 2.0) should NOT match (2, null, 2.0) -> left 
preserved with nulls
+    // - Row 3: (3, "Cc", null) should NOT match (3, "Cc", null) -> left 
preserved with nulls
+    // - Row 4: (4, "Dd", 4.0) has no match -> left preserved with nulls
+
+    assertEquals(resultRows.size(), 4);
+    assertEquals(resultRows.get(0), new Object[]{1, "Aa", 1.0, 1, "Aa", 1.0}); 
     // Match
+    assertEquals(resultRows.get(1), new Object[]{2, null, 2.0, null, null, 
null});  // Left null preserved
+    assertEquals(resultRows.get(2), new Object[]{3, "Cc", null, null, null, 
null}); // Left null preserved
+    assertEquals(resultRows.get(3), new Object[]{4, "Dd", 4.0, null, null, 
null});  // Left unmatched preserved
+  }
+
+  @Test
+  public void shouldHandleCompositeKeyInnerJoinWithNulls() {
+    // Test that composite keys with nulls are properly excluded from INNER 
join
+
+    DataSchema compositeSchema = new DataSchema(
+            new String[]{"int_col", "string_col", "double_col"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE});
+
+    _leftInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+            .addRow(1, "Aa", 1.0)      // Should match
+            .addRow(2, null, 2.0)      // Should be excluded (null key)
+            .addRow(3, "Cc", 3.0)      // No match in right
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+            .addRow(1, "Aa", 1.0)      // Match
+            .addRow(2, null, 2.0)      // Should be excluded (null key)
+            .addRow(4, "Dd", 4.0)      // No match in left
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1", "double_col1", "int_col2", 
"string_col2", "double_col2"},
+            new ColumnDataType[]{
+                    ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE,
+                    ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE
+            });
+
+    // Composite key join on columns 1 and 2 (string_col and double_col)
+    HashJoinOperator operator = getOperator(compositeSchema, resultSchema, 
JoinRelType.INNER,
+            List.of(1, 2), List.of(1, 2), List.of(), PlanNode.NodeHint.EMPTY);
+
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+    // Only the non-null key match should be returned
+    assertEquals(resultRows.size(), 1);
+    assertArrayEquals(resultRows.get(0), new Object[]{1, "Aa", 1.0, 1, "Aa", 
1.0});
+  }
+
+  @Test
+  public void shouldHandleCompositeKeySemiJoinWithNulls() {
+    // Test that SEMI join properly handles composite keys with nulls
+
+    DataSchema compositeSchema = new DataSchema(
+            new String[]{"int_col", "string_col", "double_col"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE});
+
+    _leftInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+            .addRow(1, "Aa", 1.0)      // Should match
+            .addRow(2, null, 2.0)      // Should be excluded (null key)
+            .addRow(3, "Cc", 3.0)      // No match in right
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+            .addRow(1, "Aa", 1.0)      // Match
+            .addRow(2, null, 2.0)      // Should be excluded (null key)
+            .addRow(4, "Dd", 4.0)      // No match in left
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1", "double_col1"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE});
+
+    // Composite key join on columns 1 and 2 (string_col and double_col)
+    HashJoinOperator operator = getOperator(resultSchema, JoinRelType.SEMI,
+            List.of(1, 2), List.of(1, 2), List.of());
+
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    // Only left rows with non-null keys that have matches should be returned
+    assertEquals(resultRows.size(), 1);
+    assertEquals(resultRows.get(0), new Object[]{1, "Aa", 1.0});
+  }
+
+  @Test
+  public void shouldHandleCompositeKeyAntiJoinWithNulls() {
+    // Test that ANTI join properly handles composite keys with nulls
+    // Per SQL standard, rows with null keys should be included in ANTI join 
result
+
+    DataSchema compositeSchema = new DataSchema(
+            new String[]{"int_col", "string_col", "double_col"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE});
+
+    _leftInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+            .addRow(1, "Aa", 1.0)      // Has match in right
+            .addRow(2, null, 2.0)      // Null key - should be included
+            .addRow(3, "Cc", 3.0)      // No match in right
+            .buildWithEos();
+
+    _rightInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+            .addRow(1, "Aa", 1.0)      // Match for left row 1
+            .addRow(2, null, 2.0)      // Null key - should not match left null
+            .addRow(4, "Dd", 4.0)      // No match in left
+            .buildWithEos();
+
+    DataSchema resultSchema = new DataSchema(
+            new String[]{"int_col1", "string_col1", "double_col1"},
+            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING, 
ColumnDataType.DOUBLE});
+
+    // Composite key join on columns 1 and 2 (string_col and double_col)
+    HashJoinOperator operator = getOperator(resultSchema, JoinRelType.ANTI,
+            List.of(1, 2), List.of(1, 2), List.of());
+
+    List<Object[]> resultRows = ((MseBlock.Data) 
operator.nextBlock()).asRowHeap().getRows();
+
+    // Left rows with null keys and unmatched non-null keys should be returned
+    assertEquals(resultRows.size(), 2);
+    assertTrue(containsRow(resultRows, new Object[]{2, null, 2.0}));  // Null 
key preserved
+    assertTrue(containsRow(resultRows, new Object[]{3, "Cc", 3.0}));  // 
Unmatched preserved
+  }
+
   private HashJoinOperator getOperator(DataSchema leftSchema, DataSchema 
resultSchema, JoinRelType joinType,
       List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression> 
nonEquiConditions,
       PlanNode.NodeHint nodeHint) {
diff --git a/pinot-query-runtime/src/test/resources/queries/LeftAntiJoins.json 
b/pinot-query-runtime/src/test/resources/queries/LeftAntiJoins.json
new file mode 100644
index 0000000000..7eb319f991
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/LeftAntiJoins.json
@@ -0,0 +1,75 @@
+{
+  "left_join_null_filter_test": {
+    "extraProps": {
+      "enableColumnBasedNullHandling": true
+    },
+    "tables": {
+      "t1": {
+        "schema": [
+          {"name": "key_col", "type": "STRING", "notNull": false},
+          {"name": "event_time", "type": "INT", "notNull": false},
+          {"name": "nn_event_time", "type": "INT", "notNull": true}
+        ],
+        "inputs": [
+          ["a", 1, 1],
+          ["b", 2, 2],
+          ["c", 3, 3],
+          ["d", 4, 4],
+          ["e", 5, 5],
+          [null, null, 0],
+          ["f", null, 6]
+        ]
+      },
+      "t2": {
+        "schema": [
+          {"name": "key_col", "type": "STRING", "notNull": false},
+          {"name": "event_time", "type": "INT", "notNull": false},
+          {"name": "nn_event_time", "type": "INT", "notNull": true}
+        ],
+        "inputs": [
+          ["b", 2, 2],
+          ["a", 1, 1],
+          ["c", 3, 3],
+          ["a", 2, 2],
+          ["c", 1, 1],
+          ["b", 3, 3],
+          ["d", 5, 5],
+          [null, null, 0],
+          ["f", null, 6]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "LEFT JOIN with null filter on key_col and event_time 
comparison",
+        "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, 
{t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND 
{t1}.event_time > {t2}.event_time WHERE {t2}.key_col IS NULL",
+        "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN 
{t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time > {t2}.event_time WHERE 
{t2}.key_col IS NULL"
+      },
+      {
+        "description": "LEFT JOIN with null filter on key_col and event_time 
>= comparison",
+        "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, 
{t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND 
{t1}.event_time >= {t2}.event_time WHERE {t2}.key_col IS NULL",
+        "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN 
{t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time >= {t2}.event_time 
WHERE {t2}.key_col IS NULL"
+      },
+      {
+        "description": "LEFT JOIN with null filter on key_col and event_time < 
comparison",
+        "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, 
{t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND 
{t1}.event_time < {t2}.event_time WHERE {t2}.key_col IS NULL",
+        "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN 
{t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time < {t2}.event_time WHERE 
{t2}.key_col IS NULL"
+      },
+      {
+        "description": "LEFT JOIN with null filter on key_col and event_time 
<= comparison",
+        "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, 
{t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND 
{t1}.event_time <= {t2}.event_time WHERE {t2}.key_col IS NULL",
+        "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN 
{t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time <= {t2}.event_time 
WHERE {t2}.key_col IS NULL"
+      },
+      {
+        "description": "LEFT JOIN with null filter on key_col and non-nullable 
event_time comparison",
+        "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, 
{t1}.nn_event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND 
{t1}.nn_event_time > {t2}.nn_event_time WHERE {t2}.key_col IS NULL",
+        "h2Sql": "SELECT {t1}.key_col, {t1}.nn_event_time FROM {t1} LEFT JOIN 
{t2} ON {t1}.key_col = {t2}.key_col AND {t1}.nn_event_time > {t2}.nn_event_time 
WHERE {t2}.key_col IS NULL"
+      },
+      {
+        "description": "LEFT JOIN with null check on key_col",
+        "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, 
{t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col WHERE 
{t1}.key_col IS NULL",
+        "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN 
{t2} ON {t1}.key_col = {t2}.key_col WHERE {t1}.key_col IS NULL"
+      }
+    ]
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to