This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb68141861 CASSANDRA-19669: Audit Log entries are missing identity for mTLS connections
bb68141861 is described below

commit bb68141861e77623f0d0b13f72846651a71c1017
Author: Francisco Guerrero <frank.guerr...@gmail.com>
AuthorDate: Wed May 29 13:38:02 2024 -0700

    CASSANDRA-19669: Audit Log entries are missing identity for mTLS connections
    
    Patch by Francisco Guerrero; Reviewed by Bernardo Botella, Andrew Tolbert, Dinesh Joshi for CASSANDRA-19669
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/audit/AuditLogEntry.java  |  74 +++--
 .../apache/cassandra/audit/AuditLogManager.java    |   4 +-
 .../cassandra/auth/MutualTlsAuthenticator.java     |   2 +-
 .../apache/cassandra/config/EncryptionOptions.java |   1 +
 .../apache/cassandra/transport/PreV5Handlers.java  |  42 ++-
 .../cassandra/distributed/impl/Instance.java       |   3 +
 .../test/auth/AuthAuditLoggingTest.java            | 344 +++++++++++++++++++++
 .../MutualTlsCertificateValidityPeriodTest.java    | 128 +-------
 .../cassandra/audit/InMemoryAuditLogger.java       |   8 +
 .../apache/cassandra/transport/TlsTestUtils.java   | 110 +++++++
 .../cassandra/utils/tls/CertificateBuilder.java    |   6 +
 12 files changed, 577 insertions(+), 146 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6e2493b192..69289806fa 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Audit Log entries are missing identity for mTLS connections (CASSANDRA-19669)
  * Add support for the BETWEEN operator in WHERE clauses (CASSANDRA-19604)
  * Replace Stream iteration with for-loop for SimpleRestriction::bindAndGetClusteringElements (CASSANDRA-19679)
  * Consolidate logging on trace level (CASSANDRA-19632)
diff --git a/src/java/org/apache/cassandra/audit/AuditLogEntry.java 
b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
index 02db076734..3fe53816b3 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogEntry.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogEntry.java
@@ -19,9 +19,11 @@ package org.apache.cassandra.audit;
 
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.UUID;
 import javax.annotation.Nullable;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.auth.AuthenticatedUser;
@@ -47,39 +49,37 @@ public class AuditLogEntry
     private final String operation;
     private final QueryOptions options;
     private final QueryState state;
+    private final Map<String, Object> metadata;
 
-    private AuditLogEntry(AuditLogEntryType type,
-                          InetAddressAndPort source,
-                          String user,
-                          long timestamp,
-                          UUID batch,
-                          String keyspace,
-                          String scope,
-                          String operation,
-                          QueryOptions options,
-                          QueryState state)
+    private AuditLogEntry(Builder builder)
     {
-        this.type = type;
-        this.source = source;
-        this.user = user;
-        this.timestamp = timestamp;
-        this.batch = batch;
-        this.keyspace = keyspace;
-        this.scope = scope;
-        this.operation = operation;
-        this.options = options;
-        this.state = state;
+        this.type = builder.type;
+        this.source = builder.source;
+        this.user = builder.user;
+        this.timestamp = builder.timestamp;
+        this.batch = builder.batch;
+        this.keyspace = builder.keyspace;
+        this.scope = builder.scope;
+        this.operation = builder.operation;
+        this.options = builder.options;
+        this.state = builder.state;
+        this.metadata = builder.metadata;
     }
 
-    String getLogString()
+    @VisibleForTesting
+    public String getLogString()
     {
         StringBuilder builder = new StringBuilder(100);
         builder.append("user:").append(user)
-               .append("|host:").append(host)
-               .append("|source:").append(source.getAddress());
-        if (source.getPort() > 0)
+               .append("|host:").append(host);
+
+        if (source != null)
         {
-            builder.append("|port:").append(source.getPort());
+            builder.append("|source:").append(source.getAddress());
+            if (source.getPort() > 0)
+            {
+                builder.append("|port:").append(source.getPort());
+            }
         }
 
         builder.append("|timestamp:").append(timestamp)
@@ -102,6 +102,10 @@ public class AuditLogEntry
         {
             builder.append("|operation:").append(operation);
         }
+        if (metadata != null && !metadata.isEmpty())
+        {
+            metadata.forEach((key, value) -> 
builder.append('|').append(key).append(':').append(value));
+        }
         return builder.toString();
     }
 
@@ -189,6 +193,7 @@ public class AuditLogEntry
         private String operation;
         private QueryOptions options;
         private QueryState state;
+        private Map<String, Object> metadata;
 
         public Builder(QueryState queryState)
         {
@@ -204,9 +209,15 @@ public class AuditLogEntry
                     source = 
InetAddressAndPort.getByAddressOverrideDefaults(addr.getAddress(), 
addr.getPort());
                 }
 
-                if (clientState.getUser() != null)
+                AuthenticatedUser authenticatedUser = clientState.getUser();
+                if (authenticatedUser != null)
                 {
-                    user = clientState.getUser().getName();
+                    user = authenticatedUser.getName();
+
+                    if (authenticatedUser.getMetadata() != null)
+                    {
+                        metadata = Map.copyOf(authenticatedUser.getMetadata());
+                    }
                 }
                 keyspace = clientState.getRawKeyspace();
             }
@@ -231,6 +242,7 @@ public class AuditLogEntry
             operation = entry.operation;
             options = entry.options;
             state = entry.state;
+            metadata = entry.metadata != null ? Map.copyOf(entry.metadata) : 
null;
         }
 
         public Builder setType(AuditLogEntryType type)
@@ -312,10 +324,16 @@ public class AuditLogEntry
             return this;
         }
 
+        public Builder setMetadata(Map<String, Object> metadata)
+        {
+            this.metadata = metadata != null ? Map.copyOf(metadata) : null;
+            return this;
+        }
+
         public AuditLogEntry build()
         {
             timestamp = timestamp > 0 ? timestamp : currentTimeMillis();
-            return new AuditLogEntry(type, source, user, timestamp, batch, 
keyspace, scope, operation, options, state);
+            return new AuditLogEntry(this);
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/audit/AuditLogManager.java 
b/src/java/org/apache/cassandra/audit/AuditLogManager.java
index ed104602d6..748988aed6 100644
--- a/src/java/org/apache/cassandra/audit/AuditLogManager.java
+++ b/src/java/org/apache/cassandra/audit/AuditLogManager.java
@@ -23,13 +23,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
-
 import javax.annotation.Nullable;
 import javax.management.openmbean.CompositeData;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,7 +77,7 @@ public class AuditLogManager implements QueryEvents.Listener, 
AuthEvents.Listene
         }
         else
         {
-            logger.debug("Audit logging is disabled.");
+            logger.info("Audit logging is disabled.");
             auditLogger = new NoOpAuditLogger(Collections.emptyMap());
         }
 
diff --git a/src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java 
b/src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java
index 9044f6f8bb..14d6ef72c6 100644
--- a/src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/MutualTlsAuthenticator.java
@@ -239,7 +239,7 @@ public class MutualTlsAuthenticator implements 
IAuthenticator
             // Report metrics on client certificate expiration
             
MutualTlsMetrics.instance.clientCertificateExpirationDays.update(daysToCertificateExpiration);
 
-            return new AuthenticatedUser(role, MTLS, 
Collections.singletonMap(METADATA_IDENTITY_KEY, identity));
+            return new AuthenticatedUser(role, MTLS, 
Map.of(METADATA_IDENTITY_KEY, identity));
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/config/EncryptionOptions.java 
b/src/java/org/apache/cassandra/config/EncryptionOptions.java
index 6cbdfb3ba3..6471eea9e3 100644
--- a/src/java/org/apache/cassandra/config/EncryptionOptions.java
+++ b/src/java/org/apache/cassandra/config/EncryptionOptions.java
@@ -332,6 +332,7 @@ public class EncryptionOptions
         putSslContextFactoryParameter(sslContextFactoryParameters, 
ConfigKey.ENABLED, this.enabled);
         putSslContextFactoryParameter(sslContextFactoryParameters, 
ConfigKey.OPTIONAL, this.optional);
         putSslContextFactoryParameter(sslContextFactoryParameters, 
ConfigKey.MAX_CERTIFICATE_VALIDITY_PERIOD, 
this.max_certificate_validity_period);
+        putSslContextFactoryParameter(sslContextFactoryParameters, 
ConfigKey.CERTIFICATE_VALIDITY_WARN_THRESHOLD, 
this.certificate_validity_warn_threshold);
     }
 
     private void initializeSslContextFactory()
diff --git a/src/java/org/apache/cassandra/transport/PreV5Handlers.java 
b/src/java/org/apache/cassandra/transport/PreV5Handlers.java
index 38eaaf71f3..fd39edabc1 100644
--- a/src/java/org/apache/cassandra/transport/PreV5Handlers.java
+++ b/src/java/org/apache/cassandra/transport/PreV5Handlers.java
@@ -18,9 +18,13 @@
 
 package org.apache.cassandra.transport;
 
+import java.net.SocketAddress;
+import java.security.cert.CertificateException;
 import java.util.List;
+import javax.net.ssl.SSLException;
 
 import com.google.common.base.Predicate;
+import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,10 +38,14 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.MessageToMessageDecoder;
 import io.netty.handler.codec.MessageToMessageEncoder;
+import org.apache.cassandra.auth.AuthEvents;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.net.ResourceLimits;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.transport.ClientResourceLimits.Overload;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -334,13 +342,22 @@ public class PreV5Handlers
                 if (isFatal(cause))
                     future.addListener((ChannelFutureListener) f -> 
ctx.close());
             }
-            
-            if 
(DatabaseDescriptor.getClientErrorReportingExclusions().contains(ctx.channel().remoteAddress()))
+
+            SocketAddress remoteAddress = ctx.channel().remoteAddress();
+            AuthenticationException authenticationException = 
maybeExtractAndWrapAuthenticationException(cause);
+            if (authenticationException != null)
+            {
+                QueryState queryState = new 
QueryState(ClientState.forExternalCalls(remoteAddress));
+                AuthEvents.instance.notifyAuthFailure(queryState, 
authenticationException);
+            }
+
+            if (remoteAddress != null && 
DatabaseDescriptor.getClientErrorReportingExclusions().contains(remoteAddress))
             {
                 // Sometimes it is desirable to ignore exceptions from 
specific IPs; such as when security scans are
                 // running.  To avoid polluting logs and metrics, metrics are 
not updated when the IP is in the exclude
                 // list.
-                logger.debug("Excluding client exception for {}; address 
contained in client_error_reporting_exclusions", ctx.channel().remoteAddress(), 
cause);
+                logger.debug("Excluding client exception for {}; address 
contained in client_error_reporting_exclusions",
+                             remoteAddress, cause);
                 return;
             }
             ExceptionHandlers.logClientNetworkingExceptions(cause);
@@ -351,6 +368,25 @@ public class PreV5Handlers
         {
             return cause instanceof ProtocolException; // this matches 
previous versions which didn't annotate exceptions as fatal or not
         }
+
+        private static AuthenticationException 
maybeExtractAndWrapAuthenticationException(Throwable cause)
+        {
+            CertificateException certificateException = 
ExceptionUtils.throwableOfType(cause, CertificateException.class);
+
+            if (certificateException != null)
+            {
+                return new 
AuthenticationException(certificateException.getMessage(), cause);
+            }
+
+            SSLException sslException = ExceptionUtils.throwableOfType(cause, 
SSLException.class);
+
+            if (sslException != null)
+            {
+                return new AuthenticationException(sslException.getMessage(), 
cause);
+            }
+
+            return null;
+        }
     }
 
     private static ProtocolVersion getConnectionVersion(ChannelHandlerContext 
ctx)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index ed0497180d..296f95b9b4 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -52,6 +52,7 @@ import org.slf4j.LoggerFactory;
 
 import io.netty.util.concurrent.GlobalEventExecutor;
 import org.apache.cassandra.Util;
+import org.apache.cassandra.audit.AuditLogManager;
 import org.apache.cassandra.auth.AuthCache;
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
@@ -826,6 +827,8 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
         CassandraDaemon.getInstanceForTesting().completeSetup();
         CassandraDaemon.enableAutoCompaction(Schema.instance.getKeyspaces());
 
+        AuditLogManager.instance.initialize();
+
         if (config.has(NATIVE_PROTOCOL))
         {
             
CassandraDaemon.getInstanceForTesting().initializeClientTransports();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/auth/AuthAuditLoggingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/auth/AuthAuditLoggingTest.java
new file mode 100644
index 0000000000..6c6ed6ffe1
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/auth/AuthAuditLoggingTest.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.auth;
+
+import java.net.InetAddress;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.Collections;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.audit.AuditLogEntry;
+import org.apache.cassandra.audit.AuditLogManager;
+import org.apache.cassandra.audit.InMemoryAuditLogger;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.JavaDriverUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.utils.tls.CertificateBuilder;
+import org.apache.cassandra.utils.tls.CertificateBundle;
+
+import static org.apache.cassandra.audit.AuditLogEntryType.LOGIN_ERROR;
+import static org.apache.cassandra.audit.AuditLogEntryType.LOGIN_SUCCESS;
+import static 
org.apache.cassandra.auth.CassandraRoleManager.DEFAULT_SUPERUSER_NAME;
+import static 
org.apache.cassandra.auth.CassandraRoleManager.DEFAULT_SUPERUSER_PASSWORD;
+import static 
org.apache.cassandra.transport.TlsTestUtils.SERVER_KEYSTORE_PASSWORD;
+import static 
org.apache.cassandra.transport.TlsTestUtils.SERVER_TRUSTSTORE_PASSWORD;
+import static org.apache.cassandra.transport.TlsTestUtils.configureIdentity;
+import static 
org.apache.cassandra.transport.TlsTestUtils.generateClientCertificate;
+import static 
org.apache.cassandra.transport.TlsTestUtils.generateSelfSignedCertificate;
+import static org.apache.cassandra.transport.TlsTestUtils.getSSLOptions;
+import static 
org.apache.cassandra.transport.TlsTestUtils.withAuthenticatedSession;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+/**
+ * Tests authentication audit logging events
+ */
+public class AuthAuditLoggingTest extends TestBaseImpl
+{
+    public static final String NON_SPIFFE_IDENTITY = 
"nonspiffe://test.cassandra.apache.org/dTest/mtls";
+    public static final String NON_MAPPED_IDENTITY = 
"spiffe://test.cassandra.apache.org/dTest/notMapped";
+    private static ICluster<IInvokableInstance> CLUSTER;
+
+    @ClassRule
+    public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+    static CertificateBundle CA;
+    static Path truststorePath;
+    static SSLOptions sslOptions;
+
+    @BeforeClass
+    public static void setupClass() throws Exception
+    {
+        Cluster.Builder builder = 
Cluster.build(1).withDynamicPortAllocation(true);
+
+        CA = new CertificateBuilder().subject("CN=Apache Cassandra Root CA, 
OU=Certification Authority, O=Unknown, C=Unknown")
+                                     .alias("fakerootca")
+                                     .isCertificateAuthority(true)
+                                     .buildSelfSigned();
+
+        truststorePath = CA.toTempKeyStorePath(tempFolder.getRoot().toPath(),
+                                               
SERVER_TRUSTSTORE_PASSWORD.toCharArray(),
+                                               
SERVER_TRUSTSTORE_PASSWORD.toCharArray());
+
+
+        CertificateBundle keystore = new 
CertificateBuilder().subject("CN=Apache Cassandra, OU=ssl_test, O=Unknown, 
L=Unknown, ST=Unknown, C=Unknown")
+                                                             
.addSanDnsName(InetAddress.getLocalHost().getCanonicalHostName())
+                                                             
.addSanDnsName(InetAddress.getLocalHost().getHostName())
+                                                             
.buildIssuedBy(CA);
+
+        Path serverKeystorePath = 
keystore.toTempKeyStorePath(tempFolder.getRoot().toPath(),
+                                                              
SERVER_KEYSTORE_PASSWORD.toCharArray(),
+                                                              
SERVER_KEYSTORE_PASSWORD.toCharArray());
+
+        builder.withConfig(c -> c.set("authenticator.class_name", 
"org.apache.cassandra.auth.MutualTlsWithPasswordFallbackAuthenticator")
+                                 .set("authenticator.parameters", 
Collections.singletonMap("validator_class_name", 
"org.apache.cassandra.auth.SpiffeCertificateValidator"))
+                                 .set("role_manager", "CassandraRoleManager")
+                                 .set("authorizer", "CassandraAuthorizer")
+                                 .set("client_encryption_options.enabled", 
"true")
+                                 
.set("client_encryption_options.require_client_auth", "optional")
+                                 .set("client_encryption_options.keystore", 
serverKeystorePath.toString())
+                                 
.set("client_encryption_options.keystore_password", SERVER_KEYSTORE_PASSWORD)
+                                 .set("client_encryption_options.truststore", 
truststorePath.toString())
+                                 
.set("client_encryption_options.truststore_password", 
SERVER_TRUSTSTORE_PASSWORD)
+                                 
.set("client_encryption_options.require_endpoint_verification", "false")
+                                 .set("audit_logging_options.enabled", "true")
+                                 
.set("audit_logging_options.logger.class_name", "InMemoryAuditLogger")
+                                 
.set("audit_logging_options.included_categories", "AUTH")
+                                 .with(Feature.NATIVE_PROTOCOL, 
Feature.GOSSIP, Feature.NETWORK));
+        CLUSTER = builder.start();
+
+        sslOptions = getSSLOptions(null, truststorePath);
+        configureIdentity(CLUSTER, sslOptions);
+    }
+
+    @AfterClass
+    public static void teardown() throws Exception
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+
+    @Before
+    public void beforeEach()
+    {
+        // drain the audit log entries, so we can start fresh for each test
+        CLUSTER.get(1).runOnInstance(() -> ((InMemoryAuditLogger) 
AuditLogManager.instance.getLogger()).internalQueue().clear());
+        maybeRestoreMutualTlsWithPasswordFallbackAuthenticator();
+    }
+
+    @Test
+    public void testPasswordAuthenticationSuccessfulAuth()
+    {
+        CharSequence expectedLogStringRegex = 
"^user:cassandra\\|host:.*/127.0.0.1:\\d+\\|source:/127.0.0.1" +
+                                              
"\\|port:\\d+\\|timestamp:\\d+\\|type:LOGIN_SUCCESS\\|category:AUTH" +
+                                              "\\|operation:LOGIN SUCCESSFUL$";
+
+        withAuthenticatedSession(CLUSTER.get(1), DEFAULT_SUPERUSER_NAME, 
DEFAULT_SUPERUSER_PASSWORD, session -> {
+            session.execute("DESCRIBE KEYSPACES");
+
+            CLUSTER.get(1).runOnInstance(() -> {
+                // We should have events recorded for the control connection 
and the session connection
+                AuditLogEntry entry1 = ((InMemoryAuditLogger) 
AuditLogManager.instance.getLogger()).internalQueue().poll();
+                assertThat(entry1).isNotNull();
+                
assertThat(entry1.getHost().toString(false)).matches(".*/127.0.0.1");
+                
assertThat(entry1.getSource().toString(false)).isEqualTo("/127.0.0.1");
+                assertThat(entry1.getUser()).isEqualTo("cassandra");
+                assertThat(entry1.getType()).isEqualTo(LOGIN_SUCCESS);
+                
assertThat(entry1.getLogString()).matches(expectedLogStringRegex);
+                AuditLogEntry entry2 = ((InMemoryAuditLogger) 
AuditLogManager.instance.getLogger()).internalQueue().poll();
+                assertThat(entry2).isNotNull();
+                
assertThat(entry2.getHost().toString(false)).matches(".*/127.0.0.1");
+                
assertThat(entry2.getSource().toString(false)).isEqualTo("/127.0.0.1");
+                assertThat(entry2.getUser()).isEqualTo("cassandra");
+                assertThat(entry2.getType()).isEqualTo(LOGIN_SUCCESS);
+                
assertThat(entry2.getLogString()).matches(expectedLogStringRegex);
+            });
+        }, sslOptions);
+    }
+
+    @Test
+    public void testPasswordAuthenticationFailedAuth()
+    {
+        CharSequence expectedLogStringRegex = 
"^user:null\\|host:/127.0.0.1:\\d+\\|source:/127.0.0.1" +
+                                              
"\\|port:\\d+\\|timestamp:\\d+\\|type:LOGIN_ERROR\\|category:AUTH" +
+                                              "\\|operation:LOGIN FAILURE; 
Provided username cassandra and/or .*$";
+        try
+        {
+            withAuthenticatedSession(CLUSTER.get(1), DEFAULT_SUPERUSER_NAME, 
"bad password", session -> {
+            }, sslOptions);
+            fail("Authentication should fail with a bad password");
+        }
+        catch (com.datastax.driver.core.exceptions.AuthenticationException 
authenticationException)
+        {
+            CLUSTER.get(1).runOnInstance(() -> {
+                Queue<AuditLogEntry> auditLogEntries = ((InMemoryAuditLogger) 
AuditLogManager.instance.getLogger()).internalQueue();
+                AuditLogEntry entry = auditLogEntries.poll();
+                assertThat(entry).isNotNull();
+                
assertThat(entry.getHost().toString(false)).isEqualTo("/127.0.0.1");
+                
assertThat(entry.getSource().toString(false)).isEqualTo("/127.0.0.1");
+                assertThat(entry.getUser()).isNull();
+                assertThat(entry.getType()).isEqualTo(LOGIN_ERROR);
+                
assertThat(entry.getLogString()).matches(expectedLogStringRegex);
+            });
+        }
+    }
+
+    @Test
+    public void testMutualTlsAuthenticationSuccessfulAuth() throws Exception
+    {
+        Path clientKeystorePath = generateClientCertificate(null, 
tempFolder.getRoot(), CA);
+        CharSequence expectedLogStringRegex = 
"^user:cassandra_ssl_test\\|host:.*/127.0.0.1:\\d+\\|source:/127.0.0.1" +
+                                              
"\\|port:\\d+\\|timestamp:\\d+\\|type:LOGIN_SUCCESS\\|category:AUTH" +
+                                              "\\|operation:LOGIN 
SUCCESSFUL\\|identity:spiffe://test.cassandra.apache.org/unitTest/mtls$";
+
+        try (com.datastax.driver.core.Cluster c = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath, truststorePath)));
+             Session session = c.connect())
+        {
+            session.execute("DESCRIBE KEYSPACES");
+
+            CLUSTER.get(1).runOnInstance(() -> {
+                // We should have events recorded for the control connection 
and the session connection
+                AuditLogEntry entry1 = ((InMemoryAuditLogger) 
AuditLogManager.instance.getLogger()).internalQueue().poll();
+                assertThat(entry1).isNotNull();
+                
assertThat(entry1.getHost().toString(false)).matches(".*/127.0.0.1");
+                
assertThat(entry1.getSource().toString(false)).isEqualTo("/127.0.0.1");
+                assertThat(entry1.getUser()).isEqualTo("cassandra_ssl_test");
+                assertThat(entry1.getType()).isEqualTo(LOGIN_SUCCESS);
+                
assertThat(entry1.getLogString()).matches(expectedLogStringRegex);
+                AuditLogEntry entry2 = ((InMemoryAuditLogger) 
AuditLogManager.instance.getLogger()).internalQueue().poll();
+                assertThat(entry2).isNotNull();
+                
assertThat(entry2.getHost().toString(false)).matches(".*/127.0.0.1");
+                
assertThat(entry2.getSource().toString(false)).isEqualTo("/127.0.0.1");
+                assertThat(entry2.getUser()).isEqualTo("cassandra_ssl_test");
+                assertThat(entry2.getType()).isEqualTo(LOGIN_SUCCESS);
+                
assertThat(entry2.getLogString()).matches(expectedLogStringRegex);
+            });
+        }
+    }
+
+    @Test
+    public void testMutualTlsAuthenticationFailedWithUntrustedCertificate() 
throws Exception
+    {
+        configureMutualTlsAuthenticator();
+        // optionally match source/port because in MacOS source/port are null
+        CharSequence expectedLogStringRegex = 
"^user:null\\|host:.*/127.0.0.1:\\d+(\\|source:/127.0.0.1\\|port:\\d+)?" +
+                                              
"\\|timestamp:\\d+\\|type:LOGIN_ERROR\\|category:AUTH" +
+                                              "\\|operation:LOGIN FAILURE; 
Empty client certificate chain.*$";
+        Path untrustedCertPath = generateSelfSignedCertificate(null, 
tempFolder.getRoot());
+
+        testMtlsAuthenticationFailure(untrustedCertPath, "Authentication 
should fail with a self-signed certificate", expectedLogStringRegex);
+    }
+
+    @Test
+    public void testMutualTlsAuthenticationFailedWithExpiredCertificate() 
throws Exception
+    {
+        // optionally match source/port because in MacOS source/port are null
+        CharSequence expectedLogStringRegex = 
"^user:null\\|host:.*/127.0.0.1:\\d+(\\|source:/127.0.0.1\\|port:\\d+)?" +
+                                              
"\\|timestamp:\\d+\\|type:LOGIN_ERROR\\|category:AUTH" +
+                                              "\\|operation:LOGIN FAILURE; 
PKIX path validation failed.*$";
+
+        Path expiredCertPath = generateClientCertificate(b -> 
b.notBefore(Instant.now().minus(30, ChronoUnit.DAYS))
+                                                               
.notAfter(Instant.now().minus(10, ChronoUnit.DAYS)), tempFolder.getRoot(), CA);
+
+        testMtlsAuthenticationFailure(expiredCertPath, "Authentication should 
fail with an expired certificate", expectedLogStringRegex);
+    }
+
+    @Test
+    public void 
testMutualTlsAuthenticationFailedWithInvalidSpiffeCertificate() throws Exception
+    {
+        CharSequence expectedLogStringRegex = 
"^user:null\\|host:.*/127.0.0.1:\\d+\\|source:/127.0.0.1" +
+                                              
"\\|port:\\d+\\|timestamp:\\d+\\|type:LOGIN_ERROR\\|category:AUTH" +
+                                              "\\|operation:LOGIN FAILURE; 
Unable to extract Spiffe from the certificate.*$";
+
+        Path invalidSpiffeCertPath = generateClientCertificate(b -> 
b.clearSubjectAlternativeNames()
+                                                                     
.addSanUriName(NON_SPIFFE_IDENTITY), tempFolder.getRoot(), CA);
+
+        testMtlsAuthenticationFailure(invalidSpiffeCertPath, "Authentication 
should fail with an invalid spiffe certificate", expectedLogStringRegex);
+    }
+
+    @Test
+    public void 
testMutualTlsAuthenticationFailedWithIdentityThatDoesNotMapToARole() throws 
Exception
+    {
+        CharSequence expectedLogStringRegex = 
"^user:null\\|host:.*/127.0.0.1:\\d+\\|source:/127.0.0.1" +
+                                              
"\\|port:\\d+\\|timestamp:\\d+\\|type:LOGIN_ERROR\\|category:AUTH" +
+                                              "\\|operation:LOGIN FAILURE; 
Certificate identity 'spiffe://test.cassandra.apache.org/dTest/notMapped' not 
authorized.*$";
+
+        Path unmappedIdentityCertPath = generateClientCertificate(b -> 
b.clearSubjectAlternativeNames()
+                                                                        
.addSanUriName(NON_MAPPED_IDENTITY), tempFolder.getRoot(), CA);
+
+        testMtlsAuthenticationFailure(unmappedIdentityCertPath, 
"Authentication should fail with a certificate that doesn't map to a role", 
expectedLogStringRegex);
+    }
+
+    static void testMtlsAuthenticationFailure(Path clientKeystorePath, String 
failureMessage, CharSequence expectedLogStringRegex)
+    {
+        try (com.datastax.driver.core.Cluster c = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath, truststorePath)));
+             Session ignored = c.connect())
+        {
+            fail(failureMessage);
+        }
+        catch (com.datastax.driver.core.exceptions.NoHostAvailableException 
exception)
+        {
+            CLUSTER.get(1).runOnInstance(() -> {
+                // We should have events recorded for the control connection 
and the session connection
+                Queue<AuditLogEntry> auditLogEntries = ((InMemoryAuditLogger) 
AuditLogManager.instance.getLogger()).internalQueue();
+                AuditLogEntry entry = maybeGetAuditLogEntry(auditLogEntries);
+                assertThat(entry).isNotNull();
+                
assertThat(entry.getHost().toString(false)).matches(".*/127.0.0.1");
+                assertThat(entry.getUser()).isNull();
+                assertThat(entry.getType()).isEqualTo(LOGIN_ERROR);
+                
assertThat(entry.getLogString()).matches(expectedLogStringRegex);
+            });
+        }
+    }
+
+    static void configureMutualTlsAuthenticator()
+    {
+        IInvokableInstance instance = CLUSTER.get(1);
+        ClusterUtils.stopUnchecked(instance);
+        instance.config().set("authenticator.class_name", 
"org.apache.cassandra.auth.MutualTlsAuthenticator");
+        instance.config().set("client_encryption_options.require_client_auth", 
"required");
+        instance.startup();
+    }
+
+    static void maybeRestoreMutualTlsWithPasswordFallbackAuthenticator()
+    {
+        IInvokableInstance instance = CLUSTER.get(1);
+
+        if 
("org.apache.cassandra.auth.MutualTlsWithPasswordFallbackAuthenticator".equals(instance.config().getString("authenticator.class_name")))
+        {
+            return;
+        }
+
+        ClusterUtils.stopUnchecked(instance);
+        instance.config().set("authenticator.class_name", 
"org.apache.cassandra.auth.MutualTlsWithPasswordFallbackAuthenticator");
+        instance.config().set("client_encryption_options.require_client_auth", 
"optional");
+        instance.startup();
+    }
+
+    static AuditLogEntry maybeGetAuditLogEntry(Queue<AuditLogEntry> 
auditLogEntries)
+    {
+        int attempts = 0;
+        AuditLogEntry entry = auditLogEntries.poll();
+
+        while (entry == null && attempts++ < 10)
+        {
+            // wait until the entry is propagated
+            Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            entry = auditLogEntries.poll();
+        }
+        return entry;
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/auth/MutualTlsCertificateValidityPeriodTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/auth/MutualTlsCertificateValidityPeriodTest.java
index 4f65b7ad75..b2247b5b4b 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/auth/MutualTlsCertificateValidityPeriodTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/auth/MutualTlsCertificateValidityPeriodTest.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.distributed.test.auth;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.nio.file.Path;
 import java.time.Instant;
 import java.time.temporal.ChronoUnit;
@@ -27,11 +26,8 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.StreamSupport;
-import javax.net.ssl.SSLException;
 
-import com.google.common.collect.ImmutableMap;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -40,38 +36,29 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import com.codahale.metrics.Histogram;
-import com.datastax.driver.core.PlainTextAuthProvider;
-import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
-import com.datastax.driver.core.SSLOptions;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.SimpleStatement;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
-import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.JavaDriverUtils;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
-import org.apache.cassandra.distributed.util.Auth;
-import org.apache.cassandra.distributed.util.SingleHostLoadBalancingPolicy;
 import org.apache.cassandra.metrics.ClearableHistogram;
 import org.apache.cassandra.metrics.MutualTlsMetrics;
-import org.apache.cassandra.security.ISslContextFactory;
-import org.apache.cassandra.transport.SimpleClientSslContextFactory;
 import org.apache.cassandra.utils.tls.CertificateBuilder;
 import org.apache.cassandra.utils.tls.CertificateBundle;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.InstanceOfAssertFactories;
 
-import static 
org.apache.cassandra.auth.CassandraRoleManager.DEFAULT_SUPERUSER_NAME;
-import static 
org.apache.cassandra.auth.CassandraRoleManager.DEFAULT_SUPERUSER_PASSWORD;
-import static 
org.apache.cassandra.transport.TlsTestUtils.CLIENT_TRUSTSTORE_PASSWORD;
+import static 
org.apache.cassandra.transport.TlsTestUtils.CLIENT_SPIFFE_IDENTITY;
 import static 
org.apache.cassandra.transport.TlsTestUtils.SERVER_KEYSTORE_PASSWORD;
 import static 
org.apache.cassandra.transport.TlsTestUtils.SERVER_TRUSTSTORE_PASSWORD;
+import static org.apache.cassandra.transport.TlsTestUtils.configureIdentity;
+import static 
org.apache.cassandra.transport.TlsTestUtils.generateClientCertificate;
+import static org.apache.cassandra.transport.TlsTestUtils.getSSLOptions;
 import static org.assertj.core.api.Assertions.as;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -80,9 +67,7 @@ import static org.assertj.core.api.Assertions.fail;
  */
 public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
 {
-    private static final String IDENTITY = 
"spiffe://test.cassandra.apache.org/dTest/mtls";
     private static ICluster<IInvokableInstance> CLUSTER;
-    private static final char[] KEYSTORE_PASSWORD = "cassandra".toCharArray();
 
     @ClassRule
     public static TemporaryFolder tempFolder = new TemporaryFolder();
@@ -130,7 +115,7 @@ public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
                                  .with(Feature.NATIVE_PROTOCOL, 
Feature.GOSSIP));
         CLUSTER = builder.start();
 
-        configureIdentity();
+        configureIdentity(CLUSTER, getSSLOptions(null, truststorePath));
     }
 
     @AfterClass
@@ -163,9 +148,9 @@ public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
     @Test
     public void testExpiringCertificate() throws Exception
     {
-        Path clientKeystorePath = generateClientCertificate(null);
+        Path clientKeystorePath = generateClientCertificate(null, 
tempFolder.getRoot(), CA);
 
-        com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath)));
+        com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath, truststorePath)));
 
         testWithDriver(driver, (Session session) -> {
             ResultSet clientView = session.execute(new SimpleStatement("SELECT 
* FROM system_views.clients"));
@@ -181,9 +166,9 @@ public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
 
             
Assertions.assertThat(authenticationMetadata).isNotNull().hasSize(1)
                       .containsKey("identity")
-                      .extractingByKey("identity", 
as(InstanceOfAssertFactories.STRING)).isEqualTo(IDENTITY);
+                      .extractingByKey("identity", 
as(InstanceOfAssertFactories.STRING)).isEqualTo(CLIENT_SPIFFE_IDENTITY);
             
Assertions.assertThat(row.getString("authentication_mode")).isEqualTo("MutualTls");
-            Assertions.assertThat(CLUSTER.get(1).logs().grep("Certificate with 
identity '" + IDENTITY + "' will expire").getResult())
+            Assertions.assertThat(CLUSTER.get(1).logs().grep("Certificate with 
identity '" + CLIENT_SPIFFE_IDENTITY + "' will expire").getResult())
                       .isNotEmpty();
             CLUSTER.get(1).runOnInstance(() -> 
Assertions.assertThat(MutualTlsMetrics.instance.clientCertificateExpirationDays.getCount()).isEqualTo(2));
         });
@@ -193,9 +178,9 @@ public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
     public void testCertificateReachingMaxValidityPeriod() throws Exception
     {
         Path clientKeystorePath = generateClientCertificate(b -> 
b.notBefore(Instant.now().minus(26, ChronoUnit.DAYS))
-                                                                  
.notAfter(Instant.now().plus(4, ChronoUnit.DAYS).minus(1, ChronoUnit.MINUTES)));
+                                                                  
.notAfter(Instant.now().plus(4, ChronoUnit.DAYS).minus(1, ChronoUnit.MINUTES)), 
tempFolder.getRoot(), CA);
 
-        com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath)));
+        com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath, truststorePath)));
 
         testWithDriver(driver, (Session session) -> {
             ResultSet clientView = session.execute(new SimpleStatement("SELECT 
* FROM system_views.clients"));
@@ -211,9 +196,9 @@ public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
 
             
Assertions.assertThat(authenticationMetadata).isNotNull().hasSize(1)
                       .containsKey("identity")
-                      .extractingByKey("identity", 
as(InstanceOfAssertFactories.STRING)).isEqualTo(IDENTITY);
+                      .extractingByKey("identity", 
as(InstanceOfAssertFactories.STRING)).isEqualTo(CLIENT_SPIFFE_IDENTITY);
             
Assertions.assertThat(row.getString("authentication_mode")).isEqualTo("MutualTls");
-            Assertions.assertThat(CLUSTER.get(1).logs().grep("Certificate with 
identity '" + IDENTITY + "' will expire").getResult())
+            Assertions.assertThat(CLUSTER.get(1).logs().grep("Certificate with 
identity '" + CLIENT_SPIFFE_IDENTITY + "' will expire").getResult())
                       .isNotEmpty();
             CLUSTER.get(1).runOnInstance(() -> 
Assertions.assertThat(MutualTlsMetrics.instance.clientCertificateExpirationDays.getCount()).isGreaterThanOrEqualTo(2));
         });
@@ -222,9 +207,9 @@ public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
     @Test
     public void testFailsWhenCertificateExceedsMaxAllowedValidityPeriod() 
throws Exception
     {
-        Path clientKeystorePath = generateClientCertificate(b -> 
b.notAfter(Instant.now().plus(365, ChronoUnit.DAYS)));
+        Path clientKeystorePath = generateClientCertificate(b -> 
b.notAfter(Instant.now().plus(365, ChronoUnit.DAYS)), tempFolder.getRoot(), CA);
 
-        com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath)));
+        com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath, truststorePath)));
 
         try
         {
@@ -242,9 +227,9 @@ public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
     public void testFailsWhenCertificateIsExpired() throws Exception
     {
         Path clientKeystorePath = generateClientCertificate(b -> 
b.notBefore(Instant.now().minus(30, ChronoUnit.DAYS))
-                                                                  
.notAfter(Instant.now().minus(10, ChronoUnit.DAYS)));
+                                                                  
.notAfter(Instant.now().minus(10, ChronoUnit.DAYS)), tempFolder.getRoot(), CA);
 
-        com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath)));
+        com.datastax.driver.core.Cluster driver = 
JavaDriverUtils.create(CLUSTER, null, b -> 
b.withSSL(getSSLOptions(clientKeystorePath, truststorePath)));
 
         try
         {
@@ -269,83 +254,4 @@ public class MutualTlsCertificateValidityPeriodTest extends TestBaseImpl
             }
         }
     }
-
-    public static SSLOptions getSSLOptions(Path keystorePath) throws 
RuntimeException
-    {
-        try
-        {
-            return RemoteEndpointAwareJdkSSLOptions.builder()
-                                                   
.withSSLContext(getClientSslContextFactory(keystorePath)
-                                                                   
.createJSSESslContext(EncryptionOptions.ClientAuth.OPTIONAL))
-                                                   .build();
-        }
-        catch (SSLException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static ISslContextFactory getClientSslContextFactory(Path 
keystorePath)
-    {
-        ImmutableMap.Builder<String, Object> params = ImmutableMap.<String, 
Object>builder()
-                                                                  
.put("truststore", truststorePath.toString())
-                                                                  
.put("truststore_password", CLIENT_TRUSTSTORE_PASSWORD);
-
-        if (keystorePath != null)
-        {
-            params.put("keystore", keystorePath.toString())
-                  .put("keystore_password", "cassandra");
-        }
-
-        return new SimpleClientSslContextFactory(params.build());
-    }
-
-    private static void configureIdentity()
-    {
-        withAuthenticatedSession(CLUSTER.get(1), DEFAULT_SUPERUSER_NAME, 
DEFAULT_SUPERUSER_PASSWORD, session -> {
-            session.execute("CREATE ROLE cassandra_ssl_test WITH LOGIN = 
true");
-            session.execute(String.format("ADD IDENTITY '%s' TO ROLE 
'cassandra_ssl_test'", IDENTITY));
-            // GRANT select to cassandra_ssl_test to be able to query the 
system_views.clients virtual table
-            session.execute("GRANT SELECT ON ALL KEYSPACES to 
cassandra_ssl_test");
-        });
-    }
-
-    static void withAuthenticatedSession(IInvokableInstance instance, String 
username, String password, Consumer<Session> consumer)
-    {
-        // wait for existing roles
-        Auth.waitForExistingRoles(instance);
-
-        InetSocketAddress nativeInetSocketAddress = 
ClusterUtils.getNativeInetSocketAddress(instance);
-        InetAddress address = nativeInetSocketAddress.getAddress();
-        LoadBalancingPolicy lbc = new SingleHostLoadBalancingPolicy(address);
-
-        com.datastax.driver.core.Cluster.Builder builder = 
com.datastax.driver.core.Cluster.builder()
-                                                                               
            .withLoadBalancingPolicy(lbc)
-                                                                               
            .withSSL(getSSLOptions(null))
-                                                                               
            .withAuthProvider(new PlainTextAuthProvider(username, password))
-                                                                               
            .addContactPoint(address.getHostAddress())
-                                                                               
            .withPort(nativeInetSocketAddress.getPort());
-
-        try (com.datastax.driver.core.Cluster c = builder.build(); Session 
session = c.connect())
-        {
-            consumer.accept(session);
-        }
-    }
-
-    private Path generateClientCertificate(Function<CertificateBuilder, 
CertificateBuilder> customizeCertificate) throws Exception
-    {
-
-        CertificateBuilder builder = new 
CertificateBuilder().subject("CN=Apache Cassandra, OU=ssl_test, O=Unknown, 
L=Unknown, ST=Unknown, C=Unknown")
-                                                             
.notBefore(Instant.now().minus(1, ChronoUnit.DAYS))
-                                                             
.notAfter(Instant.now().plus(1, ChronoUnit.DAYS))
-                                                             
.alias("spiffecert")
-                                                             
.addSanUriName(IDENTITY)
-                                                             
.rsa2048Algorithm();
-        if (customizeCertificate != null)
-        {
-            builder = customizeCertificate.apply(builder);
-        }
-        CertificateBundle ssc = builder.buildIssuedBy(CA);
-        return ssc.toTempKeyStorePath(tempFolder.getRoot().toPath(), 
KEYSTORE_PASSWORD, KEYSTORE_PASSWORD);
-    }
 }
diff --git a/test/unit/org/apache/cassandra/audit/InMemoryAuditLogger.java b/test/unit/org/apache/cassandra/audit/InMemoryAuditLogger.java
index f9a4038cc0..d6a61dbc0a 100644
--- a/test/unit/org/apache/cassandra/audit/InMemoryAuditLogger.java
+++ b/test/unit/org/apache/cassandra/audit/InMemoryAuditLogger.java
@@ -21,6 +21,8 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Queue;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class InMemoryAuditLogger implements IAuditLogger
 {
     final Queue<AuditLogEntry> inMemQueue = new LinkedList<>();
@@ -49,4 +51,10 @@ public class InMemoryAuditLogger implements IAuditLogger
         enabled = false;
         inMemQueue.clear();
     }
+
+    @VisibleForTesting
+    public Queue<AuditLogEntry> internalQueue()
+    {
+        return inMemQueue;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/transport/TlsTestUtils.java b/test/unit/org/apache/cassandra/transport/TlsTestUtils.java
index 8939937c98..6e6425c77b 100644
--- a/test/unit/org/apache/cassandra/transport/TlsTestUtils.java
+++ b/test/unit/org/apache/cassandra/transport/TlsTestUtils.java
@@ -18,18 +18,39 @@
 
 package org.apache.cassandra.transport;
 
+import java.io.File;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.file.Path;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 import java.util.Collections;
 import java.util.Map;
+import java.util.function.Consumer;
+import java.util.function.Function;
 import javax.net.ssl.SSLException;
 
 import com.google.common.collect.ImmutableMap;
 
+import com.datastax.driver.core.PlainTextAuthProvider;
 import com.datastax.driver.core.RemoteEndpointAwareJdkSSLOptions;
 import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.config.ParameterizedClass;
+import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.util.Auth;
+import org.apache.cassandra.distributed.util.SingleHostLoadBalancingPolicy;
 import org.apache.cassandra.security.ISslContextFactory;
+import org.apache.cassandra.utils.tls.CertificateBuilder;
+import org.apache.cassandra.utils.tls.CertificateBundle;
+
+import static 
org.apache.cassandra.auth.CassandraRoleManager.DEFAULT_SUPERUSER_NAME;
+import static 
org.apache.cassandra.auth.CassandraRoleManager.DEFAULT_SUPERUSER_PASSWORD;
 
 public class TlsTestUtils
 {
@@ -109,4 +130,93 @@ public class TlsTestUtils
                                                .build();
     }
 
+    public static SSLOptions getSSLOptions(Path keystorePath, Path 
truststorePath) throws RuntimeException
+    {
+        try
+        {
+            return RemoteEndpointAwareJdkSSLOptions.builder()
+                                                   
.withSSLContext(getClientSslContextFactory(keystorePath, truststorePath)
+                                                                   
.createJSSESslContext(EncryptionOptions.ClientAuth.OPTIONAL))
+                                                   .build();
+        }
+        catch (SSLException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static ISslContextFactory getClientSslContextFactory(Path 
keystorePath, Path truststorePath)
+    {
+        ImmutableMap.Builder<String, Object> params = ImmutableMap.<String, 
Object>builder()
+                                                                  
.put("truststore", truststorePath.toString())
+                                                                  
.put("truststore_password", CLIENT_TRUSTSTORE_PASSWORD);
+
+        if (keystorePath != null)
+        {
+            params.put("keystore", keystorePath.toString())
+                  .put("keystore_password", "cassandra");
+        }
+
+        return new SimpleClientSslContextFactory(params.build());
+    }
+
+    public static void configureIdentity(ICluster<IInvokableInstance> cluster, 
SSLOptions sslOptions)
+    {
+        withAuthenticatedSession(cluster.get(1), DEFAULT_SUPERUSER_NAME, 
DEFAULT_SUPERUSER_PASSWORD, session -> {
+            session.execute("CREATE ROLE cassandra_ssl_test WITH LOGIN = 
true");
+            session.execute(String.format("ADD IDENTITY '%s' TO ROLE 
'cassandra_ssl_test'", CLIENT_SPIFFE_IDENTITY));
+            // GRANT select to cassandra_ssl_test to be able to query the 
system_views.clients virtual table
+            session.execute("GRANT SELECT ON ALL KEYSPACES to 
cassandra_ssl_test");
+        }, sslOptions);
+    }
+
+    public static Path 
generateSelfSignedCertificate(Function<CertificateBuilder, CertificateBuilder> 
customizeCertificate, File targetDirectory) throws Exception
+    {
+        return generateClientCertificate(customizeCertificate, 
targetDirectory, null);
+    }
+
+    public static Path generateClientCertificate(Function<CertificateBuilder, 
CertificateBuilder> customizeCertificate, File targetDirectory, 
CertificateBundle ca) throws Exception
+    {
+        CertificateBuilder builder = new 
CertificateBuilder().subject("CN=Apache Cassandra, OU=ssl_test, O=Unknown, 
L=Unknown, ST=Unknown, C=Unknown")
+                                                             
.notBefore(Instant.now().minus(1, ChronoUnit.DAYS))
+                                                             
.notAfter(Instant.now().plus(1, ChronoUnit.DAYS))
+                                                             
.alias("spiffecert")
+                                                             
.addSanUriName(CLIENT_SPIFFE_IDENTITY)
+                                                             
.rsa2048Algorithm();
+        if (customizeCertificate != null)
+        {
+            builder = customizeCertificate.apply(builder);
+        }
+        CertificateBundle ssc = ca != null
+                                ? builder.buildIssuedBy(ca)
+                                : builder.buildSelfSigned();
+        return ssc.toTempKeyStorePath(targetDirectory.toPath(), 
SERVER_KEYSTORE_PASSWORD.toCharArray(), SERVER_KEYSTORE_PASSWORD.toCharArray());
+    }
+
+    public static void withAuthenticatedSession(IInvokableInstance instance,
+                                         String username,
+                                         String password,
+                                         Consumer<Session> consumer,
+                                         SSLOptions sslOptions)
+    {
+        // wait for existing roles
+        Auth.waitForExistingRoles(instance);
+
+        InetSocketAddress nativeInetSocketAddress = 
ClusterUtils.getNativeInetSocketAddress(instance);
+        InetAddress address = nativeInetSocketAddress.getAddress();
+        LoadBalancingPolicy lbc = new SingleHostLoadBalancingPolicy(address);
+
+        com.datastax.driver.core.Cluster.Builder builder = 
com.datastax.driver.core.Cluster.builder()
+                                                                               
            .withLoadBalancingPolicy(lbc)
+                                                                               
            .withSSL(sslOptions)
+                                                                               
            .withAuthProvider(new PlainTextAuthProvider(username, password))
+                                                                               
            .addContactPoint(address.getHostAddress())
+                                                                               
            .withPort(nativeInetSocketAddress.getPort());
+
+        try (com.datastax.driver.core.Cluster c = builder.build(); Session 
session = c.connect())
+        {
+            consumer.accept(session);
+        }
+    }
+
 }
diff --git a/test/unit/org/apache/cassandra/utils/tls/CertificateBuilder.java b/test/unit/org/apache/cassandra/utils/tls/CertificateBuilder.java
index 3f8a785f2d..0f43efa3bb 100644
--- a/test/unit/org/apache/cassandra/utils/tls/CertificateBuilder.java
+++ b/test/unit/org/apache/cassandra/utils/tls/CertificateBuilder.java
@@ -120,6 +120,12 @@ public class CertificateBuilder
         return this;
     }
 
+    public CertificateBuilder clearSubjectAlternativeNames()
+    {
+        subjectAlternativeNames.clear();
+        return this;
+    }
+
     public CertificateBuilder secureRandom(SecureRandom secureRandom)
     {
         this.random = Objects.requireNonNull(secureRandom);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to