[jira] [Commented] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error

2025-03-26 Thread Lorcan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938478#comment-17938478
 ] 

Lorcan commented on KAFKA-19022:


Hi [~rsamudrala], I've checked the KafkaRaftClient class for the 
INCONSISTENT_CLUSTER_ID error and it looks like there are several instances 
where a customised error message containing the cluster IDs is not set:

VoteResponseData
BeginQuorumEpochResponseData
EndQuorumEpochResponseData
FetchResponseData
FetchSnapshotResponseData

These all implement the ApiMessage interface, just like AddRaftVoterResponseData does. I 
can create a PR that adds an errorMessage field to FetchResponseData to gather 
feedback and see whether this is a viable approach.
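
For illustration, here is a minimal, self-contained sketch (not the actual KafkaRaftClient code; the helper name and message wording are hypothetical) of the kind of message such an errorMessage field could carry:

{code:java}
public class ClusterIdMismatchExample {

    // Hypothetical helper: builds the message that could be set on the
    // proposed errorMessage field of the response types listed above.
    static String inconsistentClusterIdMessage(String localClusterId, String requestClusterId) {
        return String.format(
            "The request cluster id '%s' does not match the local cluster id '%s'",
            requestClusterId, localClusterId);
    }

    public static void main(String[] args) {
        System.out.println(inconsistentClusterIdMessage("local-cluster-id", "request-cluster-id"));
    }
}
{code}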

> Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID 
> error
> --
>
> Key: KAFKA-19022
> URL: https://issues.apache.org/jira/browse/KAFKA-19022
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, logging
>Affects Versions: 3.9.0
>Reporter: Ranganath Samudrala
>Assignee: Lorcan
>Priority: Major
>
> While migrating Kafka from zookeeper to kraft, we see errors in logs like
> {{INCONSISTENT_CLUSTER_ID in FETCH response }}
> or
> {{INCONSISTENT_CLUSTER_ID in VOTER response }}
> But the cluster IDs being compared are not displayed in the logs, so there is 
> not enough information to see where the issue is. Is the class field 
> *clusterId* empty (which could potentially be a bug?), or is the incoming 
> *clusterId* empty or incorrect?
> [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
> {code:java}
>     private boolean hasValidClusterId(String requestClusterId) {
>         // We don't enforce the cluster id if it is not provided.
>         if (requestClusterId == null) {
>             return true;
>         }
>         return clusterId.equals(requestClusterId);
>     }
>     ...
>     private CompletableFuture<FetchResponseData> handleFetchRequest(
>         RaftRequest.Inbound requestMetadata,
>         long currentTimeMs
>     ) {
>         FetchRequestData request = (FetchRequestData) requestMetadata.data();
>         if (!hasValidClusterId(request.clusterId())) {
>             return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
>         }
>     ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-18300) Improve coordinator records

2025-03-26 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-18300.
-
Fix Version/s: 4.1.0
   Resolution: Fixed

> Improve coordinator records
> ---
>
> Key: KAFKA-18300
> URL: https://issues.apache.org/jira/browse/KAFKA-18300
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 4.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013806185


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            if (abortedTransactions.isEmpty())
+                return acquiredRecords;
+            // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+            List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+            for (RecordBatch recordBatch : recordsToArchive) {
+                // Archive the offsets/batches in the cached state.
+                NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+                archiveAcquiredBatchRecords(subMap, recordBatch);
+            }
+            return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecords,
+        List<RecordBatch> recordsToArchive
+    ) {
+        lock.writeLock().lock();
+        try {
+            List<AcquiredRecords> result = new ArrayList<>();
+
+            for (AcquiredRecords acquiredRecord : acquiredRecords) {
+                List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+                tempAcquiredRecords.add(acquiredRecord);
+                for (RecordBatch recordBatch : recordsToArchive) {
+                    List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+                    for (AcquiredRecords temp : tempAcquiredRecords) {
+                        // Check if record batch overlaps with the acquired records.
+                        if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) {
+                            // Split the acquired record into parts before, inside, and after the overlapping record batch.
+                            if (temp.firstOffset() < recordBatch.baseOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(temp.firstOffset())
+                                    .setLastOffset(recordBatch.baseOffset() - 1)
+                                    .setDeliveryCount((short) 1));
+                            }
+                            if (temp.lastOffset() > recordBatch.lastOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(recordBatch.lastOffset() + 1)
+                                    .setLastOffset(temp.lastOffset())
+                                    .setDeliveryCount((short) 1));
+                            }
+                        } else {
+                            newAcquiredRecords.add(temp);
+                        }
+                    }
+                    tempAcquiredRecords = newAcquiredRecords;
+                }
+                result.addAll(tempAcquiredRecords);
+            }
+            return result;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            // The fetched batch either is exact fetch equivalent batch (mostly), subset
+            // or spans over multiple fetched batches. The state can vary per offset itself from
+            // the fetched batch in case of subset.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+
+                // If startOffs

[jira] [Resolved] (KAFKA-18612) Update ApiMessageFormatter

2025-03-26 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-18612.
-
Fix Version/s: 4.1.0
   Resolution: Fixed

Addressed by 
https://github.com/apache/kafka/commit/b6adec48c5501d7c6ceebabecf1321dd6c7b8835.

> Update ApiMessageFormatter
> --
>
> Key: KAFKA-18612
> URL: https://issues.apache.org/jira/browse/KAFKA-18612
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 4.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] (WIP) KAFKA-18409: ShareGroupStateMessageFormatter should use ApiMessageFormatter [kafka]

2025-03-26 Thread via GitHub


brandboat commented on PR #18510:
URL: https://github.com/apache/kafka/pull/18510#issuecomment-2753884551

   > @brandboat We have merged https://github.com/apache/kafka/pull/18695. We 
can proceed with your PR. Please ping me when the PR is ready for review.
   
   Thanks for the heads-up, I'll handle this ASAP!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]

2025-03-26 Thread via GitHub


mimaison commented on code in PR #19216:
URL: https://github.com/apache/kafka/pull/19216#discussion_r2013891681


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##
@@ -0,0 +1,793 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.server.util.LockUtils.inLock;
+
+/**
+ * This class manages the state (see {@link LogCleaningState}) of each partition being cleaned.
+ * <ul>
+ * 1. None                   : No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
+ *                             or LogCleaningPaused(1). Valid previous states are LogCleaningInProgress and LogCleaningPaused(1).
+ * 2. LogCleaningInProgress  : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
+ *                             or become LogCleaningAborted. Valid previous state is None.
+ * 3. LogCleaningAborted     : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
+ *                             Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1) : The cleaning is paused once. No log cleaning can be done in this state.
+ *                             In this state, it can become None or LogCleaningPaused(2).
+ *                             Valid previous states are None, LogCleaningAborted and LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i) : The cleaning is paused i times where i >= 2. No log cleaning can be done in this state.
+ *                             In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *                             Valid previous states are LogCleaningPaused(i-1) and LogCleaningPaused(i+1).
+ * </ul>
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint";
+
+    private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under the `kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /**
+     * The set of logs currently being cleaned.
+     */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();
+
+    /**
+     * The set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
+     * for each log directory.
+     */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();
+
+    /**
+     * A global lock used to control all access to the in-progress set and the offset checkpoints.
Re: [PR] KAFKA-17662: config.providers configuration missing from the docs [kafka]

2025-03-26 Thread via GitHub


m1a2st commented on code in PR #18930:
URL: https://github.com/apache/kafka/pull/18930#discussion_r2014432276


##
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java:
##
@@ -289,6 +295,13 @@ protected Map<String, Object> postProcessParsedConfig(final Map<String, Object>
         CommonClientConfigs.warnDisablingExponentialBackoff(this);
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
     }
+
+    @Override
+    public Map<String, Object> originals() {

Review Comment:
   When I traced `WorkerConfig`, I found that this configuration is removed in 
its `originals()` method. That method is used to create admin clients such as 
`TopicAdmin`, and within `adminFactory`. Therefore, I think that when creating 
an `Admin`, we cannot pass this configuration.
   
https://github.com/apache/kafka/blob/56d1dc1b6e12a8a325c91749485aa03bce17c7f1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L458
   
https://github.com/apache/kafka/blob/56d1dc1b6e12a8a325c91749485aa03bce17c7f1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L943
   
https://github.com/apache/kafka/blob/56d1dc1b6e12a8a325c91749485aa03bce17c7f1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L2146
   
   If I missed or misunderstood anything, please let me know. Thanks 
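
   For context, a minimal, self-contained sketch of the behaviour described above (hypothetical example code, not the actual `WorkerConfig` source; see the links for the real implementation): the `originals()` override drops the `config.providers` entries, so clients created from that map never see them.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class OriginalsFilterExample {

       // Mirrors the idea of WorkerConfig's originals() override: strip the
       // config provider settings before the map is used to build derived
       // clients such as TopicAdmin.
       static Map<String, Object> withoutConfigProviders(Map<String, Object> originals) {
           Map<String, Object> map = new HashMap<>(originals);
           map.keySet().removeIf(key -> key.startsWith("config.providers"));
           return map;
       }

       public static void main(String[] args) {
           Map<String, Object> originals = new HashMap<>(Map.of(
               "bootstrap.servers", "localhost:9092",
               "config.providers", "file",
               "config.providers.file.class", "org.apache.kafka.common.config.provider.FileConfigProvider"));
           // prints: {bootstrap.servers=localhost:9092}
           System.out.println(withoutConfigProviders(originals));
       }
   }
   ```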



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18982: Allow ClusterTests to ignore specific thread leaks [kafka]

2025-03-26 Thread via GitHub


mimaison commented on PR #19206:
URL: https://github.com/apache/kafka/pull/19206#issuecomment-2755034626

   I kind of agree with @gharris1727. Kafka is not using this code and this is 
not a public API, so this is effectively adding code to the Kafka code base 
just for your use case. Sure, the change is small, but the Apache Kafka project 
can't add and maintain code for private uses. 
   
   I think the current state of what we offer to users to test things is not 
great. It's probably worth having discussions to see what we can do to improve 
in this area. In the past, most of the testing infrastructure was in core, so 
exposing it was hard. Now that a big chunk of that test infra has been moved to 
a dedicated module, we should have more flexibility and options.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16894) Define switches to enable share groups for preview

2025-03-26 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield updated KAFKA-16894:
-
Fix Version/s: 4.1.0

> Define switches to enable share groups for preview
> --
>
> Key: KAFKA-16894
> URL: https://issues.apache.org/jira/browse/KAFKA-16894
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
> Fix For: 4.1.0
>
>
> Create group.version=2 and share.version=1 as the switches to enable share 
> groups in the broker.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-26 Thread via GitHub


ShivsundarR closed pull request #19294: MINOR: Add check in ShareConsumerImpl 
to send acknowledgements of control records when ShareFetch is empty.
URL: https://github.com/apache/kafka/pull/19294


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-26 Thread via GitHub


ShivsundarR opened a new pull request, #19295:
URL: https://github.com/apache/kafka/pull/19295

   *What*
   Currently, if we receive just a control record in the `ShareFetchResponse`, 
the currentFetch in `ShareConsumerImpl` is not updated because the record is 
ignored. But in the process, we lose the acknowledgement for this control 
record (control records are acknowledged as GAP).
   The PR fixes this by adding a check to include the pending acknowledgements 
even when the `ShareFetch` is empty.
   Added a unit test which verifies this change.
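
   A hedged sketch of the shape of the check (hypothetical names, not the actual `ShareConsumerImpl` code): an empty fetch is still surfaced when acknowledgements are pending, so the GAP acknowledgements for control records get sent.

   ```java
   import java.util.List;

   // Hypothetical stand-ins for the real ShareConsumerImpl types.
   record CompletedShareFetch(List<String> records, List<Long> pendingAcknowledgements) {

       boolean isEmpty() {
           return records.isEmpty();
       }

       // The idea of the fix: do not drop an empty fetch if acknowledgements
       // (e.g. GAPs for control records) still need to be delivered.
       boolean shouldSurface() {
           return !isEmpty() || !pendingAcknowledgements.isEmpty();
       }
   }

   public class ShareFetchCheckExample {
       public static void main(String[] args) {
           // A fetch that contained only a control record: no user records,
           // but one pending acknowledgement at offset 42.
           CompletedShareFetch onlyControlRecord = new CompletedShareFetch(List.of(), List.of(42L));
           System.out.println(onlyControlRecord.shouldSurface()); // prints: true
       }
   }
   ```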


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16894) Define switches to enable share groups for preview

2025-03-26 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield updated KAFKA-16894:
-
Summary: Define switches to enable share groups for preview  (was: Define 
group.version=2)

> Define switches to enable share groups for preview
> --
>
> Key: KAFKA-16894
> URL: https://issues.apache.org/jira/browse/KAFKA-16894
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>
> Create group.version=2 as the switch to enable share groups in the broker.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-03-26 Thread via GitHub


mimaison commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2014493205


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -407,14 +413,17 @@ private void configureRLMM() {
 rlmmProps.put(LOG_DIR_CONFIG, logDir);
 rlmmProps.put("cluster.id", clusterId);
 
-remoteLogMetadataManager.configure(rlmmProps);
+remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
 }
 
 public void startup() {
 // Initialize and configure RSM and RLMM. This will start RSM, RLMM 
resources which may need to start resources
 // in connecting to the brokers or remote storages.
 configureRSM();
 configureRLMM();
+// the withPluginMetrics() method will be called when the plugin is 
instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   See my attempt at implementing what I described: 
https://github.com/apache/kafka/commit/89e9a633cb605bc3a865c5001da704bd29628890
   
   I don't think this changes the contracts we currently have with regards to 
`RemoteLogMetadataManager` and `RemoteStorageManager`. For both we still call 
the empty constructor, followed by a call to `configure()`, and now potentially 
a call to `withPluginMetrics()`, if the implementation is `Monitorable`.
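
   For reference, a hedged sketch of that instantiation order (a generic illustration, not Kafka's actual plugin-loading code; the package locations of `Monitorable` and `PluginMetrics` follow KIP-877 and should be treated as assumptions):

   ```java
   import org.apache.kafka.common.Configurable;
   import org.apache.kafka.common.metrics.Monitorable;
   import org.apache.kafka.common.metrics.PluginMetrics;

   import java.util.Map;

   public class PluginLifecycleSketch {
       // Empty constructor first, then configure(), then withPluginMetrics().
       static <T> T instantiate(Class<T> klass, Map<String, ?> configs, PluginMetrics metrics) throws Exception {
           T plugin = klass.getDeclaredConstructor().newInstance();
           if (plugin instanceof Configurable configurable) {
               configurable.configure(configs);
           }
           if (plugin instanceof Monitorable monitorable) {
               monitorable.withPluginMetrics(metrics);
           }
           return plugin;
       }
   }
   ```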



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-19042: [2/N] Move TransactionsWithMaxInFlightOneTest to client-integration-tests module [kafka]

2025-03-26 Thread via GitHub


FrankYang0529 opened a new pull request, #19289:
URL: https://github.com/apache/kafka/pull/19289

   Use Java to rewrite `TransactionsWithMaxInFlightOneTest` by new test infra 
and move it to client-integration-tests module.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-12744: dependency upgrade: `argparse4j` 0.7.0 -->> 0.9.0 [kafka]

2025-03-26 Thread via GitHub


dejan2609 commented on PR #19265:
URL: https://github.com/apache/kafka/pull/19265#issuecomment-2755336115

   Pardon me for not testing with checkstyle in the first place @mimaison.
   
   Anyway, I fixed the imports, but the Gradle build fails again:
   `warning: [deprecation] newArgumentParser(String) in ArgumentParsers has 
been deprecated`
   
   I will paste the complete stack trace here (and will use the next comment to 
present the alternatives).
   ```
   dejan@dejan:~/kafka$ ./gradlew checkstyleMain checkstyleTest spotlessCheck
   Starting a Gradle Daemon, 2 incompatible and 1 stopped Daemons could not be 
reused, use --status for details
   
   > Configure project :
   Starting build with version 4.1.0-SNAPSHOT (commit id f17682ab) using Gradle 
8.10.2, Java 17 and Scala 2.13.15
   Build properties: ignoreFailures=false, maxParallelForks=8, 
maxScalacThreads=8, maxTestRetries=0
   
   > Task :generator:compileJava
   
/home/dejan/kafka/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java:371:
 warning: [deprecation] newArgumentParser(String) in ArgumentParsers has been 
deprecated
   .newArgumentParser("message-generator")
   ^
   error: warnings found and -Werror specified
   
/home/dejan/kafka/generator/src/main/java/org/apache/kafka/message/checker/MetadataSchemaCheckerTool.java:43:
 warning: [deprecation] newArgumentParser(String) in ArgumentParsers has been 
deprecated
   ArgumentParser argumentParser = ArgumentParsers.
  ^
   1 error
   2 warnings
   
   > Task :generator:compileJava FAILED
   
   FAILURE: Build failed with an exception.
   
   * What went wrong:
   Execution failed for task ':generator:compileJava'.
   > Compilation failed; see the compiler error output for details.
   
   * Try:
   > Run with --info option to get more log output.
   
   BUILD FAILED in 1m
   54 actionable tasks: 9 executed, 45 up-to-date
   dejan@dejan:~/kafka$
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2025-03-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16394:

Fix Version/s: 3.9.1

> ForeignKey LEFT join propagates null value on foreignKey change
> ---
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Fix For: 3.9.1
>
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics: _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> +*Scenario1: change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2") 
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>  
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>  
> A null is propagated to the join result when the foreign key changes
>  
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", null) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null) {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, null) {code}
> An additional null is propagated to the join result.
>  
> This bug doesn't exist on versions 3.6.0 and below.
>  
> I believe the issue comes from the line 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
> where we propagate the deletion in the two scenarios above
>  
> Attaching the topology I used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-18713) Kafka Streams Left-Join not always emitting the last value

2025-03-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-18713:

Due Date: (was: 3/Feb/25)

> Kafka Streams Left-Join not always emitting the last value
> --
>
> Key: KAFKA-18713
> URL: https://issues.apache.org/jira/browse/KAFKA-18713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: tuna b
>Assignee: Nil Madhab
>Priority: Major
> Fix For: 3.9.1
>
> Attachments: Screenshot 2025-02-10 at 12.06.38.png
>
>
> There seems to be an issue when performing a left-join: the latest value of 
> the join does not contain the value of the right side.
> {code:java}
> var builder = new StreamsBuilder();
> KTable<String, Department> departments = builder
>     .table("departments",
>         Materialized.<String, Department>as(Stores.persistentKeyValueStore("departments"))
>             .withKeySerde(Serdes.String())
>             .withValueSerde(CustomJsonSerde.json(Department.class)));
> KTable<String, Person> persons = builder
>     .table("persons",
>         Materialized.<String, Person>as(Stores.persistentKeyValueStore("persons"))
>             .withKeySerde(Serdes.String())
>             .withValueSerde(CustomJsonSerde.json(Person.class)));
> KTable<String, Person> joined = persons
>     .leftJoin(departments, Person::getDepartmentId, (person, department) -> person.toBuilder()
>             .department(department)
>             .build(),
>         TableJoined.as("my-joiner"),
>         Materialized.<String, Person>as(Stores.persistentKeyValueStore("joined-results"))
>             .withKeySerde(Serdes.String())
>             .withValueSerde(CustomJsonSerde.json(Person.class)));
> joined
>     .toStream()
>     .to("joined-results", Produced.with(Serdes.String(), CustomJsonSerde.json(Person.class))); {code}
> How to reproduce:
> Create two topics, persons and departments, each with 10 partitions. 
> Pre-populate the departments topic with 2 departments.
>  
> Observations:
> * When I initially produce a Person {{p-1}} with FK {{dep-1}}, the join works.
> ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
> * When I change the FK to {{dep-2}}, the join updates.
> ** output is an EnrichedResult with person {{p-1}} and department {{dep-2}}
> * When I change the FK back to {{dep-1}}, the join fails.
> ** output is an EnrichedResult with person {{p-1}} *but no department*
> * However, if I reproduce the same event ({{p-1}} with {{dep-1}}), the join 
> works again.
> ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
> Also, even when you are not setting back to a previous FK, there can still be 
> an issue with the left join. Changing an FK means insert + delete operations, 
> but sometimes the result of the delete is emitted after the result of the 
> insert. 
> How to reproduce:
> Create a departments topic and pre-populate it with 5 departments (dep-1 to 
> dep-5). Create a persons topic and create person p-1 with FK dep-1. Send an 
> update to the persons topic by changing the FK to dep-2 and repeat this step 
> until dep-5. Now you will see that the latest emitted value of the person 
> does not contain a department. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


apoorvmittal10 commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2014733112


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
         }
     }
 
+    private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            if (abortedTransactions.isEmpty())
+                return acquiredRecords;
+            // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+            List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+            for (RecordBatch recordBatch : recordsToArchive) {
+                // Archive the offsets/batches in the cached state.
+                NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+                archiveAcquiredBatchRecords(subMap, recordBatch);
+            }
+            return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecords,
+        List<RecordBatch> recordsToArchive
+    ) {
+        lock.writeLock().lock();
+        try {
+            List<AcquiredRecords> result = new ArrayList<>();
+
+            for (AcquiredRecords acquiredRecord : acquiredRecords) {
+                List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+                tempAcquiredRecords.add(acquiredRecord);
+                for (RecordBatch recordBatch : recordsToArchive) {
+                    List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+                    for (AcquiredRecords temp : tempAcquiredRecords) {
+                        // Check if record batch overlaps with the acquired records.
+                        if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) {
+                            // Split the acquired record into parts before, inside, and after the overlapping record batch.
+                            if (temp.firstOffset() < recordBatch.baseOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(temp.firstOffset())
+                                    .setLastOffset(recordBatch.baseOffset() - 1)
+                                    .setDeliveryCount((short) 1));
+                            }
+                            if (temp.lastOffset() > recordBatch.lastOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(recordBatch.lastOffset() + 1)
+                                    .setLastOffset(temp.lastOffset())
+                                    .setDeliveryCount((short) 1));
+                            }
+                        } else {
+                            newAcquiredRecords.add(temp);
+                        }
+                    }
+                    tempAcquiredRecords = newAcquiredRecords;
+                }
+                result.addAll(tempAcquiredRecords);
+            }
+            return result;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            // The fetched batch either is exact fetch equivalent batch (mostly), subset
+            // or spans over multiple fetched batches. The state can vary per offset itself from
+            // the fetched batch in case of subset.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+
+                // If startOffse

Re: [PR] KAFKA-12744: dependency upgrade: `argparse4j` 0.7.0 -->> 0.9.0 [kafka]

2025-03-26 Thread via GitHub


dejan2609 commented on PR #19265:
URL: https://github.com/apache/kafka/pull/19265#issuecomment-2755363293

   So, herewith the alternatives: 
   1. relax the Checkstyle rules via an exception (i.e. don't fail the build for 
the `ArgumentParsers.newArgumentParser` deprecation warning mentioned above) 
**_OR_**
   2. use my old PR [#10626](https://github.com/apache/kafka/pull/10626) and 
replace the method (`ArgumentParsers.newArgumentParser` with 
`ArgumentParsers.newFor`) as mentioned 
[here](https://argparse4j.github.io/migration.html#to-0-8-0)
   
   @mimaison LMK which option sounds better for you.
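
   For completeness, the option 2 replacement from the migration guide looks like this (`"message-generator"` matches the usage in `MessageGenerator` above; `addHelp(true)` preserves the old default of adding `-h/--help`):

   ```java
   import net.sourceforge.argparse4j.ArgumentParsers;
   import net.sourceforge.argparse4j.inf.ArgumentParser;

   public class ParserMigrationExample {
       public static void main(String[] args) {
           // Deprecated since argparse4j 0.8.0:
           // ArgumentParser parser = ArgumentParsers.newArgumentParser("message-generator");

           // Builder-style replacement:
           ArgumentParser parser = ArgumentParsers.newFor("message-generator")
               .addHelp(true)
               .build();
           parser.printHelp();
       }
   }
   ```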
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18899: Improve handling of timeouts for commitAsync() in ShareConsumer. [kafka]

2025-03-26 Thread via GitHub


AndrewJSchofield merged PR #19192:
URL: https://github.com/apache/kafka/pull/19192


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]

2025-03-26 Thread via GitHub


dajac merged PR #18695:
URL: https://github.com/apache/kafka/pull/18695


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-6333 java.awt.headless should not be on commandline [kafka]

2025-03-26 Thread via GitHub


mimaison commented on code in PR #19237:
URL: https://github.com/apache/kafka/pull/19237#discussion_r2013902994


##
core/src/main/scala/kafka/Kafka.scala:
##
@@ -73,7 +73,7 @@ object Kafka extends Logging {
 try {
   val serverProps = getPropsFromArgs(args)
   val server = buildServer(serverProps)
-
+  System.setProperty("java.awt.headless", "true")

Review Comment:
   I'm not the creator of the JIRA but I think the point was to completely 
remove this system property because Kafka does not use java.awt at all, so it 
does not need to run in headless mode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Write log differently according to the size of missingListenerPartitions [kafka]

2025-03-26 Thread via GitHub


HyunSangHan commented on PR #11418:
URL: https://github.com/apache/kafka/pull/11418#issuecomment-2754340862

   I am still here! ✋


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16894: Define features to enable share groups [kafka]

2025-03-26 Thread via GitHub


dajac commented on PR #19293:
URL: https://github.com/apache/kafka/pull/19293#issuecomment-2755031196

   @AndrewJSchofield I would like to better understand how `group.version=2` 
will be used. Could you elaborate a bit more on what we will gate with it? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-03-26 Thread via GitHub


TaiJuWu commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2014001146


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -407,14 +413,17 @@ private void configureRLMM() {
 rlmmProps.put(LOG_DIR_CONFIG, logDir);
 rlmmProps.put("cluster.id", clusterId);
 
-remoteLogMetadataManager.configure(rlmmProps);
+remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
 }
 
 public void startup() {
 // Initialize and configure RSM and RLMM. This will start RSM, RLMM 
resources which may need to start resources
 // in connecting to the brokers or remote storages.
 configureRSM();
 configureRLMM();
+// the withPluginMetrics() method will be called when the plugin is 
instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   
https://github.com/apache/kafka/pull/19286/commits/108129df77529aa53cfa42cdaae5a40f47459ff9
 is the commit where I tried moving `configure` to the constructor, but I finally gave it up.
   
   The most important consideration is that `RemoteLogMetadataManager` and 
`RemoteStorageManager` are public plugins and we should not change this flow 
without a KIP. If I misunderstood, please correct me.
   
   I will continue to work on this after getting other feedback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16894: Define features to enable share groups [kafka]

2025-03-26 Thread via GitHub


AndrewJSchofield opened a new pull request, #19293:
URL: https://github.com/apache/kafka/pull/19293

   This PR proposes two switches to enable share groups for 4.1 (preview) and 
4.2 (GA).
   
   * `group.version=2` to indicate that the group coordinator is able to 
understand share group records.
   * `share.version=1` to indicate that share groups are enabled. This is used 
as the switch for turning share groups on and off.
   
   In 4.1, the defaults will be `group.version=2` and `share.version=0`. Then a 
user wanting to evaluate the preview of KIP-932 would use 
`bin/kafka-features.sh --bootstrap.server  upgrade --feature 
share.version=1`.
   
   In 4.2, the default will be `share.version=1`. 
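
   For anyone evaluating, a hedged sketch of the same switch via the Java Admin API (assumes a broker at localhost:9092; the `kafka-features.sh` command above is the documented path):

   ```java
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.FeatureUpdate;
   import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

   import java.util.Map;

   public class EnableShareGroupsExample {
       public static void main(String[] args) throws Exception {
           try (Admin admin = Admin.create(Map.<String, Object>of("bootstrap.servers", "localhost:9092"))) {
               // Upgrade share.version from the 4.1 default (0) to 1 to turn share groups on.
               admin.updateFeatures(
                   Map.of("share.version", new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE)),
                   new UpdateFeaturesOptions()
               ).all().get();
           }
       }
   }
   ```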


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Mark streams RPCs as unstable [kafka]

2025-03-26 Thread via GitHub


lucasbru opened a new pull request, #19292:
URL: https://github.com/apache/kafka/pull/19292

   Streams groups RPCs are not enabled by default, but they should also be 
marked as unstable.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16894) Define switches to enable share groups for preview

2025-03-26 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield updated KAFKA-16894:
-
Description: Create group.version=2 and share.version=1 as the switches to 
enable share groups in the broker.  (was: Create group.version=2 as the switch 
to enable share groups in the broker.)

> Define switches to enable share groups for preview
> --
>
> Key: KAFKA-16894
> URL: https://issues.apache.org/jira/browse/KAFKA-16894
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>
> Create group.version=2 and share.version=1 as the switches to enable share 
> groups in the broker.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]

2025-03-26 Thread via GitHub


ShivsundarR opened a new pull request, #19294:
URL: https://github.com/apache/kafka/pull/19294

   *What*
   Currently, if we receive just a control record in the `ShareFetchResponse`, 
the currentFetch in `ShareConsumerImpl` is not updated because the record is 
ignored. But in the process, we lose the acknowledgement for this control 
record (control records are acknowledged as GAP).
   The PR fixes this by adding a check to include the pending acknowledgements 
even when the `ShareFetch` is empty.
   Added a unit test which verifies this change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null

2025-03-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-16407:

Fix Version/s: 3.9.1

> ForeignKey INNER join ignores FK change when its previous value is null
> ---
>
> Key: KAFKA-16407
> URL: https://issues.apache.org/jira/browse/KAFKA-16407
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, 
> 3.6.1
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Fix For: 3.9.1
>
> Attachments: InnerFKJoinTest.scala, JsonSerde.scala
>
>
> We have two topics: _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *INNER* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK*
> {code:scala}
> rightTopic.pipeInput("fk", "1")
> leftTopic.pipeInput("pk1", LeftRecord(null, "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1){code}
>  
> *+Actual result+*
> {code:scala}
> # No output !
> # Logs:
> 20:14:29,723 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[0]
> 20:14:29,728 WARN  
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier
>   - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] 
> topic=[left-topic] partition=[0] offset=[1]
> {code}
>  
> After looking into the code, I believe this is the line behind the issue : 
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18935: Ensure brokers do not return null records in FetchResponse [kafka]

2025-03-26 Thread via GitHub


junrao commented on code in PR #19167:
URL: https://github.com/apache/kafka/pull/19167#discussion_r2015116293


##
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
##
@@ -89,7 +89,7 @@ public FetchResponseData data() {
  */
 public FetchResponse(FetchResponseData fetchResponseData) {
 super(ApiKeys.FETCH);
-this.data = fetchResponseData;
+this.data = convertNullRecordsToEmpty(fetchResponseData);

Review Comment:
   This code is called by both the server and the client. On the server side, 
it makes sense for the server to set records to empty. On the client side, it 
seems that it's better to take whatever it receives from the broker, instead of 
changing it silently. Maybe we can consolidate the server side usage based on 
FetchResponse.of() and the client side usage based on FetchResponse.parse(), 
and avoid the direct usage of this constructor. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: add log4j2.yaml to clients-integration-tests module [kafka]

2025-03-26 Thread via GitHub


m1a2st commented on code in PR #19252:
URL: https://github.com/apache/kafka/pull/19252#discussion_r2007875519


##
clients/clients-integration-tests/src/test/resources/log4j2.yaml:
##
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Configuration:
+  Properties:
+    Property:
+      - name: "logPattern"
+        value: "[%d] %p %m (%c:%L)%n"
+
+  Appenders:
+    Console:
+      name: STDOUT
+      PatternLayout:
+        pattern: "${logPattern}"
+
+  Loggers:

Review Comment:
   Sure, addressed it 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18067: Add a flag to disable producer reset during active task creator shutting down [kafka]

2025-03-26 Thread via GitHub


ableegoldman commented on code in PR #19269:
URL: https://github.com/apache/kafka/pull/19269#discussion_r2015005410


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java:
##
@@ -70,6 +70,7 @@ public class StreamsProducer {
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private boolean allowReset = true;

Review Comment:
   it's a bit confusing that we flip-flop back and forth between "disableReset" 
and "allowReset", I'd recommend picking one (probably `disableReset`) for all 
the method and variable names



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java:
##
@@ -324,6 +329,22 @@ void close() {
         transactionInitialized = false;
     }
 
+    /**
+     * Disables producer reset to prevent producer recreation during shutdown.
+     * <p>
+     * When disabled, subsequent calls to reInitializeProducer() will not recreate
+     * the producer instance, avoiding a resource leak.
+     * <p>
+     * This method should only be invoked when the {@link org.apache.kafka.streams.processor.internals.ActiveTaskCreator}

Review Comment:
   nit: I'd say when the "StreamThread" is shutting down, rather than the 
ActiveTaskCreator
   
   Effectively it's the same thing at the moment but you never know what 
refactoring may bring, and the important thing here is that the StreamThread 
itself is shutting down



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`

2025-03-26 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938778#comment-17938778
 ] 

Lan Ding commented on KAFKA-19024:
--

Hi [~schofielaj],  If this ticket is still open, may I take it over?

> Enhance the client behaviour when it tries to exceed the 
> `group.share.max.groups`
> -
>
> Key: KAFKA-19024
> URL: https://issues.apache.org/jira/browse/KAFKA-19024
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sanskar Jhajharia
>Assignee: Andrew Schofield
>Priority: Minor
>
> For share groups, we use the `group.share.max.groups` config to define the 
> maximum number of share groups we allow. However, when this limit is exceeded, 
> the client logs do not report any error and the client simply does not 
> consume. The group doesn't get created, but the client continues to send 
> heartbeats, hoping for one of the existing groups to shut down so it can form 
> a group. Having a log message or an exception in the client logs would help 
> users debug such situations accurately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Allow re-registration for fenced / cleanly shutdown brokers [kafka]

2025-03-26 Thread via GitHub


ahuang98 opened a new pull request, #19296:
URL: https://github.com/apache/kafka/pull/19296

   Will create a jira and add the id in a bit


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Allow re-registration for fenced / cleanly shutdown brokers [kafka]

2025-03-26 Thread via GitHub


ahuang98 commented on code in PR #19296:
URL: https://github.com/apache/kafka/pull/19296#discussion_r2015416417


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -353,7 +353,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
         if (existing != null) {
             prevIncarnationId = existing.incarnationId();
             storedBrokerEpoch = existing.epoch();
-            if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
+            if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) && heartbeatManager.isBrokerActive(brokerId)) {

Review Comment:
   thanks 😅 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17541) Improve handling of delivery count

2025-03-26 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938772#comment-17938772
 ] 

Lan Ding commented on KAFKA-17541:
--

Hi [~schofielaj] ,  If this ticket is still open, may I take it over?

> Improve handling of delivery count
> --
>
> Key: KAFKA-17541
> URL: https://issues.apache.org/jira/browse/KAFKA-17541
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>
> There are two situations in which the delivery count handling needs to be 
> more intelligent.
> First, for records which are automatically released as a result of closing a 
> share session normally, the delivery count should not be incremented. These 
> records were fetched but they were not actually delivered to the client since 
> the disposition of the delivery records is carried in the ShareAcknowledge 
> which closes the share session. Any remaining records were not delivered, 
> only fetched.
> Second, for records which have a delivery count of more than 1 or 2, 
> there is a suspicion that the records are not being delivered due to a 
> problem rather than just natural retrying. The batching of these records 
> should be reduced, even down to a single record at a time, so we do not have 
> the failure to deliver a poisoned record causing adjacent records to 
> be considered unsuccessful and potentially reach the delivery count limit 
> without proper reason.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`

2025-03-26 Thread Sanskar Jhajharia (Jira)
Sanskar Jhajharia created KAFKA-19024:
-

 Summary: Enhance the client behaviour when it tries to exceed the 
`group.share.max.groups`
 Key: KAFKA-19024
 URL: https://issues.apache.org/jira/browse/KAFKA-19024
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sanskar Jhajharia
Assignee: Andrew Schofield


For share groups, we use the `group.share.max.groups` config to define the 
maximum number of share groups we allow. However, when this limit is exceeded, 
the client logs do not report any error and the client simply does not consume. 
The group doesn't get created, but the client continues to send heartbeats, 
hoping for one of the existing groups to shut down so it can form a group. 
Having a log message or an exception in the client logs would help users debug 
such situations accurately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Some cleanups in group coordinator's intergration tests [kafka]

2025-03-26 Thread via GitHub


lianetm commented on code in PR #19281:
URL: https://github.com/apache/kafka/pull/19281#discussion_r2014959782


##
core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala:
##
@@ -37,15 +37,17 @@ import java.lang.{Byte => JByte}
 import java.util.Collections
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1)
+@ClusterTestDefaults(
+  types = Array(Type.KRAFT),
+  brokers = 1,
+  serverProperties = Array(
+new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+new ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+  )
+)

Review Comment:
   should we also add `@Tag("integration")`?






Re: [PR] Allow re-registration for fenced / cleanly shutdown brokers [kafka]

2025-03-26 Thread via GitHub


splett2 commented on code in PR #19296:
URL: https://github.com/apache/kafka/pull/19296#discussion_r2015175160


##
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##
@@ -353,7 +353,7 @@ public ControllerResult 
registerBroker(
 if (existing != null) {
 prevIncarnationId = existing.incarnationId();
 storedBrokerEpoch = existing.epoch();
-if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
+if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) 
&& heartbeatManager.isBrokerActive(brokerId)) {

Review Comment:
   could we just check whether `existing` is fenced or in controlled shutdown?






Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-03-26 Thread via GitHub


mjsax commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r2015238843


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##
@@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) 
{
 () -> new Processor<>() {
 private KeyValueStore store;
 
+// The store iterator is intentionally not closed here as 
it needs
+// to be open during the test, so the Streams app will 
emit the
+// 
org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric
+// that is expected. So the globalStoreIterator is a 
global variable
+// (pun not intended), so it can be closed in the tearDown 
method.
 @Override
 public void init(final ProcessorContext 
context) {
 store = context.getStateStore("iq-test-store");
+globalStoreIterator = store.all();

Review Comment:
   One-liner?
   
   `globalStoreIterator = context.getStateStore("iq-test-store").all()`






Re: [PR] [WIP] KAFKA-14830: Illegal state error in transactional producer [kafka]

2025-03-26 Thread via GitHub


kirktrue commented on PR #17022:
URL: https://github.com/apache/kafka/pull/17022#issuecomment-2755596812

   cc @k-raina 





[jira] [Commented] (KAFKA-3370) Add options to auto.offset.reset to reset offsets upon initialization only

2025-03-26 Thread Akshesh Doshi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938708#comment-17938708
 ] 

Akshesh Doshi commented on KAFKA-3370:
--

Hi

We had an issue with our Kafka consumers recently, where the 
`nearest.offset.reset` feature suggested by Tayida would have been useful.

Is this something we are expecting to have soon?

 

> Add options to auto.offset.reset to reset offsets upon initialization only
> --
>
> Key: KAFKA-3370
> URL: https://issues.apache.org/jira/browse/KAFKA-3370
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: needs-kip
>
> Currently "auto.offset.reset" is applied in the following two cases:
> 1) upon starting the consumer for the first time (hence no committed offsets 
> before);
> 2) upon fetching offsets out-of-range.
> For scenarios where case 2) needs to be avoided (i.e. people need to be 
> notified upon offsets out-of-range rather than having offsets silently reset), 
> "auto.offset.reset" needs to be set to "none". However for case 1) setting 
> "auto.offset.reset" to "none" will cause NoOffsetForPartitionException upon 
> polling. And in this case, seekToBeginning/seekToEnd is mistakenly applied 
> trying to set the offset at initialization, though these methods are actually 
> designed for use during the lifetime of the consumer (in a rebalance callback, for example).
> The fix proposal is to add two more options to "auto.offset.reset", 
> "earliest-on-start", and "latest-on-start", whose semantics are "earliest" 
> and "latest" for case 1) only, and "none" for case 2).





Re: [PR] [WIP] KAFKA-14830: Illegal state error in transactional producer [kafka]

2025-03-26 Thread via GitHub


kirktrue commented on code in PR #17022:
URL: https://github.com/apache/kafka/pull/17022#discussion_r2014904591


##
clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java:
##
@@ -667,14 +667,23 @@ public synchronized void 
maybeTransitionToErrorState(RuntimeException exception)
 }
 
 synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException 
exception, boolean adjustSequenceNumbers) {
-maybeTransitionToErrorState(exception);
+boolean isStaleBatch = batch.producerId() == 
producerIdAndEpoch.producerId && batch.producerEpoch() < 
producerIdAndEpoch.epoch;

Review Comment:
   Thanks for the feedback @jolshan!
   
   > I'm wondering if there are any cases where producerIdAndEpoch could have a 
race -- or is case there the ID and epoch are the same but the issue still 
happens
   
   There are a couple of bug reports with logs. I'll dig through those to see 
if it's happened in the wild.
   
   > btw -- maybe not super common, but could the overflow case be missed here? 
(new producer id and epoch resets due to epoch reaching max value)
   
   Sounds super rare ;)
   
   If an epoch overflowed, wouldn't that just be interpreted as 'not equal' to 
the last known epoch, and thus trigger the "stale batch" logic? Perhaps my 
understanding of staleness is too naive?
   
   Thanks!
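
   To make the overflow question concrete, a toy version of the staleness predicate (field names mirror the diff above; the scenario values are made up):

   ```java
   public class StaleBatchDemo {
       record ProducerIdAndEpoch(long producerId, short epoch) {}

       static boolean isStaleBatch(long batchProducerId, short batchEpoch, ProducerIdAndEpoch current) {
           return batchProducerId == current.producerId() && batchEpoch < current.epoch();
       }

       public static void main(String[] args) {
           // Normal epoch bump: same producer id, older epoch -> stale.
           System.out.println(isStaleBatch(7L, (short) 4, new ProducerIdAndEpoch(7L, (short) 5))); // true
           // Epoch overflow: the producer gets a NEW id and the epoch resets,
           // so the producer-id check fails and the batch is NOT marked stale.
           System.out.println(isStaleBatch(7L, Short.MAX_VALUE, new ProducerIdAndEpoch(8L, (short) 0))); // false
       }
   }
   ```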






Re: [PR] KAFKA-18827: Initialize share group state group coordinator impl. [3/N] [kafka]

2025-03-26 Thread via GitHub


AndrewJSchofield merged PR #19026:
URL: https://github.com/apache/kafka/pull/19026





Re: [PR] MINOR: Use readable interface to parse requests [kafka]

2025-03-26 Thread via GitHub


jsancio merged PR #19163:
URL: https://github.com/apache/kafka/pull/19163





Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-03-26 Thread via GitHub


mjsax commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r2015232961


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##
@@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) 
{
 () -> new Processor<>() {
 private KeyValueStore store;
 
+// The store iterator is intentionally not closed here as 
it needs
+// to be open during the test, so the Streams app will 
emit the
+// 
org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric
+// that is expected. So the globalStoreIterator is a 
global variable
+// (pun not intended), so it can be closed in the tearDown 
method.
 @Override
 public void init(final ProcessorContext 
context) {
 store = context.getStateStore("iq-test-store");
+globalStoreIterator = store.all();
 }
 
 @Override
 public void process(final Record record) {
 store.put(record.key(), record.value());

Review Comment:
   Seems we don't really need this? We never pipe any data anyway?






Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-03-26 Thread via GitHub


mjsax commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r2015235819


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##
@@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) 
{
 () -> new Processor<>() {
 private KeyValueStore store;

Review Comment:
   Seems we don't need this as a variable?






Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-03-26 Thread via GitHub


mjsax commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r2015231173


##
streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java:
##
@@ -60,7 +60,8 @@ public void metricChange(final KafkaMetric metric) {
 
 private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric 
metric) {
 final Map tags = metric.metricName().tags();
-final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && 
(tags.get(THREAD_ID_TAG).equals(threadId) || 
tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
+final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && 
(tags.get(THREAD_ID_TAG).equals(threadId) ||
+
Optional.of(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId));

Review Comment:
   Should this be `Optional.ofNullable(tags.get(THREAD_ID_TAG))` ? 
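
   For context, a tiny standalone illustration of the difference (nothing here is from the PR itself):

   ```java
   import java.util.Map;
   import java.util.Optional;

   public class OptionalOfDemo {
       public static void main(String[] args) {
           Map<String, String> tags = Map.of(); // no "thread-id" tag present
           // ofNullable tolerates a missing key and yields Optional.empty ...
           System.out.println(Optional.ofNullable(tags.get("thread-id")));
           // ... while of(null) throws NullPointerException.
           System.out.println(Optional.of(tags.get("thread-id")));
       }
   }
   ```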






[jira] [Updated] (KAFKA-18713) Kafka Streams Left-Join not always emitting the last value

2025-03-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-18713:

Fix Version/s: 3.9.1

> Kafka Streams Left-Join not always emitting the last value
> --
>
> Key: KAFKA-18713
> URL: https://issues.apache.org/jira/browse/KAFKA-18713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.8.0
>Reporter: tuna b
>Assignee: Nil Madhab
>Priority: Major
> Fix For: 3.9.1
>
> Attachments: Screenshot 2025-02-10 at 12.06.38.png
>
>
> There seems to be an issue when performing a left-join: the latest value 
> of the join does not contain the value of the right side.
> {code:java}
> var builder = new StreamsBuilder();
> KTable<String, Department> departments = builder
> .table("departments",
> Materialized.<String, Department>as(Stores.persistentKeyValueStore("departments"))
> .withKeySerde(Serdes.String())
> .withValueSerde(CustomJsonSerde.json(Department.class)));
> KTable<String, Person> persons = builder
> .table("persons",
> Materialized.<String, Person>as(Stores.persistentKeyValueStore("persons"))
> .withKeySerde(Serdes.String())
> .withValueSerde(CustomJsonSerde.json(Person.class)));
> KTable<String, Person> joined = persons
> .leftJoin(departments, Person::getDepartmentId, (person, department) -> person.toBuilder()
> .department(department)
> .build(),
> TableJoined.as("my-joiner"),
> Materialized.<String, Person>as(Stores.persistentKeyValueStore("joined-results"))
> .withKeySerde(Serdes.String())
> .withValueSerde(CustomJsonSerde.json(Person.class)));
> joined
> .toStream()
> .to("joined-results", Produced.with(Serdes.String(), CustomJsonSerde.json(Person.class))); {code}
> How to reproduce:
> Create two topics persons and departments, each with 10 partitions. 
> Pre-populate the departments topic with 2 departments.
>  
> Observation:
>  * When I initially produce a Person {{p-1}} with a FK {{dep-1}}, the join works.
>  ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
>  * When I change the FK to {{dep-2}}, the join updates.
>  ** output is an EnrichedResult with person {{p-1}} and department {{dep-2}}
>  * When I change the FK back to {{dep-1}}, the join fails.
>  ** output is an EnrichedResult with person {{p-1}} *but no department*
>  * However, if I reproduce the same event ({{p-1}} with {{dep-1}}), the join works again.
>  ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
> Also, even when you are not setting back to a previous FK, there can still be 
> an issue with the left join. Changing an FK means insert + delete operations, 
> but sometimes the result of the delete is emitted after the result of the 
> insert. 
> How to reproduce:
> Create a departments topic and pre-populate it with 5 departments (dep-1 to 
> dep-5). Create a persons topic and create person  p-1 with FK dep-1. Send an 
> update to the persons topic by changing the FK to dep-2 and repeat this step 
> until dep-5. Now you will see that the latest emitted value of the person 
> does not contain a department. 





[jira] [Commented] (KAFKA-19012) Messages ending up on the wrong topic

2025-03-26 Thread Artem Livshits (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938749#comment-17938749
 ] 

Artem Livshits commented on KAFKA-19012:


[~dnadolny] a few questions:
 * what are the versions of the clients that don't have this problem?
 * when you see misrouted messages, are they all in the same batch (i.e. the 
whole batch misrouted) or a batch contains both correct and misrouted messages?
 * when you see bursts, are they always between same topics or different 
messages in the same burst are misrouted between different topics?
 * could you check that you don't have "interceptor.classes" config defined in 
any of the clients?
 * any chance you can share the custom partitioner code?

> Messages ending up on the wrong topic
> -
>
> Key: KAFKA-19012
> URL: https://issues.apache.org/jira/browse/KAFKA-19012
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.2.3, 3.8.1
>Reporter: Donny Nadolny
>Priority: Major
>
> We're experiencing messages very occasionally ending up on a different topic 
> than what they were published to. That is, we publish a message to topicA and 
> consumers of topicB see it and fail to parse it because the message contents 
> are meant for topicA. This has happened for various topics. 
> We've begun adding a header with the intended topic (which we get just by 
> reading the topic from the record that we're about to pass to the OSS client) 
> right before we call producer.send, this header shows the correct topic 
> (which also matches up with the message contents itself). Similarly we're 
> able to use this header and compare it to the actual topic to prevent 
> consuming these misrouted messages, but this is still concerning.
> Some details:
>  - This happens rarely: it happened approximately once per 10 trillion 
> messages for a few months, though there was a period of a week or so where it 
> happened more frequently (once per 1 trillion messages or so)
>  - It often happens in a small burst, eg 2 or 3 messages very close in time 
> (but from different hosts) will be misrouted
>  - It often but not always coincides with some sort of event in the cluster 
> (a broker restarting or being replaced, network issues causing errors, etc). 
> Also these cluster events happen quite often with no misrouted messages
>  - We run many clusters, it has happened for several of them
>  - There is no pattern between intended and actual topic, other than the 
> intended topic tends to be higher volume ones (but I'd attribute that to 
> there being more messages published -> more occurrences affecting it rather 
> than it being more likely per-message)
>  - It only occurs with clients that are using a non-zero linger
>  - Once it happened with two sequential messages, both were intended for 
> topicA but both ended up on topicB, published by the same host (presumably 
> within the same linger batch)
>  - Most of our clients are 3.2.3 and it has only affected those, most of our 
> brokers are 3.2.3 but it has also happened with a cluster that's running 
> 3.8.1 (but I suspect a client rather than broker problem because of it never 
> happening with clients that use 0 linger)





[jira] [Commented] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error

2025-03-26 Thread Ranganath Samudrala (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938715#comment-17938715
 ] 

Ranganath Samudrala commented on KAFKA-19022:
-

Agree that
[FetchResponse.json|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/clients/src/main/resources/common/message/FetchResponse.json]
does not have a place to store an error message as exists in
[AddRaftVoterResponse.json|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/clients/src/main/resources/common/message/AddRaftVoterResponse.json].

I am not sure when the improvement will be implemented, if it will be 
implemented at all. Does this require us to wait for a major release?

Now, I am wondering how to find the location of the problem in my setup?

> Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID 
> error
> --
>
> Key: KAFKA-19022
> URL: https://issues.apache.org/jira/browse/KAFKA-19022
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, logging
>Affects Versions: 3.9.0
>Reporter: Ranganath Samudrala
>Assignee: Lorcan
>Priority: Major
>
> While migrating Kafka from zookeeper to kraft, we see errors in logs like
> {{INCONSISTENT_CLUSTER_ID in FETCH response }}
> or
> {{INCONSISTENT_CLUSTER_ID in VOTER response }}
> But cluster IDs being compared is not displayed in logs so there is not 
> enough information to see where the issue is. Is the class data *clusterId* 
> empty (which could potentially be a bug?) or incoming *clusterId* empty or 
> incorrect?
> [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
> {code:java}
>     private boolean hasValidClusterId(String requestClusterId) {
>         // We don't enforce the cluster id if it is not provided.
>         if (requestClusterId == null) {
>             return true;
>         }
>         return clusterId.equals(requestClusterId);
>     }
> .
> .
>     private CompletableFuture<FetchResponseData> handleFetchRequest(
>         RaftRequest.Inbound requestMetadata,
>         long currentTimeMs
>     ) {
>         FetchRequestData request = (FetchRequestData) requestMetadata.data();
>         if (!hasValidClusterId(request.clusterId())) {
>             return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
>         }
> .
> .
> {code}
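
A minimal sketch of a logging-only improvement that would surface both IDs without any schema change (assuming KafkaRaftClient's existing logger; this is not committed code):

{code:java}
private boolean hasValidClusterId(String requestClusterId) {
    // We don't enforce the cluster id if it is not provided.
    if (requestClusterId == null) {
        return true;
    }
    boolean valid = clusterId.equals(requestClusterId);
    if (!valid) {
        logger.warn("Inconsistent cluster id: local clusterId={}, request clusterId={}",
            clusterId, requestClusterId);
    }
    return valid;
}
{code}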





[jira] [Created] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown

2025-03-26 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-19047:


 Summary: Broker registrations are slow if previously fenced or 
shutdown
 Key: KAFKA-19047
 URL: https://issues.apache.org/jira/browse/KAFKA-19047
 Project: Kafka
  Issue Type: Improvement
Reporter: Alyssa Huang


BrokerLifecycleManager prevents registration of a broker with an id it has seen 
before but a different incarnation id until the broker session expires. On clean 
shutdown and restart of a broker this can cause an unnecessary delay in 
re-registration while the quorum controller waits for the session to expire.

```
[BrokerLifecycleManager id=1] Unable to register broker 1 because the 
controller returned error DUPLICATE_BROKER_REGISTRATION
```





Re: [PR] KAFKA-18067: Add a flag to disable producer reset during active task creator shutting down [kafka]

2025-03-26 Thread via GitHub


frankvicky commented on code in PR #19269:
URL: https://github.com/apache/kafka/pull/19269#discussion_r2015413116


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java:
##
@@ -324,6 +329,22 @@ void close() {
 transactionInitialized = false;
 }
 
+/**
+ * Disables producer reset to prevent producer recreation during shutdown.
+ * 
+ * When disabled, subsequent calls to reInitializeProducer() will not 
recreate
+ * the producer instance, avoiding resource leak.
+ * 
+ * 
+ * This method should only be invoked when the {@link 
org.apache.kafka.streams.processor.internals.ActiveTaskCreator}

Review Comment:
   You are right.
   I will clarify it in the next commit






Re: [PR] MINOR update the README docker image version [kafka]

2025-03-26 Thread via GitHub


m1a2st commented on PR #19278:
URL: https://github.com/apache/kafka/pull/19278#issuecomment-2753763363

   Hello @frankvicky, I consider the README similar to Kafka's "Quick Start" 
section, so I think we should focus on helping users start quickly. Using 
`latest` should be good enough.





Re: [PR] KAFKA-17645: KIP-1052: Enable warmup in producer performance test [kafka]

2025-03-26 Thread via GitHub


matt-welch commented on code in PR #17340:
URL: https://github.com/apache/kafka/pull/17340#discussion_r2010980706


##
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##
@@ -94,7 +101,19 @@ void start(String[] args) throws IOException {
 record = new ProducerRecord<>(config.topicName, payload);
 
 long sendStartMs = System.currentTimeMillis();
-cb = new PerfCallback(sendStartMs, payload.length, stats);

Review Comment:
   Thanks for the suggestion!  I think the refactored conditional is much 
easier to understand. Hopefully nobody minds the passing of null 
steadyStateStats.  I've refactored this in my latest commit.  



##
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##
@@ -75,7 +76,13 @@ void start(String[] args) throws IOException {
 // not thread-safe, do not share with other threads
 SplittableRandom random = new SplittableRandom(0);
 ProducerRecord record;
-stats = new Stats(config.numRecords, 5000);
+
+System.out.println("DEBUG: config.warmupRecords=" + 
config.warmupRecords + ", (config.warmupRecords > 0)=" + (config.warmupRecords 
> 0));

Review Comment:
   That's correct. Chia-Ping had asked for some clarification around 
config.warmupRecords. Once he's satisfied with that output, the line will be 
removed.






Re: [PR] MINOR: Refactor GroupCoordinator write path [kafka]

2025-03-26 Thread via GitHub


dajac commented on PR #19290:
URL: https://github.com/apache/kafka/pull/19290#issuecomment-2754294910

   I still need to add unit tests for the new method.





[PR] MINOR: Refactor GroupCoordinator write path [kafka]

2025-03-26 Thread via GitHub


dajac opened a new pull request, #19290:
URL: https://github.com/apache/kafka/pull/19290

   This patch addresses a weirdness on the GroupCoordinator write path. The 
`CoordinatorPartitionWriter` uses the `ReplicaManager#appendRecords` method 
with `acks=1` and expects it to complete immediately/synchronously. It 
works because this is effectively what the method does with `acks=1`. The issue 
is that the method is fundamentally asynchronous, so the contract is really 
fragile. This patch changes it by introducing a new method, 
`ReplicaManager.appendRecordsToLeader`, which is synchronous. It also refactors 
`ReplicaManager#appendRecords` to use `ReplicaManager.appendRecordsToLeader` so 
we can benefit from all the existing tests.
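
   To make the contract difference concrete, a toy model of the two styles (names are illustrative, not the actual `ReplicaManager` API):

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Consumer;

   public class AppendContractDemo {
       record AppendResult(long baseOffset) {}

       // Async style: whether the callback runs inline or later is an
       // implementation detail, so callers must not rely on it.
       static void appendRecords(Consumer<AppendResult> callback) {
           CompletableFuture.supplyAsync(() -> new AppendResult(42L)).thenAccept(callback);
       }

       // Sync style: the caller holds the result before the method returns.
       static AppendResult appendRecordsToLeader() {
           return new AppendResult(42L);
       }

       public static void main(String[] args) throws Exception {
           appendRecords(r -> System.out.println("async append at offset " + r.baseOffset()));
           System.out.println("sync append at offset " + appendRecordsToLeader().baseOffset());
           Thread.sleep(100); // let the async callback run before exit
       }
   }
   ```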
   





[jira] [Created] (KAFKA-19046) Change delete cleanup policy to compact cleanup policy

2025-03-26 Thread George Yang (Jira)
George Yang created KAFKA-19046:
---

 Summary: Change delete cleanup policy to compact cleanup policy
 Key: KAFKA-19046
 URL: https://issues.apache.org/jira/browse/KAFKA-19046
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 3.7.1
 Environment: Kafka version: 3.7 
mirrormaker2 version: 3.7.1
zk version: 3.6.8
Reporter: George Yang


MirrorMaker 2 (MM2) sometimes reports the following error for its internal topics:
`
Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.cb.internal' 
supplied via the 'offset.storage.topic' property is required to have 
'cleanup.policy=compact' to guarantee consistency and durability of source 
connector offsets, but found the topic currently has 'cleanup.policy=delete'. 
Continuing would likely result in eventually losing source connector offsets 
and problems restarting this Connect cluster in the future. Change the 
'offset.storage.topic' property in the Connect worker configurations to use a 
topic with 'cleanup.policy=compact'.
at 
org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
[2024-10-13 09:05:12,624] INFO Kafka MirrorMaker stopping 
(org.apache.kafka.connect.mirror.MirrorMaker:208)
`
 
This results in the MM2 pods in the cluster entering a CrashLoopBackOff state 
repeatedly. When changing the configuration via kafka-configs.sh, the process 
runs fine. However, as we know, the default Kafka broker configuration for 
log.cleanup.policy is set to delete, while the default cleanup policy for MM2 
is set to compact. It appears that the policy for offset.storage.topic must be 
compact, and similarly for status.storage and config.storage.
 
I want to configure the cleanup policy for these three topics to always be 
compact. I attempted to configure them in connect-mirror-maker.properties as 
shown below, but all attempts failed:
`
offset.storage.topic.properties.cleanup.policy=compact
status.storage.topic.properties.cleanup.policy=compact
config.storage.topic.properties.cleanup.policy=compact
`
or 
`
offset.storage.topic.cleanup.policy=compact
status.storage.topic.cleanup.policy=compact
config.storage.topic.cleanup.policy=compact
`
The logs show that the properties are unknown and report a failure in topic 
creation:
`
 Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)[MirrorHerder-cb->ca-1]
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 
'mm2-offsets.cb.internal': Unknown topic config name: 
topic.properties.cleanup.policy
        at 
org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:474)
        at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:345)
        at 
org.apache.kafka.connect.util.TopicAdmin.createTopicsWithRetry(TopicAdmin.java:363)
        at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.createTopics(KafkaTopicBasedBackingStore.java:57)
        at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:43)
        at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
        at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
        at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
        at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
        at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
        at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapte
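
For reference, a minimal sketch of pre-creating the internal topics with a compact cleanup policy via the AdminClient before starting MM2 (the topic names, partition counts, and replication factors below are illustrative and must match your offset/status/config.storage.topic settings):

{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateMm2InternalTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        Map<String, String> compact =
            Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
        try (Admin admin = Admin.create(props)) {
            admin.createTopics(Set.of(
                new NewTopic("mm2-offsets.cb.internal", 25, (short) 3).configs(compact),
                new NewTopic("mm2-status.cb.internal", 5, (short) 3).configs(compact),
                new NewTopic("mm2-configs.cb.internal", 1, (short) 3).configs(compact)
            )).all().get();
        }
    }
}
{code}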

[jira] [Resolved] (KAFKA-18899) Limit retry time for ShareConsumer.commitAsync

2025-03-26 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-18899.
--
Resolution: Fixed

> Limit retry time for ShareConsumer.commitAsync
> --
>
> Key: KAFKA-18899
> URL: https://issues.apache.org/jira/browse/KAFKA-18899
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Shivsundar R
>Priority: Major
> Fix For: 4.1.0
>
>
> Currently, `ShareConsumer.commitAsync` could in theory retry forever. It 
> should be bounded by the default API timeout.





Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013699771


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
 }
 }
 
+private ShareAcquiredRecords 
filterAbortedTransactionalAcquiredRecords(FetchPartitionData 
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords 
shareAcquiredRecords) {
+if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+return shareAcquiredRecords;
+// When FetchIsolation.TXN_COMMITTED is used as isolation type by the 
share group, we need to filter any
+// transactions that were aborted/did not commit due to timeout.
+List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+int acquiredCount = 0;
+for (AcquiredRecords records : result) {
+acquiredCount += (int) (records.lastOffset() - 
records.firstOffset() + 1);
+}
+return new ShareAcquiredRecords(result, acquiredCount);
+}
+
+private List<AcquiredRecords> filterAbortedTransactionalRecords(
+Iterable<? extends RecordBatch> batches,
+List<AcquiredRecords> acquiredRecords,
+Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+) {
+lock.writeLock().lock();
+try {
+if (abortedTransactions.isEmpty())
+return acquiredRecords;
+// The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+for (RecordBatch recordBatch : recordsToArchive) {
+// Archive the offsets/batches in the cached state.
+NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+archiveAcquiredBatchRecords(subMap, recordBatch);
+}
+return filterRecordBatchesFromAcquiredRecords(acquiredRecords, 
recordsToArchive);
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+// Visible for testing.
+List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+List<AcquiredRecords> acquiredRecords,
+List<RecordBatch> recordsToArchive
+) {
+lock.writeLock().lock();
+try {
+List<AcquiredRecords> result = new ArrayList<>();
+
+for (AcquiredRecords acquiredRecord : acquiredRecords) {
+List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+tempAcquiredRecords.add(acquiredRecord);
+for (RecordBatch recordBatch : recordsToArchive) {
+List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+for (AcquiredRecords temp : tempAcquiredRecords) {
+// Check if record batch overlaps with the acquired 
records.
+if (temp.firstOffset() <= recordBatch.lastOffset() && 
temp.lastOffset() >= recordBatch.baseOffset()) {
+// Split the acquired record into parts before, 
inside, and after the overlapping record batch.
+if (temp.firstOffset() < recordBatch.baseOffset()) 
{
+newAcquiredRecords.add(new AcquiredRecords()
+.setFirstOffset(temp.firstOffset())
+.setLastOffset(recordBatch.baseOffset() - 
1)
+.setDeliveryCount((short) 1));
+}
+if (temp.lastOffset() > recordBatch.lastOffset()) {
+newAcquiredRecords.add(new AcquiredRecords()
+.setFirstOffset(recordBatch.lastOffset() + 
1)
+.setLastOffset(temp.lastOffset())
+.setDeliveryCount((short) 1));
+}
+} else {
+newAcquiredRecords.add(temp);
+}
+}
+tempAcquiredRecords = newAcquiredRecords;
+}
+result.addAll(tempAcquiredRecords);
+}
+return result;
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+lock.writeLock().lock();
+try {
+// The fetched batch either is exact fetch equivalent batch 
(mostly), subset
+// or spans over multiple fetched batches. The state can vary per 
offset itself from
+// the fetched batch in case of subset.
+for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+InFlightBatch inFlightBatch = entry.getValue();
+
+// If startOffs
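
   A toy distillation of the splitting step in the diff above, with made-up offsets (this is not the actual SharePartition code):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class SplitDemo {
       record Range(long first, long last) {}

       // Cut an acquired range into the pieces before and after an
       // overlapping aborted batch, mirroring filterRecordBatchesFromAcquiredRecords.
       static List<Range> split(Range acquired, Range aborted) {
           List<Range> out = new ArrayList<>();
           if (acquired.first() <= aborted.last() && acquired.last() >= aborted.first()) {
               if (acquired.first() < aborted.first())
                   out.add(new Range(acquired.first(), aborted.first() - 1));
               if (acquired.last() > aborted.last())
                   out.add(new Range(aborted.last() + 1, acquired.last()));
           } else {
               out.add(acquired);
           }
           return out;
       }

       public static void main(String[] args) {
           // Acquired [0, 9] overlapping aborted [3, 5] -> [0, 2] and [6, 9].
           System.out.println(split(new Range(0, 9), new Range(3, 5)));
       }
   }
   ```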

Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013704465


##
server-common/src/main/java/org/apache/kafka/server/storage/log/FetchIsolation.java:
##
@@ -25,11 +25,11 @@ public enum FetchIsolation {
 TXN_COMMITTED;
 
 public static FetchIsolation of(FetchRequest request) {
-return of(request.replicaId(), request.isolationLevel());
+return of(request.replicaId(), request.isolationLevel(), false);
 }
 
-public static FetchIsolation of(int replicaId, IsolationLevel 
isolationLevel) {
-if (!FetchRequest.isConsumer(replicaId)) {
+public static FetchIsolation of(int replicaId, IsolationLevel 
isolationLevel, boolean isShareFetchRequest) {
+if (!FetchRequest.isConsumer(replicaId) && !isShareFetchRequest) {

Review Comment:
   I think if I just use `replicaId` as -1, that also works. But I wasn't too 
sure if that's the right approach. Hence, I opted for adding a new variable.






[jira] [Commented] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error

2025-03-26 Thread Lorcan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938509#comment-17938509
 ] 

Lorcan commented on KAFKA-19022:


Upon further investigation and reading the file FetchResponse.json, which is 
used to generate the code for FetchResponseData, it looks like a [Kafka 
Improvement 
Proposal|https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals#KafkaImprovementProposals-Whatisconsidereda%22majorchange%22thatneedsaKIP?]
 would be needed.

> Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID 
> error
> --
>
> Key: KAFKA-19022
> URL: https://issues.apache.org/jira/browse/KAFKA-19022
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, logging
>Affects Versions: 3.9.0
>Reporter: Ranganath Samudrala
>Assignee: Lorcan
>Priority: Major
>
> While migrating Kafka from zookeeper to kraft, we see errors in logs like
> {{INCONSISTENT_CLUSTER_ID in FETCH response }}
> or
> {{INCONSISTENT_CLUSTER_ID in VOTER response }}
> But cluster IDs being compared is not displayed in logs so there is not 
> enough information to see where the issue is. Is the class data *clusterId* 
> empty (which could potentially be a bug?) or incoming *clusterId* empty or 
> incorrect?
> [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
> {code:java}
>     private boolean hasValidClusterId(String requestClusterId) {
>         // We don't enforce the cluster id if it is not provided.
>         if (requestClusterId == null) {
>             return true;
>         }
>         return clusterId.equals(requestClusterId);
>     }
> .
> .
>     private CompletableFuture<FetchResponseData> handleFetchRequest(
>         RaftRequest.Inbound requestMetadata,
>         long currentTimeMs
>     ) {
>         FetchRequestData request = (FetchRequestData) requestMetadata.data();
>         if (!hasValidClusterId(request.clusterId())) {
>             return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
>         }
> .
> .
> {code}





Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013699195


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
 }
 }
 
+private ShareAcquiredRecords 
filterAbortedTransactionalAcquiredRecords(FetchPartitionData 
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords 
shareAcquiredRecords) {
+if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+return shareAcquiredRecords;
+// When FetchIsolation.TXN_COMMITTED is used as isolation type by the 
share group, we need to filter any
+// transactions that were aborted/did not commit due to timeout.
+List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+int acquiredCount = 0;
+for (AcquiredRecords records : result) {
+acquiredCount += (int) (records.lastOffset() - 
records.firstOffset() + 1);
+}
+return new ShareAcquiredRecords(result, acquiredCount);
+}
+
+private List<AcquiredRecords> filterAbortedTransactionalRecords(
+Iterable<? extends RecordBatch> batches,
+List<AcquiredRecords> acquiredRecords,
+Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+) {
+lock.writeLock().lock();
+try {
+if (abortedTransactions.isEmpty())
+return acquiredRecords;
+// The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+for (RecordBatch recordBatch : recordsToArchive) {
+// Archive the offsets/batches in the cached state.
+NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+archiveAcquiredBatchRecords(subMap, recordBatch);
+}
+return filterRecordBatchesFromAcquiredRecords(acquiredRecords, 
recordsToArchive);
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+// Visible for testing.
+List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+List<AcquiredRecords> acquiredRecords,
+List<RecordBatch> recordsToArchive
+) {
+lock.writeLock().lock();
+try {
+List<AcquiredRecords> result = new ArrayList<>();
+
+for (AcquiredRecords acquiredRecord : acquiredRecords) {
+List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+tempAcquiredRecords.add(acquiredRecord);
+for (RecordBatch recordBatch : recordsToArchive) {
+List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+for (AcquiredRecords temp : tempAcquiredRecords) {
+// Check if record batch overlaps with the acquired 
records.
+if (temp.firstOffset() <= recordBatch.lastOffset() && 
temp.lastOffset() >= recordBatch.baseOffset()) {
+// Split the acquired record into parts before, 
inside, and after the overlapping record batch.
+if (temp.firstOffset() < recordBatch.baseOffset()) 
{
+newAcquiredRecords.add(new AcquiredRecords()
+.setFirstOffset(temp.firstOffset())
+.setLastOffset(recordBatch.baseOffset() - 
1)
+.setDeliveryCount((short) 1));
+}
+if (temp.lastOffset() > recordBatch.lastOffset()) {
+newAcquiredRecords.add(new AcquiredRecords()
+.setFirstOffset(recordBatch.lastOffset() + 
1)
+.setLastOffset(temp.lastOffset())
+.setDeliveryCount((short) 1));
+}
+} else {
+newAcquiredRecords.add(temp);
+}
+}
+tempAcquiredRecords = newAcquiredRecords;
+}
+result.addAll(tempAcquiredRecords);
+}
+return result;
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+lock.writeLock().lock();
+try {
+// The fetched batch either is exact fetch equivalent batch 
(mostly), subset
+// or spans over multiple fetched batches. The state can vary per 
offset itself from
+// the fetched batch in case of subset.
+for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+InFlightBatch inFlightBatch = entry.getValue();
+
+// If startOffs

Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013997096


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long 
partitionDataStartOffset) thro
 }
 }
 
+private ShareAcquiredRecords 
filterAbortedTransactionalAcquiredRecords(FetchPartitionData 
fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords 
shareAcquiredRecords) {
+if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+return shareAcquiredRecords;
+// When FetchIsolation.TXN_COMMITTED is used as isolation type by the 
share group, we need to filter any
+// transactions that were aborted/did not commit due to timeout.
+List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+int acquiredCount = 0;
+for (AcquiredRecords records : result) {
+acquiredCount += (int) (records.lastOffset() - 
records.firstOffset() + 1);
+}
+return new ShareAcquiredRecords(result, acquiredCount);
+}
+
+private List<AcquiredRecords> filterAbortedTransactionalRecords(
+Iterable<? extends RecordBatch> batches,
+List<AcquiredRecords> acquiredRecords,
+Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+) {
+lock.writeLock().lock();
+try {
+if (abortedTransactions.isEmpty())
+return acquiredRecords;
+// The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+for (RecordBatch recordBatch : recordsToArchive) {
+// Archive the offsets/batches in the cached state.
+NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+archiveAcquiredBatchRecords(subMap, recordBatch);
+}
+return filterRecordBatchesFromAcquiredRecords(acquiredRecords, 
recordsToArchive);
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+// Visible for testing.
+List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+List<AcquiredRecords> acquiredRecords,
+List<RecordBatch> recordsToArchive
+) {
+lock.writeLock().lock();
+try {
+List<AcquiredRecords> result = new ArrayList<>();
+
+for (AcquiredRecords acquiredRecord : acquiredRecords) {
+List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+tempAcquiredRecords.add(acquiredRecord);
+for (RecordBatch recordBatch : recordsToArchive) {
+List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+for (AcquiredRecords temp : tempAcquiredRecords) {
+// Check if record batch overlaps with the acquired 
records.
+if (temp.firstOffset() <= recordBatch.lastOffset() && 
temp.lastOffset() >= recordBatch.baseOffset()) {
+// Split the acquired record into parts before, 
inside, and after the overlapping record batch.
+if (temp.firstOffset() < recordBatch.baseOffset()) 
{
+newAcquiredRecords.add(new AcquiredRecords()
+.setFirstOffset(temp.firstOffset())
+.setLastOffset(recordBatch.baseOffset() - 
1)
+.setDeliveryCount((short) 1));
+}
+if (temp.lastOffset() > recordBatch.lastOffset()) {
+newAcquiredRecords.add(new AcquiredRecords()
+.setFirstOffset(recordBatch.lastOffset() + 
1)
+.setLastOffset(temp.lastOffset())
+.setDeliveryCount((short) 1));
+}
+} else {
+newAcquiredRecords.add(temp);
+}
+}
+tempAcquiredRecords = newAcquiredRecords;
+}
+result.addAll(tempAcquiredRecords);
+}
+return result;
+} finally {
+lock.writeLock().unlock();
+}
+}
+
+private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+lock.writeLock().lock();
+try {
+// The fetched batch either is exact fetch equivalent batch 
(mostly), subset
+// or spans over multiple fetched batches. The state can vary per 
offset itself from
+// the fetched batch in case of subset.
+for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+InFlightBatch inFlightBatch = entry.getValue();
+
+// If startOffs

Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-03-26 Thread via GitHub


TaiJuWu commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2014001146


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -407,14 +413,17 @@ private void configureRLMM() {
 rlmmProps.put(LOG_DIR_CONFIG, logDir);
 rlmmProps.put("cluster.id", clusterId);
 
-remoteLogMetadataManager.configure(rlmmProps);
+remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
 }
 
 public void startup() {
 // Initialize and configure RSM and RLMM. This will start RSM, RLMM 
resources which may need to start resources
 // in connecting to the brokers or remote storages.
 configureRSM();
 configureRLMM();
+// the withPluginMetrics() method will be called when the plugin is 
instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   
https://github.com/apache/kafka/pull/19286/commits/108129df77529aa53cfa42cdaae5a40f47459ff9
 is the commit where I tried moving `configure` to the constructor, but I finally gave it up.
   
   The most important consideration is that `RemoteLogMetadataManager` and 
`RemoteStorageManager` are public plugins, and we should not change this flow 
without a KIP.
   
   If I misunderstood, please correct me.
   
   I will wait for other feedback and continue the work later.






Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-03-26 Thread via GitHub


TaiJuWu commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2014001146


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -407,14 +413,17 @@ private void configureRLMM() {
 rlmmProps.put(LOG_DIR_CONFIG, logDir);
 rlmmProps.put("cluster.id", clusterId);
 
-remoteLogMetadataManager.configure(rlmmProps);
+remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
 }
 
 public void startup() {
 // Initialize and configure RSM and RLMM. This will start RSM, RLMM 
resources which may need to start resources
 // in connecting to the brokers or remote storages.
 configureRSM();
 configureRLMM();
+// the withPluginMetrics() method will be called when the plugin is 
instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   
https://github.com/apache/kafka/pull/19286/commits/108129df77529aa53cfa42cdaae5a40f47459ff9
 is the commit where I tried moving `configure` to the constructor, but I finally gave it up.
   
   The most important consideration is that `RemoteLogMetadataManager` and 
`RemoteStorageManager` are public plugins, and we should not change this flow 
without a KIP.
   
   If I misunderstood, please correct me. I will wait for other feedback and 
continue the work later.






[jira] [Commented] (KAFKA-7699) Improve wall-clock time punctuations

2025-03-26 Thread Herman Kolstad Jakobsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938578#comment-17938578
 ] 

Herman Kolstad Jakobsen commented on KAFKA-7699:


The KIP can be found at [KIP-1146: Anchored wall-clock punctuation - Apache 
Kafka - Apache Software 
Foundation|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+wall-clock+punctuation]

> Improve wall-clock time punctuations
> 
>
> Key: KAFKA-7699
> URL: https://issues.apache.org/jira/browse/KAFKA-7699
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Herman Kolstad Jakobsen
>Priority: Major
>  Labels: needs-kip
>
> Currently, wall-clock time punctuation allows scheduling periodic callbacks 
> based on wall-clock time progress. The punctuation interval starts when the 
> punctuation is scheduled; thus, it's non-deterministic, which is what is desired 
> for many use cases (I want a callback in 5 minutes from "now").
> It would be a nice improvement to allow users to "anchor" wall-clock 
> punctuation, too, similar to a cron job: a punctuation would then be 
> triggered at "fixed" times, like the beginning of the next hour, independent of 
> when the punctuation was registered.
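
A sketch of emulating anchoring with today's API from inside a Processor: fire once at the next hour boundary, then re-arm with a fixed one-hour interval (illustrative only, not part of the KIP):

{code:java}
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.ProcessorContext;

class AnchoredPunctuationExample {
    private Cancellable bootstrap;

    void schedule(final ProcessorContext<Void, Void> context) {
        final Instant now = Instant.now();
        final Duration untilNextHour = Duration.between(
            now, now.truncatedTo(ChronoUnit.HOURS).plus(1, ChronoUnit.HOURS));
        bootstrap = context.schedule(untilNextHour, PunctuationType.WALL_CLOCK_TIME, firstTs -> {
            bootstrap.cancel(); // stop the bootstrap schedule after its first firing
            context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME,
                ts -> System.out.println("anchored punctuation at " + ts));
        });
    }
}
{code}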





Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-03-26 Thread via GitHub


TaiJuWu commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2014001146


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -407,14 +413,17 @@ private void configureRLMM() {
 rlmmProps.put(LOG_DIR_CONFIG, logDir);
 rlmmProps.put("cluster.id", clusterId);
 
-remoteLogMetadataManager.configure(rlmmProps);
+remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
 }
 
 public void startup() {
 // Initialize and configure RSM and RLMM. This will start RSM, RLMM 
resources which may need to start resources
 // in connecting to the brokers or remote storages.
 configureRSM();
 configureRLMM();
+// the withPluginMetrics() method will be called when the plugin is 
instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   
https://github.com/apache/kafka/pull/19286/commits/108129df77529aa53cfa42cdaae5a40f47459ff9
 is the commit where I tried moving `configure` to the constructor, but I finally gave it up.
   
   The most important consideration is that `RemoteLogMetadataManager` and 
`RemoteStorageManager` are public plugins, and we should not change this flow 
without a KIP.
   
   If I misunderstood, please correct me.
   I will wait for other feedback and continue the work later.






Re: [PR] KAFKA-18827: Initialize share group state group coordinator impl. [3/N] [kafka]

2025-03-26 Thread via GitHub


AndrewJSchofield commented on PR #19026:
URL: https://github.com/apache/kafka/pull/19026#issuecomment-2754166976

   @smjn Please can you merge the latest changes from trunk.






Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]

2025-03-26 Thread via GitHub


TaiJuWu commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2014001146


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -407,14 +413,17 @@ private void configureRLMM() {
 rlmmProps.put(LOG_DIR_CONFIG, logDir);
 rlmmProps.put("cluster.id", clusterId);
 
-remoteLogMetadataManager.configure(rlmmProps);
+remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
 }
 
 public void startup() {
 // Initialize and configure RSM and RLMM. This will start RSM, RLMM 
resources which may need to start resources
 // in connecting to the brokers or remote storages.
 configureRSM();
 configureRLMM();
+// the withPluginMetrics() method will be called when the plugin is 
instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   
https://github.com/apache/kafka/pull/19286/commits/108129df77529aa53cfa42cdaae5a40f47459ff9
 is the commit where I tried moving `configure` to the constructor, but I finally gave it up.
   
   The most important consideration is that `RemoteLogMetadataManager` and 
`RemoteStorageManager` are public plugins and we should not change this flow 
without any KIP; if I misunderstood, please correct me.
   
   I will continue the work after getting other feedback.
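
   For context, a rough sketch of the ordering constraint under discussion: 
wrapping is what triggers `withPluginMetrics()`, so for a plugin that is also 
`Configurable` it must happen after `configure()`. All names below are 
hypothetical and simplified; this is not Kafka's internal `Plugin` class.

{code:java}
import java.util.Map;

interface Monitorable { void withPluginMetrics(Object metrics); }
interface Configurable { void configure(Map<String, ?> configs); }

final class PluginWrapper<T> {
    private final T instance;

    private PluginWrapper(T instance) {
        this.instance = instance;
    }

    // withPluginMetrics() fires at wrap time. Calling wrap() from the owner's
    // constructor would run it before configure(), changing the public plugin
    // lifecycle, which is the KIP concern raised above.
    static <T> PluginWrapper<T> wrap(T instance, Object metrics) {
        if (instance instanceof Monitorable monitorable) {
            monitorable.withPluginMetrics(metrics);
        }
        return new PluginWrapper<>(instance);
    }

    T get() {
        return instance;
    }
}
{code}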



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


apoorvmittal10 commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013847250


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
 }
 }
 
+    private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+    ) {
+        lock.writeLock().lock();
+        try {
+            if (abortedTransactions.isEmpty())
+                return acquiredRecords;
+            // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+            List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+            for (RecordBatch recordBatch : recordsToArchive) {
+                // Archive the offsets/batches in the cached state.
+                NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+                archiveAcquiredBatchRecords(subMap, recordBatch);
+            }
+            return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    // Visible for testing.
+    List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+        List<AcquiredRecords> acquiredRecords,
+        List<RecordBatch> recordsToArchive
+    ) {
+        lock.writeLock().lock();
+        try {
+            List<AcquiredRecords> result = new ArrayList<>();
+
+            for (AcquiredRecords acquiredRecord : acquiredRecords) {
+                List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+                tempAcquiredRecords.add(acquiredRecord);
+                for (RecordBatch recordBatch : recordsToArchive) {
+                    List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+                    for (AcquiredRecords temp : tempAcquiredRecords) {
+                        // Check if record batch overlaps with the acquired records.
+                        if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) {
+                            // Split the acquired record into parts before, inside, and after the overlapping record batch.
+                            if (temp.firstOffset() < recordBatch.baseOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(temp.firstOffset())
+                                    .setLastOffset(recordBatch.baseOffset() - 1)
+                                    .setDeliveryCount((short) 1));
+                            }
+                            if (temp.lastOffset() > recordBatch.lastOffset()) {
+                                newAcquiredRecords.add(new AcquiredRecords()
+                                    .setFirstOffset(recordBatch.lastOffset() + 1)
+                                    .setLastOffset(temp.lastOffset())
+                                    .setDeliveryCount((short) 1));
+                            }
+                        } else {
+                            newAcquiredRecords.add(temp);
+                        }
+                    }
+                    tempAcquiredRecords = newAcquiredRecords;
+                }
+                result.addAll(tempAcquiredRecords);
+            }
+            return result;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+        lock.writeLock().lock();
+        try {
+            // The fetched batch either is exact fetch equivalent batch (mostly), subset
+            // or spans over multiple fetched batches. The state can vary per offset itself from
+            // the fetched batch in case of subset.
+            for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+                InFlightBatch inFlightBatch = entry.getValue();
+
+                // If startOffse

Re: [PR] (WIP) KAFKA-18409: ShareGroupStateMessageFormatter should use ApiMessageFormatter [kafka]

2025-03-26 Thread via GitHub


dajac commented on PR #18510:
URL: https://github.com/apache/kafka/pull/18510#issuecomment-2753880538

   @brandboat We have merged https://github.com/apache/kafka/pull/18695. We can 
proceed with your PR. Please ping me when the PR is ready for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-18792) Enforce uniform PR structure for merge queue

2025-03-26 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936903#comment-17936903
 ] 

David Arthur edited comment on KAFKA-18792 at 3/19/25 6:35 PM:
---

Yes I think that makes sense [~mimaison]. "Co-authored-by:" is a git trailer 
which is basically like an email footer. Our "Reviewers:" is also a trailer. We 
do some trailer validation already to check for "Reviewers", so it would be 
easy to add/verify "Co-authored-by".

 

We'll need to experiment to see what happens when a PR has multiple authors. 
Today, GitHub is adding "Co-authored-by" to the pre-filled commit message when 
we click "Squash and Merge". I'm not sure if the same happens when adding to 
the merge queue.

 

—

BTW, related to this, I would like to move us towards using these structured 
trailers a bit more.
 * Reviewed-by: anyone who left feedback on the PR
 * Approved-by: committers who approved the PR
 * Helped-by: shout-outs for inspiration or significant help
 * Signed-off-by: commit signatory
 * Fixes: KAFKA-12345
 * References: KIP-123, KAFKA-23456
 * etc

 

This would help us get away from putting metadata into our commit subjects 
which will make the log a bit nicer looking. "git log" has options for 
formatting trailers, so we can still easily generate a changelog like 


{code:java}
KAFKA-12345 Something happened
KAFKA-23456 Another thing {code}
 


was (Author: davidarthur):
Yes I think that makes sense [~mimaison]. "Co-authored-by:" is a git trailer 
which is basically like an email footer. Our "Reviewers:" is also a trailer. We 
do some trailer validation already to check for "Reviewers", so it would be 
easy to add/verify "Co-authored-by".

 

We'll need to experiment to see what happens when a PR has multiple authors. 
Today, GitHub is adding "Co-authored-by" to the pre-filled commit message when 
we click "Squash and Merge". I'm not sure if the same happens when adding to 
the merge queue.

 

---

 

BTW, related to this, I would like to move us towards using these structured 
trailers a bit more.
 * Reviewed-by: anyone who left feedback on the PR
 * Approved-by: committers who approved the PR
 * Helped-by: shout-outs for inspiration or significant help
 * Signed-off-by: commit signatory 
 * Fixes: KAFKA-12345
 * References: KIP-123, KAFKA-23456
 * etc

 

 

> Enforce uniform PR structure for merge queue
> 
>
> Key: KAFKA-18792
> URL: https://issues.apache.org/jira/browse/KAFKA-18792
> Project: Kafka
>  Issue Type: Sub-task
>  Components: build
>Reporter: David Arthur
>Priority: Major
>
> Since the merge queue will use the repo's default commit message, we cannot 
> customize the commit when adding to the queue. Instead, we will use the PR 
> title and body as the commit subject and message (KAFKA-18791).
>  
> In order to ensure well-formed commits, we will need a GitHub Action to 
> validate the PR title and PR body.
>  
> Specifically, we should have the following checks:
>  * PR title includes KAFKA-X, MINOR, or HOTFIX
>  * PR title is less than 120 characters
>  * PR title is not automatically truncated by GitHub (ending in ...)
>  * PR body includes "Reviewers:" line
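
A minimal sketch of the four checks above as plain Java (the real validation 
runs as a GitHub Action; names here are illustrative):

{code:java}
import java.util.regex.Pattern;

public final class PrStructureCheck {
    private static final Pattern TITLE_PREFIX =
        Pattern.compile("^(KAFKA-\\d+|MINOR|HOTFIX)\\b.*");

    public static boolean isValid(String title, String body) {
        return TITLE_PREFIX.matcher(title).matches()    // KAFKA-X, MINOR, or HOTFIX
            && title.length() < 120                     // length limit
            && !title.endsWith("...")                   // not truncated by GitHub
            && body.lines().anyMatch(line -> line.startsWith("Reviewers:"));
    }
}
{code}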



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]

2025-03-26 Thread via GitHub


TaiJuWu commented on code in PR #19277:
URL: https://github.com/apache/kafka/pull/19277#discussion_r2013533692


##
storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BrokerCompressionTest {
+private final File tmpDir = TestUtils.tempDirectory();
+private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+private final MockTime time = new MockTime(0, 0);
+
+@AfterEach
+public void tearDown() throws IOException {
+Utils.delete(tmpDir);
+}
+
+/**
+ * Test broker-side compression configuration
+ */
+@ParameterizedTest
+@MethodSource("allCompressionParameters")
+public void testBrokerSideCompression(CompressionType 
messageCompressionType, BrokerCompressionType brokerCompressionType) throws 
IOException {
+Compression messageCompression = 
Compression.of(messageCompressionType).build();
+
+/* Configure broker-side compression */
+UnifiedLog log = UnifiedLog.create(
+logDir,
+new LogConfig(Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, 
brokerCompressionType.name)),
+0L,
+0L,
+time.scheduler,
+new BrokerTopicStats(),
+time,
+5 * 60 * 1000,
+new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+new LogDirFailureChannel(10),
+true,
+Optional.empty()
+);
+
+/* Append two messages */
+log.appendAsLeader(
+MemoryRecords.withRecords(messageCompression, 0,
+new SimpleRecord("hello".getBytes()),
+new SimpleRecord("there".getBytes())
+), 0
+);
+
+if (brokerCompressionType != BrokerCompressionType.PRODUCER) {
+RecordBatch batch = readBatch(log, 0);
+Compression targetCompression = 
BrokerCompressionType.targetCompression(log.config().compression, null);
+assertEquals(targetCompression.type(), batch.compressionType(), 
"Compression at offset 0 should produce " + brokerCompressionType);
+} else {
+assertEquals(messageCompressionType, readBatch(log, 
0).compressionType(), "Compression at offset 0 should produce " + 
messageCompressionType);
+}

Review Comment:
   All comments are addressed, thanks for your review!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18893: KIP-877 Add support for ReplicaSelector [kafka]

2025-03-26 Thread via GitHub


TaiJuWu commented on code in PR #19064:
URL: https://github.com/apache/kafka/pull/19064#discussion_r2004221275


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -1723,7 +1723,7 @@ class ReplicaManager(val config: KafkaConfig,
   metadata => findPreferredReadReplica(partition, metadata, 
params.replicaId, fetchInfo.fetchOffset, fetchTimeMs))
 
 if (preferredReadReplica.isDefined) {
-  replicaSelectorOpt.foreach { selector =>
+  replicaSelectorPlugin.foreach { selector =>
 debug(s"Replica selector ${selector.getClass.getSimpleName} 
returned preferred replica " +

Review Comment:
   Not sure my understanding is correct or not.
   I changed the debug message to `debug(s"Replica selector plugin 
${selector.getClass.getSimpleName} returned preferred replica " +
 s"${preferredReadReplica.get} for ${params.clientMetadata}")`
 
   If I am misunderstanding, please correct me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]

2025-03-26 Thread via GitHub


dajac commented on code in PR #18695:
URL: https://github.com/apache/kafka/pull/18695#discussion_r2013562510


##
tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class CoordinatorRecordMessageFormatterTest {
+private static final String TOPIC = "TOPIC";
+
+protected abstract CoordinatorRecordMessageFormatter formatter();
+protected abstract Stream parameters();
+
+@ParameterizedTest
+@MethodSource("parameters")
+public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, 
String expectedOutput) {

Review Comment:
   Sure. I added test cases to the concrete classes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]

2025-03-26 Thread via GitHub


dajac commented on PR #18695:
URL: https://github.com/apache/kafka/pull/18695#issuecomment-2753506292

   @chia7712 Thanks for your comments. I just addressed them.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]

2025-03-26 Thread via GitHub


wernerdv commented on code in PR #19216:
URL: https://github.com/apache/kafka/pull/19216#discussion_r2007165486


##
storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
##
@@ -0,0 +1,800 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class manages the state of each partition being cleaned.
+ * LogCleaningState defines the cleaning states that a TopicPartition can be 
in.
+ * 1. None: No cleaning state in a TopicPartition. In this 
state, it can become LogCleaningInProgress
+ *  or LogCleaningPaused(1). Valid previous state 
are LogCleaningInProgress and LogCleaningPaused(1)
+ * 2. LogCleaningInProgress   : The cleaning is currently in progress. In this 
state, it can become None when log cleaning is finished
+ *  or become LogCleaningAborted. Valid previous 
state is None.
+ * 3. LogCleaningAborted  : The cleaning abort is requested. In this 
state, it can become LogCleaningPaused(1).
+ *  Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1)  : The cleaning is paused once. No log cleaning 
can be done in this state.
+ *  In this state, it can become None or 
LogCleaningPaused(2).
+ *  Valid previous state is None, 
LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i)  : The cleaning is paused i times where i>= 2. No 
log cleaning can be done in this state.
+ *  In this state, it can become 
LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *  Valid previous state is LogCleaningPaused(i-1) 
or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+public static final String OFFSET_CHECKPOINT_FILE = 
"cleaner-offset-checkpoint";
+
+private static final Logger LOG = 
LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = 
"uncleanable-partitions-count";
+private static final String UNCLEANABLE_BYTES_METRIC_NAME = 
"uncleanable-bytes";
+private static final String MAX_DIRTY_PERCENT_METRIC_NAME = 
"max-dirty-percent";
+private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = 
"time-since-last-run-ms";
+
+// Visible for testing
+public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+// For compatibility, metrics are defined to be under 
`kafka.log.LogCleanerManager` class
+private final KafkaMetricsGroup metricsGroup = new 
KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+/* the set of logs currently being cleaned */
+private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();
+
+/* the set of uncleanable partitions (partitions that have raised an 
unexpected error during cleaning)
+ *   for each log directory */
+private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();
+
+/* a global lock used to control all access to the in-progress set and the 
offset checkpoints */
+private final Lock lock = new Re

[jira] [Resolved] (KAFKA-18893) Add support for ReplicaSelector

2025-03-26 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-18893.

Fix Version/s: 4.1.0
   Resolution: Fixed

> Add support for ReplicaSelector
> ---
>
> Key: KAFKA-18893
> URL: https://issues.apache.org/jira/browse/KAFKA-18893
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: TaiJuWu
>Priority: Major
> Fix For: 4.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013529068


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -847,6 +859,7 @@ public ShareAcquiredRecords acquire(
 }

Review Comment:
   we are doing the filtering after this point, so this case would be 
automatically covered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-19046) Change delete cleanup policy to compact cleanup policy

2025-03-26 Thread George Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-19046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

George Yang updated KAFKA-19046:

Description: 
The internal topics of MirrorMaker 2 (MM2) sometimes report the following error:


{code:java}
Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)
org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.cb.internal' 
supplied via the 'offset.storage.topic' property is required to have 
'cleanup.policy=compact' to guarantee consistency and durability of source 
connector offsets, but found the topic currently has 'cleanup.policy=delete'. 
Continuing would likely result in eventually losing source connector offsets 
and problems restarting this Connect cluster in the future. Change the 
'offset.storage.topic' property in the Connect worker configurations to use a 
topic with 'cleanup.policy=compact'.
at 
org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581)
at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
[2024-10-13 09:05:12,624] INFO Kafka MirrorMaker stopping 
(org.apache.kafka.connect.mirror.MirrorMaker:208){code}
 
This results in the MM2 pods in the cluster entering a CrashLoopBackOff state 
repeatedly. When changing the configuration via kafka-configs.sh, the process 
runs fine. However, as we know, the default Kafka broker configuration for 
log.cleanup.policy is set to delete, while the default cleanup policy for MM2 
is set to compact. It appears that the policy for offset.storage.topic must be 
compact, and similarly for status.storage and config.storage.
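
For reference, the kafka-configs.sh workaround mentioned above corresponds to 
the following AdminClient call. This is a minimal sketch that reuses the topic 
name from the error message; adjust bootstrap servers and topic names for your 
cluster:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class FixCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "mm2-offsets.cb.internal");
            AlterConfigOp setCompact = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setCompact))).all().get();
        }
    }
}
{code}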
 
I want to configure the cleanup policy for these three topics to always be 
compact. I attempted to configure them in connect-mirror-maker.properties as 
shown below, but all attempts failed:


{code:java}
offset.storage.topic.properties.cleanup.policy=compact
status.storage.topic.properties.cleanup.policy=compact
config.storage.topic.properties.cleanup.policy=compact{code}

or 


{code:java}
offset.storage.topic.cleanup.policy=compact
status.storage.topic.cleanup.policy=compact
config.storage.topic.cleanup.policy=compact{code}

The logs show that the properties are unknown and report a failure in topic 
creation:

 
{code:java}
Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)[MirrorHerder-cb->ca-1]
org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 
'mm2-offsets.cb.internal': Unknown topic config name: 
topic.properties.cleanup.policy
        at 
org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:474)
        at 
org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:345)
        at 
org.apache.kafka.connect.util.TopicAdmin.createTopicsWithRetry(TopicAdmin.java:363)
        at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.createTopics(KafkaTopicBasedBackingStore.java:57)
        at 
org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:43)
        at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247)
        at 
org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231)
        at 
org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242)
        at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233)
        at 
org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163)
        at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373)
        at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
        at java.base/java.util.co

Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013589373


##
core/src/main/java/kafka/server/share/SharePartition.java:
##
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
 }
 }
 
+    private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+        if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+            return shareAcquiredRecords;
+        // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+        // transactions that were aborted/did not commit due to timeout.
+        List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+        int acquiredCount = 0;
+        for (AcquiredRecords records : result) {
+            acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+        }
+        return new ShareAcquiredRecords(result, acquiredCount);
+    }
+
+    private List<AcquiredRecords> filterAbortedTransactionalRecords(
+        Iterable<? extends RecordBatch> batches,
+        List<AcquiredRecords> acquiredRecords,
+        Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions

Review Comment:
   So far in testing I have observed that `abortedTransactions` contains the 
first offsets of all the fetched record batches that were aborted, in 
**ascending order** of first offset.
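
   For illustration, the splitting rule above on concrete offsets: an acquired 
range [5, 20] overlapped by an aborted batch [10, 12] yields [5, 9] and 
[13, 20]. A standalone sketch with hypothetical types, not the PR code:

{code:java}
import java.util.ArrayList;
import java.util.List;

public class SplitExample {
    record Range(long first, long last) { }

    static List<Range> subtract(Range acquired, Range aborted) {
        List<Range> out = new ArrayList<>();
        if (acquired.first() <= aborted.last() && acquired.last() >= aborted.first()) {
            if (acquired.first() < aborted.first())
                out.add(new Range(acquired.first(), aborted.first() - 1)); // part before the aborted batch
            if (acquired.last() > aborted.last())
                out.add(new Range(aborted.last() + 1, acquired.last()));   // part after the aborted batch
        } else {
            out.add(acquired); // no overlap, keep the acquired range as-is
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [Range[first=5, last=9], Range[first=13, last=20]]
        System.out.println(subtract(new Range(5, 20), new Range(10, 12)));
    }
}
{code}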



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]

2025-03-26 Thread via GitHub


adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013577087


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java:
##
@@ -83,6 +92,8 @@ public final class GroupConfig extends AbstractConfig {
 
 public final int streamsNumStandbyReplicas;
 
+public final int shareIsolationLevel;

Review Comment:
   my bad. I misread it. Changed the code to reflect the right values.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error

2025-03-26 Thread Lorcan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938478#comment-17938478
 ] 

Lorcan edited comment on KAFKA-19022 at 3/26/25 8:20 AM:
-

Hi [~rsamudrala], I've checked the KafkaRaftClient class for the 
Inconsistent_Cluster_Id error and it looks like there are several instances 
where a customised error message with the cluster ids is not being set:

VoteResponseData
BeginQuorumEpochResponseData
EndQuorumEpochResponseData
FetchResponseData
FetchSnapshotResponseData

These all implement the ApiMessage interface, just like AddRaftVoterResponseData 
does. It looks like adding an errorMessage field would require a new schema 
version for the above classes, given how new fields are added, and I'm not sure 
whether this would require a KIP or not.
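
One low-risk alternative while the KIP question is open: log both ids at the 
point of comparison, which needs no schema change. A minimal sketch, with the 
surrounding class reduced to the relevant pieces:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClusterIdCheck {
    private static final Logger log = LoggerFactory.getLogger(ClusterIdCheck.class);

    private final String clusterId;

    public ClusterIdCheck(String clusterId) {
        this.clusterId = clusterId;
    }

    boolean hasValidClusterId(String requestClusterId) {
        // We don't enforce the cluster id if it is not provided.
        if (requestClusterId == null) {
            return true;
        }
        boolean valid = clusterId.equals(requestClusterId);
        if (!valid) {
            // Surfacing both ids answers the "which side is empty or wrong?" question.
            log.info("Request cluster id {} does not match local cluster id {}",
                requestClusterId, clusterId);
        }
        return valid;
    }
}
{code}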


was (Author: JIRAUSER308669):
Hi [~rsamudrala], I've checked the KafkaRaftClient class for the 
Inconsistent_Cluster_Id error and it looks like there are several instances 
where a customised error message with the cluster ids is not being set:

VoteResponseData
BeginQuorumEpochResponseData
EndQuorumEpochResponseData
FetchResponseData
FetchSnapshotResponseData

These all implement the ApiMessage, just like AddRaftVoterResponseData does. I 
can create a PR to add an errorMessage field to FetchResponseData to get 
feedback and see if this is a viable approach.

> Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID 
> error
> --
>
> Key: KAFKA-19022
> URL: https://issues.apache.org/jira/browse/KAFKA-19022
> Project: Kafka
>  Issue Type: Improvement
>  Components: kraft, logging
>Affects Versions: 3.9.0
>Reporter: Ranganath Samudrala
>Assignee: Lorcan
>Priority: Major
>
> While migrating Kafka from zookeeper to kraft, we see errors in logs like
> {{INCONSISTENT_CLUSTER_ID in FETCH response }}
> or
> {{INCONSISTENT_CLUSTER_ID in VOTER response }}
> But cluster IDs being compared is not displayed in logs so there is not 
> enough information to see where the issue is. Is the class data *clusterId* 
> empty (which could potentially be a bug?) or incoming *clusterId* empty or 
> incorrect?
> [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
> {code:java}
>     private boolean hasValidClusterId(String requestClusterId) {
>         // We don't enforce the cluster id if it is not provided.
>         if (requestClusterId == null) {
>             return true;
>         }
>         return clusterId.equals(requestClusterId);
>     }
> .
> .
>     private CompletableFuture<FetchResponseData> handleFetchRequest(
>         RaftRequest.Inbound requestMetadata,
>         long currentTimeMs
>     ) {
>         FetchRequestData request = (FetchRequestData) requestMetadata.data();
>         if (!hasValidClusterId(request.clusterId())) {
>             return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
>         }
> .
> .
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]

2025-03-26 Thread via GitHub


TaiJuWu commented on code in PR #19277:
URL: https://github.com/apache/kafka/pull/19277#discussion_r2013548508


##
storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java:
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BrokerCompressionTest {
+private final File tmpDir = TestUtils.tempDirectory();
+private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+private final MockTime time = new MockTime(0, 0);
+
+@AfterEach
+public void tearDown() throws IOException {
+Utils.delete(tmpDir);
+}
+
+/**
+ * Test broker-side compression configuration
+ */
+@ParameterizedTest
+@MethodSource("allCompressionParameters")
+public void testBrokerSideCompression(CompressionType 
messageCompressionType, BrokerCompressionType brokerCompressionType) throws 
IOException {
+Compression messageCompression = 
Compression.of(messageCompressionType).build();
+
+/* Configure broker-side compression */
+UnifiedLog log = UnifiedLog.create(
+logDir,
+new LogConfig(Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, 
brokerCompressionType.name)),
+0L,
+0L,
+time.scheduler,
+new BrokerTopicStats(),
+time,
+5 * 60 * 1000,
+new 
ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+
TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+new LogDirFailureChannel(10),
+true,
+Optional.empty()
+);
+
+/* Append two messages */
+log.appendAsLeader(
+MemoryRecords.withRecords(messageCompression, 0,
+new SimpleRecord("hello".getBytes()),
+new SimpleRecord("there".getBytes())
+), 0
+);
+
+if (brokerCompressionType != BrokerCompressionType.PRODUCER) {
+RecordBatch batch = readBatch(log, 0);
+Compression targetCompression = 
BrokerCompressionType.targetCompression(log.config().compression, null);
+assertEquals(targetCompression.type(), batch.compressionType(), 
"Compression at offset 0 should produce " + brokerCompressionType);
+} else {
+assertEquals(messageCompressionType, readBatch(log, 
0).compressionType(), "Compression at offset 0 should produce " + 
messageCompressionType);
+}
+}
+
+private static RecordBatch readBatch(UnifiedLog log, int offset) throws 
IOException {

Review Comment:
   Removed the parameter and renamed method to `readFirstBatch`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]

2025-03-26 Thread via GitHub


dajac commented on code in PR #18695:
URL: https://github.com/apache/kafka/pull/18695#discussion_r2013553668


##
tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java:
##
@@ -31,44 +34,50 @@
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 
-public abstract class ApiMessageFormatter implements MessageFormatter {
-
+public abstract class CoordinatorRecordMessageFormatter implements 
MessageFormatter {
 private static final String TYPE = "type";
 private static final String VERSION = "version";
 private static final String DATA = "data";
 private static final String KEY = "key";
 private static final String VALUE = "value";
-static final String UNKNOWN = "unknown";
+
+private final CoordinatorRecordSerde serde;
+
+public CoordinatorRecordMessageFormatter(CoordinatorRecordSerde serde) {
+this.serde = serde;
+}
 
 @Override
 public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
+if (Objects.isNull(consumerRecord.key())) return;
+
 ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
+try {
+CoordinatorRecord record = serde.deserialize(
+ByteBuffer.wrap(consumerRecord.key()),
+consumerRecord.value() != null ? 
ByteBuffer.wrap(consumerRecord.value()) : null
+);
+
+if (!isRecordTypeAllowed(record.key().apiKey())) return;
 
-byte[] key = consumerRecord.key();
-if (Objects.nonNull(key)) {
-short keyVersion = ByteBuffer.wrap(key).getShort();
-JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key));
+json
+.putObject(KEY)
+.put(TYPE, record.key().apiKey())

Review Comment:
   That's right. We will address it in https://github.com/apache/kafka/pull/18510.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]

2025-03-26 Thread via GitHub


dajac commented on code in PR #18695:
URL: https://github.com/apache/kafka/pull/18695#discussion_r2013554650


##
tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MessageFormatter;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
+
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)

Review Comment:
   It is used to allow using a non-static method in `@MethodSource`.
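
   A minimal standalone illustration of the pattern (assumed names): without 
PER_CLASS, JUnit requires the `@MethodSource` factory method to be static, so 
an abstract base class could not let subclasses supply the parameters.

{code:java}
import java.util.stream.Stream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class PerClassExampleTest {
    // Non-static factory: legal only because the lifecycle is PER_CLASS.
    // In the abstract-base-class pattern above, subclasses override this.
    Stream<String> inputs() {
        return Stream.of("a", "b");
    }

    @ParameterizedTest
    @MethodSource("inputs")
    void check(String input) {
        Assertions.assertFalse(input.isEmpty());
    }
}
{code}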



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18826: Add global thread metrics [kafka]

2025-03-26 Thread via GitHub


mjsax commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r2015214289


##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
##
@@ -419,8 +485,36 @@ private Topology complexTopology() {
 return builder.build();
 }
 
-private Topology simpleTopology() {
+private void addGlobalStore(final StreamsBuilder builder) {
+builder.addGlobalStore(Stores.keyValueStoreBuilder(
+Stores.inMemoryKeyValueStore("iq-test-store"),
+Serdes.String(),
+Serdes.String()
+),
+globalStoreTopic,
+Consumed.with(Serdes.String(), Serdes.String()),
+() -> new Processor<String, String, Void, Void>() {
+private KeyValueStore<String, String> store;
+
+@Override
+public void init(final ProcessorContext<Void, Void> context) {
+store = context.getStateStore("iq-test-store");
+}
+
+@Override
+public void process(final Record<String, String> record) {
+store.put(record.key(), record.value());
+globalStoreIterator = store.all();

Review Comment:
   Seems my IntelliJ warning settings are different. Should be ok w/o it, too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]

2025-03-26 Thread via GitHub


chia7712 commented on PR #19216:
URL: https://github.com/apache/kafka/pull/19216#issuecomment-2756906467

   @wernerdv thanks for taking on this major migration. Really well done!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19042: [1/N] Move ConsumerTopicCreationTest to client-integration-tests module [kafka]

2025-03-26 Thread via GitHub


m1a2st commented on code in PR #19283:
URL: https://github.com/apache/kafka/pull/19283#discussion_r2015550703


##
clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerTopicCreationTest.java:
##
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.test.api.Type.KRAFT;
+import static 
org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ConsumerTopicCreationTest {
+private static final String TOPIC = "topic";
+private static final long POLL_TIMEOUT = 1000;
+
+@ClusterTemplate("autoCreateTopicsConfigs")
+void 
testAsyncConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance 
cluster) {
+try (Consumer<byte[], byte[]> consumer = createConsumer(GroupProtocol.CONSUMER, true, cluster.bootstrapServers())) {
+consumer.subscribe(List.of(TOPIC));
+consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+if (allowAutoCreateTopics(cluster))
+assertTrue(getAllTopics(cluster).contains(TOPIC));
+else
+assertFalse(getAllTopics(cluster).contains(TOPIC),
+"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + 
ALLOW_AUTO_CREATE_TOPICS_CONFIG + " need to be true to create topic 
automatically");
+}
+}
+
+@ClusterTemplate("autoCreateTopicsConfigs")
+void 
testAsyncConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance 
cluster) {
+try (Consumer<byte[], byte[]> consumer = createConsumer(GroupProtocol.CONSUMER, false, cluster.bootstrapServers())) {
+consumer.subscribe(List.of(TOPIC));
+consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+assertFalse(getAllTopics(cluster).contains(TOPIC),
+"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + 
ALLOW_AUTO_CREATE_TOPICS_CONFIG + " need to be true to create topic 
automatically");
+}
+}
+
+@ClusterTemplate("autoCreateTopicsConfigs")
+void 
testClassicConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance 
cluster) {
+try (Consumer<byte[], byte[]> consumer = createConsumer(GroupProtocol.CLASSIC, true, cluster.bootstrapServers())) {
+consumer.subscribe(List.of(TOPIC));
+consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+if (allowAutoCreateTopics(cluster))
+assertTrue(getAllTopics(cluster).contains(TOPIC));
+else
+assertFalse(getAllTopics(cluster).contains(TOPIC),
+"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + 
ALLOW_AUTO_CREATE_TOPICS_CONFIG + " need to be true to create topic 
automatically");
+}
+}
+
+@ClusterTemplate("autoCreateTopicsConfigs")
+void 
testClassicConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance 
cluster) {
+try (Consumer<byte[], byte[]> consumer = createConsumer(GroupProtocol.CLASSIC, false, cluster.bootstrapServers())) {
+consumer.subscribe(List.of(TOPIC));
+consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
+assertFalse(getAllTopics(cluster).contains(TOPIC),
+"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + 
ALL

Re: [PR] KAFKA-18379: Enforce resigned cannot transition to any other state in same epoch [kafka]

2025-03-26 Thread via GitHub


github-actions[bot] commented on PR #19236:
URL: https://github.com/apache/kafka/pull/19236#issuecomment-2756468366

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18607: Update jfreechart dependency [kafka]

2025-03-26 Thread via GitHub


mingdaoy commented on PR #19074:
URL: https://github.com/apache/kafka/pull/19074#issuecomment-2756489837

   @mimaison I'll test that. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18607: Update jfreechart dependency [kafka]

2025-03-26 Thread via GitHub


github-actions[bot] commented on PR #19074:
URL: https://github.com/apache/kafka/pull/19074#issuecomment-2756469253

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-19048) Minimal Movement Replica Balancing algorithm

2025-03-26 Thread Jialun Peng (Jira)
Jialun Peng created KAFKA-19048:
---

 Summary: Minimal Movement Replica Balancing algorithm
 Key: KAFKA-19048
 URL: https://issues.apache.org/jira/browse/KAFKA-19048
 Project: Kafka
  Issue Type: Improvement
  Components: generator
Reporter: Jialun Peng
Assignee: Jialun Peng


h2. Motivation

Kafka clusters typically require rebalancing of topic replicas after horizontal 
scaling to evenly distribute the load across new and existing brokers. The 
current rebalancing approach does not consider the existing replica 
distribution, often resulting in excessive and unnecessary replica movements. 
These unnecessary movements increase rebalance duration, consume significant 
bandwidth and CPU resources, and potentially disrupt ongoing production and 
consumption operations. Thus, a replica rebalancing strategy that minimizes 
movements while achieving an even distribution of replicas is necessary.
h2. Goals

The proposed approach prioritizes the following objectives:
 # *Minimal Movement*: Minimize the number of replica relocations during
rebalancing.
 # *Replica Balancing*: Ensure that replicas are evenly distributed across
brokers.
 # *Anti-Affinity Support*: Support rack-aware allocation when enabled.
 # *Leader Balancing*: Distribute leader replicas evenly across brokers.
 # *ISR Order Optimization*: Optimize adjacency relationships to prevent
failover traffic concentration in case of broker failures.

h2. Proposed Changes
h3. Rack-Level Replica Distribution

The following rules ensure balanced replica allocation at the rack level:
 # *When rackCount = replicationFactor*:
 ** Each rack receives exactly {{partitionCount}} replicas.
 # *When rackCount > replicationFactor*:
 ** If weighted allocation {{(rackBrokers/totalBrokers × totalReplicas) ≥
partitionCount}}: each rack receives exactly {{partitionCount}} replicas.
 ** If weighted allocation {{< partitionCount}}: distribute remaining replicas
using a weighted remainder allocation (a small sketch follows this list).
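
For illustration only, here is a minimal sketch of the weighted remainder step
in Java. All names ({{allocate}}, {{brokersPerRack}}) are invented for this
sketch and are not Kafka APIs; it also assumes rackCount ≥ replicationFactor,
so the per-rack caps can always absorb the leftover replicas:
{code:java}
import java.util.*;

public class WeightedRackAllocation {
    // Largest-remainder split of totalReplicas across racks, capped at
    // partitionCount replicas per rack as described in the rules above.
    static Map<String, Integer> allocate(Map<String, Integer> brokersPerRack,
                                         int totalReplicas, int partitionCount) {
        int totalBrokers = brokersPerRack.values().stream()
                .mapToInt(Integer::intValue).sum();
        Map<String, Integer> quotas = new HashMap<>();
        Map<String, Double> remainders = new HashMap<>();
        int assigned = 0;
        for (Map.Entry<String, Integer> e : brokersPerRack.entrySet()) {
            double weighted = (double) e.getValue() / totalBrokers * totalReplicas;
            int quota = (int) Math.min(Math.floor(weighted), partitionCount);
            quotas.put(e.getKey(), quota);
            remainders.put(e.getKey(), weighted - quota);
            assigned += quota;
        }
        // Give leftovers to the racks with the largest fractional remainders,
        // never exceeding partitionCount replicas on any one rack.
        List<String> racks = new ArrayList<>(brokersPerRack.keySet());
        racks.sort(Comparator
                .comparingDouble((String r) -> remainders.get(r))
                .reversed());
        for (int i = 0; assigned < totalReplicas; i = (i + 1) % racks.size()) {
            if (quotas.get(racks.get(i)) < partitionCount) {
                quotas.merge(racks.get(i), 1, Integer::sum);
                assigned++;
            }
        }
        return quotas;
    }

    public static void main(String[] args) {
        // 5 brokers in 3 racks, 10 partitions with replication factor 2.
        System.out.println(allocate(Map.of("r1", 2, "r2", 2, "r3", 1), 20, 10));
        // e.g. {r1=8, r2=8, r3=4}
    }
}
{code}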

h3. Node-Level Replica Distribution
 # If the number of replicas assigned to a rack is not a multiple of the
number of nodes in that rack, some nodes will host one additional replica
compared to others.
 # *When rackCount = replicationFactor*:
 ** If all racks have an equal number of nodes, each node will host an equal
number of replicas.
 ** If rack sizes vary, nodes in larger racks will host fewer replicas on
average.
 # *When rackCount > replicationFactor*:
 ** If no rack has a significantly higher node weight, replicas will be evenly
distributed.
 ** If a rack has disproportionately high node weight, those nodes will
receive fewer replicas.

h3. Anti-Affinity Support

When anti-affinity is enabled, the rebalance algorithm ensures that replicas of 
the same partition do not colocate on the same rack. Brokers without rack 
configuration are excluded from anti-affinity checks.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Add doc for configuration parameters to keep the format consistent [kafka]

2025-03-26 Thread via GitHub


github-actions[bot] commented on PR #19244:
URL: https://github.com/apache/kafka/pull/19244#issuecomment-2756467904

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-19008 Remove scala version from artifacts [kafka]

2025-03-26 Thread via GitHub


github-actions[bot] commented on PR #19241:
URL: https://github.com/apache/kafka/pull/19241#issuecomment-2756468164

   A label of 'needs-attention' was automatically added to this PR in order to 
raise the
   attention of the committers. Once this issue has been triaged, the `triage` 
label
   should be removed to prevent this automation from happening again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-19048: Minimal Movement Replica Balancing algorithm [kafka]

2025-03-26 Thread via GitHub


pjl1070048431 opened a new pull request, #19297:
URL: https://github.com/apache/kafka/pull/19297

   ## Motivation
   
   Kafka clusters typically require rebalancing of topic replicas after 
horizontal scaling to evenly distribute the load across new and existing 
brokers. The current rebalancing approach does not consider the existing 
replica distribution, often resulting in excessive and unnecessary replica 
movements. These unnecessary movements increase rebalance duration, consume 
significant bandwidth and CPU resources, and potentially disrupt ongoing 
production and consumption operations. Thus, a replica rebalancing strategy 
that minimizes movements while achieving an even distribution of replicas is 
necessary.
   
   ## Goals
   
   The new replica rebalancing strategy aims to achieve the following 
objectives:
   
   1. **Minimal Movement**: Minimize the number of replica relocations during 
rebalancing (a toy cost sketch follows this list).
   2. **Replica Balancing**: Ensure that replicas are evenly distributed across 
brokers.
   3. **Anti-Affinity Support**: Support rack-aware allocation when enabled.
   4. **Leader Balancing**: Distribute leader replicas evenly across brokers.
   5. **ISR Order Optimization**: Optimize adjacency relationships to prevent 
failover traffic concentration in case of broker failures.
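   
   As a toy illustration of goal 1, the sketch below counts how many replicas 
would have to move between an old and a new assignment. Everything here 
(`movedReplicas`, the partition-to-brokers maps) is invented for the example 
and is not code from this PR:
   
   ```java
   import java.util.*;
   
   public class MovementCost {
       // Number of replica relocations needed to turn oldAssignment into
       // newAssignment; keys are partition ids, values are sets of broker ids.
       static int movedReplicas(Map<Integer, Set<Integer>> oldAssignment,
                                Map<Integer, Set<Integer>> newAssignment) {
           int moves = 0;
           for (Map.Entry<Integer, Set<Integer>> e : newAssignment.entrySet()) {
               Set<Integer> gained = new HashSet<>(e.getValue());
               gained.removeAll(oldAssignment.getOrDefault(e.getKey(), Set.of()));
               moves += gained.size(); // each broker newly hosting the partition is one move
           }
           return moves;
       }
   
       public static void main(String[] args) {
           Map<Integer, Set<Integer>> before = Map.of(0, Set.of(1, 2), 1, Set.of(2, 3));
           Map<Integer, Set<Integer>> after  = Map.of(0, Set.of(1, 4), 1, Set.of(2, 3));
           System.out.println(movedReplicas(before, after)); // 1
       }
   }
   ```
   
   A rebalancer that honours goal 1 picks, among all balanced targets, one 
that keeps this count small.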
   
   ## Proposed Changes
   
   ### Rack-Level Replica Distribution
   
   The following rules ensure balanced replica allocation at the rack level:
   
   1. **When `rackCount = replicationFactor`**:
  - Each rack receives exactly `partitionCount` replicas.
   2. **When `rackCount > replicationFactor`**:
  - If weighted allocation `(rackBrokers/totalBrokers × totalReplicas) ≥ 
partitionCount`: each rack receives exactly `partitionCount` replicas.
  - If weighted allocation `< partitionCount`: distribute remaining 
replicas using a weighted remainder allocation.
   
   ### Node-Level Replica Distribution
   
   1. If the number of replicas assigned to a rack is not a multiple of the 
number of nodes in that rack, some nodes will host one additional replica 
compared to others.
   2. **When `rackCount = replicationFactor`**:
  - If all racks have an equal number of nodes, each node will host an 
equal number of replicas.
  - If rack sizes vary, nodes in larger racks will host fewer replicas on 
average.
   3. **When `rackCount > replicationFactor`**:
  - If no rack has a significantly higher node weight, replicas will be 
evenly distributed.
  - If a rack has disproportionately high node weight, those nodes will 
receive fewer replicas.
   
   ### Anti-Affinity Support
   
   When anti-affinity is enabled, the rebalance algorithm ensures that replicas 
of the same partition do not colocate on the same rack. Brokers without rack 
configuration are excluded from anti-affinity checks.
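   
   A minimal sketch of that placement rule, with invented names (`mayPlace`, 
`candidateRack`) rather than anything from this PR:
   
   ```java
   import java.util.*;
   
   public class AntiAffinityCheck {
       // A broker may take a replica if it has no rack (excluded from the
       // check) or if no existing replica of the partition is on its rack.
       static boolean mayPlace(String candidateRack, Collection<String> replicaRacks) {
           return candidateRack == null || !replicaRacks.contains(candidateRack);
       }
   
       public static void main(String[] args) {
           System.out.println(mayPlace("rack-a", List.of("rack-b", "rack-c"))); // true
           System.out.println(mayPlace("rack-b", List.of("rack-b", "rack-c"))); // false
           System.out.println(mayPlace(null, List.of("rack-b")));               // true
       }
   }
   ```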
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



  1   2   >