[jira] [Resolved] (KAFKA-7478) Reduce OAuthBearerLoginModule verbosity
[ https://issues.apache.org/jira/browse/KAFKA-7478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7478.
-----------------------------------
Resolution: Fixed
Fix Version/s: 2.2.0

> Reduce OAuthBearerLoginModule verbosity
> ---------------------------------------
>
> Key: KAFKA-7478
> URL: https://issues.apache.org/jira/browse/KAFKA-7478
> Project: Kafka
> Issue Type: Improvement
> Reporter: Stanislav Kozlovski
> Assignee: Stanislav Kozlovski
> Priority: Minor
> Fix For: 2.2.0
>
> The OAuthBearerLoginModule is pretty verbose by default and fills logs with too much information. It would be nice to reduce the verbosity by default and let the user opt in to these debug-friendly messages:
> {code:java}
> [INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule login - Login succeeded; invoke commit() to commit it; current committed token count=0
> [INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule commit - Committing my token; current committed token count = 0
> [INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule commit - Done committing my token; committed token count is now 1
> [INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin login - Successfully logged in.
> {code}
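The requested change boils down to a logging-level adjustment. A minimal, hypothetical sketch of the pattern (the class and message here are illustrative, not the actual patch): emit these messages at DEBUG so they appear only when a user opts in via their logging configuration.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoginLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(LoginLoggingSketch.class);

    void onLoginSucceeded(int committedTokenCount) {
        // DEBUG instead of INFO: hidden by default, visible when the user
        // raises this logger to DEBUG in log4j.properties or similar.
        log.debug("Login succeeded; invoke commit() to commit it; current committed token count={}",
                committedTokenCount);
    }
}
{code}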
[jira] [Resolved] (KAFKA-6863) Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails
[ https://issues.apache.org/jira/browse/KAFKA-6863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-6863.
-----------------------------------
Resolution: Fixed
Reviewer: Rajini Sivaram

> Kafka clients should try to use multiple DNS resolved IP addresses if the first one fails
> ------------------------------------------------------------------------------------------
>
> Key: KAFKA-6863
> URL: https://issues.apache.org/jira/browse/KAFKA-6863
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 1.0.0, 1.1.0
> Reporter: Edoardo Comar
> Assignee: Edoardo Comar
> Priority: Major
> Fix For: 2.1.0
>
> Currently Kafka clients resolve a symbolic hostname using
> {{new InetSocketAddress(String hostname, int port)}}
> which picks only one IP address even if the DNS has multiple records for the hostname, as it calls
> {{InetAddress.getAllByName(host)[0]}}
> For environments where hostnames are mapped by the DNS to multiple IPs, e.g. in clouds where the IPs point to external load balancers, it would be preferable for the client, on failing to connect to one of the IPs, to try the others before giving up the connection.
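A minimal sketch of the requested behavior using only JDK APIs: resolve every address for the host up front so connect logic can fall back to the next address when one fails. This is a sketch of the idea, not the actual Kafka change.

{code:java}
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class DnsResolutionSketch {
    // Resolve every A/AAAA record for the host instead of only the first,
    // so a caller can try the next address when a connect attempt fails.
    static List<InetSocketAddress> resolveAll(String host, int port) throws UnknownHostException {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (InetAddress address : InetAddress.getAllByName(host))
            addresses.add(new InetSocketAddress(address, port));
        return addresses;
    }
}
{code}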
[jira] [Created] (KAFKA-7498) common.requests.CreatePartitionsRequest uses clients.admin.NewPartitions
Rajini Sivaram created KAFKA-7498:
-------------------------------------
Summary: common.requests.CreatePartitionsRequest uses clients.admin.NewPartitions
Key: KAFKA-7498
URL: https://issues.apache.org/jira/browse/KAFKA-7498
Project: Kafka
Issue Type: Bug
Components: clients
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.1.0

`org.apache.kafka.common.requests.CreatePartitionsRequest` currently uses `org.apache.kafka.clients.admin.NewPartitions`. We shouldn't have references from `common` to `clients`. Since `org.apache.kafka.clients.admin` is a public package, we cannot use a common class for both the admin API and requests. So we should do something similar to CreateTopicsRequest, for which we have the `org.apache.kafka.clients.admin.NewTopic` class used for the admin API and an equivalent `org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails` class that doesn't refer to `clients.admin`.
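A hedged sketch of the proposed split (the class name `PartitionDetails` is hypothetical here): a plain data holder inside `common.requests` mirroring the public `clients.admin.NewPartitions` class, so the request class no longer imports anything from the clients package.

{code:java}
package org.apache.kafka.common.requests;

import java.util.List;

// Hypothetical mirror class: carries the same data as clients.admin.NewPartitions,
// but lives in common.requests so `common` never depends on `clients`.
public final class PartitionDetails {
    private final int totalCount;
    private final List<List<Integer>> newAssignments;

    public PartitionDetails(int totalCount, List<List<Integer>> newAssignments) {
        this.totalCount = totalCount;
        this.newAssignments = newAssignments;
    }

    public int totalCount() { return totalCount; }
    public List<List<Integer>> newAssignments() { return newAssignments; }
}
{code}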
[jira] [Resolved] (KAFKA-7485) Flaky test `DynamicBrokerReconfigurationTest.testTrustStoreAlter`
[ https://issues.apache.org/jira/browse/KAFKA-7485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7485.
-----------------------------------
Resolution: Fixed
Reviewer: Jason Gustafson
Fix Version/s: 2.1.0

> Flaky test `DynamicBrokerReconfigurationTest.testTrustStoreAlter`
> ------------------------------------------------------------------
>
> Key: KAFKA-7485
> URL: https://issues.apache.org/jira/browse/KAFKA-7485
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.1.0
>
> {code}
> 09:53:53 kafka.server.DynamicBrokerReconfigurationTest > testTrustStoreAlter FAILED
> 09:53:53 org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
> 09:53:53 Caused by:
> 09:53:53 javax.net.ssl.SSLProtocolException: Handshake message sequence violation, 2
> 09:53:53 at java.base/sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1611)
> 09:53:53 at java.base/sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:497)
> 09:53:53 at java.base/sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:745)
> 09:53:53 at java.base/sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:680)
> 09:53:53 at java.base/javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:626)
> 09:53:53 at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:474)
> 09:53:53 at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:274)
> 09:53:53 at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:126)
> 09:53:53 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:532)
> 09:53:53 at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
> 09:53:53 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
> 09:53:53 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
> 09:53:53 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> 09:53:53 at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
> 09:53:53 at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231)
> 09:53:53 at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:316)
> 09:53:53 at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1210)
> 09:53:53 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1179)
> 09:53:53 at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1119)
> 09:53:53 at kafka.server.DynamicBrokerReconfigurationTest.kafka$server$DynamicBrokerReconfigurationTest$$awaitInitialPositions(DynamicBrokerReconfigurationTest.scala:997)
> 09:53:53 at kafka.server.DynamicBrokerReconfigurationTest$ConsumerBuilder.build(DynamicBrokerReconfigurationTest.scala:1424)
> 09:53:53 at kafka.server.DynamicBrokerReconfigurationTest.verifySslProduceConsume$1(DynamicBrokerReconfigurationTest.scala:286)
> 09:53:53 at kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(DynamicBrokerReconfigurationTest.scala:311)
> 09:53:53 Caused by:
> 09:53:53 javax.net.ssl.SSLProtocolException: Handshake message sequence violation, 2
> 09:53:53 at java.base/sun.security.ssl.HandshakeStateManager.check(HandshakeStateManager.java:398)
> 09:53:53 at java.base/sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:215)
> 09:53:53 at java.base/sun.security.ssl.Handshaker.processLoop(Handshaker.java:1098)
> 09:53:53 at java.base/sun.security.ssl.Handshaker$1.run(Handshaker.java:1031)
> 09:53:53 at java.base/sun.security.ssl.Handshaker$1.run(Handshaker.java:1028)
> 09:53:53 at java.base/java.security.AccessController.doPrivileged(Native Method)
> 09:53:53 at java.base/sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1540)
> 09:53:53 at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:399)
> 09:53:53 at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTra
> {code}
[jira] [Resolved] (KAFKA-7498) common.requests.CreatePartitionsRequest uses clients.admin.NewPartitions
[ https://issues.apache.org/jira/browse/KAFKA-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7498.
-----------------------------------
Resolution: Fixed
Reviewer: Ismael Juma

> common.requests.CreatePartitionsRequest uses clients.admin.NewPartitions
> -------------------------------------------------------------------------
>
> Key: KAFKA-7498
> URL: https://issues.apache.org/jira/browse/KAFKA-7498
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.1.0
>
> `org.apache.kafka.common.requests.CreatePartitionsRequest` currently uses `org.apache.kafka.clients.admin.NewPartitions`. We shouldn't have references from `common` to `clients`. Since `org.apache.kafka.clients.admin` is a public package, we cannot use a common class for both the admin API and requests. So we should do something similar to CreateTopicsRequest, for which we have the `org.apache.kafka.clients.admin.NewTopic` class used for the admin API and an equivalent `org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails` class that doesn't refer to `clients.admin`.
[jira] [Created] (KAFKA-7513) Flaky test SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain
Rajini Sivaram created KAFKA-7513:
-------------------------------------
Summary: Flaky test SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain
Key: KAFKA-7513
URL: https://issues.apache.org/jira/browse/KAFKA-7513
Project: Kafka
Issue Type: Bug
Components: security
Affects Versions: 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.1.0

Have seen this test fail quite a few times in PR builds (e.g. https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/123):

{code}
java.lang.AssertionError: expected: but was:
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:834)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:144)
    at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:114)
    at org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.createAndCheckClientConnectionFailure(SaslAuthenticatorFailureDelayTest.java:223)
    at org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.createAndCheckClientAuthenticationFailure(SaslAuthenticatorFailureDelayTest.java:212)
    at org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain(SaslAuthenticatorFailureDelayTest.java:115)
{code}
[jira] [Resolved] (KAFKA-7496) KafkaAdminClient#describeAcls should handle invalid filters gracefully
[ https://issues.apache.org/jira/browse/KAFKA-7496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7496.
-----------------------------------
Resolution: Fixed
Reviewer: Rajini Sivaram
Fix Version/s: 2.1.0

> KafkaAdminClient#describeAcls should handle invalid filters gracefully
> ----------------------------------------------------------------------
>
> Key: KAFKA-7496
> URL: https://issues.apache.org/jira/browse/KAFKA-7496
> Project: Kafka
> Issue Type: Bug
> Components: admin
> Reporter: Colin P. McCabe
> Assignee: Colin P. McCabe
> Priority: Minor
> Fix For: 2.1.0
>
> KafkaAdminClient#describeAcls should handle invalid filters gracefully. Specifically, it should return a future which yields an exception.
> The following code results in an uncaught IllegalArgumentException in the admin client thread, resulting in a zombie admin client.
> {code}
> AclBindingFilter aclFilter = new AclBindingFilter(
>     new ResourcePatternFilter(ResourceType.UNKNOWN, null, PatternType.ANY),
>     AccessControlEntryFilter.ANY
> );
> kafkaAdminClient.describeAcls(aclFilter).values().get();
> {code}
> See the resulting stacktrace below
> {code}
> ERROR [kafka-admin-client-thread | adminclient-3] Uncaught exception in thread 'kafka-admin-client-thread | adminclient-3': (org.apache.kafka.common.utils.KafkaThread)
> java.lang.IllegalArgumentException: Filter contain UNKNOWN elements
> at org.apache.kafka.common.requests.DescribeAclsRequest.validate(DescribeAclsRequest.java:140)
> at org.apache.kafka.common.requests.DescribeAclsRequest.<init>(DescribeAclsRequest.java:92)
> at org.apache.kafka.common.requests.DescribeAclsRequest$Builder.build(DescribeAclsRequest.java:77)
> at org.apache.kafka.common.requests.DescribeAclsRequest$Builder.build(DescribeAclsRequest.java:67)
> at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:450)
> at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:411)
> at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:910)
> at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1107)
> at java.base/java.lang.Thread.run(Thread.java:844)
> {code}
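A hedged sketch of the graceful-handling pattern the issue asks for, using the JDK `CompletableFuture` for illustration (the real API returns a `KafkaFuture`, and the validation call here is a stand-in): validate the filter before handing the call to the network thread, and surface the error through the returned future.

{code:java}
import java.util.concurrent.CompletableFuture;

public class DescribeAclsSketch {
    // Hypothetical sketch: fail the returned future instead of throwing an
    // uncaught exception on the admin client's background network thread.
    static CompletableFuture<Void> describeAcls(boolean filterHasUnknownElements) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (filterHasUnknownElements) {
            // The caller sees this when it calls get(); the network thread survives.
            result.completeExceptionally(
                    new IllegalArgumentException("Filter contain UNKNOWN elements"));
            return result;
        }
        // ... otherwise enqueue the DescribeAcls request as usual ...
        result.complete(null);
        return result;
    }
}
{code}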
[jira] [Resolved] (KAFKA-7513) Flaky test SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain
[ https://issues.apache.org/jira/browse/KAFKA-7513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7513.
-----------------------------------
Resolution: Fixed
Reviewer: Ismael Juma

> Flaky test SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain
> --------------------------------------------------------------------------
>
> Key: KAFKA-7513
> URL: https://issues.apache.org/jira/browse/KAFKA-7513
> Project: Kafka
> Issue Type: Bug
> Components: security
> Affects Versions: 2.1.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.1.0
>
> Have seen this test fail quite a few times in PR builds (e.g. https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/123):
> {code}
> java.lang.AssertionError: expected: but was:
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:118)
> at org.junit.Assert.assertEquals(Assert.java:144)
> at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:114)
> at org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.createAndCheckClientConnectionFailure(SaslAuthenticatorFailureDelayTest.java:223)
> at org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.createAndCheckClientAuthenticationFailure(SaslAuthenticatorFailureDelayTest.java:212)
> at org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureDelayTest.testInvalidPasswordSaslPlain(SaslAuthenticatorFailureDelayTest.java:115)
> {code}
[jira] [Resolved] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride
[ https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7505.
-----------------------------------
Resolution: Fixed
Reviewer: Ismael Juma
Fix Version/s: 2.1.0

> Flaky test: SslTransportLayerTest.testListenerConfigOverride
> -------------------------------------------------------------
>
> Key: KAFKA-7505
> URL: https://issues.apache.org/jira/browse/KAFKA-7505
> Project: Kafka
> Issue Type: Bug
> Reporter: Ismael Juma
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.1.0
>
> This test seems to fail quite a bit recently. I've seen it happen with Java 11 quite a bit, so it could be more likely to fail there.
> {code:java}
> java.lang.AssertionError: expected: but was:
> at org.junit.Assert.fail(Assert.java:88)
> at org.junit.Assert.failNotEquals(Assert.java:834)
> at org.junit.Assert.assertEquals(Assert.java:118)
> at org.junit.Assert.assertEquals(Assert.java:144)
> at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104)
> at org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109)
> at org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319)
> {code}
[jira] [Created] (KAFKA-7538) Improve locking model used to update ISRs and HW
Rajini Sivaram created KAFKA-7538:
-------------------------------------
Summary: Improve locking model used to update ISRs and HW
Key: KAFKA-7538
URL: https://issues.apache.org/jira/browse/KAFKA-7538
Project: Kafka
Issue Type: Improvement
Components: core
Affects Versions: 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.2.0

We currently use a ReadWriteLock in Partition to update ISRs and the high watermark for the partition. This can result in severe lock contention if there are multiple producers writing a large amount of data into a single partition.

The current locking model is:
# read lock while appending to the log for every Produce request, on the request handler thread
# write lock on leader change, ISR updates etc., on the request handler or scheduler thread
# write lock on every replica fetch request to check if ISRs need to be updated and to update the HW and ISR, on the request handler thread

2) is infrequent, but 1) and 3) may be frequent and can result in lock contention. If there are lots of produce requests to a partition from multiple processes, on the leader broker we may see:
# one slow log append locking up a request thread for that produce while holding the read lock
# (replicationFactor - 1) request threads blocked waiting for the write lock to process replica fetch requests
# potentially several other request threads processing Produce queued up to acquire the read lock because of the waiting writers

In a thread dump with this issue, we noticed several request threads blocked waiting for the write lock, possibly due to replication fetch retries.

Possible fixes:
# Process `Partition#maybeExpandIsr` on a single scheduler thread, similar to `Partition#maybeShrinkIsr`, so that only a single thread is blocked on the write lock. But this will delay updating ISRs and the HW.
# Change locking in `Partition#maybeExpandIsr` so that the read lock is acquired only to check whether the ISR needs updating, and the write lock is acquired only to update ISRs. Also use a different lock for updating the HW (perhaps just the Partition object lock) so that typical replica fetch requests complete without acquiring the Partition write lock on the request handler thread. A sketch of this option follows below.

I will submit a PR for 2), but other suggestions to fix this are welcome.
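A hedged sketch of option 2), with a hypothetical `isrNeedsExpansion` flag standing in for the real ISR check: the read lock covers the frequent check, the write lock is taken (and the condition re-checked) only when an update is actually needed, and HW updates use a separate monitor. Note that `ReentrantReadWriteLock` cannot upgrade read to write, so the read lock is released before the write lock is taken.

{code:java}
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class IsrExpandSketch {
    private final ReentrantReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();
    private final Object hwLock = new Object(); // separate, cheaper lock for HW updates
    private volatile boolean isrNeedsExpansion; // hypothetical stand-in for the real check

    void maybeExpandIsr() {
        leaderIsrUpdateLock.readLock().lock();
        boolean needsUpdate;
        try {
            needsUpdate = isrNeedsExpansion; // cheap check under the read lock
        } finally {
            leaderIsrUpdateLock.readLock().unlock();
        }
        if (needsUpdate) {
            leaderIsrUpdateLock.writeLock().lock();
            try {
                if (isrNeedsExpansion) { // re-check: state may have changed in between
                    // ... update the ISR ...
                    isrNeedsExpansion = false;
                }
            } finally {
                leaderIsrUpdateLock.writeLock().unlock();
            }
        }
        synchronized (hwLock) {
            // ... update the high watermark without the partition write lock ...
        }
    }
}
{code}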
[jira] [Resolved] (KAFKA-7352) KIP-368: Allow SASL Connections to Periodically Re-Authenticate
[ https://issues.apache.org/jira/browse/KAFKA-7352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7352.
-----------------------------------
Resolution: Fixed
Reviewer: Rajini Sivaram

> KIP-368: Allow SASL Connections to Periodically Re-Authenticate
> ----------------------------------------------------------------
>
> Key: KAFKA-7352
> URL: https://issues.apache.org/jira/browse/KAFKA-7352
> Project: Kafka
> Issue Type: Improvement
> Components: clients, core
> Reporter: Ron Dagostino
> Assignee: Ron Dagostino
> Priority: Major
> Labels: kip
> Fix For: 2.2.0
>
> KIP-368: Allow SASL Connections to Periodically Re-Authenticate
> The adoption of KIP-255: OAuth Authentication via SASL/OAUTHBEARER in release 2.0.0 creates the possibility of using information in the bearer token to make authorization decisions. Unfortunately, however, Kafka connections are long-lived, so there is no ability to change the bearer token associated with a particular connection. Allowing SASL connections to periodically re-authenticate would resolve this.
> In addition to this motivation there are two others that are security-related. First, to eliminate access to Kafka the current requirement is to remove all authorizations (i.e. remove all ACLs). This is necessary because of the long-lived nature of the connections. It is operationally simpler to shut off access at the point of authentication, and with the release of KIP-86: Configurable SASL Callback Handlers it is going to become more and more likely that installations will authenticate users against external directories (e.g. via LDAP). The ability to stop Kafka access by simply disabling an account in an LDAP directory (for example) is desirable.
> The second motivating factor for re-authentication related to security is that the use of short-lived tokens is a common OAuth security recommendation, but issuing a short-lived token to a Kafka client (or a broker when OAUTHBEARER is the inter-broker protocol) currently has no benefit, because once a client is connected to a broker the client is never challenged again and the connection may remain intact beyond the token expiration time (and may remain intact indefinitely under perfect circumstances).
> This KIP proposes adding the ability for clients (and brokers when OAUTHBEARER is the inter-broker protocol) to re-authenticate their connections to brokers and have the new bearer token appear on their session rather than the old one.
[jira] [Created] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start fetchers
Rajini Sivaram created KAFKA-7576:
-------------------------------------
Summary: Dynamic update of replica fetcher threads may fail to start fetchers
Key: KAFKA-7576
URL: https://issues.apache.org/jira/browse/KAFKA-7576
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 2.0.0, 1.1.1, 1.0.2, 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram

When a config update notification of `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down first before new ones are created. This migration is performed on the thread processing the ZK change notification. The shutdown of the Selector of existing fetchers is not safe, since the replica fetcher thread may be processing data at the time using the same Selector. After the failure, another update of the config or a broker restart is required to restart the replica fetchers.

Exception stack trace:
{code:java}
java.lang.IllegalArgumentException
    at java.nio.Buffer.position(Buffer.java:244)
    at sun.nio.ch.IOUtil.write(IOUtil.java:68)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
    at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
    at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
    at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
    at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
    at org.apache.kafka.common.network.Selector.close(Selector.java:736)
    at org.apache.kafka.common.network.Selector.close(Selector.java:698)
    at org.apache.kafka.common.network.Selector.close(Selector.java:314)
    at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
    at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
    at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
    at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
    at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
    at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
    at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
    at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
    at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
    at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
    at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
    at kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
{code}
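The underlying hazard is closing a Selector from a thread that doesn't own it. A generic sketch of the safe shutdown pattern (names hypothetical; this is the standard java.nio idiom, not the actual Kafka patch): the initiating thread only signals shutdown and wakes the selector, while the owning thread closes it.

{code:java}
import java.io.IOException;
import java.nio.channels.Selector;

public class FetcherShutdownSketch extends Thread {
    private final Selector selector;
    private volatile boolean running = true;

    FetcherShutdownSketch(Selector selector) { this.selector = selector; }

    // Called from the config-update thread: do NOT close the selector here,
    // because the fetcher thread may be mid-poll on the same selector.
    void initiateShutdown() {
        running = false;
        selector.wakeup(); // safe to call from any thread
    }

    @Override
    public void run() {
        try {
            while (running)
                selector.select(500); // ... process ready keys ...
        } catch (IOException e) {
            // ... log and fall through to close ...
        } finally {
            try {
                selector.close(); // closed only by the thread that uses it
            } catch (IOException ignored) { }
        }
    }
}
{code}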
[jira] [Resolved] (KAFKA-7576) Dynamic update of replica fetcher threads may fail to start/close fetchers
[ https://issues.apache.org/jira/browse/KAFKA-7576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7576.
-----------------------------------
Resolution: Fixed
Reviewer: Jason Gustafson

> Dynamic update of replica fetcher threads may fail to start/close fetchers
> ---------------------------------------------------------------------------
>
> Key: KAFKA-7576
> URL: https://issues.apache.org/jira/browse/KAFKA-7576
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 1.1.1, 2.0.1, 2.1.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 1.1.2, 2.1.1, 2.0.2
>
> KAFKA-6051 moved ReplicaFetcherBlockingSend shutdown earlier in the shutdown sequence of ReplicaFetcherThread. As a result, shutdown of replica fetchers can now throw an exception, because the Selector may be closed on one thread while data is being written on another thread. KAFKA-7464 changed this behaviour for 2.0.1 and 2.1.0: the exception during close() is now logged and not propagated, to avoid exceptions during broker shutdown.
> When a config update notification of `num.replica.fetchers` is processed, partitions are migrated as necessary to increase or decrease the number of fetcher threads. Existing fetchers are shut down first before new ones are created. This migration is performed on the thread processing the ZK change notification. The shutdown of the Selector of existing fetchers is not safe, since the replica fetcher thread may be processing data at the time using the same Selector.
> Without the fix from KAFKA-7464, another update of the config or a broker restart is required to restart the replica fetchers after a dynamic config update if shutdown encounters an exception.
> Exception stack trace:
> {code:java}
> java.lang.IllegalArgumentException
> at java.nio.Buffer.position(Buffer.java:244)
> at sun.nio.ch.IOUtil.write(IOUtil.java:68)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:210)
> at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:160)
> at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:718)
> at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:70)
> at org.apache.kafka.common.network.Selector.doClose(Selector.java:748)
> at org.apache.kafka.common.network.Selector.close(Selector.java:736)
> at org.apache.kafka.common.network.Selector.close(Selector.java:698)
> at org.apache.kafka.common.network.Selector.close(Selector.java:314)
> at org.apache.kafka.clients.NetworkClient.close(NetworkClient.java:533)
> at kafka.server.ReplicaFetcherBlockingSend.close(ReplicaFetcherBlockingSend.scala:107)
> at kafka.server.ReplicaFetcherThread.initiateShutdown(ReplicaFetcherThread.scala:90)
> at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:86)
> at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:76)
> at kafka.server.AbstractFetcherManager$$anonfun$migratePartitions$1$1.apply(AbstractFetcherManager.scala:72)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
> at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
> at kafka.server.AbstractFetcherManager.migratePartitions$1(AbstractFetcherManager.scala:72)
> at kafka.server.AbstractFetcherManager.resizeThreadPool(AbstractFetcherManager.scala:88)
> at kafka.server.DynamicThreadPool.reconfigure(DynamicBrokerConfig.scala:574)
> at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
> at kafka.server.DynamicBrokerConfig$$anonfun$kafka$server$DynamicBrokerConfig$$updateCurrentConfig$1.apply(DynamicBrokerConfig.scala:410)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at kafka.server.DynamicBrokerConfig.kafka$server$DynamicBrokerConfig$$updateCurrentConfig(DynamicBrokerConfig.scala:410)
> at kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread.doWork(ZkNodeChangeNotificationListener.scala:135)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> The fix from KAFKA-7464 in 2.0.1 and 2.1.0 avoids the issue with creatio
[jira] [Resolved] (KAFKA-7565) NPE in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-7565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7565.
-----------------------------------
Resolution: Duplicate
Fix Version/s: (was: 2.2.0)

[~avakhrenev] Thanks for testing, closing this issue.

> NPE in KafkaConsumer
> --------------------
>
> Key: KAFKA-7565
> URL: https://issues.apache.org/jira/browse/KAFKA-7565
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 1.1.1
> Reporter: Alexey Vakhrenev
> Priority: Critical
>
> The stacktrace is
> {noformat}
> java.lang.NullPointerException
> at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:221)
> at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:202)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:563)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:390)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:244)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1171)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
> {noformat}
> Couldn't find a minimal reproducer, but it happens quite often in our system. We use the {{pause()}} and {{wakeup()}} methods quite extensively; maybe it is somehow related.
[jira] [Created] (KAFKA-7639) Read one request at a time from socket to reduce broker memory usage
Rajini Sivaram created KAFKA-7639:
-------------------------------------
Summary: Read one request at a time from socket to reduce broker memory usage
Key: KAFKA-7639
URL: https://issues.apache.org/jira/browse/KAFKA-7639
Project: Kafka
Issue Type: Improvement
Components: network
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.2.0

The broker's Selector currently reads all requests available on the socket when the socket is ready for read. These are queued up as staged receives. We mute the channel and stop reading any more data until all the staged requests are processed. This behaviour is slightly inconsistent: for the initial read we drain the socket buffer, allowing it to get filled up again, but if data arrives slightly after the initial read, we don't read from the socket buffer until pending requests are processed.

To avoid holding onto requests for longer than required, we should read one request at a time even if more data is available in the socket buffer. This is especially useful for produce requests, which may be large and may take long to process. Note that with the default socket read buffer size of 100K, this is not a critical issue. But with larger socket buffers, this could result in excessive memory usage if a lot of produce requests are buffered in the broker and the producer times out, reconnects and sends more data before the broker has cleared older requests. By reading one at a time, we reduce the amount of time the broker holds onto memory for each request.
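A simplified sketch of the proposed behavior, assuming a blocking channel for brevity (Kafka's real Selector is non-blocking and reads incrementally): read exactly one size-delimited request and return, leaving any further bytes in the socket buffer until the caller has processed this one.

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class SingleRequestReadSketch {
    // Read at most one size-prefixed request per call, even if the socket
    // buffer holds several, so no staged receives pile up in broker memory.
    static ByteBuffer readOneRequest(SocketChannel channel) throws IOException {
        ByteBuffer sizeBuf = ByteBuffer.allocate(4);
        while (sizeBuf.hasRemaining())
            if (channel.read(sizeBuf) < 0) throw new IOException("connection closed");
        sizeBuf.flip();
        ByteBuffer payload = ByteBuffer.allocate(sizeBuf.getInt());
        while (payload.hasRemaining())
            if (channel.read(payload) < 0) throw new IOException("connection closed");
        payload.flip();
        return payload; // caller mutes the channel until this request is handled
    }
}
{code}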
[jira] [Resolved] (KAFKA-7607) NetworkClientUtils.sendAndReceive can take a long time to return during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-7607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7607.
-----------------------------------
Resolution: Fixed
Fix Version/s: 2.0.2
               2.1.1
               1.1.2

Fixed under KAFKA-7576.

> NetworkClientUtils.sendAndReceive can take a long time to return during shutdown
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-7607
> URL: https://issues.apache.org/jira/browse/KAFKA-7607
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.0.0
> Reporter: Bob Barrett
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 1.1.2, 2.1.1, 2.0.2
>
> If a RequestSendThread is shut down while waiting on the underlying Selector to return from a select() call, the Selector will swallow the interrupt, wake up and return immediately. NetworkClientUtils.sendAndReceive will then potentially re-poll the client, forcing the thread shutdown to wait for the request to complete. We should check the thread interrupt status before re-polling the client to prevent this delay.
[jira] [Resolved] (KAFKA-2609) SSL renegotiation code paths need more tests
[ https://issues.apache.org/jira/browse/KAFKA-2609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-2609.
-----------------------------------
Resolution: Won't Fix

Since renegotiation is currently disabled in Kafka and renegotiation is forbidden in TLS 1.3, closing this JIRA.

> SSL renegotiation code paths need more tests
> ---------------------------------------------
>
> Key: KAFKA-2609
> URL: https://issues.apache.org/jira/browse/KAFKA-2609
> Project: Kafka
> Issue Type: Test
> Affects Versions: 0.9.0.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
>
> If renegotiation is triggered when read interest is off, at the moment it looks like read interest is never turned back on. More unit tests are required to test different renegotiation scenarios, since these are much harder to exercise in system tests.
[jira] [Created] (KAFKA-7702) Prefixed ACLs don't work with single character prefix
Rajini Sivaram created KAFKA-7702:
-------------------------------------
Summary: Prefixed ACLs don't work with single character prefix
Key: KAFKA-7702
URL: https://issues.apache.org/jira/browse/KAFKA-7702
Project: Kafka
Issue Type: Bug
Components: security
Affects Versions: 2.1.0, 2.0.1
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.2.0, 2.1.1, 2.0.2

Prefixed ACLs with a single character are not matched correctly against resource names. An ALLOW rule with a single-character prefix doesn't grant access to any resource, and a DENY rule with a single-character prefix doesn't deny access to any resource, since the prefix is not matched correctly.

This is not an exploitable security vulnerability, since only authenticated users with authorization to create ACLs can create the prefixed ACLs.
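For reference, the intended matching rule is simply "resource name starts with the pattern name", and it must hold even when the pattern is a single character. A trivial sketch of the required behavior (illustrative only, not the authorizer's actual code):

{code:java}
public class PrefixAclMatchSketch {
    // A prefixed ACL matches when the resource name starts with the pattern
    // name; this must also hold for single-character prefixes.
    static boolean prefixMatches(String patternName, String resourceName) {
        return resourceName.startsWith(patternName);
    }

    public static void main(String[] args) {
        System.out.println(prefixMatches("f", "foo"));  // true: single-char prefix
        System.out.println(prefixMatches("fo", "foo")); // true
        System.out.println(prefixMatches("b", "foo"));  // false
    }
}
{code}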
[jira] [Resolved] (KAFKA-7702) Prefixed ACLs don't work with single character prefix
[ https://issues.apache.org/jira/browse/KAFKA-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7702.
-----------------------------------
Resolution: Fixed
Reviewer: Jun Rao

> Prefixed ACLs don't work with single character prefix
> ------------------------------------------------------
>
> Key: KAFKA-7702
> URL: https://issues.apache.org/jira/browse/KAFKA-7702
> Project: Kafka
> Issue Type: Bug
> Components: security
> Affects Versions: 2.0.1, 2.1.0
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
> Prefixed ACLs with a single character are not matched correctly against resource names. An ALLOW rule with a single-character prefix doesn't grant access to any resource, and a DENY rule with a single-character prefix doesn't deny access to any resource, since the prefix is not matched correctly.
> This is not an exploitable security vulnerability, since only authenticated users with authorization to create ACLs can create the prefixed ACLs.
[jira] [Resolved] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition
[ https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7697.
-----------------------------------
Resolution: Fixed
Reviewer: Jason Gustafson

> Possible deadlock in kafka.cluster.Partition
> ---------------------------------------------
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.1.0
> Reporter: Gian Merlino
> Assignee: Rajini Sivaram
> Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: threaddump.txt
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up within a few minutes (by "locked up" I mean that all request handler threads were busy, and other brokers reported that they couldn't communicate with it). I restarted it a few times and it did the same thing each time. After downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from the last attempt on 2.1.0 that shows lots of kafka-request-handler threads trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock (can't tell which one) and are trying to acquire a second one (on two different read locks: 0x000708184b88 and 0x00070821f188): kafka-request-handler-1 and kafka-request-handler-4. Both are handling a produce request, and in the process of doing so, are calling Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the same time, both of those locks have writers from other threads waiting on them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks appear to have writers that hold them (if only because no threads in the dump are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over readers. Is it possible that kafka-request-handler-1 and kafka-request-handler-4 are each trying to read-lock the partition that is currently locked by the other one, and they're both parked waiting for kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they never will, because the former two threads own read locks and aren't giving them up?
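A minimal, self-contained reconstruction of the interleaving hypothesized above (a demo of the lock behavior, not Kafka code): each reader holds one read lock and requests the other, while a queued writer on each lock blocks incoming readers, because non-fair ReentrantReadWriteLock makes a reader wait when the head of the queue is a writer. Running this, both reader threads park forever; a jstack dump shows the same shape as the attached thread dump.

{code:java}
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadLockDeadlockSketch {
    public static void main(String[] args) throws InterruptedException {
        ReentrantReadWriteLock a = new ReentrantReadWriteLock();
        ReentrantReadWriteLock b = new ReentrantReadWriteLock();

        Thread reader1 = readerThread(a, b); // holds a.read, wants b.read
        Thread reader2 = readerThread(b, a); // holds b.read, wants a.read
        reader1.start();
        reader2.start();
        Thread.sleep(100); // let both readers take their first read lock
        new Thread(() -> a.writeLock().lock()).start(); // writer queued on a
        new Thread(() -> b.writeLock().lock()).start(); // writer queued on b
        // reader1 and reader2 now block behind the queued writers: deadlock.
    }

    static Thread readerThread(ReentrantReadWriteLock first, ReentrantReadWriteLock second) {
        return new Thread(() -> {
            first.readLock().lock();
            try {
                Thread.sleep(500); // give the writers time to queue up
                second.readLock().lock(); // parks behind the queued writer, forever
                second.readLock().unlock();
            } catch (InterruptedException ignored) {
            } finally {
                first.readLock().unlock();
            }
        });
    }
}
{code}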
[jira] [Created] (KAFKA-7712) Handle exceptions from immediately connected channels in Selector
Rajini Sivaram created KAFKA-7712:
-------------------------------------
Summary: Handle exceptions from immediately connected channels in Selector
Key: KAFKA-7712
URL: https://issues.apache.org/jira/browse/KAFKA-7712
Project: Kafka
Issue Type: Bug
Components: network
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.2.0

We try to handle all possible exceptions in Selector to ensure that channels are always closed and their states kept consistent. For immediately connected channels, we should ensure that any exception during connection results in the channel being closed properly and removed from all maps. This is a very unlikely scenario, but since we already catch the exception, we should also clean up properly in the catch block.
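A generic java.nio sketch of the cleanup pattern described (field and map names hypothetical, not Kafka's Selector): if anything fails after the channel is opened, close it and remove it from every tracking map before rethrowing, so no state leaks.

{code:java}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

public class ImmediateConnectSketch {
    private final Map<String, SocketChannel> channels = new HashMap<>();

    void connect(String id, Selector selector, InetSocketAddress addr) throws IOException {
        SocketChannel channel = SocketChannel.open();
        try {
            channel.configureBlocking(false);
            // connect() may return true for an "immediately connected" channel
            boolean immediatelyConnected = channel.connect(addr);
            channel.register(selector, immediatelyConnected ? 0 : SelectionKey.OP_CONNECT);
            channels.put(id, channel);
        } catch (IOException | RuntimeException e) {
            channels.remove(id); // keep maps consistent on failure
            channel.close();
            throw e;
        }
    }
}
{code}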
[jira] [Created] (KAFKA-7719) Improve fairness in SocketServer processors
Rajini Sivaram created KAFKA-7719:
-------------------------------------
Summary: Improve fairness in SocketServer processors
Key: KAFKA-7719
URL: https://issues.apache.org/jira/browse/KAFKA-7719
Project: Kafka
Issue Type: Improvement
Components: network
Affects Versions: 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.2.0

See https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors for details.
[jira] [Created] (KAFKA-7730) Limit total number of active connections in the broker
Rajini Sivaram created KAFKA-7730:
-------------------------------------
Summary: Limit total number of active connections in the broker
Key: KAFKA-7730
URL: https://issues.apache.org/jira/browse/KAFKA-7730
Project: Kafka
Issue Type: New Feature
Components: network
Affects Versions: 2.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.2.0

Add a new listener config `max.connections` to limit the maximum number of active connections on each listener. See https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors for details.
[jira] [Resolved] (KAFKA-7755) Kubernetes - Kafka clients are resolving DNS entries only one time
[ https://issues.apache.org/jira/browse/KAFKA-7755?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-7755.
-----------------------------------
Resolution: Fixed
Fix Version/s: 2.1.1
               2.2.0

> Kubernetes - Kafka clients are resolving DNS entries only one time
> -------------------------------------------------------------------
>
> Key: KAFKA-7755
> URL: https://issues.apache.org/jira/browse/KAFKA-7755
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0
> Environment: Kubernetes
> Reporter: Loïc Monney
> Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: pom.xml
>
> *Introduction*
> Since 2.1.0, Kafka clients support multiple DNS resolved IP addresses if the first one fails. This change was introduced by https://issues.apache.org/jira/browse/KAFKA-6863. However this DNS resolution is now performed only one time by the clients. This is not a problem if all brokers have fixed IP addresses, however it is definitely an issue when Kafka brokers are run on top of Kubernetes. Indeed, new Kubernetes pods will receive another IP address, so as soon as all brokers have been restarted, clients won't be able to reconnect to any broker.
> *Impact*
> Everyone running Kafka 2.1 or later on top of Kubernetes is impacted when a rolling restart is performed.
> *Root cause*
> Since https://issues.apache.org/jira/browse/KAFKA-6863, Kafka clients resolve DNS entries only once.
> *Proposed solution*
> In https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java#L368 Kafka clients should perform the DNS resolution again when all IP addresses have been "used" (when _index_ is back to 0).
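A minimal sketch of the proposed fix (standalone illustration, not the actual ClusterConnectionStates change): once every previously resolved IP has been tried and the index wraps back to 0, ask the DNS again instead of reusing the original, possibly stale, answer.

{code:java}
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ReresolvingAddressIterator {
    private final String host;
    private InetAddress[] addresses;
    private int index;

    ReresolvingAddressIterator(String host) { this.host = host; }

    // Re-resolve whenever the index wraps to 0, so restarted pods with new
    // IPs become reachable without restarting the client.
    InetAddress next() throws UnknownHostException {
        if (addresses == null || index == 0)
            addresses = InetAddress.getAllByName(host);
        InetAddress address = addresses[index];
        index = (index + 1) % addresses.length;
        return address;
    }
}
{code}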
[jira] [Resolved] (KAFKA-6834) log cleaner should handle the case when the size of a message set is larger than the max message size
[ https://issues.apache.org/jira/browse/KAFKA-6834?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-6834.
-----------------------------------
Resolution: Fixed
Reviewer: Jason Gustafson
Fix Version/s: 2.0.0

> log cleaner should handle the case when the size of a message set is larger than the max message size
> ------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6834
> URL: https://issues.apache.org/jira/browse/KAFKA-6834
> Project: Kafka
> Issue Type: Bug
> Reporter: Jun Rao
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.0.0
>
> In KAFKA-5316, we added the logic to allow a message (set) larger than the per topic message size to be written to the log during log cleaning. However, the buffer size in the log cleaner is still bounded by the per topic message size. This can cause the log cleaner to die and cause the broker to run out of disk space.
[jira] [Created] (KAFKA-6911) Incorrect check for keystore/truststore dynamic update
Rajini Sivaram created KAFKA-6911:
-------------------------------------
Summary: Incorrect check for keystore/truststore dynamic update
Key: KAFKA-6911
URL: https://issues.apache.org/jira/browse/KAFKA-6911
Project: Kafka
Issue Type: Task
Components: core
Affects Versions: 1.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.0.0, 1.1.1

The check to see if the keystore or truststore needs updating is incorrect: it checks whether one of the configs has not changed, rather than whether it has changed.
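The bug class in a nutshell, as a hypothetical sketch (not the actual Kafka code, which compares several keystore configs): the condition is inverted, so the update path runs only when the value did not change.

{code:java}
public class KeystoreUpdateCheckSketch {
    // Buggy form would be: return oldPath.equals(newPath);
    // Correct form: the store needs updating when the configured value changed.
    static boolean needsUpdate(String oldPath, String newPath) {
        return !oldPath.equals(newPath);
    }
}
{code}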
[jira] [Created] (KAFKA-6912) Add authorization tests for custom principal types
Rajini Sivaram created KAFKA-6912:
-------------------------------------
Summary: Add authorization tests for custom principal types
Key: KAFKA-6912
URL: https://issues.apache.org/jira/browse/KAFKA-6912
Project: Kafka
Issue Type: Task
Components: core
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.0.0

KIP-290 proposes to add prefixed-wildcarded principals to enable ACLs to be configured for groups of principals. This doesn't work with all security protocols - e.g. SSL principals are of format CN=name,O=org,C=country where prefixes don't fit in terms of grouping. Kafka currently doesn't support the concept of user groups, but it is possible to use custom KafkaPrincipalBuilders to generate group principals during authentication. By default, Kafka generates principals of type User, but custom types (e.g. Group) are supported. This currently has the restriction that ACLs may be defined only at the group level (user- and group-level ACLs cannot be combined for a connection), but it works for all security protocols.

We don't have any tests that verify custom principal types and authorization based on custom principal types. It would be good to add some tests.
[jira] [Created] (KAFKA-6916) AdminClient does not refresh metadata on broker failure
Rajini Sivaram created KAFKA-6916:
-------------------------------------
Summary: AdminClient does not refresh metadata on broker failure
Key: KAFKA-6916
URL: https://issues.apache.org/jira/browse/KAFKA-6916
Project: Kafka
Issue Type: Task
Components: admin
Affects Versions: 1.0.1, 1.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.0.0

There are intermittent test failures in DynamicBrokerReconfigurationTest when brokers are restarted. The test uses ephemeral ports, and hence ports after server restart are not the same as the ports before restart. The tests rely on metadata refresh on producers, consumers and admin clients to obtain new server ports when connections fail. This works with producers and consumers, but results in intermittent failures with the admin client because refresh is not triggered.

There are a couple of issues in AdminClient:
# Unlike producers and consumers, AdminClient does not request a metadata update when a connection to a broker fails. This is particularly bad if the controller goes down. The controller is used for various requests like createTopics and describeTopics. If the controller goes down and adminClient.describeTopics() is invoked, AdminClient sends the request to the old controller. If the connection fails, it keeps retrying with the same address; metadata refresh is never triggered. The request times out after 2 minutes by default, and metadata is not refreshed for 5 minutes by default. We should refresh metadata whenever a connection to a broker fails.
# AdminClient requests are always retried on the same node. In the example above, if the controller goes down and a new controller is elected, it would be good if the retried request were sent to the new controller. Otherwise we are just blocking the call for 2 minutes with a lot of retries that can never succeed.
[jira] [Created] (KAFKA-6917) Request handler deadlocks attempting to acquire group metadata lock
Rajini Sivaram created KAFKA-6917:
-------------------------------------
Summary: Request handler deadlocks attempting to acquire group metadata lock
Key: KAFKA-6917
URL: https://issues.apache.org/jira/browse/KAFKA-6917
Project: Kafka
Issue Type: Task
Components: core
Affects Versions: 1.0.1, 1.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
Fix For: 2.0.0, 1.0.2, 1.1.1

We have noticed another deadlock with the group metadata lock with version 1.1.
{quote}
Found one Java-level deadlock:
=============================
"executor-Heartbeat":
  waiting for ownable synchronizer 0x0005ce477080, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "kafka-request-handler-3"
"kafka-request-handler-3":
  waiting for ownable synchronizer 0x0005cbe7f698, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "kafka-request-handler-4"
"kafka-request-handler-4":
  waiting for ownable synchronizer 0x0005ce477080, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "kafka-request-handler-3"

Java stack information for the threads listed above:
====================================================
"executor-Heartbeat":
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for <0x0005ce477080> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
  at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
  at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
  at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
  at kafka.coordinator.group.GroupCoordinator.onExpireHeartbeat(GroupCoordinator.scala:833)
  at kafka.coordinator.group.DelayedHeartbeat.onExpiration(DelayedHeartbeat.scala:34)
  at kafka.server.DelayedOperation.run(DelayedOperation.scala:144)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
"kafka-request-handler-3":
  at sun.misc.Unsafe.park(Native Method)
  - parking to wait for <0x0005cbe7f698> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
  at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
  at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
  at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248)
  at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:188)
  at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:801)
  at kafka.coordinator.group.GroupMetadataManager$$anonfun$handleTxnCompletion$1.apply(GroupMetadataManager.scala:799)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:799)
  at kafka.coordinator.group.GroupCoordinator.handleTxnCompletion(GroupCoordinator.scala:496)
  at kafka.server.KafkaApis.kafka$server$KafkaApis$$maybeSendResponseCallback$1(KafkaApis.scala:1633)
  at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
  at kafka.server.KafkaApis$$anonfun$handleWriteTxnMarkersRequest$1$$anonfun$apply$19.apply(KafkaApis.scala:1691)
  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:111)
  at kafka.server.DelayedOperation.maybeTryCompl
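The dump above shows two threads each holding one group metadata lock while waiting for the other's. A generic remedy sketch (illustration of the standard technique, not the actual Kafka patch): whenever code must hold two such locks at once, acquire them in a single global order, here by a hypothetical numeric id, so no two threads can ever wait on each other in a cycle.

{code:java}
import java.util.concurrent.locks.Lock;

public class LockOrderingSketch {
    // Acquire both locks in a consistent global order (lower id first),
    // which makes the circular wait in the deadlock above impossible.
    static void inBothLocks(int idA, Lock lockA, int idB, Lock lockB, Runnable action) {
        Lock first = idA <= idB ? lockA : lockB;
        Lock second = idA <= idB ? lockB : lockA;
        first.lock();
        try {
            second.lock();
            try {
                action.run();
            } finally {
                second.unlock();
            }
        } finally {
            first.unlock();
        }
    }
}
{code}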
[jira] [Resolved] (KAFKA-6912) Add authorization tests for custom principal types
[ https://issues.apache.org/jira/browse/KAFKA-6912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-6912.
-----------------------------------
Resolution: Fixed
Reviewer: Dong Lin

> Add authorization tests for custom principal types
> ---------------------------------------------------
>
> Key: KAFKA-6912
> URL: https://issues.apache.org/jira/browse/KAFKA-6912
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Rajini Sivaram
> Assignee: Rajini Sivaram
> Priority: Major
> Fix For: 2.0.0
>
> KIP-290 proposes to add prefixed-wildcarded principals to enable ACLs to be configured for groups of principals. This doesn't work with all security protocols - e.g. SSL principals are of format CN=name,O=org,C=country where prefixes don't fit in terms of grouping. Kafka currently doesn't support the concept of user groups, but it is possible to use custom KafkaPrincipalBuilders to generate group principals during authentication. By default, Kafka generates principals of type User, but custom types (e.g. Group) are supported. This currently has the restriction that ACLs may be defined only at the group level (user- and group-level ACLs cannot be combined for a connection), but it works for all security protocols.
> We don't have any tests that verify custom principal types and authorization based on custom principal types. It would be good to add some tests.
[jira] [Resolved] (KAFKA-6877) Remove completedFetch upon a failed parse if it contains no records.
[ https://issues.apache.org/jira/browse/KAFKA-6877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-6877.
-----------------------------------
Resolution: Fixed
Reviewer: Jiangjie Qin

> Remove completedFetch upon a failed parse if it contains no records.
> ---------------------------------------------------------------------
>
> Key: KAFKA-6877
> URL: https://issues.apache.org/jira/browse/KAFKA-6877
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 1.0.0, 0.11.0.2, 1.1.0, 1.0.1
> Reporter: Adem Efe Gencer
> Assignee: Adem Efe Gencer
> Priority: Major
> Fix For: 2.0.0
>
> This patch removed a completedFetch from the completedFetches queue upon a failed parse if it contains no records. The following scenario explains why this is needed for an instance of this case - i.e. in TopicAuthorizationException.
> 0. Let's assume a scenario in which the consumer is attempting to read from a topic without the necessary read permission.
> 1. In Fetcher#fetchedRecords(), after peeking the completedFetches, Fetcher#parseCompletedFetch(CompletedFetch) throws a TopicAuthorizationException (as expected).
> 2. Fetcher#fetchedRecords() passes the TopicAuthorizationException up without having a chance to poll completedFetches. So, the same completedFetch remains at the head of the completedFetches queue.
> 3. Upon following calls to Fetcher#fetchedRecords(), peeking the completedFetches will always return the same completedFetch, independent of any updates to the ACL that the topic is trying to read from.
> 4. Hence, despite the creation of an ACL with correct permissions, once the consumer sees the TopicAuthorizationException, it will be unable to recover without a bounce.
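A hedged sketch of the fix's peek/poll logic, with a hypothetical `CompletedFetch` holder and `parse` method standing in for the Fetcher internals: if parsing fails and the fetch holds no records, remove it from the queue before rethrowing, so the next poll() does not trip over the same stale entry forever.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

public class CompletedFetchDrainSketch {
    static class CompletedFetch {
        final int recordCount;
        CompletedFetch(int recordCount) { this.recordCount = recordCount; }
    }

    private final Deque<CompletedFetch> completedFetches = new ArrayDeque<>();

    void drainHead() {
        CompletedFetch fetch = completedFetches.peek();
        if (fetch == null) return;
        try {
            parse(fetch); // may throw, e.g. a TopicAuthorizationException
            completedFetches.poll(); // fully consumed: remove from the queue
        } catch (RuntimeException e) {
            if (fetch.recordCount == 0)
                completedFetches.poll(); // nothing to return later: drop it too
            throw e;
        }
    }

    private void parse(CompletedFetch fetch) { /* ... */ }
}
{code}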
[jira] [Resolved] (KAFKA-6711) GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
[ https://issues.apache.org/jira/browse/KAFKA-6711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-6711.
-----------------------------------
Resolution: Fixed
Reviewer: Matthias J. Sax
Fix Version/s: 2.0.0

> GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-6711
> URL: https://issues.apache.org/jira/browse/KAFKA-6711
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.1
> Reporter: Cemalettin Koç
> Assignee: Cemalettin Koç
> Priority: Major
> Labels: newbie
> Fix For: 2.0.0
>
> We are using an InMemoryStore along with GlobalKTable, and I noticed that after each restart I am losing all my data. When I debug it, the `/tmp/kafka-streams/category-client-1/global/.checkpoint` file contains an offset for my GlobalKTable topic. I checked the GlobalStateManagerImpl implementation and noticed that it is not guarded for cases similar to mine. I am fairly new to Kafka land, and there might be another way to fix the issue.
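A hedged sketch of the idea behind the fix (names hypothetical, not the actual GlobalStateManagerImpl code): only offsets of persistent stores belong in the checkpoint file, because in-memory stores must be rebuilt from the topic on restart, and a checkpointed offset would silently skip their data.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CheckpointFilterSketch {
    // Keep only offsets of persistent stores when writing the checkpoint file.
    static Map<String, Long> checkpointableOffsets(Map<String, Long> allOffsets,
                                                   Set<String> persistentStores) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : allOffsets.entrySet())
            if (persistentStores.contains(e.getKey()))
                result.put(e.getKey(), e.getValue());
        return result;
    }
}
{code}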
[jira] [Resolved] (KAFKA-6546) Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener
[ https://issues.apache.org/jira/browse/KAFKA-6546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6546. --- Resolution: Fixed Reviewer: Ismael Juma Fix Version/s: (was: 2.1.0) 2.0.0 > Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener > > > Key: KAFKA-6546 > URL: https://issues.apache.org/jira/browse/KAFKA-6546 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.0.0 > > > In 1.1, if an endpoint is available on the broker processing a metadata > request, but the corresponding listener is not available on the leader of a > partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned > UNKNOWN_SERVER_ERROR). This could indicate broker misconfiguration where some > brokers are not configured with all listeners, or it could indicate a > transient error when listeners are dynamically added. We want to treat the > error as transient to process dynamic updates, but we should notify > clients of the actual error. This change should be made when MetadataRequest > version is updated so that LEADER_NOT_AVAILABLE is returned to older clients. > See > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > and [https://github.com/apache/kafka/pull/4539] for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6809. --- Resolution: Fixed Reviewer: Rajini Sivaram Fix Version/s: (was: 2.1.0) 2.0.0 PR: [https://github.com/apache/kafka/pull/5301] > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.0.0, 1.1.2 > > > The "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. It seems like we should also count > connections received – either include them in this metric (and also clarify > the description) or add a new metric (separately counting the two types of > connections). I am not sure how useful it is to separate them, so I think we > should take the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7028) super.users doesn't work with custom principals
[ https://issues.apache.org/jira/browse/KAFKA-7028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7028. --- Resolution: Fixed Reviewer: Rajini Sivaram Fix Version/s: (was: 2.1.0) 2.0.0 > super.users doesn't work with custom principals > --- > > Key: KAFKA-7028 > URL: https://issues.apache.org/jira/browse/KAFKA-7028 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.0.0 > > > SimpleAclAuthorizer creates a KafkaPrincipal for the users defined in the > super.users broker config. However, it should use the configured > KafkaPrincipalBuilder so that it works with a custom defined one. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
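For illustration, a hypothetical broker configuration that this fix is meant to support: a super.users entry whose principal type comes from a custom builder (such as the hypothetical GroupPrincipalBuilder sketched earlier) rather than the default User type:

{noformat}
# Hypothetical server.properties excerpt
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
principal.builder.class=com.example.GroupPrincipalBuilder
# Semicolon-separated principals; the Group entry only matches once the
# authorizer compares against principals produced by the configured builder.
super.users=User:admin;Group:admins
{noformat}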
[jira] [Created] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure
Rajini Sivaram created KAFKA-7119: - Summary: Intermittent test failure with GSSAPI authentication failure Key: KAFKA-7119 URL: https://issues.apache.org/jira/browse/KAFKA-7119 Project: Kafka Issue Type: Bug Components: security Affects Versions: 2.0.0 Reporter: Rajini Sivaram I have seen this failure a couple of times in builds (e.g. [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)] {quote} org.apache.kafka.common.errors.SaslAuthenticationException: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred when evaluating SASL token received from the Kafka Broker. Kafka Client will go to AUTHENTICATION_FAILED state. Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Request is a replay (34) - Request is a replay)] at jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356) at java.base/java.security.AccessController.doPrivileged(Native Method) at java.base/javax.security.auth.Subject.doAs(Subject.java:423) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268) at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) at kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980) at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979) at kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:564) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298) at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.lang.Thread.run(Thread.java:844) Caused by: GSSException: No valid credentials provided (Mechanism level: Request is a replay (34) - Request is a replay) at java.security.jgss/sun.security.jgss.
[jira] [Resolved] (KAFKA-7112) StreamThread does not check for state again after pollRequests()
[ https://issues.apache.org/jira/browse/KAFKA-7112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7112. --- Resolution: Fixed Fix Version/s: 2.0.0 > StreamThread does not check for state again after pollRequests() > > > Key: KAFKA-7112 > URL: https://issues.apache.org/jira/browse/KAFKA-7112 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.0.0 > > > In StreamThread's main loop, we have: > {code} > if (state == State.PARTITIONS_ASSIGNED) { > // try to fetch some records with zero poll millis > // to unblock the restoration as soon as possible > records = pollRequests(Duration.ZERO); > if (taskManager.updateNewAndRestoringTasks()) { > setState(State.RUNNING); > } > } > {code} > in which we first check the state, and if it is {{PARTITIONS_ASSIGNED}} then > call `consumer.poll()` and then call > `taskManager.updateNewAndRestoringTasks()`. There is a race condition, though: > if another rebalance gets triggered, `onPartitionRevoked` will be called, in > which we {{restoreConsumer.unsubscribe();}}, and if we then > call {{taskManager.updateNewAndRestoringTasks()}} right away we will see this: > {code} > Encountered the following error during processing: > (org.apache.kafka.streams.processor.internals.StreamThread) > java.lang.IllegalStateException: Consumer is not subscribed to any topics or > assigned any partitions > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1150) > at > org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:83) > at > org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
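One way to address the race, sketched against the snippet above (a sketch of the idea, not necessarily the committed patch): re-check the state after pollRequests() returns, before touching the task manager.

{code}
if (state == State.PARTITIONS_ASSIGNED) {
    // try to fetch some records with zero poll millis
    // to unblock the restoration as soon as possible
    records = pollRequests(Duration.ZERO);

    // A rebalance may have been triggered inside poll(), revoking partitions and
    // unsubscribing the restore consumer. Only proceed with the task manager if
    // we are still in PARTITIONS_ASSIGNED; otherwise retry on the next iteration.
    if (state == State.PARTITIONS_ASSIGNED
            && taskManager.updateNewAndRestoringTasks()) {
        setState(State.RUNNING);
    }
}
{code}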
[jira] [Resolved] (KAFKA-7111) Review the NetworkClient log level used
[ https://issues.apache.org/jira/browse/KAFKA-7111?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7111. --- Resolution: Fixed Reviewer: Rajini Sivaram Fix Version/s: 2.0.0 > Review the NetworkClient log level used > --- > > Key: KAFKA-7111 > URL: https://issues.apache.org/jira/browse/KAFKA-7111 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Luan Cestari >Assignee: Stanislav Kozlovski >Priority: Trivial > Fix For: 2.0.0 > > > Hi, > > I was using Kafka on some projects and unfortunately I had to use the DEBUG (and > sometimes even TRACE) log level to see some issues. One of the most recent > cases was: > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L872] > > If the broker is unreachable at its configured name, the errors should be > more severe than "DEBUG" level IMHO. I would at least use INFO level for > this case, or ERROR level (which seems to fit better, but I don't know the > practices used in the project). > > Thank you in advance > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6268) Tools should not swallow exceptions like resolving network names
[ https://issues.apache.org/jira/browse/KAFKA-6268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6268. --- Resolution: Duplicate Fix Version/s: (was: 2.1.0) 2.0.0 Fixed under KAFKA-7111. > Tools should not swallow exceptions like resolving network names > > > Key: KAFKA-6268 > URL: https://issues.apache.org/jira/browse/KAFKA-6268 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.11.0.1 >Reporter: Antony Stubbs >Assignee: Stanislav Kozlovski >Priority: Major > Fix For: 2.0.0 > > > The CLI consumer client shows nothing when it can't resolve a domain. This > and other errors like it should be shown to the user by default. You have to > turn on DEBUG level logging in the tools log4j config to find out there is an > error. > {{[2017-11-23 16:40:56,401] DEBUG Error connecting to node > as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) > (org.apache.kafka.clients.NetworkClient) > java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092 > at org.apache.kafka.common.network.Selector.connect(Selector.java:195) > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764) > at > org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908) > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) > at kafka.consumer.NewShinyConsumer.(BaseConsumer.scala:64) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > Caused by: java.nio.channels.UnresolvedAddressException > at sun.nio.ch.Net.checkAddress(Net.java:101) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) > at org.apache.kafka.common.network.Selector.connect(Selector.java:192) > ... 18 more > }} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
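Until the log level itself is changed, a workaround is to surface these messages through the tools' log4j configuration; a hypothetical excerpt:

{noformat}
# config/tools-log4j.properties: show NetworkClient connection failures
# (such as unresolvable broker addresses) without enabling DEBUG globally.
log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG
{noformat}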
[jira] [Created] (KAFKA-7136) PushHttpMetricsReporter may deadlock when processing metrics changes
Rajini Sivaram created KAFKA-7136: - Summary: PushHttpMetricsReporter may deadlock when processing metrics changes Key: KAFKA-7136 URL: https://issues.apache.org/jira/browse/KAFKA-7136 Project: Kafka Issue Type: Bug Components: metrics Affects Versions: 1.1.0, 2.0.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.0.0 We noticed a deadlock in {{PushHttpMetricsReporter}}. Locking for metrics was changed under KAFKA-6765 to avoid {{NullPointerException}} in metrics reporters due to concurrent reads and updates. {{PushHttpMetricsReporter}} acquires its own lock to process metrics registration, which is invoked while the sensor lock is held. It also reads metrics, attempting to acquire the sensor lock while holding its own lock (the inverse order). This resulted in the deadlock below. {quote} Found one Java-level deadlock: Java stack information for the threads listed above: === "StreamThread-7": at org.apache.kafka.tools.PushHttpMetricsReporter.metricChange(PushHttpMetricsReporter.java:144) - waiting to lock <0x000655a54310> (a java.lang.Object) at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:563) - locked <0x000655a44a28> (a org.apache.kafka.common.metrics.Metrics) at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:236) - locked <0x00065629c170> (a org.apache.kafka.common.metrics.Sensor) at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:217) at org.apache.kafka.common.network.Selector$SelectorMetrics.maybeRegisterConnectionMetrics(Selector.java:1016) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:462) at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:274) at org.apache.kafka.clients.consumer.internals.Fetcher.getAllTopicMetadata(Fetcher.java:254) at org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1820) at org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1798) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.refreshChangelogInfo(StoreChangelogReader.java:224) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:121) at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:74) at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:317) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:824) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) "pool-17-thread-1": at org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:82) - waiting to lock <0x00065629c170> (a org.apache.kafka.common.metrics.Sensor) at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:58) at org.apache.kafka.tools.PushHttpMetricsReporter$HttpReporter.run(PushHttpMetricsReporter.java:177) - locked <0x000655a54310> (a java.lang.Object) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Found 1 deadlock. {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
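In miniature, the inverse lock ordering described above looks like the following self-contained example (illustrative locks standing in for the Sensor lock and the reporter's lock; with unlucky timing, both threads block forever):

{code:java}
public class LockOrderDeadlock {
    private static final Object sensorLock = new Object();   // stands in for the Sensor lock
    private static final Object reporterLock = new Object(); // stands in for the reporter's lock

    public static void main(String[] args) {
        // Metric registration path: sensor lock first, then reporter lock.
        Thread registration = new Thread(() -> {
            synchronized (sensorLock) {
                sleep(100); // widen the race window for demonstration
                synchronized (reporterLock) { /* metricChange(...) */ }
            }
        });
        // Reporting path: reporter lock first, then sensor lock (the inverse order).
        Thread reporter = new Thread(() -> {
            synchronized (reporterLock) {
                sleep(100);
                synchronized (sensorLock) { /* metric.value() */ }
            }
        });
        registration.start();
        reporter.start();
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}

The standard remedy is to make both paths acquire the two locks in the same order, or to avoid holding one lock while acquiring the other.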
[jira] [Created] (KAFKA-7168) Broker shutdown during SSL handshake may be handled as handshake failure
Rajini Sivaram created KAFKA-7168: - Summary: Broker shutdown during SSL handshake may be handled as handshake failure Key: KAFKA-7168 URL: https://issues.apache.org/jira/browse/KAFKA-7168 Project: Kafka Issue Type: Bug Components: security Affects Versions: 1.0.2, 1.1.1, 2.0.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram If a broker is shut down while the SSL handshake of a client connection is in progress, the client may process the resulting SSLException as a non-retriable handshake failure rather than a retriable I/O exception. This can cause streams applications to fail during rolling restarts. Exception stack trace: {quote} org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed Caused by: javax.net.ssl.SSLException: Received close_notify during handshake at sun.security.ssl.Alerts.getSSLException(Alerts.java:208) at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1639) at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1607) at sun.security.ssl.SSLEngineImpl.recvAlert(SSLEngineImpl.java:1752) at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:1068) at sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:890) at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:764) at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624) at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:465) at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:266) at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:88) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474) at org.apache.kafka.common.network.Selector.poll(Selector.java:412) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:206) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741) {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7168) Broker shutdown during SSL handshake may be handled as handshake failure
[ https://issues.apache.org/jira/browse/KAFKA-7168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7168. --- Resolution: Fixed Fix Version/s: 2.1.0 2.0.1 > Broker shutdown during SSL handshake may be handled as handshake failure > > > Key: KAFKA-7168 > URL: https://issues.apache.org/jira/browse/KAFKA-7168 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 1.0.2, 1.1.1, 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > If a broker is shut down while the SSL handshake of a client connection is in > progress, the client may process the resulting SSLException as a > non-retriable handshake failure rather than a retriable I/O exception. This > can cause streams applications to fail during rolling restarts. > Exception stack trace: > {quote} > org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake > failed > Caused by: javax.net.ssl.SSLException: Received close_notify during handshake > at sun.security.ssl.Alerts.getSSLException(Alerts.java:208) > at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1639) > at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1607) > at sun.security.ssl.SSLEngineImpl.recvAlert(SSLEngineImpl.java:1752) > at sun.security.ssl.SSLEngineImpl.readRecord(SSLEngineImpl.java:1068) > at > sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:890) > at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:764) > at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624) > at > org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:465) > at > org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:266) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:88) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474) > at org.apache.kafka.common.network.Selector.poll(Selector.java:412) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:206) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:219) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:) > at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741) > {quote} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7182) SASL/OAUTHBEARER client response is missing %x01 separators
[ https://issues.apache.org/jira/browse/KAFKA-7182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7182. --- Resolution: Fixed Reviewer: Rajini Sivaram Fix Version/s: 2.0.0 > SASL/OAUTHBEARER client response is missing %x01 separators > --- > > Key: KAFKA-7182 > URL: https://issues.apache.org/jira/browse/KAFKA-7182 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.0.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Blocker > Labels: pull-request-available > Fix For: 2.0.0 > > > The format of the SASL/OAUTHBEARER client response is defined in [RFC 7628 > Section 3.1|https://tools.ietf.org/html/rfc7628#section-3.1] as follows: > {noformat} > kvsep = %x01 > key= 1*(ALPHA) > value = *(VCHAR / SP / HTAB / CR / LF ) > kvpair = key "=" value kvsep > client-resp= (gs2-header kvsep *kvpair kvsep) / kvsep > {noformat} > ;;gs2-header = See [RFC 5801 (Section > 4)|https://tools.ietf.org/html/rfc5801#section-4] > The SASL/OAUTHBEARER client response as currently implemented in > OAuthBearerSaslClient sends the valid gs2-header "n,," but then sends the > "auth" key and value immediately after it, like this: > {code:java} > String.format("n,,auth=Bearer %s", callback.token().value()) > {code} > This does not conform to the specification because there is no %x01 after the > gs2-header, no %x01 after the auth value, and no terminating %x01. The code > should instead be as follows: > {code:java} > String.format("n,,\u0001auth=Bearer %s\u0001\u0001", callback.token().value()) > {code} > Similarly, the parsing of the client response in OAuthBearerSaslServer, which > currently allows the malformed text, must also change. > *This should be fixed prior to the initial release of the SASL/OAUTHBEARER > code in 2.0.0 to prevent compatibility problems.* -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7188) Avoid reverse DNS lookup in SASL channel builder
Rajini Sivaram created KAFKA-7188: - Summary: Avoid reverse DNS lookup in SASL channel builder Key: KAFKA-7188 URL: https://issues.apache.org/jira/browse/KAFKA-7188 Project: Kafka Issue Type: Bug Components: network Reporter: Rajini Sivaram Assignee: Rajini Sivaram SaslChannelBuilder uses InetAddress.getHostName, which may perform a reverse DNS lookup, causing delays in some environments. We should replace these calls with InetSocketAddress.getHostString if possible. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
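A small illustration of the two JDK APIs (the address is an arbitrary example from the TEST-NET range): getHostName() may block on a reverse DNS lookup, while getHostString() returns whatever was supplied without resolving it.

{code:java}
import java.net.InetSocketAddress;

public class HostStringExample {
    public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("203.0.113.10", 9092); // example IP
        // May trigger a reverse DNS lookup to map the IP back to a name:
        System.out.println(addr.getAddress().getHostName());
        // Returns the literal string "203.0.113.10" with no lookup:
        System.out.println(addr.getHostString());
    }
}
{code}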
[jira] [Resolved] (KAFKA-7185) getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name
[ https://issues.apache.org/jira/browse/KAFKA-7185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7185. --- Resolution: Fixed Reviewer: Ismael Juma Fix Version/s: 2.0.0 > getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name > -- > > Key: KAFKA-7185 > URL: https://issues.apache.org/jira/browse/KAFKA-7185 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.0.0 >Reporter: Dhruvil Shah >Assignee: Dhruvil Shah >Priority: Blocker > Fix For: 2.0.0 > > > KIP-290 introduced a way to match ACLs based on prefix. Certain resource > names like that for group id can be empty strings. When an empty string is > passed into `getMatchingAcls`, it would throw a > `StringIndexOutOfBoundsException` because of the following logic: > {noformat} > val prefixed = aclCache.range( > Resource(resourceType, resourceName, PatternType.PREFIXED), > Resource(resourceType, resourceName.substring(0, Math.min(1, > resourceName.length)), PatternType.PREFIXED) > ) > .filterKeys(resource => resourceName.startsWith(resource.name)) > .flatMap { case (resource, versionedAcls) => versionedAcls.acls } > .toSet{noformat} > This is a regression introduced in 2.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7193) ZooKeeper client times out with localhost due to random choice of ipv4/ipv6
Rajini Sivaram created KAFKA-7193: - Summary: ZooKeeper client times out with localhost due to random choice of ipv4/ipv6 Key: KAFKA-7193 URL: https://issues.apache.org/jira/browse/KAFKA-7193 Project: Kafka Issue Type: Bug Components: zkclient Affects Versions: 2.0.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram ZooKeeper client from version 3.4.13 doesn't handle connections to `localhost` very well. If ZooKeeper is started on 127.0.0.1 on a machine that has both ipv4 and ipv6 and a client is created using `localhost` rather than the IP address in the connection string, ZooKeeper client attempts to connect to ipv4 or ipv6 randomly with a fixed one second backoff if connection fails. With the default 6 second connection timeout in Kafka, this can result in client connection failures if ipv6 is chosen in consecutive address selections. Streams tests are failing intermittently as a result of this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
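Until this is fixed, one mitigation implied above is to use the literal IP instead of `localhost` in the connect string, so the client never alternates between IPv4 and IPv6; a hypothetical excerpt:

{noformat}
# server.properties / test config: prefer the literal address over `localhost`
zookeeper.connect=127.0.0.1:2181
{noformat}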
[jira] [Resolved] (KAFKA-7193) ZooKeeper client times out with localhost due to random choice of ipv4/ipv6
[ https://issues.apache.org/jira/browse/KAFKA-7193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7193. --- Resolution: Fixed Reviewer: Ismael Juma Fix Version/s: 2.0.0 > ZooKeeper client times out with localhost due to random choice of ipv4/ipv6 > --- > > Key: KAFKA-7193 > URL: https://issues.apache.org/jira/browse/KAFKA-7193 > Project: Kafka > Issue Type: Bug > Components: zkclient >Affects Versions: 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.0.0 > > > ZooKeeper client from version 3.4.13 doesn't handle connections to > `localhost` very well. If ZooKeeper is started on 127.0.0.1 on a machine that > has both ipv4 and ipv6 and a client is created using `localhost` rather than > the IP address in the connection string, ZooKeeper client attempts to connect > to ipv4 or ipv6 randomly with a fixed one second backoff if connection fails. > With the default 6 second connection timeout in Kafka, this can result in > client connection failures if ipv6 is chosen in consecutive address > selections. > Streams tests are failing intermittently as a result of this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7194) Error deserializing assignment after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-7194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7194. --- Resolution: Fixed Reviewer: Konstantine Karantasis Fix Version/s: 2.0.0 > Error deserializing assignment after rebalance > -- > > Key: KAFKA-7194 > URL: https://issues.apache.org/jira/browse/KAFKA-7194 > Project: Kafka > Issue Type: Bug >Reporter: Konstantine Karantasis >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.0.0 > > > A simple sink connector task is failing in a test with the following > exception: > {noformat} > [2018-07-02 12:31:13,200] ERROR WorkerSinkTask{id=verifiable-sink-0} Task > threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask) > org.apache.kafka.common.protocol.types.SchemaException: Error reading field > 'version': java.nio.BufferUnderflowException > at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:243) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:421) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:353) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:338) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748){noformat} > > After dumping the consumer offsets partition that this consumer group > is writing to: > {noformat} > bin/kafka-dump-log.sh --offsets-decoder --files ./.log > {noformat} > we get: > {noformat} > Dumping ./.log > Starting offset: 0 > offset: 0 position: 0 CreateTime: 1530534673177 isvalid: true keysize: 27 > valuesize: 217 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 > sequence: -1 isTransactional: false headerKeys: [] key: > {"metadata":"connect-verifiable-sink"} payload: > {"protocolType":"consumer","protocol":"range","generationId":1,"assignment":"{consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4=[test-0]}"} > offset: 1 position: 314 CreateTime: 1530534673206 isvalid: true keysize: 27 > 
valuesize: 32 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 > sequence: -1 isTransactional: false headerKeys: [] key: > {"metadata":"connect-verifiable-sink"} payload: > {"protocolType":"consumer","protocol":null,"generationId":2,"assignment":"{}"}{noformat} > > Since the broker seems to send a non-empty response to the consumer, there's > a chance that the response buffer is consumed more than once at some point > when parsing the response in the client. > Here's what the kafka-request.log shows being sent to the client in the > `SYNC_GROUP` response that throws the error: > {noformat} > [2018-07-02 12:31:13,185] DEBUG Completed > request:RequestHeader(apiKey=SYNC_GROUP, apiVersion=2, clientId=consumer-4, > correlationId=5) -- > {group_id=connect-verifiable-sink,generation_id=1,member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,group_assignment=[{member_id=consumer-4-bad84955-e702-44fe-a018-677bd3b3a9d4,member_assignment=java.nio.HeapByteBuffer[pos=0 >
[jira] [Created] (KAFKA-7237) Add explicit fatal marker to fatal error messages
Rajini Sivaram created KAFKA-7237: - Summary: Add explicit fatal marker to fatal error messages Key: KAFKA-7237 URL: https://issues.apache.org/jira/browse/KAFKA-7237 Project: Kafka Issue Type: Bug Components: core Reporter: Rajini Sivaram We currently use FATAL markers for fatal error messages in core. But only logback supports markers out-of-the-box and log4j simply logs these as ERROR messages, ignoring the marker. We should perhaps include a String marker within these messages to make it easier to identify fatal messages in logs. From https://www.slf4j.org/faq.html: While markers are part of the SLF4J API, only logback supports markers off the shelf. For example, if you add the {{%marker}} conversion word to its pattern, logback's {{PatternLayout}} will add marker data to its output. Marker data can be used to [filter messages|http://logback.qos.ch/manual/filters.html] or even [trigger|http://logback.qos.ch/manual/appenders.html#OnMarkerEvaluator] an outgoing email [at the end of an individual transaction|http://logback.qos.ch/recipes/emailPerTransaction.html]. In combination with logging frameworks such as log4j and java.util.logging which do not support markers, marker data will be silently ignored. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
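To make the behaviour concrete, a minimal SLF4J sketch (logger name and message are illustrative): logback with a %marker pattern renders the marker, while log4j via the slf4j-log4j12 binding silently drops it, hence the suggestion of an explicit string marker in the message text.

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;

public class FatalMarkerExample {
    private static final Logger log = LoggerFactory.getLogger(FatalMarkerExample.class);
    private static final Marker FATAL = MarkerFactory.getMarker("FATAL");

    public static void main(String[] args) {
        // Marker-aware backends (logback with %marker in the pattern) can render or
        // filter on FATAL; log4j logs this as a plain ERROR and ignores the marker.
        log.error(FATAL, "Shutting down because of an unrecoverable error");
        // Portable alternative being considered: embed the marker in the message.
        log.error("FATAL: Shutting down because of an unrecoverable error");
    }
}
{code}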
[jira] [Created] (KAFKA-6463) Review logging level for user errors in AdminManager
Rajini Sivaram created KAFKA-6463: - Summary: Review logging level for user errors in AdminManager Key: KAFKA-6463 URL: https://issues.apache.org/jira/browse/KAFKA-6463 Project: Kafka Issue Type: Improvement Reporter: Rajini Sivaram Fix For: 1.1.0 AdminManager currently logs errors due to bad requests at INFO level (e.g. alter configs with a bad value). In other components, user errors are either not logged or are logged at a lower logging level. We should review logging in AdminManager. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6476) Document dynamic config update
Rajini Sivaram created KAFKA-6476: - Summary: Document dynamic config update Key: KAFKA-6476 URL: https://issues.apache.org/jira/browse/KAFKA-6476 Project: Kafka Issue Type: Sub-task Components: core, documentation Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.1.0 Add documentation for dynamic broker config update. Include: - Command line options for kafka-configs.sh with examples - Configs that can be updated along with constraints applied -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6494) Extend ConfigCommand to update broker config using new AdminClient
Rajini Sivaram created KAFKA-6494: - Summary: Extend ConfigCommand to update broker config using new AdminClient Key: KAFKA-6494 URL: https://issues.apache.org/jira/browse/KAFKA-6494 Project: Kafka Issue Type: Sub-task Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.1.0 Add --bootstrap-server and --command-config options for new AdminClient. Update ConfigCommand to use new AdminClient for dynamic broker config updates in KIP-226. Full conversion of ConfigCommand to new AdminClient will be done later under KIP-248. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
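For example, once these options exist, a dynamic broker config update might look like this (broker id, config name, and properties file are illustrative):

{noformat}
# Dynamically update a per-broker config over the new AdminClient
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --command-config admin.properties \
  --entity-type brokers --entity-name 0 \
  --alter --add-config log.cleaner.threads=2
{noformat}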
[jira] [Created] (KAFKA-6501) Add test to verify markPartitionsForTruncation after fetcher thread pool resize
Rajini Sivaram created KAFKA-6501: - Summary: Add test to verify markPartitionsForTruncation after fetcher thread pool resize Key: KAFKA-6501 URL: https://issues.apache.org/jira/browse/KAFKA-6501 Project: Kafka Issue Type: Sub-task Reporter: Rajini Sivaram Assignee: Rajini Sivaram Follow-on task from KAFKA-6242 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6517) ZooKeeperClient holds a lock while waiting for responses, blocking shutdown
Rajini Sivaram created KAFKA-6517: - Summary: ZooKeeperClient holds a lock while waiting for responses, blocking shutdown Key: KAFKA-6517 URL: https://issues.apache.org/jira/browse/KAFKA-6517 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.1.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.1.0 Stack traces from a local test run that was deadlocked because shutdown couldn't acquire the lock: # kafka-scheduler-7: acquired read lock in kafka.zookeeper.ZooKeeperClient.handleRequests # Test worker-EventThread waiting for write lock to process SessionExpired in kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process # ForkJoinPool-1-worker-11 processing KafkaServer.shutdown is queued behind 2) waiting to acquire read lock for kafka.zookeeper.ZooKeeperClient.unregisterStateChangeHandler Stack traces of the relevant threads: {quote} "kafka-scheduler-7" daemon prio=5 tid=0x7fade918d800 nid=0xd317 waiting on condition [0x7b371000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x0007e4c6e698> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236) at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:146) at kafka.zookeeper.ZooKeeperClient$$anonfun$handleRequests$1.apply(ZooKeeperClient.scala:126) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:256) at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:125) at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1432) at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1425) at kafka.zk.KafkaZkClient.conditionalUpdatePath(KafkaZkClient.scala:583) at kafka.utils.ReplicationUtils$.updateLeaderAndIsr(ReplicationUtils.scala:33) at kafka.cluster.Partition.kafka$cluster$Partition$$updateIsr(Partition.scala:665) at kafka.cluster.Partition$$anonfun$4.apply$mcZ$sp(Partition.scala:509) at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500) at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:500) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258) at kafka.cluster.Partition.maybeShrinkIsr(Partition.scala:499) at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335) at kafka.server.ReplicaManager$$anonfun$kafka$server$ReplicaManager$$maybeShrinkIsr$2.apply(ReplicaManager.scala:1335) .. 
"Test worker-EventThread" daemon prio=5 tid=0x7fade90cf800 nid=0xef13 waiting on condition [0x7a23f000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000781847620> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:867) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1197) at java.util.concurrent.locks.ReentrantReadWriteLock$WriteLock.lock(ReentrantReadWriteLock.java:945) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:248) at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258) at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:355) at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531) at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506) "ForkJoinPool-1-worker-11" daemon prio=5 tid=0x7fade9a83000 nid=0x17907 waiting on condition [0x700011eaf000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00078184
[jira] [Created] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable
Rajini Sivaram created KAFKA-6526: - Summary: Update controller to handle changes to unclean.leader.election.enable Key: KAFKA-6526 URL: https://issues.apache.org/jira/browse/KAFKA-6526 Project: Kafka Issue Type: Sub-task Reporter: Rajini Sivaram Assignee: Rajini Sivaram At the moment, updates to the default unclean.leader.election.enable use the same code path as updates to topic overrides. This requires a controller change for the new value to take effect. It would be good if we could update the controller to handle the change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6532) Delegation token internals should not impact public interfaces
Rajini Sivaram created KAFKA-6532: - Summary: Delegation token internals should not impact public interfaces Key: KAFKA-6532 URL: https://issues.apache.org/jira/browse/KAFKA-6532 Project: Kafka Issue Type: Bug Components: core Reporter: Rajini Sivaram Assignee: Rajini Sivaram We need to make sure that code related to the internal delegation tokens implementation doesn't have any impact on public interfaces, including customizable callback handlers from KIP-86. # KafkaPrincipal has a public _tokenAuthenticated()_ method. Principal builders are configurable and we now expect custom principal builders to set this value. Since we allow the same endpoint to be used for basic SCRAM and delegation tokens, the configured principal builder needs a way of detecting token authentication. Default principal builder does this using internal SCRAM implementation code. It will be better if configurable principal builders didn't have to set this flag at all. # It will be better to replace _o.a.k.c.security.scram.DelegationTokenAuthenticationCallback_ with a more generic _ScramExtensionsCallback_. This will allow us to add more extensions in future and it will also enable custom Scram extensions. # _ScramCredentialCallback_ was extended to add _tokenOwner_ and mechanism. Mechanism is determined during SASL handshake and shouldn't be configurable in a callback handler. _ScramCredentialCallback_ is being made a public interface in KIP-86 with configurable callback handlers. Since delegation token implementation is internal and not extensible, _tokenOwner_ should be in a delegation-token-specific callback. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6528) Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize
[ https://issues.apache.org/jira/browse/KAFKA-6528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6528. --- Resolution: Fixed Fix Version/s: 1.1.0 > Transient failure in DynamicBrokerReconfigurationTest.testThreadPoolResize > -- > > Key: KAFKA-6528 > URL: https://issues.apache.org/jira/browse/KAFKA-6528 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > {code:java} > java.lang.AssertionError: expected:<108> but was:<123> > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.failNotEquals(Assert.java:834) > at org.junit.Assert.assertEquals(Assert.java:645) > at org.junit.Assert.assertEquals(Assert.java:631) > at > kafka.server.DynamicBrokerReconfigurationTest.stopAndVerifyProduceConsume(DynamicBrokerReconfigurationTest.scala:755) > at > kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:443) > at > kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:451){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6532) Delegation token internals should not impact public interfaces
[ https://issues.apache.org/jira/browse/KAFKA-6532?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6532. --- Resolution: Fixed Fix Version/s: 1.1.0 > Delegation token internals should not impact public interfaces > -- > > Key: KAFKA-6532 > URL: https://issues.apache.org/jira/browse/KAFKA-6532 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > We need to make sure that code related to the internal delegation tokens > implementation doesn't have any impact on public interfaces, including > customizable callback handlers from KIP-86. > # KafkaPrincipal has a public _tokenAuthenticated()_ method. Principal > builders are configurable and we now expect custom principal builders to set > this value. Since we allow the same endpoint to be used for basic SCRAM and > delegation tokens, the configured principal builder needs a way of detecting > token authentication. Default principal builder does this using internal > SCRAM implementation code. It will be better if configurable principal > builders didn't have to set this flag at all. > # It will be better to replace > _o.a.k.c.security.scram.DelegationTokenAuthenticationCallback_ with a more > generic _ScramExtensionsCallback_. This will allow us to add more extensions > in future and it will also enable custom Scram extensions. > # _ScramCredentialCallback_ was extended to add _tokenOwner_ and mechanism. > Mechanism is determined during SASL handshake and shouldn't be configurable > in a callback handler. _ScramCredentialCallback_ is being made a public > interface in KIP-86 with configurable callback handlers. Since delegation > token implementation is internal and not extensible, _tokenOwner_ should be > in a delegation-token-specific callback. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6546) Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener
Rajini Sivaram created KAFKA-6546: - Summary: Add ENDPOINT_NOT_FOUND_ON_LEADER error code for missing listener Key: KAFKA-6546 URL: https://issues.apache.org/jira/browse/KAFKA-6546 Project: Kafka Issue Type: Improvement Components: core Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.2.0 In 1.1, if an endpoint is available on the broker processing a metadata request, but the corresponding listener is not available on the leader of a partition, LEADER_NOT_AVAILABLE is returned (earlier versions returned UNKNOWN_SERVER_ERROR). This could indicate broker misconfiguration where some brokers are not configured with all listeners, or it could indicate a transient error when listeners are dynamically added. We want to treat the error as transient to process dynamic updates, but we should notify clients of the actual error. This change should be made when MetadataRequest version is updated so that LEADER_NOT_AVAILABLE is returned to older clients. See [https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] and [https://github.com/apache/kafka/pull/4539] for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6476) Document dynamic config update
[ https://issues.apache.org/jira/browse/KAFKA-6476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6476. --- Resolution: Fixed Reviewer: Jason Gustafson Fix Version/s: (was: 1.2.0) 1.1.0 > Document dynamic config update > -- > > Key: KAFKA-6476 > URL: https://issues.apache.org/jira/browse/KAFKA-6476 > Project: Kafka > Issue Type: Sub-task > Components: core, documentation >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > Add documentation for dynamic broker config update. > Include: > - Command line options for kafka-configs.sh with examples > - Configs that can be updated along with constraints applied > - Secret rotation for password encoder > Also add a new column for broker configs to indicate which configs can be > dynamically updated, -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6240) Support dynamic updates of frequently updated broker configs
[ https://issues.apache.org/jira/browse/KAFKA-6240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6240. --- Resolution: Fixed Fix Version/s: (was: 1.2.0) 1.1.0 > Support dynamic updates of frequently updated broker configs > > > Key: KAFKA-6240 > URL: https://issues.apache.org/jira/browse/KAFKA-6240 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > > See > [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > for details. > Implementation will be done under sub-tasks. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6512) Java Producer: Excessive memory usage with compression enabled
[ https://issues.apache.org/jira/browse/KAFKA-6512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6512. --- Resolution: Fixed Fix Version/s: (was: 1.2.0) 1.1.0 Implemented options 1) and 2) from the description. > Java Producer: Excessive memory usage with compression enabled > -- > > Key: KAFKA-6512 > URL: https://issues.apache.org/jira/browse/KAFKA-6512 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 > Environment: Windows 10 >Reporter: Kyle Tinker >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.0 > > Attachments: KafkaSender.java > > > h2. User Story > As a user of the Java producer, I want a predictable memory usage for the > Kafka client so that I can ensure that my system is sized appropriately and > will be stable even under heavy usage. > As a user of the Java producer, I want a smaller memory footprint so that my > systems don't consume as many resources. > h2. Acceptance Criteria > * Enabling Compression in Kafka should not significantly increase the memory > usage of Kafka > * The memory usage of Kafka's Java Producer should be roughly in line with > the buffer size (buffer.memory) and the number of producers declared. > h2. Additional Information > I've observed high memory usage in the producer when enabling compression > (gzip or lz4). I don't observe the behavior with compression off, but with > it on I'll run out of heap (2GB). Using a Java profiler, I see the data is > in the KafkaLZ4BlockOutputStream (or related class for gzip). I see that > MemoryRecordsBuilder:closeForRecordAppends() is trying to deal with this, but > is not successful. I'm most likely network bottlenecked, so I expect the > producer buffers to be full while the job is running and potentially a lot of > unacknowledged records. > I've tried using the default buffer.memory with 20 producers (across 20 > threads) and sending data as quickly as I can. I've also tried 1MB of > buffer.memory, which seemed to reduce memory consumption but I could still > run OOM in certain cases. I have max.in.flight.requests.per.connection set > to 1. In short, I should only have ~20 MB (20* 1MB) of data in buffers, but > I can easily exhaust 2000 MB used by Kafka. > In looking at the code more, it looks like the KafkaLZ4BlockOutputStream > doesn't clear the compressedBuffer or buffer when close() is called. In my > heap dump, both of those are ~65k size each, meaning that each batch is > taking up ~148k of space, of which 131k is buffers. (buffer.memory=1,000,000 > and messages are 1k each until the batch fills). > Kafka tries to manage memory usage by calling > MemoryRecordsBuilder:closeForRecordAppends(), which as documented as "Release > resources required for record appends (e.g. compression buffers)". However, > this method doesn't actually clear those buffers because > KafkaLZ4BlockOutputStream.close() only writes the block and end mark and > closes the output stream. It doesn't actually clear the buffer and > compressedBuffer in KafkaLZ4BlockOutputStream. Those stay allocated in RAM > until the block is acknowledged by the broker, processed in > Sender:handleProduceResponse(), and the batch is deallocated. This memory > usage therefore increases, possibly without bound. In my test program, the > program died with approximately 345 unprocessed batches per producer (20 > producers), despite having max.in.flight.requests.per.connection=1. > h2. 
Steps to Reproduce > # Create a topic test with plenty of storage > # Use a connection with a very fast upload pipe and limited download. This > allows the outbound data to go out, but acknowledgements to be delayed > flowing in. > # Download KafkaSender.java (attached to this ticket) > # Set line 17 to reference your Kafka broker > # Run the program with a 1GB Xmx value > h2. Possible solutions > There are a few possible optimizations I can think of: > # We could declare KafkaLZ4BlockOutputStream.buffer and compressedBuffer as > non-final and null them in the close() method > # We could declare the MemoryRecordsBuilder.appendStream non-final and null > it in the closeForRecordAppends() method > # We could have the ProducerBatch discard the recordsBuilder in > closeForRecordAppends(), however, this is likely a bad idea because the > recordsBuilder contains significant metadata that is likely needed after the > stream is closed. It is also final. > # We could try to limit the number of non-acknowledged batches in flight. > This would bound the maximum memory usage but may negatively impact > performance. > > Fix #1 would only improve the LZ4 algorithm, and not any other algorithm.
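For illustration, a minimal Java sketch of option 1 above: releasing the compression buffers in close() so they can be garbage-collected while the batch waits for broker acknowledgement. The class shape is simplified and hypothetical; the real KafkaLZ4BlockOutputStream also writes an LZ4 end mark and frame checksums.
{code:java}
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Simplified, hypothetical stand-in for KafkaLZ4BlockOutputStream:
// the buffers are declared non-final and nulled in close().
class BufferReleasingOutputStream extends FilterOutputStream {
    private byte[] buffer;            // non-final so it can be released
    private byte[] compressedBuffer;  // non-final so it can be released

    BufferReleasingOutputStream(OutputStream out, int blockSize) {
        super(out);
        this.buffer = new byte[blockSize];
        this.compressedBuffer = new byte[blockSize * 2];
    }

    @Override
    public void close() throws IOException {
        try {
            super.close();            // flush remaining data, close delegate
        } finally {
            buffer = null;            // release ~131k per batch immediately,
            compressedBuffer = null;  // rather than after the broker acks
        }
    }
}
{code}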
[jira] [Resolved] (KAFKA-6549) Deadlock while processing Controller Events
[ https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6549. --- Resolution: Fixed Fixed under KAFKA-6517. > Deadlock while processing Controller Events > --- > > Key: KAFKA-6549 > URL: https://issues.apache.org/jira/browse/KAFKA-6549 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Manikumar >Assignee: Manikumar >Priority: Blocker > Fix For: 1.1.0 > > Attachments: td.txt > > > Stack traces from a single node test cluster that was deadlocked while > processing controller Reelect and Expire events. Attached stack-trace. > {quote} > "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 > nid=0x7d03 waiting on condition [0x7278b000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007bccadf30> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at > kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505) > at > kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250) > at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258) > at > kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363) > at > org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531) > at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506) > Locked ownable synchronizers: > - <0x000780054860> (a > java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync) > > "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 > nid=0xad03 waiting on condition [0x73fd3000] > java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x0007bcc584a0> (a > java.util.concurrent.CountDownLatch$Sync) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231) > at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148) > at > kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439) > at > kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432) > at > kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171) > at > kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69) > at > kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69) > at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) > at
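The traces above show the essential shape of the deadlock: one thread parks on a latch while holding a lock, and the only thread that could count the latch down needs that same lock first. An illustrative reconstruction of the pattern (not Kafka source; running it hangs by design):
{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative reconstruction of the cyclic wait in the traces above.
public class CyclicWaitDemo {
    static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    static final CountDownLatch processed = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        Thread zkEventThread = new Thread(() -> {
            lock.writeLock().lock();       // like ZooKeeperClientWatcher.process
            try {
                processed.await();         // like Expire.waitUntilProcessed
            } catch (InterruptedException ignored) {
            } finally {
                lock.writeLock().unlock();
            }
        }, "main-EventThread");

        Thread controllerThread = new Thread(() -> {
            lock.readLock().lock();        // blocks: write lock is held above
            try {
                processed.countDown();     // never reached, hence the deadlock
            } finally {
                lock.readLock().unlock();
            }
        }, "controller-event-thread");

        zkEventThread.start();
        Thread.sleep(100);                 // let the writer take the lock first
        controllerThread.start();
        zkEventThread.join();              // hangs by design
    }
}
{code}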
[jira] [Created] (KAFKA-6573) KafkaController.brokerInfo not updated on dynamic update
Rajini Sivaram created KAFKA-6573: - Summary: KafkaController.brokerInfo not updated on dynamic update Key: KAFKA-6573 URL: https://issues.apache.org/jira/browse/KAFKA-6573 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 1.1.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.1.0 KafkaController.brokerInfo is cached in-memory and used to re-register the broker in ZooKeeper if ZK session expires. It should be kept up-to-date if listeners are dynamically updated. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6576) Configurable Quota Management (KIP-257)
Rajini Sivaram created KAFKA-6576: - Summary: Configurable Quota Management (KIP-257) Key: KAFKA-6576 URL: https://issues.apache.org/jira/browse/KAFKA-6576 Project: Kafka Issue Type: New Feature Components: core Reporter: Rajini Sivaram Assignee: Rajini Sivaram See [https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management] for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6653) Delayed operations may not be completed when there is lock contention
Rajini Sivaram created KAFKA-6653: - Summary: Delayed operations may not be completed when there is lock contention Key: KAFKA-6653 URL: https://issues.apache.org/jira/browse/KAFKA-6653 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.0.1, 0.11.0.2, 1.1.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram If there is lock contention while multiple threads check if a delayed operation may be completed (e.g. a produce request with acks=-1), only the thread that acquires the lock without blocking attempts to complete the operation. This change was made to avoid deadlocks under KAFKA-5970. But this leaves a timing window when an operation becomes ready to complete after another thread has acquired the lock and performed the check for completion, but has not yet released the lock. In this case, the operation may never be completed and will time out unless there are other operations with the same key. The timeout was observed in a failed system test where a produce request timed out, causing the test failure. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
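An illustrative sketch of the window described above (not Kafka source): the losing thread returns without completing, and the winning thread's completion check can go stale before it releases the lock.
{code:java}
import java.util.concurrent.locks.ReentrantLock;

// Only the thread that wins tryLock() checks for completion; a thread
// that loses simply gives up. If the operation becomes completable after
// the winner ran its check but before it released the lock, nobody
// re-checks and the operation can only expire.
class DelayedOperationSketch {
    private final ReentrantLock lock = new ReentrantLock();

    boolean maybeTryComplete() {
        if (!lock.tryLock())
            return false;         // contention: assume the lock holder saw it
        try {
            return tryComplete(); // stale if state changes before unlock()
        } finally {
            lock.unlock();
        }
    }

    private boolean tryComplete() {
        return false;             // placeholder for the real completion check
    }
}
{code}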
[jira] [Created] (KAFKA-6695) Add a system test for dynamic broker config update
Rajini Sivaram created KAFKA-6695: - Summary: Add a system test for dynamic broker config update Key: KAFKA-6695 URL: https://issues.apache.org/jira/browse/KAFKA-6695 Project: Kafka Issue Type: Test Components: core Reporter: Rajini Sivaram Assignee: Rajini Sivaram Add a system test that does some basic validation of dynamic broker configs. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6710) Streams integration tests hang during shutdown
Rajini Sivaram created KAFKA-6710: - Summary: Streams integration tests hang during shutdown Key: KAFKA-6710 URL: https://issues.apache.org/jira/browse/KAFKA-6710 Project: Kafka Issue Type: Bug Components: core, streams Affects Versions: 1.1.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Builds have been timing out a lot recently and many of the logs show streams integration tests being run, but not completed. While running tests locally, I saw a failure during shutdown of {{TableTableJoinIntegrationTest}}. The test was stuck waiting for a broker to shut down when a {{KafkaScheduler}} was attempting to delete logs. KAFKA-6624 (Commit #1ea07b993d75ed68f4c04282eb177bf84156e0b2) added a _Thread.sleep_ to wait for the time to delete each log segment inside the scheduled delete task. The failing streams test had 62 logs to delete and since MockTime doesn't get updated during the test, it would have waited for 62 minutes to complete. This blocks shutdown of the broker for 62 minutes. This is an issue if a streams integration test takes more than 30 seconds when the first delayed delete task is scheduled to be run. Changing _Thread.sleep_ to _time.sleep_ fixes this test issue. But it will be good to know why we have a _sleep_ on a _Scheduler_ at all. With the default _log.segment.delete.delay.ms_ of one minute, this potentially blocks a scheduler thread for up to a minute when there are logs to be deleted. Couldn't we just break out of the loop if it is not yet time to delete the first log segment in the list? The log would then get deleted when the broker checks next time. [~junrao] [~lindong] ? *Stack trace from failing test*: {{"kafka-scheduler-8" daemon prio=5 tid=0x7fe58dc16800 nid=0x9603 waiting on condition [0x73f25000]}} {{ java.lang.Thread.State: TIMED_WAITING (sleeping)}} {{ at java.lang.Thread.sleep(Native Method)}} {{ at kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:717)}} {{ at kafka.log.LogManager$$anonfun$3.apply$mcV$sp(LogManager.scala:406)}} {{ at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)}} {{ at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)}} {{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)}} {{ at java.util.concurrent.FutureTask.run(FutureTask.java:262)}} {{ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)}} {{ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)}} {{ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)}} {{ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)}} {{ at java.lang.Thread.run(Thread.java:745)}} {{"Test worker" prio=5 tid=0x7fe58db72000 nid=0x5203 waiting on condition [0x71cbd000]}} {{ java.lang.Thread.State: TIMED_WAITING (parking)}} {{ at sun.misc.Unsafe.park(Native Method)}} {{ - parking to wait for <0x000780fb8918> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)}} {{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)}} {{ at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)}} {{ at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)}} {{ at kafka.server.KafkaServer$$anonfun$shutdown$5.apply$mcV$sp(KafkaServer.scala:569)}} {{ at 
kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)}} {{ at kafka.server.KafkaServer.shutdown(KafkaServer.scala:569)}} {{ at org.apache.kafka.streams.integration.utils.KafkaEmbedded.stop(KafkaEmbedded.java:129)}} {{ at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.stop(EmbeddedKafkaCluster.java:126)}} {{ at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.after(EmbeddedKafkaCluster.java:158)}} {{ at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)}} {{ at org.junit.rules.RunRules.evaluate(RunRules.java:20)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
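A minimal sketch of the one-line fix mentioned above: sleeping through the injected org.apache.kafka.common.utils.Time rather than Thread.sleep, so MockTime-based tests do not block in real time. Class and method names here are illustrative, not the actual LogManager code.
{code:java}
import org.apache.kafka.common.utils.Time;

// Sleep via the injected Time implementation; in tests, MockTime.sleep
// just advances the mock clock and returns immediately.
class DelayedDeleter {
    private final Time time;

    DelayedDeleter(Time time) {
        this.time = time;
    }

    void waitUntilDeletable(long deletableAtMs) {
        long remainingMs = deletableAtMs - time.milliseconds();
        if (remainingMs > 0)
            time.sleep(remainingMs);  // Thread.sleep here would block for real
    }
}
{code}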
[jira] [Resolved] (KAFKA-6710) Streams integration tests hang during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-6710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6710. --- Resolution: Fixed Reviewer: Jun Rao Fix Version/s: 1.2.0 > Streams integration tests hang during shutdown > -- > > Key: KAFKA-6710 > URL: https://issues.apache.org/jira/browse/KAFKA-6710 > Project: Kafka > Issue Type: Bug > Components: core, streams >Affects Versions: 1.1.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Builds have been timing out a lot recently and many of the logs show streams > integration tests being run, but not completed. While running tests locally, > I saw a failure during shutdown of {{TableTableJoinIntegrationTest}}. The > test was stuck waiting for a broker to shut down when a {{KafkaScheduler}} was > attempting to delete logs. KAFKA-6624 (Commit > #1ea07b993d75ed68f4c04282eb177bf84156e0b2) added a _Thread.sleep_ to wait for > the time to delete each log segment inside the scheduled delete task. The > failing streams test had 62 logs to delete and since MockTime doesn't get > updated during the test, it would have waited for 62 minutes to complete. > This blocks shutdown of the broker for 62 minutes. This is an issue if a > streams integration test takes more than 30 seconds when the first delayed > delete task is scheduled to be run. > Changing _Thread.sleep_ to _time.sleep_ fixes this test issue. But it will be > good to know why we have a _sleep_ on a _Scheduler_ at all. With the default > _log.segment.delete.delay.ms_ of one minute, this potentially blocks a > scheduler thread for up to a minute when there are logs to be deleted. > Couldn't we just break out of the loop if it is not yet time to delete the > first log segment in the list? The log would then get deleted when the broker > checks next time. [~junrao] [~lindong] ? 
> > *Stack trace from failing test*: > {{"kafka-scheduler-8" daemon prio=5 tid=0x7fe58dc16800 nid=0x9603 waiting > on condition [0x73f25000]}} > {{ java.lang.Thread.State: TIMED_WAITING (sleeping)}} > {{ at java.lang.Thread.sleep(Native Method)}} > {{ at > kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:717)}} > {{ at > kafka.log.LogManager$$anonfun$3.apply$mcV$sp(LogManager.scala:406)}} > {{ at > kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)}} > {{ at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)}} > {{ at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)}} > {{ at java.util.concurrent.FutureTask.run(FutureTask.java:262)}} > {{ at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)}} > {{ at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)}} > {{ at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)}} > {{ at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)}} > {{ at java.lang.Thread.run(Thread.java:745)}} > {{"Test worker" prio=5 tid=0x7fe58db72000 nid=0x5203 waiting on > condition [0x71cbd000]}} > {{ java.lang.Thread.State: TIMED_WAITING (parking)}} > {{ at sun.misc.Unsafe.park(Native Method)}} > {{ - parking to wait for <0x000780fb8918> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}} > {{ at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)}} > {{ at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)}} > {{ at > java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)}} > {{ at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)}} > {{ at > kafka.server.KafkaServer$$anonfun$shutdown$5.apply$mcV$sp(KafkaServer.scala:569)}} > {{ at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)}} > {{ at kafka.server.KafkaServer.shutdown(KafkaServer.scala:569)}} > {{ at > org.apache.kafka.streams.integration.utils.KafkaEmbedded.stop(KafkaEmbedded.java:129)}} > {{ at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.stop(EmbeddedKafkaCluster.java:126)}} > {{ at > org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.after(EmbeddedKafkaCluster.java:158)}} > {{ at > org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)}} > {{ at org.junit.rules.RunRules.evaluate(RunRules.java:20)}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4292) KIP-86: Configurable SASL callback handlers
[ https://issues.apache.org/jira/browse/KAFKA-4292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-4292. --- Resolution: Fixed Reviewer: Jun Rao Fix Version/s: 1.2.0 > KIP-86: Configurable SASL callback handlers > --- > > Key: KAFKA-4292 > URL: https://issues.apache.org/jira/browse/KAFKA-4292 > Project: Kafka > Issue Type: Improvement > Components: security >Affects Versions: 0.10.1.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Implementation of KIP-86: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-86%3A+Configurable+SASL+callback+handlers -- This message was sent by Atlassian JIRA (v7.6.3#76005)
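As a hedged sketch of the kind of pluggable handler KIP-86 enables, the following server-side SASL/PLAIN callback handler verifies passwords against an external store instead of the JAAS file. The verify() method is a hypothetical placeholder.
{code:java}
import java.util.List;
import java.util.Map;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.NameCallback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback;

// Sketch of a custom server callback handler for SASL/PLAIN.
public class ExternalStorePlainHandler implements AuthenticateCallbackHandler {
    @Override
    public void configure(Map<String, ?> configs, String saslMechanism,
                          List<AppConfigurationEntry> jaasConfigEntries) {
        // read connection settings for the external store, if any
    }

    @Override
    public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
        String username = null;
        for (Callback cb : callbacks) {
            if (cb instanceof NameCallback)
                username = ((NameCallback) cb).getDefaultName();
            else if (cb instanceof PlainAuthenticateCallback) {
                PlainAuthenticateCallback plain = (PlainAuthenticateCallback) cb;
                plain.authenticated(verify(username, plain.password()));
            } else
                throw new UnsupportedCallbackException(cb);
        }
    }

    private boolean verify(String username, char[] password) {
        return false;  // hypothetical: query an external credential store
    }

    @Override
    public void close() {}
}
{code}
Such a handler would be plugged in through the broker's SASL callback handler configuration introduced by the KIP, rather than by replacing the login module.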
[jira] [Resolved] (KAFKA-6741) Transient test failure: SslTransportLayerTest.testNetworkThreadTimeRecorded
[ https://issues.apache.org/jira/browse/KAFKA-6741?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6741. --- Resolution: Fixed Assignee: Manikumar Fix Version/s: 1.2.0 > Transient test failure: SslTransportLayerTest.testNetworkThreadTimeRecorded > --- > > Key: KAFKA-6741 > URL: https://issues.apache.org/jira/browse/KAFKA-6741 > Project: Kafka > Issue Type: Bug >Reporter: Manikumar >Assignee: Manikumar >Priority: Minor > Fix For: 1.2.0 > > > debug logs: > {code} > [2018-04-03 14:51:33,365] DEBUG Added sensor with name connections-closed: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,368] DEBUG Added sensor with name connections-created: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,368] DEBUG Added sensor with name > successful-authentication: (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,368] DEBUG Added sensor with name > failed-authentication: (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,368] DEBUG Added sensor with name bytes-sent-received: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,369] DEBUG Added sensor with name bytes-sent: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,370] DEBUG Added sensor with name bytes-received: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,370] DEBUG Added sensor with name select-time: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,371] DEBUG Added sensor with name io-time: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,379] DEBUG Removed sensor with name connections-closed: > (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,379] DEBUG Removed sensor with name > connections-created: (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,379] DEBUG Removed sensor with name > successful-authentication: (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,380] DEBUG Removed sensor with name > failed-authentication: (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,380] DEBUG Removed sensor with name > bytes-sent-received: (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-sent: > (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,380] DEBUG Removed sensor with name bytes-received: > (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,382] DEBUG Removed sensor with name select-time: > (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,382] DEBUG Removed sensor with name io-time: > (org.apache.kafka.common.metrics.Metrics:447) > [2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-closed: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,382] DEBUG Added sensor with name connections-created: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,383] DEBUG Added sensor with name > successful-authentication: (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,383] DEBUG Added sensor with name > failed-authentication: (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,383] DEBUG Added sensor with name bytes-sent-received: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,384] DEBUG Added sensor with name bytes-sent: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,384] DEBUG Added sensor with 
name bytes-received: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,384] DEBUG Added sensor with name select-time: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,384] DEBUG Added sensor with name io-time: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,444] DEBUG Added sensor with name connections-closed: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,445] DEBUG Added sensor with name connections-created: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,445] DEBUG Added sensor with name > successful-authentication: (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,445] DEBUG Added sensor with name > failed-authentication: (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent-received: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-sent: > (org.apache.kafka.common.metrics.Metrics:414) > [2018-04-03 14:51:33,446] DEBUG Added sensor with name bytes-received: > (org.apache.
[jira] [Resolved] (KAFKA-6576) Configurable Quota Management (KIP-257)
[ https://issues.apache.org/jira/browse/KAFKA-6576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6576. --- Resolution: Fixed Reviewer: Jun Rao Fix Version/s: 1.2.0 > Configurable Quota Management (KIP-257) > --- > > Key: KAFKA-6576 > URL: https://issues.apache.org/jira/browse/KAFKA-6576 > Project: Kafka > Issue Type: New Feature > Components: core >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > See > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-257+-+Configurable+Quota+Management] > for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6688) The Trogdor coordinator should track task statuses
[ https://issues.apache.org/jira/browse/KAFKA-6688?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6688. --- Resolution: Fixed Reviewer: Rajini Sivaram Fix Version/s: 1.2.0 > The Trogdor coordinator should track task statuses > -- > > Key: KAFKA-6688 > URL: https://issues.apache.org/jira/browse/KAFKA-6688 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Major > Fix For: 1.2.0 > > > The Trogdor coordinator should track task statuses -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6765) Intermittent test failure in CustomQuotaCallbackTest
Rajini Sivaram created KAFKA-6765: - Summary: Intermittent test failure in CustomQuotaCallbackTest Key: KAFKA-6765 URL: https://issues.apache.org/jira/browse/KAFKA-6765 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.2.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.2.0 Exception stack trace: {quote} java.lang.NullPointerException at org.apache.kafka.common.metrics.stats.SampledStat.purgeObsoleteSamples(SampledStat.java:104) at org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:74) at org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:68) at kafka.api.QuotaTestClients$.metricValue(BaseQuotaTest.scala:163) at kafka.api.QuotaTestClients.produceUntilThrottled(BaseQuotaTest.scala:193) at kafka.api.CustomQuotaCallbackTest$GroupedUser.produceConsume(CustomQuotaCallbackTest.scala:272) at kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:146) {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6765) Intermittent test failure in CustomQuotaCallbackTest
[ https://issues.apache.org/jira/browse/KAFKA-6765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6765. --- Resolution: Fixed Reviewer: Jun Rao > Intermittent test failure in CustomQuotaCallbackTest > > > Key: KAFKA-6765 > URL: https://issues.apache.org/jira/browse/KAFKA-6765 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 1.2.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.2.0 > > > Exception stack trace: > {quote} > java.lang.NullPointerException at > org.apache.kafka.common.metrics.stats.SampledStat.purgeObsoleteSamples(SampledStat.java:104) > at > org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:74) > at > org.apache.kafka.common.metrics.KafkaMetric.metricValue(KafkaMetric.java:68) > at kafka.api.QuotaTestClients$.metricValue(BaseQuotaTest.scala:163) at > kafka.api.QuotaTestClients.produceUntilThrottled(BaseQuotaTest.scala:193) at > kafka.api.CustomQuotaCallbackTest$GroupedUser.produceConsume(CustomQuotaCallbackTest.scala:272) > at > kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:146) > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6772) Broker should load credentials from ZK before requests are allowed
[ https://issues.apache.org/jira/browse/KAFKA-6772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6772. --- Resolution: Fixed Reviewer: Jun Rao > Broker should load credentials from ZK before requests are allowed > -- > > Key: KAFKA-6772 > URL: https://issues.apache.org/jira/browse/KAFKA-6772 > Project: Kafka > Issue Type: Improvement >Affects Versions: 1.0.0, 1.1.0, 1.0.1 >Reporter: Ismael Juma >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.0.2, 1.2.0, 1.1.1 > > > It is currently possible for clients to get an AuthenticationException during > start-up if the brokers have not yet loaded credentials from ZK. This > definitely affects SCRAM, but it may also affect delegation tokens. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6800) Update documentation for SASL/PLAIN and SCRAM to use callbacks
Rajini Sivaram created KAFKA-6800: - Summary: Update documentation for SASL/PLAIN and SCRAM to use callbacks Key: KAFKA-6800 URL: https://issues.apache.org/jira/browse/KAFKA-6800 Project: Kafka Issue Type: Task Components: documentation, security Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.2.0 Refer to custom callbacks introduced in KIP-86 in SASL documentation instead of replacing login module. Also include `org.apache.kafka.common.security.plain` and `org.apache.kafka.common.security.scram` in javadocs since these are now part of the public API. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6805) Allow dynamic broker configs to be configured in ZooKeeper before starting broker
Rajini Sivaram created KAFKA-6805: - Summary: Allow dynamic broker configs to be configured in ZooKeeper before starting broker Key: KAFKA-6805 URL: https://issues.apache.org/jira/browse/KAFKA-6805 Project: Kafka Issue Type: Task Components: tools Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.0.0 At the moment, dynamic broker configs like SSL keystore and password can be configured using ConfigCommand only after a broker is started (using the new AdminClient). To start a broker, these configs have to be defined in server.properties. We want to restrict updates using ZooKeeper once the broker starts up, but we should allow updates using ZK prior to starting brokers. This is particularly useful for password configs, which are stored encrypted in ZK and are therefore difficult to set manually before starting brokers. ConfigCommand is being updated to talk to AdminClient under KIP-248, but we will need to maintain the tool using ZK to enable credentials to be created in ZK before starting brokers. So the functionality to set broker configs can sit alongside that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6810) Enable dynamic reconfiguration of SSL truststores
Rajini Sivaram created KAFKA-6810: - Summary: Enable dynamic reconfiguration of SSL truststores Key: KAFKA-6810 URL: https://issues.apache.org/jira/browse/KAFKA-6810 Project: Kafka Issue Type: Task Components: security Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.0.0 We currently allow broker's SSL keystores to be dynamically reconfigured to support short-lived keystores (KIP-226). It will be useful to allow truststores to be reconfigured as well, so that new certificates can be added and old ones removed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
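Once implemented, such an update could be submitted over the same AdminClient path that KIP-226 added for keystores. A sketch, with an illustrative listener name ("external"), broker id and file path:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigsResult;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

// Sketch of submitting a per-broker, per-listener truststore update.
public class TruststoreUpdateExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker0:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource broker0 =
                new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config update = new Config(Collections.singletonList(
                new ConfigEntry("listener.name.external.ssl.truststore.location",
                                "/etc/kafka/ssl/truststore-new.jks")));
            AlterConfigsResult result =
                admin.alterConfigs(Collections.singletonMap(broker0, update));
            result.all().get();  // surfaces any validation error from the broker
        }
    }
}
{code}
Note that alterConfigs in this API generation is not incremental: it replaces the resource's set of dynamically-set configs, so a real tool would describe the current configs first and resubmit them together with the new entry.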
[jira] [Created] (KAFKA-6835) Enable topic unclean leader election to be enabled without controller change
Rajini Sivaram created KAFKA-6835: - Summary: Enable topic unclean leader election to be enabled without controller change Key: KAFKA-6835 URL: https://issues.apache.org/jira/browse/KAFKA-6835 Project: Kafka Issue Type: Task Components: core Reporter: Rajini Sivaram Dynamic update of broker's default unclean.leader.election.enable will be processed without controller change (KAFKA-6526). We should probably do the same for topic overrides as well. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6526) Update controller to handle changes to unclean.leader.election.enable
[ https://issues.apache.org/jira/browse/KAFKA-6526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-6526. --- Resolution: Fixed Reviewer: Dong Lin > Update controller to handle changes to unclean.leader.election.enable > - > > Key: KAFKA-6526 > URL: https://issues.apache.org/jira/browse/KAFKA-6526 > Project: Kafka > Issue Type: Improvement >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.0.0 > > > At the moment, updates to default unclean.leader.election.enable uses the > same code path as updates to topic overrides. This requires controller change > for the new value to take effect. It will be good if we can update the > controller to handle the change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6854) Log cleaner fails with transaction markers that are deleted during clean
Rajini Sivaram created KAFKA-6854: - Summary: Log cleaner fails with transaction markers that are deleted during clean Key: KAFKA-6854 URL: https://issues.apache.org/jira/browse/KAFKA-6854 Project: Kafka Issue Type: Task Components: core Affects Versions: 1.1.0 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Log cleaner grows buffers when `result.messagesRead` is zero. In a typical scenario, this is a result of the source buffer being too small to read the first batch. The buffer is then doubled in size until one batch can be read, up to a maximum of `max.message.bytes`. There are issues with the maximum message size used in calculations as reported in KAFKA-6834. But there is a separate issue with the use of `result.messagesRead` when transactions are used. This contains the number of filtered messages read from source which can be zero when a transaction control marker is discarded. Log cleaner incorrectly assumes that messages were not read because the buffer was too small. This can result in the log cleaner attempting to grow buffers forever, failing with an exception when `max.message.bytes` is reached. This kills the log cleaner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
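An illustrative reconstruction of the growth loop described above (not the actual cleaner code):
{code:java}
import java.nio.ByteBuffer;

// When a read yields zero retained messages, the cleaner assumes the
// buffer was too small and doubles it. A filtered-out transaction marker
// also yields zero messages, so the loop keeps growing the buffer until
// the configured maximum and then throws, killing the cleaner thread.
class CleanerBufferGrowth {
    private ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);

    void growBuffersOrFail(int maxBufferSize, int messagesRead) {
        if (messagesRead > 0)
            return;                          // made progress, no growth needed
        if (readBuffer.capacity() >= maxBufferSize)
            throw new IllegalStateException(
                "Batch larger than the maximum allowed size");
        int newSize = Math.min(readBuffer.capacity() * 2, maxBufferSize);
        readBuffer = ByteBuffer.allocate(newSize);  // doubles even when the batch
                                                    // was only a discarded marker
    }
}
{code}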
[jira] [Created] (KAFKA-7251) Add support for TLS 1.3
Rajini Sivaram created KAFKA-7251: - Summary: Add support for TLS 1.3 Key: KAFKA-7251 URL: https://issues.apache.org/jira/browse/KAFKA-7251 Project: Kafka Issue Type: New Feature Components: security Reporter: Rajini Sivaram Fix For: 2.1.0 Java 11 adds support for TLS 1.3. We should support this when we add support for Java 11. Related issues: [https://bugs.openjdk.java.net/browse/JDK-8206170] [https://bugs.openjdk.java.net/browse/JDK-8206178] [https://bugs.openjdk.java.net/browse/JDK-8208538] [https://bugs.openjdk.java.net/browse/JDK-8207009] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7255) Timing issue in SimpleAclAuthorizer with concurrent create/update
Rajini Sivaram created KAFKA-7255: - Summary: Timing issue in SimpleAclAuthorizer with concurrent create/update Key: KAFKA-7255 URL: https://issues.apache.org/jira/browse/KAFKA-7255 Project: Kafka Issue Type: Bug Components: security Affects Versions: 2.0.0, 1.1.1, 1.0.2, 0.11.0.3 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0 There is a small timing window in SimpleAclAuthorizer where ACL updates may be lost if two brokers create ACLs for a resource at the same time. Scenario: Administrator creates new.topic and sends one ACL request to add ACL for UserA for new.topic and a second request to add ACL for UserB for new.topic using AdminClient. These requests may be sent to different brokers by AdminClient. In most cases, both ACLs are added for the resource new.topic, but there is a small timing window where one broker may overwrite the ACL written by the other broker, resulting in only one of the ACLs (either UserA or UserB) being actually stored in ZooKeeper. The timing window itself is very small, but we have seen intermittent failures in SimpleAclAuthorizerTest.testHighConcurrencyModificationOfResourceAcls as a result of this window. Even though this issue can result in incorrect ACLs affecting security, we have not raised this as a security vulnerability since this is not an exploitable issue. ACLs can only be set by privileged users in Kafka who have Alter access on the Cluster resource. Users without this privileged access cannot use this issue to gain additional access to any resource. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
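The standard fix for this class of lost update is a conditional write on the znode version with a retry on conflict. A sketch using the plain ZooKeeper client, where mergeAcl() is a hypothetical helper for this example:
{code:java}
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// Read-modify-write against the znode's version, retried on conflict,
// so two brokers adding ACLs concurrently both end up applied.
class ConditionalAclUpdate {
    private final ZooKeeper zk;

    ConditionalAclUpdate(ZooKeeper zk) {
        this.zk = zk;
    }

    void addAcl(String path, byte[] newAcl) throws Exception {
        while (true) {
            Stat stat = new Stat();
            byte[] current = zk.getData(path, false, stat);
            byte[] merged = mergeAcl(current, newAcl);
            try {
                // only succeeds if nobody else wrote since our read
                zk.setData(path, merged, stat.getVersion());
                return;
            } catch (KeeperException.BadVersionException e) {
                // another broker updated the node concurrently; retry
            }
        }
    }

    private byte[] mergeAcl(byte[] current, byte[] newAcl) {
        return newAcl;  // hypothetical merge of the ACL sets, for illustration
    }
}
{code}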
[jira] [Created] (KAFKA-7261) Request and response total metrics record bytes instead of request count
Rajini Sivaram created KAFKA-7261: - Summary: Request and response total metrics record bytes instead of request count Key: KAFKA-7261 URL: https://issues.apache.org/jira/browse/KAFKA-7261 Project: Kafka Issue Type: Bug Components: metrics Reporter: Rajini Sivaram Assignee: Rajini Sivaram Request and response total metrics seem to be recording total bytes rather than total requests since they record using a common sensor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
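A small sketch of the distinction using the Kafka metrics API: each sensor is recorded in its own unit, so a request-count stat never sees byte values. Sensor and metric names below are illustrative, not the broker's actual metric names.
{code:java}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

// Separate sensors per unit, instead of one shared sensor receiving
// byte counts for every attached stat.
public class SeparateSensorsExample {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();

        Sensor bytesSent = metrics.sensor("bytes-sent");
        bytesSent.add(metrics.metricName("byte-rate", "demo"), new Rate());

        Sensor requestsSent = metrics.sensor("requests-sent");
        requestsSent.add(metrics.metricName("request-rate", "demo"), new Rate());

        bytesSent.record(4096);   // size in bytes of one request
        requestsSent.record(1);   // one request, recorded separately

        metrics.close();
    }
}
{code}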
[jira] [Resolved] (KAFKA-7255) Timing issue in SimpleAclAuthorizer with concurrent create/update
[ https://issues.apache.org/jira/browse/KAFKA-7255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7255. --- Resolution: Fixed > Timing issue in SimpleAclAuthorizer with concurrent create/update > - > > Key: KAFKA-7255 > URL: https://issues.apache.org/jira/browse/KAFKA-7255 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 0.11.0.4, 1.0.3, 1.1.2, 2.0.1, 2.1.0 > > > There is a small timing window in SimpleAclAuthorizer where ACL updates may > be lost if two brokers create ACLs for a resource at the same time. > Scenario: Administrator creates new.topic and sends one ACL request to add > ACL for UserA for new.topic and a second request to add ACL for UserB for > new.topic using AdminClient. These requests may be sent to different brokers > by AdminClient. In most cases, both ACLs are added for the resource > new.topic, but there is a small timing window where one broker may overwrite > the ACL written by the other broker, resulting in only one of the ACLs > (either UserA or UserB) being actually stored in ZooKeeper. The timing window > itself is very small, but we have seen intermittent failures in > SimpleAclAuthorizerTest.testHighConcurrencyModificationOfResourceAcls as a > result of this window. > Even though this issue can result in incorrect ACLs affecting security, we > have not raised this as a security vulnerability since this is not an > exploitable issue. ACLs can only be set by privileged users in Kafka who have > Alter access on the Cluster resource. Users without this privileged access > cannot use this issue to gain additional access to any resource. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7261) Request and response total metrics record bytes instead of request count
[ https://issues.apache.org/jira/browse/KAFKA-7261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7261. --- Resolution: Fixed Reviewer: Jun Rao Fix Version/s: 2.1.0 2.0.1 1.1.2 1.0.3 > Request and response total metrics record bytes instead of request count > > > Key: KAFKA-7261 > URL: https://issues.apache.org/jira/browse/KAFKA-7261 > Project: Kafka > Issue Type: Bug > Components: metrics >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0 > > > Request and response total metrics seem to be recording total bytes rather > than total requests since they record using a common sensor. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
Rajini Sivaram created KAFKA-7280: - Summary: ConcurrentModificationException in FetchSessionHandler in heartbeat thread Key: KAFKA-7280 URL: https://issues.apache.org/jira/browse/KAFKA-7280 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 2.0.0, 1.1.1 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 1.1.2, 2.0.1, 2.1.0 Request/response handling in FetchSessionHandler is not thread-safe. But we are using it in Kafka consumer without any synchronization even though poll() from heartbeat thread can process responses. Heartbeat thread holds the coordinator lock while processing its poll and responses, making other operations involving the group coordinator safe. We also need to lock FetchSessionHandler for the operations that update or read FetchSessionHandler#sessionPartitions. This exception is from a system test run on trunk of TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two: {quote} [2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, groupId=group] Heartbeat thread failed due to unexpected error (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) java.util.ConcurrentModificationException at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) at org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362) at org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424) at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216) at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996) {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
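An illustrative sketch (not the actual fix) of the locking this calls for: both the fetch path and the heartbeat thread's response handling synchronize before touching the shared session state, avoiding the concurrent iteration and mutation seen in the trace above.
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Guard the shared session map with one lock for both mutation and
// iteration, so a heartbeat-thread response cannot race the fetch path.
class SynchronizedSessionState {
    private final Object lock = new Object();
    private final Map<TopicPartition, Long> sessionPartitions = new LinkedHashMap<>();

    void handleResponse(Map<TopicPartition, Long> responseData) {
        synchronized (lock) {
            sessionPartitions.putAll(responseData);  // mutate under the lock
        }
    }

    String logSessionPartitions() {
        synchronized (lock) {
            return sessionPartitions.keySet().toString();  // iterate under the lock
        }
    }
}
{code}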
[jira] [Created] (KAFKA-7288) Transient failure in SslSelectorTest.testCloseConnectionInClosingState
Rajini Sivaram created KAFKA-7288: - Summary: Transient failure in SslSelectorTest.testCloseConnectionInClosingState Key: KAFKA-7288 URL: https://issues.apache.org/jira/browse/KAFKA-7288 Project: Kafka Issue Type: Bug Components: unit tests Reporter: Rajini Sivaram Assignee: Rajini Sivaram Noticed this failure in SslSelectorTest.testCloseConnectionInClosingState a few times in unit tests in Jenkins: {quote} java.lang.AssertionError: Channel not expired expected null, but was: at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.failNotNull(Assert.java:755) at org.junit.Assert.assertNull(Assert.java:737) at org.apache.kafka.common.network.SelectorTest.testCloseConnectionInClosingState(SelectorTest.java:341) {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure
[ https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7119. --- Resolution: Fixed Reviewer: Jun Rao Fix Version/s: 2.1.0 2.0.1 1.1.2 1.0.3 > Intermittent test failure with GSSAPI authentication failure > > > Key: KAFKA-7119 > URL: https://issues.apache.org/jira/browse/KAFKA-7119 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0 > > > I have seen this failure a couple of times in builds (e.g. > [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)] > {quote} > org.apache.kafka.common.errors.SaslAuthenticationException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred > when evaluating SASL token received from the Kafka Broker. Kafka Client will > go to AUTHENTICATION_FAILED state. Caused by: > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Request is a > replay (34) - Request is a replay)] at > jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356) > at java.base/java.security.AccessController.doPrivileged(Native Method) at > java.base/javax.security.auth.Subject.doAs(Subject.java:423) at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > 
kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at > kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979) > at > kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.j
[jira] [Resolved] (KAFKA-7169) Add support for Custom SASL extensions in OAuth authentication
[ https://issues.apache.org/jira/browse/KAFKA-7169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7169. --- Resolution: Fixed Reviewer: Ron Dagostino Fix Version/s: 2.1.0 > Add support for Custom SASL extensions in OAuth authentication > -- > > Key: KAFKA-7169 > URL: https://issues.apache.org/jira/browse/KAFKA-7169 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > Fix For: 2.1.0 > > > KIP: > [here|https://cwiki.apache.org/confluence/display/KAFKA/KIP-342%3A+Add+support+for+Custom+SASL+extensions+in+OAuth+authentication] > Kafka currently supports non-configurable SASL extensions in its SCRAM > authentication protocol for delegation token validation. It would be useful > to provide configurable SASL extensions for the OAuthBearer authentication > mechanism as well, such that clients could attach arbitrary data for the > principal authenticating into Kafka. This way, a custom principal can hold > information derived from the authentication mechanism, which could prove > useful for better tracing and troubleshooting, for example. This can be done > in a way which allows for easier extendability in future SASL mechanisms. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
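A hedged sketch of the mechanism the KIP adds: a client callback handler answers a SaslExtensionsCallback so that arbitrary key/value pairs travel with the OAUTHBEARER initial response. The callback accessor names should be checked against the released 2.1.0 API; the extension key and value are illustrative.
{code:java}
import java.util.Collections;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;

// Sketch: attach a custom extension to the OAUTHBEARER initial response.
class ExtensionsSnippet {
    void handleOne(Callback callback) throws UnsupportedCallbackException {
        if (callback instanceof SaslExtensionsCallback) {
            SaslExtensions extensions = new SaslExtensions(
                Collections.singletonMap("traceId", "abc-123"));
            ((SaslExtensionsCallback) callback).extensions(extensions);
        } else {
            throw new UnsupportedCallbackException(callback);
        }
    }
}
{code}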
[jira] [Reopened] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure
[ https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram reopened KAFKA-7119: --- There was a test failure with the 1.0 build. It looks like we need to handle retriable Kerberos exceptions on the server-side as well. I could recreate this failure only with Java 7, but to be safe, will fix trunk as well. Exception from https://builds.apache.org/job/kafka–jdk7/232/: {quote} java.util.concurrent.ExecutionException: java.lang.AssertionError: expected: but was: at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:202) at kafka.server.GssapiAuthenticationTest$$anonfun$testRequestIsAReplay$1.apply(GssapiAuthenticationTest.scala:80) at kafka.server.GssapiAuthenticationTest$$anonfun$testRequestIsAReplay$1.apply(GssapiAuthenticationTest.scala:80) {quote} > Intermittent test failure with GSSAPI authentication failure > > > Key: KAFKA-7119 > URL: https://issues.apache.org/jira/browse/KAFKA-7119 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0 > > > I have seen this failure a couple of times in builds (e.g. > [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)] > {quote} > org.apache.kafka.common.errors.SaslAuthenticationException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred > when evaluating SASL token received from the Kafka Broker. Kafka Client will > go to AUTHENTICATION_FAILED state. 
Caused by: > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Request is a > replay (34) - Request is a replay)] at > jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356) > at java.base/java.security.AccessController.doPrivileged(Native Method) at > java.base/javax.security.auth.Subject.doAs(Subject.java:423) at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at > kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979) > at > kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.inter
[jira] [Resolved] (KAFKA-7288) Transient failure in SslSelectorTest.testCloseConnectionInClosingState
[ https://issues.apache.org/jira/browse/KAFKA-7288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7288. --- Resolution: Fixed Reviewer: Jun Rao Fix Version/s: 2.1.0 > Transient failure in SslSelectorTest.testCloseConnectionInClosingState > -- > > Key: KAFKA-7288 > URL: https://issues.apache.org/jira/browse/KAFKA-7288 > Project: Kafka > Issue Type: Bug > Components: unit tests >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.1.0 > > > Noticed this failure in SslSelectorTest.testCloseConnectionInClosingState a > few times in unit tests in Jenkins: > {quote} > java.lang.AssertionError: Channel not expired expected null, but > was: at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.failNotNull(Assert.java:755) at > org.junit.Assert.assertNull(Assert.java:737) at > org.apache.kafka.common.network.SelectorTest.testCloseConnectionInClosingState(SelectorTest.java:341) > {quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure
[ https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7119. --- Resolution: Fixed > Intermittent test failure with GSSAPI authentication failure > > > Key: KAFKA-7119 > URL: https://issues.apache.org/jira/browse/KAFKA-7119 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.0.3, 1.1.2, 2.0.1, 2.1.0 > > > I have seen this failure a couple of times in builds (e.g. > [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)] > {quote} > org.apache.kafka.common.errors.SaslAuthenticationException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred > when evaluating SASL token received from the Kafka Broker. Kafka Client will > go to AUTHENTICATION_FAILED state. Caused by: > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Request is a > replay (34) - Request is a replay)] at > jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356) > at java.base/java.security.AccessController.doPrivileged(Native Method) at > java.base/javax.security.auth.Subject.doAs(Subject.java:423) at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980) > at 
kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at > kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979) > at > kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.s
[jira] [Resolved] (KAFKA-7324) NPE due to lack of SASLExtensions in SASL/OAUTHBEARER
[ https://issues.apache.org/jira/browse/KAFKA-7324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7324. --- Resolution: Fixed > NPE due to lack of SASLExtensions in SASL/OAUTHBEARER > - > > Key: KAFKA-7324 > URL: https://issues.apache.org/jira/browse/KAFKA-7324 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Ron Dagostino >Assignee: Ron Dagostino >Priority: Major > Fix For: 2.1.0 > > > When there are no SASL extensions in an OAUTHBEARER request (or the callback > handler does not support SaslExtensionsCallback) the > OAuthBearerSaslClient.retrieveCustomExtensions() method returns null. This > null value is then passed to the OAuthBearerClientInitialResponse > constructor, and that results in an NPE: > java.lang.NullPointerException > at > org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.validateExtensions(OAuthBearerClientInitialResponse.java:115) > at > org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.<init>(OAuthBearerClientInitialResponse.java:81) > at > org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerClientInitialResponse.<init>(OAuthBearerClientInitialResponse.java:75) > at > org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient.evaluateChallenge(OAuthBearerSaslClient.java:99) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
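For illustration, here is a minimal standalone sketch of the guard that avoids this NPE. The class and method names below are simplified stand-ins, not the actual Kafka internals; the point is only that a missing extensions map should be normalized to an empty one before the initial response is built.
{code:java}
import java.util.Collections;
import java.util.Map;

public class InitialResponseExample {

    // Mirrors the failure mode: no extensions configured, so null is returned.
    static Map<String, String> retrieveCustomExtensions() {
        return null;
    }

    // Simplified stand-in for OAuthBearerClientInitialResponse: with the guard
    // in main(), this method only ever sees a (possibly empty) map, never null.
    static String buildInitialResponse(String token, Map<String, String> extensions) {
        StringBuilder response = new StringBuilder("n,,\u0001auth=Bearer " + token);
        extensions.forEach((k, v) -> response.append('\u0001').append(k).append('=').append(v));
        return response.append("\u0001\u0001").toString();
    }

    public static void main(String[] args) {
        Map<String, String> extensions = retrieveCustomExtensions();
        if (extensions == null)
            extensions = Collections.emptyMap(); // the fix: never pass null onward
        System.out.println(buildInitialResponse("someTokenValue", extensions));
    }
}
{code}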
[jira] [Created] (KAFKA-7429) Enable dynamic key/truststore update with same filename/password
Rajini Sivaram created KAFKA-7429: - Summary: Enable dynamic key/truststore update with same filename/password Key: KAFKA-7429 URL: https://issues.apache.org/jira/browse/KAFKA-7429 Project: Kafka Issue Type: Improvement Components: security Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.1.0 At the moment, SSL keystores and truststores on brokers can be dynamically updated using AdminClient by providing a new keystore or truststore. But we require either the filename or password to be modified to trigger the update. In some scenarios, we may want to perform the update using the same file (and password). So it would be useful to provide a way to trigger a reload of existing keystores and truststores using the same AdminClient update mechanism. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
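For context, a hedged sketch of the existing AdminClient update path that this improvement extends; the bootstrap server, broker id, listener name, and keystore path are all hypothetical. Today the update only takes effect if the location or password changes; with this change, re-submitting the same values would also trigger a reload of the underlying files.
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class KeystoreReloadExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // hypothetical
        try (AdminClient admin = AdminClient.create(props)) {
            // Per-broker config resource; broker id "0" is a placeholder.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            ConfigEntry entry = new ConfigEntry(
                    "listener.name.external.ssl.keystore.location",
                    "/var/private/ssl/kafka.keystore.jks"); // hypothetical path
            // With this improvement, submitting the same location/password would
            // still cause the broker to reload the keystore from disk.
            admin.alterConfigs(Collections.singletonMap(broker,
                    new Config(Collections.singleton(entry)))).all().get();
        }
    }
}
{code}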
[jira] [Resolved] (KAFKA-7437) Store leader epoch in offset commit metadata
[ https://issues.apache.org/jira/browse/KAFKA-7437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7437. --- Resolution: Fixed Reviewer: Rajini Sivaram Fix Version/s: 2.1.0 > Store leader epoch in offset commit metadata > > > Key: KAFKA-7437 > URL: https://issues.apache.org/jira/browse/KAFKA-7437 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Fix For: 2.1.0 > > > This patch implements the changes described in KIP-320 for the persistence of > leader epoch information in the offset commit metadata: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation -- This message was sent by Atlassian JIRA (v7.6.3#76005)
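On the client side this surfaces through the KIP-320 consumer APIs. A minimal sketch, assuming the OffsetAndMetadata constructor that accepts a leader epoch and the ConsumerRecord.leaderEpoch() accessor described in the KIP:
{code:java}
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class LeaderEpochCommitExample {
    // Commit the position after `record`, carrying the record's leader epoch so
    // the group coordinator can persist it in the offset commit metadata.
    static void commitWithEpoch(KafkaConsumer<String, String> consumer,
                                ConsumerRecord<String, String> record) {
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        Optional<Integer> leaderEpoch = record.leaderEpoch();
        OffsetAndMetadata offset = new OffsetAndMetadata(record.offset() + 1, leaderEpoch, "");
        consumer.commitSync(Collections.singletonMap(tp, offset));
    }
}
{code}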
[jira] [Created] (KAFKA-7453) Enable idle expiry of connections which are never selected
Rajini Sivaram created KAFKA-7453: - Summary: Enable idle expiry of connections which are never selected Key: KAFKA-7453 URL: https://issues.apache.org/jira/browse/KAFKA-7453 Project: Kafka Issue Type: Bug Components: network Affects Versions: 2.0.0, 1.1.1, 1.0.2, 0.11.0.3, 0.10.2.2 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.1.0 We add connections to Selector#channels when a connection is registered, but we start idle expiry of connections only when the connection is first selected. In some environments where the channel may never get selected, this could leak memory and sockets. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
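To make the intended behavior concrete, here is a small self-contained sketch (simplified names, not the actual Selector code): the idle tracker is updated at registration time as well as on selection, so a connection that is never selected still ages out.
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Access-ordered tracker: the least recently active connection sits first.
class IdleExpiryTracker {
    private final long maxIdleNanos;
    private final LinkedHashMap<String, Long> lastActive =
            new LinkedHashMap<>(16, 0.75f, true);

    IdleExpiryTracker(long maxIdleNanos) {
        this.maxIdleNanos = maxIdleNanos;
    }

    // Per the fix, called from register() as well as from every select.
    void update(String connectionId, long nowNanos) {
        lastActive.put(connectionId, nowNanos);
    }

    // Returns the id of one expired connection to close, or null if none.
    String pollExpired(long nowNanos) {
        Map.Entry<String, Long> oldest =
                lastActive.entrySet().stream().findFirst().orElse(null);
        if (oldest != null && nowNanos - oldest.getValue() > maxIdleNanos) {
            lastActive.remove(oldest.getKey());
            return oldest.getKey();
        }
        return null;
    }
}
{code}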
[jira] [Created] (KAFKA-7454) Use lazy allocation for SslTransportLayer buffers
Rajini Sivaram created KAFKA-7454: - Summary: Use lazy allocation for SslTransportLayer buffers Key: KAFKA-7454 URL: https://issues.apache.org/jira/browse/KAFKA-7454 Project: Kafka Issue Type: Improvement Components: security Affects Versions: 2.0.0, 1.1.1, 1.0.2, 0.11.0.3 Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.1.0 At the moment, three heap buffers are allocated for each SslTransportLayer when the instance is created (before the connection is established on the client side and when the connection is accepted on the broker side). When there are a large number of connections and the broker is overloaded, client reconnections can cause unnecessary memory pressure, since buffers may be allocated for connections whose handshake is never processed. It would be better to allocate the buffers lazily. On the broker side, buffers will be allocated when the first packet is received from the client, starting the handshake. On the client side, buffers will be allocated once the connection is established and the client initiates the handshake. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
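A minimal sketch of the lazy-allocation pattern described above, with simplified names rather than the actual SslTransportLayer fields:
{code:java}
import java.nio.ByteBuffer;

class LazySslBuffers {
    private final int netReadSize;
    private ByteBuffer netReadBuffer; // stays null until the first read

    LazySslBuffers(int netReadSize) {
        this.netReadSize = netReadSize;
    }

    // Allocate only when the first packet actually arrives, so connections
    // whose handshake is never processed cost no buffer memory.
    ByteBuffer netReadBuffer() {
        if (netReadBuffer == null)
            netReadBuffer = ByteBuffer.allocate(netReadSize);
        return netReadBuffer;
    }
}
{code}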
[jira] [Resolved] (KAFKA-7453) Enable idle expiry of connections which are never selected
[ https://issues.apache.org/jira/browse/KAFKA-7453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram resolved KAFKA-7453. --- Resolution: Fixed Fix Version/s: 2.0.1 1.1.2 > Enable idle expiry of connections which are never selected > -- > > Key: KAFKA-7453 > URL: https://issues.apache.org/jira/browse/KAFKA-7453 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > We add connections to Selector#channels when a connection is registered, but > we start idle expiry of connections only when the connection is first > selected. In some environments where the channel may never get selected, this > could leak memory and sockets. -- This message was sent by Atlassian JIRA (v7.6.3#76005)