[GitHub] [kafka] VJvaLbhYbfr edited a comment on pull request #10792: MINOR: Small refactor of tests
VJvaLbhYbfr edited a comment on pull request #10792: URL: https://github.com/apache/kafka/pull/10792#issuecomment-850922135 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon opened a new pull request #10794:
URL: https://github.com/apache/kafka/pull/10794

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dengziming commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
dengziming commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-850993572

This is awesome. I also noticed that there is no logic for handling `NOT_CONTROLLER` and the active controller. I will take some time to test this PR. 👍
[jira] [Created] (KAFKA-12866) Kafka requires ZK root access even when using a chroot
Igor Soarez created KAFKA-12866:
---

Summary: Kafka requires ZK root access even when using a chroot
Key: KAFKA-12866
URL: https://issues.apache.org/jira/browse/KAFKA-12866
Project: Kafka
Issue Type: Bug
Components: core, zkclient
Affects Versions: 2.6.2, 2.7.1, 2.8.0, 2.6.1
Reporter: Igor Soarez

When a ZooKeeper chroot is configured, users do not expect Kafka to need ZooKeeper access outside of that chroot.

h1. Why is this important?

A ZooKeeper cluster may be shared with other Kafka clusters or even other applications. It is an expected security practice to restrict each cluster's or application's access to its own ZooKeeper chroot.

h1. Steps to reproduce

h2. ZooKeeper setup

Using the zkCli, create a chroot for Kafka, make it available to Kafka, but lock the root znode.

{{ [zk: localhost:2181(CONNECTED) 1] create /somechroot }}
{{ Created /some}}
{{ [zk: localhost:2181(CONNECTED) 2] setAcl /somechroot world:anyone:cdrwa}}
{{ [zk: localhost:2181(CONNECTED) 3] addauth digest test:12345}}
{{ [zk: localhost:2181(CONNECTED) 4] setAcl / digest:test:Mx1uO9GLtm1qaVAQ20Vh9ODgACg=:cdrwa}}

h2. Kafka setup

Configure the chroot in broker.properties:

{{zookeeper.connect=localhost:2181/somechroot}}

h2. Expected behavior

Kafka uses the chroot without issues.

h2. Actual result

Kafka fails to start with a fatal exception:

{{ org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /chroot}}
{{ at org.apache.zookeeper.KeeperException.create(KeeperException.java:120)}}
{{ at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)}}
{{ at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)}}
{{ at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)}}
{{ at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)}}
{{ at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1957)}}
{{ at kafka.zk.ZkClientAclTest.testChrootExistsAndRootIsLocked(ZkClientAclTest.scala:60)}}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
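The `digest:test:...` ACL entry in the reproduction steps pairs a user name with a hashed credential. As a minimal sketch, assuming ZooKeeper's default digest scheme (Base64 of the SHA-1 hash of `user:password`), the value for the `setAcl /` command could be derived as follows. The class name `ZkDigestSketch` is hypothetical and is not part of ZooKeeper or Kafka.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, not part of ZooKeeper or Kafka.
final class ZkDigestSketch {

    // Assumes the default digest scheme: "user:" + base64(SHA-1("user:password")).
    static String generateDigest(String idPassword) {
        String[] parts = idPassword.split(":", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected user:password");
        }
        try {
            byte[] sha1 = MessageDigest.getInstance("SHA-1")
                    .digest(idPassword.getBytes(StandardCharsets.UTF_8));
            return parts[0] + ":" + Base64.getEncoder().encodeToString(sha1);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is required on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Prints an ACL entry in the form used by the setAcl command above.
        System.out.println("digest:" + generateDigest("test:12345") + ":cdrwa");
    }
}
```

The permission string `cdrwa` grants create, delete, read, write, and admin, so after step 4 only a session authenticated as `test` can create children under `/`.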
[jira] [Updated] (KAFKA-12866) Kafka requires ZK root access even when using a chroot
[ https://issues.apache.org/jira/browse/KAFKA-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Igor Soarez updated KAFKA-12866:

Description:

When a ZooKeeper chroot is configured, users do not expect Kafka to need ZooKeeper access outside of that chroot.

h1. Why is this important?

A ZooKeeper cluster may be shared with other Kafka clusters or even other applications. It is an expected security practice to restrict each cluster's or application's access to its own ZooKeeper chroot.

h1. Steps to reproduce

h2. ZooKeeper setup

Using the zkCli, create a chroot for Kafka, make it available to Kafka, but lock the root znode.

{code:java}
[zk: localhost:2181(CONNECTED) 1] create /somechroot
Created /some
[zk: localhost:2181(CONNECTED) 2] setAcl /somechroot world:anyone:cdrwa
[zk: localhost:2181(CONNECTED) 3] addauth digest test:12345
[zk: localhost:2181(CONNECTED) 4] setAcl / digest:test:Mx1uO9GLtm1qaVAQ20Vh9ODgACg=:cdrwa{code}

h2. Kafka setup

Configure the chroot in broker.properties:

{code:java}
zookeeper.connect=localhost:2181/somechroot{code}

h2. Expected behavior

Kafka uses the chroot without issues.

h2. Actual result

Kafka fails to start with a fatal exception:

{code:java}
org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /chroot
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:120)
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
 at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
 at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
 at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)
 at kafka.zk.KafkaZkClient$.apply(KafkaZkClient.scala:1957)
 at kafka.zk.ZkClientAclTest.testChrootExistsAndRootIsLocked(ZkClientAclTest.scala:60)
{code}

> Kafka requires ZK root access even when using a chroot
> --
>
> Key: KAFKA-12866
> URL: https://issues.apache.org/jira/browse/KAFKA-12866
> Project: Kafka
> Issue Type: Bug
> Components: core, zkclient
> Affects Versions: 2.6.1, 2.8.0, 2.7.1, 2.6.2
> Reporter: Igor Soarez
> Priority: Major
[jira] [Assigned] (KAFKA-12866) Kafka requires ZK root access even when using a chroot
[ https://issues.apache.org/jira/browse/KAFKA-12866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Igor Soarez reassigned KAFKA-12866:
---

Assignee: Igor Soarez

> Kafka requires ZK root access even when using a chroot
> --
>
> Key: KAFKA-12866
> URL: https://issues.apache.org/jira/browse/KAFKA-12866
> Project: Kafka
> Issue Type: Bug
> Components: core, zkclient
> Affects Versions: 2.6.1, 2.8.0, 2.7.1, 2.6.2
> Reporter: Igor Soarez
> Assignee: Igor Soarez
> Priority: Major
[GitHub] [kafka] soarez opened a new pull request #10795: KAFKA-12866: Avoid root access to Zookeeper
soarez opened a new pull request #10795:
URL: https://github.com/apache/kafka/pull/10795

https://issues.apache.org/jira/browse/KAFKA-12866

The broker shouldn't assume create access to the chroot. There are deployment scenarios where the chroot is already created and is the only znode which the broker can access.

To test this, we can use a ZK integration test and configure ZooKeeper in the same way the issue is reproduced:

1. Create the chroot
2. Set free access to the chroot
3. Lock down access to the root znode
4. Try to connect the KafkaZkClient

It should be a separate `ZooKeeperTestHarness` to avoid leaving the ACL changes made to the ZK root visible to other tests.

Rejected alternatives

* Expect `NoAuth` in `KafkaZkClient.createRecursive` and treat `NoAuth` as success.
* Create a new configuration to set `createChrootIfNecessary = false` instead of the current non-configurable default value of true.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
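The four test steps above can be modeled without a running ZooKeeper. The following is a rough sketch only: `FakeZk` is a hypothetical in-memory stand-in rather than the ZooKeeper or `KafkaZkClient` API, and it assumes that the server checks the parent's create permission before it checks whether the node already exists, which is consistent with the `NoAuth` stack trace in KAFKA-12866. The description does not spell out the implementation the PR chose; checking for existence before creating is just one approach that avoids touching the locked root.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model; names are illustrative, not Kafka's.
final class ChrootCheckSketch {

    static final class FakeZk {
        // path -> whether this session may create children under that path
        private final Map<String, Boolean> canCreateUnder = new HashMap<>();

        FakeZk(boolean rootCreateAllowed) {
            canCreateUnder.put("/", rootCreateAllowed);
        }

        void addNode(String path, boolean childCreateAllowed) {
            canCreateUnder.put(path, childCreateAllowed);
        }

        boolean exists(String path) {
            return canCreateUnder.containsKey(path);
        }

        void create(String path) {
            String parent = parentOf(path);
            if (!exists(parent)) {
                throw new IllegalStateException("NoNode for " + parent);
            }
            // Assumption: the parent ACL is evaluated before the NodeExists check.
            if (!canCreateUnder.get(parent)) {
                throw new SecurityException("NoAuth for " + path);
            }
            if (exists(path)) {
                throw new IllegalStateException("NodeExists for " + path);
            }
            canCreateUnder.put(path, true);
        }
    }

    static String parentOf(String path) {
        int idx = path.lastIndexOf('/');
        return idx <= 0 ? "/" : path.substring(0, idx);
    }

    // Extracts the chroot from "host:port/chroot"; returns "" when none is set.
    static String chrootOf(String connect) {
        int idx = connect.indexOf('/');
        return idx < 0 ? "" : connect.substring(idx);
    }

    // Creates only the path components that are missing.
    static void ensurePath(FakeZk zk, String path) {
        if (path.isEmpty() || zk.exists(path)) {
            return;
        }
        ensurePath(zk, parentOf(path));
        zk.create(path);
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk(false);        // step 3: root znode locked down
        zk.addNode("/somechroot", true);      // steps 1 and 2: chroot exists, open access
        String chroot = chrootOf("localhost:2181/somechroot");

        ensurePath(zk, chroot);               // step 4: succeeds, never touches "/"
        try {
            zk.create(chroot);                // unconditional create hits the locked root
        } catch (SecurityException e) {
            System.out.println("unconditional create failed: " + e.getMessage());
        }
    }
}
```

In this model the unconditional create fails even though the chroot already exists, while the existence check lets the client proceed with access to the chroot alone.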
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095137

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
## @@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
+ * This is used to publish and fetch {@link RemoteLogMetadata} for the registered user topic partitions with
+ * {@link #onPartitionLeadershipChanges(Set, Set)}. Each broker will have an instance of this class and it subscribes
+ * to metadata updates for the registered user topic partitions.
+ */
+public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataManager {
+    private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManager.class);
+
+    // Take these as configs with the respective default values.
+    private static final long INITIALIZATION_RETRY_INTERVAL_MS = 30_000L;
+
+    private volatile boolean configured = false;
+
+    // Using AtomicBoolean instead of volatile as it may encounter http://findbugs.sourceforge.net/bugDescriptions.html#SP_SPIN_ON_FIELD
+    // if the field is read but not updated in a spin loop like in #initializeResources() method.
+    private final AtomicBoolean close = new AtomicBoolean(false);
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+    private Thread initializationThread;
+    private Time time = Time.SYSTEM;
+    private ProducerManager producerManager;
+    private ConsumerManager consumerManager;
+
+    // This allows to gracefully close this instance using {@link #close()} method while there are some pending or new
+    // requests calling different methods which use the resources like producer/consumer managers.
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final RemotePartitionMetadataStore remotePartitionMetadataStore = new RemotePartitionMetadataStore();
+    private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+    private RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
+
+    @Override
+    public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
+            throws RemoteStorageException {
+        Objects.requireNonNull(remoteLogSegmentMetadata, "remoteLogSegmentMetadata can not be null");
+
+        ensureInitializedAndNotClosed();
+
+        // This allows gracefully rejecting the requests while closing of this instance is in progress, which triggers
+        // closing the producer/consumer manager instances.
+        lock.readLock().lock();
+        try {
+
+            // This me
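The comments in the quoted diff describe a close-coordination pattern: request methods take the read lock, so they can run concurrently, while `close()` takes the write lock and therefore waits for in-flight requests. A stripped-down sketch of that idea is shown below. `GracefulCloseSketch` and its members are illustrative names rather than the PR's code, and the counter stands in for the producer/consumer managers.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only; not the class under review.
final class GracefulCloseSketch implements AutoCloseable {
    private final AtomicBoolean closing = new AtomicBoolean(false);
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicInteger published = new AtomicInteger();
    private volatile boolean released = false;

    // Request path: many callers may hold the read lock at the same time.
    int publish(String event) {
        lock.readLock().lock();
        try {
            if (closing.get()) {
                throw new IllegalStateException("instance is closing or closed");
            }
            return published.incrementAndGet(); // stand-in for sending a record
        } finally {
            lock.readLock().unlock();
        }
    }

    boolean isReleased() {
        return released;
    }

    @Override
    public void close() {
        // Flip the flag first so requests arriving from now on are rejected.
        if (!closing.compareAndSet(false, true)) {
            return; // already closing or closed
        }
        // The write lock is granted only once every in-flight publish() has returned.
        lock.writeLock().lock();
        try {
            released = true; // stand-in for closing producer/consumer managers
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        GracefulCloseSketch manager = new GracefulCloseSketch();
        System.out.println("published #" + manager.publish("segment-added"));
        manager.close();
        System.out.println("released: " + manager.isReleased());
    }
}
```

Setting the flag before acquiring the write lock means late callers fail fast instead of queueing behind the writer.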
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095193

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095217

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095239

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r642095305 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; + +public class ConsumerManager implements Closeable { Review comment: I will add that. 
## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME; + +/** + * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}) + * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove + * the topic partitions whose metadata should be consumed by this instance, using + * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively. + * + * When a broker is started, the controller sends the topic partitions that this broker is leader or follower for and the + * partitions to be deleted. This class receives those notifications through + * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the consumer to the + * respective remote log metadata partitions by using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}. + * Any later leadership changes are delivered through the same API. We will remove the partitions that are deleted from + * this broker which are received through {@link #remove
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r642095350 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME; + +/** + * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}) + * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove + * the topic partitions whose metadata should be consumed by this instance, using + * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively. + * + * When a broker is started, the controller sends the topic partitions that this broker is leader or follower for and the + * partitions to be deleted.
This class receives those notifications through + * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the consumer to the + * respective remote log metadata partitions by using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}. + * Any later leadership changes are delivered through the same API. We will remove the partitions that are deleted from + * this broker which are received through {@link #removeAssignmentsForPartitions(Set)}. + * + * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)}, + * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}. + */ +class ConsumerTask implements Runnable, Closeable { +private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); + +private static final long POLL_INTERVAL_MS = 30L; + +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); +private final KafkaConsumer consumer; +private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; +private final RemoteLogMetadataTopicPartitioner topicPartitioner; + +private volatile boolean close = false; +private volatile boolean assignPartitions = false; + +private final Object assignPartitionsLock = new Object(); + +// Remote log metadata topic partitions that consumer is assigned to. +private volatile Set assignedMetaPartitions = Collections.emptySet(); + +// User topic partitions that this broker is a leader/follower for. +private Set assignedTopicPartitions = Collections.emptySet(); + +// Map of remote log metadata topic partition to target end offsets to be consumed. +private final Map partitionToTargetEndOffsets = new ConcurrentHashMap<>(); + +// Map of remote log metadata topic partition to consumed offsets.
+private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>(); + +public ConsumerTask(KafkaConsumer consumer, +RemotePartitionMetadataEventHandler
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r642095431 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java ## @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +public class RemoteLogMetadataTopicPartitioner { +public static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataTopicPartitioner.class); +private final int noOfMetadataTopicPartitions; + +public RemoteLogMetadataTopicPartitioner(int noOfMetadataTopicPartitions) { +this.noOfMetadataTopicPartitions = noOfMetadataTopicPartitions; +} + +public int metadataPartition(TopicIdPartition topicIdPartition) { +Objects.requireNonNull(topicIdPartition, "TopicPartition can not be null"); + +int partitionNo = Utils.toPositive(Utils.murmur2(topicIdPartition.toString().getBytes(StandardCharsets.UTF_8))) Review comment: Good point. I will add that.
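For readers following the review, here is a minimal sketch of the hash-then-modulus mapping that `metadataPartition` appears to perform. The quoted line is cut off after the hashing call, so the final modulus step is an assumption. CRC32 is swapped in for Kafka's `Utils.murmur2` only to keep the example free of Kafka dependencies, and the class name `MetadataPartitionSketch` and the string key are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical stand-in for the partitioner in the quoted diff; not the PR's code.
final class MetadataPartitionSketch {
    private final int numMetadataPartitions;

    MetadataPartitionSketch(int numMetadataPartitions) {
        if (numMetadataPartitions <= 0) {
            throw new IllegalArgumentException("numMetadataPartitions must be positive");
        }
        this.numMetadataPartitions = numMetadataPartitions;
    }

    // Maps a key (for example the string form of a topic-id partition)
    // to one of the metadata topic partitions.
    int metadataPartition(String key) {
        CRC32 crc = new CRC32(); // assumption: CRC32 used here instead of murmur2
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        int hash = (int) crc.getValue();
        // Clear the sign bit so the modulus below can never be negative.
        int positive = hash & 0x7fffffff;
        // Assumed final step: reduce the hash to a valid partition index.
        return positive % numMetadataPartitions;
    }

    public static void main(String[] args) {
        MetadataPartitionSketch partitioner = new MetadataPartitionSketch(50);
        System.out.println(partitioner.metadataPartition("example-topic-0"));
    }
}
```

The same key always maps to the same metadata partition, which is what lets a broker subscribe only to the metadata partitions relevant to the user partitions it hosts.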
[GitHub] [kafka] satishd commented on pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on pull request #10579: URL: https://github.com/apache/kafka/pull/10579#issuecomment-851023730 Thanks @junrao for your comment. Addressed most of them with the latest commit.
[GitHub] [kafka] ijuma commented on pull request #10787: KAFKA-12864: Move queue and timeline into server-common
ijuma commented on pull request #10787: URL: https://github.com/apache/kafka/pull/10787#issuecomment-851027604 Thanks for the PR. Can we please include the motivation?
[GitHub] [kafka] ijuma commented on pull request #10783: MINOR: Dependency updates around Scala libraries
ijuma commented on pull request #10783: URL: https://github.com/apache/kafka/pull/10783#issuecomment-851027684 The snappy update doesn't seem related to Scala. Maybe that should be a separate PR.
[GitHub] [kafka] jlprat commented on pull request #10783: MINOR: Dependency updates around Scala libraries
jlprat commented on pull request #10783: URL: https://github.com/apache/kafka/pull/10783#issuecomment-851033612 You are right, I should have skipped it. Fix pushed
[GitHub] [kafka] jlprat edited a comment on pull request #10783: MINOR: Dependency updates around Scala libraries
jlprat edited a comment on pull request #10783: URL: https://github.com/apache/kafka/pull/10783#issuecomment-850547775 Sure thing, let me fetch them and link them here: * Spotless: Unfortunately there are no release notes to be found * Scala 2.12.14: https://github.com/scala/scala/releases/tag/v2.12.14 * Scala Logging: https://github.com/lightbend/scala-logging/releases/tag/v3.9.3 * Scala Collection Compat: * https://github.com/scala/scala-collection-compat/releases/tag/v2.3.1 * https://github.com/scala/scala-collection-compat/releases/tag/v2.3.2 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.0 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.1 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.2 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.3 * https://github.com/scala/scala-collection-compat/releases/tag/v2.4.4 * Scala Java8 Compat: * https://github.com/scala/scala-java8-compat/releases/tag/v1.0.0-RC1 * https://github.com/scala/scala-java8-compat/releases/tag/v1.0.0 * ~Snappy:~ * ~https://github.com/xerial/snappy-java/releases/tag/1.1.8.2~ * ~https://github.com/xerial/snappy-java/releases/tag/1.1.8.3~ * ~https://github.com/xerial/snappy-java/releases/tag/1.1.8.4~
[GitHub] [kafka] bruto1 commented on a change in pull request #10590: KAFKA-5761: support ByteBuffer as value in ProducerRecord and avoid redundant serialization when it's used
bruto1 commented on a change in pull request #10590: URL: https://github.com/apache/kafka/pull/10590#discussion_r642107164 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -920,23 +923,28 @@ private void throwIfProducerClosed() { " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); } -byte[] serializedValue; -try { -serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); -} catch (ClassCastException cce) { -throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + + +setReadOnly(record.headers()); Review comment: Fixed this one. Sorry for making you look for the cause; please have a look at the PR again.
[GitHub] [kafka] ijuma commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching
ijuma commented on pull request #9229: URL: https://github.com/apache/kafka/pull/9229#issuecomment-851047311 The results are similar for the ducktape benchmarks since the bottleneck is elsewhere. In the PR description, I include the results for a workload that shows significant improvement with these changes. Also, the following allocation profiles show that the lz4 buffer allocations dominate trunk and are gone in this PR: trunk: [allocation profile image] this PR: [allocation profile image] So, I think we can go ahead and merge this.
[GitHub] [kafka] ijuma merged pull request #9229: MINOR: Reduce allocations in requests via buffer caching
ijuma merged pull request #9229: URL: https://github.com/apache/kafka/pull/9229
[jira] [Assigned] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test
[ https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marco Lotz reassigned KAFKA-5676: - Assignee: (was: Marco Lotz) > MockStreamsMetrics should be in o.a.k.test > -- > > Key: KAFKA-5676 > URL: https://issues.apache.org/jira/browse/KAFKA-5676 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Priority: Major > Labels: newbie > Time Spent: 96h > Remaining Estimate: 0h > > {{MockStreamsMetrics}}'s package should be `o.a.k.test` not > `o.a.k.streams.processor.internals`. > In addition, it should not require a {{Metrics}} parameter in its constructor > as it is only needed for its extended base class; the right way of mocking > should be implementing {{StreamsMetrics}} with mock behavior rather than extending a > real implementation of {{StreamsMetricsImpl}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test
[ https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354118#comment-17354118 ] Marco Lotz commented on KAFKA-5676: --- [~mjsax] [~cadonna] I have spent a reasonable amount of time analysing this ticket. It seems to me that this is tightly coupled at many points (which also explains the fair number of unmerged PRs related to it). Based on the previous comments, I found out the following in the code: * As [~chia7712] mentioned, if we keep the MockStreamsMetrics class and make it implement the interface, we would need to extend the interface (thus a KIP), especially because MockStreamsMetrics is used interchangeably with the StreamsMetricsImpl class. The implementation has many extra public methods, including static ones. There are a large number of test classes (20+) counting on StreamsMetricsImpl behaviours provided by MockStreamsMetrics. * If we go for removing MockStreamsMetrics and trying to mock StreamsMetricsImpl whenever it’s not truly needed: * One will find out that many methods in the class are final (which is not a limitation since PowerMock is available in the Kafka project). But it is a sign that it may require further design attention. Among them is {code:java} public final Sensor taskLevelSensor(...){code} * One will uncover that many static final methods are also called for StreamsMetricsImpl (that can also be solved by PowerMock). Among them is: {code:java} public static void StreamsMetricsImpl.addValueMetricToSensor(...){code} * One will find out that many test classes (e.g. StreamTaskTest or RecordCollectorTest) would require a considerable amount of behaviour mocking. It seems that easily 20+ test classes would need a considerable amount of mock configuration in this scenario.
A possible solution is indeed creating a mocking utility to init mocks of StreamsMetricsImpl - but again, it seems to me that there are enough code smells to indicate that something may need to be re-evaluated a bit further. With those points I believe that this ticket needs to be evaluated a bit more deeply - since it is likely connected to some considerable tech-debt (e.g. final methods mocking, inconsistencies in how the mocking of the metrics is performed, high coupling with implementation instead of interface, etc). Unfortunately I won’t be able to provide the amount of rework/redesign that this ticket requires right now; maybe a core committer could eventually look into it if important? > MockStreamsMetrics should be in o.a.k.test > -- > > Key: KAFKA-5676 > URL: https://issues.apache.org/jira/browse/KAFKA-5676 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Marco Lotz >Priority: Major > Labels: newbie > Time Spent: 96h > Remaining Estimate: 0h > > {{MockStreamsMetrics}}'s package should be `o.a.k.test` not > `o.a.k.streams.processor.internals`. > In addition, it should not require a {{Metrics}} parameter in its constructor > as it is only needed for its extended base class; the right way of mocking > should be implementing {{StreamsMetrics}} with mock behavior rather than extending a > real implementation of {{StreamsMetricsImpl}}.
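The alternative named in the issue description, implementing the metrics interface with mock behaviour instead of extending the real implementation, can be sketched roughly as follows. All names here (`MetricsRecorder`, `RecordingMetricsRecorder`, `TaskUnderTest`) are hypothetical stand-ins chosen for illustration; they are not the actual `StreamsMetrics` or `StreamsMetricsImpl` API, which is considerably larger.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical narrow interface that production code depends on.
interface MetricsRecorder {
    void record(String sensorName, double value);
}

// Test double: implements the interface directly and only remembers calls,
// so no final or static methods of a real implementation need to be mocked.
final class RecordingMetricsRecorder implements MetricsRecorder {
    private final List<String> recorded = new ArrayList<>();

    @Override
    public void record(String sensorName, double value) {
        recorded.add(sensorName + "=" + value);
    }

    List<String> recorded() {
        return recorded;
    }
}

// Hypothetical class under test, coupled to the interface rather than to an implementation.
final class TaskUnderTest {
    private final MetricsRecorder metrics;

    TaskUnderTest(MetricsRecorder metrics) {
        this.metrics = metrics;
    }

    void process(int recordCount) {
        metrics.record("records-processed", recordCount);
    }
}
```

This only works when the code under test depends on the interface. As the comment above notes, many existing tests rely on methods that exist only on the implementation, which is why the change is larger than it first appears.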
[jira] [Comment Edited] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test
[ https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354118#comment-17354118 ] Marco Lotz edited comment on KAFKA-5676 at 5/30/21, 10:47 PM: -- [~mjsax] [~cadonna] I have spent a reasonable amount of time analysing this ticket. It seems to me that this is tightly coupled at many points (which also explains the fair number of unmerged PRs related to it). Based on the previous comments, I found out the following in the code: * As [~chia7712] mentioned, if we keep the MockStreamsMetrics class and make it implement the interface, we would need to extend the interface (thus a KIP), especially because MockStreamsMetrics is used interchangeably with the StreamsMetricsImpl class. The implementation has many extra public methods, including static ones. There are a large number of test classes (20+) counting on StreamsMetricsImpl behaviours provided by MockStreamsMetrics. * If we go for removing MockStreamsMetrics and trying to mock StreamsMetricsImpl whenever it’s not truly needed: # One will find out that many methods in the class are final (which is not a limitation since PowerMock is available in the Kafka project). But it is a sign that it may require further design attention. Among them is {code:java} public final Sensor taskLevelSensor(...){code} # One will uncover that many static final methods are also called for StreamsMetricsImpl (that can also be solved by PowerMock). Among them is: {code:java} public static void StreamsMetricsImpl.addValueMetricToSensor(...){code} # One will find out that many test classes (e.g. StreamTaskTest or RecordCollectorTest) would require a considerable amount of behaviour mocking. It seems that easily 20+ test classes would need a considerable amount of mock configuration in this scenario.
A possible solution is indeed creating a mocking utility to init mocks of StreamsMetricsImpl - but again, it seems to me that there are enough code smells to indicate that something may need to be re-evaluated a bit further. With those points I believe that this ticket needs to be evaluated a bit more deeply - since it is likely connected to some considerable tech-debt (e.g. final methods mocking, inconsistencies in how the mocking of the metrics is performed, high coupling with implementation instead of interface, etc). Unfortunately I won’t be able to provide the amount of rework/redesign that this ticket requires right now; maybe a core committer could eventually look into it if important?
[GitHub] [kafka] dengziming removed a comment on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure
dengziming removed a comment on pull request #9577: URL: https://github.com/apache/kafka/pull/9577#issuecomment-833187678 ping @mumrah to have a look 😉.
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10785: KIP-708 / A Rack awareness for Kafka Streams
lkokhreidze commented on a change in pull request #10785: URL: https://github.com/apache/kafka/pull/10785#discussion_r642141626 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignorTest.java ## @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_0; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_1; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_1_2; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ClientTagAwareStandbyTaskAssignorTest { Review comment: Will try to figure out more test cases, but those should already be good starting point. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/RackAwarenessIntegrationTest.java ## @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.Tes
[GitHub] [kafka] lkokhreidze commented on pull request #10785: KIP-708 / A Rack awareness for Kafka Streams
lkokhreidze commented on pull request #10785: URL: https://github.com/apache/kafka/pull/10785#issuecomment-851076581 Call for review @vvcephei @cadonna @ableegoldman
[GitHub] [kafka] dengziming commented on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure
dengziming commented on pull request #9577: URL: https://github.com/apache/kafka/pull/9577#issuecomment-851110147 Ping @mumrah @cmccabe: this PR is similar to KIP-497, and I hope it can be finished before Kafka 3.0.0.
[jira] [Commented] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal
[ https://issues.apache.org/jira/browse/KAFKA-12465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354175#comment-17354175 ] dengziming commented on KAFKA-12465: [~omnia_h_ibrahim] Thank you for the feedback. We are now trying to design an algorithm to decide when to treat `INCONSISTENT_CLUSTER_ID` as fatal; some of the discussion is listed here: [https://github.com/apache/kafka/pull/10289#discussion_r592853088] I think [~jagsancio]'s suggestion is a good one, WDYT? > Decide whether inconsistent cluster id error are fatal > -- > > Key: KAFKA-12465 > URL: https://issues.apache.org/jira/browse/KAFKA-12465 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Priority: Major > > Currently, we just log an error when an inconsistent cluster-id occurs. We > should set a window during startup when these errors are fatal, but after that > window, we no longer treat them as fatal. See > https://github.com/apache/kafka/pull/10289#discussion_r592853088 -- This message was sent by Atlassian Jira (v8.3.4#803005)
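The startup-window idea in the issue description can be illustrated with a minimal Java sketch. This is only an illustration under stated assumptions, not the algorithm discussed in the linked PR: the class name `StartupFatalWindowSketch`, the `isFatal` method, and the 5-second window are all hypothetical, and timestamps are passed in explicitly so that the logic is easy to test.

```java
import java.time.Duration;

// Hypothetical sketch, not Kafka code: treat an error as fatal only while
// the process is still inside a fixed window measured from startup.
public class StartupFatalWindowSketch {
    private final long startNanos;   // time at which the process started
    private final long windowNanos;  // length of the "fatal" window

    public StartupFatalWindowSketch(long startNanos, Duration window) {
        this.startNanos = startNanos;
        this.windowNanos = window.toNanos();
    }

    // Returns true if an error observed at nowNanos falls inside the window.
    public boolean isFatal(long nowNanos) {
        return nowNanos - startNanos < windowNanos;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // The 5-second window is an arbitrary value chosen for illustration.
        StartupFatalWindowSketch window =
            new StartupFatalWindowSketch(start, Duration.ofSeconds(5));
        System.out.println("fatal right after startup: " + window.isFatal(System.nanoTime()));
    }
}
```

Passing the clock reading as a parameter, rather than calling `System.nanoTime()` inside `isFatal`, is a design choice made here purely to keep the sketch deterministic.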
[jira] [Commented] (KAFKA-9135) Kafka producer/consumer are creating too many open file
[ https://issues.apache.org/jira/browse/KAFKA-9135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354180#comment-17354180 ] Duong Pham commented on KAFKA-9135: --- You can upgrade the Java version, preferably to 1.8.0.291 > Kafka producer/consumer are creating too many open file > --- > > Key: KAFKA-9135 > URL: https://issues.apache.org/jira/browse/KAFKA-9135 > Project: Kafka > Issue Type: Bug > Components: admin, consumer, producer >Affects Versions: 1.0.1 > Environment: apache kafka client :- 1.0.1 > Kafka version :- 1.0.1 > Open JDK :- java-1.8.0-openjdk-1.8.0.222.b10-1 > CentOS version :- CentOS Linux release 7.6.1810 >Reporter: Dhirendra Singh >Priority: Critical > > We have a 3-node Kafka cluster deployment with 5 topics and 6 partitions per > topic. We have configured the replication factor as 3. We are seeing a very > strange problem: the number of file descriptors has crossed the ulimit > (which is 50K for our application). > As per the lsof command and our analysis: > 1. 
There are 15K established connections from the Kafka producer/consumer towards the > broker, and at the same time, in the thread dump we have observed thousands of > entries for Kafka 'admin-client-network-thread' > admin-client-network-thread" #224398 daemon prio=5 os_prio=0 > tid=0x7f12ca119800 nid=0x5363 runnable [0x7f12c4db8000] > java.lang.Thread.State: RUNNABLE > at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <0x0005e0603238> (a sun.nio.ch.Util$3) > - locked <0x0005e0603228> (a java.util.Collections$UnmodifiableSet) > - locked <0x0005e0602f08> (a sun.nio.ch.EPollSelectorImpl) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at org.apache.kafka.common.network.Selector.select(Selector.java:672) > at org.apache.kafka.common.network.Selector.poll(Selector.java:396) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:238) > - locked <0x0005e0602dc0> (a > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:205) > at kafka.admin.AdminClient$$anon$1.run(AdminClient.scala:61) > at java.lang.Thread.run(Thread.java:748) > 2. 
As per the lsof output, we have observed 35K entries for pipe and event poll > java 5441 app 374r FIFO 0,9 0t0 22415240 pipe > java 5441 app 375w FIFO 0,9 0t0 22415240 pipe > java 5441 app 376u a_inode 0,10 0 6379 [eventpoll] > java 5441 app 377r FIFO 0,9 0t0 2247 pipe > java 5441 app 378r FIFO 0,9 0t0 28054726 pipe > java 5441 app 379r FIFO 0,9 0t0 22415241 pipe > java 5441 app 380w FIFO 0,9 0t0 22415241 pipe > java 5441 app 381u a_inode 0,10 0 6379 [eventpoll] > java 5441 app 382w FIFO 0,9 0t0 2247 pipe > java 5441 app 383u a_inode 0,10 0 6379 [eventpoll] > java 5441 app 384u a_inode 0,10 0 6379 [eventpoll] > java 5441 app 385r FIFO 0,9 0t0 40216087 pipe > java 5441 app 386r FIFO 0,9 0t0 22483470 pipe > Setup details :- > apache kafka client :- 1.0.1 > Kafka version :- 1.0.1 > Open JDK :- java-1.8.0-openjdk-1.8.0.222.b10-1 > CentOS version :- CentOS Linux release 7.6.1810 > Note :- After restarting the VM, the file descriptor count cleared and returned > to a normal count of 1000; then, after a few seconds, the file descriptor count started > to increase, and it reaches 50K (ulimit) after 1 week in an idle scenario.
[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642195443 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java ## @@ -36,10 +37,16 @@ private final String message; public static ApiError fromThrowable(Throwable t) { +Throwable throwableToBeDecode = t; +// Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR. +if (t instanceof CompletionException) { +throwableToBeDecode = t.getCause(); Review comment: fix 1: unwrap the `CompletionException` to get the original exception inside.
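The wrapping behaviour that fix 1 addresses can be reproduced with the JDK alone. The sketch below is a standalone illustration, not the code from the PR: the class `UnwrapCompletionExceptionSketch` and its `unwrap` helper are hypothetical names, and the null check on the cause is an extra precaution added here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch, not Kafka code: shows that CompletableFuture.join()
// reports a failure wrapped in CompletionException, and how to unwrap it.
public class UnwrapCompletionExceptionSketch {

    // Returns the wrapped cause when t is a CompletionException with a
    // non-null cause; otherwise returns t unchanged.
    public static Throwable unwrap(Throwable t) {
        if (t instanceof CompletionException && t.getCause() != null) {
            return t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("original failure");
        });
        try {
            future.join();
        } catch (CompletionException e) {
            // The caller sees the wrapper, not the original exception type.
            System.out.println("caught:    " + e.getClass().getName());
            System.out.println("unwrapped: " + unwrap(e).getClass().getName());
        }
    }
}
```

Any code that maps exception types to error codes would need to look at the unwrapped cause, since the wrapper type itself carries no information about the original failure.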
[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642195788 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -350,7 +372,8 @@ class BrokerToControllerRequestThread( } else if (response.wasDisconnected()) { updateControllerAddress(null) requestQueue.putFirst(queueItem) -} else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) { +} else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER) || + maybeCheckNotControllerErrorInsideEnvelopeResponse(queueItem.requestHeader, response.responseBody())) { Review comment: fix 2: parse the envelope response to check whether it contains a `NOT_CONTROLLER` error.
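The shape of the check in fix 2 can be sketched without any Kafka classes. Everything below is hypothetical and only models the idea: `Err`, `Response`, and `hasNotController` are invented names, and the real PR parses the embedded response from the envelope bytes using the request header, which this sketch does not attempt.

```java
import java.util.Map;

// Hypothetical sketch, not Kafka code: a response may carry an error at the
// top level, or inside a response embedded in an envelope.
public class EnvelopeErrorCheckSketch {

    public enum Err { NONE, NOT_CONTROLLER, UNKNOWN_SERVER_ERROR }

    public static final class Response {
        final Map<Err, Integer> errorCounts; // errors reported by this response
        final Response embedded;             // inner response, or null if not an envelope

        public Response(Map<Err, Integer> errorCounts, Response embedded) {
            this.errorCounts = errorCounts;
            this.embedded = embedded;
        }
    }

    // True if NOT_CONTROLLER appears at the top level or in the embedded response.
    public static boolean hasNotController(Response r) {
        if (r.errorCounts.containsKey(Err.NOT_CONTROLLER)) {
            return true;
        }
        return r.embedded != null
            && r.embedded.errorCounts.containsKey(Err.NOT_CONTROLLER);
    }

    public static void main(String[] args) {
        Response inner = new Response(Map.of(Err.NOT_CONTROLLER, 1), null);
        Response envelope = new Response(Map.of(Err.NONE, 1), inner);
        System.out.println("NOT_CONTROLLER found: " + hasNotController(envelope));
    }
}
```

The point of the sketch is that checking only the outer error counts would miss the inner error, which matches the motivation stated in the review comment.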
[GitHub] [kafka] showuon commented on a change in pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642196090 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -239,18 +244,34 @@ class BrokerToControllerChannelManagerImpl( * * @param request The request to be sent. * @param callbackRequest completion callback. + * @param requestHeader The request header to be sent, used for parsing the envelop response */ def sendRequest( request: AbstractRequest.Builder[_ <: AbstractRequest], -callback: ControllerRequestCompletionHandler +callback: ControllerRequestCompletionHandler, +requestHeader: RequestHeader ): Unit = { requestThread.enqueue(BrokerToControllerQueueItem( time.milliseconds(), request, - callback + callback, + requestHeader )) } + /** + * Send request to the controller. + * + * @param request The request to be sent. + * @param callbackRequest completion callback. + */ + def sendRequest( + request: AbstractRequest.Builder[_ <: AbstractRequest], + callback: ControllerRequestCompletionHandler, Review comment: I use method overloading here because Scala default parameter values don't work well with Mockito. Ref: https://stackoverflow.com/questions/32975379/why-mockito-doesnt-handle-default-scala-parameters-properly
[GitHub] [kafka] showuon commented on pull request #10794: [WIP]KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on pull request #10794: URL: https://github.com/apache/kafka/pull/10794#issuecomment-851153567 @hachikuji @mumrah @abbccdda @cmccabe, could you help review this PR? I'll add tests later. Thank you.