This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4fb9771 CASSSIDECAR-180: Changes to add endpoint for stream stats 
(#166)
b4fb9771 is described below

commit b4fb9771a098b303b59cc0a058c76ca93df90409
Author: Arjun Ashok <arjun_as...@apple.com>
AuthorDate: Wed Jan 29 09:25:37 2025 -0800

    CASSSIDECAR-180: Changes to add endpoint for stream stats (#166)
    
    Patch by Arjun Ashok; Reviewed by Bernardo Botella, Yifan Cai, Francisco 
Guerrero for CASSSIDECAR-180
---
 CHANGES.txt                                        |   1 +
 .../sidecar/adapters/base/CassandraAdapter.java    |   2 +-
 .../adapters/base/CassandraMetricsOperations.java  |  66 ++++++-
 .../base/GossipDependentStorageJmxOperations.java  |  12 +-
 .../adapters/base/StorageJmxOperations.java        |   2 +-
 .../adapters/base/StreamManagerJmxOperations.java  |  20 ++-
 .../adapters/base/data/CompositeDataUtil.java      |  53 ++++++
 .../sidecar/adapters/base/data/ProgressInfo.java   |  54 ++++++
 .../sidecar/adapters/base/data/SessionInfo.java    | 161 +++++++++++++++++
 .../sidecar/adapters/base/data/StreamState.java    |  31 +++-
 .../sidecar/adapters/base/data/StreamSummary.java  |  23 ++-
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   1 +
 .../sidecar/common/request/StreamStatsRequest.java |  26 ++-
 .../common/response/StreamStatsResponse.java       |  60 +++++++
 .../common/response/data/StreamsProgressStats.java | 107 +++++++++++
 .../cassandra/sidecar/client/RequestContext.java   |  14 ++
 .../cassandra/sidecar/client/SidecarClient.java    |  14 ++
 .../sidecar/client/SidecarClientTest.java          |  22 +++
 .../sidecar/common/server/MetricsOperations.java   |   6 +
 .../acl/authorization/BasicPermissions.java        |   3 +
 .../sidecar/routes/StreamStatsHandler.java         |  94 ++++++++++
 .../cassandra/sidecar/server/MainModule.java       |   7 +
 .../sidecar/routes/StreamStatsIntegrationTest.java | 196 +++++++++++++++++++++
 .../sidecar/routes/StreamStatsHandlerTest.java     | 158 +++++++++++++++++
 24 files changed, 1091 insertions(+), 42 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4d9d25de..11992ddf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Add sidecar endpoint to retrieve stream stats (CASSSIDECAR-180)
  * Add sidecar endpoint to retrieve cassandra gossip health (CASSSIDECAR-173)
  * Fix SidecarSchema stuck at initialization due to ClusterLeaseTask 
scheduling (CASSSIDECAR-189)
  * Add RBAC Authorization support in Sidecar (CASSSIDECAR-161)
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index f9a1f5c3..f5c16288 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -127,7 +127,7 @@ public class CassandraAdapter implements ICassandraAdapter
     @NotNull
     public MetricsOperations metricsOperations()
     {
-        return new CassandraMetricsOperations(cqlSessionProvider);
+        return new CassandraMetricsOperations(jmxClient, cqlSessionProvider);
     }
 
     /**
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
index cc3ff9fc..8ce643c3 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraMetricsOperations.java
@@ -18,33 +18,52 @@
 
 package org.apache.cassandra.sidecar.adapters.base;
 
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import javax.management.openmbean.CompositeData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.adapters.base.data.SessionInfo;
+import org.apache.cassandra.sidecar.adapters.base.data.StreamState;
 import org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStats;
 import 
org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStatsDatabaseAccessor;
 import 
org.apache.cassandra.sidecar.adapters.base.db.ConnectedClientStatsSummary;
 import 
org.apache.cassandra.sidecar.adapters.base.db.schema.ConnectedClientsSchema;
 import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
 import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
 import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.server.JmxClient;
 import org.apache.cassandra.sidecar.common.server.MetricsOperations;
 import org.jetbrains.annotations.NotNull;
 
+import static 
org.apache.cassandra.sidecar.adapters.base.StreamManagerJmxOperations.STREAM_MANAGER_OBJ_NAME;
+
 /**
  * Default implementation that pulls methods from the Cassandra Metrics Proxy
  */
 public class CassandraMetricsOperations implements MetricsOperations
 {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CassandraMetricsOperations.class);
     private final ConnectedClientStatsDatabaseAccessor dbAccessor;
 
+    protected final JmxClient jmxClient;
+
+
     /**
      * Creates a new instance with the provided {@link CQLSessionProvider}
      */
-    public CassandraMetricsOperations(CQLSessionProvider session)
+    public CassandraMetricsOperations(JmxClient jmxClient, CQLSessionProvider 
session)
     {
+        this.jmxClient = jmxClient;
         this.dbAccessor = new ConnectedClientStatsDatabaseAccessor(session, 
new ConnectedClientsSchema());
     }
 
@@ -70,6 +89,51 @@ public class CassandraMetricsOperations implements 
MetricsOperations
         return new ConnectedClientStatsResponse(entries, 
totalConnectedClients, connectionsByUser);
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public StreamsProgressStats streamsProgressStats()
+    {
+        Set<CompositeData> streamData = 
jmxClient.proxy(StreamManagerJmxOperations.class, STREAM_MANAGER_OBJ_NAME)
+                                                 .getCurrentStreams();
+        return computeStats(streamData.stream().map(StreamState::new));
+    }
+
+    private StreamsProgressStats computeStats(Stream<StreamState> streamStates)
+    {
+        Iterator<SessionInfo> sessions = 
streamStates.map(StreamState::sessions).flatMap(Collection::stream).iterator();
+
+        long totalFilesToReceive = 0;
+        long totalFilesReceived = 0;
+        long totalBytesToReceive = 0;
+        long totalBytesReceived = 0;
+
+        long totalFilesToSend = 0;
+        long totalFilesSent = 0;
+        long totalBytesToSend = 0;
+        long totalBytesSent = 0;
+
+        while (sessions.hasNext())
+        {
+            SessionInfo sessionInfo = sessions.next();
+            totalBytesToReceive += sessionInfo.totalSizeToReceive();
+            totalBytesReceived += sessionInfo.totalSizeReceived();
+            totalFilesToReceive += sessionInfo.totalFilesToReceive();
+            totalFilesReceived += sessionInfo.totalFilesReceived();
+            totalBytesToSend += sessionInfo.totalSizeToSend();
+            totalBytesSent += sessionInfo.totalSizeSent();
+            totalFilesToSend += sessionInfo.totalFilesToSend();
+            totalFilesSent += sessionInfo.totalFilesSent();
+        }
+
+        LOGGER.debug("Progress Stats: totalBytesToReceive:{} 
totalBytesReceived:{} totalBytesToSend:{} totalBytesSent:{}",
+                     totalBytesToReceive, totalBytesReceived, 
totalBytesToSend, totalBytesSent);
+        return new StreamsProgressStats(totalFilesToReceive, 
totalFilesReceived, totalBytesToReceive, totalBytesReceived,
+                                        totalFilesToSend, totalFilesSent, 
totalBytesToSend, totalBytesSent);
+
+    }
+
     private ConnectedClientStatsResponse connectedClientSummary()
     {
         ConnectedClientStatsSummary summary = dbAccessor.summary();
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
index bbdb29c4..350096bf 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/GossipDependentStorageJmxOperations.java
@@ -146,6 +146,12 @@ public class GossipDependentStorageJmxOperations 
implements StorageJmxOperations
         return delegate.forceKeyspaceCleanup(jobs, keyspaceName, tables);
     }
 
+    @Override
+    public String getOperationMode()
+    {
+        return delegate.getOperationMode();
+    }
+
     /**
      * Ensures that gossip is running on the Cassandra instance
      *
@@ -165,10 +171,4 @@ public class GossipDependentStorageJmxOperations 
implements StorageJmxOperations
     {
         delegate.decommission(force);
     }
-
-    @Override
-    public String getOperationMode()
-    {
-        return delegate.getOperationMode();
-    }
 }
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
index ca18db04..75332435 100644
--- 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
@@ -168,7 +168,7 @@ public interface StorageJmxOperations
 
     /**
      * Fetch the operation-mode of the node
-     * @return string representation of theoperation-mode
+     * @return string representation of the operation-mode
      */
     String getOperationMode();
 }
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StreamManagerJmxOperations.java
similarity index 60%
copy from 
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
copy to 
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StreamManagerJmxOperations.java
index ef7a5f95..42347dc0 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StreamManagerJmxOperations.java
@@ -16,20 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.common.server;
+package org.apache.cassandra.sidecar.adapters.base;
 
-import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import java.util.Set;
+import javax.management.openmbean.CompositeData;
 
 /**
- * An interface that defines interactions with the metrics system in Cassandra.
+ * An interface that pulls methods from the Cassandra Stream manager Proxy
  */
-public interface MetricsOperations
+public interface StreamManagerJmxOperations
 {
+
+    String STREAM_MANAGER_OBJ_NAME = 
"org.apache.cassandra.net:type=StreamManager";
+
     /**
-     * Retrieve the connected client stats metrics from the cluster
-     * @param summaryOnly boolean parameter to list connection summary only
-     * @return the requested client stats, in full or summary
+     * Returns the current snapshot of the progress of all ongoing streams.
+     * @return the current state of streams as a set of JMX {@link 
CompositeData} instances.
      */
-    ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
-
+    Set<CompositeData> getCurrentStreams();
 }
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/CompositeDataUtil.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/CompositeDataUtil.java
new file mode 100644
index 00000000..a033fe09
--- /dev/null
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/CompositeDataUtil.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.NoSuchElementException;
+import javax.management.openmbean.CompositeData;
+
+/**
+ * Utility class for operations with {@link CompositeData}
+ */
+public class CompositeDataUtil
+{
+
+    /**
+     * Generic helper to extract attribute of a specific type from the 
CompositeData type.
+     * @param data data being parsed
+     * @param key attribute being extracted
+     * @return attribute value
+     * @param <T> return type
+     */
+    public static <T> T extractValue(CompositeData data, String key)
+    {
+        Object value = data.get(key);
+        if (value == null)
+        {
+            throw new NoSuchElementException("No value is present for key: " + 
key);
+        }
+        try
+        {
+            return (T) value;
+        }
+        catch (ClassCastException cce)
+        {
+            throw new RuntimeException("Value type mismatched of key: " + key, 
cce);
+        }
+    }
+}
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java
new file mode 100644
index 00000000..512e9984
--- /dev/null
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/ProgressInfo.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import javax.management.openmbean.CompositeData;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.data.CompositeDataUtil.extractValue;
+
+/**
+ * Representation of the stream progress info
+ */
+public class ProgressInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String fileName;
+    public final String direction;
+    public final long currentBytes;
+    public final long totalBytes;
+
+    public ProgressInfo(CompositeData data)
+    {
+        this.peer = extractValue(data, "peer");
+        this.sessionIndex = extractValue(data, "sessionIndex");
+        this.fileName = extractValue(data, "fileName");
+        this.direction = extractValue(data, "direction");
+        this.currentBytes = extractValue(data, "currentBytes");
+        this.totalBytes = extractValue(data, "totalBytes");
+    }
+
+    /**
+     * @return true if transfer is completed
+     */
+    public boolean isCompleted()
+    {
+        return currentBytes >= totalBytes;
+    }
+}
diff --git 
a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java
new file mode 100644
index 00000000..c9006593
--- /dev/null
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/SessionInfo.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base.data;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
+
+import static 
org.apache.cassandra.sidecar.adapters.base.data.CompositeDataUtil.extractValue;
+
+/**
+ * Representation of session info data
+ */
+public class SessionInfo
+{
+    public final String peer;
+    public final int sessionIndex;
+    public final String connecting;
+    /** Immutable collection of receiving summaries */
+    public final List<StreamSummary> receivingSummaries;
+    /** Immutable collection of sending summaries*/
+    public final List<StreamSummary> sendingSummaries;
+    /** Current session state */
+    public final String state;
+    public final List<ProgressInfo> receivingFiles;
+    public final List<ProgressInfo> sendingFiles;
+
+    public SessionInfo(CompositeData data)
+    {
+        this.peer = extractValue(data, "peer");
+        this.sessionIndex = extractValue(data, "sessionIndex");
+        this.connecting = extractValue(data, "connecting");
+        this.receivingSummaries = parseSummaries(extractValue(data, 
"receivingSummaries"));
+        this.sendingSummaries = parseSummaries(extractValue(data, 
"sendingSummaries"));
+        this.state = extractValue(data, "state");
+        this.receivingFiles = parseFiles(extractValue(data, "receivingFiles"));
+        this.sendingFiles = parseFiles(extractValue(data, "sendingFiles"));
+    }
+
+    /**
+     * @return total size(in bytes) already received.
+     */
+    public long totalSizeReceived()
+    {
+        return totalSizeInProgress(receivingFiles);
+    }
+
+    /**
+     * @return total size(in bytes) already sent.
+     */
+    public long totalSizeSent()
+    {
+        return totalSizeInProgress(sendingFiles);
+    }
+
+    /**
+     * @return total number of files to receive in the session
+     */
+    public long totalFilesToReceive()
+    {
+        return totalFiles(receivingSummaries);
+    }
+
+    /**
+     * @return total number of files to send in the session
+     */
+    public long totalFilesToSend()
+    {
+        return totalFiles(sendingSummaries);
+    }
+
+    /**
+     * @return total size(in bytes) to receive in the session
+     */
+    public long totalSizeToReceive()
+    {
+        return totalSizes(receivingSummaries);
+    }
+
+    /**
+     * @return total size(in bytes) to send in the session
+     */
+    public long totalSizeToSend()
+    {
+        return totalSizes(sendingSummaries);
+    }
+
+    /**
+     * @return total number of files already received.
+     */
+    public long totalFilesReceived()
+    {
+        return totalFilesCompleted(receivingFiles);
+    }
+
+    /**
+     * @return total number of files already sent.
+     */
+    public long totalFilesSent()
+    {
+        return totalFilesCompleted(sendingFiles);
+    }
+
+    private long totalSizes(List<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.totalSize;
+        return total;
+    }
+
+    private long totalFilesCompleted(List<ProgressInfo> files)
+    {
+                return files.stream()
+                    .filter(ProgressInfo::isCompleted)
+                    .count();
+    }
+
+    private long totalSizeInProgress(List<ProgressInfo> streams)
+    {
+        long total = 0;
+        for (ProgressInfo stream : streams)
+            total += stream.currentBytes;
+        return total;
+    }
+
+    private List<StreamSummary> parseSummaries(CompositeData[] summaries)
+    {
+        return 
Arrays.stream(summaries).map(StreamSummary::new).collect(Collectors.toList());
+    }
+
+    private List<ProgressInfo> parseFiles(CompositeData[] files)
+    {
+        return 
Arrays.stream(files).map(ProgressInfo::new).collect(Collectors.toList());
+    }
+
+    private long totalFiles(List<StreamSummary> summaries)
+    {
+        long total = 0;
+        for (StreamSummary summary : summaries)
+            total += summary.files;
+        return total;
+    }
+}
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java
similarity index 52%
copy from 
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
copy to 
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java
index ef7a5f95..b6c4d189 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamState.java
@@ -16,20 +16,35 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.common.server;
+package org.apache.cassandra.sidecar.adapters.base.data;
 
-import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.management.openmbean.CompositeData;
 
 /**
- * An interface that defines interactions with the metrics system in Cassandra.
+ * Representation of the stream state data
  */
-public interface MetricsOperations
+public class StreamState
 {
+    private final List<SessionInfo> sessions;
+
+    public StreamState(CompositeData data)
+    {
+        this.sessions = parseSessions((CompositeData[]) data.get("sessions"));
+    }
+
     /**
-     * Retrieve the connected client stats metrics from the cluster
-     * @param summaryOnly boolean parameter to list connection summary only
-     * @return the requested client stats, in full or summary
+     * @return the session info for the sessions in the stream stats data
      */
-    ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
+    public List<SessionInfo> sessions()
+    {
+        return sessions;
+    }
 
+    private List<SessionInfo> parseSessions(CompositeData[] sessions)
+    {
+        return 
Arrays.stream(sessions).map(SessionInfo::new).collect(Collectors.toList());
+    }
 }
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java
similarity index 59%
copy from 
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
copy to 
adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java
index ef7a5f95..0d7a360f 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
+++ 
b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/data/StreamSummary.java
@@ -16,20 +16,27 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.common.server;
+package org.apache.cassandra.sidecar.adapters.base.data;
 
-import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import javax.management.openmbean.CompositeData;
 
 /**
- * An interface that defines interactions with the metrics system in Cassandra.
+ * Representation of the stream summary data
  */
-public interface MetricsOperations
+public class StreamSummary
 {
+    public final String tableId;
+
     /**
-     * Retrieve the connected client stats metrics from the cluster
-     * @param summaryOnly boolean parameter to list connection summary only
-     * @return the requested client stats, in full or summary
+     * Number of files to transfer. Can be 0 if nothing to transfer for some 
streaming request.
      */
-    ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
+    public final int files;
+    public final long totalSize;
 
+    public StreamSummary(CompositeData data)
+    {
+        this.tableId = (String) data.get("tableId");
+        this.files = (int) data.get("files");
+        this.totalSize = (long) data.get("totalSize");
+    }
 }
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 673f0100..511a5daa 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -129,6 +129,7 @@ public final class ApiEndpointsV1
     public static final String LIST_OPERATIONAL_JOBS_ROUTE = API_V1 + 
CASSANDRA + OPERATIONAL_JOBS;
     public static final String OPERATIONAL_JOB_ROUTE = API_V1 + CASSANDRA + 
PER_OPERATIONAL_JOB;
     public static final String NODE_DECOMMISSION_ROUTE = API_V1 + CASSANDRA + 
"/operations/decommission";
+    public static final String STREAM_STATS_ROUTE = API_V1 + CASSANDRA + 
"/stats/streams";
 
     private ApiEndpointsV1()
     {
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamStatsRequest.java
similarity index 56%
copy from 
server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
copy to 
client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamStatsRequest.java
index ef7a5f95..2640f85d 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/StreamStatsRequest.java
@@ -16,20 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.common.server;
+package org.apache.cassandra.sidecar.common.request;
 
-import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
 
 /**
- * An interface that defines interactions with the metrics system in Cassandra.
+ * Class response for the StreamsStats API
  */
-public interface MetricsOperations
+public class StreamStatsRequest extends JsonRequest<StreamStatsResponse>
 {
     /**
-     * Retrieve the connected client stats metrics from the cluster
-     * @param summaryOnly boolean parameter to list connection summary only
-     * @return the requested client stats, in full or summary
+     * Constructs a request to retrieve the Cassandra node streaming stats 
information
      */
-    ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
+    public StreamStatsRequest()
+    {
+        super(ApiEndpointsV1.STREAM_STATS_ROUTE);
+    }
 
+    /**
+     * {@inheritDoc}
+     */
+    public HttpMethod method()
+    {
+        return HttpMethod.GET;
+    }
 }
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java
new file mode 100644
index 00000000..396374a9
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/StreamStatsResponse.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
+
+/**
+ * Class response for the StreamStats API
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class StreamStatsResponse
+{
+    private final String mode;
+    private final StreamsProgressStats stats;
+
+    /**
+     * Constructs a new {@link StreamStatsResponse}.
+     *
+     * @param mode list of client connection stats
+     * @param stats stream progress stats for the node
+     */
+    @JsonCreator
+    public StreamStatsResponse(@JsonProperty("operationMode") String mode,
+                               @JsonProperty("streamsProgressStats") 
StreamsProgressStats stats)
+    {
+        this.mode = mode;
+        this.stats = stats;
+    }
+
+    @JsonProperty("operationMode")
+    public String operationMode()
+    {
+        return mode;
+    }
+
+    @JsonProperty("streamsProgressStats")
+    public StreamsProgressStats streamsProgressStats()
+    {
+        return stats;
+    }
+}
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
new file mode 100644
index 00000000..88d74077
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/response/data/StreamsProgressStats.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.response.data;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A class representing stats summarizing the progress of streamed bytes and 
files on the node
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class StreamsProgressStats
+{
+    private final long totalFilesToReceive;
+    private final long totalFilesReceived;
+    private final long totalBytesToReceive;
+    private final long totalBytesReceived;
+    private final long totalFilesToSend;
+    private final long totalFilesSent;
+    private final long totalBytesToSend;
+    private final long totalBytesSent;
+
+    @JsonCreator
+    public StreamsProgressStats(@JsonProperty("totalFilesToReceive") long 
totalFilesToReceive,
+                                @JsonProperty("totalFilesReceived") long 
totalFilesReceived,
+                                @JsonProperty("totalBytesToReceive") long 
totalBytesToReceive,
+                                @JsonProperty("totalBytesReceived") long 
totalBytesReceived,
+                                @JsonProperty("totalFilesToSend") long 
totalFilesToSend,
+                                @JsonProperty("totalFilesSent") long 
totalFilesSent,
+                                @JsonProperty("totalBytesToSend") long 
totalBytesToSend,
+                                @JsonProperty("totalBytesSent") long 
totalBytesSent)
+    {
+        this.totalFilesToReceive = totalFilesToReceive;
+        this.totalFilesReceived = totalFilesReceived;
+        this.totalBytesToReceive = totalBytesToReceive;
+        this.totalBytesReceived = totalBytesReceived;
+        this.totalFilesToSend = totalFilesToSend;
+        this.totalFilesSent = totalFilesSent;
+        this.totalBytesToSend = totalBytesToSend;
+        this.totalBytesSent = totalBytesSent;
+    }
+
+    @JsonProperty("totalFilesToReceive")
+    public long totalFilesToReceive()
+    {
+        return totalFilesToReceive;
+    }
+
+    @JsonProperty("totalFilesReceived")
+    public long totalFilesReceived()
+    {
+        return totalFilesReceived;
+    }
+
+    @JsonProperty("totalBytesToReceive")
+    public long totalBytesToReceive()
+    {
+        return totalBytesToReceive;
+    }
+
+    @JsonProperty("totalBytesReceived")
+    public long totalBytesReceived()
+    {
+        return totalBytesReceived;
+    }
+
+    @JsonProperty("totalFilesToSend")
+    public long totalFilesToSend()
+    {
+        return totalFilesToSend;
+    }
+
+    @JsonProperty("totalFilesSent")
+    public long totalFilesSent()
+    {
+        return totalFilesSent;
+    }
+
+    @JsonProperty("totalBytesToSend")
+    public long totalBytesToSend()
+    {
+        return totalBytesToSend;
+    }
+
+    @JsonProperty("totalBytesSent")
+    public long totalBytesSent()
+    {
+        return totalBytesSent;
+    }
+}
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index dd929c42..ef724128 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -47,6 +47,7 @@ import 
org.apache.cassandra.sidecar.common.request.RingRequest;
 import org.apache.cassandra.sidecar.common.request.SSTableComponentRequest;
 import org.apache.cassandra.sidecar.common.request.SchemaRequest;
 import org.apache.cassandra.sidecar.common.request.SidecarHealthRequest;
+import org.apache.cassandra.sidecar.common.request.StreamStatsRequest;
 import org.apache.cassandra.sidecar.common.request.TimeSkewRequest;
 import org.apache.cassandra.sidecar.common.request.TokenRangeReplicasRequest;
 import org.apache.cassandra.sidecar.common.request.UploadSSTableRequest;
@@ -79,6 +80,8 @@ public class RequestContext
     protected static final GossipInfoRequest GOSSIP_INFO_REQUEST = new 
GossipInfoRequest();
     protected static final ListOperationalJobsRequest LIST_JOBS_REQUEST = new 
ListOperationalJobsRequest();
     protected static final NodeDecommissionRequest NODE_DECOMMISSION_REQUEST = 
new NodeDecommissionRequest();
+
+    protected static final StreamStatsRequest STREAM_STATS_REQUEST = new 
StreamStatsRequest();
     protected static final RetryPolicy DEFAULT_RETRY_POLICY = new 
NoRetryPolicy();
     protected static final RetryPolicy 
DEFAULT_EXPONENTIAL_BACKOFF_RETRY_POLICY =
     new ExponentialBackoffRetryPolicy(10, 500L, 60_000L);
@@ -537,6 +540,17 @@ public class RequestContext
             return request(NODE_DECOMMISSION_REQUEST);
         }
 
+        /**
+         * Sets the {@code request} to be a {@link StreamStatsRequest} and 
returns a reference to this Builder
+         * enabling method chaining.
+         *
+         * @return a reference to this Builder
+         */
+        public Builder streamsStatsRequest()
+        {
+            return request(STREAM_STATS_REQUEST);
+        }
+
         /**
          * Sets the {@code retryPolicy} to be an
          * {@link 
org.apache.cassandra.sidecar.client.retry.ExponentialBackoffRetryPolicy} 
configured with
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index b2425516..749b220d 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -61,6 +61,7 @@ import 
org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
 import org.apache.cassandra.sidecar.common.response.RingResponse;
 import org.apache.cassandra.sidecar.common.response.SSTableImportResponse;
 import org.apache.cassandra.sidecar.common.response.SchemaResponse;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
 import org.apache.cassandra.sidecar.common.response.TimeSkewResponse;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
 import 
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload;
@@ -678,6 +679,19 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                             .build());
     }
 
+    /**
+     * Executes the streams stats request using the default retry policy and 
configured selection policy
+     * @param instance the instance where the request will be executed
+     * @return a completable future of the connected client stats
+     */
+    public CompletableFuture<StreamStatsResponse> streamsStats(SidecarInstance 
instance)
+    {
+        return executor.executeRequestAsync(requestBuilder()
+                                            
.singleInstanceSelectionPolicy(instance)
+                                            .streamsStatsRequest()
+                                            .build());
+    }
+
     /**
      * Executes the node decommission request using the default retry policy 
and configured selection policy
      * @param instance the instance where the request will be executed
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index d305ba88..8b1c15c9 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -80,12 +80,14 @@ import 
org.apache.cassandra.sidecar.common.response.OperationalJobResponse;
 import org.apache.cassandra.sidecar.common.response.RingResponse;
 import org.apache.cassandra.sidecar.common.response.SSTableImportResponse;
 import org.apache.cassandra.sidecar.common.response.SchemaResponse;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
 import org.apache.cassandra.sidecar.common.response.TimeSkewResponse;
 import org.apache.cassandra.sidecar.common.response.TokenRangeReplicasResponse;
 import org.apache.cassandra.sidecar.common.response.data.CdcSegmentInfo;
 import org.apache.cassandra.sidecar.common.response.data.ClientConnectionEntry;
 import 
org.apache.cassandra.sidecar.common.response.data.CreateRestoreJobResponsePayload;
 import org.apache.cassandra.sidecar.common.response.data.RingEntry;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
 import org.apache.cassandra.sidecar.common.utils.HttpRange;
 import org.apache.cassandra.sidecar.foundation.RestoreJobSecretsGen;
 
@@ -1622,6 +1624,26 @@ abstract class SidecarClientTest
         assertThat(new String(baos.toByteArray(), 
StandardCharsets.UTF_8)).isEqualTo("Test Content");
     }
 
+    @Test
+    public void testStreamsStats() throws Exception
+    {
+        StreamsProgressStats stats = new StreamsProgressStats(7, 7, 15088, 
15088, 2, 2, 1024, 1024);
+        StreamStatsResponse mockResp = new StreamStatsResponse("NORMAL", 
stats);
+        ObjectMapper mapper = new ObjectMapper();
+        String expectedResponse = mapper.writeValueAsString(mockResp);
+        MockResponse response = new MockResponse()
+                                .setResponseCode(OK.code())
+                                .setBody(expectedResponse);
+        enqueue(response);
+        for (MockWebServer server : servers)
+        {
+            SidecarInstanceImpl sidecarInstance = 
RequestExecutorTest.newSidecarInstance(server);
+            StreamStatsResponse result = 
client.streamsStats(sidecarInstance).get(30, TimeUnit.SECONDS);
+            
assertThat(mapper.writeValueAsString(result)).isEqualTo(expectedResponse);
+            validateResponseServed(server, ApiEndpointsV1.STREAM_STATS_ROUTE, 
req -> { });
+        }
+    }
+
     private void enqueue(MockResponse response)
     {
         for (MockWebServer server : servers)
diff --git 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
index ef7a5f95..e57a7d31 100644
--- 
a/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
+++ 
b/server-common/src/main/java/org/apache/cassandra/sidecar/common/server/MetricsOperations.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.common.server;
 
 import 
org.apache.cassandra.sidecar.common.response.ConnectedClientStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
 
 /**
  * An interface that defines interactions with the metrics system in Cassandra.
@@ -32,4 +33,9 @@ public interface MetricsOperations
      */
     ConnectedClientStatsResponse connectedClientStats(boolean summaryOnly);
 
+    /**
+     * Retrieve the stream progress stats from the cluster
+     * @return the requested stream progress stats
+     */
+    StreamsProgressStats streamsProgressStats();
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index 8074bca3..6ee1ad9a 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -63,4 +63,7 @@ public class BasicPermissions
     public static final Permission READ_GOSSIP = new 
DomainAwarePermission("GOSSIP:READ");
     public static final Permission READ_RING = new 
DomainAwarePermission("RING:READ");
     public static final Permission READ_TOPOLOGY = new 
DomainAwarePermission("TOPOLOGY:READ");
+
+    // cassandra stats permissions
+    public static final Permission STATS = new StandardPermission("STATS");
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java
new file mode 100644
index 00000000..ba3941e6
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/StreamStatsHandler.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.Set;
+
+import com.google.inject.Inject;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.acl.authorization.VariableAwareResource;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+
+/**
+ * Handler for retrieving node streams stats
+ */
+public class StreamStatsHandler extends AbstractHandler<Void> implements 
AccessProtected
+{
+    /**
+     * Constructs a handler with the provided {@code metadataFetcher}
+     *
+     * @param metadataFetcher the metadata fetcher
+     * @param executorPools   executor pools for blocking executions
+     */
+    @Inject
+    protected StreamStatsHandler(InstanceMetadataFetcher metadataFetcher,
+                                 ExecutorPools executorPools)
+    {
+        super(metadataFetcher, executorPools, null);
+    }
+
+    @Override
+    public Set<Authorization> requiredAuthorizations()
+    {
+        String resource = VariableAwareResource.CLUSTER.resource();
+        return 
Collections.singleton(BasicPermissions.STATS.toAuthorization(resource));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void handleInternal(RoutingContext context,
+                               HttpServerRequest httpRequest,
+                               String host,
+                               SocketAddress remoteAddress,
+                               Void request)
+    {
+
+        CassandraAdapterDelegate delegate = metadataFetcher.delegate(host);
+
+        executorPools.service()
+                     .executeBlocking(() -> {
+                         String mode = 
delegate.storageOperations().operationMode();
+                         StreamsProgressStats stats = 
delegate.metricsOperations().streamsProgressStats();
+                         return new StreamStatsResponse(mode, stats);
+                     })
+                     .onSuccess(context::json)
+                     .onFailure(cause -> processFailure(cause, context, host, 
remoteAddress, request));
+
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    protected Void extractParamsOrThrow(RoutingContext context)
+    {
+        return null;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index 5fe8b229..ce0752b7 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -125,6 +125,7 @@ import org.apache.cassandra.sidecar.routes.RingHandler;
 import org.apache.cassandra.sidecar.routes.RoutingOrder;
 import org.apache.cassandra.sidecar.routes.SchemaHandler;
 import org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler;
+import org.apache.cassandra.sidecar.routes.StreamStatsHandler;
 import org.apache.cassandra.sidecar.routes.TimeSkewHandler;
 import org.apache.cassandra.sidecar.routes.TokenRangeReplicaMapHandler;
 import org.apache.cassandra.sidecar.routes.cassandra.NodeSettingsHandler;
@@ -341,6 +342,7 @@ public class MainModule extends AbstractModule
                               OperationalJobHandler operationalJobHandler,
                               ListOperationalJobsHandler 
listOperationalJobsHandler,
                               NodeDecommissionHandler nodeDecommissionHandler,
+                              StreamStatsHandler streamStatsHandler,
                               ErrorHandler errorHandler)
     {
         Router router = Router.router(vertx);
@@ -496,6 +498,11 @@ public class MainModule extends AbstractModule
                                     .handler(nodeDecommissionHandler)
                                     .build();
 
+        
protectedRouteBuilderFactory.get().router(router).method(HttpMethod.GET)
+                                    
.endpoint(ApiEndpointsV1.STREAM_STATS_ROUTE)
+                                    .handler(streamStatsHandler)
+                                    .build();
+
         
protectedRouteBuilderFactory.get().router(router).method(HttpMethod.PUT)
                                     
.endpoint(ApiEndpointsV1.SSTABLE_UPLOAD_ROUTE)
                                     .handler(ssTableUploadHandler)
diff --git 
a/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
new file mode 100644
index 00000000..809f9cb4
--- /dev/null
+++ 
b/server/src/test/integration/org/apache/cassandra/sidecar/routes/StreamStatsIntegrationTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import com.datastax.driver.core.Session;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.dynamic.ClassFileLocator;
+import net.bytebuddy.dynamic.TypeResolutionStrategy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.pool.TypePool;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.streaming.StreamOperation;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests the stream stats endpoint with cassandra container.
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsIntegrationTest extends IntegrationTestBase
+{
+    @CassandraIntegrationTest(numDataDirsPerInstance = 4, nodesPerDc = 2, 
network = true, buildCluster = false)
+    void streamStatsTest(VertxTestContext context, 
ConfigurableCassandraTestContext cassandraTestContext) throws Exception
+    {
+        BBHelperDecommissioningNode.reset();
+        UpgradeableCluster cluster = 
cassandraTestContext.configureAndStartCluster(
+        builder -> 
builder.withInstanceInitializer(BBHelperDecommissioningNode::install));
+        IUpgradeableInstance node = cluster.get(2);
+
+        createTestKeyspace();
+        createTestTableAndPopulate();
+
+        startAsync("Decommission node" + node.config().num(),
+                   () -> node.nodetoolResult("decommission", 
"--force").asserts().success());
+        AtomicBoolean hasStats = new AtomicBoolean(false);
+        AtomicBoolean dataReceived = new AtomicBoolean(false);
+
+        // Wait until nodes have reached expected state
+        awaitLatchOrThrow(BBHelperDecommissioningNode.transientStateStart, 2, 
TimeUnit.MINUTES, "transientStateStart");
+
+        // optimal no. of attempts to poll for stats to capture streaming 
stats during node decommissioning
+        loopAssert(10, 200, () -> {
+            streamStats(hasStats, dataReceived);
+            assertThat(hasStats).isTrue();
+            assertThat(dataReceived).isTrue();
+        });
+        ClusterUtils.awaitGossipStatus(node, node, "LEFT");
+        BBHelperDecommissioningNode.transientStateEnd.countDown();
+
+        context.completeNow();
+        context.awaitCompletion(2, TimeUnit.MINUTES);
+    }
+
+    private void streamStats(AtomicBoolean hasStats, AtomicBoolean 
dataReceived)
+    {
+        String testRoute = "/api/v1/cassandra/stats/streams";
+        HttpResponse<Buffer> resp;
+        try
+        {
+            resp = client.get(server.actualPort(), "127.0.0.1", testRoute)
+                         .send()
+                         .toCompletionStage()
+                         .toCompletableFuture()
+                         .get();
+            assertStreamStatsResponseOK(resp, hasStats, dataReceived);
+        }
+        catch (InterruptedException | ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    void assertStreamStatsResponseOK(HttpResponse<Buffer> response, 
AtomicBoolean hasStats, AtomicBoolean dataReceived)
+    {
+        
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+        StreamStatsResponse streamStatsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
+        assertThat(streamStatsResponse).isNotNull();
+        StreamsProgressStats streamProgress = 
streamStatsResponse.streamsProgressStats();
+        assertThat(streamProgress).isNotNull();
+        if (streamProgress.totalFilesToReceive() > 0)
+        {
+            hasStats.set(true);
+            if (streamProgress.totalFilesToReceive() == 
streamProgress.totalFilesReceived() &&
+                streamProgress.totalFilesReceived() > 0)
+            {
+                dataReceived.set(true);
+                
assertThat(streamProgress.totalBytesToReceive()).isEqualTo(streamProgress.totalBytesReceived());
+                
assertThat(streamProgress.totalBytesReceived()).isGreaterThan(0);
+            }
+        }
+    }
+
+    QualifiedTableName createTestTableAndPopulate()
+    {
+        QualifiedTableName tableName = createTestTable(
+        "CREATE TABLE %s ( \n" +
+        "  race_year int, \n" +
+        "  race_name text, \n" +
+        "  cyclist_name text, \n" +
+        "  rank int, \n" +
+        "  PRIMARY KEY ((race_year, race_name), rank) \n" +
+        ");");
+        Session session = maybeGetSession();
+
+        session.execute("CREATE INDEX ryear ON " + tableName + " 
(race_year);");
+
+        for (int i = 1; i <= 1000; i++)
+        {
+            session.execute("INSERT INTO " + tableName + " (race_year, 
race_name, rank, cyclist_name) " +
+                            "VALUES (2015, 'Tour of Japan - Stage 4 - Minami > 
Shinshu', " + i + ", 'Benjamin PRADES');");
+        }
+        return tableName;
+    }
+
+    /**
+     * ByteBuddy Helper for decommissioning node
+     */
+    public static class BBHelperDecommissioningNode
+    {
+        static CountDownLatch transientStateStart = new CountDownLatch(1);
+        static CountDownLatch transientStateEnd = new CountDownLatch(1);
+
+        public static void install(ClassLoader cl, Integer nodeNumber)
+        {
+            if (nodeNumber == 2)
+            {
+                TypePool typePool = TypePool.Default.of(cl);
+                TypeDescription description = 
typePool.describe("org.apache.cassandra.streaming.StreamCoordinator")
+                                                      .resolve();
+                new ByteBuddy().rebase(description, 
ClassFileLocator.ForClassLoader.of(cl))
+                               .method(named("connectAllStreamSessions"))
+                               
.intercept(MethodDelegation.to(BBHelperDecommissioningNode.class))
+                               // Defer class loading until all dependencies 
are loaded
+                               .make(TypeResolutionStrategy.Lazy.INSTANCE, 
typePool)
+                               .load(cl, 
ClassLoadingStrategy.Default.INJECTION);
+            }
+        }
+
+        @SuppressWarnings("unused")
+        public static void connectAllStreamSessions(@SuperCall 
Callable<StreamOperation> orig) throws Exception
+        {
+            transientStateStart.countDown();
+            Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
+            orig.call();
+        }
+
+        public static void reset()
+        {
+            transientStateStart = new CountDownLatch(1);
+            transientStateEnd = new CountDownLatch(1);
+        }
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java
new file mode 100644
index 00000000..701eb628
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/routes/StreamStatsHandlerTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.response.StreamStatsResponse;
+import org.apache.cassandra.sidecar.common.response.data.StreamsProgressStats;
+import org.apache.cassandra.sidecar.common.server.MetricsOperations;
+import org.apache.cassandra.sidecar.common.server.StorageOperations;
+import org.apache.cassandra.sidecar.server.MainModule;
+import org.apache.cassandra.sidecar.server.Server;
+import org.mockito.stubbing.Answer;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link StreamStatsHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class StreamStatsHandlerTest
+{
+
+    static final Logger LOGGER = 
LoggerFactory.getLogger(StreamStatsHandlerTest.class);
+    Vertx vertx;
+    Server server;
+
+    Supplier<StreamStatsResponse> streamingStatsSupplier;
+
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new StreamingStatsTestModule());
+        Injector injector = Guice.createInjector(Modules.override(new 
MainModule())
+                                                        .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        context.awaitCompletion(5, TimeUnit.SECONDS);
+    }
+
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        server.close().onSuccess(res -> closeLatch.countDown());
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    @Test
+    void testStreamingStatsHandler(VertxTestContext context)
+    {
+        streamingStatsSupplier = () -> {
+            StreamStatsResponse response = new StreamStatsResponse("NORMAL",
+                                                                   new 
StreamsProgressStats(7, 7, 1024, 1024, 0, 0, 0, 0));
+            return response;
+        };
+
+
+        WebClient client = WebClient.create(vertx);
+        String testRoute = "/api/v1/cassandra/stats/streams";
+        client.get(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  StreamStatsResponse statsResponse = 
response.bodyAsJson(StreamStatsResponse.class);
+                  assertThat(statsResponse).isNotNull();
+                  
assertThat(statsResponse.operationMode()).isEqualTo("NORMAL");
+                  context.completeNow();
+              }));
+    }
+
+    class StreamingStatsTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        public InstancesMetadata instancesMetadata()
+        {
+            int instanceId = 100;
+            String host = "127.0.0.1";
+            InstanceMetadata instanceMetadata = mock(InstanceMetadata.class);
+            when(instanceMetadata.host()).thenReturn(host);
+            when(instanceMetadata.port()).thenReturn(9042);
+            when(instanceMetadata.id()).thenReturn(instanceId);
+            when(instanceMetadata.stagingDir()).thenReturn("");
+            CassandraAdapterDelegate delegate = 
mock(CassandraAdapterDelegate.class);
+            StorageOperations ops = mock(StorageOperations.class);
+            when(ops.operationMode()).thenAnswer((Answer<String>) invocation 
-> streamingStatsSupplier.get().operationMode());
+            when(delegate.storageOperations()).thenReturn(ops);
+            MetricsOperations metricsOps = mock(MetricsOperations.class);
+            when(metricsOps.streamsProgressStats())
+            .thenAnswer((Answer<StreamsProgressStats>) invocation -> 
streamingStatsSupplier.get().streamsProgressStats());
+            when(delegate.metricsOperations()).thenReturn(metricsOps);
+
+            when(instanceMetadata.delegate()).thenReturn(delegate);
+
+            InstancesMetadata mockInstancesMetadata = 
mock(InstancesMetadata.class);
+            
when(mockInstancesMetadata.instances()).thenReturn(Collections.singletonList(instanceMetadata));
+            
when(mockInstancesMetadata.instanceFromId(instanceId)).thenReturn(instanceMetadata);
+            
when(mockInstancesMetadata.instanceFromHost(host)).thenReturn(instanceMetadata);
+
+            return mockInstancesMetadata;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to