This is an automated email from the ASF dual-hosted git repository. frankgh pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git
The following commit(s) were added to refs/heads/trunk by this push: new b4fb9771 CASSSIDECAR-180: Changes to add endpoint for stream stats (#166) b4fb9771 is described below commit b4fb9771a098b303b59cc0a058c76ca93df90409 Author: Arjun Ashok <arjun_as...@apple.com> AuthorDate: Wed Jan 29 09:25:37 2025 -0800 CASSSIDECAR-180: Changes to add endpoint for stream stats (#166) Patch by Arjun Ashok; Reviewed by Bernardo Botella, Yifan Cai, Francisco Guerrero for CASSSIDECAR-180 --- CHANGES.txt | 1 + .../sidecar/adapters/base/CassandraAdapter.java | 2 +- .../adapters/base/CassandraMetricsOperations.java | 66 ++++++- .../base/GossipDependentStorageJmxOperations.java | 12 +- .../adapters/base/StorageJmxOperations.java | 2 +- .../adapters/base/StreamManagerJmxOperations.java | 20 ++- .../adapters/base/data/CompositeDataUtil.java | 53 ++++++ .../sidecar/adapters/base/data/ProgressInfo.java | 54 ++++++ .../sidecar/adapters/base/data/SessionInfo.java | 161 +++++++++++++++++ .../sidecar/adapters/base/data/StreamState.java | 31 +++- .../sidecar/adapters/base/data/StreamSummary.java | 23 ++- .../cassandra/sidecar/common/ApiEndpointsV1.java | 1 + .../sidecar/common/request/StreamStatsRequest.java | 26 ++- .../common/response/StreamStatsResponse.java | 60 +++++++ .../common/response/data/StreamsProgressStats.java | 107 +++++++++++ .../cassandra/sidecar/client/RequestContext.java | 14 ++ .../cassandra/sidecar/client/SidecarClient.java | 14 ++ .../sidecar/client/SidecarClientTest.java | 22 +++ .../sidecar/common/server/MetricsOperations.java | 6 + .../acl/authorization/BasicPermissions.java | 3 + .../sidecar/routes/StreamStatsHandler.java | 94 ++++++++++ .../cassandra/sidecar/server/MainModule.java | 7 + .../sidecar/routes/StreamStatsIntegrationTest.java | 196 +++++++++++++++++++++ .../sidecar/routes/StreamStatsHandlerTest.java | 158 +++++++++++++++++ 24 files changed, 1091 insertions(+), 42 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 4d9d25de..11992ddf 100644 --- 
a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,6 @@ 1.0.0 ----- + * Add sidecar endpoint to retrieve stream stats (CASSSIDECAR-180) * Add sidecar endpoint to retrieve cassandra gossip health (CASSSIDECAR-173) * Fix SidecarSchema stuck at initialization due to ClusterLeaseTask scheduling (CASSSIDECAR-189) * Add RBAC Authorization support in Sidecar (CASSSIDECAR-161) diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java index f9a1f5c3..f5c16288 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java @@ -127,7 +127,7 @@ public class CassandraAdapter implements ICassandraAdapter @NotNull public MetricsOperations metricsOperations() { - return new CassandraMetricsOperations(cqlSessionProvider); + return new CassandraMetricsOperations(jmxClient, cqlSessionProvider); } /** diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java index cc3ff9fc..8ce643c3 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java @@ -18,33 +18,52 @@ package org.apache.cassandra.sidecar.adapters.base; +import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; +import javax.management.openmbean.CompositeData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.sidecar.adapters.base.data.SessionInfo; +import 
org.apache.cassandra.sidecar.adapters.base.data.StreamState; import org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStats; import org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStatsDatabaseAccessor; import org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStatsSummary; import org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSchema; import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry; +import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats; import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.common.server.JmxClient; import org.apache.cassandra.sidecar.common.server.MetricsOperations; import org.jetbrains.annotations.NotNull; +import static org.apache.cassandra.sidecar.adapters.base.StreamManagerJmxOperations.STREAM_MANAGER_OBJ_NAME; + /** * Default implementation that pulls methods from the Cassandra Metrics Proxy */ public class CassandraMetricsOperations implements MetricsOperations { + private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMetricsOperations.class); private final ConnectedClientStatsDatabaseAccessor dbAccessor; + protected final JmxClient jmxClient; + + /** * Creates a new instance with the provided {@link CQLSessionProvider} */ - public CassandraMetricsOperations(CQLSessionProvider session) + public CassandraMetricsOperations(JmxClient jmxClient, CQLSessionProvider session) { + this.jmxClient = jmxClient; this.dbAccessor = new ConnectedClientStatsDatabaseAccessor(session, new ConnectedClientsSchema()); } @@ -70,6 +89,51 @@ public class CassandraMetricsOperations implements MetricsOperations return new ConnectedClientStatsResponse(entries, totalConnectedClients, connectionsByUser); } + /** + * {@inheritDoc} + */ + @Override + public StreamsProgressStats streamsProgressStats() + { + 
Set<CompositeData> streamData = jmxClient.proxy(StreamManagerJmxOperations.class, STREAM_MANAGER_OBJ_NAME) + .getCurrentStreams(); + return computeStats(streamData.stream().map(StreamState::new)); + } + + private StreamsProgressStats computeStats(Stream<StreamState> streamStates) + { + Iterator<SessionInfo> sessions = streamStates.map(StreamState::sessions).flatMap(Collection::stream).iterator(); + + long totalFilesToReceive = 0; + long totalFilesReceived = 0; + long totalBytesToReceive = 0; + long totalBytesReceived = 0; + + long totalFilesToSend = 0; + long totalFilesSent = 0; + long totalBytesToSend = 0; + long totalBytesSent = 0; + + while (sessions.hasNext()) + { + SessionInfo sessionInfo = sessions.next(); + totalBytesToReceive += sessionInfo.totalSizeToReceive(); + totalBytesReceived += sessionInfo.totalSizeReceived(); + totalFilesToReceive += sessionInfo.totalFilesToReceive(); + totalFilesReceived += sessionInfo.totalFilesReceived(); + totalBytesToSend += sessionInfo.totalSizeToSend(); + totalBytesSent += sessionInfo.totalSizeSent(); + totalFilesToSend += sessionInfo.totalFilesToSend(); + totalFilesSent += sessionInfo.totalFilesSent(); + } + + LOGGER.debug("Progress Stats: totalBytesToReceive:{} totalBytesReceived:{} totalBytesToSend:{} totalBytesSent:{}", + totalBytesToReceive, totalBytesReceived, totalBytesToSend, totalBytesSent); + return new StreamsProgressStats(totalFilesToReceive, totalFilesReceived, totalBytesToReceive, totalBytesReceived, + totalFilesToSend, totalFilesSent, totalBytesToSend, totalBytesSent); + + } + private ConnectedClientStatsResponse connectedClientSummary() { ConnectedClientStatsSummary summary = dbAccessor.summary(); diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java index bbdb29c4..350096bf 100644 --- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java @@ -146,6 +146,12 @@ public class GossipDependentStorageJmxOperations implements StorageJmxOperations return delegate.forceKeyspaceCleanup(jobs, keyspaceName, tables); } + @Override + public String getOperationMode() + { + return delegate.getOperationMode(); + } + /** * Ensures that gossip is running on the Cassandra instance * @@ -165,10 +171,4 @@ public class GossipDependentStorageJmxOperations implements StorageJmxOperations { delegate.decommission(force); } - - @Override - public String getOperationMode() - { - return delegate.getOperationMode(); - } } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java index ca18db04..75332435 100644 --- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java @@ -168,7 +168,7 @@ public interface StorageJmxOperations /** * Fetch the operation-mode of the node - * @return string representation of theoperation-mode + * @return string representation of the operation-mode */ String getOperationMode(); } diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StreamManagerJmxOperations.java similarity index 60% copy from server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java copy to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StreamManagerJmxOperations.java index ef7a5f95..42347dc0 100644 --- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StreamManagerJmxOperations.java @@ -16,20 +16,22 @@ * limitations under the License. */ -package org.apache.cassandra.sidecar.common.server; +package org.apache.cassandra.sidecar.adapters.base; -import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; +import java.util.Set; +import javax.management.openmbean.CompositeData; /** - * An interface that defines interactions with the metrics system in Cassandra. + * An interface that pulls methods from the Cassandra Stream manager Proxy */ -public interface MetricsOperations +public interface StreamManagerJmxOperations { + + String STREAM_MANAGER_OBJ_NAME = "org.apache.cassandra.net:type=StreamManager"; + /** - * Retrieve the connected client stats metrics from the cluster - * @param summaryOnly boolean parameter to list connection summary only - * @return the requested client stats, in full or summary + * Returns the current snapshot of the progress of all ongoing streams. + * @return the current state of streams as a set of JMX {@link CompositeData} instances. */ - ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly); - + Set<CompositeData> getCurrentStreams(); } diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/CompositeDataUtil.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/CompositeDataUtil.java new file mode 100644 index 00000000..a033fe09 --- /dev/null +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/CompositeDataUtil.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.NoSuchElementException; +import javax.management.openmbean.CompositeData; + +/** + * Utility class for operations with {@link CompositeData} + */ +public class CompositeDataUtil +{ + + /** + * Generic helper to extract attribute of a specific type from the CompositeData type. + * @param data data being parsed + * @param key attribute being extracted + * @return attribute value + * @param <T> return type + */ + public static <T> T extractValue(CompositeData data, String key) + { + Object value = data.get(key); + if (value == null) + { + throw new NoSuchElementException("No value is present for key: " + key); + } + try + { + return (T) value; + } + catch (ClassCastException cce) + { + throw new RuntimeException("Value type mismatched of key: " + key, cce); + } + } +} diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java new file mode 100644 index 00000000..512e9984 --- /dev/null +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import javax.management.openmbean.CompositeData; + +import static org.apache.cassandra.sidecar.adapters.base.data.CompositeDataUtil.extractValue; + +/** + * Representation of the stream progress info + */ +public class ProgressInfo +{ + public final String peer; + public final int sessionIndex; + public final String fileName; + public final String direction; + public final long currentBytes; + public final long totalBytes; + + public ProgressInfo(CompositeData data) + { + this.peer = extractValue(data, "peer"); + this.sessionIndex = extractValue(data, "sessionIndex"); + this.fileName = extractValue(data, "fileName"); + this.direction = extractValue(data, "direction"); + this.currentBytes = extractValue(data, "currentBytes"); + this.totalBytes = extractValue(data, "totalBytes"); + } + + /** + * @return true if transfer is completed + */ + public boolean isCompleted() + { + return currentBytes >= totalBytes; + } +} diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java new file mode 100644 index 00000000..c9006593 --- /dev/null +++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.adapters.base.data; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; + +import static org.apache.cassandra.sidecar.adapters.base.data.CompositeDataUtil.extractValue; + +/** + * Representation of session info data + */ +public class SessionInfo +{ + public final String peer; + public final int sessionIndex; + public final String connecting; + /** Immutable collection of receiving summaries */ + public final List<StreamSummary> receivingSummaries; + /** Immutable collection of sending summaries*/ + public final List<StreamSummary> sendingSummaries; + /** Current session state */ + public final String state; + public final List<ProgressInfo> receivingFiles; + public final List<ProgressInfo> sendingFiles; + + public SessionInfo(CompositeData data) + { + this.peer = extractValue(data, "peer"); + this.sessionIndex = extractValue(data, "sessionIndex"); + this.connecting = extractValue(data, "connecting"); + this.receivingSummaries = 
parseSummaries(extractValue(data, "receivingSummaries")); + this.sendingSummaries = parseSummaries(extractValue(data, "sendingSummaries")); + this.state = extractValue(data, "state"); + this.receivingFiles = parseFiles(extractValue(data, "receivingFiles")); + this.sendingFiles = parseFiles(extractValue(data, "sendingFiles")); + } + + /** + * @return total size(in bytes) already received. + */ + public long totalSizeReceived() + { + return totalSizeInProgress(receivingFiles); + } + + /** + * @return total size(in bytes) already sent. + */ + public long totalSizeSent() + { + return totalSizeInProgress(sendingFiles); + } + + /** + * @return total number of files to receive in the session + */ + public long totalFilesToReceive() + { + return totalFiles(receivingSummaries); + } + + /** + * @return total number of files to send in the session + */ + public long totalFilesToSend() + { + return totalFiles(sendingSummaries); + } + + /** + * @return total size(in bytes) to receive in the session + */ + public long totalSizeToReceive() + { + return totalSizes(receivingSummaries); + } + + /** + * @return total size(in bytes) to send in the session + */ + public long totalSizeToSend() + { + return totalSizes(sendingSummaries); + } + + /** + * @return total number of files already received. + */ + public long totalFilesReceived() + { + return totalFilesCompleted(receivingFiles); + } + + /** + * @return total number of files already sent. 
+ */ + public long totalFilesSent() + { + return totalFilesCompleted(sendingFiles); + } + + private long totalSizes(List<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.totalSize; + return total; + } + + private long totalFilesCompleted(List<ProgressInfo> files) + { + return files.stream() + .filter(ProgressInfo::isCompleted) + .count(); + } + + private long totalSizeInProgress(List<ProgressInfo> streams) + { + long total = 0; + for (ProgressInfo stream : streams) + total += stream.currentBytes; + return total; + } + + private List<StreamSummary> parseSummaries(CompositeData[] summaries) + { + return Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList()); + } + + private List<ProgressInfo> parseFiles(CompositeData[] files) + { + return Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList()); + } + + private long totalFiles(List<StreamSummary> summaries) + { + long total = 0; + for (StreamSummary summary : summaries) + total += summary.files; + return total; + } +} diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java similarity index 52% copy from server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java copy to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java index ef7a5f95..b6c4d189 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java @@ -16,20 +16,35 @@ * limitations under the License. 
*/ -package org.apache.cassandra.sidecar.common.server; +package org.apache.cassandra.sidecar.adapters.base.data; -import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import javax.management.openmbean.CompositeData; /** - * An interface that defines interactions with the metrics system in Cassandra. + * Representation of the stream state data */ -public interface MetricsOperations +public class StreamState { + private final List<SessionInfo> sessions; + + public StreamState(CompositeData data) + { + this.sessions = parseSessions((CompositeData[]) data.get("sessions")); + } + /** - * Retrieve the connected client stats metrics from the cluster - * @param summaryOnly boolean parameter to list connection summary only - * @return the requested client stats, in full or summary + * @return the session info for the sessions in the stream stats data */ - ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly); + public List<SessionInfo> sessions() + { + return sessions; + } + private List<SessionInfo> parseSessions(CompositeData[] sessions) + { + return Arrays.stream(sessions).map(SessionInfo::new).collect(Collectors.toList()); + } } diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java similarity index 59% copy from server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java copy to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java index ef7a5f95..0d7a360f 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java +++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java @@ -16,20 +16,27 @@ * limitations under the License. 
*/ -package org.apache.cassandra.sidecar.common.server; +package org.apache.cassandra.sidecar.adapters.base.data; -import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; +import javax.management.openmbean.CompositeData; /** - * An interface that defines interactions with the metrics system in Cassandra. + * Representation of the stream summary data */ -public interface MetricsOperations +public class StreamSummary { + public final String tableId; + /** - * Retrieve the connected client stats metrics from the cluster - * @param summaryOnly boolean parameter to list connection summary only - * @return the requested client stats, in full or summary + * Number of files to transfer. Can be 0 if nothing to transfer for some streaming request. */ - ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly); + public final int files; + public final long totalSize; + public StreamSummary(CompositeData data) + { + this.tableId = (String) data.get("tableId"); + this.files = (int) data.get("files"); + this.totalSize = (long) data.get("totalSize"); + } } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java index 673f0100..511a5daa 100644 --- a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java @@ -129,6 +129,7 @@ public final class ApiEndpointsV1 public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + CASSANDRA + OPERATIONAL_JOBS; public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + PER_OPERATIONAL_JOB; public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + "/operations/decommission"; + public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/streams"; private ApiEndpointsV1() { diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamStatsRequest.java similarity index 56% copy from server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java copy to client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamStatsRequest.java index ef7a5f95..2640f85d 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamStatsRequest.java @@ -16,20 +16,30 @@ * limitations under the License. */ -package org.apache.cassandra.sidecar.common.server; +package org.apache.cassandra.sidecar.common.request; -import org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; +import io.netty.handler.codec.http.HttpMethod; +import org.apache.cassandra.sidecar.common.ApiEndpointsV1; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; /** - * An interface that defines interactions with the metrics system in Cassandra. 
+ * Class request for the StreamStats API */ -public interface MetricsOperations +public class StreamStatsRequest extends JsonRequest<StreamStatsResponse> { /** - * Retrieve the connected client stats metrics from the cluster - * @param summaryOnly boolean parameter to list connection summary only - * @return the requested client stats, in full or summary + * Constructs a request to retrieve the Cassandra node streaming stats information */ - ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly); + public StreamStatsRequest() + { + super(ApiEndpointsV1.STREAM_STATS_ROUTE); + } + /** + * {@inheritDoc} + */ + public HttpMethod method() + { + return HttpMethod.GET; + } } diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java new file mode 100644 index 00000000..396374a9 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.common.response; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats; + +/** + * Class response for the StreamStats API + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class StreamStatsResponse +{ + private final String mode; + private final StreamsProgressStats stats; + + /** + * Constructs a new {@link StreamStatsResponse}. + * + * @param mode operation mode of the node + * @param stats stream progress stats for the node + */ + @JsonCreator + public StreamStatsResponse(@JsonProperty("operationMode") String mode, + @JsonProperty("streamsProgressStats") StreamsProgressStats stats) + { + this.mode = mode; + this.stats = stats; + } + + @JsonProperty("operationMode") + public String operationMode() + { + return mode; + } + + @JsonProperty("streamsProgressStats") + public StreamsProgressStats streamsProgressStats() + { + return stats; + } +} diff --git a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java new file mode 100644 index 00000000..88d74077 --- /dev/null +++ b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.common.response.data; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * A class representing stats summarizing the progress of streamed bytes and files on the node + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class StreamsProgressStats +{ + private final long totalFilesToReceive; + private final long totalFilesReceived; + private final long totalBytesToReceive; + private final long totalBytesReceived; + private final long totalFilesToSend; + private final long totalFilesSent; + private final long totalBytesToSend; + private final long totalBytesSent; + + @JsonCreator + public StreamsProgressStats(@JsonProperty("totalFilesToReceive") long totalFilesToReceive, + @JsonProperty("totalFilesReceived") long totalFilesReceived, + @JsonProperty("totalBytesToReceive") long totalBytesToReceive, + @JsonProperty("totalBytesReceived") long totalBytesReceived, + @JsonProperty("totalFilesToSend") long totalFilesToSend, + @JsonProperty("totalFilesSent") long totalFilesSent, + @JsonProperty("totalBytesToSend") long totalBytesToSend, + @JsonProperty("totalBytesSent") long totalBytesSent) + { + this.totalFilesToReceive = totalFilesToReceive; + this.totalFilesReceived = totalFilesReceived; + this.totalBytesToReceive = totalBytesToReceive; + this.totalBytesReceived = totalBytesReceived; + this.totalFilesToSend = totalFilesToSend; + this.totalFilesSent = totalFilesSent; + this.totalBytesToSend = 
totalBytesToSend; + this.totalBytesSent = totalBytesSent; + } + + @JsonProperty("totalFilesToReceive") + public long totalFilesToReceive() + { + return totalFilesToReceive; + } + + @JsonProperty("totalFilesReceived") + public long totalFilesReceived() + { + return totalFilesReceived; + } + + @JsonProperty("totalBytesToReceive") + public long totalBytesToReceive() + { + return totalBytesToReceive; + } + + @JsonProperty("totalBytesReceived") + public long totalBytesReceived() + { + return totalBytesReceived; + } + + @JsonProperty("totalFilesToSend") + public long totalFilesToSend() + { + return totalFilesToSend; + } + + @JsonProperty("totalFilesSent") + public long totalFilesSent() + { + return totalFilesSent; + } + + @JsonProperty("totalBytesToSend") + public long totalBytesToSend() + { + return totalBytesToSend; + } + + @JsonProperty("totalBytesSent") + public long totalBytesSent() + { + return totalBytesSent; + } +} diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java index dd929c42..ef724128 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java @@ -47,6 +47,7 @@ import org.apache.cassandra.sidecar.common.request.RingRequest; import org.apache.cassandra.sidecar.common.request.SSTableComponentRequest; import org.apache.cassandra.sidecar.common.request.SchemaRequest; import org.apache.cassandra.sidecar.common.request.SidecarHealthRequest; +import org.apache.cassandra.sidecar.common.request.StreamStatsRequest; import org.apache.cassandra.sidecar.common.request.TimeSkewRequest; import org.apache.cassandra.sidecar.common.request.TokenRangeReplicasRequest; import org.apache.cassandra.sidecar.common.request.UploadSSTableRequest; @@ -79,6 +80,8 @@ public class RequestContext protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new 
GossipInfoRequest(); protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new ListOperationalJobsRequest(); protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = new NodeDecommissionRequest(); + + protected static final StreamStatsRequest STREAM_STATS_REQUEST = new StreamStatsRequest(); protected static final RetryPolicy DEFAULT_RETRY_POLICY = new NoRetryPolicy(); protected static final RetryPolicy DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY = new ExponentialBackoffRetryPolicy(10, 500L, 60_000L); @@ -537,6 +540,17 @@ public class RequestContext return request(NODE_DECOMMISSION_REQUEST); } + /** + * Sets the {@code request} to be a {@link StreamStatsRequest} and returns a reference to this Builder + * enabling method chaining. + * + * @return a reference to this Builder + */ + public Builder streamsStatsRequest() + { + return request(STREAM_STATS_REQUEST); + } + /** * Sets the {@code retryPolicy} to be an * {@link org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} configured with diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java index b2425516..749b220d 100644 --- a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java +++ b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java @@ -61,6 +61,7 @@ import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.common.response.RingResponse; import org.apache.cassandra.sidecar.common.response.SSTableImportResponse; import org.apache.cassandra.sidecar.common.response.SchemaResponse; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; import org.apache.cassandra.sidecar.common.response.TimeSkewResponse; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; import 
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload; @@ -678,6 +679,19 @@ public class SidecarClient implements AutoCloseable, SidecarClientBlobRestoreExt .build()); } + /** + * Executes the streams stats request using the default retry policy and configured selection policy + * @param instance the instance where the request will be executed + * @return a completable future of the stream stats response + */ + public CompletableFuture<StreamStatsResponse> streamsStats(SidecarInstance instance) + { + return executor.executeRequestAsync(requestBuilder() + .singleInstanceSelectionPolicy(instance) + .streamsStatsRequest() + .build()); + } + /** * Executes the node decommission request using the default retry policy and configured selection policy * @param instance the instance where the request will be executed diff --git a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java index d305ba88..8b1c15c9 100644 --- a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java +++ b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java @@ -80,12 +80,14 @@ import org.apache.cassandra.sidecar.common.response.OperationalJobResponse; import org.apache.cassandra.sidecar.common.response.RingResponse; import org.apache.cassandra.sidecar.common.response.SSTableImportResponse; import org.apache.cassandra.sidecar.common.response.SchemaResponse; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; import org.apache.cassandra.sidecar.common.response.TimeSkewResponse; import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse; import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo; import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry; import 
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload; import org.apache.cassandra.sidecar.common.response.data.RingEntry; +import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats; import org.apache.cassandra.sidecar.common.utils.HttpRange; import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen; @@ -1622,6 +1624,26 @@ abstract class SidecarClientTest assertThat(new String(baos.toByteArray(), StandardCharsets.UTF_8)).isEqualTo("Test Content"); } + @Test + public void testStreamsStats() throws Exception + { + StreamsProgressStats stats = new StreamsProgressStats(7, 7, 15088, 15088, 2, 2, 1024, 1024); + StreamStatsResponse mockResp = new StreamStatsResponse("NORMAL", stats); + ObjectMapper mapper = new ObjectMapper(); + String expectedResponse = mapper.writeValueAsString(mockResp); + MockResponse response = new MockResponse() + .setResponseCode(OK.code()) + .setBody(expectedResponse); + enqueue(response); + for (MockWebServer server : servers) + { + SidecarInstanceImpl sidecarInstance = RequestExecutorTest.newSidecarInstance(server); + StreamStatsResponse result = client.streamsStats(sidecarInstance).get(30, TimeUnit.SECONDS); + assertThat(mapper.writeValueAsString(result)).isEqualTo(expectedResponse); + validateResponseServed(server, ApiEndpointsV1.STREAM_STATS_ROUTE, req -> { }); + } + } + private void enqueue(MockResponse response) { for (MockWebServer server : servers) diff --git a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java index ef7a5f95..e57a7d31 100644 --- a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java +++ b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java @@ -19,6 +19,7 @@ package org.apache.cassandra.sidecar.common.server; import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats; /** * An interface that defines interactions with the metrics system in Cassandra. @@ -32,4 +33,9 @@ public interface MetricsOperations */ ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly); + /** + * Retrieve the stream progress stats from the cluster + * @return the requested stream progress stats + */ + StreamsProgressStats streamsProgressStats(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java index 8074bca3..6ee1ad9a 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java @@ -63,4 +63,7 @@ public class BasicPermissions public static final Permission READ_GOSSIP = new DomainAwarePermission("GOSSIP:READ"); public static final Permission READ_RING = new DomainAwarePermission("RING:READ"); public static final Permission READ_TOPOLOGY = new DomainAwarePermission("TOPOLOGY:READ"); + + // cassandra stats permissions + public static final Permission STATS = new StandardPermission("STATS"); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java b/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java new file mode 100644 index 00000000..ba3941e6 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.Set; + +import com.google.inject.Inject; +import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.net.SocketAddress; +import io.vertx.ext.auth.authorization.Authorization; +import io.vertx.ext.web.RoutingContext; +import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions; +import org.apache.cassandra.sidecar.acl.authorization.VariableAwareResource; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats; +import org.apache.cassandra.sidecar.concurrent.ExecutorPools; +import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher; + +/** + * Handler for retrieving node streams stats + */ +public class StreamStatsHandler extends AbstractHandler<Void> implements AccessProtected +{ + /** + * Constructs a handler with the provided {@code metadataFetcher} + * + * @param metadataFetcher the metadata fetcher + * @param executorPools executor pools for blocking executions + */ + @Inject + protected StreamStatsHandler(InstanceMetadataFetcher metadataFetcher, + ExecutorPools executorPools) + { + super(metadataFetcher, executorPools, null); + } + + @Override + public Set<Authorization> 
requiredAuthorizations() + { + String resource = VariableAwareResource.CLUSTER.resource(); + return Collections.singleton(BasicPermissions.STATS.toAuthorization(resource)); + } + + /** + * {@inheritDoc} + */ + @Override + public void handleInternal(RoutingContext context, + HttpServerRequest httpRequest, + String host, + SocketAddress remoteAddress, + Void request) + { + + CassandraAdapterDelegate delegate = metadataFetcher.delegate(host); + + executorPools.service() + .executeBlocking(() -> { + String mode = delegate.storageOperations().operationMode(); + StreamsProgressStats stats = delegate.metricsOperations().streamsProgressStats(); + return new StreamStatsResponse(mode, stats); + }) + .onSuccess(context::json) + .onFailure(cause -> processFailure(cause, context, host, remoteAddress, request)); + + } + + /** + * {@inheritDoc} + */ + @Override + protected Void extractParamsOrThrow(RoutingContext context) + { + return null; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java index 5fe8b229..ce0752b7 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java @@ -125,6 +125,7 @@ import org.apache.cassandra.sidecar.routes.RingHandler; import org.apache.cassandra.sidecar.routes.RoutingOrder; import org.apache.cassandra.sidecar.routes.SchemaHandler; import org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler; +import org.apache.cassandra.sidecar.routes.StreamStatsHandler; import org.apache.cassandra.sidecar.routes.TimeSkewHandler; import org.apache.cassandra.sidecar.routes.TokenRangeReplicaMapHandler; import org.apache.cassandra.sidecar.routes.cassandra.NodeSettingsHandler; @@ -341,6 +342,7 @@ public class MainModule extends AbstractModule OperationalJobHandler operationalJobHandler, ListOperationalJobsHandler listOperationalJobsHandler, 
NodeDecommissionHandler nodeDecommissionHandler, + StreamStatsHandler streamStatsHandler, ErrorHandler errorHandler) { Router router = Router.router(vertx); @@ -496,6 +498,11 @@ public class MainModule extends AbstractModule .handler(nodeDecommissionHandler) .build(); + protectedRouteBuilderFactory.get().router(router).method(HttpMethod.GET) + .endpoint(ApiEndpointsV1.STREAM_STATS_ROUTE) + .handler(streamStatsHandler) + .build(); + protectedRouteBuilderFactory.get().router(router).method(HttpMethod.PUT) .endpoint(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE) .handler(ssTableUploadHandler) diff --git a/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java b/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java new file mode 100644 index 00000000..809f9cb4 --- /dev/null +++ b/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.jupiter.api.extension.ExtendWith; + +import com.datastax.driver.core.Session; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.dynamic.ClassFileLocator; +import net.bytebuddy.dynamic.TypeResolutionStrategy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.pool.TypePool; +import org.apache.cassandra.distributed.UpgradeableCluster; +import org.apache.cassandra.distributed.api.IUpgradeableInstance; +import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats; +import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName; +import org.apache.cassandra.sidecar.testing.IntegrationTestBase; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.testing.CassandraIntegrationTest; +import org.apache.cassandra.testing.ConfigurableCassandraTestContext; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests the stream stats endpoint with cassandra 
container. + */ +@ExtendWith(VertxExtension.class) +public class StreamStatsIntegrationTest extends IntegrationTestBase +{ + @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 2, network = true, buildCluster = false) + void streamStatsTest(VertxTestContext context, ConfigurableCassandraTestContext cassandraTestContext) throws Exception + { + BBHelperDecommissioningNode.reset(); + UpgradeableCluster cluster = cassandraTestContext.configureAndStartCluster( + builder -> builder.withInstanceInitializer(BBHelperDecommissioningNode::install)); + IUpgradeableInstance node = cluster.get(2); + + createTestKeyspace(); + createTestTableAndPopulate(); + + startAsync("Decommission node" + node.config().num(), + () -> node.nodetoolResult("decommission", "--force").asserts().success()); + AtomicBoolean hasStats = new AtomicBoolean(false); + AtomicBoolean dataReceived = new AtomicBoolean(false); + + // Wait until nodes have reached expected state + awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, TimeUnit.MINUTES, "transientStateStart"); + + // optimal no. 
of attempts to poll for stats to capture streaming stats during node decommissioning + loopAssert(10, 200, () -> { + streamStats(hasStats, dataReceived); + assertThat(hasStats).isTrue(); + assertThat(dataReceived).isTrue(); + }); + ClusterUtils.awaitGossipStatus(node, node, "LEFT"); + BBHelperDecommissioningNode.transientStateEnd.countDown(); + + context.completeNow(); + context.awaitCompletion(2, TimeUnit.MINUTES); + } + + private void streamStats(AtomicBoolean hasStats, AtomicBoolean dataReceived) + { + String testRoute = "/api/v1/cassandra/stats/streams"; + HttpResponse<Buffer> resp; + try + { + resp = client.get(server.actualPort(), "127.0.0.1", testRoute) + .send() + .toCompletionStage() + .toCompletableFuture() + .get(); + assertStreamStatsResponseOK(resp, hasStats, dataReceived); + } + catch (InterruptedException | ExecutionException e) + { + throw new RuntimeException(e); + } + } + + void assertStreamStatsResponseOK(HttpResponse<Buffer> response, AtomicBoolean hasStats, AtomicBoolean dataReceived) + { + assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code()); + StreamStatsResponse streamStatsResponse = response.bodyAsJson(StreamStatsResponse.class); + assertThat(streamStatsResponse).isNotNull(); + StreamsProgressStats streamProgress = streamStatsResponse.streamsProgressStats(); + assertThat(streamProgress).isNotNull(); + if (streamProgress.totalFilesToReceive() > 0) + { + hasStats.set(true); + if (streamProgress.totalFilesToReceive() == streamProgress.totalFilesReceived() && + streamProgress.totalFilesReceived() > 0) + { + dataReceived.set(true); + assertThat(streamProgress.totalBytesToReceive()).isEqualTo(streamProgress.totalBytesReceived()); + assertThat(streamProgress.totalBytesReceived()).isGreaterThan(0); + } + } + } + + QualifiedTableName createTestTableAndPopulate() + { + QualifiedTableName tableName = createTestTable( + "CREATE TABLE %s ( \n" + + " race_year int, \n" + + " race_name text, \n" + + " cyclist_name text, \n" + + " rank 
int, \n" + + " PRIMARY KEY ((race_year, race_name), rank) \n" + + ");"); + Session session = maybeGetSession(); + + session.execute("CREATE INDEX ryear ON " + tableName + " (race_year);"); + + for (int i = 1; i <= 1000; i++) + { + session.execute("INSERT INTO " + tableName + " (race_year, race_name, rank, cyclist_name) " + + "VALUES (2015, 'Tour of Japan - Stage 4 - Minami > Shinshu', " + i + ", 'Benjamin PRADES');"); + } + return tableName; + } + + /** + * ByteBuddy Helper for decommissioning node + */ + public static class BBHelperDecommissioningNode + { + static CountDownLatch transientStateStart = new CountDownLatch(1); + static CountDownLatch transientStateEnd = new CountDownLatch(1); + + public static void install(ClassLoader cl, Integer nodeNumber) + { + if (nodeNumber == 2) + { + TypePool typePool = TypePool.Default.of(cl); + TypeDescription description = typePool.describe("org.apache.cassandra.streaming.StreamCoordinator") + .resolve(); + new ByteBuddy().rebase(description, ClassFileLocator.ForClassLoader.of(cl)) + .method(named("connectAllStreamSessions")) + .intercept(MethodDelegation.to(BBHelperDecommissioningNode.class)) + // Defer class loading until all dependencies are loaded + .make(TypeResolutionStrategy.Lazy.INSTANCE, typePool) + .load(cl, ClassLoadingStrategy.Default.INJECTION); + } + } + + @SuppressWarnings("unused") + public static void connectAllStreamSessions(@SuperCall Callable<StreamOperation> orig) throws Exception + { + transientStateStart.countDown(); + Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS); + orig.call(); + } + + public static void reset() + { + transientStateStart = new CountDownLatch(1); + transientStateEnd = new CountDownLatch(1); + } + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java new file mode 100644 index 00000000..701eb628 --- /dev/null +++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.routes; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Module; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import com.google.inject.util.Modules; +import io.vertx.core.Vertx; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.predicate.ResponsePredicate; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.apache.cassandra.sidecar.TestModule; +import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate; +import 
org.apache.cassandra.sidecar.cluster.InstancesMetadata; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.response.StreamStatsResponse; +import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats; +import org.apache.cassandra.sidecar.common.server.MetricsOperations; +import org.apache.cassandra.sidecar.common.server.StorageOperations; +import org.apache.cassandra.sidecar.server.MainModule; +import org.apache.cassandra.sidecar.server.Server; +import org.mockito.stubbing.Answer; + +import static io.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link StreamStatsHandler} + */ +@ExtendWith(VertxExtension.class) +public class StreamStatsHandlerTest +{ + + static final Logger LOGGER = LoggerFactory.getLogger(StreamStatsHandlerTest.class); + Vertx vertx; + Server server; + + Supplier<StreamStatsResponse> streamingStatsSupplier; + + @BeforeEach + void before() throws InterruptedException + { + Module testOverride = Modules.override(new TestModule()) + .with(new StreamingStatsTestModule()); + Injector injector = Guice.createInjector(Modules.override(new MainModule()) + .with(testOverride)); + vertx = injector.getInstance(Vertx.class); + server = injector.getInstance(Server.class); + VertxTestContext context = new VertxTestContext(); + server.start() + .onSuccess(s -> context.completeNow()) + .onFailure(context::failNow); + context.awaitCompletion(5, TimeUnit.SECONDS); + } + + @AfterEach + void after() throws InterruptedException + { + CountDownLatch closeLatch = new CountDownLatch(1); + server.close().onSuccess(res -> closeLatch.countDown()); + if (closeLatch.await(60, TimeUnit.SECONDS)) + LOGGER.info("Close event received before timeout."); + else + LOGGER.error("Close event timed out."); + } + + @Test + void 
testStreamingStatsHandler(VertxTestContext context) + { + streamingStatsSupplier = () -> { + StreamStatsResponse response = new StreamStatsResponse("NORMAL", + new StreamsProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0)); + return response; + }; + + + WebClient client = WebClient.create(vertx); + String testRoute = "/api/v1/cassandra/stats/streams"; + client.get(server.actualPort(), "127.0.0.1", testRoute) + .expect(ResponsePredicate.SC_OK) + .send(context.succeeding(response -> { + assertThat(response.statusCode()).isEqualTo(OK.code()); + StreamStatsResponse statsResponse = response.bodyAsJson(StreamStatsResponse.class); + assertThat(statsResponse).isNotNull(); + assertThat(statsResponse.operationMode()).isEqualTo("NORMAL"); + context.completeNow(); + })); + } + + class StreamingStatsTestModule extends AbstractModule + { + @Provides + @Singleton + public InstancesMetadata instancesMetadata() + { + int instanceId = 100; + String host = "127.0.0.1"; + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + when(instanceMetadata.host()).thenReturn(host); + when(instanceMetadata.port()).thenReturn(9042); + when(instanceMetadata.id()).thenReturn(instanceId); + when(instanceMetadata.stagingDir()).thenReturn(""); + CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class); + StorageOperations ops = mock(StorageOperations.class); + when(ops.operationMode()).thenAnswer((Answer<String>) invocation -> streamingStatsSupplier.get().operationMode()); + when(delegate.storageOperations()).thenReturn(ops); + MetricsOperations metricsOps = mock(MetricsOperations.class); + when(metricsOps.streamsProgressStats()) + .thenAnswer((Answer<StreamsProgressStats>) invocation -> streamingStatsSupplier.get().streamsProgressStats()); + when(delegate.metricsOperations()).thenReturn(metricsOps); + + when(instanceMetadata.delegate()).thenReturn(delegate); + + InstancesMetadata mockInstancesMetadata = mock(InstancesMetadata.class); + 
when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata)); + when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata); + when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata); + + return mockInstancesMetadata; + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org