[ https://issues.apache.org/jira/browse/KAFKA-6750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501683#comment-16501683 ]
ASF GitHub Bot commented on KAFKA-6750: --------------------------------------- ijuma closed pull request #4829: KAFKA-6750: Add listener name to authentication context (KIP-282) URL: https://github.com/apache/kafka/pull/4829 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 078d844e85f..1a643963fe4 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -131,7 +131,7 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, tokenCache); break; case PLAINTEXT: - channelBuilder = new PlaintextChannelBuilder(); + channelBuilder = new PlaintextChannelBuilder(listenerName); break; default: throw new IllegalArgumentException("Unexpected securityProtocol " + securityProtocol); diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index c0d1059aea9..c7b80cbac63 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -33,8 +33,17 @@ public class PlaintextChannelBuilder implements ChannelBuilder { private static final Logger log = LoggerFactory.getLogger(PlaintextChannelBuilder.class); + private final ListenerName listenerName; private Map<String, ?> configs; + /** + * Constructs a plaintext channel builder. ListenerName is non-null whenever + * it's instantiated in the broker and null otherwise. 
+ */ + public PlaintextChannelBuilder(ListenerName listenerName) { + this.listenerName = listenerName; + } + public void configure(Map<String, ?> configs) throws KafkaException { this.configs = configs; } @@ -43,7 +52,7 @@ public void configure(Map<String, ?> configs) throws KafkaException { public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { try { PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key); - PlaintextAuthenticator authenticator = new PlaintextAuthenticator(configs, transportLayer); + PlaintextAuthenticator authenticator = new PlaintextAuthenticator(configs, transportLayer, listenerName); return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE); } catch (Exception e) { @@ -58,10 +67,12 @@ public void close() {} private static class PlaintextAuthenticator implements Authenticator { private final PlaintextTransportLayer transportLayer; private final KafkaPrincipalBuilder principalBuilder; + private final ListenerName listenerName; - private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer) { + private PlaintextAuthenticator(Map<String, ?> configs, PlaintextTransportLayer transportLayer, ListenerName listenerName) { this.transportLayer = transportLayer; this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null); + this.listenerName = listenerName; } @Override @@ -70,7 +81,10 @@ public void authenticate() throws IOException {} @Override public KafkaPrincipal principal() { InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress(); - return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress)); + // listenerName should only be null in Client mode where principal() should not be called + if (listenerName == null) + throw new IllegalStateException("Unexpected call to 
principal() when listenerName is null"); + return principalBuilder.build(new PlaintextAuthenticationContext(clientAddress, listenerName.value())); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 941c455365d..b43e4c3a472 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -89,7 +89,7 @@ public ListenerName listenerName() { public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { try { SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key)); - Authenticator authenticator = new SslAuthenticator(configs, transportLayer); + Authenticator authenticator = new SslAuthenticator(configs, transportLayer, listenerName); return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? 
memoryPool : MemoryPool.NONE); } catch (Exception e) { @@ -152,10 +152,12 @@ private String peerHost(SelectionKey key) { private static class SslAuthenticator implements Authenticator { private final SslTransportLayer transportLayer; private final KafkaPrincipalBuilder principalBuilder; + private final ListenerName listenerName; - private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer) { + private SslAuthenticator(Map<String, ?> configs, SslTransportLayer transportLayer, ListenerName listenerName) { this.transportLayer = transportLayer; this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs, transportLayer, this, null); + this.listenerName = listenerName; } /** * No-Op for plaintext authenticator @@ -170,7 +172,13 @@ public void authenticate() throws IOException {} @Override public KafkaPrincipal principal() { InetAddress clientAddress = transportLayer.socketChannel().socket().getInetAddress(); - SslAuthenticationContext context = new SslAuthenticationContext(transportLayer.sslSession(), clientAddress); + // listenerName should only be null in Client mode where principal() should not be called + if (listenerName == null) + throw new IllegalStateException("Unexpected call to principal() when listenerName is null"); + SslAuthenticationContext context = new SslAuthenticationContext( + transportLayer.sslSession(), + clientAddress, + listenerName.value()); return principalBuilder.build(context); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java index b8c08477027..a8abea85896 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/AuthenticationContext.java @@ -18,9 +18,11 @@ import java.net.InetAddress; + /** * An object representing contextual information from the authentication 
session. See - * {@link SaslAuthenticationContext} and {@link SslAuthenticationContext}. + * {@link PlaintextAuthenticationContext}, {@link SaslAuthenticationContext} + * and {@link SslAuthenticationContext}. This class is only used in the broker. */ public interface AuthenticationContext { /** @@ -32,4 +34,9 @@ * Address of the authenticated client */ InetAddress clientAddress(); + + /** + * Name of the listener used for the connection + */ + String listenerName(); } diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java index bc14d36aaf5..a111f21ec13 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PlaintextAuthenticationContext.java @@ -20,9 +20,11 @@ public class PlaintextAuthenticationContext implements AuthenticationContext { private final InetAddress clientAddress; + private final String listenerName; - public PlaintextAuthenticationContext(InetAddress clientAddress) { + public PlaintextAuthenticationContext(InetAddress clientAddress, String listenerName) { this.clientAddress = clientAddress; + this.listenerName = listenerName; } @Override @@ -35,4 +37,9 @@ public InetAddress clientAddress() { return clientAddress; } + @Override + public String listenerName() { + return listenerName; + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java index 89e606377d9..719d041770d 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/SaslAuthenticationContext.java @@ -17,17 +17,20 @@ package org.apache.kafka.common.security.auth; import 
javax.security.sasl.SaslServer; + import java.net.InetAddress; public class SaslAuthenticationContext implements AuthenticationContext { private final SaslServer server; private final SecurityProtocol securityProtocol; private final InetAddress clientAddress; + private final String listenerName; - public SaslAuthenticationContext(SaslServer server, SecurityProtocol securityProtocol, InetAddress clientAddress) { + public SaslAuthenticationContext(SaslServer server, SecurityProtocol securityProtocol, InetAddress clientAddress, String listenerName) { this.server = server; this.securityProtocol = securityProtocol; this.clientAddress = clientAddress; + this.listenerName = listenerName; } public SaslServer server() { @@ -43,4 +46,9 @@ public SecurityProtocol securityProtocol() { public InetAddress clientAddress() { return clientAddress; } + + @Override + public String listenerName() { + return listenerName; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java index d87a8920b47..88819f91cf8 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/SslAuthenticationContext.java @@ -22,10 +22,12 @@ public class SslAuthenticationContext implements AuthenticationContext { private final SSLSession session; private final InetAddress clientAddress; + private final String listenerName; - public SslAuthenticationContext(SSLSession session, InetAddress clientAddress) { + public SslAuthenticationContext(SSLSession session, InetAddress clientAddress, String listenerName) { this.session = session; this.clientAddress = clientAddress; + this.listenerName = listenerName; } public SSLSession session() { @@ -41,4 +43,9 @@ public SecurityProtocol securityProtocol() { public InetAddress clientAddress() { return clientAddress; } + + 
@Override + public String listenerName() { + return listenerName; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java index 5140afb196d..f2cc621a2b8 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java @@ -282,7 +282,7 @@ public void authenticate() throws IOException { @Override public KafkaPrincipal principal() { - SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress()); + SaslAuthenticationContext context = new SaslAuthenticationContext(saslServer, securityProtocol, clientAddress(), listenerName.value()); KafkaPrincipal principal = principalBuilder.build(context); if (ScramMechanism.isScram(saslMechanism) && Boolean.parseBoolean((String) saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) { principal.tokenAuthenticated(true); diff --git a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java index de210e7496c..c2b89fe6882 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder; import org.apache.kafka.common.security.auth.PlaintextAuthenticationContext; import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.easymock.EasyMock; import org.junit.Test; @@ -38,7 +39,6 @@ public class ChannelBuildersTest { @Test - @SuppressWarnings("deprecation") public void 
testCreateOldPrincipalBuilder() throws Exception { TransportLayer transportLayer = EasyMock.mock(TransportLayer.class); Authenticator authenticator = EasyMock.mock(Authenticator.class); @@ -51,7 +51,7 @@ public void testCreateOldPrincipalBuilder() throws Exception { assertTrue(OldPrincipalBuilder.configured); // test delegation - KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost())); + KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())); assertEquals(OldPrincipalBuilder.PRINCIPAL_NAME, principal.getName()); assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); } diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index f1c6a5af142..8ce8d5043be 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -80,7 +80,7 @@ public void setUp() throws Exception { this.server = new EchoServer(SecurityProtocol.PLAINTEXT, configs); this.server.start(); this.time = new MockTime(); - this.channelBuilder = new PlaintextChannelBuilder(); + this.channelBuilder = new PlaintextChannelBuilder(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)); this.channelBuilder.configure(configs); this.metrics = new Metrics(); this.selector = new Selector(5000, this.metrics, time, "MetricGroup", channelBuilder, new LogContext()); @@ -305,7 +305,7 @@ public void testMute() throws Exception { @Test public void registerFailure() throws Exception { - ChannelBuilder channelBuilder = new PlaintextChannelBuilder() { + ChannelBuilder channelBuilder = new PlaintextChannelBuilder(null) { @Override public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException { diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index d6b4bac48d8..de3da8e439c 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -429,8 +429,7 @@ public void testClientAuthenticationRequestedNotProvided() throws Exception { */ @Test public void testInvalidSecureRandomImplementation() throws Exception { - SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false); - try { + try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false)) { sslClientConfigs.put(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); fail("SSL channel configured with invalid SecureRandom implementation"); @@ -444,8 +443,7 @@ public void testInvalidSecureRandomImplementation() throws Exception { */ @Test public void testInvalidTruststorePassword() throws Exception { - SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false); - try { + try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false)) { sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); fail("SSL channel configured with invalid truststore password"); @@ -459,8 +457,7 @@ public void testInvalidTruststorePassword() throws Exception { */ @Test public void testInvalidKeystorePassword() throws Exception { - SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false); - try { + try (SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false)) { sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid"); channelBuilder.configure(sslClientConfigs); fail("SSL channel configured with invalid keystore 
password"); @@ -767,7 +764,7 @@ public void testCloseSsl() throws Exception { @Test public void testClosePlaintext() throws Exception { - testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder()); + testClose(SecurityProtocol.PLAINTEXT, new PlaintextChannelBuilder(null)); } private void testClose(SecurityProtocol securityProtocol, ChannelBuilder clientChannelBuilder) throws Exception { diff --git a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java index fdf368788c0..8b2e8b26296 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/auth/DefaultKafkaPrincipalBuilderTest.java @@ -53,19 +53,21 @@ public void testUseOldPrincipalBuilderForPlaintextIfProvided() throws Exception DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer, oldPrincipalBuilder, null); - KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost())); + KafkaPrincipal principal = builder.build(new PlaintextAuthenticationContext( + InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())); assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); assertEquals("foo", principal.getName()); builder.close(); - verifyAll(); } @Test public void testReturnAnonymousPrincipalForPlaintext() throws Exception { DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); - assertEquals(KafkaPrincipal.ANONYMOUS, builder.build(new PlaintextAuthenticationContext(InetAddress.getLocalHost()))); + assertEquals(KafkaPrincipal.ANONYMOUS, builder.build( + new PlaintextAuthenticationContext(InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name()))); + builder.close(); } @Test @@ -86,12 +88,12 @@ 
public void testUseOldPrincipalBuilderForSslIfProvided() throws Exception { DefaultKafkaPrincipalBuilder builder = DefaultKafkaPrincipalBuilder.fromOldPrincipalBuilder(authenticator, transportLayer, oldPrincipalBuilder, null); - KafkaPrincipal principal = builder.build(new SslAuthenticationContext(session, InetAddress.getLocalHost())); + KafkaPrincipal principal = builder.build( + new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())); assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); assertEquals("foo", principal.getName()); builder.close(); - verifyAll(); } @@ -105,10 +107,12 @@ public void testUseSessionPeerPrincipalForSsl() throws Exception { DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); - KafkaPrincipal principal = builder.build(new SslAuthenticationContext(session, InetAddress.getLocalHost())); + KafkaPrincipal principal = builder.build( + new SslAuthenticationContext(session, InetAddress.getLocalHost(), SecurityProtocol.PLAINTEXT.name())); assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); assertEquals("foo", principal.getName()); + builder.close(); verifyAll(); } @@ -124,10 +128,11 @@ public void testPrincipalBuilderScram() throws Exception { DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(null); KafkaPrincipal principal = builder.build(new SaslAuthenticationContext(server, - SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost())); + SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name())); assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); assertEquals("foo", principal.getName()); + builder.close(); verifyAll(); } @@ -146,10 +151,11 @@ public void testPrincipalBuilderGssapi() throws Exception { DefaultKafkaPrincipalBuilder builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer); KafkaPrincipal principal = builder.build(new 
SaslAuthenticationContext(server, - SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost())); + SecurityProtocol.SASL_PLAINTEXT, InetAddress.getLocalHost(), SecurityProtocol.SASL_PLAINTEXT.name())); assertEquals(KafkaPrincipal.USER_TYPE, principal.getPrincipalType()); assertEquals("foo", principal.getName()); + builder.close(); verifyAll(); } diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 4500d49e337..35f45cd8236 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -331,7 +331,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } - private def sendRecords(numRecords: Int, tp: TopicPartition) { + protected final def sendRecords(numRecords: Int, tp: TopicPartition) { val futures = (0 until numRecords).map { i => val record = new ProducerRecord(tp.topic(), tp.partition(), s"$i".getBytes, s"$i".getBytes) debug(s"Sending this record: $record") @@ -344,7 +344,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } } - protected def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], + protected final def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int = 1, startingOffset: Int = 0, topic: String = topic, diff --git a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala index 627934035c2..0b2fbcaf459 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextEndToEndAuthorizationTest.scala @@ -19,13 +19,20 @@ package kafka.api import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import 
org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth._ -import org.junit.Before +import org.junit.{Before, Test} +import org.junit.Assert._ +import org.apache.kafka.common.errors.TopicAuthorizationException // This test case uses a separate listener for client and inter-broker communication, from // which we derive corresponding principals object PlaintextEndToEndAuthorizationTest { + @volatile + private var clientListenerName = None: Option[String] + @volatile + private var serverListenerName = None: Option[String] class TestClientPrincipalBuilder extends KafkaPrincipalBuilder { override def build(context: AuthenticationContext): KafkaPrincipal = { + clientListenerName = Some(context.listenerName) context match { case ctx: PlaintextAuthenticationContext if ctx.clientAddress != null => new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client") @@ -37,6 +44,7 @@ object PlaintextEndToEndAuthorizationTest { class TestServerPrincipalBuilder extends KafkaPrincipalBuilder { override def build(context: AuthenticationContext): KafkaPrincipal = { + serverListenerName = Some(context.listenerName) context match { case ctx: PlaintextAuthenticationContext => new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "server") @@ -67,4 +75,13 @@ class PlaintextEndToEndAuthorizationTest extends EndToEndAuthorizationTest { super.setUp() } + @Test + def testListenerName() { + // To check the client listener name, establish a session on the server by sending any request eg sendRecords + intercept[TopicAuthorizationException](sendRecords(1, tp)) + + assertEquals(Some("CLIENT"), PlaintextEndToEndAuthorizationTest.clientListenerName) + assertEquals(Some("SERVER"), PlaintextEndToEndAuthorizationTest.serverListenerName) + } + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add listener name to AuthenticationContext > ------------------------------------------ > > Key: KAFKA-6750 > URL: https://issues.apache.org/jira/browse/KAFKA-6750 > Project: Kafka > Issue Type: Improvement > Components: security > Reporter: Mickael Maison > Assignee: Mickael Maison > Priority: Major > Fix For: 2.0.0 > > > It would be nice to have the ListenerName available in AuthenticationContexts. > Since we can have multiple listeners using the same security protocol, this > would make it easy to identify the origin of a connection in custom > PrincipalBuilders rather than requiring them to parse IP addresses. > [KIP-282: Add the listener name to the authentication > context|https://cwiki.apache.org/confluence/display/KAFKA/KIP-282%3A+Add+the+listener+name+to+the+authentication+context] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)