This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d4d858d3822c85e6b4b71b1004d8ba6c63fa5134
Author: Jon Meredith <jonmered...@apache.org>
AuthorDate: Tue Apr 22 14:31:22 2025 -0600

    Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches.
    
    patch by Jon Meredith; reviewed by Marcus Eriksson for CASSANDRA-20561
---
 CHANGES.txt                                        |  1 +
 src/java/org/apache/cassandra/db/LivenessInfo.java | 11 ++-
 src/java/org/apache/cassandra/db/rows/Cells.java   | 10 +++
 test/unit/org/apache/cassandra/db/CellTest.java    | 85 +++++++++++++++++++++-
 .../org/apache/cassandra/db/LivenessInfoTest.java  |  5 ++
 .../org/apache/cassandra/db/rows/RowsTest.java     | 54 ++++++++++++++
 6 files changed, 163 insertions(+), 3 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4332b75555..ddd87100ad 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.18
+ * Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches (CASSANDRA-20561)
  * Grant permission on keyspaces system_views and system_virtual_schema not possible (CASSANDRA-20171)
  * Avoid computing prepared statement size for unprepared batches (CASSANDRA-20556)
  * Fix Dropwizard Meter causes timeouts when infrequently used (CASSANDRA-19332)
diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java
index f3e6daabad..2857b0b381 100644
--- a/src/java/org/apache/cassandra/db/LivenessInfo.java
+++ b/src/java/org/apache/cassandra/db/LivenessInfo.java
@@ -193,7 +193,10 @@ public class LivenessInfo implements IMeasurableMemory
      * supersedes, ie. tombstone supersedes.
      *
      * If timestamps are the same and both of them are expired livenessInfo(Ideally it shouldn't happen),
-     * greater localDeletionTime wins.
+     * greater localDeletionTime wins. If the local expiration times are also the same, prefer the
+     * lower TTL to make the merge deterministic (the row has most likely been rewritten
+     * USING TTL/TIMESTAMP with an updated TTL that computes to the same local expiration time -- perhaps
+     * from rerunning a process to migrate user data between clusters or tables).
      *
      * @param other
      *            the {@code LivenessInfo} to compare this info to.
@@ -207,7 +210,11 @@ public class LivenessInfo implements IMeasurableMemory
         if (isExpired() ^ other.isExpired())
             return isExpired();
         if (isExpiring() == other.isExpiring())
-            return localExpirationTime() > other.localExpirationTime();
+        {
+            return localExpirationTime() > other.localExpirationTime() ||
+                    (localExpirationTime() == other.localExpirationTime() && ttl() < other.ttl());
+        }
+
         return isExpiring();
     }
 
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java b/src/java/org/apache/cassandra/db/rows/Cells.java
index 59f1d3f7fd..ce774080ee 100644
--- a/src/java/org/apache/cassandra/db/rows/Cells.java
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -113,6 +113,16 @@ public abstract class Cells
             // would otherwise always win (unless it had an empty value), until it expired and was translated to a tombstone
             if (leftLocalDeletionTime != rightLocalDeletionTime)
                 return leftLocalDeletionTime > rightLocalDeletionTime ? left : right;
+
+            // Same timestamp and same local deletion time: both cells are tombstones, or both are expiring at the
+            // same instant. If expiring with different TTLs, pick the lower TTL: an expiring cell's local deletion
+            // time is its write time plus its TTL, so the lower TTL is the more recent write (e.g. a rerun
+            // UPDATE USING TTL AND TIMESTAMP). This keeps reconciliation deterministic and closest to client intent.
+            if (!leftIsTombstone && left.ttl() != right.ttl())
+            {
+                assert !rightIsTombstone;
+                return left.ttl() < right.ttl() ? left : right;
+            }
         }
 
         return compareValues(left, right) >= 0 ? left : right;
diff --git a/test/unit/org/apache/cassandra/db/CellTest.java b/test/unit/org/apache/cassandra/db/CellTest.java
index d4dec05e3b..a7c334bb79 100644
--- a/test/unit/org/apache/cassandra/db/CellTest.java
+++ b/test/unit/org/apache/cassandra/db/CellTest.java
@@ -41,7 +41,8 @@ import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static java.util.Arrays.*;
+import static java.util.Arrays.asList;
+import static org.junit.Assert.assertEquals;
 
 public class CellTest
 {
@@ -62,6 +63,8 @@ public class CellTest
                      .addRegularColumn("v", IntegerType.instance)
                      .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true))
                      .build();
+    public static final ByteBuffer TEST_VALUE = ByteBufferUtil.bytes("a");
+
 
     @BeforeClass
     public static void defineSchema() throws ConfigurationException
@@ -253,6 +256,86 @@ public class CellTest
         Assert.assertEquals(-1, testExpiring("val", "b", 2, 1, null, "a", null, 1));
     }
 
+
+    public static void assertCellsEqual(Cell<?> cellA, Cell<?> cellB)
+    {
+        assertEquals(cellA.timestamp(), cellB.timestamp());
+        assertEquals(cellA.ttl(), cellB.ttl());
+        assertEquals(cellA.localDeletionTime(), cellB.localDeletionTime());
+        assertEquals(cellA.buffer(), cellB.buffer());
+    }
+
+    static void checkCommutes(ColumnMetadata cmd, long timestamp, long tsDiff, int ttl, int ttlDiff, int nowInSeconds, int nowDiff)
+    {
+        long timestampA = timestamp;
+        long timestampB = timestampA + tsDiff;
+        int ttlA = ttl;
+        int ttlB = ttl + ttlDiff;
+        int nowInSecsA = nowInSeconds;
+        int nowInSecsB = nowInSecsA + nowDiff;
+        if (nowInSecsA < 0 || nowInSecsB < 0)
+            return;
+
+        Cell<?> cellA = ttlA == 0 ? BufferCell.tombstone(cmd, timestampA, nowInSecsA) :
+                        ttlA < 0 ? BufferCell.live(cmd, timestampA, TEST_VALUE) :
+                        BufferCell.expiring(cmd, timestampA, ttlA, nowInSecsA, TEST_VALUE);
+        Cell<?> cellB = ttlB == 0 ? BufferCell.tombstone(cmd, timestampB, nowInSecsB) :
+                        ttlB < 0 ? BufferCell.live(cmd, timestampB, TEST_VALUE) :
+                        BufferCell.expiring(cmd, timestampB, ttlB, nowInSecsB, TEST_VALUE);
+
+        Cell<?> cellAB = Cells.reconcile(cellA, cellB);
+        Cell<?> cellBA = Cells.reconcile(cellB, cellA);
+
+        assertCellsEqual(cellAB, cellBA);
+    }
+
+    @Test
+    public void checkSameValueDifferentLivenessCommutes()
+    {
+        ColumnMetadata cmd = fakeColumn("c", UTF8Type.instance);
+        long[] tsDiffs = new long[] {0L,
+                                     1L, // microsecond
+                                     1000L, // millisecond
+                                     1000000L, // second
+                                     60000000L}; // minute
+        int[] ttls = new int[] { -1, 0, 1, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 };
+        int[] ttlDiffs = new int[] { 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 };
+        int[] nowDiffs = new int[] { -366 * 24 * 3600, -60 * 24 * 3600, -7 * 24 * 3600, -24 * 3600, -3600, -60, -1, 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 };
+        int nowInSeconds = FBUtilities.nowInSeconds();
+        long timestamp = FBUtilities.timestampMicros();
+
+        for (long tsDiff: tsDiffs)
+        {
+            for (int ttl: ttls)
+            {
+                for (int ttlDiff : ttlDiffs)
+                {
+                    for (int nowDiff : nowDiffs) // nowDiff == -ttlDiff gives both cells the same expiration time with different TTLs
+                        checkCommutes(cmd, timestamp, tsDiff, ttl, ttlDiff, nowInSeconds, nowDiff);
+                }
+            }
+        }
+    }
+
+    // Checks that a cell rewritten with a smaller TTL but the same expiration time reconciles commutatively.
+    // Similar to rewriting data retrieved with SELECT v, TTL(v), WRITETIME(v) with
+    // UPDATE ... USING TTL ? AND TIMESTAMP ? SET v = ?
+    @Test
+    public void rewriteCellWithSmallerTTL()
+    {
+        ColumnMetadata cmd = fakeColumn("c", UTF8Type.instance);
+        int[] nowDiffs = new int[] { 0, 1, 60, 3600, 24 * 3600, 7 * 24 * 3600, 60 * 24 * 3600, 366 * 24 * 3600 };
+        long timestamp = FBUtilities.timestampMicros();
+        int nowInSeconds = FBUtilities.nowInSeconds();
+        int ttl = 367 * 24 * 3600; // must exceed every nowDiff so that ttl - nowDiff remains a positive TTL
+
+        for (int nowDiff : nowDiffs)
+        {
+            checkCommutes(cmd, timestamp, 0L, ttl, -nowDiff, nowInSeconds, nowDiff);
+        }
+    }
+
+
     class SimplePurger implements DeletionPurger
     {
         private final int gcBefore;
diff --git a/test/unit/org/apache/cassandra/db/LivenessInfoTest.java b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
index 557870672e..0482423f9d 100644
--- a/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
+++ b/test/unit/org/apache/cassandra/db/LivenessInfoTest.java
@@ -75,6 +75,11 @@ public class LivenessInfoTest
         first = LivenessInfo.withExpirationTime(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds + 1);
         second = LivenessInfo.withExpirationTime(100, LivenessInfo.EXPIRED_LIVENESS_TTL, nowInSeconds);
         assertSupersedes(first, second);
+
+        // Rewritten with the same expiration time but a lower TTL: the lower TTL wins, as it is most likely the more recent write
+        first = LivenessInfo.withExpirationTime(100, 4, nowInSeconds);
+        second = LivenessInfo.withExpirationTime(100, 5, nowInSeconds);
+        assertSupersedes(first, second);
     }
 
     @Test
diff --git a/test/unit/org/apache/cassandra/db/rows/RowsTest.java b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
index cfeebfd2b1..47e42b2542 100644
--- a/test/unit/org/apache/cassandra/db/rows/RowsTest.java
+++ b/test/unit/org/apache/cassandra/db/rows/RowsTest.java
@@ -45,6 +45,8 @@ import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
+import static org.apache.cassandra.db.CellTest.assertCellsEqual;
+
 public class RowsTest
 {
     private static final String KEYSPACE = "rows_test";
@@ -520,6 +522,58 @@ public class RowsTest
         Assert.assertEquals(0, merged.columns().size());
     }
 
+
+    public static BufferCell expiringWithExpirationTime(ColumnMetadata column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value)
+    {
+        return expiringWithExpirationTime(column, timestamp, ttl, localDeletionTime, value, null);
+    }
+
+    public static BufferCell expiringWithExpirationTime(ColumnMetadata column, long timestamp, int ttl, int localDeletionTime, ByteBuffer value, CellPath path)
+    {
+        assert ttl != Cell.NO_TTL;
+        return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path);
+    }
+
+    @Test
+    public void mergeRowsWithSameExpiryDifferentTTLCommutesLiveness()
+    {
+        int now1 = FBUtilities.nowInSeconds();
+        long ts1 = secondToTs(now1);
+        int ldt = now1 + 1000;
+
+        Row.Builder r1Builder = BTreeRow.unsortedBuilder();
+        r1Builder.newRow(c1);
+        LivenessInfo originalLiveness = LivenessInfo.withExpirationTime(ts1, 100, ldt);
+        r1Builder.addPrimaryKeyLivenessInfo(originalLiveness);
+
+        Row.Builder r2Builder = BTreeRow.unsortedBuilder();
+        r2Builder.newRow(c1);
+        LivenessInfo loweredTTL = LivenessInfo.withExpirationTime(ts1, 50, ldt);
+        r2Builder.addPrimaryKeyLivenessInfo(loweredTTL);
+
+        Cell<?> r2v = expiringWithExpirationTime(v, ts1, 75, ldt, BB1);
+        Cell<?> r2m2 = expiringWithExpirationTime(m, ts1, 50, ldt, BB1, CellPath.create(BB2));
+        Cell<?> r2m3 = expiringWithExpirationTime(m, ts1, 75, ldt, BB2, CellPath.create(BB3));
+        Cell<?> r2m4 = expiringWithExpirationTime(m, ts1, 100, ldt, BB3, CellPath.create(BB4));
+        List<Cell<?>> r2Cells = Lists.newArrayList(r2v, r2m2, r2m3, r2m4);
+        // r1 gets the same cells with the original TTL of 100 and the same expiration time, so merging exercises the TTL tie-break
+        r2Cells.forEach(c -> r1Builder.addCell(expiringWithExpirationTime(c.column(), ts1, 100, ldt, c.buffer(), c.path())));
+        r2Cells.forEach(r2Builder::addCell);
+
+        Row r1 = r1Builder.build();
+        Row r2 = r2Builder.build();
+
+        Row r1r2 = Rows.merge(r1, r2);
+        Row r2r1 = Rows.merge(r2, r1);
+
+        DiffListener mergedListener = new DiffListener();
+        Rows.diff(mergedListener, r1r2, r2r1);
+
+        mergedListener.liveness.forEach(pair -> Assert.assertEquals(pair.merged, pair.original));
+        mergedListener.cells.forEach(pair -> assertCellsEqual(pair.merged, pair.original));
+    }
+
+
     // Creates a dummy cell for a (regular) column for the provided name and without a cellPath.
     private static Cell<?> liveCell(ColumnMetadata name)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to