This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d4d858d3822c85e6b4b71b1004d8ba6c63fa5134 Author: Jon Meredith <jonmered...@apache.org> AuthorDate: Tue Apr 22 14:31:22 2025 -0600 Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. patch by Jon Meredith; reviewed by Marcus Eriksson for CASSANDRA-20561 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/LivenessInfo.java | 11 ++- src/java/org/apache/cassandra/db/rows/Cells.java | 10 +++ test/unit/org/apache/cassandra/db/CellTest.java | 85 +++++++++++++++++++++- .../org/apache/cassandra/db/LivenessInfoTest.java | 5 ++ .../org/apache/cassandra/db/rows/RowsTest.java | 54 ++++++++++++++ 6 files changed, 163 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 4332b75555..ddd87100ad 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0.18 + * Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. (CASSANDRA-20561) * Grant permission on keyspaces system_views and system_virtual_schema not possible (CASSANDRA-20171) * Avoid computing prepared statement size for unprepared batches (CASSANDRA-20556) * Fix Dropwizard Meter causes timeouts when infrequently used (CASSANDRA-19332) diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java index f3e6daabad..2857b0b381 100644 --- a/src/java/org/apache/cassandra/db/LivenessInfo.java +++ b/src/java/org/apache/cassandra/db/LivenessInfo.java @@ -193,7 +193,10 @@ public class LivenessInfo implements IMeasurableMemory * supersedes, ie. tombstone supersedes. * * If timestamps are the same and both of them are expired livenessInfo(Ideally it shouldn't happen), - * greater localDeletionTime wins. + * greater localDeletionTime wins. If the localDeletion times are the same, prefer the + * lower TTL to make the merge deterministic (it is likely that the row has been rewritten with + * USING TTL/TIMESTAMP with an updated TTL that computes to the same local deletion time -- perhaps + * from rerunning a process to migrate user data between clusters or tables). * * @param other * the {@code LivenessInfo} to compare this info to. @@ -207,7 +210,11 @@ public class LivenessInfo implements IMeasurableMemory if (isExpired() ^ other.isExpired()) return isExpired(); if (isExpiring() == other.isExpiring()) - return localExpirationTime() > other.localExpirationTime(); + { + return localExpirationTime() > other.localExpirationTime() || + (localExpirationTime() == other.localExpirationTime() && ttl() < other.ttl()); + } + return isExpiring(); } diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java index 59f1d3f7fd..ce774080ee 100644 --- a/src/java/org/apache/cassandra/db/rows/Cells.java +++ b/src/java/org/apache/cassandra/db/rows/Cells.java @@ -113,6 +113,16 @@ public abstract class Cells // would otherwise always win (unless it had an empty value), until it expired and was translated to a tombstone if (leftLocalDeletionTime != rightLocalDeletionTime) return leftLocalDeletionTime > rightLocalDeletionTime ? left : right; + + // Both cells are either tombstones or expiring at the same timestamp. If expiring and the + // TTLs differ, write the lower one -- the write is probably from a more recent + // UPDATE USING TTL AND TIMESTAMP, so select the most recent one to be deterministic and be + // closest to client intent. + if (!leftIsTombstone && left.ttl() != right.ttl()) + { + assert !rightIsTombstone; + return left.ttl() < right.ttl() ? left : right; + } } return compareValues(left, right) >= 0 ? left : right; diff --git a/test/unit/org/apache/cassandra/db/CellTest.java b/test/unit/org/apache/cassandra/db/CellTest.java index d4dec05e3b..a7c334bb79 100644 --- a/test/unit/org/apache/cassandra/db/CellTest.java +++ b/test/unit/org/apache/cassandra/db/CellTest.java @@ -41,7 +41,8 @@ import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; -import static java.util.Arrays.*; +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; public class CellTest { @@ -62,6 +63,8 @@ public class CellTest .addRegularColumn("v", IntegerType.instance) .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true)) .build(); + public static final ByteBuffer TEST_VALUE = ByteBufferUtil.bytes("a"); + @BeforeClass public static void defineSchema() throws ConfigurationException @@ -253,6 +256,86 @@ public class CellTest Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, 1)); } + + public static void assertCellsEqual(Cell<?> cellA, Cell<?> cellB) + { + assertEquals(cellA.timestamp(), cellB.timestamp()); + assertEquals(cellA.ttl(), cellB.ttl()); + assertEquals(cellA.localDeletionTime(), cellB.localDeletionTime()); + assertEquals(cellA.buffer(), cellB.buffer()); + } + + static void checkCommutes(ColumnMetadata cmd, long timestamp, long tsDiff, int ttl, int ttlDiff, int nowInSeconds, int nowDiff) + { + long timestampA = timestamp; + long timestampB = timestampA + tsDiff; + int ttlA = ttl; + int ttlB = ttl + ttlDiff; + int nowInSecsA = nowInSeconds; + int nowInSecsB = nowInSecsA + nowDiff; + if (nowInSecsA < 0 || nowInSecsB < 0) + return; + + Cell<?> cellA = ttlA == 0 ? BufferCell.tombstone(cmd, timestampA, nowInSecsA) : + ttlA < 0 ? BufferCell.live(cmd, timestampA, TEST_VALUE) : + BufferCell.expiring(cmd, timestampA, ttlA, nowInSecsA, TEST_VALUE); + Cell<?> cellB = ttlB == 0 ? BufferCell.tombstone(cmd, timestampB, nowInSecsB) : + ttlB < 0 ? BufferCell.live(cmd, timestampB, TEST_VALUE) : + BufferCell.expiring(cmd, timestampB, ttlB, nowInSecsB, TEST_VALUE); + + Cell<?> cellAB = Cells.reconcile(cellA, cellB); + Cell<?> cellBA = Cells.reconcile(cellB, cellA); + + assertCellsEqual(cellAB, cellBA); + } + + @Test + public void checkSameValueDifferentLivenessCommutes() + { + ColumnMetadata cmd = fakeColumn("c", UTF8Type.instance); + long[] tsDiffs = new long[] {0L, + 1L, // microsecond + 1000L, // millisecond + 1000000L, // second + 60000000L}; // minute + int[] ttls = new int[] { -1, 0, 1, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 }; + int[] ttlDiffs = new int[] { 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 }; + + int nowInSeconds = FBUtilities.nowInSeconds(); + long timestamp = FBUtilities.timestampMicros(); + + for (long tsDiff: tsDiffs) + { + for (int ttl: ttls) + { + for (int ttlDiff : ttlDiffs) + { + for (Integer nowDiff : ttlDiffs) + checkCommutes(cmd, timestamp, tsDiff, ttl, ttlDiff, nowInSeconds, nowDiff); + } + } + } + } + + // Checks that reconciling a cell with a smaller TTL reconcile commutatively + // Similar to rewriting data retrieved with SELECT v, TTL(v), WRITETIMESTAMP(v) with + // INSERT SET v=? USING TTL ? AND TIMESTAMP ? + @Test + public void rewriteCellWithSmallerTTL() + { + ColumnMetadata cmd = fakeColumn("c", UTF8Type.instance); + int[] nowDiffs = new int[] { 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 }; + long timestamp = FBUtilities.timestampMicros(); + int nowInSeconds = FBUtilities.nowInSeconds(); + int ttl = 3600; + + for (Integer nowDiff : nowDiffs) + { + checkCommutes(cmd, timestamp, 0L, ttl, -nowDiff, nowInSeconds, nowDiff); + } + } + + class SimplePurger implements DeletionPurger { private final int gcBefore; diff --git a/test/unit/org/apache/cassandra/db/LivenessInfoTest.java b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java index 557870672e..0482423f9d 100644 --- a/test/unit/org/apache/cassandra/db/LivenessInfoTest.java +++ b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java @@ -75,6 +75,11 @@ public class LivenessInfoTest first = LivenessInfo.withExpirationTime(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds + 1); second = LivenessInfo.withExpirationTime(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds); assertSupersedes(first, second); + + // rewritten expiring with the same expiration time and a lower TTL, take the lower TTL as likely to be more recent + first = LivenessInfo.withExpirationTime(100, 4, nowInSeconds); + second = LivenessInfo.withExpirationTime(100, 5, nowInSeconds); + assertSupersedes(first, second); } @Test diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java index cfeebfd2b1..47e42b2542 100644 --- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java +++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java @@ -45,6 +45,8 @@ import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.db.CellTest.assertCellsEqual; + public class RowsTest { private static final String KEYSPACE = "rows_test"; @@ -520,6 +522,58 @@ public class RowsTest Assert.assertEquals(0, merged.columns().size()); } + + public static BufferCell expiringWithExpirationTime(ColumnMetadata column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value) + { + return expiringWithExpirationTime(column, timestamp, ttl, localDeletionTime, value, null); + } + + public static BufferCell expiringWithExpirationTime(ColumnMetadata column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path) + { + assert ttl != Cell.NO_TTL; + return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path); + } + + @Test + public void mergeRowsWithSameExpiryDifferentTTLCommutesLiveness() + { + int now1 = FBUtilities.nowInSeconds(); + long ts1 = secondToTs(now1); + int ldt = now1 + 1000; + + Row.Builder r1Builder = BTreeRow.unsortedBuilder(); + r1Builder.newRow(c1); + LivenessInfo originalLiveness = LivenessInfo.withExpirationTime(ts1, 100, ldt); + r1Builder.addPrimaryKeyLivenessInfo(originalLiveness); + + Row.Builder r2Builder = BTreeRow.unsortedBuilder(); + r2Builder.newRow(c1); + LivenessInfo loweredTTL = LivenessInfo.withExpirationTime(ts1, 50, ldt); + r2Builder.addPrimaryKeyLivenessInfo(loweredTTL); + + Cell<?> r2v = expiringWithExpirationTime(v, ts1, 75, ldt, BB1); + Cell<?> r2m2 = expiringWithExpirationTime(m, ts1, 50, ldt, BB1, CellPath.create(BB2)); + Cell<?> r2m3 = expiringWithExpirationTime(m, ts1, 75, ldt, BB2, CellPath.create(BB3)); + Cell<?> r2m4 = expiringWithExpirationTime(m, ts1, 100, ldt, BB3, CellPath.create(BB4)); + List<Cell<?>> expectedCells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4); + + expectedCells.forEach(r1Builder::addCell); + expectedCells.forEach(r2Builder::addCell); + + Row r1 = r1Builder.build(); + Row r2 = r2Builder.build(); + + Row r1r2 = Rows.merge(r1, r2); + Row r2r1 = Rows.merge(r2, r1); + + DiffListener mergedListener = new DiffListener(); + Rows.diff(mergedListener, r1r2, r2r1); + + mergedListener.liveness.forEach(pair -> Assert.assertEquals(pair.merged, pair.original)); + mergedListener.cells.forEach(pair -> assertCellsEqual(pair.merged, pair.original)); + } + + // Creates a dummy cell for a (regular) column for the provided name and without a cellPath. private static Cell<?> liveCell(ColumnMetadata name) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org