This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f43d9d0f7f Add support in the binary protocol to allow transactions to have multiple conditions
f43d9d0f7f is described below

commit f43d9d0f7f090bcd3679097a453ff0ae2b33ee8e
Author: David Capwell <[email protected]>
AuthorDate: Mon Nov 10 12:06:42 2025 -0800

    Add support in the binary protocol to allow transactions to have multiple conditions
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-20883
---
 CHANGES.txt                                        |   1 +
 ...ymmetricParameterisedUnversionedSerializer.java |   5 +
 .../cassandra/service/accord/TokenRange.java       |   5 +
 .../accord/serializers/SerializePacked.java        |  83 ++-
 .../service/accord/serializers/TableMetadatas.java |  11 +
 .../cassandra/service/accord/txn/AccordUpdate.java |   2 +-
 .../cassandra/service/accord/txn/TxnQuery.java     |   2 +-
 .../cassandra/service/accord/txn/TxnUpdate.java    | 789 ++++++++++++++++-----
 .../cassandra/service/accord/txn/TxnWrite.java     |  59 +-
 .../apache/cassandra/utils/ArraySerializers.java   |  32 +
 .../cassandra/utils/CollectionSerializers.java     |   2 +-
 test/unit/org/apache/cassandra/io/Serializers.java |  10 +-
 .../serializers/CommandsForKeySerializerTest.java  |   3 +-
 .../accord/serializers/SerializePackedTest.java    | 169 +++++
 .../service/accord/txn/TxnUpdateTest.java          | 239 +++++++
 .../apache/cassandra/utils/AccordGenerators.java   |   7 +-
 .../cassandra/utils/LargeBitSetSerializerTest.java |  59 --
 .../utils/SimpleBitSetSerializersTest.java         | 113 +++
 18 files changed, 1333 insertions(+), 258 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index b4996c26ba..332107e491 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Add support in the binary protocol to allow transactions to have multiple conditions (CASSANDRA-20883)
  * Enable CQLSSTableWriter to create SSTables compressed with a dictionary (CASSANDRA-20938)
  * Support ZSTD dictionary compression (CASSANDRA-17021)
  * Fix ExceptionsTable when stacktrace has zero elements (CASSANDRA-20992)
diff --git a/src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java b/src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java
index 69b1bd2e0c..56841a0bef 100644
--- a/src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java
+++ b/src/java/org/apache/cassandra/io/AsymmetricParameterisedUnversionedSerializer.java
@@ -74,5 +74,10 @@ public interface AsymmetricParameterisedUnversionedSerializer<In, P, Out>
         }
     }
 
+    default void skip(P p, DataInputPlus in) throws IOException
+    {
+        deserialize(p, in);
+    }
+
     long serializedSize(In t, P p);
 }
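
A minimal sketch of why a serializer might override the new default skip(), assuming the interface shape visible above (serialize/deserialize/serializedSize); the serializer below is hypothetical and not part of this patch. The default skip() deserializes and discards the value, while a length-prefixed format can jump over the payload without materialising it:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.cassandra.io.AsymmetricParameterisedUnversionedSerializer;
    import org.apache.cassandra.io.util.DataInputPlus;
    import org.apache.cassandra.io.util.DataOutputPlus;
    import org.apache.cassandra.utils.ByteBufferUtil;

    // hypothetical example serializer, not from this patch
    public class LengthPrefixedBlobSerializer implements AsymmetricParameterisedUnversionedSerializer<ByteBuffer, Void, ByteBuffer>
    {
        @Override
        public void serialize(ByteBuffer t, Void p, DataOutputPlus out) throws IOException
        {
            ByteBufferUtil.writeWithVIntLength(t, out);
        }

        @Override
        public ByteBuffer deserialize(Void p, DataInputPlus in) throws IOException
        {
            return ByteBufferUtil.readWithVIntLength(in);
        }

        @Override
        public void skip(Void p, DataInputPlus in) throws IOException
        {
            // cheaper than the deserialize-and-discard default: read the vint length, then jump the payload
            ByteBufferUtil.skipWithVIntLength(in);
        }

        @Override
        public long serializedSize(ByteBuffer t, Void p)
        {
            return ByteBufferUtil.serializedSizeWithVIntLength(t);
        }
    }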
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 91be38ec2f..9c00c4dcdf 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -54,6 +54,11 @@ public class TokenRange extends Range.EndInclusive
         return new TokenRange(start, end);
     }
 
+    public static TokenRange create(TableId tableId, Token start, Token end)
+    {
+        return new TokenRange(new TokenKey(tableId, start), new TokenKey(tableId, end));
+    }
+
     public static TokenRange createUnsafe(TokenKey start, TokenKey end)
     {
         return new TokenRange(start, end);
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java b/src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java
index b7262c1265..67d66a7185 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/SerializePacked.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import accord.utils.BitUtils;
 import accord.utils.Invariants;
 import net.nicoulaj.compilecommand.annotations.Inline;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 
@@ -30,20 +31,91 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  * A set of simple utilities to quickly serialize/deserialize arrays/lists of values that each require <= 64 bits to represent.
  * These are packed into an "array" of fixed bit width, so that the total size consumed is ceil((bits*elements)/8).
  * This can (in future) be read directly without deserialization, by indexing into the byte stream directly.
+ * <p/>
+ * The serialized value is optimized for values in the range 0 to 256 (negative values are rejected), and should produce
+ * output smaller than or equal to vint serialization; when values are larger than 256, packing can produce 1 extra
+ * serialized byte. Serialization is safe in these cases, and faster to skip.
  */
 public class SerializePacked
 {
-    public static void serializePackedInts(int[] vs, int from, int to, int max, DataOutputPlus out) throws IOException
+    public static void serializePackedSortedIntsAndLength(int[] vs, DataOutputPlus out) throws IOException
+    {
+        out.writeUnsignedVInt32(vs.length);
+        serializePackedSortedInts(vs, out);
+    }
+
+    public static void serializePackedSortedInts(int[] vs, DataOutputPlus out) throws IOException
+    {
+        if (vs.length == 0)
+            return;
+
+        int last = vs[vs.length - 1];
+        Invariants.require(last >= 0,
+                () -> String.format("Found a negative value at offset %d; 
value %d", (Object) (vs.length - 1), (Object) last));
+        out.writeUnsignedVInt32(last);
+        serializePackedInts(vs, 0, vs.length - 1, last, out);
+    }
+
+    public static int[] deserializePackedSortedIntsAndLength(DataInputPlus in) throws IOException
+    {
+        return deserializePackedSortedInts(in.readUnsignedVInt32(), in);
+    }
+
+    public static int[] deserializePackedSortedInts(int length, DataInputPlus in) throws IOException
+    {
+        if (length == 0)
+            return new int[0];
+
+        int last = in.readUnsignedVInt32();
+        int[] vs = new int[length];
+        deserializePackedInts(vs, 0, length - 1, last, in);
+        vs[length - 1] = last;
+        return vs;
+    }
+
+    public static void skipPackedSortedIntsAndLength(DataInputPlus in) throws IOException
+    {
+        skipPackedSortedInts(in.readUnsignedVInt32(), in);
+    }
+
+    public static void skipPackedSortedInts(int length, DataInputPlus in) throws IOException
+    {
+        if (length > 0)
+        {
+            int last = in.readUnsignedVInt32();
+            skipPackedInts(0, length - 1, last, in);
+        }
+    }
+
+    public static long serializedSizeOfPackedSortedIntsAndLength(int[] vs)
+    {
+        return TypeSizes.sizeofUnsignedVInt(vs.length) + serializedSizeOfPackedSortedInts(vs);
+    }
+
+    public static long serializedSizeOfPackedSortedInts(int[] vs)
+    {
+        if (vs.length == 0)
+            return 0;
+        int last = vs[vs.length - 1];
+        return TypeSizes.sizeofUnsignedVInt(last) + serializedPackedSize(vs.length - 1, last);
+    }
+
+    public static void serializePackedInts(int[] vs, int from, int to, long max, DataOutputPlus out) throws IOException
     {
         serializePacked((in, i) -> in[i], vs, from, to, max, out);
     }
 
-    public static void deserializePackedInts(int[] vs, int from, int to, int max, DataInputPlus in) throws IOException
+    public static void deserializePackedInts(int[] vs, int from, int to, long max, DataInputPlus in) throws IOException
     {
         deserializePacked((out, i, v) -> out[i] = (int)v, vs, from, to, max, in);
     }
 
-    public static long serializedPackedIntsSize(int[] vs, int from, int to, int max)
+    public static void skipPackedInts(int from, int to, long max, DataInputPlus in) throws IOException
+    {
+        in.skipBytesFully(serializedPackedSize(to - from, max));
+    }
+
+    public static long serializedPackedIntsSize(int[] vs, int from, int to, long max)
     {
         return serializedPackedSize(to - from, max);
     }
@@ -60,12 +132,15 @@ public class SerializePacked
         if (bitsPerEntry == 0)
             return;
 
+        long outOfRange = -1L << bitsPerEntry;
         long buffer = 0L;
         int bufferCount = 0;
         for (int i = from; i < to; i++)
         {
             long v = adapter.get(in, i);
-            Invariants.require(v <= max);
+            int finalI = i;
+            Invariants.require(v >= 0 && (v & outOfRange) == 0,
+                    () -> String.format(v < 0 ? "Found a negative value at offset %d; value %d" : "Value out of range at offset %d; value %d", (Object) finalI, (Object) v));
             buffer |= v << bufferCount;
             bufferCount = bufferCount + bitsPerEntry;
             if (bufferCount >= 64)
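
A quick round-trip sketch of the new sorted-int helpers (the class below is illustrative, not from this patch): pack a sorted, non-negative array, check the size accounting, and read it back. The last value (42 here) is written as a vint and fixes the bit width used to pack the remaining entries:

    import java.util.Arrays;
    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataOutputBuffer;
    import org.apache.cassandra.service.accord.serializers.SerializePacked;

    public class SerializePackedRoundTrip
    {
        public static void main(String[] args) throws Exception
        {
            int[] ids = { 0, 1, 3, 7, 42 }; // must be sorted and non-negative

            try (DataOutputBuffer out = new DataOutputBuffer())
            {
                SerializePacked.serializePackedSortedIntsAndLength(ids, out);
                // the size helper must agree with what was actually written
                assert out.getLength() == SerializePacked.serializedSizeOfPackedSortedIntsAndLength(ids);

                try (DataInputBuffer in = new DataInputBuffer(out.buffer(false), true))
                {
                    int[] roundTrip = SerializePacked.deserializePackedSortedIntsAndLength(in);
                    assert Arrays.equals(ids, roundTrip);
                }
            }
        }
    }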
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java b/src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java
index cec50a0a10..e022d193ff 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/TableMetadatas.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.util.AbstractList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
 
 import accord.utils.Invariants;
 import accord.utils.SortedArrays;
@@ -99,6 +102,14 @@ public abstract class TableMetadatas extends AbstractList<TableId>
         return new One(metadata);
     }
 
+    @VisibleForTesting
+    public static Complete of(List<TableMetadata> values)
+    {
+        Collector collector = new Collector();
+        collector.addAll(values);
+        return collector.build();
+    }
+
     public static Complete ofSortedUnique(TableMetadata ... metadatas)
     {
         if (metadatas.length == 0)
diff --git a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java
index 89536d2058..f6d50763a4 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/AccordUpdate.java
@@ -72,7 +72,7 @@ public abstract class AccordUpdate implements Update
 
     }
 
-    public boolean checkCondition(Data data)
+    public boolean checkAnyConditionMatch(Data data)
     {
         throw new UnsupportedOperationException();
     }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
index b0bcc5c58b..e380b87fe6 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnQuery.java
@@ -119,7 +119,7 @@ public abstract class TxnQuery implements Query
 
             AccordUpdate accordUpdate = (AccordUpdate)update;
             TxnData txnData = (TxnData)data;
-            boolean conditionCheck = accordUpdate.checkCondition(data);
+            boolean conditionCheck = accordUpdate.checkAnyConditionMatch(data);
             // If the condition applied an empty result indicates success
             if (conditionCheck)
                 return new TxnData();
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index f88b1eb3b8..2affc5d778 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -25,11 +25,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.function.Function;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import accord.api.Data;
+import accord.api.Key;
 import accord.api.Update;
 import accord.primitives.Keys;
 import accord.primitives.Participants;
@@ -37,17 +39,22 @@ import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
 import accord.primitives.Timestamp;
 import accord.utils.Invariants;
+import accord.utils.SimpleBitSet;
+import accord.utils.SimpleBitSets;
+import accord.utils.SortedArrays;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
+import org.apache.cassandra.io.UnversionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.PreserveTimestamp;
 import org.apache.cassandra.service.accord.AccordObjectSizes;
 import org.apache.cassandra.service.accord.IAccordService;
 import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.service.accord.serializers.SerializePacked;
 import org.apache.cassandra.service.accord.serializers.TableMetadatas;
 import org.apache.cassandra.service.accord.serializers.TableMetadatasAndKeys;
 import org.apache.cassandra.service.accord.serializers.Version;
@@ -55,17 +62,19 @@ import org.apache.cassandra.service.accord.txn.TxnCondition.SerializedTxnConditi
 import org.apache.cassandra.service.accord.txn.TxnWrite.Fragment;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.transport.ProtocolVersion;
-import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ArraySerializers;
+import org.apache.cassandra.utils.CollectionSerializers;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.Pair;
 
+import static accord.utils.ArrayBuffers.cachedInts;
 import static accord.utils.Invariants.requireArgument;
 import static accord.utils.SortedArrays.Search.CEIL;
 import static com.google.common.base.Preconditions.checkState;
 import static java.lang.Boolean.FALSE;
 import static org.apache.cassandra.service.accord.AccordSerializers.consistencyLevelSerializer;
-import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
-import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
-import static org.apache.cassandra.utils.ArraySerializers.serializedArraySize;
+import static org.apache.cassandra.service.accord.txn.TxnUpdate.BlockFragment.NO_BLOCK_FRAGMENTS;
+import static org.apache.cassandra.service.accord.txn.TxnUpdate.ConditionalBlock.NO_CONDITIONAL_BLOCKS;
 import static org.apache.cassandra.utils.ArraySerializers.skipArray;
 import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
 import static org.apache.cassandra.utils.ByteBufferUtil.serializedSizeWithVIntLength;
@@ -77,13 +86,454 @@ import static org.apache.cassandra.utils.NullableSerializer.serializedNullableSi
 
 public class TxnUpdate extends AccordUpdate
 {
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new TxnUpdate(TableMetadatas.none(), null, new ByteBuffer[0], null, null, PreserveTimestamp.no));
+    static class ConditionalBlock
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new ConditionalBlock(0, null, null));
+        static final ConditionalBlock[] NO_CONDITIONAL_BLOCKS = new ConditionalBlock[0];
+        public static final UnversionedSerializer<ConditionalBlock> serializer = new UnversionedSerializer<>()
+        {
+            @Override
+            public void serialize(ConditionalBlock t, DataOutputPlus out) throws IOException
+            {
+                out.writeUnsignedVInt32(t.id);
+                writeWithVIntLength(t.condition.bytes(), out);
+                SerializePacked.serializePackedSortedIntsAndLength(t.fragmentIds, out);
+            }
+
+            @Override
+            public ConditionalBlock deserialize(DataInputPlus in) throws IOException
+            {
+                int id = in.readUnsignedVInt32();
+                SerializedTxnCondition condition = new SerializedTxnCondition(readWithVIntLength(in));
+
+                // Deserialize mutations
+                int[] mutations = SerializePacked.deserializePackedSortedIntsAndLength(in);
+                return new ConditionalBlock(id, condition, mutations);
+            }
+
+            @Override
+            public void skip(DataInputPlus in) throws IOException
+            {
+                in.readUnsignedVInt32();
+                skipWithVIntLength(in);
+                SerializePacked.skipPackedSortedIntsAndLength(in);
+            }
+
+            @Override
+            public long serializedSize(ConditionalBlock t)
+            {
+                long size = TypeSizes.sizeofUnsignedVInt(t.id);
+                size += serializedSizeWithVIntLength(t.condition.bytes());
+                size += SerializePacked.serializedSizeOfPackedSortedIntsAndLength(t.fragmentIds);
+                return size;
+            }
+        };
+
+        final int id;
+        final SerializedTxnCondition condition;
+        final int[] fragmentIds;
+
+        ConditionalBlock(int id, SerializedTxnCondition condition, int[] fragmentIds)
+        {
+            this.id = id;
+            this.condition = condition;
+            this.fragmentIds = fragmentIds;
+        }
+
+        public long estimatedSizeOnHeap()
+        {
+            long size = EMPTY_SIZE;
+            size += condition.estimatedSizeOnHeap();
+            size += ObjectSizes.sizeOfArray(fragmentIds);
+            return size;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o == null || getClass() != o.getClass()) return false;
+            ConditionalBlock that = (ConditionalBlock) o;
+            return id == that.id && condition.equals(that.condition) && Arrays.equals(fragmentIds, that.fragmentIds);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(id, condition, Arrays.hashCode(fragmentIds));
+        }
+
+        public void toString(StringBuilder sb, TableMetadatas tables, Block block)
+        {
+            sb.append("{condition=")
+              .append(condition.deserialize(tables))
+              .append(", fragments=")
+              .append(deserialize(tables, block, fragmentIds))
+              .append('}');
+        }
+
+        public ConditionalBlock merge(ConditionalBlock that)
+        {
+            requireArgument(this.id == that.id, "Tried to merge different blocks; expected %d but given %d", this.id, that.id);
+            return new ConditionalBlock(id, condition, SortedArrays.linearUnion(this.fragmentIds, 0, this.fragmentIds.length, that.fragmentIds, 0, that.fragmentIds.length, cachedInts()));
+        }
+    }
+
+    static class BlockFragment
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new BlockFragment(0, null, null));
+        static final BlockFragment[] NO_BLOCK_FRAGMENTS = new BlockFragment[0];
+        public static final ParameterisedUnversionedSerializer<BlockFragment, TableMetadatasAndKeys> serializer = new ParameterisedUnversionedSerializer<>()
+        {
+            @Override
+            public void serialize(BlockFragment t, TableMetadatasAndKeys p, DataOutputPlus out) throws IOException
+            {
+                out.writeUnsignedVInt32(t.id);
+                p.serializeKey(t.key, out);
+                writeWithVIntLength(t.bytes, out);
+            }
+
+            @Override
+            public BlockFragment deserialize(TableMetadatasAndKeys p, DataInputPlus in) throws IOException
+            {
+                int id = in.readUnsignedVInt32();
+                PartitionKey key = p.deserializeKey(in);
+                ByteBuffer bytes = readWithVIntLength(in);
+                return new BlockFragment(id, key, bytes);
+            }
+
+            @Override
+            public void skip(TableMetadatasAndKeys p, DataInputPlus in) throws IOException
+            {
+                in.readUnsignedVInt32();
+                p.skipKeys(in);
+                skipWithVIntLength(in);
+            }
+
+            @Override
+            public long serializedSize(BlockFragment t, TableMetadatasAndKeys p)
+            {
+                long size = TypeSizes.sizeofUnsignedVInt(t.id);
+                size += p.serializedKeySize(t.key);
+                size += serializedSizeWithVIntLength(t.bytes);
+                return size;
+            }
+        };
+
+        final int id;
+        final PartitionKey key;
+        final ByteBuffer bytes;
+
+        BlockFragment(int id, PartitionKey key, ByteBuffer bytes)
+        {
+            this.id = id;
+            this.key = key;
+            this.bytes = bytes;
+        }
+
+        public boolean equals(Object that)
+        {
+            return that instanceof BlockFragment && equals((BlockFragment) that);
+        }
+
+        public boolean equals(BlockFragment that)
+        {
+            return this.id == that.id && this.key.equals(that.key) && this.bytes.equals(that.bytes);
+        }
+
+        public long estimatedSizeOnHeap()
+        {
+            long size = EMPTY_SIZE;
+            size += ObjectSizes.sizeOnHeapOf(bytes);
+            // don't count key: it's a reference to the key held by the parent
+            return size;
+        }
+    }
+
+    static class Block
+    {
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new Block(null, null));
+        public static final ParameterisedUnversionedSerializer<Block, TableMetadatasAndKeys> serializer = new ParameterisedUnversionedSerializer<>()
+        {
+            @Override
+            public void serialize(Block t, TableMetadatasAndKeys p, DataOutputPlus out) throws IOException
+            {
+                ArraySerializers.serializeArray(t.fragments, p, out, BlockFragment.serializer);
+                ArraySerializers.serializeArray(t.conditionalBlocks, out, ConditionalBlock.serializer);
+            }
+
+            @Override
+            public Block deserialize(TableMetadatasAndKeys p, DataInputPlus in) throws IOException
+            {
+                BlockFragment[] fragments = ArraySerializers.deserializeArray(p, in, BlockFragment.serializer, BlockFragment[]::new);
+                ConditionalBlock[] conditionalBlocks = ArraySerializers.deserializeArray(in, ConditionalBlock.serializer, ConditionalBlock[]::new);
+                return new Block(fragments, conditionalBlocks);
+            }
+
+            @Override
+            public void skip(TableMetadatasAndKeys p, DataInputPlus in) throws IOException
+            {
+                ArraySerializers.skipArray(p, in, BlockFragment.serializer);
+                ArraySerializers.skipArray(in, ConditionalBlock.serializer);
+            }
+
+            @Override
+            public long serializedSize(Block t, TableMetadatasAndKeys p)
+            {
+                long size = 0;
+                size += ArraySerializers.serializedArraySize(t.fragments, p, BlockFragment.serializer);
+                size += ArraySerializers.serializedArraySize(t.conditionalBlocks, ConditionalBlock.serializer);
+                return size;
+            }
+        };
+
+        final BlockFragment[] fragments;
+        final ConditionalBlock[] conditionalBlocks;
+
+        Block(BlockFragment[] fragments, ConditionalBlock[] conditionalBlocks)
+        {
+            this.fragments = fragments;
+            this.conditionalBlocks = conditionalBlocks;
+        }
+
+        public long estimatedSizeOnHeap()
+        {
+            long size = EMPTY_SIZE;
+            size += ObjectSizes.sizeOfArray(fragments);
+            for (BlockFragment bf : fragments)
+                size += bf.estimatedSizeOnHeap();
+            for (ConditionalBlock conditionalBlock : conditionalBlocks)
+                size += conditionalBlock.estimatedSizeOnHeap();
+            return size;
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if (o == null || getClass() != o.getClass()) return false;
+            Block block = (Block) o;
+            return Arrays.equals(fragments, block.fragments) && Arrays.equals(conditionalBlocks, block.conditionalBlocks);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(Arrays.hashCode(fragments), Arrays.hashCode(conditionalBlocks));
+        }
+
+        public void toString(StringBuilder sb, TableMetadatas tables)
+        {
+            sb.append("{conditionalBlocks=[");
+            for (int j = 0; j < conditionalBlocks.length; j++)
+            {
+                if (j > 0) sb.append(", ");
+                conditionalBlocks[j].toString(sb, tables, this);
+            }
+            sb.append("]}");
+        }
+
+        public Block select(Keys keys)
+        {
+            int[] outFragmentIds = cachedInts().getInts(fragments.length);
+            BlockFragment[] outFragments;
+            int count = 0;
+            {
+                {
+                    int i = 0, j = 0;
+                    while (i < keys.size() && j < fragments.length)
+                    {
+                        Key key = keys.get(i++);
+                        j = SortedArrays.exponentialSearch(fragments, j, fragments.length, key, (k, b) -> k.compareTo(b.key), CEIL);
+                        if (j < 0) j = -1 - j;
+                        else
+                        {
+                            do outFragmentIds[count++] = j;
+                            while (++j < fragments.length && fragments[j].key.equals(key));
+                        }
+                    }
+                }
+
+                if (count == fragments.length)
+                    return this;
+
+                if (count == 0)
+                    return new Block(NO_BLOCK_FRAGMENTS, NO_CONDITIONAL_BLOCKS);
+
+                outFragments = new BlockFragment[count];
+                for (int i = 0 ; i < count ; ++i)
+                {
+                    outFragments[i] = fragments[outFragmentIds[i]];
+                    outFragmentIds[i] = outFragments[i].id;
+                }
+            }
+
+            ConditionalBlock[] outConditions;
+            {
+                List<ConditionalBlock> collect = null;
+                for (int i = 0 ; i < conditionalBlocks.length ; ++i)
+                {
+                    ConditionalBlock cb = conditionalBlocks[i];
+                    int[] cbOutFragmentIds = SortedArrays.linearIntersection(cb.fragmentIds, 0, cb.fragmentIds.length, outFragmentIds, 0, count, cachedInts());
+                    //noinspection ArrayEquality
+                    if (cbOutFragmentIds != cb.fragmentIds) // when the arrays are equal, cb.fragmentIds is returned unchanged, so a pointer check detects a change
+                    {
+                        if (collect == null)
+                        {
+                            collect = new ArrayList<>(conditionalBlocks.length - 1);
+                            for (int j = 0 ; j < i ; ++j) //TODO (review): why do we include the previous blocks that "should" have empty fragments, but we provide them without empty fragments?
+                                collect.add(conditionalBlocks[j]);
+                        }
+                        if (cbOutFragmentIds.length > 0)
+                            collect.add(new ConditionalBlock(cb.id, cb.condition, cbOutFragmentIds));
+                    }
+                }
+                if (collect == null) outConditions = conditionalBlocks;
+                else if (collect.isEmpty()) outConditions = NO_CONDITIONAL_BLOCKS;
+                else outConditions = collect.toArray(ConditionalBlock[]::new);
+            }
+
+            cachedInts().forceDiscard(outFragmentIds);
+            return new Block(outFragments, outConditions);
+        }
+
+        public Block merge(Block that)
+        {
+            BlockFragment[] outFragments;
+            if (this.fragments.length == 0) outFragments = that.fragments;
+            else if (that.fragments.length == 0) outFragments = this.fragments;
+            else
+            {
+                int minId = Math.min(this.fragments[0].id, that.fragments[0].id);
+                int maxId = Math.max(this.fragments[this.fragments.length - 1].id, that.fragments[that.fragments.length - 1].id);
+                outFragments = new BlockFragment[Math.min(this.fragments.length + that.fragments.length, 1 + (maxId - minId))];
+
+                int i = 0, j = 0, count = 0;
+                while (i < this.fragments.length || j < that.fragments.length)
+                {
+                    int cmp;
+                    if (i == this.fragments.length) cmp = 1;
+                    else if (j == that.fragments.length) cmp = -1;
+                    else cmp = this.fragments[i].id - that.fragments[j].id;
+
+                    if (cmp <= 0)
+                    {
+                        outFragments[count] = this.fragments[i];
+                        ++i;
+                        j += cmp == 0 ? 1 : 0;
+                    }
+                    else
+                    {
+                        outFragments[count] = that.fragments[j];
+                        ++j;
+                    }
+                    ++count;
+                }
+
+                if (count != outFragments.length)
+                    outFragments = Arrays.copyOf(outFragments, count);
+            }
+
+            ConditionalBlock[] outConditions;
+            if (this.conditionalBlocks.length == 0) outConditions = that.conditionalBlocks;
+            else if (that.conditionalBlocks.length == 0) outConditions = this.conditionalBlocks;
+            else
+            {
+                int minId = Math.min(this.conditionalBlocks[0].id, that.conditionalBlocks[0].id);
+                int maxId = Math.max(this.conditionalBlocks[this.conditionalBlocks.length - 1].id, that.conditionalBlocks[that.conditionalBlocks.length - 1].id);
+                outConditions = new ConditionalBlock[Math.min(this.conditionalBlocks.length + that.conditionalBlocks.length, 1 + maxId - minId)];
+                int i = 0, j = 0, count = 0;
+                while (i < this.conditionalBlocks.length || j < that.conditionalBlocks.length)
+                {
+                    int cmp;
+                    if (i == this.conditionalBlocks.length) cmp = 1;
+                    else if (j == that.conditionalBlocks.length) cmp = -1;
+                    else cmp = this.conditionalBlocks[i].id - that.conditionalBlocks[j].id;
+
+                    if (cmp == 0)
+                        outConditions[count] = this.conditionalBlocks[i++].merge(that.conditionalBlocks[j++]);
+                    else if (cmp < 0)
+                        outConditions[count] = this.conditionalBlocks[i++];
+                    else
+                        outConditions[count] = that.conditionalBlocks[j++];
+                    ++count;
+                }
+                if (count < outConditions.length)
+                    outConditions = Arrays.copyOf(outConditions, count);
+            }
+            return new Block(outFragments, outConditions);
+        }
+    }
+
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new TxnUpdate(TableMetadatas.none(), Keys.EMPTY, Collections.emptyList(), null, PreserveTimestamp.no));
     private static final int FLAG_PRESERVE_TIMESTAMPS = 0x1;
 
     final TableMetadatas tables;
-    private final Keys keys;
-    private final ByteBuffer[] fragments;
-    private final SerializedTxnCondition condition;
+    final Keys keys;
+    /**
+     * CASSANDRA-20883 added this logic, but didn't update the CQL layer to leverage it; left for follow-up work.
+     * <p>
+     * The reason for this setup is to allow the following in CQL (and any combination of them):
+     * <p>
+     * <code>
+     *     IF cond1 THEN
+     *       mutation1
+     *     ELSE
+     *       mutation2
+     *     END IF
+     * </code>
+     * <p>
+     * <code>
+     *     IF cond1 THEN
+     *       mutation1
+     *       IF cond2 THEN
+     *         mutation2
+     *       ELSE
+     *         mutation3
+     *       END IF
+     *     ELSE IF cond3 THEN
+     *       mutation4
+     *     END IF
+     * </code>
+     * <p>
+     * and lastly
+     * <p>
+     * <code>
+     *     IF cond THEN
+     *       mutation1
+     *     END IF
+     *     mutation2
+     * </code>
+     * <p>
+     * Each {@link Block} represents a single <code>IF / END IF</code> block.
+     * Each {@link ConditionalBlock} represents a single condition with its mutations.
+     * <p>
+     * Given the flat structure, the <code>IF / END IF</code> blocks must be rewritten into this form; for cases like nested IF, the conditions should be uplifted as follows:
+     * <p>
+     * Before
+     * <code>
+     *     IF cond1 THEN
+     *       mutation1
+     *       IF cond2 THEN
+     *         mutation2
+     *       ELSE
+     *         mutation3
+     *       END IF
+     *     END IF
+     * </code>
+     * <p>
+     * After
+     * <code>
+     *     IF cond1 AND cond2 THEN
+     *       mutation1
+     *       mutation2
+     *     ELSE IF cond1 THEN
+     *       mutation1
+     *       mutation3
+     *     END IF
+     * </code>
+     * <p>
+     * When a non-conditional set of mutations exists alongside conditional ones, the non-conditional mutations should
+     * be in their own block with an empty condition.
+     */
+    final List<Block> blocks;
 
     @Nullable
     private final ConsistencyLevel cassandraCommitCL;
@@ -94,46 +544,51 @@ public class TxnUpdate extends AccordUpdate
     private final PreserveTimestamp preserveTimestamps;
 
     // Memoize computation of condition
-    private Boolean conditionResult;
+    private Boolean anyConditionResult;
 
     public TxnUpdate(TableMetadatas tables, List<Fragment> fragments, TxnCondition condition, @Nullable ConsistencyLevel cassandraCommitCL, PreserveTimestamp preserveTimestamps)
     {
         requireArgument(cassandraCommitCL == null || IAccordService.SUPPORTED_COMMIT_CONSISTENCY_LEVELS.contains(cassandraCommitCL));
+        fragments.sort(Fragment::compareKeys);
         this.tables = tables;
         this.keys = Keys.of(fragments, fragment -> fragment.key);
-        fragments.sort(Fragment::compareKeys);
+
+        BlockFragment[] blockFragments = new BlockFragment[fragments.size()];
         // TODO (required): this node could be on version N while the peers are on N-1, which would have issues as the peers wouldn't know about N yet.
         //  Can not eagerly serialize until we know the "correct" version, else we need a way to fallback on mismatch.
-        this.fragments = toSerializedValuesArray(keys, fragments, tables, Version.LATEST);
-        // TODO (desired): slice TxnCondition, or pick a single shard to persist it
-        this.condition = new SerializedTxnCondition(condition, tables);
-        this.condition.unmemoize();
-        this.condition.deserialize(tables);
+        int[] fragmentIds = new int[fragments.size()];
+        for (int i = 0 ; i < fragments.size() ; ++i)
+        {
+            blockFragments[i] = new BlockFragment(i, fragments.get(i).key, Fragment.FragmentSerializer.serialize(fragments.get(i), tables, Version.LATEST));
+            fragmentIds[i] = i;
+        }
+
+        SerializedTxnCondition serializedCondition = new SerializedTxnCondition(condition, tables);
+        this.blocks = Collections.singletonList(new Block(blockFragments, new ConditionalBlock[] { new ConditionalBlock(0, serializedCondition, fragmentIds) }));
         this.cassandraCommitCL = cassandraCommitCL;
         this.preserveTimestamps = preserveTimestamps;
     }
 
-    private TxnUpdate(TableMetadatas tables, Keys keys, ByteBuffer[] fragments, SerializedTxnCondition condition, ConsistencyLevel cassandraCommitCL, PreserveTimestamp preserveTimestamps)
+    private TxnUpdate(TableMetadatas tables, Keys keys, List<Block> blocks, ConsistencyLevel cassandraCommitCL, PreserveTimestamp preserveTimestamps)
     {
         this.tables = tables;
         this.keys = keys;
-        this.fragments = fragments;
-        this.condition = condition;
+        this.blocks = blocks;
         this.cassandraCommitCL = cassandraCommitCL;
         this.preserveTimestamps = preserveTimestamps;
     }
 
     public static TxnUpdate empty()
     {
-        return new TxnUpdate(TableMetadatas.none(), Collections.emptyList(), TxnCondition.none(), null, PreserveTimestamp.no);
+        return new TxnUpdate(TableMetadatas.none(), Keys.EMPTY, Collections.emptyList(), null, PreserveTimestamp.no);
     }
 
     @Override
     public long estimatedSizeOnHeap()
     {
-        long size = EMPTY_SIZE + condition.estimatedSizeOnHeap();
-        for (ByteBuffer update : fragments)
-            size += ByteBufferUtil.estimatedSizeOnHeap(update);
+        long size = EMPTY_SIZE;
+        for (Block block : blocks)
+            size += block.estimatedSizeOnHeap();
         size += AccordObjectSizes.keys(keys);
         return size;
     }
@@ -141,8 +596,14 @@ public class TxnUpdate extends AccordUpdate
     @Override
     public String toString()
     {
-        return "TxnUpdate{updates=" + deserialize(keys, tables, fragments) +
-               ", condition=" + condition.deserialize(tables) + '}';
+        StringBuilder sb = new StringBuilder("TxnUpdate{blocks=[");
+        for (int i = 0; i < blocks.size(); i++)
+        {
+            if (i > 0) sb.append(", ");
+            blocks.get(i).toString(sb, tables);
+        }
+        sb.append("]}");
+        return sb.toString();
     }
 
     @Override
@@ -151,21 +612,18 @@ public class TxnUpdate extends AccordUpdate
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         TxnUpdate txnUpdate = (TxnUpdate) o;
-        return Arrays.equals(fragments, txnUpdate.fragments) && Objects.equals(condition, txnUpdate.condition);
+        return Objects.equals(blocks, txnUpdate.blocks);
     }
 
     @Override
     public int hashCode()
     {
-        int result = Objects.hash(condition);
-        result = 31 * result + Arrays.hashCode(fragments);
-        return result;
+        return Objects.hash(blocks);
     }
 
     @Override
     public Keys keys()
     {
-        // TODO: It doesn't seem to affect correctness, but should we return the union of the fragment + condition keys?
         return keys;
     }
 
@@ -177,58 +635,39 @@ public class TxnUpdate extends AccordUpdate
     }
 
     @Override
-    public Update slice(Ranges ranges)
+    public TxnUpdate slice(Ranges ranges)
     {
-        Keys keys = this.keys.slice(ranges);
-        // TODO (desired): Slice the condition.
-        return new TxnUpdate(tables, keys, select(this.keys, keys, fragments), 
condition, cassandraCommitCL, preserveTimestamps);
+        return getTxnUpdate(keys -> keys.slice(ranges));
     }
 
     @Override
-    public Update intersecting(Participants<?> participants)
+    public TxnUpdate intersecting(Participants<?> participants)
     {
-        Keys keys = this.keys.intersecting(participants);
-        // TODO (desired): Slice the condition.
-        return new TxnUpdate(tables, keys, select(this.keys, keys, fragments), condition, cassandraCommitCL, preserveTimestamps);
+        return getTxnUpdate(keys -> keys.intersecting(participants));
     }
 
-    private static ByteBuffer[] select(Keys in, Keys out, ByteBuffer[] from)
+    @VisibleForTesting
+    TxnUpdate getTxnUpdate(Function<Keys, Keys> fn)
     {
-        ByteBuffer[] result = new ByteBuffer[out.size()];
-        int j = 0;
-        for (int i = 0 ; i < out.size() ; ++i)
-        {
-            j = in.findNext(j, out.get(i), CEIL);
-            result[i] = from[j];
-        }
-        return result;
+        Keys newKeys = fn.apply(keys);
+        List<Block> blocks = new ArrayList<>(this.blocks.size());
+        for (Block block : this.blocks)
+            blocks.add(block.select(newKeys));
+        return new TxnUpdate(tables, newKeys, blocks, cassandraCommitCL, preserveTimestamps);
     }
 
     @Override
-    public Update merge(Update update)
+    public TxnUpdate merge(Update update)
     {
         TxnUpdate that = (TxnUpdate) update;
-        Keys mergedKeys = this.keys.with(that.keys);
-        // TODO (desired): special method for linear merging keyed and non-keyed lists simultaneously
-        ByteBuffer[] mergedFragments = merge(this.keys, that.keys, this.fragments, that.fragments, mergedKeys.size());
-        return new TxnUpdate(tables, mergedKeys, mergedFragments, condition, cassandraCommitCL, preserveTimestamps);
-    }
-
-    private static ByteBuffer[] merge(Keys leftKeys, Keys rightKeys, ByteBuffer[] left, ByteBuffer[] right, int outputSize)
-    {
-        ByteBuffer[] out = new ByteBuffer[outputSize];
-        int l = 0, r = 0, o = 0;
-        while (l < leftKeys.size() && r < rightKeys.size())
-        {
-            int c = leftKeys.get(l).compareTo(rightKeys.get(r));
-            if (c < 0) { out[o++] = left[l++]; }
-            else if (c > 0) { out[o++] = right[r++]; }
-            else if (ByteBufferUtil.compareUnsigned(left[l], right[r]) != 0) { throw new IllegalStateException("The same keys have different values in each input"); }
-            else { out[o++] = left[l++]; r++; }
-        }
-        while (l < leftKeys.size()) { out[o++] = left[l++]; }
-        while (r < rightKeys.size()) { out[o++] = right[r++]; }
-        return out;
+        requireArgument(that.blocks.size() == this.blocks.size(), "Blocks don't have the same sizes; expected %d but was %d", this.blocks.size(), that.blocks.size());
+        Keys keys = this.keys.with(that.keys);
+
+        List<Block> mergedBlocks = new ArrayList<>(this.blocks.size());
+        for (int i = 0; i < this.blocks.size(); i++)
+            mergedBlocks.add(this.blocks.get(i).merge(that.blocks.get(i)));
+
+        return new TxnUpdate(tables, keys, mergedBlocks, cassandraCommitCL, preserveTimestamps);
     }
 
     @Override
@@ -236,33 +675,43 @@ public class TxnUpdate extends AccordUpdate
     {
         ClusterMetadata cm = ClusterMetadata.current();
         checkState(cm.epoch.getEpoch() >= executeAt.epoch(), "TCM epoch %d is < executeAt epoch %d", cm.epoch.getEpoch(), executeAt.epoch());
-        if (!checkCondition(data))
-            return TxnWrite.EMPTY_CONDITION_FAILED;
-
-        if (keys.isEmpty())
-            return new TxnWrite(TableMetadatas.none(), Collections.emptyList(), true);
 
-        List<Fragment> fragments = deserialize(keys, tables, this.fragments);
-        List<TxnWrite.Update> updates = new ArrayList<>(fragments.size());
-        QueryOptions options = QueryOptions.forProtocolVersion(ProtocolVersion.CURRENT);
-        AccordUpdateParameters parameters = new AccordUpdateParameters((TxnData) data, options, executeAt.uniqueHlc());
+        Pair<List<TxnWrite.Update>, SimpleBitSet> pair = processCondition(executeAt, data);
+        if (pair == null)
+            return new TxnWrite(TableMetadatas.none(), Collections.emptyList(), SimpleBitSets.allUnset(numConditionalBlocks()));
 
-        for (Fragment fragment : fragments)
-            // Filter out fragments that already constitute complete updates to avoid persisting them via TxnWrite:
-            if (!fragment.isComplete())
-                updates.add(fragment.complete(parameters, tables));
+        List<TxnWrite.Update> allUpdates = pair.left;
+        SimpleBitSet conditionalBlockBitSet = pair.right;
+        if (keys.isEmpty())
+            return new TxnWrite(TableMetadatas.none(), Collections.emptyList(), SimpleBitSets.allSet(numConditionalBlocks()));
 
-        return new TxnWrite(tables, updates, true);
+        return new TxnWrite(tables, allUpdates, conditionalBlockBitSet);
     }
 
-    public List<TxnWrite.Update> completeUpdatesForKey(RoutableKey key)
+
+    private boolean checkCondition(Data data, SerializedTxnCondition condition)
     {
-        List<Fragment> fragments = deserialize(keys, tables, this.fragments);
-        List<TxnWrite.Update> updates = new ArrayList<>(fragments.size());
+        TxnCondition deserializedCondition = condition.deserialize(tables);
+        if (deserializedCondition == TxnCondition.none())
+            return true;
+        return deserializedCondition.applies((TxnData) data);
+    }
 
-        for (Fragment fragment : fragments)
-            if (fragment.isComplete() && fragment.key.equals(key))
-                updates.add(fragment.toUpdate(tables));
+    public List<TxnWrite.Update> completeUpdatesForKey(SimpleBitSet conditionalBlockBitSet, RoutableKey key)
+    {
+        List<TxnWrite.Update> updates = new ArrayList<>();
+
+        for (Block block : blocks)
+        {
+            for (ConditionalBlock conditionalBlock : block.conditionalBlocks)
+            {
+                if (!conditionalBlockBitSet.get(conditionalBlock.id)) continue;
+                List<Fragment> fragments = deserialize(tables, block, conditionalBlock.fragmentIds);
+                for (Fragment fragment : fragments)
+                    if (fragment.isComplete() && fragment.key.equals(key))
+                        updates.add(fragment.toUpdate(tables));
+            }
+        }
 
         return updates;
     }
@@ -273,13 +722,12 @@ public class TxnUpdate extends AccordUpdate
         public void serialize(TxnUpdate update, TableMetadatasAndKeys tablesAndKeys, DataOutputPlus out, Version version) throws IOException
         {
             // Serializing it with the condition result set shouldn't be needed
-            checkState(update.conditionResult == null, "Can't serialize if conditionResult is set without adding it to serialization");
+            checkState(update.anyConditionResult == null, "Can't serialize if anyConditionResult is set without adding it to serialization");
             // Once in accord "mixedTimeSource" and "yes" are the same, so only care about the side effect: that the timestamp is preserved or not
             out.writeByte(update.preserveTimestamps.preserve ? FLAG_PRESERVE_TIMESTAMPS : 0);
             tablesAndKeys.serializeKeys(update.keys, out);
-            writeWithVIntLength(update.condition.bytes(), out);
-            serializeArray(update.fragments, out, ByteBufferUtil.byteBufferSerializer);
             serializeNullable(update.cassandraCommitCL, out, consistencyLevelSerializer);
+            CollectionSerializers.serializeList(update.blocks, tablesAndKeys, out, Block.serializer);
         }
 
         @Override
@@ -288,20 +736,18 @@ public class TxnUpdate extends AccordUpdate
             int flags = in.readByte();
             boolean preserveTimestamps = (FLAG_PRESERVE_TIMESTAMPS & flags) == 1;
             Keys keys = tablesAndKeys.deserializeKeys(in);
-            ByteBuffer condition = readWithVIntLength(in);
-            ByteBuffer[] fragments = deserializeArray(in, ByteBufferUtil.byteBufferSerializer, ByteBuffer[]::new);
             ConsistencyLevel consistencyLevel = deserializeNullable(in, consistencyLevelSerializer);
-            return new TxnUpdate(tablesAndKeys.tables, keys, fragments, new SerializedTxnCondition(condition), consistencyLevel, preserveTimestamps ? PreserveTimestamp.yes : PreserveTimestamp.no);
+            List<Block> blocks = CollectionSerializers.deserializeList(tablesAndKeys, in, Block.serializer);
+
+            return new TxnUpdate(tablesAndKeys.tables, keys, blocks, consistencyLevel, preserveTimestamps ? PreserveTimestamp.yes : PreserveTimestamp.no);
         }
 
         @Override
         public void skip(TableMetadatasAndKeys tablesAndKeys, DataInputPlus in, Version version) throws IOException
         {
-            in.readByte();
-            tablesAndKeys.skipKeys(in);
-            skipWithVIntLength(in);
-            skipArray(in, ByteBufferUtil.byteBufferSerializer);
-            deserializeNullable(in, consistencyLevelSerializer);
+            in.readByte(); // flags
+            tablesAndKeys.skipKeys(in); // keys, written before the consistency level in serialize()
+            deserializeNullable(in, consistencyLevelSerializer); // consistency level
+            skipArray(tablesAndKeys, in, Block.serializer);
         }
 
         @Override
@@ -309,105 +755,102 @@ public class TxnUpdate extends AccordUpdate
         {
             long size = 1; // flags
             size += tablesAndKeys.serializedKeysSize(update.keys);
-            size += serializedSizeWithVIntLength(update.condition.bytes());
-            size += serializedArraySize(update.fragments, ByteBufferUtil.byteBufferSerializer);
             size += serializedNullableSize(update.cassandraCommitCL, consistencyLevelSerializer);
+            size += CollectionSerializers.serializedListSize(update.blocks, tablesAndKeys, Block.serializer);
             return size;
         }
     };
 
-    private static ByteBuffer[] toSerializedValuesArray(Keys keys, List<Fragment> items, TableMetadatas tables, Version version)
+    private static List<Fragment> deserialize(TableMetadatas tables, Block block, int[] includeFragmentIds)
     {
-        ByteBuffer[] result = new ByteBuffer[keys.size()];
-        int i = 0, mi = items.size(), ki = 0;
-        while (i < mi)
+        List<Fragment> result = new ArrayList<>(includeFragmentIds.length);
+        int i = 0;
+        for (int fragmentId : includeFragmentIds)
         {
-            PartitionKey key = items.get(i).key;
-            int j = i + 1;
-            while (j < mi && items.get(j).key.equals(key))
-                ++j;
-
-            int nextki = keys.findNext(ki, key, CEIL);
-            Arrays.fill(result, ki, nextki, ByteBufferUtil.EMPTY_BYTE_BUFFER);
-            ki = nextki;
-            result[ki++] = toSerializedValues(items, tables, i, j, version);
-            i = j;
+            while (block.fragments[i].id < fragmentId)
+                ++i;
+
+            Invariants.require(block.fragments[i].id == fragmentId);
+            BlockFragment fragment = block.fragments[i];
+            try (DataInputBuffer in = new DataInputBuffer(fragment.bytes, true))
+            {
+                Version version = Version.fromVersion(in.readUnsignedVInt32());
+                result.add(Fragment.serializer.deserialize(fragment.key, tables, in, version));
+            }
+            catch (IOException e)
+            {
+                throw new RuntimeException(e);
+            }
         }
-        Arrays.fill(result, ki, result.length, ByteBufferUtil.EMPTY_BYTE_BUFFER);
         return result;
     }
 
-    private static ByteBuffer toSerializedValues(List<Fragment> items, TableMetadatas tables, int start, int end, Version version)
+    @Override
+    public void failCondition()
     {
-        long size = TypeSizes.sizeofUnsignedVInt(version.version) + TypeSizes.sizeofUnsignedVInt(end - start);
-        for (int i = start ; i < end ; ++i)
-            size += Fragment.serializer.serializedSize(items.get(i), tables, version);
+        anyConditionResult = FALSE;
+    }
 
-        try (DataOutputBuffer out = new DataOutputBuffer((int) size))
-        {
-            out.writeUnsignedVInt32(version.version);
-            out.writeUnsignedVInt32(end - start);
-            for (int i = start ; i < end ; ++i)
-                Fragment.serializer.serialize(items.get(i), tables, out, version);
-            return out.buffer(false);
-        }
-        catch (IOException e)
+    @Override
+    public boolean checkAnyConditionMatch(Data data)
+    {
+        // Assert data that was memoized is same as data that is provided?
+        if (anyConditionResult != null)
+            return anyConditionResult;
+
+        // Check if any block has a matching condition
+        for (Block block : blocks)
         {
-            throw new RuntimeException(e);
+            for (ConditionalBlock conditionalBlock : block.conditionalBlocks)
+            {
+                if (checkCondition(data, conditionalBlock.condition))
+                    return anyConditionResult = true;
+            }
         }
+        return anyConditionResult = false;
     }
 
-    private static List<Fragment> deserialize(PartitionKey key, TableMetadatas tables, ByteBuffer bytes)
+    @Nullable
+    private Pair<List<TxnWrite.Update>, SimpleBitSet> processCondition(Timestamp executeAt, Data data)
     {
-        if (!bytes.hasRemaining())
-            return Collections.emptyList();
-
-        try (DataInputBuffer in = new DataInputBuffer(bytes, true))
+        int numConditionalBlocks = numConditionalBlocks();
+        SimpleBitSet conditionalBlocksMatched = SimpleBitSet.allocate(numConditionalBlocks);
+        List<Fragment> fragments = null;
+        // Each block is executed independently, so a match in one block has no effect on another block;
+        // this is done to support conditional writes alongside unconditional ones, and multiple IF/END IF blocks
+        for (Block block : blocks)
         {
-            Version version = Version.fromVersion(in.readUnsignedVInt32());
-            int count = in.readUnsignedVInt32();
-            switch (count)
+            // This loop needs to support the expected semantics of IF/ELSE IF/ELSE blocks;
+            // the first condition that is true is the only one that applies.
+            for (ConditionalBlock conditionalBlock : block.conditionalBlocks)
             {
-                case 0: throw new IllegalStateException();
-                case 1: return Collections.singletonList(Fragment.serializer.deserialize(key, tables, in, version));
-                default:
-                    List<Fragment> result = new ArrayList<>();
-                    for (int i = 0 ; i < count ; ++i)
-                        result.add(Fragment.serializer.deserialize(key, tables, in, version));
-                    return result;
+                if (checkCondition(data, conditionalBlock.condition))
+                {
+                    conditionalBlocksMatched.set(conditionalBlock.id);
+                    if (fragments == null) fragments = new ArrayList<>();
+                    fragments.addAll(deserialize(tables, block, conditionalBlock.fragmentIds));
+                    break;
+                }
             }
         }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
+        if (fragments == null) return null;
 
-    private static List<Fragment> deserialize(Keys keys, TableMetadatas tables, ByteBuffer[] buffers)
-    {
-        Invariants.require(keys.size() == buffers.length);
-        List<Fragment> result = new ArrayList<>(buffers.length);
-        for (int i = 0 ; i < keys.size() ; ++i)
-            result.addAll(deserialize((PartitionKey) keys.get(i), tables, buffers[i]));
-        return result;
-    }
+        List<TxnWrite.Update> allUpdates = new ArrayList<>(fragments.size());
+        QueryOptions options = QueryOptions.forProtocolVersion(ProtocolVersion.CURRENT);
+        AccordUpdateParameters parameters = new AccordUpdateParameters((TxnData) data, options, executeAt.uniqueHlc());
 
-    @Override
-    public void failCondition()
-    {
-        conditionResult = FALSE;
+        for (Fragment fragment : fragments)
+            if (!fragment.isComplete())
+                allUpdates.add(fragment.complete(parameters, tables));
+        return Pair.create(allUpdates, conditionalBlocksMatched);
     }
 
-    @Override
-    public boolean checkCondition(Data data)
+    private int numConditionalBlocks()
     {
-        // Assert data that was memoized is same as data that is provided?
-        if (conditionResult != null)
-            return conditionResult;
-        TxnCondition condition = this.condition.deserialize(tables);
-        if (condition == TxnCondition.none())
-            return conditionResult = true;
-        return conditionResult = condition.applies((TxnData) data);
+        int numConditionalBlocks = 0;
+        for (Block block : blocks)
+            numConditionalBlocks += block.conditionalBlocks.length;
+        return numConditionalBlocks;
     }
 
     @Override
@@ -425,6 +868,6 @@ public class TxnUpdate extends AccordUpdate
     @VisibleForTesting
     public void unsafeResetCondition()
     {
-        conditionResult = null;
+        anyConditionResult = null;
     }
 }
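
The comments above capture the new evaluation model: blocks run independently of
one another, and within a block the first true condition is the only one that
applies. A minimal sketch of that logic follows, using simplified hypothetical
types (java.util.BitSet standing in for accord's SimpleBitSet, strings standing
in for write fragments); it illustrates the semantics only and is not the
patch's actual code:

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.List;
    import java.util.function.Predicate;

    final class BlockEvalSketch
    {
        static final class Conditional<T>
        {
            final int id;                  // globally unique across all blocks
            final Predicate<T> condition;  // e.g. an IF clause over read data
            final List<String> writes;     // fragments applied on a match

            Conditional(int id, Predicate<T> condition, List<String> writes)
            {
                this.id = id;
                this.condition = condition;
                this.writes = writes;
            }
        }

        // Returns the writes to apply and records which conditionals matched,
        // mirroring the Pair<List<TxnWrite.Update>, SimpleBitSet> in the patch.
        static <T> List<String> evaluate(List<List<Conditional<T>>> blocks, T data, BitSet matched)
        {
            List<String> toApply = new ArrayList<>();
            for (List<Conditional<T>> block : blocks)
            {
                for (Conditional<T> c : block)
                {
                    if (c.condition.test(data))
                    {
                        matched.set(c.id);
                        toApply.addAll(c.writes);
                        break; // first true condition wins within a block
                    }
                }
            }
            return toApply;
        }
    }

The bit set plays the same role as conditionalBlocksMatched above: the apply
path later needs to know exactly which conditional blocks matched, not merely
whether any did.
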
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index e8e70d7be3..26af2aaaaf 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -42,6 +42,8 @@ import accord.primitives.Seekable;
 import accord.primitives.Seekables;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
+import accord.utils.SimpleBitSet;
+import accord.utils.SimpleBitSets;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import org.apache.cassandra.cql3.UpdateParameters;
@@ -69,6 +71,7 @@ import 
org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.utils.BooleanSerializer;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.SimpleBitSetSerializers;
 
 import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.cassandra.db.rows.DeserializationHelper.Flag.FROM_REMOTE;
@@ -84,9 +87,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
     @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(TxnWrite.class);
 
-    public static final TxnWrite EMPTY_CONDITION_FAILED = new 
TxnWrite(TableMetadatas.none(), Collections.emptyList(), false);
-
-    private static final long EMPTY_SIZE = 
ObjectSizes.measure(EMPTY_CONDITION_FAILED);
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
TxnWrite(TableMetadatas.none(), Collections.emptyList(), 
SimpleBitSets.allUnset(1)));
 
     public static class Update extends 
AbstractParameterisedVersionedSerialized<PartitionUpdate, TableMetadatas>
     {
@@ -398,24 +399,54 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
                 size += TypeSizes.sizeofUnsignedVInt(fragment.timestamp);
                 return size;
             }
+
+            public static ByteBuffer serialize(Fragment fragment, 
TableMetadatas tables, Version version)
+            {
+                long size = TypeSizes.sizeofUnsignedVInt(version.version);
+                size += Fragment.serializer.serializedSize(fragment, tables, 
version);
+
+                try (DataOutputBuffer out = new DataOutputBuffer((int) size))
+                {
+                    out.writeUnsignedVInt32(version.version);
+                    Fragment.serializer.serialize(fragment, tables, out, 
version);
+                    return out.buffer(false);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            private static Fragment deserialize(PartitionKey key, 
TableMetadatas tables, ByteBuffer bytes)
+            {
+                try (DataInputBuffer in = new DataInputBuffer(bytes, true))
+                {
+                    Version version = 
Version.fromVersion(in.readUnsignedVInt32());
+                    return Fragment.serializer.deserialize(key, tables, in, 
version);
+                }
+                catch (IOException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
         }
     }
 
     public final TableMetadatas tables;
-    private final boolean isConditionMet;
+    private final SimpleBitSet conditionalBlockBitSet;
 
-    private TxnWrite(TableMetadatas tables, Update[] items, boolean 
isConditionMet)
+    private TxnWrite(TableMetadatas tables, Update[] items, SimpleBitSet 
conditionalBlockBitSet)
     {
         super(items, Domain.Key);
         this.tables = tables;
-        this.isConditionMet = isConditionMet;
+        this.conditionalBlockBitSet = conditionalBlockBitSet;
     }
 
-    public TxnWrite(TableMetadatas tables, List<Update> items, boolean 
isConditionMet)
+    public TxnWrite(TableMetadatas tables, List<Update> items, SimpleBitSet 
conditionalBlockBitSet)
     {
         super(items, Domain.Key);
         this.tables = tables;
-        this.isConditionMet = isConditionMet;
+        this.conditionalBlockBitSet = conditionalBlockBitSet;
     }
 
     @Override
@@ -461,7 +492,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
 
         // TODO (expected): optimise for the common single update case; lots 
of lists allocated
         List<AsyncChain<Void>> results = new ArrayList<>();
-        if (isConditionMet)
+        if (!conditionalBlockBitSet.isEmpty())
         {
             AccordExecutor executor = ((AccordCommandStore) 
commandStore).executor();
             boolean preserveTimestamps = 
txnUpdate.preserveTimestamps().preserve;
@@ -471,7 +502,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
             // Apply updates that are fully specified by the client and not 
reliant on data from reads.
             // ex. INSERT INTO tbl (a, b, c) VALUES (1, 2, 3)
             // These updates are persisted only in TxnUpdate and not in 
TxnWrite to avoid duplication.
-            List<Update> updates = 
txnUpdate.completeUpdatesForKey((RoutableKey) key);
+            List<Update> updates = 
txnUpdate.completeUpdatesForKey(conditionalBlockBitSet, (RoutableKey) key);
             updates.forEach(write -> results.add(write.write(executor, tables, 
preserveTimestamps, timestamp)));
         }
 
@@ -498,7 +529,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
         public void serialize(TxnWrite write, Seekables keys, DataOutputPlus 
out, Version version) throws IOException
         {
             write.tables.serializeSelf(out);
-            BooleanSerializer.serializer.serialize(write.isConditionMet, out);
+            
SimpleBitSetSerializers.any.serialize(write.conditionalBlockBitSet, out);
             serializeArray(write.items, new 
TableMetadatasAndKeys(write.tables, keys), out, version, Update.serializer);
         }
 
@@ -506,8 +537,8 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
         public TxnWrite deserialize(Seekables keys, DataInputPlus in, Version 
version) throws IOException
         {
             TableMetadatas tables = TableMetadatas.deserializeSelf(in);
-            boolean isConditionMet = 
BooleanSerializer.serializer.deserialize(in);
-            return new TxnWrite(tables, deserializeArray(new 
TableMetadatasAndKeys(tables, keys), in, version, Update.serializer, 
Update[]::new), isConditionMet);
+            SimpleBitSet conditionalBlockBitSet = 
SimpleBitSetSerializers.any.deserialize(in);
+            return new TxnWrite(tables, deserializeArray(new 
TableMetadatasAndKeys(tables, keys), in, version, Update.serializer, 
Update[]::new), conditionalBlockBitSet);
         }
 
         @Override
@@ -522,7 +553,7 @@ public class TxnWrite extends 
AbstractKeySorted<TxnWrite.Update> implements Writ
         public long serializedSize(TxnWrite write, Seekables keys, Version 
version)
         {
             return write.tables.serializedSelfSize()
-                   + 
BooleanSerializer.serializer.serializedSize(write.isConditionMet)
+                   + 
SimpleBitSetSerializers.any.serializedSize(write.conditionalBlockBitSet)
                    + serializedArraySize(write.items, new 
TableMetadatasAndKeys(write.tables, keys), version, Update.serializer);
         }
     };
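
The diff above generalises TxnWrite's single isConditionMet boolean into a bit
set recording which conditional blocks matched; the old boolean is recoverable
as "any bit set". A small sketch of the correspondence, using only calls that
appear in this patch:

    import accord.utils.SimpleBitSet;
    import accord.utils.SimpleBitSets;

    final class ConditionBitsSketch
    {
        // The old semantics, recovered from the new representation; this is
        // exactly the !conditionalBlockBitSet.isEmpty() check in apply().
        static boolean isAnyConditionMet(SimpleBitSet conditionalBlocksMatched)
        {
            return !conditionalBlocksMatched.isEmpty();
        }

        // A write with a single, always-matching block, as the updated tests
        // construct with SimpleBitSets.allSet(1).
        static SimpleBitSet unconditional()
        {
            return SimpleBitSets.allSet(1);
        }

        // A write where every condition failed; this replaces the removed
        // EMPTY_CONDITION_FAILED constant.
        static SimpleBitSet allFailed(int numConditionalBlocks)
        {
            return SimpleBitSets.allUnset(numConditionalBlocks);
        }
    }
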
diff --git a/src/java/org/apache/cassandra/utils/ArraySerializers.java 
b/src/java/org/apache/cassandra/utils/ArraySerializers.java
index 7ca6afa8a7..db2cd28248 100644
--- a/src/java/org/apache/cassandra/utils/ArraySerializers.java
+++ b/src/java/org/apache/cassandra/utils/ArraySerializers.java
@@ -23,6 +23,7 @@ import java.util.function.IntFunction;
 
 import org.apache.cassandra.io.AsymmetricVersionedSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
 import org.apache.cassandra.io.ParameterisedVersionedSerializer;
 import org.apache.cassandra.io.UnversionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -60,6 +61,13 @@ public class ArraySerializers
             serializer.serialize(item, p, out, version);
     }
 
+    public static <T, P> void serializeArray(T[] items, P p, DataOutputPlus 
out, ParameterisedUnversionedSerializer<T, P> serializer) throws IOException
+    {
+        out.writeUnsignedVInt32(items.length);
+        for (T item : items)
+            serializer.serialize(item, p, out);
+    }
+
     public static <T> T[] deserializeArray(DataInputPlus in, 
UnversionedSerializer<T> serializer, IntFunction<T[]> arrayFactory) throws 
IOException
     {
         int size = in.readUnsignedVInt32();
@@ -118,6 +126,22 @@ public class ArraySerializers
             serializer.skip(p, in, version);
     }
 
+    public static <T, P> T[] deserializeArray(P p, DataInputPlus in, 
ParameterisedUnversionedSerializer<T, P> serializer, IntFunction<T[]> 
arrayFactory) throws IOException
+    {
+        int size = in.readUnsignedVInt32();
+        T[] items = arrayFactory.apply(size);
+        for (int i = 0; i < size; i++)
+            items[i] = serializer.deserialize(p, in);
+        return items;
+    }
+
+    public static <T, P> void skipArray(P p, DataInputPlus in, 
ParameterisedUnversionedSerializer<T, P> serializer) throws IOException
+    {
+        int size = in.readUnsignedVInt32();
+        for (int i = 0; i < size; i++)
+            serializer.skip(p, in);
+    }
+
     public static <T> long serializedArraySize(T[] array, 
UnversionedSerializer<T> serializer)
     {
         long size = sizeofUnsignedVInt(array.length);
@@ -149,4 +173,12 @@ public class ArraySerializers
             size += serializer.serializedSize(item, p, version);
         return size;
     }
+
+    public static <T, P> long serializedArraySize(T[] array, P p, 
ParameterisedUnversionedSerializer<T, P> serializer)
+    {
+        long size = sizeofUnsignedVInt(array.length);
+        for (T item : array)
+            size += serializer.serializedSize(item, p);
+        return size;
+    }
 }
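
The new overloads mirror the existing versioned variants for
ParameterisedUnversionedSerializer, threading a shared parameter P through each
element so that P is written once by the caller rather than once per element. A
hedged usage sketch, assuming the interface's abstract methods are exactly the
three these helpers call (serialize, deserialize, serializedSize); the
delta-from-base serializer is hypothetical and exists only for illustration:

    import java.io.IOException;

    import org.apache.cassandra.db.TypeSizes;
    import org.apache.cassandra.io.ParameterisedUnversionedSerializer;
    import org.apache.cassandra.io.util.DataInputBuffer;
    import org.apache.cassandra.io.util.DataInputPlus;
    import org.apache.cassandra.io.util.DataOutputBuffer;
    import org.apache.cassandra.io.util.DataOutputPlus;
    import org.apache.cassandra.utils.ArraySerializers;

    final class ParameterisedArraySketch
    {
        // Hypothetical element serializer: each value is written as a vint
        // delta from a shared base carried in the parameter.
        static final ParameterisedUnversionedSerializer<Integer, Integer> relativeToBase =
            new ParameterisedUnversionedSerializer<Integer, Integer>()
            {
                @Override
                public void serialize(Integer value, Integer base, DataOutputPlus out) throws IOException
                {
                    out.writeVInt32(value - base);
                }

                @Override
                public Integer deserialize(Integer base, DataInputPlus in) throws IOException
                {
                    return base + in.readVInt32();
                }

                @Override
                public long serializedSize(Integer value, Integer base)
                {
                    return TypeSizes.sizeofVInt(value - base);
                }
            };

        static Integer[] roundTrip(Integer[] values, int base) throws IOException
        {
            try (DataOutputBuffer out = new DataOutputBuffer())
            {
                ArraySerializers.serializeArray(values, base, out, relativeToBase);
                try (DataInputBuffer in = new DataInputBuffer(out.buffer(false), true))
                {
                    return ArraySerializers.deserializeArray(base, in, relativeToBase, Integer[]::new);
                }
            }
        }
    }
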
diff --git a/src/java/org/apache/cassandra/utils/CollectionSerializers.java 
b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
index d4718096a3..b2efd717ab 100644
--- a/src/java/org/apache/cassandra/utils/CollectionSerializers.java
+++ b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
@@ -580,7 +580,7 @@ public class CollectionSerializers
         return result;
     }
 
-    private static <V, P, C extends Collection<? super V>, Version> C 
deserializeCollection(P p, DataInputPlus in, 
AsymmetricParameterisedUnversionedSerializer<?, P, V> serializer, 
IntFunction<C> factory) throws IOException
+    private static <V, P, C extends Collection<? super V>> C 
deserializeCollection(P p, DataInputPlus in, 
AsymmetricParameterisedUnversionedSerializer<?, P, V> serializer, 
IntFunction<C> factory) throws IOException
     {
         int size = in.readUnsignedVInt32();
         C result = factory.apply(size);
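
The change above drops a stray <Version> type parameter from
deserializeCollection. Beyond being unused, a type variable named Version
shadows any Version class in scope for the length of that signature, which can
silently change what "Version" means in the declaration. A minimal,
self-contained illustration with hypothetical names:

    final class ShadowSketch
    {
        static final class Version
        {
            static final Version CURRENT = new Version();
        }

        // BAD: <Version> declares a fresh type variable that hides the class
        // above; inside this signature, Version means "any type at all".
        static <T, Version> void shadowed(T t)
        {
            // Version v = Version.CURRENT; // would not compile here
        }

        // GOOD: no shadowing; Version refers to the class.
        static <T> void unshadowed(T t)
        {
            Version v = Version.CURRENT; // compiles
        }
    }
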
diff --git a/test/unit/org/apache/cassandra/io/Serializers.java 
b/test/unit/org/apache/cassandra/io/Serializers.java
index e9e7f04c7b..34da9e4b35 100644
--- a/test/unit/org/apache/cassandra/io/Serializers.java
+++ b/test/unit/org/apache/cassandra/io/Serializers.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.io;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.function.BiConsumer;
 
 import accord.utils.LazyToString;
 import accord.utils.ReflectionUtils;
@@ -33,6 +34,13 @@ public class Serializers
     // @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" }) 
DataOutputBuffer output = new DataOutputBuffer();
 
     public static <T> void testSerde(DataOutputBuffer output, 
AsymmetricUnversionedSerializer<T, T> serializer, T input) throws IOException
+    {
+        testSerde(output, serializer, input, (actual, expected) -> 
Assertions.assertThat(actual)
+                                                                             
.describedAs("The deserialized output does not match the serialized input; 
difference %s", new LazyToString(() -> ReflectionUtils.recursiveEquals(actual, 
input).toString()))
+                                                                             
.isEqualTo(expected));
+    }
+
+    public static <T> void testSerde(DataOutputBuffer output, 
AsymmetricUnversionedSerializer<T, T> serializer, T input, BiConsumer<T, T> 
testEqual) throws IOException
     {
         output.clear();
         long expectedSize = serializer.serializedSize(input);
@@ -41,7 +49,7 @@ public class Serializers
         ByteBuffer buffer = output.unsafeGetBufferAndFlip();
         DataInputBuffer in = new DataInputBuffer(buffer, false);
         T read = serializer.deserialize(in);
-        Assertions.assertThat(read).describedAs("The deserialized output does 
not match the serialized input; difference %s", new LazyToString(() -> 
ReflectionUtils.recursiveEquals(read, input).toString())).isEqualTo(input);
+        testEqual.accept(read, input);
         Assertions.assertThat(buffer.remaining()).describedAs("deserialize did 
not consume all the serialized input").isEqualTo(0);
         buffer.flip();
         buffer.mark();
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 786b38f7ed..7a348d6c45 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -99,6 +99,7 @@ import accord.utils.AccordGens;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
+import accord.utils.SimpleBitSets;
 import accord.utils.SortedArrays;
 import accord.utils.UnhandledEnum;
 import accord.utils.async.AsyncChain;
@@ -204,7 +205,7 @@ public class CommandsForKeySerializerTest
             if (saveStatus.known.outcome() == Known.Outcome.Apply)
             {
                 if (txnId.is(Kind.Write))
-                    builder.writes(new Writes(txnId, executeAt, txn.keys(), 
new TxnWrite(TableMetadatas.none(), Collections.emptyList(), true)));
+                    builder.writes(new Writes(txnId, executeAt, txn.keys(), 
new TxnWrite(TableMetadatas.none(), Collections.emptyList(), 
SimpleBitSets.allSet(1))));
                 builder.result(new TxnData());
             }
             return builder;
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/SerializePackedTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/SerializePackedTest.java
new file mode 100644
index 0000000000..2c355139e6
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/SerializePackedTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.serializers;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import accord.utils.Gens;
+import org.junit.Test;
+
+import accord.utils.Gen;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.Serializers;
+import org.apache.cassandra.io.UnversionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+public class SerializePackedTest
+{
+    private static final Gen<int[]> zeros = rs -> new int[rs.nextInt(0, 10)];
+    private static final Gen<int[]> randomZeroAndPositive = array(rs -> 
rs.nextInt(-1, Integer.MAX_VALUE) + 1);
+    private static final Gen<int[]> randomZeroAndPositiveSmall = array(rs -> 
rs.nextInt(0, 1 << 8));
+    private static final Gen<int[]> negatives = 
array(Gens.ints().between(Integer.MIN_VALUE, -1), rs -> rs.nextInt(1, 10));
+
+    private static final Gen<int[]> monotonic = rs -> {
+        int[] array = new int[rs.nextInt(0, 10)];
+        for (int i = 0; i < array.length; i++)
+            array[i] = i;
+        return array;
+    };
+
+    private static final Gen<int[]> zeroAndPositive = rs -> {
+        if (rs.decide(0.2f)) return monotonic.next(rs);
+        return randomZeroAndPositive.next(rs);
+    };
+
+    private static final Gen<int[]> zeroAndPositiveSmall = rs -> {
+        if (rs.decide(0.2f)) return monotonic.next(rs);
+        return randomZeroAndPositiveSmall.next(rs);
+    };
+
+    private static Gen<int[]> array(Gen.IntGen valueGen)
+    {
+        return array(valueGen, rs -> rs.nextInt(0, 10));
+    }
+
+    private static Gen<int[]> array(Gen.IntGen valueGen, Gen.IntGen size)
+    {
+        return rs -> {
+            int[] array = new int[size.nextInt(rs)];
+            for (int i = 0; i < array.length; i++)
+                array[i] = valueGen.nextInt(rs);
+            Arrays.sort(array);
+            return array;
+        };
+    }
+
+    @Test
+    public void serde()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().forAll(zeroAndPositive).check(array -> 
Serializers.testSerde(output, PackedSortedSerializer.instance, array));
+    }
+
+    @Test
+    public void serdeNegative()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().forAll(negatives).check(array -> {
+            output.clear();
+            Assertions.assertThatThrownBy(() -> 
PackedSortedSerializer.instance.serialize(array, output))
+                    .isInstanceOf(IllegalStateException.class)
+                    .hasMessageContaining("Found a negative value at offset");
+        });
+    }
+
+    @Test
+    public void serdeZeros()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().forAll(zeros).check(array -> Serializers.testSerde(output, 
PackedSortedSerializer.instance, array));
+    }
+
+    @Test
+    public void serializerIsSmallerThanSimpleList()
+    {
+        qt().forAll(zeroAndPositiveSmall).check(array -> {
+            var list = SimpleListSerializer.instance.serialize(array);
+            var packed = PackedSortedSerializer.instance.serialize(array);
+
+            
Assertions.assertThat(packed.remaining()).isLessThanOrEqualTo(list.remaining());
+        });
+    }
+
+    public static class PackedSortedSerializer implements 
UnversionedSerializer<int[]>
+    {
+        public static final PackedSortedSerializer instance = new 
PackedSortedSerializer();
+
+        @Override
+        public void serialize(int[] t, DataOutputPlus out) throws IOException
+        {
+            SerializePacked.serializePackedSortedIntsAndLength(t, out);
+        }
+
+        @Override
+        public int[] deserialize(DataInputPlus in) throws IOException
+        {
+            return SerializePacked.deserializePackedSortedIntsAndLength(in);
+        }
+
+        @Override
+        public long serializedSize(int[] t)
+        {
+            return 
SerializePacked.serializedSizeOfPackedSortedIntsAndLength(t);
+        }
+    }
+
+    public static class SimpleListSerializer implements 
UnversionedSerializer<int[]>
+    {
+        public static final SimpleListSerializer instance = new 
SimpleListSerializer();
+
+        @Override
+        public void serialize(int[] t, DataOutputPlus out) throws IOException
+        {
+            out.writeUnsignedVInt32(t.length);
+            for (int i : t)
+                out.writeVInt32(i);
+        }
+
+        @Override
+        public int[] deserialize(DataInputPlus in) throws IOException
+        {
+            int size = in.readUnsignedVInt32();
+            int[] array = new int[size];
+            for (int i = 0; i < size; i++)
+                array[i] = in.readVInt32();
+            return array;
+        }
+
+        @Override
+        public long serializedSize(int[] t)
+        {
+            long size = TypeSizes.sizeofUnsignedVInt(t.length);
+            for (int i = 0; i < t.length; i++)
+                size += TypeSizes.sizeofVInt(t[i]);
+            return size;
+        }
+    }
+}
\ No newline at end of file
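
The serializerIsSmallerThanSimpleList property above asserts that the packed
encoding of a small, sorted, non-negative int[] never exceeds a plain vint
list. One plausible scheme with that property is delta encoding with unsigned
vints, sketched below; this is an assumption made for illustration, not
SerializePacked's actual wire format:

    import java.io.IOException;

    import org.apache.cassandra.io.util.DataInputPlus;
    import org.apache.cassandra.io.util.DataOutputPlus;

    final class DeltaPackSketch
    {
        static void serialize(int[] sorted, DataOutputPlus out) throws IOException
        {
            out.writeUnsignedVInt32(sorted.length);
            int prev = 0;
            for (int i = 0; i < sorted.length; i++)
            {
                if (sorted[i] < 0)
                    throw new IllegalStateException("Found a negative value at offset " + i);
                out.writeUnsignedVInt32(sorted[i] - prev); // gaps, not absolute values
                prev = sorted[i];
            }
        }

        static int[] deserialize(DataInputPlus in) throws IOException
        {
            int size = in.readUnsignedVInt32();
            int[] result = new int[size];
            int prev = 0;
            for (int i = 0; i < size; i++)
                result[i] = prev = prev + in.readUnsignedVInt32();
            return result;
        }
    }

Because the input is sorted and non-negative, each gap is at most the value
itself, so its unsigned vint is never longer than the signed vint the simple
list writes for the full value.
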
diff --git 
a/test/unit/org/apache/cassandra/service/accord/txn/TxnUpdateTest.java 
b/test/unit/org/apache/cassandra/service/accord/txn/TxnUpdateTest.java
new file mode 100644
index 0000000000..9edf8ea089
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/txn/TxnUpdateTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.txn;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import org.junit.Test;
+
+import accord.api.Key;
+import accord.primitives.Keys;
+import accord.primitives.Ranges;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.SortedArrays;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.io.Serializers;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.PreserveTimestamp;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.service.accord.serializers.TableMetadatas;
+import org.apache.cassandra.service.accord.serializers.TableMetadatasAndKeys;
+import 
org.apache.cassandra.service.accord.txn.TxnCondition.SerializedTxnCondition;
+import org.apache.cassandra.service.accord.txn.TxnUpdate.Block;
+import org.apache.cassandra.service.accord.txn.TxnUpdate.ConditionalBlock;
+import org.apache.cassandra.service.accord.txn.TxnWrite.Fragment;
+import org.apache.cassandra.utils.AccordGenerators;
+import org.apache.cassandra.utils.CassandraGenerators;
+import org.apache.cassandra.utils.Generators;
+
+import static accord.utils.Property.qt;
+import static accord.utils.SortedArrays.Search.FAST;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TxnUpdateTest
+{
+    private static final LongToken T0 = new LongToken(0);
+    private static final LongToken T42 = new LongToken(42);
+
+    static
+    {
+        DatabaseDescriptor.clientInitialization();
+        DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+    }
+
+    private static final Gen<ByteBuffer> bytesGen = 
Generators.toGen(Generators.bytes(0, 20));
+    private static final Gen<List<TableId>> uniqueIds = 
Gens.lists(Generators.toGen(CassandraGenerators.TABLE_ID_GEN)).unique().ofSizeBetween(1,
 3);
+    private static final Gen<List<TableMetadata>> tablesGen = 
uniqueIds.map(ids -> {
+        List<TableMetadata> tables = new ArrayList<>();
+        for (int i = 0; i < ids.size(); i++)
+        {
+            tables.add(TableMetadata.builder("ks", "tbl" + i, ids.get(i))
+                                    .addPartitionKeyColumn("key", 
BytesType.instance)
+                                    .partitioner(Murmur3Partitioner.instance)
+                                    .build());
+        }
+        return tables;
+    });
+
+    @Test
+    public void conditionalBlockSerde()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().forAll(conditionalBlock()).check(expected -> 
Serializers.testSerde(output, ConditionalBlock.serializer, expected));
+    }
+
+    @Test
+    public void blockSerde()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().forAll(block()).check(expected -> {
+            TableMetadatasAndKeys.KeyCollector collector = new 
TableMetadatasAndKeys.KeyCollector(TableMetadatas.none());
+            for (TxnUpdate.BlockFragment fragment : expected.fragments)
+                collector.add(fragment.key);
+            Serializers.testSerde(output, Block.serializer, expected, 
collector.buildTablesAndKeys());
+        });
+    }
+
+    @Test
+    public void slice()
+    {
+        qt().check(rs -> {
+            List<TableMetadata> tables = tablesGen.next(rs);
+            TableMetadatas metadatas = TableMetadatas.of(tables);
+            List<Fragment> fragments = 
Gens.lists(fragment(tables)).ofSizeBetween(1, 10).next(rs);
+            TxnUpdate update = new TxnUpdate(metadatas, fragments, 
TxnCondition.none(), null, PreserveTimestamp.no);
+
+            // ask for ranges outside the update; should be empty
+            for (var block : 
update.slice(Ranges.single(TokenRange.create(TableId.UNDEFINED, T0, 
T42))).blocks)
+            {
+                assertThat(block.fragments).isEmpty();
+                for (var cb : block.conditionalBlocks)
+                    assertThat(cb.fragmentIds).isEmpty();
+            }
+
+            // slicing with the update's own keys should return the same blocks
+            TxnUpdate noUpdate = update.getTxnUpdate(k -> 
k.intersecting(update.keys()));
+            for (int i = 0; i < update.blocks.size(); i++)
+                
assertThat(noUpdate.blocks.get(i)).isSameAs(update.blocks.get(i));
+
+            // slicing to a single key yields only that key's fragments
+            if (update.keys().size() == 1) return;
+            int keyIndex = rs.nextInt(0, update.keys().size());
+            Key key = update.keys().get(keyIndex);
+            Keys singleKey = Keys.of(key);
+            TxnUpdate singleKeyUpdate = update.getTxnUpdate(k -> 
k.intersecting(singleKey));
+            for (int i = 0; i < update.blocks.size(); i++)
+            {
+                var block = singleKeyUpdate.blocks.get(i);
+                
assertThat(block.fragments).hasSize((int)fragments.stream().filter(f -> 
f.key.equals(key)).count());
+                for (ConditionalBlock conditionalBlock : 
block.conditionalBlocks)
+                {
+                    for (int fragmentId : conditionalBlock.fragmentIds)
+                    {
+                        int fragmentIndex = 
SortedArrays.binarySearch(block.fragments, 0, block.fragments.length, 
fragmentId, (id, bf) -> Integer.compare(id, bf.id), FAST);
+                        assertThat(fragmentIndex >= 0).isTrue();
+                        
assertThat(block.fragments[fragmentIndex].key).isEqualTo(key);
+                    }
+                }
+            }
+        });
+    }
+
+    @Test
+    public void merge()
+    {
+        qt().check(rs -> {
+            List<TableMetadata> tables = tablesGen.next(rs);
+            TableMetadatas metadatas = TableMetadatas.of(tables);
+            List<Fragment> fragments = 
Gens.lists(fragment(tables)).ofSizeBetween(1, 10).next(rs);
+            TxnUpdate update = new TxnUpdate(metadatas, fragments, 
TxnCondition.none(), null, PreserveTimestamp.no);
+            TxnUpdate emptyUpdate = 
update.slice(Ranges.single(TokenRange.create(TableId.UNDEFINED, T0, T42)));
+            List<TxnUpdate> perKeyUpdate = new 
ArrayList<>(update.keys().size());
+            for (int i = 0; i < update.keys().size(); i++)
+            {
+                int finalI = i;
+                perKeyUpdate.add(update.getTxnUpdate(k -> 
k.intersecting(Keys.of(update.keys().get(finalI)))));
+            }
+
+            assertThat(update.merge(update)).isEqualTo(update); // merge with self produces self
+            assertThat(emptyUpdate.merge(emptyUpdate)).isEqualTo(emptyUpdate); // merging empty with itself produces empty
+
+            // empty with full is commutative
+            assertThat(update.merge(emptyUpdate)).isEqualTo(update);
+            assertThat(emptyUpdate.merge(update)).isEqualTo(update);
+
+            // merging the per-key updates in any order reproduces the full update
+            TxnUpdate accum = emptyUpdate;
+            for (TxnUpdate other : perKeyUpdate)
+                accum = accum.merge(other);
+            assertThat(accum).isEqualTo(update);
+
+            accum = emptyUpdate;
+            Collections.reverse(perKeyUpdate);
+            for (TxnUpdate other : perKeyUpdate)
+                accum = accum.merge(other);
+            assertThat(accum).isEqualTo(update);
+        });
+    }
+
+    private static Gen<Fragment> fragment(List<TableMetadata> tables)
+    {
+        return rs -> {
+            var metadata = rs.pick(tables);
+            var pk = bytesGen.next(rs);
+            DecoratedKey key = metadata.partitioner.decorateKey(pk);
+
+            PartitionUpdate update = PartitionUpdate.emptyUpdate(metadata, 
key);
+
+            return new Fragment(new PartitionKey(metadata.id, key), 
rs.nextInt(0, Integer.MAX_VALUE), update, TxnReferenceOperations.empty(), 
rs.nextLong(1, Long.MAX_VALUE));
+        };
+    }
+
+    private static Gen<ConditionalBlock> conditionalBlock()
+    {
+        Gen<SerializedTxnCondition> serializedTxnConditionGen = 
serializedTxnCondition();
+        Gen<int[]> fragmentsGen = Gens.arrays(Gens.ints().between(0, 
Integer.MAX_VALUE)).ofSizeBetween(0, 10).map(vs -> { Arrays.sort(vs); return 
vs; });
+        return rs -> {
+            int id = rs.nextInt(-1, Integer.MAX_VALUE) + 1;
+            SerializedTxnCondition condition = 
serializedTxnConditionGen.next(rs);
+            int[] fragments = fragmentsGen.next(rs);
+            return new ConditionalBlock(id, condition, fragments);
+        };
+    }
+
+    private static Gen<SerializedTxnCondition> serializedTxnCondition()
+    {
+        Gen<ByteBuffer> bytesGen = 
TxnUpdateTest.bytesGen.filter(ByteBuffer::hasRemaining);
+        return rs -> new SerializedTxnCondition(bytesGen.next(rs));
+    }
+
+    private static Gen<Block> block()
+    {
+        // can't have an empty block
+        Gen<ByteBuffer[]> bytesGen = Gens.arrays(ByteBuffer.class, 
TxnUpdateTest.bytesGen.filter(ByteBuffer::hasRemaining))
+                                                          .ofSizeBetween(0, 
10);
+        var conditionGen = Gens.arrays(ConditionalBlock.class, 
conditionalBlock()).ofSizeBetween(1, 10);
+        Gen<Key> keyGen = (Gen<Key>) (Gen<?>) 
AccordGenerators.keys(Murmur3Partitioner.instance);
+        return rs -> {
+            ByteBuffer[] bbs = bytesGen.next(rs);
+            Key[] keys = IntStream.range(0, bbs.length).mapToObj(i -> 
keyGen.next(rs)).toArray(Key[]::new);
+            int[] ids = IntStream.range(0, bbs.length).toArray();
+            Arrays.sort(ids);
+            Arrays.sort(keys);
+            TxnUpdate.BlockFragment[] fragments = new 
TxnUpdate.BlockFragment[bbs.length];
+            for (int i = 0 ; i < fragments.length ; ++i)
+                fragments[i] = new TxnUpdate.BlockFragment(ids[i], 
(PartitionKey) keys[i], bbs[i]);
+            return new Block(fragments, conditionGen.next(rs));
+        };
+    }
+}
\ No newline at end of file
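
The merge() test above checks three algebraic properties: merging an update
with itself is the identity, the empty update acts as a unit, and reassembling
per-key slices is order-independent. A toy model with the same properties,
hypothetical and far simpler than TxnUpdate.merge, treating an update as a
key-to-fragment map merged by key-wise union:

    import java.util.Map;
    import java.util.TreeMap;

    final class MergeModelSketch
    {
        static Map<String, String> merge(Map<String, String> a, Map<String, String> b)
        {
            Map<String, String> result = new TreeMap<>(a);
            b.forEach((k, v) -> result.merge(k, v, (x, y) -> {
                if (!x.equals(y))
                    throw new IllegalStateException("conflicting fragments for key " + k);
                return x; // overlapping slices must agree, so union is idempotent
            }));
            return result;
        }
    }
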
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java 
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 3b4fad9a0f..61696303d1 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -69,6 +69,7 @@ import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
 import accord.utils.ReducingRangeMap;
+import accord.utils.SimpleBitSets;
 import accord.utils.SortedArrays.SortedArrayList;
 import accord.utils.TinyEnumSet;
 import accord.utils.TriFunction;
@@ -290,7 +291,7 @@ public class AccordGenerators
             if (saveStatus.hasBeen(Status.PreApplied) && 
!saveStatus.hasBeen(Status.Truncated))
             {
                 if (txnId.is(Write))
-                    builder.writes(new Writes(txnId, executeAt, keysOrRanges, 
new TxnWrite(TableMetadatas.none(), Collections.emptyList(), true)));
+                    builder.writes(new Writes(txnId, executeAt, keysOrRanges, 
new TxnWrite(TableMetadatas.none(), Collections.emptyList(), 
SimpleBitSets.allSet(1))));
                 builder.result(new TxnData());
             }
             return builder;
@@ -345,8 +346,8 @@ public class AccordGenerators
                     else return Truncated.truncated(command, saveStatus, 
executeAt, null, null, null, null);
 
                 case TruncatedApplyWithOutcome:
-                    if (txnId.kind().awaitsOnlyDeps()) return 
Truncated.truncated(command, saveStatus, executeAt, command.partialDeps(), 
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges, new 
TxnWrite(TableMetadatas.none(), Collections.emptyList(), true)) : null, new 
TxnData(), txnId);
-                    else return Truncated.truncated(command, saveStatus, 
executeAt, command.partialDeps(), txnId.is(Write) ? new Writes(txnId, 
executeAt, keysOrRanges, new TxnWrite(TableMetadatas.none(), 
Collections.emptyList(), true)) : null, new TxnData(), null);
+                    if (txnId.kind().awaitsOnlyDeps()) return 
Truncated.truncated(command, saveStatus, executeAt, command.partialDeps(), 
txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges, new 
TxnWrite(TableMetadatas.none(), Collections.emptyList(), 
SimpleBitSets.allSet(1))) : null, new TxnData(), txnId);
+                    else return Truncated.truncated(command, saveStatus, 
executeAt, command.partialDeps(), txnId.is(Write) ? new Writes(txnId, 
executeAt, keysOrRanges, new TxnWrite(TableMetadatas.none(), 
Collections.emptyList(), SimpleBitSets.allSet(1))) : null, new TxnData(), null);
 
                 case Erased:
                 case Vestigial:
diff --git 
a/test/unit/org/apache/cassandra/utils/LargeBitSetSerializerTest.java 
b/test/unit/org/apache/cassandra/utils/LargeBitSetSerializerTest.java
deleted file mode 100644
index c32b69a5c2..0000000000
--- a/test/unit/org/apache/cassandra/utils/LargeBitSetSerializerTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.utils;
-
-import accord.utils.Gen;
-import accord.utils.LargeBitSet;
-import org.apache.cassandra.io.Serializers;
-import org.apache.cassandra.io.util.DataOutputBuffer;
-import org.junit.Test;
-
-import static accord.utils.Property.qt;
-
-public class LargeBitSetSerializerTest
-{
-    @Test
-    public void test()
-    {
-        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
-        qt().forAll(largeBitSetGen()).check(bits -> 
Serializers.testSerde(output, SimpleBitSetSerializers.large, bits));
-    }
-
-    private static Gen<LargeBitSet> largeBitSetGen()
-    {
-        return rs -> {
-            int size = rs.nextInt(0, 1 << 10);
-            LargeBitSet bitSet = new LargeBitSet(size);
-            if (size == 0 || rs.decide(0.2))
-                return bitSet; // empty
-            if (rs.decide(0.2))
-            {
-                // set 1 bit randomly
-                bitSet.set(rs.nextInt(0, size));
-                return bitSet;
-            }
-            for (int i = 0; i < size; i++)
-            {
-                if (rs.nextBoolean())
-                    bitSet.set(i);
-            }
-            return bitSet;
-        };
-    }
-}
\ No newline at end of file
diff --git 
a/test/unit/org/apache/cassandra/utils/SimpleBitSetSerializersTest.java 
b/test/unit/org/apache/cassandra/utils/SimpleBitSetSerializersTest.java
new file mode 100644
index 0000000000..0ca2ff5e81
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/SimpleBitSetSerializersTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import org.junit.Test;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.LargeBitSet;
+import accord.utils.SimpleBitSet;
+import accord.utils.SmallBitSet;
+import org.apache.cassandra.io.Serializers;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+public class SimpleBitSetSerializersTest
+{
+    @Test
+    public void small()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().forAll(smallGen()).check(bits -> Serializers.testSerde(output, 
SimpleBitSetSerializers.small, bits));
+    }
+
+    @Test
+    public void large()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().forAll(largeGen()).check(bits -> Serializers.testSerde(output, 
SimpleBitSetSerializers.large, bits));
+    }
+
+    @Test
+    public void any()
+    {
+        @SuppressWarnings({ "resource", "IOResourceOpenedButNotSafelyClosed" 
}) DataOutputBuffer output = new DataOutputBuffer();
+        qt().forAll(anyGen()).check(bits -> {
+            Serializers.testSerde(output, SimpleBitSetSerializers.any, bits, 
(actual, expected) -> {
+                if (actual.getClass() == expected.getClass())
+                {
+                    Assertions.assertThat(actual)
+                              .describedAs("The deserialized output does not 
match the serialized input")
+                              .isEqualTo(expected);
+                }
+                else
+                {
+                    // a large bit set can become small when deserialized
+                    
Assertions.assertThat(expected.getClass()).isEqualTo(LargeBitSet.class);
+                    
Assertions.assertThat(actual.getClass()).isEqualTo(SmallBitSet.class);
+
+                    
Assertions.assertThat(actual.nextSetBit(0)).isEqualTo(expected.nextSetBit(0));
+
+                    for (int i = actual.nextSetBit(0); i >= 0;)
+                    {
+                        Assertions.assertThat(actual.nextSetBit(i + 1))
+                                  .describedAs("Difference searching for next 
bit from %s", (i + 1))
+                                  .isEqualTo(expected.nextSetBit(i + 1));
+                        i = actual.nextSetBit(i + 1);
+                    }
+                }
+            });
+        });
+    }
+
+    private static Gen<SimpleBitSet> anyGen()
+    {
+        return rs -> rs.nextBoolean() ? smallGen().next(rs) : 
largeGen().next(rs);
+    }
+
+    private static Gen<SmallBitSet> smallGen()
+    {
+        return Gens.longs().all().map(SmallBitSet::new);
+    }
+
+    private static Gen<LargeBitSet> largeGen()
+    {
+        return rs -> {
+            int size = rs.nextInt(0, 1 << 10);
+            LargeBitSet bitSet = new LargeBitSet(size);
+            if (size == 0 || rs.decide(0.2))
+                return bitSet; // empty
+            if (rs.decide(0.2))
+            {
+                // set 1 bit randomly
+                bitSet.set(rs.nextInt(0, size));
+                return bitSet;
+            }
+            for (int i = 0; i < size; i++)
+            {
+                if (rs.nextBoolean())
+                    bitSet.set(i);
+            }
+            return bitSet;
+        };
+    }
+}
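
The custom comparator in any() above exists because a round trip can change the
concrete class: a LargeBitSet whose set bits all fit in one word may come back
as a SmallBitSet, and equals() across the two classes fails even when the bits
agree. A sketch of that normalisation, assuming SmallBitSet holds a single
64-bit word (as its long-valued constructor above suggests); this illustrates
the observed behaviour, not the serializer's actual code:

    import accord.utils.LargeBitSet;
    import accord.utils.SimpleBitSet;
    import accord.utils.SmallBitSet;

    final class AnyNormaliseSketch
    {
        // Rebuild the smallest representation that can hold the input's bits.
        static SimpleBitSet normalise(SimpleBitSet in, int size)
        {
            if (size <= 64)
            {
                long word = 0;
                for (int i = in.nextSetBit(0); i >= 0; i = in.nextSetBit(i + 1))
                    word |= 1L << i;
                return new SmallBitSet(word);
            }
            LargeBitSet out = new LargeBitSet(size);
            for (int i = in.nextSetBit(0); i >= 0; i = in.nextSetBit(i + 1))
                out.set(i);
            return out;
        }
    }
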


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
