This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 39ef2b6e CASSSIDECAR-364: Endpoint to invalidate auth caches as required (#276)
39ef2b6e is described below

commit 39ef2b6e1c7ceba4ed6b5f54c80157fc75dfa304
Author: Shruti Sekaran <[email protected]>
AuthorDate: Mon Dec 1 09:47:02 2025 -0800

    CASSSIDECAR-364: Endpoint to invalidate auth caches as required (#276)
    
    Patch by Shruti Sekaran; reviewed by Saranya Krishnakumar, Francisco Guerrero for CASSSIDECAR-364
---
 CHANGES.txt                                        |   1 +
 .../cassandra/sidecar/common/ApiEndpointsV1.java   |   4 +
 .../common/request/InvalidateCacheRequest.java     |  93 +++
 .../cassandra/sidecar/client/RequestContext.java   |  19 +-
 .../cassandra/sidecar/client/SidecarClient.java    |  16 +
 .../sidecar/client/SidecarClientTest.java          |  53 ++
 .../routes/InvalidateCacheIntegrationTest.java     | 700 +++++++++++++++++++++
 .../apache/cassandra/sidecar/acl/AuthCache.java    |  25 +
 .../cassandra/sidecar/acl/IdentityToRoleCache.java |   2 +-
 .../acl/authorization/BasicPermissions.java        |   3 +
 .../acl/authorization/RoleAuthorizationsCache.java |  17 +-
 .../sidecar/acl/authorization/SuperUserCache.java  |   4 +-
 .../sidecar/handlers/InvalidateCacheHandler.java   | 186 ++++++
 .../sidecar/metrics/server/CacheMetrics.java       |   4 +-
 .../cassandra/sidecar/modules/AuthModule.java      |  27 +-
 .../modules/multibindings/VertxRouteMapKeys.java   |   5 +
 .../cassandra/sidecar/utils/CacheFactory.java      |   2 +
 .../sidecar/acl/RoleAuthorizationsCacheTest.java   |  20 +-
 .../handlers/InvalidateCacheHandlerTest.java       | 359 +++++++++++
 19 files changed, 1516 insertions(+), 24 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c53846b7..4d3db817 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Endpoint to invalidate auth caches as required (CASSSIDECAR-364)
  * Add /api/v2/cassandra/settings which will return Cassandra configurations 
stored in system_views.settings (CASSSIDECAR-272)
  * Fast Cassandra Input Validator (CASSSIDECAR-361)
  * Upgrade caffeine dependency (CASSSIDECAR-332)
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
index 30b360e7..70742924 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/ApiEndpointsV1.java
@@ -116,6 +116,10 @@ public final class ApiEndpointsV1
     public static final String ABORT_RESTORE_JOB_ROUTE = RESTORE_JOB_ROUTE + 
ABORT;
     public static final String RESTORE_JOB_PROGRESS_ROUTE = RESTORE_JOB_ROUTE 
+ PROGRESS;
 
+    // Cache related APIs
+    public static final String CACHE_NAME_PARAM = ":cacheName";
+    public static final String INVALIDATE_CACHE_ROUTE = API_V1 + "/caches/" + 
CACHE_NAME_PARAM + "/invalidate";
+
     // CDC APIs
     public static final String CDC_PATH = "/cdc";
     public static final String SEGMENT_PATH_PARAM = ":segment";
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/InvalidateCacheRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/InvalidateCacheRequest.java
new file mode 100644
index 00000000..3a10e355
--- /dev/null
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/InvalidateCacheRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.request;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import io.netty.handler.codec.http.HttpMethod;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import org.apache.cassandra.sidecar.common.response.HealthResponse;
+import org.jetbrains.annotations.Nullable;
+
+import static java.net.URLEncoder.encode;
+
+/**
+ * Represents a request to invalidate the specified cache
+ */
+public class InvalidateCacheRequest extends JsonRequest<HealthResponse>
+{
+    /**
+     * Constructs a request to execute cache invalidation operation
+     *
+     * @param cacheName the name of the cache to invalidate
+     * @param keys the specific keys to invalidate, or null to invalidate all keys; an empty list
+     *             produces the same request URI as null
+     */
+    public InvalidateCacheRequest(String cacheName, @Nullable List<String> keys)
+    {
+        super(buildRequestURI(cacheName, keys));
+    }
+
+    /**
+     * Builds the request URI, appending one URL-encoded {@code keys} query parameter per provided key
+     *
+     * @param cacheName the name of the cache to invalidate; substituted literally into the route
+     * @param keys the specific keys to invalidate, or null to invalidate all keys
+     * @return the complete request URI with query parameters if keys are provided
+     */
+    private static String buildRequestURI(String cacheName, @Nullable List<String> keys)
+    {
+        // String#replace substitutes literally. String#replaceAll would interpret '$' and '\' in
+        // cacheName as regex replacement syntax and either throw or produce a wrong URI.
+        String baseUri = ApiEndpointsV1.INVALIDATE_CACHE_ROUTE
+                         .replace(ApiEndpointsV1.CACHE_NAME_PARAM, cacheName);
+
+        if (keys == null || keys.isEmpty())
+        {
+            return baseUri;
+        }
+
+        StringBuilder uri = new StringBuilder(baseUri).append('?');
+        for (int i = 0; i < keys.size(); i++)
+        {
+            if (i > 0)
+            {
+                uri.append('&');
+            }
+            try
+            {
+                uri.append("keys=").append(encode(keys.get(i), StandardCharsets.UTF_8.name()));
+            }
+            catch (UnsupportedEncodingException e)
+            {
+                // UTF-8 is always supported, this should never happen
+                throw new RuntimeException("UTF-8 encoding not supported", e);
+            }
+        }
+        return uri.toString();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public HttpMethod method()
+    {
+        return HttpMethod.DELETE;
+    }
+}
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
index 4bf13bd6..966aa47d 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/RequestContext.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.sidecar.client;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -39,6 +40,7 @@ import 
org.apache.cassandra.sidecar.common.request.GossipHealthRequest;
 import org.apache.cassandra.sidecar.common.request.GossipInfoRequest;
 import org.apache.cassandra.sidecar.common.request.GossipUpdateRequest;
 import org.apache.cassandra.sidecar.common.request.ImportSSTableRequest;
+import org.apache.cassandra.sidecar.common.request.InvalidateCacheRequest;
 import org.apache.cassandra.sidecar.common.request.LifecycleInfoRequest;
 import org.apache.cassandra.sidecar.common.request.LifecycleUpdateRequest;
 import org.apache.cassandra.sidecar.common.request.ListOperationalJobsRequest;
@@ -460,6 +462,19 @@ public class RequestContext
             return request(new ClearSnapshotRequest(keyspace, tableName, 
snapshotName));
         }
 
+        /**
+         * Configures this Builder with an {@link InvalidateCacheRequest} targeting {@code cacheName}
+         * and returns a reference to this Builder enabling method chaining.
+         *
+         * @param cacheName the name of the cache to invalidate
+         * @param keys the specific keys to invalidate, or null to invalidate all keys
+         * @return a reference to this Builder
+         */
+        public Builder invalidateCacheRequest(String cacheName, @Nullable List<String> keys)
+        {
+            InvalidateCacheRequest cacheRequest = new InvalidateCacheRequest(cacheName, keys);
+            return request(cacheRequest);
+        }
+
         /**
          * Sets the {@code request} to be a {@link 
CleanSSTableUploadSessionRequest} for the given {@code uploadId}
          * and returns a reference to this Builder enabling method chaining.
@@ -599,7 +614,7 @@ public class RequestContext
          * Sets the {@code request} to be a {@link GossipUpdateRequest} for the
          * given {@link NodeCommandRequestPayload.State state}, and returns a 
reference to this Builder enabling method chaining.
          *
-         * @param state  the desired state for gossip
+         * @param state the desired state for gossip
          * @return a reference to this Builder
          */
         public Builder nodeGossipUpdateRequest(@NotNull 
NodeCommandRequestPayload.State state)
@@ -611,7 +626,7 @@ public class RequestContext
          * Sets the {@code request} to be a {@link NativeUpdateRequest} for the
          * given {@link NodeCommandRequestPayload.State state}, and returns a 
reference to this Builder enabling method chaining.
          *
-         * @param state  the desired state for native transport
+         * @param state the desired state for native transport
          * @return a reference to this Builder
          */
         public Builder nodeNativeUpdateRequest(@NotNull 
NodeCommandRequestPayload.State state)
diff --git 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
index 9c8f2f87..04d0298c 100644
--- 
a/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
+++ 
b/client/src/main/java/org/apache/cassandra/sidecar/client/SidecarClient.java
@@ -374,6 +374,22 @@ public class SidecarClient implements AutoCloseable, 
SidecarClientBlobRestoreExt
                                                             .build());
     }
 
+    /**
+     * Sends an invalidate cache request to the provided {@code instance} using the default retry policy
+     *
+     * @param instance  the instance where the request will be executed
+     * @param cacheName the name of the cache to invalidate
+     * @param keys      the specific keys to invalidate, or null to invalidate all keys
+     * @return a completable future with the health response indicating success
+     */
+    public CompletableFuture<HealthResponse> invalidateCache(SidecarInstance instance,
+                                                             String cacheName, @Nullable List<String> keys)
+    {
+        RequestContext.Builder contextBuilder = requestBuilder().singleInstanceSelectionPolicy(instance)
+                                                                .invalidateCacheRequest(cacheName, keys);
+        return executor.executeRequestAsync(contextBuilder.build());
+    }
+
     /**
      * Executes the create snapshot request using the default retry policy and 
provided {@code instance}
      *
diff --git 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
index de2fd1df..7cc117c3 100644
--- 
a/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
+++ 
b/client/src/testFixtures/java/org/apache/cassandra/sidecar/client/SidecarClientTest.java
@@ -662,6 +662,59 @@ abstract class SidecarClientTest
         assertThat(request.getMethod()).isEqualTo("DELETE");
     }
 
+    /**
+     * Invalidates a whole cache (no keys) against a mock server and checks the parsed response,
+     * the request path and the HTTP method.
+     */
+    @Test
+    void testInvalidateCache() throws Exception
+    {
+        SidecarInstanceImpl targetInstance = instances.get(1);
+        MockWebServer targetServer = servers.get(1);
+        targetServer.enqueue(new MockResponse().setResponseCode(OK.code())
+                                               .setHeader("content-type", "application/json")
+                                               .setBody("{\"status\":\"OK\"}"));
+
+        HealthResponse health = client.invalidateCache(targetInstance, "identity_to_role_cache", null)
+                                      .get(30, TimeUnit.SECONDS);
+
+        assertThat(health).isNotNull();
+        assertThat(health.status()).isEqualToIgnoringCase("OK");
+        assertThat(health.isOk()).isTrue();
+
+        assertThat(targetServer.getRequestCount()).isEqualTo(1);
+        RecordedRequest recorded = targetServer.takeRequest();
+        String expectedPath = ApiEndpointsV1.INVALIDATE_CACHE_ROUTE
+                              .replaceAll(ApiEndpointsV1.CACHE_NAME_PARAM, "identity_to_role_cache");
+        assertThat(recorded.getPath()).isEqualTo(expectedPath);
+        assertThat(recorded.getMethod()).isEqualTo("DELETE");
+    }
+
+    /**
+     * Invalidates three specific keys against a mock server and checks that each key is sent as
+     * its own {@code keys} query parameter on a DELETE request.
+     */
+    @Test
+    void testInvalidateCacheWithKeys() throws Exception
+    {
+        SidecarInstanceImpl targetInstance = instances.get(1);
+        MockWebServer targetServer = servers.get(1);
+        targetServer.enqueue(new MockResponse().setResponseCode(OK.code())
+                                               .setHeader("content-type", "application/json")
+                                               .setBody("{\"status\":\"OK\"}"));
+
+        List<String> keysToInvalidate = Arrays.asList("key1", "key2", "key3");
+        HealthResponse health = client.invalidateCache(targetInstance, "identity_to_role_cache", keysToInvalidate)
+                                      .get(30, TimeUnit.SECONDS);
+
+        assertThat(health).isNotNull();
+        assertThat(health.status()).isEqualToIgnoringCase("OK");
+        assertThat(health.isOk()).isTrue();
+
+        assertThat(targetServer.getRequestCount()).isEqualTo(1);
+        RecordedRequest recorded = targetServer.takeRequest();
+        String expectedPath = ApiEndpointsV1.INVALIDATE_CACHE_ROUTE
+                              .replaceAll(ApiEndpointsV1.CACHE_NAME_PARAM, "identity_to_role_cache")
+                              + "?keys=key1&keys=key2&keys=key3";
+        assertThat(recorded.getPath()).isEqualTo(expectedPath);
+        assertThat(recorded.getMethod()).isEqualTo("DELETE");
+    }
+
+
     @Test
     void testCreateSnapshot() throws Exception
     {
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/InvalidateCacheIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/InvalidateCacheIntegrationTest.java
new file mode 100644
index 00000000..3b5e6fa0
--- /dev/null
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/InvalidateCacheIntegrationTest.java
@@ -0,0 +1,700 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.routes;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.Test;
+
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpMethod;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+import org.apache.cassandra.sidecar.acl.authorization.AuthorizationCacheKey;
+import org.apache.cassandra.sidecar.acl.authorization.RoleAuthorizationsCache;
+import org.apache.cassandra.sidecar.acl.authorization.SuperUserCache;
+import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import 
org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.config.CacheConfiguration;
+import org.apache.cassandra.sidecar.config.KeyStoreConfiguration;
+import org.apache.cassandra.sidecar.config.ParameterizedClassConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.AccessControlConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.CacheConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.KeyStoreConfigurationImpl;
+import 
org.apache.cassandra.sidecar.config.yaml.ParameterizedClassConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
+import org.apache.cassandra.sidecar.config.yaml.SslConfigurationImpl;
+import org.apache.cassandra.sidecar.testing.MtlsTestHelper;
+import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
+import org.apache.cassandra.sidecar.testing.TemporaryCqlSessionProvider;
+import org.apache.cassandra.sidecar.utils.CacheFactory;
+import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
+import org.apache.cassandra.testing.ClusterBuilderConfiguration;
+
+import static java.net.URLEncoder.encode;
+import static org.apache.cassandra.testing.DriverTestUtils.buildContactPoints;
+import static org.apache.cassandra.testing.TestUtils.DC1_RF1;
+import static org.apache.cassandra.testing.TlsTestUtils.getSSLOptions;
+import static 
org.apache.cassandra.testing.TlsTestUtils.withAuthenticatedSession;
+import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
+
+/**
+ * Integration test for cache invalidation endpoints
+ */
+class InvalidateCacheIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
+{
+    protected static final int MIN_VERSION_WITH_MTLS = 5;
+    private static final String CASSANDRA_IDENTITY = 
"spiffe://cassandra/sidecar/cassandra_role";
+    private static final String SIDECAR_ROLE_IDENTITY = 
"spiffe://cassandra/sidecar/sidecar_role";
+    private static final String TEST_USER_IDENTITY = 
"spiffe://cassandra/sidecar/test_user";
+    private static final String TEST_USER2_IDENTITY = 
"spiffe://cassandra/sidecar/test_user2";
+    private static final String TEST_SUPERUSER_IDENTITY = 
"spiffe://cassandra/sidecar/test_superuser";
+    private static final String TEST_SUPERUSER2_IDENTITY = 
"spiffe://cassandra/sidecar/test_superuser2";
+    private static final String TEST_SUPERUSER3_IDENTITY = 
"spiffe://cassandra/sidecar/test_superuser3";
+    private static final String SCHEMA_ROUTE = "/api/v1/cassandra/schema";
+    private static final String CACHE_INVALIDATE_ROUTE_TEMPLATE = 
"/api/v1/caches/%s/invalidate";
+
+    private Path testUserKeystorePath;
+    private Path testUser2KeystorePath;
+    private Path superuserKeystorePath;
+    private Path superuser2KeystorePath;
+    private Path superuser3KeystorePath;
+
+    @Override
+    protected void beforeClusterProvisioning()
+    {
+        // Skip the test on Cassandra versions older than 5.0, where mTLS authentication is unavailable
+        int majorVersion = SimpleCassandraVersion.create(testVersion.version()).major;
+        assumeThat(majorVersion)
+        .as("mTLS authentication is not supported in 4.0 and 4.1 Cassandra versions")
+        .isGreaterThanOrEqualTo(MIN_VERSION_WITH_MTLS);
+    }
+
+    @Override
+    protected ClusterBuilderConfiguration testClusterConfiguration()
+    {
+        // Extends the parent's cluster configuration with the password authenticator
+        String passwordAuthenticator = "org.apache.cassandra.auth.PasswordAuthenticator";
+        return super.testClusterConfiguration()
+                    .additionalInstanceConfig(Map.of("authenticator", passwordAuthenticator));
+    }
+
+    @Override
+    protected void afterClusterProvisioned()
+    {
+        // Identities are configured while the node still runs with its initial configuration;
+        // the node is then stopped, reconfigured for mTLS, and started again.
+        IInstance node = cluster.getFirstRunningInstance();
+        configureAdminAndSidecarIdentity(node);
+
+        cluster.stopUnchecked(node);
+
+        var config = node.config();
+
+        // Authentication and authorization
+        Map<String, String> validatorParameters = Map.of("validator_class_name",
+                                                         "org.apache.cassandra.auth.SpiffeCertificateValidator");
+        config.set("authenticator.class_name", "org.apache.cassandra.auth.MutualTlsAuthenticator");
+        config.set("authenticator.parameters", validatorParameters);
+        config.set("role_manager", "CassandraRoleManager");
+        config.set("authorizer", "CassandraAuthorizer");
+
+        // Client encryption backed by the test helper's key and trust stores
+        config.set("client_encryption_options.enabled", "true");
+        config.set("client_encryption_options.optional", "true");
+        config.set("client_encryption_options.require_client_auth", "true");
+        config.set("client_encryption_options.require_endpoint_verification", "false");
+        config.set("client_encryption_options.keystore", mtlsTestHelper.serverKeyStorePath());
+        config.set("client_encryption_options.keystore_password", mtlsTestHelper.serverKeyStorePassword());
+        config.set("client_encryption_options.truststore", mtlsTestHelper.trustStorePath());
+        config.set("client_encryption_options.truststore_password", mtlsTestHelper.trustStorePassword());
+        config.set("credentials_update_interval", "50ms");
+
+        node.startup();
+    }
+
+    @Override
+    protected Function<SidecarConfigurationImpl.Builder, SidecarConfigurationImpl.Builder> configurationOverrides()
+    {
+        // Returns an override that enables access control (mTLS authenticator, role-based authorizer,
+        // 5s expire-after-access permission cache) and SSL on the Sidecar configuration builder.
+        return configurationBuilder -> {
+            Map<String, String> authenticatorParams
+            = Map.of("certificate_validator", "io.vertx.ext.auth.mtls.impl.CertificateValidatorImpl",
+                     "certificate_identity_extractor", "org.apache.cassandra.sidecar.acl.authentication.CassandraIdentityExtractor");
+            ParameterizedClassConfiguration authenticator
+            = new ParameterizedClassConfigurationImpl("org.apache.cassandra.sidecar.acl.authentication.MutualTlsAuthenticationHandlerFactory",
+                                                      authenticatorParams);
+            ParameterizedClassConfiguration authorizer
+            = new ParameterizedClassConfigurationImpl("org.apache.cassandra.sidecar.acl.authorization.RoleBasedAuthorizationProvider",
+                                                      Map.of());
+
+            CacheConfiguration permissionCache
+            = CacheConfigurationImpl.builder()
+                                    .expireAfterAccess(MillisecondBoundConfiguration.parse("5s"))
+                                    .build();
+
+            AccessControlConfiguration accessControl
+            = AccessControlConfigurationImpl.builder()
+                                            .enabled(true)
+                                            .authenticatorsConfiguration(List.of(authenticator))
+                                            .authorizerConfiguration(authorizer)
+                                            .permissionCacheConfiguration(permissionCache)
+                                            .build();
+
+            KeyStoreConfiguration truststore
+            = new KeyStoreConfigurationImpl(mtlsTestHelper.trustStorePath(),
+                                            mtlsTestHelper.trustStorePassword(),
+                                            mtlsTestHelper.trustStoreType(),
+                                            SecondBoundConfiguration.parse("1d"));
+
+            KeyStoreConfiguration keystore
+            = new KeyStoreConfigurationImpl(mtlsTestHelper.serverKeyStorePath(),
+                                            mtlsTestHelper.serverKeyStorePassword(),
+                                            mtlsTestHelper.serverKeyStoreType(),
+                                            SecondBoundConfiguration.parse("1d"));
+
+            SslConfigurationImpl ssl = SslConfigurationImpl.builder()
+                                                           .enabled(true)
+                                                           .clientAuth("REQUEST")
+                                                           .keystore(keystore)
+                                                           .truststore(truststore)
+                                                           .build();
+
+            return configurationBuilder.accessControlConfiguration(accessControl)
+                                       .sslConfiguration(ssl);
+        };
+    }
+
+    @Override
+    protected void startSidecar(ICluster<? extends IInstance> cluster) throws InterruptedException
+    {
+        // Start Sidecar with the test module built from the mTLS helper and the cluster
+        TestModule testModule = new TestModule(mtlsTestHelper, cluster);
+        serverWrapper = startSidecarWithInstances(cluster, testModule);
+    }
+
+    @Override
+    protected void beforeTestStart()
+    {
+        waitForSchemaReady(10, TimeUnit.SECONDS);
+        // Issue one client keystore per test identity; each certificate carries the identity as a SAN URI
+        try
+        {
+            testUserKeystorePath
+            = mtlsTestHelper.issueClientKeyStore(certificate -> certificate.addSanUriName(TEST_USER_IDENTITY));
+            testUser2KeystorePath
+            = mtlsTestHelper.issueClientKeyStore(certificate -> certificate.addSanUriName(TEST_USER2_IDENTITY));
+            superuserKeystorePath
+            = mtlsTestHelper.issueClientKeyStore(certificate -> certificate.addSanUriName(TEST_SUPERUSER_IDENTITY));
+            superuser2KeystorePath
+            = mtlsTestHelper.issueClientKeyStore(certificate -> certificate.addSanUriName(TEST_SUPERUSER2_IDENTITY));
+            superuser3KeystorePath
+            = mtlsTestHelper.issueClientKeyStore(certificate -> certificate.addSanUriName(TEST_SUPERUSER3_IDENTITY));
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Failed to create test keystores", e);
+        }
+    }
+
+    @Override
+    protected void initializeSchemaForTest()
+    {
+        // Connect as the default "cassandra" user over TLS and create the keyspace, table and role
+        // the tests rely on
+        Path cassandraKeystore = cassandraIdentityClientKeyStore();
+
+        SSLOptions clientSslOptions = getSSLOptions(cassandraKeystore.toString(),
+                                                    mtlsTestHelper.clientKeyStorePassword(),
+                                                    mtlsTestHelper.trustStorePath(),
+                                                    mtlsTestHelper.trustStorePassword());
+        withAuthenticatedSession(cluster.get(1), "cassandra", "cassandra", cqlSession -> {
+            createTestKeyspace(cqlSession, "sidecar_internal", DC1_RF1);
+            createRolesPermissionsTable(cqlSession);
+            createTestRole(cqlSession);
+        }, clientSslOptions);
+    }
+
+    /**
+     * Runs the shared full-invalidation check against the identity-to-role cache.
+     */
+    @Test
+    void testInvalidateIdentityToRoleCache()
+    {
+        IdentityToRoleCache cache = serverWrapper.injector.getInstance(IdentityToRoleCache.class);
+        verifyFullCacheInvalidation(IdentityToRoleCache.NAME,
+                                    cache::getAll,
+                                    testUserKeystorePath,
+                                    TEST_USER_IDENTITY,
+                                    1,
+                                    TEST_SUPERUSER_IDENTITY);
+    }
+
+    /**
+     * Runs the shared selective-invalidation check against the identity-to-role cache,
+     * requesting invalidation of a single identity key.
+     */
+    @Test
+    void testInvalidateIdentityToRoleCacheWithKeys()
+    {
+        IdentityToRoleCache cache = serverWrapper.injector.getInstance(IdentityToRoleCache.class);
+        verifySelectiveKeyInvalidation(IdentityToRoleCache.NAME,
+                                       cache::getAll,
+                                       List.of(testUserKeystorePath, testUser2KeystorePath),
+                                       List.of(TEST_USER_IDENTITY),
+                                       List.of(TEST_USER_IDENTITY),
+                                       TEST_USER2_IDENTITY,
+                                       testUserKeystorePath,
+                                       TEST_USER_IDENTITY);
+    }
+
+    /**
+     * Runs the shared selective-invalidation check against the identity-to-role cache,
+     * requesting invalidation of two identity keys in one call.
+     */
+    @Test
+    void testInvalidateIdentityToRoleCacheWithMultipleKeys()
+    {
+        IdentityToRoleCache cache = serverWrapper.injector.getInstance(IdentityToRoleCache.class);
+        verifySelectiveKeyInvalidation(IdentityToRoleCache.NAME,
+                                       cache::getAll,
+                                       List.of(testUserKeystorePath, testUser2KeystorePath, superuserKeystorePath),
+                                       List.of(TEST_USER_IDENTITY, TEST_USER2_IDENTITY),
+                                       List.of(TEST_USER_IDENTITY, TEST_USER2_IDENTITY),
+                                       TEST_SUPERUSER_IDENTITY,
+                                       testUserKeystorePath,
+                                       TEST_USER_IDENTITY);
+    }
+
+    /**
+     * Verifies that invalidating the role authorizations cache empties it and that a later
+     * authorized request populates it again.
+     */
+    @Test
+    void testInvalidateRoleAuthorizationsCache()
+    {
+        RoleAuthorizationsCache roleAuthorizationsCache = serverWrapper.injector.getInstance(RoleAuthorizationsCache.class);
+
+        // Clear the cache first to ensure clean state
+        String clearCacheRoute = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, RoleAuthorizationsCache.NAME);
+        verifyAccess(HttpMethod.DELETE, clearCacheRoute, superuserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        String endpointCacheRoute = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME);
+        verifyAccess(HttpMethod.DELETE, endpointCacheRoute, superuserKeystorePath, assertStatus(HttpResponseStatus.OK));
+
+        verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, testUserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        loopAssert(3, () -> assertThat(roleAuthorizationsCache.get("unique_cache_entry_key").get("test_role")).isNotNull());
+
+        // Invalidate cache and verify it's empty
+        String invalidateCacheRoute = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, RoleAuthorizationsCache.NAME);
+        verifyAccess(HttpMethod.DELETE, invalidateCacheRoute, superuserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        // assertThat(boolean) on its own verifies nothing; the terminal isTrue() makes this a real check
+        loopAssert(3, () -> assertThat(roleAuthorizationsCache.getAll().isEmpty()).isTrue());
+
+        // Re-populate cache with test user and verify test user is back in cache
+        verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, testUserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        loopAssert(3, () -> assertThat(roleAuthorizationsCache.get("unique_cache_entry_key").get("test_role")).isNotNull());
+    }
+
+    @Test
+    void testInvalidateRoleAuthorizationsCacheWithKeys()
+    {
+        // The shared helper checks that a keyed request is rejected and the cache is left untouched
+        RoleAuthorizationsCache cache = serverWrapper.injector.getInstance(RoleAuthorizationsCache.class);
+        verifyKeyBasedInvalidationNotSupported(RoleAuthorizationsCache.NAME, cache::getAll, testUserKeystorePath);
+    }
+
+    @Test
+    void testInvalidateSuperUserCache()
+    {
+        SuperUserCache cache = serverWrapper.injector.getInstance(SuperUserCache.class);
+        // A single entry (test_superuser_role) is expected to be present after the full invalidation
+        int expectedSizeAfterInvalidation = 1;
+
+        verifyFullCacheInvalidation(SuperUserCache.NAME,
+                                   cache::getAll,
+                                   superuser2KeystorePath,
+                                   "test_superuser2_role",
+                                   expectedSizeAfterInvalidation,
+                                   "test_superuser_role");
+    }
+
+    @Test
+    void testInvalidateSuperUserCacheWithKeys()
+    {
+        // Populate through two superuser keystores, invalidate one role, and expect the other to remain
+        SuperUserCache cache = serverWrapper.injector.getInstance(SuperUserCache.class);
+        List<Path> keystoresToPopulate = List.of(superuser2KeystorePath, superuser3KeystorePath);
+        List<String> rolesToInvalidate = List.of("test_superuser2_role");
+
+        verifySelectiveKeyInvalidation(SuperUserCache.NAME,
+                                      cache::getAll,
+                                      keystoresToPopulate,
+                                      rolesToInvalidate,
+                                      rolesToInvalidate,
+                                      "test_superuser3_role",
+                                      superuser2KeystorePath,
+                                      "test_superuser2_role");
+    }
+
+    @Test
+    void testInvalidateSuperUserCacheWithMultipleKeys()
+    {
+        // Populate through three superuser keystores, invalidate two roles, and expect the third to remain
+        SuperUserCache cache = serverWrapper.injector.getInstance(SuperUserCache.class);
+        List<Path> keystoresToPopulate = List.of(superuserKeystorePath, superuser2KeystorePath, superuser3KeystorePath);
+        List<String> rolesToInvalidate = List.of("test_superuser2_role", "test_superuser3_role");
+
+        verifySelectiveKeyInvalidation(SuperUserCache.NAME,
+                                      cache::getAll,
+                                      keystoresToPopulate,
+                                      rolesToInvalidate,
+                                      rolesToInvalidate,
+                                      "test_superuser_role",
+                                      superuser2KeystorePath,
+                                      "test_superuser2_role");
+    }
+
+    /**
+     * Verifies full invalidation of the endpoint authorization cache: the cache is cleared, populated by a
+     * schema request from the test user, invalidated through the endpoint, and populated again.
+     */
+    @Test
+    void testInvalidateEndpointAuthorizationCache()
+    {
+        CacheFactory cacheFactory = serverWrapper.injector.getInstance(CacheFactory.class);
+        AsyncCache<AuthorizationCacheKey, Boolean> endpointAuthorizationCache =
+        cacheFactory.endpointAuthorizationCache();
+
+        // Clear the cache first to ensure clean state
+        String invalidateCacheRoute = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE,
+                                                    CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME);
+        verifyAccess(HttpMethod.DELETE, invalidateCacheRoute, superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.OK));
+
+        // Populate the cache with a request from the test user and verify an entry was recorded.
+        // The assertion is chained on the map: assertThat(map.isEmpty()) alone checks nothing.
+        verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, testUserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        loopAssert(3, () -> assertThat(endpointAuthorizationCache.synchronous().asMap()).isNotEmpty());
+
+        // Invalidate cache and verify it is empty
+        verifyAccess(HttpMethod.DELETE, invalidateCacheRoute, superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.OK));
+        loopAssert(3, () -> assertThat(endpointAuthorizationCache.synchronous().asMap()).isEmpty());
+
+        // Re-populate cache with test user and verify test user is back in cache
+        verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, testUserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        loopAssert(3, () -> assertThat(endpointAuthorizationCache.synchronous().asMap()).isNotEmpty());
+    }
+
+    @Test
+    void testInvalidateEndpointAuthorizationCacheWithKeys()
+    {
+        // The shared helper checks that a keyed request is rejected and the cache is left untouched
+        AsyncCache<AuthorizationCacheKey, Boolean> cache =
+        serverWrapper.injector.getInstance(CacheFactory.class).endpointAuthorizationCache();
+
+        verifyKeyBasedInvalidationNotSupported(CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME,
+                                              () -> cache.synchronous().asMap(),
+                                              testUserKeystorePath);
+    }
+
+    @Test
+    void testInvalidateUnknownCache()
+    {
+        // A cache name the server does not know about is expected to answer with 404
+        verifyAccess(HttpMethod.DELETE,
+                     "/api/v1/caches/unknown_cache/invalidate",
+                     superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.NOT_FOUND));
+    }
+
+    @Test
+    void testInvalidateCacheWithoutPermission()
+    {
+        // test_user does not have CACHE:INVALIDATE permission, so each cache type should answer with 403
+        List<String> cacheNames = List.of(IdentityToRoleCache.NAME,
+                                          RoleAuthorizationsCache.NAME,
+                                          SuperUserCache.NAME,
+                                          CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME);
+        for (String cacheName : cacheNames)
+        {
+            String route = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, cacheName);
+            verifyAccess(HttpMethod.DELETE, route, testUserKeystorePath, assertStatus(HttpResponseStatus.FORBIDDEN));
+        }
+    }
+
+    /**
+     * Creates the {@code sidecar_internal.role_permissions_v1} table if it does not exist yet.
+     *
+     * @param session session used to execute the statement
+     */
+    private void createRolesPermissionsTable(Session session)
+    {
+        // Fragments are joined without a separator, so each column definition carries its own comma
+        String createTable = String.join("",
+                                         "CREATE TABLE IF NOT EXISTS sidecar_internal.role_permissions_v1 (",
+                                         "role text,",
+                                         "resource text,",
+                                         "permissions set<text>,",
+                                         "PRIMARY KEY(role, resource))");
+        session.execute(createTable);
+    }
+
+    /**
+     * Creates the regular and superuser test roles, binds each one to its certificate identity, and grants
+     * the regular roles permissions through {@code sidecar_internal.role_permissions_v1}.
+     *
+     * @param session session used to execute the statements
+     */
+    private void createTestRole(Session session)
+    {
+        // Each row: role name, SUPERUSER flag, identity bound to the role.
+        // The superuser roles exist for testing SuperUserCache.
+        String[][] roles = {
+        { "test_role", "false", TEST_USER_IDENTITY },
+        { "test_role2", "false", TEST_USER2_IDENTITY },
+        { "test_superuser_role", "true", TEST_SUPERUSER_IDENTITY },
+        { "test_superuser2_role", "true", TEST_SUPERUSER2_IDENTITY },
+        { "test_superuser3_role", "true", TEST_SUPERUSER3_IDENTITY },
+        };
+        for (String[] role : roles)
+        {
+            session.execute(String.format("CREATE ROLE IF NOT EXISTS \"%s\" WITH SUPERUSER = %s AND LOGIN = true",
+                                          role[0], role[1]));
+            session.execute(String.format("ADD IDENTITY IF NOT EXISTS '%s' TO ROLE '%s'", role[2], role[0]));
+        }
+
+        // Insert permissions into role_permissions_v1 to populate RoleAuthorizationsCache
+        String insertTemplate = "INSERT INTO sidecar_internal.role_permissions_v1 (role, resource, permissions) "
+                                + "VALUES ('%s', '%s', {'%s'})";
+        for (String role : List.of("test_role", "test_role2"))
+        {
+            session.execute(String.format(insertTemplate, role, "cluster", "SCHEMA:READ"));
+            session.execute(String.format(insertTemplate, role, "data", "SNAPSHOT:CREATE"));
+        }
+    }
+
+    /**
+     * Creates the {@code sidecar_role} superuser role and binds the Cassandra and Sidecar certificate
+     * identities to the {@code cassandra} and {@code sidecar_role} roles.
+     *
+     * <p>The statements are retried once per second for up to 60 attempts (presumably because the instance
+     * may not accept authenticated sessions right after start-up; confirm against the cluster set-up).
+     *
+     * @param instance the Cassandra instance to connect to
+     * @throws IllegalStateException if every attempt fails; the last failure is attached as the cause
+     * @throws RuntimeException      if the thread is interrupted while waiting between attempts
+     */
+    private void configureAdminAndSidecarIdentity(IInstance instance)
+    {
+        int maxAttempts = 60;
+        Exception lastFailure = null;
+        for (int attempt = 0; attempt < maxAttempts; attempt++)
+        {
+            try
+            {
+                withAuthenticatedSession(instance, "cassandra", "cassandra", session -> {
+                    session.execute("CREATE ROLE IF NOT EXISTS \"sidecar_role\" " +
+                                    "WITH SUPERUSER = true " +
+                                    "AND LOGIN = true");
+                    session.execute(String.format("ADD IDENTITY IF NOT EXISTS '%s' TO ROLE 'cassandra'",
+                                                  CASSANDRA_IDENTITY));
+                    session.execute(String.format("ADD IDENTITY IF NOT EXISTS '%s' TO ROLE 'sidecar_role'",
+                                                  SIDECAR_ROLE_IDENTITY));
+                }, null);
+                return;
+            }
+            catch (Exception e)
+            {
+                // Keep the failure so that it can be reported if all attempts are exhausted
+                lastFailure = e;
+                try
+                {
+                    TimeUnit.SECONDS.sleep(1);
+                }
+                catch (InterruptedException ie)
+                {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(ie);
+                }
+            }
+        }
+        // Fail loudly instead of letting later tests fail for a reason unrelated to what they verify
+        throw new IllegalStateException("Unable to configure admin and sidecar identities after "
+                                        + maxAttempts + " attempts", lastFailure);
+    }
+
+    /**
+     * Issues a client keystore whose certificate carries {@code CASSANDRA_IDENTITY} as a SAN URI name.
+     *
+     * @return path of the issued keystore
+     * @throws RuntimeException wrapping any failure raised while issuing the keystore
+     */
+    private Path cassandraIdentityClientKeyStore()
+    {
+        try
+        {
+            return mtlsTestHelper.issueClientKeyStore(builder -> builder.addSanUriName(CASSANDRA_IDENTITY));
+        }
+        catch (Exception cause)
+        {
+            throw new RuntimeException(cause);
+        }
+    }
+
+    /**
+     * Sends a request to the test server on 127.0.0.1 with a client built from the given keystore and hands
+     * the response to the supplied verifier. The client is closed whether or not the verification succeeds.
+     *
+     * @param method             HTTP method of the request
+     * @param testRoute          route to request
+     * @param clientKeystorePath keystore the client presents to the server
+     * @param assertions         verifier applied to the response
+     */
+    private void verifyAccess(HttpMethod method, String testRoute, Path clientKeystorePath,
+                              Verifier<HttpResponse<Buffer>> assertions)
+    {
+        String keystore = clientKeystorePath.toString();
+        WebClient webClient = trustedClient(keystore,
+                                            mtlsTestHelper.clientKeyStorePassword(),
+                                            mtlsTestHelper.trustStorePath(),
+                                            mtlsTestHelper.trustStorePassword());
+        try
+        {
+            assertions.accept(getBlocking(webClient.request(method, serverWrapper.serverPort, "127.0.0.1", testRoute)
+                                                   .send()));
+        }
+        finally
+        {
+            webClient.close();
+        }
+    }
+
+    /**
+     * Checks that a cache without key-based invalidation support rejects a request that carries keys.
+     * The cache is cleared and populated first; the keyed request must then answer BAD_REQUEST and every
+     * entry recorded beforehand must still be present with an equal value.
+     *
+     * @param cacheName the cache name
+     * @param cacheSupplier supplier that returns the cache map
+     * @param populateKeystore keystore to use for populating the cache before the test
+     */
+    private void verifyKeyBasedInvalidationNotSupported(String cacheName,
+                                                       java.util.function.Supplier<java.util.Map<?, ?>> cacheSupplier,
+                                                       Path populateKeystore)
+    {
+        // Start from a clean state: drop the target cache and the endpoint authorization cache
+        verifyAccess(HttpMethod.DELETE,
+                     String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, cacheName),
+                     superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.OK));
+        verifyAccess(HttpMethod.DELETE,
+                     String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME),
+                     superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.OK));
+
+        // Populate the cache with a regular request
+        verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, populateKeystore, assertStatus(HttpResponseStatus.OK));
+        loopAssert(3, () -> assertThat(cacheSupplier.get()).isNotEmpty());
+
+        // Snapshot of the entries before the rejected request
+        java.util.Map<?, ?> snapshot = new java.util.HashMap<>(cacheSupplier.get());
+
+        // A request with specific keys should return BAD_REQUEST
+        String keyedRoute = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE + "?keys=someKey", cacheName);
+        verifyAccess(HttpMethod.DELETE, keyedRoute, superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.BAD_REQUEST));
+
+        // Every entry from the snapshot must still be cached unchanged
+        java.util.Map<?, ?> current = cacheSupplier.get();
+        for (java.util.Map.Entry<?, ?> entry : snapshot.entrySet())
+        {
+            assertThat(current.containsKey(entry.getKey())).isTrue();
+            assertThat(current.get(entry.getKey())).isEqualTo(entry.getValue());
+        }
+    }
+
+    /**
+     * End-to-end flow for invalidating selected keys of a cache.
+     * The cache is cleared and populated through several keystores, the given keys are invalidated, the
+     * remaining content is verified, and finally the cache is populated again.
+     *
+     * @param cacheName the cache name to invalidate
+     * @param cacheSupplier supplier that returns the cache map
+     * @param populateKeystores keystores to use for populating the cache
+     * @param keysToInvalidate the keys to pass to the invalidation API (one or more)
+     * @param verifyRemovedKeys the keys that should be removed from cache
+     * @param verifyRemainingKey at least one key that should remain in cache
+     * @param repopulateKeystore keystore to use for re-populating after invalidation
+     * @param expectedRepopulatedKey the key expected to be back in cache after re-population
+     */
+    private void verifySelectiveKeyInvalidation(String cacheName,
+                                               java.util.function.Supplier<java.util.Map<String, ?>> cacheSupplier,
+                                               List<Path> populateKeystores,
+                                               List<String> keysToInvalidate,
+                                               List<String> verifyRemovedKeys,
+                                               String verifyRemainingKey,
+                                               Path repopulateKeystore,
+                                               String expectedRepopulatedKey)
+    {
+        // Start from a clean state, then populate once per keystore
+        String baseRoute = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, cacheName);
+        verifyAccess(HttpMethod.DELETE, baseRoute, superuserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        verifyAccess(HttpMethod.DELETE,
+                     String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME),
+                     superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.OK));
+        populateKeystores.forEach(keystore -> verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, keystore,
+                                                           assertStatus(HttpResponseStatus.OK)));
+
+        // The cache must hold at least one entry per keystore, including every key about to be removed
+        loopAssert(3, () ->
+                      assertThat(cacheSupplier.get()).hasSizeGreaterThanOrEqualTo(populateKeystores.size()));
+        verifyRemovedKeys.forEach(key -> loopAssert(3, () -> assertThat(cacheSupplier.get()).containsKey(key)));
+
+        int sizeBeforeInvalidation = cacheSupplier.get().size();
+
+        // Append one URL-encoded "keys" query parameter per key
+        StringBuilder keyedRoute = new StringBuilder(baseRoute);
+        String separator = "?keys=";
+        for (String key : keysToInvalidate)
+        {
+            keyedRoute.append(separator).append(encode(key, StandardCharsets.UTF_8));
+            separator = "&keys=";
+        }
+
+        verifyAccess(HttpMethod.DELETE, keyedRoute.toString(), superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.OK));
+        // We skip clearing endpoint_authorization_cache here to avoid repopulating the cache we just invalidated
+
+        // Only the requested keys may be gone
+        loopAssert(3, () -> {
+            java.util.Map<String, ?> remaining = cacheSupplier.get();
+            assertThat(remaining).isNotEmpty();
+            assertThat(remaining).hasSize(sizeBeforeInvalidation - keysToInvalidate.size());
+            for (String removedKey : verifyRemovedKeys)
+            {
+                assertThat(remaining).doesNotContainKey(removedKey);
+            }
+            assertThat(remaining).containsKey(verifyRemainingKey);
+        });
+
+        // Re-populate and verify
+        verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, repopulateKeystore, assertStatus(HttpResponseStatus.OK));
+        loopAssert(3, () -> assertThat(cacheSupplier.get()).containsKey(expectedRepopulatedKey));
+    }
+
+    /**
+     * End-to-end flow for invalidating a whole cache (no keys passed).
+     * The cache is cleared and populated, the populated entry is verified, the cache is invalidated, the
+     * expected state afterwards is verified, and finally the cache is populated again.
+     *
+     * @param cacheName the cache name to invalidate
+     * @param cacheSupplier supplier that returns the cache map
+     * @param populateKeystore keystore to use for populating the cache
+     * @param verifyKey key to verify exists in cache after population
+     * @param expectedSizeAfterInvalidation expected cache size after invalidation (0 for empty, or typically
+     *                                      1 for superuser entry)
+     * @param verifyRemainingKey key expected to remain after invalidation (can be null if
+     *                           expectedSizeAfterInvalidation is 0)
+     */
+    private void verifyFullCacheInvalidation(String cacheName,
+                                            java.util.function.Supplier<java.util.Map<String, ?>> cacheSupplier,
+                                            Path populateKeystore,
+                                            String verifyKey,
+                                            int expectedSizeAfterInvalidation,
+                                            String verifyRemainingKey)
+    {
+        String cacheRoute = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE, cacheName);
+        String endpointCacheRoute = String.format(CACHE_INVALIDATE_ROUTE_TEMPLATE,
+                                                  CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME);
+
+        // Start from a clean state, then populate
+        verifyAccess(HttpMethod.DELETE, cacheRoute, superuserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        verifyAccess(HttpMethod.DELETE, endpointCacheRoute, superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.OK));
+        verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, populateKeystore, assertStatus(HttpResponseStatus.OK));
+
+        // The populated entry must be visible
+        loopAssert(3, () -> assertThat(cacheSupplier.get()).containsKey(verifyKey));
+
+        // Invalidate the target cache, then the endpoint authorization cache
+        verifyAccess(HttpMethod.DELETE, cacheRoute, superuserKeystorePath, assertStatus(HttpResponseStatus.OK));
+        verifyAccess(HttpMethod.DELETE, endpointCacheRoute, superuserKeystorePath,
+                     assertStatus(HttpResponseStatus.OK));
+
+        // Verify expected state after invalidation
+        if (expectedSizeAfterInvalidation != 0)
+        {
+            // Some entries are expected to be present, but not the one populated above
+            loopAssert(3, () -> {
+                java.util.Map<String, ?> remaining = cacheSupplier.get();
+                assertThat(remaining).isNotEmpty();
+                assertThat(remaining).hasSize(expectedSizeAfterInvalidation);
+                assertThat(remaining).doesNotContainKey(verifyKey);
+                assertThat(remaining).containsKey(verifyRemainingKey);
+            });
+        }
+        else
+        {
+            // Cache should be completely empty
+            loopAssert(3, () -> assertThat(cacheSupplier.get()).isEmpty());
+        }
+
+        // Re-populate and verify
+        verifyAccess(HttpMethod.GET, SCHEMA_ROUTE, populateKeystore, assertStatus(HttpResponseStatus.OK));
+        loopAssert(3, () -> assertThat(cacheSupplier.get()).containsKey(verifyKey));
+    }
+
+    /**
+     * Guice module that provides the {@link CQLSessionProvider} used by the test server. The provider
+     * connects with a client keystore issued for {@code SIDECAR_ROLE_IDENTITY}.
+     */
+    static class TestModule extends AbstractModule
+    {
+        private final MtlsTestHelper mtlsTestHelper;
+        private final ICluster<? extends IInstance> cluster;
+
+        TestModule(MtlsTestHelper mtlsTestHelper, ICluster<? extends IInstance> cluster)
+        {
+            this.mtlsTestHelper = mtlsTestHelper;
+            this.cluster = cluster;
+        }
+
+        @Provides
+        @Singleton
+        public CQLSessionProvider cqlSessionProvider()
+        {
+            Path sidecarKeystore = issueSidecarClientKeyStore();
+            SSLOptions sslOptions = getSSLOptions(sidecarKeystore.toString(),
+                                                  mtlsTestHelper.clientKeyStorePassword(),
+                                                  mtlsTestHelper.trustStorePath(),
+                                                  mtlsTestHelper.trustStorePassword());
+            return new TemporaryCqlSessionProvider(buildContactPoints(cluster),
+                                                   org.apache.cassandra.sidecar.testing.SharedExecutorNettyOptions.INSTANCE,
+                                                   sslOptions);
+        }
+
+        // Issues the keystore for Sidecar-to-Cassandra connections, wrapping any failure in a RuntimeException
+        private Path issueSidecarClientKeyStore()
+        {
+            try
+            {
+                return mtlsTestHelper.issueClientKeyStore(builder -> builder.addSanUriName(SIDECAR_ROLE_IDENTITY));
+            }
+            catch (Exception cause)
+            {
+                throw new RuntimeException(cause);
+            }
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/AuthCache.java 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/AuthCache.java
index 28e12857..f1c57512 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/acl/AuthCache.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/acl/AuthCache.java
@@ -138,6 +138,31 @@ public abstract class AuthCache<K, V>
         }
     }
 
+    /**
+     * Invalidate multiple keys.
+     * @param keys keys to invalidate
+     */
+    public void invalidateAll(Iterable<? extends K> keys)
+    {
+        if (cache != null)
+        {
+            cache.invalidateAll(keys);
+            logger.info("Cache entries with keys={} have been invalidated from 
cache={}", keys, this.name);
+        }
+    }
+
+    /**
+     * Removes every entry from this cache. Nothing happens when the underlying cache is {@code null}.
+     */
+    public void invalidateAll()
+    {
+        if (cache == null)
+        {
+            return;
+        }
+        cache.invalidateAll();
+        logger.info("All cache entries from {} have been invalidated", this.name);
+    }
+
     private LoadingCache<K, V> initCache()
     {
         if (config.refreshAfterWrite() == null && config.expireAfterAccess() 
== null)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/IdentityToRoleCache.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/IdentityToRoleCache.java
index 752d2273..97d9f4ba 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/IdentityToRoleCache.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/IdentityToRoleCache.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 @Singleton
 public class IdentityToRoleCache extends AuthCache<String, String>
 {
-    private static final String NAME = "identity_to_role_cache";
+    public static final String NAME = "identity_to_role_cache";
 
     @Inject
     public IdentityToRoleCache(Vertx vertx,
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
index 66b1972b..f09ed863 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/BasicPermissions.java
@@ -39,6 +39,9 @@ import static 
org.apache.cassandra.sidecar.acl.authorization.ResourceScopes.TABL
  */
 public class BasicPermissions
 {
+    // Cache related permissions
+    public static final Permission INVALIDATE_CACHE = new 
DomainAwarePermission("CACHE:INVALIDATE", CLUSTER_SCOPE);
+
     // SSTable staging related permissions
     public static final Permission UPLOAD_STAGED_SSTABLE = new 
DomainAwarePermission("STAGED_SSTABLE:UPLOAD", TABLE_SCOPE);
     public static final Permission IMPORT_STAGED_SSTABLE = new 
DomainAwarePermission("STAGED_SSTABLE:IMPORT", TABLE_SCOPE);
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleAuthorizationsCache.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleAuthorizationsCache.java
index d56a9023..cf854d2b 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleAuthorizationsCache.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/RoleAuthorizationsCache.java
@@ -35,15 +35,20 @@ import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 
 /**
- * Caches role and authorizations held by it. Entries from 
system_auth.role_permissions table in Cassandra and
- * sidecar_internal.role_permissions_v1 table are processed into 
authorizations and cached here. All table entries are
- * stored against a unique cache key. Caching against UNIQUE_CACHE_ENTRY is 
done to make sure new entries in the table
- * are picked up during cache refreshes.
+ * Caches role-to-authorizations mappings. Entries from 
system_auth.role_permissions table in Cassandra and
+ * sidecar_internal.role_permissions_v1 table are processed into 
authorizations and cached here.
+ *
+ * <p>All table entries are stored against a single unique cache key ({@link 
#UNIQUE_CACHE_ENTRY}). This design is
+ * necessary because the role-to-resource-to-permissions relationship is a 
many-to-many mapping. Without bulk loading,
+ * cache refresh would require expensive full table scans to find all entries 
for each role.
+ *
+ * <p>As a result, this cache only supports bulk operations and cannot load or 
invalidate individual keys. The single
+ * key approach ensures that all new entries in the underlying tables are 
picked up during cache refreshes.
  */
 @Singleton
 public class RoleAuthorizationsCache extends AuthCache<String, Map<String, 
Set<Authorization>>>
 {
-    private static final String NAME = "role_permissions_cache";
+    public static final String NAME = "role_authorizations_cache";
     protected static final String UNIQUE_CACHE_ENTRY = 
"unique_cache_entry_key";
 
     @Inject
@@ -66,7 +71,7 @@ public class RoleAuthorizationsCache extends 
AuthCache<String, Map<String, Set<A
                                                                 sidecarSchema,
                                                                 
sidecarPermissionsDatabaseAccessor)),
               
sidecarConfiguration.accessControlConfiguration().permissionCacheConfiguration(),
-              sidecarMetrics.server().cache().rolePermissionsCacheMetrics);
+              sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics);
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SuperUserCache.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SuperUserCache.java
index 511af804..238d6ccb 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SuperUserCache.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/SuperUserCache.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.acl.authorization;
 
 import com.google.inject.Inject;
+import com.google.inject.Singleton;
 import io.vertx.core.Vertx;
 import org.apache.cassandra.sidecar.acl.AuthCache;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
@@ -31,9 +32,10 @@ import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
  * (directly or indirectly) has superuser status.
  * Note: {@link SuperUserCache} maintains only the superuser status. It can 
not guarantee whether a role exists
  */
+@Singleton
 public class SuperUserCache extends AuthCache<String, Boolean>
 {
-    private static final String NAME = "super_user_cache";
+    public static final String NAME = "super_user_cache";
 
     @Inject
     public SuperUserCache(Vertx vertx,
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandler.java
new file mode 100644
index 00000000..aa71ee5d
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandler.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.AuthCache;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+import org.apache.cassandra.sidecar.acl.authorization.AuthorizationCacheKey;
+import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
+import org.apache.cassandra.sidecar.acl.authorization.RoleAuthorizationsCache;
+import org.apache.cassandra.sidecar.acl.authorization.SuperUserCache;
+import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
+import org.apache.cassandra.sidecar.utils.CacheFactory;
+import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
+import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.sidecar.modules.ApiModule.OK_STATUS;
+import static 
org.apache.cassandra.sidecar.utils.CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME;
+import static 
org.apache.cassandra.sidecar.utils.HttpExceptions.wrapHttpException;
+
+/**
+ * Provides REST API for invalidating caches.
+ * Supports full cache invalidation or selective key-based invalidation where applicable.
+ * Currently, supports authentication and authorization caches including identity-to-role mappings,
+ * role authorizations, superuser cache and endpoint authorization cache.
+ *
+ * <p>The target cache is chosen by the {@code cacheName} path parameter, which is matched
+ * case-insensitively against each cache's name constant or the same name without underscores.
+ * Keys for selective invalidation are read from the {@code keys} query parameter; when no key is
+ * supplied the whole cache is invalidated.
+ */
+@Singleton
+public class InvalidateCacheHandler extends AbstractHandler<InvalidateCacheHandler.Params> implements AccessProtected
+{
+    // Accepted aliases: the cache names with underscores removed, in lower case
+    private static final String IDENTITY_TO_ROLE_CACHE_NO_UNDERSCORE = "identitytorolecache";
+    private static final String SUPER_USER_CACHE_NO_UNDERSCORE = "superusercache";
+    private static final String ROLE_AUTHORIZATIONS_CACHE_NO_UNDERSCORE = "roleauthorizationscache";
+    private static final String ENDPOINT_AUTHORIZATION_CACHE_NO_UNDERSCORE = "endpointauthorizationcache";
+
+    private final IdentityToRoleCache identityToRoleCache;
+    private final RoleAuthorizationsCache roleAuthorizationsCache;
+    private final SuperUserCache superUserCache;
+    private final AsyncCache<AuthorizationCacheKey, Boolean> endpointAuthorizationCache;
+
+    /**
+     * Creates the handler with the caches it is able to invalidate.
+     *
+     * @param metadataFetcher         passed through to {@link AbstractHandler}
+     * @param executorPools           passed through to {@link AbstractHandler}
+     * @param validator               passed through to {@link AbstractHandler}
+     * @param identityToRoleCache     cache targeted by the identity-to-role cache name
+     * @param roleAuthorizationsCache cache targeted by the role authorizations cache name
+     * @param superUserCache          cache targeted by the superuser cache name
+     * @param cacheFactory            source of the endpoint authorization cache
+     */
+    @Inject
+    public InvalidateCacheHandler(InstanceMetadataFetcher metadataFetcher,
+                                  ExecutorPools executorPools,
+                                  CassandraInputValidator validator,
+                                  IdentityToRoleCache identityToRoleCache,
+                                  RoleAuthorizationsCache roleAuthorizationsCache,
+                                  SuperUserCache superUserCache,
+                                  CacheFactory cacheFactory)
+    {
+        super(metadataFetcher, executorPools, validator);
+        this.identityToRoleCache = identityToRoleCache;
+        this.roleAuthorizationsCache = roleAuthorizationsCache;
+        this.superUserCache = superUserCache;
+        this.endpointAuthorizationCache = cacheFactory.endpointAuthorizationCache();
+    }
+
+    /**
+     * @return a single-element set holding the {@link BasicPermissions#INVALIDATE_CACHE} authorization
+     */
+    @Override
+    public Set<Authorization> requiredAuthorizations()
+    {
+        return Collections.singleton(BasicPermissions.INVALIDATE_CACHE.toAuthorization());
+    }
+
+    /**
+     * Reads the {@code cacheName} path parameter and every {@code keys} query parameter value.
+     *
+     * @param context the routing context of the request
+     * @return the extracted parameters
+     */
+    @Override
+    protected Params extractParamsOrThrow(RoutingContext context)
+    {
+        String cacheName = context.pathParam("cacheName");
+        List<String> keys = context.queryParam("keys");
+        return new Params(cacheName, keys);
+    }
+
+    /**
+     * Invalidates the requested cache and replies with {@code OK_STATUS}.
+     * Responds with {@code 400 Bad Request} when keys are supplied for a cache that only supports
+     * full invalidation, and with {@code 404 Not Found} when the cache name is not recognized.
+     */
+    @Override
+    protected void handleInternal(RoutingContext context,
+                                  HttpServerRequest httpRequest,
+                                  @NotNull String host,
+                                  SocketAddress remoteAddress,
+                                  Params params)
+    {
+        String cacheName = params.cacheName;
+        boolean invalidateAll = params.invalidateAll();
+
+        // Locale.ROOT keeps matching independent of the JVM default locale. With a Turkish default
+        // locale, "IdentityToRoleCache".toLowerCase() yields a dotless 'ı' and matches no label below.
+        switch (cacheName.toLowerCase(Locale.ROOT))
+        {
+            case IdentityToRoleCache.NAME:
+            case IDENTITY_TO_ROLE_CACHE_NO_UNDERSCORE:
+                invalidateAuthCache(identityToRoleCache, params, true);
+                break;
+            case SuperUserCache.NAME:
+            case SUPER_USER_CACHE_NO_UNDERSCORE:
+                invalidateAuthCache(superUserCache, params, true);
+                break;
+            case RoleAuthorizationsCache.NAME:
+            case ROLE_AUTHORIZATIONS_CACHE_NO_UNDERSCORE:
+                invalidateAuthCache(roleAuthorizationsCache, params, false);
+                break;
+            case ENDPOINT_AUTHORIZATION_CACHE_NAME:
+            case ENDPOINT_AUTHORIZATION_CACHE_NO_UNDERSCORE:
+                if (!invalidateAll)
+                {
+                    throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                            "endpoint_authorization_cache does not support selective key invalidation");
+                }
+                endpointAuthorizationCache.synchronous().invalidateAll();
+                break;
+            default:
+                throw wrapHttpException(HttpResponseStatus.NOT_FOUND,
+                                        "Unknown cache: " + cacheName);
+        }
+
+        logger.info("Cache {} invalidated successfully. Keys: {}", cacheName,
+                    invalidateAll ? "all" : params.keys);
+        context.json(OK_STATUS);
+    }
+
+    /**
+     * Generic method to invalidate an AuthCache with optional list of keys for invalidation
+     *
+     * @param cache                         AuthCache to invalidate
+     * @param params                        params containing cache name and keys
+     * @param supportsSelectiveInvalidation whether cache supports selective key invalidation
+     * @param <V> the cache value type
+     */
+    private <V> void invalidateAuthCache(AuthCache<String, V> cache, Params params, boolean supportsSelectiveInvalidation)
+    {
+        boolean invalidateAll = params.invalidateAll();
+        if (!supportsSelectiveInvalidation && !invalidateAll)
+        {
+            throw wrapHttpException(HttpResponseStatus.BAD_REQUEST,
+                                    params.cacheName + " does not support selective key invalidation");
+        }
+
+        if (invalidateAll)
+        {
+            cache.invalidateAll();
+        }
+        else
+        {
+            cache.invalidateAll(params.keys);
+        }
+    }
+
+    /**
+     * Simple holder class for cache invalidation parameters
+     */
+    protected static class Params
+    {
+        final String cacheName;
+        final List<String> keys;
+
+        Params(String cacheName, List<String> keys)
+        {
+            this.cacheName = cacheName;
+            this.keys = keys;
+        }
+
+        /**
+         * @return {@code true} when no keys were supplied, i.e. {@code keys} is null or empty
+         */
+        boolean invalidateAll()
+        {
+            return keys == null || keys.isEmpty();
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/CacheMetrics.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/CacheMetrics.java
index 84a7da4d..dd19d38a 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/CacheMetrics.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/CacheMetrics.java
@@ -31,7 +31,7 @@ public class CacheMetrics
     public final CacheStatsCounter snapshotCacheMetrics;
     public final CacheStatsCounter identityToRoleCacheMetrics;
     public final CacheStatsCounter superUserCacheMetrics;
-    public final CacheStatsCounter rolePermissionsCacheMetrics;
+    public final CacheStatsCounter roleAuthorizationsCacheMetrics;
     public final CacheStatsCounter authorizationCacheMetrics;
 
     public CacheMetrics(MetricRegistry globalMetricRegistry)
@@ -39,7 +39,7 @@ public class CacheMetrics
         snapshotCacheMetrics = new CacheStatsCounter(globalMetricRegistry, 
SNAPSHOT_CACHE_NAME);
         identityToRoleCacheMetrics = new 
CacheStatsCounter(globalMetricRegistry, "identity_to_role_cache");
         superUserCacheMetrics = new CacheStatsCounter(globalMetricRegistry, 
"super_user_cache");
-        rolePermissionsCacheMetrics = new 
CacheStatsCounter(globalMetricRegistry, "role_permissions_cache");
+        roleAuthorizationsCacheMetrics = new 
CacheStatsCounter(globalMetricRegistry, "role_authorizations_cache");
         authorizationCacheMetrics = new 
CacheStatsCounter(globalMetricRegistry, "authorization_cache");
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/AuthModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/AuthModule.java
index dd27b4f3..3efe75ed 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/AuthModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/AuthModule.java
@@ -30,6 +30,8 @@ import com.google.inject.multibindings.ProvidesIntoMap;
 import io.vertx.core.Vertx;
 import io.vertx.ext.auth.authorization.AuthorizationProvider;
 import io.vertx.ext.web.handler.ChainAuthHandler;
+import jakarta.ws.rs.DELETE;
+import jakarta.ws.rs.Path;
 import org.apache.cassandra.sidecar.acl.AdminIdentityResolver;
 import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
 import 
org.apache.cassandra.sidecar.acl.authentication.AuthenticationHandlerFactory;
@@ -44,6 +46,7 @@ import 
org.apache.cassandra.sidecar.acl.authorization.PermissionFactory;
 import org.apache.cassandra.sidecar.acl.authorization.PermissionFactoryImpl;
 import org.apache.cassandra.sidecar.acl.authorization.RoleAuthorizationsCache;
 import 
org.apache.cassandra.sidecar.acl.authorization.RoleBasedAuthorizationProvider;
+import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
 import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
 import org.apache.cassandra.sidecar.config.ParameterizedClassConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
@@ -51,6 +54,7 @@ import 
org.apache.cassandra.sidecar.db.schema.SidecarRolePermissionsSchema;
 import org.apache.cassandra.sidecar.db.schema.SystemAuthSchema;
 import org.apache.cassandra.sidecar.db.schema.TableSchema;
 import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+import org.apache.cassandra.sidecar.handlers.InvalidateCacheHandler;
 import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.apache.cassandra.sidecar.modules.multibindings.KeyClassMapKey;
 import org.apache.cassandra.sidecar.modules.multibindings.TableSchemaMapKeys;
@@ -59,6 +63,8 @@ import org.apache.cassandra.sidecar.routes.RouteBuilder;
 import org.apache.cassandra.sidecar.routes.RoutingOrder;
 import org.apache.cassandra.sidecar.routes.VertxRoute;
 import org.apache.cassandra.sidecar.utils.CacheFactory;
+import org.eclipse.microprofile.openapi.annotations.Operation;
+import org.eclipse.microprofile.openapi.annotations.responses.APIResponse;
 
 /**
  * Provides authentication and authorization (role-based) capability
@@ -66,14 +72,14 @@ import org.apache.cassandra.sidecar.utils.CacheFactory;
 public class AuthModule extends AbstractModule
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AuthModule.class);
-    
+
     @ProvidesIntoMap
     @KeyClassMapKey(TableSchemaMapKeys.SystemAuthSchemaKey.class)
     TableSchema systemAuthSchema() // Note that it returns TableSchema, not 
CassandraSystemTableSchema, in order to locate the same mapBinder
     {
         return new SystemAuthSchema();
     }
-    
+
     @ProvidesIntoMap
     @KeyClassMapKey(TableSchemaMapKeys.SidecarRolePermissionsSchemaKey.class)
     TableSchema sidecarRolePermissionSchema(SidecarConfiguration 
sidecarConfiguration)
@@ -157,6 +163,23 @@ public class AuthModule extends AbstractModule
                                                  .handler(chainAuthHandler));
     }
 
+    /**
+     * Contributes the cache invalidation route to the route map under
+     * {@link VertxRouteMapKeys.InvalidateCacheKey}, handled by {@link InvalidateCacheHandler}.
+     *
+     * @param factory                factory used to obtain the route builder
+     * @param invalidateCacheHandler handler attached to the route
+     * @return the built route
+     */
+    @DELETE
+    @Path(ApiEndpointsV1.INVALIDATE_CACHE_ROUTE)
+    @Operation(summary = "Invalidate the specified cache",
+    description = "Invalidates entries in the specified cache. Supports full 
cache invalidation or selective key-based invalidation where applicable. " +
+                  "Valid cache names: identity_to_role_cache, 
role_authorizations_cache, super_user_cache, endpoint_authorization_cache.")
+    @APIResponse(description = "Cache invalidated successfully",
+    responseCode = "200")
+    @ProvidesIntoMap
+    @KeyClassMapKey(VertxRouteMapKeys.InvalidateCacheKey.class)
+    VertxRoute invalidateCacheRouteKey(RouteBuilder.Factory factory,
+                                       InvalidateCacheHandler 
invalidateCacheHandler)
+    {
+        return factory.builderForRoute()
+                      .handler(invalidateCacheHandler)
+                      .build();
+    }
+
     @Provides
     @Singleton
     PermissionFactory permissionFactory()
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
index bc902ad9..99bd450c 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/multibindings/VertxRouteMapKeys.java
@@ -184,6 +184,11 @@ public interface VertxRouteMapKeys
         HttpMethod HTTP_METHOD = HttpMethod.GET;
         String ROUTE_URI = ApiEndpointsV1.RESTORE_JOB_ROUTE;
     }
+    /** Route key pairing HTTP DELETE with {@link ApiEndpointsV1#INVALIDATE_CACHE_ROUTE}. */
+    interface InvalidateCacheKey extends RouteClassKey
+    {
+        HttpMethod HTTP_METHOD = HttpMethod.DELETE;
+        String ROUTE_URI = ApiEndpointsV1.INVALIDATE_CACHE_ROUTE;
+    }
     interface KeyspaceSchemaRouteKey extends RouteClassKey
     {
         HttpMethod HTTP_METHOD = HttpMethod.GET;
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
index 01515324..2174ac57 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
@@ -45,6 +45,8 @@ public class CacheFactory
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CacheFactory.class);
 
+    public static final String ENDPOINT_AUTHORIZATION_CACHE_NAME = 
"endpoint_authorization_cache";
+
     private final Cache<SSTableImporter.ImportOptions, Future<Void>> 
ssTableImportCache;
     private final AsyncCache<AuthorizationCacheKey, Boolean> 
endpointAuthorizationCache;
 
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/acl/RoleAuthorizationsCacheTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/acl/RoleAuthorizationsCacheTest.java
index f0477a5a..c137eb67 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/acl/RoleAuthorizationsCacheTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/acl/RoleAuthorizationsCacheTest.java
@@ -110,7 +110,7 @@ class RoleAuthorizationsCacheTest
                                                                     
sidecarMetrics);
         assertThat(cache.getAll().size()).isZero();
         assertThat(cache.getAuthorizations("test_role1").size()).isEqualTo(2);
-        CacheStats initialCallStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats initialCallStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(initialCallStats.hitCount()).isZero();
         assertThat(initialCallStats.missCount()).isOne();
         assertThat(cache.getAll().size()).isOne();
@@ -123,20 +123,20 @@ class RoleAuthorizationsCacheTest
 
         // New entries fetched during refreshes
         assertThat(cache.getAuthorizations("test_role2").size()).isOne();
-        CacheStats afterRefreshStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats afterRefreshStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(afterRefreshStats.hitCount()).isZero();
         assertThat(afterRefreshStats.missCount()).isOne();
         assertThat(cache.getAll().size()).isOne();
         assertThat(afterRefreshStats.evictionCount()).isOne();
 
         assertThat(cache.getAuthorizations("test_role2").size()).isOne();
-        CacheStats validEntryStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats validEntryStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(validEntryStats.hitCount()).isOne();
         assertThat(validEntryStats.missCount()).isZero();
 
         // check for not existing role
         cache.getAuthorizations("non_existing_role");
-        CacheStats afterMissStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats afterMissStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(afterMissStats.missCount()).isEqualTo(0);
         // It is a hit, since we load entire role_permissions table during 
each refresh
         assertThat(afterMissStats.hitCount()).isOne();
@@ -167,7 +167,7 @@ class RoleAuthorizationsCacheTest
         assertThat(cache.getAuthorizations("test_role1").size()).isEqualTo(1);
         assertThat(cache.getAuthorizations("test_role1").size()).isEqualTo(1);
 
-        CacheStats multipleRetrievalStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats multipleRetrievalStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(multipleRetrievalStats.hitCount()).isEqualTo(4);
         assertThat(multipleRetrievalStats.loadSuccessCount()).isEqualTo(1);
         assertThat(multipleRetrievalStats.loadFailureCount()).isEqualTo(0);
@@ -324,13 +324,13 @@ class RoleAuthorizationsCacheTest
                                                                     
mockSidecarPermissionsAccessor,
                                                                     
sidecarMetrics);
 
-        CacheStats initialStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats initialStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(initialStats.loadCount()).isZero();
         assertThat(initialStats.totalLoadTime()).isZero();
 
         cache.getAuthorizations("test_role1");
 
-        CacheStats afterLoadStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats afterLoadStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(afterLoadStats.loadCount()).isOne();
         assertThat(afterLoadStats.totalLoadTime()).isGreaterThan(0);
 
@@ -354,7 +354,7 @@ class RoleAuthorizationsCacheTest
                                                                     
mockSidecarPermissionsAccessor,
                                                                     
sidecarMetrics);
 
-        CacheStats initialStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats initialStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(initialStats.loadFailureCount()).isZero();
 
         try
@@ -366,7 +366,7 @@ class RoleAuthorizationsCacheTest
             // ignore exception
         }
 
-        CacheStats afterFailureStats = 
sidecarMetrics.server().cache().rolePermissionsCacheMetrics.snapshot();
+        CacheStats afterFailureStats = 
sidecarMetrics.server().cache().roleAuthorizationsCacheMetrics.snapshot();
         assertThat(afterFailureStats.loadFailureCount()).isEqualTo(1);
         assertThat(afterFailureStats.loadCount()).isEqualTo(1);
         assertThat(afterFailureStats.loadSuccessCount()).isEqualTo(0);
@@ -392,7 +392,7 @@ class RoleAuthorizationsCacheTest
                                           mockSidecarPermissionsAccessor,
                                           sidecarMetrics))
         .isInstanceOf(ConfigurationException.class)
-        .hasMessageContaining("role_permissions_cache must be configured with 
either refreshAfterWrite or expireAfterAccess");
+        .hasMessageContaining("role_authorizations_cache must be configured 
with either refreshAfterWrite or expireAfterAccess");
     }
 
     @Test
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandlerTest.java
new file mode 100644
index 00000000..465b447e
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/InvalidateCacheHandlerTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.handlers;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.util.Modules;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.predicate.ResponsePredicate;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.TestModule;
+import org.apache.cassandra.sidecar.acl.AuthCache;
+import org.apache.cassandra.sidecar.acl.IdentityToRoleCache;
+import org.apache.cassandra.sidecar.acl.authorization.AuthorizationCacheKey;
+import org.apache.cassandra.sidecar.acl.authorization.RoleAuthorizationsCache;
+import org.apache.cassandra.sidecar.acl.authorization.SuperUserCache;
+import org.apache.cassandra.sidecar.modules.SidecarModules;
+import org.apache.cassandra.sidecar.server.Server;
+import org.apache.cassandra.sidecar.utils.CacheFactory;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the {@link InvalidateCacheHandler}
+ */
+@ExtendWith(VertxExtension.class)
+public class InvalidateCacheHandlerTest
+{
+    static final Logger LOGGER = 
LoggerFactory.getLogger(InvalidateCacheHandlerTest.class);
+    static final String TEST_ROUTE_TEMPLATE = "/api/v1/caches/%s/invalidate";
+
+    Vertx vertx;
+    Server server;
+    IdentityToRoleCache mockIdentityToRoleCache = 
mock(IdentityToRoleCache.class);
+    RoleAuthorizationsCache mockRoleAuthorizationsCache = 
mock(RoleAuthorizationsCache.class);
+    SuperUserCache mockSuperUserCache = mock(SuperUserCache.class);
+    AsyncCache<AuthorizationCacheKey, Boolean> mockEndpointAuthorizationCache 
= mock(AsyncCache.class);
+    Cache<AuthorizationCacheKey, Boolean> mockSyncCache = mock(Cache.class);
+
+    /**
+     * Builds the injector with the test overrides and starts the server, failing the setup
+     * immediately when the server does not start successfully within the timeout.
+     */
+    @BeforeEach
+    void before() throws InterruptedException
+    {
+        // Stub the AsyncCache.synchronous() to return the sync cache mock
+        when(mockEndpointAuthorizationCache.synchronous()).thenReturn(mockSyncCache);
+
+        Module testOverride = Modules.override(new TestModule())
+                                     .with(new InvalidateCacheTestModule());
+        Injector injector = Guice.createInjector(Modules.override(SidecarModules.all())
+                                                        .with(testOverride));
+        vertx = injector.getInstance(Vertx.class);
+        server = injector.getInstance(Server.class);
+        VertxTestContext context = new VertxTestContext();
+        server.start()
+              .onSuccess(s -> context.completeNow())
+              .onFailure(context::failNow);
+        // awaitCompletion returns false on timeout; unchecked, a server that never came up would
+        // only surface later as unrelated connection errors in every test
+        assertThat(context.awaitCompletion(5, TimeUnit.SECONDS))
+        .as("Sidecar server did not start within 5 seconds")
+        .isTrue();
+        if (context.failed())
+        {
+            throw new IllegalStateException("Sidecar server failed to start", context.causeOfFailure());
+        }
+    }
+
+    /**
+     * Closes the server and waits up to 60 seconds for the close to finish.
+     */
+    @AfterEach
+    void after() throws InterruptedException
+    {
+        if (server == null)
+        {
+            // before() failed prior to obtaining the server; nothing to close, and throwing an
+            // NPE here would obscure the original setup failure
+            return;
+        }
+
+        CountDownLatch closeLatch = new CountDownLatch(1);
+        // onComplete rather than onSuccess: a failed close must release the latch as well,
+        // otherwise teardown blocks for the full timeout
+        server.close().onComplete(res -> {
+            if (res.failed())
+            {
+                LOGGER.error("Server close failed.", res.cause());
+            }
+            closeLatch.countDown();
+        });
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+            LOGGER.info("Close event received before timeout.");
+        else
+            LOGGER.error("Close event timed out.");
+    }
+
+    // IdentityToRoleCache tests
+    @Test
+    void testInvalidateIdentityToRoleCache(VertxTestContext context)
+    {
+        verifyInvalidateCache(context, IdentityToRoleCache.NAME, null, OK, 
mockIdentityToRoleCache, mockRoleAuthorizationsCache, mockSuperUserCache);
+    }
+
+    @Test
+    void testInvalidateIdentityToRoleCacheAlternativeName(VertxTestContext 
context)
+    {
+        verifyInvalidateCache(context, "IdentityToRoleCache", null, OK, 
mockIdentityToRoleCache, mockRoleAuthorizationsCache, mockSuperUserCache);
+    }
+
+    @Test
+    void testInvalidateIdentityToRoleCacheWithKeys(VertxTestContext context)
+    {
+        verifyInvalidateCache(context, IdentityToRoleCache.NAME,
+                              Arrays.asList("key1", "key2"),
+                              OK,
+                              mockIdentityToRoleCache, 
mockRoleAuthorizationsCache, mockSuperUserCache);
+    }
+
+    // RoleAuthorizationsCache tests
+    @Test
+    void testInvalidateRoleAuthorizationsCache(VertxTestContext context)
+    {
+        verifyInvalidateCache(context, RoleAuthorizationsCache.NAME, null, OK, 
mockRoleAuthorizationsCache, mockIdentityToRoleCache, mockSuperUserCache);
+    }
+
+    @Test
+    void testInvalidateRoleAuthorizationsCacheAlternativeName(VertxTestContext 
context)
+    {
+        verifyInvalidateCache(context, "RoleAuthorizationsCache", null, OK, 
mockRoleAuthorizationsCache, mockIdentityToRoleCache, mockSuperUserCache);
+    }
+
+    @Test
+    void testInvalidateRoleAuthorizationsCacheWithKeys(VertxTestContext 
context)
+    {
+        verifyInvalidateCache(context, RoleAuthorizationsCache.NAME,
+                             Arrays.asList("key1"),
+                             BAD_REQUEST,
+                             null, mockIdentityToRoleCache, 
mockRoleAuthorizationsCache, mockSuperUserCache);
+    }
+
+    // SuperUserCache tests
+    @Test
+    void testInvalidateSuperUserCache(VertxTestContext context)
+    {
+        verifyInvalidateCache(context, SuperUserCache.NAME, null, OK, 
mockSuperUserCache, mockIdentityToRoleCache, mockRoleAuthorizationsCache);
+    }
+
+    @Test
+    void testInvalidateSuperUserCacheAlternativeName(VertxTestContext context)
+    {
+        verifyInvalidateCache(context, "SuperUserCache", null, OK, 
mockSuperUserCache, mockIdentityToRoleCache, mockRoleAuthorizationsCache);
+    }
+
+    @Test
+    void testInvalidateSuperUserCacheWithKeys(VertxTestContext context)
+    {
+        verifyInvalidateCache(context, SuperUserCache.NAME,
+                             Arrays.asList("user1", "user2", "user3"),
+                             OK,
+                             mockSuperUserCache, mockIdentityToRoleCache, 
mockRoleAuthorizationsCache);
+    }
+
+    // EndpointAuthorizationCache tests
+    @Test
+    void testInvalidateEndpointAuthorizationCache(VertxTestContext context)
+    {
+        String testRoute = String.format(TEST_ROUTE_TEMPLATE, 
CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME);
+
+        WebClient client = WebClient.create(vertx);
+        client.delete(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  
assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("OK");
+
+                  // Note: We can't easily verify the mock AsyncCache was 
invalidated in unit tests
+                  // because the handler calls .synchronous().invalidateAll() 
internally.
+                  // End-to-end behavior is tested in integration tests.
+                  verifyNoInteractions(mockIdentityToRoleCache);
+                  verifyNoInteractions(mockRoleAuthorizationsCache);
+                  verifyNoInteractions(mockSuperUserCache);
+
+                  context.completeNow();
+              }));
+    }
+
+    @Test
+    void 
testInvalidateEndpointAuthorizationCacheAlternativeName(VertxTestContext 
context)
+    {
+        String testRoute = String.format(TEST_ROUTE_TEMPLATE, 
"EndpointAuthorizationCache");
+
+        WebClient client = WebClient.create(vertx);
+        client.delete(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(response -> {
+                  assertThat(response.statusCode()).isEqualTo(OK.code());
+                  
assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("OK");
+
+                  // Verify alternative name works (case-insensitive)
+                  verifyNoInteractions(mockIdentityToRoleCache);
+                  verifyNoInteractions(mockRoleAuthorizationsCache);
+                  verifyNoInteractions(mockSuperUserCache);
+
+                  context.completeNow();
+              }));
+    }
+
+    @Test // supplying keys for the endpoint authorization cache is expected to be answered with BAD_REQUEST
+    void testInvalidateEndpointAuthorizationCacheWithKeys(VertxTestContext 
context)
+    {
+        verifyInvalidateCache(context, 
CacheFactory.ENDPOINT_AUTHORIZATION_CACHE_NAME,
+                             Arrays.asList("key1"),
+                             BAD_REQUEST, // error expected, so no target cache is passed (null) and all three mocks must stay untouched
+                             null, mockIdentityToRoleCache, 
mockRoleAuthorizationsCache, mockSuperUserCache);
+    }
+
+    // Error case tests
+    @Test
+    void testInvalidateUnknownCache(VertxTestContext context)
+    {
+        String testRoute = String.format(TEST_ROUTE_TEMPLATE, "unknown_cache");
+
+        WebClient client = WebClient.create(vertx);
+        client.delete(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_NOT_FOUND)
+              .send(context.succeeding(response -> {
+                  
assertThat(response.statusCode()).isEqualTo(NOT_FOUND.code());
+                  LOGGER.info("Unknown Cache Response: {}", 
response.bodyAsString());
+
+                  // A request naming an unrecognized cache must not cause any interaction with the three mocked auth caches
+                  verifyNoInteractions(mockIdentityToRoleCache);
+                  verifyNoInteractions(mockRoleAuthorizationsCache);
+                  verifyNoInteractions(mockSuperUserCache);
+
+                  context.completeNow();
+              }));
+    }
+
+    /**
+     * Helper method to test cache invalidation.
+     * Sends a DELETE request for {@code cacheName} and verifies the HTTP response
+     * as well as the interactions with the mocked caches.
+     *
+     * <p>The route is built from {@code TEST_ROUTE_TEMPLATE}; every entry of {@code keys}
+     * is appended verbatim (no URL encoding) as a {@code keys=} query parameter.
+     * When {@code expectedStatus} is OK, the response body must report status "OK" and
+     * {@code cacheToInvalidate} must have received {@code invalidateAll()} (no keys given)
+     * or {@code invalidateAll(keys)}. For every expected status, none of
+     * {@code cachesToNotInteract} may have been interacted with.
+     *
+     * @param context the test context, completed after the verifications
+     * @param cacheName the name of the cache to invalidate (sent in the HTTP request)
+     * @param keys the specific keys to invalidate; null or empty to invalidate all keys
+     * @param expectedStatus the expected HTTP response status; the response predicate only
+     *                       distinguishes OK and NOT_FOUND, any other value is matched
+     *                       against SC_BAD_REQUEST
+     * @param cacheToInvalidate the mock cache that should be invalidated; only verified
+     *                          when {@code expectedStatus} is OK (pass null if expecting an error)
+     * @param cachesToNotInteract other mock caches that should not be touched
+     */
+    @SuppressWarnings("rawtypes")
+    private void verifyInvalidateCache(VertxTestContext context, String 
cacheName, List<String> keys,
+                                       HttpResponseStatus expectedStatus,
+                                       AuthCache cacheToInvalidate, 
AuthCache... cachesToNotInteract)
+    {
+        String testRoute = String.format(TEST_ROUTE_TEMPLATE, cacheName);
+
+        // Append one "keys=" query parameter per key; null or empty keys leave the route unchanged
+        if (keys != null && !keys.isEmpty())
+        {
+            StringBuilder queryParams = new StringBuilder("?");
+            for (int i = 0; i < keys.size(); i++)
+            {
+                if (i > 0) queryParams.append("&");
+                queryParams.append("keys=").append(keys.get(i)); // appended verbatim, no URL encoding
+            }
+            testRoute = testRoute + queryParams;
+        }
+
+        WebClient client = WebClient.create(vertx);
+        ResponsePredicate predicate = expectedStatus == OK
+                                      ? ResponsePredicate.SC_OK
+                                      : (expectedStatus == NOT_FOUND
+                                         ? ResponsePredicate.SC_NOT_FOUND
+                                         : ResponsePredicate.SC_BAD_REQUEST); // fallback for every status other than OK / NOT_FOUND
+
+        client.delete(server.actualPort(), "127.0.0.1", testRoute)
+              .expect(predicate)
+              .send(context.succeeding(response -> {
+                  
assertThat(response.statusCode()).isEqualTo(expectedStatus.code());
+
+                  if (expectedStatus == OK)
+                  {
+                      
assertThat(response.bodyAsJsonObject().getString("status")).isEqualTo("OK");
+
+                      // Without keys the whole cache must have been invalidated; otherwise exactly the requested keys
+                      if (keys == null || keys.isEmpty())
+                      {
+                          verify(cacheToInvalidate).invalidateAll();
+                      }
+                      else
+                      {
+                          verify(cacheToInvalidate).invalidateAll(keys);
+                      }
+                  }
+
+                  // Checked for every expected status, including error responses: the other caches must be untouched
+                  for (AuthCache cache : cachesToNotInteract)
+                  {
+                      verifyNoInteractions(cache);
+                  }
+
+                  context.completeNow();
+              }));
+    }
+
+    /**
+     * Test Guice module for InvalidateCache handler tests.
+     *
+     * <p>Provides, as singletons, the mocks referenced by the tests:
+     * {@code mockIdentityToRoleCache}, {@code mockRoleAuthorizationsCache} and
+     * {@code mockSuperUserCache}, plus a mocked {@code CacheFactory} whose
+     * {@code endpointAuthorizationCache()} is stubbed to return
+     * {@code mockEndpointAuthorizationCache}.
+     *
+     * <p>NOTE(review): the mock fields are not declared in this class; presumably they
+     * belong to the enclosing test class, which would be why this is a non-static
+     * inner class - confirm against the enclosing class.
+     */
+    class InvalidateCacheTestModule extends AbstractModule
+    {
+        @Provides
+        @Singleton
+        public IdentityToRoleCache identityToRoleCache()
+        {
+            return mockIdentityToRoleCache;
+        }
+
+        @Provides
+        @Singleton
+        public RoleAuthorizationsCache roleAuthorizationsCache()
+        {
+            return mockRoleAuthorizationsCache;
+        }
+
+        @Provides
+        @Singleton
+        public SuperUserCache superUserCache()
+        {
+            return mockSuperUserCache;
+        }
+
+        @Provides
+        @Singleton
+        public CacheFactory cacheFactory()
+        {
+            CacheFactory mockCacheFactory = mock(CacheFactory.class);
+            
when(mockCacheFactory.endpointAuthorizationCache()).thenReturn(mockEndpointAuthorizationCache);
+            return mockCacheFactory;
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to