frankgh commented on code in PR #4789:
URL: https://github.com/apache/cassandra/pull/4789#discussion_r3229698449
##########
src/java/org/apache/cassandra/db/ReadCommand.java:
##########
@@ -436,14 +436,26 @@ public ReadCommand copyAsDigestQuery(Iterable<Replica>
replicas)
public abstract boolean isReversed();
public ReadResponse createResponse(UnfilteredPartitionIterator iterator,
RepairedDataInfo rdi)
+ {
+ return createResponse(iterator, rdi, false);
+ }
+ public ReadResponse createResponse(UnfilteredPartitionIterator iterator,
RepairedDataInfo rdi, boolean localRead)
Review Comment:
Nit: add a line break here?
```suggestion
public ReadResponse createResponse(UnfilteredPartitionIterator iterator,
RepairedDataInfo rdi, boolean localRead)
```
##########
test/unit/org/apache/cassandra/db/ReadResponseTest.java:
##########
@@ -172,6 +191,212 @@ public void makeDigestDoesntConsiderRepairedDataInfo()
assertEquals(response1.digest(command1), response2.digest(command2));
}
+ @Test
+ public void inMemoryResponseEmptyIteratorMatchesLocalDataResponse()
+ {
+ ReadCommand command = command(key(), metadata);
+ StubRepairedDataInfo rdi = new
StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+
+ ReadResponse localResponse =
command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
+ ReadResponse inMemoryResponse =
command.createLocalObjectResponse(EmptyIterators.unfilteredPartition(metadata),
rdi);
+
+ assertIteratorsEqual(command, localResponse, inMemoryResponse);
+ }
+
+ @Test
+ public void inMemoryResponseWithRowsMatchesLocalDataResponse()
+ {
+ int key = key();
+ ReadCommand command = command(key, metadata);
+ StubRepairedDataInfo rdi = new
StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+
+ DecoratedKey dk =
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key));
+ Row row = buildRow(metadata, dk);
+ PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, dk,
row);
+
+ ReadResponse localResponse =
command.createResponse(singlePartitionIterator(update), rdi);
+ ReadResponse inMemoryResponse =
command.createLocalObjectResponse(singlePartitionIterator(update), rdi);
+
+ assertIteratorsEqual(command, localResponse, inMemoryResponse);
+ }
+
+ @Test(expected = UnsupportedOperationException.class)
+ public void inMemoryResponseCannotBeSerialized()
+ {
+ ReadCommand command = command(key(), metadata);
+ StubRepairedDataInfo rdi = new
StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
+ ReadResponse response =
command.createLocalObjectResponse(EmptyIterators.unfilteredPartition(metadata),
rdi);
+
+ try (DataOutputBuffer out = new DataOutputBuffer())
+ {
+ ReadResponse.serializer.serialize(response, out,
MessagingService.current_version);
+ }
+ catch (IOException e)
+ {
+ fail("Unexpected IOException: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void inMemoryResponseWithOverflowMatchesLocalDataResponse()
+ {
+ // 5 rows, only 2 fit in memory — 3 overflow into the serialized buffer
+ testMultipleRows(2, 5);
+ }
+
+ @Test
+ public void inMemoryResponseAllRowsInMemoryWhenUnderLimit()
+ {
+ // 3 rows, limit is 10 — all fit in memory with no overflow
+ testMultipleRows(10, 3);
+ }
+
+ @Test
+ public void inMemoryResponseWithZeroMaxRowsUsesOnlyOverflow()
+ {
+ // all rows go directly into the overflow buffer
+ testMultipleRows(0, 3);
+ }
Review Comment:
We are missing a test that has an RT marker between the in-memory rows and the
overflow rows. I don't think we are handling this case correctly. We have a
potential data resurrection issue here. Can we add this test?
```suggestion
}
@Test
public void
inMemoryResponseWithRangeTombstoneBetweenInMemoryAndOverflow()
{
// RT at [1, 2] sits between the in-memory rows (0) and the overflow
rows (3-4)
testWithTombstones(1, 5, 1, 2);
}
```
The fix I suggest is in
`org.apache.cassandra.db.ReadResponse.InMemoryDataResponse.LimitedUnfilteredRowIterator#computeNext`
to ensure the RT open/close pairs are never split. We either keep them
entirely in memory or entirely in overflow:
```
private boolean insideOpenMarker = false;
@Override
protected Unfiltered computeNext()
{
if (!wrapped.hasNext())
return endOfData();
if (rowCount >= maxRows && !insideOpenMarker)
{
hasOverflow = true;
return endOfData();
}
Unfiltered next = wrapped.next();
if (next.isRangeTombstoneMarker())
{
RangeTombstoneMarker marker = (RangeTombstoneMarker)
next;
insideOpenMarker = marker.isOpen(isReverseOrder);
}
else
rowCount++;
return next;
}
```
##########
src/java/org/apache/cassandra/db/ReadResponse.java:
##########
@@ -288,8 +325,8 @@ static abstract class DataResponse extends ReadResponse
// TODO: can the digest be calculated over the raw bytes now?
// The response, serialized in the current messaging version
private final ByteBuffer data;
- private final ByteBuffer repairedDataDigest;
- private final boolean isRepairedDigestConclusive;
+ protected ByteBuffer repairedDataDigest;
+ protected boolean isRepairedDigestConclusive;
Review Comment:
Can we keep these as `private final` and instead provide a static method to
`build` an `InMemoryDataResponse` object? Something like this:
```
static InMemoryDataResponse build(UnfilteredPartitionIterator iter,
ReadCommand command, RepairedDataInfo rdi, int maxRows)
{
ImmutableBTreePartition partition;
ByteBuffer overflow;
if (!iter.hasNext())
{
partition = null;
overflow = null;
}
else
{
try (UnfilteredRowIterator rowIter = iter.next())
{
LimitedUnfilteredRowIterator limitedIter = new
LimitedUnfilteredRowIterator(rowIter, maxRows);
partition = ImmutableBTreePartition.create(limitedIter);
// Uses buildOverflow (row-iterator level) to match the
UnfilteredRowIteratorSerializer
// deserializer used in makeIterator.
overflow = limitedIter.hasOverflow()
? LocalDataResponse.buildOverflow(rowIter,
command.columnFilter())
: null;
}
}
// Capture digest after consuming the iterator so that any
updates made by
// RepairedDataInfo transformations are reflected in the digest.
return new InMemoryDataResponse(partition, overflow,
rdi.getDigest(), rdi.isConclusive());
}
private InMemoryDataResponse(ImmutableBTreePartition partition,
ByteBuffer overflow,
ByteBuffer repairedDataDigest, boolean
isRepairedDigestConclusive)
{
super(null, repairedDataDigest, isRepairedDigestConclusive,
MessagingService.current_version, DeserializationHelper.Flag.LOCAL);
this.partition = partition;
this.overflow = overflow;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]