Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575753935


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")
+topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024")
+
+createTopic(topic, partitionNum, 1, topicProperties)
+assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent)
+
+// send enough records to trigger log rolling
+(0 until 20).foreach { _ =>
+  TestUtils.generateAndProduceMessages(servers, topic, 10, 1)
+}
+TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1,
+  "timed out waiting for log segment to roll")
+
+// Wait for log segment retention. Override initialTaskDelayMs as 5 seconds.
+// The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough.

Review Comment:
   Fixed it. Thank you.
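
   For readers following this thread: the hunk above relies on shortening the log manager's initial background-task delay so the first retention pass runs within the test timeout. A minimal sketch of the idea in broker-override form, assuming the internal `log.initial.task.delay.ms` config this PR builds on (verify the exact key on the branch under review):

       import java.util.Properties;

       public class RetentionTestSetup {
           // Hypothetical helper showing the override discussed above: the first
           // retention task fires after 5 seconds instead of the default delay.
           static Properties brokerOverrides() {
               Properties props = new Properties();
               props.put("log.initial.task.delay.ms", "5000");       // assumed internal config
               props.put("log.retention.check.interval.ms", "1000"); // standard broker config
               return props;
           }
       }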



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15616:
URL: https://github.com/apache/kafka/pull/15616#discussion_r1575754146


##
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala:
##
@@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest {
 assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
+  @Test
+  def testAlterReplicaLogDirsRequestWithRetention(): Unit = {
+val partitionNum = 1
+
+// Alter replica dir before topic creation
+val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath
+val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap
+val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1)
+
+// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions
+val tp = new TopicPartition(topic, 0)
+assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp))
+assertTrue(servers.head.logManager.getLog(tp).isEmpty)
+
+val topicProperties = new Properties()
+topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024")
+topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1")

Review Comment:
   Yes, I added more comments for it. Thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13459) MM2 should be able to add the source offset to the record header

2024-04-23 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839981#comment-17839981
 ] 

Daniel Urban commented on KAFKA-13459:
--

[~viktorsomogyi]  No plans, and it does require a KIP.

> MM2 should be able to add the source offset to the record header
> 
>
> Key: KAFKA-13459
> URL: https://issues.apache.org/jira/browse/KAFKA-13459
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> MM2 could add the source offset to the record header to help with diagnostics 
> in some use-cases.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]

2024-04-23 Thread via GitHub


lucasbru commented on code in PR #15778:
URL: https://github.com/apache/kafka/pull/15778#discussion_r1575801343


##
tests/kafkatest/tests/client/consumer_test.py:
##
@@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_
 self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces)
 
 num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce
-
-check_condition = num_revokes_after_bounce != 0
+
 # under static membership, the live consumer shall not revoke any current running partitions,
 # since there is no global rebalance being triggered.
 if static_membership:
-check_condition = num_revokes_after_bounce == 0
-
-assert check_condition, \
-"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \
-(num_revokes_after_bounce, check_condition)
+assert num_revokes_after_bounce == 0, \
+"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce
+elif consumer.is_eager():
+assert num_revokes_after_bounce != 0, \

Review Comment:
   Did this test work before for `CooperativeStickyAssignor`, or was this case 
not tested?
   
   Also, is there _any_ way now to detect that without static membership, a 
rebalance happens during the roll? It seems like in the case where 
static_membership == false, and is_eager == false, we are not asserting 
anything now, so I wonder how much value is still there in that combination.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16461: New consumer fails to consume records in security_test.py system test [kafka]

2024-04-23 Thread via GitHub


lucasbru merged PR #15746:
URL: https://github.com/apache/kafka/pull/15746


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix logging of KafkaStreams close [kafka]

2024-04-23 Thread via GitHub


soarez merged PR #15783:
URL: https://github.com/apache/kafka/pull/15783


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15774:
URL: https://github.com/apache/kafka/pull/15774#discussion_r1575855253


##
server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java:
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class QuotaConfigs {
+public static final String NUM_QUOTA_SAMPLES_CONFIG = "quota.window.num";
+public static final String NUM_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for client quotas";
+public static final String NUM_CONTROLLER_QUOTA_SAMPLES_CONFIG = "controller.quota.window.num";
+public static final String NUM_CONTROLLER_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for controller mutation quotas";
+public static final String NUM_REPLICATION_QUOTA_SAMPLES_CONFIG = "replication.quota.window.num";
+public static final String NUM_REPLICATION_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for replication quotas";
+public static final String NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_CONFIG = "alter.log.dirs.replication.quota.window.num";
+public static final String NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for alter log dirs replication quotas";
+
+// Always have 10 whole windows + 1 current window
+public static final int NUM_QUOTA_SAMPLES_DEFAULT = 11;
+
+public static final String QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "quota.window.size.seconds";
+public static final String QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for client quotas";
+public static final String CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "controller.quota.window.size.seconds";
+public static final String CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for controller mutations quotas";
+public static final String REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "replication.quota.window.size.seconds";
+public static final String REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for replication quotas";
+public static final String ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "alter.log.dirs.replication.quota.window.size.seconds";
+public static final String ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for alter log dirs replication quotas";
+public static final int QUOTA_WINDOW_SIZE_SECONDS_DEFAULT = 1;
+
+public static final String CLIENT_QUOTA_CALLBACK_CLASS_CONFIG = "client.quota.callback.class";
+public static final String CLIENT_QUOTA_CALLBACK_CLASS_DOC = "The fully qualified name of a class that implements the ClientQuotaCallback interface, " +
+"which is used to determine quota limits applied to client requests. By default, the <user> and <client-id> " +
+"quotas that are stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal " +
+"of the session and the client-id of the request is applied.";
+
+public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "leader.replication.throttled.replicas";
+public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_DOC = "A list of replicas for which log replication should be throttled on " +
+"the leader side. The list should describe a set of replicas in the form " +
+"[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " +
+"all replicas for this topic.";
+public static final List<String> LEADER_REPLICATION_THROTTLED_REPLICAS_DEFAULT = Collections.emptyList();
+
+public static final String FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "follower.replication.throttled.replicas";
+public static fina

Re: [PR] MINOR: Add junit properties to display parameterized test names [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #14983:
URL: https://github.com/apache/kafka/pull/14983#issuecomment-2071780823

   I have backported this PR to 3.7; see https://github.com/apache/kafka/commit/6e998cffdd33e343945877ccee1fec8337c7d57d


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15777:
URL: https://github.com/apache/kafka/pull/15777#issuecomment-2071783047

   The failed tests are fixed by https://github.com/chia7712/kafka/commit/6e998cffdd33e343945877ccee1fec8337c7d57d


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-23 Thread via GitHub


chia7712 merged PR #15777:
URL: https://github.com/apache/kafka/pull/15777


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15354:
URL: https://github.com/apache/kafka/pull/15354#issuecomment-2071806320

   @jolshan Do you plan to backport this to 3.7? I recently tried to backport other PRs to 3.7 and ran into the same flakiness.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15772:
URL: https://github.com/apache/kafka/pull/15772#issuecomment-2071831061

   > controller.listener.names
   
   Oh, it will be moved to `KRaftConfigs`. Please ignore my previous comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15772:
URL: https://github.com/apache/kafka/pull/15772#issuecomment-2071861237

   Those failed tests pass on my local machine. Will merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]

2024-04-23 Thread via GitHub


chia7712 merged PR #15772:
URL: https://github.com/apache/kafka/pull/15772


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15762:
URL: https://github.com/apache/kafka/pull/15762#issuecomment-2071864618

   Retriggering QA due to an incomplete build. I will merge it once QA completes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-23 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1576005665


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote.quota;
+
+import kafka.server.QuotaType;
+import kafka.server.SensorAccess;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.SimpleRate;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class RLMQuotaManager {
+private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class);
+
+private final RLMQuotaManagerConfig config;
+private final Metrics metrics;
+private final QuotaType quotaType;
+private final String description;
+private final Time time;
+
+private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+private final SensorAccess sensorAccess;
+private Quota quota;
+
+public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) {
+this.config = config;
+this.metrics = metrics;
+this.quotaType = quotaType;
+this.description = description;
+this.time = time;
+
+this.quota = new Quota(config.getQuotaBytesPerSecond(), true);
+this.sensorAccess = new SensorAccess(lock, metrics);
+}
+
+public void updateQuota(Quota newQuota) {
+lock.writeLock().lock();
+try {
+this.quota = newQuota;
+
+Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
+MetricName quotaMetricName = metricName();
+allMetrics.forEach((metricName, metric) -> {
+if (metricName.name().equals(quotaMetricName.name()) && metricName.group().equals(quotaMetricName.group())) {
+Map<String, String> metricTags = metricName.tags();
+LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", metricTags, newQuota);

Review Comment:
   Makes sense, will change this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]

2024-04-23 Thread via GitHub


abhijeetk88 commented on code in PR #15625:
URL: https://github.com/apache/kafka/pull/15625#discussion_r1576007574


##
core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java:
##


Review Comment:
   Yes, the integration of the quota manager will come in follow-up PRs. I mentioned this in the PR description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576009920


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
-
 String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
 

Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576013151


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -171,29 +209,27 @@ private void produceRecord() {
 }
 
 private void withStableConsumerGroup(Runnable body) {
-Consumer<byte[], byte[]> consumer = createConsumer(new Properties());
-try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS);
+try (Consumer<byte[], byte[]> consumer = createConsumer(new Properties());) {

Review Comment:
   please remove `;` 
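
   For context: the trailing semicolon inside a try-with-resources list is legal Java but redundant. The suggested cleanup is simply:

       // createConsumer is the test's own helper; shown here only for shape.
       try (Consumer<byte[], byte[]> consumer = createConsumer(new Properties())) {
           // ... exercise the consumer
       }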



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +238,7 @@ private KafkaProducer<byte[], byte[]> createProducer(Properties config) {
 }
 
 private Consumer<byte[], byte[]> createConsumer(Properties config) {

Review Comment:
   Could you add tests for `GroupProtocol.CONSUMER`? see comment: 
https://github.com/apache/kafka/pull/15766#discussion_r1576009920
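
   A hedged sketch of one way the helper could be parameterized for this; the overload below is illustrative, not the PR's code:

       // Hypothetical overload pinning the group protocol so the same test body
       // can run against both CLASSIC and CONSUMER.
       private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol, Properties config) {
           config.putIfAbsent(ConsumerConfig.GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT));
           return createConsumer(config);
       }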



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -171,29 +209,27 @@ private void produceRecord() {
 }
 
 private void withStableConsumerGroup(Runnable body) {
-Consumer<byte[], byte[]> consumer = createConsumer(new Properties());
-try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS);
+try (Consumer<byte[], byte[]> consumer = createConsumer(new Properties());) {
+consumer.subscribe(Collections.singletonList(TOPIC));
+ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+Assertions.assertNotEquals(0, records.count());
 consumer.commitSync();
 body.run();
-} finally {
-Utils.closeQuietly(consumer, "consumer");
 }
 }
 
 private void withEmptyConsumerGroup(Runnable body) {

Review Comment:
   Can we merge `withStableConsumerGroup` and `withEmptyConsumerGroup`?
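
   A hypothetical shape for such a merged helper, sketched from the hunk above; the `leaveGroup` flag is illustrative:

       private void withConsumerGroup(Runnable body, boolean leaveGroup) {
           try (Consumer<byte[], byte[]> consumer = createConsumer(new Properties())) {
               consumer.subscribe(Collections.singletonList(TOPIC));
               ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
               Assertions.assertNotEquals(0, records.count());
               consumer.commitSync();
               if (leaveGroup) {
                   consumer.unsubscribe(); // group becomes empty before the body runs
               }
               body.run();
           }
       }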



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576056664


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) {
 return this;
 }
 
-public BrokerNode build(
-String baseDirectory,
-Uuid clusterId,
-boolean combined
-) {
+public Builder setClusterId(Uuid clusterId) {
+this.clusterId = clusterId;
+return this;
+}
+
+public Builder setBaseDirectory(String baseDirectory) {
+this.baseDirectory = baseDirectory;
+return this;
+}
+
+public Builder setCombined(boolean combined) {
+this.combined = combined;
+return this;
+}
+
+public Builder setPropertyOverrides(Map<String, String> propertyOverrides) {
+this.propertyOverrides = Collections.unmodifiableMap(propertyOverrides);

Review Comment:
   Thanks, I'll address this in other builders also.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison opened a new pull request, #15786:
URL: https://github.com/apache/kafka/pull/15786

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16604:
-

 Summary: Deprecate ConfigDef.ConfigKey constructor from public APIs
 Key: KAFKA-16604
 URL: https://issues.apache.org/jira/browse/KAFKA-16604
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao


Currently, one can create a ConfigKey either by invoking the public constructor 
directly and passing it to a ConfigDef object, or by invoking one of the define 
methods. The two ways can get confusing at times. Moreover, this could lead to 
errors, as was noticed in KAFKA-16592.

We should ideally expose only one way to users, which IMO should be creating the 
objects only through the exposed define methods. This ticket is about marking the 
public constructor of ConfigKey as Deprecated first and then making it private 
eventually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]

2024-04-23 Thread via GitHub


vamossagar12 commented on PR #15762:
URL: https://github.com/apache/kafka/pull/15762#issuecomment-2072021300

   @chia7712 , I created this ticket: 
[KAFKA-16604](https://issues.apache.org/jira/browse/KAFKA-16604) for tracking 
the KIP


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576082586


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +238,7 @@ private KafkaProducer<byte[], byte[]> createProducer(Properties config) {
 }
 
 private Consumer<byte[], byte[]> createConsumer(Properties config) {

Review Comment:
   Can we do it in another PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-23 Thread via GitHub


dajac commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1576085044


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogeneous if all the members of the group
+ * are subscribed to the same set of topics; it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are assigned partitions
+ * for the first time (full), or incrementally when a rebalance is triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List<Uuid> allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map<Uuid, TopicMetadata> topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTopicM

Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072051627

   > @FrankYang0529 sorry, I checked the PR again and left more comments. PTAL
   
   Hi @chia7712, I addressed the latest comments. I think we can add the new test case in a follow-up PR and keep this PR focused on migrating to ClusterTestExtensions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]

2024-04-23 Thread via GitHub


dajac commented on code in PR #15721:
URL: https://github.com/apache/kafka/pull/15721#discussion_r1576113492


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1422,10 +1499,16 @@ private CoordinatorResult consumerGr
 ) throws ApiException {
 ConsumerGroup group = consumerGroup(groupId);
 List<Record> records;
+CompletableFuture<Void> appendFuture = null;
 if (instanceId == null) {
 ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
 log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId);
-records = consumerGroupFenceMember(group, member);
+if (validateOnlineDowngrade(group, memberId)) {
+records = new ArrayList<>();
+appendFuture = convertToClassicGroup(group, memberId, records);
+} else {
+records = consumerGroupFenceMember(group, member);
+}

Review Comment:
   Having this duplicated code in multiple places is a tad annoying, don't you 
think? I wonder if we could push it down into `consumerGroupFenceMember` as it 
seems that the downgrade must be checked whenever `consumerGroupFenceMember` is 
called. `consumerGroupFenceMember` could return a future or null and take the 
records as an argument.
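
   A rough sketch of the suggested shape, under the assumption that the helpers shown in the hunk keep their signatures (`fenceRecordsFor` is a hypothetical stand-in for the existing record-building logic):

       private CompletableFuture<Void> consumerGroupFenceMember(
           ConsumerGroup group,
           ConsumerGroupMember member,
           List<Record> records
       ) {
           if (validateOnlineDowngrade(group, member.memberId())) {
               // Downgrade path: convert the group and hand back the append future.
               return convertToClassicGroup(group, member.memberId(), records);
           }
           records.addAll(fenceRecordsFor(group, member)); // hypothetical helper
           return null; // no downgrade; the caller appends the records as usual
       }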



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on PR #15761:
URL: https://github.com/apache/kafka/pull/15761#issuecomment-2072088191

   > @brandboat Sorry that I left more comments below. Please take a look, thanks.
   
   Thank you for your patience! I appreciate your input. I have addressed all the comments above. Please take another look when you are available :smiley:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]

2024-04-23 Thread via GitHub


TaiJuWu commented on PR #15782:
URL: https://github.com/apache/kafka/pull/15782#issuecomment-2072107254

   > Thank you for the changes.
   > 
   > Is there a reason to prefer the `java.lang.` prefix?
   
   There is no special reason; removing the prefix is fine.
   I removed it to follow the rest of the codebase. Thanks for the reminder.
   
   > 
   > There are other uses of `System.getProperty("line.separator")`, should 
those be changed too?
   > 
   > * clients/src/main/java/org/apache/kafka/common/utils/Shell.java
   > * clients/src/main/java/org/apache/kafka/common/utils/Utils.java
   > * tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java
   > * core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
   > * 
connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java
   
   Those have already been removed on the newest trunk branch.
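
   For reference, a self-contained comparison of the two forms (both return the platform separator; `lineSeparator()` avoids the string-key lookup):

       public class LineSeparatorDemo {
           public static void main(String[] args) {
               String viaProperty = System.getProperty("line.separator");
               String viaMethod = System.lineSeparator();
               System.out.println(viaProperty.equals(viaMethod)); // prints: true
           }
       }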


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840075#comment-17840075
 ] 

Chia-Ping Tsai commented on KAFKA-16604:


[~sagarrao] Thanks for opening this Jira. Do you have free cycles to take it over?

> Deprecate ConfigDef.ConfigKey constructor from public APIs
> --
>
> Key: KAFKA-16604
> URL: https://issues.apache.org/jira/browse/KAFKA-16604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sagar Rao
>Priority: Major
>
> Currently, one can create a ConfigKey either by invoking the public constructor 
> directly and passing it to a ConfigDef object, or by invoking one of the define 
> methods. The two ways can get confusing at times. Moreover, this could lead to 
> errors, as was noticed in KAFKA-16592.
> We should ideally expose only one way to users, which IMO should be creating the 
> objects only through the exposed define methods. This ticket is about marking the 
> public constructor of ConfigKey as Deprecated first and then making it private 
> eventually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072132702

   > I think we can add the new test case in a follow-up PR and keep this PR focused on migrating to ClusterTestExtensions.
   
   Please take a look at https://github.com/apache/kafka/pull/15766#discussion_r1576009920; it needs a bit of refactoring, but it should not be hard work. So please try to address it in this PR. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15774:
URL: https://github.com/apache/kafka/pull/15774#discussion_r1576158733


##
core/src/main/scala/kafka/server/DynamicConfig.scala:
##
@@ -36,30 +35,12 @@ import scala.jdk.CollectionConverters._
 object DynamicConfig {

Review Comment:
   It needs to be moved anyway when we move the `KafkaConfig` class, so it's on my 
   roadmap for KAFKA-15853. The Jira for moving KafkaConfig out of core is one of the 
   most complicated ones, as many pieces will need to move to support it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576155040


##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -167,11 +186,11 @@ private TestKitNodes(
 NavigableMap<Integer, ControllerNode> controllerNodes,
 NavigableMap<Integer, BrokerNode> brokerNodes
 ) {
-this.baseDirectory = baseDirectory;
-this.clusterId = clusterId;
-this.bootstrapMetadata = bootstrapMetadata;
-this.controllerNodes = controllerNodes;
-this.brokerNodes = brokerNodes;
+this.baseDirectory = Objects.requireNonNull(baseDirectory);
+this.clusterId = Objects.requireNonNull(clusterId);
+this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata);
+this.controllerNodes = Objects.requireNonNull(controllerNodes);

Review Comment:
   Please follow the rule of this PR: be an immutable object.
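
   One common pattern for that rule, as a sketch rather than the PR's exact code:

       // Defensive copy plus an unmodifiable view, so callers cannot mutate the field.
       this.controllerNodes = Collections.unmodifiableNavigableMap(
           new TreeMap<>(Objects.requireNonNull(controllerNodes)));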



##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map<String, String> propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map<String, String> propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;
+this.incarnationId = Objects.requireNonNull(incarnationId);

Review Comment:
   this is not used so it is fine to remove it



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -97,9 +107,12 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
 if (!brokerNodeBuilders.isEmpty()) {
 nextId = brokerNodeBuilders.lastKey() + 1;
 }
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()

Review Comment:
   Can we evaluate those settings in the `build` method? That would simplify the code, since we don't need to revert the changes (for example, the number of brokers).



##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map<String, String> propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map<String, String> propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;

Review Comment:
   `logDataDirectories` should return an immutable collection



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on PR #15773:
URL: https://github.com/apache/kafka/pull/15773#issuecomment-2072172997

   > > Yeah, I tried to use a unit test. However, I didn't find a good way to check 
whether the kafka-delete-logs task has run or not. I also tried to check whether 
the scheduler can be stopped. However, the mock scheduler can still add new tasks 
during scheduler shutdown.
   > 
   > Could we use `KafkaScheduler` instead?
   
   Yeah, updated to use `KafkaScheduler`. Thank you.
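
   A rough sketch of what such a test can look like, assuming the `org.apache.kafka.server.util.KafkaScheduler` API on trunk and a test method that declares `throws Exception`; the task name mirrors the kafka-delete-logs task discussed above:

       KafkaScheduler scheduler = new KafkaScheduler(1);
       scheduler.startup();
       AtomicBoolean deleteLogsRan = new AtomicBoolean(false);
       scheduler.scheduleOnce("kafka-delete-logs", () -> deleteLogsRan.set(true));
       TestUtils.waitForCondition(deleteLogsRan::get, "delete-logs task never ran");
       scheduler.shutdown(); // the regression under test: shutdown must not hang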


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-23 Thread via GitHub


kamalcph commented on code in PR #15748:
URL: https://github.com/apache/kafka/pull/15748#discussion_r1576175394


##
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##
@@ -121,9 +121,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati
 // Set the last modified time to an old value to force deletion of old segments
 val endOffset = log.logEndOffset
 log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs))
-TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset,
+TestUtils.waitUntilTrue(() => 1 == log.numberOfSegments,
   "Timed out waiting for deletion of old segments")
-assertEquals(1, log.numberOfSegments)
+assertEquals(log.logStartOffset, endOffset)

Review Comment:
   The test has been flaky since the PR landed in trunk: 
   
   
https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FBerlin&tests.container=kafka.log.LogCleanerParameterizedIntegrationTest
   
   We have to raise a similar fix in trunk.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16605) Fix the flaky LogCleanerParameterizedIntegrationTest

2024-04-23 Thread Kamal Chandraprakash (Jira)
Kamal Chandraprakash created KAFKA-16605:


 Summary: Fix the flaky LogCleanerParameterizedIntegrationTest
 Key: KAFKA-16605
 URL: https://issues.apache.org/jira/browse/KAFKA-16605
 Project: Kafka
  Issue Type: Task
Reporter: Kamal Chandraprakash
Assignee: Kamal Chandraprakash






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16605) Fix the flaky LogCleanerParameterizedIntegrationTest

2024-04-23 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash updated KAFKA-16605:
-
Description: 
https://ge.apache.org/scans/tests?search.relativeStartTime=P7D&search.rootProjectNames=kafka&search.timeZoneId=Asia%2FCalcutta&tests.container=kafka.log.LogCleanerParameterizedIntegrationTest&tests.test=testCleansCombinedCompactAndDeleteTopic(CompressionType)%5B1%5D

> Fix the flaky LogCleanerParameterizedIntegrationTest
> 
>
> Key: KAFKA-16605
> URL: https://issues.apache.org/jira/browse/KAFKA-16605
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Major
>
> https://ge.apache.org/scans/tests?search.relativeStartTime=P7D&search.rootProjectNames=kafka&search.timeZoneId=Asia%2FCalcutta&tests.container=kafka.log.LogCleanerParameterizedIntegrationTest&tests.test=testCleansCombinedCompactAndDeleteTopic(CompressionType)%5B1%5D



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15774:
URL: https://github.com/apache/kafka/pull/15774#discussion_r1576188830


##
server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java:
##
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class QuotaConfigs {
+public static final String NUM_QUOTA_SAMPLES_CONFIG = "quota.window.num";
+public static final String NUM_QUOTA_SAMPLES_DOC = "The number of samples 
to retain in memory for client quotas";
+public static final String NUM_CONTROLLER_QUOTA_SAMPLES_CONFIG = 
"controller.quota.window.num";
+public static final String NUM_CONTROLLER_QUOTA_SAMPLES_DOC = "The number 
of samples to retain in memory for controller mutation quotas";
+public static final String NUM_REPLICATION_QUOTA_SAMPLES_CONFIG = 
"replication.quota.window.num";
+public static final String NUM_REPLICATION_QUOTA_SAMPLES_DOC = "The number 
of samples to retain in memory for replication quotas";
+public static final String 
NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_CONFIG = 
"alter.log.dirs.replication.quota.window.num";
+public static final String 
NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_DOC = "The number of samples to 
retain in memory for alter log dirs replication quotas";
+
+// Always have 10 whole windows + 1 current window
+public static final int NUM_QUOTA_SAMPLES_DEFAULT = 11;
+
+public static final String QUOTA_WINDOW_SIZE_SECONDS_CONFIG = 
"quota.window.size.seconds";
+public static final String QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span 
of each sample for client quotas";
+public static final String CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = 
"controller.quota.window.size.seconds";
+public static final String CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The 
time span of each sample for controller mutations quotas";
+public static final String REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = 
"replication.quota.window.size.seconds";
+public static final String REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC = 
"The time span of each sample for replication quotas";
+public static final String 
ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = 
"alter.log.dirs.replication.quota.window.size.seconds";
+public static final String 
ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of 
each sample for alter log dirs replication quotas";
+public static final int QUOTA_WINDOW_SIZE_SECONDS_DEFAULT = 1;
+
+public static final String CLIENT_QUOTA_CALLBACK_CLASS_CONFIG = 
"client.quota.callback.class";
+public static final String CLIENT_QUOTA_CALLBACK_CLASS_DOC = "The fully 
qualified name of a class that implements the ClientQuotaCallback interface, " +
+"which is used to determine quota limits applied to client 
requests. By default, the <user> and <client-id> " +
+"quotas that are stored in ZooKeeper are applied. For any given 
request, the most specific quota that matches the user principal " +
+"of the session and the client-id of the request is applied.";
+
+public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG = 
"leader.replication.throttled.replicas";
+public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_DOC = "A 
list of replicas for which log replication should be throttled on " +
+"the leader side. The list should describe a set of replicas in 
the form " +
+"[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or 
alternatively the wildcard '*' can be used to throttle " +
+"all replicas for this topic.";
+public static final List<String> 
LEADER_REPLICATION_THROTTLED_REPLICAS_DEFAULT = Collections.emptyList();
+
+public static final String FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG 
= "follower.replication.throttled.replicas";
+public static final

[PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]

2024-04-23 Thread via GitHub


kamalcph opened a new pull request, #15787:
URL: https://github.com/apache/kafka/pull/15787

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]

2024-04-23 Thread via GitHub


kamalcph commented on code in PR #15748:
URL: https://github.com/apache/kafka/pull/15748#discussion_r1576190364


##
core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala:
##
@@ -121,9 +121,9 @@ class LogCleanerParameterizedIntegrationTest extends 
AbstractLogCleanerIntegrati
 // Set the last modified time to an old value to force deletion of old 
segments
 val endOffset = log.logEndOffset
 log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * 
retentionMs))
-TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset,
+TestUtils.waitUntilTrue(() => 1 == log.numberOfSegments,
   "Timed out waiting for deletion of old segments")
-assertEquals(1, log.numberOfSegments)
+assertEquals(log.logStartOffset, endOffset)

Review Comment:
   Raised PR #15787 to fix it in trunk.
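
   The reordering matters because segment deletion is asynchronous: the test
   should poll the eventually-true condition (`numberOfSegments == 1`) and only
   then assert the derived state (`logStartOffset == endOffset`). For context,
   a minimal sketch of what a `waitUntilTrue`-style polling helper does,
   assuming only the standard library (illustrative, not Kafka's actual
   `TestUtils`):

   ```scala
   object WaitUntilTrueSketch {
     // Poll the condition until it holds or the deadline passes.
     def waitUntilTrue(condition: () => Boolean, msg: String, timeoutMs: Long = 15000L): Unit = {
       val deadline = System.currentTimeMillis() + timeoutMs
       while (!condition()) {
         if (System.currentTimeMillis() > deadline)
           throw new AssertionError(msg) // give up once the deadline passes
         Thread.sleep(100)               // back off briefly between checks
       }
     }
   }
   ```

   With a helper like this, the asynchronous quantity belongs inside the polled
   condition, and anything that is stable once the condition holds can be a
   plain assertion afterwards, which is exactly what the diff above does.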



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576193128


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:
##
@@ -209,7 +209,7 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 
   class JoinGroupOperation extends GroupOperation[JoinGroupCallbackParams, 
JoinGroupCallback] {
 override def responseCallback(responsePromise: 
Promise[JoinGroupCallbackParams]): JoinGroupCallback = {
-  val callback: JoinGroupCallback = responsePromise.success(_)
+  val callback: JoinGroupCallback = responsePromise.success

Review Comment:
   This causes a build error when using Scala 2.12:
   ```
   [Error] 
/home/chia7712/project/kafka/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:212:57:
 method with dependent type (value: 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallbackParams)responsePromise.type
 cannot be converted to function value
   ```
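
   The root cause: `Promise.success` returns `this.type`, and Scala 2.12
   cannot eta-expand a method with a dependent result type into a function
   value; Scala 2.13 relaxed this. A standalone sketch of the difference, with
   `Promise[String]` standing in for the test's callback types:

   ```scala
   import scala.concurrent.Promise

   object EtaExpansionRepro extends App {
     val p: Promise[String] = Promise[String]()

     // Compiles on both 2.12 and 2.13: the placeholder builds the function
     // value explicitly.
     val ok: String => Unit = p.success(_)

     // Fails on 2.12 with "method with dependent type ... cannot be
     // converted to function value", because success returns p.type:
     // val broken: String => Unit = p.success

     ok("done")
   }
   ```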



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072251579

   > > I think we can add a new test case in the next PR. We can focus more on 
migrating to ClusterTestExtensions in this PR.
   > 
   > Please take a look at [#15766 
(comment)](https://github.com/apache/kafka/pull/15766#discussion_r1576009920); 
we need a bit of refactoring, but it should not be hard work, so please try to 
address it in this PR.
   
   Updated it. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576233041


##
core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:
##
@@ -209,7 +209,7 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 
   class JoinGroupOperation extends GroupOperation[JoinGroupCallbackParams, 
JoinGroupCallback] {
 override def responseCallback(responsePromise: 
Promise[JoinGroupCallbackParams]): JoinGroupCallback = {
-  val callback: JoinGroupCallback = responsePromise.success(_)
+  val callback: JoinGroupCallback = responsePromise.success

Review Comment:
   Yup, fixed!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576262016


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -170,30 +227,23 @@ private void produceRecord() {
 }
 }
 
-private void withStableConsumerGroup(Runnable body) {
-Consumer<byte[], byte[]> consumer = createConsumer(new Properties());
-try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
+private void withConsumerGroup(Runnable body, boolean isStable, Properties 
consumerProperties) {
+try (Consumer<byte[], byte[]> consumer = 
createConsumer(consumerProperties)) {
+consumer.subscribe(Collections.singletonList(TOPIC));
+ConsumerRecords<byte[], byte[]> records = 
consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS));
+Assertions.assertNotEquals(0, records.count());
 consumer.commitSync();
-body.run();
-} finally {
-Utils.closeQuietly(consumer, "consumer");
+if (isStable) {
+body.run();
+}
 }
-}
-
-private void withEmptyConsumerGroup(Runnable body) {
-Consumer<byte[], byte[]> consumer = createConsumer(new Properties());
-try {
-TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, 
DEFAULT_MAX_WAIT_MS);
-consumer.commitSync();
-} finally {
-Utils.closeQuietly(consumer, "consumer");
+if (!isStable) {
+body.run();
 }
-body.run();
 }
 
 private KafkaProducer<byte[], byte[]> createProducer(Properties config) {

Review Comment:
   It seems `config` is always empty, so please remove it.



##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,109 +56,152 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
-String[] getArgs(String group, String topic) {
-return new String[] {
-"--bootstrap-server", bootstrapServers(listenerName()),
-"--delete-offsets",
-"--group", group,
-"--topic", topic
-};
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+private final ClusterInstance clusterInstance;
+private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+public static final String TOPIC = "foo";
+public static final String GROUP = "test.group";
+
+DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
+this.clusterInstance = clusterInstance;
+}
+
+@AfterEach
+public void tearDown() {
+if (consumerGroupService != null) {
+consumerGroupService.close();
+}
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteOffsetsNonExistingGroup(String quorum) {
+@ClusterTest
+public void testDeleteOffsetsNonExistingGroup() {
 String group = "missing.group";
 String topic = "foo:1";
-ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(getArgs(group, topic));
+setupConsumerGroupService(getArgs(group, topic));
 
-Entry<Errors, Map<TopicPartition, Throwable>> res = 
service.deleteOffsets(group, Collections.singletonList(topic));
+Entry<Errors, Map<TopicPartition, Throwable>> res = 
consumerGroupService.deleteOffsets(group, Collections.singletonList(topic));
 assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey());
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void 
testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) {
-testWithStableConsumerGroup(TOPIC, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
+@ClusterTest
+public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
+createTopic(TOPIC);
+Properties consumerProperties = new Properties();
+testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, 
true, consumerProperties);
+if (clusterInstance.isKRaftTest()) {
+consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+testWithConsumerGroup(TOPIC, 0, 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties);
+}
 }
 
-@ParameterizedTest
-   

Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15774:
URL: https://github.com/apache/kafka/pull/15774#discussion_r1576247775


##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -98,23 +99,18 @@ public class ReassignPartitionsCommand {
 static final int EARLIEST_TOPICS_JSON_VERSION = 1;
 
 // Throttles that are set at the level of an individual broker.
-//DynamicConfig.Broker.LeaderReplicationThrottledRateProp
-static final String BROKER_LEVEL_LEADER_THROTTLE = 
"leader.replication.throttled.rate";
-//DynamicConfig.Broker.FollowerReplicationThrottledRateProp
-static final String BROKER_LEVEL_FOLLOWER_THROTTLE = 
"follower.replication.throttled.rate";
-//DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp
-static final String BROKER_LEVEL_LOG_DIR_THROTTLE = 
"replica.alter.log.dirs.io.max.bytes.per.second";
+static final String BROKER_LEVEL_LEADER_THROTTLE = 
QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG;

Review Comment:
   Do we need to keep those fields, or could we directly use QuotaConfigs?



##
checkstyle/import-control-metadata.xml:
##
@@ -123,6 +123,7 @@
 
 
 
+<allow pkg="org.apache.kafka.server.config" />

Review Comment:
   Nit: The list is roughly ordered, can we put 
`org.apache.kafka.server.config` above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-10496: Removed relying on external DNS servers in tests [kafka]

2024-04-23 Thread via GitHub


ijuma commented on PR #9315:
URL: https://github.com/apache/kafka/pull/9315#issuecomment-2072363189

   Fyi, Java 18+ do have a SPI for this:
   https://bugs.openjdk.org/browse/JDK-8263693


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15774:
URL: https://github.com/apache/kafka/pull/15774#discussion_r1576308101


##
tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java:
##
@@ -98,23 +99,18 @@ public class ReassignPartitionsCommand {
 static final int EARLIEST_TOPICS_JSON_VERSION = 1;
 
 // Throttles that are set at the level of an individual broker.
-//DynamicConfig.Broker.LeaderReplicationThrottledRateProp
-static final String BROKER_LEVEL_LEADER_THROTTLE = 
"leader.replication.throttled.rate";
-//DynamicConfig.Broker.FollowerReplicationThrottledRateProp
-static final String BROKER_LEVEL_FOLLOWER_THROTTLE = 
"follower.replication.throttled.rate";
-//DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp
-static final String BROKER_LEVEL_LOG_DIR_THROTTLE = 
"replica.alter.log.dirs.io.max.bytes.per.second";
+static final String BROKER_LEVEL_LEADER_THROTTLE = 
QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG;

Review Comment:
   Good catch, fixed this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15774:
URL: https://github.com/apache/kafka/pull/15774#discussion_r1576308464


##
checkstyle/import-control-metadata.xml:
##
@@ -123,6 +123,7 @@
 
 
 
+<allow pkg="org.apache.kafka.server.config" />

Review Comment:
   Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]

2024-04-23 Thread via GitHub


dajac commented on code in PR #15785:
URL: https://github.com/apache/kafka/pull/15785#discussion_r1576317772


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##
@@ -918,40 +935,53 @@ private void 
maybeUpdateClassicProtocolMembersSupportedProtocols(
 
 /**
  * Updates the subscribed topic names count.
+ * Changes to the subscription model as a consequence
+ * of this update are reflected as well.
  *
  * @param oldMember The old member.
  * @param newMember The new member.
  */
-private void maybeUpdateSubscribedTopicNames(
+private void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(
 ConsumerGroupMember oldMember,
 ConsumerGroupMember newMember
 ) {
-maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, 
newMember);
+
maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(subscribedTopicNames, 
oldMember, newMember);
 }
 
 /**
- * Updates the subscription count.
+ * Updates the subscription count and the subscription model, if required.
  *
  * @param subscribedTopicCount  The map to update.
  * @param oldMember The old member.
  * @param newMember The new member.
  */
-private static void maybeUpdateSubscribedTopicNames(
+private void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(

Review Comment:
   Should we add a unit test to validate this logic?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576329465


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -959,7 +960,7 @@ class ZkAdminManager(val config: KafkaConfig,
 } else if (requestStatus.mechanism == Some(ScramMechanism.UNKNOWN)) {
   (requestStatus.user, unknownScramMechanismMsg)

Review Comment:
   The condition above should use `.contains` instead of `== Some`.
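
   A small self-contained sketch of the suggestion, with `String` standing in
   for the actual `ScramMechanism` type:

   ```scala
   object ContainsExample extends App {
     val mechanism: Option[String] = Some("UNKNOWN")

     // Comparing against a freshly built Some works, but allocates a wrapper
     // just for the comparison.
     val viaEquals = mechanism == Some("UNKNOWN")

     // contains asks the Option directly whether it holds the value.
     val viaContains = mechanism.contains("UNKNOWN")

     assert(viaEquals && viaContains) // both true here
   }
   ```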



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576332250


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   The braces here are redundant; not sure if we want to clean this up as well 
or not.
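
   For illustration, a tiny standalone example of the redundancy (the
   `riskyCall` below is hypothetical, not from the PR); the body of a `case`
   clause is already a block, so the braces add nothing:

   ```scala
   object BracesExample extends App {
     def riskyCall(): Unit = throw new RuntimeException("boom")

     // With the redundant braces, as in the code under review:
     try riskyCall()
     catch {
       case e: Exception => {
         println(s"failed: ${e.getMessage}")
       }
     }

     // Equivalent idiomatic form; the case arrow already opens a block:
     try riskyCall()
     catch {
       case e: Exception =>
         println(s"failed: ${e.getMessage}")
     }
   }
   ```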



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on PR #15679:
URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072421039

   > @FrankYang0529 thanks for updated PR. please take a look at two comments.
   
   Hi @chia7712, thanks for the review. Updated it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576333034


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   We have this in a few places in this class as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576338028


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -144,7 +144,7 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =

Review Comment:
   This is not used as far as I can see, do we still need it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576343801


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -300,7 +300,7 @@ class ReplicaManager(val config: KafkaConfig,
   protected val allPartitions = new Pool[TopicPartition, HostedPartition](

Review Comment:
   Should we declare types for the protected (and the other public) variables 
above?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Jakub Scholz (Jira)
Jakub Scholz created KAFKA-16606:


 Summary: JBOD support in KRaft does not seem to be gated by the 
metadata version
 Key: KAFKA-16606
 URL: https://issues.apache.org/jira/browse/KAFKA-16606
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
Reporter: Jakub Scholz


JBOD in KRaft is supposed to be supported since Kafka 3.7.0. The Kafka [source 
code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
 suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
However, it seems to be possible to run a KRaft cluster with JBOD even with older 
metadata versions such as {{{}3.6{}}}. For example, I have a cluster using the 
{{3.6}} metadata version:
{code:java}
bin/kafka-features.sh --bootstrap-server localhost:9092 describe
Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
{code}
Yet a KRaft cluster with JBOD seems to run fine:
{code:java}
bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
Querying brokers for log directories information
Received log directory information from brokers 2000,3000,1000
{"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-7","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-40","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-3","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-36","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-47","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-14","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-43","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-1","size":114240,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-10","size":0,"offsetLag":0,"isFuture":f
alse},{"partition":"__consumer_offsets-22","size":0,"offsetLag":0,"isF

Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576354443


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   Ouch, >500 is too much. Well, they will get cleaned up as we move more 
Scala to Java :D



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on PR #15786:
URL: https://github.com/apache/kafka/pull/15786#issuecomment-2072445566

   We also have a couple of out-of-date parameters in the Javadocs; we can 
either fix them here or have another follow-up PR:
   - 
https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L150
 
   - 
https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L270


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576351984


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   Yeah, in Scala braces are not required around multi-line blocks. I've not 
made this change because braces are required in Java and we have the braces in 
Scala all over the code base. Changing this is probably a >500-line diff.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576358398


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -300,7 +300,7 @@ class ReplicaManager(val config: KafkaConfig,
   protected val allPartitions = new Pool[TopicPartition, HostedPartition](

Review Comment:
   Ideally we should declare types for all public and protected fields, but 
this is a huge change.
   
   Also, while it's useful in some cases, in many I find it's not adding much 
value. In this specific example I even find it annoying, as you get:
   ```
   protected val allPartitions: Pool[TopicPartition, HostedPartition] = new 
Pool[TopicPartition, HostedPartition](
   ```
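
   One possible middle ground, offered here purely as a sketch (it is not
   suggested in the PR): a type alias keeps the field annotated without
   repeating the long generic type on both sides. `TrieMap[String, Int]`
   stands in for Kafka's `Pool[TopicPartition, HostedPartition]`:

   ```scala
   import scala.collection.concurrent.TrieMap

   object AliasExample {
     // The alias carries the long generic type once.
     type PartitionPool = TrieMap[String, Int]

     // The declaration stays explicitly typed yet readable.
     val allPartitions: PartitionPool = TrieMap.empty
   }
   ```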



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576360092


##
core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala:
##
@@ -52,8 +51,8 @@ object ZooKeeperClient {
  * @param sessionTimeoutMs session timeout in milliseconds
  * @param connectionTimeoutMs connection timeout in milliseconds
  * @param maxInFlightRequests maximum number of unacknowledged requests the 
client will send before blocking.
+ * @param clientConfig ZooKeeper client configuration, for TLS configs if 
desired

Review Comment:
   Nice, I believe we might need to update the Javadoc for 
`registerStateChangeHandler` and `unregisterStateChangeHandler` in the same 
file as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576363327


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   That's just my guesstimate, but I expect it to be large. I kind of focused 
on the low-hanging fruit here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576366593


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig,
 users.get.filterNot(usersToSkip.contains).foreach { user =>
   try {
 val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, 
Sanitizer.sanitize(user))
-addToResultsIfHasScramCredential(user, userConfigs, true)
+addToResultsIfHasScramCredential(user, userConfigs, explicitUser = 
true)
   } catch {
 case e: Exception => {

Review Comment:
   Makes sense, don't worry about it. We are moving more and more from Scala 
to Java anyway, so these will get resolved over time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840118#comment-17840118
 ] 

Mickael Maison commented on KAFKA-16606:


cc [~soarez] 

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD in KRaft is supposed to be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run a KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-7","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-40","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-3","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-36","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-47","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-14","size":0,"

Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


OmniaGM commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576375031


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -2192,8 +2192,8 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   @ValueSource(strings = Array("zk", "kraft"))
   def testLongTopicNames(quorum: String): Unit = {
 val client = Admin.create(createConfig)
-val longTopicName = String.join("", Collections.nCopies(249, "x"));
-val invalidTopicName = String.join("", Collections.nCopies(250, "x"));
+val longTopicName = String.join("", Collections.nCopies(249, "x"))

Review Comment:
   Small Scala suggestion here (which feel free to ignore): we can use 
`List.fill(249)("x").mkString("")` instead of Java's `String.join` and 
`Collections.nCopies`.
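
   For comparison, a standalone sketch of the two forms; for a single repeated
   character, `"x" * 249` via `StringOps` is shorter still:

   ```scala
   import java.util.Collections

   object FillExample extends App {
     val viaJava  = String.join("", Collections.nCopies(249, "x"))
     val viaScala = List.fill(249)("x").mkString // mkString("") is equivalent
     val viaOps   = "x" * 249                    // StringOps repetition

     assert(viaJava == viaScala && viaScala == viaOps && viaOps.length == 249)
   }
   ```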



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]

2024-04-23 Thread via GitHub


KevinZTW commented on code in PR #15714:
URL: https://github.com/apache/kafka/pull/15714#discussion_r1576372796


##
storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java:
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentTopicIdException;
+
+import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class PartitionMetadataFileTest {
+private final File dir = TestUtils.tempDirectory();
+private final File file = PartitionMetadataFile.newFile(dir);
+
+@Test
+public void testSetRecordWithDifferentTopicId() {
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+partitionMetadataFile.record(topicId);
+Uuid differentTopicId = Uuid.randomUuid();
+assertThrows(InconsistentTopicIdException.class, () -> 
partitionMetadataFile.record(differentTopicId));
+}
+
+@Test
+public void testSetRecordWithSameTopicId() {
+PartitionMetadataFile partitionMetadataFile = new 
PartitionMetadataFile(file, null);
+Uuid topicId = Uuid.randomUuid();
+partitionMetadataFile.record(topicId);
+assertDoesNotThrow(() -> partitionMetadataFile.record(topicId));

Review Comment:
   Sorry, I missed this one. Just updated the PR as suggested!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576385492


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
  

Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576386813


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +256,7 @@ private KafkaProducer 
createProducer(Properties config) {
 }
 
 private Consumer<byte[], byte[]> createConsumer(Properties config) {

Review Comment:
   We can change the type from `Properties` to `Map`. With that 
change we don't need to create a lot of `Properties` in each test case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]

2024-04-23 Thread via GitHub


nikramakrishnan commented on PR #15241:
URL: https://github.com/apache/kafka/pull/15241#issuecomment-2072530250

   Bump! @satishd @kamalcph can we get this review going? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576392526


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
 

[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840125#comment-17840125
 ] 

Igor Soarez commented on KAFKA-16606:
-

Thanks for bringing this to my attention [~mimaison].

Hi [~scholzj], thanks for pointing this out. I think there's some confusion 
here with a JBOD configuration being +allowed+ vs being +supported+ in KRaft.

In terms of just reading and writing data to multiple log directories, as long 
as those directories are always available, there's nothing special about KRaft 
that would require changes compared with ZK mode. What is enabled with 3.7 is 
the handling of failed log directories. Without it, you'll find that partitions 
don't get new leaders elected (becoming indefinitely unavailable) if the log 
directory for the leader replica fails but the broker stays alive.

If a single directory is configured and it becomes unavailable, the broker shuts 
down, as there is no point in continuing to run without access to storage. When 
it shuts down, the controller becomes aware of that (via an ephemeral znode in 
ZK mode, or via missing heartbeats in KRaft) and it will re-elect leaders for 
partitions that were led by the broker. When multiple directories are 
configured, it is critical to have a separate mechanism to let the controller 
know there is a partial failure: the broker is still alive and operational on 
the remaining log dirs, but any partitions on the directory that failed need a 
leadership and ISR update.

In ZK mode that was handled by notifying the controller via a znode, so an 
alternative solution was required for KRaft. You can find the details in 
[KIP-858|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft].
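
To make the distinction concrete, here is a hypothetical sketch (the paths are 
made up): a broker that uses JBOD simply lists multiple log directories, while 
the failed-directory handling from KIP-858 only takes effect once the cluster's 
metadata version is at least 3.7-IV2:
{code:java}
# Hypothetical broker configuration with two log directories (JBOD)
log.dirs=/var/kafka/data-1,/var/kafka/data-2

# Enabling the KIP-858 failed-directory handling by upgrading the
# metadata version (assumes all brokers already run 3.7.0 binaries):
bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --metadata 3.7
{code}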

Let me know if that makes sense.

 

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


lianetm commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576409478


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,448 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteWithTopicOption(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = 
"1"),
+@ClusterConfigProperty(key = "offsets.topic.replication.factor", value 
= "1"),
+})
+public class DeleteConsumerGroupsTest {
+private final ClusterInstance cluster;
+private static final String TOPIC = "foo";
+private static final String GROUP = "test.group";
+
+public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+this.cluster = cluster;
+}
+
+@ClusterTest
+public void testDeleteWithTopicOption() {
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"};
 assertThrows(OptionException.class, () -> 
getConsumerGroupService(cgcArgs));
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteCmdNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteCmdNonExistingGroup() {
 String missingGroup = "missing.group";
-
-String[] cgcArgs = new String[]{"--bootstrap-server", 
bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
+String[] cgcArgs = new String[]{"--bootstrap-server", 
cluster.bootstrapServers(), "--delete", "--group", missingGroup};
 ConsumerGroupCommand.ConsumerGroupService service = 
getConsumerGroupService(cgcArgs);
-
 String output = 
ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
 assertTrue(output.contains("Group '" + missingGroup + "' could not be 
deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not 
detected while deleting consumer group");
+"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was 
not detected while deleting consumer group");
 }
 
-@ParameterizedTest
-@ValueSource(strings = {"zk", "kraft"})
-public void testDeleteNonExistingGroup(String quorum) {
-createOffsetsTopic(listenerName(), new Properties());
+@ClusterTest
+public void testDeleteNonExistingGroup() {
 String missingGroup = "missing.group";
-
  

Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576420731


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -144,7 +144,7 @@ case class LogReadResult(info: FetchDataInfo,
   def withEmptyFetchInfo: LogReadResult =

Review Comment:
   You're right, this is unused -> removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


mimaison commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576422765


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -959,7 +960,7 @@ class ZkAdminManager(val config: KafkaConfig,
 } else if (requestStatus.mechanism == Some(ScramMechanism.UNKNOWN)) {
   (requestStatus.user, unknownScramMechanismMsg)

Review Comment:
   It looks weird to me, but it seems `contains()` is Scala-idiomatic, so I made 
the change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


FrankYang0529 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576432135


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -202,7 +256,7 @@ private KafkaProducer 
createProducer(Properties config) {
 }
 
 private Consumer createConsumer(Properties config) {

Review Comment:
   Updated it and also added an empty map for ZK, otherwise ZK will not be tested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576432963


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -121,16 +146,16 @@ public BrokerNode build(
 private final boolean combined;
 private final Map propertyOverrides;
 
-BrokerNode(
+private BrokerNode(
 Uuid incarnationId,
 MetaPropertiesEnsemble initialMetaPropertiesEnsemble,
 boolean combined,
 Map propertyOverrides
 ) {
-this.incarnationId = incarnationId;
-this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble;

Review Comment:
   I made `logDataDirectories()` return a `Set` instead of a `List` to get rid 
of the redundant ArrayList wrapping.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Jakub Scholz (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840132#comment-17840132
 ] 

Jakub Scholz commented on KAFKA-16606:
--

[~soarez] That sounds like, without the 3.7-IV2 metadata version, JBOD in KRaft 
works fine until a log directory fails, at which point it might cause problems. 
So I wonder if it would have been better to protect users from that situation.

But it sounds like it was an intentional decision. And as I said, changing it 
now might be a bit tricky anyway, as it might break someone's cluster after 
upgrading to 3.7.1 / 3.8.0. So I guess it is fine as it is and I will close 
this. Thanks for the explanation.

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","s

Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]

2024-04-23 Thread via GitHub


mimaison commented on PR #15756:
URL: https://github.com/apache/kafka/pull/15756#issuecomment-2072692296

   While it's not directly adding new configurations, it effectively changes 
the behavior of `ExtractField` and `InsertField` and makes them support new 
configurations. The `configure()` method on a Transformation is only called 
with the configurations the user provided under the transformation's prefix. 
In this example a user would need to set something like:
   
   ```
   transforms=my-smt
   transforms.my-smt.type=MyTransformation
   transforms.my-smt.key.converter.replace.null.with.default=false
   ```
   
   Also, by not defining the configurations explicitly, users can't discover 
them using the `GET /connector-plugins/{plugin-type}/config` endpoint.
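   
   For illustration, here is a minimal sketch of what declaring the option 
explicitly could look like. The class name is the hypothetical 
`MyTransformation` from the example above, and the config key is assumed; this 
is not the actual proposed implementation:
   
   ```java
   import org.apache.kafka.common.config.ConfigDef;
   import org.apache.kafka.connect.connector.ConnectRecord;
   import org.apache.kafka.connect.transforms.Transformation;
   import org.apache.kafka.connect.transforms.util.SimpleConfig;
   
   import java.util.Map;
   
   public abstract class MyTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
       // Hypothetical config key; declaring it in the ConfigDef makes it
       // validated and discoverable via GET /connector-plugins/{plugin-type}/config.
       public static final String REPLACE_NULL_WITH_DEFAULT_CONFIG = "replace.null.with.default";
   
       public static final ConfigDef CONFIG_DEF = new ConfigDef()
           .define(REPLACE_NULL_WITH_DEFAULT_CONFIG, ConfigDef.Type.BOOLEAN, true,
               ConfigDef.Importance.MEDIUM,
               "Whether a null field value is replaced by the schema's default value.");
   
       protected boolean replaceNullWithDefault;
   
       @Override
       public void configure(Map<String, ?> props) {
           // Only the keys the user set under "transforms.<name>." reach this method.
           SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
           replaceNullWithDefault = config.getBoolean(REPLACE_NULL_WITH_DEFAULT_CONFIG);
       }
   
       @Override
       public ConfigDef config() {
           return CONFIG_DEF;
       }
   
       @Override
       public void close() {
       }
   }
   ```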
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-23 Thread via GitHub


johnnychhsu opened a new pull request, #15788:
URL: https://github.com/apache/kafka/pull/15788

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


frankvicky commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072734321

   Hi @lianetm, @chia7712 
   Thanks for the suggestions, I have addressed the comments. 😀


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Various cleanups in core [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15786:
URL: https://github.com/apache/kafka/pull/15786#discussion_r1576457731


##
core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala:
##
@@ -29,9 +29,9 @@ import scala.jdk.CollectionConverters._
  */
 class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: 
Logger) {
   import LinuxIoMetricsCollector._
-  var lastUpdateMs: Long = -1L
-  var cachedReadBytes:Long = 0L
-  var cachedWriteBytes:Long = 0L
+  private var lastUpdateMs: Long = -1L

Review Comment:
   It seems the type declaration is unnecessary since the literal already ends with `L`.



##
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala:
##
@@ -347,7 +347,7 @@ abstract class KafkaServerTestHarness extends 
QuorumTestHarness {
 if (isKRaftTest()) {
   val result = new util.HashMap[Uuid, String]()
   
controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach
 {

Review Comment:
   how about 
`controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach((k,
 v) => result.put(v, k))`



##
core/src/main/scala/kafka/server/SharedServer.scala:
##
@@ -107,16 +107,16 @@ class SharedServer(
   @volatile var brokerMetrics: BrokerServerMetrics = _
   @volatile var controllerServerMetrics: ControllerMetadataMetrics = _
   @volatile var loader: MetadataLoader = _
-  val snapshotsDisabledReason = new AtomicReference[String](null)
+  private val snapshotsDisabledReason = new AtomicReference[String](null)
   @volatile var snapshotEmitter: SnapshotEmitter = _
-  @volatile var snapshotGenerator: SnapshotGenerator = _
-  @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _
+  @volatile private var snapshotGenerator: SnapshotGenerator = _
+  @volatile private var metadataLoaderMetrics: MetadataLoaderMetrics = _
 
   def clusterId: String = metaPropsEnsemble.clusterId().get()
 
-  def nodeId: Int = metaPropsEnsemble.nodeId().getAsInt()
+  def nodeId: Int = metaPropsEnsemble.nodeId().getAsInt
 
-  def isUsed(): Boolean = synchronized {
+  private def isUsed(): Boolean = synchronized {

Review Comment:
   How about `isUsed`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Jakub Scholz (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jakub Scholz resolved KAFKA-16606.
--
Resolution: Not A Problem

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-7","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-40","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-3","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-36","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-47","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-14","size":0,"offsetLag":0,"isFuture":false},{"partition":

Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]

2024-04-23 Thread via GitHub


johnnychhsu commented on PR #15787:
URL: https://github.com/apache/kafka/pull/15787#issuecomment-2072775161

   nice fix! crystal clear solution :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) {
 return this;
 }
 
-public BrokerNode build(
-String baseDirectory,
-Uuid clusterId,
-boolean combined
-) {
+public Builder setClusterId(Uuid clusterId) {
+this.clusterId = clusterId;
+return this;
+}
+
+public Builder setBaseDirectory(String baseDirectory) {
+this.baseDirectory = baseDirectory;
+return this;
+}
+
+public Builder setCombined(boolean combined) {
+this.combined = combined;
+return this;
+}
+
+public Builder setPropertyOverrides(Map 
propertyOverrides) {
+this.propertyOverrides = Collections.unmodifiableMap(new 
HashMap<>(propertyOverrides));
+return this;
+}
+
+public BrokerNode build() {
 if (id == -1) {
 throw new RuntimeException("You must set the node id.");
 }
-if (incarnationId == null) {
-incarnationId = Uuid.randomUuid();
-}
 List logDataDirectories = IntStream

Review Comment:
   Could you add a null check for `baseDirectory`?
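   
   For example, a sketch of the requested guard, mirroring the existing id 
check in `build()` (exact message up to you):
   
   ```java
   // Hypothetical guard at the top of BrokerNode.Builder#build()
   if (baseDirectory == null) {
       throw new RuntimeException("You must set the base directory.");
   }
   ```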



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) {
 }
 
 public Builder setNumControllerNodes(int numControllerNodes) {
-if (numControllerNodes < 0) {
-throw new RuntimeException("Invalid negative value for 
numControllerNodes");
-}
-
-while (controllerNodeBuilders.size() > numControllerNodes) {
-controllerNodeBuilders.pollFirstEntry();
-}
-while (controllerNodeBuilders.size() < numControllerNodes) {
-int nextId = startControllerId();
-if (!controllerNodeBuilders.isEmpty()) {
-nextId = controllerNodeBuilders.lastKey() + 1;
-}
-controllerNodeBuilders.put(nextId,
-new ControllerNode.Builder().
-setId(nextId));
-}
+this.numControllerNodes = numControllerNodes;
 return this;
 }
 
 public Builder setNumBrokerNodes(int numBrokerNodes) {
-return setBrokerNodes(numBrokerNodes, 1);
+this.numBrokerNodes = numBrokerNodes;
+return this;
+}
+
+public Builder setNumDisksPerBroker(int numDisksPerBroker) {
+this.numDisksPerBroker = numDisksPerBroker;
+return this;
+}
+
+public Builder setPerBrokerProperties(Map> perBrokerProperties) {
+this.perBrokerProperties = Collections.unmodifiableMap(
+perBrokerProperties.entrySet().stream()
+.collect(Collectors.toMap(Map.Entry::getKey, e -> 
Collections.unmodifiableMap(new HashMap<>(e.getValue());
+return this;
 }
 
-public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) {
+public TestKitNodes build() {
+if (numControllerNodes < 0) {
+throw new RuntimeException("Invalid negative value for 
numControllerNodes");
+}
 if (numBrokerNodes < 0) {
 throw new RuntimeException("Invalid negative value for 
numBrokerNodes");
 }
-if (disksPerBroker <= 0) {
-throw new RuntimeException("Invalid value for disksPerBroker");
-}
-while (brokerNodeBuilders.size() > numBrokerNodes) {
-brokerNodeBuilders.pollFirstEntry();
+if (numDisksPerBroker <= 0) {
+throw new RuntimeException("Invalid value for 
numDisksPerBroker");
 }
-while (brokerNodeBuilders.size() < numBrokerNodes) {
-int nextId = startBrokerId();
-if (!brokerNodeBuilders.isEmpty()) {
-nextId = brokerNodeBuilders.lastKey() + 1;
-}
-BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder()
-.setId(nextId)
-.setNumLogDirectories(disksPerBroker);
-brokerNodeBuilders.put(nextId, brokerNodeBuilder);
-}
-return this;
-}
 
-public TestKitNodes build() {
 String baseDirectory = TestUtils.tempDirectory().getAbsolutePath();

Review Comment:
   We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()` 
will delete the returned folder on termination.



##
core/src/test/java/kafka/testkit/TestKitNodes.java:
##
@@ -167,11 +164,1

Re: [PR] KAFKA-14509: [3/4] Add integration test for consumerGroupDescribe Api [kafka]

2024-04-23 Thread via GitHub


johnnychhsu commented on code in PR #15727:
URL: https://github.com/apache/kafka/pull/15727#discussion_r1576500340


##
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala:
##
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package unit.kafka.server
+
+import kafka.server.GroupCoordinatorBaseRequestTest
+import kafka.test.ClusterInstance
+import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, 
ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.utils.TestUtils
+import org.apache.kafka.common.ConsumerGroupState
+import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData
+import 
org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, 
DescribedGroup, TopicPartitions}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{Tag, Timeout}
+
+import scala.jdk.CollectionConverters._
+
+@Timeout(120)
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1)

Review Comment:
   May I know if we also need to include ZK mode for this test?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-23 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1576507146


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are 
assigned partitions
+ * for the first time (full), or incrementally when a rebalance is 
triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTo

Re: [PR] KAFKA-16475: add test for TopicImageNodeTest [kafka]

2024-04-23 Thread via GitHub


johnnychhsu commented on PR #15720:
URL: https://github.com/apache/kafka/pull/15720#issuecomment-2072812226

   thanks for the review @cmccabe ! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15679:
URL: https://github.com/apache/kafka/pull/15679#discussion_r1576508031


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java:
##
@@ -42,109 +58,141 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 
-public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends 
ConsumerGroupCommandTest {
-String[] getArgs(String group, String topic) {
-return new String[] {
-"--bootstrap-server", bootstrapServers(listenerName()),
-"--delete-offsets",
-"--group", group,
-"--topic", topic
-};
+@Tag("integration")
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+@ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
+@ClusterConfigProperty(key = "group.coordinator.new.enable", value = 
"true")
+})
+@ExtendWith(ClusterTestExtensions.class)
+public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
+public static final String TOPIC = "foo";
+public static final String GROUP = "test.group";
+private final ClusterInstance clusterInstance;
+
+private ConsumerGroupCommand.ConsumerGroupService consumerGroupService;
+private final Iterable> consumerConfigs;
+
+DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance 
clusterInstance) {
+this.clusterInstance = clusterInstance;
+this.consumerConfigs = clusterInstance.isKRaftTest()
+? Arrays.asList(
+new HashMap() {{

Review Comment:
   We can use an immutable map and make `createConsumer` create an inner 
mutable map to collect all configs.
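   
   A sketch of that pattern (names follow the quoted test; the byte-array 
deserializers are an assumption):
   
   ```java
   // Hypothetical sketch: callers pass an immutable per-protocol override map
   // (possibly Collections.emptyMap() for ZK), and createConsumer copies
   // everything into its own mutable map together with the common settings.
   private Consumer<byte[], byte[]> createConsumer(Map<String, Object> overrides) {
       Map<String, Object> configs = new HashMap<>();
       configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
       configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP);
       configs.putAll(overrides);
       return new KafkaConsumer<>(configs, new ByteArrayDeserializer(), new ByteArrayDeserializer());
   }
   ```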



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]

2024-04-23 Thread via GitHub


mfvitale commented on PR #15756:
URL: https://github.com/apache/kafka/pull/15756#issuecomment-2072820368

   > The `configure()` method on Transformation is only called with 
configurations provided with their prefix.
   
   This cleared my doubt. So in that case it should be clearly declared as an 
SMT configuration. I'll open a KIP. Thanks for the explanation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]

2024-04-23 Thread via GitHub


rreddy-22 commented on code in PR #15717:
URL: https://github.com/apache/kafka/pull/15717#discussion_r1576516058


##
jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java:
##
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec;
+import org.apache.kafka.coordinator.group.assignor.AssignmentSpec;
+import org.apache.kafka.coordinator.group.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.assignor.UniformAssignor;
+import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata;
+import org.apache.kafka.coordinator.group.consumer.TopicMetadata;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+public class ServerSideAssignorBenchmark {
+
+public enum AssignorType {
+RANGE(new RangeAssignor()),
+UNIFORM(new UniformAssignor());
+
+private final PartitionAssignor assignor;
+
+AssignorType(PartitionAssignor assignor) {
+this.assignor = assignor;
+}
+
+public PartitionAssignor assignor() {
+return assignor;
+}
+}
+
+/**
+ * The subscription pattern followed by the members of the group.
+ *
+ * A subscription model is considered homogenous if all the members of the 
group
+ * are subscribed to the same set of topics, it is heterogeneous otherwise.
+ */
+public enum SubscriptionModel {
+HOMOGENEOUS, HETEROGENEOUS
+}
+
+/**
+ * The assignment type is decided based on whether all the members are 
assigned partitions
+ * for the first time (full), or incrementally when a rebalance is 
triggered.
+ */
+public enum AssignmentType {
+FULL, INCREMENTAL
+}
+
+@Param({"100", "500", "1000", "5000", "1"})
+private int memberCount;
+
+@Param({"5", "10", "50"})
+private int partitionsToMemberRatio;
+
+@Param({"10", "100", "1000"})
+private int topicCount;
+
+@Param({"true", "false"})
+private boolean isRackAware;
+
+@Param({"HOMOGENEOUS", "HETEROGENEOUS"})
+private SubscriptionModel subscriptionModel;
+
+@Param({"RANGE", "UNIFORM"})
+private AssignorType assignorType;
+
+@Param({"FULL", "INCREMENTAL"})
+private AssignmentType assignmentType;
+
+private PartitionAssignor partitionAssignor;
+
+private static final int NUMBER_OF_RACKS = 3;
+
+private static final int MAX_BUCKET_COUNT = 5;
+
+private AssignmentSpec assignmentSpec;
+
+private SubscribedTopicDescriber subscribedTopicDescriber;
+
+private final List allTopicIds = new ArrayList<>(topicCount);
+
+@Setup(Level.Trial)
+public void setup() {
+Map topicMetadata = createTopicMetadata();
+subscribedTopicDescriber = new SubscribedTo

[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840148#comment-17840148
 ] 

Igor Soarez commented on KAFKA-16606:
-

That's right [~scholzj]. When KIP-858 started, the documentation was already 
explicit about JBOD support being one of the missing features in KRaft, but the 
configuration was already allowed, so it didn't seem right to change it at the 
time. In hindsight, maybe that would've been a better decision.

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>
> JBOD in KRaft is supposed to be supported since Kafka 3.7.0. The Kafka 
> [source 
> code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195]
>  suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. 
> However, it seems to be possible to run KRaft cluster with JBOD even with 
> older metadata versions such as {{{}3.6{}}}. For example, I have a cluster 
> using the {{3.6}} metadata version:
> {code:java}
> bin/kafka-features.sh --bootstrap-server localhost:9092 describe
> Feature: metadata.version       SupportedMinVersion: 3.0-IV1    
> SupportedMaxVersion: 3.7-IV4    FinalizedVersionLevel: 3.6-IV2  Epoch: 1375 
> {code}
> Yet a KRaft cluster with JBOD seems to run fine:
> {code:java}
> bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
> Querying brokers for log directories information
> Received log directory information from brokers 2000,3000,1000
> {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-7","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-40","size":0,"offsetLag":0,"isFuture":

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on code in PR #15766:
URL: https://github.com/apache/kafka/pull/15766#discussion_r1576523223


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java:
##
@@ -17,279 +17,440 @@
 package org.apache.kafka.tools.consumer.group;
 
 import joptsimple.OptionException;
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static java.util.Collections.singleton;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.apache.kafka.common.ConsumerGroupState.EMPTY;
+import static org.apache.kafka.common.ConsumerGroupState.STABLE;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest {
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteWithTopicOption(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"};
-        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
-    }
-
-    @ParameterizedTest
-    @ValueSource(strings = {"zk", "kraft"})
-    public void testDeleteCmdNonExistingGroup(String quorum) {
-        createOffsetsTopic(listenerName(), new Properties());
-        String missingGroup = "missing.group";
-
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup};
-        ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs);
 
-        String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups);
-        assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()),
-            "The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group");
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+    @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"),
+    @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"),
+    @ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true")
+})
+public class DeleteConsumerGroupsTest {
+    private static final String TOPIC = "foo";
+    private static final String GROUP = "test.group";
+    private static final String MISSING_GROUP = "missing.group";
+
+    private final ClusterInstance cluster;
+    private final Iterable<GroupProtocol> groupProtocols;
+
+    public DeleteConsumerGroupsTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+        this.groupProtocols = cluster.isKRaftTest()
+            ? Arrays.asList(CLASSIC, CONSUMER)
+            : singleton(C
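The archive cuts off the rest of the rewritten class here. Purely to illustrate the pattern under review, a migrated test usually ends up shaped like the sketch below; `getConsumerGroupService` is assumed to be a helper the new class defines itself, since it no longer extends `ConsumerGroupCommandTest`:

    // Illustrative sketch only, not the PR's code. The option-validation case
    // needs no running consumer group, so it can use the injected cluster
    // handle directly.
    @ClusterTest
    public void testDeleteWithTopicOption() {
        String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(),
            "--delete", "--group", GROUP, "--topic"};
        assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
    }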

Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15766:
URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072841232

   @frankvicky @m1a2st It seems your PRs (#15766 and #15779) both need a consumer
running in the background. Hence, we could consider moving
`AbstractConsumerGroupExecutor`/`ConsumerGroupExecutor` into an individual Java
file. WDYT?
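
   For context, the kind of standalone executor being suggested might look roughly like this minimal sketch; the class shape is an assumption, not the code currently shared between the two PRs:

    // Hypothetical sketch: keep a consumer polling on a background thread so
    // the group stays non-empty for the duration of a test.
    import org.apache.kafka.clients.consumer.Consumer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class ConsumerGroupExecutor implements AutoCloseable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private final AtomicBoolean closed = new AtomicBoolean(false);

        public ConsumerGroupExecutor(Consumer<String, String> consumer, String topic) {
            executor.submit(() -> {
                try {
                    consumer.subscribe(Collections.singleton(topic));
                    while (!closed.get()) {
                        consumer.poll(Duration.ofMillis(100));
                    }
                } finally {
                    // Close on the polling thread; KafkaConsumer is not thread-safe.
                    consumer.close();
                }
            });
        }

        @Override
        public void close() {
            closed.set(true); // signal the poll loop to exit
            executor.shutdown();
        }
    }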


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]

2024-04-23 Thread via GitHub


brandboat commented on code in PR #15761:
URL: https://github.com/apache/kafka/pull/15761#discussion_r1576531833


##
core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala:
##
@@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio
     assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion())
   }
 
-  @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array(
-    new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"),
-    new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"),
+  @ClusterTests(Array(

Review Comment:
   Already added the TODO at the top of this file (L30).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version

2024-04-23 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840150#comment-17840150
 ] 

Mickael Maison commented on KAFKA-16606:


To me that kind of looks like a bug. Previously JBOD was not supported with
KRaft at all, so it was not a big deal that it did not quite work. As we're
approaching JBOD being production ready, we should smooth out the rough edges
and avoid allowing weird unsupported configurations.

Since JBOD is not yet production ready, would we really break anyone's cluster
if we prevented using multiple log dirs with KRaft while the metadata version
is set below 3.7? I understand some people may disagree, but I wonder if it's
a discussion worth having.
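
For a concrete picture of the gate being discussed: {{MetadataVersion}} already exposes {{isDirectoryAssignmentSupported()}} (true from 3.7-IV2 onwards), so a broker-side check could look like the sketch below. The {{validateLogDirs}} hook itself is hypothetical, not existing code.
{code:java}
// Hypothetical startup validation: refuse multiple log directories unless
// the finalized metadata.version supports JBOD (3.7-IV2 or newer).
import org.apache.kafka.server.common.MetadataVersion;

import java.util.List;

public final class LogDirValidation {
    public static void validateLogDirs(List<String> logDirs, MetadataVersion metadataVersion) {
        if (logDirs.size() > 1 && !metadataVersion.isDirectoryAssignmentSupported()) {
            throw new IllegalArgumentException(
                "Multiple log directories (JBOD) require metadata.version 3.7-IV2 or newer, " +
                "but the finalized version is " + metadataVersion);
        }
    }
}
{code}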

> JBOD support in KRaft does not seem to be gated by the metadata version
> ---
>
> Key: KAFKA-16606
> URL: https://issues.apache.org/jira/browse/KAFKA-16606
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Jakub Scholz
>Priority: Major
>

Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15788:
URL: https://github.com/apache/kafka/pull/15788#issuecomment-2072851173

   @johnnychhsu Instead of removing them, could you make `MetadataLogConfig`
use those helper methods?
   
   
https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/MetadataLogConfig.scala#L38


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Current working branch for KIP-848 kip [kafka]

2024-04-23 Thread via GitHub


lucasbru opened a new pull request, #15789:
URL: https://github.com/apache/kafka/pull/15789

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >