This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new fe50526f33 PHOENIX-7758 Read repair with DistinctPrefixFilter can give 
incorrect results (#2365)
fe50526f33 is described below

commit fe50526f3305b2f2cf1ed326aaf9965dd1cf680d
Author: tkhurana <[email protected]>
AuthorDate: Wed Feb 25 14:45:22 2026 -0800

    PHOENIX-7758 Read repair with DistinctPrefixFilter can give incorrect 
results (#2365)
---
 .../phoenix/filter/DistinctPrefixFilter.java       |  42 ++++++
 .../phoenix/filter/EmptyColumnOnlyFilter.java      |  19 +--
 .../phoenix/iterate/BaseResultIterators.java       |   9 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  27 ++++
 .../coprocessor/CDCGlobalIndexRegionScanner.java   |   4 +-
 .../UncoveredGlobalIndexRegionScanner.java         |   4 +-
 .../coprocessor/UncoveredIndexRegionScanner.java   |  21 ++-
 .../UncoveredLocalIndexRegionScanner.java          |   4 +-
 .../apache/phoenix/index/GlobalIndexChecker.java   |  23 +--
 .../phoenix/iterate/RegionScannerFactory.java      |  22 ++-
 .../phoenix/end2end/BackwardCompatibilityIT.java   |   9 ++
 .../end2end/BackwardCompatibilityTestUtil.java     |   3 +
 .../end2end/index/GlobalIndexCheckerIT.java        | 162 +++++++++++++++++++++
 .../resources/gold_files/gold_query_distinct.txt   |  27 ++++
 .../src/it/resources/sql_files/create_distinct.sql |  36 +++++
 .../src/it/resources/sql_files/query_distinct.sql  |  21 +++
 16 files changed, 376 insertions(+), 57 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
index 73261a7bd4..c7c075edd7 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.filter;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -33,6 +34,7 @@ import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ScanUtil;
 
 public class DistinctPrefixFilter extends FilterBase implements Writable {
   private static byte VERSION = 1;
@@ -44,6 +46,8 @@ public class DistinctPrefixFilter extends FilterBase 
implements Writable {
   private int lastPosition;
   private final ImmutableBytesWritable lastKey =
     new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY, -1, -1);
+  private byte[] emptyCF;
+  private byte[] emptyCQ;
 
   public DistinctPrefixFilter() {
   }
@@ -51,12 +55,28 @@ public class DistinctPrefixFilter extends FilterBase 
implements Writable {
   public DistinctPrefixFilter(RowKeySchema schema, int prefixLength) {
     this.schema = schema;
     this.prefixLength = prefixLength;
+    this.emptyCF = null;
+    this.emptyCQ = null;
+  }
+
+  public DistinctPrefixFilter(RowKeySchema schema, int prefixLength, byte[] 
emptyCF,
+    byte[] emptyCQ) {
+    this(schema, prefixLength);
+    this.emptyCF = emptyCF;
+    this.emptyCQ = emptyCQ;
   }
 
   public void setOffset(int offset) {
     this.offset = offset;
   }
 
+  // This is used when the DistinctPrefixFilter is present on a scan on an 
uncovered index
+  public void reinitialize() {
+    lastKey.set(ByteUtil.EMPTY_BYTE_ARRAY, -1, -1);
+    lastPosition = -1;
+    filterAll = false;
+  }
+
   // No @Override for HBase 3 compatibility
   public ReturnCode filterKeyValue(Cell v) throws IOException {
     return filterCell(v);
@@ -64,6 +84,10 @@ public class DistinctPrefixFilter extends FilterBase 
implements Writable {
 
   @Override
   public ReturnCode filterCell(Cell v) throws IOException {
+    if (emptyCF != null && emptyCQ != null && !ScanUtil.isEmptyColumn(v, 
emptyCF, emptyCQ)) {
+      // wait for the empty column
+      return ReturnCode.NEXT_COL;
+    }
     ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 
     // First determine the prefix based on the schema
@@ -151,6 +175,12 @@ public class DistinctPrefixFilter extends FilterBase 
implements Writable {
     out.writeByte(VERSION);
     schema.write(out);
     out.writeInt(prefixLength);
+    if (emptyCF != null && emptyCQ != null) {
+      out.writeInt(emptyCF.length);
+      out.write(emptyCF);
+      out.writeInt(emptyCQ.length);
+      out.write(emptyCQ);
+    }
   }
 
   @Override
@@ -159,6 +189,18 @@ public class DistinctPrefixFilter extends FilterBase 
implements Writable {
     schema = new RowKeySchema();
     schema.readFields(in);
     prefixLength = in.readInt();
+    try {
+      int length = in.readInt();
+      emptyCF = new byte[length];
+      in.readFully(emptyCF, 0, length);
+      length = in.readInt();
+      emptyCQ = new byte[length];
+      in.readFully(emptyCQ, 0, length);
+    } catch (EOFException e) {
+      // Older client doesn't send empty column information
+      emptyCF = null;
+      emptyCQ = null;
+    }
   }
 
   @Override
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java
index a5d78112c2..03e0168c3b 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java
@@ -39,7 +39,6 @@ public class EmptyColumnOnlyFilter extends FilterBase 
implements Writable {
   private byte[] emptyCQ;
   private boolean found = false;
   private boolean first = true;
-  private Cell emptyColumnCell = null;
 
   public EmptyColumnOnlyFilter() {
   }
@@ -55,7 +54,6 @@ public class EmptyColumnOnlyFilter extends FilterBase 
implements Writable {
   public void reset() throws IOException {
     found = false;
     first = true;
-    emptyColumnCell = null;
   }
 
   // No @Override for HBase 3 compatibility
@@ -70,7 +68,6 @@ public class EmptyColumnOnlyFilter extends FilterBase 
implements Writable {
     }
     if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
       found = true;
-      emptyColumnCell = cell;
       return ReturnCode.INCLUDE;
     }
     if (first) {
@@ -82,22 +79,8 @@ public class EmptyColumnOnlyFilter extends FilterBase 
implements Writable {
 
   @Override
   public void filterRowCells(List<Cell> kvs) throws IOException {
-    if (kvs.size() > 2) {
-      throw new IOException("EmptyColumnOnlyFilter got unexpected cells: " + 
kvs.size());
-    } else if (kvs.size() == 2) {
-      // remove the first cell and only return the empty column cell
+    if (kvs.size() > 1) {
       kvs.remove(0);
-    } else if (kvs.size() == 1) {
-      // we only have 1 cell, check if it is the empty column cell or not
-      // since the empty column cell could have been excluded by another 
filter like the
-      // DistinctPrefixFilter.
-      Cell cell = kvs.get(0);
-      if (found && !ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
-        // we found the empty cell, but it was not included so replace the 
existing cell
-        // with the empty column cell
-        kvs.remove(0);
-        kvs.add(emptyColumnCell);
-      }
     }
   }
 
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 328b2eb870..45e160fac8 100644
--- 
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -336,9 +336,12 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
           && groupBy.isOrderPreserving()
           && (context.getAggregationManager().isEmpty() || 
groupBy.isUngroupedAggregate())
       ) {
-
-        ScanUtil.andFilterAtEnd(scan,
-          new 
DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(), cols));
+        byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
+        byte[] ecq = table.getEncodingScheme() == NON_ENCODED_QUALIFIERS
+          ? QueryConstants.EMPTY_COLUMN_BYTES
+          : 
table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+        ScanUtil.andFilterAtEnd(scan, new DistinctPrefixFilter(
+          plan.getTableRef().getTable().getRowKeySchema(), cols, ecf, ecq));
         if (!groupBy.isUngroupedAggregate() && plan.getLimit() != null) {
           // We can push the limit to the server,but for UngroupedAggregate
           // we can not push the limit.
diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 5305408fbc..1aaf672ca8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1931,6 +1931,33 @@ public class ScanUtil {
     return null;
   }
 
+  public static DistinctPrefixFilter findDistinctPrefixFilter(Scan scan) {
+    Filter filter = scan.getFilter();
+    if (filter instanceof PagingFilter) {
+      filter = ((PagingFilter) filter).getDelegateFilter();
+    }
+    return findDistinctPrefixFilter(filter);
+  }
+
+  public static DistinctPrefixFilter findDistinctPrefixFilter(Filter filter) {
+    if (filter == null) {
+      return null;
+    }
+    if (filter instanceof DistinctPrefixFilter) {
+      return (DistinctPrefixFilter) filter;
+    }
+    if (filter instanceof FilterList) {
+      Iterator<Filter> filterIterator = ((FilterList) 
filter).getFilters().iterator();
+      while (filterIterator.hasNext()) {
+        DistinctPrefixFilter distinctFilter = 
findDistinctPrefixFilter(filterIterator.next());
+        if (distinctFilter != null) {
+          return distinctFilter;
+        }
+      }
+    }
+    return null;
+  }
+
   /**
    * Verify whether the given row key is in the scan boundaries i.e. scan 
start and end keys.
    * @param ptr  row key.
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 0b49441824..5bb0c40217 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -91,9 +91,9 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
     final Scan scan, final RegionCoprocessorEnvironment env, final Scan 
dataTableScan,
     final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer,
     final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long 
pageSizeMs,
-    final long queryLimit) throws IOException {
+    final long queryLimit, boolean isDistinct) throws IOException {
     super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
-      viewConstants, ptr, pageSizeMs, queryLimit);
+      viewConstants, ptr, pageSizeMs, queryLimit, isDistinct);
     CDCUtil.setupScanForCDC(dataTableScan);
     cdcDataTableInfo = CDCTableInfo
       
.createFromProto(CDCInfoProtos.CDCTableDef.parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index 0a8a0adcde..f457a9f633 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -88,9 +88,9 @@ public class UncoveredGlobalIndexRegionScanner extends 
UncoveredIndexRegionScann
     final Scan scan, final RegionCoprocessorEnvironment env, final Scan 
dataTableScan,
     final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer,
     final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long 
pageSizeMs,
-    final long queryLimit) throws IOException {
+    final long queryLimit, boolean isDistinct) throws IOException {
     super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
-      viewConstants, ptr, pageSizeMs, queryLimit);
+      viewConstants, ptr, pageSizeMs, queryLimit, isDistinct);
     final Configuration config = env.getConfiguration();
     hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
     rowCountPerTask =
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 1b126a6591..640b85bc5a 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -113,16 +113,25 @@ public abstract class UncoveredIndexRegionScanner extends 
BaseRegionScanner {
     final Scan scan, final RegionCoprocessorEnvironment env, final Scan 
dataTableScan,
     final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer,
     final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long 
pageSizeMs,
-    final long queryLimit) {
+    final long queryLimit, boolean isDistinct) {
     super(innerScanner);
     final Configuration config = env.getConfiguration();
 
-    byte[] pageSizeFromScan = scan.getAttribute(INDEX_PAGE_ROWS);
-    if (pageSizeFromScan != null) {
-      pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan);
+    if (isDistinct) {
+      // If the scan has a DistinctPrefixFilter, set the batch size to 1. This 
is because we don't
+      // want to skip rows without first checking whether the row is valid 
and passes any
+      // additional filters evaluated after merging with the data table. Using 
a batch of
+      // size 1 is OK when the distinct prefix filter is used: if the row is 
valid we will jump to
+      // the next unique prefix, so ideally we should be scanning very few rows.
+      pageSizeInRows = 1;
     } else {
-      pageSizeInRows = (int) config.getLong(INDEX_PAGE_SIZE_IN_ROWS,
-        QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS);
+      byte[] pageSizeFromScan = scan.getAttribute(INDEX_PAGE_ROWS);
+      if (pageSizeFromScan != null) {
+        pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan);
+      } else {
+        pageSizeInRows = (int) config.getLong(INDEX_PAGE_SIZE_IN_ROWS,
+          QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS);
+      }
     }
     if (queryLimit != -1) {
       pageSizeInRows = Long.min(pageSizeInRows, queryLimit);
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 6b5d124ce0..43c3eb4e48 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -49,9 +49,9 @@ public class UncoveredLocalIndexRegionScanner extends 
UncoveredIndexRegionScanne
     final Scan scan, final RegionCoprocessorEnvironment env, final Scan 
dataTableScan,
     final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer,
     final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long 
pageSizeMs,
-    final int offset, final byte[] actualStartKey, final long queryLimit) {
+    final int offset, final byte[] actualStartKey, final long queryLimit, 
boolean isDistinct) {
     super(innerScanner, region, scan, env, dataTableScan, tupleProjector, 
indexMaintainer,
-      viewConstants, ptr, pageSizeMs, queryLimit);
+      viewConstants, ptr, pageSizeMs, queryLimit, isDistinct);
     this.offset = offset;
     this.actualStartKey = actualStartKey;
   }
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 7b0a2e3f82..d9dcd17eea 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
@@ -63,7 +61,6 @@ import 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.DataTableScanMetrics;
 import org.apache.phoenix.coprocessor.DelegateRegionScanner;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
-import org.apache.phoenix.filter.EmptyColumnOnlyFilter;
 import org.apache.phoenix.filter.PagingFilter;
 import org.apache.phoenix.filter.UnverifiedRowFilter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -237,23 +234,7 @@ public class GlobalIndexChecker extends 
BaseScannerRegionObserver implements Reg
     }
 
     private boolean shouldCreateUnverifiedRowFilter(Filter delegateFilter) {
-      if (delegateFilter == null) {
-        return false;
-      }
-      Filter wrappedFilter = delegateFilter;
-      if (delegateFilter instanceof FilterList) {
-        List<Filter> filters = ((FilterList) delegateFilter).getFilters();
-        wrappedFilter = filters.get(0);
-      }
-      // Optimization since FirstKeyOnlyFilter and EmptyColumnOnlyFilter
-      // always include the empty column in the scan result
-      if (
-        wrappedFilter instanceof FirstKeyOnlyFilter
-          || wrappedFilter instanceof EmptyColumnOnlyFilter
-      ) {
-        return false;
-      }
-      return true;
+      return delegateFilter != null && !indexMaintainer.isUncovered();
     }
 
     public boolean next(List<Cell> result, boolean raw, ScannerContext 
scannerContext)
@@ -630,7 +611,7 @@ public class GlobalIndexChecker extends 
BaseScannerRegionObserver implements Reg
         long repairStart = EnvironmentEdgeManager.currentTimeMillis();
 
         byte[] rowKey = CellUtil.cloneRow(cell);
-        long ts = cellList.get(0).getTimestamp();
+        long ts = getMaxTimestamp(cellList);
         cellList.clear();
         long repairTime;
         try {
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index bda112e709..1cbb6f67d3 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.filter.DistinctPrefixFilter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.query.QueryConstants;
@@ -117,6 +118,7 @@ public abstract class RegionScannerFactory {
       final long pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
       Expression extraWhere = null;
       long extraLimit = -1;
+      DistinctPrefixFilter distinctFilter = null;
 
       {
         // for indexes construct the row filter for uncovered columns if it 
exists
@@ -169,19 +171,22 @@ public abstract class RegionScannerFactory {
                 dataTableScan.addColumn(column.getFamily(), 
column.getQualifier());
               }
             }
+            // If the DistinctPrefix filter is present on the scan we set the 
batch size to 1
+            // when scanning uncovered index rows
+            distinctFilter = ScanUtil.findDistinctPrefixFilter(scan);
             if (ScanUtil.isLocalIndex(scan)) {
               s = new UncoveredLocalIndexRegionScanner(regionScanner, 
dataRegion, scan, env,
                 dataTableScan, tupleProjector, indexMaintainer, viewConstants, 
ptr, pageSizeMs,
-                offset, actualStartKey, extraLimit);
+                offset, actualStartKey, extraLimit, distinctFilter != null);
             } else {
               if (scan.getAttribute(CDC_DATA_TABLE_DEF) != null) {
                 s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, 
scan, env,
                   dataTableScan, tupleProjector, indexMaintainer, 
viewConstants, ptr, pageSizeMs,
-                  extraLimit);
+                  extraLimit, distinctFilter != null);
               } else {
                 s = new UncoveredGlobalIndexRegionScanner(regionScanner, 
dataRegion, scan, env,
                   dataTableScan, tupleProjector, indexMaintainer, 
viewConstants, ptr, pageSizeMs,
-                  extraLimit);
+                  extraLimit, distinctFilter != null);
               }
             }
           }
@@ -253,6 +258,11 @@ public abstract class RegionScannerFactory {
             return true;
           }
           if (result.size() == 0) {
+            if (distinctFilter != null) {
+              // We got an orphaned uncovered index row; just reinitialize the 
distinct filter and
+              // move to the next row.
+              distinctFilter.reinitialize();
+            }
             return next;
           }
           if ((ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) && 
!ScanUtil.isAnalyzeTable(scan)) {
@@ -274,6 +284,12 @@ public abstract class RegionScannerFactory {
               extraWhere.evaluate(merged, ptr);
               if 
(!Boolean.TRUE.equals(extraWhere.getDataType().toObject(ptr))) {
                 result.clear();
+                if (distinctFilter != null) {
+                  // The current row was rejected after evaluating the extra 
where conditions.
+                  // We can't skip to the next unique key prefix, as that could 
result in skipping
+                  // a valid result, so reinitialize the distinct filter and move 
to the next row.
+                  distinctFilter.reinitialize();
+                }
                 return next;
               }
             }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
index f509787599..a2487cd778 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
@@ -21,6 +21,7 @@ import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DATA;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DELETE;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_VIEW_INDEX;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ADD;
+import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DISTINCT;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DIVERGED_VIEW;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_OFFSET;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ORDERED_GROUP_BY;
@@ -31,6 +32,7 @@ import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DELETE;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_ADD;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_DIVERGED_VIEW;
+import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_DISTINCT;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_INDEX_REBUILD_ASYNC;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_OFFSET;
 import static 
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ORDERED_GROUP_BY;
@@ -475,6 +477,13 @@ public class BackwardCompatibilityIT {
     assertExpectedOutput(QUERY_ORDER_BY_NON_PK);
   }
 
+  @Test
+  public void testDistinctPrefixAddDataByNewClientReadByOldClient() throws 
Exception {
+    executeQueriesWithCurrentVersion(CREATE_DISTINCT, url, NONE);
+    executeQueryWithClientVersion(compatibleClientVersion, QUERY_DISTINCT, 
zkQuorum);
+    assertExpectedOutput(QUERY_DISTINCT);
+  }
+
   private boolean isClientCompatibleForOrderedGroupByQuery() {
     String[] clientVersion = compatibleClientVersion.getVersion().split("\\.");
     int majorVersion = Integer.parseInt(clientVersion[0]);
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
index d72e415085..bfea0ed716 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
@@ -84,10 +84,12 @@ public final class BackwardCompatibilityTestUtil {
   public static final String ORDERED_GROUP_BY = "ordered_groupby";
   public static final String ORDER_BY_NON_PK = "orderby_nonpk";
   public static final String OFFSET = "offset";
+  public static final String DISTINCT = "distinct";
   public static final String CREATE_UNORDERED_GROUP_BY = "create_" + 
UNORDERED_GROUP_BY;
   public static final String CREATE_ORDERED_GROUP_BY = "create_" + 
ORDERED_GROUP_BY;
   public static final String CREATE_ORDER_BY_NON_PK = "create_" + 
ORDER_BY_NON_PK;
   public static final String CREATE_OFFSET = "create_" + OFFSET;
+  public static final String CREATE_DISTINCT = "create_" + DISTINCT;
   public static final String ADD_DATA = "add_data";
   public static final String ADD_DELETE = "add_delete";
   public static final String ADD_VIEW_INDEX = "add_view_index";
@@ -99,6 +101,7 @@ public final class BackwardCompatibilityTestUtil {
   public static final String QUERY_ORDERED_GROUP_BY = QUERY_PREFIX + 
ORDERED_GROUP_BY;
   public static final String QUERY_OFFSET = QUERY_PREFIX + OFFSET;
   public static final String QUERY_ORDER_BY_NON_PK = QUERY_PREFIX + 
ORDER_BY_NON_PK;
+  public static final String QUERY_DISTINCT = QUERY_PREFIX + DISTINCT;
   public static final String QUERY_CREATE_ADD = QUERY_PREFIX + CREATE_ADD;
   public static final String QUERY_ADD_DATA = QUERY_PREFIX + ADD_DATA;
   public static final String QUERY_ADD_DELETE = QUERY_PREFIX + ADD_DELETE;
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 0abe583a18..67ccf665e2 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -36,12 +36,14 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
@@ -1361,6 +1363,166 @@ public class GlobalIndexCheckerIT extends BaseTest {
     }
   }
 
+  @Test
+  public void testReadRepairWithDistinctPrefixFilter() throws Exception {
+    Assume.assumeTrue(async == false);
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      String indexName = generateUniqueName();
+      conn.createStatement().execute("create table " + dataTableName
+        + " (id1 varchar(10) not null, id2 varchar(10) not null, val1 
varchar(10), val2 varchar(10), "
+        + "val3 varchar(10), val4 varchar(10) constraint pk primary key(id1, 
id2))"
+        + tableDDLOptions);
+      conn.createStatement().execute("CREATE INDEX " + indexName + " on " + 
dataTableName
+        + " (val1, val2) include (val3, val4)" + this.indexDDLOptions);
+
+      // create orphan unverified index row
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a1', 'a2', 'val1', 'val2a', 'val3', 'val4')");
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a3', 'a2', 'val1', 'val2a', 'val3', 'val4')");
+      commitWithException(conn);
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a1', 'a3', 'val1', 'val2a', 'val31', 'val4')");
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a2', 'a1', 'val1', 'val2a', 'val31', 'val4')");
+      conn.commit();
+
+      // create an unverified update to the index row pointing to an existing 
data row
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a2', 'a1', 'val1', 'val1b', 'val3', 'val4')");
+      commitWithException(conn);
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a2', 'a2', 'val1', 'val1b', 'val3', 'val4')");
+      conn.commit();
+
+      ArrayList<String> expectedValues = Lists.newArrayList("a1", "a2");
+      String selectSql =
+        "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1' 
AND val2 = 'val2a'";
+      verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues);
+
+      expectedValues = Lists.newArrayList("a2");
+      selectSql =
+        "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1' 
AND val2 = 'val1b'";
+      verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues);
+
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a3', 'a2', 'val1', 'val2a', 'val3', 'val4')");
+      conn.commit();
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+      expectedValues = Lists.newArrayList("a1", "a2", "a3");
+      selectSql =
+        "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1' 
AND val2 = 'val2a'";
+      verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues);
+
+      // first verified and then unverified
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a4', 'a1', 'val1_4', 'val1_4', 'val1_4', 'val1_4')");
+      conn.commit();
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a4', 'a2', 'val1_4', 'val1_4', 'val1_4', 'val1_4')");
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a5', 'a1', 'val1_4', 'val1_4', 'val1_4', 'val1_4')");
+      conn.commit();
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+      expectedValues = Lists.newArrayList("a4", "a5");
+      selectSql =
+        "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1_4' 
AND val2 = 'val1_4'";
+      verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues);
+    }
+  }
+
+  @Test
+  public void testUncoveredIndexWithDistinctPrefixFilter() throws Exception {
+    Assume.assumeTrue(async == false);
+
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      String dataTableName = generateUniqueName();
+      String uncoveredIndex1 = generateUniqueName();
+      conn.createStatement().execute("create table " + dataTableName
+        + " (id1 varchar(10) not null, id2 varchar(10) not null, val1 
varchar(10), val2 varchar(10), "
+        + "val3 varchar(10), val4 varchar(10) constraint pk primary key(id1, 
id2))"
+        + tableDDLOptions);
+      conn.createStatement().execute("CREATE UNCOVERED INDEX " + 
uncoveredIndex1 + " on "
+        + dataTableName + " (val1)" + this.indexDDLOptions);
+
+      // create orphan unverified index row
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a1', 'a1', 'val1a', 'val2a', 'val3', 'val4')");
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a1', 'a2', 'val1a', 'val2a', 'val3', 'val4')");
+      commitWithException(conn);
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+      // add a valid row with same value for id1 column as the unverified row 
above to test that
+      // the DistinctPrefix filter is reset correctly
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a1', 'a3', 'val1a', 'val2a', 'val31', 'val4')");
+      // add more valid rows with same value for id1 column so that the 
DistinctPrefix filter
+      // can skip those rows
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a1', 'a4', 'val1a', 'val2b', 'val31', 'val4')");
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a1', 'a5', 'val1a', 'val2b', 'val31', 'val4')");
+      // add another valid row with a different value for id1 column
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a2', 'a1', 'val1a', 'val2a', 'val31', 'val4')");
+      conn.commit();
+
+      ArrayList<String> expectedValues = Lists.newArrayList("a1", "a2");
+      // condition on val1 in WHERE clause so that query will use the 
uncovered index
+      String selectSql = "SELECT distinct(id1) from " + dataTableName + " 
WHERE val1 = 'val1a'";
+      verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql, 
expectedValues);
+      expectedValues = Lists.newArrayList("a1");
+      // add extra where conditions to the query
+      selectSql =
+        "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1a' 
AND val2 = 'val2b'";
+      verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql, 
expectedValues);
+
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a3', 'a1', 'val1b', 'val2a', 'val31', 'val4')");
+      conn.commit();
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a3', 'a2', 'val1b', 'val2a', 'val3', 'val4')");
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a3', 'a3', 'val1b', 'val2a', 'val3', 'val4')");
+      commitWithException(conn);
+      IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+      conn.createStatement().execute("upsert into " + dataTableName + " "
+        + "values ('a4', 'a1', 'val1b', 'val2a', 'val31', 'val4')");
+      conn.commit();
+      expectedValues = Lists.newArrayList("a3", "a4");
+      selectSql = "SELECT distinct(id1) from " + dataTableName + " WHERE val1 
= 'val1b'";
+      verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql, 
expectedValues);
+    }
+  }
+
+  private void verifyDistinctQueryOnIndex(Connection conn, String indexName, 
String query,
+    List<String> expectedValues) throws SQLException, IOException {
+    try (ResultSet rs = conn.createStatement().executeQuery(query)) {
+      PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+      String actualExplainPlan = 
QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+      assertTrue(actualExplainPlan.contains(indexName));
+      assertTrue(actualExplainPlan, actualExplainPlan.contains("SERVER 
DISTINCT PREFIX FILTER"));
+      List actualValues = Lists.newArrayList();
+      while (rs.next()) {
+        actualValues.add(rs.getString(1));
+      }
+      assertEquals(expectedValues, actualValues);
+    } catch (AssertionError e) {
+      TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+      throw e;
+    }
+  }
+
   @Test
   public void testUnverifiedIndexRowWithFirstKeyOnlyFilter() throws Exception {
     if (async) {
diff --git a/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt 
b/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt
new file mode 100644
index 0000000000..df1400b48f
--- /dev/null
+++ b/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt
@@ -0,0 +1,27 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'COUNT(1)'
+'12'
+'ID1','ID2'
+'a','1'
+'a','2'
+'b','1'
+'b','2'
+'c','1'
+'c','2'
diff --git a/phoenix-core/src/it/resources/sql_files/create_distinct.sql 
b/phoenix-core/src/it/resources/sql_files/create_distinct.sql
new file mode 100644
index 0000000000..29943787cb
--- /dev/null
+++ b/phoenix-core/src/it/resources/sql_files/create_distinct.sql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS SCHEMA_0002.TABLE_0002 (ID1 VARCHAR NOT NULL,
+                                                   ID2 VARCHAR NOT NULL,
+                                                   ID3 VARCHAR NOT NULL,
+                                                   COL1 VARCHAR,
+                                                   COL2 INTEGER CONSTRAINT PK 
PRIMARY KEY (ID1, ID2, ID3));
+
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('a','1','x','data1', 10);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('a','1','y','data2', 20);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('a','2','x','data3', 30);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('a','2','y','data4', 40);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('b','1','x','data5', 50);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('b','1','y','data6', 60);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('b','2','x','data7', 70);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('b','2','y','data8', 80);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('c','1','x','data9', 90);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('c','1','y','data10', 100);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('c','2','x','data11', 110);
+UPSERT INTO SCHEMA_0002.TABLE_0002 VALUES ('c','2','y','data12', 120);
\ No newline at end of file
diff --git a/phoenix-core/src/it/resources/sql_files/query_distinct.sql 
b/phoenix-core/src/it/resources/sql_files/query_distinct.sql
new file mode 100644
index 0000000000..9f7578b008
--- /dev/null
+++ b/phoenix-core/src/it/resources/sql_files/query_distinct.sql
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SELECT COUNT(*) FROM SCHEMA_0002.TABLE_0002;
+
+SELECT DISTINCT ID1, ID2 FROM SCHEMA_0002.TABLE_0002 ORDER BY ID1, ID2;
\ No newline at end of file


Reply via email to