[jira] [Updated] (KAFKA-12682) Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
[ https://issues.apache.org/jira/browse/KAFKA-12682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jacky updated KAFKA-12682:
--------------------------
Description:
In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
1. migrate partition p1 from b0 to b1;
2. change the leader of p1;
3. migrate p1 from b1 back to b0;
and _localRemoved will delete p1 at the end, even though the broker should host it again.

I also think MetadataPartition should include the topic UUID, with the topic name optional; the topic name can only find the partition by the
For example:
create topic t1, delete topic t1, create topic t1, change leader of p1
and after the records are compacted:
create topic t1, delete topic t1, change t1-p1
but the current implementation will:
1. process the change for t1-p1
2. process the delete for topic t1
Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.

was:
In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
1. migrate partition p1 from b0 to b1;
2. change the leader of p1;
3. migrate p1 from b1 back to b0;
and _localRemoved will delete p1 at the end, even though the broker should host it again.

I also think MetadataPartition should include the topic UUID, with the topic name optional.
For example:
create topic t1, delete topic t1, create topic t1, change leader of p1
and after the records are compacted:
create topic t1, delete topic t1, change t1-p1
but the current implementation will:
1. process the change for t1-p1
2. process the delete for topic t1
Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.

> Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
> -----------------------------------------------------------------------------
>
> Key: KAFKA-12682
> URL: https://issues.apache.org/jira/browse/KAFKA-12682
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.0
> Reporter: jacky
> Priority: Minor
>
> In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
> 1. migrate partition p1 from b0 to b1;
> 2. change the leader of p1;
> 3. migrate p1 from b1 back to b0;
> and _localRemoved will delete p1 at the end, even though the broker should host it again.
> I also think MetadataPartition should include the topic UUID, with the topic name optional; the topic name can only find the partition by the
> For example:
> create topic t1, delete topic t1, create topic t1, change leader of p1
> and after the records are compacted:
> create topic t1, delete topic t1, change t1-p1
> but the current implementation will:
> 1. process the change for t1-p1
> 2. process the delete for topic t1
> Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
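The ordering hazard jacky describes is easy to reproduce in isolation. Below is a minimal Java sketch (hypothetical event and partition types, not the actual RaftReplicaManager code) of how applying every change before every removal flips the outcome of the first example above:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BatchedOrderDemo {
    // Hypothetical stand-in for a metadata record.
    record Event(String type, String partition) { }

    public static void main(String[] args) {
        // Record order as seen by broker b0: p1 is removed (moved to b1),
        // then added back (moved to b0 again).
        List<Event> log = List.of(
                new Event("remove", "t1-p1"),
                new Event("change", "t1-p1"));

        // 2.8-style batching: collect, then apply every "change" first
        // and every "remove" last, regardless of the original order.
        Set<String> localPartitions = new HashSet<>();
        List<String> removals = new ArrayList<>();
        for (Event e : log) {
            if (e.type().equals("change")) {
                localPartitions.add(e.partition());
            } else {
                removals.add(e.partition());
            }
        }
        removals.forEach(localPartitions::remove);

        // b0 should host p1 again, but the deferred removal deleted it last.
        System.out.println(localPartitions.contains("t1-p1")); // prints false
    }
}
```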
[jira] [Updated] (KAFKA-12682) Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
[ https://issues.apache.org/jira/browse/KAFKA-12682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jacky updated KAFKA-12682:
--------------------------
Description:
In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
1. migrate partition p1 from b0 to b1;
2. change the leader of p1;
3. migrate p1 from b1 back to b0;
and _localRemoved will delete p1 at the end, even though the broker should host it again.

I also think MetadataPartition should include the topic UUID, with the topic name optional; the topic name can only find the partition by the
For example:
create topic t1, delete topic t1, create topic t1, change leader of p1
and after the records are compacted:
delete topic t1, change t1-p1
but the current implementation will:
1. process the change for t1-p1
2. process the delete for topic t1
Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.

was:
In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
1. migrate partition p1 from b0 to b1;
2. change the leader of p1;
3. migrate p1 from b1 back to b0;
and _localRemoved will delete p1 at the end, even though the broker should host it again.

I also think MetadataPartition should include the topic UUID, with the topic name optional; the topic name can only find the partition by the
For example:
create topic t1, delete topic t1, create topic t1, change leader of p1
and after the records are compacted:
create topic t1, delete topic t1, change t1-p1
but the current implementation will:
1. process the change for t1-p1
2. process the delete for topic t1
Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.

> Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
> -----------------------------------------------------------------------------
>
> Key: KAFKA-12682
> URL: https://issues.apache.org/jira/browse/KAFKA-12682
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.0
> Reporter: jacky
> Priority: Minor
>
> In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
> 1. migrate partition p1 from b0 to b1;
> 2. change the leader of p1;
> 3. migrate p1 from b1 back to b0;
> and _localRemoved will delete p1 at the end, even though the broker should host it again.
> I also think MetadataPartition should include the topic UUID, with the topic name optional; the topic name can only find the partition by the
> For example:
> create topic t1, delete topic t1, create topic t1, change leader of p1
> and after the records are compacted:
> delete topic t1, change t1-p1
> but the current implementation will:
> 1. process the change for t1-p1
> 2. process the delete for topic t1
> Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r615368480

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerdes.java

## @@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.Message;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The default implementation of {@link org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager} stores all
+ * the remote log metadata in an internal topic. Those messages can be {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata},
+ * {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate}, or {@link org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata}.
+ * These messages are written in Kafka's protocol message format as mentioned in
+ * <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-MessageFormat">KIP-405</a>
+ *
+ * This interface is about serializing and deserializing these messages that are stored in remote log metadata internal
+ * topic. There are respective implementations for the mentioned message types.
+ *
+ * @param <T> metadata type to be serialized/deserialized.
+ *
+ * @see RemoteLogSegmentMetadataSerdes
+ * @see RemoteLogSegmentMetadataUpdateSerdes
+ * @see RemotePartitionDeleteMetadataSerdes
+ */
+public interface RemoteLogMetadataSerdes<T> {
+
+    /**
+     * Returns the message serialized for the given {@code metadata} object.
+     *
+     * @param metadata object to be serialized.
+     */
+    Message serialize(T metadata);

Review comment:
   Good point. I will refactor serialize/deserialize to return/take ByteBuffer respectively.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
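For reference, here is a sketch of the refactored shape satishd describes, with serialize/deserialize exchanging ByteBuffer instead of protocol Message objects; the exact signatures in the final PR may differ:

```java
package org.apache.kafka.server.log.remote.metadata.storage.serialization;

import java.nio.ByteBuffer;

/**
 * Sketch only: the serde exchanges ByteBuffer with callers instead of
 * exposing protocol Message objects. Method names mirror the review
 * thread; they are assumptions, not the merged API.
 */
public interface RemoteLogMetadataSerdes<T> {

    /** Serializes the given metadata object into a ByteBuffer. */
    ByteBuffer serialize(T metadata);

    /** Deserializes a metadata object of the given schema version. */
    T deserialize(byte version, ByteBuffer byteBuffer);
}
```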
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r615369347

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataContext.java

## @@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import java.util.Objects;
+
+/**
+ * The context associated with the record in remote log metadata topic. This contains api-key, version and the
+ * payload object.
+ * <p>
+ * For example, a remote log segment metadata record will have
+ * <ul>
+ *   <li>api key as: {@link org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord#apiKey()}</li>
+ *   <li>version as: 0 (or the respective version), and</li>
+ *   <li>payload as: {@link org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata}</li>
+ * </ul>
+ * <p>
+ * You can read more details in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-MessageFormat">KIP-405</a>
+ */
+public class RemoteLogMetadataContext {

Review comment:
   This is slightly different from `ApiMessageAndVersion`. This class does not have the payload as a `Message`; it has `RemoteLogSegmentMetadata`, `RemoteLogSegmentMetadataUpdate`, or `RemotePartitionDeleteMetadata`. This simplifies things for producers/consumers of the remote log metadata topic, as they expect to send/receive these POJOs rather than `Message` objects. So it is simpler for producers/consumers to have the `Message` to POJO marshalling/unmarshalling in the ser/des.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
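A minimal sketch of the holder satishd describes: the api key and version identify the on-wire schema, while the payload stays a plain POJO. Field and method names here are illustrative, not the actual class in the PR:

```java
// Illustrative only; the real RemoteLogMetadataContext lives in the Kafka PR.
public final class MetadataContextSketch {
    private final byte apiKey;     // identifies the record schema (message type)
    private final byte version;    // schema version of the payload
    private final Object payload;  // POJO such as RemoteLogSegmentMetadata

    public MetadataContextSketch(byte apiKey, byte version, Object payload) {
        this.apiKey = apiKey;
        this.version = version;
        this.payload = payload;
    }

    public byte apiKey() { return apiKey; }
    public byte version() { return version; }
    public Object payload() { return payload; }
}
```

Keeping the payload as a POJO rather than a `Message` is the design choice defended above: producers and consumers of the topic never see the protocol-generated types.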
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r615369734

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataContextSerdes.java

## @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Message;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link RemoteLogMetadataContext}. This is the root serdes
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataContextSerdes implements Serde<RemoteLogMetadataContext> {
+
+    public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new RemoteLogSegmentMetadataRecord().apiKey();
+    public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = (byte) new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+    public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new RemotePartitionDeleteMetadataRecord().apiKey();
+
+    private final Map<Byte, RemoteLogMetadataSerdes> keyToSerdes;

Review comment:
   Done.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
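To illustrate the keyToSerdes map above, here is a self-contained dispatch sketch. The wire layout ([apiKey][version][payload]) is assumed from the discussion and KIP-405; the real format and types live in the PR:

```java
import java.nio.ByteBuffer;
import java.util.Map;

public final class RootSerdeSketch {
    // Minimal stand-in for the per-type serdes discussed above.
    interface TypedSerde {
        Object deserialize(byte version, ByteBuffer payload);
    }

    private final Map<Byte, TypedSerde> keyToSerdes;

    public RootSerdeSketch(Map<Byte, TypedSerde> keyToSerdes) {
        this.keyToSerdes = keyToSerdes;
    }

    // Assumed layout: one byte api key, one byte version, then the payload.
    public Object deserialize(ByteBuffer buf) {
        byte apiKey = buf.get();   // first byte picks the message type
        byte version = buf.get();  // second byte picks the schema version
        TypedSerde serde = keyToSerdes.get(apiKey);
        if (serde == null) {
            throw new IllegalArgumentException("Unrecognized api key: " + apiKey);
        }
        return serde.deserialize(version, buf);
    }
}
```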
[GitHub] [kafka] satishd commented on pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on pull request #10271: URL: https://github.com/apache/kafka/pull/10271#issuecomment-821960939 Thanks @junrao for your comments. Addressed them with commit [e909c7f](https://github.com/apache/kafka/pull/10271/commits/e909c7fc481945e54b7e364d2f52c9d191f988ec). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] IUSR opened a new pull request #10555: MINOR: adjust HashMap init capacity to avoid unnecessary resizing
IUSR opened a new pull request #10555:
URL: https://github.com/apache/kafka/pull/10555

* The initial capacities for some `HashMap` objects are less than expected and result in resizing.
* Increase the initial capacity, similar to what `java.util.HashSet` does.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
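The sizing trick the PR describes mirrors java.util.HashSet, whose collection constructor pre-sizes its backing HashMap for the default 0.75 load factor. A small demonstration:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InitCapacityDemo {
    // Same formula HashSet(Collection) uses: a capacity large enough that
    // expectedSize entries fit without crossing the 0.75 load factor.
    static int capacityFor(int expectedSize) {
        return Math.max((int) (expectedSize / 0.75f) + 1, 16);
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "b", "c", "d", "e", "f");
        // new HashMap<>(keys.size()) would rehash once the map grows past
        // size * 0.75 entries; pre-sizing avoids that resize entirely.
        Map<String, Integer> m = new HashMap<>(capacityFor(keys.size()));
        for (String k : keys) {
            m.put(k, k.length());
        }
        System.out.println(m.size()); // 6, with no intermediate rehash
    }
}
```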
[jira] [Resolved] (KAFKA-12682) Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
[ https://issues.apache.org/jira/browse/KAFKA-12682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jacky resolved KAFKA-12682.
---------------------------
Resolution: Not A Problem

> Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
> -----------------------------------------------------------------------------
>
> Key: KAFKA-12682
> URL: https://issues.apache.org/jira/browse/KAFKA-12682
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.0
> Reporter: jacky
> Priority: Minor
>
> In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
> 1. migrate partition p1 from b0 to b1;
> 2. change the leader of p1;
> 3. migrate p1 from b1 back to b0;
> and _localRemoved will delete p1 at the end, even though the broker should host it again.
> I also think MetadataPartition should include the topic UUID, with the topic name optional; the topic name can only find the partition by the
> For example:
> create topic t1, delete topic t1, create topic t1, change leader of p1
> and after the records are compacted:
> delete topic t1, change t1-p1
> but the current implementation will:
> 1. process the change for t1-p1
> 2. process the delete for topic t1
> Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-12682) Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
[ https://issues.apache.org/jira/browse/KAFKA-12682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jacky reopened KAFKA-12682:
---------------------------

> Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
> -----------------------------------------------------------------------------
>
> Key: KAFKA-12682
> URL: https://issues.apache.org/jira/browse/KAFKA-12682
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.0
> Reporter: jacky
> Priority: Minor
>
> In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
> 1. migrate partition p1 from b0 to b1;
> 2. change the leader of p1;
> 3. migrate p1 from b1 back to b0;
> and _localRemoved will delete p1 at the end, even though the broker should host it again.
> I also think MetadataPartition should include the topic UUID, with the topic name optional; the topic name can only find the partition by the
> For example:
> create topic t1, delete topic t1, create topic t1, change leader of p1
> and after the records are compacted:
> delete topic t1, change t1-p1
> but the current implementation will:
> 1. process the change for t1-p1
> 2. process the delete for topic t1
> Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dejan2609 commented on pull request #10466: KAFKA-12417 "streams" module: switch deprecated Gradle configuration (`testRuntime` -->> `testRuntimeClasspath`)
dejan2609 commented on pull request #10466:
URL: https://github.com/apache/kafka/pull/10466#issuecomment-821968689

> Could we reintroduce a configuration that behaves the same as `testRuntime` did before it was removed?

@ijuma Can you please expand on this? Are you referring to the `testRuntime` mentioned in the `copyDependantLibs` task? https://github.com/apache/kafka/blob/2.8.0-rc2/build.gradle#L1472

```
project(':streams') {
  ...
  tasks.create(name: "copyDependantLibs", type: Copy) {
    from (configurations.testRuntime) {
      ...
    }
    ...
  }
```

Or are we now just brainstorming about Gradle behavior _**in general**_?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wenbingshen opened a new pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand
wenbingshen opened a new pull request #10556:
URL: https://github.com/apache/kafka/pull/10556

Remove some redundant parameters and configurations.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wenbingshen commented on pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand
wenbingshen commented on pull request #10556:
URL: https://github.com/apache/kafka/pull/10556#issuecomment-821975837

Dear @ijuma @chia7712 Can you take a look at it? Thanks.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12682) Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
[ https://issues.apache.org/jira/browse/KAFKA-12682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jacky updated KAFKA-12682:
--------------------------
Description:
In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
1. migrate partition p1 from b0 to b1;
2. change the leader of p1;
3. migrate p1 from b1 back to b0;
and _localRemoved will delete p1 at the end, even though the broker should host it again.

I also think MetadataPartition should include the topic UUID, with the topic name optional.
For example:
create topic t1, delete topic t1, create topic t1, change leader of p1
and after the records are compacted:
delete topic t1, change t1-p1
but the current implementation will:
1. process the change for t1-p1
2. process the delete for topic t1
Since MetadataPartition only includes the topic name and not the topic UUID, when processing the removal it cannot find the original topic UUID and finds the latest topic id instead, which is not right. Also, the delete of topic t1 should be processed before the create of t1 or the change of p1.

was:
In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
1. migrate partition p1 from b0 to b1;
2. change the leader of p1;
3. migrate p1 from b1 back to b0;
and _localRemoved will delete p1 at the end, even though the broker should host it again.

I also think MetadataPartition should include the topic UUID, with the topic name optional; the topic name can only find the partition by the
For example:
create topic t1, delete topic t1, create topic t1, change leader of p1
and after the records are compacted:
delete topic t1, change t1-p1
but the current implementation will:
1. process the change for t1-p1
2. process the delete for topic t1
Since MetadataPartition does not include the topic UUID, only the topic name, the removal cannot find the original topic id and finds the latest topic id instead, which is not right.

> Kraft MetadataPartitionsBuilder _localChanged and _localRemoved out of order
> -----------------------------------------------------------------------------
>
> Key: KAFKA-12682
> URL: https://issues.apache.org/jira/browse/KAFKA-12682
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.8.0
> Reporter: jacky
> Priority: Minor
>
> In version 2.8, MetadataPartitionsBuilder has the fields _localChanged and _localRemoved, which record the changed and deleted partitions, but kafka.server.RaftReplicaManager#handleMetadataRecords always processes the _localChanged partitions first and then _localRemoved, which does not respect the original record order. For example:
> 1. migrate partition p1 from b0 to b1;
> 2. change the leader of p1;
> 3. migrate p1 from b1 back to b0;
> and _localRemoved will delete p1 at the end, even though the broker should host it again.
> I also think MetadataPartition should include the topic UUID, with the topic name optional.
> For example:
> create topic t1, delete topic t1, create topic t1, change leader of p1
> and after the records are compacted:
> delete topic t1, change t1-p1
> but the current implementation will:
> 1. process the change for t1-p1
> 2. process the delete for topic t1
> Since MetadataPartition only includes the topic name and not the topic UUID, when processing the removal it cannot find the original topic UUID and finds the latest topic id instead, which is not right. Also, the delete of topic t1 should be processed before the create of t1 or the change of p1.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12629) Flaky Test RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324517#comment-17324517 ]

Wenbing Shen commented on KAFKA-12629:
--------------------------------------
Found another error log today. One of the errors occurred in testCreateClusterAndCreateAndManyTopicsWithManyPartitions. It reports:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
The strange thing is that this test case creates the topics test-topic-1, test-topic-2, and test-topic-3, but the log in the standard output indicates that the replica fetcher thread of broker 0 needs to fetch test-topic-0 partition data from broker 1, and the hostname of broker 1 cannot be resolved. The detailed log can be viewed from this link:
https://github.com/apache/kafka/pull/10551/checks?check_run_id=2373652453

> Flaky Test RaftClusterTest
> --------------------------
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
> Issue Type: Test
> Components: core, unit tests
> Reporter: Matthias J. Sax
> Assignee: Luke Chen
> Priority: Critical
> Labels: flaky-test
>
> {quote}
> {{java.util.concurrent.ExecutionException: java.lang.ClassNotFoundException: org.apache.kafka.controller.NoOpSnapshotWriterBuilder
> at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
> at kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
> at kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}
> {quote}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324536#comment-17324536 ]

Haoran Xuan commented on KAFKA-10800:
-------------------------------------
Hi, [~jagsancio], it seems that the dependencies are done; can I take this JIRA? Thanks!

> Validate the snapshot id when the state machine creates a snapshot
> -------------------------------------------------------------------
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
> Issue Type: Sub-task
> Components: replication
> Reporter: Jose Armando Garcia Sancio
> Priority: Major
>
> When the state machine attempts to create a snapshot writer, we should validate that the following is true:
> # The end offset and epoch of the snapshot is less than the high-watermark.
> # The end offset and epoch of the snapshot is valid based on the leader epoch cache.
> Note that this validation should not be performed when the raft client creates the snapshot writer, because in that case the local log is out of date and the follower should trust the snapshot id sent by the partition leader.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
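A hedged sketch of the two validations the ticket lists; the class and parameter names are illustrative, not the actual raft or log-layer API:

```java
public final class SnapshotIdValidator {
    private final long highWatermark;

    public SnapshotIdValidator(long highWatermark) {
        this.highWatermark = highWatermark;
    }

    /**
     * @param endOffset   end offset claimed by the snapshot id
     * @param epoch       epoch claimed by the snapshot id
     * @param cachedEpoch epoch the leader epoch cache records for endOffset
     */
    public void validate(long endOffset, int epoch, int cachedEpoch) {
        // Check 1: the snapshot must not reach past the high-watermark.
        if (endOffset >= highWatermark) {
            throw new IllegalArgumentException("Snapshot end offset " + endOffset
                    + " must be less than the high-watermark " + highWatermark);
        }
        // Check 2: the snapshot epoch must agree with the leader epoch cache.
        if (epoch != cachedEpoch) {
            throw new IllegalArgumentException("Snapshot epoch " + epoch
                    + " does not match the cached epoch " + cachedEpoch);
        }
    }
}
```

As the ticket notes, these checks apply only when the state machine initiates the snapshot; a follower materializing a leader-sent snapshot must skip them.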
[GitHub] [kafka] guozhangwang commented on pull request #10508: KAFKA-12633: Remove deprecated APIs in TopologyTestDriver
guozhangwang commented on pull request #10508:
URL: https://github.com/apache/kafka/pull/10508#issuecomment-822030658

> Looks like there are some related test failures, though.

Thanks! I have fixed the unit test, will merge after confirming green builds locally.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #10508: KAFKA-12633: Remove deprecated APIs in TopologyTestDriver
guozhangwang merged pull request #10508: URL: https://github.com/apache/kafka/pull/10508 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-12633) Remove deprecated "TopologyTestDriver#pipeInput / readOutput"
[ https://issues.apache.org/jira/browse/KAFKA-12633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang resolved KAFKA-12633.
-----------------------------------
Fix Version/s: 3.0.0
Resolution: Fixed

> Remove deprecated "TopologyTestDriver#pipeInput / readOutput"
> --------------------------------------------------------------
>
> Key: KAFKA-12633
> URL: https://issues.apache.org/jira/browse/KAFKA-12633
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Guozhang Wang
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 3.0.0
>

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12683) Remove deprecated "UsePreviousTimeOnInvalidTimeStamp"
Guozhang Wang created KAFKA-12683:
-------------------------------------
Summary: Remove deprecated "UsePreviousTimeOnInvalidTimeStamp"
Key: KAFKA-12683
URL: https://issues.apache.org/jira/browse/KAFKA-12683
Project: Kafka
Issue Type: Sub-task
Reporter: Guozhang Wang

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang opened a new pull request #10557: KAFKA-12683: Remove deprecated UsePreviousTimeOnInvalidTimestamp
guozhangwang opened a new pull request #10557:
URL: https://github.com/apache/kafka/pull/10557

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #10557: KAFKA-12683: Remove deprecated UsePreviousTimeOnInvalidTimestamp
guozhangwang commented on pull request #10557: URL: https://github.com/apache/kafka/pull/10557#issuecomment-822032717 ping @abbccdda for reviews. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order
guozhangwang commented on pull request #9441: URL: https://github.com/apache/kafka/pull/9441#issuecomment-822034184 @tombentley could you see if @hachikuji 's comments can be addressed? This is a pretty tricky bug that I would like to get fixed in 3.0. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324574#comment-17324574 ]

Guozhang Wang commented on KAFKA-9168:
--------------------------------------
Hi [~sagarrao], just for my own education, could you paste the patch with benchmarks to this ticket so that I can understand how you would propose to integrate the direct byte buffer once we upgrade? Bruno is working on upgrading beyond 6.8.1, so we are on the road to do this soon.

> Integrate JNI direct buffer support to RocksDBStore
> ---------------------------------------------------
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Sagar Rao
> Priority: Major
> Labels: perfomance
>
> There has been a PR created on the rocksdb Java client to support direct ByteBuffers in Java. We can look at integrating it whenever it gets merged.
> Link to PR: https://github.com/facebook/rocksdb/pull/2283

-- This message was sent by Atlassian Jira (v8.3.4#803005)
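A sketch of what the integration could look like on the read/write path, assuming the direct-ByteBuffer get/put overloads that the linked RocksDB PR proposes; reusing direct buffers across calls is what removes the per-call byte[] allocations and JNI copies:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class DirectBufferDemo {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options opts = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(opts, "/tmp/direct-buffer-demo")) {
            // Direct buffers allocated once and reused for every call.
            ByteBuffer key = ByteBuffer.allocateDirect(64);
            ByteBuffer value = ByteBuffer.allocateDirect(1024);

            key.put("k1".getBytes(StandardCharsets.UTF_8)).flip();
            value.put("v1".getBytes(StandardCharsets.UTF_8)).flip();
            db.put(key, value);            // assumed ByteBuffer overload

            key.rewind();
            value.clear();
            int size = db.get(key, value); // assumed ByteBuffer overload
            if (size != RocksDB.NOT_FOUND) {
                System.out.println("value bytes: " + size);
            }
        }
    }
}
```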
[jira] [Commented] (KAFKA-12669) Add deleteRange to WindowStore / KeyValueStore interfaces
[ https://issues.apache.org/jira/browse/KAFKA-12669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324575#comment-17324575 ]

Guozhang Wang commented on KAFKA-12669:
---------------------------------------
Hi [~dongjin], thanks for your interest! It is for tracking the work that [~spena] is working on (his PR contains the implementation, but only for internal usage; this ticket is for eventually exposing it as public APIs). If you like, you can take a look at his PR (https://github.com/apache/kafka/pull/10537) and see if it matches your thoughts too.

> Add deleteRange to WindowStore / KeyValueStore interfaces
> ---------------------------------------------------------
>
> Key: KAFKA-12669
> URL: https://issues.apache.org/jira/browse/KAFKA-12669
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
> Labels: needs-kip
>
> We can consider adding such APIs where the underlying implementation classes have better optimizations than deleting the keys one by one with get-and-delete.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
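For contrast, here is what callers must do today without deleteRange: a collect-and-delete loop over the existing KeyValueStore range() API, which is exactly the pattern the proposed API could replace with a single optimized call:

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public final class DeleteRangeFallback {
    // Hypothetical helper, not part of the Streams API: deletes every key
    // in [from, to] using only the existing range() and delete() methods.
    public static <K, V> void deleteRange(KeyValueStore<K, V> store, K from, K to) {
        List<K> keys = new ArrayList<>();
        try (KeyValueIterator<K, V> iter = store.range(from, to)) {
            // Collect first: mutating the store while iterating may not be
            // supported by every store implementation.
            iter.forEachRemaining(kv -> keys.add(kv.key));
        }
        keys.forEach(store::delete);
    }
}
```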
[GitHub] [kafka] guozhangwang merged pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally
guozhangwang merged pull request #10537: URL: https://github.com/apache/kafka/pull/10537 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #10537: KAFKA-10847: Delete Time-ordered duplicated records using deleteRange() internally
guozhangwang commented on pull request #10537: URL: https://github.com/apache/kafka/pull/10537#issuecomment-822036117 Merged to trunk, thanks @spena ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9640: KAFKA-10283; Consolidate client-level and consumer-level assignment within ClientState
guozhangwang commented on pull request #9640:
URL: https://github.com/apache/kafka/pull/9640#issuecomment-822036332

> @guozhangwang @highluck Sorry, I've been swamped with other things lately. If you have a minute to review it yourself Guozhang I would appreciate it! Also ping @cadonna or @vvcephei for reviews since they're familiar with this part of the code

Thanks @ableegoldman, I will review it :)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup
guozhangwang commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-822036454

@dongjinleekr @vvcephei just checking: are you still working on it? I saw @vvcephei has approved the PR but it was not yet merged, hence asking.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #9640: KAFKA-10283; Consolidate client-level and consumer-level assignment within ClientState
guozhangwang commented on pull request #9640:
URL: https://github.com/apache/kafka/pull/9640#issuecomment-822036798

@highluck I read through the code, and it looks good to me overall. However, the unit test failure seems relevant:
```
org.apache.kafka.streams.processor.internals.assignment.ClientStateTest.shouldReturnAssignedTasksForConsumer
```
Could you check and resolve it before we can merge?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] highluck commented on pull request #9640: KAFKA-10283; Consolidate client-level and consumer-level assignment within ClientState
highluck commented on pull request #9640:
URL: https://github.com/apache/kafka/pull/9640#issuecomment-822113308

@guozhangwang There was an error in the code. I have now fixed it and the tests are running fine. Thanks for the review!

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12684) The valid partition list is incorrectly replaced by the successfully elected partition list
Wenbing Shen created KAFKA-12684:
------------------------------------
Summary: The valid partition list is incorrectly replaced by the successfully elected partition list
Key: KAFKA-12684
URL: https://issues.apache.org/jira/browse/KAFKA-12684
Project: Kafka
Issue Type: Bug
Components: tools
Affects Versions: 2.7.0, 2.6.0
Reporter: Wenbing Shen
Fix For: 3.0.0
Attachments: election-preferred-leader.png, non-preferred-leader.png

When using the kafka-leader-election tool for preferred replica election, if the election list contains partitions whose leader is already the preferred replica, the list of those partitions (which need no election) is incorrectly replaced by the list of partitions that were successfully elected.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wenbingshen opened a new pull request #10558: KAFKA-12684: Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
wenbingshen opened a new pull request #10558:
URL: https://github.com/apache/kafka/pull/10558

When using the kafka-leader-election tool for preferred replica election, if the election list contains partitions whose leader is already the preferred replica, the list of those partitions (which need no election) is incorrectly replaced by the list of partitions that were successfully elected.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
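Illustrative bookkeeping only (the actual tool is Scala and its result types differ): the point of the fix is that partitions already led by their preferred replica belong in their own "noop" set rather than being overwritten by the succeeded set:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class ElectionResultSets {
    final Set<String> noop = new HashSet<>();      // already the preferred leader
    final Set<String> succeeded = new HashSet<>(); // election succeeded
    final Set<String> failed = new HashSet<>();    // election failed

    // Hypothetical outcome strings; the real command inspects the
    // per-partition errors returned by the admin client.
    void record(Map<String, String> resultByPartition) {
        resultByPartition.forEach((partition, outcome) -> {
            switch (outcome) {
                case "ALREADY_PREFERRED" -> noop.add(partition);
                case "SUCCEEDED" -> succeeded.add(partition);
                default -> failed.add(partition);
            }
        });
    }
}
```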
[GitHub] [kafka] wenbingshen commented on pull request #10558: KAFKA-12684: Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
wenbingshen commented on pull request #10558: URL: https://github.com/apache/kafka/pull/10558#issuecomment-822129034 @dajac @chia7712 Can you help review it? Thanks. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12684) The valid partition list is incorrectly replaced by the successfully elected partition list
[ https://issues.apache.org/jira/browse/KAFKA-12684?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenbing Shen reassigned KAFKA-12684:
------------------------------------
Assignee: Wenbing Shen

> The valid partition list is incorrectly replaced by the successfully elected partition list
> --------------------------------------------------------------------------------------------
>
> Key: KAFKA-12684
> URL: https://issues.apache.org/jira/browse/KAFKA-12684
> Project: Kafka
> Issue Type: Bug
> Components: tools
> Affects Versions: 2.6.0, 2.7.0
> Reporter: Wenbing Shen
> Assignee: Wenbing Shen
> Priority: Minor
> Fix For: 3.0.0
> Attachments: election-preferred-leader.png, non-preferred-leader.png
>
> When using the kafka-leader-election tool for preferred replica election, if the election list contains partitions whose leader is already the preferred replica, the list of those partitions (which need no election) is incorrectly replaced by the list of partitions that were successfully elected.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on a change in pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand
chia7712 commented on a change in pull request #10556:
URL: https://github.com/apache/kafka/pull/10556#discussion_r615517933

## File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala

## @@ -17,32 +17,27 @@ package kafka.admin
-import java.io.PrintStream

Review comment:
   I'd like this order optimization. However, it is not related to this PR and it may produce conflicts with other PRs. Hence, could you please revert this change?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12485) Speed up Consumer#committed by returning cached offsets for owned partitions
[ https://issues.apache.org/jira/browse/KAFKA-12485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-12485:
---------------------------------
Assignee: Luke Chen

> Speed up Consumer#committed by returning cached offsets for owned partitions
> ----------------------------------------------------------------------------
>
> Key: KAFKA-12485
> URL: https://issues.apache.org/jira/browse/KAFKA-12485
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: A. Sophie Blee-Goldman
> Assignee: Luke Chen
> Priority: Major
> Labels: newbie++
>
> All of the KafkaConsumer#committed APIs will currently make a remote blocking call to the server to fetch the committed offsets. This is typically used to reset the offsets after a crash or restart, or to fetch offsets for other consumers in the group. However, some users may wish to invoke this API on partitions which are currently owned by the Consumer, in which case the remote call is unnecessary since it should be able to just keep track of what it has itself committed.
> We should consider optimizing these APIs to just return the cached offsets in place of the remote call when passed in only partitions that are currently owned. This is similar to what we do in Consumer#position, although there we have a guarantee that the partitions are owned by the Consumer, whereas in #committed we do not.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
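A sketch of the proposed optimization from outside the consumer; the cache and wrapper here are hypothetical, since the ticket proposes building this into KafkaConsumer#committed itself:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CachedCommittedLookup {
    private final Consumer<?, ?> consumer;
    private final Map<TopicPartition, OffsetAndMetadata> lastCommitted = new HashMap<>();

    CachedCommittedLookup(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    // Call this wherever offsets are committed, to remember them locally.
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        lastCommitted.putAll(offsets);
    }

    // Serve from the cache only when every requested partition is currently
    // owned and has a locally recorded commit; otherwise do the remote call.
    Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        if (consumer.assignment().containsAll(partitions)
                && lastCommitted.keySet().containsAll(partitions)) {
            Map<TopicPartition, OffsetAndMetadata> cached = new HashMap<>();
            partitions.forEach(tp -> cached.put(tp, lastCommitted.get(tp)));
            return cached; // no broker round trip
        }
        return consumer.committed(partitions); // remote blocking call
    }
}
```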
[jira] [Comment Edited] (KAFKA-12675) Improve sticky general assignor scalability and performance
[ https://issues.apache.org/jira/browse/KAFKA-12675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17324358#comment-17324358 ]

Travis Bischel edited comment on KAFKA-12675 at 4/19/21, 3:33 AM:
------------------------------------------------------------------
Great insight on getting rid of partition2AllPotentialConsumers, as well as keeping some more things sorted! I was able to translate that into my own code and dropped the large-imbalance case from 9.5s to 0.5s, and from 8.5G of memory to 0.5G :) I'll take a look at the code more in depth soon.
Edit: after further improvements I was able to get the large-imbalance case down to 220ms and 171MB.

was (Author: twmb):
Great insight on getting rid of partition2AllPotentialConsumers, as well as keeping some more things sorted! I was able to translate that into my own code and dropped the large-imbalance case from 9.5s to 0.5s, and from 8.5G of memory to 0.5G :) I'll take a look at the code more in depth soon.

> Improve sticky general assignor scalability and performance
> ------------------------------------------------------------
>
> Key: KAFKA-12675
> URL: https://issues.apache.org/jira/browse/KAFKA-12675
> Project: Kafka
> Issue Type: Improvement
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
>
> Currently, we have a "general assignor" for the non-equal subscription case and a "constrained assignor" for the all-equal subscription case. There's a performance test for the constrained assignor with:
> topicCount = 500;
> partitionCount = 2000;
> consumerCount = 2000;
> in _testLargeAssignmentAndGroupWithUniformSubscription_: 1 million partitions in total, and we can complete the assignment within 2 seconds on my machine.
> However, if we let 1 of the consumers subscribe to only 1 topic, it'll use the "general assignor", and the result with the same settings as above is: *OutOfMemory*.
> Even if we lower the counts to:
> topicCount = 50;
> partitionCount = 1000;
> consumerCount = 1000;
> we still get *OutOfMemory*.
> With this setting:
> topicCount = 50;
> partitionCount = 800;
> consumerCount = 800;
> we can complete in 10 seconds on my machine, which is still slow.
> Since we are going to set the default assignment strategy to "CooperativeStickyAssignor" soon, we should improve the scalability and performance of the sticky general assignor.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] showuon opened a new pull request #10559: MINOR: disable RaftClusterTest first
showuon opened a new pull request #10559:
URL: https://github.com/apache/kafka/pull/10559

Disable the flaky RaftClusterTest tests until the root cause, KAFKA-12677, is fixed.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10559: MINOR: disable RaftClusterTest first
showuon commented on a change in pull request #10559:
URL: https://github.com/apache/kafka/pull/10559#discussion_r615524139

## File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala

## @@ -22,14 +22,16 @@ import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, NewTopic}
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.metadata.BrokerState
-import org.junit.jupiter.api.{Test, Timeout}
+import org.junit.jupiter.api.{Disabled, Test, Timeout}
 import org.junit.jupiter.api.Assertions._
-
 import java.util
 import java.util.Collections
+
 import scala.jdk.CollectionConverters._

-@Timeout(12)

Review comment:
   The timeout unit is in seconds. Fix it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
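For reference, JUnit 5's @Timeout unit defaults to seconds, so a timeout that was meant to be larger needs either a bigger number or an explicit unit:

```java
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

// Hypothetical test class for illustration only.
@Timeout(120) // class-level: unit defaults to TimeUnit.SECONDS
class RaftClusterTimeoutExample {

    @Test
    @Timeout(value = 2, unit = TimeUnit.MINUTES) // or spell the unit out
    void slowTest() throws InterruptedException {
        Thread.sleep(10); // stand-in for real work
    }
}
```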
[GitHub] [kafka] showuon commented on pull request #10559: MINOR: disable RaftClusterTest first
showuon commented on pull request #10559:
URL: https://github.com/apache/kafka/pull/10559#issuecomment-822144126

@cmccabe, I think the root cause might need some time to get fixed. Do you think we should disable the tests first? Thanks.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wenbingshen commented on a change in pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand
wenbingshen commented on a change in pull request #10556:
URL: https://github.com/apache/kafka/pull/10556#discussion_r615545404

## File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala

## @@ -17,32 +17,27 @@ package kafka.admin
-import java.io.PrintStream
-import java.io.IOException
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
-
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
 import kafka.utils.Implicits._
-import kafka.utils.Logging
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging}
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.clients._
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.ConfigDef.ValidString._
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.AuthenticationException
 import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.utils.{KafkaThread, Time}
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
+import java.io.{IOException, PrintStream}

Review comment:
   > I'd like this order optimization. However, it is not related to this PR and it may produce conflicts with other PRs. Hence, could you please revert this change?

   @chia7712 Thank you so much for your comment. Now both IOException and PrintStream are inside the same {}. Please help me review this PR again. :)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wenbingshen commented on a change in pull request #10551: MINOR:Remove nonsense test line from TopicCommandTest
wenbingshen commented on a change in pull request #10551: URL: https://github.com/apache/kafka/pull/10551#discussion_r615547868

## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala

## @@ -465,7 +465,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Logging
 topicService.describeTopic(new TopicCommandOptions(Array("--topic", testTopicName)))
 val rows = output.split("\n")
 assertEquals(3, rows.size)
-assertTrue(rows(0).startsWith(s"\tTopic: $testTopicName"))
+assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))

Review comment: Replace `assertTrue(rows(0).startsWith(s"Topic:$testTopicName\tPartitionCount:1"))` with `assertTrue(rows(0).startsWith(s"Topic: $testTopicName"))`

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand
chia7712 commented on a change in pull request #10556: URL: https://github.com/apache/kafka/pull/10556#discussion_r615554096

## File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala

## @@ -17,32 +17,27 @@
 package kafka.admin
-import java.io.PrintStream
-import java.io.IOException
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
-
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
 import kafka.utils.Implicits._
-import kafka.utils.Logging
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging}
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.clients._
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.ConfigDef.ValidString._
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.AuthenticationException
 import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.utils.{KafkaThread, Time}
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
+import java.io.{IOException, PrintStream}

Review comment: Sorry for my unclear comment. My point was that this PR re-orders all the imports of this class. It would be better to revert all the changes related to the re-ordering.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on pull request #10552: URL: https://github.com/apache/kafka/pull/10552#issuecomment-822200511

The performance comparison in Jenkins for uniform subscription and non-equal subscription, with the setting:
```
topicCount = 500;
partitionCount = 2000;
consumerCount = 2000;
```
```
Build / JDK 15 and Scala 2.13 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 13 sec | Passed
Build / JDK 11 and Scala 2.13 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 17 sec | Passed
Build / JDK 8 and Scala 2.12 / testLargeAssignmentAndGroupWithNonEqualSubscription() | 14 sec | Passed
Build / JDK 8 and Scala 2.12 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.4 sec | Passed
Build / JDK 15 and Scala 2.13 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.3 sec | Passed
Build / JDK 11 and Scala 2.13 / testLargeAssignmentAndGroupWithUniformSubscription() | 3.9 sec | Passed
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r615227866

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

## @@ -313,26 +312,24 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 prepopulateCurrentAssignments(subscriptions, currentAssignment, prevAssignment);
-// a mapping of all topic partitions to all consumers that can be assigned to them
-final Map<TopicPartition, List<String>> partition2AllPotentialConsumers = new HashMap<>();
-// a mapping of all consumers to all potential topic partitions that can be assigned to them
-final Map<String, List<TopicPartition>> consumer2AllPotentialPartitions = new HashMap<>();
+// a mapping of all topics to all consumers that can be assigned to them
+final Map<String, List<String>> topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.keySet().size());
+// a mapping of all consumers to all potential topics that can be assigned to them
+final Map<String, List<String>> consumer2AllPotentialTopics = new HashMap<>(subscriptions.keySet().size());
-// initialize partition2AllPotentialConsumers and consumer2AllPotentialPartitions in the following two for loops
+// initialize topic2AllPotentialConsumers and consumer2AllPotentialTopics in the following two for loops
 for (Entry<String, Integer> entry: partitionsPerTopic.entrySet()) {
 for (int i = 0; i < entry.getValue(); ++i)
-partition2AllPotentialConsumers.put(new TopicPartition(entry.getKey(), i), new ArrayList<>());
+topic2AllPotentialConsumers.put(entry.getKey(), new ArrayList<>());
 }
 for (Entry<String, Subscription> entry: subscriptions.entrySet()) {
 String consumerId = entry.getKey();
-consumer2AllPotentialPartitions.put(consumerId, new ArrayList<>());
+List<String> subscribedTopics = new ArrayList<>(entry.getValue().topics().size());
+consumer2AllPotentialTopics.put(consumerId, subscribedTopics);
 entry.getValue().topics().stream().filter(topic -> partitionsPerTopic.get(topic) != null).forEach(topic -> {
-for (int i = 0; i < partitionsPerTopic.get(topic); ++i) {
-TopicPartition topicPartition = new TopicPartition(topic, i);
-    consumer2AllPotentialPartitions.get(consumerId).add(topicPartition);
-    partition2AllPotentialConsumers.get(topicPartition).add(consumerId);

Review comment: refactor 1: We used to have two maps, `consumer2AllPotentialPartitions` and `partition2AllPotentialConsumers`, but they need a lot of memory, e.g. with 1 million partitions and 2,000 consumers, `consumer2AllPotentialPartitions` holds 2,000 entries, and each entry's list contains up to 1M partitions. Actually, we only need to store the topics of each potential consumer (and vice versa) and map back through `partitionsPerTopic`, so I changed these to `topic2AllPotentialConsumers` and `consumer2AllPotentialTopics`. This saves both memory and time.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
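To make the memory argument concrete, here is a minimal sketch of the topic-level mapping under the assumptions above; this is not the actual `AbstractStickyAssignor` code, and `subscribedTopicsPerConsumer` is a simplified stand-in for the real `Subscription` objects:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopicLevelMapsSketch {

    // Builds topic -> potential consumers. The map is sized by topic count
    // (e.g. 500) rather than partition count (e.g. 1,000,000); per-topic
    // partition counts can be recovered from partitionsPerTopic when needed.
    public static Map<String, List<String>> topicToPotentialConsumers(
            Map<String, Integer> partitionsPerTopic,                 // topic -> partition count
            Map<String, List<String>> subscribedTopicsPerConsumer) { // consumer -> subscribed topics
        Map<String, List<String>> topic2AllPotentialConsumers = new HashMap<>(partitionsPerTopic.size());
        partitionsPerTopic.keySet().forEach(topic -> topic2AllPotentialConsumers.put(topic, new ArrayList<>()));

        for (Map.Entry<String, List<String>> entry : subscribedTopicsPerConsumer.entrySet()) {
            for (String topic : entry.getValue()) {
                List<String> consumers = topic2AllPotentialConsumers.get(topic);
                if (consumers != null)           // skip topics unknown to the cluster
                    consumers.add(entry.getKey());
            }
        }
        return topic2AllPotentialConsumers;
    }
}
```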
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r615228600

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

## @@ -387,58 +398,121 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
+int totalPartitionCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
 balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired);
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionCount);
+
 return currentAssignment;
 }
+/**
+ * get the unassigned partition list by computing the difference set of sortedPartitions (all partitions)
+ * and toBeRemovedPartitions. We use the two-pointer technique here:
+ *
+ * We loop over sortedPartitions, comparing against the ith element of the sorted toBeRemovedPartitions (i starts at 0):
+ * - if not equal to the ith element, add to unassignedPartitions
+ * - if equal to the ith element, get the next element from toBeRemovedPartitions
+ *
+ * @param sortedPartitions: sorted list of all partitions
+ * @param toBeRemovedPartitions: sorted partitions, all included in sortedPartitions
+ * @return the partitions not assigned to any current consumer
+ */
+private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,

Review comment: refactor 2: We used to create an ArrayList `unassignedPartitions` holding all the sorted partitions (e.g. 1 million), then loop through the current assignment to remove the already-assigned ones, e.g. 999,000 of them, leaving only 1,000 partitions. However, removing elements from a huge ArrayList is slow, because each remove has to find the element first and then array-copy the remainder into an array of size (originalSize - 1). This situation arises on every rebalance, where we usually have only a small set of changes (e.g. one consumer dropped), so this is an important improvement. To refactor it, I used the two-pointer technique to loop through the two sorted lists, `sortedPartitions` and `sortedToBeRemovedPartitions`, and add only the elements of their difference set.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
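The two-pointer difference described above can be sketched in isolation as follows; the generic element type and the `main` demo are illustrative (the real method operates on `TopicPartition` lists):

```java
import java.util.ArrayList;
import java.util.List;

public class SortedDifferenceSketch {

    // Returns the elements of sortedAll that are not in sortedToRemove, in
    // O(n + m) time, assuming both lists share the same sort order and
    // sortedToRemove is a subset of sortedAll. Repeated ArrayList.remove()
    // calls would cost O(n * m) on the same input.
    static <T> List<T> difference(List<T> sortedAll, List<T> sortedToRemove) {
        List<T> result = new ArrayList<>(sortedAll.size() - sortedToRemove.size());
        int i = 0; // pointer into sortedToRemove
        for (T element : sortedAll) {
            if (i < sortedToRemove.size() && element.equals(sortedToRemove.get(i))) {
                i++;                 // matched: already assigned, advance the pointer
            } else {
                result.add(element); // not matched: still unassigned
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> all = List.of(1, 2, 3, 4, 5, 6);
        List<Integer> assigned = List.of(2, 3, 5);
        System.out.println(difference(all, assigned)); // prints [1, 4, 6]
    }
}
```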
[GitHub] [kafka] wenbingshen commented on a change in pull request #10556: MINOR: Remove redundant code from BrokerApiVersionsCommand
wenbingshen commented on a change in pull request #10556: URL: https://github.com/apache/kafka/pull/10556#discussion_r615572667

## File path: core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala

## @@ -17,32 +17,27 @@
 package kafka.admin
-import java.io.PrintStream
-import java.io.IOException
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
-
-import kafka.utils.{CommandDefaultOptions, CommandLineUtils}
 import kafka.utils.Implicits._
-import kafka.utils.Logging
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Logging}
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
+import org.apache.kafka.clients._
+import org.apache.kafka.common.Node
 import org.apache.kafka.common.config.ConfigDef.ValidString._
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.AuthenticationException
 import org.apache.kafka.common.internals.ClusterResourceListeners
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.Selector
-import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.utils.LogContext
-import org.apache.kafka.common.utils.{KafkaThread, Time}
-import org.apache.kafka.common.Node
-import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersionCollection
-import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, ApiVersionsRequest, ApiVersionsResponse, MetadataRequest, MetadataResponse}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
+import java.io.{IOException, PrintStream}

Review comment: Sorry, my mistake. I understand what you mean now. Thank you for your feedback; I will make the changes and submit them later today. :)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wenbingshen commented on pull request #10551: MINOR:Remove nonsense test line from TopicCommandTest
wenbingshen commented on pull request #10551: URL: https://github.com/apache/kafka/pull/10551#issuecomment-822208422

> @wenbingshen thanks for your patch. LGTM
>
> Could you revise the title of this PR? I'd like to merge it tomorrow.

Of course, I will modify it now. Thank you for your review. :)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #10558: KAFKA-12684: Fix noop set is incorrectly replaced with succeeded set from LeaderElectionCommand
dajac commented on pull request #10558: URL: https://github.com/apache/kafka/pull/10558#issuecomment-822213853

@wenbingshen Thanks for the patch. Should we add a unit test to verify this, or perhaps extend an existing one?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org