Merge branch 'cassandra-2.2' into cassandra-3.0 Conflicts: src/java/org/apache/cassandra/config/CFMetaData.java src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/929438b8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/929438b8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/929438b8 Branch: refs/heads/trunk Commit: 929438b8be32e38f6d921bfdc4a011cd526dfeb7 Parents: e389dc4 9c3b967 Author: Marcus Eriksson <marc...@apache.org> Authored: Mon Aug 10 09:12:52 2015 +0200 Committer: Marcus Eriksson <marc...@apache.org> Committed: Mon Aug 10 09:13:01 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 +- .../org/apache/cassandra/config/CFMetaData.java | 7 +- .../apache/cassandra/db/ColumnFamilyStore.java | 29 ++++++- .../cassandra/db/ColumnFamilyStoreMBean.java | 19 +++-- .../compaction/CompactionStrategyManager.java | 51 +++++++++--- .../db/compaction/CompactionsCQLTest.java | 81 +++++++++++++++++++- 7 files changed, 166 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 7cb9f16,772455c..639dd59 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -19,39 -13,6 +19,40 @@@ Merged from 2.1 when both exist (CASSANDRA-9777) * Release snapshot selfRef when doing snapshot repair (CASSANDRA-9998) * Cannot replace token does not exist - DN node removed as Fat Client (CASSANDRA-9871) +Merged from 2.0: + * Don't cast expected bf size to an int (CASSANDRA-9959) + + +3.0.0-alpha1 + * Implement proper sandboxing for UDFs (CASSANDRA-9402) + * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066) + * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850) + * Metrics should use up to date nomenclature (CASSANDRA-9448) + * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384) + * Cleanup 
crc and adler code for java 8 (CASSANDRA-9650) + * Storage engine refactor (CASSANDRA-8099, 9743, 9746, 9759, 9781, 9808, 9825, + 9848, 9705, 9859, 9867, 9874, 9828, 9801) + * Update Guava to 18.0 (CASSANDRA-9653) + * Bloom filter false positive ratio is not honoured (CASSANDRA-8413) + * New option for cassandra-stress to leave a ratio of columns null (CASSANDRA-9522) + * Change hinted_handoff_enabled yaml setting, JMX (CASSANDRA-9035) + * Add algorithmic token allocation (CASSANDRA-7032) + * Add nodetool command to replay batchlog (CASSANDRA-9547) + * Make file buffer cache independent of paths being read (CASSANDRA-8897) + * Remove deprecated legacy Hadoop code (CASSANDRA-9353) + * Decommissioned nodes will not rejoin the cluster (CASSANDRA-8801) + * Change gossip stabilization to use endpoint size (CASSANDRA-9401) + * Change default garbage collector to G1 (CASSANDRA-7486) + * Populate TokenMetadata early during startup (CASSANDRA-9317) + * Undeprecate cache recentHitRate (CASSANDRA-6591) + * Add support for selectively varint encoding fields (CASSANDRA-9499, 9865) + * Materialized Views (CASSANDRA-6477) +Merged from 2.2: + * Avoid grouping sstables for anticompaction with DTCS (CASSANDRA-9900) + * UDF / UDA execution time in trace (CASSANDRA-9723) + * Fix broken internode SSL (CASSANDRA-9884) +Merged from 2.1: ++ * Add new JMX methods to change local compaction strategy (CASSANDRA-9965) * Fix handling of enable/disable autocompaction (CASSANDRA-9899) * Add consistency level to tracing output (CASSANDRA-9827) * Remove repair snapshot leftover on startup (CASSANDRA-7357) http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/NEWS.txt ---------------------------------------------------------------------- diff --cc NEWS.txt index 0fa4ded,ccc5cc8..5a690bd --- a/NEWS.txt +++ b/NEWS.txt @@@ -13,55 -13,6 +13,56 @@@ restore snapshots created with the prev 'sstableloader' tool. 
You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. +3.0 +=== + +New features +------------ + - Materialized Views, which allow for server-side denormalization, are now + available. Materialized views provide an alternative to secondary indexes + for non-primary key queries, and perform much better for indexing high + cardinality columns. + See http://www.datastax.com/dev/blog/new-in-cassandra-3-0-materialized-views + + +Upgrading +--------- + - 3.0 requires Java 8u20 or later. + - The default JVM GC has been changed to G1GC. + - The default JVM flag -XX:+PerfDisableSharedMem will cause the following JVM tools + to stop working: jps, jstack, jinfo, jmc, jcmd as well as 3rd party tools like Jolokia. + If you wish to use these tools you can comment this flag out in cassandra-env.{sh,ps1} + - New write stages have been added for batchlog and materialized view mutations; + you can set their size in cassandra.yaml + - User defined functions are now executed in a sandbox. + To use UDFs and UDAs, you have to enable them in cassandra.yaml. + - New SSTable version 'la' with improved bloom-filter false-positive handling + compared to previous version 'ka' used in 2.2 and 2.1. Running sstableupgrade + is not necessary but recommended. + - Before upgrading to 3.0, make sure that your cluster is in complete agreement + (schema versions outputted by `nodetool describecluster` are all the same). + - Schema metadata is now stored in the new `system_schema` keyspace, and + legacy `system.schema_*` tables are now gone; see CASSANDRA-6717 for details. + - Pig's CassandraStorage has been removed. Use CqlNativeStorage instead. + - Hadoop BulkOutputFormat and BulkRecordWriter have been removed; use + CqlBulkOutputFormat and CqlBulkRecordWriter instead. + - Hadoop ColumnFamilyInputFormat and ColumnFamilyOutputFormat have been removed; + use CqlInputFormat and CqlOutputFormat instead. 
+ - Hadoop ColumnFamilyRecordReader and ColumnFamilyRecordWriter have been removed; + use CqlRecordReader and CqlRecordWriter instead. + - hinted_handoff_enabled in cassandra.yaml no longer supports a list of data centers. + To specify a list of excluded data centers when hinted_handoff_enabled is set to true, + use hinted_handoff_disabled_datacenters, see CASSANDRA-9035 for details. + - The `sstable_compression` and `chunk_length_kb` compression options have been deprecated. + The new options are `class` and `chunk_length_in_kb`. Disabling compression should now + be done by setting the new option `enabled` to `false`. + - Only map syntax is now allowed for caching options. ALL/NONE/KEYS_ONLY/ROWS_ONLY syntax + has been deprecated since 2.1.0 and is being removed in 3.0.0. + - Batchlog entries are now stored in a new table - system.batches. + The old one has been deprecated. - ++ - JMX methods set/getCompactionStrategyClass have been removed, use ++ set/getLocalCompactionStrategy or set/getLocalCompactionStrategyJson instead. + 2.2 === http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/CFMetaData.java index 1d38274,6468973..7719587 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@@ -772,25 -819,60 +772,26 @@@ public final class CFMetaDat throw new ConfigurationException(String.format("Column family comparators do not match or are not compatible (found %s; expected %s).", cfm.comparator.getClass().getSimpleName(), comparator.getClass().getSimpleName())); } - public static void validateCompactionOptions(Class<? extends AbstractCompactionStrategy> strategyClass, Map<String, String> options) throws ConfigurationException + public static Class<? 
extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException + { + className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className; + Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy"); + if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass)) + throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className)); + + return strategyClass; + } + - public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs) ++ public static AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyStore cfs, ++ CompactionParams compactionParams) { try { - if (options == null) - return; - - Map<?,?> unknownOptions = (Map) strategyClass.getMethod("validateOptions", Map.class).invoke(null, options); - if (!unknownOptions.isEmpty()) - throw new ConfigurationException(String.format("Properties specified %s are not understood by %s", unknownOptions.keySet(), strategyClass.getSimpleName())); + Constructor<? extends AbstractCompactionStrategy> constructor = - params.compaction.klass().getConstructor(ColumnFamilyStore.class, Map.class); - return constructor.newInstance(cfs, params.compaction.options()); ++ compactionParams.klass().getConstructor(ColumnFamilyStore.class, Map.class); ++ return constructor.newInstance(cfs, compactionParams.options()); } - catch (NoSuchMethodException e) - { - logger.warn("Compaction Strategy {} does not have a static validateOptions method. 
Validation ignored", strategyClass.getName()); - } - catch (InvocationTargetException e) - { - if (e.getTargetException() instanceof ConfigurationException) - throw (ConfigurationException) e.getTargetException(); - throw new ConfigurationException("Failed to validate compaction options: " + options); - } - catch (ConfigurationException e) - { - throw e; - } - catch (Exception e) - { - throw new ConfigurationException("Failed to validate compaction options: " + options); - } - } - - public static Class<? extends AbstractCompactionStrategy> createCompactionStrategy(String className) throws ConfigurationException - { - className = className.contains(".") ? className : "org.apache.cassandra.db.compaction." + className; - Class<AbstractCompactionStrategy> strategyClass = FBUtilities.classForName(className, "compaction strategy"); - if (className.equals(WrappingCompactionStrategy.class.getName())) - throw new ConfigurationException("You can't set WrappingCompactionStrategy as the compaction strategy!"); - if (!AbstractCompactionStrategy.class.isAssignableFrom(strategyClass)) - throw new ConfigurationException(String.format("Specified compaction strategy class (%s) is not derived from AbstractReplicationStrategy", className)); - - return strategyClass; - } - - public static AbstractCompactionStrategy createCompactionStrategyInstance(Class<? extends AbstractCompactionStrategy> compactionStrategyClass, - ColumnFamilyStore cfs, - Map<String, String> compactionStrategyOptions) - { - try - { - Constructor<? 
extends AbstractCompactionStrategy> constructor = - compactionStrategyClass.getConstructor(ColumnFamilyStore.class, Map.class); - return constructor.newInstance(cfs, compactionStrategyOptions); - } - catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) + catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index beb2b93,6b71be9..4ae6694 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -248,14 -259,57 +248,35 @@@ public class ColumnFamilyStore implemen }; } - public void setCompactionStrategyClass(String compactionStrategyClass) + public void setLocalCompactionStrategyJson(String options) { - throw new UnsupportedOperationException("ColumnFamilyStore.setCompactionStrategyClass() method is no longer supported"); + setLocalCompactionStrategy(FBUtilities.fromJsonMap(options)); } - public String getCompactionStrategyClass() + public String getLocalCompactionStrategyJson() { - return metadata.params.compaction.klass().getName(); + return FBUtilities.json(getLocalCompactionStrategy()); + } + + public void setLocalCompactionStrategy(Map<String, String> options) + { + try + { - Map<String, String> optionsCopy = new HashMap<>(options); - Class<? 
extends AbstractCompactionStrategy> compactionStrategyClass = CFMetaData.createCompactionStrategy(optionsCopy.get("class")); - optionsCopy.remove("class"); - CFMetaData.validateCompactionOptions(compactionStrategyClass, optionsCopy); - compactionStrategyWrapper.setNewLocalCompactionStrategy(compactionStrategyClass, optionsCopy); ++ CompactionParams compactionParams = CompactionParams.fromMap(options); ++ compactionParams.validate(); ++ compactionStrategyManager.setNewLocalCompactionStrategy(compactionParams); + } + catch (Throwable t) + { + logger.error("Could not set new local compaction strategy", t); + // dont propagate the ConfigurationException over jmx, user will only see a ClassNotFoundException + throw new IllegalArgumentException("Could not set new local compaction strategy: "+t.getMessage()); + } + } + + public Map<String, String> getLocalCompactionStrategy() + { - Map<String, String> options = new HashMap<>(compactionStrategyWrapper.options); - options.put("class", compactionStrategyWrapper.getName()); - return options; - } - - public void setCompactionStrategyClass(String compactionStrategyClass) - { - try - { - metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass); - compactionStrategyWrapper.maybeReloadCompactionStrategy(metadata); - } - catch (ConfigurationException e) - { - throw new IllegalArgumentException(e.getMessage()); - } - } - - public String getCompactionStrategyClass() - { - return metadata.compactionStrategyClass.getName(); ++ return compactionStrategyManager.getCompactionParams().asMap(); } public Map<String,String> getCompressionParameters() http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java index c23df74,1a8ba1d..84c6dd1 --- 
a/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStoreMBean.java @@@ -70,15 -67,36 +70,24 @@@ public interface ColumnFamilyStoreMBea public void setMaximumCompactionThreshold(int threshold); /** - * Sets the compaction strategy by class name - * @param className the name of the compaction strategy class + * Sets the compaction strategy locally for this node + * + * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted + * + * @param options compaction options with the same syntax as when doing ALTER ... WITH compaction = {..} */ - public void setCompactionStrategyClass(String className); + public void setLocalCompactionStrategyJson(String options); + public String getLocalCompactionStrategyJson(); /** - * Gets the compaction strategy class name + * Sets the compaction strategy locally for this node + * + * Note that this will be set until an ALTER with compaction = {..} is executed or the node is restarted + * + * @param options compaction options map */ - public String getCompactionStrategyClass(); + public void setLocalCompactionStrategy(Map<String, String> options); + public Map<String, String> getLocalCompactionStrategy(); - /** - * Sets the compaction strategy by class name - * @param className the name of the compaction strategy class - */ - @Deprecated - public void setCompactionStrategyClass(String className); - - /** - * Gets the compaction strategy class name - */ - @Deprecated - public String getCompactionStrategyClass(); /** * Get the compression parameters http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 4f6dfa2,0000000..7204da0 mode 100644,000000..100644 --- 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@@ -1,453 -1,0 +1,482 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.compaction; + + +import java.util.*; +import java.util.concurrent.Callable; + +import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.SSTableSet; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.notifications.*; +import org.apache.cassandra.schema.CompactionParams; + +/** + * Manages the compaction strategies. + * + * Currently has two instances of actual compaction strategies - one for repaired data and one for + * unrepaired data. 
This is done to be able to totally separate the different sets of sstables. + */ +public class CompactionStrategyManager implements INotificationConsumer +{ + private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class); + private final ColumnFamilyStore cfs; + private volatile AbstractCompactionStrategy repaired; + private volatile AbstractCompactionStrategy unrepaired; + private volatile boolean enabled = true; + public boolean isActive = true; + private volatile CompactionParams params; ++ /* ++ We keep a copy of the schema compaction parameters here to be able to decide if we ++ should update the compaction strategy in maybeReloadCompactionStrategy() due to an ALTER. ++ ++ If a user changes the local compaction strategy and then later ALTERs a compaction parameter, ++ we will use the new compaction parameters. ++ */ ++ private CompactionParams schemaCompactionParams; + + public CompactionStrategyManager(ColumnFamilyStore cfs) + { + cfs.getTracker().subscribe(this); + logger.debug("{} subscribed to the data tracker.", this); + this.cfs = cfs; + reload(cfs.metadata); + params = cfs.metadata.params.compaction; + enabled = params.isEnabled(); + } + + /** + * Return the next background task + * + * Returns a task for the compaction strategy that needs it the most (most estimated remaining tasks) + * + */ + public synchronized AbstractCompactionTask getNextBackgroundTask(int gcBefore) + { + if (!isEnabled()) + return null; + + maybeReload(cfs.metadata); + + if (repaired.getEstimatedRemainingTasks() > unrepaired.getEstimatedRemainingTasks()) + { + AbstractCompactionTask repairedTask = repaired.getNextBackgroundTask(gcBefore); + if (repairedTask != null) + return repairedTask; + return unrepaired.getNextBackgroundTask(gcBefore); + } + else + { + AbstractCompactionTask unrepairedTask = unrepaired.getNextBackgroundTask(gcBefore); + if (unrepairedTask != null) + return unrepairedTask; + return repaired.getNextBackgroundTask(gcBefore); + } 
+ } + + public boolean isEnabled() + { + return enabled && isActive; + } + + public synchronized void resume() + { + isActive = true; + } + + /** + * pause compaction while we cancel all ongoing compactions + * + * Separate call from enable/disable to not have to save the enabled-state externally + */ + public synchronized void pause() + { + isActive = false; + } + + + private void startup() + { + for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL)) + { + if (sstable.openReason != SSTableReader.OpenReason.EARLY) + getCompactionStrategyFor(sstable).addSSTable(sstable); + } + repaired.startup(); + unrepaired.startup(); + } + + /** + * return the compaction strategy for the given sstable + * + * returns differently based on the repaired status + * @param sstable + * @return + */ + private AbstractCompactionStrategy getCompactionStrategyFor(SSTableReader sstable) + { + if (sstable.isRepaired()) + return repaired; + else + return unrepaired; + } + + public void shutdown() + { + isActive = false; + repaired.shutdown(); + unrepaired.shutdown(); + } + + public synchronized void maybeReload(CFMetaData metadata) + { - if (repaired != null && repaired.getClass().equals(metadata.params.compaction.klass()) - && unrepaired != null && unrepaired.getClass().equals(metadata.params.compaction.klass()) - && repaired.options.equals(metadata.params.compaction.options()) // todo: assumes all have the same options - && unrepaired.options.equals(metadata.params.compaction.options())) ++ // compare the old schema configuration to the new one, ignore any locally set changes. ++ if (metadata.params.compaction.equals(schemaCompactionParams)) + return; + reload(metadata); + } + + /** + * Reload the compaction strategies + * + * Called after changing configuration and at startup. 
+ * @param metadata + */ + public synchronized void reload(CFMetaData metadata) + { + boolean disabledWithJMX = !enabled && shouldBeEnabled(); - if (repaired != null) - repaired.shutdown(); - if (unrepaired != null) - unrepaired.shutdown(); - repaired = metadata.createCompactionStrategyInstance(cfs); - unrepaired = metadata.createCompactionStrategyInstance(cfs); - params = metadata.params.compaction; ++ setStrategy(metadata.params.compaction); ++ schemaCompactionParams = metadata.params.compaction; ++ + if (disabledWithJMX || !shouldBeEnabled()) + disable(); + else + enable(); + startup(); + } + + public void replaceFlushed(Memtable memtable, SSTableReader sstable) + { + cfs.getTracker().replaceFlushed(memtable, sstable); + if (sstable != null) + CompactionManager.instance.submitBackground(cfs); + } + + public int getUnleveledSSTables() + { + if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy) + { + int count = 0; + count += ((LeveledCompactionStrategy)repaired).getLevelSize(0); + count += ((LeveledCompactionStrategy)unrepaired).getLevelSize(0); + return count; + } + return 0; + } + + public synchronized int[] getSSTableCountPerLevel() + { + if (repaired instanceof LeveledCompactionStrategy && unrepaired instanceof LeveledCompactionStrategy) + { + int [] res = new int[LeveledManifest.MAX_LEVEL_COUNT]; + int[] repairedCountPerLevel = ((LeveledCompactionStrategy) repaired).getAllLevelSize(); + res = sumArrays(res, repairedCountPerLevel); + int[] unrepairedCountPerLevel = ((LeveledCompactionStrategy) unrepaired).getAllLevelSize(); + res = sumArrays(res, unrepairedCountPerLevel); + return res; + } + return null; + } + + private static int[] sumArrays(int[] a, int[] b) + { + int[] res = new int[Math.max(a.length, b.length)]; + for (int i = 0; i < res.length; i++) + { + if (i < a.length && i < b.length) + res[i] = a[i] + b[i]; + else if (i < a.length) + res[i] = a[i]; + else + res[i] = b[i]; + } + return res; + } + + 
public boolean shouldDefragment() + { + assert repaired.getClass().equals(unrepaired.getClass()); + return repaired.shouldDefragment(); + } + + + public synchronized void handleNotification(INotification notification, Object sender) + { + if (notification instanceof SSTableAddedNotification) + { + SSTableAddedNotification flushedNotification = (SSTableAddedNotification) notification; + if (flushedNotification.added.isRepaired()) + repaired.addSSTable(flushedNotification.added); + else + unrepaired.addSSTable(flushedNotification.added); + } + else if (notification instanceof SSTableListChangedNotification) + { + SSTableListChangedNotification listChangedNotification = (SSTableListChangedNotification) notification; + Set<SSTableReader> repairedRemoved = new HashSet<>(); + Set<SSTableReader> repairedAdded = new HashSet<>(); + Set<SSTableReader> unrepairedRemoved = new HashSet<>(); + Set<SSTableReader> unrepairedAdded = new HashSet<>(); + + for (SSTableReader sstable : listChangedNotification.removed) + { + if (sstable.isRepaired()) + repairedRemoved.add(sstable); + else + unrepairedRemoved.add(sstable); + } + for (SSTableReader sstable : listChangedNotification.added) + { + if (sstable.isRepaired()) + repairedAdded.add(sstable); + else + unrepairedAdded.add(sstable); + } + if (!repairedRemoved.isEmpty()) + { + repaired.replaceSSTables(repairedRemoved, repairedAdded); + } + else + { + for (SSTableReader sstable : repairedAdded) + repaired.addSSTable(sstable); + } + + if (!unrepairedRemoved.isEmpty()) + { + unrepaired.replaceSSTables(unrepairedRemoved, unrepairedAdded); + } + else + { + for (SSTableReader sstable : unrepairedAdded) + unrepaired.addSSTable(sstable); + } + } + else if (notification instanceof SSTableRepairStatusChanged) + { + for (SSTableReader sstable : ((SSTableRepairStatusChanged) notification).sstable) + { + if (sstable.isRepaired()) + { + unrepaired.removeSSTable(sstable); + repaired.addSSTable(sstable); + } + else + { + 
repaired.removeSSTable(sstable); + unrepaired.addSSTable(sstable); + } + } + } + else if (notification instanceof SSTableDeletingNotification) + { + SSTableReader sstable = ((SSTableDeletingNotification)notification).deleting; + if (sstable.isRepaired()) + repaired.removeSSTable(sstable); + else + unrepaired.removeSSTable(sstable); + } + } + + public void enable() + { + if (repaired != null) + repaired.enable(); + if (unrepaired != null) + unrepaired.enable(); + // enable this last to make sure the strategies are ready to get calls. + enabled = true; + } + + public void disable() + { + // disable this first avoid asking disabled strategies for compaction tasks + enabled = false; + if (repaired != null) + repaired.disable(); + if (unrepaired != null) + unrepaired.disable(); + } + + /** + * Create ISSTableScanner from the given sstables + * + * Delegates the call to the compaction strategies to allow LCS to create a scanner + * @param sstables + * @param range + * @return + */ + @SuppressWarnings("resource") + public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables, Collection<Range<Token>> ranges) + { + List<SSTableReader> repairedSSTables = new ArrayList<>(); + List<SSTableReader> unrepairedSSTables = new ArrayList<>(); + for (SSTableReader sstable : sstables) + { + if (sstable.isRepaired()) + repairedSSTables.add(sstable); + else + unrepairedSSTables.add(sstable); + } + + Set<ISSTableScanner> scanners = new HashSet<>(sstables.size()); + + for (Range<Token> range : ranges) + { + AbstractCompactionStrategy.ScannerList repairedScanners = repaired.getScanners(repairedSSTables, range); + AbstractCompactionStrategy.ScannerList unrepairedScanners = unrepaired.getScanners(unrepairedSSTables, range); + + for (ISSTableScanner scanner : Iterables.concat(repairedScanners.scanners, unrepairedScanners.scanners)) + { + if (!scanners.add(scanner)) + scanner.close(); + } + } + + return new AbstractCompactionStrategy.ScannerList(new 
ArrayList<>(scanners)); + } + + public synchronized AbstractCompactionStrategy.ScannerList getScanners(Collection<SSTableReader> sstables) + { + return getScanners(sstables, Collections.singleton(null)); + } + + public Collection<Collection<SSTableReader>> groupSSTablesForAntiCompaction(Collection<SSTableReader> sstablesToGroup) + { + return unrepaired.groupSSTablesForAntiCompaction(sstablesToGroup); + } + + public long getMaxSSTableBytes() + { + return unrepaired.getMaxSSTableBytes(); + } + + public AbstractCompactionTask getCompactionTask(LifecycleTransaction txn, int gcBefore, long maxSSTableBytes) + { + return getCompactionStrategyFor(txn.originals().iterator().next()).getCompactionTask(txn, gcBefore, maxSSTableBytes); + } + + public Collection<AbstractCompactionTask> getMaximalTasks(final int gcBefore, final boolean splitOutput) + { + // runWithCompactionsDisabled cancels active compactions and disables them, then we are able + // to make the repaired/unrepaired strategies mark their own sstables as compacting. 
Once the + // sstables are marked the compactions are re-enabled + return cfs.runWithCompactionsDisabled(new Callable<Collection<AbstractCompactionTask>>() + { + @Override + public Collection<AbstractCompactionTask> call() throws Exception + { + synchronized (CompactionStrategyManager.this) + { + Collection<AbstractCompactionTask> repairedTasks = repaired.getMaximalTask(gcBefore, splitOutput); + Collection<AbstractCompactionTask> unrepairedTasks = unrepaired.getMaximalTask(gcBefore, splitOutput); + + if (repairedTasks == null && unrepairedTasks == null) + return null; + + if (repairedTasks == null) + return unrepairedTasks; + if (unrepairedTasks == null) + return repairedTasks; + + List<AbstractCompactionTask> tasks = new ArrayList<>(); + tasks.addAll(repairedTasks); + tasks.addAll(unrepairedTasks); + return tasks; + } + } + }, false, false); + } + + public AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore) + { + return getCompactionStrategyFor(sstables.iterator().next()).getUserDefinedTask(sstables, gcBefore); + } + + public int getEstimatedRemainingTasks() + { + int tasks = 0; + tasks += repaired.getEstimatedRemainingTasks(); + tasks += unrepaired.getEstimatedRemainingTasks(); + + return tasks; + } + + public boolean shouldBeEnabled() + { + return params.isEnabled(); + } + + public String getName() + { + return unrepaired.getName(); + } + + public List<AbstractCompactionStrategy> getStrategies() + { + return Arrays.asList(repaired, unrepaired); + } ++ ++ public synchronized void setNewLocalCompactionStrategy(CompactionParams params) ++ { ++ logger.info("Switching local compaction strategy from {} to {}}", this.params, params); ++ setStrategy(params); ++ if (shouldBeEnabled()) ++ enable(); ++ else ++ disable(); ++ startup(); ++ } ++ ++ private void setStrategy(CompactionParams params) ++ { ++ if (repaired != null) ++ repaired.shutdown(); ++ if (unrepaired != null) ++ unrepaired.shutdown(); ++ repaired = 
CFMetaData.createCompactionStrategyInstance(cfs, params); ++ unrepaired = CFMetaData.createCompactionStrategyInstance(cfs, params); ++ this.params = params; ++ } ++ ++ public CompactionParams getCompactionParams() ++ { ++ return params; ++ } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/929438b8/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java index 0db231e,2798689..63b21df --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java @@@ -141,7 -145,88 +145,82 @@@ public class CompactionsCQLTest extend assertTrue(minorWasTriggered(KEYSPACE, currentTable())); } - public boolean minorWasTriggered(String keyspace, String cf) throws Throwable + @Test + public void testSetLocalCompactionStrategy() throws Throwable + { + createTable("CREATE TABLE %s (id text PRIMARY KEY)"); + Map<String, String> localOptions = new HashMap<>(); + localOptions.put("class", "DateTieredCompactionStrategy"); + getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions); - WrappingCompactionStrategy wrappingCompactionStrategy = (WrappingCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategy(); - assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class)); ++ assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class)); + // altering something non-compaction related + execute("ALTER TABLE %s WITH gc_grace_seconds = 1000"); + // should keep the local compaction strat - assertTrue(verifyStrategies(wrappingCompactionStrategy, DateTieredCompactionStrategy.class)); ++ assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), DateTieredCompactionStrategy.class)); + // altering a 
compaction option + execute("ALTER TABLE %s WITH compaction = {'class':'SizeTieredCompactionStrategy', 'min_threshold':3}"); + // will use the new option - assertTrue(verifyStrategies(wrappingCompactionStrategy, SizeTieredCompactionStrategy.class)); ++ assertTrue(verifyStrategies(getCurrentColumnFamilyStore().getCompactionStrategyManager(), SizeTieredCompactionStrategy.class)); + } + + + @Test + public void testSetLocalCompactionStrategyDisable() throws Throwable + { + createTable("CREATE TABLE %s (id text PRIMARY KEY)"); + Map<String, String> localOptions = new HashMap<>(); + localOptions.put("class", "DateTieredCompactionStrategy"); + localOptions.put("enabled", "false"); + getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions); - assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled()); ++ assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled()); + localOptions.clear(); + localOptions.put("class", "DateTieredCompactionStrategy"); + // localOptions.put("enabled", "true"); - this is default! 
+ getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions); - assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled()); ++ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled()); + } + + + @Test + public void testSetLocalCompactionStrategyEnable() throws Throwable + { + createTable("CREATE TABLE %s (id text PRIMARY KEY)"); + Map<String, String> localOptions = new HashMap<>(); + localOptions.put("class", "DateTieredCompactionStrategy"); + + getCurrentColumnFamilyStore().disableAutoCompaction(); - assertFalse(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled()); ++ assertFalse(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled()); + + getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions); - assertTrue(getCurrentColumnFamilyStore().getCompactionStrategy().isEnabled()); ++ assertTrue(getCurrentColumnFamilyStore().getCompactionStrategyManager().isEnabled()); + + } + + + + @Test(expected = IllegalArgumentException.class) + public void testBadLocalCompactionStrategyOptions() + { + createTable("CREATE TABLE %s (id text PRIMARY KEY)"); + Map<String, String> localOptions = new HashMap<>(); + localOptions.put("class","SizeTieredCompactionStrategy"); + localOptions.put("sstable_size_in_mb","1234"); // not for STCS + getCurrentColumnFamilyStore().setLocalCompactionStrategy(localOptions); + } + - public boolean verifyStrategies(WrappingCompactionStrategy wrappingStrategy, Class<? extends AbstractCompactionStrategy> expected) ++ public boolean verifyStrategies(CompactionStrategyManager manager, Class<? 
extends AbstractCompactionStrategy> expected) + { + boolean found = false; - for (AbstractCompactionStrategy actualStrategy : wrappingStrategy.getWrappedStrategies()) ++ for (AbstractCompactionStrategy actualStrategy : manager.getStrategies()) + { + if (!actualStrategy.getClass().equals(expected)) + return false; + found = true; + } + return found; + } + - private ColumnFamilyStore getCurrentColumnFamilyStore() - { - return Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable()); - } - + private boolean minorWasTriggered(String keyspace, String cf) throws Throwable { UntypedResultSet res = execute("SELECT * FROM system.compaction_history"); boolean minorWasTriggered = false;