[jira] [Commented] (KAFKA-18930) KRaft MigrationEvent won't retry when failing to write data to ZK

2025-03-08 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932938#comment-17932938
 ] 

Luke Chen commented on KAFKA-18930:
---

[~davidarthur] [~mumrah], I'd like to hear your thoughts on this issue. Thanks.

> KRaft MigrationEvent won't retry when failing to write data to ZK 
> --
>
> Key: KAFKA-18930
> URL: https://issues.apache.org/jira/browse/KAFKA-18930
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.9.0
>Reporter: Luke Chen
>Priority: Major
>
> When migrating from ZK to KRaft, the cluster runs in a dual-write mode: 
> metadata is written to KRaft first and then written to ZK asynchronously. When 
> an exception occurs, the KRaft MigrationEvent won't retry the failed ZK 
> write. That causes metadata inconsistency between KRaft and ZK.
>  
> Note:
> 1. When doing a KRaft controller clean shutdown, we should keep retrying the 
> failing ZK write until force shutdown, to make sure the metadata stays 
> consistent.
> 2. During shutdown, [the order of 
> shutdown|https://github.com/apache/kafka/blob/1ec1043d5197c4f807fa5cbc41d875b289443096/core/src/main/scala/kafka/server/ControllerServer.scala#L69-L76]
>  is: close ZK -> close RPC client -> close migration driver. This causes 
> another issue: even if we retry the ZK write, it can never succeed while 
> shutdown is ongoing, because the ZK connection is closed first.
>  
> The impact: when rolling back to ZK mode during migration, the metadata in 
> ZK is out of date.
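A minimal sketch of the retry behavior proposed in note 1, assuming hypothetical names (`zkWrite`, `deadlineMs`); this is not the actual KRaftMigrationDriver code:

{noformat}
import java.util.function.BooleanSupplier;

final class RetryUntilShutdown {
    /**
     * Keep retrying a failed ZK write until it succeeds or the clean-shutdown
     * deadline passes, so KRaft and ZK metadata stay consistent for as long
     * as possible.
     */
    static boolean retryZkWrite(BooleanSupplier zkWrite, long deadlineMs) throws InterruptedException {
        while (System.currentTimeMillis() < deadlineMs) {
            if (zkWrite.getAsBoolean()) {
                return true; // the write reached ZooKeeper
            }
            Thread.sleep(100); // back off before retrying
        }
        return false; // forced shutdown: ZK may be stale, as the report warns
    }
}
{noformat}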



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation [kafka]

2025-03-08 Thread via GitHub


mjsax commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1986158617


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1163,10 +1166,12 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
     // as such we just need to skip those dirty tasks in the checkpoint
     final Set<TaskId> dirtyTasks = new HashSet<>();
     try {
-        // in handleRevocation we must call commitOffsetsOrTransaction() directly rather than
-        // commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
-        // offset commit because we are in a rebalance
-        taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        if (revokedTasksNeedCommit) {

Review Comment:
   Not sure if I can follow?
   
   Your proposal would mean we stop committing if a TX is in flight, but we do 
want to commit in this case, right? Even if the offset map is empty.
   
   And moving it to the outermost context doesn't seem "correct", because 
checking whether a TX is in flight in the ALOS case seems unnecessary (I guess 
it would not be wrong, because the call would just always return `false`, so it 
would effectively not change anything, but it seems unnecessary to change the 
code).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Improve PR linter output [kafka]

2025-03-08 Thread via GitHub


mumrah commented on PR #19159:
URL: https://github.com/apache/kafka/pull/19159#issuecomment-2707179546

   Example step summary:
   
   Commit will look like:
   
   ```
   MINOR: Disallow unused local variables
   
   Recently, we found a regression that could have been detected by static 
analysis, since a local variable wasn't being passed to a method during a 
refactoring, and was left unused. It was fixed in 
[7a749b5](https://github.com/apache/kafka/commit/7a749b589f8c98bd452b79b49cdfb182894d7f57),
  but almost slipped into 4.0. Unused variables are typically detected by IDEs, 
but this is insufficient to prevent these kinds of bugs. This change enables 
unused local variable detection in checkstyle for Kafka.
   
   A few notes on the usage:
- There are two situations in which people actually want to have a local 
variable but not use it. First, there are `for (Type ignored: collection)` 
loops which have to loop `collection.length` number of times, but that do not 
use `ignored` in the loop body. These are typically still easier to read than a 
classical `for` loop. Second, some IDEs detect it if a return value of a 
function such as `File.delete` is not being used. In this case, people 
sometimes store the result in an unused local variable to make ignoring the 
return value explicit and to avoid the squiggly lines.
- In Java 22, unused local variables can be replaced with a single 
underscore `_`. This is supported by checkstyle. In pre-22 versions, IntelliJ 
allows such variables to be named `ignored` to suppress the unused local 
variable warning. This pattern is often (but not consistently) used in the 
Kafka codebase. It is, however, not supported by checkstyle.
   
   Since we cannot switch to Java 22 yet, and we want automated 
detection using checkstyle, we have to resort to prefixing the unused local 
variables with `@SuppressWarnings("UnusedLocalVariable")`. We have to apply 
this in 11 cases across the Kafka codebase. While not pretty, I'd argue 
it's worth it to prevent bugs like the one fixed in 
[7a749b5](https://github.com/apache/kafka/commit/7a749b589f8c98bd452b79b49cdfb182894d7f57).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   ```
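   For illustration (not part of the PR or the linter output above), a minimal sketch of the suppression pattern the commit message describes; the class, method, and file argument are hypothetical:
   
   ```java
   import java.io.File;
   
   public class DeleteExample {
       void cleanUp(File file) {
           // Storing the result makes ignoring File.delete()'s return value
           // explicit, and the annotation silences checkstyle's
           // UnusedLocalVariable check, as described in the commit message.
           @SuppressWarnings("UnusedLocalVariable")
           final boolean ignored = file.delete();
       }
   }
   ```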
   
   Validation results:
   * ✅ Title is not truncated
   * ✅ Title is not too short
   * ✅ Title is not too long
   * ✅ Title has expected KAFKA/MINOR/HOTFIX
   * ✅ Body is not empty
   * ✅ PR template text not present
   * ❌ Old PR template text should be removed
   * ❌ Pull Request is approved, but no 'Reviewers' found in commit body


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-18942) Add reviewers to PR body with committer-tools

2025-03-08 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933576#comment-17933576
 ] 

Chia-Ping Tsai commented on KAFKA-18942:


+1 to this idea. [~mingyen066] will take this over.

> Add reviewers to PR body with committer-tools
> -
>
> Key: KAFKA-18942
> URL: https://issues.apache.org/jira/browse/KAFKA-18942
> Project: Kafka
>  Issue Type: Sub-task
>  Components: build
>Reporter: David Arthur
>Assignee: Ming-Yen Chung
>Priority: Major
>
> When we switch to the merge queue, we cannot alter the commit message 
> directly and instead must use the PR body for the eventual commit message.
>  
> In order to include our "Reviewers" metadata in the commit, we must edit the 
> PR body after a review has happened and add the "Reviewers" manually. This is 
> rather annoying and we can do better.
>  
> The committer-tools script "reviewers.py" can use the GitHub API (via "gh") 
> to read, modify, and update the PR body with the reviewers selected by this 
> tool.
>  
> For example, 
>  
> {noformat}
> $ ./committer-tools/reviewers.py
> Utility to help generate 'Reviewers' string for Pull Requests. Use Ctrl+D or 
> Ctrl+C to exit
> Name or email (case insensitive): chia
> Possible matches (in order of most recent):
> [1] Chia-Ping Tsai chia7...@gmail.com (1908)
> [2] Chia-Ping Tsai chia7...@apache.org (13)
> [3] Chia-Chuan Yu yujuan...@gmail.com (11)
> [4] Chia Chuan Yu yujuan...@gmail.com (10)
> Make a selection: 1
> Reviewers so far: [('Chia-Ping Tsai', 'chia7...@gmail.com', 1908)]
> Name or email (case insensitive): ^C
> Reviewers: Chia-Ping Tsai 
> Pull Request to update (Ctrl+D or Ctrl+C to skip): 19144
> Adding Reviewers to 19144...
> {noformat}
>  
> The script should be able to handle an existing "Reviewers" string in the PR body.
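A hedged sketch of the gh-based read/modify/update flow described above, written in Java for consistency with the rest of this digest (the real tool is the Python script committer-tools/reviewers.py); the PR number and reviewer string come from the example session, and error handling is omitted:

{noformat}
import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class PrBodyUpdater {
    // Read the PR body via `gh pr view --json body --jq .body`.
    static String readBody(String prNumber) throws IOException, InterruptedException {
        Process p = new ProcessBuilder("gh", "pr", "view", prNumber, "--json", "body", "--jq", ".body").start();
        String body = new String(p.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
        p.waitFor();
        return body;
    }

    // Overwrite the PR body via `gh pr edit --body`.
    static void writeBody(String prNumber, String newBody) throws IOException, InterruptedException {
        new ProcessBuilder("gh", "pr", "edit", prNumber, "--body", newBody).start().waitFor();
    }

    public static void main(String[] args) throws Exception {
        String pr = "19144"; // PR number from the example session above
        String body = readBody(pr);
        // Per the ticket, an existing "Reviewers" line should be handled rather than duplicated.
        if (!body.contains("Reviewers:")) {
            writeBody(pr, body + "\n\nReviewers: Chia-Ping Tsai <chia7...@gmail.com>");
        }
    }
}
{noformat}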



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR KIP link change to use immutable link [kafka]

2025-03-08 Thread via GitHub


m1a2st commented on PR #19153:
URL: https://github.com/apache/kafka/pull/19153#issuecomment-2708602615

   ```
   https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-=
   ```
   
   Keep only this URL, as it will redirect to the specific section.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR KIP link change to use immutable link [kafka]

2025-03-08 Thread via GitHub


chia7712 commented on PR #19153:
URL: https://github.com/apache/kafka/pull/19153#issuecomment-2708606376

   > Keep only this URL, as it will redirect to the specific section.
   
   How about 
https://cwiki.apache.org/confluence/x/2xRRCg#KIP714:Clientmetricsandobservability-Clientidentificationandtheclientinstanceid ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18700: Migrate SnapshotPath and Entry in LogHistory to record classes [kafka]

2025-03-08 Thread via GitHub


chia7712 merged PR #19062:
URL: https://github.com/apache/kafka/pull/19062


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-18700) Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo to record classes

2025-03-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-18700:
---
Summary: Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and 
LogAppendInfo to record classes  (was: Migrate suitable classes to records in 
raft)

> Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo 
> to record classes
> --
>
> Key: KAFKA-18700
> URL: https://issues.apache.org/jira/browse/KAFKA-18700
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: TengYao Chi
>Assignee: Ming-Yen Chung
>Priority: Minor
>
> raft:
> Entry
> SnapshotPath



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Rewrite unchecked operations in Mock API [kafka]

2025-03-08 Thread via GitHub


chia7712 commented on code in PR #19071:
URL: https://github.com/apache/kafka/pull/19071#discussion_r1986194037


##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java:
##
@@ -391,24 +386,20 @@ public void testStateStore() {
 public void shouldNotEnableSendingOldValuesIfNotMaterializedAlreadyAndNotForcedToMaterialize() {
     final StreamsBuilder builder = new StreamsBuilder();
 
-    final KTableImpl table =
-        (KTableImpl) builder.table("topic1", consumed);
+    final var kTable = assertInstanceOf(KTableImpl.class, builder.table("topic1", consumed));
+    kTable.enableSendingOldValues(false);
 
-    table.enableSendingOldValues(false);
-
-    assertThat(table.sendingOldValueEnabled(), is(false));
+    assertFalse(kTable.sendingOldValueEnabled());

Review Comment:
   ditto



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java:
##
@@ -587,19 +577,16 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplie
     assertThrows(NullPointerException.class, () -> table.transformValues(null));
 }
 
-@SuppressWarnings("unchecked")
 @Test
 public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
-    final ValueTransformerWithKeySupplier valueTransformerSupplier =
-        mock(ValueTransformerWithKeySupplier.class);
+    final ValueTransformerWithKeySupplier valueTransformerSupplier = mock();
     assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized>) null));
 }
 
-@SuppressWarnings("unchecked")
+
 @Test
 public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
-    final ValueTransformerWithKeySupplier valueTransformerSupplier =
-        mock(ValueTransformerWithKeySupplier.class);
+    final ValueTransformerWithKeySupplier valueTransformerSupplier = mock();
     assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (String[]) null));

Review Comment:
   maybe we can pass `mock()` to `transformValues` directly?
   
   ```java
   assertThrows(NullPointerException.class, () -> 
table.transformValues(mock(), (String[]) null));
   ```



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java:
##
@@ -587,19 +577,16 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplie
     assertThrows(NullPointerException.class, () -> table.transformValues(null));
 }
 
-@SuppressWarnings("unchecked")
 @Test
 public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
-    final ValueTransformerWithKeySupplier valueTransformerSupplier =
-        mock(ValueTransformerWithKeySupplier.class);
+    final ValueTransformerWithKeySupplier valueTransformerSupplier = mock();
     assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized>) null));

Review Comment:
   ditto



##
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java:
##
@@ -391,24 +386,20 @@ public void testStateStore() {
 public void shouldNotEnableSendingOldValuesIfNotMaterializedAlreadyAndNotForcedToMaterialize() {
     final StreamsBuilder builder = new StreamsBuilder();
 
-    final KTableImpl table =
-        (KTableImpl) builder.table("topic1", consumed);
+    final var kTable = assertInstanceOf(KTableImpl.class, builder.table("topic1", consumed));
+    kTable.enableSendingOldValues(false);
 
-    table.enableSendingOldValues(false);
-
-    assertThat(table.sendingOldValueEnabled(), is(false));
+    assertFalse(kTable.sendingOldValueEnabled());
 }
 
 @Test
 public void shouldEnableSendingOldValuesIfNotMaterializedAlreadyButForcedToMaterialize() {
     final StreamsBuilder builder = new StreamsBuilder();
 
-    final KTableImpl table =
-        (KTableImpl) builder.table("topic1", consumed);
-
-    table.enableSendingOldValues(true);
+    final var kTable = assertInstanceOf(KTableImpl.class, builder.table("topic1", consumed));
+    kTable.enableSendingOldValues(true);
 
-    assertThat(table.sendingOldValueEnabled(), is(true));
+    assertTrue(kTable.sendingOldValueEnabled());

Review Comment:
   this change seems to be unnecessary? Also, we don't need to rename the 
`table` to `kTable`, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-18942) Add reviewers to PR body with committer-tools

2025-03-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-18942:
--

Assignee: Ming-Yen Chung

> Add reviewers to PR body with committer-tools
> -
>
> Key: KAFKA-18942
> URL: https://issues.apache.org/jira/browse/KAFKA-18942
> Project: Kafka
>  Issue Type: Sub-task
>  Components: build
>Reporter: David Arthur
>Assignee: Ming-Yen Chung
>Priority: Major
>
> When we switch to the merge queue, we cannot alter the commit message 
> directly and instead must use the PR body for the eventual commit message.
>  
> In order to include our "Reviewers" metadata in the commit, we must edit the 
> PR body after a review has happened and add the "Reviewers" manually. This is 
> rather annoying and we can do better.
>  
> The committer-tools script "reviewers.py" can use the GitHub API (via "gh") 
> to read, modify, and update the PR body with the reviewers selected by this 
> tool.
>  
> For example, 
>  
> {noformat}
> $ ./committer-tools/reviewers.py
> Utility to help generate 'Reviewers' string for Pull Requests. Use Ctrl+D or 
> Ctrl+C to exit
> Name or email (case insensitive): chia
> Possible matches (in order of most recent):
> [1] Chia-Ping Tsai chia7...@gmail.com (1908)
> [2] Chia-Ping Tsai chia7...@apache.org (13)
> [3] Chia-Chuan Yu yujuan...@gmail.com (11)
> [4] Chia Chuan Yu yujuan...@gmail.com (10)
> Make a selection: 1
> Reviewers so far: [('Chia-Ping Tsai', 'chia7...@gmail.com', 1908)]
> Name or email (case insensitive): ^C
> Reviewers: Chia-Ping Tsai 
> Pull Request to update (Ctrl+D or Ctrl+C to skip): 19144
> Adding Reviewers to 19144...
> {noformat}
>  
> The script should be able to handle existing "Reviewers" string in the PR body



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-18700) Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo to record classes

2025-03-08 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-18700.

Fix Version/s: 4.1.0
   Resolution: Fixed

> Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo 
> to record classes
> --
>
> Key: KAFKA-18700
> URL: https://issues.apache.org/jira/browse/KAFKA-18700
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: TengYao Chi
>Assignee: Ming-Yen Chung
>Priority: Minor
> Fix For: 4.1.0
>
>
> raft:
> Entry
> SnapshotPath



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-18944) Remove unused setters from ClusterConfig

2025-03-08 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-18944:
--

 Summary: Remove unused setters from ClusterConfig
 Key: KAFKA-18944
 URL: https://issues.apache.org/jira/browse/KAFKA-18944
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


saslServerProperties, saslClientProperties, adminClientProperties, 
producerProperties, consumerProperties

 

Those setters are not actually used, so we should remove them to avoid 
misleading developers.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-10731: add support for SSL hot reload [kafka]

2025-03-08 Thread via GitHub


github-actions[bot] commented on PR #17987:
URL: https://github.com/apache/kafka/pull/17987#issuecomment-2708651331

   This PR is being marked as stale since it has not had any activity in 90 
days. If you
   would like to keep this PR alive, please leave a comment asking for a 
review. If the PR has 
   merge conflicts, update it with the latest from the base branch.
   
   If you are having difficulty finding a reviewer, please reach out on the 
   [mailing list](https://kafka.apache.org/contact).
   
   If this PR is no longer valid or desired, please feel free to close it. If 
no activity
   occurs in the next 30 days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-18420) Find out the license which is in the license file but is not in distribution

2025-03-08 Thread kangning.li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933584#comment-17933584
 ] 

kangning.li commented on KAFKA-18420:
-

PR: [https://github.com/apache/kafka/pull/18299]

This PR has already accomplished this task, so I am going to close this issue.

> Find out the license which is in the license file but is not in distribution
> 
>
> Key: KAFKA-18420
> URL: https://issues.apache.org/jira/browse/KAFKA-18420
> Project: Kafka
>  Issue Type: Improvement
>Reporter: kangning.li
>Assignee: kangning.li
>Priority: Major
>
> see discussion:   
> https://github.com/apache/kafka/pull/18299#discussion_r1904604076



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR KIP link change to use immutable link [kafka]

2025-03-08 Thread via GitHub


chia7712 commented on PR #19153:
URL: https://github.com/apache/kafka/pull/19153#issuecomment-2708599138

   ![Screenshot 2025-03-09 
09-08-00](https://github.com/user-attachments/assets/97bc7361-7379-45cd-af6e-69da15bb1198)
   
   @m1a2st could you please fix the above links as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18915: Migrate AdminClientRebootstrapTest to use new test infra [kafka]

2025-03-08 Thread via GitHub


chia7712 commented on code in PR #19094:
URL: https://github.com/apache/kafka/pull/19094#discussion_r1986207973


##
core/src/test/java/kafka/test/api/AdminClientRebootstrapTest.java:
##
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.test.api;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfig;
+import org.apache.kafka.common.test.api.ClusterTemplate;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.test.TestUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+public class AdminClientRebootstrapTest {
+    private static final int BROKER_COUNT = 2;
+
+    private static List<ClusterConfig> generator() {
+        // Enable unclean leader election for the test topic
+        Map<String, String> serverProperties = Map.of(
+            TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true",
+            GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, String.valueOf(BROKER_COUNT)
+        );
+
+        return Stream.of(false, true)
+            .map(AdminClientRebootstrapTest::getRebootstrapConfig)
+            .map(rebootstrapProperties -> AdminClientRebootstrapTest.buildConfig(serverProperties, rebootstrapProperties))
+            .toList();
+    }
+
+    private static Map<String, String> getRebootstrapConfig(boolean useRebootstrapTriggerMs) {
+        Map<String, String> properties = new HashMap<>();
+        if (useRebootstrapTriggerMs) {
+            properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "5000");
+        } else {
+            properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "360");
+            properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000");
+            properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000");
+            properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000");
+            properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000");
+        }
+        properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
+        return properties;
+    }
+
+    private static ClusterConfig buildConfig(Map<String, String> serverProperties, Map<String, String> rebootstrapProperties) {
+        return ClusterConfig.defaultBuilder()
+            .setTypes(Set.of(Type.KRAFT))
+            .setBrokers(BROKER_COUNT)
+            .setAdminClientProperties(rebootstrapProperties)

Review Comment:
   Opened https://issues.apache.org/jira/browse/KAFKA-18944 to remove those 
unused setters.



[jira] [Commented] (KAFKA-18944) Remove unused setters from ClusterConfig

2025-03-08 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-18944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933582#comment-17933582
 ] 

Chia-Ping Tsai commented on KAFKA-18944:


An example of misuse: 
https://github.com/apache/kafka/pull/19094#discussion_r1986207871

> Remove unused setters from ClusterConfig
> 
>
> Key: KAFKA-18944
> URL: https://issues.apache.org/jira/browse/KAFKA-18944
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> saslServerProperties, saslClientProperties, adminClientProperties, 
> producerProperties, consumerProperties
>  
> Those setters are not actually used, so we should remove them to avoid 
> misleading developers.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18933 Add client integration tests module [kafka]

2025-03-08 Thread via GitHub


chia7712 commented on code in PR #19144:
URL: https://github.com/apache/kafka/pull/19144#discussion_r1986210326


##
clients/integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientRebootstrapTest.java:
##
@@ -33,6 +32,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+
+
 public class AdminClientRebootstrapTest {

Review Comment:
   @mumrah could you please consider using a different test? 
`AdminClientRebootstrapTest` requires some rewriting (see 
https://github.com/apache/kafka/pull/19094#discussion_r1986208718). Perhaps we 
could use `AdminFenceProducersTest` instead?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18909: Move DynamicThreadPool to server module [kafka]

2025-03-08 Thread via GitHub


chia7712 commented on PR #19081:
URL: https://github.com/apache/kafka/pull/19081#issuecomment-2708604833

   @clarkwtc could you please merge trunk to run CI again?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup connect runtime module [kafka]

2025-03-08 Thread via GitHub


wernerdv commented on code in PR #18074:
URL: https://github.com/apache/kafka/pull/18074#discussion_r1986122644


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##
@@ -466,16 +466,16 @@ protected boolean readPartition(TopicPartition 
topicPartition) {
 return true;
 }
 
-private void poll(long timeoutMs) {
+private void poll() {
 try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(timeoutMs));
+ConsumerRecords records = 
consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));

Review Comment:
   @ashrivastava88 If you look at the trunk before this commit was merged, you 
can see that the method KafkaBasedLog#poll was used in only 2 places with the 
argument timeoutMs=Integer.MAX_VALUE.
   So, in this PR, I did not change the existing behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]

2025-03-08 Thread via GitHub


mimaison commented on PR #19030:
URL: https://github.com/apache/kafka/pull/19030#issuecomment-2708369200

   Rebased on trunk and ported 
https://github.com/apache/kafka/commit/40db001588047201a406ebe969d1d7d2d5eefd57


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Cleanup connect runtime module [kafka]

2025-03-08 Thread via GitHub


ashrivastava88 commented on code in PR #18074:
URL: https://github.com/apache/kafka/pull/18074#discussion_r1986112509


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##
@@ -466,16 +466,16 @@ protected boolean readPartition(TopicPartition 
topicPartition) {
 return true;
 }
 
-private void poll(long timeoutMs) {
+private void poll() {
 try {
-ConsumerRecords records = 
consumer.poll(Duration.ofMillis(timeoutMs));
+ConsumerRecords records = 
consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));

Review Comment:
   The timeout here is set to Integer.MAX_VALUE, so if a consumer fetch fails, 
the consumer will keep retrying until this timeout is reached.
   We had a case at Confluent where one of our customers shared logs, and they 
were filled with consumer fetch retries, with every subsequent timeout 
decreasing from this Integer.MAX_VALUE.
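   To make the concern concrete, a minimal sketch (not the KafkaBasedLog implementation) contrasting a single effectively-unbounded poll with a loop of bounded polls; `running` and the record handler are hypothetical:
   
   ```java
   import java.time.Duration;
   import java.util.concurrent.atomic.AtomicBoolean;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   
   final class BoundedPollLoop {
       static <K, V> void run(Consumer<K, V> consumer, AtomicBoolean running) {
           while (running.get()) {
               // A bounded timeout returns control periodically, so fetch failures
               // surface to the caller instead of being retried inside a single
               // poll(Integer.MAX_VALUE) call for an effectively unbounded time.
               ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(30));
               records.forEach(record -> { /* handle the record */ });
           }
       }
   }
   ```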



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Adjust ToC of zk2kraft and fix wrong section number of docker [kafka]

2025-03-08 Thread via GitHub


chia7712 merged PR #19146:
URL: https://github.com/apache/kafka/pull/19146


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18648: Make `records` in `FetchResponse` nullable again [kafka]

2025-03-08 Thread via GitHub


ijuma commented on PR #19131:
URL: https://github.com/apache/kafka/pull/19131#issuecomment-2704563232

   Cherry-picked to 4.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation [kafka]

2025-03-08 Thread via GitHub


mjsax commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1986165537


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1153,8 +1157,7 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
     prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);

Review Comment:
   Hmmm... Thinking about this again, it seems the `if` below should actually 
only apply to the EOSv2 case? I believe we actually did include some tasks 
unnecessarily for the ALOS (and older EOSv1) case?
   
   However, changing this code below does break two tests...
   ```
   if (processingMode == EXACTLY_ONCE_V2 && revokedTasksNeedCommit) {
       prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask);
   }
   ```
   Tests:
   ```
   TaskManagerTest#shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos()
   TaskManagerTest#shouldCommitAllNeededTasksOnHandleRevocation()
   ```
   At least the second test assumes we commit everything for ALOS, too. It was 
added when we added EOSv2 and unified the commit logic 
(https://github.com/apache/kafka/pull/8318) -- but I cannot remember why we did 
it this way... \cc @guozhangwang @ableegoldman do you remember?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients

2025-03-08 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2939:
---
Labels:   (was: newbie)

> Make AbstractConfig.logUnused() tunable for clients
> ---
>
> Key: KAFKA-2939
> URL: https://issues.apache.org/jira/browse/KAFKA-2939
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Guozhang Wang
>Priority: Major
>
> Today we always log unused configs in KafkaProducer / KafkaConsumer in their 
> constructors, however for some cases like Kafka Streams that make use of 
> these clients, other configs may be passed in to configure Partitioner / 
> Serializer classes, etc. So it would be better to make this function call 
> optional to avoid printing unnecessary and confusing WARN entries.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)



[PR] KAFKA-18944: Remove unused setters from ClusterConfig [kafka]

2025-03-08 Thread via GitHub


clarkwtc opened a new pull request, #19166:
URL: https://github.com/apache/kafka/pull/19166

   Remove unused `saslServerProperties`, `saslClientProperties`, 
`adminClientProperties`, `producerProperties`, and `consumerProperties` in 
ClusterConfig.
   
   First, I quickly fixed the unused adminClientProperties, and then I will 
move on to https://github.com/apache/kafka/pull/19094 to fix the related issues.
   
   Pass AdminClientRebootstrapTest:
   https://github.com/user-attachments/assets/73c50376-6602-493d-8abd-0eb2bb304114
   
   Pass ClusterConfigTest:
   https://github.com/user-attachments/assets/b4da59da-dfdf-4698-9077-5086854360ab
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-18944) Remove unused setters from ClusterConfig

2025-03-08 Thread Wei-Ting Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wei-Ting Chen reassigned KAFKA-18944:
-

Assignee: Wei-Ting Chen  (was: Chia-Ping Tsai)

> Remove unused setters from ClusterConfig
> 
>
> Key: KAFKA-18944
> URL: https://issues.apache.org/jira/browse/KAFKA-18944
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Wei-Ting Chen
>Priority: Major
>
> saslServerProperties, saslClientProperties, adminClientProperties, 
> producerProperties, consumerProperties
>  
> those setters are not used actually, so we should remove them to avoid 
> misleading developers.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18909: Move DynamicThreadPool to server module [kafka]

2025-03-08 Thread via GitHub


clarkwtc commented on PR #19081:
URL: https://github.com/apache/kafka/pull/19081#issuecomment-2708609581

   @chia7712 
   No problem.
   I've merged it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-18422) add Kafka client upgrade path

2025-03-08 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-18422:

Fix Version/s: 4.0.0

> add Kafka client upgrade path
> -
>
> Key: KAFKA-18422
> URL: https://issues.apache.org/jira/browse/KAFKA-18422
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Blocker
>  Labels: need-kip
> Fix For: 4.0.0
>
>
> https://github.com/apache/kafka/pull/18193#issuecomment-2572283545



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]

2025-03-08 Thread via GitHub


frankvicky commented on code in PR #17614:
URL: https://github.com/apache/kafka/pull/17614#discussion_r1986090241


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -427,7 +428,7 @@ public void onGroupAssignmentUpdated(Set 
partitions) {
 // call close methods if internal objects are already constructed; 
this is to prevent resource leak. see KAFKA-2121
 // we do not need to call `close` at all when `log` is null, which 
means no internal objects were initialized.
 if (this.log != null) {
-close(Duration.ZERO, true);
+close(Duration.ZERO, 
CloseOptions.GroupMembershipOperation.DEFAULT, true);

Review Comment:
   Given that KIP-848 doesn't define an epoch for a dynamic member remaining in 
the group, I decided to ignore the heartbeat when closing. This way, we 
don't need to change the protocol or the server-side implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]

2025-03-08 Thread via GitHub


mimaison commented on code in PR #19030:
URL: https://github.com/apache/kafka/pull/19030#discussion_r1986107143


##
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##
@@ -55,6 +113,2289 @@ public class UnifiedLog {
 public static final String STRAY_DIR_SUFFIX = 
LogFileUtils.STRAY_DIR_SUFFIX;
 public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET;
 
+// For compatibility, metrics are defined to be under the `Log` class
+private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "Log");
+
+/* A lock that guards all modifications to the log */
+private final Object lock = new Object();
+private final Map<String, Map<String, String>> metricNames = new HashMap<>();
+
+// localLog: the LocalLog instance containing non-empty log segments recovered from disk
+private final LocalLog localLog;
+private final BrokerTopicStats brokerTopicStats;
+private final ProducerStateManager producerStateManager;
+private final boolean remoteStorageSystemEnable;
+private final ScheduledFuture<?> producerExpireCheck;
+private final int producerIdExpirationCheckIntervalMs;
+private final String logIdent;
+private final Logger logger;
+private final LogValidator.MetricsRecorder validatorMetricsRecorder;
+
+/* The earliest offset which is part of an incomplete transaction. This is 
used to compute the
+ * last stable offset (LSO) in ReplicaManager. Note that it is possible 
that the "true" first unstable offset
+ * gets removed from the log (through record or segment deletion). In this 
case, the first unstable offset
+ * will point to the log start offset, which may actually be either part 
of a completed transaction or not
+ * part of a transaction at all. However, since we only use the LSO for 
the purpose of restricting the
+ * read_committed consumer to fetching decided data (i.e. committed, 
aborted, or non-transactional), this
+ * temporary abuse seems justifiable and saves us from scanning the log 
after deletion to find the first offsets
+ * of each ongoing transaction in order to compute a new first unstable 
offset. It is possible, however,
+ * that this could result in disagreement between replicas depending on 
when they began replicating the log.
+ * In the worst case, the LSO could be seen by a consumer to go backwards.
+ */
+private volatile Optional<LogOffsetMetadata> firstUnstableOffsetMetadata = Optional.empty();
+private volatile Optional<PartitionMetadataFile> partitionMetadataFile = Optional.empty();
+// This is the offset (inclusive) up to which segments are copied to remote storage.
+private volatile long highestOffsetInRemoteStorage = -1L;
+
+/* Keep track of the current high watermark in order to ensure that 
segments containing offsets at or above it are
+ * not eligible for deletion. This means that the active segment is only 
eligible for deletion if the high watermark
+ * equals the log end offset (which may never happen for a partition under 
consistent load). This is needed to
+ * prevent the log start offset (which is exposed in fetch responses) from 
getting ahead of the high watermark.
+ */
+private volatile LogOffsetMetadata highWatermarkMetadata;
+private volatile long localLogStartOffset;
+private volatile long logStartOffset;
+private volatile LeaderEpochFileCache leaderEpochCache;
+private volatile Optional topicId;
+private volatile LogOffsetsListener logOffsetsListener;
+
+/**
+ * A log which presents a unified view of local and tiered log segments.
+ *
+ * The log consists of tiered and local segments with the tiered 
portion of the log being optional. There could be an
+ * overlap between the tiered and local segments. The active segment is 
always guaranteed to be local. If tiered segments
+ * are present, they always appear at the beginning of the log, followed 
by an optional region of overlap, followed by the local
+ * segments including the active segment.
+ *
+ * NOTE: this class handles state and behavior specific to tiered 
segments as well as any behavior combining both tiered
+ * and local segments. The state and behavior specific to local segments 
are handled by the encapsulated LocalLog instance.
+ *
+ * @param logStartOffset The earliest offset allowed to be exposed to 
kafka client.
+ *   The logStartOffset can be updated by :
+ *   - user's DeleteRecordsRequest
+ *   - broker's log retention
+ *   - broker's log truncation
+ *   - broker's log recovery
+ *   The logStartOffset is used to decide the 
following:
+ *   - Log deletion. LogSegment whose nextOffset <= 
log's logStartOffset can be deleted.
+ * It may trigger log rolling if the active 
segmen

Re: [PR] KAFKA-18602: Incorrect FinalizedVersionLevel reported for dynamic KRaft quorum [kafka]

2025-03-08 Thread via GitHub


chia7712 commented on code in PR #18685:
URL: https://github.com/apache/kafka/pull/18685#discussion_r1986118137


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -522,10 +522,14 @@ class KRaftMetadataCache(
 if (kraftVersionLevel > 0) {
   finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
 }
+var metadataVersion = MetadataVersion.MINIMUM_VERSION

Review Comment:
   That seems like a deadlock to me. If a controller can't receive RPC requests 
before having a valid metadata version, it can't communicate with the other 
static voters initially. This means it's impossible to obtain a valid metadata 
version.
   
   The approach in #19127 doesn't completely fix the issue, but the issue 
occurs only during the startup of static voters. During that phase, the quorum 
isn't ready, so exposing the 3.3 MV via the API response should be low risk.
   
   To clean up, if the above description is accurate, `SimpleApiVersionManager` 
can replace `featuresPublisher` with `MetadataCache` and manually add the 3.3 
MV to the API response when `MetadataCache` lacks a valid MV. That is basically 
equivalent to what #19127 does.
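   A minimal sketch of the fallback behavior under discussion, using an illustrative stand-in type rather than Kafka's real MetadataVersion class; the feature level of the minimum version is a placeholder:
   
   ```java
   import java.util.Optional;
   
   final class MetadataVersionFallback {
       // Illustrative stand-in for the real MetadataVersion enum.
       record MetadataVersion(short featureLevel) {
           static final MetadataVersion MINIMUM_VERSION = new MetadataVersion((short) 1); // placeholder level
       }
   
       // While the quorum is not ready and no finalized metadata.version exists,
       // expose the minimum supported version (3.3) instead of failing.
       static MetadataVersion effectiveVersion(Optional<MetadataVersion> fromCache) {
           return fromCache.orElse(MetadataVersion.MINIMUM_VERSION);
       }
   }
   ```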



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Clean up metadata module [kafka]

2025-03-08 Thread via GitHub


mumrah commented on PR #19069:
URL: https://github.com/apache/kafka/pull/19069#issuecomment-2708307265

   @sjhajharia can you merge trunk into this branch? There are some recent CI 
changes that are needed. I will try to review this over the weekend.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-18931) Debug the case where kafka-share-groups.sh --describe --members does not match with the reality when share consumer is closed during broker outage

2025-03-08 Thread Chirag Wadhwa (Jira)
Chirag Wadhwa created KAFKA-18931:
-

 Summary: Debug the case where kafka-share-groups.sh --describe 
--members does not match with the reality when share consumer is closed during 
broker outage
 Key: KAFKA-18931
 URL: https://issues.apache.org/jira/browse/KAFKA-18931
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chirag Wadhwa
Assignee: Chirag Wadhwa






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-18933 Add client integration tests module [kafka]

2025-03-08 Thread via GitHub


mumrah opened a new pull request, #19144:
URL: https://github.com/apache/kafka/pull/19144

   Adds a new ":clients:integration-test" Gradle module with one example test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18195: Enter incompatible instead of leaving incompatible entires blank in Kafka Streams broker compatibility matrix [kafka]

2025-03-08 Thread via GitHub


mjsax commented on code in PR #18258:
URL: https://github.com/apache/kafka/pull/18258#discussion_r1984230751


##
docs/streams/upgrade-guide.html:
##
@@ -1735,73 +1735,38 @@ 
 
 Streams API broker 
compatibility
 
-The following table shows which versions of the Kafka Streams API are 
compatible with various Kafka broker versions.
+The following table shows which versions of the Kafka Streams API are 
compatible with various Kafka broker versions. For Kafka Stream version older 
than 2.3.x, please check 3.9 
upgrade document.
 
 
 
   
 
-Kafka Broker (columns)
+Kafka Broker (columns)
   
 
 
   
 Kafka Streams API (rows)
-0.10.0.x
-0.10.1.x and 0.10.2.x
-0.11.0.x and1.0.x and1.1.x and2.0.x
 2.1.x and2.2.x and2.3.x and2.4.x and2.5.x 
and2.6.x and2.7.x and2.8.x and3.0.x and3.1.x and3.2.x 
and3.3.x and3.4.x and3.5.x and3.6.x and3.7.x and3.8.x 
and3.9.x
 4.0.x
   
   
-0.10.0.x
+2.4.x and2.5.x
+compatible with exactly-once v2 turned off

Review Comment:
   2.4.x and 2.5.x do not have EOSv2 -- so this is confusing... There is 
nothing to be turned on/off -- EOSv2 was only added in the 2.6.0 release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18736: Decide when a heartbeat should be sent [kafka]

2025-03-08 Thread via GitHub


lucasbru commented on code in PR #19121:
URL: https://github.com/apache/kafka/pull/19121#discussion_r1983675477


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final 
LogContext logContext,
 retryBackoffMaxMs,
 maxPollIntervalMs
 );
+this.pollTimer = time.timer(maxPollIntervalMs);
 }
 
+/**
+ * This will build a heartbeat request if one must be sent, determined 
based on the member
+ * state. A heartbeat is sent in the following situations:

Review Comment:
   These are not separate situations in which a heartbeat must be sent. Maybe 
you mean "when all of the following conditions apply" or something?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final 
LogContext logContext,
 retryBackoffMaxMs,
 maxPollIntervalMs
 );
+this.pollTimer = time.timer(maxPollIntervalMs);
 }
 
+/**
+ * This will build a heartbeat request if one must be sent, determined 
based on the member
+ * state. A heartbeat is sent in the following situations:
+ * 
+ * Member is part of the consumer group or wants to join it.
+ * The heartbeat interval has expired, or the member is in a state 
that indicates
+ * that it should heartbeat without waiting for the interval.
+ * 
+ * This will also determine the maximum wait time until the next poll 
based on the member's
+ * state.
+ * 
+ * If the member is without a coordinator or is in a failed state, 
the timer is set
+ * to Long.MAX_VALUE, as there's no need to send a heartbeat.
+ * If the member cannot send a heartbeat due to either exponential 
backoff, it will
+ * return the remaining time left on the backoff timer.
+ * If the member's heartbeat timer has not expired, It will return 
the remaining time
+ * left on the heartbeat timer.
+ * If the member can send a heartbeat, the timer is set to the 
current heartbeat interval.
+ * 
+ *
+ * @return {@link 
org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} 
that includes a
+ * heartbeat request if one must be sent, and the time to wait 
until the next poll.
+ */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-return new NetworkClientDelegate.PollResult(
-heartbeatRequestState.heartbeatIntervalMs(),
-Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
-);
+if (coordinatorRequestManager.coordinator().isEmpty() || 
membershipManager.shouldSkipHeartbeat()) {
+membershipManager.onHeartbeatRequestSkipped();
+maybePropagateCoordinatorFatalErrorEvent();
+return NetworkClientDelegate.PollResult.EMPTY;
+}
+pollTimer.update(currentTimeMs);
+if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+logger.warn("Consumer poll timeout has expired. This means the 
time between " +
+"subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, " +
+"which typically implies that the poll loop is spending too 
much time processing " +
+"messages. You can address this either by increasing 
max.poll.interval.ms or by " +
+"reducing the maximum size of batches returned in poll() with 
max.poll.records.");
+
+membershipManager.onPollTimerExpired();
+NetworkClientDelegate.UnsentRequest leaveHeartbeat = 
makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+// We can ignore the leave response because we can join before or 
after receiving the response.
+heartbeatRequestState.reset();
+heartbeatState.reset();
+return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), 
Collections.singletonList(leaveHeartbeat));
+}
+if (shouldHeartbeatBeforeIntervalExpires() || 
heartbeatRequestState.canSendRequest(currentTimeMs)) {
+NetworkClientDelegate.UnsentRequest request = 
makeHeartbeatRequest(currentTimeMs);
+return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), 
Collections.singletonList(request));
+} else {
+return new 
NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+}
+}
+
+/**
+ * A heartbeat should be sent without waiting for the heartbeat interval to expire if:
+ * - the member is leaving the group
+ * or
+ * - the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request
+ *   in flight.

Re: [PR] KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation [kafka]

2025-03-08 Thread via GitHub


lucasbru commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1986031099


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1153,8 +1157,7 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
 prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask);

Review Comment:
   This could be moved into the `if` below, instead of being called (but 
executing a no-op) in the "optimized" case.
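
   For illustration, a minimal self-contained sketch of that restructuring (hypothetical stand-ins for the `TaskManager` internals quoted above, not the committed code):
   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Toy model of the suggestion: only build the offset map when a commit
   // will actually happen, so the "optimized" no-commit path does no work.
   public class RevocationSketch {

       // stands in for prepareCommitAndAddOffsetsToMap(...)
       static Map<String, Long> prepareOffsets() {
           Map<String, Long> offsets = new HashMap<>();
           offsets.put("task_0_0", 42L);
           return offsets;
       }

       // stands in for taskExecutor.commitOffsetsOrTransaction(...)
       static void commit(Map<String, Long> offsets) {
           System.out.println("committing " + offsets);
       }

       static void handleRevocation(boolean revokedTasksNeedCommit) {
           if (revokedTasksNeedCommit) {
               // preparation moved inside the guard, as suggested above
               Map<String, Long> consumedOffsetsPerTask = prepareOffsets();
               commit(consumedOffsetsPerTask);
           }
       }

       public static void main(String[] args) {
           handleRevocation(true);   // prepares and commits
           handleRevocation(false);  // skips both steps entirely
       }
   }
   ```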



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-18074) Add kafka client compatibility matrix

2025-03-08 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-18074:

Fix Version/s: (was: 4.0.0)

> Add kafka client compatibility matrix
> -
>
> Key: KAFKA-18074
> URL: https://issues.apache.org/jira/browse/KAFKA-18074
> Project: Kafka
>  Issue Type: Task
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Blocker
>
> in 4.0 we have many major breaking changes - JDK upgrade and protocol cleanup 
> - that may confuse users in rolling upgrade and setup env. Hence, we should 
> add a matrix for all our client modules - client, streams, and connect
> the matrix consists of the following items:
> 1. supported JDKs
> 2. supported broker versions



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation [kafka]

2025-03-08 Thread via GitHub


cadonna commented on code in PR #19164:
URL: https://github.com/apache/kafka/pull/19164#discussion_r1986072577


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1163,10 +1166,12 @@ void handleRevocation(final Collection<TopicPartition> revokedPartitions) {
 // as such we just need to skip those dirty tasks in the checkpoint
 final Set<TaskId> dirtyTasks = new HashSet<>();
 try {
-// in handleRevocation we must call commitOffsetsOrTransaction() directly rather than
-// commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the
-// offset commit because we are in a rebalance
-taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+if (revokedTasksNeedCommit) {

Review Comment:
   If we have this, do we want to also adapt the following condition in 
`TaskExecutor#commitOffsetsOrTransaction()`
   ```java
   if (!offsetsPerTask.isEmpty() || taskManager.streamsProducer().transactionInFlight()) {
   ```
   to
   ```java
   if (!offsetsPerTask.isEmpty()) {
   ```
   (and maybe move it to the outermost context as it was before the PR that 
introduced the bug)?
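
   For illustration, a small self-contained sketch of the difference between the two guards (hypothetical names mirroring the snippets above; the real `TaskExecutor` logic is more involved):
   ```java
   import java.util.Map;

   // Toy contrast of the two guard variants discussed above.
   public class CommitGuardSketch {

       // Current variant: commit when there are offsets to commit,
       // or when an EOS transaction is still open.
       static boolean currentGuard(Map<String, Long> offsetsPerTask, boolean transactionInFlight) {
           return !offsetsPerTask.isEmpty() || transactionInFlight;
       }

       // Proposed variant: commit only when there are offsets to commit.
       static boolean proposedGuard(Map<String, Long> offsetsPerTask) {
           return !offsetsPerTask.isEmpty();
       }

       public static void main(String[] args) {
           Map<String, Long> empty = Map.of();
           // With an open transaction but no offsets, the two variants diverge:
           System.out.println(currentGuard(empty, true)); // true  -> would commit
           System.out.println(proposedGuard(empty));      // false -> would skip
       }
   }
   ```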



##
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##
@@ -922,6 +932,175 @@ public void onRestoreEnd(final TopicPartition topicPartition,
 );
 }
 
+
+private final AtomicReference<String> transactionalProducerId = new AtomicReference<>();
+
+private class TestClientSupplier extends DefaultKafkaClientSupplier {
+@Override
+public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
+transactionalProducerId.compareAndSet(null, (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
+
+return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer());
+}
+}
+
+final static AtomicReference<TaskId> taskWithData = new AtomicReference<>();
+final static AtomicBoolean didRevokeIdleTask = new AtomicBoolean(false);
+
+@Test
+public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception {
+shouldNotProduceDuplicates(false);
+}
+
+@Test
+public void shouldCommitAllTasksIfRevokedTaskTriggerPunctuation() throws Exception {
+shouldNotProduceDuplicates(true);
+}
+
+private void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception {

Review Comment:
   nit:
   
   ```suggestion
   @ParameterizedTest(name = "{argumentsWithNames}")
   @FieldSource("namedArguments")
   @ParameterizedTest(name = "shouldCommitAllTasks with punctuation: {0}")
   @ValueSource(booleans = {true, false})
   public void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception {
   ...
   }
   
   private static List<Arguments> namedArguments = Arrays.asList(
       arguments(named("Should not commit active tasks with pending input if revoked task did not make progress", false)),
       arguments(named("Should commit all tasks if revoked task triggers punctuation", true))
   );
   ```
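
   For reference, a compilable version of the `@FieldSource` variant could look like this (a sketch assuming JUnit Jupiter 5.11+, where `@FieldSource` and `Named.named` are available; the test body is elided):
   ```java
   import static org.junit.jupiter.api.Named.named;
   import static org.junit.jupiter.params.provider.Arguments.arguments;

   import java.util.List;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.Arguments;
   import org.junit.jupiter.params.provider.FieldSource;

   class EosDuplicatesSketchTest {

       // each Named wraps the boolean payload the test method receives
       static final List<Arguments> namedArguments = List.of(
               arguments(named("revoked task made no progress", false)),
               arguments(named("revoked task triggers punctuation", true)));

       @ParameterizedTest(name = "{0}")
       @FieldSource("namedArguments")
       void shouldNotProduceDuplicates(final boolean usePunctuation) {
           // the actual EOS integration assertions would run here
           System.out.println("usePunctuation = " + usePunctuation);
       }
   }
   ```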



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-18074) Add kafka client compatibility matrix

2025-03-08 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-18074:

Fix Version/s: 4.0.0

> Add kafka client compatibility matrix
> -
>
> Key: KAFKA-18074
> URL: https://issues.apache.org/jira/browse/KAFKA-18074
> Project: Kafka
>  Issue Type: Task
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Blocker
> Fix For: 4.0.0
>
>
> in 4.0 we have many major breaking changes - JDK upgrade and protocol cleanup 
> - that may confuse users in rolling upgrade and setup env. Hence, we should 
> add a matrix for all our client modules - client, streams, and connect
> the matrix consists of the following items:
> 1. supported JDKs
> 2. supported broker versions



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Clean up metadata module [kafka]

2025-03-08 Thread via GitHub


sjhajharia commented on PR #19069:
URL: https://github.com/apache/kafka/pull/19069#issuecomment-2708215564

   cc: @m1a2st / @mumrah for a final review
   Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-18422) add Kafka client upgrade path

2025-03-08 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-18422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-18422:

Fix Version/s: (was: 4.0.0)

> add Kafka client upgrade path
> -
>
> Key: KAFKA-18422
> URL: https://issues.apache.org/jira/browse/KAFKA-18422
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Blocker
>  Labels: need-kip
>
> https://github.com/apache/kafka/pull/18193#issuecomment-2572283545



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-18602: Incorrect FinalizedVersionLevel reported for dynamic KRaft quorum [kafka]

2025-03-08 Thread via GitHub


FrankYang0529 commented on code in PR #18685:
URL: https://github.com/apache/kafka/pull/18685#discussion_r1986040272


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -522,10 +522,14 @@ class KRaftMetadataCache(
 if (kraftVersionLevel > 0) {
   finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
 }
+var metadataVersion = MetadataVersion.MINIMUM_VERSION

Review Comment:
   @chia7712 @cmccabe @ijuma @junrao, I found that we cannot add the check of 
`uninitializedPublishers.isEmpty()` at the end of `ControllerServer#startup`. 
If users use static voters (`controller.quorum.voters`), the `NetworkClient` 
tries to send `ApiVersionsRequest` to the other voters [0]. After it receives an 
error response, it disconnects the connection [1]. In the meantime, the 
`KafkaRaftClient` tries to send a `VoteRequest` [2], but the request cannot be 
sent because of the disconnection. As a result, no controller can become ready 
with an empty `uninitializedPublishers`.
   
   Do we want to use another Jira to track this issue? The original issue is 
resolved by #19127.
   
   [0]
   
https://github.com/apache/kafka/blob/947c414a8c0d30223c725a91507b2d891d5851e2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1087-L1089
   
   [1]
   
https://github.com/apache/kafka/blob/947c414a8c0d30223c725a91507b2d891d5851e2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1027-L1031
   
   [2]
   
https://github.com/apache/kafka/blob/947c414a8c0d30223c725a91507b2d891d5851e2/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L3161-L3164



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18873: Fixed incorrect error message when max.in.flight.requests.per.connection exceeds 5 for transactional producers [kafka]

2025-03-08 Thread via GitHub


EsMoX commented on PR #19041:
URL: https://github.com/apache/kafka/pull/19041#issuecomment-2708346593

   > @EsMoX I just noticed there's a superfluous newline in the error message 
in the PR, can you remove it please? Thanks!
   
   @kirktrue Done, Thank you! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18927: Remove LATEST_0_11, LATEST_1_0, LATEST_1_1, LATEST_2_0 [kafka]

2025-03-08 Thread via GitHub


Parkerhiphop commented on PR #19134:
URL: https://github.com/apache/kafka/pull/19134#issuecomment-2708352912

   Thanks for the testing and information provided.
   I will take a look at these failed tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-18736: Decide when a heartbeat should be sent [kafka]

2025-03-08 Thread via GitHub


cadonna commented on code in PR #19121:
URL: https://github.com/apache/kafka/pull/19121#discussion_r1982894114


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
 retryBackoffMaxMs,
 maxPollIntervalMs
 );
+this.pollTimer = time.timer(maxPollIntervalMs);
 }
 
+/**
+ * This will build a heartbeat request if one must be sent, determined based on the member
+ * state. A heartbeat is sent in the following situations:
+ * <ul>
+ *   <li>Member is part of the consumer group or wants to join it.</li>
+ *   <li>The heartbeat interval has expired, or the member is in a state that indicates
+ *   that it should heartbeat without waiting for the interval.</li>
+ * </ul>
+ * This will also determine the maximum wait time until the next poll based on the member's
+ * state.
+ * <ul>
+ *   <li>If the member is without a coordinator or is in a failed state, the timer is set
+ *   to Long.MAX_VALUE, as there's no need to send a heartbeat.</li>
+ *   <li>If the member cannot send a heartbeat due to exponential backoff, it will
+ *   return the remaining time left on the backoff timer.</li>
+ *   <li>If the member's heartbeat timer has not expired, it will return the remaining time
+ *   left on the heartbeat timer.</li>
+ *   <li>If the member can send a heartbeat, the timer is set to the current heartbeat interval.</li>
+ * </ul>
+ *
+ * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} that includes a
+ * heartbeat request if one must be sent, and the time to wait until the next poll.
+ */
 @Override
 public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-return new NetworkClientDelegate.PollResult(
-heartbeatRequestState.heartbeatIntervalMs(),
-Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
-);
+if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager.shouldSkipHeartbeat()) {
+membershipManager.onHeartbeatRequestSkipped();
+maybePropagateCoordinatorFatalErrorEvent();
+return NetworkClientDelegate.PollResult.EMPTY;
+}
+pollTimer.update(currentTimeMs);
+if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+logger.warn("Consumer poll timeout has expired. This means the time between " +
+"subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+"which typically implies that the poll loop is spending too much time processing " +
+"messages. You can address this either by increasing max.poll.interval.ms or by " +
+"reducing the maximum size of batches returned in poll() with max.poll.records.");
+
+membershipManager.onPollTimerExpired();
+NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+// We can ignore the leave response because we can join before or after receiving the response.
+heartbeatRequestState.reset();
+heartbeatState.reset();
+return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
+}
+if (shouldHeartbeatBeforeIntervalExpires() || heartbeatRequestState.canSendRequest(currentTimeMs)) {
+NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs);
+return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request));
+} else {
+return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+}
+}
+
+/**
+ * A heartbeat should be sent without waiting for the heartbeat interval to expire if:
+ * - the member is leaving the group
+ * or
+ * - the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request
+ *   in flight.
+ * @return
+ */
+private boolean shouldHeartbeatBeforeIntervalExpires() {
+return membershipManager.state() == MemberState.LEAVING
+||
+(membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING)
+&& !heartbeatRequestState.requestInFlight();

Review Comment:
   @zheguang Thanks for your comment!
   
   The comment says:
   ```
   the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request in flight.
   ```
   That means the expression should be true for
   ```