Re: [PR] MINOR: Remove unused system test code and avoid misleading `quorum.zk` references [kafka]

2025-02-26 Thread via GitHub


m1a2st commented on code in PR #19034:
URL: https://github.com/apache/kafka/pull/19034#discussion_r1971614754


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -1362,7 +1356,7 @@ def alter_message_format(self, topic, msg_format_version, node=None):
     def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
-        force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
+        force_use_zk_connection = force_use_zk_connection

Review Comment:
   Maybe we can remove this parameter and clean up the `if` condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID

2025-02-26 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930701#comment-17930701
 ] 

Lianet Magrans commented on KAFKA-15283:


Hi [~isding_l], I'm not working on it because the broker-side changes are not in 
place yet. Once the broker supports topic IDs on the OffsetFetch/OffsetCommit requests, 
we need to take on this one for the client-side support. That being said, feel 
free to take it if you're interested; this will probably be for 4.1, depending 
on the broker.

> Client support for OffsetFetch and OffsetCommit with topic ID
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lianet Magrans
>Priority: Critical
>  Labels: kip-848-client-support, newbie, offset
> Fix For: 4.1.0
>
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848, OffsetFetch and OffsetCommit will start using topic IDs in 
> the same way, so the new client implementation will provide them when issuing 
> those requests. Topic names should continue to be supported as needed by the 
> {{AdminClient}}.
> We should also review/clean up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce topic ID in the existing OffsetFetch and OffsetCommit APIs that will 
> be upgraded on the server to support topic ID.
>  * Check topic ID propagation internally in the client based on RPCs 
> including it.
>  * Review existing support for topic names for potential cleanup if not needed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [MINOR] Clean up coordinator-common and server modules [kafka]

2025-02-26 Thread via GitHub


sjhajharia commented on PR #19009:
URL: https://github.com/apache/kafka/pull/19009#issuecomment-2685660580

   hey @chia7712 
   gentle reminder for this PR
   Thanks!





[jira] [Updated] (KAFKA-18874) KRaft controller does not retry registration if the first attempt times out

2025-02-26 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-18874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-18874:
---
Component/s: controller

> KRaft controller does not retry registration if the first attempt times out
> ---
>
> Key: KAFKA-18874
> URL: https://issues.apache.org/jira/browse/KAFKA-18874
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: Daniel Fonai
>Priority: Minor
>
> There is a [retry mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274] 
> with exponential backoff built into KRaft controller registration. The 
> timeout of the first attempt is 5 s for KRaft controllers 
> ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448]), 
> and it is not configurable.
> If for some reason the controller's first registration request times out, the 
> attempt should be retried, but in practice this does not happen and the 
> controller is not able to join the quorum. We see the following in the faulty 
> controller's log:
> {noformat}
> 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to 
> send ControllerRegistrationRequestData(controllerId=3, 
> incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, 
> listeners=[Listener(name='CONTROLPLANE-9090', 
> host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc',
>  port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', 
> minSupportedVersion=0, maxSupportedVersion=1), 
> Feature(name='metadata.version', minSupportedVersion=1, 
> maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> ...
> 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel 
> manager timed out before sending the request. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-to-controller-registration-channel-manager]
> 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting 
> for the previous RPC to complete. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> {noformat}
> After this, we do not see any further registration retries from the controller in the log.
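One plausible reading of the log is that the in-flight marker set by the first request is never cleared when the channel manager times out, so `maybeSendControllerRegistration` keeps "waiting for the previous RPC to complete". The following is a minimal, hypothetical Java sketch (not Kafka's `ControllerRegistrationManager`; the class name, methods, and backoff values are assumptions) of a registration loop whose timeout path clears that marker and records an exponential-backoff delay before the next attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a registration loop; not Kafka's ControllerRegistrationManager.
class RegistrationSketch {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private boolean pendingRpc = false; // true while a registration request is in flight
    private boolean registered = false;
    private int failedAttempts = 0;
    // Delay (ms) chosen before each retry, kept so callers can inspect the schedule.
    final List<Long> retryDelaysMs = new ArrayList<>();

    RegistrationSketch(long initialBackoffMs, long maxBackoffMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    // Returns true if a request was sent, false if one is in flight or we are already registered.
    boolean maybeSendRegistration() {
        if (registered || pendingRpc) {
            return false; // "waiting for the previous RPC to complete"
        }
        pendingRpc = true;
        return true;
    }

    // Timeout path: the in-flight marker must be cleared, otherwise no retry can ever be sent.
    void onTimeout() {
        pendingRpc = false;
        failedAttempts++;
        retryDelaysMs.add(backoffMs(failedAttempts));
    }

    void onSuccess() {
        pendingRpc = false;
        registered = true;
        failedAttempts = 0;
    }

    boolean isRegistered() {
        return registered;
    }

    // Exponential backoff without jitter: initial * 2^(attempts - 1), capped at maxBackoffMs.
    long backoffMs(int attempts) {
        long delay = initialBackoffMs;
        for (int i = 1; i < attempts && delay < maxBackoffMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxBackoffMs);
    }

    public static void main(String[] args) {
        RegistrationSketch manager = new RegistrationSketch(100, 1000);
        manager.maybeSendRegistration();                    // first attempt goes out
        manager.onTimeout();                                // channel manager times out
        boolean retried = manager.maybeSendRegistration(); // a retry is allowed again
        manager.onSuccess();
        System.out.println(retried + " " + manager.isRegistered() + " " + manager.retryDelaysMs); // prints "true true [100]"
    }
}
```

A real implementation would also schedule the retry on a timer and add jitter; Kafka's `org.apache.kafka.common.utils.ExponentialBackoff` is one existing helper for computing such delays.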





Re: [PR] MINOR: Remove old message format documentation [kafka]

2025-02-26 Thread via GitHub


ijuma merged PR #19033:
URL: https://github.com/apache/kafka/pull/19033





Re: [PR] MINOR: Remove old message format documentation [kafka]

2025-02-26 Thread via GitHub


ijuma commented on PR #19033:
URL: https://github.com/apache/kafka/pull/19033#issuecomment-2685752393

   Cherry-picked to 4.0 as well.





Re: [PR] KAFKA-17431: Support invalid static configs for KRaft so long as dynamic configs are valid [kafka]

2025-02-26 Thread via GitHub


cmccabe commented on code in PR #18949:
URL: https://github.com/apache/kafka/pull/18949#discussion_r1972230877


##
core/src/main/scala/kafka/raft/RaftManager.scala:
##
@@ -104,6 +104,8 @@ trait RaftManager[T] {
   def replicatedLog: ReplicatedLog
 
   def voterNode(id: Int, listener: ListenerName): Option[Node]
+
+  def getRecordSerde: RecordSerde[T]

Review Comment:
   In Kafka, we generally don't prefix getters with `get`, so this method should just be `recordSerde`.






[PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API… [kafka]

2025-02-26 Thread via GitHub


dongnuo123 opened a new pull request, #19042:
URL: https://github.com/apache/kafka/pull/19042

   … must check topic describe (#18989)
   
   This patch filters out topics that are not authorized for Describe from the ConsumerGroupHeartbeat and ConsumerGroupDescribe responses.
   
   In ConsumerGroupHeartbeat:
   - If the request has `subscribedTopicNames` set, we directly check the authz in `KafkaApis` and return a topic auth failure in the response if any of the topics is denied.
   - Otherwise, we check the authz only if a regex refresh is triggered, and we do it based on the ACLs of the consumer that triggered the refresh. If any of the topics is denied, we filter it out from the resolved subscription.
   
   In ConsumerGroupDescribe, we check the authz of the coordinator response. If any of the topics in the group is denied, we remove the described info and add a topic auth failure to the described group (similar to the group auth failure).
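The two heartbeat paths described above can be sketched as follows. This is a hypothetical Java illustration, not the code in `KafkaApis` or the group coordinator: the `Predicate<String>` standing in for the Describe ACL check, the topic names, and the method names are assumptions. The regex path drops only the denied topics, whereas the explicit path turns any denial into a topic authorization failure for the request.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical sketch of the two ConsumerGroupHeartbeat authorization paths.
// A Predicate<String> stands in for the topic Describe ACL check.
class HeartbeatTopicAuthzSketch {

    // Explicit subscribedTopicNames: the request fails if any listed topic is denied.
    static boolean explicitSubscriptionAllowed(List<String> subscribedTopicNames,
                                               Predicate<String> canDescribe) {
        return subscribedTopicNames.stream().allMatch(canDescribe);
    }

    // Regex subscription: on refresh, denied topics are dropped from the resolved subscription.
    static Set<String> resolveRegex(Pattern regex, Set<String> allTopics, Predicate<String> canDescribe) {
        Set<String> resolved = new LinkedHashSet<>();
        for (String topic : allTopics) {
            if (regex.matcher(topic).matches() && canDescribe.test(topic)) {
                resolved.add(topic);
            }
        }
        return resolved;
    }

    public static void main(String[] args) {
        Predicate<String> canDescribe = topic -> !topic.startsWith("secret-");
        System.out.println(explicitSubscriptionAllowed(List.of("orders", "secret-pay"), canDescribe)); // false
        Set<String> allTopics = new LinkedHashSet<>(List.of("orders-eu", "orders-us", "secret-orders", "payments"));
        System.out.println(resolveRegex(Pattern.compile(".*orders.*"), allTopics, canDescribe)); // [orders-eu, orders-us]
    }
}
```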
   
   Reviewers: David Jacot, Lianet Magrans, Rajini Sivaram, Chia-Ping Tsai, TaiJuWu, TengYao Chi
   
   (cherry picked from commit 36f19057e1d57a8548a4548c304799fd176c359f)
   
   





Re: [PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API… [kafka]

2025-02-26 Thread via GitHub


dongnuo123 commented on code in PR #19042:
URL: https://github.com/apache/kafka/pull/19042#discussion_r1972238070


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##
@@ -158,29 +160,28 @@ public Builder withGroupConfigManager(GroupConfigManager groupConfigManager) {
             return this;
         }
 
+        public Builder withAuthorizer(Optional authorizer) {
+            this.authorizer = authorizer;
+            return this;
+        }
+
         public GroupCoordinatorService build() {
-            if (config == null)
-                throw new IllegalArgumentException("Config must be set.");
-            if (writer == null)
-                throw new IllegalArgumentException("Writer must be set.");
-            if (loader == null)
-                throw new IllegalArgumentException("Loader must be set.");
-            if (time == null)
-                throw new IllegalArgumentException("Time must be set.");
-            if (timer == null)
-                throw new IllegalArgumentException("Timer must be set.");
-            if (coordinatorRuntimeMetrics == null)
-                throw new IllegalArgumentException("CoordinatorRuntimeMetrics must be set.");
-            if (groupCoordinatorMetrics == null)
-                throw new IllegalArgumentException("GroupCoordinatorMetrics must be set.");
-            if (groupConfigManager == null)
-                throw new IllegalArgumentException("GroupConfigManager must be set.");
+            requireNonNull(config, new IllegalArgumentException("Config must be set."));
+            requireNonNull(writer, new IllegalArgumentException("Writer must be set."));
+            requireNonNull(loader, new IllegalArgumentException("Loader must be set."));
+            requireNonNull(time, new IllegalArgumentException("Time must be set."));
+            requireNonNull(timer, new IllegalArgumentException("Timer must be set."));
+            requireNonNull(coordinatorRuntimeMetrics, new IllegalArgumentException("CoordinatorRuntimeMetrics must be set."));
+            requireNonNull(groupCoordinatorMetrics, new IllegalArgumentException("GroupCoordinatorMetrics must be set."));
+            requireNonNull(groupConfigManager, new IllegalArgumentException("GroupConfigManager must be set."));
+            requireNonNull(authorizer, new IllegalArgumentException("Authorizer must be set."));

Review Comment:
   Bringing in `requireNonNull` in advance because of a checkstyle error.
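`java.util.Objects.requireNonNull` only accepts a message (or a message supplier) and always throws `NullPointerException`, so calls like the ones in the diff, which pass an `IllegalArgumentException` instance, presumably rely on a separate helper. A hypothetical version of such a helper might look like this; the class name and signature are assumptions, not taken from the PR.

```java
// Hypothetical helper; the class name and signature are assumptions, not taken from the PR.
final class BuilderChecks {
    private BuilderChecks() { }

    // Returns obj if it is non-null; otherwise throws the exception supplied by the caller.
    static <T> T requireNonNull(T obj, RuntimeException exception) {
        if (obj == null) {
            throw exception;
        }
        return obj;
    }

    public static void main(String[] args) {
        String config = "config";
        System.out.println(requireNonNull(config, new IllegalArgumentException("Config must be set.")));
        try {
            requireNonNull(null, new IllegalArgumentException("Writer must be set."));
        } catch (IllegalArgumentException e) {
            System.out.println("caught: " + e.getMessage()); // caught: Writer must be set.
        }
    }
}
```

One trade-off of this shape is that the exception object is allocated on every call, even when the check passes; accepting a `Supplier<? extends RuntimeException>` instead would defer that cost. For a builder's `build()` method that runs once, it hardly matters.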






Re: [PR] KAFKA-15900, KAFKA-18310: fix flaky test testOutdatedCoordinatorAssignment and AbstractCoordinatorTest [kafka]

2025-02-26 Thread via GitHub


lianetm commented on code in PR #18945:
URL: https://github.com/apache/kafka/pull/18945#discussion_r1971704382


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -165,16 +165,18 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time) {
-        this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty());
+        this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, time, Optional.empty(), Optional.empty());
     }
 
+    @SuppressWarnings("this-escape")

Review Comment:
   I guess we could avoid suppressing here if we change how we create the heartbeat thread (`HeartbeatThread::new`) in this constructor? (e.g. we could create it lazily in `startHeartbeatThreadIfNeeded`, and just keep an Optional ref to the supplier here)
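The lazy-creation suggestion can be sketched like this. It is a hypothetical, simplified Java illustration: `CoordinatorSketch` and its fields are invented for the example, and a plain `Thread` stands in for `HeartbeatThread`. The constructor only stores an `Optional` supplier, and the thread is built the first time `startHeartbeatThreadIfNeeded()` runs, after construction has finished, which is the kind of change that could make the `this-escape` suppression unnecessary.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical simplification of the lazy-creation idea; not the real AbstractCoordinator.
// A plain Thread stands in for HeartbeatThread.
class CoordinatorSketch {
    private final Optional<Supplier<Thread>> heartbeatThreadSupplier;
    private Thread heartbeatThread = null; // built on first start, never in the constructor

    CoordinatorSketch(Optional<Supplier<Thread>> heartbeatThreadSupplier) {
        // Only the supplier is stored; no thread object is created during construction.
        this.heartbeatThreadSupplier = heartbeatThreadSupplier;
    }

    synchronized void startHeartbeatThreadIfNeeded() {
        if (heartbeatThread == null) {
            Supplier<Thread> supplier = heartbeatThreadSupplier
                .orElse(() -> new Thread(() -> { }, "default-heartbeat"));
            heartbeatThread = supplier.get();
            heartbeatThread.setDaemon(true);
            heartbeatThread.start();
        }
    }

    synchronized boolean hasHeartbeatThread() {
        return heartbeatThread != null;
    }

    public static void main(String[] args) {
        CoordinatorSketch coordinator = new CoordinatorSketch(Optional.empty());
        System.out.println(coordinator.hasHeartbeatThread()); // false: nothing built yet
        coordinator.startHeartbeatThreadIfNeeded();
        coordinator.startHeartbeatThreadIfNeeded(); // second call is a no-op
        System.out.println(coordinator.hasHeartbeatThread()); // true
    }
}
```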



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThread.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.KafkaThread;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base class for heartbeat threads. This class provides a mechanism to enable/disable the heartbeat thread.
+ * The heartbeat thread should check whether it's enabled by calling {@link BaseHeartbeatThread#isEnabled()}
+ * before sending heartbeat requests.
+ */
+public class BaseHeartbeatThread extends KafkaThread implements AutoCloseable {

Review Comment:
   Should we add a basic unit test for this class? (A simple test just to make sure that all actions flip the right flag, e.g. `isEnabled()` is true after calling `enable()`, etc.)
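A test along the lines suggested might look like the following. Because only part of the `BaseHeartbeatThread` API is visible in the diff (the Javadoc names `isEnabled()`), this is a self-contained, hypothetical stand-in: `enable()`, `disable()`, `isClosed()`, and the behavior of `close()` are assumptions, and the checks are plain assertions rather than JUnit.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for BaseHeartbeatThread's flag handling (no thread is started here).
// Only isEnabled() is named in the diff; enable(), disable(), isClosed() and close() are assumptions.
class HeartbeatFlags implements AutoCloseable {
    private final AtomicBoolean enabled = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void enable() {
        enabled.set(true);
    }

    void disable() {
        enabled.set(false);
    }

    boolean isEnabled() {
        return enabled.get();
    }

    boolean isClosed() {
        return closed.get();
    }

    @Override
    public void close() {
        closed.set(true);
        enabled.set(false); // assumption: a closed thread should stop heartbeating
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    // Plain-Java version of the suggested test; a real one would use JUnit's assertTrue/assertFalse.
    public static void main(String[] args) {
        HeartbeatFlags flags = new HeartbeatFlags();
        check(!flags.isEnabled(), "disabled by default");
        flags.enable();
        check(flags.isEnabled(), "enable() sets the flag");
        flags.disable();
        check(!flags.isEnabled(), "disable() clears the flag");
        flags.enable();
        flags.close();
        check(flags.isClosed() && !flags.isEnabled(), "close() marks closed and disables");
        System.out.println("all flag checks passed");
    }
}
```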






Re: [PR] WIP Investigate OOM [kafka]

2025-02-26 Thread via GitHub


mumrah commented on PR #19031:
URL: https://github.com/apache/kafka/pull/19031#issuecomment-2686054397

   Seems to have reproduced here: 
https://github.com/apache/kafka/actions/runs/13550598471/job/37873138268?pr=19031
   
   No activity for a while after
   ```
   Wed, 26 Feb 2025 18:35:41 GMT > Task :streams:test-utils:copyDependantLibs
   Wed, 26 Feb 2025 18:52:51 GMT > Task :streams:test-utils:jar
   Wed, 26 Feb 2025 18:55:06 GMT > Task :connect:runtime:compileJava
   ```
   
   This suggests that `-XX:-UseGCOverheadLimit` is working as expected. However, it also suggests that there is a real memory leak or something similar. This run included a larger heap of 3 GB.





Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]

2025-02-26 Thread via GitHub


Rancho-7 commented on code in PR #19039:
URL: https://github.com/apache/kafka/pull/19039#discussion_r1971674751


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");

Review Comment:
   same.
   
https://github.com/apache/kafka/blob/4b5a16bf6ffcb4322d7c5b35a484ff914397909c/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java#L522






Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]

2025-02-26 Thread via GitHub


Rancho-7 commented on code in PR #19039:
URL: https://github.com/apache/kafka/pull/19039#discussion_r1971671660


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();

Review Comment:
   Thanks for the review! I have a question: I noticed we use Java's `UUID` in some other tests in this file. Should we keep the same approach or not?
   
https://github.com/apache/kafka/blob/4b5a16bf6ffcb4322d7c5b35a484ff914397909c/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java#L544






Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]

2025-02-26 Thread via GitHub


Rancho-7 commented on code in PR #19039:
URL: https://github.com/apache/kafka/pull/19039#discussion_r1971676812


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.file.param.testId", id);
+        props.put("test.key", "${file:/path:key}");
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file");
+        assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap());
+        assertEquals("testKey", config.originals().get("test.key"));
+        MockFileConfigProvider.assertClosed(id);
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
+            MockFileConfigProvider.class.getName() + "," +
+                "org.apache.kafka.common.config.provider.EnvVarConfigProvider");

Review Comment:
   Thanks for pointing that out! I will fix it.






Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]

2025-02-26 Thread via GitHub


Rancho-7 commented on code in PR #19039:
URL: https://github.com/apache/kafka/pull/19039#discussion_r1971676812


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
         MockFileConfigProvider.assertClosed(id);
     }
 
+    @Test
+    public void testAutomaticConfigProvidersWithFullClassName() {
+        Properties props = new Properties();
+        props.put("config.providers", "file");
+        props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+        String id = UUID.randomUUID().toString();
+        props.put("config.providers.file.param.testId", id);
+        props.put("test.key", "${file:/path:key}");
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file");
+        assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());
+        TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap());
+        assertEquals("testKey", config.originals().get("test.key"));
+        MockFileConfigProvider.assertClosed(id);
+
+        System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
+            MockFileConfigProvider.class.getName() + "," +
+                "org.apache.kafka.common.config.provider.EnvVarConfigProvider");

Review Comment:
   Thanks for pointing that out! Fixed it.






Re: [PR] MINOR: Disallow unused local variables [kafka]

2025-02-26 Thread via GitHub


mumrah commented on code in PR #18963:
URL: https://github.com/apache/kafka/pull/18963#discussion_r1971693097


##
tools/src/main/java/org/apache/kafka/tools/ShareConsumerPerformance.java:
##
@@ -79,7 +79,8 @@ public static void main(String[] args) {
                 shareConsumers.forEach(shareConsumer -> shareConsumersMetrics.add(shareConsumer.metrics()));
             }
             shareConsumers.forEach(shareConsumer -> {
-                Map> val = shareConsumer.commitSync();
+                @SuppressWarnings("UnusedLocalVariable")
+                Map> ignored = shareConsumer.commitSync();

Review Comment:
   I completely missed that this was in `tools`. I agree that the approach here (the explicit suppression) is better.






Re: [PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]

2025-02-26 Thread via GitHub


dongnuo123 commented on code in PR #18989:
URL: https://github.com/apache/kafka/pull/18989#discussion_r1971754326


##
core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala:
##
@@ -561,5 +568,17 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       case _: TopicAuthorizationException => consumeRecords(consumer, numRecords, startingOffset, topic)
     }
   }
+
+  // Consume records, ignoring TopicAuthorization exception from previously sent request.
+  private def consumeRecordsIgnoreAuthorizationException(consumer: Consumer[Array[Byte], Array[Byte]],
+                                                         numRecords: Int = 1,
+                                                         startingOffset: Int = 0,
+                                                         topic: String = topic): Unit = {
+    try {
+      consumeRecords(consumer, numRecords, startingOffset, topic)
+    } catch {
+      case _: TopicAuthorizationException => consumeRecordsIgnoreAuthorizationException(consumer, numRecords, startingOffset, topic)

Review Comment:
   Yes, agreed. This is simpler.
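For comparison, the same idea (keep consuming until the stale authorization error from an earlier request stops surfacing) can be written as a bounded loop, so a persistent error fails fast instead of being retried indefinitely. This hypothetical Java sketch is not the Scala helper from the PR: `TopicAuthorizationError` is a local stand-in for Kafka's `TopicAuthorizationException`, and the attempt limit is an assumption.

```java
import java.util.function.Supplier;

// Hypothetical sketch: a bounded retry loop as an alternative to the recursive helper.
class RetryIgnoringAuthz {

    // Local stand-in for org.apache.kafka.common.errors.TopicAuthorizationException.
    static class TopicAuthorizationError extends RuntimeException {
        TopicAuthorizationError(String message) {
            super(message);
        }
    }

    // Runs action until it succeeds, ignoring TopicAuthorizationError, for at most maxAttempts tries.
    static <T> T retryIgnoringAuthorizationError(Supplier<T> action, int maxAttempts) {
        TopicAuthorizationError last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (TopicAuthorizationError e) {
                last = e; // stale error from a previously sent request; try again
            }
        }
        throw new IllegalStateException("still unauthorized after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retryIgnoringAuthorizationError(() -> {
            if (++calls[0] < 3) {
                throw new TopicAuthorizationError("stale");
            }
            return "consumed";
        }, 5);
        System.out.println(result + " after " + calls[0] + " calls"); // consumed after 3 calls
    }
}
```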






[jira] [Commented] (KAFKA-18329) Delete old group coordinator (KIP-848)

2025-02-26 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930718#comment-17930718
 ] 

Ismael Juma commented on KAFKA-18329:
-

Why 4.2.0? What's stopping us from doing it now?

> Delete old group coordinator (KIP-848)
> --
>
> Key: KAFKA-18329
> URL: https://issues.apache.org/jira/browse/KAFKA-18329
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 4.2.0
>
>
> In 4.2.0, we should be able to delete the old group coordinator.





Re: [PR] MINOR: Remove unused system test code and avoid misleading `quorum.zk` references [kafka]

2025-02-26 Thread via GitHub


ijuma commented on code in PR #19034:
URL: https://github.com/apache/kafka/pull/19034#discussion_r1971797966


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -1362,7 +1356,7 @@ def alter_message_format(self, topic, msg_format_version, node=None):
     def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
-        force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
+        force_use_zk_connection = force_use_zk_connection

Review Comment:
   You are right, `force_use_zk_connection` is always `false` for this method. 
Pushed a commit that removes that.






[PR] KAFKA-18858: Refactor FeatureControlManager to avoid using uninitialized MV [kafka]

2025-02-26 Thread via GitHub


FrankYang0529 opened a new pull request, #19040:
URL: https://github.com/apache/kafka/pull/19040

   The `FeatureControlManager` used `MetadataVersion#LATEST_PRODUCTION`
   as the uninitialized MV, which means other components may get a stale MV.
   In production code, the `FeatureControlManager` sets the MV when replaying
   `FeatureLevelRecord`, so we can use `Optional.empty()` as the uninitialized MV.
   If other components get an empty result, an exception is thrown, as
   `FeaturesImage` does.
   
   Unit test:
   * FeatureControlManagerTest#testMetadataVersion: tests getting the MetadataVersion
     before and after replaying a FeatureLevelRecord.
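The approach in this description, treating the MV as absent until a `FeatureLevelRecord` has been replayed, can be illustrated with a small hypothetical Java sketch. It does not use Kafka's `FeatureControlManager` or `MetadataVersion`; the class, its methods, the plain `short` level, and the exception type are assumptions.

```java
import java.util.Optional;

// Hypothetical stand-in; the real FeatureControlManager and MetadataVersion types are not used.
// The metadata version is modeled as a plain short feature level.
class FeatureStateSketch {
    private Optional<Short> metadataVersionLevel = Optional.empty(); // unknown until replayed

    // Called when a FeatureLevelRecord for metadata.version is replayed.
    void replayMetadataVersion(short level) {
        metadataVersionLevel = Optional.of(level);
    }

    Optional<Short> metadataVersion() {
        return metadataVersionLevel;
    }

    // Callers that need a value fail fast instead of silently seeing a default.
    short metadataVersionOrThrow() {
        return metadataVersionLevel.orElseThrow(
            () -> new IllegalStateException("metadata.version has not been replayed yet"));
    }

    public static void main(String[] args) {
        FeatureStateSketch state = new FeatureStateSketch();
        System.out.println(state.metadataVersion().isPresent()); // false
        state.replayMetadataVersion((short) 10);
        System.out.println(state.metadataVersionOrThrow()); // 10
    }
}
```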





Re: [PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]

2025-02-26 Thread via GitHub


lianetm commented on code in PR #18989:
URL: https://github.com/apache/kafka/pull/18989#discussion_r1971805902


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2592,6 +2608,38 @@ class KafkaApis(val requestChannel: RequestChannel,
     response.groups.addAll(results)
   }
 
+  // Clients are not allowed to see topics that are not authorized for Describe.
+  val topicsToCheck = authorizer match {
+    case Some(_) =>
+      response.groups.stream()
+        .flatMap(group => group.members.stream)
+        .flatMap(member => util.stream.Stream.of(member.assignment, member.targetAssignment))
+        .flatMap(assignment => assignment.topicPartitions.stream)
+        .map(topicPartition => topicPartition.topicName)
+        .collect(Collectors.toSet[String])
+        .asScala
+    case None => Set.empty[String]
+  }
+  val authorizedTopics = authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC,
+    topicsToCheck)(identity)
+  val updatedGroups = response.groups.stream().map { group =>

Review Comment:
   This is also only needed if there is an authorizer, right? Moreover, written like this, if there is no authorizer, don't we end up with an empty `authorizedTopics`, iterate here, and end up with hasUnauthorizedTopic=true?
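The concern can be reproduced with a small hypothetical Java sketch (not the Scala code in `KafkaApis`; the `Predicate<String>` standing in for the authorizer and both method names are assumptions). If the set of authorized topics defaults to empty when no authorizer is configured, every topic in the group counts as unauthorized; guarding the whole check on the authorizer's presence avoids that.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical illustration of the review concern; a Predicate stands in for the authorizer.
class DescribeFilterSketch {

    // Problematic shape: with no authorizer, authorizedTopics is empty,
    // so every topic in the group looks unauthorized.
    static boolean hasUnauthorizedTopicUnguarded(List<String> groupTopics,
                                                 Optional<Predicate<String>> authorizer) {
        Set<String> authorizedTopics = authorizer
            .map(canDescribe -> groupTopics.stream().filter(canDescribe).collect(Collectors.toSet()))
            .orElse(Set.of());
        return groupTopics.stream().anyMatch(topic -> !authorizedTopics.contains(topic));
    }

    // Guarded shape: only check topics when an authorizer is configured.
    static boolean hasUnauthorizedTopic(List<String> groupTopics,
                                        Optional<Predicate<String>> authorizer) {
        return authorizer
            .map(canDescribe -> groupTopics.stream().anyMatch(canDescribe.negate()))
            .orElse(false);
    }

    public static void main(String[] args) {
        List<String> topics = List.of("orders", "payments");
        System.out.println(hasUnauthorizedTopicUnguarded(topics, Optional.empty())); // true
        System.out.println(hasUnauthorizedTopic(topics, Optional.empty()));          // false
    }
}
```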






Re: [PR] MINOR: Remove unused system test code and avoid misleading `quorum.zk` references [kafka]

2025-02-26 Thread via GitHub


m1a2st commented on code in PR #19034:
URL: https://github.com/apache/kafka/pull/19034#discussion_r1971599561


##
tests/kafkatest/services/kafka/kafka.py:
##
@@ -1362,7 +1356,7 @@ def alter_message_format(self, topic, msg_format_version, node=None):
     def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller")
-        force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server
+        force_use_zk_connection = force_use_zk_connection

Review Comment:
   Should we remove this line?






Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]

2025-02-26 Thread via GitHub


frankvicky commented on code in PR #19039:
URL: https://github.com/apache/kafka/pull/19039#discussion_r1971633273


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
 MockFileConfigProvider.assertClosed(id);
 }
 
+@Test
+public void testAutomaticConfigProvidersWithFullClassName() {
+Properties props = new Properties();
+props.put("config.providers", "file");
+props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+String id = UUID.randomUUID().toString();
+props.put("config.providers.file.param.testId", id);
+props.put("test.key", "${file:/path:key}");
+
+System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file");
+assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(props, Collections.emptyMap()));
+
+System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());
+TestIndirectConfigResolution config = new TestIndirectConfigResolution(props, Collections.emptyMap());
+assertEquals("testKey", config.originals().get("test.key"));
+MockFileConfigProvider.assertClosed(id);
+
+System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
+MockFileConfigProvider.class.getName() + "," +
+"org.apache.kafka.common.config.provider.EnvVarConfigProvider");

Review Comment:
   `EnvVarConfigProvider.class.getName()`



##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
 MockFileConfigProvider.assertClosed(id);
 }
 
+@Test
+public void testAutomaticConfigProvidersWithFullClassName() {
+Properties props = new Properties();
+props.put("config.providers", "file");
+props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+String id = UUID.randomUUID().toString();

Review Comment:
   We could use the UUID class defined by Kafka.



##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
 MockFileConfigProvider.assertClosed(id);
 }
 
+@Test
+public void testAutomaticConfigProvidersWithFullClassName() {
+Properties props = new Properties();
+props.put("config.providers", "file");

Review Comment:
   We could use `AbstractConfig.CONFIG_PROVIDERS_CONFIG` instead of plaintext.






Re: [PR] KAFKA-18868: add the "default value" explanation to the docs of num.replica.alter.log.dirs.threads [kafka]

2025-02-26 Thread via GitHub


m1a2st commented on code in PR #19038:
URL: https://github.com/apache/kafka/pull/19038#discussion_r1971566712


##
server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java:
##
@@ -52,7 +52,8 @@ public class ServerConfigs {
 public static final String BACKGROUND_THREADS_DOC = "The number of threads to use for various background processing tasks";
 
 public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG = "num.replica.alter.log.dirs.threads";
-public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O";
+public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O." +

Review Comment:
   Please add a space at the end of the sentence. 
   
   ![CleanShot 2025-02-26 at 21 19 10](https://github.com/user-attachments/assets/493782b1-358f-4677-a79d-25492ddaf9e5)
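A small Java sketch of why the trailing space matters: string concatenation inserts nothing between the literals, so the two sentences render as "I/O.The". `LOG_DIRS_CONFIG` here is a stand-in for `ServerLogConfigs.LOG_DIRS_CONFIG`, with its value assumed to be "log.dirs".

```java
class DocStringSpacingSketch {
    // Stand-in for ServerLogConfigs.LOG_DIRS_CONFIG; the value "log.dirs" is assumed.
    static final String LOG_DIRS_CONFIG = "log.dirs";

    // As in the diff: the first literal ends with "I/O." and no space.
    static final String AS_WRITTEN =
        "The number of threads that can move replicas between log directories, which may include disk I/O." +
        "The default value is equal to the number of directories specified in the " + LOG_DIRS_CONFIG + " configuration property.";

    // With a trailing space added to the first literal.
    static final String FIXED =
        "The number of threads that can move replicas between log directories, which may include disk I/O. " +
        "The default value is equal to the number of directories specified in the " + LOG_DIRS_CONFIG + " configuration property.";

    public static void main(String[] args) {
        System.out.println(AS_WRITTEN.contains("I/O.The")); // true: sentences run together
        System.out.println(FIXED.contains("I/O. The"));     // true
    }
}
```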
   



##
server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java:
##
@@ -52,7 +52,8 @@ public class ServerConfigs {
 public static final String BACKGROUND_THREADS_DOC = "The number of threads to use for various background processing tasks";
 
 public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG = "num.replica.alter.log.dirs.threads";
-public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O";
+public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O." +
+"The default value is equal to the number of directories specified in the " + ServerLogConfigs.LOG_DIRS_CONFIG + " configuration property.";

Review Comment:
   We should add a code block for `ServerLogConfigs.LOG_DIRS_CONFIG`.
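One possible reading of this suggestion, sketched in Java under the assumption that the doc string is rendered as HTML (other Kafka `*_DOC` strings mark config names with `<code>` tags): wrap the referenced config name in `<code>...</code>`. The constant value "log.dirs" is assumed, and this version also includes the trailing space after "I/O.".

```java
class DocCodeTagSketch {
    // Stand-in for ServerLogConfigs.LOG_DIRS_CONFIG; the value "log.dirs" is assumed.
    static final String LOG_DIRS_CONFIG = "log.dirs";

    // The config name is wrapped in <code>...</code> so the generated HTML docs render it as code.
    static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC =
        "The number of threads that can move replicas between log directories, which may include disk I/O. " +
        "The default value is equal to the number of directories specified in the <code>" +
        LOG_DIRS_CONFIG + "</code> configuration property.";

    public static void main(String[] args) {
        System.out.println(NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC);
    }
}
```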






Re: [PR] KAFKA-17607: Add CI step to verify LICENSE-binary [kafka]

2025-02-26 Thread via GitHub


m1a2st commented on code in PR #18299:
URL: https://github.com/apache/kafka/pull/18299#discussion_r1971829502


##
.github/workflows/build.yml:
##
@@ -127,6 +127,9 @@ jobs:
   gradle-cache-read-only: ${{ !inputs.is-trunk }}
   gradle-cache-write-only: ${{ inputs.is-trunk }}
   develocity-access-key: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+  - name: Verify license file
+if: always()

Review Comment:
   `always()` means this step runs even if a previous step failed. I don't think this step needs it. WDYT?






Re: [PR] KAFKA-18614, KAFKA-18613: Add streams group request plumbing [kafka]

2025-02-26 Thread via GitHub


lucasbru merged PR #18979:
URL: https://github.com/apache/kafka/pull/18979





Re: [PR] MINOR: Remove old message format documentation [kafka]

2025-02-26 Thread via GitHub


ijuma commented on code in PR #19033:
URL: https://github.com/apache/kafka/pull/19033#discussion_r1971831709


##
docs/implementation.html:
##
@@ -103,67 +103,8 @@ 5.3.3 Old Message Format
 
-Prior to Kafka 0.11, messages were transferred and stored in message sets. In a message set, each message has its own metadata. Note that although message sets are represented as an array,
-they are not preceded by an int32 array size like other array elements in the protocol.
+Prior to Kafka 0.11, messages were transferred and stored in message sets. See <a href="https://kafka.apache.org/39/documentation/#messageset">Old Message Format</a> for more details.

Review Comment:
   Right, the KIP covers the edge cases in detail and is linked from the 
upgrade notes.






[jira] [Updated] (KAFKA-18651) Core streams-specific broker configurations

2025-02-26 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-18651:
---
Fix Version/s: 4.1

> Core streams-specific broker configurations
> ---
>
> Key: KAFKA-18651
> URL: https://issues.apache.org/jira/browse/KAFKA-18651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 4.1
>
>
> We need to add the following core subset of broker configurations (some may 
> already be there):
> {{Broker configs:}}
> {{group.coordinator.rebalance.protocols – add value streams to enable streams 
> rebalance protocol}}
> {{group.streams.session.timeout.ms}}
> {{group.streams.heartbeat.interval.ms}}
> {{group.streams.num.standby.replicas}}
> {{group.streams.max.size}}
>  
> We need to make sure that `num.standby.replicas` is passed with 
> `assignmentConfigs` to the assignor.
> As defined in the KIP.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-18651) Core streams-specific broker configurations

2025-02-26 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy updated KAFKA-18651:
---
Fix Version/s: 4.1.0
   (was: 4.1)

> Core streams-specific broker configurations
> ---
>
> Key: KAFKA-18651
> URL: https://issues.apache.org/jira/browse/KAFKA-18651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Lucas Brutschy
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 4.1.0
>
>
> We need to add the following core subset of broker configurations (some may 
> already be there):
> {{Broker configs:}}
> {{group.coordinator.rebalance.protocols – add value streams to enable streams 
> rebalance protocol}}
> {{group.streams.session.timeout.ms}}
> {{group.streams.heartbeat.interval.ms}}
> {{group.streams.num.standby.replicas}}
> {{group.streams.max.size}}
>  
> We need to make sure that `num.standby.replicas` is passed with 
> `assignmentConfigs` to the assignor.
> As defined in the KIP.





[PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]

2025-02-26 Thread via GitHub


Rancho-7 opened a new pull request, #19039:
URL: https://github.com/apache/kafka/pull/19039

   jira: https://issues.apache.org/jira/browse/KAFKA-18850
   
   `org.apache.kafka.automatic.config.providers = env` is incorrect. According 
to the source code, the correct value should be the [class 
name](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L616).
 
   
   This PR fixes the doc and adds a unit test to verify the behavior.
   
   Screenshot: https://github.com/user-attachments/assets/f3f0f19d-8ef5-4950-85e5-3c7c868447a9
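A minimal Java sketch of what the corrected documentation implies: the `org.apache.kafka.automatic.config.providers` system property holds comma-separated, fully-qualified `ConfigProvider` class names rather than aliases such as `env`. The parsing helper is hypothetical and only approximates how `AbstractConfig` reads the property; it does not model Kafka's behavior when the property is unset.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class AutomaticConfigProvidersSketch {
    // Name of the JVM system property (AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY).
    static final String PROPERTY = "org.apache.kafka.automatic.config.providers";

    // Hypothetical helper: splits the comma-separated value into trimmed, non-empty class names.
    // An unset or blank property yields an empty list here.
    static List<String> configuredProviderClasses() {
        String value = System.getProperty(PROPERTY);
        if (value == null || value.isBlank()) {
            return List.of();
        }
        return Arrays.stream(value.split(","))
            .map(String::trim)
            .filter(name -> !name.isEmpty())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Fully-qualified class names, not aliases such as "env" or "file".
        System.setProperty(PROPERTY,
            "org.apache.kafka.common.config.provider.FileConfigProvider, "
                + "org.apache.kafka.common.config.provider.EnvVarConfigProvider");
        System.out.println(configuredProviderClasses());
    }
}
```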
   





Re: [PR] MINOR: Add README.md test command [kafka]

2025-02-26 Thread via GitHub


m1a2st commented on code in PR #19025:
URL: https://github.com/apache/kafka/pull/19025#discussion_r1971727446


##
README.md:
##
@@ -75,6 +76,7 @@ Retries are disabled by default, but you can set maxTestRetryFailures and maxTes
 The following example declares -PmaxTestRetries=1 and -PmaxTestRetryFailures=3 to enable a failed test to be retried once, with a total retry limit of 3.
 
 ./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=3
+./gradlew test -Pkafka.test.run.flaky=true -PmaxTestRetries=1 -PmaxTestRetryFailures=3

Review Comment:
   I don't think we need to add this command to this section; it focuses on `Specifying test retries`.






Re: [PR] MINOR: Add README.md test command [kafka]

2025-02-26 Thread via GitHub


m1a2st commented on code in PR #19025:
URL: https://github.com/apache/kafka/pull/19025#discussion_r1971727446


##
README.md:
##
@@ -75,6 +76,7 @@ Retries are disabled by default, but you can set maxTestRetryFailures and maxTes
 The following example declares -PmaxTestRetries=1 and -PmaxTestRetryFailures=3 to enable a failed test to be retried once, with a total retry limit of 3.
 
 ./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=3
+./gradlew test -Pkafka.test.run.flaky=true -PmaxTestRetries=1 -PmaxTestRetryFailures=3

Review Comment:
   I don't think we need to add this command to this section; it focuses on "Specifying test retries", and an example is already provided.






Re: [PR] KAFKA-16580: Enable dynamic quorum reconfiguration for raft simulation tests pt 1 [kafka]

2025-02-26 Thread via GitHub


kevin-wu24 commented on code in PR #18987:
URL: https://github.com/apache/kafka/pull/18987#discussion_r1971722138


##
raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java:
##
@@ -1127,14 +1331,75 @@ private MajorityReachedHighWatermark(Cluster cluster) {
 
 @Override
 public void verify() {
-cluster.leaderHighWatermark().ifPresent(highWatermark -> {
-long numReachedHighWatermark = cluster.nodes.entrySet().stream()
-.filter(entry -> cluster.voters.containsKey(entry.getKey()))
-.filter(entry -> entry.getValue().log.endOffset().offset() >= highWatermark)
-.count();
-assertTrue(
-numReachedHighWatermark >= cluster.majoritySize(),
-"Insufficient nodes have reached current high watermark");
+if (cluster.withKip853) {
+/*
+* For clusters running in KIP-853 mode, we check that a majority of at least one of:
+* 1. the leader's voter set at the HWM

Review Comment:
   Sorry, after looking into this further, I think the issue is slightly different from what I've described so far.
   
   Essentially, when the invariant check fails while only using `lastVoterSet()`, what's happening is that the caller of `verify()` runs on a different thread (let's say the event scheduler thread) than the KRaft leader's raft-io thread, which is continuously updating all of this internal state. The event scheduler thread reads much of the leader's internal state (e.g. `partitionState` and `highWatermark`), which can legitimately be in the following state: `partitionState` has been updated with a new voter set, but a new `highWatermark` value has not yet been calculated with this voter set. This can cause the invariant check to fail when it only looks at `lastVoterSet()`. The leader may not stay in this state for very long, since it is only in this state between the calls to `appendAsLeader -> updateState -> maybeLoadLog` and `flushLeaderLog -> maybeUpdateHighWatermark`, but it nevertheless looks like a valid state to me.
   
   Invariants, as a concept, are predicates that should hold for the system at all times, no matter when you check them. So I think performing invariant verification the way it's currently implemented is fine, since we could be checking the internal state of our raft nodes at any point in their execution. It just means we have to consider these "intermediary" states when performing verification.
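A minimal Java sketch of the relaxed invariant discussed above, under stated assumptions: the comment's list in the diff is cut off after item 1, so the second candidate is assumed to be the leader's latest voter set (what `lastVoterSet()` returns), and all names here are hypothetical rather than taken from `RaftEventSimulationTest`. The check passes if a majority of either voter set has reached the high watermark, which tolerates the window between appending a new voter set and recomputing the high watermark.

```java
import java.util.Map;
import java.util.Set;

class MajorityReachedHighWatermarkSketch {
    // True if a strict majority of `voters` have a log end offset at or above the high watermark.
    static boolean majorityReached(Set<Integer> voters, Map<Integer, Long> endOffsets, long highWatermark) {
        long reached = voters.stream()
            .filter(id -> endOffsets.getOrDefault(id, -1L) >= highWatermark)
            .count();
        return reached >= voters.size() / 2 + 1;
    }

    // Relaxed invariant: accept if either voter set has a majority at the high watermark.
    static boolean verify(Set<Integer> voterSetAtHwm, Set<Integer> lastVoterSet,
                          Map<Integer, Long> endOffsets, long highWatermark) {
        return majorityReached(voterSetAtHwm, endOffsets, highWatermark)
            || majorityReached(lastVoterSet, endOffsets, highWatermark);
    }

    public static void main(String[] args) {
        // Voters 4 and 5 were just added; the high watermark was computed with {1, 2, 3}.
        Set<Integer> atHwm = Set.of(1, 2, 3);
        Set<Integer> last = Set.of(1, 2, 3, 4, 5);
        Map<Integer, Long> endOffsets = Map.of(1, 10L, 2, 10L, 3, 5L, 4, 0L, 5, 0L);
        System.out.println(majorityReached(last, endOffsets, 10L)); // false: latest voter set alone fails
        System.out.println(verify(atHwm, last, endOffsets, 10L));   // true
    }
}
```

In the example, checking only the latest voter set fails, while the relaxed check passes because the high watermark was still computed with the previous voter set.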






Re: [PR] MINOR: Add README.md test command [kafka]

2025-02-26 Thread via GitHub


m1a2st commented on code in PR #19025:
URL: https://github.com/apache/kafka/pull/19025#discussion_r1971727446


##
README.md:
##
@@ -75,6 +76,7 @@ Retries are disabled by default, but you can set maxTestRetryFailures and maxTes
 The following example declares -PmaxTestRetries=1 and -PmaxTestRetryFailures=3 to enable a failed test to be retried once, with a total retry limit of 3.
 
 ./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=3
+./gradlew test -Pkafka.test.run.flaky=true -PmaxTestRetries=1 -PmaxTestRetryFailures=3

Review Comment:
   I don't think we need to add this command to this section; it focuses on "Specifying test retries", and there is already an example.



##
README.md:
##
@@ -75,6 +76,7 @@ Retries are disabled by default, but you can set maxTestRetryFailures and maxTes
 The following example declares -PmaxTestRetries=1 and -PmaxTestRetryFailures=3 to enable a failed test to be retried once, with a total retry limit of 3.
 
 ./gradlew test -PmaxTestRetries=1 -PmaxTestRetryFailures=3
+./gradlew test -Pkafka.test.run.flaky=true -PmaxTestRetries=1 -PmaxTestRetryFailures=3

Review Comment:
   I don't think we need to add this command to this section; it focuses on "Specifying test retries", and there is already an example.






[jira] [Created] (KAFKA-18873) Incorrect error message for max.in.flight.requests.per.connection when using transactional producer.

2025-02-26 Thread Eslam Mohamed (Jira)
Eslam Mohamed created KAFKA-18873:
-

 Summary: Incorrect error message for max.in.flight.requests.per.connection when using transactional producer.
 Key: KAFKA-18873
 URL: https://issues.apache.org/jira/browse/KAFKA-18873
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Eslam Mohamed
Assignee: Eslam Mohamed


 
{code:java}
KafkaProducerTest.testInflightRequestsAndIdempotenceForIdempotentProduces{code}

The unit test above checks for configuration validation errors when instantiating a {{ProducerConfig}} with invalid properties. One of its assertions, "invalidProps4", is designed to validate the constraint that {{max.in.flight.requests.per.connection}} must be at most {{5}} when using a transactional producer. However, the error message thrown by the {{ProducerConfig}} constructor in this scenario is incorrect.
 * *Observed Behavior:*
When {{max.in.flight.requests.per.connection}} is set to {{6}} for a 
transactional producer, the test expects an exception with the message:
{{"Must set max.in.flight.requests.per.connection to at most 5 when using the 
transactional producer."}}
Instead, the error message states:
{{"Must set retries to non-zero when using the idempotent producer."}}

 * *Expected Behavior:*
The error message should explicitly indicate the violation of the 
{{max.in.flight.requests.per.connection}} constraint for transactional 
producers:
{{"Must set max.in.flight.requests.per.connection to at most 5 when using the 
transactional producer."}}

The mismatch in the error message can lead to confusion for developers 
debugging the configuration error, as it incorrectly hints at a {{retries}} 
configuration issue instead of the actual 
{{max.in.flight.requests.per.connection}} issue.
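A hypothetical Java sketch of the expected behavior, not the real `ProducerConfig` validation: each constraint is checked on its own and reports only the setting it tests, so a transactional producer with `max.in.flight.requests.per.connection` set to 6 gets the in-flight message rather than the retries message. The method signature and the `retries` values used in the example are assumptions.

```java
import java.util.Optional;

class ProducerConfigValidationSketch {
    static final int MAX_IN_FLIGHT_LIMIT = 5;

    // Hypothetical validation order; NOT the real ProducerConfig code. Each check reports
    // only the setting it actually tests, so the message names the violated constraint.
    static Optional<String> validate(boolean transactional, boolean idempotenceEnabled,
                                     int retries, int maxInFlight) {
        boolean idempotent = idempotenceEnabled || transactional;
        if (transactional && maxInFlight > MAX_IN_FLIGHT_LIMIT) {
            return Optional.of("Must set max.in.flight.requests.per.connection to at most 5 "
                + "when using the transactional producer.");
        }
        if (idempotent && retries == 0) {
            return Optional.of("Must set retries to non-zero when using the idempotent producer.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Transactional producer with max in-flight = 6; the retries value is an assumption.
        System.out.println(validate(true, true, Integer.MAX_VALUE, 6).orElse("valid"));
        // Idempotent producer with retries = 0 gets the retries message.
        System.out.println(validate(false, true, 0, 5).orElse("valid"));
    }
}
```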





[jira] [Commented] (KAFKA-18872) Convert StressTestLog and TestLinearWriteSpeed to jmh benchmarks

2025-02-26 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930717#comment-17930717
 ] 

Ismael Juma commented on KAFKA-18872:
-

Given that this exercises the log layer, I'm not sure JMH is appropriate. JMH is typically used for CPU-bound benchmarks.

> Convert StressTestLog and TestLinearWriteSpeed to jmh benchmarks
> 
>
> Key: KAFKA-18872
> URL: https://issues.apache.org/jira/browse/KAFKA-18872
> Project: Kafka
>  Issue Type: Task
>Reporter: Mickael Maison
>Priority: Major
>
> Both StressTestLog and TestLinearWriteSpeed are in the jmh-benchmarks module 
> but are not actually benchmarks.
> We can keep the main method if people want, but it would be helpful to be able
> to run them as benchmarks to ensure changes to our log layer do not
> negatively impact performance.





Re: [PR] KAFKA-18780: Extend RetriableException related exceptions [kafka]

2025-02-26 Thread via GitHub


m1a2st commented on code in PR #19020:
URL: https://github.com/apache/kafka/pull/19020#discussion_r1971884683


##
clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionExceptionHierarchyTest {
+
+@ParameterizedTest
+@MethodSource("retriableExceptionsProvider")
+void testRetriableExceptionExceptionHierarchy(Class exceptionClass) {
+assertRetriableExceptionInheritance(exceptionClass);
+}
+
+/**
+ * Verifies that the given exception class extends `RetriableException`
+ * and does **not** extend `RefreshRetriableException`.
+ * Using `RefreshRetriableException` changes the exception handling behavior,
+ * so only exceptions directly extending `RetriableException` are valid here.
+ *
+ * @param exceptionClass the exception class to check
+ */
+private void assertRetriableExceptionInheritance(Class exceptionClass) {
+assertTrue(RetriableException.class.isAssignableFrom(exceptionClass),
+exceptionClass.getSimpleName() + " should extend RetriableException");
+assertFalse(RefreshRetriableException.class.isAssignableFrom(exceptionClass),
+exceptionClass.getSimpleName() + " should NOT extend RefreshRetriableException");
+}
+
+private static Stream> retriableExceptionsProvider() {
+return Stream.of(
+TimeoutException.class,
+NotEnoughReplicasException.class,
+CoordinatorLoadInProgressException.class,
+CorruptRecordException.class,
+NotEnoughReplicasAfterAppendException.class,
+ConcurrentTransactionsException.class
+);
+}

Review Comment:
   Please use `@ValueSource` instead of `@MethodSource`.



##
clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionExceptionHierarchyTest {
+
+@ParameterizedTest
+@MethodSource("retriableExceptionsProvider")
+void testRetriableExceptionExceptionHierarchy(Class exceptionClass) {
+assertRetriableExceptionInheritance(exceptionClass);
+}
+
+/**
+ * Verifies that the given exception class extends `RetriableException`
+ * and does **not** extend `RefreshRetriableException`.
+ * Using `RefreshRetriableException` changes the exception handling behavior,
+ * so only exceptions directly extending `RetriableException` are valid here.
+ *
+ * @param exceptionClass the exception class to check
+ */
+private void assertRetriableExceptionInheritance(Class exceptionClass) {

Review Comment:
   P

[jira] [Commented] (KAFKA-18877) an mechanism to find cases where we accessed variables from the wrong thread

2025-02-26 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930871#comment-17930871
 ] 

David Arthur commented on KAFKA-18877:
--

I think the most likely unsafe access is reading from a control manager from 
the request thread (like in the referenced PR). In some cases, it is safe to 
access a control manager's state from outside the controller thread (like in 
processBrokerHeartbeat), but in most cases it isn't. It would be great if we 
could easily differentiate safe vs unsafe access in the control managers. 

 

Maybe we could split the control manager classes into two interfaces (controller-thread-only and any-thread) so that we could use the type system to ensure safe access.

 

Another idea is to do some runtime check from inside the control managers 
themselves. "if (current thread != controller) panic"
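A minimal Java sketch of the runtime-check idea: the state records its owner thread at construction, and every accessor throws `IllegalStateException` when called from any other thread. The class is hypothetical; in the controller, the owner would presumably be the controller's event-handling thread. Unlike the interface-splitting idea, this only catches violations at run time, at the cost of one thread comparison per access.

```java
class ThreadConfinedStateSketch {
    private final Thread owner;  // the only thread allowed to touch `value`
    private long value;

    ThreadConfinedStateSketch(Thread owner) {
        this.owner = owner;
    }

    // Fails fast when called from any thread other than the owner.
    private void checkOwnerThread() {
        Thread current = Thread.currentThread();
        if (current != owner) {
            throw new IllegalStateException("Accessed from thread " + current.getName()
                + " but owned by " + owner.getName());
        }
    }

    long get() {
        checkOwnerThread();
        return value;
    }

    void set(long newValue) {
        checkOwnerThread();
        value = newValue;
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadConfinedStateSketch state = new ThreadConfinedStateSketch(Thread.currentThread());
        state.set(42);
        System.out.println(state.get()); // 42, same thread
        Thread requestThread = new Thread(() -> {
            try {
                state.get();
                System.out.println("unexpected: access allowed");
            } catch (IllegalStateException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }, "request-thread");
        requestThread.start();
        requestThread.join();
    }
}
```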

 

> an mechanism to find cases where we accessed variables from the wrong thread
> 
>
> Key: KAFKA-18877
> URL: https://issues.apache.org/jira/browse/KAFKA-18877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> from [https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959]
> There are some _non-thread-safe_ classes storing important information, so they are
> expected to be accessed only by a specific thread. Otherwise, unexpected behavior
> may occur.





Re: [PR] KAFKA-18849: Add "strict min ISR" to the docs of "min.insync.replicas" [kafka]

2025-02-26 Thread via GitHub


clarkwtc commented on code in PR #19016:
URL: https://github.com/apache/kafka/pull/19016#discussion_r1972555921


##
server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java:
##
@@ -78,6 +78,7 @@ public final class ServerTopicConfigSynonyms {
 sameNameWithLogPrefix(TopicConfig.CLEANUP_POLICY_CONFIG),
 sameName(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG),
 sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG),
+sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_DOC),

Review Comment:
   I was going the wrong way.
   Sure, we can directly reference `MIN_IN_SYNC_REPLICAS_DOC`.






Re: [PR] KAFKA-18849: Add "strict min ISR" to the docs of "min.insync.replicas" [kafka]

2025-02-26 Thread via GitHub


clarkwtc commented on code in PR #19016:
URL: https://github.com/apache/kafka/pull/19016#discussion_r1972555921


##
server-common/src/main/java/org/apache/kafka/server/config/ServerTopicConfigSynonyms.java:
##
@@ -78,6 +78,7 @@ public final class ServerTopicConfigSynonyms {
 sameNameWithLogPrefix(TopicConfig.CLEANUP_POLICY_CONFIG),
 sameName(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG),
 sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG),
+sameName(TopicConfig.MIN_IN_SYNC_REPLICAS_DOC),

Review Comment:
   I was going the wrong way.
   Sure, we can directly reference MIN_IN_SYNC_REPLICAS_DOC.






[PR] KAFKA-18734: Implemented share partition metrics (KIP-1103) [kafka]

2025-02-26 Thread via GitHub


apoorvmittal10 opened a new pull request, #19045:
URL: https://github.com/apache/kafka/pull/19045

   The PR implements the SharePartitionMetrics as defined in KIP-1103, with one change: the metric `FetchLockRatio` is defined as a `Meter` in the KIP but is implemented as a `Histogram`. This was discussed in the KIP-1103 discussion thread, where we assumed that `FetchLockRatio` would be pre-aggregated. However, while implementing it, we found that the rate reported by a `Meter` can go above 100, because a `Meter` measures a rate per time period. Hence it makes more sense to implement `FetchLockRatio` as a `Histogram`.
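A plain-Java simulation (not the Yammer Metrics API) of the reasoning above: a `Meter` adds up every recorded value and reports the total per time period, so several ratios recorded within one second can exceed 100, whereas a histogram keeps each ratio as a separate sample and its statistics stay within [0, 100]. The sample values are hypothetical.

```java
class FetchLockRatioSketch {
    // Meter-style: every recorded value is added to a running count that is reported
    // per unit of time, so several ratios recorded in one second are summed.
    static double meterStyleRatePerSecond(int[] ratiosPercent, double windowSeconds) {
        long total = 0;
        for (int ratio : ratiosPercent) {
            total += ratio;
        }
        return total / windowSeconds;
    }

    // Histogram-style: each ratio is kept as a sample, so the mean stays within [0, 100].
    static double histogramMean(int[] ratiosPercent) {
        long total = 0;
        for (int ratio : ratiosPercent) {
            total += ratio;
        }
        return (double) total / ratiosPercent.length;
    }

    // Histogram-style maximum, also bounded by 100.
    static int histogramMax(int[] ratiosPercent) {
        int max = 0;
        for (int ratio : ratiosPercent) {
            max = Math.max(max, ratio);
        }
        return max;
    }

    public static void main(String[] args) {
        int[] ratios = {80, 90, 70};  // hypothetical FetchLockRatio values from three fetches in one second
        System.out.println(meterStyleRatePerSecond(ratios, 1.0)); // 240.0, above 100
        System.out.println(histogramMean(ratios));                // 80.0
        System.out.println(histogramMax(ratios));                 // 90
    }
}
```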





[jira] [Resolved] (KAFKA-18863) Runtime additions for connector multiversioning.

2025-02-26 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-18863.
-
Fix Version/s: 4.1.0
   Resolution: Fixed

> Runtime additions for connector multiversioning.
> 
>
> Key: KAFKA-18863
> URL: https://issues.apache.org/jira/browse/KAFKA-18863
> Project: Kafka
>  Issue Type: New Feature
>  Components: connect
>Affects Versions: 4.1.0
>Reporter: Snehashis Pal
>Assignee: Snehashis Pal
>Priority: Major
> Fix For: 4.1.0
>
>
> Updates to connect worker to support connector multi versioning.





[jira] [Created] (KAFKA-18878) Implement share session cache metrics for share fetch

2025-02-26 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-18878:
-

 Summary: Implement share session cache metrics for share fetch
 Key: KAFKA-18878
 URL: https://issues.apache.org/jira/browse/KAFKA-18878
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal








[jira] [Updated] (KAFKA-18878) Implement ShareSessionCache and DelayedShareFetchMetrics metrics for share fetch

2025-02-26 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal updated KAFKA-18878:
--
Summary: Implement ShareSessionCache and DelayedShareFetchMetrics metrics 
for share fetch  (was: Implement share session cache metrics for share fetch)

> Implement ShareSessionCache and DelayedShareFetchMetrics metrics for share 
> fetch
> 
>
> Key: KAFKA-18878
> URL: https://issues.apache.org/jira/browse/KAFKA-18878
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>






Re: [PR] KAFKA-18863: Connect Multiversion Support (Versioned Connector Creation and related changes) [kafka]

2025-02-26 Thread via GitHub


gharris1727 merged PR #17743:
URL: https://github.com/apache/kafka/pull/17743





Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-02-26 Thread via GitHub


ableegoldman commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1972542173


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##
@@ -275,6 +284,24 @@ public int memberEpoch() {
 return memberEpoch;
 }
 
+/**
+ * @return the operation the consumer will perform on leaving the group.
+ *
+ * @see CloseOptions.GroupMembershipOperation
+ */
+public CloseOptions.GroupMembershipOperation leaveGroupOperation() {
+return leaveGroupOperation;
+}
+
+/**
+ * Sets the operation on consumer group membership that the consumer will perform when closing.
+ * The {@link AbstractMembershipManager#leaveGroupOperation} should remain {@code GroupMembershipOperation.DEFAULT}
+ * until the consumer is closed.
+ *
+ * @param operation the operation to be performed on close
+ */
+public abstract void leaveGroupOperationOnClose(CloseOptions.GroupMembershipOperation operation);

Review Comment:
   nit: call this `#leaveGroupOperationOnClose`



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java:
##
@@ -469,8 +498,14 @@ public int joinGroupEpoch() {
  */
 @Override
 public int leaveGroupEpoch() {
+if (CloseOptions.GroupMembershipOperation.LEAVE_GROUP.equals(leaveGroupOperation)) {
+return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
+} else if (CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP.equals(leaveGroupOperation)) {
+return ConsumerGroupHeartbeatRequest.LEAVE_GROUP_STATIC_MEMBER_EPOCH;
+}

Review Comment:
   why do we need to add this? IIUC the static member vs dynamic member leave 
group epoch should still only be based on whether the group instance id is set 
-- I assume if we do skip the leave group then we just wouldn't call this 
method to begin with (though I'm not super familiar with the "leave group 
epoch", just know it's part of the new rebalancing protocol)
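A minimal sketch of the behavior the reviewer describes, in which the leave epoch depends only on whether `group.instance.id` is set. The constants are redefined locally so the example compiles on its own; to our understanding, -1 and -2 are the values KIP-848 assigns to `LEAVE_GROUP_MEMBER_EPOCH` and `LEAVE_GROUP_STATIC_MEMBER_EPOCH`. The class name is hypothetical and this is not the PR's code.

```java
import java.util.Optional;

/** Hypothetical sketch: choose the leave-group epoch from static vs. dynamic membership only. */
public class LeaveGroupEpochSketch {

    // Assumed to mirror ConsumerGroupHeartbeatRequest's constants (KIP-848).
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;        // dynamic member leaves the group
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2; // static member leaves temporarily

    static int leaveGroupEpoch(Optional<String> groupInstanceId) {
        return groupInstanceId.isPresent()
            ? LEAVE_GROUP_STATIC_MEMBER_EPOCH
            : LEAVE_GROUP_MEMBER_EPOCH;
    }

    public static void main(String[] args) {
        System.out.println(leaveGroupEpoch(Optional.empty()));          // -1
        System.out.println(leaveGroupEpoch(Optional.of("instance-1"))); // -2
    }
}
```

Under this model, skipping the leave entirely would be handled by not sending the leave heartbeat at all, rather than by choosing a different epoch.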



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1151,31 +1153,28 @@ protected void handlePollTimeoutExpiry() {
 "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
 "returned in poll() with max.poll.records.");
 
-maybeLeaveGroup("consumer poll timeout has expired.");
+maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired.");
 }
 
 /**
- * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership or is already
- * not part of the group (ie does not have a valid member id, is in the UNJOINED state, or the coordinator is unknown).
+ * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this member is using static membership
+ * with the default consumer group membership operation, or is already not part of the group (i.e., does not have a
+ * valid member ID, is in the UNJOINED state, or the coordinator is unknown).
  *
+ * @param membershipOperation the operation on consumer group membership that the consumer will perform when closing
  * @param leaveReason the reason to leave the group for logging
  * @throws KafkaException if the rebalance callback throws exception
  */
-public synchronized RequestFuture maybeLeaveGroup(String leaveReason) {
+public synchronized RequestFuture maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, String leaveReason) {
 RequestFuture future = null;
 
-// Starting from 2.3, only dynamic members will send LeaveGroupRequest to the broker,
-// consumer with valid group.instance.id is viewed as static member that never sends LeaveGroup,
-// and the membership expiration is only controlled by session timeout.
-if (isDynamicMember() && !coordinatorUnknown() &&
-state != MemberState.UNJOINED && generation.hasMemberId()) {
-// this is a minimal effort attempt to leave the group. we do not
-// attempt any resending if the request fails or times out.
+if (rebalanceConfig.leaveGroupOnClose && shouldSendLeaveGroupRequest(membershipOperation)) {
 log.info("Member {} sending LeaveGroup request to coordinator {} due to {}",
 generation.memberId, coordinator, leaveReason);
+
 LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
 rebalanceConfig.groupId,
-Collections.singletonList(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
+List.of(new MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))

Review Comme

[jira] [Commented] (KAFKA-18877) an mechanism to find cases where we accessed variables from the wrong thread

2025-02-26 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930883#comment-17930883
 ] 

TengYao Chi commented on KAFKA-18877:
-

Hi [~chia7712] 

If you haven't started working on it yet, I would like to take it over. :)

> an mechanism to find cases where we accessed variables from the wrong thread
> 
>
> Key: KAFKA-18877
> URL: https://issues.apache.org/jira/browse/KAFKA-18877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> from [https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959]
> There are some _non-thread safe_ classes storing important information,
> and so they are expected to be accessed by a specific thread. Otherwise,
> unexpected behavior may occur.
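One possible shape for such a mechanism, sketched under assumptions: a hypothetical `ThreadConfinementGuard` remembers the thread that created it, and each accessor of a non-thread-safe class calls `check()`, so that access from any other thread fails fast instead of silently misbehaving. This is illustrative only, not an existing Kafka utility.

```java
/**
 * Hypothetical helper: records the owning thread at construction time and
 * throws if check() is later called from a different thread.
 */
public final class ThreadConfinementGuard {
    private final Thread owner = Thread.currentThread();
    private final String resourceName;

    public ThreadConfinementGuard(String resourceName) {
        this.resourceName = resourceName;
    }

    /** Throws IllegalStateException when called from a thread other than the owner. */
    public void check() {
        Thread current = Thread.currentThread();
        if (current != owner) {
            throw new IllegalStateException(resourceName + " accessed from thread '"
                + current.getName() + "' but is confined to thread '" + owner.getName() + "'");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadConfinementGuard guard = new ThreadConfinementGuard("featureCache");
        guard.check(); // same thread: passes

        Thread other = new Thread(() -> {
            try {
                guard.check();
            } catch (IllegalStateException e) {
                System.out.println("caught: " + e.getMessage());
            }
        }, "other-thread");
        other.start();
        other.join();
    }
}
```

Checking at every access costs a thread comparison, so such guards would most likely be enabled only in tests or behind an assertion flag.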





[jira] [Commented] (KAFKA-15636) Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics

2025-02-26 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15636?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930891#comment-17930891
 ] 

Kirk True commented on KAFKA-15636:
---

Can we do both? :) Process the first fetch and make sure it equals
bytesConsumedTotal, then perform another fetch to make sure the average is as
expected.

> Investigate FetcherTest's/FetchRequestManager's testFetchResponseMetrics
> 
>
> Key: KAFKA-15636
> URL: https://issues.apache.org/jira/browse/KAFKA-15636
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Assignee: Shivsundar R
>Priority: Minor
>  Labels: consumer-threading-refactor, fetcher, unit-tests
>
> {{expectedBytes}} is calculated as total, instead of avg. Is this correct?
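Why a single fetch cannot distinguish the two: in a simplified, hypothetical model of the metrics (not the Fetcher's actual sensor wiring), a total and an average are equal after one fetch and only diverge after a second one, which is why checking both needs at least two fetches.

```java
/** Hypothetical model of a "total" and an "average" metric fed by the same fetch sizes. */
public class FetchBytesMetricsSketch {
    private long totalBytes = 0;
    private int fetchCount = 0;

    void recordFetch(long bytes) {
        totalBytes += bytes;
        fetchCount++;
    }

    long total() {
        return totalBytes;
    }

    double average() {
        return fetchCount == 0 ? 0.0 : (double) totalBytes / fetchCount;
    }

    public static void main(String[] args) {
        FetchBytesMetricsSketch m = new FetchBytesMetricsSketch();
        m.recordFetch(100);
        // After one fetch, total and average coincide, so a test cannot tell them apart.
        System.out.println(m.total() + " " + m.average()); // 100 100.0
        m.recordFetch(300);
        // After a second fetch they diverge.
        System.out.println(m.total() + " " + m.average()); // 400 200.0
    }
}
```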





Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]

2025-02-26 Thread via GitHub


chia7712 commented on code in PR #18726:
URL: https://github.com/apache/kafka/pull/18726#discussion_r1972645207


##
clients/src/main/resources/common/message/FetchResponse.json:
##
@@ -106,7 +106,7 @@
 ]},
 { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId",
   "about": "The preferred read replica for the consumer to use on its next fetch request."},
-{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}

Review Comment:
   > It seems that before 
https://github.com/apache/kafka/commit/fe56fc98fa736c79c9dcbb1f64f810065161a1cc,
 we already have a bunch of calls like the following that will leave records as 
null in the fetchResponse.
   
   In the normal path, `FetchResponse.recordsOrFail(partitionData)` returns
empty records in place of null.
   ```
   public static Records recordsOrFail(FetchResponseData.PartitionData partition) {
       if (partition.records() == null) return MemoryRecords.EMPTY;
       if (partition.records() instanceof Records) return (Records) partition.records();
       throw new ClassCastException("The record type is " + partition.records().getClass().getSimpleName() +
           ", which is not a subtype of " + Records.class.getSimpleName() +
           ". This method is only safe to call if the `FetchResponse` was deserialized from bytes.");
   }
   ```
   
   
https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L848
   
   However, your comment made me notice that we do have a path that returns
null records. See the following links: it could return
`FetchResponse.partitionResponse(tp, Errors.XXX)` when converting the records.
   
   
https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L835
   
https://github.com/apache/kafka/blob/3.9/core/src/main/scala/kafka/server/KafkaApis.scala#L884
   
   ```
   val downConvertMagic =
 logConfig.map(_.recordVersion.value).flatMap { magic =>
   if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1)
 Some(RecordBatch.MAGIC_VALUE_V0)
   else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3)
 Some(RecordBatch.MAGIC_VALUE_V1)
   else
 None
 }
   ```
   However, a 4.0 consumer should never receive null records, because it cannot
use fetch versions v0-v3. In addition, the server rejects v0-v3 requests, and
the down-conversion path has been removed.






[jira] [Updated] (KAFKA-18876) 4.0 documentation improvement

2025-02-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-18876:
---
Fix Version/s: 4.0.0

> 4.0 documentation improvement
> -
>
> Key: KAFKA-18876
> URL: https://issues.apache.org/jira/browse/KAFKA-18876
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 4.0.0
>Reporter: Jun Rao
>Assignee: Mingdao Yang
>Priority: Major
> Fix For: 4.0.0
>
>
> We need to fix a few things in the 4.0 documentation.
>  
> 6.10 Consumer Rebalance Protocol 
> It's missing from the index on the left.
>  
> ConsumerGroupPartitionAssignor is cut off in 
> org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionA.
>  
> 6.11 Transaction Protocol
> It's missing from the index on the left.





[jira] [Commented] (KAFKA-17037) KIP-919 supports for `describeClientQuotas` and `alterClientQuotas`

2025-02-26 Thread TaiJuWu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930895#comment-17930895
 ] 

TaiJuWu commented on KAFKA-17037:
-

Hi [~isding_l], the work on this ticket was finished a long time ago, but it
lacks reviewers.
Once I find a reviewer, I will continue working on it.

> KIP-919 supports for `describeClientQuotas` and `alterClientQuotas`
> ---
>
> Key: KAFKA-17037
> URL: https://issues.apache.org/jira/browse/KAFKA-17037
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
>






[PR] KAFKA-18276 Migrate RebootstrapTest to new test infra [kafka]

2025-02-26 Thread via GitHub


clarkwtc opened a new pull request, #19046:
URL: https://github.com/apache/kafka/pull/19046

   
   Migrate RebootstrapTest to new test infra and remove the old Scala test.
   
   The test results:
   https://github.com/user-attachments/assets/35812c96-13f7-41f8-829d-40947f768646
   





[jira] [Updated] (KAFKA-18850) Fix the docs of org.apache.kafka.automatic.config.providers

2025-02-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-18850:
---
Fix Version/s: 4.0.0

> Fix the docs of org.apache.kafka.automatic.config.providers
> ---
>
> Key: KAFKA-18850
> URL: https://issues.apache.org/jira/browse/KAFKA-18850
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Nick Guo
>Priority: Major
> Fix For: 4.0.0
>
>
> It seems to me that =evn is incorrect. According to the source code, the
> correct value should be the class name, for example:
> `org.apache.kafka.common.config.provider.EnvVarConfigProvider`
> ```
> for (String provider : configProviders.split(",")) {
> String providerClass = providerClassProperty(provider);
> if (indirectConfigs.containsKey(providerClass)) {
> String providerClassName = indirectConfigs.get(providerClass);
> if (classNameFilter.test(providerClassName)) {
> providerMap.put(provider, providerClassName);
> } else {
> throw new ConfigException(providerClassName + " is not allowed. Update System property '"
> + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName);
> }
> }
> }
> ```
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L616
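The loop above can be modelled with a simplified, self-contained sketch. The class and method names are hypothetical, and it throws `IllegalArgumentException` where Kafka throws `ConfigException`. It assumes the provider class key has the form `config.providers.<name>.class` and that the allowlist in the `org.apache.kafka.automatic.config.providers` system property is a comma-separated set of class names. Under those assumptions, allowlisting the alias `env` rejects the provider, while the full class name is accepted.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical, simplified model of the automatic config provider allowlist check. */
public class AutomaticConfigProvidersSketch {

    /** Builds the class-name filter from a comma-separated allowlist (null means allow all). */
    static Predicate<String> classNameFilter(String allowlistProperty) {
        if (allowlistProperty == null) {
            return ignored -> true;
        }
        Set<String> allowed = Arrays.stream(allowlistProperty.split(","))
            .map(String::trim)
            .collect(Collectors.toSet());
        return allowed::contains;
    }

    /** Resolves provider aliases to class names, rejecting classes not on the allowlist. */
    static Map<String, String> resolveProviders(String configProviders,
                                                Map<String, String> configs,
                                                Predicate<String> classNameFilter) {
        Map<String, String> providerMap = new HashMap<>();
        for (String provider : configProviders.split(",")) {
            String providerClass = "config.providers." + provider + ".class";
            String providerClassName = configs.get(providerClass);
            if (providerClassName == null) {
                continue; // no class configured for this alias
            }
            if (!classNameFilter.test(providerClassName)) {
                throw new IllegalArgumentException(providerClassName + " is not allowed. "
                    + "Update System property 'org.apache.kafka.automatic.config.providers' to allow "
                    + providerClassName);
            }
            providerMap.put(provider, providerClassName);
        }
        return providerMap;
    }

    public static void main(String[] args) {
        String envClass = "org.apache.kafka.common.config.provider.EnvVarConfigProvider";
        Map<String, String> configs = Map.of("config.providers", "env",
                                             "config.providers.env.class", envClass);

        // Allowlisting the full class name works.
        System.out.println(resolveProviders("env", configs, classNameFilter(envClass)));

        // Allowlisting only the alias "env" rejects the provider.
        try {
            resolveProviders("env", configs, classNameFilter("env"));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```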





[jira] [Updated] (KAFKA-18849) add "strict min ISR" to the docs of "min.insync.replicas"

2025-02-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-18849:
---
Fix Version/s: 4.0.0

> add "strict min ISR" to the docs of "min.insync.replicas"
> -
>
> Key: KAFKA-18849
> URL: https://issues.apache.org/jira/browse/KAFKA-18849
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Wei-Ting Chen
>Priority: Minor
> Fix For: 4.0.0
>
>
> see https://github.com/apache/kafka/pull/18880





[jira] [Updated] (KAFKA-18868) add the "default value" explanation to the docs of num.replica.alter.log.dirs.threads

2025-02-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-18868:
---
Fix Version/s: 4.0.0

> add the "default value" explanation to the docs of 
> num.replica.alter.log.dirs.threads
> -
>
> Key: KAFKA-18868
> URL: https://issues.apache.org/jira/browse/KAFKA-18868
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Logan Zhu
>Priority: Trivial
> Fix For: 4.0.0
>
>
> The default value of {{num.replica.alter.log.dirs.threads}} is equal to the
> number of log directories, but the documentation doesn't mention this. Users
> only see a "null" default in the docs.
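The behavior described in this ticket amounts to a fallback like the following minimal sketch. The names are hypothetical and it only restates the rule from the ticket, not the broker's actual code: an unset (null) value resolves to the number of configured log directories.

```java
import java.util.List;

/** Hypothetical sketch of how an unset num.replica.alter.log.dirs.threads resolves. */
public class ReplicaAlterLogDirsThreadsSketch {

    /** Returns the configured value, or the number of log directories when it is unset (null). */
    static int effectiveThreads(Integer configured, List<String> logDirs) {
        return configured != null ? configured : logDirs.size();
    }

    public static void main(String[] args) {
        List<String> logDirs = List.of("/data/kafka-1", "/data/kafka-2", "/data/kafka-3");
        System.out.println(effectiveThreads(null, logDirs)); // 3
        System.out.println(effectiveThreads(2, logDirs));    // 2
    }
}
```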





[jira] [Updated] (KAFKA-18869) add remote storage threads to "Updating Thread Configs" section

2025-02-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-18869:
---
Fix Version/s: 4.0.0

> add remote storage threads to "Updating Thread Configs" section
> ---
>
> Key: KAFKA-18869
> URL: https://issues.apache.org/jira/browse/KAFKA-18869
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Priority: Trivial
> Fix For: 4.0.0
>
>
> # remote.log.reader.threads
>  # remote.log.manager.copier.thread.pool.size
>  # remote.log.manager.expiration.thread.pool.size
> These configs should be added.





Re: [PR] KAFKA-18868: add the "default value" explanation to the docs of num.replica.alter.log.dirs.threads [kafka]

2025-02-26 Thread via GitHub


LoganZhuZzz commented on code in PR #19038:
URL: https://github.com/apache/kafka/pull/19038#discussion_r1972652456


##
server/src/main/java/org/apache/kafka/server/config/ServerConfigs.java:
##
@@ -52,7 +52,8 @@ public class ServerConfigs {
 public static final String BACKGROUND_THREADS_DOC = "The number of threads to use for various background processing tasks";
 
 public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_CONFIG = "num.replica.alter.log.dirs.threads";
-public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O";
+public static final String NUM_REPLICA_ALTER_LOG_DIRS_THREADS_DOC = "The number of threads that can move replicas between log directories, which may include disk I/O." +

Review Comment:
   Thanks for that






[jira] [Commented] (KAFKA-17037) KIP-919 supports for `describeClientQuotas` and `alterClientQuotas`

2025-02-26 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930896#comment-17930896
 ] 

Lan Ding commented on KAFKA-17037:
--

Hi [~taijuwu], if this ticket is still open, may I take it over?

> KIP-919 supports for `describeClientQuotas` and `alterClientQuotas`
> ---
>
> Key: KAFKA-17037
> URL: https://issues.apache.org/jira/browse/KAFKA-17037
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
>






[jira] (KAFKA-17037) KIP-919 supports for `describeClientQuotas` and `alterClientQuotas`

2025-02-26 Thread Lan Ding (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-17037 ]


Lan Ding deleted comment on KAFKA-17037:
--

was (Author: JIRAUSER30):
Hi [~taijuwu], if this ticket is still open, may I take it over?

> KIP-919 supports for `describeClientQuotas` and `alterClientQuotas`
> ---
>
> Key: KAFKA-17037
> URL: https://issues.apache.org/jira/browse/KAFKA-17037
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
>






[jira] [Assigned] (KAFKA-18870) implement describeDelegationToken for controller

2025-02-26 Thread TaiJuWu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TaiJuWu reassigned KAFKA-18870:
---

Assignee: TaiJuWu

> implement describeDelegationToken for controller
> 
>
> Key: KAFKA-18870
> URL: https://issues.apache.org/jira/browse/KAFKA-18870
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Major
>
> as title





[jira] [Commented] (KAFKA-17651) Implement `describeUserScramCredentials`

2025-02-26 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930897#comment-17930897
 ] 

Lan Ding commented on KAFKA-17651:
--

Hi [~frankvicky], if this ticket is still open, may I take it over?

> Implement `describeUserScramCredentials`
> 
>
> Key: KAFKA-17651
> URL: https://issues.apache.org/jira/browse/KAFKA-17651
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: TengYao Chi
>Assignee: TengYao Chi
>Priority: Major
> Fix For: 4.1.0
>
>
> Currently there is no implementation for `describeDelegationToken`, 
> `describeUserScramCredentials` and `unregisterBroker`.





Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]

2025-02-26 Thread via GitHub


junrao commented on code in PR #18726:
URL: https://github.com/apache/kafka/pull/18726#discussion_r1972659486


##
clients/src/main/resources/common/message/FetchResponse.json:
##
@@ -106,7 +106,7 @@
 ]},
 { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId",
   "about": "The preferred read replica for the consumer to use on its next fetch request."},
-{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}

Review Comment:
   My point is that before 4.0, the server could already include null records 
for certain errors. So the client already needs to deal with that. If it 
doesn't, it's a bug in the client that needs to be fixed. Once the client is 
fixed, it's not necessary to make the records non-null. We could consider 
changing it to be non-null, but it would be useful to think through if this 
change should be applied consistently to other places such as 
ShareFetchResponse and ProduceRequest.






Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]

2025-02-26 Thread via GitHub


chia7712 commented on code in PR #19039:
URL: https://github.com/apache/kafka/pull/19039#discussion_r1972658599


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +450,33 @@ public void testConfigProvidersPropsAsParam() {
 MockFileConfigProvider.assertClosed(id);
 }
 
+@Test
+public void testAutomaticConfigProvidersWithFullClassName() {
+Properties props = new Properties();
+props.put("config.providers", "file");
+props.put("config.providers.file.class", MockFileConfigProvider.class.getName());
+String id = UUID.randomUUID().toString();

Review Comment:
   It is totally fine to use Java's `UUID`, since all we want to do is generate
a random string.






[jira] [Commented] (KAFKA-18870) implement describeDelegationToken for controller

2025-02-26 Thread Szu-Yung Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930898#comment-17930898
 ] 

Szu-Yung Wang commented on KAFKA-18870:
---

Hi, may I take this?

> implement describeDelegationToken for controller
> 
>
> Key: KAFKA-18870
> URL: https://issues.apache.org/jira/browse/KAFKA-18870
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Major
>
> as title





Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker [kafka]

2025-02-26 Thread via GitHub


FrankYang0529 commented on PR #18997:
URL: https://github.com/apache/kafka/pull/18997#issuecomment-2686577937

   @junrao updated PR description. Also, added reviewers to it. Thanks.





[jira] [Commented] (KAFKA-17039) KIP-919 supports for `unregisterBroker`

2025-02-26 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930900#comment-17930900
 ] 

Lan Ding commented on KAFKA-17039:
--

Hi [~frankvicky], if this ticket is still open, may I take it over?

> KIP-919 supports for `unregisterBroker`
> ---
>
> Key: KAFKA-17039
> URL: https://issues.apache.org/jira/browse/KAFKA-17039
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> as title





[jira] [Commented] (KAFKA-17037) KIP-919 supports for `describeClientQuotas` and `alterClientQuotas`

2025-02-26 Thread Lan Ding (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930892#comment-17930892
 ] 

Lan Ding commented on KAFKA-17037:
--

Hi [~taijuwu], are you still working on this? I would like to take over 
otherwise.

> KIP-919 supports for `describeClientQuotas` and `alterClientQuotas`
> ---
>
> Key: KAFKA-17037
> URL: https://issues.apache.org/jira/browse/KAFKA-17037
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
>






Re: [PR] KAFKA-17607: Add CI step to verify LICENSE-binary [kafka]

2025-02-26 Thread via GitHub


xijiu commented on code in PR #18299:
URL: https://github.com/apache/kafka/pull/18299#discussion_r1972668711


##
.github/workflows/build.yml:
##
@@ -127,6 +127,9 @@ jobs:
   gradle-cache-read-only: ${{ !inputs.is-trunk }}
   gradle-cache-write-only: ${{ inputs.is-trunk }}
   develocity-access-key: ${{ secrets.DEVELOCITY_ACCESS_KEY }}
+  - name: Verify license file
+if: always()

Review Comment:
   @m1a2st Thanks for the code review. You are right; I have fixed it. PTAL






[jira] [Commented] (KAFKA-17039) KIP-919 supports for `unregisterBroker`

2025-02-26 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930903#comment-17930903
 ] 

TengYao Chi commented on KAFKA-17039:
-

Hi [~isding_l] 

Thanks for the interest.

However, I already have a local patch for this one.

Feel free to browse other issues. :)

> KIP-919 supports for `unregisterBroker`
> ---
>
> Key: KAFKA-17039
> URL: https://issues.apache.org/jira/browse/KAFKA-17039
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
>
> as title





Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]

2025-02-26 Thread via GitHub


ijuma commented on code in PR #18726:
URL: https://github.com/apache/kafka/pull/18726#discussion_r1972672957


##
clients/src/main/resources/common/message/FetchResponse.json:
##
@@ -106,7 +106,7 @@
 ]},
 { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": false, "entityType": "brokerId",
   "about": "The preferred read replica for the consumer to use on its next fetch request."},
-{ "name": "Records", "type": "records", "versions": "0+", "nullableVersions": "0+", "about": "The record data."}

Review Comment:
   @chia7712 The unsupported compression case is extremely rare because it 
needs a client that does not support zstd and the topic to be configured with 
zstd. Each is rare in isolation (compression is usually defined on the producer 
and zstd has been supported by clients since 2.1) and the combination is even 
more so.
   
   For @junrao's question, @chia7712 are you saying that we actually don't end 
up with null records in that case? I haven't checked it carefully yet.






[jira] [Assigned] (KAFKA-15283) Client support for OffsetFetch and OffsetCommit with topic ID

2025-02-26 Thread Lan Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lan Ding reassigned KAFKA-15283:


Assignee: Lan Ding  (was: Lianet Magrans)

> Client support for OffsetFetch and OffsetCommit with topic ID
> -
>
> Key: KAFKA-15283
> URL: https://issues.apache.org/jira/browse/KAFKA-15283
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Lan Ding
>Priority: Critical
>  Labels: kip-848-client-support, newbie, offset
> Fix For: 4.1.0
>
>
> Currently, {{KafkaConsumer}} keeps track of topic IDs in the in-memory 
> {{ConsumerMetadata}} object, and they are provided to the {{FETCH}} and 
> {{METADATA}} RPC calls.
> With KIP-848 the OffsetFetch and OffsetCommit will start using topic IDs in 
> the same way, so the new client implementation will provide it when issuing 
> those requests. Topic names should continue to be supported as needed by the 
> {{{}AdminClient{}}}.
> We should also review/clean-up the support for topic names in requests such 
> as the {{METADATA}} request (currently supporting topic names as well as 
> topic IDs on the client side).
> Tasks include:
>  * Introduce Topic ID in existing OffsetFetch and OffsetCommit API that will 
> be upgraded on the server to support topic ID
>  * Check topic ID propagation internally in the client based on RPCs 
> including it.
>  * Review existing support for topic names for potential cleanup if not needed.





[jira] [Assigned] (KAFKA-18877) an mechanism to find cases where we accessed variables from the wrong thread

2025-02-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-18877:
--

Assignee: TengYao Chi  (was: Chia-Ping Tsai)

> an mechanism to find cases where we accessed variables from the wrong thread
> 
>
> Key: KAFKA-18877
> URL: https://issues.apache.org/jira/browse/KAFKA-18877
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Major
>
> from [https://github.com/apache/kafka/pull/18997#pullrequestreview-2645589959]
> Some _non-thread-safe_ classes store important information, so they are 
> expected to be accessed only by a specific thread. Otherwise, accessing them 
> may cause unexpected behavior.
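One common way to catch wrong-thread access works in two steps:

1. Record the owning thread the first time a non-thread-safe object is used.
2. Fail fast when any other thread accesses it.

The `ThreadConfinementChecker` class below is a hypothetical sketch of that idea, not an existing Kafka utility. A real mechanism might only log a warning, or be enabled only in tests, to avoid a per-call cost in production.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper, not an existing Kafka class: binds a non-thread-safe object to the
// first thread that uses it and fails fast when any other thread touches it.
final class ThreadConfinementChecker {
    private final String name;
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    ThreadConfinementChecker(String name) {
        this.name = name;
    }

    /** Call at the start of every method of the guarded object. */
    void checkAccess() {
        Thread current = Thread.currentThread();
        // compareAndSet binds the owner exactly once, even if two threads race on first use.
        if (owner.compareAndSet(null, current)) {
            return;
        }
        Thread bound = owner.get();
        if (bound != current) {
            throw new IllegalStateException(name + " is confined to thread " + bound.getName()
                + " but was accessed from thread " + current.getName());
        }
    }
}
```

A guarded class would hold one checker per instance and call `checkAccess()` at the top of each public method.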





Re: [PR] KAFKA-18646: Null records in fetch response breaks librdkafka [kafka]

2025-02-26 Thread via GitHub


chia7712 commented on code in PR #18726:
URL: https://github.com/apache/kafka/pull/18726#discussion_r1972681634


##
clients/src/main/resources/common/message/FetchResponse.json:
##
@@ -106,7 +106,7 @@
 ]},
 { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", 
"default": "-1", "ignorable": false, "entityType": "brokerId",
   "about": "The preferred read replica for the consumer to use on its 
next fetch request."},
-{ "name": "Records", "type": "records", "versions": "0+", 
"nullableVersions": "0+", "about": "The record data."}

Review Comment:
   >  are you saying that we actually don't end up with null records in that 
case?
   
   A 3.9 server does return null records in extremely rare cases, but we don't 
end up with null records for newer clients as down-conversion does not happen.






Re: [PR] KAFKA-18849: Add "strict min ISR" to the docs of "min.insync.replicas" [kafka]

2025-02-26 Thread via GitHub


ijuma commented on PR #19016:
URL: https://github.com/apache/kafka/pull/19016#issuecomment-2686608878

   I pushed a very minor tweak.





Re: [PR] KAFKA-17645: KIP-1052: Enable warmup in producer performance test [kafka]

2025-02-26 Thread via GitHub


kirktrue commented on code in PR #17340:
URL: https://github.com/apache/kafka/pull/17340#discussion_r1972681856


##
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##
@@ -148,6 +175,7 @@ KafkaProducer 
createKafkaProducer(Properties props) {
 Callback cb;

Review Comment:
   I know this is unrelated to these changes, but why are `cb` and `stats` 
declared at the instance level instead of just created inside `start()`?



##
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##
@@ -75,7 +76,13 @@ void start(String[] args) throws IOException {
 // not thread-safe, do not share with other threads
 SplittableRandom random = new SplittableRandom(0);
 ProducerRecord record;
-stats = new Stats(config.numRecords, 5000);
+
+System.out.println("DEBUG: config.warmupRecords=" + config.warmupRecords + ", (config.warmupRecords > 0)=" + (config.warmupRecords > 0));

Review Comment:
   Just for clarity, this `DEBUG` is for development purposes and will be 
removed before merging?



##
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##
@@ -51,6 +51,7 @@ public class ProducerPerformance {
 
 public static final String DEFAULT_TRANSACTION_ID_PREFIX = 
"performance-producer-";
 public static final long DEFAULT_TRANSACTION_DURATION_MS = 3000L;
+public static final int DEFAULT_REPORTING_INTERVAL_MS = 5000;

Review Comment:
   It seems like this constant could be folded into the `Stats` class and 
removed as a constructor parameter since it's never changed?



##
tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java:
##
@@ -94,7 +101,19 @@ void start(String[] args) throws IOException {
 record = new ProducerRecord<>(config.topicName, payload);
 
 long sendStartMs = System.currentTimeMillis();
-cb = new PerfCallback(sendStartMs, payload.length, stats);

Review Comment:
   Any reason not to change this to:
   
   ```java
   if (config.warmupRecords > 0 && i == config.warmupRecords) {
       steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, true);
       stats.steadyStateActive = true;
   }

   cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);
   ```
   
   True, `steadyStateStats` will be `null` up until the number of warmup 
records has elapsed. As I read in `Stats`, passing in a `null` `Stats` object 
doesn't hurt.
   
   CMIIW.
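   The suggested flow depends on the callback tolerating a `null` steady-state object until the warmup ends. The self-contained model below checks that behavior. `WarmupStats`, `WarmupAwareCallback`, and `WarmupDriver` are hypothetical stand-ins for `Stats`, `PerfCallback`, and the send loop in `ProducerPerformance`. They only count completions and run the callback synchronously, whereas the real callback runs on the producer's I/O thread.

```java
// Hypothetical stand-in for ProducerPerformance's Stats: only counts completed records.
final class WarmupStats {
    private long count;

    void record() {
        count++;
    }

    long count() {
        return count;
    }
}

// Hypothetical stand-in for PerfCallback: always updates the overall stats and,
// once warmup has finished, the steady-state stats as well.
final class WarmupAwareCallback {
    private final WarmupStats stats;
    private final WarmupStats steadyStateStats; // null while still in the warmup phase

    WarmupAwareCallback(WarmupStats stats, WarmupStats steadyStateStats) {
        this.stats = stats;
        this.steadyStateStats = steadyStateStats;
    }

    void onCompletion() {
        stats.record();
        if (steadyStateStats != null) {
            steadyStateStats.record();
        }
    }
}

final class WarmupDriver {
    private WarmupDriver() { }

    /** Simulates numRecords sends, creating the steady-state stats once warmupRecords have been sent. */
    static WarmupStats[] run(long numRecords, long warmupRecords) {
        WarmupStats stats = new WarmupStats();
        WarmupStats steadyStateStats = null;
        for (long i = 0; i < numRecords; i++) {
            if (warmupRecords > 0 && i == warmupRecords) {
                steadyStateStats = new WarmupStats();
            }
            // Runs synchronously here; the real callback fires when the send completes.
            new WarmupAwareCallback(stats, steadyStateStats).onCompletion();
        }
        return new WarmupStats[] {stats, steadyStateStats};
    }
}
```

   With 10 records and 3 warmup records, the overall stats see all 10 completions and the steady-state stats see the last 7.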






Re: [PR] KAFKA-18850: Fix the docs of org.apache.kafka.automatic.config.providers [kafka]

2025-02-26 Thread via GitHub


chia7712 commented on code in PR #19039:
URL: https://github.com/apache/kafka/pull/19039#discussion_r1972690127


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -450,6 +451,32 @@ public void testConfigProvidersPropsAsParam() {
 MockFileConfigProvider.assertClosed(id);
 }
 
+@Test
+public void testAutomaticConfigProvidersWithFullClassName() {
+Properties props = new Properties();

Review Comment:
   Could you please refactor this test a bit?
   ```java
   // case0: MockFileConfigProvider is disallowed by org.apache.kafka.automatic.config.providers
   System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, "file");
   assertThrows(ConfigException.class, () -> new TestIndirectConfigResolution(
       Map.of("config.providers", "file",
           "config.providers.file.class", MockFileConfigProvider.class.getName()),
       Collections.emptyMap()));

   // case1: MockFileConfigProvider is allowed by org.apache.kafka.automatic.config.providers
   System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());
   var props = Map.of("config.providers", "file",
       "config.providers.file.class", MockFileConfigProvider.class.getName(),
       "config.providers.file.param.testId", UUID.randomUUID().toString(),
       "test.key", "${file:/path:key}");
   assertEquals("testKey", new TestIndirectConfigResolution(props, Collections.emptyMap()).originals().get("test.key"));

   // case2: MockFileConfigProvider and EnvVarConfigProvider are allowed by org.apache.kafka.automatic.config.providers
   System.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY,
       MockFileConfigProvider.class.getName() + ", " + EnvVarConfigProvider.class.getName());
   assertEquals("testKey", new TestIndirectConfigResolution(props,
       Collections.emptyMap()).originals().get("test.key"));
   ```
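   Case2 joins the class names with `", "`, so whatever reads `org.apache.kafka.automatic.config.providers` has to trim each entry. Case0 implies that the value `file` does not match the fully qualified class name of `MockFileConfigProvider`. The sketch below expresses both expectations with a hypothetical `isProviderAllowed` helper. It is not the `AbstractConfig` code, and treating an unset property as "allow everything" is an assumption.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical allowlist check modeled on the three test cases above;
// it is not AbstractConfig's implementation.
final class ConfigProviderAllowlist {
    private ConfigProviderAllowlist() { }

    /**
     * Returns true when the allowlist value is unset (assumed to mean "no restriction"),
     * or when providerClassName is one of its comma-separated entries.
     */
    static boolean isProviderAllowed(String allowlistValue, String providerClassName) {
        if (allowlistValue == null) {
            return true;
        }
        Set<String> allowed = Arrays.stream(allowlistValue.split(","))
            .map(String::trim)          // case2 uses ", " as the separator
            .filter(entry -> !entry.isEmpty())
            .collect(Collectors.toSet());
        return allowed.contains(providerClassName);
    }
}
```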






Re: [PR] KAFKA-18869:add remote storage threads to "Updating Thread Configs" section [kafka]

2025-02-26 Thread via GitHub


chia7712 merged PR #19037:
URL: https://github.com/apache/kafka/pull/19037





Re: [PR] KAFKA-18869:add remote storage threads to "Updating Thread Configs" section [kafka]

2025-02-26 Thread via GitHub


chia7712 commented on PR #19037:
URL: https://github.com/apache/kafka/pull/19037#issuecomment-2686638814

   cherry-pick to 4.0





[jira] [Resolved] (KAFKA-18869) add remote storage threads to "Updating Thread Configs" section

2025-02-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-18869.

Resolution: Fixed

trunk: 
[https://github.com/apache/kafka/commit/8bbca913efe260ba59c824801466f73584c46f8f]

4.0: 
https://github.com/apache/kafka/commit/32d012fd8e0329c7b696dff9487ea704481f94a7

> add remote storage threads to "Updating Thread Configs" section
> ---
>
> Key: KAFKA-18869
> URL: https://issues.apache.org/jira/browse/KAFKA-18869
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Priority: Trivial
> Fix For: 4.0.0
>
>
> # remote.log.reader.threads
> # remote.log.manager.copier.thread.pool.size
> # remote.log.manager.expiration.thread.pool.size
> These configs should be added.





[jira] [Resolved] (KAFKA-18875) KRaft controller does not retry registration if the first attempt times out

2025-02-26 Thread TaiJuWu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TaiJuWu resolved KAFKA-18875.
-
Resolution: Duplicate

> KRaft controller does not retry registration if the first attempt times out
> ---
>
> Key: KAFKA-18875
> URL: https://issues.apache.org/jira/browse/KAFKA-18875
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Fonai
>Priority: Minor
>
> There is a [retry 
> mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274]
>  with exponential backoff built into KRaft controller registration. The 
> timeout of the first attempt is 5 s for KRaft controllers 
> ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448])
>  and is not configurable.
> If for some reason the controller's first registration request times out, the 
> attempt should be retried, but in practice this does not happen, and the 
> controller is not able to join the quorum. We see the following in the faulty 
> controller's log:
> {noformat}
> 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to 
> send ControllerRegistrationRequestData(controllerId=3, 
> incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, 
> listeners=[Listener(name='CONTROLPLANE-9090', 
> host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc',
>  port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', 
> minSupportedVersion=0, maxSupportedVersion=1), 
> Feature(name='metadata.version', minSupportedVersion=1, 
> maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> ...
> 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel 
> manager timed out before sending the request. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-to-controller-registration-channel-manager]
> 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting 
> for the previous RPC to complete. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> {noformat}
> After this, we cannot see any controller retry in the log.





Re: [PR] KAFKA-18859: honor the error message of UnregisterBrokerResponse [kafka]

2025-02-26 Thread via GitHub


chia7712 commented on code in PR #19027:
URL: https://github.com/apache/kafka/pull/19027#discussion_r1972693467


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -642,9 +642,11 @@ class ControllerApis(
   def createResponseCallback(requestThrottleMs: Int,
  e: Throwable): UnregisterBrokerResponse = {
 if (e != null) {
+  val errors = Errors.forException(e)

Review Comment:
   Could you please leverage 
`decommissionRequest.getErrorResponse(requestThrottleMs, e)`?
   ```scala
   if (e != null) {
     decommissionRequest.getErrorResponse(requestThrottleMs, e)
   } else {
     new UnregisterBrokerResponse(new UnregisterBrokerResponseData().
       setThrottleTimeMs(requestThrottleMs))
   }
   ```






[jira] [Updated] (KAFKA-17651) Implement `describeUserScramCredentials`

2025-02-26 Thread TengYao Chi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TengYao Chi updated KAFKA-17651:

Description: Currently there is no implementation for 
`describeUserScramCredentials`  (was: Currently there is no implementation for 
`describeDelegationToken`, `describeUserScramCredentials` and 
`unregisterBroker`.)

> Implement `describeUserScramCredentials`
> 
>
> Key: KAFKA-17651
> URL: https://issues.apache.org/jira/browse/KAFKA-17651
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: TengYao Chi
>Assignee: TengYao Chi
>Priority: Major
> Fix For: 4.1.0
>
>
> Currently there is no implementation for `describeUserScramCredentials`





[jira] [Commented] (KAFKA-17651) Implement `describeUserScramCredentials`

2025-02-26 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930930#comment-17930930
 ] 

TengYao Chi commented on KAFKA-17651:
-

Hi [~isding_l] 

Thanks for offering to help, but I've already started working on this ticket. :)

> Implement `describeUserScramCredentials`
> 
>
> Key: KAFKA-17651
> URL: https://issues.apache.org/jira/browse/KAFKA-17651
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: TengYao Chi
>Assignee: TengYao Chi
>Priority: Major
> Fix For: 4.1.0
>
>
> Currently there is no implementation for `describeUserScramCredentials`





Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-02-26 Thread via GitHub


frankvicky commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1972757524


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -1151,31 +1153,28 @@ protected void handlePollTimeoutExpiry() {
 "either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches " +
 "returned in poll() with max.poll.records.");
 
-maybeLeaveGroup("consumer poll timeout has expired.");
+maybeLeaveGroup(DEFAULT, "consumer poll timeout has expired.");
 }
 
 /**
- * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this 
member is using static membership or is already
- * not part of the group (ie does not have a valid member id, is in the 
UNJOINED state, or the coordinator is unknown).
+ * Sends LeaveGroupRequest and logs the {@code leaveReason}, unless this 
member is using static membership
+ * with the default consumer group membership operation, or is already not 
part of the group (i.e., does not have a
+ * valid member ID, is in the UNJOINED state, or the coordinator is 
unknown).
  *
+ * @param membershipOperation the operation on consumer group membership 
that the consumer will perform when closing
  * @param leaveReason the reason to leave the group for logging
  * @throws KafkaException if the rebalance callback throws exception
  */
-public synchronized RequestFuture maybeLeaveGroup(String 
leaveReason) {
+public synchronized RequestFuture 
maybeLeaveGroup(CloseOptions.GroupMembershipOperation membershipOperation, 
String leaveReason) {
 RequestFuture future = null;
 
-// Starting from 2.3, only dynamic members will send LeaveGroupRequest 
to the broker,
-// consumer with valid group.instance.id is viewed as static member 
that never sends LeaveGroup,
-// and the membership expiration is only controlled by session timeout.
-if (isDynamicMember() && !coordinatorUnknown() &&
-state != MemberState.UNJOINED && generation.hasMemberId()) {
-// this is a minimal effort attempt to leave the group. we do not
-// attempt any resending if the request fails or times out.
+if (rebalanceConfig.leaveGroupOnClose && 
shouldSendLeaveGroupRequest(membershipOperation)) {
 log.info("Member {} sending LeaveGroup request to coordinator {} 
due to {}",
 generation.memberId, coordinator, leaveReason);
+
 LeaveGroupRequest.Builder request = new LeaveGroupRequest.Builder(
 rebalanceConfig.groupId,
-Collections.singletonList(new 
MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))
+List.of(new 
MemberIdentity().setMemberId(generation.memberId).setReason(JoinGroupRequest.maybeTruncateReason(leaveReason)))

Review Comment:
   Just my two cents.
   The client module's minimum supported JDK is Java 11, so I think it's better to leverage the modern API.
   However, since this is not related to the topic of this patch, I will revert it in the next commit.
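   The updated Javadoc makes the leave-group decision depend on the requested membership operation as well as on static versus dynamic membership. The sketch below restates that rule as a pure function. The `MembershipOperation` enum is a hypothetical mirror of `CloseOptions.GroupMembershipOperation`: `DEFAULT` appears in the diff, while `LEAVE_GROUP` and `REMAIN_IN_GROUP` are assumed from KIP-1092. The method is not the PR's `shouldSendLeaveGroupRequest`, and it omits the separate `rebalanceConfig.leaveGroupOnClose` check.

```java
// Hypothetical mirror of CloseOptions.GroupMembershipOperation.
enum MembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

final class LeaveGroupDecision {
    private LeaveGroupDecision() { }

    /**
     * Restates the Javadoc rule: never send LeaveGroup if the member is already out of the group;
     * otherwise an explicit operation wins, and DEFAULT keeps the old behavior in which only
     * dynamic members (no group.instance.id) leave.
     */
    static boolean shouldSendLeaveGroup(MembershipOperation op,
                                        boolean isStaticMember,
                                        boolean coordinatorUnknown,
                                        boolean unjoined,
                                        boolean hasMemberId) {
        if (coordinatorUnknown || unjoined || !hasMemberId) {
            return false; // not part of the group, so there is nothing to leave
        }
        switch (op) {
            case LEAVE_GROUP:
                return true;
            case REMAIN_IN_GROUP:
                return false;
            case DEFAULT:
            default:
                return !isStaticMember;
        }
    }
}
```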






Re: [PR] KAFKA-18780: Extend RetriableException related exceptions [kafka]

2025-02-26 Thread via GitHub


k-raina commented on code in PR #19020:
URL: https://github.com/apache/kafka/pull/19020#discussion_r1971936154


##
clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionExceptionHierarchyTest {
+
+@ParameterizedTest
+@MethodSource("retriableExceptionsProvider")
+void testRetriableExceptionExceptionHierarchy(Class 
exceptionClass) {
+assertRetriableExceptionInheritance(exceptionClass);
+}
+
+/**
+ * Verifies that the given exception class extends `RetriableException`
+ * and does **not** extend `RefreshRetriableException`.
+ * Using `RefreshRetriableException` changes the exception handling 
behavior,
+ * so only exceptions directly extending `RetriableException` are valid 
here.
+ *
+ * @param exceptionClass the exception class to check
+ */
+private void assertRetriableExceptionInheritance(Class exceptionClass) {

Review Comment:
   Updated: [Addressed 
Feedback](https://github.com/apache/kafka/pull/19020/commits/85b3c4b7852806ebad06c2349009b9172c2870f9)
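   The Javadoc above describes a two-part check: the class must be a subtype of `RetriableException` but not of `RefreshRetriableException`. The method body isn't shown in this excerpt. One straightforward way to express the check is with `Class.isAssignableFrom`, as sketched below. The exception classes are local stand-ins so the example compiles on its own. In the real test, each condition would be asserted with the imported `assertTrue` and `assertFalse`.

```java
// Local stand-ins for the Kafka exception hierarchy; the real classes live in org.apache.kafka.common.errors.
class RetriableExceptionStandIn extends RuntimeException { }

class RefreshRetriableExceptionStandIn extends RetriableExceptionStandIn { }

class DirectRetriableStandIn extends RetriableExceptionStandIn { }

final class HierarchyCheck {
    private HierarchyCheck() { }

    /** True when exceptionClass extends the retriable base type but not the refresh-retriable subtype. */
    static boolean isPlainRetriable(Class<?> exceptionClass) {
        return RetriableExceptionStandIn.class.isAssignableFrom(exceptionClass)
            && !RefreshRetriableExceptionStandIn.class.isAssignableFrom(exceptionClass);
    }
}
```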






[jira] [Created] (KAFKA-18875) KRaft controller does not retry registration if the first attempt times out

2025-02-26 Thread Daniel Fonai (Jira)
Daniel Fonai created KAFKA-18875:


 Summary: KRaft controller does not retry registration if the first 
attempt times out
 Key: KAFKA-18875
 URL: https://issues.apache.org/jira/browse/KAFKA-18875
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Fonai


There is a [retry 
mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274]
 with exponential backoff built into KRaft controller registration. The 
timeout of the first attempt is 5 s for KRaft controllers 
([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448])
 and is not configurable.

If for some reason the controller's first registration request times out, the 
attempt should be retried, but in practice this does not happen, and the 
controller is not able to join the quorum. We see the following in the faulty 
controller's log:
{noformat}
2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 
incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to 
send ControllerRegistrationRequestData(controllerId=3, 
incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, 
listeners=[Listener(name='CONTROLPLANE-9090', 
host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc',
 port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', 
minSupportedVersion=0, maxSupportedVersion=1), Feature(name='metadata.version', 
minSupportedVersion=1, maxSupportedVersion=21)]) 
(kafka.server.ControllerRegistrationManager) 
[controller-3-registration-manager-event-handler]
...
2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 
incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel 
manager timed out before sending the request. 
(kafka.server.ControllerRegistrationManager) 
[controller-3-to-controller-registration-channel-manager]
2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 
incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting 
for the previous RPC to complete. (kafka.server.ControllerRegistrationManager) 
[controller-3-registration-manager-event-handler]
{noformat}
After this, we cannot see any controller retry in the log.
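The last log line suggests the manager still considers the first request in flight after the channel manager gave up on it. The simplified model below shows how that symptom can arise if the timeout path never clears the in-flight marker. It also shows how clearing the marker lets the backoff-driven retry proceed. All names and backoff values are hypothetical. This is not the `ControllerRegistrationManager` code, and the actual root cause may differ.

```java
// Hypothetical, simplified model of a registration manager with an in-flight flag and
// exponential backoff. It is not ControllerRegistrationManager's code.
final class RegistrationModel {
    private static final long INITIAL_BACKOFF_MS = 100;   // assumed value
    private static final long MAX_BACKOFF_MS = 10_000;    // assumed value

    private final boolean clearPendingOnTimeout; // false reproduces the stuck behavior
    private boolean pendingRpc;
    private int attempts;
    private long backoffMs = INITIAL_BACKOFF_MS;

    RegistrationModel(boolean clearPendingOnTimeout) {
        this.clearPendingOnTimeout = clearPendingOnTimeout;
    }

    /** Returns true if a registration request was sent, false if one is still considered in flight. */
    boolean maybeSendRegistration() {
        if (pendingRpc) {
            return false; // corresponds to "waiting for the previous RPC to complete"
        }
        pendingRpc = true;
        attempts++;
        return true;
    }

    /** Called when the request times out; returns the delay before the next attempt. */
    long onTimeout() {
        if (clearPendingOnTimeout) {
            pendingRpc = false;
        }
        long delay = backoffMs;
        backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
        return delay;
    }

    int attempts() {
        return attempts;
    }
}
```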





[jira] [Commented] (KAFKA-18874) KRaft controller does not retry registration if the first attempt times out

2025-02-26 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17930751#comment-17930751
 ] 

Ismael Juma commented on KAFKA-18874:
-

Thanks for the report. Can you specify the version you tested?

> KRaft controller does not retry registration if the first attempt times out
> ---
>
> Key: KAFKA-18874
> URL: https://issues.apache.org/jira/browse/KAFKA-18874
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Fonai
>Priority: Minor
>
> There is a [retry 
> mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274]
>  with exponential backoff built into KRaft controller registration. The 
> timeout of the first attempt is 5 s for KRaft controllers 
> ([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448])
>  and is not configurable.
> If for some reason the controller's first registration request times out, the 
> attempt should be retried, but in practice this does not happen, and the 
> controller is not able to join the quorum. We see the following in the faulty 
> controller's log:
> {noformat}
> 2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to 
> send ControllerRegistrationRequestData(controllerId=3, 
> incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, 
> listeners=[Listener(name='CONTROLPLANE-9090', 
> host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc',
>  port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', 
> minSupportedVersion=0, maxSupportedVersion=1), 
> Feature(name='metadata.version', minSupportedVersion=1, 
> maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> ...
> 2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel 
> manager timed out before sending the request. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-to-controller-registration-channel-manager]
> 2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 
> incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting 
> for the previous RPC to complete. 
> (kafka.server.ControllerRegistrationManager) 
> [controller-3-registration-manager-event-handler]
> {noformat}
> After this, we cannot see any controller retry in the log.





Re: [PR] KAFKA-18813: ConsumerGroupHeartbeat API and ConsumerGroupDescribe API must check topic describe [kafka]

2025-02-26 Thread via GitHub


lianetm merged PR #18989:
URL: https://github.com/apache/kafka/pull/18989





Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker [kafka]

2025-02-26 Thread via GitHub


mumrah commented on PR #18997:
URL: https://github.com/apache/kafka/pull/18997#issuecomment-2685820634

   Regarding: https://github.com/user-attachments/assets/096cf482-4701-4219-a8eb-c5613adfb390
   
   This check was added in #18985 as part of the merge queue effort. It is not yet required to pass, but if you want to fix the failed check, you can add the "Reviewers:" line to the end of the PR body.





Re: [PR] KAFKA-18844: Stale features information in QuorumController#registerBroker [kafka]

2025-02-26 Thread via GitHub


junrao commented on PR #18997:
URL: https://github.com/apache/kafka/pull/18997#issuecomment-2685839532

   @FrankYang0529 : Thanks for the updated description. Just a minor comment.
   
This makes finalized features may be stale when processing registerBroker 
event. =>  This may make finalized features stale when processing 
registerBroker event. 





[jira] [Updated] (KAFKA-18873) Incorrect error message for max.in.flight.requests.per.connection when using transactional producer.

2025-02-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-18873:
--
Component/s: producer 

> Incorrect error message for max.in.flight.requests.per.connection when using 
> transactional producer.
> 
>
> Key: KAFKA-18873
> URL: https://issues.apache.org/jira/browse/KAFKA-18873
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Reporter: Eslam Mohamed
>Assignee: Eslam Mohamed
>Priority: Minor
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
>  
> {code:java}
> KafkaProducerTest.testInflightRequestsAndIdempotenceForIdempotentProduces{code}
> The above unit test checks for configuration validation errors when 
> instantiating a {{ProducerConfig}} with invalid properties. One of the 
> assertions in this test, "invalidProps4", is designed to validate the 
> constraint that {{max.in.flight.requests.per.connection}} must be at most 
> {{5}} when using a transactional producer. However, the error message thrown 
> by the {{ProducerConfig}} constructor in this scenario is incorrect.
>  * *Observed Behavior:*
> When {{max.in.flight.requests.per.connection}} is set to {{6}} for a 
> transactional producer, the test expects an exception with the message:
> {{"Must set max.in.flight.requests.per.connection to at most 5 when using the 
> transactional producer."}}
> Instead, the error message states:
> {{"Must set retries to non-zero when using the idempotent producer."}}
>  * *Expected Behavior:*
> The error message should explicitly indicate the violation of the 
> {{max.in.flight.requests.per.connection}} constraint for transactional 
> producers:
> {{"Must set max.in.flight.requests.per.connection to at most 5 when using the 
> transactional producer."}}
> The mismatch in the error message can lead to confusion for developers 
> debugging the configuration error, as it incorrectly hints at a {{retries}} 
> configuration issue instead of the actual 
> {{max.in.flight.requests.per.connection}} issue.
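The sketch below is a minimal illustration of validation that reports the constraint actually violated. It checks the transactional in-flight limit before the more general idempotence check, so an input with `max.in.flight.requests.per.connection=6` yields the expected message. The class, method, and check order are hypothetical and do not reproduce `ProducerConfig`'s logic. Only the limit of 5 in-flight requests and the two messages quoted in the issue come from the source.

```java
// Hypothetical validator; parameter names mirror the producer configs, but the logic is illustrative only.
final class ProducerConfigCheck {
    static final int MAX_IN_FLIGHT_FOR_TRANSACTIONS = 5;

    private ProducerConfigCheck() { }

    /** Returns the first violation found, or null if the combination is valid. */
    static String validate(boolean transactional, boolean idempotent, int maxInFlight, int retries) {
        // Check the most specific constraint first so its message is the one reported.
        if (transactional && maxInFlight > MAX_IN_FLIGHT_FOR_TRANSACTIONS) {
            return "Must set max.in.flight.requests.per.connection to at most 5 when using the transactional producer.";
        }
        if (idempotent && retries == 0) {
            return "Must set retries to non-zero when using the idempotent producer.";
        }
        return null;
    }
}
```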





[jira] [Created] (KAFKA-18876) 4.0 documentation improvement

2025-02-26 Thread Jun Rao (Jira)
Jun Rao created KAFKA-18876:
---

 Summary: 4.0 documentation improvement
 Key: KAFKA-18876
 URL: https://issues.apache.org/jira/browse/KAFKA-18876
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 4.0.0
Reporter: Jun Rao


We need to fix a few things in the 4.0 documentation.
 
6.10 Consumer Rebalance Protocol 
It's missing from the index on the left.
 
ConsumerGroupPartitionAssignor is cut off in 
org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionA.
 
6.11 Transaction Protocol
It's missing from the index on the left.





Re: [PR] WIP Investigate OOM [kafka]

2025-02-26 Thread via GitHub


mumrah commented on code in PR #19031:
URL: https://github.com/apache/kafka/pull/19031#discussion_r1972112034


##
build.gradle:
##
@@ -54,7 +54,7 @@ ext {
   buildVersionFileName = "kafka-version.properties"
 
   defaultMaxHeapSize = "2g"
-  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC"]
+  defaultJvmArgs = ["-Xss4m", "-XX:+UseParallelGC", "-XX:-UseGCOverheadLimit"]

Review Comment:
   @ijuma WDYT about disabling this feature? From what I can tell, this will 
prevent a long GC pause from triggering an OOM. Instead, the build would likely 
just time out (which it's doing anyway, with the OOM happening in the Gradle 
worker).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16092) Queues for Kafka

2025-02-26 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield updated KAFKA-16092:
-
Fix Version/s: (was: 4.1.0)

> Queues for Kafka
> 
>
> Key: KAFKA-16092
> URL: https://issues.apache.org/jira/browse/KAFKA-16092
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
>  Labels: queues-for-kafka
> Attachments: image-2024-04-28-11-05-56-153.png
>
>
> This Jira tracks the development of KIP-932: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka





[jira] [Created] (KAFKA-18874) KRaft controller does not retry registration if the first attempt times out

2025-02-26 Thread Daniel Fonai (Jira)
Daniel Fonai created KAFKA-18874:


 Summary: KRaft controller does not retry registration if the first 
attempt times out
 Key: KAFKA-18874
 URL: https://issues.apache.org/jira/browse/KAFKA-18874
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Fonai


There is a [retry 
mechanism|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerRegistrationManager.scala#L274]
 with exponential backoff built into KRaft controller registration. The 
timeout of the first attempt is 5 s for KRaft controllers 
([code|https://github.com/apache/kafka/blob/3.9.0/core/src/main/scala/kafka/server/ControllerServer.scala#L448]),
 and it is not configurable.
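As a rough illustration of exponential backoff, each retry delay grows geometrically from an initial value up to a cap. The class `Backoff` and the values used here (100 ms initial delay, multiplier 2, 10 s cap, no jitter) are assumptions for the example, not the parameters `ControllerRegistrationManager` uses.

```java
// Sketch of capped exponential backoff; the parameter values are illustrative assumptions.
final class Backoff {
    private final long initialMs;
    private final double multiplier;
    private final long maxMs;

    Backoff(long initialMs, double multiplier, long maxMs) {
        this.initialMs = initialMs;
        this.multiplier = multiplier;
        this.maxMs = maxMs;
    }

    /** Delay before retry number {@code attempt} (0-based), never exceeding maxMs. */
    long delayMs(int attempt) {
        double delay = initialMs * Math.pow(multiplier, attempt);
        return (long) Math.min(delay, (double) maxMs);
    }

    public static void main(String[] args) {
        Backoff backoff = new Backoff(100, 2.0, 10_000);
        for (int attempt = 0; attempt < 9; attempt++) {
            System.out.println("attempt " + attempt + " -> " + backoff.delayMs(attempt) + " ms");
        }
    }
}
```

With these assumed values, attempts 0 through 6 wait 100, 200, 400, 800, 1600, 3200 and 6400 ms, and every later attempt is capped at 10000 ms.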

If for some reason the controller's first registration request times out, the 
attempt should be retried, but in practice this does not happen and the 
controller is not able to join the quorum. We see the following in the faulty 
controller's log:
{noformat}
2025-02-21 13:31:46,606 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] sendControllerRegistration: attempting to send ControllerRegistrationRequestData(controllerId=3, incarnationId=mEzjHheAQ_eDWejAFquGiw, zkMigrationReady=true, listeners=[Listener(name='CONTROLPLANE-9090', host='kraft-rollback-kafka-controller-pool-3.kraft-rollback-kafka-kafka-brokers.csm-op-test-kraft-rollback-631e64ac.svc', port=9090, securityProtocol=1)], features=[Feature(name='kraft.version', minSupportedVersion=0, maxSupportedVersion=1), Feature(name='metadata.version', minSupportedVersion=1, maxSupportedVersion=21)]) (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler]
...
2025-02-21 13:31:51,627 ERROR [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] RegistrationResponseHandler: channel manager timed out before sending the request. (kafka.server.ControllerRegistrationManager) [controller-3-to-controller-registration-channel-manager]
2025-02-21 13:31:51,726 INFO [ControllerRegistrationManager id=3 incarnation=mEzjHheAQ_eDWejAFquGiw] maybeSendControllerRegistration: waiting for the previous RPC to complete. (kafka.server.ControllerRegistrationManager) [controller-3-registration-manager-event-handler]
{noformat}
After this, no further controller registration retry appears in the log.
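The last log line suggests that the manager still considers the first RPC in flight. The hypothetical sketch below, which is not Kafka's `ControllerRegistrationManager`, shows the invariant such a loop needs: the timeout path has to clear the in-flight marker and issue a new attempt, otherwise every later call only logs that it is waiting. The class, fields and methods are invented for illustration, and the backoff delay before the retry is omitted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified registration loop; NOT Kafka's ControllerRegistrationManager.
final class RegistrationLoop {
    private boolean pendingRpc = false;
    private int attempts = 0;
    private final List<String> log = new ArrayList<>();

    /** Sends a registration request unless one is already in flight. */
    void maybeSendRegistration() {
        if (pendingRpc) {
            log.add("waiting for the previous RPC to complete");
            return;
        }
        pendingRpc = true;
        attempts++;
        log.add("attempting to send registration #" + attempts);
    }

    /** Timeout path: clearing pendingRpc is what lets the retry go out. */
    void onTimeout() {
        log.add("timed out before sending the request");
        pendingRpc = false; // without this line, every later attempt would only wait
        // A real implementation would schedule this retry after a backoff delay.
        maybeSendRegistration();
    }

    void onSuccess() {
        pendingRpc = false;
        log.add("registration succeeded");
    }

    int attempts() { return attempts; }

    List<String> log() { return log; }

    public static void main(String[] args) {
        RegistrationLoop loop = new RegistrationLoop();
        loop.maybeSendRegistration(); // first attempt
        loop.onTimeout();             // times out; a second attempt is issued
        loop.onSuccess();
        loop.log().forEach(System.out::println);
    }
}
```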





Re: [PR] WIP Investigate OOM [kafka]

2025-02-26 Thread via GitHub


mumrah commented on PR #19031:
URL: https://github.com/apache/kafka/pull/19031#issuecomment-2685589737

   UserQuotaTest appears to be the culprit. 
   
   From https://github.com/apache/kafka/actions/runs/13523225366/job/37791507296
   ```
   Gradle Test Run :core:test > Gradle Test Executor 38 > UserQuotaTest > testQuotaOverrideDelete(String, String) > testQuotaOverrideDelete(String, String).quorum=kraft.groupProtocol=consumer STARTED
   
   > Task :storage:compileTestJava
   Unexpected exception thrown.
   org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:50402'.
   
       at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:99)
       at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
       at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
       at org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:48)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
   ```
   
   and from 
https://github.com/apache/kafka/actions/runs/13534291834/job/37823471305
   
   ```
   Gradle Test Run :core:test > Gradle Test Executor 37 > UserQuotaTest > testQuotaOverrideDelete(String, String) > testQuotaOverrideDelete(String, String).quorum=kraft.groupProtocol=consumer STARTED
   
   > Task :storage:checkstyleMain
   > Task :shell:checkstyleMain
   Unexpected exception thrown.
   
   org.gradle.internal.remote.internal.MessageIOException: Could not read message from '/127.0.0.1:38838'.
       at org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:99)
       at org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270)
       at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64)
       at org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:48)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
   ```
   
   It appears that the Gradle worker, while trying to send results to the main 
process, hits a long GC pause, which triggers this "GC overhead limit 
exceeded" error.
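For context, Oracle's GC tuning documentation describes the parallel collector (selected here by `-XX:+UseParallelGC`) as throwing `OutOfMemoryError` when more than 98% of total time is spent in garbage collection and less than 2% of the heap is recovered; `-XX:-UseGCOverheadLimit` disables that check. The simplified model below encodes only that documented rule with its default thresholds. The class and method names are hypothetical, and HotSpot's actual bookkeeping is more involved.

```java
// Simplified model of the documented GC overhead limit rule for the parallel collector.
// Thresholds mirror the documented defaults (-XX:GCTimeLimit=98, -XX:GCHeapFreeLimit=2);
// HotSpot's real implementation tracks more state than this.
final class GcOverheadModel {
    static final double GC_TIME_LIMIT_PERCENT = 98.0;
    static final double GC_HEAP_FREE_LIMIT_PERCENT = 2.0;

    /**
     * @param gcTimeMs           time spent in GC over the observation window
     * @param totalTimeMs        total elapsed time over the same window
     * @param heapRecoveredBytes heap recovered by GC in the window
     * @param heapCapacityBytes  total heap capacity
     */
    static boolean overheadLimitExceeded(long gcTimeMs, long totalTimeMs,
                                         long heapRecoveredBytes, long heapCapacityBytes) {
        double gcTimePercent = 100.0 * gcTimeMs / totalTimeMs;
        double recoveredPercent = 100.0 * heapRecoveredBytes / heapCapacityBytes;
        return gcTimePercent > GC_TIME_LIMIT_PERCENT
                && recoveredPercent < GC_HEAP_FREE_LIMIT_PERCENT;
    }

    public static void main(String[] args) {
        long heap = 2L * 1024 * 1024 * 1024; // 2 GiB, matching defaultMaxHeapSize = "2g"
        // 99% of time in GC and about 1% of the heap recovered: the rule trips.
        System.out.println(overheadLimitExceeded(9_900, 10_000, heap / 100, heap));
        // Same GC time, but 10% of the heap recovered: the rule does not trip.
        System.out.println(overheadLimitExceeded(9_900, 10_000, heap / 10, heap));
    }
}
```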





Re: [PR] KAFKA-18780: Extend RetriableException related exceptions [kafka]

2025-02-26 Thread via GitHub


k-raina commented on code in PR #19020:
URL: https://github.com/apache/kafka/pull/19020#discussion_r1971934155


##
clients/src/test/java/org/apache/kafka/common/errors/TransactionExceptionHierarchyTest.java:
##
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TransactionExceptionHierarchyTest {
+
+@ParameterizedTest
+@MethodSource("retriableExceptionsProvider")
+void testRetriableExceptionExceptionHierarchy(Class exceptionClass) {
+assertRetriableExceptionInheritance(exceptionClass);
+}
+
+/**
+ * Verifies that the given exception class extends `RetriableException`
+ * and does **not** extend `RefreshRetriableException`.
+ * Using `RefreshRetriableException` changes the exception handling behavior,
+ * so only exceptions directly extending `RetriableException` are valid here.
+ *
+ * @param exceptionClass the exception class to check
+ */
+private void assertRetriableExceptionInheritance(Class exceptionClass) {

Review Comment:
   Updated: [Addressed 
Feedback](https://github.com/apache/kafka/pull/19020/commits/85b3c4b7852806ebad06c2349009b9172c2870f9)
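The property described in the Javadoc above can be checked with `Class.isAssignableFrom`. To keep the sketch runnable without Kafka on the classpath, it defines stand-in classes named after `RetriableException` and `RefreshRetriableException` plus two hypothetical subclasses; it is not the helper from the PR.

```java
// Stand-in exception types; in the PR the real ones live in org.apache.kafka.common.errors.
class RetriableException extends RuntimeException {}
class RefreshRetriableException extends RetriableException {}
class ExampleTransientException extends RetriableException {}      // hypothetical
class ExampleRefreshException extends RefreshRetriableException {} // hypothetical

final class HierarchyCheck {

    /** True if the class is a RetriableException but not a RefreshRetriableException. */
    static boolean isPlainRetriable(Class<?> exceptionClass) {
        return RetriableException.class.isAssignableFrom(exceptionClass)
                && !RefreshRetriableException.class.isAssignableFrom(exceptionClass);
    }

    public static void main(String[] args) {
        System.out.println(isPlainRetriable(ExampleTransientException.class)); // true
        System.out.println(isPlainRetriable(ExampleRefreshException.class));  // false
        System.out.println(isPlainRetriable(IllegalStateException.class));    // false
    }
}
```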






Re: [PR] KAFKA-17565: Move MetadataCache interface to metadata module [kafka]

2025-02-26 Thread via GitHub


FrankYang0529 commented on PR #18801:
URL: https://github.com/apache/kafka/pull/18801#issuecomment-2685571786

   @mumrah I updated the PR description with the benchmark results. They are a 
little different from yours, but the trunk and branch results on my laptop are 
close, so I think the result is acceptable.





[jira] [Updated] (KAFKA-18723) KRaft must handle corrupted records in the fetch response

2025-02-26 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-18723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-18723:
---
Fix Version/s: 4.0.1

> KRaft must handle corrupted records in the fetch response
> -
>
> Key: KAFKA-18723
> URL: https://issues.apache.org/jira/browse/KAFKA-18723
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: José Armando García Sancio
>Assignee: José Armando García Sancio
>Priority: Major
> Fix For: 3.9.1, 3.8.2, 3.7.3, 4.0.1
>
>
> It is possible for a KRaft replica to send corrupted records to the fetching 
> replicas in the FETCH response. This is because there is a race between when 
> the FETCH response gets generated by the KRaft IO thread and when the network 
> thread, or the Linux kernel, reads the bytes at that position in the log segment.
> This race can generate corrupted records if the KRaft replica performed a 
> truncation after the FETCH response was created but before the network thread 
> read the bytes from the log segment.
> I have seen the following errors:
> {code:java}
>  [ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
> org.apache.kafka.common.KafkaException: Append failed unexpectedly
>   at kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
>   at kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
>   at org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
>   at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
>   at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
>   at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
>   at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
>   at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
>   at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
> and
> {code:java}
>  [ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread"
> org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14)"{code}
> This race also exists for Kafka's ISR-based topic partitions. In that case, 
> the replica fetcher catches every CorruptRecordException and 
> InvalidRecordException.
> {code:java}
>                     } catch {
>                       case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
>                         // we log the error and continue. This ensures two things
>                         // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
>                         //    down and cause other topic partition to also lag
>                         // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
>                         //    can cause this), we simply continue and should get fixed in the subsequent fetches
>                         error(s"Found invalid messages during fetch for partition $topicPartition " +
>                           s"offset ${currentFetchState.fetchOffset}", ime)
>                         partitionsWithError += topicPartition
>  {code}
> The KRaft implementation doesn't handle this case:
> {code:java}
>               } else {
>                   Records records = FetchResponse.recordsOrFail(partitionResponse);
>                   if (records.sizeInBytes() > 0) {
>                       appendAsFollower(records);
>                   }
>                   OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
>                       OptionalLong.empty() :
>                       OptionalLong.of(partitionResponse.highWatermark());
>                   updateFollowerHighWatermark(state, highWatermark);
>               }{code}
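A minimal Java sketch of the catch-and-refetch approach the ISR replica fetcher uses, applied to a follower fetch handler, might look like this. All names are hypothetical stand-ins (including the local `CorruptRecordException` and `InvalidRecordException` classes), a zero-length batch stands in for a truncated read, and this is not the fix that was applied to `KafkaRaftClient`.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of tolerating corrupt records in a follower's fetch path.
// The exception types and this class are stand-ins, NOT KafkaRaftClient code.
final class FollowerFetchSketch {

    static class CorruptRecordException extends RuntimeException {
        CorruptRecordException(String msg) { super(msg); }
    }

    static class InvalidRecordException extends RuntimeException {
        InvalidRecordException(String msg) { super(msg); }
    }

    private final List<String> appended = new ArrayList<>();
    private long fetchOffset = 0;

    /** Stand-in for appendAsFollower: rejects an empty batch as corrupt. */
    private void appendAsFollower(byte[] batch) {
        if (batch.length == 0) {
            throw new CorruptRecordException(
                    "Record size 0 is less than the minimum record overhead (14)");
        }
        appended.add(new String(batch, StandardCharsets.UTF_8));
        fetchOffset++;
    }

    /**
     * Handles one fetch response. A corrupt batch is logged and skipped without
     * advancing fetchOffset, so the next fetch asks for the same offset again.
     * Returns true if the batch was appended.
     */
    boolean handleFetchResponse(byte[] batch) {
        try {
            appendAsFollower(batch);
            return true;
        } catch (CorruptRecordException | InvalidRecordException e) {
            System.err.println("Found invalid records at offset " + fetchOffset + ": " + e.getMessage());
            return false;
        }
    }

    long fetchOffset() { return fetchOffset; }

    public static void main(String[] args) {
        FollowerFetchSketch follower = new FollowerFetchSketch();
        follower.handleFetchResponse("a".getBytes(StandardCharsets.UTF_8));
        follower.handleFetchResponse(new byte[0]); // simulated truncation race: skipped
        follower.handleFetchResponse("b".getBytes(StandardCharsets.UTF_8));
        System.out.println("fetchOffset = " + follower.fetchOffset());
    }
}
```

Because the offset is not advanced on failure, a transient truncation race is simply retried on the next fetch instead of terminating the process, which mirrors the reasoning in the ISR fetcher's comment.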




