[jira] [Assigned] (KAFKA-19331) No error handling for leader not appeared in applyLocalFollowersDelta

2025-05-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-19331:
-

Assignee: Ming-Yen Chung

> No error handling for leader not appeared in applyLocalFollowersDelta 
> --
>
> Key: KAFKA-19331
> URL: https://issues.apache.org/jira/browse/KAFKA-19331
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Ming-Yen Chung
>Priority: Major
>  Labels: newbie
>
> In ReplicaManager#applyLocalFollowersDelta, when we prepare to fetch from 
> the leader, we check whether the leader node info is in the metadata image. If 
> it is somehow not included in the newImage, we log something like:
>  
> {code:java}
> [2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching 
> quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader 
> Some(2) because it is not alive. (state.change.logger)
> [2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of 
> become-follower for 1 partitions (state.change.logger)
> {code}
>  
> It's confusing for users to see that fetching cannot start, followed by a 
> message that fetchers were started, when in the end nothing is actually 
> fetching. We should handle the error properly by updating the 
> `FailedPartition` and not touching the result status of the other, 
> successful partitions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group [kafka]

2025-05-26 Thread via GitHub


dajac commented on code in PR #19790:
URL: https://github.com/apache/kafka/pull/19790#discussion_r2106896835


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -2485,8 +2485,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 // Test offset deletion while consuming
 val offsetDeleteResult = 
client.deleteConsumerGroupOffsets(testGroupId, util.Set.of(tp1, tp2))
 
-// Top level error will equal to the first partition level error
-assertFutureThrows(classOf[GroupSubscribedToTopicException], 
offsetDeleteResult.all())

Review Comment:
   It seems that we can revert this change now.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##
@@ -1576,6 +1581,7 @@ public void testFromClassicGroup() {
 assertEquals(expectedConsumerGroup.groupEpoch(), 
consumerGroup.groupEpoch());
 assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
 assertEquals(expectedConsumerGroup.preferredServerAssignor(), 
consumerGroup.preferredServerAssignor());
+assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), 
Map.copyOf(consumerGroup.subscriptionMetadata()));

Review Comment:
   Is `Map.copyOf` really necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19331) No error handling for leader unregistered in applyLocalFollowersDelta

2025-05-26 Thread Luke Chen (Jira)
Luke Chen created KAFKA-19331:
-

 Summary: No error handling for leader unregistered in 
applyLocalFollowersDelta 
 Key: KAFKA-19331
 URL: https://issues.apache.org/jira/browse/KAFKA-19331
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen


In ReplicaManager#applyLocalFollowersDelta, when we prepare to fetch from 
the leader, we check whether the leader node info is in the metadata image. If 
it is somehow not included in the newImage, we log something like:

 
{code:java}
[2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching 
quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader 
Some(2) because it is not alive. (state.change.logger)
[2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of 
become-follower for 1 partitions (state.change.logger)
{code}
 

It's confusing for users to see that fetching cannot start, followed by a 
message that fetchers were started, when in the end nothing is actually 
fetching. We should handle the error properly by updating the `FailedPartition` 
and not touching the result status of the other, successful partitions.
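
One possible shape for the fix (a hypothetical Java sketch, not an actual 
patch; ReplicaManager is Scala and the helper names below are only 
illustrative): split the partitions by whether the leader is registered in the 
new image, mark the missing ones as failed, and start fetchers only for the 
rest.
{code:java}
// Hypothetical sketch. partitionsToStartFetching, failedPartitions and the
// fetcher manager mirror concepts in ReplicaManager#applyLocalFollowersDelta,
// but the exact signatures here are illustrative, not the real ones.
Map<TopicPartition, InitialFetchState> startable = new HashMap<>();
partitionsToStartFetching.forEach((topicPartition, fetchState) -> {
    if (newImage.cluster().broker(fetchState.leader().id()) == null) {
        // Leader absent from the metadata image: record the failure instead
        // of TRACE-logging it and then claiming the fetcher was started.
        failedPartitions.add(topicPartition);
    } else {
        startable.put(topicPartition, fetchState);
    }
});
replicaFetcherManager.addFetcherForPartitions(startable);
{code}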

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-19331) No error handling for leader unregistered in applyLocalFollowersDelta

2025-05-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-19331:
--
Labels: newbie  (was: )

> No error handling for leader unregistered in applyLocalFollowersDelta 
> --
>
> Key: KAFKA-19331
> URL: https://issues.apache.org/jira/browse/KAFKA-19331
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Priority: Major
>  Labels: newbie
>
> In ReplicaManager#applyLocalFollowersDelta, when we prepare to fetch from 
> the leader, we check whether the leader node info is in the metadata image. If 
> it is somehow not included in the newImage, we log something like:
>  
> {code:java}
> [2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching 
> quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader 
> Some(2) because it is not alive. (state.change.logger)
> [2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of 
> become-follower for 1 partitions (state.change.logger)
> {code}
>  
> It's confusing for users to see that fetching cannot start, followed by a 
> message that fetchers were started, when in the end nothing is actually 
> fetching. We should handle the error properly by updating the 
> `FailedPartition` and not touching the result status of the other, 
> successful partitions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19331) No error handling for leader not appeared in applyLocalFollowersDelta

2025-05-26 Thread Ming-Yen Chung (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954040#comment-17954040
 ] 

Ming-Yen Chung commented on KAFKA-19331:


Hi [~showuon], could you assign this ticket to me?

> No error handling for leader not appeared in applyLocalFollowersDelta 
> --
>
> Key: KAFKA-19331
> URL: https://issues.apache.org/jira/browse/KAFKA-19331
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Priority: Major
>  Labels: newbie
>
> In ReplicaManager#applyLocalFollowersDelta, when we prepare to fetch from 
> the leader, we check whether the leader node info is in the metadata image. If 
> it is somehow not included in the newImage, we log something like:
>  
> {code:java}
> [2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching 
> quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader 
> Some(2) because it is not alive. (state.change.logger)
> [2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of 
> become-follower for 1 partitions (state.change.logger)
> {code}
>  
> It's confusing for users to see that fetching cannot start, followed by a 
> message that fetchers were started, when in the end nothing is actually 
> fetching. We should handle the error properly by updating the 
> `FailedPartition` and not touching the result status of the other, 
> successful partitions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-19330: Change MockSerializer/Deserializer to use String serializer instead of byte[] [kafka]

2025-05-26 Thread via GitHub


mingyen066 opened a new pull request, #19812:
URL: https://github.com/apache/kafka/pull/19812

   While rewriting `EndToEndClusterIdTest` in Java (#19741), I found that the 
test uses `MockInterceptor` and `MockSerializer` together. However, 
`MockSerializer` was using a `byte[]` serializer, while `MockInterceptor` 
expected a `String` serializer, leading to a `ClassCastException`.
   
   I chose to update `MockSerializer` to use String, as it is used less 
frequently than the interceptor. Using String also simplifies the code by 
avoiding the need to write expressions like "value".getBytes.
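
   For context, a minimal sketch of the direction (illustrative only; the real 
`MockSerializer` in this PR also keeps test bookkeeping such as invocation 
counters):
   ```java
   import org.apache.kafka.common.serialization.Serializer;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   // Sketch: delegate to StringSerializer so tests can pass plain strings
   // instead of spelling out "value".getBytes(StandardCharsets.UTF_8).
   public class MockSerializer implements Serializer<String> {
       private final StringSerializer delegate = new StringSerializer();
   
       @Override
       public byte[] serialize(String topic, String data) {
           return delegate.serialize(topic, data);
       }
   }
   ```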


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) [kafka]

2025-05-26 Thread via GitHub


lucasbru commented on code in PR #19802:
URL: https://github.com/apache/kafka/pull/19802#discussion_r2106715246


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -582,54 +592,44 @@ public Set currentWarmupTaskProcessIds(
 }
 
 /**
- * @return An immutable map of partition metadata for each topic that are 
inputs for this streams group.
+ * @return The metadata hash.
  */
-public Map<String, TopicMetadata> partitionMetadata() {
-return Collections.unmodifiableMap(partitionMetadata);
+public long metadataHash() {
+return metadataHash.get();
 }
 
 /**
- * Updates the partition metadata. This replaces the previous one.
+ * Updates the metadata hash.
  *
- * @param partitionMetadata The new partition metadata.
+ * @param metadataHash The new metadata hash.
  */
-public void setPartitionMetadata(
-Map<String, TopicMetadata> partitionMetadata
-) {
-this.partitionMetadata.clear();
-this.partitionMetadata.putAll(partitionMetadata);
-maybeUpdateConfiguredTopology();
-maybeUpdateGroupState();

Review Comment:
   ConfiguredTopology can be derived completely from the records for 
`PartitionMetadata` and `Topology`. So we just need to cache it, and can 
recreate it upon the first heartbeat after failover. We could consider 
persisting it, but that would just mean storing somewhat duplicate data. 
`state` is also derived from the other records.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2106927220


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -550,6 +556,7 @@ private GroupMetadataManager(
 this.shareGroupAssignor = shareGroupAssignor;
 this.authorizerPlugin = authorizerPlugin;
 this.streamsGroupAssignors = 
streamsGroupAssignors.stream().collect(Collectors.toMap(TaskAssignor::name, 
Function.identity()));
+this.topicHashCache = new ConcurrentHashMap<>();

Review Comment:
   nit: We can use a regular HashMap here because the GroupMetadataManager is 
never used concurrently.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -151,6 +152,8 @@ public String toLowerCaseString() {
  */
private final TimelineHashMap<String, ResolvedRegularExpression> 
resolvedRegularExpressions;
 
+private final AtomicBoolean addSubscriptionMetadataTombstoneRecord = new 
AtomicBoolean(false);

Review Comment:
   We should also use a timeline data structure for this one.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##
@@ -398,6 +423,21 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+public long computeMetadataHash(

Review Comment:
   nit: This could be a static method.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
  * @return The hash of the group.
  */
 static long computeGroupHash(Map<String, Long> topicHashes) {
-if (topicHashes.isEmpty()) {
+// Sort entries by topic name
+List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+// Filter out entries with a hash value of 0, which indicates no topic
+if (entry.getValue() != 0) {

Review Comment:
   I wonder whether it is really necessary to ignore those. Having a zero does 
not hurt, does it?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##
@@ -398,6 +423,21 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+public long computeMetadataHash(
+Map<String, SubscriberCount> subscribedTopicNames,
+Map<String, Long> topicHashCache,
+MetadataImage metadataImage
+) {
+Map<String, Long> topicHash = new HashMap<>(subscribedTopicNames.size());
+subscribedTopicNames.keySet().forEach(topicName -> {

Review Comment:
   nit: We can remove the curly braces.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2220,6 +2227,11 @@ private 
CoordinatorResult
 int groupEpoch = group.groupEpoch();
 SubscriptionType subscriptionType = group.subscriptionType();
 
+if (group.addSubscriptionMetadataTombstoneRecord()) {
+
records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+group.setAddSubscriptionMetadataTombstoneRecord(false);

Review Comment:
   I wonder whether we should put this code into updateSubscriptionMetadata to 
ensure that we do this in all the places. Have you considered it?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2220,6 +2227,11 @@ private 
CoordinatorResult
 int groupEpoch = group.groupEpoch();
 SubscriptionType subscriptionType = group.subscriptionType();
 
+if (group.addSubscriptionMetadataTombstoneRecord()) {
+
records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+group.setAddSubscriptionMetadataTombstoneRecord(false);

Review Comment:
   This is not the proper way to do this. In general, we never mutate the state 
from here. We should move it to the replay method of the tombstone.
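
   A condensed sketch of the pattern being asked for (hypothetical names; the 
real replay overloads live in GroupMetadataManager): the heartbeat path only 
appends the tombstone record, and the in-memory state changes when that record 
is replayed, so the same mutation also happens on failover when the log is 
re-read.
   ```java
   // Hypothetical sketch: the handler emits the record without mutating state...
   records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
   
   // ...and the group state is touched only when the record is replayed.
   public void replay(ConsumerGroupPartitionMetadataKey key, ConsumerGroupPartitionMetadataValue value) {
       ConsumerGroup group = getOrMaybeCreateConsumerGroup(key.groupId(), false);
       if (value == null) {
           group.onSubscriptionMetadataTombstone(); // hypothetical hook clearing the flag
       }
   }
   ```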



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18904: kafka-configs.sh return resource doesn't exist message [3/N] [kafka]

2025-05-26 Thread via GitHub


DL1231 commented on code in PR #19808:
URL: https://github.com/apache/kafka/pull/19808#discussion_r2106900047


##
clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java:
##
@@ -163,6 +164,15 @@ public static ListConsumerGroupOffsetsResult 
listConsumerGroupOffsetsResult(Stri
 return new 
ListConsumerGroupOffsetsResult(Collections.singletonMap(CoordinatorKey.byGroupId(group),
 future));
 }
 
+public static ListConfigResourcesResult 
listConfigResourcesResult(Map<ConfigResource.Type, List<String>> resourceNames) {
+Collection<ConfigResource> resources = 
resourceNames.entrySet().stream()
+.flatMap(entry -> entry.getValue().stream()
+.map(name -> new ConfigResource(entry.getKey(), name)))
+.collect(Collectors.toList());
+return new 
ListConfigResourcesResult(KafkaFuture.completedFuture(resources));
+

Review Comment:
   nit: extra blank



##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -342,6 +342,42 @@ object ConfigCommand extends Logging {
   }
 
   private def describeResourceConfig(adminClient: Admin, entityType: String, 
entityName: Option[String], describeAll: Boolean): Unit = {
+if (!describeAll) {
+  entityName.foreach { name =>
+entityType match {
+  case TopicType =>
+Topic.validate(name)
+if (!adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names.get.contains(name)) {
+  System.out.println(s"The $entityType '$name' doesn't exist and 
doesn't have dynamic config.")
+  return
+}
+  case BrokerType | BrokerLoggerConfigType =>
+if 
(adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) {
+  // valid broker id
+} else if (name == BrokerDefaultEntityName) {
+  // default broker configs
+} else {
+  System.out.println(s"The $entityType '$name' doesn't exist and 
doesn't have dynamic config.")
+  return
+}
+  case ClientMetricsType =>
+if 
(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS),
 new ListConfigResourcesOptions).all.get

Review Comment:
   nit: Can't this just be util.Set.of?



##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -342,6 +342,42 @@ object ConfigCommand extends Logging {
   }
 
   private def describeResourceConfig(adminClient: Admin, entityType: String, 
entityName: Option[String], describeAll: Boolean): Unit = {
+if (!describeAll) {
+  entityName.foreach { name =>
+entityType match {
+  case TopicType =>
+Topic.validate(name)
+if (!adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names.get.contains(name)) {
+  System.out.println(s"The $entityType '$name' doesn't exist and 
doesn't have dynamic config.")
+  return
+}
+  case BrokerType | BrokerLoggerConfigType =>
+if 
(adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) {
+  // valid broker id
+} else if (name == BrokerDefaultEntityName) {
+  // default broker configs
+} else {
+  System.out.println(s"The $entityType '$name' doesn't exist and 
doesn't have dynamic config.")
+  return
+}
+  case ClientMetricsType =>
+if 
(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS),
 new ListConfigResourcesOptions).all.get
+  .stream.noneMatch(_.name == name)) {
+  System.out.println(s"The $entityType '$name' doesn't exist and 
doesn't have dynamic config.")
+  return
+}
+  case GroupType =>
+if (adminClient.listGroups().all.get.stream.noneMatch(_.groupId() 
== name) &&
+  
adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.GROUP), 
new ListConfigResourcesOptions).all.get

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19268 Missing mocks for SharePartitionManagerTest tests [kafka]

2025-05-26 Thread via GitHub


adixitconfluent commented on PR #19786:
URL: https://github.com/apache/kafka/pull/19786#issuecomment-2909105133

   @AndrewJSchofield can you please review this PR as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-19259) Async consumer fetch intermittent delays on console consumer

2025-05-26 Thread Arpit Goyal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17953933#comment-17953933
 ] 

Arpit Goyal edited comment on KAFKA-19259 at 5/26/25 8:20 AM:
--

[~lianetm] I was just going through the code and I think this can be the issue, 
though I am not able to reproduce it:
1. Application thread checks the empty buffer and is about to wait.
2. Consumer network thread adds data and signals.
3. Application thread misses the non-empty state check but hasn't started 
waiting yet.
4. Application thread then waits for the full timeout despite data being 
available.

[~lianetm] My bad. This will not be possible as both share the same lock.
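
A condensed model of why that interleaving is ruled out (a hypothetical 
sketch, not the actual FetchBuffer code): the emptiness check and the wait 
happen under the same lock the network thread must hold before it adds data 
and signals, so the signal cannot slip in between steps 1 and 3.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the buffer handoff between the two threads.
class BufferModel {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    private final Deque<byte[]> buffer = new ArrayDeque<>();

    // Application thread: isEmpty() and await() are atomic w.r.t. add().
    void awaitNonEmpty(long timeoutMs) throws InterruptedException {
        lock.lock();
        try {
            while (buffer.isEmpty()) {
                if (!nonEmpty.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                    return; // timed out
                }
            }
        } finally {
            lock.unlock();
        }
    }

    // Network thread: takes the same lock before adding and signalling.
    void add(byte[] records) {
        lock.lock();
        try {
            buffer.add(records);
            nonEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
{code}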





was (Author: JIRAUSER301926):
[~lianetm] I was just going through the code and I think this can be the issue, 
though I am not able to reproduce it:
1. Application thread checks the empty buffer and is about to wait.
2. Consumer network thread adds data and signals.
3. Application thread misses the non-empty state check but hasn't started 
waiting yet.
4. Application thread then waits for the full timeout despite data being 
available.




> Async consumer fetch intermittent delays on console consumer
> 
>
> Key: KAFKA-19259
> URL: https://issues.apache.org/jira/browse/KAFKA-19259
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 4.0.0
>Reporter: Lianet Magrans
>Assignee: Arpit Goyal
>Priority: Major
> Fix For: 4.1.0
>
>
> We noticed that fetching with the kafka-console-consumer.sh tool using the 
> new consumer shows some intermittent delays that are not seen when running 
> the same with the classic consumer. Note that I disabled auto-commit to 
> isolate the delay, and from a first look it seems to come from the 
> fetchBuffer.awaitNonEmpty logic, which alternately takes almost the full 
> poll timeout (runs "fast", then "slow", and continues to alternate)
> [https://github.com/apache/kafka/blob/0b81d6c7802c1be55dc823ce51729f2c6a6071a7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1808]
>   
> The difference in behaviour between the 2 consumers can be seen with this 
> setup:
>  * topic with 6 partitions (I tried with 1 partition first and didn't see the 
> delay, then with 3 and 6 I could see it) 
>  * data populated in topic with producer sending generated uuids to the topic 
> in while loop 
>  * run console consumer (async), no commit:
> bin/kafka-console-consumer.sh --topic t1 --bootstrap-server localhost:9092 
> --consumer-property group.protocol=consumer --group cg1 --consumer-property 
> enable.auto.commit=false
> Here we can notice the pattern that looks like batches, and custom logs on 
> the awaitNonEmpty show it takes the full poll timeout on alternate poll 
> iterations.
>  * run same but for classic consumer (consumer-property 
> group.protocol=classic) -> not such pattern of intermittent delays
> Produce continuously (I used this) 
> while sleep 1; do echo $(uuidgen); done | bin/kafka-console-producer.sh 
> --bootstrap-server localhost:9092 --topic t1
> This needs more investigation to fully understand if it's indeed something in 
> the fetch path or something else.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19331) No error handling for leader not appeared in applyLocalFollowersDelta

2025-05-26 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954042#comment-17954042
 ] 

Luke Chen commented on KAFKA-19331:
---

Assigned to you. Thanks, [~mingyen066]!

> No error handling for leader not appeared in applyLocalFollowersDelta 
> --
>
> Key: KAFKA-19331
> URL: https://issues.apache.org/jira/browse/KAFKA-19331
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Ming-Yen Chung
>Priority: Major
>  Labels: newbie
>
> In ReplicaManager#applyLocalFollowersDelta, when we prepare to fetch from 
> the leader, we check whether the leader node info is in the metadata image. If 
> it is somehow not included in the newImage, we log something like:
>  
> {code:java}
> [2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching 
> quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader 
> Some(2) because it is not alive. (state.change.logger)
> [2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of 
> become-follower for 1 partitions (state.change.logger)
> {code}
>  
> It's confusing for users to see that fetching cannot start, followed by a 
> message that fetchers were started, when in the end nothing is actually 
> fetching. We should handle the error properly by updating the 
> `FailedPartition` and not touching the result status of the other, 
> successful partitions.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17747: [6/N] Replace subscription metadata with metadata hash in share group [kafka]

2025-05-26 Thread via GitHub


dajac commented on PR #19796:
URL: https://github.com/apache/kafka/pull/19796#issuecomment-2909150320

   This one should basically be a copy of 
https://github.com/apache/kafka/pull/19761. Let's merge that one first and then 
we can review this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19042: [11/N] move ConsumerWithLegacyMessageFormatIntegrationTest to clients-integration-tests module [kafka]

2025-05-26 Thread via GitHub


m1a2st commented on code in PR #19810:
URL: https://github.com/apache/kafka/pull/19810#discussion_r2107996769


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerWithLegacyMessageFormatIntegrationTest.java:
##
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.record.AbstractRecords;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RecordVersion;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+
+@ClusterTestDefaults(
+brokers = 3
+)
+public class ConsumerWithLegacyMessageFormatIntegrationTest {
+
+private final ClusterInstance cluster;
+
+private final String topic1 = "part-test-topic-1";
+private final String topic2 = "part-test-topic-2";
+private final String topic3 = "part-test-topic-3";
+
+private final TopicPartition t1p0 = new TopicPartition(topic1, 0);
+private final TopicPartition t1p1 = new TopicPartition(topic1, 1);
+private final TopicPartition t2p0 = new TopicPartition(topic2, 0);
+private final TopicPartition t2p1 = new TopicPartition(topic2, 1);
+private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
+private final TopicPartition t3p1 = new TopicPartition(topic3, 1);
+
+public ConsumerWithLegacyMessageFormatIntegrationTest(ClusterInstance 
cluster) {
+this.cluster = cluster;
+}
+
+private void appendLegacyRecords(int numRecords, TopicPartition tp, int 
brokerId, byte magicValue) {
+List<SimpleRecord> records = new ArrayList<>();
+for (int i = 0; i < numRecords; i++) {
+records.add(new SimpleRecord(i, ("key " + i).getBytes(), ("value " 
+ i).getBytes()));
+}
+
+ByteBuffer buffer = 
ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue,
+CompressionType.NONE, records));
+MemoryRecordsBuilder builder = MemoryRecords.builder(
+buffer,
+magicValue,
+Compression.of(CompressionType.NONE).build(),
+TimestampType.CREATE_TIME,
+0L,
+RecordBatch.NO_TIMESTAMP,
+RecordBatch.NO_PRODUCER_ID,
+RecordBatch.NO_PRODUCER_EPOCH,
+0,
+false,
+RecordBatch.NO_PARTITION_LEADER_EPOCH
+);
+
+records.forEach(builder::append);
+
+cluster.brokers().values().stream()
+.filter(b -> b.config().brokerId() == brokerId)
+.forEach(b -> {
+UnifiedLog unifiedLog = 
b.replicaManager().logManager().getLog(tp, false).get();
+
unifiedLog.appendAsLeaderWithRecordVersion(builder.build(), 0, 
RecordVersion.lookup(magicValue));
+// Default isolation.level is read_uncommitted. It makes 
Partition#fetchOffsetForTim

[jira] [Commented] (KAFKA-19336) Upgrade jackson libs since v2.16 is not maintained anymore

2025-05-26 Thread Szu-Yung Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954207#comment-17954207
 ] 

Szu-Yung Wang commented on KAFKA-19336:
---

Hi, may I take this?

> Upgrade jackson libs since v2.16 is not maintained anymore
> --
>
> Key: KAFKA-19336
> URL: https://issues.apache.org/jira/browse/KAFKA-19336
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Priority: Major
>
> From jackson doc: [https://github.com/FasterXML/jackson/wiki/Jackson-Releases]
> > [2.16|https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.16] Closed 
> > in April 2025 with release of 2.19.0
> We should upgrade jackson libs to newer version. From the doc, it should be 
> backward compatible if we choose the latest 2.19.0.
>  
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.17]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.18]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.19]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-19310: (MINOR) Missing mocks for DelayedShareFetchTest tests related to Memory Records slicing [kafka]

2025-05-26 Thread via GitHub


adixitconfluent opened a new pull request, #19823:
URL: https://github.com/apache/kafka/pull/19823

   ### About
   Added test memory records to avoid the silent exception thrown during 
slicing.
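   
   For reference, building non-empty records along these lines avoids the 
empty-batch slicing path (a sketch assuming the `Compression`-based builder 
API; the key/value contents are illustrative):
   ```java
   import org.apache.kafka.common.compress.Compression;
   import org.apache.kafka.common.record.MemoryRecords;
   import org.apache.kafka.common.record.SimpleRecord;
   
   // Sketch: real records (rather than MemoryRecords.EMPTY) give the slicing
   // logic something to operate on, so no exception is silently swallowed.
   MemoryRecords records = MemoryRecords.withRecords(
       Compression.NONE,
       new SimpleRecord("key".getBytes(), "value".getBytes())
   );
   ```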
   
   ### Testing
   Ran the tests of `DelayedShareFetchTest` to make sure that there is no 
silent exception in any test.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19335: Membership managers send negative epoch in JOINING [kafka]

2025-05-26 Thread via GitHub


ShivsundarR commented on code in PR #19818:
URL: https://github.com/apache/kafka/pull/19818#discussion_r2108331701


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java:
##
@@ -561,10 +568,42 @@ public void 
testIgnoreHeartbeatResponseWhenNotInGroup(MemberState state) {
 
 assertEquals(state, membershipManager.state());
 verify(responseData, never()).memberId();
-verify(responseData, never()).memberEpoch();
+// In unsubscribed, we check if we received a leave group response, so 
we do verify member epoch.
+if (state != MemberState.UNSUBSCRIBED) {
+verify(responseData, never()).memberEpoch();
+}
 verify(responseData, never()).assignment();
 }
 
+@Test
+public void testIgnoreLeaveResponseWhenNotLeavingGroup() {
+ShareMembershipManager membershipManager = createMemberInStableState();
+
+CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
+
+// Send leave request, transitioning to UNSUBSCRIBED state
+membershipManager.onHeartbeatRequestGenerated();
+assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+// Receive a previous heartbeat response, which should be ignored

Review Comment:
   This response is not "ignored" right, we are setting the member epoch in the 
response to `membershipManager.memberEpoch` (-1 in this case). The leave 
operation is then processed successfully post which we are asserting if 
`leaveResult` has completed.
   If this is the case, can we change the comment above to reflect the same. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group [kafka]

2025-05-26 Thread via GitHub


dajac commented on code in PR #19790:
URL: https://github.com/apache/kafka/pull/19790#discussion_r2108336679


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##
@@ -1576,6 +1581,7 @@ public void testFromClassicGroup() {
 assertEquals(expectedConsumerGroup.groupEpoch(), 
consumerGroup.groupEpoch());
 assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
 assertEquals(expectedConsumerGroup.preferredServerAssignor(), 
consumerGroup.preferredServerAssignor());
+assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), 
Map.copyOf(consumerGroup.subscriptionMetadata()));

Review Comment:
   Ah, I got it. The issue is that TimelineHashMap does not implement toString. 
Hence when the comparison fails, you get a cryptic error:
   
   ```
   Expected :org.apache.kafka.timeline.TimelineHashMap@17c13
   Actual   :org.apache.kafka.timeline.TimelineHashMap@308d9
   ```
   
   However, it still uses `.equals` to compare the objects. I am fine with 
keeping the copy to make it better here.
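
   A tiny illustration of the effect (hypothetical values):
   ```java
   import java.util.Map;
   import static org.junit.jupiter.api.Assertions.assertEquals;
   
   // With raw TimelineHashMap arguments a failure prints object hashes;
   // copying into a plain immutable Map first makes the diff readable:
   Map<String, Integer> expected = Map.of("foo", 3);
   Map<String, Integer> actual = Map.of("foo", 4);
   assertEquals(expected, actual);
   // org.opentest4j.AssertionFailedError: expected: <{foo=3}> but was: <{foo=4}>
   ```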



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19335: Membership managers send negative epoch in JOINING [kafka]

2025-05-26 Thread via GitHub


ShivsundarR commented on code in PR #19818:
URL: https://github.com/apache/kafka/pull/19818#discussion_r2108331701


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java:
##
@@ -561,10 +568,42 @@ public void 
testIgnoreHeartbeatResponseWhenNotInGroup(MemberState state) {
 
 assertEquals(state, membershipManager.state());
 verify(responseData, never()).memberId();
-verify(responseData, never()).memberEpoch();
+// In unsubscribed, we check if we received a leave group response, so 
we do verify member epoch.
+if (state != MemberState.UNSUBSCRIBED) {
+verify(responseData, never()).memberEpoch();
+}
 verify(responseData, never()).assignment();
 }
 
+@Test
+public void testIgnoreLeaveResponseWhenNotLeavingGroup() {
+ShareMembershipManager membershipManager = createMemberInStableState();
+
+CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
+
+// Send leave request, transitioning to UNSUBSCRIBED state
+membershipManager.onHeartbeatRequestGenerated();
+assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+
+// Receive a previous heartbeat response, which should be ignored

Review Comment:
   This response is not "ignored" right, we are setting the member epoch in the 
response to `membershipManager.memberEpoch` (-1 in this case). The leave 
operation is then processed successfully post which we are asserting if 
`leaveResult` has completed.
   If this is the case, can we change the comment above to reflect the same. 
(and in the other tests too).
   Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19334 MetadataShell execution unintentionally deletes lock file [kafka]

2025-05-26 Thread via GitHub


frankvicky commented on PR #19817:
URL: https://github.com/apache/kafka/pull/19817#issuecomment-2911237948

   @cmccabe: Could you please take a look when you have a free moment?
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module [kafka]

2025-05-26 Thread via GitHub


brandboat commented on code in PR #19773:
URL: https://github.com/apache/kafka/pull/19773#discussion_r2108051296


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+types = {Type.KRAFT},
+brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+@ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+}
+)
+public class PlaintextConsumerAssignTest {
+
+public static final int BROKER_COUNT = 3;
+
+private final ClusterInstance clusterInstance;
+private final String topic = "topic";
+private final int partition = 0;
+TopicPartition tp = new TopicPartition(topic, partition);
+
+PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+this.clusterInstance = clusterInstance;
+}
+
+@BeforeEach
+public void setup() throws InterruptedException {
+clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2);

Review Comment:
   I think the BROKER_COUNT should be the replica count, right?
   ```suggestion
   clusterInstance.createTopic(topic, 2, (short) BROKER_COUNT);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module [kafka]

2025-05-26 Thread via GitHub


TaiJuWu commented on code in PR #19773:
URL: https://github.com/apache/kafka/pull/19773#discussion_r2108058182


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerAssignTest.java:
##
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.clients.ClientsTestUtils;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Integration tests for the consumer that covers logic related to manual 
assignment.
+ */
+@ClusterTestDefaults(
+types = {Type.KRAFT},
+brokers = PlaintextConsumerAssignTest.BROKER_COUNT,
+serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "3"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+@ClusterConfigProperty(key = GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 
value = "100"),
+@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "10"),
+}
+)
+public class PlaintextConsumerAssignTest {
+
+public static final int BROKER_COUNT = 3;
+
+private final ClusterInstance clusterInstance;
+private final String topic = "topic";
+private final int partition = 0;
+TopicPartition tp = new TopicPartition(topic, partition);
+
+PlaintextConsumerAssignTest(ClusterInstance clusterInstance) {
+this.clusterInstance = clusterInstance;
+}
+
+@BeforeEach
+public void setup() throws InterruptedException {
+clusterInstance.createTopic(topic, BROKER_COUNT, (short) 2);

Review Comment:
   Fix it, thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19042: [10/N] Move PlaintextConsumerAssignTest to clients-integration-tests module [kafka]

2025-05-26 Thread via GitHub


TaiJuWu commented on PR #19773:
URL: https://github.com/apache/kafka/pull/19773#issuecomment-2910955102

   Hi @brandboat, thanks for your detailed review and for pointing out my 
mistakes. All comments are addressed, PTAL.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7516: Attempt to dynamically load ManagementFactory [kafka]

2025-05-26 Thread via GitHub


github-actions[bot] commented on PR #19764:
URL: https://github.com/apache/kafka/pull/19764#issuecomment-2910989098

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Improve the `KAFKA_HEAP_OPTS` definition while run `kafka-server-start.bat` Batch [kafka]

2025-05-26 Thread via GitHub


github-actions[bot] commented on PR #19703:
URL: https://github.com/apache/kafka/pull/19703#issuecomment-2910989162

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Kafka 18913/cleanup state updater on failure [kafka]

2025-05-26 Thread via GitHub


github-actions[bot] commented on PR #19750:
URL: https://github.com/apache/kafka/pull/19750#issuecomment-2910989126

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107945239


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -20635,12 +21067,10 @@ barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3)
 ),
 // Remove regex.
 
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*")),
-// Updated subscription metadata.
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId,
 Map.of(
-barTopicName, new TopicMetadata(barTopicId, barTopicName, 
3)
-))),
 // Bumped epoch.
-
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 
0))
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11, 
computeGroupHash(Map.of(
+barTopicName, barTopicHash
+
 ),
 result.records()

Review Comment:
   Thanks for the suggestion. I added tests for the following cases:
   
   * update: testNewRacksDataInMetadataImageTriggersEpochBump
   * remove: testRemoveTopicCleanupTopicHash



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107946290


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##
@@ -398,6 +423,21 @@ public Map 
computeSubscriptionMetadata(
 return Collections.unmodifiableMap(newSubscriptionMetadata);
 }
 
+public static long computeMetadataHash(
+Map<String, SubscriberCount> subscribedTopicNames,
+Map<String, Long> topicHashCache,
+MetadataImage metadataImage
+) {
+Map<String, Long> topicHash = new HashMap<>(subscribedTopicNames.size());
+subscribedTopicNames.keySet().forEach(topicName ->
+topicHash.put(
+topicName,
+topicHashCache.computeIfAbsent(topicName, k -> 
Utils.computeTopicHash(topicName, metadataImage))

Review Comment:
   Good catch! I think we can ignore a topic if the related `TopicImage` does 
not exist, so we can revert the change in `Utils#computeGroupHash`. I also 
added related unit tests in `ConsumerGroupTest.java`.
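
   Something along these lines (a sketch of the described approach; the 
surrounding variables are the ones from the diff above):
   ```java
   // Sketch: skip topics without a TopicImage so no zero sentinel is produced
   // and computeGroupHash no longer needs to filter anything out.
   subscribedTopicNames.keySet().forEach(topicName -> {
       if (metadataImage.topics().getTopic(topicName) != null) {
           topicHash.put(
               topicName,
               topicHashCache.computeIfAbsent(topicName, k -> Utils.computeTopicHash(topicName, metadataImage))
           );
       }
   });
   ```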



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107946911


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
  * @return The hash of the group.
  */
 static long computeGroupHash(Map<String, Long> topicHashes) {
-if (topicHashes.isEmpty()) {
+// Sort entries by topic name
+List<Map.Entry<String, Long>> sortedEntries = new ArrayList<>();
+for (Map.Entry<String, Long> entry : topicHashes.entrySet()) {
+// Filter out entries with a hash value of 0, which indicates no topic
+if (entry.getValue() != 0) {

Review Comment:
   Removed the related change here, because we can ignore a null `TopicImage` in 
`ModernGroup#computeMetadataHash`. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18379: Enforce resigned cannot transition to any other state in same epoch [kafka]

2025-05-26 Thread via GitHub


github-actions[bot] commented on PR #19236:
URL: https://github.com/apache/kafka/pull/19236#issuecomment-2910989346

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] WIP: 4.0 Markdown docs [kafka]

2025-05-26 Thread via GitHub


hvishwanath opened a new pull request, #19821:
URL: https://github.com/apache/kafka/pull/19821

   4.0 Markdown docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [WIP] KAFKA-18877: Add an mechanism to find cases where we accessed variables from the wrong thread. [kafka]

2025-05-26 Thread via GitHub


github-actions[bot] commented on PR #19231:
URL: https://github.com/apache/kafka/pull/19231#issuecomment-2910989370

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] (wip )KAFKA-19042 consumer bounce test [kafka]

2025-05-26 Thread via GitHub


TaiJuWu opened a new pull request, #19822:
URL: https://github.com/apache/kafka/pull/19822

   Delete this text and replace it with a detailed description of your change. 
The 
   PR title and body will become the squashed commit message.
   
   If you would like to tag individuals, add some commentary, upload images, or
   include other supplemental information that should not be part of the 
eventual
   commit message, please use a separate comment.
   
   If applicable, please include a summary of the testing strategy (including 
   rationale) for the proposed change. Unit and/or integration tests are 
expected
   for any behavior change and system tests should be considered for larger
   changes.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-1792: change behavior of --generate to produce assignment config with fair replica distribution and minimal number of reassignments [kafka]

2025-05-26 Thread via GitHub


github-actions[bot] commented on PR #18903:
URL: https://github.com/apache/kafka/pull/18903#issuecomment-2911010127

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19144 Move DelayedProduce to server module [kafka]

2025-05-26 Thread via GitHub


frankvicky commented on code in PR #19793:
URL: https://github.com/apache/kafka/pull/19793#discussion_r2108058643


##
core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala:
##
@@ -271,9 +271,10 @@ class LocalLeaderEndPointTest extends Logging {
 origin: AppendOrigin = AppendOrigin.CLIENT,
 requiredAcks: Short = -1): 
CallbackResult[PartitionResponse] = {
 val result = new CallbackResult[PartitionResponse]()
-def appendCallback(responses: scala.collection.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
-  val response = responses.get(partition)
-  assertTrue(response.isDefined)
+def appendCallback(responses: java.util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {

Review Comment:
   Given that we already have an import alias for `java.util.Map`, we could 
reuse it.
   ```suggestion
   def appendCallback(responses: JMap[TopicIdPartition, 
PartitionResponse]): Unit = {
   ```



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -967,12 +969,30 @@ class ReplicaManager(val config: KafkaConfig,
 entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
 initialAppendResults: Map[TopicIdPartition, LogAppendResult],
 initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
-responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
   ): Unit = {
 if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
initialAppendResults)) {
   // create delayed produce operation
-  val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
-  val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, 
this, responseCallback)
+  val produceMetadata = new ProduceMetadata(requiredAcks, 
initialProduceStatus.asJava)
+
+  def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit 
= {
+val (hasEnough, error) = getPartitionOrError(tp) match {
+  case Left(err) =>
+// Case A

Review Comment:
   What does this comment mean?



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -967,12 +969,30 @@ class ReplicaManager(val config: KafkaConfig,
 entriesPerPartition: Map[TopicIdPartition, MemoryRecords],
 initialAppendResults: Map[TopicIdPartition, LogAppendResult],
 initialProduceStatus: Map[TopicIdPartition, ProducePartitionStatus],
-responseCallback: Map[TopicIdPartition, PartitionResponse] => Unit,
+responseCallback: util.Map[TopicIdPartition, PartitionResponse] => Unit,
   ): Unit = {
 if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, 
initialAppendResults)) {
   // create delayed produce operation
-  val produceMetadata = ProduceMetadata(requiredAcks, initialProduceStatus)
-  val delayedProduce = new DelayedProduce(timeoutMs, produceMetadata, 
this, responseCallback)
+  val produceMetadata = new ProduceMetadata(requiredAcks, 
initialProduceStatus.asJava)
+
+  def delegate(tp: TopicPartition, status: ProducePartitionStatus) : Unit 
= {
+val (hasEnough, error) = getPartitionOrError(tp) match {
+  case Left(err) =>
+// Case A

Review Comment:
   I see this is relevant to the comment of `DelayedProduce`, but it's 
confusing that these comments are standalone here.
   Could you write out the whole meaning of these cases, or link these comments to 
`tryComplete`?
   



##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -2832,7 +2832,7 @@ class KafkaApisTest extends Logging {
   any(),
   ArgumentMatchers.eq(requestLocal),
   any()
-)).thenAnswer(_ => responseCallback.getValue.apply(Map(new 
TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE
+)).thenAnswer(_ => responseCallback.getValue.apply(Map(new 
TopicIdPartition(topicId,tp2) -> new PartitionResponse(Errors.NONE)).asJava))

Review Comment:
   ```suggestion
   )).thenAnswer(_ => responseCallback.getValue.apply(util.Map.of(new 
TopicIdPartition(topicId,tp2), new PartitionResponse(Errors.NONE
   ```



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -2990,9 +2990,9 @@ class ReplicaManagerTest {
 requiredAcks: Short = -1): 
CallbackResult[PartitionResponse] = {
 val result = new CallbackResult[PartitionResponse]()
 val topicIdPartition = new TopicIdPartition(topicId, partition)
-def appendCallback(responses: Map[TopicIdPartition, PartitionResponse]): 
Unit = {
-  val response = responses.get(topicIdPartition)
-  assertTrue(response.isDefined)
+def appendCallback(responses: util.Map[TopicIdPartition, 
PartitionResponse]): Unit = {
+  val response = 
java.util.Optional.ofNullable(responses.get(topicIdPartition))

Review Comment:
   We already have an import alias for `java.util.Map`; we could reuse it here as well.

Re: [PR] [WIP] KAFKA-18562: standardize election/fetch timeout between Unattached and Followers [kafka]

2025-05-26 Thread via GitHub


github-actions[bot] commented on PR #18921:
URL: https://github.com/apache/kafka/pull/18921#issuecomment-2911010160

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18157:Consider UnsupportedVersionException child class to represent the case of unsupported fields [kafka]

2025-05-26 Thread via GitHub


github-actions[bot] commented on PR #18072:
URL: https://github.com/apache/kafka/pull/18072#issuecomment-2911010040

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19336) Upgrade jackson libs since v2.16 is not maintained anymore

2025-05-26 Thread Luke Chen (Jira)
Luke Chen created KAFKA-19336:
-

 Summary: Upgrade jackson libs since v2.16 is not maintained anymore
 Key: KAFKA-19336
 URL: https://issues.apache.org/jira/browse/KAFKA-19336
 Project: Kafka
  Issue Type: Improvement
Reporter: Luke Chen


From the jackson doc: [https://github.com/FasterXML/jackson/wiki/Jackson-Releases]

> [2.16|https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.16] Closed 
> in April 2025 with release of 2.19.0

We should upgrade the jackson libs to a newer version. From the doc, it should be 
backward compatible if we choose the latest 2.19.0.

 

[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.17]

[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.18]

[https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.19]
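As a quick post-upgrade sanity check, a sketch like the following (the module list is 
illustrative) can confirm that the jackson artifacts on the classpath all resolve to the 
same 2.19.x release:

{code:java}
public class JacksonVersionCheck {
    public static void main(String[] args) {
        // Each jackson module reports the version it was built as; after the
        // bump these should all print the same 2.19.x version.
        System.out.println("jackson-core:     "
            + com.fasterxml.jackson.core.json.PackageVersion.VERSION);
        System.out.println("jackson-databind: "
            + com.fasterxml.jackson.databind.cfg.PackageVersion.VERSION);
    }
}
{code}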

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19042: [9/N] Move GroupAuthorizerIntegrationTest to clients-integration-tests module [kafka]

2025-05-26 Thread via GitHub


nick-zh commented on PR #19685:
URL: https://github.com/apache/kafka/pull/19685#issuecomment-2911205618

   While I would love to help, I think you meant a different Nick @chia7712 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19315: Move ControllerMutationQuotaManager to server module [kafka]

2025-05-26 Thread via GitHub


m1a2st commented on code in PR #19807:
URL: https://github.com/apache/kafka/pull/19807#discussion_r2107979149


##
server/src/main/java/org/apache/kafka/server/ClientQuotaManager.java:
##
@@ -0,0 +1,938 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Quota;
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Sanitizer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.network.Session;
+import org.apache.kafka.server.config.ClientQuotaManagerConfig;
+import org.apache.kafka.server.quota.ClientQuotaCallback;
+import org.apache.kafka.server.quota.ClientQuotaEntity;
+import org.apache.kafka.server.quota.ClientQuotaType;
+import org.apache.kafka.server.quota.QuotaType;
+import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
+import org.apache.kafka.server.quota.ThrottleCallback;
+import org.apache.kafka.server.quota.ThrottledChannel;
+import org.apache.kafka.server.util.ShutdownableThread;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
+
+final class QuotaTypes {
+static final int NO_QUOTAS = 0;
+static final int CLIENT_ID_QUOTA_ENABLED = 1;
+static final int USER_QUOTA_ENABLED = 2;
+static final int USER_CLIENT_ID_QUOTA_ENABLED = 4;
+static final int CUSTOM_QUOTAS = 8; // No metric update optimizations are 
used with custom quotas
+}
+
+public class ClientQuotaManager {
+
+private static final Logger log = 
LoggerFactory.getLogger(ClientQuotaManager.class);
+
+// Purge sensors after 1 hour of inactivity
+public static final int INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS = 3600;
+private static final String DEFAULT_NAME = "";
+
+public static final KafkaQuotaEntity DEFAULT_CLIENT_ID_QUOTA_ENTITY =
+new KafkaQuotaEntity(null, DefaultClientIdEntity.INSTANCE);
+public static final KafkaQuotaEntity DEFAULT_USER_QUOTA_ENTITY =
+new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, null);
+public static final KafkaQuotaEntity DEFAULT_USER_CLIENT_ID_QUOTA_ENTITY =
+new KafkaQuotaEntity(DefaultUserEntity.INSTANCE, 
DefaultClientIdEntity.INSTANCE);
+
+public interface BaseUserEntity extends ClientQuotaEntity.ConfigEntity { }
+
+public static class UserEntity implements BaseUserEntity {
+private final String sanitizedUser;
+
+public UserEntity(String sanitizedUser) {
+this.sanitizedUser = sanitizedUser;
+}
+
+@Override
+public ClientQuotaEntity.ConfigEntityType entityType() {
+return ClientQuotaEntity.ConfigEntityType.USER;
+}
+
+@Override
+public String name() {
+return Sanitizer.desanitize(sanitizedUser);
+}
+
+public String getSanitizedUser() {
+return sanitizedUser;
+}
+
+@Override
+public String toString() {
+return "user " + sanitizedUser;
+}
+
+@Override
+public boolean equals(Object obj) {
+if (this == obj) return true;
+if (obj == null || getClass() != obj.getClass()) return false;
+UserEntity t

[PR] KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets [kafka]

2025-05-26 Thread via GitHub


JimmyWang6 opened a new pull request, #19820:
URL: https://github.com/apache/kafka/pull/19820

   [KAFKA-16720](https://issues.apache.org/jira/browse/KAFKA-16720) aims to 
finish the AlterShareGroupOffsets support for the ShareGroupCommand part.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-19336) Upgrade jackson libs since v2.16 is not maintained anymore

2025-05-26 Thread Szu-Yung Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Szu-Yung Wang reassigned KAFKA-19336:
-

Assignee: Szu-Yung Wang

> Upgrade jackson libs since v2.16 is not maintained anymore
> --
>
> Key: KAFKA-19336
> URL: https://issues.apache.org/jira/browse/KAFKA-19336
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Szu-Yung Wang
>Priority: Major
>
> From the jackson doc: [https://github.com/FasterXML/jackson/wiki/Jackson-Releases]
> > [2.16|https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.16] Closed in 
> > April 2025 with release of 2.19.0
> We should upgrade the jackson libs to a newer version. From the doc, it should be 
> backward compatible if we choose the latest 2.19.0.
>  
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.17]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.18]
> [https://github.com/FasterXML/jackson/wiki/Jackson-Release-2.19]
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-19334) MetadataShell bypasses file lock unexpectedly due to lock file deletion

2025-05-26 Thread TengYao Chi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TengYao Chi updated KAFKA-19334:

Fix Version/s: 3.9.2
   4.0.1
   4.1.0

> MetadataShell bypasses file lock unexpectedly due to lock file deletion
> ---
>
> Key: KAFKA-19334
> URL: https://issues.apache.org/jira/browse/KAFKA-19334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 4.0.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
> Fix For: 3.9.2, 4.0.1, 4.1.0
>
>
> MetadataShell acquires the log dir lock to prevent unexpected results from 
> concurrent reads/writes, a protection added by [https://github.com/apache/kafka/pull/14899].
>  
> This lock works as expected when we execute MetadataShell against a running log 
> dir for the first time:
>  
> {code:java}
> % ./bin/kafka-metadata-shell.sh --snapshot data/bootstrap.checkpoint
> Unexpected error: Unable to lock /path/to/data. Please ensure that no broker 
> or controller process is using this directory before proceeding.{code}
> However, if we execute MetadataShell with the same command again, with the 
> controller still running, it succeeds unexpectedly.
>  
>  
> {code:java}
> % ./bin/kafka-metadata-shell.sh --snapshot data/bootstrap.checkpoint
> Loading...
> Starting...
> [ Kafka Metadata Shell ]
> >> {code}
> And I found that the .lock file vanishes after MetadataShell exits.
>  
> This is because MetadataShell calls FileLock#destroy when it fails to acquire the 
> lock, which deletes the lock file.
> [https://github.com/apache/kafka/blob/4.0.0/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java#L131]
>  
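To see why deleting the lock file breaks mutual exclusion, consider a minimal JDK-level 
sketch (plain java.nio, not Kafka's own FileLock wrapper): the second process only 
contends on the lock if the file the first process locked still exists.

{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.WRITE;

public class LockFileSketch {
    // Returns null if another process holds the lock. On failure we close the
    // channel but keep the file: deleting it would let the next attempt create
    // a fresh file and acquire a brand-new lock while the owner still runs.
    static FileLock tryLockDir(Path logDir) throws IOException {
        FileChannel channel = FileChannel.open(logDir.resolve(".lock"), CREATE, WRITE);
        FileLock lock = channel.tryLock();
        if (lock == null) {
            channel.close();
        }
        return lock;
    }
}
{code}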



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19334) MetadataShell bypasses file lock unexpectedly due to lock file deletion

2025-05-26 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954105#comment-17954105
 ] 

Haruki Okada commented on KAFKA-19334:
--

[~cmccabe] Submitted a patch for this. Could you take a look?

> MetadataShell bypasses file lock unexpectedly due to lock file deletion
> ---
>
> Key: KAFKA-19334
> URL: https://issues.apache.org/jira/browse/KAFKA-19334
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 4.0.0
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>
> MetadataShell acquires the log dir lock to prevent unexpected results from 
> concurrent reads/writes, a protection added by [https://github.com/apache/kafka/pull/14899].
>  
> This lock works as expected when we execute MetadataShell against a running log 
> dir for the first time:
>  
> {code:java}
> % ./bin/kafka-metadata-shell.sh --snapshot data/bootstrap.checkpoint
> Unexpected error: Unable to lock /path/to/data. Please ensure that no broker 
> or controller process is using this directory before proceeding.{code}
> However, if we execute MetadataShell with the same command again, with the 
> controller still running, it succeeds unexpectedly.
>  
>  
> {code:java}
> % ./bin/kafka-metadata-shell.sh --snapshot data/bootstrap.checkpoint
> Loading...
> Starting...
> [ Kafka Metadata Shell ]
> >> {code}
> And I found that the .lock file vanishes after MetadataShell exits.
>  
> This is because MetadataShell calls FileLock#destroy when it fails to acquire the 
> lock, which deletes the lock file.
> [https://github.com/apache/kafka/blob/4.0.0/shell/src/main/java/org/apache/kafka/shell/MetadataShell.java#L131]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19333) Inconsistent behavior between `ConsumerMembershipManager` and `StreamsMembershipManager` on `onAllTasksLost` execution

2025-05-26 Thread Nick Guo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954108#comment-17954108
 ] 

Nick Guo commented on KAFKA-19333:
--

Hi [~lianetm]! Thanks for taking the time to explain this, it's really helpful!

> Inconsistent behavior between `ConsumerMembershipManager` and 
> `StreamsMembershipManager` on `onAllTasksLost` execution
> --
>
> Key: KAFKA-19333
> URL: https://issues.apache.org/jira/browse/KAFKA-19333
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Nick Guo
>Assignee: Nick Guo
>Priority: Minor
>
> `ConsumerMembershipManager` does not create an event to run a callback if 
> there is nothing to revoke,but `StreamsMembershipManager` does.
> related discussion and pr:
> discussion:[https://github.com/apache/kafka/pull/18551/files#r2106243432]
> pr: https://github.com/apache/kafka/pull/19779



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19056: Rewrite EndToEndClusterIdTest in Java and move it to the server module [kafka]

2025-05-26 Thread via GitHub


mingyen066 commented on code in PR #19741:
URL: https://github.com/apache/kafka/pull/19741#discussion_r2107586894


##
server/src/test/java/org/apache/kafka/api/EndToEndClusterIdTest.java:
##
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.api;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.metrics.MetricConfigs;
+import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.MockDeserializer;
+import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockProducerInterceptor;
+import org.apache.kafka.test.MockSerializer;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.kafka.test.TestUtils.isValidClusterId;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** The test cases here verify the following conditions.
+ * 1. The ProducerInterceptor receives the cluster id after the onSend() 
method is called and before onAcknowledgement() method is called.
+ * 2. The Serializer receives the cluster id before the serialize() method is 
called.
+ * 3. The producer MetricReporter receives the cluster id after send() method 
is called on KafkaProducer.
+ * 4. The ConsumerInterceptor receives the cluster id before the onConsume() 
method.
+ * 5. The Deserializer receives the cluster id before the deserialize() method 
is called.
+ * 6. The consumer MetricReporter receives the cluster id after poll() is 
called on KafkaConsumer.
+ * 7. The broker MetricReporter receives the cluster id after the broker 
startup is over.
+ * 8. The broker KafkaMetricReporter receives the cluster id after the broker 
startup is over.
+ * 9. All the components receive the same cluster id.
+ */
+@ClusterTestDefaults(serverProperties = {
+@ClusterConfigProperty(key = MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, 
value = "org.apache.kafka.api.EndToEndClusterIdTest$MockCommonMetricsReporter"),
+})
+public class EndToEndClusterIdTest {
+
+private static final String TOPIC = "e2etopic";
+private static final int PARTITION = 0;
+private static final TopicPartition TP = new TopicPartition(TOPIC, 
PARTITION);
+private final ClusterInstance clusterInstance;
+private String clusterBrokerId;
+private String controllerId;
+private static final String PRODUCER_CLIENT_ID = "producerClientId";
+private static final String CONSUMER_CLIENT_ID = "consumerClientId";
+
+EndToEndClusterIdTest(ClusterInstance clusterInstance) {
+this.clusterInstance = clusterInstance;
+}
+
+@BeforeEach
+public void setup() throws InterruptedException {
+this.clusterInstance.createTopic(TOPIC, 2, (short) 1);
+clusterBrokerId = 
String.valueOf(clusterInstance.brokerIds().iterator().next());
+controllerId = 
String.valueOf(clusterInstance.controllerIds().iterator().next());
+MockDeserializer.resetStaticVariables();
+}
+
+public static class MockCommonMetricsReporter extends MockMetricsReporter 
implements ClusterResourceListener {
+public static final Map CLU
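For context, `ClusterResourceListener` is the hook the mocks above implement; a minimal 
sketch of the interface in use (illustrative class name):

```java
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.ClusterResourceListener;

// Kafka invokes onUpdate() once cluster metadata (including the cluster id)
// becomes available, which is what the test conditions above verify.
public class PrintingClusterListener implements ClusterResourceListener {
    @Override
    public void onUpdate(ClusterResource clusterResource) {
        System.out.println("cluster id: " + clusterResource.clusterId());
    }
}
```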

[jira] [Created] (KAFKA-19335) Streams group heartbeat sometimes fails with INVALID_REQUEST

2025-05-26 Thread Lucas Brutschy (Jira)
Lucas Brutschy created KAFKA-19335:
--

 Summary: Streams group heartbeat sometimes fails with 
INVALID_REQUEST
 Key: KAFKA-19335
 URL: https://issues.apache.org/jira/browse/KAFKA-19335
 Project: Kafka
  Issue Type: Sub-task
Reporter: Lucas Brutschy


KIP-1071 soak sometimes fails with this error:
 {{[2025-04-22 17:34:12,585] ERROR [consumer_background_thread] [Consumer 
instanceId=ip-172-31-9-4.us-west-2.compute.internal-1, 
clientId=i-0b96c1803fb4c32d7-StreamThread-1-consumer, groupId=stream-soak-test] 
StreamsGroupHeartbeatRequest failed due to INVALID_REQUEST: Topology can only 
be provided when (re-)joining. 
(org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager)}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-19335) Streams group heartbeat sometimes fails with INVALID_REQUEST

2025-05-26 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-19335:
--

Assignee: Lucas Brutschy

> Streams group heartbeat sometimes fails with INVALID_REQUEST
> 
>
> Key: KAFKA-19335
> URL: https://issues.apache.org/jira/browse/KAFKA-19335
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Lucas Brutschy
>Assignee: Lucas Brutschy
>Priority: Major
>
> KIP-1071 soak sometimes fails with this error:
>  {{[2025-04-22 17:34:12,585] ERROR [consumer_background_thread] [Consumer 
> instanceId=ip-172-31-9-4.us-west-2.compute.internal-1, 
> clientId=i-0b96c1803fb4c32d7-StreamThread-1-consumer, 
> groupId=stream-soak-test] StreamsGroupHeartbeatRequest failed due to 
> INVALID_REQUEST: Topology can only be provided when (re-)joining. 
> (org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager)}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-19335: Membership Managers end negative epoch in JOINING [kafka]

2025-05-26 Thread via GitHub


lucasbru opened a new pull request, #19818:
URL: https://github.com/apache/kafka/pull/19818

   There is a sequence of interactions with the membership managers of KIP-848, 
KIP-932, KIP-1071 that can put the membership manager into JOINING state, but 
where the member epoch is set to -1. This can result in an invalid request being 
sent, since joining heartbeats should not have member epoch -1. This may lead 
to the member failing to join. In the case of streams, the group coordinator 
will return INVALID_REQUEST.
   
   This is the sequence triggering the bug, which seems relatively likely to 
occur; it is caused by two heartbeat responses being received after the next 
request has been sent.
   
   `membershipManager.leaveGroup();` -> transitions to LEAVING
   `membershipManager.onHeartbeatRequestGenerated();` -> transitions to UNSUBSCRIBED
   `membershipManager.onHeartbeatSuccess(... with member epoch > 0);` -> unblocks the consumer
   `membershipManager.onSubscriptionUpdated();`
   `membershipManager.onConsumerPoll();` -> transitions to JOINING
   `membershipManager.onHeartbeatSuccess(... with member epoch < 0);` -> updates the epoch to a negative value
   
   Now we are in state JOINING with memberEpoch -1, and the next heartbeat we send 
will be malformed, triggering INVALID_REQUEST.
   
   The bug may also be triggered if the `unsubscribe` times out, but this seems 
more of a corner case.
   
   To prevent the bug, we are taking two measures. The likely path to 
triggering the bug is closed by not unblocking an `unsubscribe` call in 
the consumer when a non-leave-heartbeat epoch is received: once we have sent 
out the leave group heartbeat, we ignore all heartbeat responses except those 
containing memberEpoch < 0.
   
   For good measure, we also prevent the second case (`unsubscribe` timing 
out). Here, the consumer gets unblocked before we have received the leave 
group heartbeat response and may resubscribe to the group; we then simply 
ignore the heartbeat response containing a member epoch < 0 once it arrives, 
since we have already left the `UNSUBSCRIBED` state. A sketch of this guard 
follows below.
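   A minimal sketch of that second guard; the class shape, enum, and field names 
are hypothetical and do not match the real membership managers:
   
   ```java
   public class MembershipGuardSketch {
       enum MemberState { JOINING, LEAVING, UNSUBSCRIBED }
   
       private MemberState state = MemberState.JOINING;
       private int memberEpoch = 0;
   
       // Once we have re-joined, a delayed response to a previous leave request
       // (member epoch < 0) must be dropped; applying its epoch here would make
       // the next joining heartbeat carry -1 and be rejected by the coordinator.
       void onHeartbeatSuccess(int responseMemberEpoch) {
           if (responseMemberEpoch < 0 && state != MemberState.UNSUBSCRIBED) {
               return; // stale leave response, ignore
           }
           this.memberEpoch = responseMemberEpoch;
       }
   }
   ```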
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-19288) Ensure new consumer joining attempt not overwritten on delayed HB response to previous leave

2025-05-26 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-19288:
--

Assignee: Lucas Brutschy  (was: Lianet Magrans)

> Ensure new consumer joining attempt not overwritten on delayed HB response to 
> previous leave
> 
>
> Key: KAFKA-19288
> URL: https://issues.apache.org/jira/browse/KAFKA-19288
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.1.0
>
>
> For the async consumer, check and cover the edge case where the consumer may 
> transition to JOINING, but receive a delayed response to a previous leave 
> request. We should ensure that the response is ignored and the consumer joins 
> as intended. 
> I expect this doesn't usually happen given that the unsubscribe is a 
> blocking operation, so the consumer won't be able to join until the 
> unsubscribe completes. But if the leave request doesn't get a response in 
> time (the unsubscribe fails quietly), and the response arrives after it, I 
> expect the joining could be overwritten (the member updating its joining epoch 0 
> to the -1 epoch received in the response)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19288) Ensure new consumer joining attempt not overwritten on delayed HB response to previous leave

2025-05-26 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954114#comment-17954114
 ] 

Lucas Brutschy commented on KAFKA-19288:


https://github.com/apache/kafka/pull/19818

> Ensure new consumer joining attempt not overwritten on delayed HB response to 
> previous leave
> 
>
> Key: KAFKA-19288
> URL: https://issues.apache.org/jira/browse/KAFKA-19288
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.1.0
>
>
> For the async consumer, check and cover the edge case where the consumer may 
> transition to JOINING, but receive a delayed response to a previous leave 
> request. We should ensure that the response is ignored and the consumer joins 
> as intended. 
> I expect this doesn't usually happen given that the unsubscribe is a 
> blocking operation, so the consumer won't be able to join until the 
> unsubscribe completes. But if the leave request doesn't get a response in 
> time (the unsubscribe fails quietly), and the response arrives after it, I 
> expect the joining could be overwritten (the member updating its joining epoch 0 
> to the -1 epoch received in the response)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19330: Change MockSerializer/Deserializer to use String serializer instead of byte[] [kafka]

2025-05-26 Thread via GitHub


chia7712 commented on code in PR #19812:
URL: https://github.com/apache/kafka/pull/19812#discussion_r2107630603


##
clients/src/test/java/org/apache/kafka/test/MockDeserializer.java:
##
@@ -52,11 +53,14 @@ public void configure(Map configs, boolean 
isKey) {
 }
 
 @Override
-public byte[] deserialize(String topic, byte[] data) {
+public String deserialize(String topic, byte[] data) {
 // This will ensure that we get the cluster metadata when deserialize 
is called for the first time
 // as subsequent compareAndSet operations will fail.
 clusterIdBeforeDeserialize.compareAndSet(noClusterId, 
clusterMeta.get());
-return data;
+if (data == null)

Review Comment:
   ```java
   if (data == null) return null;
   return data.getBytes(StandardCharsets.UTF_8);
   ```



##
clients/src/test/java/org/apache/kafka/test/MockSerializer.java:
##
@@ -35,11 +36,14 @@ public MockSerializer() {
 }
 
 @Override
-public byte[] serialize(String topic, byte[] data) {
+public byte[] serialize(String topic, String data) {
 // This will ensure that we get the cluster metadata when serialize is 
called for the first time
 // as subsequent compareAndSet operations will fail.
 CLUSTER_ID_BEFORE_SERIALIZE.compareAndSet(NO_CLUSTER_ID, 
CLUSTER_META.get());
-return data;
+if (data == null)

Review Comment:
   ditto



##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -603,7 +603,7 @@ public void testSerializerClose() {
 final int oldInitCount = MockSerializer.INIT_COUNT.get();
 final int oldCloseCount = MockSerializer.CLOSE_COUNT.get();
 
-KafkaProducer producer = new KafkaProducer<>(
+KafkaProducer producer = new KafkaProducer<>(

Review Comment:
   ```java
   try (var ignored = new KafkaProducer<>(configs, new 
MockSerializer(), new MockSerializer())) {
   assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
   assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-19288) Ensure new consumer joining attempt not overwritten on delayed HB response to previous leave

2025-05-26 Thread Lucas Brutschy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17954116#comment-17954116
 ] 

Lucas Brutschy commented on KAFKA-19288:


The sequence is slightly different from what is described in this ticket, and 
it could be more likely. What is actually happening is that after unsubscribing, 
we receive a heartbeat response with memberEpoch > 0, which happens to unblock 
the consumer. So unsubscribe does not need to time out for this to be triggered.

> Ensure new consumer joining attempt not overwritten on delayed HB response to 
> previous leave
> 
>
> Key: KAFKA-19288
> URL: https://issues.apache.org/jira/browse/KAFKA-19288
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 4.1.0
>
>
> For the async consumer, check and cover the edge case where the consumer may 
> transition to JOINING, but receive a delayed response to a previous leave 
> request. We should ensure that the response is ignored and the consumer joins 
> as intended. 
> I expect this doesn't usually happen given that the unsubscribe is a 
> blocking operation, so the consumer won't be able to join until the 
> unsubscribe completes. But if the leave request doesn't get a response in 
> time (the unsubscribe fails quietly), and the response arrives after it, I 
> expect the joining could be overwritten (the member updating its joining epoch 0 
> to the -1 epoch received in the response)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19330: Change MockSerializer/Deserializer to use String serializer instead of byte[] [kafka]

2025-05-26 Thread via GitHub


mingyen066 commented on PR #19812:
URL: https://github.com/apache/kafka/pull/19812#issuecomment-2910251765

@chia7712 Thanks for the review. I've addressed the comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19300: AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException [kafka]

2025-05-26 Thread via GitHub


chia7712 merged PR #19779:
URL: https://github.com/apache/kafka/pull/19779


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-19300) AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException

2025-05-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-19300.

Resolution: Fixed

> AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException
> ---
>
> Key: KAFKA-19300
> URL: https://issues.apache.org/jira/browse/KAFKA-19300
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Chia-Ping Tsai
>Assignee: Nick Guo
>Priority: Major
> Fix For: 4.1.0
>
>
> it can be reproduced by 
> GroupAuthorizerIntegrationTest.testConsumeUnsubscribeWithoutGroupPermission.
> The root cause is shown below.
> 1. AsyncConsumer#unsubscribe is executed
> 2. process(UnsubscribeEvent) changes the state to LEAVING
> 3. the state is changed to UNSUBSCRIBED in generating Heartbeat
> 4. the `LeaveInProgress` is skipped in transitionToFatal since the state is 
> UNSUBSCRIBED
> The behavior is inconsistent with the classic consumer
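
For illustration, a hedged sketch of the idea behind the fix (all names are 
hypothetical, not the actual client internals): a pending leave must be completed in 
transitionToFatal even when the state has already advanced from LEAVING to UNSUBSCRIBED.

{code:java}
import java.util.concurrent.CompletableFuture;

public class UnsubscribeSketch {
    enum MemberState { LEAVING, UNSUBSCRIBED, FATAL }

    private MemberState state = MemberState.UNSUBSCRIBED; // already advanced past LEAVING
    private final CompletableFuture<Void> leaveInProgress = new CompletableFuture<>();

    // Complete the pending leave regardless of the current state, so the
    // caller blocked in unsubscribe() is released instead of timing out.
    void transitionToFatal(Throwable error) {
        state = MemberState.FATAL;
        if (!leaveInProgress.isDone()) {
            leaveInProgress.complete(null); // the fatal error is surfaced elsewhere
        }
    }
}
{code}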



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19322: Remove the DelayedOperation constructor that accepts an external lock [kafka]

2025-05-26 Thread via GitHub


chia7712 merged PR #19798:
URL: https://github.com/apache/kafka/pull/19798


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-19322) Remove the DelayedOperation constructor that accepts an external lock

2025-05-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19322?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-19322.

Fix Version/s: 4.1.0
   Resolution: Fixed

> Remove the DelayedOperation constructor that accepts an external lock
> -
>
> Key: KAFKA-19322
> URL: https://issues.apache.org/jira/browse/KAFKA-19322
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Yu Chia Ma
>Priority: Minor
> Fix For: 4.1.0
>
>
> see discussion
> https://github.com/apache/kafka/pull/19759#discussion_r2097557356



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19144 Move DelayedProduce to server module [kafka]

2025-05-26 Thread via GitHub


chia7712 commented on PR #19793:
URL: https://github.com/apache/kafka/pull/19793#issuecomment-2910286667

   @johnny94 please fix the conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19315: Move ControllerMutationQuotaManager to server module [kafka]

2025-05-26 Thread via GitHub


chia7712 commented on code in PR #19807:
URL: https://github.com/apache/kafka/pull/19807#discussion_r2107643996


##
server/src/main/java/org/apache/kafka/server/PermissiveControllerMutationQuota.java:
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * The PermissiveControllerMutationQuota defines a permissive quota for a 
given user/clientId pair.
+ * The quota is permissive meaning that 1) it does accept any mutations even 
if the quota is
+ * exhausted; and 2) it does throttle as soon as the quota is exhausted.
+ *
+ * @param time @Time object to use

Review Comment:
   ditto



##
server/src/main/java/org/apache/kafka/server/UnboundedControllerMutationQuota.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server;
+
+/**
+ * Default quota used when quota is disabled.
+ */
+public class UnboundedControllerMutationQuota implements 
ControllerMutationQuota {
+
+public static final UnboundedControllerMutationQuota INSTANCE = new 
UnboundedControllerMutationQuota();

Review Comment:
   Could you please inline this class?
   ```java
   public interface ControllerMutationQuota {
   ControllerMutationQuota UNBOUNDED_CONTROLLER_MUTATION_QUOTA = new 
ControllerMutationQuota() {
   @Override
   public boolean isExceeded() {
   return false;
   }
   
   @Override
   public void record(double permits) {
   }
   
   @Override
   public int throttleTime() {
   return 0;
   }
   };
   
   ```



##
server/src/main/java/org/apache/kafka/server/AbstractControllerMutationQuota.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.metrics.QuotaViolationException;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * The AbstractControllerMutationQuota is the base class of 
StrictControllerMutationQuota and
+ * PermissiveControllerMutationQuota.
+ *
+ * @param time @Time object to use

Review Comment:
   please fix the docs



##
server/src/main/java/org/apache/kafka/server/ClientSensors.java:
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF 

Re: [PR] KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 commented on code in PR #19802:
URL: https://github.com/apache/kafka/pull/19802#discussion_r2107522589


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -582,54 +592,44 @@ public Set currentWarmupTaskProcessIds(
 }
 
 /**
- * @return An immutable map of partition metadata for each topic that are 
inputs for this streams group.
+ * @return The metadata hash.
  */
-public Map partitionMetadata() {
-return Collections.unmodifiableMap(partitionMetadata);
+public long metadataHash() {
+return metadataHash.get();
 }
 
 /**
- * Updates the partition metadata. This replaces the previous one.
+ * Updates the metadata hash.
  *
- * @param partitionMetadata The new partition metadata.
+ * @param metadataHash The new metadata hash.
  */
-public void setPartitionMetadata(
-Map partitionMetadata
-) {
-this.partitionMetadata.clear();
-this.partitionMetadata.putAll(partitionMetadata);
-maybeUpdateConfiguredTopology();
-maybeUpdateGroupState();

Review Comment:
   We may not be able to set the metadata hash in `ConfiguredTopology`. When 
replaying `StreamsGroupMetadataValue`, we need to set `metadataHash` back. If 
we add it to `ConfiguredTopology`, there will be a non-empty 
`ConfiguredTopology` in the group whose `metadataHash` may be the same as the 
latest computed value. Then we don't have a value to compare against to decide 
whether we need to compute a new `ConfiguredTopology`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19335: Membership managers send negative epoch in JOINING [kafka]

2025-05-26 Thread via GitHub


lianetm commented on PR #19818:
URL: https://github.com/apache/kafka/pull/19818#issuecomment-2910378099

   Hey @lucasbru, thanks for taking on this one. Agree with the gap on leave HB 
responses received in unexpected order. And the fix makes sense to me (only 
complete the leave if the HB response is a response to leave, and never apply 
epoch received in a leave HB response). 
   
   What I'm not seeing clearly is how this would lead to INVALID_REQUEST (so 
I'm worried that, even though this is a sensible gap and fix, there may still be 
something else behind the failure you got). If this race happens, I expect 
that we end up sending a full HB (all fields), but with the -1 epoch, correct? 
Then the request should fail with UNKNOWN_MEMBER the moment the coordinator 
tries to find the member that wants to leave
   
https://github.com/apache/kafka/blob/6e380fbbcc8fde22d1f2bb3310e1270d5b3f4837/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L3902
   (same applies for the Consumer btw, we should get UNKNOWN_MEMBER if the 
client sends a full HB to join but with epoch -1 by mistake/race).
   
   Thoughts? not sure if I'm missing something here


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) [kafka]

2025-05-26 Thread via GitHub


lucasbru commented on code in PR #19802:
URL: https://github.com/apache/kafka/pull/19802#discussion_r2107633695


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -582,54 +592,44 @@ public Set currentWarmupTaskProcessIds(
 }
 
 /**
- * @return An immutable map of partition metadata for each topic that are 
inputs for this streams group.
+ * @return The metadata hash.
  */
-public Map partitionMetadata() {
-return Collections.unmodifiableMap(partitionMetadata);
+public long metadataHash() {
+return metadataHash.get();
 }
 
 /**
- * Updates the partition metadata. This replaces the previous one.
+ * Updates the metadata hash.
  *
- * @param partitionMetadata The new partition metadata.
+ * @param metadataHash The new metadata hash.
  */
-public void setPartitionMetadata(
-Map partitionMetadata
-) {
-this.partitionMetadata.clear();
-this.partitionMetadata.putAll(partitionMetadata);
-maybeUpdateConfiguredTopology();
-maybeUpdateGroupState();

Review Comment:
   Ah, I did not mean it as a replacement for the `TimelineLong` inside 
`StreamsGroup`. We need an additional hash to be stored inside 
`ConfiguredTopology`, to remember for which state of the topics it was 
calculated.
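   Something like the following sketch is what I have in mind (hypothetical shapes, 
not the real coordinator classes):
   
   ```java
   // The topology remembers which metadata hash it was computed from, so a
   // replayed group hash alone never looks like an up-to-date configuration.
   record ConfiguredTopologySketch(long computedForMetadataHash /*, subtopologies... */) { }
   
   class StreamsGroupSketch {
       private long metadataHash;                            // restored on replay
       private ConfiguredTopologySketch configuredTopology;  // may be null after replay
   
       boolean needsReconfiguration() {
           return configuredTopology == null
               || configuredTopology.computedForMetadataHash() != metadataHash;
       }
   }
   ```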



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19335: Membership managers send negative epoch in JOINING [kafka]

2025-05-26 Thread via GitHub


lucasbru commented on PR #19818:
URL: https://github.com/apache/kafka/pull/19818#issuecomment-2910391860

   > Thoughts? not sure if I'm missing something here
   
   Thanks for taking a look, @lianetm ! You are right, consumer groups and 
share groups should fail with `UNKNOWN_MEMBER`. In streams, we send the 
topology in a full request, and have code to reject when it is being sent in 
anything but a joining heartbeat. This will trigger the `INVALID_REQUEST`. Hope 
that will clear things up!
   
   
https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java#L539
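   
   In other words, the check is roughly this (a sketch; the method shape and field 
names are illustrative):
   
   ```java
   import org.apache.kafka.common.errors.InvalidRequestException;
   
   public class TopologyFieldCheckSketch {
       // A streams heartbeat may only carry the topology when (re-)joining,
       // i.e. with member epoch 0; any other epoch with a topology set is
       // rejected with INVALID_REQUEST.
       static void validateTopologyField(int memberEpoch, Object topology) {
           if (topology != null && memberEpoch != 0) {
               throw new InvalidRequestException(
                   "Topology can only be provided when (re-)joining.");
           }
       }
   }
   ```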


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


dajac commented on PR #19761:
URL: https://github.com/apache/kafka/pull/19761#issuecomment-2909780629

   I would like to merge https://github.com/apache/kafka/pull/19790 before 
merging this one because we need to cherry-pick it to 4.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager [kafka]

2025-05-26 Thread via GitHub


ahuang98 commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2107750048


##
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##
@@ -453,40 +452,34 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
   }
 
   /**
-   * Helper method to update quota types counts and quotaTypesEnabled flag.
-   * @param quotaTypeKey The QuotaTypes constant (e.g., 
QuotaTypes.UserClientIdQuotaEnabled)
-   * @param increment True to increment count, false to decrement
+   * Helper method to update quotaTypesEnabled which is a bitwise OR 
combination of the enabled quota types.
+   * For example:
+   *  - If UserQuotaEnabled = 2 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 3 (2 | 1 = 3)
+   *  - If UserClientIdQuotaEnabled = 4 and ClientIdQuotaEnabled = 1, then 
quotaTypesEnabled = 5 (4 | 1 = 5)
+   *  - If UserClientIdQuotaEnabled = 4 and UserQuotaEnabled = 2, then 
quotaTypesEnabled = 6 (4 | 2 = 6)
+   *  - If all three are enabled (1 | 2 | 4), then quotaTypesEnabled = 7
*/
-  private def updateQuotaTypes(quotaTypeKey: Int, increment: Boolean): Unit = {
-if (quotaTypeKey == QuotaTypes.NoQuotas) {
-  return
-}
-val previousQuotaTypesEnabled = quotaTypesEnabled
-
-// Update activeQuotaTypes counts
-activeQuotaTypes.compute(quotaTypeKey, (_, count) =>
-  if (increment) Option(count).getOrElse(0) + 1
-  else if (Option(count).exists(_ > 1)) count - 1
-  else 0
-)
+  private def updateQuotaTypes(): Unit = {
+quotaTypesEnabled =  if (clientQuotaCallbackPlugin.isDefined) {
+QuotaTypes.CustomQuotas

Review Comment:
   thanks for the details and for the additional test!
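   
   For readers following the bitwise scheme in the quoted doc comment, a tiny worked 
example (standalone Java, not the actual manager code):
   
   ```java
   public class QuotaFlagsExample {
       public static void main(String[] args) {
           int quotaTypesEnabled = 0;
           quotaTypesEnabled |= 1;   // ClientIdQuotaEnabled         -> 1
           quotaTypesEnabled |= 4;   // UserClientIdQuotaEnabled     -> 5 (4 | 1)
           quotaTypesEnabled &= ~1;  // last client-id quota removed -> 4
           // Check whether user+client-id quotas are still active:
           System.out.println((quotaTypesEnabled & 4) != 0); // true
       }
   }
   ```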



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager [kafka]

2025-05-26 Thread via GitHub


ahuang98 commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2107754429


##
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##
@@ -428,18 +427,19 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
 try {
   val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
 
-  if (userEntity.nonEmpty) {
-if (quotaEntity.clientIdEntity.nonEmpty)
-  quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-else
-  quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-  } else if (clientEntity.nonEmpty)
-quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
   quota match {
-case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, 
quotaEntity, newQuota.bound)
-case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+case Some(newQuota) =>
+  quotaCallback.updateQuota(clientQuotaType, quotaEntity, 
newQuota.bound)
+  if(!activeQuotaEntities.put(quotaEntity, true)){

Review Comment:
   we're adding quotaEntity (of type KafkaQuotaEntity) to a map whose keys are 
of type ClientQuotaEntity?



##
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##
@@ -155,6 +155,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
 case None => new DefaultQuotaCallback
   }
   private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
+  private val activeQuotaEntities = new ConcurrentHashMap[ClientQuotaEntity, 
Boolean]()

Review Comment:
   perhaps you meant to make the key type KafkaQuotaEntity - in any case 
though, I'm not sure I understand why the key is of type "...QuotaEntity" 
(which contains both the type and name) instead of just "ConfigEntityType" 
(which contains only the type)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19042: [9/N] Move GroupAuthorizerIntegrationTest to clients-integration-tests module [kafka]

2025-05-26 Thread via GitHub


chia7712 commented on PR #19685:
URL: https://github.com/apache/kafka/pull/19685#issuecomment-2910266692

   @nick-zh please fix the conflicts


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager [kafka]

2025-05-26 Thread via GitHub


MahsaSeifikar commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2107803984


##
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##
@@ -428,18 +427,19 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
 try {
   val quotaEntity = KafkaQuotaEntity(userEntity, clientEntity)
 
-  if (userEntity.nonEmpty) {
-if (quotaEntity.clientIdEntity.nonEmpty)
-  quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled
-else
-  quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled
-  } else if (clientEntity.nonEmpty)
-quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled
-
   quota match {
-case Some(newQuota) => quotaCallback.updateQuota(clientQuotaType, 
quotaEntity, newQuota.bound)
-case None => quotaCallback.removeQuota(clientQuotaType, quotaEntity)
+case Some(newQuota) =>
+  quotaCallback.updateQuota(clientQuotaType, quotaEntity, 
newQuota.bound)
+  if(!activeQuotaEntities.put(quotaEntity, true)){

Review Comment:
   My bad, it should be a map with KafkaQuotaEntity as the key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19154; Offset Fetch API should return INVALID_OFFSET if requested topic id does not match persisted one [kafka]

2025-05-26 Thread via GitHub


lianetm commented on code in PR #19744:
URL: https://github.com/apache/kafka/pull/19744#discussion_r2107800303


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -859,6 +859,7 @@ boolean hasCommittedOffset(
  *
  * @return A List of OffsetFetchResponseTopics response.
  */
+@SuppressWarnings("NPathComplexity")

Review Comment:
   Can we maybe avoid the suppression if we use a var for the new check? 
(Something like `var isInvalidOffset = offsetAndMetadata == null || 
isMismatchedTopicId(...)`, and then check on it on line 910.)
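   
   A hedged sketch of what that could look like (names and types here are 
assumptions, not the actual OffsetMetadataManager API):

```java
import java.util.Optional;
import java.util.UUID;

// Sketch only: fold both conditions into one flag so the later branch stays
// simple and the NPathComplexity suppression may become unnecessary.
class OffsetFetchSketch {
    record OffsetAndMetadata(long offset, Optional<UUID> topicId) {}  // assumed shape

    static boolean isMismatchedTopicId(UUID requestedTopicId, OffsetAndMetadata committed) {
        return committed.topicId().map(id -> !id.equals(requestedTopicId)).orElse(false);
    }

    static boolean isInvalidOffset(OffsetAndMetadata committed, UUID requestedTopicId) {
        return committed == null || isMismatchedTopicId(requestedTopicId, committed);
    }
}
```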



##
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##
@@ -527,4 +527,88 @@ class OffsetFetchRequestTest(cluster: ClusterInstance) 
extends GroupCoordinatorB
   )
 }
   }
+
+  @ClusterTest
+  def testFetchOffsetWithRecreatedTopic(): Unit = {
+// There are two ways to ensure that committed offsets of recreated topics 
are not returned.
+// 1) When a topic is deleted, GroupCoordinatorService#onPartitionsDeleted 
is called to
+//delete all its committed offsets.
+// 2) Since version 10 of the OffsetCommit API, the topic id is stored 
alongside the
+//committed offset. When it is queried, it is only returned iff the 
topic id of the committed offset matches the requested one.
+// The test exercises both conditions, but not in a deterministic way, as 
they race against each other.

Review Comment:
   yeap, tricky to play against that here on integration tests, but seems good 
enough because it ensures the final outcome (combined with the unit tests for 
the topic ID check)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group [kafka]

2025-05-26 Thread via GitHub


dongnuo123 commented on code in PR #19790:
URL: https://github.com/apache/kafka/pull/19790#discussion_r210783


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupTest.java:
##
@@ -1576,6 +1581,7 @@ public void testFromClassicGroup() {
 assertEquals(expectedConsumerGroup.groupEpoch(), 
consumerGroup.groupEpoch());
 assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
 assertEquals(expectedConsumerGroup.preferredServerAssignor(), 
consumerGroup.preferredServerAssignor());
+assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), 
Map.copyOf(consumerGroup.subscriptionMetadata()));

Review Comment:
   Without it `assertEquals` just compares the reference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group [kafka]

2025-05-26 Thread via GitHub


dongnuo123 commented on code in PR #19790:
URL: https://github.com/apache/kafka/pull/19790#discussion_r2107834673


##
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala:
##
@@ -2485,8 +2485,6 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 // Test offset deletion while consuming
 val offsetDeleteResult = 
client.deleteConsumerGroupOffsets(testGroupId, util.Set.of(tp1, tp2))
 
-// Top level error will equal to the first partition level error
-assertFutureThrows(classOf[GroupSubscribedToTopicException], 
offsetDeleteResult.all())

Review Comment:
   Reverted it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… [kafka]

2025-05-26 Thread via GitHub


cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2107363103


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##
@@ -446,29 +446,15 @@ private List getTaskIdsAsStrings(final 
KafkaStreams streams) {
 
 private static Stream singleAndMultiTaskParameters() {

Review Comment:
   This name does not really fit anymore. I propose to rename this method to 
`topologyComplexityAndRebalanceProtocol`.



##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##
@@ -484,7 +470,7 @@ private Properties props(final Properties extraProperties) {
 streamsConfiguration.put(StreamsConfig.DEFAULT_CLIENT_SUPPLIER_CONFIG, 
TestClientSupplier.class);
 
streamsConfiguration.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER,
 TestConsumerWrapper.class);
 streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-streamsConfiguration.putAll(extraProperties);

Review Comment:
   This does not seem right. On line 472 the group protocol config is passed to 
`props()`, but here it is ignored.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -395,18 +379,15 @@ public void handleAssignment(final Map> activeTasks,
 // 2. for tasks that have changed active/standby status, just recycle 
and skip re-creating them
 // 3. otherwise, close them since they are no longer owned
 final Map failedTasks = new 
LinkedHashMap<>();
-if (stateUpdater == null) {
-handleTasksWithoutStateUpdater(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
-} else {
-handleTasksWithStateUpdater(
-activeTasksToCreate,
-standbyTasksToCreate,
-tasksToRecycle,
-tasksToCloseClean,
-failedTasks
-);
-
failedTasks.putAll(collectExceptionsAndFailedTasksFromStateUpdater());
-}
+
+handleTasksWithStateUpdater(

Review Comment:
   Could you please rename this method to `handleTasks()`. We do not need to 
distinguish the cases with and without state updater.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1909,7 +1772,7 @@ public void 
shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro
 }
 
 @Test
-public void 
shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
+public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() 
{

Review Comment:
   Could you please change 
   ```java
   final long changelogOffsetOfRunningTask = 42L;
   ``` 
   to 
   ```java
   final long changelogOffsetOfRunningTask = Task.LATEST_OFFSET;
   ```
   to make the case more real?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata 
topologyMetadata,
   final Runnable shutdownErrorHook,
   final BiConsumer 
streamsUncaughtExceptionHandler) {
 
-final boolean stateUpdaterEnabled = 
InternalConfig.stateUpdaterEnabled(config.originals());

Review Comment:
   There is still a system test that uses the config. It is 
[`streams_upgrade_test.test_upgrade_downgrade_state_updater()`](https://github.com/apache/kafka/blob/1ded681684e771b16aa98ae751f39b9816345a83/tests/kafkatest/tests/streams/streams_upgrade_test.py#L178).
 There is a comment that says:
   ```
   Once same-thread state restoration is removed from the code, this test 
   should use different versions of the code.
   ``` 
   I guess it means to only use a version before `3.8` (e.g. `LATEST_3_7`) for 
the `from_version` and `DEV_VERSION` for the `to_version`. You need to choose a 
version before `3.8` because before `3.8` the state updater was not enabled by 
default. 
   @lucasbru did I correctly interpret your comment?  



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1940,57 +1803,6 @@ public void 
shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStat
 );
 }
 
-@Test
-public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws 
Exception {

Review Comment:
   Could you please replace this test with the following:
   ```java
   @Test
   public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() {
   final StreamTask restoringStatefulTask = statefulTask(taskId01, 
taskId01ChangelogPartitions)
   .inState(State.RESTORING).build();
   final long changelogOffsetOfRestoringStandbyTask = 84L

Re: [PR] KAFKA-19282: Update quotaTypesEnabled on quota removal in ClientQuotaManager [kafka]

2025-05-26 Thread via GitHub


MahsaSeifikar commented on code in PR #19742:
URL: https://github.com/apache/kafka/pull/19742#discussion_r2107840373


##
core/src/main/scala/kafka/server/ClientQuotaManager.scala:
##
@@ -155,6 +155,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
 case None => new DefaultQuotaCallback
   }
   private val clientQuotaType = QuotaType.toClientQuotaType(quotaType)
+  private val activeQuotaEntities = new ConcurrentHashMap[ClientQuotaEntity, 
Boolean]()

Review Comment:
   `KafkaQuotaEntity` is used as the key for `activeQuotaEntities` because it 
tracks entity name (e.g., "userA" or "client1", or default user or client) and 
we can extract the entity type from that. Consider a scenario where a customer 
has two quotas: one for `(userA, client1)` and another for `(userA, client2)`. 
If we only tracked the entity type, removing the quota for one combination 
(e.g., `(userA, client1)`) could incorrectly affect other combinations of the 
same type (e.g., `(userA, client2)`). By using `KafkaQuotaEntity`, we make sure 
that removing one quota does not impact others of the same type. I found this 
issue while testing scenarios involving duplicate quota types.
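   
   To make that concrete, a toy sketch of why the full (user, client-id) pair 
has to be the key (types and names are stand-ins, not the real 
ClientQuotaManager internals):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Keying by the full entity means removing (userA, client1) cannot
// disturb the still-active quota for (userA, client2).
class ActiveQuotaEntitiesSketch {
    record QuotaEntity(String user, String clientId) {}  // stand-in for KafkaQuotaEntity

    private final Map<QuotaEntity, Boolean> activeQuotaEntities = new ConcurrentHashMap<>();

    void updateQuota(QuotaEntity entity) {
        activeQuotaEntities.put(entity, true);
    }

    void removeQuota(QuotaEntity entity) {
        // Only this exact pair is dropped; a map keyed by entity *type*
        // would conflate every pair of the same shape.
        activeQuotaEntities.remove(entity);
    }
}
```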



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Update README.md [kafka]

2025-05-26 Thread via GitHub


whybeeh1 opened a new pull request, #19819:
URL: https://github.com/apache/kafka/pull/19819

   Better readability


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update README.md [kafka]

2025-05-26 Thread via GitHub


whybeeh1 closed pull request #19819: Update README.md
URL: https://github.com/apache/kafka/pull/19819


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update README.md [kafka]

2025-05-26 Thread via GitHub


whybeeh1 commented on PR #19819:
URL: https://github.com/apache/kafka/pull/19819#issuecomment-2910613900

   cl


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Update README.md [kafka]

2025-05-26 Thread via GitHub


whybeeh1 commented on PR #19819:
URL: https://github.com/apache/kafka/pull/19819#issuecomment-2910631422

   l


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19268 Missing mocks for SharePartitionManagerTest tests [kafka]

2025-05-26 Thread via GitHub


AndrewJSchofield merged PR #19786:
URL: https://github.com/apache/kafka/pull/19786


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-19268) Missing mocks for SharePartitionManagerTest tests

2025-05-26 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-19268.
--
Fix Version/s: 4.1.0
   Resolution: Fixed

> Missing mocks for SharePartitionManagerTest tests
> -
>
> Key: KAFKA-19268
> URL: https://issues.apache.org/jira/browse/KAFKA-19268
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Abhinav Dixit
>Assignee: jiseung
>Priority: Major
> Fix For: 4.1.0
>
>
> A few tests in SharePartitionManagerTest throw silent exceptions but the 
> tests pass. This is mainly due to missing mocks in those tests. The following 
> tests need to be analyzed and fixed - 
> testAcknowledgeCompletesDelayedShareFetchRequest
> testMultipleConcurrentShareFetches
> testCachedTopicPartitionsForValidShareSessions
> testReleaseSessionCompletesDelayedShareFetchRequest
> testReleaseSessionSuccess



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19285: Added more tests in SharePartitionManagerTest [kafka]

2025-05-26 Thread via GitHub


AndrewJSchofield commented on PR #19778:
URL: https://github.com/apache/kafka/pull/19778#issuecomment-2910658074

   @chirag-wadhwa5 Please merge latest changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-19292) Introduce soft state for StreamsGroup

2025-05-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-19292:

Component/s: streams

> Introduce soft state for StreamsGroup
> -
>
> Key: KAFKA-19292
> URL: https://issues.apache.org/jira/browse/KAFKA-19292
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19244: Add support for kafka-streams-groups.sh options (delete group, offset-related APIs) [1/N] [kafka]

2025-05-26 Thread via GitHub


aliehsaeedii commented on code in PR #19646:
URL: https://github.com/apache/kafka/pull/19646#discussion_r2107042451


##
tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java:
##
@@ -330,13 +400,433 @@ Map 
getOffsets(StreamsGroupDescription description)
 
 Map getCommittedOffsets(String 
groupId) {
 try {
-return adminClient.listConsumerGroupOffsets(
-Map.of(groupId, new 
ListConsumerGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
+return adminClient.listStreamsGroupOffsets(
+Map.of(groupId, new 
ListStreamsGroupOffsetsSpec())).partitionsToOffsetAndMetadata(groupId).get();
 } catch (InterruptedException | ExecutionException e) {
 throw new RuntimeException(e);
 }
 }
 
+Map> resetOffsets() {
+Map> result = new 
HashMap<>();
+List groupIds = listStreamsGroups();
+if (!groupIds.isEmpty()) {
+Map> 
streamsGroups = adminClient.describeStreamsGroups(
+groupIds
+).describedGroups();
+
+streamsGroups.forEach((groupId, groupDescription) -> {
+try {
+String state = 
groupDescription.get().groupState().toString();
+switch (state) {
+case "Empty":
+case "Dead":
+result.put(groupId, 
resetOffsetsForInactiveGroup(groupId));
+break;
+default:
+printError("Assignments can only be reset if 
the group '" + groupId + "' is inactive, but the current state is " + state + 
".", Optional.empty());
+result.put(groupId, Collections.emptyMap());
+}
+} catch (InterruptedException ie) {
+throw new RuntimeException(ie);
+} catch (ExecutionException ee) {
+if (ee.getCause() instanceof GroupIdNotFoundException) 
{
+result.put(groupId, 
resetOffsetsForInactiveGroup(groupId));
+} else {
+throw new RuntimeException(ee);
+}
+}
+});
+}
+return result;
+}
+
+private Map 
resetOffsetsForInactiveGroup(String groupId) {
+try {
+Collection partitionsToReset = 
getPartitionsToReset(groupId);
+Map preparedOffsets = 
prepareOffsetsToReset(groupId, partitionsToReset);
+
+// Dry-run is the default behavior if --execute is not 
specified
+boolean dryRun = opts.options.has(opts.dryRunOpt) || 
!opts.options.has(opts.executeOpt);
+if (!dryRun) {
+adminClient.alterStreamsGroupOffsets(
+groupId,
+preparedOffsets
+).all().get();
+}
+
+return preparedOffsets;
+} catch (InterruptedException ie) {
+throw new RuntimeException(ie);
+} catch (ExecutionException ee) {
+Throwable cause = ee.getCause();
+if (cause instanceof KafkaException) {
+throw (KafkaException) cause;
+} else {
+throw new RuntimeException(cause);
+}
+}
+}
+
+private Collection getPartitionsToReset(String 
groupId) throws ExecutionException, InterruptedException {
+if (opts.options.has(opts.allTopicsOpt)) {
+return getCommittedOffsets(groupId).keySet();
+} else if (opts.options.has(opts.topicOpt)) {
+List topics = opts.options.valuesOf(opts.topicOpt);
+return parseTopicPartitionsToReset(topics);
+} else {
+if (!opts.options.has(opts.resetFromFileOpt))
+CommandLineUtils.printUsageAndExit(opts.parser, "One of 
the reset scopes should be defined: --all-topics, --topic.");
+
+return Collections.emptyList();
+}
+}
+
+private List parseTopicPartitionsToReset(List 
topicArgs) throws ExecutionException, InterruptedException {
+List topicsWithPartitions = new ArrayList<>();
+List topics = new ArrayList<>();
+
+topicArgs.forEach(topicArg -> {
+if (topicArg.contains(":"))
+topicsWithPartitions.add(topicArg);
+else
+topics.add(topicArg);
+});
+
+List specifiedPartitions =
+
topicsWithPartitions.stream().flatMap(this::parseTopicsWithPartitions).collect(Collect

Re: [PR] KAFKA-17747: [5/N] Replace subscription metadata with metadata hash in stream group (wip) [kafka]

2025-05-26 Thread via GitHub


lucasbru commented on code in PR #19802:
URL: https://github.com/apache/kafka/pull/19802#discussion_r2106982543


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##
@@ -582,54 +592,44 @@ public Set currentWarmupTaskProcessIds(
 }
 
 /**
- * @return An immutable map of partition metadata for each topic that are 
inputs for this streams group.
+ * @return The metadata hash.
  */
-public Map partitionMetadata() {
-return Collections.unmodifiableMap(partitionMetadata);
+public long metadataHash() {
+return metadataHash.get();
 }
 
 /**
- * Updates the partition metadata. This replaces the previous one.
+ * Updates the metadata hash.
  *
- * @param partitionMetadata The new partition metadata.
+ * @param metadataHash The new metadata hash.
  */
-public void setPartitionMetadata(
-Map partitionMetadata
-) {
-this.partitionMetadata.clear();
-this.partitionMetadata.putAll(partitionMetadata);
-maybeUpdateConfiguredTopology();
-maybeUpdateGroupState();

Review Comment:
   Could you instead store the group metadata hash along with / inside 
ConfiguredTopology, and rebuild configuredTopology not only when it is empty, 
but also when the group metadata hash does not match the current group metadata 
hash?
   
   Also, that would mean that when we call `setTopology` or `setPartitionMetadata` 
inside `StreamsGroup`, we will not call `maybeUpdateConfiguredTopology`, since 
it will only be created in one place - from the `streamsGroupHeartbeat`.
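   
   A rough sketch of that shape (names assumed, not the actual StreamsGroup 
code):

```java
import java.util.Optional;

// Remember which metadata hash the topology was configured against and
// rebuild only on mismatch, from a single call site (the heartbeat path).
class ConfiguredTopologySketch {
    record ConfiguredTopology(long configuredForMetadataHash /* , topology state */) {}

    private Optional<ConfiguredTopology> configuredTopology = Optional.empty();

    ConfiguredTopology maybeConfigure(long currentMetadataHash) {
        if (configuredTopology.isEmpty()
                || configuredTopology.get().configuredForMetadataHash() != currentMetadataHash) {
            configuredTopology = Optional.of(new ConfiguredTopology(currentMetadataHash));
        }
        return configuredTopology.get();
    }
}
```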



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-19331) No error handling for leader not appeared in applyLocalFollowersDelta

2025-05-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-19331:
--
Summary: No error handling for leader not appeared in 
applyLocalFollowersDelta   (was: No error handling for leader unregistered in 
applyLocalFollowersDelta )

> No error handling for leader not appeared in applyLocalFollowersDelta 
> --
>
> Key: KAFKA-19331
> URL: https://issues.apache.org/jira/browse/KAFKA-19331
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Priority: Major
>  Labels: newbie
>
> In ReplicaManager#applyLocalFollowersDelta, when we prepare for fetching from 
> the leader, we'll check if the leader node info is in the metadata image. If 
> somehow it isn't included in the newImage, we'll log something like:
>  
> {code:java}
> [2025-05-26 15:25:58,124] TRACE [Broker id=4] Unable to start fetching 
> quickstart-events-0 with topic ID Some(Dn9K0BB8QWuj4PqcJD0nrA) from leader 
> Some(2) because it is not alive. (state.change.logger)
> [2025-05-26 15:25:58,124] INFO [Broker id=4] Started fetchers as part of 
> become-follower for 1 partitions (state.change.logger)
> {code}
>  
> It's confusing to users to see that it's unable to fetch, then starts fetching. And in 
> the end, it's not actually fetching... We should handle the error properly by 
> updating the `FailedPartition` and not updating the other, successful result 
> statuses.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Change `Streams group` to `streams group` [kafka]

2025-05-26 Thread via GitHub


lucasbru commented on code in PR #19813:
URL: https://github.com/apache/kafka/pull/19813#discussion_r2107119425


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -346,18 +346,18 @@ private void optimizeTopology(final Properties props) {
 (String) 
props.get(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG));
 }
 if 
(optimizationConfigs.contains(StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS)) {
-LOG.debug("Optimizing the Kafka Streams graph for ktable source 
nodes");
+LOG.debug("Optimizing the Kafka streams graph for ktable source 
nodes");
 reuseKTableSourceTopics();
 }
 if 
(optimizationConfigs.contains(StreamsConfig.MERGE_REPARTITION_TOPICS)) {
-LOG.debug("Optimizing the Kafka Streams graph for repartition 
nodes");
+LOG.debug("Optimizing the Kafka streams graph for repartition 
nodes");
 mergeRepartitionTopics();
 }
 if 
(optimizationConfigs.contains(StreamsConfig.SINGLE_STORE_SELF_JOIN)) {
-LOG.debug("Optimizing the Kafka Streams graph for self-joins");
+LOG.debug("Optimizing the Kafka streams graph for self-joins");
 rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
 }
-LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+LOG.debug("Optimizing the Kafka streams graph for null-key records");

Review Comment:
   I think even if we want to use "streams" in lower case, we should capitalize 
"Kafka Streams" IMHO.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -16021,7 +16021,7 @@ public void 
testStreamsGroupMemberRequestingShutdownApplication() {
 .setShutdownApplication(true)
 );
 
-String statusDetail = String.format("Streams group member %s 
encountered a fatal error and requested a shutdown for the entire 
application.", memberId1);
+String statusDetail = String.format("streams group member %s 
encountered a fatal error and requested a shutdown for the entire 
application.", memberId1);

Review Comment:
   we shouldn't replace it at the beginning of the line



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2010,7 +2010,7 @@ private CoordinatorResult stream
 new Status()
 
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
 .setStatusDetail(
-String.format("Streams group member %s encountered a fatal 
error and requested a shutdown for the entire application.",
+String.format("streams group member %s encountered a fatal 
error and requested a shutdown for the entire application.",

Review Comment:
   we shouldn't replace it at the beginning of the line



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -880,7 +880,7 @@ StreamsGroup getStreamsGroupOrThrow(
 Group group = groups.get(groupId);
 
 if (group == null) {
-throw new GroupIdNotFoundException(String.format("Streams group %s 
not found.", groupId));
+throw new GroupIdNotFoundException(String.format("streams group %s 
not found.", groupId));

Review Comment:
   we shouldn't replace it at the beginning of the line



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -236,7 +236,7 @@ public class GroupCoordinatorConfig {
 public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = 
"The maximum heartbeat interval for share group members.";
 
 ///
-/// Streams group configs
+/// streams group configs

Review Comment:
   we shouldn't replace it at the beginning of the line



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java:
##
@@ -466,7 +466,7 @@ public GroupCoordinatorConfig(AbstractConfig config) {
 require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
 String.format("%s must be less than %s",
 SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, 
SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
-// Streams group configs validation.
+// streams group configs validation.

Review Comment:
   we shouldn't replace it at the beginning of the line



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##
@@ -16105,7 +16105,7 @@ public void 
testStreamsGroupMemberRequestingShutdownApplicationUponLeaving() {
 .setShutdownApplication(true)
 );
 
-String statusDetail = String.format("Streams group member %s 
encountered a fatal error and requested a shutdown for the

Re: [PR] MINOR: Cleanup JMH-Benchmarks Module [kafka]

2025-05-26 Thread via GitHub


sjhajharia commented on PR #19791:
URL: https://github.com/apache/kafka/pull/19791#issuecomment-2909416974

   Hey @m1a2st 
   I have updated the PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107245093


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##
@@ -151,6 +152,8 @@ public String toLowerCaseString() {
  */
 private final TimelineHashMap 
resolvedRegularExpressions;
 
+private final AtomicBoolean addSubscriptionMetadataTombstoneRecord = new 
AtomicBoolean(false);

Review Comment:
   Thanks for the suggestion. I forgot to consider rollback case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107242601


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2220,6 +2227,11 @@ private 
CoordinatorResult
 int groupEpoch = group.groupEpoch();
 SubscriptionType subscriptionType = group.subscriptionType();
 
+if (group.addSubscriptionMetadataTombstoneRecord()) {
+
records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+group.setAddSubscriptionMetadataTombstoneRecord(false);

Review Comment:
   `updateSubscriptionMetadata` is a good place to add this record. We 
always call `updateSubscriptionMetadata` when the metadata image is expired. That 
means the group can add the record after bumping to 4.1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18904: kafka-configs.sh return resource doesn't exist message [3/N] [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 commented on code in PR #19808:
URL: https://github.com/apache/kafka/pull/19808#discussion_r2107254802


##
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##
@@ -342,6 +342,42 @@ object ConfigCommand extends Logging {
   }
 
   private def describeResourceConfig(adminClient: Admin, entityType: String, 
entityName: Option[String], describeAll: Boolean): Unit = {
+if (!describeAll) {
+  entityName.foreach { name =>
+entityType match {
+  case TopicType =>
+Topic.validate(name)
+if (!adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names.get.contains(name)) {
+  System.out.println(s"The $entityType '$name' doesn't exist and 
doesn't have dynamic config.")
+  return
+}
+  case BrokerType | BrokerLoggerConfigType =>
+if 
(adminClient.describeCluster.nodes.get.stream.anyMatch(_.idString == name)) {
+  // valid broker id
+} else if (name == BrokerDefaultEntityName) {
+  // default broker configs
+} else {
+  System.out.println(s"The $entityType '$name' doesn't exist and 
doesn't have dynamic config.")
+  return
+}
+  case ClientMetricsType =>
+if 
(adminClient.listConfigResources(java.util.Set.of(ConfigResource.Type.CLIENT_METRICS),
 new ListConfigResourcesOptions).all.get

Review Comment:
   I prefer to leave this as `java.util.xxx` because we already used this 
pattern like:
   
   
https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/core/src/main/scala/kafka/admin/ConfigCommand.scala#L311
   
   We can do some refactor for Scala code in core module after this PR. Or we 
can refactor it when migrating to Java eventually.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15283:[1/N] Client support for OffsetCommit with topic ID [kafka]

2025-05-26 Thread via GitHub


DL1231 commented on PR #19577:
URL: https://github.com/apache/kafka/pull/19577#issuecomment-2909614033

   @lianetm @dajac, PTAL when you get a chance.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19332) Fix flaky test : testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck

2025-05-26 Thread Shivsundar R (Jira)
Shivsundar R created KAFKA-19332:


 Summary: Fix flaky test : 
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck
 Key: KAFKA-19332
 URL: https://issues.apache.org/jira/browse/KAFKA-19332
 Project: Kafka
  Issue Type: Sub-task
Reporter: Shivsundar R


The test has been flaky in AK builds - 
[https://develocity.apache.org/scans/tests?search.names=CI%20workflow%2CGit%20repository&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=github%2Ctrunk&search.tasks=test&search.timeZoneId=Asia%2FCalcutta&search.values=CI%2Chttps:%2F%2Fgithub.com%2Fapache%2Fkafka&tests.container=org.apache.kafka.clients.consumer.ShareConsumerTest&tests.test=testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck()%5B2%5D]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-19333) Inconsistent behavior between `ConsumerMembershipManager` and `StreamsMembershipManager` on `onAllTasksLost` execution

2025-05-26 Thread Nick Guo (Jira)
Nick Guo created KAFKA-19333:


 Summary: Inconsistent behavior between `ConsumerMembershipManager` 
and `StreamsMembershipManager` on `onAllTasksLost` execution
 Key: KAFKA-19333
 URL: https://issues.apache.org/jira/browse/KAFKA-19333
 Project: Kafka
  Issue Type: Improvement
Reporter: Nick Guo
Assignee: Nick Guo


`ConsumerMembershipManager` does not create an event to run a callback if there 
is nothing to revoke, but `StreamsMembershipManager` does.

Related discussion and PR:

discussion: [https://github.com/apache/kafka/pull/18551/files#r2106243432]

pr: https://github.com/apache/kafka/pull/19779
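   
In sketch form, the consumer-side guard described above looks roughly like this 
(names are assumptions, not the actual membership-manager API):

```java
import java.util.Set;

// Only enqueue a rebalance-callback event when there is something to revoke;
// per this ticket, StreamsMembershipManager currently runs the callback even
// when the owned set is empty.
class CallbackGuardSketch {
    void onTasksLost(Set<String> ownedTasks, Runnable enqueueCallbackEvent) {
        if (!ownedTasks.isEmpty()) {
            enqueueCallbackEvent.run();
        }
    }
}
```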



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-19290: Exploit mapKey optimisation in protocol requests and responses (wip) [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 opened a new pull request, #19815:
URL: https://github.com/apache/kafka/pull/19815

   The mapKey optimisation can be used in some KIP-932 RPC schemas to improve 
efficiency of some key-based accesses.
   
   For ShareFetch, ShareAcknowledge, ShareGroupHeartbeat and 
ShareGroupDescribe, the v0 RPCs are already released and cannot be 
retrospectively optimized. This PR changes the following RPCs:
   * DeleteShareGroupOffsetsRequest
   * DeleteShareGroupStateRequest
   * DescribeShareGroupOffsetsRequest
   * InitializeShareGroupStateRequest
   * ReadShareGroupStateRequest
   * ReadShareGroupStateSummaryRequest
   * WriteShareGroupStateRequest
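   
   For context, mapKey lets the generated message code expose keyed lookups 
instead of linear scans. A generic illustration of the difference (this is not 
the generated Kafka code; field names are assumptions):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapKeySketch {
    record TopicData(String topicName /* , partitions, ... */) {}

    // Without a keyed collection: O(n) scan per lookup.
    static TopicData findLinear(List<TopicData> topics, String name) {
        for (TopicData t : topics) {
            if (t.topicName().equals(name)) {
                return t;
            }
        }
        return null;
    }

    // What mapKey enables, conceptually: O(1) key-based access.
    static Map<String, TopicData> indexByName(List<TopicData> topics) {
        Map<String, TopicData> byName = new HashMap<>();
        for (TopicData t : topics) {
            byName.put(t.topicName(), t);
        }
        return byName;
    }
}
```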
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19300: AsyncConsumer#unsubscribe always timeout due to GroupAuthorizationException [kafka]

2025-05-26 Thread via GitHub


Rancho-7 commented on code in PR #19779:
URL: https://github.com/apache/kafka/pull/19779#discussion_r2107277009


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##
@@ -443,7 +443,7 @@ public void transitionToFatal() {
 log.error("Member {} with epoch {} transitioned to fatal state", 
memberId, memberEpoch);
 notifyEpochChange(Optional.empty());
 
-if (previousState == MemberState.UNSUBSCRIBED) {
+if (previousState == MemberState.UNSUBSCRIBED && 
maybeCompleteLeaveInProgress()) {

Review Comment:
   Sounds good, I have opened https://issues.apache.org/jira/browse/KAFKA-19333 
to track this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-19332) Fix flaky test : testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck and testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck

2025-05-26 Thread Abhinav Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhinav Dixit updated KAFKA-19332:
--
Summary: Fix flaky test : 
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck and 
testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck  (was: Fix 
flaky test : 
testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck)

> Fix flaky test : 
> testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck and 
> testAlterReadCommittedToReadUncommittedIsolationLevelWithRejectAck
> ---
>
> Key: KAFKA-19332
> URL: https://issues.apache.org/jira/browse/KAFKA-19332
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Shivsundar R
>Priority: Major
>
> The test has been flaky in AK builds - 
> [https://develocity.apache.org/scans/tests?search.names=CI%20workflow%2CGit%20repository&search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=github%2Ctrunk&search.tasks=test&search.timeZoneId=Asia%2FCalcutta&search.values=CI%2Chttps:%2F%2Fgithub.com%2Fapache%2Fkafka&tests.container=org.apache.kafka.clients.consumer.ShareConsumerTest&tests.test=testAlterReadCommittedToReadUncommittedIsolationLevelWithReleaseAck()%5B2%5D]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-18117; KAFKA-18729: Use assigned topic IDs to avoid full metadata requests on broker-side regex [kafka]

2025-05-26 Thread via GitHub


lianetm opened a new pull request, #19814:
URL: https://github.com/apache/kafka/pull/19814

   This PR uses topic IDs received in assignment (under new protocol) to ensure 
that only these assigned topics are included in the consumer metadata requests 
performed when the user subscribes to a broker-side regex (RE2J).
   
   With the changes we also end up fixing another issue (KAFKA-18729), which 
aimed to avoid iterating the full set of assigned partitions when checking 
whether a topic should be retained from the metadata response when using RE2J.
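   
   A simplified sketch of the retention check this enables (types and names 
are assumptions, not the actual consumer internals):

```java
import java.util.Set;
import java.util.UUID;

// With broker-side regex (RE2J) the broker resolves the subscription, so the
// client can retain metadata topics via an O(1) membership test on assigned
// topic IDs instead of iterating every assigned partition.
class Re2jMetadataRetainSketch {
    private final Set<UUID> assignedTopicIds;  // stand-in for Kafka's Uuid

    Re2jMetadataRetainSketch(Set<UUID> assignedTopicIds) {
        this.assignedTopicIds = assignedTopicIds;
    }

    boolean shouldRetainFromMetadataResponse(UUID topicId) {
        return assignedTopicIds.contains(topicId);
    }
}
```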
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


FrankYang0529 commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107213749


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
  * @return The hash of the group.
  */
 static long computeGroupHash(Map topicHashes) {
-if (topicHashes.isEmpty()) {
+// Sort entries by topic name
+List> sortedEntries = new ArrayList<>();
+for (Map.Entry entry : topicHashes.entrySet()) {
+// Filter out entries with a hash value of 0, which indicates no 
topic
+if (entry.getValue() != 0) {

Review Comment:
   We use `computeSubscribedTopicNames` to get `subscribedTopicNames` and use 
the result to calculate the topic hashes which the group wants to subscribe to. 
However, `computeSubscribedTopicNames` doesn't check whether a topic actually 
exists. If a group subscribes to a non-existent topic, the topic is still in 
`subscribedTopicNames`, and `computeMetadataHash` uses the non-existent topic's 
hash as part of the metadata hash.
   
   A sample case is `testSubscriptionMetadataRefreshedAgainAfterWriteFailure`.  
The group subscribes to `foo` and `bar`, but `bar` topic is not in metadata 
image.
   
   
https://github.com/apache/kafka/blob/48a52701b9cd45c4854f910990a85be7d73e22f5/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java#L496-L507
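   
   To make the filtering concrete, a self-contained sketch of the scheme in 
the diff above (the combining step is a stand-in, not the real implementation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class GroupHashSketch {
    static long computeGroupHash(Map<String, Long> topicHashes) {
        // Hash 0 marks a nonexistent topic; filter those out first.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>();
        for (Map.Entry<String, Long> e : topicHashes.entrySet()) {
            if (e.getValue() != 0) {
                sorted.add(e);
            }
        }
        // Sort by topic name so the result is stable across brokers.
        sorted.sort(Map.Entry.comparingByKey());
        long hash = 0;
        for (Map.Entry<String, Long> e : sorted) {
            hash = 31 * hash + e.getValue();  // stand-in combiner
        }
        return hash;
    }
}
```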



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17747: [4/N] Replace subscription metadata with metadata hash in consumer group [kafka]

2025-05-26 Thread via GitHub


dajac commented on code in PR #19761:
URL: https://github.com/apache/kafka/pull/19761#discussion_r2107327586


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult 
updateSubscriptionMetadata(
 numMembers
 );
 
-if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+if (groupMetadataHash != group.metadataHash()) {
 if (log.isDebugEnabled()) {
-log.debug("[GroupId {}] Computed new subscription metadata: 
{}.",
-groupId, subscriptionMetadata);
+log.debug("[GroupId {}] Computed new metadata hash: {}.",
+groupId, groupMetadataHash);
 }
 bumpGroupEpoch = true;
-records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
 }
 
 if (bumpGroupEpoch) {
 groupEpoch += 1;
-records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
-log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 
groupMetadataHash));
+log.info("[GroupId {}] Bumped group epoch to {} with metadata hash 
{}.", groupId, groupEpoch, groupMetadataHash);
 metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
 }
 
 group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
 
+if (group.addSubscriptionMetadataTombstoneRecord()) {

Review Comment:
   nit: I wonder whether we should call it `hasSubscriptionMetadataRecord()`. 
What do you think? I also suggest to add a comment explaining this block.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -490,6 +490,11 @@ GroupMetadataManager build() {
  */
 private MetadataImage metadataImage;
 
+/**
+ * The topic hash value by topic name.
+ */

Review Comment:
   It would be great if we could expand the comment to explain how we maintain 
this cache.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Utils.java:
##
@@ -349,12 +349,19 @@ static void throwIfRegularExpressionIsInvalid(
  * @return The hash of the group.
  */
 static long computeGroupHash(Map topicHashes) {
-if (topicHashes.isEmpty()) {
+// Sort entries by topic name
+List> sortedEntries = new ArrayList<>();
+for (Map.Entry entry : topicHashes.entrySet()) {
+// Filter out entries with a hash value of 0, which indicates no 
topic
+if (entry.getValue() != 0) {

Review Comment:
   Sure, I understand that we may have nonexistent topics here. I was more 
trying to understand whether having those zeros in the final hash was an issue. 
I suppose that you're saying that it is an issue.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3624,24 +3629,28 @@ private UpdateSubscriptionMetadataResult 
updateSubscriptionMetadata(
 numMembers
 );
 
-if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
+if (groupMetadataHash != group.metadataHash()) {
 if (log.isDebugEnabled()) {
-log.debug("[GroupId {}] Computed new subscription metadata: 
{}.",
-groupId, subscriptionMetadata);
+log.debug("[GroupId {}] Computed new metadata hash: {}.",
+groupId, groupMetadataHash);
 }
 bumpGroupEpoch = true;
-records.add(newConsumerGroupSubscriptionMetadataRecord(groupId, 
subscriptionMetadata));
 }
 
 if (bumpGroupEpoch) {
 groupEpoch += 1;
-records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
-log.info("[GroupId {}] Bumped group epoch to {}.", groupId, 
groupEpoch);
+records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 
groupMetadataHash));
+log.info("[GroupId {}] Bumped group epoch to {} with metadata hash 
{}.", groupId, groupEpoch, groupMetadataHash);
 metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
 }
 
 group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
 
+if (group.addSubscriptionMetadataTombstoneRecord()) {
+
records.add(newConsumerGroupSubscriptionMetadataTombstoneRecord(groupId));
+group.setAddSubscriptionMetadataTombstoneRecord(false);

Review Comment:
   We should remove this as it will be updated when the record is replayed.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/ModernGroup.java:
##
@@ -398,6 +423,21 @@ public Map 
computeSu

Re: [PR] KAFKA-2526: command line --producer-property wins [kafka]

2025-05-26 Thread via GitHub


jkt628 commented on PR #16492:
URL: https://github.com/apache/kafka/pull/16492#issuecomment-2909788972

   note `test for client.id` moved to 
https://github.com/apache/kafka/blob/trunk/tools/src/test/java/org/apache/kafka/tools/ConsoleProducerTest.java


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


