Re: [PR] MINOR: Remove unused system test code and avoid misleading `quorum.zk` references [kafka]
m1a2st commented on code in PR #19034: URL: https://github.com/apache/kafka/pull/19034#discussion_r1971614754

## tests/kafkatest/services/kafka/kafka.py:
@@ -1362,7 +1356,7 @@ def alter_message_format(self, topic, msg_format_version, node=None):
     def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
-        force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
+        force_use_zk_connection = force_use_zk_connection

Review Comment: Maybe we can remove this parameter and clean up the if condition.
[jira] [Commented] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID
[ https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930701#comment-17930701 ]

Lianet Magrans commented on KAFKA-15283:

Hi [~isding_l], I am not working on it because the broker-side changes are not in place yet. Once the broker supports topic IDs on the OffsetFetch/Commit requests, we need to take on this one for the client-side support. That being said, feel free to take it if you're interested; this will probably be for 4.1, depending on the broker.

> Client support for OffsetFetch and OffsetCommit with topic ID
> --------------------------------------------------------------
>
>                 Key: KAFKA-15283
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15283
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Lianet Magrans
>            Priority: Critical
>              Labels: kip-848-client-support, newbie, offset
>             Fix For: 4.1.0
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and {{METADATA}} RPC calls.
> With KIP-848, OffsetFetch and OffsetCommit will start using topic IDs in the same way, so the new client implementation will provide them when issuing those requests. Topic names should continue to be supported as needed by the {{AdminClient}}.
> We should also review/clean up the support for topic names in requests such as the {{METADATA}} request (currently supporting topic names as well as topic IDs on the client side).
> Tasks include:
> * Introduce topic ID in the existing OffsetFetch and OffsetCommit APIs that will be upgraded on the server to support topic ID.
> * Check topic ID propagation internally in the client based on RPCs including it.
> * Review existing support for topic names for potential clean-up if no longer needed.
Re: [PR] [MINOR] Clean up coordinator-common and server modules [kafka]
sjhajharia commented on PR #19009: URL: https://github.com/apache/kafka/pull/19009#issuecomment-2685660580

hey @chia7712, gentle reminder for this PR. Thanks!
[jira] [Updated] (KAFKA-18874) KRaft controller does not retry registration if the first attempt times out
[ https://issues.apache.org/jira/browse/KAFKA-18874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio updated KAFKA-18874:
-----------------------------------------------
    Component/s: controller

> KRaft controller does not retry registration if the first attempt times out
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-18874
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18874
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>            Reporter: Daniel Fonai
>            Priority: Minor
>
> There is a [retry mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274] with exponential backoff built into KRaft controller registration. The timeout of the first attempt is 5 s for KRaft controllers ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448]), which is not configurable.
> If for some reason the controller's first registration request times out, the attempt should be retried, but in practice this does not happen and the controller is not able to join the quorum. We see the following in the faulty controller's log:
> {noformat}
> 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to send ControllerRegistrationRequestData(controllerId=3, incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, listeners=[Listener(name='CONTROLPLANE-9090', host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc', port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', minSupportedVersion=0, maxSupportedVersion=1), Feature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler]
> ...
> 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel manager timed out before sending the request. (kafka.server.ControllerRegistrationManager) [controller-3-to-controller-registration-channel-manager]
> 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting for the previous RPC to complete. (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler]
> {noformat}
> After this we do not see any controller retry in the log.
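For context, the behavior the reporter expects is ordinary retry with jittered exponential backoff. The sketch below is a generic illustration of that pattern, not the actual ControllerRegistrationManager code; the class name, the timeout-as-RuntimeException convention, and the parameters are all illustrative assumptions.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of retry with jittered exponential backoff.
public final class RegistrationRetry {

    public static void registerWithRetry(Runnable sendRegistration,
                                         long initialBackoffMs,
                                         long maxBackoffMs,
                                         int maxAttempts) throws InterruptedException {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sendRegistration.run(); // assumed to throw on timeout
                return;                 // success: registration accepted
            } catch (RuntimeException timeout) {
                if (attempt == maxAttempts) {
                    throw timeout;
                }
                // Sleep for half the backoff plus random jitter, then double the backoff.
                long jitter = ThreadLocalRandom.current().nextLong(backoffMs / 2 + 1);
                Thread.sleep(backoffMs / 2 + jitter);
                backoffMs = Math.min(backoffMs * 2, maxBackoffMs);
            }
        }
    }
}
```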
Re: [PR] MINOR: Remove old message format documentation [kafka]
ijuma merged PR #19033: URL: https://github.com/apache/kafka/pull/19033
Re: [PR] MINOR: Remove old message format documentation [kafka]
ijuma commented on PR #19033: URL: https://github.com/apache/kafka/pull/19033#issuecomment-2685752393

Cherry-picked to 4.0 as well.
Re: [PR] KAFKA-17431: Support invalid static configs for KRaft so long as dynamic configs are valid [kafka]
cmccabe commented on code in PR #18949: URL: https://github.com/apache/kafka/pull/18949#discussion_r1972230877

## core/src/main/scala/kafka/raft/RaftManager.scala:
@@ -104,6 +104,8 @@ trait RaftManager[T] {
   def replicatedLog: ReplicatedLog
 
   def voterNode(id: Int, listener: ListenerName): Option[Node]
+
+  def getRecordSerde: RecordSerde[T]

Review Comment: In Kafka we generally don't put 'get' in front of getters. So this method should just be `recordSerde`.
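The same accessor-naming convention applies across Kafka's Java code as well. A minimal hypothetical illustration (the class and field are invented for the example):

```java
// Hypothetical example of Kafka's accessor-naming convention:
// accessors are named after the property, with no "get" prefix.
public final class QuorumSnapshot {
    private final int leaderId;

    public QuorumSnapshot(int leaderId) {
        this.leaderId = leaderId;
    }

    // Preferred in Kafka: plain property name.
    public int leaderId() {
        return leaderId;
    }

    // Avoided: a JavaBeans-style getLeaderId() accessor.
}
```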
[PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API… [kafka]
dongnuo123 opened a new pull request, #19042: URL: https://github.com/apache/kafka/pull/19042

… must check topic describe (#18989)

This patch filters out the topic-describe-unauthorized topics from the ConsumerGroupHeartbeat and ConsumerGroupDescribe responses.

In ConsumerGroupHeartbeat:
- If the request has `subscribedTopicNames` set, we directly check the authz in `KafkaApis` and return a topic auth failure in the response if any of the topics is denied.
- Otherwise, we check the authz only if a regex refresh is triggered, and we do it based on the ACL of the consumer that triggered the refresh. If any of the topics is denied, we filter it out from the resolved subscription.

In ConsumerGroupDescribe, we check the authz of the coordinator response. If any of the topics in the group is denied, we remove the described info and add a topic auth failure to the described group (similar to the group auth failure).

Reviewers: David Jacot, Lianet Magrans, Rajini Sivaram, Chia-Ping Tsai, TaiJuWu, TengYao Chi

(cherry picked from commit 36f19057e1d57a8548a4548c304799fd176c359f)
Re: [PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API… [kafka]
dongnuo123 commented on code in PR #19042: URL: https://github.com/apache/kafka/pull/19042#discussion_r1972238070

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
@@ -158,29 +160,28 @@ public Builder withGroupConfigManager(GroupConfigManager groupConfigManager) {
             return this;
         }
 
+        public Builder withAuthorizer(Optional<Authorizer> authorizer) {
+            this.authorizer = authorizer;
+            return this;
+        }
+
         public GroupCoordinatorService build() {
-            if (config == null)
-                throw new IllegalArgumentException("Config must be set.");
-            if (writer == null)
-                throw new IllegalArgumentException("Writer must be set.");
-            if (loader == null)
-                throw new IllegalArgumentException("Loader must be set.");
-            if (time == null)
-                throw new IllegalArgumentException("Time must be set.");
-            if (timer == null)
-                throw new IllegalArgumentException("Timer must be set.");
-            if (coordinatorRuntimeMetrics == null)
-                throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
-            if (groupCoordinatorMetrics == null)
-                throw new IllegalArgumentException("GroupCoordinatorMetrics must be set.");
-            if (groupConfigManager == null)
-                throw new IllegalArgumentException("GroupConfigManager must be set.");
+            requireNonNull(config, new IllegalArgumentException("Config must be set."));
+            requireNonNull(writer, new IllegalArgumentException("Writer must be set."));
+            requireNonNull(loader, new IllegalArgumentException("Loader must be set."));
+            requireNonNull(time, new IllegalArgumentException("Time must be set."));
+            requireNonNull(timer, new IllegalArgumentException("Timer must be set."));
+            requireNonNull(coordinatorRuntimeMetrics, new IllegalArgumentException("CoordinatorRuntimeMetrics must be set."));
+            requireNonNull(groupCoordinatorMetrics, new IllegalArgumentException("GroupCoordinatorMetrics must be set."));
+            requireNonNull(groupConfigManager, new IllegalArgumentException("GroupConfigManager must be set."));
+            requireNonNull(authorizer, new IllegalArgumentException("Authorizer must be set."));

Review Comment: Brought in `requireNonNull` ahead of time because of a checkstyle error.
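Note that `java.util.Objects.requireNonNull` only accepts a message or a `Supplier<String>`, not an exception instance, so the diff above presumably relies on a local static helper. A minimal sketch of what such a helper could look like (the class name is an assumption):

```java
// Minimal sketch of the assumed local helper. java.util.Objects.requireNonNull
// has no overload that accepts an exception instance, so something along these
// lines is needed for the requireNonNull(obj, exception) calls in the diff.
final class Preconditions {
    static <T> T requireNonNull(T obj, RuntimeException exception) {
        if (obj == null) {
            throw exception;
        }
        return obj;
    }
}
```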
Re: [PR] KAFKA-15900, KAFKA-18310: fix flaky test testOutdatedCoordinatorAssignment and AbstractCoordinatorTest [kafka]
lianetm commented on code in PR #18945: URL: https://github.com/apache/kafka/pull/18945#discussion_r1971704382

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
@@ -165,16 +165,18 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time) {
-        this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty());
+        this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty(), Optional.empty());
     }
 
+    @SuppressWarnings("this-escape")

Review Comment: I guess we could avoid suppressing here if we fix how we create the `HeartbeatThread::new` in this constructor? (e.g. we could do it lazily in `startHeartbeatThreadIfNeeded`, and just keep an Optional ref to the supplier here)

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThread.java:
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.KafkaThread;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base class for heartbeat threads. This class provides a mechanism to enable/disable the heartbeat thread.
+ * The heartbeat thread should check whether it's enabled by calling {@link BaseHeartbeatThread#isEnabled()}
+ * before sending heartbeat requests.
+ */
+public class BaseHeartbeatThread extends KafkaThread implements AutoCloseable {

Review Comment: should we add a basic unit test for this class? (a simple test just to make sure that all actions flip the right flag, e.g. isEnabled() is true after calling enable(), etc.)
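The suggested test could look roughly like the sketch below. It assumes a `(name, daemon)` constructor inherited from `KafkaThread` and the `enable()`/`disable()`/`isEnabled()` accessors implied by the class javadoc; adjust to the actual signatures.

```java
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BaseHeartbeatThreadTest {

    @Test
    public void testEnableDisableFlipTheFlag() throws Exception {
        // Assumes a (name, daemon) constructor inherited from KafkaThread.
        BaseHeartbeatThread thread = new BaseHeartbeatThread("test-heartbeat-thread", true);

        thread.enable();
        assertTrue(thread.isEnabled(), "enable() should turn the flag on");

        thread.disable();
        assertFalse(thread.isEnabled(), "disable() should turn the flag off");

        thread.close();
    }
}
```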
Re: [PR] WIP Investigate OOM [kafka]
mumrah commented on PR #19031: URL: https://github.com/apache/kafka/pull/19031#issuecomment-2686054397

Seems to have reproduced here: https://github.com/apache/kafka/actions/runs/13550598471/job/37873138268?pr=19031

No activity for a while after

```
Wed, 26 Feb 2025 18:35:41 GMT > Task :streams:test-utils:copyDependantLibs
Wed, 26 Feb 2025 18:52:51 GMT > Task :streams:test-utils:jar
Wed, 26 Feb 2025 18:55:06 GMT > Task :connect:runtime:compileJava
```

This suggests that `-XX:-UseGCOverheadLimit` is working as expected. However, it also suggests that there is a real memory leak or something. This run included a larger heap of 3 GB.
Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]
Rancho-7 commented on code in PR #19039: URL: https://github.com/apache/kafka/pull/19039#discussion_r1971674751

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");

Review Comment: same.
https://github.com/apache/kafka/blob/4b5a16bf6ffcb4322d7c5b35a484ff914397909c/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java#L522
Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]
Rancho-7 commented on code in PR #19039: URL: https://github.com/apache/kafka/pull/19039#discussion_r1971671660

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();

Review Comment: Thanks for the review! I have a question: I noticed we use Java's UUID in some other tests in this file. Should we keep the same approach or not?
https://github.com/apache/kafka/blob/4b5a16bf6ffcb4322d7c5b35a484ff914397909c/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java#L544
Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]
Rancho-7 commented on code in PR #19039: URL: https://github.com/apache/kafka/pull/19039#discussion_r1971676812

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.file.param.testId", id);
+        props.put("test.key", "${file:/path:key}");
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file");
+        assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap());
+        assertEquals("testKey", config.originals().get("test.key"));
+        MockFileConfigProvider.assertClosed(id);
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
+            MockFileConfigProvider.class.getName() + "," +
+                "org.apache.kafka.common.config.provider.EnvVarConfigProvider");

Review Comment: Thanks for pointing that out! Fixed it.
Re: [PR] MINOR: Disallow unused local variables [kafka]
mumrah commented on code in PR #18963: URL: https://github.com/apache/kafka/pull/18963#discussion_r1971693097

## tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java:
@@ -79,7 +79,8 @@ public static void main(String[] args) {
                 shareConsumers.forEach(shareConsumer -> shareConsumersMetrics.add(shareConsumer.metrics()));
             }
             shareConsumers.forEach(shareConsumer -> {
-                Map<TopicIdPartition, Optional<KafkaException>> val = shareConsumer.commitSync();
+                @SuppressWarnings("UnusedLocalVariable")
+                Map<TopicIdPartition, Optional<KafkaException>> ignored = shareConsumer.commitSync();

Review Comment: I completely missed that this was in `tools`. I agree that the approach here (the explicit suppression) is better.
Re: [PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
dongnuo123 commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1971754326

## core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
@@ -561,5 +568,17 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       case _: TopicAuthorizationException => consumeRecords(consumer, numRecords, startingOffset, topic)
     }
   }
+
+  // Consume records, ignoring TopicAuthorization exception from previously sent request.
+  private def consumeRecordsIgnoreAuthorizationException(consumer: Consumer[Array[Byte], Array[Byte]],
+                                                         numRecords: Int = 1,
+                                                         startingOffset: Int = 0,
+                                                         topic: String = topic): Unit = {
+    try {
+      consumeRecords(consumer, numRecords, startingOffset, topic)
+    } catch {
+      case _: TopicAuthorizationException => consumeRecordsIgnoreAuthorizationException(consumer, numRecords, startingOffset, topic)

Review Comment: Yes, agree. This is simpler.
[jira] [Commented] (KAFKA-18329) Delete old group coordinator (KIP-848)
[ https://issues.apache.org/jira/browse/KAFKA-18329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930718#comment-17930718 ]

Ismael Juma commented on KAFKA-18329:

Why 4.2.0? What's stopping us from doing it now?

> Delete old group coordinator (KIP-848)
> ---------------------------------------
>
>                 Key: KAFKA-18329
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18329
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: David Jacot
>            Assignee: David Jacot
>            Priority: Major
>             Fix For: 4.2.0
>
> In 4.2.0, we should be able to delete the old group coordinator.
Re: [PR] MINOR: Remove unused system test code and avoid misleading `quorum.zk` references [kafka]
ijuma commented on code in PR #19034: URL: https://github.com/apache/kafka/pull/19034#discussion_r1971797966

## tests/kafkatest/services/kafka/kafka.py:
@@ -1362,7 +1356,7 @@ def alter_message_format(self, topic, msg_format_version, node=None):
     def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
-        force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
+        force_use_zk_connection = force_use_zk_connection

Review Comment: You are right, `force_use_zk_connection` is always `false` for this method. Pushed a commit that removes that.
[PR] KAFKA-18858: Refactor FeatureControlManager to avoid using uninitialized MV [kafka]
FrankYang0529 opened a new pull request, #19040: URL: https://github.com/apache/kafka/pull/19040

The `FeatureControlManager` used `MetadataVersion#LATEST_PRODUCTION` as the uninitialized MV. This means other components may get a stale MV. In production code, the `FeatureControlManager` sets the MV when replaying a `FeatureLevelRecord`, so we can use `Optional.empty()` as the uninitialized MV. If other components get an empty result, an exception is thrown, like in `FeaturesImage`.

Unit test:
* FeatureControlManagerTest#testMetadataVersion: tests getting the MetadataVersion before and after replaying a FeatureLevelRecord.
Re: [PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
lianetm commented on code in PR #18989: URL: https://github.com/apache/kafka/pull/18989#discussion_r1971805902

## core/src/main/scala/kafka/server/KafkaApis.scala:
@@ -2592,6 +2608,38 @@ class KafkaApis(val requestChannel: RequestChannel,
         response.groups.addAll(results)
       }
 
+      // Clients are not allowed to see topics that are not authorized for Describe.
+      val topicsToCheck = authorizer match {
+        case Some(_) =>
+          response.groups.stream()
+            .flatMap(group => group.members.stream)
+            .flatMap(member => util.stream.Stream.of(member.assignment, member.targetAssignment))
+            .flatMap(assignment => assignment.topicPartitions.stream)
+            .map(topicPartition => topicPartition.topicName)
+            .collect(Collectors.toSet[String])
+            .asScala
+        case None => Set.empty[String]
+      }
+      val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+        topicsToCheck)(identity)
+      val updatedGroups = response.groups.stream().map { group =>

Review Comment: This is also only needed if there is an authorizer, right? Moreover, with it like this, if there is no authorizer, don't we end up with an empty `authorizedTopics`, iterate here, and end up with `hasUnauthorizedTopic=true`?
Re: [PR] MINOR: Remove unused system test code and avoid misleading `quorum.zk` references [kafka]
m1a2st commented on code in PR #19034: URL: https://github.com/apache/kafka/pull/19034#discussion_r1971599561

## tests/kafkatest/services/kafka/kafka.py:
@@ -1362,7 +1356,7 @@ def alter_message_format(self, topic, msg_format_version, node=None):
     def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
-        force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
+        force_use_zk_connection = force_use_zk_connection

Review Comment: Should we remove this line?
Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]
frankvicky commented on code in PR #19039: URL: https://github.com/apache/kafka/pull/19039#discussion_r1971633273

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.file.param.testId", id);
+        props.put("test.key", "${file:/path:key}");
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file");
+        assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap());
+        assertEquals("testKey", config.originals().get("test.key"));
+        MockFileConfigProvider.assertClosed(id);
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
+            MockFileConfigProvider.class.getName() + "," +
+                "org.apache.kafka.common.config.provider.EnvVarConfigProvider");

Review Comment: `EnvVarConfigProvider.class.getName()`

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();

Review Comment: We could use the UUID class defined by Kafka.

## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");

Review Comment: We could use `AbstractConfig.CONFIG_PROVIDERS_CONFIG` instead of plaintext.
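Taken together, the three suggestions amount to something like the sketch below. `MockFileConfigProvider` is the existing helper from `AbstractConfigTest`; the wrapper class is invented for illustration.

```java
import java.util.Properties;

import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.provider.EnvVarConfigProvider;

// Sketch of the suggested cleanup: constants and class references
// instead of plaintext strings.
public class ConfigProviderSetupSketch {
    public static Properties buildProps() {
        Properties props = new Properties();
        // Constant instead of the "config.providers" literal.
        props.put(AbstractConfig.CONFIG_PROVIDERS_CONFIG, "file");
        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
        // Kafka's own Uuid instead of java.util.UUID, per the second comment.
        props.put("config.providers.file.param.testId", Uuid.randomUuid().toString());
        // Class reference instead of the spelled-out provider class name.
        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
            MockFileConfigProvider.class.getName() + "," + EnvVarConfigProvider.class.getName());
        return props;
    }
}
```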
Re: [PR] KAFKA-18868: add the "default value" explanation to the docs of num.replica.alter.log.dirs.threads [kafka]
m1a2st commented on code in PR #19038: URL: https://github.com/apache/kafka/pull/19038#discussion_r1971566712

## server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java:
@@ -52,7 +52,8 @@ public class ServerConfigs {
     public static final String BACKGROUND_THREADS_DOC = "The number of threads to use for various background processing tasks";
     public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG = "num.replica.alter.log.dirs.threads";
-    public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O";
+    public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O." +
+        "The default value is equal to the number of directories specified in the " + ServerLogConfigs.LOG_DIRS_CONFIG + " configuration property.";

Review Comment: Please add a space at the end of the first sentence.

Review Comment: We should add a code block for `ServerLogConfigs.LOG_DIRS_CONFIG`.
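Applying both comments, the constant would look something like the sketch below (the wrapper class is invented; Kafka config docs render HTML, hence the `<code>` tag):

```java
import org.apache.kafka.server.config.ServerLogConfigs;

// Sketch of the doc constant with both review comments applied: a space after
// "disk I/O." and the referenced config name wrapped in a <code> tag.
public class ServerConfigsDocSketch {
    public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC =
        "The number of threads that can move replicas between log directories, which may include disk I/O. " +
        "The default value is equal to the number of directories specified in the <code>" +
        ServerLogConfigs.LOG_DIRS_CONFIG + "</code> configuration property.";
}
```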
Re: [PR] KAFKA-17607: Add CI step to verify LICENSE-binary [kafka]
m1a2st commented on code in PR #18299: URL: https://github.com/apache/kafka/pull/18299#discussion_r1971829502

## .github/workflows/build.yml:
@@ -127,6 +127,9 @@ jobs:
           gradle-cache-read-only: ${{ !inputs.is-trunk }}
           gradle-cache-write-only: ${{ inputs.is-trunk }}
           develocity-access-key: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+      - name: Verify license file
+        if: always()

Review Comment: `always()` means this will run even if a previous step failed. I think we don't need this in this step, WDYT?
Re: [PR] KAFKA-18614, KAFKA-18613: Add streams group request plumbing [kafka]
lucasbru merged PR #18979: URL: https://github.com/apache/kafka/pull/18979
Re: [PR] MINOR: Remove old message format documentation [kafka]
ijuma commented on code in PR #19033: URL: https://github.com/apache/kafka/pull/19033#discussion_r1971831709

## docs/implementation.html:
@@ -103,67 +103,8 @@
 5.3.3 Old Message Format
 
-Prior to Kafka 0.11, messages were transferred and stored in message sets. In a message set, each message has its own metadata. Note that although message sets are represented as an array, they are not preceded by an int32 array size like other array elements in the protocol.
+Prior to Kafka 0.11, messages were transferred and stored in message sets. See <a href="https://kafka.apache.org/39/documentation/#messageset">Old Message Format</a> for more details.

Review Comment: Right, the KIP covers the edge cases in detail and is linked from the upgrade notes.
[jira] [Updated] (KAFKA-18651) Core streams-specific broker configurations
[ https://issues.apache.org/jira/browse/KAFKA-18651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Brutschy updated KAFKA-18651:
-----------------------------------
    Fix Version/s: 4.1

> Core streams-specific broker configurations
> --------------------------------------------
>
>                 Key: KAFKA-18651
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18651
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Lucas Brutschy
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 4.1
>
> We need to add the following core subset of broker configurations (some may already be there):
> Broker configs:
> {{group.coordinator.rebalance.protocols}} – add value streams to enable the streams rebalance protocol
> {{group.streams.session.timeout.ms}}
> {{group.streams.heartbeat.interval.ms}}
> {{group.streams.num.standby.replicas}}
> {{group.streams.max.size}}
>
> We need to make sure that `num.standby.replicas` is passed with `assignmentConfigs` to the assignor.
> As defined in the KIP.
[jira] [Updated] (KAFKA-18651) Core streams-specific broker configurations
[ https://issues.apache.org/jira/browse/KAFKA-18651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Brutschy updated KAFKA-18651:
-----------------------------------
    Fix Version/s: 4.1.0
                       (was: 4.1)

> Core streams-specific broker configurations
> --------------------------------------------
>
>                 Key: KAFKA-18651
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18651
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Lucas Brutschy
>            Assignee: Bill Bejeck
>            Priority: Major
>             Fix For: 4.1.0
>
> We need to add the following core subset of broker configurations (some may already be there):
> Broker configs:
> {{group.coordinator.rebalance.protocols}} – add value streams to enable the streams rebalance protocol
> {{group.streams.session.timeout.ms}}
> {{group.streams.heartbeat.interval.ms}}
> {{group.streams.num.standby.replicas}}
> {{group.streams.max.size}}
>
> We need to make sure that `num.standby.replicas` is passed with `assignmentConfigs` to the assignor.
> As defined in the KIP.
[PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]
Rancho-7 opened a new pull request, #19039: URL: https://github.com/apache/kafka/pull/19039

jira: https://issues.apache.org/jira/browse/KAFKA-18850

`org.apache.kafka.automatic.config.providers = env` is incorrect. According to the source code, the correct value should be the [class name](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L616).

This PR fixes the doc and adds a unit test to verify.

Screenshot: https://github.com/user-attachments/assets/f3f0f19d-8ef5-4950-85e5-3c7c868447a9
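Concretely, the allow-list is matched against fully qualified provider class names, so the system property must name the classes. A sketch (the wrapper class is invented; `EnvVarConfigProvider` is the real provider from the clients module):

```java
public class AllowedProvidersExample {
    public static void main(String[] args) {
        // Correct: the allow-list names fully qualified provider classes.
        System.setProperty("org.apache.kafka.automatic.config.providers",
            "org.apache.kafka.common.config.provider.EnvVarConfigProvider");

        // Incorrect (per the PR): a short alias like "env" does not match any
        // class name, so resolving ${env:...} references fails with a ConfigException.
        // System.setProperty("org.apache.kafka.automatic.config.providers", "env");
    }
}
```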
Re: [PR] KAFKA-16580: Enable dynamic quorum reconfiguration for raft simulation tests pt 1 [kafka]
kevin-wu24 commented on code in PR #18987: URL: https://github.com/apache/kafka/pull/18987#discussion_r1971722138

## raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
         @Override
         public void verify() {
-            cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-                long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-                    .filter(entry -> cluster.voters.containsKey(entry.getKey()))
-                    .filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-                    .count();
-                assertTrue(
-                    numReachedHighWatermark >= cluster.majoritySize(),
-                    "Insufficient nodes have reached current high watermark");
+            if (cluster.withKip853) {
+                /*
+                 * For clusters running in KIP-853 mode, we check that a majority of at least one of:
+                 * 1. the leader's voter set at the HWM

Review Comment: Sorry, upon further looking into this, I think the issue is slightly different than what I've described thus far. Essentially, when we fail the invariant check when only using `lastVoterSet()`, what's going on is that the caller of `verify()` is a different thread (let's say the event scheduler thread) than the KRaft leader's raft-io thread, which is continuously doing all of this internal state updating. The event scheduler thread is looking at a bunch of the leader's internal state (e.g. partitionState and highWatermark), which can definitely be in the following state: `partitionState` has been updated with a new voter set, but a new `highWatermark` value has not yet been calculated with this voter set, which could cause this invariant check to fail when only looking at `lastVoterSet()`. The leader may not be in this state for very long, since it's in this state in between the calls to `appendAsLeader -> updateState -> maybeLoadLog` and `flushLeaderLog -> maybeUpdateHighWatermark`, but it nevertheless looks like a valid state to me.

Invariants as a concept are predicates that should be true for the system at all times, no matter when you check them, so I think performing invariant verification how it's currently implemented is fine, as we could be checking the internal states of our raft nodes at any point in their execution. However, it just means we have to consider these "intermediary" states when performing verification.
Re: [PR] MINOR: Add README.md test command [kafka]
m1a2st commented on code in PR #19025: URL: https://github.com/apache/kafka/pull/19025#discussion_r1971727446

## README.md:
@@ -75,6 +76,7 @@ Retries are disabled by default, but you can set maxTestRetryFailures and maxTes
 The following example declares -PmaxTestRetries=1 and -PmaxTestRetryFailures=3 to enable a failed test to be retried once, with a total retry limit of 3.
 
     ./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=3
+    ./gradlew test -Pkafka.test.run.flaky=true -PmaxTestRetries=1 -PmaxTestRetryFailures=3

Review Comment: I don't think we need to add this command in this section; this section is focused on "Specifying test retries", and there is already an example.
[jira] [Created] (KAFKA-18873) Incorrect error message for max.in.flight.requests.per.connection when using transactional producer.
Eslam Mohamed created KAFKA-18873:
--------------------------------------
             Summary: Incorrect error message for max.in.flight.requests.per.connection when using transactional producer.
                 Key: KAFKA-18873
                 URL: https://issues.apache.org/jira/browse/KAFKA-18873
             Project: Kafka
          Issue Type: Bug
          Components: clients
            Reporter: Eslam Mohamed
            Assignee: Eslam Mohamed

{code:java}
KafkaProducerTest.testInflightRequestsAndIdempotenceForIdempotentProduces{code}

The above unit test checks for configuration validation errors when instantiating a {{ProducerConfig}} with invalid properties. One of the assertions in this test, "invalidProps4", is designed to validate the constraint that {{max.in.flight.requests.per.connection}} must be at most {{5}} when using a transactional producer. However, the error message thrown by the {{ProducerConfig}} constructor in this scenario is incorrect.

* *Observed Behavior:* When {{max.in.flight.requests.per.connection}} is set to {{6}} for a transactional producer, the test expects an exception with the message: {{"Must set max.in.flight.requests.per.connection to at most 5 when using the transactional producer."}} Instead, the error message states: {{"Must set retries to non-zero when using the idempotent producer."}}
* *Expected Behavior:* The error message should explicitly indicate the violation of the {{max.in.flight.requests.per.connection}} constraint for transactional producers: {{"Must set max.in.flight.requests.per.connection to at most 5 when using the transactional producer."}}

The mismatch in the error message can lead to confusion for developers debugging the configuration error, as it incorrectly hints at a {{retries}} configuration issue instead of the actual {{max.in.flight.requests.per.connection}} issue.
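For illustration, the kind of configuration the test exercises looks like this sketch (the wrapper class and transactional id are invented; the config constants are real `ProducerConfig` constants):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class InvalidTransactionalConfigExample {
    public static void main(String[] args) {
        // Invalid configuration per the Jira: a transactional producer with
        // max.in.flight.requests.per.connection above the allowed maximum of 5.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-1");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 6);

        // Per the Jira, this fails as expected, but with the misleading message
        // "Must set retries to non-zero when using the idempotent producer."
        new ProducerConfig(props);
    }
}
```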
[jira] [Commented] (KAFKA-18872) Convert StressTestLog and TestLinearWriteSpeed to jmh benchmarks
[ https://issues.apache.org/jira/browse/KAFKA-18872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930717#comment-17930717 ]

Ismael Juma commented on KAFKA-18872:

Given that this is exercising the log layer, I'm not sure JMH is appropriate. JMH is typically used for CPU-bound benchmarks.

> Convert StressTestLog and TestLinearWriteSpeed to jmh benchmarks
> -----------------------------------------------------------------
>
>                 Key: KAFKA-18872
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18872
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Mickael Maison
>            Priority: Major
>
> Both StressTestLog and TestLinearWriteSpeed are in the jmh-benchmarks module but are not actually benchmarks.
> We can keep the main method if people want, but it would be helpful to be able to run them as benchmarks to ensure changes to our log layer do not negatively impact performance.
Re: [PR] KAFKA-18780: Extend RetriableException related exceptions [kafka]
m1a2st commented on code in PR #19020: URL: https://github.com/apache/kafka/pull/19020#discussion_r1971884683

## clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java:
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionExceptionHierarchyTest {
+
+    @ParameterizedTest
+    @MethodSource("retriableExceptionsProvider")
+    void testRetriableExceptionExceptionHierarchy(Class<?> exceptionClass) {
+        assertRetriableExceptionInheritance(exceptionClass);
+    }
+
+    /**
+     * Verifies that the given exception class extends `RetriableException`
+     * and does **not** extend `RefreshRetriableException`.
+     * Using `RefreshRetriableException` changes the exception handling behavior,
+     * so only exceptions directly extending `RetriableException` are valid here.
+     *
+     * @param exceptionClass the exception class to check
+     */
+    private void assertRetriableExceptionInheritance(Class<?> exceptionClass) {
+        assertTrue(RetriableException.class.isAssignableFrom(exceptionClass),
+            exceptionClass.getSimpleName() + " should extend RetriableException");
+        assertFalse(RefreshRetriableException.class.isAssignableFrom(exceptionClass),
+            exceptionClass.getSimpleName() + " should NOT extend RefreshRetriableException");
+    }
+
+    private static Stream<Class<?>> retriableExceptionsProvider() {
+        return Stream.of(
+            TimeoutException.class,
+            NotEnoughReplicasException.class,
+            CoordinatorLoadInProgressException.class,
+            CorruptRecordException.class,
+            NotEnoughReplicasAfterAppendException.class,
+            ConcurrentTransactionsException.class
+        );
+    }

Review Comment: Please use `@ValueSource` instead of `@MethodSource`.

## clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java:
@@ -0,0 +1,60 @@
+    private void assertRetriableExceptionInheritance(Class<?> exceptionClass) {

Review Comment: P
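For reference, JUnit 5's `@ValueSource` does support class literals, so the suggested change could look like this sketch (the exception classes live in the same `org.apache.kafka.common.errors` package as the test, so no extra imports are needed for them):

```java
package org.apache.kafka.common.errors;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

// Sketch of the reviewer's suggestion: @ValueSource(classes = ...) replaces
// the @MethodSource provider for a fixed list of exception classes.
public class TransactionExceptionHierarchyTest {

    @ParameterizedTest
    @ValueSource(classes = {
        TimeoutException.class,
        NotEnoughReplicasException.class,
        CoordinatorLoadInProgressException.class,
        CorruptRecordException.class,
        NotEnoughReplicasAfterAppendException.class,
        ConcurrentTransactionsException.class
    })
    void testRetriableExceptionExceptionHierarchy(Class<?> exceptionClass) {
        assertTrue(RetriableException.class.isAssignableFrom(exceptionClass),
            exceptionClass.getSimpleName() + " should extend RetriableException");
        assertFalse(RefreshRetriableException.class.isAssignableFrom(exceptionClass),
            exceptionClass.getSimpleName() + " should NOT extend RefreshRetriableException");
    }
}
```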
[jira] [Commented] (KAFKA-18877) A mechanism to find cases where we accessed variables from the wrong thread
[ https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930871#comment-17930871 ]

David Arthur commented on KAFKA-18877:

I think the most likely unsafe access is reading from a control manager from the request thread (like in the referenced PR). In some cases it is safe to access a control manager's state from outside the controller thread (like in processBrokerHeartbeat), but in most cases it isn't. It would be great if we could easily differentiate safe vs unsafe access in the control managers.

Maybe we could split the control manager classes into two interfaces (controller-thread-only and any-thread) so we could use the type system to ensure safe access. Another idea is to do some runtime check from inside the control managers themselves: "if (current thread != controller) panic".

> A mechanism to find cases where we accessed variables from the wrong thread
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-18877
>                 URL: https://issues.apache.org/jira/browse/KAFKA-18877
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> from https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959
> There are some _non-thread-safe_ classes storing important information, and so they are expected to be accessed by a specific thread. Otherwise, it may cause unexpected behavior.
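The runtime-check idea could look roughly like the sketch below; the class name and usage are illustrative assumptions, not actual Kafka controller code.

```java
// Illustrative sketch of the "runtime check" idea: a control manager records
// the controller event-handler thread at construction and panics on access
// from any other thread.
public final class ThreadGuard {
    private final Thread owner;

    public ThreadGuard(Thread owner) {
        this.owner = owner;
    }

    // Call at the top of every controller-thread-only method.
    public void checkAccess() {
        Thread current = Thread.currentThread();
        if (current != owner) {
            throw new IllegalStateException("Accessed controller state from thread " +
                current.getName() + " instead of " + owner.getName());
        }
    }
}
```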
Re: [PR] KAFKA-18849: Add "strict min ISR" to the docs of "min.insync.replicas" [kafka]
clarkwtc commented on code in PR #19016: URL: https://github.com/apache/kafka/pull/19016#discussion_r1972555921 ## server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java: ## @@ -78,6 +78,7 @@ public final class ServerTopicConfigSynonyms { sameNameWithLogPrefix(TopicConfig.CLEANUP_POLICY_CONFIG), sameName(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG), sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), +sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_DOC), Review Comment: I was going the wrong way. Sure, we can directly reference `MIN_IN_SYNC_REPLICAS_DOC`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-18734: Implemented share partition metrics (KIP-1103) [kafka]
apoorvmittal10 opened a new pull request, #19045: URL: https://github.com/apache/kafka/pull/19045 The PR implements the SharePartitionMetrics as defined in KIP-1103, with one change: the metric `FetchLockRatio` is defined as a `Meter` in the KIP but is implemented as a `Histogram`. There was a discussion about this on the KIP-1103 thread, where we thought that `FetchLockRatio` is pre-aggregated; however, while implementing it we found that the rate from a `Meter` can go above 100, as a `Meter` defines a rate per time period. Hence it makes more sense to implement the metric `FetchLockRatio` as a `Histogram`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
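To illustrate the distinction (a sketch against the Yammer metrics API; the class and registration here are hypothetical, not the PR's actual code): a `Meter` reports a rate of events per time unit, so feeding it pre-aggregated percentages can yield rates above 100, while a `Histogram` records the ratio samples directly and its quantiles stay within 0-100.

```java
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Histogram;

public class FetchLockRatioSketch {
    // Each sample is already a 0-100 percentage, so reported values are bounded.
    private final Histogram fetchLockRatio =
        Metrics.newHistogram(FetchLockRatioSketch.class, "FetchLockRatio", true);

    public void recordLockRatio(long lockHeldMs, long windowMs) {
        if (windowMs > 0) {
            fetchLockRatio.update(Math.min(100L, (lockHeldMs * 100L) / windowMs));
        }
    }
}
```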
[jira] [Resolved] (KAFKA-18863) Runtime additions for connector multiversioning.
[ https://issues.apache.org/jira/browse/KAFKA-18863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-18863. - Fix Version/s: 4.1.0 Resolution: Fixed > Runtime additions for connector multiversioning. > > > Key: KAFKA-18863 > URL: https://issues.apache.org/jira/browse/KAFKA-18863 > Project: Kafka > Issue Type: New Feature > Components: connect >Affects Versions: 4.1.0 >Reporter: Snehashis Pal >Assignee: Snehashis Pal >Priority: Major > Fix For: 4.1.0 > > > Updates to the Connect worker to support connector multiversioning. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-18878) Implement share session cache metrics for share fetch
Apoorv Mittal created KAFKA-18878: - Summary: Implement share session cache metrics for share fetch Key: KAFKA-18878 URL: https://issues.apache.org/jira/browse/KAFKA-18878 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18878) Implement ShareSessionCache and DelayedShareFetchMetrics metrics for share fetch
[ https://issues.apache.org/jira/browse/KAFKA-18878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apoorv Mittal updated KAFKA-18878: -- Summary: Implement ShareSessionCache and DelayedShareFetchMetrics metrics for share fetch (was: Implement share session cache metrics for share fetch) > Implement ShareSessionCache and DelayedShareFetchMetrics metrics for share > fetch > > > Key: KAFKA-18878 > URL: https://issues.apache.org/jira/browse/KAFKA-18878 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18863: Connect Multiversion Support (Versioned Connector Creation and related changes) [kafka]
gharris1727 merged PR #17743: URL: https://github.com/apache/kafka/pull/17743 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]
ableegoldman commented on code in PR #17614: URL: https://github.com/apache/kafka/pull/17614#discussion_r1972542173 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java: ## @@ -275,6 +284,24 @@ public int memberEpoch() { return memberEpoch; } +/** + * @return the operation the consumer will perform on leaving the group. + * + * @see CloseOptions.GroupMembershipOperation + */ +public CloseOptions.GroupMembershipOperation leaveGroupOperation() { +return leaveGroupOperation; +} + +/** + * Sets the operation on consumer group membership that the consumer will perform when closing. + * The {@link AbstractMembershipManager#leaveGroupOperation} should remain {@code GroupMembershipOperation.DEFAULT} + * until the consumer is closed. + * + * @param operation the operation to be performed on close + */ +public abstract void leaveGroupOperationOnClose(CloseOptions.GroupMembershipOperation operation); Review Comment: nit: call this `#leaveGroupOperationOnClose` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java: ## @@ -469,8 +498,14 @@ public int joinGroupEpoch() { */ @Override public int leaveGroupEpoch() { +if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP.equals(leaveGroupOperation)) { +return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH; +} else if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP.equals(leaveGroupOperation)) { +return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH; +} Review Comment: why do we need to add this? IIUC the static member vs dynamic member leave group epoch should still only be based on whether the group instance id is set -- I assume if we do skip the leave group then we just wouldn't call this method to begin with (though I'm not super familiar with the "leave group epoch", just know it's part of the new rebalancing protocol) ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1151,31 +1153,28 @@ protected void handlePollTimeoutExpiry() { "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + "returned in poll() with max.poll.records."); -maybeLeaveGroup("consumer poll timeout has expired."); +maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired."); } /** - * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership or is already - * not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown). + * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership + * with the default consumer group membership operation, or is already not part of the group (i.e., does not have a + * valid member ID, is in the UNJOINED state, or the coordinator is unknown). 
* + * @param membershipOperation the operation on consumer group membership that the consumer will perform when closing * @param leaveReason the reason to leave the group for logging * @throws KafkaException if the rebalance callback throws exception */ -public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) { +public synchronized RequestFuture<Void> maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) { RequestFuture<Void> future = null; -// Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker, -// consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup, -// and the membership expiration is only controlled by session timeout. -if (isDynamicMember() && !coordinatorUnknown() && -state != MemberState.UNJOINED && generation.hasMemberId()) { -// this is a minimal effort attempt to leave the group. we do not -// attempt any resending if the request fails or times out. +if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) { log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", generation.memberId, coordinator, leaveReason); + LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder( rebalanceConfig.groupId, -Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason))) +List.of(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason))) Review Comme
[jira] [Commented] (KAFKA-18877) a mechanism to find cases where we accessed variables from the wrong thread
[ https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930883#comment-17930883 ] TengYao Chi commented on KAFKA-18877: - Hi [~chia7712] If you have not started working on it, I would like to take it over. :) > a mechanism to find cases where we accessed variables from the wrong thread > > > Key: KAFKA-18877 > URL: https://issues.apache.org/jira/browse/KAFKA-18877 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > from [https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959] > There are some _non-thread safe_ classes storing important information, > and so they are expected to be accessed by a specific thread. Otherwise, it may > cause unexpected behavior -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
[ https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930891#comment-17930891 ] Kirk True commented on KAFKA-15636: --- Can we do both? :) Process the first fetch and make sure it equals bytesConsumedTotal, then perform another fetch to make sure the average is as expected. > Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics > > > Key: KAFKA-15636 > URL: https://issues.apache.org/jira/browse/KAFKA-15636 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Assignee: Shivsundar R >Priority: Minor > Labels: consumer-threading-refactor, fetcher, unit-tests > > {{expectedBytes}} is calculated as a total, instead of an average. Is this correct? -- This message was sent by Atlassian Jira (v8.20.10#820010)
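A sketch of the "do both" idea; the helper methods and metric names below are hypothetical stand-ins for the real FetcherTest plumbing, not the actual test code:

```java
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class FetchMetricsSketch {
    private static final double EPSILON = 0.0001;

    // Hypothetical helpers standing in for the real fetch and metric lookups.
    private long fetchAndCountBytes() { return 100L; }
    private double metricValue(String name) { return 100.0; }

    @Test
    public void testFetchResponseMetrics() {
        // First fetch: the bytes-consumed total should equal the bytes fetched.
        long firstBytes = fetchAndCountBytes();
        assertEquals((double) firstBytes, metricValue("bytes-consumed-total"), EPSILON);

        // Second fetch: the average fetch size should now reflect both fetches.
        long secondBytes = fetchAndCountBytes();
        assertEquals((firstBytes + secondBytes) / 2.0, metricValue("fetch-size-avg"), EPSILON);
    }
}
```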
Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]
chia7712 commented on code in PR #18726: URL: https://github.com/apache/kafka/pull/18726#discussion_r1972645207 ## clients/src/main/resources/common/message/FetchResponse.json: ## @@ -106,7 +106,7 @@ ]}, { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId", "about": "The preferred read replica for the consumer to use on its next fetch request."}, -{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."} Review Comment: > It seems that before https://github.com/apache/kafka/commit/fe56fc98fa736c79c9dcbb1f64f810065161a1cc, we already have a bunch of calls like the following that will leave records as null in the fetchResponse. In the normal path, `FetchResponse.recordsOrFail(partitionData)` will return an empty record to replace null. ``` public static Records recordsOrFail(FetchResponseData.PartitionData partition) { if (partition.records() == null) return MemoryRecords.EMPTY; if (partition.records() instanceof Records) return (Records) partition.records(); throw new ClassCastException("The record type is " + partition.records().getClass().getSimpleName() + ", which is not a subtype of " + Records.class.getSimpleName() + ". This method is only safe to call if the `FetchResponse` was deserialized from bytes."); } ``` https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L848 However, your comment inspired me to notice that we do have a path which returns null records. See the following links – it could return `FetchResponse.partitionResponse(tp, Errors.XXX)` when converting the records. https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L835 https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L884 ``` val downConvertMagic = logConfig.map(_.recordVersion.value).flatMap { magic => if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1) Some(RecordBatch.MAGIC_VALUE_V0) else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3) Some(RecordBatch.MAGIC_VALUE_V1) else None } ``` However, a 4.0 consumer should never receive a null record because it cannot use fetch versions v0-v3. Likewise, the server rejects v0-v3 requests, and the down-conversion process is also removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
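To make the client-side expectation concrete, a guard of roughly this shape is all a client needs (a sketch; `recordsOrFail` quoted above is the actual helper in `FetchResponse`):

```java
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;

public final class NullRecordsGuard {
    // Normalize possibly-null partition records to an empty batch before
    // iterating, so error responses carrying null records cannot cause an NPE.
    public static Records orEmpty(Records records) {
        return records == null ? MemoryRecords.EMPTY : records;
    }
}
```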
[jira] [Updated] (KAFKA-18876) 4.0 documentation improvement
[ https://issues.apache.org/jira/browse/KAFKA-18876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-18876: --- Fix Version/s: 4.0.0 > 4.0 documentation improvement > - > > Key: KAFKA-18876 > URL: https://issues.apache.org/jira/browse/KAFKA-18876 > Project: Kafka > Issue Type: Improvement >Affects Versions: 4.0.0 >Reporter: Jun Rao >Assignee: Mingdao Yang >Priority: Major > Fix For: 4.0.0 > > > We need to fix a few things in the 4.0 documentation. > > 6.10 Consumer Rebalance Protocol > It's missing from the index on the left. > > ConsumerGroupPartitionAssignor is cut off in > org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionA. > > 6.11 Transaction Protocol > It's missing from the index on the left. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17037) KIP-919 support for `describeClientQuotas` and `alterClientQuotas`
[ https://issues.apache.org/jira/browse/KAFKA-17037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930895#comment-17930895 ] TaiJuWu commented on KAFKA-17037: - Hi [~isding_l], this ticket has been finished for a long time, but it lacks reviewers. Once a reviewer is found, I will continue working on it. > KIP-919 support for `describeClientQuotas` and `alterClientQuotas` > --- > > Key: KAFKA-17037 > URL: https://issues.apache.org/jira/browse/KAFKA-17037 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-18276 Migrate RebootstrapTest to new test infra [kafka]
clarkwtc opened a new pull request, #19046: URL: https://github.com/apache/kafka/pull/19046 Migrate RebootstrapTest to new test infra and remove the old Scala test. Test results (screenshot): https://github.com/user-attachments/assets/35812c96-13f7-41f8-829d-40947f768646 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18850) Fix the docs of org.apache.kafka.automatic.config.providers
[ https://issues.apache.org/jira/browse/KAFKA-18850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-18850: --- Fix Version/s: 4.0.0 > Fix the docs of org.apache.kafka.automatic.config.providers > --- > > Key: KAFKA-18850 > URL: https://issues.apache.org/jira/browse/KAFKA-18850 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Nick Guo >Priority: Major > Fix For: 4.0.0 > > > It seems to me that =env is incorrect. According to the source code, the > correct value should be the class name, for example: > `org.apache.kafka.common.config.provider.EnvVarConfigProvider` > ``` > for (String provider : configProviders.split(",")) { > String providerClass = providerClassProperty(provider); > if (indirectConfigs.containsKey(providerClass)) { > String providerClassName = indirectConfigs.get(providerClass); > if (classNameFilter.test(providerClassName)) { > providerMap.put(provider, providerClassName); > } else { > throw new ConfigException(providerClassName + " is not > allowed. Update System property '" > + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to > allow " + providerClassName); > } > } > } > ``` > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L616 -- This message was sent by Atlassian Jira (v8.20.10#820010)
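A minimal sketch of what the corrected docs describe (the property value is a list of fully qualified provider class names, not short aliases like "env"):

```java
public class AllowedProvidersExample {
    public static void main(String[] args) {
        // Matches the classNameFilter check quoted above: the allow-list entries
        // are class names such as EnvVarConfigProvider's FQCN, not "env".
        System.setProperty("org.apache.kafka.automatic.config.providers",
            "org.apache.kafka.common.config.provider.EnvVarConfigProvider");
    }
}
```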
[jira] [Updated] (KAFKA-18849) add "strict min ISR" to the docs of "min.insync.replicas"
[ https://issues.apache.org/jira/browse/KAFKA-18849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-18849: --- Fix Version/s: 4.0.0 > add "strict min ISR" to the docs of "min.insync.replicas" > - > > Key: KAFKA-18849 > URL: https://issues.apache.org/jira/browse/KAFKA-18849 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Wei-Ting Chen >Priority: Minor > Fix For: 4.0.0 > > > see https://github.com/apache/kafka/pull/18880 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18868) add the "default value" explanation to the docs of num.replica.alter.log.dirs.threads
[ https://issues.apache.org/jira/browse/KAFKA-18868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-18868: --- Fix Version/s: 4.0.0 > add the "default value" explanation to the docs of > num.replica.alter.log.dirs.threads > - > > Key: KAFKA-18868 > URL: https://issues.apache.org/jira/browse/KAFKA-18868 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Logan Zhu >Priority: Trivial > Fix For: 4.0.0 > > > The default value of {{num.replica.alter.log.dirs.threads}} is equal to the > number of log directories, but the documentation doesn't mention this. Users > only see a "null" default from the doc -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-18869) add remote storage threads to "Updating Thread Configs" section
[ https://issues.apache.org/jira/browse/KAFKA-18869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-18869: --- Fix Version/s: 4.0.0 > add remote storage threads to "Updating Thread Configs" section > --- > > Key: KAFKA-18869 > URL: https://issues.apache.org/jira/browse/KAFKA-18869 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Priority: Trivial > Fix For: 4.0.0 > > > # remote.log.reader.threads > # remote.log.manager.copier.thread.pool.size > # remote.log.manager.expiration.thread.pool.size > those configs should be added -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18868: add the "default value" explanation to the docs of num.replica.alter.log.dirs.threads [kafka]
LoganZhuZzz commented on code in PR #19038: URL: https://github.com/apache/kafka/pull/19038#discussion_r1972652456 ## server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java: ## @@ -52,7 +52,8 @@ public class ServerConfigs { public static final String BACKGROUND_THREADS_DOC = "The number of threads to use for various background processing tasks"; public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG = "num.replica.alter.log.dirs.threads"; -public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O"; +public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O." + Review Comment: Thanks for that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17037) KIP-919 support for `describeClientQuotas` and `alterClientQuotas`
[ https://issues.apache.org/jira/browse/KAFKA-17037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930896#comment-17930896 ] Lan Ding commented on KAFKA-17037: -- Hi [~taijuwu], if this ticket is still open, may I take it over? > KIP-919 support for `describeClientQuotas` and `alterClientQuotas` > --- > > Key: KAFKA-17037 > URL: https://issues.apache.org/jira/browse/KAFKA-17037 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (KAFKA-17037) KIP-919 support for `describeClientQuotas` and `alterClientQuotas`
[ https://issues.apache.org/jira/browse/KAFKA-17037 ] Lan Ding deleted comment on KAFKA-17037: -- was (Author: JIRAUSER30): Hi [~taijuwu], if this ticket is still open, may I take it over? > KIP-919 support for `describeClientQuotas` and `alterClientQuotas` > --- > > Key: KAFKA-17037 > URL: https://issues.apache.org/jira/browse/KAFKA-17037 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-18870) implement describeDelegationToken for controller
[ https://issues.apache.org/jira/browse/KAFKA-18870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TaiJuWu reassigned KAFKA-18870: --- Assignee: TaiJuWu > implement describeDelegationToken for controller > > > Key: KAFKA-18870 > URL: https://issues.apache.org/jira/browse/KAFKA-18870 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Major > > as title -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17651) Implement `describeUserScramCredentials`
[ https://issues.apache.org/jira/browse/KAFKA-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930897#comment-17930897 ] Lan Ding commented on KAFKA-17651: -- Hi [~frankvicky], if this ticket is still open, may I take it over? > Implement `describeUserScramCredentials` > > > Key: KAFKA-17651 > URL: https://issues.apache.org/jira/browse/KAFKA-17651 > Project: Kafka > Issue Type: Sub-task >Reporter: TengYao Chi >Assignee: TengYao Chi >Priority: Major > Fix For: 4.1.0 > > > Currently there is no implementation for `describeDelegationToken`, > `describeUserScramCredentials` and `unregisterBroker`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]
junrao commented on code in PR #18726: URL: https://github.com/apache/kafka/pull/18726#discussion_r1972659486 ## clients/src/main/resources/common/message/FetchResponse.json: ## @@ -106,7 +106,7 @@ ]}, { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId", "about": "The preferred read replica for the consumer to use on its next fetch request."}, -{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."} Review Comment: My point is that before 4.0, the server could already include null records for certain errors. So the client already needs to deal with that. If it doesn't, it's a bug in the client that needs to be fixed. Once the client is fixed, it's not necessary to make the records non-null. We could consider changing it to be non-null, but it would be useful to think through if this change should be applied consistently to other places such as ShareFetchResponse and ProduceRequest. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]
chia7712 commented on code in PR #19039: URL: https://github.com/apache/kafka/pull/19039#discussion_r1972658599 ## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ## @@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() { MockFileConfigProvider.assertClosed(id); } +@Test +public void testAutomaticConfigProvidersWithFullClassName() { +Properties props = new Properties(); +props.put("config.providers", "file"); +props.put("config.providers.file.class", MockFileConfigProvider.class.getName()); +String id = UUID.randomUUID().toString(); Review Comment: it is totally fine to use java `UUID` as all we want to do is to generate a random string -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-18870) implement describeDelegationToken for controller
[ https://issues.apache.org/jira/browse/KAFKA-18870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930898#comment-17930898 ] Szu-Yung Wang commented on KAFKA-18870: --- Hi, may I take this? > implement describeDelegationToken for controller > > > Key: KAFKA-18870 > URL: https://issues.apache.org/jira/browse/KAFKA-18870 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Major > > as title -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker [kafka]
FrankYang0529 commented on PR #18997: URL: https://github.com/apache/kafka/pull/18997#issuecomment-2686577937 @junrao updated PR description. Also, added reviewers to it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17039) KIP-919 support for `unregisterBroker`
[ https://issues.apache.org/jira/browse/KAFKA-17039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930900#comment-17930900 ] Lan Ding commented on KAFKA-17039: -- Hi [~frankvicky], if this ticket is still open, may I take it over? > KIP-919 support for `unregisterBroker` > --- > > Key: KAFKA-17039 > URL: https://issues.apache.org/jira/browse/KAFKA-17039 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > as title -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17037) KIP-919 support for `describeClientQuotas` and `alterClientQuotas`
[ https://issues.apache.org/jira/browse/KAFKA-17037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930892#comment-17930892 ] Lan Ding commented on KAFKA-17037: -- Hi [~taijuwu], are you still working on this? I would like to take it over otherwise. > KIP-919 support for `describeClientQuotas` and `alterClientQuotas` > --- > > Key: KAFKA-17037 > URL: https://issues.apache.org/jira/browse/KAFKA-17037 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17607: Add CI step to verify LICENSE-binary [kafka]
xijiu commented on code in PR #18299: URL: https://github.com/apache/kafka/pull/18299#discussion_r1972668711 ## .github/workflows/build.yml: ## @@ -127,6 +127,9 @@ jobs: gradle-cache-read-only: ${{ !inputs.is-trunk }} gradle-cache-write-only: ${{ inputs.is-trunk }} develocity-access-key: ${{ secrets.DEVELOCITY_ACCESS_KEY }} + - name: Verify license file +if: always() Review Comment: @m1a2st Thanks for the code review, and you are right, I have fixed it, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17039) KIP-919 support for `unregisterBroker`
[ https://issues.apache.org/jira/browse/KAFKA-17039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930903#comment-17930903 ] TengYao Chi commented on KAFKA-17039: - Hi [~isding_l] Thanks for the interest. However, I already have a local patch for this one. Feel free to browse other issues. :) > KIP-919 support for `unregisterBroker` > --- > > Key: KAFKA-17039 > URL: https://issues.apache.org/jira/browse/KAFKA-17039 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > > as title -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]
ijuma commented on code in PR #18726: URL: https://github.com/apache/kafka/pull/18726#discussion_r1972672957 ## clients/src/main/resources/common/message/FetchResponse.json: ## @@ -106,7 +106,7 @@ ]}, { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId", "about": "The preferred read replica for the consumer to use on its next fetch request."}, -{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."} Review Comment: @chia7712 The unsupported compression case is extremely rare because it needs a client that does not support zstd and the topic to be configured with zstd. Each is rare in isolation (compression is usually defined on the producer and zstd has been supported by clients since 2.1) and the combination is even more so. For @junrao's question, @chia7712 are you saying that we actually don't end up with null records in that case? I haven't checked it carefully yet. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID
[ https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lan Ding reassigned KAFKA-15283: Assignee: Lan Ding (was: Lianet Magrans) > Client support for OffsetFetch and OffsetCommit with topic ID > - > > Key: KAFKA-15283 > URL: https://issues.apache.org/jira/browse/KAFKA-15283 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lan Ding >Priority: Critical > Labels: kip-848-client-support, newbie, offset > Fix For: 4.1.0 > > > Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory > {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and > {{METADATA}} RPC calls. > With KIP-848 the OffsetFetch and OffsetCommit will start using topic IDs in > the same way, so the new client implementation will provide it when issuing > those requests. Topic names should continue to be supported as needed by the > {{{}AdminClient{}}}. > We should also review/clean-up the support for topic names in requests such > as the {{METADATA}} request (currently supporting topic names as well as > topic IDs on the client side). > Tasks include: > * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will > be upgraded on the server to support topic ID > * Check topic ID propagation internally in the client based on RPCs > including it. > * Review existing support for topic name for potential clean if not needed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-18877) a mechanism to find cases where we accessed variables from the wrong thread
[ https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-18877: -- Assignee: TengYao Chi (was: Chia-Ping Tsai) > a mechanism to find cases where we accessed variables from the wrong thread > > > Key: KAFKA-18877 > URL: https://issues.apache.org/jira/browse/KAFKA-18877 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Major > > from [https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959] > There are some _non-thread safe_ classes storing important information, > and so they are expected to be accessed by a specific thread. Otherwise, it may > cause unexpected behavior -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]
chia7712 commented on code in PR #18726: URL: https://github.com/apache/kafka/pull/18726#discussion_r1972681634 ## clients/src/main/resources/common/message/FetchResponse.json: ## @@ -106,7 +106,7 @@ ]}, { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId", "about": "The preferred read replica for the consumer to use on its next fetch request."}, -{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."} Review Comment: > are you saying that we actually don't end up with null records in that case? A 3.9 server does return null records in extremely rare cases, but we don't end up with null records for newer clients as down-conversion does not happen. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18849: Add "strict min ISR" to the docs of "min.insync.replicas" [kafka]
ijuma commented on PR #19016: URL: https://github.com/apache/kafka/pull/19016#issuecomment-2686608878 I pushed a very minor tweak. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17645: KIP-1052: Enable warmup in producer performance test [kafka]
kirktrue commented on code in PR #17340: URL: https://github.com/apache/kafka/pull/17340#discussion_r1972681856 ## tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java: ## @@ -148,6 +175,7 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) { Callback cb; Review Comment: I know this is unrelated to these changes, but why are `cb` and `stats` declared at the instance level instead of just created inside `start()`? ## tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java: ## @@ -75,7 +76,13 @@ void start(String[] args) throws IOException { // not thread-safe, do not share with other threads SplittableRandom random = new SplittableRandom(0); ProducerRecord<byte[], byte[]> record; -stats = new Stats(config.numRecords, 5000); + +System.out.println("DEBUG: config.warmupRecords=" + config.warmupRecords + ", (config.warmupRecords > 0)=" + (config.warmupRecords > 0)); Review Comment: Just for clarity, this `DEBUG` is for development purposes and will be removed before merging? ## tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java: ## @@ -51,6 +51,7 @@ public class ProducerPerformance { public static final String DEFAULT_TRANSACTION_ID_PREFIX = "performance-producer-"; public static final long DEFAULT_TRANSACTION_DURATION_MS = 3000L; +public static final int DEFAULT_REPORTING_INTERVAL_MS = 5000; Review Comment: It seems like this constant could be folded into the `Stats` class and removed as a constructor parameter since it's never changed? ## tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java: ## @@ -94,7 +101,19 @@ void start(String[] args) throws IOException { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); -cb = new PerfCallback(sendStartMs, payload.length, stats); Review Comment: Any reason not to change this to: ```java if (config.warmupRecords > 0 && i == config.warmupRecords) { steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, true); stats.steadyStateActive = true; } cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); ``` True, `steadyStateStats` will be `null` up until the number of warmup records has elapsed. As I read in `Stats`, passing in a `null` `Stats` object doesn't hurt. CMIIW. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]
chia7712 commented on code in PR #19039: URL: https://github.com/apache/kafka/pull/19039#discussion_r1972690127 ## clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java: ## @@ -450,6 +451,32 @@ public void testConfigProvidersPropsAsParam() { MockFileConfigProvider.assertClosed(id); } +@Test +public void testAutomaticConfigProvidersWithFullClassName() { +Properties props = new Properties(); Review Comment: Could you please do a bit refactor on this test? ```java // case0: MockFileConfigProvider is disallowed by org.apache.kafka.automatic.config.providers System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file"); assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(Map.of("config.providers", "file", "config.providers.file.class", MockFileConfigProvider.class.getName()), Collections.emptyMap())); // case1: MockFileConfigProvider is allowed by org.apache.kafka.automatic.config.providers System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName()); var props = Map.of("config.providers", "file", "config.providers.file.class", MockFileConfigProvider.class.getName(), "config.providers.file.param.testId", UUID.randomUUID().toString(), "test.key", "${file:/path:key}"); assertEquals("testKey", new TestIndirectConfigResolution(props, Collections.emptyMap()).originals().get("test.key")); // case2: MockFileConfigProvider and EnvVarConfigProvider are allowed by org.apache.kafka.automatic.config.providers System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName() + ", " + EnvVarConfigProvider.class.getName()); assertEquals("testKey", new TestIndirectConfigResolution(props, Collections.emptyMap()).originals().get("test.key")); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18869:add remote storage threads to "Updating Thread Configs" section [kafka]
chia7712 merged PR #19037: URL: https://github.com/apache/kafka/pull/19037 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18869:add remote storage threads to "Updating Thread Configs" section [kafka]
chia7712 commented on PR #19037: URL: https://github.com/apache/kafka/pull/19037#issuecomment-2686638814 cherry-pick to 4.0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-18869) add remote storage threads to "Updating Thread Configs" section
[ https://issues.apache.org/jira/browse/KAFKA-18869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-18869. Resolution: Fixed trunk: [https://github.com/apache/kafka/commit/8bbca913efe260ba59c824801466f73584c46f8f] 4.0: https://github.com/apache/kafka/commit/32d012fd8e0329c7b696dff9487ea704481f94a7 > add remote storage threads to "Updating Thread Configs" section > --- > > Key: KAFKA-18869 > URL: https://issues.apache.org/jira/browse/KAFKA-18869 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Priority: Trivial > Fix For: 4.0.0 > > > # remote.log.reader.threads > # remote.log.manager.copier.thread.pool.size > # remote.log.manager.expiration.thread.pool.size > those configs should be added -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-18875) KRaft controller does not retry registration if the first attempt times out
[ https://issues.apache.org/jira/browse/KAFKA-18875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TaiJuWu resolved KAFKA-18875. - Resolution: Duplicate > KRaft controller does not retry registration if the first attempt times out > --- > > Key: KAFKA-18875 > URL: https://issues.apache.org/jira/browse/KAFKA-18875 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Fonai >Priority: Minor > > There is a [retry > mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274] > with exponential backoff built into KRaft controller registration. The > timeout of the first attempt is 5 s for KRaft controllers > ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448]), > which is not configurable. > If for some reason the controller's first registration request times out, the > attempt should be retried, but in practice this does not happen and the > controller is not able to join the quorum. We see the following in the faulty > controller's log: > {noformat} > 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 > incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to > send ControllerRegistrationRequestData(controllerId=3, > incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, > listeners=[Listener(name='CONTROLPLANE-9090', > host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc', > port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', > minSupportedVersion=0, maxSupportedVersion=1), > Feature(name='metadata.version', minSupportedVersion=1, > maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) > [controller-3-registration-manager-event-handler] > ... > 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 > incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel > manager timed out before sending the request. > (kafka.server.ControllerRegistrationManager) > [controller-3-to-controller-registration-channel-manager] > 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 > incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting > for the previous RPC to complete. > (kafka.server.ControllerRegistrationManager) > [controller-3-registration-manager-event-handler] > {noformat} > After this we cannot see any controller retry in the log. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18859: honor the error message of UnregisterBrokerResponse [kafka]
chia7712 commented on code in PR #19027: URL: https://github.com/apache/kafka/pull/19027#discussion_r1972693467 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -642,9 +642,11 @@ class ControllerApis( def createResponseCallback(requestThrottleMs: Int, e: Throwable): UnregisterBrokerResponse = { if (e != null) { + val errors = Errors.forException(e) Review Comment: Could you please leverage `decommissionRequest.getErrorResponse(requestThrottleMs, e)`? ```java if (e != null) { decommissionRequest.getErrorResponse(requestThrottleMs, e) } else { new UnregisterBrokerResponse(new UnregisterBrokerResponseData(). setThrottleTimeMs(requestThrottleMs)) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17651) Implement `describeUserScramCredentials`
[ https://issues.apache.org/jira/browse/KAFKA-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TengYao Chi updated KAFKA-17651: Description: Currently there is no implementation for `describeUserScramCredentials` (was: Currently there is no implementation for `describeDelegationToken`, `describeUserScramCredentials` and `unregisterBroker`.) > Implement `describeUserScramCredentials` > > > Key: KAFKA-17651 > URL: https://issues.apache.org/jira/browse/KAFKA-17651 > Project: Kafka > Issue Type: Sub-task >Reporter: TengYao Chi >Assignee: TengYao Chi >Priority: Major > Fix For: 4.1.0 > > > Currently there is no implementation for `describeUserScramCredentials` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17651) Implement `describeUserScramCredentials`
[ https://issues.apache.org/jira/browse/KAFKA-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930930#comment-17930930 ] TengYao Chi commented on KAFKA-17651: - Hi [~isding_l] Thanks for offering to help, but I've already started working on this ticket. :) > Implement `describeUserScramCredentials` > > > Key: KAFKA-17651 > URL: https://issues.apache.org/jira/browse/KAFKA-17651 > Project: Kafka > Issue Type: Sub-task >Reporter: TengYao Chi >Assignee: TengYao Chi >Priority: Major > Fix For: 4.1.0 > > > Currently there is no implementation for `describeUserScramCredentials` -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]
frankvicky commented on code in PR #17614: URL: https://github.com/apache/kafka/pull/17614#discussion_r1972757524 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -1151,31 +1153,28 @@ protected void handlePollTimeoutExpiry() { "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + "returned in poll() with max.poll.records."); -maybeLeaveGroup("consumer poll timeout has expired."); +maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired."); } /** - * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership or is already - * not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown). + * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership + * with the default consumer group membership operation, or is already not part of the group (i.e., does not have a + * valid member ID, is in the UNJOINED state, or the coordinator is unknown). * + * @param membershipOperation the operation on consumer group membership that the consumer will perform when closing * @param leaveReason the reason to leave the group for logging * @throws KafkaException if the rebalance callback throws exception */ -public synchronized RequestFuture<Void> maybeLeaveGroup(String leaveReason) { +public synchronized RequestFuture<Void> maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) { RequestFuture<Void> future = null; -// Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker, -// consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup, -// and the membership expiration is only controlled by session timeout. -if (isDynamicMember() && !coordinatorUnknown() && -state != MemberState.UNJOINED && generation.hasMemberId()) { -// this is a minimal effort attempt to leave the group. we do not -// attempt any resending if the request fails or times out. +if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) { log.info("Member {} sending LeaveGroup request to coordinator {} due to {}", generation.memberId, coordinator, leaveReason); + LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder( rebalanceConfig.groupId, -Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason))) +List.of(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason))) Review Comment: Just my two cents. The client module's minimum JDK support is Java 11, so I think it's better to leverage the modern API. But this is not related to the topic of this patch, so I will revert it in the next commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18780: Extend RetriableException related exceptions [kafka]
k-raina commented on code in PR #19020: URL: https://github.com/apache/kafka/pull/19020#discussion_r1971936154 ## clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java: ## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TransactionExceptionHierarchyTest { + +@ParameterizedTest +@MethodSource("retriableExceptionsProvider") +void testRetriableExceptionExceptionHierarchy(Class<? extends Exception> exceptionClass) { +assertRetriableExceptionInheritance(exceptionClass); +} + +/** + * Verifies that the given exception class extends `RetriableException` + * and does **not** extend `RefreshRetriableException`. + * Using `RefreshRetriableException` changes the exception handling behavior, + * so only exceptions directly extending `RetriableException` are valid here. + * + * @param exceptionClass the exception class to check + */ +private void assertRetriableExceptionInheritance(Class<? extends Exception> exceptionClass) { Review Comment: Updated: [Addressed Feedback](https://github.com/apache/kafka/pull/19020/commits/85b3c4b7852806ebad06c2349009b9172c2870f9) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-18875) KRaft controller does not retry registration if the first attempt times out
Daniel Fonai created KAFKA-18875: Summary: KRaft controller does not retry registration if the first attempt times out Key: KAFKA-18875 URL: https://issues.apache.org/jira/browse/KAFKA-18875 Project: Kafka Issue Type: Bug Reporter: Daniel Fonai There is a [retry mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274] with exponential backoff built into KRaft controller registration. The timeout of the first attempt is 5 s for KRaft controllers ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448]), which is not configurable. If for some reason the controller's first registration request times out, the attempt should be retried, but in practice this does not happen and the controller is not able to join the quorum. We see the following in the faulty controller's log: {noformat} 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to send ControllerRegistrationRequestData(controllerId=3, incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, listeners=[Listener(name='CONTROLPLANE-9090', host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc', port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', minSupportedVersion=0, maxSupportedVersion=1), Feature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler] ... 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel manager timed out before sending the request. (kafka.server.ControllerRegistrationManager) [controller-3-to-controller-registration-channel-manager] 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting for the previous RPC to complete. (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler] {noformat} After this we cannot see any controller retry in the log. -- This message was sent by Atlassian Jira (v8.20.10#820010)
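For context, the retry behavior the reporter expected is roughly this shape of exponential backoff with jitter (an illustrative Java sketch, not the actual ControllerRegistrationManager code; `sendRegistration()` is a hypothetical stand-in for the RPC):

```java
import java.util.concurrent.ThreadLocalRandom;

public class RegistrationRetrySketch {
    // Re-send the registration request with exponentially growing, jittered
    // backoff until it succeeds, instead of giving up after the first timeout.
    static void registerWithRetries() throws InterruptedException {
        long backoffMs = 100;
        final long maxBackoffMs = 30_000;
        while (!sendRegistration()) {
            long jitter = ThreadLocalRandom.current().nextLong(backoffMs / 2 + 1);
            Thread.sleep(backoffMs + jitter);
            backoffMs = Math.min(maxBackoffMs, backoffMs * 2);
        }
    }

    // Placeholder: the real code sends an RPC and reports success or timeout.
    static boolean sendRegistration() {
        return true;
    }
}
```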
[jira] [Commented] (KAFKA-18874) KRaft controller does not retry registration if the first attempt times out
[ https://issues.apache.org/jira/browse/KAFKA-18874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930751#comment-17930751 ]

Ismael Juma commented on KAFKA-18874:
-------------------------------------

Thanks for the report. Can you specify the version you tested?

> KRaft controller does not retry registration if the first attempt times out
> ---
>
> Key: KAFKA-18874
> URL: https://issues.apache.org/jira/browse/KAFKA-18874
> Project: Kafka
> Issue Type: Bug
> Reporter: Daniel Fonai
> Priority: Minor
>
> There is a [retry mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274] with exponential backoff built into KRaft controller registration. The timeout of the first attempt is 5 s for KRaft controllers ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448]), which is not configurable.
> If for some reason the controller's first registration request times out, the attempt should be retried, but in practice this does not happen and the controller is unable to join the quorum. We see the following in the faulty controller's log:
> {noformat}
> 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to send ControllerRegistrationRequestData(controllerId=3, incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, listeners=[Listener(name='CONTROLPLANE-9090', host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc', port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', minSupportedVersion=0, maxSupportedVersion=1), Feature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler]
> ...
> 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel manager timed out before sending the request. (kafka.server.ControllerRegistrationManager) [controller-3-to-controller-registration-channel-manager]
> 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting for the previous RPC to complete. (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler]
> {noformat}
> After this, no controller retry appears in the log.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]
lianetm merged PR #18989: URL: https://github.com/apache/kafka/pull/18989 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker [kafka]
mumrah commented on PR #18997: URL: https://github.com/apache/kafka/pull/18997#issuecomment-2685820634

Regarding the attached screenshot (https://github.com/user-attachments/assets/096cf482-4701-4219-a8eb-c5613adfb390): this was added in #18985 as part of the merge queue effort. It is not yet required for this check to pass, but if you want to fix the failed check you can add the "Reviewers:" line to the end of the PR body.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker [kafka]
junrao commented on PR #18997: URL: https://github.com/apache/kafka/pull/18997#issuecomment-2685839532

@FrankYang0529 : Thanks for the updated description. Just a minor comment:

"This makes finalized features may be stale when processing registerBroker event." => "This may make finalized features stale when processing registerBroker event."

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18873) Incorrect error message for max.in.flight.requests.per.connection when using transactional producer.
[ https://issues.apache.org/jira/browse/KAFKA-18873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-18873:
------------------------------
Component/s: producer

> Incorrect error message for max.in.flight.requests.per.connection when using transactional producer.
> ---
>
> Key: KAFKA-18873
> URL: https://issues.apache.org/jira/browse/KAFKA-18873
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Reporter: Eslam Mohamed
> Assignee: Eslam Mohamed
> Priority: Minor
> Original Estimate: 1h
> Remaining Estimate: 1h
>
> {code:java}
> KafkaProducerTest.testInflightRequestsAndIdempotenceForIdempotentProduces{code}
> The above unit test checks for configuration validation errors when instantiating a {{ProducerConfig}} with invalid properties. One of the assertions in this test, "invalidProps4", is designed to validate the constraint that {{max.in.flight.requests.per.connection}} must be at most {{5}} when using a transactional producer. However, the error message thrown by the {{ProducerConfig}} constructor in this scenario is incorrect.
> * *Observed Behavior:*
> When {{max.in.flight.requests.per.connection}} is set to {{6}} for a transactional producer, the test expects an exception with the message:
> {{"Must set max.in.flight.requests.per.connection to at most 5 when using the transactional producer."}}
> Instead, the error message states:
> {{"Must set retries to non-zero when using the idempotent producer."}}
> * *Expected Behavior:*
> The error message should explicitly indicate the violation of the {{max.in.flight.requests.per.connection}} constraint for transactional producers:
> {{"Must set max.in.flight.requests.per.connection to at most 5 when using the transactional producer."}}
> The mismatch in the error message can lead to confusion for developers debugging the configuration error, as it incorrectly hints at a {{retries}} configuration issue instead of the actual {{max.in.flight.requests.per.connection}} issue.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
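The mismatch is easy to observe directly. Below is a hedged, hypothetical reproduction sketch (not the KafkaProducerTest code): the property values mirror the report, while the class name and the way the message is printed are illustrative choices:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical reproduction sketch for the report above; not KafkaProducerTest.
public class InFlightConfigRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-1");           // makes the producer transactional
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "6"); // violates the <= 5 constraint

        try (KafkaProducer<String, String> producer =
                 new KafkaProducer<>(props, new StringSerializer(), new StringSerializer())) {
            System.out.println("unexpectedly constructed a producer");
        } catch (Exception e) {
            // Per the report, the message here mentions 'retries' rather than
            // 'max.in.flight.requests.per.connection'.
            System.out.println(e.getMessage());
            if (e.getCause() != null) {
                System.out.println(e.getCause().getMessage());
            }
        }
    }
}
```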
[jira] [Created] (KAFKA-18876) 4.0 documentation improvement
Jun Rao created KAFKA-18876:
Summary: 4.0 documentation improvement
Key: KAFKA-18876
URL: https://issues.apache.org/jira/browse/KAFKA-18876
Project: Kafka
Issue Type: Improvement
Affects Versions: 4.0.0
Reporter: Jun Rao

We need to fix a few things in the 4.0 documentation.

6.10 Consumer Rebalance Protocol
It's missing from the index on the left. ConsumerGroupPartitionAssignor is cut off, rendered as org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionA.

6.11 Transaction Protocol
It's missing from the index on the left.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] WIP Investigate OOM [kafka]
mumrah commented on code in PR #19031: URL: https://github.com/apache/kafka/pull/19031#discussion_r1972112034

## build.gradle:
## @@ -54,7 +54,7 @@ ext {
   buildVersionFileName = "kafka-version.properties"
   defaultMaxHeapSize = "2g"
-  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC", "-XX:-UseGCOverheadLimit"]

Review Comment:
@ijuma WDYT about disabling this feature? From what I can tell, this will prevent a long GC pause from triggering an OOM. Instead, the build would likely just time out (which it's doing anyway, with the OOM happening in the Gradle worker).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-16092) Queues for Kafka
[ https://issues.apache.org/jira/browse/KAFKA-16092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield updated KAFKA-16092: - Fix Version/s: (was: 4.1.0) > Queues for Kafka > > > Key: KAFKA-16092 > URL: https://issues.apache.org/jira/browse/KAFKA-16092 > Project: Kafka > Issue Type: New Feature >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Labels: queues-for-kafka > Attachments: image-2024-04-28-11-05-56-153.png > > > This Jira tracks the development of KIP-932: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-18874) KRaft controller does not retry registration if the first attempt times out
Daniel Fonai created KAFKA-18874:
Summary: KRaft controller does not retry registration if the first attempt times out
Key: KAFKA-18874
URL: https://issues.apache.org/jira/browse/KAFKA-18874
Project: Kafka
Issue Type: Bug
Reporter: Daniel Fonai

There is a [retry mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274] with exponential backoff built into KRaft controller registration. The timeout of the first attempt is 5 s for KRaft controllers ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448]), which is not configurable.

If for some reason the controller's first registration request times out, the attempt should be retried, but in practice this does not happen and the controller is unable to join the quorum. We see the following in the faulty controller's log:

{noformat}
2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to send ControllerRegistrationRequestData(controllerId=3, incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, listeners=[Listener(name='CONTROLPLANE-9090', host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc', port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', minSupportedVersion=0, maxSupportedVersion=1), Feature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler]
...
2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel manager timed out before sending the request. (kafka.server.ControllerRegistrationManager) [controller-3-to-controller-registration-channel-manager]
2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting for the previous RPC to complete. (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler]
{noformat}

After this, no controller retry appears in the log.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] WIP Investigate OOM [kafka]
mumrah commented on PR #19031: URL: https://github.com/apache/kafka/pull/19031#issuecomment-2685589737

It looks like the UserQuotaTest is the likely culprit. From https://github.com/apache/kafka/actions/runs/13523225366/job/37791507296

```
Gradle Test Run :core:test > Gradle Test Executor 38 > UserQuotaTest > testQuotaOverrideDelete(String, String) > testQuotaOverrideDelete(String, String).quorum=kraft.groupProtocol=consumer STARTED

> Task :storage:compileTestJava

Unexpected exception thrown.
org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:50402'.
    at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:99)
    at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
    at org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:48)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
```

and from https://github.com/apache/kafka/actions/runs/13534291834/job/37823471305

```
Gradle Test Run :core:test > Gradle Test Executor 37 > UserQuotaTest > testQuotaOverrideDelete(String, String) > testQuotaOverrideDelete(String, String).quorum=kraft.groupProtocol=consumer STARTED

> Task :storage:checkstyleMain
> Task :shell:checkstyleMain

Unexpected exception thrown.
org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:38838'.
    at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:99)
    at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
    at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
    at org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:48)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
```

It appears that the Gradle worker is trying to send results to the main process and causing a long GC pause, which triggers this "GC overhead limit exceeded" error.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
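For readers unfamiliar with this particular OutOfMemoryError: HotSpot raises "GC overhead limit exceeded" when the JVM spends almost all of its time collecting while reclaiming very little memory, not necessarily when the heap is strictly full. A small, self-contained demonstration follows; the heap size and allocation pattern are arbitrary illustrative choices, and exact behavior varies by JVM version and collector (the limit applies under -XX:+UseParallelGC, which the build above uses):

```java
import java.util.ArrayList;
import java.util.List;

// Demonstration sketch: run with, e.g.,
//   java -Xmx64m -XX:+UseParallelGC GcOverheadDemo
// This typically fails with "GC overhead limit exceeded" as the heap nears
// exhaustion; with -XX:-UseGCOverheadLimit it instead fails later with a
// plain "Java heap space" error.
public class GcOverheadDemo {
    public static void main(String[] args) {
        List<long[]> retained = new ArrayList<>();
        int chunks = 0;
        while (true) {
            retained.add(new long[1024]); // allocate and retain so GC reclaims almost nothing
            if (++chunks % 10_000 == 0) {
                System.out.println("allocated " + chunks + " chunks");
            }
        }
    }
}
```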
Re: [PR] KAFKA-18780: Extend RetriableException related exceptions [kafka]
k-raina commented on code in PR #19020: URL: https://github.com/apache/kafka/pull/19020#discussion_r1971934155

## clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java:
## @@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionExceptionHierarchyTest {
+
+    @ParameterizedTest
+    @MethodSource("retriableExceptionsProvider")
+    void testRetriableExceptionExceptionHierarchy(Class exceptionClass) {
+        assertRetriableExceptionInheritance(exceptionClass);
+    }
+
+    /**
+     * Verifies that the given exception class extends `RetriableException`
+     * and does **not** extend `RefreshRetriableException`.
+     * Using `RefreshRetriableException` changes the exception handling behavior,
+     * so only exceptions directly extending `RetriableException` are valid here.
+     *
+     * @param exceptionClass the exception class to check
+     */
+    private void assertRetriableExceptionInheritance(Class exceptionClass) {

Review Comment:
Updated: [Addressed Feedback](https://github.com/apache/kafka/pull/19020/commits/85b3c4b7852806ebad06c2349009b9172c2870f9)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17565: Move MetadataCache interface to metadata module [kafka]
FrankYang0529 commented on PR #18801: URL: https://github.com/apache/kafka/pull/18801#issuecomment-2685571786

@mumrah I added the benchmark results to the PR description. They are a little different from yours, but the trunk and branch results on my laptop are close to each other, so I think the result is acceptable.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18723) KRaft must handle corrupted records in the fetch response
[ https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

José Armando García Sancio updated KAFKA-18723:
-----------------------------------------------
Fix Version/s: 4.0.1

> KRaft must handle corrupted records in the fetch response
> ---
>
> Key: KAFKA-18723
> URL: https://issues.apache.org/jira/browse/KAFKA-18723
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Reporter: José Armando García Sancio
> Assignee: José Armando García Sancio
> Priority: Major
> Fix For: 3.9.1, 3.8.2, 3.7.3, 4.0.1
>
> It is possible for a KRaft replica to send corrupted records to the fetching replicas in the FETCH response. This is because there is a race between when the FETCH response gets generated by the KRaft IO thread and when the network thread, or Linux kernel, reads the byte position in the log segment.
> This race can generate corrupted records if the KRaft replica performed a truncation after the FETCH response was created but before the network thread read the bytes from the log segment.
> I have seen the following errors:
> {code:java}
> [ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.KafkaException: Append failed unexpectedly
>     at kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
>     at kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
>     at org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
>     at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
>     at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
>     at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
>     at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
>     at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
>     at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
> and
> {code:java}
> [ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14){code}
> This race also exists with Kafka's ISR based topic partition. In that case the replica fetcher catches all CorruptRecordException and InvalidRecordException:
> {code:java}
> } catch {
>   case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
>     // we log the error and continue. This ensures two things
>     // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
>     //    down and cause other topic partition to also lag
>     // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
>     //    can cause this), we simply continue and should get fixed in the subsequent fetches
>     error(s"Found invalid messages during fetch for partition $topicPartition " +
>       s"offset ${currentFetchState.fetchOffset}", ime)
>     partitionsWithError += topicPartition
> {code}
> The KRaft implementation doesn't handle this case:
> {code:java}
> } else {
>     Records records = FetchResponse.recordsOrFail(partitionResponse);
>     if (records.sizeInBytes() > 0) {
>         appendAsFollower(records);
>     }
>     OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
>         OptionalLong.empty() : OptionalLong.of(partitionResponse.highWatermark());
>     updateFollowerHighWatermark(state, highWatermark);
> }{code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
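The last snippet suggests the shape of a fix: tolerate transient corruption in the KRaft fetch-response path the same way the replica fetcher does. The fragment below is a hedged sketch of that idea layered onto the quoted snippet — it is not the patch that actually shipped for this ticket, and the `logger` field is a hypothetical stand-in:

```java
// Hypothetical sketch mirroring the replica fetcher's catch block above; not
// the actual KAFKA-18723 fix. Both exceptions are unchecked KafkaExceptions.
} else {
    Records records = FetchResponse.recordsOrFail(partitionResponse);
    if (records.sizeInBytes() > 0) {
        try {
            appendAsFollower(records);
        } catch (CorruptRecordException | InvalidRecordException e) {
            // Likely a transient leader-side state (truncation racing with the
            // network send); skip the append and let a subsequent fetch retry.
            logger.info("Ignoring corrupted records in fetch response; will refetch", e);
        }
    }
    OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
        OptionalLong.empty() : OptionalLong.of(partitionResponse.highWatermark());
    updateFollowerHighWatermark(state, highWatermark);
}
```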