This is an automated email from the ASF dual-hosted git repository.

virajjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 01526d3b3f PHOENIX-7878 CDC perf improvement - skip redundant cell versions on data table scans (#2493)
01526d3b3f is described below

commit 01526d3b3fed4b93083e997de1603f8996d36940
Author: Viraj Jasani <[email protected]>
AuthorDate: Wed Jun 10 14:51:45 2026 -0700

    PHOENIX-7878 CDC perf improvement - skip redundant cell versions on data table scans (#2493)
---
 .../apache/phoenix/filter/CDCVersionFilter.java    | 210 +++++++++++++++
 .../coprocessor/CDCGlobalIndexRegionScanner.java   |  65 ++++-
 .../java/org/apache/phoenix/end2end/CDCBaseIT.java | 184 ++++++++++++-
 .../org/apache/phoenix/end2end/CDCQueryIT.java     |  83 +++++-
 .../phoenix/filter/CDCVersionFilterTest.java       | 296 +++++++++++++++++++++
 5 files changed, 814 insertions(+), 24 deletions(-)

diff --git 
a/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java
 
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java
new file mode 100644
index 0000000000..794ad2ee40
--- /dev/null
+++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/filter/CDCVersionFilter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.io.Writable;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Filter for CDC data table scans that prunes redundant cell versions. For each data row the filter
+ * is given the {@code [min, max]} band of change timestamps for that row (the oldest and newest CDC
+ * change seen in the current scan). Per column (family:qualifier), with cells delivered in
+ * timestamp-descending order, it includes only:
+ * <ul>
+ * <li>cells whose timestamp is within {@code [min, max]} (the change cells), and</li>
+ * <li>the first cell strictly below {@code min} (the pre-image for the oldest change), and</li>
+ * <li>all DeleteFamily / DeleteFamilyVersion markers (needed for CDC deletion tracking).</li>
+ * </ul>
+ * Cells newer than {@code max}, and all but the first cell older than {@code min}, are skipped,
+ * reducing network I/O, memory, and processing overhead on the data table scan.
+ */
+public class CDCVersionFilter extends FilterBase implements Writable {
+
+  private Map<ImmutableBytesPtr, long[]> rangeMap;
+
+  // Per-row state
+  private long[] currentRange;
+  private byte[] currentRowKey;
+
+  // Per-column state
+  private byte[] prevFamily;
+  private byte[] prevQualifier;
+
+  private boolean belowMinEmitted;
+
+  public CDCVersionFilter() {
+  }
+
+  /**
+   * @param rangeMap mapping of data row key to a 2-element {@code [min, max]} 
array giving the
+   *                 oldest and newest change timestamp for that row
+   */
+  public CDCVersionFilter(Map<ImmutableBytesPtr, long[]> rangeMap) {
+    this.rangeMap = rangeMap;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    currentRange = null;
+    currentRowKey = null;
+    resetColumnState();
+  }
+
+  private void resetColumnState() {
+    prevFamily = null;
+    prevQualifier = null;
+    belowMinEmitted = false;
+  }
+
+  private boolean isNewRow(Cell cell) {
+    if (currentRowKey == null) {
+      return true;
+    }
+    return !Bytes.equals(currentRowKey, 0, currentRowKey.length, 
cell.getRowArray(),
+      cell.getRowOffset(), cell.getRowLength());
+  }
+
+  private void onNewRow(Cell cell) {
+    currentRowKey = CellUtil.cloneRow(cell);
+    ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(currentRowKey);
+    currentRange = rangeMap != null ? rangeMap.get(rowKeyPtr) : null;
+    resetColumnState();
+  }
+
+  private boolean isNewColumn(Cell cell) {
+    if (prevFamily == null) {
+      return true;
+    }
+    if (
+      !Bytes.equals(prevFamily, 0, prevFamily.length, cell.getFamilyArray(), 
cell.getFamilyOffset(),
+        cell.getFamilyLength())
+    ) {
+      return true;
+    }
+    return !Bytes.equals(prevQualifier, 0, prevQualifier.length, 
cell.getQualifierArray(),
+      cell.getQualifierOffset(), cell.getQualifierLength());
+  }
+
+  private void trackColumn(Cell cell) {
+    prevFamily = CellUtil.cloneFamily(cell);
+    prevQualifier = CellUtil.cloneQualifier(cell);
+  }
+
+  // No @Override: Filter#filterKeyValue is removed in HBase 3; kept as a delegate to filterCell.
+  public ReturnCode filterKeyValue(Cell v) throws IOException {
+    return filterCell(v);
+  }
+
+  @Override
+  public ReturnCode filterCell(Cell cell) throws IOException {
+    if (isNewRow(cell)) {
+      onNewRow(cell);
+    }
+
+    if (currentRange == null) {
+      return ReturnCode.INCLUDE;
+    }
+
+    Cell.Type type = cell.getType();
+    if (type == Cell.Type.DeleteFamily || type == 
Cell.Type.DeleteFamilyVersion) {
+      return ReturnCode.INCLUDE;
+    }
+
+    if (isNewColumn(cell)) {
+      trackColumn(cell);
+      belowMinEmitted = false;
+    }
+
+    long cellTs = cell.getTimestamp();
+    long minChangeTs = currentRange[0];
+    long maxChangeTs = currentRange[1];
+
+    if (cellTs > maxChangeTs) {
+      return ReturnCode.SKIP;
+    }
+    if (cellTs >= minChangeTs) {
+      return ReturnCode.INCLUDE;
+    }
+    if (!belowMinEmitted) {
+      belowMinEmitted = true;
+      return ReturnCode.INCLUDE;
+    }
+    return ReturnCode.NEXT_COL;
+  }
+
+  @Override
+  public byte[] toByteArray() throws IOException {
+    return Writables.getBytes(this);
+  }
+
+  public static CDCVersionFilter parseFrom(byte[] pbBytes) throws 
DeserializationException {
+    try {
+      return (CDCVersionFilter) Writables.getWritable(pbBytes, new 
CDCVersionFilter());
+    } catch (IOException e) {
+      throw new DeserializationException(e);
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (rangeMap == null) {
+      out.writeInt(0);
+      return;
+    }
+    out.writeInt(rangeMap.size());
+    for (Map.Entry<ImmutableBytesPtr, long[]> entry : rangeMap.entrySet()) {
+      ImmutableBytesPtr key = entry.getKey();
+      long[] range = entry.getValue();
+      out.writeInt(key.getLength());
+      out.write(key.get(), key.getOffset(), key.getLength());
+      out.writeLong(range[0]);
+      out.writeLong(range[1]);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numRows = in.readInt();
+    if (numRows < 0) {
+      throw new IOException("Invalid CDCVersionFilter row count: " + numRows);
+    }
+    rangeMap = new HashMap<>();
+    for (int i = 0; i < numRows; i++) {
+      int keyLen = in.readInt();
+      if (keyLen < 0) {
+        throw new IOException("Invalid CDCVersionFilter row key length: " + 
keyLen);
+      }
+      byte[] keyBytes = new byte[keyLen];
+      in.readFully(keyBytes);
+      long minChangeTs = in.readLong();
+      long maxChangeTs = in.readLong();
+      rangeMap.put(new ImmutableBytesPtr(keyBytes), new long[] { minChangeTs, 
maxChangeTs });
+    }
+  }
+}
diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
index 97b388dc43..b061cfe2f2 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/CDCGlobalIndexRegionScanner.java
@@ -28,6 +28,7 @@ import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +41,8 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -49,6 +52,7 @@ import 
org.apache.phoenix.coprocessor.generated.IndexMutationsProtos;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
+import org.apache.phoenix.filter.CDCVersionFilter;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.CDCTableInfo;
 import org.apache.phoenix.index.IndexMaintainer;
@@ -123,21 +127,60 @@ public class CDCGlobalIndexRegionScanner extends 
UncoveredGlobalIndexRegionScann
     ) {
       return null;
     }
-    // TODO: Get Timerange from the start row and end row of the index scan 
object
-    // and set it in the datatable scan object.
-    // if (scan.getStartRow().length == 8) {
-    // startTimeRange = PLong.INSTANCE.getCodec().decodeLong(
-    // scan.getStartRow(), 0, SortOrder.getDefault());
-    // }
-    // if (scan.getStopRow().length == 8) {
-    // stopTimeRange = PLong.INSTANCE.getCodec().decodeLong(
-    // scan.getStopRow(), 0, SortOrder.getDefault());
-    // }
     Scan dataScan = prepareDataTableScan(dataRowKeys, true);
     if (dataScan == null) {
       return null;
     }
-    return CDCUtil.setupScanForCDC(dataScan);
+    CDCUtil.setupScanForCDC(dataScan);
+    Map<ImmutableBytesPtr, long[]> changeRangeMap = 
buildDataRowChangeRangeMap(dataRowKeys);
+    if (!changeRangeMap.isEmpty()) {
+      CDCVersionFilter versionFilter = new CDCVersionFilter(changeRangeMap);
+      Filter existingFilter = dataScan.getFilter();
+      if (existingFilter != null) {
+        dataScan.setFilter(
+          new FilterList(FilterList.Operator.MUST_PASS_ALL, existingFilter, 
versionFilter));
+      } else {
+        dataScan.setFilter(versionFilter);
+      }
+    }
+    return dataScan;
+  }
+
+  private Map<ImmutableBytesPtr, long[]>
+    buildDataRowChangeRangeMap(Collection<byte[]> dataRowKeys) {
+    Set<ImmutableBytesPtr> scopedRowKeys = new HashSet<>(dataRowKeys.size());
+    for (byte[] dataRowKey : dataRowKeys) {
+      scopedRowKeys.add(new ImmutableBytesPtr(dataRowKey));
+    }
+    Map<ImmutableBytesPtr, long[]> result = new HashMap<>();
+    for (List<Cell> indexRow : indexRows) {
+      if (indexRow.isEmpty()) {
+        continue;
+      }
+      Cell firstCell = indexRow.get(0);
+      byte[] indexRowKey = 
ImmutableBytesPtr.cloneCellRowIfNecessary(firstCell);
+      byte[] dataRowKey = indexToDataRowKeyMap.get(indexRowKey);
+      if (dataRowKey == null) {
+        continue;
+      }
+      ImmutableBytesPtr dataRowKeyPtr = new ImmutableBytesPtr(dataRowKey);
+      if (!scopedRowKeys.contains(dataRowKeyPtr)) {
+        continue;
+      }
+      long changeTs = firstCell.getTimestamp();
+      long[] range = result.get(dataRowKeyPtr);
+      if (range == null) {
+        result.put(dataRowKeyPtr, new long[] { changeTs, changeTs });
+      } else {
+        if (changeTs < range[0]) {
+          range[0] = changeTs;
+        }
+        if (changeTs > range[1]) {
+          range[1] = changeTs;
+        }
+      }
+    }
+    return result;
   }
 
   protected boolean getNextCoveredIndexRow(List<Cell> result) throws 
IOException {
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
index 167f059be7..af33d4d8bc 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java
@@ -614,6 +614,149 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
     return changes;
   }
 
+  protected List<ChangeRow> generateChangesForPrePostImage(long startTS, 
String[] tenantids,
+    String tableName, CommitAdapter committer) throws Exception {
+    List<ChangeRow> changes = new ArrayList<>();
+    EnvironmentEdgeManager.injectEdge(injectEdge);
+    injectEdge.setValue(startTS);
+    committer.init();
+    Map<String, Object> rowid1 = new HashMap() {
+      {
+        put("K", 1);
+      }
+    };
+    Map<String, Object> rowid2 = new HashMap() {
+      {
+        put("K", 2);
+      }
+    };
+    long ts = startTS;
+    for (String tid : tenantids) {
+      try (Connection conn = committer.getConnection(tid)) {
+        // Initial inserts for two rows at the same timestamp (one batch, 
multiple data rows).
+        changes.add(
+          addChange(conn, tableName, new ChangeRow(tid, ts, rowid1, new 
TreeMap<String, Object>() {
+            {
+              put("V1", 1L);
+              put("V2", 10L);
+              put("V3", 1000L);
+              put("B.VB", 100L);
+            }
+          })));
+        changes.add(
+          addChange(conn, tableName, new ChangeRow(tid, ts, rowid2, new 
TreeMap<String, Object>() {
+            {
+              put("V1", 200L);
+              put("V2", 2000L);
+            }
+          })));
+        committer.commit(conn);
+
+        // Partial update: only V1 changes, so V2/V3/B.VB pre-images come from 
the prior version.
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+            {
+              put("V1", 2L);
+            }
+          })));
+        committer.commit(conn);
+
+        // V3 and B.VB remain untouched here, so they stay sparse relative to 
the V1/V2 churn.
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+            {
+              put("V1", 3L);
+              put("V2", 20L);
+            }
+          })));
+        committer.commit(conn);
+
+        // Column-level null -> DeleteColumn marker on V2.
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+            {
+              put("V2", null);
+            }
+          })));
+        committer.commit(conn);
+
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+            {
+              put("V1", 4L);
+              put("B.VB", 400L);
+            }
+          })));
+        committer.commit(conn);
+
+        // Full-row delete -> DeleteFamily marker.
+        changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, 
rowid1, null)));
+        committer.commit(conn);
+
+        // Re-insert after delete: PRE image must be empty (bounded by the 
delete family marker).
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+            {
+              put("V1", 5L);
+              put("V2", 50L);
+            }
+          })));
+        committer.commit(conn);
+
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+            {
+              put("B.VB", 500L);
+            }
+          })));
+        committer.commit(conn);
+
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+            {
+              put("V1", 6L);
+              put("V2", 60L);
+              put("V3", 6000L);
+              put("B.VB", 600L);
+            }
+          })));
+        committer.commit(conn);
+
+        // Second row mutated again so the scan batch keeps covering multiple 
data rows.
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid2, new TreeMap<String, Object>() {
+            {
+              put("V1", 201L);
+            }
+          })));
+        committer.commit(conn);
+
+        // Consecutive full-row deletes: CDC collapses these into a single 
delete event.
+        changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, 
rowid1, null)));
+        committer.commit(conn);
+        changes.add(addChange(conn, tableName, new ChangeRow(tid, ts += 100, 
rowid1, null)));
+        committer.commit(conn);
+
+        // Re-insert a single column after the deletes.
+        changes.add(addChange(conn, tableName,
+          new ChangeRow(tid, ts += 100, rowid1, new TreeMap<String, Object>() {
+            {
+              put("V1", 7L);
+            }
+          })));
+        committer.commit(conn);
+      }
+      ts += 100;
+    }
+    committer.reset();
+    for (int i = 0; i < changes.size(); ++i) {
+      LOGGER.debug("----- generated change: " + i + " tenantId:" + 
changes.get(i).tenantId
+        + " changeTS: " + changes.get(i).changeTS + " pks: " + 
changes.get(i).pks + " change: "
+        + changes.get(i).change);
+    }
+    return changes;
+  }
+
   protected void verifyChangesViaSCN(String tenantId, Connection conn, String 
cdcFullName,
     Map<String, String> pkColumns, String dataTableName, Map<String, String> 
dataColumns,
     List<ChangeRow> changes, long startTS, long endTS) throws Exception {
@@ -682,14 +825,19 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
       if (!changeRow.getChangeType().equals(cdcObj.get(CDC_EVENT_TYPE))) {
         assertEquals(changeDesc, changeRow.getChangeType(), 
cdcObj.get(CDC_EVENT_TYPE));
       }
-      if (
-        cdcObj.containsKey(CDC_PRE_IMAGE) && !((Map) 
cdcObj.get(CDC_PRE_IMAGE)).isEmpty()
-          && changeScopes.contains(PTable.CDCChangeScope.PRE)
-      ) {
-        Map<String, Object> preImage = getRowImage(changeDesc, tenantId, 
dataTableName, dataColumns,
-          changeRow, changeRow.changeTS);
-        assertEquals(changeDesc, preImage,
-          fillInNulls((Map<String, Object>) cdcObj.get(CDC_PRE_IMAGE), 
dataColumns.keySet()));
+      if (changeScopes.contains(PTable.CDCChangeScope.PRE)) {
+        Map<String, Object> cdcPreImage = (Map<String, Object>) 
cdcObj.get(CDC_PRE_IMAGE);
+        Map<String, Object> expectedPreImage = getRowImage(changeDesc, 
tenantId, dataTableName,
+          dataColumns, changeRow, changeRow.changeTS, false);
+        if (expectedPreImage == null) {
+          // No live row state immediately before the change (first insert, or 
re-insert after a
+          // delete): the pre-image must be empty or absent.
+          assertTrue(changeDesc + " expected empty pre-image but got: " + 
cdcPreImage,
+            cdcPreImage == null || cdcPreImage.isEmpty());
+        } else {
+          assertEquals(changeDesc, expectedPreImage,
+            fillInNulls(cdcPreImage, dataColumns.keySet()));
+        }
       }
       if (changeScopes.contains(PTable.CDCChangeScope.CHANGE)) {
         assertEquals(changeDesc,
@@ -716,6 +864,19 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
   protected Map<String, Object> getRowImage(String changeDesc, String tenantId,
     String dataTableName, Map<String, String> dataColumns, ChangeRow 
changeRow, long scnTimestamp)
     throws Exception {
+    return getRowImage(changeDesc, tenantId, dataTableName, dataColumns, 
changeRow, scnTimestamp,
+      true);
+  }
+
+  /**
+   * Reads the row image (all data columns, nulls included) as of {@code 
scnTimestamp}. When
+   * {@code mustExist} is true, the row is asserted to exist. When false, a 
missing row returns
+   * {@code null} so callers can distinguish "no live row state at this SCN" 
from "row present (with
+   * possibly null column values)" - used to assert expected-empty vs 
expected-nonempty pre-images.
+   */
+  protected Map<String, Object> getRowImage(String changeDesc, String tenantId,
+    String dataTableName, Map<String, String> dataColumns, ChangeRow 
changeRow, long scnTimestamp,
+    boolean mustExist) throws Exception {
     Map<String, Object> image = new HashMap<>();
     Properties props = new Properties();
     props.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
Long.toString(scnTimestamp));
@@ -734,7 +895,12 @@ public class CDCBaseIT extends ParallelStatsDisabledIT {
       }
       // Create projection without namespace.
       ResultSet rs = stmt.executeQuery();
-      assertTrue(changeDesc, rs.next());
+      if (!rs.next()) {
+        if (mustExist) {
+          fail(changeDesc + " expected data table row to exist at SCN " + 
scnTimestamp);
+        }
+        return null;
+      }
       for (String colName : projections.keySet()) {
         PDataType dt = PDataType.fromSqlTypeName(dataColumns.get(colName));
         image.put(colName, 
getJsonEncodedValue(rs.getObject(projections.get(colName)), dt));
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
index 9a86354778..385e4401ec 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java
@@ -337,7 +337,6 @@ public class CDCQueryIT extends CDCBaseIT {
 
     String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
     try (Connection conn = newConnection(tenantId)) {
-      // For debug: uncomment to see the exact results logged to console.
       dumpCDCResults(conn, cdcName, new TreeMap<String, String>() {
         {
           put("K", "INTEGER");
@@ -434,6 +433,85 @@ public class CDCQueryIT extends CDCBaseIT {
     }
   }
 
+  /**
+   * Exercises CDC PRE/POST image reconstruction over a row with a deep stack 
of cell versions
+   * interleaved with column-level nulls, full-row deletes, consecutive 
deletes and re-inserts. This
+   * specifically stresses the server-side version pruning applied to the data 
table scan: the PRE
+   * and POST images are recomputed independently via SCN queries on the data 
table and compared
+   * against the CDC output, so any over-pruning of needed versions surfaces 
as a mismatch.
+   */
+  @Test
+  public void testSelectCDCPreAndPostImageWithVersionPruning() throws 
Exception {
+    String cdcName, cdc_sql;
+    String schemaName = getSchemaName();
+    String tableName = getTableOrViewName(schemaName);
+    String datatableName = tableName;
+    try (Connection conn = newConnection()) {
+      createTable(conn,
+        "CREATE TABLE  " + tableName + " (" + (multitenant ? "TENANT_ID 
CHAR(5) NOT NULL, " : "")
+          + "k INTEGER NOT NULL, v1 INTEGER, v2 INTEGER, v3 INTEGER, B.vb 
INTEGER, "
+          + "CONSTRAINT PK PRIMARY KEY " + (multitenant ? "(TENANT_ID, k) " : 
"(k)") + ")",
+        encodingScheme, multitenant, tableSaltBuckets, false, null);
+      if (forView) {
+        String viewName = getTableOrViewName(schemaName);
+        createTable(conn, "CREATE VIEW " + viewName + " AS SELECT * FROM " + 
tableName,
+          encodingScheme);
+        tableName = viewName;
+      }
+      cdcName = getCDCName();
+      cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
+      createCDC(conn, cdc_sql, encodingScheme);
+    }
+
+    String tenantId = multitenant ? "1000" : null;
+    String[] tenantids = { tenantId };
+    if (multitenant) {
+      tenantids = new String[] { tenantId, "2000" };
+    }
+
+    long startTS = System.currentTimeMillis();
+    List<ChangeRow> changes =
+      generateChangesForPrePostImage(startTS, tenantids, tableName, 
COMMIT_SUCCESS);
+    long currentTime = System.currentTimeMillis();
+    long endTS = changes.get(changes.size() - 1).getTimestamp() + 1;
+    if (endTS > currentTime) {
+      Thread.sleep(endTS - currentTime);
+    }
+
+    Map<String, String> dataColumns = new TreeMap<String, String>() {
+      {
+        put("V1", "INTEGER");
+        put("V2", "INTEGER");
+        put("V3", "INTEGER");
+        put("B.VB", "INTEGER");
+      }
+    };
+    String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
+    try (Connection conn = newConnection(tenantId)) {
+      dumpCDCResults(conn, cdcName, new TreeMap<String, String>() {
+        {
+          put("K", "INTEGER");
+        }
+      }, addPartitionInList(conn, cdcFullName,
+        "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + 
"\"CDC JSON\" FROM "
+          + cdcFullName));
+
+      // Verify PRE and POST images are correctly reconstructed from the 
version-pruned data scan.
+      verifyChangesViaSCN(tenantId,
+        getCDCQueryPreparedStatement(conn, cdcFullName,
+          "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName, 
startTS, endTS)
+            .executeQuery(),
+        datatableName, dataColumns, changes, PRE_POST_IMG);
+
+      // Cross-check CHANGE, PRE and POST images together.
+      verifyChangesViaSCN(tenantId,
+        getCDCQueryPreparedStatement(conn, cdcFullName,
+          "SELECT /*+ CDC_INCLUDE(CHANGE, PRE, POST) */ * FROM " + 
cdcFullName, startTS, endTS)
+            .executeQuery(),
+        datatableName, dataColumns, changes, ALL_IMG);
+    }
+  }
+
   @Test
   public void testSelectGeneric() throws Exception {
     String cdcName, cdc_sql;
@@ -494,7 +572,6 @@ public class CDCQueryIT extends CDCBaseIT {
 
     String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
     try (Connection conn = newConnection(tenantId)) {
-      // For debug: uncomment to see the exact results logged to console.
       dumpCDCResults(conn, cdcName, pkColumns, addPartitionInList(conn, 
cdcFullName,
         "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName));
 
@@ -598,7 +675,6 @@ public class CDCQueryIT extends CDCBaseIT {
     };
 
     try (Connection conn = newConnection(tenantId)) {
-      // For debug: uncomment to see the exact results logged to console.
       dumpCDCResults(conn, cdcName, new TreeMap<String, String>() {
         {
           put("K", "INTEGER");
@@ -681,7 +757,6 @@ public class CDCQueryIT extends CDCBaseIT {
 
     String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName);
     try (Connection conn = newConnection(tenantId)) {
-      // For debug: uncomment to see the exact results logged to console.
       dumpCDCResults(conn, cdcName, pkColumns,
         "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName);
 
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java
new file mode 100644
index 0000000000..40036d3440
--- /dev/null
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/filter/CDCVersionFilterTest.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.filter;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link CDCVersionFilter}, which keeps, per column, the cells within the per-row
+ * change-timestamp band {@code [min, max]} plus the first cell below {@code min} (the pre-image),
+ * and always keeps DeleteFamily markers.
+ */
+public class CDCVersionFilterTest {
+
+  private static final byte[] ROW1 = Bytes.toBytes("row1");
+  private static final byte[] ROW2 = Bytes.toBytes("row2");
+  private static final byte[] CF1 = Bytes.toBytes("cf");
+  private static final byte[] CF2 = Bytes.toBytes("cf2");
+  private static final byte[] CQ1 = Bytes.toBytes("cq1");
+  private static final byte[] CQ2 = Bytes.toBytes("cq2");
+  private static final byte[] VAL = Bytes.toBytes("v");
+
+  private static final byte[] CF = CF1;
+
+  private Cell put(byte[] row, byte[] cf, byte[] cq, long ts) {
+    return new KeyValue(row, cf, cq, ts, KeyValue.Type.Put, VAL);
+  }
+
+  private Cell deleteColumn(byte[] row, byte[] cf, byte[] cq, long ts) {
+    return new KeyValue(row, cf, cq, ts, KeyValue.Type.DeleteColumn);
+  }
+
+  private Cell pointDelete(byte[] row, byte[] cf, byte[] cq, long ts) {
+    return new KeyValue(row, cf, cq, ts, KeyValue.Type.Delete);
+  }
+
+  private Cell deleteFamily(byte[] row, byte[] cf, long ts) {
+    return new KeyValue(row, cf, null, ts, KeyValue.Type.DeleteFamily);
+  }
+
+  private static long[] band(long min, long max) {
+    return new long[] { min, max };
+  }
+
+  private CDCVersionFilter createFilter(Map<ImmutableBytesPtr, long[]> 
rangeMap) {
+    return new CDCVersionFilter(rangeMap);
+  }
+
+  private Map<ImmutableBytesPtr, long[]> singleRow(byte[] row, long min, long 
max) {
+    Map<ImmutableBytesPtr, long[]> map = new HashMap<>();
+    map.put(new ImmutableBytesPtr(row), band(min, max));
+    return map;
+  }
+
+  @Test
+  public void testSingleChangeKeepsChangeAndPreImage() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100))); // change
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90))); // pre-image
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80))); // redundant
+  }
+
+  @Test
+  public void testBandIncludesAllInRangeVersions() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 60, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
70)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
60))); // == min
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
50))); // pre-image
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
40)));
+  }
+
+  @Test
+  public void testCellsAboveMaxAreSkipped() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 50, 80));
+    assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+    assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
80))); // == max
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
60)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
50))); // == min
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
40))); // pre-image
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
30)));
+  }
+
+  @Test
+  public void testAllCellsAboveMaxAreSkipped() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 50, 50));
+    assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 100)));
+    assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 90)));
+    assertEquals(ReturnCode.SKIP, filter.filterCell(put(ROW1, CF, CQ1, 80)));
+  }
+
+  @Test
+  public void testColumnWithNoInBandCellKeepsOnlyPreImage() throws IOException 
{
+    // The column has no cell at/inside the band; the first cell below min is its pre-image.
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
70)));
+  }
+
+  @Test
+  public void testDeleteFamilyAlwaysIncluded() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 
50)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 
10)));
+  }
+
+  @Test
+  public void testDeleteFamilyDoesNotAffectColumnState() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteFamily(ROW1, CF, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+  }
+
+  @Test
+  public void testDeleteColumnInBandIncluded() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF, 
CQ1, 100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90))); // pre-image
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+  }
+
+  @Test
+  public void testDeleteColumnAsPreImage() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(deleteColumn(ROW1, CF, 
CQ1, 90))); // pre-image
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+  }
+
+  @Test
+  public void testPointDeleteInBandAndAsPreImage() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(pointDelete(ROW1, CF, 
CQ1, 100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(pointDelete(ROW1, CF, 
CQ1, 90)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+  }
+
+  @Test
+  public void testMultipleColumnsResetState() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    // Column CQ1
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+    // Column CQ2 — per-column pre-image state should reset.
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ2, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ2, 
85)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ2, 
70)));
+  }
+
+  @Test
+  public void testMultipleColumnFamiliesResetState() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    // CF1:CQ1
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF1, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF1, CQ1, 
90)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF1, CQ1, 
80)));
+    // CF2:CQ1 — same qualifier, different family, state should reset.
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF2, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF2, CQ1, 
85)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF2, CQ1, 
70)));
+  }
+
+  @Test
+  public void testMultipleRows() throws IOException {
+    Map<ImmutableBytesPtr, long[]> map = new HashMap<>();
+    map.put(new ImmutableBytesPtr(ROW1), band(100, 100));
+    map.put(new ImmutableBytesPtr(ROW2), band(50, 80));
+    CDCVersionFilter filter = createFilter(map);
+    // ROW1
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+    // ROW2 — the row change is detected automatically via the row key.
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 
80)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 
60)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 
50))); // == min
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 
40))); // pre-image
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW2, CF, CQ1, 
30)));
+  }
+
+  @Test
+  public void testRowNotInMapIncludesEverything() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 
90)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW2, CF, CQ1, 
80)));
+  }
+
+  @Test
+  public void testNullRangeMap() throws IOException {
+    CDCVersionFilter filter = new CDCVersionFilter(null);
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+  }
+
+  @Test
+  public void testEmptyRangeMap() throws IOException {
+    CDCVersionFilter filter = createFilter(new HashMap<>());
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+  }
+
+  @Test
+  public void testResetClearsState() throws IOException {
+    CDCVersionFilter filter = createFilter(singleRow(ROW1, 100, 100));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+    filter.reset();
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
100)));
+    assertEquals(ReturnCode.INCLUDE, filter.filterCell(put(ROW1, CF, CQ1, 
90)));
+    assertEquals(ReturnCode.NEXT_COL, filter.filterCell(put(ROW1, CF, CQ1, 
80)));
+  }
+
+  @Test
+  public void testSerializationRoundTrip() throws IOException, 
DeserializationException {
+    Map<ImmutableBytesPtr, long[]> map = new HashMap<>();
+    map.put(new ImmutableBytesPtr(ROW1), band(60, 100));
+    map.put(new ImmutableBytesPtr(ROW2), band(80, 80));
+    CDCVersionFilter original = createFilter(map);
+
+    byte[] serialized = original.toByteArray();
+    CDCVersionFilter deserialized = CDCVersionFilter.parseFrom(serialized);
+
+    // ROW1 band [60, 100]
+    assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, 
CQ1, 100)));
+    assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, 
CQ1, 80)));
+    assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, 
CQ1, 60))); // == min
+    assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW1, CF, 
CQ1, 50))); // pre-image
+    assertEquals(ReturnCode.NEXT_COL, deserialized.filterCell(put(ROW1, CF, 
CQ1, 40)));
+
+    // ROW2 band [80, 80]
+    deserialized.reset();
+    assertEquals(ReturnCode.SKIP, deserialized.filterCell(put(ROW2, CF, CQ1, 
100))); // above max
+    assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW2, CF, 
CQ1, 80)));
+    assertEquals(ReturnCode.INCLUDE, deserialized.filterCell(put(ROW2, CF, 
CQ1, 70))); // pre-image
+    assertEquals(ReturnCode.NEXT_COL, deserialized.filterCell(put(ROW2, CF, 
CQ1, 60)));
+  }
+
+  // parseFrom is a public, class-name-addressable deserialization entry point; a malformed payload
+  // with a negative wire-supplied length must be rejected rather than throwing
+  // NegativeArraySizeException.
+
+  @Test(expected = DeserializationException.class)
+  public void testParseFromRejectsNegativeRowCount() throws Exception {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try (DataOutputStream out = new DataOutputStream(bos)) {
+      out.writeInt(-1); // numRows
+    }
+    CDCVersionFilter.parseFrom(bos.toByteArray());
+  }
+
+  @Test(expected = DeserializationException.class)
+  public void testParseFromRejectsNegativeRowKeyLength() throws Exception {
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try (DataOutputStream out = new DataOutputStream(bos)) {
+      out.writeInt(1); // numRows
+      out.writeInt(-3); // keyLen negative
+    }
+    CDCVersionFilter.parseFrom(bos.toByteArray());
+  }
+}


Reply via email to