[jira] [Assigned] (KAFKA-19331) No error handling for leader not appeared in applyLocalFollowersDelta
[ https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-19331:
---------------------------------

    Assignee: Ming-Yen Chung

> No error handling for leader not appeared in applyLocalFollowersDelta
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-19331
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19331
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Ming-Yen Chung
>            Priority: Major
>              Labels: newbie
>
> In ReplicaManager#applyLocalFollowersDelta, when we prepare to fetch from the leader, we check whether the leader node info is in the metadata image. If for some reason it is not included in the newImage, we log something like:
>
> {code:java}
> [2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader Some(2) because it is not alive. (state.change.logger)
> [2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of become-follower for 1 partitions (state.change.logger)
> {code}
>
> It's confusing for users to see that the broker is unable to fetch and then that it started fetchers, when in the end it is not actually fetching. We should handle the error properly by updating the `FailedPartition` and not updating the other successful result statuses.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
Re: [PR] KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group [kafka]
dajac commented on code in PR #19790:
URL: https://github.com/apache/kafka/pull/19790#discussion_r2106896835

## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ##
@@ -2485,8 +2485,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
 // Test offset deletion while consuming
 val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, util.Set.of(tp1, tp2))
-// Top level error will equal to the first partition level error
-assertFutureThrows(classOf[GroupSubscribedToTopicException], offsetDeleteResult.all())

Review Comment:
   It seems that we can revert this change now.

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java: ##
@@ -1576,6 +1581,7 @@ public void testFromClassicGroup() {
 assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch());
 assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
 assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor());
+assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata()));

Review Comment:
   Is `Map.copyOf` really necessary?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Created] (KAFKA-19331) No error handling for leader unregistered in applyLocalFollowersDelta
Luke Chen created KAFKA-19331:
---------------------------------

             Summary: No error handling for leader unregistered in applyLocalFollowersDelta
                 Key: KAFKA-19331
                 URL: https://issues.apache.org/jira/browse/KAFKA-19331
             Project: Kafka
          Issue Type: Bug
            Reporter: Luke Chen


In ReplicaManager#applyLocalFollowersDelta, when we prepare to fetch from the leader, we check whether the leader node info is in the metadata image. If for some reason it is not included in the newImage, we log something like:

{code:java}
[2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader Some(2) because it is not alive. (state.change.logger)
[2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of become-follower for 1 partitions (state.change.logger)
{code}

It's confusing for users to see that the broker is unable to fetch and then that it started fetchers, when in the end it is not actually fetching. We should handle the error properly by updating the `FailedPartition` and not updating the other successful result statuses.
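The behaviour the ticket asks for could be sketched roughly as follows. This is a minimal illustration only, not the ReplicaManager code: `FollowerFetchSketch`, `Result`, and `startFetchers` are hypothetical names, and partitions are represented as plain strings. The idea is simply that a partition whose leader is missing from the set of alive brokers goes into a failed list and is not counted as started.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: none of these types are Kafka's real classes.
final class FollowerFetchSketch {

    /** Outcome of trying to start fetchers for a batch of follower partitions. */
    record Result(List<String> started, List<String> failed) { }

    /**
     * @param leaderByPartition partition name -> leader broker id (empty if unknown)
     * @param aliveBrokers      broker ids assumed to be present in the new metadata image
     */
    static Result startFetchers(Map<String, Optional<Integer>> leaderByPartition,
                                Set<Integer> aliveBrokers) {
        List<String> started = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        leaderByPartition.forEach((partition, leader) -> {
            if (leader.isPresent() && aliveBrokers.contains(leader.get())) {
                started.add(partition);   // a fetcher would be started here
            } else {
                failed.add(partition);    // leader not alive: record the failure instead
            }
        });
        return new Result(started, failed);
    }

    public static void main(String[] args) {
        // Leader 2 is not among the alive brokers, mirroring the log in the ticket.
        Result r = startFetchers(
            Map.of("quickstart-events-0", Optional.of(2)),
            Set.of(1, 3, 4));
        System.out.println("Started fetchers for " + r.started().size() + " partitions");
        System.out.println("Failed partitions: " + r.failed());
    }
}
```

With the failure tracked separately, the summary log line would report only the partitions that actually started fetching.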
[jira] [Updated] (KAFKA-19331) No error handling for leader unregistered in applyLocalFollowersDelta
[ https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-19331:
------------------------------
    Labels: newbie  (was: )
[jira] [Commented] (KAFKA-19331) No error handling for leader not appeared in applyLocalFollowersDelta
[ https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954040#comment-17954040 ]

Ming-Yen Chung commented on KAFKA-19331:
----------------------------------------

Hi [~showuon], could you assign this ticket to me?
[PR] KAFKA-19330: Change MockSerializer/Deserializer to use String serializer instead of byte[] [kafka]
mingyen066 opened a new pull request, #19812:
URL: https://github.com/apache/kafka/pull/19812

   While rewriting `EndToEndClusterIdTest` in Java (#19741), I found that the test uses `MockInterceptor` and `MockSerializer` together. However, `MockSerializer` was using a `byte[]` serializer, while `MockInterceptor` expected a `String` serializer, leading to a `ClassCastException`.

   I chose to update `MockSerializer` to use `String`, as it is used less frequently than the interceptor. Using `String` also simplifies the code by avoiding expressions like `"value".getBytes`.
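The kind of mismatch described above can be reproduced without any Kafka classes. The sketch below is an assumption-laden stand-in: `SerializerMismatchSketch`, `intercept`, and `serializeString` are hypothetical and only mimic an interceptor that assumes `String` values sitting next to code that produces `byte[]` values.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins; these are not Kafka's MockSerializer or MockInterceptor.
final class SerializerMismatchSketch {

    /** An interceptor-like step that assumes record values are Strings. */
    static Object intercept(Object value) {
        String s = (String) value; // throws ClassCastException if value is a byte[]
        return s + "-intercepted";
    }

    /** A serializer-like step that accepts String values. */
    static byte[] serializeString(Object value) {
        return ((String) value).getBytes(StandardCharsets.UTF_8);
    }

    /** Returns true if feeding a byte[] value to the interceptor fails with a ClassCastException. */
    static boolean failsWithBytes() {
        try {
            intercept("value".getBytes(StandardCharsets.UTF_8));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("byte[] value fails: " + failsWithBytes());
        // When both steps agree on String, the pipeline works end to end.
        byte[] out = serializeString(intercept("value"));
        System.out.println(new String(out, StandardCharsets.UTF_8));
    }
}
```

Aligning both sides on one value type, as the PR does with `String`, removes the cast failure.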
Re: [PR] KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) [kafka]
lucasbru commented on code in PR #19802:
URL: https://github.com/apache/kafka/pull/19802#discussion_r2106715246

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java: ##
@@ -582,54 +592,44 @@ public Set currentWarmupTaskProcessIds(
 }

 /**
- * @return An immutable map of partition metadata for each topic that are inputs for this streams group.
+ * @return The metadata hash.
  */
-public Map partitionMetadata() {
-return Collections.unmodifiableMap(partitionMetadata);
+public long metadataHash() {
+return metadataHash.get();
 }

 /**
- * Updates the partition metadata. This replaces the previous one.
+ * Updates the metadata hash.
  *
- * @param partitionMetadata The new partition metadata.
+ * @param metadataHash The new metadata hash.
  */
-public void setPartitionMetadata(
-Map partitionMetadata
-) {
-this.partitionMetadata.clear();
-this.partitionMetadata.putAll(partitionMetadata);
-maybeUpdateConfiguredTopology();
-maybeUpdateGroupState();

Review Comment:
   ConfiguredTopology can be derived completely from the records for `PartitionMetadata` and `Topology`. So we just need to cache it, and can recreate it upon the first heartbeat after failover. We could consider persisting it, but that would just mean storing somewhat duplicate data.

   `state` is also derived from the other records.
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2106927220

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -550,6 +556,7 @@ private GroupMetadataManager(
 this.shareGroupAssignor = shareGroupAssignor;
 this.authorizerPlugin = authorizerPlugin;
 this.streamsGroupAssignors = streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name, Function.identity()));
+this.topicHashCache = new ConcurrentHashMap<>();

Review Comment:
   nit: We can use a regular HashMap here because the GroupMetadataManager is never used concurrently.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java: ##
@@ -151,6 +152,8 @@ public String toLowerCaseString() {
  */
 private final TimelineHashMap resolvedRegularExpressions;
+private final AtomicBoolean addSubscriptionMetadataTombstoneRecord = new AtomicBoolean(false);

Review Comment:
   We should also use a timeline data structure for this one.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java: ##
@@ -398,6 +423,21 @@ public Map computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
+public long computeMetadataHash(

Review Comment:
   nit: This could be a static method.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ##
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
  * @return The hash of the group.
  */
 static long computeGroupHash(Map topicHashes) {
-if (topicHashes.isEmpty()) {
+// Sort entries by topic name
+List> sortedEntries = new ArrayList<>();
+for (Map.Entry entry : topicHashes.entrySet()) {
+// Filter out entries with a hash value of 0, which indicates no topic
+if (entry.getValue() != 0) {

Review Comment:
   I wonder whether it is really necessary to ignore those. Having a zero does not hurt, does it?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java: ##
@@ -398,6 +423,21 @@ public Map computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
+public long computeMetadataHash(
+Map subscribedTopicNames,
+Map topicHashCache,
+MetadataImage metadataImage
+) {
+Map topicHash = new HashMap<>(subscribedTopicNames.size());
+subscribedTopicNames.keySet().forEach(topicName -> {

Review Comment:
   nit: We can remove the curly braces.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2220,6 +2227,11 @@ private CoordinatorResult
 int groupEpoch = group.groupEpoch();
 SubscriptionType subscriptionType = group.subscriptionType();
+if (group.addSubscriptionMetadataTombstoneRecord()) {
+records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+group.setAddSubscriptionMetadataTombstoneRecord(false);

Review Comment:
   I wonder whether we should put this code into updateSubscriptionMetadata to ensure that we do this in all places. Have you considered it?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ##
@@ -2220,6 +2227,11 @@ private CoordinatorResult
 int groupEpoch = group.groupEpoch();
 SubscriptionType subscriptionType = group.subscriptionType();
+if (group.addSubscriptionMetadataTombstoneRecord()) {
+records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+group.setAddSubscriptionMetadataTombstoneRecord(false);

Review Comment:
   This is not the proper way to do this. In general, we never mutate the state from here. We should move it to the replay method of the tombstone.
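For readers following the `computeGroupHash` hunk, the "sort entries by topic name" step exists so that the resulting hash does not depend on map iteration order. A minimal sketch of that idea is below. It makes several assumptions: `GroupHashSketch` is a hypothetical class, the topic hashes are modelled as `Map<String, Long>`, and the combining step is a plain polynomial hash chosen for illustration, not the hash function used in the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; the combining function is illustrative only.
final class GroupHashSketch {

    /** Combines per-topic hashes into one value that is independent of input ordering. */
    static long computeGroupHash(Map<String, Long> topicHashes) {
        long hash = 0L;
        // TreeMap iterates in topic-name order, so the caller's map ordering does not matter.
        for (Map.Entry<String, Long> e : new TreeMap<>(topicHashes).entrySet()) {
            hash = 31 * hash + e.getKey().hashCode();
            hash = 31 * hash + Long.hashCode(e.getValue());
        }
        return hash;
    }

    public static void main(String[] args) {
        Map<String, Long> first = new LinkedHashMap<>();
        first.put("topic-a", 11L);
        first.put("topic-b", 22L);

        Map<String, Long> second = new LinkedHashMap<>();
        second.put("topic-b", 22L);
        second.put("topic-a", 11L);

        // Same entries, different insertion order, same hash.
        System.out.println(computeGroupHash(first) == computeGroupHash(second));
    }
}
```

In this sketch a zero-valued entry is simply hashed like any other entry rather than filtered out, which is one way to read the reviewer's question about whether ignoring zeros is necessary.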
Re: [PR] KAFKA-18904: kafka-configs.sh return resource doesn't exist message [3/N] [kafka]
DL1231 commented on code in PR #19808:
URL: https://github.com/apache/kafka/pull/19808#discussion_r2106900047

## clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java: ##
@@ -163,6 +164,15 @@ public static ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult(Stri
 return new ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group), future));
 }
+public static ListConfigResourcesResult listConfigResourcesResult(Map> resourceNames) {
+Collection resources = resourceNames.entrySet().stream()
+.flatMap(entry -> entry.getValue().stream()
+.map(name -> new ConfigResource(entry.getKey(), name)))
+.collect(Collectors.toList());
+return new ListConfigResourcesResult(KafkaFuture.completedFuture(resources));
+

Review Comment:
   nit: extra blank line

## core/src/main/scala/kafka/admin/ConfigCommand.scala: ##
@@ -342,6 +342,42 @@ object ConfigCommand extends Logging {
 }
 private def describeResourceConfig(adminClient: Admin, entityType: String, entityName: Option[String], describeAll: Boolean): Unit = {
+if (!describeAll) {
+  entityName.foreach { name =>
+entityType match {
+  case TopicType =>
+Topic.validate(name)
+if (!adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get.contains(name)) {
+  System.out.println(s"The $entityType '$name' doesn't exist and doesn't have dynamic config.")
+  return
+}
+  case BrokerType | BrokerLoggerConfigType =>
+if (adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) {
+  // valid broker id
+} else if (name == BrokerDefaultEntityName) {
+  // default broker configs
+} else {
+  System.out.println(s"The $entityType '$name' doesn't exist and doesn't have dynamic config.")
+  return
+}
+  case ClientMetricsType =>
+if (adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all.get

Review Comment:
   nit: Can't this just be util.Set.of?

## core/src/main/scala/kafka/admin/ConfigCommand.scala: ##
@@ -342,6 +342,42 @@ object ConfigCommand extends Logging {
 }
 private def describeResourceConfig(adminClient: Admin, entityType: String, entityName: Option[String], describeAll: Boolean): Unit = {
+if (!describeAll) {
+  entityName.foreach { name =>
+entityType match {
+  case TopicType =>
+Topic.validate(name)
+if (!adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get.contains(name)) {
+  System.out.println(s"The $entityType '$name' doesn't exist and doesn't have dynamic config.")
+  return
+}
+  case BrokerType | BrokerLoggerConfigType =>
+if (adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) {
+  // valid broker id
+} else if (name == BrokerDefaultEntityName) {
+  // default broker configs
+} else {
+  System.out.println(s"The $entityType '$name' doesn't exist and doesn't have dynamic config.")
+  return
+}
+  case ClientMetricsType =>
+if (adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all.get
+  .stream.noneMatch(_.name == name)) {
+  System.out.println(s"The $entityType '$name' doesn't exist and doesn't have dynamic config.")
+  return
+}
+  case GroupType =>
+if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() == name) &&
+  adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), new ListConfigResourcesOptions).all.get

Review Comment:
   ditto
Re: [PR] KAFKA-19268 Missing mocks for SharePartitionManagerTest tests [kafka]
adixitconfluent commented on PR #19786:
URL: https://github.com/apache/kafka/pull/19786#issuecomment-2909105133

   @AndrewJSchofield can you please review this PR as well?
[jira] [Comment Edited] (KAFKA-19259) Async consumer fetch intermittent delays on console consumer
[ https://issues.apache.org/jira/browse/KAFKA-19259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17953933#comment-17953933 ]

Arpit Goyal edited comment on KAFKA-19259 at 5/26/25 8:20 AM:
--------------------------------------------------------------

[~lianetm] I was just going through the code and I think this could be the issue, though I am not able to reproduce it.
1. The application thread checks the buffer, finds it empty, and is about to wait.
2. The consumer network thread adds data and signals.
3. The application thread misses the non-empty state check but hasn't started waiting yet.
4. The application thread then waits for the full timeout despite data being available.

[~lianetm] My bad. This is not possible, as both threads share the same lock.


was (Author: JIRAUSER301926):
[~lianetm] I was just going through the code and I think this can be the issue though I am not able to reproduce it.
1. Application thread checks empty buffer and is about to wait
2. Consumer Network thread adds data and signals
3. Application thread misses the non-empty state check but hasn't started waiting yet
4. Application thread then waits for the full timeout despite data being available.

> Async consumer fetch intermittent delays on console consumer
> -------------------------------------------------------------
>
>                 Key: KAFKA-19259
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19259
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 4.0.0
>            Reporter: Lianet Magrans
>            Assignee: Arpit Goyal
>            Priority: Major
>             Fix For: 4.1.0
>
> We noticed that fetching with the kafka-console-consumer.sh tool using the new consumer shows some intermittent delays that are not seen when running the same with the classic consumer. Note that I disabled auto-commit to isolate the delay, and at first glance it seems to come from the fetchBuffer.awaitNonEmpty logic, which alternately takes almost the full poll timeout (it runs "fast", then "slow", and continues to alternate):
> [https://github.com/apache/kafka/blob/0b81d6c7802c1be55dc823ce51729f2c6a6071a7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1808]
>
> The difference in behaviour between the 2 consumers can be seen with this setup:
> * topic with 6 partitions (I tried with 1 partition first and didn't see the delay, then with 3 and 6 I could see it)
> * data populated in the topic with a producer sending generated uuids to the topic in a while loop
> * run the console consumer (async), no commit:
> bin/kafka-console-consumer.sh --topic t1 --bootstrap-server localhost:9092 --consumer-property group.protocol=consumer --group cg1 --consumer-property enable.auto.commit=false
> Here we can notice the pattern that looks like batches, and custom logs on awaitNonEmpty show that it takes the full poll timeout on alternate poll iterations.
> * run the same but for the classic consumer (consumer-property group.protocol=classic) -> no such pattern of intermittent delays
> Produce continuously (I used this):
> while sleep 1; do echo $(uuidgen); done | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic t1
> This needs more investigation to fully understand if it's indeed something in the fetch path or something else.
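The reason the four-step race in the comment cannot happen when both threads use one lock can be shown with a small sketch. This is not the consumer's fetch buffer implementation: `FetchBufferSketch` and its methods are hypothetical, and the only point illustrated is that the emptiness check and the wait happen under the same lock that the producer side must hold to add data and signal.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a buffer guarded by a single lock; not Kafka's FetchBuffer.
final class FetchBufferSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<String> queue = new ArrayDeque<>();

    /** Called by the "network" thread: add and signal while holding the lock. */
    void add(String item) {
        lock.lock();
        try {
            queue.add(item);
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Called by the "application" thread: check and wait while holding the same lock. */
    boolean awaitNonEmpty(long timeoutMs) {
        long remainingNs = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            // The check and the wait are atomic with respect to add(), so a signal
            // cannot slip in between "saw empty" and "started waiting".
            while (queue.isEmpty()) {
                if (remainingNs <= 0) {
                    return false;
                }
                remainingNs = notEmpty.awaitNanos(remainingNs);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        FetchBufferSketch buffer = new FetchBufferSketch();
        Thread network = new Thread(() -> buffer.add("record"));
        network.start();
        System.out.println("non-empty: " + buffer.awaitNonEmpty(5_000));
        network.join();
    }
}
```

`awaitNanos` releases the lock only while the thread is actually parked, which is why `add` can never run in the gap between the emptiness check and the wait.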
[jira] [Commented] (KAFKA-19331) No error handling for leader not appeared in applyLocalFollowersDelta
[ https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954042#comment-17954042 ]

Luke Chen commented on KAFKA-19331:
-----------------------------------

Assigned to you. Thanks [~mingyen066]!
Re: [PR] KAFKA-17747: [6/N] Replace subscription metadata with metadata hash in share group [kafka]
dajac commented on PR #19796:
URL: https://github.com/apache/kafka/pull/19796#issuecomment-2909150320

   This one should basically be a copy of https://github.com/apache/kafka/pull/19761. Let's merge that one first and then we can review this one.
Re: [PR] KAFKA-19042: [11/N] move ConsumerWithLegacyMessageFormatIntegrationTest to clients-integration-tests module [kafka]
m1a2st commented on code in PR #19810: URL: https://github.com/apache/kafka/pull/19810#discussion_r2107996769 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerWithLegacyMessageFormatIntegrationTest.java: ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.ClientsTestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.RecordVersion; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.storage.internals.log.UnifiedLog; + +import org.junit.jupiter.api.BeforeEach; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + + +@ClusterTestDefaults( +brokers = 3 +) +public class ConsumerWithLegacyMessageFormatIntegrationTest { + +private final ClusterInstance cluster; + +private final String topic1 = "part-test-topic-1"; +private final String topic2 = "part-test-topic-2"; +private final String topic3 = "part-test-topic-3"; + +private final TopicPartition t1p0 = new TopicPartition(topic1, 0); +private final TopicPartition t1p1 = new TopicPartition(topic1, 1); +private final 
TopicPartition t2p0 = new TopicPartition(topic2, 0); +private final TopicPartition t2p1 = new TopicPartition(topic2, 1); +private final TopicPartition t3p0 = new TopicPartition(topic3, 0); +private final TopicPartition t3p1 = new TopicPartition(topic3, 1); + +public ConsumerWithLegacyMessageFormatIntegrationTest(ClusterInstance cluster) { +this.cluster = cluster; +} + +private void appendLegacyRecords(int numRecords, TopicPartition tp, int brokerId, byte magicValue) { +List records = new ArrayList<>(); +for (int i = 0; i < numRecords; i++) { +records.add(new SimpleRecord(i, ("key " + i).getBytes(), ("value " + i).getBytes())); +} + +ByteBuffer buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, +CompressionType.NONE, records)); +MemoryRecordsBuilder builder = MemoryRecords.builder( +buffer, +magicValue, +Compression.of(CompressionType.NONE).build(), +TimestampType.CREATE_TIME, +0L, +RecordBatch.NO_TIMESTAMP, +RecordBatch.NO_PRODUCER_ID, +RecordBatch.NO_PRODUCER_EPOCH, +0, +false, +RecordBatch.NO_PARTITION_LEADER_EPOCH +); + +records.forEach(builder::append); + +cluster.brokers().values().stream() +.filter(b -> b.config().brokerId() == brokerId) +.forEach(b -> { +UnifiedLog unifiedLog = b.replicaManager().logManager().getLog(tp, false).get(); + unifiedLog.appendAsLeaderWithRecordVersion(builder.build(), 0, RecordVersion.lookup(magicValue)); +// Default isolation.level is read_uncommitted. It makes Partition#fetchOffsetForTim
[jira] [Commented] (KAFKA-19336) Upgrade jackson libs since v2.16 is not maintained anymore
[ https://issues.apache.org/jira/browse/KAFKA-19336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954207#comment-17954207 ]

Szu-Yung Wang commented on KAFKA-19336:
---------------------------------------

Hi, may I take this?

> Upgrade jackson libs since v2.16 is not maintained anymore
> -----------------------------------------------------------
>
>                 Key: KAFKA-19336
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19336
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Luke Chen
>            Priority: Major
>
> From the Jackson docs: [https://github.com/FasterXML/jackson/wiki/Jackson-Releases]
>
> [2.16|https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.16]: Closed in April 2025 with release of 2.19.0
>
> We should upgrade the Jackson libs to a newer version. According to the docs, it should be backward compatible if we choose the latest, 2.19.0.
>
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.17]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.18]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.19]
[PR] KAFKA-19310: (MINOR) Missing mocks for DelayedShareFetchTest tests related to Memory Records slicing [kafka]
adixitconfluent opened a new pull request, #19823:
URL: https://github.com/apache/kafka/pull/19823

   ### About
   Added test memory records to avoid the silent exception thrown during slicing.

   ### Testing
   Ran the tests of `DelayedShareFetchTest` to make sure that there is no silent exception in any test.
Re: [PR] KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group [kafka]
dajac commented on code in PR #19790:
URL: https://github.com/apache/kafka/pull/19790#discussion_r2108336679

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java: ##
@@ -1576,6 +1581,7 @@ public void testFromClassicGroup() {
         assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch());
         assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
         assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor());
+        assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata()));

Review Comment:
Ah, I got it. The issue is that `TimelineHashMap` does not implement `toString`. Hence, when the comparison fails, you get a cryptic error:

```
Expected :org.apache.kafka.timeline.TimelineHashMap@17c13
Actual   :org.apache.kafka.timeline.TimelineHashMap@308d9
```

However, it still uses `.equals` to compare the objects. I am fine with keeping the copy so that the failure message is readable here.
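The point in the review comment above can be reproduced outside Kafka. The following is a minimal sketch, not the real `TimelineHashMap`: `OpaqueMap` is a hypothetical stand-in that compares by content but prints like `Object#toString`, which is the behaviour the comment describes. It shows why wrapping both sides in `Map.copyOf` changes only the failure message, not the outcome of the comparison.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class OpaqueMapDemo {
    /** Hypothetical stand-in: compares by content, but prints like Object#toString. */
    static final class OpaqueMap<K, V> extends AbstractMap<K, V> {
        private final Map<K, V> delegate = new HashMap<>();

        @Override
        public V put(K key, V value) {
            return delegate.put(key, value);
        }

        @Override
        public Set<Map.Entry<K, V>> entrySet() {
            return delegate.entrySet();
        }

        @Override
        public String toString() {
            // Mimics the ClassName@hash output seen in the cryptic failure message.
            return "OpaqueMap@" + Integer.toHexString(System.identityHashCode(this));
        }
    }

    /** Builds a single-entry opaque map (topic name -> partition count). */
    static OpaqueMap<String, Integer> opaque(String topic, int partitions) {
        OpaqueMap<String, Integer> map = new OpaqueMap<>();
        map.put(topic, partitions);
        return map;
    }

    public static void main(String[] args) {
        OpaqueMap<String, Integer> expected = opaque("foo", 3);
        OpaqueMap<String, Integer> actual = opaque("foo", 5);

        // Equality is content-based, so the comparison works with or without the copy.
        System.out.println(expected.equals(actual));
        // Without the copy, a failure message would only show opaque identifiers.
        System.out.println(expected + " vs " + actual);
        // With the copy, the same message shows the actual entries.
        System.out.println(Map.copyOf(expected) + " vs " + Map.copyOf(actual));
    }
}
```

The copy costs an extra allocation per assertion, which is usually acceptable in test code when it makes a failing assertion self-explanatory.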
Re: [PR] KAFKA-19335: Membership managers send negative epoch in JOINING [kafka]
ShivsundarR commented on code in PR #19818:
URL: https://github.com/apache/kafka/pull/19818#discussion_r2108331701

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java: ##
@@ -561,10 +568,42 @@ public void testIgnoreHeartbeatResponseWhenNotInGroup(MemberState state) {
         assertEquals(state, membershipManager.state());
         verify(responseData, never()).memberId();
-        verify(responseData, never()).memberEpoch();
+        // In unsubscribed, we check if we received a leave group response, so we do verify member epoch.
+        if (state != MemberState.UNSUBSCRIBED) {
+            verify(responseData, never()).memberEpoch();
+        }
         verify(responseData, never()).assignment();
     }
 
+    @Test
+    public void testIgnoreLeaveResponseWhenNotLeavingGroup() {
+        ShareMembershipManager membershipManager = createMemberInStableState();
+
+        CompletableFuture leaveResult = membershipManager.leaveGroup();
+
+        // Send leave request, transitioning to UNSUBSCRIBED state
+        membershipManager.onHeartbeatRequestGenerated();
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+        // Receive a previous heartbeat response, which should be ignored

Review Comment:
This response is not actually "ignored", right? We set the member epoch in the response to `membershipManager.memberEpoch` (-1 in this case), the leave operation is then processed successfully, and afterwards we assert that `leaveResult` has completed. If that is the case, can we change the comment above to reflect it (and in the other tests too)? Thanks.
Re: [PR] KAFKA-19334 MetadataShell execution unintentionally deletes lock file [kafka]
frankvicky commented on PR #19817: URL: https://github.com/apache/kafka/pull/19817#issuecomment-2911237948 @cmccabe: Could you please take a look when you have a free moment? Thanks!
Re: [PR] KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module [kafka]
brandboat commented on code in PR #19773: URL: https://github.com/apache/kafka/pull/19773#discussion_r2108051296 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java: ## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.ClientsTestUtils; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the consumer that covers logic related to manual assignment. 
+ */ +@ClusterTestDefaults( +types = {Type.KRAFT}, +brokers = PlaintextConsumerAssignTest.BROKER_COUNT, +serverProperties = { +@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"), +@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), +@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), +} +) +public class PlaintextConsumerAssignTest { + +public static final int BROKER_COUNT = 3; + +private final ClusterInstance clusterInstance; +private final String topic = "topic"; +private final int partition = 0; +TopicPartition tp = new TopicPartition(topic, partition); + +PlaintextConsumerAssignTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} + +@BeforeEach +public void setup() throws InterruptedException { +clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2); Review Comment: I think the BROKER_COUNT should be replica count, right? ```suggestion clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
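To make the argument swap in the suggestion above concrete, here is a small sketch. It assumes, as the suggestion implies, that the call takes its arguments in the order (name, partition count, replication factor); `TopicSpec` and this `createTopic` helper are hypothetical stand-ins and not the `ClusterInstance` API.

```java
final class CreateTopicArgsDemo {
    static final int BROKER_COUNT = 3;

    /** Hypothetical record of what a createTopic call would request. */
    static final class TopicSpec {
        final String name;
        final int partitions;
        final short replicationFactor;

        TopicSpec(String name, int partitions, short replicationFactor) {
            this.name = name;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
        }
    }

    /** Hypothetical helper using the assumed (name, partitions, replicationFactor) order. */
    static TopicSpec createTopic(String name, int partitions, short replicationFactor) {
        if (replicationFactor > BROKER_COUNT) {
            throw new IllegalArgumentException("replication factor exceeds broker count");
        }
        return new TopicSpec(name, partitions, replicationFactor);
    }

    public static void main(String[] args) {
        // As written in the PR: BROKER_COUNT lands in the partition-count slot.
        TopicSpec before = createTopic("topic", BROKER_COUNT, (short) 2);
        // As suggested in the review: BROKER_COUNT becomes the replication factor.
        TopicSpec after = createTopic("topic", 2, (short) BROKER_COUNT);

        System.out.println(before.partitions + " partitions, RF " + before.replicationFactor);
        System.out.println(after.partitions + " partitions, RF " + after.replicationFactor);
    }
}
```

Both calls compile and run, which is why this kind of mix-up between two numeric parameters tends to surface only in review.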
Re: [PR] KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module [kafka]
TaiJuWu commented on code in PR #19773: URL: https://github.com/apache/kafka/pull/19773#discussion_r2108058182 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java: ## @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.clients.ClientsTestUtils; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; + +import org.junit.jupiter.api.BeforeEach; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Integration tests for the consumer that covers logic related to manual assignment. 
+ */ +@ClusterTestDefaults( +types = {Type.KRAFT}, +brokers = PlaintextConsumerAssignTest.BROKER_COUNT, +serverProperties = { +@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "3"), +@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +@ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, value = "100"), +@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "10"), +} +) +public class PlaintextConsumerAssignTest { + +public static final int BROKER_COUNT = 3; + +private final ClusterInstance clusterInstance; +private final String topic = "topic"; +private final int partition = 0; +TopicPartition tp = new TopicPartition(topic, partition); + +PlaintextConsumerAssignTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} + +@BeforeEach +public void setup() throws InterruptedException { +clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2); Review Comment: Fix it, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module [kafka]
TaiJuWu commented on PR #19773: URL: https://github.com/apache/kafka/pull/19773#issuecomment-2910955102 Hi @brandboat, thanks for your detailed review and for pointing out my mistakes. All comments are addressed, PTAL.
Re: [PR] KAFKA-7516: Attempt to dynamically load ManagementFactory [kafka]
github-actions[bot] commented on PR #19764: URL: https://github.com/apache/kafka/pull/19764#issuecomment-2910989098 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again.
Re: [PR] MINOR: Improve the `KAFKA_HEAP_OPTS` definition while run `kafka-server-start.bat` Batch [kafka]
github-actions[bot] commented on PR #19703: URL: https://github.com/apache/kafka/pull/19703#issuecomment-2910989162 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again.
Re: [PR] Kafka 18913/cleanup state updater on failure [kafka]
github-actions[bot] commented on PR #19750: URL: https://github.com/apache/kafka/pull/19750#issuecomment-2910989126 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again.
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
FrankYang0529 commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107945239

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ##
@@ -20635,12 +21067,10 @@ barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) ), // Remove regex. List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId, "foo*")), -// Updated subscription metadata. - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of( -barTopicName, new TopicMetadata(barTopicId, barTopicName, 3) -))), // Bumped epoch. - List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 0)) + List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, computeGroupHash(Map.of( +barTopicName, barTopicHash + ), result.records()

Review Comment:
Thanks for the suggestion. I added tests for the following cases:
* update: testNewRacksDataInMetadataImageTriggersEpochBump
* remove: testRemoveTopicCleanupTopicHash
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
FrankYang0529 commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107946290

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java: ##
@@ -398,6 +423,21 @@ public Map computeSubscriptionMetadata(
         return Collections.unmodifiableMap(newSubscriptionMetadata);
     }
 
+    public static long computeMetadataHash(
+        Map subscribedTopicNames,
+        Map topicHashCache,
+        MetadataImage metadataImage
+    ) {
+        Map topicHash = new HashMap<>(subscribedTopicNames.size());
+        subscribedTopicNames.keySet().forEach(topicName ->
+            topicHash.put(
+                topicName,
+                topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage))

Review Comment:
Good catch! I think we can skip a topic if its `TopicImage` does not exist, so we can revert the change in `Utils#computeGroupHash`. I also added related unit tests in `ConsumerGroupTest.java`.
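The approach described in this reply (skip subscribed topics that are missing from the metadata image, rather than hashing them to a sentinel value that has to be filtered out later) can be sketched as follows. This is an illustration under stated assumptions, not the code in the PR: the metadata image is modelled as a plain map from topic name to partition count, and `topicHash` is a placeholder, since the real hash function is not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class MetadataHashSketch {
    /** Placeholder hash (an assumption); stands in for the real per-topic hash. */
    static long topicHash(String topicName, int partitionCount) {
        return 31L * topicName.hashCode() + partitionCount;
    }

    /**
     * Collects per-topic hashes for the subscribed topics.
     * `image` is a simplified stand-in for a metadata image (topic name -> partition count).
     */
    static Map<String, Long> topicHashes(
        Set<String> subscribed,
        Map<String, Long> cache,
        Map<String, Integer> image
    ) {
        Map<String, Long> hashes = new HashMap<>(subscribed.size());
        for (String topicName : subscribed) {
            Integer partitionCount = image.get(topicName);
            if (partitionCount == null) {
                // Topic is absent from the image: skip it, so neither the result
                // nor the cache ever holds a sentinel entry for it.
                continue;
            }
            hashes.put(topicName, cache.computeIfAbsent(topicName, k -> topicHash(k, partitionCount)));
        }
        return hashes;
    }

    public static void main(String[] args) {
        Map<String, Long> cache = new HashMap<>();
        Map<String, Long> hashes = topicHashes(Set.of("foo", "bar"), cache, Map.of("foo", 3));
        System.out.println(hashes.keySet()); // only topics present in the image
        System.out.println(cache.keySet());  // the cache is populated for the same topics
    }
}
```

Skipping at this level means the function that combines the per-topic hashes does not need any special case for missing topics.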
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
FrankYang0529 commented on code in PR #19761: URL: https://github.com/apache/kafka/pull/19761#discussion_r2107946911 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid( * @return The hash of the group. */ static long computeGroupHash(Map topicHashes) { -if (topicHashes.isEmpty()) { +// Sort entries by topic name +List> sortedEntries = new ArrayList<>(); +for (Map.Entry entry : topicHashes.entrySet()) { +// Filter out entries with a hash value of 0, which indicates no topic +if (entry.getValue() != 0) { Review Comment: Removed related change here, because we can ignore null `TopicImage` in `ModernGroup#computeMetadataHash`. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18379: Enforce resigned cannot transition to any other state in same epoch [kafka]
github-actions[bot] commented on PR #19236: URL: https://github.com/apache/kafka/pull/19236#issuecomment-2910989346 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again.
[PR] WIP: 4.0 Markdown docs [kafka]
hvishwanath opened a new pull request, #19821: URL: https://github.com/apache/kafka/pull/19821 4.0 Markdown docs
Re: [PR] [WIP] KAFKA-18877: Add an mechanism to find cases where we accessed variables from the wrong thread. [kafka]
github-actions[bot] commented on PR #19231: URL: https://github.com/apache/kafka/pull/19231#issuecomment-2910989370 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again.
[PR] (wip )KAFKA-19042 consumer bounce test [kafka]
TaiJuWu opened a new pull request, #19822: URL: https://github.com/apache/kafka/pull/19822 Delete this text and replace it with a detailed description of your change. The PR title and body will become the squashed commit message. If you would like to tag individuals, add some commentary, upload images, or include other supplemental information that should not be part of the eventual commit message, please use a separate comment. If applicable, please include a summary of the testing strategy (including rationale) for the proposed change. Unit and/or integration tests are expected for any behavior change and system tests should be considered for larger changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-1792: change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments [kafka]
github-actions[bot] commented on PR #18903: URL: https://github.com/apache/kafka/pull/18903#issuecomment-2911010127 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-19144 Move DelayedProduce to server module [kafka]
frankvicky commented on code in PR #19793: URL: https://github.com/apache/kafka/pull/19793#discussion_r2108058643 ## core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala: ## @@ -271,9 +271,10 @@ class LocalLeaderEndPointTest extends Logging { origin: AppendOrigin = AppendOrigin.CLIENT, requiredAcks: Short = -1): CallbackResult[PartitionResponse] = { val result = new CallbackResult[PartitionResponse]() -def appendCallback(responses: scala.collection.Map[TopicIdPartition, PartitionResponse]): Unit = { - val response = responses.get(partition) - assertTrue(response.isDefined) +def appendCallback(responses: java.util.Map[TopicIdPartition, PartitionResponse]): Unit = { Review Comment: Given that we already have an import alias for `java.util.Map`, we could reuse it. ```suggestion def appendCallback(responses: JMap[TopicIdPartition, PartitionResponse]): Unit = { ``` ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -967,12 +969,30 @@ class ReplicaManager(val config: KafkaConfig, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], initialAppendResults: Map[TopicIdPartition, LogAppendResult], initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus], -responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, +responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit, ): Unit = { if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) { // create delayed produce operation - val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus) - val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback) + val produceMetadata = new ProduceMetadata(requiredAcks, initialProduceStatus.asJava) + + def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit = { +val (hasEnough, error) = getPartitionOrError(tp) match { + case Left(err) => +// Case A Review Comment: What does this comment mean? 
## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -967,12 +969,30 @@ class ReplicaManager(val config: KafkaConfig, entriesPerPartition: Map[TopicIdPartition, MemoryRecords], initialAppendResults: Map[TopicIdPartition, LogAppendResult], initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus], -responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit, +responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit, ): Unit = { if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, initialAppendResults)) { // create delayed produce operation - val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus) - val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, this, responseCallback) + val produceMetadata = new ProduceMetadata(requiredAcks, initialProduceStatus.asJava) + + def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit = { +val (hasEnough, error) = getPartitionOrError(tp) match { + case Left(err) => +// Case A Review Comment: I see this is relevant to the comment of `DelayedProduce,` but it's confusing that these comments are standalone here. Could you write the whole meaning of these cases, or link these comments to `tryComplete` ? 
## core/src/test/scala/unit/kafka/server/KafkaApisTest.scala: ## @@ -2832,7 +2832,7 @@ class KafkaApisTest extends Logging { any(), ArgumentMatchers.eq(requestLocal), any() -)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE +)).thenAnswer(_ => responseCallback.getValue.apply(Map(new TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)).asJava)) Review Comment: ```suggestion )).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new TopicIdPartition(topicId,tp2), new PartitionResponse(Errors.NONE ``` ## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ## @@ -2990,9 +2990,9 @@ class ReplicaManagerTest { requiredAcks: Short = -1): CallbackResult[PartitionResponse] = { val result = new CallbackResult[PartitionResponse]() val topicIdPartition = new TopicIdPartition(topicId, partition) -def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]): Unit = { - val response = responses.get(topicIdPartition) - assertTrue(response.isDefined) +def appendCallback(responses: util.Map[TopicIdPartition, PartitionResponse]): Unit = { + val response = java.util.Optional.ofNullable(responses.get(topicIdPartition)) Review Comment: We alrea
Re: [PR] [WIP] KAFKA-18562: standardize election/fetch timeout between Unattached and Followers [kafka]
github-actions[bot] commented on PR #18921: URL: https://github.com/apache/kafka/pull/18921#issuecomment-2911010160 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-18157:Consider UnsupportedVersionException child class to represent the case of unsupported fields [kafka]
github-actions[bot] commented on PR #18072: URL: https://github.com/apache/kafka/pull/18072#issuecomment-2911010040 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Created] (KAFKA-19336) Upgrade jackson libs since v2.16 is not maintained anymore
Luke Chen created KAFKA-19336:

Summary: Upgrade jackson libs since v2.16 is not maintained anymore
Key: KAFKA-19336
URL: https://issues.apache.org/jira/browse/KAFKA-19336
Project: Kafka
Issue Type: Improvement
Reporter: Luke Chen

From the Jackson docs: [https://github.com/FasterXML/jackson/wiki/Jackson-Releases]
> [2.16|https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.16] Closed in April 2025 with release of 2.19.0

We should upgrade the Jackson libs to a newer version. According to the docs, the upgrade should be backward compatible if we choose the latest release, 2.19.0.
[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.17]
[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.18]
[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.19]

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19042: [9/N] Move GroupAuthorizerIntegrationTest to clients-integration-tests module [kafka]
nick-zh commented on PR #19685: URL: https://github.com/apache/kafka/pull/19685#issuecomment-2911205618 While I would love to help, I think you meant a different Nick @chia7712
Re: [PR] KAFKA-19315: Move ControllerMutationQuotaManager to server module [kafka]
m1a2st commented on code in PR #19807: URL: https://github.com/apache/kafka/pull/19807#discussion_r2107979149 ## server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java: ## @@ -0,0 +1,938 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server; + +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.internals.Plugin; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Quota; +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.Sanitizer; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.network.Session; +import org.apache.kafka.server.config.ClientQuotaManagerConfig; +import org.apache.kafka.server.quota.ClientQuotaCallback; +import org.apache.kafka.server.quota.ClientQuotaEntity; +import org.apache.kafka.server.quota.ClientQuotaType; +import org.apache.kafka.server.quota.QuotaType; +import org.apache.kafka.server.quota.QuotaUtils; +import org.apache.kafka.server.quota.SensorAccess; +import org.apache.kafka.server.quota.ThrottleCallback; +import org.apache.kafka.server.quota.ThrottledChannel; +import org.apache.kafka.server.util.ShutdownableThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Consumer; + +final class QuotaTypes { +static final int NO_QUOTAS = 0; +static final int CLIENT_ID_QUOTA_ENABLED = 1; +static final int 
USER_QUOTA_ENABLED = 2; +static final int USER_CLIENT_ID_QUOTA_ENABLED = 4; +static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are used with custom quotas +} + +public class ClientQuotaManager { + +private static final Logger log = LoggerFactory.getLogger(ClientQuotaManager.class); + +// Purge sensors after 1 hour of inactivity +public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600; +private static final String DEFAULT_NAME = ""; + +public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY = +new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE); +public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY = +new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null); +public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY = +new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, DefaultClientIdEntity.INSTANCE); + +public interface BaseUserEntity extends ClientQuotaEntity.ConfigEntity { } + +public static class UserEntity implements BaseUserEntity { +private final String sanitizedUser; + +public UserEntity(String sanitizedUser) { +this.sanitizedUser = sanitizedUser; +} + +@Override +public ClientQuotaEntity.ConfigEntityType entityType() { +return ClientQuotaEntity.ConfigEntityType.USER; +} + +@Override +public String name() { +return Sanitizer.desanitize(sanitizedUser); +} + +public String getSanitizedUser() { +return sanitizedUser; +} + +@Override +public String toString() { +return "user " + sanitizedUser; +} + +@Override +public boolean equals(Object obj) { +if (this == obj) return true; +if (obj == null || getClass() != obj.getClass()) return false; +UserEntity t
[PR] KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets [kafka]
JimmyWang6 opened a new pull request, #19820: URL: https://github.com/apache/kafka/pull/19820 [KAFKA-16720](https://issues.apache.org/jira/browse/KAFKA-16720) aims to finish the AlterShareGroupOffsets for ShareGroupCommand part. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-19336) Upgrade jackson libs since v2.16 is not maintained anymore
[ https://issues.apache.org/jira/browse/KAFKA-19336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Szu-Yung Wang reassigned KAFKA-19336: - Assignee: Szu-Yung Wang > Upgrade jackson libs since v2.16 is not maintained anymore > -- > > Key: KAFKA-19336 > URL: https://issues.apache.org/jira/browse/KAFKA-19336 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Szu-Yung Wang >Priority: Major > > From the jackson doc: [https://github.com/FasterXML/jackson/wiki/Jackson-Releases] > > [2.16|https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.16] Closed in April 2025 with release of 2.19.0 > We should upgrade the jackson libs to a newer version. According to the doc, the upgrade should be > backward compatible if we choose the latest, 2.19.0. > > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.17] > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.18] > [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.19] > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-19334) MetadataShell bypasses file lock unexpectedly due to lock file deletion
[ https://issues.apache.org/jira/browse/KAFKA-19334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TengYao Chi updated KAFKA-19334: Fix Version/s: 3.9.2 4.0.1 4.1.0 > MetadataShell bypasses file lock unexpectedly due to lock file deletion > --- > > Key: KAFKA-19334 > URL: https://issues.apache.org/jira/browse/KAFKA-19334 > Project: Kafka > Issue Type: Bug >Affects Versions: 4.0.0 >Reporter: Haruki Okada >Assignee: Haruki Okada >Priority: Major > Fix For: 3.9.2, 4.0.1, 4.1.0 > > > MetadataShell acquires the log dir lock to prevent unexpected results from > concurrent reads/writes, an issue fixed by [https://github.com/apache/kafka/pull/14899]. > > This lock works as expected when we execute MetadataShell against a running log > for the first time: > > {code:java} > % ./bin/kafka-metadata-shell.sh --snapshot data/bootstrap.checkpoint > Unexpected error: Unable to lock /path/to/data. Please ensure that no broker > or controller process is using this directory before proceeding.{code} > However, if we execute MetadataShell with the same command again while the controller > is still running, it succeeds unexpectedly. > > > {code:java} > % ./bin/kafka-metadata-shell.sh --snapshot data/bootstrap.checkpoint > Loading... > Starting... > [ Kafka Metadata Shell ] > >> {code} > I found that the .lock file vanishes after MetadataShell exits. > > This is because MetadataShell calls FileLock#destroy when it fails to acquire > the lock, which deletes the lock file. > [https://github.com/apache/kafka/blob/4.0.0/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java#L131] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
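The failure mode described in this ticket can be illustrated with a minimal sketch. It assumes plain `java.nio` file locking and a hypothetical helper class `LockFileSketch`; it is not Kafka's own lock utility and not the actual patch. The point it tries to show is that a failed lock attempt should only close its own channel and leave the lock file in place. On POSIX-style file systems a lock is attached to the underlying file, so deleting the path would let a later process create a fresh file and lock it successfully while the original holder is still running.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration only; not part of Kafka.
public class LockFileSketch {

    /**
     * Tries to lock the given file. Returns the held lock, or null if the
     * file is already locked. On failure the lock file is left untouched.
     */
    public static FileLock tryAcquire(Path lockFile) throws IOException {
        FileChannel channel = FileChannel.open(
            lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock;
        try {
            // tryLock() returns null when another process holds the lock.
            lock = channel.tryLock();
        } catch (OverlappingFileLockException e) {
            // The lock is already held through another channel in this JVM.
            lock = null;
        }
        if (lock == null) {
            // Release only our own handle. The file itself is NOT deleted,
            // so the next caller still contends on the same file.
            channel.close();
        }
        return lock;
    }
}
```

With this shape, a second `tryAcquire` on the same path keeps failing for as long as the first lock is held, which is the behavior the ticket expects from repeated MetadataShell runs.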
[jira] [Commented] (KAFKA-19334) MetadataShell bypasses file lock unexpectedly due to lock file deletion
[ https://issues.apache.org/jira/browse/KAFKA-19334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954105#comment-17954105 ] Haruki Okada commented on KAFKA-19334: -- [~cmccabe] Submitted a patch for this. Could you take a look? > MetadataShell bypasses file lock unexpectedly due to lock file deletion > --- > > Key: KAFKA-19334 > URL: https://issues.apache.org/jira/browse/KAFKA-19334 > Project: Kafka > Issue Type: Bug >Affects Versions: 4.0.0 >Reporter: Haruki Okada >Assignee: Haruki Okada >Priority: Major > > MetadataShell acquires the log dir lock to prevent unexpected results from > concurrent reads/writes, an issue fixed by [https://github.com/apache/kafka/pull/14899]. > > This lock works as expected when we execute MetadataShell against a running log > for the first time: > > {code:java} > % ./bin/kafka-metadata-shell.sh --snapshot data/bootstrap.checkpoint > Unexpected error: Unable to lock /path/to/data. Please ensure that no broker > or controller process is using this directory before proceeding.{code} > However, if we execute MetadataShell with the same command again while the controller > is still running, it succeeds unexpectedly. > > > {code:java} > % ./bin/kafka-metadata-shell.sh --snapshot data/bootstrap.checkpoint > Loading... > Starting... > [ Kafka Metadata Shell ] > >> {code} > I found that the .lock file vanishes after MetadataShell exits. > > This is because MetadataShell calls FileLock#destroy when it fails to acquire > the lock, which deletes the lock file. > [https://github.com/apache/kafka/blob/4.0.0/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java#L131] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-19333) Inconsistent behavior between `ConsumerMembershipManager` and `StreamsMembershipManager` on `onAllTasksLost` execution
[ https://issues.apache.org/jira/browse/KAFKA-19333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954108#comment-17954108 ] Nick Guo commented on KAFKA-19333: -- Hi [~lianetm]! Thanks for taking the time to explain this, it's really helpful! > Inconsistent behavior between `ConsumerMembershipManager` and > `StreamsMembershipManager` on `onAllTasksLost` execution > -- > > Key: KAFKA-19333 > URL: https://issues.apache.org/jira/browse/KAFKA-19333 > Project: Kafka > Issue Type: Improvement > Components: clients, streams >Reporter: Nick Guo >Assignee: Nick Guo >Priority: Minor > > `ConsumerMembershipManager` does not create an event to run a callback if > there is nothing to revoke, but `StreamsMembershipManager` does. > Related discussion and PR: > discussion: [https://github.com/apache/kafka/pull/18551/files#r2106243432] > pr: https://github.com/apache/kafka/pull/19779 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19056: Rewrite EndToEndClusterIdTest in Java and move it to the server module [kafka]
mingyen066 commented on code in PR #19741: URL: https://github.com/apache/kafka/pull/19741#discussion_r2107586894 ## server/src/test/java/org/apache/kafka/api/EndToEndClusterIdTest.java: ## @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.api; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.ClusterResource; +import org.apache.kafka.common.ClusterResourceListener; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.metrics.MetricConfigs; +import org.apache.kafka.test.MockConsumerInterceptor; +import org.apache.kafka.test.MockDeserializer; +import org.apache.kafka.test.MockMetricsReporter; +import org.apache.kafka.test.MockProducerInterceptor; +import org.apache.kafka.test.MockSerializer; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.BeforeEach; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.kafka.test.TestUtils.isValidClusterId; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** The test cases here verify the following conditions. + * 1. The ProducerInterceptor receives the cluster id after the onSend() method is called and before onAcknowledgement() method is called. + * 2. The Serializer receives the cluster id before the serialize() method is called. + * 3. 
The producer MetricReporter receives the cluster id after send() method is called on KafkaProducer. + * 4. The ConsumerInterceptor receives the cluster id before the onConsume() method. + * 5. The Deserializer receives the cluster id before the deserialize() method is called. + * 6. The consumer MetricReporter receives the cluster id after poll() is called on KafkaConsumer. + * 7. The broker MetricReporter receives the cluster id after the broker startup is over. + * 8. The broker KafkaMetricReporter receives the cluster id after the broker startup is over. + * 9. All the components receive the same cluster id. + */ +@ClusterTestDefaults(serverProperties = { +@ClusterConfigProperty(key = MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.api.EndToEndClusterIdTest$MockCommonMetricsReporter"), +}) +public class EndToEndClusterIdTest { + +private static final String TOPIC = "e2etopic"; +private static final int PARTITION = 0; +private static final TopicPartition TP = new TopicPartition(TOPIC, PARTITION); +private final ClusterInstance clusterInstance; +private String clusterBrokerId; +private String controllerId; +private static final String PRODUCER_CLIENT_ID = "producerClientId"; +private static final String CONSUMER_CLIENT_ID = "consumerClientId"; + +EndToEndClusterIdTest(ClusterInstance clusterInstance) { +this.clusterInstance = clusterInstance; +} + +@BeforeEach +public void setup() throws InterruptedException { +this.clusterInstance.createTopic(TOPIC, 2, (short) 1); +clusterBrokerId = String.valueOf(clusterInstance.brokerIds().iterator().next()); +controllerId = String.valueOf(clusterInstance.controllerIds().iterator().next()); +MockDeserializer.resetStaticVariables(); +} + +public static class MockCommonMetricsReporter extends MockMetricsReporter implements ClusterResourceListener { +public static final Map CLU
[jira] [Created] (KAFKA-19335) Streams group heartbeat sometimes fails with INVALID_REQUEST
Lucas Brutschy created KAFKA-19335: -- Summary: Streams group heartbeat sometimes fails with INVALID_REQUEST Key: KAFKA-19335 URL: https://issues.apache.org/jira/browse/KAFKA-19335 Project: Kafka Issue Type: Sub-task Reporter: Lucas Brutschy KIP-1071 soak sometimes fails with this error: {{[2025-04-22 17:34:12,585] ERROR [consumer_background_thread] [Consumer instanceId=ip-172-31-9-4.us-west-2.compute.internal-1, clientId=i-0b96c1803fb4c32d7-StreamThread-1-consumer, groupId=stream-soak-test] StreamsGroupHeartbeatRequest failed due to INVALID_REQUEST: Topology can only be provided when (re-)joining. (org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager)}} h3. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-19335) Streams group heartbeat sometimes fails with INVALID_REQUEST
[ https://issues.apache.org/jira/browse/KAFKA-19335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-19335: -- Assignee: Lucas Brutschy > Streams group heartbeat sometimes fails with INVALID_REQUEST > > > Key: KAFKA-19335 > URL: https://issues.apache.org/jira/browse/KAFKA-19335 > Project: Kafka > Issue Type: Sub-task >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > > KIP-1071 soak sometimes fails with this error: > {{[2025-04-22 17:34:12,585] ERROR [consumer_background_thread] [Consumer > instanceId=ip-172-31-9-4.us-west-2.compute.internal-1, > clientId=i-0b96c1803fb4c32d7-StreamThread-1-consumer, > groupId=stream-soak-test] StreamsGroupHeartbeatRequest failed due to > INVALID_REQUEST: Topology can only be provided when (re-)joining. > (org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager)}} > h3. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-19335: Membership Managers end negative epoch in JOINING [kafka]
lucasbru opened a new pull request, #19818: URL: https://github.com/apache/kafka/pull/19818

There is a sequence of interactions with the membership managers of KIP-848, KIP-932, KIP-1071 that can put the membership manager into the JOINING state while the member epoch is set to -1. This can result in an invalid request being sent, since joining heartbeats should not have member epoch -1. This may lead to the member failing to join. In the case of streams, the group coordinator will return INVALID_REQUEST.

This is the sequence that triggers the bug. It seems to be relatively likely, and is caused by two heartbeat responses being received after the next heartbeat has been sent:

1. `membershipManager.leaveGroup();` -> transitions to LEAVING
2. `membershipManager.onHeartbeatRequestGenerated();` -> transitions to UNSUBSCRIBED
3. `membershipManager.onHeartbeatSuccess(... with member epoch > 0);` -> unblocks the consumer
4. `membershipManager.onSubscriptionUpdated();`
5. `membershipManager.onConsumerPoll();` -> transitions to JOINING
6. `membershipManager.onHeartbeatSuccess(... with member epoch < 0);` -> updates the epoch to a negative value

Now we are in state JOINING with memberEpoch -1, and the next heartbeat we send will be malformed, triggering INVALID_REQUEST.

The bug may also be triggered if the `unsubscribe` times out, but this seems more of a corner case.

To prevent the bug, we are taking two measures:

The likely path to triggering the bug can be prevented by not unblocking an `unsubscribe` call in the consumer when a non-leave-heartbeat epoch is received. Once we have sent out the leave group heartbeat, we will ignore all heartbeat responses except for those containing memberEpoch < 0.

For extra measure, we also prevent the second case (`unsubscribe` timing out). In this case, the consumer gets unblocked before we have received the leave group heartbeat response, and may resubscribe to the group. When that response, which contains a member epoch < 0, arrives after we have already left the `UNSUBSCRIBED` state, we simply ignore it.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
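The two measures in this PR description can be sketched as a small state machine. This is a simplified model under stated assumptions, not the code in the PR: the class `MembershipEpochSketch`, its fields, and the `STABLE` state are hypothetical, and only the method names mirror the calls listed in the description.

```java
// Hypothetical, simplified model of the behavior described in the PR text.
public class MembershipEpochSketch {
    enum State { STABLE, LEAVING, UNSUBSCRIBED, JOINING }

    State state = State.STABLE;
    int memberEpoch = 5;                  // arbitrary positive epoch to start from
    boolean unsubscribeUnblocked = false; // stands in for completing unsubscribe()

    void leaveGroup() {
        state = State.LEAVING;
    }

    void onHeartbeatRequestGenerated() {
        // The leave heartbeat has been sent out.
        if (state == State.LEAVING) state = State.UNSUBSCRIBED;
    }

    void onConsumerPoll() {
        // Rejoin attempt, e.g. after a new subscription or an unsubscribe timeout.
        if (state == State.UNSUBSCRIBED) {
            state = State.JOINING;
            memberEpoch = 0;
        }
    }

    void onHeartbeatSuccess(int responseEpoch) {
        boolean leaveResponse = responseEpoch < 0;
        if (state == State.LEAVING || state == State.UNSUBSCRIBED) {
            // Measure 1: while leaving, only a leave response (epoch < 0)
            // may unblock the pending unsubscribe.
            if (!leaveResponse) return;
            memberEpoch = responseEpoch;
            unsubscribeUnblocked = true;
            return;
        }
        // Measure 2: a late leave response must not overwrite the epoch
        // once the member has moved on from UNSUBSCRIBED.
        if (leaveResponse) return;
        memberEpoch = responseEpoch;
        if (state == State.JOINING) state = State.STABLE;
    }
}
```

Replaying the sequence from the description against this model, the response with a positive epoch no longer unblocks the consumer, and a late response with epoch -1 leaves a JOINING member at epoch 0 instead of -1.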
[jira] [Assigned] (KAFKA-19288) Ensure new consumer joining attempt not overwritten on delayed HB response to previous leave
[ https://issues.apache.org/jira/browse/KAFKA-19288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-19288: -- Assignee: Lucas Brutschy (was: Lianet Magrans) > Ensure new consumer joining attempt not overwritten on delayed HB response to > previous leave > > > Key: KAFKA-19288 > URL: https://issues.apache.org/jira/browse/KAFKA-19288 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Major > Labels: kip-848-client-support > Fix For: 4.1.0 > > > For the async consumer, check and cover the edge case where the consumer may > transition to JOINING, but receive a delayed response to a previous leave > request. We should ensure that the response is ignored and the consumer joins > as intended. > I expect this doesn't usually happen given that the unsubscribe is a > blocking operation, so the consumer won't be able to join until the > unsubscribe completes. But if the leave request doesn't get a response in > time (unsubscribe fails quietly), and the response arrives after it, I > expect the joining could be overwritten (member updating its joining epoch 0 > to the -1 epoch received in the response) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-19288) Ensure new consumer joining attempt not overwritten on delayed HB response to previous leave
[ https://issues.apache.org/jira/browse/KAFKA-19288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954114#comment-17954114 ] Lucas Brutschy commented on KAFKA-19288: https://github.com/apache/kafka/pull/19818 > Ensure new consumer joining attempt not overwritten on delayed HB response to > previous leave > > > Key: KAFKA-19288 > URL: https://issues.apache.org/jira/browse/KAFKA-19288 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Major > Labels: kip-848-client-support > Fix For: 4.1.0 > > > For the async consumer, check and cover the edge case where the consumer may > transition to JOINING, but receive a delayed response to a previous leave > request. We should ensure that the response is ignored and the consumer joins > as intended. > I expect this doesn't usually happen given that the unsubscribe is a > blocking operation, so the consumer won't be able to join until the > unsubscribe completes. But if the leave request doesn't get a response in > time (unsubscribe fails quietly), and the response arrives after it, I > expect the joining could be overwritten (member updating its joining epoch 0 > to the -1 epoch received in the response) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19330: Change MockSerializer/Deserializer to use String serializer instead of byte[] [kafka]
chia7712 commented on code in PR #19812: URL: https://github.com/apache/kafka/pull/19812#discussion_r2107630603 ## clients/src/test/java/org/apache/kafka/test/MockDeserializer.java: ## @@ -52,11 +53,14 @@ public void configure(Map configs, boolean isKey) { } @Override -public byte[] deserialize(String topic, byte[] data) { +public String deserialize(String topic, byte[] data) { // This will ensure that we get the cluster metadata when deserialize is called for the first time // as subsequent compareAndSet operations will fail. clusterIdBeforeDeserialize.compareAndSet(noClusterId, clusterMeta.get()); -return data; +if (data == null) Review Comment: ```java if (data == null) return null; return data.getBytes(StandardCharsets.UTF_8); ``` ## clients/src/test/java/org/apache/kafka/test/MockSerializer.java: ## @@ -35,11 +36,14 @@ public MockSerializer() { } @Override -public byte[] serialize(String topic, byte[] data) { +public byte[] serialize(String topic, String data) { // This will ensure that we get the cluster metadata when serialize is called for the first time // as subsequent compareAndSet operations will fail. CLUSTER_ID_BEFORE_SERIALIZE.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get()); -return data; +if (data == null) Review Comment: ditto ## clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java: ## @@ -603,7 +603,7 @@ public void testSerializerClose() { final int oldInitCount = MockSerializer.INIT_COUNT.get(); final int oldCloseCount = MockSerializer.CLOSE_COUNT.get(); -KafkaProducer producer = new KafkaProducer<>( +KafkaProducer producer = new KafkaProducer<>( Review Comment: ```java try (var ignored = new KafkaProducer<>(configs, new MockSerializer(), new MockSerializer())) { assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get()); assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get()); } ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
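The null handling discussed in this review can be sketched without any Kafka dependency. The class below is a hypothetical stand-in, not the actual `MockSerializer`/`MockDeserializer`; it only shows the String and `byte[]` conversion in both directions, assuming UTF-8 as in the review suggestion. Note that the deserializing direction decodes with `new String(data, StandardCharsets.UTF_8)`, since its input is a `byte[]`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the mock (de)serializer pair; not Kafka code.
public class StringCodecSketch {

    // Serializer direction: String -> byte[], passing null through.
    static byte[] serialize(String data) {
        if (data == null)
            return null;
        return data.getBytes(StandardCharsets.UTF_8);
    }

    // Deserializer direction: byte[] -> String, passing null through.
    static String deserialize(byte[] data) {
        if (data == null)
            return null;
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

Passing null through unchanged keeps tombstone-style records (null values) intact on both sides of the round trip.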
[jira] [Commented] (KAFKA-19288) Ensure new consumer joining attempt not overwritten on delayed HB response to previous leave
[ https://issues.apache.org/jira/browse/KAFKA-19288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954116#comment-17954116 ] Lucas Brutschy commented on KAFKA-19288: The sequence is slightly different from what is described in this ticket, and it could be more likely. What is actually happening is that, after unsubscribing, we receive a heartbeat response with memberEpoch > 0, which happens to unblock the consumer. So unsubscribe does not need to time out for this to be triggered. > Ensure new consumer joining attempt not overwritten on delayed HB response to > previous leave > > > Key: KAFKA-19288 > URL: https://issues.apache.org/jira/browse/KAFKA-19288 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Major > Labels: kip-848-client-support > Fix For: 4.1.0 > > > For the async consumer, check and cover the edge case where the consumer may > transition to JOINING, but receive a delayed response to a previous leave > request. We should ensure that the response is ignored and the consumer joins > as intended. > I expect this doesn't usually happen given that the unsubscribe is a > blocking operation, so the consumer won't be able to join until the > unsubscribe completes. But if the leave request doesn't get a response in > time (unsubscribe fails quietly), and the response arrives after it, I > expect the joining could be overwritten (member updating its joining epoch 0 > to the -1 epoch received in the response) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19330: Change MockSerializer/Deserializer to use String serializer instead of byte[] [kafka]
mingyen066 commented on PR #19812: URL: https://github.com/apache/kafka/pull/19812#issuecomment-2910251765 @chia7712 Thanks for the review. I've addressed the comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19300: AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException [kafka]
chia7712 merged PR #19779: URL: https://github.com/apache/kafka/pull/19779 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-19300) AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException
[ https://issues.apache.org/jira/browse/KAFKA-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-19300. Resolution: Fixed > AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException > --- > > Key: KAFKA-19300 > URL: https://issues.apache.org/jira/browse/KAFKA-19300 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Chia-Ping Tsai >Assignee: Nick Guo >Priority: Major > Fix For: 4.1.0 > > > It can be reproduced by > GroupAuthorizerIntegrationTest.testConsumeUnsubscribeWithoutGroupPermission. > The root cause is shown below. > 1. AsyncConsumer#unsubscribe is executed > 2. process(UnsubscribeEvent) changes the state to LEAVING > 3. the state is changed to UNSUBSCRIBED when the heartbeat is generated > 4. the `LeaveInProgress` is skipped in transitionToFatal since the state is > UNSUBSCRIBED > This behavior is inconsistent with the classic consumer -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19322: Remove the DelayedOperation constructor that accepts an external lock [kafka]
chia7712 merged PR #19798: URL: https://github.com/apache/kafka/pull/19798 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-19322) Remove the DelayedOperation constructor that accepts an external lock
[ https://issues.apache.org/jira/browse/KAFKA-19322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-19322. Fix Version/s: 4.1.0 Resolution: Fixed > Remove the DelayedOperation constructor that accepts an external lock > - > > Key: KAFKA-19322 > URL: https://issues.apache.org/jira/browse/KAFKA-19322 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Yu Chia Ma >Priority: Minor > Fix For: 4.1.0 > > > see discussion > https://github.com/apache/kafka/pull/19759#discussion_r2097557356 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19144 Move DelayedProduce to server module [kafka]
chia7712 commented on PR #19793: URL: https://github.com/apache/kafka/pull/19793#issuecomment-2910286667 @johnny94 please fix the conflicts -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19315: Move ControllerMutationQuotaManager to server module [kafka]
chia7712 commented on code in PR #19807: URL: https://github.com/apache/kafka/pull/19807#discussion_r2107643996 ## server/src/main/java/org/apache/kafka/server/PermissiveControllerMutationQuota.java: ## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Time; + +/** + * The PermissiveControllerMutationQuota defines a permissive quota for a given user/clientId pair. + * The quota is permissive meaning that 1) it does accept any mutations even if the quota is + * exhausted; and 2) it does throttle as soon as the quota is exhausted. + * + * @param time @Time object to use Review Comment: ditto ## server/src/main/java/org/apache/kafka/server/UnboundedControllerMutationQuota.java: ## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server; + +/** + * Default quota used when quota is disabled. + */ +public class UnboundedControllerMutationQuota implements ControllerMutationQuota { + +public static final UnboundedControllerMutationQuota INSTANCE = new UnboundedControllerMutationQuota(); Review Comment: Could you please inline this class? ```java public interface ControllerMutationQuota { ControllerMutationQuota UNBOUNDED_CONTROLLER_MUTATION_QUOTA = new ControllerMutationQuota() { @Override public boolean isExceeded() { return false; } @Override public void record(double permits) { } @Override public int throttleTime() { return 0; } }; ``` ## server/src/main/java/org/apache/kafka/server/AbstractControllerMutationQuota.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server; + +import org.apache.kafka.common.metrics.QuotaViolationException; +import org.apache.kafka.common.utils.Time; + +/** + * The AbstractControllerMutationQuota is the base class of StrictControllerMutationQuota and + * PermissiveControllerMutationQuota. + * + * @param time @Time object to use Review Comment: please fix the docs ## server/src/main/java/org/apache/kafka/server/ClientSensors.java: ## @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF
Re: [PR] KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) [kafka]
FrankYang0529 commented on code in PR #19802: URL: https://github.com/apache/kafka/pull/19802#discussion_r2107522589 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java: ## @@ -582,54 +592,44 @@ public Set currentWarmupTaskProcessIds( } /** - * @return An immutable map of partition metadata for each topic that are inputs for this streams group. + * @return The metadata hash. */ -public Map partitionMetadata() { -return Collections.unmodifiableMap(partitionMetadata); +public long metadataHash() { +return metadataHash.get(); } /** - * Updates the partition metadata. This replaces the previous one. + * Updates the metadata hash. * - * @param partitionMetadata The new partition metadata. + * @param metadataHash The new metadata hash. */ -public void setPartitionMetadata( -Map partitionMetadata -) { -this.partitionMetadata.clear(); -this.partitionMetadata.putAll(partitionMetadata); -maybeUpdateConfiguredTopology(); -maybeUpdateGroupState(); Review Comment: We may not be able to set the metadata hash in `ConfiguredTopology`. When replaying `StreamsGroupMetadataValue`, we need to set `metadataHash` back. If we add it to `ConfiguredTopology`, there will be a non-empty `ConfiguredTopology` in the group, and its `metadataHash` may be the same as the latest computed value. Then we have no value to compare against to decide whether we need to compute a new `ConfiguredTopology`.
Re: [PR] KAFKA-19335: Membership managers send negative epoch in JOINING [kafka]
lianetm commented on PR #19818: URL: https://github.com/apache/kafka/pull/19818#issuecomment-2910378099 Hey @lucasbru, thanks for taking on this one. Agree with the gap on leave HB responses received in unexpected order. And the fix makes sense to me (only complete the leave if the HB response is a response to leave, and never apply epoch received in a leave HB response). What I'm not seeing clearly is how this would lead to INVALID_REQUEST? (so worried that even though this is a sensible gap and fix there may still be something else behind the failure you got?). If this race happens, I expect that we end up sending a full HB (all fields), but with the -1 epoch, correct? Then the request should fail with UNKNOWN_MEMBER, the moment the coordinator tries to find the member that wants to leave https://github.com/apache/kafka/blob/6e380fbbcc8fde22d1f2bb3310e1270d5b3f4837/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L3902 (same applies for the Consumer btw, we should get UNKNOWN_MEMBER if the client sends a full HB to join but with epoch -1 by mistake/race). Thoughts? not sure if I'm missing something here
Re: [PR] KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) [kafka]
lucasbru commented on code in PR #19802: URL: https://github.com/apache/kafka/pull/19802#discussion_r2107633695 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java: ## @@ -582,54 +592,44 @@ public Set currentWarmupTaskProcessIds( } /** - * @return An immutable map of partition metadata for each topic that are inputs for this streams group. + * @return The metadata hash. */ -public Map partitionMetadata() { -return Collections.unmodifiableMap(partitionMetadata); +public long metadataHash() { +return metadataHash.get(); } /** - * Updates the partition metadata. This replaces the previous one. + * Updates the metadata hash. * - * @param partitionMetadata The new partition metadata. + * @param metadataHash The new metadata hash. */ -public void setPartitionMetadata( -Map partitionMetadata -) { -this.partitionMetadata.clear(); -this.partitionMetadata.putAll(partitionMetadata); -maybeUpdateConfiguredTopology(); -maybeUpdateGroupState(); Review Comment: Ah, I did not mean it as a replacement for the `TimelineLong` inside `StreamsGroup`. We need an additional hash to be stored inside `ConfiguredTopology`, to remember for which state of the topics it was calculated.
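A rough sketch of the idea in this comment, not the PR's actual code: the configured topology carries the metadata hash it was built for, and it is rebuilt when it is missing or when that hash no longer matches the group's current hash. `CachedTopology` and `TopologyCache` are hypothetical stand-ins for `ConfiguredTopology` and the logic around it, and the rebuild step is reduced to constructing a new object.

```java
import java.util.Optional;

/** Hypothetical stand-in for ConfiguredTopology; only the stored hash matters here. */
final class CachedTopology {
    final long metadataHash; // hash of the topic metadata this topology was built for

    CachedTopology(long metadataHash) {
        this.metadataHash = metadataHash;
    }
}

/** Hypothetical holder that rebuilds the topology only when it is missing or stale. */
final class TopologyCache {
    private Optional<CachedTopology> configured = Optional.empty();
    private int rebuilds = 0;

    CachedTopology getOrRebuild(long currentMetadataHash) {
        boolean stale = configured.isEmpty()
            || configured.get().metadataHash != currentMetadataHash;
        if (stale) {
            // Placeholder for the (assumed expensive) topology configuration step.
            configured = Optional.of(new CachedTopology(currentMetadataHash));
            rebuilds++;
        }
        return configured.get();
    }

    int rebuilds() {
        return rebuilds;
    }
}
```

With this shape, the group-level hash and the hash remembered by the topology are two separate values, so a replayed group hash can still be compared against the one the topology was computed for.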
Re: [PR] KAFKA-19335: Membership managers send negative epoch in JOINING [kafka]
lucasbru commented on PR #19818: URL: https://github.com/apache/kafka/pull/19818#issuecomment-2910391860 > Thoughts? not sure if I'm missing something here Thanks for taking a look, @lianetm ! You are right, consumer groups and share groups should fail with `UNKNOWN_MEMBER`. In streams, we send the topology in a full request, and have code to reject when it is being sent in anything but a joining heartbeat. This will trigger the `INVALID_REQUEST`. Hope that will clear things up! https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java#L539
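To make the streams case concrete, here is a minimal sketch of the kind of check described above. It is an illustration under assumptions, not the code in `GroupCoordinatorService`: `HeartbeatCheck` is a hypothetical class, the error names are plain strings, and the epoch constants (0 for joining, -1 for leaving) are assumed from the discussion.

```java
/** Hypothetical, simplified validation of a streams group heartbeat. */
final class HeartbeatCheck {
    static final int JOIN_EPOCH = 0;   // assumed epoch of a joining heartbeat
    static final int LEAVE_EPOCH = -1; // assumed epoch of a leaving heartbeat

    /**
     * Returns "INVALID_REQUEST" when a topology is attached to anything
     * other than a joining heartbeat, and "NONE" otherwise.
     */
    static String validate(int memberEpoch, boolean hasTopology) {
        if (hasTopology && memberEpoch != JOIN_EPOCH) {
            return "INVALID_REQUEST";
        }
        return "NONE";
    }
}
```

Under these assumptions, a full heartbeat that mistakenly carries epoch -1 is rejected before any member lookup happens, which would explain seeing `INVALID_REQUEST` rather than `UNKNOWN_MEMBER` in streams.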
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
dajac commented on PR #19761: URL: https://github.com/apache/kafka/pull/19761#issuecomment-2909780629 I would like to merge https://github.com/apache/kafka/pull/19790 before merging this one because we need to cherry-pick it to 4.0.
Re: [PR] KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager [kafka]
ahuang98 commented on code in PR #19742: URL: https://github.com/apache/kafka/pull/19742#discussion_r2107750048 ## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ## @@ -453,40 +452,34 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } /** - * Helper method to update quota types counts and quotaTypesEnabled flag. - * @param quotaTypeKey The QuotaTypes constant (e.g., QuotaTypes.UserClientIdQuotaEnabled) - * @param increment True to increment count, false to decrement + * Helper method to update quotaTypesEnabled which is a bitwise OR combination of the enabled quota types. + * For example: + * - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then quotaTypesEnabled = 3 (2 | 1 = 3) + * - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 1, then quotaTypesEnabled = (4 | 1 = 5) + * - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 2, then quotaTypesEnabled = 6 (4 | 2 = 6) + * - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7 */ - private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = { -if (quotaTypeKey == QuotaTypes.NoQuotas) { - return -} -val previousQuotaTypesEnabled = quotaTypesEnabled - -// Update activeQuotaTypes counts -activeQuotaTypes.compute(quotaTypeKey, (_, count) => - if (increment) Option(count).getOrElse(0) + 1 - else if (Option(count).exists(_ > 1)) count - 1 - else 0 -) + private def updateQuotaTypes(): Unit = { +quotaTypesEnabled = if (clientQuotaCallbackPlugin.isDefined) { +QuotaTypes.CustomQuotas Review Comment: thanks for the details and for the additional test! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
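For readers following the bitwise OR combination mentioned in the quoted doc comment, a small sketch is below. The enum `QuotaKind`, the class `QuotaMask`, and the bit values 1, 2 and 4 are assumptions for illustration only; the quoted comment assigns different values to the same names in different bullets, so none of them is taken as authoritative here.

```java
/** Hypothetical quota type flags; the bit values are assumed for this sketch. */
enum QuotaKind {
    CLIENT_ID(1),
    USER(2),
    USER_CLIENT_ID(4);

    final int bit;

    QuotaKind(int bit) {
        this.bit = bit;
    }
}

final class QuotaMask {
    /** ORs together the bits of all enabled quota kinds; no kinds gives 0. */
    static int combine(QuotaKind... enabled) {
        int mask = 0;
        for (QuotaKind kind : enabled) {
            mask |= kind.bit;
        }
        return mask;
    }
}
```

Because each kind owns a distinct bit, the combined value identifies exactly which kinds are enabled, regardless of the order in which they were added.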
Re: [PR] KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager [kafka]
ahuang98 commented on code in PR #19742: URL: https://github.com/apache/kafka/pull/19742#discussion_r2107754429 ## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ## @@ -428,18 +427,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, try { val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) - if (userEntity.nonEmpty) { -if (quotaEntity.clientIdEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled -else - quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientEntity.nonEmpty) -quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled - quota match { -case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) -case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) +case Some(newQuota) => + quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) + if(!activeQuotaEntities.put(quotaEntity, true)){ Review Comment: we're adding quotaEntity (of type KafkaQuotaEntity) to a map whose keys are of type ClientQuotaEntity? ## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ## @@ -155,6 +155,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case None => new DefaultQuotaCallback } private val clientQuotaType = QuotaType.toClientQuotaType(quotaType) + private val activeQuotaEntities = new ConcurrentHashMap[ClientQuotaEntity, Boolean]() Review Comment: perhaps you meant to make the key type KafkaQuotaEntity - in any case though, I'm not sure I understand why the key is of type "...QuotaEntity" (which contains both the type and name) instead of just "ConfigEntityType" (which contains only the type) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19042: [9/N] Move GroupAuthorizerIntegrationTest to clients-integration-tests module [kafka]
chia7712 commented on PR #19685: URL: https://github.com/apache/kafka/pull/19685#issuecomment-2910266692 @nick-zh please fix the conflicts
Re: [PR] KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager [kafka]
MahsaSeifikar commented on code in PR #19742: URL: https://github.com/apache/kafka/pull/19742#discussion_r2107803984 ## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ## @@ -428,18 +427,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, try { val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity) - if (userEntity.nonEmpty) { -if (quotaEntity.clientIdEntity.nonEmpty) - quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled -else - quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled - } else if (clientEntity.nonEmpty) -quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled - quota match { -case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) -case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity) +case Some(newQuota) => + quotaCallback.updateQuota(clientQuotaType, quotaEntity, newQuota.bound) + if(!activeQuotaEntities.put(quotaEntity, true)){ Review Comment: My bad, it should be a map with KafkaQuotaEntity as the key.
Re: [PR] KAFKA-19154; Offset Fetch API should return INVALID_OFFSET if requested topic id does not match persisted one [kafka]
lianetm commented on code in PR #19744: URL: https://github.com/apache/kafka/pull/19744#discussion_r2107800303 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -859,6 +859,7 @@ boolean hasCommittedOffset( * * @return A List of OffsetFetchResponseTopics response. */ +@SuppressWarnings("NPathComplexity") Review Comment: Can we maybe avoid the suppression if we use a var for the new check? (something like `var isInvalidOffset = offsetAndMetadata == null || isMismatchedTopicId(...` and check on it on ln 910? ## core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala: ## @@ -527,4 +527,88 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) extends GroupCoordinatorB ) } } + + @ClusterTest + def testFetchOffsetWithRecreatedTopic(): Unit = { +// There are two ways to ensure that committed of recreated topics are not returned. +// 1) When a topic is deleted, GroupCoordinatorService#onPartitionsDeleted is called to +//delete all its committed offsets. +// 2) Since version 10 of the OffsetCommit API, the topic id is stored alongside the +//committed offset. When it is queried, it is only returned iff the topic id of +//committed offset matches the requested one. +// The test tests both conditions but not in a deterministic way as they race +// against each others. Review Comment: yeap, tricky to play against that here on integration tests, but seems good enough because it ensures the final outcome (combined with the unit tests for the topic ID check) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
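The first suggestion above (folding the new check into a single local variable) might look roughly like the following. This is a sketch with hypothetical types: `Committed` stands in for the stored offset and metadata, `java.util.UUID` stands in for Kafka's topic id type, a `null` id is treated as "unknown", and the `-1` sentinel is an assumption of the sketch.

```java
import java.util.UUID;

/** Hypothetical, simplified version of the per-partition offset lookup. */
final class OffsetLookup {
    static final long INVALID_OFFSET = -1L; // assumed sentinel for "no valid offset"

    /** Stand-in for the stored offset plus the topic id it was committed with. */
    static final class Committed {
        final long offset;
        final UUID topicId; // null means the id was not recorded (assumption)

        Committed(long offset, UUID topicId) {
            this.offset = offset;
            this.topicId = topicId;
        }
    }

    /** A mismatch only counts when both ids are known and differ. */
    static boolean isMismatchedTopicId(UUID stored, UUID requested) {
        return stored != null && requested != null && !stored.equals(requested);
    }

    static long offsetFor(Committed committed, UUID requestedTopicId) {
        // One boolean replaces two separate branches in the caller.
        boolean isInvalidOffset = committed == null
            || isMismatchedTopicId(committed.topicId, requestedTopicId);
        return isInvalidOffset ? INVALID_OFFSET : committed.offset;
    }
}
```

Collapsing the two conditions into one variable keeps a single branch at the call site, which is the kind of change that can bring a method back under a path-complexity limit without a suppression.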
Re: [PR] KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group [kafka]
dongnuo123 commented on code in PR #19790: URL: https://github.com/apache/kafka/pull/19790#discussion_r210783 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java: ## @@ -1576,6 +1581,7 @@ public void testFromClassicGroup() { assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch()); assertEquals(expectedConsumerGroup.state(), consumerGroup.state()); assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor()); +assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata())); Review Comment: Without it `assertEquals` just compares the reference.
Re: [PR] KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group [kafka]
dongnuo123 commented on code in PR #19790: URL: https://github.com/apache/kafka/pull/19790#discussion_r2107834673 ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -2485,8 +2485,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { // Test offset deletion while consuming val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, util.Set.of(tp1, tp2)) -// Top level error will equal to the first partition level error -assertFutureThrows(classOf[GroupSubscribedToTopicException], offsetDeleteResult.all()) Review Comment: Reverted it.
Re: [PR] KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… [kafka]
cadonna commented on code in PR #19275: URL: https://github.com/apache/kafka/pull/19275#discussion_r2107363103 ## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ## @@ -446,29 +446,15 @@ private List getTaskIdsAsStrings(final KafkaStreams streams) { private static Stream singleAndMultiTaskParameters() { Review Comment: This name does not really fit anymore. I propose to rename this method to `topologyComplexityAndRebalanceProtocol`. ## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ## @@ -484,7 +470,7 @@ private Properties props(final Properties extraProperties) { streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, TestClientSupplier.class); streamsConfiguration.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, TestConsumerWrapper.class); streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); -streamsConfiguration.putAll(extraProperties); Review Comment: This does not seem right. On line 472 the group protocol config is passed to `props()`, but here it is ignored. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -395,18 +379,15 @@ public void handleAssignment(final Map> activeTasks, // 2. for tasks that have changed active/standby status, just recycle and skip re-creating them // 3. 
otherwise, close them since they are no longer owned final Map failedTasks = new LinkedHashMap<>(); -if (stateUpdater == null) { -handleTasksWithoutStateUpdater(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); -} else { -handleTasksWithStateUpdater( -activeTasksToCreate, -standbyTasksToCreate, -tasksToRecycle, -tasksToCloseClean, -failedTasks -); - failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater()); -} + +handleTasksWithStateUpdater( Review Comment: Could you please rename this method to `handleTasks()`. We do not need to distinguish the cases with and without state updater. ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1909,7 +1772,7 @@ public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro } @Test -public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() { +public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() { Review Comment: Could you please change ```java final long changelogOffsetOfRunningTask = 42L; ``` to ```java final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET; ``` to make the case more real? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Runnable shutdownErrorHook, final BiConsumer streamsUncaughtExceptionHandler) { -final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); Review Comment: There is still a system test that uses the config. It is [`streams_upgrade_test.test_upgrade_downgrade_state_updater()`](https://github.com/apache/kafka/blob/1ded681684e771b16aa98ae751f39b9816345a83/tests/kafkatest/tests/streams/streams_upgrade_test.py#L178). 
There is a comment that says: ``` Once same-thread state restoration is removed from the code, this test should use different versions of the code. ``` I guess it means to only use a version before `3.8` (e.g. `LATEST_3_7`) for the `from_version` and `DEV_VERSION` for the `to_version`. You need to choose a version before `3.8` because before `3.8` the state updater was not enabled by default. @lucasbru did I correctly interpret your comment? ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1940,57 +1803,6 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat ); } -@Test -public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception { Review Comment: Could you please replace this test with the following: ```java @Test public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() { final StreamTask restoringStatefulTask = statefulTask(taskId01, taskId01ChangelogPartitions) .inState(State.RESTORING).build(); final long changelogOffsetOfRestoringStandbyTask = 84L
Re: [PR] KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager [kafka]
MahsaSeifikar commented on code in PR #19742: URL: https://github.com/apache/kafka/pull/19742#discussion_r2107840373 ## core/src/main/scala/kafka/server/ClientQuotaManager.scala: ## @@ -155,6 +155,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case None => new DefaultQuotaCallback } private val clientQuotaType = QuotaType.toClientQuotaType(quotaType) + private val activeQuotaEntities = new ConcurrentHashMap[ClientQuotaEntity, Boolean]() Review Comment: `KafkaQuotaEntity` is used as the key for `activeQuotaEntities` because it tracks the entity name (e.g., "userA" or "client1", or the default user or client), and we can extract the entity type from it. Consider a scenario where a customer has two quotas: one for `(userA, client1)` and another for `(userA, client2)`. If we only tracked the entity type, removing the quota for one combination (e.g., `(userA, client1)`) could incorrectly affect other combinations of the same type (e.g., `(userA, client2)`). By using `KafkaQuotaEntity`, we make sure that removing one quota does not impact others of the same type. I found this issue while testing scenarios involving duplicate quota types.
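The scenario described in this comment can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: `EntityKey` is a hypothetical simplification of `KafkaQuotaEntity`, `ActiveQuotaTracker` is a hypothetical holder for the active set, and the type bits (1, 2, 4) are assumed values.

```java
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical simplified key; it keeps entity names, not just the type. */
final class EntityKey {
    final String user;     // null when the quota has no user part
    final String clientId; // null when the quota has no client-id part

    EntityKey(String user, String clientId) {
        this.user = user;
        this.clientId = clientId;
    }

    /** Bit for this entity's quota type (values are assumptions of this sketch). */
    int typeBit() {
        if (user != null && clientId != null) return 4; // user + client-id
        if (user != null) return 2;                     // user only
        if (clientId != null) return 1;                 // client-id only
        return 0;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof EntityKey)) return false;
        EntityKey other = (EntityKey) o;
        return Objects.equals(user, other.user) && Objects.equals(clientId, other.clientId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(user, clientId);
    }
}

/** Hypothetical tracker that derives the enabled-types mask from active entities. */
final class ActiveQuotaTracker {
    private final Set<EntityKey> active = ConcurrentHashMap.newKeySet();

    void onQuotaUpdated(EntityKey key) {
        active.add(key);
    }

    void onQuotaRemoved(EntityKey key) {
        active.remove(key);
    }

    /** A type stays enabled as long as at least one entity of that type remains. */
    int quotaTypesEnabled() {
        int mask = 0;
        for (EntityKey key : active) {
            mask |= key.typeBit();
        }
        return mask;
    }
}
```

In this sketch, removing `(userA, client1)` leaves the user-and-client-id bit set while `(userA, client2)` is still active, and the bit clears only once the last entity of that type is removed.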
[PR] Update README.md [kafka]
whybeeh1 opened a new pull request, #19819: URL: https://github.com/apache/kafka/pull/19819 Better readability
Re: [PR] Update README.md [kafka]
whybeeh1 closed pull request #19819: Update README.md URL: https://github.com/apache/kafka/pull/19819
Re: [PR] Update README.md [kafka]
whybeeh1 commented on PR #19819: URL: https://github.com/apache/kafka/pull/19819#issuecomment-2910613900 cl
Re: [PR] Update README.md [kafka]
whybeeh1 commented on PR #19819: URL: https://github.com/apache/kafka/pull/19819#issuecomment-2910631422 l
Re: [PR] KAFKA-19268 Missing mocks for SharePartitionManagerTest tests [kafka]
AndrewJSchofield merged PR #19786: URL: https://github.com/apache/kafka/pull/19786
[jira] [Resolved] (KAFKA-19268) Missing mocks for SharePartitionManagerTest tests
[ https://issues.apache.org/jira/browse/KAFKA-19268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield resolved KAFKA-19268. -- Fix Version/s: 4.1.0 Resolution: Fixed > Missing mocks for SharePartitionManagerTest tests > - > > Key: KAFKA-19268 > URL: https://issues.apache.org/jira/browse/KAFKA-19268 > Project: Kafka > Issue Type: Sub-task >Reporter: Abhinav Dixit >Assignee: jiseung >Priority: Major > Fix For: 4.1.0 > > > A few tests in SharePartitionManagerTest throw silent exceptions but the > tests pass. This is mainly due to missing mocks in those tests. The following > tests need to be analyzed and fixed - > testAcknowledgeCompletesDelayedShareFetchRequest > testMultipleConcurrentShareFetches > testCachedTopicPartitionsForValidShareSessions > testReleaseSessionCompletesDelayedShareFetchRequest > testReleaseSessionSuccess -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19285: Added more tests in SharePartitionManagerTest [kafka]
AndrewJSchofield commented on PR #19778: URL: https://github.com/apache/kafka/pull/19778#issuecomment-2910658074 @chirag-wadhwa5 Please merge latest changes.
[jira] [Updated] (KAFKA-19292) Introduce soft state for StreamsGroup
[ https://issues.apache.org/jira/browse/KAFKA-19292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-19292: Component/s: streams > Introduce soft state for StreamsGroup > - > > Key: KAFKA-19292 > URL: https://issues.apache.org/jira/browse/KAFKA-19292 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Bill Bejeck >Assignee: Bill Bejeck >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19244: Add support for kafka-streams-groups.sh options (delete group, offset-related APIs) [1/N] [kafka]
aliehsaeedii commented on code in PR #19646: URL: https://github.com/apache/kafka/pull/19646#discussion_r2107042451 ## tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java: ## @@ -330,13 +400,433 @@ Map getOffsets(StreamsGroupDescription description) Map getCommittedOffsets(String groupId) { try { -return adminClient.listConsumerGroupOffsets( -Map.of(groupId, new ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get(); +return adminClient.listStreamsGroupOffsets( +Map.of(groupId, new ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } +Map> resetOffsets() { +Map> result = new HashMap<>(); +List groupIds = listStreamsGroups(); +if (!groupIds.isEmpty()) { +Map> streamsGroups = adminClient.describeStreamsGroups( +groupIds +).describedGroups(); + +streamsGroups.forEach((groupId, groupDescription) -> { +try { +String state = groupDescription.get().groupState().toString(); +switch (state) { +case "Empty": +case "Dead": +result.put(groupId, resetOffsetsForInactiveGroup(groupId)); +break; +default: +printError("Assignments can only be reset if the group '" + groupId + "' is inactive, but the current state is " + state + ".", Optional.empty()); +result.put(groupId, Collections.emptyMap()); +} +} catch (InterruptedException ie) { +throw new RuntimeException(ie); +} catch (ExecutionException ee) { +if (ee.getCause() instanceof GroupIdNotFoundException) { +result.put(groupId, resetOffsetsForInactiveGroup(groupId)); +} else { +throw new RuntimeException(ee); +} +} +}); +} +return result; +} + +private Map resetOffsetsForInactiveGroup(String groupId) { +try { +Collection partitionsToReset = getPartitionsToReset(groupId); +Map preparedOffsets = prepareOffsetsToReset(groupId, partitionsToReset); + +// Dry-run is the default behavior if --execute is not specified +boolean dryRun = opts.options.has(opts.dryRunOpt) || 
!opts.options.has(opts.executeOpt); +if (!dryRun) { +adminClient.alterStreamsGroupOffsets( +groupId, +preparedOffsets +).all().get(); +} + +return preparedOffsets; +} catch (InterruptedException ie) { +throw new RuntimeException(ie); +} catch (ExecutionException ee) { +Throwable cause = ee.getCause(); +if (cause instanceof KafkaException) { +throw (KafkaException) cause; +} else { +throw new RuntimeException(cause); +} +} +} + +private Collection getPartitionsToReset(String groupId) throws ExecutionException, InterruptedException { +if (opts.options.has(opts.allTopicsOpt)) { +return getCommittedOffsets(groupId).keySet(); +} else if (opts.options.has(opts.topicOpt)) { +List topics = opts.options.valuesOf(opts.topicOpt); +return parseTopicPartitionsToReset(topics); +} else { +if (!opts.options.has(opts.resetFromFileOpt)) +CommandLineUtils.printUsageAndExit(opts.parser, "One of the reset scopes should be defined: --all-topics, --topic."); + +return Collections.emptyList(); +} +} + +private List parseTopicPartitionsToReset(List topicArgs) throws ExecutionException, InterruptedException { +List topicsWithPartitions = new ArrayList<>(); +List topics = new ArrayList<>(); + +topicArgs.forEach(topicArg -> { +if (topicArg.contains(":")) +topicsWithPartitions.add(topicArg); +else +topics.add(topicArg); +}); + +List specifiedPartitions = + topicsWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collect
Re: [PR] KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) [kafka]
lucasbru commented on code in PR #19802: URL: https://github.com/apache/kafka/pull/19802#discussion_r2106982543 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java: ## @@ -582,54 +592,44 @@ public Set currentWarmupTaskProcessIds( } /** - * @return An immutable map of partition metadata for each topic that are inputs for this streams group. + * @return The metadata hash. */ -public Map partitionMetadata() { -return Collections.unmodifiableMap(partitionMetadata); +public long metadataHash() { +return metadataHash.get(); } /** - * Updates the partition metadata. This replaces the previous one. + * Updates the metadata hash. * - * @param partitionMetadata The new partition metadata. + * @param metadataHash The new metadata hash. */ -public void setPartitionMetadata( -Map partitionMetadata -) { -this.partitionMetadata.clear(); -this.partitionMetadata.putAll(partitionMetadata); -maybeUpdateConfiguredTopology(); -maybeUpdateGroupState(); Review Comment: Could you instead store the group metadata hash along with / inside ConfiguredTopology, and rebuild configuredTopology not only when it is empty, but also when the group metadata hash does not match the current group metadata hash? Also, that would mean, when we call `setTopology` or `setPartitionMetadata` inside `StreamsGroup`, we will not call `maybeUpdateConfiguredTopology`, since it will be only be created in one place - from the `streamsGroupHeartbeat`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
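A minimal sketch of the idea in the review comment above: keep the metadata hash next to the configured topology and rebuild it when it is missing or when its stored hash no longer matches the current one. The class and method names here (`ConfiguredTopologySketch`, `getOrRebuild`, `rebuilds`) are hypothetical and are not part of `StreamsGroup`; the real rebuild would happen in the `streamsGroupHeartbeat` path.

```java
import java.util.Optional;

public class ConfiguredTopologySketch {
    /** Hypothetical stand-in for ConfiguredTopology that remembers the hash it was built from. */
    static final class ConfiguredTopology {
        final long metadataHash;

        ConfiguredTopology(long metadataHash) {
            this.metadataHash = metadataHash;
        }
    }

    private Optional<ConfiguredTopology> configuredTopology = Optional.empty();

    // Counts rebuilds so the behavior is observable in this sketch.
    int rebuilds = 0;

    /** Rebuilds when nothing is cached yet or when the cached hash is stale. */
    ConfiguredTopology getOrRebuild(long currentMetadataHash) {
        boolean missing = !configuredTopology.isPresent();
        boolean stale = !missing && configuredTopology.get().metadataHash != currentMetadataHash;
        if (missing || stale) {
            configuredTopology = Optional.of(new ConfiguredTopology(currentMetadataHash));
            rebuilds++;
        }
        return configuredTopology.get();
    }

    public static void main(String[] args) {
        ConfiguredTopologySketch group = new ConfiguredTopologySketch();
        group.getOrRebuild(42L); // first call: nothing cached, so it builds
        group.getOrRebuild(42L); // same hash: cached instance is reused
        group.getOrRebuild(43L); // hash changed: rebuilt
        System.out.println("rebuilds = " + group.rebuilds);
    }
}
```

With a single rebuild point like this, setters would not need to trigger a rebuild themselves, which seems to be the simplification the comment is after.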
[jira] [Updated] (KAFKA-19331) No error handling for leader not appeared in applyLocalFollowersDelta
[ https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-19331: -- Summary: No error handling for leader not appeared in applyLocalFollowersDelta (was: No error handling for leader unregistered in applyLocalFollowersDelta ) > No error handling for leader not appeared in applyLocalFollowersDelta > -- > > Key: KAFKA-19331 > URL: https://issues.apache.org/jira/browse/KAFKA-19331 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Priority: Major > Labels: newbie > > In ReplicaManager#applyLocalFollowersDelta, when we prepare for fetching from > the leader, we check whether the leader node info is in the metadata image. If > it is somehow not included in the newImage, we log something like: > > {code:java} > [2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching > quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader > Some(2) because it is not alive. (state.change.logger) > [2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of > become-follower for 1 partitions (state.change.logger) > {code} > > It's confusing for users to see that the broker is unable to fetch and then that it started fetchers, when > in the end it is not actually fetching. We should handle the error properly by > updating the `FailedPartition` and not updating the status of other, successful results. >
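The behavior the issue asks for can be sketched as follows: partitions whose leader is missing from the metadata image go into a failed set, and only the rest are counted as started. This is a simplified illustration in Java, not the Scala code in `ReplicaManager`; `FollowerDeltaSketch`, `Plan` and `planFetchers` are hypothetical names, and partitions and brokers are reduced to strings and integers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FollowerDeltaSketch {
    /** Partitions that can start fetching, and partitions whose leader is not in the image. */
    static final class Plan {
        final Map<String, Integer> toFetch = new LinkedHashMap<>();
        final List<String> failed = new ArrayList<>();
    }

    /** Splits partitions by whether their leader id is present in the (assumed) set of image brokers. */
    static Plan planFetchers(Map<String, Integer> leaderByPartition, Set<Integer> brokersInImage) {
        Plan result = new Plan();
        leaderByPartition.forEach((partition, leaderId) -> {
            if (brokersInImage.contains(leaderId)) {
                result.toFetch.put(partition, leaderId);
            } else {
                // Record the failure instead of silently counting the partition as started.
                result.failed.add(partition);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        // Leader 2 is not among the brokers in the image, mirroring the log excerpt in the issue.
        Plan result = planFetchers(Map.of("quickstart-events-0", 2), Set.of(1, 3, 4));
        System.out.println("Started fetchers for " + result.toFetch.size()
            + " partitions; failed partitions: " + result.failed);
    }
}
```

Reporting the count from `toFetch` rather than from the full input would avoid the "unable to fetch, then started fetchers for 1 partitions" pairing described in the issue.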
Re: [PR] MINOR: Change `Streams group` to `streams group` [kafka]
lucasbru commented on code in PR #19813: URL: https://github.com/apache/kafka/pull/19813#discussion_r2107119425 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -346,18 +346,18 @@ private void optimizeTopology(final Properties props) { (String) props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)); } if (optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) { -LOG.debug("Optimizing the Kafka Streams graph for ktable source nodes"); +LOG.debug("Optimizing the Kafka streams graph for ktable source nodes"); reuseKTableSourceTopics(); } if (optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) { -LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); +LOG.debug("Optimizing the Kafka streams graph for repartition nodes"); mergeRepartitionTopics(); } if (optimizationConfigs.contains(StreamsConfig.SINGLE_STORE_SELF_JOIN)) { -LOG.debug("Optimizing the Kafka Streams graph for self-joins"); +LOG.debug("Optimizing the Kafka streams graph for self-joins"); rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>()); } -LOG.debug("Optimizing the Kafka Streams graph for null-key records"); +LOG.debug("Optimizing the Kafka streams graph for null-key records"); Review Comment: I think even if we want to use "streams" in lower case, we should capitalize "Kafka Streams" IMHO. 
## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -16021,7 +16021,7 @@ public void testStreamsGroupMemberRequestingShutdownApplication() { .setShutdownApplication(true) ); -String statusDetail = String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.", memberId1); +String statusDetail = String.format("streams group member %s encountered a fatal error and requested a shutdown for the entire application.", memberId1); Review Comment: we shouldn't replace it at the beginning of the line ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2010,7 +2010,7 @@ private CoordinatorResult stream new Status() .setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) .setStatusDetail( -String.format("Streams group member %s encountered a fatal error and requested a shutdown for the entire application.", +String.format("streams group member %s encountered a fatal error and requested a shutdown for the entire application.", Review Comment: we shouldn't replace it at the beginning of the line ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -880,7 +880,7 @@ StreamsGroup getStreamsGroupOrThrow( Group group = groups.get(groupId); if (group == null) { -throw new GroupIdNotFoundException(String.format("Streams group %s not found.", groupId)); +throw new GroupIdNotFoundException(String.format("streams group %s not found.", groupId)); Review Comment: we shouldn't replace it at the beginning of the line ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -236,7 +236,7 @@ public class GroupCoordinatorConfig { public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; /// -/// Streams group configs +/// streams group configs 
Review Comment: we shouldn't replace it at the beginning of the line ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java: ## @@ -466,7 +466,7 @@ public GroupCoordinatorConfig(AbstractConfig config) { require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs, String.format("%s must be less than %s", SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)); -// Streams group configs validation. +// streams group configs validation. Review Comment: we shouldn't replace it at the beginning of the line ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ## @@ -16105,7 +16105,7 @@ public void testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() { .setShutdownApplication(true) ); -String statusDetail = String.format("Streams group member %s encountered a fatal error and requested a shutdown for the
Re: [PR] MINOR: Cleanup JMH-Benchmarks Module [kafka]
sjhajharia commented on PR #19791: URL: https://github.com/apache/kafka/pull/19791#issuecomment-2909416974 Hey @m1a2st I have updated the PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
FrankYang0529 commented on code in PR #19761: URL: https://github.com/apache/kafka/pull/19761#discussion_r2107245093 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java: ## @@ -151,6 +152,8 @@ public String toLowerCaseString() { */ private final TimelineHashMap resolvedRegularExpressions; +private final AtomicBoolean addSubscriptionMetadataTombstoneRecord = new AtomicBoolean(false); Review Comment: Thanks for the suggestion. I forgot to consider the rollback case.
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
FrankYang0529 commented on code in PR #19761: URL: https://github.com/apache/kafka/pull/19761#discussion_r2107242601 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -2220,6 +2227,11 @@ private CoordinatorResult int groupEpoch = group.groupEpoch(); SubscriptionType subscriptionType = group.subscriptionType(); +if (group.addSubscriptionMetadataTombstoneRecord()) { + records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)); +group.setAddSubscriptionMetadataTombstoneRecord(false); Review Comment: The `updateSubscriptionMetadata` is a good place to add this record. We always call `updateSubscriptionMetadata` when metadata image is expired. That means the group can add the record after bumping to 4.1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18904: kafka-configs.sh return resource doesn't exist message [3/N] [kafka]
FrankYang0529 commented on code in PR #19808: URL: https://github.com/apache/kafka/pull/19808#discussion_r2107254802 ## core/src/main/scala/kafka/admin/ConfigCommand.scala: ## @@ -342,6 +342,42 @@ object ConfigCommand extends Logging { } private def describeResourceConfig(adminClient: Admin, entityType: String, entityName: Option[String], describeAll: Boolean): Unit = { +if (!describeAll) { + entityName.foreach { name => +entityType match { + case TopicType => +Topic.validate(name) +if (!adminClient.listTopics(new ListTopicsOptions().listInternal(true)).names.get.contains(name)) { + System.out.println(s"The $entityType '$name' doesn't exist and doesn't have dynamic config.") + return +} + case BrokerType | BrokerLoggerConfigType => +if (adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) { + // valid broker id +} else if (name == BrokerDefaultEntityName) { + // default broker configs +} else { + System.out.println(s"The $entityType '$name' doesn't exist and doesn't have dynamic config.") + return +} + case ClientMetricsType => +if (adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS), new ListConfigResourcesOptions).all.get Review Comment: I prefer to leave this as `java.util.xxx` because we already used this pattern like: https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/core/src/main/scala/kafka/admin/ConfigCommand.scala#L311 We can do some refactor for Scala code in core module after this PR. Or we can refactor it when migrating to Java eventually. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15283:[1/N] Client support for OffsetCommit with topic ID [kafka]
DL1231 commented on PR #19577: URL: https://github.com/apache/kafka/pull/19577#issuecomment-2909614033 @lianetm @dajac, PTAL when you get a chance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-19332) Fix flaky test : testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck
Shivsundar R created KAFKA-19332: Summary: Fix flaky test : testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck Key: KAFKA-19332 URL: https://issues.apache.org/jira/browse/KAFKA-19332 Project: Kafka Issue Type: Sub-task Reporter: Shivsundar R The test has been flaky in AK builds - [https://develocity.apache.org/scans/tests?search.names=CI%20workflow%2CGit%20repository&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=github%2Ctrunk&search.tasks=test&search.timeZoneId=Asia%2FCalcutta&search.values=CI%2Chttps:%2F%2Fgithub.com%2Fapache%2Fkafka&tests.container=org.apache.kafka.clients.consumer.ShareConsumerTest&tests.test=testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck()%5B2%5D] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-19333) Inconsistent behavior between `ConsumerMembershipManager` and `StreamsMembershipManager` on `onAllTasksLost` execution
Nick Guo created KAFKA-19333: Summary: Inconsistent behavior between `ConsumerMembershipManager` and `StreamsMembershipManager` on `onAllTasksLost` execution Key: KAFKA-19333 URL: https://issues.apache.org/jira/browse/KAFKA-19333 Project: Kafka Issue Type: Improvement Reporter: Nick Guo Assignee: Nick Guo `ConsumerMembershipManager` does not create an event to run a callback if there is nothing to revoke, but `StreamsMembershipManager` does. Related discussion and PR: discussion: [https://github.com/apache/kafka/pull/18551/files#r2106243432] PR: https://github.com/apache/kafka/pull/19779
[PR] KAFKA-19290: Exploit mapKey optimisation in protocol requests and responses (wip) [kafka]
FrankYang0529 opened a new pull request, #19815: URL: https://github.com/apache/kafka/pull/19815 The mapKey optimisation can be used in some KIP-932 RPC schemas to improve the efficiency of some key-based accesses. For ShareFetch, ShareAcknowledge, ShareGroupHeartbeat and ShareGroupDescribe, the v0 RPCs are already released and cannot be retrospectively optimized. This PR changes the following RPCs: * DeleteShareGroupOffsetsRequest * DeleteShareGroupStateRequest * DescribeShareGroupOffsetsRequest * InitializeShareGroupStateRequest * ReadShareGroupStateRequest * ReadShareGroupStateSummaryRequest * WriteShareGroupStateRequest
Re: [PR] KAFKA-19300: AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException [kafka]
Rancho-7 commented on code in PR #19779: URL: https://github.com/apache/kafka/pull/19779#discussion_r2107277009 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java: ## @@ -443,7 +443,7 @@ public void transitionToFatal() { log.error("Member {} with epoch {} transitioned to fatal state", memberId, memberEpoch); notifyEpochChange(Optional.empty()); -if (previousState == MemberState.UNSUBSCRIBED) { +if (previousState == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) { Review Comment: Sounds good, I have opened https://issues.apache.org/jira/browse/KAFKA-19333 to track this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-19332) Fix flaky test : testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck and testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck
[ https://issues.apache.org/jira/browse/KAFKA-19332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Abhinav Dixit updated KAFKA-19332: -- Summary: Fix flaky test : testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck and testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck (was: Fix flaky test : testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck) > Fix flaky test : > testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck and > testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck > --- > > Key: KAFKA-19332 > URL: https://issues.apache.org/jira/browse/KAFKA-19332 > Project: Kafka > Issue Type: Sub-task >Reporter: Shivsundar R >Priority: Major > > The test has been flaky in AK builds - > [https://develocity.apache.org/scans/tests?search.names=CI%20workflow%2CGit%20repository&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=github%2Ctrunk&search.tasks=test&search.timeZoneId=Asia%2FCalcutta&search.values=CI%2Chttps:%2F%2Fgithub.com%2Fapache%2Fkafka&tests.container=org.apache.kafka.clients.consumer.ShareConsumerTest&tests.test=testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck()%5B2%5D] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-18117; KAFKA-18729: Use assigned topic IDs to avoid full metadata requests on broker-side regex [kafka]
lianetm opened a new pull request, #19814: URL: https://github.com/apache/kafka/pull/19814 This PR uses the topic IDs received in the assignment (under the new protocol) to ensure that only these assigned topics are included in the consumer metadata requests performed when the user subscribes to a broker-side regex (RE2J). These changes also fix another issue (KAFKA-18729), which aimed to avoid iterating over the full set of assigned partitions when checking whether a topic should be retained from the metadata response when using RE2J.
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
FrankYang0529 commented on code in PR #19761: URL: https://github.com/apache/kafka/pull/19761#discussion_r2107213749 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid( * @return The hash of the group. */ static long computeGroupHash(Map topicHashes) { -if (topicHashes.isEmpty()) { +// Sort entries by topic name +List> sortedEntries = new ArrayList<>(); +for (Map.Entry entry : topicHashes.entrySet()) { +// Filter out entries with a hash value of 0, which indicates no topic +if (entry.getValue() != 0) { Review Comment: We use `computeSubscribedTopicNames` to get `subscribedTopicNames` and use the result to calculate the hashes of the topics the group wants to subscribe to. However, `computeSubscribedTopicNames` doesn't check whether a topic actually exists. If a group subscribes to a non-existent topic, the topic is in `subscribedTopicNames` and `computeMetadataHash` uses the non-existent topic's hash as part of the metadata hash. A sample case is `testSubscriptionMetadataRefreshedAgainAfterWriteFailure`. The group subscribes to `foo` and `bar`, but the `bar` topic is not in the metadata image. https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java#L496-L507
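To make the filtering discussed above concrete, here is a rough sketch of a group hash that drops zero-valued topic hashes and sorts the remaining entries by topic name before combining them. The `Map<String, Long>` signature and the combining step (a simple 31-based fold) are assumptions for illustration only; this is not the hash function used in `Utils.computeGroupHash`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class GroupHashSketch {
    /**
     * Hypothetical stand-in for computeGroupHash: ignores topics whose hash is 0
     * (treated as "topic does not exist") and combines the rest in topic-name order.
     */
    static long computeGroupHash(Map<String, Long> topicHashes) {
        List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
        for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
            // Filter out entries with a hash value of 0, which indicates no topic.
            if (entry.getValue() != 0) {
                sortedEntries.add(entry);
            }
        }
        if (sortedEntries.isEmpty()) {
            return 0L;
        }
        // Sort by topic name so the result does not depend on map iteration order.
        sortedEntries.sort(Map.Entry.comparingByKey());

        long hash = 17L;
        for (Map.Entry<String, Long> entry : sortedEntries) {
            // Placeholder combiner, NOT the real one: fold each topic hash into the result.
            hash = 31L * hash + entry.getValue();
        }
        return hash;
    }

    public static void main(String[] args) {
        long withMissingTopic = computeGroupHash(Map.of("foo", 5L, "bar", 0L));
        long withoutMissingTopic = computeGroupHash(Map.of("foo", 5L));
        // A non-existent topic (hash 0) should not change the group hash.
        System.out.println(withMissingTopic == withoutMissingTopic);
    }
}
```

Under these assumptions, subscribing to a topic that is absent from the metadata image leaves the group hash unchanged, so it would not by itself cause an epoch bump.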
Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]
dajac commented on code in PR #19761: URL: https://github.com/apache/kafka/pull/19761#discussion_r2107327586 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata( numMembers ); -if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { +if (groupMetadataHash != group.metadataHash()) { if (log.isDebugEnabled()) { -log.debug("[GroupId {}] Computed new subscription metadata: {}.", -groupId, subscriptionMetadata); +log.debug("[GroupId {}] Computed new metadata hash: {}.", +groupId, groupMetadataHash); } bumpGroupEpoch = true; -records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); } if (bumpGroupEpoch) { groupEpoch += 1; -records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0)); -log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); +records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash)); +log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash); metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); } group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); +if (group.addSubscriptionMetadataTombstoneRecord()) { Review Comment: nit: I wonder whether we should call it `hasSubscriptionMetadataRecord()`. What do you think? I also suggest to add a comment explaining this block. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -490,6 +490,11 @@ GroupMetadataManager build() { */ private MetadataImage metadataImage; +/** + * The topic hash value by topic name. + */ Review Comment: It would be great if we could expand the comment to explain how we maintain this cache. 
## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java: ## @@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid( * @return The hash of the group. */ static long computeGroupHash(Map topicHashes) { -if (topicHashes.isEmpty()) { +// Sort entries by topic name +List> sortedEntries = new ArrayList<>(); +for (Map.Entry entry : topicHashes.entrySet()) { +// Filter out entries with a hash value of 0, which indicates no topic +if (entry.getValue() != 0) { Review Comment: Sure, I understand that we may have nonexistent topics here. I was more trying to understand whether having those zeros in the final hash was an issue. I suppose that you're saying that it is an issue. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata( numMembers ); -if (!subscriptionMetadata.equals(group.subscriptionMetadata())) { +if (groupMetadataHash != group.metadataHash()) { if (log.isDebugEnabled()) { -log.debug("[GroupId {}] Computed new subscription metadata: {}.", -groupId, subscriptionMetadata); +log.debug("[GroupId {}] Computed new metadata hash: {}.", +groupId, groupMetadataHash); } bumpGroupEpoch = true; -records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata)); } if (bumpGroupEpoch) { groupEpoch += 1; -records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0)); -log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch); +records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, groupMetadataHash)); +log.info("[GroupId {}] Bumped group epoch to {} with metadata hash {}.", groupId, groupEpoch, groupMetadataHash); metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME); } group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); +if (group.addSubscriptionMetadataTombstoneRecord()) { + 
records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId)); +group.setAddSubscriptionMetadataTombstoneRecord(false); Review Comment: We should remove this as it will be updated when the record is replayed. ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java: ## @@ -398,6 +423,21 @@ public Map computeSu
Re: [PR] KAFKA-2526: command line --producer-property wins [kafka]
jkt628 commented on PR #16492: URL: https://github.com/apache/kafka/pull/16492#issuecomment-2909788972 note `test for client.id` moved to https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org