Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 ab0adf9f9 -> 95839aae2 refs/heads/cassandra-3.11 2a24acfa9 -> be2117492 refs/heads/trunk ebefc96a8 -> 77abf868a
Filter header only commit logs before recovery Patch by Blake Eggleston; Reviewed by Sam Tunnicliffe for CASSANDRA-13918 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/95839aae Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/95839aae Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/95839aae Branch: refs/heads/cassandra-3.0 Commit: 95839aae2fde28fa29b16741de6bd52c0697843f Parents: ab0adf9 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Thu Sep 28 15:01:35 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Fri Sep 29 15:05:30 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/commitlog/CommitLogReplayer.java | 57 +++++++++++++++++--- .../db/commitlog/CommitLogSegment.java | 2 +- .../db/commitlog/CompressedSegment.java | 2 +- .../db/commitlog/MemoryMappedSegment.java | 2 +- .../cassandra/db/commitlog/CommitLogTest.java | 53 ++++++++++++++++++ 6 files changed, 108 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a1f49cd..7ff61d3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Filter header only commit logs before recovery (CASSANDRA-13918) * AssertionError prepending to a list (CASSANDRA-13149) * Fix support for SuperColumn tables (CASSANDRA-12373) * Handle limit correctly on tables with strict liveness (CASSANDRA-13883) http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index b3b26dd..4fd263c 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -25,18 +25,15 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.zip.CRC32; import com.google.common.base.Predicate; -import com.google.common.base.Throwables; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; -import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -138,11 +135,59 @@ public class CommitLogReplayer return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter); } + private static boolean shouldSkip(File file) throws IOException, ConfigurationException + { + CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); + if (desc.version < CommitLogDescriptor.VERSION_21) + { + return false; + } + try(ChannelProxy channel = new ChannelProxy(file); + RandomAccessReader reader = RandomAccessReader.open(channel)) + { + CommitLogDescriptor.readHeader(reader); + int end = reader.readInt(); + long filecrc = reader.readInt() & 0xffffffffL; + return end == 0 && filecrc == 0; + } + } + + private static List<File> filterCommitLogFiles(File[] toFilter) + { + List<File> filtered = new ArrayList<>(toFilter.length); + for (File file: toFilter) + { + try + { + if (shouldSkip(file)) + { + logger.info("Skipping playback of empty log: {}", file.getName()); + } + else + { + filtered.add(file); + } + } + catch (Exception e) + { + // let recover deal with it + filtered.add(file); + } + } 
+ + return filtered; + } + public void recover(File[] clogs) throws IOException { - int i; - for (i = 0; i < clogs.length; ++i) - recover(clogs[i], i + 1 == clogs.length); + List<File> filteredLogs = filterCommitLogFiles(clogs); + + int i = 0; + for (File clog: filteredLogs) + { + i++; + recover(clog, i == filteredLogs.size()); + } } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java index f26f0dc..236a1b1 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java @@ -315,7 +315,7 @@ public abstract class CommitLogSegment syncComplete.signalAll(); } - protected void writeSyncMarker(ByteBuffer buffer, int offset, int filePos, int nextMarker) + protected static void writeSyncMarker(long id, ByteBuffer buffer, int offset, int filePos, int nextMarker) { CRC32 crc = new CRC32(); updateChecksumInt(crc, (int) (id & 0xFFFFFFFFL)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java index c73a30a..c00ce18 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/CompressedSegment.java @@ -134,7 +134,7 @@ public class CompressedSegment extends CommitLogSegment // Only one thread can be here at a given time. // Protected by synchronization on CommitLogSegment.sync(). 
- writeSyncMarker(compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining()); + writeSyncMarker(id, compressedBuffer, 0, (int) channel.position(), (int) channel.position() + compressedBuffer.remaining()); commitLog.allocator.addSize(compressedBuffer.limit()); channel.write(compressedBuffer); assert channel.position() - lastWrittenPos == compressedBuffer.limit(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java index 326469f..3a16d91 100644 --- a/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java +++ b/src/java/org/apache/cassandra/db/commitlog/MemoryMappedSegment.java @@ -77,7 +77,7 @@ public class MemoryMappedSegment extends CommitLogSegment // write previous sync marker to point to next sync marker // we don't chain the crcs here to ensure this method is idempotent if it fails - writeSyncMarker(buffer, startMarker, startMarker, nextMarker); + writeSyncMarker(id, buffer, startMarker, startMarker, nextMarker); try { SyncUtil.force((MappedByteBuffer) buffer); http://git-wip-us.apache.org/repos/asf/cassandra/blob/95839aae/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java index 1543415..9e9ee53 100644 --- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java +++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTest.java @@ -30,6 +30,7 @@ import java.util.function.BiConsumer; import java.util.zip.CRC32; import java.util.zip.Checksum; +import com.google.common.io.Files; 
import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -134,6 +135,58 @@ public class CommitLogTest CommitLog.instance.recover(new File[]{ tmpFile(CommitLogDescriptor.current_version) }); } + /** + * Since commit log segments can be allocated before they're needed, the commit log file with the highest + * id isn't necessarily the last log that we wrote to. We should remove header only logs on recover so we + * can tolerate truncated logs + */ + @Test + public void testHeaderOnlyFileFiltering() throws Exception + { + File directory = Files.createTempDir(); + + CommitLogDescriptor desc1 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 1, null); + CommitLogDescriptor desc2 = new CommitLogDescriptor(CommitLogDescriptor.current_version, 2, null); + + ByteBuffer buffer; + + // this has a header and malformed data + File file1 = new File(directory, desc1.fileName()); + buffer = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buffer, desc1); + int pos = buffer.position(); + CommitLogSegment.writeSyncMarker(desc1.id, buffer, buffer.position(), buffer.position(), buffer.position() + 128); + buffer.position(pos + 8); + buffer.putInt(5); + buffer.putInt(6); + + try (OutputStream lout = new FileOutputStream(file1)) + { + lout.write(buffer.array()); + } + + // this has only a header + File file2 = new File(directory, desc2.fileName()); + buffer = ByteBuffer.allocate(1024); + CommitLogDescriptor.writeHeader(buffer, desc2); + try (OutputStream lout = new FileOutputStream(file2)) + { + lout.write(buffer.array()); + } + + // one corrupt file and one header only file should be ok + runExpecting(() -> { + CommitLog.instance.recover(file1, file2); + return null; + }, null); + + // 2 corrupt files and one header only file should fail + runExpecting(() -> { + CommitLog.instance.recover(file1, file1, file2); + return null; + }, CommitLogReplayException.class); + } + @Test public void testRecoveryWithEmptyLog20() throws Exception { 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org