[GitHub] [kafka] VJvaLbhYbfr edited a comment on pull request #10792: MINOR: Small refactor of tests

2021-05-30 Thread GitBox


VJvaLbhYbfr edited a comment on pull request #10792:
URL: https://github.com/apache/kafka/pull/10792#issuecomment-850922135






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-30 Thread GitBox


showuon opened a new pull request #10794:
URL: https://github.com/apache/kafka/pull/10794


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] dengziming commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-30 Thread GitBox


dengziming commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-850993572


   This is awesome. I also noticed that there is no logic handling `NOT_CONTROLLER` 
and the active controller. I will take some time to test this PR. 👍






[jira] [Created] (KAFKA-12866) Kafka requires ZK root access even when using a chroot

2021-05-30 Thread Igor Soarez (Jira)
Igor Soarez created KAFKA-12866:
---

 Summary: Kafka requires ZK root access even when using a chroot
 Key: KAFKA-12866
 URL: https://issues.apache.org/jira/browse/KAFKA-12866
 Project: Kafka
  Issue Type: Bug
  Components: core, zkclient
Affects Versions: 2.6.2, 2.7.1, 2.8.0, 2.6.1
Reporter: Igor Soarez


When a Zookeeper chroot is configured, users do not expect Kafka to need 
Zookeeper access outside of that chroot.
h1. Why is this important?

A Zookeeper cluster may be shared with other Kafka clusters or even other 
applications. It is an expected security practice to restrict each 
cluster's or application's access to its own Zookeeper chroot.
h1. Steps to reproduce
h2. Zookeeper setup

Using the zkCli, create a chroot for Kafka, make it available to Kafka but lock 
the root znode.

{{ [zk: localhost:2181(CONNECTED) 1] create /somechroot }}
{{ Created /some}}
{{ [zk: localhost:2181(CONNECTED) 2] setAcl /somechroot world:anyone:cdrwa}}
{{ [zk: localhost:2181(CONNECTED) 3] addauth digest test:12345}}
{{ [zk: localhost:2181(CONNECTED) 4] setAcl / digest:test:Mx1uO9GLtm1qaVAQ20Vh9ODgACg=:cdrwa}}
h2. Kafka setup

Configure the chroot in broker.properties:

{{zookeeper.connect=localhost:2181/somechroot}}
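
As a rough illustration of how a connect string like the one above splits into a host list and a chroot path, here is a minimal Java sketch. The names `ConnectStringSketch`, `Parsed` and `parse` are hypothetical and this is not Kafka's actual parsing code; the only assumption is that everything from the first `/` onward is the chroot, and that a missing suffix means the root znode.

```java
import java.util.Objects;

public class ConnectStringSketch {

    /** Hypothetical holder for the two halves of a connect string. */
    static final class Parsed {
        final String hosts;
        final String chroot; // "/" when the connect string carries no chroot

        Parsed(String hosts, String chroot) {
            this.hosts = hosts;
            this.chroot = chroot;
        }
    }

    /** Splits "host:port[,host:port...][/chroot]" at the first slash. */
    static Parsed parse(String connect) {
        Objects.requireNonNull(connect, "connect");
        int slash = connect.indexOf('/');
        if (slash < 0) {
            return new Parsed(connect, "/");
        }
        return new Parsed(connect.substring(0, slash), connect.substring(slash));
    }

    public static void main(String[] args) {
        Parsed p = parse("localhost:2181/somechroot");
        System.out.println(p.hosts + " -> " + p.chroot); // localhost:2181 -> /somechroot
    }
}
```

With the configuration above, every path the broker uses would be resolved relative to `/somechroot`, which is why access outside of it is unexpected.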
h2. Expected behavior

The expected behavior here is that Kafka will use the chroot without issues.
h2. Actual result

Kafka fails to start with a fatal exception:

{{ org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /chroot}}
{{ at org.apache.zookeeper.KeeperException.create(KeeperException.java:120)}}
{{ at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)}}
{{ at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)}}
{{ at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)}}
{{ at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)}}
{{ at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1957)}}
{{ at kafka.zk.ZkClientAclTest.testChrootExistsAndRootIsLocked(ZkClientAclTest.scala:60)}}

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12866) Kafka requires ZK root access even when using a chroot

2021-05-30 Thread Igor Soarez (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Igor Soarez updated KAFKA-12866:

Description: 
When a Zookeeper chroot is configured, users do not expect Kafka to need 
Zookeeper access outside of that chroot.
h1. Why is this important?

A Zookeeper cluster may be shared with other Kafka clusters or even other 
applications. It is an expected security practice to restrict each 
cluster's or application's access to its own Zookeeper chroot.
h1. Steps to reproduce
h2. Zookeeper setup

Using the zkCli, create a chroot for Kafka, make it available to Kafka but lock 
the root znode.

 
{code:java}
[zk: localhost:2181(CONNECTED) 1] create /somechroot
Created /some
[zk: localhost:2181(CONNECTED) 2] setAcl /somechroot world:anyone:cdrwa
[zk: localhost:2181(CONNECTED) 3] addauth digest test:12345
[zk: localhost:2181(CONNECTED) 4] setAcl / digest:test:Mx1uO9GLtm1qaVAQ20Vh9ODgACg=:cdrwa{code}
 
h2. Kafka setup

Configure the chroot in broker.properties:

 
{code:java}
zookeeper.connect=localhost:2181/somechroot{code}
 

 
h2. Expected behavior

The expected behavior here is that Kafka will use the chroot without issues.
h2. Actual result

Kafka fails to start with a fatal exception:
{code:java}
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /chroot
at org.apache.zookeeper.KeeperException.create(KeeperException.java:120)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)
at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1957)
at kafka.zk.ZkClientAclTest.testChrootExistsAndRootIsLocked(ZkClientAclTest.scala:60)
{code}
 

 


 

 


> Kafka requires ZK root access even when using a chroot
> --
>
> Key: KAFKA-12866
> URL: https://issues.apache.org/jira/browse/KAFKA-12866
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 2.6.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Igor Soarez
>Priority: Major
>

[jira] [Assigned] (KAFKA-12866) Kafka requires ZK root access even when using a chroot

2021-05-30 Thread Igor Soarez (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Igor Soarez reassigned KAFKA-12866:
---

Assignee: Igor Soarez

> Kafka requires ZK root access even when using a chroot
> --
>
> Key: KAFKA-12866
> URL: https://issues.apache.org/jira/browse/KAFKA-12866
> Project: Kafka
>  Issue Type: Bug
>  Components: core, zkclient
>Affects Versions: 2.6.1, 2.8.0, 2.7.1, 2.6.2
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>





[GitHub] [kafka] soarez opened a new pull request #10795: KAFKA-12866: Avoid root access to Zookeeper

2021-05-30 Thread GitBox


soarez opened a new pull request #10795:
URL: https://github.com/apache/kafka/pull/10795


   https://issues.apache.org/jira/browse/KAFKA-12866
   
   The broker shouldn't assume create access to the chroot. There are
   deployment scenarios where the chroot is already created and is the only
   znode which the broker can access.
   
   To test this, we can use a ZK integration test and configure Zookeeper in the same way the issue is reproduced.
   
   1. Create the chroot
   2. Set free access to the chroot
   3. Lock down access to the root znode
   4. Try to connect the KafkaZkClient
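
The four steps above can be modelled without a ZooKeeper server. The following Java sketch is a toy in-memory model, not the ZooKeeper client API and not the test added in this PR: `ChrootAclSketch`, `Result`, `setCreateAllowed` and `ensureExists` are hypothetical names. It assumes that a create request is checked against the parent's create permission before the server looks at whether the node already exists, which is consistent with the `NoAuth` reported in KAFKA-12866. It also assumes that an existence check on the chroot is permitted; the PR description does not say whether the actual change works this way.

```java
import java.util.HashMap;
import java.util.Map;

public class ChrootAclSketch {

    enum Result { OK, NODE_EXISTS, NO_AUTH, NO_NODE }

    // Toy znode tree: path -> may the current session create children under it?
    private final Map<String, Boolean> canCreateUnder = new HashMap<>();

    ChrootAclSketch() {
        canCreateUnder.put("/", true); // the root znode always exists
    }

    static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? "/" : path.substring(0, i);
    }

    boolean exists(String path) {
        return canCreateUnder.containsKey(path);
    }

    void setCreateAllowed(String path, boolean allowed) {
        if (!exists(path)) {
            throw new IllegalArgumentException("no such node: " + path);
        }
        canCreateUnder.put(path, allowed);
    }

    /** Assumed ordering: parent permission first, node existence second. */
    Result create(String path) {
        String parent = parentOf(path);
        if (!exists(parent)) {
            return Result.NO_NODE;
        }
        if (!canCreateUnder.get(parent)) {
            return Result.NO_AUTH; // a locked parent hides NODE_EXISTS
        }
        if (exists(path)) {
            return Result.NODE_EXISTS;
        }
        canCreateUnder.put(path, true);
        return Result.OK;
    }

    /** One possible mitigation (an assumption): skip create if the node is already there. */
    Result ensureExists(String path) {
        return exists(path) ? Result.OK : create(path);
    }

    public static void main(String[] args) {
        ChrootAclSketch zk = new ChrootAclSketch();
        zk.create("/somechroot");        // 1. create the chroot
                                         // 2. the chroot itself stays open
        zk.setCreateAllowed("/", false); // 3. lock down the root znode
        System.out.println(zk.create("/somechroot"));       // 4. prints NO_AUTH
        System.out.println(zk.ensureExists("/somechroot")); // prints OK
    }
}
```

In this model an unconditional create on an existing chroot fails once the root is locked, while a check for existence first never touches the root's permissions.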
   
   It should be a separate `ZooKeeperTestHarness` to avoid leaving the ACL changes made to ZK root visible to other tests.
   
    Rejected alternatives
   
   * Expect `NoAuth` in `KafkaZkClient.createRecursive` and treat `NoAuth` as success.
   * Create a new configuration to set `createChrootIfNecessary = false` instead of the current non-configurable default value of `true`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-30 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095137



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
+ * This is used to publish and fetch {@link RemoteLogMetadata} for the registered user topic partitions with
+ * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an instance of this class and it subscribes
+ * to metadata updates for the registered user topic partitions.
+ */
+public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
+    private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+
+    // Take these as configs with the respective default values.
+    private static final long INITIALIZATION_RETRY_INTERVAL_MS = 30_000L;
+
+    private volatile boolean configured = false;
+
+    // Using AtomicBoolean instead of volatile as it may encounter http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
+    // if the field is read but not updated in a spin loop like in #initializeResources() method.
+    private final AtomicBoolean close = new AtomicBoolean(false);
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+    private Thread initializationThread;
+    private Time time = Time.SYSTEM;
+    private ProducerManager producerManager;
+    private ConsumerManager consumerManager;
+
+    // This allows to gracefully close this instance using {@link #close()} method while there are some pending or new
+    // requests calling different methods which use the resources like producer/consumer managers.
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final RemotePartitionMetadataStore remotePartitionMetadataStore = new RemotePartitionMetadataStore();
+    private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+    private RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
+
+    @Override
+    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
+            throws RemoteStorageException {
+        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
+
+        ensureInitializedAndNotClosed();
+
+        // This allows gracefully rejecting the requests while closing of this instance is in progress, which triggers
+        // closing the producer/consumer manager instances.
+        lock.readLock().lock();
+        try {
+
+// This me
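
The locking comments in the hunk above describe a shutdown pattern: requests take the read lock so they can run alongside each other, while `close()` takes the write lock so it waits for in-flight requests before releasing resources. The following is a minimal, self-contained Java sketch of that idea, not the code from the PR. The class `GracefulCloseSketch`, its `publish` method and the counter standing in for the producer/consumer managers are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GracefulCloseSketch {

    // AtomicBoolean flag, mirroring the choice explained in the hunk's comments.
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    // Hypothetical stand-in for work done through producer/consumer managers.
    private final AtomicInteger published = new AtomicInteger();

    /** Requests share the read lock, so they do not block each other. */
    public boolean publish(String metadata) {
        lock.readLock().lock();
        try {
            if (closing.get()) {
                return false; // reject once close has started
            }
            published.incrementAndGet();
            return true;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** The write lock is granted only after all in-flight requests have finished. */
    public void close() {
        if (!closing.compareAndSet(false, true)) {
            return; // already closing or closed
        }
        lock.writeLock().lock();
        try {
            // Resources would be released here in a real implementation.
        } finally {
            lock.writeLock().unlock();
        }
    }

    public int publishedCount() {
        return published.get();
    }

    public static void main(String[] args) {
        GracefulCloseSketch manager = new GracefulCloseSketch();
        System.out.println(manager.publish("segment-0")); // true
        manager.close();
        System.out.println(manager.publish("segment-1")); // false
    }
}
```

Setting the flag before acquiring the write lock means requests that arrive during shutdown are rejected as soon as they obtain the read lock, rather than touching resources that are about to be closed.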

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-30 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095193



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-30 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095217



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-30 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095239



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import 
org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is the {@link RemoteLogMetadataManager} implementation with storage as 
an internal topic with name {@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
+ * This is used to publish and fetch {@link RemoteLogMetadata} for the 
registered user topic partitions with
+ * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an 
instance of this class and it subscribes
+ * to metadata updates for the registered user topic partitions.
+ */
+public class TopicBasedRemoteLogMetadataManager implements 
RemoteLogMetadataManager {
+private static final Logger log = 
LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+
+// Take these as configs with the respective default values.
+private static final long INITIALIZATION_RETRY_INTERVAL_MS = 30_000L;
+
+private volatile boolean configured = false;
+
+// Using AtomicBoolean instead of volatile as it may encounter 
http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
+// if the field is read but not updated in a spin loop like in 
#initializeResources() method.
+private final AtomicBoolean close = new AtomicBoolean(false);
+private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+private Thread initializationThread;
+private Time time = Time.SYSTEM;
+private ProducerManager producerManager;
+private ConsumerManager consumerManager;
+
+// This allows this instance to be closed gracefully using the {@link #close()} 
method while there are some pending or new
+// requests calling different methods which use the resources like 
producer/consumer managers.
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+private final RemotePartitionMetadataStore remotePartitionMetadataStore = 
new RemotePartitionMetadataStore();
+private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+private RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
+
+@Override
+public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata 
remoteLogSegmentMetadata)
+throws RemoteStorageException {
+Objects.requireNonNull(remoteLogSegmentMetadata, 
"remoteLogSegmentMetadata can not be null");
+
+ensureInitializedAndNotClosed();
+
+// This allows gracefully rejecting the requests while closing of this 
instance is in progress, which triggers
+// closing the producer/consumer manager instances.
+lock.readLock().lock();
+try {
+
+// This me
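
The comments in the diff above describe taking a read lock per request so that requests can be rejected gracefully while the instance is closing. The diff is cut off before the rest of the method, so the following is only a minimal sketch of that general pattern under my own assumptions, not the PR's implementation: the class name `GracefulCloseSketch`, the `publish` method and the string return value are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical stand-in for a manager that owns closeable resources.
final class GracefulCloseSketch {
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Requests share the read lock, so they never block each other.
    String publish(String event) {
        lock.readLock().lock();
        try {
            if (closing.get()) {
                throw new IllegalStateException("This instance is closing or closed");
            }
            // A real implementation would hand the event to a producer here.
            return "published:" + event;
        } finally {
            lock.readLock().unlock();
        }
    }

    // close() sets the flag first so that new requests fail fast, then takes
    // the write lock, which waits until in-flight requests release their read locks.
    void close() {
        if (closing.compareAndSet(false, true)) {
            lock.writeLock().lock();
            try {
                // Release producer/consumer-like resources here (assumed step).
            } finally {
                lock.writeLock().unlock();
            }
        }
    }

    public static void main(String[] args) {
        GracefulCloseSketch manager = new GracefulCloseSketch();
        System.out.println(manager.publish("segment-1"));
        manager.close();
        try {
            manager.publish("segment-2");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The design point is that the write lock is only taken once, during close, so the common request path pays only the cost of an uncontended read lock.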

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-30 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095305



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Set;
+
+public class ConsumerManager implements Closeable {

Review comment:
   I will add that.

##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata 
topic ({@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintain the state of the remote log segment metadata. It 
gives an API to add or remove
+ * for what topic partition's metadata should be consumed by this instance 
using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} respectively.
+ * 
+ * When a broker is started, controller sends topic partitions that this 
broker is leader or follower for and the
+ * partitions to be deleted. This class receives those notifications with
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} and assigns the consumer to the
+ * respective remote log metadata partitions by using {@link 
RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any leadership changes later are called through the same API. We will 
remove the partitions that are deleted from
+ * this broker which are received through {@link 
#remove

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-30 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095350



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata 
topic ({@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintain the state of the remote log segment metadata. It 
gives an API to add or remove
+ * for what topic partition's metadata should be consumed by this instance 
using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} respectively.
+ * 
+ * When a broker is started, controller sends topic partitions that this 
broker is leader or follower for and the
+ * partitions to be deleted. This class receives those notifications with
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} and assigns the consumer to the
+ * respective remote log metadata partitions by using {@link 
RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any leadership changes later are called through the same API. We will 
remove the partitions that are deleted from
+ * this broker which are received through {@link 
#removeAssignmentsForPartitions(Set)}.
+ * 
+ * After receiving these events it invokes {@link 
RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains in-memory representation of the state of {@link 
RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
+
+private static final long POLL_INTERVAL_MS = 30L;
+
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+private final KafkaConsumer consumer;
+private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
+private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+private volatile boolean close = false;
+private volatile boolean assignPartitions = false;
+
+private final Object assignPartitionsLock = new Object();
+
+// Remote log metadata topic partitions that consumer is assigned to.
+private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+
+// User topic partitions that this broker is a leader/follower for.
+private Set assignedTopicPartitions = 
Collections.emptySet();
+
+// Map of remote log metadata topic partition to target end offsets to be 
consumed.
+private final Map partitionToTargetEndOffsets = new 
ConcurrentHashMap<>();
+
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+
+public ConsumerTask(KafkaConsumer consumer,
+RemotePartitionMetadataEventHandler

[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-30 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095431



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+
+public class RemoteLogMetadataTopicPartitioner {
+public static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataTopicPartitioner.class);
+private final int noOfMetadataTopicPartitions;
+
+public RemoteLogMetadataTopicPartitioner(int noOfMetadataTopicPartitions) {
+this.noOfMetadataTopicPartitions = noOfMetadataTopicPartitions;
+}
+
+public int metadataPartition(TopicIdPartition topicIdPartition) {
+Objects.requireNonNull(topicIdPartition, "TopicPartition can not be 
null");
+
+int partitionNo = 
Utils.toPositive(Utils.murmur2(topicIdPartition.toString().getBytes(StandardCharsets.UTF_8)))

Review comment:
   Good point. I will add that.
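
For readers following the partitioner diff above: it hashes the string form of the topic-id partition and clears the sign bit, but the line is cut off after the hash call. The sketch below shows one way such a hash could be reduced to a metadata partition number. The final modulo step is my assumption about the truncated line, the FNV-1a hash is a stand-in for Kafka's `Utils.murmur2` so that the example has no dependencies, and `MetadataPartitionSketch` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, dependency-free sketch of hash-based metadata partitioning.
final class MetadataPartitionSketch {
    private final int numMetadataPartitions;

    MetadataPartitionSketch(int numMetadataPartitions) {
        if (numMetadataPartitions <= 0) {
            throw new IllegalArgumentException("numMetadataPartitions must be positive");
        }
        this.numMetadataPartitions = numMetadataPartitions;
    }

    // Stand-in hash (32-bit FNV-1a); the PR uses Kafka's Utils.murmur2 instead.
    static int hash(byte[] data) {
        int h = 0x811c9dc5;
        for (byte b : data) {
            h ^= (b & 0xff);
            h *= 0x01000193;
        }
        return h;
    }

    // Clears the sign bit so the result is non-negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Assumed reduction: non-negative hash modulo the number of metadata partitions.
    int metadataPartition(String topicIdPartition) {
        byte[] bytes = topicIdPartition.getBytes(StandardCharsets.UTF_8);
        return toPositive(hash(bytes)) % numMetadataPartitions;
    }

    public static void main(String[] args) {
        MetadataPartitionSketch partitioner = new MetadataPartitionSketch(50);
        System.out.println(partitioner.metadataPartition("some-topic-id:my-topic-0"));
    }
}
```

Because the mapping depends only on the input string and the partition count, every broker computes the same metadata partition for a given user topic partition without any coordination.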








[GitHub] [kafka] satishd commented on pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-30 Thread GitBox


satishd commented on pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#issuecomment-851023730


   Thanks @junrao for your comment. Addressed most of them with the latest 
commit.






[GitHub] [kafka] ijuma commented on pull request #10787: KAFKA-12864: Move queue and timeline into server-common

2021-05-30 Thread GitBox


ijuma commented on pull request #10787:
URL: https://github.com/apache/kafka/pull/10787#issuecomment-851027604


   Thanks for the PR. Can we please include the motivation?






[GitHub] [kafka] ijuma commented on pull request #10783: MINOR: Dependency updates around Scala libraries

2021-05-30 Thread GitBox


ijuma commented on pull request #10783:
URL: https://github.com/apache/kafka/pull/10783#issuecomment-851027684


   The snappy update doesn't seem related to Scala. Maybe that should be a 
separate PR.






[GitHub] [kafka] jlprat commented on pull request #10783: MINOR: Dependency updates around Scala libraries

2021-05-30 Thread GitBox


jlprat commented on pull request #10783:
URL: https://github.com/apache/kafka/pull/10783#issuecomment-851033612


   You are right, I should have skipped it. Fix pushed






[GitHub] [kafka] jlprat edited a comment on pull request #10783: MINOR: Dependency updates around Scala libraries

2021-05-30 Thread GitBox


jlprat edited a comment on pull request #10783:
URL: https://github.com/apache/kafka/pull/10783#issuecomment-850547775


   Sure thing, let me fetch them and link them here:
   * Spotless: Unfortunately, there are no release notes to be found
   * Scala 2.12.14: https://github.com/scala/scala/releases/tag/v2.12.14
   * Scala Logging: 
https://github.com/lightbend/scala-logging/releases/tag/v3.9.3
   * Scala Collection Compat:
 *  https://github.com/scala/scala-collection-compat/releases/tag/v2.3.1
 * https://github.com/scala/scala-collection-compat/releases/tag/v2.3.2
 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.0
 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.1
 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.2
 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.3
 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.4
   * Scala Java8 Compat:
 * https://github.com/scala/scala-java8-compat/releases/tag/v1.0.0-RC1
 * https://github.com/scala/scala-java8-compat/releases/tag/v1.0.0
   * ~Snappy:~ 
 * ~https://github.com/xerial/snappy-java/releases/tag/1.1.8.2~
 * ~https://github.com/xerial/snappy-java/releases/tag/1.1.8.3~
 * ~https://github.com/xerial/snappy-java/releases/tag/1.1.8.4~






[GitHub] [kafka] bruto1 commented on a change in pull request #10590: KAFKA-5761: support ByteBuffer as value in ProducerRecord and avoid redundant serialization when it's used

2021-05-30 Thread GitBox


bruto1 commented on a change in pull request #10590:
URL: https://github.com/apache/kafka/pull/10590#discussion_r642107164



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -920,23 +923,28 @@ private void throwIfProducerClosed() {
 " to class " + 
producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
 " specified in key.serializer", cce);
 }
-byte[] serializedValue;
-try {
-serializedValue = valueSerializer.serialize(record.topic(), 
record.headers(), record.value());
-} catch (ClassCastException cce) {
-throw new SerializationException("Can't convert value of class 
" + record.value().getClass().getName() +
+
+setReadOnly(record.headers());

Review comment:
   fixed this one
   sorry for making you look for the cause, please have a look at the PR again








[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-05-30 Thread GitBox


ijuma commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-851047311


   The results are similar for the ducktape benchmarks since the bottleneck is 
elsewhere. In the PR description, I include the results for a workload that 
shows significant improvement with these changes. Also, the following 
allocation profiles show that the lz4 buffer allocations dominate trunk and 
are gone in this PR:
   
   trunk:
   
![image](https://user-images.githubusercontent.com/24747/120117066-310dfc80-c140-11eb-8ad4-490e749e2162.png)
   
   this PR:
   
![image](https://user-images.githubusercontent.com/24747/120117071-4b47da80-c140-11eb-9292-8dc586215245.png)
   
   So, I think we can go ahead and merge this.






[GitHub] [kafka] ijuma merged pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2021-05-30 Thread GitBox


ijuma merged pull request #9229:
URL: https://github.com/apache/kafka/pull/9229


   






[jira] [Assigned] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-30 Thread Marco Lotz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco Lotz reassigned KAFKA-5676:
-

Assignee: (was: Marco Lotz)

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior rather than 
> extending a real implementation of {{StreamsMetricsImpl}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-30 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354118#comment-17354118
 ] 

Marco Lotz commented on KAFKA-5676:
---

[~mjsax] [~cadonna] I have spent a reasonable amount of time analysing this 
ticket. It seems to me that this is tightly coupled in many points (which also 
explains the fair amount of unmerged PRs related to it). Based on the previous 
comments, I found out the following in the code:
 * As [~chia7712] mentioned, if we keep the MockStreamsMetrics class and make 
it implement the interface, we would need to extend the interface (thus a KIP), 
especially because MockStreamsMetrics is used interchangeably with the 
StreamsMetricsImpl class. The implementation has many extra public methods, 
including static ones. A large number of test classes (20+) rely on 
StreamsMetricsImpl behaviours provided by MockStreamsMetrics.

 
 * If we go for removing MockStreamsMetrics and trying to mock 
StreamsMetricsImpl whenever it’s not truly needed:
 * One will find out that many methods in the class are final (which is not a 
limitation, since PowerMock is available in the Kafka project), but this is a 
sign that the class may require further design attention. Among them is:
{code:java}
public final Sensor taskLevelSensor(...){code}

 * One will uncover that many static final methods are also called for 
StreamsMetricsImpl (that can also be solved by PowerMock). Among them is:
{code:java}
public static void StreamsMetricsImpl.addValueMetricToSensor(...){code}

 * One will find out that many test classes (e.g. StreamTaskTest or 
RecordCollectorTest) would require a considerable amount of behaviour mocking. 
It seems that easily 20+ test classes would need a considerable amount of mock 
configuration in this scenario. A possible solution is indeed to write mocking 
utilities that initialise mocks of StreamsMetricsImpl, but again, it seems to me 
that there are enough code smells to indicate that something may need to be 
re-evaluated a bit further.

With those points in mind, I believe that this ticket needs to be evaluated a 
bit more deeply, since it is likely connected to some considerable tech debt 
(e.g. mocking of final methods, inconsistencies in how the mocking of the 
metrics is performed, high coupling with the implementation instead of the 
interface, etc.). Unfortunately, I won’t be able to provide the amount of 
rework/redesign that this ticket requires right now. Maybe a core committer 
could eventually look into it if it is important?
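
To illustrate the distinction the ticket draws between implementing an interface with mock behaviour and extending a real implementation, here is a minimal sketch. It does not use the real `StreamsMetrics` API: `MetricsRecorder`, `TaskSketch` and `RecordingMetrics` are hypothetical, deliberately simplified names chosen only for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class MockByInterfaceSketch {

    // Hypothetical, simplified metrics interface (not the real StreamsMetrics).
    interface MetricsRecorder {
        void record(String sensorName, double value);
    }

    // The code under test depends only on the interface, never on a concrete class.
    static final class TaskSketch {
        private final MetricsRecorder metrics;

        TaskSketch(MetricsRecorder metrics) {
            this.metrics = metrics;
        }

        void process(String record) {
            metrics.record("process-latency", record.length());
        }
    }

    // Hand-written mock: implements the interface and only remembers the calls,
    // so it needs no constructor arguments from a real implementation.
    static final class RecordingMetrics implements MetricsRecorder {
        final List<String> calls = new ArrayList<>();

        @Override
        public void record(String sensorName, double value) {
            calls.add(sensorName + "=" + value);
        }
    }

    public static void main(String[] args) {
        RecordingMetrics metrics = new RecordingMetrics();
        new TaskSketch(metrics).process("hello");
        System.out.println(metrics.calls);
    }
}
```

This only works when the code under test is written against the interface; as the comment above notes, much of the existing test code relies on methods that exist only on the implementation class, which is why the change is not mechanical.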

> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Marco Lotz
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior rather than 
> extending a real implementation of {{StreamsMetricsImpl}}.





[jira] [Comment Edited] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-30 Thread Marco Lotz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354118#comment-17354118
 ] 

Marco Lotz edited comment on KAFKA-5676 at 5/30/21, 10:47 PM:
--

[~mjsax] [~cadonna] I have spent a reasonable amount of time analysing this 
ticket. It seems to me that this is tightly coupled in many points (which also 
explains the fair amount of unmerged PRs related to it). Based on the previous 
comments, I found out the following in the code:
 * As [~chia7712] mentioned, if we keep the MockStreamsMetrics class and make 
it implement the interface, we would need to extend the interface (thus a KIP), 
especially because MockStreamsMetrics is used interchangeably with the 
StreamsMetricsImpl class. The implementation has many extra public methods, 
including static ones. A large number of test classes (20+) rely on 
StreamsMetricsImpl behaviours provided by MockStreamsMetrics.

 
 * If we go for removing MockStreamsMetrics and trying to mock 
StreamsMetricsImpl whenever it’s not truly needed:

 # One will find out that many methods in the class are final (which is not a 
limitation, since PowerMock is available in the Kafka project), but this is a 
sign that the class may require further design attention. Among them is:
{code:java}
public final Sensor taskLevelSensor(...){code}

 # One will uncover that many static final methods are also called for 
StreamsMetricsImpl (that can also be solved by PowerMock). Among them is:
{code:java}
public static void StreamsMetricsImpl.addValueMetricToSensor(...){code}

 # One will find out that many test classes (e.g. StreamTaskTest or 
RecordCollectorTest) would require a considerable amount of behaviour mocking. 
It seems that easily 20+ test classes would need a considerable amount of mock 
configuration in this scenario. A possible solution is indeed to write mocking 
utilities that initialise mocks of StreamsMetricsImpl, but again, it seems to me 
that there are enough code smells to indicate that something may need to be 
re-evaluated a bit further.

With those points in mind, I believe that this ticket needs to be evaluated a 
bit more deeply, since it is likely connected to some considerable tech debt 
(e.g. mocking of final methods, inconsistencies in how the mocking of the 
metrics is performed, high coupling with the implementation instead of the 
interface, etc.). Unfortunately, I won’t be able to provide the amount of 
rework/redesign that this ticket requires right now. Maybe a core committer 
could eventually look into it if it is important?



[GitHub] [kafka] dengziming removed a comment on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure

2021-05-30 Thread GitBox


dengziming removed a comment on pull request #9577:
URL: https://github.com/apache/kafka/pull/9577#issuecomment-833187678


   ping @mumrah to have a look 😉.






[GitHub] [kafka] lkokhreidze commented on a change in pull request #10785: KIP-708 / A Rack awareness for Kafka Streams

2021-05-30 Thread GitBox


lkokhreidze commented on a change in pull request #10785:
URL: https://github.com/apache/kafka/pull/10785#discussion_r642141626



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java
##
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ClientTagAwareStandbyTaskAssignorTest {

Review comment:
   I will try to figure out more test cases, but these should already be a good 
starting point.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Tes

[GitHub] [kafka] lkokhreidze commented on pull request #10785: KIP-708 / A Rack awareness for Kafka Streams

2021-05-30 Thread GitBox


lkokhreidze commented on pull request #10785:
URL: https://github.com/apache/kafka/pull/10785#issuecomment-851076581


   Call for review @vvcephei @cadonna @ableegoldman 






[GitHub] [kafka] dengziming commented on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure

2021-05-30 Thread GitBox


dengziming commented on pull request #9577:
URL: https://github.com/apache/kafka/pull/9577#issuecomment-851110147


   Ping @mumrah @cmccabe: this PR is similar to KIP-497. I hope it can be 
finished before Kafka 3.0.0.






[jira] [Commented] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal

2021-05-30 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354175#comment-17354175
 ] 

dengziming commented on KAFKA-12465:


[~omnia_h_ibrahim] Thank you for the feedback. We are now trying to design an 
algorithm to decide when to treat `INCONSISTENT_CLUSTER_ID` as fatal. Some of 
the discussion is here: 
[https://github.com/apache/kafka/pull/10289#discussion_r592853088]

I think [~jagsancio]'s suggestion is a good one, WDYT?

> Decide whether inconsistent cluster id error are fatal
> --
>
> Key: KAFKA-12465
> URL: https://issues.apache.org/jira/browse/KAFKA-12465
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Priority: Major
>
> Currently, we just log an error when an inconsistent cluster ID occurs. We 
> should set a window during startup in which these errors are fatal; after that 
> window, we no longer treat them as fatal. See 
> https://github.com/apache/kafka/pull/10289#discussion_r592853088
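
The startup-window idea in the description above could be sketched roughly as follows. This is only an illustration under stated assumptions: the class name `ClusterIdErrorPolicy`, the method `isFatal`, and the 30-second window in `main` are hypothetical and do not come from the ticket or from Kafka; the ticket does not say how long the window should be or where the check would live.

```java
import java.time.Duration;

// Hypothetical sketch: decides whether an inconsistent-cluster-id error
// should be treated as fatal, based on how long ago the process started.
final class ClusterIdErrorPolicy {
    private final long startMs;        // time at which startup began
    private final long fatalWindowMs;  // length of the window (assumed configurable)

    ClusterIdErrorPolicy(long startMs, Duration fatalWindow) {
        this.startMs = startMs;
        this.fatalWindowMs = fatalWindow.toMillis();
    }

    // Inside the startup window the error is fatal; afterwards it is only logged.
    boolean isFatal(long nowMs) {
        return nowMs - startMs < fatalWindowMs;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        ClusterIdErrorPolicy policy =
            new ClusterIdErrorPolicy(start, Duration.ofSeconds(30));
        System.out.println("fatal at startup: " + policy.isFatal(start));
        System.out.println("fatal after 60s: " + policy.isFatal(start + 60_000L));
    }
}
```

Passing the current time in as a parameter keeps the check easy to test without a real clock.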



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9135) Kafka producer/consumer are creating too many open file

2021-05-30 Thread Duong Pham (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354180#comment-17354180
 ] 

Duong Pham commented on KAFKA-9135:
---

You can upgrade your Java version, preferably to 1.8.0.291.

> Kafka producer/consumer are creating too many open file
> ---
>
> Key: KAFKA-9135
> URL: https://issues.apache.org/jira/browse/KAFKA-9135
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, consumer, producer 
>Affects Versions: 1.0.1
> Environment: apache kafka client :- 1.0.1
> Kafka version :- 1.0.1
> Open JDK :- java-1.8.0-openjdk-1.8.0.222.b10-1
> CentOS version :- CentOS Linux release 7.6.1810
>Reporter: Dhirendra Singh
>Priority: Critical
>
> We have a 3-node Kafka cluster deployment with 5 topics and 6 partitions per 
> topic. We have configured a replication factor of 3. We are seeing a very 
> strange problem: the number of file descriptors has crossed the ulimit 
> (which is 50K for our application).
> As per the lsof command and our analysis:
> 1. There are 15K established connections from the Kafka producer/consumer towards 
> the broker, and at the same time in the thread dump we have observed thousands of 
> entries for the Kafka 'admin-client-network-thread':
> "admin-client-network-thread" #224398 daemon prio=5 os_prio=0 
> tid=0x7f12ca119800 nid=0x5363 runnable [0x7f12c4db8000]
>  java.lang.Thread.State: RUNNABLE
>  at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>  at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>  at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>  at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>  - locked <0x0005e0603238> (a sun.nio.ch.Util$3)
>  - locked <0x0005e0603228> (a java.util.Collections$UnmodifiableSet)
>  - locked <0x0005e0602f08> (a sun.nio.ch.EPollSelectorImpl)
>  at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>  at org.apache.kafka.common.network.Selector.select(Selector.java:672)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:396)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238)
>  - locked <0x0005e0602dc0> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205)
>  at kafka.admin.AdminClient$$anon$1.run(AdminClient.scala:61)
>  at java.lang.Thread.run(Thread.java:748)
> 2. As per the lsof output, we have observed 35K entries for pipe and eventpoll:
> java 5441 app 374r FIFO 0,9 0t0 22415240 pipe
>  java 5441 app 375w FIFO 0,9 0t0 22415240 pipe
>  java 5441 app 376u a_inode 0,10 0 6379 [eventpoll]
>  java 5441 app 377r FIFO 0,9 0t0 2247 pipe
>  java 5441 app 378r FIFO 0,9 0t0 28054726 pipe
>  java 5441 app 379r FIFO 0,9 0t0 22415241 pipe
>  java 5441 app 380w FIFO 0,9 0t0 22415241 pipe
>  java 5441 app 381u a_inode 0,10 0 6379 [eventpoll]
>  java 5441 app 382w FIFO 0,9 0t0 2247 pipe
>  java 5441 app 383u a_inode 0,10 0 6379 [eventpoll]
>  java 5441 app 384u a_inode 0,10 0 6379 [eventpoll]
>  java 5441 app 385r FIFO 0,9 0t0 40216087 pipe
>  java 5441 app 386r FIFO 0,9 0t0 22483470 pipe
> Setup details :- 
>  apache kafka client :- 1.0.1
>  Kafka version :- 1.0.1
>  Open JDK :- java-1.8.0-openjdk-1.8.0.222.b10-1
>  CentOS version :- CentOS Linux release 7.6.1810
> Note: After restarting the VM, the file descriptor count cleared and returned 
> to a normal count of 1000. After a few seconds the file descriptor count started 
> to increase again, and it reaches 50K (the ulimit) after 1 week in an idle scenario.





[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-30 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642195443



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##
@@ -36,10 +37,16 @@
 private final String message;
 
 public static ApiError fromThrowable(Throwable t) {
+Throwable throwableToBeDecode = t;
+// Future will wrap the original exception with completionException, 
which will return unexpected UNKNOWN_SERVER_ERROR.
+if (t instanceof CompletionException) {
+throwableToBeDecode = t.getCause();

Review comment:
   fix 1: unwrap the `CompletionException` to get the original exception 
inside.
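
For readers unfamiliar with the wrapping behaviour mentioned in this fix, here is a minimal standalone sketch. It does not use Kafka classes; `UnwrapExample` and `unwrap` are hypothetical names chosen for illustration. It relies only on the documented JDK behaviour that `CompletableFuture.join()` throws a `CompletionException` whose cause is the exception thrown by the failed computation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper, not part of the PR: shows why unwrapping is needed
// before mapping an exception to an error code.
final class UnwrapExample {
    // Returns the cause of a CompletionException, or the throwable itself otherwise.
    static Throwable unwrap(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("original failure");
        });
        try {
            future.join();
        } catch (CompletionException e) {
            // Without unwrapping, code that inspects the exception type only
            // sees CompletionException and cannot recognise the real error.
            Throwable original = unwrap(e);
            System.out.println(original.getClass().getSimpleName());
        }
    }
}
```

The null check on the cause is a defensive choice in this sketch; the diff above calls `t.getCause()` directly.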








[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-30 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642195788



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -350,7 +372,8 @@ class BrokerToControllerRequestThread(
 } else if (response.wasDisconnected()) {
   updateControllerAddress(null)
   requestQueue.putFirst(queueItem)
-} else if 
(response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
+} else if 
(response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER) ||
+  
maybeCheckNotControllerErrorInsideEnvelopeResponse(queueItem.requestHeader, 
response.responseBody())) {

Review comment:
   fix 2: parse the envelope response to check whether a `NOT_CONTROLLER` error 
is present.
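
The shape of this check can be illustrated with a simplified model. The sketch below is an assumption-laden stand-in and does not use Kafka's request or response classes: `EnvelopeCheckSketch`, `Response`, `Err`, and `hasNotController` are all hypothetical names. It only shows the idea that a response wrapped in an envelope can carry the error inside the embedded response even when the outer response reports none.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical model of an envelope response; not Kafka's API.
final class EnvelopeCheckSketch {
    enum Err { NOT_CONTROLLER, UNKNOWN_SERVER_ERROR }

    static final class Response {
        final Set<Err> errors;    // errors reported at this level
        final Response embedded;  // inner response, non-null only for envelopes

        Response(Set<Err> errors, Response embedded) {
            this.errors = errors;
            this.embedded = embedded;
        }
    }

    // True if NOT_CONTROLLER appears on the outer response or, for an
    // envelope, on the response embedded inside it.
    static boolean hasNotController(Response response) {
        if (response.errors.contains(Err.NOT_CONTROLLER)) {
            return true;
        }
        Response inner = response.embedded;
        return inner != null && inner.errors.contains(Err.NOT_CONTROLLER);
    }

    public static void main(String[] args) {
        Response inner = new Response(EnumSet.of(Err.NOT_CONTROLLER), null);
        Response envelope = new Response(EnumSet.noneOf(Err.class), inner);
        System.out.println(hasNotController(envelope));
    }
}
```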








[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-30 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642196090



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -239,18 +244,34 @@ class BrokerToControllerChannelManagerImpl(
*
* @param request The request to be sent.
* @param callbackRequest completion callback.
+   * @param requestHeader   The request header to be sent, used for parsing 
the envelop response
*/
   def sendRequest(
 request: AbstractRequest.Builder[_ <: AbstractRequest],
-callback: ControllerRequestCompletionHandler
+callback: ControllerRequestCompletionHandler,
+requestHeader: RequestHeader
   ): Unit = {
 requestThread.enqueue(BrokerToControllerQueueItem(
   time.milliseconds(),
   request,
-  callback
+  callback,
+  requestHeader
 ))
   }
 
+  /**
+   * Send request to the controller.
+   *
+   * @param request The request to be sent.
+   * @param callbackRequest completion callback.
+   */
+  def sendRequest(
+   request: AbstractRequest.Builder[_ <: AbstractRequest],
+   callback: ControllerRequestCompletionHandler,

Review comment:
   I used method overloading because Scala default parameter values 
don't work well with Mockito. 
   ref: 
https://stackoverflow.com/questions/32975379/why-mockito-doesnt-handle-default-scala-parameters-properly
   








[GitHub] [kafka] showuon commented on pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-30 Thread GitBox


showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851153567


   @hachikuji @mumrah @abbccdda @cmccabe , could you help review this PR? I'll 
add tests later. Thank you.

