Hi all,

We have a need to be able to notify downstream systems when certain columns
have expired. Given the way Cassandra does distributed deletes and
tombstones, it seems like compaction time is the only time that this is
fairly predictable, but the notifications will 'lag' a bit behind when the
columns have actually expired.

Attached is a patch (against a 1.1.x branch) with what I am currently
working with. I am in the middle of getting a decent unit test together.

I wonder if this is even a sound approach, however. So, given the attached
patch I have a few questions:

1. What would be the best place to put a callback mechanism for column
notifications (given the context of a row key as well) where these expired
columns would be known about as soon as possible?

2. What about the "at compaction time" approach I have attached? I
currently have the mechanism plugged into the "PrecompactedRow.java" class,
which will notify a custom compaction strategy if so desired.  As I
mentioned, I am still working out a suitable unit test and am still
struggling with the flushes and timing involved (and perhaps caches?).


I'm grateful for any guidance you might have. Better places to hook this in
for compaction?  A more sound approach for earlier notifications? etc...


Regards,

Greg
From d0ad33821676b3895af5973aae2426bc53c0b25f Mon Sep 17 00:00:00 2001
From: Greg Cooper <gregory.coo...@iovation.com>
Date: Fri, 25 Oct 2013 14:25:36 -0700
Subject: [PATCH] Hook into the system for notifications when columns are
 removed during compaction

---
 .../db/compaction/IColumnRemovalListener.java      |   43 +++++++++++++++++
 .../cassandra/db/compaction/PrecompactedRow.java   |   42 ++++++++++++++--
 .../RemovalAwareLeveledCompactionStrategy.java     |   51 ++++++++++++++++++++
 3 files changed, 131 insertions(+), 5 deletions(-)
 create mode 100644 src/java/org/apache/cassandra/db/compaction/IColumnRemovalListener.java
 create mode 100644 src/java/org/apache/cassandra/db/compaction/RemovalAwareLeveledCompactionStrategy.java

diff --git a/src/java/org/apache/cassandra/db/compaction/IColumnRemovalListener.java b/src/java/org/apache/cassandra/db/compaction/IColumnRemovalListener.java
new file mode 100644
index 0000000..81fb350
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/IColumnRemovalListener.java
@@ -0,0 +1,43 @@
+package org.apache.cassandra.db.compaction;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
+
+import java.util.Set;
+
+/**
+ * Receives a callback when compaction drops columns from a row on this node, so that external systems can
+ * learn that the data is no longer managed by Cassandra.
+ * Created by gcooper on 10/25/13.
+ */
+public interface IColumnRemovalListener {
+    /**
+     * Called with the columns that were present in a row before it was compacted and are absent afterwards.
+     * The implementation of this is a custom CompactionStrategy class that is provided at runtime.
+     *
+     * @param key the key of the row the columns were removed from (helpful for context)
+     * @param removedCols the columns that went away during compaction
+     */
+    void colsRemoved(DecoratedKey<?> key, Set<IColumn> removedCols);
+}
diff --git a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
index 8ed21ca..e69c8a1 100644
--- a/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
+++ b/src/java/org/apache/cassandra/db/compaction/PrecompactedRow.java
@@ -24,17 +24,16 @@ package org.apache.cassandra.db.compaction;
 import java.io.DataOutput;
 import java.io.IOError;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.MessageDigest;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.cassandra.db.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ColumnIndexer;
-import org.apache.cassandra.db.CounterColumn;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.utils.HeapAllocator;
@@ -72,8 +71,27 @@ public class PrecompactedRow extends AbstractCompactedRow
         // We should only gc tombstone if shouldPurge == true. But otherwise,
         // it is still ok to collect column that shadowed by their (deleted)
         // container, which removeDeleted(cf, Integer.MAX_VALUE) will do
+
+        // Get a picture of what things look like before the row gets compacted.
+        // This is only done if the CompactionStrategy implements IColumnRemovalListener
+        IColumnRemovalListener removalListener = getColumnRemovalListener(controller);
+        Set<IColumn> priorCols = null;
+        if (removalListener != null) {
+            priorCols = new HashSet<IColumn>();
+            for (IColumn col : cf.getSortedColumns()) {
+                priorCols.add(col);
+            }
+        }
+
         ColumnFamily compacted = ColumnFamilyStore.removeDeleted(cf, shouldPurge != null && shouldPurge ? controller.gcBefore : Integer.MIN_VALUE);
 
+        // Again, this is only done if the CompactionStrategy is a IColumnRemovalListener
+        if (removalListener != null && priorCols != null) {
+            // Subtract the surviving columns. 'compacted' can be null (see the check below); then report them all.
+            if (compacted != null) priorCols.removeAll(compacted.getSortedColumns());
+            removalListener.colsRemoved(key, priorCols);
+        }
+
         if (compacted != null && compacted.metadata().getDefaultValidator().isCommutative())
         {
             if (shouldPurge == null)
@@ -85,6 +103,20 @@ public class PrecompactedRow extends AbstractCompactedRow
         return compacted;
     }
 
+    /**
+     * Examines the CompactionController and sees if the compaction strategy cares about column delete notifications.
+     * If so it will return it so that it can be told about columns that go away. The strategy is read only once,
+     * so the instanceof check and the cast always apply to the same object.
+     *
+     * @param compactionController What is managing the compaction process. (We get the strategy from in here)
+     * @return The compaction strategy as a column removal listener if it implements one, otherwise null
+     */
+    private static IColumnRemovalListener getColumnRemovalListener(CompactionController compactionController) {
+        // Declared as Object because only the IColumnRemovalListener view of the strategy matters here.
+        Object strategy = compactionController.cfs.getCompactionStrategy();
+        return strategy instanceof IColumnRemovalListener ? (IColumnRemovalListener) strategy : null;
+    }
+
     public static ColumnFamily removeDeletedAndOldShards(DecoratedKey key, boolean shouldPurge, CompactionController controller, ColumnFamily cf)
     {
         // See comment in preceding method
diff --git a/src/java/org/apache/cassandra/db/compaction/RemovalAwareLeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/RemovalAwareLeveledCompactionStrategy.java
new file mode 100644
index 0000000..0d0d722
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/compaction/RemovalAwareLeveledCompactionStrategy.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.IColumn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A LeveledCompactionStrategy that responds to columns being removed from the system during compaction. This
+ * simplified implementation just logs the row key and the removed column names at debug level, decoding only
+ * the readable bytes of each buffer as UTF-8 (the backing array may be larger than, or offset from, the value).
+ * Created by gcooper on 10/25/13.
+ */
+public class RemovalAwareLeveledCompactionStrategy extends LeveledCompactionStrategy implements IColumnRemovalListener{
+    private static final Logger logger = LoggerFactory.getLogger(RemovalAwareLeveledCompactionStrategy.class);
+    public RemovalAwareLeveledCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options) {
+        super(cfs, options);
+    }
+
+    @Override
+    public void colsRemoved(DecoratedKey<?> key, Set<IColumn> removedCols) {
+        if (!logger.isDebugEnabled()) return; // nothing would be logged, so skip the decoding work
+        java.nio.charset.Charset utf8 = java.nio.charset.Charset.forName("UTF-8");
+        StringBuilder removeLog = new StringBuilder(utf8.decode(key.key.duplicate())); // duplicate(): keep position
+        for (IColumn col: removedCols) {
+            removeLog.append(',').append(utf8.decode(col.name().duplicate()));
+        }
+        logger.debug("Row removal diagnostics (REMDAG): " + removeLog);
+    }
+}
-- 
1.7.10.4

Reply via email to