This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new fe50526f33 PHOENIX-7758 Read repair with DistinctPrefixFilter can give
incorrect results (#2365)
fe50526f33 is described below
commit fe50526f3305b2f2cf1ed326aaf9965dd1cf680d
Author: tkhurana <[email protected]>
AuthorDate: Wed Feb 25 14:45:22 2026 -0800
PHOENIX-7758 Read repair with DistinctPrefixFilter can give incorrect
results (#2365)
---
.../phoenix/filter/DistinctPrefixFilter.java | 42 ++++++
.../phoenix/filter/EmptyColumnOnlyFilter.java | 19 +--
.../phoenix/iterate/BaseResultIterators.java | 9 +-
.../java/org/apache/phoenix/util/ScanUtil.java | 27 ++++
.../coprocessor/CDCGlobalIndexRegionScanner.java | 4 +-
.../UncoveredGlobalIndexRegionScanner.java | 4 +-
.../coprocessor/UncoveredIndexRegionScanner.java | 21 ++-
.../UncoveredLocalIndexRegionScanner.java | 4 +-
.../apache/phoenix/index/GlobalIndexChecker.java | 23 +--
.../phoenix/iterate/RegionScannerFactory.java | 22 ++-
.../phoenix/end2end/BackwardCompatibilityIT.java | 9 ++
.../end2end/BackwardCompatibilityTestUtil.java | 3 +
.../end2end/index/GlobalIndexCheckerIT.java | 162 +++++++++++++++++++++
.../resources/gold_files/gold_query_distinct.txt | 27 ++++
.../src/it/resources/sql_files/create_distinct.sql | 36 +++++
.../src/it/resources/sql_files/query_distinct.sql | 21 +++
16 files changed, 376 insertions(+), 57 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
index 73261a7bd4..c7c075edd7 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/DistinctPrefixFilter.java
@@ -19,6 +19,7 @@ package org.apache.phoenix.filter;
import java.io.DataInput;
import java.io.DataOutput;
+import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValueUtil;
@@ -33,6 +34,7 @@ import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.ValueSchema.Field;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.ScanUtil;
public class DistinctPrefixFilter extends FilterBase implements Writable {
private static byte VERSION = 1;
@@ -44,6 +46,8 @@ public class DistinctPrefixFilter extends FilterBase
implements Writable {
private int lastPosition;
private final ImmutableBytesWritable lastKey =
new ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY, -1, -1);
+ private byte[] emptyCF;
+ private byte[] emptyCQ;
public DistinctPrefixFilter() {
}
@@ -51,12 +55,28 @@ public class DistinctPrefixFilter extends FilterBase
implements Writable {
public DistinctPrefixFilter(RowKeySchema schema, int prefixLength) {
this.schema = schema;
this.prefixLength = prefixLength;
+ this.emptyCF = null;
+ this.emptyCQ = null;
+ }
+
+ public DistinctPrefixFilter(RowKeySchema schema, int prefixLength, byte[]
emptyCF,
+ byte[] emptyCQ) {
+ this(schema, prefixLength);
+ this.emptyCF = emptyCF;
+ this.emptyCQ = emptyCQ;
}
public void setOffset(int offset) {
this.offset = offset;
}
+ /**
+ * Resets the filter's distinct-prefix tracking state: lastKey goes back to its initial empty
+ * value, lastPosition is set to -1 and filterAll is cleared.
+ * This is used when the DistinctPrefixFilter is present on a scan on an uncovered index. The
+ * region scanner calls it when an index row turns out to be an orphan, or is rejected by the
+ * extra where conditions evaluated after merging with the data table. This keeps the filter
+ * from skipping to the next unique prefix and missing valid rows that share the rejected
+ * row's prefix.
+ */
+ public void reinitialize() {
+ lastKey.set(ByteUtil.EMPTY_BYTE_ARRAY, -1, -1);
+ lastPosition = -1;
+ filterAll = false;
+ }
+
// No @Override for HBase 3 compatibility
public ReturnCode filterKeyValue(Cell v) throws IOException {
return filterCell(v);
@@ -64,6 +84,10 @@ public class DistinctPrefixFilter extends FilterBase
implements Writable {
@Override
public ReturnCode filterCell(Cell v) throws IOException {
+ if (emptyCF != null && emptyCQ != null && !ScanUtil.isEmptyColumn(v,
emptyCF, emptyCQ)) {
+ // wait for the empty column
+ return ReturnCode.NEXT_COL;
+ }
ImmutableBytesWritable ptr = new ImmutableBytesWritable();
// First determine the prefix based on the schema
@@ -151,6 +175,12 @@ public class DistinctPrefixFilter extends FilterBase
implements Writable {
out.writeByte(VERSION);
schema.write(out);
out.writeInt(prefixLength);
+ if (emptyCF != null && emptyCQ != null) {
+ out.writeInt(emptyCF.length);
+ out.write(emptyCF);
+ out.writeInt(emptyCQ.length);
+ out.write(emptyCQ);
+ }
}
@Override
@@ -159,6 +189,18 @@ public class DistinctPrefixFilter extends FilterBase
implements Writable {
schema = new RowKeySchema();
schema.readFields(in);
prefixLength = in.readInt();
+ try {
+ int length = in.readInt();
+ emptyCF = new byte[length];
+ in.readFully(emptyCF, 0, length);
+ length = in.readInt();
+ emptyCQ = new byte[length];
+ in.readFully(emptyCQ, 0, length);
+ } catch (EOFException e) {
+ // Older client doesn't send empty column information
+ emptyCF = null;
+ emptyCQ = null;
+ }
}
@Override
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java
index a5d78112c2..03e0168c3b 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/EmptyColumnOnlyFilter.java
@@ -39,7 +39,6 @@ public class EmptyColumnOnlyFilter extends FilterBase
implements Writable {
private byte[] emptyCQ;
private boolean found = false;
private boolean first = true;
- private Cell emptyColumnCell = null;
public EmptyColumnOnlyFilter() {
}
@@ -55,7 +54,6 @@ public class EmptyColumnOnlyFilter extends FilterBase
implements Writable {
public void reset() throws IOException {
found = false;
first = true;
- emptyColumnCell = null;
}
// No @Override for HBase 3 compatibility
@@ -70,7 +68,6 @@ public class EmptyColumnOnlyFilter extends FilterBase
implements Writable {
}
if (ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
found = true;
- emptyColumnCell = cell;
return ReturnCode.INCLUDE;
}
if (first) {
@@ -82,22 +79,8 @@ public class EmptyColumnOnlyFilter extends FilterBase
implements Writable {
@Override
public void filterRowCells(List<Cell> kvs) throws IOException {
- if (kvs.size() > 2) {
- throw new IOException("EmptyColumnOnlyFilter got unexpected cells: " +
kvs.size());
- } else if (kvs.size() == 2) {
- // remove the first cell and only return the empty column cell
+ if (kvs.size() > 1) {
kvs.remove(0);
- } else if (kvs.size() == 1) {
- // we only have 1 cell, check if it is the empty column cell or not
- // since the empty column cell could have been excluded by another
filter like the
- // DistinctPrefixFilter.
- Cell cell = kvs.get(0);
- if (found && !ScanUtil.isEmptyColumn(cell, emptyCF, emptyCQ)) {
- // we found the empty cell, but it was not included so replace the
existing cell
- // with the empty column cell
- kvs.remove(0);
- kvs.add(emptyColumnCell);
- }
}
}
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 328b2eb870..45e160fac8 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -336,9 +336,12 @@ public abstract class BaseResultIterators extends
ExplainTable implements Result
&& groupBy.isOrderPreserving()
&& (context.getAggregationManager().isEmpty() ||
groupBy.isUngroupedAggregate())
) {
-
- ScanUtil.andFilterAtEnd(scan,
- new
DistinctPrefixFilter(plan.getTableRef().getTable().getRowKeySchema(), cols));
+ byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
+ byte[] ecq = table.getEncodingScheme() == NON_ENCODED_QUALIFIERS
+ ? QueryConstants.EMPTY_COLUMN_BYTES
+ :
table.getEncodingScheme().encode(QueryConstants.ENCODED_EMPTY_COLUMN_NAME);
+ ScanUtil.andFilterAtEnd(scan, new DistinctPrefixFilter(
+ plan.getTableRef().getTable().getRowKeySchema(), cols, ecf, ecq));
if (!groupBy.isUngroupedAggregate() && plan.getLimit() != null) {
// We can push the limit to the server,but for UngroupedAggregate
// we can not push the limit.
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 5305408fbc..1aaf672ca8 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1931,6 +1931,33 @@ public class ScanUtil {
return null;
}
+ /**
+ * Finds the {@link DistinctPrefixFilter} attached to the given scan, if any.
+ * @param scan the scan whose filter tree is searched
+ * @return the first DistinctPrefixFilter found, or null if the scan has none
+ */
+ public static DistinctPrefixFilter findDistinctPrefixFilter(Scan scan) {
+ return findDistinctPrefixFilter(scan.getFilter());
+ }
+
+ /**
+ * Recursively searches a filter tree for a {@link DistinctPrefixFilter}. A {@link PagingFilter}
+ * is unwrapped to its delegate, and the members of a {@link FilterList} are searched in order.
+ * @param filter root of the filter tree, may be null
+ * @return the first DistinctPrefixFilter found (depth-first), or null if none is present
+ */
+ public static DistinctPrefixFilter findDistinctPrefixFilter(Filter filter) {
+ if (filter == null) {
+ return null;
+ }
+ if (filter instanceof DistinctPrefixFilter) {
+ return (DistinctPrefixFilter) filter;
+ }
+ if (filter instanceof PagingFilter) {
+ // the paging filter wraps the actual filter tree; search its delegate
+ return findDistinctPrefixFilter(((PagingFilter) filter).getDelegateFilter());
+ }
+ if (filter instanceof FilterList) {
+ for (Filter child : ((FilterList) filter).getFilters()) {
+ DistinctPrefixFilter distinctFilter = findDistinctPrefixFilter(child);
+ if (distinctFilter != null) {
+ return distinctFilter;
+ }
+ }
+ }
+ return null;
+ }
+
/**
* Verify whether the given row key is in the scan boundaries i.e. scan
start and end keys.
* @param ptr row key.
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 0b49441824..5bb0c40217 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -91,9 +91,9 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
final Scan scan, final RegionCoprocessorEnvironment env, final Scan
dataTableScan,
final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long
pageSizeMs,
- final long queryLimit) throws IOException {
+ final long queryLimit, boolean isDistinct) throws IOException {
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
- viewConstants, ptr, pageSizeMs, queryLimit);
+ viewConstants, ptr, pageSizeMs, queryLimit, isDistinct);
CDCUtil.setupScanForCDC(dataTableScan);
cdcDataTableInfo = CDCTableInfo
.createFromProto(CDCInfoProtos.CDCTableDef.parseFrom(scan.getAttribute(CDC_DATA_TABLE_DEF)));
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index 0a8a0adcde..f457a9f633 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -88,9 +88,9 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
final Scan scan, final RegionCoprocessorEnvironment env, final Scan
dataTableScan,
final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long
pageSizeMs,
- final long queryLimit) throws IOException {
+ final long queryLimit, boolean isDistinct) throws IOException {
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
- viewConstants, ptr, pageSizeMs, queryLimit);
+ viewConstants, ptr, pageSizeMs, queryLimit, isDistinct);
final Configuration config = env.getConfiguration();
hTableFactory = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
rowCountPerTask =
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 1b126a6591..640b85bc5a 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -113,16 +113,25 @@ public abstract class UncoveredIndexRegionScanner extends
BaseRegionScanner {
final Scan scan, final RegionCoprocessorEnvironment env, final Scan
dataTableScan,
final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long
pageSizeMs,
- final long queryLimit) {
+ final long queryLimit, boolean isDistinct) {
super(innerScanner);
final Configuration config = env.getConfiguration();
- byte[] pageSizeFromScan = scan.getAttribute(INDEX_PAGE_ROWS);
- if (pageSizeFromScan != null) {
- pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan);
+ if (isDistinct) {
+ // If the scan has a DistinctPrefix filter set the batch size to 1. This
is because we don't
+ // want to skip rows without first checking if the row is valid or not
and passes any
+ // additional filters evaluated after merging with the data table. Using
a batch of
+ // size 1 is OK when distinct prefix filter is used since if the row is
valid we will jump to
+ // the next unique prefix so ideally we should be scanning very few rows.
+ pageSizeInRows = 1;
} else {
- pageSizeInRows = (int) config.getLong(INDEX_PAGE_SIZE_IN_ROWS,
- QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS);
+ byte[] pageSizeFromScan = scan.getAttribute(INDEX_PAGE_ROWS);
+ if (pageSizeFromScan != null) {
+ pageSizeInRows = (int) Bytes.toLong(pageSizeFromScan);
+ } else {
+ pageSizeInRows = (int) config.getLong(INDEX_PAGE_SIZE_IN_ROWS,
+ QueryServicesOptions.DEFAULT_INDEX_PAGE_SIZE_IN_ROWS);
+ }
}
if (queryLimit != -1) {
pageSizeInRows = Long.min(pageSizeInRows, queryLimit);
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 6b5d124ce0..43c3eb4e48 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -49,9 +49,9 @@ public class UncoveredLocalIndexRegionScanner extends
UncoveredIndexRegionScanne
final Scan scan, final RegionCoprocessorEnvironment env, final Scan
dataTableScan,
final TupleProjector tupleProjector, final IndexMaintainer indexMaintainer,
final byte[][] viewConstants, final ImmutableBytesWritable ptr, final long
pageSizeMs,
- final int offset, final byte[] actualStartKey, final long queryLimit) {
+ final int offset, final byte[] actualStartKey, final long queryLimit,
boolean isDistinct) {
super(innerScanner, region, scan, env, dataTableScan, tupleProjector,
indexMaintainer,
- viewConstants, ptr, pageSizeMs, queryLimit);
+ viewConstants, ptr, pageSizeMs, queryLimit, isDistinct);
this.offset = offset;
this.actualStartKey = actualStartKey;
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 7b0a2e3f82..d9dcd17eea 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -48,8 +48,6 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.PhoenixScannerContext;
@@ -63,7 +61,6 @@ import
org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.DataTableScanMetrics;
import org.apache.phoenix.coprocessor.DelegateRegionScanner;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
-import org.apache.phoenix.filter.EmptyColumnOnlyFilter;
import org.apache.phoenix.filter.PagingFilter;
import org.apache.phoenix.filter.UnverifiedRowFilter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -237,23 +234,7 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
}
private boolean shouldCreateUnverifiedRowFilter(Filter delegateFilter) {
- if (delegateFilter == null) {
- return false;
- }
- Filter wrappedFilter = delegateFilter;
- if (delegateFilter instanceof FilterList) {
- List<Filter> filters = ((FilterList) delegateFilter).getFilters();
- wrappedFilter = filters.get(0);
- }
- // Optimization since FirstKeyOnlyFilter and EmptyColumnOnlyFilter
- // always include the empty column in the scan result
- if (
- wrappedFilter instanceof FirstKeyOnlyFilter
- || wrappedFilter instanceof EmptyColumnOnlyFilter
- ) {
- return false;
- }
- return true;
+ // Returns true only when the scan has a delegate filter and the index is not uncovered.
+ // NOTE(review): uncovered index rows are presumably checked by the uncovered index region
+ // scanners after merging with the data table, which is why they are excluded here - confirm.
+ return delegateFilter != null && !indexMaintainer.isUncovered();
}
public boolean next(List<Cell> result, boolean raw, ScannerContext
scannerContext)
@@ -630,7 +611,7 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
long repairStart = EnvironmentEdgeManager.currentTimeMillis();
byte[] rowKey = CellUtil.cloneRow(cell);
- long ts = cellList.get(0).getTimestamp();
+ long ts = getMaxTimestamp(cellList);
cellList.clear();
long repairTime;
try {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
index bda112e709..1cbb6f67d3 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java
@@ -52,6 +52,7 @@ import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.KeyValueColumnExpression;
+import org.apache.phoenix.filter.DistinctPrefixFilter;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.query.QueryConstants;
@@ -117,6 +118,7 @@ public abstract class RegionScannerFactory {
final long pageSizeMs = ScanUtil.getPageSizeMsForRegionScanner(scan);
Expression extraWhere = null;
long extraLimit = -1;
+ DistinctPrefixFilter distinctFilter = null;
{
// for indexes construct the row filter for uncovered columns if it
exists
@@ -169,19 +171,22 @@ public abstract class RegionScannerFactory {
dataTableScan.addColumn(column.getFamily(),
column.getQualifier());
}
}
+ // If the DistinctPrefix filter is present on the scan we set the
batch size to 1
+ // when scanning uncovered index rows
+ distinctFilter = ScanUtil.findDistinctPrefixFilter(scan);
if (ScanUtil.isLocalIndex(scan)) {
s = new UncoveredLocalIndexRegionScanner(regionScanner,
dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer, viewConstants,
ptr, pageSizeMs,
- offset, actualStartKey, extraLimit);
+ offset, actualStartKey, extraLimit, distinctFilter != null);
} else {
if (scan.getAttribute(CDC_DATA_TABLE_DEF) != null) {
s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion,
scan, env,
dataTableScan, tupleProjector, indexMaintainer,
viewConstants, ptr, pageSizeMs,
- extraLimit);
+ extraLimit, distinctFilter != null);
} else {
s = new UncoveredGlobalIndexRegionScanner(regionScanner,
dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer,
viewConstants, ptr, pageSizeMs,
- extraLimit);
+ extraLimit, distinctFilter != null);
}
}
}
@@ -253,6 +258,11 @@ public abstract class RegionScannerFactory {
return true;
}
if (result.size() == 0) {
+ if (distinctFilter != null) {
+ // we got an orphaned uncovered index row just reinitialize the
distinct filter and
+ // move to the new row
+ distinctFilter.reinitialize();
+ }
return next;
}
if ((ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) &&
!ScanUtil.isAnalyzeTable(scan)) {
@@ -274,6 +284,12 @@ public abstract class RegionScannerFactory {
extraWhere.evaluate(merged, ptr);
if
(!Boolean.TRUE.equals(extraWhere.getDataType().toObject(ptr))) {
result.clear();
+ if (distinctFilter != null) {
+ // The current row was rejected after evaluating the extra
where conditions.
+ // We can't skip to the next unique key prefix as that could
result in skipping
+ // valid result so reinitialize the distinct filter and move
to the next row
+ distinctFilter.reinitialize();
+ }
return next;
}
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
index f509787599..a2487cd778 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityIT.java
@@ -21,6 +21,7 @@ import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DATA;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_DELETE;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.ADD_VIEW_INDEX;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ADD;
+import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DISTINCT;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_DIVERGED_VIEW;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_OFFSET;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.CREATE_ORDERED_GROUP_BY;
@@ -31,6 +32,7 @@ import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ADD_DELETE;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_ADD;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_CREATE_DIVERGED_VIEW;
+import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_DISTINCT;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_INDEX_REBUILD_ASYNC;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_OFFSET;
import static
org.apache.phoenix.end2end.BackwardCompatibilityTestUtil.QUERY_ORDERED_GROUP_BY;
@@ -475,6 +477,13 @@ public class BackwardCompatibilityIT {
assertExpectedOutput(QUERY_ORDER_BY_NON_PK);
}
+ /**
+ * Runs the CREATE_DISTINCT script with the current client version. It then runs the
+ * QUERY_DISTINCT script with the older compatible client version and compares the output
+ * against the expected gold output.
+ * NOTE(review): this presumably exercises a DistinctPrefixFilter that an older client
+ * serialized without the empty column family/qualifier being read by the current server - confirm.
+ */
+ @Test
+ public void testDistinctPrefixAddDataByNewClientReadByOldClient() throws
Exception {
+ executeQueriesWithCurrentVersion(CREATE_DISTINCT, url, NONE);
+ executeQueryWithClientVersion(compatibleClientVersion, QUERY_DISTINCT,
zkQuorum);
+ assertExpectedOutput(QUERY_DISTINCT);
+ }
+
+
private boolean isClientCompatibleForOrderedGroupByQuery() {
String[] clientVersion = compatibleClientVersion.getVersion().split("\\.");
int majorVersion = Integer.parseInt(clientVersion[0]);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
index d72e415085..bfea0ed716 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BackwardCompatibilityTestUtil.java
@@ -84,10 +84,12 @@ public final class BackwardCompatibilityTestUtil {
public static final String ORDERED_GROUP_BY = "ordered_groupby";
public static final String ORDER_BY_NON_PK = "orderby_nonpk";
public static final String OFFSET = "offset";
+ public static final String DISTINCT = "distinct";
public static final String CREATE_UNORDERED_GROUP_BY = "create_" +
UNORDERED_GROUP_BY;
public static final String CREATE_ORDERED_GROUP_BY = "create_" +
ORDERED_GROUP_BY;
public static final String CREATE_ORDER_BY_NON_PK = "create_" +
ORDER_BY_NON_PK;
public static final String CREATE_OFFSET = "create_" + OFFSET;
+ public static final String CREATE_DISTINCT = "create_" + DISTINCT;
public static final String ADD_DATA = "add_data";
public static final String ADD_DELETE = "add_delete";
public static final String ADD_VIEW_INDEX = "add_view_index";
@@ -99,6 +101,7 @@ public final class BackwardCompatibilityTestUtil {
public static final String QUERY_ORDERED_GROUP_BY = QUERY_PREFIX +
ORDERED_GROUP_BY;
public static final String QUERY_OFFSET = QUERY_PREFIX + OFFSET;
public static final String QUERY_ORDER_BY_NON_PK = QUERY_PREFIX +
ORDER_BY_NON_PK;
+ public static final String QUERY_DISTINCT = QUERY_PREFIX + DISTINCT;
public static final String QUERY_CREATE_ADD = QUERY_PREFIX + CREATE_ADD;
public static final String QUERY_ADD_DATA = QUERY_PREFIX + ADD_DATA;
public static final String QUERY_ADD_DELETE = QUERY_PREFIX + ADD_DELETE;
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 0abe583a18..67ccf665e2 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -36,12 +36,14 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
@@ -1361,6 +1363,166 @@ public class GlobalIndexCheckerIT extends BaseTest {
}
}
+ @Test
+ public void testReadRepairWithDistinctPrefixFilter() throws Exception {
+ Assume.assumeTrue(async == false);
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String indexName = generateUniqueName();
+ conn.createStatement().execute("create table " + dataTableName
+ + " (id1 varchar(10) not null, id2 varchar(10) not null, val1
varchar(10), val2 varchar(10), "
+ + "val3 varchar(10), val4 varchar(10) constraint pk primary key(id1,
id2))"
+ + tableDDLOptions);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " on " +
dataTableName
+ + " (val1, val2) include (val3, val4)" + this.indexDDLOptions);
+
+ // create orphan unverified index row
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a1', 'a2', 'val1', 'val2a', 'val3', 'val4')");
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a3', 'a2', 'val1', 'val2a', 'val3', 'val4')");
+ commitWithException(conn);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a1', 'a3', 'val1', 'val2a', 'val31', 'val4')");
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a2', 'a1', 'val1', 'val2a', 'val31', 'val4')");
+ conn.commit();
+
+ // create an unverified update to the index row pointing to an existing
data row
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a2', 'a1', 'val1', 'val1b', 'val3', 'val4')");
+ commitWithException(conn);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a2', 'a2', 'val1', 'val1b', 'val3', 'val4')");
+ conn.commit();
+
+ ArrayList<String> expectedValues = Lists.newArrayList("a1", "a2");
+ String selectSql =
+ "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1'
AND val2 = 'val2a'";
+ verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues);
+
+ expectedValues = Lists.newArrayList("a2");
+ selectSql =
+ "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1'
AND val2 = 'val1b'";
+ verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues);
+
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a3', 'a2', 'val1', 'val2a', 'val3', 'val4')");
+ conn.commit();
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ expectedValues = Lists.newArrayList("a1", "a2", "a3");
+ selectSql =
+ "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1'
AND val2 = 'val2a'";
+ verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues);
+
+ // first verified and then verified
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a4', 'a1', 'val1_4', 'val1_4', 'val1_4', 'val1_4')");
+ conn.commit();
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a4', 'a2', 'val1_4', 'val1_4', 'val1_4', 'val1_4')");
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a5', 'a1', 'val1_4', 'val1_4', 'val1_4', 'val1_4')");
+ conn.commit();
+ IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+ expectedValues = Lists.newArrayList("a4", "a5");
+ selectSql =
+ "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1_4'
AND val2 = 'val1_4'";
+ verifyDistinctQueryOnIndex(conn, indexName, selectSql, expectedValues);
+ }
+ }
+
+ @Test
+ public void testUncoveredIndexWithDistinctPrefixFilter() throws Exception {
+ Assume.assumeTrue(async == false);
+
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ String dataTableName = generateUniqueName();
+ String uncoveredIndex1 = generateUniqueName();
+ conn.createStatement().execute("create table " + dataTableName
+ + " (id1 varchar(10) not null, id2 varchar(10) not null, val1
varchar(10), val2 varchar(10), "
+ + "val3 varchar(10), val4 varchar(10) constraint pk primary key(id1,
id2))"
+ + tableDDLOptions);
+ conn.createStatement().execute("CREATE UNCOVERED INDEX " +
uncoveredIndex1 + " on "
+ + dataTableName + " (val1)" + this.indexDDLOptions);
+
+ // create orphan unverified index row
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a1', 'a1', 'val1a', 'val2a', 'val3', 'val4')");
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a1', 'a2', 'val1a', 'val2a', 'val3', 'val4')");
+ commitWithException(conn);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ // add a valid row with same value for id1 column as the unverified row
above to test that
+ // the DistinctPrefix filter is reset correctly
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a1', 'a3', 'val1a', 'val2a', 'val31', 'val4')");
+ // add more valid rows with same value for id1 column so that the
DistinctPrefix filter
+ // can skip those rows
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a1', 'a4', 'val1a', 'val2b', 'val31', 'val4')");
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a1', 'a5', 'val1a', 'val2b', 'val31', 'val4')");
+ // add another valid row with a different value for id1 column
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a2', 'a1', 'val1a', 'val2a', 'val31', 'val4')");
+ conn.commit();
+
+ ArrayList<String> expectedValues = Lists.newArrayList("a1", "a2");
+ // condition on val1 in WHERE clause so that query will use the
uncovered index
+ String selectSql = "SELECT distinct(id1) from " + dataTableName + "
WHERE val1 = 'val1a'";
+ verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql,
expectedValues);
+ expectedValues = Lists.newArrayList("a1");
+ // add extra where conditions to the query
+ selectSql =
+ "SELECT distinct(id1) from " + dataTableName + " WHERE val1 = 'val1a'
AND val2 = 'val2b'";
+ verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql,
expectedValues);
+
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a3', 'a1', 'val1b', 'val2a', 'val31', 'val4')");
+ conn.commit();
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(true);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a3', 'a2', 'val1b', 'val2a', 'val3', 'val4')");
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a3', 'a3', 'val1b', 'val2a', 'val3', 'val4')");
+ commitWithException(conn);
+ IndexRegionObserver.setFailDataTableUpdatesForTesting(false);
+ conn.createStatement().execute("upsert into " + dataTableName + " "
+ + "values ('a4', 'a1', 'val1b', 'val2a', 'val31', 'val4')");
+ conn.commit();
+ expectedValues = Lists.newArrayList("a3", "a4");
+ selectSql = "SELECT distinct(id1) from " + dataTableName + " WHERE val1
= 'val1b'";
+ verifyDistinctQueryOnIndex(conn, uncoveredIndex1, selectSql,
expectedValues);
+ }
+ }
+
+ /**
+ * Runs a DISTINCT query and checks three things:
+ * - its plan uses the given index,
+ * - the plan contains the server side distinct prefix filter,
+ * - the first-column values match the expected ones, in order.
+ * If an assertion fails, the index table is dumped to help debugging.
+ * @param conn connection used to run the query
+ * @param indexName name of the index the plan is expected to use
+ * @param query the DISTINCT query to run
+ * @param expectedValues expected values of the first column, in result order
+ */
+ private void verifyDistinctQueryOnIndex(Connection conn, String indexName, String query,
+ List<String> expectedValues) throws SQLException, IOException {
+ try (ResultSet rs = conn.createStatement().executeQuery(query)) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String actualExplainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(actualExplainPlan, actualExplainPlan.contains(indexName));
+ assertTrue(actualExplainPlan, actualExplainPlan.contains("SERVER DISTINCT PREFIX FILTER"));
+ List<String> actualValues = Lists.newArrayList();
+ while (rs.next()) {
+ actualValues.add(rs.getString(1));
+ }
+ assertEquals(expectedValues, actualValues);
+ } catch (AssertionError e) {
+ // best-effort diagnostic dump; a failure here must not mask the original assertion
+ try {
+ TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+ } catch (Exception dumpFailure) {
+ e.addSuppressed(dumpFailure);
+ }
+ throw e;
+ }
+ }
+
@Test
public void testUnverifiedIndexRowWithFirstKeyOnlyFilter() throws Exception {
if (async) {
diff --git a/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt
b/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt
new file mode 100644
index 0000000000..df1400b48f
--- /dev/null
+++ b/phoenix-core/src/it/resources/gold_files/gold_query_distinct.txt
@@ -0,0 +1,27 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+'COUNT(1)'
+'12'
+'ID1','ID2'
+'a','1'
+'a','2'
+'b','1'
+'b','2'
+'c','1'
+'c','2'
diff --git a/phoenix-core/src/it/resources/sql_files/create_distinct.sql
b/phoenix-core/src/it/resources/sql_files/create_distinct.sql
new file mode 100644
index 0000000000..29943787cb
--- /dev/null
+++ b/phoenix-core/src/it/resources/sql_files/create_distinct.sql
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS SCHEMA_0002.TABLE_0002 (
+    ID1 VARCHAR NOT NULL, ID2 VARCHAR NOT NULL, ID3 VARCHAR NOT NULL,
+    COL1 VARCHAR,
+    COL2 INTEGER
+    CONSTRAINT PK PRIMARY KEY (ID1, ID2, ID3));
+
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('a', '1', 'x', 'data1', 10);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('a', '1', 'y', 'data2', 20);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('a', '2', 'x', 'data3', 30);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('a', '2', 'y', 'data4', 40);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('b', '1', 'x', 'data5', 50);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('b', '1', 'y', 'data6', 60);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('b', '2', 'x', 'data7', 70);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('b', '2', 'y', 'data8', 80);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('c', '1', 'x', 'data9', 90);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('c', '1', 'y', 'data10', 100);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('c', '2', 'x', 'data11', 110);
+UPSERT INTO SCHEMA_0002.TABLE_0002 (ID1, ID2, ID3, COL1, COL2) VALUES ('c', '2', 'y', 'data12', 120);
\ No newline at end of file
diff --git a/phoenix-core/src/it/resources/sql_files/query_distinct.sql
b/phoenix-core/src/it/resources/sql_files/query_distinct.sql
new file mode 100644
index 0000000000..9f7578b008
--- /dev/null
+++ b/phoenix-core/src/it/resources/sql_files/query_distinct.sql
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Queries against the table created in create_distinct.sql (PK ID1, ID2, ID3).
+ * The second query selects DISTINCT over the leading primary-key prefix (ID1, ID2).
+ * The expected output is in gold_files/gold_query_distinct.txt: COUNT(1) = 12, then six
+ * (ID1, ID2) pairs in ORDER BY order. Keep column labels and row order in sync with that file.
+ */
+SELECT COUNT(*) FROM SCHEMA_0002.TABLE_0002;
+
+SELECT DISTINCT ID1, ID2 FROM SCHEMA_0002.TABLE_0002 ORDER BY ID1, ID2;
\ No newline at end of file