Re: [PR] MINOR: remove transform and through from repartition description [kafka]

2025-04-07 Thread via GitHub


mjsax commented on code in PR #19291:
URL: https://github.com/apache/kafka/pull/19291#discussion_r2032232490


##
docs/streams/developer-guide/dsl-api.html:
##
@@ -764,10 +764,10 @@ Manually trigger repartitioning 
of the stream with desired number of partitions. (details)
 
-repartition() is similar to through() however Kafka Streams will manage the topic 
for you.
+Kafka Streams will manage the topic for 
repartition().
 Generated topic is treated as internal topic, as a 
result data will be purged automatically as any other internal repartition 
topic.
 In addition, you can specify the desired number of 
partitions, which allows to easily scale in/out downstream sub-topologies.
-repartition() operation always triggers 
repartitioning of the stream, as a result it can be used with embedded 
Processor API methods (like transform() 
et al.) that do not trigger auto repartitioning when key changing operation is 
performed beforehand.

Review Comment:
   Instead of removing the reference to `transform()`, we should update it to 
`process()`, which replaces `transform()`.
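
For context, a minimal Kafka Streams sketch of the behaviour this doc paragraph describes, using the newer `process()` API; the topic name, partition count, and the inline processor below are made up for illustration:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

public class RepartitionSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> input = builder.stream("input-topic"); // hypothetical topic

        input.selectKey((key, value) -> value)                      // key-changing operation
             .repartition(Repartitioned.numberOfPartitions(10))     // managed internal repartition topic
             .process(() -> new ContextualProcessor<String, String, Void, Void>() {
                 @Override
                 public void process(final Record<String, String> record) {
                     // Processor API logic; process() itself does not trigger
                     // auto-repartitioning, which is why the explicit
                     // repartition() above matters after a key change.
                 }
             });

        System.out.println(builder.build().describe());
    }
}
```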






[jira] [Assigned] (KAFKA-19102) Enhance the docs of group.coordinator.append.linger.ms

2025-04-07 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-19102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

黃竣陽 reassigned KAFKA-19102:
---

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> Enhance the docs of group.coordinator.append.linger.ms
> --
>
> Key: KAFKA-19102
> URL: https://issues.apache.org/jira/browse/KAFKA-19102
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
>
> the docs of `group.coordinator.append.linger.ms` should remind users that 
> increasing the value also increases the latency of responses to 
> coordinator-related requests.





Re: [PR] KAFKA-14485: Move LogCleaner to storage module [kafka]

2025-04-07 Thread via GitHub


wernerdv commented on code in PR #19387:
URL: https://github.com/apache/kafka/pull/19387#discussion_r2032487977


##
build.gradle:
##
@@ -3737,6 +3737,7 @@ project(':connect:mirror') {
 testImplementation project(':core')
 testImplementation project(':test-common:test-common-runtime')
 testImplementation project(':server')
+testImplementation project(':server-common')

Review Comment:
   Yes, you're right.






[jira] [Updated] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2025-04-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16394:

Fix Version/s: (was: 3.9.1)

> ForeignKey LEFT join propagates null value on foreignKey change
> ---
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> +*Scenario1: change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2") 
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>  
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>  
> A null is propagated to the join result when the foreign key changes
>  
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", null) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null) {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, null) {code}
> An additional null is propagated to the join result.
>  
> This bug doesn't exist on versions 3.6.0 and below.
>  
> I believe the issue comes from the line 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
> where we propagate the deletion in the two scenarios above
>  
> Attaching the topology I used.
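
For reference, a minimal Java sketch of the topology described above (not the attached reproducer); the LeftRecord type below is a simplified stand-in for the Scala case class, and its serde is omitted for brevity:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;

public class ForeignKeyLeftJoinSketch {
    // simplified stand-in for the Scala case class in the report
    record LeftRecord(String foreignKey, String name) { }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KTable<String, LeftRecord> left =
            builder.table("left-topic");                 // LeftRecord serde omitted
        KTable<String, String> right =
            builder.table("right-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // LEFT foreign-key join: extract the foreign key from the left value
        // and use the right-hand value as the join result.
        left.leftJoin(right, LeftRecord::foreignKey, (leftValue, rightValue) -> rightValue)
            .toStream()
            .to("output-topic");

        System.out.println(builder.build().describe());
    }
}
{code}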





Re: [PR] KAFKA-18962: Fix onBatchRestored call in GlobalStateManagerImpl [kafka]

2025-04-07 Thread via GitHub


mjsax commented on code in PR #19188:
URL: https://github.com/apache/kafka/pull/19188#discussion_r2032155035


##
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##
@@ -294,13 +306,17 @@ public synchronized ConsumerRecords poll(final 
Duration timeout) {
 rec.offset() + 1, rec.leaderEpoch(), 
leaderAndEpoch);
 subscriptions.position(entry.getKey(), newPosition);
 nextOffsetAndMetadata.put(entry.getKey(), new 
OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
+numPollRecords++;
+recIterator.remove();
 }
 }
-toClear.add(entry.getKey());
+
+if (entry.getValue().isEmpty()) {

Review Comment:
   ```suggestion
   if (recIterator.isEmpty()) {
   ```



##
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java:
##
@@ -275,14 +279,22 @@ public synchronized ConsumerRecords poll(final 
Duration timeout) {
 // update the consumed offset
 final Map>> results = new 
HashMap<>();
 final Map nextOffsetAndMetadata = 
new HashMap<>();
-final List toClear = new ArrayList<>();
+long numPollRecords = 0L;
+
+final Iterator>>> 
partitionsIter = this.records.entrySet().iterator();
+while (partitionsIter.hasNext() && numPollRecords < 
this.maxPollRecords) {
+Map.Entry>> entry = 
partitionsIter.next();
 
-for (Map.Entry>> entry : 
this.records.entrySet()) {
 if (!subscriptions.isPaused(entry.getKey())) {
-final List> recs = entry.getValue();
-for (final ConsumerRecord rec : recs) {
+final ListIterator> recIterator = 
entry.getValue().listIterator();

Review Comment:
   Any particular reason why we use `ListIterator`, and not just `Iterator`?
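
For illustration, a standalone sketch (not the MockConsumer code itself) of the forward-only iterate-and-remove pattern used in the diff above; a plain `Iterator` already supports it, since only `hasNext()`, `next()`, and `remove()` are needed:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class DrainSketch {
    // Remove and count up to maxPollRecords elements from the head of the list.
    static <T> int drainUpTo(final List<T> records, final int maxPollRecords) {
        int polled = 0;
        final Iterator<T> it = records.iterator();
        while (it.hasNext() && polled < maxPollRecords) {
            it.next();     // in the real code the record would be handed to the caller here
            it.remove();   // drop it so it is not returned by the next poll
            polled++;
        }
        return polled;
    }

    public static void main(String[] args) {
        final List<Integer> records = new ArrayList<>(List.of(1, 2, 3, 4, 5));
        System.out.println(drainUpTo(records, 3) + " drained, remaining: " + records);
        // prints: 3 drained, remaining: [4, 5]
    }
}
```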






[jira] [Updated] (KAFKA-18962) StateRestoreListener onBatchRestored method is called with the totalRestored on GlobalStateStore reprocess

2025-04-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18962?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-18962:

Affects Version/s: 3.8.0
   (was: 3.9.0)

> StateRestoreListener onBatchRestored method is called with the totalRestored 
> on GlobalStateStore reprocess
> --
>
> Key: KAFKA-18962
> URL: https://issues.apache.org/jira/browse/KAFKA-18962
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: Florian Hussonnois
>Assignee: Florian Hussonnois
>Priority: Minor
> Fix For: 4.1.0, 3.9.1, 4.0.1
>
>
> The  StateRestoreListener#onBatchRestored method is called with
> the total number of records restored instead of  the total number of records 
> restored in this batch (i.e, numRestored) for GlobalStateStore





[PR] MINOR: Minor tidying in GroupMetadataManager [kafka]

2025-04-07 Thread via GitHub


AndrewJSchofield opened a new pull request, #19411:
URL: https://github.com/apache/kafka/pull/19411

   Some trivial tidying up in GroupMetadataManager.





Re: [PR] KAFKA-19100: Use ProcessRole.toString instead of String in AclApis [kafka]

2025-04-07 Thread via GitHub


gongxuanzhang commented on code in PR #19406:
URL: https://github.com/apache/kafka/pull/19406#discussion_r2032322050


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -84,7 +85,7 @@ class ControllerApis(
   val configHelper = new ConfigHelper(metadataCache, config, metadataCache)
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   val runtimeLoggerManager = new RuntimeLoggerManager(config.nodeId, 
logger.underlying)
-  private val aclApis = new AclApis(authHelper, authorizer, requestHelper, 
"controller", config)
+  private val aclApis = new AclApis(authHelper, authorizer, requestHelper, 
ProcessRole.ControllerRole.toString, config)

Review Comment:
   nice comment






Re: [PR] MINOR: Modify KafkaEventQueue VoidEvent to as singleton and use more proper function interface [kafka]

2025-04-07 Thread via GitHub


gongxuanzhang commented on PR #19356:
URL: https://github.com/apache/kafka/pull/19356#issuecomment-2785145813

   @chia7712  WDYT 





Re: [PR] KAFKA-7061: KIP-280 Enhanced log compaction [kafka]

2025-04-07 Thread via GitHub


senthilm-ms commented on PR #8103:
URL: https://github.com/apache/kafka/pull/8103#issuecomment-2785163121

   Got it, and I see the interest from many in the community...
   
   I will spend some time by the end of April & early May and send the updated PR.
   
   Thanks,
   Senthil
   





[jira] [Updated] (KAFKA-16996) The leastLoadedNode() function in kafka-client may choose a faulty node during the consumer thread starting and meanwhile one of the KAFKA server node is dead.

2025-04-07 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16996:
---
Component/s: consumer

> The leastLoadedNode() function in kafka-client may choose a faulty node 
> during the consumer thread starting and meanwhile one of the KAFKA server 
> node is dead.
> ---
>
> Key: KAFKA-16996
> URL: https://issues.apache.org/jira/browse/KAFKA-16996
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.0.1, 2.3.0, 3.6.0
>Reporter: Goufu
>Priority: Blocker
>
> The leastLoadedNode() function has a bug during consumer startup. The function 
> sendMetadataRequest() called by getTopicMetadataRequest() uses a random node, 
> which may be faulty, since no node's state has been recorded in the client 
> thread yet. It happened in my production environment while my consumer thread 
> was restarting and one of the Kafka server nodes was dead; the client then 
> failed to start up. 
> I'm using kafka-client-2.0.1.jar. I have checked the source code of 
> higher versions and the issue still exists.





Re: [PR] KAFKA-19101 Remove ControllerMutationQuotaManager#throttleTimeMs unused parameter [kafka]

2025-04-07 Thread via GitHub


FrankYang0529 commented on code in PR #19410:
URL: https://github.com/apache/kafka/pull/19410#discussion_r2032444160


##
core/src/main/scala/kafka/server/ControllerMutationQuotaManager.scala:
##
@@ -63,7 +63,7 @@ abstract class AbstractControllerMutationQuota(private val 
time: Time) extends C
   protected var lastRecordedTimeMs = 0L

Review Comment:
   It looks like no other class uses this variable directly. Can we 
make it private? Thanks.






Re: [PR] KAFKA-19100: Use ProcessRole.toString instead of String in AclApis [kafka]

2025-04-07 Thread via GitHub


mimaison commented on code in PR #19406:
URL: https://github.com/apache/kafka/pull/19406#discussion_r2031923532


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -84,7 +85,7 @@ class ControllerApis(
   val configHelper = new ConfigHelper(metadataCache, config, metadataCache)
   val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
   val runtimeLoggerManager = new RuntimeLoggerManager(config.nodeId, 
logger.underlying)
-  private val aclApis = new AclApis(authHelper, authorizer, requestHelper, 
"controller", config)
+  private val aclApis = new AclApis(authHelper, authorizer, requestHelper, 
ProcessRole.ControllerRole.toString, config)

Review Comment:
   Could we change the `AclApis` constructor to accept a `ProcessRole` instance 
instead of a `String`?
   It seems this field is only used in a call to `String.format()` that would 
automatically call `toString()` for us.
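
As a small aside, a self-contained sketch of the `String.format()` point: the `%s` conversion calls `toString()` on its argument, so passing the enum directly is enough. The enum below is a simplified stand-in for illustration, not Kafka's actual `ProcessRole`:

```java
public class FormatSketch {
    // simplified stand-in enum, not the real Kafka ProcessRole
    enum ProcessRole {
        BROKER("broker"),
        CONTROLLER("controller");

        private final String roleName;

        ProcessRole(final String roleName) {
            this.roleName = roleName;
        }

        @Override
        public String toString() {
            return roleName;
        }
    }

    public static void main(String[] args) {
        // %s invokes toString(), so no explicit toString() call is needed at the call site
        System.out.println(String.format("aclApis created for the %s role", ProcessRole.CONTROLLER));
        // prints: aclApis created for the controller role
    }
}
```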






[jira] [Created] (KAFKA-19104) Support metadata version downgrade in Kafka

2025-04-07 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-19104:


 Summary: Support metadata version downgrade in Kafka
 Key: KAFKA-19104
 URL: https://issues.apache.org/jira/browse/KAFKA-19104
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe
Assignee: Colin McCabe








Re: [PR] KAFKA-19047: Broker registrations are slow if previously fenced or shutdown [kafka]

2025-04-07 Thread via GitHub


jsancio commented on code in PR #19296:
URL: https://github.com/apache/kafka/pull/19296#discussion_r2031970291


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -353,7 +353,7 @@ public ControllerResult 
registerBroker(
 if (existing != null) {
 prevIncarnationId = existing.incarnationId();
 storedBrokerEpoch = existing.epoch();
-if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
+if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) 
&& !existing.fenced() && !existing.inControlledShutdown()) {
 if (!request.incarnationId().equals(prevIncarnationId)) {
 throw new DuplicateBrokerRegistrationException("Another 
broker is " +
 "registered with that broker id.");

Review Comment:
   @ahuang98 @cmccabe Note that the invalid-session message and the delayed 
reregistration happen even after a controlled shutdown. It is okay to have 
delays in reregistration after an uncontrolled shutdown, but not after a controlled 
shutdown. Otherwise, this is a regression from the ZK implementation and 
behavior.






Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-04-07 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031767657


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception 
{
 verifyShareGroupStateTopicRecordsProduced();
 }
 
+@ClusterTest
+public void testReadCommittedIsolationLevel() {
+alterShareAutoOffsetReset("group1", "earliest");
+alterShareIsolationLevel("group1", "read_committed");
+try (Producer transactionalProducer = 
createProducer("T1");
+ ShareConsumer shareConsumer = 
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+shareConsumer.subscribe(Set.of(tp.topic()));
+ConsumerRecords records = 
waitedPoll(shareConsumer, 2500L, 8);
+// 5th and 10th message transaction was aborted, hence they won't 
be included in the fetched records.
+assertEquals(8, records.count());
+int messageCounter = 1;
+for (ConsumerRecord record : records) {
+assertEquals(tp.topic(), record.topic());
+assertEquals(tp.partition(), record.partition());
+if (messageCounter % 5 == 0)
+messageCounter++;
+assertEquals("Message " + messageCounter, new 
String(record.value()));
+messageCounter++;
+}
+}
+verifyShareGroupStateTopicRecordsProduced();
+}
+
+@ClusterTest
+public void testReadUncommittedIsolationLevel() {
+alterShareAutoOffsetReset("group1", "earliest");
+alterShareIsolationLevel("group1", "read_uncommitted");
+try (Producer transactionalProducer = 
createProducer("T1");
+ ShareConsumer shareConsumer = 
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+shareConsumer.subscribe(Set.of(tp.topic()));
+ConsumerRecords records = 
waitedPoll(shareConsumer, 2500L, 10);
+// Even though 5th and 10th message transaction was aborted, they 
will be included in the fetched records since IsolationLevel is 
READ_UNCOMMITTED.
+assertEquals(10, records.count());
+int messageCounter = 1;
+for (ConsumerRecord record : records) {
+assertEquals(tp.topic(), record.topic());
+assertEquals(tp.partition(), record.partition());
+assertEquals("Message " + messageCounter, new 
String(record.value()));
+messageCounter++;
+}
+}
+verifyShareGroupStateTopicRecordsProduced();
+}
+
+@ClusterTest
+public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+alterShareAutoOffsetReset("group1", "earliest");
+alterShareIsolationLevel("group1", "read_uncommitted");
+try (Producer transactionalProducer = 
createProducer("T1");
+ ShareConsumer shareConsumer = 
createShareConsumer("group1")) {
+shareConsumer.subscribe(Set.of(tp.topic()));
+transactionalProducer.initTransactions();
+try {
+// First transaction is committed.
+produceCommittedTransaction(transactionalProducer, "Message 
1");
+
+ConsumerRecords records = 
waitedPoll(shareConsumer, 2500L, 1);

Review Comment:
   I don't think this will force the test to wait for 2.5 sec. That is the 
maximum time to block: the method returns immediately if records are 
available; otherwise, it waits up to the passed timeout.
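
To illustrate the semantics described above, a rough sketch of a poll-until helper (not the actual `waitedPoll` in the test class): the timeout is only an upper bound, and polling stops as soon as the expected number of records has arrived.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumer;

final class PollUntilSketch {
    static <K, V> List<ConsumerRecord<K, V>> pollUntil(final ShareConsumer<K, V> consumer,
                                                       final long timeoutMs,
                                                       final int expectedCount) {
        final List<ConsumerRecord<K, V>> collected = new ArrayList<>();
        final long deadline = System.currentTimeMillis() + timeoutMs;
        // Keep polling until enough records have arrived or the deadline passes;
        // each poll() itself returns as soon as any records are available.
        while (collected.size() < expectedCount && System.currentTimeMillis() < deadline) {
            final ConsumerRecords<K, V> batch = consumer.poll(Duration.ofMillis(100));
            batch.forEach(collected::add);
        }
        return collected;
    }
}
```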






Re: [PR] MINOR: use enum map for error counts map [kafka]

2025-04-07 Thread via GitHub


mjsax commented on PR #19314:
URL: https://github.com/apache/kafka/pull/19314#issuecomment-2784956048

   This one is still small enough. -- No reason to split it. -- There is no 
fixed rule. It always depends on the ticket, but I usually try to keep 
it below 500 LOC -- if larger, there should be a good reason for not splitting 
it.





Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-04-07 Thread via GitHub


junrao commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031937222


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java:
##
@@ -2090,6 +2091,307 @@ public void testComplexShareConsumer() throws Exception 
{
 verifyShareGroupStateTopicRecordsProduced();
 }
 
+@ClusterTest
+public void testReadCommittedIsolationLevel() {
+alterShareAutoOffsetReset("group1", "earliest");
+alterShareIsolationLevel("group1", "read_committed");
+try (Producer transactionalProducer = 
createProducer("T1");
+ ShareConsumer shareConsumer = 
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+shareConsumer.subscribe(Set.of(tp.topic()));
+ConsumerRecords records = 
waitedPoll(shareConsumer, 2500L, 8);
+// 5th and 10th message transaction was aborted, hence they won't 
be included in the fetched records.
+assertEquals(8, records.count());
+int messageCounter = 1;
+for (ConsumerRecord record : records) {
+assertEquals(tp.topic(), record.topic());
+assertEquals(tp.partition(), record.partition());
+if (messageCounter % 5 == 0)
+messageCounter++;
+assertEquals("Message " + messageCounter, new 
String(record.value()));
+messageCounter++;
+}
+}
+verifyShareGroupStateTopicRecordsProduced();
+}
+
+@ClusterTest
+public void testReadUncommittedIsolationLevel() {
+alterShareAutoOffsetReset("group1", "earliest");
+alterShareIsolationLevel("group1", "read_uncommitted");
+try (Producer transactionalProducer = 
createProducer("T1");
+ ShareConsumer shareConsumer = 
createShareConsumer("group1")) {
+
produceCommittedAndAbortedTransactionsInInterval(transactionalProducer, 10, 5);
+shareConsumer.subscribe(Set.of(tp.topic()));
+ConsumerRecords records = 
waitedPoll(shareConsumer, 2500L, 10);
+// Even though 5th and 10th message transaction was aborted, they 
will be included in the fetched records since IsolationLevel is 
READ_UNCOMMITTED.
+assertEquals(10, records.count());
+int messageCounter = 1;
+for (ConsumerRecord record : records) {
+assertEquals(tp.topic(), record.topic());
+assertEquals(tp.partition(), record.partition());
+assertEquals("Message " + messageCounter, new 
String(record.value()));
+messageCounter++;
+}
+}
+verifyShareGroupStateTopicRecordsProduced();
+}
+
+@ClusterTest
+public void testAlterReadUncommittedToReadCommittedIsolationLevel() {
+alterShareAutoOffsetReset("group1", "earliest");
+alterShareIsolationLevel("group1", "read_uncommitted");
+try (Producer transactionalProducer = 
createProducer("T1");
+ ShareConsumer shareConsumer = 
createShareConsumer("group1")) {
+shareConsumer.subscribe(Set.of(tp.topic()));
+transactionalProducer.initTransactions();
+try {
+// First transaction is committed.
+produceCommittedTransaction(transactionalProducer, "Message 
1");
+
+ConsumerRecords records = 
waitedPoll(shareConsumer, 2500L, 1);
+assertEquals(1, records.count());
+ConsumerRecord record = 
records.iterator().next();
+assertEquals("Message 1", new String(record.value()));
+assertEquals(tp.topic(), record.topic());
+assertEquals(tp.partition(), record.partition());
+records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+shareConsumer.commitSync();
+
+// Second transaction is aborted.
+produceAbortedTransaction(transactionalProducer, "Message 2");
+
+records = waitedPoll(shareConsumer, 2500L, 1);
+assertEquals(1, records.count());
+record = records.iterator().next();
+assertEquals("Message 2", new String(record.value()));
+records.forEach(consumedRecord -> 
shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));
+shareConsumer.commitSync();
+
+// Third transaction is committed.
+produceCommittedTransaction(transactionalProducer, "Message 
3");
+// Fourth transaction is aborted.
+produceAbortedTransaction(transactionalProducer, "Message 4");
+
+records = waitedPoll(shareConsumer, 2500L, 2);
+// Message 3 and Message 4 would be returned by this poll.
+   

Re: [PR] KAFKA-7061: KIP-280 Enhanced log compaction [kafka]

2025-04-07 Thread via GitHub


mjsax commented on PR #8103:
URL: https://github.com/apache/kafka/pull/8103#issuecomment-2785005543

   Just a question of review capacity, I assume -- I think it would still be 
valuable to complete this, but somebody must have time to do the work, plus 
some committer agreeing to help with reviewing...





Re: [PR] KAFKA-18874: Fix KRaft controller registration retry on timeout [kafka]

2025-04-07 Thread via GitHub


github-actions[bot] commented on PR #19321:
URL: https://github.com/apache/kafka/pull/19321#issuecomment-2785121287

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.





Re: [PR] Change Controller Mutation Quota Implementation [kafka]

2025-04-07 Thread via GitHub


github-actions[bot] commented on PR #19318:
URL: https://github.com/apache/kafka/pull/19318#issuecomment-2785121312

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.





Re: [PR] MINOR: Add documentation about KIP-405 remote reads serving just one partition per FetchRequest [kafka]

2025-04-07 Thread via GitHub


showuon commented on code in PR #19336:
URL: https://github.com/apache/kafka/pull/19336#discussion_r2030785098


##
docs/ops.html:
##
@@ -4026,13 +4026,14 @@ Broker 
Configurations
 
-By default, Kafka server will not enable tiered storage feature. 
remote.log.storage.system.enable
+By default, the Kafka server will not enable the tiered storage feature. 
remote.log.storage.system.enable
   is the property to control whether to enable tiered storage functionality in 
a broker or not. Setting it to "true" enables this feature.
 
 
 RemoteStorageManager is an interface to provide the lifecycle 
of remote log segments and indexes. Kafka server
-  doesn't provide out-of-the-box implementation of RemoteStorageManager. 
Configuring remote.log.storage.manager.class.name
-  and remote.log.storage.manager.class.path to specify the 
implementation of RemoteStorageManager.
+  doesn't provide out-of-the-box implementation of RemoteStorageManager. Users 
must configure remote.log.storage.manager.class.name
+  and remote.log.storage.manager.class.path to specify the 
implementation of RemoteStorageManager. One Apache 2.0 licensed implementation 
supporting all
+  three major hyperscalers' object stores (AWS, GCP, Azure) is https://github.com/Aiven-Open/tiered-storage-for-apache-kafka";>available 
here.

Review Comment:
   @stanislavkozlovski , I think we should remove anything mentioning the 3rd 
party project in the doc.
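
For example, a minimal broker configuration sketch of what the paragraph above describes; the manager class name and plugin path below are placeholders, not a recommendation of any particular implementation:

```
remote.log.storage.system.enable=true
remote.log.storage.manager.class.name=com.example.MyRemoteStorageManager
remote.log.storage.manager.class.path=/opt/kafka/plugins/remote-storage/*
```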






Re: [PR] KAFKA-18888: Add KIP-877 support to Authorizer [kafka]

2025-04-07 Thread via GitHub


mimaison commented on code in PR #19050:
URL: https://github.com/apache/kafka/pull/19050#discussion_r2030836809


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -250,7 +251,7 @@ public GroupCoordinatorShard build() {
 throw new IllegalArgumentException("TopicPartition must be 
set.");
 if (groupConfigManager == null)
 throw new IllegalArgumentException("GroupConfigManager must be 
set.");
-if (authorizer == null)
+if (authorizerPlugin == null)

Review Comment:
   It's fine to pass `Optional.empty()` down the call stack.






Re: [PR] KAFKA-19001: Use streams group-level configurations in heartbeat [kafka]

2025-04-07 Thread via GitHub


lucasbru merged PR #19219:
URL: https://github.com/apache/kafka/pull/19219





Re: [PR] KAFKA-18888: Add KIP-877 support to Authorizer [kafka]

2025-04-07 Thread via GitHub


mimaison commented on code in PR #19050:
URL: https://github.com/apache/kafka/pull/19050#discussion_r2030840916


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java:
##
@@ -505,8 +506,8 @@ public Builder 
withShareGroupAssignor(ShareGroupPartitionAssignor shareGroupAssi
 return this;
 }
 
-public Builder withAuthorizer(Authorizer authorizer) {
-this.authorizer = Optional.of(authorizer);
+public Builder withAuthorizerPlugin(Plugin 
authorizerPlugin) {
+this.authorizerPlugin = Optional.of(authorizerPlugin);

Review Comment:
   As far as I can tell the other fields are also always set, so I'll leave the 
`null` check for consistency.






Re: [PR] KAFKA-18845: Remove flaky tag on QuorumControllerTest#testUncleanShutdownBrokerElrEnabled [kafka]

2025-04-07 Thread via GitHub


FrankYang0529 commented on PR #19403:
URL: https://github.com/apache/kafka/pull/19403#issuecomment-2783002149

   ![Screenshot 2025-04-07 at 7 28 55 
PM](https://github.com/user-attachments/assets/e2c17ecf-f6da-4f5d-bc3c-b001cf0c2550)
   





Re: [PR] KAFKA-18888: Add KIP-877 support to Authorizer [kafka]

2025-04-07 Thread via GitHub


mimaison commented on code in PR #19050:
URL: https://github.com/apache/kafka/pull/19050#discussion_r2030844573


##
server/src/test/java/org/apache/kafka/server/MonitorablePluginsIntegrationTest.java:
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
+import org.apache.kafka.common.replica.RackAwareReplicaSelector;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static 
org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.server.config.ReplicationConfigs.REPLICA_SELECTOR_CLASS_CONFIG;
+import static 
org.apache.kafka.server.config.ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MonitorablePluginsIntegrationTest {
+
+private static int controllerId(Type type) {
+return type == Type.KRAFT ? 3000 : 0;
+}
+
+private static Map expectedTags(String config, String 
clazz) {
+return expectedTags(config, clazz, Map.of());
+}
+
+private static Map expectedTags(String config, String 
clazz, Map extraTags) {
+Map tags = new LinkedHashMap<>();
+tags.put("config", config);
+tags.put("class", clazz);
+tags.putAll(extraTags);
+return tags;
+}
+
+@ClusterTest(
+types = {Type.KRAFT, Type.CO_KRAFT},
+serverProperties = {
+@ClusterConfigProperty(key = 
StandardAuthorizer.SUPER_USERS_CONFIG, value = "User:ANONYMOUS"),
+@ClusterConfigProperty(key = AUTHORIZER_CLASS_NAME_CONFIG, value = 
"org.apache.kafka.metadata.authorizer.StandardAuthorizer"),
+@ClusterConfigProperty(key = REPLICA_SELECTOR_CLASS_CONFIG, value 
= 
"org.apache.kafka.server.MonitorablePluginsIntegrationTest$MonitorableReplicaSelector")
+}
+)
+public void testMonitorableServerPlugins(ClusterInstance clusterInstance) 
throws Exception {
+try (Admin admin = Admin.create(Map.of(BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers( {
+admin.describeCluster().clusterId().get();
+}

Review Comment:
   Right, we don't need it now; I had it because I was also testing other server 
plugins. I'll delete it for now.






[PR] MINOR: move some class to server from server_common [kafka]

2025-04-07 Thread via GitHub


gongxuanzhang opened a new pull request, #19408:
URL: https://github.com/apache/kafka/pull/19408

   about 
   https://github.com/apache/kafka/pull/19226#issuecomment-2783370917





Re: [PR] KAFKA-18888: Add KIP-877 support to Authorizer [kafka]

2025-04-07 Thread via GitHub


m1a2st commented on code in PR #19050:
URL: https://github.com/apache/kafka/pull/19050#discussion_r2031317020


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3284,7 +3285,7 @@ private boolean maybeUpdateRegularExpressions(
  * @param log   The log instance.
  * @param time  The time instance.
  * @param image The metadata image to use for listing the topics.
- * @param authorizerThe authorizer.
+ * @param authorizerPluginThe authorizer.

Review Comment:
   How about updating it to `The authorizer is wrapped in a {@link 
org.apache.kafka.common.internals.Plugin}`?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3352,15 +3353,15 @@ public static Map 
refreshRegularExpressions(
  * that the member is authorized to describe.
  *
  * @param context   The request context.
- * @param authorizerThe authorizer.
+ * @param authorizerPlugin  The authorizer.

Review Comment:
   ditto: How about updating it to `The authorizer is wrapped in a {@link 
org.apache.kafka.common.internals.Plugin}`?



##
server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java:
##
@@ -79,17 +80,17 @@ public Builder addReadinessFutures(
 /**
  * Build the EndpointReadyFutures object.
  *
- * @param authorizerThe authorizer to use, if any. Will be started.
- * @param info  Server information to be passed to the 
authorizer.
+ * @param authorizerPluginThe authorizer to use, if any. Will be 
started.

Review Comment:
   How about updating it to `The authorizer which is wrapped in a {@link 
org.apache.kafka.common.internals.Plugin} to use`?






Re: [PR] KAFKA-18888: Add KIP-877 support to Authorizer [kafka]

2025-04-07 Thread via GitHub


mimaison commented on code in PR #19050:
URL: https://github.com/apache/kafka/pull/19050#discussion_r2031012460


##
metadata/src/main/java/org/apache/kafka/metadata/publisher/AclPublisher.java:
##
@@ -103,6 +105,9 @@ public void onMetadataUpdate(MetadataDelta delta, 
MetadataImage newImage, Loader
 
 @Override
 public void close() {
-authorizer.ifPresent(clusterMetadataAuthorizer -> 
clusterMetadataAuthorizer.completeInitialLoad(new TimeoutException()));
+authorizer.ifPresent(authorizer -> {
+ClusterMetadataAuthorizer clusterMetadataAuthorizer = 
(ClusterMetadataAuthorizer) authorizer.get();
+clusterMetadataAuthorizer.completeInitialLoad(new 
TimeoutException());
+});
 }
 }

Review Comment:
   It was already missing. Added.






Re: [PR] KAFKA-19027: Replace ConsumerGroupCommandTestUtils#generator by ClusterTestDefaults [kafka]

2025-04-07 Thread via GitHub


chia7712 commented on code in PR #19347:
URL: https://github.com/apache/kafka/pull/19347#discussion_r2031094554


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -38,12 +34,6 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import static org.apache.kafka.common.test.api.Type.CO_KRAFT;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 
 /**
  * The old test framework {@link 
kafka.api.BaseConsumerTest#getTestGroupProtocolParametersAll} test for the 
following cases:

Review Comment:
   agree. the config template is removed, so we don't need those docs.






[PR] MINOR: Only send endpoints to partitions on changes [kafka]

2025-04-07 Thread via GitHub


bbejeck opened a new pull request, #19407:
URL: https://github.com/apache/kafka/pull/19407

   This PR updates the StreamGroup heartbeat response to only include the 
`endpointToPartitions` information after there has been a task assignment 
change and all members have successfully migrated to the new group epoch.





[jira] [Assigned] (KAFKA-17777) Flaky KafkaConsumerTest testReturnRecordsDuringRebalance

2025-04-07 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-17777:
-

Assignee: (was: Kirk True)

> Flaky KafkaConsumerTest testReturnRecordsDuringRebalance
> 
>
> Key: KAFKA-17777
> URL: https://issues.apache.org/jira/browse/KAFKA-17777
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: flaky-test
> Fix For: 4.1.0
>
>
> Top flaky consumer test at the moment (after several recent fixes addressing 
> consumer tests flakiness): 
> [https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.sortField=FLAKY]
>  
> It's been flaky in trunk for a while:
> [https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=America%2FToronto&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.sortField=FLAKY&tests.test=testReturnRecordsDuringRebalance(GroupProtocol)%5B1%5D]
>  
>  
> Fails with : org.opentest4j.AssertionFailedError: Condition not met within 
> timeout 15000. Does not complete rebalance in time ==> expected:  but 
> was: 





Re: [PR] KAFKA-18913: Removing _state.updater.enabled_ flag through the Stream… [kafka]

2025-04-07 Thread via GitHub


cadonna commented on code in PR #19275:
URL: https://github.com/apache/kafka/pull/19275#discussion_r2031317747


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java:
##
@@ -99,8 +99,8 @@ SmokeTestDriver.VerificationResult result() {
 // We set 2 timeout condition to fail the test before passing the 
verification:
 // (1) 10 min timeout, (2) 30 tries of polling without getting any data
 @ParameterizedTest
-@CsvSource({"false, false", "true, false"})
-public void shouldWorkWithRebalance(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) throws InterruptedException {
+@CsvSource({"true, false"})

Review Comment:
   This does not look right!
   It should be
   ```suggestion
   @CsvSource({"true", "false"})
   ```
   You can see it when you run the test. It should have two runs -- one for 
processing threads enabled and one for processing threads disabled, but it only 
has one with processing threads enabled.
   With my suggestion, you get the two runs.






Re: [PR] KAFKA-19084: Port KAFKA-16224, KAFKA-16764 for ShareConsumers [kafka]

2025-04-07 Thread via GitHub


AndrewJSchofield merged PR #19369:
URL: https://github.com/apache/kafka/pull/19369





[jira] [Commented] (KAFKA-18066) Misleading/mismatched StreamThread id in logging

2025-04-07 Thread Peter Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941532#comment-17941532
 ] 

Peter Lee commented on KAFKA-18066:
---

Sure

> Misleading/mismatched StreamThread id in logging
> 
>
> Key: KAFKA-18066
> URL: https://issues.apache.org/jira/browse/KAFKA-18066
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Peter Lee
>Priority: Minor
>  Labels: newbie, newbie++
>
> While debugging a test application I was confused to see a number of log 
> lines where the StreamThread name appeared twice but had a different thread 
> id/index in the same message. For example:
> {code:java}
> [INFO ] 2024-11-19 04:59:14.541 
> [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-1] StreamThread - 
> stream-thread [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-3] 
> Creating thread producer client{code}
> Generally you would expect that the actual Logger prefix (the first thread 
> name, in this case StreamThread-1) is the same as the LogContext prefix (the 
> second thread name, ie the StreamThread-3 in this example). I dug into it and 
> figured out that this happens for all of the messages logged during the 
> StreamThread#create method, ie before the new thread is actually created. 
> What happened was StreamThread-1 had actually died, and started up a new 
> thread (StreamThread-3) to replace itself before shutting down. So we were 
> logging things _about_ StreamThread-3, but _from_ StreamThread-1.
> While this doesn't necessarily harm anyone, it's quite confusing to see and 
> requires extensive knowledge of Streams to understand (a) that it's not a 
> bug, and (b) which thread the messages are actually referring to. It also 
> makes things harder to parse and read – for example I often filter logs on 
> the Logger prefix to gather everything related to a particular thread and eg 
> the clients it owns. The name of the currently executing thread is more 
> reliable and gathers everything whereas not every logger is configured with 
> the LogContext prefix (eg `stream-thread 
> [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-3]`). 
> We should move things out of the static StreamThread#create method and into 
> the thread constructor to make the logging consistent and reliable.





[jira] [Commented] (KAFKA-18066) Misleading/mismatched StreamThread id in logging

2025-04-07 Thread Uladzislau Blok (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941531#comment-17941531
 ] 

Uladzislau Blok commented on KAFKA-18066:
-

Hey [~peterxcli],
Are you still looking into that? If not, I could pick this up.

> Misleading/mismatched StreamThread id in logging
> 
>
> Key: KAFKA-18066
> URL: https://issues.apache.org/jira/browse/KAFKA-18066
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Peter Lee
>Priority: Minor
>  Labels: newbie, newbie++
>
> While debugging a test application I was confused to see a number of log 
> lines where the StreamThread name appeared twice but had a different thread 
> id/index in the same message. For example:
> {code:java}
> [INFO ] 2024-11-19 04:59:14.541 
> [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-1] StreamThread - 
> stream-thread [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-3] 
> Creating thread producer client{code}
> Generally you would expect that the actual Logger prefix (the first thread 
> name, in this case StreamThread-1) is the same as the LogContext prefix (the 
> second thread name, ie the StreamThread-3 in this example). I dug into it and 
> figured out that this happens for all of the messages logged during the 
> StreamThread#create method, ie before the new thread is actually created. 
> What happened was StreamThread-1 had actually died, and started up a new 
> thread (StreamThread-3) to replace itself before shutting down. So we were 
> logging things _about_ StreamThread-3, but _from_ StreamThread-1.
> While this doesn't necessarily harm anyone, it's quite confusing to see and 
> requires extensive knowledge of Streams to understand (a) that it's not a 
> bug, and (b) which thread the messages are actually referring to. It also 
> makes things harder to parse and read – for example I often filter logs on 
> the Logger prefix to gather everything related to a particular thread and eg 
> the clients it owns. The name of the currently executing thread is more 
> reliable and gathers everything whereas not every logger is configured with 
> the LogContext prefix (eg `stream-thread 
> [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-3]`). 
> We should move things out of the static StreamThread#create method and into 
> the thread constructor to make the logging consistent and reliable.





[jira] [Assigned] (KAFKA-18066) Misleading/mismatched StreamThread id in logging

2025-04-07 Thread Uladzislau Blok (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Uladzislau Blok reassigned KAFKA-18066:
---

Assignee: Uladzislau Blok  (was: Peter Lee)

> Misleading/mismatched StreamThread id in logging
> 
>
> Key: KAFKA-18066
> URL: https://issues.apache.org/jira/browse/KAFKA-18066
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Uladzislau Blok
>Priority: Minor
>  Labels: newbie, newbie++
>
> While debugging a test application I was confused to see a number of log 
> lines where the StreamThread name appeared twice but had a different thread 
> id/index in the same message. For example:
> {code:java}
> [INFO ] 2024-11-19 04:59:14.541 
> [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-1] StreamThread - 
> stream-thread [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-3] 
> Creating thread producer client{code}
> Generally you would expect that the actual Logger prefix (the first thread 
> name, in this case StreamThread-1) is the same as the LogContext prefix (the 
> second thread name, ie the StreamThread-3 in this example). I dug into it and 
> figured out that this happens for all of the messages logged during the 
> StreamThread#create method, ie before the new thread is actually created. 
> What happened was StreamThread-1 had actually died, and started up a new 
> thread (StreamThread-3) to replace itself before shutting down. So we were 
> logging things _about_ StreamThread-3, but _from_ StreamThread-1.
> While this doesn't necessarily harm anyone, it's quite confusing to see and 
> requires extensive knowledge of Streams to understand (a) that it's not a 
> bug, and (b) which thread the messages are actually referring to. It also 
> makes things harder to parse and read – for example I often filter logs on 
> the Logger prefix to gather everything related to a particular thread and eg 
> the clients it owns. The name of the currently executing thread is more 
> reliable and gathers everything whereas not every logger is configured with 
> the LogContext prefix (eg `stream-thread 
> [e2e-963c5b74-0353-4253-bdf2-b71881d9d9f2-StreamThread-3]`). 
> We should move things out of the static StreamThread#create method and into 
> the thread constructor to make the logging consistent and reliable.





[PR] KAFKA-18845: Remove flaky tag on QuorumControllerTest#testUncleanShutdownBrokerElrEnabled [kafka]

2025-04-07 Thread via GitHub


FrankYang0529 opened a new pull request, #19403:
URL: https://github.com/apache/kafka/pull/19403

   It has been around two weeks since PR 
https://github.com/apache/kafka/pull/19240, which fixes 
QuorumControllerTest#testUncleanShutdownBrokerElrEnabled, was merged. There have 
been no flaky results after 2025/03/21, so there is enough evidence that the 
flakiness is fixed. It's good to remove the flaky tag.





[PR] KAFKA-6629: parameterise SegmentedCacheFunctionTest for session key schemas [kafka]

2025-04-07 Thread via GitHub


lorcanj opened a new pull request, #19404:
URL: https://github.com/apache/kafka/pull/19404

   Addresses: [KAFKA-6629](https://issues.apache.org/jira/browse/KAFKA-6629)
   
   Adds configuration for the SessionKeySchema and parameterises the existing unit
tests so that both WindowKeys and SessionKeys are covered (see the sketch below).
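   A rough sketch of the parameterization pattern, using JUnit 5; `KeySchema`,
`WindowSchema` and `SessionSchema` below are placeholder names for illustration,
not the actual Kafka Streams classes.

```java
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import static org.junit.jupiter.api.Assertions.assertTrue;

class SegmentedCacheFunctionStyleTest {

    // Placeholder abstraction standing in for the window/session key schemas.
    interface KeySchema {
        long segmentTimestamp(byte[] cacheKey);
    }

    static class WindowSchema implements KeySchema {
        public long segmentTimestamp(byte[] cacheKey) { return 0L; }
    }

    static class SessionSchema implements KeySchema {
        public long segmentTimestamp(byte[] cacheKey) { return 0L; }
    }

    // One entry per key schema under test: window keys and session keys.
    static Stream<KeySchema> schemas() {
        return Stream.of(new WindowSchema(), new SessionSchema());
    }

    @ParameterizedTest
    @MethodSource("schemas")
    void shouldExtractSegmentTimestamp(KeySchema schema) {
        byte[] key = new byte[8]; // placeholder serialized key
        assertTrue(schema.segmentTimestamp(key) >= 0);
    }
}
```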
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18981: Fix flaky QuorumControllerTest.testMinIsrUpdateWithElr [kafka]

2025-04-07 Thread via GitHub


FrankYang0529 commented on PR #19262:
URL: https://github.com/apache/kafka/pull/19262#issuecomment-2783020471

   Hi @mumrah, do we have a chance to merge this PR? In the last 28 days,
QuorumControllerTest#testMinIsrUpdateWithElr has shown roughly a 5% flaky rate.
   
   ![Screenshot 2025-04-07 at 7 33 41 
PM](https://github.com/user-attachments/assets/00bce14d-695d-4fad-9ef1-764f10bb727c)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19100) Use ProcessRole.toString instead of String in AclApis

2025-04-07 Thread Jira
黃竣陽 created KAFKA-19100:
---

 Summary: Use ProcessRole.toString instead of String in AclApis
 Key: KAFKA-19100
 URL: https://issues.apache.org/jira/browse/KAFKA-19100
 Project: Kafka
  Issue Type: Task
Reporter: 黃竣陽


[https://github.com/apache/kafka/blob/391b258d50ab3d2edc636bf6f56f51c6e37a98a5/core/src/main/scala/kafka/server/ControllerApis.scala#L87]

[https://github.com/apache/kafka/blob/391b258d50ab3d2edc636bf6f56f51c6e37a98a5/core/src/main/scala/kafka/server/KafkaApis.scala#L116]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-19100) Use ProcessRole.toString instead of String in AclApis

2025-04-07 Thread xuanzhang gong (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xuanzhang gong reassigned KAFKA-19100:
--

Assignee: xuanzhang gong

> Use ProcessRole.toString instead of String in AclApis
> -
>
> Key: KAFKA-19100
> URL: https://issues.apache.org/jira/browse/KAFKA-19100
> Project: Kafka
>  Issue Type: Task
>Reporter: 黃竣陽
>Assignee: xuanzhang gong
>Priority: Minor
>
> [https://github.com/apache/kafka/blob/391b258d50ab3d2edc636bf6f56f51c6e37a98a5/core/src/main/scala/kafka/server/ControllerApis.scala#L87]
> [https://github.com/apache/kafka/blob/391b258d50ab3d2edc636bf6f56f51c6e37a98a5/core/src/main/scala/kafka/server/KafkaApis.scala#L116]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]

2025-04-07 Thread via GitHub


mimaison commented on PR #19226:
URL: https://github.com/apache/kafka/pull/19226#issuecomment-2783370917

   Yes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]

2025-04-07 Thread via GitHub


chia7712 commented on PR #19226:
URL: https://github.com/apache/kafka/pull/19226#issuecomment-2783375231

   @gongxuanzhang WDYT? If you agree with @mimaison's comment, could you please
create a MINOR PR to address it? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-19100: Use ProcessRole.toString instead of String in AclApis [kafka]

2025-04-07 Thread via GitHub


gongxuanzhang opened a new pull request, #19406:
URL: https://github.com/apache/kafka/pull/19406

   As per the title.
   JIRA: https://issues.apache.org/jira/browse/KAFKA-19100


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]

2025-04-07 Thread via GitHub


gongxuanzhang commented on PR #19226:
URL: https://github.com/apache/kafka/pull/19226#issuecomment-2783399502

   @mimaison  @chia7712  
   Thanks for your comments
   I will create a MINOR PR to address it 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup Core Module [kafka]

2025-04-07 Thread via GitHub


AndrewJSchofield merged PR #19372:
URL: https://github.com/apache/kafka/pull/19372


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16718-2/n: KafkaAdminClient and GroupCoordinator implementation for DeleteShareGroupOffsets RPC [kafka]

2025-04-07 Thread via GitHub


AndrewJSchofield commented on code in PR #18976:
URL: https://github.com/apache/kafka/pull/18976#discussion_r2031381756


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -1508,6 +1555,110 @@ private 
CompletableFuture 
deleteShareGroupOffsets(
+RequestContext context,
+DeleteShareGroupOffsetsRequestData requestData
+) {
+if (!isActive.get()) {
+return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
+}
+
+if (metadataImage == null) {
+return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.COORDINATOR_NOT_AVAILABLE));
+}
+
+String groupId = requestData.groupId();
+
+if (!isGroupIdNotEmpty(groupId)) {
+return CompletableFuture.completedFuture(
+
DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.INVALID_GROUP_ID));
+}
+
+Map requestTopicIdToNameMapping = new HashMap<>();
+List 
deleteShareGroupStateRequestTopicsData = new 
ArrayList<>(requestData.topics().size());
+
List 
deleteShareGroupOffsetsResponseTopicList = new 
ArrayList<>(requestData.topics().size());
+
+requestData.topics().forEach(topic -> {
+Uuid topicId = 
metadataImage.topics().topicNameToIdView().get(topic.topicName());
+if (topicId != null) {
+requestTopicIdToNameMapping.put(topicId, topic.topicName());
+deleteShareGroupStateRequestTopicsData.add(new 
DeleteShareGroupStateRequestData.DeleteStateData()
+.setTopicId(topicId)
+.setPartitions(
+topic.partitions().stream().map(
+partitionIndex -> new 
DeleteShareGroupStateRequestData.PartitionData().setPartition(partitionIndex)
+).toList()
+));
+} else {
+deleteShareGroupOffsetsResponseTopicList.add(new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic()
+.setTopicName(topic.topicName())
+.setPartitions(topic.partitions().stream().map(
+partition -> new 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponsePartition()
+.setPartitionIndex(partition)
+
.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code())
+
.setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())
+).toList()));
+}
+});
+
+// If the request for the persister is empty, just complete the 
operation right away.
+if (deleteShareGroupStateRequestTopicsData.isEmpty()) {
+return CompletableFuture.completedFuture(
+new DeleteShareGroupOffsetsResponseData()
+.setResponses(deleteShareGroupOffsetsResponseTopicList));
+}
+
+CompletableFuture future = new 
CompletableFuture<>();
+
+TopicPartition topicPartition = topicPartitionFor(groupId);
+
+// This is done to make sure the provided group is empty. Offsets can 
be deleted only for an empty share group.
+CompletableFuture> 
describeGroupFuture =
+runtime.scheduleReadOperation(
+"share-group-describe",
+topicPartition,
+(coordinator, lastCommittedOffset) -> 
coordinator.shareGroupDescribe(List.of(groupId), lastCommittedOffset)
+).exceptionally(exception -> handleOperationException(
+"share-group-describe",
+List.of(groupId),
+exception,
+(error, __) -> 
ShareGroupDescribeRequest.getErrorDescribedGroupList(List.of(groupId), error),
+log
+));
+
+describeGroupFuture.whenComplete((groups, throwable) -> {
+if (throwable != null) {
+log.error("Failed to describe the share group {}", groupId, 
throwable);
+
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
+} else if (groups == null || groups.isEmpty()) {
+log.error("Describe share group resulted in empty response for 
group {}", groupId);
+
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));
+} else if (groups.get(0).errorCode() != Errors.NONE.code()) {
+log.error("Failed to describe the share group {}", groupId);
+
future.complete(DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(Errors.GROUP_ID_NOT_FOUND));

Review Comment:
   In this case, the list `groups` may contain an exception whi

Re: [PR] KAFKA-19030: Remove metricNamePrefix from RequestChannel [kafka]

2025-04-07 Thread via GitHub


Parkerhiphop commented on code in PR #19374:
URL: https://github.com/apache/kafka/pull/19374#discussion_r2031557837


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -495,8 +491,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
 
   val shouldRun = new AtomicBoolean(true)
 
-  def metricPrefix(): String
-  def threadPrefix(): String
+  def threadPrefix(): String = DataPlaneAcceptor.ThreadPrefix

Review Comment:
   Agreed. I was focused on fixing the previous test case and missed 
this—thanks for the suggestion!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]

2025-04-07 Thread via GitHub


mjsax commented on PR #15607:
URL: https://github.com/apache/kafka/pull/15607#issuecomment-2783873590

   As discussed via DM on some other channel, I think the original issue of 
duplicate tombstones was already fixed via 
https://github.com/apache/kafka/pull/19005
   
   I think we should re-purpose this PR though, to improve the FK-join 
implementation further.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-19098) Remove lastOffset from PartitionResponse

2025-04-07 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-19098.

Fix Version/s: 4.1.0
   Resolution: Fixed

> Remove lastOffset from PartitionResponse
> 
>
> Key: KAFKA-19098
> URL: https://issues.apache.org/jira/browse/KAFKA-19098
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Nick Guo
>Priority: Minor
> Fix For: 4.1.0
>
>
> the `lastOffset` is not used actually, so it can be removed. We can remove 
> some constructors subsequently



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19030: Remove metricNamePrefix from RequestChannel [kafka]

2025-04-07 Thread via GitHub


Parkerhiphop commented on code in PR #19374:
URL: https://github.com/apache/kafka/pull/19374#discussion_r2030194156


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -97,7 +97,7 @@ class SocketServer(
   private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
   // data-plane
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, 
DataPlaneAcceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, 
DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)

Review Comment:
   @chia7712 Yes, I've removed the `DataPlaneAcceptor#MetricPrefix`.
   
   But I'm not sure the method `DataPlaneAcceptor#threadPrefix` should be
replaced by `DataPlaneAcceptor#ThreadPrefix`, since `threadPrefix()` is used
within the `Acceptor`, and the `Acceptor` arguably should not use a constant from its
child class `DataPlaneAcceptor`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19030: Remove metricNamePrefix from RequestChannel [kafka]

2025-04-07 Thread via GitHub


Parkerhiphop commented on code in PR #19374:
URL: https://github.com/apache/kafka/pull/19374#discussion_r2031577526


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -97,7 +97,7 @@ class SocketServer(
   private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
   // data-plane
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, 
DataPlaneAcceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, 
DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)

Review Comment:
   Updated: I’m not sure if we should replace the 
DataPlaneAcceptor#threadPrefix() method with the constant 
DataPlaneAcceptor#THREAD_PREFIX, since threadPrefix() is used within the 
Acceptor interface. 
   
   It seems that Acceptor shouldn’t directly rely on constants defined in its 
child class (DataPlaneAcceptor), as the comment below suggests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16758: remove internal.leave.group.on.close config [kafka]

2025-04-07 Thread via GitHub


frankvicky opened a new pull request, #19400:
URL: https://github.com/apache/kafka/pull/19400

   This is a follow-up to #17614.
   The patch removes the `internal.leave.group.on.close` config.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19060 Documented null edge cases in the Clients API JavaDoc [kafka]

2025-04-07 Thread via GitHub


Yunyung commented on code in PR #19393:
URL: https://github.com/apache/kafka/pull/19393#discussion_r2030586173


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -1658,7 +1660,8 @@ public Map 
beginningOffsets(Collection par
  * @see #seekToEnd(Collection)
  *
  * @param partitions the partitions to get the end offsets.
- * @return The end offsets for the given partitions.
+ * @return The end offsets for the given partitions. If the offset for a 
specific partition cannot be found or the 
+ * timeout is zero, the corresponding value will be {@code null}

Review Comment:
   Some paragraphs have periods, while others do not, such as in existing 
JavaDocs. We should make them all consistent (this can be addressed in another 
PR).
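   
   As a usage note on the nulls documented above, a caller would need to guard each
value; a small sketch follows (the topic name, partition and timeout are made-up
examples):
   
```java
import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class EndOffsetsExample {
    static void printEndOffsets(KafkaConsumer<String, String> consumer) {
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(
            List.of(new TopicPartition("orders", 0)), Duration.ofSeconds(5));
        endOffsets.forEach((tp, offset) -> {
            if (offset == null) {
                // Per the JavaDoc above: offset not found, or a zero timeout was used.
                System.out.println("No end offset available for " + tp);
            } else {
                System.out.println("End offset for " + tp + " is " + offset);
            }
        });
    }
}
```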



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14523: Decouple RemoteLogManager and Partition [kafka]

2025-04-07 Thread via GitHub


mimaison commented on code in PR #19391:
URL: https://github.com/apache/kafka/pull/19391#discussion_r2030649656


##
storage/src/main/java/org/apache/kafka/server/log/remote/TopicPartitionLog.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.storage.internals.log.UnifiedLog;
+
+import java.util.Optional;
+
+/**
+ * Interface to decouple RemoteLogManager and Partition
+ */
+public interface TopicPartitionLog {

Review Comment:
   To be honest I don't know at the moment. It depends where `Partition` ends 
up going but there's still a lot of work to do before we can move it, so I 
think we'll need this interface for a little while.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]

2025-04-07 Thread via GitHub


mimaison commented on PR #19226:
URL: https://github.com/apache/kafka/pull/19226#issuecomment-2782354297

   I was looking at https://github.com/apache/kafka/pull/19390 and wondering 
whether we should put these classes in `server` instead of `server-common` as 
they don't seem to be used elsewhere. For example we put 
`DelayedRemoteListOffsets` in the `storage` module. WDYT?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15370: ACL changes to support 2PC (KIP-939) [kafka]

2025-04-07 Thread via GitHub


FrankYang0529 commented on code in PR #19364:
URL: https://github.com/apache/kafka/pull/19364#discussion_r2030671804


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -1520,6 +1520,10 @@ class KafkaApis(val requestChannel: RequestChannel,
 requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 return
   }
+  if (initProducerIdRequest.enable2Pc() && 
!authHelper.authorize(request.context, TWO_PHASE_COMMIT, TRANSACTIONAL_ID, 
transactionalId)) {
+requestHelper.sendErrorResponseMaybeThrottle(request, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)

Review Comment:
   The `TRANSACTIONAL_ID_AUTHORIZATION_FAILED` error is returned when the client lacks
either the `WRITE` or the `TWO_PHASE_COMMIT` permission. Should we include information
about which specific permission needs to be granted to the user in order to
complete the operation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove unused `ApiVersions` variable from Sender and RecordAccumulator [kafka]

2025-04-07 Thread via GitHub


Yunyung commented on PR #19399:
URL: https://github.com/apache/kafka/pull/19399#issuecomment-2782382998

   > It looks like `apiVersions` in `RecordAccumulator` can also be cleanup. 
Could you include it in this PR? Thanks.
   
   Done. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18336: Improve jmh tests on ACL in AuthorizerBenchmark and StandardAuthorizerUpdateBenchmark [kafka]

2025-04-07 Thread via GitHub


ekuvardin commented on PR #18293:
URL: https://github.com/apache/kafka/pull/18293#issuecomment-2782433185

   @chia7712 Could you please help review the code?
   
   As a next stage after this PR, I would like to rewrite some of the ACL-related code
and replace the HashSet with a prefix set to improve ACL filters.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18402: Optimise of comparison on org.apache.kafka.common.Uuid [kafka]

2025-04-07 Thread via GitHub


bmscomp commented on PR #13627:
URL: https://github.com/apache/kafka/pull/13627#issuecomment-2782438397

   @divijvaidya @ijuma I still think this is a better implementation. What do you
think about it? Or should I close this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Rename RemoteLogStorageManager variable to RemoteStorageManager [kafka]

2025-04-07 Thread via GitHub


stanislavkozlovski opened a new pull request, #19401:
URL: https://github.com/apache/kafka/pull/19401

   This patch renames the KIP-405 plugin variable from
`remoteLogStorageManager` to `remoteStorageManager`. After [writing about
it](https://aiven.io/blog/apache-kafka-tiered-storage-in-depth-how-writes-and-metadata-flow),
 I realized I got swayed by the code and called the component incorrectly - the
official name doesn't have `Log` in it. I thought I'd go ahead and change the
code so it's consistent with the naming too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Port changes from KAFKA-18569 for ShareConsumers [kafka]

2025-04-07 Thread via GitHub


ShivsundarR opened a new pull request, #19402:
URL: https://github.com/apache/kafka/pull/19402

   *What*
   
   - `ShareConsumers` may wait on an unneeded `FindCoordinator` request during
`close()` (i.e. after the acknowledgements are sent).
   
   - This change https://github.com/apache/kafka/pull/18590 added the
`StopFindCoordinatorOnClose` event, which was used by the regular consumers. We
now use the same event in `ShareConsumers` as well, so that coordinator lookups
are not sent once the coordinator is no longer needed.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-07 Thread via GitHub


lucasbru commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2030885033


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2226,6 +2224,16 @@ private CoordinatorResult stream
 );
 }
 
+if (group.isShutdownRequested()) {
+returnedStatus.add(
+new StreamsGroupHeartbeatResponseData.Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code())
+.setStatusDetail(
+"A KafkaStreams instance encountered a fatal error and 
requested a shutdown for the entire application."

Review Comment:
   This replicates the message used in the old protocol. But you are right that
it would be nice to include the member ID. That requires changing the field
from a boolean to an optional string. 
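   
   For illustration, a minimal sketch of what the boolean-to-optional-string change
could look like (hypothetical names, not the actual StreamsGroup code):
   
```java
import java.util.Optional;

// Hypothetical holder for the shutdown-request state of a group.
class ShutdownRequestState {
    private Optional<String> shutdownRequestedBy = Optional.empty();

    // Record the first member that asked for an application shutdown.
    void requestShutdown(String memberId) {
        if (shutdownRequestedBy.isEmpty()) {
            shutdownRequestedBy = Optional.of(memberId);
        }
    }

    boolean isShutdownRequested() {
        return shutdownRequestedBy.isPresent();
    }

    // Status detail that can now name the requesting member.
    String statusDetail() {
        return shutdownRequestedBy
            .map(id -> "Member " + id + " encountered a fatal error and requested a shutdown for the entire application.")
            .orElse("");
    }
}
```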



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17639 Add Java 23 to CI [kafka]

2025-04-07 Thread via GitHub


showuon commented on PR #19396:
URL: https://github.com/apache/kafka/pull/19396#issuecomment-2782818007

   The Jenkins build adds JDK 23 now, and the build completes with 2 flaky 
tests: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-19396/2/
   
   @gharris1727 , please help review. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19037: Integrate consumer-side code with Streams [kafka]

2025-04-07 Thread via GitHub


cadonna commented on code in PR #19377:
URL: https://github.com/apache/kafka/pull/19377#discussion_r2030990356


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java:
##
@@ -99,8 +115,17 @@ SmokeTestDriver.VerificationResult result() {
 // We set 2 timeout condition to fail the test before passing the 
verification:
 // (1) 10 min timeout, (2) 30 tries of polling without getting any data
 @ParameterizedTest
-@CsvSource({"false, false", "true, false"})
-public void shouldWorkWithRebalance(final boolean stateUpdaterEnabled, 
final boolean processingThreadsEnabled) throws InterruptedException {
+@CsvSource({
+"false, false, true",
+"true, false, true",
+"false, false, false",
+"true, false, false",

Review Comment:
   I removed the runs with processing threads enabled since one of them was 
failing. I did not investigate further.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19027: Replace ConsumerGroupCommandTestUtils#generator by ClusterTestDefaults [kafka]

2025-04-07 Thread via GitHub


m1a2st commented on code in PR #19347:
URL: https://github.com/apache/kafka/pull/19347#discussion_r2031081161


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java:
##
@@ -38,12 +34,6 @@
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
-import static org.apache.kafka.common.test.api.Type.CO_KRAFT;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
-import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 
 /**
  * The old test framework {@link 
kafka.api.BaseConsumerTest#getTestGroupProtocolParametersAll} test for the 
following cases:

Review Comment:
   I think we don't need this JavaDoc anymore, WDYT? cc @chia7712 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19060 Documented null edge cases in the Clients API JavaDoc [kafka]

2025-04-07 Thread via GitHub


m1a2st commented on code in PR #19393:
URL: https://github.com/apache/kafka/pull/19393#discussion_r2031149018


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -1658,7 +1660,8 @@ public Map 
beginningOffsets(Collection par
  * @see #seekToEnd(Collection)
  *
  * @param partitions the partitions to get the end offsets.
- * @return The end offsets for the given partitions.
+ * @return The end offsets for the given partitions. If the offset for a 
specific partition cannot be found or the 
+ * timeout is zero, the corresponding value will be {@code null}

Review Comment:
   Sure, but I won't address these changes in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Only send endpoints to partitions on changes [kafka]

2025-04-07 Thread via GitHub


bbejeck commented on code in PR #19407:
URL: https://github.com/apache/kafka/pull/19407#discussion_r2031311815


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -513,6 +513,8 @@ GroupMetadataManager build() {
  */
 private final int streamsGroupMetadataRefreshIntervalMs;
 
+private final Set streamsGroupMembersSendMetadata = new 
HashSet<>();
+

Review Comment:
   Introducing soft-state to track members that have received 
endpointToPartitions information in a heartbeat response 



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2341,15 +2343,13 @@ private CoordinatorResult stream
 );
 
 scheduleStreamsGroupSessionTimeout(groupId, memberId);
-List 
endpointToPartitions = maybeBuildEndpointToPartitions(group);
 // Prepare the response.
 StreamsGroupHeartbeatResponseData response = new 
StreamsGroupHeartbeatResponseData()
 .setMemberId(updatedMember.memberId())
 .setMemberEpoch(updatedMember.memberEpoch())
-.setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs)
-.setPartitionsByUserEndpoint(endpointToPartitions);
-
-// The assignment is only provided in the following cases:
+.setHeartbeatIntervalMs(streamsGroupHeartbeatIntervalMs);
+

Review Comment:
   Removed this line that resulted in sending the endpoint information in every 
heartbeat response.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2358,9 +2358,18 @@ private CoordinatorResult stream
 || hasAssignedStandbyTasksChanged(member, updatedMember)
 || hasAssignedWarmupTasksChanged(member, updatedMember)
 ) {
+streamsGroupMembersSendMetadata.clear();

Review Comment:
   There's been an assignment change, so all members will need to eventually 
get an update to the endpoint metadata.



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -2358,9 +2358,18 @@ private CoordinatorResult stream
 || hasAssignedStandbyTasksChanged(member, updatedMember)
 || hasAssignedWarmupTasksChanged(member, updatedMember)
 ) {
+streamsGroupMembersSendMetadata.clear();
 
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedActiveTasks()));
 
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedStandbyTasks()));
 
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedWarmupTasks()));
+} else {
+long memberGroupEpochCount = 
group.members().values().stream().filter(m -> m.memberEpoch() == 
group.groupEpoch()).count();
+if (memberGroupEpochCount == group.members().size()) {
+if 
(!streamsGroupMembersSendMetadata.contains(updatedMember.memberId())) {
+
response.setPartitionsByUserEndpoint(maybeBuildEndpointToPartitions(group));
+
streamsGroupMembersSendMetadata.add(updatedMember.memberId());
+}

Review Comment:
   If all members have a `memberEpoch` equaling the `groupEpoch` and the 
`memberId` is not in the sent metadata set, add the endpoint information to the 
heartbeat response and add the id to the set to prevent redundant updates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18894: Add KIP-877 support for ConfigProvider [kafka]

2025-04-07 Thread via GitHub


m1a2st commented on PR #19397:
URL: https://github.com/apache/kafka/pull/19397#issuecomment-2783653368

   Thanks for this patch. Please run `./gradlew spotlessApply` to fix the build error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19030: Remove metricNamePrefix from RequestChannel [kafka]

2025-04-07 Thread via GitHub


Parkerhiphop commented on code in PR #19374:
URL: https://github.com/apache/kafka/pull/19374#discussion_r2030194156


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -97,7 +97,7 @@ class SocketServer(
   private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
   // data-plane
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, 
DataPlaneAcceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, 
DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)

Review Comment:
   @chia7712 Yes, I've removed the `DataPlaneAcceptor#MetricPrefix`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-04-07 Thread via GitHub


junrao commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031538464


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2488,6 +2509,190 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
 }
 }
 
+private ShareAcquiredRecords 
maybeFilterAbortedTransactionalAcquiredRecords(
+FetchPartitionData fetchPartitionData,
+FetchIsolation isolationLevel,
+ShareAcquiredRecords shareAcquiredRecords
+) {
+if (isolationLevel != FetchIsolation.TXN_COMMITTED || 
fetchPartitionData.abortedTransactions.isEmpty() || 
fetchPartitionData.abortedTransactions.get().isEmpty())
+return shareAcquiredRecords;
+
+// When FetchIsolation.TXN_COMMITTED is used as isolation level by the 
share group, we need to filter any
+// transactions that were aborted/did not commit due to timeout.
+List result = 
filterAbortedTransactionalAcquiredRecords(fetchPartitionData.records.batches(),
+shareAcquiredRecords.acquiredRecords(), 
fetchPartitionData.abortedTransactions.get());
+int acquiredCount = 0;
+for (AcquiredRecords records : result) {
+acquiredCount += (int) (records.lastOffset() - 
records.firstOffset() + 1);
+}
+return new ShareAcquiredRecords(result, acquiredCount);
+}
+
+private List filterAbortedTransactionalAcquiredRecords(
+Iterable batches,
+List acquiredRecords,
+List abortedTransactions
+) {
+// The record batches that need to be archived in cachedState because 
they were a part of aborted transactions.
+List recordsToArchive = 
fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+for (RecordBatch recordBatch : recordsToArchive) {
+// Archive the offsets/batches in the cached state.
+NavigableMap subMap = 
fetchSubMap(recordBatch);
+archiveRecords(recordBatch.baseOffset(), recordBatch.lastOffset() 
+ 1, subMap, RecordState.ACQUIRED);
+}
+return filterRecordBatchesFromAcquiredRecords(acquiredRecords, 
recordsToArchive);
+}
+
+/**
+ * This function filters out the offsets present in the acquired records 
list that are also a part of batches that need to be archived.
+ * It follows an iterative refinement of acquired records to eliminate 
batches to be archived.
+ * @param acquiredRecordsList The list containing acquired records. This 
list is sorted by the firstOffset of the acquired batch.
+ * @param batchesToArchive The list containing record batches to archive. 
This list is sorted by the baseOffset of the record batch.
+ * @return The list containing filtered acquired records offsets.
+ */
+List filterRecordBatchesFromAcquiredRecords(
+List acquiredRecordsList,
+List batchesToArchive
+) {
+List result = new ArrayList<>();
+Iterator acquiredRecordsListIter = 
acquiredRecordsList.iterator();
+Iterator batchesToArchiveIterator = 
batchesToArchive.iterator();
+if (!batchesToArchiveIterator.hasNext())
+return acquiredRecordsList;
+RecordBatch batchToArchive = batchesToArchiveIterator.next();
+AcquiredRecords unresolvedAcquiredRecords = null;
+
+while (unresolvedAcquiredRecords != null || 
acquiredRecordsListIter.hasNext()) {
+if (unresolvedAcquiredRecords == null)
+unresolvedAcquiredRecords = acquiredRecordsListIter.next();
+
+long unresolvedFirstOffset = 
unresolvedAcquiredRecords.firstOffset();
+long unresolvedLastOffset = unresolvedAcquiredRecords.lastOffset();
+short unresolvedDeliveryCount = 
unresolvedAcquiredRecords.deliveryCount();
+
+if (batchToArchive == null) {
+result.add(unresolvedAcquiredRecords);
+unresolvedAcquiredRecords = null;
+continue;
+}
+
+// Non-overlap check - unresolvedFirstOffset offsets lie before 
the batchToArchive offsets. No need to filter out the offsets in such a 
scenario.
+if (unresolvedLastOffset < batchToArchive.baseOffset()) {
+// Offsets in unresolvedAcquiredRecords do not overlap with 
batchToArchive, hence it should not get filtered out.
+result.add(unresolvedAcquiredRecords);
+unresolvedAcquiredRecords = null;
+}
+
+// Overlap check - unresolvedFirstOffset offsets overlap with the 
batchToArchive offsets. We need to filter out the overlapping
+// offsets in such a scenario.
+if (unresolvedFirstOffset <= batchToArchive.lastOffset() &&
+unresolvedLastOffset >= batchToArchive.baseOffset()) {
+unresolvedAcquiredRecords = null;
+// Split the unresolvedFirstOffset into parts - before and 
after the 

Re: [PR] MINOR: use enum map for error counts map [kafka]

2025-04-07 Thread via GitHub


lorcanj commented on PR #19314:
URL: https://github.com/apache/kafka/pull/19314#issuecomment-2784070246

   Hi @mjsax, I noticed you commented on a ticket that I raised, mentioning that too
many changes should not be combined into one PR. Could you advise whether I should
split this up into smaller separate PRs?
   
   I thought it would be more efficient to do it in one PR, but I overlooked the
effect on reviewers. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19103) Remove OffsetConfig

2025-04-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-19103:
--

 Summary: Remove OffsetConfig
 Key: KAFKA-19103
 URL: https://issues.apache.org/jira/browse/KAFKA-19103
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


It was used by the old coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19054: StreamThread exception handling with SHUTDOWN_APPLICATION may trigger a tight loop with MANY logs [kafka]

2025-04-07 Thread via GitHub


apalan60 commented on PR #19394:
URL: https://github.com/apache/kafka/pull/19394#issuecomment-2783979984

   @FrankYang0529 
   
   Thanks for your review.
   I've added a test. PTAL, Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19101) Remove ControllerMutationQuotaManager#throttleTimeMs unused parameter

2025-04-07 Thread Jira
黃竣陽 created KAFKA-19101:
---

 Summary: Remove ControllerMutationQuotaManager#throttleTimeMs 
unused parameter
 Key: KAFKA-19101
 URL: https://issues.apache.org/jira/browse/KAFKA-19101
 Project: Kafka
  Issue Type: Improvement
Reporter: 黃竣陽
Assignee: 黃竣陽


It seems the `timeMs` parameter has never been used in the Kafka project; it first
appeared in commit
https://github.com/apache/kafka/commit/b5f90daf13b4945305951ca0eecdb454a4dcafc2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19103) Remove OffsetConfig

2025-04-07 Thread Bolin Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941677#comment-17941677
 ] 

Bolin Lin commented on KAFKA-19103:
---

Hi [~chia7712], I just jumped in. Could you please clarify what we're supposed to do?
Is there a specific part or keyword I should start with? 

> Remove OffsetConfig
> ---
>
> Key: KAFKA-19103
> URL: https://issues.apache.org/jira/browse/KAFKA-19103
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Bolin Lin
>Priority: Trivial
>
> it was used by old coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-6629) SegmentedCacheFunctionTest does not cover session window serdes

2025-04-07 Thread Lorcan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941541#comment-17941541
 ] 

Lorcan commented on KAFKA-6629:
---

Hi [~guozhang], I've created a PR for this change:

https://github.com/apache/kafka/pull/19404

> SegmentedCacheFunctionTest does not cover session window serdes
> ---
>
> Key: KAFKA-6629
> URL: https://issues.apache.org/jira/browse/KAFKA-6629
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Lorcan
>Priority: Major
>  Labels: newbie, unit-test
>
> The SegmentedCacheFunctionTest.java only covers time window serdes, but not 
> session window serdes. We should fill in this coverage gap.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-19099) Remove GroupSyncKey, GroupJoinKey, and MemberKey

2025-04-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-19099:
--

 Summary: Remove GroupSyncKey, GroupJoinKey, and MemberKey
 Key: KAFKA-19099
 URL: https://issues.apache.org/jira/browse/KAFKA-19099
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


They are useless after removing the old coordinator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19027: Replace ConsumerGroupCommandTestUtils#generator by ClusterTestDefaults [kafka]

2025-04-07 Thread via GitHub


chia7712 commented on code in PR #19347:
URL: https://github.com/apache/kafka/pull/19347#discussion_r2031064108


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -66,13 +68,28 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.test.TestUtils.RANDOM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+@ClusterTestDefaults(
+types = {Type.CO_KRAFT},
+serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "1"),
+@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "1000"),

Review Comment:
   could you please check that those configs are actually required by the other tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-19099) Remove GroupSyncKey, GroupJoinKey, and MemberKey

2025-04-07 Thread Nick Guo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17941543#comment-17941543
 ] 

Nick Guo commented on KAFKA-19099:
--

Hi [~chia7712], I would like to take this issue. Thanks!

> Remove GroupSyncKey, GroupJoinKey, and MemberKey
> 
>
> Key: KAFKA-19099
> URL: https://issues.apache.org/jira/browse/KAFKA-19099
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> they are useless after removing old coordinator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-19099) Remove GroupSyncKey, GroupJoinKey, and MemberKey

2025-04-07 Thread Nick Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nick Guo reassigned KAFKA-19099:


Assignee: Nick Guo  (was: Chia-Ping Tsai)

> Remove GroupSyncKey, GroupJoinKey, and MemberKey
> 
>
> Key: KAFKA-19099
> URL: https://issues.apache.org/jira/browse/KAFKA-19099
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Nick Guo
>Priority: Minor
>
> they are useless after removing old coordinator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19027: Replace ConsumerGroupCommandTestUtils#generator by ClusterTestDefaults [kafka]

2025-04-07 Thread via GitHub


Rancho-7 commented on code in PR #19347:
URL: https://github.com/apache/kafka/pull/19347#discussion_r2031070243


##
tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java:
##
@@ -66,13 +68,28 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG;
+import static 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG;
 import static org.apache.kafka.test.TestUtils.RANDOM;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+@ClusterTestDefaults(
+types = {Type.CO_KRAFT},
+serverProperties = {
+@ClusterConfigProperty(key = OFFSETS_TOPIC_PARTITIONS_CONFIG, value = 
"1"),
+@ClusterConfigProperty(key = OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, 
value = "1"),
+@ClusterConfigProperty(key = GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 
value = "1000"),

Review Comment:
   let me check and confirm.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-07 Thread via GitHub


lucasbru commented on code in PR #19359:
URL: https://github.com/apache/kafka/pull/19359#discussion_r2030885666


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java:
##
@@ -1106,4 +1106,33 @@ public void testIsSubscribedToTopic() {
 assertTrue(streamsGroup.isSubscribedToTopic("test-topic2"));
 assertFalse(streamsGroup.isSubscribedToTopic("non-existent-topic"));
 }
+
+@Test
+public void testShutdownRequestedMethods() {
+String memberId1 = "test-member-id1";
+String memberId2 = "test-member-id2";
+LogContext logContext = new LogContext();
+SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
+GroupCoordinatorMetricsShard metricsShard = 
mock(GroupCoordinatorMetricsShard.class);
+StreamsGroup streamsGroup = new StreamsGroup(logContext, 
snapshotRegistry, "test-group", metricsShard);
+
+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId1));
+
streamsGroup.updateMember(streamsGroup.getOrCreateDefaultMember(memberId2));
+
+// Initially, shutdown should not be requested
+assertFalse(streamsGroup.isShutdownRequested());
+
+// Set shutdown requested
+streamsGroup.setShutdownRequested(memberId1);
+assertTrue(streamsGroup.isShutdownRequested());
+
+// As long as group not empty, remain in shutdown requested state
+streamsGroup.removeMember(memberId1);
+assertTrue(streamsGroup.isShutdownRequested());
+
+// As soon as the group is empty, clear the shutdown requested state
+streamsGroup.removeMember(memberId2);
+assertFalse(streamsGroup.isShutdownRequested());
+}
+
 }

Review Comment:
   Ah, okay! Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19030: Remove metricNamePrefix from RequestChannel [kafka]

2025-04-07 Thread via GitHub


m1a2st commented on code in PR #19374:
URL: https://github.com/apache/kafka/pull/19374#discussion_r2031136549


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -495,8 +491,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
 
   val shouldRun = new AtomicBoolean(true)
 
-  def metricPrefix(): String
-  def threadPrefix(): String
+  def threadPrefix(): String = DataPlaneAcceptor.ThreadPrefix

Review Comment:
   I'm not sure it's a good idea for a parent class to use constants from its 
child classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19060 Documented null edge cases in the Clients API JavaDoc [kafka]

2025-04-07 Thread via GitHub


m1a2st commented on PR #19393:
URL: https://github.com/apache/kafka/pull/19393#issuecomment-2783697038

   Hello @kirktrue, if you have a free cycle, PTAL


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15853: Move ShareCoordinatorConfig and GroupCoordinatorConfig out of KafkaConfig [kafka]

2025-04-07 Thread via GitHub


FrankYang0529 opened a new pull request, #19409:
URL: https://github.com/apache/kafka/pull/19409

   Since ShareCoordinatorConfig and GroupCoordinatorConfig are not 
reconfigurable, they don't need to be integrated into KafkaConfig.
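
   For illustration, the direction of the change is roughly the following (a
   minimal sketch only; the "before" accessor name and the call-site shape are
   assumptions, while the GroupCoordinatorConfig(KafkaConfig) constructor is the
   one already used inside KafkaConfig today):

   // Before: KafkaConfig owns the derived config object and exposes it.
   GroupCoordinatorConfig groupConfig = config.groupCoordinatorConfig();

   // After: since the config is not reconfigurable, it can simply be built once
   // at the call site from the same KafkaConfig instance.
   GroupCoordinatorConfig groupConfig = new GroupCoordinatorConfig(config);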


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move ShareCoordinatorConfig and GroupCoordinatorConfig out of KafkaConfig [kafka]

2025-04-07 Thread via GitHub


dajac commented on code in PR #19409:
URL: https://github.com/apache/kafka/pull/19409#discussion_r2031502146


##
core/src/main/scala/kafka/server/KafkaConfig.scala:
##
@@ -194,16 +193,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
   private val _quorumConfig = new QuorumConfig(this)
   def quorumConfig: QuorumConfig = _quorumConfig
 
-  private val _groupCoordinatorConfig = new GroupCoordinatorConfig(this)

Review Comment:
   What's the drawback of keeping them here? They would be in line with the 
others, no?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]

2025-04-07 Thread via GitHub


mimaison commented on PR #19226:
URL: https://github.com/apache/kafka/pull/19226#issuecomment-2783221120

   `RemoteLogManager` only uses `DelayedRemoteListOffsets`, which is why we put 
that in the `storage` module. But as far as I can tell, the other purgatory 
code could be in `server`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-18796-3: Increased the default acquisition lock timeout in SharePartition from 100ms to 2 seconds [kafka]

2025-04-07 Thread via GitHub


chirag-wadhwa5 opened a new pull request, #19405:
URL: https://github.com/apache/kafka/pull/19405

   This PR increases the default acquisition lock timeout in SharePartitionTest 
from 100ms to 2 seconds in an attempt to remove flakiness from the respective 
tests. The test reports suggest that some of the acquisition lock timeouts do 
not expire, even when the tests wait longer than the expiration timeout. This 
might be because the SystemTimeReaper thread advances the clock every 200ms, 
which is long compared to the acquisition lock timeout for the acquired 
records in the test class.
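
   As a rough sketch of why the longer timeout helps (the constant name and the
   wait condition below are illustrative placeholders, not the exact code in
   SharePartitionTest; TestUtils.waitForCondition is Kafka's standard test helper):

   // The reaper advances the clock roughly every 200ms, so a 100ms lock timeout
   // effectively gets only one or two chances to fire before the test gives up.
   private static final int DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS = 2000; // was 100

   // With the 2s timeout the assertion can wait across many reaper ticks.
   TestUtils.waitForCondition(
       () -> acquiredRecordsReleased(sharePartition),  // hypothetical helper
       DEFAULT_ACQUISITION_LOCK_TIMEOUT_MS + 3000,
       "Acquisition lock was not released for the acquired records");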


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19077: Propagate shutdownRequested field [kafka]

2025-04-07 Thread via GitHub


lucasbru merged PR #19359:
URL: https://github.com/apache/kafka/pull/19359


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-19101 Remove ControllerMutationQuotaManager#throttleTimeMs unused parameter [kafka]

2025-04-07 Thread via GitHub


m1a2st opened a new pull request, #19410:
URL: https://github.com/apache/kafka/pull/19410

   It seems the `timeMs` parameter is never used anywhere in the Kafka project; 
the method was introduced in 
https://github.com/apache/kafka/commit/b5f90daf13b4945305951ca0eecdb454a4dcafc2


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-19097) Fix order of arguments to assertEquals

2025-04-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19097?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-19097:

Component/s: streams
 unit tests

> Fix order of arguments to assertEquals
> --
>
> Key: KAFKA-19097
> URL: https://issues.apache.org/jira/browse/KAFKA-19097
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Lorcan
>Assignee: HongYi Chen
>Priority: Minor
>  Labels: beginner-friendly, newbie,
>
> There are examples of tests using assertEquals where the arguments are in the 
> wrong order.
> Example from InternalStreamsBuilderTest:
> assertEquals(count.get(), 3);
> The expected value should be the first argument.
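
For illustration, the fix is just swapping the arguments so that they match
JUnit's assertEquals(expected, actual) order (using the quoted example from
InternalStreamsBuilderTest):

    // before: actual and expected are swapped
    assertEquals(count.get(), 3);
    // after: expected value first, actual value second
    assertEquals(3, count.get());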



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-04-07 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2031757437


##
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##
@@ -6671,15 +6693,441 @@ private String assertionFailedMessage(SharePartition sharePartition, Map
+List acquiredRecords1 = List.of(
+new AcquiredRecords().setFirstOffset(1).setLastOffset(5).setDeliveryCount((short) 1),
+new AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 2),
+new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)
+);
+List recordBatches1 = List.of(
+memoryRecordsBuilder(3, 2).build().batches().iterator().next(),
+memoryRecordsBuilder(3, 12).build().batches().iterator().next()
+);
+assertEquals(
+List.of(
+new AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 1),
+new AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 1),
+new AcquiredRecords().setFirstOffset(10).setLastOffset(11).setDeliveryCount((short) 2),
+new AcquiredRecords().setFirstOffset(15).setLastOffset(15).setDeliveryCount((short) 2),
+new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1)),
+sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords1, recordBatches1));
+
+List acquiredRecords2 = List.of(
+new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+new AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 3),
+new AcquiredRecords().setFirstOffset(9).setLastOffset(30).setDeliveryCount((short) 2),
+new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3)
+);
+List recordBatches2 = List.of(
+memoryRecordsBuilder(21, 5).build().batches().iterator().next(),
+memoryRecordsBuilder(5, 31).build().batches().iterator().next()
+);
+assertEquals(
+List.of(
+new AcquiredRecords().setFirstOffset(1).setLastOffset(4).setDeliveryCount((short) 3),
+new AcquiredRecords().setFirstOffset(26).setLastOffset(30).setDeliveryCount((short) 2),
+new AcquiredRecords().setFirstOffset(36).setLastOffset(40).setDeliveryCount((short) 3)
+
+), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, recordBatches2)
+);
+
+// Record batches is empty.
+assertEquals(acquiredRecords2, sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords2, List.of()));
+
+List acquiredRecords3 = List.of(
+new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1)
+);
+List recordBatches3 = List.of(
+memoryRecordsBuilder(1, 8).build().batches().iterator().next(),
+memoryRecordsBuilder(1, 18).build().batches().iterator().next()
+);
+
+assertEquals(
+List.of(
+new AcquiredRecords().setFirstOffset(0).setLastOffset(7).setDeliveryCount((short) 1),
+new AcquiredRecords().setFirstOffset(9).setLastOffset(17).setDeliveryCount((short) 1),
+new AcquiredRecords().setFirstOffset(19).setLastOffset(19).setDeliveryCount((short) 1)
+
+), sharePartition.filterRecordBatchesFromAcquiredRecords(acquiredRecords3, recordBatches3)
+);
+}
+
+@Test
+public void testAcquireWithReadCommittedIsolationLevel() {
+SharePartition sharePartition = Mockito.spy(SharePartitionBuilder.builder()
+.withState(SharePartitionState.ACTIVE)
+.build());
+
+ByteBuffer buffer = ByteBuffer.allocate(4096);
+memoryRecordsBuilder(buffer, 5, 10).close();
+memoryRecordsBuilder(buffer, 5, 15).close();
+memoryRecordsBuilder(buffer, 15, 20).close();
+memoryRecordsBuilder(buffer, 8, 50).close();
+memoryRecordsBuilder(buffer, 10, 58).close();
+memoryRecordsBuilder(buffer, 5, 70).close();
+
+buffer.flip();
+MemoryRecords records = MemoryRecords.readableRecords(buffer);
+FetchPartitionData fetchPartitionData = fetchPartitionData(records, newAbortedTransactions());
+
+// We are mocking the result of function fetchAbortedTransactionRecordBatches. The records present at these offsets need to be archived.
+// We won't be utilizing the aborted transactions passed in fetchPartitionData.
+when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn(
+List.of(
+memoryRecordsBui

Re: [PR] KAFKA-19090: Move DelayedFuture and DelayedFuturePurgatory to server-common module [kafka]

2025-04-07 Thread via GitHub


mimaison commented on PR #19390:
URL: https://github.com/apache/kafka/pull/19390#issuecomment-2784473414

   Should these classes go to the `server` module instead? They only seem to be 
used by `AclApis`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]

2025-04-07 Thread via GitHub


chia7712 commented on PR #19226:
URL: https://github.com/apache/kafka/pull/19226#issuecomment-2783237052

   > RemoteLogManager only uses DelayedRemoteListOffsets, which is why we put 
that in the storage module. But as far as I can tell, the other purgatory code 
could be in server.
   
   `RemoteLogManager` also uses `TopicPartitionOperationKey` (and 
`DelayedOperationKey`), so I guess you meant that the other classes, such as 
`DeleteRecordsPartitionStatus`, `DelayedDeleteRecords`, and `DelayedOperation`, 
can be moved to the `server` module, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18796-3: Increased the default acquisition lock timeout in SharePartition from 100ms to 2 seconds [kafka]

2025-04-07 Thread via GitHub


chirag-wadhwa5 commented on PR #19405:
URL: https://github.com/apache/kafka/pull/19405#issuecomment-2783292100

   Thanks for the review. Answering the questions -
   1. The flakiness could not be reproduced locally in the first place, so 
there is no proof of that. Honestly, reproducing the error at least once 
locally would be extremely helpful, but that hasn't been possible yet.
   2. Maybe I could change it to 1 second. The idea was that since there is 
some bundling up of requests in the timingWheel, and the clock is advanced 
every 200ms, the acquisition lock timer task might only expire in the second 
round of clock advance (after 400ms), but the waitFor condition only waits for 
300ms, so the task never gets the opportunity to expire. So the timeout should 
be something greater than 400ms plus the time it takes to execute the 
expiration logic. But the time we are saving by reducing it from 2 seconds to 
500ms is just 5 to 10 seconds, which I don't think should be any cause for 
concern. So if it's not a big problem, let's keep it at 2 seconds to be 
absolutely sure. Otherwise I can reduce it to 1 second.
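
   To make the arithmetic in point 2 explicit (approximate values, as a sketch):

   // reaper tick (clock advance) interval:  ~200 ms
   // old acquisition lock timeout:           100 ms -> may only fire on the
   //                                                   second tick, at ~400 ms
   // old waitFor bound in the tests:         300 ms -> gives up before that tick
   // required wait:                         >400 ms plus the time to run the
   //                                                   expiration logic itself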


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


