This is an automated email from the ASF dual-hosted git repository. smiklosovic pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 8fcf309dad871a373b41cb26c8e0b714c8c874ef Author: Stefan Miklosovic <smikloso...@apache.org> AuthorDate: Wed Apr 9 13:35:08 2025 +0200 Implement appender of slow queries to system_views.slow_queries table patch by Stefan Miklosovic; reviewed by Dmitry Konstantinov, Bernardo Botella for CASSANDRA-13001 --- CHANGES.txt | 1 + conf/cassandra.yaml | 4 + conf/logback.xml | 12 ++ .../configuration/cass_logback_xml_file.adoc | 68 ++++++- .../config/CassandraRelevantProperties.java | 3 + .../org/apache/cassandra/db/AbstractReadQuery.java | 12 ++ .../cassandra/db/monitoring/Monitorable.java | 74 +++++++ .../cassandra/db/monitoring/MonitoringTask.java | 217 ++++++++++++++++++--- .../db/virtual/AbstractLoggerVirtualTable.java | 111 +++++++++++ .../cassandra/db/virtual/LogMessagesTable.java | 112 ++--------- .../cassandra/db/virtual/SlowQueriesTable.java | 195 ++++++++++++++++++ .../cassandra/db/virtual/SystemViewsKeyspace.java | 1 + .../apache/cassandra/service/CassandraDaemon.java | 16 +- .../logging/AbstractVirtualTableAppender.java | 144 ++++++++++++++ .../utils/logging/LogbackLoggingSupport.java | 28 ++- .../cassandra/utils/logging/LoggingSupport.java | 8 +- .../utils/logging/SlowQueriesAppender.java | 45 +++++ .../utils/logging/VirtualTableAppender.java | 92 +-------- .../logback-dtest_with_slow_query_appender.xml | 63 ++++++ ...back-dtest_with_slow_query_appender_invalid.xml | 73 +++++++ .../test/AbstractVirtualLogsTableTest.java | 26 ++- .../distributed/test/SlowQueriesAppenderTest.java | 73 +++++++ .../distributed/test/SlowQueryDeserTest.java | 66 +++++++ .../distributed/test/VirtualTableLogsTest.java | 44 +++-- ...st.java => AbstractLoggerVirtualTableTest.java} | 124 +++++------- .../cassandra/db/virtual/LogMessagesTableTest.java | 144 +++----------- .../cassandra/db/virtual/SlowQueriesTableTest.java | 145 ++++++++++++++ 27 files changed, 1455 insertions(+), 446 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5b7cb6469c..1d86f30789 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Implement appender of slow queries to system_views.slow_queries table (CASSANDRA-13001) * Add autocompletion in CQLSH for built-in functions (CASSANDRA-19631) * Grant permission on keyspaces system_views and system_virtual_schema not possible (CASSANDRA-20171) * General Purpose Transactions (Accord) [CEP-15] (CASSANDRA-17092) diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index fe80fda164..0c68afe500 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1396,6 +1396,10 @@ request_timeout: 10000ms # How long before a node logs slow queries. Select queries that take longer than # this timeout to execute, will generate an aggregated log message, so that slow queries # can be identified. Set this value to zero to disable slow query logging. +# +# It is possible to log slow queries into system_views.slow_queries virtual table. +# Consult logback.xml to uncomment specific appender and logger to enable this functionality. +# # Min unit: ms slow_query_log_timeout: 500ms diff --git a/conf/logback.xml b/conf/logback.xml index 102cf06352..a8dabf6564 100644 --- a/conf/logback.xml +++ b/conf/logback.xml @@ -119,6 +119,18 @@ appender reference in the root level section below. </filter> </appender> --> + <!-- Uncomment below configuration and corresponding appender-ref in slow_queries logger to activate + logging into system_views.slow_queries virtual table. --> + <!-- <appender name="SLOW_QUERIES_APPENDER" class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/> --> + + <!-- Log slow queries to system_views.slow_queries virtual table --> + <logger name="slow_queries" additivity="false" level="DEBUG"> + <appender-ref ref="DEBUGLOG"/> + <!-- uncomment this appender reference together with appender definition above + to start to put slow queries into system_views.slow_queries virtual table --> + <!-- <appender-ref ref="SLOW_QUERIES_APPENDER"/> --> + </logger> + <root level="INFO"> <appender-ref ref="SYSTEMLOG" /> <appender-ref ref="STDOUT" /> diff --git a/doc/modules/cassandra/pages/managing/configuration/cass_logback_xml_file.adoc b/doc/modules/cassandra/pages/managing/configuration/cass_logback_xml_file.adoc index a62dfe91a7..b6e4d5f545 100644 --- a/doc/modules/cassandra/pages/managing/configuration/cass_logback_xml_file.adoc +++ b/doc/modules/cassandra/pages/managing/configuration/cass_logback_xml_file.adoc @@ -80,7 +80,7 @@ Specify the format of the message. Part of the rolling policy. <pattern>%-5level [%thread] %date\{ISO8601} %F:%L - %msg%n</pattern> </encoder> -=== Logging to Cassandra virtual table +=== Logging system logs to Cassandra virtual table It is possible to configure logback.xml in such a way that logs would appear in `system_views.system_log` table. This is achieved by appender implemented in class `VirtualTableAppender` which is called `CQLLOG` in the @@ -101,6 +101,72 @@ each message will occupy memory. The appender to virtual table is commented out by default so logging to virtual table is not active. +=== Logging slow queries to Cassandra virual table + +It is possible to log slow queries into `system_views.slow_queries` table. A query is evaluated to be slow +if it takes more than `slow_query_log_timeout` in `cassandra.yaml`. + +To log messages to `system_views.slow_queries` you need to: + +1. uncomment `SLOW_QUERIES_APPENDER` log appender +2. uncomment `appender-ref` pointing to `SLOW_QUERIES_APPENDER` in `slow_queries` logger: + +The respective configuration in `logback.xml` looks like this: + +[source,XML] +---- + <!-- Uncomment below configuration and corresponding appender-ref in slow_queries logger to activate + logging into system_views.slow_queries virtual table. --> + <!-- <appender name="SLOW_QUERIES_APPENDER" class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/> --> + + <!-- Log slow queries to system_views.slow_queries virtual table --> + <logger name="slow_queries" additivity="false" level="DEBUG"> + <appender-ref ref="DEBUGLOG"/> + <!-- uncomment this appender reference together with appender definition above + to start to put slow queries into system_views.slow_queries virtual table + <appender-ref ref="SLOW_QUERIES_APPENDER"/> + --> + </logger> +---- + +By default, slow queries will be logged to `debug.log`. By uncommenting virtual table appender, it will be +logged to `debug.log` as well as to `system_views.slow_queries`. If you want to log it to `system_views.slow_queries` only, you need to comment out `DEBUGLOG` `appender-ref` in `slow_queries` logger declaration. + +If you want to log slow queries to a dedicated log file (which is e.g. rotated), that is also possible +by pointing `slow_queries` logger to a respective file appender of a given reference, similar to `DEBUGLOG` where all logs go by default. + +The structure of a table looks like this: + +[source,cql] +---- +cassandra@cqlsh> DESCRIBE system_views.slow_queries ; + +/* +Warning: Table system_views.slow_queries is a virtual table and cannot be recreated with CQL. +Structure, for reference: +VIRTUAL TABLE system_views.slow_queries ( + keyspace_name text, + table_name text, + timestamp timestamp, + query text, + avg_ms bigint, + cross_node boolean, + max_ms bigint, + min_ms bigint, + times_reported int, + PRIMARY KEY (keyspace_name, table_name, timestamp, query) +) WITH CLUSTERING ORDER BY (table_name ASC, timestamp ASC, query ASC) + AND comment = 'Slow queries'; +---- + +By having slow queries in a virtual table, an operator can check if there are slow queries for some table, see if +some queries violate some time threshold etc. The rows in this table are same data as one would get in `debug.log`, they +are just way more convenient to parse and query. + +`system_views.slow_queries` table is limited on number of rows it can hold, by default 10 000, configurable by `cassandra.virtual.slow_queries.max.rows` system property. If this table is full, the oldest entry is removed and the newest is inserted. This virtual table can be truncated by CQL and deletion on partition key (`keyspace_name` column) is allowed. + +A reader noticed that by placing custom appender implementation of `SLOW_QUERIES_APPENDER` appender on a class path and referencing it in `logback.xml`, it is possible to log slow queries wherever we have an appender for it. + === Contents of default `logback.xml` [source,XML] diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 7459fecb8c..e2957ce95f 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -27,6 +27,7 @@ import com.google.common.primitives.Ints; import accord.utils.Invariants; import org.apache.cassandra.db.virtual.LogMessagesTable; +import org.apache.cassandra.db.virtual.SlowQueriesTable; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.StorageCompatibilityMode; @@ -363,6 +364,8 @@ public enum CassandraRelevantProperties LOG4J2_DISABLE_JMX_LEGACY("log4j2.disable.jmx"), LOG4J_SHUTDOWN_HOOK_ENABLED("log4j.shutdownHookEnabled"), LOGBACK_CONFIGURATION_FILE("logback.configurationFile"), + /** Maximum number of rows in system_views.slow_queries */ + LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS("cassandra.virtual.slow_queries.max.rows", convertToString(SlowQueriesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)), /** Maximum number of rows in system_views.logs table */ LOGS_VIRTUAL_TABLE_MAX_ROWS("cassandra.virtual.logs.max.rows", convertToString(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)), /** diff --git a/src/java/org/apache/cassandra/db/AbstractReadQuery.java b/src/java/org/apache/cassandra/db/AbstractReadQuery.java index 448069cfca..2e72c7ec4f 100644 --- a/src/java/org/apache/cassandra/db/AbstractReadQuery.java +++ b/src/java/org/apache/cassandra/db/AbstractReadQuery.java @@ -118,4 +118,16 @@ abstract class AbstractReadQuery extends MonitorableImpl implements ReadQuery } protected abstract void appendCQLWhereClause(StringBuilder sb); + + @Override + public String monitoredOnKeyspace() + { + return metadata().keyspace; + } + + @Override + public String monitoredOnTable() + { + return metadata().name; + } } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java index 10bd10438a..4288a667de 100644 --- a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java +++ b/src/java/org/apache/cassandra/db/monitoring/Monitorable.java @@ -20,6 +20,8 @@ package org.apache.cassandra.db.monitoring; public interface Monitorable { + Monitorable NO_OP = new NoOp(); + String name(); long creationTimeNanos(); long timeoutNanos(); @@ -33,4 +35,76 @@ public interface Monitorable boolean abort(); boolean complete(); + + default String monitoredOnKeyspace() { return null; }; + default String monitoredOnTable() { return null; }; + + class NoOp implements Monitorable + { + @Override + public String name() + { + return null; + } + + @Override + public long creationTimeNanos() + { + return 0; + } + + @Override + public long timeoutNanos() + { + return 0; + } + + @Override + public long slowTimeoutNanos() + { + return 0; + } + + @Override + public boolean isInProgress() + { + return false; + } + + @Override + public boolean isAborted() + { + return false; + } + + @Override + public boolean isCompleted() + { + return false; + } + + @Override + public boolean isSlow() + { + return false; + } + + @Override + public boolean isCrossNode() + { + return false; + } + + @Override + public boolean abort() + { + return false; + } + + @Override + public boolean complete() + { + return false; + } + } } diff --git a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java index 243569910b..4d6d995c77 100644 --- a/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java +++ b/src/java/org/apache/cassandra/db/monitoring/MonitoringTask.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.monitoring; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -32,9 +33,19 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.JsonUtils; import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.logging.LoggingSupport; +import org.apache.cassandra.utils.logging.LoggingSupportFactory; +import org.apache.cassandra.utils.logging.SlowQueriesAppender; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.config.CassandraRelevantProperties.MONITORING_MAX_OPERATIONS; @@ -47,8 +58,9 @@ import static org.apache.cassandra.utils.concurrent.BlockingQueues.newBlockingQu * We also log timed out operations, see CASSANDRA-7392. * Since CASSANDRA-12403 we also log queries that were slow. */ -class MonitoringTask +public class MonitoringTask { + private static final String SLOW_OPERATIONS_LOGGER_NAME = "slow_queries"; private static final String LINE_SEPARATOR = CassandraRelevantProperties.LINE_SEPARATOR.getString(); private static final Logger logger = LoggerFactory.getLogger(MonitoringTask.class); private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 5L, TimeUnit.MINUTES); @@ -70,6 +82,8 @@ class MonitoringTask private final ScheduledFuture<?> reportingTask; private final OperationsQueue failedOperationsQueue; private final OperationsQueue slowOperationsQueue; + private Logger slowOperationsLogger = logger; + private boolean slowOperationsLoggedToVirtualTable; private long approxLastLogTimeNanos; @@ -97,6 +111,15 @@ class MonitoringTask reportIntervalMillis, reportIntervalMillis, TimeUnit.MILLISECONDS); + + LoggingSupport support = LoggingSupportFactory.getLoggingSupport(); + if (support.getLogger(SLOW_OPERATIONS_LOGGER_NAME).isPresent()) + { + if (support.getAppender(SlowQueriesAppender.class, SlowQueriesAppender.APPENDER_NAME).isPresent()) + slowOperationsLoggedToVirtualTable = true; + + slowOperationsLogger = LoggerFactory.getLogger(SLOW_OPERATIONS_LOGGER_NAME); + } } public void cancel() @@ -169,14 +192,30 @@ class MonitoringTask if (!slowOperations.isEmpty()) { long approxElapsedNanos = approxCurrentTimeNanos - approxLastLogTimeNanos; - noSpamLogger.info("Some operations were slow, details available at debug level (debug.log)"); + noSpamLogger.info("Some operations were slow, details available at debug level (debug.log) or " + + "system_views.slow_queries virtual table (when enabled)."); + + if (slowOperationsLogger.isDebugEnabled()) + { + if (slowOperationsLoggedToVirtualTable) + { + // This is the crux of the patch for appending to vtable. + // Because we can send only Strings to debug method (or objects, on which toString() + // would be eventually called), we need to log a string in such a way that we can + // get Operation object(s) back "on the other side" when dealing with vtables and custom appenders + // as appenders work with LoggingEvent where message is just a string. + // It would be very hard / tricky / error-prone to parse customly crafted log message + // which appears in logs when no vtable appender is used. + slowOperationsLogger.debug(Operation.serialize(slowOperations.getOperations())); + } + else + slowOperationsLogger.debug("{} operations were slow in the last {} msecs:{}{}", + slowOperations.num(), + NANOSECONDS.toMillis(approxElapsedNanos), + LINE_SEPARATOR, + slowOperations.getLogMessage()); + } - if (logger.isDebugEnabled()) - logger.debug("{} operations were slow in the last {} msecs:{}{}", - slowOperations.num(), - NANOSECONDS.toMillis(approxElapsedNanos), - LINE_SEPARATOR, - slowOperations.getLogMessage()); return true; } return false; @@ -274,6 +313,12 @@ class MonitoringTask return operations.size() + numDropped; } + private Collection<Operation> getOperations() + { + return operations.values(); + } + + @JsonIgnore String getLogMessage() { if (isEmpty()) @@ -307,9 +352,16 @@ class MonitoringTask * same name (CQL query text) is reported and store the average, min and max * times. */ - protected abstract static class Operation + @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, property = "id") + @JsonSubTypes({ @JsonSubTypes.Type(value = SlowOperation.class) }) + @VisibleForTesting + public abstract static class Operation { + @JsonProperty + String id = getClass().getName(); + /** The operation that was reported as slow or timed out */ + @JsonIgnore final Monitorable operation; /** The number of times the operation was reported */ @@ -319,24 +371,50 @@ class MonitoringTask long totalTimeNanos; /** The maximum time spent by this operation */ - long maxTime; + long maxTimeNanos; /** The minimum time spent by this operation */ - long minTime; + long minTimeNanos; /** The name of the operation, i.e. the SELECT query CQL, * this is set lazily as it takes time to build the query CQL */ private String name; + /** + * creation time of this Operation object, in ms, + * this is different from operation's creationTimeNanos + * which does not follow wall clock and is useless for + * reporting purposes e.g. in virtual tables + */ + private final long timestampMs; + + // optional keyspace and table this operation acts on + // used upon deserialization + private String keyspace; + private String table; + private boolean crossNode; + Operation(Monitorable operation, long failedAtNanos) { this.operation = operation; numTimesReported = 1; totalTimeNanos = failedAtNanos - operation.creationTimeNanos(); - minTime = totalTimeNanos; - maxTime = totalTimeNanos; + minTimeNanos = totalTimeNanos; + maxTimeNanos = totalTimeNanos; + timestampMs = Clock.Global.currentTimeMillis() - (Clock.Global.nanoTime() - operation.creationTimeNanos()) / 1_000_000; + } + + void add(Operation operation) + { + numTimesReported++; + totalTimeNanos += operation.totalTimeNanos; + maxTimeNanos = Math.max(maxTimeNanos, operation.maxTimeNanos); + minTimeNanos = Math.min(minTimeNanos, operation.minTimeNanos); } + public abstract String getLogMessage(); + + @JsonProperty public String name() { if (name == null) @@ -344,15 +422,96 @@ class MonitoringTask return name; } - void add(Operation operation) + @JsonProperty + public String keyspace() { - numTimesReported++; - totalTimeNanos += operation.totalTimeNanos; - maxTime = Math.max(maxTime, operation.maxTime); - minTime = Math.min(minTime, operation.minTime); + if (operation != null) + { + String monitored = operation.monitoredOnKeyspace(); + if (monitored != null) + return monitored; + } + return keyspace; } - public abstract String getLogMessage(); + public void setKeyspace(String keyspace) + { + this.keyspace = keyspace; + } + + public void setTable(String table) + { + this.table = table; + } + + @JsonProperty + public String table() + { + if (operation != null) + { + String monitored = operation.monitoredOnTable(); + if (monitored != null) + return monitored; + } + return table; + } + + @JsonProperty + public boolean isCrossNode() + { + if (operation != null) + return operation.isCrossNode(); + + return crossNode; + } + + @JsonProperty + public int numTimesReported() + { + return numTimesReported; + } + + @JsonProperty + public long totalTimeNanos() + { + return totalTimeNanos; + } + + @JsonProperty + public long maxTimeNanos() + { + return maxTimeNanos; + } + + @JsonProperty + public long minTimeNanos() + { + return minTimeNanos; + } + + @JsonIgnore + public long averageTime() + { + return totalTimeNanos / numTimesReported; + } + + @JsonProperty + public long timestampMs() + { + return timestampMs; + } + + public static String serialize(Collection<Operation> operations) + { + return JsonUtils.writeAsJsonString(operations); + } + + private static final TypeReference<List<Operation>> TYPE_REFERENCE = new TypeReference<>() {}; + + public static List<Operation> deserialize(String message) throws Throwable + { + return JsonUtils.JSON_OBJECT_MAPPER.readValue(message, TYPE_REFERENCE); + } } /** @@ -378,8 +537,8 @@ class MonitoringTask name(), numTimesReported, NANOSECONDS.toMillis(totalTimeNanos / numTimesReported), - NANOSECONDS.toMillis(minTime), - NANOSECONDS.toMillis(maxTime), + NANOSECONDS.toMillis(minTimeNanos), + NANOSECONDS.toMillis(maxTimeNanos), NANOSECONDS.toMillis(operation.timeoutNanos()), operation.isCrossNode() ? "msec/cross-node" : "msec"); } @@ -388,13 +547,21 @@ class MonitoringTask /** * An operation (query) that was reported as slow. */ - private final static class SlowOperation extends Operation + @VisibleForTesting + public final static class SlowOperation extends Operation { - SlowOperation(Monitorable operation, long failedAt) + // purely for deserialization purposes + public SlowOperation() + { + this(Monitorable.NO_OP, 0); + } + + public SlowOperation(Monitorable operation, long failedAt) { super(operation, failedAt); } + @JsonIgnore public String getLogMessage() { if (numTimesReported == 1) @@ -408,8 +575,8 @@ class MonitoringTask name(), numTimesReported, NANOSECONDS.toMillis(totalTimeNanos/ numTimesReported), - NANOSECONDS.toMillis(minTime), - NANOSECONDS.toMillis(maxTime), + NANOSECONDS.toMillis(minTimeNanos), + NANOSECONDS.toMillis(maxTimeNanos), NANOSECONDS.toMillis(operation.slowTimeoutNanos()), operation.isCrossNode() ? "msec/cross-node" : "msec"); } diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTable.java new file mode 100644 index 0000000000..008d5d432a --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTable.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.spi.LoggingEvent; +import org.apache.cassandra.schema.TableMetadata; + +/** + * This table is inherently limited on number of rows it can hold. + * + * @param <U> type parameter saying what object is stored in internal bounded list for query purposes + */ +public abstract class AbstractLoggerVirtualTable<U> extends AbstractMutableVirtualTable +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractLoggerVirtualTable.class); + + // please be sure operations on this structure are thread-safe + protected final List<U> buffer; + + @VisibleForTesting + protected static int resolveBufferSize(int wantedSize, int max, int defaultSize) + { + return (wantedSize < 1 || wantedSize > max) ? defaultSize : wantedSize; + } + + protected AbstractLoggerVirtualTable(TableMetadata metadata, int maxSize) + { + super(metadata); + this.buffer = BoundedLinkedList.create(maxSize); + logger.debug("capacity of virtual table {} is set to be at most {} rows", metadata().toString(), maxSize); + } + + public void add(LoggingEvent event) + { + List<U> messages = getMessages(event); + if (messages != null) + { + // specifically calling buffer.add to reach BoundedLinkedList's add + // instead of linked list's addAll + for (U message : messages) + buffer.add(message); + } + } + + public abstract List<U> getMessages(LoggingEvent event); + + @Override + public void truncate() + { + synchronized (buffer) + { + buffer.clear(); + } + } + + @Override + public boolean allowFilteringImplicitly() + { + return false; + } + + private static final class BoundedLinkedList<T> extends LinkedList<T> + { + private final int maxSize; + + public static <T> List<T> create(int size) + { + return Collections.synchronizedList(new BoundedLinkedList<>(size)); + } + + private BoundedLinkedList(int maxSize) + { + this.maxSize = maxSize; + } + + @Override + public synchronized boolean add(T t) + { + if (size() == maxSize) + removeLast(); + + addFirst(t); + + return true; + } + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java index 5903ac2ab5..87978e3fd9 100644 --- a/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java +++ b/src/java/org/apache/cassandra/db/virtual/LogMessagesTable.java @@ -18,15 +18,11 @@ package org.apache.cassandra.db.virtual; -import java.util.Collections; import java.util.Date; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import ch.qos.logback.classic.spi.LoggingEvent; import org.apache.cassandra.config.CassandraRelevantProperties; @@ -50,11 +46,8 @@ import org.apache.cassandra.schema.TableMetadata; * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-18238">CASSANDRA-18238</a> * @see org.apache.cassandra.utils.logging.VirtualTableAppender */ -public final class LogMessagesTable extends AbstractMutableVirtualTable +public final class LogMessagesTable extends AbstractLoggerVirtualTable<LoggingEvent> { - private static final Logger logger = LoggerFactory.getLogger(LogMessagesTable.class); - - public static final int LOGS_VIRTUAL_TABLE_MIN_ROWS = 1000; public static final int LOGS_VIRTUAL_TABLE_DEFAULT_ROWS = 50_000; public static final int LOGS_VIRTUAL_TABLE_MAX_ROWS = 100_000; @@ -67,11 +60,11 @@ public final class LogMessagesTable extends AbstractMutableVirtualTable public static final String LEVEL_COLUMN_NAME = "level"; public static final String MESSAGE_COLUMN_NAME = "message"; - private final List<LogMessage> buffer; - LogMessagesTable(String keyspace) { - this(keyspace, resolveBufferSize()); + this(keyspace, resolveBufferSize(CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt(), + LOGS_VIRTUAL_TABLE_MAX_ROWS, + LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)); } @VisibleForTesting @@ -85,10 +78,14 @@ public final class LogMessagesTable extends AbstractMutableVirtualTable .addClusteringColumn(ORDER_IN_MILLISECOND_COLUMN_NAME, Int32Type.instance) .addRegularColumn(LOGGER_COLUMN_NAME, UTF8Type.instance) .addRegularColumn(LEVEL_COLUMN_NAME, UTF8Type.instance) - .addRegularColumn(MESSAGE_COLUMN_NAME, UTF8Type.instance).build()); + .addRegularColumn(MESSAGE_COLUMN_NAME, UTF8Type.instance).build(), + size); + } - logger.debug("capacity of virtual table {} is set to be at most {} rows", metadata().toString(), size); - buffer = BoundedLinkedList.create(size); + @Override + public List<LoggingEvent> getMessages(LoggingEvent event) + { + return List.of(event); } @Override @@ -103,12 +100,12 @@ public final class LogMessagesTable extends AbstractMutableVirtualTable int index = 0; - Iterator<LogMessage> iterator = buffer.listIterator(); + Iterator<LoggingEvent> iterator = buffer.listIterator(); while (iterator.hasNext()) { - LogMessage log = iterator.next(); + LoggingEvent log = iterator.next(); - milliSecondsOfCurrentLog = log.timestamp; + milliSecondsOfCurrentLog = log.getTimeStamp(); if (milliSecondsOfPreviousLog == milliSecondsOfCurrentLog) ++index; else @@ -116,86 +113,13 @@ public final class LogMessagesTable extends AbstractMutableVirtualTable milliSecondsOfPreviousLog = milliSecondsOfCurrentLog; - result.row(new Date(log.timestamp), index) - .column(LOGGER_COLUMN_NAME, log.logger) - .column(LEVEL_COLUMN_NAME, log.level) - .column(MESSAGE_COLUMN_NAME, log.message); + result.row(new Date(milliSecondsOfCurrentLog), index) + .column(LOGGER_COLUMN_NAME, log.getLoggerName()) + .column(LEVEL_COLUMN_NAME, log.getLevel().toString()) + .column(MESSAGE_COLUMN_NAME, log.getFormattedMessage()); } } return result; } - - public void add(LoggingEvent event) - { - buffer.add(new LogMessage(event)); - } - - @Override - public void truncate() - { - buffer.clear(); - } - - @Override - public boolean allowFilteringImplicitly() - { - return false; - } - - @VisibleForTesting - static int resolveBufferSize() - { - int size = CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt(); - return (size < LOGS_VIRTUAL_TABLE_MIN_ROWS || size > LOGS_VIRTUAL_TABLE_MAX_ROWS) - ? LOGS_VIRTUAL_TABLE_DEFAULT_ROWS : size; - } - - @VisibleForTesting - public static class LogMessage - { - public final long timestamp; - public final String logger; - public final String level; - public final String message; - - public LogMessage(LoggingEvent event) - { - this(event.getTimeStamp(), event.getLoggerName(), event.getLevel().toString(), event.getFormattedMessage()); - } - - public LogMessage(long timestamp, String logger, String level, String message) - { - this.timestamp = timestamp; - this.logger = logger; - this.level = level; - this.message = message; - } - } - - private static final class BoundedLinkedList<T> extends LinkedList<T> - { - private final int maxSize; - - public static <T> List<T> create(int size) - { - return Collections.synchronizedList(new BoundedLinkedList<>(size)); - } - - private BoundedLinkedList(int maxSize) - { - this.maxSize = maxSize; - } - - @Override - public boolean add(T t) - { - if (size() == maxSize) - removeLast(); - - addFirst(t); - - return true; - } - } } diff --git a/src/java/org/apache/cassandra/db/virtual/SlowQueriesTable.java b/src/java/org/apache/cassandra/db/virtual/SlowQueriesTable.java new file mode 100644 index 0000000000..0d392d0ce2 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/SlowQueriesTable.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; +import java.util.List; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ch.qos.logback.classic.spi.LoggingEvent; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.BooleanType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.TimestampType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.monitoring.MonitoringTask.Operation; +import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.schema.TableMetadata; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +public class SlowQueriesTable extends AbstractLoggerVirtualTable<Operation> +{ + private static final Logger logger = LoggerFactory.getLogger(SlowQueriesTable.class); + + public static final int LOGS_VIRTUAL_TABLE_DEFAULT_ROWS = 10_000; + public static final int LOGS_VIRTUAL_TABLE_MAX_ROWS = 100_000; + + public static final String TABLE_NAME = "slow_queries"; + private static final String TABLE_COMMENT = "Slow queries"; + + public static final String KEYSPACE_COLUMN_NAME = "keyspace_name"; + public static final String TABLE_COLUMN_NAME = "table_name"; + public static final String TIMESTAMP_COLUMN_NAME = "timestamp"; + public static final String QUERY_COLUMN_NAME = "query"; + public static final String MINIMUM_TIME_COLUMN_NAME = "min_ms"; + public static final String MAXIMUM_TIME_COLUMN_NAME = "max_ms"; + public static final String AVERAGE_TIME_COLUMN_NAME = "avg_ms"; + public static final String TIMES_REPORTED_COLUMN_NAME = "times_reported"; + public static final String CROSS_NODE_COLUMN_NAME = "cross_node"; + + SlowQueriesTable(String keyspace) + { + this(keyspace, resolveBufferSize(CassandraRelevantProperties.LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.getInt(), + LOGS_VIRTUAL_TABLE_MAX_ROWS, + LOGS_VIRTUAL_TABLE_DEFAULT_ROWS)); + } + + @VisibleForTesting + SlowQueriesTable(String keyspace, int size) + { + super(TableMetadata.builder(keyspace, TABLE_NAME) + .comment(TABLE_COMMENT) + .kind(TableMetadata.Kind.VIRTUAL) + .partitioner(new LocalPartitioner(UTF8Type.instance)) + .addPartitionKeyColumn(KEYSPACE_COLUMN_NAME, UTF8Type.instance) + .addClusteringColumn(TABLE_COLUMN_NAME, UTF8Type.instance) + .addClusteringColumn(TIMESTAMP_COLUMN_NAME, TimestampType.instance) + // We are adding query as a clustering column for uniqueness, + // In theory, it might happen that two monitoring operations + // would be emitted for same keyspace, same table at the exact same time + // (in milliseconds). That means that one operation would "shadow" + // another one because primary key would be same for both. + // To make it truly unique, we include query among clustering keys + // as well. If queries were same, then they would be also reported so + // (it would be reflected in "times_reported" column) + .addClusteringColumn(QUERY_COLUMN_NAME, UTF8Type.instance) + .addRegularColumn(MINIMUM_TIME_COLUMN_NAME, LongType.instance) + .addRegularColumn(MAXIMUM_TIME_COLUMN_NAME, LongType.instance) + .addRegularColumn(AVERAGE_TIME_COLUMN_NAME, LongType.instance) + .addRegularColumn(TIMES_REPORTED_COLUMN_NAME, Int32Type.instance) + .addRegularColumn(CROSS_NODE_COLUMN_NAME, BooleanType.instance) + .build(), + size); + } + + @Override + protected void applyPartitionDeletion(ColumnValues partitionKey) + { + String keyspace = partitionKey.value(0); + + synchronized (buffer) + { + buffer.removeIf(o -> o.keyspace().equals(keyspace)); + } + } + + @Override + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata(), DecoratedKey.comparator.reversed()); + + synchronized (buffer) + { + Iterator<Operation> iterator = buffer.listIterator(); + while (iterator.hasNext()) + { + Operation operation = iterator.next(); + + result.row(operation.keyspace(), operation.table(), new Date(operation.timestampMs()), operation.name()) + .column(MINIMUM_TIME_COLUMN_NAME, NANOSECONDS.toMillis(operation.minTimeNanos())) + .column(MAXIMUM_TIME_COLUMN_NAME, NANOSECONDS.toMillis(operation.maxTimeNanos())) + .column(AVERAGE_TIME_COLUMN_NAME, NANOSECONDS.toMillis(operation.averageTime())) + .column(TIMES_REPORTED_COLUMN_NAME, operation.numTimesReported()) + .column(CROSS_NODE_COLUMN_NAME, operation.isCrossNode()); + } + } + + return result; + } + + @Override + public List<Operation> getMessages(LoggingEvent event) + { + try + { + List<Operation> qualified = new ArrayList<>(); + for (Operation operation : Operation.deserialize(event.getMessage())) + { + + // in (improbable) case there is an operation which does not have + // keyspace / table on it, we just skip this from processing + // as we would have nothing to show for partition key and clustering column + if (operation.keyspace() == null || operation.table() == null) + continue; + + // if cf of an operation is present, take keyspace and table name from it + // instead of having new string instances per operation which might + // take relatively a lot of additional space unnecessarily + Keyspace keyspace = Keyspace.openIfExists(operation.keyspace()); + String keyspaceName; + String tableName; + if (keyspace != null) + { + keyspaceName = keyspace.getName(); + try + { + ColumnFamilyStore table = keyspace.getColumnFamilyStore(operation.table()); + tableName = table.getTableName(); + } + catch (IllegalArgumentException ex) + { + tableName = operation.table(); + } + } + else + { + keyspaceName = operation.keyspace(); + tableName = operation.table(); + } + + operation.setKeyspace(keyspaceName); + operation.setTable(tableName); + qualified.add(operation); + } + + return qualified; + } + catch (Throwable t) + { + logger.trace("Unable to generate list of slow queries", t); + return null; + } + } + + @Override + public boolean allowFilteringImplicitly() + { + return true; + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java index 3ca8f728a8..28c6dc8fef 100644 --- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java @@ -61,6 +61,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace .add(new GossipInfoTable(VIRTUAL_VIEWS)) .add(new QueriesTable(VIRTUAL_VIEWS)) .add(new LogMessagesTable(VIRTUAL_VIEWS)) + .add(new SlowQueriesTable(VIRTUAL_VIEWS)) .add(new SnapshotsTable(VIRTUAL_VIEWS)) .add(new PeersTable(VIRTUAL_VIEWS)) .add(new LocalTable(VIRTUAL_VIEWS)) diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index d3c787d2e2..171ec47e1e 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -59,6 +59,8 @@ import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.SystemKeyspaceMigrator41; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.virtual.AccordDebugKeyspace; +import org.apache.cassandra.db.virtual.LogMessagesTable; +import org.apache.cassandra.db.virtual.SlowQueriesTable; import org.apache.cassandra.db.virtual.SystemViewsKeyspace; import org.apache.cassandra.db.virtual.VirtualKeyspace; import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; @@ -94,6 +96,7 @@ import org.apache.cassandra.utils.NativeLibrary; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; import org.apache.cassandra.utils.logging.LoggingSupportFactory; +import org.apache.cassandra.utils.logging.SlowQueriesAppender; import org.apache.cassandra.utils.logging.VirtualTableAppender; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -436,7 +439,6 @@ public class CassandraDaemon { exitOrFail(e.returnCode, e.getMessage(), e.getCause()); } - } /** @@ -555,11 +557,17 @@ public class CassandraDaemon if (DatabaseDescriptor.getAccord().enable_virtual_debug_only_keyspace) VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance); - // flush log messages to system_views.system_logs virtual table as there were messages already logged - // before that virtual table was instantiated + // Flush log messages to system_views.system_logs virtual table as there were messages already logged + // before that virtual table was instantiated. + // In general, there is no need to do same treatment for slow queries as by the time queries are processed + // the logging framework if fully setup already but for the sake of it and to be sure, just do it as well. LoggingSupportFactory.getLoggingSupport() .getAppender(VirtualTableAppender.class, VirtualTableAppender.APPENDER_NAME) - .ifPresent(appender -> ((VirtualTableAppender) appender).flushBuffer()); + .ifPresent(appender -> appender.flushBuffer(LogMessagesTable.class, LogMessagesTable.TABLE_NAME)); + + LoggingSupportFactory.getLoggingSupport() + .getAppender(SlowQueriesAppender.class, SlowQueriesAppender.APPENDER_NAME) + .ifPresent(appender -> appender.flushBuffer(SlowQueriesTable.class, SlowQueriesTable.TABLE_NAME)); } public synchronized void initializeClientTransports() diff --git a/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java b/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java new file mode 100644 index 0000000000..7becbc13fc --- /dev/null +++ b/src/java/org/apache/cassandra/utils/logging/AbstractVirtualTableAppender.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.logging; + +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +import ch.qos.logback.classic.spi.LoggingEvent; +import ch.qos.logback.core.AppenderBase; +import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable; +import org.apache.cassandra.db.virtual.SlowQueriesTable; +import org.apache.cassandra.db.virtual.VirtualKeyspace; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; +import org.apache.cassandra.db.virtual.VirtualTable; + +import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS; + +public abstract class AbstractVirtualTableAppender extends AppenderBase<LoggingEvent> +{ + private final int defaultRows; + + protected AbstractVirtualTableAppender(int defaultRows) + { + this.defaultRows = defaultRows; + } + + // for holding messages until virtual registry contains logs virtual table + // as it takes some time during startup of a node to initialise virtual tables but messages are + // logged already + protected final List<LoggingEvent> messageBuffer = new LinkedList<>(); + + protected <T> T getVirtualTable(Class<T> vtableClass, String tableName) + { + VirtualKeyspace keyspace = VirtualKeyspaceRegistry.instance.getKeyspaceNullable(VIRTUAL_VIEWS); + + if (keyspace == null) + return null; + + Optional<VirtualTable> virtualTable = keyspace.tables() + .stream() + .filter(vt -> vt.name().equals(tableName)) + .findFirst(); + + if (virtualTable.isEmpty()) + return null; + + VirtualTable vt = virtualTable.get(); + + if (!vt.getClass().equals(vtableClass)) + throw new IllegalStateException(String.format("Virtual table %s.%s is not backed by an instance of %s but by %s", + VIRTUAL_VIEWS, + tableName, + vtableClass.getName(), + vt.getClass().getName())); + + return (T) vt; + } + + /** + * This method adds an event to virtual table, when present. + * When vtable is null, we will attempt to find it among registered ones. Then not found, we add it to internal + * buffer for later processing. This might happen e.g. for logging tables when log events + * were appended via logging framework sooner than registration of virtual tables was done so after they are registered, + * they would miss logging events happened before being so. + * + * @param vtable vtable to append to + * @param event event to append to + * @param tableName table name of virtual table to append to + * @return vtable or when null, found vtable + */ + protected AbstractLoggerVirtualTable<?> appendToVirtualTable(AbstractLoggerVirtualTable<?> vtable, LoggingEvent event, String tableName) + { + AbstractLoggerVirtualTable<?> foundVtable; + if (vtable == null) + { + foundVtable = getVirtualTable(SlowQueriesTable.class, tableName); + if (foundVtable == null) + addToBuffer(event); + else + foundVtable.add(event); + } + else + { + foundVtable = vtable; + vtable.add(event); + } + + return foundVtable; + } + + @Override + public void stop() + { + synchronized (messageBuffer) + { + messageBuffer.clear(); + super.stop(); + } + } + + /** + * Flushes all log entries which were appended before virtual table was registered. + * + * @see org.apache.cassandra.service.CassandraDaemon#setupVirtualKeyspaces + */ + public void flushBuffer(Class<? extends AbstractLoggerVirtualTable<?>> vtableClass, String tableName) + { + synchronized (messageBuffer) + { + Optional.ofNullable(getVirtualTable(vtableClass, tableName)).ifPresent(vtable -> { + messageBuffer.forEach(vtable::add); + messageBuffer.clear(); + }); + } + } + + protected void addToBuffer(LoggingEvent eventObject) + { + synchronized (messageBuffer) + { + // we restrict how many logging events we can put into buffer, + // so we are not growing without any bound when things go south + if (messageBuffer.size() < defaultRows) + messageBuffer.add(eventObject); + } + } +} diff --git a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java index e710d44dd1..f32963b73a 100644 --- a/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java +++ b/src/java/org/apache/cassandra/utils/logging/LogbackLoggingSupport.java @@ -60,7 +60,8 @@ public class LogbackLoggingSupport implements LoggingSupport @Override public void onStartup() { - checkOnlyOneVirtualTableAppender(); + checkOnlyOneVirtualTableAppender(VirtualTableAppender.class); + checkOnlyOneVirtualTableAppender(SlowQueriesAppender.class); // The default logback configuration in conf/logback.xml allows reloading the // configuration when the configuration file has changed (every 60 seconds by default). @@ -138,7 +139,20 @@ public class LogbackLoggingSupport implements LoggingSupport } @Override - public Optional<Appender<?>> getAppender(Class<?> appenderClass, String name) + public Optional<Logger> getLogger(String loggerName) + { + LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); + for (Logger logBackLogger : lc.getLoggerList()) + { + if (logBackLogger.getName().equals(loggerName)) + return Optional.of(logBackLogger); + } + + return Optional.empty(); + } + + @Override + public <T extends Appender<?>> Optional<T> getAppender(Class<T> appenderClass, String appenderName) { LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); for (Logger logBackLogger : lc.getLoggerList()) @@ -146,15 +160,15 @@ public class LogbackLoggingSupport implements LoggingSupport for (Iterator<Appender<ILoggingEvent>> iterator = logBackLogger.iteratorForAppenders(); iterator.hasNext();) { Appender<ILoggingEvent> appender = iterator.next(); - if (appender.getClass() == appenderClass && appender.getName().equals(name)) - return Optional.of(appender); + if (appender.getClass() == appenderClass && appender.getName().equals(appenderName)) + return Optional.of(appenderClass.cast(appender)); } } return Optional.empty(); } - private void checkOnlyOneVirtualTableAppender() + private void checkOnlyOneVirtualTableAppender(Class<?> appenderClass) { int count = 0; LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); @@ -164,7 +178,7 @@ public class LogbackLoggingSupport implements LoggingSupport for (Iterator<Appender<ILoggingEvent>> iterator = logBackLogger.iteratorForAppenders(); iterator.hasNext();) { Appender<?> appender = iterator.next(); - if (appender instanceof VirtualTableAppender) + if (appenderClass.isAssignableFrom(appender.getClass())) { virtualAppenderNames.add(appender.getName()); count += 1; @@ -174,7 +188,7 @@ public class LogbackLoggingSupport implements LoggingSupport if (count > 1) throw new IllegalStateException(String.format("There are multiple appenders of class %s of names %s. There is only one appender of such class allowed.", - VirtualTableAppender.class.getName(), String.join(",", virtualAppenderNames))); + appenderClass.getName(), String.join(",", virtualAppenderNames))); } private boolean hasAppenders(Logger logBackLogger) diff --git a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java index 35e11975f9..00b40cb966 100644 --- a/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java +++ b/src/java/org/apache/cassandra/utils/logging/LoggingSupport.java @@ -21,6 +21,7 @@ package org.apache.cassandra.utils.logging; import java.util.Map; import java.util.Optional; +import ch.qos.logback.classic.Logger; import ch.qos.logback.core.Appender; /** @@ -53,7 +54,12 @@ public interface LoggingSupport */ Map<String, String> getLoggingLevels(); - default Optional<Appender<?>> getAppender(Class<?> appenderClass, String appenderName) + default <T extends Appender<?>> Optional<T> getAppender(Class<T> appenderClass, String appenderName) + { + return Optional.empty(); + } + + default Optional<Logger> getLogger(String loggerName) { return Optional.empty(); } diff --git a/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java b/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java new file mode 100644 index 0000000000..4af2e38307 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/logging/SlowQueriesAppender.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils.logging; + +import ch.qos.logback.classic.spi.LoggingEvent; +import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable; +import org.apache.cassandra.db.virtual.SlowQueriesTable; + +public final class SlowQueriesAppender extends AbstractVirtualTableAppender +{ + public static final String APPENDER_NAME = "SLOW_QUERIES_APPENDER"; + + private AbstractLoggerVirtualTable<?> slowQueries; + + public SlowQueriesAppender() + { + super(SlowQueriesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS); + } + + @Override + protected void append(LoggingEvent eventObject) + { + // slowQueries will be null as long as virtual tables + // are not registered, and we already try to put queries there. + // As soon as vtable is registered (as part of node's startup / initialisation), + // slow queries will never be null again + slowQueries = appendToVirtualTable(slowQueries, eventObject, SlowQueriesTable.TABLE_NAME); + } +} diff --git a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java index 2820b2936f..03a142004a 100644 --- a/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java +++ b/src/java/org/apache/cassandra/utils/logging/VirtualTableAppender.java @@ -18,111 +18,35 @@ package org.apache.cassandra.utils.logging; -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; import java.util.Set; import com.google.common.collect.ImmutableSet; import ch.qos.logback.classic.spi.LoggingEvent; -import ch.qos.logback.core.AppenderBase; import org.apache.cassandra.audit.FileAuditLogger; +import org.apache.cassandra.db.virtual.AbstractLoggerVirtualTable; import org.apache.cassandra.db.virtual.LogMessagesTable; -import org.apache.cassandra.db.virtual.VirtualKeyspace; -import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; -import org.apache.cassandra.db.virtual.VirtualTable; - -import static org.apache.cassandra.db.virtual.LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS; -import static org.apache.cassandra.db.virtual.LogMessagesTable.TABLE_NAME; -import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS; /** * Appends Cassandra logs to virtual table system_views.system_logs */ -public final class VirtualTableAppender extends AppenderBase<LoggingEvent> +public final class VirtualTableAppender extends AbstractVirtualTableAppender { public static final String APPENDER_NAME = "CQLLOG"; private static final Set<String> forbiddenLoggers = ImmutableSet.of(FileAuditLogger.class.getName()); - private LogMessagesTable logs; - - // for holding messages until virtual registry contains logs virtual table - // as it takes some time during startup of a node to initialise virtual tables but messages are - // logged already - private final List<LoggingEvent> messageBuffer = new LinkedList<>(); + private AbstractLoggerVirtualTable<?> logs; - @Override - protected void append(LoggingEvent eventObject) + public VirtualTableAppender() { - if (!forbiddenLoggers.contains(eventObject.getLoggerName())) - { - if (logs == null) - { - logs = getVirtualTable(); - if (logs == null) - addToBuffer(eventObject); - else - logs.add(eventObject); - } - else - logs.add(eventObject); - } + super(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS); } @Override - public void stop() - { - messageBuffer.clear(); - super.stop(); - } - - /** - * Flushes all logs which were appended before virtual table was registered. - * - * @see org.apache.cassandra.service.CassandraDaemon#setupVirtualKeyspaces - */ - public void flushBuffer() - { - Optional.ofNullable(getVirtualTable()).ifPresent(vtable -> { - messageBuffer.forEach(vtable::add); - messageBuffer.clear(); - }); - } - - private LogMessagesTable getVirtualTable() - { - VirtualKeyspace keyspace = VirtualKeyspaceRegistry.instance.getKeyspaceNullable(VIRTUAL_VIEWS); - - if (keyspace == null) - return null; - - Optional<VirtualTable> logsTable = keyspace.tables() - .stream() - .filter(vt -> vt.name().equals(TABLE_NAME)) - .findFirst(); - - if (!logsTable.isPresent()) - return null; - - VirtualTable vt = logsTable.get(); - - if (!(vt instanceof LogMessagesTable)) - throw new IllegalStateException(String.format("Virtual table %s.%s is not backed by an instance of %s but by %s", - VIRTUAL_VIEWS, - TABLE_NAME, - LogMessagesTable.class.getName(), - vt.getClass().getName())); - - return (LogMessagesTable) vt; - } - - private void addToBuffer(LoggingEvent eventObject) + protected void append(LoggingEvent eventObject) { - // we restrict how many logging events we can put into buffer, - // so we are not growing without any bound when things go south - if (messageBuffer.size() < LOGS_VIRTUAL_TABLE_DEFAULT_ROWS) - messageBuffer.add(eventObject); + if (!forbiddenLoggers.contains(eventObject.getLoggerName())) + logs = appendToVirtualTable(logs, eventObject, LogMessagesTable.TABLE_NAME); } } diff --git a/test/conf/logback-dtest_with_slow_query_appender.xml b/test/conf/logback-dtest_with_slow_query_appender.xml new file mode 100644 index 0000000000..1b6ed7511f --- /dev/null +++ b/test/conf/logback-dtest_with_slow_query_appender.xml @@ -0,0 +1,63 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration debug="false" scan="true" scanPeriod="60 seconds"> + <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" /> + <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" /> + + <!-- Shutdown hook ensures that async appender flushes --> + <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + + <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender"> + <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern> + </encoder> + <immediateFlush>true</immediateFlush> + </appender> + + <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>WARN</level> + </filter> + </appender> + + <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>DEBUG</level> + </filter> + </appender> + + <appender name="SLOW_QUERIES_APPENDER" class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/> + + <logger name="slow_operations" additivity="false" level="DEBUG"> + <appender-ref ref="SLOW_QUERIES_APPENDER"/> + </logger> + + <root level="DEBUG"> + <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching --> + <appender-ref ref="INSTANCESTDERR" /> + <appender-ref ref="INSTANCESTDOUT" /> + </root> +</configuration> diff --git a/test/conf/logback-dtest_with_slow_query_appender_invalid.xml b/test/conf/logback-dtest_with_slow_query_appender_invalid.xml new file mode 100644 index 0000000000..a2252dd23a --- /dev/null +++ b/test/conf/logback-dtest_with_slow_query_appender_invalid.xml @@ -0,0 +1,73 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration debug="false" scan="true" scanPeriod="60 seconds"> + <define name="cluster_id" class="org.apache.cassandra.distributed.impl.ClusterIDDefiner" /> + <define name="instance_id" class="org.apache.cassandra.distributed.impl.InstanceIDDefiner" /> + + <!-- Shutdown hook ensures that async appender flushes --> + <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + + <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender"> + <file>./build/test/logs/${cassandra.testtag}/${suitename}/${cluster_id}/${instance_id}/system.log</file> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %msg%n</pattern> + </encoder> + <immediateFlush>true</immediateFlush> + </appender> + + <appender name="INSTANCESTDERR" target="System.err" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>WARN</level> + </filter> + </appender> + + <appender name="INSTANCESTDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern> + </encoder> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>DEBUG</level> + </filter> + </appender> + + <appender name="SLOW_QUERIES_APPENDER" class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/> + <appender name="SLOW_QUERIES_APPENDER_2" class="org.apache.cassandra.utils.logging.SlowQueriesAppender"/> + + <appender name="CQLLOG" class="org.apache.cassandra.utils.logging.VirtualTableAppender"> + <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> + <level>INFO</level> + </filter> + </appender> + + <!-- multiple appenders are not allowed --> + <logger name="slow_operations" additivity="false" level="DEBUG"> + <appender-ref ref="SLOW_QUERIES_APPENDER"/> + <appender-ref ref="SLOW_QUERIES_APPENDER_2"/> + </logger> + + <root level="DEBUG"> + <appender-ref ref="INSTANCEFILE" /> <!-- use blocking to avoid race conditions with appending and searching --> + <appender-ref ref="INSTANCESTDERR" /> + <appender-ref ref="INSTANCESTDOUT" /> + <appender-ref ref="CQLLOG" /> + </root> +</configuration> diff --git a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java b/test/distributed/org/apache/cassandra/distributed/test/AbstractVirtualLogsTableTest.java similarity index 69% copy from src/java/org/apache/cassandra/db/monitoring/Monitorable.java copy to test/distributed/org/apache/cassandra/distributed/test/AbstractVirtualLogsTableTest.java index 10bd10438a..c8bb32fe72 100644 --- a/src/java/org/apache/cassandra/db/monitoring/Monitorable.java +++ b/test/distributed/org/apache/cassandra/distributed/test/AbstractVirtualLogsTableTest.java @@ -16,21 +16,19 @@ * limitations under the License. */ -package org.apache.cassandra.db.monitoring; +package org.apache.cassandra.distributed.test; -public interface Monitorable -{ - String name(); - long creationTimeNanos(); - long timeoutNanos(); - long slowTimeoutNanos(); +import org.junit.Ignore; + +import static java.lang.String.format; - boolean isInProgress(); - boolean isAborted(); - boolean isCompleted(); - boolean isSlow(); - boolean isCrossNode(); +@Ignore +public abstract class AbstractVirtualLogsTableTest extends TestBaseImpl +{ + public String query(String template) + { + return format(template, getTableName()); + } - boolean abort(); - boolean complete(); + public abstract String getTableName(); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/SlowQueriesAppenderTest.java b/test/distributed/org/apache/cassandra/distributed/test/SlowQueriesAppenderTest.java new file mode 100644 index 0000000000..d8b6b3f00f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/SlowQueriesAppenderTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import org.junit.Test; + +import org.apache.cassandra.db.virtual.SlowQueriesTable; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.shared.WithProperties; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.utils.logging.SlowQueriesAppender; + +import static java.lang.String.format; +import static org.apache.cassandra.config.CassandraRelevantProperties.LOGBACK_CONFIGURATION_FILE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * It is inherently tricky / flaky to make some queries to be slow so we just test + * the invalid configuration otherwise the table as such is tested in {@link org.apache.cassandra.db.virtual.SlowQueriesTableTest}. + */ +public class SlowQueriesAppenderTest extends AbstractVirtualLogsTableTest +{ + @Test + public void testMultipleAppendersFailToStartNode() throws Throwable + { + LOGBACK_CONFIGURATION_FILE.setString("test/conf/logback-dtest_with_slow_query_appender_invalid.xml"); + + // NOTE: Because cluster startup is expected to fail in this case, and can leave things in a weird state + // for the next state, create without starting, and set failure as shutdown to false, + // so the try-with-resources can close instances properly. + try (WithProperties properties = new WithProperties().set(LOGBACK_CONFIGURATION_FILE, "test/conf/logback-dtest_with_slow_query_appender_invalid.xml"); + Cluster cluster = Cluster.build(1) + .withConfig(c -> c.with(Feature.values()) + .set(Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN, false)) + .createWithoutStarting()) + { + cluster.startup(); + fail("Node should not start as there is supposed to be invalid logback configuration file."); + } + catch (IllegalStateException ex) + { + assertEquals(format("There are multiple appenders of class %s " + + "of names SLOW_QUERIES_APPENDER,SLOW_QUERIES_APPENDER_2. There is only one appender of such class allowed.", + SlowQueriesAppender.class.getName()), + ex.getMessage()); + } + } + + @Override + public String getTableName() + { + return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, SlowQueriesTable.TABLE_NAME); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/SlowQueryDeserTest.java b/test/distributed/org/apache/cassandra/distributed/test/SlowQueryDeserTest.java new file mode 100644 index 0000000000..ae5bcc966c --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/SlowQueryDeserTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.util.Collection; +import java.util.List; + +import org.junit.Test; + +import org.apache.cassandra.db.monitoring.MonitorableImpl; +import org.apache.cassandra.db.monitoring.MonitoringTask; +import org.apache.cassandra.db.monitoring.MonitoringTask.SlowOperation; +import org.apache.cassandra.utils.Clock; + +public class SlowQueryDeserTest +{ + @Test + public void testSlowQueryDeser() throws Throwable + { + SlowOperation slowOperation = new SlowOperation(new MonitorableImpl() + { + @Override + public String name() + { + return String.format("select * from %s.%s where id = 5", monitoredOnKeyspace(), monitoredOnTable()); + } + + @Override + public String monitoredOnKeyspace() + { + return "ks"; + } + + @Override + public String monitoredOnTable() + { + return "tb"; + } + + @Override + public boolean isCrossNode() + { + return true; + } + }, Clock.Global.currentTimeMillis()); + + String serialize = MonitoringTask.Operation.serialize(List.of(slowOperation)); + Collection<MonitoringTask.Operation> deserialize = MonitoringTask.Operation.deserialize(serialize); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java index 71ef4dbe78..bf9f58123e 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/VirtualTableLogsTest.java @@ -25,7 +25,6 @@ import org.junit.Test; import ch.qos.logback.classic.Level; import org.apache.cassandra.db.virtual.LogMessagesTable; -import org.apache.cassandra.db.virtual.LogMessagesTable.LogMessage; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.Constants; import org.apache.cassandra.distributed.api.Feature; @@ -47,8 +46,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class VirtualTableLogsTest extends TestBaseImpl +public class VirtualTableLogsTest extends AbstractVirtualLogsTableTest { + @Override + public String getTableName() + { + return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, LogMessagesTable.TABLE_NAME); + } + @Test public void testVTableOutput() throws Throwable { @@ -56,9 +61,9 @@ public class VirtualTableLogsTest extends TestBaseImpl Cluster cluster = Cluster.build(1) .withConfig(c -> c.with(Feature.values())) .start(); - ) + ) { - List<TestingLogMessage> rows = getRows(cluster); + List<LogMessage> rows = getRows(cluster); assertFalse(rows.isEmpty()); rows.forEach(message -> assertTrue(Level.toLevel(message.level).isGreaterOrEqual(Level.INFO))); @@ -91,39 +96,36 @@ public class VirtualTableLogsTest extends TestBaseImpl } } - private List<TestingLogMessage> getRows(Cluster cluster) + private List<LogMessage> getRows(Cluster cluster) { SimpleQueryResult simpleQueryResult = cluster.coordinator(1).executeWithResult(query("select * from %s"), ONE); - List<TestingLogMessage> rows = new ArrayList<>(); + List<LogMessage> rows = new ArrayList<>(); simpleQueryResult.forEachRemaining(row -> { long timestamp = row.getTimestamp(TIMESTAMP_COLUMN_NAME).getTime(); String logger = row.getString(LOGGER_COLUMN_NAME); String level = row.getString(LEVEL_COLUMN_NAME); String message = row.getString(MESSAGE_COLUMN_NAME); int order = row.getInteger(ORDER_IN_MILLISECOND_COLUMN_NAME); - TestingLogMessage logMessage = new TestingLogMessage(timestamp, logger, level, message, order); + LogMessage logMessage = new LogMessage(timestamp, logger, level, message, order); rows.add(logMessage); }); return rows; } - private String query(String template) - { - return format(template, getTableName()); - } - - private String getTableName() - { - return format("%s.%s", SchemaConstants.VIRTUAL_VIEWS, LogMessagesTable.TABLE_NAME); - } - - private static class TestingLogMessage extends LogMessage + private static class LogMessage { - private int order; + public final long timestamp; + public final String logger; + public final String level; + public final String message; + public final int order; - public TestingLogMessage(long timestamp, String logger, String level, String message, int order) + public LogMessage(long timestamp, String logger, String level, String message, int order) { - super(timestamp, logger, level, message); + this.timestamp = timestamp; + this.logger = logger; + this.level = level; + this.message = message; this.order = order; } } diff --git a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java b/test/unit/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTableTest.java similarity index 55% copy from test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java copy to test/unit/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTableTest.java index dd32058533..40a926fda7 100644 --- a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AbstractLoggerVirtualTableTest.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import com.google.common.collect.ImmutableList; +import org.junit.Ignore; import org.junit.Test; import ch.qos.logback.classic.Level; @@ -32,24 +33,21 @@ import ch.qos.logback.classic.spi.LoggingEvent; import com.datastax.driver.core.Row; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.marshal.TimestampType; -import org.apache.cassandra.db.virtual.AbstractVirtualTable.DataSet; -import org.apache.cassandra.db.virtual.AbstractVirtualTable.Partition; -import org.apache.cassandra.dht.LocalPartitioner; -import static org.apache.cassandra.config.CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class LogMessagesTableTest extends CQLTester +@Ignore +public abstract class AbstractLoggerVirtualTableTest<U> extends CQLTester { - private String keyspace = createKeyspaceName(); - private LogMessagesTable table; + protected final String keyspace = createKeyspaceName(); + + protected AbstractLoggerVirtualTable<U> table; @Test - public void testTruncate() throws Throwable + public void testTruncate() { - registerVirtualTable(); + registerTable(); int numberOfRows = 100; List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); @@ -61,28 +59,28 @@ public class LogMessagesTableTest extends CQLTester } @Test - public void empty() throws Throwable + public void testEmpty() throws Throwable { - registerVirtualTable(); + registerTable(); assertEmpty(execute(query("select * from %s"))); } @Test public void testInsert() { - registerVirtualTable(); + registerTable(); int numberOfRows = 1000; List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); loggingEvents.forEach(table::add); - assertEquals(numberOfRows, numberOfPartitions()); + assertEquals(numberOfRows, execute(query("select * from %s")).size()); } @Test - public void testLimitedCapacity() throws Throwable + public void testLimitedCapacity() { - registerVirtualTable(100); + registerTable(100); int numberOfRows = 1000; List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); @@ -94,7 +92,7 @@ public class LogMessagesTableTest extends CQLTester // the first record in the table will be the last one which we inserted LoggingEvent firstEvent = loggingEvents.get(999); assertRowsNet(executeNet(query("select timestamp from %s limit 1")), - new Object[] { new Date(firstEvent.getTimeStamp()) }); + new Object[]{ new Date(firstEvent.getTimeStamp()) }); // the last record in the table will be 900th we inserted List<Row> all = executeNet(query("select timestamp from %s")).all(); @@ -104,100 +102,68 @@ public class LogMessagesTableTest extends CQLTester assertEquals(loggingEvents.get(900).getTimeStamp(), timestamp.getTime()); } - @Test - public void testMultipleLogsInSameMillisecond() - { - registerVirtualTable(10); - List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 5); - loggingEvents.forEach(table::add); - - // 2 partitions, 5 rows in each - assertEquals(2, numberOfPartitions()); - } - - @Test - public void testResolvingBufferSize() - { - LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(-1); - assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); - - LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(0); - assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); - - LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(1000001); - assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + protected abstract void registerTable(int maxSize); - LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(999); - assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + protected abstract void registerTable(); - LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(50001); - assertEquals(50001, LogMessagesTable.resolveBufferSize()); - } - - private void registerVirtualTable() - { - registerVirtualTable(LogMessagesTable.LOGS_VIRTUAL_TABLE_MIN_ROWS); - } - - private void registerVirtualTable(int size) + protected void registerVirtualTable(AbstractLoggerVirtualTable<U> table) { - table = new LogMessagesTable(keyspace, size); - VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(keyspace, ImmutableList.of(table))); + this.table = table; + VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(table.metadata.keyspace, ImmutableList.of(this.table))); } - private int numberOfPartitions() - { - DataSet data = table.data(); - - Iterator<Partition> partitions = data.getPartitions(DataRange.allData(new LocalPartitioner(TimestampType.instance))); - - int numberOfPartitions = 0; - - while (partitions.hasNext()) - { - partitions.next(); - numberOfPartitions += 1; - } - - return numberOfPartitions; - } - - private String query(String query) + protected String query(String query) { return String.format(query, table.toString()); } - private List<LoggingEvent> getLoggingEvents(int size) + protected List<LoggingEvent> getLoggingEvents(int size) { return getLoggingEvents(size, Instant.now(), 1); } - private List<LoggingEvent> getLoggingEvents(int size, Instant firstTimestamp, int logsInMillisecond) + protected List<LoggingEvent> getLoggingEvents(int size, Instant firstTimestamp, int logsInMillisecond) { List<LoggingEvent> logs = new LinkedList<>(); int partitions = size / logsInMillisecond; for (int i = 0; i < partitions; i++) { - long timestamp = firstTimestamp.toEpochMilli(); - firstTimestamp = firstTimestamp.plusSeconds(1); + firstTimestamp = firstTimestamp.plusSeconds(i); for (int j = 0; j < logsInMillisecond; j++) - logs.add(getLoggingEvent(timestamp)); + logs.add(getLoggingEvent(firstTimestamp.toEpochMilli())); } return logs; } - private LoggingEvent getLoggingEvent(long timestamp) + protected int numberOfPartitions() + { + AbstractVirtualTable.DataSet data = table.data(); + Iterator<AbstractVirtualTable.Partition> partitions = data.getPartitions(DataRange.allData(table.metadata.partitioner)); + int numberOfPartitions = 0; + + while (partitions.hasNext()) + { + partitions.next(); + numberOfPartitions += 1; + } + + return numberOfPartitions; + } + + protected LoggingEvent getLoggingEvent(long timestamp) { LoggingEvent event = new LoggingEvent(); event.setLevel(Level.INFO); - event.setMessage("message " + timestamp); - event.setLoggerName("logger " + timestamp); + event.setMessage(getMessage(timestamp)); + event.setLoggerName(AbstractLoggerVirtualTableTest.class.getName()); event.setThreadName(Thread.currentThread().getName()); event.setTimeStamp(timestamp); return event; } + + protected abstract String getMessage(long timestamp); } diff --git a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java index dd32058533..7025e8ad4c 100644 --- a/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/LogMessagesTableTest.java @@ -20,69 +20,34 @@ package org.apache.cassandra.db.virtual; import java.time.Instant; import java.util.Date; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; -import com.google.common.collect.ImmutableList; import org.junit.Test; -import ch.qos.logback.classic.Level; import ch.qos.logback.classic.spi.LoggingEvent; import com.datastax.driver.core.Row; -import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.marshal.TimestampType; -import org.apache.cassandra.db.virtual.AbstractVirtualTable.DataSet; -import org.apache.cassandra.db.virtual.AbstractVirtualTable.Partition; -import org.apache.cassandra.dht.LocalPartitioner; import static org.apache.cassandra.config.CassandraRelevantProperties.LOGS_VIRTUAL_TABLE_MAX_ROWS; +import static org.apache.cassandra.db.virtual.LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -public class LogMessagesTableTest extends CQLTester +public class LogMessagesTableTest extends AbstractLoggerVirtualTableTest<LoggingEvent> { - private String keyspace = createKeyspaceName(); - private LogMessagesTable table; - - @Test - public void testTruncate() throws Throwable - { - registerVirtualTable(); - - int numberOfRows = 100; - List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); - loggingEvents.forEach(table::add); - - execute(query("truncate %s")); - - assertTrue(executeNet(query("select timestamp from %s")).all().isEmpty()); - } - - @Test - public void empty() throws Throwable - { - registerVirtualTable(); - assertEmpty(execute(query("select * from %s"))); - } - @Test - public void testInsert() + public void testMultipleLogsInSameMillisecond() { - registerVirtualTable(); - - int numberOfRows = 1000; - List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); + registerTable(); + List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 5); loggingEvents.forEach(table::add); - assertEquals(numberOfRows, numberOfPartitions()); + // 2 partitions, 5 rows in each + assertEquals(2, numberOfPartitions()); } @Test - public void testLimitedCapacity() throws Throwable + public void testLimitedCapacity() { - registerVirtualTable(100); + registerTable(100); int numberOfRows = 1000; List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); @@ -94,7 +59,7 @@ public class LogMessagesTableTest extends CQLTester // the first record in the table will be the last one which we inserted LoggingEvent firstEvent = loggingEvents.get(999); assertRowsNet(executeNet(query("select timestamp from %s limit 1")), - new Object[] { new Date(firstEvent.getTimeStamp()) }); + new Object[]{ new Date(firstEvent.getTimeStamp()) }); // the last record in the table will be 900th we inserted List<Row> all = executeNet(query("select timestamp from %s")).all(); @@ -104,100 +69,47 @@ public class LogMessagesTableTest extends CQLTester assertEquals(loggingEvents.get(900).getTimeStamp(), timestamp.getTime()); } - @Test - public void testMultipleLogsInSameMillisecond() - { - registerVirtualTable(10); - List<LoggingEvent> loggingEvents = getLoggingEvents(10, Instant.now(), 5); - loggingEvents.forEach(table::add); - - // 2 partitions, 5 rows in each - assertEquals(2, numberOfPartitions()); - } - @Test public void testResolvingBufferSize() { LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(-1); - assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize()); LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(0); - assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize()); LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(1000001); - assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize()); LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(999); - assertEquals(LogMessagesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, LogMessagesTable.resolveBufferSize()); + assertEquals(999, resolveBufferSize()); LOGS_VIRTUAL_TABLE_MAX_ROWS.setInt(50001); - assertEquals(50001, LogMessagesTable.resolveBufferSize()); + assertEquals(50001, resolveBufferSize()); } - private void registerVirtualTable() + private int resolveBufferSize() { - registerVirtualTable(LogMessagesTable.LOGS_VIRTUAL_TABLE_MIN_ROWS); + return AbstractLoggerVirtualTable.resolveBufferSize(LOGS_VIRTUAL_TABLE_MAX_ROWS.getInt(), + LogMessagesTable.LOGS_VIRTUAL_TABLE_MAX_ROWS, + LOGS_VIRTUAL_TABLE_DEFAULT_ROWS); } - private void registerVirtualTable(int size) + @Override + protected void registerTable(int maxSize) { - table = new LogMessagesTable(keyspace, size); - VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(keyspace, ImmutableList.of(table))); + registerVirtualTable(new LogMessagesTable(keyspace, maxSize)); } - private int numberOfPartitions() + @Override + protected void registerTable() { - DataSet data = table.data(); - - Iterator<Partition> partitions = data.getPartitions(DataRange.allData(new LocalPartitioner(TimestampType.instance))); - - int numberOfPartitions = 0; - - while (partitions.hasNext()) - { - partitions.next(); - numberOfPartitions += 1; - } - - return numberOfPartitions; - } - - private String query(String query) - { - return String.format(query, table.toString()); - } - - private List<LoggingEvent> getLoggingEvents(int size) - { - return getLoggingEvents(size, Instant.now(), 1); - } - - private List<LoggingEvent> getLoggingEvents(int size, Instant firstTimestamp, int logsInMillisecond) - { - List<LoggingEvent> logs = new LinkedList<>(); - int partitions = size / logsInMillisecond; - - for (int i = 0; i < partitions; i++) - { - long timestamp = firstTimestamp.toEpochMilli(); - firstTimestamp = firstTimestamp.plusSeconds(1); - - for (int j = 0; j < logsInMillisecond; j++) - logs.add(getLoggingEvent(timestamp)); - } - - return logs; + registerTable(1000); } - private LoggingEvent getLoggingEvent(long timestamp) + @Override + protected String getMessage(long timestamp) { - LoggingEvent event = new LoggingEvent(); - event.setLevel(Level.INFO); - event.setMessage("message " + timestamp); - event.setLoggerName("logger " + timestamp); - event.setThreadName(Thread.currentThread().getName()); - event.setTimeStamp(timestamp); - - return event; + return "message " + timestamp; } } diff --git a/test/unit/org/apache/cassandra/db/virtual/SlowQueriesTableTest.java b/test/unit/org/apache/cassandra/db/virtual/SlowQueriesTableTest.java new file mode 100644 index 0000000000..1c261e7cc9 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/virtual/SlowQueriesTableTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.virtual; + +import java.util.List; +import java.util.Random; + +import org.junit.Test; + +import ch.qos.logback.classic.spi.LoggingEvent; +import org.apache.cassandra.db.monitoring.MonitorableImpl; +import org.apache.cassandra.db.monitoring.MonitoringTask; +import org.apache.cassandra.db.monitoring.MonitoringTask.Operation; +import org.apache.cassandra.utils.Generators; +import org.quicktheories.impl.JavaRandom; + +import static org.apache.cassandra.config.CassandraRelevantProperties.LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS; +import static org.apache.cassandra.db.virtual.SlowQueriesTable.LOGS_VIRTUAL_TABLE_DEFAULT_ROWS; +import static org.apache.cassandra.db.virtual.SlowQueriesTable.LOGS_VIRTUAL_TABLE_MAX_ROWS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class SlowQueriesTableTest extends AbstractLoggerVirtualTableTest<Operation> +{ + private final Random random = new Random(); + private final JavaRandom javaRandom = new JavaRandom(random); + + @Override + protected void registerTable(int maxSize) + { + registerVirtualTable(new SlowQueriesTable(keyspace, maxSize)); + } + + @Override + protected void registerTable() + { + registerTable(1000); + } + + @Test + public void testLimitedCapacity() + { + registerTable(100); + + int numberOfRows = 1000; + List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); + assertEquals(1000, loggingEvents.size()); + loggingEvents.forEach(table::add); + + // even we inserted 1000 rows, only 100 are present as its capacity is bounded + assertEquals(100, executeNet(query("select * from %s")).all().size()); + } + + @Test + public void testDelete() + { + registerTable(); + + int numberOfRows = 100; + List<LoggingEvent> loggingEvents = getLoggingEvents(numberOfRows); + loggingEvents.forEach(table::add); + + Operation operation = table.buffer.get(0); + + assertEquals(100, executeNet(query("select * from %s")).all().size()); + execute(query("delete from %s where keyspace_name = '" + operation.keyspace() + '\'')); + assertTrue(executeNet(query("select * from %s")).all().size() < 100); + } + + @Test + public void testResolvingBufferSize() + { + LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(-1); + assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize()); + + LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(0); + assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize()); + + LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(1000001); + assertEquals(LOGS_VIRTUAL_TABLE_DEFAULT_ROWS, resolveBufferSize()); + + LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(999); + assertEquals(999, resolveBufferSize()); + + LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.setInt(50001); + assertEquals(50001, resolveBufferSize()); + } + + private int resolveBufferSize() + { + return AbstractLoggerVirtualTable.resolveBufferSize(LOGS_SLOW_QUERIES_VIRTUAL_TABLE_MAX_ROWS.getInt(), + LOGS_VIRTUAL_TABLE_MAX_ROWS, + LOGS_VIRTUAL_TABLE_DEFAULT_ROWS); + } + + + @Override + protected String getMessage(long timestamp) + { + MonitoringTask.SlowOperation slowOperation = new MonitoringTask.SlowOperation(new MonitorableImpl() + { + @Override + public String name() + { + return Generators.SYMBOL_GEN.generate(javaRandom); + } + + @Override + public String monitoredOnKeyspace() + { + return Generators.SYMBOL_GEN.generate(javaRandom); + } + + @Override + public String monitoredOnTable() + { + return Generators.SYMBOL_GEN.generate(javaRandom); + } + + @Override + public boolean isCrossNode() + { + return random.nextBoolean(); + } + }, timestamp); + + return Operation.serialize(List.of(slowOperation)); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org