This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 8fcf309dad871a373b41cb26c8e0b714c8c874ef
Author: Stefan Miklosovic <smikloso...@apache.org>
AuthorDate: Wed Apr 9 13:35:08 2025 +0200

    Implement appender of slow queries to system_views.slow_queries table
    
    patch by Stefan Miklosovic; reviewed by Dmitry Konstantinov, Bernardo 
Botella for CASSANDRA-13001
---
 CHANGES.txt                                        |   1 +
 conf/cassandra.yaml                                |   4 +
 conf/logback.xml                                   |  12 ++
 .../configuration/cass_logback_xml_file.adoc       |  68 ++++++-
 .../config/CassandraRelevantProperties.java        |   3 +
 .../org/apache/cassandra/db/AbstractReadQuery.java |  12 ++
 .../cassandra/db/monitoring/Monitorable.java       |  74 +++++++
 .../cassandra/db/monitoring/MonitoringTask.java    | 217 ++++++++++++++++++---
 .../db/virtual/AbstractLoggerVirtualTable.java     | 111 +++++++++++
 .../cassandra/db/virtual/LogMessagesTable.java     | 112 ++---------
 .../cassandra/db/virtual/SlowQueriesTable.java     | 195 ++++++++++++++++++
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   1 +
 .../apache/cassandra/service/CassandraDaemon.java  |  16 +-
 .../logging/AbstractVirtualTableAppender.java      | 144 ++++++++++++++
 .../utils/logging/LogbackLoggingSupport.java       |  28 ++-
 .../cassandra/utils/logging/LoggingSupport.java    |   8 +-
 .../utils/logging/SlowQueriesAppender.java         |  45 +++++
 .../utils/logging/VirtualTableAppender.java        |  92 +--------
 .../logback-dtest_with_slow_query_appender.xml     |  63 ++++++
 ...back-dtest_with_slow_query_appender_invalid.xml |  73 +++++++
 .../test/AbstractVirtualLogsTableTest.java         |  26 ++-
 .../distributed/test/SlowQueriesAppenderTest.java  |  73 +++++++
 .../distributed/test/SlowQueryDeserTest.java       |  66 +++++++
 .../distributed/test/VirtualTableLogsTest.java     |  44 +++--
 ...st.java => AbstractLoggerVirtualTableTest.java} | 124 +++++-------
 .../cassandra/db/virtual/LogMessagesTableTest.java | 144 +++-----------
 .../cassandra/db/virtual/SlowQueriesTableTest.java | 145 ++++++++++++++
 27 files changed, 1455 insertions(+), 446 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5b7cb6469c..1d86f30789 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Implement appender of slow queries to system_views.slow_queries table 
(CASSANDRA-13001)
  * Add autocompletion in CQLSH for built-in functions (CASSANDRA-19631)
  * Grant permission on keyspaces system_views and system_virtual_schema not 
possible (CASSANDRA-20171)
  * General Purpose Transactions (Accord) [CEP-15] (CASSANDRA-17092)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index fe80fda164..0c68afe500 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1396,6 +1396,10 @@ request_timeout: 10000ms
 # How long before a node logs slow queries. Select queries that take longer 
than
 # this timeout to execute, will generate an aggregated log message, so that 
slow queries
 # can be identified. Set this value to zero to disable slow query logging.
+#
+# It is possible to log slow queries into system_views.slow_queries virtual 
table.
+# Consult logback.xml to uncomment specific appender and logger to enable this 
functionality.
+#
 # Min unit: ms
 slow_query_log_timeout: 500ms
 
diff --git a/conf/logback.xml b/conf/logback.xml
index 102cf06352..a8dabf6564 100644
--- a/conf/logback.xml
+++ b/conf/logback.xml
@@ -119,6 +119,18 @@ appender reference in the root level section below.
     </filter>
   </appender> -->
 
+  <!-- Uncomment below configuration and corresponding appender-ref in 
slow_queries logger to activate
+       logging into system_views.slow_queries virtual table. -->
+  <!-- <appender name="SLOW_QUERIES_APPENDER" 
class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/> -->
+
+  <!-- Log slow queries to system_views.slow_queries virtual table -->
+  <logger name="slow_queries" additivity="false" level="DEBUG">
+    <appender-ref ref="DEBUGLOG"/>
+    <!-- uncomment this appender reference together with appender definition 
above
+         to start to put slow queries into system_views.slow_queries virtual 
table -->
+    <!-- <appender-ref ref="SLOW_QUERIES_APPENDER"/> -->
+  </logger>
+
   <root level="INFO">
     <appender-ref ref="SYSTEMLOG" />
     <appender-ref ref="STDOUT" />
diff --git 
a/doc/modules/cassandra/pages/managing/configuration/cass_logback_xml_file.adoc 
b/doc/modules/cassandra/pages/managing/configuration/cass_logback_xml_file.adoc
index a62dfe91a7..b6e4d5f545 100644
--- 
a/doc/modules/cassandra/pages/managing/configuration/cass_logback_xml_file.adoc
+++ 
b/doc/modules/cassandra/pages/managing/configuration/cass_logback_xml_file.adoc
@@ -80,7 +80,7 @@ Specify the format of the message. Part of the rolling policy.
 <pattern>%-5level [%thread] %date\{ISO8601} %F:%L - %msg%n</pattern>
 </encoder>
 
-=== Logging to Cassandra virtual table
+=== Logging system logs to Cassandra virtual table
 
 It is possible to configure logback.xml in such a way that logs would appear 
in `system_views.system_log` table.
 This is achieved by appender implemented in class `VirtualTableAppender` which 
is called `CQLLOG` in the
@@ -101,6 +101,72 @@ each message will occupy memory.
 
 The appender to virtual table is commented out by default so logging to 
virtual table is not active.
 
+=== Logging slow queries to Cassandra virtual table
+
+It is possible to log slow queries into the `system_views.slow_queries` table. A 
query is considered slow
+if it takes longer than the `slow_query_log_timeout` configured in `cassandra.yaml`.
+
+To log messages to `system_views.slow_queries` you need to:
+
+1. uncomment `SLOW_QUERIES_APPENDER` log appender
+2. uncomment `appender-ref` pointing to `SLOW_QUERIES_APPENDER` in 
`slow_queries` logger:
+
+The respective configuration in `logback.xml` looks like this:
+
+[source,XML]
+----
+  <!-- Uncomment below configuration and corresponding appender-ref in 
slow_queries logger to activate
+       logging into system_views.slow_queries virtual table. -->
+  <!-- <appender name="SLOW_QUERIES_APPENDER" 
class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/> -->
+
+  <!-- Log slow queries to system_views.slow_queries virtual table -->
+  <logger name="slow_queries" additivity="false" level="DEBUG">
+    <appender-ref ref="DEBUGLOG"/>
+    <!-- uncomment this appender reference together with appender definition 
above
+         to start to put slow queries into system_views.slow_queries virtual 
table
+    <appender-ref ref="SLOW_QUERIES_APPENDER"/>
+    -->
+  </logger>
+----
+
+By default, slow queries are logged to `debug.log`. After uncommenting 
the virtual table appender, they will be
+logged to `debug.log` as well as to `system_views.slow_queries`. If you want 
to log them to `system_views.slow_queries` only, you need to comment out the 
`DEBUGLOG` `appender-ref` in the `slow_queries` logger declaration.
+
+If you want to log slow queries to a dedicated log file (which is e.g. 
rotated), that is also possible
+by pointing `slow_queries` logger to a respective file appender of a given 
reference, similar to `DEBUGLOG` where all logs go by default.
+
+The structure of the table looks like this:
+
+[source,cql]
+----
+cassandra@cqlsh> DESCRIBE system_views.slow_queries ;
+
+/*
+Warning: Table system_views.slow_queries is a virtual table and cannot be 
recreated with CQL.
+Structure, for reference:
+VIRTUAL TABLE system_views.slow_queries (
+    keyspace_name text,
+    table_name text,
+    timestamp timestamp,
+    query text,
+    avg_ms bigint,
+    cross_node boolean,
+    max_ms bigint,
+    min_ms bigint,
+    times_reported int,
+    PRIMARY KEY (keyspace_name, table_name, timestamp, query)
+) WITH CLUSTERING ORDER BY (table_name ASC, timestamp ASC, query ASC)
+    AND comment = 'Slow queries';
+----
+
+By having slow queries in a virtual table, an operator can check if there are 
slow queries for some table, see if
+some queries violate some time threshold, etc. The rows in this table contain the same 
data as one would get in `debug.log`; they
+are just far more convenient to parse and query.
+
+The `system_views.slow_queries` table is limited in the number of rows it can hold: by 
default 10 000, configurable via the `cassandra.virtual.slow_queries.max.rows` 
system property. If the table is full, the oldest entry is removed and the 
newest is inserted. This virtual table can be truncated via CQL, and deletion by 
partition key (`keyspace_name` column) is allowed.
+
+Note that by placing a custom implementation of the 
`SLOW_QUERIES_APPENDER` appender on the class path and referencing it in 
`logback.xml`, it is possible to log slow queries to any destination for which 
such an appender exists.
+
 === Contents of default `logback.xml`
 
 [source,XML]
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 7459fecb8c..e2957ce95f 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -27,6 +27,7 @@ import com.google.common.primitives.Ints;
 
 import accord.utils.Invariants;
 import org.apache.cassandra.db.virtual.LogMessagesTable;
+import org.apache.cassandra.db.virtual.SlowQueriesTable;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.StorageCompatibilityMode;
@@ -363,6 +364,8 @@ public enum CassandraRelevantProperties
     LOG4J2_DISABLE_JMX_LEGACY("log4j2.disable.jmx"),
     LOG4J_SHUTDOWN_HOOK_ENABLED("log4j.shutdownHookEnabled"),
     LOGBACK_CONFIGURATION_FILE("logback.configurationFile"),
+    /** Maximum number of rows in system_views.slow_queries */
+    
LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS("cassandra.virtual.slow_queries.max.rows",
 convertToString(SlowQueriesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)),
     /** Maximum number of rows in system_views.logs table */
     LOGS_VIRTUAL_TABLE_MAX_ROWS("cassandra.virtual.logs.max.rows", 
convertToString(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)),
     /**
diff --git a/src/java/org/apache/cassandra/db/AbstractReadQuery.java 
b/src/java/org/apache/cassandra/db/AbstractReadQuery.java
index 448069cfca..2e72c7ec4f 100644
--- a/src/java/org/apache/cassandra/db/AbstractReadQuery.java
+++ b/src/java/org/apache/cassandra/db/AbstractReadQuery.java
@@ -118,4 +118,16 @@ abstract class AbstractReadQuery extends MonitorableImpl 
implements ReadQuery
     }
 
     protected abstract void appendCQLWhereClause(StringBuilder sb);
+
+    @Override
+    public String monitoredOnKeyspace()
+    {
+        return metadata().keyspace;
+    }
+
+    @Override
+    public String monitoredOnTable()
+    {
+        return metadata().name;
+    }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java 
b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
index 10bd10438a..4288a667de 100644
--- a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
+++ b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.monitoring;
 
 public interface Monitorable
 {
+    Monitorable NO_OP = new NoOp();
+
     String name();
     long creationTimeNanos();
     long timeoutNanos();
@@ -33,4 +35,76 @@ public interface Monitorable
 
     boolean abort();
     boolean complete();
+
+    default String monitoredOnKeyspace() { return null; };
+    default String monitoredOnTable() { return null; };
+
+    class NoOp implements Monitorable
+    {
+        @Override
+        public String name()
+        {
+            return null;
+        }
+
+        @Override
+        public long creationTimeNanos()
+        {
+            return 0;
+        }
+
+        @Override
+        public long timeoutNanos()
+        {
+            return 0;
+        }
+
+        @Override
+        public long slowTimeoutNanos()
+        {
+            return 0;
+        }
+
+        @Override
+        public boolean isInProgress()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isAborted()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isCompleted()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isSlow()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean isCrossNode()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean abort()
+        {
+            return false;
+        }
+
+        @Override
+        public boolean complete()
+        {
+            return false;
+        }
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java 
b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
index 243569910b..4d6d995c77 100644
--- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
+++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.monitoring;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -32,9 +33,19 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.JsonUtils;
 import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.logging.LoggingSupport;
+import org.apache.cassandra.utils.logging.LoggingSupportFactory;
+import org.apache.cassandra.utils.logging.SlowQueriesAppender;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.MONITORING_MAX_OPERATIONS;
@@ -47,8 +58,9 @@ import static 
org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQu
  * We also log timed out operations, see CASSANDRA-7392.
  * Since CASSANDRA-12403 we also log queries that were slow.
  */
-class MonitoringTask
+public class MonitoringTask
 {
+    private static final String SLOW_OPERATIONS_LOGGER_NAME = "slow_queries";
     private static final String LINE_SEPARATOR = 
CassandraRelevantProperties.LINE_SEPARATOR.getString();
     private static final Logger logger = 
LoggerFactory.getLogger(MonitoringTask.class);
     private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES);
@@ -70,6 +82,8 @@ class MonitoringTask
     private final ScheduledFuture<?> reportingTask;
     private final OperationsQueue failedOperationsQueue;
     private final OperationsQueue slowOperationsQueue;
+    private Logger slowOperationsLogger = logger;
+    private boolean slowOperationsLoggedToVirtualTable;
     private long approxLastLogTimeNanos;
 
 
@@ -97,6 +111,15 @@ class MonitoringTask
                                                                                
      reportIntervalMillis,
                                                                                
      reportIntervalMillis,
                                                                                
      TimeUnit.MILLISECONDS);
+
+        LoggingSupport support = LoggingSupportFactory.getLoggingSupport();
+        if (support.getLogger(SLOW_OPERATIONS_LOGGER_NAME).isPresent())
+        {
+            if (support.getAppender(SlowQueriesAppender.class, 
SlowQueriesAppender.APPENDER_NAME).isPresent())
+                slowOperationsLoggedToVirtualTable = true;
+
+            slowOperationsLogger = 
LoggerFactory.getLogger(SLOW_OPERATIONS_LOGGER_NAME);
+        }
     }
 
     public void cancel()
@@ -169,14 +192,30 @@ class MonitoringTask
         if (!slowOperations.isEmpty())
         {
             long approxElapsedNanos = approxCurrentTimeNanos - 
approxLastLogTimeNanos;
-            noSpamLogger.info("Some operations were slow, details available at 
debug level (debug.log)");
+            noSpamLogger.info("Some operations were slow, details available at 
debug level (debug.log) or " +
+                              "system_views.slow_queries virtual table (when 
enabled).");
+
+            if (slowOperationsLogger.isDebugEnabled())
+            {
+                if (slowOperationsLoggedToVirtualTable)
+                {
+                    // This is the crux of the patch for appending to vtable.
+                    // Because we can send only Strings to debug method (or 
objects, on which toString()
+                    // would be eventually called), we need to log a string in 
such a way that we can
+                    // get Operation object(s) back "on the other side" when 
dealing with vtables and custom appenders
+                    // as appenders work with LoggingEvent where message is 
just a string.
+                    // It would be very hard / tricky / error-prone to parse 
customly crafted log message
+                    // which appears in logs when no vtable appender is used.
+                    
slowOperationsLogger.debug(Operation.serialize(slowOperations.getOperations()));
+                }
+                else
+                    slowOperationsLogger.debug("{} operations were slow in the 
last {} msecs:{}{}",
+                                               slowOperations.num(),
+                                               
NANOSECONDS.toMillis(approxElapsedNanos),
+                                               LINE_SEPARATOR,
+                                               slowOperations.getLogMessage());
+            }
 
-            if (logger.isDebugEnabled())
-                logger.debug("{} operations were slow in the last {} 
msecs:{}{}",
-                             slowOperations.num(),
-                             NANOSECONDS.toMillis(approxElapsedNanos),
-                             LINE_SEPARATOR,
-                             slowOperations.getLogMessage());
             return true;
         }
         return false;
@@ -274,6 +313,12 @@ class MonitoringTask
             return operations.size() + numDropped;
         }
 
+        private Collection<Operation> getOperations()
+        {
+            return operations.values();
+        }
+
+        @JsonIgnore
         String getLogMessage()
         {
             if (isEmpty())
@@ -307,9 +352,16 @@ class MonitoringTask
      * same name (CQL query text) is reported and store the average, min and 
max
      * times.
      */
-    protected abstract static class Operation
+    @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "id")
+    @JsonSubTypes({ @JsonSubTypes.Type(value = SlowOperation.class) })
+    @VisibleForTesting
+    public abstract static class Operation
     {
+        @JsonProperty
+        String id = getClass().getName();
+
         /** The operation that was reported as slow or timed out */
+        @JsonIgnore
         final Monitorable operation;
 
         /** The number of times the operation was reported */
@@ -319,24 +371,50 @@ class MonitoringTask
         long totalTimeNanos;
 
         /** The maximum time spent by this operation */
-        long maxTime;
+        long maxTimeNanos;
 
         /** The minimum time spent by this operation */
-        long minTime;
+        long minTimeNanos;
 
         /** The name of the operation, i.e. the SELECT query CQL,
          * this is set lazily as it takes time to build the query CQL */
         private String name;
 
+        /**
+         * creation time of this Operation object, in ms,
+         * this is different from operation's creationTimeNanos
+         * which does not follow wall clock and is useless for
+         * reporting purposes e.g. in virtual tables
+         */
+        private final long timestampMs;
+
+        // optional keyspace and table this operation acts on
+        // used upon deserialization
+        private String keyspace;
+        private String table;
+        private boolean crossNode;
+
         Operation(Monitorable operation, long failedAtNanos)
         {
             this.operation = operation;
             numTimesReported = 1;
             totalTimeNanos = failedAtNanos - operation.creationTimeNanos();
-            minTime = totalTimeNanos;
-            maxTime = totalTimeNanos;
+            minTimeNanos = totalTimeNanos;
+            maxTimeNanos = totalTimeNanos;
+            timestampMs = Clock.Global.currentTimeMillis() - 
(Clock.Global.nanoTime() - operation.creationTimeNanos()) / 1_000_000;
+        }
+
+        void add(Operation operation)
+        {
+            numTimesReported++;
+            totalTimeNanos += operation.totalTimeNanos;
+            maxTimeNanos = Math.max(maxTimeNanos, operation.maxTimeNanos);
+            minTimeNanos = Math.min(minTimeNanos, operation.minTimeNanos);
         }
 
+        public abstract String getLogMessage();
+
+        @JsonProperty
         public String name()
         {
             if (name == null)
@@ -344,15 +422,96 @@ class MonitoringTask
             return name;
         }
 
-        void add(Operation operation)
+        @JsonProperty
+        public String keyspace()
         {
-            numTimesReported++;
-            totalTimeNanos += operation.totalTimeNanos;
-            maxTime = Math.max(maxTime, operation.maxTime);
-            minTime = Math.min(minTime, operation.minTime);
+            if (operation != null)
+            {
+                String monitored = operation.monitoredOnKeyspace();
+                if (monitored != null)
+                    return monitored;
+            }
+            return keyspace;
         }
 
-        public abstract String getLogMessage();
+        public void setKeyspace(String keyspace)
+        {
+            this.keyspace = keyspace;
+        }
+
+        public void setTable(String table)
+        {
+            this.table = table;
+        }
+
+        @JsonProperty
+        public String table()
+        {
+            if (operation != null)
+            {
+                String monitored = operation.monitoredOnTable();
+                if (monitored != null)
+                    return monitored;
+            }
+            return table;
+        }
+
+        @JsonProperty
+        public boolean isCrossNode()
+        {
+            if (operation != null)
+                return operation.isCrossNode();
+
+            return crossNode;
+        }
+
+        @JsonProperty
+        public int numTimesReported()
+        {
+            return numTimesReported;
+        }
+
+        @JsonProperty
+        public long totalTimeNanos()
+        {
+            return totalTimeNanos;
+        }
+
+        @JsonProperty
+        public long maxTimeNanos()
+        {
+            return maxTimeNanos;
+        }
+
+        @JsonProperty
+        public long minTimeNanos()
+        {
+            return minTimeNanos;
+        }
+
+        @JsonIgnore
+        public long averageTime()
+        {
+            return totalTimeNanos / numTimesReported;
+        }
+
+        @JsonProperty
+        public long timestampMs()
+        {
+            return timestampMs;
+        }
+
+        public static String serialize(Collection<Operation> operations)
+        {
+            return JsonUtils.writeAsJsonString(operations);
+        }
+
+        private static final TypeReference<List<Operation>> TYPE_REFERENCE = 
new TypeReference<>() {};
+
+        public static List<Operation> deserialize(String message) throws 
Throwable
+        {
+            return JsonUtils.JSON_OBJECT_MAPPER.readValue(message, 
TYPE_REFERENCE);
+        }
     }
 
     /**
@@ -378,8 +537,8 @@ class MonitoringTask
                                      name(),
                                      numTimesReported,
                                      NANOSECONDS.toMillis(totalTimeNanos / 
numTimesReported),
-                                     NANOSECONDS.toMillis(minTime),
-                                     NANOSECONDS.toMillis(maxTime),
+                                     NANOSECONDS.toMillis(minTimeNanos),
+                                     NANOSECONDS.toMillis(maxTimeNanos),
                                      
NANOSECONDS.toMillis(operation.timeoutNanos()),
                                      operation.isCrossNode() ? 
"msec/cross-node" : "msec");
         }
@@ -388,13 +547,21 @@ class MonitoringTask
     /**
      * An operation (query) that was reported as slow.
      */
-    private final static class SlowOperation extends Operation
+    @VisibleForTesting
+    public final static class SlowOperation extends Operation
     {
-        SlowOperation(Monitorable operation, long failedAt)
+        // purely for deserialization purposes
+        public SlowOperation()
+        {
+            this(Monitorable.NO_OP, 0);
+        }
+
+        public SlowOperation(Monitorable operation, long failedAt)
         {
             super(operation, failedAt);
         }
 
+        @JsonIgnore
         public String getLogMessage()
         {
             if (numTimesReported == 1)
@@ -408,8 +575,8 @@ class MonitoringTask
                                      name(),
                                      numTimesReported,
                                      NANOSECONDS.toMillis(totalTimeNanos/ 
numTimesReported),
-                                     NANOSECONDS.toMillis(minTime),
-                                     NANOSECONDS.toMillis(maxTime),
+                                     NANOSECONDS.toMillis(minTimeNanos),
+                                     NANOSECONDS.toMillis(maxTimeNanos),
                                      
NANOSECONDS.toMillis(operation.slowTimeoutNanos()),
                                      operation.isCrossNode() ? 
"msec/cross-node" : "msec");
         }
diff --git 
a/src/java/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTable.java 
b/src/java/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTable.java
new file mode 100644
index 0000000000..008d5d432a
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTable.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * This table is inherently limited on number of rows it can hold.
+ *
+ * @param <U> type parameter saying what object is stored in internal bounded 
list for query purposes
+ */
+public abstract class AbstractLoggerVirtualTable<U> extends 
AbstractMutableVirtualTable
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractLoggerVirtualTable.class);
+
+    // please be sure operations on this structure are thread-safe
+    protected final List<U> buffer;
+
+    @VisibleForTesting
+    protected static int resolveBufferSize(int wantedSize, int max, int 
defaultSize)
+    {
+        return (wantedSize < 1 || wantedSize > max) ? defaultSize : wantedSize;
+    }
+
+    protected AbstractLoggerVirtualTable(TableMetadata metadata, int maxSize)
+    {
+        super(metadata);
+        this.buffer = BoundedLinkedList.create(maxSize);
+        logger.debug("capacity of virtual table {} is set to be at most {} 
rows", metadata().toString(), maxSize);
+    }
+
+    public void add(LoggingEvent event)
+    {
+        List<U> messages = getMessages(event);
+        if (messages != null)
+        {
+            // specifically calling buffer.add to reach BoundedLinkedList's add
+            // instead of linked list's addAll
+            for (U message : messages)
+                buffer.add(message);
+        }
+    }
+
+    public abstract List<U> getMessages(LoggingEvent event);
+
+    @Override
+    public void truncate()
+    {
+        synchronized (buffer)
+        {
+            buffer.clear();
+        }
+    }
+
+    @Override
+    public boolean allowFilteringImplicitly()
+    {
+        return false;
+    }
+
+    private static final class BoundedLinkedList<T> extends LinkedList<T>
+    {
+        private final int maxSize;
+
+        public static <T> List<T> create(int size)
+        {
+            return Collections.synchronizedList(new BoundedLinkedList<>(size));
+        }
+
+        private BoundedLinkedList(int maxSize)
+        {
+            this.maxSize = maxSize;
+        }
+
+        @Override
+        public synchronized boolean add(T t)
+        {
+            if (size() == maxSize)
+                removeLast();
+
+            addFirst(t);
+
+            return true;
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java 
b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java
index 5903ac2ab5..87978e3fd9 100644
--- a/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java
@@ -18,15 +18,11 @@
 
 package org.apache.cassandra.db.virtual;
 
-import java.util.Collections;
 import java.util.Date;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import ch.qos.logback.classic.spi.LoggingEvent;
 import org.apache.cassandra.config.CassandraRelevantProperties;
@@ -50,11 +46,8 @@ import org.apache.cassandra.schema.TableMetadata;
  * @see <a 
href="https://issues.apache.org/jira/browse/CASSANDRA-18238">CASSANDRA-18238</a>
  * @see org.apache.cassandra.utils.logging.VirtualTableAppender
  */
-public final class LogMessagesTable extends AbstractMutableVirtualTable
+public final class LogMessagesTable extends 
AbstractLoggerVirtualTable<LoggingEvent>
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(LogMessagesTable.class);
-
-    public static final int LOGS_VIRTUAL_TABLE_MIN_ROWS = 1000;
     public static final int LOGS_VIRTUAL_TABLE_DEFAULT_ROWS = 50_000;
     public static final int LOGS_VIRTUAL_TABLE_MAX_ROWS = 100_000;
 
@@ -67,11 +60,11 @@ public final class LogMessagesTable extends 
AbstractMutableVirtualTable
     public static final String LEVEL_COLUMN_NAME = "level";
     public static final String MESSAGE_COLUMN_NAME = "message";
 
-    private final List<LogMessage> buffer;
-
     LogMessagesTable(String keyspace)
     {
-        this(keyspace, resolveBufferSize());
+        this(keyspace, 
resolveBufferSize(CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt(),
+                                         LOGS_VIRTUAL_TABLE_MAX_ROWS,
+                                         LOGS_VIRTUAL_TABLE_DEFAULT_ROWS));
     }
 
     @VisibleForTesting
@@ -85,10 +78,14 @@ public final class LogMessagesTable extends 
AbstractMutableVirtualTable
                            
.addClusteringColumn(ORDER_IN_MILLISECOND_COLUMN_NAME, Int32Type.instance)
                            .addRegularColumn(LOGGER_COLUMN_NAME, 
UTF8Type.instance)
                            .addRegularColumn(LEVEL_COLUMN_NAME, 
UTF8Type.instance)
-                           .addRegularColumn(MESSAGE_COLUMN_NAME, 
UTF8Type.instance).build());
+                           .addRegularColumn(MESSAGE_COLUMN_NAME, 
UTF8Type.instance).build(),
+              size);
+    }
 
-        logger.debug("capacity of virtual table {} is set to be at most {} 
rows", metadata().toString(), size);
-        buffer = BoundedLinkedList.create(size);
+    @Override
+    public List<LoggingEvent> getMessages(LoggingEvent event)
+    {
+        return List.of(event);
     }
 
     @Override
@@ -103,12 +100,12 @@ public final class LogMessagesTable extends 
AbstractMutableVirtualTable
 
             int index = 0;
 
-            Iterator<LogMessage> iterator = buffer.listIterator();
+            Iterator<LoggingEvent> iterator = buffer.listIterator();
             while (iterator.hasNext())
             {
-                LogMessage log = iterator.next();
+                LoggingEvent log = iterator.next();
 
-                milliSecondsOfCurrentLog = log.timestamp;
+                milliSecondsOfCurrentLog = log.getTimeStamp();
                 if (milliSecondsOfPreviousLog == milliSecondsOfCurrentLog)
                     ++index;
                 else
@@ -116,86 +113,13 @@ public final class LogMessagesTable extends 
AbstractMutableVirtualTable
 
                 milliSecondsOfPreviousLog = milliSecondsOfCurrentLog;
 
-                result.row(new Date(log.timestamp), index)
-                      .column(LOGGER_COLUMN_NAME, log.logger)
-                      .column(LEVEL_COLUMN_NAME, log.level)
-                      .column(MESSAGE_COLUMN_NAME, log.message);
+                result.row(new Date(milliSecondsOfCurrentLog), index)
+                      .column(LOGGER_COLUMN_NAME, log.getLoggerName())
+                      .column(LEVEL_COLUMN_NAME, log.getLevel().toString())
+                      .column(MESSAGE_COLUMN_NAME, log.getFormattedMessage());
             }
         }
 
         return result;
     }
-
-    public void add(LoggingEvent event)
-    {
-        buffer.add(new LogMessage(event));
-    }
-
-    @Override
-    public void truncate()
-    {
-        buffer.clear();
-    }
-
-    @Override
-    public boolean allowFilteringImplicitly()
-    {
-        return false;
-    }
-
-    @VisibleForTesting
-    static int resolveBufferSize()
-    {
-        int size = 
CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt();
-        return (size < LOGS_VIRTUAL_TABLE_MIN_ROWS || size > 
LOGS_VIRTUAL_TABLE_MAX_ROWS)
-               ? LOGS_VIRTUAL_TABLE_DEFAULT_ROWS : size;
-    }
-
-    @VisibleForTesting
-    public static class LogMessage
-    {
-        public final long timestamp;
-        public final String logger;
-        public final String level;
-        public final String message;
-
-        public LogMessage(LoggingEvent event)
-        {
-            this(event.getTimeStamp(), event.getLoggerName(), 
event.getLevel().toString(), event.getFormattedMessage());
-        }
-
-        public LogMessage(long timestamp, String logger, String level, String 
message)
-        {
-            this.timestamp = timestamp;
-            this.logger = logger;
-            this.level = level;
-            this.message = message;
-        }
-    }
-
-    private static final class BoundedLinkedList<T> extends LinkedList<T>
-    {
-        private final int maxSize;
-
-        public static <T> List<T> create(int size)
-        {
-            return Collections.synchronizedList(new BoundedLinkedList<>(size));
-        }
-
-        private BoundedLinkedList(int maxSize)
-        {
-            this.maxSize = maxSize;
-        }
-
-        @Override
-        public boolean add(T t)
-        {
-            if (size() == maxSize)
-                removeLast();
-
-            addFirst(t);
-
-            return true;
-        }
-    }
 }
diff --git a/src/java/org/apache/cassandra/db/virtual/SlowQueriesTable.java 
b/src/java/org/apache/cassandra/db/virtual/SlowQueriesTable.java
new file mode 100644
index 0000000000..0d392d0ce2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/SlowQueriesTable.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.BooleanType;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.TimestampType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.monitoring.MonitoringTask.Operation;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+/**
+ * Virtual table {@value #TABLE_NAME} exposing slow operations which are deserialized
+ * from logging events, see {@link #getMessages(LoggingEvent)}.
+ * <p>
+ * Rows are partitioned by keyspace name and clustered by table name, timestamp and query.
+ */
+public class SlowQueriesTable extends AbstractLoggerVirtualTable<Operation>
+{
+    private static final Logger logger = LoggerFactory.getLogger(SlowQueriesTable.class);
+
+    public static final int LOGS_VIRTUAL_TABLE_DEFAULT_ROWS = 10_000;
+    public static final int LOGS_VIRTUAL_TABLE_MAX_ROWS = 100_000;
+
+    public static final String TABLE_NAME = "slow_queries";
+    private static final String TABLE_COMMENT = "Slow queries";
+
+    public static final String KEYSPACE_COLUMN_NAME = "keyspace_name";
+    public static final String TABLE_COLUMN_NAME = "table_name";
+    public static final String TIMESTAMP_COLUMN_NAME = "timestamp";
+    public static final String QUERY_COLUMN_NAME = "query";
+    public static final String MINIMUM_TIME_COLUMN_NAME = "min_ms";
+    public static final String MAXIMUM_TIME_COLUMN_NAME = "max_ms";
+    public static final String AVERAGE_TIME_COLUMN_NAME = "avg_ms";
+    public static final String TIMES_REPORTED_COLUMN_NAME = "times_reported";
+    public static final String CROSS_NODE_COLUMN_NAME = "cross_node";
+
+    SlowQueriesTable(String keyspace)
+    {
+        this(keyspace, resolveBufferSize(CassandraRelevantProperties.LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.getInt(),
+                                         LOGS_VIRTUAL_TABLE_MAX_ROWS,
+                                         LOGS_VIRTUAL_TABLE_DEFAULT_ROWS));
+    }
+
+    @VisibleForTesting
+    SlowQueriesTable(String keyspace, int size)
+    {
+        super(TableMetadata.builder(keyspace, TABLE_NAME)
+                           .comment(TABLE_COMMENT)
+                           .kind(TableMetadata.Kind.VIRTUAL)
+                           .partitioner(new LocalPartitioner(UTF8Type.instance))
+                           .addPartitionKeyColumn(KEYSPACE_COLUMN_NAME, UTF8Type.instance)
+                           .addClusteringColumn(TABLE_COLUMN_NAME, UTF8Type.instance)
+                           .addClusteringColumn(TIMESTAMP_COLUMN_NAME, TimestampType.instance)
+                           // The query is a clustering column for the sake of uniqueness.
+                           // In theory, two monitoring operations might be emitted for the same keyspace
+                           // and the same table at the exact same time (in milliseconds). One operation
+                           // would then "shadow" the other because the primary key would be the same
+                           // for both. To make the key truly unique, the query is included among
+                           // the clustering columns as well. If even the queries were the same, they would
+                           // be reported together (which is reflected in the "times_reported" column).
+                           .addClusteringColumn(QUERY_COLUMN_NAME, UTF8Type.instance)
+                           .addRegularColumn(MINIMUM_TIME_COLUMN_NAME, LongType.instance)
+                           .addRegularColumn(MAXIMUM_TIME_COLUMN_NAME, LongType.instance)
+                           .addRegularColumn(AVERAGE_TIME_COLUMN_NAME, LongType.instance)
+                           .addRegularColumn(TIMES_REPORTED_COLUMN_NAME, Int32Type.instance)
+                           .addRegularColumn(CROSS_NODE_COLUMN_NAME, BooleanType.instance)
+                           .build(),
+              size);
+    }
+
+    /**
+     * Removes all buffered operations belonging to the keyspace given by the partition key.
+     *
+     * @param partitionKey partition key of the deleted partition, its first component is the keyspace name
+     */
+    @Override
+    protected void applyPartitionDeletion(ColumnValues partitionKey)
+    {
+        String keyspace = partitionKey.value(0);
+
+        synchronized (buffer)
+        {
+            buffer.removeIf(o -> o.keyspace().equals(keyspace));
+        }
+    }
+
+    /**
+     * Builds the content of this table from the currently buffered operations.
+     * Times kept in nanoseconds on an operation are converted to milliseconds.
+     *
+     * @return data set with one row per buffered operation
+     */
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata(), DecoratedKey.comparator.reversed());
+
+        // iteration has to happen under the lock on the buffer
+        // as operations might be appended to it concurrently
+        synchronized (buffer)
+        {
+            for (Operation operation : buffer)
+            {
+                result.row(operation.keyspace(), operation.table(), new Date(operation.timestampMs()), operation.name())
+                      .column(MINIMUM_TIME_COLUMN_NAME, NANOSECONDS.toMillis(operation.minTimeNanos()))
+                      .column(MAXIMUM_TIME_COLUMN_NAME, NANOSECONDS.toMillis(operation.maxTimeNanos()))
+                      .column(AVERAGE_TIME_COLUMN_NAME, NANOSECONDS.toMillis(operation.averageTime()))
+                      .column(TIMES_REPORTED_COLUMN_NAME, operation.numTimesReported())
+                      .column(CROSS_NODE_COLUMN_NAME, operation.isCrossNode());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Deserializes operations from the message of a logging event.
+     * Operations without a keyspace or a table are skipped.
+     *
+     * @param event logging event whose message carries serialized operations
+     * @return operations to be added to this table, an empty list when they could not be
+     * obtained from the event, never {@code null}
+     */
+    @Override
+    public List<Operation> getMessages(LoggingEvent event)
+    {
+        try
+        {
+            List<Operation> qualified = new ArrayList<>();
+            for (Operation operation : Operation.deserialize(event.getMessage()))
+            {
+                // in (improbable) case there is an operation which does not have
+                // keyspace / table on it, we just skip this from processing
+                // as we would have nothing to show for partition key and clustering column
+                if (operation.keyspace() == null || operation.table() == null)
+                    continue;
+
+                // if cf of an operation is present, take keyspace and table name from it
+                // instead of having new string instances per operation which might
+                // take relatively a lot of additional space unnecessarily
+                String keyspaceName = operation.keyspace();
+                String tableName = operation.table();
+                Keyspace keyspace = Keyspace.openIfExists(keyspaceName);
+                if (keyspace != null)
+                {
+                    keyspaceName = keyspace.getName();
+                    try
+                    {
+                        ColumnFamilyStore table = keyspace.getColumnFamilyStore(tableName);
+                        tableName = table.getTableName();
+                    }
+                    catch (IllegalArgumentException ex)
+                    {
+                        // table could not be resolved, keep the name the operation came with
+                    }
+                }
+
+                operation.setKeyspace(keyspaceName);
+                operation.setTable(tableName);
+                qualified.add(operation);
+            }
+
+            return qualified;
+        }
+        catch (Throwable t)
+        {
+            // best effort, a failure to process an event must not propagate to the logging framework
+            logger.trace("Unable to generate list of slow queries", t);
+            return List.of();
+        }
+    }
+
+    @Override
+    public boolean allowFilteringImplicitly()
+    {
+        return true;
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java 
b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index 3ca8f728a8..28c6dc8fef 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -61,6 +61,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
                     .add(new GossipInfoTable(VIRTUAL_VIEWS))
                     .add(new QueriesTable(VIRTUAL_VIEWS))
                     .add(new LogMessagesTable(VIRTUAL_VIEWS))
+                    .add(new SlowQueriesTable(VIRTUAL_VIEWS))
                     .add(new SnapshotsTable(VIRTUAL_VIEWS))
                     .add(new PeersTable(VIRTUAL_VIEWS))
                     .add(new LocalTable(VIRTUAL_VIEWS))
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index d3c787d2e2..171ec47e1e 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -59,6 +59,8 @@ import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.SystemKeyspaceMigrator41;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
+import org.apache.cassandra.db.virtual.LogMessagesTable;
+import org.apache.cassandra.db.virtual.SlowQueriesTable;
 import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
 import org.apache.cassandra.db.virtual.VirtualKeyspace;
 import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
@@ -94,6 +96,7 @@ import org.apache.cassandra.utils.NativeLibrary;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
 import org.apache.cassandra.utils.logging.LoggingSupportFactory;
+import org.apache.cassandra.utils.logging.SlowQueriesAppender;
 import org.apache.cassandra.utils.logging.VirtualTableAppender;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -436,7 +439,6 @@ public class CassandraDaemon
         {
             exitOrFail(e.returnCode, e.getMessage(), e.getCause());
         }
-
     }
 
     /**
@@ -555,11 +557,17 @@ public class CassandraDaemon
         if (DatabaseDescriptor.getAccord().enable_virtual_debug_only_keyspace)
             
VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance);
 
-        // flush log messages to system_views.system_logs virtual table as 
there were messages already logged
-        // before that virtual table was instantiated
+        // Flush log messages to system_views.system_logs virtual table as 
there were messages already logged
+        // before that virtual table was instantiated.
+        // In general, there is no need to do the same for slow queries, as by the time queries are processed
+        // the logging framework is already fully set up, but we flush them as well just to be sure.
         LoggingSupportFactory.getLoggingSupport()
                              .getAppender(VirtualTableAppender.class, 
VirtualTableAppender.APPENDER_NAME)
-                             .ifPresent(appender -> ((VirtualTableAppender) 
appender).flushBuffer());
+                             .ifPresent(appender -> 
appender.flushBuffer(LogMessagesTable.class, LogMessagesTable.TABLE_NAME));
+
+        LoggingSupportFactory.getLoggingSupport()
+                             .getAppender(SlowQueriesAppender.class, 
SlowQueriesAppender.APPENDER_NAME)
+                             .ifPresent(appender -> 
appender.flushBuffer(SlowQueriesTable.class, SlowQueriesTable.TABLE_NAME));
     }
 
     public synchronized void initializeClientTransports()
diff --git 
a/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java 
b/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java
new file mode 100644
index 0000000000..7becbc13fc
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.logging;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import ch.qos.logback.core.AppenderBase;
+import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable;
+import org.apache.cassandra.db.virtual.SlowQueriesTable;
+import org.apache.cassandra.db.virtual.VirtualKeyspace;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
+import org.apache.cassandra.db.virtual.VirtualTable;
+
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
+
+/**
+ * Base class for logback appenders which append logging events to a virtual table
+ * extending {@link AbstractLoggerVirtualTable}, e.g. {@link SlowQueriesTable}.
+ * <p>
+ * Events appended before the virtual table is registered are kept in a bounded buffer
+ * until {@link #flushBuffer(Class, String)} is called.
+ */
+public abstract class AbstractVirtualTableAppender extends AppenderBase<LoggingEvent>
+{
+    // upper bound on the number of events held in messageBuffer
+    private final int defaultRows;
+
+    // for holding messages until virtual registry contains logs virtual table
+    // as it takes some time during startup of a node to initialise virtual tables but messages are
+    // logged already
+    protected final List<LoggingEvent> messageBuffer = new LinkedList<>();
+
+    /**
+     * @param defaultRows maximum number of events buffered while the virtual table is not registered yet
+     */
+    protected AbstractVirtualTableAppender(int defaultRows)
+    {
+        this.defaultRows = defaultRows;
+    }
+
+    /**
+     * Looks up a virtual table of the given name in the virtual views keyspace.
+     *
+     * @param vtableClass class (or superclass) the found table is expected to be an instance of
+     * @param tableName   name of the table to look up
+     * @return found table or null when the keyspace or the table is not registered (yet)
+     * @throws IllegalStateException when the table exists but is not an instance of {@code vtableClass}
+     */
+    protected <T> T getVirtualTable(Class<T> vtableClass, String tableName)
+    {
+        VirtualKeyspace keyspace = VirtualKeyspaceRegistry.instance.getKeyspaceNullable(VIRTUAL_VIEWS);
+
+        if (keyspace == null)
+            return null;
+
+        Optional<VirtualTable> virtualTable = keyspace.tables()
+                                                      .stream()
+                                                      .filter(vt -> vt.name().equals(tableName))
+                                                      .findFirst();
+
+        if (virtualTable.isEmpty())
+            return null;
+
+        VirtualTable vt = virtualTable.get();
+
+        // instance check instead of class equality so a table can be looked up by its supertype too
+        if (!vtableClass.isInstance(vt))
+            throw new IllegalStateException(String.format("Virtual table %s.%s is not backed by an instance of %s but by %s",
+                                                          VIRTUAL_VIEWS,
+                                                          tableName,
+                                                          vtableClass.getName(),
+                                                          vt.getClass().getName()));
+
+        return vtableClass.cast(vt);
+    }
+
+    /**
+     * This method adds an event to virtual table, when present.
+     * When vtable is null, we will attempt to find it among registered ones. When not found, we add the event
+     * to internal buffer for later processing. This might happen e.g. for logging tables when log events
+     * were appended via logging framework sooner than registration of virtual tables was done, so after
+     * they are registered, they would miss logging events which happened before.
+     *
+     * @param vtable    vtable to append to
+     * @param event     event to append
+     * @param tableName table name of virtual table to append to
+     * @return vtable or, when it was null, the found vtable (null when not registered yet)
+     */
+    protected AbstractLoggerVirtualTable<?> appendToVirtualTable(AbstractLoggerVirtualTable<?> vtable, LoggingEvent event, String tableName)
+    {
+        if (vtable != null)
+        {
+            vtable.add(event);
+            return vtable;
+        }
+
+        // resolved by name against the common supertype so this works for every
+        // logger-backed virtual table, not just for one concrete implementation
+        AbstractLoggerVirtualTable<?> foundVtable = getVirtualTable(AbstractLoggerVirtualTable.class, tableName);
+        if (foundVtable == null)
+            addToBuffer(event);
+        else
+            foundVtable.add(event);
+
+        return foundVtable;
+    }
+
+    @Override
+    public void stop()
+    {
+        synchronized (messageBuffer)
+        {
+            messageBuffer.clear();
+            super.stop();
+        }
+    }
+
+    /**
+     * Flushes all log entries which were appended before virtual table was registered.
+     * When the table is still not registered, the buffer is left untouched.
+     *
+     * @param vtableClass class of the virtual table to flush the buffer to
+     * @param tableName   name of the virtual table to flush the buffer to
+     * @see org.apache.cassandra.service.CassandraDaemon#setupVirtualKeyspaces
+     */
+    public void flushBuffer(Class<? extends AbstractLoggerVirtualTable<?>> vtableClass, String tableName)
+    {
+        synchronized (messageBuffer)
+        {
+            AbstractLoggerVirtualTable<?> vtable = getVirtualTable(vtableClass, tableName);
+            if (vtable == null)
+                return;
+
+            messageBuffer.forEach(vtable::add);
+            messageBuffer.clear();
+        }
+    }
+
+    /**
+     * Buffers an event until the virtual table is registered. Events are dropped once the buffer is full.
+     *
+     * @param eventObject event to buffer
+     */
+    protected void addToBuffer(LoggingEvent eventObject)
+    {
+        synchronized (messageBuffer)
+        {
+            // we restrict how many logging events we can put into buffer,
+            // so we are not growing without any bound when things go south
+            if (messageBuffer.size() < defaultRows)
+                messageBuffer.add(eventObject);
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java 
b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
index e710d44dd1..f32963b73a 100644
--- a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
+++ b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java
@@ -60,7 +60,8 @@ public class LogbackLoggingSupport implements LoggingSupport
     @Override
     public void onStartup()
     {
-        checkOnlyOneVirtualTableAppender();
+        checkOnlyOneVirtualTableAppender(VirtualTableAppender.class);
+        checkOnlyOneVirtualTableAppender(SlowQueriesAppender.class);
 
         // The default logback configuration in conf/logback.xml allows 
reloading the
         // configuration when the configuration file has changed (every 60 
seconds by default).
@@ -138,7 +139,20 @@ public class LogbackLoggingSupport implements 
LoggingSupport
     }
 
     @Override
-    public Optional<Appender<?>> getAppender(Class<?> appenderClass, String 
name)
+    public Optional<Logger> getLogger(String loggerName)
+    {
+        // pick the first logger of the logback context whose name matches exactly
+        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
+        return context.getLoggerList()
+                      .stream()
+                      .filter(candidate -> candidate.getName().equals(loggerName))
+                      .findFirst();
+    }
+
+    @Override
+    public <T extends Appender<?>> Optional<T> getAppender(Class<T> 
appenderClass, String appenderName)
     {
         LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
         for (Logger logBackLogger : lc.getLoggerList())
@@ -146,15 +160,15 @@ public class LogbackLoggingSupport implements 
LoggingSupport
             for (Iterator<Appender<ILoggingEvent>> iterator = 
logBackLogger.iteratorForAppenders(); iterator.hasNext();)
             {
                 Appender<ILoggingEvent> appender = iterator.next();
-                if (appender.getClass() == appenderClass && 
appender.getName().equals(name))
-                    return Optional.of(appender);
+                if (appender.getClass() == appenderClass && 
appender.getName().equals(appenderName))
+                    return Optional.of(appenderClass.cast(appender));
             }
         }
 
         return Optional.empty();
     }
 
-    private void checkOnlyOneVirtualTableAppender()
+    private void checkOnlyOneVirtualTableAppender(Class<?> appenderClass)
     {
         int count = 0;
         LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
@@ -164,7 +178,7 @@ public class LogbackLoggingSupport implements LoggingSupport
             for (Iterator<Appender<ILoggingEvent>> iterator = 
logBackLogger.iteratorForAppenders(); iterator.hasNext();)
             {
                 Appender<?> appender = iterator.next();
-                if (appender instanceof VirtualTableAppender)
+                if (appenderClass.isAssignableFrom(appender.getClass()))
                 {
                     virtualAppenderNames.add(appender.getName());
                     count += 1;
@@ -174,7 +188,7 @@ public class LogbackLoggingSupport implements LoggingSupport
 
         if (count > 1)
             throw new IllegalStateException(String.format("There are multiple 
appenders of class %s of names %s. There is only one appender of such class 
allowed.",
-                                                          
VirtualTableAppender.class.getName(), String.join(",", virtualAppenderNames)));
+                                                          
appenderClass.getName(), String.join(",", virtualAppenderNames)));
     }
 
     private boolean hasAppenders(Logger logBackLogger)
diff --git a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java 
b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
index 35e11975f9..00b40cb966 100644
--- a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
+++ b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.utils.logging;
 import java.util.Map;
 import java.util.Optional;
 
+import ch.qos.logback.classic.Logger;
 import ch.qos.logback.core.Appender;
 
 /**
@@ -53,7 +54,12 @@ public interface LoggingSupport
      */
     Map<String, String> getLoggingLevels();
 
-    default Optional<Appender<?>> getAppender(Class<?> appenderClass, String 
appenderName)
+    default <T extends Appender<?>> Optional<T> getAppender(Class<T> 
appenderClass, String appenderName)
+    {
+        return Optional.empty();
+    }
+
+    default Optional<Logger> getLogger(String loggerName)
     {
         return Optional.empty();
     }
diff --git 
a/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java 
b/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java
new file mode 100644
index 0000000000..4af2e38307
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils.logging;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable;
+import org.apache.cassandra.db.virtual.SlowQueriesTable;
+
+/**
+ * Appends logging events carrying slow queries to the virtual table {@link SlowQueriesTable}.
+ */
+public final class SlowQueriesAppender extends AbstractVirtualTableAppender
+{
+    public static final String APPENDER_NAME = "SLOW_QUERIES_APPENDER";
+
+    // resolved lazily upon appending, see append(LoggingEvent)
+    private AbstractLoggerVirtualTable<?> slowQueries;
+
+    public SlowQueriesAppender()
+    {
+        super(SlowQueriesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS);
+    }
+
+    @Override
+    protected void append(LoggingEvent eventObject)
+    {
+        // The table reference stays null for as long as virtual tables are not registered
+        // while events are already being appended. Once the vtable is registered
+        // (as part of node's startup / initialisation), the reference is remembered
+        // and never becomes null again.
+        AbstractLoggerVirtualTable<?> resolved = appendToVirtualTable(slowQueries,
+                                                                      eventObject,
+                                                                      SlowQueriesTable.TABLE_NAME);
+        slowQueries = resolved;
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java 
b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java
index 2820b2936f..03a142004a 100644
--- a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java
+++ b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java
@@ -18,111 +18,35 @@
 
 package org.apache.cassandra.utils.logging;
 
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableSet;
 
 import ch.qos.logback.classic.spi.LoggingEvent;
-import ch.qos.logback.core.AppenderBase;
 import org.apache.cassandra.audit.FileAuditLogger;
+import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable;
 import org.apache.cassandra.db.virtual.LogMessagesTable;
-import org.apache.cassandra.db.virtual.VirtualKeyspace;
-import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
-import org.apache.cassandra.db.virtual.VirtualTable;
-
-import static 
org.apache.cassandra.db.virtual.LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS;
-import static org.apache.cassandra.db.virtual.LogMessagesTable.TABLE_NAME;
-import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
 
 /**
  * Appends Cassandra logs to virtual table system_views.system_logs
  */
-public final class VirtualTableAppender extends AppenderBase<LoggingEvent>
+public final class VirtualTableAppender extends AbstractVirtualTableAppender
 {
     public static final String APPENDER_NAME = "CQLLOG";
 
     private static final Set<String> forbiddenLoggers = 
ImmutableSet.of(FileAuditLogger.class.getName());
 
-    private LogMessagesTable logs;
-
-    // for holding messages until virtual registry contains logs virtual table
-    // as it takes some time during startup of a node to initialise virtual 
tables but messages are
-    // logged already
-    private final List<LoggingEvent> messageBuffer = new LinkedList<>();
+    private AbstractLoggerVirtualTable<?> logs;
 
-    @Override
-    protected void append(LoggingEvent eventObject)
+    public VirtualTableAppender()
     {
-        if (!forbiddenLoggers.contains(eventObject.getLoggerName()))
-        {
-            if (logs == null)
-            {
-                logs = getVirtualTable();
-                if (logs == null)
-                    addToBuffer(eventObject);
-                else
-                    logs.add(eventObject);
-            }
-            else
-                logs.add(eventObject);
-        }
+        super(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS);
     }
 
     @Override
-    public void stop()
-    {
-        messageBuffer.clear();
-        super.stop();
-    }
-
-    /**
-     * Flushes all logs which were appended before virtual table was 
registered.
-     *
-     * @see org.apache.cassandra.service.CassandraDaemon#setupVirtualKeyspaces
-     */
-    public void flushBuffer()
-    {
-        Optional.ofNullable(getVirtualTable()).ifPresent(vtable -> {
-            messageBuffer.forEach(vtable::add);
-            messageBuffer.clear();
-        });
-    }
-
-    private LogMessagesTable getVirtualTable()
-    {
-        VirtualKeyspace keyspace = 
VirtualKeyspaceRegistry.instance.getKeyspaceNullable(VIRTUAL_VIEWS);
-
-        if (keyspace == null)
-            return null;
-
-        Optional<VirtualTable> logsTable = keyspace.tables()
-                                                   .stream()
-                                                   .filter(vt -> 
vt.name().equals(TABLE_NAME))
-                                                   .findFirst();
-
-        if (!logsTable.isPresent())
-            return null;
-
-        VirtualTable vt = logsTable.get();
-
-        if (!(vt instanceof LogMessagesTable))
-            throw new IllegalStateException(String.format("Virtual table %s.%s 
is not backed by an instance of %s but by %s",
-                                                          VIRTUAL_VIEWS,
-                                                          TABLE_NAME,
-                                                          
LogMessagesTable.class.getName(),
-                                                          
vt.getClass().getName()));
-
-        return (LogMessagesTable) vt;
-    }
-
-    private void addToBuffer(LoggingEvent eventObject)
+    protected void append(LoggingEvent eventObject)
     {
-        // we restrict how many logging events we can put into buffer,
-        // so we are not growing without any bound when things go south
-        if (messageBuffer.size() < LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)
-            messageBuffer.add(eventObject);
+        if (!forbiddenLoggers.contains(eventObject.getLoggerName()))
+            logs = appendToVirtualTable(logs, eventObject, 
LogMessagesTable.TABLE_NAME);
     }
 }
diff --git a/test/conf/logback-dtest_with_slow_query_appender.xml 
b/test/conf/logback-dtest_with_slow_query_appender.xml
new file mode 100644
index 0000000000..1b6ed7511f
--- /dev/null
+++ b/test/conf/logback-dtest_with_slow_query_appender.xml
@@ -0,0 +1,63 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+    <define name="cluster_id" 
class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
+    <define name="instance_id" 
class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+    <!-- Shutdown hook ensures that async appender flushes -->
+    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+    <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+        
<file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
+        <encoder>
+            <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} 
%msg%n</pattern>
+        </encoder>
+        <immediateFlush>true</immediateFlush>
+    </appender>
+
+    <appender name="INSTANCESTDERR" target="System.err" 
class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>WARN</level>
+        </filter>
+    </appender>
+
+    <appender name="INSTANCESTDOUT" target="System.out" 
class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - 
%msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>DEBUG</level>
+        </filter>
+    </appender>
+
+    <appender name="SLOW_QUERIES_APPENDER" 
class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/>
+
+    <logger name="slow_operations" additivity="false" level="DEBUG">
+        <appender-ref ref="SLOW_QUERIES_APPENDER"/>
+    </logger>
+
+    <root level="DEBUG">
+        <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race 
conditions with appending and searching -->
+        <appender-ref ref="INSTANCESTDERR" />
+        <appender-ref ref="INSTANCESTDOUT" />
+    </root>
+</configuration>
diff --git a/test/conf/logback-dtest_with_slow_query_appender_invalid.xml 
b/test/conf/logback-dtest_with_slow_query_appender_invalid.xml
new file mode 100644
index 0000000000..a2252dd23a
--- /dev/null
+++ b/test/conf/logback-dtest_with_slow_query_appender_invalid.xml
@@ -0,0 +1,73 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration debug="false" scan="true" scanPeriod="60 seconds">
+    <define name="cluster_id" 
class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" />
+    <define name="instance_id" 
class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" />
+
+    <!-- Shutdown hook ensures that async appender flushes -->
+    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
+
+    <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
+        
<file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file>
+        <encoder>
+            <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} 
%msg%n</pattern>
+        </encoder>
+        <immediateFlush>true</immediateFlush>
+    </appender>
+
+    <appender name="INSTANCESTDERR" target="System.err" 
class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>WARN</level>
+        </filter>
+    </appender>
+
+    <appender name="INSTANCESTDOUT" target="System.out" 
class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - 
%msg%n</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>DEBUG</level>
+        </filter>
+    </appender>
+
+    <appender name="SLOW_QUERIES_APPENDER" 
class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/>
+    <appender name="SLOW_QUERIES_APPENDER_2" 
class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/>
+
+    <appender name="CQLLOG" 
class="org.apache.cassandra.utils.logging.VirtualTableAppender">
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+    </appender>
+
+    <!-- intentionally invalid: more than one appender of the same class (SlowQueriesAppender) is not allowed -->
+    <logger name="slow_operations" additivity="false" level="DEBUG">
+        <appender-ref ref="SLOW_QUERIES_APPENDER"/>
+        <appender-ref ref="SLOW_QUERIES_APPENDER_2"/>
+    </logger>
+
+    <root level="DEBUG">
+        <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race 
conditions with appending and searching -->
+        <appender-ref ref="INSTANCESTDERR" />
+        <appender-ref ref="INSTANCESTDOUT" />
+        <appender-ref ref="CQLLOG" />
+    </root>
+</configuration>
diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java 
b/test/distributed/org/apache/cassandra/distributed/test/AbstractVirtualLogsTableTest.java
similarity index 69%
copy from src/java/org/apache/cassandra/db/monitoring/Monitorable.java
copy to 
test/distributed/org/apache/cassandra/distributed/test/AbstractVirtualLogsTableTest.java
index 10bd10438a..c8bb32fe72 100644
--- a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/AbstractVirtualLogsTableTest.java
@@ -16,21 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.db.monitoring;
+package org.apache.cassandra.distributed.test;
 
-public interface Monitorable
-{
-    String name();
-    long creationTimeNanos();
-    long timeoutNanos();
-    long slowTimeoutNanos();
+import org.junit.Ignore;
+
+import static java.lang.String.format;
 
-    boolean isInProgress();
-    boolean isAborted();
-    boolean isCompleted();
-    boolean isSlow();
-    boolean isCrossNode();
+@Ignore
+public abstract class AbstractVirtualLogsTableTest extends TestBaseImpl
+{
+    public String query(String template)
+    {
+        return format(template, getTableName());
+    }
 
-    boolean abort();
-    boolean complete();
+    public abstract String getTableName();
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/SlowQueriesAppenderTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/SlowQueriesAppenderTest.java
new file mode 100644
index 0000000000..d8b6b3f00f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/SlowQueriesAppenderTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.virtual.SlowQueriesTable;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.shared.WithProperties;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.utils.logging.SlowQueriesAppender;
+
+import static java.lang.String.format;
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.LOGBACK_CONFIGURATION_FILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * It is inherently tricky / flaky to make queries slow on demand, so this test only covers the invalid
+ * logback configuration; the table itself is tested in {@link org.apache.cassandra.db.virtual.SlowQueriesTableTest}.
+ */
+public class SlowQueriesAppenderTest extends AbstractVirtualLogsTableTest
+{
+    /**
+     * Starts a single-node cluster with a logback configuration which attaches two
+     * {@link SlowQueriesAppender} instances to one logger and expects the startup to be
+     * rejected with an {@link IllegalStateException} naming both appenders.
+     */
+    @Test
+    public void testMultipleAppendersFailToStartNode() throws Throwable
+    {
+        // The logback configuration property is overridden exclusively through WithProperties so the
+        // override is scoped to the try-with-resources block. Setting the property directly beforehand
+        // would make the invalid configuration the "previous" value and leak it into subsequent tests.
+        //
+        // NOTE: Because cluster startup is expected to fail in this case, and can leave things in a weird state
+        // for the next test, create without starting, and set failure as shutdown to false,
+        // so the try-with-resources can close instances properly.
+        try (WithProperties properties = new WithProperties().set(LOGBACK_CONFIGURATION_FILE, "test/conf/logback-dtest_with_slow_query_appender_invalid.xml");
+             Cluster cluster = Cluster.build(1)
+                                      .withConfig(c -> c.with(Feature.values())
+                                                        .set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false))
+                                      .createWithoutStarting())
+        {
+            cluster.startup();
+            fail("Node should not start as there is supposed to be invalid logback configuration file.");
+        }
+        catch (IllegalStateException ex)
+        {
+            // the exact wording matters, it lists the names of all offending appenders
+            assertEquals(format("There are multiple appenders of class %s " +
+                                "of names SLOW_QUERIES_APPENDER,SLOW_QUERIES_APPENDER_2. There is only one appender of such class allowed.",
+                                SlowQueriesAppender.class.getName()),
+                         ex.getMessage());
+        }
+    }
+
+    /**
+     * @return name of the slow queries virtual table qualified by the keyspace of virtual views
+     */
+    @Override
+    public String getTableName()
+    {
+        return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, SlowQueriesTable.TABLE_NAME);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/SlowQueryDeserTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/SlowQueryDeserTest.java
new file mode 100644
index 0000000000..ae5bcc966c
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/SlowQueryDeserTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.monitoring.MonitorableImpl;
+import org.apache.cassandra.db.monitoring.MonitoringTask;
+import org.apache.cassandra.db.monitoring.MonitoringTask.SlowOperation;
+import org.apache.cassandra.utils.Clock;
+
+/**
+ * Smoke test which serializes one {@link SlowOperation} and deserializes the produced string again.
+ */
+public class SlowQueryDeserTest
+{
+    /**
+     * Passes a single slow operation through {@code MonitoringTask.Operation.serialize} and
+     * {@code MonitoringTask.Operation.deserialize}. The deserialized collection is never inspected,
+     * so the test fails only when one of the two calls throws.
+     */
+    @Test
+    public void testSlowQueryDeser() throws Throwable
+    {
+        // anonymous monitorable with a fixed keyspace / table and a query text derived from them
+        SlowOperation slowOperation = new SlowOperation(new MonitorableImpl()
+        {
+            @Override
+            public String name()
+            {
+                return String.format("select * from %s.%s where id = 5", 
monitoredOnKeyspace(), monitoredOnTable());
+            }
+
+            @Override
+            public String monitoredOnKeyspace()
+            {
+                return "ks";
+            }
+
+            @Override
+            public String monitoredOnTable()
+            {
+                return "tb";
+            }
+
+            @Override
+            public boolean isCrossNode()
+            {
+                return true;
+            }
+        }, Clock.Global.currentTimeMillis());
+
+        String serialize = 
MonitoringTask.Operation.serialize(List.of(slowOperation));
+        // NOTE(review): the result is unused; consider asserting on its content once the
+        // equality semantics of Operation are confirmed
+        Collection<MonitoringTask.Operation> deserialize = 
MonitoringTask.Operation.deserialize(serialize);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
index 71ef4dbe78..bf9f58123e 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java
@@ -25,7 +25,6 @@ import org.junit.Test;
 
 import ch.qos.logback.classic.Level;
 import org.apache.cassandra.db.virtual.LogMessagesTable;
-import org.apache.cassandra.db.virtual.LogMessagesTable.LogMessage;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.Feature;
@@ -47,8 +46,14 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-public class VirtualTableLogsTest extends TestBaseImpl
+public class VirtualTableLogsTest extends AbstractVirtualLogsTableTest
 {
+    @Override
+    public String getTableName()
+    {
+        return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, 
LogMessagesTable.TABLE_NAME);
+    }
+
     @Test
     public void testVTableOutput() throws Throwable
     {
@@ -56,9 +61,9 @@ public class VirtualTableLogsTest extends TestBaseImpl
              Cluster cluster = Cluster.build(1)
                                       .withConfig(c -> 
c.with(Feature.values()))
                                       .start();
-             )
+        )
         {
-            List<TestingLogMessage> rows = getRows(cluster);
+            List<LogMessage> rows = getRows(cluster);
             assertFalse(rows.isEmpty());
 
             rows.forEach(message -> 
assertTrue(Level.toLevel(message.level).isGreaterOrEqual(Level.INFO)));
@@ -91,39 +96,36 @@ public class VirtualTableLogsTest extends TestBaseImpl
         }
     }
 
-    private List<TestingLogMessage> getRows(Cluster cluster)
+    private List<LogMessage> getRows(Cluster cluster)
     {
         SimpleQueryResult simpleQueryResult = 
cluster.coordinator(1).executeWithResult(query("select * from %s"), ONE);
-        List<TestingLogMessage> rows = new ArrayList<>();
+        List<LogMessage> rows = new ArrayList<>();
         simpleQueryResult.forEachRemaining(row -> {
             long timestamp = row.getTimestamp(TIMESTAMP_COLUMN_NAME).getTime();
             String logger = row.getString(LOGGER_COLUMN_NAME);
             String level = row.getString(LEVEL_COLUMN_NAME);
             String message = row.getString(MESSAGE_COLUMN_NAME);
             int order = row.getInteger(ORDER_IN_MILLISECOND_COLUMN_NAME);
-            TestingLogMessage logMessage = new TestingLogMessage(timestamp, 
logger, level, message, order);
+            LogMessage logMessage = new LogMessage(timestamp, logger, level, 
message, order);
             rows.add(logMessage);
         });
         return rows;
     }
 
-    private String query(String template)
-    {
-        return format(template, getTableName());
-    }
-
-    private String getTableName()
-    {
-        return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, 
LogMessagesTable.TABLE_NAME);
-    }
-
-    private static class TestingLogMessage extends LogMessage
+    private static class LogMessage
     {
-        private int order;
+        public final long timestamp;
+        public final String logger;
+        public final String level;
+        public final String message;
+        public final int order;
 
-        public TestingLogMessage(long timestamp, String logger, String level, 
String message, int order)
+        public LogMessage(long timestamp, String logger, String level, String 
message, int order)
         {
-            super(timestamp, logger, level, message);
+            this.timestamp = timestamp;
+            this.logger = logger;
+            this.level = level;
+            this.message = message;
             this.order = order;
         }
     }
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTableTest.java
similarity index 55%
copy from test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
copy to 
test/unit/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTableTest.java
index dd32058533..40a926fda7 100644
--- a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
+++ 
b/test/unit/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTableTest.java
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import ch.qos.logback.classic.Level;
@@ -32,24 +33,21 @@ import ch.qos.logback.classic.spi.LoggingEvent;
 import com.datastax.driver.core.Row;
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.marshal.TimestampType;
-import org.apache.cassandra.db.virtual.AbstractVirtualTable.DataSet;
-import org.apache.cassandra.db.virtual.AbstractVirtualTable.Partition;
-import org.apache.cassandra.dht.LocalPartitioner;
 
-import static 
org.apache.cassandra.config.CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
-public class LogMessagesTableTest extends CQLTester
+@Ignore
+public abstract class AbstractLoggerVirtualTableTest<U> extends CQLTester
 {
-    private String keyspace = createKeyspaceName();
-    private LogMessagesTable table;
+    protected final String keyspace = createKeyspaceName();
+
+    protected AbstractLoggerVirtualTable<U> table;
 
     @Test
-    public void testTruncate() throws Throwable
+    public void testTruncate()
     {
-        registerVirtualTable();
+        registerTable();
 
         int numberOfRows = 100;
         List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
@@ -61,28 +59,28 @@ public class LogMessagesTableTest extends CQLTester
     }
 
     @Test
-    public void empty() throws Throwable
+    public void testEmpty() throws Throwable
     {
-        registerVirtualTable();
+        registerTable();
         assertEmpty(execute(query("select * from %s")));
     }
 
     @Test
     public void testInsert()
     {
-        registerVirtualTable();
+        registerTable();
 
         int numberOfRows = 1000;
         List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
         loggingEvents.forEach(table::add);
 
-        assertEquals(numberOfRows, numberOfPartitions());
+        assertEquals(numberOfRows, execute(query("select * from %s")).size());
     }
 
     @Test
-    public void testLimitedCapacity() throws Throwable
+    public void testLimitedCapacity()
     {
-        registerVirtualTable(100);
+        registerTable(100);
 
         int numberOfRows = 1000;
         List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
@@ -94,7 +92,7 @@ public class LogMessagesTableTest extends CQLTester
         // the first record in the table will be the last one which we inserted
         LoggingEvent firstEvent = loggingEvents.get(999);
         assertRowsNet(executeNet(query("select timestamp from %s limit 1")),
-                      new Object[] { new Date(firstEvent.getTimeStamp()) });
+                      new Object[]{ new Date(firstEvent.getTimeStamp()) });
 
         // the last record in the table will be 900th we inserted
         List<Row> all = executeNet(query("select timestamp from %s")).all();
@@ -104,100 +102,68 @@ public class LogMessagesTableTest extends CQLTester
         assertEquals(loggingEvents.get(900).getTimeStamp(), 
timestamp.getTime());
     }
 
-    @Test
-    public void testMultipleLogsInSameMillisecond()
-    {
-        registerVirtualTable(10);
-        List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 
5);
-        loggingEvents.forEach(table::add);
-
-        // 2 partitions, 5 rows in each
-        assertEquals(2, numberOfPartitions());
-    }
-
-    @Test
-    public void testResolvingBufferSize()
-    {
-        LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(-1);
-        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, 
LogMessagesTable.resolveBufferSize());
-
-        LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(0);
-        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, 
LogMessagesTable.resolveBufferSize());
-
-        LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(1000001);
-        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, 
LogMessagesTable.resolveBufferSize());
+    protected abstract void registerTable(int maxSize);
 
-        LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(999);
-        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, 
LogMessagesTable.resolveBufferSize());
+    protected abstract void registerTable();
 
-        LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(50001);
-        assertEquals(50001, LogMessagesTable.resolveBufferSize());
-    }
-
-    private void registerVirtualTable()
-    {
-        registerVirtualTable(LogMessagesTable.LOGS_VIRTUAL_TABLE_MIN_ROWS);
-    }
-
-    private void registerVirtualTable(int size)
+    protected void registerVirtualTable(AbstractLoggerVirtualTable<U> table)
     {
-        table = new LogMessagesTable(keyspace, size);
-        VirtualKeyspaceRegistry.instance.register(new 
VirtualKeyspace(keyspace, ImmutableList.of(table)));
+        this.table = table;
+        VirtualKeyspaceRegistry.instance.register(new 
VirtualKeyspace(table.metadata.keyspace, ImmutableList.of(this.table)));
     }
 
-    private int numberOfPartitions()
-    {
-        DataSet data = table.data();
-
-        Iterator<Partition> partitions = 
data.getPartitions(DataRange.allData(new 
LocalPartitioner(TimestampType.instance)));
-
-        int numberOfPartitions = 0;
-
-        while (partitions.hasNext())
-        {
-            partitions.next();
-            numberOfPartitions += 1;
-        }
-
-        return numberOfPartitions;
-    }
-
-    private String query(String query)
+    protected String query(String query)
     {
         return String.format(query, table.toString());
     }
 
-    private List<LoggingEvent> getLoggingEvents(int size)
+    protected List<LoggingEvent> getLoggingEvents(int size)
     {
         return getLoggingEvents(size, Instant.now(), 1);
     }
 
-    private List<LoggingEvent> getLoggingEvents(int size, Instant 
firstTimestamp, int logsInMillisecond)
+    protected List<LoggingEvent> getLoggingEvents(int size, Instant 
firstTimestamp, int logsInMillisecond)
     {
         List<LoggingEvent> logs = new LinkedList<>();
         int partitions = size / logsInMillisecond;
 
         for (int i = 0; i < partitions; i++)
         {
-            long timestamp = firstTimestamp.toEpochMilli();
-            firstTimestamp = firstTimestamp.plusSeconds(1);
+            firstTimestamp = firstTimestamp.plusSeconds(i);
 
             for (int j = 0; j < logsInMillisecond; j++)
-                logs.add(getLoggingEvent(timestamp));
+                logs.add(getLoggingEvent(firstTimestamp.toEpochMilli()));
         }
 
         return logs;
     }
 
-    private LoggingEvent getLoggingEvent(long timestamp)
+    protected int numberOfPartitions()
+    {
+        AbstractVirtualTable.DataSet data = table.data();
+        Iterator<AbstractVirtualTable.Partition> partitions = 
data.getPartitions(DataRange.allData(table.metadata.partitioner));
+        int numberOfPartitions = 0;
+
+        while (partitions.hasNext())
+        {
+            partitions.next();
+            numberOfPartitions += 1;
+        }
+
+        return numberOfPartitions;
+    }
+
+    protected LoggingEvent getLoggingEvent(long timestamp)
     {
         LoggingEvent event = new LoggingEvent();
         event.setLevel(Level.INFO);
-        event.setMessage("message " + timestamp);
-        event.setLoggerName("logger " + timestamp);
+        event.setMessage(getMessage(timestamp));
+        event.setLoggerName(AbstractLoggerVirtualTableTest.class.getName());
         event.setThreadName(Thread.currentThread().getName());
         event.setTimeStamp(timestamp);
 
         return event;
     }
+
+    protected abstract String getMessage(long timestamp);
 }
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
index dd32058533..7025e8ad4c 100644
--- a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java
@@ -20,69 +20,34 @@ package org.apache.cassandra.db.virtual;
 
 import java.time.Instant;
 import java.util.Date;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 
-import com.google.common.collect.ImmutableList;
 import org.junit.Test;
 
-import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.spi.LoggingEvent;
 import com.datastax.driver.core.Row;
-import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.db.DataRange;
-import org.apache.cassandra.db.marshal.TimestampType;
-import org.apache.cassandra.db.virtual.AbstractVirtualTable.DataSet;
-import org.apache.cassandra.db.virtual.AbstractVirtualTable.Partition;
-import org.apache.cassandra.dht.LocalPartitioner;
 
 import static 
org.apache.cassandra.config.CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS;
+import static 
org.apache.cassandra.db.virtual.LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
-public class LogMessagesTableTest extends CQLTester
+public class LogMessagesTableTest extends 
AbstractLoggerVirtualTableTest<LoggingEvent>
 {
-    private String keyspace = createKeyspaceName();
-    private LogMessagesTable table;
-
-    @Test
-    public void testTruncate() throws Throwable
-    {
-        registerVirtualTable();
-
-        int numberOfRows = 100;
-        List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
-        loggingEvents.forEach(table::add);
-
-        execute(query("truncate %s"));
-
-        assertTrue(executeNet(query("select timestamp from 
%s")).all().isEmpty());
-    }
-
-    @Test
-    public void empty() throws Throwable
-    {
-        registerVirtualTable();
-        assertEmpty(execute(query("select * from %s")));
-    }
-
     @Test
-    public void testInsert()
+    public void testMultipleLogsInSameMillisecond()
     {
-        registerVirtualTable();
-
-        int numberOfRows = 1000;
-        List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+        registerTable();
+        List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 
5);
         loggingEvents.forEach(table::add);
 
-        assertEquals(numberOfRows, numberOfPartitions());
+        // 2 partitions, 5 rows in each
+        assertEquals(2, numberOfPartitions());
     }
 
     @Test
-    public void testLimitedCapacity() throws Throwable
+    public void testLimitedCapacity()
     {
-        registerVirtualTable(100);
+        registerTable(100);
 
         int numberOfRows = 1000;
         List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
@@ -94,7 +59,7 @@ public class LogMessagesTableTest extends CQLTester
         // the first record in the table will be the last one which we inserted
         LoggingEvent firstEvent = loggingEvents.get(999);
         assertRowsNet(executeNet(query("select timestamp from %s limit 1")),
-                      new Object[] { new Date(firstEvent.getTimeStamp()) });
+                      new Object[]{ new Date(firstEvent.getTimeStamp()) });
 
         // the last record in the table will be 900th we inserted
         List<Row> all = executeNet(query("select timestamp from %s")).all();
@@ -104,100 +69,47 @@ public class LogMessagesTableTest extends CQLTester
         assertEquals(loggingEvents.get(900).getTimeStamp(), 
timestamp.getTime());
     }
 
-    @Test
-    public void testMultipleLogsInSameMillisecond()
-    {
-        registerVirtualTable(10);
-        List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 
5);
-        loggingEvents.forEach(table::add);
-
-        // 2 partitions, 5 rows in each
-        assertEquals(2, numberOfPartitions());
-    }
-
     @Test
     public void testResolvingBufferSize()
     {
         LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(-1);
-        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, 
LogMessagesTable.resolveBufferSize());
+        assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize());
 
         LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(0);
-        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, 
LogMessagesTable.resolveBufferSize());
+        assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize());
 
         LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(1000001);
-        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, 
LogMessagesTable.resolveBufferSize());
+        assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize());
 
         LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(999);
-        assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, 
LogMessagesTable.resolveBufferSize());
+        assertEquals(999, resolveBufferSize());
 
         LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(50001);
-        assertEquals(50001, LogMessagesTable.resolveBufferSize());
+        assertEquals(50001, resolveBufferSize());
     }
 
-    private void registerVirtualTable()
+    private int resolveBufferSize()
     {
-        registerVirtualTable(LogMessagesTable.LOGS_VIRTUAL_TABLE_MIN_ROWS);
+        return 
AbstractLoggerVirtualTable.resolveBufferSize(LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt(),
+                                                            
LogMessagesTable.LOGS_VIRTUAL_TABLE_MAX_ROWS,
+                                                            
LOGS_VIRTUAL_TABLE_DEFAULT_ROWS);
     }
 
-    private void registerVirtualTable(int size)
+    @Override
+    protected void registerTable(int maxSize)
     {
-        table = new LogMessagesTable(keyspace, size);
-        VirtualKeyspaceRegistry.instance.register(new 
VirtualKeyspace(keyspace, ImmutableList.of(table)));
+        registerVirtualTable(new LogMessagesTable(keyspace, maxSize));
     }
 
-    private int numberOfPartitions()
+    @Override
+    protected void registerTable()
     {
-        DataSet data = table.data();
-
-        Iterator<Partition> partitions = 
data.getPartitions(DataRange.allData(new 
LocalPartitioner(TimestampType.instance)));
-
-        int numberOfPartitions = 0;
-
-        while (partitions.hasNext())
-        {
-            partitions.next();
-            numberOfPartitions += 1;
-        }
-
-        return numberOfPartitions;
-    }
-
-    private String query(String query)
-    {
-        return String.format(query, table.toString());
-    }
-
-    private List<LoggingEvent> getLoggingEvents(int size)
-    {
-        return getLoggingEvents(size, Instant.now(), 1);
-    }
-
-    private List<LoggingEvent> getLoggingEvents(int size, Instant 
firstTimestamp, int logsInMillisecond)
-    {
-        List<LoggingEvent> logs = new LinkedList<>();
-        int partitions = size / logsInMillisecond;
-
-        for (int i = 0; i < partitions; i++)
-        {
-            long timestamp = firstTimestamp.toEpochMilli();
-            firstTimestamp = firstTimestamp.plusSeconds(1);
-
-            for (int j = 0; j < logsInMillisecond; j++)
-                logs.add(getLoggingEvent(timestamp));
-        }
-
-        return logs;
+        registerTable(1000);
     }
 
-    private LoggingEvent getLoggingEvent(long timestamp)
+    @Override
+    protected String getMessage(long timestamp)
     {
-        LoggingEvent event = new LoggingEvent();
-        event.setLevel(Level.INFO);
-        event.setMessage("message " + timestamp);
-        event.setLoggerName("logger " + timestamp);
-        event.setThreadName(Thread.currentThread().getName());
-        event.setTimeStamp(timestamp);
-
-        return event;
+        return "message " + timestamp;
     }
 }
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/SlowQueriesTableTest.java 
b/test/unit/org/apache/cassandra/db/virtual/SlowQueriesTableTest.java
new file mode 100644
index 0000000000..1c261e7cc9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/SlowQueriesTableTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+
+import ch.qos.logback.classic.spi.LoggingEvent;
+import org.apache.cassandra.db.monitoring.MonitorableImpl;
+import org.apache.cassandra.db.monitoring.MonitoringTask;
+import org.apache.cassandra.db.monitoring.MonitoringTask.Operation;
+import org.apache.cassandra.utils.Generators;
+import org.quicktheories.impl.JavaRandom;
+
+import static 
org.apache.cassandra.config.CassandraRelevantProperties.LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS;
+import static 
org.apache.cassandra.db.virtual.SlowQueriesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS;
+import static 
org.apache.cassandra.db.virtual.SlowQueriesTable.LOGS_VIRTUAL_TABLE_MAX_ROWS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class SlowQueriesTableTest extends 
AbstractLoggerVirtualTableTest<Operation>
+{
+    private final Random random = new Random();
+    private final JavaRandom javaRandom = new JavaRandom(random);
+
+    @Override
+    protected void registerTable(int maxSize)
+    {
+        registerVirtualTable(new SlowQueriesTable(keyspace, maxSize));
+    }
+
+    @Override
+    protected void registerTable()
+    {
+        registerTable(1000);
+    }
+
+    @Test
+    public void testLimitedCapacity()
+    {
+        registerTable(100);
+
+        int numberOfRows = 1000;
+        List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+        assertEquals(1000, loggingEvents.size());
+        loggingEvents.forEach(table::add);
+
+        // even though we inserted 1000 rows, only 100 are present as its capacity is bounded
+        assertEquals(100, executeNet(query("select * from %s")).all().size());
+    }
+
+    @Test
+    public void testDelete()
+    {
+        registerTable();
+
+        int numberOfRows = 100;
+        List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows);
+        loggingEvents.forEach(table::add);
+
+        Operation operation = table.buffer.get(0);
+
+        assertEquals(100, executeNet(query("select * from %s")).all().size());
+        execute(query("delete from %s where keyspace_name = '" + 
operation.keyspace() + '\''));
+        assertTrue(executeNet(query("select * from %s")).all().size() < 100);
+    }
+
+    @Test
+    public void testResolvingBufferSize()
+    {
+        LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(-1);
+        assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize());
+
+        LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(0);
+        assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize());
+
+        LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(1000001);
+        assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize());
+
+        LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(999);
+        assertEquals(999, resolveBufferSize());
+
+        LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(50001);
+        assertEquals(50001, resolveBufferSize());
+    }
+
+    private int resolveBufferSize()
+    {
+        return 
AbstractLoggerVirtualTable.resolveBufferSize(LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.getInt(),
+                                                            
LOGS_VIRTUAL_TABLE_MAX_ROWS,
+                                                            
LOGS_VIRTUAL_TABLE_DEFAULT_ROWS);
+    }
+
+
+    @Override
+    protected String getMessage(long timestamp)
+    {
+        MonitoringTask.SlowOperation slowOperation = new 
MonitoringTask.SlowOperation(new MonitorableImpl()
+        {
+            @Override
+            public String name()
+            {
+                return Generators.SYMBOL_GEN.generate(javaRandom);
+            }
+
+            @Override
+            public String monitoredOnKeyspace()
+            {
+                return Generators.SYMBOL_GEN.generate(javaRandom);
+            }
+
+            @Override
+            public String monitoredOnTable()
+            {
+                return Generators.SYMBOL_GEN.generate(javaRandom);
+            }
+
+            @Override
+            public boolean isCrossNode()
+            {
+                return random.nextBoolean();
+            }
+        }, timestamp);
+
+        return Operation.serialize(List.of(slowOperation));
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to