Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575753935 ## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ## @@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } + @Test + def testAlterReplicaLogDirsRequestWithRetention(): Unit = { +val partitionNum = 1 + +// Alter replica dir before topic creation +val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath +val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap +val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) + +// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions +val tp = new TopicPartition(topic, 0) +assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) +assertTrue(servers.head.logManager.getLog(tp).isEmpty) + +val topicProperties = new Properties() +topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024") +topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1") +topicProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, "1024") + +createTopic(topic, partitionNum, 1, topicProperties) +assertEquals(logDir1, servers.head.logManager.getLog(tp).get.dir.getParent) + +// send enough records to trigger log rolling +(0 until 20).foreach { _ => + TestUtils.generateAndProduceMessages(servers, topic, 10, 1) +} +TestUtils.waitUntilTrue(() => servers.head.logManager.getLog(new TopicPartition(topic, 0)).get.numberOfSegments > 1, + "timed out waiting for log segment to roll") + +// Wait for log segment retention. Override initialTaskDelayMs as 5 seconds. +// The first retention task is executed after 5 seconds, so waiting for 10 seconds should be enough. Review Comment: Fixed it. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16424: remove truncated logs after alter dir [kafka]
FrankYang0529 commented on code in PR #15616: URL: https://github.com/apache/kafka/pull/15616#discussion_r1575754146 ## core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala: ## @@ -116,6 +124,56 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } + @Test + def testAlterReplicaLogDirsRequestWithRetention(): Unit = { +val partitionNum = 1 + +// Alter replica dir before topic creation +val logDir1 = new File(servers.head.config.logDirs(1)).getAbsolutePath +val partitionDirs1 = (0 until partitionNum).map(partition => new TopicPartition(topic, partition) -> logDir1).toMap +val alterReplicaLogDirsResponse1 = sendAlterReplicaLogDirsRequest(partitionDirs1) + +// The response should show error UNKNOWN_TOPIC_OR_PARTITION for all partitions +val tp = new TopicPartition(topic, 0) +assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, findErrorForPartition(alterReplicaLogDirsResponse1, tp)) +assertTrue(servers.head.logManager.getLog(tp).isEmpty) + +val topicProperties = new Properties() +topicProperties.put(TopicConfig.RETENTION_BYTES_CONFIG, "1024") +topicProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "1") Review Comment: Yes, I add more comments for it. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13459) MM2 should be able to add the source offset to the record header
[ https://issues.apache.org/jira/browse/KAFKA-13459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17839981#comment-17839981 ] Daniel Urban commented on KAFKA-13459: -- [~viktorsomogyi] No plans, and it does require a KIP. > MM2 should be able to add the source offset to the record header > > > Key: KAFKA-13459 > URL: https://issues.apache.org/jira/browse/KAFKA-13459 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Viktor Somogyi-Vass >Priority: Major > > MM2 could add the source offset to the record header to help with diagnostics > in some use-cases. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16465: Fix consumer sys test revocation validation [kafka]
lucasbru commented on code in PR #15778: URL: https://github.com/apache/kafka/pull/15778#discussion_r1575801343 ## tests/kafkatest/tests/client/consumer_test.py: ## @@ -242,16 +242,15 @@ def test_static_consumer_bounce(self, clean_shutdown, static_membership, bounce_ self.rolling_bounce_consumers(consumer, keep_alive=num_keep_alive, num_bounces=num_bounces) num_revokes_after_bounce = consumer.num_revokes_for_alive() - num_revokes_before_bounce - -check_condition = num_revokes_after_bounce != 0 + # under static membership, the live consumer shall not revoke any current running partitions, # since there is no global rebalance being triggered. if static_membership: -check_condition = num_revokes_after_bounce == 0 - -assert check_condition, \ -"Total revoked count %d does not match the expectation of having 0 revokes as %d" % \ -(num_revokes_after_bounce, check_condition) +assert num_revokes_after_bounce == 0, \ +"Unexpected revocation triggered when bouncing static member. Expecting 0 but had %d revocations" % num_revokes_after_bounce +elif consumer.is_eager(): +assert num_revokes_after_bounce != 0, \ Review Comment: Did this test work before for `CooperativeStickyAssignor`, or was this case not tested? Also, is there _any_ way now to detect that without static membership, a rebalance happens during the roll? It seems like in the case where static_membership == false, and is_eager == false, we are not asserting anything now, so I wonder how much value is still there in that combination. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16461: New consumer fails to consume records in security_test.py system test [kafka]
lucasbru merged PR #15746: URL: https://github.com/apache/kafka/pull/15746 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix logging of KafkaStreams close [kafka]
soarez merged PR #15783: URL: https://github.com/apache/kafka/pull/15783 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
chia7712 commented on code in PR #15774: URL: https://github.com/apache/kafka/pull/15774#discussion_r1575855253 ## server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java: ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class QuotaConfigs { +public static final String NUM_QUOTA_SAMPLES_CONFIG = "quota.window.num"; +public static final String NUM_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for client quotas"; +public static final String NUM_CONTROLLER_QUOTA_SAMPLES_CONFIG = "controller.quota.window.num"; +public static final String NUM_CONTROLLER_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for controller mutation quotas"; +public static final String NUM_REPLICATION_QUOTA_SAMPLES_CONFIG = "replication.quota.window.num"; +public static final String NUM_REPLICATION_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for replication quotas"; +public static final String NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_CONFIG = "alter.log.dirs.replication.quota.window.num"; +public static final String NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for alter log dirs replication quotas"; + +// Always have 10 whole windows + 1 current window +public static final int NUM_QUOTA_SAMPLES_DEFAULT = 11; + +public static final String QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "quota.window.size.seconds"; +public static final String QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for client quotas"; +public static final String CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "controller.quota.window.size.seconds"; +public static final String CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for controller mutations quotas"; +public static final String REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "replication.quota.window.size.seconds"; +public static final String REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for replication quotas"; +public static final String ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "alter.log.dirs.replication.quota.window.size.seconds"; +public static final String ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for alter log dirs replication quotas"; +public static final int QUOTA_WINDOW_SIZE_SECONDS_DEFAULT = 1; + +public static final String CLIENT_QUOTA_CALLBACK_CLASS_CONFIG = "client.quota.callback.class"; +public static final String CLIENT_QUOTA_CALLBACK_CLASS_DOC = "The fully qualified name of a class that implements the ClientQuotaCallback interface, " + +"which is used to determine quota limits applied to client requests. By default, theand " + +"quotas that are stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal " + +"of the session and the client-id of the request is applied."; + +public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "leader.replication.throttled.replicas"; +public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_DOC = "A list of replicas for which log replication should be throttled on " + +"the leader side. The list should describe a set of replicas in the form " + +"[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " + +"all replicas for this topic."; +public static final List LEADER_REPLICATION_THROTTLED_REPLICAS_DEFAULT = Collections.emptyList(); + +public static final String FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "follower.replication.throttled.replicas"; +public static fina
Re: [PR] MINOR: Add junit properties to display parameterized test names [kafka]
chia7712 commented on PR #14983: URL: https://github.com/apache/kafka/pull/14983#issuecomment-2071780823 I have backport this PR to 3.7 see https://github.com/apache/kafka/commit/6e998cffdd33e343945877ccee1fec8337c7d57d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on PR #15777: URL: https://github.com/apache/kafka/pull/15777#issuecomment-2071783047 The failed tests is fixed by https://github.com/chia7712/kafka/commit/6e998cffdd33e343945877ccee1fec8337c7d57d -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 merged PR #15777: URL: https://github.com/apache/kafka/pull/15777 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
chia7712 commented on PR #15354: URL: https://github.com/apache/kafka/pull/15354#issuecomment-2071806320 @jolshan Do you plan to backport this to 3.7? I recently try to backport other PRs to 3.7 and encounter the same flaky -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 commented on PR #15772: URL: https://github.com/apache/kafka/pull/15772#issuecomment-2071831061 > controller.listener.names oh, it will be moved to `KRaftConfigs`. please ignore my previous comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 commented on PR #15772: URL: https://github.com/apache/kafka/pull/15772#issuecomment-2071861237 Those failed tests pass on my local. will merge it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move socket configs into org.apache.kafka.network.SocketServerConfigs [kafka]
chia7712 merged PR #15772: URL: https://github.com/apache/kafka/pull/15772 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
chia7712 commented on PR #15762: URL: https://github.com/apache/kafka/pull/15762#issuecomment-2071864618 retrigger QA due to incomplete build. I will merge it after QA get completed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1576005665 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.log.remote.quota; + +import kafka.server.QuotaType; +import kafka.server.SensorAccess; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.SimpleRate; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.BoxedUnit; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class RLMQuotaManager { +private static final Logger LOGGER = LoggerFactory.getLogger(RLMQuotaManager.class); + +private final RLMQuotaManagerConfig config; +private final Metrics metrics; +private final QuotaType quotaType; +private final String description; +private final Time time; + +private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); +private final SensorAccess sensorAccess; +private Quota quota; + +public RLMQuotaManager(RLMQuotaManagerConfig config, Metrics metrics, QuotaType quotaType, String description, Time time) { +this.config = config; +this.metrics = metrics; +this.quotaType = quotaType; +this.description = description; +this.time = time; + +this.quota = new Quota(config.getQuotaBytesPerSecond(), true); +this.sensorAccess = new SensorAccess(lock, metrics); +} + +public void updateQuota(Quota newQuota) { +lock.writeLock().lock(); +try { +this.quota = newQuota; + +Map allMetrics = metrics.metrics(); +MetricName quotaMetricName = metricName(); +allMetrics.forEach((metricName, metric) -> { +if (metricName.name().equals(quotaMetricName.name()) && metricName.group().equals(quotaMetricName.group())) { +Map metricTags = metricName.tags(); +LOGGER.info("Sensor for quota-id {} already exists. Setting quota to {} in MetricConfig", metricTags, newQuota); Review Comment: Makes sense, will change this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15265: Add Remote Log Manager quota manager [kafka]
abhijeetk88 commented on code in PR #15625: URL: https://github.com/apache/kafka/pull/15625#discussion_r1576007574 ## core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java: ## Review Comment: Yes, the integration of the quota manager will come in the follow-up PRs. I have mentioned it in the description of the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576009920 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576013151 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -171,29 +209,27 @@ private void produceRecord() { } private void withStableConsumerGroup(Runnable body) { -Consumer consumer = createConsumer(new Properties()); -try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +try (Consumer consumer = createConsumer(new Properties());) { Review Comment: please remove `;` ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +238,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: Could you add tests for `GroupProtocol.CONSUMER`? see comment: https://github.com/apache/kafka/pull/15766#discussion_r1576009920 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -171,29 +209,27 @@ private void produceRecord() { } private void withStableConsumerGroup(Runnable body) { -Consumer consumer = createConsumer(new Properties()); -try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +try (Consumer consumer = createConsumer(new Properties());) { +consumer.subscribe(Collections.singletonList(TOPIC)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); +Assertions.assertNotEquals(0, records.count()); consumer.commitSync(); body.run(); -} finally { -Utils.closeQuietly(consumer, "consumer"); } } private void withEmptyConsumerGroup(Runnable body) { Review Comment: Can we merge `withStableConsumerGroup` and `withEmptyConsumerGroup`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576056664 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -66,11 +74,27 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } -public BrokerNode build( -String baseDirectory, -Uuid clusterId, -boolean combined -) { +public Builder setClusterId(Uuid clusterId) { +this.clusterId = clusterId; +return this; +} + +public Builder setBaseDirectory(String baseDirectory) { +this.baseDirectory = baseDirectory; +return this; +} + +public Builder setCombined(boolean combined) { +this.combined = combined; +return this; +} + +public Builder setPropertyOverrides(Map propertyOverrides) { +this.propertyOverrides = Collections.unmodifiableMap(propertyOverrides); Review Comment: Thanks, I'll address this in other builders also. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Various cleanups in core [kafka]
mimaison opened a new pull request, #15786: URL: https://github.com/apache/kafka/pull/15786 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs
Sagar Rao created KAFKA-16604: - Summary: Deprecate ConfigDef.ConfigKey constructor from public APIs Key: KAFKA-16604 URL: https://issues.apache.org/jira/browse/KAFKA-16604 Project: Kafka Issue Type: Improvement Reporter: Sagar Rao Currently, one can create ConfigKey by either invoking the public constructor directly and passing it to a ConfigDef object or by invoking the a bunch of define methods. The 2 ways can get confusing at times. Moreover, it could lead to errors as was noticed in KAFKA-16592 We should ideally have only 1 way exposed to the users which IMO should be to create the objects only through the exposed define methods. This ticket is about marking the public constructor of ConfigKey as Deprecated first and then making it private eventually. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16592: Add a new constructor which invokes the existing constructor with default value for alternativeString [kafka]
vamossagar12 commented on PR #15762: URL: https://github.com/apache/kafka/pull/15762#issuecomment-2072021300 @chia7712 , I created this ticket: [KAFKA-16604](https://issues.apache.org/jira/browse/KAFKA-16604) for tracking the KIP -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576082586 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +238,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: Can we do it in another PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
dajac commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1576085044 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTopicM
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072051627 > @FrankYang0529 sorry that I check the PR again, and more comments are left. PTAL Hi @chia7712, I addressed last comments. I think we can add new test case in next PR. We can more focus on migrate to ClusterTestExtensions in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16554: Online downgrade triggering and group type conversion [kafka]
dajac commented on code in PR #15721: URL: https://github.com/apache/kafka/pull/15721#discussion_r1576113492 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -1422,10 +1499,16 @@ private CoordinatorResult consumerGr ) throws ApiException { ConsumerGroup group = consumerGroup(groupId); List records; +CompletableFuture appendFuture = null; if (instanceId == null) { ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false); log.info("[GroupId {}] Member {} left the consumer group.", groupId, memberId); -records = consumerGroupFenceMember(group, member); +if (validateOnlineDowngrade(group, memberId)) { +records = new ArrayList<>(); +appendFuture = convertToClassicGroup(group, memberId, records); +} else { +records = consumerGroupFenceMember(group, member); +} Review Comment: Having this duplicated code in multiple places is a tad annoying, don't you think? I wonder if we could push it down into `consumerGroupFenceMember` as it seems that the downgrade must be checked whenever `consumerGroupFenceMember` is called. `consumerGroupFenceMember` could return a future or null and take the records as an argument. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on PR #15761: URL: https://github.com/apache/kafka/pull/15761#issuecomment-2072088191 > @brandboat Sorry that I leave more comments below. please take a look. thanks Thank you for your patience! I appreciate your input. Already addressed all comments as above. Please take another look when you are available :smiley: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Modified System.getProperty("line.separator") to java.lang.System.lineSeparator() [kafka]
TaiJuWu commented on PR #15782: URL: https://github.com/apache/kafka/pull/15782#issuecomment-2072107254 > Thank you for the changes. > > Is there a reason to prefer the `java.lang.` prefix? There is no special reason, Removing prefix is ok. Following other code, Removing this prefix, thanks for your remind. > > There are other uses of `System.getProperty("line.separator")`, should those be changed too? > > * clients/src/main/java/org/apache/kafka/common/utils/Shell.java > * clients/src/main/java/org/apache/kafka/common/utils/Utils.java > * tools/src/main/java/org/apache/kafka/tools/ClientMetricsCommand.java > * core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala > * connect/file/src/test/java/org/apache/kafka/connect/file/FileStreamSinkTaskTest.java These are removed if you use the newest trunk branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs
[ https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840075#comment-17840075 ] Chia-Ping Tsai commented on KAFKA-16604: [~sagarrao] Thanks for opening this jira. Do you have free cycle to take over it? > Deprecate ConfigDef.ConfigKey constructor from public APIs > -- > > Key: KAFKA-16604 > URL: https://issues.apache.org/jira/browse/KAFKA-16604 > Project: Kafka > Issue Type: Improvement >Reporter: Sagar Rao >Priority: Major > > Currently, one can create ConfigKey by either invoking the public constructor > directly and passing it to a ConfigDef object or by invoking the a bunch of > define methods. The 2 ways can get confusing at times. Moreover, it could > lead to errors as was noticed in KAFKA-16592 > We should ideally have only 1 way exposed to the users which IMO should be to > create the objects only through the exposed define methods. This ticket is > about marking the public constructor of ConfigKey as Deprecated first and > then making it private eventually. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072132702 > I think we can add new test case in next PR. We can more focus on migrate to ClusterTestExtensions in this PR. Please take a look at https://github.com/apache/kafka/pull/15766#discussion_r1576009920, and we need a bit refactor but it should not a hard work. So please try to address it in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
OmniaGM commented on code in PR #15774: URL: https://github.com/apache/kafka/pull/15774#discussion_r1576158733 ## core/src/main/scala/kafka/server/DynamicConfig.scala: ## @@ -36,30 +35,12 @@ import scala.jdk.CollectionConverters._ object DynamicConfig { Review Comment: It need to be moved anyway when we move `KafkaConfig` class. So it's on my roadmap for KAFKA-15853. The Jira for moving KafkaConfig out of core is one of the most complicated one as many pieces will need to move to support this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576155040 ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -167,11 +186,11 @@ private TestKitNodes( NavigableMap controllerNodes, NavigableMap brokerNodes ) { -this.baseDirectory = baseDirectory; -this.clusterId = clusterId; -this.bootstrapMetadata = bootstrapMetadata; -this.controllerNodes = controllerNodes; -this.brokerNodes = brokerNodes; +this.baseDirectory = Objects.requireNonNull(baseDirectory); +this.clusterId = Objects.requireNonNull(clusterId); +this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata); +this.controllerNodes = Objects.requireNonNull(controllerNodes); Review Comment: Please follow the rule of this PR - be a immutable object ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; +this.incarnationId = Objects.requireNonNull(incarnationId); Review Comment: this is not used so it is fine to remove it ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -97,9 +107,12 @@ public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { if (!brokerNodeBuilders.isEmpty()) { nextId = brokerNodeBuilders.lastKey() + 1; } -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() Review Comment: Can we evaluate those settings in `build` method? That can simplify code since we don't need to revert the changes (for example, the number of brokers). ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; Review Comment: `logDataDirectories` should return immutable collection -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16588: broker shutdown hangs when log.segment.delete.delay.ms is zero [kafka]
FrankYang0529 commented on PR #15773: URL: https://github.com/apache/kafka/pull/15773#issuecomment-2072172997 > > Yeah, I tried to use unit test. However, I didn't find a good way to check whether kafka-delete-logs task has run or not. I also tried to check whether scheduler can be stopped. However, the mock scheduler can still add new task during scheduler shutdown. > > Could we use `KafkaScheduler` instead? Yeah, update to use `KafkaScheduler`. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph commented on code in PR #15748: URL: https://github.com/apache/kafka/pull/15748#discussion_r1576175394 ## core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala: ## @@ -121,9 +121,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati // Set the last modified time to an old value to force deletion of old segments val endOffset = log.logEndOffset log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs)) -TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset, +TestUtils.waitUntilTrue(() => 1 == log.numberOfSegments, "Timed out waiting for deletion of old segments") -assertEquals(1, log.numberOfSegments) +assertEquals(log.logStartOffset, endOffset) Review Comment: The tests is flaky post landing the PR in trunk: https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FBerlin&tests.container=kafka.log.LogCleanerParameterizedIntegrationTest We have to raise a similar fix in trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph commented on code in PR #15748: URL: https://github.com/apache/kafka/pull/15748#discussion_r1576175394 ## core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala: ## @@ -121,9 +121,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati // Set the last modified time to an old value to force deletion of old segments val endOffset = log.logEndOffset log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs)) -TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset, +TestUtils.waitUntilTrue(() => 1 == log.numberOfSegments, "Timed out waiting for deletion of old segments") -assertEquals(1, log.numberOfSegments) +assertEquals(log.logStartOffset, endOffset) Review Comment: The test is flaky post landing the PR in trunk: https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FBerlin&tests.container=kafka.log.LogCleanerParameterizedIntegrationTest We have to raise a similar fix in trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16605) Fix the flaky LogCleanerParameterizedIntegrationTest
Kamal Chandraprakash created KAFKA-16605: Summary: Fix the flaky LogCleanerParameterizedIntegrationTest Key: KAFKA-16605 URL: https://issues.apache.org/jira/browse/KAFKA-16605 Project: Kafka Issue Type: Task Reporter: Kamal Chandraprakash Assignee: Kamal Chandraprakash -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16605) Fix the flaky LogCleanerParameterizedIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-16605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kamal Chandraprakash updated KAFKA-16605: - Description: https://ge.apache.org/scans/tests?search.relativeStartTime=P7D&search.rootProjectNames=kafka&search.timeZoneId=Asia%2FCalcutta&tests.container=kafka.log.LogCleanerParameterizedIntegrationTest&tests.test=testCleansCombinedCompactAndDeleteTopic(CompressionType)%5B1%5D > Fix the flaky LogCleanerParameterizedIntegrationTest > > > Key: KAFKA-16605 > URL: https://issues.apache.org/jira/browse/KAFKA-16605 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: Kamal Chandraprakash >Priority: Major > > https://ge.apache.org/scans/tests?search.relativeStartTime=P7D&search.rootProjectNames=kafka&search.timeZoneId=Asia%2FCalcutta&tests.container=kafka.log.LogCleanerParameterizedIntegrationTest&tests.test=testCleansCombinedCompactAndDeleteTopic(CompressionType)%5B1%5D -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
OmniaGM commented on code in PR #15774: URL: https://github.com/apache/kafka/pull/15774#discussion_r1576188830 ## server-common/src/main/java/org/apache/kafka/server/config/QuotaConfigs.java: ## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.security.scram.internals.ScramMechanism; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class QuotaConfigs { +public static final String NUM_QUOTA_SAMPLES_CONFIG = "quota.window.num"; +public static final String NUM_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for client quotas"; +public static final String NUM_CONTROLLER_QUOTA_SAMPLES_CONFIG = "controller.quota.window.num"; +public static final String NUM_CONTROLLER_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for controller mutation quotas"; +public static final String NUM_REPLICATION_QUOTA_SAMPLES_CONFIG = "replication.quota.window.num"; +public static final String NUM_REPLICATION_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for replication quotas"; +public static final String NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_CONFIG = "alter.log.dirs.replication.quota.window.num"; +public static final String NUM_ALTER_LOG_DIRS_REPLICATION_QUOTA_SAMPLES_DOC = "The number of samples to retain in memory for alter log dirs replication quotas"; + +// Always have 10 whole windows + 1 current window +public static final int NUM_QUOTA_SAMPLES_DEFAULT = 11; + +public static final String QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "quota.window.size.seconds"; +public static final String QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for client quotas"; +public static final String CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "controller.quota.window.size.seconds"; +public static final String CONTROLLER_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for controller mutations quotas"; +public static final String REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "replication.quota.window.size.seconds"; +public static final String REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for replication quotas"; +public static final String ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_CONFIG = "alter.log.dirs.replication.quota.window.size.seconds"; +public static final String ALTER_LOG_DIRS_REPLICATION_QUOTA_WINDOW_SIZE_SECONDS_DOC = "The time span of each sample for alter log dirs replication quotas"; +public static final int QUOTA_WINDOW_SIZE_SECONDS_DEFAULT = 1; + +public static final String CLIENT_QUOTA_CALLBACK_CLASS_CONFIG = "client.quota.callback.class"; +public static final String CLIENT_QUOTA_CALLBACK_CLASS_DOC = "The fully qualified name of a class that implements the ClientQuotaCallback interface, " + +"which is used to determine quota limits applied to client requests. By default, theand " + +"quotas that are stored in ZooKeeper are applied. For any given request, the most specific quota that matches the user principal " + +"of the session and the client-id of the request is applied."; + +public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "leader.replication.throttled.replicas"; +public static final String LEADER_REPLICATION_THROTTLED_REPLICAS_DOC = "A list of replicas for which log replication should be throttled on " + +"the leader side. The list should describe a set of replicas in the form " + +"[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " + +"all replicas for this topic."; +public static final List LEADER_REPLICATION_THROTTLED_REPLICAS_DEFAULT = Collections.emptyList(); + +public static final String FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG = "follower.replication.throttled.replicas"; +public static final
[PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]
kamalcph opened a new pull request, #15787: URL: https://github.com/apache/kafka/pull/15787 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16073: Increment the local-log-start-offset before deleting segments in memory table [kafka]
kamalcph commented on code in PR #15748: URL: https://github.com/apache/kafka/pull/15748#discussion_r1576190364 ## core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala: ## @@ -121,9 +121,9 @@ class LogCleanerParameterizedIntegrationTest extends AbstractLogCleanerIntegrati // Set the last modified time to an old value to force deletion of old segments val endOffset = log.logEndOffset log.logSegments.foreach(_.lastModified = time.milliseconds - (2 * retentionMs)) -TestUtils.waitUntilTrue(() => log.logStartOffset == endOffset, +TestUtils.waitUntilTrue(() => 1 == log.numberOfSegments, "Timed out waiting for deletion of old segments") -assertEquals(1, log.numberOfSegments) +assertEquals(log.logStartOffset, endOffset) Review Comment: Raised #15787 PR to fix it in trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
chia7712 commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576193128 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala: ## @@ -209,7 +209,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest class JoinGroupOperation extends GroupOperation[JoinGroupCallbackParams, JoinGroupCallback] { override def responseCallback(responsePromise: Promise[JoinGroupCallbackParams]): JoinGroupCallback = { - val callback: JoinGroupCallback = responsePromise.success(_) + val callback: JoinGroupCallback = responsePromise.success Review Comment: This causes build error when using scala 2.12 ``` [Error] /home/chia7712/project/kafka/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala:212:57: method with dependent type (value: kafka.coordinator.group.GroupCoordinatorConcurrencyTest.JoinGroupCallbackParams)responsePromise.type cannot be converted to function value ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072251579 > > I think we can add new test case in next PR. We can more focus on migrate to ClusterTestExtensions in this PR. > > Please take a look at [#15766 (comment)](https://github.com/apache/kafka/pull/15766#discussion_r1576009920), and we need a bit refactor but it should not a hard work. So please try to address it in this PR. Update it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576233041 ## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala: ## @@ -209,7 +209,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest class JoinGroupOperation extends GroupOperation[JoinGroupCallbackParams, JoinGroupCallback] { override def responseCallback(responsePromise: Promise[JoinGroupCallbackParams]): JoinGroupCallback = { - val callback: JoinGroupCallback = responsePromise.success(_) + val callback: JoinGroupCallback = responsePromise.success Review Comment: Yup, fixed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576262016 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -170,30 +227,23 @@ private void produceRecord() { } } -private void withStableConsumerGroup(Runnable body) { -Consumer consumer = createConsumer(new Properties()); -try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); +private void withConsumerGroup(Runnable body, boolean isStable, Properties consumerProperties) { +try (Consumer consumer = createConsumer(consumerProperties)) { +consumer.subscribe(Collections.singletonList(TOPIC)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(DEFAULT_MAX_WAIT_MS)); +Assertions.assertNotEquals(0, records.count()); consumer.commitSync(); -body.run(); -} finally { -Utils.closeQuietly(consumer, "consumer"); +if (isStable) { +body.run(); +} } -} - -private void withEmptyConsumerGroup(Runnable body) { -Consumer consumer = createConsumer(new Properties()); -try { -TestUtils.subscribeAndWaitForRecords(TOPIC, consumer, DEFAULT_MAX_WAIT_MS); -consumer.commitSync(); -} finally { -Utils.closeQuietly(consumer, "consumer"); +if (!isStable) { +body.run(); } -body.run(); } private KafkaProducer createProducer(Properties config) { Review Comment: It seems `config` is always empty, so please remove it. ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,109 +56,152 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { -String[] getArgs(String group, String topic) { -return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), -"--delete-offsets", -"--group", group, -"--topic", topic -}; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +private final ClusterInstance clusterInstance; +private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} + +@AfterEach +public void tearDown() { +if (consumerGroupService != null) { +consumerGroupService.close(); +} } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteOffsetsNonExistingGroup(String quorum) { +@ClusterTest +public void testDeleteOffsetsNonExistingGroup() { String group = "missing.group"; String topic = "foo:1"; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(getArgs(group, topic)); +setupConsumerGroupService(getArgs(group, topic)); -Entry> res = service.deleteOffsets(group, Collections.singletonList(topic)); +Entry> res = consumerGroupService.deleteOffsets(group, Collections.singletonList(topic)); assertEquals(Errors.GROUP_ID_NOT_FOUND, res.getKey()); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition(String quorum) { -testWithStableConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC); +@ClusterTest +public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() { +createTopic(TOPIC); +Properties consumerProperties = new Properties(); +testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties); +if (clusterInstance.isKRaftTest()) { +consumerProperties.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); +testWithConsumerGroup(TOPIC, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC, true, consumerProperties); +} } -@ParameterizedTest -
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
mimaison commented on code in PR #15774: URL: https://github.com/apache/kafka/pull/15774#discussion_r1576247775 ## tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java: ## @@ -98,23 +99,18 @@ public class ReassignPartitionsCommand { static final int EARLIEST_TOPICS_JSON_VERSION = 1; // Throttles that are set at the level of an individual broker. -//DynamicConfig.Broker.LeaderReplicationThrottledRateProp -static final String BROKER_LEVEL_LEADER_THROTTLE = "leader.replication.throttled.rate"; -//DynamicConfig.Broker.FollowerReplicationThrottledRateProp -static final String BROKER_LEVEL_FOLLOWER_THROTTLE = "follower.replication.throttled.rate"; -//DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp -static final String BROKER_LEVEL_LOG_DIR_THROTTLE = "replica.alter.log.dirs.io.max.bytes.per.second"; +static final String BROKER_LEVEL_LEADER_THROTTLE = QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG; Review Comment: Do we need to keep those fields? or could we directly use QuotaConfigs? ## checkstyle/import-control-metadata.xml: ## @@ -123,6 +123,7 @@ + Review Comment: Nit: The list is roughly ordered, can we put `org.apache.kafka.server.config` above? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10496: Removed relying on external DNS servers in tests [kafka]
ijuma commented on PR #9315: URL: https://github.com/apache/kafka/pull/9315#issuecomment-2072363189 Fyi, Java 18+ do have a SPI for this: https://bugs.openjdk.org/browse/JDK-8263693 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
OmniaGM commented on code in PR #15774: URL: https://github.com/apache/kafka/pull/15774#discussion_r1576308101 ## tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java: ## @@ -98,23 +99,18 @@ public class ReassignPartitionsCommand { static final int EARLIEST_TOPICS_JSON_VERSION = 1; // Throttles that are set at the level of an individual broker. -//DynamicConfig.Broker.LeaderReplicationThrottledRateProp -static final String BROKER_LEVEL_LEADER_THROTTLE = "leader.replication.throttled.rate"; -//DynamicConfig.Broker.FollowerReplicationThrottledRateProp -static final String BROKER_LEVEL_FOLLOWER_THROTTLE = "follower.replication.throttled.rate"; -//DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp -static final String BROKER_LEVEL_LOG_DIR_THROTTLE = "replica.alter.log.dirs.io.max.bytes.per.second"; +static final String BROKER_LEVEL_LEADER_THROTTLE = QuotaConfigs.LEADER_REPLICATION_THROTTLED_RATE_CONFIG; Review Comment: Good catch fixed changed this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move quota configs into server-common package [kafka]
OmniaGM commented on code in PR #15774: URL: https://github.com/apache/kafka/pull/15774#discussion_r1576308464 ## checkstyle/import-control-metadata.xml: ## @@ -123,6 +123,7 @@ + Review Comment: Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16587: Add subscription model information to group state [kafka]
dajac commented on code in PR #15785: URL: https://github.com/apache/kafka/pull/15785#discussion_r1576317772 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java: ## @@ -918,40 +935,53 @@ private void maybeUpdateClassicProtocolMembersSupportedProtocols( /** * Updates the subscribed topic names count. + * Changes to the subscription model as a consequence + * of this update is reflected as well. * * @param oldMember The old member. * @param newMember The new member. */ -private void maybeUpdateSubscribedTopicNames( +private void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel( ConsumerGroupMember oldMember, ConsumerGroupMember newMember ) { -maybeUpdateSubscribedTopicNames(subscribedTopicNames, oldMember, newMember); + maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel(subscribedTopicNames, oldMember, newMember); } /** - * Updates the subscription count. + * Updates the subscription count and the subscription model, if required. * * @param subscribedTopicCount The map to update. * @param oldMember The old member. * @param newMember The new member. */ -private static void maybeUpdateSubscribedTopicNames( +private void maybeUpdateSubscribedTopicNamesAndGroupSubscriptionModel( Review Comment: Should we add unit test to validate this logic? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576329465 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -959,7 +960,7 @@ class ZkAdminManager(val config: KafkaConfig, } else if (requestStatus.mechanism == Some(ScramMechanism.UNKNOWN)) { (requestStatus.user, unknownScramMechanismMsg) Review Comment: the condition above should use `.contains` instead of `== Some` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576332250 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: the braces here is redundant not sure if we want to clean this up as well or not -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on PR #15679: URL: https://github.com/apache/kafka/pull/15679#issuecomment-2072421039 > @FrankYang0529 thanks for updated PR. please take a look at two comments. Hi @chia7712, thanks for the review. Updated it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576333034 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: We have this in few places in this class as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576338028 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -144,7 +144,7 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = Review Comment: This is not used as far as I can see, do we still need it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576343801 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -300,7 +300,7 @@ class ReplicaManager(val config: KafkaConfig, protected val allPartitions = new Pool[TopicPartition, HostedPartition]( Review Comment: Should we declare type for protected (and the other public variables above) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
Jakub Scholz created KAFKA-16606: Summary: JBOD support in KRaft does not seem to be gated by the metadata version Key: KAFKA-16606 URL: https://issues.apache.org/jira/browse/KAFKA-16606 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Jakub Scholz JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka [source code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. However, it seems to be possible to run KRaft cluster with JBOD even with older metadata versions such as {{{}3.6{}}}. For example, I have a cluster using the {{3.6}} metadata version: {code:java} bin/kafka-features.sh --bootstrap-server localhost:9092 describe Feature: metadata.version SupportedMinVersion: 3.0-IV1 SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 {code} Yet a KRaft cluster with JBOD seems to run fine: {code:java} bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe Querying brokers for log directories information Received log directory information from brokers 2000,3000,1000 {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-7","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-40","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-3","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-36","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-47","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-14","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-43","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-1","size":114240,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-10","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-22","size":0,"offsetLag":0,"isF
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576354443 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: ough >500 is too much. Well they will get cleanup as we move more scala to java :D -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on PR #15786: URL: https://github.com/apache/kafka/pull/15786#issuecomment-2072445566 We also have couple of out-of-date parameters in javaDocs, we can either fix here or have another followup pr - https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L150 - https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala#L270 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576351984 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: Yeah in Scala braces are not required around multi-line blocks. I've not made this change because braces are required in Java and we have the braces in Scala all over the code base. Changing this is probably a >500 line diff. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576358398 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -300,7 +300,7 @@ class ReplicaManager(val config: KafkaConfig, protected val allPartitions = new Pool[TopicPartition, HostedPartition]( Review Comment: Ideally we should declare types for all public and protected fields but this is a huge change. Also while it's useful in some cases, in many I find it's not adding much value. In this specific example I even find it annoying as you get: ``` protected val allPartitions: Pool[TopicPartition, HostedPartition] = new Pool[TopicPartition, HostedPartition]( ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576360092 ## core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala: ## @@ -52,8 +51,8 @@ object ZooKeeperClient { * @param sessionTimeoutMs session timeout in milliseconds * @param connectionTimeoutMs connection timeout in milliseconds * @param maxInFlightRequests maximum number of unacknowledged requests the client will send before blocking. + * @param clientConfig ZooKeeper client configuration, for TLS configs if desired Review Comment: Nice, I believe we might need to update the have java doc in `registerStateChangeHandler` and `unregisterStateChangeHandler` in same file as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576363327 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: That's just my guesstimate, but I expect it to be large. I kind of focused on the low hanging fruits here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576366593 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -871,7 +872,7 @@ class ZkAdminManager(val config: KafkaConfig, users.get.filterNot(usersToSkip.contains).foreach { user => try { val userConfigs = adminZkClient.fetchEntityConfig(ConfigType.USER, Sanitizer.sanitize(user)) -addToResultsIfHasScramCredential(user, userConfigs, true) +addToResultsIfHasScramCredential(user, userConfigs, explicitUser = true) } catch { case e: Exception => { Review Comment: make sense don't worry about it. We are moving more and more from scala to java anyway so these will get resolved over time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840118#comment-17840118 ] Mickael Maison commented on KAFKA-16606: cc [~soarez] > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 > {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-7","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-40","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-3","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-36","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-47","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-14","size":0,"
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576375031 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -2192,8 +2192,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ValueSource(strings = Array("zk", "kraft")) def testLongTopicNames(quorum: String): Unit = { val client = Admin.create(createConfig) -val longTopicName = String.join("", Collections.nCopies(249, "x")); -val invalidTopicName = String.join("", Collections.nCopies(250, "x")); +val longTopicName = String.join("", Collections.nCopies(249, "x")) Review Comment: Small scala suggestion here (which you feel free to ignore) we can use `List.fill(249)("x").mkString("")` instead of Java `String.join` and `Collections.nCopies` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add test for PartitionMetadataFile [kafka]
KevinZTW commented on code in PR #15714: URL: https://github.com/apache/kafka/pull/15714#discussion_r1576372796 ## storage/src/test/java/org/apache/kafka/storage/internals/checkpoint/PartitionMetadataFileTest.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.checkpoint; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.InconsistentTopicIdException; + +import org.apache.kafka.storage.internals.log.LogDirFailureChannel; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class PartitionMetadataFileTest { +private final File dir = TestUtils.tempDirectory(); +private final File file = PartitionMetadataFile.newFile(dir); + +@Test +public void testSetRecordWithDifferentTopicId() { +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +partitionMetadataFile.record(topicId); +Uuid differentTopicId = Uuid.randomUuid(); +assertThrows(InconsistentTopicIdException.class, () -> partitionMetadataFile.record(differentTopicId)); +} + +@Test +public void testSetRecordWithSameTopicId() { +PartitionMetadataFile partitionMetadataFile = new PartitionMetadataFile(file, null); +Uuid topicId = Uuid.randomUuid(); +partitionMetadataFile.record(topicId); +assertDoesNotThrow(() -> partitionMetadataFile.record(topicId)); Review Comment: sorry I missed this one just update the PR as suggested! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
OmniaGM commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576375031 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -2192,8 +2192,8 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { @ValueSource(strings = Array("zk", "kraft")) def testLongTopicNames(quorum: String): Unit = { val client = Admin.create(createConfig) -val longTopicName = String.join("", Collections.nCopies(249, "x")); -val invalidTopicName = String.join("", Collections.nCopies(250, "x")); +val longTopicName = String.join("", Collections.nCopies(249, "x")) Review Comment: Small scala suggestion here (which feel free to ignore) we can use `List.fill(249)("x").mkString("")` instead of Java `String.join` and `Collections.nCopies` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576385492 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576386813 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +256,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: We can change the type from `Properties` to `Map`. With that change we don't need to create a lot of `Properties` in each test case -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]
nikramakrishnan commented on PR #15241: URL: https://github.com/apache/kafka/pull/15241#issuecomment-2072530250 Bump! @satishd @kamalcph can we get this review going? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576385492 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576392526 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840125#comment-17840125 ] Igor Soarez commented on KAFKA-16606: - Thanks for bringing this to my attention [~mimaison]. Hi [~scholzj] , thanks for pointing this out. I think there's some confusion here with a JBOD configuration being +allowed+ vs being +supported+ in KRaft. In terms of just reading and writing data to multiple log directories, as long as those direcories are always available, there's nothing special about KRaft that would require changes compared with ZK mode. What is enabled with 3.7 is the handling of failed log directories. You'll find that partitions don't get new leaders elected – becoming indefinitely unavailable – if the log directory for the leader replica fails but the broker stays alive. If a single directory is configured and it becomes unavailable the broker shuts down, as there is no point in continuing to run without access to storage. When it shuts down the controller becomes aware of that – via an ephemeral znode in ZK mode, or via missing hearbeats in KRaft – and it will re-elect leaders for partitions that were led by the broker. When multiple directories are configured, it is critical to have a separate mechanism to let the controller know there is a partial failure – the broker is still alive and operational on the remaining log dirs, but any partitions on the directory that failed need a leadership and ISR update. In ZK mode that was handled by notifying the controller via a znode, so a an alternative solution was required for KRaft. You can find the details in [KIP-858|https://cwiki.apache.org/confluence/display/KAFKA/KIP-858%3A+Handle+JBOD+broker+disk+failure+in+KRaft]. Let me know if that makes sense. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 > {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
lianetm commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576409478 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,448 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +}) +public class DeleteConsumerGroupsTest { +private final ClusterInstance cluster; +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +@ClusterTest +public void testDeleteWithTopicOption() { +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", GROUP, "--topic"}; assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteCmdNonExistingGroup() { String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; +String[] cgcArgs = new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--delete", "--group", missingGroup}; ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); - String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); } -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); +@ClusterTest +public void testDeleteNonExistingGroup() { String missingGroup = "missing.group"; -
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576420731 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -144,7 +144,7 @@ case class LogReadResult(info: FetchDataInfo, def withEmptyFetchInfo: LogReadResult = Review Comment: You're right, this is unused -> removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
mimaison commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576422765 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -959,7 +960,7 @@ class ZkAdminManager(val config: KafkaConfig, } else if (requestStatus.mechanism == Some(ScramMechanism.UNKNOWN)) { (requestStatus.user, unknownScramMechanismMsg) Review Comment: It looks weird to me but it seems `contains()` is Scala idiomatic, so I made the change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
FrankYang0529 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576432135 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -202,7 +256,7 @@ private KafkaProducer createProducer(Properties config) { } private Consumer createConsumer(Properties config) { Review Comment: Updated it and also added empty map for zk, or zk will not be tested. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576432963 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; Review Comment: I make `logDataDirectories()` return Set instead List to get rid of the redundant arraylist wrapping. ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -121,16 +146,16 @@ public BrokerNode build( private final boolean combined; private final Map propertyOverrides; -BrokerNode( +private BrokerNode( Uuid incarnationId, MetaPropertiesEnsemble initialMetaPropertiesEnsemble, boolean combined, Map propertyOverrides ) { -this.incarnationId = incarnationId; -this.initialMetaPropertiesEnsemble = initialMetaPropertiesEnsemble; Review Comment: I make `logDataDirectories()` return `Set` instead `List` to get rid of the redundant arraylist wrapping. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840132#comment-17840132 ] Jakub Scholz commented on KAFKA-16606: -- [~soarez] That sounds like without using the 3.7-IV2 metadata JBOD in Kraft works fine until it fails and then it might cause problems. So I wonder if it would have been better to protect the users from that situation. But it sounds like it was an intentional decision. And as I said, changing it now might be anyway a bit tricky as it might break someone's cluster after upgrading to 3.7.1 / 3.8.0. So I guess it sounds good as it is and I will close this. Thanks for the explanation. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 > {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","s
Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]
mimaison commented on PR #15756: URL: https://github.com/apache/kafka/pull/15756#issuecomment-2072692296 While it's not directly adding new configurations, it's effectively changing the behavior of `ExtractField` and `InsertField` and making them support new configurations. The `configure()` method on Transformation is only called with configurations provided with their prefix. In this example a user would need to set something like: ``` transforms=my-smt transforms.my-smt.type=MyTransformation transforms.my-smt.key.converter.replace.null.with.default=false ``` Also by not defining the configurations explicitly users can't discover them using the `GET /connector-plugins/{plugin-type}/config`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
johnnychhsu opened a new pull request, #15788: URL: https://github.com/apache/kafka/pull/15788 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
frankvicky commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072734321 Hi @lianetm, @chia7712 Thanks for the suggestions, I have addressed the comments. 😀 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Various cleanups in core [kafka]
chia7712 commented on code in PR #15786: URL: https://github.com/apache/kafka/pull/15786#discussion_r1576457731 ## core/src/main/scala/kafka/metrics/LinuxIoMetricsCollector.scala: ## @@ -29,9 +29,9 @@ import scala.jdk.CollectionConverters._ */ class LinuxIoMetricsCollector(procRoot: String, val time: Time, val logger: Logger) { import LinuxIoMetricsCollector._ - var lastUpdateMs: Long = -1L - var cachedReadBytes:Long = 0L - var cachedWriteBytes:Long = 0L + private var lastUpdateMs: Long = -1L Review Comment: It seems the type declaration is unnecessary since its literal ends with `L` ## core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala: ## @@ -347,7 +347,7 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { if (isKRaftTest()) { val result = new util.HashMap[Uuid, String]() controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().entrySet().forEach { Review Comment: how about `controllerServer.controller.findAllTopicIds(ANONYMOUS_CONTEXT).get().forEach((k, v) => result.put(v, k))` ## core/src/main/scala/kafka/server/SharedServer.scala: ## @@ -107,16 +107,16 @@ class SharedServer( @volatile var brokerMetrics: BrokerServerMetrics = _ @volatile var controllerServerMetrics: ControllerMetadataMetrics = _ @volatile var loader: MetadataLoader = _ - val snapshotsDisabledReason = new AtomicReference[String](null) + private val snapshotsDisabledReason = new AtomicReference[String](null) @volatile var snapshotEmitter: SnapshotEmitter = _ - @volatile var snapshotGenerator: SnapshotGenerator = _ - @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _ + @volatile private var snapshotGenerator: SnapshotGenerator = _ + @volatile private var metadataLoaderMetrics: MetadataLoaderMetrics = _ def clusterId: String = metaPropsEnsemble.clusterId().get() - def nodeId: Int = metaPropsEnsemble.nodeId().getAsInt() + def nodeId: Int = metaPropsEnsemble.nodeId().getAsInt - def isUsed(): Boolean = synchronized { + private def isUsed(): Boolean = synchronized { Review Comment: How about `isUsed`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jakub Scholz resolved KAFKA-16606. -- Resolution: Not A Problem > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 > {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-7","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-40","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-3","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-36","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-47","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-14","size":0,"offsetLag":0,"isFuture":false},{"partition":
Re: [PR] KAFKA-16605: Fix the flaky LogCleanerParameterizedIntegrationTest [kafka]
johnnychhsu commented on PR #15787: URL: https://github.com/apache/kafka/pull/15787#issuecomment-2072775161 nice fix! crystal clear solution :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
chia7712 commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576492165 ## core/src/test/java/kafka/testkit/BrokerNode.java: ## @@ -66,17 +69,30 @@ public Builder setNumLogDirectories(int numLogDirectories) { return this; } -public BrokerNode build( -String baseDirectory, -Uuid clusterId, -boolean combined -) { +public Builder setClusterId(Uuid clusterId) { +this.clusterId = clusterId; +return this; +} + +public Builder setBaseDirectory(String baseDirectory) { +this.baseDirectory = baseDirectory; +return this; +} + +public Builder setCombined(boolean combined) { +this.combined = combined; +return this; +} + +public Builder setPropertyOverrides(Map propertyOverrides) { +this.propertyOverrides = Collections.unmodifiableMap(new HashMap<>(propertyOverrides)); +return this; +} + +public BrokerNode build() { if (id == -1) { throw new RuntimeException("You must set the node id."); } -if (incarnationId == null) { -incarnationId = Uuid.randomUuid(); -} List logDataDirectories = IntStream Review Comment: Could you add null check for `baseDirectory`? ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -59,73 +65,64 @@ public Builder setCombined(boolean combined) { } public Builder setNumControllerNodes(int numControllerNodes) { -if (numControllerNodes < 0) { -throw new RuntimeException("Invalid negative value for numControllerNodes"); -} - -while (controllerNodeBuilders.size() > numControllerNodes) { -controllerNodeBuilders.pollFirstEntry(); -} -while (controllerNodeBuilders.size() < numControllerNodes) { -int nextId = startControllerId(); -if (!controllerNodeBuilders.isEmpty()) { -nextId = controllerNodeBuilders.lastKey() + 1; -} -controllerNodeBuilders.put(nextId, -new ControllerNode.Builder(). -setId(nextId)); -} +this.numControllerNodes = numControllerNodes; return this; } public Builder setNumBrokerNodes(int numBrokerNodes) { -return setBrokerNodes(numBrokerNodes, 1); +this.numBrokerNodes = numBrokerNodes; +return this; +} + +public Builder setNumDisksPerBroker(int numDisksPerBroker) { +this.numDisksPerBroker = numDisksPerBroker; +return this; +} + +public Builder setPerBrokerProperties(Map> perBrokerProperties) { +this.perBrokerProperties = Collections.unmodifiableMap( +perBrokerProperties.entrySet().stream() +.collect(Collectors.toMap(Map.Entry::getKey, e -> Collections.unmodifiableMap(new HashMap<>(e.getValue()); +return this; } -public Builder setBrokerNodes(int numBrokerNodes, int disksPerBroker) { +public TestKitNodes build() { +if (numControllerNodes < 0) { +throw new RuntimeException("Invalid negative value for numControllerNodes"); +} if (numBrokerNodes < 0) { throw new RuntimeException("Invalid negative value for numBrokerNodes"); } -if (disksPerBroker <= 0) { -throw new RuntimeException("Invalid value for disksPerBroker"); -} -while (brokerNodeBuilders.size() > numBrokerNodes) { -brokerNodeBuilders.pollFirstEntry(); +if (numDisksPerBroker <= 0) { +throw new RuntimeException("Invalid value for numDisksPerBroker"); } -while (brokerNodeBuilders.size() < numBrokerNodes) { -int nextId = startBrokerId(); -if (!brokerNodeBuilders.isEmpty()) { -nextId = brokerNodeBuilders.lastKey() + 1; -} -BrokerNode.Builder brokerNodeBuilder = new BrokerNode.Builder() -.setId(nextId) -.setNumLogDirectories(disksPerBroker); -brokerNodeBuilders.put(nextId, brokerNodeBuilder); -} -return this; -} -public TestKitNodes build() { String baseDirectory = TestUtils.tempDirectory().getAbsolutePath(); Review Comment: We don't need to delete `baseDirectory` since `TestUtils.tempDirectory()` will delete the return folder when terminating. ## core/src/test/java/kafka/testkit/TestKitNodes.java: ## @@ -167,11 +164,1
Re: [PR] KAFKA-14509: [3/4] Add integration test for consumerGroupDescribe Api [kafka]
johnnychhsu commented on code in PR #15727: URL: https://github.com/apache/kafka/pull/15727#discussion_r1576500340 ## core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestsTest.scala: ## @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package unit.kafka.server + +import kafka.server.GroupCoordinatorBaseRequestTest +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.utils.TestUtils +import org.apache.kafka.common.ConsumerGroupState +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData +import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData.{Assignment, DescribedGroup, TopicPartitions} +import org.apache.kafka.common.protocol.ApiKeys +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.extension.ExtendWith +import org.junit.jupiter.api.{Tag, Timeout} + +import scala.jdk.CollectionConverters._ + +@Timeout(120) +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT, brokers = 1) Review Comment: may i know if we also need to include zk mode for this test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1576507146 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTo
Re: [PR] KAFKA-16475: add test for TopicImageNodeTest [kafka]
johnnychhsu commented on PR #15720: URL: https://github.com/apache/kafka/pull/15720#issuecomment-2072812226 thanks for the review @cmccabe ! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16483: migrate DeleteOffsetsConsumerGroupCommandIntegrationTest to use ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15679: URL: https://github.com/apache/kafka/pull/15679#discussion_r1576508031 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteOffsetsConsumerGroupCommandIntegrationTest.java: ## @@ -42,109 +58,141 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; -public class DeleteOffsetsConsumerGroupCommandIntegrationTest extends ConsumerGroupCommandTest { -String[] getArgs(String group, String topic) { -return new String[] { -"--bootstrap-server", bootstrapServers(listenerName()), -"--delete-offsets", -"--group", group, -"--topic", topic -}; +@Tag("integration") +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +@ExtendWith(ClusterTestExtensions.class) +public class DeleteOffsetsConsumerGroupCommandIntegrationTest { +public static final String TOPIC = "foo"; +public static final String GROUP = "test.group"; +private final ClusterInstance clusterInstance; + +private ConsumerGroupCommand.ConsumerGroupService consumerGroupService; +private final Iterable> consumerConfigs; + +DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +this.consumerConfigs = clusterInstance.isKRaftTest() +? Arrays.asList( +new HashMap() {{ Review Comment: We can use immutable map and make `createConsumer` create a inner mutable map to collect all configs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15838: ExtractField and InsertField NULL Values are replaced by default value even in NULLABLE fields [kafka]
mfvitale commented on PR #15756: URL: https://github.com/apache/kafka/pull/15756#issuecomment-2072820368 > The `configure()` method on Transformation is only called with configurations provided with their prefix. This cleared my doubt. So in that case it should be clear declared as an SMT configuration. I'll open a KIP. Thanks for the explanation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16568: JMH Benchmarks for Server Side Rebalances [kafka]
rreddy-22 commented on code in PR #15717: URL: https://github.com/apache/kafka/pull/15717#discussion_r1576516058 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java: ## @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.assignor; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.coordinator.group.assignor.AssignmentMemberSpec; +import org.apache.kafka.coordinator.group.assignor.AssignmentSpec; +import org.apache.kafka.coordinator.group.assignor.GroupAssignment; +import org.apache.kafka.coordinator.group.assignor.MemberAssignment; +import org.apache.kafka.coordinator.group.assignor.PartitionAssignor; +import org.apache.kafka.coordinator.group.assignor.RangeAssignor; +import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber; +import org.apache.kafka.coordinator.group.assignor.UniformAssignor; +import org.apache.kafka.coordinator.group.consumer.SubscribedTopicMetadata; +import org.apache.kafka.coordinator.group.consumer.TopicMetadata; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class ServerSideAssignorBenchmark { + +public enum AssignorType { +RANGE(new RangeAssignor()), +UNIFORM(new UniformAssignor()); + +private final PartitionAssignor assignor; + +AssignorType(PartitionAssignor assignor) { +this.assignor = assignor; +} + +public PartitionAssignor assignor() { +return assignor; +} +} + +/** + * The subscription pattern followed by the members of the group. + * + * A subscription model is considered homogenous if all the members of the group + * are subscribed to the same set of topics, it is heterogeneous otherwise. + */ +public enum SubscriptionModel { +HOMOGENEOUS, HETEROGENEOUS +} + +/** + * The assignment type is decided based on whether all the members are assigned partitions + * for the first time (full), or incrementally when a rebalance is triggered. + */ +public enum AssignmentType { +FULL, INCREMENTAL +} + +@Param({"100", "500", "1000", "5000", "1"}) +private int memberCount; + +@Param({"5", "10", "50"}) +private int partitionsToMemberRatio; + +@Param({"10", "100", "1000"}) +private int topicCount; + +@Param({"true", "false"}) +private boolean isRackAware; + +@Param({"HOMOGENEOUS", "HETEROGENEOUS"}) +private SubscriptionModel subscriptionModel; + +@Param({"RANGE", "UNIFORM"}) +private AssignorType assignorType; + +@Param({"FULL", "INCREMENTAL"}) +private AssignmentType assignmentType; + +private PartitionAssignor partitionAssignor; + +private static final int NUMBER_OF_RACKS = 3; + +private static final int MAX_BUCKET_COUNT = 5; + +private AssignmentSpec assignmentSpec; + +private SubscribedTopicDescriber subscribedTopicDescriber; + +private final List allTopicIds = new ArrayList<>(topicCount); + +@Setup(Level.Trial) +public void setup() { +Map topicMetadata = createTopicMetadata(); +subscribedTopicDescriber = new SubscribedTo
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840148#comment-17840148 ] Igor Soarez commented on KAFKA-16606: - That's right [~scholzj] . When KIP-858 started, the documentation was already explicit about JBOD support being one of the missing features in KRaft, but the configuration was already allowed, so it didn't seem right to change it at the time, but in hindsight maybe that would've been a better decision. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 > {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-28","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-7","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-40","size":0,"offsetLag":0,"isFuture":
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on code in PR #15766: URL: https://github.com/apache/kafka/pull/15766#discussion_r1576523223 ## tools/src/test/java/org/apache/kafka/tools/consumer/group/DeleteConsumerGroupsTest.java: ## @@ -17,279 +17,440 @@ package org.apache.kafka.tools.consumer.group; import joptsimple.OptionException; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.RangeAssignor; +import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.IntStream; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.apache.kafka.common.ConsumerGroupState.EMPTY; +import static org.apache.kafka.common.ConsumerGroupState.STABLE; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -public class DeleteConsumerGroupsTest extends ConsumerGroupCommandTest { -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteWithTopicOption(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", GROUP, "--topic"}; -assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs)); -} - -@ParameterizedTest -@ValueSource(strings = {"zk", "kraft"}) -public void testDeleteCmdNonExistingGroup(String quorum) { -createOffsetsTopic(listenerName(), new Properties()); -String missingGroup = "missing.group"; - -String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServers(listenerName()), "--delete", "--group", missingGroup}; -ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs); -String output = ToolsTestUtils.grabConsoleOutput(service::deleteGroups); -assertTrue(output.contains("Group '" + missingGroup + "' could not be deleted due to:") && output.contains(Errors.GROUP_ID_NOT_FOUND.message()), -"The expected error (" + Errors.GROUP_ID_NOT_FOUND + ") was not detected while deleting consumer group"); +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { +@ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "1"), +@ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true") +}) +public class DeleteConsumerGroupsTest { +private static final String TOPIC = "foo"; +private static final String GROUP = "test.group"; +private static final String MISSING_GROUP = "missing.group"; + +private final ClusterInstance cluster; +private final Iterable groupProtocols; + +public DeleteConsumerGroupsTest(ClusterInstance cluster) { +this.cluster = cluster; +this.groupProtocols = cluster.isKRaftTest() +? Arrays.asList(CLASSIC, CONSUMER) +: singleton(C
Re: [PR] KAFKA-16593: Rewrite DeleteConsumerGroupsTest by ClusterTestExtensions [kafka]
chia7712 commented on PR #15766: URL: https://github.com/apache/kafka/pull/15766#issuecomment-2072841232 @frankvicky @m1a2st It seems your PR (#15766 and #15779) need a consumer running in background. Hence, we can consider moving `AbstractConsumerGroupExecutor`/`ConsumerGroupExecutor` to a individual java file. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16560: Refactor/cleanup BrokerNode/ControllerNode/ClusterConfig [kafka]
brandboat commented on code in PR #15761: URL: https://github.com/apache/kafka/pull/15761#discussion_r1576531833 ## core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala: ## @@ -84,17 +125,38 @@ class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersio assertEquals(ApiKeys.API_VERSIONS.latestVersion(), apiVersion.maxVersion()) } - @ClusterTest(metadataVersion = MetadataVersion.IBP_3_7_IV4, serverProperties = Array( -new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "false"), -new ClusterConfigProperty(key = "unstable.metadata.versions.enable", value = "false"), + @ClusterTests(Array( Review Comment: Already added the TODO in the top of this file. L30 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16606) JBOD support in KRaft does not seem to be gated by the metadata version
[ https://issues.apache.org/jira/browse/KAFKA-16606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17840150#comment-17840150 ] Mickael Maison commented on KAFKA-16606: To me that kind of looks like a bug. Previously JBOD was not supported with KRaft so it was a big deal that it did not quite work. As we're approaching JBOD being production ready, we should ease the rough edges and avoid allowing weird unsupported configurations. As JBOD is not production ready, would we really break anyone's cluster if we prevented using multiple log dirs with KRaft and the version set as < 3.7? I understand some people may disagree but I wonder if it's a discussion worth having. > JBOD support in KRaft does not seem to be gated by the metadata version > --- > > Key: KAFKA-16606 > URL: https://issues.apache.org/jira/browse/KAFKA-16606 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Jakub Scholz >Priority: Major > > JBOD support in KRaft should be supported since Kafka 3.7.0. The Kafka > [source > code|https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java#L194-L195] > suggests that it is supported with the metadata version {{{}3.7-IV2{}}}. > However, it seems to be possible to run KRaft cluster with JBOD even with > older metadata versions such as {{{}3.6{}}}. For example, I have a cluster > using the {{3.6}} metadata version: > {code:java} > bin/kafka-features.sh --bootstrap-server localhost:9092 describe > Feature: metadata.version SupportedMinVersion: 3.0-IV1 > SupportedMaxVersion: 3.7-IV4 FinalizedVersionLevel: 3.6-IV2 Epoch: 1375 > {code} > Yet a KRaft cluster with JBOD seems to run fine: > {code:java} > bin/kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe > Querying brokers for log directories information > Received log directory information from brokers 2000,3000,1000 > {"brokers":[{"broker":2000,"logDirs":[{"partitions":[{"partition":"__consumer_offsets-13","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-46","size":0,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-0","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-9","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-42","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-21","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-17","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-30","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-26","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-5","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-38","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-1","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-34","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-16","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-45","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-12","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-41","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-24","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-20","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-49","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-0","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-29","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-25","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-8","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-37","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-4","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-33","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-15","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-48","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-11","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-44","size":407136,"offsetLag":0,"isFuture":false},{"partition":"kafka-test-apps-2","size":28560,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-23","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-19","size":0,"offsetLag":0,"isFuture":false},{"partition":"__consumer_offsets-32","size":0,"offsetLag":0,"isFuture":false},{"partition":
Re: [PR] MINOR: Remove unused parameters in KafkaConfig [kafka]
chia7712 commented on PR #15788: URL: https://github.com/apache/kafka/pull/15788#issuecomment-2072851173 @johnnychhsu Instead of removing them, could you make `MetadataLogConfig` use those help methods? https://github.com/apache/kafka/blob/1b301b30207ed8fca9f0aea5cf940b0353a1abca/core/src/main/scala/kafka/MetadataLogConfig.scala#L38 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Current working branch for KIP-848 kip [kafka]
lucasbru opened a new pull request, #15789: URL: https://github.com/apache/kafka/pull/15789 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org