[jira] [Commented] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error
[ https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938478#comment-17938478 ]

Lorcan commented on KAFKA-19022:
--------------------------------

Hi [~rsamudrala], I've checked the KafkaRaftClient class for the INCONSISTENT_CLUSTER_ID error and it looks like there are several instances where a customised error message with the cluster IDs is not being set:
* VoteResponseData
* BeginQuorumEpochResponseData
* EndQuorumEpochResponseData
* FetchResponseData
* FetchSnapshotResponseData

These all implement ApiMessage, just like AddRaftVoterResponseData does. I can create a PR to add an errorMessage field to FetchResponseData to get feedback and see if this is a viable approach.

> Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-19022
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19022
>             Project: Kafka
>          Issue Type: Improvement
>          Components: kraft, logging
>    Affects Versions: 3.9.0
>            Reporter: Ranganath Samudrala
>            Assignee: Lorcan
>            Priority: Major
>
> While migrating Kafka from ZooKeeper to KRaft, we see errors in logs like
> {{INCONSISTENT_CLUSTER_ID in FETCH response}}
> or
> {{INCONSISTENT_CLUSTER_ID in VOTER response}}
> But the cluster IDs being compared are not displayed in the logs, so there is not enough information to see where the issue is. Is the class data *clusterId* empty (which could potentially be a bug?), or is the incoming *clusterId* empty or incorrect?
> [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459]
> {code:java}
> private boolean hasValidClusterId(String requestClusterId) {
>     // We don't enforce the cluster id if it is not provided.
>     if (requestClusterId == null) {
>         return true;
>     }
>     return clusterId.equals(requestClusterId);
> }
> .
> .
> private CompletableFuture<FetchResponseData> handleFetchRequest(
>     RaftRequest.Inbound requestMetadata,
>     long currentTimeMs
> ) {
>     FetchRequestData request = (FetchRequestData) requestMetadata.data();
>     if (!hasValidClusterId(request.clusterId())) {
>         return completedFuture(new FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()));
>     }
> .
> .
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
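A minimal sketch of what the fetch path could look like with such a field (assuming an ErrorMessage field is added to FetchResponse.json so that the generated FetchResponseData exposes a setErrorMessage() accessor; that accessor does not exist today):

{code:java}
private CompletableFuture<FetchResponseData> handleFetchRequest(
    RaftRequest.Inbound requestMetadata,
    long currentTimeMs
) {
    FetchRequestData request = (FetchRequestData) requestMetadata.data();
    if (!hasValidClusterId(request.clusterId())) {
        return completedFuture(new FetchResponseData()
            .setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
            // Hypothetical accessor: only generated once FetchResponse.json gains an ErrorMessage field.
            // It surfaces both cluster IDs being compared, so logs become actionable.
            .setErrorMessage(String.format(
                "The request cluster id (%s) doesn't match the local cluster id (%s)",
                request.clusterId(), clusterId)));
    }
    ...
}
{code}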
[jira] [Resolved] (KAFKA-18300) Improve coordinator records
[ https://issues.apache.org/jira/browse/KAFKA-18300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-18300. - Fix Version/s: 4.1.0 Resolution: Fixed > Improve coordinator records > --- > > Key: KAFKA-18300 > URL: https://issues.apache.org/jira/browse/KAFKA-18300 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 4.1.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2013806185

## core/src/main/java/kafka/server/share/SharePartition.java:
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
     }
 }

+private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+    if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+        return shareAcquiredRecords;
+    // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+    // transactions that were aborted/did not commit due to timeout.
+    List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+    int acquiredCount = 0;
+    for (AcquiredRecords records : result) {
+        acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+    }
+    return new ShareAcquiredRecords(result, acquiredCount);
+}
+
+private List<AcquiredRecords> filterAbortedTransactionalRecords(
+    Iterable<? extends RecordBatch> batches,
+    List<AcquiredRecords> acquiredRecords,
+    Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+) {
+    lock.writeLock().lock();
+    try {
+        if (abortedTransactions.isEmpty())
+            return acquiredRecords;
+        // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+        List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+        for (RecordBatch recordBatch : recordsToArchive) {
+            // Archive the offsets/batches in the cached state.
+            NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+            archiveAcquiredBatchRecords(subMap, recordBatch);
+        }
+        return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+    } finally {
+        lock.writeLock().unlock();
+    }
+}
+
+// Visible for testing.
+List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+    List<AcquiredRecords> acquiredRecords,
+    List<RecordBatch> recordsToArchive
+) {
+    lock.writeLock().lock();
+    try {
+        List<AcquiredRecords> result = new ArrayList<>();
+
+        for (AcquiredRecords acquiredRecord : acquiredRecords) {
+            List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+            tempAcquiredRecords.add(acquiredRecord);
+            for (RecordBatch recordBatch : recordsToArchive) {
+                List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+                for (AcquiredRecords temp : tempAcquiredRecords) {
+                    // Check if record batch overlaps with the acquired records.
+                    if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) {
+                        // Split the acquired record into parts before, inside, and after the overlapping record batch.
+                        if (temp.firstOffset() < recordBatch.baseOffset()) {
+                            newAcquiredRecords.add(new AcquiredRecords()
+                                .setFirstOffset(temp.firstOffset())
+                                .setLastOffset(recordBatch.baseOffset() - 1)
+                                .setDeliveryCount((short) 1));
+                        }
+                        if (temp.lastOffset() > recordBatch.lastOffset()) {
+                            newAcquiredRecords.add(new AcquiredRecords()
+                                .setFirstOffset(recordBatch.lastOffset() + 1)
+                                .setLastOffset(temp.lastOffset())
+                                .setDeliveryCount((short) 1));
+                        }
+                    } else {
+                        newAcquiredRecords.add(temp);
+                    }
+                }
+                tempAcquiredRecords = newAcquiredRecords;
+            }
+            result.addAll(tempAcquiredRecords);
+        }
+        return result;
+    } finally {
+        lock.writeLock().unlock();
+    }
+}
+
+private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+    lock.writeLock().lock();
+    try {
+        // The fetched batch either is exact fetch equivalent batch (mostly), subset
+        // or spans over multiple fetched batches. The state can vary per offset itself from
+        // the fetched batch in case of subset.
+        for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+            InFlightBatch inFlightBatch = entry.getValue();
+
+            // If startOffs
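To make the splitting in `filterRecordBatchesFromAcquiredRecords` above concrete, here is a tiny self-contained Java sketch of the same overlap computation, using simplified stand-in types and hypothetical offsets (not the actual Kafka classes):

```java
import java.util.ArrayList;
import java.util.List;

public class SplitExample {
    // Simplified stand-ins for AcquiredRecords and RecordBatch: just offset ranges.
    record Range(long first, long last) { }

    // Split 'acquired' into the parts that fall outside 'aborted'.
    static List<Range> split(Range acquired, Range aborted) {
        List<Range> result = new ArrayList<>();
        if (acquired.first() <= aborted.last() && acquired.last() >= aborted.first()) {
            if (acquired.first() < aborted.first())   // part before the aborted batch
                result.add(new Range(acquired.first(), aborted.first() - 1));
            if (acquired.last() > aborted.last())     // part after the aborted batch
                result.add(new Range(aborted.last() + 1, acquired.last()));
        } else {
            result.add(acquired);                     // no overlap, keep as-is
        }
        return result;
    }

    public static void main(String[] args) {
        // Acquired offsets 0-10 with an aborted batch at 3-5 yields [0-2] and [6-10].
        System.out.println(split(new Range(0, 10), new Range(3, 5)));
    }
}
```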
[jira] [Resolved] (KAFKA-18612) Update ApiMessageFormatter
[ https://issues.apache.org/jira/browse/KAFKA-18612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-18612. - Fix Version/s: 4.1.0 Resolution: Fixed Addressed by https://github.com/apache/kafka/commit/b6adec48c5501d7c6ceebabecf1321dd6c7b8835. > Update ApiMessageFormatter > -- > > Key: KAFKA-18612 > URL: https://issues.apache.org/jira/browse/KAFKA-18612 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 4.1.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] (WIP) KAFKA-18409: ShareGroupStateMessageFormatter should use ApiMessageFormatter [kafka]
brandboat commented on PR #18510: URL: https://github.com/apache/kafka/pull/18510#issuecomment-2753884551 > @brandboat We have merged https://github.com/apache/kafka/pull/18695. We can proceed with your PR. Please ping me when the PR is ready for review. Thanks for the heads-up, I'll handle this ASAP! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]
mimaison commented on code in PR #19216:
URL: https://github.com/apache/kafka/pull/19216#discussion_r2013891681

## storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java:
@@ -0,0 +1,793 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.kafka.server.util.LockUtils.inLock;
+
+/**
+ * This class manages the state (see {@link LogCleaningState}) of each partition being cleaned.
+ *
+ * 1. None: No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress
+ *    or LogCleaningPaused(1). Valid previous states are LogCleaningInProgress and LogCleaningPaused(1).
+ * 2. LogCleaningInProgress: The cleaning is currently in progress. In this state, it can become None when log cleaning is finished
+ *    or become LogCleaningAborted. Valid previous state is None.
+ * 3. LogCleaningAborted: The cleaning abort is requested. In this state, it can become LogCleaningPaused(1).
+ *    Valid previous state is LogCleaningInProgress.
+ * 4-a. LogCleaningPaused(1): The cleaning is paused once. No log cleaning can be done in this state.
+ *    In this state, it can become None or LogCleaningPaused(2).
+ *    Valid previous states are None, LogCleaningAborted or LogCleaningPaused(2).
+ * 4-b. LogCleaningPaused(i): The cleaning is paused i times where i >= 2. No log cleaning can be done in this state.
+ *    In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ *    Valid previous states are LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
+ */
+public class LogCleanerManager {
+    public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint";
+
+    private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner");
+
+    private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count";
+    private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes";
+    private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent";
+    private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms";
+
+    // Visible for testing
+    public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME);
+
+    // For compatibility, metrics are defined to be under `kafka.log.LogCleanerManager` class
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager");
+
+    /**
+     * The set of logs currently being cleaned.
+     */
+    private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>();
+
+    /**
+     * The set of uncleanable partitions (partitions that have raised an unexpected error during cleaning)
+     * for each log directory.
+     */
+    private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>();
+
+    /**
+     * A global lock used to control all access to the in-progress set and the offset checkpoints.
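The pause/resume transitions described in the class comment can be illustrated with a small standalone sketch (the enum and method here are illustrative only, not the actual `LogCleaningState` implementation):

```java
public class CleaningStateExample {
    enum Kind { NONE, IN_PROGRESS, ABORTED, PAUSED }

    // Returns the pause count after requesting one more pause, mirroring the
    // transitions above: None/Aborted -> Paused(1), Paused(i) -> Paused(i+1).
    // An in-progress cleaning must first be aborted before it can be paused.
    static int pauseOnce(Kind current, int pausedCount) {
        switch (current) {
            case NONE:
            case ABORTED:
                return 1;
            case PAUSED:
                return pausedCount + 1;
            case IN_PROGRESS:
            default:
                throw new IllegalStateException("abort the in-progress cleaning before pausing");
        }
    }

    public static void main(String[] args) {
        System.out.println(pauseOnce(Kind.NONE, 0));    // 1
        System.out.println(pauseOnce(Kind.PAUSED, 1));  // 2
    }
}
```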
Re: [PR] KAFKA-17662: config.providers configuration missing from the docs [kafka]
m1a2st commented on code in PR #18930:
URL: https://github.com/apache/kafka/pull/18930#discussion_r2014432276

## clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java:
@@ -289,6 +295,13 @@ protected Map<String, Object> postProcessParsedConfig(final Map<String, Object> parsedValues) {
         CommonClientConfigs.warnDisablingExponentialBackoff(this);
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
     }
+
+    @Override
+    public Map<String, Object> originals() {

Review Comment:
   When I traced `WorkerConfig`, I found that `config.providers` is removed in its `originals()` method. This method is used to create admins like `TopicAdmin` or within `adminFactory`. Therefore, I think that when creating an `Admin`, we cannot pass this configuration.
   https://github.com/apache/kafka/blob/56d1dc1b6e12a8a325c91749485aa03bce17c7f1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java#L458
   https://github.com/apache/kafka/blob/56d1dc1b6e12a8a325c91749485aa03bce17c7f1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L943
   https://github.com/apache/kafka/blob/56d1dc1b6e12a8a325c91749485aa03bce17c7f1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L2146
   If I missed or misunderstood anything, please let me know. Thanks

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
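For illustration, the pattern being described — an `originals()` override that strips `config.providers` before the map is used to build other clients — looks roughly like this self-contained sketch (not the actual `WorkerConfig` code):

```java
import java.util.HashMap;
import java.util.Map;

public class OriginalsExample {
    private final Map<String, Object> originalsWithProviders = new HashMap<>();

    OriginalsExample() {
        originalsWithProviders.put("bootstrap.servers", "localhost:9092");
        originalsWithProviders.put("config.providers", "file");
    }

    // Hand out a copy of the original config map with config.providers removed,
    // so clients (e.g. admin clients) built from originals() never see the
    // provider configuration.
    public Map<String, Object> originals() {
        Map<String, Object> map = new HashMap<>(originalsWithProviders);
        map.remove("config.providers");
        return map;
    }

    public static void main(String[] args) {
        System.out.println(new OriginalsExample().originals()); // no config.providers entry
    }
}
```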
Re: [PR] KAFKA-18982: Allow ClusterTests to ignore specific thread leaks [kafka]
mimaison commented on PR #19206: URL: https://github.com/apache/kafka/pull/19206#issuecomment-2755034626 I kind of agree with @gharris1727. Kafka is not using this code and this is not a public API, so this is effectively adding code to the Kafka code base just for your use case. Sure the change is small but the Apache Kafka project can't add and maintain code for private uses. I think the current state of what we offer to users to test things is not great. It's probably worth having discussions to see what we can do to improve in this area. In the past most of the testing infrastructure was in core so exposing it was hard. Now that a big chunk of that test infra has been moved to a dedicated module we should have more flexibility and options. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16894) Define switches to enable share groups for preview
[ https://issues.apache.org/jira/browse/KAFKA-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield updated KAFKA-16894: - Fix Version/s: 4.1.0 > Define switches to enable share groups for preview > -- > > Key: KAFKA-16894 > URL: https://issues.apache.org/jira/browse/KAFKA-16894 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Fix For: 4.1.0 > > > Create group.version=2 and share.version=1 as the switches to enable share > groups in the broker. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR closed pull request #19294: MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. URL: https://github.com/apache/kafka/pull/19294 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR opened a new pull request, #19295:
URL: https://github.com/apache/kafka/pull/19295

*What*
Currently, if we receive just a control record in the `ShareFetchResponse`, the currentFetch in `ShareConsumerImpl` is not updated as the record is ignored. But in the process, we lose the acknowledgement for this control record (control records are acknowledged with type GAP). The PR fixes this by adding a check to include the pending acknowledgements even when the `ShareFetch` is empty.
Added a unit test which verifies this change.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16894) Define switches to enable share groups for preview
[ https://issues.apache.org/jira/browse/KAFKA-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield updated KAFKA-16894: - Summary: Define switches to enable share groups for preview (was: Define group.version=2) > Define switches to enable share groups for preview > -- > > Key: KAFKA-16894 > URL: https://issues.apache.org/jira/browse/KAFKA-16894 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > > Create group.version=2 as the switch to enable share groups in the broker. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]
mimaison commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2014493205

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
@@ -407,14 +413,17 @@ private void configureRLMM() {
         rlmmProps.put(LOG_DIR_CONFIG, logDir);
         rlmmProps.put("cluster.id", clusterId);

-        remoteLogMetadataManager.configure(rlmmProps);
+        remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
     }

     public void startup() {
         // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources
         // in connecting to the brokers or remote storages.
         configureRSM();
         configureRLMM();
+        // the withPluginMetrics() method will be called when the plugin is instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   See my attempt at implementing what I described: https://github.com/apache/kafka/commit/89e9a633cb605bc3a865c5001da704bd29628890
   I don't think this changes the contracts we currently have with regards to `RemoteLogMetadataManager` and `RemoteStorageManager`. For both we still call the empty constructor, followed by a call to `configure()`, and now potentially a call to `withPluginMetrics()`, if the implementation is `Monitorable`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
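A sketch of that instantiation order for a `RemoteStorageManager` (the `Monitorable`/`PluginMetrics` types come from KIP-877 and their package locations are assumed; the helper and its reflective loading are illustrative, not Kafka's actual plugin wiring):

```java
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;

import java.util.Map;

public class PluginLifecycleExample {
    // Shows the order of calls described above for a plugin implementation.
    static RemoteStorageManager instantiate(String className,
                                            Map<String, ?> configs,
                                            PluginMetrics metrics) throws Exception {
        RemoteStorageManager rsm = (RemoteStorageManager) Class.forName(className)
            .getDeclaredConstructor()
            .newInstance();                          // 1. empty constructor
        rsm.configure(configs);                      // 2. configure()
        if (rsm instanceof Monitorable monitorable) {
            monitorable.withPluginMetrics(metrics);  // 3. only if the plugin is Monitorable
        }
        return rsm;
    }
}
```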
[PR] KAFKA-19042: [2/N] Move TransactionsWithMaxInFlightOneTest to client-integration-tests module [kafka]
FrankYang0529 opened a new pull request, #19289: URL: https://github.com/apache/kafka/pull/19289 Rewrite `TransactionsWithMaxInFlightOneTest` in Java using the new test infra and move it to the client-integration-tests module. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12744: dependency upgrade: `argparse4j` 0.7.0 -->> 0.9.0 [kafka]
dejan2609 commented on PR #19265:
URL: https://github.com/apache/kafka/pull/19265#issuecomment-2755336115

Pardon me for not testing with checkstyle in the first place @mimaison. Anyway, I fixed the imports but the Gradle build fails again: `warning: [deprecation] newArgumentParser(String) in ArgumentParsers has been deprecated`

I will paste the complete stack trace here (and will use the next comment to present alternatives).

```
dejan@dejan:~/kafka$ ./gradlew checkstyleMain checkstyleTest spotlessCheck
Starting a Gradle Daemon, 2 incompatible and 1 stopped Daemons could not be reused, use --status for details

> Configure project :
Starting build with version 4.1.0-SNAPSHOT (commit id f17682ab) using Gradle 8.10.2, Java 17 and Scala 2.13.15
Build properties: ignoreFailures=false, maxParallelForks=8, maxScalacThreads=8, maxTestRetries=0

> Task :generator:compileJava
/home/dejan/kafka/generator/src/main/java/org/apache/kafka/message/MessageGenerator.java:371: warning: [deprecation] newArgumentParser(String) in ArgumentParsers has been deprecated
        .newArgumentParser("message-generator")
        ^
error: warnings found and -Werror specified
/home/dejan/kafka/generator/src/main/java/org/apache/kafka/message/checker/MetadataSchemaCheckerTool.java:43: warning: [deprecation] newArgumentParser(String) in ArgumentParsers has been deprecated
        ArgumentParser argumentParser = ArgumentParsers.
        ^
1 error
2 warnings

> Task :generator:compileJava FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':generator:compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
> Run with --info option to get more log output.

BUILD FAILED in 1m
54 actionable tasks: 9 executed, 45 up-to-date
dejan@dejan:~/kafka$
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change
[ https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16394: Fix Version/s: 3.9.1 > ForeignKey LEFT join propagates null value on foreignKey change > --- > > Key: KAFKA-16394 > URL: https://issues.apache.org/jira/browse/KAFKA-16394 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.7.0 >Reporter: Ayoub Omari >Assignee: Ayoub Omari >Priority: Major > Fix For: 3.9.1 > > Attachments: ForeignJoinTest.scala, JsonSerde.scala > > > We have two topics : _left-topic[String, LeftRecord]_ and > _right-topic[String, String]_ > where _LeftRecord_ : > {code:scala} > case class LeftRecord(foreignKey: String, name: String){code} > we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The > resulting join value is the value in right-topic. > > +*Scenario1: change foreignKey*+ > Input the following > {code:scala} > rightTopic.pipeInput("fk1", "1") > rightTopic.pipeInput("fk2", "2") > leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1")) > leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1")) > {code} > > *+Expected result+* > {code:scala} > KeyValue(pk1, 1) > KeyValue(pk1, 2){code} > > *+Actual result+* > {code:scala} > KeyValue(pk1, 1) > KeyValue(pk1, null) > KeyValue(pk1, 2){code} > > A null is propagated to the join result when the foreign key changes > > +*Scenario 2: Delete PrimaryKey*+ > Input > {code:scala} > rightTopic.pipeInput("fk1", "1") > rightTopic.pipeInput("fk2", "2") > leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1")) > leftTopic.pipeInput("pk1", null) {code} > > *+Expected result+* > {code:scala} > KeyValue(pk1, 1) > KeyValue(pk1, null) {code} > > *+Actual result+* > {code:java} > KeyValue(pk1, 1) > KeyValue(pk1, null) > KeyValue(pk1, null) {code} > An additional null is propagated to the join result. > > This bug doesn't exist on versions 3.6.0 and below. > > I believe the issue comes from the line > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134] > where we propagate the deletion in the two scenarios above > > Attaching the topology I used. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18713) Kafka Streams Left-Join not always emitting the last value
[ https://issues.apache.org/jira/browse/KAFKA-18713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-18713:
------------------------------------
    Due Date:   (was: 3/Feb/25)

> Kafka Streams Left-Join not always emitting the last value
> -----------------------------------------------------------
>
>                 Key: KAFKA-18713
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18713
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.8.0
>            Reporter: tuna b
>            Assignee: Nil Madhab
>            Priority: Major
>             Fix For: 3.9.1
>
>         Attachments: Screenshot 2025-02-10 at 12.06.38.png
>
> There seems to be an issue when performing a left-join: the latest value of the join does not contain the value of the right side.
> {code:java}
> var builder = new StreamsBuilder();
> KTable<String, Department> departments = builder
>     .table("departments",
>         Materialized.<String, Department>as(Stores.persistentKeyValueStore("departments"))
>             .withKeySerde(Serdes.String())
>             .withValueSerde(CustomJsonSerde.json(Department.class)));
> KTable<String, Person> persons = builder
>     .table("persons",
>         Materialized.<String, Person>as(Stores.persistentKeyValueStore("persons"))
>             .withKeySerde(Serdes.String())
>             .withValueSerde(CustomJsonSerde.json(Person.class)));
> KTable<String, Person> joined = persons
>     .leftJoin(departments, Person::getDepartmentId, (person, department) -> person.toBuilder()
>             .department(department)
>             .build(),
>         TableJoined.as("my-joiner"),
>         Materialized.<String, Person>as(Stores.persistentKeyValueStore("joined-results"))
>             .withKeySerde(Serdes.String())
>             .withValueSerde(CustomJsonSerde.json(Person.class)));
> joined
>     .toStream()
>     .to("joined-results", Produced.with(Serdes.String(), CustomJsonSerde.json(Person.class)));
> {code}
> How to reproduce:
> Create two topics, persons and departments, each with 10 partitions. Pre-populate the departments topic with 2 departments.
>
> Observation:
> * When I initially produce a Person {{p-1}} with FK {{dep-1}}, the join works.
> ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
> * When I change the FK to {{dep-2}}, the join updates.
> ** output is an EnrichedResult with person {{p-1}} and department {{dep-2}}
> * When I change the FK back to {{dep-1}}, the join fails.
> ** output is an EnrichedResult with person {{p-1}} *but no department*
> * However, if I reproduce the same event ({{p-1}} with {{dep-1}}), the join works again.
> ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}}
>
> Also, even when you are not setting back to a previous FK, there can still be an issue with the left join. Changing an FK means insert + delete operations, but sometimes the result of the delete is emitted after the result of the insert.
> How to reproduce:
> Create a departments topic and pre-populate it with 5 departments (dep-1 to dep-5). Create a persons topic and create person p-1 with FK dep-1. Send an update to the persons topic by changing the FK to dep-2 and repeat this step until dep-5. Now you will see that the latest emitted value of the person does not contain a department.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
apoorvmittal10 commented on code in PR #19261:
URL: https://github.com/apache/kafka/pull/19261#discussion_r2014733112

## core/src/main/java/kafka/server/share/SharePartition.java:
@@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro
     }
 }

+private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) {
+    if (isolationLevel != FetchIsolation.TXN_COMMITTED)
+        return shareAcquiredRecords;
+    // When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any
+    // transactions that were aborted/did not commit due to timeout.
+    List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions);
+    int acquiredCount = 0;
+    for (AcquiredRecords records : result) {
+        acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1);
+    }
+    return new ShareAcquiredRecords(result, acquiredCount);
+}
+
+private List<AcquiredRecords> filterAbortedTransactionalRecords(
+    Iterable<? extends RecordBatch> batches,
+    List<AcquiredRecords> acquiredRecords,
+    Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions
+) {
+    lock.writeLock().lock();
+    try {
+        if (abortedTransactions.isEmpty())
+            return acquiredRecords;
+        // The record batches that need to be archived in cachedState because they were a part of aborted transactions.
+        List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions);
+        for (RecordBatch recordBatch : recordsToArchive) {
+            // Archive the offsets/batches in the cached state.
+            NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch);
+            archiveAcquiredBatchRecords(subMap, recordBatch);
+        }
+        return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive);
+    } finally {
+        lock.writeLock().unlock();
+    }
+}
+
+// Visible for testing.
+List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords(
+    List<AcquiredRecords> acquiredRecords,
+    List<RecordBatch> recordsToArchive
+) {
+    lock.writeLock().lock();
+    try {
+        List<AcquiredRecords> result = new ArrayList<>();
+
+        for (AcquiredRecords acquiredRecord : acquiredRecords) {
+            List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>();
+            tempAcquiredRecords.add(acquiredRecord);
+            for (RecordBatch recordBatch : recordsToArchive) {
+                List<AcquiredRecords> newAcquiredRecords = new ArrayList<>();
+                for (AcquiredRecords temp : tempAcquiredRecords) {
+                    // Check if record batch overlaps with the acquired records.
+                    if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) {
+                        // Split the acquired record into parts before, inside, and after the overlapping record batch.
+                        if (temp.firstOffset() < recordBatch.baseOffset()) {
+                            newAcquiredRecords.add(new AcquiredRecords()
+                                .setFirstOffset(temp.firstOffset())
+                                .setLastOffset(recordBatch.baseOffset() - 1)
+                                .setDeliveryCount((short) 1));
+                        }
+                        if (temp.lastOffset() > recordBatch.lastOffset()) {
+                            newAcquiredRecords.add(new AcquiredRecords()
+                                .setFirstOffset(recordBatch.lastOffset() + 1)
+                                .setLastOffset(temp.lastOffset())
+                                .setDeliveryCount((short) 1));
+                        }
+                    } else {
+                        newAcquiredRecords.add(temp);
+                    }
+                }
+                tempAcquiredRecords = newAcquiredRecords;
+            }
+            result.addAll(tempAcquiredRecords);
+        }
+        return result;
+    } finally {
+        lock.writeLock().unlock();
+    }
+}
+
+private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) {
+    lock.writeLock().lock();
+    try {
+        // The fetched batch either is exact fetch equivalent batch (mostly), subset
+        // or spans over multiple fetched batches. The state can vary per offset itself from
+        // the fetched batch in case of subset.
+        for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) {
+            InFlightBatch inFlightBatch = entry.getValue();
+
+            // If startOffse
Re: [PR] KAFKA-12744: dependency upgrade: `argparse4j` 0.7.0 -->> 0.9.0 [kafka]
dejan2609 commented on PR #19265:
URL: https://github.com/apache/kafka/pull/19265#issuecomment-2755363293

So, herewith the alternatives:
1. relax the Checkstyle rules via an exception (i.e. don't fail the build for the `ArgumentParsers.newArgumentParser` method deprecation warning mentioned above) **_OR_**
2. use my old PR [#10626](https://github.com/apache/kafka/pull/10626) and replace the method (`ArgumentParsers.newArgumentParser` with `ArgumentParsers.newFor`) as mentioned [here](https://argparse4j.github.io/migration.html#to-0-8-0)

@mimaison LMK which option sounds better to you.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
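For option 2, the change is mechanical; a sketch of the before/after using the builder API from the argparse4j migration guide linked above (the argument added here is purely illustrative):

```java
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;

public class ParserExample {
    public static void main(String[] args) {
        // Deprecated since argparse4j 0.8.0:
        // ArgumentParser parser = ArgumentParsers.newArgumentParser("message-generator");

        // Replacement builder API:
        ArgumentParser parser = ArgumentParsers.newFor("message-generator").build();
        parser.addArgument("--package").help("target package of the generated classes");
    }
}
```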
Re: [PR] KAFKA-18899: Improve handling of timeouts for commitAsync() in ShareConsumer. [kafka]
AndrewJSchofield merged PR #19192: URL: https://github.com/apache/kafka/pull/19192 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]
dajac merged PR #18695: URL: https://github.com/apache/kafka/pull/18695 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-6333 java.awt.headless should not be on commandline [kafka]
mimaison commented on code in PR #19237:
URL: https://github.com/apache/kafka/pull/19237#discussion_r2013902994

## core/src/main/scala/kafka/Kafka.scala:
@@ -73,7 +73,7 @@ object Kafka extends Logging {
     try {
       val serverProps = getPropsFromArgs(args)
       val server = buildServer(serverProps)
-
+      System.setProperty("java.awt.headless", "true")

Review Comment:
   I'm not the creator of the JIRA but I think the point was to completely remove this system property because Kafka does not use java.awt at all, so it does not need to run in headless mode.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Write log differently according to the size of missingListenerPartitions [kafka]
HyunSangHan commented on PR #11418: URL: https://github.com/apache/kafka/pull/11418#issuecomment-2754340862 I am still here! ✋ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16894: Define features to enable share groups [kafka]
dajac commented on PR #19293: URL: https://github.com/apache/kafka/pull/19293#issuecomment-2755031196 @AndrewJSchofield I would like to better understand how `group.version=2` will be used. Could you elaborate a bit more on what we will gate with it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]
TaiJuWu commented on code in PR #19286:
URL: https://github.com/apache/kafka/pull/19286#discussion_r2014001146

## core/src/main/java/kafka/log/remote/RemoteLogManager.java:
@@ -407,14 +413,17 @@ private void configureRLMM() {
         rlmmProps.put(LOG_DIR_CONFIG, logDir);
         rlmmProps.put("cluster.id", clusterId);

-        remoteLogMetadataManager.configure(rlmmProps);
+        remoteLogMetadataManagerPlugin.get().configure(rlmmProps);
     }

     public void startup() {
         // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources
         // in connecting to the brokers or remote storages.
         configureRSM();
         configureRLMM();
+        // the withPluginMetrics() method will be called when the plugin is instantiated (after configure() if the plugin also implements Configurable)

Review Comment:
   https://github.com/apache/kafka/pull/19286/commits/108129df77529aa53cfa42cdaae5a40f47459ff9 is the commit where I tried moving `configure` to the constructor, but I finally gave up.
   The most important consideration is that `RemoteLogMetadataManager` and `RemoteStorageManager` are public plugins and we should not change this flow without a KIP.
   If I misunderstood, please correct me. I will continue to work after getting other feedback.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16894: Define features to enable share groups [kafka]
AndrewJSchofield opened a new pull request, #19293:
URL: https://github.com/apache/kafka/pull/19293

This PR proposes two switches to enable share groups for 4.1 (preview) and 4.2 (GA).

* `group.version=2` to indicate that the group coordinator is able to understand share group records.
* `share.version=1` to indicate that share groups are enabled. This is used as the switch for turning share groups on and off.

In 4.1, the defaults will be `group.version=2` and `share.version=0`. Then a user wanting to evaluate the preview of KIP-932 would use `bin/kafka-features.sh --bootstrap-server <bootstrap-server> upgrade --feature share.version=1`. In 4.2, the default will be `share.version=1`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
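For anyone trying the preview, the current feature levels can be inspected with the same tool before upgrading (the broker address is a placeholder):

```
bin/kafka-features.sh --bootstrap-server localhost:9092 describe
bin/kafka-features.sh --bootstrap-server localhost:9092 upgrade --feature share.version=1
```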
[PR] MINOR: Mark streams RPCs as unstable [kafka]
lucasbru opened a new pull request, #19292: URL: https://github.com/apache/kafka/pull/19292 Streams groups RPCs are not enabled by default, but they should also be marked as unstable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16894) Define switches to enable share groups for preview
[ https://issues.apache.org/jira/browse/KAFKA-16894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield updated KAFKA-16894: - Description: Create group.version=2 and share.version=1 as the switches to enable share groups in the broker. (was: Create group.version=2 as the switch to enable share groups in the broker.) > Define switches to enable share groups for preview > -- > > Key: KAFKA-16894 > URL: https://issues.apache.org/jira/browse/KAFKA-16894 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > > Create group.version=2 and share.version=1 as the switches to enable share > groups in the broker. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR opened a new pull request, #19294:
URL: https://github.com/apache/kafka/pull/19294

*What*
Currently, if we receive just a control record in the `ShareFetchResponse`, the currentFetch in `ShareConsumerImpl` is not updated as the record is ignored. But in the process, we lose the acknowledgement for this control record (control records are acknowledged with type GAP). The PR fixes this by adding a check to include the pending acknowledgements even when the `ShareFetch` is empty.
Added a unit test which verifies this change.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16407) ForeignKey INNER join ignores FK change when its previous value is null
[ https://issues.apache.org/jira/browse/KAFKA-16407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16407: Fix Version/s: 3.9.1 > ForeignKey INNER join ignores FK change when its previous value is null > --- > > Key: KAFKA-16407 > URL: https://issues.apache.org/jira/browse/KAFKA-16407 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.2, 3.0.2, 3.1.2, 3.2.3, 3.3.2, 3.4.1, 3.5.2, 3.7.0, > 3.6.1 >Reporter: Ayoub Omari >Assignee: Ayoub Omari >Priority: Major > Fix For: 3.9.1 > > Attachments: InnerFKJoinTest.scala, JsonSerde.scala > > > We have two topics : _left-topic[String, LeftRecord]_ and > _right-topic[String, String]_ > where _LeftRecord_ : > {code:scala} > case class LeftRecord(foreignKey: String, name: String){code} > we do a simple *INNER* foreign key join on left-topic's foreignKey field. The > resulting join value is the value in right-topic. > > *Scenario: Primary key pk1 gets mapped to a new FK after having a null FK* > {code:scala} > rightTopic.pipeInput("fk", "1") > leftTopic.pipeInput("pk1", LeftRecord(null, "pk1")) > leftTopic.pipeInput("pk1", LeftRecord("fk", "pk1")) {code} > > *+Expected result+* > {code:scala} > KeyValue(pk1, 1){code} > > *+Actual result+* > {code:scala} > # No output ! > # Logs: > 20:14:29,723 WARN > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier > - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] > topic=[left-topic] partition=[0] offset=[0] > 20:14:29,728 WARN > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier > - Skipping record due to null foreign key. value=[LeftRecord(null,pk1)] > topic=[left-topic] partition=[0] offset=[1] > {code} > > After looking into the code, I believe this is the line behind the issue : > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L147 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18935: Ensure brokers do not return null records in FetchResponse [kafka]
junrao commented on code in PR #19167:
URL: https://github.com/apache/kafka/pull/19167#discussion_r2015116293

## clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java:
@@ -89,7 +89,7 @@ public FetchResponseData data() {
      */
     public FetchResponse(FetchResponseData fetchResponseData) {
         super(ApiKeys.FETCH);
-        this.data = fetchResponseData;
+        this.data = convertNullRecordsToEmpty(fetchResponseData);

Review Comment:
   This code is called by both the server and the client. On the server side, it makes sense for the server to set records to empty. On the client side, it seems that it's better to take whatever it receives from the broker, instead of changing it silently. Maybe we can consolidate the server side usage based on FetchResponse.of() and the client side usage based on FetchResponse.parse(), and avoid the direct usage of this constructor.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
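For context, the `convertNullRecordsToEmpty` conversion under discussion amounts to something like the following sketch (assuming the generated `FetchResponseData` accessors and `MemoryRecords.EMPTY` as the empty sentinel; not necessarily the PR's exact implementation):

```java
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.record.MemoryRecords;

public final class NullRecordsFilter {
    // Replace any null records fields with MemoryRecords.EMPTY so that
    // readers of the FetchResponse never observe null records.
    static FetchResponseData convertNullRecordsToEmpty(FetchResponseData data) {
        data.responses().forEach(topicResponse ->
            topicResponse.partitions().forEach(partition -> {
                if (partition.records() == null) {
                    partition.setRecords(MemoryRecords.EMPTY);
                }
            }));
        return data;
    }
}
```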
Re: [PR] MINOR: add log4j2.yaml to clients-integration-tests module [kafka]
m1a2st commented on code in PR #19252:
URL: https://github.com/apache/kafka/pull/19252#discussion_r2007875519

## clients/clients-integration-tests/src/test/resources/log4j2.yaml:
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+Configuration:
+  Properties:
+    Property:
+      - name: "logPattern"
+        value: "[%d] %p %m (%c:%L)%n"
+
+  Appenders:
+    Console:
+      name: STDOUT
+      PatternLayout:
+        pattern: "${logPattern}"
+
+  Loggers:

Review Comment:
   Sure, addressed it

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18067: Add a flag to disable producer reset during active task creator shutting down [kafka]
ableegoldman commented on code in PR #19269:
URL: https://github.com/apache/kafka/pull/19269#discussion_r2015005410

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java:
@@ -70,6 +70,7 @@ public class StreamsProducer {
     private Producer<byte[], byte[]> producer;
     private boolean transactionInFlight = false;
     private boolean transactionInitialized = false;
+    private boolean allowReset = true;

Review Comment:
   it's a bit confusing that we flip-flop back and forth between "disableReset" and "allowReset", I'd recommend picking one (probably `disableReset`) for all the method and variable names

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java:
@@ -324,6 +329,22 @@ void close() {
         transactionInitialized = false;
     }

+    /**
+     * Disables producer reset to prevent producer recreation during shutdown.
+     *
+     * When disabled, subsequent calls to reInitializeProducer() will not recreate
+     * the producer instance, avoiding resource leak.
+     *
+     * This method should only be invoked when the {@link org.apache.kafka.streams.processor.internals.ActiveTaskCreator}

Review Comment:
   nit: I'd say when the "StreamThread" is shutting down, rather than the ActiveTaskCreator
   Effectively it's the same thing at the moment but you never know what refactoring may bring, and the important thing here is that the StreamThread itself is shutting down

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`
[ https://issues.apache.org/jira/browse/KAFKA-19024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938778#comment-17938778 ] Lan Ding commented on KAFKA-19024: -- Hi [~schofielaj], If this ticket is still open, may I take it over? > Enhance the client behaviour when it tries to exceed the > `group.share.max.groups` > - > > Key: KAFKA-19024 > URL: https://issues.apache.org/jira/browse/KAFKA-19024 > Project: Kafka > Issue Type: Sub-task >Reporter: Sanskar Jhajharia >Assignee: Andrew Schofield >Priority: Minor > > For share groups we use the `group.share.max.groups` config to define the > number of max share groups we allow. However, when we exceed the same, the > client logs do not specify any such error and simply do not consume. The > group doesn't get created but the client continues to send Heartbeats hoping > for one of the existing groups to shut down and allowing it to form a group. > Having a log or an exception in the client logs will help them debug such > situations accurately. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] Allow re-registration for fenced / cleanly shutdown brokers [kafka]
ahuang98 opened a new pull request, #19296: URL: https://github.com/apache/kafka/pull/19296 Will create a jira and add the id in a bit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow re-registration for fenced / cleanly shutdown brokers [kafka]
ahuang98 commented on code in PR #19296:
URL: https://github.com/apache/kafka/pull/19296#discussion_r2015416417

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
@@ -353,7 +353,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
         if (existing != null) {
             prevIncarnationId = existing.incarnationId();
             storedBrokerEpoch = existing.epoch();
-            if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
+            if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) && heartbeatManager.isBrokerActive(brokerId)) {

Review Comment:
   thanks 😅

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17541) Improve handling of delivery count
[ https://issues.apache.org/jira/browse/KAFKA-17541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938772#comment-17938772 ] Lan Ding commented on KAFKA-17541: -- Hi [~schofielaj] , If this ticket is still open, may I take it over? > Improve handling of delivery count > -- > > Key: KAFKA-17541 > URL: https://issues.apache.org/jira/browse/KAFKA-17541 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > > There are two situations in which the delivery count handling needs to be > more intelligent. > First, for records which are automatically released as a result of closing a > share session normally, the delivery count should not be incremented. These > records were fetched but they were not actually delivered to the client since > the disposition of the delivery records is carried in the ShareAcknowledge > which closes the share session. Any remaining records were not delivered, > only fetched. > Second, for records which have a delivery count which is more than 1 or 2, > there is a suspicion that the records are not being delivered due to a > problem rather than just natural retrying. The batching of these records > should be reduced, even down to a single record as a time so we do not have > the failure to deliver a poisoned record actually causing adjacent records to > be considered unsuccessful and potentially reach the delivery count limit > without proper reason. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`
Sanskar Jhajharia created KAFKA-19024:
------------------------------------------

             Summary: Enhance the client behaviour when it tries to exceed the `group.share.max.groups`
                 Key: KAFKA-19024
                 URL: https://issues.apache.org/jira/browse/KAFKA-19024
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Sanskar Jhajharia
            Assignee: Andrew Schofield

For share groups we use the `group.share.max.groups` config to define the number of max share groups we allow. However, when we exceed the same, the client logs do not specify any such error and simply do not consume. The group doesn't get created but the client continues to send Heartbeats hoping for one of the existing groups to shut down and allowing it to form a group. Having a log or an exception in the client logs will help them debug such situations accurately.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Some cleanups in group coordinator's integration tests [kafka]
lianetm commented on code in PR #19281: URL: https://github.com/apache/kafka/pull/19281#discussion_r2014959782 ## core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala: ## @@ -37,15 +37,17 @@ import java.lang.{Byte => JByte} import java.util.Collections import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1) +@ClusterTestDefaults( + types = Array(Type.KRAFT), + brokers = 1, + serverProperties = Array( +new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) +) Review Comment: should we also add `@Tag("integration")` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow re-registration for fenced / cleanly shutdown brokers [kafka]
splett2 commented on code in PR #19296:
URL: https://github.com/apache/kafka/pull/19296#discussion_r2015175160

## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
@@ -353,7 +353,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
         if (existing != null) {
             prevIncarnationId = existing.incarnationId();
             storedBrokerEpoch = existing.epoch();
-            if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
+            if (heartbeatManager.hasValidSession(brokerId, existing.epoch()) && heartbeatManager.isBrokerActive(brokerId)) {

Review Comment:
   could we just check whether `existing` is fenced or in controlled shutdown?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18826: Add global thread metrics [kafka]
mjsax commented on code in PR #18953:
URL: https://github.com/apache/kafka/pull/18953#discussion_r2015238843

## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java:
@@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) {
                 () -> new Processor<>() {
                     private KeyValueStore store;

+                    // The store iterator is intentionally not closed here as it needs
+                    // to be open during the test, so the Streams app will emit the
+                    // org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric
+                    // that is expected. So the globalStoreIterator is a global variable
+                    // (pun not intended), so it can be closed in the tearDown method.
                     @Override
                     public void init(final ProcessorContext context) {
                         store = context.getStateStore("iq-test-store");
+                        globalStoreIterator = store.all();

Review Comment:
   One-liner? `globalStoreIterator = context.getStateStore("iq-test-store").all()`

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [WIP] KAFKA-14830: Illegal state error in transactional producer [kafka]
kirktrue commented on PR #17022: URL: https://github.com/apache/kafka/pull/17022#issuecomment-2755596812 cc @k-raina -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-3370) Add options to auto.offset.reset to reset offsets upon initialization only
[ https://issues.apache.org/jira/browse/KAFKA-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938708#comment-17938708 ] Akshesh Doshi commented on KAFKA-3370: -- Hi We had an issue with our Kafka consumers recently, where the `nearest.offset.reset` feature suggested by Tayida would have been useful. Is this something we are expecting to have soon? > Add options to auto.offset.reset to reset offsets upon initialization only > -- > > Key: KAFKA-3370 > URL: https://issues.apache.org/jira/browse/KAFKA-3370 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Vahid Hashemian >Priority: Major > Labels: needs-kip > > Currently "auto.offset.reset" is applied in the following two cases: > 1) upon starting the consumer for the first time (hence no committed offsets > before); > 2) upon fetching offsets out-of-range. > For scenarios where case 2) needs to be avoid (i.e. people need to be > notified upon offsets out-of-range rather than silently offset reset), > "auto.offset.reset" need to be set to "none". However for case 1) setting > "auto.offset.reset" to "none" will cause NoOffsetForPartitionException upon > polling. And in this case, seekToBeginning/seekToEnd is mistakenly applied > trying to set the offset at initialization, which are actually designed for > during the life time of the consumer (in rebalance callback, for example). > The fix proposal is to add two more options to "auto.offset.reset", > "earliest-on-start", and "latest-on-start", whose semantics are "earliest" > and "latest" for case 1) only, and "none" for case 2). -- This message was sent by Atlassian Jira (v8.20.10#820010)
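Since the thread is about auto.offset.reset semantics, here is a self-contained example of the case-1 workaround the description alludes to: run with "none" so out-of-range errors are surfaced, and handle the first-start case explicitly (standard consumer API; broker address and topic name are placeholders):
{code:java}
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NoneResetWorkaround {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        // "none" surfaces out-of-range errors instead of silently resetting,
        // but makes the very first poll throw when no offsets are committed.
        props.put("auto.offset.reset", "none");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            try {
                consumer.poll(Duration.ofSeconds(1));
            } catch (NoOffsetForPartitionException e) {
                // First start, no committed offsets: pick the starting point
                // explicitly instead of relying on auto.offset.reset.
                consumer.seekToBeginning(e.partitions());
            }
        }
    }
}
{code}
The proposed earliest-on-start/latest-on-start options would fold this boilerplate into the config itself.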
Re: [PR] [WIP] KAFKA-14830: Illegal state error in transactional producer [kafka]
kirktrue commented on code in PR #17022: URL: https://github.com/apache/kafka/pull/17022#discussion_r2014904591 ## clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java: ## @@ -667,14 +667,23 @@ public synchronized void maybeTransitionToErrorState(RuntimeException exception) } synchronized void handleFailedBatch(ProducerBatch batch, RuntimeException exception, boolean adjustSequenceNumbers) { -maybeTransitionToErrorState(exception); +boolean isStaleBatch = batch.producerId() == producerIdAndEpoch.producerId && batch.producerEpoch() < producerIdAndEpoch.epoch; Review Comment: Thanks for the feedback @jolshan! > I'm wondering if there are any cases where producerIdAndEpoch could have a race -- or a case where the ID and epoch are the same but the issue still happens There are a couple of bug reports with logs. I'll dig through those to see if it's happened in the wild. > btw -- maybe not super common, but could the overflow case be missed here? (new producer id and epoch resets due to epoch reaching max value) Sounds super rare ;) If an epoch overflowed, wouldn't that just be interpreted as 'not equal' to the last known epoch, and thus trigger the "stale batch" logic? Perhaps my understanding of staleness is too naive? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
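To make the overflow scenario concrete, a toy illustration of the gap being discussed (numbers invented; this is not the actual TransactionManager code):
```java
public class EpochOverflowGap {
    public static void main(String[] args) {
        // A batch from before the overflow bump: old producerId, epoch at max.
        long batchProducerId = 2000L;
        short batchEpoch = Short.MAX_VALUE;
        // After the bump the producer holds a NEW id with epoch reset to 0.
        long currentProducerId = 2001L;
        short currentEpoch = 0;

        boolean isStaleBatch = batchProducerId == currentProducerId
                && batchEpoch < currentEpoch;
        // Prints false: the id changed during the bump, so the
        // "same id, older epoch" test never flags the pre-bump batch as stale.
        System.out.println(isStaleBatch);
    }
}
```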
Re: [PR] KAFKA-18827: Initialize share group state group coordinator impl. [3/N] [kafka]
AndrewJSchofield merged PR #19026: URL: https://github.com/apache/kafka/pull/19026 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Use readable interface to parse requests [kafka]
jsancio merged PR #19163: URL: https://github.com/apache/kafka/pull/19163 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18826: Add global thread metrics [kafka]
mjsax commented on code in PR #18953: URL: https://github.com/apache/kafka/pull/18953#discussion_r2015232961 ## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ## @@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) { () -> new Processor<>() { private KeyValueStore store; +// The store iterator is intentionally not closed here as it needs +// to be open during the test, so the Streams app will emit the +// org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric +// that is expected. So the globalStoreIterator is a global variable +// (pun not intended), so it can be closed in the tearDown method. @Override public void init(final ProcessorContext context) { store = context.getStateStore("iq-test-store"); +globalStoreIterator = store.all(); } @Override public void process(final Record record) { store.put(record.key(), record.value()); Review Comment: Seem we don't really need this? We never pipe any data anyway? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18826: Add global thread metrics [kafka]
mjsax commented on code in PR #18953: URL: https://github.com/apache/kafka/pull/18953#discussion_r2015235819 ## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ## @@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) { () -> new Processor<>() { private KeyValueStore store; Review Comment: Seems we don't need this as a variable? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18826: Add global thread metrics [kafka]
mjsax commented on code in PR #18953: URL: https://github.com/apache/kafka/pull/18953#discussion_r2015231173 ## streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java: ## @@ -60,7 +60,8 @@ public void metricChange(final KafkaMetric metric) { private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) { final Map tags = metric.metricName().tags(); -final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) || tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId)); +final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) || + Optional.of(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId)); Review Comment: Should this be `Optional.ofNullable(tags.get(THREAD_ID_TAG))` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
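For reference, the JDK behavior behind the question: `Optional.of` throws on null while `Optional.ofNullable` tolerates it. Given the `containsKey` guard in the surrounding expression the value should never be null in practice, but `ofNullable` makes that robustness explicit:
```java
import java.util.Optional;

public class OptionalNullDemo {
    public static void main(String[] args) {
        String maybeNull = null;
        System.out.println(Optional.ofNullable(maybeNull)); // Optional.empty
        Optional.of(maybeNull); // throws NullPointerException at runtime
    }
}
```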
[jira] [Updated] (KAFKA-18713) Kafka Streams Left-Join not always emitting the last value
[ https://issues.apache.org/jira/browse/KAFKA-18713?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-18713: Fix Version/s: 3.9.1 > Kafka Streams Left-Join not always emitting the last value > -- > > Key: KAFKA-18713 > URL: https://issues.apache.org/jira/browse/KAFKA-18713 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: tuna b >Assignee: Nil Madhab >Priority: Major > Fix For: 3.9.1 > > Attachments: Screenshot 2025-02-10 at 12.06.38.png > > > There seems to be an issue when performing a left-join: the latest value of the join does not contain the value of the right side. > {code:java} > var builder = new StreamsBuilder(); > KTable<String, Department> departments = builder > .table("departments", > Materialized.<String, Department>as(Stores.persistentKeyValueStore("departments")) > .withKeySerde(Serdes.String()) > .withValueSerde(CustomJsonSerde.json(Department.class))); > KTable<String, Person> persons = builder > .table("persons", > Materialized.<String, Person>as(Stores.persistentKeyValueStore("persons")) > .withKeySerde(Serdes.String()) > .withValueSerde(CustomJsonSerde.json(Person.class))); > KTable<String, Person> joined = persons > .leftJoin(departments, Person::getDepartmentId, (person, department) -> > person.toBuilder() > .department(department) > .build(), > TableJoined.as("my-joiner"), > Materialized.<String, Person>as(Stores.persistentKeyValueStore("joined-results")) > .withKeySerde(Serdes.String()) > .withValueSerde(CustomJsonSerde.json(Person.class))); > joined > .toStream() > .to("joined-results", Produced.with(Serdes.String(), > CustomJsonSerde.json(Person.class))); {code} > How to reproduce: > Create two topics persons and departments, each with 10 partitions. > Pre-populate the departments topic with 2 departments. > > Observation: > * When I initially produce a Person {{p-1}} with a FK {{dep-1}}, the join works. > ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}} > * When I change the FK to {{dep-2}}, the join updates. > ** output is an EnrichedResult with person {{p-1}} and department {{dep-2}} > * When I change the FK back to {{dep-1}}, the join fails. > ** output is an EnrichedResult with person {{p-1}} *but no department* > * However, if I reproduce the same event ({{p-1}} with {{dep-1}}), the join works again. > ** output is an EnrichedResult with person {{p-1}} and department {{dep-1}} > Also, even when you are not setting back to a previous FK, there can still be an issue with the left join. Changing an FK means insert + delete operations, but sometimes the result of the delete is emitted after the result of the insert. > How to reproduce: > Create a departments topic and pre-populate it with 5 departments (dep-1 to dep-5). Create a persons topic and create person p-1 with FK dep-1. Send an update to the persons topic by changing the FK to dep-2 and repeat this step until dep-5. Now you will see that the latest emitted value of the person does not contain a department. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-19012) Messages ending up on the wrong topic
[ https://issues.apache.org/jira/browse/KAFKA-19012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938749#comment-17938749 ] Artem Livshits commented on KAFKA-19012: [~dnadolny] a few questions: * what are the versions of the clients that don't have this problem? * when you see misrouted messages, are they all in the same batch (i.e. the whole batch misrouted) or a batch contains both correct and misrouted messages? * when you see bursts, are they always between same topics or different messages in the same burst are misrouted between different topics? * could you check that you don't have "interceptor.classes" config defined in any of the clients? * any chance you can share the custom partitioner code? > Messages ending up on the wrong topic > - > > Key: KAFKA-19012 > URL: https://issues.apache.org/jira/browse/KAFKA-19012 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.2.3, 3.8.1 >Reporter: Donny Nadolny >Priority: Major > > We're experiencing messages very occasionally ending up on a different topic > than what they were published to. That is, we publish a message to topicA and > consumers of topicB see it and fail to parse it because the message contents > are meant for topicA. This has happened for various topics. > We've begun adding a header with the intended topic (which we get just by > reading the topic from the record that we're about to pass to the OSS client) > right before we call producer.send, this header shows the correct topic > (which also matches up with the message contents itself). Similarly we're > able to use this header and compare it to the actual topic to prevent > consuming these misrouted messages, but this is still concerning. > Some details: > - This happens rarely: it happened approximately once per 10 trillion > messages for a few months, though there was a period of a week or so where it > happened more frequently (once per 1 trillion messages or so) > - It often happens in a small burst, eg 2 or 3 messages very close in time > (but from different hosts) will be misrouted > - It often but not always coincides with some sort of event in the cluster > (a broker restarting or being replaced, network issues causing errors, etc). > Also these cluster events happen quite often with no misrouted messages > - We run many clusters, it has happened for several of them > - There is no pattern between intended and actual topic, other than the > intended topic tends to be higher volume ones (but I'd attribute that to > there being more messages published -> more occurrences affecting it rather > than it being more likely per-message) > - It only occurs with clients that are using a non-zero linger > - Once it happened with two sequential messages, both were intended for > topicA but both ended up on topicB, published by the same host (presumably > within the same linger batch) > - Most of our clients are 3.2.3 and it has only affected those, most of our > brokers are 3.2.3 but it has also happened with a cluster that's running > 3.8.1 (but I suspect a client rather than broker problem because of it never > happening with clients that use 0 linger) -- This message was sent by Atlassian Jira (v8.20.10#820010)
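The header-based guard described in the report is straightforward to reproduce; a minimal sketch using only the standard producer API (broker address, topic, key, and value are placeholders):
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IntendedTopicHeader {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("topicA", "k", "v");
            // Stamp the intended topic right before send; consumers can then
            // compare this header against the topic they actually read from
            // and drop misrouted records.
            record.headers().add("intended-topic",
                    record.topic().getBytes(StandardCharsets.UTF_8));
            producer.send(record);
        }
    }
}
{code}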
[jira] [Commented] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error
[ https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938715#comment-17938715 ] Ranganath Samudrala commented on KAFKA-19022: - Agree that [FetchResponse.json|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/clients/src/main/resources/common/message/FetchResponse.json] does not have a place to store an error message as it exists in [AddRaftVoterResponse.json|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/clients/src/main/resources/common/message/AddRaftVoterResponse.json] I am not sure when the improvement will be implemented, if it will be implemented at all. Does this require us to wait for a major release? Now, I am wondering how to find the location of the problem in my setup? > Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID > error > -- > > Key: KAFKA-19022 > URL: https://issues.apache.org/jira/browse/KAFKA-19022 > Project: Kafka > Issue Type: Improvement > Components: kraft, logging >Affects Versions: 3.9.0 >Reporter: Ranganath Samudrala >Assignee: Lorcan >Priority: Major > > While migrating Kafka from zookeeper to kraft, we see errors in logs like > {{INCONSISTENT_CLUSTER_ID in FETCH response }} > or > {{INCONSISTENT_CLUSTER_ID in VOTER response }} > But cluster IDs being compared is not displayed in logs so there is not > enough information to see where the issue is. Is the class data *clusterId* > empty (which could potentially be a bug?) or incoming *clusterId* empty or > incorrect? > [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459] > {code:java} > private boolean hasValidClusterId(String requestClusterId) { > // We don't enforce the cluster id if it is not provided. > if (requestClusterId == null)Unknown macro: { > return true; > } > return clusterId.equals(requestClusterId); > } > . > . > private CompletableFuture handleFetchRequest( > RaftRequest.Inbound requestMetadata, > long currentTimeMs > ) { > FetchRequestData request = (FetchRequestData) requestMetadata.data(); > if (!hasValidClusterId(request.clusterId())) { > return completedFuture(new > FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); > } > . > . > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown
Alyssa Huang created KAFKA-19047: Summary: Broker registrations are slow if previously fenced or shutdown Key: KAFKA-19047 URL: https://issues.apache.org/jira/browse/KAFKA-19047 Project: Kafka Issue Type: Improvement Reporter: Alyssa Huang BrokerLifecycleManager prevents registration of a broker with an id it has seen before and a different incarnation id until the broker's previous session expires. On clean shutdown and restart of a broker this can cause an unnecessary delay in re-registration while the quorum controller waits for the session to expire. ``` [BrokerLifecycleManager id=1] Unable to register broker 1 because the controller returned error DUPLICATE_BROKER_REGISTRATION ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18067: Add a flag to disable producer reset during active task creator shutting down [kafka]
frankvicky commented on code in PR #19269: URL: https://github.com/apache/kafka/pull/19269#discussion_r2015413116 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java: ## @@ -324,6 +329,22 @@ void close() { transactionInitialized = false; } +/** + * Disables producer reset to prevent producer recreation during shutdown. + * + * When disabled, subsequent calls to reInitializeProducer() will not recreate + * the producer instance, avoiding resource leak. + * + * + * This method should only be invoked when the {@link org.apache.kafka.streams.processor.internals.ActiveTaskCreator} Review Comment: You are right. I will clarify it in the next commit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR update the README docker image version [kafka]
m1a2st commented on PR #19278: URL: https://github.com/apache/kafka/pull/19278#issuecomment-2753763363 Hello @frankvicky, I consider the README similar to Kafka's "Quick Start" section, so I think we should focus on helping users start quickly. Using `latest` should be good enough. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17645: KIP-1052: Enable warmup in producer performance test [kafka]
matt-welch commented on code in PR #17340: URL: https://github.com/apache/kafka/pull/17340#discussion_r2010980706 ## tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java: ## @@ -94,7 +101,19 @@ void start(String[] args) throws IOException { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); -cb = new PerfCallback(sendStartMs, payload.length, stats); Review Comment: Thanks for the suggestion! I think the refactored conditional is much easier to understand. Hopefully nobody minds the passing of null steadyStateStats. I've refactored this in my latest commit. ## tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java: ## @@ -75,7 +76,13 @@ void start(String[] args) throws IOException { // not thread-safe, do not share with other threads SplittableRandom random = new SplittableRandom(0); ProducerRecord record; -stats = new Stats(config.numRecords, 5000); + +System.out.println("DEBUG: config.warmupRecords=" + config.warmupRecords + ", (config.warmupRecords > 0)=" + (config.warmupRecords > 0)); Review Comment: That's correct. Chia-Ping had asked for some clarification around config.warmupRecords. Once he's satisfied with that output, the line will be removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
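For readers following along, a hedged sketch of the refactored conditional being discussed; PerfCallback, Stats, steadyStateStats, and config.warmupRecords are names taken from the PR discussion, not from the merged tool, so treat the whole fragment as an assumption:
```java
// Inside the send loop: pick which stats a callback records into, depending
// on whether record i is still inside the warmup window.
long sendStartMs = System.currentTimeMillis();
boolean inWarmup = config.warmupRecords > 0 && i < config.warmupRecords;
// During warmup, steadyStateStats is passed as null so warmup latencies
// cannot skew the steady-state measurements.
cb = new PerfCallback(sendStartMs, payload.length, stats, inWarmup ? null : steadyStateStats);
```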
Re: [PR] MINOR: Refactor GroupCoordinator write path [kafka]
dajac commented on PR #19290: URL: https://github.com/apache/kafka/pull/19290#issuecomment-2754294910 I still need to add a unit test for the new method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Refactor GroupCoordinator write path [kafka]
dajac opened a new pull request, #19290: URL: https://github.com/apache/kafka/pull/19290 This patch addresses a weirdness on the GroupCoordinator write path. The `CoordinatorPartitionWriter` uses the `ReplicaManager#appendRecords` method with `acks=1` and expects it to complete immediately/synchronously. It works because this is effectively what the method does with `acks=1`. The issue is that the method is fundamentally asynchronous, so the contract is really fragile. This patch changes it by introducing a new method, `ReplicaManager.appendRecordsToLeader`, which is synchronous. It also refactors `ReplicaManager#appendRecords` to use `ReplicaManager.appendRecordsToLeader` so we can benefit from all the existing tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
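An illustrative reduction of the contract change (placeholder types; the real ReplicaManager signatures carry many more parameters):
```java
import java.util.Map;
import java.util.function.Consumer;

interface AppendContract {
    // Old path: asynchronous by design. With acks=1 the callback happens to
    // run before the method returns, and the coordinator silently relied on
    // that accidental synchrony.
    void appendRecords(String topicPartition, byte[] records,
                       Consumer<Map<String, Object>> responseCallback);

    // New path: an explicitly synchronous append whose result is returned
    // directly, making the coordinator's expectation part of the signature.
    Map<String, Object> appendRecordsToLeader(String topicPartition, byte[] records);
}
```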
[jira] [Created] (KAFKA-19046) Change delete cleanup policy to compact cleanup policy
George Yang created KAFKA-19046: --- Summary: Change delete cleanup policy to compact cleanup policy Key: KAFKA-19046 URL: https://issues.apache.org/jira/browse/KAFKA-19046 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 3.7.1 Environment: Kafka version: 3.7 mirrormaker2 version: 3.7.1 zk version: 3.6.8 Reporter: George Yang The internal topics of MirrorMaker 2 (MM2) sometimes report the following error: ` Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:389) org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.cb.internal' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'. at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581) at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231) at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242) at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233) at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) [2024-10-13 09:05:12,624] INFO Kafka MirrorMaker stopping (org.apache.kafka.connect.mirror.MirrorMaker:208) ` This results in the MM2 pods in the cluster entering a CrashLoopBackOff state repeatedly. When changing the configuration via kafka-configs.sh, the process runs fine. However, as we know, the default Kafka broker configuration for log.cleanup.policy is set to delete, while the default cleanup policy for MM2 is set to compact. It appears that the policy for offset.storage.topic must be compact, and similarly for status.storage and config.storage. I want to configure the cleanup policy for these three topics to always be compact. 
I attempted to configure them in connect-mirror-maker.properties as shown below, but all attempts failed: ` offset.storage.topic.properties.cleanup.policy=compact status.storage.topic.properties.cleanup.policy=compact config.storage.topic.properties.cleanup.policy=compact ` or ` offset.storage.topic.cleanup.policy=compact status.storage.topic.cleanup.policy=compact config.storage.topic.cleanup.policy=compact ` The logs show that the properties are unknown and report a failure in topic creation: ` Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)[MirrorHerder-cb->ca-1] org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 'mm2-offsets.cb.internal': Unknown topic config name: topic.properties.cleanup.policy at org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:474) at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:345) at org.apache.kafka.connect.util.TopicAdmin.createTopicsWithRetry(TopicAdmin.java:363) at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.createTopics(KafkaTopicBasedBackingStore.java:57) at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:43) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231) at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242) at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233) at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373) at java.base/java.util.concurrent.Executors$RunnableAdapte
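Until MM2 supports per-topic overrides in connect-mirror-maker.properties, one workaround consistent with the kafka-configs.sh route mentioned above is to flip the policy with the Admin API before restarting MM2. A minimal sketch (bootstrap address assumed; only the offsets topic from the report is shown, and the status and config topics would be handled the same way):
{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class FixCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            AlterConfigOp setCompact = new AlterConfigOp(
                    new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
                    new ConfigResource(ConfigResource.Type.TOPIC, "mm2-offsets.cb.internal"),
                    List.of(setCompact));
            // Applies cleanup.policy=compact without recreating the topic.
            admin.incrementalAlterConfigs(configs).all().get();
        }
    }
}
{code}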
[jira] [Resolved] (KAFKA-18899) Limit retry time for ShareConsumer.commitAsync
[ https://issues.apache.org/jira/browse/KAFKA-18899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield resolved KAFKA-18899. -- Resolution: Fixed > Limit retry time for ShareConsumer.commitAsync > -- > > Key: KAFKA-18899 > URL: https://issues.apache.org/jira/browse/KAFKA-18899 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Shivsundar R >Priority: Major > Fix For: 4.1.0 > > > Currently, `ShareConsumer.commitAsync` could in theory retry forever. It > should be bounded by the default API timeout. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013699771 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } +private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { +if (isolationLevel != FetchIsolation.TXN_COMMITTED) +return shareAcquiredRecords; +// When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any +// transactions that were aborted/did not commit due to timeout. +List result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions); +int acquiredCount = 0; +for (AcquiredRecords records : result) { +acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1); +} +return new ShareAcquiredRecords(result, acquiredCount); +} + +private List filterAbortedTransactionalRecords( +Iterable batches, +List acquiredRecords, +Optional> abortedTransactions +) { +lock.writeLock().lock(); +try { +if (abortedTransactions.isEmpty()) +return acquiredRecords; +// The record batches that need to be archived in cachedState because they were a part of aborted transactions. +List recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions); +for (RecordBatch recordBatch : recordsToArchive) { +// Archive the offsets/batches in the cached state. +NavigableMap subMap = fetchSubMap(recordBatch); +archiveAcquiredBatchRecords(subMap, recordBatch); +} +return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive); +} finally { +lock.writeLock().unlock(); +} +} + +// Visible for testing. +List filterRecordBatchesFromAcquiredRecords( +List acquiredRecords, +List recordsToArchive +) { +lock.writeLock().lock(); +try { +List result = new ArrayList<>(); + +for (AcquiredRecords acquiredRecord : acquiredRecords) { +List tempAcquiredRecords = new ArrayList<>(); +tempAcquiredRecords.add(acquiredRecord); +for (RecordBatch recordBatch : recordsToArchive) { +List newAcquiredRecords = new ArrayList<>(); +for (AcquiredRecords temp : tempAcquiredRecords) { +// Check if record batch overlaps with the acquired records. +if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) { +// Split the acquired record into parts before, inside, and after the overlapping record batch. +if (temp.firstOffset() < recordBatch.baseOffset()) { +newAcquiredRecords.add(new AcquiredRecords() +.setFirstOffset(temp.firstOffset()) +.setLastOffset(recordBatch.baseOffset() - 1) +.setDeliveryCount((short) 1)); +} +if (temp.lastOffset() > recordBatch.lastOffset()) { +newAcquiredRecords.add(new AcquiredRecords() +.setFirstOffset(recordBatch.lastOffset() + 1) +.setLastOffset(temp.lastOffset()) +.setDeliveryCount((short) 1)); +} +} else { +newAcquiredRecords.add(temp); +} +} +tempAcquiredRecords = newAcquiredRecords; +} +result.addAll(tempAcquiredRecords); +} +return result; +} finally { +lock.writeLock().unlock(); +} +} + +private void archiveAcquiredBatchRecords(NavigableMap subMap, RecordBatch recordBatch) { +lock.writeLock().lock(); +try { +// The fetched batch either is exact fetch equivalent batch (mostly), subset +// or spans over multiple fetched batches. 
The state can vary per offset itself from +// the fetched batch in case of subset. +for (Map.Entry entry : subMap.entrySet()) { +InFlightBatch inFlightBatch = entry.getValue(); + +// If startOffs
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013704465 ## server-common/src/main/java/org/apache/kafka/server/storage/log/FetchIsolation.java: ## @@ -25,11 +25,11 @@ public enum FetchIsolation { TXN_COMMITTED; public static FetchIsolation of(FetchRequest request) { -return of(request.replicaId(), request.isolationLevel()); +return of(request.replicaId(), request.isolationLevel(), false); } -public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel) { -if (!FetchRequest.isConsumer(replicaId)) { +public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel, boolean isShareFetchRequest) { +if (!FetchRequest.isConsumer(replicaId) && !isShareFetchRequest) { Review Comment: I think if I just use `replicaId` as -1, that also works. But I wasn't too sure if that's the right approach. Hence, opted for adding a new variable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error
[ https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938509#comment-17938509 ] Lorcan commented on KAFKA-19022: Upon further investigation and reading the file FetchResponse.json, which is used to generate the code for FetchResponseData, it looks like a [Kafka Improvement Proposal|https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals#KafkaImprovementProposals-Whatisconsidereda%22majorchange%22thatneedsaKIP?] would be needed. > Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID > error > -- > > Key: KAFKA-19022 > URL: https://issues.apache.org/jira/browse/KAFKA-19022 > Project: Kafka > Issue Type: Improvement > Components: kraft, logging >Affects Versions: 3.9.0 >Reporter: Ranganath Samudrala >Assignee: Lorcan >Priority: Major > > While migrating Kafka from zookeeper to kraft, we see errors in logs like > {{INCONSISTENT_CLUSTER_ID in FETCH response }} > or > {{INCONSISTENT_CLUSTER_ID in VOTER response }} > But cluster IDs being compared is not displayed in logs so there is not > enough information to see where the issue is. Is the class data *clusterId* > empty (which could potentially be a bug?) or incoming *clusterId* empty or > incorrect? > [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459] > {code:java} > private boolean hasValidClusterId(String requestClusterId) { > // We don't enforce the cluster id if it is not provided. > if (requestClusterId == null)Unknown macro: { > return true; > } > return clusterId.equals(requestClusterId); > } > . > . > private CompletableFuture handleFetchRequest( > RaftRequest.Inbound requestMetadata, > long currentTimeMs > ) { > FetchRequestData request = (FetchRequestData) requestMetadata.data(); > if (!hasValidClusterId(request.clusterId())) { > return completedFuture(new > FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); > } > . > . > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013699195 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } +private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { +if (isolationLevel != FetchIsolation.TXN_COMMITTED) +return shareAcquiredRecords; +// When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any +// transactions that were aborted/did not commit due to timeout. +List result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions); +int acquiredCount = 0; +for (AcquiredRecords records : result) { +acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1); +} +return new ShareAcquiredRecords(result, acquiredCount); +} + +private List filterAbortedTransactionalRecords( +Iterable batches, +List acquiredRecords, +Optional> abortedTransactions +) { +lock.writeLock().lock(); +try { +if (abortedTransactions.isEmpty()) +return acquiredRecords; +// The record batches that need to be archived in cachedState because they were a part of aborted transactions. +List recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions); +for (RecordBatch recordBatch : recordsToArchive) { +// Archive the offsets/batches in the cached state. +NavigableMap subMap = fetchSubMap(recordBatch); +archiveAcquiredBatchRecords(subMap, recordBatch); +} +return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive); +} finally { +lock.writeLock().unlock(); +} +} + +// Visible for testing. +List filterRecordBatchesFromAcquiredRecords( +List acquiredRecords, +List recordsToArchive +) { +lock.writeLock().lock(); +try { +List result = new ArrayList<>(); + +for (AcquiredRecords acquiredRecord : acquiredRecords) { +List tempAcquiredRecords = new ArrayList<>(); +tempAcquiredRecords.add(acquiredRecord); +for (RecordBatch recordBatch : recordsToArchive) { +List newAcquiredRecords = new ArrayList<>(); +for (AcquiredRecords temp : tempAcquiredRecords) { +// Check if record batch overlaps with the acquired records. +if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) { +// Split the acquired record into parts before, inside, and after the overlapping record batch. +if (temp.firstOffset() < recordBatch.baseOffset()) { +newAcquiredRecords.add(new AcquiredRecords() +.setFirstOffset(temp.firstOffset()) +.setLastOffset(recordBatch.baseOffset() - 1) +.setDeliveryCount((short) 1)); +} +if (temp.lastOffset() > recordBatch.lastOffset()) { +newAcquiredRecords.add(new AcquiredRecords() +.setFirstOffset(recordBatch.lastOffset() + 1) +.setLastOffset(temp.lastOffset()) +.setDeliveryCount((short) 1)); +} +} else { +newAcquiredRecords.add(temp); +} +} +tempAcquiredRecords = newAcquiredRecords; +} +result.addAll(tempAcquiredRecords); +} +return result; +} finally { +lock.writeLock().unlock(); +} +} + +private void archiveAcquiredBatchRecords(NavigableMap subMap, RecordBatch recordBatch) { +lock.writeLock().lock(); +try { +// The fetched batch either is exact fetch equivalent batch (mostly), subset +// or spans over multiple fetched batches. 
The state can vary per offset itself from +// the fetched batch in case of subset. +for (Map.Entry entry : subMap.entrySet()) { +InFlightBatch inFlightBatch = entry.getValue(); + +// If startOffs
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013997096 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } +private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { +if (isolationLevel != FetchIsolation.TXN_COMMITTED) +return shareAcquiredRecords; +// When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any +// transactions that were aborted/did not commit due to timeout. +List result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions); +int acquiredCount = 0; +for (AcquiredRecords records : result) { +acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1); +} +return new ShareAcquiredRecords(result, acquiredCount); +} + +private List filterAbortedTransactionalRecords( +Iterable batches, +List acquiredRecords, +Optional> abortedTransactions +) { +lock.writeLock().lock(); +try { +if (abortedTransactions.isEmpty()) +return acquiredRecords; +// The record batches that need to be archived in cachedState because they were a part of aborted transactions. +List recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions); +for (RecordBatch recordBatch : recordsToArchive) { +// Archive the offsets/batches in the cached state. +NavigableMap subMap = fetchSubMap(recordBatch); +archiveAcquiredBatchRecords(subMap, recordBatch); +} +return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive); +} finally { +lock.writeLock().unlock(); +} +} + +// Visible for testing. +List filterRecordBatchesFromAcquiredRecords( +List acquiredRecords, +List recordsToArchive +) { +lock.writeLock().lock(); +try { +List result = new ArrayList<>(); + +for (AcquiredRecords acquiredRecord : acquiredRecords) { +List tempAcquiredRecords = new ArrayList<>(); +tempAcquiredRecords.add(acquiredRecord); +for (RecordBatch recordBatch : recordsToArchive) { +List newAcquiredRecords = new ArrayList<>(); +for (AcquiredRecords temp : tempAcquiredRecords) { +// Check if record batch overlaps with the acquired records. +if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) { +// Split the acquired record into parts before, inside, and after the overlapping record batch. +if (temp.firstOffset() < recordBatch.baseOffset()) { +newAcquiredRecords.add(new AcquiredRecords() +.setFirstOffset(temp.firstOffset()) +.setLastOffset(recordBatch.baseOffset() - 1) +.setDeliveryCount((short) 1)); +} +if (temp.lastOffset() > recordBatch.lastOffset()) { +newAcquiredRecords.add(new AcquiredRecords() +.setFirstOffset(recordBatch.lastOffset() + 1) +.setLastOffset(temp.lastOffset()) +.setDeliveryCount((short) 1)); +} +} else { +newAcquiredRecords.add(temp); +} +} +tempAcquiredRecords = newAcquiredRecords; +} +result.addAll(tempAcquiredRecords); +} +return result; +} finally { +lock.writeLock().unlock(); +} +} + +private void archiveAcquiredBatchRecords(NavigableMap subMap, RecordBatch recordBatch) { +lock.writeLock().lock(); +try { +// The fetched batch either is exact fetch equivalent batch (mostly), subset +// or spans over multiple fetched batches. 
The state can vary per offset itself from +// the fetched batch in case of subset. +for (Map.Entry entry : subMap.entrySet()) { +InFlightBatch inFlightBatch = entry.getValue(); + +// If startOffs
Re: [PR] KAFKA-18891: KIP-877 add support for RemoteLogMetadataManager and RemoteStorageManager [kafka]
TaiJuWu commented on code in PR #19286: URL: https://github.com/apache/kafka/pull/19286#discussion_r2014001146 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -407,14 +413,17 @@ private void configureRLMM() { rlmmProps.put(LOG_DIR_CONFIG, logDir); rlmmProps.put("cluster.id", clusterId); -remoteLogMetadataManager.configure(rlmmProps); +remoteLogMetadataManagerPlugin.get().configure(rlmmProps); } public void startup() { // Initialize and configure RSM and RLMM. This will start RSM, RLMM resources which may need to start resources // in connecting to the brokers or remote storages. configureRSM(); configureRLMM(); +// the withPluginMetrics() method will be called when the plugin is instantiated (after configure() if the plugin also implements Configurable) Review Comment: https://github.com/apache/kafka/pull/19286/commits/108129df77529aa53cfa42cdaae5a40f47459ff9 is the commit where I tried moving `configure` to the constructor, but I finally gave it up. The most important consideration is that `RemoteLogMetadataManager` and `RemoteStorageManager` are public plugins and we should not change this flow without a KIP. If I misunderstood, please correct me. I will wait for other feedback and continue the work later. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-7699) Improve wall-clock time punctuations
[ https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938578#comment-17938578 ] Herman Kolstad Jakobsen commented on KAFKA-7699: The KIP can be found at [KIP-1146: Anchored wall-clock punctuation - Apache Kafka - Apache Software Foundation|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+wall-clock+punctuation] > Improve wall-clock time punctuations > > > Key: KAFKA-7699 > URL: https://issues.apache.org/jira/browse/KAFKA-7699 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: Herman Kolstad Jakobsen >Priority: Major > Labels: needs-kip > > Currently, wall-clock time punctuation allow to schedule periodic call backs > based on wall-clock time progress. The punctuation time starts, when the > punctuation is scheduled, thus, it's non-deterministic what is desired for > many use cases (I want a call-back in 5 minutes from "now"). > It would be a nice improvement, to allow users to "anchor" wall-clock > punctation, too, similar to a cron job: Thus, a punctuation would be > triggered at "fixed" times like the beginning of the next hour, independent > when the punctuation was registered. -- This message was sent by Atlassian Jira (v8.20.10#820010)
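Since the KIP is about anchoring punctuation to wall-clock boundaries, here is how that can be approximated with the current public Streams API: schedule a fine-grained wall-clock punctuator and check the boundary yourself. A sketch (the helper shape is illustrative):
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.ProcessorContext;

public final class AnchoredPunctuation {
    private AnchoredPunctuation() { }

    // Fire `task` at the top of every wall-clock hour, independent of when
    // the punctuation was registered.
    public static Cancellable scheduleHourly(final ProcessorContext<?, ?> context,
                                             final Runnable task) {
        final long hourMs = Duration.ofHours(1).toMillis();
        final long[] nextBoundary = {(System.currentTimeMillis() / hourMs + 1) * hourMs};
        return context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            if (timestamp >= nextBoundary[0]) {
                nextBoundary[0] += hourMs;
                task.run();
            }
        });
    }
}
{code}
An anchored option in the API would remove the need for this polling workaround.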
Re: [PR] KAFKA-18827: Initialize share group state group coordinator impl. [3/N] [kafka]
AndrewJSchofield commented on PR #19026: URL: https://github.com/apache/kafka/pull/19026#issuecomment-2754166976 @smjn Please can you merge the latest changes from trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
apoorvmittal10 commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013847250 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } +private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { +if (isolationLevel != FetchIsolation.TXN_COMMITTED) +return shareAcquiredRecords; +// When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any +// transactions that were aborted/did not commit due to timeout. +List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions); +int acquiredCount = 0; +for (AcquiredRecords records : result) { +acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1); +} +return new ShareAcquiredRecords(result, acquiredCount); +} + +private List<AcquiredRecords> filterAbortedTransactionalRecords( +Iterable<? extends RecordBatch> batches, +List<AcquiredRecords> acquiredRecords, +Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions +) { +lock.writeLock().lock(); +try { +if (abortedTransactions.isEmpty()) +return acquiredRecords; +// The record batches that need to be archived in cachedState because they were a part of aborted transactions. +List<RecordBatch> recordsToArchive = fetchAbortedTransactionRecordBatches(batches, abortedTransactions); +for (RecordBatch recordBatch : recordsToArchive) { +// Archive the offsets/batches in the cached state. +NavigableMap<Long, InFlightBatch> subMap = fetchSubMap(recordBatch); +archiveAcquiredBatchRecords(subMap, recordBatch); +} +return filterRecordBatchesFromAcquiredRecords(acquiredRecords, recordsToArchive); +} finally { +lock.writeLock().unlock(); +} +} + +// Visible for testing. +List<AcquiredRecords> filterRecordBatchesFromAcquiredRecords( +List<AcquiredRecords> acquiredRecords, +List<RecordBatch> recordsToArchive +) { +lock.writeLock().lock(); +try { +List<AcquiredRecords> result = new ArrayList<>(); + +for (AcquiredRecords acquiredRecord : acquiredRecords) { +List<AcquiredRecords> tempAcquiredRecords = new ArrayList<>(); +tempAcquiredRecords.add(acquiredRecord); +for (RecordBatch recordBatch : recordsToArchive) { +List<AcquiredRecords> newAcquiredRecords = new ArrayList<>(); +for (AcquiredRecords temp : tempAcquiredRecords) { +// Check if record batch overlaps with the acquired records. +if (temp.firstOffset() <= recordBatch.lastOffset() && temp.lastOffset() >= recordBatch.baseOffset()) { +// Split the acquired record into parts before, inside, and after the overlapping record batch. +if (temp.firstOffset() < recordBatch.baseOffset()) { +newAcquiredRecords.add(new AcquiredRecords() +.setFirstOffset(temp.firstOffset()) +.setLastOffset(recordBatch.baseOffset() - 1) +.setDeliveryCount((short) 1)); +} +if (temp.lastOffset() > recordBatch.lastOffset()) { +newAcquiredRecords.add(new AcquiredRecords() +.setFirstOffset(recordBatch.lastOffset() + 1) +.setLastOffset(temp.lastOffset()) +.setDeliveryCount((short) 1)); +} +} else { +newAcquiredRecords.add(temp); +} +} +tempAcquiredRecords = newAcquiredRecords; +} +result.addAll(tempAcquiredRecords); +} +return result; +} finally { +lock.writeLock().unlock(); +} +} + +private void archiveAcquiredBatchRecords(NavigableMap<Long, InFlightBatch> subMap, RecordBatch recordBatch) { +lock.writeLock().lock(); +try { +// The fetched batch either is exact fetch equivalent batch (mostly), subset +// or spans over multiple fetched batches. The state can vary per offset itself from +// the fetched batch in case of subset. +for (Map.Entry<Long, InFlightBatch> entry : subMap.entrySet()) { +InFlightBatch inFlightBatch = entry.getValue(); + +// If startOffse
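For readers following the thread: the splitting logic above is interval subtraction, removing aborted batch offset ranges from acquired offset ranges. A standalone sketch of the same idea, using plain offset pairs instead of the Kafka AcquiredRecords/RecordBatch types (names here are illustrative, not from the PR):
{code:java}
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the interval-subtraction idea in
// filterRecordBatchesFromAcquiredRecords above: subtract aborted offset ranges
// from acquired offset ranges, splitting acquired ranges where they overlap.
public class RangeSubtractionSketch {
    record Range(long first, long last) { }

    static List<Range> subtract(List<Range> acquired, List<Range> aborted) {
        List<Range> result = new ArrayList<>();
        for (Range a : acquired) {
            List<Range> pieces = new ArrayList<>(List.of(a));
            for (Range b : aborted) {
                List<Range> next = new ArrayList<>();
                for (Range p : pieces) {
                    if (p.first() <= b.last() && p.last() >= b.first()) {
                        // Keep any part of the acquired range before the aborted range.
                        if (p.first() < b.first()) next.add(new Range(p.first(), b.first() - 1));
                        // Keep any part of the acquired range after the aborted range.
                        if (p.last() > b.last()) next.add(new Range(b.last() + 1, p.last()));
                    } else {
                        next.add(p); // No overlap: keep the piece unchanged.
                    }
                }
                pieces = next;
            }
            result.addAll(pieces);
        }
        return result;
    }

    public static void main(String[] args) {
        // Acquired offsets 0-9 with an aborted transaction covering offsets 3-5.
        System.out.println(subtract(List.of(new Range(0, 9)), List.of(new Range(3, 5))));
        // Prints: [Range[first=0, last=2], Range[first=6, last=9]]
    }
}
{code}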
Re: [PR] (WIP) KAFKA-18409: ShareGroupStateMessageFormatter should use ApiMessageFormatter [kafka]
dajac commented on PR #18510: URL: https://github.com/apache/kafka/pull/18510#issuecomment-2753880538 @brandboat We have merged https://github.com/apache/kafka/pull/18695. We can proceed with your PR. Please ping me when the PR is ready for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-18792) Enforce uniform PR structure for merge queue
[ https://issues.apache.org/jira/browse/KAFKA-18792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17936903#comment-17936903 ] David Arthur edited comment on KAFKA-18792 at 3/19/25 6:35 PM: --- Yes, I think that makes sense [~mimaison]. "Co-authored-by:" is a git trailer which is basically like an email footer. Our "Reviewers:" is also a trailer. We do some trailer validation already to check for "Reviewers", so it would be easy to add/verify "Co-authored-by". We'll need to experiment to see what happens when a PR has multiple authors. Today, GitHub is adding "Co-authored-by" to the pre-filled commit message when we click "Squash and Merge". I'm not sure if the same happens when adding to the merge queue. — BTW, related to this, I would like to move us towards using these structured trailers a bit more. * Reviewed-by: anyone who left feedback on the PR * Approved-by: committers who approved the PR * Helped-by: shout-outs for inspiration or significant help * Signed-off-by: commit signatory * Fixes: KAFKA-12345 * References: KIP-123, KAFKA-23456 * etc This would help us get away from putting metadata into our commit subjects, which will make the log a bit nicer looking. "git log" has options for formatting trailers, so we can still easily generate a changelog like {code:java} KAFKA-12345 Something happened KAFKA-23456 Another thing {code} was (Author: davidarthur): Yes, I think that makes sense [~mimaison]. "Co-authored-by:" is a git trailer which is basically like an email footer. Our "Reviewers:" is also a trailer. We do some trailer validation already to check for "Reviewers", so it would be easy to add/verify "Co-authored-by". We'll need to experiment to see what happens when a PR has multiple authors. Today, GitHub is adding "Co-authored-by" to the pre-filled commit message when we click "Squash and Merge". I'm not sure if the same happens when adding to the merge queue. --- BTW, related to this, I would like to move us towards using these structured trailers a bit more. * Reviewed-by: anyone who left feedback on the PR * Approved-by: committers who approved the PR * Helped-by: shout-outs for inspiration or significant help * Signed-off-by: commit signatory * Fixes: KAFKA-12345 * References: KIP-123, KAFKA-23456 * etc > Enforce uniform PR structure for merge queue > > > Key: KAFKA-18792 > URL: https://issues.apache.org/jira/browse/KAFKA-18792 > Project: Kafka > Issue Type: Sub-task > Components: build >Reporter: David Arthur >Priority: Major > > Since the merge queue will use the repo's default commit message, we cannot > customize the commit when adding to the queue. Instead, we will use the PR > title and body as the commit subject and message (KAFKA-18791). > > In order to ensure well-formed commits, we will need a GitHub Action to > validate the PR title and PR body. > > Specifically, we should have the following checks: > * PR title includes KAFKA-X, MINOR, or HOTFIX > * PR title is less than 120 characters > * PR title is not automatically truncated by GitHub (ending in ...) > * PR body includes "Reviewers:" line -- This message was sent by Atlassian Jira (v8.20.10#820010)
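For reference, "git log" can interpolate trailers directly; a minimal sketch of the kind of formatting meant above (the commit range is illustrative, and "Fixes" is one of the proposed trailer keys, not yet a project convention):
{code}
# Print each commit subject followed by the values of its Fixes trailer
git log --pretty=format:'%s%n%(trailers:key=Fixes,valueonly)' 4.0.0..HEAD
{code}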
Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]
TaiJuWu commented on code in PR #19277: URL: https://github.com/apache/kafka/pull/19277#discussion_r2013533692 ## storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BrokerCompressionTest { +private final File tmpDir = TestUtils.tempDirectory(); +private final File logDir = TestUtils.randomPartitionLogDir(tmpDir); +private final MockTime time = new MockTime(0, 0); + +@AfterEach +public void tearDown() throws IOException { +Utils.delete(tmpDir); +} + +/** + * Test broker-side compression configuration + */ +@ParameterizedTest +@MethodSource("allCompressionParameters") +public void testBrokerSideCompression(CompressionType messageCompressionType, BrokerCompressionType brokerCompressionType) throws IOException { +Compression messageCompression = Compression.of(messageCompressionType).build(); + +/* Configure broker-side compression */ +UnifiedLog log = UnifiedLog.create( +logDir, +new LogConfig(Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, brokerCompressionType.name)), +0L, +0L, +time.scheduler, +new BrokerTopicStats(), +time, +5 * 60 * 1000, +new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, +new LogDirFailureChannel(10), +true, +Optional.empty() +); + +/* Append two messages */ +log.appendAsLeader( +MemoryRecords.withRecords(messageCompression, 0, +new SimpleRecord("hello".getBytes()), +new SimpleRecord("there".getBytes()) +), 0 +); + +if 
(brokerCompressionType != BrokerCompressionType.PRODUCER) { +RecordBatch batch = readBatch(log, 0); +Compression targetCompression = BrokerCompressionType.targetCompression(log.config().compression, null); +assertEquals(targetCompression.type(), batch.compressionType(), "Compression at offset 0 should produce " + brokerCompressionType); +} else { +assertEquals(messageCompressionType, readBatch(log, 0).compressionType(), "Compression at offset 0 should produce " + messageCompressionType); +} Review Comment: All comments are addressed, thanks for your review! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18893: KIP-877 Add support for ReplicaSelector [kafka]
TaiJuWu commented on code in PR #19064: URL: https://github.com/apache/kafka/pull/19064#discussion_r2004221275 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -1723,7 +1723,7 @@ class ReplicaManager(val config: KafkaConfig, metadata => findPreferredReadReplica(partition, metadata, params.replicaId, fetchInfo.fetchOffset, fetchTimeMs)) if (preferredReadReplica.isDefined) { - replicaSelectorOpt.foreach { selector => + replicaSelectorPlugin.foreach { selector => debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " + Review Comment: Not sure whether my understanding is correct. I changed the debug message to `debug(s"Replica selector plugin ${selector.getClass.getSimpleName} returned preferred replica " + s"${preferredReadReplica.get} for ${params.clientMetadata}")` If I am misunderstanding, please correct me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]
dajac commented on code in PR #18695: URL: https://github.com/apache/kafka/pull/18695#discussion_r2013562510 ## tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; + +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Optional; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public abstract class CoordinatorRecordMessageFormatterTest { +private static final String TOPIC = "TOPIC"; + +protected abstract CoordinatorRecordMessageFormatter formatter(); +protected abstract Stream<Arguments> parameters(); + +@ParameterizedTest +@MethodSource("parameters") +public void testMessageFormatter(byte[] keyBuffer, byte[] valueBuffer, String expectedOutput) { Review Comment: Sure. I added test cases to the concrete classes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]
dajac commented on PR #18695: URL: https://github.com/apache/kafka/pull/18695#issuecomment-2753506292 @chia7712 Thanks for your comments. I just addressed them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]
wernerdv commented on code in PR #19216: URL: https://github.com/apache/kafka/pull/19216#discussion_r2007165486 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleanerManager.java: ## @@ -0,0 +1,800 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * This class manages the state of each partition being cleaned. + * LogCleaningState defines the cleaning states that a TopicPartition can be in. + * 1. None: No cleaning state in a TopicPartition. In this state, it can become LogCleaningInProgress + * or LogCleaningPaused(1). Valid previous states are LogCleaningInProgress and LogCleaningPaused(1) + * 2. LogCleaningInProgress : The cleaning is currently in progress. In this state, it can become None when log cleaning is finished + * or become LogCleaningAborted. Valid previous state is None. + * 3. LogCleaningAborted : The cleaning abort is requested. In this state, it can become LogCleaningPaused(1). + * Valid previous state is LogCleaningInProgress. + * 4-a. LogCleaningPaused(1) : The cleaning is paused once. No log cleaning can be done in this state. + * In this state, it can become None or LogCleaningPaused(2). + * Valid previous states are None, LogCleaningAborted or LogCleaningPaused(2). + * 4-b. LogCleaningPaused(i) : The cleaning is paused i times where i >= 2. No log cleaning can be done in this state. + * In this state, it can become LogCleaningPaused(i-1) or LogCleaningPaused(i+1). + * Valid previous states are LogCleaningPaused(i-1) or LogCleaningPaused(i+1). + */ +public class LogCleanerManager { +public static final String OFFSET_CHECKPOINT_FILE = "cleaner-offset-checkpoint"; + +private static final Logger LOG = LoggerFactory.getLogger("kafka.log.LogCleaner"); + +private static final String UNCLEANABLE_PARTITIONS_COUNT_METRIC_NAME = "uncleanable-partitions-count"; +private static final String UNCLEANABLE_BYTES_METRIC_NAME = "uncleanable-bytes"; +private static final String MAX_DIRTY_PERCENT_METRIC_NAME = "max-dirty-percent"; +private static final String TIME_SINCE_LAST_RUN_MS_METRIC_NAME = "time-since-last-run-ms"; + +// Visible for testing +public static final Set<String> GAUGE_METRIC_NAME_NO_TAG = Set.of(MAX_DIRTY_PERCENT_METRIC_NAME, TIME_SINCE_LAST_RUN_MS_METRIC_NAME); + +// For compatibility, metrics are defined to be under `kafka.log.LogCleanerManager` class +private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "LogCleanerManager"); + +/* the set of logs currently being cleaned */ +private final Map<TopicPartition, LogCleaningState> inProgress = new HashMap<>(); + +/* the set of uncleanable partitions (partitions that have raised an unexpected error during cleaning) + * for each log directory */ +private final Map<String, Set<TopicPartition>> uncleanablePartitions = new HashMap<>(); + +/* a global lock used to control all access to the in-progress set and the offset checkpoints */ +private final Lock lock = new ReentrantLock();
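A hedged sketch of the cleaning-state transition rules spelled out in the javadoc above; plain strings and a pause counter stand in for the real LogCleaningState types, so this is illustrative rather than the migrated code:
{code:java}
// Encodes the transitions: None -> InProgress | Paused(1); InProgress -> None | Aborted;
// Aborted -> Paused(1); Paused(1) -> None | Paused(2); Paused(i>=2) -> Paused(i-1) | Paused(i+1).
record Cleaning(String state, int pausedCount) {
    static final Cleaning NONE = new Cleaning("None", 0);

    static boolean isValidTransition(Cleaning from, Cleaning to) {
        return switch (from.state()) {
            case "None" -> to.equals(new Cleaning("InProgress", 0)) || to.equals(new Cleaning("Paused", 1));
            case "InProgress" -> to.equals(NONE) || to.equals(new Cleaning("Aborted", 0));
            case "Aborted" -> to.equals(new Cleaning("Paused", 1));
            case "Paused" -> (from.pausedCount() == 1 && to.equals(NONE))
                || (to.state().equals("Paused") && to.pausedCount() >= 1
                    && Math.abs(to.pausedCount() - from.pausedCount()) == 1);
            default -> false;
        };
    }

    public static void main(String[] args) {
        System.out.println(isValidTransition(NONE, new Cleaning("Paused", 1)));  // true
        System.out.println(isValidTransition(new Cleaning("Aborted", 0), NONE)); // false
    }
}
{code}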
[jira] [Resolved] (KAFKA-18893) Add support for ReplicaSelector
[ https://issues.apache.org/jira/browse/KAFKA-18893?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-18893. Fix Version/s: 4.1.0 Resolution: Fixed > Add support for ReplicaSelector > --- > > Key: KAFKA-18893 > URL: https://issues.apache.org/jira/browse/KAFKA-18893 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: TaiJuWu >Priority: Major > Fix For: 4.1.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013529068 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -847,6 +859,7 @@ public ShareAcquiredRecords acquire( } Review Comment: we are doing the filtering after this point, so this case would be automatically covered. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-19046) Change delete cleanup policy to compact cleanup policy
[ https://issues.apache.org/jira/browse/KAFKA-19046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] George Yang updated KAFKA-19046: Description: The internal topics of MirrorMaker 2 (MM2) sometimes report the following error: {code:java} Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:389) org.apache.kafka.common.config.ConfigException: Topic 'mm2-offsets.cb.internal' supplied via the 'offset.storage.topic' property is required to have 'cleanup.policy=compact' to guarantee consistency and durability of source connector offsets, but found the topic currently has 'cleanup.policy=delete'. Continuing would likely result in eventually losing source connector offsets and problems restarting this Connect cluster in the future. Change the 'offset.storage.topic' property in the Connect worker configurations to use a topic with 'cleanup.policy=compact'. at org.apache.kafka.connect.util.TopicAdmin.verifyTopicCleanupPolicyOnlyCompact(TopicAdmin.java:581) at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:47) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231) at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242) at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233) at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at java.base/java.lang.Thread.run(Unknown Source) [2024-10-13 09:05:12,624] INFO Kafka MirrorMaker stopping (org.apache.kafka.connect.mirror.MirrorMaker:208){code} This results in the MM2 pods in the cluster entering a CrashLoopBackOff state repeatedly. When changing the configuration via kafka-configs.sh, the process runs fine. However, as we know, the default Kafka broker configuration for log.cleanup.policy is set to delete, while the default cleanup policy for MM2 is set to compact. It appears that the policy for offset.storage.topic must be compact, and similarly for status.storage and config.storage. I want to configure the cleanup policy for these three topics to always be compact. 
I attempted to configure them in connect-mirror-maker.properties as shown below, but all attempts failed: {code:java} offset.storage.topic.properties.cleanup.policy=compact status.storage.topic.properties.cleanup.policy=compact config.storage.topic.properties.cleanup.policy=compact{code} or {code:java} offset.storage.topic.cleanup.policy=compact status.storage.topic.cleanup.policy=compact config.storage.topic.cleanup.policy=compact{code} The logs show that the properties are unknown and report a failure in topic creation: {code:java} Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:389)[MirrorHerder-cb->ca-1] org.apache.kafka.connect.errors.ConnectException: Unable to create topic(s) 'mm2-offsets.cb.internal': Unknown topic config name: topic.properties.cleanup.policy at org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:474) at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:345) at org.apache.kafka.connect.util.TopicAdmin.createTopicsWithRetry(TopicAdmin.java:363) at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.createTopics(KafkaTopicBasedBackingStore.java:57) at org.apache.kafka.connect.storage.KafkaTopicBasedBackingStore.lambda$topicInitializer$0(KafkaTopicBasedBackingStore.java:43) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:247) at org.apache.kafka.connect.util.KafkaBasedLog.start(KafkaBasedLog.java:231) at org.apache.kafka.connect.storage.KafkaOffsetBackingStore.start(KafkaOffsetBackingStore.java:242) at org.apache.kafka.connect.runtime.Worker.start(Worker.java:233) at org.apache.kafka.connect.runtime.AbstractHerder.startServices(AbstractHerder.java:163) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:373) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base/java.util.co
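Since the description notes that changing the configuration via kafka-configs.sh works, the same fix can be applied programmatically to an existing internal topic; a hedged sketch using the Java AdminClient (topic name and bootstrap address are placeholders, and this is a workaround rather than the requested connect-mirror-maker.properties support):
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.List;
import java.util.Map;
import java.util.Properties;

public class CompactInternalTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Switch the existing offsets topic to the compact cleanup policy.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "mm2-offsets.cb.internal");
            AlterConfigOp setCompact = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setCompact))).all().get();
        }
    }
}
{code}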
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013589373 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -2484,6 +2497,259 @@ private long startOffsetDuringInitialization(long partitionDataStartOffset) thro } } +private ShareAcquiredRecords filterAbortedTransactionalAcquiredRecords(FetchPartitionData fetchPartitionData, FetchIsolation isolationLevel, ShareAcquiredRecords shareAcquiredRecords) { +if (isolationLevel != FetchIsolation.TXN_COMMITTED) +return shareAcquiredRecords; +// When FetchIsolation.TXN_COMMITTED is used as isolation type by the share group, we need to filter any +// transactions that were aborted/did not commit due to timeout. +List<AcquiredRecords> result = filterAbortedTransactionalRecords(fetchPartitionData.records.batches(), shareAcquiredRecords.acquiredRecords(), fetchPartitionData.abortedTransactions); +int acquiredCount = 0; +for (AcquiredRecords records : result) { +acquiredCount += (int) (records.lastOffset() - records.firstOffset() + 1); +} +return new ShareAcquiredRecords(result, acquiredCount); +} + +private List<AcquiredRecords> filterAbortedTransactionalRecords( +Iterable<? extends RecordBatch> batches, +List<AcquiredRecords> acquiredRecords, +Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions Review Comment: So far in testing, I have observed that `abortedTransactions` contains the first offsets of all the fetched record batches that were aborted, in **ascending order** of the first offset. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2013577087 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java: ## @@ -83,6 +92,8 @@ public final class GroupConfig extends AbstractConfig { public final int streamsNumStandbyReplicas; +public final int shareIsolationLevel; Review Comment: my bad. I misread it. Changed the code to reflect the right values. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-19022) Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID error
[ https://issues.apache.org/jira/browse/KAFKA-19022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938478#comment-17938478 ] Lorcan edited comment on KAFKA-19022 at 3/26/25 8:20 AM: - Hi [~rsamudrala], I've checked the KafkaRaftClient class for the Inconsistent_Cluster_Id error and it looks like there are several instances where a customised error message with the cluster ids is not being set: VoteResponseData BeginQuorumEpochResponseData EndQuorumEpochResponseData FetchResponseData FetchSnapshotResponseData These all implement the ApiMessage interface, just like AddRaftVoterResponseData does. It looks like this would require a new schema version for the above classes, given the logic for adding new fields, and I'm not sure whether this would require a KIP. was (Author: JIRAUSER308669): Hi [~rsamudrala], I've checked the KafkaRaftClient class for the Inconsistent_Cluster_Id error and it looks like there are several instances where a customised error message with the cluster ids is not being set: VoteResponseData BeginQuorumEpochResponseData EndQuorumEpochResponseData FetchResponseData FetchSnapshotResponseData These all implement the ApiMessage interface, just like AddRaftVoterResponseData does. I can create a PR to add an errorMessage field to FetchResponseData to get feedback and see if this is a viable approach. > Display cluster IDs being compared when encountering INCONSISTENT_CLUSTER_ID > error > -- > > Key: KAFKA-19022 > URL: https://issues.apache.org/jira/browse/KAFKA-19022 > Project: Kafka > Issue Type: Improvement > Components: kraft, logging >Affects Versions: 3.9.0 >Reporter: Ranganath Samudrala >Assignee: Lorcan >Priority: Major > > While migrating Kafka from zookeeper to kraft, we see errors in logs like > {{INCONSISTENT_CLUSTER_ID in FETCH response }} > or > {{INCONSISTENT_CLUSTER_ID in VOTER response }} > But the cluster IDs being compared are not displayed in logs, so there is not > enough information to see where the issue is. Is the class data *clusterId* > empty (which could potentially be a bug?) or is the incoming *clusterId* empty or > incorrect? > [KafkaRaftClient|https://github.com/apache/kafka/blob/31e1a57c41cf9cb600751669dc71bcd9596b45f9/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1459] > {code:java} > private boolean hasValidClusterId(String requestClusterId) { > // We don't enforce the cluster id if it is not provided. > if (requestClusterId == null) { > return true; > } > return clusterId.equals(requestClusterId); > } > . > . > private CompletableFuture<FetchResponseData> handleFetchRequest( > RaftRequest.Inbound requestMetadata, > long currentTimeMs > ) { > FetchRequestData request = (FetchRequestData) requestMetadata.data(); > if (!hasValidClusterId(request.clusterId())) { > return completedFuture(new > FetchResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())); > } > . > . > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
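A small illustrative sketch of the kind of message such a change would enable (helper name and format are hypothetical, not the actual patch):
{code:java}
// Hypothetical helper: include both cluster ids in the response's errorMessage
// so the mismatch is visible in logs. Not the actual KafkaRaftClient change.
private static String inconsistentClusterIdMessage(String localClusterId, String requestClusterId) {
    return String.format(
        "The request clusterId %s does not match the local clusterId %s",
        requestClusterId, localClusterId);
}
{code}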
Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]
TaiJuWu commented on code in PR #19277: URL: https://github.com/apache/kafka/pull/19277#discussion_r2013548508 ## storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java: ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BrokerCompressionTest { +private final File tmpDir = TestUtils.tempDirectory(); +private final File logDir = TestUtils.randomPartitionLogDir(tmpDir); +private final MockTime time = new MockTime(0, 0); + +@AfterEach +public void tearDown() throws IOException { +Utils.delete(tmpDir); +} + +/** + * Test broker-side compression configuration + */ +@ParameterizedTest +@MethodSource("allCompressionParameters") +public void testBrokerSideCompression(CompressionType messageCompressionType, BrokerCompressionType brokerCompressionType) throws IOException { +Compression messageCompression = Compression.of(messageCompressionType).build(); + +/* Configure broker-side compression */ +UnifiedLog log = UnifiedLog.create( +logDir, +new LogConfig(Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, brokerCompressionType.name)), +0L, +0L, +time.scheduler, +new BrokerTopicStats(), +time, +5 * 60 * 1000, +new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, +new LogDirFailureChannel(10), +true, +Optional.empty() +); + +/* Append two messages */ +log.appendAsLeader( +MemoryRecords.withRecords(messageCompression, 0, +new SimpleRecord("hello".getBytes()), +new SimpleRecord("there".getBytes()) +), 0 +); + +if 
(brokerCompressionType != BrokerCompressionType.PRODUCER) { +RecordBatch batch = readBatch(log, 0); +Compression targetCompression = BrokerCompressionType.targetCompression(log.config().compression, null); +assertEquals(targetCompression.type(), batch.compressionType(), "Compression at offset 0 should produce " + brokerCompressionType); +} else { +assertEquals(messageCompressionType, readBatch(log, 0).compressionType(), "Compression at offset 0 should produce " + messageCompressionType); +} +} + +private static RecordBatch readBatch(UnifiedLog log, int offset) throws IOException { Review Comment: Removed the parameter and renamed method to `readFirstBatch`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]
dajac commented on code in PR #18695: URL: https://github.com/apache/kafka/pull/18695#discussion_r2013553668 ## tools/src/main/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatter.java: ## @@ -31,44 +34,50 @@ import static java.nio.charset.StandardCharsets.UTF_8; -public abstract class ApiMessageFormatter implements MessageFormatter { - +public abstract class CoordinatorRecordMessageFormatter implements MessageFormatter { private static final String TYPE = "type"; private static final String VERSION = "version"; private static final String DATA = "data"; private static final String KEY = "key"; private static final String VALUE = "value"; -static final String UNKNOWN = "unknown"; + +private final CoordinatorRecordSerde serde; + +public CoordinatorRecordMessageFormatter(CoordinatorRecordSerde serde) { +this.serde = serde; +} @Override public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) { +if (Objects.isNull(consumerRecord.key())) return; + ObjectNode json = new ObjectNode(JsonNodeFactory.instance); +try { +CoordinatorRecord record = serde.deserialize( +ByteBuffer.wrap(consumerRecord.key()), +consumerRecord.value() != null ? ByteBuffer.wrap(consumerRecord.value()) : null +); + +if (!isRecordTypeAllowed(record.key().apiKey())) return; -byte[] key = consumerRecord.key(); -if (Objects.nonNull(key)) { -short keyVersion = ByteBuffer.wrap(key).getShort(); -JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key)); +json +.putObject(KEY) +.put(TYPE, record.key().apiKey()) Review Comment: That's right. We will do it in https://github.com/apache/kafka/pull/18510. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18616; Refactor Tools's ApiMessageFormatter [kafka]
dajac commented on code in PR #18695: URL: https://github.com/apache/kafka/pull/18695#discussion_r2013554650 ## tools/src/test/java/org/apache/kafka/tools/consumer/CoordinatorRecordMessageFormatterTest.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools.consumer; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.MessageFormatter; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; + +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Optional; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.junit.jupiter.api.Assertions.assertEquals; + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) Review Comment: It is used to allow using a non-static method in `@MethodSource`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
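For context on the point above, a minimal illustration of why PER_CLASS matters here: JUnit 5 only permits a non-static @MethodSource factory when the test instance lifecycle is PER_CLASS, which is what lets the abstract test class defer `parameters()` to its subclasses (class and method names below are illustrative):
{code:java}
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertTrue;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class PerClassLifecycleExample {
    // Non-static factory: allowed only with the PER_CLASS lifecycle, so it can
    // be made abstract and overridden in concrete subclasses.
    Stream<Arguments> inputs() {
        return Stream.of(Arguments.of(1), Arguments.of(2));
    }

    @ParameterizedTest
    @MethodSource("inputs")
    void isPositive(int n) {
        assertTrue(n > 0);
    }
}
{code}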
Re: [PR] KAFKA-18826: Add global thread metrics [kafka]
mjsax commented on code in PR #18953: URL: https://github.com/apache/kafka/pull/18953#discussion_r2015214289 ## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ## @@ -419,8 +485,36 @@ private Topology complexTopology() { return builder.build(); } -private Topology simpleTopology() { +private void addGlobalStore(final StreamsBuilder builder) { +builder.addGlobalStore(Stores.keyValueStoreBuilder( +Stores.inMemoryKeyValueStore("iq-test-store"), +Serdes.String(), +Serdes.String() +), +globalStoreTopic, +Consumed.with(Serdes.String(), Serdes.String()), +() -> new Processor<>() { +private KeyValueStore<String, String> store; + +@Override +public void init(final ProcessorContext<Void, Void> context) { +store = context.getStateStore("iq-test-store"); +} + +@Override +public void process(final Record<String, String> record) { +store.put(record.key(), record.value()); +globalStoreIterator = store.all(); Review Comment: Seems my IntelliJ warning settings are different. Should be ok w/o it, too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]
chia7712 commented on PR #19216: URL: https://github.com/apache/kafka/pull/19216#issuecomment-2756906467 @wernerdv thanks for taking on this major migration. Really well done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19042: [1/N] Move ConsumerTopicCreationTest to client-integration-tests module [kafka]
m1a2st commented on code in PR #19283: URL: https://github.com/apache/kafka/pull/19283#discussion_r2015550703 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerTopicCreationTest.java: ## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer; + +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfig; +import org.apache.kafka.common.test.api.ClusterTemplate; + +import java.time.Duration; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.CLIENT_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.common.test.api.Type.KRAFT; +import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConsumerTopicCreationTest { +private static final String TOPIC = "topic"; +private static final long POLL_TIMEOUT = 1000; + +@ClusterTemplate("autoCreateTopicsConfigs") +void testAsyncConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance cluster) { +try (Consumer<byte[], byte[]> consumer = createConsumer(GroupProtocol.CONSUMER, true, cluster.bootstrapServers())) { +consumer.subscribe(List.of(TOPIC)); +consumer.poll(Duration.ofMillis(POLL_TIMEOUT)); +if (allowAutoCreateTopics(cluster)) +assertTrue(getAllTopics(cluster).contains(TOPIC)); +else +assertFalse(getAllTopics(cluster).contains(TOPIC), +"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + ALLOW_AUTO_CREATE_TOPICS_CONFIG + " need to be true to create topic automatically"); +} +} + +@ClusterTemplate("autoCreateTopicsConfigs") +void testAsyncConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance cluster) { +try (Consumer<byte[], byte[]> consumer = createConsumer(GroupProtocol.CONSUMER, false, cluster.bootstrapServers())) { +consumer.subscribe(List.of(TOPIC)); +consumer.poll(Duration.ofMillis(POLL_TIMEOUT)); +assertFalse(getAllTopics(cluster).contains(TOPIC), +"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + ALLOW_AUTO_CREATE_TOPICS_CONFIG + " need to be true to create topic automatically"); +} +} + +@ClusterTemplate("autoCreateTopicsConfigs") +void testClassicConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance cluster) { +try (Consumer<byte[], byte[]> consumer = createConsumer(GroupProtocol.CLASSIC, true, cluster.bootstrapServers())) { +consumer.subscribe(List.of(TOPIC)); +consumer.poll(Duration.ofMillis(POLL_TIMEOUT)); +if (allowAutoCreateTopics(cluster)) +assertTrue(getAllTopics(cluster).contains(TOPIC)); +else +assertFalse(getAllTopics(cluster).contains(TOPIC), +"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + ALLOW_AUTO_CREATE_TOPICS_CONFIG + " need to be true to create topic automatically"); +} +} + +@ClusterTemplate("autoCreateTopicsConfigs") +void testClassicConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance cluster) { +try (Consumer<byte[], byte[]> consumer = createConsumer(GroupProtocol.CLASSIC, false, cluster.bootstrapServers())) { +consumer.subscribe(List.of(TOPIC)); +consumer.poll(Duration.ofMillis(POLL_TIMEOUT)); +assertFalse(getAllTopics(cluster).contains(TOPIC), +"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + ALL
Re: [PR] KAFKA-18379: Enforce resigned cannot transition to any other state in same epoch [kafka]
github-actions[bot] commented on PR #19236: URL: https://github.com/apache/kafka/pull/19236#issuecomment-2756468366 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18607: Update jfreechart dependency [kafka]
mingdaoy commented on PR #19074: URL: https://github.com/apache/kafka/pull/19074#issuecomment-2756489837 @mimaison I'll test that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18607: Update jfreechart dependency [kafka]
github-actions[bot] commented on PR #19074: URL: https://github.com/apache/kafka/pull/19074#issuecomment-2756469253 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-19048) Minimal Movement Replica Balancing algorithm
Jialun Peng created KAFKA-19048: --- Summary: Minimal Movement Replica Balancing algorithm Key: KAFKA-19048 URL: https://issues.apache.org/jira/browse/KAFKA-19048 Project: Kafka Issue Type: Improvement Components: generator Reporter: Jialun Peng Assignee: Jialun Peng h2. Motivation Kafka clusters typically require rebalancing of topic replicas after horizontal scaling to evenly distribute the load across new and existing brokers. The current rebalancing approach does not consider the existing replica distribution, often resulting in excessive and unnecessary replica movements. These unnecessary movements increase rebalance duration, consume significant bandwidth and CPU resources, and potentially disrupt ongoing production and consumption operations. Thus, a replica rebalancing strategy that minimizes movements while achieving an even distribution of replicas is necessary. h2. Goals The proposed approach prioritizes the following objectives: # *Minimal Movement*: Minimize the number of replica relocations during rebalancing. # *Replica Balancing*: Ensure that replicas are evenly distributed across brokers. # *Anti-Affinity Support*: Support rack-aware allocation when enabled. # *Leader Balancing*: Distribute leader replicas evenly across brokers. # *ISR Order Optimization*: Optimize adjacency relationships to prevent failover traffic concentration in case of broker failures. h2. Proposed Changes h3. Rack-Level Replica Distribution The following rules ensure balanced replica allocation at the rack level: # *When rackCount = replicationFactor*: each rack receives exactly {{partitionCount}} replicas. # *When rackCount > replicationFactor*: if weighted allocation {{(rackBrokers/totalBrokers × totalReplicas) ≥ partitionCount}}, each rack receives exactly {{partitionCount}} replicas; if weighted allocation {{< partitionCount}}, distribute remaining replicas using a weighted remainder allocation. h3. Node-Level Replica Distribution # If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes will host one additional replica compared to others. # *When rackCount = replicationFactor*: if all racks have an equal number of nodes, each node will host an equal number of replicas; if rack sizes vary, nodes in larger racks will host fewer replicas on average. # *When rackCount > replicationFactor*: if no rack has a significantly higher node weight, replicas will be evenly distributed; if a rack has disproportionately high node weight, those nodes will receive fewer replicas. h3. Anti-Affinity Support When anti-affinity is enabled, the rebalance algorithm ensures that replicas of the same partition do not colocate on the same rack. Brokers without rack configuration are excluded from anti-affinity checks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Add doc for configuration parameters to keep the format consistent [kafka]
github-actions[bot] commented on PR #19244: URL: https://github.com/apache/kafka/pull/19244#issuecomment-2756467904 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19008 Remove scala version from artifacts [kafka]
github-actions[bot] commented on PR #19241: URL: https://github.com/apache/kafka/pull/19241#issuecomment-2756468164 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-19048: Minimal Movement Replica Balancing algorithm [kafka]
pjl1070048431 opened a new pull request, #19297: URL: https://github.com/apache/kafka/pull/19297 ## Motivation Kafka clusters typically require rebalancing of topic replicas after horizontal scaling to evenly distribute the load across new and existing brokers. The current rebalancing approach does not consider the existing replica distribution, often resulting in excessive and unnecessary replica movements. These unnecessary movements increase rebalance duration, consume significant bandwidth and CPU resources, and potentially disrupt ongoing production and consumption operations. Thus, a replica rebalancing strategy that minimizes movements while achieving an even distribution of replicas is necessary. ## Goals The new replica rebalancing strategy aims to achieve the following objectives: 1. **Minimal Movement**: Minimize the number of replica relocations during rebalancing. 2. **Replica Balancing**: Ensure that replicas are evenly distributed across brokers. 3. **Anti-Affinity Support**: Support rack-aware allocation when enabled. 4. **Leader Balancing**: Distribute leader replicas evenly across brokers. 5. **ISR Order Optimization**: Optimize adjacency relationships to prevent failover traffic concentration in case of broker failures. ## Proposed Changes ### Rack-Level Replica Distribution The following rules ensure balanced replica allocation at the rack level: 1. **When** `**rackCount = replicationFactor**`: - Each rack receives exactly `partitionCount` replicas. 2. **When** `**rackCount > replicationFactor**`: - If weighted allocation `(rackBrokers/totalBrokers × totalReplicas) ≥ partitionCount`: each rack receives exactly `partitionCount` replicas. - If weighted allocation `< partitionCount`: distribute remaining replicas using a weighted remainder allocation. ### Node-Level Replica Distribution 1. If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes will host one additional replica compared to others. 2. **When** `**rackCount = replicationFactor**`: - If all racks have an equal number of nodes, each node will host an equal number of replicas. - If rack sizes vary, nodes in larger racks will host fewer replicas on average. 3. **When** `**rackCount > replicationFactor**`: - If no rack has a significantly higher node weight, replicas will be evenly distributed. - If a rack has disproportionately high node weight, those nodes will receive fewer replicas. ### Anti-Affinity Support When anti-affinity is enabled, the rebalance algorithm ensures that replicas of the same partition do not colocate on the same rack. Brokers without rack configuration are excluded from anti-affinity checks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
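To make the rack-level quota rule above concrete, here is a hedged sketch of a weighted remainder allocation: weight each rack by its broker count, cap each rack at partitionCount, and hand out any leftover replicas by largest remainder. Names and structure are illustrative, not code from this PR, and it assumes rackCount >= replicationFactor so the per-rack caps can absorb all replicas:
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RackQuotaSketch {
    static Map<String, Integer> rackQuotas(Map<String, Integer> brokersPerRack,
                                           int partitionCount, int replicationFactor) {
        long totalReplicas = (long) partitionCount * replicationFactor;
        int totalBrokers = brokersPerRack.values().stream().mapToInt(Integer::intValue).sum();
        Map<String, Integer> quotas = new HashMap<>();
        Map<String, Double> remainders = new HashMap<>();
        long assigned = 0;
        for (Map.Entry<String, Integer> e : brokersPerRack.entrySet()) {
            // Weighted share of the total replicas, capped at partitionCount per rack.
            double weighted = (double) e.getValue() / totalBrokers * totalReplicas;
            int base = (int) Math.min(Math.floor(weighted), partitionCount);
            quotas.put(e.getKey(), base);
            remainders.put(e.getKey(), weighted - Math.floor(weighted));
            assigned += base;
        }
        // Largest-remainder pass for replicas still unassigned after flooring.
        List<String> order = new ArrayList<>(brokersPerRack.keySet());
        order.sort((a, b) -> Double.compare(remainders.get(b), remainders.get(a)));
        for (int i = 0; assigned < totalReplicas; i = (i + 1) % order.size()) {
            String rack = order.get(i);
            if (quotas.get(rack) < partitionCount) { // respect the per-rack cap
                quotas.merge(rack, 1, Integer::sum);
                assigned++;
            }
        }
        return quotas;
    }

    public static void main(String[] args) {
        // 3 racks with 2/2/1 brokers, 6 partitions, RF=2 -> 12 replicas in total.
        System.out.println(rackQuotas(Map.of("r1", 2, "r2", 2, "r3", 1), 6, 2));
        // Prints quotas such as {r1=5, r2=5, r3=2}.
    }
}
{code}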