Fixes for compacting larger-than-memory rows

patch by jbellis; reviewed by marcuse for CASSANDRA-6274
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d4b5b0db
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d4b5b0db
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d4b5b0db

Branch: refs/heads/trunk
Commit: d4b5b0dbc541b4b6249ccd1b507c777d7fc0bc4f
Parents: 7187a8a
Author: Jonathan Ellis <jbel...@apache.org>
Authored: Wed Oct 30 10:14:15 2013 -0500
Committer: Jonathan Ellis <jbel...@apache.org>
Committed: Wed Oct 30 10:14:15 2013 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/db/ColumnIndex.java    | 22 ++++----
 .../db/compaction/AbstractCompactedRow.java     |  4 +-
 .../db/compaction/CompactionController.java     |  2 +-
 .../db/compaction/LazilyCompactedRow.java       | 34 ++++++++----
 .../cassandra/db/compaction/Scrubber.java       | 52 ++++++++++++-------
 .../cassandra/io/sstable/SSTableWriter.java     |  2 +-
 test/data/serialization/2.0/db.RowMutation.bin  | Bin 3599 -> 3599 bytes
 8 files changed, 74 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e73a2b5..31b944c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.3
+ * Fixes for compacting larger-than-memory rows (CASSANDRA-6274)
  * Compact hottest sstables first and optionally omit coldest from
    compaction entirely (CASSANDRA-6109)
  * Fix modifying column_metadata from thrift (CASSANDRA-6182)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java b/src/java/org/apache/cassandra/db/ColumnIndex.java
index a5e0447..75a06d7 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -140,20 +140,22 @@ public class ColumnIndex
         }
 
         ColumnIndex index = build();
-        finish();
+        maybeWriteEmptyRowHeader();
         return index;
     }
 
-    public ColumnIndex build(Iterable<OnDiskAtom> columns) throws IOException
+    /**
+     * The important distinction wrt build() is that we may be building for a row that ends up
+     * being compacted away entirely, i.e., the input consists only of expired tombstones (or
+     * columns shadowed by an expired tombstone).  Thus, it is the caller's responsibility
+     * to decide whether to write the header for an empty row.
+     */
+    public ColumnIndex buildForCompaction(Iterator<OnDiskAtom> columns) throws IOException
     {
-        for (OnDiskAtom c : columns)
-            add(c);
-        ColumnIndex index = build();
-
-        finish();
-
-        return index;
+        while (columns.hasNext())
+            add(columns.next());
+        return build();
     }
 
     public void add(OnDiskAtom column) throws IOException
@@ -219,7 +221,7 @@ public class ColumnIndex
         return result;
     }
 
-    public void finish() throws IOException
+    public void maybeWriteEmptyRowHeader() throws IOException
    {
         if (!deletionInfo.isLive())
             maybeWriteRowHeader();
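The switch from Iterable to Iterator here is deliberate: a lazily compacted row can only be consumed once, and taking an Iterator makes that single-pass contract explicit in the signature. A minimal, self-contained sketch of the idea, using hypothetical types rather than the Cassandra classes:

    import java.util.Arrays;
    import java.util.Iterator;

    class SinglePassExample
    {
        // taking an Iterator (not an Iterable) tells the caller we consume one pass
        static long sum(Iterator<Integer> atoms)
        {
            long total = 0;
            while (atoms.hasNext())
                total += atoms.next();
            return total;
        }

        public static void main(String[] args)
        {
            Iterator<Integer> onePass = Arrays.asList(1, 2, 3).iterator();
            System.out.println(sum(onePass)); // 6
            // a second sum(onePass) would see an exhausted iterator -- which is the
            // point: the atoms stream off disk once and are never re-materialized
        }
    }
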

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
index 966770f..734155e 100644
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactedRow.java
@@ -42,10 +42,10 @@ public abstract class AbstractCompactedRow implements Closeable
 
     /**
      * write the row (size + column index + filter + column data, but NOT row key) to @param out.
-     * It is an error to call this if isEmpty is false.  (Because the key is appended first,
-     * so we'd have an incomplete row written.)
      *
      * write() may change internal state; it is NOT valid to call write() or update() a second time.
+     *
+     * @return index information for the written row, or null if the compaction resulted in only expired tombstones.
      */
     public abstract RowIndexEntry write(long currentPosition, DataOutput out) throws IOException;


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 9552895..65515d6 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -154,7 +154,7 @@ public class CompactionController
 
     /**
      * @return true if it's okay to drop tombstones for the given row, i.e., if we know all the versions of the row
-     * are included in the compaction set
+     * older than @param maxDeletionTimestamp are included in the compaction set
      */
     public boolean shouldPurge(DecoratedKey key, long maxDeletionTimestamp)
     {
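The shouldPurge() contract is the safety core of tombstone garbage collection: a tombstone may only be dropped when every sstable that could still hold data it shadows is part of the compaction. A hedged, self-contained sketch of that rule, with hypothetical types standing in for Cassandra's API:

    import java.util.Arrays;
    import java.util.List;

    class PurgeCheckSketch
    {
        static class SSTableInfo
        {
            final long minTimestamp;      // oldest timestamp in the sstable
            final boolean mayContainKey;  // e.g., a bloom filter hit for the row key

            SSTableInfo(long minTimestamp, boolean mayContainKey)
            {
                this.minTimestamp = minTimestamp;
                this.mayContainKey = mayContainKey;
            }
        }

        // true only if no sstable outside the compaction set can hold row data
        // older than the given tombstone timestamp
        static boolean shouldPurge(List<SSTableInfo> outsideCompaction, long maxDeletionTimestamp)
        {
            for (SSTableInfo sstable : outsideCompaction)
                if (sstable.mayContainKey && sstable.minTimestamp < maxDeletionTimestamp)
                    return false;  // it might hold shadowed data; keep the tombstone
            return true;
        }

        public static void main(String[] args)
        {
            List<SSTableInfo> outside = Arrays.asList(new SSTableInfo(30, true));
            System.out.println(shouldPurge(outside, 10));              // true
            System.out.println(shouldPurge(outside, Long.MAX_VALUE));  // false: conservative
        }
    }
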

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
index 7d7c5a4..0cdbbb7 100644
--- a/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
@@ -52,13 +52,13 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
     private final List<? extends OnDiskAtomIterator> rows;
     private final CompactionController controller;
     private final boolean shouldPurge;
-    private ColumnFamily emptyColumnFamily;
+    private final ColumnFamily emptyColumnFamily;
     private Reducer reducer;
     private ColumnStats columnStats;
     private boolean closed;
     private ColumnIndex.Builder indexBuilder;
     private final SecondaryIndexManager.Updater indexer;
-    private long maxDelTimestamp;
+    private long maxTombstoneTimestamp;
 
     public LazilyCompactedRow(CompactionController controller, List<? extends OnDiskAtomIterator> rows)
     {
@@ -67,18 +67,31 @@
         this.controller = controller;
         indexer = controller.cfs.indexManager.updaterFor(key);
 
-        maxDelTimestamp = Long.MIN_VALUE;
+        ColumnFamily rawCf = null;
+        maxTombstoneTimestamp = Long.MIN_VALUE;
         for (OnDiskAtomIterator row : rows)
         {
             ColumnFamily cf = row.getColumnFamily();
-            maxDelTimestamp = Math.max(maxDelTimestamp, cf.deletionInfo().maxTimestamp());
+            maxTombstoneTimestamp = Math.max(maxTombstoneTimestamp, cf.deletionInfo().maxTimestamp());
 
-            if (emptyColumnFamily == null)
-                emptyColumnFamily = cf;
+            if (rawCf == null)
+                rawCf = cf;
             else
-                emptyColumnFamily.delete(cf);
+                rawCf.delete(cf);
         }
-        this.shouldPurge = controller.shouldPurge(key, maxDelTimestamp);
+
+        // Don't pass maxTombstoneTimestamp to shouldPurge since we might well have cells with
+        // tombstones newer than the row-level tombstones we've seen -- but we won't know that
+        // until we iterate over them.  By passing MAX_VALUE we will only purge if there are
+        // no other versions of this row present.
+        this.shouldPurge = controller.shouldPurge(key, Long.MAX_VALUE);
+
+        // even if we can't delete all the tombstones allowed by gcBefore, we should still call removeDeleted
+        // to get rid of redundant row-level and range tombstones
+        assert rawCf != null;
+        int overriddenGcBefore = shouldPurge ? controller.gcBefore : Integer.MIN_VALUE;
+        ColumnFamily purgedCf = ColumnFamilyStore.removeDeleted(rawCf, overriddenGcBefore);
+        emptyColumnFamily = purgedCf == null ? ArrayBackedSortedColumns.factory.create(controller.cfs.metadata) : purgedCf;
     }
 
     public RowIndexEntry write(long currentPosition, DataOutput out) throws IOException
@@ -89,7 +102,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         try
         {
             indexBuilder = new ColumnIndex.Builder(emptyColumnFamily, key.key, out);
-            columnsIndex = indexBuilder.build(this);
+            columnsIndex = indexBuilder.buildForCompaction(iterator());
             if (columnsIndex.columnsIndex.isEmpty())
             {
                 boolean cfIrrelevant = shouldPurge
@@ -107,7 +120,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         // (however, if there are zero columns, iterator() will not be called by ColumnIndexer and reducer will be null)
         columnStats = new ColumnStats(reducer == null ? 0 : reducer.columns,
                                       reducer == null ? Long.MAX_VALUE : reducer.minTimestampSeen,
-                                      reducer == null ? maxDelTimestamp : Math.max(maxDelTimestamp, reducer.maxTimestampSeen),
+                                      reducer == null ? maxTombstoneTimestamp : Math.max(maxTombstoneTimestamp, reducer.maxTimestampSeen),
                                       reducer == null ? Integer.MIN_VALUE : reducer.maxLocalDeletionTimeSeen,
                                       reducer == null ? new StreamingHistogram(SSTable.TOMBSTONE_HISTOGRAM_BIN_SIZE) : reducer.tombstones,
                                       reducer == null ? Collections.<ByteBuffer>emptyList() : reducer.minColumnNameSeen,
@@ -115,6 +128,7 @@ public class LazilyCompactedRow extends AbstractCompactedRow implements Iterable
         );
         reducer = null;
 
+        indexBuilder.maybeWriteEmptyRowHeader();
         out.writeShort(SSTableWriter.END_OF_ROW);
 
         close();
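The MAX_VALUE reasoning above is subtle, so a worked example may help. This is a hedged illustration with made-up timestamps, not Cassandra code:

    class WhyMaxValueSketch
    {
        public static void main(String[] args)
        {
            long maxRowTombstone = 10; // newest row-level tombstone known before streaming
            long cellTombstone   = 50; // a cell tombstone discovered only while streaming
            long outsideDataTs   = 30; // this row's data in an sstable outside the compaction

            // Judged by row-level tombstones alone, purging looks safe: the outside
            // data (ts=30) is newer than anything a ts=10 tombstone shadows ...
            boolean looksSafeToPurge = outsideDataTs >= maxRowTombstone;

            // ... but the cell tombstone (ts=50) does shadow the outside data (ts=30),
            // so dropping it now would let that data resurrect later.
            boolean tombstoneStillNeeded = cellTombstone > outsideDataTs;

            // Passing Long.MAX_VALUE sidesteps the blind spot: purge only when no
            // sstable outside the compaction holds any version of the row at all.
            System.out.println(looksSafeToPurge + " " + tombstoneStillNeeded); // true true
        }
    }
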

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index d8a2745..708e929 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -54,14 +54,14 @@ public class Scrubber implements Closeable
 
     private final OutputHandler outputHandler;
 
-    private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>()
+    private static final Comparator<Row> rowComparator = new Comparator<Row>()
     {
-        public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+        public int compare(Row r1, Row r2)
         {
             return r1.key.compareTo(r2.key);
         }
     };
-    private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator);
+    private final SortedSet<Row> outOfOrderRows = new TreeSet<>(rowComparator);
 
     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException
     {
@@ -100,7 +100,7 @@ public class Scrubber implements Closeable
 
     public void scrub()
     {
-        outputHandler.output("Scrubbing " + sstable);
+        outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         try
         {
             ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
@@ -113,7 +113,7 @@ public class Scrubber implements Closeable
             // TODO errors when creating the writer may leave empty temp files.
             writer = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
 
-            AbstractCompactedRow prevRow = null;
+            DecoratedKey prevKey = null;
 
             while (!dataFile.isEOF())
             {
@@ -184,20 +184,19 @@
                     if (dataSize > dataFile.length())
                         throw new IOError(new IOException("Impossible row size " + dataSize));
 
-                    SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
-                    AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                    if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                    SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                    if (prevKey != null && prevKey.compareTo(key) > 0)
                     {
-                        outOfOrderRows.add(compactedRow);
-                        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                        saveOutOfOrderRow(prevKey, key, atoms);
                         continue;
                     }
 
+                    AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                    if (writer.append(compactedRow) == null)
                         emptyRows++;
                     else
                         goodRows++;
-                    prevRow = compactedRow;
+                    prevKey = key;
 
                     if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex)
                         outputHandler.warn("Index file contained a different key or row size; using key from data file");
                 }
@@ -215,19 +214,19 @@
                         key = sstable.partitioner.decorateKey(currentIndexKey);
                         try
                         {
-                            SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = controller.getCompactedRow(row);
-                            if (prevRow != null && acrComparator.compare(prevRow, compactedRow) >= 0)
+                            SSTableIdentityIterator atoms = new SSTableIdentityIterator(sstable, dataFile, key, dataSize, true);
+                            if (prevKey != null && prevKey.compareTo(key) > 0)
                             {
-                                outOfOrderRows.add(compactedRow);
-                                outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                saveOutOfOrderRow(prevKey, key, atoms);
                                 continue;
                             }
+
+                            AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms));
                             if (writer.append(compactedRow) == null)
                                 emptyRows++;
                             else
                                 goodRows++;
-                            prevRow = compactedRow;
+                            prevKey = key;
                         }
                         catch (Throwable th2)
                         {
@@ -273,8 +272,8 @@
             if (!outOfOrderRows.isEmpty())
             {
                 SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable);
-                for (AbstractCompactedRow row : outOfOrderRows)
-                    inOrderWriter.append(row);
+                for (Row row : outOfOrderRows)
+                    inOrderWriter.append(row.key, row.cf);
                 newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
                 outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
             }
@@ -294,6 +293,21 @@
         }
     }
 
+    private void saveOutOfOrderRow(DecoratedKey prevKey, DecoratedKey key, SSTableIdentityIterator atoms)
+    {
+        // TODO bitch if the row is too large?  if it is there's not much we can do ...
+        outputHandler.warn(String.format("Out of order row detected (%s found after %s)", key, prevKey));
+        // adding atoms in sorted order is worst-case for TMBSC, but we shouldn't need to do this very often
+        // and there's no sense in failing on mis-sorted cells when a TreeMap could save us
+        ColumnFamily cf = atoms.getColumnFamily().cloneMeShallow(TreeMapBackedSortedColumns.factory, false);
+        while (atoms.hasNext())
+        {
+            OnDiskAtom atom = atoms.next();
+            cf.addAtom(atom);
+        }
+        outOfOrderRows.add(new Row(key, cf));
+    }
+
     public SSTableReader getNewSSTable()
     {
         return newSstable;
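saveOutOfOrderRow() buffers each misplaced row in a TreeMap-backed column family so that mis-sorted cells cannot fail the scrub, and the buffered rows are later replayed in key order into a separate sstable. A hedged, self-contained sketch of that buffering strategy (hypothetical types, with strings standing in for keys and cells):

    import java.util.Map;
    import java.util.TreeMap;

    class OutOfOrderBuffer
    {
        // rowKey -> (cellName -> value); TreeMaps sort on insert, so mis-sorted
        // input never fails -- it just costs O(log n) per cell
        private final TreeMap<String, TreeMap<String, String>> rows = new TreeMap<>();

        void addCell(String rowKey, String cellName, String value)
        {
            TreeMap<String, String> cells = rows.get(rowKey);
            if (cells == null)
                rows.put(rowKey, cells = new TreeMap<>());
            cells.put(cellName, value);
        }

        // replay the buffered rows in key order, e.g., into a fresh in-order sstable
        void writeInOrder()
        {
            for (Map.Entry<String, TreeMap<String, String>> row : rows.entrySet())
                System.out.println(row.getKey() + " -> " + row.getValue());
        }

        public static void main(String[] args)
        {
            OutOfOrderBuffer buffer = new OutOfOrderBuffer();
            buffer.addCell("key2", "c", "v1"); // arrives out of key order
            buffer.addCell("key1", "b", "v2");
            buffer.addCell("key1", "a", "v3"); // mis-sorted cell within the row
            buffer.writeInOrder();             // key1 {a=v3, b=v2}, then key2 {c=v1}
        }
    }
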

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index ac598bd..70c0b42 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -259,7 +259,7 @@ public class SSTableWriter extends SSTable
                 columnIndexer.add(atom); // This writes the atom on disk too
             }
 
-            columnIndexer.finish();
+            columnIndexer.maybeWriteEmptyRowHeader();
             dataFile.stream.writeShort(END_OF_ROW);
         }
         catch (IOException e)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d4b5b0db/test/data/serialization/2.0/db.RowMutation.bin
----------------------------------------------------------------------
diff --git a/test/data/serialization/2.0/db.RowMutation.bin b/test/data/serialization/2.0/db.RowMutation.bin
index d6abf2d..73d93e8 100644
Binary files a/test/data/serialization/2.0/db.RowMutation.bin and b/test/data/serialization/2.0/db.RowMutation.bin differ
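For reference, the row serialization order that both the normal write path above and the new compaction path must respect can be outlined as follows. This is a hedged sketch with hypothetical names, not the real SSTableWriter API:

    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Iterator;

    class RowWriteOrderSketch
    {
        static final short END_OF_ROW = 0; // plays the role of SSTableWriter.END_OF_ROW

        interface AtomIndexer
        {
            void add(byte[] atom) throws IOException;  // indexes AND writes the atom
            void maybeWriteEmptyRowHeader() throws IOException;
        }

        // the shared shape of the write path: stream atoms one at a time (no
        // whole-row buffering), then write the header for empty-but-tombstoned
        // rows, then terminate the row
        static void writeRowBody(AtomIndexer indexer, Iterator<byte[]> atoms, DataOutput out) throws IOException
        {
            while (atoms.hasNext())
                indexer.add(atoms.next());
            indexer.maybeWriteEmptyRowHeader();
            out.writeShort(END_OF_ROW);
        }
    }

Renaming finish() to maybeWriteEmptyRowHeader() makes the conditional nature of that second step visible at every call site, which matters now that compaction may legitimately produce an empty row.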