This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bf60b0b892 Fix null join key matching in multi-stage hash joins (#16038)
bf60b0b892 is described below
commit bf60b0b892b3555795cedb5897b905a748199403
Author: Soupam <[email protected]>
AuthorDate: Sat Jun 21 01:45:16 2025 +0530
Fix null join key matching in multi-stage hash joins (#16038)

* Fix JOIN to correctly handle null join keys
* Addressed the code review comments
* Addressed the code review comments and refactored the code
* Linter fix and added a test that is verified against H2
* Delegated the test to H2
* Fixed the unit tests
---------
Co-authored-by: Soupam <[email protected]>
---
.../query/runtime/operator/HashJoinOperator.java | 68 ++++-
.../runtime/operator/join/ObjectLookupTable.java | 10 +
.../operator/join/PrimitiveLookupTable.java | 37 +--
.../runtime/operator/HashJoinOperatorTest.java | 323 +++++++++++++++++++++
.../src/test/resources/queries/LeftAntiJoins.json | 75 +++++
5 files changed, 479 insertions(+), 34 deletions(-)
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index ebacafa718..525c426ed6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -25,7 +25,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.calcite.rel.core.JoinRelType;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
import org.apache.pinot.query.planner.plannode.JoinNode;
@@ -59,6 +61,9 @@ public class HashJoinOperator extends BaseJoinOperator {
// TODO: Optimize this
@Nullable
private Map<Object, BitSet> _matchedRightRows;
+ // Right-side rows whose join key is null or contains a null component. Such rows can never match in an
+ // equi-join, so they are kept out of _rightTable. The list is allocated only when unmatched right rows
+ // must be emitted (needUnmatchedRightRows(), i.e. RIGHT and FULL joins); otherwise it stays null.
+ // Its rows are appended to the unmatched-right output, and the reference is cleared in onEosProduced().
+ @Nullable
+ private List<Object[]> _nullKeyRightRows;
public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator
leftInput, DataSchema leftSchema,
MultiStageOperator rightInput, JoinNode node) {
@@ -69,6 +74,8 @@ public class HashJoinOperator extends BaseJoinOperator {
_rightKeySelector = KeySelectorFactory.getKeySelector(node.getRightKeys());
_rightTable = createLookupTable(leftKeys, leftSchema);
_matchedRightRows = needUnmatchedRightRows() ? new HashMap<>() : null;
+ // Initialize _nullKeyRightRows for both RIGHT and FULL JOINs
+ _nullKeyRightRows = needUnmatchedRightRows() ? new ArrayList<>() : null;
}
private static LookupTable createLookupTable(List<Integer> joinKeys,
DataSchema schema) {
@@ -98,10 +105,41 @@ public class HashJoinOperator extends BaseJoinOperator {
protected void addRowsToRightTable(List<Object[]> rows) {
+ // Build phase: index every right row by its join key, except rows whose key is null (or has a null
+ // component). Those rows are diverted to _nullKeyRightRows when it exists, and dropped otherwise.
assert _rightTable != null : "Right table should not be null when adding
rows";
for (Object[] row : rows) {
- _rightTable.addRow(_rightKeySelector.getKey(row), row);
+ Object key = _rightKeySelector.getKey(row);
+ // Skip rows with null join keys - they should not participate in
equi-joins per SQL standard
+ if (isNullKey(key)) {
+ // For RIGHT and FULL JOIN, we need to preserve null key rows for the
final output
+ if (_nullKeyRightRows != null) {
+ _nullKeyRightRows.add(row);
+ }
+ continue;
+ }
+ _rightTable.addRow(key, row);
+ }
+ }
+
+ /**
+ * Returns true if the join key is null or, for a composite {@link Key}, if any of its components is null.
+ * Under SQL semantics NULL = NULL is not true, so such a key can never match in an equi-join.
+ */
+ private boolean isNullKey(Object key) {
+ if (key == null) {
+ return true;
}
+ if (key instanceof Key) {
+ Object[] components = ((Key) key).getValues();
+ for (Object comp : components) {
+ if (comp == null) {
+ return true;
+ }
+ }
+ return false;
+ }
+ // For single keys (non-composite), key == null is already checked above
+ return false;
}
+
+
@Override
protected void finishBuildingRightTable() {
assert _rightTable != null : "Right table should not be null when
finishing building";
@@ -112,6 +150,7 @@ public class HashJoinOperator extends BaseJoinOperator {
protected void onEosProduced() {
+ // Once EOS has been produced, drop the references to the build-side state: the lookup table, the
+ // matched-row bitmaps and the null-key right rows.
_rightTable = null;
_matchedRightRows = null;
+ _nullKeyRightRows = null;
}
@Override
@@ -132,6 +171,17 @@ public class HashJoinOperator extends BaseJoinOperator {
}
}
+ /**
+ * Handles a probe-side (left) row whose join key is null or contains a null component. Such a row never
+ * matches any right row. For LEFT and FULL joins it is emitted as an unmatched left row; for every other
+ * join type nothing is added.
+ *
+ * @return true if the key was null and the row has been fully handled (the caller must skip the lookup),
+ * false if the key is non-null and the caller should proceed normally
+ */
+ private boolean handleNullKey(Object key, Object[] leftRow, List<Object[]>
rows) {
+ if (isNullKey(key)) {
+ // Only LEFT and FULL joins preserve unmatched left rows; for other join types (e.g. INNER, RIGHT) the row is dropped
+ if (_joinType == JoinRelType.LEFT || _joinType == JoinRelType.FULL) {
+ handleUnmatchedLeftRow(leftRow, rows);
+ }
+ return true;
+ }
+ return false;
+ }
+
private List<Object[]> buildJoinedDataBlockUniqueKeys(MseBlock.Data
leftBlock) {
assert _rightTable != null : "Right table should not be null when building
joined rows";
List<Object[]> leftRows = leftBlock.asRowHeap().getRows();
@@ -139,6 +189,10 @@ public class HashJoinOperator extends BaseJoinOperator {
for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
+ // Skip rows with null join keys - they should not participate in
equi-joins per SQL standard
+ if (handleNullKey(key, leftRow, rows)) {
+ continue;
+ }
Object[] rightRow = (Object[]) _rightTable.lookup(key);
if (rightRow == null) {
handleUnmatchedLeftRow(leftRow, rows);
@@ -169,6 +223,10 @@ public class HashJoinOperator extends BaseJoinOperator {
for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
+ // Skip rows with null join keys - they should not participate in
equi-joins per SQL standard
+ if (handleNullKey(key, leftRow, rows)) {
+ continue;
+ }
List<Object[]> rightRows = (List<Object[]>) _rightTable.lookup(key);
if (rightRows == null) {
handleUnmatchedLeftRow(leftRow, rows);
@@ -218,7 +276,6 @@ public class HashJoinOperator extends BaseJoinOperator {
for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
- // SEMI-JOIN only checks existence of the key
if (_rightTable.containsKey(key)) {
rows.add(leftRow);
}
@@ -234,7 +291,6 @@ public class HashJoinOperator extends BaseJoinOperator {
for (Object[] leftRow : leftRows) {
Object key = _leftKeySelector.getKey(leftRow);
- // ANTI-JOIN only checks non-existence of the key
if (!_rightTable.containsKey(key)) {
rows.add(leftRow);
}
@@ -272,6 +328,12 @@ public class HashJoinOperator extends BaseJoinOperator {
}
}
}
+ // Add unmatched null key rows from right side for RIGHT and FULL JOIN
+ if (_nullKeyRightRows != null) {
+ for (Object[] nullKeyRow : _nullKeyRightRows) {
+ rows.add(joinRow(null, nullKeyRow));
+ }
+ }
return rows;
}
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
index 02b00dfd3f..d94ac1cb84 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/ObjectLookupTable.java
@@ -33,6 +33,10 @@ public class ObjectLookupTable extends LookupTable {
@Override
public void addRow(@Nullable Object key, Object[] row) {
+ if (key == null) {
+ // A null key can never equal another key in an SQL equi-join, so it is not indexed at all
+ return;
+ }
_lookupTable.compute(key, (k, v) -> computeNewValue(row, v));
}
@@ -47,12 +51,18 @@ public class ObjectLookupTable extends LookupTable {
@Override
public boolean containsKey(@Nullable Object key) {
+ if (key == null) {
+ return false; // A null key never matches, and addRow never stores one
+ }
return _lookupTable.containsKey(key);
}
@Nullable
@Override
public Object lookup(@Nullable Object key) {
+ if (key == null) {
+ return null; // A null key never matches anything, so the lookup is always a miss
+ }
return _lookupTable.get(key);
}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/PrimitiveLookupTable.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/PrimitiveLookupTable.java
index af5c01e063..43e96ea9b7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/PrimitiveLookupTable.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/join/PrimitiveLookupTable.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.runtime.operator.join;
-import com.google.common.collect.Sets;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
@@ -26,12 +25,10 @@ import javax.annotation.Nullable;
public abstract class PrimitiveLookupTable extends LookupTable {
- private Object _valueForNullKey;
-
@Override
public void addRow(@Nullable Object key, Object[] row) {
if (key == null) {
- _valueForNullKey = computeNewValue(row, _valueForNullKey);
+ // Do nothing: SQL semantics ignore null keys
return;
}
addRowNotNullKey(key, row);
@@ -40,9 +37,6 @@ public abstract class PrimitiveLookupTable extends LookupTable {
@Override
public void finish() {
+ // addRow drops null keys, so there is no null-key value to convert; only the non-null entries are finished.
if (!_keysUnique) {
- if (_valueForNullKey != null) {
- _valueForNullKey = convertValueToList(_valueForNullKey);
- }
finishNotNullKey();
}
}
@@ -54,7 +48,8 @@ public abstract class PrimitiveLookupTable extends LookupTable {
@Override
public boolean containsKey(@Nullable Object key) {
if (key == null) {
- return _valueForNullKey != null;
+ // SQL semantics: a null key never equals any key, and addRow never stores one
+ return false;
}
return containsNotNullKey(key);
}
@@ -65,7 +60,8 @@ public abstract class PrimitiveLookupTable extends LookupTable {
@Override
public Object lookup(@Nullable Object key) {
if (key == null) {
- return _valueForNullKey;
+ // SQL semantics: a null key never matches, so the lookup is always a miss
+ return null;
}
return lookupNotNullKey(key);
}
@@ -75,28 +71,7 @@ public abstract class PrimitiveLookupTable extends LookupTable {
@SuppressWarnings("rawtypes")
@Override
public Set<Map.Entry<Object, Object>> entrySet() {
- Set<Map.Entry<Object, Object>> notNullSet = notNullKeyEntrySet();
- if (_valueForNullKey != null) {
- Set<Map.Entry<Object, Object>> nullEntry = Set.of(new Map.Entry<>() {
- @Override
- public Object getKey() {
- return null;
- }
-
- @Override
- public Object getValue() {
- return _valueForNullKey;
- }
-
- @Override
- public Object setValue(Object value) {
- throw new UnsupportedOperationException();
- }
- });
- return Sets.union(notNullSet, nullEntry);
- } else {
- return notNullSet;
- }
+ // addRow never stores null keys, so the entry set is exactly the non-null key entries.
+ // NOTE(review): no raw type remains in this body; @SuppressWarnings("rawtypes") may now be removable - confirm.
+ return notNullKeyEntrySet();
}
protected abstract Set<Map.Entry<Object, Object>> notNullKeyEntrySet();
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
index fa55c8f230..4c101b1734 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java
@@ -42,6 +42,7 @@ import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.openMocks;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.internal.junit.ArrayAsserts.assertArrayEquals;
public class HashJoinOperatorTest {
@@ -442,6 +443,328 @@ public class HashJoinOperatorTest {
"Max rows in join should be reached");
}
+ @Test
+ public void shouldHandleHashJoinKeyCollisionLeftJoinWithNulls() {
+ // Test LEFT join with both hash collision AND null values
+ // "Aa" and "BB" have the same String.hashCode() (2112), so equal hashes must still be told apart by equals().
+ _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(1, "Aa") // Hash collision string
+ .addRow(2, "BB") // Hash collision string
+ .addRow(3, null) // Null key
+ .addRow(4, "CC") // Non-collision string
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(2, "Aa") // Hash collision match
+ .addRow(2, "BB") // Hash collision match
+ .addRow(3, null) // Null key - should NOT match left null
+ .addRow(5, "DD") // No match in left
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.INT, ColumnDataType.STRING});
+
+ HashJoinOperator operator = getOperator(resultSchema, JoinRelType.LEFT,
List.of(1), List.of(1), List.of());
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // LEFT join keeps all four left rows in input order; the null-key left row gets null right-side columns
+ assertEquals(resultRows.size(), 4);
+ assertEquals(resultRows.get(0), new Object[]{1, "Aa", 2, "Aa"}); //
Hash collision match
+ assertEquals(resultRows.get(1), new Object[]{2, "BB", 2, "BB"}); //
Hash collision match
+ assertEquals(resultRows.get(2), new Object[]{3, null, null, null}); //
Left null preserved, no match
+ assertEquals(resultRows.get(3), new Object[]{4, "CC", null, null}); //
Left unmatched preserved
+ }
+
+ @Test
+ public void shouldHandleRightJoinWithNulls() {
+ // RIGHT join: the right null-key row never matches, so it is emitted as an unmatched right row,
+ // while the left null-key row is dropped
+ _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(1, "Aa")
+ .addRow(2, null)
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(2, "Aa")
+ .addRow(3, null)
+ .addRow(4, "BB")
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
+ ColumnDataType.INT, ColumnDataType.STRING});
+
+ HashJoinOperator operator = getOperator(resultSchema, JoinRelType.RIGHT,
+ List.of(1), List.of(1), List.of());
+
+ // First block: only non-null match. TestNG's assertEquals takes (actual, expected).
+ List<Object[]> resultRows1 = ((MseBlock.Data)
+ operator.nextBlock()).asRowHeap().getRows();
+ assertEquals(resultRows1.size(), 1);
+ assertTrue(containsRow(resultRows1, new Object[]{1, "Aa", 2, "Aa"}));
+
+ // Second block: unmatched right rows, including the right null-key row
+ List<Object[]> resultRows2 = ((MseBlock.Data)
+ operator.nextBlock()).asRowHeap().getRows();
+ assertEquals(resultRows2.size(), 2);
+ assertTrue(containsRow(resultRows2, new Object[]{null, null, 3, null}));
+ assertTrue(containsRow(resultRows2, new Object[]{null, null, 4, "BB"}));
+
+ // Third block should be EOS
+ assertTrue(operator.nextBlock().isSuccess());
+ }
+
+ @Test
+ public void shouldHandleFullJoinWithNulls() {
+ // FULL join: null keys on either side never match, so both null-key rows come back as unmatched rows
+ _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(1, "Aa")
+ .addRow(2, null)
+ .addRow(4, "CC")
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(2, "Aa")
+ .addRow(2, null)
+ .addRow(3, "BB")
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1", "int_col2", "string_col2"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
+ ColumnDataType.INT, ColumnDataType.STRING});
+
+ HashJoinOperator operator = getOperator(resultSchema, JoinRelType.FULL,
+ List.of(1), List.of(1), List.of());
+
+ // First block: the match plus unmatched left rows. TestNG's assertEquals takes (actual, expected).
+ List<Object[]> resultRows1 = ((MseBlock.Data)
+ operator.nextBlock()).asRowHeap().getRows();
+ assertEquals(resultRows1.size(), 3);
+
+ assertTrue(containsRow(resultRows1, new Object[]{1, "Aa", 2, "Aa"})); // Match
+ assertTrue(containsRow(resultRows1, new Object[]{2, null, null, null})); // Left null unmatched
+ assertTrue(containsRow(resultRows1, new Object[]{4, "CC", null, null})); // Left unmatched
+
+ // Second block: unmatched right rows, including the right null-key row
+ List<Object[]> resultRows2 = ((MseBlock.Data)
+ operator.nextBlock()).asRowHeap().getRows();
+ assertEquals(resultRows2.size(), 2);
+
+ assertTrue(containsRow(resultRows2, new Object[]{null, null, 2, null})); // Right null unmatched
+ assertTrue(containsRow(resultRows2, new Object[]{null, null, 3, "BB"})); // Right unmatched
+
+ // Third block should be EOS, as in the RIGHT join test
+ assertTrue(operator.nextBlock().isSuccess());
+ }
+
+ /**
+ * Returns true if {@code rows} contains a row that is element-wise equal to {@code expectedRow}
+ * (compared with {@link java.util.Arrays#equals(Object[], Object[])}); row order is ignored.
+ */
+ private boolean containsRow(List<Object[]> rows, Object[] expectedRow) {
+ for (Object[] row : rows) {
+ if (java.util.Arrays.equals(row, expectedRow)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ @Test
+ public void shouldHandleSemiJoinWithNulls() {
+ // Test SEMI join - should not match null keys
+ _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(1, "Aa")
+ .addRow(2, null) // Null key
+ .addRow(4, "CC") // No match in right
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(2, "Aa") // Match for left row 1
+ .addRow(3, null) // Null - should NOT match left null
+ .addRow(5, "BB") // No match in left
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING});
+
+ HashJoinOperator operator = getOperator(resultSchema, JoinRelType.SEMI,
List.of(1), List.of(1), List.of());
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // SEMI join emits only left columns, for left rows whose key is present on the right. The right null-key
+ // row is never indexed, so the left null-key row is not emitted even though both sides contain null.
+ assertEquals(resultRows.size(), 1);
+ assertEquals(resultRows.get(0), new Object[]{1, "Aa"}); // Only non-null
match
+ }
+
+ @Test
+ public void shouldHandleAntiJoinWithNulls() {
+ // Test ANTI join - null keys should be preserved (not matched)
+ _leftInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(1, "Aa") // Has match in right
+ .addRow(2, null) // Null key - no match
+ .addRow(4, "CC") // No match in right
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(DEFAULT_CHILD_SCHEMA)
+ .addRow(2, "Aa") // Match for left row 1
+ .addRow(3, null) // Null - should NOT match left null
+ .addRow(5, "BB") // No match in left
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING});
+
+ HashJoinOperator operator = getOperator(resultSchema, JoinRelType.ANTI,
List.of(1), List.of(1), List.of());
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // ANTI join keeps left rows whose key is not contained on the right. containsKey(null) is always false,
+ // so the null-key left row is kept (NOT EXISTS-style semantics).
+ assertEquals(resultRows.size(), 2);
+ assertEquals(resultRows.get(0), new Object[]{2, null}); // Left null
preserved (no match)
+ assertEquals(resultRows.get(1), new Object[]{4, "CC"}); // Left unmatched
preserved
+ }
+
+ @Test
+ public void shouldHandleCompositeKeyWithNullValues() {
+ // Test composite key join (multi-column) with null values.
+ // A multi-column join key is a Key whose individual components may be null; a key with any null
+ // component must never match, even against an identical key on the other side.
+
+ DataSchema compositeSchema = new DataSchema(
+ new String[]{"int_col", "string_col", "double_col"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.DOUBLE});
+
+ _leftInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+ .addRow(1, "Aa", 1.0) // Normal row
+ .addRow(2, null, 2.0) // Null in second key component
+ .addRow(3, "Cc", null) // Null in third key component
+ .addRow(4, "Dd", 4.0) // Normal row
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+ .addRow(1, "Aa", 1.0) // Match for first left row
+ .addRow(2, null, 2.0) // Should NOT match left null (SQL
standard)
+ .addRow(3, "Cc", null) // Should NOT match left null (SQL
standard)
+ .addRow(5, "Ee", 5.0) // No match in left
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1", "double_col1", "int_col2",
"string_col2", "double_col2"},
+ new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.DOUBLE,
+ ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.DOUBLE
+ });
+
+ // Composite key join on columns 1 and 2 (string_col and double_col)
+ HashJoinOperator operator = getOperator(compositeSchema, resultSchema,
JoinRelType.LEFT,
+ List.of(1, 2), List.of(1, 2), List.of(), PlanNode.NodeHint.EMPTY);
+
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // Expected behavior per SQL standard:
+ // - Row 1: (1, "Aa", 1.0) should match (1, "Aa", 1.0)
+ // - Row 2: (2, null, 2.0) should NOT match (2, null, 2.0) -> left
preserved with nulls
+ // - Row 3: (3, "Cc", null) should NOT match (3, "Cc", null) -> left
preserved with nulls
+ // - Row 4: (4, "Dd", 4.0) has no match -> left preserved with nulls
+
+ assertEquals(resultRows.size(), 4);
+ assertEquals(resultRows.get(0), new Object[]{1, "Aa", 1.0, 1, "Aa", 1.0});
// Match
+ assertEquals(resultRows.get(1), new Object[]{2, null, 2.0, null, null,
null}); // Left null preserved
+ assertEquals(resultRows.get(2), new Object[]{3, "Cc", null, null, null,
null}); // Left null preserved
+ assertEquals(resultRows.get(3), new Object[]{4, "Dd", 4.0, null, null,
null}); // Left unmatched preserved
+ }
+
+ @Test
+ public void shouldHandleCompositeKeyInnerJoinWithNulls() {
+ // Test that composite keys with nulls are properly excluded from INNER join
+
+ DataSchema compositeSchema = new DataSchema(
+ new String[]{"int_col", "string_col", "double_col"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
+ ColumnDataType.DOUBLE});
+
+ _leftInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+ .addRow(1, "Aa", 1.0) // Should match
+ .addRow(2, null, 2.0) // Should be excluded (null key)
+ .addRow(3, "Cc", 3.0) // No match in right
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+ .addRow(1, "Aa", 1.0) // Match
+ .addRow(2, null, 2.0) // Should be excluded (null key)
+ .addRow(4, "Dd", 4.0) // No match in left
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1", "double_col1", "int_col2",
+ "string_col2", "double_col2"},
+ new ColumnDataType[]{
+ ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.DOUBLE,
+ ColumnDataType.INT, ColumnDataType.STRING, ColumnDataType.DOUBLE
+ });
+
+ // Composite key join on columns 1 and 2 (string_col and double_col)
+ HashJoinOperator operator = getOperator(compositeSchema, resultSchema,
+ JoinRelType.INNER,
+ List.of(1, 2), List.of(1, 2), List.of(), PlanNode.NodeHint.EMPTY);
+
+ List<Object[]> resultRows = ((MseBlock.Data)
+ operator.nextBlock()).asRowHeap().getRows();
+ // Only the non-null key match should be returned
+ assertEquals(resultRows.size(), 1);
+ // assertArrayEquals (imported from TestNG's internal JUnit-compat package) takes (expected, actual),
+ // unlike TestNG's Assert.assertEquals(actual, expected) used elsewhere in this class.
+ assertArrayEquals(new Object[]{1, "Aa", 1.0, 1, "Aa", 1.0},
+ resultRows.get(0));
+ }
+
+ @Test
+ public void shouldHandleCompositeKeySemiJoinWithNulls() {
+ // Test that SEMI join properly handles composite keys with nulls
+
+ DataSchema compositeSchema = new DataSchema(
+ new String[]{"int_col", "string_col", "double_col"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.DOUBLE});
+
+ _leftInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+ .addRow(1, "Aa", 1.0) // Should match
+ .addRow(2, null, 2.0) // Should be excluded (null key)
+ .addRow(3, "Cc", 3.0) // No match in right
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+ .addRow(1, "Aa", 1.0) // Match
+ .addRow(2, null, 2.0) // Should be excluded (null key)
+ .addRow(4, "Dd", 4.0) // No match in left
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1", "double_col1"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.DOUBLE});
+
+ // Composite key join on columns 1 and 2 (string_col and double_col)
+ // NOTE(review): unlike the LEFT/INNER composite tests, this uses the getOperator overload that does not
+ // take compositeSchema as the left schema; confirm the left schema that overload assumes is appropriate.
+ HashJoinOperator operator = getOperator(resultSchema, JoinRelType.SEMI,
+ List.of(1, 2), List.of(1, 2), List.of());
+
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // Only left rows with non-null keys that have matches should be returned
+ assertEquals(resultRows.size(), 1);
+ assertEquals(resultRows.get(0), new Object[]{1, "Aa", 1.0});
+ }
+
+ @Test
+ public void shouldHandleCompositeKeyAntiJoinWithNulls() {
+ // Test that ANTI join properly handles composite keys with nulls.
+ // With NOT EXISTS-style anti-join semantics (keep left rows whose key is not contained on the right),
+ // a left row whose key has a null component is never matched and is therefore kept in the result.
+
+ DataSchema compositeSchema = new DataSchema(
+ new String[]{"int_col", "string_col", "double_col"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.DOUBLE});
+
+ _leftInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+ .addRow(1, "Aa", 1.0) // Has match in right
+ .addRow(2, null, 2.0) // Null key - should be included
+ .addRow(3, "Cc", 3.0) // No match in right
+ .buildWithEos();
+
+ _rightInput = new BlockListMultiStageOperator.Builder(compositeSchema)
+ .addRow(1, "Aa", 1.0) // Match for left row 1
+ .addRow(2, null, 2.0) // Null key - should not match left null
+ .addRow(4, "Dd", 4.0) // No match in left
+ .buildWithEos();
+
+ DataSchema resultSchema = new DataSchema(
+ new String[]{"int_col1", "string_col1", "double_col1"},
+ new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.STRING,
ColumnDataType.DOUBLE});
+
+ // Composite key join on columns 1 and 2 (string_col and double_col)
+ // NOTE(review): unlike the LEFT/INNER composite tests, this uses the getOperator overload that does not
+ // take compositeSchema as the left schema; confirm the left schema that overload assumes is appropriate.
+ HashJoinOperator operator = getOperator(resultSchema, JoinRelType.ANTI,
+ List.of(1, 2), List.of(1, 2), List.of());
+
+ List<Object[]> resultRows = ((MseBlock.Data)
operator.nextBlock()).asRowHeap().getRows();
+
+ // Left rows with null keys and unmatched non-null keys should be returned
+ assertEquals(resultRows.size(), 2);
+ assertTrue(containsRow(resultRows, new Object[]{2, null, 2.0})); // Null
key preserved
+ assertTrue(containsRow(resultRows, new Object[]{3, "Cc", 3.0})); //
Unmatched preserved
+ }
+
private HashJoinOperator getOperator(DataSchema leftSchema, DataSchema
resultSchema, JoinRelType joinType,
List<Integer> leftKeys, List<Integer> rightKeys, List<RexExpression>
nonEquiConditions,
PlanNode.NodeHint nodeHint) {
diff --git a/pinot-query-runtime/src/test/resources/queries/LeftAntiJoins.json b/pinot-query-runtime/src/test/resources/queries/LeftAntiJoins.json
new file mode 100644
index 0000000000..7eb319f991
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/LeftAntiJoins.json
@@ -0,0 +1,75 @@
+{
+ "left_join_null_filter_test": {
+ "extraProps": {
+ "enableColumnBasedNullHandling": true
+ },
+ "tables": {
+ "t1": {
+ "schema": [
+ {"name": "key_col", "type": "STRING", "notNull": false},
+ {"name": "event_time", "type": "INT", "notNull": false},
+ {"name": "nn_event_time", "type": "INT", "notNull": true}
+ ],
+ "inputs": [
+ ["a", 1, 1],
+ ["b", 2, 2],
+ ["c", 3, 3],
+ ["d", 4, 4],
+ ["e", 5, 5],
+ [null, null, 0],
+ ["f", null, 6]
+ ]
+ },
+ "t2": {
+ "schema": [
+ {"name": "key_col", "type": "STRING", "notNull": false},
+ {"name": "event_time", "type": "INT", "notNull": false},
+ {"name": "nn_event_time", "type": "INT", "notNull": true}
+ ],
+ "inputs": [
+ ["b", 2, 2],
+ ["a", 1, 1],
+ ["c", 3, 3],
+ ["a", 2, 2],
+ ["c", 1, 1],
+ ["b", 3, 3],
+ ["d", 5, 5],
+ [null, null, 0],
+ ["f", null, 6]
+ ]
+ }
+ },
+ "queries": [
+ {
+ "description": "LEFT JOIN with null filter on key_col and event_time comparison",
+ "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time > {t2}.event_time WHERE {t2}.key_col IS NULL",
+ "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time > {t2}.event_time WHERE {t2}.key_col IS NULL"
+ },
+ {
+ "description": "LEFT JOIN with null filter on key_col and event_time >= comparison",
+ "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time >= {t2}.event_time WHERE {t2}.key_col IS NULL",
+ "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time >= {t2}.event_time WHERE {t2}.key_col IS NULL"
+ },
+ {
+ "description": "LEFT JOIN with null filter on key_col and event_time < comparison",
+ "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time < {t2}.event_time WHERE {t2}.key_col IS NULL",
+ "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time < {t2}.event_time WHERE {t2}.key_col IS NULL"
+ },
+ {
+ "description": "LEFT JOIN with null filter on key_col and event_time <= comparison",
+ "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time <= {t2}.event_time WHERE {t2}.key_col IS NULL",
+ "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.event_time <= {t2}.event_time WHERE {t2}.key_col IS NULL"
+ },
+ {
+ "description": "LEFT JOIN with null filter on key_col and non-nullable event_time comparison",
+ "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, {t1}.nn_event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.nn_event_time > {t2}.nn_event_time WHERE {t2}.key_col IS NULL",
+ "h2Sql": "SELECT {t1}.key_col, {t1}.nn_event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col AND {t1}.nn_event_time > {t2}.nn_event_time WHERE {t2}.key_col IS NULL"
+ },
+ {
+ "description": "LEFT JOIN with null check on key_col",
+ "sql": "SET enableNullHandling=true; SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col WHERE {t1}.key_col IS NULL",
+ "h2Sql": "SELECT {t1}.key_col, {t1}.event_time FROM {t1} LEFT JOIN {t2} ON {t1}.key_col = {t2}.key_col WHERE {t1}.key_col IS NULL"
+ }
+ ]
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]