[jira] [Assigned] (KAFKA-17619) Remove zk type and instance from ClusterTest

2024-09-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17619:
--

Assignee: PoAn Yang  (was: Chia-Ping Tsai)

> Remove zk type and instance from ClusterTest
> 
>
> Key: KAFKA-17619
> URL: https://issues.apache.org/jira/browse/KAFKA-17619
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Major
>
> as title





Re: [PR] KAFKA-17116: New consumer may not send effective leave group if member ID received after close [kafka]

2024-09-26 Thread via GitHub


frankvicky closed pull request #16649: KAFKA-17116: New consumer may not send 
effective leave group if member ID received after close
URL: https://github.com/apache/kafka/pull/16649





Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


smjn commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776430833


##
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+/**
+ * Util method which takes in 2 lists containing {@link 
PersisterStateBatch}
+ * and the startOffset.
+ * This method removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
+ * It then merges any contiguous intervals with same state. If states 
differ,
+ * based on various conditions it creates new non-overlapping batches 
preferring new ones.
+ * @param batchesSoFar - List containing current soft state of {@link 
PersisterStateBatch}
+ * @param newBatches - List containing {@link PersisterStateBatch} in 
incoming request
+ * @param startOffset - startOffset to consider when removing old batches.
+ * @return List containing combined batches
+ */
+public static List<PersisterStateBatch> combineStateBatches(
+List<PersisterStateBatch> batchesSoFar,
+List<PersisterStateBatch> newBatches,
+long startOffset
+) {
+List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+combinedList.addAll(batchesSoFar);
+combinedList.addAll(newBatches);
+
+return mergeBatches(
+pruneBatches(
+combinedList,
+startOffset
+)
+);
+}
+
+/**
+ * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+ * - Same state (delivery count and state)
+ *  - If overlapping - merge into single batch
+ *  - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches 
into a single 1
+ * - Different state (delivery count or state differ)
+ *  - Based on various cases:
+ *  - swallow lower priority batch within bounds of offsets
+ *  - break batch into other non-overlapping batches
+ * @param batches - List of {@link PersisterStateBatch}
+ * @return List of non-overlapping {@link PersisterStateBatch}
+ */
+private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+if (batches.size() < 2) {
+return batches;
+}
+TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size
+
+BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+while (overlapState != BatchOverlapState.SENTINEL) {
+PersisterStateBatch last = overlapState.last();
+PersisterStateBatch candidate = overlapState.candidate();
+
+// remove non overlapping prefix from sortedBatches,
+// will make getting next overlapping pair efficient
+// as a prefix batch which is non overlapping will only
+// be checked once.
+if (overlapState.nonOverlapping() != null) {
+overlapState.nonOverlapping().forEach(sortedBatches::remove);
+finalBatches.addAll(overlapState.nonOverlapping());
+}
+
+if (candidate == null) {
+overlapState = BatchOverlapState.SENTINEL;
+continue;
+}
+
+// remove both last and candidate for easier
+// assessment about adding batches to sortedBatches
+sortedBatches.remove(last);
+sortedBatches.remove(candidate);
+
+// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort
+// covers:
+// [ASCII diagram of overlap cases 1-7 (case 7: contiguous), aligning last vs. candidate, not preserved in this archive]
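
To make the pruning rule in the quoted Javadoc concrete, here is a minimal sketch, assuming batches are plain offset ranges; `StateBatch` and `PruneSketch` are illustrative stand-ins, not the PR's `PersisterStateBatch` or `pruneBatches`:

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch of the pruning rule from the Javadoc above: when startOffset > -1,
// drop any batch whose lastOffset < startOffset.
record StateBatch(long firstOffset, long lastOffset) {}

class PruneSketch {
    static List<StateBatch> prune(List<StateBatch> batches, long startOffset) {
        if (startOffset < 0) {
            return batches; // -1 means there is no start offset to prune against
        }
        return batches.stream()
                .filter(b -> b.lastOffset() >= startOffset)
                .collect(Collectors.toList());
    }
}
```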

Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


smjn commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776567400


##
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+/**
+ * Util method which takes in 2 lists containing {@link 
PersisterStateBatch}
+ * and the startOffset.
+ * This method removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
+ * It then merges any contiguous intervals with same state. If states 
differ,
+ * based on various conditions it creates new non-overlapping batches 
preferring new ones.
+ * @param batchesSoFar - List containing current soft state of {@link 
PersisterStateBatch}
+ * @param newBatches - List containing {@link PersisterStateBatch} in 
incoming request
+ * @param startOffset - startOffset to consider when removing old batches.
+ * @return List containing combined batches
+ */
+public static List<PersisterStateBatch> combineStateBatches(
+List<PersisterStateBatch> batchesSoFar,
+List<PersisterStateBatch> newBatches,
+long startOffset
+) {
+List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+combinedList.addAll(batchesSoFar);
+combinedList.addAll(newBatches);
+
+return mergeBatches(
+pruneBatches(
+combinedList,
+startOffset
+)
+);
+}
+
+/**
+ * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+ * - Same state (delivery count and state)
+ *  - If overlapping - merge into single batch
+ *  - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches 
into a single 1
+ * - Different state (delivery count or state differ)
+ *  - Based on various cases:
+ *  - swallow lower priority batch within bounds of offsets
+ *  - break batch into other non-overlapping batches
+ * @param batches - List of {@link PersisterStateBatch}
+ * @return List of non-overlapping {@link PersisterStateBatch}
+ */
+private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+if (batches.size() < 2) {
+return batches;
+}
+TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size
+
+BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+while (overlapState != BatchOverlapState.SENTINEL) {
+PersisterStateBatch last = overlapState.last();
+PersisterStateBatch candidate = overlapState.candidate();
+
+// remove non overlapping prefix from sortedBatches,
+// will make getting next overlapping pair efficient
+// as a prefix batch which is non overlapping will only
+// be checked once.
+if (overlapState.nonOverlapping() != null) {
+overlapState.nonOverlapping().forEach(sortedBatches::remove);
+finalBatches.addAll(overlapState.nonOverlapping());
+}
+
+if (candidate == null) {
+overlapState = BatchOverlapState.SENTINEL;
+continue;
+}
+
+// remove both last and candidate for easier
+// assessment about adding batches to sortedBatches
+sortedBatches.remove(last);
+sortedBatches.remove(candidate);
+
+// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort
+// covers:
+// [ASCII diagram of overlap cases 1-7 (case 7: contiguous), aligning last vs. candidate, not preserved in this archive]

[jira] [Commented] (KAFKA-17547) Write large number of mirror maker logs when Kafka crashes

2024-09-26 Thread George Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884911#comment-17884911
 ] 

George Yang commented on KAFKA-17547:
-

Could someone who has a deep understanding of Mirror Maker 2 please take a look?

> Write large number of mirror maker logs when Kafka crashes
> --
>
> Key: KAFKA-17547
> URL: https://issues.apache.org/jira/browse/KAFKA-17547
> Project: Kafka
>  Issue Type: Task
>  Components: mirrormaker
>Affects Versions: 3.7.1
> Environment: OS: AlmaLinux 9.3
> CPU: 28cores
> Mem: 128GiB
> Kafka: v3.7.0
> MirrorMaker2: v3.7.1
>Reporter: George Yang
>Priority: Blocker
> Attachments: connect.log
>
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> We have deployed 2 data centers, each with one node, and on each node, there 
> is a Kafka pod and a MirrorMaker2 pod. Currently, if one of the Kafka pods 
> crashes, the running MirrorMaker2 pod will continuously output a large number 
> of logs (please see the attachment). As long as the Kafka pod remains down, 
> the logs will keep accumulating, eventually filling up the disk. Besides 
> stopping all the MirrorMaker2 instances or getting the crashed Kafka pod back 
> online, are there any other solutions to prevent this excessive logging? For 
> example, can we configure any parameters in MirrorMaker2 to handle this?





Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


smjn commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776624490


##
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+/**
+ * Util method which takes in 2 lists containing {@link 
PersisterStateBatch}
+ * and the startOffset.
+ * This method removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
+ * It then merges any contiguous intervals with same state. If states 
differ,
+ * based on various conditions it creates new non-overlapping batches 
preferring new ones.
+ * @param batchesSoFar - List containing current soft state of {@link 
PersisterStateBatch}
+ * @param newBatches - List containing {@link PersisterStateBatch} in 
incoming request
+ * @param startOffset - startOffset to consider when removing old batches.
+ * @return List containing combined batches
+ */
+public static List<PersisterStateBatch> combineStateBatches(
+List<PersisterStateBatch> batchesSoFar,
+List<PersisterStateBatch> newBatches,
+long startOffset
+) {
+List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+combinedList.addAll(batchesSoFar);
+combinedList.addAll(newBatches);
+
+return mergeBatches(
+pruneBatches(
+combinedList,
+startOffset
+)
+);
+}
+
+/**
+ * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+ * - Same state (delivery count and state)
+ *  - If overlapping - merge into single batch
+ *  - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches 
into a single 1
+ * - Different state (delivery count or state differ)
+ *  - Based on various cases:
+ *  - swallow lower priority batch within bounds of offsets
+ *  - break batch into other non-overlapping batches
+ * @param batches - List of {@link PersisterStateBatch}
+ * @return List of non-overlapping {@link PersisterStateBatch}
+ */
+private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+if (batches.size() < 2) {
+return batches;
+}
+TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size
+
+BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+while (overlapState != BatchOverlapState.SENTINEL) {
+PersisterStateBatch last = overlapState.last();
+PersisterStateBatch candidate = overlapState.candidate();
+
+// remove non overlapping prefix from sortedBatches,
+// will make getting next overlapping pair efficient
+// as a prefix batch which is non overlapping will only
+// be checked once.
+if (overlapState.nonOverlapping() != null) {
+overlapState.nonOverlapping().forEach(sortedBatches::remove);
+finalBatches.addAll(overlapState.nonOverlapping());
+}
+
+if (candidate == null) {
+overlapState = BatchOverlapState.SENTINEL;
+continue;
+}
+
+// remove both last and candidate for easier
+// assessment about adding batches to sortedBatches
+sortedBatches.remove(last);
+sortedBatches.remove(candidate);
+
+// overlap and same state (last.firstOffset <= candidate.firstOffset) due to sort
+// covers:
+// [ASCII diagram of overlap cases 1-7 (case 7: contiguous), aligning last vs. candidate, not preserved in this archive]

Re: [PR] KAFKA-15931: Reopen TransactionIndex if channel is closed [kafka]

2024-09-26 Thread via GitHub


mjd95 commented on PR #15241:
URL: https://github.com/apache/kafka/pull/15241#issuecomment-2376460752

   We were the ones discussing with @jeqo - the "caching closed channels" issue 
was happening regularly for us on 3.8 in production. When the thread doing a 
remote read was interrupted while iterating through the transaction index, we 
got a `ClosedByInterruptException` on some transaction index file channel, but 
the closed channel remained in the cache. The only way to mitigate was 
restarting the broker.
   
   We were able to reproduce by setting a low `remote.fetch.max.wait.ms` and 
setting a small segment size in order to generate many transaction index files.
   
   We tested that this PR fixes our repro and cherry-picked it onto our 
production release; we haven't seen the issue since then.
   
   We haven't seen the "race during channel close" issue (which is now also 
handled in this PR) in production.
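
For readers following along, a minimal sketch of the reopen-on-closed pattern the fix amounts to, assuming a long-lived holder around a `FileChannel`; this is not Kafka's actual `TransactionIndex` code:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical holder showing the pattern: an interrupted read closes the
// channel as a side effect (ClosedByInterruptException), so later readers
// must not be served the dead handle from the cache.
class ReopeningChannelHolder {
    private final Path file;
    private FileChannel channel;

    ReopeningChannelHolder(Path file) throws IOException {
        this.file = file;
        this.channel = FileChannel.open(file, StandardOpenOption.READ);
    }

    synchronized int read(ByteBuffer dst, long position) throws IOException {
        if (!channel.isOpen()) {
            // Reopen lazily instead of keeping the closed channel cached,
            // which previously required a broker restart to clear.
            channel = FileChannel.open(file, StandardOpenOption.READ);
        }
        return channel.read(dst, position);
    }
}
```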





[jira] [Commented] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12

2024-09-26 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884912#comment-17884912
 ] 

Chia-Ping Tsai commented on KAFKA-17617:


Scala 2.12 will be dropped in 4.0 (KAFKA-12895), so I think the best solution 
is to start the process now.

> New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
> 
>
> Key: KAFKA-17617
> URL: https://issues.apache.org/jira/browse/KAFKA-17617
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Critical
>
> I noticed a PR that failed for Jenkins but not for the GitHub Actions build. 
> Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub 
> Actions is using 2.13.
> We should still support 2.12, so we should fix the GitHub Actions build now 
> that Jenkins is removed. Until we fix it, folks can merge code that breaks 
> 2.12 builds.
> See Jenkins for 
> failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37]
> see for success in GH Actions: 
> [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093]
> see raw build for 2.13: 
> [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04]
>  





[jira] [Commented] (KAFKA-12895) KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)

2024-09-26 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884917#comment-17884917
 ] 

Chia-Ping Tsai commented on KAFKA-12895:


[~frankvicky] Please review the KIP 
([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218]) 
before submitting the patch, thanks

> KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)
> 
>
> Key: KAFKA-12895
> URL: https://issues.apache.org/jira/browse/KAFKA-12895
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip
> Fix For: 4.0.0
>
>
> We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it 
> in Apache Kafka 4.0.
>  
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218





[jira] [Commented] (KAFKA-17606) Include Rat errors in GitHub workflow summary

2024-09-26 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884919#comment-17884919
 ] 

Chia-Ping Tsai commented on KAFKA-17606:


[~loganzhu] I have assigned this Jira to you, thanks!

> Include Rat errors in GitHub workflow summary
> -
>
> Key: KAFKA-17606
> URL: https://issues.apache.org/jira/browse/KAFKA-17606
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: David Arthur
>Assignee: Logan Zhu
>Priority: Minor
>
> It is difficult to scroll through the output of "check -x test" to find Rat 
> errors. There is a summary printed at the end of the Gradle process, but it 
> just says something like
>  
> {code:java}
> * What went wrong:
> Execution failed for task ':rat'.
> > A failure occurred while executing org.nosphere.apache.rat.RatWork
>    > Apache Rat audit failure - 2 unapproved licenses
>          See file:///home/runner/work/kafka/kafka/build/rat/index.html {code}
> We should include specific Rat failures in the job summary similar to what we 
> do for checkstyle failures. 





[jira] [Assigned] (KAFKA-17606) Include Rat errors in GitHub workflow summary

2024-09-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17606:
--

Assignee: Logan Zhu

> Include Rat errors in GitHub workflow summary
> -
>
> Key: KAFKA-17606
> URL: https://issues.apache.org/jira/browse/KAFKA-17606
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: David Arthur
>Assignee: Logan Zhu
>Priority: Minor
>
> It is difficult to scroll through the output of "check -x test" to find Rat 
> errors. There is a summary printed at the end of the Gradle process, but it 
> just says something like
>  
> {code:java}
> * What went wrong:
> Execution failed for task ':rat'.
> > A failure occurred while executing org.nosphere.apache.rat.RatWork
>    > Apache Rat audit failure - 2 unapproved licenses
>          See file:///home/runner/work/kafka/kafka/build/rat/index.html {code}
> We should include specific Rat failures in the job summary similar to what we 
> do for checkstyle failures. 





Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


smjn commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776452137


##
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+/**
+ * Util method which takes in 2 lists containing {@link 
PersisterStateBatch}
+ * and the startOffset.
+ * This method removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
+ * It then merges any contiguous intervals with same state. If states 
differ,
+ * based on various conditions it creates new non-overlapping batches 
preferring new ones.
+ * @param batchesSoFar - List containing current soft state of {@link 
PersisterStateBatch}
+ * @param newBatches - List containing {@link PersisterStateBatch} in 
incoming request
+ * @param startOffset - startOffset to consider when removing old batches.
+ * @return List containing combined batches
+ */
+public static List<PersisterStateBatch> combineStateBatches(
+List<PersisterStateBatch> batchesSoFar,
+List<PersisterStateBatch> newBatches,
+long startOffset
+) {
+List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+combinedList.addAll(batchesSoFar);
+combinedList.addAll(newBatches);
+
+return mergeBatches(
+pruneBatches(
+combinedList,
+startOffset
+)
+);
+}
+
+/**
+ * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+ * - Same state (delivery count and state)
+ *  - If overlapping - merge into single batch
+ *  - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches 
into a single 1
+ * - Different state (delivery count or state differ)
+ *  - Based on various cases:
+ *  - swallow lower priority batch within bounds of offsets
+ *  - break batch into other non-overlapping batches
+ * @param batches - List of {@link PersisterStateBatch}
+ * @return List of non-overlapping {@link PersisterStateBatch}
+ */
+private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+if (batches.size() < 2) {
+return batches;
+}
+TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size
+
+BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+while (overlapState != BatchOverlapState.SENTINEL) {

Review Comment:
   No, the logic is not that simple. A single iteration over the sorted batches 
might not be enough.
   The invariant is that the batches must remain sorted even after manipulation.
   
   Consider:
   ```
   A [1,10,0,1]
--- B [5,7,0,2]
--   C [5,15,0,3]
   ```
   A and B will combine to
   ```
    [1,4,0,1]
[5,7,0,2]
 ---[8,10,0,1]
   ```
   Now when combining with C, we have 2 previous batches to consider.
   
   Secondly,
   ```
   --- A [1,10,0,1]
--- B [5,7,0,2]
--- C [5,7,0,3]
   ```
   A and B will combine to
   ```
    [1,4,0,1]
[5,7,0,2]
 ---[8,10,0,1]
--- <- C - we broke invariant for being sorted by batches 
   ```
   
   In the current impl, these situations are implicitly handled by virtue of 
the treeset. Any newly generated batches are pushed back into the treeset, and 
the `getOverlappingState` method finds the first overlapping pair as well as 
returning the non-overlapping prefix. 
   The non-overlapping prefix is then REMOVED from the treeset; hence, once a 
batch is no longer overlapping, i
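
A compact sketch of that TreeSet-driven loop, with the different-state splitting cases collapsed into a plain merge; `Batch` is an illustrative stand-in for `PersisterStateBatch`, not the PR's `mergeBatches`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

record Batch(long first, long last) implements Comparable<Batch> {
    @Override
    public int compareTo(Batch o) {
        int c = Long.compare(first, o.first);
        return c != 0 ? c : Long.compare(last, o.last);
    }
    // Assumes this.first <= o.first, which the sorted order guarantees.
    boolean overlaps(Batch o) {
        return o.first <= last;
    }
}

class MergeSketch {
    static List<Batch> merge(List<Batch> input) {
        TreeSet<Batch> sorted = new TreeSet<>(input);
        List<Batch> done = new ArrayList<>();
        while (sorted.size() > 1) {
            Batch last = sorted.pollFirst();   // smallest remaining batch
            Batch candidate = sorted.first();  // its immediate successor
            if (!last.overlaps(candidate)) {
                // Non-overlapping prefix: emitted once and never revisited.
                done.add(last);
                continue;
            }
            sorted.remove(candidate);
            // Newly generated batches go back into the TreeSet, so the
            // sorted-order invariant is re-established before the next pass.
            sorted.add(new Batch(last.first, Math.max(last.last, candidate.last)));
        }
        done.addAll(sorted);
        return done;
    }
}
```

Each iteration either emits the smallest batch or shrinks the set by one, so the loop terminates, and re-inserting merged batches is what preserves the sorted-order invariant described above.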

[jira] [Commented] (KAFKA-12895) KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)

2024-09-26 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884913#comment-17884913
 ] 

Chia-Ping Tsai commented on KAFKA-12895:


Our new CI has already dropped support for Scala 2.12, so I'll begin the 
process.

> KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)
> 
>
> Key: KAFKA-12895
> URL: https://issues.apache.org/jira/browse/KAFKA-12895
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>  Labels: kip
> Fix For: 4.0.0
>
>
> We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it 
> in Apache Kafka 4.0.
>  
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218





[jira] [Assigned] (KAFKA-12895) KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)

2024-09-26 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-12895:
--

Assignee: TengYao Chi  (was: Ismael Juma)

> KIP-751: Drop support for Scala 2.12 in Kafka 4.0 (deprecate in 3.0)
> 
>
> Key: KAFKA-12895
> URL: https://issues.apache.org/jira/browse/KAFKA-12895
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip
> Fix For: 4.0.0
>
>
> We propose to deprecate Scala 2.12 support in Apache Kafka 3.0 and to drop it 
> in Apache Kafka 4.0.
>  
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218





[jira] [Commented] (KAFKA-17587) Move test infrastructure out of core

2024-09-26 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884923#comment-17884923
 ] 

Chia-Ping Tsai commented on KAFKA-17587:


[~davidarthur] I opened https://issues.apache.org/jira/browse/KAFKA-17619 for 
removing the zk type and instance

> Move test infrastructure out of core
> 
>
> Key: KAFKA-17587
> URL: https://issues.apache.org/jira/browse/KAFKA-17587
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: David Arthur
>Priority: Major
>
> Currently, our integration test infrastructure exists in the ":core" module's 
> test sources. This means that any integration test must exist in 
> "core/src/test/java" or "core/src/test/scala". 
> This has two negative consequences. First, it means most of our integration 
> tests live in "core" which is why that module's test time is by far the 
> highest. The other related problem is that modules cannot easily define 
> integration tests in their directory due to circularity with the core test 
> dependency. 
> For example, ":metadata" could not add ClusterTests because that would 
> require a dependency on "project(':core').sourceSets.test.output" – but this 
> can't happen because ":core" depends on ":metadata".
> We should refactor our test infrastructure classes so that we can untangle 
> these dependencies.





[jira] [Created] (KAFKA-17619) Remove zk type and instance from ClusterTest

2024-09-26 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17619:
--

 Summary: Remove zk type and instance from ClusterTest
 Key: KAFKA-17619
 URL: https://issues.apache.org/jira/browse/KAFKA-17619
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


as title





[PR] KAFKA-17500: Metadata redirection for NOT_LEADER_OR_FOLLOWER [kafka]

2024-09-26 Thread via GitHub


AndrewJSchofield opened a new pull request, #17279:
URL: https://github.com/apache/kafka/pull/17279

   This PR implements the metadata redirection feature of the ShareFetch and 
ShareAcknowledge responses where an error code of NOT_LEADER_OR_FOLLOWER or 
FENCED_LEADER_EPOCH along with current leader information in the response is 
used to optimise handling of leadership changes in the client. This is applying 
the logic of KIP-951 to share group consumers.
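
   A hedged sketch of the client-side handling this enables; all types below are illustrative stand-ins, not Kafka's actual classes:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of KIP-951-style redirection for share consumers: when
// a per-partition error indicates a leadership change and the response
// carries the current leader, update the local view directly rather than
// forcing a full metadata refresh.
class LeaderRedirectSketch {
    enum PartitionError { NONE, NOT_LEADER_OR_FOLLOWER, FENCED_LEADER_EPOCH }
    record Node(int id, String host, int port) {}

    private final Map<String, Node> leaderByPartition = new ConcurrentHashMap<>();

    void onShareFetchPartitionResponse(String partition, PartitionError error, Optional<Node> currentLeader) {
        boolean leadershipChanged = error == PartitionError.NOT_LEADER_OR_FOLLOWER
                || error == PartitionError.FENCED_LEADER_EPOCH;
        if (leadershipChanged && currentLeader.isPresent()) {
            // The next ShareFetch/ShareAcknowledge for this partition goes to the new leader.
            leaderByPartition.put(partition, currentLeader.get());
        }
    }
}
```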
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[PR] KAFKA-17606: Include Rat errors in GitHub workflow summary [kafka]

2024-09-26 Thread via GitHub


LoganZhuZzz opened a new pull request, #17280:
URL: https://github.com/apache/kafka/pull/17280

   In the current GitHub workflow, it has become difficult to locate Rat errors 
when running `check -x test`. To improve readability and make it easier to 
identify Rat errors, we need to include them in the workflow summary.
   
   ## Key Modifications
   - Added a call to `rat.py` in the `build.yml` file to annotate Rat errors 
after the `Compile and validate` step.
   - Ensured that the annotation for Rat reports will execute **only when the 
`Compile and validate` step fails.** 
   
   ## Purpose
   - Enhance the usability of the GitHub workflow, allowing developers to more 
easily identify and fix Rat errors.
   - Improve workflow efficiency by reducing the time spent manually searching 
for errors.
   
   ## Notes
   - **Please ensure that all relevant workflow tests are run before merging 
this PR to confirm that the new feature operates correctly.**
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-17606: Include Rat errors in GitHub workflow summary [kafka]

2024-09-26 Thread via GitHub


LoganZhuZzz commented on PR #17280:
URL: https://github.com/apache/kafka/pull/17280#issuecomment-2376561401

   @chia7712 please take a look, thanks!





Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]

2024-09-26 Thread via GitHub


lianetm commented on code in PR #16982:
URL: https://github.com/apache/kafka/pull/16982#discussion_r1776792978


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) 
{
 
 // poll once to update with the current metadata
 consumer.poll(Duration.ofMillis(0));
+TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FIND_COORDINATOR),
+"No metadata requests sent");
 client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
 
 // no error for no current position
 assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
-assertEquals(0, client.inFlightRequestCount());
-
+if (groupProtocol == GroupProtocol.CLASSIC) {
+// Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+// different from the new async consumer, that will send the 
LIST_OFFSETS request in the background thread
+// on the next background thread poll.
+assertEquals(0, client.inFlightRequestCount());
+}
 // poll once again, which should send the list-offset request
 consumer.seek(tp0, 50L);
 consumer.poll(Duration.ofMillis(0));
 // requests: list-offset, fetch
-assertEquals(2, client.inFlightRequestCount());
+TestUtils.waitForCondition(() -> {
+boolean hasListOffsetRequest = requestGenerated(client, 
ApiKeys.LIST_OFFSETS);
+boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+return hasListOffsetRequest && hasFetchRequest;
+}, "No list-offset & fetch request sent");
 
 // no error for no end offset (so unknown lag)
 assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
 
 // poll once again, which should return the list-offset response
 // and hence next call would return correct lag result
-client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+ClientRequest listOffsetRequest = findRequest(client, 
ApiKeys.LIST_OFFSETS);
+client.respondToRequest(listOffsetRequest, 
listOffsetsResponse(singletonMap(tp0, 90L)));
 consumer.poll(Duration.ofMillis(0));
 
-assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+// For AsyncKafkaConsumer, subscription sate is updated in background, 
so the result will eventually be updated.
+TestUtils.waitForCondition(() -> {
+OptionalLong result = consumer.currentLag(tp0);
+return result.isPresent() && result.getAsLong() == 40L;
+}, "Subscription state is not updated");
 // requests: fetch
-assertEquals(1, client.inFlightRequestCount());
+TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FETCH), "No fetch request sent");
 
 // one successful fetch should update the log end offset and the 
position
+ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
 final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
-client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, 
fetchInfo)));
 
 final ConsumerRecords records = 
(ConsumerRecords) consumer.poll(Duration.ofMillis(1));
 assertEquals(5, records.count());
 assertEquals(55L, consumer.position(tp0));
 
 // correct lag result
-assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
+// For AsyncKafkaConsumer, subscription sate is updated in background, 
so the result will eventually be updated.

Review Comment:
   typo state



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) 
{
 
 // poll once to update with the current metadata
 consumer.poll(Duration.ofMillis(0));
+TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FIND_COORDINATOR),
+"No metadata requests sent");
 client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
 
 // no error for no current position
 assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
-assertEquals(0, client.inFlightRequestCount());
-
+if (groupProtocol == GroupProtocol.CLASSIC) {
+// Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+// different from the new async consumer, that will send the 
LIST_OFFSETS request in the background thread
+// on the next background thread poll.
+assertEquals(0, client.inFlightRequestCount());
+   

Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]

2024-09-26 Thread via GitHub


lianetm commented on code in PR #16982:
URL: https://github.com/apache/kafka/pull/16982#discussion_r1776796286


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) 
{
 
 // poll once to update with the current metadata
 consumer.poll(Duration.ofMillis(0));
+TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FIND_COORDINATOR),
+"No metadata requests sent");
 client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
 
 // no error for no current position
 assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
-assertEquals(0, client.inFlightRequestCount());
-
+if (groupProtocol == GroupProtocol.CLASSIC) {
+// Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+// different from the new async consumer, that will send the 
LIST_OFFSETS request in the background thread
+// on the next background thread poll.
+assertEquals(0, client.inFlightRequestCount());
+}
 // poll once again, which should send the list-offset request
 consumer.seek(tp0, 50L);
 consumer.poll(Duration.ofMillis(0));
 // requests: list-offset, fetch
-assertEquals(2, client.inFlightRequestCount());
+TestUtils.waitForCondition(() -> {
+boolean hasListOffsetRequest = requestGenerated(client, 
ApiKeys.LIST_OFFSETS);
+boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+return hasListOffsetRequest && hasFetchRequest;
+}, "No list-offset & fetch request sent");
 
 // no error for no end offset (so unknown lag)
 assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
 
 // poll once again, which should return the list-offset response
 // and hence next call would return correct lag result
-client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+ClientRequest listOffsetRequest = findRequest(client, 
ApiKeys.LIST_OFFSETS);
+client.respondToRequest(listOffsetRequest, 
listOffsetsResponse(singletonMap(tp0, 90L)));
 consumer.poll(Duration.ofMillis(0));
 
-assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+// For AsyncKafkaConsumer, subscription sate is updated in background, 
so the result will eventually be updated.
+TestUtils.waitForCondition(() -> {
+OptionalLong result = consumer.currentLag(tp0);
+return result.isPresent() && result.getAsLong() == 40L;
+}, "Subscription state is not updated");
 // requests: fetch
-assertEquals(1, client.inFlightRequestCount());
+TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FETCH), "No fetch request sent");
 
 // one successful fetch should update the log end offset and the 
position
+ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
 final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
-client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, 
fetchInfo)));
 
 final ConsumerRecords records = 
(ConsumerRecords) consumer.poll(Duration.ofMillis(1));
 assertEquals(5, records.count());
 assertEquals(55L, consumer.position(tp0));
 
 // correct lag result
-assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
+// For AsyncKafkaConsumer, subscription sate is updated in background, 
so the result will eventually be updated.
+TestUtils.waitForCondition(() -> {

Review Comment:
   Is this change really needed? In this case we just did a successful fetch, 
so the position is updated to 55 (ln 2541). We should be able to retrieve the 
lag of 45 (the end offset is already known to be 100). (It's not exactly the 
same case as above, where we needed to allow for the ListOffsets response to be 
processed in the background.) Makes sense?






[jira] [Created] (KAFKA-17620) Simplify share partition acquire API

2024-09-26 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-17620:
-

 Summary: Simplify share partition acquire API
 Key: KAFKA-17620
 URL: https://issues.apache.org/jira/browse/KAFKA-17620
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


Simplify the share partition acquire API to remove the completable future, 
since the method no longer involves any asynchronous calls.
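
An illustrative before/after of the API shape the ticket describes; `AcquiredRecords` and `FetchRequest` are placeholder types, not the real signatures:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AcquiredRecords {}
class FetchRequest {}

interface SharePartitionBefore {
    // Wrapping a synchronously computed result in a future adds allocation
    // and API noise when nothing downstream completes asynchronously...
    CompletableFuture<List<AcquiredRecords>> acquire(String memberId, FetchRequest request);
}

interface SharePartitionAfter {
    // ...so acquire can simply return the acquired records directly.
    List<AcquiredRecords> acquire(String memberId, FetchRequest request);
}
```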





[jira] [Commented] (KAFKA-13938) Jenkins builds are timing out after streams integration tests

2024-09-26 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884953#comment-17884953
 ] 

João Pedro Fonseca commented on KAFKA-13938:


Hi, [~mumrah]! Since Jenkins was disabled yesterday, could this task be closed?

> Jenkins builds are timing out after streams integration tests
> -
>
> Key: KAFKA-13938
> URL: https://issues.apache.org/jira/browse/KAFKA-13938
> Project: Kafka
>  Issue Type: Task
>  Components: build
>Reporter: David Arthur
>Priority: Major
>
> Jenkins PR builder is sporadically failing with timeouts. A few examples:
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-12136/5/execution/node/137/log/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-12207/1/execution/node/137/log/
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/view/change-requests/job/PR-12062/7/execution/node/138/log/
> In these examples, the timeout occurs after 
> 22:14:00  streams-5: SMOKE-TEST-CLIENT-CLOSED
> 22:14:00  streams-6: SMOKE-TEST-CLIENT-CLOSED
> 22:14:00  streams-3: SMOKE-TEST-CLIENT-CLOSED
> 22:14:00  streams-1: SMOKE-TEST-CLIENT-CLOSED
> 22:14:00  streams-2: SMOKE-TEST-CLIENT-CLOSED
> 22:14:00  streams-4: SMOKE-TEST-CLIENT-CLOSED
> 22:14:00  streams-0: SMOKE-TEST-CLIENT-CLOSED
> 04:07:06  Sending interrupt signal to process
> 04:07:17  
> 04:07:17  > Task :core:integrationTest FAILED
> 04:07:17  
> 04:07:17  FAILURE: Build failed with an exception.
> These examples also include an uncaught exception just before the apparent 
> stall
> 05:56:05  Exception: java.lang.AssertionError thrown from the 
> UncaughtExceptionHandler in thread 
> "appId_StreamsUncaughtExceptionHandlerIntegrationTestnull-badff0e3-a316-425b-841c-28352cd553ac-StreamThread-1"





Re: [PR] KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" [kafka]

2024-09-26 Thread via GitHub


fonsdant commented on code in PR #17182:
URL: https://github.com/apache/kafka/pull/17182#discussion_r1776825720


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -3030,7 +3030,7 @@ private StreamTask createSingleSourceStateless(final 
StreamsConfig config,
 topology,
 consumer,
 new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-new StreamsMetricsImpl(metrics, "test", builtInMetricsVersion, 
time),

Review Comment:
   Sure! Thanks for this catch, Matthias!






Re: [PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]

2024-09-26 Thread via GitHub


lianetm commented on code in PR #17165:
URL: https://github.com/apache/kafka/pull/17165#discussion_r1776831516


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##
@@ -458,14 +466,24 @@ String memberIdInfoForLog() {
 }
 
 /**
- * Join the group with the updated subscription, if the member is not part 
of it yet. If the
- * member is already part of the group, this will only ensure that the 
updated subscription
+ * Set {@link #subscriptionUpdated} to true to indicate that the 
subscription has been updated.
+ * The next {@link #maybeJoinGroup()} will join the group with the updated 
subscription, if the member is not part of it yet.
+ * If the member is already part of the group, this will only ensure that 
the updated subscription
  * is included in the next heartbeat request.
  * 
  * Note that list of topics of the subscription is taken from the shared 
subscription state.
  */
 public void onSubscriptionUpdated() {
-if (state == MemberState.UNSUBSCRIBED) {
+subscriptionUpdated.compareAndSet(false, true);
+}
+
+/**
+ * Join the group if the member is not part of it yet. This function 
separates {@link #transitionToJoining}
+ * from the {@link #onSubscriptionUpdated} to fulfill the requirement of 
the "rebalances will only occur during an
+ * active call to {@link 
org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
+ */
+public void maybeJoinGroup() {

Review Comment:
   nit: what do you think about calling this something like `onConsumerPoll`? 
Just to make the func self explanatory (on poll -> transition to joining), no 
need to find usages to understand what this is all about. No strong feeling 
though, up to you. 
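
   A sketch of the two-step flow in the diff, using the name suggested above; simplified and hypothetical, not the actual `AbstractMembershipManager`:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// subscribe() only records that the subscription changed; the transition to
// JOINING happens from an explicit poll, so rebalances only occur during an
// active call to KafkaConsumer#poll.
class MembershipSketch {
    enum State { UNSUBSCRIBED, JOINING, STABLE }

    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
    private State state = State.UNSUBSCRIBED;

    // Called from subscribe(): defer joining until the next poll.
    void onSubscriptionUpdated() {
        subscriptionUpdated.compareAndSet(false, true);
    }

    // Called from poll(); the review suggests naming it onConsumerPoll.
    void onConsumerPoll() {
        if (subscriptionUpdated.compareAndSet(true, false) && state == State.UNSUBSCRIBED) {
            state = State.JOINING;
        }
    }
}
```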







Re: [PR] KAFKA-17505: New consumer seekToBeginning/End should run in background thread [kafka]

2024-09-26 Thread via GitHub


lianetm commented on code in PR #17230:
URL: https://github.com/apache/kafka/pull/17230#discussion_r1776850858


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -842,8 +843,8 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
 
 acquireAndEnsureOpen();
 try {
-Collection<TopicPartition> parts = partitions.isEmpty() ? subscriptions.assignedPartitions() : partitions;
-subscriptions.requestOffsetReset(parts, OffsetResetStrategy.LATEST);
+cachedSubscriptionHasAllFetchPositions = false;
+applicationEventHandler.add(new ResetOffsetEvent(partitions, 
OffsetResetStrategy.LATEST));

Review Comment:
   Hey here, sorry for the late reply. I agree that these 3 methods should have 
similar behaviour, but I wonder if the case is that they should all use 
`addAndGet`? My concern is, when seeking (either of the 3 methods), we do 
expect that as soon as they complete, the position is updated so that other 
funcs will see it (ex. currentLag, poll). We would be not respecting that 
anymore if we use add. seek could complete without having updated the offsets 
right?
   
   From the seek contract, it states that it "Overrides the fetch offsets that the 
consumer will use on the next {@link #poll(Duration) poll(timeout)}", so we 
need to:
   1. override fetch offsets (this requires `addAndGet` I expect)
   2. make sure they are available on the next poll (again addAndGet on seek, 
otherwise consumer.seek + consumer.poll may not find the offsets if the poll 
happens when the `seek` with `add` is still being processed in the background 
right?). Also calls like currentLag after that seek + add could not have the 
info they need I guess.
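   
   To make that concrete, a minimal sketch of the blocking variant (assuming this PR's `ResetOffsetEvent` is a completable event, so the handler can wait on its future):
   ```java
   // Sketch: enqueue the reset and block until the background thread applies it,
   // so consumer.seekToEnd(...) followed by poll(...) or currentLag(...) is
   // guaranteed to observe the updated positions.
   ResetOffsetEvent event = new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST);
   applicationEventHandler.addAndGet(event); // unlike add(event), which returns immediately
   ```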
   
   Thoughts?
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-17496) Add heterogeneous configuration to TargetAssignmentBuilderBenchmark

2024-09-26 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17496?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-17496.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

> Add heterogeneous configuration to TargetAssignmentBuilderBenchmark
> ---
>
> Key: KAFKA-17496
> URL: https://issues.apache.org/jira/browse/KAFKA-17496
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Sean Quah
>Assignee: Sean Quah
>Priority: Minor
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17616) Remove KafkaServer

2024-09-26 Thread Dmitry Werner (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17616?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17884885#comment-17884885
 ] 

Dmitry Werner commented on KAFKA-17616:
---

[~cmccabe] Hello, if you have not started working on this issue, I would like 
to take it.

> Remove KafkaServer
> --
>
> Key: KAFKA-17616
> URL: https://issues.apache.org/jira/browse/KAFKA-17616
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin McCabe
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17571; Revert "MINOR: Log pending join members (#17219)" [kafka]

2024-09-26 Thread via GitHub


dajac commented on PR #17274:
URL: https://github.com/apache/kafka/pull/17274#issuecomment-2376097825

   @mumrah @chia7712 Long story short, I did not find any correctness issue in 
the new group coordinator. All the failed tests were due to pending members in 
the classic group preventing the group from stabilizing itself in time. This is 
actually a known source of flakiness in this area. Chris has been working on a 
patch to improve it: https://issues.apache.org/jira/browse/KAFKA-17115.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17571; Revert "MINOR: Log pending join members (#17219)" [kafka]

2024-09-26 Thread via GitHub


dajac merged PR #17274:
URL: https://github.com/apache/kafka/pull/17274


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-17571) Revert #17219

2024-09-26 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-17571.
-
Fix Version/s: 4.0.0
   Resolution: Fixed

> Revert #17219
> -
>
> Key: KAFKA-17571
> URL: https://issues.apache.org/jira/browse/KAFKA-17571
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 4.0.0
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 4.0.0
>
>
> Revert https://github.com/apache/kafka/pull/17219



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17109: Move lock backoff retry to streams TaskManager [kafka]

2024-09-26 Thread via GitHub


cadonna commented on PR #17209:
URL: https://github.com/apache/kafka/pull/17209#issuecomment-2376116982

   @aliehsaeedii the following test fails consistently with a NPE with this PR:
   `StreamThreadTest.shouldRecordCommitLatency()`.
   See https://github.com/apache/kafka/actions/runs/11030675352?pr=17209
   Could you have a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-17584) Fix incorrect synonym handling for dynamic log configurations

2024-09-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-17584.
---
Fix Version/s: 3.9.0
   3.8.1
   Resolution: Fixed

> Fix incorrect synonym handling for dynamic log configurations
> -
>
> Key: KAFKA-17584
> URL: https://issues.apache.org/jira/browse/KAFKA-17584
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.9.0
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.9.0, 3.8.1
>
>
> Updating certain dynamic configurations (for example `message.max.bytes`) 
> causes retention based on time to reset to the default value (source code) 
> for log.retention.ms. This poses a durability issue if users have set their 
> retention by using log.retention.hours or log.retention.minutes. In other 
> words, if a user has set log.retention.hours=-1 (infinite retention) and they 
> dynamically change `message.max.bytes` their retention will immediately 
> change back to the default of 604800000 ms (7 days) and data before this will 
> be scheduled for deletion immediately.
> Steps to reproduce:
>  1. Add log.retention.minutes=1,log.retention.check.interval.ms=1000 to 
> server.properties
>  2. Start a single ZK or KRaft instance + a single Kafka instance
>  3. Create a topic using
> {code:java}
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic A 
> --replication-factor 1 --partitions 1 --config min.insync.replicas=1 --config 
> segment.bytes=512{code}
>  4. Create a few segments with the console producer
>  5. Observe that they are deleted after 1 minute
>  6. Use the following command
> {code:java}
> bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers 
> --entity-default --alter --add-config message.max.bytes=1048609{code}
> (the value of `message.max.bytes` is irrelevant)
>  7. Create a few more segments with the console producer
>  8. Observe that segments are no longer deleted after 1 minute



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


smjn commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776535621


##
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/StateBatchUtilTest.java:
##
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class StateBatchUtilTest {
+static class BatchTestHolder {
+final String testName;
+final List<PersisterStateBatch> curList;
+final List<PersisterStateBatch> newList;
+final List<PersisterStateBatch> expectedResult;
+final long startOffset;
+final boolean shouldRun;
+
+BatchTestHolder(String testName,
+List<PersisterStateBatch> curList,
+List<PersisterStateBatch> newList,
+List<PersisterStateBatch> expectedResult,
+long startOffset) {
+this(testName, curList, newList, expectedResult, startOffset, true);
+}
+
+BatchTestHolder(String testName,
+List<PersisterStateBatch> curList,
+List<PersisterStateBatch> newList,
+List<PersisterStateBatch> expectedResult,
+long startOffset,
+boolean shouldRun) {
+this.testName = testName;
+this.curList = curList;
+this.newList = newList;
+this.expectedResult = expectedResult;
+this.startOffset = startOffset;
+this.shouldRun = shouldRun;
+}
+
+@Override
+public String toString() {
+return this.testName;
+}
+}
+
+@SuppressWarnings({"MethodLength"})
+private static Stream<BatchTestHolder> generator() {
+return Stream.of(
+new BatchTestHolder(
+"Current batches with start offset midway are pruned.",
+Collections.singletonList(
+new PersisterStateBatch(100, 130, (byte) 0, (short) 1)
+),
+Collections.emptyList(),
+Collections.singletonList(
+new PersisterStateBatch(120, 130, (byte) 0, (short) 1)
+),
+120
+),
+
+new BatchTestHolder(
+"New batches with start offset midway are pruned.",
+Collections.emptyList(),
+Collections.singletonList(
+new PersisterStateBatch(100, 130, (byte) 0, (short) 1)
+),
+Collections.singletonList(
+new PersisterStateBatch(120, 130, (byte) 0, (short) 1)
+),
+120
+),
+
+new BatchTestHolder(
+"Both current and new batches empty.",
+Collections.emptyList(),
+Collections.emptyList(),
+Collections.emptyList(),
+120
+),
+
+// same state
+new BatchTestHolder(
+"Same state. Last and candidate have same first and last 
offset.",
+Collections.singletonList(
+new PersisterStateBatch(100, 110, (byte) 0, (short) 1)
+),
+Collections.singletonList(
+new PersisterStateBatch(100, 110, (byte) 0, (short) 1)
+),
+Collections.singletonList(
+new PersisterStateBatch(100, 110, (byte) 0, (short) 1)
+),
+-1
+),
+
+new BatchTestHolder(
+"Same state. Last and candidate have same first offset, 
candidate last offset strictly smaller.",
+Collections.singletonList(
+new PersisterStateBatch(100, 110, (byte) 0, (short) 1)
+),
+Collections.singletonList(
+new PersisterStateBatch(100, 105, (byte) 0, (short) 1)
+),
+

Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


smjn commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776452137


##
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+/**
+ * Util method which takes in 2 lists containing {@link 
PersisterStateBatch}
+ * and the startOffset.
+ * This method removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
+ * It then merges any contiguous intervals with same state. If states 
differ,
+ * based on various conditions it creates new non-overlapping batches 
preferring new ones.
+ * @param batchesSoFar - List containing current soft state of {@link 
PersisterStateBatch}
+ * @param newBatches - List containing {@link PersisterStateBatch} in 
incoming request
+ * @param startOffset - startOffset to consider when removing old batches.
+ * @return List containing combined batches
+ */
+public static List<PersisterStateBatch> combineStateBatches(
+List<PersisterStateBatch> batchesSoFar,
+List<PersisterStateBatch> newBatches,
+long startOffset
+) {
+List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+combinedList.addAll(batchesSoFar);
+combinedList.addAll(newBatches);
+
+return mergeBatches(
+pruneBatches(
+combinedList,
+startOffset
+)
+);
+}
+
+/**
+ * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+ * - Same state (delivery count and state)
+ *  - If overlapping - merge into single batch
+ *  - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches 
into a single one
+ * - Different state (delivery count or state differ)
+ *  - Based on various cases:
+ *  - swallow lower priority batch within bounds of offsets
+ *  - break batch into other non-overlapping batches
+ * @param batches - List of {@link PersisterStateBatch}
+ * @return List of non-overlapping {@link PersisterStateBatch}
+ */
+private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+if (batches.size() < 2) {
+return batches;
+}
+TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size
+
+BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+while (overlapState != BatchOverlapState.SENTINEL) {

Review Comment:
   No, the logic is not that simple. A single iteration over the sorted batches 
might not be enough.
   The invariant is that the batches must remain sorted even after manipulation.
   
   Consider:
   ```
   A [1,10,0,1]
--- B [5,7,0,2]
--   C [5,15,0,3]
   ```
   A and B will combine to
   ```
    [1,4,0,1]
[5,7,0,2]
 ---[8,10,0,1]
   ```
   Now when combining with C, we have 2 previous batches to consider.
   
   Secondly,
   ```
   A [1,10,0,1]
--- B [5,7,0,2]
--- C [5,7,0,3]
   ```
   A and B will combine to
   ```
    [1,4,0,1]
[5,7,0,2]
 ---[8,10,0,1]
--- <- C - we broke invariant for being sorted by batches 
   ```
   
   In the current impl, these situations are implicitly handled by virtue of 
the treeset. Any newly generated batches are pushed back into the treeset and 
the `getOverlappingState` method finds the first overlapping pair as well as 
returns the non-overlapping prefix. 
   The non-overlapping prefix is then REMOVED from the treeset; hence, once a 
batch is deemed non-overlapping, it is never examined again.
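   
   To make the re-insertion idea concrete, here is a tiny self-contained sketch (plain [first,last] int intervals with the same state assumed; not the PR's code):
   ```java
   import java.util.*;
   
   // Sketch: merge overlapping/contiguous intervals by re-inserting merged
   // results into a sorted set, mirroring the TreeSet re-check behaviour above.
   public class MergeSketch {
       public static void main(String[] args) {
           TreeSet<int[]> sorted = new TreeSet<>(
               Comparator.comparingInt((int[] b) -> b[0]).thenComparingInt(b -> b[1]));
           Collections.addAll(sorted, new int[]{1, 10}, new int[]{5, 7}, new int[]{5, 15});
           List<int[]> merged = new ArrayList<>();
           while (!sorted.isEmpty()) {
               int[] cur = sorted.pollFirst();
               int[] next = sorted.isEmpty() ? null : sorted.first();
               if (next != null && next[0] <= cur[1] + 1) {   // overlap or contiguous
                   sorted.pollFirst();                        // consume 'next'
                   sorted.add(new int[]{cur[0], Math.max(cur[1], next[1])}); // re-insert merged
               } else {
                   merged.add(cur);                           // settled, never rechecked
               }
           }
           merged.forEach(b -> System.out.println(b[0] + "-" + b[1])); // prints 1-15
       }
   }
   ```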

[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout

2024-09-26 Thread PoAn Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

PoAn Yang updated KAFKA-17618:
--
Labels: kip-848  (was: )

> group consumer heartbeat interval should be less than session timeout
> -
>
> Key: KAFKA-17618
> URL: https://issues.apache.org/jira/browse/KAFKA-17618
> Project: Kafka
>  Issue Type: Task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: kip-848
>
> [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session]
>  mentions:
> bq. The member is expected to heartbeat every 
> group.consumer.heartbeat.interval.ms in order to keep its session opened. If 
> it does not heartbeat at least once within the 
> group.consumer.session.timeout.ms, the group coordinator will kick the member 
> out from the group.
> To avoid users configuring _group.consumer.heartbeat.interval.ms_ to a value 
> bigger than _group.consumer.session.timeout.ms_, we can add validation for it.
> We can do similar validation for _group.share.heartbeat.interval.ms_ and 
> _group.share.session.timeout.ms_ as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout

2024-09-26 Thread PoAn Yang (Jira)
PoAn Yang created KAFKA-17618:
-

 Summary: group consumer heartbeat interval should be less than 
session timeout
 Key: KAFKA-17618
 URL: https://issues.apache.org/jira/browse/KAFKA-17618
 Project: Kafka
  Issue Type: Task
Reporter: PoAn Yang
Assignee: PoAn Yang


[KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session]
 mentions:
bq. The member is expected to heartbeat every 
group.consumer.heartbeat.interval.ms in order to keep its session opened. If it 
does not heartbeat at least once within the group.consumer.session.timeout.ms, 
the group coordinator will kick the member out from the group.

To avoid users configuring _group.consumer.heartbeat.interval.ms_ to a value 
bigger than _group.consumer.session.timeout.ms_, we can add validation for it.

We can do similar validation for _group.share.heartbeat.interval.ms_ and 
_group.share.session.timeout.ms_ as well.
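
A minimal sketch of the kind of check being proposed (the method and where it hooks in are illustrative, not the final implementation):
{code:java}
// Sketch: fail fast when the heartbeat interval is not strictly smaller
// than the session timeout. Config names are the KIP-848 ones quoted above.
static void validateHeartbeatInterval(int heartbeatIntervalMs, int sessionTimeoutMs) {
    if (heartbeatIntervalMs >= sessionTimeoutMs) {
        throw new IllegalArgumentException(
            "group.consumer.heartbeat.interval.ms (" + heartbeatIntervalMs +
            ") must be less than group.consumer.session.timeout.ms (" + sessionTimeoutMs + ")");
    }
}
{code}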



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15859: Add timeout field to the ListOffsets request [kafka]

2024-09-26 Thread via GitHub


mumrah commented on PR #17112:
URL: https://github.com/apache/kafka/pull/17112#issuecomment-2377323567

   @satishd @showuon this PR broke some tests and has caused trunk builds to 
fail. 
   
   The latest CI run for this PR was 
https://github.com/apache/kafka/actions/runs/11027485627 which shows both of 
the JUnit steps failing.
   
   Unlike with Jenkins, we need to look at _every_ failed test build -- 
especially before merging. Looking at these two failed jobs, we can see we have 
actual failed tests and not just flaky ones 
   
   https://github.com/user-attachments/assets/f02251bc-5dff-4d22-92e1-b4e01a5153fe
   
   https://github.com/apache/kafka/actions/runs/11027485627/job/30626323008
   
   We want to see all the status checks in the PR to be green before merging. I 
would like to have branch protections in place to prevent this sort of 
regression, but until we sort out the flaky tests we can't really.
   
   Anyways, I'm not meaning to pick on this PR -- just trying to raise 
awareness of our "new normal" for build expectations :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12

2024-09-26 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-17617:


Assignee: David Arthur

> New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
> 
>
> Key: KAFKA-17617
> URL: https://issues.apache.org/jira/browse/KAFKA-17617
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Assignee: David Arthur
>Priority: Minor
> Fix For: 3.7.2, 3.8.1, 3.9.1
>
>
> I noticed a PR that failed for Jenkins but not for the GitHub Actions build. 
> Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub 
> actions is using 2.13.
> We still should support 2.12, so we should fix the GitHub actions now that 
> Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 
> builds.
> See Jenkins for 
> failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37]
> see for success in GH Actions: 
> [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093]
> see raw build for 2.13: 
> [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] HOTFIX: fix failed cases in FeatureCommandTest [kafka]

2024-09-26 Thread via GitHub


kamalcph commented on PR #17287:
URL: https://github.com/apache/kafka/pull/17287#issuecomment-2377344257

   Thanks @FrankYang0529 for fixing this test!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15859: Add timeout field to the ListOffsets request [kafka]

2024-09-26 Thread via GitHub


kamalcph commented on PR #17112:
URL: https://github.com/apache/kafka/pull/17112#issuecomment-2377346567

   The failed test are fixed in #17287


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17621; Reduce logging verbosity on ConsumerGroupHeartbeat path [kafka]

2024-09-26 Thread via GitHub


dajac commented on code in PR #17288:
URL: https://github.com/apache/kafka/pull/17288#discussion_r1777317218


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1984,8 +1986,10 @@ private CoordinatorResult 
classicGroupJoinToConsumerGro
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
-groupId, subscriptionMetadata);
+if (log.isDebugEnabled()) {
+log.info("[GroupId {}] Computed new subscription metadata: 
{}.",

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17621; Reduce logging verbosity on ConsumerGroupHeartbeat path [kafka]

2024-09-26 Thread via GitHub


dajac commented on code in PR #17288:
URL: https://github.com/apache/kafka/pull/17288#discussion_r1777312193


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -1984,8 +1986,10 @@ private CoordinatorResult 
classicGroupJoinToConsumerGro
 );
 
 if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-log.info("[GroupId {}] Computed new subscription metadata: 
{}.",
-groupId, subscriptionMetadata);
+if (log.isDebugEnabled()) {
+log.info("[GroupId {}] Computed new subscription metadata: 
{}.",

Review Comment:
   It should be debug too.
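   
   i.e., roughly (sketch of the intended fix):
   ```java
   // The isDebugEnabled() guard is kept because rendering subscriptionMetadata
   // can be expensive; the call itself should be log.debug to match the guard.
   if (log.isDebugEnabled()) {
       log.debug("[GroupId {}] Computed new subscription metadata: {}.",
           groupId, subscriptionMetadata);
   }
   ```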



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: fix failed cases in FeatureCommandTest [kafka]

2024-09-26 Thread via GitHub


mumrah merged PR #17287:
URL: https://github.com/apache/kafka/pull/17287


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17078: Add SecurityManagerCompatibility shim [kafka]

2024-09-26 Thread via GitHub


gharris1727 commented on PR #16522:
URL: https://github.com/apache/kafka/pull/16522#issuecomment-2377340024

   Hey @danishnawab thanks for reminding us.
   
   Since this fix may only be included in future releases (3.7.2, 3.8.1, 3.9.1, 
4.0.0), on existing releases you can use the workaround of setting the system 
property `java.security.manager` to `allow`.
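   
   For example, for a broker started via the standard scripts (illustrative invocation):
   ```bash
   KAFKA_OPTS="-Djava.security.manager=allow" bin/kafka-server-start.sh config/server.properties
   ```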


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]

2024-09-26 Thread via GitHub


lianetm merged PR #16982:
URL: https://github.com/apache/kafka/pull/16982


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback

2024-09-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17623:
--
Component/s: clients

> Flaky 
> testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
> 
>
> Key: KAFKA-17623
> URL: https://issues.apache.org/jira/browse/KAFKA-17623
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Flaky for the new consumer, failing with :
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error at 
> app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618)
> ...
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition topic-0 at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171)
>  
> Flaky behaviour:
>  
> https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14562 [1/2]: Implement epoch bump after every transaction [kafka]

2024-09-26 Thread via GitHub


jolshan merged PR #16719:
URL: https://github.com/apache/kafka/pull/16719


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics [kafka]

2024-09-26 Thread via GitHub


kirktrue commented on code in PR #17244:
URL: https://github.com/apache/kafka/pull/17244#discussion_r1777422783


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1279,7 +1281,7 @@ private void releaseAssignmentAndLeaveGroup(final Timer 
timer) {
 UnsubscribeEvent unsubscribeEvent = new 
UnsubscribeEvent(calculateDeadlineMs(timer));
 applicationEventHandler.add(unsubscribeEvent);
 try {
-processBackgroundEvents(unsubscribeEvent.future(), timer);
+processBackgroundEvents(unsubscribeEvent.future(), timer, e -> e 
instanceof InvalidTopicException);

Review Comment:
   Is there a chance that the `InvalidTopicException` could be nested inside 
another exception?
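   
   If so, a predicate that walks the cause chain would cover it (just a sketch):
   ```java
   // Sketch: match InvalidTopicException anywhere in the cause chain,
   // not only at the top level of the thrown exception.
   Predicate<Throwable> isInvalidTopic = e -> {
       for (Throwable t = e; t != null; t = t.getCause()) {
           if (t instanceof InvalidTopicException)
               return true;
       }
       return false;
   };
   ```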



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]

2024-09-26 Thread via GitHub


FrankYang0529 commented on code in PR #17165:
URL: https://github.com/apache/kafka/pull/17165#discussion_r1776908662


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java:
##
@@ -458,14 +466,24 @@ String memberIdInfoForLog() {
 }
 
 /**
- * Join the group with the updated subscription, if the member is not part 
of it yet. If the
- * member is already part of the group, this will only ensure that the 
updated subscription
+ * Set {@link #subscriptionUpdated} to true to indicate that the 
subscription has been updated.
+ * The next {@link #maybeJoinGroup()} will join the group with the updated 
subscription, if the member is not part of it yet.
+ * If the member is already part of the group, this will only ensure that 
the updated subscription
  * is included in the next heartbeat request.
  * 
  * Note that list of topics of the subscription is taken from the shared 
subscription state.
  */
 public void onSubscriptionUpdated() {
-if (state == MemberState.UNSUBSCRIBED) {
+subscriptionUpdated.compareAndSet(false, true);
+}
+
+/**
+ * Join the group if the member is not part of it yet. This function 
separates {@link #transitionToJoining}
+ * from the {@link #onSubscriptionUpdated} to fulfill the requirement of 
the "rebalances will only occur during an
+ * active call to {@link 
org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
+ */
+public void maybeJoinGroup() {

Review Comment:
   Thanks for the suggestion. I updated the PR description and changed 
`maybeJoinGroup` to `onConsumerPoll`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16733: Add share group record support to OffsetsMessageParser [kafka]

2024-09-26 Thread via GitHub


AndrewJSchofield opened a new pull request, #17282:
URL: https://github.com/apache/kafka/pull/17282

   KIP-932 added a bunch of new record schemas to the consumer offsets topic 
which need to be added to the OffsetsMessageParser in kafka-dump-log.sh.
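   
   For reference, these records surface when dumping partitions of the offsets topic, e.g. (illustrative log path):
   ```bash
   bin/kafka-dump-log.sh --offsets-decoder \
     --files /tmp/kafka-logs/__consumer_offsets-0/00000000000000000000.log
   ```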
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16394) ForeignKey LEFT join propagates null value on foreignKey change

2024-09-26 Thread Andras Hatvani (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885016#comment-17885016
 ] 

Andras Hatvani commented on KAFKA-16394:


[~mjsax] Any update on the fix and release of this issue? I'm currently stuck 
on 3.6.1 in a project with dozens of modules due to this error.

> ForeignKey LEFT join propagates null value on foreignKey change
> ---
>
> Key: KAFKA-16394
> URL: https://issues.apache.org/jira/browse/KAFKA-16394
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.7.0
>Reporter: Ayoub Omari
>Assignee: Ayoub Omari
>Priority: Major
> Attachments: ForeignJoinTest.scala, JsonSerde.scala
>
>
> We have two topics : _left-topic[String, LeftRecord]_ and 
> _right-topic[String, String]_
> where _LeftRecord_ :
> {code:scala}
>  case class LeftRecord(foreignKey: String, name: String){code}
> we do a simple *LEFT* foreign key join on left-topic's foreignKey field. The 
> resulting join value is the value in right-topic.
>  
> +*Scenario1: change foreignKey*+
> Input the following
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2") 
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
> {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, 2){code}
>  
> *+Actual result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, 2){code}
>  
> A null is propagated to the join result when the foreign key changes
>  
> +*Scenario 2: Delete PrimaryKey*+
> Input
> {code:scala}
> rightTopic.pipeInput("fk1", "1")
> rightTopic.pipeInput("fk2", "2")
> leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
> leftTopic.pipeInput("pk1", null) {code}
>  
> *+Expected result+*
> {code:scala}
> KeyValue(pk1, 1)
> KeyValue(pk1, null) {code}
>  
> *+Actual result+*
> {code:java}
> KeyValue(pk1, 1)
> KeyValue(pk1, null)
> KeyValue(pk1, null) {code}
> An additional null is propagated to the join result.
>  
> This bug doesn't exist on versions 3.6.0 and below.
>  
> I believe the issue comes from the line 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]
> where we propagate the deletion in the two scenarios above
>  
> Attaching the topology I used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-17619: Remove zk type and instance from ClusterTest [kafka]

2024-09-26 Thread via GitHub


FrankYang0529 opened a new pull request, #17284:
URL: https://github.com/apache/kafka/pull/17284

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


AndrewJSchofield commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1777017514


##
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+/**
+ * Util method which takes in 2 lists containing {@link 
PersisterStateBatch}
+ * and the startOffset.
+ * This method removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
+ * It then merges any contiguous intervals with same state. If states 
differ,
+ * based on various conditions it creates new non-overlapping batches 
preferring new ones.
+ * @param batchesSoFar - List containing current soft state of {@link 
PersisterStateBatch}
+ * @param newBatches - List containing {@link PersisterStateBatch} in 
incoming request
+ * @param startOffset - startOffset to consider when removing old batches.
+ * @return List containing combined batches
+ */
+public static List<PersisterStateBatch> combineStateBatches(
+List<PersisterStateBatch> batchesSoFar,
+List<PersisterStateBatch> newBatches,
+long startOffset
+) {
+List<PersisterStateBatch> combinedList = new ArrayList<>(batchesSoFar.size() + newBatches.size());
+combinedList.addAll(batchesSoFar);
+combinedList.addAll(newBatches);
+
+return mergeBatches(
+pruneBatches(
+combinedList,
+startOffset
+)
+);
+}
+
+/**
+ * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+ * - Same state (delivery count and state)
+ *  - If overlapping - merge into single batch
+ *  - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches 
into a single one
+ * - Different state (delivery count or state differ)
+ *  - Based on various cases:
+ *  - swallow lower priority batch within bounds of offsets
+ *  - break batch into other non-overlapping batches
+ * @param batches - List of {@link PersisterStateBatch}
+ * @return List of non-overlapping {@link PersisterStateBatch}
+ */
+private static List<PersisterStateBatch> mergeBatches(List<PersisterStateBatch> batches) {
+if (batches.size() < 2) {
+return batches;
+}
+TreeSet<PersisterStateBatch> sortedBatches = new TreeSet<>(batches);
+List<PersisterStateBatch> finalBatches = new ArrayList<>(batches.size() * 2); // heuristic size
+
+BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+while (overlapState != BatchOverlapState.SENTINEL) {
+PersisterStateBatch last = overlapState.last();
+PersisterStateBatch candidate = overlapState.candidate();
+
+// remove non overlapping prefix from sortedBatches,
+// will make getting next overlapping pair efficient
+// as a prefix batch which is non overlapping will only
+// be checked once.
+if (overlapState.nonOverlapping() != null) {
+overlapState.nonOverlapping().forEach(sortedBatches::remove);
+finalBatches.addAll(overlapState.nonOverlapping());
+}
+
+if (candidate == null) {
+overlapState = BatchOverlapState.SENTINEL;
+continue;
+}
+
+// remove both last and candidate for easier
+// assessment about adding batches to sortedBatches
+sortedBatches.remove(last);
+sortedBatches.remove(candidate);
+
+// overlap and same state (last.firstOffset <= 
candidate.firstOffset) due to sort
+// covers:
+// case:12  34  5  
 6  7 (contiguous)
+// last:__   ______  ___   ___ 
  ___
+// candi

[PR] MINOR: Cache topic resolution in TopicIds set [kafka]

2024-09-26 Thread via GitHub


squah-confluent opened a new pull request, #17285:
URL: https://github.com/apache/kafka/pull/17285

   Looking up topics in a TopicsImage is relatively slow. Cache the results
   in TopicIds to improve assignor performance. In benchmarks, we see a
   noticeable improvement in performance in the heterogeneous case.
   
   Before
   ```
   Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
   ServerSideAssignorBenchmark.doAssignment  INCREMENTAL       RANGE           false          1              10                         HOMOGENEOUS         1000          avgt  5     36.400 ± 3.004  ms/op
   ServerSideAssignorBenchmark.doAssignment  INCREMENTAL       RANGE           false          1              10                         HETEROGENEOUS       1000          avgt  5    158.340 ± 0.825  ms/op
   ServerSideAssignorBenchmark.doAssignment  INCREMENTAL       UNIFORM         false          1              10                         HOMOGENEOUS         1000          avgt  5      1.329 ± 0.041  ms/op
   ServerSideAssignorBenchmark.doAssignment  INCREMENTAL       UNIFORM         false          1              10                         HETEROGENEOUS       1000          avgt  5    382.901 ± 6.203  ms/op
   ```
   
   After
   ```
   Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
   ServerSideAssignorBenchmark.doAssignment  INCREMENTAL       RANGE           false          1              10                         HOMOGENEOUS         1000          avgt  5     36.465 ± 1.954  ms/op
   ServerSideAssignorBenchmark.doAssignment  INCREMENTAL       RANGE           false          1              10                         HETEROGENEOUS       1000          avgt  5    114.043 ± 1.424  ms/op
   ServerSideAssignorBenchmark.doAssignment  INCREMENTAL       UNIFORM         false          1              10                         HOMOGENEOUS         1000          avgt  5      1.454 ± 0.019  ms/op
   ServerSideAssignorBenchmark.doAssignment  INCREMENTAL       UNIFORM         false          1              10                         HETEROGENEOUS       1000          avgt  5    342.840 ± 2.744  ms/op
   ```
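   
   The change is essentially memoizing the image lookups, roughly (a sketch; the field and helper names are illustrative, not the PR's exact code):
   ```java
   // Sketch: cache TopicsImage lookups so repeated resolutions during
   // assignment become map hits instead of image traversals.
   private final Map<Uuid, String> topicNames = new HashMap<>();
   
   private String resolveTopicName(Uuid topicId) {
       String cached = topicNames.get(topicId);
       if (cached == null) {
           TopicImage topic = image.getTopic(topicId); // the relatively slow lookup
           if (topic != null) {
               cached = topic.name();
               topicNames.put(topicId, cached);
           }
       }
       return cached;
   }
   ```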
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17624) Remove the E2E uses of accessing ACLs from zk

2024-09-26 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17624:
--

 Summary: Remove the E2E uses of accessing ACLs from zk
 Key: KAFKA-17624
 URL: https://issues.apache.org/jira/browse/KAFKA-17624
 Project: Kafka
  Issue Type: Sub-task
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


To remove ZooKeeper code from AclCommand, we first need to remove the related 
system tests. This task should remove zookeeper.py#list_acls and its usages, 
including zookeeper_tls_test.py, zookeeper_tls_encrypt_only_test.py, 
zookeeper_security_upgrade_test.py, `test_rolling_upgrade_phase_two`, 
`test_rolling_upgrade_sasl_mechanism_phase_two`, and  
upgrade_test.py#perform_upgrade.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17624) Remove the E2E uses of accessing ACLs from zk

2024-09-26 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885050#comment-17885050
 ] 

TengYao Chi commented on KAFKA-17624:
-

Hi [~chia7712] 

If you have not started working on this issue, I would like to give it a try. :)

> Remove the E2E uses of accessing ACLs from zk
> -
>
> Key: KAFKA-17624
> URL: https://issues.apache.org/jira/browse/KAFKA-17624
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> To remove ZooKeeper code from AclCommand, we first need to remove the related 
> system tests. This task should remove zookeeper.py#list_acls and its usages, 
> including zookeeper_tls_test.py, zookeeper_tls_encrypt_only_test.py, 
> zookeeper_security_upgrade_test.py, `test_rolling_upgrade_phase_two`, 
> `test_rolling_upgrade_sasl_mechanism_phase_two`, and  
> upgrade_test.py#perform_upgrade.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] MINOR: fix failed cases in FeatureCommandTest [kafka]

2024-09-26 Thread via GitHub


FrankYang0529 opened a new pull request, #17287:
URL: https://github.com/apache/kafka/pull/17287

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17317: Validate and maybe trigger downgrade after static member replacement [kafka]

2024-09-26 Thread via GitHub


dongnuo123 commented on PR #17008:
URL: https://github.com/apache/kafka/pull/17008#issuecomment-2377169743

   Reopened https://github.com/apache/kafka/pull/17286


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17540: Create floating tag on trunk for CI cache [kafka]

2024-09-26 Thread via GitHub


mumrah commented on code in PR #17204:
URL: https://github.com/apache/kafka/pull/17204#discussion_r1777223177


##
committer-tools/update-cache.sh:
##
@@ -0,0 +1,35 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+key=$(
+  gh cache list \
+--key 'gradle-home-v1|Linux-X64|test' \
+--sort 'created_at' \
+--limit 1 \
+--json 'key' \
+--jq '.[].key'
+)
+
+cut -d '-' -f 5 <<< "$key"
+
+if ! git config --get alias.update-cache >/dev/null; then
+  this_path=$(realpath "$0")
+  git config alias.update-cache "!bash $this_path"
+  echo
+  echo "Now, you can use 'git update-cache' as alias to execute this script."
+fi

Review Comment:
   Although I like this feature, it's probably more polite to let the committer 
manually install the script as an alias rather than automatically modifying 
their git config.
   
   Instead of auto installing it, have this if statement print instructions for 
adding the alias.
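   
   e.g., something along these lines (sketch):
   ```bash
   if ! git config --get alias.update-cache >/dev/null; then
     this_path=$(realpath "$0")
     echo
     echo "Tip: run the following to invoke this script as 'git update-cache':"
     echo "  git config alias.update-cache '!bash $this_path'"
   fi
   ```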



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17540: Create floating tag on trunk for CI cache [kafka]

2024-09-26 Thread via GitHub


mumrah commented on PR #17204:
URL: https://github.com/apache/kafka/pull/17204#issuecomment-2377207907

   > Maybe, we keep it more agnostic and print a message guiding the user to 
fetch. WDYT?
   
   Good point on different remote names. Let's still attempt to update the ref 
in this script. If it can't be done, print a warning and recommend to the user 
to update their remote.
   
   ```
   > git update-cache
   Cannot update 'trunk-cached' because SHA 
264131cdaaef3f4696942f26534b3f61f3a2a162 does not exist locally. Please update 
your remote and try again.
   > echo $? 
   1
   > git fetch origin-or-whatever
   > git update-cache
   Local branch 'trunk-cached' updated to 
264131cdaaef3f4696942f26534b3f61f3a2a162
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12

2024-09-26 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-17617:
-
Fix Version/s: 3.9.1

> New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
> 
>
> Key: KAFKA-17617
> URL: https://issues.apache.org/jira/browse/KAFKA-17617
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Critical
> Fix For: 3.9.1
>
>
> I noticed a PR that failed for Jenkins but not for the GitHub Actions build. 
> Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub 
> actions is using 2.13.
> We still should support 2.12, so we should fix the GitHub actions now that 
> Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 
> builds.
> See Jenkins for 
> failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37]
> see for success in GH Actions: 
> [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093]
> see raw build for 2.13: 
> [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12

2024-09-26 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-17617:
-
Priority: Minor  (was: Critical)

> New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
> 
>
> Key: KAFKA-17617
> URL: https://issues.apache.org/jira/browse/KAFKA-17617
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Minor
> Fix For: 3.9.1
>
>
> I noticed a PR that failed for Jenkins but not for the GitHub Actions build. 
> Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub 
> actions is using 2.13.
> We still should support 2.12, so we should fix the GitHub actions now that 
> Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 
> builds.
> See Jenkins for 
> failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37]
> see for success in GH Actions: 
> [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093]
> see raw build for 2.13: 
> [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17617) New GitHub Actions build builds Java 8 with 2.13 instead of 2.12

2024-09-26 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-17617:
-
Fix Version/s: 3.7.2
   3.8.1

> New GitHub Actions build builds Java 8 with 2.13 instead of 2.12
> 
>
> Key: KAFKA-17617
> URL: https://issues.apache.org/jira/browse/KAFKA-17617
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Priority: Minor
> Fix For: 3.7.2, 3.8.1, 3.9.1
>
>
> I noticed a PR that failed for Jenkins but not for the GitHub Actions build. 
> Tracing it down, it looks like Jenkins was using Scala 2.12 and GitHub 
> actions is using 2.13.
> We still should support 2.12, so we should fix the GitHub actions now that 
> Jenkins is removed. Until we fix, folks can merge in code that breaks 2.12 
> builds.
> See Jenkins for 
> failure:[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-17093/5/cloudbees-pipeline-explorer/?filter=37]
> see for success in GH Actions: 
> [https://github.com/apache/kafka/actions/runs/11039342155/job/30672276597?pr=17093]
> see raw build for 2.13: 
> [https://productionresultssa2.blob.core.windows.net/actions-results/4f08a774-9253-4d15-8617-a627e9961b76/workflow-job-run-716dd2a8-3073-58cf-0d26-4a389b46b592/logs/job/job-logs.txt?rsct=text%2Fplain&se=2024-09-26T01%3A05%3A04Z&sig=klIXSOTwKN9WrBvtdsN6j45DbSqg7ikwow%2FGETJy5pc%3D&ske=2024-09-26T12%3A26%3A46Z&skoid=ca7593d4-ee42-46cd-af88-8b886a2f84eb&sks=b&skt=2024-09-26T00%3A26%3A46Z&sktid=398a6654-997b-47e9-b12b-9515b896b4de&skv=2024-05-04&sp=r&spr=https&sr=b&st=2024-09-26T00%3A54%3A59Z&sv=2024-05-04]
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics [kafka]

2024-09-26 Thread via GitHub


lianetm commented on code in PR #17244:
URL: https://github.com/apache/kafka/pull/17244#discussion_r1777270499


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##
@@ -244,6 +245,9 @@ class PlaintextConsumerSubscriptionTest extends 
AbstractConsumerTest {
 }, waitTimeMs = 5000, msg = "An InvalidTopicException should be thrown.")
 
 assertEquals(s"Invalid topics: [${invalidTopicName}]", 
exception.getMessage)
+assertDoesNotThrow(new Executable {

Review Comment:
   I like the idea of including the unsubscribe in the test, but with this we 
lose the coverage we initially had for subscribe(invalid) + close (if there's 
a regression and releaseAssignment stops ignoring the error we won't catch it). 
What about having 2 tests, mostly doing the same initially, but then one calls 
unsubscribe and asserts it does not throw (like we have now), and the other one 
calls close explicitly and asserts the same, what do you think? (We could 
reuse most of the code for the 2 tests.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: fix failed cases in FeatureCommandTest [kafka]

2024-09-26 Thread via GitHub


mumrah commented on PR #17287:
URL: https://github.com/apache/kafka/pull/17287#issuecomment-2377227125

   Looks like the tests were failing on the PR, so this should not have been 
merged.
   
   https://github.com/apache/kafka/actions/runs/11027485627


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] HOTFIX: fix failed cases in FeatureCommandTest [kafka]

2024-09-26 Thread via GitHub


mumrah commented on PR #17287:
URL: https://github.com/apache/kafka/pull/17287#issuecomment-2377232171

   @chia7712 I agree we should eventually have some "acceptance" tests which do 
some high level sanity checks. In this case, a change to MetadataVersion 
(located in server-common) will cause pretty much all tests to run except maybe 
client.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16394: Fix null propagation in foreign key join result [kafka]

2024-09-26 Thread via GitHub


wrwksexahatvani commented on PR #15607:
URL: https://github.com/apache/kafka/pull/15607#issuecomment-2376945505

   @mjsax @wcarlson5 @gongxuanzhang This is a major blocker, as it leads to 
data loss, as has happened in dozens of my projects. When will the fix for this 
be integrated?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17480: New consumer commit all consumed should retrieve offsets in background thread [kafka]

2024-09-26 Thread via GitHub


lianetm commented on code in PR #17150:
URL: https://github.com/apache/kafka/pull/17150#discussion_r1777058782


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##
@@ -162,23 +163,44 @@ private void process(final PollEvent event) {
 }
 
 private void process(final AsyncCommitEvent event) {
-if (!requestManagers.commitRequestManager.isPresent()) {
-return;
-}
-
-CommitRequestManager manager = 
requestManagers.commitRequestManager.get();
-CompletableFuture future = manager.commitAsync(event.offsets());
-future.whenComplete(complete(event.future()));
+process((CommitEvent) event);

Review Comment:
   We have 2 separate event types (async and sync), only to join them into one 
here, and then split them again for the actual processing with:
   ```
   if (event.type() == Type.COMMIT_ASYNC) {
   future = manager.commitAsync(offsets);
   } else {
   future = manager.commitSync(offsets, event.deadlineMs());
   }
   ```
   I get that with this we can reuse a bit, but I wonder if it's worth the 
twisted flow. Could we maybe keep them separate (as they originally are when 
the events are created), with process(Sync) ending up calling 
manager.commitSync and process(Async) calling manager.commitAsync, and just 
encapsulate in helper functions what we want to reuse in both? (ex. 
maybeUpdateLastSeenEpochIfNewer() with lines 188-191 that would be called from 
both, similar for the logic to retrieve offsets from the event, ln 180-181). 
What do you think?
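   
   For illustration, a minimal sketch of the separation I mean, reusing the 
calls already in the diff (`offsetsFrom` and `maybeUpdateLastSeenEpochIfNewer` 
are hypothetical helper names for the logic at lines 180-181 and 188-191):
   ```java
   // Sketch only: keep the two event types separate and share small helpers.
   private void process(final SyncCommitEvent event) {
       if (!requestManagers.commitRequestManager.isPresent())
           return;
       CommitRequestManager manager = requestManagers.commitRequestManager.get();
       Map<TopicPartition, OffsetAndMetadata> offsets = offsetsFrom(event);  // hypothetical helper
       maybeUpdateLastSeenEpochIfNewer(offsets);                             // hypothetical helper
       manager.commitSync(offsets, event.deadlineMs()).whenComplete(complete(event.future()));
   }
   
   private void process(final AsyncCommitEvent event) {
       if (!requestManagers.commitRequestManager.isPresent())
           return;
       CommitRequestManager manager = requestManagers.commitRequestManager.get();
       Map<TopicPartition, OffsetAndMetadata> offsets = offsetsFrom(event);
       maybeUpdateLastSeenEpochIfNewer(offsets);
       manager.commitAsync(offsets).whenComplete(complete(event.future()));
   }
   ```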



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##
@@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() {
 assertInstanceOf(IllegalStateException.class, e.getCause());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testSyncCommitEventWithOffsets(boolean withGroupId) {

Review Comment:
   since this is testing commit it does need a group id, so it should be only 
for withGroupId=true I expect



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java:
##
@@ -208,6 +206,110 @@ public void testSeekUnvalidatedEventWithException() {
 assertInstanceOf(IllegalStateException.class, e.getCause());
 }
 
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testSyncCommitEventWithOffsets(boolean withGroupId) {
+final long deadlineMs = 12345;
+TopicPartition tp = new TopicPartition("topic", 0);
+Map offsets = 
Collections.singletonMap(tp, new OffsetAndMetadata(10, Optional.of(1), null));
+SyncCommitEvent event = new SyncCommitEvent(offsets, false, 
deadlineMs);
+
+setupProcessor(withGroupId);
+if (withGroupId) {
+doReturn(true).when(metadata).updateLastSeenEpochIfNewer(tp, 1);
+
doReturn(CompletableFuture.completedFuture(null)).when(commitRequestManager).commitSync(offsets,
 deadlineMs);
+}
+
+processor.process(event);
+verify(subscriptionState, never()).allConsumed();
+if (withGroupId) {
+verify(metadata).updateLastSeenEpochIfNewer(tp, 1);
+verify(commitRequestManager).commitSync(offsets, deadlineMs);
+} else {
+verify(metadata, never()).updateLastSeenEpochIfNewer(tp, 1);
+verify(commitRequestManager, never()).commitSync(offsets, 
deadlineMs);
+}
+assertDoesNotThrow(() -> event.future().get());
+}
+
+@ParameterizedTest
+@ValueSource(booleans = {true, false})
+public void testSyncCommitEventWithCommitAllConsumed(boolean withGroupId) {

Review Comment:
   same, only relevant for withGroupId=true right? (and the all the other 
commit tests down below)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17488: Cleanup (test) code for Kafka Streams "metric version" [kafka]

2024-09-26 Thread via GitHub


fonsdant commented on PR #17182:
URL: https://github.com/apache/kafka/pull/17182#issuecomment-2376967494

   Accidentally, I must have reverted the removal of the 
`builtInMetricsVersion` argument passed to `StreamsMetricsImpl` in 
`TopologyTestDriver`, which caused the build to fail. The last commit fixed 
it.
   
   Also, I have removed the other `builtInMetricsVersion` values that had 
not already been removed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17622) Kafka Streams Timeout During Partition Rebalance

2024-09-26 Thread Alieh Saeedi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alieh Saeedi updated KAFKA-17622:
-
Summary: Kafka Streams Timeout During Partition Rebalance   (was: Kafka 
Streams Timeout During Partition Rebalance - Seeking Insights on 
NotLeaderOrFollowerException)

> Kafka Streams Timeout During Partition Rebalance 
> -
>
> Key: KAFKA-17622
> URL: https://issues.apache.org/jira/browse/KAFKA-17622
> Project: Kafka
>  Issue Type: Bug
>Reporter: Alieh Saeedi
>Assignee: Alieh Saeedi
>Priority: Major
>
> Re: 
> [https://forum.confluent.io/t/kafka-streams-timeout-during-partition-rebalance-seeking-insights-on-notleaderorfollowerexception/11362]
> Calling {{Consumer.position()}} from KStreams for computing the offset that 
> must be committed suffers from a race condition, so that by the time we want 
> to commit, the position may be gone.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17582) Unpredictable consumer position after transaction abort

2024-09-26 Thread Kyle Kingsbury (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885032#comment-17885032
 ] 

Kyle Kingsbury commented on KAFKA-17582:


I've done some more digging here, and written a Jepsen test specifically for 
rewind-vs-advance behavior: 
[https://github.com/jepsen-io/redpanda/blob/main/src/jepsen/redpanda/workload/abort.clj].
 Try something like:

{{lein run test --db kafka -w abort --safe --concurrency 5n --sub-via assign 
--rate 1000 --time-limit 120}}

I don't have clear answers yet, but it seems like this behavior is sensitive to 
whether you do all the writes in advance and only perform read-only 
transactions, or whether you mix writes into the reading transactions as well. 
I also suspect that maybe advancing *is* the default behavior (uh oh) and that 
rewinding is actually a consequence of a rebalance.

> Unpredictable consumer position after transaction abort
> ---
>
> Key: KAFKA-17582
> URL: https://issues.apache.org/jira/browse/KAFKA-17582
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, documentation
>Affects Versions: 3.8.0
>Reporter: Kyle Kingsbury
>Priority: Critical
>  Labels: abort, offset, transaction
> Attachments: 20240919T124411.740-0500(1).zip, Screenshot from 
> 2024-09-19 18-45-34.png
>
>
> With the official Kafka Java client, version 3.8.0, the position of consumers 
> after a transaction aborts appears unpredictable. Sometimes the consumer 
> moves on, skipping over the records it polled in the aborted transaction. 
> Sometimes it rewinds to read them again. Sometimes it rewinds *further* than 
> the most recent transaction.
> Since the goal of transactions is to enable "exactly-once semantics", it 
> seems sensible that the consumer should rewind on abort, such that any 
> subsequent transactions would start at the same offsets. Not rewinding leads 
> to data loss, since messages are consumed but their effects are not 
> committed. Rewinding too far is... just weird.
> I'm seeing this issue in Jepsen tests of Kafka 3.0.0 and other 
> Kafka-compatible systems.  It occurs without faults, and with a single 
> producer and consumer; no other concurrent processes. Here's the producer and 
> consumer config:
>  
> Producer config: {"socket.connection.setup.timeout.max.ms" 1000, 
> "transactional.id" "jt1", "bootstrap.servers" "n3:9092", "request.timeout.ms" 
> 3000, "enable.idempotence" true, "max.block.ms" 1, "value.serializer" 
> "org.apache.kafka.common.serialization.LongSerializer", "retries" 1000, 
> "key.serializer" "org.apache.kafka.common.serialization.LongSerializer", 
> "socket.connection.setup.timeout.ms" 500, "reconnect.backoff.max.ms" 1000, 
> "delivery.timeout.ms" 1, "acks" "all", "transaction.timeout.ms" 1000}
> Consumer config: {"socket.connection.setup.timeout.max.ms" 1000, 
> "bootstrap.servers" "n5:9092", "request.timeout.ms" 1, 
> "connections.max.idle.ms" 6, "session.timeout.ms" 6000, 
> "heartbeat.interval.ms" 300, "key.deserializer" 
> "org.apache.kafka.common.serialization.LongDeserializer", "group.id" 
> "jepsen-group", "metadata.max.age.ms" 6, "auto.offset.reset" "earliest", 
> "isolation.level" "read_committed", "socket.connection.setup.timeout.ms" 500, 
> "value.deserializer" 
> "org.apache.kafka.common.serialization.LongDeserializer", 
> "enable.auto.commit" false, "default.api.timeout.ms" 1}
>  
> Attached is a test run that shows this behavior, as well as a visualization 
> of the reads (polls) and writes (sends) of a single topic-partition.
> In this plot, time flows down, and offsets run left to right. Each 
> transaction is a single horizontal line. `w1` denotes a send of value 1, and 
> `r2` denotes a poll of read 2. All operations here are performed by the sole 
> process in the system, which has a single Kafka consumer and a single Kafka 
> client. First,  a transaction writes 35 and commits. Second, a transaction 
> reads 35 and aborts. Third, a transaction reads 35 and aborts: the consumer 
> has clearly re-wound to show the same record twice.
> Then a transaction writes 37. Immediately thereafter a transaction reads 37 
> and 38. Unlike before, it did *not* rewind. This transaction also aborts.
> Finally, a transaction writes 39 and 40. Then a transaction reads 39 and 40. 
> This transaction commits! Values 35, 37, and 38 have been lost!
> It doesn't seem possible that this is the effect of a consumer rebalance: 
> rebalancing should start off the consumer at the last *committed* offset, and 
> the last committed offset in this history was actually value 31, so it should 
> have picked up at 35, 37, etc. This test uses auto.offset.reset=earliest, so 
> if the commit were somehow missing, it should have rewound to the

Re: [PR] KAFKA-17581: AsyncKafkaConsumer can't unsubscribe invalid topics [kafka]

2024-09-26 Thread via GitHub


lianetm commented on PR #17244:
URL: https://github.com/apache/kafka/pull/17244#issuecomment-2377319827

   Regarding:
   > I haven't had a chance to dig into the root cause as I'm curious what can 
be done at that layer so that the consumer doesn't have to be aware of it.
   
   My thoughts in case it helps. As I understand it, the root cause is a 
combination of:
   1. the consumer keeps requesting metadata for a topic (invalid) while 
"needed" (until it unsubscribes) -> this is common for both consumers, because 
the client considers it can "recover" from it
   
https://github.com/apache/kafka/blob/7c429f3514dd50e98f42c74595c35b5f0b32b7d7/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L643
  (recover meaning no longer subscribed, i.e. unsubscribed)
   2. the legacy consumer does not poll the network client in this test (no 
need to send leave because it's not in the group), but the async consumer does 
poll (the background thread continuously polls the network client)
   
   So I think it makes sense to handle this at the unsubscribe level, where we 
understand why it's ok to ignore the exception (because it's actually the way 
to recover from it). A sketch follows below.
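   
   A minimal sketch of what handling it at the unsubscribe level could look 
like (names here are illustrative, not the actual patch):
   ```java
   // Sketch only: unsubscribing is itself the recovery path for an invalid
   // topic in the subscription, so the error is safe to drop here.
   public void unsubscribe() {
       try {
           releaseAssignmentAndLeaveGroup();  // hypothetical internal step
       } catch (InvalidTopicException e) {
           log.debug("Ignoring invalid topic error on unsubscribe", e);
       }
       subscriptions.unsubscribe();
   }
   ```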


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]

2024-09-26 Thread via GitHub


FrankYang0529 commented on code in PR #16982:
URL: https://github.com/apache/kafka/pull/16982#discussion_r1776899718


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -2486,68 +2492,96 @@ public void testCurrentLag(GroupProtocol groupProtocol) 
{
 
 // poll once to update with the current metadata
 consumer.poll(Duration.ofMillis(0));
+TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FIND_COORDINATOR),
+"No metadata requests sent");
 client.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, metadata.fetch().nodes().get(0)));
 
 // no error for no current position
 assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
-assertEquals(0, client.inFlightRequestCount());
-
+if (groupProtocol == GroupProtocol.CLASSIC) {
+// Classic consumer does not send the LIST_OFFSETS right away 
(requires an explicit poll),
+// different from the new async consumer, that will send the 
LIST_OFFSETS request in the background thread
+// on the next background thread poll.
+assertEquals(0, client.inFlightRequestCount());
+}
 // poll once again, which should send the list-offset request
 consumer.seek(tp0, 50L);
 consumer.poll(Duration.ofMillis(0));
 // requests: list-offset, fetch
-assertEquals(2, client.inFlightRequestCount());
+TestUtils.waitForCondition(() -> {
+boolean hasListOffsetRequest = requestGenerated(client, 
ApiKeys.LIST_OFFSETS);
+boolean hasFetchRequest = requestGenerated(client, ApiKeys.FETCH);
+return hasListOffsetRequest && hasFetchRequest;
+}, "No list-offset & fetch request sent");
 
 // no error for no end offset (so unknown lag)
 assertEquals(OptionalLong.empty(), consumer.currentLag(tp0));
 
 // poll once again, which should return the list-offset response
 // and hence next call would return correct lag result
-client.respond(listOffsetsResponse(singletonMap(tp0, 90L)));
+ClientRequest listOffsetRequest = findRequest(client, 
ApiKeys.LIST_OFFSETS);
+client.respondToRequest(listOffsetRequest, 
listOffsetsResponse(singletonMap(tp0, 90L)));
 consumer.poll(Duration.ofMillis(0));
 
-assertEquals(OptionalLong.of(40L), consumer.currentLag(tp0));
+// For AsyncKafkaConsumer, subscription state is updated in the background, 
so the result will eventually be updated.
+TestUtils.waitForCondition(() -> {
+OptionalLong result = consumer.currentLag(tp0);
+return result.isPresent() && result.getAsLong() == 40L;
+}, "Subscription state is not updated");
 // requests: fetch
-assertEquals(1, client.inFlightRequestCount());
+TestUtils.waitForCondition(() -> requestGenerated(client, 
ApiKeys.FETCH), "No fetch request sent");
 
 // one successful fetch should update the log end offset and the 
position
+ClientRequest fetchRequest = findRequest(client, ApiKeys.FETCH);
 final FetchInfo fetchInfo = new FetchInfo(1L, 99L, 50L, 5);
-client.respond(fetchResponse(singletonMap(tp0, fetchInfo)));
+client.respondToRequest(fetchRequest, fetchResponse(singletonMap(tp0, 
fetchInfo)));
 
 final ConsumerRecords records = 
(ConsumerRecords) consumer.poll(Duration.ofMillis(1));
 assertEquals(5, records.count());
 assertEquals(55L, consumer.position(tp0));
 
 // correct lag result
-assertEquals(OptionalLong.of(45L), consumer.currentLag(tp0));
+// For AsyncKafkaConsumer, subscription state is updated in the background, 
so the result will eventually be updated.
+TestUtils.waitForCondition(() -> {

Review Comment:
   Yes, you're right, if `consumer.position` can get `45`, then the 
subscription state has already been updated. Remove 
`TestUtils.waitForCondition` here. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17505: New consumer seekToBeginning/End should run in background thread [kafka]

2024-09-26 Thread via GitHub


lianetm commented on code in PR #17230:
URL: https://github.com/apache/kafka/pull/17230#discussion_r1776850858


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -842,8 +843,8 @@ public void seekToEnd(Collection 
partitions) {
 
 acquireAndEnsureOpen();
 try {
-Collection parts = partitions.isEmpty() ? 
subscriptions.assignedPartitions() : partitions;
-subscriptions.requestOffsetReset(parts, 
OffsetResetStrategy.LATEST);
+cachedSubscriptionHasAllFetchPositions = false;
+applicationEventHandler.add(new ResetOffsetEvent(partitions, 
OffsetResetStrategy.LATEST));

Review Comment:
   Hey here, sorry for the late reply. I agree that these 3 methods should have 
similar behaviour, but I wonder whether they should all use `addAndGet` 
instead? My concern is, when seeking (with any of the 3 methods), we do expect 
that as soon as they complete, the position is updated so that other funcs will 
see it (ex. currentLag, poll). We would not be respecting that anymore if we 
use add; seek could complete without having updated the offsets, right?
   
   From the seek contract, it states that "Overrides the fetch offsets that the 
consumer will use on the next {@link #poll(Duration) poll(timeout)}", so we 
need to:
   1. override fetch offsets (this requires `addAndGet` I expect)
   2. make sure they are available on the next poll (again addAndGet on seek; 
otherwise consumer.seek + consumer.poll may not find the offsets if the poll 
happens while the `seek` with `add` is still being processed in the background, 
right?). Also, calls like currentLag after that seek + add could lack the info 
they need, I guess. A sketch of the blocking variant follows below.
   
   Thoughts?
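   
   To make that concrete, a sketch of the blocking variant (assuming 
`ResetOffsetEvent` as in the diff, and that the event can be awaited via 
`addAndGet`; it may also need a deadline parameter):
   ```java
   // Sketch only: block until the background thread applies the reset, so the
   // updated positions are visible to the next poll()/currentLag() call.
   public void seekToEnd(Collection<TopicPartition> partitions) {
       acquireAndEnsureOpen();
       try {
           applicationEventHandler.addAndGet(
               new ResetOffsetEvent(partitions, OffsetResetStrategy.LATEST));
       } finally {
           release();
       }
   }
   ```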
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15266) Static configs set for non primary synonyms are ignored for Log configs

2024-09-26 Thread Kamal Chandraprakash (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kamal Chandraprakash resolved KAFKA-15266.
--
Resolution: Duplicate

> Static configs set for non primary synonyms are ignored for Log configs
> ---
>
> Key: KAFKA-15266
> URL: https://issues.apache.org/jira/browse/KAFKA-15266
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Aman Harish Gandhi
>Assignee: Aman Harish Gandhi
>Priority: Major
>
> In our server.properties we had the following config
> {code:java}
> log.retention.hours=48
> {code}
> We noticed that after running alter configs to update a broker-level config 
> (for a config unrelated to retention) we were only deleting data after 7 days 
> instead of the configured 2.
> The alter-configs command we had run was similar to this
> {code:java}
> sh kafka-config.sh --bootstrap-server localhost:9092 --alter --add-config 
> "log.segment.bytes=50"
> {code}
> Digging deeper, the issue could be pinpointed to the reconfigure block of 
> DynamicLogConfig inside DynamicBrokerConfig. Here we only look at the 
> "primary" KafkaConfig synonym of the LogConfig, and if it is not set then we 
> remove the value set in the default log config as well. This eventually leads 
> to retention.ms not being set in the default log config, which leads to the 
> default value of 7 days being used. The value set in "log.retention.hours" is 
> completely ignored in this case.
> Pasting the relevant code block here
> {code:java}
> newConfig.valuesFromThisConfig.forEach { (k, v) =>
>   if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
>     DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
>       if (v == null)
>         newBrokerDefaults.remove(configName)
>       else
>         newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
>     }
>   }
> } {code}
> In the above block `DynamicLogConfig.ReconfigurableConfigs` contains only 
> log.retention.ms. It does not contain the other synonyms like 
> `log.retention.minutes` or `log.retention.hours`.
> This issue seems prevalent in all cases where there are more than 1 
> KafkaConfig synonyms for the LogConfig.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Refactored and fixed minor share fetch code (KIP-932) [kafka]

2024-09-26 Thread via GitHub


mumrah merged PR #17269:
URL: https://github.com/apache/kafka/pull/17269


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-17618: group consumer heartbeat interval should be less than session timeout [kafka]

2024-09-26 Thread via GitHub


FrankYang0529 opened a new pull request, #17281:
URL: https://github.com/apache/kafka/pull/17281

   
[KIP-848](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session)
 mentions:
   > The member is expected to heartbeat every 
group.consumer.heartbeat.interval.ms in order to keep its session opened. If it 
does not heartbeat at least once within the group.consumer.session.timeout.ms, 
the group coordinator will kick the member out from the group.
   
   To avoid misconfiguration, we can add validation for it.
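   
   A minimal sketch of such a check (config names from the KIP; where exactly 
the check lives in the broker config classes is not shown here):
   ```java
   // Illustrative only; assumes both values were already parsed from the config.
   static void validateHeartbeatInterval(int heartbeatIntervalMs, int sessionTimeoutMs) {
       if (heartbeatIntervalMs >= sessionTimeoutMs) {
           throw new ConfigException("group.consumer.heartbeat.interval.ms ("
               + heartbeatIntervalMs + ") must be less than group.consumer.session.timeout.ms ("
               + sessionTimeoutMs + ")");
       }
   }
   ```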
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


mumrah commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776991897


##
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/StateBatchUtil.java:
##
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.server.share.PersisterStateBatch;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+
+public class StateBatchUtil {
+/**
+ * Util method which takes in 2 lists containing {@link 
PersisterStateBatch}
+ * and the startOffset.
+ * This method removes any batches where the lastOffset < startOffset, if 
the startOffset > -1.
+ * It then merges any contiguous intervals with same state. If states 
differ,
+ * based on various conditions it creates new non-overlapping batches 
preferring new ones.
+ * @param batchesSoFar - List containing current soft state of {@link 
PersisterStateBatch}
+ * @param newBatches - List containing {@link PersisterStateBatch} in 
incoming request
+ * @param startOffset - startOffset to consider when removing old batches.
+ * @return List containing combined batches
+ */
+public static List combineStateBatches(
+List batchesSoFar,
+List newBatches,
+long startOffset
+) {
+List combinedList = new 
ArrayList<>(batchesSoFar.size() + newBatches.size());
+combinedList.addAll(batchesSoFar);
+combinedList.addAll(newBatches);
+
+return mergeBatches(
+pruneBatches(
+combinedList,
+startOffset
+)
+);
+}
+
+/**
+ * Encapsulates the main merge algorithm. Consider 2 batches (A, B):
+ * - Same state (delivery count and state)
+ *  - If overlapping - merge into single batch
+ *  - If contiguous (A.lastOffset + 1 == B.firstOffset) - merge batches 
into a single 1
+ * - Different state (delivery count or state differ)
+ *  - Based on various cases:
+ *  - swallow lower priority batch within bounds of offsets
+ *  - break batch into other non-overlapping batches
+ * @param batches - List of {@link PersisterStateBatch}
+ * @return List of non-overlapping {@link PersisterStateBatch}
+ */
+private static List 
mergeBatches(List batches) {
+if (batches.size() < 2) {
+return batches;
+}
+TreeSet sortedBatches = new TreeSet<>(batches);
+List finalBatches = new 
ArrayList<>(batches.size() * 2); // heuristic size
+
+BatchOverlapState overlapState = getOverlappingState(sortedBatches);
+
+while (overlapState != BatchOverlapState.SENTINEL) {
+PersisterStateBatch last = overlapState.last();
+PersisterStateBatch candidate = overlapState.candidate();
+
+// remove non overlapping prefix from sortedBatches,
+// will make getting next overlapping pair efficient
+// as a prefix batch which is non overlapping will only
+// be checked once.
+if (overlapState.nonOverlapping() != null) {
+overlapState.nonOverlapping().forEach(sortedBatches::remove);
+finalBatches.addAll(overlapState.nonOverlapping());
+}
+
+if (candidate == null) {
+overlapState = BatchOverlapState.SENTINEL;
+continue;
+}
+
+// remove both last and candidate for easier
+// assessment about adding batches to sortedBatches
+sortedBatches.remove(last);
+sortedBatches.remove(candidate);
+
+// overlap and same state (last.firstOffset <= 
candidate.firstOffset) due to sort
+// covers:
+// case:12  34  5  
 6  7 (contiguous)
+// last:__   ______  ___   ___ 
  ___
+// candidate:   __

Re: [PR] KAFKA-17612: Remove some tests that only apply to ZK mode or migration [kafka]

2024-09-26 Thread via GitHub


chia7712 merged PR #17276:
URL: https://github.com/apache/kafka/pull/17276


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17621) Reduce logging verbosity on ConsumerGroupHeartbeat path

2024-09-26 Thread David Jacot (Jira)
David Jacot created KAFKA-17621:
---

 Summary: Reduce logging verbosity on ConsumerGroupHeartbeat path
 Key: KAFKA-17621
 URL: https://issues.apache.org/jira/browse/KAFKA-17621
 Project: Kafka
  Issue Type: Sub-task
Reporter: David Jacot
Assignee: David Jacot






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout

2024-09-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17618:
--
Component/s: clients
 consumer

> group consumer heartbeat interval should be less than session timeout
> -
>
> Key: KAFKA-17618
> URL: https://issues.apache.org/jira/browse/KAFKA-17618
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: kip-848
>
> [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session]
>  mentions:
> bq. The member is expected to heartbeat every 
> group.consumer.heartbeat.interval.ms in order to keep its session opened. If 
> it does not heartbeat at least once within the 
> group.consumer.session.timeout.ms, the group coordinator will kick the member 
> out from the group.
> To avoid users configuring _group.consumer.heartbeat.interval.ms_ to be 
> bigger than _group.consumer.session.timeout.ms_, we can add validation for it.
> We can do similar validation for _group.share.heartbeat.interval.ms_ and 
> _group.share.session.timeout.ms_ as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16343: Add unit tests of foreignKeyJoin classes [kafka]

2024-09-26 Thread via GitHub


wrwksexahatvani commented on PR #15564:
URL: https://github.com/apache/kafka/pull/15564#issuecomment-2376930948

   @wcarlson5 @mjsax In which release will these tests and of course the fix in 
KAFKA-16394 be integrated? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-17618) group consumer heartbeat interval should be less than session timeout

2024-09-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17618?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17618:
--
Component/s: (was: clients)
 (was: consumer)

> group consumer heartbeat interval should be less than session timeout
> -
>
> Key: KAFKA-17618
> URL: https://issues.apache.org/jira/browse/KAFKA-17618
> Project: Kafka
>  Issue Type: Task
>Reporter: PoAn Yang
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: kip-848
>
> [KIP-848|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-Heartbeat&Session]
>  mentions:
> bq. The member is expected to heartbeat every 
> group.consumer.heartbeat.interval.ms in order to keep its session opened. If 
> it does not heartbeat at least once within the 
> group.consumer.session.timeout.ms, the group coordinator will kick the member 
> out from the group.
> To avoid users configuring _group.consumer.heartbeat.interval.ms_ to be 
> bigger than _group.consumer.session.timeout.ms_, we can add validation for it.
> We can do similar validation for _group.share.heartbeat.interval.ms_ and 
> _group.share.session.timeout.ms_ as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]

2024-09-26 Thread via GitHub


lianetm commented on PR #17165:
URL: https://github.com/apache/kafka/pull/17165#issuecomment-2377143597

   FYI, I filed https://issues.apache.org/jira/browse/KAFKA-17623 for a flaky 
consumer integration test that I noticed here; it has been flaky for a while.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback

2024-09-26 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-17623:
--

 Summary: Flaky 
testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
 Key: KAFKA-17623
 URL: https://issues.apache.org/jira/browse/KAFKA-17623
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Lianet Magrans


Flaky for the new consumer, failing with:

org.apache.kafka.common.KafkaException: User rebalance callback throws an error 
at 
app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259)
 at 
app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867)
 at 
app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195)
 at 
app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181)
 at 
app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758)
 at 
app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618)

...

Caused by: java.lang.IllegalStateException: No current assignment for partition 
topic-0 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378)
 at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395)
 at 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425)
 at 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147)
 at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171)

 

Flaky behaviour:

 

https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]

2024-09-26 Thread via GitHub


FrankYang0529 commented on PR #17165:
URL: https://github.com/apache/kafka/pull/17165#issuecomment-2377164009

   > FYI, I filed https://issues.apache.org/jira/browse/KAFKA-17623 for a flaky 
consumer integration test that I noticed here; it has been flaky for a while.
   
   Okay. I will take a look. Thanks for filing the issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback

2024-09-26 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885047#comment-17885047
 ] 

PoAn Yang commented on KAFKA-17623:
---

Hi [~lianetm], may I take this? Thank you.

> Flaky 
> testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
> 
>
> Key: KAFKA-17623
> URL: https://issues.apache.org/jira/browse/KAFKA-17623
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Lianet Magrans
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Flaky for the new consumer, failing with :
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error at 
> app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618)
> ...
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition topic-0 at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171)
>  
> Flaky behaviour:
>  
> https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] (WIP) MINOR: Cache topic resolution in TopicIds set [kafka]

2024-09-26 Thread via GitHub


dajac closed pull request #16527: (WIP) MINOR: Cache topic resolution in 
TopicIds set
URL: https://github.com/apache/kafka/pull/16527


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-17317: Validate and maybe trigger downgrade after static member replacement [kafka]

2024-09-26 Thread via GitHub


dongnuo123 closed pull request #17008: KAFKA-17317: Validate and maybe trigger 
downgrade after static member replacement
URL: https://github.com/apache/kafka/pull/17008


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback

2024-09-26 Thread Lianet Magrans (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885049#comment-17885049
 ] 

Lianet Magrans commented on KAFKA-17623:


Sure, thanks!

> Flaky 
> testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
> 
>
> Key: KAFKA-17623
> URL: https://issues.apache.org/jira/browse/KAFKA-17623
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Flaky for the new consumer, failing with :
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error at 
> app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618)
> ...
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition topic-0 at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171)
>  
> Flaky behaviour:
>  
> https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-17317: Validate and maybe trigger downgrade after static member replacement [kafka]

2024-09-26 Thread via GitHub


dongnuo123 opened a new pull request, #17286:
URL: https://github.com/apache/kafka/pull/17286

   https://issues.apache.org/jira/browse/KAFKA-17317
   
   This patch makes the online downgrade trigger asynchronous by scheduling a 
timer to downgrade the group in the appendFuture of the coordinator result. To 
avoid duplication, the rebalance is only triggered after the downgrade 
conversion but not after the last consumer protocol member leaves the group. If 
the downgrade is triggered by static member replacement, no rebalance will be 
triggered.
   
   This also fixes the bug discovered in the system test, where the group can't 
be downgraded when the last static member using the consumer protocol is 
replaced by a classic member with the same instance id.
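   
   Conceptually, the asynchronous trigger described above has roughly this 
shape (all names here are paraphrased, not the actual coordinator runtime API):
   ```java
   // Conceptual sketch only: schedule the downgrade after the write that removed
   // the last consumer-protocol member is committed, instead of converting inline.
   result.appendFuture().whenComplete((ignored, error) -> {
       if (error == null) {
           timer.schedule(() -> maybeDowngradeGroup(groupId));  // hypothetical timer API
       }
   });
   ```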
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] (WIP) MINOR: Cache topic resolution in TopicIds set [kafka]

2024-09-26 Thread via GitHub


dajac commented on PR #16527:
URL: https://github.com/apache/kafka/pull/16527#issuecomment-2377167228

   Replaced by https://github.com/apache/kafka/pull/17285.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-17623) Flaky testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback

2024-09-26 Thread Lianet Magrans (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans reassigned KAFKA-17623:
--

Assignee: PoAn Yang

> Flaky 
> testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback
> 
>
> Key: KAFKA-17623
> URL: https://issues.apache.org/jira/browse/KAFKA-17623
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Lianet Magrans
>Assignee: PoAn Yang
>Priority: Major
>  Labels: consumer-threading-refactor
>
> Flaky for the new consumer, failing with :
> org.apache.kafka.common.KafkaException: User rebalance callback throws an 
> error at 
> app//org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:259)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.invokeRebalanceCallbacks(AsyncKafkaConsumer.java:1867)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:195)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer$BackgroundEventProcessor.process(AsyncKafkaConsumer.java:181)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.processBackgroundEvents(AsyncKafkaConsumer.java:1758)
>  at 
> app//org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.updateAssignmentMetadataIfNeeded(AsyncKafkaConsumer.java:1618)
> ...
> Caused by: java.lang.IllegalStateException: No current assignment for 
> partition topic-0 at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:378)
>  at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:395)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:425)
>  at 
> org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor.process(ApplicationEventProcessor.java:147)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.processApplicationEvents(ConsumerNetworkThread.java:171)
>  
> Flaky behaviour:
>  
> https://ge.apache.org/scans/tests?search.buildOutcome=failure&search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172740959&search.startTimeMin=172248480&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=integration.kafka.api.PlaintextConsumerCallbackTest&tests.test=testSeekPositionAndPauseNewlyAssignedPartitionOnPartitionsAssignedCallback(String%2C%20String)%5B3%5D
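
For context (not part of the original report): the failing test drives roughly 
the pattern below, seeking and pausing a partition from inside 
onPartitionsAssigned. The stack trace suggests the seek can race with a 
subsequent assignment change, so the background thread no longer has the 
partition when it processes the event. A rough standalone sketch of the 
callback pattern (topic name and configs are illustrative):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekAndPauseOnAssign {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("group.id", "callback-sketch");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of("topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                for (TopicPartition tp : partitions) {
                    consumer.seek(tp, 0L);       // reposition the new partition
                    consumer.pause(List.of(tp)); // and pause it immediately
                }
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
        });
        consumer.poll(Duration.ofSeconds(1));
        consumer.close();
    }
}
```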



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-17367: Share coordinator impl. New merge batches algorithm. [3/N] [kafka]

2024-09-26 Thread via GitHub


mumrah commented on code in PR #17149:
URL: https://github.com/apache/kafka/pull/17149#discussion_r1776956765


##
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/StateBatchUtilTest.java:
##
@@ -60,36 +60,60 @@ static class BatchTestHolder {
 this.shouldRun = shouldRun;
 }
 
+static List singleBatch(
+long firstOffset,
+long lastOffset,
+byte deliveryState,
+short deliveryCount

Review Comment:
   I would let these be `int` so we can avoid casting in the test code.
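   
   An illustration (not from the PR) of the nit: with `byte`/`short` parameters 
every call site needs narrowing casts, while `int` parameters let tests pass 
bare literals and narrow once inside the helper. `Batch` below is a stand-in 
for `PersisterStateBatch`, whose constructor signature is assumed:
   
   ```java
   import java.util.Collections;
   import java.util.List;
   
   public class SingleBatchCastSketch {
   
       // Stand-in for PersisterStateBatch; the real field layout is assumed.
       record Batch(long firstOffset, long lastOffset, byte deliveryState, short deliveryCount) { }
   
       // byte/short parameters force casts at every call site.
       static List<Batch> singleBatchNarrow(long firstOffset, long lastOffset,
                                            byte deliveryState, short deliveryCount) {
           return Collections.singletonList(
               new Batch(firstOffset, lastOffset, deliveryState, deliveryCount));
       }
   
       // int parameters accept bare literals; the helper narrows once internally.
       static List<Batch> singleBatch(long firstOffset, long lastOffset,
                                      int deliveryState, int deliveryCount) {
           return Collections.singletonList(new Batch(
               firstOffset, lastOffset, (byte) deliveryState, (short) deliveryCount));
       }
   
       public static void main(String[] args) {
           System.out.println(singleBatchNarrow(0L, 10L, (byte) 0, (short) 1)); // casts required
           System.out.println(singleBatch(0L, 10L, 0, 1));                      // no casts
       }
   }
   ```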



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Fix a race and add JMH bench for HdrHistogram [kafka]

2024-09-26 Thread via GitHub


jeffkbkim commented on code in PR #17221:
URL: https://github.com/apache/kafka/pull/17221#discussion_r1776938178


##
coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java:
##
@@ -172,4 +178,39 @@ public void testHistogramDataReset() {
 assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + 
maxSnapshotAgeMs));
 assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + 
maxSnapshotAgeMs));
 }
+
+@Test
+public void testLatestHistogramRace() throws InterruptedException, 
ExecutionException {

Review Comment:
   nit: how's testLatestHistogramShouldNotReturnUnfilledHistogram?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


