[jira] [Updated] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown
[ https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio updated KAFKA-19047:
-----------------------------------------------
    Issue Type: Bug  (was: Improvement)

> Broker registrations are slow if previously fenced or shutdown
> --------------------------------------------------------------
>
>                 Key: KAFKA-19047
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19047
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 4.0.0
>            Reporter: Alyssa Huang
>            Assignee: Alyssa Huang
>            Priority: Major
>
> BrokerLifecycleManager prevents registration of a broker w/ an id it has seen
> before with a different incarnation id if the broker session expires. On
> clean shutdown and restart of a broker this can cause an unnecessary delay in
> re-registration while the quorum controller waits for the session to expire.
> ```
> [BrokerLifecycleManager id=1] Unable to register broker 1 because the
> controller returned error DUPLICATE_BROKER_REGISTRATION
> ```

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]
AyoubOm opened a new pull request, #19303:
URL: https://github.com/apache/kafka/pull/19303

This fixes both KAFKA-16407 and KAFKA-16434.

Summary of existing issues:
- We ignore a new left record when its **previous** FK value is null
- We do not unset the foreign key join result when the FK becomes null

_This PR was initially opened in [#15615](https://github.com/apache/kafka/pull/15615)_

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR commented on code in PR #19295:
URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java: ##

@@ -644,8 +645,12 @@ private ShareFetch collect(Map ack
         if (currentFetch.isEmpty()) {
             final ShareFetch fetch = fetchCollector.collect(fetchBuffer);
             if (fetch.isEmpty()) {
+                // Check for any acknowledgements which could have come from control records (GAP) and include them.
+                Map combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap);
+                combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords());
+
                 // Fetch more records and send any waiting acknowledgements
-                applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap));
+                applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements));

Review Comment:
Yes :)) turns out it can.
- Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 revealed that in transactions, when the client receives only a control record (e.g. an abort marker) in the `ShareFetchResponse` (without any non-control record), then in the `ShareCompletedFetch` these control records are never acknowledged (ideally they are acknowledged with GAP, indicating the client is ignoring them) and are never presented to the consumer application.
- It is expected that control records are skipped and not presented to the application, but the client should still acknowledge them with GAP (https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33)
- Now these control records are auto-acknowledged with `GAP` and will be sent on the next `ShareFetch`/`ShareAcknowledge` request. But as `fetch.isEmpty()` only checks for `numRecords() == 0`, when the fetch is empty we actually ignore the fetch here (meaning we never acknowledge these control records) - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598
- Now for this PR, we have added any acknowledgements that came in with the empty fetch (from control records) to the `ShareFetchEvent` so that they can be sent on the next poll().
- I agree it looks a bit odd for readability, though. But yes, there is a case where this can happen.
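The merge step described in the review comment can be sketched in isolation. This is a minimal illustration, not the client code: `EmptyFetchAcks`, `combine`, and the `String` keys and values are hypothetical stand-ins for the real partition and acknowledgement types, which are not visible in the excerpt above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model only: String keys and values stand in for the real partition and acknowledgement types. */
final class EmptyFetchAcks {
    /**
     * Combines acknowledgements queued by the application with those that
     * arrived alongside an empty fetch (for example GAP acknowledgements
     * generated for control records). Neither input map is modified.
     */
    static Map<String, String> combine(Map<String, String> pending,
                                       Map<String, String> fromEmptyFetch) {
        // Copy first so that insertion order of the pending entries is kept.
        Map<String, String> combined = new LinkedHashMap<>(pending);
        // putAll overwrites on equal keys, mirroring the putAll call in the diff.
        combined.putAll(fromEmptyFetch);
        return combined;
    }
}
```

The point of the copy-then-`putAll` shape is that an empty fetch no longer discards acknowledgements that were produced while processing it; both sets travel together on the next request.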
[jira] [Updated] (KAFKA-7699) Improve wall-clock time punctuations
[ https://issues.apache.org/jira/browse/KAFKA-7699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-7699:
-----------------------------------
    Description:
KIP-1146: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+wall-clock+punctuation]

Currently, wall-clock time punctuation allows scheduling periodic callbacks based on wall-clock time progress. The punctuation time starts when the punctuation is scheduled; thus, it is non-deterministic, which is desired for many use cases (I want a callback 5 minutes from "now").

It would be a nice improvement to allow users to "anchor" wall-clock punctuation, too, similar to a cron job: a punctuation would be triggered at "fixed" times like the beginning of the next hour, independent of when the punctuation was registered.

  was:
Currently, wall-clock time punctuation allows scheduling periodic callbacks based on wall-clock time progress. The punctuation time starts when the punctuation is scheduled; thus, it is non-deterministic, which is desired for many use cases (I want a callback 5 minutes from "now").

It would be a nice improvement to allow users to "anchor" wall-clock punctuation, too, similar to a cron job: a punctuation would be triggered at "fixed" times like the beginning of the next hour, independent of when the punctuation was registered.

> Improve wall-clock time punctuations
> ------------------------------------
>
>                 Key: KAFKA-7699
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7699
>             Project: Kafka
>          Issue Type: New Feature
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Herman Kolstad Jakobsen
>            Priority: Major
>              Labels: kip
>
> KIP-1146:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-1146%3A+Anchored+wall-clock+punctuation]
>
> Currently, wall-clock time punctuation allows scheduling periodic callbacks
> based on wall-clock time progress. The punctuation time starts when the
> punctuation is scheduled; thus, it is non-deterministic, which is desired for
> many use cases (I want a callback 5 minutes from "now").
> It would be a nice improvement to allow users to "anchor" wall-clock
> punctuation, too, similar to a cron job: a punctuation would be
> triggered at "fixed" times like the beginning of the next hour, independent
> of when the punctuation was registered.
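To make the "anchored" idea concrete, here is a small sketch of how the next trigger time could be computed from a fixed anchor rather than from the registration time. It is an assumption-laden illustration: `AnchoredSchedule` and `nextFireTime` are hypothetical names, and this is not the API proposed in KIP-1146.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper; not part of Kafka Streams or of KIP-1146. */
final class AnchoredSchedule {
    /**
     * Returns the first instant strictly after {@code now} that lies on the
     * grid {@code anchor + k * interval} (k may be any integer).
     */
    static Instant nextFireTime(Instant anchor, Duration interval, Instant now) {
        long intervalMs = interval.toMillis();
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        long elapsedMs = now.toEpochMilli() - anchor.toEpochMilli();
        // floorDiv keeps the result correct when now is before the anchor.
        long periods = Math.floorDiv(elapsedMs, intervalMs) + 1;
        return anchor.plusMillis(periods * intervalMs);
    }
}
```

With `Instant.EPOCH` as the anchor and a one-hour interval, a punctuation registered at 10:17 UTC would first fire at 11:00 UTC, regardless of when registration happened. An unanchored schedule would instead fire at 11:17.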
Re: [PR] MINOR: Move inner test classes out of CoordinatorRuntimeTest [kafka]
squah-confluent commented on code in PR #19258:
URL: https://github.com/apache/kafka/pull/19258#discussion_r2007416635

## coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java: ##

@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A simple Coordinator implementation that stores the records into a set.
+ */
+public class MockCoordinatorShard implements CoordinatorShard {
+    static class RecordAndMetadata {

Review Comment:
I'm trying to figure out how to do this without making the CoordinatorRuntimeTests even more verbose. Do you have any suggestions?
[jira] [Commented] (KAFKA-7952) Consider to switch to in-memory stores in test whenever possible
[ https://issues.apache.org/jira/browse/KAFKA-7952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939094#comment-17939094 ]

Matthias J. Sax commented on KAFKA-7952:
----------------------------------------

[~lorcanj] Thanks for the PR. I'll try to get to it as soon as possible (also sorry for the late reply – I was traveling).

> Consider to switch to in-memory stores in test whenever possible
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7952
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7952
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams, unit tests
>            Reporter: Matthias J. Sax
>            Assignee: Lorcan
>            Priority: Major
>              Labels: beginner, newbie
>
> We observed that tests can be very slow using default RocksDB stores (cf.
> KAFKA-7933).
> We should consider switching to in-memory stores whenever possible to reduce
> test runtime.
Re: [PR] KAFKA-17662: config.providers configuration missing from the docs [kafka]
gharris1727 commented on code in PR #18930:
URL: https://github.com/apache/kafka/pull/18930#discussion_r2017574587

## clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java: ##

@@ -289,6 +295,13 @@ protected Map postProcessParsedConfig(final Map
         CommonClientConfigs.warnDisablingExponentialBackoff(this);
         return CommonClientConfigs.postProcessReconnectBackoffConfigs(this, parsedValues);
     }
+
+    @Override
+    public Map originals() {

Review Comment:
The WorkerConfig is a Connect-specific class, and it removes config.providers to avoid false-positive errors related to a recent CVE: https://www.cve.org/CVERecord?id=CVE-2024-31141
Admin (and Producer and Consumer) clients don't have the same concerns that WorkerConfig does, or at least I'm not aware of them.

Also, the title of this PR makes it sound like a documentation change, and this appears to be a functional change. If it needs to be addressed, can it be addressed separately?
[jira] [Updated] (KAFKA-6840) support windowing in ktable API
[ https://issues.apache.org/jira/browse/KAFKA-6840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-6840:
-----------------------------------
    Priority: Major  (was: Blocker)

> support windowing in ktable API
> -------------------------------
>
>                 Key: KAFKA-6840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6840
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Boyang Chen
>            Assignee: Boyang Chen
>            Priority: Major
>              Labels: api, needs-kip
>
> The StreamsBuilder provides the table() API to materialize a changelog topic into
> a local key-value store (KTable), which is very convenient. However, the current
> underlying implementation does not support materializing a topic into a
> windowed key-value store, which in certain cases would be very useful.
> To close the gap, we propose a new API in StreamsBuilder that returns a
> windowed KTable.
> The table() API in StreamsBuilder looks like this:
> public synchronized <K, V> KTable<K, V> table(final String topic,
>                                               final Consumed<K, V> consumed,
>                                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
>     Objects.requireNonNull(topic, "topic can't be null");
>     Objects.requireNonNull(consumed, "consumed can't be null");
>     Objects.requireNonNull(materialized, "materialized can't be null");
>     materialized.withKeySerde(consumed.keySerde).withValueSerde(consumed.valueSerde);
>     return internalStreamsBuilder.table(topic,
>                                         new ConsumedInternal<>(consumed),
>                                         new MaterializedInternal<>(materialized, internalStreamsBuilder, topic + "-"));
> }
>
> Here the store type is fixed to KeyValueStore. There is no
> flexibility to change it to WindowStore.
>
> To maintain compatibility with the existing API, we have two options for defining
> a new API:
> 1. Overload the existing KTable struct
> public synchronized <K, V> KTable<Windowed<K>, V> windowedTable(final String topic,
>                                                                 final Consumed<K, V> consumed,
>                                                                 final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
>
> This would give developers the alternative of using a windowed table instead.
> However, it implies that all existing KTable logic, such as joins and
> aggregations, must still work as expected, so the challenge would be
> verifying that.
>
> 2. Define a new type called WindowedKTable
> public synchronized <K, V> WindowedKTable<K, V> windowedTable(final String topic,
>                                                               final Consumed<K, V> consumed,
>                                                               final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized);
> The benefit of doing this is that we don't need to worry about the existing
> functionality of KTable. However, the cost is redundancy in the
> common operation logic. When upgrading common functionality, we need to take
> care of both types.
> We could fill in more details in the KIP. Right now I would like to hear some
> feedback on the two approaches, thank you!
Re: [PR] MINOR: Refactor GroupCoordinator write path [kafka]
jeffkbkim commented on code in PR #19290:
URL: https://github.com/apache/kafka/pull/19290#discussion_r2016876236

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##

@@ -661,16 +705,18 @@ class ReplicaManager(val config: KafkaConfig,
       return
     }

-    val sTime = time.milliseconds
-    val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-      origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
-    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-    val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
-      case(k, v) => (k.topicPartition, v)}
+    val localProduceResults = appendRecordsToLeader(
+      requiredAcks,
+      internalTopicsAllowed,
+      origin,
+      entriesPerPartition,
+      requestLocal,
+      actionQueue,
+      verificationGuards
+    )

     val produceStatus = buildProducePartitionStatus(localProduceResults)

-    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
     recordValidationStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordValidationStats })

Review Comment:
(Can't write a comment below this.) To confirm: `maybeAddDelayedProduce` below can be asynchronous, which it isn't today if acks=1, but that may change in the future?
Re: [PR] MINOR: Refactor GroupCoordinator write path [kafka]
dajac commented on code in PR #19290:
URL: https://github.com/apache/kafka/pull/19290#discussion_r2016912371

## core/src/main/scala/kafka/server/ReplicaManager.scala: ##

@@ -661,16 +705,18 @@ class ReplicaManager(val config: KafkaConfig,
       return
     }

-    val sTime = time.milliseconds
-    val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-      origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
-    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-    val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
-      case(k, v) => (k.topicPartition, v)}
+    val localProduceResults = appendRecordsToLeader(
+      requiredAcks,
+      internalTopicsAllowed,
+      origin,
+      entriesPerPartition,
+      requestLocal,
+      actionQueue,
+      verificationGuards
+    )

     val produceStatus = buildProducePartitionStatus(localProduceResults)

-    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
     recordValidationStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordValidationStats })

Review Comment:
Correct. `maybeAddDelayedProduce` decides how to complete the call. Basically, the method either puts the produce request into the purgatory (`acks=all`) or completes it immediately.
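The decision described in the reply above can be modelled as a small pure function. This is a simplified sketch under stated assumptions, not the `ReplicaManager` code: `ProduceCompletionSketch`, `Outcome`, and `decide` are hypothetical names, and the extra conditions on partition counts are an assumption about when waiting for replication makes sense at all. The only fact taken as given is that `acks=all` is encoded as `-1`.

```java
/** Hypothetical model of the "purgatory or immediate completion" choice. */
final class ProduceCompletionSketch {
    enum Outcome { COMPLETE_IMMEDIATELY, WAIT_IN_PURGATORY }

    /**
     * @param requiredAcks        -1 for acks=all, 1 for leader-only, 0 for no acknowledgement
     * @param partitionsRequested number of partitions in the produce request
     * @param partitionsFailed    number of partitions whose local append failed
     */
    static Outcome decide(short requiredAcks, int partitionsRequested, int partitionsFailed) {
        // Assumption: there is nothing to wait for unless at least one
        // partition was appended locally and full replication was requested.
        boolean waitForReplication = requiredAcks == -1
                && partitionsRequested > 0
                && partitionsFailed < partitionsRequested;
        return waitForReplication ? Outcome.WAIT_IN_PURGATORY : Outcome.COMPLETE_IMMEDIATELY;
    }
}
```

Under this model a request with `acks=1` always completes on the calling path, which matches the observation in the thread that the call is not asynchronous in that case today.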
Re: [PR] MINOR: Refactor GroupCoordinator write path [kafka]
dajac merged PR #19290:
URL: https://github.com/apache/kafka/pull/19290
[PR] KAFKA-14487: Move LogManager static methods/fields to storage module [kafka]
mimaison opened a new pull request, #19302:
URL: https://github.com/apache/kafka/pull/19302

(no comment)
[jira] [Commented] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown
[ https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939055#comment-17939055 ]

Gunnar Morling commented on KAFKA-19047:
----------------------------------------

For reference, here are the logs I've observed in that situation:

```
bin/kafka-server-start.sh config/server.properties
[2025-03-27 20:40:54,651] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2025-03-27 20:40:54,803] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2025-03-27 20:40:54,804] INFO [ControllerServer id=1] Starting controller (kafka.server.ControllerServer)
[2025-03-27 20:40:54,932] INFO Updated connection-accept-rate max connection creation rate to 2147483647 (kafka.network.ConnectionQuotas)
[2025-03-27 20:40:54,947] INFO [SocketServer listenerType=CONTROLLER, nodeId=1] Created data-plane acceptor and processors for endpoint : ListenerName(CONTROLLER) (kafka.network.SocketServer)
[2025-03-27 20:40:54,950] INFO authorizerStart completed for endpoint CONTROLLER. Endpoint is now READY. (org.apache.kafka.server.network.EndpointReadyFutures)
[2025-03-27 20:40:54,951] INFO [SharedServer id=1] Starting SharedServer (kafka.server.SharedServer)
[2025-03-27 20:40:54,971] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Recovering unflushed segment 0. 0 recovered for __cluster_metadata-0. (org.apache.kafka.storage.internals.log.LogLoader)
[2025-03-27 20:40:54,977] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 0 (org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:54,978] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding producer state from offset 0 (org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:54,978] INFO Deleted producer state snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/5680.snapshot (org.apache.kafka.storage.internals.log.SnapshotFile)
[2025-03-27 20:40:54,978] INFO Deleted producer state snapshot /tmp/kraft-combined-logs/__cluster_metadata-0/5844.snapshot (org.apache.kafka.storage.internals.log.SnapshotFile)
[2025-03-27 20:40:54,979] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Producer state recovery took 1ms for snapshot load and 0ms for segment recovery from offset 0 (org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,014] INFO [ProducerStateManager partition=__cluster_metadata-0] Wrote producer snapshot at offset 5844 with 0 producer ids in 9 ms. (org.apache.kafka.storage.internals.log.ProducerStateManager)
[2025-03-27 20:40:55,017] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Loading producer state till offset 5844 (org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,017] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Reloading from producer snapshot and rebuilding producer state from offset 5844 (org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,017] INFO [ProducerStateManager partition=__cluster_metadata-0] Loading producer state from snapshot file 'SnapshotFile(offset=5844, file=/tmp/kraft-combined-logs/__cluster_metadata-0/5844.snapshot)' (org.apache.kafka.storage.internals.log.ProducerStateManager)
[2025-03-27 20:40:55,018] INFO [LogLoader partition=__cluster_metadata-0, dir=/tmp/kraft-combined-logs] Producer state recovery took 1ms for snapshot load and 0ms for segment recovery from offset 5844 (org.apache.kafka.storage.internals.log.UnifiedLog)
[2025-03-27 20:40:55,034] INFO Initialized snapshots with IDs SortedSet(OffsetAndEpoch(offset=0, epoch=0)) from /tmp/kraft-combined-logs/__cluster_metadata-0 (kafka.raft.KafkaMetadataLog$)
[2025-03-27 20:40:55,039] INFO [raft-expiration-reaper]: Starting (kafka.raft.TimingWheelExpirationService$ExpiredOperationReaper)
[2025-03-27 20:40:55,043] INFO [RaftManager id=1] Starting request manager with bootstrap servers: [localhost:9093 (id: -2 rack: null isFenced: false)] (org.apache.kafka.raft.KafkaRaftClient)
[2025-03-27 20:40:55,045] INFO [RaftManager id=1] Reading KRaft snapshot and log as part of the initialization (org.apache.kafka.raft.KafkaRaftClient)
[2025-03-27 20:40:55,046] INFO [RaftManager id=1] Loading snapshot (OffsetAndEpoch(offset=0, epoch=0)) since log start offset (0) is greater than the internal listener's next offset (-1) (org.apache.kafka.raft.internals.KRaftControlRecordStateMachine)
[2025-03-27 20:40:55,048] INFO [RaftManager id=1] Latest kraft.version is KRAFT_VERSION_1 at offset -1 (org.apache.kafka.raft.internals.KRaftControlRecordStateMachine)
[2025-03-27 20:40:55,049] INFO [RaftManager id=1
Re: [PR] KAFKA-10409: Refactor Kafka Streams RocksDB Iterators [kafka]
fonsdant commented on code in PR #18610:
URL: https://github.com/apache/kafka/pull/18610#discussion_r2017792607

## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ##

@@ -277,197 +275,361 @@ public void close() {
         }
     }

-    private class RocksDBDualCFIterator extends AbstractIterator>
-        implements ManagedKeyValueIterator {
-
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
+    /**
+     * A range-based iterator for RocksDB that merges results from two column families.
+     *
+     * This iterator supports traversal over two RocksDB column families: one containing timestamped values and
+     * another containing non-timestamped values. It ensures that the keys from both column families are merged and
+     * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range
+     * boundaries.
+     *
+     * Key Features
+     *
+     *
+     * Merges results from the "with-timestamp" and "no-timestamp" column families.
+     * Supports range-based queries with open or closed boundaries.
+     * Handles both forward and reverse iteration seamlessly.
+     * Ensures correct handling of inclusive and exclusive upper boundaries.
+     * Integrates efficiently with Kafka Streams state store mechanisms.
+     *
+     *
+     * Usage
+     *
+     * The iterator can be used for different types of range-based operations, such as:
+     *
+     * Iterating over all keys within a range.
+     * Prefix-based scans (when combined with dynamically calculated range endpoints).
+     * Open-ended range queries (e.g., from a given key to the end of the dataset).
+     *
+     *
+     *
+     * Implementation Details
+     *
+     * The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's
+     * native iterators for efficient traversal of keys within the specified range. Keys from the two column families
+     * are merged during iteration, ensuring proper order and de-duplication where applicable.
+     *
+     * Key Methods:
+     *
+     *
+     * {@code makeNext()}: Retrieves the next key-value pair in the merged range, ensuring
+     * the result is within the specified range and boundary conditions.
+     * {@code initializeIterators()}: Initializes the RocksDB iterators based on the specified range and direction.
+     * {@code isInRange()}: Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}.
+     * {@code fetchNextKeyValue()}: Determines the next key-value pair to return based on the state of both iterators.
+     *
+     *
+     * Thread Safety:
+     *
+     * The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple
+     * threads without external synchronization.
+     *
+     * Examples
+     *
+     * Iterate over a range:
+     *
+     * {@code
+     * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily);
+     * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily);
+     *
+     * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator(
+     *         new Bytes("keyStart".getBytes()),
+     *         new Bytes("keyEnd".getBytes()),
+     *         noTimestampIterator,
+     *         withTimestampIterator,
+     *         "storeName",
+     *         true, // Forward iteration
+     *         true  // Inclusive upper boundary
+     * )) {
+     *     while (iterator.hasNext()) {
+     *         KeyValue entry = iterator.next();
+     *         System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value));
+     *     }
+     * }
+     * }
+     *
+     * Exceptions
+     *
+     *
+     * {@link InvalidStateStoreException}: Thrown if the iterator is accessed after being closed.
+     * {@link IllegalStateException}: Thrown if the close callback is not properly set before usage.
+     *
+     *
+     * @see AbstractIterator
+     * @see ManagedKeyValueIterator
+     * @see RocksDBStore
+     */
+    private static class RocksDBDualCFRangeIterator extends AbstractIterator> implements ManagedKeyValueIterator {
+        private Runnable closeCallback;
+        private byte[] noTimestampNext;
+        private byte[] withTimestampNext;
         private final Comparator comparator = Bytes.BYTES_LEXICO_COMPARATOR;
-
+        private final RocksIterator noTimestampIterator;
+        private final RocksIterator withTimestampIterator;
         private final String storeName;
-        private final RocksIterator iterWithTimestamp;
-        private final RocksIter
[jira] [Updated] (KAFKA-18797) Flaky testLargeAssignmentAndGroupWithUniformSubscription
[ https://issues.apache.org/jira/browse/KAFKA-18797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] PoAn Yang updated KAFKA-18797: -- Attachment: testLargeAssignmentAndGroupWithUniformSubscription.jfr > Flaky testLargeAssignmentAndGroupWithUniformSubscription > > > Key: KAFKA-18797 > URL: https://issues.apache.org/jira/browse/KAFKA-18797 > Project: Kafka > Issue Type: Test > Components: consumer >Reporter: Lianet Magrans >Priority: Major > Attachments: Each object memory usage.png, Java Heap Memory.png, > testLargeAssignmentAndGroupWithUniformSubscription.jfr > > > Flaky on trunk for a while (One of the top flaky tests) > Flaky with timeouts > https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=1739549233243&search.startTimeMin=173709000&search.tags=trunk&search.timeZoneId=America%2FToronto&tests.container=org.apache.kafka.clients.consumer.StickyAssignorTest&tests.sortField=FLAKY&tests.test=testLargeAssignmentAndGroupWithUniformSubscription(boolean)%5B1%5D -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-19050) kafka-streams-integration-tests artifact is empty
[ https://issues.apache.org/jira/browse/KAFKA-19050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-19050: Priority: Blocker (was: Major) > kafka-streams-integration-tests artifact is empty > - > > Key: KAFKA-19050 > URL: https://issues.apache.org/jira/browse/KAFKA-19050 > Project: Kafka > Issue Type: Improvement > Components: build, streams >Affects Versions: 4.0.0 >Reporter: Utku Aydin >Priority: Blocker > > Not sure whether this was intended or not but currently the release process > releases an artifact with an empty jar containing only the manifest for the > kafka-streams-integration-tests module. Since these tests are now in their > own module, I think it's an opportunity to make them available to end-users > as well. Personally, I would like to use EmbeddedKafkaCluster from > org.apache.kafka.streams.integration.utils for my own integration tests since > io.github.embeddedkafka is no longer being maintained. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-19050) kafka-streams-integration-tests artifact is empty
[ https://issues.apache.org/jira/browse/KAFKA-19050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-19050: Fix Version/s: 4.1.0 4.0.1 > kafka-streams-integration-tests artifact is empty > - > > Key: KAFKA-19050 > URL: https://issues.apache.org/jira/browse/KAFKA-19050 > Project: Kafka > Issue Type: Improvement > Components: build, streams >Affects Versions: 4.0.0 >Reporter: Utku Aydin >Priority: Blocker > Fix For: 4.1.0, 4.0.1 > > > Not sure whether this was intended or not but currently the release process > releases an artifact with an empty jar containing only the manifest for the > kafka-streams-integration-tests module. Since these tests are now in their > own module, I think it's an opportunity to make them available to end-users > as well. Personally, I would like to use EmbeddedKafkaCluster from > org.apache.kafka.streams.integration.utils for my own integration tests since > io.github.embeddedkafka is no longer being maintained. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-10844: groupBy without shuffling [kafka]
fonsdant commented on PR #18811: URL: https://github.com/apache/kafka/pull/18811#issuecomment-2759887172 @mjsax, I would like to suggest that we adopt "skip repartition" instead of "mark as partitioned". It seems to me to be more consistent with the rest of the KStream API methods and its functional programming language, because "skip repartition" is actually the action that is performed, while "mark as partitioned" seems to me to be more like a condition to be interpreted ("oh, okay, since this is marked as partitioned, I will _skip repartitioning_") than the action itself. What do you think? Also, I would like to know if you have other test scenarios in mind. Thanks in advance! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-19050) kafka-streams-integration-tests artifact is empty
[ https://issues.apache.org/jira/browse/KAFKA-19050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-19050: Component/s: streams > kafka-streams-integration-tests artifact is empty > - > > Key: KAFKA-19050 > URL: https://issues.apache.org/jira/browse/KAFKA-19050 > Project: Kafka > Issue Type: Improvement > Components: build, streams >Affects Versions: 4.0.0 >Reporter: Utku Aydin >Priority: Major > > Not sure whether this was intended or not but currently the release process > releases an artifact with an empty jar containing only the manifest for the > kafka-streams-integration-tests module. Since these tests are now in their > own module, I think it's an opportunity to make them available to end-users > as well. Personally, I would like to use EmbeddedKafkaCluster from > org.apache.kafka.streams.integration.utils for my own integration tests since > io.github.embeddedkafka is no longer being maintained. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18980: OffsetMetadataManager#cleanupExpiredOffsets should record the number of records rather than topic partitions [kafka]
chia7712 commented on code in PR #19207: URL: https://github.com/apache/kafka/pull/19207#discussion_r2006072232 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -875,7 +872,7 @@ public boolean cleanupExpiredOffsets(String groupId, List rec // We don't expire the offset yet if there is a pending transactional offset for the partition. if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs()) && !hasPendingTransactionalOffsets(groupId, topic, partition)) { - expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString()); +appendOffsetCommitTombstone(groupId, topic, partition, records); Review Comment: My point was that `appendOffsetCommitTombstone` no longer needs to return a value. For example:
```java
private void appendOffsetCommitTombstone(
    String groupId,
    String topic,
    int partition,
    List records
) {
    records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
    TopicPartition tp = new TopicPartition(topic, partition);
    log.trace("[GroupId {}] Removing expired offset and metadata for {}", groupId, tp);
}
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10409: Refactor Kakfa Streams RocksDB Iterators [kafka]
fonsdant commented on code in PR #18610: URL: https://github.com/apache/kafka/pull/18610#discussion_r2017792885 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ## @@ -277,197 +275,361 @@ public void close() { } } -private class RocksDBDualCFIterator extends AbstractIterator> -implements ManagedKeyValueIterator { - -// RocksDB's JNI interface does not expose getters/setters that allow the -// comparator to be pluggable, and the default is lexicographic, so it's -// safe to just force lexicographic comparator here for now. +/** + * A range-based iterator for RocksDB that merges results from two column families. + * + * This iterator supports traversal over two RocksDB column families: one containing timestamped values and + * another containing non-timestamped values. It ensures that the keys from both column families are merged and + * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range + * boundaries. + * + * Key Features + * + * + * Merges results from the "with-timestamp" and "no-timestamp" column families. + * Supports range-based queries with open or closed boundaries. + * Handles both forward and reverse iteration seamlessly. + * Ensures correct handling of inclusive and exclusive upper boundaries. + * Integrates efficiently with Kafka Streams state store mechanisms. + * + * + * Usage + * + * The iterator can be used for different types of range-based operations, such as: + * + * Iterating over all keys within a range. + * Prefix-based scans (when combined with dynamically calculated range endpoints). + * Open-ended range queries (e.g., from a given key to the end of the dataset). + * + * + * + * Implementation Details + * + * The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's + * native iterators for efficient traversal of keys within the specified range. 
Keys from the two column families + * are merged during iteration, ensuring proper order and de-duplication where applicable. + * + * Key Methods: + * + * + * {@code makeNext()}: Retrieves the next key-value pair in the merged range, ensuring + * the result is within the specified range and boundary conditions. + * {@code initializeIterators()}: Initializes the RocksDB iterators based on the specified range and direction. + * {@code isInRange()}: Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}. + * {@code fetchNextKeyValue()}: Determines the next key-value pair to return based on the state of both iterators. + * + * + * Thread Safety: + * + * The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple + * threads without external synchronization. + * + * Examples + * + * Iterate over a range: + * + * {@code + * RocksIterator noTimestampIterator = accessor.newIterator(noTimestampColumnFamily); + * RocksIterator withTimestampIterator = accessor.newIterator(withTimestampColumnFamily); + * + * try (RocksDBDualCFRangeIterator iterator = new RocksDBDualCFRangeIterator( + * new Bytes("keyStart".getBytes()), + * new Bytes("keyEnd".getBytes()), + * noTimestampIterator, + * withTimestampIterator, + * "storeName", + * true, // Forward iteration + * true // Inclusive upper boundary + * )) { + * while (iterator.hasNext()) { + * KeyValue entry = iterator.next(); + * System.out.println("Key: " + entry.key + ", Value: " + Arrays.toString(entry.value)); + * } + * } + * } + * + * Exceptions + * + * + * {@link InvalidStateStoreException}: Thrown if the iterator is accessed after being closed. + * {@link IllegalStateException}: Thrown if the close callback is not properly set before usage. 
+ * + * + * @see AbstractIterator + * @see ManagedKeyValueIterator + * @see RocksDBStore + */ +private static class RocksDBDualCFRangeIterator extends AbstractIterator> implements ManagedKeyValueIterator { +private Runnable closeCallback; +private byte[] noTimestampNext; +private byte[] withTimestampNext; private final Comparator comparator = Bytes.BYTES_LEXICO_COMPARATOR; - +private final RocksIterator noTimestampIterator; +private final RocksIterator withTimestampIterator; private final String storeName; -private final RocksIterator iterWithTimestamp; -private final RocksIter
Re: [PR] KAFKA-10409: Refactor Kakfa Streams RocksDB Iterators [kafka]
fonsdant commented on code in PR #18610: URL: https://github.com/apache/kafka/pull/18610#discussion_r2017793076 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ## @@ -277,197 +275,361 @@ public void close() { } } -private class RocksDBDualCFIterator extends AbstractIterator> -implements ManagedKeyValueIterator { - -// RocksDB's JNI interface does not expose getters/setters that allow the -// comparator to be pluggable, and the default is lexicographic, so it's -// safe to just force lexicographic comparator here for now. +/** + * A range-based iterator for RocksDB that merges results from two column families. + * + * This iterator supports traversal over two RocksDB column families: one containing timestamped values and + * another containing non-timestamped values. It ensures that the keys from both column families are merged and + * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range + * boundaries. + * + * Key Features + * + * + * Merges results from the "with-timestamp" and "no-timestamp" column families. + * Supports range-based queries with open or closed boundaries. + * Handles both forward and reverse iteration seamlessly. + * Ensures correct handling of inclusive and exclusive upper boundaries. + * Integrates efficiently with Kafka Streams state store mechanisms. + * + * + * Usage + * + * The iterator can be used for different types of range-based operations, such as: + * + * Iterating over all keys within a range. + * Prefix-based scans (when combined with dynamically calculated range endpoints). + * Open-ended range queries (e.g., from a given key to the end of the dataset). + * + * + * + * Implementation Details + * + * The class extends {@link AbstractIterator} and implements {@link ManagedKeyValueIterator}. It uses RocksDB's + * native iterators for efficient traversal of keys within the specified range. 
Keys from the two column families + * are merged during iteration, ensuring proper order and de-duplication where applicable. + * + * Key Methods: + * + * + * {@code makeNext()}: Retrieves the next key-value pair in the merged range, ensuring + * the result is within the specified range and boundary conditions. + * {@code initializeIterators()}: Initializes the RocksDB iterators based on the specified range and direction. + * {@code isInRange()}: Verifies if the current key-value pair is within the range defined by {@code from} and {@code to}. + * {@code fetchNextKeyValue()}: Determines the next key-value pair to return based on the state of both iterators. + * + * + * Thread Safety: + * + * The iterator is thread-safe for sequential operations but should not be accessed concurrently from multiple + * threads without external synchronization. Review Comment: Updated. ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ## @@ -277,197 +275,361 @@ public void close() { } } -private class RocksDBDualCFIterator extends AbstractIterator> -implements ManagedKeyValueIterator { - -// RocksDB's JNI interface does not expose getters/setters that allow the -// comparator to be pluggable, and the default is lexicographic, so it's -// safe to just force lexicographic comparator here for now. +/** + * A range-based iterator for RocksDB that merges results from two column families. + * + * This iterator supports traversal over two RocksDB column families: one containing timestamped values and + * another containing non-timestamped values. It ensures that the keys from both column families are merged and + * sorted lexicographically, respecting the iteration order (forward or reverse) and the specified range + * boundaries. + * + * Key Features + * + * + * Merges results from the "with-timestamp" and "no-timestamp" column families. + * Supports range-based queries with open or closed boundaries. 
+ * Handles both forward and reverse iteration seamlessly. + * Ensures correct handling of inclusive and exclusive upper boundaries. + * Integrates efficiently with Kafka Streams state store mechanisms. + * + * + * Usage + * + * The iterator can be used for different types of range-based operations, such as: + * + * Iterating over all keys within a range. + * Prefix-based scans (when combined with dynamically calculated range endpoints). + * Open-ended range queries (e.g., from a given key to the end of the dataset). +
Re: [PR] MINOR: Add 4.0.0 to streams system tests [kafka]
dajac merged PR #19239: URL: https://github.com/apache/kafka/pull/19239 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]
gongxuanzhang commented on code in PR #19226: URL: https://github.com/apache/kafka/pull/19226#discussion_r2017802756 ## server-common/src/main/java/org/apache/kafka/server/common/DelayedDeleteRecords.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.common; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.apache.kafka.server.purgatory.DelayedOperation; + +import com.yammer.metrics.core.Meter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * A delayed delete records operation that can be created by the replica manager and watched + * in the delete records operation purgatory + */ +public class DelayedDeleteRecords extends DelayedOperation { + +private static final Logger LOG = LoggerFactory.getLogger(DelayedDeleteRecords.class); Review Comment: I don't think changing the name will affect anything. Do we have a special need to keep the name? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR UnifiedLog topic-partition as attribute as it is stable [kafka]
github-actions[bot] commented on PR #19253: URL: https://github.com/apache/kafka/pull/19253#issuecomment-2760076574 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18877: Add an mechanism to find cases where we accessed variables from the wrong thread. [kafka]
github-actions[bot] commented on PR #19231: URL: https://github.com/apache/kafka/pull/19231#issuecomment-2760076628 A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the `triage` label should be removed to prevent this automation from happening again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18068: Fixing typo in ProducerConfig [kafka]
github-actions[bot] commented on PR #17908: URL: https://github.com/apache/kafka/pull/17908#issuecomment-2760101395 This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-7516: Attempt to dynamically load ManagementFactory class [kafka]
github-actions[bot] commented on PR #17969: URL: https://github.com/apache/kafka/pull/17969#issuecomment-2760101431 This PR has been closed since it has not had any activity in 120 days. If you feel like this was a mistake, or you would like to continue working on it, please feel free to re-open the PR and ask for a review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-7516: Attempt to dynamically load ManagementFactory class [kafka]
github-actions[bot] closed pull request #17969: KAFKA-7516: Attempt to dynamically load ManagementFactory class URL: https://github.com/apache/kafka/pull/17969 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18068: Fixing typo in ProducerConfig [kafka]
github-actions[bot] closed pull request #17908: KAFKA-18068: Fixing typo in ProducerConfig URL: https://github.com/apache/kafka/pull/17908 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17423 Trie implementation [kafka]
github-actions[bot] commented on PR #17087: URL: https://github.com/apache/kafka/pull/17087#issuecomment-2760101352 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-3370) Add options to auto.offset.reset to reset offsets upon initialization only
[ https://issues.apache.org/jira/browse/KAFKA-3370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939097#comment-17939097 ] Matthias J. Sax commented on KAFKA-3370: Nobody is currently working on this AFAIK. > Add options to auto.offset.reset to reset offsets upon initialization only > -- > > Key: KAFKA-3370 > URL: https://issues.apache.org/jira/browse/KAFKA-3370 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Vahid Hashemian >Priority: Major > Labels: needs-kip > > Currently "auto.offset.reset" is applied in the following two cases: > 1) upon starting the consumer for the first time (hence no committed offsets > before); > 2) upon fetching offsets out-of-range. > For scenarios where case 2) needs to be avoided (i.e. people need to be > notified upon offsets out-of-range rather than having offsets silently reset), > "auto.offset.reset" needs to be set to "none". However for case 1) setting > "auto.offset.reset" to "none" will cause NoOffsetForPartitionException upon > polling. And in this case, seekToBeginning/seekToEnd is mistakenly applied > trying to set the offset at initialization, which are actually designed for > use during the lifetime of the consumer (in rebalance callback, for example). > The fix proposal is to add two more options to "auto.offset.reset", > "earliest-on-start", and "latest-on-start", whose semantics are "earliest" > and "latest" for case 1) only, and "none" for case 2). -- This message was sent by Atlassian Jira (v8.20.10#820010)
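The semantics proposed in the KAFKA-3370 description can be written down as a small decision table. The following is only a sketch of the proposal as worded in the ticket: `OffsetResetSketch`, `Trigger`, `Action` and `resolve` are hypothetical names for illustration and are not part of the Kafka consumer API, and "earliest-on-start" / "latest-on-start" are proposed values, not existing ones.

```java
// Hypothetical model of the KAFKA-3370 proposal; none of these types exist in the Kafka client.
final class OffsetResetSketch {

    // Case 1) in the ticket: no committed offset. Case 2): fetched offset is out of range.
    enum Trigger { NO_COMMITTED_OFFSET, OFFSET_OUT_OF_RANGE }

    // What the consumer would do for a given policy and trigger.
    enum Action { SEEK_TO_EARLIEST, SEEK_TO_LATEST, RAISE_ERROR }

    static Action resolve(String policy, Trigger trigger) {
        switch (policy) {
            case "earliest":
                return Action.SEEK_TO_EARLIEST;
            case "latest":
                return Action.SEEK_TO_LATEST;
            case "none":
                return Action.RAISE_ERROR;
            case "earliest-on-start":
                // Proposed: behaves like "earliest" for case 1) only, like "none" for case 2).
                return trigger == Trigger.NO_COMMITTED_OFFSET
                        ? Action.SEEK_TO_EARLIEST : Action.RAISE_ERROR;
            case "latest-on-start":
                // Proposed: behaves like "latest" for case 1) only, like "none" for case 2).
                return trigger == Trigger.NO_COMMITTED_OFFSET
                        ? Action.SEEK_TO_LATEST : Action.RAISE_ERROR;
            default:
                throw new IllegalArgumentException("Unknown policy: " + policy);
        }
    }
}
```

With this table, a consumer configured with "earliest-on-start" would start from the beginning on first launch but would surface an error, instead of silently resetting, when an offset later falls out of range.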
[jira] [Commented] (KAFKA-19050) kafka-streams-integration-tests artifact is empty
[ https://issues.apache.org/jira/browse/KAFKA-19050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939098#comment-17939098 ] Matthias J. Sax commented on KAFKA-19050: - There was always a desire for some public test artifacts. Re-using this module does not seem to be the right way though, but we should do a proper KIP, and have a proper module for it, if we really want to do this (which is a larger/heavy lift...) I agree that publishing an empty jar does not make sense, and we should fix the build script to exclude this artifact if possible. We should mark this ticket as blocker for 4.0.1 and 4.1.0 releases to fix it on time. Note that the actual tests (including `EmbeddedKafkaCluster`) should be published as "test" jar, cf [kafka-streams-integration-tests-4.0.0-test.jar|https://repo1.maven.org/maven2/org/apache/kafka/kafka-streams-integration-tests/4.0.0/kafka-streams-integration-tests-4.0.0-test.jar] – so what you want is already there. > kafka-streams-integration-tests artifact is empty > - > > Key: KAFKA-19050 > URL: https://issues.apache.org/jira/browse/KAFKA-19050 > Project: Kafka > Issue Type: Improvement > Components: build, streams >Affects Versions: 4.0.0 >Reporter: Utku Aydin >Priority: Major > > Not sure whether this was intended or not but currently the release process > releases an artifact with an empty jar containing only the manifest for the > kafka-streams-integration-tests module. Since these tests are now in their > own module, I think it's an opportunity to make them available to end-users > as well. Personally, I would like to use EmbeddedKafkaCluster from > org.apache.kafka.streams.integration.utils for my own integration tests since > io.github.embeddedkafka is no longer being maintained. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-10409: Refactor Kakfa Streams RocksDB Iterators [kafka]
fonsdant commented on PR #18610: URL: https://github.com/apache/kafka/pull/18610#issuecomment-2759949221 @agavra, thanks for reviewing! I have pushed some commits :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17078: Add SecurityManagerCompatibility shim (#16522) [kafka]
showuon commented on PR #19221: URL: https://github.com/apache/kafka/pull/19221#issuecomment-2760036097 Let's wait until we have consensus in the community. I've replied in this thread: https://lists.apache.org/thread/6k942pphowd28dh9gn6xbnngk6nxs3n0 . @gharris1727 , it'd be good if you could also comment on that thread. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`
[ https://issues.apache.org/jira/browse/KAFKA-19024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939117#comment-17939117 ] Lan Ding commented on KAFKA-19024: -- In the current implementation, the `group.share.max.groups` is only used when setting the maxEntries for the ShareSessionCache. Perhaps we could introduce a new error type `MAX_SHARE_GROUP_SIZE_REACHED`. When processing a Heartbeat request, if the group ID does not exist and the `group.share.max.groups` limit is reached, this exception would be thrown. Clients could then catch this exception and handle it accordingly (e.g., logging an error message). Do you think this approach is feasible? > Enhance the client behaviour when it tries to exceed the > `group.share.max.groups` > - > > Key: KAFKA-19024 > URL: https://issues.apache.org/jira/browse/KAFKA-19024 > Project: Kafka > Issue Type: Sub-task >Reporter: Sanskar Jhajharia >Assignee: Lan Ding >Priority: Minor > > For share groups we use the `group.share.max.groups` config to define the > number of max share groups we allow. However, when we exceed the same, the > client logs do not specify any such error and simply do not consume. The > group doesn't get created but the client continues to send Heartbeats hoping > for one of the existing groups to shut down and allowing it to form a group. > Having a log or an exception in the client logs will help them debug such > situations accurately. -- This message was sent by Atlassian Jira (v8.20.10#820010)
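The check suggested in the KAFKA-19024 comment could look roughly like the sketch below. It is an illustration under stated assumptions, not the group coordinator's code: `ShareGroupLimitSketch`, `onHeartbeat` and `MaxShareGroupSizeReachedException` are hypothetical names (the last one mirrors the proposed `MAX_SHARE_GROUP_SIZE_REACHED` error), and the in-memory set stands in for whatever group registry the coordinator actually keeps.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the proposed heartbeat check; not actual group coordinator code.
final class ShareGroupLimitSketch {

    // Stand-in for the proposed MAX_SHARE_GROUP_SIZE_REACHED error.
    static final class MaxShareGroupSizeReachedException extends RuntimeException {
        MaxShareGroupSizeReachedException(String message) {
            super(message);
        }
    }

    private final int maxGroups;                     // value of group.share.max.groups
    private final Set<String> groupIds = new HashSet<>();

    ShareGroupLimitSketch(int maxGroups) {
        this.maxGroups = maxGroups;
    }

    // Called for each share group heartbeat. Existing groups always pass;
    // a new group is rejected once the configured limit has been reached.
    void onHeartbeat(String groupId) {
        if (groupIds.contains(groupId)) {
            return;
        }
        if (groupIds.size() >= maxGroups) {
            throw new MaxShareGroupSizeReachedException(
                "Cannot create share group " + groupId + ": limit of " + maxGroups + " groups reached");
        }
        groupIds.add(groupId);
    }

    int groupCount() {
        return groupIds.size();
    }
}
```

A client that receives such an error could log it once and stop retrying blindly, which addresses the debugging problem the ticket describes.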
[jira] [Assigned] (KAFKA-17541) Improve handling of delivery count
[ https://issues.apache.org/jira/browse/KAFKA-17541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lan Ding reassigned KAFKA-17541: Assignee: Lan Ding (was: Andrew Schofield) > Improve handling of delivery count > -- > > Key: KAFKA-17541 > URL: https://issues.apache.org/jira/browse/KAFKA-17541 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Lan Ding >Priority: Major > > There are two situations in which the delivery count handling needs to be > more intelligent. > First, for records which are automatically released as a result of closing a > share session normally, the delivery count should not be incremented. These > records were fetched but they were not actually delivered to the client since > the disposition of the delivery records is carried in the ShareAcknowledge > which closes the share session. Any remaining records were not delivered, > only fetched. > Second, for records which have a delivery count which is more than 1 or 2, > there is a suspicion that the records are not being delivered due to a > problem rather than just natural retrying. The batching of these records > should be reduced, even down to a single record at a time so we do not have > the failure to deliver a poisoned record actually causing adjacent records to > be considered unsuccessful and potentially reach the delivery count limit > without proper reason. -- This message was sent by Atlassian Jira (v8.20.10#820010)
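One possible reading of the second point in KAFKA-17541 is sketched below: the batch size shrinks as the delivery count grows past a threshold, down to a single record. The ticket does not specify a policy, so the halving rule, the `suspicionThreshold` parameter and the `DeliveryBatchSketch` class are all assumptions made for illustration.

```java
// Hypothetical batch-sizing policy; the ticket only says batching "should be reduced".
final class DeliveryBatchSketch {

    // Returns how many records to batch together for records that have already
    // been delivered deliveryCount times. Up to suspicionThreshold deliveries the
    // normal batch size is used; each further delivery halves it, never below 1.
    static int batchSizeFor(int deliveryCount, int normalBatchSize, int suspicionThreshold) {
        if (normalBatchSize < 1) {
            throw new IllegalArgumentException("normalBatchSize must be at least 1");
        }
        int size = normalBatchSize;
        for (int i = suspicionThreshold; i < deliveryCount && size > 1; i++) {
            size = Math.max(1, size / 2);
        }
        return size;
    }
}
```

With a normal batch size of 8 and a threshold of 2, this gives 8 records for the first two deliveries, 4 for the third, and a single record from the fifth delivery on, so a poisoned record ends up isolated from its neighbours.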
[jira] [Updated] (KAFKA-19048) Minimal Movement Replica Balancing algorithm
[ https://issues.apache.org/jira/browse/KAFKA-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jialun Peng updated KAFKA-19048: Description: h2. Motivation Kafka clusters typically require rebalancing of topic replicas after horizontal scaling to evenly distribute the load across new and existing brokers. The current rebalancing approach does not consider the existing replica distribution, often resulting in excessive and unnecessary replica movements. These unnecessary movements increase rebalance duration, consume significant bandwidth and CPU resources, and potentially disrupt ongoing production and consumption operations. Thus, a replica rebalancing strategy that minimizes movements while achieving an even distribution of replicas is necessary. h2. Goals The proposed approach prioritizes the following objectives: # {*}Minimal Movement{*}: Minimize the number of replica relocations during rebalancing. # {*}Replica Balancing{*}: Ensure that replicas are evenly distributed across brokers. # {*}Anti-Affinity Support{*}: Support rack-aware allocation when enabled. # {*}Leader Balancing{*}: Distribute leader replicas evenly across brokers. # {*}ISR Order Optimization{*}: Optimize adjacency relationships to prevent failover traffic concentration in case of broker failures. h2. Proposed Changes h3. Rack-Level Replica Distribution The following rules ensure balanced replica allocation at the rack level: # *When* {{{}*rackCount = replicationFactor*{}}}: * Each rack receives exactly {{partitionCount}} replicas. * *2. *When* {{{}*rackCount > replicationFactor*{}}}: * If weighted allocation {{{}(rackBrokers/totalBrokers × totalReplicas) ≥ partitionCount{}}}: each rack receives exactly {{partitionCount}} replicas. * If weighted allocation {{{}< partitionCount{}}}: distribute remaining replicas using a weighted remainder allocation. h3. 
Node-Level Replica Distribution # If the number of replicas assigned to a rack is not a multiple of the number of nodes in that rack, some nodes will host one additional replica compared to others. # *When* {{{}*rackCount = replicationFactor*{}}}: * If all racks have an equal number of nodes, each node will host an equal number of replicas. * If rack sizes vary, nodes in larger racks will host fewer replicas on average. * *3. *When* {{{}*rackCount > replicationFactor*{}}}: * If no rack has a significantly higher node weight, replicas will be evenly distributed. * If a rack has disproportionately high node weight, those nodes will receive fewer replicas. h3. Anti-Affinity Support When anti-affinity is enabled, the rebalance algorithm ensures that replicas of the same partition do not colocate on the same rack. Brokers without rack configuration are excluded from anti-affinity checks. In this way we can unify the implementation logic of rack-aware and non-rack-aware. *Replica Balancing* *Algorithm* Through the above steps, we can calculate the ideal replica count for each node and rack. Based on the initial replica distribution of topics, we obtain the current replica partition allocation across nodes and racks, allowing us to identify which nodes violate anti-affinity rules. We iterate through nodes with the following priority: # First process nodes that violate anti-affinity rules # Then process nodes whose current replica count exceeds the desired replica count (prioritizing those with the largest discrepancy) For these identified nodes, we relocate their replicas to target nodes that: * Satisfy all anti-affinity constraints * Have a current replica count below their ideal allocation This process continues iteratively until: * No nodes violate anti-affinity rules * All nodes' current replica counts match their desired replica counts Upon satisfying these conditions, we achieve balanced replica distribution across nodes. 
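The iterative relocation described above can be sketched as follows. This is a simplified illustration, not the proposed implementation: it assumes the per-broker targets are already computed, ignores racks, and reduces anti-affinity to "a broker never hosts two replicas of the same partition". The names `MinimalMovementSketch` and `rebalance` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MinimalMovementSketch {
    // Moves replicas from brokers above their target to brokers below it.
    // assignment maps broker id to the partitions it hosts; target maps broker id
    // to its desired replica count. Returns the number of replica moves performed.
    static int rebalance(Map<Integer, Set<String>> assignment, Map<Integer, Integer> target) {
        int moves = 0;
        boolean moved = true;
        while (moved) {
            moved = false;
            // Candidate sources: brokers above target, largest surplus first.
            List<Integer> sources = new ArrayList<>();
            for (Integer broker : assignment.keySet()) {
                if (assignment.get(broker).size() > target.get(broker)) {
                    sources.add(broker);
                }
            }
            sources.sort(Comparator.comparingInt(
                (Integer b) -> target.get(b) - assignment.get(b).size()));
            search:
            for (Integer src : sources) {
                for (String partition : new ArrayList<>(assignment.get(src))) {
                    for (Integer dst : assignment.keySet()) {
                        boolean belowTarget = assignment.get(dst).size() < target.get(dst);
                        // Simplified constraint: never place two replicas of a partition on one broker.
                        if (belowTarget && !assignment.get(dst).contains(partition)) {
                            assignment.get(src).remove(partition);
                            assignment.get(dst).add(partition);
                            moves++;
                            moved = true;
                            break search; // recompute surpluses after every single move
                        }
                    }
                }
            }
        }
        return moves;
    }
}
```

Every move lowers the total surplus by one, so the loop terminates; it also stops early if no legal move remains, in which case some brokers may stay above target.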
*Leader* *Balancing* *Algorithm*

*Target Leader Calculation:*

Compute baseline average: {{leader_avg = total_partitions / total_nodes}}

Identify broker where {{replica_count ≤ leader_avg}}:
 * Designate all replicas as leaders on these brokers
 * Subtract allocated leaders: {{remaining_partitions -= assigned_leaders}}
 * Exclude nodes: {{remaining_brokers -= processed_brokers}}

Iteratively recalculate {{leader_avg}} until minimum replica nodes satisfy {{replica_count ≥ leader_avg}}

*Leader Assignment Constraints:*

Final targets:
 * Light {{brokers}}: {{target_leaders = replica_count}}
 * Normal {{brokers}}: {{target_leaders = leader_avg}}

For each partition, select the {{broker}} with the largest difference between its {{target_leaders}} and current leader count to become that partition's leader. Upon completing this traversal, we achieve uniform leader distribution across all brokers.

*Optimizing ISR Order*

During Leader Rebalancing, the
Re: [PR] KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state [kafka]
chia7712 commented on PR #19223:
URL: https://github.com/apache/kafka/pull/19223#issuecomment-2752123826

I tried to cherry-pick https://github.com/apache/kafka/commit/4a8a0637e07734779b40ba9785842311144f922c to 4.0, but there are some conflicts. @jsancio do you have free cycles to file a PR to cherry-pick it to 4.0?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-9904) Use ThreadLocalConcurrent to Replace Random
[ https://issues.apache.org/jira/browse/KAFKA-9904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lorcan reassigned KAFKA-9904:
-----------------------------

    Assignee: Lorcan

> Use ThreadLocalConcurrent to Replace Random
> -------------------------------------------
>
>                 Key: KAFKA-9904
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9904
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: David Mollitor
>            Assignee: Lorcan
>            Priority: Trivial
>
> https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadLocalRandom.html

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
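The ticket only links to the `ThreadLocalRandom` javadoc, so here is a minimal sketch of the kind of replacement it appears to have in mind. The class `RandomBackoff` and its jitter methods are hypothetical examples, not Kafka code.

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

class RandomBackoff {
    // A shared Random is thread-safe, but all callers contend on the same seed.
    private static final Random SHARED = new Random();

    // Before: jitter in [0, maxMs) drawn from the shared instance.
    static long sharedJitterMs(long maxMs) {
        return (long) (SHARED.nextDouble() * maxMs);
    }

    // After: each thread draws from its own generator, so there is no contention.
    // Call current() at the point of use; do not cache the result in a shared field.
    static long threadLocalJitterMs(long maxMs) {
        return ThreadLocalRandom.current().nextLong(maxMs);
    }

    public static void main(String[] args) {
        System.out.println("jitter ms: " + threadLocalJitterMs(100));
    }
}
```

`ThreadLocalRandom` cannot be seeded, so call sites that rely on a fixed seed for reproducible tests would need to keep `Random`.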
[jira] [Commented] (KAFKA-14485) Move LogCleaner to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938848#comment-17938848 ]

Chia-Ping Tsai commented on KAFKA-14485:
----------------------------------------

[~javakillah] that is a good question. We have different ways to handle dynamic config in the code base:

1. using the self-updated KafkaConfig, like `ReplicaManager`
2. using the reconfigurable interface to refresh the inner config, like `LogCleaner`

Personally, I don't like the self-updated KafkaConfig, since it creates complicated dependencies and a god object.

Going back to your question, I would prefer to try the following changes:

1. move the cleaner-related getters from KafkaConfig to `CleanerConfig`
2. temporarily allow `KafkaConfig` to create `CleanerConfig` to access the getters for some configs
3. let the `CleanerConfig` constructor take `AbstractConfig` to initialize all variables

To make the above changes, we can move `org.apache.kafka.server.config.BrokerReconfigurable` to the server-common module and change "AbstractKafkaConfig" to "AbstractConfig". Of course, we need to do a bit of refactoring for `DynamicProducerStateManagerConfig`.

> Move LogCleaner to storage module
> ---------------------------------
>
>                 Key: KAFKA-14485
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14485
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ismael Juma
>            Assignee: Dmitry Werner
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
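To illustrate the second style mentioned in the comment (a component that refreshes its own inner config through a reconfigurable interface), here is a self-contained sketch. The `Reconfigurable` interface and `Cleaner` class below are hypothetical stand-ins written for this example; they are not Kafka's `BrokerReconfigurable` or `LogCleaner`, and the method names are assumptions.

```java
import java.util.Map;
import java.util.Set;

class ReconfigurableSketch {
    // Hypothetical interface: the component declares which keys it reacts to,
    // validates a proposed change, and then applies it to its own state.
    interface Reconfigurable {
        Set<String> reconfigurableConfigs();
        void validateReconfiguration(Map<String, String> newConfig);
        void reconfigure(Map<String, String> newConfig);
    }

    // Hypothetical component that owns its small config instead of reading a global object.
    static final class Cleaner implements Reconfigurable {
        static final String THREADS = "log.cleaner.threads";
        private volatile int threads;

        Cleaner(int threads) {
            this.threads = threads;
        }

        int threads() {
            return threads;
        }

        @Override
        public Set<String> reconfigurableConfigs() {
            return Set.of(THREADS);
        }

        @Override
        public void validateReconfiguration(Map<String, String> newConfig) {
            String raw = newConfig.get(THREADS);
            if (raw != null && Integer.parseInt(raw) < 1) {
                throw new IllegalArgumentException(THREADS + " must be at least 1");
            }
        }

        @Override
        public void reconfigure(Map<String, String> newConfig) {
            validateReconfiguration(newConfig); // reject bad values before touching state
            String raw = newConfig.get(THREADS);
            if (raw != null) {
                threads = Integer.parseInt(raw);
            }
        }
    }
}
```

The point of the pattern is that the component depends only on the handful of values it needs, which is what makes moving it to another module easier.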
Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
chia7712 commented on code in PR #19295: URL: https://github.com/apache/kafka/pull/19295#discussion_r2016068726 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java: ## @@ -644,8 +645,12 @@ private ShareFetch collect(Map ack if (currentFetch.isEmpty()) { final ShareFetch fetch = fetchCollector.collect(fetchBuffer); if (fetch.isEmpty()) { +// Check for any acknowledgements which could have come from control records (GAP) and include them. +Map combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap); + combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords()); + // Fetch more records and send any waiting acknowledgements -applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap)); +applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements)); Review Comment: Excuse me, is it possible that `fetch.takeAcknowledgedRecords()` returns non empty records when `fetch.isEmpty` is true? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19036: Rewrite LogAppendTimeTest and move it to storage module [kafka]
FrankYang0529 commented on PR #19282: URL: https://github.com/apache/kafka/pull/19282#issuecomment-2757324418 @chia7712 Thanks for the review. I addressed all comments and CI passes.
Re: [PR] KAFKA-19004:Move DelayedDeleteRecords to server-common module [kafka]
chia7712 commented on code in PR #19226: URL: https://github.com/apache/kafka/pull/19226#discussion_r2016093160 ## server-common/src/main/java/org/apache/kafka/server/common/DelayedDeleteRecords.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; Review Comment: +1 to `org.apache.kafka.server.purgatory` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19032: Remove TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames [kafka]
FrankYang0529 commented on PR #19270: URL: https://github.com/apache/kafka/pull/19270#issuecomment-2757323381 @chia7712 Thanks for the reminder. I fixed the conflicts and CI passes.
[jira] [Assigned] (KAFKA-18902) Implement ShareConsumer option to throw on poll if there are unacked records
[ https://issues.apache.org/jira/browse/KAFKA-18902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrew Schofield reassigned KAFKA-18902:
----------------------------------------

    Assignee: Andrew Schofield  (was: Shivsundar R)

> Implement ShareConsumer option to throw on poll if there are unacked records
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-18902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18902
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Andrew Schofield
>            Assignee: Andrew Schofield
>            Priority: Major
>             Fix For: 4.1.0
>
> Currently, an application using explicit acknowledgement which neglects to acknowledge all of the records received from calling `poll(Duration)` is re-presented with the records on the next call to poll. This has been shown to be confusing.
> An option will be added, `share.acknowledgement.mode=explicit` to throw an exception on poll instead.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
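As a rough sketch of how an application might opt in once the option exists, the configuration could be assembled like this. Only the `share.acknowledgement.mode=explicit` key and value come from the ticket; the class name, the bootstrap address and the group id are placeholders, and whether a given client version accepts the key is an assumption.

```java
import java.util.Properties;

class ShareAckModeConfigSketch {
    // Builds consumer properties that request explicit acknowledgement mode.
    static Properties explicitAckProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");    // placeholder address
        props.setProperty("group.id", "example-share-group");        // placeholder group id
        props.setProperty("share.acknowledgement.mode", "explicit"); // option named in the ticket
        return props;
    }

    public static void main(String[] args) {
        explicitAckProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With this mode, the ticket's intent is that a `poll` issued while earlier records are still unacknowledged fails with an exception rather than handing the same records back.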
Re: [PR] KAFKA-19049: Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base [kafka]
Rancho-7 commented on PR #19299:
URL: https://github.com/apache/kafka/pull/19299#issuecomment-2757438387

> @Rancho-7 please cleanup `SaslApiVersionsRequestTest` and `StaticBrokerConfigTest` too

Thanks for pointing that out! Will fix it soon.
Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]
TaiJuWu commented on code in PR #19277: URL: https://github.com/apache/kafka/pull/19277#discussion_r2016120749 ## storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BrokerCompressionTest { +private final File tmpDir = TestUtils.tempDirectory(); +private final File logDir = TestUtils.randomPartitionLogDir(tmpDir); +private final MockTime time = new MockTime(0, 0); + +@AfterEach +public void tearDown() throws IOException { +Utils.delete(tmpDir); +} + +/** + * Test broker-side compression configuration + */ +@ParameterizedTest +@MethodSource("allCompressionParameters") +public void testBrokerSideCompression(CompressionType messageCompressionType, BrokerCompressionType brokerCompressionType) throws IOException { +Compression messageCompression = Compression.of(messageCompressionType).build(); + +/* Configure broker-side compression */ +UnifiedLog log = UnifiedLog.create( Review Comment: Thanks for catching it. 
Now using try-with-resources to close the log.
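For readers unfamiliar with the pattern mentioned in this reply, a minimal try-with-resources sketch follows. `FakeLog` is a hypothetical stand-in for any `AutoCloseable` resource; it is not `UnifiedLog`, and the test in the PR may be structured differently.

```java
class TryWithResourcesSketch {
    // Hypothetical resource that tracks whether it has been closed.
    static final class FakeLog implements AutoCloseable {
        boolean closed = false;
        int appended = 0;

        void append(String record) {
            if (record == null) {
                throw new IllegalArgumentException("record must not be null");
            }
            appended++;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // The resource is closed when the try block exits, whether it finishes
    // normally or because append() threw.
    static void appendAll(FakeLog target, String... records) {
        try (FakeLog log = target) {
            for (String record : records) {
                log.append(record);
            }
        }
    }
}
```

In a test this removes the need for a separate cleanup step for the resource, since closing no longer depends on the test body completing normally.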
Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR commented on code in PR #19295: URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java: ## @@ -644,8 +645,12 @@ private ShareFetch collect(Map ack if (currentFetch.isEmpty()) { final ShareFetch fetch = fetchCollector.collect(fetchBuffer); if (fetch.isEmpty()) { +// Check for any acknowledgements which could have come from control records (GAP) and include them. +Map combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap); + combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords()); + // Fetch more records and send any waiting acknowledgements -applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap)); +applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements)); Review Comment: Yes :)) turns out it can. - Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 revealed that in transactions, when client receives only a control record(eg. an abort marker) in the `ShareFetchResponse` (without any non-control record), then in the `ShareCompletedFetch`, these control records are acknowledged with GAP (indicating the client is ignoring these control records) and are never presented to the consumer application. - Now these control records are auto acknowledged with `GAP` and will be sent on the next `ShareFetch`/`ShareAcknowledge` request. But as `fetch.isEmpty()` only checks for `numRecords() == 0`, when the fetch is empty, we actually ignore the fetch here(meaning we never acknowledge these control records) - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598 - Now for this PR, we have added any possible acknowledgements that came in with the empty fetch (from control records) to the `ShareFetchEvent` so that it can be sent on the next poll(). - I agree it looks a bit odd though for readability. 
But yeah there is a case when this could happen.
Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR commented on code in PR #19295: URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java: ## @@ -644,8 +645,12 @@ private ShareFetch collect(Map ack if (currentFetch.isEmpty()) { final ShareFetch fetch = fetchCollector.collect(fetchBuffer); if (fetch.isEmpty()) { +// Check for any acknowledgements which could have come from control records (GAP) and include them. +Map combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap); + combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords()); + // Fetch more records and send any waiting acknowledgements -applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap)); +applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements)); Review Comment: Yes :)) turns out it can. - Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 revealed that in transactions, when client receives only a control record(eg. an abort marker) in the `ShareFetchResponse` (without any non-control record), then in the `ShareCompletedFetch`, these control records are never acknowledged(ideally acknowledged with GAP, indicating the client is ignoring these control records) and are never presented to the consumer application. - It is expected that control records are skipped and are not presented to the application, but client should still acknowledge them with GAP (https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33) - Now these control records are usually auto acknowledged with `GAP` and will be sent on the next `ShareFetch`/`ShareAcknowledge` request. 
But here as `fetch.isEmpty()` only checks for `numRecords() == 0`, when the fetch is empty, we actually ignore the fetch here(meaning we never acknowledge these control records) - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598 - Now for this PR, we have added any possible acknowledgements that came in with the empty fetch (from control records) to the `ShareFetchEvent` so that it can be sent on the next poll(). - I agree it looks a bit odd though for readability. But yeah there is a case when this could happen. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Some cleanups in group coordinator's intergration tests [kafka]
dajac commented on PR #19281:
URL: https://github.com/apache/kafka/pull/19281#issuecomment-2757464241

> nit: It appears `ShareGroupHeartbeatRequestTest` also requires cleanup. However, it's acceptable to leave it as is - or we can fix it in the follow-up

Removed it and a few others related to Share requests.
Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR commented on code in PR #19295: URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java: ## @@ -644,8 +645,12 @@ private ShareFetch collect(Map ack if (currentFetch.isEmpty()) { final ShareFetch fetch = fetchCollector.collect(fetchBuffer); if (fetch.isEmpty()) { +// Check for any acknowledgements which could have come from control records (GAP) and include them. +Map combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap); + combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords()); + // Fetch more records and send any waiting acknowledgements -applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap)); +applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements)); Review Comment: Yes :)) turns out it can. - Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 revealed that in transactions, when client receives only a control record(eg. an abort marker) in the `ShareFetchResponse` (without any non-control record), then in the `ShareCompletedFetch`, these control records are never acknowledged(ideally acknowledged with GAP, indicating the client is ignoring these control records) and are never presented to the consumer application. - It is expected that control records are skipped and are not presented to the application, but client should still acknowledge them with GAP (https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33) - Now these control records are usually auto acknowledged with `GAP` and will be sent on the next `ShareFetch`/`ShareAcknowledge` request. 
But here as `fetch.isEmpty()` only checks for `numRecords() == 0`, when the fetch is empty, we actually ignore the fetch here(meaning we never acknowledge these control records) - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598 - Now after this PR, any possible acknowledgements that came in with the empty fetch (from control records) to the `ShareFetchEvent` are added so that it can be sent on the next `poll()`. - We cannot present these to the application, so the check for `fetch.isEmpty` cannot be altered. But yeah there is a case when this could happen. I agree it looks a bit odd though for readability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR commented on code in PR #19295: URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java: ## @@ -644,8 +645,12 @@ private ShareFetch collect(Map ack if (currentFetch.isEmpty()) { final ShareFetch fetch = fetchCollector.collect(fetchBuffer); if (fetch.isEmpty()) { +// Check for any acknowledgements which could have come from control records (GAP) and include them. +Map combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap); + combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords()); + // Fetch more records and send any waiting acknowledgements -applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap)); +applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements)); Review Comment: Kind of :)) So it would have empty records but could have non-empty acknowledgements (for skipped records). - Some integ tests in this PR - https://github.com/apache/kafka/pull/19261 revealed that in transactions, when client receives only a control record(eg. an abort marker) in the `ShareFetchResponse` (without any non-control record), then in the `ShareCompletedFetch`, these control records are never acknowledged(ideally acknowledged with GAP, indicating the client is ignoring these control records) and are never presented to the consumer application. - It is expected that control records are skipped and are not presented to the application, so the records never arrive to the application thread, but client should still acknowledge them with GAP (https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33) - Now these control records are usually auto acknowledged with `GAP` and will be sent on the next `ShareFetch`/`ShareAcknowledge` request. 
But here, as `fetch.isEmpty()` only checks for `numRecords() == 0`, when the fetch is empty we actually ignore the fetch (meaning we never acknowledge these control records) - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598 - Now after this PR, any acknowledgements that came in with the empty fetch (from control records) are added to the `ShareFetchEvent` so that they can be sent on the next `poll()`. - We cannot present these to the application, so the check for `fetch.isEmpty` cannot be altered. But yeah, there is a case when this could happen. I agree it looks a bit odd for readability, though.
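The situation described in this reply, a fetch with zero records that still carries acknowledgements, can be modelled in a few lines. Everything below is a hypothetical simplification for illustration: `FakeFetch` and `acksToSend` are not the real `ShareFetch` or `ShareConsumerImpl` code, and acknowledgements are reduced to plain strings.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class EmptyFetchAckSketch {
    // Hypothetical stand-in for a share fetch: records visible to the application,
    // plus acknowledgements generated internally for skipped control records.
    static final class FakeFetch {
        private final List<String> records;
        private Map<String, String> acks;

        FakeFetch(List<String> records, Map<String, String> acks) {
            this.records = records;
            this.acks = new LinkedHashMap<>(acks);
        }

        // Only counts records, so it is true even when acknowledgements are pending.
        boolean isEmpty() {
            return records.isEmpty();
        }

        // Hands over the pending acknowledgements and clears them.
        Map<String, String> takeAcknowledgedRecords() {
            Map<String, String> taken = acks;
            acks = new LinkedHashMap<>();
            return taken;
        }
    }

    // Merges acknowledgements already waiting with any carried by an empty fetch.
    static Map<String, String> acksToSend(Map<String, String> pending, FakeFetch fetch) {
        Map<String, String> combined = new LinkedHashMap<>(pending);
        if (fetch.isEmpty()) {
            combined.putAll(fetch.takeAcknowledgedRecords());
        }
        return combined;
    }
}
```

In this model, a fetch containing only an abort marker yields no records for the application, yet its GAP acknowledgement still ends up in the map that goes out with the next request.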
Re: [PR] MINOR: Some cleanups in group coordinator's intergration tests [kafka]
dajac commented on code in PR #19281: URL: https://github.com/apache/kafka/pull/19281#discussion_r2015985621 ## core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala: ## @@ -37,15 +37,17 @@ import java.lang.{Byte => JByte} import java.util.Collections import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1) +@ClusterTestDefaults( + types = Array(Type.KRAFT), + brokers = 1, + serverProperties = Array( +new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) +) Review Comment: Interesting. I was not aware of this. Let me remove them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-19042: [4/N] Move PlaintextConsumerCallbackTest to client-integration-tests module [kafka]
frankvicky commented on code in PR #19298: URL: https://github.com/apache/kafka/pull/19298#discussion_r2015873494 ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java: ## @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.callback; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG; +import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC; +import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ClusterTestDefaults( +types = {Type.KRAFT}, +brokers = 3 +) +@ExtendWith(ClusterTestExtensions.class) +public class PlaintextConsumerCallbackTest { + +private final ClusterInstance cluster; +private final String topic = "topic"; +private final 
TopicPartition tp = new TopicPartition(topic, 0); + +public PlaintextConsumerCallbackTest(ClusterInstance clusterInstance) { +this.cluster = clusterInstance; +} + +@ClusterTest +public void testConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { +try (var consumer = createConsumer(CLASSIC)) { +triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { +var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); +assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); +}); +} +} + +@ClusterTest +public void testAsyncConsumerRebalanceListenerAssignOnPartitionsAssigned() throws InterruptedException { +try (var consumer = createConsumer(CONSUMER)) { +triggerOnPartitionsAssigned(tp, consumer, (executeConsumer, partitions) -> { +var e = assertThrows(IllegalStateException.class, () -> executeConsumer.assign(List.of(tp))); +assertEquals("Subscription to topics, partitions and pattern are mutually exclusive", e.getMessage()); +}); +} +} + +@ClusterTest +public void testConsumerRebalanceListenerAssignmentOnPartitionsAssigned() throws InterruptedException { Review Comment: You have `testAsyncConsumer...` above, so I suggest naming this one `testClassicConsumer...` as well. ## clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/callback/PlaintextConsumerCallbackTest.java: ## @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + *
Re: [PR] MINOR: migrate BrokerCompressionTest to storage module [kafka]
chia7712 commented on code in PR #19277: URL: https://github.com/apache/kafka/pull/19277#discussion_r2015807142 ## storage/src/test/java/org/apache/kafka/storage/internals/log/BrokerCompressionTest.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.storage.internals.log; + +import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.SimpleRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.coordinator.transaction.TransactionLogConfig; +import org.apache.kafka.server.record.BrokerCompressionType; +import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.util.MockTime; +import org.apache.kafka.storage.log.metrics.BrokerTopicStats; +import org.apache.kafka.test.TestUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class BrokerCompressionTest { +private final File tmpDir = TestUtils.tempDirectory(); +private final File logDir = TestUtils.randomPartitionLogDir(tmpDir); +private final MockTime time = new MockTime(0, 0); + +@AfterEach +public void tearDown() throws IOException { +Utils.delete(tmpDir); +} + +/** + * Test broker-side compression configuration + */ +@ParameterizedTest +@MethodSource("allCompressionParameters") +public void testBrokerSideCompression(CompressionType messageCompressionType, BrokerCompressionType brokerCompressionType) throws IOException { +Compression messageCompression = Compression.of(messageCompressionType).build(); + +/* Configure broker-side compression */ +UnifiedLog log = UnifiedLog.create( Review Comment: Should we close the `log`? 
Re: [PR] MINOR: Some cleanups in group coordinator's intergration tests [kafka]
dajac commented on code in PR #19281: URL: https://github.com/apache/kafka/pull/19281#discussion_r2015820533 ## core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala: ## @@ -37,15 +37,17 @@ import java.lang.{Byte => JByte} import java.util.Collections import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1) +@ClusterTestDefaults( + types = Array(Type.KRAFT), + brokers = 1, + serverProperties = Array( +new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) +) Review Comment: Totally. Missed that one.
[jira] [Created] (KAFKA-19049) Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base
Chia-Ping Tsai created KAFKA-19049: -- Summary: Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base Key: KAFKA-19049 URL: https://issues.apache.org/jira/browse/KAFKA-19049 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai KAFKA-18671 introduced the mechanism to inject the cluster test at runtime, so the integration tests don't need to use @ExtendWith(ClusterTestExtensions.class) any more -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-19036: Rewrite LogAppendTimeTest and move it to storage module [kafka]
chia7712 commented on code in PR #19282: URL: https://github.com/apache/kafka/pull/19282#discussion_r2015816441 ## storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(ClusterTestExtensions.class) Review Comment: this is unnecessary now. see https://issues.apache.org/jira/browse/KAFKA-19049 ## storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.common.test.junit.ClusterTestExtensions; + +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(ClusterTestExtensions.class) +public class LogAppendTimeTest { +@ClusterTest( +types = {Type.KRAFT}, +brokers = 2, Review Comment: it seems we don't need to create 2 broke
Re: [PR] KAFKA-14486: Move LogCleanerManager to storage module [kafka]
wernerdv commented on PR #19216: URL: https://github.com/apache/kafka/pull/19216#issuecomment-2756859134 @chia7712 @junrao @mimaison @frankvicky Thanks for the review!
Re: [PR] MINOR: Some cleanups in group coordinator's intergration tests [kafka]
chia7712 commented on code in PR #19281: URL: https://github.com/apache/kafka/pull/19281#discussion_r2015826951 ## core/src/test/scala/unit/kafka/server/ConsumerGroupDescribeRequestTest.scala: ## @@ -37,15 +37,17 @@ import java.lang.{Byte => JByte} import java.util.Collections import scala.jdk.CollectionConverters._ -@ClusterTestDefaults(types = Array(Type.KRAFT), brokers = 1) +@ClusterTestDefaults( + types = Array(Type.KRAFT), + brokers = 1, + serverProperties = Array( +new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"), +new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1") + ) +) Review Comment: We don't need to add `@Tag("integration")` to the class, as `@ClusterTest` automatically adds `@Tag("integration")` to the test case (method).
[jira] [Assigned] (KAFKA-19049) Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base
[ https://issues.apache.org/jira/browse/KAFKA-19049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nick Guo reassigned KAFKA-19049: Assignee: Nick Guo (was: Chia-Ping Tsai) > Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base > > > Key: KAFKA-19049 > URL: https://issues.apache.org/jira/browse/KAFKA-19049 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Nick Guo >Priority: Minor > > KAFKA-18671 introduced the mechanism to inject the cluster test at runtime, > so the integration tests don't need to use > @ExtendWith(ClusterTestExtensions.class) any more
[jira] [Commented] (KAFKA-19049) Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base
[ https://issues.apache.org/jira/browse/KAFKA-19049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17938828#comment-17938828 ] Nick Guo commented on KAFKA-19049: -- Hi [~chia7712], I would like to take this issue. Thanks! > Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base > > > Key: KAFKA-19049 > URL: https://issues.apache.org/jira/browse/KAFKA-19049 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > KAFKA-18671 introduced the mechanism to inject the cluster test at runtime, > so the integration tests don't need to use > @ExtendWith(ClusterTestExtensions.class) any more
Re: [PR] KAFKA-18067: Add a flag to disable producer reset during active task creator shutting down [kafka]
frankvicky commented on PR #19269: URL: https://github.com/apache/kafka/pull/19269#issuecomment-2757028923 Hi @ableegoldman, > can you look into adding a test to make sure that the producer does still get reset/recreated properly if the producer is reset (eg transaction hits a timeout exception) but the StreamThread is not shutting down? I've investigated the related code and found writing a test for this specific scenario challenging. The main difficulty is that `TaskManager` doesn't control the timeout logic. While `TaskManager#handleLostAll` invokes `ActiveTaskCreator#reInitializeProducer`, this method only triggers when a `TaskMigratedException` occurs. The entire process is managed by `StreamThread`. The main pain point in writing this test lies in the deeply nested component structure: `StreamsProducer` is a member of `ActiveTaskCreator`, which is a member of `TaskManager`, which is ultimately controlled by `StreamThread`. When using mocks, this multi-layered nesting makes testing extremely complex. We typically mock outer components, but this makes it difficult to directly access and verify the reset behavior of the inner `StreamsProducer`.
[PR] KAFKA-19049: Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base [kafka]
Rancho-7 opened a new pull request, #19299: URL: https://github.com/apache/kafka/pull/19299 jira: https://issues.apache.org/jira/browse/KAFKA-19049 [KAFKA-18671](https://issues.apache.org/jira/browse/KAFKA-18671) introduced the mechanism to inject the cluster test at runtime, so the integration tests don't need to use `@ExtendWith(ClusterTestExtensions.class)` any more.
Re: [PR] MINOR: Revert migrate LogFetchInfo, Assignment and RequestAndCompletionHandler to java record [kafka]
chia7712 commented on code in PR #19177: URL: https://github.com/apache/kafka/pull/19177#discussion_r2016096742 ## server/src/main/java/org/apache/kafka/server/Assignment.java: ## @@ -24,25 +24,62 @@ import org.apache.kafka.metadata.Replicas; import org.apache.kafka.server.common.TopicIdPartition; -/** - * @param topicIdPartition The topic ID and partition index of the replica. - * @param directoryId The ID of the directory we are placing the replica into. - * @param submissionTimeNs The time in monotonic nanosecond when this assignment was created. - * @param successCallback The callback to invoke on success. - */ -record Assignment( -TopicIdPartition topicIdPartition, -Uuid directoryId, -long submissionTimeNs, -Runnable successCallback -) { +final class Assignment { Review Comment: ditto ## raft/src/main/java/org/apache/kafka/raft/LogFetchInfo.java: ## @@ -21,4 +21,13 @@ /** * Metadata for the records fetched from log, including the records itself */ -public record LogFetchInfo(Records records, LogOffsetMetadata startOffsetMetadata) { } +public class LogFetchInfo { Review Comment: Could you please leave a comment explaining why we don't use a record class?
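For readers unfamiliar with what reverting a record to a plain class involves, here is a minimal sketch. It does not reproduce the PR's code and makes no claim about why the revert was made. `FetchInfoRecord` and `FetchInfoClass` are hypothetical stand-ins with simplified field types; the point is only that a hand-written class must spell out the accessors, `equals` and `hashCode` that a record generates.

```java
import java.util.Objects;

// Hypothetical illustration only; these types are not the Kafka classes from the diff.
class RecordVersusClassSketch {
    // A record generates the constructor, accessors, equals, hashCode and toString.
    record FetchInfoRecord(String records, long startOffset) { }

    // Hand-written equivalent: each member the record provides is written explicitly.
    static final class FetchInfoClass {
        private final String records;
        private final long startOffset;

        FetchInfoClass(String records, long startOffset) {
            this.records = records;
            this.startOffset = startOffset;
        }

        String records() { return records; }

        long startOffset() { return startOffset; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof FetchInfoClass)) return false;
            FetchInfoClass other = (FetchInfoClass) o;
            return startOffset == other.startOffset && Objects.equals(records, other.records);
        }

        @Override
        public int hashCode() {
            return Objects.hash(records, startOffset);
        }
    }

    public static void main(String[] args) {
        // Both forms compare by value once equals/hashCode are in place.
        System.out.println(new FetchInfoRecord("batch", 5L).equals(new FetchInfoRecord("batch", 5L)));
        System.out.println(new FetchInfoClass("batch", 5L).equals(new FetchInfoClass("batch", 5L)));
    }
}
```

If a hand-written class omits `equals` and `hashCode`, instances fall back to identity comparison, which is one behavioral difference a code comment on the reverted classes could call out.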
[jira] [Resolved] (KAFKA-17830) Cover unit tests for TBRLMM init failure scenarios
[ https://issues.apache.org/jira/browse/KAFKA-17830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17830. Fix Version/s: 4.1.0 Assignee: PoAn Yang (was: Anshul Goyal) Resolution: Fixed > Cover unit tests for TBRLMM init failure scenarios > -- > > Key: KAFKA-17830 > URL: https://issues.apache.org/jira/browse/KAFKA-17830 > Project: Kafka > Issue Type: Task >Reporter: Kamal Chandraprakash >Assignee: PoAn Yang >Priority: Minor > Labels: new-bie > Fix For: 4.1.0 > > > [TopicBasedRemoteLogMetadataManagerTest|https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java] > does not cover initialization failure scenarios, it will be good to cover > those cases with unit tests. > See: [https://github.com/apache/kafka/pull/17492#issuecomment-2422144959]
Re: [PR] MINOR: Add check in ShareConsumerImpl to send acknowledgements of control records when ShareFetch is empty. [kafka]
ShivsundarR commented on code in PR #19295: URL: https://github.com/apache/kafka/pull/19295#discussion_r2016129362 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java: ## @@ -644,8 +645,12 @@ private ShareFetch collect(Map ack if (currentFetch.isEmpty()) { final ShareFetch fetch = fetchCollector.collect(fetchBuffer); if (fetch.isEmpty()) { +// Check for any acknowledgements which could have come from control records (GAP) and include them. +Map combinedAcknowledgements = new LinkedHashMap<>(acknowledgementsMap); + combinedAcknowledgements.putAll(fetch.takeAcknowledgedRecords()); + // Fetch more records and send any waiting acknowledgements -applicationEventHandler.add(new ShareFetchEvent(acknowledgementsMap)); +applicationEventHandler.add(new ShareFetchEvent(combinedAcknowledgements)); Review Comment: Yes :)) turns out it can. - Some integration tests in this PR - https://github.com/apache/kafka/pull/19261 revealed that in transactions, when the client receives only a control record (e.g. an abort marker) in the `ShareFetchResponse` (without any non-control record), then in the `ShareCompletedFetch` these control records are never acknowledged (ideally they are acknowledged with GAP, indicating that the client is ignoring them) and are never presented to the consumer application. - It is expected that control records are skipped and not presented to the application, but the client should still acknowledge them with GAP (https://github.com/apache/kafka/blob/84b8fec089682486aa9827a3baffa2513118ce6d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Acknowledgements.java#L33) - These control records are usually auto-acknowledged with `GAP` and sent on the next `ShareFetch`/`ShareAcknowledge` request.
But because `fetch.isEmpty()` only checks for `numRecords() == 0`, an empty fetch is ignored here, meaning these control records are never acknowledged - https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java#L598 - After this PR, any acknowledgements that came in with the empty fetch (from control records) are added to the `ShareFetchEvent` so that they can be sent on the next `poll()`. - I agree it looks a bit odd for readability, but there is a case where this can happen.
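The empty-fetch path described in this comment can be sketched as a stand-alone model. This is a simplified illustration, not the `ShareConsumerImpl` code: `Fetch`, `AckType` and `combine` are hypothetical stand-ins, and plain strings take the place of topic-partition keys. Only the shape of the merge (copy the application's acknowledgements, then `putAll` whatever the empty fetch carried) follows the diff above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the empty-fetch path; all types here are stand-ins, not Kafka classes.
class EmptyFetchAckSketch {
    enum AckType { ACCEPT, GAP }

    /** Stand-in for a fetch with no user records that may still carry GAP acks for control records. */
    static final class Fetch {
        private final int numRecords;
        private Map<String, List<AckType>> pendingAcks;

        Fetch(int numRecords, Map<String, List<AckType>> pendingAcks) {
            this.numRecords = numRecords;
            this.pendingAcks = new LinkedHashMap<>(pendingAcks);
        }

        /** Mirrors the check discussed above: only the user-visible record count is considered. */
        boolean isEmpty() {
            return numRecords == 0;
        }

        /** Returns the pending acknowledgements and clears them, so they are handed over only once. */
        Map<String, List<AckType>> takeAcknowledgedRecords() {
            Map<String, List<AckType>> taken = pendingAcks;
            pendingAcks = new LinkedHashMap<>();
            return taken;
        }
    }

    /** Merges acks supplied by the application with acks carried by an empty fetch. */
    static Map<String, List<AckType>> combine(Map<String, List<AckType>> fromApp, Fetch fetch) {
        Map<String, List<AckType>> combined = new LinkedHashMap<>(fromApp);
        if (fetch.isEmpty()) {
            combined.putAll(fetch.takeAcknowledgedRecords());
        }
        return combined;
    }

    public static void main(String[] args) {
        // A fetch that contained only an abort marker: zero user records, one GAP acknowledgement.
        Fetch onlyAbortMarker = new Fetch(0, Map.of("topic-0", List.of(AckType.GAP)));
        Map<String, List<AckType>> fromApp = new LinkedHashMap<>();
        fromApp.put("topic-1", List.of(AckType.ACCEPT));
        System.out.println(combine(fromApp, onlyAbortMarker));
    }
}
```

Note that `putAll` replaces the value for a key present in both maps; whether that can happen in the real client is not something this sketch establishes.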
Re: [PR] KAFKA-19036: Rewrite LogAppendTimeTest and move it to storage module [kafka]
FrankYang0529 commented on code in PR #19282: URL: https://github.com/apache/kafka/pull/19282#discussion_r2016935923 ## storage/src/test/java/org/apache/kafka/server/log/LogAppendTimeTest.java: ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.GroupProtocol; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.TestUtils; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.Type; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LogAppendTimeTest { +@ClusterTest( +types = {Type.KRAFT}, +serverProperties = { +@ClusterConfigProperty(key = "log.message.timestamp.type", value = "LogAppendTime"), Review Comment: Yes, add another test case `testProduceConsumeWithConfigOnTopic`.
Re: [PR] KAFKA-18390: Use LinkedHashMap instead of Map in creating MetricName and SensorBuilder (1/N) [kafka]
dajac commented on PR #19300: URL: https://github.com/apache/kafka/pull/19300#issuecomment-2757903933 @TaiJuWu Thanks for the patch. Out of curiosity, have we checked that JMX metric names are not altered by this change? I think that they are generated based on the order in the data structure.
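The concern about ordering can be illustrated with a small sketch. This is not Kafka's JMX reporter; `mbeanName` is a hypothetical helper that appends tags in the map's iteration order, which is the property the question above depends on. With a `LinkedHashMap` the resulting name follows insertion order, whereas a `HashMap` makes no ordering promise.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical name builder; the real reporter logic is not reproduced here.
class MetricNameOrderSketch {
    /** Builds a JMX-style name by appending tags in the map's iteration order. */
    static String mbeanName(String domain, String type, Map<String, String> tags) {
        StringBuilder sb = new StringBuilder(domain).append(":type=").append(type);
        for (Map.Entry<String, String> tag : tags.entrySet()) {
            sb.append(',').append(tag.getKey()).append('=').append(tag.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> ordered = new LinkedHashMap<>();
        ordered.put("topic", "orders");
        ordered.put("client-id", "c1");
        // Insertion order is preserved, so the tag order in the name is deterministic.
        System.out.println(mbeanName("kafka.consumer", "fetch-metrics", ordered));

        Map<String, String> unordered = new HashMap<>(ordered);
        // HashMap gives no ordering guarantee; the tag order here depends on hashing.
        System.out.println(mbeanName("kafka.consumer", "fetch-metrics", unordered));
    }
}
```

A check along these lines, comparing generated names before and after the change, would be one way to answer the question raised in the comment.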
Re: [PR] KAFKA-19032: Remove TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames [kafka]
chia7712 commented on PR #19270: URL: https://github.com/apache/kafka/pull/19270#issuecomment-2757016872 @FrankYang0529 could you please fix the conflicts?
Re: [PR] KAFKA-18826: Add global thread metrics [kafka]
bbejeck commented on PR #18953: URL: https://github.com/apache/kafka/pull/18953#issuecomment-2758977759 @mjsax comments addressed
[jira] [Assigned] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`
[ https://issues.apache.org/jira/browse/KAFKA-19024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield reassigned KAFKA-19024: Assignee: Lan Ding (was: Andrew Schofield) By all means. Here you go. > Enhance the client behaviour when it tries to exceed the > `group.share.max.groups` > - > > Key: KAFKA-19024 > URL: https://issues.apache.org/jira/browse/KAFKA-19024 > Project: Kafka > Issue Type: Sub-task >Reporter: Sanskar Jhajharia >Assignee: Lan Ding >Priority: Minor > > For share groups we use the `group.share.max.groups` config to define the > number of max share groups we allow. However, when we exceed the same, the > client logs do not specify any such error and simply do not consume. The > group doesn't get created but the client continues to send Heartbeats hoping > for one of the existing groups to shut down and allowing it to form a group. > Having a log or an exception in the client logs will help them debug such > situations accurately.
[jira] [Commented] (KAFKA-18971) Update AK system tests for AK 4.0
[ https://issues.apache.org/jira/browse/KAFKA-18971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939041#comment-17939041 ] Matthias J. Sax commented on KAFKA-18971: - Seems this is already done: [https://github.com/apache/kafka/pull/19239] > Update AK system tests for AK 4.0 > - > > Key: KAFKA-18971 > URL: https://issues.apache.org/jira/browse/KAFKA-18971 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 4.1.0 >Reporter: Alieh Saeedi >Assignee: Alieh Saeedi >Priority: Major > Fix For: 4.1.0 > > > Update AK system tests and add new “upgrade_from” version to {{StreamsConfig}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17541) Improve handling of delivery count
[ https://issues.apache.org/jira/browse/KAFKA-17541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939043#comment-17939043 ] Andrew Schofield commented on KAFKA-17541: -- Sure, if the description is clear. I was really expecting this to be in 4.2, but if you can accelerate it into 4.1, go ahead. > Improve handling of delivery count > -- > > Key: KAFKA-17541 > URL: https://issues.apache.org/jira/browse/KAFKA-17541 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > > There are two situations in which the delivery count handling needs to be > more intelligent. > First, for records which are automatically released as a result of closing a > share session normally, the delivery count should not be incremented. These > records were fetched but they were not actually delivered to the client since > the disposition of the delivery records is carried in the ShareAcknowledge > which closes the share session. Any remaining records were not delivered, > only fetched. > Second, for records which have a delivery count which is more than 1 or 2, > there is a suspicion that the records are not being delivered due to a > problem rather than just natural retrying. The batching of these records > should be reduced, even down to a single record at a time, so we do not have > the failure to deliver a poisoned record actually causing adjacent records to > be considered unsuccessful and potentially reach the delivery count limit > without proper reason. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown
[ https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alyssa Huang reassigned KAFKA-19047: Assignee: Alyssa Huang > Broker registrations are slow if previously fenced or shutdown > -- > > Key: KAFKA-19047 > URL: https://issues.apache.org/jira/browse/KAFKA-19047 > Project: Kafka > Issue Type: Improvement >Reporter: Alyssa Huang >Assignee: Alyssa Huang >Priority: Major > > BrokerLifecycleManager prevents registration of a broker w/ an id it has seen > before with a different incarnation id if the broker session expires. On > clean shutdown and restart of a broker this can cause an unnecessary delay in > re-registration while the quorum controller waits for the session to expire. > ``` > [BrokerLifecycleManager id=1] Unable to register broker 1 because the > controller returned error DUPLICATE_BROKER_REGISTRATION > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-19048) Minimal Movement Replica Balancing algorithm
[ https://issues.apache.org/jira/browse/KAFKA-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jialun Peng updated KAFKA-19048: Labels: pull-request-available (was: ) > Minimal Movement Replica Balancing algorithm > > > Key: KAFKA-19048 > URL: https://issues.apache.org/jira/browse/KAFKA-19048 > Project: Kafka > Issue Type: Improvement > Components: generator >Reporter: Jialun Peng >Assignee: Jialun Peng >Priority: Major > Labels: pull-request-available > > h2. Motivation > Kafka clusters typically require rebalancing of topic replicas after > horizontal scaling to evenly distribute the load across new and existing > brokers. The current rebalancing approach does not consider the existing > replica distribution, often resulting in excessive and unnecessary replica > movements. These unnecessary movements increase rebalance duration, consume > significant bandwidth and CPU resources, and potentially disrupt ongoing > production and consumption operations. Thus, a replica rebalancing strategy > that minimizes movements while achieving an even distribution of replicas is > necessary. > h2. Goals > The proposed approach prioritizes the following objectives: > # {*}Minimal Movement{*}: Minimize the number of replica relocations during > rebalancing. > # {*}Replica Balancing{*}: Ensure that replicas are evenly distributed > across brokers. > # {*}Anti-Affinity Support{*}: Support rack-aware allocation when enabled. > # {*}Leader Balancing{*}: Distribute leader replicas evenly across brokers. > # {*}ISR Order Optimization{*}: Optimize adjacency relationships to prevent > failover traffic concentration in case of broker failures. > h2. Proposed Changes > h3. Rack-Level Replica Distribution > The following rules ensure balanced replica allocation at the rack level: > # *When* {{{}*rackCount = replicationFactor*{}}}: > * Each rack receives exactly {{partitionCount}} replicas. > ** 2. 
*When* {{{}*rackCount > replicationFactor*{}}}: > * If weighted allocation {{{}(rackBrokers/totalBrokers × totalReplicas) ≥ > partitionCount{}}}: each rack receives exactly {{partitionCount}} replicas. > * If weighted allocation {{{}< partitionCount{}}}: distribute remaining > replicas using a weighted remainder allocation. > h3. Node-Level Replica Distribution > # If the number of replicas assigned to a rack is not a multiple of the > number of nodes in that rack, some nodes will host one additional replica > compared to others. > # *When* {{{}*rackCount = replicationFactor*{}}}: > * If all racks have an equal number of nodes, each node will host an equal > number of replicas. > * If rack sizes vary, nodes in larger racks will host fewer replicas on > average. > ** 3. *When* {{{}*rackCount > replicationFactor*{}}}: > * If no rack has a significantly higher node weight, replicas will be evenly > distributed. > * If a rack has disproportionately high node weight, those nodes will > receive fewer replicas. > h3. Anti-Affinity Support > When anti-affinity is enabled, the rebalance algorithm ensures that replicas > of the same partition do not colocate on the same rack. Brokers without rack > configuration are excluded from anti-affinity checks. > In this way we can unify the implementation logic of rack-aware and > non-rack-aware. > > *Replica Balancing* *Algorithm* > Through the above steps, we can calculate the ideal replica count for each > node and rack. > Based on the initial replica distribution of topics, we obtain the current > replica partition allocation across nodes and racks, allowing us to identify > which nodes violate anti-affinity rules. 
> We iterate through nodes with the following priority: > # First process nodes that violate anti-affinity rules > # Then process nodes whose current replica count exceeds the desired replica > count (prioritizing those with the largest discrepancy) > For these identified nodes, we relocate their replicas to target nodes that: > * Satisfy all anti-affinity constraints > * Have a current replica count below their ideal allocation > This process continues iteratively until: > * No nodes violate anti-affinity rules > * All nodes' current replica counts match their desired replica counts > Upon satisfying these conditions, we achieve balanced replica distribution > across nodes. > > *Leader* *Balancing* *Algorithm* > *Target Leader Calculation:* > Compute baseline average: {{leader_avg = floor(total_partitions / > total_nodes)}} > Identify broker where {{{}replica_count ≤ leader_avg{}}}: > * Designate all replicas as leaders on these brokers > * Subtract allocated leaders: {{remaining_partitions -= assigned_leaders}} > * Exclude nodes: {{{}remaining_{}}}{{{}broker{}}}{{{}s -= > processed_br
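Editor's note on the KAFKA-19048 description above: the node-level rule (when a rack's replica count is not a multiple of its node count, some nodes host one additional replica) can be illustrated with a minimal sketch. The class and method names below are hypothetical and are not taken from the proposal or from Kafka; the sketch only shows the even-split-plus-remainder arithmetic and does not cover anti-affinity or weighted rack allocation.

```java
import java.util.Arrays;

public class NodeReplicaSpread {

    // Hypothetical helper (not a Kafka API): spreads the replicas assigned to one
    // rack across its nodes as evenly as possible. Every node gets the floor of the
    // average, and the first (rackReplicas % nodeCount) nodes get one extra replica.
    static int[] spread(int rackReplicas, int nodeCount) {
        if (nodeCount <= 0) {
            throw new IllegalArgumentException("nodeCount must be positive");
        }
        int base = rackReplicas / nodeCount;
        int extra = rackReplicas % nodeCount;
        int[] perNode = new int[nodeCount];
        for (int i = 0; i < nodeCount; i++) {
            perNode[i] = base + (i < extra ? 1 : 0);
        }
        return perNode;
    }

    public static void main(String[] args) {
        // 10 replicas on a rack with 4 nodes: two nodes host 3, two nodes host 2.
        System.out.println(Arrays.toString(spread(10, 4)));
    }
}
```

Which nodes receive the extra replica is arbitrary here (the first ones by index); a minimal-movement algorithm would presumably prefer nodes that already hold more replicas, but that choice is not spelled out in the text above.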
Re: [PR] KAFKA-18935: Ensure brokers do not return null records in FetchResponse [kafka]
frankvicky commented on code in PR #19167: URL: https://github.com/apache/kafka/pull/19167#discussion_r2016857591 ## clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java: ## @@ -89,7 +89,7 @@ public FetchResponseData data() { */ public FetchResponse(FetchResponseData fetchResponseData) { super(ApiKeys.FETCH); -this.data = fetchResponseData; +this.data = convertNullRecordsToEmpty(fetchResponseData); Review Comment: Makes sense. It could also avoid overhead for clients. Does this approach also need to apply to `ShareFetchResponse`?
Re: [PR] MINOR: add log4j2.yaml to clients-integration-tests module [kafka]
chia7712 commented on code in PR #19252: URL: https://github.com/apache/kafka/pull/19252#discussion_r2006232652 ## clients/clients-integration-tests/src/test/resources/log4j2.yaml: ## @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Configuration: + Properties: +Property: + - name: "logPattern" +value: "[%d] %p %m (%c:%L)%n" + + Appenders: +Console: + name: STDOUT + PatternLayout: +pattern: "${logPattern}" + + Loggers: +Root: + level: ALL Review Comment: maybe `INFO` is good enough. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18826: Add global thread metrics [kafka]
bbejeck commented on code in PR #18953: URL: https://github.com/apache/kafka/pull/18953#discussion_r2017246779 ## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ## @@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) { () -> new Processor<>() { private KeyValueStore store; Review Comment: True, since we're not processing any records. I've removed it here and declared it in the `init` method.
Re: [PR] KAFKA-18826: Add global thread metrics [kafka]
bbejeck commented on code in PR #18953: URL: https://github.com/apache/kafka/pull/18953#discussion_r2017220549 ## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java: ## @@ -496,16 +483,20 @@ private void addGlobalStore(final StreamsBuilder builder) { () -> new Processor<>() { private KeyValueStore store; +// The store iterator is intentionally not closed here as it needs +// to be open during the test, so the Streams app will emit the +// org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric +// that is expected. So the globalStoreIterator is a global variable +// (pun not intended), so it can be closed in the tearDown method. @Override public void init(final ProcessorContext context) { store = context.getStateStore("iq-test-store"); +globalStoreIterator = store.all(); } @Override public void process(final Record record) { store.put(record.key(), record.value()); Review Comment: without this the test fails, we need it open to get the iterator metrics -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown
[ https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alyssa Huang updated KAFKA-19047: - Affects Version/s: 4.0.0 > Broker registrations are slow if previously fenced or shutdown > -- > > Key: KAFKA-19047 > URL: https://issues.apache.org/jira/browse/KAFKA-19047 > Project: Kafka > Issue Type: Improvement >Affects Versions: 4.0.0 >Reporter: Alyssa Huang >Assignee: Alyssa Huang >Priority: Major > > BrokerLifecycleManager prevents registration of a broker w/ an id it has seen > before with a different incarnation id if the broker session expires. On > clean shutdown and restart of a broker this can cause an unnecessary delay in > re-registration while the quorum controller waits for the session to expire. > ``` > [BrokerLifecycleManager id=1] Unable to register broker 1 because the > controller returned error DUPLICATE_BROKER_REGISTRATION > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-9904) Use ThreadLocalConcurrent to Replace Random
[ https://issues.apache.org/jira/browse/KAFKA-9904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939012#comment-17939012 ] Lorcan commented on KAFKA-9904: --- Hi [~belugabehr], is this something that you'd advise should be replaced mostly/completely in the codebase? Or should this be more targeted for the scenarios described in the oracle docs? I've seen some instances where a Random object is initialised with a particular seed, which doesn't seem possible with the ThreadLocalRandom class. It also seems to be used for convenience in some of the Scala files to lazily generate random strings. The reason I ask is due to a lack of experience with distributed systems and so any insight would be helpful. > Use ThreadLocalConcurrent to Replace Random > --- > > Key: KAFKA-9904 > URL: https://issues.apache.org/jira/browse/KAFKA-9904 > Project: Kafka > Issue Type: Improvement >Reporter: David Mollitor >Assignee: Lorcan >Priority: Trivial > > https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadLocalRandom.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
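Editor's note on the seeding question raised in the KAFKA-9904 comment above: a minimal, self-contained sketch of the difference. It relies only on documented JDK behaviour (`java.util.Random` accepts a seed; `ThreadLocalRandom.setSeed` throws `UnsupportedOperationException`). The class and method names are hypothetical and nothing here is taken from the Kafka codebase.

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class SeededRandomCheck {

    // Two Random instances built from the same seed produce the same sequence,
    // which is what tests that need reproducible values depend on.
    static boolean seededRandomIsReproducible(long seed) {
        Random a = new Random(seed);
        Random b = new Random(seed);
        return a.nextInt() == b.nextInt() && a.nextLong() == b.nextLong();
    }

    // ThreadLocalRandom does not support explicit seeding: setSeed is documented
    // to throw UnsupportedOperationException.
    static boolean threadLocalRandomRejectsSeed() {
        try {
            ThreadLocalRandom.current().setSeed(42L);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("seeded Random reproducible: " + seededRandomIsReproducible(42L));
        System.out.println("ThreadLocalRandom rejects seed: " + threadLocalRandomRejectsSeed());
    }
}
```

This suggests a targeted replacement may be safer than a blanket one: call sites that construct `Random` with a fixed seed would lose reproducibility if switched to `ThreadLocalRandom`.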
Re: [PR] KAFKA-19047: Broker registrations are slow if previously fenced or shutdown [kafka]
splett2 commented on code in PR #19296: URL: https://github.com/apache/kafka/pull/19296#discussion_r2017080238 ## metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java: ## @@ -978,6 +980,129 @@ public void testBrokerContactTimesAreUpdatedOnClusterControlActivation() { contactTime(new BrokerIdAndEpoch(2, 100))); } +@Test +public void testDuplicateBrokerRegistrationWithActiveOldBroker() { +// active here means brokerHeartbeatManager last recorded the broker as unfenced and not in controlled shutdown +long brokerSessionTimeoutMs = 1000; +MockTime time = new MockTime(0L, 20L, 1000L); +FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures( +Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L); +ClusterControlManager clusterControl = new ClusterControlManager.Builder(). +setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). +setFeatureControlManager(createFeatureControlManager()). +setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). + setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)). +setTime(time). +build(); +clusterControl.replay(new RegisterBrokerRecord(). +setBrokerEpoch(100). +setBrokerId(0). +setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))). +setFenced(false), 10002); +clusterControl.activate(); +assertEquals(OptionalLong.of(1000L), clusterControl.heartbeatManager().tracker(). +contactTime(new BrokerIdAndEpoch(0, 100))); + +// while session is still valid for old broker, duplicate requests should fail +time.sleep(brokerSessionTimeoutMs / 2); +assertThrows(DuplicateBrokerRegistrationException.class, () -> +clusterControl.registerBroker(new BrokerRegistrationRequestData(). +setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). +setBrokerId(0). + setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))). +setFeatures(new BrokerRegistrationRequestData.FeatureCollection( +Set.of(new BrokerRegistrationRequestData.Feature(). 
+setName(MetadataVersion.FEATURE_NAME). + setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()). + setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())). +setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), +101L, +finalizedFeatures, +false)); + +// if session expires for broker, even if the broker was active the new registration will succeed +time.sleep(brokerSessionTimeoutMs); +clusterControl.registerBroker(new BrokerRegistrationRequestData(). +setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). +setBrokerId(0). +setLogDirs(List.of(Uuid.fromString("TyNK6XSSQJaJc2q9uflNHg"))). +setFeatures(new BrokerRegistrationRequestData.FeatureCollection( +Set.of(new BrokerRegistrationRequestData.Feature(). +setName(MetadataVersion.FEATURE_NAME). + setMinSupportedVersion(MetadataVersion.MINIMUM_VERSION.featureLevel()). + setMaxSupportedVersion(MetadataVersion.LATEST_PRODUCTION.featureLevel())).iterator())). +setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")), +101L, +finalizedFeatures, +false); +} + +@Test +public void testDuplicateBrokerRegistrationWithInactiveBroker() { +// inactive here means brokerHeartbeatManager last recorded the broker as fenced or in controlled shutdown +long brokerSessionTimeoutMs = 1000; +MockTime time = new MockTime(0L, 20L, 1000L); +FinalizedControllerFeatures finalizedFeatures = new FinalizedControllerFeatures( +Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.LATEST_PRODUCTION.featureLevel()), 456L); +ClusterControlManager clusterControl = new ClusterControlManager.Builder(). +setClusterId("pjvUwj3ZTEeSVQmUiH3IJw"). +setFeatureControlManager(createFeatureControlManager()). +setBrokerShutdownHandler((brokerId, isCleanShutdown, records) -> { }). + setSessionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(brokerSessionTimeoutMs)). +setTime(time). +build(); +// first broker is fenced +clusterControl.replay(new RegisterBrokerRecord(). +setBrokerEpoch(100). +setBrokerId(0). 
+setLogDirs(List.of(Uuid.fromString("Mj3CW3OSRi29cFeNJlXuAQ"))). +
Re: [PR] KIP-991 Add deletedConnector flag when stopping tasks [kafka]
hgeraldino closed pull request #13146: KIP-991 Add deletedConnector flag when stopping tasks URL: https://github.com/apache/kafka/pull/13146
Re: [PR] KAFKA-16368: segment.bytes constraints to min 1MB [kafka]
junrao commented on PR #18140: URL: https://github.com/apache/kafka/pull/18140#issuecomment-2758776560 @jayteej : Thanks for the PR and sorry for chiming in late. The KIP https://cwiki.apache.org/confluence/display/KAFKA/KIP-1030%3A+Change+constraints+and+default+values+for+various+configurations says that the new constraint applies at the topic level. I tried the following on 4.0/trunk and it seems that the constraint isn't really applied at the topic level? ``` bash-3.2$ bin/kafka-configs.sh --alter --bootstrap-server localhost:9092 --topic test --add-config segment.bytes=1000 Completed updating config for topic test. bash-3.2$ bin/kafka-configs.sh --describe --bootstrap-server localhost:9092 --topic test Dynamic configs for topic test are: segment.bytes=1000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1000, STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-19051) Fix implicit acknowledgement cannot be overriden when RecordDeserializationException occurs
[ https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frédérik ROULEAU updated KAFKA-19051: - Summary: Fix implicit acknowledgement cannot be overriden when RecordDeserializationException occurs (was: Fix implicit acknowledgement cannot be override when RecordDeserializationException occurs) > Fix implicit acknowledgement cannot be overriden when > RecordDeserializationException occurs > --- > > Key: KAFKA-19051 > URL: https://issues.apache.org/jira/browse/KAFKA-19051 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Frédérik ROULEAU >Priority: Major > > When a record generates a RecordDeserializationException, KIP mentioned that > with explicit acknowledgement the default Release can be overridden. > When tried, I have: > {code:java} > Exception in thread "main" java.lang.IllegalStateException: The record cannot > be acknowledged. > at > org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123) > at > org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683) > at > org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534) > at org.example.frouleau.kip932.Main.main(Main.java:62) {code} > It looks like the record was already released. > Code used: > {code:java} > // > } catch (RecordDeserializationException re) { > long offset = re.offset(); > Throwable t = re.getCause(); > LOGGER.error("Failed to deserialize record at partition={} offset={}", > re.topicPartition().partition(), offset, t); > ConsumerRecord record = new > ConsumerRecord<>(re.topicPartition().topic(), > re.topicPartition().partition(), offset, "", ""); > consumer.acknowledge(record, AcknowledgeType.REJECT); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-19051) Fix implicit acknowledgement cannot be overridden when RecordDeserializationException occurs
[ https://issues.apache.org/jira/browse/KAFKA-19051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Frédérik ROULEAU updated KAFKA-19051: - Summary: Fix implicit acknowledgement cannot be overridden when RecordDeserializationException occurs (was: Fix implicit acknowledgement cannot be overriden when RecordDeserializationException occurs) > Fix implicit acknowledgement cannot be overridden when > RecordDeserializationException occurs > > > Key: KAFKA-19051 > URL: https://issues.apache.org/jira/browse/KAFKA-19051 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Frédérik ROULEAU >Priority: Major > > When a record generates a RecordDeserializationException, KIP mentioned that > with explicit acknowledgement the default Release can be overridden. > When tried, I have: > {code:java} > Exception in thread "main" java.lang.IllegalStateException: The record cannot > be acknowledged. > at > org.apache.kafka.clients.consumer.internals.ShareFetch.acknowledge(ShareFetch.java:123) > at > org.apache.kafka.clients.consumer.internals.ShareConsumerImpl.acknowledge(ShareConsumerImpl.java:683) > at > org.apache.kafka.clients.consumer.KafkaShareConsumer.acknowledge(KafkaShareConsumer.java:534) > at org.example.frouleau.kip932.Main.main(Main.java:62) {code} > It looks like the record was already released. > Code used: > {code:java} > // > } catch (RecordDeserializationException re) { > long offset = re.offset(); > Throwable t = re.getCause(); > LOGGER.error("Failed to deserialize record at partition={} offset={}", > re.topicPartition().partition(), offset, t); > ConsumerRecord record = new > ConsumerRecord<>(re.topicPartition().topic(), > re.topicPartition().partition(), offset, "", ""); > consumer.acknowledge(record, AcknowledgeType.REJECT); > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown
[ https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-19047: --- Fix Version/s: 4.1.0 4.0.1 > Broker registrations are slow if previously fenced or shutdown > -- > > Key: KAFKA-19047 > URL: https://issues.apache.org/jira/browse/KAFKA-19047 > Project: Kafka > Issue Type: Bug >Affects Versions: 4.0.0 >Reporter: Alyssa Huang >Assignee: Alyssa Huang >Priority: Major > Fix For: 4.1.0, 4.0.1 > > > BrokerLifecycleManager prevents registration of a broker w/ an id it has seen > before with a different incarnation id if the broker session expires. On > clean shutdown and restart of a broker this can cause an unnecessary delay in > re-registration while the quorum controller waits for the session to expire. > ``` > [BrokerLifecycleManager id=1] Unable to register broker 1 because the > controller returned error DUPLICATE_BROKER_REGISTRATION > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-19047) Broker registrations are slow if previously fenced or shutdown
[ https://issues.apache.org/jira/browse/KAFKA-19047?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-19047: --- Component/s: controller > Broker registrations are slow if previously fenced or shutdown > -- > > Key: KAFKA-19047 > URL: https://issues.apache.org/jira/browse/KAFKA-19047 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 4.0.0 >Reporter: Alyssa Huang >Assignee: Alyssa Huang >Priority: Major > Fix For: 4.1.0, 4.0.1 > > > BrokerLifecycleManager prevents registration of a broker w/ an id it has seen > before with a different incarnation id if the broker session expires. On > clean shutdown and restart of a broker this can cause an unnecessary delay in > re-registration while the quorum controller waits for the session to expire. > ``` > [BrokerLifecycleManager id=1] Unable to register broker 1 because the > controller returned error DUPLICATE_BROKER_REGISTRATION > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-15873: Filter topics before sorting [kafka]
lorcanj opened a new pull request, #19304: URL: https://github.com/apache/kafka/pull/19304 Partially addresses: [KAFKA-15873](https://issues.apache.org/jira/browse/KAFKA-15873) When filtering and sorting topics, we should apply the filter before the sort. The order in which entries are added to unauthorizedForDescribeTopicMetadata is not relevant, as it is a HashSet.
Re: [PR] KAFKA-19049: Remove the `@ExtendWith(ClusterTestExtensions.class)` from code base [kafka]
FrankYang0529 commented on PR #19299: URL: https://github.com/apache/kafka/pull/19299#issuecomment-2760354933 @Rancho-7 Can we also remove this line in the Javadoc? Thanks. https://github.com/apache/kafka/blob/28de78bcbad605a3e906d085d2e59b441ae35212/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/ClusterTestExtensions.java#L81
Re: [PR] KAFKA-18390: Use LinkedHashMap instead of Map in creating MetricName and SensorBuilder (1/N) [kafka]
TaiJuWu commented on PR #19300: URL: https://github.com/apache/kafka/pull/19300#issuecomment-2760216868 > @TaiJuWu Thanks for the patch. Out of curiosity, have we checked that jmx metric names are not altered by this change? I think that they are generated based on the order in the data structure. Hi @dajac, thanks for your review. I think the order is not changed. Both of them are as follows: https://github.com/user-attachments/assets/2a5e45e5-2036-4e82-a29d-228fcf9432a4 But I am a little confused about why we need to care about the `jmx metrics name`. In the past we used `HashMap`, which is unordered, but we use `LinkedHashMap` in this PR. The former is non-deterministic and depends on the implementation, while the latter is deterministic. If any user assumes that the metrics order is deterministic, that is an issue they need to fix. If I misunderstood anything, please correct me, thanks. Another thing: there is a PR related to `MetricName`, https://github.com/apache/kafka/pull/19222, so it is difficult to check that every order is the same as before.
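Editor's note on the ordering point in the comment above: a minimal sketch of why `LinkedHashMap` gives a deterministic tag order. It relies only on the documented JDK contract that `LinkedHashMap` iterates in insertion order, whereas `HashMap` makes no ordering guarantee. The tag names and the class name are made up for illustration and are not Kafka metric tags from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TagOrder {

    // Returns the keys in the map's own iteration order.
    static List<String> keysInIterationOrder(Map<String, String> tags) {
        return new ArrayList<>(tags.keySet());
    }

    // Builds a hypothetical tag map; insertion order is topic, client-id, partition.
    static Map<String, String> sampleTags() {
        Map<String, String> tags = new LinkedHashMap<>();
        tags.put("topic", "orders");
        tags.put("client-id", "app-1");
        tags.put("partition", "0");
        return tags;
    }

    public static void main(String[] args) {
        // LinkedHashMap: always insertion order, regardless of hash codes.
        System.out.println(keysInIterationOrder(sampleTags()));
    }
}
```

Any name that is assembled by walking the tag map (as the reviewer suspects for JMX names) inherits this order, so moving from `HashMap` to `LinkedHashMap` can change the assembled string even though the set of tags is identical.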
Re: [PR] KAFKA-16729: Support isolation level for share consumer [kafka]
adixitconfluent commented on code in PR #19261: URL: https://github.com/apache/kafka/pull/19261#discussion_r2017977111 ## server-common/src/main/java/org/apache/kafka/server/storage/log/FetchIsolation.java: ## @@ -25,11 +25,11 @@ public enum FetchIsolation { TXN_COMMITTED; public static FetchIsolation of(FetchRequest request) { -return of(request.replicaId(), request.isolationLevel()); +return of(request.replicaId(), request.isolationLevel(), false); } -public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel) { -if (!FetchRequest.isConsumer(replicaId)) { +public static FetchIsolation of(int replicaId, IsolationLevel isolationLevel, boolean isShareFetchRequest) { +if (!FetchRequest.isConsumer(replicaId) && !isShareFetchRequest) { Review Comment: I have made the code change to use `replicaId` as -1 and removed `isShareFetchRequest` param.
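Editor's note on the review comment above: a stand-alone sketch of why passing a `replicaId` of -1 makes the extra boolean unnecessary. Everything below is a local stand-in, not the Kafka classes. The enum constants other than `TXN_COMMITTED`, the rule that negative replica ids denote consumers, and the mapping from isolation level to fetch isolation are assumptions made for illustration; only the `isConsumer(replicaId)` check and the `TXN_COMMITTED` constant appear in the quoted diff.

```java
public class FetchIsolationSketch {

    // Local stand-ins for illustration only.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }
    enum FetchIsolation { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

    // Assumption: consumers are identified by a negative replica id.
    static boolean isConsumer(int replicaId) {
        return replicaId < 0;
    }

    // Assumed selection logic: non-consumers (followers) read to the log end,
    // consumers read to the high watermark or to the last committed transaction.
    static FetchIsolation of(int replicaId, IsolationLevel isolationLevel) {
        if (!isConsumer(replicaId)) {
            return FetchIsolation.LOG_END;
        }
        if (isolationLevel == IsolationLevel.READ_COMMITTED) {
            return FetchIsolation.TXN_COMMITTED;
        }
        return FetchIsolation.HIGH_WATERMARK;
    }

    public static void main(String[] args) {
        // A share fetch that passes replicaId = -1 takes the consumer branch,
        // so no separate isShareFetchRequest flag is needed.
        System.out.println(of(-1, IsolationLevel.READ_COMMITTED));
        System.out.println(of(-1, IsolationLevel.READ_UNCOMMITTED));
        System.out.println(of(3, IsolationLevel.READ_COMMITTED));
    }
}
```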
[jira] [Commented] (KAFKA-19024) Enhance the client behaviour when it tries to exceed the `group.share.max.groups`
[ https://issues.apache.org/jira/browse/KAFKA-19024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17939153#comment-17939153 ]
Lan Ding commented on KAFKA-19024:
--
Thanks for your reply. Whether reusing the GROUP_MAX_SIZE_REACHED error code is appropriate depends on whether clients need to differentiate between these two error scenarios. However, given that clients currently only need to retry and log the error (no special handling is required), reusing the error code seems acceptable.
> Enhance the client behaviour when it tries to exceed the `group.share.max.groups`
> -
>
> Key: KAFKA-19024
> URL: https://issues.apache.org/jira/browse/KAFKA-19024
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Sanskar Jhajharia
> Assignee: Lan Ding
> Priority: Minor
>
> For share groups we use the `group.share.max.groups` config to define the number of max share groups we allow. However, when we exceed the same, the client logs do not specify any such error and simply do not consume. The group doesn't get created but the client continues to send Heartbeats hoping for one of the existing groups to shut down and allowing it to form a group. Having a log or an exception in the client logs will help them debug such situations accurately.
Re: [PR] KAFKA-16580: Enable dynamic quorum reconfiguration for raft simulation tests [kafka]
kevin-wu24 commented on code in PR #18987: URL: https://github.com/apache/kafka/pull/18987#discussion_r2003724311
## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java: ##
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 @Override
 public void verify() {
-    cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-        long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-            .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-            .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-            .count();
-        assertTrue(
-            numReachedHighWatermark >= cluster.majoritySize(),
-            "Insufficient nodes have reached current high watermark");
+    if (cluster.withKip853) {
+        /*
+        * For clusters running in KIP-853 mode, we check that a majority of at least one of:
+        * 1. the leader's voter set at the HWM
+        * 2. the leader's lastVoterSet()
+        * has reached the HWM. We need to perform a more elaborate check here because in clusters where
+        * an Add/RemoveVoter request increases/decreases the majority of voters value by 1, the leader
+        * could have used either majority value to update its HWM value. This is because depending on
+        * whether the leader read the most recent VotersRecord prior to updating its HWM value, the number
Review Comment: The summary of the discussion as to why we need to check the HWM in this way is contained here: https://github.com/apache/kafka/pull/18987#discussion_r1971722138. TLDR: the simulation test can call `verify` and look at the RaftClient internal state in between those two points (i.e. `partitionState` has been updated with latest `votersRecord`, but the HWM is still the old value from the last committed `votersRecord`).
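The "majority of at least one voter set" check described in the code comment above can be illustrated with a small standalone sketch. It does not use the simulation test's `Cluster` class; `HwmMajoritySketch`, its method names and the sample offsets are hypothetical, and the voter sets are reduced to plain sets of node ids.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not the RaftEventSimulationTest code.
final class HwmMajoritySketch {
    // Smallest number of voters that forms a majority of the set.
    static int majoritySize(Set<Integer> voters) {
        return voters.size() / 2 + 1;
    }

    // True if a majority of `voters` has a log end offset >= hwm.
    static boolean majorityReached(Set<Integer> voters,
                                   Map<Integer, Long> endOffsets,
                                   long hwm) {
        long reached = voters.stream()
            .filter(id -> endOffsets.getOrDefault(id, -1L) >= hwm)
            .count();
        return reached >= majoritySize(voters);
    }

    // Passes if either candidate voter set has a majority at the HWM,
    // since the leader may have used either one to advance the HWM.
    static boolean verify(Set<Integer> votersAtHwm,
                          Set<Integer> lastVoters,
                          Map<Integer, Long> endOffsets,
                          long hwm) {
        return majorityReached(votersAtHwm, endOffsets, hwm)
            || majorityReached(lastVoters, endOffsets, hwm);
    }

    public static void main(String[] args) {
        // Made-up state: node 4 is being added, so the latest voter set
        // has 4 members (majority 3) while the set at the HWM has 3
        // members (majority 2).
        Set<Integer> votersAtHwm = Set.of(1, 2, 3);
        Set<Integer> lastVoters = Set.of(1, 2, 3, 4);
        Map<Integer, Long> endOffsets = Map.of(1, 10L, 2, 10L, 3, 5L, 4, 5L);
        System.out.println(verify(votersAtHwm, lastVoters, endOffsets, 10L));
    }
}
```

In this sample state only two nodes have reached offset 10, which is a majority of the three-member set but not of the four-member set, so a check against the latest voter set alone would fail even though the HWM was advanced legitimately.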
Re: [PR] KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value [kafka]
AyoubOm commented on PR #19303: URL: https://github.com/apache/kafka/pull/19303#issuecomment-2759697061 @mjsax FYI