[jira] [Updated] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown

2025-03-27 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-19047:
---
Issue Type: Bug  (was: Improvement)

> Broker registrations are slow if previously fenced or shutdown
> --
>
> Key: KAFKA-19047
> URL: https://issues.apache.org/jira/browse/KAFKA-19047
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 4.0.0
>Reporter: Alyssa Huang
>Assignee: Alyssa Huang
>Priority: Major
>
> BrokerLifecycleManager prevents registration of a broker with an ID it has 
> seen before under a different incarnation ID until the broker session expires. 
> On clean shutdown and restart of a broker, this can cause an unnecessary delay 
> in re-registration while the quorum controller waits for the session to expire.
> ```
> [BrokerLifecycleManager id=1] Unable to register broker 1 because the controller returned error DUPLICATE_BROKER_REGISTRATION
> ```
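
For illustration, here is a simplified sketch of the controller-side check behind that error; the class, state, and method below are hypothetical stand-ins, not the actual ClusterControlManager code:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;

// Hypothetical sketch: a re-registration with a new incarnation ID is rejected
// while the previous incarnation's session is still considered alive.
class RegistrationCheckSketch {
    // brokerId -> incarnationId of the currently registered broker (made-up state)
    private final Map<Integer, Uuid> registrations = new HashMap<>();

    void checkRegistration(int brokerId, Uuid incarnationId, boolean previousSessionExpired) {
        Uuid existing = registrations.get(brokerId);
        if (existing != null && !existing.equals(incarnationId) && !previousSessionExpired) {
            // The old incarnation is still within its session: reject until it expires.
            throw new DuplicateBrokerRegistrationException(
                "Another broker is registered with that broker id.");
        }
        registrations.put(brokerId, incarnationId); // accept the (re-)registration
    }
}
```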



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]

2025-03-27 Thread via GitHub


AyoubOm opened a new pull request, #19303:
URL: https://github.com/apache/kafka/pull/19303

   This fixes both KAFKA-16407 and KAFKA-16434.
   
   Summary of existing issues:
   
   - We ignore a new left-hand record when its **previous** FK value is null.
   - We do not unset the foreign-key join result when the FK becomes null.
   
   _This PR was initially opened in 
[#15615](https://github.com/apache/kafka/pull/15615)_
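
A hedged sketch of the foreign-key INNER join shape this PR fixes; `Order`, `Customer`, `Enriched`, and the topic names are hypothetical:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class FkJoinSketch {
    static class Order { String customerId; }
    static class Customer { String name; }
    static class Enriched { Enriched(Order o, Customer c) { } }

    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Order> orders = builder.table("orders");
        KTable<String, Customer> customers = builder.table("customers");

        // The FK extractor may return null. Per the issues above: previously a
        // new left record was ignored when its old FK was null, and an FK that
        // became null did not unset the previous join result.
        KTable<String, Enriched> enriched = orders.join(
            customers,
            order -> order.customerId,
            Enriched::new);
        enriched.toStream().to("enriched-orders");
    }
}
```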
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-27 Thread via GitHub


ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##
@@ -644,8 +645,12 @@ private ShareFetch collect(Map ack
 if (currentFetch.isEmpty()) {
 final ShareFetch fetch = fetchCollector.collect(fetchBuffer);
 if (fetch.isEmpty()) {
+// Check for any acknowledgements which could have come from control records (GAP) and include them.
+Map combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
 // Fetch more records and send any waiting acknowledgements
-applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
+applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Yes :)) turns out it can. 
   
   - Some integration tests in this PR - https://github.com/apache/kafka/pull/19261 - 
revealed that in transactions, when the client receives only a control record 
(e.g. an abort marker) in the `ShareFetchResponse` (without any non-control 
record), then in the `ShareCompletedFetch` these control records are never 
acknowledged (ideally they would be acknowledged with GAP, indicating the 
client is ignoring them) and are never presented to the consumer application. 
   
   - It is expected that control records are skipped and not presented to 
the application, but the client should still acknowledge them with GAP 
(https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33).
   
   - Now these control records are auto-acknowledged with `GAP` and will be 
sent on the next `ShareFetch`/`ShareAcknowledge` request. But as 
`fetch.isEmpty()` only checks `numRecords() == 0`, when the fetch is empty we 
actually ignore it here (meaning we never acknowledge these control records) - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   
   - In this PR, we add any acknowledgements that came in with the empty fetch 
(from control records) to the `ShareFetchEvent` so that they can be sent on 
the next poll().
   
   - I agree it looks a bit odd for readability, but yes, there is a case 
where this can happen.
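
A compact, hedged restatement of the merge performed in the diff above; the archive stripped the generic parameters from the quoted patch, so the key and value types here are assumed from context rather than copied from the source:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Fragment of the patched collect() path (Kafka-internal types assumed, not verbatim):
Map<TopicIdPartition, Acknowledgements> combined = new LinkedHashMap<>(acknowledgementsMap);
combined.putAll(fetch.takeAcknowledgedRecords());           // GAP acks from control records
applicationEventHandler.add(new ShareFetchEvent(combined)); // flushed on the next poll()
```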
   






[jira] [Updated] (KAFKA-7699) Improve wall-clock time punctuations

2025-03-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7699:
---
Description: 
KIP-1146: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+wall-clock+punctuation]
 

Currently, wall-clock time punctuation allows scheduling periodic callbacks 
based on wall-clock time progress. The punctuation schedule starts when the 
punctuation is registered; thus, the firing times are non-deterministic, which 
is what is desired for many use cases (I want a callback in 5 minutes from 
"now").

It would be a nice improvement to also allow users to "anchor" wall-clock 
punctuation, similar to a cron job: a punctuation would then be triggered at 
"fixed" times, like the beginning of the next hour, independent of when the 
punctuation was registered.

  was:
Currently, wall-clock time punctuation allows scheduling periodic callbacks 
based on wall-clock time progress. The punctuation schedule starts when the 
punctuation is registered; thus, the firing times are non-deterministic, which 
is what is desired for many use cases (I want a callback in 5 minutes from 
"now").

It would be a nice improvement to also allow users to "anchor" wall-clock 
punctuation, similar to a cron job: a punctuation would then be triggered at 
"fixed" times, like the beginning of the next hour, independent of when the 
punctuation was registered.


> Improve wall-clock time punctuations
> 
>
> Key: KAFKA-7699
> URL: https://issues.apache.org/jira/browse/KAFKA-7699
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Herman Kolstad Jakobsen
>Priority: Major
>  Labels: kip
>
> KIP-1146: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+wall-clock+punctuation]
>  
> Currently, wall-clock time punctuation allows scheduling periodic callbacks 
> based on wall-clock time progress. The punctuation schedule starts when the 
> punctuation is registered; thus, the firing times are non-deterministic, which 
> is what is desired for many use cases (I want a callback in 5 minutes from 
> "now").
> It would be a nice improvement to also allow users to "anchor" wall-clock 
> punctuation, similar to a cron job: a punctuation would then be triggered at 
> "fixed" times, like the beginning of the next hour, independent of when the 
> punctuation was registered.
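
For illustration, a minimal sketch (not part of KIP-1146; the anchoring strategy here is an assumption) of how anchored behavior can be approximated with the existing Processor API, by scheduling a frequent wall-clock punctuation and acting only when an hour boundary is crossed:

```java
import java.time.Duration;

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

class AnchoredPunctuationSketch implements Processor<String, String, Void, Void> {
    private static final long HOUR_MS = Duration.ofHours(1).toMillis();
    private long lastHour;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        lastHour = System.currentTimeMillis() / HOUR_MS;
        // Fire every 10 seconds, but only do work at the top of each hour.
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, now -> {
            final long hour = now / HOUR_MS;
            if (hour > lastHour) {
                lastHour = hour;
                // ... the "anchored" callback work goes here ...
            }
        });
    }

    @Override
    public void process(final Record<String, String> record) { }
}
```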





Re: [PR] MINOR: Move inner test classes out of CoordinatorRuntimeTest [kafka]

2025-03-27 Thread via GitHub


squah-confluent commented on code in PR #19258:
URL: https://github.com/apache/kafka/pull/19258#discussion_r2007416635


##
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java:
##
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A simple Coordinator implementation that stores the records into a set.
+ */
+public class MockCoordinatorShard implements CoordinatorShard {
+static class RecordAndMetadata {

Review Comment:
   I'm trying to figure out how to do this without making the 
CoordinatorRuntimeTests even more verbose. Do you have any suggestions?






[jira] [Commented] (KAFKA-7952) Consider to switch to in-memory stores in test whenever possible

2025-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939094#comment-17939094
 ] 

Matthias J. Sax commented on KAFKA-7952:


[~lorcanj] Thanks for the PR. I'll try to get to it as soon as possible (also, 
sorry for the late reply – I was traveling).

> Consider to switch to in-memory stores in test whenever possible
> 
>
> Key: KAFKA-7952
> URL: https://issues.apache.org/jira/browse/KAFKA-7952
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Lorcan
>Priority: Major
>  Labels: beginner, newbie
>
> We observed that tests can be very slow when using the default RocksDB stores 
> (cf. KAFKA-7933).
> We should consider switching to in-memory stores whenever possible to reduce 
> test runtime.
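
A minimal sketch of the suggested switch; the topic, store name, and serdes are made up:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

class InMemoryStoreSketch {
    static Topology buildTestTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Back the table's store with an in-memory store instead of the RocksDB default.
        builder.table(
            "input-topic",
            Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("counts"))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()));
        return builder.build();
    }
}
```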





Re: [PR] KAFKA-17662: config.providers configuration missing from the docs [kafka]

2025-03-27 Thread via GitHub


gharris1727 commented on code in PR #18930:
URL: https://github.com/apache/kafka/pull/18930#discussion_r2017574587


##
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java:
##
@@ -289,6 +295,13 @@ protected Map 
postProcessParsedConfig(final Map
 CommonClientConfigs.warnDisablingExponentialBackoff(this);
 return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, 
parsedValues);
 }
+
+@Override
+public Map originals() {

Review Comment:
   The WorkerConfig is a Connect-specific class, and it removes 
config.providers to avoid false-positive errors related to a recent CVE: 
https://www.cve.org/CVERecord?id=CVE-2024-31141
   
   Admin (and Producer and Consumer) clients don't have the same concerns that 
WorkerConfig does, or at least I'm not aware of any.
   
   Also, the title of this PR makes it sound like a documentation change, yet 
this appears to be a functional change. If it needs to be addressed, can it be 
addressed separately?
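
For reference, a hedged sketch of the Connect-side pattern under discussion; the shape below is simplified, not a verbatim copy of WorkerConfig:

```java
import java.util.Map;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

// Simplified illustration of dropping config.providers from originals(),
// as WorkerConfig does because of CVE-2024-31141.
class RedactingConfigSketch extends AbstractConfig {
    RedactingConfigSketch(ConfigDef definition, Map<?, ?> originals) {
        super(definition, originals);
    }

    @Override
    public Map<String, Object> originals() {
        final Map<String, Object> map = super.originals();
        map.remove("config.providers"); // keep provider config out of derived client configs
        return map;
    }
}
```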






[jira] [Updated] (KAFKA-6840) support windowing in ktable API

2025-03-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6840:
---
Priority: Major  (was: Blocker)

> support windowing in ktable API
> ---
>
> Key: KAFKA-6840
> URL: https://issues.apache.org/jira/browse/KAFKA-6840
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: api, needs-kip
>
> The StreamsBuilder provides a table() API to materialize a changelog topic into 
> a local key-value store (KTable), which is very convenient. However, the current 
> underlying implementation does not support materializing a topic to a 
> windowed key-value store, which in certain cases would be very useful. 
> To close this gap, we propose a new API in StreamsBuilder that returns a 
> windowed KTable.
> The table() API in StreamsBuilder looks like this:
> public synchronized <K, V> KTable<K, V> table(final String topic,
>                                               final Consumed<K, V> consumed,
>                                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>                                         new ConsumedInternal<>(consumed),
>                                         new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
> }
>  
> Here we can see that the store type is fixed to KeyValueStore. There is no 
> flexibility to change it to WindowStore.
>  
> To maintain compatibility of the existing API, we have two options for defining 
> a new API:
> 1. Overload the existing KTable type:
> public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic,
>                                                                 final Consumed<K, V> consumed,
>                                                                 final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
>  
> This would give developers an alternative to use a windowed table instead. 
> However, it implies that we need to make sure all the KTable logic still 
> works as expected, such as join, aggregation, etc., so the challenge would be 
> verifying all current KTable logic.
>  
> 2. Define a new type called WindowedKTable:
> public synchronized <K, V> WindowedKTable<K, V> windowedTable(final String topic,
>                                                               final Consumed<K, V> consumed,
>                                                               final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
> The benefit of doing this is that we don't need to worry about the existing 
> functionality of KTable. However, the cost is introducing redundancy in the 
> common operation logic: when upgrading common functionality, we need to take 
> care of both types.
> We can fill in more details in the KIP. Right now I would like to hear some 
> feedback on the two approaches, thank you!





[jira] [Comment Edited] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown

2025-03-27 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939055#comment-17939055
 ] 

Gunnar Morling edited comment on KAFKA-19047 at 3/27/25 7:42 PM:
-

For reference, here are the logs I've observed in that situation:

{code}
bin/kafka-server-start.sh config/server.properties
[2025-03-27 20:40:54,651] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
[2025-03-27 20:40:54,803] INFO Registered signal handlers for TERM, INT, HUP 
(org.apache.kafka.common.utils.LoggingSignalHandler)
[2025-03-27 20:40:54,804] INFO [ControllerServer id=1] Starting controller 
(kafka.server.ControllerServer)
[2025-03-27 20:40:54,932] INFO Updated connection-accept-rate max connection 
creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2025-03-27 20:40:54,947] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] 
Created data-plane acceptor and processors for endpoint : 
ListenerName(CONTROLLER) (kafka.network.SocketServer)
[2025-03-27 20:40:54,950] INFO authorizerStart completed for endpoint 
CONTROLLER. Endpoint is now READY. 
(org.apache.kafka.server.network.EndpointReadyFutures)
[2025-03-27 20:40:54,951] INFO [SharedServer id=1] Starting SharedServer 
(kafka.server.SharedServer)
[2025-03-27 20:40:54,971] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Recovering unflushed segment 0. 0 recovered for 
__cluster_metadata-0. (org.apache.kafka.storage.internals.log.LogLoader)
[2025-03-27 20:40:54,977] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:54,978] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding 
producer state from offset 0 (org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:54,978] INFO Deleted producer state snapshot 
/tmp/kraft-combined-logs/__cluster_metadata-0/5680.snapshot 
(org.apache.kafka.storage.internals.log.SnapshotFile)
[2025-03-27 20:40:54,978] INFO Deleted producer state snapshot 
/tmp/kraft-combined-logs/__cluster_metadata-0/5844.snapshot 
(org.apache.kafka.storage.internals.log.SnapshotFile)
[2025-03-27 20:40:54,979] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Producer state recovery took 1ms for snapshot 
load and 0ms for segment recovery from offset 0 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,014] INFO [ProducerStateManager 
partition=__cluster_metadata-0] Wrote producer snapshot at offset 5844 with 0 
producer ids in 9 ms. 
(org.apache.kafka.storage.internals.log.ProducerStateManager)
[2025-03-27 20:40:55,017] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Loading producer state till offset 5844 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,017] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding 
producer state from offset 5844 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,017] INFO [ProducerStateManager 
partition=__cluster_metadata-0] Loading producer state from snapshot file 
'SnapshotFile(offset=5844, 
file=/tmp/kraft-combined-logs/__cluster_metadata-0/5844.snapshot)'
 (org.apache.kafka.storage.internals.log.ProducerStateManager)
[2025-03-27 20:40:55,018] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Producer state recovery took 1ms for snapshot 
load and 0ms for segment recovery from offset 5844 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,034] INFO Initialized snapshots with IDs 
SortedSet(OffsetAndEpoch(offset=0, epoch=0)) from 
/tmp/kraft-combined-logs/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
[2025-03-27 20:40:55,039] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
[2025-03-27 20:40:55,043] INFO [RaftManager id=1] Starting request manager with 
bootstrap servers: [localhost:9093 (id: -2 rack: null isFenced: false)] 
(org.apache.kafka.raft.KafkaRaftClient)
[2025-03-27 20:40:55,045] INFO [RaftManager id=1] Reading KRaft snapshot and 
log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient)
[2025-03-27 20:40:55,046] INFO [RaftManager id=1] Loading snapshot 
(OffsetAndEpoch(offset=0, epoch=0)) since log start offset (0) is greater than 
the internal listener's next offset (-1) 
(org.apache.kafka.raft.internals.KRaftControlRecordStateMachine)
[2025-03-27 20:40:55,048] INFO [RaftManager id=1] Latest kraft.version is 
KRAFT_VERSION_1 at offset -1 
(org.apache.kafka.raft.internals.KRaftControlRecordStateMachi

Re: [PR] MINOR: Refactor GroupCoordinator write path [kafka]

2025-03-27 Thread via GitHub


jeffkbkim commented on code in PR #19290:
URL: https://github.com/apache/kafka/pull/19290#discussion_r2016876236


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -661,16 +705,18 @@ class ReplicaManager(val config: KafkaConfig,
   return
 }
 
-val sTime = time.milliseconds
-val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-  origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
-debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
-  case(k, v) => (k.topicPartition, v)}
+val localProduceResults = appendRecordsToLeader(
+  requiredAcks,
+  internalTopicsAllowed,
+  origin,
+  entriesPerPartition,
+  requestLocal,
+  actionQueue,
+  verificationGuards
+)
 
 val produceStatus = buildProducePartitionStatus(localProduceResults)
 
-addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
 recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
   k -> v.info.recordValidationStats
 })

Review Comment:
   (Can't write a comment below this.) To confirm: `maybeAddDelayedProduce` 
below can be asynchronous (which it isn't today with acks=1), but that may 
change in the future?






Re: [PR] MINOR: Refactor GroupCoordinator write path [kafka]

2025-03-27 Thread via GitHub


dajac commented on code in PR #19290:
URL: https://github.com/apache/kafka/pull/19290#discussion_r2016912371


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -661,16 +705,18 @@ class ReplicaManager(val config: KafkaConfig,
   return
 }
 
-val sTime = time.milliseconds
-val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-  origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
-debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
-  case(k, v) => (k.topicPartition, v)}
+val localProduceResults = appendRecordsToLeader(
+  requiredAcks,
+  internalTopicsAllowed,
+  origin,
+  entriesPerPartition,
+  requestLocal,
+  actionQueue,
+  verificationGuards
+)
 
 val produceStatus = buildProducePartitionStatus(localProduceResults)
 
-addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
 recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
   k -> v.info.recordValidationStats
 })

Review Comment:
   Correct. `maybeAddDelayedProduce` decides how to complete the call: it 
either puts the produce request into the purgatory (`acks=all`) or completes 
it immediately. 
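
A hedged, Java-ified illustration of that decision; the real logic is Scala in `ReplicaManager`, and every name below is a hypothetical stand-in:

```java
// Hypothetical interface standing in for the request's completion machinery.
interface ProduceCompletion {
    boolean hasPartitionsAwaitingReplication();
    void parkInPurgatoryUntilReplicated(); // acks=all path
    void respondNow();                     // acks=0/1 path
}

class DelayedProduceDecisionSketch {
    static void maybeAddDelayedProduce(short requiredAcks, ProduceCompletion completion) {
        if (requiredAcks == -1 && completion.hasPartitionsAwaitingReplication()) {
            // acks=all: wait in the purgatory until replication completes or times out.
            completion.parkInPurgatoryUntilReplicated();
        } else {
            // acks=0/1: the local append already satisfied the request.
            completion.respondNow();
        }
    }
}
```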






Re: [PR] MINOR: Refactor GroupCoordinator write path [kafka]

2025-03-27 Thread via GitHub


dajac merged PR #19290:
URL: https://github.com/apache/kafka/pull/19290





[PR] KAFKA-14487: Move LogManager static methods/fields to storage module [kafka]

2025-03-27 Thread via GitHub


mimaison opened a new pull request, #19302:
URL: https://github.com/apache/kafka/pull/19302

   (no comment)





[jira] [Commented] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown

2025-03-27 Thread Gunnar Morling (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939055#comment-17939055
 ] 

Gunnar Morling commented on KAFKA-19047:


For reference, here are the logs I've observed in that situation:

```
bin/kafka-server-start.sh config/server.properties
[2025-03-27 20:40:54,651] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
[2025-03-27 20:40:54,803] INFO Registered signal handlers for TERM, INT, HUP 
(org.apache.kafka.common.utils.LoggingSignalHandler)
[2025-03-27 20:40:54,804] INFO [ControllerServer id=1] Starting controller 
(kafka.server.ControllerServer)
[2025-03-27 20:40:54,932] INFO Updated connection-accept-rate max connection 
creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2025-03-27 20:40:54,947] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] 
Created data-plane acceptor and processors for endpoint : 
ListenerName(CONTROLLER) (kafka.network.SocketServer)
[2025-03-27 20:40:54,950] INFO authorizerStart completed for endpoint 
CONTROLLER. Endpoint is now READY. 
(org.apache.kafka.server.network.EndpointReadyFutures)
[2025-03-27 20:40:54,951] INFO [SharedServer id=1] Starting SharedServer 
(kafka.server.SharedServer)
[2025-03-27 20:40:54,971] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Recovering unflushed segment 0. 0 recovered for 
__cluster_metadata-0. (org.apache.kafka.storage.internals.log.LogLoader)
[2025-03-27 20:40:54,977] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:54,978] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding 
producer state from offset 0 (org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:54,978] INFO Deleted producer state snapshot 
/tmp/kraft-combined-logs/__cluster_metadata-0/5680.snapshot 
(org.apache.kafka.storage.internals.log.SnapshotFile)
[2025-03-27 20:40:54,978] INFO Deleted producer state snapshot 
/tmp/kraft-combined-logs/__cluster_metadata-0/5844.snapshot 
(org.apache.kafka.storage.internals.log.SnapshotFile)
[2025-03-27 20:40:54,979] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Producer state recovery took 1ms for snapshot 
load and 0ms for segment recovery from offset 0 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,014] INFO [ProducerStateManager 
partition=__cluster_metadata-0] Wrote producer snapshot at offset 5844 with 0 
producer ids in 9 ms. 
(org.apache.kafka.storage.internals.log.ProducerStateManager)
[2025-03-27 20:40:55,017] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Loading producer state till offset 5844 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,017] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding 
producer state from offset 5844 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,017] INFO [ProducerStateManager 
partition=__cluster_metadata-0] Loading producer state from snapshot file 
'SnapshotFile(offset=5844, 
file=/tmp/kraft-combined-logs/__cluster_metadata-0/5844.snapshot)'
 (org.apache.kafka.storage.internals.log.ProducerStateManager)
[2025-03-27 20:40:55,018] INFO [LogLoader partition=__cluster_metadata-0, 
dir=/tmp/kraft-combined-logs] Producer state recovery took 1ms for snapshot 
load and 0ms for segment recovery from offset 5844 
(org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,034] INFO Initialized snapshots with IDs 
SortedSet(OffsetAndEpoch(offset=0, epoch=0)) from 
/tmp/kraft-combined-logs/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
[2025-03-27 20:40:55,039] INFO [raft-expiration-reaper]: Starting 
(kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
[2025-03-27 20:40:55,043] INFO [RaftManager id=1] Starting request manager with 
bootstrap servers: [localhost:9093 (id: -2 rack: null isFenced: false)] 
(org.apache.kafka.raft.KafkaRaftClient)
[2025-03-27 20:40:55,045] INFO [RaftManager id=1] Reading KRaft snapshot and 
log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient)
[2025-03-27 20:40:55,046] INFO [RaftManager id=1] Loading snapshot 
(OffsetAndEpoch(offset=0, epoch=0)) since log start offset (0) is greater than 
the internal listener's next offset (-1) 
(org.apache.kafka.raft.internals.KRaftControlRecordStateMachine)
[2025-03-27 20:40:55,048] INFO [RaftManager id=1] Latest kraft.version is 
KRAFT_VERSION_1 at offset -1 
(org.apache.kafka.raft.internals.KRaftControlRecordStateMachine)
[2025-03-27 20:40:55,049] INFO [RaftManager id=1

Re: [PR] KAFKA-10409: Refactor Kafka Streams RocksDB Iterators [kafka]

2025-03-27 Thread via GitHub


fonsdant commented on code in PR #18610:
URL: https://github.com/apache/kafka/pull/18610#discussion_r2017792607


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##
@@ -277,197 +275,361 @@ public void close() {
 }
 }
 
-private class RocksDBDualCFIterator extends 
AbstractIterator>
-implements ManagedKeyValueIterator {
-
-// RocksDB's JNI interface does not expose getters/setters that allow 
the
-// comparator to be pluggable, and the default is lexicographic, so 
it's
-// safe to just force lexicographic comparator here for now.
+/**
+ * A range-based iterator for RocksDB that merges results from two column 
families.
+ *
+ * This iterator supports traversal over two RocksDB column families: 
one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys 
from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or 
reverse) and the specified range
+ * boundaries.
+ *
+ * Key Features
+ *
+ * 
+ * Merges results from the "with-timestamp" and "no-timestamp" 
column families.
+ * Supports range-based queries with open or closed 
boundaries.
+ * Handles both forward and reverse iteration seamlessly.
+ * Ensures correct handling of inclusive and exclusive upper 
boundaries.
+ * Integrates efficiently with Kafka Streams state store 
mechanisms.
+ * 
+ *
+ * Usage
+ *
+ * The iterator can be used for different types of range-based 
operations, such as:
+ * 
+ * Iterating over all keys within a range.
+ * Prefix-based scans (when combined with dynamically calculated 
range endpoints).
+ * Open-ended range queries (e.g., from a given key to the end of 
the dataset).
+ * 
+ * 
+ *
+ * Implementation Details
+ *
+ * The class extends {@link AbstractIterator} and implements {@link 
ManagedKeyValueIterator}. It uses RocksDB's
+ * native iterators for efficient traversal of keys within the specified 
range. Keys from the two column families
+ * are merged during iteration, ensuring proper order and de-duplication 
where applicable.
+ *
+ * Key Methods:
+ *
+ * 
+ * {@code makeNext()}: Retrieves the next key-value pair in 
the merged range, ensuring
+ * the result is within the specified range and boundary 
conditions.
+ * {@code initializeIterators()}: Initializes the RocksDB 
iterators based on the specified range and direction.
+ * {@code isInRange()}: Verifies if the current key-value 
pair is within the range defined by {@code from} and {@code to}.
+ * {@code fetchNextKeyValue()}: Determines the next 
key-value pair to return based on the state of both iterators.
+ * 
+ *
+ * Thread Safety:
+ *
+ * The iterator is thread-safe for sequential operations but should not 
be accessed concurrently from multiple
+ * threads without external synchronization.
+ *
+ * Examples
+ *
+ * Iterate over a range:
+ *
+ * {@code
+ * RocksIterator noTimestampIterator = 
accessor.newIterator(noTimestampColumnFamily);
+ * RocksIterator withTimestampIterator = 
accessor.newIterator(withTimestampColumnFamily);
+ *
+ * try (RocksDBDualCFRangeIterator iterator = new 
RocksDBDualCFRangeIterator(
+ * new Bytes("keyStart".getBytes()),
+ * new Bytes("keyEnd".getBytes()),
+ * noTimestampIterator,
+ * withTimestampIterator,
+ * "storeName",
+ * true,  // Forward iteration
+ * true   // Inclusive upper boundary
+ * )) {
+ * while (iterator.hasNext()) {
+ * KeyValue entry = iterator.next();
+ * System.out.println("Key: " + entry.key + ", Value: " + 
Arrays.toString(entry.value));
+ * }
+ * }
+ * }
+ *
+ * Exceptions
+ *
+ * 
+ * {@link InvalidStateStoreException}: Thrown if the 
iterator is accessed after being closed.
+ * {@link IllegalStateException}: Thrown if the close 
callback is not properly set before usage.
+ * 
+ *
+ * @see AbstractIterator
+ * @see ManagedKeyValueIterator
+ * @see RocksDBStore
+ */
+private static class RocksDBDualCFRangeIterator extends 
AbstractIterator> implements 
ManagedKeyValueIterator {
+private Runnable closeCallback;
+private byte[] noTimestampNext;
+private byte[] withTimestampNext;
 private final Comparator comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
-
+private final RocksIterator noTimestampIterator;
+private final RocksIterator withTimestampIterator;
 private final String storeName;
-private final RocksIterator iterWithTimestamp;
-private final RocksIter

[jira] [Updated] (KAFKA-18797) Flaky testLargeAssignmentAndGroupWithUniformSubscription

2025-03-27 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-18797:
--
Attachment: testLargeAssignmentAndGroupWithUniformSubscription.jfr

> Flaky testLargeAssignmentAndGroupWithUniformSubscription
> 
>
> Key: KAFKA-18797
> URL: https://issues.apache.org/jira/browse/KAFKA-18797
> Project: Kafka
>  Issue Type: Test
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
> Attachments: Each object memory usage.png, Java Heap Memory.png, 
> testLargeAssignmentAndGroupWithUniformSubscription.jfr
>
>
> Flaky on trunk for a while (One of the top flaky tests) 
> Flaky with timeouts
> https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1739549233243&search.startTimeMin=173709000&search.tags=trunk&search.timeZoneId=America%2FToronto&tests.container=org.apache.kafka.clients.consumer.StickyAssignorTest&tests.sortField=FLAKY&tests.test=testLargeAssignmentAndGroupWithUniformSubscription(boolean)%5B1%5D





[jira] [Updated] (KAFKA-19050) kafka-streams-integration-tests artifact is empty

2025-03-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-19050:

Priority: Blocker  (was: Major)

> kafka-streams-integration-tests artifact is empty
> -
>
> Key: KAFKA-19050
> URL: https://issues.apache.org/jira/browse/KAFKA-19050
> Project: Kafka
>  Issue Type: Improvement
>  Components: build, streams
>Affects Versions: 4.0.0
>Reporter: Utku Aydin
>Priority: Blocker
>
> Not sure whether this was intended, but currently the release process 
> releases an artifact with an empty jar containing only the manifest for the 
> kafka-streams-integration-tests module. Since these tests are now in their 
> own module, I think it's an opportunity to make them available to end users 
> as well. Personally, I would like to use EmbeddedKafkaCluster from 
> org.apache.kafka.streams.integration.utils for my own integration tests, since 
> io.github.embeddedkafka is no longer being maintained.
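
A minimal sketch of that usage, assuming the integration-test classes were published as a consumable artifact; the method names are those used by Kafka's own integration tests:

```java
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;

public class EmbeddedClusterSketch {
    public static void main(String[] args) throws Exception {
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(1); // one broker
        cluster.start();
        try {
            cluster.createTopic("input");
            // ... run a Streams topology against cluster.bootstrapServers() ...
        } finally {
            cluster.stop();
        }
    }
}
```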





[jira] [Updated] (KAFKA-19050) kafka-streams-integration-tests artifact is empty

2025-03-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-19050:

Fix Version/s: 4.1.0
   4.0.1

> kafka-streams-integration-tests artifact is empty
> -
>
> Key: KAFKA-19050
> URL: https://issues.apache.org/jira/browse/KAFKA-19050
> Project: Kafka
>  Issue Type: Improvement
>  Components: build, streams
>Affects Versions: 4.0.0
>Reporter: Utku Aydin
>Priority: Blocker
> Fix For: 4.1.0, 4.0.1
>
>
> Not sure whether this was intended, but currently the release process 
> releases an artifact with an empty jar containing only the manifest for the 
> kafka-streams-integration-tests module. Since these tests are now in their 
> own module, I think it's an opportunity to make them available to end users 
> as well. Personally, I would like to use EmbeddedKafkaCluster from 
> org.apache.kafka.streams.integration.utils for my own integration tests, since 
> io.github.embeddedkafka is no longer being maintained.





Re: [PR] KAFKA-10844: groupBy without shuffling [kafka]

2025-03-27 Thread via GitHub


fonsdant commented on PR #18811:
URL: https://github.com/apache/kafka/pull/18811#issuecomment-2759887172

   @mjsax, I would like to suggest that we adopt "skip repartition" instead of 
"mark as partitioned". It seems to me more consistent with the rest of the 
KStream API methods and their functional style, because "skip repartition" 
names the action that is actually performed, while "mark as partitioned" reads 
more like a condition to be interpreted ("oh, okay, since this is marked as 
partitioned, I will _skip repartitioning_") than the action itself. What do 
you think?
   
   Also, I would like to know if you have other test scenarios in mind.
   
   Thanks in advance!





[jira] [Updated] (KAFKA-19050) kafka-streams-integration-tests artifact is empty

2025-03-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-19050:

Component/s: streams

> kafka-streams-integration-tests artifact is empty
> -
>
> Key: KAFKA-19050
> URL: https://issues.apache.org/jira/browse/KAFKA-19050
> Project: Kafka
>  Issue Type: Improvement
>  Components: build, streams
>Affects Versions: 4.0.0
>Reporter: Utku Aydin
>Priority: Major
>
> Not sure whether this was intended, but currently the release process 
> releases an artifact with an empty jar containing only the manifest for the 
> kafka-streams-integration-tests module. Since these tests are now in their 
> own module, I think it's an opportunity to make them available to end users 
> as well. Personally, I would like to use EmbeddedKafkaCluster from 
> org.apache.kafka.streams.integration.utils for my own integration tests, since 
> io.github.embeddedkafka is no longer being maintained.





Re: [PR] KAFKA-18980: OffsetMetadataManager#cleanupExpiredOffsets should record the number of records rather than topic partitions [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on code in PR #19207:
URL: https://github.com/apache/kafka/pull/19207#discussion_r2006072232


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -875,7 +872,7 @@ public boolean cleanupExpiredOffsets(String groupId, 
List rec
 // We don't expire the offset yet if there is a pending 
transactional offset for the partition.
 if (condition.isOffsetExpired(offsetAndMetadata, 
currentTimestampMs, config.offsetsRetentionMs()) &&
 !hasPendingTransactionalOffsets(groupId, topic, 
partition)) {
-
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, 
records).toString());
+appendOffsetCommitTombstone(groupId, topic, partition, 
records);

Review Comment:
   my point was that `appendOffsetCommitTombstone` can drop its returned 
value. For example:
   ```java
   private void appendOffsetCommitTombstone(
       String groupId,
       String topic,
       int partition, 
       List records
   ) {
       records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
       TopicPartition tp = new TopicPartition(topic, partition);
       log.trace("[GroupId {}] Removing expired offset and metadata for {}", groupId, tp);
   }
   ```






Re: [PR] KAFKA-10409: Refactor Kafka Streams RocksDB Iterators [kafka]

2025-03-27 Thread via GitHub


fonsdant commented on code in PR #18610:
URL: https://github.com/apache/kafka/pull/18610#discussion_r2017792885


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##
@@ -277,197 +275,361 @@ public void close() {
 }
 }
 
-private class RocksDBDualCFIterator extends 
AbstractIterator>
-implements ManagedKeyValueIterator {
-
-// RocksDB's JNI interface does not expose getters/setters that allow 
the
-// comparator to be pluggable, and the default is lexicographic, so 
it's
-// safe to just force lexicographic comparator here for now.
+/**
+ * A range-based iterator for RocksDB that merges results from two column 
families.
+ *
+ * This iterator supports traversal over two RocksDB column families: 
one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys 
from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or 
reverse) and the specified range
+ * boundaries.
+ *
+ * Key Features
+ *
+ * 
+ * Merges results from the "with-timestamp" and "no-timestamp" 
column families.
+ * Supports range-based queries with open or closed 
boundaries.
+ * Handles both forward and reverse iteration seamlessly.
+ * Ensures correct handling of inclusive and exclusive upper 
boundaries.
+ * Integrates efficiently with Kafka Streams state store 
mechanisms.
+ * 
+ *
+ * Usage
+ *
+ * The iterator can be used for different types of range-based 
operations, such as:
+ * 
+ * Iterating over all keys within a range.
+ * Prefix-based scans (when combined with dynamically calculated 
range endpoints).
+ * Open-ended range queries (e.g., from a given key to the end of 
the dataset).
+ * 
+ * 
+ *
+ * Implementation Details
+ *
+ * The class extends {@link AbstractIterator} and implements {@link 
ManagedKeyValueIterator}. It uses RocksDB's
+ * native iterators for efficient traversal of keys within the specified 
range. Keys from the two column families
+ * are merged during iteration, ensuring proper order and de-duplication 
where applicable.
+ *
+ * Key Methods:
+ *
+ * 
+ * {@code makeNext()}: Retrieves the next key-value pair in 
the merged range, ensuring
+ * the result is within the specified range and boundary 
conditions.
+ * {@code initializeIterators()}: Initializes the RocksDB 
iterators based on the specified range and direction.
+ * {@code isInRange()}: Verifies if the current key-value 
pair is within the range defined by {@code from} and {@code to}.
+ * {@code fetchNextKeyValue()}: Determines the next 
key-value pair to return based on the state of both iterators.
+ * 
+ *
+ * Thread Safety:
+ *
+ * The iterator is thread-safe for sequential operations but should not 
be accessed concurrently from multiple
+ * threads without external synchronization.
+ *
+ * Examples
+ *
+ * Iterate over a range:
+ *
+ * {@code
+ * RocksIterator noTimestampIterator = 
accessor.newIterator(noTimestampColumnFamily);
+ * RocksIterator withTimestampIterator = 
accessor.newIterator(withTimestampColumnFamily);
+ *
+ * try (RocksDBDualCFRangeIterator iterator = new 
RocksDBDualCFRangeIterator(
+ * new Bytes("keyStart".getBytes()),
+ * new Bytes("keyEnd".getBytes()),
+ * noTimestampIterator,
+ * withTimestampIterator,
+ * "storeName",
+ * true,  // Forward iteration
+ * true   // Inclusive upper boundary
+ * )) {
+ * while (iterator.hasNext()) {
+ * KeyValue entry = iterator.next();
+ * System.out.println("Key: " + entry.key + ", Value: " + 
Arrays.toString(entry.value));
+ * }
+ * }
+ * }
+ *
+ * Exceptions
+ *
+ * 
+ * {@link InvalidStateStoreException}: Thrown if the 
iterator is accessed after being closed.
+ * {@link IllegalStateException}: Thrown if the close 
callback is not properly set before usage.
+ * 
+ *
+ * @see AbstractIterator
+ * @see ManagedKeyValueIterator
+ * @see RocksDBStore
+ */
+private static class RocksDBDualCFRangeIterator extends 
AbstractIterator> implements 
ManagedKeyValueIterator {
+private Runnable closeCallback;
+private byte[] noTimestampNext;
+private byte[] withTimestampNext;
 private final Comparator comparator = 
Bytes.BYTES_LEXICO_COMPARATOR;
-
+private final RocksIterator noTimestampIterator;
+private final RocksIterator withTimestampIterator;
 private final String storeName;
-private final RocksIterator iterWithTimestamp;
-private final RocksIter

Re: [PR] KAFKA-10409: Refactor Kafka Streams RocksDB Iterators [kafka]

2025-03-27 Thread via GitHub


fonsdant commented on code in PR #18610:
URL: https://github.com/apache/kafka/pull/18610#discussion_r2017793076


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##
@@ -277,197 +275,361 @@ public void close() {
 }
 }
 
-private class RocksDBDualCFIterator extends 
AbstractIterator>
-implements ManagedKeyValueIterator {
-
-// RocksDB's JNI interface does not expose getters/setters that allow 
the
-// comparator to be pluggable, and the default is lexicographic, so 
it's
-// safe to just force lexicographic comparator here for now.
+/**
+ * A range-based iterator for RocksDB that merges results from two column 
families.
+ *
+ * This iterator supports traversal over two RocksDB column families: 
one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys 
from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or 
reverse) and the specified range
+ * boundaries.
+ *
+ * Key Features
+ *
+ * 
+ * Merges results from the "with-timestamp" and "no-timestamp" 
column families.
+ * Supports range-based queries with open or closed 
boundaries.
+ * Handles both forward and reverse iteration seamlessly.
+ * Ensures correct handling of inclusive and exclusive upper 
boundaries.
+ * Integrates efficiently with Kafka Streams state store 
mechanisms.
+ * 
+ *
+ * Usage
+ *
+ * The iterator can be used for different types of range-based 
operations, such as:
+ * 
+ * Iterating over all keys within a range.
+ * Prefix-based scans (when combined with dynamically calculated 
range endpoints).
+ * Open-ended range queries (e.g., from a given key to the end of 
the dataset).
+ * 
+ * 
+ *
+ * Implementation Details
+ *
+ * The class extends {@link AbstractIterator} and implements {@link 
ManagedKeyValueIterator}. It uses RocksDB's
+ * native iterators for efficient traversal of keys within the specified 
range. Keys from the two column families
+ * are merged during iteration, ensuring proper order and de-duplication 
where applicable.
+ *
+ * Key Methods:
+ *
+ * 
+ * {@code makeNext()}: Retrieves the next key-value pair in 
the merged range, ensuring
+ * the result is within the specified range and boundary 
conditions.
+ * {@code initializeIterators()}: Initializes the RocksDB 
iterators based on the specified range and direction.
+ * {@code isInRange()}: Verifies if the current key-value 
pair is within the range defined by {@code from} and {@code to}.
+ * {@code fetchNextKeyValue()}: Determines the next 
key-value pair to return based on the state of both iterators.
+ * 
+ *
+ * Thread Safety:
+ *
+ * The iterator is thread-safe for sequential operations but should not 
be accessed concurrently from multiple
+ * threads without external synchronization.

Review Comment:
   Updated.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##
@@ -277,197 +275,361 @@ public void close() {
 }
 }
 
-private class RocksDBDualCFIterator extends 
AbstractIterator>
-implements ManagedKeyValueIterator {
-
-// RocksDB's JNI interface does not expose getters/setters that allow 
the
-// comparator to be pluggable, and the default is lexicographic, so 
it's
-// safe to just force lexicographic comparator here for now.
+/**
+ * A range-based iterator for RocksDB that merges results from two column 
families.
+ *
+ * This iterator supports traversal over two RocksDB column families: 
one containing timestamped values and
+ * another containing non-timestamped values. It ensures that the keys 
from both column families are merged and
+ * sorted lexicographically, respecting the iteration order (forward or 
reverse) and the specified range
+ * boundaries.
+ *
+ * Key Features
+ *
+ * 
+ * Merges results from the "with-timestamp" and "no-timestamp" 
column families.
+ * Supports range-based queries with open or closed 
boundaries.
+ * Handles both forward and reverse iteration seamlessly.
+ * Ensures correct handling of inclusive and exclusive upper 
boundaries.
+ * Integrates efficiently with Kafka Streams state store 
mechanisms.
+ * 
+ *
+ * Usage
+ *
+ * The iterator can be used for different types of range-based 
operations, such as:
+ * 
+ * Iterating over all keys within a range.
+ * Prefix-based scans (when combined with dynamically calculated 
range endpoints).
+ * Open-ended range queries (e.g., from a given key to the end of 
the dataset).
+ 

Re: [PR] MINOR: Add 4.0.0 to streams system tests [kafka]

2025-03-27 Thread via GitHub


dajac merged PR #19239:
URL: https://github.com/apache/kafka/pull/19239





Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]

2025-03-27 Thread via GitHub


gongxuanzhang commented on code in PR #19226:
URL: https://github.com/apache/kafka/pull/19226#discussion_r2017802756


##
server-common/src/main/java/org/apache/kafka/server/common/DelayedDeleteRecords.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.purgatory.DelayedOperation;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * A delayed delete records operation that can be created by the replica 
manager and watched
+ * in the delete records operation purgatory
+ */
+public class DelayedDeleteRecords extends DelayedOperation {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DelayedDeleteRecords.class);

Review Comment:
   I don't think changing the name will affect anything. Do we have a special 
need to keep the name?






Re: [PR] MINOR UnifiedLog topic-partition as attribute as it is stable [kafka]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #19253:
URL: https://github.com/apache/kafka/pull/19253#issuecomment-2760076574

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.





Re: [PR] KAFKA-18877: Add an mechanism to find cases where we accessed variables from the wrong thread. [kafka]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #19231:
URL: https://github.com/apache/kafka/pull/19231#issuecomment-2760076628

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.





Re: [PR] KAFKA-18068: Fixing typo in ProducerConfig [kafka]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #17908:
URL: https://github.com/apache/kafka/pull/17908#issuecomment-2760101395

   This PR has been closed since it has not had any activity in 120 days. If 
you feel like this
   was a mistake, or you would like to continue working on it, please feel free 
to re-open the 
   PR and ask for a review.





Re: [PR] KAFKA-7516: Attempt to dynamically load ManagementFactory class [kafka]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #17969:
URL: https://github.com/apache/kafka/pull/17969#issuecomment-2760101431

   This PR has been closed since it has not had any activity in 120 days. If 
you feel like this
   was a mistake, or you would like to continue working on it, please feel free 
to re-open the 
   PR and ask for a review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-7516: Attempt to dynamically load ManagementFactory class [kafka]

2025-03-27 Thread via GitHub


github-actions[bot] closed pull request #17969: KAFKA-7516: Attempt to 
dynamically load ManagementFactory class
URL: https://github.com/apache/kafka/pull/17969


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18068: Fixing typo in ProducerConfig [kafka]

2025-03-27 Thread via GitHub


github-actions[bot] closed pull request #17908: KAFKA-18068: Fixing typo in 
ProducerConfig
URL: https://github.com/apache/kafka/pull/17908


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17423 Trie implementation [kafka]

2025-03-27 Thread via GitHub


github-actions[bot] commented on PR #17087:
URL: https://github.com/apache/kafka/pull/17087#issuecomment-2760101352

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-3370) Add options to auto.offset.reset to reset offsets upon initialization only

2025-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939097#comment-17939097
 ] 

Matthias J. Sax commented on KAFKA-3370:


Nobody is currently working on this AFAIK.

> Add options to auto.offset.reset to reset offsets upon initialization only
> --
>
> Key: KAFKA-3370
> URL: https://issues.apache.org/jira/browse/KAFKA-3370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: needs-kip
>
> Currently "auto.offset.reset" is applied in the following two cases:
> 1) upon starting the consumer for the first time (hence no committed offsets 
> before);
> 2) upon fetching offsets out-of-range.
> For scenarios where case 2) needs to be avoided (i.e. people need to be 
> notified upon out-of-range offsets rather than having offsets silently 
> reset), "auto.offset.reset" needs to be set to "none". However, for case 1) 
> setting "auto.offset.reset" to "none" will cause NoOffsetForPartitionException 
> upon polling. And in this case, seekToBeginning/seekToEnd is mistakenly 
> applied to set the offset at initialization, although these methods are 
> actually designed for use during the lifetime of the consumer (in a rebalance 
> callback, for example).
> The fix proposal is to add two more options to "auto.offset.reset", 
> "earliest-on-start", and "latest-on-start", whose semantics are "earliest" 
> and "latest" for case 1) only, and "none" for case 2).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19050) kafka-streams-integration-tests artifact is empty

2025-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939098#comment-17939098
 ] 

Matthias J. Sax commented on KAFKA-19050:
-

There was always a desire for some public test artifacts. Re-using this module 
does not seem to be the right way though; we should do a proper KIP and have a 
proper module for it if we really want to do this (which is a larger/heavier 
lift...).

I agree that publishing an empty jar does not make sense, and we should fix 
the build script to exclude this artifact if possible. We should mark this 
ticket as a blocker for the 4.0.1 and 4.1.0 releases to fix it in time.

Note that the actual tests (including `EmbeddedKafkaCluster`) should be 
published as a "test" jar, cf 
[kafka-streams-integration-tests-4.0.0-test.jar|https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-integration-tests/4.0.0/kafka-streams-integration-tests-4.0.0-test.jar]
 – so what you want is already there.

> kafka-streams-integration-tests artifact is empty
> -
>
> Key: KAFKA-19050
> URL: https://issues.apache.org/jira/browse/KAFKA-19050
> Project: Kafka
>  Issue Type: Improvement
>  Components: build, streams
>Affects Versions: 4.0.0
>Reporter: Utku Aydin
>Priority: Major
>
> Not sure whether this was intended or not but currently the release process 
> releases an artifact with an empty jar containing only the manifest for the 
> kafka-streams-integration-tests module. Since these tests are now in their 
> own module, I think it's an opportunity to make them available to end-users 
> as well. Personally, I would like to use EmbeddedKafkaCluster from 
> org.apache.kafka.streams.integration.utils for my own integration tests since 
> io.github.embeddedkafka is no longer being maintained.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10409: Refactor Kafka Streams RocksDB Iterators [kafka]

2025-03-27 Thread via GitHub


fonsdant commented on PR #18610:
URL: https://github.com/apache/kafka/pull/18610#issuecomment-2759949221

   @agavra, thanks for reviewing! I have pushed some commits :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17078: Add SecurityManagerCompatibility shim (#16522) [kafka]

2025-03-27 Thread via GitHub


showuon commented on PR #19221:
URL: https://github.com/apache/kafka/pull/19221#issuecomment-2760036097

   Let's wait until we have consensus in the community. I've replied in this 
thread: https://lists.apache.org/thread/6k942pphowd28dh9gn6xbnngk6nxs3n0 . 
@gharris1727 , it'd be good if you could also comment on that thread. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`

2025-03-27 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939117#comment-17939117
 ] 

Lan Ding commented on KAFKA-19024:
--

In the current implementation, the `group.share.max.groups` is only used when 
setting the maxEntries for the ShareSessionCache.
Perhaps we could introduce a new error type `MAX_SHARE_GROUP_SIZE_REACHED`. 
When processing a Heartbeat request, if the group ID does not exist and the 
`group.share.max.groups` limit is reached, this exception would be thrown. 
Clients could then catch this exception and handle it accordingly (e.g., 
logging an error message). Do you think this approach is feasible?
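
For illustration, a rough, self-contained sketch of the proposed broker-side 
check (the exception class, field names, and placeholder group type below are 
assumptions for this sketch, not existing Kafka APIs):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the proposed heartbeat-time check; only the idea of
// failing group creation once group.share.max.groups is reached comes from
// the discussion above.
class ShareGroupLimitExample {
    static class MaxShareGroupSizeReachedException extends RuntimeException {
        MaxShareGroupSizeReachedException(String message) { super(message); }
    }

    private final Map<String, Object> shareGroups = new ConcurrentHashMap<>();
    private final int maxGroups; // value of group.share.max.groups

    ShareGroupLimitExample(int maxGroups) { this.maxGroups = maxGroups; }

    Object getOrMaybeCreateShareGroup(String groupId) {
        return shareGroups.computeIfAbsent(groupId, id -> {
            if (shareGroups.size() >= maxGroups) {
                // Surfaced in the heartbeat response so the client can log a
                // clear error instead of silently retrying forever.
                throw new MaxShareGroupSizeReachedException(
                    "group.share.max.groups (" + maxGroups + ") reached; cannot create " + id);
            }
            return new Object(); // placeholder for the real group state
        });
    }
}
```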

> Enhance the client behaviour when it tries to exceed the 
> `group.share.max.groups`
> -
>
> Key: KAFKA-19024
> URL: https://issues.apache.org/jira/browse/KAFKA-19024
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sanskar Jhajharia
>Assignee: Lan Ding
>Priority: Minor
>
> For share groups we use the `group.share.max.groups` config to define the 
> number of max share groups we allow. However, when we exceed the same, the 
> client logs do not specify any such error and simply do not consume. The 
> group doesn't get created but the client continues to send Heartbeats hoping 
> for one of the existing groups to shut down and allowing it to form a group. 
> Having a log or an exception in the client logs will help them debug such 
> situations accurately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17541) Improve handling of delivery count

2025-03-27 Thread Lan Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lan Ding reassigned KAFKA-17541:


Assignee: Lan Ding  (was: Andrew Schofield)

> Improve handling of delivery count
> --
>
> Key: KAFKA-17541
> URL: https://issues.apache.org/jira/browse/KAFKA-17541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Lan Ding
>Priority: Major
>
> There are two situations in which the delivery count handling needs to be 
> more intelligent.
> First, for records which are automatically released as a result of closing a 
> share session normally, the delivery count should not be incremented. These 
> records were fetched but they were not actually delivered to the client since 
> the disposition of the delivery records is carried in the ShareAcknowledge 
> which closes the share session. Any remaining records were not delivered, 
> only fetched.
> Second, for records which have a delivery count which is more than 1 or 2, 
> there is a suspicion that the records are not being delivered due to a 
> problem rather than just natural retrying. The batching of these records 
> should be reduced, even down to a single record at a time, so that a failure 
> to deliver a poisoned record does not cause adjacent records to be considered 
> unsuccessful and potentially reach the delivery count limit without proper 
> reason.
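
For illustration, a rough sketch of the second behaviour described above (the 
threshold, names, and structure are invented for this sketch, not Kafka code):

```java
public class DeliveryCountBatchingSketch {
    /**
     * Invented helper: shrink the acquisition batch once records in it look
     * poisoned, so one bad record cannot drag its neighbours to the delivery
     * count limit.
     */
    static int maxRecordsToAcquire(int highestDeliveryCountInBatch, int defaultBatchSize) {
        return highestDeliveryCountInBatch > 2 ? 1 : defaultBatchSize;
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsToAcquire(1, 500)); // healthy: full batch
        System.out.println(maxRecordsToAcquire(3, 500)); // suspicious: single record
    }
}
```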



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-19048) Minimal Movement Replica Balancing algorithm

2025-03-27 Thread Jialun Peng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jialun Peng updated KAFKA-19048:

Description: 
h2. Motivation

Kafka clusters typically require rebalancing of topic replicas after horizontal 
scaling to evenly distribute the load across new and existing brokers. The 
current rebalancing approach does not consider the existing replica 
distribution, often resulting in excessive and unnecessary replica movements. 
These unnecessary movements increase rebalance duration, consume significant 
bandwidth and CPU resources, and potentially disrupt ongoing production and 
consumption operations. Thus, a replica rebalancing strategy that minimizes 
movements while achieving an even distribution of replicas is necessary.
h2. Goals

The proposed approach prioritizes the following objectives:
 # *Minimal Movement*: Minimize the number of replica relocations during 
rebalancing.
 # *Replica Balancing*: Ensure that replicas are evenly distributed across 
brokers.
 # *Anti-Affinity Support*: Support rack-aware allocation when enabled.
 # *Leader Balancing*: Distribute leader replicas evenly across brokers.
 # *ISR Order Optimization*: Optimize adjacency relationships to prevent 
failover traffic concentration in case of broker failures.

h2. Proposed Changes
h3. Rack-Level Replica Distribution

The following rules ensure balanced replica allocation at the rack level:
 # *When rackCount = replicationFactor*:
 ** Each rack receives exactly {{partitionCount}} replicas.
 # *When rackCount > replicationFactor*:
 ** If weighted allocation {{(rackBrokers/totalBrokers × totalReplicas) ≥ 
partitionCount}}: each rack receives exactly {{partitionCount}} replicas.
 ** If weighted allocation {{< partitionCount}}: distribute remaining 
replicas using a weighted remainder allocation.

h3. Node-Level Replica Distribution
 # If the number of replicas assigned to a rack is not a multiple of the number 
of nodes in that rack, some nodes will host one additional replica compared to 
others.
 # *When rackCount = replicationFactor*:
 ** If all racks have an equal number of nodes, each node will host an equal 
number of replicas.
 ** If rack sizes vary, nodes in larger racks will host fewer replicas on 
average.
 # *When rackCount > replicationFactor*:
 ** If no rack has a significantly higher node weight, replicas will be evenly 
distributed.
 ** If a rack has disproportionately high node weight, those nodes will receive 
fewer replicas.

h3. Anti-Affinity Support

When anti-affinity is enabled, the rebalance algorithm ensures that replicas of 
the same partition do not colocate on the same rack. Brokers without rack 
configuration are excluded from anti-affinity checks.

In this way we can unify the implementation logic of rack-aware and 
non-rack-aware.

 

*Replica Balancing Algorithm*

Through the above steps, we can calculate the ideal replica count for each node 
and rack.
Based on the initial replica distribution of topics, we obtain the current 
replica partition allocation across nodes and racks, allowing us to identify 
which nodes violate anti-affinity rules.

We iterate through nodes with the following priority:
 # First process nodes that violate anti-affinity rules
 # Then process nodes whose current replica count exceeds the desired replica 
count (prioritizing those with the largest discrepancy)

For these identified nodes, we relocate their replicas to target nodes that:
 * Satisfy all anti-affinity constraints

 * Have a current replica count below their ideal allocation

This process continues iteratively until:
 * No nodes violate anti-affinity rules

 * All nodes' current replica counts match their desired replica counts

Upon satisfying these conditions, we achieve balanced replica distribution 
across nodes.

 

*Leader Balancing Algorithm*

*Target Leader Calculation:*

Compute baseline average: {{leader_avg = total_partitions / total_nodes}}

Identify brokers where {{replica_count ≤ leader_avg}}:
 * Designate all replicas as leaders on these brokers
 * Subtract allocated leaders: {{remaining_partitions -= assigned_leaders}}
 * Exclude nodes: {{remaining_brokers -= processed_brokers}}

Iteratively recalculate {{leader_avg}} until the minimum-replica nodes satisfy 
{{replica_count ≥ leader_avg}}.

*Leader Assignment Constraints:*

Final targets:
 * Light brokers: {{target_leaders = replica_count}}
 * Normal brokers: {{target_leaders = leader_avg}}

For each partition, select the broker with the largest difference between its 
{{target_leaders}} and current leader count to become that partition's leader. 
Upon completing this traversal, we achieve uniform leader distribution across 
all brokers.
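
For illustration, a small self-contained sketch of the iterative target-leader 
calculation described above (illustrative only, with remainder handling 
simplified; this is not code from the proposal):

```java
public class LeaderTargetSketch {
    public static void main(String[] args) {
        int[] replicaCounts = {2, 5, 7, 10}; // replicas hosted per broker
        int remainingPartitions = 12;        // leaders left to place
        boolean[] light = new boolean[replicaCounts.length];
        int remainingBrokers = replicaCounts.length;
        double leaderAvg = 0;

        // Peel off brokers whose replica count is at or below the running
        // average: they lead everything they host. Recompute the average over
        // the remaining brokers until no more brokers drop out.
        boolean changed = true;
        while (changed) {
            changed = false;
            leaderAvg = (double) remainingPartitions / remainingBrokers;
            for (int i = 0; i < replicaCounts.length; i++) {
                if (!light[i] && replicaCounts[i] <= leaderAvg) {
                    light[i] = true;
                    remainingPartitions -= replicaCounts[i];
                    remainingBrokers--;
                    changed = true;
                }
            }
        }
        for (int i = 0; i < replicaCounts.length; i++) {
            double target = light[i] ? replicaCounts[i] : leaderAvg;
            System.out.printf("broker %d target leaders ~= %.2f%n", i, target);
        }
    }
}
```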

 

*Optimizing ISR Order*

During Leader Rebalancing, the

Re: [PR] KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on PR #19223:
URL: https://github.com/apache/kafka/pull/19223#issuecomment-2752123826

   I tried to cherry-pick 
https://github.com/apache/kafka/commit/4a8a0637e07734779b40ba9785842311144f922c 
to 4.0, but there are some conflicts. @jsancio do you have a free cycle to file 
a PR to cherry-pick it to 4.0?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-9904) Use ThreadLocalConcurrent to Replace Random

2025-03-27 Thread Lorcan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lorcan reassigned KAFKA-9904:
-

Assignee: Lorcan

> Use ThreadLocalConcurrent to Replace Random
> ---
>
> Key: KAFKA-9904
> URL: https://issues.apache.org/jira/browse/KAFKA-9904
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mollitor
>Assignee: Lorcan
>Priority: Trivial
>
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadLocalRandom.html
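
For illustration, the kind of substitution being suggested (a self-contained 
example; this is not code from the Kafka code base):

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class ThreadLocalRandomExample {
    public static void main(String[] args) {
        // Before: a shared Random contends on a single atomic seed when
        // called from multiple threads.
        Random shared = new Random();
        int before = shared.nextInt(100);

        // After: ThreadLocalRandom keeps one generator per thread, so no
        // instance needs to be created, stored, or synchronized.
        int after = ThreadLocalRandom.current().nextInt(100);

        System.out.println(before + " " + after);
    }
}
```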



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14485) Move LogCleaner to storage module

2025-03-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938848#comment-17938848
 ] 

Chia-Ping Tsai commented on KAFKA-14485:


[~javakillah] that is a good question. We have different ways to handle 
dynamic configs in the code base:

1. using the self-updated KafkaConfig, like `ReplicaManager`
2. using the reconfigurable interface to refresh the inner config, like 
`LogCleaner`

Personally, I don't like the self-updated KafkaConfig, since it creates 
complicated dependencies and a god object.

Going back to your question, I'd prefer to try the following changes:

1. move the cleaner-related getters from KafkaConfig to `CleanerConfig`
2. temporarily allow `KafkaConfig` to create `CleanerConfig` to access the 
getters for some configs
3. let the `CleanerConfig` constructor take an `AbstractConfig` to initialize 
all variables

In order to make the above changes, we can move 
`org.apache.kafka.server.config.BrokerReconfigurable` to the server-common 
module and change "AbstractKafkaConfig" to "AbstractConfig". Of course, we 
need to do a bit of refactoring for `DynamicProducerStateManagerConfig`.
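
For illustration, a minimal sketch of the direction described above, assuming 
the interface is re-typed against `AbstractConfig` as proposed (the class name 
and field are illustrative; only `log.cleaner.threads` is a real config key, 
and the three methods mirror the reconfigurable-interface shape rather than 
any final API):

```java
import java.util.Set;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;

// Sketch only: assumes BrokerReconfigurable has been moved to server-common
// and takes AbstractConfig, as proposed above.
public class CleanerConfigSketch {
    private volatile int cleanerThreads;

    public CleanerConfigSketch(AbstractConfig config) {
        this.cleanerThreads = config.getInt("log.cleaner.threads");
    }

    public Set<String> reconfigurableConfigs() {
        return Set.of("log.cleaner.threads");
    }

    public void validateReconfiguration(AbstractConfig newConfig) {
        if (newConfig.getInt("log.cleaner.threads") < 1) {
            throw new ConfigException("log.cleaner.threads must be at least 1");
        }
    }

    public void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig) {
        // Only the inner value is refreshed; no self-updated KafkaConfig needed.
        this.cleanerThreads = newConfig.getInt("log.cleaner.threads");
    }
}
```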

> Move LogCleaner to storage module
> -
>
> Key: KAFKA-14485
> URL: https://issues.apache.org/jira/browse/KAFKA-14485
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Dmitry Werner
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016068726


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##
@@ -644,8 +645,12 @@ private ShareFetch collect(Map ack
 if (currentFetch.isEmpty()) {
 final ShareFetch fetch = fetchCollector.collect(fetchBuffer);
 if (fetch.isEmpty()) {
+// Check for any acknowledgements which could have come from 
control records (GAP) and include them.
+Map 
combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+
combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
 // Fetch more records and send any waiting acknowledgements
-applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap));
+applicationEventHandler.add(new 
ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Excuse me, is it possible that `fetch.takeAcknowledgedRecords()` returns 
non-empty records when `fetch.isEmpty` is true?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19036: Rewrite LogAppendTimeTest and move it to storage module [kafka]

2025-03-27 Thread via GitHub


FrankYang0529 commented on PR #19282:
URL: https://github.com/apache/kafka/pull/19282#issuecomment-2757324418

   @chia7712 Thanks for the review. I addressed all comments and CI passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on code in PR #19226:
URL: https://github.com/apache/kafka/pull/19226#discussion_r2016093160


##
server-common/src/main/java/org/apache/kafka/server/common/DelayedDeleteRecords.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;

Review Comment:
   +1 to `org.apache.kafka.server.purgatory`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19032: Remove TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames [kafka]

2025-03-27 Thread via GitHub


FrankYang0529 commented on PR #19270:
URL: https://github.com/apache/kafka/pull/19270#issuecomment-2757323381

   @chia7712 Thanks for the reminder. I fixed the conflicts and CI passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-18902) Implement ShareConsumer option to throw on poll if there are unacked records

2025-03-27 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield reassigned KAFKA-18902:


Assignee: Andrew Schofield  (was: Shivsundar R)

> Implement ShareConsumer option to throw on poll if there are unacked records
> 
>
> Key: KAFKA-18902
> URL: https://issues.apache.org/jira/browse/KAFKA-18902
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
> Fix For: 4.1.0
>
>
> Currently, an application using explicit acknowledgement which neglects to 
> acknowledge all of the records received from calling `poll(Duration)` is 
> re-presented with the records on the next call to poll. This has been shown 
> to be confusing.
> An option will be added, `share.acknowledgement.mode=explicit` to throw an 
> exception on poll instead.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19049: Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base [kafka]

2025-03-27 Thread via GitHub


Rancho-7 commented on PR #19299:
URL: https://github.com/apache/kafka/pull/19299#issuecomment-2757438387

   > @Rancho-7 please cleanup `SaslApiVersionsRequestTest` and 
`StaticBrokerConfigTest` too
   
   Thanks for pointing that out! Will fix it soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]

2025-03-27 Thread via GitHub


TaiJuWu commented on code in PR #19277:
URL: https://github.com/apache/kafka/pull/19277#discussion_r2016120749


##
storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BrokerCompressionTest {
+private final File tmpDir = TestUtils.tempDirectory();
+private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+private final MockTime time = new MockTime(0, 0);
+
+@AfterEach
+public void tearDown() throws IOException {
+Utils.delete(tmpDir);
+}
+
+/**
+ * Test broker-side compression configuration
+ */
+@ParameterizedTest
+@MethodSource("allCompressionParameters")
+public void testBrokerSideCompression(CompressionType 
messageCompressionType, BrokerCompressionType brokerCompressionType) throws 
IOException {
+Compression messageCompression = 
Compression.of(messageCompressionType).build();
+
+/* Configure broker-side compression */
+UnifiedLog log = UnifiedLog.create(

Review Comment:
   Thanks for catching it. Updated to use `try-with-resources` to close the log.
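
   For illustration, the generic pattern behind this change (a self-contained 
demo with a dummy resource, not the actual `UnifiedLog` API):

```java
public class TryWithResourcesExample {
    static class DummyLog implements AutoCloseable {
        @Override
        public void close() {
            System.out.println("closed");
        }
    }

    public static void main(String[] args) {
        // close() runs even if the body throws, which is why opening the log
        // in a try-with-resources block is safer in a test.
        try (DummyLog log = new DummyLog()) {
            System.out.println("using log");
        }
    }
}
```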



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-27 Thread via GitHub


ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##
@@ -644,8 +645,12 @@ private ShareFetch collect(Map ack
 if (currentFetch.isEmpty()) {
 final ShareFetch fetch = fetchCollector.collect(fetchBuffer);
 if (fetch.isEmpty()) {
+// Check for any acknowledgements which could have come from 
control records (GAP) and include them.
+Map 
combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+
combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
 // Fetch more records and send any waiting acknowledgements
-applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap));
+applicationEventHandler.add(new 
ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Yes :)) turns out it can. 
   
   - Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 
revealed that in transactions, when client receives only a control record(eg. 
an abort marker) in the `ShareFetchResponse` (without any non-control record), 
then in the `ShareCompletedFetch`, these control records are acknowledged with 
GAP (indicating the client is ignoring these control records) and are never 
presented to the consumer application.
   
   - Now these control records are auto acknowledged with `GAP` and will be 
sent on the next `ShareFetch`/`ShareAcknowledge` request. But as 
`fetch.isEmpty()` only checks for `numRecords() == 0`, when the fetch is empty, 
we actually ignore the fetch here(meaning we never acknowledge these control 
records) - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   
   - Now for this PR, we have added any possible acknowledgements that came in 
with the empty fetch (from control records) to the `ShareFetchEvent` so that it 
can be sent on the next poll().
   
   - I agree it looks a bit odd though for readability. But yeah there is a 
case when this could happen.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-27 Thread via GitHub


ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##
@@ -644,8 +645,12 @@ private ShareFetch collect(Map ack
 if (currentFetch.isEmpty()) {
 final ShareFetch fetch = fetchCollector.collect(fetchBuffer);
 if (fetch.isEmpty()) {
+// Check for any acknowledgements which could have come from 
control records (GAP) and include them.
+Map 
combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+
combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
 // Fetch more records and send any waiting acknowledgements
-applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap));
+applicationEventHandler.add(new 
ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Yes :)) turns out it can. 
   
   - Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 
revealed that in transactions, when client receives only a control record(eg. 
an abort marker) in the `ShareFetchResponse` (without any non-control record), 
then in the `ShareCompletedFetch`, these control records are never 
acknowledged(ideally acknowledged with GAP, indicating the client is ignoring 
these control records) and are never presented to the consumer application. 
   
   - It is expected that control records are skipped and are not presented to 
the application, but client should still acknowledge them with GAP 
(https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33)
   
   - Now these control records are usually auto acknowledged with `GAP` and 
will be sent on the next `ShareFetch`/`ShareAcknowledge` request. But here as 
`fetch.isEmpty()` only checks for `numRecords() == 0`, when the fetch is empty, 
we actually ignore the fetch here(meaning we never acknowledge these control 
records) - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   
   - Now for this PR, we have added any possible acknowledgements that came in 
with the empty fetch (from control records) to the `ShareFetchEvent` so that it 
can be sent on the next poll().
   
   - I agree it looks a bit odd though for readability. But yeah there is a 
case when this could happen.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Some cleanups in group coordinator's intergration tests [kafka]

2025-03-27 Thread via GitHub


dajac commented on PR #19281:
URL: https://github.com/apache/kafka/pull/19281#issuecomment-2757464241

   > nit: It appears `ShareGroupHeartbeatRequestTest` also requires cleanup. 
However, it's acceptable to leave it as is - or we can fix it in the follow-up
   
   Removed it and a few others related to Share requests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-27 Thread via GitHub


ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##
@@ -644,8 +645,12 @@ private ShareFetch collect(Map ack
 if (currentFetch.isEmpty()) {
 final ShareFetch fetch = fetchCollector.collect(fetchBuffer);
 if (fetch.isEmpty()) {
+// Check for any acknowledgements which could have come from 
control records (GAP) and include them.
+Map 
combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+
combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
 // Fetch more records and send any waiting acknowledgements
-applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap));
+applicationEventHandler.add(new 
ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Yes :)) turns out it can. 
   
   - Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 
revealed that in transactions, when client receives only a control record(eg. 
an abort marker) in the `ShareFetchResponse` (without any non-control record), 
then in the `ShareCompletedFetch`, these control records are never 
acknowledged(ideally acknowledged with GAP, indicating the client is ignoring 
these control records) and are never presented to the consumer application. 
   
   - It is expected that control records are skipped and are not presented to 
the application, but client should still acknowledge them with GAP 
(https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33)
   
   - Now these control records are usually auto acknowledged with `GAP` and 
will be sent on the next `ShareFetch`/`ShareAcknowledge` request. But here as 
`fetch.isEmpty()` only checks for `numRecords() == 0`, when the fetch is empty, 
we actually ignore the fetch here(meaning we never acknowledge these control 
records) - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   
   - Now after this PR, any possible acknowledgements that came in with the 
empty fetch (from control records) to the `ShareFetchEvent` are added so that 
it can be sent on the next `poll()`.
   
   - We cannot present these to the application, so the check for 
`fetch.isEmpty` cannot be altered. But yeah there is a case when this could 
happen.  I agree it looks a bit odd though for readability.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-27 Thread via GitHub


ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##
@@ -644,8 +645,12 @@ private ShareFetch collect(Map ack
 if (currentFetch.isEmpty()) {
 final ShareFetch fetch = fetchCollector.collect(fetchBuffer);
 if (fetch.isEmpty()) {
+// Check for any acknowledgements which could have come from 
control records (GAP) and include them.
+Map 
combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+
combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
 // Fetch more records and send any waiting acknowledgements
-applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap));
+applicationEventHandler.add(new 
ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Kind of :)) So it would have empty records but could have non-empty 
acknowledgements (for skipped records). 
   
   - Some integration tests in this PR - 
https://github.com/apache/kafka/pull/19261 revealed that in transactions, when 
the client receives only a control record (e.g. an abort marker) in the 
`ShareFetchResponse` (without any non-control record), then in the 
`ShareCompletedFetch`, these control records are never acknowledged (ideally 
they would be acknowledged with GAP, indicating the client is ignoring them) 
and are never presented to the consumer application. 
   
   - It is expected that control records are skipped and are not presented to 
the application, so the records never arrive at the application thread, but 
the client should still acknowledge them with GAP 
(https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33)
   
   - These control records are usually auto-acknowledged with `GAP` and sent 
on the next `ShareFetch`/`ShareAcknowledge` request. But because 
`fetch.isEmpty()` only checks `numRecords() == 0`, when the fetch is empty we 
actually ignore the fetch here (meaning we never acknowledge these control 
records) - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   
   - After this PR, any acknowledgements that came in with the empty fetch 
(from control records) are added to the `ShareFetchEvent` so that they can be 
sent on the next `poll()`.
   
   - We cannot present these records to the application, so the check for 
`fetch.isEmpty` cannot be altered. But yes, there is a case where this can 
happen; I agree it looks a bit odd for readability.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Some cleanups in group coordinator's intergration tests [kafka]

2025-03-27 Thread via GitHub


dajac commented on code in PR #19281:
URL: https://github.com/apache/kafka/pull/19281#discussion_r2015985621


##
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala:
##
@@ -37,15 +37,17 @@ import java.lang.{Byte => JByte}
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  brokers = 1,
+  serverProperties = Array(
+new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+  )
+)

Review Comment:
   Interesting. I was not aware of this. Let me remove them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19042: [4/N] Move PlaintextConsumerCallbackTest to client-integration-tests module [kafka]

2025-03-27 Thread via GitHub


frankvicky commented on code in PR #19298:
URL: https://github.com/apache/kafka/pull/19298#discussion_r2015873494


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java:
##
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.callback;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
+import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ClusterTestDefaults(
+types = {Type.KRAFT},
+brokers = 3
+)
+@ExtendWith(ClusterTestExtensions.class)
+public class PlaintextConsumerCallbackTest {
+
+private final ClusterInstance cluster;
+private final String topic = "topic";
+private final TopicPartition tp = new TopicPartition(topic, 0);
+
+public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) {
+this.cluster = clusterInstance;
+}
+
+@ClusterTest
+public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+try (var consumer = createConsumer(CLASSIC)) {
+triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+});
+}
+}
+
+@ClusterTest
+public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() 
throws InterruptedException {
+try (var consumer = createConsumer(CONSUMER)) {
+triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, 
partitions) -> {
+var e = assertThrows(IllegalStateException.class, () -> 
executeConsumer.assign(List.of(tp)));
+assertEquals("Subscription to topics, partitions and pattern 
are mutually exclusive", e.getMessage());
+});
+}
+}
+
+@ClusterTest
+public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() 
throws InterruptedException {

Review Comment:
   You have `testAsyncConsumer...` above, so I suggest naming this one 
`testClassicConsumer...` as well.



##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java:
##
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ *

Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on code in PR #19277:
URL: https://github.com/apache/kafka/pull/19277#discussion_r2015807142


##
storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BrokerCompressionTest {
+private final File tmpDir = TestUtils.tempDirectory();
+private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+private final MockTime time = new MockTime(0, 0);
+
+@AfterEach
+public void tearDown() throws IOException {
+Utils.delete(tmpDir);
+}
+
+/**
+ * Test broker-side compression configuration
+ */
+@ParameterizedTest
+@MethodSource("allCompressionParameters")
+public void testBrokerSideCompression(CompressionType 
messageCompressionType, BrokerCompressionType brokerCompressionType) throws 
IOException {
+Compression messageCompression = 
Compression.of(messageCompressionType).build();
+
+/* Configure broker-side compression */
+UnifiedLog log = UnifiedLog.create(

Review Comment:
   Should we close the `log`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Some cleanups in group coordinator's intergration tests [kafka]

2025-03-27 Thread via GitHub


dajac commented on code in PR #19281:
URL: https://github.com/apache/kafka/pull/19281#discussion_r2015820533


##
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala:
##
@@ -37,15 +37,17 @@ import java.lang.{Byte => JByte}
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  brokers = 1,
+  serverProperties = Array(
+new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+  )
+)

Review Comment:
   Totally. Missed that one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19049) Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base

2025-03-27 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-19049:
--

 Summary: Remove the `@ExtendWith(ClusterTestExtensions.class)` 
from code base
 Key: KAFKA-19049
 URL: https://issues.apache.org/jira/browse/KAFKA-19049
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


KAFKA-18671 introduced a mechanism to inject the cluster test extension at 
runtime, so integration tests don't need to use 
@ExtendWith(ClusterTestExtensions.class) anymore.
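
For illustration, a minimal before/after sketch (the test class name and body 
are hypothetical; only the dropped annotation is the point):

```java
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;

// Before: the JUnit extension had to be registered explicitly.
// @ExtendWith(ClusterTestExtensions.class)
// public class MyClusterTest { ... }

// After KAFKA-18671: the extension is injected at runtime, so the
// annotation can simply be dropped.
public class MyClusterTest {
    @ClusterTest
    public void testSomething(ClusterInstance cluster) {
        // exercise the injected cluster here
    }
}
```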



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19036: Rewrite LogAppendTimeTest and move it to storage module [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on code in PR #19282:
URL: https://github.com/apache/kafka/pull/19282#discussion_r2015816441


##
storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(ClusterTestExtensions.class)

Review Comment:
   this is unnecessary now. see 
https://issues.apache.org/jira/browse/KAFKA-19049



##
storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.common.test.junit.ClusterTestExtensions;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(ClusterTestExtensions.class)
+public class LogAppendTimeTest {
+@ClusterTest(
+types = {Type.KRAFT},
+brokers = 2,

Review Comment:
   it seems we don't need to create 2 broke

Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]

2025-03-27 Thread via GitHub


wernerdv commented on PR #19216:
URL: https://github.com/apache/kafka/pull/19216#issuecomment-2756859134

   @chia7712 @junrao @mimaison @frankvicky Thanks for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Some cleanups in group coordinator's integration tests [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on code in PR #19281:
URL: https://github.com/apache/kafka/pull/19281#discussion_r2015826951


##
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala:
##
@@ -37,15 +37,17 @@ import java.lang.{Byte => JByte}
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  brokers = 1,
+  serverProperties = Array(
+new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+  )
+)

Review Comment:
   we don't need to add `@Tag("integration")` to the class, as `@ClusterTest` 
automatically adds `@Tag("integration")` to the test case (method).
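
   For illustration, a minimal sketch of what this implies (class and method 
names invented; the framework injects `ClusterInstance` at runtime):
   
   ```
   import org.apache.kafka.common.test.ClusterInstance;
   import org.apache.kafka.common.test.api.ClusterTest;
   import org.apache.kafka.common.test.api.Type;
   
   // No class-level @Tag("integration") needed; @ClusterTest tags each method.
   public class ExampleIntegrationTest {
       @ClusterTest(types = {Type.KRAFT}, brokers = 1)
       public void testSomething(ClusterInstance cluster) {
           // cluster is injected by the test framework at runtime
       }
   }
   ```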



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-19049) Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base

2025-03-27 Thread Nick Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nick Guo reassigned KAFKA-19049:


Assignee: Nick Guo  (was: Chia-Ping Tsai)

> Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base
> 
>
> Key: KAFKA-19049
> URL: https://issues.apache.org/jira/browse/KAFKA-19049
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Nick Guo
>Priority: Minor
>
> KAFKA-18671 introduced the mechanism to inject the cluster test at runtime, 
> so the integration tests don't need to use 
> @ExtendWith(ClusterTestExtensions.class) any more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-19049) Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base

2025-03-27 Thread Nick Guo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938828#comment-17938828
 ] 

Nick Guo commented on KAFKA-19049:
--

Hi [~chia7712], I would like to take this issue. Thanks!

> Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base
> 
>
> Key: KAFKA-19049
> URL: https://issues.apache.org/jira/browse/KAFKA-19049
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> KAFKA-18671 introduced the mechanism to inject the cluster test at runtime, 
> so the integration tests don't need to use 
> @ExtendWith(ClusterTestExtensions.class) any more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18067: Add a flag to disable producer reset during active task creator shutting down [kafka]

2025-03-27 Thread via GitHub


frankvicky commented on PR #19269:
URL: https://github.com/apache/kafka/pull/19269#issuecomment-2757028923

   Hi @ableegoldman,
   
   > can you look into adding a test to make sure that the producer does still 
get reset/recreated properly if the producer is reset (eg transaction hits a 
timeout exception) but the StreamThread is not shutting down?
   
   I've investigated the related code and found writing a test for this 
specific scenario challenging.
   
   The main difficulty is that `TaskManager` doesn't control the timeout logic. 
While `TaskManager#handleLostAll` invokes 
`ActiveTaskCreator#reInitializeProducer`, this method only triggers when a 
`TaskMigratedException` occurs. The entire process is managed by `StreamThread`.
   
   The main pain point in writing this test lies in the deeply nested component 
structure: `StreamsProducer` is a member of `ActiveTaskCreator`, which is a 
member of `TaskManager`, which is ultimately controlled by `StreamThread`. When 
using mocks, this multi-layered nesting makes testing extremely complex.
   We typically mock outer components, but this makes it difficult to directly 
access and verify the reset behavior of the inner `StreamsProducer`. 
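
   As a toy illustration of that nesting (simplified stand-ins, not the real 
classes):
   
   ```
   class StreamsProducer { void reInitialize() { /* re-create producer */ } }
   
   class ActiveTaskCreator {
       private final StreamsProducer producer = new StreamsProducer();
       void reInitializeProducer() { producer.reInitialize(); }
   }
   
   class TaskManager {
       private final ActiveTaskCreator creator = new ActiveTaskCreator();
       void handleLostAll() { creator.reInitializeProducer(); }
   }
   
   class StreamThread {
       private final TaskManager taskManager = new TaskManager();
       // Only a TaskMigratedException in the run loop reaches handleLostAll(),
       // so a unit test must mock three layers deep to observe the reset.
   }
   ```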


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-19049: Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base [kafka]

2025-03-27 Thread via GitHub


Rancho-7 opened a new pull request, #19299:
URL: https://github.com/apache/kafka/pull/19299

   jira: https://issues.apache.org/jira/browse/KAFKA-19049
   
   [KAFKA-18671](https://issues.apache.org/jira/browse/KAFKA-18671) introduced 
the mechanism to inject the cluster test at runtime, so the integration tests 
don't need to use `@ExtendWith(ClusterTestExtensions.class)` any more.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Revert migrate LogFetchInfo, Assignment and RequestAndCompletionHandler to java record [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on code in PR #19177:
URL: https://github.com/apache/kafka/pull/19177#discussion_r2016096742


##
server/src/main/java/org/apache/kafka/server/Assignment.java:
##
@@ -24,25 +24,62 @@
 import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.server.common.TopicIdPartition;
 
-/**
- * @param topicIdPartition The topic ID and partition index of the replica.
- * @param directoryId  The ID of the directory we are placing the replica 
into.
- * @param submissionTimeNs The time in monotonic nanosecond when this 
assignment was created.
- * @param successCallback  The callback to invoke on success.
- */
-record Assignment(
-TopicIdPartition topicIdPartition,
-Uuid directoryId,
-long submissionTimeNs,
-Runnable successCallback
-) {
+final class Assignment {

Review Comment:
   ditto



##
raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java:
##
@@ -21,4 +21,13 @@
 /**
  * Metadata for the records fetched from log, including the records itself
  */
-public record LogFetchInfo(Records records, LogOffsetMetadata 
startOffsetMetadata) { }
+public class LogFetchInfo {

Review Comment:
   Could you please leave comments to explain why we don't use record class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-17830) Cover unit tests for TBRLMM init failure scenarios

2025-03-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17830.

Fix Version/s: 4.1.0
 Assignee: PoAn Yang  (was: Anshul Goyal)
   Resolution: Fixed

> Cover unit tests for TBRLMM init failure scenarios
> --
>
> Key: KAFKA-17830
> URL: https://issues.apache.org/jira/browse/KAFKA-17830
> Project: Kafka
>  Issue Type: Task
>Reporter: Kamal Chandraprakash
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: new-bie
> Fix For: 4.1.0
>
>
> [TopicBasedRemoteLogMetadataManagerTest|https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java]
>  does not cover initialization failure scenarios, it will be good to cover 
> those cases with unit tests.
> See: [https://github.com/apache/kafka/pull/17492#issuecomment-2422144959]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-27 Thread via GitHub


ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java:
##
@@ -644,8 +645,12 @@ private ShareFetch collect(Map ack
 if (currentFetch.isEmpty()) {
 final ShareFetch fetch = fetchCollector.collect(fetchBuffer);
 if (fetch.isEmpty()) {
+// Check for any acknowledgements which could have come from 
control records (GAP) and include them.
+Map 
combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+
combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
 // Fetch more records and send any waiting acknowledgements
-applicationEventHandler.add(new 
ShareFetchEvent(acknowledgementsMap));
+applicationEventHandler.add(new 
ShareFetchEvent(combinedAcknowledgements));

Review Comment:
   Yes :)) turns out it can. 
   
   - Some integration tests in this PR - https://github.com/apache/kafka/pull/19261 
revealed that in transactions, when the client receives only a control record 
(e.g. an abort marker) in the `ShareFetchResponse` (without any non-control 
record), then in the `ShareCompletedFetch` these control records are never 
acknowledged (ideally they would be acknowledged with GAP, indicating the client 
is ignoring them) and are never presented to the consumer application. 
   
   - It is expected that control records are skipped and are not presented to 
the application, but the client should still acknowledge them with GAP 
(https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33).
   
   - These control records are usually auto-acknowledged with `GAP` and sent on 
the next `ShareFetch`/`ShareAcknowledge` request. But because `fetch.isEmpty()` 
only checks `numRecords() == 0`, an empty fetch is ignored entirely (meaning we 
never acknowledge these control records) - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
   
   - After this PR, any acknowledgements that came in with the empty fetch 
(from control records) are added to the `ShareFetchEvent`, so they can be sent 
on the next poll().
   
   - I agree it looks a bit odd for readability, but yes, there is a case where 
this can happen.
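
   As a rough sketch of the merge with plain JDK collections (the real types 
are Kafka client internals):
   
   ```
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   public class AckMergeSketch {
       public static void main(String[] args) {
           // Acks the consumer already owes the broker.
           Map<String, String> pendingAcks = new LinkedHashMap<>(Map.of("tp0-5", "ACCEPT"));
           // An "empty" fetch (numRecords() == 0) can still carry GAP acks for
           // control records; merging keeps them from being dropped.
           Map<String, String> controlRecordAcks = Map.of("tp0-6", "GAP");
           Map<String, String> combined = new LinkedHashMap<>(pendingAcks);
           combined.putAll(controlRecordAcks);
           System.out.println(combined); // {tp0-5=ACCEPT, tp0-6=GAP}
       }
   }
   ```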
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19036: Rewrite LogAppendTimeTest and move it to storage module [kafka]

2025-03-27 Thread via GitHub


FrankYang0529 commented on code in PR #19282:
URL: https://github.com/apache/kafka/pull/19282#discussion_r2016935923


##
storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java:
##
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.TestUtils;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.Type;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LogAppendTimeTest {
+@ClusterTest(
+types = {Type.KRAFT},
+serverProperties = {
+@ClusterConfigProperty(key = "log.message.timestamp.type", value = 
"LogAppendTime"),

Review Comment:
   Yes, add another test case `testProduceConsumeWithConfigOnTopic`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18390: Use LinkedHashMap instead of Map in creating MetricName and SensorBuilder (1/N) [kafka]

2025-03-27 Thread via GitHub


dajac commented on PR #19300:
URL: https://github.com/apache/kafka/pull/19300#issuecomment-2757903933

   @TaiJuWu Thanks for the patch. Out of curiosity, have we checked that jmx 
metric names are not altered by this change? I think that they are generated 
based on the order in the data structure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19032: Remove TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on PR #19270:
URL: https://github.com/apache/kafka/pull/19270#issuecomment-2757016872

   @FrankYang0529 could you please fix the conflicts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-03-27 Thread via GitHub


bbejeck commented on PR #18953:
URL: https://github.com/apache/kafka/pull/18953#issuecomment-2758977759

   @mjsax comments addressed


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`

2025-03-27 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield reassigned KAFKA-19024:


Assignee: Lan Ding  (was: Andrew Schofield)

By all means. Here you go.

> Enhance the client behaviour when it tries to exceed the 
> `group.share.max.groups`
> -
>
> Key: KAFKA-19024
> URL: https://issues.apache.org/jira/browse/KAFKA-19024
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sanskar Jhajharia
>Assignee: Lan Ding
>Priority: Minor
>
> For share groups we use the `group.share.max.groups` config to define the 
> number of max share groups we allow. However, when we exceed the same, the 
> client logs do not specify any such error and simply do not consume. The 
> group doesn't get created but the client continues to send Heartbeats hoping 
> for one of the existing groups to shut down and allowing it to form a group. 
> Having a log or an exception in the client logs will help them debug such 
> situations accurately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-18971) Update AK system tests for AK 4.0

2025-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939041#comment-17939041
 ] 

Matthias J. Sax commented on KAFKA-18971:
-

Seems this is already done: [https://github.com/apache/kafka/pull/19239] 

> Update AK system tests for AK 4.0
> -
>
> Key: KAFKA-18971
> URL: https://issues.apache.org/jira/browse/KAFKA-18971
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 4.1.0
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
> Fix For: 4.1.0
>
>
> Update AK system tests and add new “upgrade_from” version to {{StreamsConfig}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17541) Improve handling of delivery count

2025-03-27 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939043#comment-17939043
 ] 

Andrew Schofield commented on KAFKA-17541:
--

Sure, if the description is clear. I was really expecting this to be in 4.2, 
but if you can accelerate it into 4.1, go ahead.

> Improve handling of delivery count
> --
>
> Key: KAFKA-17541
> URL: https://issues.apache.org/jira/browse/KAFKA-17541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>
> There are two situations in which the delivery count handling needs to be 
> more intelligent.
> First, for records which are automatically released as a result of closing a 
> share session normally, the delivery count should not be incremented. These 
> records were fetched but they were not actually delivered to the client since 
> the disposition of the delivery records is carried in the ShareAcknowledge 
> which closes the share session. Any remaining records were not delivered, 
> only fetched.
> Second, for records which have a delivery count which is more than 1 or 2, 
> there is a suspicion that the records are not being delivered due to a 
> problem rather than just natural retrying. The batching of these records 
> should be reduced, even down to a single record as a time so we do not have 
> the failure to deliver a poisoned record actually causing adjacent records to 
> be considered unsuccessful and potentially reach the delivery count limit 
> without proper reason.
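
A sketch of the second point as a hypothetical policy (invented names, not the 
issue's actual design):

{code:java}
final class DeliveryCountPolicySketch {
    // Hypothetical: shrink the acquisition batch as the delivery count grows,
    // isolating a potentially poisoned record once deliveryCount exceeds 2.
    static int nextBatchSize(int deliveryCount, int defaultBatchSize) {
        return deliveryCount <= 2 ? defaultBatchSize : 1;
    }
}
{code}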



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown

2025-03-27 Thread Alyssa Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alyssa Huang reassigned KAFKA-19047:


Assignee: Alyssa Huang

> Broker registrations are slow if previously fenced or shutdown
> --
>
> Key: KAFKA-19047
> URL: https://issues.apache.org/jira/browse/KAFKA-19047
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Alyssa Huang
>Assignee: Alyssa Huang
>Priority: Major
>
> BrokerLifecycleManager prevents registration of a broker w/ an id it has seen 
> before with a different incarnation id if the broker session expires. On 
> clean shutdown and restart of a broker this can cause an unnecessary delay in 
> re-registration while the quorum controller waits for the session to expire.
> ```
> [BrokerLifecycleManager id=1] Unable to register broker 1 because the 
> controller returned error DUPLICATE_BROKER_REGISTRATION
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-19048) Minimal Movement Replica Balancing algorithm

2025-03-27 Thread Jialun Peng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jialun Peng updated KAFKA-19048:

Labels: pull-request-available  (was: )

> Minimal Movement Replica Balancing algorithm
> 
>
> Key: KAFKA-19048
> URL: https://issues.apache.org/jira/browse/KAFKA-19048
> Project: Kafka
>  Issue Type: Improvement
>  Components: generator
>Reporter: Jialun Peng
>Assignee: Jialun Peng
>Priority: Major
>  Labels: pull-request-available
>
> h2. Motivation
> Kafka clusters typically require rebalancing of topic replicas after 
> horizontal scaling to evenly distribute the load across new and existing 
> brokers. The current rebalancing approach does not consider the existing 
> replica distribution, often resulting in excessive and unnecessary replica 
> movements. These unnecessary movements increase rebalance duration, consume 
> significant bandwidth and CPU resources, and potentially disrupt ongoing 
> production and consumption operations. Thus, a replica rebalancing strategy 
> that minimizes movements while achieving an even distribution of replicas is 
> necessary.
> h2. Goals
> The proposed approach prioritizes the following objectives:
>  # {*}Minimal Movement{*}: Minimize the number of replica relocations during 
> rebalancing.
>  # {*}Replica Balancing{*}: Ensure that replicas are evenly distributed 
> across brokers.
>  # {*}Anti-Affinity Support{*}: Support rack-aware allocation when enabled.
>  # {*}Leader Balancing{*}: Distribute leader replicas evenly across brokers.
>  # {*}ISR Order Optimization{*}: Optimize adjacency relationships to prevent 
> failover traffic concentration in case of broker failures.
> h2. Proposed Changes
> h3. Rack-Level Replica Distribution
> The following rules ensure balanced replica allocation at the rack level:
>  # *When* {{rackCount = replicationFactor}}:
>  * Each rack receives exactly {{partitionCount}} replicas.
>  # *When* {{rackCount > replicationFactor}}:
>  * If weighted allocation {{(rackBrokers/totalBrokers × totalReplicas) ≥ 
> partitionCount}}: each rack receives exactly {{partitionCount}} replicas.
>  * If weighted allocation {{< partitionCount}}: distribute remaining 
> replicas using a weighted remainder allocation.
> h3. Node-Level Replica Distribution
>  # If the number of replicas assigned to a rack is not a multiple of the 
> number of nodes in that rack, some nodes will host one additional replica 
> compared to others.
>  # *When* {{rackCount = replicationFactor}}:
>  * If all racks have an equal number of nodes, each node will host an equal 
> number of replicas.
>  * If rack sizes vary, nodes in larger racks will host fewer replicas on 
> average.
>  # *When* {{rackCount > replicationFactor}}:
>  * If no rack has a significantly higher node weight, replicas will be evenly 
> distributed.
>  * If a rack has disproportionately high node weight, those nodes will 
> receive fewer replicas.
> h3. Anti-Affinity Support
> When anti-affinity is enabled, the rebalance algorithm ensures that replicas 
> of the same partition do not colocate on the same rack. Brokers without rack 
> configuration are excluded from anti-affinity checks.
> In this way we can unify the implementation logic of rack-aware and 
> non-rack-aware.
>  
> *Replica Balancing* *Algorithm*
> Through the above steps, we can calculate the ideal replica count for each 
> node and rack.
> Based on the initial replica distribution of topics, we obtain the current 
> replica partition allocation across nodes and racks, allowing us to identify 
> which nodes violate anti-affinity rules.
> We iterate through nodes with the following priority:
>  # First process nodes that violate anti-affinity rules
>  # Then process nodes whose current replica count exceeds the desired replica 
> count (prioritizing those with the largest discrepancy)
> For these identified nodes, we relocate their replicas to target nodes that:
>  * Satisfy all anti-affinity constraints
>  * Have a current replica count below their ideal allocation
> This process continues iteratively until:
>  * No nodes violate anti-affinity rules
>  * All nodes' current replica counts match their desired replica counts
> Upon satisfying these conditions, we achieve balanced replica distribution 
> across nodes.
>  
> *Leader* *Balancing* *Algorithm*
> *Target Leader Calculation:*
> Compute baseline average: {{leader_avg = floor(total_partitions / 
> total_nodes)}}
> Identify brokers where {{replica_count ≤ leader_avg}}:
>  * Designate all replicas as leaders on these brokers
>  * Subtract allocated leaders: {{remaining_partitions -= assigned_leaders}}
>  * Exclude nodes: {{remaining_brokers -= 
> processed_br
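
A rough sketch of the rack-level weighted remainder allocation described above 
(invented names, not the actual patch; assumes no rack needs more than 
{{partitionCount}} replicas after the remainder pass):

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WeightedRackAllocationSketch {
    static Map<String, Integer> allocate(Map<String, Integer> brokersPerRack,
                                         int totalReplicas, int partitionCount) {
        int totalBrokers = brokersPerRack.values().stream().mapToInt(i -> i).sum();
        Map<String, Integer> counts = new LinkedHashMap<>();
        Map<String, Double> remainders = new LinkedHashMap<>();
        int assigned = 0;
        for (Map.Entry<String, Integer> e : brokersPerRack.entrySet()) {
            double ideal = (double) e.getValue() / totalBrokers * totalReplicas;
            int base = Math.min((int) Math.floor(ideal), partitionCount);
            counts.put(e.getKey(), base);
            remainders.put(e.getKey(), ideal - base);
            assigned += base;
        }
        // Hand leftover replicas to the racks with the largest fractional parts.
        List<String> byRemainder = new ArrayList<>(remainders.keySet());
        byRemainder.sort((a, b) -> Double.compare(remainders.get(b), remainders.get(a)));
        int leftover = totalReplicas - assigned;
        for (String rack : byRemainder) {
            if (leftover == 0) break;
            if (counts.get(rack) < partitionCount) {
                counts.merge(rack, 1, Integer::sum);
                leftover--;
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> racks = new LinkedHashMap<>();
        racks.put("rackA", 4);
        racks.put("rackB", 3);
        racks.put("rackC", 3);
        racks.put("rackD", 2);
        // 10 partitions, replication factor 3 -> 30 replicas across 12 brokers.
        System.out.println(allocate(racks, 30, 10));
    }
}
{code}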

Re: [PR] KAFKA-18935: Ensure brokers do not return null records in FetchResponse [kafka]

2025-03-27 Thread via GitHub


frankvicky commented on code in PR #19167:
URL: https://github.com/apache/kafka/pull/19167#discussion_r2016857591


##
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##
@@ -89,7 +89,7 @@ public FetchResponseData data() {
  */
 public FetchResponse(FetchResponseData fetchResponseData) {
 super(ApiKeys.FETCH);
-this.data = fetchResponseData;
+this.data = convertNullRecordsToEmpty(fetchResponseData);

Review Comment:
   Make sense. It could also avoid overheads for clients.
   Does this approach also need to apply to `ShareFetchResponse`?
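
   For context, a hedged sketch of what such a normalization can look like on 
the response data (assuming the generated `FetchResponseData` accessors; not 
necessarily the exact patch):
   
   ```
   import org.apache.kafka.common.message.FetchResponseData;
   import org.apache.kafka.common.record.MemoryRecords;
   
   final class NullRecordsSketch {
       // Replace null record sets with MemoryRecords.EMPTY so clients never see null.
       static FetchResponseData convertNullRecordsToEmpty(FetchResponseData data) {
           data.responses().forEach(topic ->
               topic.partitions().forEach(partition -> {
                   if (partition.records() == null) {
                       partition.setRecords(MemoryRecords.EMPTY);
                   }
               }));
           return data;
       }
   }
   ```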



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add log4j2.yaml to clients-integration-tests module [kafka]

2025-03-27 Thread via GitHub


chia7712 commented on code in PR #19252:
URL: https://github.com/apache/kafka/pull/19252#discussion_r2006232652


##
clients/clients-integration-tests/src/test/resources/log4j2.yaml:
##
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Configuration:
+  Properties:
+Property:
+  - name: "logPattern"
+value: "[%d] %p %m (%c:%L)%n"
+
+  Appenders:
+Console:
+  name: STDOUT
+  PatternLayout:
+pattern: "${logPattern}"
+
+  Loggers:
+Root:
+  level: ALL

Review Comment:
   maybe `INFO` is good enough. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-03-27 Thread via GitHub


bbejeck commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r2017246779


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##
@@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) 
{
 () -> new Processor<>() {
 private KeyValueStore store;

Review Comment:
   True, since we're not processing any records. I've removed it here and 
declared it in the `init` method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-03-27 Thread via GitHub


bbejeck commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r2017220549


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##
@@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) 
{
 () -> new Processor<>() {
 private KeyValueStore store;
 
+// The store iterator is intentionally not closed here as 
it needs
+// to be open during the test, so the Streams app will 
emit the
+// 
org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric
+// that is expected. So the globalStoreIterator is a 
global variable
+// (pun not intended), so it can be closed in the tearDown 
method.
 @Override
 public void init(final ProcessorContext 
context) {
 store = context.getStateStore("iq-test-store");
+globalStoreIterator = store.all();
 }
 
 @Override
 public void process(final Record record) {
 store.put(record.key(), record.value());

Review Comment:
   without this the test fails, we need it open to get the iterator metrics



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown

2025-03-27 Thread Alyssa Huang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alyssa Huang updated KAFKA-19047:
-
Affects Version/s: 4.0.0

> Broker registrations are slow if previously fenced or shutdown
> --
>
> Key: KAFKA-19047
> URL: https://issues.apache.org/jira/browse/KAFKA-19047
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 4.0.0
>Reporter: Alyssa Huang
>Assignee: Alyssa Huang
>Priority: Major
>
> BrokerLifecycleManager prevents registration of a broker w/ an id it has seen 
> before with a different incarnation id if the broker session expires. On 
> clean shutdown and restart of a broker this can cause an unnecessary delay in 
> re-registration while the quorum controller waits for the session to expire.
> ```
> [BrokerLifecycleManager id=1] Unable to register broker 1 because the 
> controller returned error DUPLICATE_BROKER_REGISTRATION
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-9904) Use ThreadLocalConcurrent to Replace Random

2025-03-27 Thread Lorcan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939012#comment-17939012
 ] 

Lorcan commented on KAFKA-9904:
---

Hi [~belugabehr], is this something that you'd advise should be replaced 
mostly/completely in the codebase? Or should it be more targeted to the 
scenarios described in the Oracle docs? 

I've seen some instances where a Random object is initialised with a particular 
seed, which doesn't seem possible with the ThreadLocalRandom class. It also 
seems to be used for convenience in some of the Scala files to lazily generate 
random strings.

The reason I ask is due to a lack of experience with distributed systems and so 
any insight would be helpful.
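
As a pure-JDK illustration of the trade-off: ThreadLocalRandom avoids 
contention but cannot be seeded, so call sites that rely on a fixed seed (e.g. 
reproducible tests) cannot migrate:

{code:java}
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class RandomMigrationSketch {
    public static void main(String[] args) {
        Random seeded = new Random(42L); // deterministic sequence, fine to keep
        int a = seeded.nextInt(100);

        int b = ThreadLocalRandom.current().nextInt(100); // no contention, no seeding
        // ThreadLocalRandom.current().setSeed(42L) would throw
        // UnsupportedOperationException.
        System.out.println(a + " " + b);
    }
}
{code}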

> Use ThreadLocalConcurrent to Replace Random
> ---
>
> Key: KAFKA-9904
> URL: https://issues.apache.org/jira/browse/KAFKA-9904
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Mollitor
>Assignee: Lorcan
>Priority: Trivial
>
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadLocalRandom.html



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-19047: Broker registrations are slow if previously fenced or shutdown [kafka]

2025-03-27 Thread via GitHub


splett2 commented on code in PR #19296:
URL: https://github.com/apache/kafka/pull/19296#discussion_r2017080238


##
metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java:
##
@@ -978,6 +980,129 @@ public void 
testBrokerContactTimesAreUpdatedOnClusterControlActivation() {
 contactTime(new BrokerIdAndEpoch(2, 100)));
 }
 
+@Test
+public void testDuplicateBrokerRegistrationWithActiveOldBroker() {
+// active here means brokerHeartbeatManager last recorded the broker 
as unfenced and not in controlled shutdown
+long brokerSessionTimeoutMs = 1000;
+MockTime time = new MockTime(0L, 20L, 1000L);
+FinalizedControllerFeatures finalizedFeatures = new 
FinalizedControllerFeatures(
+Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
+ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
+setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+setFeatureControlManager(createFeatureControlManager()).
+setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { 
}).
+
setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)).
+setTime(time).
+build();
+clusterControl.replay(new RegisterBrokerRecord().
+setBrokerEpoch(100).
+setBrokerId(0).
+setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
+setFenced(false), 10002);
+clusterControl.activate();
+assertEquals(OptionalLong.of(1000L), 
clusterControl.heartbeatManager().tracker().
+contactTime(new BrokerIdAndEpoch(0, 100)));
+
+// while session is still valid for old broker, duplicate requests 
should fail
+time.sleep(brokerSessionTimeoutMs / 2);
+assertThrows(DuplicateBrokerRegistrationException.class, () ->
+clusterControl.registerBroker(new BrokerRegistrationRequestData().
+setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+setBrokerId(0).
+
setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
+setFeatures(new 
BrokerRegistrationRequestData.FeatureCollection(
+Set.of(new BrokerRegistrationRequestData.Feature().
+setName(MetadataVersion.FEATURE_NAME).
+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+101L,
+finalizedFeatures,
+false));
+
+// if session expires for broker, even if the broker was active the 
new registration will succeed
+time.sleep(brokerSessionTimeoutMs);
+clusterControl.registerBroker(new BrokerRegistrationRequestData().
+setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+setBrokerId(0).
+setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))).
+setFeatures(new 
BrokerRegistrationRequestData.FeatureCollection(
+Set.of(new BrokerRegistrationRequestData.Feature().
+setName(MetadataVersion.FEATURE_NAME).
+
setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()).
+
setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())).
+setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+101L,
+finalizedFeatures,
+false);
+}
+
+@Test
+public void testDuplicateBrokerRegistrationWithInactiveBroker() {
+// inactive here means brokerHeartbeatManager last recorded the broker 
as fenced or in controlled shutdown
+long brokerSessionTimeoutMs = 1000;
+MockTime time = new MockTime(0L, 20L, 1000L);
+FinalizedControllerFeatures finalizedFeatures = new 
FinalizedControllerFeatures(
+Map.of(MetadataVersion.FEATURE_NAME, 
MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L);
+ClusterControlManager clusterControl = new 
ClusterControlManager.Builder().
+setClusterId("pjvUwj3ZTEeSVQmUiH3IJw").
+setFeatureControlManager(createFeatureControlManager()).
+setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { 
}).
+
setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)).
+setTime(time).
+build();
+// first broker is fenced
+clusterControl.replay(new RegisterBrokerRecord().
+setBrokerEpoch(100).
+setBrokerId(0).
+setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))).
+ 

Re: [PR] KIP-991 Add deletedConnector flag when stopping tasks [kafka]

2025-03-27 Thread via GitHub


hgeraldino closed pull request #13146: KIP-991 Add deletedConnector flag when 
stopping tasks
URL: https://github.com/apache/kafka/pull/13146


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16368: segment.bytes constraints to min 1MB [kafka]

2025-03-27 Thread via GitHub


junrao commented on PR #18140:
URL: https://github.com/apache/kafka/pull/18140#issuecomment-2758776560

   @jayteej : Thanks for the PR and sorry for chiming in late. The KIP 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations
 says that the new constraint applies at the topic level. I tried the following 
on 4.0/trunk and it seems that the constraint isn't really applied at the topic 
level?
   
   ```
   bash-3.2$ bin/kafka-configs.sh --alter --bootstrap-server localhost:9092 
--topic test --add-config segment.bytes=1000
   Completed updating config for topic test.
   bash-3.2$ bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 
--topic test
   Dynamic configs for topic test are:
 segment.bytes=1000 sensitive=false 
synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1000, 
STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, 
DEFAULT_CONFIG:log.segment.bytes=1073741824}
   ```
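   
   A hedged sketch of how the 1 MiB floor would typically be declared with 
`ConfigDef.Range.atLeast` (assuming it is wired into the topic-level definition 
as well as the broker-level one):
   
   ```
   import org.apache.kafka.common.config.ConfigDef;
   import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
   
   final class SegmentBytesConfigSketch {
       static final ConfigDef DEF = new ConfigDef()
           .define("segment.bytes",
                   ConfigDef.Type.INT,
                   1024 * 1024 * 1024,   // default: 1 GiB
                   atLeast(1024 * 1024), // reject anything below 1 MiB
                   ConfigDef.Importance.MEDIUM,
                   "The segment file size for the log.");
   }
   ```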
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-19051) Fix implicit acknowledgement cannot be overriden when RecordDeserializationException occurs

2025-03-27 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frédérik ROULEAU updated KAFKA-19051:
-
Summary: Fix implicit acknowledgement cannot be overriden when 
RecordDeserializationException occurs  (was: Fix implicit acknowledgement 
cannot be override when RecordDeserializationException occurs)

> Fix implicit acknowledgement cannot be overriden when 
> RecordDeserializationException occurs
> ---
>
> Key: KAFKA-19051
> URL: https://issues.apache.org/jira/browse/KAFKA-19051
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Frédérik ROULEAU
>Priority: Major
>
> When a record generates a RecordDeserializationException, KIP mentioned that 
> with explicit acknowledgement the default Release can be overridden.
> When tried, I have:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: The record cannot 
> be acknowledged.
>     at 
> org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
>     at 
> org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
>     at 
> org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
>     at org.example.frouleau.kip932.Main.main(Main.java:62) {code}
> It looks like the record was already released.
> Code used:
> {code:java}
> //
> } catch (RecordDeserializationException re) {
> long offset = re.offset();
> Throwable t = re.getCause();
> LOGGER.error("Failed to deserialize record at partition={} offset={}", 
> re.topicPartition().partition(), offset, t);
> ConsumerRecord record = new 
> ConsumerRecord<>(re.topicPartition().topic(), 
> re.topicPartition().partition(), offset, "", "");
> consumer.acknowledge(record, AcknowledgeType.REJECT);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-19051) Fix implicit acknowledgement cannot be overridden when RecordDeserializationException occurs

2025-03-27 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frédérik ROULEAU updated KAFKA-19051:
-
Summary: Fix implicit acknowledgement cannot be overridden when 
RecordDeserializationException occurs  (was: Fix implicit acknowledgement 
cannot be overriden when RecordDeserializationException occurs)

> Fix implicit acknowledgement cannot be overridden when 
> RecordDeserializationException occurs
> 
>
> Key: KAFKA-19051
> URL: https://issues.apache.org/jira/browse/KAFKA-19051
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Frédérik ROULEAU
>Priority: Major
>
> When a record generates a RecordDeserializationException, KIP mentioned that 
> with explicit acknowledgement the default Release can be overridden.
> When tried, I have:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: The record cannot 
> be acknowledged.
>     at 
> org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123)
>     at 
> org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683)
>     at 
> org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534)
>     at org.example.frouleau.kip932.Main.main(Main.java:62) {code}
> It looks like the record was already released.
> Code used:
> {code:java}
> //
> } catch (RecordDeserializationException re) {
> long offset = re.offset();
> Throwable t = re.getCause();
> LOGGER.error("Failed to deserialize record at partition={} offset={}", 
> re.topicPartition().partition(), offset, t);
> ConsumerRecord record = new 
> ConsumerRecord<>(re.topicPartition().topic(), 
> re.topicPartition().partition(), offset, "", "");
> consumer.acknowledge(record, AcknowledgeType.REJECT);
> } {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown

2025-03-27 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-19047:
---
Fix Version/s: 4.1.0
   4.0.1

> Broker registrations are slow if previously fenced or shutdown
> --
>
> Key: KAFKA-19047
> URL: https://issues.apache.org/jira/browse/KAFKA-19047
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 4.0.0
>Reporter: Alyssa Huang
>Assignee: Alyssa Huang
>Priority: Major
> Fix For: 4.1.0, 4.0.1
>
>
> BrokerLifecycleManager prevents registration of a broker w/ an id it has seen 
> before with a different incarnation id if the broker session expires. On 
> clean shutdown and restart of a broker this can cause an unnecessary delay in 
> re-registration while the quorum controller waits for the session to expire.
> ```
> [BrokerLifecycleManager id=1] Unable to register broker 1 because the 
> controller returned error DUPLICATE_BROKER_REGISTRATION
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown

2025-03-27 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-19047:
---
Component/s: controller

> Broker registrations are slow if previously fenced or shutdown
> --
>
> Key: KAFKA-19047
> URL: https://issues.apache.org/jira/browse/KAFKA-19047
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 4.0.0
>Reporter: Alyssa Huang
>Assignee: Alyssa Huang
>Priority: Major
> Fix For: 4.1.0, 4.0.1
>
>
> BrokerLifecycleManager prevents registration of a broker w/ an id it has seen 
> before with a different incarnation id if the broker session expires. On 
> clean shutdown and restart of a broker this can cause an unnecessary delay in 
> re-registration while the quorum controller waits for the session to expire.
> ```
> [BrokerLifecycleManager id=1] Unable to register broker 1 because the 
> controller returned error DUPLICATE_BROKER_REGISTRATION
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15873: Filter topics before sorting [kafka]

2025-03-27 Thread via GitHub


lorcanj opened a new pull request, #19304:
URL: https://github.com/apache/kafka/pull/19304

   Partially addresses: 
[KAFKA-15873](https://issues.apache.org/jira/browse/KAFKA-15873)
   
   When filtering and sorting, we should apply the filter before sorting the 
topics.
   
   The order in which unauthorizedForDescribeTopicMetadata is populated is not 
relevant, as it is a HashSet.
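   
   A generic illustration of the change's intent (not the actual Kafka code):
   
   ```
   import java.util.Comparator;
   import java.util.List;
   import java.util.stream.Stream;
   
   final class FilterBeforeSortSketch {
       public static void main(String[] args) {
           // Sorting after filtering never pays to order topics that get dropped.
           List<String> visible = Stream.of("zeta", "alpha", "__internal")
               .filter(t -> !t.startsWith("__"))  // drop filtered topics first
               .sorted(Comparator.naturalOrder()) // then sort only the survivors
               .toList();
           System.out.println(visible);           // [alpha, zeta]
       }
   }
   ```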
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19049: Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base [kafka]

2025-03-27 Thread via GitHub


FrankYang0529 commented on PR #19299:
URL: https://github.com/apache/kafka/pull/19299#issuecomment-2760354933

   @Rancho-7 Can we also update this line in the Java doc? Thanks.
   
   
https://github.com/apache/kafka/blob/28de78bcbad605a3e906d085d2e59b441ae35212/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterTestExtensions.java#L81


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18390: Use LinkedHashMap instead of Map in creating MetricName and SensorBuilder (1/N) [kafka]

2025-03-27 Thread via GitHub


TaiJuWu commented on PR #19300:
URL: https://github.com/apache/kafka/pull/19300#issuecomment-2760216868

   > @TaiJuWu Thanks for the patch. Out of curiosity, have we checked that jmx 
metric names are not altered by this change? I think that they are generated 
based on the order in the data structure.
   
   
   Hi @dajac, thanks for your review.
   I think the order is not changed. Both of them are as follows:
   https://github.com/user-attachments/assets/2a5e45e5-2036-4e82-a29d-228fcf9432a4
   
   But I am a little confused about why we need to care about the jmx metric 
names. In the past we used `HashMap`, so the order was unspecified and depended 
on the implementation; this PR uses `LinkedHashMap`, which is deterministic.
   
   If any user assumes the metric order is deterministic, that is an issue they 
need to fix.
   If I misunderstood anything, please correct me, thanks.
   
   
   Another thing: there is a related `MetricName` PR, 
https://github.com/apache/kafka/pull/19222; it would be difficult to check that 
every ordering is the same as before.
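
   A pure-JDK demo of why the order can matter when a name is built by 
concatenating tags (hypothetical name format):
   
   ```
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   public class TagOrderDemo {
       static String mbeanName(Map<String, String> tags) {
           StringBuilder sb = new StringBuilder("kafka.server:type=some-metrics");
           tags.forEach((k, v) -> sb.append(',').append(k).append('=').append(v));
           return sb.toString();
       }
   
       public static void main(String[] args) {
           // HashMap iteration order is unspecified, so the rendered name could
           // vary across runs; LinkedHashMap pins it to insertion order.
           Map<String, String> ordered = new LinkedHashMap<>();
           ordered.put("client-id", "c1");
           ordered.put("node-id", "n1");
           // Always kafka.server:type=some-metrics,client-id=c1,node-id=n1
           System.out.println(mbeanName(ordered));
       }
   }
   ```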


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-27 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2017977111


##
server-common/src/main/java/org/apache/kafka/server/storage/log/FetchIsolation.java:
##
@@ -25,11 +25,11 @@ public enum FetchIsolation {
 TXN_COMMITTED;
 
 public static FetchIsolation of(FetchRequest request) {
-return of(request.replicaId(), request.isolationLevel());
+return of(request.replicaId(), request.isolationLevel(), false);
 }
 
-public static FetchIsolation of(int replicaId, IsolationLevel 
isolationLevel) {
-if (!FetchRequest.isConsumer(replicaId)) {
+public static FetchIsolation of(int replicaId, IsolationLevel 
isolationLevel, boolean isShareFetchRequest) {
+if (!FetchRequest.isConsumer(replicaId) && !isShareFetchRequest) {

Review Comment:
   I have made the code change to use `replicaId` as -1 and removed 
`isShareFetchRequest` param.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`

2025-03-27 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939153#comment-17939153
 ] 

Lan Ding commented on KAFKA-19024:
--

Thanks for your reply.
Whether reusing the GROUP_MAX_SIZE_REACHED error code is appropriate depends on 
whether clients need to differentiate between these two error scenarios. 
However, given that clients currently only need to retry and log the error (no 
special handling is required), reusing the error code seems acceptable.

> Enhance the client behaviour when it tries to exceed the 
> `group.share.max.groups`
> -
>
> Key: KAFKA-19024
> URL: https://issues.apache.org/jira/browse/KAFKA-19024
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sanskar Jhajharia
>Assignee: Lan Ding
>Priority: Minor
>
> For share groups we use the `group.share.max.groups` config to define the 
> number of max share groups we allow. However, when we exceed the same, the 
> client logs do not specify any such error and simply do not consume. The 
> group doesn't get created but the client continues to send Heartbeats hoping 
> for one of the existing groups to shut down and allowing it to form a group. 
> Having a log or an exception in the client logs will help them debug such 
> situations accurately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16580: Enable dynamic quorum reconfiguration for raft simulation tests [kafka]

2025-03-27 Thread via GitHub


kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r2003724311


##
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
 @Override
 public void verify() {
-cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-long numReachedHighWatermark = 
cluster.nodes.entrySet().stream()
-.filter(entry -> 
cluster.voters.containsKey(entry.getKey()))
-.filter(entry -> entry.getValue().log.endOffset().offset() 
>= highWatermark)
-.count();
-assertTrue(
-numReachedHighWatermark >= cluster.majoritySize(),
-"Insufficient nodes have reached current high watermark");
+if (cluster.withKip853) {
+/*
+* For clusters running in KIP-853 mode, we check that a 
majority of at least one of:
+* 1. the leader's voter set at the HWM
+* 2. the leader's lastVoterSet()
+* has reached the HWM. We need to perform a more elaborate 
check here because in clusters where
+* an Add/RemoveVoter request increases/decreases the majority 
of voters value by 1, the leader
+* could have used either majority value to update its HWM 
value. This is because depending on
+* whether the leader read the most recent VotersRecord prior 
to updating its HWM value, the number

Review Comment:
   The summary of the discussion as to why we need to check the HWM in this way 
is contained here: 
https://github.com/apache/kafka/pull/18987#discussion_r1971722138.
   
   TLDR: the simulation test can call `verify` and look at the RaftClient 
internal state in between those two points (i.e. `partitionState` has been 
updated with latest `votersRecord`, but the HWM is still the old value from the 
last committed `votersRecord`).
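
   A toy version of that check (invented names; simplified from the quoted 
`verify` logic): accept if a majority of either the committed voter set or the 
latest voter set has replicated up to the HWM.
   
   ```
   import java.util.Map;
   import java.util.Set;
   
   public class HwmMajoritySketch {
       static boolean majorityReached(Set<Integer> voters,
                                      Map<Integer, Long> logEndOffsets, long hwm) {
           long reached = voters.stream()
               .filter(id -> logEndOffsets.getOrDefault(id, 0L) >= hwm)
               .count();
           return reached >= voters.size() / 2 + 1;
       }
   
       static boolean verify(Set<Integer> committedVoters, Set<Integer> latestVoters,
                             Map<Integer, Long> logEndOffsets, long hwm) {
           return majorityReached(committedVoters, logEndOffsets, hwm)
               || majorityReached(latestVoters, logEndOffsets, hwm);
       }
   }
   ```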



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]

2025-03-27 Thread via GitHub


AyoubOm commented on PR #19303:
URL: https://github.com/apache/kafka/pull/19303#issuecomment-2759697061

   @mjsax FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


