This is an automated email from the ASF dual-hosted git repository.
virajjasani pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new 997320228e PHOENIX-7878 CDC perf improvement - skip redundant cell
versions on data table scans (#2493)
997320228e is described below
commit 997320228e67afd76c0a1210864770b46ce09cce
Author: Viraj Jasani <[email protected]>
AuthorDate: Wed Jun 10 14:51:45 2026 -0700
PHOENIX-7878 CDC perf improvement - skip redundant cell versions on data
table scans (#2493)
---
.../apache/phoenix/filter/CDCVersionFilter.java | 210 +++++++++++++++
.../coprocessor/CDCGlobalIndexRegionScanner.java | 65 ++++-
.../java/org/apache/phoenix/end2end/CDCBaseIT.java | 184 ++++++++++++-
.../org/apache/phoenix/end2end/CDCQueryIT.java | 83 +++++-
.../phoenix/filter/CDCVersionFilterTest.java | 296 +++++++++++++++++++++
5 files changed, 814 insertions(+), 24 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java
new file mode 100644
index 0000000000..794ad2ee40
--- /dev/null
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Filter for CDC data table scans that prunes redundant cell versions. For
each data row the filter
+ * is given the {@code [min, max]} band of change timestamps for that row (the
oldest and newest CDC
+ * change seen in the current scan). Per column (family:qualifier), with cells
delivered in
+ * timestamp-descending order, it includes only:
+ * <ul>
+ * <li>cells whose timestamp is within {@code [min, max]} (the change cells),
and</li>
+ * <li>the first cell strictly below {@code min} (the pre-image for the oldest
change), and</li>
+ * <li>all DeleteFamily / DeleteFamilyVersion markers (needed for CDC deletion
tracking).</li>
+ * </ul>
+ * Cells newer than {@code max}, and all but the first cell older than {@code
min}, are skipped,
+ * reducing network I/O, memory, and processing overhead on the data table
scan. Rows with no entry in the range map are passed through unpruned.
+ */
+public class CDCVersionFilter extends FilterBase implements Writable {
+
+ private Map<ImmutableBytesPtr, long[]> rangeMap;
+
+ // Per-row state
+ private long[] currentRange;
+ private byte[] currentRowKey;
+
+ // Per-column state
+ private byte[] prevFamily;
+ private byte[] prevQualifier;
+
+ private boolean belowMinEmitted;
+
+ public CDCVersionFilter() {
+ }
+
+ /**
+ * @param rangeMap mapping of data row key to a 2-element {@code [min, max]}
array giving the
+ * oldest and newest change timestamp for that row
+ */
+ public CDCVersionFilter(Map<ImmutableBytesPtr, long[]> rangeMap) {
+ this.rangeMap = rangeMap;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ currentRange = null;
+ currentRowKey = null;
+ resetColumnState();
+ }
+
+ private void resetColumnState() {
+ prevFamily = null;
+ prevQualifier = null;
+ belowMinEmitted = false;
+ }
+
+ private boolean isNewRow(Cell cell) {
+ if (currentRowKey == null) {
+ return true;
+ }
+ return !Bytes.equals(currentRowKey, 0, currentRowKey.length,
cell.getRowArray(),
+ cell.getRowOffset(), cell.getRowLength());
+ }
+
+ private void onNewRow(Cell cell) {
+ currentRowKey = CellUtil.cloneRow(cell);
+ ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(currentRowKey);
+ currentRange = rangeMap != null ? rangeMap.get(rowKeyPtr) : null;
+ resetColumnState();
+ }
+
+ private boolean isNewColumn(Cell cell) {
+ if (prevFamily == null) {
+ return true;
+ }
+ if (
+ !Bytes.equals(prevFamily, 0, prevFamily.length, cell.getFamilyArray(),
cell.getFamilyOffset(),
+ cell.getFamilyLength())
+ ) {
+ return true;
+ }
+ return !Bytes.equals(prevQualifier, 0, prevQualifier.length,
cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength());
+ }
+
+ private void trackColumn(Cell cell) {
+ prevFamily = CellUtil.cloneFamily(cell);
+ prevQualifier = CellUtil.cloneQualifier(cell);
+ }
+
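+ // Deliberately no @Override: Filter#filterKeyValue is deprecated in HBase 2 and
removed in HBase 3, so annotating it would break compilation against HBase 3.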
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ return filterCell(v);
+ }
+
+ @Override
+ public ReturnCode filterCell(Cell cell) throws IOException {
+ if (isNewRow(cell)) {
+ onNewRow(cell);
+ }
+
+ if (currentRange == null) {
+ return ReturnCode.INCLUDE;
+ }
+
+ Cell.Type type = cell.getType();
+ if (type == Cell.Type.DeleteFamily || type ==
Cell.Type.DeleteFamilyVersion) {
+ return ReturnCode.INCLUDE;
+ }
+
+ if (isNewColumn(cell)) {
+ trackColumn(cell);
+ belowMinEmitted = false;
+ }
+
+ long cellTs = cell.getTimestamp();
+ long minChangeTs = currentRange[0];
+ long maxChangeTs = currentRange[1];
+
+ if (cellTs > maxChangeTs) {
+ return ReturnCode.SKIP;
+ }
+ if (cellTs >= minChangeTs) {
+ return ReturnCode.INCLUDE;
+ }
+ if (!belowMinEmitted) {
+ belowMinEmitted = true;
+ return ReturnCode.INCLUDE;
+ }
+ return ReturnCode.NEXT_COL;
+ }
+
+ @Override
+ public byte[] toByteArray() throws IOException {
+ return Writables.getBytes(this);
+ }
+
+ public static CDCVersionFilter parseFrom(byte[] pbBytes) throws
DeserializationException {
+ try {
+ return (CDCVersionFilter) Writables.getWritable(pbBytes, new
CDCVersionFilter());
+ } catch (IOException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ if (rangeMap == null) {
+ out.writeInt(0);
+ return;
+ }
+ out.writeInt(rangeMap.size());
+ for (Map.Entry<ImmutableBytesPtr, long[]> entry : rangeMap.entrySet()) {
+ ImmutableBytesPtr key = entry.getKey();
+ long[] range = entry.getValue();
+ out.writeInt(key.getLength());
+ out.write(key.get(), key.getOffset(), key.getLength());
+ out.writeLong(range[0]);
+ out.writeLong(range[1]);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int numRows = in.readInt();
+ if (numRows < 0) {
+ throw new IOException("Invalid CDCVersionFilter row count: " + numRows);
+ }
+ rangeMap = new HashMap<>();
+ for (int i = 0; i < numRows; i++) {
+ int keyLen = in.readInt();
+ if (keyLen < 0) {
+ throw new IOException("Invalid CDCVersionFilter row key length: " +
keyLen);
+ }
+ byte[] keyBytes = new byte[keyLen];
+ in.readFully(keyBytes);
+ long minChangeTs = in.readLong();
+ long maxChangeTs = in.readLong();
+ rangeMap.put(new ImmutableBytesPtr(keyBytes), new long[] { minChangeTs,
maxChangeTs });
+ }
+ }
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 97b388dc43..b061cfe2f2 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -28,6 +28,7 @@ import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -40,6 +41,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -49,6 +52,7 @@ import
org.apache.phoenix.coprocessor.generated.IndexMutationsProtos;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.SingleCellColumnExpression;
+import org.apache.phoenix.filter.CDCVersionFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.CDCTableInfo;
import org.apache.phoenix.index.IndexMaintainer;
@@ -123,21 +127,60 @@ public class CDCGlobalIndexRegionScanner extends
UncoveredGlobalIndexRegionScann
) {
return null;
}
- // TODO: Get Timerange from the start row and end row of the index scan
object
- // and set it in the datatable scan object.
- // if (scan.getStartRow().length == 8) {
- // startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
- // scan.getStartRow(), 0, SortOrder.getDefault());
- // }
- // if (scan.getStopRow().length == 8) {
- // stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
- // scan.getStopRow(), 0, SortOrder.getDefault());
- // }
Scan dataScan = prepareDataTableScan(dataRowKeys, true);
if (dataScan == null) {
return null;
}
- return CDCUtil.setupScanForCDC(dataScan);
+ CDCUtil.setupScanForCDC(dataScan);
+ Map<ImmutableBytesPtr, long[]> changeRangeMap =
buildDataRowChangeRangeMap(dataRowKeys);
+ if (!changeRangeMap.isEmpty()) {
+ CDCVersionFilter versionFilter = new CDCVersionFilter(changeRangeMap);
+ Filter existingFilter = dataScan.getFilter();
+ if (existingFilter != null) {
+ dataScan.setFilter(
+ new FilterList(FilterList.Operator.MUST_PASS_ALL, existingFilter,
versionFilter));
+ } else {
+ dataScan.setFilter(versionFilter);
+ }
+ }
+ return dataScan;
+ }
+
+ private Map<ImmutableBytesPtr, long[]>
+ buildDataRowChangeRangeMap(Collection<byte[]> dataRowKeys) {
+ Set<ImmutableBytesPtr> scopedRowKeys = new HashSet<>(dataRowKeys.size());
+ for (byte[] dataRowKey : dataRowKeys) {
+ scopedRowKeys.add(new ImmutableBytesPtr(dataRowKey));
+ }
+ Map<ImmutableBytesPtr, long[]> result = new HashMap<>();
+ for (List<Cell> indexRow : indexRows) {
+ if (indexRow.isEmpty()) {
+ continue;
+ }
+ Cell firstCell = indexRow.get(0);
+ byte[] indexRowKey =
ImmutableBytesPtr.cloneCellRowIfNecessary(firstCell);
+ byte[] dataRowKey = indexToDataRowKeyMap.get(indexRowKey);
+ if (dataRowKey == null) {
+ continue;
+ }
+ ImmutableBytesPtr dataRowKeyPtr = new ImmutableBytesPtr(dataRowKey);
+ if (!scopedRowKeys.contains(dataRowKeyPtr)) {
+ continue;
+ }
+ long changeTs = firstCell.getTimestamp();
+ long[] range = result.get(dataRowKeyPtr);
+ if (range == null) {
+ result.put(dataRowKeyPtr, new long[] { changeTs, changeTs });
+ } else {
+ if (changeTs < range[0]) {
+ range[0] = changeTs;
+ }
+ if (changeTs > range[1]) {
+ range[1] = changeTs;
+ }
+ }
+ }
+ return result;
}
protected boolean getNextCoveredIndexRow(List<Cell> result) throws
IOException {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 167f059be7..af33d4d8bc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -614,6 +614,149 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
return changes;
}
+ protected List<ChangeRow> generateChangesForPrePostImage(long startTS,
String[] tenantids,
+ String tableName, CommitAdapter committer) throws Exception {
+ List<ChangeRow> changes = new ArrayList<>();
+ EnvironmentEdgeManager.injectEdge(injectEdge);
+ injectEdge.setValue(startTS);
+ committer.init();
+ Map<String, Object> rowid1 = new HashMap() {
+ {
+ put("K", 1);
+ }
+ };
+ Map<String, Object> rowid2 = new HashMap() {
+ {
+ put("K", 2);
+ }
+ };
+ long ts = startTS;
+ for (String tid : tenantids) {
+ try (Connection conn = committer.getConnection(tid)) {
+ // Initial inserts for two rows at the same timestamp (one batch,
multiple data rows).
+ changes.add(
+ addChange(conn, tableName, new ChangeRow(tid, ts, rowid1, new
TreeMap<String, Object>() {
+ {
+ put("V1", 1L);
+ put("V2", 10L);
+ put("V3", 1000L);
+ put("B.VB", 100L);
+ }
+ })));
+ changes.add(
+ addChange(conn, tableName, new ChangeRow(tid, ts, rowid2, new
TreeMap<String, Object>() {
+ {
+ put("V1", 200L);
+ put("V2", 2000L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Partial update: only V1 changes, so V2/V3/B.VB pre-images come from
the prior version.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+ {
+ put("V1", 2L);
+ }
+ })));
+ committer.commit(conn);
+
+ // V3 and B.VB remain untouched here, so they stay sparse relative to
the V1/V2 churn.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+ {
+ put("V1", 3L);
+ put("V2", 20L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Column-level null -> DeleteColumn marker on V2.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+ {
+ put("V2", null);
+ }
+ })));
+ committer.commit(conn);
+
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+ {
+ put("V1", 4L);
+ put("B.VB", 400L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Full-row delete -> DeleteFamily marker.
+ changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100,
rowid1, null)));
+ committer.commit(conn);
+
+ // Re-insert after delete: PRE image must be empty (bounded by the
delete family marker).
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+ {
+ put("V1", 5L);
+ put("V2", 50L);
+ }
+ })));
+ committer.commit(conn);
+
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+ {
+ put("B.VB", 500L);
+ }
+ })));
+ committer.commit(conn);
+
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+ {
+ put("V1", 6L);
+ put("V2", 60L);
+ put("V3", 6000L);
+ put("B.VB", 600L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Second row mutated again so the scan batch keeps covering multiple
data rows.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid2, new TreeMap<String, Object>() {
+ {
+ put("V1", 201L);
+ }
+ })));
+ committer.commit(conn);
+
+ // Consecutive full-row deletes: CDC collapses these into a single
delete event.
+ changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100,
rowid1, null)));
+ committer.commit(conn);
+ changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100,
rowid1, null)));
+ committer.commit(conn);
+
+ // Re-insert a single column after the deletes.
+ changes.add(addChange(conn, tableName,
+ new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+ {
+ put("V1", 7L);
+ }
+ })));
+ committer.commit(conn);
+ }
+ ts += 100;
+ }
+ committer.reset();
+ for (int i = 0; i < changes.size(); ++i) {
+ LOGGER.debug("----- generated change: " + i + " tenantId:" +
changes.get(i).tenantId
+ + " changeTS: " + changes.get(i).changeTS + " pks: " +
changes.get(i).pks + " change: "
+ + changes.get(i).change);
+ }
+ return changes;
+ }
+
protected void verifyChangesViaSCN(String tenantId, Connection conn, String
cdcFullName,
Map<String, String> pkColumns, String dataTableName, Map<String, String>
dataColumns,
List<ChangeRow> changes, long startTS, long endTS) throws Exception {
@@ -682,14 +825,19 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
if (!changeRow.getChangeType().equals(cdcObj.get(CDC_EVENT_TYPE))) {
assertEquals(changeDesc, changeRow.getChangeType(),
cdcObj.get(CDC_EVENT_TYPE));
}
- if (
- cdcObj.containsKey(CDC_PRE_IMAGE) && !((Map)
cdcObj.get(CDC_PRE_IMAGE)).isEmpty()
- && changeScopes.contains(PTable.CDCChangeScope.PRE)
- ) {
- Map<String, Object> preImage = getRowImage(changeDesc, tenantId,
dataTableName, dataColumns,
- changeRow, changeRow.changeTS);
- assertEquals(changeDesc, preImage,
- fillInNulls((Map<String, Object>) cdcObj.get(CDC_PRE_IMAGE),
dataColumns.keySet()));
+ if (changeScopes.contains(PTable.CDCChangeScope.PRE)) {
+ Map<String, Object> cdcPreImage = (Map<String, Object>)
cdcObj.get(CDC_PRE_IMAGE);
+ Map<String, Object> expectedPreImage = getRowImage(changeDesc,
tenantId, dataTableName,
+ dataColumns, changeRow, changeRow.changeTS, false);
+ if (expectedPreImage == null) {
+ // No live row state immediately before the change (first insert, or
re-insert after a
+ // delete): the pre-image must be empty or absent.
+ assertTrue(changeDesc + " expected empty pre-image but got: " +
cdcPreImage,
+ cdcPreImage == null || cdcPreImage.isEmpty());
+ } else {
+ assertEquals(changeDesc, expectedPreImage,
+ fillInNulls(cdcPreImage, dataColumns.keySet()));
+ }
}
if (changeScopes.contains(PTable.CDCChangeScope.CHANGE)) {
assertEquals(changeDesc,
@@ -716,6 +864,19 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
protected Map<String, Object> getRowImage(String changeDesc, String tenantId,
String dataTableName, Map<String, String> dataColumns, ChangeRow
changeRow, long scnTimestamp)
throws Exception {
+ return getRowImage(changeDesc, tenantId, dataTableName, dataColumns,
changeRow, scnTimestamp,
+ true);
+ }
+
+ /**
+ * Reads the row image (all data columns, nulls included) as of {@code
scnTimestamp}. When
+ * {@code mustExist} is true, the row is asserted to exist. When false, a
missing row returns
+ * {@code null} so callers can distinguish "no live row state at this SCN"
from "row present (with
+ * possibly null column values)" - used to assert expected-empty vs
expected-nonempty pre-images.
+ */
+ protected Map<String, Object> getRowImage(String changeDesc, String tenantId,
+ String dataTableName, Map<String, String> dataColumns, ChangeRow
changeRow, long scnTimestamp,
+ boolean mustExist) throws Exception {
Map<String, Object> image = new HashMap<>();
Properties props = new Properties();
props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB,
Long.toString(scnTimestamp));
@@ -734,7 +895,12 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
}
// Create projection without namespace.
ResultSet rs = stmt.executeQuery();
- assertTrue(changeDesc, rs.next());
+ if (!rs.next()) {
+ if (mustExist) {
+ fail(changeDesc + " expected data table row to exist at SCN " +
scnTimestamp);
+ }
+ return null;
+ }
for (String colName : projections.keySet()) {
PDataType dt = PDataType.fromSqlTypeName(dataColumns.get(colName));
image.put(colName,
getJsonEncodedValue(rs.getObject(projections.get(colName)), dt));
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 9a86354778..385e4401ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -337,7 +337,6 @@ public class CDCQueryIT extends CDCBaseIT {
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
- // For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName, new TreeMap<String, String>() {
{
put("K", "INTEGER");
@@ -434,6 +433,85 @@ public class CDCQueryIT extends CDCBaseIT {
}
}
+ /**
+ * Exercises CDC PRE/POST image reconstruction over a row with a deep stack
of cell versions
+ * interleaved with column-level nulls, full-row deletes, consecutive
deletes and re-inserts. This
+ * specifically stresses the server-side version pruning applied to the data
table scan: the PRE
+ * and POST images are recomputed independently via SCN queries on the data
table and compared
+ * against the CDC output, so any over-pruning of needed versions surfaces
as a mismatch.
+ */
+ @Test
+ public void testSelectCDCPreAndPostImageWithVersionPruning() throws
Exception {
+ String cdcName, cdc_sql;
+ String schemaName = getSchemaName();
+ String tableName = getTableOrViewName(schemaName);
+ String datatableName = tableName;
+ try (Connection conn = newConnection()) {
+ createTable(conn,
+ "CREATE TABLE " + tableName + " (" + (multitenant ? "TENANT_ID
CHAR(5) NOT NULL, " : "")
+ + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, v3 INTEGER, B.vb
INTEGER, "
+ + "CONSTRAINT PK PRIMARY KEY " + (multitenant ? "(TENANT_ID, k) " :
"(k)") + ")",
+ encodingScheme, multitenant, tableSaltBuckets, false, null);
+ if (forView) {
+ String viewName = getTableOrViewName(schemaName);
+ createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " +
tableName,
+ encodingScheme);
+ tableName = viewName;
+ }
+ cdcName = getCDCName();
+ cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+ createCDC(conn, cdc_sql, encodingScheme);
+ }
+
+ String tenantId = multitenant ? "1000" : null;
+ String[] tenantids = { tenantId };
+ if (multitenant) {
+ tenantids = new String[] { tenantId, "2000" };
+ }
+
+ long startTS = System.currentTimeMillis();
+ List<ChangeRow> changes =
+ generateChangesForPrePostImage(startTS, tenantids, tableName,
COMMIT_SUCCESS);
+ long currentTime = System.currentTimeMillis();
+ long endTS = changes.get(changes.size() - 1).getTimestamp() + 1;
+ if (endTS > currentTime) {
+ Thread.sleep(endTS - currentTime);
+ }
+
+ Map<String, String> dataColumns = new TreeMap<String, String>() {
+ {
+ put("V1", "INTEGER");
+ put("V2", "INTEGER");
+ put("V3", "INTEGER");
+ put("B.VB", "INTEGER");
+ }
+ };
+ String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+ try (Connection conn = newConnection(tenantId)) {
+ dumpCDCResults(conn, cdcName, new TreeMap<String, String>() {
+ {
+ put("K", "INTEGER");
+ }
+ }, addPartitionInList(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," +
"\"CDC JSON\" FROM "
+ + cdcFullName));
+
+ // Verify PRE and POST images are correctly reconstructed from the
version-pruned data scan.
+ verifyChangesViaSCN(tenantId,
+ getCDCQueryPreparedStatement(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName,
startTS, endTS)
+ .executeQuery(),
+ datatableName, dataColumns, changes, PRE_POST_IMG);
+
+ // Cross-check CHANGE, PRE and POST images together.
+ verifyChangesViaSCN(tenantId,
+ getCDCQueryPreparedStatement(conn, cdcFullName,
+ "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " +
cdcFullName, startTS, endTS)
+ .executeQuery(),
+ datatableName, dataColumns, changes, ALL_IMG);
+ }
+ }
+
@Test
public void testSelectGeneric() throws Exception {
String cdcName, cdc_sql;
@@ -494,7 +572,6 @@ public class CDCQueryIT extends CDCBaseIT {
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
- // For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName, pkColumns, addPartitionInList(conn,
cdcFullName,
"SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName));
@@ -598,7 +675,6 @@ public class CDCQueryIT extends CDCBaseIT {
};
try (Connection conn = newConnection(tenantId)) {
- // For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName, new TreeMap<String, String>() {
{
put("K", "INTEGER");
@@ -681,7 +757,6 @@ public class CDCQueryIT extends CDCBaseIT {
String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
try (Connection conn = newConnection(tenantId)) {
- // For debug: uncomment to see the exact results logged to console.
dumpCDCResults(conn, cdcName, pkColumns,
"SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName);
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java
new file mode 100644
index 0000000000..40036d3440
--- /dev/null
+++
b/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link CDCVersionFilter}, which keeps, per column, the cells
within the per-row
+ * change-timestamp band {@code [min, max]} plus the first cell below {@code
min} (the pre-image),
+ * and always keeps DeleteFamily markers.
+ */
+public class CDCVersionFilterTest {
+
+ private static final byte[] ROW1 = Bytes.toBytes("row1");
+ private static final byte[] ROW2 = Bytes.toBytes("row2");
+ private static final byte[] CF1 = Bytes.toBytes("cf");
+ private static final byte[] CF2 = Bytes.toBytes("cf2");
+ private static final byte[] CQ1 = Bytes.toBytes("cq1");
+ private static final byte[] CQ2 = Bytes.toBytes("cq2");
+ private static final byte[] VAL = Bytes.toBytes("v");
+
+ private static final byte[] CF = CF1;
+
+ private Cell put(byte[] row, byte[] cf, byte[] cq, long ts) {
+ return new KeyValue(row, cf, cq, ts, KeyValue.Type.Put, VAL);
+ }
+
+ private Cell deleteColumn(byte[] row, byte[] cf, byte[] cq, long ts) {
+ return new KeyValue(row, cf, cq, ts, KeyValue.Type.DeleteColumn);
+ }
+
+ private Cell pointDelete(byte[] row, byte[] cf, byte[] cq, long ts) {
+ return new KeyValue(row, cf, cq, ts, KeyValue.Type.Delete);
+ }
+
+ private Cell deleteFamily(byte[] row, byte[] cf, long ts) {
+ return new KeyValue(row, cf, null, ts, KeyValue.Type.DeleteFamily);
+ }
+
+ private static long[] band(long min, long max) {
+ return new long[] { min, max };
+ }
+
+ private CDCVersionFilter createFilter(Map<ImmutableBytesPtr, long[]>
rangeMap) {
+ return new CDCVersionFilter(rangeMap);
+ }
+
+ private Map<ImmutableBytesPtr, long[]> singleRow(byte[] row, long min, long
max) {
+ Map<ImmutableBytesPtr, long[]> map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(row), band(min, max));
+ return map;
+ }
+
+ @Test
+ public void testSingleChangeKeepsChangeAndPreImage() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100))); // change
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80))); // redundant
+ }
+
+ @Test
+ public void testBandIncludesAllInRangeVersions() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 60, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
70)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
60))); // == min
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
50))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
40)));
+ }
+
+ @Test
+ public void testCellsAboveMaxAreSkipped() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 50, 80));
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
80))); // == max
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
60)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
50))); // == min
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
40))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
30)));
+ }
+
+ @Test
+ public void testAllCellsAboveMaxAreSkipped() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 50, 50));
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+ assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+ }
+
+ @Test
+ public void testColumnWithNoInBandCellKeepsOnlyPreImage() throws IOException
{
+ // The column has no cell at/inside the band; the first cell below min is
its pre-image.
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
70)));
+ }
+
+ @Test
+ public void testDeleteFamilyAlwaysIncluded() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF,
50)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF,
10)));
+ }
+
+ @Test
+ public void testDeleteFamilyDoesNotAffectColumnState() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ }
+
+ @Test
+ public void testDeleteColumnInBandIncluded() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF,
CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ }
+
+ @Test
+ public void testDeleteColumnAsPreImage() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF,
CQ1, 90))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ }
+
+ @Test
+ public void testPointDeleteInBandAndAsPreImage() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(pointDelete(ROW1, CF,
CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(pointDelete(ROW1, CF,
CQ1, 90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ }
+
+ @Test
+ public void testMultipleColumnsResetState() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ // Column CQ1
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ // Column CQ2 — per-column pre-image state should reset.
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ2,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ2,
85)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ2,
70)));
+ }
+
+ @Test
+ public void testMultipleColumnFamiliesResetState() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ // CF1:CQ1
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF1, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF1, CQ1,
90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF1, CQ1,
80)));
+ // CF2:CQ1 — same qualifier, different family, state should reset.
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF2, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF2, CQ1,
85)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF2, CQ1,
70)));
+ }
+
+ @Test
+ public void testMultipleRows() throws IOException {
+ Map<ImmutableBytesPtr, long[]> map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), band(100, 100));
+ map.put(new ImmutableBytesPtr(ROW2), band(50, 80));
+ CDCVersionFilter filter = createFilter(map);
+ // ROW1
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ // ROW2 — the row change is detected automatically via the row key.
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1,
80)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1,
60)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1,
50))); // == min
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1,
40))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW2, CF, CQ1,
30)));
+ }
+
+ @Test
+ public void testRowNotInMapIncludesEverything() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1,
90)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1,
80)));
+ }
+
+ @Test
+ public void testNullRangeMap() throws IOException {
+ CDCVersionFilter filter = new CDCVersionFilter(null);
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ }
+
+ @Test
+ public void testEmptyRangeMap() throws IOException {
+ CDCVersionFilter filter = createFilter(new HashMap<>());
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ }
+
+ @Test
+ public void testResetClearsState() throws IOException {
+ CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ filter.reset();
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
100)));
+ assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1,
90)));
+ assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1,
80)));
+ }
+
+ @Test
+ public void testSerializationRoundTrip() throws IOException,
DeserializationException {
+ Map<ImmutableBytesPtr, long[]> map = new HashMap<>();
+ map.put(new ImmutableBytesPtr(ROW1), band(60, 100));
+ map.put(new ImmutableBytesPtr(ROW2), band(80, 80));
+ CDCVersionFilter original = createFilter(map);
+
+ byte[] serialized = original.toByteArray();
+ CDCVersionFilter deserialized = CDCVersionFilter.parseFrom(serialized);
+
+ // ROW1 band [60, 100]
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF,
CQ1, 100)));
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF,
CQ1, 80)));
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF,
CQ1, 60))); // == min
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF,
CQ1, 50))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, deserialized.filterCell(put(ROW1, CF,
CQ1, 40)));
+
+ // ROW2 band [80, 80]
+ deserialized.reset();
+ assertEquals(ReturnCode.SKIP, deserialized.filterCell(put(ROW2, CF, CQ1,
100))); // above max
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW2, CF,
CQ1, 80)));
+ assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW2, CF,
CQ1, 70))); // pre-image
+ assertEquals(ReturnCode.NEXT_COL, deserialized.filterCell(put(ROW2, CF,
CQ1, 60)));
+ }
+
+ // parseFrom is a public, class-name-addressable deserialization entry
point; a malformed payload
+ // with a negative wire-supplied row count or key length must be rejected with
+ // DeserializationException rather than throwing NegativeArraySizeException.
+
+ @Test(expected = DeserializationException.class)
+ public void testParseFromRejectsNegativeRowCount() throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(bos)) {
+ out.writeInt(-1); // numRows
+ }
+ CDCVersionFilter.parseFrom(bos.toByteArray());
+ }
+
+ @Test(expected = DeserializationException.class)
+ public void testParseFromRejectsNegativeRowKeyLength() throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (DataOutputStream out = new DataOutputStream(bos)) {
+ out.writeInt(1); // numRows
+ out.writeInt(-3); // keyLen negative
+ }
+ CDCVersionFilter.parseFrom(bos.toByteArray());
+ }
+}