[ 
https://issues.apache.org/jira/browse/KAFKA-5462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663625#comment-16663625
 ] 

ASF GitHub Bot commented on KAFKA-5462:
---------------------------------------

omkreddy closed pull request #5684: KAFKA-5462: Add configuration to build 
custom SSL principal name (KIP-371)
URL: https://github.com/apache/kafka/pull/5684
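
As a reading aid, here is a minimal, self-contained sketch of how an ordered list of mapping rules is expected to behave. It is not the code from this pull request: the class and method names (SslRuleSketch, applyRule, map) are hypothetical, it uses only the JDK, and it omits the patch's handling of invalid back-references. The three rules and the sample distinguished names are taken from the test added in this PR, and falling through to the unchanged name stands in for a trailing DEFAULT rule.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.security.auth.x500.X500Principal;

// Hypothetical illustration only; not part of Kafka.
class SslRuleSketch {

    // Each entry is {pattern, replacement, case flag}, i.e. the three parts
    // of a "RULE:pattern/replacement/[LU]" string, already split apart.
    private static final String[][] RULES = {
        {"^CN=(.*),OU=ServiceUsers.*$", "$1", "L"},
        {"^CN=(.*),OU=(.*),O=(.*),L=(.*),ST=(.*),C=(.*)$", "$1@$2", "L"},
        {"^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$", "$1", "U"},
    };

    // Returns the mapped name, or null when the pattern does not match the whole DN.
    static String applyRule(String[] rule, String dn) {
        Matcher m = Pattern.compile(rule[0]).matcher(dn);
        if (!m.matches()) {
            return null;
        }
        String result = m.replaceAll(rule[1]);
        if ("L".equals(rule[2])) {
            return result.toLowerCase(Locale.ENGLISH);
        }
        if ("U".equals(rule[2])) {
            return result.toUpperCase(Locale.ENGLISH);
        }
        return result;
    }

    // Rules are tried in order and the first match wins; if none match,
    // the DN is returned unchanged (the role played by DEFAULT).
    static String map(String dn) {
        for (String[] rule : RULES) {
            String name = applyRule(rule, dn);
            if (name != null) {
                return name;
            }
        }
        return dn;
    }

    public static void main(String[] args) {
        // X500Principal.getName() yields the RFC 2253 form, without spaces after commas.
        String dn = new X500Principal("CN=Duke, OU=ServiceUsers, O=Org, C=US").getName();
        System.out.println(dn + " -> " + map(dn));
    }
}
```

Because rules are evaluated in order, more specific patterns should come before broad ones such as the third rule above, which matches any name containing a CN component.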
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index a29d8069b99..e3a8a774a51 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -34,17 +34,28 @@
     public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
     public static final String SASL_ENABLED_MECHANISMS_CONFIG = "sasl.enabled.mechanisms";
     public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = "sasl.server.callback.handler.class";
+    public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = "ssl.principal.mapping.rules";
 
     public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the " +
             "KafkaPrincipalBuilder interface, which is used to build the KafkaPrincipal object used during " +
             "authorization. This config also supports the deprecated PrincipalBuilder interface which was previously " +
             "used for client authentication over SSL. If no principal builder is defined, the default behavior depends " +
-            "on the security protocol in use. For SSL authentication, the principal name will be the distinguished " +
+            "on the security protocol in use. For SSL authentication,  the principal will be derived using the" +
+            " rules defined by <code>" + SSL_PRINCIPAL_MAPPING_RULES_CONFIG + "</code> applied on the distinguished " +
             "name from the client certificate if one is provided; otherwise, if client authentication is not required, " +
             "the principal name will be ANONYMOUS. For SASL authentication, the principal will be derived using the " +
             "rules defined by <code>" + SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + "</code> if GSSAPI is in use, " +
             "and the SASL authentication ID for other mechanisms. For PLAINTEXT, the principal will be ANONYMOUS.";
 
+    public static final String SSL_PRINCIPAL_MAPPING_RULES_DOC = "A list of rules for mapping from distinguished name" +
+            " from the client certificate to short name. The rules are evaluated in order and the first rule that matches" +
+            " a principal name is used to map it to a short name. Any later rules in the list are ignored. By default," +
+            " distinguished name of the X.500 certificate will be the principal. For more details on the format please" +
+            " see <a href=\"#security_authz\"> security authorization and acls</a>. Note that this configuration is ignored" +
+            " if an extension of KafkaPrincipalBuilder is provided by the <code>" + PRINCIPAL_BUILDER_CLASS_CONFIG + "</code>" +
+           " configuration.";
+    public static final List<String> DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = Collections.singletonList("DEFAULT");
+
     public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A list of rules for mapping from principal " +
             "names to short names (typically operating system usernames). The rules are evaluated in order and the " +
             "first rule that matches a principal name is used to map it to a short name. Any later rules in the list are " +
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 10779b7881f..b3040f3ef73 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
 import org.apache.kafka.common.utils.Utils;
 
@@ -163,12 +164,13 @@ private static void requireNonNullMode(Mode mode, SecurityProtocol securityProto
     public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                                TransportLayer transportLayer,
                                                                Authenticator authenticator,
-                                                               KerberosShortNamer kerberosShortNamer) {
+                                                               KerberosShortNamer kerberosShortNamer,
+                                                               SslPrincipalMapper sslPrincipalMapper) {
         Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
         final KafkaPrincipalBuilder builder;
 
         if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
-            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
+            builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
         } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
             builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
         } else if (org.apache.kafka.common.security.auth.PrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index e397f05060f..e5dc778705f 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -70,7 +70,7 @@ public void close() {}
 
         private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer, ListenerName listenerName) {
             this.transportLayer = transportLayer;
-            this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null);
+            this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null, null);
             this.listenerName = listenerName;
         }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 86d41d08518..ffa8deb3127 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -18,10 +18,12 @@
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.auth.SslAuthenticationContext;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
@@ -33,6 +35,7 @@
 import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -44,6 +47,7 @@
     private SslFactory sslFactory;
     private Mode mode;
     private Map<String, ?> configs;
+    private SslPrincipalMapper sslPrincipalMapper;
 
     /**
      * Constructs a SSL channel builder. ListenerName is provided only
@@ -58,6 +62,10 @@ public SslChannelBuilder(Mode mode, ListenerName listenerName, boolean isInterBr
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
             this.configs = configs;
+            @SuppressWarnings("unchecked")
+            List<String> sslPrincipalMappingRules = (List<String>) configs.get(BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG);
+            if (sslPrincipalMappingRules != null)
+                sslPrincipalMapper = SslPrincipalMapper.fromRules(sslPrincipalMappingRules);
             this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
             this.sslFactory.configure(this.configs);
         } catch (Exception e) {
@@ -89,7 +97,7 @@ public ListenerName listenerName() {
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
-            Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName);
+            Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName, sslPrincipalMapper);
             return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize,
                     memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
@@ -154,9 +162,9 @@ private String peerHost(SelectionKey key) {
         private final KafkaPrincipalBuilder principalBuilder;
         private final ListenerName listenerName;
 
-        private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer, ListenerName listenerName) {
+        private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer, ListenerName listenerName, SslPrincipalMapper sslPrincipalMapper) {
             this.transportLayer = transportLayer;
-            this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null);
+            this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null, sslPrincipalMapper);
             this.listenerName = listenerName;
         }
         /**
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
index 30b0a3e0980..38c303f5266 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
+import javax.security.auth.x500.X500Principal;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Authenticator;
@@ -32,6 +33,8 @@
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLSession;
 import javax.security.sasl.SaslServer;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.security.Principal;
@@ -55,6 +58,7 @@
     private final Authenticator authenticator;
     private final TransportLayer transportLayer;
     private final KerberosShortNamer kerberosShortNamer;
+    private final SslPrincipalMapper sslPrincipalMapper;
 
     /**
      * Construct a new instance which wraps an instance of the older {@link org.apache.kafka.common.security.auth.PrincipalBuilder}.
@@ -73,27 +77,31 @@ public static DefaultKafkaPrincipalBuilder fromOldPrincipalBuilder(Authenticator
                 requireNonNull(authenticator),
                 requireNonNull(transportLayer),
                 requireNonNull(oldPrincipalBuilder),
-                kerberosShortNamer);
+                kerberosShortNamer,
+                null);
     }
 
     @SuppressWarnings("deprecation")
     private DefaultKafkaPrincipalBuilder(Authenticator authenticator,
                                          TransportLayer transportLayer,
                                          org.apache.kafka.common.security.auth.PrincipalBuilder oldPrincipalBuilder,
-                                         KerberosShortNamer kerberosShortNamer) {
+                                         KerberosShortNamer kerberosShortNamer,
+                                         SslPrincipalMapper sslPrincipalMapper) {
         this.authenticator = authenticator;
         this.transportLayer = transportLayer;
         this.oldPrincipalBuilder = oldPrincipalBuilder;
         this.kerberosShortNamer = kerberosShortNamer;
+        this.sslPrincipalMapper =  sslPrincipalMapper;
     }
 
     /**
      * Construct a new instance.
      *
      * @param kerberosShortNamer Kerberos name rewrite rules or null if none have been configured
+     * @param sslPrincipalMapper SSL Principal mapper or null if none have been configured
      */
-    public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer) {
-        this(null, null, null, kerberosShortNamer);
+    public DefaultKafkaPrincipalBuilder(KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) {
+        this(null, null, null, kerberosShortNamer, sslPrincipalMapper);
     }
 
     @Override
@@ -110,7 +118,7 @@ public KafkaPrincipal build(AuthenticationContext context) {
                 return convertToKafkaPrincipal(oldPrincipalBuilder.buildPrincipal(transportLayer, authenticator));
 
             try {
-                return convertToKafkaPrincipal(sslSession.getPeerPrincipal());
+                return applySslPrincipalMapper(sslSession.getPeerPrincipal());
             } catch (SSLPeerUnverifiedException se) {
                 return KafkaPrincipal.ANONYMOUS;
             }
@@ -136,6 +144,19 @@ private KafkaPrincipal applyKerberosShortNamer(String authorizationId) {
         }
     }
 
+    private KafkaPrincipal applySslPrincipalMapper(Principal principal) {
+        try {
+            if (!(principal instanceof X500Principal) || principal == KafkaPrincipal.ANONYMOUS) {
+                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName());
+            } else {
+                return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, sslPrincipalMapper.getName(principal.getName()));
+            }
+        } catch (IOException e) {
+            throw new KafkaException("Failed to map name for '" + principal.getName() +
+                    "' based on SSL principal mapping rules.", e);
+        }
+    }
+
     private KafkaPrincipal convertToKafkaPrincipal(Principal principal) {
         return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, principal.getName());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 48a49fe4e94..4db1971b377 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -153,7 +153,7 @@ public SaslServerAuthenticator(Map<String, ?> configs,
 
         // Note that the old principal builder does not support SASL, so we do not need to pass the
         // authenticator or the transport layer
-        this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser);
+        this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, null, null, kerberosNameParser, null);
     }
 
     private void createSaslServer(String mechanism) throws IOException {
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java
new file mode 100644
index 00000000000..7ec4a79b2eb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslPrincipalMapper.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SslPrincipalMapper {
+
+    private static final Pattern RULE_PARSER = Pattern.compile("((DEFAULT)|(RULE:(([^/]*)/([^/]*))/([LU])?))");
+
+    private final List<Rule> rules;
+
+    public SslPrincipalMapper(List<Rule> sslPrincipalMappingRules) {
+        this.rules = sslPrincipalMappingRules;
+    }
+
+    public static SslPrincipalMapper fromRules(List<String> sslPrincipalMappingRules) {
+        List<String> rules = sslPrincipalMappingRules == null ? Collections.singletonList("DEFAULT") : sslPrincipalMappingRules;
+        return new SslPrincipalMapper(parseRules(rules));
+    }
+
+    private static List<Rule> parseRules(List<String> rules) {
+        List<Rule> result = new ArrayList<>();
+        for (String rule : rules) {
+            Matcher matcher = RULE_PARSER.matcher(rule);
+            if (!matcher.lookingAt()) {
+                throw new IllegalArgumentException("Invalid rule: " + rule);
+            }
+            if (rule.length() != matcher.end()) {
+                throw new IllegalArgumentException("Invalid rule: `" + rule + "`, unmatched substring: `" + rule.substring(matcher.end()) + "`");
+            }
+            if (matcher.group(2) != null) {
+                result.add(new Rule());
+            } else {
+                result.add(new Rule(matcher.group(5),
+                                    matcher.group(6),
+                                    "L".equals(matcher.group(7)),
+                                    "U".equals(matcher.group(7))));
+            }
+        }
+        return result;
+    }
+
+    public String getName(String distinguishedName) throws IOException {
+        for (Rule r : rules) {
+            String principalName = r.apply(distinguishedName);
+            if (principalName != null) {
+                return principalName;
+            }
+        }
+        throw new NoMatchingRule("No rules apply to " + distinguishedName + ", rules " + rules);
+    }
+
+    @Override
+    public String toString() {
+        return "SslPrincipalMapper(rules = " + rules + ")";
+    }
+
+    public static class NoMatchingRule extends IOException {
+        NoMatchingRule(String msg) {
+            super(msg);
+        }
+    }
+
+    private static class Rule {
+        private static final Pattern BACK_REFERENCE_PATTERN = Pattern.compile("\\$(\\d+)");
+
+        private final boolean isDefault;
+        private final Pattern pattern;
+        private final String replacement;
+        private final boolean toLowerCase;
+        private final boolean toUpperCase;
+
+        Rule() {
+            isDefault = true;
+            pattern = null;
+            replacement = null;
+            toLowerCase = false;
+            toUpperCase = false;
+        }
+
+        Rule(String pattern, String replacement, boolean toLowerCase, boolean toUpperCase) {
+            isDefault = false;
+            this.pattern = pattern == null ? null : Pattern.compile(pattern);
+            this.replacement = replacement;
+            this.toLowerCase = toLowerCase;
+            this.toUpperCase = toUpperCase;
+        }
+
+        String apply(String distinguishedName) {
+            if (isDefault) {
+                return distinguishedName;
+            }
+
+            String result = null;
+            final Matcher m = pattern.matcher(distinguishedName);
+
+            if (m.matches()) {
+                result = distinguishedName.replaceAll(pattern.pattern(), escapeLiteralBackReferences(replacement, m.groupCount()));
+            }
+
+            if (toLowerCase && result != null) {
+                result = result.toLowerCase(Locale.ENGLISH);
+            } else if (toUpperCase & result != null) {
+                result = result.toUpperCase(Locale.ENGLISH);
+            }
+
+            return result;
+        }
+
+        //If we find a back reference that is not valid, then we will treat it as a literal string. For example, if we have 3 capturing
+        //groups and the Replacement Value has the value is "$1@$4", then we want to treat the $4 as a literal "$4", rather
+        //than attempting to use it as a back reference.
+        //This method was taken from Apache Nifi project : org.apache.nifi.authorization.util.IdentityMappingUtil
+        private String escapeLiteralBackReferences(final String unescaped, final int numCapturingGroups) {
+            if (numCapturingGroups == 0) {
+                return unescaped;
+            }
+
+            String value = unescaped;
+            final Matcher backRefMatcher = BACK_REFERENCE_PATTERN.matcher(value);
+            while (backRefMatcher.find()) {
+                final String backRefNum = backRefMatcher.group(1);
+                if (backRefNum.startsWith("0")) {
+                    continue;
+                }
+                final int originalBackRefIndex = Integer.parseInt(backRefNum);
+                int backRefIndex = originalBackRefIndex;
+
+
+                // if we have a replacement value like $123, and we have less than 123 capturing groups, then
+                // we want to truncate the 3 and use capturing group 12; if we have less than 12 capturing groups,
+                // then we want to truncate the 2 and use capturing group 1; if we don't have a capturing group then
+                // we want to truncate the 1 and get 0.
+                while (backRefIndex > numCapturingGroups && backRefIndex >= 10) {
+                    backRefIndex /= 10;
+                }
+
+                if (backRefIndex > numCapturingGroups) {
+                    final StringBuilder sb = new StringBuilder(value.length() + 1);
+                    final int groupStart = backRefMatcher.start(1);
+
+                    sb.append(value.substring(0, groupStart - 1));
+                    sb.append("\\");
+                    sb.append(value.substring(groupStart - 1));
+                    value = sb.toString();
+                }
+            }
+
+            return value;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            if (isDefault) {
+                buf.append("DEFAULT");
+            } else {
+                buf.append("RULE:");
+                if (pattern != null) {
+                    buf.append(pattern);
+                }
+                if (replacement != null) {
+                    buf.append("/");
+                    buf.append(replacement);
+                }
+                if (toLowerCase) {
+                    buf.append("/L");
+                } else if (toUpperCase) {
+                    buf.append("/U");
+                }
+            }
+            return buf.toString();
+        }
+
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
index 27daf0face8..630cba1f3e1 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java
@@ -45,7 +45,7 @@ public void testCreateOldPrincipalBuilder() throws Exception {
 
         Map<String, Object> configs = new HashMap<>();
         configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, OldPrincipalBuilder.class);
-        KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null);
+        KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, authenticator, null, null);
 
         // test old principal builder is properly configured and delegated to
         assertTrue(OldPrincipalBuilder.configured);
@@ -60,7 +60,7 @@ public void testCreateOldPrincipalBuilder() throws Exception {
     public void testCreateConfigurableKafkaPrincipalBuilder() {
         Map<String, Object> configs = new HashMap<>();
         configs.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigurableKafkaPrincipalBuilder.class);
-        KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null);
+        KafkaPrincipalBuilder builder = ChannelBuilders.createPrincipalBuilder(configs, null, null, null, null);
         assertTrue(builder instanceof ConfigurableKafkaPrincipalBuilder);
         assertTrue(((ConfigurableKafkaPrincipalBuilder) builder).configured);
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
index a05a8502bf1..dd5087a84b3 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java
@@ -16,23 +16,28 @@
  */
 package org.apache.kafka.common.security.auth;
 
+import javax.security.auth.x500.X500Principal;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.Authenticator;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
 import org.junit.Test;
 
 import javax.net.ssl.SSLSession;
 import javax.security.sasl.SaslServer;
 import java.net.InetAddress;
 import java.security.Principal;
+import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
@@ -63,7 +68,7 @@ public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws Exception
 
     @Test
     public void testReturnAnonymousPrincipalForPlaintext() throws Exception {
-        try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null)) {
+        try (DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null)) {
             assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(
                     new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())));
         }
@@ -100,7 +105,7 @@ public void testUseSessionPeerPrincipalForSsl() throws Exception {
 
         when(session.getPeerPrincipal()).thenReturn(new DummyPrincipal("foo"));
 
-        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
 
         KafkaPrincipal principal = builder.build(
                 new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
@@ -112,6 +117,60 @@ public void testUseSessionPeerPrincipalForSsl() throws Exception {
         verify(session, atLeastOnce()).getPeerPrincipal();
     }
 
+    @Test
+    public void testPrincipalIfSSLPeerIsNotAuthenticated() throws Exception {
+        SSLSession session = mock(SSLSession.class);
+
+        when(session.getPeerPrincipal()).thenReturn(KafkaPrincipal.ANONYMOUS);
+
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
+
+        KafkaPrincipal principal = builder.build(
+                new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()));
+        assertEquals(KafkaPrincipal.ANONYMOUS, principal);
+
+        builder.close();
+        verify(session, atLeastOnce()).getPeerPrincipal();
+    }
+
+
+    @Test
+    public void testPrincipalWithSslPrincipalMapper() throws Exception {
+        SSLSession session = mock(SSLSession.class);
+
+        when(session.getPeerPrincipal()).thenReturn(new X500Principal("CN=Duke, OU=ServiceUsers, O=Org, C=US"))
+                                        .thenReturn(new X500Principal("CN=Duke, OU=SME, O=mycp, L=Fulton, ST=MD, C=US"))
+                                        .thenReturn(new X500Principal("CN=duke, OU=JavaSoft, O=Sun Microsystems"))
+                                        .thenReturn(new X500Principal("OU=JavaSoft, O=Sun Microsystems, C=US"));
+
+        List<String> rules = Arrays.asList(
+            "RULE:^CN=(.*),OU=ServiceUsers.*$/$1/L",
+            "RULE:^CN=(.*),OU=(.*),O=(.*),L=(.*),ST=(.*),C=(.*)$/$1@$2/L",
+            "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U",
+            "DEFAULT"
+        );
+
+        SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules);
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, mapper);
+
+        SslAuthenticationContext sslContext = new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name());
+
+        KafkaPrincipal principal = builder.build(sslContext);
+        assertEquals("duke", principal.getName());
+
+        principal = builder.build(sslContext);
+        assertEquals("duke@sme", principal.getName());
+
+        principal = builder.build(sslContext);
+        assertEquals("DUKE", principal.getName());
+
+        principal = builder.build(sslContext);
+        assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", principal.getName());
+
+        builder.close();
+        verify(session, times(4)).getPeerPrincipal();
+    }
+
     @Test
     public void testPrincipalBuilderScram() throws Exception {
         SaslServer server = mock(SaslServer.class);
@@ -119,7 +178,7 @@ public void testPrincipalBuilderScram() throws Exception {
         when(server.getMechanismName()).thenReturn(ScramMechanism.SCRAM_SHA_256.mechanismName());
         when(server.getAuthorizationID()).thenReturn("foo");
 
-        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null);
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null, null);
 
         KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
                 SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
@@ -141,7 +200,7 @@ public void testPrincipalBuilderGssapi() throws Exception {
         when(server.getAuthorizationID()).thenReturn("foo/h...@realm.com");
         when(kerberosShortNamer.shortName(any())).thenReturn("foo");
 
-        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer);
+        DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, null);
 
         KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server,
                 SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name()));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java
new file mode 100644
index 00000000000..c647fd00a3d
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslPrincipalMapperTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.ssl;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class SslPrincipalMapperTest {
+
+    @Test
+    public void testValidRules() {
+        testValidRule(Arrays.asList("DEFAULT"));
+        testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/"));
+        testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L", "DEFAULT"));
+        testValidRule(Arrays.asList("RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/"));
+        testValidRule(Arrays.asList("RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L"));
+        testValidRule(Arrays.asList("RULE:^cn=(.?),ou=(.?),dc=(.?),dc=(.?)$/$1@$2/U"));
+    }
+
+    private void testValidRule(List<String> rules) {
+        SslPrincipalMapper.fromRules(rules);
+    }
+
+    @Test
+    public void testInvalidRules() {
+        testInvalidRule(Arrays.asList("default"));
+        testInvalidRule(Arrays.asList("DEFAUL"));
+        testInvalidRule(Arrays.asList("DEFAULT/L"));
+        testInvalidRule(Arrays.asList("DEFAULT/U"));
+
+        testInvalidRule(Arrays.asList("RULE:CN=(.*?),OU=ServiceUsers.*/$1"));
+        testInvalidRule(Arrays.asList("rule:^CN=(.*?),OU=ServiceUsers.*$/$1/"));
+        testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L/U"));
+        testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/L"));
+        testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/U"));
+        testInvalidRule(Arrays.asList("RULE:^CN=(.*?),OU=ServiceUsers.*$/LU"));
+    }
+
+    private void testInvalidRule(List<String> rules) {
+        try {
+            System.out.println(SslPrincipalMapper.fromRules(rules));
+            fail("should have thrown IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+        }
+    }
+
+    @Test
+    public void testSslPrincipalMapper() throws Exception {
+        List<String> rules = Arrays.asList(
+            "RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L",
+            "RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L",
+            "RULE:^cn=(.*?),ou=(.*?),dc=(.*?),dc=(.*?)$/$1@$2/U",
+            "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U",
+            "DEFAULT"
+        );
+
+        SslPrincipalMapper mapper = SslPrincipalMapper.fromRules(rules);
+
+        assertEquals("duke", mapper.getName("CN=Duke,OU=ServiceUsers,O=Org,C=US"));
+        assertEquals("duke@sme", mapper.getName("CN=Duke,OU=SME,O=mycp,L=Fulton,ST=MD,C=US"));
+        assertEquals("DUKE@SME", mapper.getName("cn=duke,ou=sme,dc=mycp,dc=com"));
+        assertEquals("DUKE", mapper.getName("cN=duke,OU=JavaSoft,O=Sun Microsystems"));
+        assertEquals("OU=JavaSoft,O=Sun Microsystems,C=US", mapper.getName("OU=JavaSoft,O=Sun Microsystems,C=US"));
+    }
+
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9edda4ea7f1..9bf41a1e054 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -220,6 +220,7 @@ object Defaults {
   val SslClientAuthRequested = "requested"
   val SslClientAuthNone = "none"
   val SslClientAuth = SslClientAuthNone
+  val SslPrincipalMappingRules = BrokerSecurityConfigs.DEFAULT_SSL_PRINCIPAL_MAPPING_RULES
 
   /** ********* Sasl configuration ***********/
   val SaslMechanismInterBrokerProtocol = SaslConfigs.DEFAULT_SASL_MECHANISM
@@ -439,6 +440,7 @@ object KafkaConfig {
   val SslEndpointIdentificationAlgorithmProp = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
   val SslSecureRandomImplementationProp = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG
   val SslClientAuthProp = BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG
+  val SslPrincipalMappingRulesProp = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_CONFIG
 
   /** ********* SASL Configuration ****************/
   val SaslMechanismInterBrokerProtocolProp = "sasl.mechanism.inter.broker.protocol"
@@ -760,6 +762,7 @@ object KafkaConfig {
   val SslEndpointIdentificationAlgorithmDoc = SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC
   val SslSecureRandomImplementationDoc = SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC
   val SslClientAuthDoc = BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC
+  val SslPrincipalMappingRulesDoc = BrokerSecurityConfigs.SSL_PRINCIPAL_MAPPING_RULES_DOC
 
   /** ********* Sasl Configuration ****************/
   val SaslMechanismInterBrokerProtocolDoc = "SASL mechanism used for inter-broker communication. Default is GSSAPI."
@@ -998,6 +1001,7 @@ object KafkaConfig {
       .define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc)
       .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
       .define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)
+      .define(SslPrincipalMappingRulesProp, LIST, Defaults.SslPrincipalMappingRules, LOW, SslPrincipalMappingRulesDoc)
 
       /** ********* Sasl Configuration ****************/
       .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index b75c3e7eb24..ba8e54b2eac 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -691,6 +691,7 @@ class KafkaConfigTest {
         case KafkaConfig.SslEndpointIdentificationAlgorithmProp => // ignore string
         case KafkaConfig.SslSecureRandomImplementationProp => // ignore string
         case KafkaConfig.SslCipherSuitesProp => // ignore string
+        case KafkaConfig.SslPrincipalMappingRulesProp => // ignore string
 
         //Sasl Configs
         case KafkaConfig.SaslMechanismInterBrokerProtocolProp => // ignore
diff --git a/docs/security.html b/docs/security.html
index b0183343a4b..1cd7c784d09 100644
--- a/docs/security.html
+++ b/docs/security.html
@@ -1020,8 +1020,37 @@ <h3><a id="security_authz" href="#security_authz">7.4 Authorization and ACLs</a>
     <pre>allow.everyone.if.no.acl.found=true</pre>
     One can also add super users in server.properties like the following (note that the delimiter is semicolon since SSL user names may contain comma). Default PrincipalType string "User" is case sensitive.
     <pre>super.users=User:Bob;User:Alice</pre>
-    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting a customized PrincipalBuilder in server.properties like the following.
+
+    <h5><a id="security_authz_ssl" href="#security_authz_ssl">Customizing SSL User Name</a></h5>
+
+    By default, the SSL user name will be of the form "CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown". One can change that by setting <code>ssl.principal.mapping.rules</code> to a customized rule in server.properties.
+    This config allows a list of rules for mapping an X.500 distinguished name to a short name. The rules are evaluated in order and the first rule that matches a distinguished name is used to map it to a short name. Any later rules in the list are ignored.
+
+    <br>The format of <code>ssl.principal.mapping.rules</code> is a list where each rule starts with "RULE:" and contains an expression in one of the following formats. The default rule returns the
+    string representation of the X.500 certificate distinguished name. If the distinguished name matches the pattern, then the replacement command is run over the name.
+    Lowercase and uppercase options are also supported, to force the translated result to be all lowercase or all uppercase. This is done by adding a "/L" or "/U" to the end of the rule.
+
+    <pre>
+        RULE:pattern/replacement/
+        RULE:pattern/replacement/[LU]
+    </pre>
+
+    Example <code>ssl.principal.mapping.rules</code> values are:
+    <pre>
+        RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
+        RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
+        RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
+        DEFAULT
+    </pre>
+
+    The rules above translate the distinguished name "CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "serviceuser"
+    and "CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown" to "adminuser@admin".
+
+    <br>For advanced use cases, one can customize the name by setting a customized PrincipalBuilder in server.properties like the following.
     <pre>principal.builder.class=CustomizedPrincipalBuilderClass</pre>
+
+    <h5><a id="security_authz_sasl" href="#security_authz_sasl">Customizing SASL User Name</a></h5>
+
     By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting <code>sasl.kerberos.principal.to.local.rules</code> to a customized rule in server.properties.
     The format of <code>sasl.kerberos.principal.to.local.rules</code> is a list where each rule works in the same way as the auth_to_local in <a href="http://web.mit.edu/Kerberos/krb5-latest/doc/admin/conf_files/krb5_conf.html">Kerberos configuration file (krb5.conf)</a>. This also support additional lowercase rule, to force the translated result to be all lower case. This is done by adding a "/L" to the end of the rule. check below formats for syntax.
     Each rules starts with RULE: and contains an expression as the following formats. See the kerberos documentation for more details.
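
Editorial illustration (not part of the pull request): the rule semantics described in the security.html change above, where rules are tried in order, the first match wins, and an optional /L or /U changes the case of the result, can be sketched in plain Java roughly as follows. The class PrincipalRuleSketch and its mapName method are hypothetical names for this sketch and are not Kafka's SslPrincipalMapper. The rule parser is a simplification that assumes neither the pattern nor the replacement contains a "/", and throwing when no rule matches is likewise an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of "first matching rule wins"; not Kafka's SslPrincipalMapper.
final class PrincipalRuleSketch {

    // Simplified rule syntax: RULE:pattern/replacement/[LU].
    // Assumption of this sketch: pattern and replacement contain no '/'.
    private static final Pattern RULE_FORMAT =
        Pattern.compile("RULE:([^/]*)/([^/]*)/([LU]?)");

    static String mapName(List<String> rules, String distinguishedName) {
        for (String rule : rules) {
            if (rule.equals("DEFAULT")) {
                // DEFAULT returns the distinguished name unchanged.
                return distinguishedName;
            }
            Matcher parsed = RULE_FORMAT.matcher(rule);
            if (!parsed.matches()) {
                throw new IllegalArgumentException("Invalid rule: " + rule);
            }
            Matcher dn = Pattern.compile(parsed.group(1)).matcher(distinguishedName);
            if (!dn.matches()) {
                continue; // pattern does not cover the whole name, try the next rule
            }
            // Expand $1, $2, ... in the replacement using the full-string match.
            StringBuffer expanded = new StringBuffer();
            dn.appendReplacement(expanded, parsed.group(2));
            dn.appendTail(expanded);
            String name = expanded.toString();
            String flag = parsed.group(3);
            if (flag.equals("L")) {
                return name.toLowerCase(Locale.ROOT);
            }
            if (flag.equals("U")) {
                return name.toUpperCase(Locale.ROOT);
            }
            return name;
        }
        // Assumption of this sketch: no matching rule and no DEFAULT is an error.
        throw new IllegalStateException("No rule matched: " + distinguishedName);
    }

    public static void main(String[] args) {
        List<String> rules = Arrays.asList(
            "RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/L",
            "RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L",
            "RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/U",
            "DEFAULT");
        System.out.println(mapName(rules, "CN=Duke,OU=ServiceUsers,O=Org,C=US"));
        System.out.println(mapName(rules, "CN=Duke,OU=SME,O=mycp,L=Fulton,ST=MD,C=US"));
        System.out.println(mapName(rules, "cN=duke,OU=JavaSoft,O=Sun Microsystems"));
        System.out.println(mapName(rules, "OU=JavaSoft,O=Sun Microsystems,C=US"));
    }
}
```

With these four rules the sketch maps the sample names to "duke", "duke@sme" and "DUKE", and leaves the last name unchanged via DEFAULT. These are the same values the tests in the diff expect for those inputs.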


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a configuration for users to specify a template for building a custom 
> principal name
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5462
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5462
>             Project: Kafka
>          Issue Type: Bug
>          Components: security
>    Affects Versions: 0.10.2.1
>            Reporter: Koelli Mungee
>            Assignee: Manikumar
>            Priority: Major
>
> Add a configuration for users to specify a template for building a custom 
> principal name.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
